rtio: Add lock free MPSC queue for iodevs and more
Adds a lock-free/wait-free MPSC queue to the rtio subsystem. While the SPSC ring queue is fast and cache friendly, it doesn't work for all scenarios, particularly the case where multiple rtio contexts are attempting to work with a single iodev. An MPSC queue works perfectly in this scenario. Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
This commit is contained in:
parent
22ff4d6102
commit
e9f6eef791
140
include/zephyr/rtio/rtio_mpsc.h
Normal file
140
include/zephyr/rtio/rtio_mpsc.h
Normal file
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Copyright (c) 2010-2011 Dmitry Vyukov
|
||||
* Copyright (c) 2022 Intel Corporation
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
|
||||
#ifndef ZEPHYR_RTIO_MPSC_H_
|
||||
#define ZEPHYR_RTIO_MPSC_H_
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <zephyr/sys/atomic.h>
|
||||
|
||||
/**
|
||||
* @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
|
||||
* @defgroup rtio_mpsc RTIO MPSC API
|
||||
* @ingroup rtio
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file rtio_mpsc.h
|
||||
*
|
||||
* @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
|
||||
* a singly linked list. Ordering is First-In-First-Out.
|
||||
*
|
||||
* Based on the well known and widely used wait-free MPSC queue described by
|
||||
* Dmitry Vyukov with some slight changes to account for needs of an
|
||||
* RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
|
||||
* loop or lock is needed.
|
||||
*
|
||||
* An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
|
||||
*
|
||||
* @warning MPSC is *not* safe to consume in multiple execution contexts.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Queue member
|
||||
*/
|
||||
struct rtio_mpsc_node {
|
||||
atomic_ptr_t next;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief MPSC Queue
|
||||
*/
|
||||
struct rtio_mpsc {
|
||||
atomic_ptr_t head;
|
||||
struct rtio_mpsc_node *tail;
|
||||
struct rtio_mpsc_node stub;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Initialize queue
|
||||
*
|
||||
* @param q Queue to initialize or reset
|
||||
*/
|
||||
static inline void rtio_mpsc_init(struct rtio_mpsc *q)
|
||||
{
|
||||
atomic_ptr_set(&q->head, &q->stub);
|
||||
q->tail = &q->stub;
|
||||
atomic_ptr_set(&q->stub.next, NULL);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Push a node
|
||||
*
|
||||
* @param q Queue to push the node to
|
||||
* @param n Node to push into the queue
|
||||
*/
|
||||
static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
|
||||
{
|
||||
struct rtio_mpsc_node *prev;
|
||||
int key;
|
||||
|
||||
atomic_ptr_set(&n->next, NULL);
|
||||
|
||||
key = arch_irq_lock();
|
||||
prev = atomic_ptr_set(&q->head, n);
|
||||
atomic_ptr_set(&prev->next, n);
|
||||
arch_irq_unlock(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Pop a node off of the list
|
||||
*
|
||||
* @retval NULL When no node is available
|
||||
* @retval node When node is available
|
||||
*/
|
||||
static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
|
||||
{
|
||||
struct rtio_mpsc_node *head;
|
||||
struct rtio_mpsc_node *tail = q->tail;
|
||||
struct rtio_mpsc_node *next = atomic_ptr_get(&tail->next);
|
||||
|
||||
/* Skip over the stub/sentinel */
|
||||
if (tail == &q->stub) {
|
||||
if (next == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
q->tail = next;
|
||||
tail = next;
|
||||
next = atomic_ptr_get(&next->next);
|
||||
}
|
||||
|
||||
/* If next is non-NULL then a valid node is found, return it */
|
||||
if (next != NULL) {
|
||||
q->tail = next;
|
||||
return tail;
|
||||
}
|
||||
|
||||
head = atomic_ptr_get(&q->head);
|
||||
|
||||
/* If next is NULL, and the tail != HEAD then the queue has pending
|
||||
* updates that can't yet be accessed.
|
||||
*/
|
||||
if (tail != head) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rtio_mpsc_push(q, &q->stub);
|
||||
|
||||
next = atomic_ptr_get(&tail->next);
|
||||
|
||||
if (next != NULL) {
|
||||
q->tail = next;
|
||||
return tail;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
|
||||
#endif /* ZEPHYR_RTIO_MPSC_H_ */
|
|
@ -11,7 +11,10 @@
|
|||
#include <zephyr/sys/kobject.h>
|
||||
#include <zephyr/sys/libc-hooks.h>
|
||||
#include <zephyr/app_memory/mem_domain.h>
|
||||
#include <zephyr/sys/util_loops.h>
|
||||
#include <zephyr/sys/time_units.h>
|
||||
#include <zephyr/rtio/rtio_spsc.h>
|
||||
#include <zephyr/rtio/rtio_mpsc.h>
|
||||
#include <zephyr/rtio/rtio.h>
|
||||
#include <zephyr/rtio/rtio_executor_simple.h>
|
||||
#include <zephyr/rtio/rtio_executor_concurrent.h>
|
||||
|
@ -234,6 +237,171 @@ ZTEST(rtio_spsc, test_spsc_threaded)
|
|||
k_thread_join(tinfo[0].tid, K_FOREVER);
|
||||
}
|
||||
|
||||
static struct rtio_mpsc push_pop_q;
|
||||
static struct rtio_mpsc_node push_pop_nodes[2];
|
||||
|
||||
/*
|
||||
* @brief Push and pop one element
|
||||
*
|
||||
* @see rtio_mpsc_push(), rtio_mpsc_pop()
|
||||
*
|
||||
* @ingroup rtio_tests
|
||||
*/
|
||||
ZTEST(rtio_mpsc, test_push_pop)
|
||||
{
|
||||
|
||||
struct rtio_mpsc_node *node, *head, *stub, *next, *tail;
|
||||
|
||||
rtio_mpsc_init(&push_pop_q);
|
||||
|
||||
head = atomic_ptr_get(&push_pop_q.head);
|
||||
tail = push_pop_q.tail;
|
||||
stub = &push_pop_q.stub;
|
||||
next = atomic_ptr_get(&stub->next);
|
||||
|
||||
zassert_equal(head, stub, "Head should point at stub");
|
||||
zassert_equal(tail, stub, "Tail should point at stub");
|
||||
zassert_is_null(next, "Next should be null");
|
||||
|
||||
node = rtio_mpsc_pop(&push_pop_q);
|
||||
zassert_is_null(node, "Pop on empty queue should return null");
|
||||
|
||||
rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]);
|
||||
|
||||
head = atomic_ptr_get(&push_pop_q.head);
|
||||
|
||||
zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node");
|
||||
next = atomic_ptr_get(&push_pop_nodes[0].next);
|
||||
zassert_is_null(next, NULL, "push_pop_node next should point at null");
|
||||
next = atomic_ptr_get(&push_pop_q.stub.next);
|
||||
zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node");
|
||||
tail = push_pop_q.tail;
|
||||
stub = &push_pop_q.stub;
|
||||
zassert_equal(tail, stub, "Tail should point at stub");
|
||||
|
||||
node = rtio_mpsc_pop(&push_pop_q);
|
||||
stub = &push_pop_q.stub;
|
||||
|
||||
zassert_not_equal(node, stub, "Pop should not return stub");
|
||||
zassert_not_null(node, "Pop should not return null");
|
||||
zassert_equal(node, &push_pop_nodes[0],
|
||||
"Pop should return push_pop_node %p, instead was %p",
|
||||
&push_pop_nodes[0], node);
|
||||
|
||||
node = rtio_mpsc_pop(&push_pop_q);
|
||||
zassert_is_null(node, "Pop on empty queue should return null");
|
||||
}
|
||||
|
||||
#define MPSC_FREEQ_SZ 8
|
||||
#define MPSC_ITERATIONS 100000
|
||||
#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE)
|
||||
#define MPSC_THREADS_NUM 4
|
||||
|
||||
static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM];
|
||||
static struct k_thread mpsc_thread[MPSC_THREADS_NUM];
|
||||
static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE);
|
||||
|
||||
struct mpsc_node {
|
||||
uint32_t id;
|
||||
struct rtio_mpsc_node n;
|
||||
};
|
||||
|
||||
|
||||
RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ);
|
||||
|
||||
#define SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz)
|
||||
|
||||
struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = {
|
||||
LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ)
|
||||
};
|
||||
|
||||
static struct rtio_mpsc mpsc_q;
|
||||
|
||||
static void mpsc_consumer(void *p1, void *p2, void *p3)
|
||||
{
|
||||
struct rtio_mpsc_node *n;
|
||||
struct mpsc_node *nn;
|
||||
uint32_t start_t = k_cycle_get_32();
|
||||
|
||||
for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) {
|
||||
do {
|
||||
n = rtio_mpsc_pop(&mpsc_q);
|
||||
} while (n == NULL);
|
||||
|
||||
zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub");
|
||||
|
||||
nn = CONTAINER_OF(n, struct mpsc_node, n);
|
||||
|
||||
rtio_spsc_acquire(&node_q[nn->id]);
|
||||
rtio_spsc_produce(&node_q[nn->id]);
|
||||
}
|
||||
|
||||
uint32_t diff = k_cycle_get_32() - start_t;
|
||||
|
||||
uint32_t us = k_cyc_to_us_floor32(diff);
|
||||
|
||||
TC_PRINT("%d mpsc consumes and spsc produces in %u cycles (%u microseconds)\n",
|
||||
(MPSC_ITERATIONS)*(MPSC_THREADS_NUM-1), diff, us);
|
||||
}
|
||||
|
||||
static void mpsc_producer(void *p1, void *p2, void *p3)
|
||||
{
|
||||
struct mpsc_node *n;
|
||||
uint32_t id = (uint32_t)(uintptr_t)p1;
|
||||
|
||||
for (int i = 0; i < MPSC_ITERATIONS; i++) {
|
||||
do {
|
||||
n = rtio_spsc_consume(&node_q[id]);
|
||||
} while (n == NULL);
|
||||
|
||||
rtio_spsc_release(&node_q[id]);
|
||||
n->id = id;
|
||||
rtio_mpsc_push(&mpsc_q, &n->n);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Test that the producer and consumer are indeed thread safe
|
||||
*
|
||||
* This can and should be validated on SMP machines where incoherent
|
||||
* memory could cause issues.
|
||||
*/
|
||||
ZTEST(rtio_mpsc, test_mpsc_threaded)
|
||||
{
|
||||
rtio_mpsc_init(&mpsc_q);
|
||||
|
||||
TC_PRINT("setting up mpsc producer free queues\n");
|
||||
/* Setup node free queues */
|
||||
for (int i = 0; i < MPSC_THREADS_NUM; i++) {
|
||||
for (int j = 0; j < MPSC_FREEQ_SZ; j++) {
|
||||
rtio_spsc_acquire(&node_q[i]);
|
||||
}
|
||||
rtio_spsc_produce_all(&node_q[i]);
|
||||
}
|
||||
|
||||
TC_PRINT("starting consumer\n");
|
||||
mpsc_tinfo[0].tid =
|
||||
k_thread_create(&mpsc_thread[0], mpsc_stack[0], STACK_SIZE,
|
||||
(k_thread_entry_t)mpsc_consumer,
|
||||
NULL, NULL, NULL,
|
||||
K_PRIO_PREEMPT(5),
|
||||
K_INHERIT_PERMS, K_NO_WAIT);
|
||||
|
||||
for (int i = 1; i < MPSC_THREADS_NUM; i++) {
|
||||
TC_PRINT("starting producer %i\n", i);
|
||||
mpsc_tinfo[i].tid =
|
||||
k_thread_create(&mpsc_thread[i], mpsc_stack[i], STACK_SIZE,
|
||||
(k_thread_entry_t)mpsc_producer,
|
||||
(void *)(uintptr_t)i, NULL, NULL,
|
||||
K_PRIO_PREEMPT(5),
|
||||
K_INHERIT_PERMS, K_NO_WAIT);
|
||||
}
|
||||
|
||||
for (int i = 0; i < MPSC_THREADS_NUM; i++) {
|
||||
TC_PRINT("joining mpsc thread %d\n", i);
|
||||
k_thread_join(mpsc_tinfo[i].tid, K_FOREVER);
|
||||
}
|
||||
}
|
||||
|
||||
/* Defines the executor object simple_exec_simp and an RTIO context
 * r_simple_simp that is given a pointer to it.
 * NOTE(review): the two trailing 4s are presumably queue sizes — confirm
 * against the RTIO_DEFINE documentation.
 */
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp);
|
||||
RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4);
|
||||
|
@ -505,4 +673,5 @@ ZTEST(rtio_api, test_rtio_syscalls)
|
|||
|
||||
|
||||
/* Test suite registrations; rtio_mpsc is the suite named by the
 * ZTEST(rtio_mpsc, ...) MPSC queue tests in this file.
 */
ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL);
|
||||
ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL);
|
||||
ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL);
|
||||
|
|
Loading…
Reference in a new issue