c992707251
Replace mutexes with semaphores to protect the channels in conjunction with a priority boost algorithm based on the observers' priority. Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
102 lines
2.3 KiB
C
102 lines
2.3 KiB
C
/*
|
|
* Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
#include <zephyr/kernel.h>
|
|
#include <zephyr/logging/log.h>
|
|
#include <zephyr/zbus/zbus.h>
|
|
|
|
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
|
|
|
|
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
|
|
k_timeout_t timeout)
|
|
{
|
|
int err;
|
|
struct zbus_observer_node *obs_nd, *tmp;
|
|
struct zbus_channel_observation *observation;
|
|
|
|
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
|
|
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
|
_ZBUS_ASSERT(obs != NULL, "obs is required");
|
|
|
|
err = k_sem_take(&chan->data->sem, timeout);
|
|
if (err) {
|
|
return err;
|
|
}
|
|
|
|
for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
|
|
i < limit; ++i) {
|
|
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
|
|
|
|
__ASSERT(observation != NULL, "observation must be not NULL");
|
|
|
|
if (observation->obs == obs) {
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return -EEXIST;
|
|
}
|
|
}
|
|
|
|
/* Check if the observer is already a runtime observer */
|
|
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
|
|
if (obs_nd->obs == obs) {
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return -EALREADY;
|
|
}
|
|
}
|
|
|
|
struct zbus_observer_node *new_obs_nd = k_malloc(sizeof(struct zbus_observer_node));
|
|
|
|
if (new_obs_nd == NULL) {
|
|
LOG_ERR("Could not allocate observer node the heap is full!");
|
|
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return -ENOMEM;
|
|
}
|
|
|
|
new_obs_nd->obs = obs;
|
|
|
|
sys_slist_append(&chan->data->observers, &new_obs_nd->node);
|
|
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
|
|
k_timeout_t timeout)
|
|
{
|
|
int err;
|
|
struct zbus_observer_node *obs_nd, *tmp;
|
|
struct zbus_observer_node *prev_obs_nd = NULL;
|
|
|
|
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
|
|
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
|
_ZBUS_ASSERT(obs != NULL, "obs is required");
|
|
|
|
err = k_sem_take(&chan->data->sem, timeout);
|
|
if (err) {
|
|
return err;
|
|
}
|
|
|
|
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
|
|
if (obs_nd->obs == obs) {
|
|
sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node);
|
|
|
|
k_free(obs_nd);
|
|
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return 0;
|
|
}
|
|
|
|
prev_obs_nd = obs_nd;
|
|
}
|
|
|
|
k_sem_give(&chan->data->sem);
|
|
|
|
return -ENODATA;
|
|
}
|