Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix writes out of order #606

Merged
merged 1 commit into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ private void recalculateMaxPerSubscriber(int oldSubCount, int newSubCount) {
private boolean isDone; /*Guarded by guard*/
private Scheduler.Worker writeWorker; /*Guarded by guard*/
private boolean atleastOneWriteEnqueued; /*Guarded by guard*/
private int enqueued; /*Guarded by guard*/

private boolean isPromiseCompletedOnWriteComplete; /*Guarded by guard. Only transition should be false->true*/

Expand Down Expand Up @@ -533,7 +534,11 @@ public void onNext(Object nextItem) {
}
}

enqueue = null != writeWorker && inEL;
enqueue = null != writeWorker && (inEL || enqueued > 0);

if (enqueue) {
enqueued++;
}
}

final ChannelFuture channelFuture = enqueue ? enqueueWrite(nextItem) : ctx.write(nextItem);
Expand Down Expand Up @@ -611,6 +616,9 @@ private ChannelFuture enqueueWrite(final Object nextItem) {
@Override
public void call() {
ctx.write(nextItem, toReturn);
synchronized (guard) {
enqueued--;
}
}
});
return toReturn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/
package io.reactivex.netty.channel;

import io.netty.channel.WriteBufferWaterMark;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.ObjectUtil;
import io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor;
import io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber;
import io.reactivex.netty.test.util.MockProducer;
Expand All @@ -26,11 +30,21 @@
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import rx.Scheduler;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor.MAX_PER_SUBSCRIBER_REQUEST;
import static io.reactivex.netty.channel.BytesWriteInterceptorTest.InspectorRule.defaultRequestN;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static rx.Observable.just;

public class BytesWriteInterceptorTest {

Expand Down Expand Up @@ -113,6 +127,35 @@ public void testOneLongWriteAndManySmallWrites() throws Exception {
assertThat("Unexpected items requested.", producer1.getRequested(), is(97L));
}

@Test(timeout = 100000)
public void testWritesInOrderFromDifferentThreads() throws Exception {
final WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();

// Set the current thread to be the thread of the event loop
inspectorRule.setEventLoopThread();

// Send 1000 messages from two different threads
int msgCount = 1000;
Scheduler.Worker worker = Schedulers.computation().createWorker();
for (int i = 1; i < msgCount; i+=2) {
sub1.onNext(String.valueOf(i));

// Send from other thread
inspectorRule.sendFromOtherThread(sub1, worker, String.valueOf(i+1));
}

// In lack of a way of running all pending tasks on computation scheduler
Thread.sleep(500);

// Ensure messages are in order
Queue<Object> written = inspectorRule.getWrittenMessages();
for (int i = 1; i <= msgCount; i++) {
Object msg = written.poll();
String strMsg = ((ByteBuf) msg).toString(Charset.defaultCharset());
assertThat("Not in order ", strMsg, is(String.valueOf(i)));
}
}

@Test(timeout = 10000)
public void testBatchedSubscriberRemoves() throws Exception {
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
Expand Down Expand Up @@ -153,7 +196,7 @@ public Statement apply(final Statement base, Description description) {
@Override
public void evaluate() throws Throwable {
interceptor = new BytesWriteInterceptor("foo");
channel = new EmbeddedChannel(new WriteTransformer(), interceptor);
channel = new TestEmbeddedChannel(new WriteTransformer(), interceptor);
base.evaluate();
}
};
Expand Down Expand Up @@ -207,5 +250,174 @@ public void setupNewSubscriberAndComplete(int expectedSubCount, boolean runPendi
channel.runPendingTasks();
}
}

public Queue<Object> getWrittenMessages() {
channel.runPendingTasks();
channel.flush();
return channel.outboundMessages();
}

public void setEventLoopThread() {
ChannelPromise deregisterPromise = channel.newPromise();
channel.deregister(deregisterPromise);
channel.runPendingTasks();
assertThat("failed to deregister", deregisterPromise.isDone() && deregisterPromise.isSuccess());

ThreadAwareEmbeddedEventLoop loop = new ThreadAwareEmbeddedEventLoop(Thread.currentThread());
ChannelFuture registerPromise = loop.register(channel);
assertThat("failed to register", registerPromise.isDone() && registerPromise.isSuccess());
}

private void sendFromOtherThread(final WriteStreamSubscriber subscriber, Scheduler.Worker worker, final Object msg) throws InterruptedException {
final CountDownLatch countDown = new CountDownLatch(1);
worker.schedule(new Action0() {
@Override
public void call() {
subscriber.onNext(msg);
countDown.countDown();
}
});
countDown.await();
}
}

/**
* A custom EmbeddedChannel allowing a special EventLoop, so that we can simulate calls not coming from the event loop.
*/
private static class TestEmbeddedChannel extends EmbeddedChannel {

public TestEmbeddedChannel(WriteTransformer writeTransformer, BytesWriteInterceptor interceptor) {
super(writeTransformer, interceptor);
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof ThreadAwareEmbeddedEventLoop || super.isCompatible(loop);
}

@Override
public void runPendingTasks() {
if (super.eventLoop() instanceof ThreadAwareEmbeddedEventLoop) {
ThreadAwareEmbeddedEventLoop loop = (ThreadAwareEmbeddedEventLoop) super.eventLoop();
loop.runTasks();
} else {
super.runPendingTasks();
}
}
}

/**
* Need an embedded event loop that considers a single thread to be "on the loop" in order to have writes from
* outside the event loop.
* Due to final modifier of EmbeddedEventLoop there was some copying needed.
*/
private static class ThreadAwareEmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {

private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
private final Thread loopThread;

public ThreadAwareEmbeddedEventLoop(Thread loopThread) {
this.loopThread = loopThread;
}

@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}

@Override
public EventLoop next() {
return (EventLoop) super.next();
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
tasks.add(command);
}

void runTasks() {
for (;;) {
Runnable task = tasks.poll();
if (task == null) {
break;
}

task.run();
}
}

@Override
protected void cancelScheduledTasks() {
super.cancelScheduledTasks();
}

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public Future<?> terminationFuture() {
throw new UnsupportedOperationException();
}

@Override
@Deprecated
public void shutdown() {
throw new UnsupportedOperationException();
}

@Override
public boolean isShuttingDown() {
return false;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}

@Override
public boolean inEventLoop() {
return Thread.currentThread() == loopThread;
}

@Override
public boolean inEventLoop(Thread thread) {
return thread == loopThread;
}
}

}