Skip to content

Commit

Permalink
Merge pull request #4699 from rizer1980/okx_implement_channel_inactiv…
Browse files Browse the repository at this point in the history
…e_listener

[okx] implement channel inactive listener
  • Loading branch information
timmolter authored Nov 13, 2023
2 parents b7dfec0 + f17baf7 commit d56e61f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.reactivex.Completable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
Expand Down Expand Up @@ -82,4 +83,14 @@ public StreamingTradeService getStreamingTradeService() {
public void useCompressedMessages(boolean compressedMessages) {
throw new NotYetImplementedForExchangeException("useCompressedMessage");
}

/**
* Enables the user to listen on channel inactive events and react appropriately.
*
* @param channelInactiveHandler a WebSocketMessageHandler instance.
*/
public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
streamingService.setChannelInactiveHandler(channelInactiveHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,13 @@
import info.bitrich.xchangestream.okex.dto.OkexLoginMessage;
import info.bitrich.xchangestream.okex.dto.OkexSubscribeMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
Expand All @@ -27,6 +20,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class OkexStreamingService extends JsonNettyStreamingService {

private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingService.class);
Expand All @@ -45,6 +49,8 @@ public class OkexStreamingService extends JsonNettyStreamingService {

private final Observable<Long> pingPongSrc = Observable.interval(15, 15, TimeUnit.SECONDS);

private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

private Disposable pingPongSubscription;

private final ExchangeSpecification xSpec;
Expand Down Expand Up @@ -165,6 +171,42 @@ private OkexSubscribeMessage.SubscriptionTopic getTopic(String channelName){
}
}

@Override
protected WebSocketClientHandler getWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
LOG.info("Registering OkxWebSocketClientHandler");
return new OkxWebSocketClientHandler(handshake, handler);
}

public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
this.channelInactiveHandler = channelInactiveHandler;
}

/**
* Custom client handler in order to execute an external, user-provided handler on channel events.
*/
class OkxWebSocketClientHandler extends NettyWebSocketClientHandler {

public OkxWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) {
super(handshake, handler);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (channelInactiveHandler != null) {
channelInactiveHandler.onMessage("WebSocket Client disconnected!");
}
}
}

public void pingPongDisconnectIfConnected() {
if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) {
pingPongSubscription.dispose();
Expand Down

0 comments on commit d56e61f

Please sign in to comment.