diff --git a/__init__.py b/__init__.py index e5a68c2..bddb798 100644 --- a/__init__.py +++ b/__init__.py @@ -1 +1 @@ -import libs \ No newline at end of file +import libs diff --git a/libs/custom_logger.py b/libs/custom_logger.py index 0686b8b..f44f4a1 100644 --- a/libs/custom_logger.py +++ b/libs/custom_logger.py @@ -1,24 +1,45 @@ import logging + class CustomFormatter(logging.Formatter): """Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629""" - grey = '\x1b[38;21m' - blue = '\x1b[38;5;39m' - yellow = '\x1b[38;5;226m' - red = '\x1b[38;5;196m' - bold_red = '\x1b[31;1m' - reset = '\x1b[0m' + grey = "\x1b[38;21m" + blue = "\x1b[38;5;39m" + yellow = "\x1b[38;5;226m" + red = "\x1b[38;5;196m" + bold_red = "\x1b[31;1m" + reset = "\x1b[0m" def __init__(self, fmt): super().__init__() self.fmt = fmt.split("|") self.FORMATS = { - logging.DEBUG: self.grey+ self.fmt[0] + self.fmt[1] + self.reset+ self.fmt[2], - logging.INFO: self.blue+ self.fmt[0] + self.fmt[1] + self.reset+ self.fmt[2], - logging.WARNING: self.yellow+ self.fmt[0] + self.fmt[1] + self.reset+ self.fmt[2], - logging.ERROR: self.red+ self.fmt[0] + self.fmt[1] + self.reset+ self.fmt[2], - logging.CRITICAL: self.bold_red+ self.fmt[0] + self.fmt[1] + self.reset+ self.fmt[2] + logging.DEBUG: self.grey + + self.fmt[0] + + self.fmt[1] + + self.reset + + self.fmt[2], + logging.INFO: self.blue + + self.fmt[0] + + self.fmt[1] + + self.reset + + self.fmt[2], + logging.WARNING: self.yellow + + self.fmt[0] + + self.fmt[1] + + self.reset + + self.fmt[2], + logging.ERROR: self.red + + self.fmt[0] + + self.fmt[1] + + self.reset + + self.fmt[2], + logging.CRITICAL: self.bold_red + + self.fmt[0] + + self.fmt[1] + + self.reset + + self.fmt[2], } def format(self, record): @@ -26,12 +47,13 @@ def format(self, record): formatter = logging.Formatter(log_fmt) return formatter.format(record) + # Create custom logger logging all five levels logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # Define format for logs -fmt = '%(asctime)s | %(levelname)8s | %(message)s' +fmt = "%(asctime)s | %(levelname)8s | %(message)s" # Create stdout handler for logging to the console (logs all five levels) stdout_handler = logging.StreamHandler() diff --git a/libs/dns_request.py b/libs/dns_request.py index 0111738..ff0953a 100644 --- a/libs/dns_request.py +++ b/libs/dns_request.py @@ -1,4 +1,4 @@ -#from domain name get dns information and detect all subdomain associated +# from domain name get dns information and detect all subdomain associated import dns.resolver import requests import socket @@ -8,10 +8,10 @@ def get_dns_information(domain): - #get the dns information of the domain with dnspyton - #return a list of dns information - dns_informations= [] - #set timeout to 0.2 seconds + # get the dns information of the domain with dnspyton + # return a list of dns information + dns_informations = [] + # set timeout to 0.2 seconds socket.setdefaulttimeout(0.2) # try: # answers = dns.resolver.resolve(domain, 'NS') @@ -19,125 +19,131 @@ def get_dns_information(domain): # dns_informations.append(rdata) # except: # pass - try : - answers = dns.resolver.resolve(domain, 'MX') + try: + answers = dns.resolver.resolve(domain, "MX") for rdata in answers: dns_informations.append(rdata) except: pass - try : - answers = dns.resolver.resolve(domain, 'A') + try: + answers = dns.resolver.resolve(domain, "A") for rdata in answers: dns_informations.append(rdata) except: pass - try : - answers = dns.resolver.resolve(domain, 'AAAA') + try: + answers = dns.resolver.resolve(domain, "AAAA") for rdata in answers: dns_informations.append(rdata) except: pass - try : - answers = dns.resolver.resolve(domain, 'CNAME') + try: + answers = dns.resolver.resolve(domain, "CNAME") for rdata in answers: dns_informations.append(rdata) except: pass - try : - #get the dns information - answers = dns.resolver.resolve(domain, 'TXT') + try: + # get the dns information + answers = dns.resolver.resolve(domain, "TXT") for rdata in answers: dns_informations.append(rdata) except: pass return dns_informations -def get_dns_informations_thread(domain :str, threads_number:int) : - dns_informations=[] + +def get_dns_informations_thread(domain: str, threads_number: int): + dns_informations = [] with ThreadPoolExecutor(max_workers=threads_number) as executor: results = executor.map(get_dns_information, domain) for result in results: - dns_informations+= result + dns_informations += result return dns_informations -def detect_subdomain(dns_information :list) -> list: - #detect all the subdomain from the dns information - #return a list of subdomain + +def detect_subdomain(dns_information: list) -> list: + # detect all the subdomain from the dns information + # return a list of subdomain subdomains = [] for dns in dns_information: - #get the dns information + # get the dns information dns = str(dns) - #split the dns information + # split the dns information dns = dns.split(" ") - #get the subdomain + # get the subdomain subdomain = dns[0] - #add the subdomain to the list + # add the subdomain to the list subdomains.append(subdomain) return subdomains -def detect_real_subdomain(subdomains :list) -> list: - #detect all the real subdomain from the list of subdomain - #return a list of real subdomain + +def detect_real_subdomain(subdomains: list) -> list: + # detect all the real subdomain from the list of subdomain + # return a list of real subdomain real_subdomains = [] for subdomain in subdomains: - #test if the subdomain is real + # test if the subdomain is real try: - #try to connect to the subdomain + # try to connect to the subdomain socket.gethostbyname(subdomain) - #if the connection is successful, add the subdomain to the list + # if the connection is successful, add the subdomain to the list real_subdomains.append(subdomain) except: pass return real_subdomains -def delete_ip_from_list(subdomains :list) -> list: - #delete all the ip address from the list of subdomain - #return a list of subdomain + +def delete_ip_from_list(subdomains: list) -> list: + # delete all the ip address from the list of subdomain + # return a list of subdomain subdomains_without_ip = [] for subdomain in subdomains: - #test if the subdomain is an ip address + # test if the subdomain is an ip address try: - #try to convert the subdomain to an ip address + # try to convert the subdomain to an ip address socket.inet_aton(subdomain) except: - #if the subdomain is not an ip address, add it to the list + # if the subdomain is not an ip address, add it to the list subdomains_without_ip.append(subdomain) return subdomains_without_ip -def test_dns_zone_transfer(domain :str) -> list: - #test if the dns zone transfer is enable - #return a list of subdomain + +def test_dns_zone_transfer(domain: str) -> list: + # test if the dns zone transfer is enable + # return a list of subdomain subdomains = [] - #get the dns server - dns_server = dns.resolver.resolve(domain, 'NS') - #test if the dns zone transfer is enable + # get the dns server + dns_server = dns.resolver.resolve(domain, "NS") + # test if the dns zone transfer is enable for server in dns_server: - #get the dns server + # get the dns server server = str(server) - #split the dns server + # split the dns server server = server.split(" ") - #get the dns server + # get the dns server server = server[0] - #test if the dns zone transfer is enable + # test if the dns zone transfer is enable try: - #try to get the dns zone transfer + # try to get the dns zone transfer answers = dns.query.xfr(server, domain) - #if the dns zone transfer is enable, add the subdomain to the list + # if the dns zone transfer is enable, add the subdomain to the list for answer in answers: subdomains.append(answer) except: pass return subdomains + def main(domain, threads_number): - #get the dns information + # get the dns information dns_information = get_dns_informations_thread(domain, threads_number) - #detect all the subdomain + # detect all the subdomain subdomains = detect_subdomain(dns_information) - #detect all the real subdomain + # detect all the real subdomain real_subdomains = detect_real_subdomain(subdomains) - #delete all the ip address from the list of subdomain + # delete all the ip address from the list of subdomain subdomains_without_ip = delete_ip_from_list(real_subdomains) - #return the list of subdomain + # return the list of subdomain return subdomains_without_ip diff --git a/libs/domain_parser.py b/libs/domain_parser.py index 87a621f..94fa0ff 100644 --- a/libs/domain_parser.py +++ b/libs/domain_parser.py @@ -4,21 +4,28 @@ from concurrent.futures import ThreadPoolExecutor from urllib3.exceptions import InsecureRequestWarning import webtech + + def detect_redirect(url: str) -> bool: - if check_up(url) : + if check_up(url): try: - - response = requests.get("https://"+url, headers={'User-Agent': 'Google Chrome'}, timeout=1) + response = requests.get( + "https://" + url, headers={"User-Agent": "Google Chrome"}, timeout=1 + ) except: - try : - if check_up(url) : - response = requests.get("http://"+url, headers={'User-Agent': 'Google Chrome'}, timeout=1) + try: + if check_up(url): + response = requests.get( + "http://" + url, + headers={"User-Agent": "Google Chrome"}, + timeout=1, + ) except: return False - try : + try: if response.history: - #split response.url to get the domain - url_redirected= response.url.split("//")[1] + # split response.url to get the domain + url_redirected = response.url.split("//")[1] domain = url_redirected.split("/")[0] if url_redirected == url: return False @@ -27,26 +34,38 @@ def detect_redirect(url: str) -> bool: return False except: return False - else : + else: return "dead" + def try_access_web_port(url: str) -> bool: requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) try: - response = requests.get("http://"+url, headers={'User-Agent': 'Google Chrome'}, timeout=3, verify=False) - if response.status_code == 400 : + response = requests.get( + "http://" + url, + headers={"User-Agent": "Google Chrome"}, + timeout=3, + verify=False, + ) + if response.status_code == 400: raise Exception("Bad request") except: - try : - response = requests.get("https://"+url, headers={'User-Agent': 'Google Chrome'}, timeout=3, verify=False) + try: + response = requests.get( + "https://" + url, + headers={"User-Agent": "Google Chrome"}, + timeout=3, + verify=False, + ) except: return False if response.status_code == 200: - #return the x-powered-by header + # return the x-powered-by header return True else: return False + def detect_redirect_with_thread_limit(subdomains: list, thread_number: int) -> list: real_subdomains = [] subdomains_with_redirect = [] @@ -55,23 +74,35 @@ def detect_redirect_with_thread_limit(subdomains: list, thread_number: int) -> l results = executor.map(detect_redirect, subdomains) for subdomain, is_redirect in zip(subdomains, results): if is_redirect == True: - if is_redirect not in subdomains and is_redirect != "" : + if is_redirect not in subdomains and is_redirect != "": real_subdomains.append(is_redirect) - #print(f'> {subdomain} is a redirect') + # print(f'> {subdomain} is a redirect') subdomains_with_redirect.append(subdomain) elif is_redirect == None: pass elif is_redirect == "dead": dead_subdomains.append(subdomain) else: - #print(f'> {subdomain} is not a redirect') + # print(f'> {subdomain} is not a redirect') real_subdomains.append(subdomain) return real_subdomains, subdomains_with_redirect, dead_subdomains -def detect_web_port(ip_dict :dict, thread_number: int, scan_mode: str) -> dict: + +def detect_web_port(ip_dict: dict, thread_number: int, scan_mode: str) -> dict: with ThreadPoolExecutor(thread_number) as executor: for ip in ip_dict: - if (scan_mode == "W" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != []) or (scan_mode == "WR" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != []) or (scan_mode =="A") : + if ( + ( + scan_mode == "W" + and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] + ) + or ( + scan_mode == "WR" + and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] + or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != [] + ) + or (scan_mode == "A") + ): for port, service in ip_dict[ip]["ports"].items(): print(f"Checking if {ip}:{port} is a web port") result = executor.submit(try_access_web_port, f"{ip}:{port}") @@ -79,75 +110,99 @@ def detect_web_port(ip_dict :dict, thread_number: int, scan_mode: str) -> dict: ip_dict[ip]["ports"][port] = "website" return ip_dict + def detect_web_techno(ip_dict: dict, mode: str) -> dict: for ip in ip_dict: - if (mode == "W" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != []) or (mode == "WR" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != []) or (mode =="A") : + if ( + (mode == "W" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != []) + or ( + mode == "WR" + and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] + or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != [] + ) + or (mode == "A") + ): for port, service in ip_dict[ip]["ports"].items(): if service == "website": print(f"Detecting web technology for {ip}:{port}") try: - result = webtech.WebTech(options={'json': True}).start_from_url(f"http://{ip}:{port}") + result = webtech.WebTech(options={"json": True}).start_from_url( + f"http://{ip}:{port}" + ) except: try: - result = webtech.WebTech(options={'json': True}).start_from_url(f"https://{ip}:{port}") + result = webtech.WebTech( + options={"json": True} + ).start_from_url(f"https://{ip}:{port}") except: result = {} if result != {}: ip_dict[ip]["ports"][port] = result return ip_dict + def detect_web_techno_domain(ip_dict: dict, mode: str) -> dict: for ip in ip_dict: - ip_dict[ip]["subdomains"]["web_techno"]={} - if mode == "W" or mode=="WR" or mode== "A": + ip_dict[ip]["subdomains"]["web_techno"] = {} + if mode == "W" or mode == "WR" or mode == "A": for subdomain in ip_dict[ip]["subdomains"]["subdomain_withdomain"]: print(f"Detecting web technology for {subdomain}") try: - result = webtech.WebTech(options={'json': True}).start_from_url(f"http://{subdomain}") + result = webtech.WebTech(options={"json": True}).start_from_url( + f"http://{subdomain}" + ) except: try: - result = webtech.WebTech(options={'json': True}).start_from_url(f"https://{subdomain}") + result = webtech.WebTech(options={"json": True}).start_from_url( + f"https://{subdomain}" + ) except: result = {} if result != {}: ip_dict[ip]["subdomains"]["web_techno"][subdomain] = result - if mode == "WR" or mode== "A": + if mode == "WR" or mode == "A": for subdomain in ip_dict[ip]["subdomains"]["subdomain_with_redirect"]: print(f"Detecting web technology for {subdomain}") try: result = webtech.WebTech().start_from_url(f"http://{subdomain}") except: try: - result = webtech.WebTech().start_from_url(f"https://{subdomain}") + result = webtech.WebTech().start_from_url( + f"https://{subdomain}" + ) except: result = {} if result != {}: ip_dict[ip]["subdomains"]["web_techno"][subdomain] = result return ip_dict + + def check_up(url: str) -> bool: try: - response = requests.get(url, headers={'User-Agent': 'Google Chrome'}, timeout=3) + response = requests.get(url, headers={"User-Agent": "Google Chrome"}, timeout=3) except: # Use ping to check if the server is up try: ping(url, timeout=1) return True except Exception as e: - #print(e) + # print(e) return False if response.status_code == 200: return True else: return False + def check_dns(domain: str) -> bool: try: - dns.resolver.resolve(domain, 'A') + dns.resolver.resolve(domain, "A") return True except: return False + if __name__ == "__main__": print(check_up("benoit.fage.fr")) print(detect_redirect("benoit.fage.fr")) - # print(check_up("https://content.pizza.benoit.fage.fr")) \ No newline at end of file + # print(check_up("https://content.pizza.benoit.fage.fr")) diff --git a/libs/ip_scan.py b/libs/ip_scan.py index db1e8da..846a0d4 100644 --- a/libs/ip_scan.py +++ b/libs/ip_scan.py @@ -6,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor from multiprocessing import Pool from libs import custom_logger + logger = custom_logger.logger from pprint import pprint import os @@ -18,51 +19,56 @@ import random +def ping(ip: str) -> bool: + # ping the ip address using scapy + # return True if the ip is up, False otherwise + # create the arp request + arp = ARP(pdst=ip) + # create the ether broadcast packet + # ff:ff:ff:ff:ff:ff MAC address indicates broadcasting + ether = Ether(dst="ff:ff:ff:ff:ff:ff") + # stack them + packet = ether / arp + # send the packet and receive a response + result = srp(packet, timeout=3, verbose=0)[0] + # check if the ip is up + if result: + return True + else: + return False + + def get_ip(domain): - #get the ip address from the domain - try : + # get the ip address from the domain + try: ip = gethostbyname(domain) return ip except: return None -def get_ip_from_network(network: str) : - #scan the network and retrieve the ip addresses - #return the ip addresses + +def get_ip_from_network(network: str): + # scan the network and retrieve the ip addresses + # return the ip addresses if not "/" in network: network = network + "/24" - #create the arp request + # create the arp request arp = ARP(pdst=network) - #create the ether broadcast packet - #ff:ff:ff:ff:ff:ff MAC address indicates broadcasting + # create the ether broadcast packet + # ff:ff:ff:ff:ff:ff MAC address indicates broadcasting ether = Ether(dst="ff:ff:ff:ff:ff:ff") - #stack them - packet = ether/arp - #send the packet and receive a response + # stack them + packet = ether / arp + # send the packet and receive a response result = srp(packet, timeout=3, verbose=0)[0] - #a list of clients, we will fill this in the upcoming loop + # a list of clients, we will fill this in the upcoming loop clients = [] for sent, received in result: - #for each response, append ip and mac address to `clients` list - clients.append({'ip': received.psrc, 'mac': received.hwsrc}) - #print clients + # for each response, append ip and mac address to `clients` list + clients.append({"ip": received.psrc, "mac": received.hwsrc}) + # print clients return clients - -def check_filtered(host): - target_ports = range(30000, 65535) - start = time.time() - for i in random.sample(target_ports, 10): - try: - s = socket(AF_INET, SOCK_STREAM) - s.settimeout(1) - s.connect((host, i)) - s.close() - except: - pass - end = time.time() - if end - start < 5: - return True # returns True if a connection can be made, False otherwise def test_port_number(host, port): @@ -77,76 +83,94 @@ def test_port_number(host, port): sock.connect((host, port)) # a successful connection was made end = time.time() - #close the socket + # close the socket sock.close() return True except: # ignore the failure return False + def port_scan(host, ports): open_ports = [] - logger.info(f'Scanning {host}...') + logger.info(f"Scanning {host}...") # create the thread pool with ThreadPoolExecutor(len(ports)) as executor: # dispatch all tasks - results = executor.map(test_port_number, [host]*len(ports), ports) + results = executor.map(test_port_number, [host] * len(ports), ports) # report results in order - for port,is_open in zip(ports,results): + for port, is_open in zip(ports, results): if is_open: open_ports.append(port) pprint(open_ports) return open_ports -def port_scan_with_thread_limit(host: str, ports:range, thread_number: int): - #scan the host with the ports with a thread limit - #return the open ports - logger.info(f'Checking if {host} filtered...') + +def check_filtered(host): + target_ports = range(30000, 65535) + start = time.time() + port_scan(host, random.sample(target_ports, 1000)) + end = time.time() + if end - start < 1.7: + return True + + +def port_scan_with_thread_limit(host: str, ports: range, thread_number: int): + # scan the host with the ports with a thread limit + # return the open ports + logger.info("Checkin if the host is up...") + if not ping(host): + logger.warning(f"{host} is down") + return [] + logger.info(f"Checking if {host} filtered...") if check_filtered(host): - logger.warning(f'{host} is filtered') + logger.warning(f"{host} is filtered") return [] + open_ports = [] - logger.info(f'Scanning {host}...') + logger.info(f"Scanning {host}...") # create the thread pool with ThreadPoolExecutor(thread_number) as executor: # dispatch all tasks - results = executor.map(test_port_number, [host]*len(ports), ports) + results = executor.map(test_port_number, [host] * len(ports), ports) # report results in order - for port,is_open in zip(ports,results): + for port, is_open in zip(ports, results): if is_open: - logger.info(f'> {host}:{port} open') + logger.info(f"> {host}:{port} open") open_ports.append(port) return open_ports def detect_service(ip, port): - #detect the service from the ip address and the port - #return the service + # detect the service from the ip address and the port + # return the service try: - #try to connect to the port + # try to connect to the port create_connection((ip, port)) - #if the connection is successful, get the service + # if the connection is successful, get the service service = getservbyport(port) return service except: return None + def detect_banner(ip, port): - #detect the banner from the ip address and the port - #return the banner + # detect the banner from the ip address and the port + # return the banner try: - #try to connect to the port + # try to connect to the port s = create_connection((ip, port)) - #if the connection is successful, get the banner + # if the connection is successful, get the banner banner = s.recv(1024) return banner except: return None -def get_all_ip(subdomains: dict, domain :str): - #for all subdomains ping them and retrive their ip address - #return a dict with ip address as key and subdomains as value - + +def get_all_ip(subdomains: dict, domain: str): + # for all subdomains ping them and retrive their ip address + # return a dict with ip address as key and subdomains as value + """ subdomains = { "subdomain_withdomain": [], @@ -176,9 +200,9 @@ def get_all_ip(subdomains: dict, domain :str): "subdomains": { "subdomain_withdomain": [], "subdomain_withoutdomain": [], - "subdomain_with_redirect": [] + "subdomain_with_redirect": [], }, - "ports": {} + "ports": {}, } dict[ip]["subdomains"]["subdomain_withdomain"].append(subdomain) for subdomain in subdomains["subdomain_withoutdomain"]: @@ -189,9 +213,9 @@ def get_all_ip(subdomains: dict, domain :str): "subdomains": { "subdomain_withdomain": [], "subdomain_withoutdomain": [], - "subdomain_with_redirect": [] + "subdomain_with_redirect": [], }, - "ports": {} + "ports": {}, } dict[ip]["subdomains"]["subdomain_withoutdomain"].append(subdomain) for subdomain in subdomains["subdomain_with_redirect"]: @@ -202,48 +226,52 @@ def get_all_ip(subdomains: dict, domain :str): "subdomains": { "subdomain_withdomain": [], "subdomain_withoutdomain": [], - "subdomain_with_redirect": [] + "subdomain_with_redirect": [], }, - "ports": {} + "ports": {}, } dict[ip]["subdomains"]["subdomain_with_redirect"].append(subdomain) return dict + def check_if_nuclei_installed() -> bool: - #check if nuclei is installed by doing nuclei -h - #return True if installed, False otherwise - #use subprocess to run the command and don't show the output + # check if nuclei is installed by doing nuclei -h + # return True if installed, False otherwise + # use subprocess to run the command and don't show the output try: - subprocess.run(["nuclei", "-h"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["nuclei", "-h"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) return True except: return False + def nuclei_scan(hosts: list, domain: str, vulnconf: str) -> dict: - #scan the hosts with nuclei - #return the results - #check if nuclei is installed + # scan the hosts with nuclei + # return the results + # check if nuclei is installed logger.info("Checking if nuclei is installed...") if not check_if_nuclei_installed(): logger.error("Nuclei is not installed") return None - else : + else: logger.info("Nuclei is installed") - #create nuclei folder in project folder + # create nuclei folder in project folder if not os.path.exists("nuclei"): os.mkdir("nuclei") - - #create a folder for the domain + + # create a folder for the domain if not os.path.exists(f"nuclei/{domain}"): os.mkdir(f"nuclei/{domain}") - - #create a hosts.txt with one host per line + + # create a hosts.txt with one host per line with open(f"nuclei/{domain}/hosts.txt", "w") as f: for host in hosts: f.write(f"{host}\r") f.close() - #parse the file and delete if line is empty + # parse the file and delete if line is empty with open(f"nuclei/{domain}/hosts.txt", "r") as f: lines = f.readlines() f.close() @@ -252,26 +280,34 @@ def nuclei_scan(hosts: list, domain: str, vulnconf: str) -> dict: if line.strip("\r") != "": f.write(line) f.close() - - #update nuclei don't show the output + + # update nuclei don't show the output logger.info("Updating nuclei") - subprocess.run(["nuclei", "-update"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - #update nuclei templates + subprocess.run( + ["nuclei", "-update"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + # update nuclei templates logger.info("Updating nuclei templates") - subprocess.run(["nuclei", "-ut"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - #run nuclei and save the results in a json file + subprocess.run( + ["nuclei", "-ut"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + + # run nuclei and save the results in a json file actual_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") if vulnconf != "": logger.info("Running nuclei with config file") - os.system(f"nuclei -l nuclei/{domain}/hosts.txt -config {vulnconf} -json -o nuclei/{domain}/results.json") - else : - os.system(f"nuclei -l nuclei/{domain}/hosts.txt -json -o nuclei/{domain}/results_{actual_time}.json") - #read the output + os.system( + f"nuclei -l nuclei/{domain}/hosts.txt -config {vulnconf} -json -o nuclei/{domain}/results.json" + ) + else: + os.system( + f"nuclei -l nuclei/{domain}/hosts.txt -json -o nuclei/{domain}/results_{actual_time}.json" + ) + # read the output with open(f"nuclei/{domain}/results_{actual_time}.json", "r") as f: if f.read() == "": return None - #each line is a json + # each line is a json f.seek(0) results = [] for line in f: @@ -279,54 +315,71 @@ def nuclei_scan(hosts: list, domain: str, vulnconf: str) -> dict: f.close() return results -def run_parse_nuclei(ip_dict: dict, domain: str, mode :str, vulnconf :str) -> dict: + +def run_parse_nuclei(ip_dict: dict, domain: str, mode: str, vulnconf: str) -> dict: logger.info("Running nuclei scan...") hosts_list = [] for ip in ip_dict: - if (mode == "W" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != []) or (mode == "WR" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != []) or (mode =="A"): + if ( + (mode == "W" and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != []) + or ( + mode == "WR" + and ip_dict[ip]["subdomains"]["subdomain_withdomain"] != [] + or ip_dict[ip]["subdomains"]["subdomain_with_redirect"] != [] + ) + or (mode == "A") + ): hosts_list.append(ip) if mode == "W" or mode == "WR" or mode == "A": for subdomain in ip_dict[ip]["subdomains"]["subdomain_withdomain"]: - hosts_list.append("https://"+subdomain) + hosts_list.append("https://" + subdomain) if mode == "WR" or mode == "A": for subdomain in ip_dict[ip]["subdomains"]["subdomain_with_redirect"]: - hosts_list.append("https://"+subdomain) + hosts_list.append("https://" + subdomain) if mode == "A": for subdomain in ip_dict[ip]["subdomains"]["subdomain_withoutdomain"]: - hosts_list.append("https://"+subdomain) - + hosts_list.append("https://" + subdomain) nuclei_results = nuclei_scan(hosts_list, domain, vulnconf) logger.info("Nuclei scan finished") if nuclei_results: logger.info("Parsing nuclei results...") - #in ip_dict add vulns key and add the nuclei results + # in ip_dict add vulns key and add the nuclei results for ip in ip_dict: ip_dict[ip]["vulns"] = [] for result in nuclei_results: - if result["host"] == ip or result["host"] == "https://"+ip: + if result["host"] == ip or result["host"] == "https://" + ip: ip_dict[ip]["vulns"].append(result) - #add vulns key to subdomains and add the nuclei results, split the 'https://' from the subdomain + # add vulns key to subdomains and add the nuclei results, split the 'https://' from the subdomain for ip in ip_dict: - ip_dict[ip]["subdomains"]["vulns"]={} + ip_dict[ip]["subdomains"]["vulns"] = {} for subdomain in ip_dict[ip]["subdomains"]["subdomain_withdomain"]: ip_dict[ip]["subdomains"]["vulns"][subdomain] = [] for result in nuclei_results: - if result["host"] == "https://"+subdomain or result["host"] == subdomain or result["host"] == "http://"+subdomain: + if ( + result["host"] == "https://" + subdomain + or result["host"] == subdomain + or result["host"] == "http://" + subdomain + ): ip_dict[ip]["subdomains"]["vulns"][subdomain].append(result) for subdomain in ip_dict[ip]["subdomains"]["subdomain_with_redirect"]: ip_dict[ip]["subdomains"]["vulns"][subdomain] = [] for result in nuclei_results: - if result["host"] == "https://"+subdomain or result["host"] == subdomain or result["host"] == "http://"+subdomain: + if ( + result["host"] == "https://" + subdomain + or result["host"] == subdomain + or result["host"] == "http://" + subdomain + ): ip_dict[ip]["subdomains"]["vulns"][subdomain].append(result) for subdomain in ip_dict[ip]["subdomains"]["subdomain_withoutdomain"]: ip_dict[ip]["subdomains"]["vulns"][subdomain] = [] for result in nuclei_results: - if result["host"] == "https://"+subdomain or result["host"] == subdomain or result["host"] == "http://"+subdomain: + if ( + result["host"] == "https://" + subdomain + or result["host"] == subdomain + or result["host"] == "http://" + subdomain + ): ip_dict[ip]["subdomains"]["vulns"][subdomain].append(result) logger.info("Nuclei results parsed") return ip_dict - - - diff --git a/libs/result_parser.py b/libs/result_parser.py index 9989187..58e4fa3 100644 --- a/libs/result_parser.py +++ b/libs/result_parser.py @@ -1,26 +1,38 @@ import csv import os from libs import custom_logger as cl + logger = cl.logger import socket -def delete_occurences(list : list): - #delete all the occurences in a list - #example: [1,2,3,4,5,1,2,3,4,5] -> [1,2,3,4,5] + + +def delete_occurences(list: list): + # delete all the occurences in a list + # example: [1,2,3,4,5,1,2,3,4,5] -> [1,2,3,4,5] new_list = [] for i in list: if i not in new_list: new_list.append(i) return new_list -def delete_star(list : list) -> list: - #delete all the occurences of * in a list + +def delete_star(list: list) -> list: + # delete all the occurences of * in a list new_list = [] for i in list: if i != "*": new_list.append(i) return new_list -def result_filter(list : list, domain : str, subdomain_with_redirect:list, dead_subdomains:list, dns_exist:list) -> dict : - #from the list of sudbomains return all subomains containing the domain + + +def result_filter( + list: list, + domain: str, + subdomain_with_redirect: list, + dead_subdomains: list, + dns_exist: list, +) -> dict: + # from the list of sudbomains return all subomains containing the domain """ dict = { "subdomain_withdomain": [], @@ -33,7 +45,7 @@ def result_filter(list : list, domain : str, subdomain_with_redirect:list, dead_ "subdomain_withoutdomain": [], "subdomain_with_redirect": [], "dead_subdomains": [], - "dns_exist": [] + "dns_exist": [], } for subdomain in list: if domain in subdomain: @@ -45,29 +57,30 @@ def result_filter(list : list, domain : str, subdomain_with_redirect:list, dead_ dict["dns_exist"] = dns_exist return dict -def dynamic_save(all_results: dict, domain : str, mode : str): - up=[] - down=[] + +def dynamic_save(all_results: dict, domain: str, mode: str): + up = [] + down = [] if mode == "create": if all_results: if not os.path.exists(f"exports/{domain}"): os.makedirs(f"exports/{domain}") with open(f"exports/{domain}/dynamic_sub_save.txt", "w") as file: for sub in all_results: - #do dns resolution + # do dns resolution try: - #timeout is 1 second + # timeout is 1 second socket.setdefaulttimeout(0.5) ip = socket.gethostbyname(sub) up.append(sub) except: down.append(sub) - #first write the up subdomains + # first write the up subdomains file.write("Up subdomains: \r \r") for sub in up: file.write(sub + "\r") file.write("\r \r") - #then write the down subdomains + # then write the down subdomains file.write("Down subdomains: \r \r") for sub in down: file.write(sub + "\r") @@ -75,19 +88,19 @@ def dynamic_save(all_results: dict, domain : str, mode : str): else: logger.error("No subdomains found, exiting...") exit(1) - if mode == "add" : - #add the new subdomains to the file + if mode == "add": + # add the new subdomains to the file if all_results: for sub in all_results: - #do dns resolution + # do dns resolution try: - #timeout is 1 second + # timeout is 1 second socket.setdefaulttimeout(0.5) ip = socket.gethostbyname(sub) up.append(sub) except: down.append(sub) - #first write the up subdomains to the right position in the file + # first write the up subdomains to the right position in the file with open(f"exports/{domain}/dynamic_sub_save.txt", "r") as file: lines = file.readlines() for i in range(len(lines)): @@ -96,7 +109,7 @@ def dynamic_save(all_results: dict, domain : str, mode : str): with open(f"exports/{domain}/dynamic_sub_save.txt", "a") as file: for sub in up: file.write(sub + "\r") - #then write the down subdomains to the right position in the file + # then write the down subdomains to the right position in the file with open(f"exports/{domain}/dynamic_sub_save.txt", "r") as file: lines = file.readlines() for i in range(len(lines)): @@ -109,14 +122,15 @@ def dynamic_save(all_results: dict, domain : str, mode : str): logger.error("No subdomains found, exiting...") exit(1) -def service_recognizer(scan_dict :dict) -> dict: - #open the file with all the services in wordlists/tcp.csv - #the csv file is in the format: + +def service_recognizer(scan_dict: dict) -> dict: + # open the file with all the services in wordlists/tcp.csv + # the csv file is in the format: """ Service Name,Port Number,Transport Protocol,Description,Assignee,Contact,Registration Date,Modification Date,Reference,Service Code,Unauthorized Use Reported,Assignment Notes """ - #get only service name and port number for tcp + # get only service name and port number for tcp tcp_services = {} with open("wordlists/service-names-port-numbers.csv", "r") as file: reader = csv.reader(file) @@ -124,7 +138,7 @@ def service_recognizer(scan_dict :dict) -> dict: if row[2] == "tcp": tcp_services[row[1]] = row[0] - #scan_dict is in the format: + # scan_dict is in the format: """ 1.1.1.1{ "ports":{ @@ -135,11 +149,14 @@ def service_recognizer(scan_dict :dict) -> dict: "subdomains":[] } """ - #if in scan_dict there is a service in state "None" it will be replaced with the service in the csv file + # if in scan_dict there is a service in state "None" it will be replaced with the service in the csv file for ip in scan_dict: - try: + try: for port in scan_dict[ip]["ports"]: - if scan_dict[ip]["ports"][port] == None or scan_dict[ip]["ports"][port] == "null": + if ( + scan_dict[ip]["ports"][port] == None + or scan_dict[ip]["ports"][port] == "null" + ): if str(port) in tcp_services: scan_dict[ip]["ports"][port] = tcp_services[str(port)] else: diff --git a/libs/sub_harvester.py b/libs/sub_harvester.py index 69f819c..f44c230 100644 --- a/libs/sub_harvester.py +++ b/libs/sub_harvester.py @@ -4,14 +4,15 @@ import socket import threading from libs import custom_logger + logger = custom_logger.logger def hacker_target_parser(domain): - #get all the subdomain of the domain from hackertarget - #the url is https://api.hackertarget.com/hostsearch/?q={domain} + # get all the subdomain of the domain from hackertarget + # the url is https://api.hackertarget.com/hostsearch/?q={domain} url = "https://api.hackertarget.com/hostsearch/?q=" + domain - try : + try: response = requests.get(url) """ the response is in this form : @@ -25,18 +26,18 @@ def hacker_target_parser(domain): dolibarr.benoit.fage.fr,82.66.13.124 content.benoit.fage.fr,157.90.145.185 """ - #split the response in lines + # split the response in lines lines = response.text.split("\n") - #get all the subdomains + # get all the subdomains subdomains = [] for line in lines: if line != "" and "*" not in line.split(",")[0]: - if line == "API count exceeded - Increase Quota with Membership" : + if line == "API count exceeded - Increase Quota with Membership": raise Exception("API") subdomains.append(line.split(",")[0]) - #delete all the occurences in the list + # delete all the occurences in the list subdomains = rp.delete_occurences(subdomains) - + return subdomains except Exception as e: if e.args[0] == "API": @@ -45,17 +46,18 @@ def hacker_target_parser(domain): logger.error("Impossible to get subdomains from hackertarget") return [] + def crtsh_parser(domain): - #get all the subdomain of the domain from crtsh - url= f"https://crt.sh/?q={domain}&output=json" - #user agent firefox + # get all the subdomain of the domain from crtsh + url = f"https://crt.sh/?q={domain}&output=json" + # user agent firefox headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0" } - try : + try: response = requests.get(url, headers=headers) - #response is a json format - #convert response.text in json + # response is a json format + # convert response.text in json json_data = json.loads(response.text) """ Example of json_data from crtsh : @@ -83,11 +85,11 @@ def crtsh_parser(domain): "serial_number": "030da3a68189369d6475a61ad5ec6618a11c" }] """ - #get all the common_name and name_value + # get all the common_name and name_value subdomains = [] for item in json_data: subdomains.append(item["common_name"]) - #split name_value in lines + # split name_value in lines lines = item["name_value"].split("\n") for line in lines: if line != "" and "*" not in line: @@ -97,14 +99,18 @@ def crtsh_parser(domain): except Exception as e: logger.error("Impossible to get subdomains from crtsh") return [] + + def alienvault_parser(domain): - #get all the subdomain of the domain from alienvault - #url https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns - url = "https://otx.alienvault.com/api/v1/indicators/domain/" + domain + "/passive_dns" - try : + # get all the subdomain of the domain from alienvault + # url https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns + url = ( + "https://otx.alienvault.com/api/v1/indicators/domain/" + domain + "/passive_dns" + ) + try: response = requests.get(url) - #response is a json format - #convert response.text in json + # response is a json format + # convert response.text in json json_data = json.loads(response.text) """ Example of json_data from alienvault @@ -135,15 +141,15 @@ def alienvault_parser(domain): "asn": "AS12322 free sas" } """ - #get all the hostname + # get all the hostname subdomains = [] for i in json_data["passive_dns"]: - try : - if "*" not in subdomains.append(i["hostname"]) : + try: + if "*" not in subdomains.append(i["hostname"]): subdomains.append(i["hostname"]) except: pass - #delete all the occurences in the list + # delete all the occurences in the list subdomains = rp.delete_occurences(subdomains) return subdomains except Exception as e: @@ -152,49 +158,54 @@ def alienvault_parser(domain): def from_wordlist(domain, wordlist_chunks): - #wordlist is Subdomain.txt - #open the file + # wordlist is Subdomain.txt + # open the file - #test all the subdomains like {subdomain}.{domain} + # test all the subdomains like {subdomain}.{domain} subdomains = [] for line in wordlist_chunks: if "*" in line: pass - #delete the \n + # delete the \n line = line.replace("\r", "") - #loaading percentage - print(f"{domain}\tWordlist testing : {str(round(wordlist_chunks.index(line) / len(wordlist_chunks) * 100, 2))}% ", end="\r") + # loaading percentage + print( + f"{domain}\tWordlist testing : {str(round(wordlist_chunks.index(line) / len(wordlist_chunks) * 100, 2))}% ", + end="\r", + ) request_to_test = line.strip() + "." + domain try: - #try to connect to the subdomain - #detect if there is a redirection - #if there is a redirection, check if the redirection is the same as the actual subdomain tested - #if the redirection is the same as the actual subdomain tested, add the subdomain to the list - #if the redirection is not the same as the actual subdomain tested, don't add the subdomain to the list - #if there is no redirection, add the subdomain to the list + # try to connect to the subdomain + # detect if there is a redirection + # if there is a redirection, check if the redirection is the same as the actual subdomain tested + # if the redirection is the same as the actual subdomain tested, add the subdomain to the list + # if the redirection is not the same as the actual subdomain tested, don't add the subdomain to the list + # if there is no redirection, add the subdomain to the list socket.gethostbyname(request_to_test) - #if the connection is successful, add the subdomain to the list + # if the connection is successful, add the subdomain to the list subdomains.append(request_to_test) except: pass return subdomains + + def divide_chunks(l, n): - # looping till length l for i in range(0, len(l), n): - yield l[i:i + n] + yield l[i : i + n] + def from_wordlist_thread(domain, thread_number, wordlist): with open(wordlist, "r") as file: - #read all the lines + # read all the lines lines = file.readlines() - #delete all \n + # delete all \n lines = [line.replace("\n", "") for line in lines] - ranges= list(divide_chunks(lines, len(lines) // thread_number)) + ranges = list(divide_chunks(lines, len(lines) // thread_number)) subdomains = [] threads = [] for i in ranges: - t = threading.Thread(target= lambda: subdomains.append(from_wordlist(domain, i))) + t = threading.Thread(target=lambda: subdomains.append(from_wordlist(domain, i))) threads.append(t) t.start() for i in threads: @@ -203,4 +214,4 @@ def from_wordlist_thread(domain, thread_number, wordlist): for i in subdomains: final_subdomains += i final_subdomains = rp.delete_occurences(final_subdomains) - return final_subdomains \ No newline at end of file + return final_subdomains diff --git a/main.py b/main.py index 14beead..541320f 100644 --- a/main.py +++ b/main.py @@ -15,36 +15,46 @@ import sys import time import requests + logger = cl.logger + def clear_screen(): if sys.platform == "win32": os.system("cls") else: os.system("clear") + def check_update(): logger.info("Checking for update...") try: with open("manifest", "r") as f: version = f.read() - url = f"https://raw.githubusercontent.com/ugomeguerditchian/OrgASM/main/manifest" + url = ( + f"https://raw.githubusercontent.com/ugomeguerditchian/OrgASM/main/manifest" + ) response = requests.get(url).text if response == version: logger.info("You are up to date") else: - logger.warning("Update available, please download the new version on https://github.com/ugomeguerditchian/OrgASM") + logger.warning( + "Update available, please download the new version on https://github.com/ugomeguerditchian/OrgASM" + ) logger.info("Resume in 3 seconds...") time.sleep(3) except Exception as e: logger.error("Impossible to check for update") -def recursive_subdomains(subs : list, wt :int, wd :str, mode :str, domain :str) -> list: + +def recursive_subdomains(subs: list, wt: int, wd: str, mode: str, domain: str) -> list: temp_subs = [] new_subs = [] - for subdomain in subs : - logger.info(f"Recursive scan of {subdomain} | {subs.index(subdomain)}/{len(subs)}") - if "O" in mode : + for subdomain in subs: + logger.info( + f"Recursive scan of {subdomain} | {subs.index(subdomain)}/{len(subs)}" + ) + if "O" in mode: logger.info("Alienvault testing...") temp_subs += sh.alienvault_parser(subdomain) logger.info("Alienvault testing done") @@ -54,11 +64,11 @@ def recursive_subdomains(subs : list, wt :int, wd :str, mode :str, domain :str) logger.info("Crt.sh testing...") temp_subs += sh.crtsh_parser(subdomain) logger.info("Crt.sh testing done") - if "B" in mode : + if "B" in mode: logger.info("Wordlist testing...") temp_subs += sh.from_wordlist_thread(subdomain, wt, f"wordlists/{wd}.txt") logger.info("Wordlist testing done") - print("\n") # some space between each subdomain + print("\n") # some space between each subdomain temp_subs = rp.delete_occurences(temp_subs) for subdomain in temp_subs: if subdomain not in subs: @@ -66,92 +76,196 @@ def recursive_subdomains(subs : list, wt :int, wd :str, mode :str, domain :str) if len(new_subs) == 0: return [] else: - logger.info(f"Found {len(new_subs)} new subdomains, saved them to : exports/{domain}/dynamic_sub_save.txt") + logger.info( + f"Found {len(new_subs)} new subdomains, saved them to : exports/{domain}/dynamic_sub_save.txt" + ) rp.dynamic_save(new_subs, domain, "add") return new_subs + def menu(): argpars = argparse.ArgumentParser() argpars.add_argument("-d", "--domain", required=False, help="Domain to scan") argpars.add_argument("-ip", "--ip", required=False, help="IP to scan") - argpars.add_argument("-net", "--network", required=False, help="Network to scan, don't forget the CIDR (ex: 192.168.1.0/24)") - argpars.add_argument("-m", "--mode", required=False, default="OBS", help="Mode to use, O for OSINT (API request), B for bruteforce, S for IP scan (default OBS)") - argpars.add_argument("-sF", "--subfile", required=False, help="Path to file with subdomains, one per line") - argpars.add_argument("-ipF", "--ipfile", required=False, help="Path to file with IPs, one per line") - argpars.add_argument("-R", "--recursive", required=False, default=1, type= int, help="Recursive scan, will rescan all the subdomains finds and go deeper as you want, default is 0") - argpars.add_argument("-w", "--wordlist", default="medium", type=str, required=False, help="Wordlist to use (small, medium(default), big)") - argpars.add_argument("-wT", "--wordlistThreads", default=500, type=int, required=False, help="Number of threads to use for Wordlist(default 500)") - argpars.add_argument("-iS", "--IPScanType", default="W", type=str, required=False, help="Choose what IPs to scan (W: only subdomains IP containing domain given, WR: only subdomains IP containtaining domain given but with a redirect, A: All subdomains detected") - argpars.add_argument("-iT", "--IPthreads", default=2000, type=int, required=False, help="Number of threads to use for IP scan(default 2000)") - argpars.add_argument("-sT", "--subdomainsThreads", default=500, type=int, required=False, help="Number of threads to use for check real subdomains(default 500)") - argpars.add_argument("-cP", "--checkPortsThreads", default=30, type=int, required=False, help="Check all ports of subdomains for all IP in IPScantype (-iS) and try to access them to check if it's a webport (default True) (deactivate with 0)") - argpars.add_argument("-dT", "--detectTechno", default=True, type=bool, required=False, help="Detect techno used by subdomains (default True) (deactivate with False)") - argpars.add_argument("-vuln", "--vulnScan", default=False, action="store_true", help="Scan subdomains using Nuclei, you need to have nuclei installed and in your PATH (default False)") - argpars.add_argument("-vulnconf", "--vulnConfig", default="", type=str, required=False, help="Path to config file for nuclei (default is the default config)") - argpars.add_argument("-limit", "--limit", default=False, action="store_true", required=False, help="Limit the scope of scan to the domain given or the subdomains given in the file (-sF) (default False)") + argpars.add_argument( + "-net", + "--network", + required=False, + help="Network to scan, don't forget the CIDR (ex: 192.168.1.0/24)", + ) + argpars.add_argument( + "-m", + "--mode", + required=False, + default="OBS", + help="Mode to use, O for OSINT (API request), B for bruteforce, S for IP scan (default OBS)", + ) + argpars.add_argument( + "-sF", + "--subfile", + required=False, + help="Path to file with subdomains, one per line", + ) + argpars.add_argument( + "-ipF", "--ipfile", required=False, help="Path to file with IPs, one per line" + ) + argpars.add_argument( + "-R", + "--recursive", + required=False, + default=1, + type=int, + help="Recursive scan, will rescan all the subdomains finds and go deeper as you want, default is 0", + ) + argpars.add_argument( + "-w", + "--wordlist", + default="medium", + type=str, + required=False, + help="Wordlist to use (small, medium(default), big)", + ) + argpars.add_argument( + "-wT", + "--wordlistThreads", + default=500, + type=int, + required=False, + help="Number of threads to use for Wordlist(default 500)", + ) + argpars.add_argument( + "-iS", + "--IPScanType", + default="W", + type=str, + required=False, + help="Choose what IPs to scan (W: only subdomains IP containing domain given, WR: only subdomains IP containtaining domain given but with a redirect, A: All subdomains detected", + ) + argpars.add_argument( + "-iT", + "--IPthreads", + default=2000, + type=int, + required=False, + help="Number of threads to use for IP scan(default 2000)", + ) + argpars.add_argument( + "-sT", + "--subdomainsThreads", + default=500, + type=int, + required=False, + help="Number of threads to use for check real subdomains(default 500)", + ) + argpars.add_argument( + "-cP", + "--checkPortsThreads", + default=30, + type=int, + required=False, + help="Check all ports of subdomains for all IP in IPScantype (-iS) and try to access them to check if it's a webport (default True) (deactivate with 0)", + ) + argpars.add_argument( + "-dT", + "--detectTechno", + default=True, + type=bool, + required=False, + help="Detect techno used by subdomains (default True) (deactivate with False)", + ) + argpars.add_argument( + "-vuln", + "--vulnScan", + default=False, + action="store_true", + help="Scan subdomains using Nuclei, you need to have nuclei installed and in your PATH (default False)", + ) + argpars.add_argument( + "-vulnconf", + "--vulnConfig", + default="", + type=str, + required=False, + help="Path to config file for nuclei (default is the default config)", + ) + argpars.add_argument( + "-limit", + "--limit", + default=False, + action="store_true", + required=False, + help="Limit the scope of scan to the domain given or the subdomains given in the file (-sF) (default False)", + ) args = argpars.parse_args() mode = args.mode wordlist_size = args.wordlist wordlist_thread_number = int(args.wordlistThreads) - #verify mode + # verify mode check_update() if len(mode) > 3 or len(mode) < 1 or not re.match("^[OBS]+$", mode): - logger.error("Mode not valid, must be O B or S (or concatenation like : OS, OB, OBS)") + logger.error( + "Mode not valid, must be O B or S (or concatenation like : OS, OB, OBS)" + ) exit(1) if mode == "S": - logger.error("Cannot use only IP scan mode, you have to use at least one of the other modes with it") + logger.error( + "Cannot use only IP scan mode, you have to use at least one of the other modes with it" + ) domain = "" if args.domain: domain = args.domain - - if args.ip : + + if args.ip: domain = args.ip - elif args.ipfile : + elif args.ipfile: domain = "multiple_ips" - elif args.network : + elif args.network: if "/" in args.network: domain = args.network.split("/")[0] - else : + else: domain = args.network - else : + else: domain = args.domain # help for argpars - - + if args.limit and args.recursive > 0: logger.warning("It's recommended to set recursive to 0 when using limit mode") input("Do you want to continue ? (y/n) ") if input == "n": exit(0) - else : - pass + else: + pass if args.limit and args.subfile == None: - logger.warning("It's recommended to put a file with subdomains to scan when using limit mode") + logger.warning( + "It's recommended to put a file with subdomains to scan when using limit mode" + ) input("Do you want to continue ? (y/n) ") if input == "n": exit(0) - else : - pass + else: + pass final_dict_result = {} - #ask for domain name - #Check if the domain name is valid with regex + # ask for domain name + # Check if the domain name is valid with regex if args.domain: - while not re.match(r"^[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,5}$", domain): + while not re.match( + r"^[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,5}$", domain + ): logger.error("Invalid domain name") domain = input("Enter domain name: ") - + if args.subfile: - all_results=[] + all_results = [] with open(args.subfile, "r") as file: for line in file: all_results.append(line.strip()) - else : + else: all_results = [] - - #get all the subdomains from alienvault - if "O" in mode : + + # get all the subdomains from alienvault + if "O" in mode: logger.info("Alienvault testing...") all_results += sh.alienvault_parser(domain) logger.info("Alienvault testing done") @@ -161,16 +275,21 @@ def menu(): logger.info("Crt.sh testing...") all_results += sh.crtsh_parser(domain) logger.info("Crt.sh testing done") - if "B" in mode : + if "B" in mode: logger.info("Wordlist testing...") - all_results += sh.from_wordlist_thread(domain, args.wordlistThreads, f"wordlists/{wordlist_size}.txt") + all_results += sh.from_wordlist_thread( + domain, args.wordlistThreads, f"wordlists/{wordlist_size}.txt" + ) if args.limit and args.subfile != None: with open(args.subfile, "r") as file: for line in file: - all_results += sh.from_wordlist_thread(line.strip(), args.wordlistThreads, f"wordlists/{wordlist_size}.txt") + all_results += sh.from_wordlist_thread( + line.strip(), + args.wordlistThreads, + f"wordlists/{wordlist_size}.txt", + ) logger.info("Wordlist testing done") - logger.info("Deleting occurences...") all_results = rp.delete_occurences(all_results) all_results = rp.delete_star(all_results) @@ -182,50 +301,62 @@ def menu(): with open(args.subfile, "r") as file: for line in file: sub_file.append(line.strip()) - for sub in all_results : + for sub in all_results: if sub not in sub_file and sub != domain: for sub2 in sub_file: if sub2 in sub: continue - else : + else: to_remove.append(sub) break - else : - for sub in all_results : + else: + for sub in all_results: if sub != domain: to_remove.append(sub) for sub in to_remove: all_results.remove(sub) - #clear the screen + # clear the screen clear_screen() - - logger.info(f"Actually creating the list of subdomains at : exports/{domain}/dynamic_sub_save.txt") + + logger.info( + f"Actually creating the list of subdomains at : exports/{domain}/dynamic_sub_save.txt" + ) rp.dynamic_save(all_results, domain, "create") - if args.recursive > 0 : + if args.recursive > 0: logger.info("Recursive scan...") - new_scan = recursive_subdomains(all_results, wordlist_thread_number, wordlist_size, mode, domain) + new_scan = recursive_subdomains( + all_results, wordlist_thread_number, wordlist_size, mode, domain + ) all_results += new_scan all_results = rp.delete_occurences(all_results) counter = 1 while len(new_scan) > 0 and counter < args.recursive: all_results += new_scan - new_scan = recursive_subdomains(new_scan, wordlist_thread_number, wordlist_size, mode, domain) + new_scan = recursive_subdomains( + new_scan, wordlist_thread_number, wordlist_size, mode, domain + ) all_results = rp.delete_occurences(all_results) counter += 1 logger.info("Recursive scan done") - #delete all the occurences in the list - + # delete all the occurences in the list + logger.info("Deleting occurences...") all_results = rp.delete_occurences(all_results) - #check subdomains by accessing them with dp.detect_redirect - if "S" in mode or "B" in mode : + # check subdomains by accessing them with dp.detect_redirect + if "S" in mode or "B" in mode: cl.logger.info("Checking subdomains...") - subdomains_with_redirect=[] + subdomains_with_redirect = [] temp_all_results = [] dead_subdomains = [] dns_exist = [] - temp_all_results, subdomains_with_redirect, dead_subdomains = dp.detect_redirect_with_thread_limit(all_results, args.subdomainsThreads) + ( + temp_all_results, + subdomains_with_redirect, + dead_subdomains, + ) = dp.detect_redirect_with_thread_limit( + all_results, args.subdomainsThreads + ) for dead in dead_subdomains: if dp.check_dns(dead): dns_exist.append(dead) @@ -234,13 +365,15 @@ def menu(): cl.logger.info("Checking subdomains done") logger.info("Deleting occurences...") all_results = rp.delete_occurences(all_results) - else : + else: subdomains_with_redirect = [] dead_subdomains = [] dns_exist = [] logger.info("All done") - - final_dict= rp.result_filter(all_results, domain, subdomains_with_redirect, dead_subdomains, dns_exist) + + final_dict = rp.result_filter( + all_results, domain, subdomains_with_redirect, dead_subdomains, dns_exist + ) logger.info(f"Subdomains containing {domain}:") for subdomain in final_dict["subdomain_withdomain"]: print(subdomain) @@ -255,209 +388,245 @@ def menu(): print(subdomain) final_dict_result = final_dict if "S" in mode: - final_dict_result= {} + final_dict_result = {} logger.info("IP sorting...") ip_dict = ips.get_all_ip(final_dict, domain) logger.info("IP sorting done") logger.info("IP sorting results:") - final_dict_result= ip_dict - #pop dead_subdomains + final_dict_result = ip_dict + # pop dead_subdomains final_dict_result["dead_subdomains"] = final_dict["dead_subdomains"] final_dict_result["dns_exist"] = final_dict["dns_exist"] pprint(final_dict_result) logger.info("Done") - deads= final_dict_result.pop("dead_subdomains") + deads = final_dict_result.pop("dead_subdomains") dns_exist = final_dict_result.pop("dns_exist") logger.info("IP scanning...") if args.IPScanType == "W": - for ip in final_dict_result : - if final_dict_result[ip]["subdomains"]["subdomain_withdomain"] != []: - open_ports= ips.port_scan_with_thread_limit(ip, range(65536), args.IPthreads) + for ip in final_dict_result: + if ( + final_dict_result[ip]["subdomains"]["subdomain_withdomain"] + != [] + ): + open_ports = ips.port_scan_with_thread_limit( + ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[ip]["ports"][port]= ips.detect_service(ip, port) + final_dict_result[ip]["ports"][port] = ips.detect_service( + ip, port + ) elif args.IPScanType == "WR": - for ip in final_dict_result : - if final_dict_result[ip]["subdomains"]["subdomain_with_redirect"] != [] or final_dict_result[ip]["subdomains"]["subdomain_withdomain"] != []: - open_ports= ips.port_scan_with_thread_limit(ip, range(65536), args.IPthreads) + for ip in final_dict_result: + if ( + final_dict_result[ip]["subdomains"]["subdomain_with_redirect"] + != [] + or final_dict_result[ip]["subdomains"]["subdomain_withdomain"] + != [] + ): + open_ports = ips.port_scan_with_thread_limit( + ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[ip]["ports"][port]= ips.detect_service(ip, port) + final_dict_result[ip]["ports"][port] = ips.detect_service( + ip, port + ) elif args.IPScanType == "A": - for ip in final_dict_result : - open_ports= ips.port_scan_with_thread_limit(ip, range(65536), args.IPthreads) + for ip in final_dict_result: + open_ports = ips.port_scan_with_thread_limit( + ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[ip]["ports"][port]= ips.detect_service(ip, port) - + final_dict_result[ip]["ports"][port] = ips.detect_service( + ip, port + ) logger.info("IP scanning done") logger.info("IP scanning service analysis...") - final_dict_result= rp.service_recognizer(final_dict_result) + final_dict_result = rp.service_recognizer(final_dict_result) logger.info("IP scanning service analysis done") logger.info("Detecting web ports...") if args.checkPortsThreads != 0: - final_dict_result= dp.detect_web_port(final_dict_result, args.checkPortsThreads, args.IPScanType) - if args.detectTechno : - final_dict_result = dp.detect_web_techno(final_dict_result, args.IPScanType) - final_dict_result = dp.detect_web_techno_domain(final_dict_result, args.IPScanType) - if args.vulnScan : - final_dict_result = ips.run_parse_nuclei(final_dict_result, domain, args.IPScanType, args.vulnConfig) + final_dict_result = dp.detect_web_port( + final_dict_result, args.checkPortsThreads, args.IPScanType + ) + if args.detectTechno: + final_dict_result = dp.detect_web_techno( + final_dict_result, args.IPScanType + ) + final_dict_result = dp.detect_web_techno_domain( + final_dict_result, args.IPScanType + ) + if args.vulnScan: + final_dict_result = ips.run_parse_nuclei( + final_dict_result, domain, args.IPScanType, args.vulnConfig + ) logger.info("Detecting web ports done") logger.info("IP scanning results:") - final_dict_result["dead_subdomains"]= deads + final_dict_result["dead_subdomains"] = deads final_dict_result["dns_exist"] = dns_exist pprint(final_dict_result) logger.info("Done") - elif args.ip : - if args.ipfile : + elif args.ip: + if args.ipfile: logger.error("You can't use -ip and -ipFile at the same time") exit(1) - if args.network : + if args.network: logger.error("You can't use -ip and -network at the same time") exit(1) - final_dict_result= { - args.ip : { - "ports" : {}, - "subdomains" : { - "subdomain_withdomain" : [], - "subdomain_withoutdomain" : [], - "subdomain_with_redirect" : [], - "dead_subdomains" : [], - "vulns" : {}, + final_dict_result = { + args.ip: { + "ports": {}, + "subdomains": { + "subdomain_withdomain": [], + "subdomain_withoutdomain": [], + "subdomain_with_redirect": [], + "dead_subdomains": [], + "vulns": {}, }, - "techno" : { - "web" : [], - "web_domain" : [] - }, - "vulns" : [] + "techno": {"web": [], "web_domain": []}, + "vulns": [], } } logger.info("IP scanning...") - open_ports= ips.port_scan_with_thread_limit(args.ip, range(65536), args.IPthreads) + open_ports = ips.port_scan_with_thread_limit( + args.ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[args.ip]["ports"][port]= ips.detect_service(args.ip, port) + final_dict_result[args.ip]["ports"][port] = ips.detect_service( + args.ip, port + ) logger.info("IP scanning done") logger.info("IP scanning service analysis...") - final_dict_result= rp.service_recognizer(final_dict_result) + final_dict_result = rp.service_recognizer(final_dict_result) logger.info("IP scanning service analysis done") logger.info("Detecting web ports...") if args.checkPortsThreads != 0: - final_dict_result= dp.detect_web_port(final_dict_result, args.checkPortsThreads, "A") - if args.detectTechno : + final_dict_result = dp.detect_web_port( + final_dict_result, args.checkPortsThreads, "A" + ) + if args.detectTechno: final_dict_result = dp.detect_web_techno(final_dict_result, "A") final_dict_result = dp.detect_web_techno_domain(final_dict_result, "A") - if args.vulnScan : - logger.info('Running nuclei scan...') - final_dict_result = ips.run_parse_nuclei(final_dict_result, domain, "A", args.vulnConfig) + if args.vulnScan: + logger.info("Running nuclei scan...") + final_dict_result = ips.run_parse_nuclei( + final_dict_result, domain, "A", args.vulnConfig + ) logger.info("Detecting web ports done") logger.info("IP scanning results:") pprint(final_dict_result) logger.info("Done") - elif args.ipfile : - if args.network : + elif args.ipfile: + if args.network: logger.error("You can't use -ipFile and -network at the same time") exit(1) - if args.ip : + if args.ip: logger.error("You can't use -ipFile and -ip at the same time") exit(1) - final_dict_result= {} + final_dict_result = {} logger.info("IP scanning...") with open(args.ipfile, "r") as file: for line in file: - ip= line.strip() + ip = line.strip() final_dict_result[ip] = { - "ports" : {}, - "subdomains" : { - "subdomain_withdomain" : [], - "subdomain_withoutdomain" : [], - "subdomain_with_redirect" : [], - "dead_subdomains" : [], - "vulns" : {}, - }, - "techno" : { - "web" : [], - "web_domain" : [] + "ports": {}, + "subdomains": { + "subdomain_withdomain": [], + "subdomain_withoutdomain": [], + "subdomain_with_redirect": [], + "dead_subdomains": [], + "vulns": {}, }, - "vulns" : [] + "techno": {"web": [], "web_domain": []}, + "vulns": [], } - open_ports= ips.port_scan_with_thread_limit(ip, range(65536), args.IPthreads) + open_ports = ips.port_scan_with_thread_limit( + ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[ip]["ports"][port]= ips.detect_service(ip, port) + final_dict_result[ip]["ports"][port] = ips.detect_service(ip, port) logger.info("IP scanning done") logger.info("IP scanning service analysis...") - final_dict_result= rp.service_recognizer(final_dict_result) + final_dict_result = rp.service_recognizer(final_dict_result) logger.info("IP scanning service analysis done") logger.info("Detecting web ports...") if args.checkPortsThreads != 0: - final_dict_result= dp.detect_web_port(final_dict_result, args.checkPortsThreads, "A") - if args.detectTechno : + final_dict_result = dp.detect_web_port( + final_dict_result, args.checkPortsThreads, "A" + ) + if args.detectTechno: final_dict_result = dp.detect_web_techno(final_dict_result, "A") final_dict_result = dp.detect_web_techno_domain(final_dict_result, "A") - if args.vulnScan : - logger.info('Running nuclei scan...') - final_dict_result = ips.run_parse_nuclei(final_dict_result, domain, "A", args.vulnConfig) + if args.vulnScan: + logger.info("Running nuclei scan...") + final_dict_result = ips.run_parse_nuclei( + final_dict_result, domain, "A", args.vulnConfig + ) logger.info("Detecting web ports done") logger.info("IP scanning results:") pprint(final_dict_result) logger.info("Done") - elif args.network : - if args.ipfile : + elif args.network: + if args.ipfile: logger.error("You can't use -network and -ipFile at the same time") exit(1) - if args.ip : + if args.ip: logger.error("You can't use -network and -ip at the same time") exit(1) - final_dict_result= {} + final_dict_result = {} logger.info("Network scanning...") for ip in ips.get_ip_from_network(args.network): ip = ip["ip"] final_dict_result[ip] = { - "ports" : {}, - "subdomains" : { - "subdomain_withdomain" : [], - "subdomain_withoutdomain" : [], - "subdomain_with_redirect" : [], - "dead_subdomains" : [], - "vulns" : {}, - }, - "techno" : { - "web" : [], - "web_domain" : [] + "ports": {}, + "subdomains": { + "subdomain_withdomain": [], + "subdomain_withoutdomain": [], + "subdomain_with_redirect": [], + "dead_subdomains": [], + "vulns": {}, }, - "vulns" : [] + "techno": {"web": [], "web_domain": []}, + "vulns": [], } - open_ports= ips.port_scan_with_thread_limit(ip, range(65536), args.IPthreads) + open_ports = ips.port_scan_with_thread_limit( + ip, range(65536), args.IPthreads + ) for port in open_ports: - final_dict_result[ip]["ports"][port]= ips.detect_service(ip, port) + final_dict_result[ip]["ports"][port] = ips.detect_service(ip, port) logger.info("IP scanning done") logger.info("IP scanning service analysis...") - final_dict_result= rp.service_recognizer(final_dict_result) + final_dict_result = rp.service_recognizer(final_dict_result) logger.info("IP scanning service analysis done") logger.info("Detecting web ports...") if args.checkPortsThreads != 0: - final_dict_result= dp.detect_web_port(final_dict_result, args.checkPortsThreads, "A") - if args.detectTechno : + final_dict_result = dp.detect_web_port( + final_dict_result, args.checkPortsThreads, "A" + ) + if args.detectTechno: final_dict_result = dp.detect_web_techno(final_dict_result, "A") final_dict_result = dp.detect_web_techno_domain(final_dict_result, "A") - if args.vulnScan : - logger.info('Running nuclei scan...') - final_dict_result = ips.run_parse_nuclei(final_dict_result, domain, "A", args.vulnConfig) + if args.vulnScan: + logger.info("Running nuclei scan...") + final_dict_result = ips.run_parse_nuclei( + final_dict_result, domain, "A", args.vulnConfig + ) logger.info("Detecting web ports done") logger.info("IP scanning results:") pprint(final_dict_result) logger.info("Done") - if not os.path.exists("exports"): os.mkdir("exports") - - if not os.path.exists("exports/"+domain): - os.mkdir("exports/"+domain) + if not os.path.exists("exports/" + domain): + os.mkdir("exports/" + domain) date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") file_name = f"result_{domain.replace('.','-')}_{date}.json" - with open("exports/"+domain+"/"+file_name, "w") as f: + with open("exports/" + domain + "/" + file_name, "w") as f: json.dump(final_dict_result, f, indent=4) logger.info(f"File saved in exports/{file_name}") @@ -465,15 +634,19 @@ def menu(): html = template.render(data=final_dict_result, metadata={"version": "1.0.0"}) with open(f"exports/{domain}/html_report_{domain}_{date}.html", "w") as file: file.write(html) - logger.info(f"HTML report saved in exports/{domain}/html_report_{domain}_{date}.html") + logger.info( + f"HTML report saved in exports/{domain}/html_report_{domain}_{date}.html" + ) input("Press enter to open the result in your browser...") - path = os.sep.join([os.getcwd(),f"exports/{domain}/html_report_{domain}_{date}.html"]) + path = os.sep.join( + [os.getcwd(), f"exports/{domain}/html_report_{domain}_{date}.html"] + ) if os.name == "nt": os.startfile(path) if sys.platform.startswith("linux"): os.system("xdg-open " + path) logger.info("Exiting...") + if __name__ == "__main__": menu() - diff --git a/manifest b/manifest index e8fbede..f67cfd2 100644 --- a/manifest +++ b/manifest @@ -1 +1 @@ -V2.2 \ No newline at end of file +V2.2.1 \ No newline at end of file