zbus: improve the way of storing observers

ZBus stores observers in two ways: statically using a list and dynamically
using a memory slab. Both present limitations. Static observers work only
for channel definition. The dynamic observers rely on a memory slab that
forces the user to manage its size to avoid issues with adding
observers. This commit fixes the static allocation problem by using the
iterable sections for allocating observation data and replacing the VDED
execution sequence since now it is possible to prioritize static observer
execution. All the runtime observers are dynamically allocated on the heap
instead of a specific memory pool.

BREAK changes (only internal, not APIs):

* ZBus channel metadata changed. Remove the observers' static array
pointer. Rename the `runtime_observers` pointer to `observers`. Add
`observer_start_idx` and `observer_end_idx`;
* Change the VDED execution sequence. The position (on definition time),
the priority in conjunction with the lexical order, is considered for
static post-definition time observers. At last, the runtime observer
follows the adding sequence;
* Replace the `CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE` with
`CONFIG_ZBUS_RUNTIME_OBSERVERS`.

New APIs:

* New iterable section iterators (for channels and observers) can now
receive a user_data pointer to keep context between the function calls;
* New `ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable)` and
`ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, enable)` that
enable developers define disabled observers. They need to be enabled
during runtime to receive notifications from the bus;
* `ZBUS_CHAN_ADD_OBS` macro for adding post-definition static observers of
a channel.

Important changes:

* Move the ZBus LD file content to the `common-ram.ld` LD file. That was
necessary to make ZBus compatible with some Xtensa and RISCV boards.

Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
Rodrigo Peixoto 2023-07-22 12:55:48 -03:00 committed by Carles Cufí
parent 6a0de9bb7c
commit 7e44469dcc
11 changed files with 498 additions and 206 deletions

View file

@ -103,8 +103,8 @@ if(CONFIG_ZTEST_NEW_API)
endif()
if(CONFIG_ZBUS)
zephyr_iterable_section(NAME zbus_channel GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4)
zephyr_iterable_section(NAME zbus_observer GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4)
zephyr_iterable_section(NAME zbus_channel_observation_mask GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4)
endif()
if(CONFIG_UVB)

View file

@ -213,3 +213,8 @@ endif()
if(CONFIG_USBD_MSC_CLASS)
zephyr_iterable_section(NAME usbd_msc_lun KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
endif()
if(CONFIG_ZBUS)
zephyr_iterable_section(NAME zbus_channel KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
zephyr_iterable_section(NAME zbus_channel_observation KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4)
endif()

View file

@ -124,6 +124,11 @@
ITERABLE_SECTION_RAM(sensing_sensor, 4)
#endif /* CONFIG_SENSING */
#if defined(CONFIG_ZBUS)
ITERABLE_SECTION_RAM(zbus_observer, 4)
ITERABLE_SECTION_RAM(zbus_channel_observation_mask, 1)
#endif /* CONFIG_ZBUS */
#ifdef CONFIG_USERSPACE
_static_kernel_objects_end = .;
#endif

View file

@ -34,6 +34,11 @@
ITERABLE_SECTION_ROM(emul, 4)
#endif /* CONFIG_EMUL */
#if defined(CONFIG_ZBUS)
ITERABLE_SECTION_ROM(zbus_channel, 4)
ITERABLE_SECTION_ROM(zbus_channel_observation, 4)
#endif /* CONFIG_ZBUS */
SECTION_DATA_PROLOGUE(symbol_to_keep,,)
{
__symbol_to_keep_start = .;

View file

@ -22,6 +22,35 @@ extern "C" {
* @{
*/
/**
* @brief Type used to represent a channel mutable data.
*
* Every channel has a zbus_channel_data structure associated.
*/
struct zbus_channel_data {
/** Static channel observer list start index. Considering the ITERABLE SECTIONS allocation
* order.
*/
int16_t observers_start_idx;
/** Static channel observer list end index. Considering the ITERABLE SECTIONS allocation
* order.
*/
int16_t observers_end_idx;
/** Access control mutex. Points to the mutex used to avoid race conditions
* for accessing the channel.
*/
struct k_mutex mutex;
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)
/** Channel observer list. Represents the channel's observers list, it can be empty
* or have listeners and subscribers mixed in any sequence. It can be changed in runtime.
*/
sys_slist_t observers;
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
};
/**
* @brief Type used to represent a channel.
*
@ -33,40 +62,37 @@ struct zbus_channel {
/** Channel name. */
const char *const name;
#endif
/** Message reference. Represents the message's reference that points to the actual
* shared memory region.
*/
void *const message;
/** Message size. Represents the channel's message size. */
const uint16_t message_size;
const size_t message_size;
/** User data available to extend zbus features. The channel must be claimed before
* using this field.
*/
void *const user_data;
/** Message reference. Represents the message's reference that points to the actual
* shared memory region.
*/
void *const message;
/** Message validator. Stores the reference to the function to check the message
* validity before actually performing the publishing. No invalid messages can be
* published. Every message is valid when this field is empty.
*/
bool (*const validator)(const void *msg, size_t msg_size);
/** Access control mutex. Points to the mutex used to avoid race conditions
* for accessing the channel.
*/
struct k_mutex *mutex;
#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) || defined(__DOXYGEN__)
/** Dynamic channel observer list. Represents the channel's observers list, it can be empty
* or have listeners and subscribers mixed in any sequence. It can be changed in runtime.
*/
sys_slist_t *runtime_observers;
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
/** Mutable channel data struct. */
struct zbus_channel_data *const data;
};
/** Channel observer list. Represents the channel's observers list, it can be empty or
* have listeners and subscribers mixed in any sequence.
*/
const struct zbus_observer *const *observers;
/**
* @brief Type used to represent an observer type.
*
* A observer can be a listener or a subscriber.
*/
enum __packed zbus_observer_type {
ZBUS_OBSERVER_LISTENER_TYPE,
ZBUS_OBSERVER_SUBSCRIBER_TYPE
};
/**
@ -89,16 +115,30 @@ struct zbus_observer {
/** Observer name. */
const char *const name;
#endif
/** Type indication. */
enum zbus_observer_type type;
/** Enabled flag. Indicates if observer is receiving notification. */
bool enabled;
/** Observer message queue. It turns the observer into a subscriber. */
struct k_msgq *const queue;
/** Observer callback function. It turns the observer into a listener. */
void (*const callback)(const struct zbus_channel *chan);
union {
/** Observer message queue. It turns the observer into a subscriber. */
struct k_msgq *const queue;
/** Observer callback function. It turns the observer into a listener. */
void (*const callback)(const struct zbus_channel *chan);
};
};
/** @cond INTERNAL_HIDDEN */
struct zbus_channel_observation_mask {
bool enabled;
};
struct zbus_channel_observation {
const struct zbus_channel *const chan;
const struct zbus_observer *const obs;
};
#if defined(CONFIG_ZBUS_ASSERT_MOCK)
#define _ZBUS_ASSERT(_cond, _fmt, ...) \
@ -122,13 +162,13 @@ struct zbus_observer {
#if defined(CONFIG_ZBUS_OBSERVER_NAME)
#define ZBUS_OBSERVER_NAME_INIT(_name) .name = #_name,
#define _ZBUS_OBS_NAME(_obs) (_obs)->name
#define _ZBUS_OBS_NAME(_obs) (_obs)->name
#else
#define ZBUS_OBSERVER_NAME_INIT(_name)
#define _ZBUS_OBS_NAME(_obs) ""
#endif
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
#define ZBUS_RUNTIME_OBSERVERS_LIST_DECL(_slist_name) static sys_slist_t _slist_name
#define ZBUS_RUNTIME_OBSERVERS_LIST_INIT(_slist_name) .runtime_observers = &_slist_name,
#else
@ -136,26 +176,84 @@ struct zbus_observer {
#define ZBUS_RUNTIME_OBSERVERS_LIST_INIT(_slist_name) /* No runtime observers */
#endif
#if defined(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS)
#define _ZBUS_STRUCT_DECLARE(_type, _name) STRUCT_SECTION_ITERABLE(_type, _name)
#else
#define _ZBUS_STRUCT_DECLARE(_type, _name) struct _type _name
#endif /* CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS */
#define _ZBUS_OBS_EXTERN(_name) extern struct zbus_observer _name
#define _ZBUS_CHAN_EXTERN(_name) extern const struct zbus_channel _name
#define ZBUS_REF(_value) &(_value)
#define FOR_EACH_FIXED_ARG_NONEMPTY_TERM(F, sep, fixed_arg, ...) \
COND_CODE_0(/* are there zero non-empty arguments ? */ \
NUM_VA_ARGS_LESS_1( \
LIST_DROP_EMPTY(__VA_ARGS__, _)), /* if so, expand to nothing */ \
(), /* otherwise, expand to: */ \
(FOR_EACH_IDX_FIXED_ARG( \
F, sep, fixed_arg, \
LIST_DROP_EMPTY(__VA_ARGS__)) /* plus a final terminator */ \
__DEBRACKET sep))
#define _ZBUS_OBSERVATION_PREFIX(_idx) \
GET_ARG_N(_idx, 00, 01, 02, 03, 04, 05, 06, 07, 08, 09, 10, 11, 12, 13, 14, 15, 16, 17, \
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, \
38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, \
58, 59, 60, 61, 62, 63)
#define _ZBUS_CHAN_OBSERVATION(_idx, _obs, _chan) \
const STRUCT_SECTION_ITERABLE( \
zbus_channel_observation, \
_CONCAT(_chan, _ZBUS_OBSERVATION_PREFIX(UTIL_INC(_idx)))) = {.chan = &_chan, \
.obs = &_obs}; \
STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \
_CONCAT(_CONCAT(_chan, _ZBUS_OBSERVATION_PREFIX(UTIL_INC(_idx))), \
_mask)) = {.enabled = false};
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)
#define _ZBUS_RUNTIME_OBSERVERS(_name) .observers = &(_CONCAT(_observers_, _name)),
#define _ZBUS_RUNTIME_OBSERVERS_DECL(_name) static sys_slist_t _CONCAT(_observers_, _name);
#else
#define _ZBUS_RUNTIME_OBSERVERS(_name)
#define _ZBUS_RUNTIME_OBSERVERS_DECL(_name)
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks);
/** @endcond */
/**
* @brief Add a static channel observervation.
*
* This macro initializes a channel observation by receiving the
* channel and the observer.
*
* @param _chan Channel instance.
* @param _obs Observer instance.
* @param _masked Observation state.
* @param _prio Observer notification sequence priority.
*/
#define ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio) \
const STRUCT_SECTION_ITERABLE(zbus_channel_observation, \
_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \
.chan = &_chan, .obs = &_obs}; \
STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \
_CONCAT(_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs)), \
_mask)) = {.enabled = _masked}
/**
* @brief Add a static channel observervation.
*
* This macro initializes a channel observation by receiving the
* channel and the observer.
*
* @param _chan Channel instance.
* @param _obs Observer instance.
* @param _prio Observer notification sequence priority.
*/
#define ZBUS_CHAN_ADD_OBS(_chan, _obs, _prio) ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, false, _prio)
/**
* @def ZBUS_OBS_DECLARE
* This macro list the observers to be used in a file. Internally, it declares the observers with
* the extern statement. Note it is only necessary when the observers are declared outside the file.
*/
#define ZBUS_OBS_DECLARE(...) FOR_EACH(_ZBUS_OBS_EXTERN, (;), __VA_ARGS__)
#define ZBUS_OBS_DECLARE(...) FOR_EACH_NONEMPTY_TERM(_ZBUS_OBS_EXTERN, (;), __VA_ARGS__)
/**
* @def ZBUS_CHAN_DECLARE
@ -192,23 +290,20 @@ struct zbus_observer {
* first the highest priority.
* @param _init_val The message initialization.
*/
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \
static _type _CONCAT(_zbus_message_, _name) = _init_val; \
static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \
ZBUS_RUNTIME_OBSERVERS_LIST_DECL(_CONCAT(_runtime_observers_, _name)); \
FOR_EACH_NONEMPTY_TERM(_ZBUS_OBS_EXTERN, (;), _observers) \
static const struct zbus_observer *const _CONCAT(_zbus_observers_, _name)[] = { \
FOR_EACH_NONEMPTY_TERM(ZBUS_REF, (,), _observers) NULL}; \
const _ZBUS_STRUCT_DECLARE(zbus_channel, _name) = { \
ZBUS_CHANNEL_NAME_INIT(_name) /* Name */ \
.message_size = sizeof(_type), /* Message size */ \
.user_data = _user_data, /* User data */ \
.message = &_CONCAT(_zbus_message_, _name), /* Reference to the message */\
.validator = (_validator), /* Validator function */ \
.mutex = &_CONCAT(_zbus_mutex_, _name), /* Channel's Mutex */ \
ZBUS_RUNTIME_OBSERVERS_LIST_INIT( \
_CONCAT(_runtime_observers_, _name)) /* Runtime observer list */ \
.observers = _CONCAT(_zbus_observers_, _name)} /* Static observer list */
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \
static _type _CONCAT(_zbus_message_, _name) = _init_val; \
static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \
.observers_start_idx = -1, .observers_end_idx = -1}; \
static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \
const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \
ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \
.message = &_CONCAT(_zbus_message_, _name), \
.message_size = sizeof(_type), .user_data = _user_data, .validator = (_validator), \
.data = &_CONCAT(_zbus_chan_data_, _name)}; \
/* Extern declaration of observers */ \
ZBUS_OBS_DECLARE(_observers); \
/* Create all channel observations from observers list */ \
FOR_EACH_FIXED_ARG_NONEMPTY_TERM(_ZBUS_CHAN_OBSERVATION, (;), _name, _observers)
/**
* @brief Initialize a message.
@ -233,14 +328,27 @@ struct zbus_observer {
*
* @param[in] _name The subscriber's name.
* @param[in] _queue_size The notification queue's size.
* @param[in] _enable The subscriber initial enable state.
*/
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \
#define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \
K_MSGQ_DEFINE(_zbus_observer_queue_##_name, sizeof(const struct zbus_channel *), \
_queue_size, sizeof(const struct zbus_channel *)); \
_ZBUS_STRUCT_DECLARE(zbus_observer, \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.enabled = true, \
.queue = &_zbus_observer_queue_##_name, .callback = NULL}
STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \
.enabled = _enable, .queue = &_zbus_observer_queue_##_name}
/**
* @brief Define and initialize a subscriber.
*
* This macro defines an observer of subscriber type. It defines a message queue where the
* subscriber will receive the notification asynchronously, and initialize the ``struct
* zbus_observer`` defining the subscriber.
*
* @param[in] _name The subscriber's name.
* @param[in] _queue_size The notification queue's size.
*/
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \
ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, true)
/**
* @brief Define and initialize a listener.
@ -251,12 +359,24 @@ struct zbus_observer {
*
* @param[in] _name The listener's name.
* @param[in] _cb The callback function.
* @param[in] _enable The listener initial enable state.
*/
#define ZBUS_LISTENER_DEFINE(_name, _cb) \
_ZBUS_STRUCT_DECLARE(zbus_observer, \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.enabled = true, \
.queue = NULL, .callback = (_cb)}
#define ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable) \
STRUCT_SECTION_ITERABLE(zbus_observer, \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_LISTENER_TYPE, \
.enabled = _enable, .callback = (_cb)}
/**
* @brief Define and initialize a listener.
*
* This macro defines an observer of listener type. This macro establishes the callback where the
* listener will be notified synchronously and initialize the ``struct zbus_observer`` defining the
* listener. The listeners are defined in the disabled state with this macro.
*
* @param[in] _name The listener's name.
* @param[in] _cb The callback function.
*/
#define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true)
/**
*
@ -453,7 +573,7 @@ static inline void *zbus_chan_user_data(const struct zbus_channel *chan)
return chan->user_data;
}
#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) || defined(__DOXYGEN__)
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)
/**
* @brief Add an observer to a channel.
@ -494,15 +614,6 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout);
/**
* @brief Get zbus runtime observers pool.
*
* This routine returns a reference of the runtime observers pool.
*
* @return Reference of runtime observers pool.
*/
struct k_mem_slab *zbus_runtime_obs_pool(void);
/** @cond INTERNAL_HIDDEN */
struct zbus_observer_node {
@ -512,7 +623,7 @@ struct zbus_observer_node {
/** @endcond */
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
/**
* @brief Change the observer state.
@ -536,6 +647,58 @@ static inline int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled)
return 0;
}
/**
* @brief Get the observer state.
*
* This routine retrieves the observer state.
*
* @param[in] obs The observer's reference.
* @param[out] enable The boolean output's reference.
*
* @return Observer state.
*/
static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable)
{
_ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(enable != NULL, "enable is required");
*enable = obs->enabled;
return 0;
}
/**
* @brief Mask notifications from a channel to an observer.
*
* The observer can mask notifications from a specific observing channel by calling this function.
*
* @param obs The observer's reference to be added.
* @param chan The channel's reference.
* @param masked The mask state. When the mask is true, the observer will not receive notifications
* from the channel.
*
* @retval 0 Channel notifications masked to the observer.
* @retval -ESRCH No observation found for the related pair chan/obs.
* @retval -EINVAL Some parameter is invalid.
*/
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool masked);
/**
* @brief Get the notifications masking state from a channel to an observer.
*
* @param obs The observer's reference to be added.
* @param chan The channel's reference.
* @param[out] masked The mask state. When the mask is true, the observer will not receive
* notifications from the channel.
*
* @retval 0 Retrieved the masked state.
* @retval -ESRCH No observation found for the related pair chan/obs.
* @retval -EINVAL Some parameter is invalid.
*/
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool *masked);
#if defined(CONFIG_ZBUS_OBSERVER_NAME) || defined(__DOXYGEN__)
/**
@ -577,7 +740,6 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs)
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
k_timeout_t timeout);
#if defined(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS) || defined(__DOXYGEN__)
/**
*
* @brief Iterate over channels.
@ -586,10 +748,28 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c
* iterator_func which is called for each channel. If the iterator_func returns false all
* the iteration stops.
*
* @param[in] iterator_func The function that will be execute on each iteration.
*
* @retval true Iterator executed for all channels.
* @retval false Iterator could not be executed. Some iterate returned false.
*/
bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel *chan));
/**
*
* @brief Iterate over channels with user data.
*
* Enables the developer to iterate over the channels giving to this function an
* iterator_func which is called for each channel. If the iterator_func returns false all
* the iteration stops.
*
* @param[in] iterator_func The function that will be execute on each iteration.
* @param[in] user_data The user data that can be passed in the function.
*
* @retval true Iterator executed for all channels.
* @retval false Iterator could not be executed. Some iterate returned false.
*/
bool zbus_iterate_over_channels_with_user_data(
bool (*iterator_func)(const struct zbus_channel *chan, void *user_data), void *user_data);
/**
*
@ -599,12 +779,29 @@ bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel
* iterator_func which is called for each observer. If the iterator_func returns false all
* the iteration stops.
*
* @param[in] iterator_func The function that will be execute on each iteration.
*
* @retval true Iterator executed for all channels.
* @retval false Iterator could not be executed. Some iterate returned false.
*/
bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs));
/**
*
* @brief Iterate over observers with user data.
*
* Enables the developer to iterate over the observers giving to this function an
* iterator_func which is called for each observer. If the iterator_func returns false all
* the iteration stops.
*
* @param[in] iterator_func The function that will be execute on each iteration.
* @param[in] user_data The user data that can be passed in the function.
*
* @retval true Iterator executed for all channels.
* @retval false Iterator could not be executed. Some iterate returned false.
*/
bool zbus_iterate_over_observers_with_user_data(
bool (*iterator_func)(const struct zbus_observer *obs, void *user_data), void *user_data);
#endif /* CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS */
/**
* @}
*/

View file

@ -4,11 +4,8 @@ zephyr_library()
zephyr_library_sources(zbus.c)
if(CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE GREATER 0)
if(CONFIG_ZBUS_RUNTIME_OBSERVERS)
zephyr_library_sources(zbus_runtime_observers.c)
endif()
if(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS)
zephyr_library_sources(zbus_iterable_sections.c)
zephyr_linker_sources(DATA_SECTIONS zbus.ld)
endif()
zephyr_library_sources(zbus_iterable_sections.c)

View file

@ -8,10 +8,9 @@ menuconfig ZBUS
if ZBUS
config ZBUS_STRUCTS_ITERABLE_ACCESS
bool "Zbus iterable sections support."
depends on !XTENSA
default y
config ZBUS_CHANNELS_SYS_INIT_PRIORITY
default 5
int "The priority used during the SYS_INIT procedure."
config ZBUS_CHANNEL_NAME
bool "Channel name field"
@ -19,14 +18,9 @@ config ZBUS_CHANNEL_NAME
config ZBUS_OBSERVER_NAME
bool "Observer name field"
config ZBUS_RUNTIME_OBSERVERS_POOL_SIZE
int "The size of the runtime observers pool."
default 0
help
When the size is bigger than zero this feature will be enabled. It applies the Object Pool Pattern,
where the objects in the pool are pre-allocated and can be used and recycled after use. The
technique avoids dynamic allocation and allows the code to increase the number of observers by
only changing a configuration.
config ZBUS_RUNTIME_OBSERVERS
bool "Runtime observers support."
default n
config ZBUS_ASSERT_MOCK
bool "Zbus assert mock for test purposes."

View file

@ -4,118 +4,145 @@
*/
#include <zephyr/kernel.h>
#include <zephyr/init.h>
#include <zephyr/sys/iterable_sections.h>
#include <zephyr/logging/log.h>
#include <zephyr/sys/printk.h>
#include <zephyr/zbus/zbus.h>
LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0)
static inline void _zbus_notify_runtime_listeners(const struct zbus_channel *chan)
int _zbus_init(void)
{
__ASSERT(chan != NULL, "chan is required");
const struct zbus_channel *curr = NULL;
const struct zbus_channel *prev = NULL;
struct zbus_observer_node *obs_nd, *tmp;
STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
curr = observation->chan;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
__ASSERT(obs_nd != NULL, "observer node is NULL");
if (obs_nd->obs->enabled && (obs_nd->obs->callback != NULL)) {
obs_nd->obs->callback(chan);
if (prev != curr) {
if (prev == NULL) {
curr->data->observers_start_idx = 0;
curr->data->observers_end_idx = 0;
} else {
curr->data->observers_start_idx = prev->data->observers_end_idx;
curr->data->observers_end_idx = prev->data->observers_end_idx;
}
prev = curr;
}
++(curr->data->observers_end_idx);
}
STRUCT_SECTION_FOREACH(zbus_channel, chan) {
k_mutex_init(&chan->data->mutex);
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
sys_slist_init(&chan->data->observers);
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
}
return 0;
}
SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
const struct zbus_observer *obs, k_timepoint_t end_time)
{
int err = 0;
if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) {
obs->callback(chan);
} else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) {
err = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
} else {
CODE_UNREACHABLE;
}
return err;
}
static inline int _zbus_notify_runtime_subscribers(const struct zbus_channel *chan,
k_timepoint_t end_time)
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
__ASSERT(chan != NULL, "chan is required");
int err = 0;
int last_error = 0;
int last_error = 0, err;
_ZBUS_ASSERT(chan != NULL, "chan is required");
/* Static observer event dispatcher logic */
struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask;
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
_ZBUS_ASSERT(observation != NULL, "observation must be not NULL");
const struct zbus_observer *obs = observation->obs;
if (!obs->enabled || observation_mask->enabled) {
continue;
}
err = _zbus_notify_observer(chan, obs, end_time);
_ZBUS_ASSERT(err == 0,
"could not deliver notification to observer %s. Error code %d",
_ZBUS_OBS_NAME(obs), err);
if (err) {
last_error = err;
}
}
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
/* Dynamic observer event dispatcher logic */
struct zbus_observer_node *obs_nd, *tmp;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
__ASSERT(obs_nd != NULL, "observer node is NULL");
_ZBUS_ASSERT(obs_nd != NULL, "observer node is NULL");
if (obs_nd->obs->enabled && (obs_nd->obs->queue != NULL)) {
err = k_msgq_put(obs_nd->obs->queue, &chan,
sys_timepoint_timeout(end_time));
const struct zbus_observer *obs = obs_nd->obs;
_ZBUS_ASSERT(err == 0,
"could not deliver notification to observer %s. Error code %d",
_ZBUS_OBS_NAME(obs_nd->obs), err);
if (!obs->enabled) {
continue;
}
if (err) {
last_error = err;
}
err = _zbus_notify_observer(chan, obs, end_time);
if (err) {
last_error = err;
}
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
return last_error;
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
static int _zbus_notify_observers(const struct zbus_channel *chan, k_timepoint_t end_time)
{
int last_error = 0, err;
/* Notify static listeners */
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
if ((*obs)->enabled && ((*obs)->callback != NULL)) {
(*obs)->callback(chan);
}
}
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
_zbus_notify_runtime_listeners(chan);
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
/* Notify static subscribers */
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
if ((*obs)->enabled && ((*obs)->queue != NULL)) {
err = k_msgq_put((*obs)->queue, &chan, sys_timepoint_timeout(end_time));
_ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.",
_ZBUS_OBS_NAME(*obs));
if (err) {
LOG_ERR("Observer %s at %p could not be notified. Error code %d",
_ZBUS_OBS_NAME(*obs), *obs, err);
last_error = err;
}
}
}
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
err = _zbus_notify_runtime_subscribers(chan, end_time);
if (err) {
last_error = err;
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
return last_error;
}
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
int err;
k_timepoint_t end_time = sys_timepoint_calc(timeout);
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required");
k_timepoint_t end_time = sys_timepoint_calc(timeout);
if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
return -ENOMSG;
}
err = k_mutex_lock(chan->mutex, timeout);
err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
}
memcpy(chan->message, msg, chan->message_size);
err = _zbus_notify_observers(chan, end_time);
err = _zbus_vded_exec(chan, end_time);
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return err;
}
@ -128,32 +155,33 @@ int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeo
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required");
err = k_mutex_lock(chan->mutex, timeout);
err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
}
memcpy(msg, chan->message, chan->message_size);
return k_mutex_unlock(chan->mutex);
return k_mutex_unlock(&chan->data->mutex);
}
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
{
int err;
k_timepoint_t end_time = sys_timepoint_calc(timeout);
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
err = k_mutex_lock(chan->mutex, timeout);
k_timepoint_t end_time = sys_timepoint_calc(timeout);
err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
}
err = _zbus_notify_observers(chan, end_time);
err = _zbus_vded_exec(chan, end_time);
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return err;
}
@ -163,7 +191,7 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
int err = k_mutex_lock(chan->mutex, timeout);
int err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
@ -177,7 +205,7 @@ int zbus_chan_finish(const struct zbus_channel *chan)
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
int err = k_mutex_unlock(chan->mutex);
int err = k_mutex_unlock(&chan->data->mutex);
return err;
}
@ -195,3 +223,51 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c
return k_msgq_get(sub->queue, chan, timeout);
}
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool masked)
{
_ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(chan != NULL, "chan is required");
struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask;
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
_ZBUS_ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) {
observation_mask->enabled = masked;
return 0;
}
}
return -ESRCH;
}
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
const struct zbus_channel *chan, bool *masked)
{
_ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(chan != NULL, "chan is required");
struct zbus_channel_observation *observation;
struct zbus_channel_observation_mask *observation_mask;
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
_ZBUS_ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) {
*masked = observation_mask->enabled;
return 0;
}
}
return -ESRCH;
}

View file

@ -1,4 +0,0 @@
#include <zephyr/linker/iterable_sections.h>
ITERABLE_SECTION_RAM(zbus_channel, 4)
ITERABLE_SECTION_RAM(zbus_observer, 4)

View file

@ -17,6 +17,17 @@ bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel
return true;
}
bool zbus_iterate_over_channels_with_user_data(
bool (*iterator_func)(const struct zbus_channel *chan, void *user_data), void *user_data)
{
STRUCT_SECTION_FOREACH(zbus_channel, chan) {
if (!(*iterator_func)(chan, user_data)) {
return false;
}
}
return true;
}
bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs))
{
STRUCT_SECTION_FOREACH(zbus_observer, obs) {
@ -26,3 +37,14 @@ bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observe
}
return true;
}
bool zbus_iterate_over_observers_with_user_data(
bool (*iterator_func)(const struct zbus_observer *obs, void *user_data), void *user_data)
{
STRUCT_SECTION_FOREACH(zbus_observer, obs) {
if (!(*iterator_func)(obs, user_data)) {
return false;
}
}
return true;
}

View file

@ -8,63 +8,59 @@
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_obs_pool, sizeof(struct zbus_observer_node),
CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE, 4);
struct k_mem_slab *zbus_runtime_obs_pool(void)
{
return &_zbus_runtime_obs_pool;
}
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout)
{
int err;
struct zbus_observer_node *obs_nd, *tmp;
k_timepoint_t end_time = sys_timepoint_calc(timeout);
struct zbus_channel_observation *observation;
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required");
/* Check if the observer is already a static observer */
for (const struct zbus_observer *const *static_obs = chan->observers; *static_obs != NULL;
++static_obs) {
if (*static_obs == obs) {
return -EEXIST;
}
}
err = k_mutex_lock(chan->mutex, timeout);
err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
}
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
i < limit; ++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
__ASSERT(observation != NULL, "observation must be not NULL");
if (observation->obs == obs) {
k_mutex_unlock(&chan->data->mutex);
return -EEXIST;
}
}
/* Check if the observer is already a runtime observer */
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) {
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return -EALREADY;
}
}
err = k_mem_slab_alloc(&_zbus_runtime_obs_pool, (void **)&obs_nd,
sys_timepoint_timeout(end_time));
struct zbus_observer_node *new_obs_nd = k_malloc(sizeof(struct zbus_observer_node));
if (err) {
LOG_ERR("Could not allocate memory on runtime observers pool\n");
if (new_obs_nd == NULL) {
LOG_ERR("Could not allocate observer node the heap is full!");
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return err;
return -ENOMEM;
}
obs_nd->obs = obs;
new_obs_nd->obs = obs;
sys_slist_append(chan->runtime_observers, &obs_nd->node);
sys_slist_append(&chan->data->observers, &new_obs_nd->node);
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return 0;
}
@ -80,19 +76,18 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required");
err = k_mutex_lock(chan->mutex, timeout);
err = k_mutex_lock(&chan->data->mutex, timeout);
if (err) {
return err;
}
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) {
sys_slist_remove(chan->runtime_observers, &prev_obs_nd->node,
&obs_nd->node);
sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node);
k_mem_slab_free(&_zbus_runtime_obs_pool, (void **)&obs_nd);
k_free(obs_nd);
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return 0;
}
@ -100,7 +95,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
prev_obs_nd = obs_nd;
}
k_mutex_unlock(chan->mutex);
k_mutex_unlock(&chan->data->mutex);
return -ENODATA;
}