-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
88 lines (69 loc) · 1.95 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package gearman // import "github.com/nathanaelle/gearman/v2"
import (
"context"
"log"
"github.com/nathanaelle/gearman/v2/protocol"
)
type (
// Client define the exposed interface of a gearman Client
Client interface {
AddServers(...Conn)
Submit(Task) Task
Close() error
AssignTask(TaskID)
GetTask(TaskID) Task
ExtractTask(TaskID) Task
Receivers() (<-chan Message, context.Context)
}
)
func clientLoop(c Client, dbg *log.Logger) {
var tid TaskID
var err error
mQueue, ctx := c.Receivers()
for {
select {
case msg, done := <-mQueue:
if msg.Pkt == nil {
if done {
return
}
debug(dbg, "CLI CORRUPTED MESSAGE \t%#v\n", msg)
continue
}
debug(dbg, "CLI\t%s\n", msg.Pkt)
switch msg.Pkt.Cmd() {
case protocol.Noop:
case protocol.EchoRes:
debug(dbg, "CLI\tECHO [%s]\n", string(msg.Pkt.At(0).Bytes()))
case protocol.Error:
debug(dbg, "CLI\tERR [%s] [%s]\n", msg.Pkt.At(0).Bytes(), string(msg.Pkt.At(1).Bytes()))
case protocol.JobCreated:
if err = tid.Cast(msg.Pkt.At(0)); err != nil {
debug(dbg, "CLI\tprotocol.JobCreated TID [%s] err : %v\n", string(msg.Pkt.At(0).Bytes()), err)
panic(err)
}
c.AssignTask(tid)
case protocol.WorkData, protocol.WorkWarning, protocol.WorkStatus:
if err = tid.Cast(msg.Pkt.At(0)); err != nil {
debug(dbg, "CLI\t%s TID [%s] err : %v\n", msg.Pkt.Cmd(), string(msg.Pkt.At(0).Bytes()), err)
panic(err)
}
c.GetTask(tid).Handle(msg.Pkt)
case protocol.WorkComplete, protocol.WorkFail, protocol.WorkException:
if err = tid.Cast(msg.Pkt.At(0)); err != nil {
debug(dbg, "CLI\t%s TID [%s] err : %v\n", msg.Pkt.Cmd(), string(msg.Pkt.At(0).Bytes()), err)
panic(err)
}
c.ExtractTask(tid).Handle(msg.Pkt)
case protocol.StatusRes:
panic("status_res not wrote")
case protocol.OptionRes:
panic("option_res not wrote")
default:
debug(dbg, "CLI\t%s\n", msg.Pkt)
}
case <-ctx.Done():
return
}
}
}