posix: eventfd: revise locking, signaling, and allocation

TL;DR - a complete rewrite.

Previously, the prototypical `eventfd()` usage (one thread
performing a blocking `read()`, followed by another thread
performing a `write()`) would deadlock Zephyr. This shortcoming
has existed in Zephyr's `eventfd()` implementation from the
start and the suggested workaround was to use `poll()`.
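
As an illustration only (the thread setup, stack size, priority, and
delays below are arbitrary and not part of this change), the scenario
looks roughly like this:

```c
#include <zephyr/kernel.h>
#include <zephyr/posix/sys/eventfd.h>

static int efd;

static void reader_fn(void *p1, void *p2, void *p3)
{
	eventfd_t value;

	ARG_UNUSED(p1);
	ARG_UNUSED(p2);
	ARG_UNUSED(p3);

	/* blocks until another thread writes a non-zero value */
	(void)eventfd_read(efd, &value);
	printk("reader woke up\n");
}

/* start the reader shortly after boot so main() can create the eventfd first */
K_THREAD_DEFINE(reader, 1024, reader_fn, NULL, NULL, NULL, 5, 0, 10);

int main(void)
{
	efd = eventfd(0, 0); /* blocking: EFD_NONBLOCK not set */

	k_sleep(K_MSEC(100));        /* give the reader time to block */
	(void)eventfd_write(efd, 1); /* previously, this never woke the reader */

	return 0;
}
```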

However, that is not sufficient for integrating 3rd-party
libraries that may rely on proper `eventfd()` blocking
operations such as `eventfd_read()` and `eventfd_write()`.

The culprit was the per-fdtable-entry `struct k_mutex`.

Here we perform a minor revision of the locking strategy
and employ `k_condvar_broadcast()` and `k_condvar_wait()`
to signal waiters and to wait while holding a given
`struct k_mutex`.
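
Roughly, that wait/broadcast pairing follows the usual
condition-variable pattern; `my_lock`, `my_cond`, and `ready` below are
illustrative names, not part of the eventfd implementation:

```c
#include <stdbool.h>
#include <zephyr/kernel.h>

K_MUTEX_DEFINE(my_lock);
K_CONDVAR_DEFINE(my_cond);

static bool ready;

void waiter(void)
{
	k_mutex_lock(&my_lock, K_FOREVER);
	while (!ready) {
		/* atomically releases my_lock while sleeping, reacquires on wake */
		k_condvar_wait(&my_cond, &my_lock, K_FOREVER);
	}
	k_mutex_unlock(&my_lock);
}

void signaler(void)
{
	k_mutex_lock(&my_lock, K_FOREVER);
	ready = true;
	k_condvar_broadcast(&my_cond); /* wake every thread parked in waiter() */
	k_mutex_unlock(&my_lock);
}
```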

It is important to note, however, that the primary means of
synchronizing the eventfd state is actually the eventfd
spinlock. The fdtable mutex and condition variable are mainly
used for blocking I/O (read, write, close) and are not used
in the code path of non-blocking reads.
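
A rough sketch of that spinlock-only fast path, assuming the
`struct eventfd`, `eventfd_is_semaphore()`, and per-eventfd spinlock
introduced in the patch below (error handling simplified):

```c
static int nonblocking_read_sketch(struct eventfd *efd, eventfd_t *value)
{
	int ret;
	k_spinlock_key_t key = k_spin_lock(&efd->lock);

	if (efd->cnt == 0) {
		ret = -EAGAIN; /* would block; the caller maps this to errno */
	} else {
		*value = eventfd_is_semaphore(efd) ? 1 : efd->cnt;
		efd->cnt -= *value;
		ret = 0;
	}

	k_spin_unlock(&efd->lock, key);

	return ret;
}
```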

The `wait_q` and `k_poll_signal` entries were removed from
`struct eventfd` as they were unnecessary.

Additionally, switch to using a bitarray because it is
possibly faster than linear search for allocating and
deallocating eventfd resources.
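
The allocation scheme is essentially the following sketch;
`MAX_EVENTFDS`, `slots`, and the helper names are placeholders for
`CONFIG_EVENTFD_MAX`, `efds_bitarray`, and the real call sites:

```c
#include <errno.h>
#include <zephyr/sys/bitarray.h>

#define MAX_EVENTFDS 4 /* stands in for CONFIG_EVENTFD_MAX */

SYS_BITARRAY_DEFINE_STATIC(slots, MAX_EVENTFDS);

static int alloc_slot(void)
{
	size_t offset;

	/* claim one bit; fails when every slot is already in use */
	if (sys_bitarray_alloc(&slots, 1, &offset) < 0) {
		return -ENOMEM;
	}

	return (int)offset;
}

static void free_slot(int slot)
{
	/* return the bit so the slot can be handed out again */
	(void)sys_bitarray_free(&slots, 1, (size_t)slot);
}
```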

Signed-off-by: Christopher Friedt <cfriedt@meta.com>
Christopher Friedt 2023-05-30 17:16:20 -04:00 committed by Anas Nashif
parent 90343a1f6d
commit e6eb0a705b
2 changed files with 295 additions and 189 deletions


@@ -7,10 +7,9 @@
#ifndef ZEPHYR_INCLUDE_POSIX_SYS_EVENTFD_H_
#define ZEPHYR_INCLUDE_POSIX_SYS_EVENTFD_H_
#include <zephyr/kernel.h>
#include <zephyr/sys/fdtable.h>
#include <sys/types.h>
#include <stdint.h>
#include <zephyr/kernel.h>
#include <zephyr/posix/fcntl.h>
#ifdef __cplusplus


@@ -1,55 +1,57 @@
/*
* Copyright (c) 2020 Tobias Svehagen
* Copyright (c) 2023, Meta
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/kernel.h>
#include <zephyr/wait_q.h>
#include <zephyr/posix/sys/eventfd.h>
#include <zephyr/net/socket.h>
#include <ksched.h>
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/posix/sys/eventfd.h>
#include <zephyr/sys/bitarray.h>
#include <zephyr/sys/fdtable.h>
#include <zephyr/sys/math_extras.h>
struct eventfd {
struct k_poll_signal read_sig;
struct k_poll_signal write_sig;
struct k_spinlock lock;
_wait_q_t wait_q;
eventfd_t cnt;
int flags;
};
K_MUTEX_DEFINE(eventfd_mtx);
static ssize_t eventfd_rw_op(void *obj, void *buf, size_t sz,
int (*op)(struct eventfd *efd, eventfd_t *value));
SYS_BITARRAY_DEFINE_STATIC(efds_bitarray, CONFIG_EVENTFD_MAX);
static struct eventfd efds[CONFIG_EVENTFD_MAX];
static const struct fd_op_vtable eventfd_fd_vtable;
static inline bool eventfd_is_in_use(struct eventfd *efd)
{
return (efd->flags & EFD_IN_USE) != 0;
}
static inline bool eventfd_is_semaphore(struct eventfd *efd)
{
return (efd->flags & EFD_SEMAPHORE) != 0;
}
static inline bool eventfd_is_blocking(struct eventfd *efd)
{
return (efd->flags & EFD_NONBLOCK) == 0;
}
static int eventfd_poll_prepare(struct eventfd *efd,
struct zsock_pollfd *pfd,
struct k_poll_event **pev,
struct k_poll_event *pev_end)
{
if (pfd->events & ZSOCK_POLLIN) {
if ((pfd->events & (ZSOCK_POLLOUT | ZSOCK_POLLIN)) != 0) {
if (*pev == pev_end) {
errno = ENOMEM;
return -1;
}
(*pev)->obj = &efd->read_sig;
(*pev)->type = K_POLL_TYPE_SIGNAL;
(*pev)->mode = K_POLL_MODE_NOTIFY_ONLY;
(*pev)->state = K_POLL_STATE_NOT_READY;
(*pev)++;
}
if (pfd->events & ZSOCK_POLLOUT) {
if (*pev == pev_end) {
errno = ENOMEM;
return -1;
}
(*pev)->obj = &efd->write_sig;
(*pev)->type = K_POLL_TYPE_SIGNAL;
(*pev)->mode = K_POLL_MODE_NOTIFY_ONLY;
(*pev)->state = K_POLL_STATE_NOT_READY;
**pev = (struct k_poll_event){0};
(*pev)++;
}
@@ -60,141 +62,147 @@ static int eventfd_poll_update(struct eventfd *efd,
struct zsock_pollfd *pfd,
struct k_poll_event **pev)
{
ARG_UNUSED(efd);
if (pfd->events & ZSOCK_POLLIN) {
if ((*pev)->state != K_POLL_STATE_NOT_READY) {
pfd->revents |= ZSOCK_POLLIN;
}
pfd->revents |= ZSOCK_POLLIN * (efd->cnt > 0);
(*pev)++;
}
if (pfd->events & ZSOCK_POLLOUT) {
if ((*pev)->state != K_POLL_STATE_NOT_READY) {
pfd->revents |= ZSOCK_POLLOUT;
}
pfd->revents |= ZSOCK_POLLOUT * (efd->cnt < UINT64_MAX - 1);
(*pev)++;
}
return 0;
}
static int eventfd_read_locked(struct eventfd *efd, eventfd_t *value)
{
if (!eventfd_is_in_use(efd)) {
/* file descriptor has been closed */
return -EBADF;
}
if (efd->cnt == 0) {
/* would block / try again */
return -EAGAIN;
}
/* successful read */
if (eventfd_is_semaphore(efd)) {
*value = 1;
--efd->cnt;
} else {
*value = efd->cnt;
efd->cnt = 0;
}
return 0;
}
static int eventfd_write_locked(struct eventfd *efd, eventfd_t *value)
{
eventfd_t result;
if (!eventfd_is_in_use(efd)) {
/* file descriptor has been closed */
return -EBADF;
}
if (*value == UINT64_MAX) {
/* not a permitted value */
return -EINVAL;
}
if (u64_add_overflow(efd->cnt, *value, &result) || result == UINT64_MAX) {
/* would block / try again */
return -EAGAIN;
}
/* successful write */
efd->cnt = result;
return 0;
}
static ssize_t eventfd_read_op(void *obj, void *buf, size_t sz)
{
struct eventfd *efd = obj;
int result = 0;
eventfd_t count = 0;
k_spinlock_key_t key;
if (sz < sizeof(eventfd_t)) {
errno = EINVAL;
return -1;
}
for (;;) {
key = k_spin_lock(&efd->lock);
if ((efd->flags & EFD_NONBLOCK) && efd->cnt == 0) {
result = EAGAIN;
break;
} else if (efd->cnt == 0) {
z_pend_curr(&efd->lock, key, &efd->wait_q, K_FOREVER);
} else {
count = (efd->flags & EFD_SEMAPHORE) ? 1 : efd->cnt;
efd->cnt -= count;
if (efd->cnt == 0) {
k_poll_signal_reset(&efd->read_sig);
}
k_poll_signal_raise(&efd->write_sig, 0);
break;
}
}
if (z_unpend_all(&efd->wait_q) != 0) {
z_reschedule(&efd->lock, key);
} else {
k_spin_unlock(&efd->lock, key);
}
if (result != 0) {
errno = result;
return -1;
}
*(eventfd_t *)buf = count;
return sizeof(eventfd_t);
return eventfd_rw_op(obj, buf, sz, eventfd_read_locked);
}
static ssize_t eventfd_write_op(void *obj, const void *buf, size_t sz)
{
struct eventfd *efd = obj;
int result = 0;
eventfd_t count;
bool overflow;
k_spinlock_key_t key;
if (sz < sizeof(eventfd_t)) {
errno = EINVAL;
return -1;
}
count = *((eventfd_t *)buf);
if (count == UINT64_MAX) {
errno = EINVAL;
return -1;
}
if (count == 0) {
return sizeof(eventfd_t);
}
for (;;) {
key = k_spin_lock(&efd->lock);
overflow = UINT64_MAX - count <= efd->cnt;
if ((efd->flags & EFD_NONBLOCK) && overflow) {
result = EAGAIN;
break;
} else if (overflow) {
z_pend_curr(&efd->lock, key, &efd->wait_q, K_FOREVER);
} else {
efd->cnt += count;
if (efd->cnt == (UINT64_MAX - 1)) {
k_poll_signal_reset(&efd->write_sig);
}
k_poll_signal_raise(&efd->read_sig, 0);
break;
}
}
if (z_unpend_all(&efd->wait_q) != 0) {
z_reschedule(&efd->lock, key);
} else {
k_spin_unlock(&efd->lock, key);
}
if (result != 0) {
errno = result;
return -1;
}
return sizeof(eventfd_t);
return eventfd_rw_op(obj, (eventfd_t *)buf, sz, eventfd_write_locked);
}
static int eventfd_close_op(void *obj)
{
int ret;
int err;
k_spinlock_key_t key;
struct k_mutex *lock = NULL;
struct k_condvar *cond = NULL;
struct eventfd *efd = (struct eventfd *)obj;
efd->flags = 0;
if (k_is_in_isr()) {
/* not covered by the man page, but necessary in Zephyr */
errno = EWOULDBLOCK;
return -1;
}
return 0;
err = (int)z_get_obj_lock_and_cond(obj, &eventfd_fd_vtable, &lock, &cond);
__ASSERT((bool)err, "z_get_obj_lock_and_cond() failed");
__ASSERT_NO_MSG(lock != NULL);
__ASSERT_NO_MSG(cond != NULL);
err = k_mutex_lock(lock, K_FOREVER);
__ASSERT(err == 0, "k_mutex_lock() failed: %d", err);
key = k_spin_lock(&efd->lock);
if (!eventfd_is_in_use(efd)) {
errno = EBADF;
ret = -1;
goto unlock;
}
err = sys_bitarray_free(&efds_bitarray, 1, (struct eventfd *)obj - efds);
__ASSERT(err == 0, "sys_bitarray_free() failed: %d", err);
efd->flags = 0;
efd->cnt = 0;
ret = 0;
unlock:
k_spin_unlock(&efd->lock, key);
/* when closing an eventfd, broadcast to all waiters */
err = k_condvar_broadcast(cond);
__ASSERT(err == 0, "k_condvar_broadcast() failed: %d", err);
err = k_mutex_unlock(lock);
__ASSERT(err == 0, "k_mutex_unlock() failed: %d", err);
return ret;
}
static int eventfd_ioctl_op(void *obj, unsigned int request, va_list args)
{
int ret;
k_spinlock_key_t key;
struct eventfd *efd = (struct eventfd *)obj;
/* note: zsock_poll_internal() has already taken the mutex */
key = k_spin_lock(&efd->lock);
if (!eventfd_is_in_use(efd)) {
errno = EBADF;
ret = -1;
goto unlock;
}
switch (request) {
case F_GETFL:
return efd->flags & EFD_FLAGS_SET;
ret = efd->flags & EFD_FLAGS_SET;
break;
case F_SETFL: {
int flags;
@@ -203,13 +211,12 @@ static int eventfd_ioctl_op(void *obj, unsigned int request, va_list args)
if (flags & ~EFD_FLAGS_SET) {
errno = EINVAL;
return -1;
ret = -1;
} else {
efd->flags = flags;
ret = 0;
}
efd->flags = flags;
return 0;
}
} break;
case ZFD_IOCTL_POLL_PREPARE: {
struct zsock_pollfd *pfd;
@@ -220,8 +227,8 @@ static int eventfd_ioctl_op(void *obj, unsigned int request, va_list args)
pev = va_arg(args, struct k_poll_event **);
pev_end = va_arg(args, struct k_poll_event *);
return eventfd_poll_prepare(obj, pfd, pev, pev_end);
}
ret = eventfd_poll_prepare(obj, pfd, pev, pev_end);
} break;
case ZFD_IOCTL_POLL_UPDATE: {
struct zsock_pollfd *pfd;
@@ -230,13 +237,19 @@ static int eventfd_ioctl_op(void *obj, unsigned int request, va_list args)
pfd = va_arg(args, struct zsock_pollfd *);
pev = va_arg(args, struct k_poll_event **);
return eventfd_poll_update(obj, pfd, pev);
}
ret = eventfd_poll_update(obj, pfd, pev);
} break;
default:
errno = EOPNOTSUPP;
return -1;
ret = -1;
break;
}
unlock:
k_spin_unlock(&efd->lock, key);
return ret;
}
static const struct fd_op_vtable eventfd_fd_vtable = {
@@ -246,87 +259,181 @@ static const struct fd_op_vtable eventfd_fd_vtable = {
.ioctl = eventfd_ioctl_op,
};
/* common to both eventfd_read_op() and eventfd_write_op() */
static ssize_t eventfd_rw_op(void *obj, void *buf, size_t sz,
int (*op)(struct eventfd *efd, eventfd_t *value))
{
int err;
ssize_t ret;
k_spinlock_key_t key;
struct eventfd *efd = obj;
struct k_mutex *lock = NULL;
struct k_condvar *cond = NULL;
if (sz < sizeof(eventfd_t)) {
errno = EINVAL;
return -1;
}
if (buf == NULL) {
errno = EFAULT;
return -1;
}
key = k_spin_lock(&efd->lock);
if (!eventfd_is_blocking(efd)) {
/*
* Handle the non-blocking case entirely within this scope
*/
ret = op(efd, buf);
if (ret < 0) {
errno = -ret;
ret = -1;
} else {
ret = sizeof(eventfd_t);
}
goto unlock_spin;
}
/*
* Handle the blocking case below
*/
__ASSERT_NO_MSG(eventfd_is_blocking(efd));
if (k_is_in_isr()) {
/* not covered by the man page, but necessary in Zephyr */
errno = EWOULDBLOCK;
ret = -1;
goto unlock_spin;
}
err = (int)z_get_obj_lock_and_cond(obj, &eventfd_fd_vtable, &lock, &cond);
__ASSERT((bool)err, "z_get_obj_lock_and_cond() failed");
__ASSERT_NO_MSG(lock != NULL);
__ASSERT_NO_MSG(cond != NULL);
/* do not hold a spinlock when taking a mutex */
k_spin_unlock(&efd->lock, key);
err = k_mutex_lock(lock, K_FOREVER);
__ASSERT(err == 0, "k_mutex_lock() failed: %d", err);
while (true) {
/* retake the spinlock */
key = k_spin_lock(&efd->lock);
ret = op(efd, buf);
switch (ret) {
case -EAGAIN:
/* not an error in blocking mode. break and try again */
break;
case 0:
/* success! */
ret = sizeof(eventfd_t);
goto unlock_mutex;
default:
/* some other error */
__ASSERT_NO_MSG(ret < 0);
errno = -ret;
ret = -1;
goto unlock_mutex;
}
/* do not hold a spinlock when taking a mutex */
k_spin_unlock(&efd->lock, key);
/* wait for a write or close */
err = k_condvar_wait(cond, lock, K_FOREVER);
__ASSERT(err == 0, "k_condvar_wait() failed: %d", err);
}
unlock_mutex:
k_spin_unlock(&efd->lock, key);
/* only wake a single waiter */
err = k_condvar_signal(cond);
__ASSERT(err == 0, "k_condvar_signal() failed: %d", err);
err = k_mutex_unlock(lock);
__ASSERT(err == 0, "k_mutex_unlock() failed: %d", err);
goto out;
unlock_spin:
k_spin_unlock(&efd->lock, key);
out:
return ret;
}
/*
* Public-facing API
*/
int eventfd(unsigned int initval, int flags)
{
int fd = 1;
size_t offset;
struct eventfd *efd = NULL;
int fd = -1;
int i;
if (flags & ~EFD_FLAGS_SET) {
errno = EINVAL;
return -1;
}
k_mutex_lock(&eventfd_mtx, K_FOREVER);
for (i = 0; i < ARRAY_SIZE(efds); ++i) {
if (!(efds[i].flags & EFD_IN_USE)) {
efd = &efds[i];
break;
}
}
if (efd == NULL) {
if (sys_bitarray_alloc(&efds_bitarray, 1, &offset) < 0) {
errno = ENOMEM;
goto exit_mtx;
return -1;
}
efd = &efds[offset];
fd = z_reserve_fd();
if (fd < 0) {
goto exit_mtx;
sys_bitarray_free(&efds_bitarray, 1, offset);
return -1;
}
efd->flags = EFD_IN_USE | flags;
efd->cnt = initval;
k_poll_signal_init(&efd->write_sig);
k_poll_signal_init(&efd->read_sig);
z_waitq_init(&efd->wait_q);
if (initval != 0) {
k_poll_signal_raise(&efd->read_sig, 0);
}
k_poll_signal_raise(&efd->write_sig, 0);
z_finalize_fd(fd, efd, &eventfd_fd_vtable);
exit_mtx:
k_mutex_unlock(&eventfd_mtx);
return fd;
}
int eventfd_read(int fd, eventfd_t *value)
{
const struct fd_op_vtable *efd_vtable;
struct k_mutex *lock;
ssize_t ret;
int ret;
void *obj;
obj = z_get_fd_obj_and_vtable(fd, &efd_vtable, &lock);
obj = z_get_fd_obj(fd, &eventfd_fd_vtable, EBADF);
if (obj == NULL) {
return -1;
}
(void)k_mutex_lock(lock, K_FOREVER);
ret = eventfd_rw_op(obj, value, sizeof(eventfd_t), eventfd_read_locked);
__ASSERT_NO_MSG(ret == -1 || ret == sizeof(eventfd_t));
if (ret < 0) {
return -1;
}
ret = efd_vtable->read(obj, value, sizeof(*value));
k_mutex_unlock(lock);
return ret == sizeof(eventfd_t) ? 0 : -1;
return 0;
}
int eventfd_write(int fd, eventfd_t value)
{
const struct fd_op_vtable *efd_vtable;
struct k_mutex *lock;
ssize_t ret;
int ret;
void *obj;
obj = z_get_fd_obj_and_vtable(fd, &efd_vtable, &lock);
obj = z_get_fd_obj(fd, &eventfd_fd_vtable, EBADF);
if (obj == NULL) {
return -1;
}
(void)k_mutex_lock(lock, K_FOREVER);
ret = eventfd_rw_op(obj, &value, sizeof(eventfd_t), eventfd_write_locked);
__ASSERT_NO_MSG(ret == -1 || ret == sizeof(eventfd_t));
if (ret < 0) {
return -1;
}
ret = efd_vtable->write(obj, &value, sizeof(value));
k_mutex_unlock(lock);
return ret == sizeof(eventfd_t) ? 0 : -1;
return 0;
}