Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leak due to ByteBuffer not being released before garbage collection #3541

Closed
tkaesler opened this issue Jul 21, 2023 · 6 comments
Closed

Leak due to ByteBuffer not being released before garbage collection #3541

tkaesler opened this issue Jul 21, 2023 · 6 comments
Labels
status/need-user-input This needs user input to proceed

Comments

@tkaesler
Copy link

tkaesler commented Jul 21, 2023

When opening a lot of Flux streams and using the .reduce() operator on them, after a while (depending on the system and the frequency with which the streams are created), eventuall leak detection alerts because a ByteBuffer was not released before garbage collection.
Changing the code from using the .reduce() to .collectList().map{it.sum()} results on no leakage.
Funny enough, the data stream is empty in the reproducer.

Expected Behavior

No memory leakage

Actual Behavior

Memory leakage after a certain amount of time.

Steps to Reproduce

Repository with application that reproduces the issue: https://github.com/tkaesler/spring-leak-reproducer
Readme describes setup.

I could not yet reproduce without Spring and the database connection, but am also lacking some knowledge there yet.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jul 21, 2023
@OlegDokuka
Copy link
Contributor

Hi @tkaesler!

In rector we have a separate test-suit which ensures that some operators do not leak. This list includes both collectList and reduce operators (see here).

That said the problem could be in a different place (e.g. spring data or r2dbc-postgress driver).
Please consider simplifying example so it demonstrate the reduce problem explicitly.

Thanks,
Oleh

@OlegDokuka OlegDokuka added the status/need-user-input This needs user input to proceed label Jul 24, 2023
@OlegDokuka
Copy link
Contributor

@tkaesler I'm closing this issues due to no response. Feel free to reopen once you have any input

@OlegDokuka OlegDokuka closed this as not planned Won't fix, can't repro, duplicate, stale Sep 18, 2023
@subham611
Copy link

Hi,
I am facing similar issue while using netty + spring boot webflux
Here is complete stack trace:

{"timestamp":1712654400358,"thread":"reactor-http-epoll-27","logger":"io.netty.util.ResourceLeakDetector","message":"LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:404)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:72)
	org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
	org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:90)
	org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:74)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103)
	reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2909)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
	reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
	reactor.core.publisher.Mono.subscribe(Mono.java:4568)
	reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
	reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
	reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297)
	reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
	reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
	reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
	reactor.core.publisher.Mono.subscribe(Mono.java:4568)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
	reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
	reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
	reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:208)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
	reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:156)
	reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onSubscribe(FluxConcatArray.java:194)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onSubscribe(FluxPeekFuseable.java:471)
	reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
	reactor.core.publisher.Mono.subscribe(Mono.java:4568)
	reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:260)
	reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:79)
	reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
	reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2842)
	reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2573)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103)
	reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2909)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
	reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
	reactor.core.publisher.Mono.subscribe(Mono.java:4568)
	reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
	reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:294)
	reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:188)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
	reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297)
	reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478)
	reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
	reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
	reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
	reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
	reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097)
	reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
	reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)
	reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446)
	reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:687)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:284)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

Java version: 17
Spring boot version: 3.2.4
JVM args passed:

-XX:MaxDirectMemorySize=1024M
-Dio.netty.leakDetection.targetRecords=40
-Dio.netty.leakDetection.level=ADVANCED
-Dlogging.level._reactor.netty.channel.LeakDetection=debug

@chemicL
Copy link
Member

chemicL commented Apr 10, 2024

Please provide a minimal reproducible example and we can reopen if it shows reactor-core's operators are at fault.

@chemicL chemicL removed the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Apr 10, 2024
@valeevr
Copy link

valeevr commented Aug 23, 2024

Hi, i updated versions in example above https://github.com/Dambldore/spring-leak-reproducer and problem still exists:
OpenJDK Runtime Environment Corretto-17.0.8.8.1 (build 17.0.8.1+8-LTS)
Spring boot version: 3.3.2
kotlin: 1.9.24

2024-08-23T11:46:38.878+05:00 ERROR 42980 --- [ctor-tcp-nio-11] io.netty.util.ResourceLeakDetector       : LEAK: DataRow.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.r2dbc.postgresql.message.backend.DataRow.<init>(DataRow.java:37)
	io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:141)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
2024-08-23T11:47:05.932+05:00 ERROR 42980 --- [ctor-tcp-nio-11] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:144)
	io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:108)
	io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:149)
	io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:139)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
2024-08-23T11:47:05.933+05:00 ERROR 42980 --- [ctor-tcp-nio-10] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:298)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readInt(AdvancedLeakAwareByteBuf.java:437)
	io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:148)
	io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:139)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readShort(AdvancedLeakAwareByteBuf.java:413)
	io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:135)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.skipBytes(AdvancedLeakAwareByteBuf.java:533)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:38)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#5:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:401)
	io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:37)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:208)
	reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#6:
	Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
Created at:
	io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:144)
	io.netty.buffer.SimpleLeakAwareByteBuf.retainedSlice(SimpleLeakAwareByteBuf.java:57)
	io.netty.buffer.AdvancedLeakAwareByteBuf.retainedSlice(AdvancedLeakAwareByteBuf.java:96)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.extractFrame(LengthFieldBasedFrameDecoder.java:502)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:440)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
: 1 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
2024-08-23T11:47:05.933+05:00 ERROR 42980 --- [ctor-tcp-nio-11] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:87)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	io.netty.buffer.WrappedByteBuf.release(WrappedByteBuf.java:1037)
	io.netty.buffer.SimpleLeakAwareByteBuf.release(SimpleLeakAwareByteBuf.java:102)
	io.netty.buffer.AdvancedLeakAwareByteBuf.release(AdvancedLeakAwareByteBuf.java:942)
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:298)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedInt(AdvancedLeakAwareByteBuf.java:197)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:468)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:410)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:71)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:455)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:410)
	io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:333)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#4:
	Hint: 'LengthFieldBasedFrameDecoder' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#5:
	Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
#6:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:635)
	io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:356)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:404)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
	io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:833)
: 17 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.

@chemicL
Copy link
Member

chemicL commented Aug 30, 2024

@Dambldore Thanks for the example. I believe you should report this in the r2dbc or spring-data repository.

The difference between Mono.collectList() and Mono.reduce() in your example is that collectList() signals an empty list, while reduce will terminate the Mono without a result. Combining the latter with Mono.zip() for two sources means that the other source will be cancelled once the reduced Mono completes first without value.

The issue is with the cancellation of repository.count(). You can also reproduce this with:

@Scheduled(fixedDelayString = "10")
fun createdCommandCheck() {
    println(
        repository
            .count()
            .timeout(Duration.ofMillis(5)) // cancels the upstream in 5ms if no result yet
            .onErrorReturn(-1)
            .block(Duration.ofMillis(300000))
    )
}

CC @mp911de: this looks like it is related to r2dbc/r2dbc-pool#198 although this time it's data leaks and not connections.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status/need-user-input This needs user input to proceed
Projects
None yet
Development

No branches or pull requests

6 participants