kernel: POSIX: Compatibility layer for POSIX message queue APIs.

This patch provides POSIX message queue APIs for POSIX
1003.1 PSE52 standard.

Signed-off-by: Youvedeep Singh <youvedeep.singh@intel.com>
This commit is contained in:
Youvedeep Singh 2018-03-19 20:45:27 +05:30 committed by Anas Nashif
parent 188c1ab5ca
commit 4a8b2d2d2f
5 changed files with 552 additions and 1 deletions

View file

@ -41,6 +41,11 @@
#define sigevent zap_sigevent #define sigevent zap_sigevent
#define pthread_rwlock_obj zap_pthread_rwlock_obj #define pthread_rwlock_obj zap_pthread_rwlock_obj
#define pthread_rwlockattr_t zap_pthread_rwlockattr_t #define pthread_rwlockattr_t zap_pthread_rwlockattr_t
#define mqueue_object zap_mqueue_object
#define mqueue_desc zap_mqueue_desc
#define mqd_t zap_mqd_t
#define mq_attr zap_mq_attr
/* Condition variables */ /* Condition variables */
#define pthread_cond_init(...) zap_pthread_cond_init(__VA_ARGS__) #define pthread_cond_init(...) zap_pthread_cond_init(__VA_ARGS__)
@ -147,6 +152,17 @@
#define pthread_rwlockattr_destroy(...)\ #define pthread_rwlockattr_destroy(...)\
zap_pthread_rwlockattr_destroy(__VA_ARGS__) zap_pthread_rwlockattr_destroy(__VA_ARGS__)
/* message queue */
#define mq_open(...) zap_mq_open(__VA_ARGS__)
#define mq_close(...) zap_mq_close(__VA_ARGS__)
#define mq_unlink(...) zap_mq_unlink(__VA_ARGS__)
#define mq_getattr(...) zap_mq_getattr(__VA_ARGS__)
#define mq_receive(...) zap_mq_receive(__VA_ARGS__)
#define mq_send(...) zap_mq_send(__VA_ARGS__)
#define mq_setattr(...) zap_mq_setattr(__VA_ARGS__)
#define mq_timedreceive(...) zap_mq_timedreceive(__VA_ARGS__)
#define mq_timedsend(...) zap_mq_timedsend(__VA_ARGS__)
#endif /* CONFIG_PTHREAD_IPC */ #endif /* CONFIG_PTHREAD_IPC */
#endif /* CONFIG_ARCH_POSIX */ #endif /* CONFIG_ARCH_POSIX */

66
include/posix/mqueue.h Normal file
View file

@ -0,0 +1,66 @@
/*
* Copyright (c) 2018 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __MQUEUE_H__
#define __MQUEUE_H__
#include <kernel.h>
#include <time.h>
#include "sys/types.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef void *mqd_t;
typedef unsigned int mode_t;
typedef struct mq_attr {
long mq_flags;
long mq_maxmsg;
long mq_msgsize;
long mq_curmsgs; /* Number of messages currently queued. */
} mq_attr;
/* FIXME: below should be defined into fcntl.h file.
* This is temporarily put here.
*/
#ifndef _SYS_FCNTL_H_
#define O_CREAT_POS 9
#define O_CREAT (1 << O_CREAT_POS)
#define O_EXCL_POS 11
#define O_EXCL (1 << O_EXCL_POS)
#define O_NONBLOCK_POS 14
#define O_NONBLOCK (1 << O_NONBLOCK_POS)
#define O_RDONLY 0
#define O_WRONLY 1
#define O_RDWR 2
#endif /* _SYS_FCNTL_H_ */
mqd_t mq_open(const char *name, int oflags, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio);
int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
struct mq_attr *omqstat);
int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abstime);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abstime);
#ifdef __cplusplus
}
#endif
#endif /* __MQUEUE_H__ */

View file

@ -371,7 +371,8 @@ config NUM_PIPE_ASYNC_MSGS
config HEAP_MEM_POOL_SIZE config HEAP_MEM_POOL_SIZE
int int
prompt "Heap memory pool size (in bytes)" prompt "Heap memory pool size (in bytes)"
default 0 default 0 if !POSIX_MQUEUE
default 1024 if POSIX_MQUEUE
help help
This option specifies the size of the heap memory pool used when This option specifies the size of the heap memory pool used when
dynamically allocating memory using k_malloc(). Supported values dynamically allocating memory using k_malloc(). Supported values
@ -520,6 +521,39 @@ config MAX_TIMER_COUNT
range 0 255 range 0 255
help help
Mention maximum number of timers in POSIX compliant application. Mention maximum number of timers in POSIX compliant application.
config POSIX_MQUEUE
bool
prompt "Enable POSIX message queue"
default n
help
This enabled POSIX message queue related APIs.
if POSIX_MQUEUE
config MSG_COUNT_MAX
int
prompt "Maximum number of messages in message queue"
default 16
help
Mention maximum number of messages in message queue in POSIX compliant
application.
config MSG_SIZE_MAX
int
prompt "Maximum size of a message"
default 16
help
Mention maximum size of message in bytes.
config MQUEUE_NAMELEN_MAX
int
prompt "Maximum size of a name length"
default 16
range 2 255
help
Mention length of message queue name in number of characters.
endif
endif endif
endmenu endmenu

View file

@ -8,3 +8,4 @@ target_sources(kernel PRIVATE posix/clock.c)
target_sources(kernel PRIVATE posix/timer.c) target_sources(kernel PRIVATE posix/timer.c)
target_sources(kernel PRIVATE posix/pthread_rwlock.c) target_sources(kernel PRIVATE posix/pthread_rwlock.c)
target_sources(kernel PRIVATE posix/semaphore.c) target_sources(kernel PRIVATE posix/semaphore.c)
target_sources_ifdef(CONFIG_POSIX_MQUEUE kernel PRIVATE posix/mqueue.c)

434
kernel/posix/mqueue.c Normal file
View file

@ -0,0 +1,434 @@
/*
* Copyright (c) 2018 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <kernel.h>
#include <time.h>
#include <errno.h>
#include <mqueue.h>
#include <string.h>
#include <atomic.h>
typedef struct mqueue_object {
sys_snode_t snode;
char *mem_buffer;
char *mem_obj;
struct k_msgq queue;
atomic_t ref_count;
char *name;
} mqueue_object;
typedef struct mqueue_desc {
char *mem_desc;
mqueue_object *mqueue;
u32_t flags;
} mqueue_desc;
K_SEM_DEFINE(mq_sem, 1, 1);
/* Initialize the list */
sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
s64_t timespec_to_timeoutms(const struct timespec *abstime);
static mqueue_object *find_in_list(const char *name);
static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
s32_t timeout);
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
s32_t timeout);
static void remove_mq(mqueue_object *msg_queue);
/**
* @brief Open a message queue.
*
* Number of message queue and descriptor to message queue are limited by
* heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
*
* See IEEE 1003.1
*/
mqd_t mq_open(const char *name, int oflags, ...)
{
va_list va;
mode_t mode;
mq_attr *attrs = NULL;
u32_t msg_size = 0, max_msgs = 0;
mqueue_object *msg_queue;
mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
va_start(va, oflags);
if (oflags & O_CREAT) {
mode = va_arg(va, mode_t);
attrs = va_arg(va, mq_attr*);
}
va_end(va);
if (attrs != NULL) {
msg_size = attrs->mq_msgsize;
max_msgs = attrs->mq_maxmsg;
}
if (name == NULL || ((oflags & O_CREAT) && (msg_size <= 0 ||
max_msgs <= 0))) {
errno = EINVAL;
return (mqd_t)mqd;
}
if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
errno = ENAMETOOLONG;
return (mqd_t)mqd;
}
/* Check if queue already exists */
k_sem_take(&mq_sem, K_FOREVER);
msg_queue = find_in_list(name);
k_sem_give(&mq_sem);
if ((msg_queue != NULL) && (oflags & O_CREAT) && (oflags & O_EXCL)) {
/* Message queue has alreadey been opened and O_EXCL is set */
errno = EEXIST;
return (mqd_t)mqd;
}
if (msg_queue == NULL && !(oflags & O_CREAT)) {
errno = ENOENT;
return (mqd_t)mqd;
}
mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
if (mq_desc_ptr != NULL) {
memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
msg_queue_desc->mem_desc = mq_desc_ptr;
} else {
goto free_mq_desc;
}
/* Allocate mqueue object for new message queue */
if (msg_queue == NULL) {
/* Check for message quantity and size in message queue */
if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) {
goto free_mq_desc;
}
mq_obj_ptr = k_malloc(sizeof(mqueue_object));
if (mq_obj_ptr != NULL) {
memset(mq_obj_ptr, 0, sizeof(mqueue_object));
msg_queue = (mqueue_object *)mq_obj_ptr;
msg_queue->mem_obj = mq_obj_ptr;
} else {
goto free_mq_object;
}
mq_name_ptr = k_malloc(strlen(name) + 1);
if (mq_name_ptr != NULL) {
memset(mq_name_ptr, 0, strlen(name) + 1);
msg_queue->name = mq_name_ptr;
} else {
goto free_mq_name;
}
strcpy(msg_queue->name, name);
mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(u8_t));
if (mq_buf_ptr != NULL) {
memset(mq_buf_ptr, 0,
msg_size * max_msgs * sizeof(u8_t));
msg_queue->mem_buffer = mq_buf_ptr;
} else {
goto free_mq_buffer;
}
atomic_set(&msg_queue->ref_count, 1);
/* initialize zephyr message queue */
k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
max_msgs);
k_sem_take(&mq_sem, K_FOREVER);
sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
k_sem_give(&mq_sem);
} else {
atomic_inc(&msg_queue->ref_count);
}
msg_queue_desc->mqueue = msg_queue;
msg_queue_desc->flags = (oflags & O_NONBLOCK) ? O_NONBLOCK : 0;
return (mqd_t)msg_queue_desc;
free_mq_buffer:
k_free(mq_name_ptr);
free_mq_name:
k_free(mq_obj_ptr);
free_mq_object:
k_free(mq_desc_ptr);
free_mq_desc:
errno = ENOSPC;
return (mqd_t)mqd;
}
/**
* @brief Close a message queue descriptor.
*
* See IEEE 1003.1
*/
int mq_close(mqd_t mqdes)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
if (mqd == NULL) {
errno = EBADF;
return -1;
}
atomic_dec(&mqd->mqueue->ref_count);
/* remove mq if marked for unlink */
if (mqd->mqueue->name == NULL) {
remove_mq(mqd->mqueue);
}
k_free(mqd->mem_desc);
return 0;
}
/**
* @brief Remove a message queue.
*
* See IEEE 1003.1
*/
int mq_unlink(const char *name)
{
mqueue_object *msg_queue;
k_sem_take(&mq_sem, K_FOREVER);
msg_queue = find_in_list(name);
if (msg_queue == NULL) {
k_sem_give(&mq_sem);
errno = EBADF;
return -1;
}
k_free(msg_queue->name);
msg_queue->name = NULL;
k_sem_give(&mq_sem);
remove_mq(msg_queue);
return 0;
}
/**
* @brief Send a message to a message queue.
*
* All messages in message queue are of equal priority.
*
* See IEEE 1003.1
*/
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
s32_t timeout = K_FOREVER;
return send_message(mqd, msg_ptr, msg_len, timeout);
}
/**
* @brief Send message to a message queue within abstime time.
*
* All messages in message queue are of equal priority.
*
* See IEEE 1003.1
*/
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abstime)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
s32_t timeout;
timeout = (s32_t) timespec_to_timeoutms(abstime);
return send_message(mqd, msg_ptr, msg_len, timeout);
}
/**
* @brief Receive a message from a message queue.
*
* All messages in message queue are of equal priority.
*
* See IEEE 1003.1
*/
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
s32_t timeout = K_FOREVER;
return receive_message(mqd, msg_ptr, msg_len, timeout);
}
/**
* @brief Receive message from a message queue within abstime time.
*
* All messages in message queue are of equal priority.
*
* See IEEE 1003.1
*/
int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abstime)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
s32_t timeout = K_NO_WAIT;
timeout = (s32_t) timespec_to_timeoutms(abstime);
return receive_message(mqd, msg_ptr, msg_len, timeout);
}
/**
* @brief Get message queue attributes.
*
* See IEEE 1003.1
*/
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
mqueue_object *msg_queue = mqd->mqueue;
struct k_msgq_attrs attrs;
if (mqd == NULL) {
errno = EBADF;
return -1;
}
k_sem_take(&mq_sem, K_FOREVER);
k_msgq_get_attrs(&msg_queue->queue, &attrs);
mqstat->mq_flags = mqd->flags;
mqstat->mq_maxmsg = attrs.max_msgs;
mqstat->mq_msgsize = attrs.msg_size;
mqstat->mq_curmsgs = attrs.used_msgs;
k_sem_give(&mq_sem);
return 0;
}
/**
* @brief Set message queue attributes.
*
* See IEEE 1003.1
*/
int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
struct mq_attr *omqstat)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;
if (mqd == NULL) {
errno = EBADF;
return -1;
}
if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
errno = EINVAL;
return -1;
}
if (omqstat != NULL) {
mq_getattr(mqdes, omqstat);
}
k_sem_take(&mq_sem, K_FOREVER);
mqd->flags = mqstat->mq_flags;
k_sem_give(&mq_sem);
return 0;
}
/* Internal functions */
static mqueue_object *find_in_list(const char *name)
{
sys_snode_t *mq;
mqueue_object *msg_queue;
mq = mq_list.head;
while (mq != NULL) {
msg_queue = (mqueue_object *)mq;
if (strcmp(msg_queue->name, name) == 0) {
return msg_queue;
}
mq = mq->next;
}
return NULL;
}
static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
s32_t timeout)
{
s32_t ret = -1;
if (mqd == NULL) {
errno = EBADF;
return ret;
}
if (mqd->flags & O_NONBLOCK) {
timeout = K_NO_WAIT;
}
if (msg_len > mqd->mqueue->queue.msg_size) {
errno = EMSGSIZE;
return ret;
}
if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
errno = (timeout == K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
return ret;
}
return 0;
}
static s32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
s32_t timeout)
{
int ret = -1;
if (mqd == NULL) {
errno = EBADF;
return ret;
}
if (msg_len < mqd->mqueue->queue.msg_size) {
errno = EMSGSIZE;
return ret;
}
if (mqd->flags & O_NONBLOCK) {
timeout = K_NO_WAIT;
}
if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
errno = (timeout != K_NO_WAIT) ? ETIMEDOUT : EAGAIN;
} else {
ret = mqd->mqueue->queue.msg_size;
}
return ret;
}
static void remove_mq(mqueue_object *msg_queue)
{
if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
k_sem_take(&mq_sem, K_FOREVER);
sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
k_sem_give(&mq_sem);
/* Free mq buffer and pbject */
k_free(msg_queue->mem_buffer);
k_free(msg_queue->mem_obj);
}
}