-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
257 lines (194 loc) · 7.36 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import time
import threading
from functools import wraps
import psycopg2
import psycopg2.extras
import psycopg2.extensions
from flask import g
# Default exports
__all__ = ('configure_flask', 'configure_flask_socketio')
###
# Flask + psycopg helpers
###
def configure_flask(flask_app, params, idle_timeout_secs=30, register_types=True):
###
# Configure db access for a regular (non-socketio) flask app.
# Set up DB-oriented before_first_request(), before_request(), and
# teardown_request() lifecycle hooks in order to deal with
# connection acquisition and placement as 'g.con', request boundary
# is a db transaction boundary, and ensuring a rollback if the
# flask request exceptions out (in production anyway, see notes
# about bad interaction with flask debugger up in
# ManagedConnection.begin_trasaction()).
###
global mc
configure(params, timeout_secs=idle_timeout_secs)
flask_app.before_first_request(mc.start_closing_thread)
if register_types:
flask_app.before_first_request(register_composite_types)
def begin_and_assign_tx():
g.con = mc.begin_transaction()
flask_app.before_request(begin_and_assign_tx)
def finish_transaction(response):
g.pop('con', None)
mc.complete_transaction()
return response
flask_app.teardown_request(finish_transaction)
@flask_app.errorhandler(500)
def error_500(error):
mc.set_rollback_only()
raise error
def configure_flask_socketio(params, register_types=True,
idle_timeout_secs=30):
global mc
configure(params, timeout_secs=idle_timeout_secs)
# flask-socketio does not fire before_first_request(),
# before_request(), or teardown_request(), so less can be
# done here in lieu of more decorators around the socketio
# event handlers.
mc.start_closing_thread()
if register_types:
register_composite_types()
return mc
####
# Internals from here on out
####
class ManagedConnection():
# Singleton class managing db connection, transaction state,
# and auto-closing the db connection after 30sec inactivity.
def __init__(self, params, cursor_factory, timeout_secs=30):
self.params = params
self.cursor_factory = cursor_factory
self.timeout_secs = timeout_secs
self.con = None
self.busy = False
self.last_used = None
self.commit_after_complete = True
def start_closing_thread(self):
def close_when_idle():
while True:
time.sleep(self.timeout_secs)
if not self.busy \
and self.con \
and time.time() > self.last_used + self.timeout_secs:
self.close()
threading.Thread(target=close_when_idle).start()
def close(self):
if self.con:
self.con.close()
self.con = None
self.busy = False
def begin_transaction(self):
if not self.con:
self.con = self.__connection()
if self.busy:
# Wacky! Holdover from bad interaction with flask debugger
# in devel mode and hitting a caught exception (in debugger)
# on the prior request. Grr. The flask debugger is deeper in
# flask wsgi server than our "with_transaction()" decorator,
# so it doesn't have a chance to rollback itself.
self.con.rollback()
self.busy = True
self.last_used = time.time()
return self.con
def set_rollback_only(self):
# Indicate that the only way this TX should end is
# via rollback, not commit. Observed by complete_transaction()
self.commit_after_complete = False
def complete_transaction(self):
if self.con:
if self.commit_after_complete:
self.con.commit()
else:
self.con.rollback()
self.commit_after_complete = True # clear it for next request.
self.busy = False
def with_transaction(self, func,):
"""
Decorator to use around web dispatched functions,
useful for socketio event handlers, since flask's
more natural way (before_request hooks) don't fire
within a flask_socketio app. Grr.
"""
@wraps(func)
def doit(*args):
con = self.begin_transaction()
try:
func(con, *args)
except Exception as e:
self.set_rollback_only()
raise e
finally:
self.complete_transaction()
return doit
def __connection(self):
return psycopg2.connect(self.params,
cursor_factory=self.cursor_factory)
# The singleton instance.
mc = None
def configure(params, timeout_secs=30,
cursor_factory=psycopg2.extras.NamedTupleCursor,
connect=False):
global mc
mc = ManagedConnection(params, timeout_secs=timeout_secs,
cursor_factory=cursor_factory)
if connect:
return mc.begin_transaction()
def connection():
return mc.begin_transaction()
def flask_connection():
global mc
if '_con' not in g:
g._con = mc.begin_transaction()
return g._con
def register_composite_types():
###
# Teach psycopg about any custom type oids hinted at in custom
# recipe table 'psycopg_types', if needed in this project.
#
# Needed for both 'array of domain' and custom composite type
# support, queries containing either of which will be returning
# novel / surprising oids. So here we hint psycopg2 how to deal
# with them.
###
con = mc.begin_transaction()
typeconverters = [o for o in psycopg2.__dict__.values() if
isinstance(o, type(psycopg2.STRING))]
oid_to_typeconverter = {}
for tc in typeconverters:
for oid in tc.values:
oid_to_typeconverter[oid] = tc
cur = con.cursor()
cur.execute('''
select
pt.type_to_register,
pgt.typcategory,
pgt.typarray,
pgt.typbasetype,
pgt_base.typname
from psycopg_types pt
join pg_catalog.pg_type pgt
left join pg_catalog.pg_type pgt_base
on (pgt.typbasetype = pgt_base.oid
and pgt.typcategory='S')
on pt.type_to_register = pgt.oid;
''')
for c in cur.fetchall():
if c.typcategory == 'C':
# Composite type
psycopg2.extras.register_composite(c.type_to_register,
con, globally=True)
else:
assert c.typcategory == 'S', \
'Do not understand type category %s' % c.typcategory
base_type_adaptor = oid_to_typeconverter.get(c.typbasetype)
if not base_type_adaptor:
raise Exception('Unknown type adaptor for %s'
' for domain array type %s'
% (c.typname, c.type_to_register))
psycopg2.extensions.register_type(
psycopg2.extensions.new_array_type(
(c.typarray,), c.type_to_register + '[]',
base_type_adaptor))
cur.close()
mc.complete_transaction()