From c992707251efc208130ada7d0a54f19ce038c199 Mon Sep 17 00:00:00 2001 From: Rodrigo Peixoto Date: Tue, 5 Sep 2023 12:48:55 -0300 Subject: [PATCH] zbus: add priority boost feature Replace mutexes with semaphores to protect the channels in conjunction with a priority boost algorithm based on the observers' priority. Signed-off-by: Rodrigo Peixoto --- cmake/linker_script/common/common-rom.cmake | 1 + include/zephyr/linker/common-ram.ld | 1 - .../linker/common-rom/common-rom-misc.ld | 1 + include/zephyr/zbus/zbus.h | 189 ++++++++---- subsys/zbus/Kconfig | 7 + subsys/zbus/zbus.c | 282 +++++++++++++++--- subsys/zbus/zbus_runtime_observers.c | 16 +- 7 files changed, 396 insertions(+), 101 deletions(-) diff --git a/cmake/linker_script/common/common-rom.cmake b/cmake/linker_script/common/common-rom.cmake index d79fa223fc..d955c8ad0b 100644 --- a/cmake/linker_script/common/common-rom.cmake +++ b/cmake/linker_script/common/common-rom.cmake @@ -216,6 +216,7 @@ endif() if(CONFIG_ZBUS) zephyr_iterable_section(NAME zbus_channel KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) + zephyr_iterable_section(NAME zbus_observer KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) zephyr_iterable_section(NAME zbus_channel_observation KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) endif() diff --git a/include/zephyr/linker/common-ram.ld b/include/zephyr/linker/common-ram.ld index 7bb6e55f1c..df70b13ca7 100644 --- a/include/zephyr/linker/common-ram.ld +++ b/include/zephyr/linker/common-ram.ld @@ -133,7 +133,6 @@ #endif /* CONFIG_SENSING */ #if defined(CONFIG_ZBUS) - ITERABLE_SECTION_RAM(zbus_observer, 4) ITERABLE_SECTION_RAM(zbus_channel_observation_mask, 1) #endif /* CONFIG_ZBUS */ diff --git a/include/zephyr/linker/common-rom/common-rom-misc.ld b/include/zephyr/linker/common-rom/common-rom-misc.ld index 1fbf777ba5..189a9e3188 100644 --- a/include/zephyr/linker/common-rom/common-rom-misc.ld +++ b/include/zephyr/linker/common-rom/common-rom-misc.ld @@ -36,6 +36,7 @@ #if defined(CONFIG_ZBUS) ITERABLE_SECTION_ROM(zbus_channel, 4) + ITERABLE_SECTION_ROM(zbus_observer, 4) ITERABLE_SECTION_ROM(zbus_channel_observation, 4) #endif /* CONFIG_ZBUS */ diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index 285338d473..2c0c0622ae 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -38,10 +38,17 @@ struct zbus_channel_data { */ int16_t observers_end_idx; - /** Access control mutex. Points to the mutex used to avoid race conditions + /** Access control semaphore. Points to the semaphore used to avoid race conditions * for accessing the channel. */ - struct k_mutex mutex; + struct k_sem sem; + +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) + /** Highest observer priority. Indicates the priority that the VDED will use to boost the + * notification process avoiding preemptions. + */ + int highest_observer_priority; +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) /** Channel observer list. Represents the channel's observers list, it can be empty @@ -96,6 +103,16 @@ enum __packed zbus_observer_type { ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, }; +struct zbus_observer_data { + /** Enabled flag. Indicates if observer is receiving notification. */ + bool enabled; + +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) + /** Subscriber attached thread priority. */ + int priority; +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ +}; + /** * @brief Type used to represent an observer. * @@ -119,8 +136,8 @@ struct zbus_observer { /** Type indication. */ enum zbus_observer_type type; - /** Enabled flag. Indicates if observer is receiving notification. */ - bool enabled; + /** Mutable observer data struct. */ + struct zbus_observer_data *const data; union { /** Observer message queue. It turns the observer into a subscriber. */ @@ -154,6 +171,8 @@ struct zbus_channel_observation { #define _ZBUS_CPP_EXTERN #endif /* __cplusplus */ +#define ZBUS_MIN_THREAD_PRIORITY (CONFIG_NUM_PREEMPT_PRIORITIES - 1) + #if defined(CONFIG_ZBUS_ASSERT_MOCK) #define _ZBUS_ASSERT(_cond, _fmt, ...) \ do { \ @@ -233,6 +252,7 @@ struct zbus_channel_observation { /** @endcond */ +/* clang-format off */ /** * @brief Add a static channel observervation. * @@ -246,11 +266,15 @@ struct zbus_channel_observation { */ #define ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio) \ const STRUCT_SECTION_ITERABLE(zbus_channel_observation, \ - _CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \ - .chan = &_chan, .obs = &_obs}; \ + _CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \ + .chan = &_chan, \ + .obs = &_obs, \ + }; \ STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \ _CONCAT(_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs)), \ _mask)) = {.enabled = _masked} +/* clang-format on */ + /** * @brief Add a static channel observervation. * @@ -290,6 +314,7 @@ struct zbus_channel_observation { */ #define ZBUS_OBSERVERS(...) __VA_ARGS__ +/* clang-format off */ /** * @brief Zbus channel definition. * @@ -305,20 +330,29 @@ struct zbus_channel_observation { * first the highest priority. * @param _init_val The message initialization. */ -#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \ - static _type _CONCAT(_zbus_message_, _name) = _init_val; \ - static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \ - .observers_start_idx = -1, .observers_end_idx = -1}; \ - static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \ - _ZBUS_CPP_EXTERN const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \ - ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \ - .message = &_CONCAT(_zbus_message_, _name), \ - .message_size = sizeof(_type), .user_data = _user_data, .validator = (_validator), \ - .data = &_CONCAT(_zbus_chan_data_, _name)}; \ - /* Extern declaration of observers */ \ - ZBUS_OBS_DECLARE(_observers); \ - /* Create all channel observations from observers list */ \ +#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \ + static _type _CONCAT(_zbus_message_, _name) = _init_val; \ + static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \ + .observers_start_idx = -1, \ + .observers_end_idx = -1, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \ + _ZBUS_CPP_EXTERN const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \ + ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \ + .message = &_CONCAT(_zbus_message_, _name), \ + .message_size = sizeof(_type), \ + .user_data = _user_data, \ + .validator = _validator, \ + .data = &_CONCAT(_zbus_chan_data_, _name), \ + }; \ + /* Extern declaration of observers */ \ + ZBUS_OBS_DECLARE(_observers); \ + /* Create all channel observations from observers list */ \ FOR_EACH_FIXED_ARG_NONEMPTY_TERM(_ZBUS_CHAN_OBSERVATION, (;), _name, _observers) +/* clang-format on */ /** * @brief Initialize a message. @@ -334,6 +368,7 @@ struct zbus_channel_observation { _val, ##__VA_ARGS__ \ } +/* clang-format off */ /** * @brief Define and initialize a subscriber. * @@ -345,13 +380,25 @@ struct zbus_channel_observation { * @param[in] _queue_size The notification queue's size. * @param[in] _enable The subscriber initial enable state. */ -#define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \ - K_MSGQ_DEFINE(_zbus_observer_queue_##_name, sizeof(const struct zbus_channel *), \ - _queue_size, sizeof(const struct zbus_channel *)); \ - STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ - ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ - .type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \ - .enabled = _enable, .queue = &_zbus_observer_queue_##_name} +#define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \ + K_MSGQ_DEFINE(_zbus_observer_queue_##_name, \ + sizeof(const struct zbus_channel *), \ + _queue_size, sizeof(const struct zbus_channel *) \ + ); \ + static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \ + .enabled = _enable, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \ + .data = &_CONCAT(_zbus_obs_data_, _name), \ + .queue = &_zbus_observer_queue_##_name, \ + } +/* clang-format on */ + /** * @brief Define and initialize a subscriber. * @@ -366,6 +413,7 @@ struct zbus_channel_observation { #define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \ ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, true) +/* clang-format off */ /** * @brief Define and initialize a listener. * @@ -378,10 +426,20 @@ struct zbus_channel_observation { * @param[in] _enable The listener initial enable state. */ #define ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable) \ - STRUCT_SECTION_ITERABLE(zbus_observer, \ - _name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ - .type = ZBUS_OBSERVER_LISTENER_TYPE, \ - .enabled = _enable, .callback = (_cb)} + static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \ + .enabled = _enable, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_LISTENER_TYPE, \ + .data = &_CONCAT(_zbus_obs_data_, _name), \ + .callback = (_cb) \ + } +/* clang-format on */ + /** * @brief Define and initialize a listener. * @@ -394,6 +452,7 @@ struct zbus_channel_observation { */ #define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true) +/* clang-format off */ /** * @brief Define and initialize a message subscriber. * @@ -404,14 +463,21 @@ struct zbus_channel_observation { * @param[in] _name The subscriber's name. * @param[in] _enable The subscriber's initial state. */ -#define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \ - static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \ - STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ - ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ - .type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \ - .enabled = _enable, \ - .message_fifo = &_zbus_observer_fifo_##_name, \ +#define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \ + static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \ + static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \ + .enabled = _enable, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \ + .data = &_CONCAT(_zbus_obs_data_, _name), \ + .message_fifo = &_zbus_observer_fifo_##_name, \ } +/* clang-format on */ /** * @brief Define and initialize an enabled message subscriber. @@ -501,8 +567,6 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout); * @param chan The channel's reference. * * @retval 0 Channel finished. - * @retval -EPERM The channel was claimed by other thread. - * @retval -EINVAL The channel's mutex is not locked. * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. */ @@ -519,9 +583,9 @@ int zbus_chan_finish(const struct zbus_channel *chan); * or one of the special values K_NO_WAIT and K_FOREVER. * * @retval 0 Channel notified. - * @retval -EPERM The current thread does not own the channel. - * @retval -EBUSY The channel's mutex returned without waiting. - * @retval -EAGAIN Timeout to acquiring the channel's mutex. + * @retval -EBUSY The channel's semaphore returned without waiting. + * @retval -EAGAIN Timeout to take the channel's semaphore. + * @retval -ENOMEM There is not more buffer on the messgage buffers pool. * @retval -EFAULT A parameter is incorrect, the notification could not be sent to one or more * observer, or the function context is invalid (inside an ISR). The function only returns this * value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. @@ -553,7 +617,7 @@ static inline const char *zbus_chan_name(const struct zbus_channel *chan) * * This routine returns the reference of a channel message. * - * @warning This function must only be used directly for acquired (locked by mutex) channels. This + * @warning This function must only be used directly for already locked channels. This * can be done inside a listener for the receiving channel or after claim a channel. * * @param chan The channel's reference. @@ -574,7 +638,7 @@ static inline void *zbus_chan_msg(const struct zbus_channel *chan) * inside listeners to access the message directly. In this way zbus prevents the listener of * changing the notifying channel's message during the notification process. * - * @warning This function must only be used directly for acquired (locked by mutex) channels. This + * @warning This function must only be used directly for already locked channels. This * can be done inside a listener for the receiving channel or after claim a channel. * * @param chan The channel's constant reference. @@ -685,14 +749,7 @@ struct zbus_observer_node { * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. */ -static inline int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled) -{ - _ZBUS_ASSERT(obs != NULL, "obs is required"); - - obs->enabled = enabled; - - return 0; -} +int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled); /** * @brief Get the observer state. @@ -709,7 +766,7 @@ static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable) _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(enable != NULL, "enable is required"); - *enable = obs->enabled; + *enable = obs->data->enabled; return 0; } @@ -766,6 +823,32 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs) #endif +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) || defined(__DOXYGEN__) + +/** + * @brief Set the observer thread priority by attaching it to a thread. + * + * @param[in] obs The observer's reference. + * + * @retval 0 Observer detached from the thread. + * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The + * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. + */ +int zbus_obs_attach_to_thread(const struct zbus_observer *obs); + +/** + * @brief Clear the observer thread priority by detaching it from a thread. + * + * @param[in] obs The observer's reference. + * + * @retval 0 Observer detached from the thread. + * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The + * function only returns this value when the CONFIG_ZBUS_ASSERT_MOCK is enabled. + */ +int zbus_obs_detach_from_thread(const struct zbus_observer *obs); + +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ + /** * @brief Wait for a channel notification. * diff --git a/subsys/zbus/Kconfig b/subsys/zbus/Kconfig index 72f6c0c561..dfb839abe0 100644 --- a/subsys/zbus/Kconfig +++ b/subsys/zbus/Kconfig @@ -53,6 +53,13 @@ endif # ZBUS_MSG_SUBSCRIBER config ZBUS_RUNTIME_OBSERVERS bool "Runtime observers support." +config ZBUS_PRIORITY_BOOST + bool "ZBus priority boost algorithm" + default y + help + ZBus implements the Highest Locker Protocol that relies on the observers’ thread priority + to determine a temporary publisher priority. + config ZBUS_ASSERT_MOCK bool "Zbus assert mock for test purposes." help diff --git a/subsys/zbus/zbus.c b/subsys/zbus/zbus.c index 959ef59aa8..d16cc27d96 100644 --- a/subsys/zbus/zbus.c +++ b/subsys/zbus/zbus.c @@ -12,6 +12,13 @@ #include LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) +/* Available only when the priority boost is enabled */ +static struct k_spinlock _zbus_chan_slock; +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ + +static struct k_spinlock obs_slock; + #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC) @@ -69,7 +76,7 @@ int _zbus_init(void) ++(curr->data->observers_end_idx); } STRUCT_SECTION_FOREACH(zbus_channel, chan) { - k_mutex_init(&chan->data->mutex); + k_sem_init(&chan->data->sem, 1, 1); #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) sys_slist_init(&chan->data->observers); @@ -145,7 +152,7 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t const struct zbus_observer *obs = observation->obs; - if (!obs->enabled || observation_mask->enabled) { + if (!obs->data->enabled || observation_mask->enabled) { continue; } @@ -174,7 +181,7 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t const struct zbus_observer *obs = obs_nd->obs; - if (!obs->enabled) { + if (!obs->data->enabled) { continue; } @@ -191,21 +198,171 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t return last_error; } +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) + +static inline void chan_update_hop(const struct zbus_channel *chan) +{ + struct zbus_channel_observation *observation; + struct zbus_channel_observation_mask *observation_mask; + + int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY; + + K_SPINLOCK(&_zbus_chan_slock) { + const int limit = chan->data->observers_end_idx; + + for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + + __ASSERT(observation != NULL, "observation must be not NULL"); + + const struct zbus_observer *obs = observation->obs; + + if (!obs->data->enabled || observation_mask->enabled) { + continue; + } + + if (chan_highest_observer_priority > obs->data->priority) { + chan_highest_observer_priority = obs->data->priority; + } + } + chan->data->highest_observer_priority = chan_highest_observer_priority; + } +} + +static inline void update_all_channels_hop(const struct zbus_observer *obs) +{ + struct zbus_channel_observation *observation; + + int count; + + STRUCT_SECTION_COUNT(zbus_channel_observation, &count); + + for (int16_t i = 0; i < count; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + + if (obs != observation->obs) { + continue; + } + + chan_update_hop(observation->chan); + } +} + +int zbus_obs_attach_to_thread(const struct zbus_observer *obs) +{ + _ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR"); + _ZBUS_ASSERT(obs != NULL, "obs is required"); + + int current_thread_priority = k_thread_priority_get(k_current_get()); + + K_SPINLOCK(&obs_slock) { + if (obs->data->priority != current_thread_priority) { + obs->data->priority = current_thread_priority; + + update_all_channels_hop(obs); + } + } + + return 0; +} + +int zbus_obs_detach_from_thread(const struct zbus_observer *obs) +{ + _ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR"); + _ZBUS_ASSERT(obs != NULL, "obs is required"); + + K_SPINLOCK(&obs_slock) { + obs->data->priority = ZBUS_MIN_THREAD_PRIORITY; + + update_all_channels_hop(obs); + } + + return 0; +} + +#else + +static inline void update_all_channels_hop(const struct zbus_observer *obs) +{ +} + +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ + +static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio) +{ + bool boosting = false; + +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) + if (!k_is_in_isr()) { + *prio = k_thread_priority_get(k_current_get()); + + K_SPINLOCK(&_zbus_chan_slock) { + if (*prio > chan->data->highest_observer_priority) { + int new_prio = chan->data->highest_observer_priority - 1; + + new_prio = MAX(new_prio, 0); + + /* Elevating priority since the highest_observer_priority is + * greater than the current thread + */ + k_thread_priority_set(k_current_get(), new_prio); + + boosting = true; + } + } + } +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ + + int err = k_sem_take(&chan->data->sem, timeout); + + if (err) { + /* When the priority boost is disabled, this IF will be optimized out. */ + if (boosting) { + /* Restoring thread priority since the semaphore is not available */ + k_thread_priority_set(k_current_get(), *prio); + } + + return err; + } + + return 0; +} + +static inline void chan_unlock(const struct zbus_channel *chan, int prio) +{ + k_sem_give(&chan->data->sem); + +#if defined(CONFIG_ZBUS_PRIORITY_BOOST) + /* During the unlock phase, with the priority boost enabled, the priority must be + * restored to the original value in case it was elevated + */ + if (prio < ZBUS_MIN_THREAD_PRIORITY) { + k_thread_priority_set(k_current_get(), prio); + } +#endif /* CONFIG_ZBUS_PRIORITY_BOOST */ +} + int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) { int err; - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); + if (k_is_in_isr()) { + timeout = K_NO_WAIT; + } + k_timepoint_t end_time = sys_timepoint_calc(timeout); if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { return -ENOMSG; } - err = k_mutex_lock(&chan->data->mutex, timeout); + int context_priority = ZBUS_MIN_THREAD_PRIORITY; + + err = chan_lock(chan, timeout, &context_priority); if (err) { return err; } @@ -214,56 +371,67 @@ int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t err = _zbus_vded_exec(chan, end_time); - k_mutex_unlock(&chan->data->mutex); + chan_unlock(chan, context_priority); return err; } int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) { - int err; - - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); - err = k_mutex_lock(&chan->data->mutex, timeout); + if (k_is_in_isr()) { + timeout = K_NO_WAIT; + } + + int err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; } memcpy(msg, chan->message, chan->message_size); - return k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); + + return 0; } int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) { int err; - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); + if (k_is_in_isr()) { + timeout = K_NO_WAIT; + } + k_timepoint_t end_time = sys_timepoint_calc(timeout); - err = k_mutex_lock(&chan->data->mutex, timeout); + int context_priority = ZBUS_MIN_THREAD_PRIORITY; + + err = chan_lock(chan, timeout, &context_priority); if (err) { return err; } err = _zbus_vded_exec(chan, end_time); - k_mutex_unlock(&chan->data->mutex); + chan_unlock(chan, context_priority); return err; } int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) { - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - int err = k_mutex_lock(&chan->data->mutex, timeout); + if (k_is_in_isr()) { + timeout = K_NO_WAIT; + } + + int err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; @@ -274,18 +442,17 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) int zbus_chan_finish(const struct zbus_channel *chan) { - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - int err = k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); - return err; + return 0; } int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout) { - _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); + _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); @@ -299,7 +466,7 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, k_timeout_t timeout) { - _ZBUS_ASSERT(!k_is_in_isr(), "zbus subscribers cannot be used inside ISRs"); + _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, "sub must be a MSG_SUBSCRIBER"); @@ -330,22 +497,35 @@ int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); + int err = -ESRCH; + struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; - for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; - i < limit; ++i) { - STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); - STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + K_SPINLOCK(&obs_slock) { + for (int16_t i = chan->data->observers_start_idx, + limit = chan->data->observers_end_idx; + i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); - _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); + __ASSERT(observation != NULL, "observation must be not NULL"); - if (observation->obs == obs) { - observation_mask->enabled = masked; - return 0; + if (observation->obs == obs) { + if (observation_mask->enabled != masked) { + observation_mask->enabled = masked; + + update_all_channels_hop(obs); + } + + err = 0; + + K_SPINLOCK_BREAK; + } } } - return -ESRCH; + + return err; } int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, @@ -354,20 +534,44 @@ int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); + int err = -ESRCH; + struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; - for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; - i < limit; ++i) { - STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); - STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + K_SPINLOCK(&obs_slock) { + const int limit = chan->data->observers_end_idx; - _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); + for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); - if (observation->obs == obs) { - *masked = observation_mask->enabled; - return 0; + __ASSERT(observation != NULL, "observation must be not NULL"); + + if (observation->obs == obs) { + *masked = observation_mask->enabled; + + err = 0; + + K_SPINLOCK_BREAK; + } } } - return -ESRCH; + + return err; +} + +int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled) +{ + _ZBUS_ASSERT(obs != NULL, "obs is required"); + + K_SPINLOCK(&obs_slock) { + if (obs->data->enabled != enabled) { + obs->data->enabled = enabled; + + update_all_channels_hop(obs); + } + } + + return 0; } diff --git a/subsys/zbus/zbus_runtime_observers.c b/subsys/zbus/zbus_runtime_observers.c index 5fc47484de..56c7cbb453 100644 --- a/subsys/zbus/zbus_runtime_observers.c +++ b/subsys/zbus/zbus_runtime_observers.c @@ -19,7 +19,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(obs != NULL, "obs is required"); - err = k_mutex_lock(&chan->data->mutex, timeout); + err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; } @@ -31,7 +31,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe __ASSERT(observation != NULL, "observation must be not NULL"); if (observation->obs == obs) { - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return -EEXIST; } @@ -40,7 +40,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe /* Check if the observer is already a runtime observer */ SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { if (obs_nd->obs == obs) { - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return -EALREADY; } @@ -51,7 +51,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe if (new_obs_nd == NULL) { LOG_ERR("Could not allocate observer node the heap is full!"); - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return -ENOMEM; } @@ -60,7 +60,7 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe sys_slist_append(&chan->data->observers, &new_obs_nd->node); - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return 0; } @@ -76,7 +76,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(obs != NULL, "obs is required"); - err = k_mutex_lock(&chan->data->mutex, timeout); + err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; } @@ -87,7 +87,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer k_free(obs_nd); - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return 0; } @@ -95,7 +95,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer prev_obs_nd = obs_nd; } - k_mutex_unlock(&chan->data->mutex); + k_sem_give(&chan->data->sem); return -ENODATA; }