-
Notifications
You must be signed in to change notification settings - Fork 25
/
mbox.py
312 lines (268 loc) · 9.18 KB
/
mbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
#!/usr/bin/env python3
#
# Copyright 2016 Red Hat, Inc.
#
# Authors:
# Fam Zheng <[email protected]>
#
# This work is licensed under the MIT License. Please see the LICENSE file or
# http://opensource.org/licenses/MIT.
import email
import email.utils
import email.header
import datetime
import re
from rest_framework.fields import DateTimeField
def _parse_header(header):
r = ""
for h, c in email.header.decode_header(header):
if isinstance(h, bytes):
h = h.decode(c or "utf-8", "replace")
r += h
if "\n" in r:
r = " ".join([x.strip() for x in r.splitlines()])
return r
def parse_address(addr_str):
name, addr = email.utils.parseaddr(addr_str)
name = _parse_header(name)
return name, addr
def _addr_fmt_text(name, addr):
if name:
return "%s <%s>" % (name, addr)
else:
return addr
def addr_db_to_rest(obj):
if obj[0] != obj[1]:
return {"name": obj[0], "address": obj[1]}
else:
return {"address": obj[1]}
def decode_payload(m):
payload = m.get_payload(decode=True)
charset = m.get_content_charset()
try:
return payload.decode(charset or "utf-8", errors="replace")
except:
if charset != "utf-8":
# Still fall back from non-utf-8 to utf-8
return payload.decode("utf-8")
else:
raise
class MboxMessage:
"""Helper class to process mbox"""
def __init__(self, m):
self._m = email.message_from_string(m)
self._status = {}
self._mbox = m
def get_mbox(self):
return self._mbox
def get_subject(
self, upper=False, strip_tags=False, suppress_re=None, strip_re=False
):
"""Process and return subject of the message.
upper: convert subject to upper case.
strip_tags: remove all the leading [xxx] tags
suppress_re: a subject str to compare to, if ours is the same or
only to prepend Re:, return an empty str
strip_re: drop leading "Re:" prefixes"""
def do_strip_tags(t):
while t.startswith("[") and "]" in t:
t = t[t.find("]") + 1 :].strip()
return t
r = _parse_header(self._m["subject"])
if upper:
r = r.upper()
if strip_tags:
r = do_strip_tags(r)
if suppress_re or strip_re:
while r.upper().startswith("RE:"):
r = r[3:].strip()
if strip_re:
return r
if suppress_re and suppress_re == r:
return "Re: ..."
return r
def get_from(self, text=False):
name, addr = parse_address(self._m["from"])
name = name or addr
if text:
return _addr_fmt_text(name, addr)
return name, addr
def _get_addr_list(self, field, text):
ret = []
f = self._m.get_all(field, [])
f = (_parse_header(x) for x in f)
addrs = email.utils.getaddresses(f)
for name, addr in addrs:
name = name or addr
if not addr:
continue
if text:
ret.append(_addr_fmt_text(name, addr))
else:
ret.append((name, addr))
if text:
ret = ", ".join(ret)
return ret
def get_to(self, text=False):
return self._get_addr_list("to", text)
def get_cc(self, text=False):
return self._get_addr_list("cc", text)
def clean_message_id(self, msgid):
if not msgid:
return msgid
if msgid.startswith("<"):
msgid = msgid[1 : msgid.find(">")]
else:
for x in msgid.split("\n"):
if x.startswith("<") and x.endswith(">"):
msgid = x[1:-1]
break
else:
return None
msgid = msgid.replace("_", "._5F")
msgid = msgid.replace("/", "._2F")
return msgid
def get_in_reply_to(self):
msgid = self._m["in-reply-to"]
if not msgid:
refs = self._m["references"]
if refs:
msgid = refs.split()[-1]
return self.clean_message_id(msgid)
def get_date(self, timestamp=False):
tup = email.utils.parsedate_tz(self._m["date"])
if tup:
stamp = email.utils.mktime_tz(tup)
if timestamp:
return stamp
return datetime.datetime.utcfromtimestamp(stamp)
def get_message_id(self):
return self.clean_message_id(self._m["message-id"])
def get_prefixes(self, upper=False):
"""Return tags extracted from the leading "[XXX] [YYY ZZZ]... in subject"""
r = []
s = self.get_subject(upper=upper, strip_re=True)
while s.startswith("[") and "]" in s:
t = s[1 : s.find("]")]
for k in t.split(" "):
r.append(k)
if "]" in s:
s = s[s.find("]") + 1 :].strip()
return r
def get_version(self):
v = 1
for tag in self.get_prefixes(True):
if tag.startswith("PATCH"):
tag = tag[5:]
if tag.startswith("V"):
try:
v = int(tag[1:])
except:
pass
return v
def find_tags(self, *tags):
"""Return intersection of *tags is present in this message"""
s = set([x.upper() for x in tags])
return s.intersection(self.get_prefixes(upper=True))
def get_body(self):
def _get_message_text(m):
payload = m.get_payload(decode=not self._m.is_multipart())
body = ""
if m.get_content_type() == "text/plain" or m.get_content_type() == "application/octet-stream":
body = decode_payload(m)
elif isinstance(payload, list):
for p in payload:
body += _get_message_text(p)
return body
body = _get_message_text(self._m)
return body
def get_preview(self, maxchar=1000):
r = ""
quote = False
for l in self.get_body().splitlines():
if l.startswith(">"):
if not quote:
r += "..."
quote = True
continue
quote = False
r += l
if len(r) >= 1000:
break
return r
def _find_line(self, pattern):
rexp = re.compile(pattern)
for l in self.get_body().splitlines():
if rexp.match(l):
return l
def get_reviewed_by(self):
"""Try to find a "Reviewed-by:" line in message body"""
prefix = "Reviewed-by:"
r = self._find_line("^" + prefix + ".*>$")
if r:
return email.utils.parseaddr(r[len(prefix) :].strip())
else:
return None
def get_num(self):
cur, total = None, None
for tag in self.get_prefixes():
if "/" in tag:
try:
n, m = tag.split("/")
cur, total = int(n), int(m)
break
except:
pass
return cur, total
def is_reply(self):
return self.get_subject(upper=True, strip_tags=True).startswith("RE:")
def set_status(self, st, val):
self._status[st] = val
def get_status(self, st, default=None):
return self._status.get(st, default)
def get_status_by_prefix(self, pref):
return dict([(k, v) for k, v in self._status.items() if k.startswith(pref)])
def _has_lines(self, text, *lines):
i = 0
for l in text.splitlines():
if i == len(lines):
break
if l.startswith(lines[i]):
i += 1
return i == len(lines)
def is_patch(self):
"""Return true if the email body is a patch"""
body = self.get_body()
if self.get_subject().startswith("Re:"):
return False
return (
self._has_lines(body, "---", "diff ", "index ", "---", "+++", "@@")
or self._has_lines(body, "---", "diff ", "index ", "GIT binary patch")
or self._has_lines(body, "---", "diff ", "old mode ", "new mode ")
or self._has_lines(body, "---", "+++", "@@")
or self._has_lines(body, "---", "diff ", "rename from", "rename to")
)
def is_series_head(self):
"""Create and return a Series from Message if it is one, otherwise
return None"""
if self.get_subject().startswith("Re:"):
return False
c, t = self.get_num()
if (c, t) == (None, None) and self.is_patch():
return True
if c == 0:
return True
return False
def get_json(self):
"""Return the JSON format of the mbox"""
msg = {}
msg["message_id"] = self.get_message_id()
msg["in_reply_to"] = self.get_in_reply_to() or ""
msg["date"] = DateTimeField().to_representation(self.get_date())
msg["subject"] = self.get_subject()
msg["sender"] = addr_db_to_rest(self.get_from())
msg["recipients"] = [
addr_db_to_rest(x) for x in (self.get_to() + self.get_cc())
]
msg["mbox"] = self.get_mbox()
return msg