From 11757bf48313ef457a51ae7d4175f971d73fa8e6 Mon Sep 17 00:00:00 2001 From: Tiejun Zhou Date: Tue, 5 Mar 2024 09:11:48 +0000 Subject: [PATCH 1/2] Fix timeout of send thread in Async client for v5 When the outgoing packets fails to be sent due to hitting the limitation of the inflight maximum, it will wait on send_cond/send_sem. Send thread should be woken up on PUBACK message received. So it does not need to wait until timeout (1 second). Signed-off-by: Tiejun Zhou --- src/MQTTAsyncUtils.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/MQTTAsyncUtils.c b/src/MQTTAsyncUtils.c index 6d933040..15c4239f 100644 --- a/src/MQTTAsyncUtils.c +++ b/src/MQTTAsyncUtils.c @@ -3078,7 +3078,15 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc) else if (msgtype == PUBREC) *rc = MQTTProtocol_handlePubrecs(pack, *sock, &pubToRemove); else if (msgtype == PUBACK) + { *rc = MQTTProtocol_handlePubacks(pack, *sock, &pubToRemove); + if (sendThread_state != STOPPED) +#if !defined(_WIN32) && !defined(_WIN64) + Thread_signal_cond(send_cond); +#else + Thread_post_sem(send_sem); +#endif + } if (!m) Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid); if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR)) From 54786b6d6d5018a196178acd068b4cab66625d09 Mon Sep 17 00:00:00 2001 From: Tiejun Zhou Date: Wed, 6 Mar 2024 00:50:34 +0000 Subject: [PATCH 2/2] Fix the same issue to PUBCOMP as suggested by @icraggs --- src/MQTTAsyncUtils.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/MQTTAsyncUtils.c b/src/MQTTAsyncUtils.c index 15c4239f..1abc66e4 100644 --- a/src/MQTTAsyncUtils.c +++ b/src/MQTTAsyncUtils.c @@ -3074,7 +3074,15 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc) } if (msgtype == PUBCOMP) + { *rc = MQTTProtocol_handlePubcomps(pack, *sock, &pubToRemove); + if (sendThread_state != STOPPED) +#if !defined(_WIN32) && !defined(_WIN64) + Thread_signal_cond(send_cond); +#else + Thread_post_sem(send_sem); +#endif + } else if (msgtype == PUBREC) *rc = MQTTProtocol_handlePubrecs(pack, *sock, &pubToRemove); else if (msgtype == PUBACK)