shell: Add MQTT backend
Add MQTT backed for shell module. Signed-off-by: Yong Cong Sin <yongcong.sin@gmail.com>
This commit is contained in:
parent
f66142c535
commit
74a0ae01bc
|
@ -597,6 +597,7 @@
|
|||
/include/drivers/ptp_clock.h @tbursztyka
|
||||
/include/shared_irq.h @dcpleung @nashif @andyross
|
||||
/include/shell/ @jakub-uC @nordic-krch
|
||||
/include/shell/shell_mqtt.h @ycsin
|
||||
/include/sw_isr_table.h @dcpleung @nashif @andyross
|
||||
/include/sys_clock.h @dcpleung @nashif @andyross
|
||||
/include/sys/sys_io.h @dcpleung @nashif @andyross
|
||||
|
@ -747,6 +748,7 @@ scripts/gen_image_info.py @tejlmand
|
|||
/subsys/random/ @dleach02
|
||||
/subsys/settings/ @nvlsianpu
|
||||
/subsys/shell/ @jakub-uC @nordic-krch
|
||||
/subsys/shell/backends/shell_mqtt.c @ycsin
|
||||
/subsys/stats/ @nvlsianpu
|
||||
/subsys/storage/ @nvlsianpu
|
||||
/subsys/task_wdt/ @martinjaeger
|
||||
|
|
140
include/shell/shell_mqtt.h
Normal file
140
include/shell/shell_mqtt.h
Normal file
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Copyright (c) 2022 G-Technologies Sdn. Bhd.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#ifndef SHELL_MQTT_H__
|
||||
#define SHELL_MQTT_H__
|
||||
|
||||
#include <zephyr.h>
|
||||
#include <shell/shell.h>
|
||||
#include <net/socket.h>
|
||||
#include <net/net_mgmt.h>
|
||||
#include <net/net_event.h>
|
||||
#include <net/net_conn_mgr.h>
|
||||
#include <net/mqtt.h>
|
||||
#include <sys/ring_buffer.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define RX_RB_SIZE CONFIG_SHELL_MQTT_RX_BUF_SIZE
|
||||
#define TX_BUF_SIZE CONFIG_SHELL_MQTT_TX_BUF_SIZE
|
||||
#define SH_MQTT_BUFFER_SIZE 64
|
||||
#define DEVICE_ID_BIN_MAX_SIZE 3
|
||||
#define DEVICE_ID_HEX_MAX_SIZE ((DEVICE_ID_BIN_MAX_SIZE * 2) + 1)
|
||||
#define SH_MQTT_TOPIC_MAX_SIZE DEVICE_ID_HEX_MAX_SIZE + 3
|
||||
|
||||
extern const struct shell_transport_api shell_mqtt_transport_api;
|
||||
|
||||
struct shell_mqtt_tx_buf {
|
||||
/** tx buffer. */
|
||||
char buf[TX_BUF_SIZE];
|
||||
|
||||
/** Current tx buf length. */
|
||||
uint16_t len;
|
||||
};
|
||||
|
||||
/** MQTT-based shell transport. */
|
||||
struct shell_mqtt {
|
||||
char device_id[DEVICE_ID_HEX_MAX_SIZE];
|
||||
char sub_topic[SH_MQTT_TOPIC_MAX_SIZE];
|
||||
char pub_topic[SH_MQTT_TOPIC_MAX_SIZE];
|
||||
|
||||
/** Handler function registered by shell. */
|
||||
shell_transport_handler_t shell_handler;
|
||||
|
||||
struct ring_buf rx_rb;
|
||||
uint8_t rx_rb_buf[RX_RB_SIZE];
|
||||
uint8_t *rx_rb_ptr;
|
||||
|
||||
struct shell_mqtt_tx_buf tx_buf;
|
||||
|
||||
/** Context registered by shell. */
|
||||
void *shell_context;
|
||||
|
||||
/** The mqtt client struct */
|
||||
struct mqtt_client mqtt_cli;
|
||||
|
||||
/* Buffers for MQTT client. */
|
||||
struct buffer {
|
||||
uint8_t rx[SH_MQTT_BUFFER_SIZE];
|
||||
uint8_t tx[SH_MQTT_BUFFER_SIZE];
|
||||
} buf;
|
||||
|
||||
struct k_mutex lock;
|
||||
|
||||
/** MQTT Broker details. */
|
||||
struct sockaddr_storage broker;
|
||||
|
||||
struct zsock_addrinfo *haddr;
|
||||
struct zsock_pollfd fds[1];
|
||||
int nfds;
|
||||
|
||||
struct mqtt_publish_param pub_data;
|
||||
|
||||
struct net_mgmt_event_callback mgmt_cb;
|
||||
|
||||
/** work */
|
||||
struct k_work_q workq;
|
||||
struct k_work net_disconnected_work;
|
||||
struct k_work_delayable connect_dwork;
|
||||
struct k_work_delayable subscribe_dwork;
|
||||
struct k_work_delayable process_dwork;
|
||||
struct k_work_delayable publish_dwork;
|
||||
|
||||
/** MQTT connection states */
|
||||
enum sh_mqtt_transport_state {
|
||||
SHELL_MQTT_TRANSPORT_DISCONNECTED,
|
||||
SHELL_MQTT_TRANSPORT_CONNECTED,
|
||||
} transport_state;
|
||||
|
||||
/** MQTT subscription states */
|
||||
enum sh_mqtt_subscribe_state {
|
||||
SHELL_MQTT_NOT_SUBSCRIBED,
|
||||
SHELL_MQTT_SUBSCRIBED,
|
||||
} subscribe_state;
|
||||
|
||||
/** Network states */
|
||||
enum sh_mqtt_network_state {
|
||||
SHELL_MQTT_NETWORK_DISCONNECTED,
|
||||
SHELL_MQTT_NETWORK_CONNECTED,
|
||||
} network_state;
|
||||
};
|
||||
|
||||
#define SHELL_MQTT_DEFINE(_name) \
|
||||
static struct shell_mqtt _name##_shell_mqtt; \
|
||||
struct shell_transport _name = { .api = &shell_mqtt_transport_api, \
|
||||
.ctx = (struct shell_mqtt *)&_name##_shell_mqtt }
|
||||
|
||||
/**
|
||||
* @brief This function provides pointer to shell mqtt backend instance.
|
||||
*
|
||||
* Function returns pointer to the shell mqtt instance. This instance can be
|
||||
* next used with shell_execute_cmd function in order to test commands behavior.
|
||||
*
|
||||
* @returns Pointer to the shell instance.
|
||||
*/
|
||||
const struct shell *shell_backend_mqtt_get_ptr(void);
|
||||
|
||||
/**
|
||||
* @brief Function to define the device ID (devid) for which the shell mqtt backend uses as a
|
||||
* client ID when it connects to the broker. It will publish its output to devid_tx and subscribe
|
||||
* to devid_rx for input .
|
||||
*
|
||||
* @note This is a weak-linked function, and can be overridden if desired.
|
||||
*
|
||||
* @param id Pointer to the devid buffer
|
||||
* @param id_max_len Maximum size of the devid buffer defined by DEVICE_ID_HEX_MAX_SIZE
|
||||
*
|
||||
* @return true if length of devid > 0
|
||||
*/
|
||||
bool shell_mqtt_get_devid(char *id, int id_max_len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* SHELL_MQTT_H__ */
|
|
@ -19,3 +19,8 @@ zephyr_sources_ifdef(
|
|||
CONFIG_SHELL_BACKEND_TELNET
|
||||
shell_telnet.c
|
||||
)
|
||||
|
||||
zephyr_sources_ifdef(
|
||||
CONFIG_SHELL_BACKEND_MQTT
|
||||
shell_mqtt.c
|
||||
)
|
||||
|
|
|
@ -187,6 +187,102 @@ source "subsys/logging/Kconfig.template.log_config"
|
|||
|
||||
endif # SHELL_BACKEND_RTT
|
||||
|
||||
config SHELL_BACKEND_MQTT
|
||||
bool "MQTT backend"
|
||||
depends on NET_TCP
|
||||
depends on NET_IPV4
|
||||
depends on NETWORKING
|
||||
select DNS_RESOLVER
|
||||
select HWINFO
|
||||
select MQTT_LIB
|
||||
select NET_MGMT
|
||||
select NET_MGMT_EVENT
|
||||
help
|
||||
Enable MQTT backend.
|
||||
|
||||
if SHELL_BACKEND_MQTT
|
||||
|
||||
config SHELL_MQTT_SERVER_ADDR
|
||||
string "MQTT server address"
|
||||
default "192.168.0.100"
|
||||
help
|
||||
MQTT server address.
|
||||
|
||||
config SHELL_MQTT_SERVER_PORT
|
||||
int "MQTT server port"
|
||||
default 1883
|
||||
help
|
||||
MQTT server port.
|
||||
|
||||
config SHELL_MQTT_SERVER_USERNAME
|
||||
string "MQTT server username"
|
||||
help
|
||||
MQTT server username.
|
||||
|
||||
config SHELL_MQTT_SERVER_PASSWORD
|
||||
string "MQTT server password"
|
||||
help
|
||||
MQTT server password.
|
||||
|
||||
config SHELL_MQTT_RX_BUF_SIZE
|
||||
int "RX buffer size"
|
||||
default 256
|
||||
help
|
||||
Buffer size for the MQTT data reception.
|
||||
|
||||
config SHELL_MQTT_TX_BUF_SIZE
|
||||
int "TX buffer size"
|
||||
range 32 65535
|
||||
default 256
|
||||
help
|
||||
Buffer size for the MQTT data transmission.
|
||||
|
||||
module = SHELL_BACKEND_MQTT
|
||||
default-timeout = 100
|
||||
source "subsys/shell/Kconfig.template.shell_log_queue_timeout"
|
||||
|
||||
default-size = 10
|
||||
source "subsys/shell/Kconfig.template.shell_log_queue_size"
|
||||
|
||||
choice
|
||||
prompt "Initial log level limit"
|
||||
default SHELL_MQTT_INIT_LOG_LEVEL_DEFAULT
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_DEFAULT
|
||||
bool "System limit (LOG_MAX_LEVEL)"
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_DBG
|
||||
bool "Debug"
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_INF
|
||||
bool "Info"
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_WRN
|
||||
bool "Warning"
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_ERR
|
||||
bool "Error"
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL_NONE
|
||||
bool "None"
|
||||
|
||||
endchoice # SHELL_MQTT_INIT_LOG_LEVEL
|
||||
|
||||
config SHELL_MQTT_INIT_LOG_LEVEL
|
||||
int
|
||||
default 0 if SHELL_MQTT_INIT_LOG_LEVEL_NONE
|
||||
default 1 if SHELL_MQTT_INIT_LOG_LEVEL_ERR
|
||||
default 2 if SHELL_MQTT_INIT_LOG_LEVEL_WRN
|
||||
default 3 if SHELL_MQTT_INIT_LOG_LEVEL_INF
|
||||
default 4 if SHELL_MQTT_INIT_LOG_LEVEL_DBG
|
||||
default 5 if SHELL_MQTT_INIT_LOG_LEVEL_DEFAULT
|
||||
|
||||
module = SHELL_MQTT
|
||||
module-str = MQTT shell backend
|
||||
source "subsys/logging/Kconfig.template.log_config"
|
||||
|
||||
endif # SHELL_BACKEND_MQTT
|
||||
|
||||
config SHELL_BACKEND_TELNET
|
||||
bool "TELNET backend."
|
||||
depends on NET_TCP
|
||||
|
|
826
subsys/shell/backends/shell_mqtt.c
Normal file
826
subsys/shell/backends/shell_mqtt.c
Normal file
|
@ -0,0 +1,826 @@
|
|||
/*
|
||||
* Copyright (c) 2022 G-Technologies Sdn. Bhd.
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#include <zephyr.h>
|
||||
#include <shell/shell_mqtt.h>
|
||||
#include <init.h>
|
||||
#include <logging/log.h>
|
||||
#include <string.h>
|
||||
#include <stdio.h>
|
||||
#include <drivers/hwinfo.h>
|
||||
|
||||
SHELL_MQTT_DEFINE(shell_transport_mqtt);
|
||||
SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
|
||||
CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
|
||||
CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);
|
||||
|
||||
LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
|
||||
|
||||
#define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
|
||||
#define CONNECT_TIMEOUT_MS 2000
|
||||
#define LISTEN_TIMEOUT_MS 500
|
||||
#define MQTT_SEND_DELAY_MS K_MSEC(100)
|
||||
#define PROCESS_INTERVAL K_SECONDS(2)
|
||||
#define SHELL_MQTT_WORKQ_STACK_SIZE 2048
|
||||
|
||||
#ifdef CONFIG_SHELL_MQTT_SERVER_USERNAME
|
||||
#define MQTT_USERNAME CONFIG_SHELL_MQTT_SERVER_USERNAME
|
||||
#else
|
||||
#define MQTT_USERNAME NULL
|
||||
#endif /* CONFIG_SHELL_MQTT_SERVER_USERNAME */
|
||||
|
||||
#ifdef CONFIG_SHELL_MQTT_SERVER_PASSWORD
|
||||
#define MQTT_PASSWORD CONFIG_SHELL_MQTT_SERVER_PASSWORD
|
||||
#else
|
||||
#define MQTT_PASSWORD NULL
|
||||
#endif /*SHELL_MQTT_SERVER_PASSWORD */
|
||||
|
||||
struct shell_mqtt *sh_mqtt;
|
||||
K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);
|
||||
|
||||
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
|
||||
|
||||
static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
|
||||
{
|
||||
return k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay);
|
||||
}
|
||||
|
||||
static inline int sh_mqtt_work_submit(struct k_work *work)
|
||||
{
|
||||
return k_work_submit_to_queue(&sh_mqtt->workq, work);
|
||||
}
|
||||
|
||||
/* Lock the context of the shell mqtt */
|
||||
static inline int sh_mqtt_context_lock(k_timeout_t timeout)
|
||||
{
|
||||
return k_mutex_lock(&sh_mqtt->lock, timeout);
|
||||
}
|
||||
|
||||
/* Unlock the context of the shell mqtt */
|
||||
static inline void sh_mqtt_context_unlock(void)
|
||||
{
|
||||
(void)k_mutex_unlock(&sh_mqtt->lock);
|
||||
}
|
||||
|
||||
static void sh_mqtt_rx_rb_flush(void)
|
||||
{
|
||||
uint8_t c;
|
||||
uint32_t size = ring_buf_size_get(&sh_mqtt->rx_rb);
|
||||
|
||||
while (size) {
|
||||
size = ring_buf_get(&sh_mqtt->rx_rb, &c, 1U);
|
||||
}
|
||||
}
|
||||
|
||||
bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
|
||||
{
|
||||
uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
|
||||
ssize_t length;
|
||||
|
||||
length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
|
||||
if (length <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
(void)memset(id, 0, id_max_len);
|
||||
length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len - 1);
|
||||
|
||||
return length > 0;
|
||||
}
|
||||
|
||||
static void prepare_fds(void)
|
||||
{
|
||||
if (sh_mqtt->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
|
||||
sh_mqtt->fds[0].fd = sh_mqtt->mqtt_cli.transport.tcp.sock;
|
||||
}
|
||||
|
||||
sh_mqtt->fds[0].events = ZSOCK_POLLIN;
|
||||
sh_mqtt->nfds = 1;
|
||||
}
|
||||
|
||||
static void clear_fds(void)
|
||||
{
|
||||
sh_mqtt->nfds = 0;
|
||||
}
|
||||
|
||||
static int wait(int timeout)
|
||||
{
|
||||
int rc = 0;
|
||||
|
||||
if (sh_mqtt->nfds > 0) {
|
||||
rc = zsock_poll(sh_mqtt->fds, sh_mqtt->nfds, timeout);
|
||||
if (rc < 0) {
|
||||
LOG_ERR("poll error: %d", errno);
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Query IP address for the broker URL */
|
||||
static int get_mqtt_broker_addrinfo(void)
|
||||
{
|
||||
int rc;
|
||||
struct zsock_addrinfo hints = { .ai_family = AF_INET,
|
||||
.ai_socktype = SOCK_STREAM,
|
||||
.ai_protocol = 0 };
|
||||
|
||||
if (sh_mqtt->haddr != NULL) {
|
||||
zsock_freeaddrinfo(sh_mqtt->haddr);
|
||||
}
|
||||
|
||||
rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
|
||||
STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh_mqtt->haddr);
|
||||
if (rc == 0) {
|
||||
LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
|
||||
CONFIG_SHELL_MQTT_SERVER_PORT);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
|
||||
CONFIG_SHELL_MQTT_SERVER_PORT);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Close MQTT connection properly and cleanup socket */
|
||||
static void sh_mqtt_close_and_cleanup(void)
|
||||
{
|
||||
/* Initialize to negative value so that the mqtt_abort case can run */
|
||||
int rc = -1;
|
||||
|
||||
/* If both network & mqtt connected, mqtt_disconnect will send a
|
||||
* disconnection packet to the broker, it will invoke
|
||||
* mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
|
||||
*/
|
||||
if ((sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
|
||||
(sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
|
||||
rc = mqtt_disconnect(&sh_mqtt->mqtt_cli);
|
||||
}
|
||||
|
||||
/* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
|
||||
if (rc) {
|
||||
/* mqtt_abort doesn't send disconnection packet to the broker, but it
|
||||
* makes sure that the MQTT connection is aborted locally and will
|
||||
* always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
|
||||
*/
|
||||
(void)mqtt_abort(&sh_mqtt->mqtt_cli);
|
||||
}
|
||||
|
||||
/* Cleanup socket */
|
||||
clear_fds();
|
||||
}
|
||||
|
||||
static void broker_init(void)
|
||||
{
|
||||
struct sockaddr_in *broker4 = (struct sockaddr_in *)&sh_mqtt->broker;
|
||||
|
||||
broker4->sin_family = AF_INET;
|
||||
broker4->sin_port = htons(CONFIG_SHELL_MQTT_SERVER_PORT);
|
||||
|
||||
net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh_mqtt->haddr->ai_addr)->sin_addr);
|
||||
}
|
||||
|
||||
static void client_init(void)
|
||||
{
|
||||
static struct mqtt_utf8 password;
|
||||
static struct mqtt_utf8 username;
|
||||
|
||||
password.utf8 = (uint8_t *)MQTT_PASSWORD;
|
||||
password.size = strlen(MQTT_PASSWORD);
|
||||
username.utf8 = (uint8_t *)MQTT_USERNAME;
|
||||
username.size = strlen(MQTT_USERNAME);
|
||||
|
||||
mqtt_client_init(&sh_mqtt->mqtt_cli);
|
||||
|
||||
/* MQTT client configuration */
|
||||
sh_mqtt->mqtt_cli.broker = &sh_mqtt->broker;
|
||||
sh_mqtt->mqtt_cli.evt_cb = mqtt_evt_handler;
|
||||
sh_mqtt->mqtt_cli.client_id.utf8 = (uint8_t *)sh_mqtt->device_id;
|
||||
sh_mqtt->mqtt_cli.client_id.size = strlen(sh_mqtt->device_id);
|
||||
sh_mqtt->mqtt_cli.password = &password;
|
||||
sh_mqtt->mqtt_cli.user_name = &username;
|
||||
sh_mqtt->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
|
||||
|
||||
/* MQTT buffers configuration */
|
||||
sh_mqtt->mqtt_cli.rx_buf = sh_mqtt->buf.rx;
|
||||
sh_mqtt->mqtt_cli.rx_buf_size = sizeof(sh_mqtt->buf.rx);
|
||||
sh_mqtt->mqtt_cli.tx_buf = sh_mqtt->buf.tx;
|
||||
sh_mqtt->mqtt_cli.tx_buf_size = sizeof(sh_mqtt->buf.tx);
|
||||
|
||||
/* MQTT transport configuration */
|
||||
sh_mqtt->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
|
||||
}
|
||||
|
||||
/* Work routine to process MQTT packet and keep alive MQTT connection */
|
||||
static void sh_mqtt_process_handler(struct k_work *work)
|
||||
{
|
||||
ARG_UNUSED(work);
|
||||
int rc;
|
||||
int64_t remaining = LISTEN_TIMEOUT_MS;
|
||||
int64_t start_time = k_uptime_get();
|
||||
|
||||
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
||||
LOG_DBG("%s_work while %s", "process", "network disconnected");
|
||||
return;
|
||||
}
|
||||
|
||||
/* If context can't be locked, that means net conn cb locked it */
|
||||
if (sh_mqtt_context_lock(K_NO_WAIT)) {
|
||||
/* In that case we should simply return */
|
||||
LOG_DBG("%s_work unable to lock context", "process");
|
||||
return;
|
||||
}
|
||||
|
||||
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
||||
LOG_DBG("MQTT %s", "not connected");
|
||||
goto process_error;
|
||||
}
|
||||
|
||||
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
||||
LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
|
||||
goto process_error;
|
||||
}
|
||||
|
||||
LOG_DBG("MQTT %s", "Processing");
|
||||
/* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
|
||||
while ((remaining > 0) && (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
|
||||
(sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
|
||||
(sh_mqtt->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
|
||||
LOG_DBG("Listening to socket");
|
||||
if (wait(remaining)) {
|
||||
LOG_DBG("Process socket for MQTT packet");
|
||||
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
||||
if (rc != 0) {
|
||||
LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
|
||||
goto process_error;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DBG("MQTT %s", "Keepalive");
|
||||
rc = mqtt_live(&sh_mqtt->mqtt_cli);
|
||||
if (rc != 0 && rc != -EAGAIN) {
|
||||
LOG_ERR("%s error: %d", "mqtt_live", rc);
|
||||
goto process_error;
|
||||
}
|
||||
|
||||
remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
|
||||
}
|
||||
|
||||
/* Reschedule the process work */
|
||||
LOG_DBG("Scheduling %s work", "process");
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, K_SECONDS(2));
|
||||
sh_mqtt_context_unlock();
|
||||
return;
|
||||
|
||||
process_error:
|
||||
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
|
||||
sh_mqtt_close_and_cleanup();
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
||||
sh_mqtt_context_unlock();
|
||||
}
|
||||
|
||||
static void sh_mqtt_subscribe_handler(struct k_work *work)
|
||||
{
|
||||
ARG_UNUSED(work);
|
||||
/* Subscribe config information */
|
||||
struct mqtt_topic subs_topic = { .topic = { .utf8 = sh_mqtt->sub_topic,
|
||||
.size = strlen(sh_mqtt->sub_topic) },
|
||||
.qos = MQTT_QOS_1_AT_LEAST_ONCE };
|
||||
const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
|
||||
.list_count = 1U,
|
||||
.message_id = 1U };
|
||||
int rc;
|
||||
|
||||
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
||||
LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
|
||||
return;
|
||||
}
|
||||
|
||||
/* If context can't be locked, that means net conn cb locked it */
|
||||
if (sh_mqtt_context_lock(K_NO_WAIT)) {
|
||||
/* In that case we should simply return */
|
||||
LOG_DBG("%s_work unable to lock context", "subscribe");
|
||||
return;
|
||||
}
|
||||
|
||||
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
||||
LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
|
||||
goto subscribe_error;
|
||||
}
|
||||
|
||||
rc = mqtt_subscribe(&sh_mqtt->mqtt_cli, &subs_list);
|
||||
if (rc == 0) {
|
||||
/* Wait for mqtt's connack */
|
||||
LOG_DBG("Listening to socket");
|
||||
if (wait(CONNECT_TIMEOUT_MS)) {
|
||||
LOG_DBG("Process socket for MQTT packet");
|
||||
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
||||
if (rc != 0) {
|
||||
LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
|
||||
goto subscribe_error;
|
||||
}
|
||||
}
|
||||
|
||||
/* No suback, fail */
|
||||
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
||||
goto subscribe_error;
|
||||
}
|
||||
|
||||
LOG_DBG("Scheduling MQTT process work");
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, PROCESS_INTERVAL);
|
||||
sh_mqtt_context_unlock();
|
||||
|
||||
LOG_INF("Logs will be published to: %s", log_strdup(sh_mqtt->pub_topic));
|
||||
LOG_INF("Subscribing shell cmds from: %s", log_strdup(sh_mqtt->sub_topic));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
subscribe_error:
|
||||
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
|
||||
sh_mqtt_close_and_cleanup();
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
||||
sh_mqtt_context_unlock();
|
||||
}
|
||||
|
||||
/* Work routine to connect to MQTT */
|
||||
static void sh_mqtt_connect_handler(struct k_work *work)
|
||||
{
|
||||
ARG_UNUSED(work);
|
||||
int rc;
|
||||
|
||||
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
||||
LOG_DBG("%s_work while %s", "connect", "network disconnected");
|
||||
return;
|
||||
}
|
||||
|
||||
/* If context can't be locked, that means net conn cb locked it */
|
||||
if (sh_mqtt_context_lock(K_NO_WAIT)) {
|
||||
/* In that case we should simply return */
|
||||
LOG_DBG("%s_work unable to lock context", "connect");
|
||||
return;
|
||||
}
|
||||
|
||||
if (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
|
||||
__ASSERT(0, "MQTT shouldn't be already connected");
|
||||
LOG_ERR("MQTT shouldn't be already connected");
|
||||
goto connect_error;
|
||||
}
|
||||
|
||||
/* Resolve the broker URL */
|
||||
LOG_DBG("Resolving DNS");
|
||||
rc = get_mqtt_broker_addrinfo();
|
||||
if (rc) {
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
||||
sh_mqtt_context_unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_DBG("Initializing MQTT client");
|
||||
broker_init();
|
||||
client_init();
|
||||
|
||||
/* Try to connect to mqtt */
|
||||
LOG_DBG("Connecting to MQTT broker");
|
||||
rc = mqtt_connect(&sh_mqtt->mqtt_cli);
|
||||
if (rc != 0) {
|
||||
LOG_ERR("%s error: %d", "mqtt_connect", rc);
|
||||
goto connect_error;
|
||||
}
|
||||
|
||||
/* Prepare port config */
|
||||
LOG_DBG("Preparing socket");
|
||||
prepare_fds();
|
||||
|
||||
/* Wait for mqtt's connack */
|
||||
LOG_DBG("Listening to socket");
|
||||
if (wait(CONNECT_TIMEOUT_MS)) {
|
||||
LOG_DBG("Process socket for MQTT packet");
|
||||
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
||||
if (rc != 0) {
|
||||
LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
|
||||
goto connect_error;
|
||||
}
|
||||
}
|
||||
|
||||
/* No connack, fail */
|
||||
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
||||
goto connect_error;
|
||||
}
|
||||
|
||||
LOG_DBG("Scheduling %s work", "subscribe");
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->subscribe_dwork, K_SECONDS(2));
|
||||
sh_mqtt_context_unlock();
|
||||
return;
|
||||
|
||||
connect_error:
|
||||
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
|
||||
sh_mqtt_close_and_cleanup();
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
||||
sh_mqtt_context_unlock();
|
||||
}
|
||||
|
||||
static int sh_mqtt_publish(uint8_t *data, uint32_t len)
|
||||
{
|
||||
sh_mqtt->pub_data.message.payload.data = data;
|
||||
sh_mqtt->pub_data.message.payload.len = len;
|
||||
sh_mqtt->pub_data.message_id++;
|
||||
|
||||
return mqtt_publish(&sh_mqtt->mqtt_cli, &sh_mqtt->pub_data);
|
||||
}
|
||||
|
||||
static int sh_mqtt_publish_tx_buf(bool is_work)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = sh_mqtt_publish(&sh_mqtt->tx_buf.buf[0], sh_mqtt->tx_buf.len);
|
||||
memset(&sh_mqtt->tx_buf, 0, sizeof(sh_mqtt->tx_buf));
|
||||
if (rc != 0) {
|
||||
LOG_ERR("MQTT publish error: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Arbitrary delay to not kill the session */
|
||||
if (!is_work) {
|
||||
k_sleep(MQTT_SEND_DELAY_MS);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void sh_mqtt_publish_handler(struct k_work *work)
|
||||
{
|
||||
ARG_UNUSED(work);
|
||||
int rc;
|
||||
|
||||
(void)sh_mqtt_context_lock(K_FOREVER);
|
||||
|
||||
rc = sh_mqtt_publish_tx_buf(true);
|
||||
if (rc != 0) {
|
||||
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
|
||||
sh_mqtt_close_and_cleanup();
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
||||
}
|
||||
|
||||
sh_mqtt_context_unlock();
|
||||
}
|
||||
|
||||
static void cancel_dworks_and_cleanup(void)
|
||||
{
|
||||
(void)k_work_cancel_delayable(&sh_mqtt->connect_dwork);
|
||||
(void)k_work_cancel_delayable(&sh_mqtt->subscribe_dwork);
|
||||
(void)k_work_cancel_delayable(&sh_mqtt->process_dwork);
|
||||
(void)k_work_cancel_delayable(&sh_mqtt->publish_dwork);
|
||||
sh_mqtt_close_and_cleanup();
|
||||
}
|
||||
|
||||
static void net_disconnect_handler(struct k_work *work)
|
||||
{
|
||||
ARG_UNUSED(work);
|
||||
|
||||
LOG_WRN("Network %s", "disconnected");
|
||||
sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
|
||||
|
||||
/* Stop all possible work */
|
||||
(void)sh_mqtt_context_lock(K_FOREVER);
|
||||
cancel_dworks_and_cleanup();
|
||||
sh_mqtt_context_unlock();
|
||||
/* If the transport was requested, the connect work will be rescheduled
|
||||
* when internet is connected again
|
||||
*/
|
||||
}
|
||||
|
||||
/* Network connection event handler */
|
||||
static void network_evt_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event,
|
||||
struct net_if *iface)
|
||||
{
|
||||
if (mgmt_event == NET_EVENT_L4_CONNECTED &&
|
||||
sh_mqtt->network_state == SHELL_MQTT_NETWORK_DISCONNECTED) {
|
||||
LOG_WRN("Network %s", "connected");
|
||||
sh_mqtt->network_state = SHELL_MQTT_NETWORK_CONNECTED;
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
||||
} else if (mgmt_event == NET_EVENT_L4_DISCONNECTED &&
|
||||
sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) {
|
||||
(void)sh_mqtt_work_submit(&sh_mqtt->net_disconnected_work);
|
||||
}
|
||||
}
|
||||
|
||||
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
|
||||
{
|
||||
switch (evt->type) {
|
||||
case MQTT_EVT_CONNACK:
|
||||
if (evt->result != 0) {
|
||||
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
||||
LOG_ERR("MQTT %s %d", "connect failed", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
|
||||
LOG_WRN("MQTT %s", "client connected!");
|
||||
|
||||
break;
|
||||
case MQTT_EVT_SUBACK:
|
||||
if (evt->result != 0) {
|
||||
LOG_ERR("MQTT subscribe: %s", "error");
|
||||
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
||||
break;
|
||||
}
|
||||
|
||||
LOG_WRN("MQTT subscribe: %s", "ok");
|
||||
sh_mqtt->subscribe_state = SHELL_MQTT_SUBSCRIBED;
|
||||
break;
|
||||
|
||||
case MQTT_EVT_UNSUBACK:
|
||||
LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
|
||||
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
||||
break;
|
||||
|
||||
case MQTT_EVT_DISCONNECT:
|
||||
LOG_WRN("MQTT disconnected: %d", evt->result);
|
||||
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
||||
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PUBLISH: {
|
||||
const struct mqtt_publish_param *pub = &evt->param.publish;
|
||||
uint32_t size, payload_left, rb_free_space;
|
||||
|
||||
payload_left = pub->message.payload.len;
|
||||
rb_free_space = ring_buf_space_get(&sh_mqtt->rx_rb);
|
||||
|
||||
LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
|
||||
LOG_DBG(" id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
|
||||
LOG_DBG(" item: %s", log_strdup(pub->message.topic.topic.utf8));
|
||||
|
||||
/* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
|
||||
if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
|
||||
struct mqtt_puback_param puback = { .message_id = pub->message_id };
|
||||
|
||||
(void)mqtt_publish_qos1_ack(client, &puback);
|
||||
}
|
||||
|
||||
while (payload_left > 0) {
|
||||
/* Attempt to claim `payload_left` bytes of buffer in rb */
|
||||
size = ring_buf_put_claim(&sh_mqtt->rx_rb, &sh_mqtt->rx_rb_ptr,
|
||||
payload_left);
|
||||
/* Read `size` bytes of payload from mqtt */
|
||||
size = mqtt_read_publish_payload_blocking(client, sh_mqtt->rx_rb_ptr, size);
|
||||
|
||||
/* errno value, return */
|
||||
if (size < 0) {
|
||||
(void)ring_buf_put_finish(&sh_mqtt->rx_rb, 0U);
|
||||
sh_mqtt_rx_rb_flush();
|
||||
return;
|
||||
}
|
||||
|
||||
/* Indicate that `size` bytes of payload has been written into rb */
|
||||
(void)ring_buf_put_finish(&sh_mqtt->rx_rb, size);
|
||||
/* Update `payload_left` */
|
||||
payload_left -= size;
|
||||
/* Tells the shell that we have new data for it */
|
||||
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
||||
/* Arbitrary sleep for the shell to do its thing */
|
||||
(void)k_msleep(100);
|
||||
}
|
||||
|
||||
/* Shell won't execute the cmds without \r\n */
|
||||
while (true) {
|
||||
/* Check if rb's free space is enough to fit in \r\n */
|
||||
size = ring_buf_space_get(&sh_mqtt->rx_rb);
|
||||
if (size >= sizeof("\r\n")) {
|
||||
(void)ring_buf_put(&sh_mqtt->rx_rb, "\r\n", sizeof("\r\n"));
|
||||
break;
|
||||
}
|
||||
/* Arbitrary sleep for the shell to do its thing */
|
||||
(void)k_msleep(100);
|
||||
}
|
||||
|
||||
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
||||
break;
|
||||
}
|
||||
|
||||
case MQTT_EVT_PUBACK:
|
||||
if (evt->result != 0) {
|
||||
LOG_ERR("MQTT PUBACK error %d", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PINGRESP:
|
||||
LOG_DBG("PINGRESP packet");
|
||||
break;
|
||||
|
||||
default:
|
||||
LOG_DBG("MQTT event received %d", evt->type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static int init(const struct shell_transport *transport, const void *config,
|
||||
shell_transport_handler_t evt_handler, void *context)
|
||||
{
|
||||
sh_mqtt = (struct shell_mqtt *)transport->ctx;
|
||||
|
||||
(void)memset(sh_mqtt, 0, sizeof(struct shell_mqtt));
|
||||
|
||||
(void)k_mutex_init(&sh_mqtt->lock);
|
||||
|
||||
if (!shell_mqtt_get_devid(sh_mqtt->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
|
||||
LOG_ERR("Unable to get device identity, using dummy value");
|
||||
(void)snprintf(sh_mqtt->device_id, sizeof("dummy"), "dummy");
|
||||
}
|
||||
|
||||
LOG_DBG("Client ID is %s", log_strdup(sh_mqtt->device_id));
|
||||
|
||||
(void)snprintf(sh_mqtt->pub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_tx", sh_mqtt->device_id);
|
||||
(void)snprintf(sh_mqtt->sub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_rx", sh_mqtt->device_id);
|
||||
|
||||
ring_buf_init(&sh_mqtt->rx_rb, RX_RB_SIZE, sh_mqtt->rx_rb_buf);
|
||||
|
||||
LOG_DBG("Initializing shell MQTT backend");
|
||||
|
||||
sh_mqtt->shell_handler = evt_handler;
|
||||
sh_mqtt->shell_context = context;
|
||||
|
||||
sh_mqtt->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
|
||||
sh_mqtt->pub_data.message.topic.topic.utf8 = (uint8_t *)sh_mqtt->pub_topic;
|
||||
sh_mqtt->pub_data.message.topic.topic.size =
|
||||
strlen(sh_mqtt->pub_data.message.topic.topic.utf8);
|
||||
sh_mqtt->pub_data.dup_flag = 0U;
|
||||
sh_mqtt->pub_data.retain_flag = 0U;
|
||||
|
||||
/* Initialize the work queue */
|
||||
k_work_queue_init(&sh_mqtt->workq);
|
||||
k_work_queue_start(&sh_mqtt->workq, sh_mqtt_workq_stack,
|
||||
K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
|
||||
(void)k_thread_name_set(&sh_mqtt->workq.thread, "sh_mqtt_workq");
|
||||
k_work_init(&sh_mqtt->net_disconnected_work, net_disconnect_handler);
|
||||
k_work_init_delayable(&sh_mqtt->connect_dwork, sh_mqtt_connect_handler);
|
||||
k_work_init_delayable(&sh_mqtt->subscribe_dwork, sh_mqtt_subscribe_handler);
|
||||
k_work_init_delayable(&sh_mqtt->process_dwork, sh_mqtt_process_handler);
|
||||
k_work_init_delayable(&sh_mqtt->publish_dwork, sh_mqtt_publish_handler);
|
||||
|
||||
LOG_DBG("Initializing listener for network");
|
||||
net_mgmt_init_event_callback(&sh_mqtt->mgmt_cb, network_evt_handler, NET_EVENT_MASK);
|
||||
|
||||
sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
|
||||
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
||||
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int uninit(const struct shell_transport *transport)
|
||||
{
|
||||
ARG_UNUSED(transport);
|
||||
|
||||
/* Not initialized yet */
|
||||
if (sh_mqtt == NULL) {
|
||||
return -ENODEV;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int enable(const struct shell_transport *transport, bool blocking)
|
||||
{
|
||||
ARG_UNUSED(transport);
|
||||
ARG_UNUSED(blocking);
|
||||
|
||||
/* Not initialized yet */
|
||||
if (sh_mqtt == NULL) {
|
||||
return -ENODEV;
|
||||
}
|
||||
|
||||
/* Listen for network connection status */
|
||||
net_mgmt_add_event_callback(&sh_mqtt->mgmt_cb);
|
||||
net_conn_mgr_resend_status();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int write(const struct shell_transport *transport, const void *data, size_t length,
|
||||
size_t *cnt)
|
||||
{
|
||||
ARG_UNUSED(transport);
|
||||
int rc = 0;
|
||||
struct k_work_sync ws;
|
||||
size_t copy_len;
|
||||
|
||||
*cnt = 0;
|
||||
|
||||
/* Not initialized yet */
|
||||
if (sh_mqtt == NULL) {
|
||||
return -ENODEV;
|
||||
}
|
||||
|
||||
/* Not connected to broker */
|
||||
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
||||
goto out;
|
||||
}
|
||||
|
||||
(void)k_work_cancel_delayable_sync(&sh_mqtt->publish_dwork, &ws);
|
||||
|
||||
do {
|
||||
if (sh_mqtt->tx_buf.len + length - *cnt > TX_BUF_SIZE) {
|
||||
copy_len = TX_BUF_SIZE - sh_mqtt->tx_buf.len;
|
||||
} else {
|
||||
copy_len = length - *cnt;
|
||||
}
|
||||
|
||||
memcpy(sh_mqtt->tx_buf.buf + sh_mqtt->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
|
||||
sh_mqtt->tx_buf.len += copy_len;
|
||||
|
||||
/* Send the data immediately if the buffer is full */
|
||||
if (sh_mqtt->tx_buf.len == TX_BUF_SIZE) {
|
||||
rc = sh_mqtt_publish_tx_buf(false);
|
||||
if (rc != 0) {
|
||||
sh_mqtt_close_and_cleanup();
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork,
|
||||
K_SECONDS(2));
|
||||
*cnt = length;
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
*cnt += copy_len;
|
||||
} while (*cnt < length);
|
||||
|
||||
if (sh_mqtt->tx_buf.len) {
|
||||
(void)sh_mqtt_work_reschedule(&sh_mqtt->publish_dwork, MQTT_SEND_DELAY_MS);
|
||||
}
|
||||
|
||||
/* Inform shell that it is ready for next TX */
|
||||
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh_mqtt->shell_context);
|
||||
|
||||
out:
|
||||
/* We will always assume that we sent everything */
|
||||
*cnt = length;
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int read(const struct shell_transport *transport, void *data, size_t length, size_t *cnt)
|
||||
{
|
||||
ARG_UNUSED(transport);
|
||||
|
||||
/* Not initialized yet */
|
||||
if (sh_mqtt == NULL) {
|
||||
return -ENODEV;
|
||||
}
|
||||
|
||||
/* Not subscribed yet */
|
||||
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
||||
*cnt = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
*cnt = ring_buf_get(&sh_mqtt->rx_rb, data, length);
|
||||
|
||||
/* Inform the shell if there are still data in the rb */
|
||||
if (ring_buf_size_get(&sh_mqtt->rx_rb) > 0) {
|
||||
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
|
||||
.uninit = uninit,
|
||||
.enable = enable,
|
||||
.write = write,
|
||||
.read = read };
|
||||
|
||||
static int enable_shell_mqtt(const struct device *arg)
|
||||
{
|
||||
ARG_UNUSED(arg);
|
||||
|
||||
bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
|
||||
uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
|
||||
CONFIG_LOG_MAX_LEVEL :
|
||||
CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
|
||||
static const struct shell_backend_config_flags cfg_flags = {
|
||||
.insert_mode = 0,
|
||||
.echo = 0,
|
||||
.obscure = 0,
|
||||
.mode_delete = 0,
|
||||
.use_colors = 0,
|
||||
.use_vt100 = 0,
|
||||
};
|
||||
|
||||
return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
|
||||
}
|
||||
|
||||
/* Function is used for testing purposes */
|
||||
const struct shell *shell_backend_mqtt_get_ptr(void)
|
||||
{
|
||||
return &shell_mqtt;
|
||||
}
|
||||
|
||||
SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
|
Loading…
Reference in a new issue