diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index 8934e3a407c..05b22cd6218 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -1,12 +1,20 @@ package info.bitrich.xchangestream.binance; +import static java.util.Collections.emptyMap; + import info.bitrich.xchangestream.binance.BinanceUserDataChannel.NoActiveChannelException; import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.service.netty.ConnectionStateModel.State; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; import info.bitrich.xchangestream.util.Events; import io.reactivex.Completable; import io.reactivex.Observable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.binance.BinanceAuthenticated; import org.knowm.xchange.binance.BinanceExchange; @@ -17,14 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.util.Collections.emptyMap; - public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange { private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class); @@ -322,4 +322,14 @@ public void enableLiveSubscription() { public void disableLiveSubscription() { if (this.streamingService != null) this.streamingService.disableLiveSubscription(); } + + /** + * Enables the user to listen on channel inactive events and react appropriately. + * + * @param channelInactiveHandler a WebSocketMessageHandler instance. + */ + public void setChannelInactiveHandler( + WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { + streamingService.setChannelInactiveHandler(channelInactiveHandler); + } } diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingService.java index bcbd2eb682a..af1bf3f3f9f 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingService.java @@ -7,10 +7,10 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -20,6 +20,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BinanceStreamingService extends JsonNettyStreamingService { @@ -32,6 +34,9 @@ public class BinanceStreamingService extends JsonNettyStreamingService { private final KlineSubscription klineSubscription; private boolean isLiveSubscriptionEnabled = false; + + private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; + private final Map liveSubscriptionMessage = new ConcurrentHashMap<>(); @@ -236,4 +241,39 @@ public void unsubscribeChannel(final String channelId) { } } } + + @Override + protected WebSocketClientHandler getWebSocketClientHandler( + WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) { + LOGGER.info("Registering BinanceWebSocketClientHandler"); + return new BinanceWebSocketClientHandler(handshake, handler); + } + + public void setChannelInactiveHandler( + WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { + this.channelInactiveHandler = channelInactiveHandler; + } + + /** + * Custom client handler in order to execute an external, user-provided handler on channel events. + */ + class BinanceWebSocketClientHandler extends NettyWebSocketClientHandler { + + public BinanceWebSocketClientHandler( + WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) { + super(handshake, handler); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + super.channelInactive(ctx); + if (channelInactiveHandler != null) + channelInactiveHandler.onMessage("WebSocket Client disconnected!"); + } + } }