Skip to content

Commit

Permalink
Merge pull request #744 from oko256/reap-interval
Browse files Browse the repository at this point in the history
Adjust reap interval based on timeouts
  • Loading branch information
sphaero authored Oct 3, 2024
2 parents 0fdb8c4 + d3a6d5e commit dcd74b5
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 9 deletions.
58 changes: 58 additions & 0 deletions src/zyre.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,64 @@ zyre_test (bool verbose)
zstr_free (&command);
zmsg_destroy (&msg);

// First node should also receive ENTER and JOINs from second node
msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "ENTER"));
zstr_free (&command);
assert (zmsg_size (msg) == 4);
peerid = zmsg_popstr (msg);
name = zmsg_popstr (msg);
assert (streq (name, "node2"));
zstr_free (&peerid);
zstr_free (&name);
zmsg_destroy (&msg);

msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "JOIN"));
zstr_free (&command);
assert (zmsg_size (msg) == 3);
zmsg_destroy (&msg);

msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "JOIN"));
zstr_free (&command);
assert (zmsg_size (msg) == 3);
zmsg_destroy (&msg);

// Test evasive timeout
const int evasive_test_interval = 100;
zyre_set_evasive_timeout (node1, evasive_test_interval);
// Refresh peers to apply new timeouts immediately
zyre_shouts (node1, "GLOBAL", "Hello again");
zyre_shouts (node2, "GLOBAL", "Hello again");
msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "SHOUT"));
zstr_free (&command);
zmsg_destroy (&msg);
msg = zyre_recv (node2);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "SHOUT"));
zstr_free (&command);
zmsg_destroy (&msg);

int64_t recv_start = zclock_mono ();
msg = zyre_recv (node1);
assert ((zclock_mono () - recv_start) < (evasive_test_interval + 100));
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "EVASIVE"));
zstr_free (&command);
zmsg_destroy (&msg);

zyre_stop (node2);

msg = zyre_recv (node2);
Expand Down
27 changes: 18 additions & 9 deletions src/zyre_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ s_string_compare (void *item1, void *item2)
return strcmp (str1, str2);
}

static int64_t
s_reap_interval (zyre_node_t *self)
{
uint64_t interval = self->evasive_timeout;
if (self->expired_timeout < interval)
interval = self->expired_timeout;
if (interval > REAP_INTERVAL)
interval = REAP_INTERVAL;
return interval;
}

// --------------------------------------------------------------------------
// Constructor

Expand Down Expand Up @@ -1547,15 +1558,15 @@ zyre_node_ping_peer (const char *key, void *item, void *argument)
zstr_sendm (self->outbox, "EVASIVE");
zstr_sendm (self->outbox, zyre_peer_identity (peer));
zstr_send (self->outbox, zyre_peer_name (peer));
if (zclock_mono () >= zyre_peer_evasive_at (peer) + REAP_INTERVAL) {
if (zclock_mono () >= zyre_peer_evasive_at (peer) + s_reap_interval (self)) {
// Inform the calling application this peer is being silent
// despite having tried to ping it. Something is wrong with
// the connection to this peer (or with the network).
// NB: this is an improvement of the EVASIVE event which triggers
// before getting ping result and thus has poor meaning.
if (self->verbose)
zsys_info ("(%s) peer '%s' has not answered ping after %d milliseconds (silent)",
self->name, zyre_peer_name(peer), REAP_INTERVAL);
self->name, zyre_peer_name(peer), s_reap_interval (self));
zstr_sendm (self->outbox, "SILENT");
zstr_sendm (self->outbox, zyre_peer_identity (peer));
zstr_send (self->outbox, zyre_peer_name (peer));
Expand All @@ -1581,7 +1592,7 @@ zyre_node_actor (zsock_t *pipe, void *args)
zsock_signal (self->pipe, 0);

// Loop until the agent is terminated one way or another
int64_t reap_at = zclock_mono () + REAP_INTERVAL;
int64_t last_reaped_at = zclock_mono ();
while (!self->terminated) {

// Start beacon as soon as we can
Expand Down Expand Up @@ -1635,10 +1646,8 @@ zyre_node_actor (zsock_t *pipe, void *args)
zstr_free(&hostname);
}

int timeout = (int) (reap_at - zclock_mono ());
if (timeout > REAP_INTERVAL)
timeout = REAP_INTERVAL;
else
// If nothing else happens, wait until the next reap
int timeout = (int) ((last_reaped_at + s_reap_interval (self)) - zclock_mono ());
if (timeout < 0)
timeout = 0;

Expand All @@ -1661,9 +1670,9 @@ zyre_node_actor (zsock_t *pipe, void *args)
break; // Interrupted, check before expired
else
if (zpoller_expired (self->poller)) {
if (zclock_mono () >= reap_at) {
if (zclock_mono () >= (last_reaped_at + s_reap_interval (self))) {
void *item;
reap_at = zclock_mono () + REAP_INTERVAL;
last_reaped_at = zclock_mono ();
// Ping all peers and reap any expired ones
for (item = zhash_first (self->peers); item != NULL;
item = zhash_next (self->peers))
Expand Down

0 comments on commit dcd74b5

Please sign in to comment.