net: mqtt: Add write message handler

Add new transport handler for MQTT, with sendmsg-like functionality.
This allows TCP transport to send PUBLISH packets w/o fragmentation at
the TCP layer. Implement this new functionality for all existing
transports.

Fixes #22679

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2020-02-12 15:25:08 +01:00 committed by Jukka Rissanen
parent 5b58273a34
commit 0c8d81c46f
6 changed files with 126 additions and 6 deletions

View file

@ -159,6 +159,27 @@ static int client_write(struct mqtt_client *client, const u8_t *data,
return 0;
}
static int client_write_msg(struct mqtt_client *client,
const struct msghdr *message)
{
int err_code;
MQTT_TRC("[%p]: Transport writing message.", client);
err_code = mqtt_transport_write_msg(client, message);
if (err_code < 0) {
MQTT_TRC("Transport write failed, err_code = %d, "
"closing connection", err_code);
client_disconnect(client, err_code);
return err_code;
}
MQTT_TRC("[%p]: Transport write complete.", client);
client->internal.last_activity = mqtt_sys_tick_in_ms_get();
return 0;
}
void mqtt_client_init(struct mqtt_client *client)
{
NULL_PARAM_CHECK_VOID(client);
@ -233,6 +254,8 @@ int mqtt_publish(struct mqtt_client *client,
{
int err_code;
struct buf_ctx packet;
struct iovec io_vector[2];
struct msghdr msg;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
@ -256,13 +279,17 @@ int mqtt_publish(struct mqtt_client *client,
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
if (err_code < 0) {
goto error;
}
io_vector[0].iov_base = packet.cur;
io_vector[0].iov_len = packet.end - packet.cur;
io_vector[1].iov_base = param->message.payload.data;
io_vector[1].iov_len = param->message.payload.len;
err_code = client_write(client, param->message.payload.data,
param->message.payload.len);
memset(&msg, 0, sizeof(msg));
msg.msg_iov = io_vector;
msg.msg_iovlen = ARRAY_SIZE(io_vector);
err_code = client_write_msg(client, &msg);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",

View file

@ -16,6 +16,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
{
mqtt_client_tcp_connect,
mqtt_client_tcp_write,
mqtt_client_tcp_write_msg,
mqtt_client_tcp_read,
mqtt_client_tcp_disconnect,
},
@ -23,6 +24,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
{
mqtt_client_tls_connect,
mqtt_client_tls_write,
mqtt_client_tls_write_msg,
mqtt_client_tls_read,
mqtt_client_tls_disconnect,
},
@ -31,6 +33,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
{
mqtt_client_websocket_connect,
mqtt_client_websocket_write,
mqtt_client_websocket_write_msg,
mqtt_client_websocket_read,
mqtt_client_websocket_disconnect,
},
@ -38,6 +41,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
{
mqtt_client_websocket_connect,
mqtt_client_websocket_write,
mqtt_client_websocket_write_msg,
mqtt_client_websocket_read,
mqtt_client_websocket_disconnect,
},
@ -57,6 +61,12 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
datalen);
}
int mqtt_transport_write_msg(struct mqtt_client *client,
const struct msghdr *message)
{
return transport_fn[client->transport.type].write_msg(client, message);
}
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{

View file

@ -25,6 +25,10 @@ typedef int (*transport_connect_handler_t)(struct mqtt_client *client);
typedef int (*transport_write_handler_t)(struct mqtt_client *client,
const u8_t *data, u32_t datalen);
/**@brief Transport write message handler, similar to POSIX sendmsg function. */
typedef int (*transport_write_msg_handler_t)(struct mqtt_client *client,
const struct msghdr *message);
/**@brief Transport read handler. */
typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data,
u32_t buflen, bool shall_block);
@ -44,6 +48,11 @@ struct transport_procedure {
*/
transport_write_handler_t write;
/** Transport write message handler. Handles transport write based
* on type of transport.
*/
transport_write_msg_handler_t write_msg;
/** Transport read handler. Handles transport read based on type of
* transport.
*/
@ -74,6 +83,17 @@ int mqtt_transport_connect(struct mqtt_client *client);
int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
/**@brief Handles write message requests on configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] message Pointer to the `struct msghdr` structure, containing data
* to be written on the transport.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_transport_write_msg(struct mqtt_client *client,
const struct msghdr *message);
/**@brief Handles read requests on configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
@ -99,6 +119,8 @@ int mqtt_transport_disconnect(struct mqtt_client *client);
int mqtt_client_tcp_connect(struct mqtt_client *client);
int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
int mqtt_client_tcp_write_msg(struct mqtt_client *client,
const struct msghdr *message);
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
u32_t buflen, bool shall_block);
int mqtt_client_tcp_disconnect(struct mqtt_client *client);
@ -108,6 +130,8 @@ int mqtt_client_tcp_disconnect(struct mqtt_client *client);
int mqtt_client_tls_connect(struct mqtt_client *client);
int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
int mqtt_client_tls_write_msg(struct mqtt_client *client,
const struct msghdr *message);
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data,
u32_t buflen, bool shall_block);
int mqtt_client_tls_disconnect(struct mqtt_client *client);
@ -117,6 +141,8 @@ int mqtt_client_tls_disconnect(struct mqtt_client *client);
int mqtt_client_websocket_connect(struct mqtt_client *client);
int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
int mqtt_client_websocket_write_msg(struct mqtt_client *client,
const struct msghdr *message);
int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data,
u32_t buflen, bool shall_block);
int mqtt_client_websocket_disconnect(struct mqtt_client *client);

View file

@ -79,6 +79,20 @@ int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
return 0;
}
int mqtt_client_tcp_write_msg(struct mqtt_client *client,
const struct msghdr *message)
{
int ret;
ret = sendmsg(client->transport.tcp.sock, message, 0);
if (ret < 0) {
return -errno;
}
return 0;
}
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{

View file

@ -117,6 +117,19 @@ int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
return 0;
}
int mqtt_client_tls_write_msg(struct mqtt_client *client,
const struct msghdr *message)
{
int ret;
ret = sendmsg(client->transport.tls.sock, message, 0);
if (ret < 0) {
return -errno;
}
return 0;
}
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{

View file

@ -107,6 +107,36 @@ int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data,
return 0;
}
int mqtt_client_websocket_write_msg(struct mqtt_client *client,
const struct msghdr *message)
{
enum websocket_opcode opcode = WEBSOCKET_OPCODE_DATA_BINARY;
bool final = false;
ssize_t len;
ssize_t ret;
int i;
len = 0;
for (i = 0; i < message->msg_iovlen; i++) {
if (i == message->msg_iovlen - 1) {
final = true;
}
ret = websocket_send_msg(client->transport.websocket.sock,
message->msg_iov[i].iov_base,
message->msg_iov[i].iov_len, opcode,
true, final, K_FOREVER);
if (ret < 0) {
return ret;
}
opcode = WEBSOCKET_OPCODE_CONTINUE;
len += ret;
}
return len;
}
int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data,
u32_t buflen, bool shall_block)
{