Skip to content

Commit

Permalink
Add cancelStream function.
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcenizal committed Jun 6, 2024
1 parent 91e79de commit 8e46fb8
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
9 changes: 8 additions & 1 deletion docs/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ const App = () => {
}
};

streamQueryV2(configurationOptions, onStreamEvent);
const { cancelStream } = await streamQueryV2(
configurationOptions,
onStreamEvent
);

setTimeout(() => {
cancelStream();
}, 2000);
};

return (
Expand Down
2 changes: 1 addition & 1 deletion src/apiV1/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export const streamQueryV1 = async (

const url = config.endpoint ?? `${DEFAULT_DOMAIN}/v1/stream-query`;

const stream = await generateStream(requestHeaders, requestBody, url);
const { stream } = await generateStream(requestHeaders, requestBody, url);

let previousAnswerText = "";

Expand Down
29 changes: 22 additions & 7 deletions src/apiV2/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,32 @@ export const streamQueryV2 = async (
const url = `${endpoint ?? DEFAULT_DOMAIN}${path}`;

try {
const stream = await generateStream(headers, JSON.stringify(body), url);
const buffer = new EventBuffer(onStreamEvent);
const { cancelStream, stream } = await generateStream(
headers,
JSON.stringify(body),
url
);

for await (const chunk of stream) {
new Promise(async (resolve, reject) => {
try {
buffer.consumeChunk(chunk);
buffer.drainEvents();
const buffer = new EventBuffer(onStreamEvent);

for await (const chunk of stream) {
try {
buffer.consumeChunk(chunk);
buffer.drainEvents();
} catch (error) {
console.log("error", error);
}
}

resolve(undefined);
} catch (error) {
console.log("error", error);
reject(error);
}
}
});

return { cancelStream };
} catch (error) {
console.log("error", error);
}
Expand Down
12 changes: 10 additions & 2 deletions src/common/generateStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ export const generateStream = async (
headers: Record<string, string>,
body: string,
url: string
): Promise<AsyncIterable<string>> => {
) => {
let controller = new AbortController();

const response = await fetch(url, {
method: "POST",
headers,
body,
signal: controller.signal,
});

if (response.status !== 200) throw new Error(response.status.toString());
if (!response.body) throw new Error("Response body does not exist");
return getIterableStream(response.body);

return {
stream: getIterableStream(response.body),
cancelStream: () => controller.abort(),
};
};

async function* getIterableStream(
Expand Down

0 comments on commit 8e46fb8

Please sign in to comment.