zephyr/subsys/rtio/rtio_executor_concurrent.c
Tom Burdick 912e7ff863 rtio: Add callback op
Adds a callback op to RTIO enabling C logic to be interspersed with
I/O operations. This is not safe from userspace but allowable in kernel
space.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
2023-04-03 09:51:02 +02:00

289 lines
7.9 KiB
C

/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/spinlock.h>
#include <zephyr/rtio/rtio_executor_concurrent.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL);
#include "rtio_executor_common.h"
#define CONEX_TASK_COMPLETE BIT(0)
#define CONEX_TASK_SUSPENDED BIT(1)
/**
* @file
* @brief Concurrent RTIO Executor
*
* The concurrent executor provides fixed amounts of concurrency
* using minimal overhead but assumes a small number of concurrent tasks.
*
* Many of the task lookup and management functions in here are O(N) over N
* tasks. This is fine when the task set is *small*. Task lookup could be
* improved in the future with a binary search at the expense of code size.
*
* The assumption here is that perhaps only 8-16 concurrent tasks are likely
* such that simple short for loops over task array are reasonably fast.
*
* A maximum of 65K submissions queue entries are possible.
*/
/**
* check if there is a free task available
*/
static bool conex_task_free(struct rtio_concurrent_executor *exc)
{
return (exc->task_in - exc->task_out) < (exc->task_mask + 1);
}
/**
* get the next free available task index
*/
static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
{
uint16_t task_id = exc->task_in;
exc->task_in++;
return task_id & exc->task_mask;
}
static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
const struct rtio_iodev_sqe *iodev_sqe)
{
__ASSERT_NO_MSG(iodev_sqe <= &exc->task_cur[exc->task_mask] &&
iodev_sqe >= &exc->task_cur[0]);
return iodev_sqe - &exc->task_cur[0];
}
static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc)
{
struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
rtio_spsc_release(r->sq);
sqe = rtio_spsc_consume(r->sq);
}
rtio_spsc_release(r->sq);
if (sqe == exc->last_sqe) {
exc->last_sqe = NULL;
}
}
/**
* @brief Sweep like a GC of sorts old tasks that are completed in order
*
* Will only sweep tasks in the order they arrived in the submission queue.
* Meaning there might be completed tasks that could be freed but are not yet
* because something before it has not yet completed.
*/
static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
{
/* In order sweep up */
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) {
LOG_DBG("sweeping oldest task %d", task_id);
conex_sweep_task(r, exc);
exc->task_out++;
} else {
break;
}
}
}
/**
* @brief Prepare tasks to run by iterating through the submission queue
*
* For each submission in the queue that begins a chain or transaction
* start a task if possible. Concurrency is limited by the allocated concurrency
* per executor instance.
*/
static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
{
struct rtio_sqe *sqe, *last_sqe;
/* If never submitted before peek at the first item
* otherwise start back up where the last submit call
* left off
*/
if (exc->last_sqe == NULL) {
last_sqe = NULL;
sqe = rtio_spsc_peek(r->sq);
} else {
last_sqe = exc->last_sqe;
sqe = rtio_spsc_next(r->sq, last_sqe);
}
LOG_DBG("starting at sqe %p, last %p", sqe, exc->last_sqe);
while (sqe != NULL && conex_task_free(exc)) {
/* Get the next free task id */
uint16_t task_idx = conex_task_next(exc);
LOG_DBG("preparing task %d, sqe %p", task_idx, sqe);
/* Setup task */
exc->task_cur[task_idx].sqe = sqe;
exc->task_cur[task_idx].r = r;
exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
/* Go to the next sqe not in the current chain or transaction */
while (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) {
sqe = rtio_spsc_next(r->sq, sqe);
}
/* SQE is the end of the previous chain or transaction so skip it */
last_sqe = sqe;
sqe = rtio_spsc_next(r->sq, sqe);
}
/* Out of available tasks so remember where we left off to begin again once tasks free up */
exc->last_sqe = last_sqe;
}
/**
* @brief Resume tasks that are suspended
*
* All tasks begin as suspended tasks. This kicks them off to the submissions
* associated iodev.
*/
static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
/* In order resume tasks */
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
LOG_DBG("resuming suspended task %d", task_id);
exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
rtio_executor_submit(&exc->task_cur[task_id & exc->task_mask]);
}
}
}
/**
* @brief Submit submissions to concurrent executor
*
* @param r RTIO context
*
* @retval 0 Always succeeds
*/
int rtio_concurrent_submit(struct rtio *r)
{
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
k_spinlock_key_t key;
key = k_spin_lock(&exc->lock);
/* Prepare tasks to run, they start in a suspended state */
conex_prepare(r, exc);
/* Resume all suspended tasks */
conex_resume(r, exc);
k_spin_unlock(&exc->lock, key);
return 0;
}
/**
* @brief Callback from an iodev describing success
*/
void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
struct rtio *r = iodev_sqe->r;
const struct rtio_sqe *sqe = iodev_sqe->sqe;
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
const struct rtio_sqe *next_sqe;
k_spinlock_key_t key;
/* Interrupt may occur in spsc_acquire, breaking the contract
* so spin around it effectively preventing another interrupt on
* this core, and another core trying to concurrently work in here.
*/
key = k_spin_lock(&exc->lock);
LOG_DBG("completed sqe %p", sqe);
/* Determine the task id by memory offset O(1) */
uint16_t task_id = conex_task_id(exc, iodev_sqe);
if (sqe->flags & RTIO_SQE_CHAINED) {
next_sqe = rtio_spsc_next(r->sq, sqe);
exc->task_cur[task_id].sqe = next_sqe;
rtio_executor_submit(&exc->task_cur[task_id]);
} else {
exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
}
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
while (transaction) {
sqe = rtio_spsc_next(r->sq, sqe);
transaction = sqe->flags & RTIO_SQE_TRANSACTION;
}
conex_sweep(r, exc);
rtio_cqe_submit(r, result, sqe->userdata);
conex_prepare(r, exc);
conex_resume(r, exc);
k_spin_unlock(&exc->lock, key);
}
/**
* @brief Callback from an iodev describing error
*/
void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
k_spinlock_key_t key;
struct rtio *r = iodev_sqe->r;
const struct rtio_sqe *sqe = iodev_sqe->sqe;
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
void *userdata = sqe->userdata;
bool chained = sqe->flags & RTIO_SQE_CHAINED;
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
uint16_t task_id = conex_task_id(exc, iodev_sqe);
/* Another interrupt (and sqe complete) may occur in spsc_acquire,
* breaking the contract so spin around it effectively preventing another
* interrupt on this core, and another core trying to concurrently work
* in here.
*/
key = k_spin_lock(&exc->lock);
if (!transaction) {
rtio_cqe_submit(r, result, userdata);
}
/* While the last sqe was marked as chained or transactional, do more work */
while (chained | transaction) {
sqe = rtio_spsc_next(r->sq, sqe);
chained = sqe->flags & RTIO_SQE_CHAINED;
transaction = sqe->flags & RTIO_SQE_TRANSACTION;
userdata = sqe->userdata;
if (!transaction) {
rtio_cqe_submit(r, result, userdata);
} else {
rtio_cqe_submit(r, -ECANCELED, userdata);
}
}
/* Determine the task id : O(1) */
exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
conex_sweep(r, exc);
conex_prepare(r, exc);
conex_resume(r, exc);
k_spin_unlock(&exc->lock, key);
}