-
Notifications
You must be signed in to change notification settings - Fork 0
/
filemanager.py
162 lines (121 loc) · 5.56 KB
/
filemanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
import music_tag
import eyed3
from eyed3.id3.frames import ImageFrame
import re
def music_file_with_missing_attributes(filepath):
if os.path.splitext(filepath)[1] != '.mp3':
return False
file = music_tag.load_file(filepath)
for key in ["title", "artist", "albumartist", "album", "genre", "year", "tracknumber"]:
if not field_is_valid(file[key]):
return True
return False
def field_is_valid(metadata_item):
if type(metadata_item.value) == int:
# todo this check isn't correct
return True
return len(metadata_item.value) > 0
def find_candidate_files(path, quarantine_dir, filter_fcn=music_file_with_missing_attributes):
for root, dirs, files in os.walk(path):
if not root.endswith(quarantine_dir):
for name in files:
full_path = os.path.join(root, name)
if filter_fcn(full_path):
yield full_path
def write_file_metadata(filepath, song_details, album_details):
file = music_tag.load_file(filepath)
file['artist'] = album_details['data'][0]['attributes']['artistName']
file['albumartist'] = album_details['data'][0]['attributes']['artistName']
file['title'] = song_details['title']
file['album'] = album_details['data'][0]['attributes']['name']
file['genre'] = song_details['genres']['primary']
file.raw['tracknumber'] = retrieve_track_number(song_details['title'], album_details)
# file.raw['year'] = album_details['data'][0]['attributes']['releaseDate'][0:4]
file['year'] = int(album_details['data'][0]['attributes']['releaseDate'][0:4])
# f = eyed3.load(filepath)
# f.initTag()
# f.tag.year = int(album_details['data'][0]['attributes']['releaseDate'][0:4])
# f.tag.save()
# time.sleep(5)
# year field is written but only displays when we view it first in the debugger
# _ = file['year']
file.save()
def retrieve_track_number(song_title, album_details):
song_list = album_details['data'][0]['relationships']['tracks']['data']
res = -1
for idx, song in enumerate(song_list):
if song['attributes']['name'] == song_title:
res = idx + 1
break
if res < 10:
return '0' + str(res)
return str(res)
def rename_file(filepath, song_details):
working_dir = os.path.dirname(filepath)
if not os.path.exists(os.path.join(working_dir, song_details['title'] + '.mp3')):
os.rename(filepath, os.path.join(working_dir, song_details['title'] + '.mp3'))
def move_problem_file(filepath, quarantine_dir):
base_name = os.path.basename(filepath)
os.rename(filepath, os.path.join(quarantine_dir, base_name))
def add_album_artwork(filepath):
base_name = os.path.basename(filepath)
base_name_no_ext = base_name[:base_name.rindex('.')]
dir_name = os.path.dirname(filepath)
jpg_path = os.path.join(dir_name, base_name_no_ext + '.jpg')
audio_file = eyed3.load(filepath)
if audio_file.tag is None:
audio_file.initTag()
audio_file.tag.images.set(ImageFrame.FRONT_COVER, open(jpg_path, 'rb').read(), 'image/jpeg')
audio_file.tag.save(version=eyed3.id3.ID3_V2_3)
def cleanup_raw_files():
curr_dir = os.path.dirname(os.path.realpath(__file__))
for root, dirs, files in os.walk(curr_dir):
for filename in files:
if filename.endswith('.raw'):
os.remove(os.path.join(curr_dir, filename))
def cleanup_jpg_files(working_dir):
for root, dirs, files in os.walk(working_dir):
for filename in files:
if filename.endswith('.jpg'):
os.remove(os.path.join(root, filename))
def makedir_if_absent(path):
if not os.path.exists(path):
os.makedirs(path)
def reorganize_files(path, quarantine_dir):
for root, dirs, files in os.walk(path):
if not root.endswith(quarantine_dir):
for name in files:
if os.path.splitext(name)[1] == '.mp3':
full_path = os.path.join(root, name)
organize_song_by_tags(path, full_path, quarantine_dir)
def organize_song_by_tags(base_music_path, filepath, quarantine_dir):
file = music_tag.load_file(filepath)
artist_name = str(file['artist'])
album_name = str(file['album'])
artist_name_sanitized = sanitize_name_for_directory(artist_name)
album_name_sanitized = sanitize_name_for_directory(album_name)
makedir_if_absent(os.path.join(base_music_path, artist_name_sanitized))
makedir_if_absent(os.path.join(base_music_path, artist_name_sanitized, album_name_sanitized))
if not check_song_location(filepath, artist_name_sanitized, artist_name_sanitized):
try:
os.rename(filepath, os.path.join(base_music_path, artist_name_sanitized, album_name_sanitized,
os.path.basename(filepath)))
except FileExistsError:
try:
makedir_if_absent(os.path.join(quarantine_dir, 'DUPLICATE_FILES'))
os.rename(filepath, os.path.join(quarantine_dir, 'DUPLICATE_FILES', os.path.basename(filepath)))
except FileExistsError:
return
def check_song_location(filepath, artist_name_sanitized, album_name_sanitized):
directory_list = filepath.split(os.sep)
if len(directory_list) < 2:
return False
if directory_list[-1] != album_name_sanitized:
return False
if directory_list[-2] != artist_name_sanitized:
return False
return True
def sanitize_name_for_directory(name):
#TODO this check is too aggressive, need to rework
return re.sub(r'[^\w_. -]', '_', name)