From 82b608cc3a1f9b7f11ae36f72a0137ce860af019 Mon Sep 17 00:00:00 2001
From: TatLead
Date: Mon, 11 Mar 2024 20:48:33 +0000
Subject: [PATCH] Support Factorio

---
 .env.example    |  3 ++
 Factorio.py     | 74 +++++++++++++++++++++++++++++++++++++++++++++++++
 MasterServer.py | 55 ++++++++++++++++++++++++++++++++++--
 Palworld.py     | 54 ++++--------------------------------
 app.py          | 23 +++++++++++----
 main.py         |  5 ++--
 6 files changed, 156 insertions(+), 58 deletions(-)
 create mode 100644 Factorio.py

diff --git a/.env.example b/.env.example
index 4c2f61b..a5fd1d5 100644
--- a/.env.example
+++ b/.env.example
@@ -1,2 +1,5 @@
 DATABASE_URL=
 PORT=8000
+
+FACTORIO_USERNAME=
+FACTORIO_TOKEN=
diff --git a/Factorio.py b/Factorio.py
new file mode 100644
index 0000000..d64e02f
--- /dev/null
+++ b/Factorio.py
@@ -0,0 +1,74 @@
+import os
+from datetime import datetime, timezone
+from pymongo import UpdateOne
+import requests
+
+from MasterServer import MasterServer
+
+
+class Factorio(MasterServer):
+    def __init__(self) -> None:
+        super().__init__('Factorio')
+        self.collection.create_index('host_address')
+
+    def job(self):
+        # Fetch data until empty
+        servers = self._fetch()
+
+        # Perform bulk write (upsert) operation
+        self._upsert_bulk_write(servers)
+
+        # Remove old documents (assuming this method exists)
+        self._remove_old_documents(minutes=30)
+
+    def find(self, *, host: str, port: int):
+        # Define the query to find documents with a specific address and port
+        query = {'host_address': f'{host}:{port}'}
+
+        # Specify the projection to exclude certain fields
+        projection = {'_id': 0, '_created_at': 0, '_last_modified': 0}
+
+        # Retrieve the result
+        result = self.collection.find_one(query, projection)
+
+        return result
+
+    def _fetch(self) -> list:
+        # NOTE: os.getenv returns a single string (or None); do not tuple-unpack it.
+        username = os.getenv("FACTORIO_USERNAME")
+        token = os.getenv("FACTORIO_TOKEN")
+        url = f"https://multiplayer.factorio.com/get-games?username={username}&token={token}"
+
+        response = requests.get(url, timeout=15)
+        response.raise_for_status()
+        data = response.json()
+
+        if "message" in data:
+            # Possible error messages
+            # 1. User not found. -> Invalid FACTORIO_USERNAME
+            # 2. Token doesn't match. -> Invalid FACTORIO_TOKEN
+            raise LookupError(data["message"])
+
+        return data
+
+    def _upsert_bulk_write(self, server_list: list):
+        # Prepare the updates
+        updates = [
+            UpdateOne(
+                {'server_id': server['server_id']},
+                {
+                    '$set': server,
+                    '$currentDate': {'_last_modified': True},
+                    '$setOnInsert': {'_created_at': datetime.now(timezone.utc)}
+                },
+                upsert=True
+            )
+            for server in server_list
+        ]
+        return self._bulk_write(updates)
+
+
+if __name__ == "__main__":
+    factorio = Factorio()
+    # factorio.job()
+    server = factorio.find(host='176.93.252.86', port=24609)
+    print(server)
diff --git a/MasterServer.py b/MasterServer.py
index 09fa9b8..3da83ba 100644
--- a/MasterServer.py
+++ b/MasterServer.py
@@ -1,16 +1,22 @@
 from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timedelta, timezone
 import os
+import time
 from pymongo import MongoClient
 from dotenv import load_dotenv
+from tqdm import tqdm
 
 load_dotenv()
 
 
 class MasterServer(ABC):
-    def __init__(self) -> None:
-        self._key = ''
+    def __init__(self, key: str) -> None:
+        self._key = key
         uri = os.getenv('DATABASE_URL')
         self.client = MongoClient(uri)
+        self.db = self.client['MasterServer']
+        self.collection = self.db[key]
 
     @property
     def key(self):
@@ -23,3 +29,48 @@ def job(self): pass
     @abstractmethod
     def find(self, *, host: str, port: int):
         pass
+
+    def run(self):
+        # Record the start time
+        start_time = time.time()
+        print(f"Running job: {self.key}")
+
+        # Run job
+        self.job()
+
+        # Calculate elapsed time
+        elapsed_time = time.time() - start_time
+        print(f"Job done: {self.key}. Time elapsed: {elapsed_time:.2f} seconds")
+
+    def _bulk_write(self, updates: list):
+        # Chunk size for bulk write (cpu_count() may be None; empty updates -> chunk_size 1)
+        max_workers = min(32, (os.cpu_count() or 1) + 4)
+        chunk_size = -(-len(updates) // max_workers) or 1
+
+        # Split the updates into chunks
+        update_chunks = [updates[i:i + chunk_size]
+                         for i in range(0, len(updates), chunk_size)]
+
+        pbar = tqdm(total=len(updates), desc="Bulk Write")
+
+        def perform_bulk_write(i: int):
+            self.collection.bulk_write(update_chunks[i], ordered=False)
+            pbar.update(len(update_chunks[i]))
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Iterate over the real chunk count and consume the iterator so
+            # worker exceptions propagate instead of being silently dropped.
+            results = list(executor.map(perform_bulk_write, range(len(update_chunks))))
+
+        return results
+
+    def _remove_old_documents(self, minutes: int):
+        # Calculate the time 'minutes' ago
+        time_ago = datetime.now(timezone.utc) - timedelta(minutes=minutes)
+
+        # Remove documents that haven't been updated for 'minutes'
+        result = self.collection.delete_many(
+            {'_last_modified': {'$lt': time_ago}})
+
+        # Print the count of deleted documents
+        print(f"Deleted {result.deleted_count} documents that haven't been updated for {minutes} minutes.")
+
+        return result
diff --git a/Palworld.py b/Palworld.py
index 564abf5..470dfb6 100644
--- a/Palworld.py
+++ b/Palworld.py
@@ -1,7 +1,5 @@
-import os
 from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
-from datetime import datetime, timedelta, timezone
-import time
+from datetime import datetime, timezone
 from pymongo import UpdateOne
 from tqdm import tqdm
 import requests
@@ -11,31 +9,20 @@
 
 class Palworld(MasterServer):
     def __init__(self) -> None:
-        super().__init__()
-        self._key = 'Palworld'
-        self.db = self.client['MasterServer']
-        self.collection = self.db['PalWorld']
+        super().__init__('Palworld')
         self.collection.create_index('server_id')
         self.collection.create_index({'address': 1, 'port': 1})
 
     def job(self):
-        # Record the start time
-        start_time = time.time()
-        print(f"Running job: {self.key}")
-
         # Fetch data until empty
         servers = self._fetch_until_empty()
 
         # Perform bulk write (upsert) operation
         self._upsert_bulk_write(servers)
 
         # Remove old documents (assuming this method exists)
         self._remove_old_documents(minutes=30)
 
-        # Calculate elapsed time
-        elapsed_time = time.time() - start_time
-        print(f"Job done: {self.key}. Time elapsed: {elapsed_time:.2f} seconds")
-
     def find(self, *, host: str, port: int):
         # Define the query to find documents with a specific address and port
         query = {'address': host, 'port': port}
@@ -50,7 +37,7 @@ def find(self, *, host: str, port: int):
 
     def _fetch_page(self, page: int) -> list:
         url = f"https://api.palworldgame.com/server/list?page={page}"
-        response = requests.get(url)
+        response = requests.get(url, timeout=15)
         response.raise_for_status()
         data = response.json()
         return data['server_list']
@@ -69,25 +56,7 @@
             )
             for server in server_list
         ]
-
-        # Chunk size for bulk write
-        max_workers = min(32, os.cpu_count() + 4)
-        chunk_size = -(-len(updates) // max_workers)
-
-        # Split the updates into chunks
-        update_chunks = [updates[i:i + chunk_size]
-                         for i in range(0, len(updates), chunk_size)]
-
-        pbar = tqdm(total=len(updates), desc="Bulk Write")
-
-        def perform_bulk_write(i: int):
-            self.collection.bulk_write(update_chunks[i], ordered=False)
-            pbar.update(len(update_chunks[i]))
-
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            results = executor.map(perform_bulk_write, range(max_workers))
-
-        return results
+        return self._bulk_write(updates)
 
     def _fetch_until_empty(self):
         servers = []
@@ -118,22 +87,9 @@
 
         return servers
 
-    def _remove_old_documents(self, minutes: int):
-        # Calculate the time 'minutes' ago
-        time_ago = datetime.now(timezone.utc) - timedelta(minutes=minutes)
-
-        # Remove documents that haven't been updated for 'minutes'
-        result = self.collection.delete_many(
-            {'_last_modified': {'$lt': time_ago}})
-
-        # Print the count of deleted documents
-        print(f"Deleted {result.deleted_count} documents that haven't been updated for {minutes} minutes.")
-
-        return result
-
 
 if __name__ == "__main__":
     palword = Palworld()
-    palword.run()
+    # palword.job()
     server = palword.find(host='104.192.227.52', port=8211)
     print(server)
diff --git a/app.py b/app.py
index 12f0e2d..7fc310a 100644
--- a/app.py
+++ b/app.py
@@ -1,13 +1,16 @@
 from flask import Flask, abort, request, jsonify
+
+from MasterServer import MasterServer
+
+from Factorio import Factorio
 from Palworld import Palworld
 
 app = Flask(__name__)
 
 
-@app.route('/palworld/search', methods=['GET'])
-def palworld_search():
-    host = request.args.get('host')
-    port = request.args.get('port')
+def search(args: dict, master_server: MasterServer):
+    host = args.get('host')
+    port = args.get('port')
 
     # Check if host and port are provided
     if not host or not port:
@@ -18,7 +21,7 @@ def palworld_search():
     except ValueError:
         abort(400, description="'port' must be an integer.")
 
-    result = Palworld().find(host=host, port=port)
+    result = master_server.find(host=host, port=port)
 
     # Check if result is found
     if not result:
@@ -27,5 +30,15 @@
     return jsonify(result)
 
 
+@app.route('/factorio/search', methods=['GET'])
+def factorio_search():
+    return search(request.args, Factorio())
+
+
+@app.route('/palworld/search', methods=['GET'])
+def palworld_search():
+    return search(request.args, Palworld())
+
+
 if __name__ == '__main__':
     app.run(debug=True)
diff --git a/main.py b/main.py
index 0f2c075..1e36d13 100644
--- a/main.py
+++ b/main.py
@@ -3,18 +3,19 @@
 import schedule
 
 from MasterServer import MasterServer
+from Factorio import Factorio
 from Palworld import Palworld
 
-
 threads: dict[str, Thread] = {}
 
 
 def run_threaded(master_server: MasterServer):
     if master_server.key not in threads or not threads[master_server.key].is_alive():
-        threads[master_server.key] = Thread(target=master_server.job)
+        threads[master_server.key] = Thread(target=master_server.run)
         threads[master_server.key].start()
 
 
+schedule.every(5).minutes.do(run_threaded, Factorio())
 schedule.every(5).minutes.do(run_threaded, Palworld())
 
 for job in schedule.get_jobs():