-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.go
127 lines (105 loc) · 2.22 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package hansip
import (
"errors"
"sync/atomic"
"time"
"github.com/go-pg/pg"
)
var errPingTimeout = errors.New("ping timeout")
// connection abstracts connection to a database server.
// it handles connection updates by pinging the server every connTickDelay.
type connection struct {
host string
s sql
pingTimeout time.Duration
connCheckDelay time.Duration
pingFn func() error
pingRunning int32
closeFn func()
// 1 for connected, 0 for not
connected int32
closed bool
quitChan chan struct{}
}
// create a new connection instance
// and start loop in background to update connection status
func newConnection(options *pg.Options, pingTimeout, connCheckDelay time.Duration) (*connection, error) {
db := pg.Connect(options)
conn := &connection{
host: options.Addr,
s: &gopgSQL{
db: db,
},
pingTimeout: pingTimeout,
connCheckDelay: connCheckDelay,
quitChan: make(chan struct{}),
pingFn: func() error {
_, err := db.Exec("select 1;")
return err
},
closeFn: func() {
db.Close()
},
}
// check if connection is working
if err := conn.ping(); err != nil {
return nil, err
}
conn.updateStatus()
// start main loop
go conn.loop()
return conn, nil
}
func (c *connection) ping() error {
if !atomic.CompareAndSwapInt32(&c.pingRunning, 0, 1) {
return nil
}
defer atomic.StoreInt32(&c.pingRunning, 0)
errChan := make(chan error)
go func() {
errChan <- c.pingFn()
}()
select {
case <-time.After(c.pingTimeout):
return errPingTimeout
case err := <-errChan:
return err
}
}
func (c *connection) getConnected() bool {
return atomic.LoadInt32(&c.connected) == 1
}
func (c *connection) setConnected(connected bool) {
if connected {
atomic.StoreInt32(&c.connected, 1)
} else {
atomic.StoreInt32(&c.connected, 0)
}
}
func (c *connection) loop() {
ticker := time.NewTicker(c.connCheckDelay)
for {
select {
case <-ticker.C:
c.updateStatus()
case <-c.quitChan:
return
}
}
}
func (c *connection) updateStatus() {
connected := c.ping() == nil
c.setConnected(connected)
}
func (c *connection) quit() {
if c.closed {
return
}
// stop loop
c.quitChan <- struct{}{}
if c.closeFn != nil {
c.closeFn()
}
c.setConnected(false)
c.closed = true
}