Non-reactive chat memory crashes response streaming #1120

Open
florian-h05 opened this issue Nov 29, 2024 · 17 comments
Labels
bug Something isn't working

Comments

@florian-h05

florian-h05 commented Nov 29, 2024

When using response streaming in a REST resource, I always receive the following exception when using quarkus-langchain4j-memory-store-redis:

java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-9
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:30)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
        at io.quarkus.redis.runtime.datasource.BlockingStringCommandsImpl.get(BlockingStringCommandsImpl.java:41)
        at io.quarkiverse.langchain4j.memorystore.RedisChatMemoryStore.getMessages(RedisChatMemoryStore.java:33)
        at io.quarkiverse.langchain4j.memorystore.RedisChatMemoryStore_xTEUBJ9I9t7XqF0naKuYsMHw80A_Synthetic_ClientProxy.getMessages(Unknown Source)
        at dev.langchain4j.memory.chat.MessageWindowChatMemory.messages(MessageWindowChatMemory.java:84)
        at io.quarkiverse.langchain4j.runtime.aiservice.AiServiceMethodImplementationSupport.doImplement(AiServiceMethodImplementationSupport.java:150)
        at io.quarkiverse.langchain4j.runtime.aiservice.AiServiceMethodImplementationSupport.implement(AiServiceMethodImplementationSupport.java:131)
        ...

A full reproducer project can be found at https://github.com/florian-h05/quarkus-langchain4j-1120.

How can I fix that? Is there a way to have a reactive chat memory store?

I am using Quarkus 3.17.0 with Quarkus LangChain4j 0.22.0.
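
For readers unfamiliar with this failure mode, here is a minimal plain-Java sketch of what the stack trace above appears to show: a blocking wait that refuses to run on an event-loop thread. It uses neither Quarkus, Mutiny, nor Redis. `BlockingGuardSketch`, `awaitBlocking`, `lookupOn`, and the thread-name check are hypothetical stand-ins chosen for illustration, and the actual check inside Mutiny may work differently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingGuardSketch {

    // Hypothetical guard: refuse to block on threads named like Vert.x event loops.
    static <T> T awaitBlocking(CompletableFuture<T> future) throws Exception {
        String name = Thread.currentThread().getName();
        if (name.startsWith("vert.x-eventloop-thread")) {
            throw new IllegalStateException("The current thread cannot be blocked: " + name);
        }
        return future.get(5, TimeUnit.SECONDS);
    }

    // Performs a blocking lookup on a thread with the given name and
    // returns either the looked-up value or the failure message.
    static String lookupOn(String threadName) throws Exception {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        Callable<String> task = () -> {
            try {
                // "[]" stands in for an empty serialized message list.
                return awaitBlocking(CompletableFuture.completedFuture("[]"));
            } catch (IllegalStateException e) {
                return "failed: " + e.getMessage();
            }
        };
        try {
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lookupOn("vert.x-eventloop-thread-9")); // rejected by the guard
        System.out.println(lookupOn("executor-thread-1"));         // allowed to block
    }
}
```

The same lookup succeeds or fails depending only on which thread performs it, which is why the discussion below focuses on where the memory store is called from.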

florian-h05 changed the title from "Bug: Non-reactive chat memory crashes response streaming" to "Non-reactive chat memory crashes response streaming - need help" on Nov 29, 2024
@geoand
Collaborator

geoand commented Nov 29, 2024

Thanks for reporting.

I will have a look next week.

@cescoffier
Collaborator

That's a good point. We may have to switch to a worker thread when dealing with the memory, or introduce a reactive API.
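
To make the two options concrete, here is a rough plain-Java sketch of the worker-thread approach, expressed as an adapter that exposes a non-blocking interface. `BlockingStore`, `ReactiveStore`, and `offload` are hypothetical names, not LangChain4j or Quarkus types, and `CompletionStage` merely stands in for a reactive type such as Mutiny's `Uni`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MemoryOffloadSketch {

    // Hypothetical stand-in for a blocking chat memory store.
    interface BlockingStore {
        List<String> getMessages(Object memoryId);
    }

    // Hypothetical reactive counterpart: returns immediately, completes later.
    interface ReactiveStore {
        CompletionStage<List<String>> getMessages(Object memoryId);
    }

    // Wraps a blocking store so that every lookup runs on the given worker executor
    // instead of on the calling thread.
    static ReactiveStore offload(BlockingStore store, Executor workers) {
        return memoryId ->
                CompletableFuture.supplyAsync(() -> store.getMessages(memoryId), workers);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            BlockingStore blocking = id ->
                    List.of("history of " + id + " read on " + Thread.currentThread().getName());
            ReactiveStore reactive = offload(blocking, workers);
            reactive.getMessages("user-1")
                    .thenAccept(System.out::println)
                    .toCompletableFuture()
                    .get(5, TimeUnit.SECONDS);
        } finally {
            workers.shutdown();
        }
    }
}
```

In this sketch the caller never blocks; only the worker thread does. A natively reactive store would implement `ReactiveStore` directly and skip the executor hop.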

@florian-h05
Author

IIRC, Quarkus has a reactive Redis client, but I have no idea how to use it while staying compatible with LangChain4j's ChatMemoryStore interface.

BTW, the same issue also occurs when using a RetrievalAugmentor, because the embedding store is queried with a blocking call.

@dennysfredericci
Contributor

Ah, yes, that's a good one. I encountered the same issue last week and, to work around it, I did not use reactive types in the RegisterAiService.

Switching to a worker thread for handling memory operations seems like a good approach.

@geoand
Collaborator

geoand commented Dec 2, 2024

We definitely need to fix this as the current situation is not good at all.

@florian-h05 can you attach a small sample I can use to test various ideas against?

geoand changed the title from "Non-reactive chat memory crashes response streaming - need help" to "Non-reactive chat memory crashes response streaming" on Dec 2, 2024
@florian-h05
Author

florian-h05 commented Dec 2, 2024

I have updated my first message with a ChatMemoryProvider bean, a config, and a Docker Compose file for the Redis store.
According to https://docs.quarkiverse.io/quarkus-langchain4j/dev/ai-services.html#memory, the registered AI service should automatically make use of this. You need to have the quarkus-langchain4j-memory-store-redis extension installed.

Is that enough, or should I spin up a small sample project repo?

BTW, I am in fact not using @RegisterAiService in production (though I have verified that the issue exists with it as well). Instead, I create my AI services through AiServices.builder(), because I need different implementations of my AI service, with different chat language models, configurable at runtime.

@geoand
Collaborator

geoand commented Dec 2, 2024

Is that enough, or should I spin up a small sample project repo?

Please, do.

Thanks

@florian-h05
Author

@geoand I have a minimal sample project to reproduce the issue: https://github.com/florian-h05/quarkus-langchain4j-1120

@geoand
Collaborator

geoand commented Dec 2, 2024

🙏🏽

@geoand
Collaborator

geoand commented Dec 2, 2024

I'll have a closer look tomorrow, but for the time being any solution we come up with won't be pretty.

@geoand
Collaborator

geoand commented Dec 2, 2024

The simplest way around it for the time being is to add @Blocking on RestResource.

@florian-h05
Author

florian-h05 commented Dec 2, 2024

I still receive the same exception, even though Quarkus Dev UI confirms that the request should be executed on a worker thread:

java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-10
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:30)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
        at io.quarkus.redis.runtime.datasource.BlockingStringCommandsImpl.get(BlockingStringCommandsImpl.java:41)
        at io.quarkiverse.langchain4j.memorystore.RedisChatMemoryStore.getMessages(RedisChatMemoryStore.java:33)
        at io.quarkiverse.langchain4j.memorystore.RedisChatMemoryStore_xTEUBJ9I9t7XqF0naKuYsMHw80A_Synthetic_ClientProxy.getMessages(Unknown Source)
        at dev.langchain4j.memory.chat.MessageWindowChatMemory.messages(MessageWindowChatMemory.java:84)
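
One thing worth checking, although nothing in this thread confirms it as the cause, is whether the memory lookup is deferred: work wrapped in a lazy pipeline runs on whichever thread later triggers it, not on the thread that assembled the pipeline. The plain-Java sketch below illustrates only that general effect. `DeferredWorkSketch`, `deferredLookup`, and `callOn` are hypothetical names, and a `Supplier` stands in for a lazy stream such as a `Multi`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class DeferredWorkSketch {

    // Builds a deferred lookup; the body does not run until get() is called.
    static Supplier<String> deferredLookup() {
        return () -> "lookup ran on " + Thread.currentThread().getName();
    }

    // Runs the task on a freshly created thread with the given name.
    static <T> T callOn(String threadName, Callable<T> task) throws Exception {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // The "endpoint" runs on a worker thread but only assembles the pipeline.
        Supplier<String> pipeline =
                callOn("executor-thread-1", DeferredWorkSketch::deferredLookup);
        // The "subscriber" later triggers the work from an event-loop-like thread.
        System.out.println(callOn("vert.x-eventloop-thread-10", pipeline::get));
    }
}
```

If something similar happens here, the endpoint method itself could run on a worker thread, as Dev UI reports, while the store is still queried from an event-loop thread.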

@geoand
Collaborator

geoand commented Dec 2, 2024

It works just fine in this simplified scenario:

diff --git a/src/main/java/com/florianhotze/quarkus/langchain4j/Assistant.java b/src/main/java/com/florianhotze/quarkus/langchain4j/Assistant.java
index c2b81ef..93dd8c9 100644
--- a/src/main/java/com/florianhotze/quarkus/langchain4j/Assistant.java
+++ b/src/main/java/com/florianhotze/quarkus/langchain4j/Assistant.java
@@ -1,13 +1,10 @@
 package com.florianhotze.quarkus.langchain4j;
 
-import dev.langchain4j.service.MemoryId;
-import dev.langchain4j.service.TokenStream;
 import dev.langchain4j.service.UserMessage;
 import io.quarkiverse.langchain4j.RegisterAiService;
+import io.smallrye.mutiny.Multi;
 
 @RegisterAiService
 public interface Assistant {
-    TokenStream chat(@UserMessage String prompt);
-
-    TokenStream chat(@MemoryId Object memoryId, @UserMessage String prompt);
+    Multi<String> chat(@UserMessage String prompt);
 }
diff --git a/src/main/java/com/florianhotze/quarkus/langchain4j/RestResource.java b/src/main/java/com/florianhotze/quarkus/langchain4j/RestResource.java
index 2dc030e..e9a355b 100644
--- a/src/main/java/com/florianhotze/quarkus/langchain4j/RestResource.java
+++ b/src/main/java/com/florianhotze/quarkus/langchain4j/RestResource.java
@@ -1,5 +1,6 @@
 package com.florianhotze.quarkus.langchain4j;
 
+import io.smallrye.common.annotation.Blocking;
 import io.smallrye.mutiny.Multi;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.Consumes;
@@ -37,18 +38,8 @@ class RestResource {
             @Content(
                     mediaType = MediaType.SERVER_SENT_EVENTS,
                     schema = @Schema(implementation = String.class)))
+    @Blocking
     public Multi<String> streamingPrompt(String prompt) {
-        Multi<String> sourceMulti =
-                Multi.createFrom()
-                        .emitter(
-                                emitter ->
-                                        assistant
-                                                .chat(prompt)
-                                                .onNext(emitter::emit)
-                                                .onError(emitter::fail)
-                                                .onComplete(response -> emitter.complete())
-                                                .start());
-
-        return RestMulti.fromMultiData(sourceMulti).withDemand(1).status(200).build();
+        return RestMulti.fromMultiData(assistant.chat(prompt)).withDemand(1).status(200).build();
     }
 }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 85ce614..633366c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,2 +1,2 @@
-quarkus.langchain4j.memorystore.redis.client-name=chat-memory
-quarkus.redis.chat-memory.hosts=redis://localhost:6379/0
+#quarkus.langchain4j.memorystore.redis.client-name=chat-memory
+#quarkus.redis.chat-memory.hosts=redis://localhost:6379/0

geoand added the bug label on Dec 2, 2024
@cescoffier
Collaborator

I think it's because you added the @Blocking.

@geoand
Collaborator

geoand commented Dec 2, 2024

Yes, I know, that's my point: for the time being, adding @Blocking on the SSE endpoint should get around the problem.

@florian-h05
Author

It works just fine in this simplified scenario:

I have applied your diff in the reproducer repo, but it did not fix the issue for me. I don't understand why not.
I have just invited you to the reproducer repo; can you please push your changes to a new branch there?

@florian-h05
Author

Thanks for pushing your changes.
Just tried this on a different machine, still doesn't work for me.
