Skip to content

Commit

Permalink
[#11807] Backport: Fix reactor-plugin empty mono and flux
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Dec 10, 2024
1 parent 842db77 commit 494993f
Showing 1 changed file with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void setup(ProfilerPluginSetupContext context) {
logger.info("{} disabled", this.getClass().getSimpleName());
return;
}
logger.info("{} version range=[3.1.0.RELEASE, 3.3.0.RELEASE], config:{}", this.getClass().getSimpleName(), config);
logger.info("{}, config:{}", this.getClass().getSimpleName(), config);

addFluxAndMono();
addThreadingAndSchedulers();
Expand Down Expand Up @@ -205,6 +205,7 @@ private void addFlux() {
addFluxOperatorTransform("reactor.core.publisher.FluxConcatMap");
addFluxOperatorTransform("reactor.core.publisher.FluxConcatMapNoPrefetch");
addFluxOperatorTransform("reactor.core.publisher.FluxContextWrite");
// FluxContextWriteRestoringThreadLocals
addFluxTransform("reactor.core.publisher.FluxCreate");
addFluxOperatorTransform("reactor.core.publisher.FluxDefaultIfEmpty");
addFluxTransform("reactor.core.publisher.FluxDefer");
Expand All @@ -223,7 +224,8 @@ private void addFlux() {
addFluxOperatorTransform("reactor.core.publisher.FluxDoOnEach");
addFluxOperatorTransform("reactor.core.publisher.FluxDoOnEachFuseable");
addFluxOperatorTransform("reactor.core.publisher.FluxElapsed");
addFluxOperatorTransform("reactor.core.publisher.FluxEmpty");
// empty
addFluxEmptyTransform("reactor.core.publisher.FluxEmpty");
addFluxTransform("reactor.core.publisher.FluxError");
addFluxTransform("reactor.core.publisher.FluxErrorOnRequest");
addFluxTransform("reactor.core.publisher.FluxErrorSupplied");
Expand Down Expand Up @@ -375,7 +377,8 @@ private void addMono() {
addMonoOperatorTransform("reactor.core.publisher.MonoDoOnEachFuseable");
addMonoOperatorTransform("reactor.core.publisher.MonoElapsed");
addMonoOperatorTransform("reactor.core.publisher.MonoElementAt");
addMonoTransform("reactor.core.publisher.MonoEmpty");
// empty
addMonoEmptyTransform("reactor.core.publisher.MonoEmpty");
addMonoTransform("reactor.core.publisher.MonoError");
addMonoTransform("reactor.core.publisher.MonoErrorSupplied");
addMonoOperatorTransform("reactor.core.publisher.MonoExpand");
Expand Down Expand Up @@ -428,6 +431,8 @@ private void addMono() {
addMonoOperatorTransform("reactor.core.publisher.MonoRunnable");
addMonoTransform("reactor.core.publisher.MonoSequenceEqual");
addMonoOperatorTransform("reactor.core.publisher.MonoSingle");
// MonoSingleOptional
// MonoSingleOptionalCallable
addMonoOperatorTransform("reactor.core.publisher.MonoSingleCallable");
addMonoOperatorTransform("reactor.core.publisher.MonoSingleMono");
addMonoTransform("reactor.core.publisher.MonoSource");
Expand All @@ -439,8 +444,11 @@ private void addMono() {
addMonoOperatorTransform("reactor.core.publisher.MonoSwitchIfEmpty");
addMonoOperatorTransform("reactor.core.publisher.MonoTakeLastOne");
addMonoOperatorTransform("reactor.core.publisher.MonoTakeUntilOther");
// MonoTap
// MonoTapFuseable
addMonoOperatorTransform("reactor.core.publisher.MonoTimed");
addMonoOperatorTransform("reactor.core.publisher.MonoTimeout");
// MonoToCompletableFuture
addMonoTransform("reactor.core.publisher.MonoUsing");
addMonoTransform("reactor.core.publisher.MonoUsingWhen");
addMonoTransform("reactor.core.publisher.MonoWhen");
Expand Down Expand Up @@ -670,6 +678,21 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
}
}

void addFluxEmptyTransform(String className) {
transformTemplate.transform(className, FluxEmptyTransform.class);
}

public static class FluxEmptyTransform implements TransformCallback {

@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);

return target.toBytecode();
}
}


void addRunnableCoreSubscriberTransform(String className) {
transformTemplate.transform(className, RunnableCoreSubscriberTransform.class);
}
Expand Down Expand Up @@ -860,6 +883,19 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
}
}

void addMonoEmptyTransform(String className) {
transformTemplate.transform(className, MonoEmptyTransform.class);
}

public static class MonoEmptyTransform implements TransformCallback {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);

return target.toBytecode();
}
}

public static class ParallelFluxMethodTransform implements TransformCallback {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
Expand Down

0 comments on commit 494993f

Please sign in to comment.