posix: Add implementation of mq_notify() function
The function was the last missing piece of the `_POSIX_MESSAGE_PASSING` option group. Due to lack of signal subsystem in the Zephyr RTOS the `sigev_notify` member of the `sigevent` structure that describes the notification cannot be set to `SIGEV_SIGNAL` - this notification type is not implemented, the function will return -1 and set `errno` to `ENOSYS`. `mq_notify` documentation: https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_notify.html Fixes #66958 Signed-off-by: Adam Wojasinski <awojasinski@baylibre.com>
This commit is contained in:
parent
9f1c256e46
commit
c1643f9701
|
@ -10,6 +10,7 @@
|
||||||
#include <zephyr/kernel.h>
|
#include <zephyr/kernel.h>
|
||||||
#include <zephyr/posix/time.h>
|
#include <zephyr/posix/time.h>
|
||||||
#include <zephyr/posix/fcntl.h>
|
#include <zephyr/posix/fcntl.h>
|
||||||
|
#include <zephyr/posix/signal.h>
|
||||||
#include <zephyr/posix/sys/stat.h>
|
#include <zephyr/posix/sys/stat.h>
|
||||||
#include "posix_types.h"
|
#include "posix_types.h"
|
||||||
|
|
||||||
|
@ -40,6 +41,7 @@ int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
|
||||||
unsigned int *msg_prio, const struct timespec *abstime);
|
unsigned int *msg_prio, const struct timespec *abstime);
|
||||||
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
|
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
|
||||||
unsigned int msg_prio, const struct timespec *abstime);
|
unsigned int msg_prio, const struct timespec *abstime);
|
||||||
|
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2018 Intel Corporation
|
* Copyright (c) 2018 Intel Corporation
|
||||||
|
* Copyright (c) 2024 BayLibre, SAS
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
*/
|
*/
|
||||||
|
@ -7,8 +8,10 @@
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <zephyr/sys/atomic.h>
|
#include <zephyr/sys/atomic.h>
|
||||||
#include <zephyr/posix/time.h>
|
|
||||||
#include <zephyr/posix/mqueue.h>
|
#include <zephyr/posix/mqueue.h>
|
||||||
|
#include <zephyr/posix/pthread.h>
|
||||||
|
|
||||||
|
#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
|
||||||
|
|
||||||
typedef struct mqueue_object {
|
typedef struct mqueue_object {
|
||||||
sys_snode_t snode;
|
sys_snode_t snode;
|
||||||
|
@ -17,6 +20,7 @@ typedef struct mqueue_object {
|
||||||
struct k_msgq queue;
|
struct k_msgq queue;
|
||||||
atomic_t ref_count;
|
atomic_t ref_count;
|
||||||
char *name;
|
char *name;
|
||||||
|
struct sigevent not;
|
||||||
} mqueue_object;
|
} mqueue_object;
|
||||||
|
|
||||||
typedef struct mqueue_desc {
|
typedef struct mqueue_desc {
|
||||||
|
@ -34,9 +38,11 @@ int64_t timespec_to_timeoutms(const struct timespec *abstime);
|
||||||
static mqueue_object *find_in_list(const char *name);
|
static mqueue_object *find_in_list(const char *name);
|
||||||
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
|
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
|
||||||
k_timeout_t timeout);
|
k_timeout_t timeout);
|
||||||
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
|
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
|
||||||
k_timeout_t timeout);
|
k_timeout_t timeout);
|
||||||
|
static void remove_notification(mqueue_object *msg_queue);
|
||||||
static void remove_mq(mqueue_object *msg_queue);
|
static void remove_mq(mqueue_object *msg_queue);
|
||||||
|
static void *mq_notify_thread(void *arg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Open a message queue.
|
* @brief Open a message queue.
|
||||||
|
@ -341,6 +347,74 @@ int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Notify process that a message is available.
|
||||||
|
*
|
||||||
|
* See IEEE 1003.1
|
||||||
|
*/
|
||||||
|
int mq_notify(mqd_t mqdes, const struct sigevent *notification)
|
||||||
|
{
|
||||||
|
mqueue_desc *mqd = (mqueue_desc *)mqdes;
|
||||||
|
|
||||||
|
if (mqd == NULL) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mqueue_object *msg_queue = mqd->mqueue;
|
||||||
|
|
||||||
|
if (notification == NULL) {
|
||||||
|
if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
remove_notification(msg_queue);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
|
||||||
|
errno = EBUSY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (notification->sigev_notify == SIGEV_SIGNAL) {
|
||||||
|
errno = ENOSYS;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (notification->sigev_notify_attributes != NULL) {
|
||||||
|
int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
|
||||||
|
PTHREAD_CREATE_DETACHED);
|
||||||
|
if (ret != 0) {
|
||||||
|
errno = ret;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
k_sem_take(&mq_sem, K_FOREVER);
|
||||||
|
memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
|
||||||
|
k_sem_give(&mq_sem);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *mq_notify_thread(void *arg)
|
||||||
|
{
|
||||||
|
mqueue_object *mqueue = (mqueue_object *)arg;
|
||||||
|
struct sigevent *sevp = &mqueue->not;
|
||||||
|
|
||||||
|
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
|
||||||
|
|
||||||
|
if (sevp->sigev_notify_attributes == NULL) {
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
}
|
||||||
|
|
||||||
|
sevp->sigev_notify_function(sevp->sigev_value);
|
||||||
|
|
||||||
|
remove_notification(mqueue);
|
||||||
|
|
||||||
|
pthread_exit(NULL);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Internal functions */
|
/* Internal functions */
|
||||||
static mqueue_object *find_in_list(const char *name)
|
static mqueue_object *find_in_list(const char *name)
|
||||||
{
|
{
|
||||||
|
@ -380,11 +454,28 @@ static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_le
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t msgq_num = k_msgq_num_used_get(&mqd->mqueue->queue);
|
||||||
|
|
||||||
if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
|
if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
|
||||||
errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
|
errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (k_msgq_num_used_get(&mqd->mqueue->queue) - msgq_num > 0) {
|
||||||
|
struct sigevent *sevp = &mqd->mqueue->not;
|
||||||
|
|
||||||
|
if (sevp->sigev_notify == SIGEV_NONE) {
|
||||||
|
sevp->sigev_notify_function(sevp->sigev_value);
|
||||||
|
} else if (sevp->sigev_notify == SIGEV_THREAD) {
|
||||||
|
pthread_t th;
|
||||||
|
|
||||||
|
ret = pthread_create(&th,
|
||||||
|
sevp->sigev_notify_attributes,
|
||||||
|
mq_notify_thread,
|
||||||
|
mqd->mqueue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,3 +519,10 @@ static void remove_mq(mqueue_object *msg_queue)
|
||||||
k_free(msg_queue->mem_obj);
|
k_free(msg_queue->mem_obj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void remove_notification(mqueue_object *msg_queue)
|
||||||
|
{
|
||||||
|
k_sem_take(&mq_sem, K_FOREVER);
|
||||||
|
memset(&msg_queue->not, 0, sizeof(struct sigevent));
|
||||||
|
k_sem_give(&mq_sem);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue