rtio: Add support for multishot reads

- Introduce multishot reads which remain on the SQ until canceled

Signed-off-by: Yuval Peress <peress@google.com>
Authored by Yuval Peress on 2023-05-03 22:44:56 -06:00; committed by Anas Nashif
Parent: 2c30920b40
Commit: 7153157f88
4 changed files with 147 additions and 21 deletions


@@ -130,6 +130,14 @@ extern "C" {
*/
#define RTIO_SQE_CANCELED BIT(3)
/**
* @brief The SQE should continue producing CQEs until canceled
*
* This flag must be used along with :c:macro:`RTIO_SQE_MEMPOOL_BUFFER` and signals that when a
* read is complete, the SQE should be placed back in the queue until canceled.
*/
#define RTIO_SQE_MULTISHOT BIT(4)
/**
* @}
*/
@@ -269,22 +277,6 @@ struct rtio_cqe {
uint32_t flags; /**< Flags associated with the operation */
};
/**
* Internal state of the mempool sqe map entry.
*/
enum rtio_mempool_entry_state {
/** The SQE has no mempool buffer allocated */
RTIO_MEMPOOL_ENTRY_STATE_FREE = 0,
/** The SQE has an active mempool buffer allocated */
RTIO_MEMPOOL_ENTRY_STATE_ALLOCATED,
/** The SQE has a mempool buffer allocated that is currently owned by a CQE */
RTIO_MEMPOOL_ENTRY_STATE_ZOMBIE,
RTIO_MEMPOOL_ENTRY_STATE_COUNT,
};
/* Check that we can always fit the state in 2 bits */
BUILD_ASSERT(RTIO_MEMPOOL_ENTRY_STATE_COUNT < 4);
struct rtio_sqe_pool {
struct rtio_mpsc free_q;
const uint16_t pool_size;
@@ -493,6 +485,14 @@ static inline void rtio_sqe_prep_read_with_pool(struct rtio_sqe *sqe,
sqe->flags = RTIO_SQE_MEMPOOL_BUFFER;
}
/**
 * @brief Prepare a multishot read op submission, which stays queued until canceled
 */
static inline void rtio_sqe_prep_read_multishot(struct rtio_sqe *sqe,
const struct rtio_iodev *iodev, int8_t prio,
void *userdata)
{
rtio_sqe_prep_read_with_pool(sqe, iodev, prio, userdata);
sqe->flags |= RTIO_SQE_MULTISHOT;
}
/**
* @brief Prepare a write op submission
*/
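
For orientation, a minimal usage sketch of the API added above (not part of this diff): my_iodev, ctx, and my_userdata are placeholder names, and ctx is assumed to be defined with a mempool (e.g. via RTIO_DEFINE_WITH_MEMPOOL), since multishot reads require RTIO_SQE_MEMPOOL_BUFFER.

struct rtio_sqe sqe;
struct rtio_sqe *handle;

/* Prepare a single multishot read; it keeps producing CQEs until canceled */
rtio_sqe_prep_read_multishot(&sqe, &my_iodev, RTIO_PRIO_NORM, my_userdata);

/* Copy the SQE into the context, keeping a handle for later cancellation */
rtio_sqe_copy_in_get_handles(&ctx, &sqe, &handle, 1);
rtio_submit(&ctx, 0);

/* ... consume CQEs, releasing each mempool buffer ... */

rtio_sqe_cancel(handle); /* stop requeueing; the executor frees the SQE */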


@@ -98,8 +98,38 @@ void rtio_executor_submit(struct rtio *r)
}
}
/**
* @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
*
* @param[in] r RTIO context
* @param[in] curr Current IODev SQE that is being completed
* @param[in] is_canceled Whether or not the SQE is canceled
*/
static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_iodev_sqe *curr,
bool is_canceled)
{
/* Reset the mempool if needed */
if (curr->sqe.op == RTIO_OP_RX && FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, curr->sqe.flags)) {
if (is_canceled) {
/* Free the memory first since no CQE will be generated */
LOG_DBG("Releasing memory @%p size=%u", (void *)curr->sqe.buf,
curr->sqe.buf_len);
rtio_release_buffer(r, curr->sqe.buf, curr->sqe.buf_len);
}
/* Reset the buffer info so the next request can get a new one */
curr->sqe.buf = NULL;
curr->sqe.buf_len = 0;
}
if (!is_canceled) {
/* Request was not canceled, put the SQE back in the queue */
rtio_mpsc_push(&r->sq, &curr->q);
rtio_executor_submit(r);
}
}
static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
{
const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
struct rtio *r = iodev_sqe->r;
struct rtio_iodev_sqe *curr = iodev_sqe, *next;
@@ -112,10 +142,13 @@ static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
next = rtio_iodev_sqe_next(curr);
/* SQE is no longer needed, release it */
rtio_sqe_pool_free(r->sqe_pool, curr);
if (is_multishot) {
rtio_executor_handle_multishot(r, curr, is_canceled);
}
if (!is_multishot || is_canceled) {
/* SQE is no longer needed, release it */
rtio_sqe_pool_free(r->sqe_pool, curr);
}
if (!is_canceled) {
/* Request was not canceled, generate a CQE */
rtio_cqe_submit(r, result, userdata, cqe_flags);
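
The consumer-side counterpart, as a sketch (not part of this diff; ctx is again a placeholder for a mempool-backed context): each non-canceled completion produces a CQE that owns a fresh mempool buffer, which must be retrieved and released, while the SQE itself is requeued by rtio_executor_handle_multishot() rather than freed.

struct rtio_cqe cqe;
uint8_t *buf = NULL;
uint32_t buf_len = 0;

while (rtio_cqe_copy_out(&ctx, &cqe, 1, K_FOREVER) == 1) {
	/* Each multishot completion carries its own mempool buffer */
	rtio_cqe_get_mempool_buffer(&ctx, &cqe, &buf, &buf_len);
	/* ... process buf[0..buf_len) ... */
	rtio_release_buffer(&ctx, buf, buf_len);
}

On cancellation no CQE is generated and the buffer is released inside the executor, so a consumer only needs a short drain for completions already in flight (as the test below does with K_MSEC(15)).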


@@ -58,10 +58,20 @@ out:
static void rtio_iodev_timer_fn(struct k_timer *tm)
{
static struct rtio_iodev_sqe *last_iodev_sqe;
static int consecutive_sqes;
struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer);
struct rtio_iodev_sqe *iodev_sqe = data->txn_curr;
struct rtio_iodev *iodev = (struct rtio_iodev *)data->txn_head->sqe.iodev;
/* Track how many times in a row the same SQE is handed to this iodev; the
 * completion below reports that count so the multishot test can verify the
 * SQE is being reused rather than reallocated.
 */
if (iodev_sqe == last_iodev_sqe) {
consecutive_sqes++;
} else {
consecutive_sqes = 0;
}
last_iodev_sqe = iodev_sqe;
if (iodev_sqe->sqe.op == RTIO_OP_RX) {
uint8_t *buf;
uint32_t buf_len;
@@ -93,7 +103,11 @@ static void rtio_iodev_timer_fn(struct k_timer *tm)
data->txn_head = NULL;
data->txn_curr = NULL;
rtio_iodev_test_next(iodev);
rtio_iodev_sqe_ok(iodev_sqe, 0);
/* First delivery of this SQE completes with 0; repeated back-to-back
 * deliveries report the consecutive count as the result.
 */
if (consecutive_sqes == 0) {
rtio_iodev_sqe_ok(iodev_sqe, 0);
} else {
rtio_iodev_sqe_err(iodev_sqe, consecutive_sqes);
}
}
static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe)


@@ -503,6 +503,85 @@ ZTEST(rtio_api, test_rtio_transaction_cancel)
#endif
}
static inline void test_rtio_simple_multishot_(struct rtio *r, int idx)
{
int res;
struct rtio_sqe sqe;
struct rtio_cqe cqe;
struct rtio_sqe *handle;
for (int i = 0; i < MEM_BLK_SIZE; ++i) {
mempool_data[i] = i + idx;
}
TC_PRINT("setting up single mempool read\n");
rtio_sqe_prep_read_multishot(&sqe, (struct rtio_iodev *)&iodev_test_simple, 0,
mempool_data);
TC_PRINT("Calling rtio_sqe_copy_in()\n");
res = rtio_sqe_copy_in_get_handles(r, &sqe, &handle, 1);
zassert_ok(res);
TC_PRINT("submit with wait, handle=%p\n", handle);
res = rtio_submit(r, 1);
zassert_ok(res, "Should return ok from rtio_execute");
TC_PRINT("Calling rtio_cqe_copy_out\n");
zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER));
zassert_ok(cqe.result, "Result should be ok but got %d", cqe.result);
zassert_equal_ptr(cqe.userdata, mempool_data, "Expected userdata back");
uint8_t *buffer = NULL;
uint32_t buffer_len = 0;
TC_PRINT("Calling rtio_cqe_get_mempool_buffer\n");
zassert_ok(rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len));
zassert_not_null(buffer, "Expected an allocated mempool buffer");
zassert_equal(buffer_len, MEM_BLK_SIZE);
zassert_mem_equal(buffer, mempool_data, MEM_BLK_SIZE, "Data expected to be the same");
TC_PRINT("Calling rtio_release_buffer\n");
rtio_release_buffer(r, buffer, buffer_len);
TC_PRINT("Waiting for next cqe\n");
zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER));
zassert_equal(1, cqe.result, "Second result should be 1 (same SQE reused) but got %d", cqe.result);
zassert_equal_ptr(cqe.userdata, mempool_data, "Expected userdata back");
rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len);
rtio_release_buffer(r, buffer, buffer_len);
TC_PRINT("Canceling %p\n", handle);
rtio_sqe_cancel(handle);
/* Flush any CQEs that were already in flight when the cancellation landed */
while (rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)) != 0) {
rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len);
rtio_release_buffer(r, buffer, buffer_len);
}
}
static void rtio_simple_multishot_test(void *a, void *b, void *c)
{
ARG_UNUSED(a);
ARG_UNUSED(b);
ARG_UNUSED(c);
for (int i = 0; i < TEST_REPEATS; i++) {
test_rtio_simple_multishot_(&r_simple, i);
}
}
ZTEST(rtio_api, test_rtio_multishot)
{
rtio_iodev_test_init(&iodev_test_simple);
#ifdef CONFIG_USERSPACE
k_mem_domain_add_thread(&rtio_domain, k_current_get());
rtio_access_grant(&r_simple, k_current_get());
k_object_access_grant(&iodev_test_simple, k_current_get());
k_thread_user_mode_enter(rtio_simple_multishot_test, NULL, NULL, NULL);
#else
rtio_simple_multishot_test(NULL, NULL, NULL);
#endif
}
ZTEST(rtio_api, test_rtio_syscalls)
{