From dacf52db2541995f8cce5f23feba2ca744f5dd6d Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Sun, 3 Dec 2023 09:14:57 +0700 Subject: [PATCH] add sync mode for subscription events (#114) --- README.md | 4 +++- subscription.go | 18 ++++++++++++++++-- subscription_test.go | 12 +++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 136ef8e..f0d699d 100644 --- a/README.md +++ b/README.md @@ -590,7 +590,9 @@ client. WithExitWhenNoSubscription(false). // WithRetryStatusCodes allow retry the subscription connection when receiving one of these codes // the input parameter can be number string or range, e.g 4000-5000 - WithRetryStatusCodes("4000", "4000-4050") + WithRetryStatusCodes("4000", "4000-4050"). + // WithSyncMode subscription messages are executed in sequence (without goroutine) + WithSyncMode(true) ``` #### Subscription Protocols diff --git a/subscription.go b/subscription.go index 2efd32e..e34d6c9 100644 --- a/subscription.go +++ b/subscription.go @@ -364,6 +364,7 @@ type SubscriptionClient struct { onError func(sc *SubscriptionClient, err error) error errorChan chan error exitWhenNoSubscription bool + syncMode bool keepAliveInterval time.Duration retryDelay time.Duration mutex sync.Mutex @@ -464,6 +465,12 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti return sc } +// WithSyncMode subscription messages are executed in sequence (without goroutine) +func (sc *SubscriptionClient) WithSyncMode(value bool) *SubscriptionClient { + sc.syncMode = value + return sc +} + // Keep alive subroutine to send ping on specified interval func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) { ticker := time.NewTicker(interval) @@ -806,13 +813,20 @@ func (sc *SubscriptionClient) Run() error { if sub == nil { sub = &Subscription{} } - go func() { + + execMessage := func() { if err := sc.protocol.OnMessage(subContext, *sub, message); err != nil { sc.errorChan <- err } sc.checkSubscriptionStatuses(subContext) - }() + } + + if sc.syncMode { + execMessage() + } else { + go execMessage() + } } } }() diff --git a/subscription_test.go b/subscription_test.go index 2e3b9b7..5add1f2 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -13,7 +13,8 @@ import ( "nhooyr.io/websocket" ) -func TestSubscription_LifeCycleEvents(t *testing.T) { +func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) { + server := subscription_setupServer(8082) client, subscriptionClient := subscription_setupClients(8082) msg := randomID() @@ -84,6 +85,7 @@ func TestSubscription_LifeCycleEvents(t *testing.T) { subscriptionClient = subscriptionClient. WithExitWhenNoSubscription(false). WithTimeout(3 * time.Second). + WithSyncMode(syncMode). OnConnected(func() { lock.Lock() defer lock.Unlock() @@ -200,6 +202,14 @@ func TestSubscription_LifeCycleEvents(t *testing.T) { } } +func TestSubscription_LifeCycleEvents(t *testing.T) { + testSubscription_LifeCycleEvents(t, false) +} + +func TestSubscription_WithSyncMode(t *testing.T) { + testSubscription_LifeCycleEvents(t, true) +} + func TestSubscription_WithRetryStatusCodes(t *testing.T) { stop := make(chan bool) msg := randomID()