-
Notifications
You must be signed in to change notification settings - Fork 0
/
purgelogs.py
119 lines (104 loc) · 4.2 KB
/
purgelogs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
#
# Copyright 2020 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import argparse
import logging
import os
import shutil
import sys
from datetime import datetime, timedelta
from logging import Logger
from pathlib import Path
from typing import List, Generator, Optional, Tuple, Set
DirContent = Tuple[Set[Path], Set[str]]
def check_dir_path(log_path: str) -> Optional[Path]:
"""Ensures initial directory is valid"""
p = Path(log_path)
if not p.exists():
return None
return p.resolve()
def delete_dir(dir_path: Path) -> None:
"""Recursively deletes a path"""
shutil.rmtree(dir_path)
def get_jobdir(dirs: Set[Path], files: Set[str]) -> bool:
"""Check if directory content is a job dir"""
dirs_name = set(map(lambda s: s.name, dirs))
is_zuul = 'zuul-info' in dirs_name
is_jenkins = 'ara-database' in dirs_name
is_jenkins_console = 'consoleText.txt' in files
is_empty_dir = not files and not dirs
return is_zuul or is_jenkins or is_jenkins_console or is_empty_dir
def ls(dir_path: Path) -> DirContent:
"""Returns the aboluste list of directory and files in a directory, ignoring symlinks"""
dirs = set()
files = set()
for entry in os.listdir(dir_path):
entry_path = dir_path / entry
if not entry_path.is_symlink():
if entry_path.is_dir():
dirs.add(entry_path)
elif entry_path.exists():
files.add(entry)
return (dirs, files)
def find_old_files(log: Logger, calculated_time: datetime, log_path: Path) -> Generator[Path, None, None]:
"""Finds old files in the log path, stopping when a directory is a jobdir"""
queue = set((log_path, ))
while queue:
root = queue.pop()
current_dirs, current_files = ls(root)
if get_jobdir(current_dirs, current_files):
log.debug("%s : is a job dir", root)
dir_date = datetime.fromtimestamp(os.path.getmtime(root))
if dir_date < calculated_time:
yield root
else:
log.debug("%s : walking", root)
queue = queue.union(current_dirs)
def search_and_destroy(log: Logger, calculated_time: datetime, dry_run: bool, log_path: Path) -> None:
"""Removes log dir that are older than the calcultated time"""
for job_dir in find_old_files(log, calculated_time, log_path):
log.debug("%s : removing old logs", job_dir)
if not dry_run and log_path != job_dir:
delete_dir(job_dir)
def usage(argv: List[str]) -> argparse.Namespace:
"""The script usage
>>> usage([])
Namespace(debug=False, dry_run=False, log_path_dir='/var/www/logs', retention_days=31)
"""
parser = argparse.ArgumentParser(description="Purge old logs")
parser.add_argument('--dry-run', action='store_true')
parser.add_argument('--retention-days', type=int, default=31)
parser.add_argument('--log-path-dir', default='/var/www/logs')
parser.add_argument('--debug', action='store_true')
return parser.parse_args(argv)
def setup_logging(debug: bool) -> Logger:
"""Create the logger with a nice format string"""
logging.basicConfig(
format='%(asctime)s %(levelname)-5.5s %(message)s',
level=logging.DEBUG if debug else logging.INFO)
return logging.getLogger()
def main() -> None:
"""The script entrypoint"""
args = usage(sys.argv[1:])
root = check_dir_path(args.log_path_dir)
calculated_time = datetime.now() - timedelta(days=args.retention_days)
log = setup_logging(args.debug)
if not root:
log.error("The provided log path dir does not exists")
exit(1)
search_and_destroy(log, calculated_time, args.dry_run, root)
if __name__ == "__main__":
main()