samples: net: mqtt: Add a sample to connect to AWS IoT core

This sample application demonstrates the implementation of an
MQTT client that can publish messages to AWS IoT Core using
the MQTT protocol.

Fixes #22866

Signed-off-by: Lucas Dietrich <ld.adecy@gmail.com>
This commit is contained in:
Lucas Dietrich 2023-05-10 14:46:31 +02:00 committed by Carles Cufí
parent afd4161a84
commit e803b77463
16 changed files with 982 additions and 0 deletions

View file

@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(aws_iot_mqtt)
if(USE_DUMMY_CREDS)
set(creds "src/creds/dummy.c")
else()
if(NOT EXISTS ${APPLICATION_SOURCE_DIR}/src/creds/key.c OR
NOT EXISTS ${APPLICATION_SOURCE_DIR}/src/creds/cert.c OR
NOT EXISTS ${APPLICATION_SOURCE_DIR}/src/creds/ca.c)
message(FATAL_ERROR
"Credentials not found. Please run "
"'python3 src/creds/convert_keys.py' before building"
)
endif()
set(creds "src/creds/ca.c" "src/creds/key.c" "src/creds/cert.c")
endif()
target_sources(app PRIVATE "src/main.c" ${creds})
target_sources_ifdef(CONFIG_NET_DHCPV4 app PRIVATE "src/dhcp.c")

View file

@ -0,0 +1,78 @@
#
# Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
#
# SPDX-License-Identifier: Apache-2.0
#
menu "AWS"
mainmenu "AWS IoT Core MQTT sample application"
config AWS_ENDPOINT
string "AWS endpoint"
default ""
help
Endpoint (hostname) of the AWS MQTT broker.
Note that the endpoint is different when using AWS Device Advisor.
config AWS_THING_NAME
string "AWS Thing name"
default "myThingName"
help
Set the AWS Thing name created on IoT Console
config AWS_SUBSCRIBE_TOPIC
string "MQTT subscribe topic"
default "myThingName/downlink"
help
MQTT topic the client should subscribe to.
config AWS_PUBLISH_TOPIC
string "MQTT publish topic"
default "myThingName/data"
help
MQTT topic the client should publish to.
choice AWS_TEST_SUITE
prompt "Device Advisor test suite"
default AWS_TEST_SUITE_NONE
help
Select the AWS Device Advisor test suite to run.
config AWS_TEST_SUITE_NONE
bool "No test suite running"
config AWS_TEST_SUITE_DQP
bool "Device Qualification Program (DQP)"
help
Make sure your region supports AWS Device Advisor for DQP
config AWS_TEST_SUITE_RECV_QOS1
bool "Test suite for receiving QoS 1 messages"
help
For single test case "MQTT Client Puack QoS1"
endchoice
config AWS_QOS
int "MQTT QoS"
default 0 if AWS_TEST_SUITE_DQP
default 1 if AWS_TEST_SUITE_RECV_QOS1
default 0
range 0 1
help
Quality of Service to use for publishing and subscribing to topics.
Notes:
- Use QoS 0 when passing DQP test suite
- QoS 2 is not supported by AWS MQTT broker
config AWS_EXPONENTIAL_BACKOFF
bool "enable exponential backoff"
default n if AWS_TEST_SUITE_DQP || AWS_TEST_SUITE_RECV_QOS1
default y
help
Enable AWS exponential backoff for reconnecting to AWS MQTT broker.
endmenu
source "Kconfig.zephyr"

View file

@ -0,0 +1,107 @@
.. _aws-iot-mqtt-sample:
AWS IoT Core MQTT Sample
########################
Overview
********
This sample application demonstrates the implementation of an MQTT client that
can publish messages to AWS IoT Core using the MQTT protocol. Key features include:
- Acquiring a DHCPv4 lease
- Connecting to an SNTP server to obtain the current time
- Establishing a TLS 1.2 connection with AWS IoT Core servers
- Subscribing to a topic on AWS IoT Core
- Publishing data to AWS IoT Core
- Passing the AWS Device Qualification Program (DQP) test suite: `Device Qualification Program (DQP) <https://aws.amazon.com/partners/programs/dqp/>`_
- Sending and receiving keep-alive pings
- Retrying connections using an exponential backoff algorithm
Requirements
************
- An entropy source
- An AWS account with access to AWS IoT Core
- AWS credentials and necessary information
- Network connectivity
Building and Running
********************
This application has been built and tested on the ST NUCLEO-F429ZI board and
QEMU x86 target. A valid certificate and private key are required to
authenticate to the AWS IoT Core. The sample includes a script to convert
the certificate and private key in order to embed them in the application.
Register a *thing* in AWS IoT Core and download the certificate and private key.
Copy these files to the :zephyr_file:`samples/net/cloud/aws_iot_mqtt/src/creds`
directory. Run the :zephyr_file:`samples/net/cloud/aws_iot_mqtt/src/creds/convert_certs.py`
script, which will generate files ``ca.c``, ``cert.c`` and ``key.c``.
To configure the sample, set the following Kconfig options based on your AWS IoT
Core region, thing, and device advisor configuration:
- :kconfig:option:`CONFIG_AWS_ENDPOINT`: The AWS IoT Core broker endpoint, found in the AWS IoT Core
console. This will be specific if running a test suite using device advisor.
- :kconfig:option:`CONFIG_AWS_THING_NAME`: The name of the thing created in AWS IoT Core. Associated
with the certificate it will be used as the client id. We will use
``zephyr_sample`` in this example.
- :kconfig:option:`CONFIG_AWS_SUBSCRIBE_TOPIC`: The topic to subscribe to.
- :kconfig:option:`CONFIG_AWS_PUBLISH_TOPIC`: The topic to publish to.
- :kconfig:option:`CONFIG_AWS_QOS`: The QoS level for subscriptions and publications.
- :kconfig:option:`CONFIG_AWS_EXPONENTIAL_BACKOFF`: Enable the exponential backoff algorithm.
Refer to the `AWS IoT Core Documentation <https://docs.aws.amazon.com/iot/index.html>`_
for more information.
Additionnaly, it is possible to tune the firmware to pass the AWS DQP test
suite, to do set Kconfig option :kconfig:option:`CONFIG_AWS_TEST_SUITE_DQP` to ``y``.
More information about the AWS device advisor can be found here:
`AWS IoT Core Device Advisor <https://aws.amazon.com/iot-core/device-advisor/>`_.
MQTT test client
================
Access the MQTT test client in the AWS IoT Core console, subscribe to the
``zephyr_sample/data`` topic, and publish a payload to the ``zephyr_sample/downlink``
topic. The device console will display the payload received by your device, and
the AWS console will show the JSON message sent by the device under the
``zephyr_sample/data`` topic.
Sample output
=============
This is the output from the ST-Link UART on the NUCLEO-F429ZI board.
.. code-block:: console
*** Booting Zephyr OS build zephyr-v3.3.0 ***
[00:00:01.626,000] <inf> aws: starting DHCPv4
[00:00:01.969,000] <dbg> aws: sntp_sync_time: Acquired time from NTP server: 1683472436
[00:00:01.977,000] <inf> aws: Resolved: 52.212.60.110:8883
[00:00:03.327,000] <dbg> aws: mqtt_event_cb: MQTT event: CONNACK [0] result: 0
[00:00:03.327,000] <inf> aws: Subscribing to 1 topic(s)
[00:00:03.390,000] <dbg> aws: mqtt_event_cb: MQTT event: SUBACK [7] result: 0
[00:00:03.390,000] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 13 B
[00:00:03.390,000] <dbg> aws: publish_message: Published payload:
7b 22 63 6f 75 6e 74 65 72 22 3a 30 7d |{"counte r":0}
[00:00:11.856,000] <dbg> aws: mqtt_event_cb: MQTT event: PUBLISH [2] result: 0
[00:00:11.856,000] <inf> aws: RECEIVED on topic "zephyr_sample/downlink" [ id: 13 qos: 0 ] payload: 45 / 4096 B
[00:00:11.856,000] <dbg> aws: handle_published_message: Received payload:
7b 0a 20 20 22 6d 65 73 73 61 67 65 22 3a 20 22 |{. "mes sage": "
48 65 6c 6c 6f 20 66 72 6f 6d 20 41 57 53 20 49 |Hello fr om AWS I
6f 54 20 63 6f 6e 73 6f 6c 65 22 0a 7d |oT conso le".}
[00:00:11.857,000] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 2 qos: 0 ], payload: 13 B
[00:00:11.857,000] <dbg> aws: publish_message: Published payload:
7b 22 63 6f 75 6e 74 65 72 22 3a 31 7d |{"counte r":1}
[00:01:11.755,000] <dbg> aws: mqtt_event_cb: MQTT event: 9 result: 0
[00:02:11.755,000] <dbg> aws: mqtt_event_cb: MQTT event: 9 result: 0
Run in QEMU x86
===============
The sample can be run in QEMU x86. To do so, you will need to configure
NAT/MASQUERADE on your host machine. Refer to the Zephyr documentation
:ref:`networking_with_qemu`. for more information.

View file

@ -0,0 +1,4 @@
CONFIG_ENTROPY_DEVICE_RANDOM_GENERATOR=y
CONFIG_NET_DHCPV4=y
CONFIG_NET_DHCPV4_INITIAL_DELAY_MAX=2

View file

@ -0,0 +1,3 @@
&mac {
local-mac-address = [00 00 00 77 77 77];
};

View file

@ -0,0 +1,11 @@
CONFIG_QEMU_ICOUNT=n
# QEMU networking configuration
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_NEED_IPV6=y
CONFIG_NET_CONFIG_MY_IPV6_ADDR="2001:db8::1"
CONFIG_NET_CONFIG_PEER_IPV6_ADDR="2001:db8::2"
CONFIG_NET_CONFIG_NEED_IPV4=y
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
CONFIG_NET_CONFIG_PEER_IPV4_ADDR="192.0.2.2"
CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2"

View file

@ -0,0 +1,60 @@
CONFIG_AWS_ENDPOINT="a31gokdeokxhl8-ats.iot.eu-west-1.amazonaws.com"
CONFIG_AWS_THING_NAME="zephyr_sample"
CONFIG_AWS_PUBLISH_TOPIC="zephyr_sample/data"
CONFIG_AWS_SUBSCRIBE_TOPIC="zephyr_sample/downlink"
CONFIG_AWS_TEST_SUITE_DQP=n
CONFIG_MAIN_STACK_SIZE=4096
CONFIG_ENTROPY_GENERATOR=y
CONFIG_TEST_RANDOM_GENERATOR=y
CONFIG_INIT_STACKS=y
CONFIG_HW_STACK_PROTECTION=y
CONFIG_NEWLIB_LIBC=y
CONFIG_SNTP=y
CONFIG_JSON_LIBRARY=y
CONFIG_POSIX_CLOCK=y
# DNS
CONFIG_DNS_RESOLVER=y
CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
CONFIG_DNS_RESOLVER_MAX_SERVERS=1
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="8.8.8.8"
CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
CONFIG_DNS_RESOLVER_LOG_LEVEL_DBG=n
# Generic networking options
CONFIG_NETWORKING=y
CONFIG_NET_UDP=y
CONFIG_NET_TCP=y
CONFIG_NET_IPV6=y
CONFIG_NET_IPV4=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
# Logging
CONFIG_LOG=y
# Network buffers
CONFIG_NET_PKT_RX_COUNT=32
CONFIG_NET_PKT_TX_COUNT=16
CONFIG_NET_BUF_RX_COUNT=64
CONFIG_NET_BUF_TX_COUNT=32
# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=y
CONFIG_MQTT_KEEPALIVE=60
# TLS
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_BUILTIN=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=65536
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=16384
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
CONFIG_MBEDTLS_AES_ROM_TABLES=y
CONFIG_MBEDTLS_TLS_VERSION_1_2=y
CONFIG_MBEDTLS_MEMORY_DEBUG=y
CONFIG_MBEDTLS_HAVE_TIME_DATE=y

View file

@ -0,0 +1,14 @@
sample:
description: MQTT sample app to AWS IoT Core
name: aws_iot_mqtt
common:
tags: net mqtt cloud
harness: net
filter: TOOLCHAIN_HAS_NEWLIB == 1
extra_args: USE_DUMMY_CREDS=1
tests:
sample.net.cloud.aws_iot_mqtt:
depends_on: netif
platform_allow: qemu_x86 nucleo_f429zi
integration_platforms:
- qemu_x86

View file

@ -0,0 +1,6 @@
*-certificate.pem.crt
*-private.pem.key
*-public.pem.key
/ca.c
/key.c
/cert.c

View file

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF
ADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6
b24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL
MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv
b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj
ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM
9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw
IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6
VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L
93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm
jgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC
AYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA
A4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI
U5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs
N+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv
o/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU
5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy
rqXRfboQnoZsG4q5WTP468SQvvG5
-----END CERTIFICATE-----

View file

@ -0,0 +1,48 @@
# Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
# SPDX-License-Identifier: Apache-2.0
import os
import glob
def bin2array(name, fin, fout):
with open(fin, 'rb') as f:
data = f.read()
data += b'\0' # add null terminator
with open(fout, 'w') as f:
f.write("#include <stdint.h>\n")
f.write(f"const uint8_t {name}[] = {{")
for i in range(0, len(data), 16):
f.write("\n\t")
f.write(", ".join(f"0x{b:02x}" for b in data[i:i+16]))
f.write(",")
f.write("\n};\n")
f.write(f"const uint32_t {name}_len = sizeof({name});\n")
print(
f"[{name.center(13, ' ')}]: {os.path.relpath(fin)} -> {os.path.relpath(fout)}")
if __name__ == "__main__":
creds_dir = os.path.dirname(os.path.realpath(__file__))
creds = glob.glob(f"{creds_dir}/*.pem.*")
cert_found, key_found = False, False
for cred in creds:
if cred.endswith('-certificate.pem.crt'):
bin2array("public_cert", cred, os.path.join(creds_dir, "cert.c"))
cert_found = True
elif cred.endswith('-private.pem.key'):
bin2array("private_key", cred, os.path.join(creds_dir, "key.c"))
key_found = True
if not cert_found:
print("No certificate found !")
if not key_found:
print("No private key found !")
bin2array("ca_cert", os.path.join(creds_dir, "AmazonRootCA1.pem"),
os.path.join(creds_dir, "ca.c"))

View file

@ -0,0 +1,21 @@
/*
* Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef _AWS_CERTS_H_
#define _AWS_CERTS_H_
#include <stdint.h>
extern const uint8_t ca_cert[];
extern const uint32_t ca_cert_len;
extern const uint8_t private_key[];
extern const uint32_t private_key_len;
extern const uint8_t public_cert[];
extern const uint32_t public_cert_len;
#endif /* _AWS_CERTS_H_ */

View file

@ -0,0 +1,13 @@
/*
* Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <stdint.h>
const uint8_t public_cert[] = { 0x00 };
const uint32_t public_cert_len = sizeof(public_cert);
const uint8_t ca_cert[] = { 0x00 };
const uint32_t ca_cert_len = sizeof(ca_cert);
const uint8_t private_key[] = { 0x00 };
const uint32_t private_key_len = sizeof(private_key);

View file

@ -0,0 +1,66 @@
/* DHCPv4 client startup. */
/*
* Copyright (c) 2018 Linaro Ltd
* Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(aws, LOG_LEVEL_DBG);
#include <zephyr/kernel.h>
#include <zephyr/net/net_if.h>
#include <zephyr/net/net_core.h>
#include <zephyr/net/net_context.h>
#include <zephyr/net/net_mgmt.h>
static struct net_mgmt_event_callback mgmt_cb;
/* Semaphore to indicate a lease has been acquired. */
static K_SEM_DEFINE(got_address, 0, 1);
static void handler(struct net_mgmt_event_callback *cb,
uint32_t mgmt_event,
struct net_if *iface)
{
int i;
bool notified = false;
if (mgmt_event != NET_EVENT_IPV4_ADDR_ADD) {
return;
}
for (i = 0; i < NET_IF_MAX_IPV4_ADDR; i++) {
if (iface->config.ip.ipv4->unicast[i].addr_type !=
NET_ADDR_DHCP) {
continue;
}
if (!notified) {
k_sem_give(&got_address);
notified = true;
}
break;
}
}
/**
* Start a DHCP client, and wait for a lease to be acquired.
*/
void app_dhcpv4_startup(void)
{
LOG_INF("starting DHCPv4");
net_mgmt_init_event_callback(&mgmt_cb, handler,
NET_EVENT_IPV4_ADDR_ADD);
net_mgmt_add_event_callback(&mgmt_cb);
net_dhcpv4_start(net_if_get_default());
/* Wait for a lease. */
k_sem_take(&got_address, K_FOREVER);
}

View file

@ -0,0 +1,15 @@
/* DHCPv4 client startup. */
/*
* Copyright (c) 2018 Linaro Ltd
* Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __DHCP_H__
#define __DHCP_H__
void app_dhcpv4_startup(void);
#endif

View file

@ -0,0 +1,492 @@
/*
* Copyright (c) 2023 Lucas Dietrich <ld.adecy@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "creds/creds.h"
#include "dhcp.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/dns_resolve.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/net/sntp.h>
#include <zephyr/net/tls_credentials.h>
#include <zephyr/data/json.h>
#include <zephyr/random/rand32.h>
#include <zephyr/posix/time.h>
#include <zephyr/logging/log.h>
#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
#include <mbedtls/memory_buffer_alloc.h>
#endif
LOG_MODULE_REGISTER(aws, LOG_LEVEL_DBG);
#define SNTP_SERVER "0.pool.ntp.org"
#define AWS_BROKER_PORT "8883"
#define MQTT_BUFFER_SIZE 256u
#define APP_BUFFER_SIZE 4096u
#define MAX_RETRIES 10u
#define BACKOFF_EXP_BASE_MS 1000u
#define BACKOFF_EXP_MAX_MS 60000u
#define BACKOFF_CONST_MS 5000u
static struct sockaddr_in aws_broker;
static uint8_t rx_buffer[MQTT_BUFFER_SIZE];
static uint8_t tx_buffer[MQTT_BUFFER_SIZE];
static uint8_t buffer[APP_BUFFER_SIZE]; /* Shared between published and received messages */
static struct mqtt_client client_ctx;
static const char mqtt_client_name[] = CONFIG_AWS_THING_NAME;
static uint32_t messages_received_counter;
static bool do_publish; /* Trigger client to publish */
static bool do_subscribe; /* Trigger client to subscribe */
#define TLS_TAG_DEVICE_CERTIFICATE 1
#define TLS_TAG_DEVICE_PRIVATE_KEY 1
#define TLS_TAG_AWS_CA_CERTIFICATE 2
static sec_tag_t sec_tls_tags[] = {
TLS_TAG_DEVICE_CERTIFICATE,
TLS_TAG_AWS_CA_CERTIFICATE,
};
static int setup_credentials(void)
{
int ret;
ret = tls_credential_add(TLS_TAG_DEVICE_CERTIFICATE, TLS_CREDENTIAL_SERVER_CERTIFICATE,
public_cert, public_cert_len);
if (ret < 0) {
LOG_ERR("Failed to add device certificate: %d", ret);
goto exit;
}
ret = tls_credential_add(TLS_TAG_DEVICE_PRIVATE_KEY, TLS_CREDENTIAL_PRIVATE_KEY,
private_key, private_key_len);
if (ret < 0) {
LOG_ERR("Failed to add device private key: %d", ret);
goto exit;
}
ret = tls_credential_add(TLS_TAG_AWS_CA_CERTIFICATE, TLS_CREDENTIAL_CA_CERTIFICATE, ca_cert,
ca_cert_len);
if (ret < 0) {
LOG_ERR("Failed to add device private key: %d", ret);
goto exit;
}
exit:
return ret;
}
static int subscribe_topic(void)
{
int ret;
struct mqtt_topic topics[] = {{
.topic = {.utf8 = CONFIG_AWS_SUBSCRIBE_TOPIC,
.size = strlen(CONFIG_AWS_SUBSCRIBE_TOPIC)},
.qos = CONFIG_AWS_QOS,
}};
const struct mqtt_subscription_list sub_list = {
.list = topics,
.list_count = ARRAY_SIZE(topics),
.message_id = 1u,
};
LOG_INF("Subscribing to %hu topic(s)", sub_list.list_count);
ret = mqtt_subscribe(&client_ctx, &sub_list);
if (ret != 0) {
LOG_ERR("Failed to subscribe to topics: %d", ret);
}
return ret;
}
static int publish_message(const char *topic, size_t topic_len, uint8_t *payload,
size_t payload_len)
{
static uint32_t message_id = 1u;
int ret;
struct mqtt_publish_param msg;
msg.retain_flag = 0u;
msg.message.topic.topic.utf8 = topic;
msg.message.topic.topic.size = topic_len;
msg.message.topic.qos = CONFIG_AWS_QOS;
msg.message.payload.data = payload;
msg.message.payload.len = payload_len;
msg.message_id = message_id++;
ret = mqtt_publish(&client_ctx, &msg);
if (ret != 0) {
LOG_ERR("Failed to publish message: %d", ret);
}
LOG_INF("PUBLISHED on topic \"%s\" [ id: %u qos: %u ], payload: %u B", topic,
msg.message_id, msg.message.topic.qos, payload_len);
LOG_HEXDUMP_DBG(payload, payload_len, "Published payload:");
return ret;
}
static ssize_t handle_published_message(const struct mqtt_publish_param *pub)
{
int ret;
size_t received = 0u;
const size_t message_size = pub->message.payload.len;
const bool discarded = message_size > APP_BUFFER_SIZE;
LOG_INF("RECEIVED on topic \"%s\" [ id: %u qos: %u ] payload: %u / %u B",
(const char *)pub->message.topic.topic.utf8, pub->message_id,
pub->message.topic.qos, message_size, APP_BUFFER_SIZE);
while (received < message_size) {
uint8_t *p = discarded ? buffer : &buffer[received];
ret = mqtt_read_publish_payload_blocking(&client_ctx, p, APP_BUFFER_SIZE);
if (ret < 0) {
return ret;
}
received += ret;
}
if (!discarded) {
LOG_HEXDUMP_DBG(buffer, MIN(message_size, 256u), "Received payload:");
}
/* Send ACK */
switch (pub->message.topic.qos) {
case MQTT_QOS_1_AT_LEAST_ONCE: {
struct mqtt_puback_param puback;
puback.message_id = pub->message_id;
mqtt_publish_qos1_ack(&client_ctx, &puback);
} break;
case MQTT_QOS_2_EXACTLY_ONCE: /* unhandled (not supported by AWS) */
case MQTT_QOS_0_AT_MOST_ONCE: /* nothing to do */
default:
break;
}
return discarded ? -ENOMEM : received;
}
const char *mqtt_evt_type_to_str(enum mqtt_evt_type type)
{
static const char *const types[] = {
"CONNACK", "DISCONNECT", "PUBLISH", "PUBACK", "PUBREC",
"PUBREL", "PUBCOMP", "SUBACK", "UNSUBACK", "PINGRESP",
};
return (type < ARRAY_SIZE(types)) ? types[type] : "<unknown>";
}
static void mqtt_event_cb(struct mqtt_client *client, const struct mqtt_evt *evt)
{
LOG_DBG("MQTT event: %s [%u] result: %d", mqtt_evt_type_to_str(evt->type), evt->type,
evt->result);
switch (evt->type) {
case MQTT_EVT_CONNACK: {
do_subscribe = true;
} break;
case MQTT_EVT_PUBLISH: {
const struct mqtt_publish_param *pub = &evt->param.publish;
handle_published_message(pub);
messages_received_counter++;
#if !defined(CONFIG_AWS_TEST_SUITE_RECV_QOS1)
do_publish = true;
#endif
} break;
case MQTT_EVT_SUBACK: {
#if !defined(CONFIG_AWS_TEST_SUITE_RECV_QOS1)
do_publish = true;
#endif
} break;
case MQTT_EVT_PUBACK:
case MQTT_EVT_DISCONNECT:
case MQTT_EVT_PUBREC:
case MQTT_EVT_PUBREL:
case MQTT_EVT_PUBCOMP:
case MQTT_EVT_PINGRESP:
case MQTT_EVT_UNSUBACK:
default:
break;
}
}
static void aws_client_setup(void)
{
mqtt_client_init(&client_ctx);
client_ctx.broker = &aws_broker;
client_ctx.evt_cb = mqtt_event_cb;
client_ctx.client_id.utf8 = (uint8_t *)mqtt_client_name;
client_ctx.client_id.size = sizeof(mqtt_client_name) - 1;
client_ctx.password = NULL;
client_ctx.user_name = NULL;
client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;
client_ctx.protocol_version = MQTT_VERSION_3_1_1;
client_ctx.rx_buf = rx_buffer;
client_ctx.rx_buf_size = MQTT_BUFFER_SIZE;
client_ctx.tx_buf = tx_buffer;
client_ctx.tx_buf_size = MQTT_BUFFER_SIZE;
/* setup TLS */
client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
struct mqtt_sec_config *const tls_config = &client_ctx.transport.tls.config;
tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = sec_tls_tags;
tls_config->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
tls_config->hostname = CONFIG_AWS_ENDPOINT;
tls_config->cert_nocopy = TLS_CERT_NOCOPY_NONE;
}
struct backoff_context {
uint16_t retries_count;
uint16_t max_retries;
#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
uint32_t attempt_max_backoff; /* ms */
uint32_t max_backoff; /* ms */
#endif
};
static void backoff_context_init(struct backoff_context *bo)
{
__ASSERT_NO_MSG(bo != NULL);
bo->retries_count = 0u;
bo->max_retries = MAX_RETRIES;
#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
bo->attempt_max_backoff = BACKOFF_EXP_BASE_MS;
bo->max_backoff = BACKOFF_EXP_MAX_MS;
#endif
}
/* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ */
static void backoff_get_next(struct backoff_context *bo, uint32_t *next_backoff_ms)
{
__ASSERT_NO_MSG(bo != NULL);
__ASSERT_NO_MSG(next_backoff_ms != NULL);
#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
if (bo->retries_count <= bo->max_retries) {
*next_backoff_ms = sys_rand32_get() % (bo->attempt_max_backoff + 1u);
/* Calculate max backoff for the next attempt (~ 2**attempt) */
bo->attempt_max_backoff = MIN(bo->attempt_max_backoff * 2u, bo->max_backoff);
bo->retries_count++;
}
#else
*next_backoff_ms = BACKOFF_CONST_MS;
#endif
}
static int aws_client_try_connect(void)
{
int ret;
uint32_t backoff_ms;
struct backoff_context bo;
backoff_context_init(&bo);
while (bo.retries_count <= bo.max_retries) {
ret = mqtt_connect(&client_ctx);
if (ret == 0) {
goto exit;
}
backoff_get_next(&bo, &backoff_ms);
LOG_ERR("Failed to connect: %d backoff delay: %u ms", ret, backoff_ms);
k_msleep(backoff_ms);
}
exit:
return ret;
}
struct publish_payload {
uint32_t counter;
};
static const struct json_obj_descr json_descr[] = {
JSON_OBJ_DESCR_PRIM(struct publish_payload, counter, JSON_TOK_NUMBER),
};
static int publish(void)
{
struct publish_payload pl = {.counter = messages_received_counter};
json_obj_encode_buf(json_descr, ARRAY_SIZE(json_descr), &pl, buffer, sizeof(buffer));
return publish_message(CONFIG_AWS_PUBLISH_TOPIC, strlen(CONFIG_AWS_PUBLISH_TOPIC), buffer,
strlen(buffer));
}
void aws_client_loop(void)
{
int rc;
int timeout;
struct zsock_pollfd fds;
aws_client_setup();
rc = aws_client_try_connect();
if (rc != 0) {
goto cleanup;
}
fds.fd = client_ctx.transport.tcp.sock;
fds.events = ZSOCK_POLLIN;
for (;;) {
timeout = mqtt_keepalive_time_left(&client_ctx);
rc = zsock_poll(&fds, 1u, timeout);
if (rc >= 0) {
if (fds.revents & ZSOCK_POLLIN) {
rc = mqtt_input(&client_ctx);
if (rc != 0) {
LOG_ERR("Failed to read MQTT input: %d", rc);
break;
}
}
if (fds.revents & (ZSOCK_POLLHUP | ZSOCK_POLLERR)) {
LOG_ERR("Socket closed/error");
break;
}
rc = mqtt_live(&client_ctx);
if ((rc != 0) && (rc != -EAGAIN)) {
LOG_ERR("Failed to live MQTT: %d", rc);
break;
}
} else {
LOG_ERR("poll failed: %d", rc);
break;
}
if (do_publish) {
do_publish = false;
publish();
}
if (do_subscribe) {
do_subscribe = false;
subscribe_topic();
}
}
cleanup:
mqtt_disconnect(&client_ctx);
zsock_close(fds.fd);
fds.fd = -1;
}
int sntp_sync_time(void)
{
int rc;
struct sntp_time now;
struct timespec tspec;
rc = sntp_simple(SNTP_SERVER, SYS_FOREVER_MS, &now);
if (rc == 0) {
tspec.tv_sec = now.seconds;
tspec.tv_nsec = ((uint64_t)now.fraction * (1000lu * 1000lu * 1000lu)) >> 32;
clock_settime(CLOCK_REALTIME, &tspec);
LOG_DBG("Acquired time from NTP server: %u", (uint32_t)tspec.tv_sec);
} else {
LOG_ERR("Failed to acquire SNTP, code %d\n", rc);
}
return rc;
}
static int resolve_broker_addr(struct sockaddr_in *broker)
{
int ret;
struct zsock_addrinfo *ai = NULL;
const struct zsock_addrinfo hints = {
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM,
.ai_protocol = 0,
};
ret = zsock_getaddrinfo(CONFIG_AWS_ENDPOINT, AWS_BROKER_PORT, &hints, &ai);
if (ret == 0) {
char addr_str[INET_ADDRSTRLEN];
memcpy(broker, ai->ai_addr, MIN(ai->ai_addrlen, sizeof(struct sockaddr_storage)));
zsock_inet_ntop(AF_INET, &broker->sin_addr, addr_str, sizeof(addr_str));
LOG_INF("Resolved: %s:%u", addr_str, htons(broker->sin_port));
} else {
LOG_ERR("failed to resolve hostname err = %d (errno = %d)", ret, errno);
}
zsock_freeaddrinfo(ai);
return ret;
}
int main(void)
{
#if defined(CONFIG_NET_DHCPV4)
app_dhcpv4_startup();
#endif
sntp_sync_time();
setup_credentials();
for (;;) {
resolve_broker_addr(&aws_broker);
aws_client_loop();
#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
size_t cur_used, cur_blocks, max_used, max_blocks;
mbedtls_memory_buffer_alloc_cur_get(&cur_used, &cur_blocks);
mbedtls_memory_buffer_alloc_max_get(&max_used, &max_blocks);
LOG_INF("mbedTLS heap usage: MAX %u/%u (%u) CUR %u (%u)", max_used,
CONFIG_MBEDTLS_HEAP_SIZE, max_blocks, cur_used, cur_blocks);
#endif
k_sleep(K_SECONDS(1));
}
return 0;
}