lib: os: mpsc_pbuf: Fix concurrency issues
Fixed issues which were leading to failures when producing and consuming is preempted at various stages. Signed-off-by: Krzysztof Chruscinski <krzysztof.chruscinski@nordicsemi.no>
This commit is contained in:
parent
d6442ac9e1
commit
accaebb708
|
@ -124,7 +124,7 @@ static inline bool is_invalid(union mpsc_pbuf_generic *item)
|
|||
}
|
||||
|
||||
static inline uint32_t idx_inc(struct mpsc_pbuf_buffer *buffer,
|
||||
uint32_t idx, uint32_t val)
|
||||
uint32_t idx, int32_t val)
|
||||
{
|
||||
uint32_t i = idx + val;
|
||||
|
||||
|
@ -145,7 +145,7 @@ static inline uint32_t get_skip(union mpsc_pbuf_generic *item)
|
|||
}
|
||||
|
||||
|
||||
static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
|
||||
static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, int32_t wlen)
|
||||
{
|
||||
buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen);
|
||||
if (buffer->tmp_wr_idx == buffer->rd_idx) {
|
||||
|
@ -153,7 +153,7 @@ static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32
|
|||
}
|
||||
}
|
||||
|
||||
static void rd_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
|
||||
static void rd_idx_inc(struct mpsc_pbuf_buffer *buffer, int32_t wlen)
|
||||
{
|
||||
buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen);
|
||||
buffer->flags &= ~MPSC_PBUF_FULL;
|
||||
|
@ -170,66 +170,119 @@ static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
|
|||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
|
||||
}
|
||||
|
||||
/* Attempts to drop a packet. If user packets dropping is allowed then any
|
||||
* type of packet is dropped. Otherwise only skip packets (internal padding).
|
||||
*
|
||||
* If user packet was dropped @p user_packet is set to true. Function returns
|
||||
* a pointer to a dropped packet or null if nothing was dropped. It may point
|
||||
* to user packet (@p user_packet set to true) or internal, skip packet.
|
||||
*/
|
||||
static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer,
|
||||
uint32_t free_wlen,
|
||||
bool allow_drop,
|
||||
bool *user_packet)
|
||||
static bool drop_item_locked(struct mpsc_pbuf_buffer *buffer,
|
||||
uint32_t free_wlen,
|
||||
union mpsc_pbuf_generic **item_to_drop,
|
||||
uint32_t *tmp_wr_idx_shift)
|
||||
{
|
||||
union mpsc_pbuf_generic *item;
|
||||
uint32_t rd_wlen;
|
||||
uint32_t skip_wlen;
|
||||
|
||||
*user_packet = false;
|
||||
item = (union mpsc_pbuf_generic *)&buffer->buf[buffer->rd_idx];
|
||||
skip_wlen = get_skip(item);
|
||||
*item_to_drop = NULL;
|
||||
*tmp_wr_idx_shift = 0;
|
||||
|
||||
rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item);
|
||||
if (skip_wlen) {
|
||||
MPSC_PBUF_DBG(NULL, "Skip packet found (len: %u)", skip_wlen);
|
||||
allow_drop = true;
|
||||
} else if (allow_drop) {
|
||||
if (item->hdr.busy) {
|
||||
MPSC_PBUF_DBG(NULL, "Busy user packet found");
|
||||
/* item is currently processed and cannot be overwritten. */
|
||||
if (free_wlen) {
|
||||
add_skip_item(buffer, free_wlen);
|
||||
}
|
||||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen);
|
||||
tmp_wr_idx_inc(buffer, rd_wlen);
|
||||
/* Skip packet found, can be dropped to free some space */
|
||||
MPSC_PBUF_DBG(buffer, "no space: Found skip packet %d len", skip_wlen);
|
||||
|
||||
/* Get next itme followed the busy one. */
|
||||
uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
|
||||
|
||||
item = (union mpsc_pbuf_generic *)&buffer->buf[next_rd_idx];
|
||||
skip_wlen = get_skip(item);
|
||||
if (skip_wlen) {
|
||||
rd_wlen += skip_wlen;
|
||||
} else {
|
||||
rd_wlen += buffer->get_wlen(item);
|
||||
*user_packet = true;
|
||||
}
|
||||
} else {
|
||||
MPSC_PBUF_DBG(NULL, "User packet to drop (len %u)", rd_wlen);
|
||||
*user_packet = true;
|
||||
}
|
||||
} else {
|
||||
item = NULL;
|
||||
rd_idx_inc(buffer, skip_wlen);
|
||||
buffer->tmp_rd_idx = buffer->rd_idx;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (allow_drop) {
|
||||
/* Other options for dropping available only in overwrite mode. */
|
||||
if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
uint32_t rd_wlen = buffer->get_wlen(item);
|
||||
|
||||
/* If packet is busy need to be ommited. */
|
||||
if (!is_valid(item)) {
|
||||
return false;
|
||||
} else if (item->hdr.busy) {
|
||||
MPSC_PBUF_DBG(buffer, "no space: Found busy packet %p (len:%d)", item, rd_wlen);
|
||||
/* Add skip packet before claimed packet. */
|
||||
if (free_wlen) {
|
||||
add_skip_item(buffer, free_wlen);
|
||||
MPSC_PBUF_DBG(buffer, "no space: Added skip packet (len:%d)", free_wlen);
|
||||
}
|
||||
/* Move all indexes forward, after claimed packet. */
|
||||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen);
|
||||
|
||||
/* If allocation wrapped around the buffer and found busy packet
|
||||
* that was already ommited, skip it again.
|
||||
*/
|
||||
if (buffer->rd_idx == buffer->tmp_rd_idx) {
|
||||
buffer->tmp_rd_idx = idx_inc(buffer, buffer->tmp_rd_idx, rd_wlen);
|
||||
}
|
||||
|
||||
buffer->tmp_wr_idx = buffer->tmp_rd_idx;
|
||||
buffer->rd_idx = buffer->tmp_rd_idx;
|
||||
buffer->flags |= MPSC_PBUF_FULL;
|
||||
} else {
|
||||
/* Prepare packet dropping. */
|
||||
rd_idx_inc(buffer, rd_wlen);
|
||||
buffer->tmp_rd_idx = buffer->rd_idx;
|
||||
MPSC_PBUF_DBG(buffer, "Incremented rd indexes after drop");
|
||||
/* Temporary move tmp_wr idx forward to ensure that packet
|
||||
* will not be dropped twice and content will not be
|
||||
* overwritten.
|
||||
*/
|
||||
if (free_wlen) {
|
||||
/* Free location mark as invalid to prevent
|
||||
* reading incomplete data.
|
||||
*/
|
||||
union mpsc_pbuf_generic invalid = {
|
||||
.hdr = {
|
||||
.valid = 0,
|
||||
.busy = 0
|
||||
}
|
||||
};
|
||||
|
||||
buffer->buf[buffer->tmp_wr_idx] = invalid.raw;
|
||||
}
|
||||
|
||||
*tmp_wr_idx_shift = rd_wlen + free_wlen;
|
||||
buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, *tmp_wr_idx_shift);
|
||||
buffer->flags |= MPSC_PBUF_FULL;
|
||||
item->hdr.valid = 0;
|
||||
*item_to_drop = item;
|
||||
MPSC_PBUF_DBG(buffer, "no space: dropping packet %p (len: %d)",
|
||||
item, rd_wlen);
|
||||
}
|
||||
|
||||
return item;
|
||||
return true;
|
||||
}
|
||||
|
||||
static void post_drop_action(struct mpsc_pbuf_buffer *buffer,
|
||||
uint32_t prev_tmp_wr_idx,
|
||||
uint32_t tmp_wr_idx_shift)
|
||||
{
|
||||
uint32_t cmp_tmp_wr_idx = idx_inc(buffer, prev_tmp_wr_idx, tmp_wr_idx_shift);
|
||||
|
||||
if (cmp_tmp_wr_idx == buffer->tmp_wr_idx) {
|
||||
/* Operation not interrupted by another alloc. */
|
||||
buffer->tmp_wr_idx = prev_tmp_wr_idx;
|
||||
buffer->flags &= ~MPSC_PBUF_FULL;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Operation interrupted, mark area as to be skipped. */
|
||||
union mpsc_pbuf_generic skip = {
|
||||
.skip = {
|
||||
.valid = 0,
|
||||
.busy = 1,
|
||||
.len = tmp_wr_idx_shift
|
||||
}
|
||||
};
|
||||
|
||||
buffer->buf[prev_tmp_wr_idx] = skip.raw;
|
||||
buffer->wr_idx = idx_inc(buffer,
|
||||
buffer->wr_idx,
|
||||
tmp_wr_idx_shift);
|
||||
/* full flag? */
|
||||
}
|
||||
|
||||
void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
|
||||
|
@ -239,11 +292,17 @@ void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
|
|||
uint32_t free_wlen;
|
||||
k_spinlock_key_t key;
|
||||
union mpsc_pbuf_generic *dropped_item = NULL;
|
||||
bool valid_drop;
|
||||
uint32_t tmp_wr_idx_shift = 0;
|
||||
uint32_t tmp_wr_idx_val = 0;
|
||||
|
||||
do {
|
||||
cont = false;
|
||||
key = k_spin_lock(&buffer->lock);
|
||||
|
||||
if (tmp_wr_idx_shift) {
|
||||
post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift);
|
||||
tmp_wr_idx_shift = 0;
|
||||
}
|
||||
|
||||
(void)free_space(buffer, &free_wlen);
|
||||
|
||||
MPSC_PBUF_DBG(buffer, "put_word (%d free space)", (int)free_wlen);
|
||||
|
@ -251,26 +310,25 @@ void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
|
|||
if (free_wlen) {
|
||||
buffer->buf[buffer->tmp_wr_idx] = item.raw;
|
||||
tmp_wr_idx_inc(buffer, 1);
|
||||
cont = false;
|
||||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1);
|
||||
max_utilization_update(buffer);
|
||||
} else {
|
||||
bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
|
||||
|
||||
dropped_item = drop_item_locked(buffer, free_wlen,
|
||||
user_drop, &valid_drop);
|
||||
cont = dropped_item != NULL;
|
||||
tmp_wr_idx_val = buffer->tmp_wr_idx;
|
||||
cont = drop_item_locked(buffer, free_wlen,
|
||||
&dropped_item, &tmp_wr_idx_shift);
|
||||
}
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
|
||||
if (cont && valid_drop) {
|
||||
if (dropped_item) {
|
||||
/* Notify about item being dropped. */
|
||||
if (buffer->notify_drop) {
|
||||
buffer->notify_drop(buffer, dropped_item);
|
||||
}
|
||||
dropped_item = NULL;
|
||||
}
|
||||
} while (cont);
|
||||
|
||||
}
|
||||
|
||||
union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
|
||||
|
@ -278,9 +336,10 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
|
|||
{
|
||||
union mpsc_pbuf_generic *item = NULL;
|
||||
union mpsc_pbuf_generic *dropped_item = NULL;
|
||||
bool cont;
|
||||
bool cont = true;
|
||||
uint32_t free_wlen;
|
||||
bool valid_drop;
|
||||
uint32_t tmp_wr_idx_shift = 0;
|
||||
uint32_t tmp_wr_idx_val = 0;
|
||||
|
||||
MPSC_PBUF_DBG(buffer, "alloc %d words", (int)wlen);
|
||||
|
||||
|
@ -293,8 +352,12 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
|
|||
k_spinlock_key_t key;
|
||||
bool wrap;
|
||||
|
||||
cont = false;
|
||||
key = k_spin_lock(&buffer->lock);
|
||||
if (tmp_wr_idx_shift) {
|
||||
post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift);
|
||||
tmp_wr_idx_shift = 0;
|
||||
}
|
||||
|
||||
wrap = free_space(buffer, &free_wlen);
|
||||
|
||||
if (free_wlen >= wlen) {
|
||||
|
@ -303,30 +366,25 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
|
|||
item->hdr.valid = 0;
|
||||
item->hdr.busy = 0;
|
||||
tmp_wr_idx_inc(buffer, wlen);
|
||||
cont = false;
|
||||
} else if (wrap) {
|
||||
add_skip_item(buffer, free_wlen);
|
||||
cont = true;
|
||||
} else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) &&
|
||||
!k_is_in_isr()) {
|
||||
} else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) && !k_is_in_isr()) {
|
||||
int err;
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
err = k_sem_take(&buffer->sem, timeout);
|
||||
key = k_spin_lock(&buffer->lock);
|
||||
if (err == 0) {
|
||||
cont = true;
|
||||
}
|
||||
} else {
|
||||
bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
|
||||
|
||||
dropped_item = drop_item_locked(buffer, free_wlen,
|
||||
user_drop, &valid_drop);
|
||||
cont = dropped_item != NULL;
|
||||
cont = (err == 0) ? true : false;
|
||||
} else if (cont) {
|
||||
tmp_wr_idx_val = buffer->tmp_wr_idx;
|
||||
cont = drop_item_locked(buffer, free_wlen,
|
||||
&dropped_item, &tmp_wr_idx_shift);
|
||||
}
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
|
||||
if (cont && dropped_item && valid_drop) {
|
||||
if (dropped_item) {
|
||||
/* Notify about item being dropped. */
|
||||
if (buffer->notify_drop) {
|
||||
buffer->notify_drop(buffer, dropped_item);
|
||||
|
@ -335,6 +393,7 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
|
|||
}
|
||||
} while (cont);
|
||||
|
||||
|
||||
MPSC_PBUF_DBG(buffer, "allocated %p", item);
|
||||
|
||||
if (IS_ENABLED(CONFIG_MPSC_CLEAR_ALLOCATED) && item) {
|
||||
|
@ -367,15 +426,21 @@ void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer,
|
|||
(sizeof(item) + sizeof(data)) / sizeof(uint32_t);
|
||||
union mpsc_pbuf_generic *dropped_item = NULL;
|
||||
bool cont;
|
||||
bool valid_drop;
|
||||
uint32_t tmp_wr_idx_shift = 0;
|
||||
uint32_t tmp_wr_idx_val = 0;
|
||||
|
||||
do {
|
||||
k_spinlock_key_t key;
|
||||
uint32_t free_wlen;
|
||||
bool wrap;
|
||||
|
||||
cont = false;
|
||||
key = k_spin_lock(&buffer->lock);
|
||||
|
||||
if (tmp_wr_idx_shift) {
|
||||
post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift);
|
||||
tmp_wr_idx_shift = 0;
|
||||
}
|
||||
|
||||
wrap = free_space(buffer, &free_wlen);
|
||||
|
||||
if (free_wlen >= l) {
|
||||
|
@ -386,21 +451,20 @@ void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer,
|
|||
*p = (void *)data;
|
||||
tmp_wr_idx_inc(buffer, l);
|
||||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l);
|
||||
cont = false;
|
||||
max_utilization_update(buffer);
|
||||
} else if (wrap) {
|
||||
add_skip_item(buffer, free_wlen);
|
||||
cont = true;
|
||||
} else {
|
||||
bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
|
||||
|
||||
dropped_item = drop_item_locked(buffer, free_wlen,
|
||||
user_drop, &valid_drop);
|
||||
cont = dropped_item != NULL;
|
||||
tmp_wr_idx_val = buffer->tmp_wr_idx;
|
||||
cont = drop_item_locked(buffer, free_wlen,
|
||||
&dropped_item, &tmp_wr_idx_shift);
|
||||
}
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
|
||||
if (cont && dropped_item && valid_drop) {
|
||||
if (dropped_item) {
|
||||
/* Notify about item being dropped. */
|
||||
if (buffer->notify_drop) {
|
||||
buffer->notify_drop(buffer, dropped_item);
|
||||
|
@ -415,15 +479,21 @@ void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data,
|
|||
{
|
||||
bool cont;
|
||||
union mpsc_pbuf_generic *dropped_item = NULL;
|
||||
bool valid_drop;
|
||||
uint32_t tmp_wr_idx_shift = 0;
|
||||
uint32_t tmp_wr_idx_val = 0;
|
||||
|
||||
do {
|
||||
uint32_t free_wlen;
|
||||
k_spinlock_key_t key;
|
||||
bool wrap;
|
||||
|
||||
cont = false;
|
||||
key = k_spin_lock(&buffer->lock);
|
||||
|
||||
if (tmp_wr_idx_shift) {
|
||||
post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift);
|
||||
tmp_wr_idx_shift = 0;
|
||||
}
|
||||
|
||||
wrap = free_space(buffer, &free_wlen);
|
||||
|
||||
if (free_wlen >= wlen) {
|
||||
|
@ -431,22 +501,22 @@ void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data,
|
|||
wlen * sizeof(uint32_t));
|
||||
buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
|
||||
tmp_wr_idx_inc(buffer, wlen);
|
||||
cont = false;
|
||||
max_utilization_update(buffer);
|
||||
} else if (wrap) {
|
||||
add_skip_item(buffer, free_wlen);
|
||||
cont = true;
|
||||
} else {
|
||||
bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
|
||||
|
||||
dropped_item = drop_item_locked(buffer, free_wlen,
|
||||
user_drop, &valid_drop);
|
||||
cont = dropped_item != NULL;
|
||||
tmp_wr_idx_val = buffer->tmp_wr_idx;
|
||||
cont = drop_item_locked(buffer, free_wlen,
|
||||
&dropped_item, &tmp_wr_idx_shift);
|
||||
}
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
|
||||
if (cont && dropped_item && valid_drop) {
|
||||
if (dropped_item) {
|
||||
/* Notify about item being dropped. */
|
||||
dropped_item->hdr.valid = 0;
|
||||
if (buffer->notify_drop) {
|
||||
buffer->notify_drop(buffer, dropped_item);
|
||||
}
|
||||
|
@ -471,6 +541,7 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
|
|||
&buffer->buf[buffer->tmp_rd_idx];
|
||||
|
||||
if (!a || is_invalid(item)) {
|
||||
MPSC_PBUF_DBG(buffer, "invalid claim %d: %p", a, item);
|
||||
item = NULL;
|
||||
} else {
|
||||
uint32_t skip = get_skip(item);
|
||||
|
@ -492,7 +563,7 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
|
|||
}
|
||||
|
||||
if (!cont) {
|
||||
MPSC_PBUF_DBG(buffer, "claimed: %p", item);
|
||||
MPSC_PBUF_DBG(buffer, ">>claimed %d: %p", a, item);
|
||||
}
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
} while (cont);
|
||||
|
@ -511,12 +582,22 @@ void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer,
|
|||
if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) ||
|
||||
((uint32_t *)item == &buffer->buf[buffer->rd_idx])) {
|
||||
witem->hdr.busy = 0;
|
||||
if (buffer->rd_idx == buffer->tmp_rd_idx) {
|
||||
/* There is a chance that there are so many new packets
|
||||
* added between claim and free that rd_idx points again
|
||||
* at claimed item. In that case tmp_rd_idx points at
|
||||
* the same location. In that case increment also tmp_rd_idx
|
||||
* which will mark freed buffer as the only free space in
|
||||
* the buffer.
|
||||
*/
|
||||
buffer->tmp_rd_idx = idx_inc(buffer, buffer->tmp_rd_idx, wlen);
|
||||
}
|
||||
rd_idx_inc(buffer, wlen);
|
||||
} else {
|
||||
MPSC_PBUF_DBG(buffer, "Allocation occurred during claim");
|
||||
witem->skip.len = wlen;
|
||||
}
|
||||
MPSC_PBUF_DBG(buffer, "freed: %p", item);
|
||||
MPSC_PBUF_DBG(buffer, "<<freed: %p", item);
|
||||
|
||||
k_spin_unlock(&buffer->lock, key);
|
||||
k_sem_give(&buffer->sem);
|
||||
|
|
Loading…
Reference in a new issue