forked from hut8/attachment
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mbox-extract-attachments
executable file
·107 lines (88 loc) · 3.27 KB
/
mbox-extract-attachments
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__license__ = "GNU GPLv3+"
__version__ = 1.0
__date__ = "2021-07-19"
import email
import mailbox
import os
import sys
import logging
import fnmatch
import hashlib
import email.header
# Attachment file names that are rejected by name, before any extension check.
BLACKLIST = {'signature.asc', 'message-footer.txt', 'smime.p7s'}
def extract_attachment(msg, destination, attachment_db):
    """Write the decoded payload of one message part to disk, at most once.

    Payloads are de-duplicated by SHA-1 digest: a payload whose digest is
    already in *attachment_db* is skipped.  If *destination* already exists,
    a counter is inserted before the extension (``name1.ext``, ``name2.ext``,
    ...) until an unused path is found.

    Args:
        msg: A non-multipart message part.  Multipart parts are rejected
            with an error log entry.
        destination: Preferred output path for the attachment.
        attachment_db: Set of hex SHA-1 digests of payloads already written.
            The digest of this payload is added only after the file has
            been written successfully.

    Returns:
        None.  Failures are logged, not raised.
    """
    if msg.is_multipart():
        logging.error("tried to extract from multipart: %s", destination)
        return

    attachment_data = msg.get_payload(decode=True)
    if attachment_data is None:
        # A part without a payload decodes to None; hashing None would raise
        # TypeError and abort the whole run.
        logging.error("attachment has no payload: %s", destination)
        return

    attachment_hash = hashlib.sha1(attachment_data).hexdigest()
    if attachment_hash in attachment_db:
        logging.debug("already extracted attachment")
        return

    # Pick the first free "<base><n><ext>" name; the split is loop-invariant.
    base, ext = os.path.splitext(destination)
    n = 1
    while os.path.exists(destination):
        destination = base + str(n) + ext
        n += 1

    try:
        with open(destination, "wb") as sink:
            sink.write(attachment_data)
    except OSError as e:
        logging.error("io error while saving attachment: %s", e)
        return

    # Record the digest only now, so a failed write is neither counted as
    # extracted nor prevents a later identical payload from being saved.
    attachment_db.add(attachment_hash)
def decode_mime_words(s):
    """Decode RFC 2047 encoded words in a header value into one string.

    Each fragment returned by ``email.header.decode_header`` is decoded with
    its declared charset (UTF-8 when none is declared) and the fragments are
    concatenated.  Decoding never raises on bad input: undecodable bytes
    become U+FFFD, and an unknown charset label falls back to UTF-8.

    Args:
        s: Raw header value, e.g. an attachment file name.

    Returns:
        The decoded text as ``str``.
    """
    decoded_words = []
    for text, charset in email.header.decode_header(s):
        if isinstance(text, bytes):
            try:
                text = text.decode(charset or 'utf8', errors='replace')
            except LookupError:
                # The charset label is not a codec Python knows; a best-effort
                # name is more useful than aborting the whole extraction.
                text = text.decode('utf8', errors='replace')
        decoded_words.append(text)
    return ''.join(decoded_words)
def wanted(filename):
    """Tell whether an attachment with this file name should be extracted.

    The name is MIME-decoded first.  Names listed in BLACKLIST are rejected;
    any other name is accepted when its lower-cased form matches one of the
    document patterns below.
    """
    name = decode_mime_words(filename)
    if name in BLACKLIST:
        return False

    lowered = name.lower()
    patterns = ('*.doc', '*.docx', '*.odt', '*.pdf', '*.rtf')
    return any(fnmatch.fnmatch(lowered, pattern) for pattern in patterns)
def process_message(msg, directory, attachment_db):
    """Extract every wanted attachment of *msg* into *directory*.

    Walks all parts of the message and considers those whose content
    disposition is ``attachment`` or ``inline``.  Parts without a file name
    are only logged; named parts are extracted when ``wanted`` accepts them.

    Args:
        msg: The message to scan.
        directory: Directory the attachments are written into.
        attachment_db: Set of payload digests shared across messages, passed
            through to ``extract_attachment`` for de-duplication.
    """
    for part in msg.walk():
        if part.get_content_disposition() not in ('attachment', 'inline'):
            continue

        filename = part.get_filename()
        if not filename:
            logging.debug("found message with nameless attachment: %s",
                          msg['subject'])
            continue
        if not wanted(filename):
            continue

        logging.debug("extract filename: %s", filename)
        decoded_filename = decode_mime_words(filename)
        # The file name comes from the email and is untrusted: keep only its
        # final path component so names such as "../../x.pdf" or "/tmp/x.pdf"
        # cannot place files outside *directory*.
        safe_name = os.path.basename(decoded_filename)
        destination = os.path.join(directory, safe_name)
        extract_attachment(part, destination, attachment_db)
def main():
    """Command-line entry point: extract attachments from an mbox file.

    Usage: ``<script> <mbox_file> [directory]``.  Attachments are written to
    *directory* (the current directory by default) and a debug log named
    ``attachment-<mbox basename>.log`` is written to the working directory.
    Exits with status 1 on a usage error, a missing mbox file, or a missing
    output directory.
    """
    if len(sys.argv) < 2 or len(sys.argv) > 3:
        print("usage: %s <mbox_file> [directory]" % sys.argv[0])
        sys.exit(1)

    filename = sys.argv[1]
    directory = os.path.curdir

    logging.basicConfig(
        filename='attachment-%s.log' % os.path.basename(filename),
        level=logging.DEBUG)

    if not os.path.exists(filename):
        print("file doesn't exist:", filename)
        sys.exit(1)

    if len(sys.argv) == 3:
        directory = sys.argv[2]
        # isdir() is already False for a path that does not exist.
        if not os.path.isdir(directory):
            print("Directory doesn't exist:", directory)
            sys.exit(1)

    attachment_db = set()
    # create=False: never create an empty mbox if the file disappears
    # between the existence check above and this open.
    box = mailbox.mbox(filename, create=False)
    try:
        print("counting messages for %s... " % filename)
        message_count = len(box)
        print("%s contains %s messages" % (filename, message_count))

        for msg in box:
            process_message(msg, directory, attachment_db)
    finally:
        # Release the underlying file handle even if processing fails.
        box.close()

    print("extracted %s attachments" % len(attachment_db))
# Run the extractor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()