Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] POC for actor-based simulation testing framework #9302

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/simtester/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# third party
from setuptools import find_packages
from setuptools import setup

__version__ = "0.1.0"

if __name__ == "__main__":
setup(
name="simtester",
version=__version__,
packages=find_packages(where="src"),
package_dir={"": "src"},
entry_points={
"console_scripts": [
"simtester=simtester.__main__:main", # Exposes the command 'simtester'
],
},
)
3 changes: 3 additions & 0 deletions packages/simtester/src/simtester/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# relative
from .actor import Actor
from .actor import action
76 changes: 76 additions & 0 deletions packages/simtester/src/simtester/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# stdlib
import argparse
import asyncio
import importlib.util
import os

# relative
from .actor import Actor
from .runner import Runner


def load_actor_class(module_path, class_name):
spec = importlib.util.spec_from_file_location("actor_module", module_path)
actor_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(actor_module)

# Find the specific class by name
obj = getattr(actor_module, class_name, None)
if obj is None:
raise ImportError(f"Class `{class_name}` not found in `{module_path}`.")
if not isinstance(obj, type) or not issubclass(obj, Actor):
raise TypeError(f"`{class_name}` is not a valid subclass of `Actor`.")
return obj


def parse_actor_args(args):
actor_data = []
for i in range(
0, len(args), 2
): # Iterate over the pairs of `module_path::class_name` and `count`
path_class_pair = args[i]
count = int(args[i + 1])

if "::" not in path_class_pair:
raise ValueError(
f"Invalid format for actor class specification: {path_class_pair}"
)

module_path, class_name = path_class_pair.split("::")

# Resolve the absolute module path
module_path = os.path.abspath(module_path)

actor_data.append((module_path, class_name, count))
return actor_data


def main():
parser = argparse.ArgumentParser(description="Run the simulation tests")
parser.add_argument(
"actor_args",
nargs="+",
help="Actor class specifications in the format path/to/file.py::Class count.\n"
"Example usage: `bigquery/level_0_simtest.py::DataScientist 5 bigquery/level_0_simtest.py::Admin 2`.\n"
"This will spawn 5 DataScientist actors and 2 Admin actors.",
)
args = parser.parse_args()

actor_specs = parse_actor_args(args.actor_args)

actor_classes = []
for module_path, class_name, count in actor_specs:
try:
actor_class = load_actor_class(module_path, class_name)
except (ImportError, TypeError) as e:
print(e)
return
actor_classes.append((actor_class, count))

# Run the simulation with multiple actor classes
runner = Runner(actor_classes)
asyncio.run(runner.start())


if __name__ == "__main__":
main()
49 changes: 49 additions & 0 deletions packages/simtester/src/simtester/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# stdlib
import asyncio
import logging
import random


class Actor:
cooldown_period: int = 1

def __init__(self, name):
self.name = name
self.logger = logging.getLogger(self.name)
self.logger.setLevel(logging.INFO) # Set default logging level
handler = logging.StreamHandler() # Log to stdout
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)

self.actions = [
method
for method in dir(self)
if callable(getattr(self, method))
and getattr(getattr(self, method), "is_action", False)
]
# Call setup if defined
if hasattr(self, "setup") and callable(self.setup):
self.setup()

async def run(self):
try:
while True:
action = random.choice(self.actions)
await getattr(self, action)()
if isinstance(self.cooldown_period, int):
cooldown = self.cooldown_period
elif isinstance(self.cooldown_period, tuple):
cooldown = random.randint(*self.cooldown_period)
await asyncio.sleep(cooldown)
finally:
# Call teardown if defined
if hasattr(self, "teardown") and callable(self.teardown):
self.teardown()


def action(func):
func.is_action = True
return func
20 changes: 20 additions & 0 deletions packages/simtester/src/simtester/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# stdlib
import asyncio

# relative
from .actor import Actor


class Runner:
def __init__(self, actor_classes: list[tuple[type[Actor], int]]):
# `actor_classes` is a list of tuples (ActorClass, count)
self.actor_classes = actor_classes

async def start(self):
tasks = []
for actor_class, count in self.actor_classes:
for i in range(count):
# Instantiate the actor
actor = actor_class(name=f"{actor_class.__name__}-{i}")
tasks.append(asyncio.create_task(actor.run()))
await asyncio.gather(*tasks)
96 changes: 96 additions & 0 deletions tests/scenarios/bigquery/level_2_sim_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# third party
from simtester import Actor
from simtester import action


class DataScientist(Actor):
cooldown_period = (1, 5)

def setup(self):
# TODO create a server, or connect to an existing one
# TODO register this test user on the server
self.logger.info(f"{self.name}: Setup complete")

def teardown(self):
# TODO In case of live server, delete the user and perform any cleanup
self.logger.info(f"{self.name}: Teardown complete")

@action
async def whoami(self):
self.logger.info(f"Actor {self.name} is running whoami")
# self.client.account()

@action
async def guest_register(self):
# TODO register a guest user
self.logger.info(f"Actor {self.name} is running guest_register")

@action
async def query_mock(self):
# TODO run a query on bigquery.test_query
self.logger.info(f"Actor {self.name} is running query_mock")

@action
async def submit_query(self):
# TODO submit a query to bigquery.submit_query
self.logger.info(f"Actor {self.name} is running submit_query")

@action
async def get_query_results(self):
# TODO get the results of a query using get_results
self.logger.info(f"Actor {self.name} is running get_query_results")


class Admin(Actor):
cooldown_period = (1, 5)

def setup(self):
# TODO create a server, or connect to an existing one
# server = make_server()
# admin = make_admin()
# self.root_client = admin.client(server)
# create_prebuilt_worker_image
# get_prebuilt_worker_image
# add_external_registry
self.logger.info(f"{self.name}: Setup complete")

def teardown(self):
self.logger.info(f"{self.name}: Teardown complete")

@action
async def approve_pending_requests(self):
self.logger.info(f"Actor {self.name} is running approve_pending_requests")

@action
async def reject_pending_requests(self):
self.logger.info(f"Actor {self.name} is running reject_pending_requests")

@action
async def create_worker_pool(self):
self.logger.info(f"Actor {self.name} is running create_worker_pool")

@action
async def remove_worker_pool(self):
self.logger.info(f"Actor {self.name} is running remove_worker_pool")

@action
async def disallow_guest_signup(self):
self.logger.info(f"Actor {self.name} is running disallow_guest_signup")

@action
async def allow_guest_signup(self):
self.logger.info(f"Actor {self.name} is running allow_guest_signup")

@action
async def create_endpoints(self):
# create_endpoints_query
# create_endpoints_schema
# create_endpoints_submit_query
self.logger.info(f"Actor {self.name} is running create_endpoints_query")

@action
async def remove_endpoints(self):
# remove_endpoints_query
# remove_endpoints_schema
# remove_endpoints_submit_query
self.logger.info(f"Actor {self.name} is running remove_endpoints_query")
Loading