zbus: add priority boost feature

Replace mutexes with semaphores to protect the channels in conjunction with
a priority boost algorithm based on the observers' priority.

Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
Rodrigo Peixoto 2023-09-05 12:48:55 -03:00 committed by Carles Cufí
parent c25ac487af
commit c992707251
7 changed files with 396 additions and 101 deletions

View file

@ -216,6 +216,7 @@ endif()
if(CONFIG_ZBUS) if(CONFIG_ZBUS)
zephyr_iterable_section(NAME zbus_channel KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) zephyr_iterable_section(NAME zbus_channel KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
zephyr_iterable_section(NAME zbus_observer KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
zephyr_iterable_section(NAME zbus_channel_observation KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) zephyr_iterable_section(NAME zbus_channel_observation KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
endif() endif()

View file

@ -133,7 +133,6 @@
#endif /* CONFIG_SENSING */ #endif /* CONFIG_SENSING */
#if defined(CONFIG_ZBUS) #if defined(CONFIG_ZBUS)
ITERABLE_SECTION_RAM(zbus_observer, 4)
ITERABLE_SECTION_RAM(zbus_channel_observation_mask, 1) ITERABLE_SECTION_RAM(zbus_channel_observation_mask, 1)
#endif /* CONFIG_ZBUS */ #endif /* CONFIG_ZBUS */

View file

@ -36,6 +36,7 @@
#if defined(CONFIG_ZBUS) #if defined(CONFIG_ZBUS)
ITERABLE_SECTION_ROM(zbus_channel, 4) ITERABLE_SECTION_ROM(zbus_channel, 4)
ITERABLE_SECTION_ROM(zbus_observer, 4)
ITERABLE_SECTION_ROM(zbus_channel_observation, 4) ITERABLE_SECTION_ROM(zbus_channel_observation, 4)
#endif /* CONFIG_ZBUS */ #endif /* CONFIG_ZBUS */

View file

@ -38,10 +38,17 @@ struct zbus_channel_data {
*/ */
int16_t observers_end_idx; int16_t observers_end_idx;
/** Access control mutex. Points to the mutex used to avoid race conditions /** Access control semaphore. Points to the semaphore used to avoid race conditions
* for accessing the channel. * for accessing the channel.
*/ */
struct k_mutex mutex; struct k_sem sem;
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/** Highest observer priority. Indicates the priority that the VDED will use to boost the
* notification process avoiding preemptions.
*/
int highest_observer_priority;
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)
/** Channel observer list. Represents the channel's observers list, it can be empty /** Channel observer list. Represents the channel's observers list, it can be empty
@ -96,6 +103,16 @@ enum __packed zbus_observer_type {
ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
}; };
struct zbus_observer_data {
/** Enabled flag. Indicates if observer is receiving notification. */
bool enabled;
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/** Subscriber attached thread priority. */
int priority;
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
};
/** /**
* @brief Type used to represent an observer. * @brief Type used to represent an observer.
* *
@ -119,8 +136,8 @@ struct zbus_observer {
/** Type indication. */ /** Type indication. */
enum zbus_observer_type type; enum zbus_observer_type type;
/** Enabled flag. Indicates if observer is receiving notification. */ /** Mutable observer data struct. */
bool enabled; struct zbus_observer_data *const data;
union { union {
/** Observer message queue. It turns the observer into a subscriber. */ /** Observer message queue. It turns the observer into a subscriber. */
@ -154,6 +171,8 @@ struct zbus_channel_observation {
#define _ZBUS_CPP_EXTERN #define _ZBUS_CPP_EXTERN
#endif /* __cplusplus */ #endif /* __cplusplus */
#define ZBUS_MIN_THREAD_PRIORITY (CONFIG_NUM_PREEMPT_PRIORITIES - 1)
#if defined(CONFIG_ZBUS_ASSERT_MOCK) #if defined(CONFIG_ZBUS_ASSERT_MOCK)
#define _ZBUS_ASSERT(_cond, _fmt, ...) \ #define _ZBUS_ASSERT(_cond, _fmt, ...) \
do { \ do { \
@ -233,6 +252,7 @@ struct zbus_channel_observation {
/** @endcond */ /** @endcond */
/* clang-format off */
/** /**
* @brief Add a static channel observervation. * @brief Add a static channel observervation.
* *
@ -247,10 +267,14 @@ struct zbus_channel_observation {
#define ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio) \ #define ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio) \
const STRUCT_SECTION_ITERABLE(zbus_channel_observation, \ const STRUCT_SECTION_ITERABLE(zbus_channel_observation, \
_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \ _CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \
.chan = &_chan, .obs = &_obs}; \ .chan = &_chan, \
.obs = &_obs, \
}; \
STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \ STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \
_CONCAT(_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs)), \ _CONCAT(_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs)), \
_mask)) = {.enabled = _masked} _mask)) = {.enabled = _masked}
/* clang-format on */
/** /**
* @brief Add a static channel observervation. * @brief Add a static channel observervation.
* *
@ -290,6 +314,7 @@ struct zbus_channel_observation {
*/ */
#define ZBUS_OBSERVERS(...) __VA_ARGS__ #define ZBUS_OBSERVERS(...) __VA_ARGS__
/* clang-format off */
/** /**
* @brief Zbus channel definition. * @brief Zbus channel definition.
* *
@ -308,17 +333,26 @@ struct zbus_channel_observation {
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \ #define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \
static _type _CONCAT(_zbus_message_, _name) = _init_val; \ static _type _CONCAT(_zbus_message_, _name) = _init_val; \
static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \ static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \
.observers_start_idx = -1, .observers_end_idx = -1}; \ .observers_start_idx = -1, \
.observers_end_idx = -1, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \ static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \
_ZBUS_CPP_EXTERN const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \ _ZBUS_CPP_EXTERN const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \
ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \ ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \
.message = &_CONCAT(_zbus_message_, _name), \ .message = &_CONCAT(_zbus_message_, _name), \
.message_size = sizeof(_type), .user_data = _user_data, .validator = (_validator), \ .message_size = sizeof(_type), \
.data = &_CONCAT(_zbus_chan_data_, _name)}; \ .user_data = _user_data, \
.validator = _validator, \
.data = &_CONCAT(_zbus_chan_data_, _name), \
}; \
/* Extern declaration of observers */ \ /* Extern declaration of observers */ \
ZBUS_OBS_DECLARE(_observers); \ ZBUS_OBS_DECLARE(_observers); \
/* Create all channel observations from observers list */ \ /* Create all channel observations from observers list */ \
FOR_EACH_FIXED_ARG_NONEMPTY_TERM(_ZBUS_CHAN_OBSERVATION, (;), _name, _observers) FOR_EACH_FIXED_ARG_NONEMPTY_TERM(_ZBUS_CHAN_OBSERVATION, (;), _name, _observers)
/* clang-format on */
/** /**
* @brief Initialize a message. * @brief Initialize a message.
@ -334,6 +368,7 @@ struct zbus_channel_observation {
_val, ##__VA_ARGS__ \ _val, ##__VA_ARGS__ \
} }
/* clang-format off */
/** /**
* @brief Define and initialize a subscriber. * @brief Define and initialize a subscriber.
* *
@ -346,12 +381,24 @@ struct zbus_channel_observation {
* @param[in] _enable The subscriber initial enable state. * @param[in] _enable The subscriber initial enable state.
*/ */
#define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \ #define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \
K_MSGQ_DEFINE(_zbus_observer_queue_##_name, sizeof(const struct zbus_channel *), \ K_MSGQ_DEFINE(_zbus_observer_queue_##_name, \
_queue_size, sizeof(const struct zbus_channel *)); \ sizeof(const struct zbus_channel *), \
_queue_size, sizeof(const struct zbus_channel *) \
); \
static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \
.enabled = _enable, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \ .type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \
.enabled = _enable, .queue = &_zbus_observer_queue_##_name} .data = &_CONCAT(_zbus_obs_data_, _name), \
.queue = &_zbus_observer_queue_##_name, \
}
/* clang-format on */
/** /**
* @brief Define and initialize a subscriber. * @brief Define and initialize a subscriber.
* *
@ -366,6 +413,7 @@ struct zbus_channel_observation {
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \ #define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \
ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, true) ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, true)
/* clang-format off */
/** /**
* @brief Define and initialize a listener. * @brief Define and initialize a listener.
* *
@ -378,10 +426,20 @@ struct zbus_channel_observation {
* @param[in] _enable The listener initial enable state. * @param[in] _enable The listener initial enable state.
*/ */
#define ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable) \ #define ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable) \
STRUCT_SECTION_ITERABLE(zbus_observer, \ static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ .enabled = _enable, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_LISTENER_TYPE, \ .type = ZBUS_OBSERVER_LISTENER_TYPE, \
.enabled = _enable, .callback = (_cb)} .data = &_CONCAT(_zbus_obs_data_, _name), \
.callback = (_cb) \
}
/* clang-format on */
/** /**
* @brief Define and initialize a listener. * @brief Define and initialize a listener.
* *
@ -394,6 +452,7 @@ struct zbus_channel_observation {
*/ */
#define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true) #define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true)
/* clang-format off */
/** /**
* @brief Define and initialize a message subscriber. * @brief Define and initialize a message subscriber.
* *
@ -406,12 +465,19 @@ struct zbus_channel_observation {
*/ */
#define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \ #define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \
static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \ static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \
static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \
.enabled = _enable, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \ .type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \
.enabled = _enable, \ .data = &_CONCAT(_zbus_obs_data_, _name), \
.message_fifo = &_zbus_observer_fifo_##_name, \ .message_fifo = &_zbus_observer_fifo_##_name, \
} }
/* clang-format on */
/** /**
* @brief Define and initialize an enabled message subscriber. * @brief Define and initialize an enabled message subscriber.
@ -501,8 +567,6 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout);
* @param chan The channel's reference. * @param chan The channel's reference.
* *
* @retval 0 Channel finished. * @retval 0 Channel finished.
* @retval -EPERM The channel was claimed by other thread.
* @retval -EINVAL The channel's mutex is not locked.
* @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The
* function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled.
*/ */
@ -519,9 +583,9 @@ int zbus_chan_finish(const struct zbus_channel *chan);
* or one of the special values K_NO_WAIT and K_FOREVER. * or one of the special values K_NO_WAIT and K_FOREVER.
* *
* @retval 0 Channel notified. * @retval 0 Channel notified.
* @retval -EPERM The current thread does not own the channel. * @retval -EBUSY The channel's semaphore returned without waiting.
* @retval -EBUSY The channel's mutex returned without waiting. * @retval -EAGAIN Timeout to take the channel's semaphore.
* @retval -EAGAIN Timeout to acquiring the channel's mutex. * @retval -ENOMEM There is not more buffer on the messgage buffers pool.
* @retval -EFAULT A parameter is incorrect, the notification could not be sent to one or more * @retval -EFAULT A parameter is incorrect, the notification could not be sent to one or more
* observer, or the function context is invalid (inside an ISR). The function only returns this * observer, or the function context is invalid (inside an ISR). The function only returns this
* value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. * value when the CONFIG_ZBUS_ASSERT_MOCK is enabled.
@ -553,7 +617,7 @@ static inline const char *zbus_chan_name(const struct zbus_channel *chan)
* *
* This routine returns the reference of a channel message. * This routine returns the reference of a channel message.
* *
* @warning This function must only be used directly for acquired (locked by mutex) channels. This * @warning This function must only be used directly for already locked channels. This
* can be done inside a listener for the receiving channel or after claim a channel. * can be done inside a listener for the receiving channel or after claim a channel.
* *
* @param chan The channel's reference. * @param chan The channel's reference.
@ -574,7 +638,7 @@ static inline void *zbus_chan_msg(const struct zbus_channel *chan)
* inside listeners to access the message directly. In this way zbus prevents the listener of * inside listeners to access the message directly. In this way zbus prevents the listener of
* changing the notifying channel's message during the notification process. * changing the notifying channel's message during the notification process.
* *
* @warning This function must only be used directly for acquired (locked by mutex) channels. This * @warning This function must only be used directly for already locked channels. This
* can be done inside a listener for the receiving channel or after claim a channel. * can be done inside a listener for the receiving channel or after claim a channel.
* *
* @param chan The channel's constant reference. * @param chan The channel's constant reference.
@ -685,14 +749,7 @@ struct zbus_observer_node {
* @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The
* function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled.
*/ */
static inline int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled) int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled);
{
_ZBUS_ASSERT(obs != NULL, "obs is required");
obs->enabled = enabled;
return 0;
}
/** /**
* @brief Get the observer state. * @brief Get the observer state.
@ -709,7 +766,7 @@ static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable)
_ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(enable != NULL, "enable is required"); _ZBUS_ASSERT(enable != NULL, "enable is required");
*enable = obs->enabled; *enable = obs->data->enabled;
return 0; return 0;
} }
@ -766,6 +823,32 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs)
#endif #endif
#if defined(CONFIG_ZBUS_PRIORITY_BOOST) || defined(__DOXYGEN__)
/**
* @brief Set the observer thread priority by attaching it to a thread.
*
* @param[in] obs The observer's reference.
*
* @retval 0 Observer detached from the thread.
* @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The
* function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled.
*/
int zbus_obs_attach_to_thread(const struct zbus_observer *obs);
/**
* @brief Clear the observer thread priority by detaching it from a thread.
*
* @param[in] obs The observer's reference.
*
* @retval 0 Observer detached from the thread.
* @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The
* function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled.
*/
int zbus_obs_detach_from_thread(const struct zbus_observer *obs);
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
/** /**
* @brief Wait for a channel notification. * @brief Wait for a channel notification.
* *

View file

@ -53,6 +53,13 @@ endif # ZBUS_MSG_SUBSCRIBER
config ZBUS_RUNTIME_OBSERVERS config ZBUS_RUNTIME_OBSERVERS
bool "Runtime observers support." bool "Runtime observers support."
config ZBUS_PRIORITY_BOOST
bool "ZBus priority boost algorithm"
default y
help
ZBus implements the Highest Locker Protocol that relies on the observers thread priority
to determine a temporary publisher priority.
config ZBUS_ASSERT_MOCK config ZBUS_ASSERT_MOCK
bool "Zbus assert mock for test purposes." bool "Zbus assert mock for test purposes."
help help

View file

@ -12,6 +12,13 @@
#include <zephyr/zbus/zbus.h> #include <zephyr/zbus/zbus.h>
LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/* Available only when the priority boost is enabled */
static struct k_spinlock _zbus_chan_slock;
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
static struct k_spinlock obs_slock;
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC) #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
@ -69,7 +76,7 @@ int _zbus_init(void)
++(curr->data->observers_end_idx); ++(curr->data->observers_end_idx);
} }
STRUCT_SECTION_FOREACH(zbus_channel, chan) { STRUCT_SECTION_FOREACH(zbus_channel, chan) {
k_mutex_init(&chan->data->mutex); k_sem_init(&chan->data->sem, 1, 1);
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
sys_slist_init(&chan->data->observers); sys_slist_init(&chan->data->observers);
@ -145,7 +152,7 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
const struct zbus_observer *obs = observation->obs; const struct zbus_observer *obs = observation->obs;
if (!obs->enabled || observation_mask->enabled) { if (!obs->data->enabled || observation_mask->enabled) {
continue; continue;
} }
@ -174,7 +181,7 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
const struct zbus_observer *obs = obs_nd->obs; const struct zbus_observer *obs = obs_nd->obs;
if (!obs->enabled) { if (!obs->data->enabled) {
continue; continue;
} }
@ -191,21 +198,171 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
return last_error; return last_error;
} }
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
static inline void chan_update_hop(const struct zbus_channel *chan)
{
struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask;
int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY;
K_SPINLOCK(&_zbus_chan_slock) {
const int limit = chan->data->observers_end_idx;
for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
__ASSERT(observation != NULL, "observation must be not NULL");
const struct zbus_observer *obs = observation->obs;
if (!obs->data->enabled || observation_mask->enabled) {
continue;
}
if (chan_highest_observer_priority > obs->data->priority) {
chan_highest_observer_priority = obs->data->priority;
}
}
chan->data->highest_observer_priority = chan_highest_observer_priority;
}
}
static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
struct zbus_channel_observation *observation;
int count;
STRUCT_SECTION_COUNT(zbus_channel_observation, &count);
for (int16_t i = 0; i < count; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
if (obs != observation->obs) {
continue;
}
chan_update_hop(observation->chan);
}
}
int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
{
_ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR");
_ZBUS_ASSERT(obs != NULL, "obs is required");
int current_thread_priority = k_thread_priority_get(k_current_get());
K_SPINLOCK(&obs_slock) {
if (obs->data->priority != current_thread_priority) {
obs->data->priority = current_thread_priority;
update_all_channels_hop(obs);
}
}
return 0;
}
int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
{
_ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR");
_ZBUS_ASSERT(obs != NULL, "obs is required");
K_SPINLOCK(&obs_slock) {
obs->data->priority = ZBUS_MIN_THREAD_PRIORITY;
update_all_channels_hop(obs);
}
return 0;
}
#else
static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio)
{
bool boosting = false;
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
if (!k_is_in_isr()) {
*prio = k_thread_priority_get(k_current_get());
K_SPINLOCK(&_zbus_chan_slock) {
if (*prio > chan->data->highest_observer_priority) {
int new_prio = chan->data->highest_observer_priority - 1;
new_prio = MAX(new_prio, 0);
/* Elevating priority since the highest_observer_priority is
* greater than the current thread
*/
k_thread_priority_set(k_current_get(), new_prio);
boosting = true;
}
}
}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
int err = k_sem_take(&chan->data->sem, timeout);
if (err) {
/* When the priority boost is disabled, this IF will be optimized out. */
if (boosting) {
/* Restoring thread priority since the semaphore is not available */
k_thread_priority_set(k_current_get(), *prio);
}
return err;
}
return 0;
}
static inline void chan_unlock(const struct zbus_channel *chan, int prio)
{
k_sem_give(&chan->data->sem);
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/* During the unlock phase, with the priority boost enabled, the priority must be
* restored to the original value in case it was elevated
*/
if (prio < ZBUS_MIN_THREAD_PRIORITY) {
k_thread_priority_set(k_current_get(), prio);
}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
}
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{ {
int err; int err;
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required"); _ZBUS_ASSERT(msg != NULL, "msg is required");
if (k_is_in_isr()) {
timeout = K_NO_WAIT;
}
k_timepoint_t end_time = sys_timepoint_calc(timeout); k_timepoint_t end_time = sys_timepoint_calc(timeout);
if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
return -ENOMSG; return -ENOMSG;
} }
err = k_mutex_lock(&chan->data->mutex, timeout); int context_priority = ZBUS_MIN_THREAD_PRIORITY;
err = chan_lock(chan, timeout, &context_priority);
if (err) { if (err) {
return err; return err;
} }
@ -214,56 +371,67 @@ int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t
err = _zbus_vded_exec(chan, end_time); err = _zbus_vded_exec(chan, end_time);
k_mutex_unlock(&chan->data->mutex); chan_unlock(chan, context_priority);
return err; return err;
} }
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{ {
int err;
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required"); _ZBUS_ASSERT(msg != NULL, "msg is required");
err = k_mutex_lock(&chan->data->mutex, timeout); if (k_is_in_isr()) {
timeout = K_NO_WAIT;
}
int err = k_sem_take(&chan->data->sem, timeout);
if (err) { if (err) {
return err; return err;
} }
memcpy(msg, chan->message, chan->message_size); memcpy(msg, chan->message, chan->message_size);
return k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return 0;
} }
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
{ {
int err; int err;
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
if (k_is_in_isr()) {
timeout = K_NO_WAIT;
}
k_timepoint_t end_time = sys_timepoint_calc(timeout); k_timepoint_t end_time = sys_timepoint_calc(timeout);
err = k_mutex_lock(&chan->data->mutex, timeout); int context_priority = ZBUS_MIN_THREAD_PRIORITY;
err = chan_lock(chan, timeout, &context_priority);
if (err) { if (err) {
return err; return err;
} }
err = _zbus_vded_exec(chan, end_time); err = _zbus_vded_exec(chan, end_time);
k_mutex_unlock(&chan->data->mutex); chan_unlock(chan, context_priority);
return err; return err;
} }
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
{ {
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
int err = k_mutex_lock(&chan->data->mutex, timeout); if (k_is_in_isr()) {
timeout = K_NO_WAIT;
}
int err = k_sem_take(&chan->data->sem, timeout);
if (err) { if (err) {
return err; return err;
@ -274,18 +442,17 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
int zbus_chan_finish(const struct zbus_channel *chan) int zbus_chan_finish(const struct zbus_channel *chan)
{ {
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
int err = k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return err; return 0;
} }
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
k_timeout_t timeout) k_timeout_t timeout)
{ {
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
_ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub != NULL, "sub is required");
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
_ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
@ -299,7 +466,7 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
k_timeout_t timeout) k_timeout_t timeout)
{ {
_ZBUS_ASSERT(!k_is_in_isr(), "zbus subscribers cannot be used inside ISRs"); _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
_ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub != NULL, "sub is required");
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
"sub must be a MSG_SUBSCRIBER"); "sub must be a MSG_SUBSCRIBER");
@ -330,22 +497,35 @@ int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
_ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
int err = -ESRCH;
struct zbus_channel_observation *observation; struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask; struct zbus_channel_observation_mask *observation_mask;
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; K_SPINLOCK(&obs_slock) {
for (int16_t i = chan->data->observers_start_idx,
limit = chan->data->observers_end_idx;
i < limit; ++i) { i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
_ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); __ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) { if (observation->obs == obs) {
if (observation_mask->enabled != masked) {
observation_mask->enabled = masked; observation_mask->enabled = masked;
return 0;
update_all_channels_hop(obs);
}
err = 0;
K_SPINLOCK_BREAK;
} }
} }
return -ESRCH; }
return err;
} }
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
@ -354,20 +534,44 @@ int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
_ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
int err = -ESRCH;
struct zbus_channel_observation *observation; struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask; struct zbus_channel_observation_mask *observation_mask;
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; K_SPINLOCK(&obs_slock) {
i < limit; ++i) { const int limit = chan->data->observers_end_idx;
for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
_ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); __ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) { if (observation->obs == obs) {
*masked = observation_mask->enabled; *masked = observation_mask->enabled;
return 0;
err = 0;
K_SPINLOCK_BREAK;
} }
} }
return -ESRCH; }
return err;
}
int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled)
{
_ZBUS_ASSERT(obs != NULL, "obs is required");
K_SPINLOCK(&obs_slock) {
if (obs->data->enabled != enabled) {
obs->data->enabled = enabled;
update_all_channels_hop(obs);
}
}
return 0;
} }

View file

@ -19,7 +19,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(obs != NULL, "obs is required");
err = k_mutex_lock(&chan->data->mutex, timeout); err = k_sem_take(&chan->data->sem, timeout);
if (err) { if (err) {
return err; return err;
} }
@ -31,7 +31,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
__ASSERT(observation != NULL, "observation must be not NULL"); __ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) { if (observation->obs == obs) {
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return -EEXIST; return -EEXIST;
} }
@ -40,7 +40,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
/* Check if the observer is already a runtime observer */ /* Check if the observer is already a runtime observer */
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) { if (obs_nd->obs == obs) {
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return -EALREADY; return -EALREADY;
} }
@ -51,7 +51,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
if (new_obs_nd == NULL) { if (new_obs_nd == NULL) {
LOG_ERR("Could not allocate observer node the heap is full!"); LOG_ERR("Could not allocate observer node the heap is full!");
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return -ENOMEM; return -ENOMEM;
} }
@ -60,7 +60,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
sys_slist_append(&chan->data->observers, &new_obs_nd->node); sys_slist_append(&chan->data->observers, &new_obs_nd->node);
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return 0; return 0;
} }
@ -76,7 +76,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
_ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(obs != NULL, "obs is required");
err = k_mutex_lock(&chan->data->mutex, timeout); err = k_sem_take(&chan->data->sem, timeout);
if (err) { if (err) {
return err; return err;
} }
@ -87,7 +87,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
k_free(obs_nd); k_free(obs_nd);
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return 0; return 0;
} }
@ -95,7 +95,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
prev_obs_nd = obs_nd; prev_obs_nd = obs_nd;
} }
k_mutex_unlock(&chan->data->mutex); k_sem_give(&chan->data->sem);
return -ENODATA; return -ENODATA;
} }