-
Notifications
You must be signed in to change notification settings - Fork 27
/
operations.py
345 lines (284 loc) · 13.9 KB
/
operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
import config
from utils import *
from ws_wrapper import *
import global_var
def operation_general_response(sender, message, group_id, channel_id, is_guild):
    """Echo a received message back to its sender as an @-mention.

    The reply is "收到" followed by the message text. When ``sender`` is in
    ``master_id`` a master-specific greeting is inserted between the two.

    Args:
        sender: ID of the user who sent the message; also the user mentioned.
        message: Raw message text to echo back.
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    # In CQ-code message strings, literal square brackets arrive escaped as
    # numeric entities. Decode them so a bracketed segment typed by the user
    # is echoed as real brackets; the previous "[" -> "[" and "]" -> "]"
    # replacements were no-ops.
    decoded = message.replace("&#91;", "[").replace("&#93;", "]")
    greeting = ", 我的主人, 您的消息是: " if sender in master_id else ""
    at_user_in_group(sender, sender, "收到" + greeting + decoded, group_id, channel_id, is_guild)
def operation_set_online(sender, _, group_id, channel_id, is_guild):
    """Add a group to ``working_groups`` when the sender is authorised.

    Authorised senders are those in ``master_id`` or ``auth_set_online_id``.
    Replies are only sent when ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Group to mark as working.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    allowed = sender in master_id or sender in auth_set_online_id
    if not allowed:
        if is_remote_machine():
            at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    working_groups.add(group_id)
    if not is_remote_machine():
        return

    # NOTE(review): the source lost its indentation; the GPU-connection flag,
    # the timer reset and the reply are all assumed to sit under the
    # is_remote_machine() check, matching the sibling handlers — confirm.
    global_var.is_gpu_connected = True
    global_var.gpu_connect_confirm_timer.reset()
    at_user_in_group(sender, sender, "主人, 我已上线", group_id, channel_id, is_guild)
def operation_set_offline(sender, _, group_id, channel_id, is_guild):
    """Remove a group from ``working_groups`` when the sender is authorised.

    Authorised senders are those in ``master_id`` or ``auth_set_offline_id``.
    Replies are only sent when ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Group to take offline.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    if sender in master_id or sender in auth_set_offline_id:
        # discard() instead of remove(): taking an already-offline group
        # offline again must not raise KeyError before the reply is sent.
        working_groups.discard(group_id)
        if is_remote_machine():
            at_user_in_group(sender, sender, "再见, 主人QAQ", group_id, channel_id, is_guild)
    elif is_remote_machine():
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
def operation_gen_image(sender, message, group_id, channel_id, is_guild):
    """Parse a paint command and queue an image-generation request.

    Two command prefixes reach this handler: ``paint_command_msg`` and
    ``"#d"``. The ``"#d"`` form is only honoured when ``is_vip(sender)`` is
    true; otherwise the message is echoed via ``operation_general_response``
    (when ``is_remote_machine()`` is true) and nothing is queued.

    After the prefix is removed the remaining text is interpreted as:

    * empty                -> an empty request dict ``{}``;
    * starting with ``{``  -> a JSON object used as the request as-is;
    * anything else        -> the shortcut form
      ``WIDTHxHEIGHT.STEPS.SEED.PROMPT`` (``;`` accepted in place of ``.``),
      applied on top of a copy of ``default_gen``. Empty fields keep the
      default value.

    The resulting tuple ``(request, sender, group_id, channel_id, is_guild,
    False)`` is appended to ``global_var.image_gen_messages``. Parse failures
    are reported with ``send_err_to_group`` when ``is_remote_machine()`` is
    true, and nothing is queued.

    Args:
        sender: ID of the user issuing the command.
        message: Full message text including the command prefix.
        group_id: Group the command came from.
        channel_id: Forwarded unchanged to the reply helpers.
        is_guild: Forwarded unchanged to the reply helpers.
    """
    # NOTE(review): the source lost its indentation; the nesting of the
    # is_group_online() / is_not_remote_machine() checks below was
    # reconstructed — confirm against the original file.
    if message.startswith("#d"):
        if not is_vip(sender):
            if is_remote_machine():
                operation_general_response(sender, message, group_id, channel_id, is_guild)
            return
        # Strip only the leading command. The previous replace("#d", "")
        # also deleted every later "#d" inside the user's prompt.
        body = message[len("#d"):]
        if is_group_online(group_id):
            global_var.gpu_connect_confirm_timer.run()
            if is_not_remote_machine():
                at_user_in_group(sender, sender,
                                 f"尊贵的vip用户, 检测到您已进行了py交易, 支持使用额外指令, 现已为您{start_gen_tag_msg}",
                                 group_id, channel_id, is_guild)
    else:
        if is_group_online(group_id):
            global_var.gpu_connect_confirm_timer.run()
            if is_not_remote_machine():
                at_user_in_group(sender, sender, f"收到, {start_gen_tag_msg}", group_id, channel_id, is_guild)
        # Remove the command once only, so the same text appearing later in
        # the prompt is kept.
        body = message.replace(paint_command_msg, "", 1)
    body = body.strip()

    # The broad catch is deliberate: this is the user-facing boundary and
    # any parse problem is reported back to the chat instead of propagating.
    try:
        if not body:
            gen_request = {}
        elif body[0] == '{':
            gen_request = json.loads(body)
        else:
            # Shortcut form: up to three leading '.'-separated fields, the
            # last piece is always the prompt.
            fields = body.replace(";", ".").split('.', 3)
            prompt = fields[-1]
            size = fields[0] if len(fields) >= 2 else ""
            steps = fields[1] if len(fields) >= 3 else ""
            seed = fields[2] if len(fields) >= 4 else ""
            width, _, height = size.partition("x")

            gen_request = default_gen.copy()
            if prompt:
                gen_request["prompt"] = prompt
            if steps:
                gen_request["steps"] = int(steps)
            if seed:
                gen_request["seed"] = int(seed)
            if width:
                gen_request["width"] = int(width)
            if height:
                gen_request["height"] = int(height)
        global_var.image_gen_messages.append((gen_request, sender, group_id, channel_id, is_guild, False))
    except Exception as e:
        if is_remote_machine():
            send_err_to_group(sender, e, group_id, channel_id, is_guild)
def operation_delete_msg(sender, _, group_id, channel_id, is_guild):
    """Recall the last message recorded for ``sender``.

    Does nothing when ``is_guild`` is true, or when no message ID is
    recorded for ``sender`` in ``global_var.last_msg_id_of_user``.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Unused.
        channel_id: Unused.
        is_guild: When true the command is ignored.
    """
    if is_guild:
        return
    try:
        last_msg_id = global_var.last_msg_id_of_user[sender]
    except KeyError:
        # Nothing was recorded for this user yet, so there is nothing to
        # recall; previously this lookup raised out of the handler.
        return
    delete_msg(last_msg_id)
def operation_help(sender, _, group_id, channel_id, is_guild):
    """Send the usage text to ``sender`` as an @-mention.

    The text is built from ``paint_command_msg`` and ``delete_command_msg``.

    Args:
        sender: ID of the user issuing the command; also the user mentioned.
        _: Message text (unused).
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    # The literal below is user-facing text; its lines start at column 0 on
    # purpose so no extra leading whitespace is sent.
    usage_text = f'''机器人使用说明:
#消息
- 支持转换CQ消息原始数据, 外网服务器可借此拉取外网资源
{paint_command_msg}+描述
- {paint_command_msg}后直接连接关键词, 进行快捷绘图
- e.g. {paint_command_msg}girls
{paint_command_msg}+宽度x高度.步数.种子.描述
- 快捷配置绘图, 点号间可为空使用默认值, 点号与分号可混用
- e.g. {paint_command_msg}128x128.10.2556931196.girls
{paint_command_msg}{"{}"}
- 花括号内传入标准json数据控制绘画, 支持字段prompt,steps,width,height,cfg_scale,sampler_index,seed,negative_prompt
- 输入#默认, 以查看示例
{delete_command_msg}
- 撤回上一条由你触发的机器人消息
#默认
- 查看画图的默认配置
'''
    at_user_in_group(sender, sender, usage_text, group_id, channel_id, is_guild)
def operation_default_config(sender, _, group_id, channel_id, is_guild):
    """Send ``default_gen`` to ``sender`` as indented JSON.

    Args:
        sender: ID of the user issuing the command; also the user mentioned.
        _: Message text (unused).
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    defaults_json = json.dumps(default_gen, indent=2)
    at_user_in_group(sender, sender, defaults_json, group_id, channel_id, is_guild)
def operation_set_banned(sender, message, group_id, channel_id, is_guild):
    """Add the user named in the message to ``global_var.banned_user_id``.

    Only senders in ``master_id`` or ``auth_ban_id`` may ban, and users in
    ``master_id`` are never banned. Replies are only sent when
    ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        message: Command text; what remains after removing "#拉黑" is parsed
            with ``int``.
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.

    Raises:
        ValueError: If the remaining text is not an integer.
    """
    may_ban = sender in master_id or sender in auth_ban_id
    if not may_ban:
        if is_remote_machine():
            at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    target = int(message.replace("#拉黑", ""))
    if target in master_id:
        if is_remote_machine():
            at_user_in_group(sender, sender, "我怎么可能拉黑我最爱的主人呢???!!!", group_id, channel_id, is_guild)
        return

    global_var.banned_user_id.add(target)
    if is_remote_machine():
        at_user_in_group(sender, sender, "拉黑成功", group_id, channel_id, is_guild)
def operation_remove_banned(sender, message, group_id, channel_id, is_guild):
    """Remove the user named in the message from ``global_var.banned_user_id``.

    Only senders in ``master_id`` or ``auth_ban_id`` may unban. Replies are
    only sent when ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        message: Command text; what remains after removing "#解除" is parsed
            with ``int``.
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.

    Raises:
        ValueError: If the remaining text is not an integer.
    """
    if sender in master_id or sender in auth_ban_id:
        user = int(message.replace("#解除", ""))
        # discard() instead of remove(): unbanning a user who is not on the
        # blacklist must not raise KeyError before the reply is sent.
        global_var.banned_user_id.discard(user)
        if is_remote_machine():
            at_user_in_group(sender, sender, "解除黑名单成功", group_id, channel_id, is_guild)
    elif is_remote_machine():
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
def operation_show_blacklist(sender, _, group_id, channel_id, is_guild):
    """Send the contents of ``global_var.banned_user_id`` to ``sender``.

    Senders outside ``master_id`` and ``auth_blacklist_id`` receive a
    permission-denied reply instead.

    Args:
        sender: ID of the user issuing the command; also the user mentioned.
        _: Message text (unused).
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    if sender in master_id or sender in auth_blacklist_id:
        reply = "黑名单:\n" + str(global_var.banned_user_id)
    else:
        reply = "权限不足"
    at_user_in_group(sender, sender, reply, group_id, channel_id, is_guild)
def operation_add_vip(sender, message, group_id, channel_id, is_guild):
    """Add the user named in the message to ``global_var.auth_vip_id``.

    Only senders in ``master_id`` may do this. Replies are only sent when
    ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        message: Command text; what remains after removing "#vip" is parsed
            with ``int``.
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.

    Raises:
        ValueError: If the remaining text is not an integer.
    """
    if sender not in master_id:
        if is_remote_machine():
            at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    user = int(message.replace("#vip", ""))
    global_var.auth_vip_id.add(user)
    if is_remote_machine():
        at_user_in_group(sender, sender, f"新增vip: {user}", group_id, channel_id, is_guild)
def operation_remove_vip(sender, message, group_id, channel_id, is_guild):
    """Remove the user named in the message from ``global_var.auth_vip_id``.

    Only senders in ``master_id`` may do this. Replies are only sent when
    ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        message: Command text; what remains after removing "#unvip" is parsed
            with ``int``.
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.

    Raises:
        ValueError: If the remaining text is not an integer.
    """
    if sender in master_id:
        user = int(message.replace("#unvip", ""))
        # discard() instead of remove(): revoking VIP from a user who does
        # not have it must not raise KeyError before the reply is sent.
        global_var.auth_vip_id.discard(user)
        if is_remote_machine():
            at_user_in_group(sender, sender, f"移除vip: {user}", group_id, channel_id, is_guild)
    elif is_remote_machine():
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
def operation_switch_gpt(sender, _, group_id, channel_id, is_guild):
    """Toggle ``global_var.use_chatgpt`` for senders in ``master_id``.

    The new value is printed. The confirmation reply is sent only when
    ``is_not_remote_machine()`` is true; the permission-denied reply only
    when ``is_remote_machine()`` is true.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    if sender not in master_id:
        if is_remote_machine():
            at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    global_var.use_chatgpt = not global_var.use_chatgpt
    print("use_chatgpt:", global_var.use_chatgpt)
    if not is_not_remote_machine():
        return

    # Name the backend that is active after the toggle.
    if global_var.use_chatgpt:
        reply = "已切换至chatGPT"
    elif config.use_selfhostedllm:
        reply = "已切换至自托管模型"
    else:
        reply = "已切换至GPT3"
    at_user_in_group(sender, sender, reply, group_id, channel_id, is_guild)
def operation_clear_chat(sender, _, group_id, channel_id, is_guild):
    """Clear the chat history stored for the sender's history key.

    Does nothing when ``global_var.is_remote_machine`` is truthy. When
    ``shared_context`` is truthy only senders in ``master_id`` may clear.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Passed to ``get_sender_key_in_group`` together with
            ``sender`` to obtain the history key.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    # NOTE(review): this reads the global_var.is_remote_machine attribute,
    # while other handlers in this file call is_remote_machine() — confirm
    # the two agree.
    if global_var.is_remote_machine:
        return

    history_id = get_sender_key_in_group(group_id, sender)
    if history_id not in global_var.chat_history:
        at_user_in_group(sender, sender, "没有可以清理的对话", group_id, channel_id, is_guild)
        return

    context_is_shared = bool(shared_context)
    if context_is_shared and sender not in master_id:
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    global_var.chat_history[history_id].clear()
    if context_is_shared:
        reply = "已清理群内共享对话上下文"
    else:
        reply = "已清理你的对话上下文"
    at_user_in_group(sender, sender, reply, group_id, channel_id, is_guild)
def operation_switch_at_mode(sender, _, group_id, channel_id, is_guild):
    """Toggle the sender's entry in ``global_var.users_not_need_at``.

    Does nothing when ``global_var.is_remote_machine`` is truthy. If the
    sender's key is absent it is added with value ``True``; otherwise it is
    deleted. A matching reply is sent in either case.

    Args:
        sender: ID of the user issuing the command.
        _: Message text (unused).
        group_id: Passed to ``get_sender_key_in_group`` together with
            ``sender`` to obtain the key.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    if global_var.is_remote_machine:
        return

    sender_key = get_sender_key_in_group(group_id, sender)
    no_at_users = global_var.users_not_need_at
    if sender_key in no_at_users:
        del no_at_users[sender_key]
        reply = "你已恢复到需要at机器人再进行对话"
    else:
        no_at_users[sender_key] = True
        reply = "你已无需at即可对话"
    at_user_in_group(sender, sender, reply, group_id, channel_id, is_guild)
def operation_switch_model(sender, message, group_id, channel_id, is_guild):
    """List the available models or switch the active one.

    Does nothing when ``global_var.is_remote_machine`` is truthy; only
    senders in ``master_id`` are allowed. The text after "#model" is a
    case-insensitive substring matched against the model titles returned by
    ``{gpu_url}/sdapi/v1/sd-models``. With no text, the active model
    (``options['sd_model_checkpoint']``) and the available titles are sent
    instead. The first matching title is written back through
    ``{gpu_url}/sdapi/v1/options``.

    Any exception raised while talking to the backend is reported with
    ``send_err_to_group``.

    Args:
        sender: ID of the user issuing the command.
        message: Command text including the "#model" prefix.
        group_id: Forwarded unchanged to the reply helpers.
        channel_id: Forwarded unchanged to the reply helpers.
        is_guild: Forwarded unchanged to the reply helpers.
    """
    if global_var.is_remote_machine:
        return
    if sender not in master_id:
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    query = message.replace("#model", "").strip()
    try:
        # The model listing request now sits inside the try block as well,
        # so a connection failure is reported to the chat like every other
        # backend error instead of escaping the handler.
        listing = requests.get(f"{gpu_url}/sdapi/v1/sd-models")
        if listing.status_code != 200:
            at_user_in_group(sender, sender, "获取模型列表失败", group_id, channel_id, is_guild)
            return
        models = [model["title"] for model in listing.json()]

        options = requests.get(f"{gpu_url}/sdapi/v1/options").json()
        if query == "":
            at_user_in_group(
                sender, sender,
                f"当前激活模型:\n{options['sd_model_checkpoint']}\n\n当前可用模型:\n" + "\n".join(models), group_id,
                channel_id, is_guild)
            return

        needle = query.lower()
        for model_title in models:
            if needle in model_title.lower():
                options['sd_model_checkpoint'] = model_title
                applied = requests.post(f"{gpu_url}/sdapi/v1/options", json=options)
                # Do not announce success when the backend rejected the
                # change; an HTTP error status is reported via the handler
                # below.
                applied.raise_for_status()
                at_user_in_group(sender, sender, f"已切换至模型:\n{model_title}", group_id, channel_id, is_guild)
                return
        at_user_in_group(sender, sender, "未找到匹配的模型", group_id, channel_id, is_guild)
    except Exception as e:
        send_err_to_group(sender, e, group_id, channel_id, is_guild)
def operation_switch_vae(sender, message, group_id, channel_id, is_guild):
    """Show or change the ``sd_vae`` option of the backend.

    Does nothing when ``global_var.is_remote_machine`` is truthy; only
    senders in ``master_id`` are allowed. With no text after "#vae" the
    current ``options['sd_vae']`` value is sent. Otherwise the text is
    written to ``sd_vae`` and posted back to ``{gpu_url}/sdapi/v1/options``,
    and the previous value is included in the reply.

    Any exception raised in the process is reported with
    ``send_err_to_group``.

    Args:
        sender: ID of the user issuing the command.
        message: Command text including the "#vae" prefix.
        group_id: Forwarded unchanged to the reply helpers.
        channel_id: Forwarded unchanged to the reply helpers.
        is_guild: Forwarded unchanged to the reply helpers.
    """
    if global_var.is_remote_machine:
        return
    if sender not in master_id:
        at_user_in_group(sender, sender, "权限不足", group_id, channel_id, is_guild)
        return

    requested_vae = message.replace("#vae", "").strip()
    try:
        options = requests.get(f"{gpu_url}/sdapi/v1/options").json()
        if requested_vae == "":
            at_user_in_group(sender, sender, f"当前激活VAE:\n{options['sd_vae']}", group_id, channel_id, is_guild)
            return

        # Remember the previous value so the reply can tell the user what to
        # switch back to.
        previous_vae = options['sd_vae']
        options['sd_vae'] = requested_vae
        requests.post(f"{gpu_url}/sdapi/v1/options", json=options)
        at_user_in_group(
            sender, sender,
            f"已尝试切换VAE, 注意VAE切换必须准确匹配名称\n如果出现效果异常, 可能是切换错误\n可切换回先前的VAE:\n{previous_vae}",
            group_id, channel_id, is_guild)
    except Exception as e:
        send_err_to_group(sender, e, group_id, channel_id, is_guild)
def operation_show_balance(sender, _, group_id, channel_id, is_guild):
    """Query the OpenAI credit-grants endpoint and send the response text.

    Does nothing when ``global_var.is_remote_machine`` is truthy. The
    request is authorised with ``api_key`` as a bearer token. A 200 response
    body is sent as-is; any other status is sent with a failure prefix.

    Args:
        sender: ID of the user issuing the command; also the user mentioned.
        _: Message text (unused).
        group_id: Forwarded unchanged to ``at_user_in_group``.
        channel_id: Forwarded unchanged to ``at_user_in_group``.
        is_guild: Forwarded unchanged to ``at_user_in_group``.
    """
    if global_var.is_remote_machine:
        return

    request_headers = {
        "Content-Type": 'application/json',
        "Authorization": f"Bearer {api_key}",
    }
    response = requests.get("https://api.openai.com/dashboard/billing/credit_grants", headers=request_headers)

    reply = response.text
    if response.status_code != 200:
        reply = "查询失败:\n" + reply
    at_user_in_group(sender, sender, reply, group_id, channel_id, is_guild)
# Command prefix -> handler table. Each handler takes
# (sender, message, group_id, channel_id, is_guild).
# NOTE(review): the name suggests these commands are dispatched on both the
# remote and the local machine — confirm at the dispatch site. Entry order
# is kept as-is in case dispatch depends on it.
both_operations = {
    # Group online/offline state
    "#上线": operation_set_online,
    "#下线": operation_set_offline,
    # Image generation and message recall
    paint_command_msg: operation_gen_image,
    "#d": operation_gen_image,
    delete_command_msg: operation_delete_msg,
    # Blacklist and VIP management
    "#拉黑": operation_set_banned,
    "#解除": operation_remove_banned,
    "#vip": operation_add_vip,
    "#unvip": operation_remove_vip,
    # Chat backend and conversation state
    "#gpt切换": operation_switch_gpt,
    "#清理对话": operation_clear_chat,
    "#at切换": operation_switch_at_mode,
    # Image backend options and account balance
    "#model": operation_switch_model,
    "#vae": operation_switch_vae,
    "#余额": operation_show_balance,
}
# Command prefix -> handler table for the informational commands. These
# handlers reply without checking is_remote_machine() themselves.
# NOTE(review): presumably dispatched only on the remote machine — confirm
# at the dispatch site.
remote_operations = {
    "#帮助": operation_help,
    "#默认": operation_default_config,
    "#黑名单": operation_show_blacklist,
}