From d3a6d5ef28853e7379209d8c5303a4171067e9ac Mon Sep 17 00:00:00 2001 From: oko256 <139907514+oko256@users.noreply.github.com> Date: Thu, 3 Oct 2024 02:26:00 +0300 Subject: [PATCH] Adjust reap interval based on timeouts Hardcoded reap interval of 1 second causes evasive/silent/expired timeouts of less than 1 second to be completely ineffective. Choose the smallest timeout for reap interval instead and only use the hardcoded value as the maximum reap interval when big timeouts are being used. --- src/zyre.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ src/zyre_node.c | 27 +++++++++++++++-------- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/src/zyre.c b/src/zyre.c index 62001f2d..e11c69b9 100644 --- a/src/zyre.c +++ b/src/zyre.c @@ -890,6 +890,64 @@ zyre_test (bool verbose) zstr_free (&command); zmsg_destroy (&msg); + // First node should also receive ENTER and JOINs from second node + msg = zyre_recv (node1); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "ENTER")); + zstr_free (&command); + assert (zmsg_size (msg) == 4); + peerid = zmsg_popstr (msg); + name = zmsg_popstr (msg); + assert (streq (name, "node2")); + zstr_free (&peerid); + zstr_free (&name); + zmsg_destroy (&msg); + + msg = zyre_recv (node1); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "JOIN")); + zstr_free (&command); + assert (zmsg_size (msg) == 3); + zmsg_destroy (&msg); + + msg = zyre_recv (node1); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "JOIN")); + zstr_free (&command); + assert (zmsg_size (msg) == 3); + zmsg_destroy (&msg); + + // Test evasive timeout + const int evasive_test_interval = 100; + zyre_set_evasive_timeout (node1, evasive_test_interval); + // Refresh peers to apply new timeouts immediately + zyre_shouts (node1, "GLOBAL", "Hello again"); + zyre_shouts (node2, "GLOBAL", "Hello again"); + msg = zyre_recv (node1); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "SHOUT")); + zstr_free (&command); + zmsg_destroy (&msg); + msg = zyre_recv (node2); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "SHOUT")); + zstr_free (&command); + zmsg_destroy (&msg); + + int64_t recv_start = zclock_mono (); + msg = zyre_recv (node1); + assert ((zclock_mono () - recv_start) < (evasive_test_interval + 100)); + assert (msg); + command = zmsg_popstr (msg); + assert (streq (command, "EVASIVE")); + zstr_free (&command); + zmsg_destroy (&msg); + zyre_stop (node2); msg = zyre_recv (node2); diff --git a/src/zyre_node.c b/src/zyre_node.c index 0507549f..e58101c2 100644 --- a/src/zyre_node.c +++ b/src/zyre_node.c @@ -91,6 +91,17 @@ s_string_compare (void *item1, void *item2) return strcmp (str1, str2); } +static int64_t +s_reap_interval (zyre_node_t *self) +{ + uint64_t interval = self->evasive_timeout; + if (self->expired_timeout < interval) + interval = self->expired_timeout; + if (interval > REAP_INTERVAL) + interval = REAP_INTERVAL; + return interval; +} + // -------------------------------------------------------------------------- // Constructor @@ -1547,7 +1558,7 @@ zyre_node_ping_peer (const char *key, void *item, void *argument) zstr_sendm (self->outbox, "EVASIVE"); zstr_sendm (self->outbox, zyre_peer_identity (peer)); zstr_send (self->outbox, zyre_peer_name (peer)); - if (zclock_mono () >= zyre_peer_evasive_at (peer) + REAP_INTERVAL) { + if (zclock_mono () >= zyre_peer_evasive_at (peer) + s_reap_interval (self)) { // Inform the calling application this peer is being silent // despite having tried to ping it. Something is wrong with // the connection to this peer (or with the network). @@ -1555,7 +1566,7 @@ zyre_node_ping_peer (const char *key, void *item, void *argument) // before getting ping result and thus has poor meaning. if (self->verbose) zsys_info ("(%s) peer '%s' has not answered ping after %d milliseconds (silent)", - self->name, zyre_peer_name(peer), REAP_INTERVAL); + self->name, zyre_peer_name(peer), s_reap_interval (self)); zstr_sendm (self->outbox, "SILENT"); zstr_sendm (self->outbox, zyre_peer_identity (peer)); zstr_send (self->outbox, zyre_peer_name (peer)); @@ -1581,7 +1592,7 @@ zyre_node_actor (zsock_t *pipe, void *args) zsock_signal (self->pipe, 0); // Loop until the agent is terminated one way or another - int64_t reap_at = zclock_mono () + REAP_INTERVAL; + int64_t last_reaped_at = zclock_mono (); while (!self->terminated) { // Start beacon as soon as we can @@ -1635,10 +1646,8 @@ zyre_node_actor (zsock_t *pipe, void *args) zstr_free(&hostname); } - int timeout = (int) (reap_at - zclock_mono ()); - if (timeout > REAP_INTERVAL) - timeout = REAP_INTERVAL; - else + // If nothing else happens, wait until the next reap + int timeout = (int) ((last_reaped_at + s_reap_interval (self)) - zclock_mono ()); if (timeout < 0) timeout = 0; @@ -1661,9 +1670,9 @@ zyre_node_actor (zsock_t *pipe, void *args) break; // Interrupted, check before expired else if (zpoller_expired (self->poller)) { - if (zclock_mono () >= reap_at) { + if (zclock_mono () >= (last_reaped_at + s_reap_interval (self))) { void *item; - reap_at = zclock_mono () + REAP_INTERVAL; + last_reaped_at = zclock_mono (); // Ping all peers and reap any expired ones for (item = zhash_first (self->peers); item != NULL; item = zhash_next (self->peers))