From d2347df5dc47f57070a993ca8cba061548924630 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 24 Jul 2023 19:28:05 -0500 Subject: [PATCH 1/8] Initial changes to contributor breadth Signed-off-by: Andrew Brain --- .../contributor_breadth_worker.py | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py index f8b6a9a585..abf5646be8 100644 --- a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py +++ b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py @@ -4,7 +4,7 @@ import sqlalchemy as s from augur.tasks.init.celery_app import celery_app as celery -from augur.application.db.session import DatabaseSession +from augur.tasks.github.util.github_task_session import GithubTaskManifest from augur.tasks.github.util.github_paginator import GithubPaginator from augur.application.db.models import ContributorRepo @@ -65,12 +65,7 @@ def contributor_breadth_model() -> None: #### Available for duplicate checking ######################################################## - action_map = { - 'insert': { - 'source': ['id'], - 'augur': ['event_id'] - } - } + # Eliminate any duplicate event_ids from what will be inserted # Because of Bulk Insert @@ -80,44 +75,51 @@ def contributor_breadth_model() -> None: # Query for existing event ids to avoid duplication ######################################################## - dup_query = s.sql.text(""" - SELECT DISTINCT event_id - FROM augur_data.contributor_repo - WHERE 1 = 1 - """) + # dup_query = s.sql.text(""" + # SELECT DISTINCT event_id + # FROM augur_data.contributor_repo + # WHERE 1 = 1 + # """) - current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records")) + #current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records")) #Convert list of dictionaries to regular list of 'event_ids'. #The only values that the sql query returns are event_ids so #it makes no sense to be a list of many dicts of one key. 
- current_event_ids = [value for elem in current_event_ids for value in elem.values()] + #current_event_ids = [value for elem in current_event_ids for value in elem.values()] + + #logger.info(f"current event ids are: {current_event_ids}") - logger.info(f"current event ids are: {current_event_ids}") + with GithubTaskManifest(logger) as manifest: - for cntrb in current_cntrb_logins: + for cntrb in current_cntrb_logins: - repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events" - # source_cntrb_repos seemed like not exactly what the variable is for; its a list of actions for - # each Github gh_login value already in our database + repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events" + # source_cntrb_repos seemed like not exactly what the variable is for; its a list of actions for + # each Github gh_login value already in our database - with DatabaseSession(logger, engine) as session: cntrb_events = [] - for page_data, page in GithubPaginator(repo_cntrb_url, session.oauths, logger).iter_pages(): + for page_data, page in GithubPaginator(repo_cntrb_url, manifest.key_auth, logger).iter_pages(): if page_data: cntrb_events += page_data - process_contributor_events(cntrb, cntrb_events, current_event_ids, logger) + if not len(cntrb_events) == 0: + continue + + events = process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source) + + natural_keys = ["event_id"] + manifest.augur_db.insert_data(events, ContributorRepo, natural_keys) # source_cntrb_events = self.paginate_endpoint(repo_cntrb_url, action_map=action_map, # table=self.contributor_repo_table) -def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger): +def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source): if not cntrb_events: logger.info("There are no events, or new events for this user.\n") - return + return [] ## current_event_ids are the ones ALREADY IN THE AUGUR DB. SKIP THOSE. ## source_cntrb_events are the ones the API pulls. 
@@ -125,8 +127,8 @@ def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger): for event_id_api in cntrb_events: logger.info(f"Keys of event_id_api: {event_id_api.keys()}") #logger.info(f"Keys of current_event_ids: {current_event_ids.keys()}") - if int(event_id_api['id']) in current_event_ids: - continue + # if int(event_id_api['id']) in current_event_ids: + # continue cntrb_repos_insert.append({ @@ -138,12 +140,11 @@ def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger): "repo_name": event_id_api['repo']['name'], "gh_repo_id": event_id_api['repo']['id'], "cntrb_category": event_id_api['type'], - "event_id": event_id_api['id'], + "event_id": int(event_id_api['id']), "created_at": event_id_api['created_at'] }) - # else: # # Print the message if the value does not exist # logger.info(f"event_id is found in JSON data {current_event_ids[event_id]}.") @@ -155,6 +156,9 @@ def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger): #cntrb_repos_insert = [] #cntrb_ids_idx = pd.Index(cntrb_ids, name=contributors) - cntrb_repo_insert_result, cntrb_repo_update_result = self.bulk_insert(self.contributor_repo_table, - unique_columns='event_id', insert=cntrb_repos_insert) + # cntrb_repo_insert_result, cntrb_repo_update_result = self.bulk_insert(self.contributor_repo_table, + # unique_columns='event_id', insert=cntrb_repos_insert) + + + From 6ac8d04401c98ef5a44724d035b995b3f817c72a Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 24 Jul 2023 21:34:08 -0500 Subject: [PATCH 2/8] Implement more contributor breadth functionality Signed-off-by: Andrew Brain --- .../contributor_breadth_worker.py | 27 +++++++++++-------- augur/tasks/init/celery_app.py | 6 +++-- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py index abf5646be8..dfa4e96e33 100644 --- a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py +++ b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py @@ -44,14 +44,15 @@ def contributor_breadth_model() -> None: WHERE gh_login IS NOT NULL """) - - current_cntrb_logins = json.loads(pd.read_sql(cntrb_login_query, engine, params={}).to_json(orient="records")) + result = engine.execute(cntrb_login_query) + + current_cntrb_logins = [dict(row) for row in result] ## We need a list of all contributors so we can iterate through them to gather events ## We need a list of event ids to avoid insertion of duplicate events. 
We ignore the event ## If it already exists - logger.info(f"Contributor Logins are: {current_cntrb_logins}") + # logger.info(f"Contributor Logins are: {current_cntrb_logins}") ######################################################## #### List of existing contributor ids and their corresponding gh_login @@ -92,8 +93,13 @@ def contributor_breadth_model() -> None: with GithubTaskManifest(logger) as manifest: + index = 1 + total = len(current_cntrb_logins) for cntrb in current_cntrb_logins: + print(f"Processing cntrb {index} of {total}") + index += 1 + repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events" # source_cntrb_repos seemed like not exactly what the variable is for; its a list of actions for # each Github gh_login value already in our database @@ -104,28 +110,25 @@ def contributor_breadth_model() -> None: if page_data: cntrb_events += page_data - if not len(cntrb_events) == 0: + if len(cntrb_events) == 0: + logger.info("There are no events, or new events for this user.\n") continue events = process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source) - natural_keys = ["event_id"] - manifest.augur_db.insert_data(events, ContributorRepo, natural_keys) + logger.info(f"Inserting {len(events)} events") + natural_keys = ["event_id", "tool_version"] + manifest.augur_db.insert_data(events, ContributorRepo, natural_keys) # source_cntrb_events = self.paginate_endpoint(repo_cntrb_url, action_map=action_map, # table=self.contributor_repo_table) def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source): - if not cntrb_events: - logger.info("There are no events, or new events for this user.\n") - return [] - ## current_event_ids are the ones ALREADY IN THE AUGUR DB. SKIP THOSE. ## source_cntrb_events are the ones the API pulls. 
cntrb_repos_insert = [] for event_id_api in cntrb_events: - logger.info(f"Keys of event_id_api: {event_id_api.keys()}") #logger.info(f"Keys of current_event_ids: {current_event_ids.keys()}") # if int(event_id_api['id']) in current_event_ids: # continue @@ -144,6 +147,8 @@ def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_ve "created_at": event_id_api['created_at'] }) + return cntrb_repos_insert + # else: # # Print the message if the value does not exist diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 69abc837aa..666146126b 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -58,7 +58,8 @@ class CollectionState(Enum): 'augur.tasks.data_analysis.clustering_worker.tasks', 'augur.tasks.data_analysis.discourse_analysis.tasks', 'augur.tasks.data_analysis.pull_request_analysis_worker.tasks', - 'augur.tasks.data_analysis.insight_worker.tasks'] + 'augur.tasks.data_analysis.insight_worker.tasks', + 'augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker'] materialized_view_tasks = ['augur.tasks.db.refresh_materialized_views'] @@ -139,7 +140,8 @@ def on_failure(self,exc,task_id,args,kwargs,einfo): 'augur.tasks.git.dependency_tasks.tasks.process_ossf_dependency_metrics': {'queue': 'secondary'}, 'augur.tasks.git.dependency_tasks.tasks.process_dependency_metrics': {'queue': 'facade'}, 'augur.tasks.git.dependency_libyear_tasks.tasks.process_libyear_dependency_metrics': {'queue': 'facade'}, - 'augur.tasks.frontend.*': {'queue': 'frontend'} + 'augur.tasks.frontend.*': {'queue': 'frontend'}, + 'augur.tasks.data_analysis.contributor_breadth_worker.*': {'queue': 'secondary'}, } #Setting to be able to see more detailed states of running tasks From de6178b8f440eebc98e20965661e17719e57f53f Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Tue, 25 Jul 2023 06:54:36 -0500 Subject: [PATCH 3/8] Schedule contributor breadth every 30 days Signed-off-by: Andrew Brain --- augur/application/cli/backend.py | 2 +- augur/tasks/init/celery_app.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index aaede20d0a..e94b7bb737 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -85,7 +85,7 @@ def start(disable_collection, development, port): logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}') processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection) - time.sleep(5) + time.sleep(10) if os.path.exists("celerybeat-schedule.db"): logger.info("Deleting old task schedule") os.remove("celerybeat-schedule.db") diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 666146126b..ac6e18fc64 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -206,6 +206,7 @@ def setup_periodic_tasks(sender, **kwargs): from augur.tasks.start_tasks import non_repo_domain_tasks from augur.tasks.git.facade_tasks import clone_repos from augur.tasks.db.refresh_materialized_views import refresh_materialized_views + from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model with DatabaseEngine() as engine, DatabaseSession(logger, engine) as session: @@ -227,6 +228,10 @@ def setup_periodic_tasks(sender, **kwargs): logger.info(f"Scheduling update of collection weights on midnight each day") sender.add_periodic_task(crontab(hour=0, 
minute=0),augur_collection_update_weights.s()) + logger.info(f"Scheduling contributor breadth every 30 days") + thirty_days_in_seconds = 30*24*60*60 + sender.add_periodic_task(thirty_days_in_seconds, contributor_breadth_model.s()) + @after_setup_logger.connect def setup_loggers(*args,**kwargs): From 844177e3306ee289f23abfab66eefedb7b4827d0 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Tue, 25 Jul 2023 06:54:04 -0500 Subject: [PATCH 4/8] Schedule contributor breadth every 30 days Signed-off-by: Andrew Brain --- augur/application/cli/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index e94b7bb737..4cc01c0cb4 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -105,9 +105,9 @@ def start(disable_collection, development, port): time.sleep(3) # start cloning repos when augur starts - clone_repos.si().apply_async() + #clone_repos.si().apply_async() - augur_collection_monitor.si().apply_async() + #augur_collection_monitor.si().apply_async() else: logger.info("Collection disabled") From 802a7475f5916521db3ccd5765e10548603ebd7f Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Tue, 25 Jul 2023 07:54:49 -0500 Subject: [PATCH 5/8] Clean up/ optimize collection by skipping existing events Signed-off-by: Andrew Brain --- augur/application/cli/backend.py | 3 + .../contributor_breadth_worker.py | 109 +++++------------- 2 files changed, 31 insertions(+), 81 deletions(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 4cc01c0cb4..22f25a2e6d 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -21,6 +21,7 @@ from augur import instance_id from augur.tasks.start_tasks import augur_collection_monitor, CollectionState, create_collection_status_records from augur.tasks.git.facade_tasks import clone_repos +from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model from augur.tasks.init.redis_connection import redis_connection from augur.application.db.models import Repo, CollectionStatus, UserRepo from augur.application.db.session import DatabaseSession @@ -104,6 +105,8 @@ def start(disable_collection, development, port): create_collection_status_records.si().apply_async() time.sleep(3) + contributor_breadth_model.si().apply_async() + # start cloning repos when augur starts #clone_repos.si().apply_async() diff --git a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py index dfa4e96e33..062411d075 100644 --- a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py +++ b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py @@ -2,6 +2,7 @@ import logging, json import pandas as pd import sqlalchemy as s +from datetime import datetime from augur.tasks.init.celery_app import celery_app as celery from augur.tasks.github.util.github_task_session import GithubTaskManifest @@ -14,13 +15,6 @@ ### Logic: For each unique platform contributor, gather non duplicate events, using the GitHub "id" ### for the event API (GitLab coming!) 
- -###################### -# -# IN PROGRESS -# -###################### - @celery.task def contributor_breadth_model() -> None: @@ -33,11 +27,6 @@ def contributor_breadth_model() -> None: data_source = 'GitHub API' - ## Get all the contributors currently in the database - #!/usr/bin/env python3 - - #cntrb_key = gh_login - cntrb_login_query = s.sql.text(""" SELECT DISTINCT gh_login, cntrb_id FROM augur_data.contributors @@ -48,48 +37,25 @@ def contributor_breadth_model() -> None: current_cntrb_logins = [dict(row) for row in result] - ## We need a list of all contributors so we can iterate through them to gather events - ## We need a list of event ids to avoid insertion of duplicate events. We ignore the event - ## If it already exists - - # logger.info(f"Contributor Logins are: {current_cntrb_logins}") - - ######################################################## - #### List of existing contributor ids and their corresponding gh_login - #### is contained in the `current_cntrb_logins` variable - ######################################################## + cntrb_newest_events_query = s.sql.text(""" + SELECT c.gh_login, MAX(cr.created_at) as newest_event_date + FROM contributor_repo AS cr + JOIN contributors AS c ON cr.cntrb_id = c.cntrb_id + GROUP BY c.gh_login; + """) - ######################################################## - #### Define the action map for events to avoid duplicates - #### Query event_ids so a list of existing events are - #### Available for duplicate checking - ######################################################## - - - - # Eliminate any duplicate event_ids from what will be inserted - # Because of Bulk Insert - # keyVal = event_id - - ######################################################## - # Query for existing event ids to avoid duplication - ######################################################## - - # dup_query = s.sql.text(""" - # SELECT DISTINCT event_id - # FROM augur_data.contributor_repo - # WHERE 1 = 1 - # """) + cntrb_newest_events_list = engine.execute(cntrb_newest_events_query) + cntrb_newest_events_list = [dict(row) for row in cntrb_newest_events_list] - #current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records")) + cntrb_newest_events_map = {} + for cntrb_event in cntrb_newest_events_list: - #Convert list of dictionaries to regular list of 'event_ids'. - #The only values that the sql query returns are event_ids so - #it makes no sense to be a list of many dicts of one key. 
- #current_event_ids = [value for elem in current_event_ids for value in elem.values()] + gh_login = cntrb_event["gh_login"] + newest_event_date = cntrb_event["newest_event_date"] + + cntrb_newest_events_map[gh_login] = newest_event_date - #logger.info(f"current event ids are: {current_event_ids}") with GithubTaskManifest(logger) as manifest: @@ -101,38 +67,38 @@ def contributor_breadth_model() -> None: index += 1 repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events" - # source_cntrb_repos seemed like not exactly what the variable is for; its a list of actions for - # each Github gh_login value already in our database + + newest_event_in_db = "1970" + if cntrb["gh_login"] in cntrb_newest_events_map: + newest_event_in_db = cntrb_newest_events_map[cntrb["gh_login"]] + cntrb_events = [] for page_data, page in GithubPaginator(repo_cntrb_url, manifest.key_auth, logger).iter_pages(): - if page_data: + if page_data: cntrb_events += page_data + oldest_event_on_page = datetime.strptime(page_data[-1]["created_at"], "%Y-%m-%dT%H:%M:%SZ") + if oldest_event_on_page < newest_event_in_db: + print("Found cntrb events we already have...skipping the rest") + break + if len(cntrb_events) == 0: - logger.info("There are no events, or new events for this user.\n") + logger.info("There are no cntrb events, or new events for this user.\n") continue events = process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source) logger.info(f"Inserting {len(events)} events") natural_keys = ["event_id", "tool_version"] - manifest.augur_db.insert_data(events, ContributorRepo, natural_keys) + manifest.augur_db.insert_data(events, ContributorRepo, natural_keys) - # source_cntrb_events = self.paginate_endpoint(repo_cntrb_url, action_map=action_map, - # table=self.contributor_repo_table) def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source): - ## current_event_ids are the ones ALREADY IN THE AUGUR DB. SKIP THOSE. - ## source_cntrb_events are the ones the API pulls. 
cntrb_repos_insert = [] for event_id_api in cntrb_events: - #logger.info(f"Keys of current_event_ids: {current_event_ids.keys()}") - # if int(event_id_api['id']) in current_event_ids: - # continue - cntrb_repos_insert.append({ "cntrb_id": cntrb['cntrb_id'], @@ -148,22 +114,3 @@ def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_ve }) return cntrb_repos_insert - - - # else: - # # Print the message if the value does not exist - # logger.info(f"event_id is found in JSON data {current_event_ids[event_id]}.") - - ######################################################## - # Do the Inserts - ######################################################## - - #cntrb_repos_insert = [] - #cntrb_ids_idx = pd.Index(cntrb_ids, name=contributors) - - # cntrb_repo_insert_result, cntrb_repo_update_result = self.bulk_insert(self.contributor_repo_table, - # unique_columns='event_id', insert=cntrb_repos_insert) - - - - From e4f40c4f12157ef5c011db5d3c4015e2028d8c24 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 31 Jul 2023 17:35:10 -0500 Subject: [PATCH 6/8] Uncomment Signed-off-by: Andrew Brain --- augur/application/cli/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 22f25a2e6d..f0ab7e3a94 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -108,9 +108,9 @@ def start(disable_collection, development, port): contributor_breadth_model.si().apply_async() # start cloning repos when augur starts - #clone_repos.si().apply_async() + clone_repos.si().apply_async() - #augur_collection_monitor.si().apply_async() + augur_collection_monitor.si().apply_async() else: logger.info("Collection disabled") From 81d7a468f18ea61ad273b562c1e081c305fdfcfe Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 31 Jul 2023 17:44:51 -0500 Subject: [PATCH 7/8] Update time to datetime Signed-off-by: Andrew Brain --- .../contributor_breadth_worker/contributor_breadth_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py index 062411d075..44a6761cf9 100644 --- a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py +++ b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py @@ -68,7 +68,7 @@ def contributor_breadth_model() -> None: repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events" - newest_event_in_db = "1970" + newest_event_in_db = datetime(1970, 1, 1) if cntrb["gh_login"] in cntrb_newest_events_map: newest_event_in_db = cntrb_newest_events_map[cntrb["gh_login"]] From 44683a99d5c2192ad8018e1d597f35ce4cc8d795 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 31 Jul 2023 18:41:03 -0500 Subject: [PATCH 8/8] Add sleeps to ensure workers are started before starting tasks Signed-off-by: Andrew Brain --- augur/application/cli/backend.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index f0ab7e3a94..548c1eeff4 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -86,7 +86,7 @@ def start(disable_collection, development, port): logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}') processes = 
start_celery_worker_processes(float(worker_vmem_cap), disable_collection) - time.sleep(10) + if os.path.exists("celerybeat-schedule.db"): logger.info("Deleting old task schedule") os.remove("celerybeat-schedule.db") @@ -150,6 +150,7 @@ def start_celery_worker_processes(vmem_cap_ratio, disable_collection=False): available_memory_in_bytes = psutil.virtual_memory().total * vmem_cap_ratio available_memory_in_megabytes = available_memory_in_bytes / (1024 ** 2) max_process_estimate = available_memory_in_megabytes // 500 + sleep_time = 0 #Get a subset of the maximum procesess available using a ratio, not exceeding a maximum value def determine_worker_processes(ratio,maximum): @@ -158,6 +159,7 @@ def determine_worker_processes(ratio,maximum): frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend" max_process_estimate -= 1 process_list.append(subprocess.Popen(frontend_worker.split(" "))) + sleep_time += 6 if not disable_collection: @@ -165,18 +167,21 @@ def determine_worker_processes(ratio,maximum): scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling" max_process_estimate -= 2 process_list.append(subprocess.Popen(scheduling_worker.split(" "))) + sleep_time += 6 #60% of estimate, Maximum value of 45 core_num_processes = determine_worker_processes(.6, 45) logger.info(f"Starting core worker processes with concurrency={core_num_processes}") core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={core_num_processes} -n core:{uuid.uuid4().hex}@%h" process_list.append(subprocess.Popen(core_worker.split(" "))) + sleep_time += 6 #20% of estimate, Maximum value of 25 secondary_num_processes = determine_worker_processes(.2, 25) logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}") secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary" process_list.append(subprocess.Popen(secondary_worker.split(" "))) + sleep_time += 6 #15% of estimate, Maximum value of 20 facade_num_processes = determine_worker_processes(.2, 20) @@ -184,6 +189,9 @@ def determine_worker_processes(ratio,maximum): facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade" process_list.append(subprocess.Popen(facade_worker.split(" "))) + sleep_time += 6 + + time.sleep(sleep_time) return process_list
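
Note on the collection optimization introduced in patches 5 and 7: instead of querying every existing event_id for duplicate checking, the worker now looks up each contributor's newest stored event date and stops paginating the GitHub events API as soon as a page reaches events older than that date, relying on the natural-key insert ("event_id", "tool_version") to absorb any overlap on the boundary page. Below is a minimal, self-contained sketch of that early-exit pattern, assuming simplified in-memory pages in place of GithubPaginator; collect_new_events and the sample data are hypothetical illustrations, not Augur APIs.

from datetime import datetime

GITHUB_TS = "%Y-%m-%dT%H:%M:%SZ"

def collect_new_events(pages, newest_event_in_db):
    # Accumulate events page by page; the events API returns newest-first,
    # so once the oldest event on a page predates the newest event already
    # stored for this contributor, every remaining page is already known.
    collected = []
    for page_data in pages:
        if not page_data:
            continue
        collected += page_data
        oldest_on_page = datetime.strptime(page_data[-1]["created_at"], GITHUB_TS)
        if oldest_on_page < newest_event_in_db:
            break
    return collected

# Hypothetical pages standing in for GithubPaginator output.
pages = [
    [{"id": "3", "created_at": "2023-07-30T12:00:00Z"},
     {"id": "2", "created_at": "2023-07-20T12:00:00Z"}],
    [{"id": "1", "created_at": "2023-06-01T12:00:00Z"}],
]

# datetime(1970, 1, 1) mirrors the fallback used for contributors with no
# stored events yet (patch 7 replaces the string "1970" with this datetime).
print(len(collect_new_events(pages, datetime(2023, 7, 1))))   # prints 3

The boundary page is appended before the date check, so a few already-stored events can be swept up; the series handles those through augur_db.insert_data's natural-key deduplication rather than the removed bulk_insert/event_id duplicate query.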