Skip to content

Commit

Permalink
Fix handling of timeouts in amqp_consume().
Browse files Browse the repository at this point in the history
If messages were constantly received, this function would essentially
never return -- which violates the spirit of having a timeout parameter
to begin with.

To reliably consume "endlessly", use a while(TRUE) loop.
  • Loading branch information
atheriel committed Jul 14, 2020
1 parent 5c25ecc commit f776a1e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# longears 0.2.2.9000

- `amqp_listen()` will now respect the timeout value even when processing many
messages. To reliably consume "endlessly", use a `while(TRUE)` loop.

- Connections now set common client properties, which are visible in the
RabbitMQ management plugin web interface.

Expand Down
5 changes: 4 additions & 1 deletion src/consume.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <stdlib.h> /* for malloc */
#include <string.h> /* for memcpy */
#include <sys/time.h>
#include <time.h> /* for time() */

#include <amqp.h>
#include <amqp_tcp_socket.h>
Expand Down Expand Up @@ -131,6 +132,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
time_t start = time(NULL);

SEXP message, body;
SEXP R_fcall = PROTECT(allocList(2));
Expand Down Expand Up @@ -184,7 +186,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)

switch (status) {
case AMQP_STATUS_TIMEOUT:
current_wait++;
/* OK. */
break;
case AMQP_STATUS_CONNECTION_CLOSED:
/* fallthrough */
Expand Down Expand Up @@ -245,6 +247,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
UNPROTECT(2);
}

current_wait = time(NULL) - start;
R_CheckUserInterrupt(); // Escape hatch.
}

Expand Down

0 comments on commit f776a1e

Please sign in to comment.