Sinks.many().replay().latest() holds two values in cache #3340
Comments
I will just add that the possible solution stated here fixes the head != tail issue, but the buffer will not emit its value; it looks like this is a "simple" off-by-one error. (edit: limit 2 yields 3 items referenced) Consider making a singleton implementation of the buffer, and moving to Java's AtomicReferenceArray instead of the linked list.
My least-effort implementations of a SingletonReplayBuffer and an AtomicReferenceArray-based implementation follow. There are bound to be some issues with these, but they both passed a naive test:
public static <T> Sinks.Many<T> replayLatestSink() {
var sink = new SinkManyReplayProcessor<>(new AtomicArraySizeBoundReplayBuffer<T>(1));
return sink; // and wrapped: new SinkManySerialized<>(sink, sink);
}
public static <T> Sinks.Many<T> replayLatestSinkSingleton() {
var sink = new SinkManyReplayProcessor<>(new SingletonReplayBuffer<T>());
return sink; // and wrapped: new SinkManySerialized<>(sink, sink);
}
@Test
void testFunctionalityOfSinksManyReplayLatest() {
Sinks.Many<Integer> sink = replayLatestSink(); // or replayLatestSingleton();
sink.tryEmitNext(12);
var firstResult = sink.asFlux().blockFirst();
assert firstResult != null;
assert firstResult == 12;
var secondResult = sink.asFlux().blockFirst();
assert firstResult.equals(secondResult);
sink.tryEmitNext(21);
firstResult = sink.asFlux().blockFirst();
assert firstResult != null;
assert firstResult == 21;
secondResult = sink.asFlux().blockFirst();
assert firstResult.equals(secondResult);
}
Sorry it's not in PR form; I do this on my work PC and I cannot clone from GitHub (my private PC is not set up for Java, nor IntelliJ).
static final class SingletonReplayBuffer<T> implements FluxReplay.ReplayBuffer<T> {
final AtomicReference<T> val = new AtomicReference<>(null);
final int indexUpdateLimit = Operators.unboundedOrLimit(1);
volatile boolean done;
Throwable error;
@Override
public void add(T value) {
val.set(value);
}
@Override
public void onError(Throwable ex) {
error = ex;
done = true;
}
@Override
public Throwable getError() {
return error;
}
@Override
public void onComplete() {
done = true;
}
@Override
public void replay(FluxReplay.ReplaySubscription<T> rs) {
if (!rs.enter()) {
return;
}
if (rs.fusionMode() == FluxReplay.NONE) {
replayNormal(rs);
}
else {
replayFused(rs);
}
}
private void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
final Subscriber<? super T> a = rs.actual();
int missed = 1;
long r = rs.requested(); // can this be 0?
long e = 0L;
for (; ; ) {
@SuppressWarnings("unchecked") T curr = (T) rs.node();
T t = val.get();
if (curr == null) {
curr = t;
rs.node(curr);
}
boolean d = done;
boolean empty = t == null;
if (d && empty) {
rs.node(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}
if (empty) {
continue;
}
if (rs.isCancelled()) {
rs.node(null);
return;
}
a.onNext(t);
e++;
if (r != Long.MAX_VALUE) {
rs.produced(1);
}
rs.node(t);
missed = rs.leave(missed);
if (missed == 0 || e == r) {
break;
}
}
}
private void replayFused(FluxReplay.ReplaySubscription<T> rs) {
// I have no idea...
}
@Override
public boolean isDone() {
return done;
}
@Override
public T poll(FluxReplay.ReplaySubscription<T> rs) {
@SuppressWarnings("unchecked") T node = (T) rs.node();
T next = val.get();
if (node == null) {
rs.node(next);
}
// if ((1) % indexUpdateLimit == 0) {
// rs.requestMore(1);
// } // this should be rethought for this implementation
return next;
}
@Override
public void clear(FluxReplay.ReplaySubscription<T> rs) {
rs.node(null);
}
@Override
public boolean isEmpty(FluxReplay.ReplaySubscription<T> rs) {
@SuppressWarnings("unchecked") T node = (T) rs.node();
if (node == null) {
rs.node(val.get());
return false;
// doesn't this mean that the subscription is not up-to-date?
// then the subscription can progress, why add the node, then?
}
return node == val.get();
}
@Override
public int size(FluxReplay.ReplaySubscription<T> rs) {
@SuppressWarnings("unchecked") T node = (T) rs.node();
T t = val.get();
if (node == null) {
rs.node(t);
}
return sizeOf(t);
}
@Override
public int size() {
return sizeOf(val.get());
}
private int sizeOf(@Nullable T t) {
return t != null ? 1 : 0;
}
@Override
public int capacity() {
return 1;
}
@Override
public boolean isExpired() {
return false;
}
}
static final class AtomicArraySizeBoundReplayBuffer<T> implements FluxReplay.ReplayBuffer<T> {
final AtomicInteger head = new AtomicInteger();
final int indexUpdateLimit;
volatile boolean done;
Throwable error;
final AtomicReferenceArray<T> buffer;
AtomicArraySizeBoundReplayBuffer(int limit) {
if (limit < 0) {
throw new IllegalArgumentException("Limit cannot be negative");
}
this.indexUpdateLimit = Operators.unboundedOrLimit(limit);
this.buffer = new AtomicReferenceArray<>(limit);
}
@Override
public boolean isExpired() {
return false;
}
@Override
public int capacity() {
return buffer.length();
}
@Override
public void add(T value) {
buffer.set(
head.accumulateAndGet(capacity(), (curr, cap) -> {
if (curr + 1 == cap) return 0;
return curr + 1;
}),
value
);
}
@Override
public void onError(Throwable ex) {
error = ex;
done = true;
}
@Override
public void onComplete() {
done = true;
}
void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
final Subscriber<? super T> a = rs.actual();
int missed = 1;
for (; ; ) {
long r = rs.requested();
long e = 0L;
Integer curr = (Integer) rs.node();
if (curr == null) {
curr = head.get();
rs.node(curr);
}
int start = curr;
while (e != r) {
if (rs.isCancelled()) {
rs.node(null);
return;
}
boolean d = done;
T next = buffer.get(curr);
boolean empty = next == null;
if (d && empty) { // if start is not 0 here that is a clear bug!
rs.node(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}
if (empty) {
break;
}
a.onNext(next);
e++;
if (curr == capacity() && start != 0) {
curr = 0;
} else {
curr++;
}
if (curr == start) break;
if ((curr + 1) % indexUpdateLimit == 0) {
rs.requestMore(curr + 1);
}
}
if (e != 0L) {
if (r != Long.MAX_VALUE) {
rs.produced(e);
}
}
rs.node(curr);
missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}
void replayFused(FluxReplay.ReplaySubscription<T> rs) {
int missed = 1;
final Subscriber<? super T> a = rs.actual();
for (; ; ) {
if (rs.isCancelled()) {
rs.node(null);
return;
}
boolean d = done;
a.onNext(null);
if (d) {
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}
missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}
@Override
public void replay(FluxReplay.ReplaySubscription<T> rs) {
if (!rs.enter()) {
return;
}
if (rs.fusionMode() == FluxReplay.NONE) {
replayNormal(rs);
}
else {
replayFused(rs);
}
}
@Override
@Nullable
public Throwable getError() {
return error;
}
@Override
public boolean isDone() {
return done;
}
@Override
@Nullable
public T poll(FluxReplay.ReplaySubscription<T> rs) {
Integer node = (Integer) rs.node();
if (node == null) {
node = head.get();
rs.node(node);
}
T next = buffer.get(node);
if (next == null) {
return null;
}
rs.node(node+1);
if ((node + 1) % indexUpdateLimit == 0) {
rs.requestMore(node + 1);
}
return next;
}
@Override
public void clear(FluxReplay.ReplaySubscription<T> rs) {
//clear array?
rs.node(null);
}
@Override
public boolean isEmpty(FluxReplay.ReplaySubscription<T> rs) {
Integer node = (Integer) rs.node();
if (node == null) {
node = head.get();
rs.node(node);
}
return buffer.get(0) == null;
}
@Override
public int size(FluxReplay.ReplaySubscription<T> rs) {
Integer node = (Integer) rs.node();
if (node == null) {
node = head.get();
}
return sizeFrom(node);
}
@Override
public int size() {
return sizeFrom(head.get());
}
private int sizeFrom(int position) {
if (
position+1 >= capacity()
|| buffer.get(position+1) != null
) return capacity();
return position;
}
}
There are bound to be some bugs here... but anyway, the implementations are pretty simple. I didn't really understand how to use rs.node() nor requestMore for the singleton implementation.
Fixed a bug in replay:
void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
final Subscriber<? super T> a = rs.actual();
int missed = 1;
for (; ; ) {
long r = rs.requested();
long e = 0L;
Integer curr = (Integer) rs.node();
if (curr == null) {
curr = head.get();
rs.node(curr);
}
int last = curr;
curr = nextOrReset(curr);
while (e != r) {
if (rs.isCancelled()) {
rs.node(null);
return;
}
boolean d = done;
T next = buffer.get(curr);
boolean empty = next == null;
if (d && empty) { // if start is not 0 here that is a clear bug!
rs.node(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}
if (empty) {
break;
}
a.onNext(next);
e++;
if (curr == last) break;
curr = nextOrReset(curr);
if ((curr + 1) % indexUpdateLimit == 0) {
rs.requestMore(curr + 1);
}
}
if (e != 0L) {
if (r != Long.MAX_VALUE) {
rs.produced(e);
}
}
rs.node(curr);
missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}
private Integer nextOrReset(Integer curr) {
if (curr + 1 == capacity()) {
return 0;
}
return curr + 1;
}
But there is some timing issue/race condition in the code... Replay is called twice, producing double responses, but stepping through it works... and sometimes I got 11 and 13 outputs in a buffer of size 10 (always in correct order) by adding a breakpoint... It may be easier to reimplement than to go with this implementation then :(
Hi, @MikkelHJuul! Can you reproduce that issue via a black-box test as well? Something like a test that demonstrates the operator failing on its own can be useful. Consider a JCStress test, etc. Thanks,
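A minimal sketch of the kind of JCStress harness being asked for here (an illustration only, not code from the thread; the class name, recorded values and outcome labels are assumptions):
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.II_Result;
import reactor.core.publisher.Sinks;

@JCStressTest
@State
@Outcome(id = "1, 1", expect = Expect.ACCEPTABLE, desc = "both concurrent emissions accepted")
@Outcome(expect = Expect.ACCEPTABLE_INTERESTING, desc = "one emission rejected, e.g. non-serialized access")
public class ReplayLatestEmitStressTest {
    final Sinks.Many<Integer> sink = Sinks.many().replay().latest();

    @Actor
    public void producer1(II_Result r) {
        // record 1 if the emission was accepted, 0 otherwise
        r.r1 = sink.tryEmitNext(1).isSuccess() ? 1 : 0;
    }

    @Actor
    public void producer2(II_Result r) {
        r.r2 = sink.tryEmitNext(2).isSuccess() ? 1 : 0;
    }
}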
In the implementation? Or the original issue? Because the original issue is not really something that is testable via the API, but I can give you a jmap histogram showing the 2:1 correlation. Again, I cannot really be of much help here, because I cannot share actual code with you, as I don't have a dev PC where that is permitted.
@MikkelHJuul I'm not sure the problem is mitigable, since in all possible cases the previous value is referenced and held longer than needed. This is "by design" of the current replay storage, and I can only guess that this is the best-effort, most efficient implementation. I'm afraid we will not fix it, actually; RxJava has the same issue, so I wonder what @akarnokd thinks about it.
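An illustrative sketch of the retention pattern being discussed (this is not the actual FluxReplay code, just a toy size-bound linked list showing why the evicted value stays reachable through head):
final class Node<T> {
    final T value;       // stays strongly reachable for as long as the node does
    Node<T> next;
    Node(T value) { this.value = value; }
}

final class ToySizeBoundBuffer<T> {
    final int limit;
    int size;
    Node<T> head = new Node<>(null); // starts as a value-less sentinel
    Node<T> tail = head;

    ToySizeBoundBuffer(int limit) { this.limit = limit; }

    void add(T value) {
        Node<T> n = new Node<>(value);
        tail.next = n;
        tail = n;
        if (size == limit) {
            // eviction just advances head; the evicted value is still referenced by
            // the new head node, so with limit = 1 both the previous and the latest
            // value remain reachable (the 2:1 retention reported in this issue)
            head = head.next;
        } else {
            size++;
        }
    }
}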
@MikkelHJuul can you tell us how critical this issue is for you?
@OlegDokuka - Well... our use of this is kinda spurious as well... holding all cache objects in a...
For a possible fix of the above, add eager tests for equality:
Integer curr = (Integer) rs.node();
if (curr == null) {
curr = head.get();
rs.node(curr);
}
replaced by
Integer curr = (Integer) rs.node();
Integer hd = head.get();
if (hd.equals(curr)) {
break;
}
if (curr == null) {
curr = hd;
rs.node(curr);
}
With this I am mostly afraid of a subscriber that disallows... And for the Singleton, add a simple
if (curr == t) {
break;
}
at the similar point in the Singleton's implementation. It seemed to fix all the issues, but you will have to test yourself. I'm not entirely sure that I buy into "we save one dangling AtomicReference too many" being "by design". I understand that the implementation works wrt. your tests; that is not the same as it being the most correct implementation. (We had some actual memory leaks (growing), which is also probably the only reason that I found this at all.) I may try to get a developer environment running on my personal PC so I can make an actual PR, if you are not interested in taking over.
Hi, @MikkelHJuul! Thank you for the open response with a deeper explanation of the problem. First of all, any contribution is appreciated, and it is not equal to 'we don't wanna mess with that' 😅 What I mean is that the atomic replay store is a singular case, while the rest of the scenarios (e.g. ...). That means although your case is covered, the others aren't. From what you say, it sounds like you care a lot about performance, so I'm starting to wonder whether you can replace the sink API with something different to unblock you for now. 🧐 Nevertheless, I want to do a bit deeper research first to see if we can solve the general scenario as well, and weigh your proposal as a good alternative. 🤓 That said, don't hesitate to send your PR while we spawn our own research in parallel 🙂
Thanks. I'm more concerned with holding on to the memory for too long. I think I will look at it this week. Can you tell me if it's maybe just more feasible to copy the implementation of FluxOnBackpressureBufferLatest?
Hello, I would like to do a short summary, since I have now had some more time to look into this; I think I may need some help to get completely finished. Yesterday I implemented a replacement for the SizeBoundReplayBuffer in https://github.com/MikkelHJuul/reactor-core/tree/issues/3340-object-array. Using an index solely does give the buffers (Singleton and Array) a discrete bug, in which, after overflow and reaching index = 0, the buffer will skip a single emission for a subscriber arriving right then. Should this bug be accepted, or should it be prevented (by rolling the buffer to a valid index, which would prevent this; the singleton can simply skip 0 and go to 1, the array could move to ...)? I had a bit too many questions to simply go on by myself at this point. Thanks
I don't fully understand the issue. We have an operator parameter in RxJava to avoid the retention of the head item.
The code is not completely comparable, although close. I would expect the same leak to be present there (and be reducible by this flag). Is the RxJava buffer also protected behind a construct like SinkManySerialized? It looks close, so I would expect the same; not safe for concurrent use. (Note the JCStress test in the PR related to this and a singleton implementation.)
The buffer is fed by a single producer. Each subscriber walks the linked list from a node they captured at the time of subscription. Every new item arriving will indicate the need for more draining per subscriber. Where is this JCStress code?
The Reactor SizeBoundReplayBuffer has a hard failure (NullPointerException).
Is ...
I highly doubt it at this time. The object-array implementation I have been working on is error-safe but does have the issue of missing emissions (in 2% of cases in JCStress) when tryEmitNext is called. This is due to the design/implementation of the ReplayProcessor and how add and replay are called.
There is a big difference in using ...
Hello, I just wanted to chip in on this issue. I just added a new PR that implements the SizeBoundReplayBuffer as an atomic object-array. I'm not entirely sure which implementation should go in; there are now 3 to choose from: the SingletonReplayBuffer, enhancing the SizeBoundReplayBuffer, and the ArraySizeBoundReplayBuffer. In all cases, I believe there could be an effort to pull in things across multiple of these PRs, e.g. the cancelOnNext test in the object-array implementation, the general tests in the enhanced SizeBoundReplayBuffer case, etc. Also, I see that the SizeBoundReplayBuffer could still be further improved. It could be done like the object-array implementation, or it could be changed to be atomically updated with CAS semantics. There is an issue with the license, I believe? I will not change anything in any licenses, so please help me fix that. Please contact me if clarification is needed.
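A rough sketch of the "atomically updated with CAS semantics" idea mentioned above (an illustration under assumptions, not the PR code; class and field names are made up, and readers may observe the advanced index before its slot is populated, which fits a best-effort latest cache):
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class CasIndexedLatestBuffer<T> {
    final AtomicReferenceArray<T> buffer;
    final AtomicInteger writeIndex = new AtomicInteger();

    CasIndexedLatestBuffer(int capacity) {
        this.buffer = new AtomicReferenceArray<>(capacity);
    }

    void add(T value) {
        for (;;) {
            int curr = writeIndex.get();
            int next = (curr + 1) % buffer.length();
            // CAS guarantees each concurrent add() claims a distinct slot index,
            // so writers never overwrite each other's in-flight value
            if (writeIndex.compareAndSet(curr, next)) {
                buffer.set(next, value);
                return;
            }
        }
    }

    T latest() {
        // may be null if the index advanced but the slot is not yet populated
        return buffer.get(writeIndex.get());
    }
}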
Linking the PR for future reference: #3688
Hi, @MikkelHJuul! Thank you for your hard effort putting all the potential improvements into multiple PRs. We have reviewed all of them and estimated the impact on the existing behaviour. To understand the impact, please consider the following sample:
public static void main(String[] args) {
Once the sample is executed, the output is the following:
12:39:36.831 [main] INFO SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
With your suggestion we have the following log output:
12:39:36.831 [main] INFO SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
For now, all of the PRs introduce a behaviour change which is not acceptable for this operator (it has stayed in the codebase with this behaviour since the very first release of the 3.x line). To mitigate that breaking change, we need to add an extra builder step in the MulticastReplaySpec which would let one decide transparently which behaviour is expected. Consider the following configuration code samples:
or
and
Would this solution work for you? Thanks,
Hi @OlegDokuka, thanks, I didn't actually catch that difference. I'm not sure that any performance/memory difference is worth keeping the singleton implementation over the array implementation; we need to decide on that. And I guess we should go with the fixes for the add method that make the SizeBoundReplayBuffer (and the timed one) safe for concurrent use at least (and maybe explain how the unsafe API actually behaves for the array buffer: the latest element is always emitted to all subscribers, but elements emitted concurrently may not be visible to all subscribers).
The explanation above further explains why the SizeBoundReplayBuffer fares comparatively well in the concurrent example; it only needs to replay once and will always replay all remaining elements. For the array-based implementation, each #add competes with #replay. We could fix this, by the way, if the added item were also passed into the call to replay (as a method parameter); then all added items must be replayed, albeit with probable loss of ordering.
Looking at the two proposed API changes, the latter (bestEffort) is in line with what I expect; in fact the former is not easily achievable, as the wrapping handler would have to verify demand before hitting the buffer. Also, I suppose I need to add a similar implementation for the SizeAndTimeBound implementation?
On a side note: I have been running the SingletonReplayBuffer in our production environment since November now, in unsafe mode (we only care about the latest value in case of multiple rapid updates). (We have 3 million instances, saving about 200-300MB in bookkeeping - and one of these buffers is known to hold a reference to a 700MB string that is updated once in a while, so this really helped reduce memory for us.)
Should any changes be made available via the ...
I added a PR to fix the current implementations. We could also proceed with the enhancements in another ticket.
@OlegDokuka - I have slowly started work on this. Please review the referenced PR. It should be fully compatible with current behavior, but adds safe concurrency.
Hey, @MikkelHJuul. I realize it has been a while since your last two PRs were opened. I am just now starting to look into this issue. A few things are not super clear to me, and we need a plan going forward. I notice the implementation in RxJava is currently different from the one in Reactor, and according to @akarnokd it allows avoiding storing the unnecessary item. Link to the PR with an explanation of the trade-offs. Can you explain:
Thanks in advance for following up.
Hi, and thanks for catching up. It has been a while. And I must admit that I was lately pondering whether the array-based implementation was better than copy-pasting the already implemented version but throwing away the reference from tail on ...
Definitely. Are you considering a contribution which ports the evolution of the replay-buffer implementations from RxJava into Reactor? I think that with the current spec, we don't need to make the current linked-list implementation thread-safe. I believe the proposed implementation would also yield worse performance results in JMH benchmarks due to the looping and CAS operations. If you are ok with the implementation from RxJava, I'd suggest that we close #3714. Regarding the array-based best-effort implementation, I think we can also pause there and close #3798; as you suggest, we'd be able to communicate to users what risks there are with the current linked-list implementation with a proper note in the javadoc. The primary concern of holding a reference to a stale value is still solved using eager truncation of the head element, at the only cost of always allocating a dummy head once the size limitation is reached.
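An illustrative sketch of that eager-truncation trade-off (assumed names, not the actual Reactor or RxJava code): once the size limit forces an eviction, the stale head is replaced with a freshly allocated value-less node, so the evicted payload becomes unreachable immediately:
final class TrimNode<T> {
    final T value;
    TrimNode<T> next;
    TrimNode(T value) { this.value = value; }
}

final class EagerTrimBuffer<T> {
    final int limit;
    int size;
    TrimNode<T> head = new TrimNode<>(null); // value-less sentinel
    TrimNode<T> tail = head;

    EagerTrimBuffer(int limit) { this.limit = limit; }

    void add(T value) {
        TrimNode<T> n = new TrimNode<>(value);
        tail.next = n;
        tail = n;
        if (size == limit) {
            head = head.next;                    // usual eviction: head would now hold the evicted value
            TrimNode<T> dummy = new TrimNode<>(null);
            dummy.next = head.next;              // eager truncation: a fresh value-less head in front of
            head = dummy;                        // the remaining items releases the evicted payload
        } else {
            size++;
        }
    }
}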
The caching used for Sinks.many().replay().latest() holds both head and tail in the cache, although only one is available for replay (as it should). This means that the extra reference holds unnecessary memory, and holds onto a reference that cannot be reclaimed.
Expected Behavior
The cache holds only a single reference in memory.
Actual Behavior
The SizeBoundReplayBuffer (https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java#L774) holds two items in cache.
Steps to Reproduce
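A hedged sketch of one way to observe the retention (not the reporter's original steps; the payload sizes and the heap-inspection step are assumptions, in line with the jmap histogram mentioned in the comments above):
import reactor.core.publisher.Sinks;

public class ReplayLatestRetentionRepro {
    public static void main(String[] args) throws InterruptedException {
        Sinks.Many<byte[]> sink = Sinks.many().replay().latest();

        sink.tryEmitNext(new byte[100_000_000]); // old payload, no longer replayable
        sink.tryEmitNext(new byte[100_000_000]); // latest payload, the only one replayed

        // Only the second array is needed for replay, yet a heap histogram taken here
        // (e.g. jmap -histo:live <pid>) shows both arrays still strongly reachable.
        Thread.sleep(Long.MAX_VALUE);
    }
}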
Possible Solution
Your Environment
doesn't bloody matter