/* * Copyright (c) 2022 Rodrigo Peixoto * SPDX-License-Identifier: Apache-2.0 */ #include #include #include #include #include #include LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); int _zbus_init(void) { const struct zbus_channel *curr = NULL; const struct zbus_channel *prev = NULL; STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) { curr = observation->chan; if (prev != curr) { if (prev == NULL) { curr->data->observers_start_idx = 0; curr->data->observers_end_idx = 0; } else { curr->data->observers_start_idx = prev->data->observers_end_idx; curr->data->observers_end_idx = prev->data->observers_end_idx; } prev = curr; } ++(curr->data->observers_end_idx); } STRUCT_SECTION_FOREACH(zbus_channel, chan) { k_mutex_init(&chan->data->mutex); #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) sys_slist_init(&chan->data->observers); #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ } return 0; } SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY); static inline int _zbus_notify_observer(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timepoint_t end_time) { int err = 0; if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) { obs->callback(chan); } else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) { err = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); } else { CODE_UNREACHABLE; } return err; } static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time) { int err = 0; int last_error = 0; _ZBUS_ASSERT(chan != NULL, "chan is required"); /* Static observer event dispatcher logic */ struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); const struct zbus_observer *obs = observation->obs; if (!obs->enabled || observation_mask->enabled) { continue; } err = _zbus_notify_observer(chan, obs, end_time); _ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s. Error code %d", _ZBUS_OBS_NAME(obs), err); if (err) { last_error = err; } } #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) /* Dynamic observer event dispatcher logic */ struct zbus_observer_node *obs_nd, *tmp; SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { _ZBUS_ASSERT(obs_nd != NULL, "observer node is NULL"); const struct zbus_observer *obs = obs_nd->obs; if (!obs->enabled) { continue; } err = _zbus_notify_observer(chan, obs, end_time); if (err) { last_error = err; } } #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ return last_error; } int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) { int err; _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); k_timepoint_t end_time = sys_timepoint_calc(timeout); if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { return -ENOMSG; } err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } memcpy(chan->message, msg, chan->message_size); err = _zbus_vded_exec(chan, end_time); k_mutex_unlock(&chan->data->mutex); return err; } int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) { int err; _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } memcpy(msg, chan->message, chan->message_size); return k_mutex_unlock(&chan->data->mutex); } int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) { int err; _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); k_timepoint_t end_time = sys_timepoint_calc(timeout); err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } err = _zbus_vded_exec(chan, end_time); k_mutex_unlock(&chan->data->mutex); return err; } int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) { _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); int err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } return 0; } int zbus_chan_finish(const struct zbus_channel *chan) { _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); int err = k_mutex_unlock(&chan->data->mutex); return err; } int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout) { _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); if (sub->queue == NULL) { return -EINVAL; } return k_msgq_get(sub->queue, chan, timeout); } int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, const struct zbus_channel *chan, bool masked) { _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); if (observation->obs == obs) { observation_mask->enabled = masked; return 0; } } return -ESRCH; } int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, const struct zbus_channel *chan, bool *masked) { _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); if (observation->obs == obs) { *masked = observation_mask->enabled; return 0; } } return -ESRCH; }