net: tcp: Rework data queueing API

Rework how data is queued for TCP connections:
  * net_context no longer allocates a net_pkt for TCP connections. This
    was not only inefficient (net_context has no knowledge of the TX
    window size), but also error-prone in certain configurations (for
    example, with IP fragmentation enabled, net_context could attempt
    to allocate an enormous packet instead of letting the data be
    fragmented for the TCP stream).
  * Instead, implement the already defined `net_tcp_queue()` API, which
    takes a raw buffer and length. This makes it possible to take the
    TX window into account and to better manage the allocated net_buf's
    (for example, avoiding a new allocation if there is still room in
    the buffer). As a result, the TCP stack no longer exceeds the TX
    window and avoids empty gaps in the allocated net_buf's, which
    should lead to fewer out-of-memory issues in the stack. A usage
    sketch follows below.
  * As the net_pkt-based `net_tcp_queue_data()` is no longer in use, it
    was removed.
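
For illustration only (not part of the diff): a minimal sketch of the
new call flow. The helper name and buffers are hypothetical; the real
caller is context_sendto() below. net_tcp_queue() returns the number of
bytes actually queued, which may be less than requested when the TX
window is nearly full.

	static int send_on_tcp_context(struct net_context *context,
				       const uint8_t *buf, size_t len)
	{
		/* NULL msghdr selects the flat-buffer path */
		int queued = net_tcp_queue(context, buf, len, NULL);

		if (queued < 0) {
			/* e.g. -ENOTCONN, or -EAGAIN if the window is full */
			return queued;
		}

		/* Transmit what was queued; the remainder can be retried
		 * once the peer ACKs data and the window opens again.
		 */
		return net_tcp_send_data(context, NULL, NULL);
	}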

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
Robert Lubos 2023-11-14 12:40:49 +01:00 committed by Fabio Baltieri
parent 16fd744c13
commit 9976ebb24b
4 changed files with 132 additions and 96 deletions

@@ -1726,7 +1726,7 @@ static int context_sendto(struct net_context *context,
 {
 	const struct msghdr *msghdr = NULL;
 	struct net_if *iface;
-	struct net_pkt *pkt;
+	struct net_pkt *pkt = NULL;
 	size_t tmp_len;
 	int ret;
@@ -1948,6 +1948,15 @@ static int context_sendto(struct net_context *context,
 		return -ENETDOWN;
 	}
 
+	context->send_cb = cb;
+	context->user_data = user_data;
+
+	if (IS_ENABLED(CONFIG_NET_TCP) &&
+	    net_context_get_proto(context) == IPPROTO_TCP &&
+	    !net_if_is_ip_offloaded(net_context_get_iface(context))) {
+		goto skip_alloc;
+	}
+
 	pkt = context_alloc_pkt(context, len, PKT_WAIT_TIME);
 	if (!pkt) {
 		NET_ERR("Failed to allocate net_pkt");
@@ -1966,9 +1975,6 @@ static int context_sendto(struct net_context *context,
 		len = tmp_len;
 	}
 
-	context->send_cb = cb;
-	context->user_data = user_data;
-
 	if (IS_ENABLED(CONFIG_NET_CONTEXT_PRIORITY)) {
 		uint8_t priority;
@@ -1990,6 +1996,7 @@ static int context_sendto(struct net_context *context,
 		}
 	}
 
+skip_alloc:
 	if (IS_ENABLED(CONFIG_NET_OFFLOAD) &&
 	    net_if_is_ip_offloaded(net_context_get_iface(context))) {
 		ret = context_write_data(pkt, buf, len, msghdr);
@@ -2021,16 +2028,12 @@ static int context_sendto(struct net_context *context,
 	} else if (IS_ENABLED(CONFIG_NET_TCP) &&
 		   net_context_get_proto(context) == IPPROTO_TCP) {
-		ret = context_write_data(pkt, buf, len, msghdr);
+		ret = net_tcp_queue(context, buf, len, msghdr);
 		if (ret < 0) {
 			goto fail;
 		}
 
-		net_pkt_cursor_init(pkt);
-
-		ret = net_tcp_queue_data(context, pkt);
-		if (ret < 0) {
-			goto fail;
-		}
-
 		len = ret;
 
 		ret = net_tcp_send_data(context, cb, user_data);
 	} else if (IS_ENABLED(CONFIG_NET_SOCKETS_PACKET) &&
@@ -2086,7 +2089,9 @@ static int context_sendto(struct net_context *context,
 	return len;
 
 fail:
-	net_pkt_unref(pkt);
+	if (pkt != NULL) {
+		net_pkt_unref(pkt);
+	}
 
 	return ret;
 }

@@ -1347,6 +1347,49 @@ static int tcp_pkt_peek(struct net_pkt *to, struct net_pkt *from, size_t pos,
 	return net_pkt_copy(to, from, len);
 }
 
+static int tcp_pkt_append(struct net_pkt *pkt, const uint8_t *data, size_t len)
+{
+	size_t alloc_len = len;
+	struct net_buf *buf = NULL;
+	int ret = 0;
+
+	if (pkt->buffer) {
+		buf = net_buf_frag_last(pkt->buffer);
+
+		if (len > net_buf_tailroom(buf)) {
+			alloc_len -= net_buf_tailroom(buf);
+		} else {
+			alloc_len = 0;
+		}
+	}
+
+	if (alloc_len > 0) {
+		ret = net_pkt_alloc_buffer_raw(pkt, alloc_len,
+					       TCP_PKT_ALLOC_TIMEOUT);
+		if (ret < 0) {
+			return -ENOBUFS;
+		}
+	}
+
+	if (buf == NULL) {
+		buf = pkt->buffer;
+	}
+
+	while (buf != NULL && len > 0) {
+		size_t write_len = MIN(len, net_buf_tailroom(buf));
+
+		net_buf_add_mem(buf, data, write_len);
+
+		data += write_len;
+		len -= write_len;
+		buf = buf->frags;
+	}
+
+	NET_ASSERT(len == 0, "Not all bytes written");
+
+	return ret;
+}
+
 static bool tcp_window_full(struct tcp *conn)
 {
 	bool window_full = (conn->send_data_total >= conn->send_win);
@@ -3229,13 +3272,12 @@ int net_tcp_update_recv_wnd(struct net_context *context, int32_t delta)
 	return ret;
 }
 
-/* net_context queues the outgoing data for the TCP connection */
-int net_tcp_queue_data(struct net_context *context, struct net_pkt *pkt)
+int net_tcp_queue(struct net_context *context, const void *data, size_t len,
+		  const struct msghdr *msg)
 {
 	struct tcp *conn = context->tcp;
-	struct net_buf *orig_buf = NULL;
+	size_t queued_len = 0;
 	int ret = 0;
-	size_t len;
 
 	if (!conn || conn->state != TCP_ESTABLISHED) {
 		return -ENOTCONN;
@@ -3252,72 +3294,69 @@ int net_tcp_queue_data(struct net_context *context, struct net_pkt *pkt)
 		goto out;
 	}
 
-	len = net_pkt_get_len(pkt);
+	if (msg) {
+		len = 0;
 
-	if (conn->send_data->buffer) {
-		orig_buf = net_buf_frag_last(conn->send_data->buffer);
+		for (int i = 0; i < msg->msg_iovlen; i++) {
+			len += msg->msg_iov[i].iov_len;
+		}
 	}
 
-	net_pkt_append_buffer(conn->send_data, pkt->buffer);
-	conn->send_data_total += len;
-	NET_DBG("conn: %p Queued %zu bytes (total %zu)", conn, len,
-		conn->send_data_total);
-	pkt->buffer = NULL;
+	/* Queue no more than TX window permits. It's guaranteed at this point
+	 * that conn->send_data_total is less than conn->send_win, as it was
+	 * verified in tcp_window_full() check above. As the connection mutex
+	 * is held, their values shall not change since.
+	 */
+	len = MIN(conn->send_win - conn->send_data_total, len);
+
+	if (msg) {
+		for (int i = 0; i < msg->msg_iovlen; i++) {
+			int iovlen = MIN(msg->msg_iov[i].iov_len, len);
+
+			ret = tcp_pkt_append(conn->send_data,
+					     msg->msg_iov[i].iov_base,
+					     iovlen);
+			if (ret < 0) {
+				if (queued_len == 0) {
+					goto out;
+				} else {
+					break;
+				}
+			}
+
+			queued_len += iovlen;
+			len -= iovlen;
+
+			if (len == 0) {
+				break;
+			}
+		}
+	} else {
+		ret = tcp_pkt_append(conn->send_data, data, len);
+		if (ret < 0) {
+			goto out;
+		}
+
+		queued_len = len;
+	}
+
+	conn->send_data_total += queued_len;
 
+	/* Successfully queued data for transmission. Even if there's a transmit
+	 * failure now (out-of-buf case), it can be ignored for now, retransmit
+	 * timer will take care of queued data retransmission.
+	 */
 	ret = tcp_send_queued_data(conn);
 	if (ret < 0 && ret != -ENOBUFS) {
 		tcp_conn_close(conn, ret);
 		goto out;
 	}
 
-	if ((ret == -ENOBUFS) &&
-	    (conn->send_data_total < (conn->unacked_len + len))) {
-		/* Some of the data has been sent, we cannot remove the
-		 * whole chunk, the remainder portion is already
-		 * in the send_data and will be transmitted upon a
-		 * received ack or the next send call
-		 *
-		 * Set the return code back to 0 to pretend we just
-		 * transmitted the chunk
-		 */
-		ret = 0;
+	if (tcp_window_full(conn)) {
+		(void)k_sem_take(&conn->tx_sem, K_NO_WAIT);
 	}
 
-	if (ret == -ENOBUFS) {
-		/* Restore the original data so that we do not resend the pkt
-		 * data multiple times.
-		 */
-		conn->send_data_total -= len;
-
-		if (orig_buf) {
-			pkt->buffer = orig_buf->frags;
-			orig_buf->frags = NULL;
-		} else {
-			pkt->buffer = conn->send_data->buffer;
-			conn->send_data->buffer = NULL;
-		}
-
-		/* If we have out-of-bufs case, and the send_data buffer has
-		 * become empty, till the retransmit timer, as there is no
-		 * data to retransmit.
-		 * The socket layer will catch this and resend data if needed.
-		 * Only perform this when it is just the newly added packet,
-		 * otherwise it can disrupt any pending transmission
-		 */
-		if (conn->send_data_total == 0) {
-			NET_DBG("No bufs, cancelling retransmit timer");
-			k_work_cancel_delayable(&conn->send_data_timer);
-		}
-	} else {
-		if (tcp_window_full(conn)) {
-			(void)k_sem_take(&conn->tx_sem, K_NO_WAIT);
-		}
-
-		/* We should not free the pkt if there was an error. It will be
-		 * freed in net_context.c:context_sendto()
-		 */
-		tcp_pkt_unref(pkt);
-	}
+	ret = queued_len;
 out:
 	k_mutex_unlock(&conn->lock);
@@ -3674,7 +3713,9 @@ static size_t tp_tcp_recv_cb(struct tcp *conn, struct net_pkt *pkt)
 	net_pkt_pull(up, net_pkt_get_len(up) - len);
 
-	net_tcp_queue_data(conn->context, up);
+	for (struct net_buf *buf = pkt->buffer; buf != NULL; buf = buf->frags) {
+		net_tcp_queue(conn->context, buf->data, buf->len, NULL);
+	}
 
 	return len;
 }
@@ -3817,12 +3858,7 @@ enum net_verdict tp_input(struct net_conn *net_conn,
 			responded = true;
 			NET_DBG("tcp_send(\"%s\")", tp->data);
 			{
-				struct net_pkt *data_pkt;
-
-				data_pkt = tcp_pkt_alloc(conn, len);
-				net_pkt_write(data_pkt, buf, len);
-				net_pkt_cursor_init(data_pkt);
-				net_tcp_queue_data(conn->context, data_pkt);
+				net_tcp_queue(conn->context, buf, len, NULL);
 			}
 		}
 		break;

@@ -31,6 +31,7 @@
 extern "C" {
 #endif
 
 #include <errno.h>
+#include <sys/types.h>
 
 /**
@@ -72,18 +73,7 @@ int net_tcp_listen(struct net_context *context);
  */
 int net_tcp_accept(struct net_context *context, net_tcp_accept_cb_t cb,
 		   void *user_data);
 
-/**
- * @brief Enqueue data for transmission
- *
- * @param context Network context
- * @param buf Pointer to the data
- * @param len Number of bytes
- * @param msghdr Data for a vector array operation
- *
- * @return 0 if ok, < 0 if error
- */
-int net_tcp_queue(struct net_context *context, const void *buf, size_t len,
-		  const struct msghdr *msghdr);
-
 /* TODO: split into 2 functions, conn -> context, queue -> send? */
 
 /* The following functions are provided solely for the compatibility
@@ -112,7 +102,6 @@ void net_tcp_init(void);
 #define net_tcp_init(...)
 #endif
 
 int net_tcp_update_recv_wnd(struct net_context *context, int32_t delta);
-int net_tcp_queue_data(struct net_context *context, struct net_pkt *pkt);
 int net_tcp_finalize(struct net_pkt *pkt, bool force_chksum);
 
 #if defined(CONFIG_NET_TEST_PROTOCOL)

@@ -283,21 +283,27 @@ struct net_tcp_hdr *net_tcp_input(struct net_pkt *pkt,
 #endif
 
 /**
- * @brief Enqueue a single packet for transmission
+ * @brief Enqueue data for transmission
  *
- * @param context TCP context
- * @param pkt Packet
+ * @param context Network context
+ * @param data Pointer to the data
+ * @param len Number of bytes
+ * @param msg Data for a vector array operation
  *
  * @return 0 if ok, < 0 if error
  */
 #if defined(CONFIG_NET_NATIVE_TCP)
-int net_tcp_queue_data(struct net_context *context, struct net_pkt *pkt);
+int net_tcp_queue(struct net_context *context, const void *data, size_t len,
+		  const struct msghdr *msg);
 #else
-static inline int net_tcp_queue_data(struct net_context *context,
-				     struct net_pkt *pkt)
+static inline int net_tcp_queue(struct net_context *context, const void *data,
+				size_t len, const struct msghdr *msg)
 {
 	ARG_UNUSED(context);
-	ARG_UNUSED(pkt);
+	ARG_UNUSED(data);
+	ARG_UNUSED(len);
+	ARG_UNUSED(msg);
 
 	return -EPROTONOSUPPORT;
 }
 #endif
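
Usage note (illustrative, not part of the diff): with a non-NULL msghdr
the data/len arguments are unused; the iovec lengths are summed and then
clamped to the free TX window. A hypothetical scatter-gather call,
assuming hdr/body buffers and a connected context:

	struct iovec iov[2] = {
		{ .iov_base = hdr,  .iov_len = hdr_len  },
		{ .iov_base = body, .iov_len = body_len },
	};
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = 2,
	};
	int queued = net_tcp_queue(context, NULL, 0, &msg);

	/* queued == number of bytes accepted across both fragments,
	 * or a negative errno on failure.
	 */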