#!/usr/bin/python
# expunge -- executable script from a fork of mit-athena/delete.
import logging
import optparse
import os
import sys
import libdelete
# Name this program was invoked as; it prefixes prompts and error
# messages, and main() switches to "purge" behaviour based on it.
whoami = os.path.basename(sys.argv[0])

# Logger used for this script's own debug output.
logger = logging.getLogger('expunge')

# Text printed before and after the file listing produced by -l.
header = "The following deleted files are going to be expunged:\n"
footer = (
    "\n"
    "The above files, which have been marked for deletion, are about to be\n"
    "expunged forever! Make sure you don't need any of them "
    "before continuing.\n"
)

# Prompt shown after the listing; only "y"/"yes" counts as consent.
confirmation = "Do you wish to continue [return = no]? "
def debug_callback(option, opt_str, value, parser):
    """
    An OptionParser callback that enables debugging.

    ``value`` is the argument given to ``--debug``: either ``all`` or a
    comma-separated list of logger names.  Every selected logger is
    switched to DEBUG level.  If any requested name is not one of the
    known targets, ``parser.error()`` is called with the list of valid
    targets.  ``option`` and ``opt_str`` belong to the optparse
    callback signature and are not used.
    """
    all_loggers = [logger.name, 'libdelete']
    if value.strip().lower() == 'all':
        loggers = all_loggers
    else:
        loggers = [x.strip() for x in value.split(',')]
        # Any subset of the known targets is acceptable; only names
        # outside that list are an error.
        if not set(loggers).issubset(all_loggers):
            parser.error('Valid debug targets: {0}'.format(
                ", ".join(all_loggers)))
    for name in loggers:
        logging.getLogger(name).setLevel(logging.DEBUG)
def ask(question, *args, **kwargs):
    """
    Ask a question, possibly prepended with the name of the program,
    and determine whether the user answered in the affirmative.

    ``question`` is a %-style format string that is filled in with
    ``args``.  Pass ``nowhoami=True`` to omit the program-name prefix.
    Returns True only when the answer is "y" or "yes" (case-insensitive,
    surrounding whitespace ignored).  Interrupting the prompt with
    Ctrl-C exits the program with status 0; end-of-file on standard
    input is treated as a negative answer.
    """
    yes = ('y', 'yes')
    prepend = '' if kwargs.get('nowhoami', False) else "{0}: ".format(whoami)
    prompt = "%s%s " % (prepend, question % args)
    try:
        answer = raw_input(prompt)
    except KeyboardInterrupt:
        sys.exit(0)
    except EOFError:
        # No answer can be read (Ctrl-D, or stdin closed/exhausted).
        # Nothing was confirmed, so answer "no" instead of dying with
        # a traceback.
        return False
    return answer.strip().lower() in yes
def perror(message, **kwargs):
    """
    Format an error message, log it in the debug log
    and maybe also print it to stderr.

    ``message`` is a str.format() template filled in from the keyword
    arguments; when no formatting arguments are supplied the message is
    used verbatim.  Passing ``_maybe=True`` suppresses the copy on
    stderr (the debug log entry is still written).
    """
    should_print = not kwargs.pop('_maybe', False)
    # Only treat the message as a template when there is something to
    # substitute: pre-formatted text (e.g. an exception message naming
    # a file called "{x}") must not be run through str.format().
    text = message.format(**kwargs) if kwargs else message
    msg = "{0}: {1}".format(whoami, text)
    logger.debug("Error: %s", msg)
    if should_print:
        sys.stderr.write("%s\n" % (msg,))
def getsize(path):
    """
    Return the size of ``path`` as a ``(size, label)`` tuple.

    ``size`` is the byte count reported by lstat() and ``label`` is
    that size passed through ``libdelete.to_kb()`` and formatted as
    "(<n>KB)".  Raises OSError if ``path`` cannot be examined.
    """
    # expunge() unlinks symlinks rather than following them, so measure
    # the link itself, not its target.  This also keeps a dangling
    # symlink from producing a spurious "No such file" error.
    size = os.lstat(path).st_size
    return (size, "(%dKB)" % (libdelete.to_kb(size),))
def expunge(deleted_files, options):
    """
    Permanently remove every path in ``deleted_files``.

    ``deleted_files`` is a list of paths to remove, processed in the
    order given.  ``options`` is the parsed option object; the
    attributes read here are ``listfiles``, ``force``, ``interactive``,
    ``yieldsize``, ``verbose`` and ``noop``.

    With ``listfiles`` the whole list is printed first and, unless
    ``force`` is set, the user must confirm once before anything is
    removed.  With ``interactive`` the user is asked about each path.
    Declining either prompt exits the program.  With ``noop`` nothing
    is removed.

    Returns 0 if everything went well, or 1 if a size could not be
    determined or a path could not be removed.
    """
    def say(text):
        # Same output as a print statement: the text plus a newline.
        sys.stdout.write("%s\n" % (text,))

    expunged_size = 0
    errors = 0
    if options.listfiles:
        say(header)
        say(libdelete.format_columns(sorted(
            [libdelete.relpath(
                libdelete.undeleted_name(x)) for x in deleted_files])))
        say(footer)
        if not options.force and \
                not ask(confirmation, nowhoami=True):
            logger.debug("User failed to confirm; exiting")
            sys.exit(0)
    for f in deleted_files:
        logger.debug("Processing %s", f)
        real_name = libdelete.relpath(libdelete.undeleted_name(f))
        logger.debug("Undeleted name: %s", real_name)
        size, size_str = (0, '(??KB)')
        if options.interactive or options.yieldsize or options.verbose:
            try:
                size, size_str = getsize(f)
            except OSError as e:
                perror('{filename}: {error} while getting size',
                       filename=e.filename, error=e.strerror)
                errors = 1
            logger.debug("Size, size_str = %s, %s", size, size_str)
            expunged_size += size
        if options.interactive:
            # This is correct.  We do not display sizes of directories
            # when prompting, but do display them below.  Because why not.
            if os.path.isdir(f):
                prompt_args = ('directory ', real_name, '')
            else:
                prompt_args = ('', real_name, ' ' + size_str)
            if not ask('Expunge %s%s%s?', *prompt_args):
                logger.debug("User failed to confirm, exiting...")
                # We exit here, not keep going, as the original code did
                sys.exit(errors)
        if not options.noop:
            try:
                if os.path.isdir(f) and not os.path.islink(f):
                    logger.debug("rmdir: %s", f)
                    os.rmdir(f)
                else:
                    logger.debug("unlink: %s", f)
                    os.unlink(f)
            except OSError as e:
                # One path that cannot be removed should not abort the
                # whole run: report it, remember the failure, and take
                # its size back out of the running total.
                perror('{filename}: {error} while expunging',
                       filename=f, error=e.strerror)
                errors = 1
                expunged_size -= size
                continue
        if options.verbose:
            say("{whoami}: {path} {size} {maybe}expunged ({total}KB total)".format(
                whoami=whoami, path=f, size=size_str,
                maybe='would be ' if options.noop else '',
                total=libdelete.to_kb(expunged_size)))
    if options.yieldsize:
        say("Total expunged: {0}KB".format(libdelete.to_kb(expunged_size)))
    return errors
def parse_options():
    """
    Build the command-line parser and parse the program's arguments.

    Returns the ``(options, args)`` pair produced by optparse, with
    ``options.verbose`` forced on whenever ``-n`` was given.
    """
    parser = optparse.OptionParser(usage="%prog [options] filename ...")

    def add_switch(flag, dest, help_text):
        # Every plain on/off flag is declared the same way.
        parser.add_option(flag, dest=dest, action="store_true",
                          default=False, help=help_text)

    add_switch("-l", "listfiles", "List files before expunging")
    add_switch("-r", "recursive", "Recursively delete non-empty directories")
    add_switch(
        "-f", "force",
        "Do not ask questions or report errors about nonexistent files")
    add_switch(
        "-i", "interactive",
        "Prompt for confirmation before deleting each file/directory")
    add_switch(
        "-n", "noop",
        "Don't actually delete, just print what would be deleted")
    add_switch("-v", "verbose", "Print each filename as it is deleted")
    parser.add_option(
        "-t", dest="timev", action="store", type="int", default=0,
        help="Only list n-day-or-older files", metavar="n")
    add_switch("-y", "yieldsize", "Report total space taken up by files")
    add_switch("-s", "f_links", "Follow symlinks to directories")
    add_switch("-m", "f_mounts", "Follow mount points")
    parser.add_option(
        "--debug", action="callback", type='string',
        help="Enable debugging (logger target or 'all')",
        callback=debug_callback, metavar='target')

    parsed = parser.parse_args()
    options = parsed[0]
    if options.noop:
        # -n implies -v
        options.verbose = True
    return parsed
def main():
    """
    Program entry point: collect the deleted files and expunge them.

    When invoked as "purge" no arguments are accepted (apart from a
    lone ``--debug``, which is expanded to ``--debug all``) and the
    home directory is processed recursively with the listing enabled.
    Otherwise the paths named on the command line are searched,
    defaulting to the current directory.

    Returns the exit status: 0 on success, non-zero if any error
    was reported.
    """
    status = 0
    is_purge = (whoami == "purge")
    if is_purge and len(sys.argv) > 1:
        if sys.argv[1:] == ['--debug']:
            sys.argv.append('all')
        else:
            sys.stderr.write(
                "purge does not take any arguments or options.\n")
            sys.exit(1)

    options, args = parse_options()
    if is_purge:
        args = [os.path.expanduser('~')]
        options.recursive = True
        options.listfiles = True
    if not args:
        args.append('.')

    found = []
    for target in args:
        try:
            found.extend(libdelete.find_deleted_files(
                target,
                follow_links=options.f_links,
                follow_mounts=options.f_mounts,
                recurse_undeleted_subdirs=options.recursive or None,
                recurse_deleted_subdirs=True,
                n_days=options.timev))
        except libdelete.DeleteError as err:
            perror(err.message)
            status = 1
    logger.debug("Found %d files", len(found))

    if found:
        # Sort them so we're deleting leaves first.
        # Doesn't cover all corner cases, but covers everything the old
        # code supported.  In particular, weird symlinks will make this
        # sad.
        found.sort(key=lambda path: path.count(os.path.sep), reverse=True)
        status += expunge(found, options)
    return status
if __name__ == "__main__":
    # Show warnings and above by default; --debug lowers the level of
    # individual loggers.
    logging.basicConfig(level=logging.WARNING)
    exit_status = main()
    sys.exit(exit_status)