net: lwm2m: LwM2M Pause and resume support

New API for suspend and resume LwM2M engine.
New event LWM2M_RD_CLIENT_EVENT_ENGINE_SUSPENDED for indicate
application that engine is suspended.

Simplify stack suspend and resume state same time for queue mode.

New CONFIG_LWM2M_RD_CLIENT_SUSPEND_SOCKET_AT_IDLE for enable skip socket
close at RX_OFF_IDDLE state that socket is only suspended and close is
called only when connection is resumed.

Signed-off-by: Juha Heiskanen <juha.heiskanen@nordicsemi.no>
This commit is contained in:
Juha Heiskanen 2022-08-03 12:34:36 +03:00 committed by Carles Cufí
parent 0356e1a925
commit ed5f3cdf06
7 changed files with 290 additions and 83 deletions

View file

@ -168,13 +168,13 @@ struct lwm2m_ctx {
*/
bool use_dtls;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
/**
* Flag to indicate that the socket connection is suspended.
* With queue mode, this will tell if there is a need to reconnect.
*/
bool connection_suspended;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
/**
* Flag to indicate that the client is buffering Notifications and Send messages.
* True value buffer Notifications and Send messages.
@ -1229,6 +1229,7 @@ enum lwm2m_rd_client_event {
LWM2M_RD_CLIENT_EVENT_DEREGISTER_FAILURE,
LWM2M_RD_CLIENT_EVENT_DISCONNECT,
LWM2M_RD_CLIENT_EVENT_QUEUE_MODE_RX_OFF,
LWM2M_RD_CLIENT_EVENT_ENGINE_SUSPENDED,
LWM2M_RD_CLIENT_EVENT_NETWORK_ERROR,
};
@ -1284,6 +1285,29 @@ int lwm2m_rd_client_start(struct lwm2m_ctx *client_ctx, const char *ep_name,
int lwm2m_rd_client_stop(struct lwm2m_ctx *client_ctx,
lwm2m_ctx_event_cb_t event_cb, bool deregister);
/**
* @brief Suspend the LwM2M engine Thread
*
* Suspend LwM2M engine. Use case could be when network connection is down.
* LwM2M Engine indicate before it suspend by
* LWM2M_RD_CLIENT_EVENT_ENGINE_SUSPENDED event.
*
* @return 0 for success or negative in case of error.
*/
int lwm2m_engine_pause(void);
/**
* @brief Resume the LwM2M engine thread
*
* Resume suspended LwM2M engine. After successful resume call engine will do
* full registration or registration update based on suspended time.
* Event's LWM2M_RD_CLIENT_EVENT_REGISTRATION_COMPLETE or WM2M_RD_CLIENT_EVENT_REG_UPDATE_COMPLETE
* indicate that client is connected to server.
*
* @return 0 for success or negative in case of error.
*/
int lwm2m_engine_resume(void);
/**
* @brief Trigger a Registration Update of the LwM2M RD Client
*/

View file

@ -191,6 +191,12 @@ config LWM2M_TLS_SESSION_CACHING
help
Enabling this only when feature is supported in TLS library.
config LWM2M_RD_CLIENT_SUSPEND_SOCKET_AT_IDLE
bool "Socket close is skipped at RX_ON_IDLE state"
depends on LWM2M_RD_CLIENT_SUPPORT
help
This config suspend socket handler which skip socket polling process.
config LWM2M_RD_CLIENT_SUPPORT
bool "support for LWM2M client bootstrap/registration state machine"
default y

View file

@ -80,6 +80,9 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME);
static struct lwm2m_obj_path_list observe_paths[LWM2M_ENGINE_MAX_OBSERVER_PATH];
#define MAX_PERIODIC_SERVICE 10
static k_tid_t engine_thread_id;
static bool suspend_engine_thread;
struct service_node {
sys_snode_t node;
k_work_handler_t service_work;
@ -109,6 +112,8 @@ int lwm2m_sock_nfds(void) { return sock_nfds; }
struct lwm2m_block_context *lwm2m_block1_context(void) { return block1_contexts; }
static void lwm2m_socket_update(struct lwm2m_ctx *ctx);
/* for debugging: to print IP addresses */
char *lwm2m_sprint_ip_addr(const struct sockaddr *addr)
{
@ -161,26 +166,101 @@ char *sprint_token(const uint8_t *token, uint8_t tkl)
/* utility functions */
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_engine_connection_resume(struct lwm2m_ctx *client_ctx)
int lwm2m_open_socket(struct lwm2m_ctx *client_ctx)
{
#ifdef CONFIG_LWM2M_DTLS_SUPPORT
if (!client_ctx->use_dtls) {
return 0;
if (client_ctx->sock_fd < 0) {
/* open socket */
if (IS_ENABLED(CONFIG_LWM2M_DTLS_SUPPORT) && client_ctx->use_dtls) {
client_ctx->sock_fd = socket(client_ctx->remote_addr.sa_family, SOCK_DGRAM,
IPPROTO_DTLS_1_2);
} else {
client_ctx->sock_fd =
socket(client_ctx->remote_addr.sa_family, SOCK_DGRAM, IPPROTO_UDP);
}
if (client_ctx->sock_fd < 0) {
LOG_ERR("Failed to create socket: %d", errno);
return -errno;
}
lwm2m_socket_update(client_ctx);
}
return 0;
}
int lwm2m_close_socket(struct lwm2m_ctx *client_ctx)
{
int ret = 0;
if (client_ctx->sock_fd >= 0) {
ret = close(client_ctx->sock_fd);
if (ret) {
LOG_ERR("Failed to close socket: %d", errno);
ret = -errno;
return ret;
}
client_ctx->sock_fd = -1;
client_ctx->connection_suspended = true;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
/* Enable Queue mode buffer store */
client_ctx->buffer_client_messages = true;
#endif
lwm2m_socket_update(client_ctx);
}
return ret;
}
int lwm2m_socket_suspend(struct lwm2m_ctx *client_ctx)
{
int ret = 0;
if (client_ctx->sock_fd >= 0 && !client_ctx->connection_suspended) {
int socket_temp_id = client_ctx->sock_fd;
client_ctx->sock_fd = -1;
client_ctx->connection_suspended = true;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
/* Enable Queue mode buffer store */
client_ctx->buffer_client_messages = true;
#endif
lwm2m_socket_update(client_ctx);
client_ctx->sock_fd = socket_temp_id;
}
return ret;
}
int lwm2m_engine_connection_resume(struct lwm2m_ctx *client_ctx)
{
int ret;
if (client_ctx->connection_suspended) {
lwm2m_close_socket(client_ctx);
client_ctx->connection_suspended = false;
ret = lwm2m_open_socket(client_ctx);
if (ret) {
return ret;
}
if (!client_ctx->use_dtls) {
return 0;
}
LOG_DBG("Resume suspended connection");
return lwm2m_socket_start(client_ctx);
}
#endif
return 0;
}
#endif
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_push_queued_buffers(struct lwm2m_ctx *client_ctx)
{
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
client_ctx->buffer_client_messages = false;
while (!sys_slist_is_empty(&client_ctx->queued_messages)) {
sys_snode_t *msg_node = sys_slist_get(&client_ctx->queued_messages);
@ -192,9 +272,9 @@ int lwm2m_push_queued_buffers(struct lwm2m_ctx *client_ctx)
msg = SYS_SLIST_CONTAINER(msg_node, msg, node);
sys_slist_append(&msg->ctx->pending_sends, &msg->node);
}
#endif
return 0;
}
#endif
bool lwm2m_engine_bootstrap_override(struct lwm2m_ctx *client_ctx, struct lwm2m_obj_path *path)
{
@ -434,43 +514,6 @@ static int32_t lwm2m_engine_service(const int64_t timestamp)
return engine_next_service_timeout_ms(ENGINE_UPDATE_INTERVAL_MS, timestamp);
}
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_engine_close_socket_connection(struct lwm2m_ctx *client_ctx)
{
int ret = 0;
/* Enable Queue mode buffer store */
client_ctx->buffer_client_messages = true;
#ifdef CONFIG_LWM2M_DTLS_SUPPORT
if (!client_ctx->use_dtls) {
return 0;
}
if (client_ctx->sock_fd >= 0) {
ret = close(client_ctx->sock_fd);
if (ret) {
LOG_ERR("Failed to close socket: %d", errno);
ret = -errno;
return ret;
}
client_ctx->sock_fd = -1;
client_ctx->connection_suspended = true;
}
/* Open socket again that Observation and re-send functionality works */
client_ctx->sock_fd =
socket(client_ctx->remote_addr.sa_family, SOCK_DGRAM, IPPROTO_DTLS_1_2);
if (client_ctx->sock_fd < 0) {
LOG_ERR("Failed to create socket: %d", errno);
return -errno;
}
#endif
return ret;
}
#endif
/* LwM2M Socket Integration */
int lwm2m_socket_add(struct lwm2m_ctx *ctx)
@ -487,6 +530,17 @@ int lwm2m_socket_add(struct lwm2m_ctx *ctx)
return 0;
}
static void lwm2m_socket_update(struct lwm2m_ctx *ctx)
{
for (int i = 0; i < sock_nfds; i++) {
if (sock_ctx[i] != ctx) {
continue;
}
sock_fds[i].fd = ctx->sock_fd;
return;
}
}
void lwm2m_socket_del(struct lwm2m_ctx *ctx)
{
for (int i = 0; i < sock_nfds; i++) {
@ -519,8 +573,8 @@ static void check_notifications(struct lwm2m_ctx *ctx, const int64_t timestamp)
if (!obs->event_timestamp || timestamp < obs->event_timestamp) {
continue;
}
/* Check That There is not pending process and client is registred */
if (obs->active_tx_operation || !lwm2m_rd_client_is_registred(ctx)) {
/* Check That There is not pending process*/
if (obs->active_tx_operation) {
continue;
}
@ -602,6 +656,18 @@ static void socket_loop(void)
int32_t timeout, next_retransmit;
while (1) {
/* Check is Thread Suspend Requested */
if (suspend_engine_thread) {
#if defined(CONFIG_LWM2M_RD_CLIENT_SUPPORT)
lwm2m_rd_client_pause();
#endif
suspend_engine_thread = false;
k_thread_suspend(engine_thread_id);
#if defined(CONFIG_LWM2M_RD_CLIENT_SUPPORT)
lwm2m_rd_client_resume();
#endif
}
timestamp = k_uptime_get();
timeout = lwm2m_engine_service(timestamp);
@ -618,7 +684,8 @@ static void socket_loop(void)
timeout = next_retransmit;
}
}
if (sys_slist_is_empty(&sock_ctx[i]->pending_sends)) {
if (sys_slist_is_empty(&sock_ctx[i]->pending_sends) &&
lwm2m_rd_client_is_registred(sock_ctx[i])) {
check_notifications(sock_ctx[i], timestamp);
}
}
@ -638,6 +705,11 @@ static void socket_loop(void)
}
for (i = 0; i < sock_nfds; i++) {
if (sock_ctx[i]->sock_fd < 0) {
continue;
}
if ((sock_fds[i].revents & POLLERR) || (sock_fds[i].revents & POLLNVAL) ||
(sock_fds[i].revents & POLLHUP)) {
LOG_ERR("Poll reported a socket error, %02x.", sock_fds[i].revents);
@ -732,23 +804,12 @@ int lwm2m_socket_start(struct lwm2m_ctx *client_ctx)
if (client_ctx->sock_fd < 0) {
allocate_socket = true;
#if defined(CONFIG_LWM2M_DTLS_SUPPORT)
if (client_ctx->use_dtls) {
client_ctx->sock_fd = socket(client_ctx->remote_addr.sa_family, SOCK_DGRAM,
IPPROTO_DTLS_1_2);
} else
#endif /* CONFIG_LWM2M_DTLS_SUPPORT */
{
client_ctx->sock_fd =
socket(client_ctx->remote_addr.sa_family, SOCK_DGRAM, IPPROTO_UDP);
ret = lwm2m_open_socket(client_ctx);
if (ret) {
return ret;
}
}
if (client_ctx->sock_fd < 0) {
LOG_ERR("Failed to create socket: %d", errno);
return -errno;
}
#if defined(CONFIG_LWM2M_DTLS_SUPPORT)
if (client_ctx->use_dtls) {
sec_tag_t tls_tag_list[] = {
@ -859,6 +920,44 @@ int lwm2m_engine_start(struct lwm2m_ctx *client_ctx)
return lwm2m_socket_start(client_ctx);
}
int lwm2m_engine_pause(void)
{
char buffer[32];
const char *str;
str = k_thread_state_str(engine_thread_id, buffer, sizeof(buffer));
if (suspend_engine_thread || !strcmp(str, "suspended")) {
LOG_WRN("Engine thread already suspended");
return 0;
}
suspend_engine_thread = true;
while (strcmp(str, "suspended")) {
k_msleep(10);
str = k_thread_state_str(engine_thread_id, buffer, sizeof(buffer));
}
LOG_INF("LWM2M engine thread paused (%s) ", str);
return 0;
}
int lwm2m_engine_resume(void)
{
char buffer[32];
const char *str;
str = k_thread_state_str(engine_thread_id, buffer, sizeof(buffer));
if (strcmp(str, "suspended")) {
LOG_WRN("LWM2M engine thread state not ok for resume %s", str);
return -EPERM;
}
k_thread_resume(engine_thread_id);
str = k_thread_state_str(engine_thread_id, buffer, sizeof(buffer));
LOG_INF("LWM2M engine thread resume (%s)", str);
return 0;
}
static int lwm2m_engine_init(const struct device *dev)
{
int i;
@ -870,7 +969,7 @@ static int lwm2m_engine_init(const struct device *dev)
(void)memset(block1_contexts, 0, sizeof(block1_contexts));
/* start sock receive thread */
k_thread_create(&engine_thread_data, &engine_thread_stack[0],
engine_thread_id = k_thread_create(&engine_thread_data, &engine_thread_stack[0],
K_KERNEL_STACK_SIZEOF(engine_thread_stack), (k_thread_entry_t)socket_loop,
NULL, NULL, NULL, THREAD_PRIORITY, 0, K_NO_WAIT);
k_thread_name_set(&engine_thread_data, "lwm2m-sock-recv");

View file

@ -83,11 +83,11 @@ uint8_t lwm2m_firmware_get_update_result(void);
int lwm2m_socket_add(struct lwm2m_ctx *ctx);
void lwm2m_socket_del(struct lwm2m_ctx *ctx);
int lwm2m_socket_start(struct lwm2m_ctx *client_ctx);
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_engine_close_socket_connection(struct lwm2m_ctx *client_ctx);
int lwm2m_engine_connection_resume(struct lwm2m_ctx *client_ctx);
int lwm2m_open_socket(struct lwm2m_ctx *client_ctx);
int lwm2m_close_socket(struct lwm2m_ctx *client_ctx);
int lwm2m_socket_suspend(struct lwm2m_ctx *client_ctx);
int lwm2m_push_queued_buffers(struct lwm2m_ctx *client_ctx);
#endif
/* Resources */
struct lwm2m_ctx **lwm2m_sock_ctx(void);

View file

@ -196,8 +196,9 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
coap_pendings_clear(client_ctx->pendings, ARRAY_SIZE(client_ctx->pendings));
coap_replies_clear(client_ctx->replies, ARRAY_SIZE(client_ctx->replies));
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
client_ctx->connection_suspended = false;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
client_ctx->buffer_client_messages = true;
#endif
lwm2m_socket_del(client_ctx);
@ -213,9 +214,9 @@ void lwm2m_engine_context_init(struct lwm2m_ctx *client_ctx)
{
sys_slist_init(&client_ctx->pending_sends);
sys_slist_init(&client_ctx->observer);
client_ctx->connection_suspended = false;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
client_ctx->buffer_client_messages = true;
client_ctx->connection_suspended = false;
sys_slist_init(&client_ctx->queued_messages);
#endif
}

View file

@ -55,9 +55,11 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME);
#include <errno.h>
#include <zephyr/init.h>
#include <zephyr/sys/printk.h>
#include <zephyr/net/socket.h>
#include "lwm2m_object.h"
#include "lwm2m_engine.h"
#include "lwm2m_rd_client.h"
#include "lwm2m_rw_link_format.h"
#define LWM2M_RD_CLIENT_URI "rd"
@ -92,6 +94,7 @@ enum sm_engine_state {
ENGINE_REGISTRATION_DONE,
ENGINE_REGISTRATION_DONE_RX_OFF,
ENGINE_UPDATE_SENT,
ENGINE_SUSPENDED,
ENGINE_DEREGISTER,
ENGINE_DEREGISTER_SENT,
ENGINE_DEREGISTERED,
@ -125,6 +128,7 @@ struct lwm2m_rd_client_info {
* documented in the LwM2M specification.
*/
static char query_buffer[MAX(32, sizeof("ep=") + CLIENT_EP_LEN)];
static enum sm_engine_state suspended_client_state;
static struct lwm2m_message *rd_get_message(void)
{
@ -166,20 +170,13 @@ static void set_sm_state(uint8_t sm_state)
if (client.engine_state == ENGINE_UPDATE_SENT &&
(sm_state == ENGINE_REGISTRATION_DONE ||
sm_state == ENGINE_REGISTRATION_DONE_RX_OFF)) {
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
lwm2m_push_queued_buffers(client.ctx);
#endif
event = LWM2M_RD_CLIENT_EVENT_REG_UPDATE_COMPLETE;
} else if (sm_state == ENGINE_REGISTRATION_DONE) {
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
lwm2m_push_queued_buffers(client.ctx);
#endif
event = LWM2M_RD_CLIENT_EVENT_REGISTRATION_COMPLETE;
} else if (sm_state == ENGINE_REGISTRATION_DONE_RX_OFF) {
event = LWM2M_RD_CLIENT_EVENT_QUEUE_MODE_RX_OFF;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
lwm2m_engine_close_socket_connection(client.ctx);
#endif
} else if ((sm_state == ENGINE_INIT ||
sm_state == ENGINE_DEREGISTERED) &&
(client.engine_state >= ENGINE_DO_REGISTRATION &&
@ -201,6 +198,15 @@ static void set_sm_state(uint8_t sm_state)
if (event > LWM2M_RD_CLIENT_EVENT_NONE && client.ctx->event_cb) {
client.ctx->event_cb(client.ctx, event);
}
/* Suspend socket after Event callback */
if (event == LWM2M_RD_CLIENT_EVENT_QUEUE_MODE_RX_OFF) {
if (IS_ENABLED(CONFIG_LWM2M_RD_CLIENT_SUSPEND_SOCKET_AT_IDLE)) {
lwm2m_socket_suspend(client.ctx);
} else {
lwm2m_close_socket(client.ctx);
}
}
}
static bool sm_is_registered(void)
@ -981,7 +987,7 @@ static int sm_registration_done(void)
update_objects = client.update_objects;
client.trigger_update = false;
client.update_objects = false;
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
ret = lwm2m_engine_connection_resume(client.ctx);
if (ret) {
lwm2m_engine_context_close(client.ctx);
@ -989,7 +995,6 @@ static int sm_registration_done(void)
set_sm_state(ENGINE_DO_REGISTRATION);
return ret;
}
#endif
ret = sm_send_registration(update_objects,
do_update_reply_cb,
@ -1102,6 +1107,9 @@ static void lwm2m_rd_client_service(struct k_work *work)
sm_do_init();
break;
case ENGINE_SUSPENDED:
break;
#if defined(CONFIG_LWM2M_RD_CLIENT_SUPPORT_BOOTSTRAP)
case ENGINE_DO_BOOTSTRAP_REG:
sm_do_bootstrap_reg();
@ -1231,6 +1239,77 @@ int lwm2m_rd_client_stop(struct lwm2m_ctx *client_ctx,
return 0;
}
int lwm2m_rd_client_pause(void)
{
enum lwm2m_rd_client_event event = LWM2M_RD_CLIENT_EVENT_ENGINE_SUSPENDED;
k_mutex_lock(&client.mutex, K_FOREVER);
if (!client.ctx) {
k_mutex_unlock(&client.mutex);
LOG_ERR("Cannot pause. No context");
return -EPERM;
} else if (client.engine_state == ENGINE_SUSPENDED) {
k_mutex_unlock(&client.mutex);
LOG_ERR("LwM2M client already suspended");
return 0;
}
LOG_INF("Suspend client");
if (!client.ctx->connection_suspended && client.ctx->event_cb) {
client.ctx->event_cb(client.ctx, event);
}
suspended_client_state = get_sm_state();
client.engine_state = ENGINE_SUSPENDED;
k_mutex_unlock(&client.mutex);
return 0;
}
int lwm2m_rd_client_resume(void)
{
int ret;
k_mutex_lock(&client.mutex, K_FOREVER);
if (!client.ctx) {
k_mutex_unlock(&client.mutex);
LOG_WRN("Cannot resume. No context");
return -EPERM;
}
if (client.engine_state != ENGINE_SUSPENDED) {
k_mutex_unlock(&client.mutex);
LOG_WRN("Cannot resume state is not Suspended");
return -EPERM;
}
LOG_INF("Resume Client state");
lwm2m_close_socket(client.ctx);
client.engine_state = suspended_client_state;
if (!sm_is_registered() ||
(sm_is_registered() &&
(client.lifetime <= (k_uptime_get() - client.last_update) / 1000))) {
client.engine_state = ENGINE_DO_REGISTRATION;
} else {
lwm2m_rd_client_connection_resume(client.ctx);
client.trigger_update = true;
}
ret = lwm2m_open_socket(client.ctx);
if (ret) {
LOG_ERR("Socket Open Fail");
client.engine_state = ENGINE_INIT;
}
k_mutex_unlock(&client.mutex);
return 0;
}
void lwm2m_rd_client_update(void)
{
engine_trigger_update(false);
@ -1241,7 +1320,6 @@ struct lwm2m_ctx *lwm2m_rd_client_ctx(void)
return client.ctx;
}
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_rd_client_connection_resume(struct lwm2m_ctx *client_ctx)
{
if (client.ctx != client_ctx) {
@ -1268,7 +1346,6 @@ int lwm2m_rd_client_connection_resume(struct lwm2m_ctx *client_ctx)
return 0;
}
#endif
int lwm2m_rd_client_timeout(struct lwm2m_ctx *client_ctx)
{

View file

@ -40,15 +40,15 @@
void engine_trigger_update(bool update_objects);
int engine_trigger_bootstrap(void);
int lwm2m_rd_client_pause(void);
int lwm2m_rd_client_resume(void);
int lwm2m_rd_client_timeout(struct lwm2m_ctx *client_ctx);
bool lwm2m_rd_client_is_registred(struct lwm2m_ctx *client_ctx);
#if defined(CONFIG_LWM2M_RD_CLIENT_SUPPORT_BOOTSTRAP)
void engine_bootstrap_finish(void);
#endif
#if defined(CONFIG_LWM2M_QUEUE_MODE_ENABLED)
int lwm2m_rd_client_connection_resume(struct lwm2m_ctx *client_ctx);
#endif
void engine_update_tx_time(void);
struct lwm2m_message *lwm2m_get_ongoing_rd_msg(void);