-
Notifications
You must be signed in to change notification settings - Fork 0
/
planner_gtp.py
402 lines (341 loc) · 14.8 KB
/
planner_gtp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
import asyncio
import ipaddress
import json

import boto3
import nextcord
from azure.identity.aio import ClientSecretCredential
from msgraph import GraphServiceClient
from msgraph.generated.models.planner_assignments import PlannerAssignments
from msgraph.generated.models.planner_task import PlannerTask
from msgraph.generated.users.users_request_builder import UsersRequestBuilder
from nextcord import SlashOption
def load_config():
    """Load the bot configuration from ``bot/config.json``.

    The path is relative to the current working directory.

    Returns:
        The parsed JSON document.

    Raises:
        SystemExit: With status 1 when the file does not exist or is not
            valid JSON. A short explanation is printed first.
    """
    try:
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's locale encoding.
        with open('bot/config.json', 'r', encoding='utf-8') as config_file:
            return json.load(config_file)
    except FileNotFoundError:
        print("Configuration file not found. Please check the path.")
    except json.JSONDecodeError:
        print("Error parsing the configuration file. Please check its format.")
    # The ``exit`` builtin is injected by the ``site`` module and is not
    # guaranteed to exist (e.g. ``python -S``); raise SystemExit directly.
    raise SystemExit(1)
# Parsed contents of bot/config.json; every constant below is read from it.
config = load_config()

# ``intents`` (defaults plus the members intent) is prepared here, but the
# client itself is constructed with ``Intents.all()``.
intents = nextcord.Intents.default()
intents.members = True
bot = nextcord.Client(intents=nextcord.Intents.all())

# Handle of the asyncio task scheduled by on_disconnect (None until then).
bot_offline_alert_task = None

# Azure application credentials and the Planner plan new tasks are created in.
_azure_settings = config['azure']
TENANT_ID = _azure_settings['tenant_id']
CLIENT_ID = _azure_settings['client_id']
CLIENT_SECRET = _azure_settings['client_secret']
PLAN_ID = _azure_settings['plan_id']

# Discord bot token and the channel used for offline alerts.
_discord_settings = config['discord']
BOT_TOKEN = _discord_settings['token']
LOGS_CHANNEL_ID = _discord_settings['log_channel_id']

# Lookup tables supplied by the configuration file.
_mappings = config['mappings']
discord_id_mapping = _mappings['discord_id_mapping']
bucket_id_to_name_mapping = _mappings['bucket_id_to_name_mapping']
user_id_to_name_mapping = _mappings['user_id_to_name_mapping']
discord_channel_mapping = _mappings['discord_channel_mapping']

# Reverse lookups: Discord channel ID -> bucket ID, and Discord ID -> user ID.
bucket_id_by_discord_channel = {
    channel_id: bucket_id
    for bucket_id, channel_id in discord_channel_mapping.items()
}
discord_id_to_user_id_mapping = {
    discord_id: graph_user_id
    for graph_user_id, discord_id in discord_id_mapping.items()
}

# AWS credentials and the security group edited by the whitelist command.
_aws_settings = config['aws']
AWS_ACCESS_KEY = _aws_settings['access_key_id']
AWS_SECRET_KEY = _aws_settings['secret_access_key']
SECURITY_GROUP_ID = _aws_settings['security_group_id']

# EC2 resource handle bound to the configured credentials.
_ec2_options = {
    'region_name': 'us-east-1',
    'aws_access_key_id': AWS_ACCESS_KEY,
    'aws_secret_access_key': AWS_SECRET_KEY,
}
ec2 = boto3.resource('ec2', **_ec2_options)
def split_messages(tasks, max_length=2000):
    """Pack text entries into messages no longer than ``max_length``.

    Entries are concatenated in order. A new message is started whenever
    adding the next entry would exceed the limit. An entry that is longer
    than ``max_length`` on its own is cut into ``max_length``-sized pieces,
    so no returned message exceeds the limit.

    Args:
        tasks: Iterable of strings to pack, in output order.
        max_length: Maximum length of each returned message.

    Returns:
        A list of strings whose concatenation equals ``"".join(tasks)``.
        When everything fits into one message the list has exactly one
        element (an empty string for empty input); otherwise no element is
        empty.

    Raises:
        ValueError: If the text has to be split and ``max_length`` is
            smaller than 1.
    """
    tasks = list(tasks)

    # Fast path: everything fits into a single message.
    full_message = "".join(tasks)
    if len(full_message) <= max_length:
        return [full_message]

    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    messages = []
    current_message = ""
    for task in tasks:
        if len(current_message) + len(task) <= max_length:
            current_message += task
            continue

        # Flush what has been collected so far; never emit an empty message.
        if current_message:
            messages.append(current_message)

        # An entry longer than the limit cannot be sent whole: emit full-size
        # pieces and carry the remainder into the next message.
        while len(task) > max_length:
            messages.append(task[:max_length])
            task = task[max_length:]
        current_message = task

    if current_message:
        messages.append(current_message)
    return messages
async def alert_offline():
    """Post a "Shop is offline" embed if the bot is still not ready after 60 s.

    Sleeps for 60 seconds, then does nothing if the bot reports ready.
    Otherwise it looks up the logs channel and sends a red status embed
    there, or prints a hint when the channel cannot be resolved.
    """
    await asyncio.sleep(60)
    if bot.is_ready():
        return

    # The ID is read from JSON and may be stored as a string there, while the
    # channel lookup expects an integer ID, so normalise it first.
    try:
        channel_id = int(LOGS_CHANNEL_ID)
    except (TypeError, ValueError):
        channel_id = None

    logs_channel = bot.get_channel(channel_id) if channel_id is not None else None
    if not logs_channel:
        print("Logs channel not found. Please check the provided ID.")
        return

    embed = nextcord.Embed(
        title="Shop Status", description="Shop is offline", colour=0xFF0000
    )
    await logs_channel.send(embed=embed)
@bot.event
async def on_disconnect():
    """Schedule the delayed offline alert when the client disconnects.

    The task handle is stored in ``bot_offline_alert_task`` so that
    ``on_ready`` can cancel it.
    """
    global bot_offline_alert_task
    # This event can fire several times before the bot is ready again. Keep
    # the timer that is already running instead of replacing it: otherwise
    # the earlier tasks are orphaned (on_ready only cancels the handle stored
    # here) and each of them could post its own alert.
    if bot_offline_alert_task is not None and not bot_offline_alert_task.done():
        return
    bot_offline_alert_task = asyncio.create_task(alert_offline())
@bot.event
async def on_ready():
    """Cancel any scheduled offline alert and announce readiness on stdout."""
    pending_alert = bot_offline_alert_task
    if pending_alert:
        # The bot is back, so the delayed "offline" message is not wanted.
        pending_alert.cancel()
    print(f'{bot.user.name} is Ready!')
# /ping: reply with "Pong!" and the client latency in whole milliseconds.
# NOTE(review): documented with comments rather than a docstring because
# nextcord can use a callback docstring as the command description.
@bot.slash_command()
async def ping(interaction: nextcord.Interaction):
    latency_ms = round(bot.latency * 1000)
    await interaction.response.send_message(f"Pong! {latency_ms}ms")
async def create_client_credential():
    """Build a client-secret credential from the configured Azure settings."""
    credential = ClientSecretCredential(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
    return credential
# Lazily created Graph client shared by every caller in this module.
_graph_service_client = None


async def create_graph_service_client():
    """Return the Microsoft Graph client, creating it on first use.

    Previously every call built a new async credential and client that were
    never closed. A single instance is now created once and reused, so only
    one credential (and whatever connections it holds) exists for the
    lifetime of the process.

    Returns:
        The shared ``GraphServiceClient`` instance.
    """
    global _graph_service_client
    if _graph_service_client is None:
        client_credential = await create_client_credential()
        _graph_service_client = GraphServiceClient(client_credential)
    return _graph_service_client
########################################
async def get_users():
    """Request up to 25 users from Microsoft Graph, ordered by display name.

    Only the ``displayName``, ``id`` and ``mail`` fields are selected.

    Returns:
        Whatever the Graph ``users.get`` call returns for that query.
    """
    graph = await create_graph_service_client()
    request_config = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
        query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
            select=['displayName', 'id', 'mail'],
            top=25,
            orderby=['displayName'],
        ),
    )
    return await graph.users.get(request_configuration=request_config)
# /list_users: show name, e-mail, Graph user ID and mapped Discord account for
# each user returned by get_users().
# Documented with comments rather than a docstring so the command metadata
# registered with Discord stays exactly as declared in the decorator.
@bot.slash_command(description="List users from Microsoft Graph")
async def list_users(interaction: nextcord.Interaction):
    # Acknowledge first: the Graph request may take longer than the window
    # Discord allows for an initial response, and follow-up messages can only
    # be sent once the interaction has been responded to.
    await interaction.response.defer()

    users_page = await get_users()
    if not (users_page and users_page.value):
        await interaction.followup.send("No users were found.")
        return

    entries = []
    for user in users_page.value:
        discord_id = discord_id_mapping.get(user.id)
        # Only render a mention when a Discord ID is actually mapped; the
        # placeholder text must not be wrapped in <@...>.
        discord_ref = f'<@{discord_id}>' if discord_id else 'Discord ID not available'
        entries.append(
            f'User: **{user.display_name}**\n'
            f'Email: {user.mail}\n'
            f'User ID: {user.id}\n'
            f'Discord ID: {discord_ref}\n\n'
        )

    # Split on entry boundaries (same helper the task commands use) so one
    # user's details are not cut in the middle of a line.
    for message in split_messages(entries):
        await interaction.followup.send(message)
########################################
async def get_user_tasks(user_id):
    """Fetch the Planner tasks of one Graph user.

    Args:
        user_id: Graph user ID to query.

    Returns:
        The result of the Graph request, or None when ``user_id`` is falsy
        or the request raised (the reason is printed in both cases).
    """
    graph = await create_graph_service_client()
    if not user_id:
        print("User ID not provided")
        return None

    try:
        return await graph.users.by_user_id(user_id).planner.tasks.get()
    except Exception as err:
        print(f"Error fetching tasks: {err}")
        return None
# /user_tasks: list the Planner tasks of the selected Discord member,
# optionally filtered by status. The member is translated to a Graph user ID
# through discord_id_to_user_id_mapping.
@bot.slash_command(description="List a user's tasks from Microsoft Planner")
async def user_tasks(
    interaction: nextcord.Interaction,
    member: nextcord.Member = SlashOption(
        name="user",
        description="The user whose tasks to list",
        required=True
    ),
    status: str = SlashOption(
        name="status",
        description="Filter tasks by status",
        required=False,
        choices={
            "Not Started": "Not Started",
            "In Progress": "In Progress",
            "Completed": "Completed"
        }
    )
):
    await interaction.response.defer()

    graph_user_id = discord_id_to_user_id_mapping.get(str(member.id))
    if graph_user_id is None:
        await interaction.followup.send("No matching user ID found for the provided Discord ID")
        return

    page = await get_user_tasks(graph_user_id)
    if not (page and page.value):
        await interaction.followup.send("No tasks were found for the user.")
        return

    rendered = []
    for task in page.value:
        # Derive the status label from the completion percentage.
        if task.percent_complete == 0:
            task_status = 'Not Started'
        elif task.percent_complete == 100:
            task_status = 'Completed'
        else:
            task_status = 'In Progress'

        # Skip tasks that do not match the requested status filter.
        if status and status != task_status:
            continue

        bucket_name = bucket_id_to_name_mapping.get(task.bucket_id, "Unknown Bucket")

        # The creator may be missing entirely; fall back to "Unknown User".
        creator_id = None
        if task.created_by and task.created_by.user:
            creator_id = task.created_by.user.id
        creator_name = user_id_to_name_mapping.get(creator_id, "Unknown User")

        if task.created_date_time:
            created_date = task.created_date_time.strftime('%Y-%m-%d')
        else:
            created_date = "Unknown Date"

        rendered.append(
            f'Task: **{task.title}**\n'
            f'Bucket: {bucket_name}\n'
            f'Created By: {creator_name}\n'
            f'Status: {task_status}\n'
            f'Created Date: {created_date}\n\n'
        )

    if not rendered:
        await interaction.followup.send("No tasks found with the given status.")
        return

    # Send the output in as many messages as split_messages produces.
    for chunk in split_messages(rendered):
        await interaction.followup.send(chunk)
########################################
async def get_tasks_in_bucket(bucket_id):
    """Return the result of the Graph request for the tasks of one Planner bucket.

    Args:
        bucket_id: Planner bucket ID to query.
    """
    graph = await create_graph_service_client()
    bucket_request = graph.planner.buckets.by_planner_bucket_id(bucket_id)
    return await bucket_request.tasks.get()
async def get_tasks_in_channel(discord_channel_id):
    """Fetch the Planner tasks of the bucket mapped to a Discord channel.

    Args:
        discord_channel_id: Channel ID used as key into
            ``bucket_id_by_discord_channel``.

    Returns:
        The result of ``get_tasks_in_bucket``, or None when the channel has
        no mapped bucket or the request raised (the reason is printed).
    """
    bucket_id = bucket_id_by_discord_channel.get(discord_channel_id)
    if not bucket_id:
        print(f"No bucket found for Discord channel ID {discord_channel_id}")
        return None

    try:
        tasks = await get_tasks_in_bucket(bucket_id)
    except Exception as err:
        print(f"Error fetching tasks for bucket ID {bucket_id}: {err}")
        return None
    return tasks
# /channel_tasks: list the Planner tasks of the bucket mapped to the given
# channel (or the channel the command was used in), optionally filtered by
# status.
@bot.slash_command(description="List tasks for a specified channel or the current channel if none is specified.")
async def channel_tasks(
    interaction: nextcord.Interaction,
    channel: nextcord.abc.GuildChannel = nextcord.SlashOption(
        required=False,
        channel_types=[nextcord.ChannelType.text],
        description="The channel to list tasks for"
    ),
    status: str = nextcord.SlashOption(
        required=False,
        choices={"Not Started": "Not Started", "In Progress": "In Progress", "Completed": "Completed"},
        description="The status of the tasks to list"
    )
):
    await interaction.response.defer()

    # The bucket lookup is keyed by the channel ID as a string.
    if channel:
        target_channel_id = str(channel.id)
    else:
        target_channel_id = str(interaction.channel_id)

    page = await get_tasks_in_channel(target_channel_id)
    if not (page and page.value):
        await interaction.followup.send("No tasks were found for this channel.")
        return

    rendered = []
    for task in page.value:
        # Derive the status label from the completion percentage.
        if task.percent_complete == 0:
            task_status = 'Not Started'
        elif task.percent_complete == 100:
            task_status = 'Completed'
        else:
            task_status = 'In Progress'

        # Skip tasks that do not match the requested status filter.
        if status and status != task_status:
            continue

        # The keys of the assignments payload are used as assignee user IDs.
        assignee_ids = task.assignments.additional_data if task.assignments else {}
        assignee_names = [
            user_id_to_name_mapping.get(assignee_id, "Unknown User")
            for assignee_id in assignee_ids
        ]

        # The creator may be missing entirely; fall back to "Unknown User".
        creator_id = None
        if task.created_by and task.created_by.user:
            creator_id = task.created_by.user.id
        creator_name = user_id_to_name_mapping.get(creator_id, "Unknown User")

        if task.created_date_time:
            created_date = task.created_date_time.strftime('%Y-%m-%d')
        else:
            created_date = "Unknown Date"

        rendered.append(
            f'Task: **{task.title}**\n'
            f'Created By: {creator_name}\n'
            f'Created Date: {created_date}\n'
            f'Status: {task_status}\n'
            f'Assigned to: {", ".join(assignee_names)}\n\n'
        )

    if not rendered:
        await interaction.followup.send("No tasks found for this channel with the given status.")
        return

    # Send the output in as many messages as split_messages produces.
    for chunk in split_messages(rendered):
        await interaction.followup.send(chunk)
async def create_planner_task(user_id,task_title, bucket_id):
    """Create a Planner task in the configured plan, assigned to one user.

    Args:
        user_id: Graph user ID the task is assigned to.
        task_title: Title of the new task.
        bucket_id: Planner bucket ID the task is placed in.

    Returns:
        The result of the Graph ``planner.tasks.post`` call, or None when the
        call raised (the error is printed).
    """
    graph_client = await create_graph_service_client()

    # Assignment entry stored under the assignee's user ID.
    assignment = {
        "@odata.type" : "#microsoft.graph.plannerAssignment",
        "orderHint" : " !",
    }
    task_payload = PlannerTask(
        plan_id=PLAN_ID,
        bucket_id=bucket_id,
        title=task_title,
        assignments=PlannerAssignments(additional_data={user_id: assignment}),
    )

    try:
        return await graph_client.planner.tasks.post(task_payload)
    except Exception as err:
        print(f"An error occurred: {err}")
        return None
# /create_task: create a Planner task assigned to the selected member, in the
# bucket mapped to the chosen channel (or the channel the command was used in),
# and report the outcome as an embed.
# NOTE(review): documented with comments rather than a docstring because
# nextcord can use a callback docstring as the command description.
# NOTE(review): when the channel has no bucket mapping, bucket_id is None and
# is passed on to create_planner_task unchanged.
@bot.slash_command()
async def create_task(
    interaction: nextcord.Interaction,
    member: nextcord.Member = SlashOption(
        name="user",
        description="The user to assign the task to",
        required=True
    ),
    task_title: str = SlashOption(
        name="task_title",
        description="The title of the task to create",
        required=True
    ),
    bucket: nextcord.abc.GuildChannel = nextcord.SlashOption(
        required=False,
        channel_types=[nextcord.ChannelType.text],
        description="The bucket to assign the task to"
    )
):
    await interaction.response.defer()

    graph_user_id = discord_id_to_user_id_mapping.get(str(member.id))
    if graph_user_id is None:
        await interaction.followup.send("No matching user ID found for the provided Discord ID")
        return

    # Resolve the Planner bucket from the explicit channel or the current one.
    if bucket:
        target_channel_id = bucket.id
    else:
        target_channel_id = interaction.channel_id
    planner_bucket_id = bucket_id_by_discord_channel.get(str(target_channel_id))

    created = await create_planner_task(graph_user_id, task_title, planner_bucket_id)
    if not created:
        failure = nextcord.Embed(title="Failed to Create Task", color=nextcord.Color.red())
        await interaction.followup.send(embed=failure)
        return

    success = nextcord.Embed(title="Task Created Successfully", color=nextcord.Color.green())
    success.add_field(name="Task Title", value=task_title, inline=False)

    # Clickable channel mention for the bucket field.
    bucket_mention = bucket.mention if bucket else f"<#{interaction.channel.id}>"
    success.add_field(name="Bucket", value=bucket_mention, inline=True)

    success.add_field(name="Assigned To", value=member.mention, inline=True)
    await interaction.followup.send(embed=success)
# /whitelist: add a TCP ingress rule for a single IPv4 address and port to the
# configured EC2 security group.
# Documented with comments rather than a docstring so the command and option
# metadata registered with Discord stay exactly as declared in the signature.
@bot.slash_command(description="Whitelist an IP address")
async def whitelist(interaction: nextcord.Interaction, ip_address: str, port: int = 8080):
    await interaction.response.defer()

    # ip_address and port come straight from the Discord user; validate them
    # before they are placed into a security-group rule. The rule is built
    # with CidrIp and a /32 suffix, so only a plain IPv4 address is accepted.
    try:
        host = ipaddress.IPv4Address(ip_address.strip())
    except ValueError:
        await interaction.followup.send(
            "Failed to whitelist: please provide a valid IPv4 address."
        )
        return
    if not 0 <= port <= 65535:
        await interaction.followup.send(
            "Failed to whitelist: port must be between 0 and 65535."
        )
        return

    ip_permission = {
        'IpProtocol': 'tcp',
        'FromPort': port,
        'ToPort': port,
        'IpRanges': [{'CidrIp': f'{host}/32'}]
    }

    def _authorize():
        security_group = ec2.SecurityGroup(SECURITY_GROUP_ID)
        security_group.authorize_ingress(IpPermissions=[ip_permission])

    try:
        # boto3 is synchronous; run the request in the default executor so
        # the event loop is not blocked while waiting for AWS.
        await asyncio.get_running_loop().run_in_executor(None, _authorize)
    except Exception as e:
        # Boundary handler: report any failure back to the user.
        await interaction.followup.send(f"Failed to whitelist: {str(e)}")
    else:
        await interaction.followup.send(f"Your IP has been whitelisted on port {port}.")
# Start the Discord client with the token read from the configuration file.
bot.run(BOT_TOKEN)