samples: zbus: add confirmed channel sample

This commit adds a new sample to zbus that shows how to implement a
confirmed channel. Developers commonly asked about similar features on
zbus discord channel and RFC (in the past). Adding this sample, we can
show developers how to solve their needs (confirmed channels) without
zbus changes.

Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
Rodrigo Peixoto 2023-06-08 10:56:30 -03:00 committed by Carles Cufí
parent c8a9e445bb
commit a658eeecd2
5 changed files with 219 additions and 0 deletions

View file

@ -0,0 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(confirmed_channel)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

View file

@ -0,0 +1,46 @@
.. _zbus-confirmed-channel-sample:
Confirmed channel sample
########################
Overview
********
This sample implements a simple way of using confirmed channels in zbus.
The confirmed channel can only be published when all the subscribers consume the message.
Building and Running
********************
This project outputs to the console. It can be built and executed
on QEMU as follows:
.. zephyr-app-commands::
:zephyr-app: samples/subsys/zbus/confirmed_channel
:host-os: unix
:board: qemu_x86
:goals: run
Sample Output
=============
.. code-block:: console
I: From listener -> Confirmed message payload = 0
I: From bar_sub2 subscriber -> Confirmed message payload = 0
I: From bar_sub1 subscriber -> Confirmed message payload = 0
I: From bar_sub3 subscriber -> Confirmed message payload = 0
I: From listener -> Confirmed message payload = 1
I: From bar_sub2 subscriber -> Confirmed message payload = 1
I: From bar_sub1 subscriber -> Confirmed message payload = 1
I: From bar_sub3 subscriber -> Confirmed message payload = 1
I: From listener -> Confirmed message payload = 2
I: From bar_sub2 subscriber -> Confirmed message payload = 2
I: From bar_sub1 subscriber -> Confirmed message payload = 2
I: From bar_sub3 subscriber -> Confirmed message payload = 2
I: From listener -> Confirmed message payload = 3
I: From bar_sub2 subscriber -> Confirmed message payload = 3
I: From bar_sub1 subscriber -> Confirmed message payload = 3
I: From bar_sub3 subscriber -> Confirmed message payload = 3
<continues>
Exit QEMU by pressing :kbd:`CTRL+A` :kbd:`x`.

View file

@ -0,0 +1,6 @@
CONFIG_LOG=y
CONFIG_LOG_MODE_MINIMAL=y
CONFIG_BOOT_BANNER=n
CONFIG_MAIN_THREAD_PRIORITY=5
CONFIG_ZBUS=y
CONFIG_ZBUS_LOG_LEVEL_INF=y

View file

@ -0,0 +1,23 @@
sample:
name: Confirmed message
tests:
sample.zbus.confirmed_message:
harness: console
harness_config:
type: multi_line
ordered: false
regex:
- "I: From listener -> Confirmed message payload = 0"
- "I: From bar_sub1 subscriber -> Confirmed message payload = 0"
- "I: From bar_sub2 subscriber -> Confirmed message payload = 0"
- "I: From bar_sub3 subscriber -> Confirmed message payload = 0"
- "I: From listener -> Confirmed message payload = 1"
- "I: From bar_sub1 subscriber -> Confirmed message payload = 1"
- "I: From bar_sub2 subscriber -> Confirmed message payload = 1"
- "I: From bar_sub3 subscriber -> Confirmed message payload = 1"
- "I: From listener -> Confirmed message payload = 2"
- "I: From bar_sub1 subscriber -> Confirmed message payload = 2"
- "I: From bar_sub2 subscriber -> Confirmed message payload = 2"
- "I: From bar_sub3 subscriber -> Confirmed message payload = 2"
platform_exclude: qemu_leon3
tags: zbus

View file

@ -0,0 +1,136 @@
/*
* Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
#include <zephyr/zbus/zbus.h>
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
static atomic_t sub_count = ATOMIC_INIT(0);
struct confirmed_msg {
uint32_t payload;
};
ZBUS_CHAN_DEFINE(confirmed_chan, /* Name */
struct confirmed_msg, /* Message type */
NULL, /* Validator */
&sub_count, /* User data */
ZBUS_OBSERVERS(foo_lis, bar_sub1, bar_sub2, bar_sub3), /* observers */
ZBUS_MSG_INIT(.payload = 0) /* Initial value */
);
static void listener_callback_example(const struct zbus_channel *chan)
{
const struct confirmed_msg *cm = zbus_chan_const_msg(chan);
LOG_INF("From listener -> Confirmed message payload = %u", cm->payload);
}
ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4);
static void bar_sub1_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&bar_sub1, &chan, K_FOREVER)) {
struct confirmed_msg cm;
if (&confirmed_chan != chan) {
continue;
}
zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
k_msleep(2500);
atomic_dec(zbus_chan_user_data(&confirmed_chan));
LOG_INF("From bar_sub1 subscriber -> Confirmed "
"message payload = "
"%u",
cm.payload);
}
}
K_THREAD_DEFINE(bar_sub1_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub1_task, NULL, NULL, NULL, 3, 0, 0);
ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4);
static void bar_sub2_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&bar_sub2, &chan, K_FOREVER)) {
struct confirmed_msg cm;
if (&confirmed_chan != chan) {
continue;
}
zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
k_msleep(1000);
atomic_dec(zbus_chan_user_data(&confirmed_chan));
LOG_INF("From bar_sub2 subscriber -> Confirmed "
"message payload = "
"%u",
cm.payload);
}
}
K_THREAD_DEFINE(bar_sub2_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub2_task, NULL, NULL, NULL, 3, 0, 0);
ZBUS_SUBSCRIBER_DEFINE(bar_sub3, 4);
static void bar_sub3_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&bar_sub3, &chan, K_FOREVER)) {
struct confirmed_msg cm;
if (&confirmed_chan != chan) {
continue;
}
zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
k_msleep(5000);
atomic_dec(zbus_chan_user_data(&confirmed_chan));
LOG_INF("From bar_sub3 subscriber -> Confirmed "
"message payload = "
"%u",
cm.payload);
}
}
K_THREAD_DEFINE(bar_sub3_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub3_task, NULL, NULL, NULL, 3, 0, 0);
static void pub_to_confirmed_channel(struct confirmed_msg *cm)
{
/* Wait for channel be consumed */
while (atomic_get(zbus_chan_user_data(&confirmed_chan)) > 0) {
k_msleep(100);
}
/* Set the number of subscribers to consume the channel */
atomic_set(zbus_chan_user_data(&confirmed_chan), 3);
zbus_chan_pub(&confirmed_chan, cm, K_MSEC(500));
}
int main(void)
{
struct confirmed_msg cm = {0};
while (1) {
pub_to_confirmed_channel(&cm);
++cm.payload;
}
return 0;
}