-
Notifications
You must be signed in to change notification settings - Fork 8
/
sql.py
130 lines (105 loc) · 4.15 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
import logging
from contextlib import contextmanager
from jsonpickle import encode, decode
from typing import Any
from sqlalchemy import (
Table, MetaData, Column, Integer, String,
ForeignKey, create_engine, select)
from sqlalchemy.orm import mapper, sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from errbot.storage.base import StorageBase, StoragePluginBase
# Module-level logger for this storage backend.
log = logging.getLogger('errbot.storage.sql')
# Key of the storage configuration entry that holds the database connection URL.
DATA_URL_ENTRY = 'data_url'
class KV:
    """Key/value record whose value is held as a jsonpickle-encoded string.

    The raw data lives in the ``_key`` and ``_value`` attributes; ``key`` and
    ``value`` are read-only views on them, ``value`` being decoded on every
    access.
    """

    def __init__(self, key: str, value: Any):
        """Store ``key`` as given and ``value`` in its encoded form.

        :param key: identifier of the entry.
        :param value: object to keep; it is passed through ``encode`` here.
        """
        self._key = key
        # Serialize eagerly so only the encoded text is kept on the instance.
        serialized = encode(value)
        self._value = serialized

    @property
    def key(self) -> str:
        """The key exactly as stored in ``_key``."""
        return self._key

    @property
    def value(self) -> Any:
        """The object obtained by decoding the stored ``_value``."""
        stored = self._value
        return decode(stored)
class SQLStorage(StorageBase):
    """Key/value storage that reads and writes one mapped class through a session.

    Every operation goes through ``_session_op`` so that it is committed on
    success and rolled back on failure.
    """

    def __init__(self, session, clazz):
        """
        :param session: session object used for every operation of this storage.
        :param clazz: mapped class; it is filtered on ``_key``, read through
            ``key`` / ``value`` and instantiated as ``clazz(key, value)``.
        """
        self.session = session
        self.clazz = clazz

    @contextmanager
    def _session_op(self):
        """Yield the session; commit when the block succeeds, roll back otherwise.

        Any exception raised inside the block (or by the commit itself) is
        re-raised after the rollback.
        """
        try:
            yield self.session
            self.session.commit()
        except BaseException:
            # Explicit spelling of the former bare ``except``: roll back no
            # matter what went wrong, then let the error propagate unchanged.
            self.session.rollback()
            raise

    def get(self, key: str) -> Any:
        """Return the decoded value stored under ``key``.

        :raises KeyError: if no entry exists for ``key``.
        """
        try:
            with self._session_op() as session:
                # Read ``value`` inside the block, i.e. before the commit.
                result = session.query(self.clazz).filter(self.clazz._key == key).one().value
        except NoResultFound:
            raise KeyError("%s doesn't exists." % key)
        return result

    def remove(self, key: str):
        """Delete the entry stored under ``key``.

        :raises KeyError: if no entry exists for ``key``.
        """
        with self._session_op() as session:
            # A bulk delete reports how many rows it matched instead of raising
            # ``NoResultFound``, so a missing key has to be detected from the
            # count.
            deleted = session.query(self.clazz).filter(self.clazz._key == key).delete()
        if deleted == 0:
            raise KeyError("%s doesn't exists." % key)

    def set(self, key: str, value: Any) -> None:
        """Insert ``key`` or overwrite its value (the new record is merged)."""
        with self._session_op() as session:
            session.merge(self.clazz(key, value))

    def len(self):
        """Return the number of stored entries."""
        with self._session_op() as session:
            length = session.query(self.clazz).count()
        return length

    def keys(self):
        """Return an iterator over all stored keys."""
        with self._session_op() as session:
            # Collect the keys inside the block so that a failing query is
            # rolled back like in every other operation.
            found = [kv.key for kv in session.query(self.clazz).all()]
        return iter(found)

    def close(self) -> None:
        """Commit pending work, then close the session."""
        try:
            self.session.commit()
        finally:
            # Close even when the commit fails so the session does not keep
            # holding on to its resources.
            self.session.close()
class SQLPlugin(StoragePluginBase):
    """Storage plugin that keeps each namespace in its own table of a SQL database.

    The database is selected by the ``data_url`` entry of the storage
    configuration.
    """

    def __init__(self, bot_config):
        """Create the engine and session factory from the storage configuration.

        :param bot_config: bot configuration object; ``BOT_LOG_LEVEL`` is read
            from it to decide whether the engine echoes its statements.
        :raises Exception: if the storage configuration has no ``data_url``.
        """
        super().__init__(bot_config)
        config = self._storage_config
        if DATA_URL_ENTRY not in config:
            # The first fragment ends with a space: the adjacent literals are
            # concatenated and previously rendered as "yourconfig.py".
            raise Exception(
                'You need to specify a connection URL for the database in your '
                'config.py. For example:\n'
                'STORAGE_CONFIG={\n'
                '"data_url": "postgresql://'
                'scott:tiger@localhost/mydatabase/",\n'
                '}')

        data_url = config[DATA_URL_ENTRY]
        echo = bot_config.BOT_LOG_LEVEL == logging.DEBUG

        # Hack around the multithreading issue in memory only sqlite.
        # This mode is useful for testing.
        # Note that the check matches every URL starting with 'sqlite://'.
        if data_url.startswith('sqlite://'):
            from sqlalchemy.pool import StaticPool
            self._engine = create_engine(
                data_url,
                connect_args={'check_same_thread': False},
                poolclass=StaticPool,
                echo=echo)
        else:
            self._engine = create_engine(
                data_url,
                pool_recycle=config.get('connection_recycle', 1800),
                pool_pre_ping=config.get('connection_ping', True),
                echo=echo)
        self._metadata = MetaData()
        self._sessionmaker = sessionmaker()
        self._sessionmaker.configure(bind=self._engine)

    def open(self, namespace: str) -> StorageBase:
        """Return a storage bound to the table named after ``namespace``.

        The table is declared with a ``key`` primary-key column and a ``value``
        column, and is created if needed.

        :param namespace: name of the table backing the returned storage.
        :return: an ``SQLStorage`` with its own session.
        """
        # Create a table with the given namespace
        table = Table(namespace, self._metadata,
                      Column('key', String(767), primary_key=True),
                      Column('value', String(32768)),
                      extend_existing=True)

        # A dedicated subclass per call, so each namespace's table gets its
        # own mapped class.
        class NewKV(KV):
            pass

        mapper(NewKV, table, properties={
            '_key': table.c.key,
            '_value': table.c.value})

        # ensure that the table for this namespace exists
        self._metadata.create_all(self._engine)

        # create an autonomous session for it.
        return SQLStorage(self._sessionmaker(), NewKV)