zephyr/subsys/rtio/rtio_executor_concurrent.c
Tom Burdick 610d307fa0 rtio: Properly track last sqe in the queue
The pending_sqe logic to track where in the ring queue the concurrent
executor had left off was slightly flawed. It didn't account for starting
all sqes in the queue and ending back up at the beginning.

Instead track the last SQE in the queue, from which the next one in the
queue will be the one to start next.

If we happen to sweep the last known SQE in the queue, reset it to NULL
so the next time prepare is called we start at the beginning of the queue
again.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
2023-03-17 12:49:57 -05:00

290 lines
7.9 KiB
C

/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/spinlock.h>
#include <zephyr/rtio/rtio_executor_concurrent.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL);
/* Task status bit: set by rtio_concurrent_ok()/rtio_concurrent_err() once a
 * task has finished; conex_sweep() reclaims tasks carrying this bit.
 */
#define CONEX_TASK_COMPLETE BIT(0)
/* Task status bit: set by conex_prepare() on a newly created task and cleared
 * by conex_resume() when the task is submitted to its iodev.
 */
#define CONEX_TASK_SUSPENDED BIT(1)
/**
* @file
* @brief Concurrent RTIO Executor
*
* The concurrent executor provides fixed amounts of concurrency
* using minimal overhead but assumes a small number of concurrent tasks.
*
* Many of the task lookup and management functions in here are O(N) over N
* tasks. This is fine when the task set is *small*. Task lookup could be
* improved in the future with a binary search at the expense of code size.
*
* The assumption here is that perhaps only 8-16 concurrent tasks are likely
* such that simple short for loops over task array are reasonably fast.
*
* A maximum of 65K submission queue entries are possible.
*/
/**
 * @brief Check whether a free task slot is available
 *
 * task_in and task_out are free-running counters, so their difference is
 * the number of task slots currently in use. The difference is narrowed back
 * to 16 bits before comparing: the subtraction is carried out in int after
 * integer promotion, and once task_in has wrapped around while task_out has
 * not, the promoted result is negative and a completely full task set would
 * be reported as having room.
 *
 * NOTE(review): assumes task_in/task_out are 16-bit unsigned counters, as
 * conex_task_next() treats them - confirm against the struct declaration.
 *
 * @param exc Concurrent executor
 *
 * @retval true if fewer than task_mask + 1 tasks are in use
 * @retval false if every task slot is in use
 */
static bool conex_task_free(struct rtio_concurrent_executor *exc)
{
	uint16_t in_use = (uint16_t)(exc->task_in - exc->task_out);

	return in_use < (exc->task_mask + 1);
}
/**
 * @brief Claim the next task slot
 *
 * Advances the task_in counter by one and returns the slot index that the
 * counter's previous value maps to (previous value masked by task_mask).
 * Does not check for availability; see conex_task_free().
 *
 * @param exc Concurrent executor
 *
 * @return Index into the task arrays for the claimed slot
 */
static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
{
	const uint16_t slot = exc->task_in & exc->task_mask;

	exc->task_in++;

	return slot;
}
/**
 * @brief Map an iodev_sqe back to its task index
 *
 * The iodev_sqe is expected to be an element of the executor's task_cur
 * array, so its index is simply its offset from the start of that array.
 *
 * @param exc Concurrent executor
 * @param iodev_sqe Pointer to an element of exc->task_cur
 *
 * @return Index of iodev_sqe within exc->task_cur
 */
static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
				     const struct rtio_iodev_sqe *iodev_sqe)
{
	/* Must point inside the task_cur array, indices 0..task_mask inclusive */
	__ASSERT_NO_MSG(iodev_sqe <= &exc->task_cur[exc->task_mask] &&
			iodev_sqe >= &exc->task_cur[0]);

	return (uint16_t)(iodev_sqe - exc->task_cur);
}
/**
 * @brief Release the submissions belonging to the oldest task
 *
 * Consumes and releases entries from the front of the submission queue for
 * as long as they are flagged as chained or transactional, then releases the
 * entry that terminates that run. If the terminating entry is the one the
 * executor remembered as last_sqe, the marker is cleared so that the next
 * conex_prepare() starts again from the front of the queue.
 *
 * @param r RTIO context
 * @param exc Concurrent executor
 */
static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	const uint32_t linked = RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION;
	struct rtio_sqe *sqe;

	for (;;) {
		sqe = rtio_spsc_consume(r->sq);
		if (sqe == NULL || !(sqe->flags & linked)) {
			break;
		}
		rtio_spsc_release(r->sq);
	}

	/* Release the final entry of the chain or transaction */
	rtio_spsc_release(r->sq);

	if (exc->last_sqe == sqe) {
		exc->last_sqe = NULL;
	}
}
/**
 * @brief Sweep like a GC of sorts old tasks that are completed in order
 *
 * Will only sweep tasks in the order they arrived in the submission queue.
 * Meaning there might be completed tasks that could be freed but are not yet
 * because something before it has not yet completed.
 *
 * The loop runs until task_out catches up with task_in rather than while a
 * task id is numerically below task_in. The counters are free-running and
 * wrap, so a "<" comparison stops sweeping as soon as task_in has wrapped
 * around and task_out has not.
 *
 * @param r RTIO context
 * @param exc Concurrent executor
 */
static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	/* In order sweep up, oldest task first */
	while (exc->task_out != exc->task_in) {
		uint16_t task_id = exc->task_out;

		if (!(exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE)) {
			/* Oldest task still running, nothing newer may be swept */
			break;
		}

		LOG_DBG("sweeping oldest task %d", task_id);
		conex_sweep_task(r, exc);
		exc->task_out++;
	}
}
/**
 * @brief Create suspended tasks for pending submissions
 *
 * Walks the submission queue starting after the last submission that was
 * already assigned to a task (or from the front of the queue when there is
 * no such marker). Each submission that begins a chain or transaction gets a
 * task slot, as long as slots are free; the remaining members of that chain
 * or transaction are skipped over. The final submission visited is stored in
 * exc->last_sqe so a later call can continue from there.
 *
 * @param r RTIO context
 * @param exc Concurrent executor
 */
static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	struct rtio_sqe *tail = exc->last_sqe;
	struct rtio_sqe *cursor;

	/* Continue after the remembered tail, or start from the queue head */
	if (tail != NULL) {
		cursor = rtio_spsc_next(r->sq, tail);
	} else {
		cursor = rtio_spsc_peek(r->sq);
	}

	LOG_DBG("starting at sqe %p, last %p", cursor, exc->last_sqe);

	for (; cursor != NULL && conex_task_free(exc); cursor = rtio_spsc_next(r->sq, cursor)) {
		uint16_t task_idx = conex_task_next(exc);

		LOG_DBG("preparing task %d, sqe %p", task_idx, cursor);

		/* New tasks start out suspended, conex_resume() kicks them off */
		exc->task_cur[task_idx].sqe = cursor;
		exc->task_cur[task_idx].r = r;
		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;

		/* Advance to the entry that ends this chain or transaction */
		while (cursor->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) {
			cursor = rtio_spsc_next(r->sq, cursor);
		}

		/* The loop increment then steps past that terminating entry */
		tail = cursor;
	}

	/* Remember where we stopped so the next call resumes from here */
	exc->last_sqe = tail;
}
/**
 * @brief Resume tasks that are suspended
 *
 * All tasks begin as suspended tasks. This kicks them off to the submissions
 * associated iodev, walking the outstanding tasks from oldest (task_out) to
 * newest (task_in).
 *
 * The walk ends when the task id reaches task_in rather than while it is
 * numerically below task_in. The counters are free-running and wrap, so a
 * "<" comparison skips every task once task_in has wrapped around and
 * task_out has not, leaving those tasks suspended forever.
 *
 * NOTE(review): assumes task_in/task_out are 16-bit unsigned counters like
 * the local task id - confirm against the struct declaration.
 *
 * @param r RTIO context
 * @param exc Concurrent executor
 */
static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	/* In order resume tasks */
	for (uint16_t task_id = exc->task_out; task_id != exc->task_in; task_id++) {
		uint16_t task_idx = task_id & exc->task_mask;

		if (exc->task_status[task_idx] & CONEX_TASK_SUSPENDED) {
			LOG_DBG("resuming suspended task %d", task_id);
			/* Clear the flag first so the task is submitted only once */
			exc->task_status[task_idx] &= ~CONEX_TASK_SUSPENDED;
			rtio_iodev_submit(&exc->task_cur[task_idx]);
		}
	}
}
/**
 * @brief Submit submissions to concurrent executor
 *
 * Under the executor lock, turns pending submissions into suspended tasks
 * and then hands every suspended task to its iodev.
 *
 * @param r RTIO context
 *
 * @retval 0 Always succeeds
 */
int rtio_concurrent_submit(struct rtio *r)
{
	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
	k_spinlock_key_t key = k_spin_lock(&exc->lock);

	/* New tasks are created suspended ... */
	conex_prepare(r, exc);

	/* ... and are started here */
	conex_resume(r, exc);

	k_spin_unlock(&exc->lock, key);

	return 0;
}
/**
 * @brief Callback from an iodev describing success
 *
 * If the completed submission is chained, the task moves on to the next
 * submission and is handed to the iodev again; otherwise the task is marked
 * complete. A completion is then reported using the userdata of the
 * submission that ends the current transaction (or of the completed
 * submission itself when it is not part of a transaction). Finally finished
 * tasks are swept and any pending submissions are prepared and started.
 *
 * @param iodev_sqe Task entry of the submission that completed
 * @param result Result code reported in the completion
 */
void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	struct rtio *r = iodev_sqe->r;
	const struct rtio_sqe *sqe = iodev_sqe->sqe;
	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
	const struct rtio_sqe *next_sqe;
	void *userdata;
	k_spinlock_key_t key;

	/* Interrupt may occur in spsc_acquire, breaking the contract
	 * so spin around it effectively preventing another interrupt on
	 * this core, and another core trying to concurrently work in here.
	 */
	key = k_spin_lock(&exc->lock);

	LOG_DBG("completed sqe %p", sqe);

	/* Determine the task id by memory offset O(1) */
	uint16_t task_id = conex_task_id(exc, iodev_sqe);

	if (sqe->flags & RTIO_SQE_CHAINED) {
		/* More work in this chain, keep the task and start the next sqe */
		next_sqe = rtio_spsc_next(r->sq, sqe);
		exc->task_cur[task_id].sqe = next_sqe;
		rtio_iodev_submit(&exc->task_cur[task_id]);
	} else {
		exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
	}

	/* The completion carries the userdata of the last sqe in a transaction */
	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;

	while (transaction) {
		sqe = rtio_spsc_next(r->sq, sqe);
		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
	}

	/* Read userdata before sweeping: conex_sweep() releases completed
	 * submissions back to the submission queue, after which the sqe
	 * must no longer be dereferenced.
	 */
	userdata = sqe->userdata;

	conex_sweep(r, exc);
	rtio_cqe_submit(r, result, userdata);
	conex_prepare(r, exc);
	conex_resume(r, exc);

	k_spin_unlock(&exc->lock, key);
}
/**
 * @brief Callback from an iodev describing error
 *
 * Reports a completion for the failed submission (unless it is flagged as
 * transactional) and then for every remaining submission linked to it by a
 * chain or transaction flag: entries flagged as transactional are reported
 * with -ECANCELED, all others with @p result. The task is then marked
 * complete, finished tasks are swept, and pending submissions are prepared
 * and started.
 *
 * @param iodev_sqe Task entry of the submission that failed
 * @param result Error code reported in the completions
 */
void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	struct rtio *r = iodev_sqe->r;
	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
	const struct rtio_sqe *sqe = iodev_sqe->sqe;
	void *userdata = sqe->userdata;
	bool chained = sqe->flags & RTIO_SQE_CHAINED;
	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
	uint16_t task_id = conex_task_id(exc, iodev_sqe);
	k_spinlock_key_t key;

	/* Another interrupt (and sqe complete) may occur in spsc_acquire,
	 * breaking the contract so spin around it effectively preventing another
	 * interrupt on this core, and another core trying to concurrently work
	 * in here.
	 */
	key = k_spin_lock(&exc->lock);

	if (!transaction) {
		rtio_cqe_submit(r, result, userdata);
	}

	/* Keep walking while the previous sqe was chained or transactional */
	while (chained || transaction) {
		sqe = rtio_spsc_next(r->sq, sqe);
		chained = sqe->flags & RTIO_SQE_CHAINED;
		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
		userdata = sqe->userdata;

		rtio_cqe_submit(r, transaction ? -ECANCELED : result, userdata);
	}

	/* The whole task is finished, let the sweep reclaim it */
	exc->task_status[task_id] |= CONEX_TASK_COMPLETE;

	conex_sweep(r, exc);
	conex_prepare(r, exc);
	conex_resume(r, exc);

	k_spin_unlock(&exc->lock, key);
}