zbus: Add Message subscriber
Besides the changed channel reference, the message subscribers receive a copy of the message during the VDED execution. ZBus guarantees message delivery for `MSG_SUBSCRIBERS`. Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
parent
a49720ab08
commit
0b0aa435af
|
@ -92,7 +92,8 @@ struct zbus_channel {
|
|||
*/
|
||||
enum __packed zbus_observer_type {
|
||||
ZBUS_OBSERVER_LISTENER_TYPE,
|
||||
ZBUS_OBSERVER_SUBSCRIBER_TYPE
|
||||
ZBUS_OBSERVER_SUBSCRIBER_TYPE,
|
||||
ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -127,6 +128,13 @@ struct zbus_observer {
|
|||
|
||||
/** Observer callback function. It turns the observer into a listener. */
|
||||
void (*const callback)(const struct zbus_channel *chan);
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__)
|
||||
/** Observer message FIFO. It turns the observer into a message subscriber. It only
|
||||
* exists if the @kconfig{CONFIG_ZBUS_MSG_SUBSCRIBER} is enabled.
|
||||
*/
|
||||
struct k_fifo *const message_fifo;
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
};
|
||||
};
|
||||
|
||||
|
@ -156,8 +164,10 @@ struct zbus_channel_observation {
|
|||
|
||||
#if defined(CONFIG_ZBUS_CHANNEL_NAME)
|
||||
#define ZBUS_CHANNEL_NAME_INIT(_name) .name = #_name,
|
||||
#define _ZBUS_CHAN_NAME(_chan) (_chan)->name
|
||||
#else
|
||||
#define ZBUS_CHANNEL_NAME_INIT(_name)
|
||||
#define _ZBUS_CHAN_NAME(_chan) ""
|
||||
#endif
|
||||
|
||||
#if defined(CONFIG_ZBUS_OBSERVER_NAME)
|
||||
|
@ -379,6 +389,37 @@ k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks);
|
|||
*/
|
||||
#define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true)
|
||||
|
||||
/**
|
||||
* @brief Define and initialize a message subscriber.
|
||||
*
|
||||
* This macro defines an observer of @ref ZBUS_OBSERVER_SUBSCRIBER_TYPE type. It defines a FIFO
|
||||
* where the subscriber will receive the message asynchronously and initialize the @ref
|
||||
* zbus_observer defining the subscriber.
|
||||
*
|
||||
* @param[in] _name The subscriber's name.
|
||||
* @param[in] _enable The subscriber's initial state.
|
||||
*/
|
||||
#define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \
|
||||
static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \
|
||||
STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
|
||||
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
|
||||
.type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \
|
||||
.enabled = _enable, \
|
||||
.message_fifo = &_zbus_observer_fifo_##_name, \
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Define and initialize an enabled message subscriber.
|
||||
*
|
||||
* This macro defines an observer of message subscriber type. It defines a FIFO where the
|
||||
* subscriber will receive the message asynchronously and initialize the @ref
|
||||
* zbus_observer defining the subscriber. The message subscribers are defined in the enabled state
|
||||
* with this macro.
|
||||
|
||||
*
|
||||
* @param[in] _name The subscriber's name.
|
||||
*/
|
||||
#define ZBUS_MSG_SUBSCRIBER_DEFINE(_name) ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, true)
|
||||
/**
|
||||
*
|
||||
* @brief Publish to a channel
|
||||
|
@ -741,6 +782,31 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs)
|
|||
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
|
||||
k_timeout_t timeout);
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__)
|
||||
|
||||
/**
|
||||
* @brief Wait for a channel message.
|
||||
*
|
||||
* This routine makes the subscriber wait for the new message in case of channel publication.
|
||||
*
|
||||
* @param[in] sub The subscriber's reference.
|
||||
* @param[out] chan The notification channel's reference.
|
||||
* @param[out] msg A reference to a copy of the published message.
|
||||
* @param[in] timeout Waiting period for a notification arrival,
|
||||
* or one of the special values, K_NO_WAIT and K_FOREVER.
|
||||
*
|
||||
* @retval 0 Message received.
|
||||
* @retval -EINVAL The observer is not a subscriber.
|
||||
* @retval -ENOMSG Could not retrieve the net_buf from the subscriber FIFO.
|
||||
* @retval -EILSEQ Received an invalid channel reference.
|
||||
* @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The
|
||||
* function only returns this value when the @kconfig{CONFIG_ZBUS_ASSERT_MOCK} is enabled.
|
||||
*/
|
||||
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
|
||||
k_timeout_t timeout);
|
||||
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
|
||||
/**
|
||||
*
|
||||
* @brief Iterate over channels.
|
||||
|
|
|
@ -18,6 +18,36 @@ config ZBUS_CHANNEL_NAME
|
|||
config ZBUS_OBSERVER_NAME
|
||||
bool "Observer name field"
|
||||
|
||||
config ZBUS_MSG_SUBSCRIBER
|
||||
select NET_BUF
|
||||
bool "Message subscribers will receive all messages in sequence."
|
||||
|
||||
if ZBUS_MSG_SUBSCRIBER
|
||||
|
||||
choice
|
||||
prompt "ZBus msg_subscribers buffer allocation"
|
||||
|
||||
config ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC
|
||||
bool "Use heap to allocate msg_subscriber buffers data"
|
||||
|
||||
config ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC
|
||||
bool "Use fixed data size for msg_subscriber buffers pool"
|
||||
|
||||
endchoice
|
||||
|
||||
config ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE
|
||||
default 16
|
||||
int "The count of net_buf available to be used simutaneously."
|
||||
|
||||
if ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC
|
||||
|
||||
config ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE
|
||||
int "The size of the biggest message used with ZBus."
|
||||
|
||||
endif # ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC
|
||||
|
||||
endif # ZBUS_MSG_SUBSCRIBER
|
||||
|
||||
config ZBUS_RUNTIME_OBSERVERS
|
||||
bool "Runtime observers support."
|
||||
default n
|
||||
|
|
|
@ -8,11 +8,47 @@
|
|||
#include <zephyr/sys/iterable_sections.h>
|
||||
#include <zephyr/logging/log.h>
|
||||
#include <zephyr/sys/printk.h>
|
||||
#include <zephyr/net/buf.h>
|
||||
#include <zephyr/zbus/zbus.h>
|
||||
LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC)
|
||||
|
||||
NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
|
||||
sizeof(struct zbus_channel *), NULL);
|
||||
BUILD_ASSERT(CONFIG_HEAP_MEM_POOL_SIZE > 0, "MSG_SUBSCRIBER feature requires heap memory pool.");
|
||||
|
||||
static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
return net_buf_alloc_len(&_zbus_msg_subscribers_pool, size, timeout);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool,
|
||||
(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
|
||||
(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE),
|
||||
sizeof(struct zbus_channel *), NULL);
|
||||
|
||||
static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
__ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE,
|
||||
"CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to "
|
||||
"%d",
|
||||
(int)size);
|
||||
return net_buf_alloc(&_zbus_msg_subscribers_pool, timeout);
|
||||
}
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC */
|
||||
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
|
||||
int _zbus_init(void)
|
||||
{
|
||||
|
||||
const struct zbus_channel *curr = NULL;
|
||||
const struct zbus_channel *prev = NULL;
|
||||
|
||||
|
@ -44,32 +80,62 @@ int _zbus_init(void)
|
|||
SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);
|
||||
|
||||
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
|
||||
const struct zbus_observer *obs, k_timepoint_t end_time)
|
||||
const struct zbus_observer *obs, k_timepoint_t end_time,
|
||||
struct net_buf *buf)
|
||||
{
|
||||
int err = 0;
|
||||
|
||||
if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) {
|
||||
switch (obs->type) {
|
||||
case ZBUS_OBSERVER_LISTENER_TYPE: {
|
||||
obs->callback(chan);
|
||||
|
||||
} else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) {
|
||||
err = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
|
||||
} else {
|
||||
CODE_UNREACHABLE;
|
||||
break;
|
||||
}
|
||||
return err;
|
||||
case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
|
||||
return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
|
||||
}
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
|
||||
case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
|
||||
struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));
|
||||
|
||||
if (cloned_buf == NULL) {
|
||||
return -ENOMEM;
|
||||
}
|
||||
memcpy(net_buf_user_data(cloned_buf), &chan, sizeof(struct zbus_channel *));
|
||||
|
||||
net_buf_put(obs->message_fifo, cloned_buf);
|
||||
|
||||
break;
|
||||
}
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
|
||||
default:
|
||||
_ZBUS_ASSERT(false, "Unreachable");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
|
||||
{
|
||||
int err = 0;
|
||||
int last_error = 0;
|
||||
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
struct net_buf *buf = NULL;
|
||||
|
||||
/* Static observer event dispatcher logic */
|
||||
struct zbus_channel_observation *observation;
|
||||
struct zbus_channel_observation_mask *observation_mask;
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
|
||||
buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan),
|
||||
sys_timepoint_timeout(end_time));
|
||||
|
||||
_ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
|
||||
"unavailable or heap is full");
|
||||
|
||||
net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
|
||||
LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan));
|
||||
|
||||
int __maybe_unused index = 0;
|
||||
|
||||
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
|
||||
i < limit; ++i) {
|
||||
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
|
||||
|
@ -83,15 +149,21 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
|
|||
continue;
|
||||
}
|
||||
|
||||
err = _zbus_notify_observer(chan, obs, end_time);
|
||||
|
||||
_ZBUS_ASSERT(err == 0,
|
||||
"could not deliver notification to observer %s. Error code %d",
|
||||
_ZBUS_OBS_NAME(obs), err);
|
||||
err = _zbus_notify_observer(chan, obs, end_time, buf);
|
||||
|
||||
if (err) {
|
||||
last_error = err;
|
||||
LOG_ERR("could not deliver notification to observer %s. Error code %d",
|
||||
_ZBUS_OBS_NAME(obs), err);
|
||||
if (err == -ENOMEM) {
|
||||
if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) {
|
||||
net_buf_unref(buf);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs));
|
||||
}
|
||||
|
||||
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
|
||||
|
@ -100,15 +172,13 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
|
|||
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
|
||||
|
||||
_ZBUS_ASSERT(obs_nd != NULL, "observer node is NULL");
|
||||
|
||||
const struct zbus_observer *obs = obs_nd->obs;
|
||||
|
||||
if (!obs->enabled) {
|
||||
continue;
|
||||
}
|
||||
|
||||
err = _zbus_notify_observer(chan, obs, end_time);
|
||||
err = _zbus_notify_observer(chan, obs, end_time, buf);
|
||||
|
||||
if (err) {
|
||||
last_error = err;
|
||||
|
@ -116,6 +186,8 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
|
|||
}
|
||||
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
|
||||
|
||||
IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);))
|
||||
|
||||
return last_error;
|
||||
}
|
||||
|
||||
|
@ -215,15 +287,43 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c
|
|||
{
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(sub != NULL, "sub is required");
|
||||
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
|
||||
_ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
if (sub->queue == NULL) {
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
return k_msgq_get(sub->queue, chan, timeout);
|
||||
}
|
||||
|
||||
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
|
||||
|
||||
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus subscribers cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(sub != NULL, "sub is required");
|
||||
_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
|
||||
"sub must be a MSG_SUBSCRIBER");
|
||||
_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
_ZBUS_ASSERT(msg != NULL, "msg is required");
|
||||
|
||||
struct net_buf *buf = net_buf_get(sub->message_fifo, timeout);
|
||||
|
||||
if (buf == NULL) {
|
||||
return -ENOMSG;
|
||||
}
|
||||
|
||||
*chan = *((struct zbus_channel **)net_buf_user_data(buf));
|
||||
|
||||
memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan));
|
||||
|
||||
net_buf_unref(buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
|
||||
|
||||
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
|
||||
const struct zbus_channel *chan, bool masked)
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue