net: zperf: Convert UDP receiver to use socket services

Use socket services API for UDP receiver.

Signed-off-by: Jukka Rissanen <jukka.rissanen@nordicsemi.no>
This commit is contained in:
Jukka Rissanen 2024-01-17 14:57:43 +02:00 committed by Alberto Escolar
parent 07823f7710
commit 1e8cbd5d43
3 changed files with 86 additions and 93 deletions

View file

@ -222,7 +222,6 @@ static int zperf_init(void)
zperf_udp_uploader_init();
zperf_tcp_uploader_init();
zperf_udp_receiver_init();
zperf_session_init();

View file

@ -103,7 +103,6 @@ uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps);
void zperf_async_work_submit(struct k_work *work);
void zperf_udp_uploader_init(void);
void zperf_tcp_uploader_init(void);
void zperf_udp_receiver_init(void);
void zperf_shell_init(void);

View file

@ -13,6 +13,7 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/socket_service.h>
#include <zephyr/net/zperf.h>
#include "zperf_internal.h"
@ -25,14 +26,6 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
static struct sockaddr_in6 *in6_addr_my;
static struct sockaddr_in *in4_addr_my;
#if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
#else
#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif
#define UDP_RECEIVER_STACK_SIZE 2048
#define SOCK_ID_IPV4 0
#define SOCK_ID_IPV6 1
#define SOCK_ID_MAX 2
@ -40,9 +33,6 @@ static struct sockaddr_in *in4_addr_my;
#define UDP_RECEIVER_BUF_SIZE 1500
#define POLL_TIMEOUT_MS 100
static K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE);
static struct k_thread udp_receiver_thread_data;
static zperf_callback udp_session_cb;
static void *udp_user_data;
static bool udp_server_running;
@ -51,6 +41,13 @@ static uint16_t udp_server_port;
static struct sockaddr udp_server_addr;
static K_SEM_DEFINE(udp_server_run, 0, 1);
struct zsock_pollfd fds[SOCK_ID_MAX] = { 0 };
static void udp_svc_handler(struct k_work *work);
NET_SOCKET_SERVICE_SYNC_DEFINE_STATIC(svc_udp, NULL, udp_svc_handler,
SOCK_ID_MAX);
static inline void build_reply(struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat,
uint8_t *buf)
@ -230,10 +227,79 @@ static void udp_received(int sock, const struct sockaddr *addr, uint8_t *data,
}
}
static void udp_server_session(void)
static int udp_recv_data(struct net_socket_service_event *pev)
{
static uint8_t buf[UDP_RECEIVER_BUF_SIZE];
struct zsock_pollfd fds[SOCK_ID_MAX] = { 0 };
int i, ret = 0;
int family;
struct sockaddr addr;
socklen_t optlen = sizeof(int);
socklen_t addrlen = sizeof(addr);
if (udp_server_stop) {
ret = -ENOENT;
goto cleanup;
}
if ((pev->event.revents & ZSOCK_POLLERR) ||
(pev->event.revents & ZSOCK_POLLNVAL)) {
(void)zsock_getsockopt(pev->event.fd, SOL_SOCKET,
SO_DOMAIN, &family, &optlen);
NET_ERR("UDP receiver IPv%d socket error",
family == AF_INET ? 4 : 6);
goto error;
}
if (!(pev->event.revents & ZSOCK_POLLIN)) {
return 0;
}
ret = zsock_recvfrom(pev->event.fd, buf, sizeof(buf), 0,
&addr, &addrlen);
if (ret < 0) {
(void)zsock_getsockopt(pev->event.fd, SOL_SOCKET,
SO_DOMAIN, &family, &optlen);
NET_ERR("recv failed on IPv%d socket (%d)",
family == AF_INET ? 4 : 6, errno);
goto error;
}
udp_received(pev->event.fd, &addr, buf, ret);
return ret;
error:
if (udp_session_cb != NULL) {
udp_session_cb(ZPERF_SESSION_ERROR, NULL, udp_user_data);
}
cleanup:
for (i = 0; i < ARRAY_SIZE(fds); i++) {
if (fds[i].fd >= 0) {
zsock_close(fds[i].fd);
fds[i].fd = -1;
}
}
(void)net_socket_service_unregister(&svc_udp);
return ret;
}
static void udp_svc_handler(struct k_work *work)
{
struct net_socket_service_event *pev =
CONTAINER_OF(work, struct net_socket_service_event, work);
int ret;
ret = udp_recv_data(pev);
if (ret < 0) {
(void)net_socket_service_unregister(&svc_udp);
}
}
static void zperf_udp_receiver_init(void)
{
int ret;
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
@ -341,87 +407,14 @@ use_any_ipv6:
NET_INFO("Listening on port %d", udp_server_port);
while (true) {
ret = zsock_poll(fds, ARRAY_SIZE(fds), POLL_TIMEOUT_MS);
if (ret < 0) {
NET_ERR("UDP receiver poll error (%d)", errno);
goto error;
}
if (udp_server_stop) {
goto cleanup;
}
if (ret == 0) {
continue;
}
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
struct sockaddr addr;
socklen_t addrlen = sizeof(addr);
if ((fds[i].revents & ZSOCK_POLLERR) ||
(fds[i].revents & ZSOCK_POLLNVAL)) {
NET_ERR("UDP receiver IPv%d socket error",
(i == SOCK_ID_IPV4) ? 4 : 6);
goto error;
}
if (!(fds[i].revents & ZSOCK_POLLIN)) {
continue;
}
ret = zsock_recvfrom(fds[i].fd, buf, sizeof(buf), 0,
&addr, &addrlen);
if (ret < 0) {
NET_ERR("recv failed on IPv%d socket (%d)",
(i == SOCK_ID_IPV4) ? 4 : 6, errno);
goto error;
}
udp_received(fds[i].fd, &addr, buf, ret);
}
ret = net_socket_service_register(&svc_udp, fds,
ARRAY_SIZE(fds), NULL);
if (ret < 0) {
LOG_ERR("Cannot register socket service handler (%d)", ret);
}
error:
if (udp_session_cb != NULL) {
udp_session_cb(ZPERF_SESSION_ERROR, NULL, udp_user_data);
}
cleanup:
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
if (fds[i].fd >= 0) {
zsock_close(fds[i].fd);
}
}
}
static void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
ARG_UNUSED(ptr1);
ARG_UNUSED(ptr2);
ARG_UNUSED(ptr3);
while (true) {
k_sem_take(&udp_server_run, K_FOREVER);
udp_server_session();
udp_server_running = false;
}
}
void zperf_udp_receiver_init(void)
{
k_thread_create(&udp_receiver_thread_data,
udp_receiver_stack_area,
K_THREAD_STACK_SIZEOF(udp_receiver_stack_area),
udp_receiver_thread,
NULL, NULL, NULL,
UDP_RECEIVER_THREAD_PRIORITY,
IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
K_INHERIT_PERMS : 0,
K_NO_WAIT);
return;
}
int zperf_udp_download(const struct zperf_download_params *param,
@ -442,6 +435,8 @@ int zperf_udp_download(const struct zperf_download_params *param,
udp_server_stop = false;
memcpy(&udp_server_addr, &param->addr, sizeof(struct sockaddr));
zperf_udp_receiver_init();
k_sem_give(&udp_server_run);
return 0;