Skip to content

Commit

Permalink
Drain previous values on ReadTask
Browse files Browse the repository at this point in the history
  • Loading branch information
alxarch committed Feb 8, 2019
1 parent 241a083 commit 996cf18
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (p Pipeline) RunTask(ctx context.Context, task StreamTask) Pipeline {

func ReadFromTask(r io.Reader, format Format) StreamTask {
return StreamFunc(func(s Stream) error {
if err := Drain(s); err != nil {
return err
}
dec := NewDecoder(r, format)
for {
v := new(Value)
Expand All @@ -122,6 +125,7 @@ func ReadFromTask(r io.Reader, format Format) StreamTask {
// println("read err", err.Error())
return err
}

if !s.Push(v) {
// println("push failed")
return nil
Expand Down

0 comments on commit 996cf18

Please sign in to comment.