From 4a8b2d2d2fed8580ac20555210482505251881ab Mon Sep 17 00:00:00 2001 From: Youvedeep Singh Date: Mon, 19 Mar 2018 20:45:27 +0530 Subject: [PATCH] kernel: POSIX: Compatibility layer for POSIX message queue APIs. This patch provides POSIX message queue APIs for POSIX 1003.1 PSE52 standard. Signed-off-by: Youvedeep Singh --- arch/posix/include/posix_cheats.h | 16 ++ include/posix/mqueue.h | 66 +++++ kernel/Kconfig | 36 ++- kernel/posix/CMakeLists.txt | 1 + kernel/posix/mqueue.c | 434 ++++++++++++++++++++++++++++++ 5 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 include/posix/mqueue.h create mode 100644 kernel/posix/mqueue.c diff --git a/arch/posix/include/posix_cheats.h b/arch/posix/include/posix_cheats.h index 229042c29a..a4da1aaf8b 100644 --- a/arch/posix/include/posix_cheats.h +++ b/arch/posix/include/posix_cheats.h @@ -41,6 +41,11 @@ #define sigevent zap_sigevent #define pthread_rwlock_obj zap_pthread_rwlock_obj #define pthread_rwlockattr_t zap_pthread_rwlockattr_t +#define mqueue_object zap_mqueue_object +#define mqueue_desc zap_mqueue_desc +#define mqd_t zap_mqd_t +#define mq_attr zap_mq_attr + /* Condition variables */ #define pthread_cond_init(...) zap_pthread_cond_init(__VA_ARGS__) @@ -147,6 +152,17 @@ #define pthread_rwlockattr_destroy(...)\ zap_pthread_rwlockattr_destroy(__VA_ARGS__) +/* message queue */ +#define mq_open(...) zap_mq_open(__VA_ARGS__) +#define mq_close(...) zap_mq_close(__VA_ARGS__) +#define mq_unlink(...) zap_mq_unlink(__VA_ARGS__) +#define mq_getattr(...) zap_mq_getattr(__VA_ARGS__) +#define mq_receive(...) zap_mq_receive(__VA_ARGS__) +#define mq_send(...) zap_mq_send(__VA_ARGS__) +#define mq_setattr(...) zap_mq_setattr(__VA_ARGS__) +#define mq_timedreceive(...) zap_mq_timedreceive(__VA_ARGS__) +#define mq_timedsend(...) zap_mq_timedsend(__VA_ARGS__) + #endif /* CONFIG_PTHREAD_IPC */ #endif /* CONFIG_ARCH_POSIX */ diff --git a/include/posix/mqueue.h b/include/posix/mqueue.h new file mode 100644 index 0000000000..1148c3fc5b --- /dev/null +++ b/include/posix/mqueue.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2018 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef __MQUEUE_H__ +#define __MQUEUE_H__ + +#include +#include +#include "sys/types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void *mqd_t; +typedef unsigned int mode_t; + +typedef struct mq_attr { + long mq_flags; + long mq_maxmsg; + long mq_msgsize; + long mq_curmsgs; /* Number of messages currently queued. */ +} mq_attr; + +/* FIXME: below should be defined into fcntl.h file. + * This is temporarily put here. + */ + +#ifndef _SYS_FCNTL_H_ +#define O_CREAT_POS 9 +#define O_CREAT (1 << O_CREAT_POS) + +#define O_EXCL_POS 11 +#define O_EXCL (1 << O_EXCL_POS) + +#define O_NONBLOCK_POS 14 +#define O_NONBLOCK (1 << O_NONBLOCK_POS) + +#define O_RDONLY 0 +#define O_WRONLY 1 +#define O_RDWR 2 +#endif /* _SYS_FCNTL_H_ */ + +mqd_t mq_open(const char *name, int oflags, ...); +int mq_close(mqd_t mqdes); +int mq_unlink(const char *name); +int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); +int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + unsigned int *msg_prio); +int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + unsigned int msg_prio); +int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, + struct mq_attr *omqstat); +int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + unsigned int *msg_prio, const struct timespec *abstime); +int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + unsigned int msg_prio, const struct timespec *abstime); + +#ifdef __cplusplus +} +#endif + +#endif /* __MQUEUE_H__ */ diff --git a/kernel/Kconfig b/kernel/Kconfig index 1077b00112..b04e4e133a 100644 --- a/kernel/Kconfig +++ b/kernel/Kconfig @@ -371,7 +371,8 @@ config NUM_PIPE_ASYNC_MSGS config HEAP_MEM_POOL_SIZE int prompt "Heap memory pool size (in bytes)" - default 0 + default 0 if !POSIX_MQUEUE + default 1024 if POSIX_MQUEUE help This option specifies the size of the heap memory pool used when dynamically allocating memory using k_malloc(). Supported values @@ -520,6 +521,39 @@ config MAX_TIMER_COUNT range 0 255 help Mention maximum number of timers in POSIX compliant application. + +config POSIX_MQUEUE + bool + prompt "Enable POSIX message queue" + default n + help + This enabled POSIX message queue related APIs. + +if POSIX_MQUEUE +config MSG_COUNT_MAX + int + prompt "Maximum number of messages in message queue" + default 16 + help + Mention maximum number of messages in message queue in POSIX compliant + application. + +config MSG_SIZE_MAX + int + prompt "Maximum size of a message" + default 16 + help + Mention maximum size of message in bytes. + +config MQUEUE_NAMELEN_MAX + int + prompt "Maximum size of a name length" + default 16 + range 2 255 + help + Mention length of message queue name in number of characters. + +endif endif endmenu diff --git a/kernel/posix/CMakeLists.txt b/kernel/posix/CMakeLists.txt index ccfb59c0aa..0ecc735142 100644 --- a/kernel/posix/CMakeLists.txt +++ b/kernel/posix/CMakeLists.txt @@ -8,3 +8,4 @@ target_sources(kernel PRIVATE posix/clock.c) target_sources(kernel PRIVATE posix/timer.c) target_sources(kernel PRIVATE posix/pthread_rwlock.c) target_sources(kernel PRIVATE posix/semaphore.c) +target_sources_ifdef(CONFIG_POSIX_MQUEUE kernel PRIVATE posix/mqueue.c) diff --git a/kernel/posix/mqueue.c b/kernel/posix/mqueue.c new file mode 100644 index 0000000000..1d6086c3b5 --- /dev/null +++ b/kernel/posix/mqueue.c @@ -0,0 +1,434 @@ +/* + * Copyright (c) 2018 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include +#include + +typedef struct mqueue_object { + sys_snode_t snode; + char *mem_buffer; + char *mem_obj; + struct k_msgq queue; + atomic_t ref_count; + char *name; +} mqueue_object; + +typedef struct mqueue_desc { + char *mem_desc; + mqueue_object *mqueue; + u32_t flags; +} mqueue_desc; + +K_SEM_DEFINE(mq_sem, 1, 1); + +/* Initialize the list */ +sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list); + +s64_t timespec_to_timeoutms(const struct timespec *abstime); +static mqueue_object *find_in_list(const char *name); +static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, + s32_t timeout); +static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, + s32_t timeout); +static void remove_mq(mqueue_object *msg_queue); + +/** + * @brief Open a message queue. + * + * Number of message queue and descriptor to message queue are limited by + * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE. + * + * See IEEE 1003.1 + */ +mqd_t mq_open(const char *name, int oflags, ...) +{ + va_list va; + mode_t mode; + mq_attr *attrs = NULL; + u32_t msg_size = 0, max_msgs = 0; + mqueue_object *msg_queue; + mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1); + char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr; + + va_start(va, oflags); + if (oflags & O_CREAT) { + mode = va_arg(va, mode_t); + attrs = va_arg(va, mq_attr*); + } + va_end(va); + + if (attrs != NULL) { + msg_size = attrs->mq_msgsize; + max_msgs = attrs->mq_maxmsg; + } + + if (name == NULL || ((oflags & O_CREAT) && (msg_size <= 0 || + max_msgs <= 0))) { + errno = EINVAL; + return (mqd_t)mqd; + } + + if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) { + errno = ENAMETOOLONG; + return (mqd_t)mqd; + } + + /* Check if queue already exists */ + k_sem_take(&mq_sem, K_FOREVER); + msg_queue = find_in_list(name); + k_sem_give(&mq_sem); + + if ((msg_queue != NULL) && (oflags & O_CREAT) && (oflags & O_EXCL)) { + /* Message queue has alreadey been opened and O_EXCL is set */ + errno = EEXIST; + return (mqd_t)mqd; + } + + if (msg_queue == NULL && !(oflags & O_CREAT)) { + errno = ENOENT; + return (mqd_t)mqd; + } + + mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc)); + if (mq_desc_ptr != NULL) { + memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc)); + msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr; + msg_queue_desc->mem_desc = mq_desc_ptr; + } else { + goto free_mq_desc; + } + + + /* Allocate mqueue object for new message queue */ + if (msg_queue == NULL) { + + /* Check for message quantity and size in message queue */ + if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX && + attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) { + goto free_mq_desc; + } + + mq_obj_ptr = k_malloc(sizeof(mqueue_object)); + if (mq_obj_ptr != NULL) { + memset(mq_obj_ptr, 0, sizeof(mqueue_object)); + msg_queue = (mqueue_object *)mq_obj_ptr; + msg_queue->mem_obj = mq_obj_ptr; + + } else { + goto free_mq_object; + } + + mq_name_ptr = k_malloc(strlen(name) + 1); + if (mq_name_ptr != NULL) { + memset(mq_name_ptr, 0, strlen(name) + 1); + msg_queue->name = mq_name_ptr; + + } else { + goto free_mq_name; + } + + strcpy(msg_queue->name, name); + + mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(u8_t)); + if (mq_buf_ptr != NULL) { + memset(mq_buf_ptr, 0, + msg_size * max_msgs * sizeof(u8_t)); + msg_queue->mem_buffer = mq_buf_ptr; + } else { + goto free_mq_buffer; + } + + atomic_set(&msg_queue->ref_count, 1); + /* initialize zephyr message queue */ + k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size, + max_msgs); + k_sem_take(&mq_sem, K_FOREVER); + sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode)); + k_sem_give(&mq_sem); + + } else { + atomic_inc(&msg_queue->ref_count); + } + + msg_queue_desc->mqueue = msg_queue; + msg_queue_desc->flags = (oflags & O_NONBLOCK) ? O_NONBLOCK : 0; + return (mqd_t)msg_queue_desc; + +free_mq_buffer: + k_free(mq_name_ptr); +free_mq_name: + k_free(mq_obj_ptr); +free_mq_object: + k_free(mq_desc_ptr); +free_mq_desc: + errno = ENOSPC; + return (mqd_t)mqd; +} + +/** + * @brief Close a message queue descriptor. + * + * See IEEE 1003.1 + */ +int mq_close(mqd_t mqdes) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + + if (mqd == NULL) { + errno = EBADF; + return -1; + } + + atomic_dec(&mqd->mqueue->ref_count); + + /* remove mq if marked for unlink */ + if (mqd->mqueue->name == NULL) { + remove_mq(mqd->mqueue); + } + + k_free(mqd->mem_desc); + return 0; +} + +/** + * @brief Remove a message queue. + * + * See IEEE 1003.1 + */ +int mq_unlink(const char *name) +{ + mqueue_object *msg_queue; + + k_sem_take(&mq_sem, K_FOREVER); + msg_queue = find_in_list(name); + + if (msg_queue == NULL) { + k_sem_give(&mq_sem); + errno = EBADF; + return -1; + } + + k_free(msg_queue->name); + msg_queue->name = NULL; + k_sem_give(&mq_sem); + remove_mq(msg_queue); + return 0; +} + +/** + * @brief Send a message to a message queue. + * + * All messages in message queue are of equal priority. + * + * See IEEE 1003.1 + */ +int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + unsigned int msg_prio) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + s32_t timeout = K_FOREVER; + + return send_message(mqd, msg_ptr, msg_len, timeout); +} + +/** + * @brief Send message to a message queue within abstime time. + * + * All messages in message queue are of equal priority. + * + * See IEEE 1003.1 + */ +int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + unsigned int msg_prio, const struct timespec *abstime) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + s32_t timeout; + + timeout = (s32_t) timespec_to_timeoutms(abstime); + return send_message(mqd, msg_ptr, msg_len, timeout); +} + +/** + * @brief Receive a message from a message queue. + * + * All messages in message queue are of equal priority. + * + * See IEEE 1003.1 + */ +int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + unsigned int *msg_prio) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + s32_t timeout = K_FOREVER; + + return receive_message(mqd, msg_ptr, msg_len, timeout); + +} + +/** + * @brief Receive message from a message queue within abstime time. + * + * All messages in message queue are of equal priority. + * + * See IEEE 1003.1 + */ +int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + unsigned int *msg_prio, const struct timespec *abstime) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + s32_t timeout = K_NO_WAIT; + + timeout = (s32_t) timespec_to_timeoutms(abstime); + return receive_message(mqd, msg_ptr, msg_len, timeout); +} + +/** + * @brief Get message queue attributes. + * + * See IEEE 1003.1 + */ +int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + mqueue_object *msg_queue = mqd->mqueue; + struct k_msgq_attrs attrs; + + if (mqd == NULL) { + errno = EBADF; + return -1; + } + + k_sem_take(&mq_sem, K_FOREVER); + k_msgq_get_attrs(&msg_queue->queue, &attrs); + mqstat->mq_flags = mqd->flags; + mqstat->mq_maxmsg = attrs.max_msgs; + mqstat->mq_msgsize = attrs.msg_size; + mqstat->mq_curmsgs = attrs.used_msgs; + k_sem_give(&mq_sem); + return 0; +} + +/** + * @brief Set message queue attributes. + * + * See IEEE 1003.1 + */ +int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, + struct mq_attr *omqstat) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + + if (mqd == NULL) { + errno = EBADF; + return -1; + } + + if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) { + errno = EINVAL; + return -1; + } + + if (omqstat != NULL) { + mq_getattr(mqdes, omqstat); + } + + k_sem_take(&mq_sem, K_FOREVER); + mqd->flags = mqstat->mq_flags; + k_sem_give(&mq_sem); + + return 0; +} + +/* Internal functions */ +static mqueue_object *find_in_list(const char *name) +{ + sys_snode_t *mq; + mqueue_object *msg_queue; + + mq = mq_list.head; + + while (mq != NULL) { + msg_queue = (mqueue_object *)mq; + if (strcmp(msg_queue->name, name) == 0) { + return msg_queue; + } + + mq = mq->next; + } + + return NULL; +} + +static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, + s32_t timeout) +{ + s32_t ret = -1; + + if (mqd == NULL) { + errno = EBADF; + return ret; + } + + if (mqd->flags & O_NONBLOCK) { + timeout = K_NO_WAIT; + } + + if (msg_len > mqd->mqueue->queue.msg_size) { + errno = EMSGSIZE; + return ret; + } + + if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) { + errno = (timeout == K_NO_WAIT) ? EAGAIN : ETIMEDOUT; + return ret; + } + + return 0; +} + +static s32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, + s32_t timeout) +{ + int ret = -1; + + if (mqd == NULL) { + errno = EBADF; + return ret; + } + + if (msg_len < mqd->mqueue->queue.msg_size) { + errno = EMSGSIZE; + return ret; + } + + if (mqd->flags & O_NONBLOCK) { + timeout = K_NO_WAIT; + } + + if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) { + errno = (timeout != K_NO_WAIT) ? ETIMEDOUT : EAGAIN; + } else { + ret = mqd->mqueue->queue.msg_size; + } + + return ret; +} + +static void remove_mq(mqueue_object *msg_queue) +{ + if (atomic_cas(&msg_queue->ref_count, 0, 0)) { + k_sem_take(&mq_sem, K_FOREVER); + sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue); + k_sem_give(&mq_sem); + + /* Free mq buffer and pbject */ + k_free(msg_queue->mem_buffer); + k_free(msg_queue->mem_obj); + } +}