lib: os: mpsc_pbuf: Use flag for buffer full indication

Use flag instead of word in the buffer. Using this method allows
to dedicate full buffer capacity for data.

Signed-off-by: Krzysztof Chruscinski <krzysztof.chruscinski@nordicsemi.no>
This commit is contained in:
Krzysztof Chruscinski 2021-09-16 07:27:57 +02:00 committed by Carles Cufí
parent e46efdcd94
commit 235ee63233
3 changed files with 75 additions and 38 deletions

View file

@ -61,6 +61,9 @@ extern "C" {
/** @brief Flag indicating that maximum buffer usage is tracked. */
#define MPSC_PBUF_MAX_UTILIZATION BIT(2)
/** @brief Flag indicated that buffer is currently full. */
#define MPSC_PBUF_FULL BIT(3)
/**@} */
/* Forward declaration */

View file

@ -47,33 +47,50 @@ void mpsc_pbuf_init(struct mpsc_pbuf_buffer *buffer,
ARG_UNUSED(err);
}
/* Calculate free space available or till end of buffer.
*
* @param buffer Buffer.
* @param[out] res Destination where free space is written.
*
* @retval true when space was calculated until end of buffer (and there might
* be more space available after wrapping.
* @retval false When result is total free space.
*/
static inline bool free_space(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
{
if (buffer->rd_idx > buffer->tmp_wr_idx) {
*res = buffer->rd_idx - buffer->tmp_wr_idx - 1;
return false;
} else if (!buffer->rd_idx) {
*res = buffer->size - buffer->tmp_wr_idx - 1;
if (buffer->flags & MPSC_PBUF_FULL) {
*res = 0;
return false;
}
if (buffer->rd_idx > buffer->tmp_wr_idx) {
*res = buffer->rd_idx - buffer->tmp_wr_idx;
return false;
}
*res = buffer->size - buffer->tmp_wr_idx;
return true;
}
/* Get amount of valid data.
*
* @param buffer Buffer.
* @param[out] res Destination where available space is written.
*
* @retval true when space was calculated until end of buffer (and there might
* be more space available after wrapping.
* @retval false When result is total free space.
*/
static inline bool available(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
{
if (buffer->tmp_rd_idx <= buffer->wr_idx) {
*res = (buffer->wr_idx - buffer->tmp_rd_idx);
return false;
if (buffer->flags & MPSC_PBUF_FULL || buffer->tmp_rd_idx > buffer->wr_idx) {
*res = buffer->size - buffer->tmp_rd_idx;
return true;
}
*res = buffer->size - buffer->tmp_rd_idx;
*res = (buffer->wr_idx - buffer->tmp_rd_idx);
return true;
return false;
}
static inline uint32_t get_usage(struct mpsc_pbuf_buffer *buffer)
@ -127,6 +144,21 @@ static inline uint32_t get_skip(union mpsc_pbuf_generic *item)
return 0;
}
static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
{
buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen);
if (buffer->tmp_wr_idx == buffer->rd_idx) {
buffer->flags |= MPSC_PBUF_FULL;
}
}
static void rd_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
{
buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen);
buffer->flags &= ~MPSC_PBUF_FULL;
}
static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
{
union mpsc_pbuf_generic skip = {
@ -134,7 +166,7 @@ static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
};
buffer->buf[buffer->tmp_wr_idx] = skip.raw;
buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen);
tmp_wr_idx_inc(buffer, wlen);
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
}
@ -160,13 +192,17 @@ static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer
rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item);
if (skip_wlen) {
MPSC_PBUF_DBG(NULL, "Skip packet found (len: %u)", skip_wlen);
allow_drop = true;
} else if (allow_drop) {
if (item->hdr.busy) {
MPSC_PBUF_DBG(NULL, "Busy user packet found");
/* item is currently processed and cannot be overwritten. */
add_skip_item(buffer, free_wlen + 1);
if (free_wlen) {
add_skip_item(buffer, free_wlen);
}
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen);
buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, rd_wlen);
tmp_wr_idx_inc(buffer, rd_wlen);
/* Get next itme followed the busy one. */
uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
@ -180,6 +216,7 @@ static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer
*user_packet = true;
}
} else {
MPSC_PBUF_DBG(NULL, "User packet to drop (len %u)", rd_wlen);
*user_packet = true;
}
} else {
@ -187,8 +224,9 @@ static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer
}
if (allow_drop) {
buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
rd_idx_inc(buffer, rd_wlen);
buffer->tmp_rd_idx = buffer->rd_idx;
MPSC_PBUF_DBG(buffer, "Incremented rd indexes after drop");
}
return item;
@ -207,10 +245,12 @@ void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
cont = false;
key = k_spin_lock(&buffer->lock);
(void)free_space(buffer, &free_wlen);
MPSC_PBUF_DBG(buffer, "put_word (%d free space)", (int)free_wlen);
if (free_wlen) {
buffer->buf[buffer->tmp_wr_idx] = item.raw;
buffer->tmp_wr_idx = idx_inc(buffer,
buffer->tmp_wr_idx, 1);
tmp_wr_idx_inc(buffer, 1);
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1);
max_utilization_update(buffer);
} else {
@ -244,8 +284,8 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
MPSC_PBUF_DBG(buffer, "alloc %d words", (int)wlen);
if (wlen > (buffer->size - 1)) {
MPSC_PBUF_DBG(buffer, "Failed to alloc, ");
if (wlen > (buffer->size)) {
MPSC_PBUF_DBG(buffer, "Failed to alloc");
return NULL;
}
@ -262,8 +302,7 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
(union mpsc_pbuf_generic *)&buffer->buf[buffer->tmp_wr_idx];
item->hdr.valid = 0;
item->hdr.busy = 0;
buffer->tmp_wr_idx = idx_inc(buffer,
buffer->tmp_wr_idx, wlen);
tmp_wr_idx_inc(buffer, wlen);
} else if (wrap) {
add_skip_item(buffer, free_wlen);
cont = true;
@ -345,8 +384,7 @@ void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer,
(void **)&buffer->buf[buffer->tmp_wr_idx + 1];
*p = (void *)data;
buffer->tmp_wr_idx =
idx_inc(buffer, buffer->tmp_wr_idx, l);
tmp_wr_idx_inc(buffer, l);
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l);
max_utilization_update(buffer);
} else if (wrap) {
@ -391,9 +429,8 @@ void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data,
if (free_wlen >= wlen) {
memcpy(&buffer->buf[buffer->tmp_wr_idx], data,
wlen * sizeof(uint32_t));
buffer->tmp_wr_idx =
idx_inc(buffer, buffer->tmp_wr_idx, wlen);
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
tmp_wr_idx_inc(buffer, wlen);
max_utilization_update(buffer);
} else if (wrap) {
add_skip_item(buffer, free_wlen);
@ -444,8 +481,7 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
buffer->tmp_rd_idx =
idx_inc(buffer, buffer->tmp_rd_idx, inc);
buffer->rd_idx =
idx_inc(buffer, buffer->rd_idx, inc);
rd_idx_inc(buffer, inc);
cont = true;
} else {
item->hdr.busy = 1;
@ -475,8 +511,9 @@ void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer,
if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) ||
((uint32_t *)item == &buffer->buf[buffer->rd_idx])) {
witem->hdr.busy = 0;
buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen);
rd_idx_inc(buffer, wlen);
} else {
MPSC_PBUF_DBG(buffer, "Allocation occurred during claim");
witem->skip.len = wlen;
}
MPSC_PBUF_DBG(buffer, "freed: %p", item);

View file

@ -566,16 +566,14 @@ ZTEST(test_log_api, test_log_from_declared_module)
* adding new message will lead to one message drop, otherwise 2 message will
* be dropped.
*/
static size_t get_short_msg_capacity(bool *remainder)
static size_t get_short_msg_capacity(void)
{
*remainder = (CONFIG_LOG_BUFFER_SIZE % LOG_SIMPLE_MSG_LEN) ?
true : false;
return (CONFIG_LOG_BUFFER_SIZE - sizeof(int)) / LOG_SIMPLE_MSG_LEN;
return CONFIG_LOG_BUFFER_SIZE / LOG_SIMPLE_MSG_LEN;
}
static void log_n_messages(uint32_t n_msg, uint32_t exp_dropped)
{
printk("ex dropped:%d\n", exp_dropped);
log_timestamp_t exp_timestamp = TIMESTAMP_INIT_VAL;
log_setup(false);
@ -626,14 +624,13 @@ ZTEST(test_log_api_1cpu, test_log_msg_dropped_notification)
ztest_test_skip();
}
bool remainder;
uint32_t capacity = get_short_msg_capacity(&remainder);
uint32_t capacity = get_short_msg_capacity();
log_n_messages(capacity, 0);
/* Expect messages dropped when logger more than buffer capacity. */
log_n_messages(capacity + 1, 1 + (remainder ? 1 : 0));
log_n_messages(capacity + 2, 2 + (remainder ? 1 : 0));
log_n_messages(capacity + 1, 1);
log_n_messages(capacity + 2, 2);
}
/* Test checks if panic is correctly executed. On panic logger should flush all