net: mqtt-sn: Add MQTT-SN library

This commit adds an implementation of MQTT-SN v1.2.
The specification is available on oasis-open.org:
https://www.oasis-open.org/committees/download.php/66091/MQTT-SN_spec_v1.2.pdf

The following things are missing in this implementation:
- Pre-defined topic IDs
- QoS -1 - it's most useful with predefined topics
- Gateway discovery using ADVERTISE, SEARCHGW and GWINFO messages.
- Setting the will topic and message after the initial connect
- Forwarder Encapsulation

Signed-off-by: René Beckmann <rene.beckmann@grandcentrix.net>
This commit is contained in:
René Beckmann 2022-10-30 15:09:30 +01:00 committed by Carles Cufí
parent b2c5d7cb60
commit 26758117d6
10 changed files with 2955 additions and 0 deletions

View file

@ -0,0 +1,316 @@
/*
* Copyright (c) 2022 René Beckmann
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn.h
*
* @defgroup mqtt_sn_socket MQTT Client library
* @ingroup networking
* @{
* @brief MQTT-SN Client Implementation
*
* @details
* MQTT-SN Client's Application interface is defined in this header.
* Targets protocol version 1.2.
*
*/
#ifndef ZEPHYR_INCLUDE_NET_MQTT_SN_H_
#define ZEPHYR_INCLUDE_NET_MQTT_SN_H_
#include <stddef.h>
#include <zephyr/net/buf.h>
#include <zephyr/types.h>
#include <sys/types.h>
#ifdef CONFIG_MQTT_SN_TRANSPORT_UDP
#include <zephyr/net/net_ip.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Quality of Service. QoS 0-2 work the same as basic MQTT, QoS -1 is an MQTT-SN addition.
* QOS -1 is not supported yet.
*/
enum mqtt_sn_qos {
MQTT_SN_QOS_0, /**< QOS 0 */
MQTT_SN_QOS_1, /**< QOS 1 */
MQTT_SN_QOS_2, /**< QOS 2 */
MQTT_SN_QOS_M1 /**< QOS -1 */
};
/**
* MQTT-SN topic types.
*/
enum mqtt_sn_topic_type {
MQTT_SN_TOPIC_TYPE_NORMAL,
MQTT_SN_TOPIC_TYPE_PREDEF,
MQTT_SN_TOPIC_TYPE_SHORT
};
enum mqtt_sn_return_code {
MQTT_SN_CODE_ACCEPTED = 0x00, /**< Accepted */
MQTT_SN_CODE_REJECTED_CONGESTION = 0x01, /**< Rejected: congestion */
MQTT_SN_CODE_REJECTED_TOPIC_ID = 0x02, /**< Rejected: Invalid Topic ID */
MQTT_SN_CODE_REJECTED_NOTSUP = 0x03, /**< Rejected: Not Supported */
};
/** @brief Abstracts memory buffers. */
struct mqtt_sn_data {
const uint8_t *data; /**< Pointer to data. */
uint16_t size; /**< Size of data, in bytes. */
};
/**
* @brief Initialize memory buffer from C literal string.
*
* Use it as follows:
*
* struct mqtt_sn_data topic = MQTT_SN_DATA_STRING_LITERAL("/zephyr");
*
* @param[in] literal Literal string from which to generate mqtt_sn_data object.
*/
#define MQTT_SN_DATA_STRING_LITERAL(literal) ((struct mqtt_sn_data){literal, sizeof(literal) - 1})
/**
* @brief Initialize memory buffer from single bytes.
*
* Use it as follows:
*
* struct mqtt_sn_data data = MQTT_SN_DATA_BYTES(0x13, 0x37);
*/
#define MQTT_SN_DATA_BYTES(...) \
((struct mqtt_sn_data) { (uint8_t[]){ __VA_ARGS__ }, sizeof((uint8_t[]){ __VA_ARGS__ })})
/**
* Event types that can be emitted by the library.
*/
enum mqtt_sn_evt_type {
MQTT_SN_EVT_CONNECTED, /**< Connected to a gateway */
MQTT_SN_EVT_DISCONNECTED, /**< Disconnected */
MQTT_SN_EVT_ASLEEP, /**< Entered ASLEEP state */
MQTT_SN_EVT_AWAKE, /**< Entered AWAKE state */
MQTT_SN_EVT_PUBLISH, /**< Received a PUBLISH message */
MQTT_SN_EVT_PINGRESP /**< Received a PINGRESP */
};
/**
* Event metadata.
*/
union mqtt_sn_evt_param {
struct {
struct mqtt_sn_data data;
enum mqtt_sn_topic_type topic_type;
uint16_t topic_id;
} publish;
};
/**
* MQTT-SN event structure to be handled by the event callback.
*/
struct mqtt_sn_evt {
enum mqtt_sn_evt_type type;
union mqtt_sn_evt_param param;
};
struct mqtt_sn_client;
/**
* @brief Asynchronous event notification callback registered by the
* application.
*
* @param[in] client Identifies the client for which the event is notified.
* @param[in] evt Event description along with result and associated
* parameters (if any).
*/
typedef void (*mqtt_sn_evt_cb_t)(struct mqtt_sn_client *client, const struct mqtt_sn_evt *evt);
/**
* @brief Structure to describe an MQTT-SN transport.
*
* MQTT-SN does not require transports to be reliable or to hold a connection.
* Transports just need to be frame-based, so you can use UDP, ZigBee, or even
* a simple UART, given some kind of framing protocol is used.
*/
struct mqtt_sn_transport {
/**
* @brief Will be called once on client init to initialize the transport.
*
* Use this to open sockets or similar. May be NULL.
*/
int (*init)(struct mqtt_sn_transport *transport);
/**
* @brief Will be called on client deinit
*
* Use this to close sockets or similar. May be NULL.
*/
void (*deinit)(struct mqtt_sn_transport *transport);
/**
* Will be called by the library when it wants to send a message.
*/
int (*msg_send)(struct mqtt_sn_client *client, void *buf, size_t sz);
/**
* @brief Will be called by the library when it wants to receive a message.
*
* Implementations should follow recv conventions.
*/
ssize_t (*recv)(struct mqtt_sn_client *client, void *buffer, size_t length);
/**
* @brief Check if incoming data is available.
*
* If poll() returns a positive number, recv must not block.
*
* May be NULL, but recv should not block then either.
*
* @return Positive number if data is available, or zero if there is none.
* Negative values signal errors.
*/
int (*poll)(struct mqtt_sn_client *client);
};
#ifdef CONFIG_MQTT_SN_TRANSPORT_UDP
/**
* Transport struct for UDP based transport.
*/
struct mqtt_sn_transport_udp {
/** Parent struct */
struct mqtt_sn_transport tp;
/** Socket FD */
int sock;
/** Address of the gateway */
struct sockaddr gwaddr;
socklen_t gwaddrlen;
};
#define UDP_TRANSPORT(transport) CONTAINER_OF(transport, struct mqtt_sn_transport_udp, tp)
/**
* @brief Initialize the UDP transport.
*
* @param[in] udp The transport to be initialized
* @param[in] gwaddr Pre-initialized gateway address
* @param[in] addrlen Size of the gwaddr structure.
*/
int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockaddr *gwaddr,
socklen_t addrlen);
#endif
/**
* Structure describing an MQTT-SN client.
*/
struct mqtt_sn_client {
struct mqtt_sn_data client_id; /**< 1-23 character unique client ID */
struct mqtt_sn_data will_topic; /**< Must be initialized before connecting with will=true */
struct mqtt_sn_data will_msg; /**< Must be initialized before connecting with will=true */
enum mqtt_sn_qos will_qos;
bool will_retain;
struct mqtt_sn_transport *transport;
struct net_buf_simple tx;
struct net_buf_simple rx;
mqtt_sn_evt_cb_t evt_cb;
uint16_t next_msg_id;
sys_slist_t publish;
sys_slist_t topic;
int state;
int64_t last_ping;
uint8_t ping_retries;
struct k_work_delayable process_work;
};
/**
* @brief Initialize a client.
*/
int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
size_t txsz, void *rx, size_t rxsz);
/**
* @brief Deinitialize the client
*
* This removes all topics and publishes, and also deinits the transport.
*/
void mqtt_sn_client_deinit(struct mqtt_sn_client *client);
/**
* @brief Connect the client.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session);
/**
* @brief Disconnect the client.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_disconnect(struct mqtt_sn_client *client);
/**
* @brief Set the client into sleep state for the given duration (seconds).
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration);
/**
* @brief Subscribe to a given topic.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name);
/**
* @brief Unsubscribe from a topic.
*/
int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name);
/**
* @brief Publish a value.
*
* If the topic is not yet registered with the gateway, the library takes care of it.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data);
/**
* @brief Check the transport for new incoming data.
*
* Call this function periodically, or if you have good reason to believe there is any data.
* If the client's transport struct contains a poll-function, this function is non-blocking.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_input(struct mqtt_sn_client *client);
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_NET_MQTT_SN_H_ */
/**@} */

View file

@ -6,6 +6,7 @@ add_subdirectory_ifdef(CONFIG_LWM2M lwm2m)
add_subdirectory_ifdef(CONFIG_SOCKS socks)
add_subdirectory_ifdef(CONFIG_SNTP sntp)
add_subdirectory_ifdef(CONFIG_MQTT_LIB mqtt)
add_subdirectory_ifdef(CONFIG_MQTT_SN_LIB mqtt_sn)
add_subdirectory_ifdef(CONFIG_TFTP_LIB tftp)
add_subdirectory_ifdef(CONFIG_NET_CONFIG_SETTINGS config)
add_subdirectory_ifdef(CONFIG_NET_SOCKETS sockets)

View file

@ -9,6 +9,8 @@ source "subsys/net/lib/dns/Kconfig"
source "subsys/net/lib/mqtt/Kconfig"
source "subsys/net/lib/mqtt_sn/Kconfig"
source "subsys/net/lib/tftp/Kconfig"
source "subsys/net/lib/http/Kconfig"

View file

@ -0,0 +1,13 @@
# SPDX-License-Identifier: Apache-2.0
zephyr_library()
zephyr_library_sources(
mqtt_sn.c
mqtt_sn_decoder.c
mqtt_sn_encoder.c
)
zephyr_library_sources_ifdef(CONFIG_MQTT_SN_TRANSPORT_UDP
mqtt_sn_transport_udp.c
)

View file

@ -0,0 +1,60 @@
# MQTT-SN Library for Zephyr
# Copyright (c) 2022 René Beckmann
# SPDX-License-Identifier: Apache-2.0
config MQTT_SN_LIB
bool "MQTT-SN Library Support [EXPERIMENTAL]"
select EXPERIMENTAL
help
Enable the Zephyr MQTT Library
if MQTT_SN_LIB
config MQTT_SN_LIB_MAX_PAYLOAD_SIZE
int "Maximum payload size of an MQTT-SN message"
default 255
config MQTT_SN_LIB_MAX_MSGS
int "Number of preallocated messages"
default 10
config MQTT_SN_LIB_MAX_TOPICS
int "Number of topics that can be managed"
default 20
config MQTT_SN_LIB_MAX_TOPIC_SIZE
int "Maximum topic length"
default 64
config MQTT_SN_LIB_MAX_PUBLISH
int "Number of publishes that can be in-flight at the same time"
default 5
config MQTT_SN_KEEPALIVE
int "Maximum number of clients Keep alive time for MQTT-SN (in seconds)"
default 60
help
Keep alive time for MQTT-SN (in seconds). Sending of Ping Requests to
keep the connection alive are governed by this value.
config MQTT_SN_TRANSPORT_UDP
bool "UDP transport for MQTT-SN"
select NET_SOCKETS
config MQTT_SN_LIB_N_RETRY
int "Number of times to retry messages"
range 1 20
default 5
config MQTT_SN_LIB_T_RETRY
int "Time (seconds) to wait for responses"
default 10
module=MQTT_SN
module-dep=NET_LOG
module-str=Log level for MQTT-SN
module-help=Enables mqtt-sn debug messages.
source "subsys/net/Kconfig.template.log_config.net"
endif # MQTT_SN_LIB

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,375 @@
/*
* Copyright (c) 2022 René Beckmann
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn_decoder.c
*
* @brief MQTT-SN message decoder.
*/
#include "mqtt_sn_msg.h"
#include <zephyr/net/mqtt_sn.h>
#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
/**
* @brief Decode the length of a message payload.
*
* From the specification:
*
* The Length field is either 1- or 3-octet long and specifies the total number of octets
* contained in the message (including the Length field itself).
* If the first octet of the Length field is coded 0x01 then the Length field is 3-octet long;
* in this case, the two following octets specify the total number of octets of the message
* (most-significant octet first). Otherwise, the Length field is only 1-octet long and specifies
* itself the total number of octets contained in the message. The 3-octet format allows the
* encoding of message lengths up to 65535 octets. Messages with lengths smaller than 256 octets
* may use the shorter 1-octet format.
*
* @param buf
* @return Size of the message not including the length field or negative error code
*/
static ssize_t decode_payload_length(struct net_buf_simple *buf)
{
size_t length;
size_t length_field_s = 1;
size_t buflen = buf->len;
/*
* Size must not be larger than an uint16_t can fit,
* minus 3 bytes for the length field itself
*/
if (buflen > UINT16_MAX) {
LOG_ERR("Message too large");
return -EFBIG;
}
length = net_buf_simple_pull_u8(buf);
if (length == MQTT_SN_LENGTH_FIELD_EXTENDED_PREFIX) {
length = net_buf_simple_pull_be16(buf);
length_field_s = 3;
}
if (length != buflen) {
LOG_ERR("Message length %zu != buffer size %zu", length, buflen);
return -EPROTO;
}
if (length <= length_field_s) {
LOG_ERR("Message length %zu - contains no data?", length);
return -ENODATA;
}
/* subtract the size of the length field to get message length */
return length - length_field_s;
}
static void decode_flags(struct net_buf_simple *buf, struct mqtt_sn_flags *flags)
{
uint8_t b = net_buf_simple_pull_u8(buf);
flags->dup = (bool)(b & MQTT_SN_FLAGS_DUP);
flags->retain = (bool)(b & MQTT_SN_FLAGS_RETAIN);
flags->will = (bool)(b & MQTT_SN_FLAGS_WILL);
flags->clean_session = (bool)(b & MQTT_SN_FLAGS_CLEANSESSION);
flags->qos = (enum mqtt_sn_qos)((b & MQTT_SN_FLAGS_MASK_QOS) >> MQTT_SN_FLAGS_SHIFT_QOS);
flags->topic_type = (enum mqtt_sn_topic_type)((b & MQTT_SN_FLAGS_MASK_TOPICID_TYPE) >>
MQTT_SN_FLAGS_SHIFT_TOPICID_TYPE);
}
static void decode_data(struct net_buf_simple *buf, struct mqtt_sn_data *dest)
{
dest->size = buf->len;
dest->data = net_buf_simple_pull_mem(buf, buf->len);
}
static int decode_empty_message(struct net_buf_simple *buf)
{
if (buf->len) {
LOG_ERR("Message not empty");
return -EPROTO;
}
return 0;
}
static int decode_msg_advertise(struct net_buf_simple *buf, struct mqtt_sn_param_advertise *params)
{
if (buf->len != 3) {
return -EPROTO;
}
params->gw_id = net_buf_simple_pull_u8(buf);
params->duration = net_buf_simple_pull_be16(buf);
return 0;
}
static int decode_msg_gwinfo(struct net_buf_simple *buf, struct mqtt_sn_param_gwinfo *params)
{
if (buf->len < 1) {
return -EPROTO;
}
params->gw_id = net_buf_simple_pull_u8(buf);
if (buf->len) {
decode_data(buf, &params->gw_add);
}
return 0;
}
static int decode_msg_connack(struct net_buf_simple *buf, struct mqtt_sn_param_connack *params)
{
if (buf->len != 1) {
return -EPROTO;
}
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_willtopicreq(struct net_buf_simple *buf)
{
return decode_empty_message(buf);
}
static int decode_msg_willmsgreq(struct net_buf_simple *buf)
{
return decode_empty_message(buf);
}
static int decode_msg_register(struct net_buf_simple *buf, struct mqtt_sn_param_register *params)
{
if (buf->len < 5) {
return -EPROTO;
}
params->topic_id = net_buf_simple_pull_be16(buf);
params->msg_id = net_buf_simple_pull_be16(buf);
decode_data(buf, &params->topic);
return 0;
}
static int decode_msg_regack(struct net_buf_simple *buf, struct mqtt_sn_param_regack *params)
{
if (buf->len != 5) {
return -EPROTO;
}
params->topic_id = net_buf_simple_pull_be16(buf);
params->msg_id = net_buf_simple_pull_be16(buf);
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_publish(struct net_buf_simple *buf, struct mqtt_sn_param_publish *params)
{
struct mqtt_sn_flags flags;
if (buf->len < 6) {
return -EPROTO;
}
decode_flags(buf, &flags);
params->dup = flags.dup;
params->qos = flags.qos;
params->retain = flags.retain;
params->topic_type = flags.topic_type;
params->topic_id = net_buf_simple_pull_be16(buf);
params->msg_id = net_buf_simple_pull_be16(buf);
decode_data(buf, &params->data);
return 0;
}
static int decode_msg_puback(struct net_buf_simple *buf, struct mqtt_sn_param_puback *params)
{
if (buf->len != 5) {
return -EPROTO;
}
params->topic_id = net_buf_simple_pull_be16(buf);
params->msg_id = net_buf_simple_pull_be16(buf);
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_pubrec(struct net_buf_simple *buf, struct mqtt_sn_param_pubrec *params)
{
if (buf->len != 2) {
return -EPROTO;
}
params->msg_id = net_buf_simple_pull_be16(buf);
return 0;
}
static int decode_msg_pubrel(struct net_buf_simple *buf, struct mqtt_sn_param_pubrel *params)
{
if (buf->len != 2) {
return -EPROTO;
}
params->msg_id = net_buf_simple_pull_be16(buf);
return 0;
}
static int decode_msg_pubcomp(struct net_buf_simple *buf, struct mqtt_sn_param_pubcomp *params)
{
if (buf->len != 2) {
return -EPROTO;
}
params->msg_id = net_buf_simple_pull_be16(buf);
return 0;
}
static int decode_msg_suback(struct net_buf_simple *buf, struct mqtt_sn_param_suback *params)
{
struct mqtt_sn_flags flags;
if (buf->len != 6) {
return -EPROTO;
}
decode_flags(buf, &flags);
params->qos = flags.qos;
params->topic_id = net_buf_simple_pull_be16(buf);
params->msg_id = net_buf_simple_pull_be16(buf);
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_unsuback(struct net_buf_simple *buf, struct mqtt_sn_param_unsuback *params)
{
if (buf->len != 2) {
return -EPROTO;
}
params->msg_id = net_buf_simple_pull_be16(buf);
return 0;
}
static int decode_msg_pingreq(struct net_buf_simple *buf)
{
/* The client_id field is only set if the message was sent by a client. */
return decode_empty_message(buf);
}
static int decode_msg_pingresp(struct net_buf_simple *buf)
{
return decode_empty_message(buf);
}
static int decode_msg_disconnect(struct net_buf_simple *buf,
struct mqtt_sn_param_disconnect *params)
{
/* The duration field is only set if the message was sent by a client. */
return decode_empty_message(buf);
}
static int decode_msg_willtopicresp(struct net_buf_simple *buf,
struct mqtt_sn_param_willtopicresp *params)
{
if (buf->len != 1) {
return -EPROTO;
}
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_willmsgresp(struct net_buf_simple *buf,
struct mqtt_sn_param_willmsgresp *params)
{
if (buf->len != 1) {
return -EPROTO;
}
params->ret_code = net_buf_simple_pull_u8(buf);
return 0;
}
int mqtt_sn_decode_msg(struct net_buf_simple *buf, struct mqtt_sn_param *params)
{
ssize_t len;
if (!buf || !buf->len) {
return -EINVAL;
}
len = decode_payload_length(buf);
if (len < 0) {
LOG_ERR("Could not decode message: %d", (int)len);
return (int)len;
}
params->type = (enum mqtt_sn_msg_type)net_buf_simple_pull_u8(buf);
LOG_INF("Decoding message type: %d", params->type);
switch (params->type) {
case MQTT_SN_MSG_TYPE_ADVERTISE:
return decode_msg_advertise(buf, &params->params.advertise);
case MQTT_SN_MSG_TYPE_GWINFO:
return decode_msg_gwinfo(buf, &params->params.gwinfo);
case MQTT_SN_MSG_TYPE_CONNACK:
return decode_msg_connack(buf, &params->params.connack);
case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
return decode_msg_willtopicreq(buf);
case MQTT_SN_MSG_TYPE_WILLMSGREQ:
return decode_msg_willmsgreq(buf);
case MQTT_SN_MSG_TYPE_REGISTER:
return decode_msg_register(buf, &params->params.reg);
case MQTT_SN_MSG_TYPE_REGACK:
return decode_msg_regack(buf, &params->params.regack);
case MQTT_SN_MSG_TYPE_PUBLISH:
return decode_msg_publish(buf, &params->params.publish);
case MQTT_SN_MSG_TYPE_PUBACK:
return decode_msg_puback(buf, &params->params.puback);
case MQTT_SN_MSG_TYPE_PUBREC:
return decode_msg_pubrec(buf, &params->params.pubrec);
case MQTT_SN_MSG_TYPE_PUBREL:
return decode_msg_pubrel(buf, &params->params.pubrel);
case MQTT_SN_MSG_TYPE_PUBCOMP:
return decode_msg_pubcomp(buf, &params->params.pubcomp);
case MQTT_SN_MSG_TYPE_SUBACK:
return decode_msg_suback(buf, &params->params.suback);
case MQTT_SN_MSG_TYPE_UNSUBACK:
return decode_msg_unsuback(buf, &params->params.unsuback);
case MQTT_SN_MSG_TYPE_PINGREQ:
return decode_msg_pingreq(buf);
case MQTT_SN_MSG_TYPE_PINGRESP:
return decode_msg_pingresp(buf);
case MQTT_SN_MSG_TYPE_DISCONNECT:
return decode_msg_disconnect(buf, &params->params.disconnect);
case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
return decode_msg_willtopicresp(buf, &params->params.willtopicresp);
case MQTT_SN_MSG_TYPE_WILLMSGRESP:
return decode_msg_willmsgresp(buf, &params->params.willmsgresp);
default:
LOG_ERR("Got unexpected message type %d", params->type);
return -EINVAL;
}
}

View file

@ -0,0 +1,526 @@
/*
* Copyright (c) 2022 René Beckmann
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn_encoder.c
*
* @brief MQTT-SN message encoder.
*/
#include "mqtt_sn_msg.h"
#include <zephyr/net/mqtt_sn.h>
#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
/**
* @brief Prepare a message
*
* @param buf Message struct to use
* @param sz The length of the message's payload without the length field and the type.
* @param type Message type
* @return 0 or negative error code
*/
static int prepare_message(struct net_buf_simple *buf, size_t sz, enum mqtt_sn_msg_type type)
{
/* Add one for the type field */
sz++;
/* add size of length field */
sz += (sz > 254 ? 3 : 1);
size_t maxlen = net_buf_simple_max_len(buf);
LOG_DBG("Preparing message of type %d with size %zu", type, sz);
/* Size must not be larger than an uint16_t can fit */
if (sz > UINT16_MAX) {
LOG_ERR("Message of size %zu is too large for MQTT-SN", sz);
return -EFBIG;
}
if (sz > maxlen) {
LOG_ERR("Message of size %zu does not fit in buffer of length %zu", sz, maxlen);
return -ENOMEM;
}
if (sz <= 255) {
net_buf_simple_add_u8(buf, (uint8_t)sz);
} else {
net_buf_simple_add_u8(buf, MQTT_SN_LENGTH_FIELD_EXTENDED_PREFIX);
net_buf_simple_add_be16(buf, (uint16_t)sz);
}
net_buf_simple_add_u8(buf, (uint8_t)type);
return 0;
}
static void encode_flags(struct net_buf_simple *buf, struct mqtt_sn_flags *flags)
{
uint8_t b = 0;
LOG_DBG("Encode flags %d, %d, %d, %d, %d, %d", flags->dup, flags->retain, flags->will,
flags->clean_session, flags->qos, flags->topic_type);
b |= flags->dup ? MQTT_SN_FLAGS_DUP : 0;
b |= flags->retain ? MQTT_SN_FLAGS_RETAIN : 0;
b |= flags->will ? MQTT_SN_FLAGS_WILL : 0;
b |= flags->clean_session ? MQTT_SN_FLAGS_CLEANSESSION : 0;
b |= ((flags->qos << MQTT_SN_FLAGS_SHIFT_QOS) & MQTT_SN_FLAGS_MASK_QOS);
b |= ((flags->topic_type << MQTT_SN_FLAGS_SHIFT_TOPICID_TYPE) &
MQTT_SN_FLAGS_MASK_TOPICID_TYPE);
net_buf_simple_add_u8(buf, b);
}
static int mqtt_sn_encode_msg_searchgw(struct net_buf_simple *buf,
struct mqtt_sn_param_searchgw *params)
{
size_t msgsz = 1;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_SEARCHGW);
if (err) {
return err;
}
net_buf_simple_add_u8(buf, params->radius);
return 0;
}
static int mqtt_sn_encode_msg_gwinfo(struct net_buf_simple *buf,
struct mqtt_sn_param_gwinfo *params)
{
size_t msgsz = 1 + params->gw_add.size;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_GWINFO);
if (err) {
return err;
}
net_buf_simple_add_u8(buf, params->gw_id);
net_buf_simple_add_data(buf, &params->gw_add);
return 0;
}
static int mqtt_sn_encode_msg_connect(struct net_buf_simple *buf,
struct mqtt_sn_param_connect *params)
{
size_t msgsz = 4 + params->client_id.size;
struct mqtt_sn_flags flags = {.will = params->will, .clean_session = params->clean_session};
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_CONNECT);
if (err) {
return err;
}
encode_flags(buf, &flags);
net_buf_simple_add_u8(buf, MQTT_SN_PROTOCOL_ID);
net_buf_simple_add_be16(buf, params->duration);
net_buf_simple_add_data(buf, &params->client_id);
return 0;
}
static int mqtt_sn_encode_msg_willtopic(struct net_buf_simple *buf,
struct mqtt_sn_param_willtopic *params)
{
size_t msgsz = 1 + params->topic.size;
struct mqtt_sn_flags flags = {.qos = params->qos, .retain = params->retain};
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_WILLTOPIC);
if (err) {
return err;
}
encode_flags(buf, &flags);
net_buf_simple_add_data(buf, &params->topic);
return 0;
}
static int mqtt_sn_encode_msg_willmsg(struct net_buf_simple *buf,
struct mqtt_sn_param_willmsg *params)
{
size_t msgsz = params->msg.size;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_WILLMSG);
if (err) {
return err;
}
net_buf_simple_add_data(buf, &params->msg);
return 0;
}
static int mqtt_sn_encode_msg_register(struct net_buf_simple *buf,
struct mqtt_sn_param_register *params)
{
size_t msgsz = 4 + params->topic.size;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_REGISTER);
if (err) {
return err;
}
/* When sent by the client, the topic ID is always 0x0000 */
net_buf_simple_add_be16(buf, 0x00);
net_buf_simple_add_be16(buf, params->msg_id);
net_buf_simple_add_data(buf, &params->topic);
return 0;
}
static int mqtt_sn_encode_msg_regack(struct net_buf_simple *buf,
struct mqtt_sn_param_regack *params)
{
size_t msgsz = 5;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_REGACK);
if (err) {
return err;
}
net_buf_simple_add_be16(buf, params->topic_id);
net_buf_simple_add_be16(buf, params->msg_id);
net_buf_simple_add_u8(buf, params->ret_code);
return 0;
}
static int mqtt_sn_encode_msg_publish(struct net_buf_simple *buf,
struct mqtt_sn_param_publish *params)
{
size_t msgsz = 5 + params->data.size;
struct mqtt_sn_flags flags = {.dup = params->dup,
.retain = params->retain,
.qos = params->qos,
.topic_type = params->topic_type};
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PUBLISH);
if (err) {
return err;
}
encode_flags(buf, &flags);
net_buf_simple_add_be16(buf, params->topic_id);
if (params->qos == MQTT_SN_QOS_1 || params->qos == MQTT_SN_QOS_2) {
net_buf_simple_add_be16(buf, params->msg_id);
} else {
/* Only relevant in case of QoS levels 1 and 2, otherwise coded 0x0000. */
net_buf_simple_add_be16(buf, 0x0000);
}
net_buf_simple_add_data(buf, &params->data);
return 0;
}
static int mqtt_sn_encode_msg_puback(struct net_buf_simple *buf,
struct mqtt_sn_param_puback *params)
{
size_t msgsz = 5;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PUBACK);
if (err) {
return err;
}
net_buf_simple_add_be16(buf, params->topic_id);
net_buf_simple_add_be16(buf, params->msg_id);
net_buf_simple_add_u8(buf, params->ret_code);
return 0;
}
static int mqtt_sn_encode_msg_pubrec(struct net_buf_simple *buf,
struct mqtt_sn_param_pubrec *params)
{
size_t msgsz = 2;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PUBREC);
if (err) {
return err;
}
net_buf_simple_add_be16(buf, params->msg_id);
return 0;
}
static int mqtt_sn_encode_msg_pubrel(struct net_buf_simple *buf,
struct mqtt_sn_param_pubrel *params)
{
size_t msgsz = 2;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PUBREL);
if (err) {
return err;
}
net_buf_simple_add_be16(buf, params->msg_id);
return 0;
}
static int mqtt_sn_encode_msg_pubcomp(struct net_buf_simple *buf,
struct mqtt_sn_param_pubcomp *params)
{
size_t msgsz = 2;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PUBCOMP);
if (err) {
return err;
}
net_buf_simple_add_be16(buf, params->msg_id);
return 0;
}
static int mqtt_sn_encode_msg_subscribe(struct net_buf_simple *buf,
struct mqtt_sn_param_subscribe *params)
{
size_t msgsz = 3;
struct mqtt_sn_flags flags = {
.dup = params->dup, .qos = params->qos, .topic_type = params->topic_type};
int err;
if (params->topic_type == MQTT_SN_TOPIC_TYPE_NORMAL) {
msgsz += params->topic.topic_name.size;
} else {
msgsz += 2;
}
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_SUBSCRIBE);
if (err) {
return err;
}
encode_flags(buf, &flags);
net_buf_simple_add_be16(buf, params->msg_id);
if (params->topic_type == MQTT_SN_TOPIC_TYPE_NORMAL) {
net_buf_simple_add_data(buf, &params->topic.topic_name);
} else {
net_buf_simple_add_be16(buf, params->topic.topic_id);
}
return 0;
}
static int mqtt_sn_encode_msg_unsubscribe(struct net_buf_simple *buf,
struct mqtt_sn_param_unsubscribe *params)
{
size_t msgsz = 3;
struct mqtt_sn_flags flags = {.topic_type = params->topic_type};
if (params->topic_type == MQTT_SN_TOPIC_TYPE_NORMAL) {
msgsz += params->topic.topic_name.size;
} else {
msgsz += 2;
}
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_UNSUBSCRIBE);
if (err) {
return err;
}
encode_flags(buf, &flags);
net_buf_simple_add_be16(buf, params->msg_id);
if (params->topic_type == MQTT_SN_TOPIC_TYPE_NORMAL) {
net_buf_simple_add_data(buf, &params->topic.topic_name);
} else {
net_buf_simple_add_be16(buf, params->topic.topic_id);
}
return 0;
}
static int mqtt_sn_encode_msg_pingreq(struct net_buf_simple *buf,
struct mqtt_sn_param_pingreq *params)
{
size_t msgsz = params->client_id.size;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_PINGREQ);
if (err) {
return err;
}
if (params->client_id.size) {
net_buf_simple_add_data(buf, &params->client_id);
}
return 0;
}
static int mqtt_sn_encode_msg_pingresp(struct net_buf_simple *buf)
{
return prepare_message(buf, 0, MQTT_SN_MSG_TYPE_PINGRESP);
}
static int mqtt_sn_encode_msg_disconnect(struct net_buf_simple *buf,
struct mqtt_sn_param_disconnect *params)
{
size_t msgsz = params->duration ? 2 : 0;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_DISCONNECT);
if (err) {
return err;
}
if (params->duration) {
net_buf_simple_add_be16(buf, params->duration);
}
return 0;
}
static int mqtt_sn_encode_msg_willtopicupd(struct net_buf_simple *buf,
struct mqtt_sn_param_willtopicupd *params)
{
size_t msgsz = 0;
struct mqtt_sn_flags flags = {.qos = params->qos, .retain = params->retain};
int err;
if (params->topic.size) {
msgsz += 1 + params->topic.size;
}
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_WILLTOPICUPD);
if (err) {
return err;
}
/* If the topic is empty, send an empty message to delete the will topic & message. */
if (params->topic.size) {
encode_flags(buf, &flags);
net_buf_simple_add_data(buf, &params->topic);
}
return 0;
}
static int mqtt_sn_encode_msg_willmsgupd(struct net_buf_simple *buf,
struct mqtt_sn_param_willmsgupd *params)
{
size_t msgsz = params->msg.size;
int err;
err = prepare_message(buf, msgsz, MQTT_SN_MSG_TYPE_WILLMSGUPD);
if (err) {
return err;
}
net_buf_simple_add_data(buf, &params->msg);
return 0;
}
int mqtt_sn_encode_msg(struct net_buf_simple *buf, struct mqtt_sn_param *param)
{
int result;
if (buf->len) {
LOG_ERR("Buffer not clean - bug?");
return -EBUSY;
}
LOG_DBG("Encoding message of type %d", param->type);
switch (param->type) {
case MQTT_SN_MSG_TYPE_SEARCHGW:
result = mqtt_sn_encode_msg_searchgw(buf, &param->params.searchgw);
break;
case MQTT_SN_MSG_TYPE_GWINFO:
result = mqtt_sn_encode_msg_gwinfo(buf, &param->params.gwinfo);
break;
case MQTT_SN_MSG_TYPE_CONNECT:
result = mqtt_sn_encode_msg_connect(buf, &param->params.connect);
break;
case MQTT_SN_MSG_TYPE_WILLTOPIC:
result = mqtt_sn_encode_msg_willtopic(buf, &param->params.willtopic);
break;
case MQTT_SN_MSG_TYPE_WILLMSG:
result = mqtt_sn_encode_msg_willmsg(buf, &param->params.willmsg);
break;
case MQTT_SN_MSG_TYPE_REGISTER:
result = mqtt_sn_encode_msg_register(buf, &param->params.reg);
break;
case MQTT_SN_MSG_TYPE_REGACK:
result = mqtt_sn_encode_msg_regack(buf, &param->params.regack);
break;
case MQTT_SN_MSG_TYPE_PUBLISH:
result = mqtt_sn_encode_msg_publish(buf, &param->params.publish);
break;
case MQTT_SN_MSG_TYPE_PUBACK:
result = mqtt_sn_encode_msg_puback(buf, &param->params.puback);
break;
case MQTT_SN_MSG_TYPE_PUBREC:
result = mqtt_sn_encode_msg_pubrec(buf, &param->params.pubrec);
break;
case MQTT_SN_MSG_TYPE_PUBREL:
result = mqtt_sn_encode_msg_pubrel(buf, &param->params.pubrel);
break;
case MQTT_SN_MSG_TYPE_PUBCOMP:
result = mqtt_sn_encode_msg_pubcomp(buf, &param->params.pubcomp);
break;
case MQTT_SN_MSG_TYPE_SUBSCRIBE:
result = mqtt_sn_encode_msg_subscribe(buf, &param->params.subscribe);
break;
case MQTT_SN_MSG_TYPE_UNSUBSCRIBE:
result = mqtt_sn_encode_msg_unsubscribe(buf, &param->params.unsubscribe);
break;
case MQTT_SN_MSG_TYPE_PINGREQ:
result = mqtt_sn_encode_msg_pingreq(buf, &param->params.pingreq);
break;
case MQTT_SN_MSG_TYPE_PINGRESP:
result = mqtt_sn_encode_msg_pingresp(buf);
break;
case MQTT_SN_MSG_TYPE_DISCONNECT:
result = mqtt_sn_encode_msg_disconnect(buf, &param->params.disconnect);
break;
case MQTT_SN_MSG_TYPE_WILLTOPICUPD:
result = mqtt_sn_encode_msg_willtopicupd(buf, &param->params.willtopicupd);
break;
case MQTT_SN_MSG_TYPE_WILLMSGUPD:
result = mqtt_sn_encode_msg_willmsgupd(buf, &param->params.willmsgupd);
break;
default:
LOG_ERR("Unsupported msg type %d", param->type);
result = -ENOTSUP;
break;
}
return result;
}

View file

@ -0,0 +1,257 @@
/*
* Copyright (c) 2022 René Beckmann
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn_msg.h
*
* @brief Function and data structures internal to MQTT-SN module.
*/
#ifndef MQTT_SN_MSG_H_
#define MQTT_SN_MSG_H_
#include <zephyr/net/mqtt_sn.h>
#include <zephyr/net/buf.h>
#ifdef __cplusplus
extern "C" {
#endif
#define MQTT_SN_LENGTH_FIELD_EXTENDED_PREFIX 0x01
#define MQTT_SN_PROTOCOL_ID 0x01
struct mqtt_sn_flags {
bool dup;
enum mqtt_sn_qos qos;
bool retain;
bool will;
bool clean_session;
enum mqtt_sn_topic_type topic_type;
};
enum mqtt_sn_msg_type {
MQTT_SN_MSG_TYPE_ADVERTISE = 0x00,
MQTT_SN_MSG_TYPE_SEARCHGW = 0x01,
MQTT_SN_MSG_TYPE_GWINFO = 0x02,
MQTT_SN_MSG_TYPE_CONNECT = 0x04,
MQTT_SN_MSG_TYPE_CONNACK = 0x05,
MQTT_SN_MSG_TYPE_WILLTOPICREQ = 0x06,
MQTT_SN_MSG_TYPE_WILLTOPIC = 0x07,
MQTT_SN_MSG_TYPE_WILLMSGREQ = 0x08,
MQTT_SN_MSG_TYPE_WILLMSG = 0x09,
MQTT_SN_MSG_TYPE_REGISTER = 0x0A,
MQTT_SN_MSG_TYPE_REGACK = 0x0B,
MQTT_SN_MSG_TYPE_PUBLISH = 0x0C,
MQTT_SN_MSG_TYPE_PUBACK = 0x0D,
MQTT_SN_MSG_TYPE_PUBCOMP = 0x0E,
MQTT_SN_MSG_TYPE_PUBREC = 0x0F,
MQTT_SN_MSG_TYPE_PUBREL = 0x10,
MQTT_SN_MSG_TYPE_SUBSCRIBE = 0x12,
MQTT_SN_MSG_TYPE_SUBACK = 0x13,
MQTT_SN_MSG_TYPE_UNSUBSCRIBE = 0x14,
MQTT_SN_MSG_TYPE_UNSUBACK = 0x15,
MQTT_SN_MSG_TYPE_PINGREQ = 0x16,
MQTT_SN_MSG_TYPE_PINGRESP = 0x17,
MQTT_SN_MSG_TYPE_DISCONNECT = 0x18,
MQTT_SN_MSG_TYPE_WILLTOPICUPD = 0x1A,
MQTT_SN_MSG_TYPE_WILLTOPICRESP = 0x1B,
MQTT_SN_MSG_TYPE_WILLMSGUPD = 0x1C,
MQTT_SN_MSG_TYPE_WILLMSGRESP = 0x1D,
MQTT_SN_MSG_TYPE_ENCAPSULATED = 0xFE,
};
struct mqtt_sn_param_advertise {
uint8_t gw_id;
uint16_t duration;
};
struct mqtt_sn_param_searchgw {
uint8_t radius;
};
struct mqtt_sn_param_gwinfo {
uint8_t gw_id;
struct mqtt_sn_data gw_add;
};
struct mqtt_sn_param_connect {
bool will;
bool clean_session;
uint16_t duration;
struct mqtt_sn_data client_id;
};
struct mqtt_sn_param_connack {
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param_willtopic {
enum mqtt_sn_qos qos;
bool retain;
struct mqtt_sn_data topic;
};
struct mqtt_sn_param_willmsg {
struct mqtt_sn_data msg;
};
struct mqtt_sn_param_register {
uint16_t msg_id;
uint16_t topic_id;
struct mqtt_sn_data topic;
};
struct mqtt_sn_param_regack {
uint16_t msg_id;
uint16_t topic_id;
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param_publish {
bool dup;
bool retain;
enum mqtt_sn_qos qos;
enum mqtt_sn_topic_type topic_type;
uint16_t topic_id;
uint16_t msg_id;
struct mqtt_sn_data data;
};
struct mqtt_sn_param_puback {
uint16_t msg_id;
uint16_t topic_id;
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param_pubrec {
uint16_t msg_id;
};
struct mqtt_sn_param_pubrel {
uint16_t msg_id;
};
struct mqtt_sn_param_pubcomp {
uint16_t msg_id;
};
struct mqtt_sn_param_subscribe {
bool dup;
enum mqtt_sn_qos qos;
enum mqtt_sn_topic_type topic_type;
uint16_t msg_id;
struct {
struct mqtt_sn_data topic_name;
uint16_t topic_id;
} topic;
};
struct mqtt_sn_param_suback {
enum mqtt_sn_qos qos;
uint16_t topic_id;
uint16_t msg_id;
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param_unsubscribe {
enum mqtt_sn_topic_type topic_type;
uint16_t msg_id;
union {
struct mqtt_sn_data topic_name;
uint16_t topic_id;
} topic;
};
struct mqtt_sn_param_unsuback {
uint16_t msg_id;
};
struct mqtt_sn_param_pingreq {
struct mqtt_sn_data client_id;
};
struct mqtt_sn_param_disconnect {
uint16_t duration;
};
struct mqtt_sn_param_willtopicupd {
enum mqtt_sn_qos qos;
bool retain;
struct mqtt_sn_data topic;
};
struct mqtt_sn_param_willmsgupd {
struct mqtt_sn_data msg;
};
struct mqtt_sn_param_willtopicresp {
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param_willmsgresp {
enum mqtt_sn_return_code ret_code;
};
struct mqtt_sn_param {
enum mqtt_sn_msg_type type;
union {
struct mqtt_sn_param_advertise advertise;
struct mqtt_sn_param_searchgw searchgw;
struct mqtt_sn_param_gwinfo gwinfo;
struct mqtt_sn_param_connect connect;
struct mqtt_sn_param_connack connack;
struct mqtt_sn_param_willtopic willtopic;
struct mqtt_sn_param_willmsg willmsg;
struct mqtt_sn_param_register reg;
struct mqtt_sn_param_regack regack;
struct mqtt_sn_param_publish publish;
struct mqtt_sn_param_puback puback;
struct mqtt_sn_param_pubrec pubrec;
struct mqtt_sn_param_pubrel pubrel;
struct mqtt_sn_param_pubcomp pubcomp;
struct mqtt_sn_param_subscribe subscribe;
struct mqtt_sn_param_suback suback;
struct mqtt_sn_param_unsubscribe unsubscribe;
struct mqtt_sn_param_unsuback unsuback;
struct mqtt_sn_param_pingreq pingreq;
struct mqtt_sn_param_disconnect disconnect;
struct mqtt_sn_param_willtopicupd willtopicupd;
struct mqtt_sn_param_willmsgupd willmsgupd;
struct mqtt_sn_param_willtopicresp willtopicresp;
struct mqtt_sn_param_willmsgresp willmsgresp;
} params;
};
/**@brief MQTT-SN Flags-field bitmasks */
#define MQTT_SN_FLAGS_DUP BIT(7)
#define MQTT_SN_FLAGS_QOS_0 0
#define MQTT_SN_FLAGS_QOS_1 BIT(5)
#define MQTT_SN_FLAGS_QOS_2 BIT(6)
#define MQTT_SN_FLAGS_QOS_M1 BIT(5) | BIT(6)
#define MQTT_SN_FLAGS_MASK_QOS (BIT(5) | BIT(6))
#define MQTT_SN_FLAGS_SHIFT_QOS 5
#define MQTT_SN_FLAGS_RETAIN BIT(4)
#define MQTT_SN_FLAGS_WILL BIT(3)
#define MQTT_SN_FLAGS_CLEANSESSION BIT(2)
#define MQTT_SN_FLAGS_TOPICID_TYPE_NORMAL 0
#define MQTT_SN_FLAGS_TOPICID_TYPE_PREDEF BIT(0)
#define MQTT_SN_FLAGS_TOPICID_TYPE_SHORT BIT(1)
#define MQTT_SN_FLAGS_MASK_TOPICID_TYPE (BIT(0) | BIT(1))
#define MQTT_SN_FLAGS_SHIFT_TOPICID_TYPE 0
static inline void *net_buf_simple_add_data(struct net_buf_simple *buf, struct mqtt_sn_data *data)
{
return net_buf_simple_add_mem(buf, data->data, data->size);
}
int mqtt_sn_encode_msg(struct net_buf_simple *buf, struct mqtt_sn_param *params);
int mqtt_sn_decode_msg(struct net_buf_simple *buf, struct mqtt_sn_param *params);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_SN_MSG_H_ */

View file

@ -0,0 +1,155 @@
/*
* Copyright (c) 2022 René Beckmann
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn_transport_udp.c
*
* @brief MQTT-SN UDP transport.
*/
#include <errno.h>
#include <zephyr/net/mqtt_sn.h>
#include <zephyr/net/net_ip.h>
#include <zephyr/net/socket.h>
#include <fcntl.h>
#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
static char *get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
{
switch (sa->sa_family) {
case AF_INET:
inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), s, maxlen);
break;
case AF_INET6:
inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), s, maxlen);
break;
default:
strncpy(s, "Unknown AF", maxlen);
break;
}
return s;
}
static int tp_udp_init(struct mqtt_sn_transport *transport)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(transport);
int err;
udp->sock = zsock_socket(PF_INET, SOCK_DGRAM, 0);
if (udp->sock < 0) {
return errno;
}
LOG_DBG("Socket %d", udp->sock);
#ifdef LOG_DBG
char ip[30], *out;
out = get_ip_str((struct sockaddr *)&udp->gwaddr, ip, sizeof(ip));
if (out != NULL) {
LOG_DBG("Connecting to IP %s:%u", out,
((struct sockaddr_in *)&udp->gwaddr)->sin_port);
}
#endif
err = zsock_connect(udp->sock, (struct sockaddr *)&udp->gwaddr, udp->gwaddrlen);
if (err < 0) {
return errno;
}
return 0;
}
static void tp_udp_deinit(struct mqtt_sn_transport *transport)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(transport);
zsock_close(udp->sock);
}
static int tp_udp_msg_send(struct mqtt_sn_client *client, void *buf, size_t sz)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(client->transport);
int rc;
LOG_HEXDUMP_DBG(buf, sz, "Sending UDP packet");
rc = zsock_send(udp->sock, buf, sz, 0);
if (rc < 0) {
return -errno;
}
if (rc != sz) {
return -EIO;
}
return 0;
}
static ssize_t tp_udp_recv(struct mqtt_sn_client *client, void *buffer, size_t length)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(client->transport);
int rc;
rc = zsock_recv(udp->sock, buffer, length, 0);
LOG_DBG("recv %d", rc);
if (rc < 0) {
return -errno;
}
LOG_HEXDUMP_DBG(buffer, rc, "recv");
return rc;
}
static int tp_udp_poll(struct mqtt_sn_client *client)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(client->transport);
int rc;
struct zsock_pollfd pollfd = {
.fd = udp->sock,
.events = ZSOCK_POLLIN,
};
rc = zsock_poll(&pollfd, 1, 0);
if (rc < 1) {
return rc;
}
LOG_DBG("revents %d", pollfd.revents & ZSOCK_POLLIN);
return pollfd.revents & ZSOCK_POLLIN;
}
int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockaddr *gwaddr,
socklen_t addrlen)
{
if (!udp || !gwaddr || !addrlen) {
return -EINVAL;
}
memset(udp, 0, sizeof(*udp));
udp->tp = (struct mqtt_sn_transport){.init = tp_udp_init,
.deinit = tp_udp_deinit,
.msg_send = tp_udp_msg_send,
.poll = tp_udp_poll,
.recv = tp_udp_recv};
udp->sock = 0;
memcpy(&udp->gwaddr, gwaddr, addrlen);
udp->gwaddrlen = addrlen;
return 0;
}