diff --git a/BeamMP.py b/BeamMP.py
index 8d821ae..d9f9689 100644
--- a/BeamMP.py
+++ b/BeamMP.py
@@ -1,6 +1,5 @@
 from datetime import datetime, timezone
 from pymongo import UpdateOne
-import requests
 
 from MasterServer import MasterServer
 
@@ -12,7 +11,7 @@ def __init__(self) -> None:
 
     def job(self):
         # Fetch data until empty
-        servers = self._fetch()
+        servers = self._fetch_url('https://backend.beammp.com/servers-info')
 
         # Perform bulk write (upsert) operation
         self._upsert_bulk_write(servers)
@@ -32,13 +31,6 @@ def find(self, *, host: str, port: int):
 
         return result
 
-    def _fetch(self) -> list:
-        url = "https://backend.beammp.com/servers-info"
-        response = requests.get(url, timeout=15)
-        response.raise_for_status()
-        data = response.json()
-        return data
-
     def _upsert_bulk_write(self, server_list: list):
         # Prepare the updates
         updates = [
diff --git a/Factorio.py b/Factorio.py
index b493c49..ce6ed26 100644
--- a/Factorio.py
+++ b/Factorio.py
@@ -1,7 +1,6 @@
 import os
 from datetime import datetime, timezone
 from pymongo import UpdateOne
-import requests
 
 from MasterServer import MasterServer
 
@@ -38,10 +37,7 @@ def _fetch(self) -> list:
         username = os.getenv("FACTORIO_USERNAME")
         token = os.getenv("FACTORIO_TOKEN")
         url = f"https://multiplayer.factorio.com/get-games?username={username}&token={token}"
-
-        response = requests.get(url, timeout=15)
-        response.raise_for_status()
-        data = response.json()
+        data = self._fetch_url(url)
 
         if "message" in data:
             # Possible error messages
diff --git a/MasterServer.py b/MasterServer.py
index 3da83ba..6be66c4 100644
--- a/MasterServer.py
+++ b/MasterServer.py
@@ -1,10 +1,12 @@
 from abc import ABC, abstractmethod
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta, timezone
+import json
 import os
 import time
 from pymongo import MongoClient
 from dotenv import load_dotenv
+import requests
 from tqdm import tqdm
 
 load_dotenv()
@@ -42,6 +44,32 @@ def run(self):
         elapsed_time = time.time() - start_time
         print(f"Job done: {self.key}. Time elapsed: {elapsed_time:.2f} seconds")
 
+    def _fetch_url(self, url: str):
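+        # Streams the response in 1 KiB chunks so tqdm can report download
+        # progress; if the Content-Length header is missing, total stays 0
+        # and the bar falls back to a running byte count.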
+        response = requests.get(url, stream=True, timeout=15)
+        response.raise_for_status()
+
+        # Get the total content length (in bytes) from the response headers
+        total_size = int(response.headers.get("content-length", 0))
+
+        # Initialize an empty data buffer
+        data = b''
+
+        # Create a progress bar
+        desc = f"[{self.key}] Fetching data from url"
+        with tqdm(total=total_size, unit="B", unit_scale=True, unit_divisor=1024, desc=desc) as pbar:
+            for chunk in response.iter_content(chunk_size=1024):
+                if chunk:
+                    data += chunk
+                    pbar.update(len(chunk))
+
+        # Convert bytes to string
+        str_data = data.decode('utf-8')
+
+        # Convert string to JSON
+        json_data = json.loads(str_data)
+
+        return json_data
+
     def _bulk_write(self, updates: list):
         # Chunk size for bulk write
         max_workers = min(32, os.cpu_count() + 4)
@@ -51,7 +79,7 @@
         update_chunks = [updates[i:i + chunk_size]
                          for i in range(0, len(updates), chunk_size)]
 
-        pbar = tqdm(total=len(updates), desc="Bulk Write")
+        pbar = tqdm(total=len(updates), desc=f"[{self.key}] Bulk Write")
 
         def perform_bulk_write(i: int):
             self.collection.bulk_write(update_chunks[i], ordered=False)
diff --git a/Palworld.py b/Palworld.py
index 470dfb6..88eb98b 100644
--- a/Palworld.py
+++ b/Palworld.py
@@ -61,7 +61,7 @@ def _upsert_bulk_write(self, server_list: list):
     def _fetch_until_empty(self):
        servers = []
        stop = False
-        pbar = tqdm(total=0)
+        pbar = tqdm(total=0, desc=f'[{self.key}] Fetch Page')
 
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(self._fetch_page, pbar.total + 1)}
@@ -79,7 +79,6 @@ def _fetch_until_empty(self):
                        stop = True
 
                if not stop:
-                    pbar.set_description(f'Page {pbar.total + 1}')
                    pbar.total += 1
                    pbar.refresh()
                    futures.add(executor.submit(
diff --git a/Scum.py b/Scum.py
new file mode 100644
index 0000000..dc8cc12
--- /dev/null
+++ b/Scum.py
@@ -0,0 +1,125 @@
+from datetime import datetime, timezone
+import socket
+import struct
+from pymongo import UpdateOne
+from tqdm import tqdm
+
+from MasterServer import MasterServer
+
+
+class Scum(MasterServer):
+    # Unknown index = | index: 0 4 5 6 7 8 | 10 | 12
+    __SERVER_INFO_STRUCT = struct.Struct("<4B H 100s x B B B B B 7s 8B")
+
+    def __init__(self) -> None:
+        super().__init__('Scum')
+        self.collection.create_index({'ip': 1, 'port': 1})
+
+    def job(self):
+        # Fetch data until empty
+        servers = self._fetch()
+
+        # Perform bulk write (upsert) operation
+        self._upsert_bulk_write(servers)
+
+        # Remove old documents (assuming this method exists)
+        self._remove_old_documents(minutes=30)
+
+    def find(self, *, host: str, port: int):
+        # Define the query to find documents with a specific address and port
+        query = {'ip': host, 'port': port}
+
+        # Specify the projection to exclude certain fields
+        projection = {'_id': 0, '_created_at': 0, '_last_modified': 0}
+
+        # Retrieve the result
+        result = self.collection.find_one(query, projection)
+
+        return result
+
+    def _fetch(self):
+        __addresses = [
+            ("176.57.138.2", 1040),
+            ("172.107.16.215", 1040),
+            ("206.189.248.133", 1040),
+        ]
+        size = self.__SERVER_INFO_STRUCT.size  # 127 bytes for each server
+
+        for address in __addresses:
+            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
+                try:
+                    client_socket.connect(address)
+                except Exception as e:
+                    print(f"Failed to connect to {address}: {e}")
+                    continue
+
+                client_socket.sendall(b"\x04\x03\x00\x00")
+
+                # First two bytes: total server count (assumed little-endian)
+                total: int = struct.unpack('<H', client_socket.recv(2))[0]
+
+                # Receive until every fixed-size entry has arrived
+                data = b''
+
+                while len(data) < total * size:
+                    chunk = client_socket.recv(4096)
+                    if not chunk:
+                        break
+                    data += chunk
+
+                # Unpack and parse each 127-byte entry
+                desc = f'[{self.key}] Parsing servers'
+                servers = [
+                    self._parse_server(self.__SERVER_INFO_STRUCT.unpack_from(data, i * size))
+                    for i in tqdm(range(total), desc=desc)
+                ]
+
+                return servers
+
+        return []
+
+    def _parse_server(self, result: tuple) -> dict:
+        server = {}
+
+        # Indices 0-3: IPv4 octets (assumed to arrive in reverse order)
+        server['ip'] = '.'.join(map(str, reversed(result[:4])))
+        server['port'] = result[4]
+        server['name'] = result[5].rstrip(b'\x00').decode('utf-8', errors='ignore')
+
+        # Indices 6-8: player counts and in-game hour (assumed meaning)
+        server['num_players'] = result[6]
+        server['max_players'] = result[7]
+        server['time'] = f'{result[8]:02d}:00'
+
+        # Index 10: flags byte; bit 1 is assumed to mark password protection
+        server['password'] = ((result[10] >> 1) & 1) == 1
+
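+        # Indices 12-19: version bytes, least-significant byte first
+        # (assumed), so they are reversed before reading major/minor/patch/build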
+        # Convert the result to hexadecimal and pad with zeros
+        hex_values = [hex(result[12 + i])[2:].rjust(2, '0') for i in range(8)]
+
+        # Reverse the list
+        reversed_hex_values = list(reversed(hex_values))
+
+        # Extract version components
+        major = int(reversed_hex_values[0], 16)
+        minor = int(reversed_hex_values[1], 16)
+        patch = int(reversed_hex_values[2] + reversed_hex_values[3], 16)
+        build = int(''.join(reversed_hex_values[4:]), 16)
+
+        # Format the version string
+        server['version'] = f"{major}.{minor}.{patch}.{build}"
+
+        return server
+
+    def _upsert_bulk_write(self, server_list: list):
+        # Prepare the updates
+        updates = [
+            UpdateOne(
+                {'ip': server['ip'], 'port': server['port']},
+                {
+                    '$set': server,
+                    '$currentDate': {'_last_modified': True},
+                    '$setOnInsert': {'_created_at': datetime.now(timezone.utc)}
+                },
+                upsert=True
+            )
+            for server in server_list
+        ]
+        return self._bulk_write(updates)
+
+
+if __name__ == "__main__":
+    scum = Scum()
+    # scum.job()
+    server = scum.find(host='15.235.181.18', port=7042)
+    print(server)
diff --git a/app.py b/app.py
index 6847482..7d06d57 100644
--- a/app.py
+++ b/app.py
@@ -5,6 +5,7 @@
 from BeamMP import BeamMP
 from Factorio import Factorio
 from Palworld import Palworld
+from Scum import Scum
 
 app = Flask(__name__)
 
@@ -46,5 +47,10 @@ def palworld_search():
     return search(request.args, Palworld())
 
 
+@app.route('/scum/search', methods=['GET'])
+def scum_search():
+    return search(request.args, Scum())
+
+
 if __name__ == '__main__':
     app.run(debug=True)
diff --git a/main.py b/main.py
index 6e868c6..026bbb8 100644
--- a/main.py
+++ b/main.py
@@ -7,6 +7,7 @@
 from BeamMP import BeamMP
 from Factorio import Factorio
 from Palworld import Palworld
+from Scum import Scum
 
 threads: dict[str, Thread] = {}
 
@@ -20,6 +21,7 @@ def run_threaded(master_server: MasterServer):
 schedule.every(5).minutes.do(run_threaded, BeamMP())
 schedule.every(5).minutes.do(run_threaded, Factorio())
 schedule.every(5).minutes.do(run_threaded, Palworld())
+schedule.every(5).minutes.do(run_threaded, Scum())
 
 for job in schedule.get_jobs():
     print(f"Job: {job}, Next run: {job.next_run}, Period: {job.period}")