-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
229 lines (190 loc) · 8.44 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# coding=utf-8
import ConfigParser
import importlib
import json
import logging
import unittest
import urllib
import sys
import urllib2
import telegram
import telegram_commands.getgame as getgame
# standard app engine imports
from google.appengine.api import urlfetch
from google.appengine.ext import ndb
import webapp2
from telegram_commands import gettopgames
from telegram_commands import getpopgames
from telegram_commands import watchtopgames
from telegram_commands import watchpopgames
from telegram_commands import unwatchtopgames
from telegram_commands import unwatchpopgames
# Root of the Telegram Bot API; the bot token and a method name are appended to it.
BASE_URL = 'https://api.telegram.org/bot'

# Load keys.ini once at import time (remember to put your own keys in it).
# Two candidate locations are offered: the working directory and a
# Windows-style parent-directory path.
keyConfig = ConfigParser.ConfigParser()
keyConfig.read(['keys.ini', '..\\keys.ini'])

# Shared Telegram client, built from the TELE_BOT_ID value of the [Telegram] section.
bot = telegram.Bot(keyConfig.get('Telegram', 'TELE_BOT_ID'))
# ================================
class AllWatchesValue(ndb.Model):
    """Datastore entity that keeps every registered watch in a single string.

    The functions in this module read and write it under the key name
    'AllWatches'.
    """
    # Comma-separated watch entries; not indexed, empty by default.
    currentValue = ndb.StringProperty(default='', indexed=False)
# ================================
def addToAllWatches(command, chat_id, request=''):
    """Append one watch entry to the stored watch list.

    The entry is written as ',<chat_id>:<command>', followed by ':<request>'
    when request is not the empty string. No duplicate check is made here.
    """
    record = AllWatchesValue.get_or_insert('AllWatches')
    fields = [str(chat_id), command]
    if request != '':
        fields.append(request)
    record.currentValue = record.currentValue + ',' + ':'.join(fields)
    record.put()
def AllWatchesContains(command, chat_id, request=''):
    """Return True when exactly this watch entry is present in the store.

    The entry looked for is '<chat_id>:<command>' plus ':<request>' when
    request is not the empty string. Returns False when the 'AllWatches'
    entity does not exist.

    Entries are compared whole after splitting the stored value on ','.
    A plain substring test would report chat 1 as watching whenever chat 21
    is, and would treat '1:cmd' as present when only '1:cmdX' is stored.
    """
    es = AllWatchesValue.get_by_id('AllWatches')
    if not es:
        return False
    entry = str(chat_id) + ':' + command + (':' + request if request != '' else '')
    return entry in (es.currentValue or '').split(',')
def setAllWatchesValue(NewValue):
    """Replace the whole stored watch string with NewValue and persist it.

    The 'AllWatches' entity is fetched with get_or_insert, so it does not
    have to exist beforehand.
    """
    record = AllWatchesValue.get_or_insert('AllWatches')
    record.currentValue = NewValue
    record.put()
def getAllWatches():
    """Return the stored watch string, or '' when the entity is missing."""
    record = AllWatchesValue.get_by_id('AllWatches')
    return record.currentValue if record else ''
def removeFromAllWatches(watch):
    """Remove every entry equal to watch from the stored watch list.

    The stored value is split on ',' and entries are compared whole, so
    removing '1:x' can no longer chop the tail off '21:x'. Empty entries are
    dropped as well, which means watch == '' simply tidies the list instead
    of deleting every separator.

    The result is written back in the same ',<entry>,<entry>' layout that
    addToAllWatches produces (empty string when nothing is left).
    """
    kept = [entry for entry in getAllWatches().split(',')
            if entry and entry != watch]
    setAllWatchesValue(''.join(',' + entry for entry in kept))
# ================================
class MeHandler(webapp2.RequestHandler):
    """Endpoint that relays the Telegram getMe reply for the configured bot."""

    def get(self):
        """Call <BASE_URL><token>/getMe and write the decoded JSON back out."""
        urlfetch.set_default_fetch_deadline(60)
        reply = urllib2.urlopen(
            BASE_URL + keyConfig.get('Telegram', 'TELE_BOT_ID') + '/getMe')
        try:
            payload = json.load(reply)
        finally:
            # Release the HTTP response even if the body is not valid JSON.
            reply.close()
        self.response.write(json.dumps(payload))
class GetUpdatesHandler(webapp2.RequestHandler):
    """Endpoint that relays the Telegram getUpdates reply for the configured bot."""

    def get(self):
        """Call <BASE_URL><token>/getUpdates and write the decoded JSON back out."""
        urlfetch.set_default_fetch_deadline(60)
        reply = urllib2.urlopen(
            BASE_URL + keyConfig.get('Telegram', 'TELE_BOT_ID') + '/getUpdates')
        try:
            payload = json.load(reply)
        finally:
            # Release the HTTP response even if the body is not valid JSON.
            reply.close()
        self.response.write(json.dumps(payload))
class SetWebhookHandler(webapp2.RequestHandler):
    """Endpoint that registers a webhook URL with Telegram."""

    def get(self):
        """Send the 'url' query parameter to setWebhook and relay the reply.

        Nothing is sent or written when 'url' is missing or empty.
        """
        urlfetch.set_default_fetch_deadline(60)
        url = self.request.get('url')
        if not url:
            return
        # Supplying a data argument makes urllib2 send the URL as form data.
        reply = urllib2.urlopen(
            BASE_URL + keyConfig.get('Telegram', 'TELE_BOT_ID') + '/setWebhook',
            urllib.urlencode({'url': url}))
        try:
            payload = json.load(reply)
        finally:
            # Release the HTTP response even if the body is not valid JSON.
            reply.close()
        self.response.write(json.dumps(payload))
class WebhookHandler(webapp2.RequestHandler):
    """Receives Telegram updates and dispatches the bot commands they contain."""

    # (command prefix, module exposing run(bot, chat_id, user), label used in logs)
    _SIMPLE_COMMANDS = (
        ('/gettopgames', gettopgames, 'get top games'),
        ('/getpopgames', getpopgames, 'get pop games'),
        ('/watchtopgames', watchtopgames, 'watch top games'),
        ('/watchpopgames', watchpopgames, 'watch pop games'),
        ('/unwatchtopgames', unwatchtopgames, 'unwatch top games'),
        ('/unwatchpopgames', unwatchpopgames, 'unwatch pop games'),
    )

    @staticmethod
    def _display_name(sender):
        """Pick a name for the sender: username, full name, first name, else 'Dave'."""
        if 'username' in sender:
            return sender['username']
        if 'first_name' in sender and 'last_name' in sender:
            return sender['first_name'] + ' ' + sender['last_name']
        if 'first_name' in sender:
            return sender['first_name']
        return 'Dave'

    def post(self):
        """Handle one update posted by Telegram.

        The decoded update is logged and echoed back in the response. When it
        carries a text message, occurrences of bot.name are removed from the
        text and the first matching command is run. A failure inside a
        command is logged with its traceback and does not fail the request.
        """
        urlfetch.set_default_fetch_deadline(60)
        body = json.loads(self.request.body)
        logging.info('request body:')
        logging.info(body)
        self.response.write(json.dumps(body))
        if 'message' not in body:
            return

        message = body['message']
        raw_text = message.get('text')
        # Test the raw value: str(None) is the truthy string 'None', which
        # would let text-less messages slip past this guard.
        if not raw_text:
            logging.info('no text')
            return
        # NOTE(review): under Python 2 str() raises UnicodeEncodeError for
        # non-ASCII text; left as is so commands keep receiving a str.
        text = str(raw_text)
        # A message without a 'from' field falls through to the default name.
        user = self._display_name(message.get('from') or {})
        chat_id = message['chat']['id']

        text = text.replace(bot.name, '').strip()
        print('got text ' + text)

        if text.startswith('/game'):
            # Everything after the first space, lower-cased, is the request.
            split = text[1:].lower().split(" ", 1)
            try:
                getgame.run(bot, chat_id, user, '', split[1] if len(split) > 1 else '')
            except Exception:
                logging.exception('Unexpected error running command:')
            return

        for prefix, module, label in self._SIMPLE_COMMANDS:
            if text.startswith(prefix):
                try:
                    module.run(bot, str(chat_id), user)
                except Exception:
                    logging.exception('Unexpected error running %s command:', label)
                return
class RunTestsHandler(webapp2.RequestHandler):
    """Endpoint that runs the tests.test_getgame tests and reports the outcome."""

    def get(self):
        """Run the suite and write the trimmed text form of its result object."""
        urlfetch.set_default_fetch_deadline(60)
        suite = unittest.TestSuite()
        suite.addTest(
            unittest.defaultTestLoader.loadTestsFromName('tests.test_getgame'))
        outcome = unittest.TextTestRunner().run(suite)
        # Strip the repr wrapper so only the counters remain.
        summary = str(outcome)
        for fragment in ('<unittest.runner.TextTestResult ', '>'):
            summary = summary.replace(fragment, '')
        self.response.write(summary)
class WebCommandRunHandler(webapp2.RequestHandler):
    """Endpoint that runs the game lookup from query parameters."""

    def get(self):
        """Run getgame for the 'text' (or 'game') query parameter.

        'user' defaults to 'Admin'. 'chat_id' defaults to the
        ADMIN_GROUP_CHAT_ID value of the [BotAdministration] config section.
        A failure inside the command is logged with its traceback and does
        not fail the request.
        """
        urlfetch.set_default_fetch_deadline(60)
        text = self.request.get('text') or self.request.get('game')
        user = self.request.get('user') or 'Admin'
        if not text:
            self.response.write('Argument missing: \'text\' or \'game\'.')
            return
        chat_id = self.request.get('chat_id')
        if not chat_id:
            chat_id = keyConfig.get('BotAdministration', 'ADMIN_GROUP_CHAT_ID')
        # The first character and the first word are discarded; only what
        # follows the first space is used as the request.
        # NOTE(review): a bare value such as game=portal therefore yields an
        # empty request - confirm this is intended.
        split = text[1:].lower().split(" ", 1)
        try:
            # NOTE(review): WebhookHandler calls getgame.run with an extra ''
            # argument before the request; confirm which form is correct.
            getgame.run(bot, chat_id, user, split[1] if len(split) > 1 else '')
        except Exception:
            logging.exception('Unexpected error running command:')
class TriggerAllWatches(webapp2.RequestHandler):
    """Endpoint that re-runs every stored watch."""

    def get(self):
        """Run the watch command of each stored '<chat_id>:<command>' entry.

        For each entry the module 'telegram_commands.watch<command>' (with any
        'get' removed from the command) is imported and its run() is called
        with the chat id and the user name 'Watcher'. Non-empty entries
        without a ':' are removed from the store.
        """
        for watch in getAllWatches().split(','):
            if not watch:
                # Every entry is stored with a leading ',', so the split
                # always yields an empty item. It is not a malformed watch
                # and must never be handed to removeFromAllWatches.
                continue
            print('got watch ' + watch)
            split = watch.split(':')
            if len(split) >= 2:
                print('executing command: ' + split[1].replace('get', ''))
                mod = importlib.import_module('telegram_commands.watch' + split[1].replace('get', ''))
                chat_id = split[0]
                mod.run(bot, str(chat_id), 'Watcher')
            else:
                print('removing from all watches: ' + watch)
                removeFromAllWatches(watch)
class ClearAllWatches(webapp2.RequestHandler):
    """Endpoint that wipes the stored watch list."""

    def get(self):
        """Reset the stored watch string to the empty string."""
        empty_watch_list = ''
        setAllWatchesValue(empty_watch_list)
import mirror_handler
# WSGI entry point: maps each URL path to its request handler.
app = webapp2.WSGIApplication(
    [
        ('/me', MeHandler),
        ('/updates', GetUpdatesHandler),
        ('/set_webhook', SetWebhookHandler),
        ('/webhook', WebhookHandler),
        ('/run_tests', RunTestsHandler),
        ('/run', WebCommandRunHandler),
        ('/allwatches', TriggerAllWatches),
        ('/clearallwatches', ClearAllWatches),
        # Listed last so the fixed routes above are tried first; anything
        # else goes to the mirror handler with its first path segment captured.
        (r'\/([^\/]+).*', mirror_handler.MirrorHandler),
    ],
    debug=True,
)