Skip to content

Commit

Permalink
provide setting of socket recv buffer size on raw ethernet sockets
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Jordense <[email protected]>
  • Loading branch information
MarcelJordense committed Feb 2, 2024
1 parent 1c9bc19 commit 65d8b6f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 45 deletions.
14 changes: 11 additions & 3 deletions src/core/ddsi/src/ddsi__tran.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ typedef int (*ddsi_is_loopbackaddr_fn_t) (const struct ddsi_tran_factory *tran,
typedef int (*ddsi_is_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc);
typedef int (*ddsi_is_ssm_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc);
typedef int (*ddsi_is_valid_port_fn_t) (const struct ddsi_tran_factory *tran, uint32_t port);
typedef uint32_t (*ddsi_receive_buffer_size_fn_t) (const struct ddsi_tran_factory *fact);
typedef uint32_t (*m_get_locator_port_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc);
typedef void (*m_set_locator_port_fn_t) (const struct ddsi_tran_factory *factory, ddsi_locator_t *loc, uint32_t port);
typedef uint32_t (*m_get_locator_aux_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc);
Expand Down Expand Up @@ -225,7 +224,6 @@ struct ddsi_tran_factory
ddsi_locator_to_string_fn_t m_locator_to_string_fn;
ddsi_enumerate_interfaces_fn_t m_enumerate_interfaces_fn;
ddsi_is_valid_port_fn_t m_is_valid_port_fn;
ddsi_receive_buffer_size_fn_t m_receive_buffer_size_fn;
ddsi_locator_from_sockaddr_fn_t m_locator_from_sockaddr_fn;
m_get_locator_port_fn_t m_get_locator_port_fn;
m_set_locator_port_fn_t m_set_locator_port_fn;
Expand Down Expand Up @@ -262,6 +260,10 @@ struct ddsi_tran_factory
/// no default address exists.
const char *m_default_spdp_address;

/// Actual minimum receive buffer size in use
/// Atomically loaded/stored so we don't have to lie about constness
ddsrt_atomic_uint32_t m_receive_buf_size;

struct ddsi_domaingv *gv;

/* Relationships */
Expand Down Expand Up @@ -330,7 +332,7 @@ inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t

/** @component transport */
inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory) {
return factory->m_receive_buffer_size_fn (factory);
return ddsrt_atomic_ld32 (&factory->m_receive_buf_size);
}

/** @component transport */
Expand Down Expand Up @@ -452,6 +454,12 @@ void ddsi_listener_unblock (struct ddsi_tran_listener * listener);
/** @component transport */
void ddsi_listener_free (struct ddsi_tran_listener * listener);

/** @component transport */
dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size);

/** @component transport */
dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size);

#if defined (__cplusplus)
}
#endif
Expand Down
23 changes: 15 additions & 8 deletions src/core/ddsi/src/ddsi_raweth.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ typedef struct ddsi_raweth_conn {
int m_ifindex;
} *ddsi_raweth_conn_t;


struct ddsi_ethernet_header {
unsigned char dmac[ETH_ALEN];
unsigned char smac[ETH_ALEN];
Expand Down Expand Up @@ -304,6 +303,18 @@ static dds_return_t ddsi_raweth_create_conn (struct ddsi_tran_conn **conn_out, s
return DDS_RETCODE_ERROR;
}

if ((rc = ddsi_tran_set_rcvbuf(fact, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0)
{
ddsrt_close(sock);
return DDS_RETCODE_ERROR;
}

if ((rc = ddsi_tran_set_sndbuf(fact, sock, &gv->config.socket_sndbuf_size, 65536)) < 0)
{
ddsrt_close(sock);
return DDS_RETCODE_ERROR;
}

memset(&addr, 0, sizeof(addr));
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);
Expand Down Expand Up @@ -529,12 +540,6 @@ static int ddsi_raweth_is_valid_port (const struct ddsi_tran_factory *fact, uint
return (eport >= 1 && eport <= 65535) && (vlanid < 4095) && vlancfi == 0;
}

static uint32_t ddsi_raweth_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static int ddsi_raweth_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
(void) tran;
Expand Down Expand Up @@ -598,17 +603,19 @@ int ddsi_raweth_init (struct ddsi_domaingv *gv)
fact->m_locator_to_string_fn = ddsi_raweth_to_string;
fact->m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces;
fact->m_is_valid_port_fn = ddsi_raweth_is_valid_port;
fact->m_receive_buffer_size_fn = ddsi_raweth_receive_buffer_size;
fact->m_locator_from_sockaddr_fn = ddsi_raweth_locator_from_sockaddr;
fact->m_get_locator_port_fn = ddsi_raweth_get_locator_port;
fact->m_set_locator_port_fn = ddsi_raweth_set_locator_port;
fact->m_get_locator_aux_fn = ddsi_raweth_get_locator_aux;
fact->m_set_locator_aux_fn = ddsi_raweth_set_locator_aux;
ddsrt_atomic_st32 (&fact->m_receive_buf_size, UINT32_MAX);

ddsi_factory_add (gv, fact);
GVLOG (DDS_LC_CONFIG, "raweth initialized\n");
return 0;
}


#else

int ddsi_raweth_init (struct ddsi_domaingv *gv) { (void) gv; return 0; }
Expand Down
9 changes: 2 additions & 7 deletions src/core/ddsi/src/ddsi_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,12 +1182,6 @@ static int ddsi_tcp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (0 < port && port <= 65535);
}

static uint32_t ddsi_tcp_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static char *ddsi_tcp_locator_to_string (char *dst, size_t sizeof_dst, const ddsi_locator_t *loc, struct ddsi_tran_conn * conn, int with_port)
{
(void) conn;
Expand Down Expand Up @@ -1240,9 +1234,10 @@ int ddsi_tcp_init (struct ddsi_domaingv *gv)
fact->fact.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr;
fact->fact.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address;
fact->fact.m_is_valid_port_fn = ddsi_tcp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_tcp_receive_buffer_size;
fact->fact.m_locator_from_sockaddr_fn = ddsi_tcp_locator_from_sockaddr;

ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, 0);

#if DDSRT_HAVE_IPV6
if (gv->config.transport_selector == DDSI_TRANS_TCP6)
{
Expand Down
119 changes: 119 additions & 0 deletions src/core/ddsi/src/ddsi_tran.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/string.h"
#include "dds/ddsrt/ifaddrs.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsi/ddsi_log.h"
#include "dds/ddsi/ddsi_domaingv.h"
#include "ddsi__tran.h"
Expand Down Expand Up @@ -407,3 +408,121 @@ int ddsi_enumerate_interfaces (struct ddsi_tran_factory * factory, enum ddsi_tra
{
return factory->m_enumerate_interfaces_fn (factory, transport_selector, interfs);
}


















static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
// if (min, max)= and initbuf= then request= and result=
// (def, def) < defmin defmin whatever it is
// (def, N) anything N whatever it is
// (M, def) < M M error if < M
// (M, N<M) < M M error if < M
// (M, N>=M) anything N error if < M
// defmin = 1MB for receive buffer, 0B for send buffer
const bool always_set_size = // whether to call setsockopt unconditionally
((config->min.isdefault && !config->max.isdefault) ||
(!config->min.isdefault && !config->max.isdefault && config->max.value >= config->min.value));
const uint32_t socket_min_buf_size = // error if it ends up below this
!config->min.isdefault ? config->min.value : 0;
const uint32_t socket_req_buf_size = // size to request
(!config->max.isdefault && config->max.value > socket_min_buf_size) ? config->max.value
: !config->min.isdefault ? config->min.value
: default_min_size;

uint32_t actsize;
socklen_t optlen = (socklen_t) sizeof (actsize);
dds_return_t rc;

rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen);
if (rc == DDS_RETCODE_BAD_PARAMETER || rc == DDS_RETCODE_UNSUPPORTED)
{
/* not all stacks support getting/setting RCVBUF */
GVLOG (DDS_LC_CONFIG, "cannot retrieve socket %s buffer size\n", name);
return DDS_RETCODE_OK;
}
else if (rc != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc));
return rc;
}

if (always_set_size || actsize < socket_req_buf_size)
{
(void) ddsrt_setsockopt (sock, SOL_SOCKET, socket_option, &socket_req_buf_size, sizeof (actsize));

/* We don't check the return code from setsockopt, because some O/Ss tend
to silently cap the buffer size. The only way to make sure is to read
the option value back and check it is now set correctly. */
if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc));
return rc;
}

if (actsize >= socket_req_buf_size)
GVLOG (DDS_LC_CONFIG, "socket %s buffer size set to %"PRIu32" bytes\n", name, actsize);
else if (actsize >= socket_min_buf_size)
GVLOG (DDS_LC_CONFIG,
"failed to increase socket %s buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n",
name, socket_req_buf_size, actsize);
else
{
/* If the configuration states it must be >= X, then error out if the
kernel doesn't give us at least X */
GVLOG (DDS_LC_CONFIG | DDS_LC_ERROR,
"failed to increase socket %s buffer size to at least %"PRIu32" bytes, current is %"PRIu32" bytes\n",
name, socket_min_buf_size, actsize);
rc = DDS_RETCODE_NOT_ENOUGH_SPACE;
}
}

return (rc < 0) ? rc : (actsize > (uint32_t) INT32_MAX) ? INT32_MAX : (int32_t) actsize;
}

dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
dds_return_t rc;

if ((rc = set_socket_buffer (fact->gv, sock, SO_RCVBUF, "SO_RCVBUF", "receive", config, default_min_size)) < 0)
return rc;

if (rc > 0) {
// set fact->receive_buf_size to the smallest observed value
uint32_t old;
do {
old = ddsrt_atomic_ld32 (&fact->m_receive_buf_size);
if ((uint32_t) rc >= old)
break;
} while (!ddsrt_atomic_cas32 (&fact->m_receive_buf_size, old, (uint32_t) rc));
}
return rc;
}

dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
return set_socket_buffer (fact->gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, default_min_size);
}








27 changes: 7 additions & 20 deletions src/core/ddsi/src/ddsi_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ static dds_return_t set_dont_route (struct ddsi_domaingv const * const gv, ddsrt
return rc;
}

#if 0
static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
// if (min, max)= and initbuf= then request= and result=
Expand Down Expand Up @@ -359,6 +360,7 @@ static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_soc
{
return set_socket_buffer (gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, 65536);
}
#endif

static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv const * const gv, struct ddsi_network_interface const * const intf, ddsrt_socket_t sock)
{
Expand Down Expand Up @@ -513,20 +515,12 @@ static dds_return_t ddsi_udp_create_conn (struct ddsi_tran_conn **conn_out, stru
}
}

if ((rc = set_rcvbuf (gv, sock, &gv->config.socket_rcvbuf_size)) < 0)
if ((rc = ddsi_tran_set_rcvbuf(fact_cmn, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0)
goto fail_w_socket;
if (rc > 0) {
// set fact->receive_buf_size to the smallest observed value
uint32_t old;
do {
old = ddsrt_atomic_ld32 (&fact->receive_buf_size);
if ((uint32_t) rc >= old)
break;
} while (!ddsrt_atomic_cas32 (&fact->receive_buf_size, old, (uint32_t) rc));
}

if (set_sndbuf (gv, sock, &gv->config.socket_sndbuf_size) < 0)
goto fail_w_socket;
if ((rc = ddsi_tran_set_sndbuf(fact_cmn, sock, &gv->config.socket_sndbuf_size, 65536)) < 0)
goto fail_w_socket;

if (gv->config.dontRoute && set_dont_route (gv, sock, ipv6) != DDS_RETCODE_OK)
goto fail_w_socket;

Expand Down Expand Up @@ -865,12 +859,6 @@ static int ddsi_udp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (0 < port && port <= 65535);
}

static uint32_t ddsi_udp_receive_buffer_size (const struct ddsi_tran_factory *fact_cmn)
{
const struct ddsi_udp_tran_factory *fact = (const struct ddsi_udp_tran_factory *) fact_cmn;
return ddsrt_atomic_ld32 (&fact->receive_buf_size);
}

static int ddsi_udp_locator_from_sockaddr (const struct ddsi_tran_factory *tran_cmn, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
struct ddsi_udp_tran_factory const * const tran = (const struct ddsi_udp_tran_factory *) tran_cmn;
Expand Down Expand Up @@ -915,7 +903,6 @@ int ddsi_udp_init (struct ddsi_domaingv*gv)
fact->fact.m_locator_to_string_fn = ddsi_udp_locator_to_string;
fact->fact.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces;
fact->fact.m_is_valid_port_fn = ddsi_udp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_udp_receive_buffer_size;
fact->fact.m_locator_from_sockaddr_fn = ddsi_udp_locator_from_sockaddr;
#if DDSRT_HAVE_IPV6
if (gv->config.transport_selector == DDSI_TRANS_UDP6)
Expand All @@ -925,7 +912,7 @@ int ddsi_udp_init (struct ddsi_domaingv*gv)
fact->fact.m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1";
}
#endif
ddsrt_atomic_st32 (&fact->receive_buf_size, UINT32_MAX);
ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, UINT32_MAX);

ddsi_factory_add (gv, &fact->fact);
GVLOG (DDS_LC_CONFIG, "udp initialized\n");
Expand Down
9 changes: 2 additions & 7 deletions src/core/ddsi/src/ddsi_vnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@ static int ddsi_vnet_is_valid_port (const struct ddsi_tran_factory *fact, uint32
return true;
}

static uint32_t ddsi_vnet_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static int ddsi_vnet_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
(void) sockaddr;
Expand Down Expand Up @@ -224,8 +218,9 @@ int ddsi_vnet_init (struct ddsi_domaingv *gv, const char *name, int32_t locator_
fact->m_base.m_locator_to_string_fn = ddsi_vnet_to_string;
fact->m_base.m_enumerate_interfaces_fn = ddsi_vnet_enumerate_interfaces;
fact->m_base.m_is_valid_port_fn = ddsi_vnet_is_valid_port;
fact->m_base.m_receive_buffer_size_fn = ddsi_vnet_receive_buffer_size;
fact->m_base.m_locator_from_sockaddr_fn = ddsi_vnet_locator_from_sockaddr;
ddsrt_atomic_st32 (&fact->m_base.m_receive_buf_size, 0);

ddsi_factory_add (gv, &fact->m_base);
GVLOG (DDS_LC_CONFIG, "vnet %s initialized\n", name);
return 0;
Expand Down

0 comments on commit 65d8b6f

Please sign in to comment.