From 2bc06e29228a143b29293d27a7b759d6daffbe89 Mon Sep 17 00:00:00 2001 From: "George Liu (eva2000)" Date: Sat, 29 Jul 2023 02:44:08 +1000 Subject: [PATCH] abuseipdb-reporter.py 0.4.0 - update to check for IPv6 public IPs to be masked - update is_log_file_valid function to account for empty debug/api logs --- abuseipdb-reporter.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/abuseipdb-reporter.py b/abuseipdb-reporter.py index 442b7c1..ddae4e2 100755 --- a/abuseipdb-reporter.py +++ b/abuseipdb-reporter.py @@ -46,7 +46,7 @@ import datetime from urllib.parse import quote -VERSION = "0.3.9" +VERSION = "0.4.0" # Set the DEBUG and LOG_API_REQUEST variables here (True or False) # DEBUG doesn't send to AbuseIPDB. Only logs to file # LOG_API_REQUEST, when True, logs API requests to file @@ -276,7 +276,7 @@ def update_cache(ip, cache): def get_all_public_ips(): try: - cmd = "ip addr show | grep 'inet .*global' | awk '{print $2}' | cut -d '/' -f1" + cmd = "ip -6 -4 addr show | grep -e 'inet ' -e 'inet6 ' | awk '{print $2}' | cut -d '/' -f1" output = subprocess.check_output(cmd, shell=True).decode('utf-8') ips = output.strip().split('\n') return ips @@ -313,16 +313,17 @@ def get_all_public_ips(): master_ips = [] # Replace the reported IP address in the logs with the mask IP -ip_pattern = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b') +ip_v4_pattern = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b') +ip_v6_pattern = re.compile(r'\b(?:(?:(?:[A-Fa-f0-9]{1,4}:){6}|::(?:[A-Fa-f0-9]{1,4}:){5}|(?:[A-Fa-f0-9]{1,4})?::(?:[A-Fa-f0-9]{1,4}:){4}|(?:(?:[A-Fa-f0-9]{1,4}:){0,1}[A-Fa-f0-9]{1,4})?::(?:[A-Fa-f0-9]{1,4}:){3}|(?:(?:[A-Fa-f0-9]{1,4}:){0,2}[A-Fa-f0-9]{1,4})?::(?:[A-Fa-f0-9]{1,4}:){2}|(?:(?:[A-Fa-f0-9]{1,4}:){0,3}[A-Fa-f0-9]{1,4})?::[A-Fa-f0-9]{1,4}:|(?:(?:[A-Fa-f0-9]{1,4}:){0,4}[A-Fa-f0-9]{1,4})?::)(?:[A-Fa-f0-9]{1,4}:[A-Fa-f0-9]{1,4}|(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d))|(?:(?:[A-Fa-f0-9]{1,4}:){0,5}[A-Fa-f0-9]{1,4})?::[A-Fa-f0-9]{1,4}|(?:(?:[A-Fa-f0-9]{1,4}:){0,6}[A-Fa-f0-9]{1,4})?::)\b') time_pattern = re.compile(r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})') if LOG_MODE == 'full': - filtered_logs = '\n'.join([log for log in logs.split('\n') if ip_pattern.search(log) is not None]) - filtered_logs = '\n'.join([re.sub(r'^(\S+\s+\S+\s+)(\S+\s+)(\S+\s+)', r'\1\2', log) for log in filtered_logs.split('\n') if time_pattern.search(log) is not None and ip_pattern.search(log) is not None]) + filtered_logs = '\n'.join([log for log in logs.split('\n') if ip_v4_pattern.search(log) is not None or ip_v6_pattern.search(log) is not None]) + filtered_logs = '\n'.join([re.sub(r'^(\S+\s+\S+\s+)(\S+\s+)(\S+\s+)', r'\1\2', log) for log in filtered_logs.split('\n') if time_pattern.search(log) is not None and (ip_v4_pattern.search(log) is not None or ip_v6_pattern.search(log) is not None)]) elif LOG_MODE == 'compact': filtered_logs = logs.split('\n')[0] # Extract the first line filtered_logs = re.sub(r'^(\S+\s+\S+\s+)(\S+\s+)(\S+\s+)', r'\1\2', filtered_logs) # Remove the 4th field - filtered_logs = '\n'.join([filtered_logs for log in filtered_logs.split('\n') if time_pattern.search(log) is not None and ip_pattern.search(log) is not None]) + filtered_logs = '\n'.join([filtered_logs for log in filtered_logs.split('\n') if time_pattern.search(log) is not None and (ip_v4_pattern.search(log) is not None or ip_v6_pattern.search(log) is not None)]) else: print("Error: Invalid LOG_MODE. Supported modes: 'full' or 'compact'.") sys.exit(1) @@ -465,16 +466,24 @@ def get_all_public_ips(): 'comment': masked_comment } -def is_log_file_valid(file_path): - if not os.path.exists(file_path): +def is_log_file_valid(filepath): + # Check if the file exists and is not empty + if os.path.exists(filepath) and os.path.getsize(filepath) > 0: + with open(filepath, 'rb+') as f: + try: + # Seek to the last two characters of the file + f.seek(-2, os.SEEK_END) + last_chars = f.read().decode('utf-8') + + # If the last characters are "\n]", then the file is valid + return last_chars == "\n]" + except OSError: + # If an error occurs while seeking to the end of the file, + # then the file is not valid + return False + else: return False - with open(file_path, 'rb') as f: - f.seek(-2, os.SEEK_END) - last_chars = f.read().decode() - - return last_chars == "\n]" - def contains_cluster_member_pattern(message): pattern = r"Cluster member (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) \((.*?)\) said," return re.search(pattern, message) is not None