net/mqtt: Convert mqtt lib to use net app API

Use net app API since we want to enable MQTT with TLS.
mqtt_connect() and mqtt_close() are added to build and close the
connection to the broker. The caller doesn't need to deal with
the net context anymore and the most of network setup code in
mqtt_publisher is removed.

Signed-off-by: Aska Wu <aska.wu@linaro.org>
This commit is contained in:
Aska Wu 2017-06-30 17:22:19 +08:00 committed by Jukka Rissanen
parent 6bb446ed8e
commit fb79837862
9 changed files with 183 additions and 372 deletions

View file

@ -9,6 +9,7 @@
#include <net/mqtt_types.h>
#include <net/net_context.h>
#include <net/net_app.h>
#ifdef __cplusplus
extern "C" {
@ -61,11 +62,15 @@ enum mqtt_app {
* the state of the received and sent messages.</b>
*/
struct mqtt_ctx {
/** IP stack context structure */
struct net_context *net_ctx;
/** Network timeout for tx and rx routines */
/** Net app context structure */
struct net_app_ctx net_app_ctx;
s32_t net_init_timeout;
s32_t net_timeout;
/** Connectivity */
char *peer_addr_str;
u16_t peer_port;
/** Callback executed when a MQTT CONNACK msg is received and validated.
* If this function pointer is not used, must be set to NULL.
*/
@ -176,6 +181,23 @@ struct mqtt_ctx {
*/
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type);
/**
* Release the MQTT context structure
*
* @param ctx MQTT context structure
* @retval 0 on success, and <0 if error
*/
int mqtt_close(struct mqtt_ctx *ctx);
/**
* Connect to an MQTT broker
*
* @param ctx MQTT context structure
* @retval 0 on success, and <0 if error
*/
int mqtt_connect(struct mqtt_ctx *ctx);
/**
* Sends the MQTT CONNECT message
*

View file

@ -29,6 +29,7 @@
#define APP_SLEEP_MSECS 500
#define APP_TX_RX_TIMEOUT 300
#define APP_NET_INIT_TIMEOUT 10000
#define APP_CONNECT_TRIES 10

View file

@ -61,15 +61,11 @@ struct mqtt_client_ctx {
void *publish_data;
};
/* This is the network context structure. */
static struct net_context *net_ctx;
/* The mqtt client struct */
static struct mqtt_client_ctx client_ctx;
/* This routine sets some basic properties for the network context variable */
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port);
static int network_setup(void);
/* The signature of this routine must match the connect callback declared at
* the mqtt.h header.
@ -243,45 +239,24 @@ static void publisher(void)
{
int i, rc;
/* The net_ctx variable must be ready BEFORE passing it to the MQTT API.
*/
for (i = 0; i < CONN_TRIES; i++) {
rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR,
SERVER_PORT);
if (!rc) {
goto connected;
}
}
PRINT_RESULT("network_setup", rc);
goto exit_app;
connected:
/* Set everything to 0 and later just assign the required fields. */
memset(&client_ctx, 0x00, sizeof(client_ctx));
/* The network context is the only field that must be set BEFORE
* calling the mqtt_init routine.
*/
client_ctx.mqtt_ctx.net_ctx = net_ctx;
/* connect, disconnect and malformed may be set to NULL */
client_ctx.mqtt_ctx.connect = connect_cb;
client_ctx.mqtt_ctx.disconnect = disconnect_cb;
client_ctx.mqtt_ctx.malformed = malformed_cb;
client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT;
client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT;
client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR;
client_ctx.mqtt_ctx.peer_port = SERVER_PORT;
/* Publisher apps TX the MQTT PUBLISH msg */
client_ctx.mqtt_ctx.publish_tx = publish_cb;
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER);
PRINT_RESULT("mqtt_init", rc);
if (rc != 0) {
goto exit_app;
}
/* The connect message will be sent to the MQTT server (broker).
* If clean_session here is 0, the mqtt_ctx clean_session variable
* will be set to 0 also. Please don't do that, set always to 1.
@ -295,6 +270,30 @@ connected:
client_ctx.disconnect_data = "DISCONNECTED";
client_ctx.publish_data = "PUBLISH";
rc = network_setup();
PRINT_RESULT("network_setup", rc);
if (rc < 0) {
return;
}
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER);
PRINT_RESULT("mqtt_init", rc);
if (rc != 0) {
return;
}
for (i = 0; i < CONN_TRIES; i++) {
rc = mqtt_connect(&client_ctx.mqtt_ctx);
PRINT_RESULT("mqtt_connect", rc);
if (!rc) {
goto connected;
}
}
goto exit_app;
connected:
rc = try_to_connect(&client_ctx);
PRINT_RESULT("try_to_connect", rc);
if (rc != 0) {
@ -327,34 +326,12 @@ connected:
PRINT_RESULT("mqtt_tx_disconnect", rc);
exit_app:
net_context_put(net_ctx);
mqtt_close(&client_ctx.mqtt_ctx);
printk("\nBye!\n");
}
static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port)
{
void *ptr;
int rc;
#ifdef CONFIG_NET_IPV6
net_sin6(sock_addr)->sin6_port = htons(port);
sock_addr->family = AF_INET6;
ptr = &(net_sin6(sock_addr)->sin6_addr);
rc = net_addr_pton(AF_INET6, addr, ptr);
#else
net_sin(sock_addr)->sin_port = htons(port);
sock_addr->family = AF_INET;
ptr = &(net_sin(sock_addr)->sin_addr);
rc = net_addr_pton(AF_INET, addr, ptr);
#endif
if (rc) {
printk("Invalid IP address: %s\n", addr);
}
return rc;
}
#if defined(CONFIG_NET_L2_BLUETOOTH)
static bool bt_connected;
@ -378,24 +355,13 @@ struct bt_conn_cb bt_conn_cb = {
};
#endif
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port)
static int network_setup(void)
{
#ifdef CONFIG_NET_IPV6
socklen_t addr_len = sizeof(struct sockaddr_in6);
sa_family_t family = AF_INET6;
#else
socklen_t addr_len = sizeof(struct sockaddr_in);
sa_family_t family = AF_INET;
#endif
struct sockaddr server_sock, local_sock;
void *p;
int rc;
#if defined(CONFIG_NET_L2_BLUETOOTH)
const char *progress_mark = "/-\\|";
int i = 0;
int rc;
rc = bt_enable(NULL);
if (rc) {
@ -420,57 +386,7 @@ static int network_setup(struct net_context **net_ctx, const char *local_addr,
printk("\n");
#endif
rc = set_addr(&local_sock, local_addr, 0);
if (rc) {
printk("set_addr (local) error\n");
return rc;
}
#ifdef CONFIG_NET_IPV6
p = net_if_ipv6_addr_add(net_if_get_default(),
&net_sin6(&local_sock)->sin6_addr,
NET_ADDR_MANUAL, 0);
#else
p = net_if_ipv4_addr_add(net_if_get_default(),
&net_sin(&local_sock)->sin_addr,
NET_ADDR_MANUAL, 0);
#endif
if (!p) {
return -EINVAL;
}
rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx);
if (rc) {
printk("net_context_get error\n");
return rc;
}
rc = net_context_bind(*net_ctx, &local_sock, addr_len);
if (rc) {
printk("net_context_bind error\n");
goto lb_exit;
}
rc = set_addr(&server_sock, server_addr, server_port);
if (rc) {
printk("set_addr (server) error\n");
goto lb_exit;
}
rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL,
APP_SLEEP_MSECS, NULL);
if (rc) {
printk("net_context_connect error\n");
goto lb_exit;
}
return 0;
lb_exit:
net_context_put(*net_ctx);
return rc;
}
void main(void)

View file

@ -9,6 +9,7 @@
config MQTT_LIB
bool "MQTT Library Support"
default n
select NET_APP_CLIENT
help
Enable the Zephyr MQTT Library

View file

@ -9,6 +9,7 @@
#include <net/net_ip.h>
#include <net/net_pkt.h>
#include <net/net_app.h>
#include <net/buf.h>
#include <errno.h>
@ -42,7 +43,8 @@ int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg)
goto exit_connect;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_connect;
@ -51,7 +53,8 @@ int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg)
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
@ -79,7 +82,8 @@ int mqtt_tx_disconnect(struct mqtt_ctx *ctx)
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
@ -90,7 +94,8 @@ int mqtt_tx_disconnect(struct mqtt_ctx *ctx)
goto exit_disconnect;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
goto exit_disconnect;
}
@ -152,7 +157,8 @@ int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, u16_t id,
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
@ -163,7 +169,8 @@ int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, u16_t id,
goto exit_send;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
goto exit_send;
}
@ -215,7 +222,8 @@ int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg)
goto exit_publish;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_publish;
@ -224,7 +232,8 @@ int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg)
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
@ -251,7 +260,8 @@ int mqtt_tx_pingreq(struct mqtt_ctx *ctx)
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
@ -262,7 +272,8 @@ int mqtt_tx_pingreq(struct mqtt_ctx *ctx)
goto exit_pingreq;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
goto exit_pingreq;
}
@ -296,7 +307,8 @@ int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
goto exit_subs;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_subs;
@ -305,7 +317,8 @@ int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
@ -339,7 +352,8 @@ int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
goto exit_unsub;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
tx = net_app_get_net_pkt(&ctx->net_app_ctx,
AF_UNSPEC, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_unsub;
@ -348,7 +362,8 @@ int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
rc = net_app_send_pkt(&ctx->net_app_ctx,
tx, NULL, 0, ctx->net_timeout, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
@ -743,13 +758,13 @@ int mqtt_parser(struct mqtt_ctx *ctx, struct net_pkt *rx)
}
static
void mqtt_recv(struct net_context *net_ctx, struct net_pkt *pkt, int status,
void mqtt_recv(struct net_app_ctx *ctx, struct net_pkt *pkt, int status,
void *data)
{
struct mqtt_ctx *mqtt = (struct mqtt_ctx *)data;
/* net_ctx is already referenced to by the mqtt_ctx struct */
ARG_UNUSED(net_ctx);
ARG_UNUSED(ctx);
if (status || !pkt) {
return;
@ -765,6 +780,49 @@ lb_exit:
net_pkt_unref(pkt);
}
int mqtt_connect(struct mqtt_ctx *ctx)
{
int rc = 0;
if (!ctx) {
return -EFAULT;
}
rc = net_app_init_tcp_client(&ctx->net_app_ctx,
NULL,
NULL,
ctx->peer_addr_str,
ctx->peer_port,
ctx->net_init_timeout,
ctx);
if (rc < 0) {
goto error_connect;
}
rc = net_app_set_cb(&ctx->net_app_ctx,
NULL,
mqtt_recv,
NULL,
NULL);
if (rc < 0) {
goto error_connect;
}
rc = net_app_connect(&ctx->net_app_ctx, ctx->net_timeout);
if (rc < 0) {
goto error_connect;
}
return rc;
error_connect:
/* clean net app context, so mqtt_connect() can be called repeatedly */
net_app_close(&ctx->net_app_ctx);
net_app_release(&ctx->net_app_ctx);
return rc;
}
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
{
/* So far, only clean session = 1 is supported */
@ -774,10 +832,19 @@ int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
ctx->app_type = app_type;
ctx->rcv = mqtt_parser;
/* Install the receiver callback, timeout is set to K_NO_WAIT.
* In this case, no return code is evaluated.
*/
(void)net_context_recv(ctx->net_ctx, mqtt_recv, K_NO_WAIT, ctx);
return 0;
}
int mqtt_close(struct mqtt_ctx *ctx)
{
if (!ctx) {
return -EFAULT;
}
if (ctx->net_app_ctx.is_init) {
net_app_close(&ctx->net_app_ctx);
net_app_release(&ctx->net_app_ctx);
}
return 0;
}

View file

@ -29,6 +29,7 @@
#define APP_SLEEP_MSECS 500
#define APP_TX_RX_TIMEOUT 300
#define APP_NET_INIT_TIMEOUT 1000
#define APP_CONNECT_TRIES 10

View file

@ -9,6 +9,7 @@
#include <net/net_context.h>
#include <net/net_pkt.h>
#include <net/net_app.h>
#include <string.h>
#include <errno.h>
@ -56,16 +57,9 @@ struct mqtt_client_ctx {
/* This is mqtt payload message. */
char payload[] = "DOORS:OPEN_QoSx";
/* This is the network context structure. */
static struct net_context *net_ctx;
/* The mqtt client struct */
static struct mqtt_client_ctx client_ctx;
/* This routine sets some basic properties for the network context variable */
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port);
/* The signature of this routine must match the connect callback declared at
* the mqtt.h header.
*/
@ -223,37 +217,24 @@ static int init_network(void)
{
int rc;
/* The net_ctx variable must be ready BEFORE passing it to the MQTT API.
*/
rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR, SERVER_PORT);
if (rc != 0) {
goto exit_app;
}
/* Set everything to 0 and later just assign the required fields. */
memset(&client_ctx, 0x00, sizeof(client_ctx));
/* The network context is the only field that must be set BEFORE
* calling the mqtt_init routine.
*/
client_ctx.mqtt_ctx.net_ctx = net_ctx;
/* connect, disconnect and malformed may be set to NULL */
client_ctx.mqtt_ctx.connect = connect_cb;
client_ctx.mqtt_ctx.disconnect = disconnect_cb;
client_ctx.mqtt_ctx.malformed = malformed_cb;
client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT;
client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT;
client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR;
client_ctx.mqtt_ctx.peer_port = SERVER_PORT;
/* Publisher apps TX the MQTT PUBLISH msg */
client_ctx.mqtt_ctx.publish_tx = publish_cb;
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER);
if (rc != 0) {
goto exit_app;
}
/* The connect message will be sent to the MQTT server (broker).
* If clean_session here is 0, the mqtt_ctx clean_session variable
* will be set to 0 also. Please don't do that, set always to 1.
@ -267,10 +248,20 @@ static int init_network(void)
client_ctx.disconnect_data = "DISCONNECTED";
client_ctx.publish_data = "PUBLISH";
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER);
if (rc != 0) {
goto exit_app;
}
rc = mqtt_connect(&client_ctx.mqtt_ctx);
if (!rc) {
goto exit_app;
}
return TC_PASS;
exit_app:
net_context_put(net_ctx);
mqtt_close(&client_ctx.mqtt_ctx);
return TC_FAIL;
}
@ -327,99 +318,6 @@ static int test_disconnect(void)
return TC_PASS;
}
static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port)
{
void *ptr;
int rc;
#ifdef CONFIG_NET_IPV6
net_sin6(sock_addr)->sin6_port = htons(port);
sock_addr->family = AF_INET6;
ptr = &(net_sin6(sock_addr)->sin6_addr);
rc = net_addr_pton(AF_INET6, addr, ptr);
#else
net_sin(sock_addr)->sin_port = htons(port);
sock_addr->family = AF_INET;
ptr = &(net_sin(sock_addr)->sin_addr);
rc = net_addr_pton(AF_INET, addr, ptr);
#endif
if (rc) {
TC_PRINT("Invalid IP address: %s\n", addr);
}
return rc;
}
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port)
{
#ifdef CONFIG_NET_IPV6
socklen_t addr_len = sizeof(struct sockaddr_in6);
sa_family_t family = AF_INET6;
#else
socklen_t addr_len = sizeof(struct sockaddr_in);
sa_family_t family = AF_INET;
#endif
struct sockaddr server_sock, local_sock;
void *p;
int rc;
rc = set_addr(&local_sock, local_addr, 0);
if (rc) {
TC_PRINT("set_addr (local) error\n");
return TC_FAIL;
}
#ifdef CONFIG_NET_IPV6
p = net_if_ipv6_addr_add(net_if_get_default(),
&net_sin6(&local_sock)->sin6_addr,
NET_ADDR_MANUAL, 0);
#else
p = net_if_ipv4_addr_add(net_if_get_default(),
&net_sin(&local_sock)->sin_addr,
NET_ADDR_MANUAL, 0);
#endif
if (!p) {
return TC_FAIL;
}
rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx);
if (rc) {
TC_PRINT("net_context_get error\n");
return TC_FAIL;
}
rc = net_context_bind(*net_ctx, &local_sock, addr_len);
if (rc) {
TC_PRINT("net_context_bind error\n");
goto lb_exit;
}
rc = set_addr(&server_sock, server_addr, server_port);
if (rc) {
TC_PRINT("set_addr (server) error\n");
goto lb_exit;
}
rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL,
APP_SLEEP_MSECS, NULL);
if (rc) {
TC_PRINT("net_context_connect error\n"
"Is the server (broker) up and running?\n");
goto lb_exit;
}
return TC_PASS;
lb_exit:
net_context_put(*net_ctx);
return TC_FAIL;
}
void test_mqtt_init(void)
{
zassert_true(init_network() == TC_PASS, NULL);

View file

@ -29,6 +29,7 @@
#define APP_SLEEP_MSECS 500
#define APP_TX_RX_TIMEOUT 300
#define APP_NET_INIT_TIMEOUT 1000
#define APP_CONNECT_TRIES 10

View file

@ -60,16 +60,9 @@ struct mqtt_client_ctx {
void *unsubscribe_data;
};
/* This is the network context structure. */
static struct net_context *net_ctx;
/* The mqtt client struct */
static struct mqtt_client_ctx client_ctx;
/* This routine sets some basic properties for the network context variable */
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port);
/* The signature of this routine must match the connect callback declared at
* the mqtt.h header.
*/
@ -249,21 +242,9 @@ static int init_network(void)
{
int rc;
/* The net_ctx variable must be ready BEFORE passing it to the MQTT API.
*/
rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR, SERVER_PORT);
if (rc != 0) {
goto exit_app;
}
/* Set everything to 0 and later just assign the required fields. */
memset(&client_ctx, 0x00, sizeof(client_ctx));
/* The network context is the only field that must be set BEFORE
* calling the mqtt_init routine.
*/
client_ctx.mqtt_ctx.net_ctx = net_ctx;
/* connect, disconnect and malformed may be set to NULL */
client_ctx.mqtt_ctx.connect = connect_cb;
@ -275,8 +256,14 @@ static int init_network(void)
client_ctx.mqtt_ctx.unsubscribe = unsubscribe_cb;
client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT;
client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT;
client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR;
client_ctx.mqtt_ctx.peer_port = SERVER_PORT;
/* Publisher apps TX the MQTT PUBLISH msg */
client_ctx.mqtt_ctx.publish_rx = publish_rx_cb;
@ -299,10 +286,20 @@ static int init_network(void)
client_ctx.subscribe_data = "SUBSCRIBE";
client_ctx.unsubscribe_data = "UNSUBSCRIBE";
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_SUBSCRIBER);
if (rc != 0) {
goto exit_app;
}
rc = mqtt_connect(&client_ctx.mqtt_ctx);
if (!rc) {
goto exit_app;
}
return TC_PASS;
exit_app:
net_context_put(net_ctx);
mqtt_close(&client_ctx.mqtt_ctx);
return TC_FAIL;
}
@ -364,99 +361,6 @@ static int test_disconnect(void)
return TC_PASS;
}
static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port)
{
void *ptr;
int rc;
#ifdef CONFIG_NET_IPV6
net_sin6(sock_addr)->sin6_port = htons(port);
sock_addr->family = AF_INET6;
ptr = &(net_sin6(sock_addr)->sin6_addr);
rc = net_addr_pton(AF_INET6, addr, ptr);
#else
net_sin(sock_addr)->sin_port = htons(port);
sock_addr->family = AF_INET;
ptr = &(net_sin(sock_addr)->sin_addr);
rc = net_addr_pton(AF_INET, addr, ptr);
#endif
if (rc) {
printk("Invalid IP address: %s\n", addr);
}
return rc;
}
static int network_setup(struct net_context **net_ctx, const char *local_addr,
const char *server_addr, u16_t server_port)
{
#ifdef CONFIG_NET_IPV6
socklen_t addr_len = sizeof(struct sockaddr_in6);
sa_family_t family = AF_INET6;
#else
socklen_t addr_len = sizeof(struct sockaddr_in);
sa_family_t family = AF_INET;
#endif
struct sockaddr server_sock, local_sock;
void *p;
int rc;
rc = set_addr(&local_sock, local_addr, 0);
if (rc) {
printk("set_addr (local) error\n");
return rc;
}
#ifdef CONFIG_NET_IPV6
p = net_if_ipv6_addr_add(net_if_get_default(),
&net_sin6(&local_sock)->sin6_addr,
NET_ADDR_MANUAL, 0);
#else
p = net_if_ipv4_addr_add(net_if_get_default(),
&net_sin(&local_sock)->sin_addr,
NET_ADDR_MANUAL, 0);
#endif
if (!p) {
return -EINVAL;
}
rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx);
if (rc) {
printk("net_context_get error\n");
return rc;
}
rc = net_context_bind(*net_ctx, &local_sock, addr_len);
if (rc) {
printk("net_context_bind error\n");
goto lb_exit;
}
rc = set_addr(&server_sock, server_addr, server_port);
if (rc) {
printk("set_addr (server) error\n");
goto lb_exit;
}
rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL,
APP_SLEEP_MSECS, NULL);
if (rc) {
printk("net_context_connect error\n"
"Is the server (broker) up and running?\n");
goto lb_exit;
}
return 0;
lb_exit:
net_context_put(*net_ctx);
return rc;
}
void test_mqtt_init(void)
{
zassert_true(init_network() == TC_PASS, NULL);