Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yet Another V2 Assistant Streaming Implemenetation #748

Open
wants to merge 21 commits into
base: master
Choose a base branch
from

Conversation

hayeah
Copy link

@hayeah hayeah commented May 19, 2024

This PR implements Assistant Streaming for the OpenAI API V2 (BETA).

https://platform.openai.com/docs/api-reference/assistants-streaming

Key Features

  • No additional goroutines are created.
  • Provides a bufio.Scanner-inspired API for accessing stream events.
  • Implements type-safe processing of stream events through type switching.
  • Includes convenience unwrappers for common event types.
  • The streamer is an io.Reader, simplifying text streaming and integration with Go's standard libraries.

This PR is built on the excellent work of #737. The main departure from @coolbaluk's work is to improve the streaming response type AssistantStreamEvent to handle the polymorphic nature of the stream events better.

Work in progress. Would love to seek feedback & suggestions from the community.

Tagging:

@coolbaluk #737

@tanzyy96

@CallOrRet #731

@sashabaranov

Stream Events as Scanner

The StreamerV2 struct Scan stream events in a loop as they are received from the server.

Unlike the current implemenetation of the CompletionStreaming, he events in the stream are polymorphic and are handled naturally through type switching.

The Scan API mimics the bufio.Scanner class:

var oa *openai.Client
var req openai.CreateThreadAndRunRequest

stream, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
    return err
}
defer stream.Close()

// read the stream events until there is no Next
for stream.Next() {
    // get the current event
    event := stream.Event()

    // type switch on the event type
    switch event := event.(type) {
    case openai.StreamThreadMessageDelta:
        // type safe access to the event
        fmt.Println("Message:", event.Delta.Content)
        for _, content := range event.Delta.Content {
            if content.Text != nil {
                fmt.Println("Text:", content.Text.Value)
            }

            if content.ImageFile != nil {
                fmt.Println("ImageFile:", content.ImageFile)
            }
        }
    case openai.StreamDone:
        fmt.Println("Done")
    }
}

// check for streaming error
if err := stream.Err(); err != nil {
		return err
}

Unwrappers for Polymporphic Events

Type switching on stream events allows precise and type-safe access. However, it can be tedious to write user-level code for simpler cases. To address this, unwrapper helpers "cast" an event to a specific type and return the value if the cast is successful:

for stream.Next() {
    // we are only interested in the text deltas
    text, ok := stream.MessageDeltaText()
    if !ok {
        // skip this event if it is not a text delta
        continue
    }

    fmt.Print(text)
}

This is similar to the Bytes and Text methods in bufio.Scanner, which provide access to the current item in different forms:

scanner.Bytes()
scanner.Text()

Libraries designed to handle polymorphic data often provide similar unwrapper helpers. For example, the [gjson]https://github.com/tidwall/gjson?tab=readme-ov-file#result-type) library, offer these unwrapper methods for the polymorphic Result type:

result.Exists() bool
result.Value() interface{}
result.Int() int64
result.Uint() uint64
result.Float() float64
result.String() string
result.Bool() bool
result.Time() time.Time

Stream Events as io.Reader

The most common use case is to just stream the text, and it would be nice to
have a familiar & obvious API for this.

The StreamerV2 struct implements the io.Reader interface, which wraps the
Next/Scan API, to provide a simple way read the text deltas from the
thread.message.delta events.

var oa *openai.Client
var req openai.CreateThreadAndRunRequest

// The StreamerV2 is also an io.Reader
s, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
    return err
}
defer s.Close()

_, err = io.Copy(os.Stdout, s)
return err

TODO Items

  • Naming convention for the stream events? There are quite a few... should they be put in a v2 package?
  • Fuzz the new SSE reader.
  • Complete the type mappings for all the stream events.
  • Integrate Stream into other methods like CreateThreadAndStream.
  • Make a common interface for stream events to return Type()

@hayeah hayeah force-pushed the asst-stream branch 2 times, most recently from 6827844 to f8d19ae Compare May 19, 2024 03:36
Copy link

codecov bot commented May 19, 2024

Codecov Report

Attention: Patch coverage is 84.12698% with 30 lines in your changes are missing coverage. Please review.

Project coverage is 96.61%. Comparing base (774fc9d) to head (f4e1603).
Report is 15 commits behind head on master.

Current head f4e1603 differs from pull request most recent head f8c9b69

Please upload reports for the commit f8c9b69 to get more accurate results.

Files Patch % Lines
run.go 76.92% 8 Missing and 7 partials ⚠️
stream_v2.go 82.75% 8 Missing and 2 partials ⚠️
sse.go 92.42% 3 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #748      +/-   ##
==========================================
- Coverage   98.46%   96.61%   -1.85%     
==========================================
  Files          24       26       +2     
  Lines        1364     1329      -35     
==========================================
- Hits         1343     1284      -59     
- Misses         15       28      +13     
- Partials        6       17      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants