Skip to content

Commit

Permalink
Fix goroutine deadlock issues
Browse files Browse the repository at this point in the history
  • Loading branch information
alxarch committed Feb 8, 2019
1 parent 996cf18 commit 2333537
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 46 deletions.
67 changes: 40 additions & 27 deletions arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ycat
import (
"errors"
"fmt"
"os"
"io"
"strconv"
"strings"

Expand Down Expand Up @@ -109,24 +109,20 @@ func (p *argParser) parseLong(name, value string, argv []string) ([]string, erro
return argv, err
}
eval := EvalTask(p.vm, e.Bind, filename, snippet)
if input := p.inputTask(); len(input) == 0 {
p.tasks = append(p.tasks, eval)
} else {
p.tasks = append(p.tasks, input, eval)
}
p.addTask(eval)
case "output":
value, argv = shiftArgV(value, argv)
if p.output = OutputFromString(value); p.output == OutputInvalid {
return argv, fmt.Errorf("Invalid output format: %q", value)
}
case "null":
p.input = append(p.input[:0], NullStream{})
p.input = append(p.input, NullStream{})
case "to-json":
p.output = OutputJSON
case "help":
p.help = true
case "array":
p.tasks = append(p.tasks, ToArray{})
p.addTask(ToArray{})
case "yaml":
return p.parseFiles(value, argv, YAML), nil
case "json":
Expand All @@ -137,12 +133,13 @@ func (p *argParser) parseLong(name, value string, argv []string) ([]string, erro
return argv, nil
}

// func (p *argParser) lastTask() StreamTask {
// if len(p.tasks) > 0 {
// return p.tasks[len(p.tasks)-1]
// }
// return nil
// }
func (p *argParser) addTask(t StreamTask) {
if input := p.inputTask(); input == nil {
p.tasks = append(p.tasks, t)
} else {
p.tasks = append(p.tasks, input, t)
}
}

func (p *argParser) parseShort(a string, argv []string) ([]string, error) {
for ; len(a) > 0; a = a[1:] {
Expand Down Expand Up @@ -206,6 +203,8 @@ func isOption(a string) bool {

type argParser struct {
vm *jsonnet.VM
stdin io.Reader
stdout io.WriteCloser
eval *Eval
output Output
input []StreamTask
Expand All @@ -215,12 +214,24 @@ type argParser struct {
}

func (p *argParser) addFile(path string, format Format) {
f := InputFile{format, path}
p.input = append(p.input, &f)
if format == Auto {
format = DetectFormat(path)
}
switch path {
case "", "-":
// Handle here to be able to test stdin
p.input = append(p.input, ReadFromTask(p.stdin, format))
default:
f := InputFile{format, path}
p.input = append(p.input, &f)
}
}

func ParseArgs(argv []string) ([]StreamTask, bool, error) {
p := argParser{}
func ParseArgs(argv []string, stdin io.Reader, stdout io.WriteCloser) ([]StreamTask, bool, error) {
p := argParser{
stdin: stdin,
stdout: stdout,
}
if err := p.Parse(argv); err != nil {
return nil, false, err
}
Expand All @@ -237,7 +248,7 @@ func (p *argParser) Parse(argv []string) (err error) {
if len(a) == 2 {
// Special -- arg
for _, a := range argv {
p.addFile(a, DetectFormat(a))
p.addFile(a, Auto)
}
return
}
Expand All @@ -254,18 +265,20 @@ func (p *argParser) Parse(argv []string) (err error) {
}
func (p *argParser) Tasks() (tasks []StreamTask) {
tasks = append(tasks, p.tasks...)
if len(tasks) == 0 {
tasks = append(tasks, p.inputTask())
if task := p.inputTask(); task != nil {
tasks = append(tasks, task)
} else if len(tasks) == 0 {
tasks = append(tasks, ReadFromTask(p.stdin, YAML))
}

return append(tasks, StreamWriteTo(os.Stdout, p.output))
tasks = append(tasks, StreamWriteTo(p.stdout, p.output))
return tasks
}

func (p *argParser) inputTask() (s StreamTaskSequence) {
func (p *argParser) inputTask() (s StreamTask) {
if p.input == nil {
return append(s, ReadFromTask(os.Stdin, YAML))
return nil
}
s = append(s, p.input...)
p.input = p.input[:0]
s = StreamTaskSequence(p.input)
p.input = nil
return
}
67 changes: 67 additions & 0 deletions arguments_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ycat_test

import (
"bytes"
"context"
"io"
"strings"
"testing"

"github.com/alxarch/ycat"
)

func fullTest(t *testing.T) {
t.Helper()

}

type nopCloser struct {
io.Writer
}

func (nopCloser) Close() error {
return nil
}
func TestParseArgs(t *testing.T) {
type TestCase struct {
Args []string
Help bool
Err bool
NumTasks int
Stdin string
Output string
}
tcs := []TestCase{
{nil, false, false, 2, "1", "1\n"},
{[]string{"testdata/foo.yaml"}, false, false, 2, "", "foo: bar\n"},
{[]string{"testdata/foo.yaml", "-e", `x + {bar: "baz"}`}, false, false, 2, "", "foo: bar\nbar: baz\n"},
// {[]string{""}, false, false, 2, "1", "1\n"},
}
for i, tc := range tcs {
buf := &bytes.Buffer{}
stdout := &nopCloser{buf}
stdin := strings.NewReader(tc.Stdin)
tasks, help, err := ycat.ParseArgs(tc.Args, stdin, stdout)
if err != nil {
t.Fatal(i, err)
}
if len(tasks) != tc.NumTasks {
t.Errorf("%s Invalid tasks %d != %d", tc.Args, len(tasks), tc.NumTasks)
}
if help != tc.Help {
t.Errorf("Invalid help")
}

p := ycat.BlankPipeline()
p = p.Pipe(context.Background(), tasks...)
for err := range p.Errors() {
if err != nil {
t.Error(err)
}
}
if buf.String() != tc.Output {
t.Errorf("%#v Wrong output: %q != %q", tc.Args, buf.String(), tc.Output)
}
}

}
2 changes: 1 addition & 1 deletion cmd/ycat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func printUsage(err error) {
}

func main() {
tasks, help, err := ycat.ParseArgs(os.Args[1:])
tasks, help, err := ycat.ParseArgs(os.Args[1:], os.Stdin, os.Stdout)
if err != nil {
printUsage(err)
os.Exit(2)
Expand Down
39 changes: 22 additions & 17 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ type InputFile struct {
Path string
}

type InputFiles []InputFile

func (files InputFiles) Run(s Stream) error {
for i := range files {
f := &files[i]
if err := f.Run(s); err != nil {
return err
}
}
return nil

}
func (InputFiles) Size(ctx context.Context) int {
return 0
}

type Output int

const (
Expand Down Expand Up @@ -96,25 +112,14 @@ func NewEncoder(w io.Writer, format Output) Encoder {
}

func (f *InputFile) Size(_ context.Context) int {
return 2
return 0
}

func (f *InputFile) Run(s Stream) error {
format := f.Format
if format == Auto {
format = DetectFormat(f.Path)
}
var r io.Reader
switch f.Path {
case "", "-":
r = os.Stdin
default:
f, err := os.Open(f.Path)
if err != nil {
return err
}
defer f.Close()
r = f
r, err := os.Open(f.Path)
if err != nil {
return err
}
return ReadFromTask(r, format).Run(s)
defer r.Close()
return ReadFromTask(r, f.Format).Run(s)
}
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (s *stream) Push(v *Value) bool {
case s.out <- v:
return true
case <-s.done:
println("push s.done", false)
return false
}
}
Expand Down Expand Up @@ -111,6 +110,7 @@ func (p Pipeline) RunTask(ctx context.Context, task StreamTask) Pipeline {

func ReadFromTask(r io.Reader, format Format) StreamTask {
return StreamFunc(func(s Stream) error {
// Needed for reading in the middle of a pipeline
if err := Drain(s); err != nil {
return err
}
Expand Down

0 comments on commit 2333537

Please sign in to comment.