Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add RPS based load option #65

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

dagrayvid
Copy link
Collaborator

No description provided.

@@ -59,17 +63,48 @@ def _init_user_process_logging(self):
self.logger = logging.getLogger("user")
return logging.getLogger("user")

def _user_loop(self, test_end_time):
while self.stop_q.empty():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought for the future. What if we have the main process SIGTERM (or SIGUSR1) the subprocesses as a stop message and write a custom signal handler to clean up?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah totally, that's long overdue. I was looking into it and should work on it sometime.

load_test.py Outdated
@@ -89,48 +119,50 @@ def main(args):
log_reader_thread = logging_utils.init_logging(args.log_level, logger_q)

# Create processes and their Users
schedule_q = mp_ctx.Queue(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
schedule_q = mp_ctx.Queue(1)
schedule_q = mp_ctx.Queue(1)
schedule_q.cancel_join_thread()

Toggle cancel_join_thread() here to avoid the queue blocking on exit.

for query in dataset.get_next_n_queries(2 * concurrency):
dataset_q.put(query)
request_q.put((None, query))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a clarity perspective I think it would be better to have this be a dict or object. E.g.

Suggested change
request_q.put((None, query))
request_q.put(dict(query=query, req_time=None))

or set a field on the query dict.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a field in the query dict is much more elegant!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I like the idea of making it a field in the query dict.

Comment on lines +37 to +38

return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop this return?

Suggested change
return

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't but I wonder if adding a dedicated try-catch exception block in this function worth it. We currently catch all the cascade exceptions with the generic Exception class in the main function but it's probably not the cleanest way to handle the exception IMO.

Not suggesting this should be addressed in this PR but a follow-up PR to cleanup our exception handling might be good.

Copy link
Collaborator

@npalaska npalaska left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor nits and comments but this looks ready to go.

max_sequence_tokens: 2048
load_options:
type: constant #Future options: loadgen, stair-step
concurrency: 1
type: rps #Options: concurrency, rps, loadgen, stair-step
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you replaced the word constant load type with concurrency? imo, constant sounds more closer to Constant Load which is a Continuous stream of requests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that constant is ambiguous, as RPS can also be constant. My other thinking is that we might later add dynamically changing RPS or dynamically changing concurrency so either RPS or concurrency could be constant or dynamic.

Comment on lines +37 to +38

return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't but I wonder if adding a dedicated try-catch exception block in this function worth it. We currently catch all the cascade exceptions with the generic Exception class in the main function but it's probably not the cleanest way to handle the exception IMO.

Not suggesting this should be addressed in this PR but a follow-up PR to cleanup our exception handling might be good.


def main_loop_concurrency_mode(dataset, request_q, start_time, end_time):
"""Let all users send requests repeatedly until end_time"""
logging.info("Test from main process")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this logging statement here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'll remove this thanks!

for query in dataset.get_next_n_queries(2 * concurrency):
dataset_q.put(query)
request_q.put((None, query))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I like the idea of making it a field in the query dict.

@@ -79,7 +79,6 @@ def request_grpc(self, query, user_id, test_end_time: float=0):
result.output_tokens_before_timeout = result.output_tokens
result.output_text = response

result.calculate_results()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if its time to depreciate the caikit_client_plugin?

@@ -35,7 +35,6 @@ def request_http(self, query, user_id, test_end_time: float=0):

result.end_time = time.time()

result.calculate_results()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we do a cleanup we probably should remove this file.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was originally added with the thought that it could be used in some test cases but we may want to remove it depending on how we decide to handle testing (unit tests, e2e tests, etc...)

@@ -59,17 +63,48 @@ def _init_user_process_logging(self):
self.logger = logging.getLogger("user")
return logging.getLogger("user")

def _user_loop(self, test_end_time):
while self.stop_q.empty():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah totally, that's long overdue. I was looking into it and should work on it sometime.

except queue.Empty:
# if timeout passes, queue.Empty will be thrown
# User should check if stop_q has been set, else poll again
# self.debug.info("User waiting for a request to be scheduled")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this line be uncommented?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants