diff --git a/packages/simtester/setup.py b/packages/simtester/setup.py new file mode 100644 index 00000000000..238f8721ed7 --- /dev/null +++ b/packages/simtester/setup.py @@ -0,0 +1,18 @@ +# third party +from setuptools import find_packages +from setuptools import setup + +__version__ = "0.1.0" + +if __name__ == "__main__": + setup( + name="simtester", + version=__version__, + packages=find_packages(where="src"), + package_dir={"": "src"}, + entry_points={ + "console_scripts": [ + "simtester=simtester.__main__:main", # Exposes the command 'simtester' + ], + }, + ) diff --git a/packages/simtester/src/simtester/__init__.py b/packages/simtester/src/simtester/__init__.py new file mode 100644 index 00000000000..e8cfa040eef --- /dev/null +++ b/packages/simtester/src/simtester/__init__.py @@ -0,0 +1,3 @@ +# relative +from .actor import Actor +from .actor import action diff --git a/packages/simtester/src/simtester/__main__.py b/packages/simtester/src/simtester/__main__.py new file mode 100644 index 00000000000..2d09381fef9 --- /dev/null +++ b/packages/simtester/src/simtester/__main__.py @@ -0,0 +1,76 @@ +# stdlib +import argparse +import asyncio +import importlib.util +import os + +# relative +from .actor import Actor +from .runner import Runner + + +def load_actor_class(module_path, class_name): + spec = importlib.util.spec_from_file_location("actor_module", module_path) + actor_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(actor_module) + + # Find the specific class by name + obj = getattr(actor_module, class_name, None) + if obj is None: + raise ImportError(f"Class `{class_name}` not found in `{module_path}`.") + if not isinstance(obj, type) or not issubclass(obj, Actor): + raise TypeError(f"`{class_name}` is not a valid subclass of `Actor`.") + return obj + + +def parse_actor_args(args): + actor_data = [] + for i in range( + 0, len(args), 2 + ): # Iterate over the pairs of `module_path::class_name` and `count` + path_class_pair = args[i] + count = int(args[i + 1]) + + if "::" not in path_class_pair: + raise ValueError( + f"Invalid format for actor class specification: {path_class_pair}" + ) + + module_path, class_name = path_class_pair.split("::") + + # Resolve the absolute module path + module_path = os.path.abspath(module_path) + + actor_data.append((module_path, class_name, count)) + return actor_data + + +def main(): + parser = argparse.ArgumentParser(description="Run the simulation tests") + parser.add_argument( + "actor_args", + nargs="+", + help="Actor class specifications in the format path/to/file.py::Class count.\n" + "Example usage: `bigquery/level_0_simtest.py::DataScientist 5 bigquery/level_0_simtest.py::Admin 2`.\n" + "This will spawn 5 DataScientist actors and 2 Admin actors.", + ) + args = parser.parse_args() + + actor_specs = parse_actor_args(args.actor_args) + + actor_classes = [] + for module_path, class_name, count in actor_specs: + try: + actor_class = load_actor_class(module_path, class_name) + except (ImportError, TypeError) as e: + print(e) + return + actor_classes.append((actor_class, count)) + + # Run the simulation with multiple actor classes + runner = Runner(actor_classes) + asyncio.run(runner.start()) + + +if __name__ == "__main__": + main() diff --git a/packages/simtester/src/simtester/actor.py b/packages/simtester/src/simtester/actor.py new file mode 100644 index 00000000000..1218db83d13 --- /dev/null +++ b/packages/simtester/src/simtester/actor.py @@ -0,0 +1,49 @@ +# stdlib +import asyncio +import logging +import random + + +class Actor: + cooldown_period: int = 1 + + def __init__(self, name): + self.name = name + self.logger = logging.getLogger(self.name) + self.logger.setLevel(logging.INFO) # Set default logging level + handler = logging.StreamHandler() # Log to stdout + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + self.actions = [ + method + for method in dir(self) + if callable(getattr(self, method)) + and getattr(getattr(self, method), "is_action", False) + ] + # Call setup if defined + if hasattr(self, "setup") and callable(self.setup): + self.setup() + + async def run(self): + try: + while True: + action = random.choice(self.actions) + await getattr(self, action)() + if isinstance(self.cooldown_period, int): + cooldown = self.cooldown_period + elif isinstance(self.cooldown_period, tuple): + cooldown = random.randint(*self.cooldown_period) + await asyncio.sleep(cooldown) + finally: + # Call teardown if defined + if hasattr(self, "teardown") and callable(self.teardown): + self.teardown() + + +def action(func): + func.is_action = True + return func diff --git a/packages/simtester/src/simtester/runner.py b/packages/simtester/src/simtester/runner.py new file mode 100644 index 00000000000..8834ab66722 --- /dev/null +++ b/packages/simtester/src/simtester/runner.py @@ -0,0 +1,20 @@ +# stdlib +import asyncio + +# relative +from .actor import Actor + + +class Runner: + def __init__(self, actor_classes: list[tuple[type[Actor], int]]): + # `actor_classes` is a list of tuples (ActorClass, count) + self.actor_classes = actor_classes + + async def start(self): + tasks = [] + for actor_class, count in self.actor_classes: + for i in range(count): + # Instantiate the actor + actor = actor_class(name=f"{actor_class.__name__}-{i}") + tasks.append(asyncio.create_task(actor.run())) + await asyncio.gather(*tasks) diff --git a/tests/scenarios/bigquery/level_2_sim_test.py b/tests/scenarios/bigquery/level_2_sim_test.py new file mode 100644 index 00000000000..a1bb8838ae2 --- /dev/null +++ b/tests/scenarios/bigquery/level_2_sim_test.py @@ -0,0 +1,96 @@ +# third party +from simtester import Actor +from simtester import action + + +class DataScientist(Actor): + cooldown_period = (1, 5) + + def setup(self): + # TODO create a server, or connect to an existing one + # TODO register this test user on the server + self.logger.info(f"{self.name}: Setup complete") + + def teardown(self): + # TODO In case of live server, delete the user and perform any cleanup + self.logger.info(f"{self.name}: Teardown complete") + + @action + async def whoami(self): + self.logger.info(f"Actor {self.name} is running whoami") + # self.client.account() + + @action + async def guest_register(self): + # TODO register a guest user + self.logger.info(f"Actor {self.name} is running guest_register") + + @action + async def query_mock(self): + # TODO run a query on bigquery.test_query + self.logger.info(f"Actor {self.name} is running query_mock") + + @action + async def submit_query(self): + # TODO submit a query to bigquery.submit_query + self.logger.info(f"Actor {self.name} is running submit_query") + + @action + async def get_query_results(self): + # TODO get the results of a query using get_results + self.logger.info(f"Actor {self.name} is running get_query_results") + + +class Admin(Actor): + cooldown_period = (1, 5) + + def setup(self): + # TODO create a server, or connect to an existing one + # server = make_server() + # admin = make_admin() + # self.root_client = admin.client(server) + # create_prebuilt_worker_image + # get_prebuilt_worker_image + # add_external_registry + self.logger.info(f"{self.name}: Setup complete") + + def teardown(self): + self.logger.info(f"{self.name}: Teardown complete") + + @action + async def approve_pending_requests(self): + self.logger.info(f"Actor {self.name} is running approve_pending_requests") + + @action + async def reject_pending_requests(self): + self.logger.info(f"Actor {self.name} is running reject_pending_requests") + + @action + async def create_worker_pool(self): + self.logger.info(f"Actor {self.name} is running create_worker_pool") + + @action + async def remove_worker_pool(self): + self.logger.info(f"Actor {self.name} is running remove_worker_pool") + + @action + async def disallow_guest_signup(self): + self.logger.info(f"Actor {self.name} is running disallow_guest_signup") + + @action + async def allow_guest_signup(self): + self.logger.info(f"Actor {self.name} is running allow_guest_signup") + + @action + async def create_endpoints(self): + # create_endpoints_query + # create_endpoints_schema + # create_endpoints_submit_query + self.logger.info(f"Actor {self.name} is running create_endpoints_query") + + @action + async def remove_endpoints(self): + # remove_endpoints_query + # remove_endpoints_schema + # remove_endpoints_submit_query + self.logger.info(f"Actor {self.name} is running remove_endpoints_query")