-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
126 lines (109 loc) · 4.15 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import hashlib
import uvicorn
import watchdog
import werkzeug
from datetime import datetime
from watchdog.observers import Observer
from fastapi import FastAPI, HTTPException, Request
from fastapi.templating import Jinja2Templates
from db import *
from notify import apobj
FILES_DIRECTORY = os.getenv("FILES_DIRECTORY", "./monito")
app = FastAPI()
templates = Jinja2Templates(directory="templates")
def calculate_hash(file_path):
with open(file_path, "rb") as file:
sha256_hash = hashlib.sha256()
while chunk := file.read(8192):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
def on_modified(event):
if event.is_directory:
return
apobj.notify(
body=f"👀 Name: {event.src_path}\n#️⃣ Hash: {calculate_hash(event.src_path)}\n⏰ Date: {str(datetime.now())}",
title='⚠️ == File Modified == ⚠️'
)
path = event.src_path
filename = os.path.basename(path)
hash_value = calculate_hash(path)
file_info = get_file(filename, path)
if not file_info:
insert_file(filename, path, hash_value)
try:
if file_info.hash != hash_value:
update_file(filename, path, hash_value)
except:
pass
def first_run():
for root, dirs, files in os.walk(FILES_DIRECTORY):
for file in files:
path = os.path.join(root, file)
hash_value = calculate_hash(path)
file_info = get_file(file, path)
if not file_info:
insert_file(file, path, hash_value)
if file_info.hash != hash_value:
update_file(file, path, hash_value)
@app.get("/")
def json_data():
files = get_files()
files_final = []
for i in files:
files_final.append({"filename": i.filename,
"path": i.path,
"hash": i.hash,
"old_hash": i.old_hash,
"last_modified": i.last_modified})
return {"files": files_final}
@app.get("/board")
def dashboard(request: Request):
files = get_files()
files_final = []
for i in files:
files_final.append({"filename": i.filename,
"path": i.path,
"hash": i.hash,
"old_hash": i.old_hash,
"last_modified": i.last_modified})
return templates.TemplateResponse("dashboard.html", {"request": request, "files_data": files_final})
@app.get("/files/{filename:path}")
def read_file(filename: str):
secure_filename = werkzeug.utils.secure_filename(filename)
print(secure_filename)
path = os.path.join(FILES_DIRECTORY, secure_filename)
if not os.path.isfile(path):
raise HTTPException(status_code=404, detail="File not found.")
hash_value = calculate_hash(path)
file_info = get_file(secure_filename, path)
if not file_info:
insert_file(secure_filename, path, hash_value)
if file_info.hash != hash_value:
update_file(secure_filename, path, hash_value)
return {"status": "File Modified", "filename": secure_filename, "path": path, "hash": hash_value, "old_hash": file_info[4], "last_modified": file_info[5]}
return {"status": "OK.","filename": secure_filename, "path": path, "hash": hash_value, "old_hash": file_info.old_hash, "last_modified": file_info.last_modified}
@app.get("/files")
def read_files():
files = get_files()
files_final = []
for i in files:
files_final.append({"filename": i.filename,
"path": i.path,
"hash": i.hash,
"old_hash": i.old_hash,
"last_modified": i.last_modified})
return {"files": files_final}
if __name__ == "__main__":
create_table()
observer = Observer()
event_handler = watchdog.events.FileSystemEventHandler()
event_handler.on_modified = on_modified
observer.schedule(event_handler, path=FILES_DIRECTORY, recursive=True)
observer.start()
try:
first_run()
uvicorn.run(app, host="0.0.0.0", port=8000)
except KeyboardInterrupt:
observer.stop()
observer.join()