Commit

Merge pull request #2473 from chaoss/contributor-breadth-conversion
Contributor breadth conversion
sgoggins authored Aug 1, 2023
2 parents 73c8140 + 44683a9 commit 34b847c
Showing 3 changed files with 68 additions and 94 deletions.
13 changes: 12 additions & 1 deletion augur/application/cli/backend.py
@@ -21,6 +21,7 @@
from augur import instance_id
from augur.tasks.start_tasks import augur_collection_monitor, CollectionState, create_collection_status_records
from augur.tasks.git.facade_tasks import clone_repos
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
from augur.tasks.init.redis_connection import redis_connection
from augur.application.db.models import Repo, CollectionStatus, UserRepo
from augur.application.db.session import DatabaseSession
@@ -85,7 +86,7 @@ def start(disable_collection, development, port):
logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')

processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection)
time.sleep(5)

if os.path.exists("celerybeat-schedule.db"):
logger.info("Deleting old task schedule")
os.remove("celerybeat-schedule.db")
@@ -104,6 +105,8 @@ def start(disable_collection, development, port):
create_collection_status_records.si().apply_async()
time.sleep(3)

contributor_breadth_model.si().apply_async()

# start cloning repos when augur starts
clone_repos.si().apply_async()

@@ -147,6 +150,7 @@ def start_celery_worker_processes(vmem_cap_ratio, disable_collection=False):
available_memory_in_bytes = psutil.virtual_memory().total * vmem_cap_ratio
available_memory_in_megabytes = available_memory_in_bytes / (1024 ** 2)
max_process_estimate = available_memory_in_megabytes // 500
sleep_time = 0

#Get a subset of the maximum processes available using a ratio, not exceeding a maximum value
def determine_worker_processes(ratio,maximum):
@@ -155,32 +159,39 @@ def determine_worker_processes(ratio,maximum):
frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend"
max_process_estimate -= 1
process_list.append(subprocess.Popen(frontend_worker.split(" ")))
sleep_time += 6

if not disable_collection:

#2 processes are always reserved as a baseline.
scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling"
max_process_estimate -= 2
process_list.append(subprocess.Popen(scheduling_worker.split(" ")))
sleep_time += 6

#60% of estimate, Maximum value of 45
core_num_processes = determine_worker_processes(.6, 45)
logger.info(f"Starting core worker processes with concurrency={core_num_processes}")
core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={core_num_processes} -n core:{uuid.uuid4().hex}@%h"
process_list.append(subprocess.Popen(core_worker.split(" ")))
sleep_time += 6

#20% of estimate, Maximum value of 25
secondary_num_processes = determine_worker_processes(.2, 25)
logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}")
secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary"
process_list.append(subprocess.Popen(secondary_worker.split(" ")))
sleep_time += 6

#15% of estimate, Maximum value of 20
facade_num_processes = determine_worker_processes(.2, 20)
logger.info(f"Starting facade worker processes with concurrency={facade_num_processes}")
facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade"

process_list.append(subprocess.Popen(facade_worker.split(" ")))
sleep_time += 6

time.sleep(sleep_time)

return process_list

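The body of determine_worker_processes is collapsed in this diff view. As a hedged sketch only, the ratio-and-cap sizing described by the surrounding comments (for example, 60% of the memory-based estimate with a ceiling of 45 for the core queue) could behave like the snippet below; the standalone signature, the rounding, and the floor of one worker are assumptions, not the committed body. In the same hunk, sleep_time accumulates six seconds per launched worker and is applied once with time.sleep(sleep_time) after all workers have been started.

# Hypothetical sketch of a ratio-and-cap sizing helper; not the committed implementation.
def determine_worker_processes_sketch(ratio, maximum, max_process_estimate):
    # Take a share of the remaining memory-based process estimate, cap it at a
    # per-queue maximum, and never drop below a single worker process.
    return max(1, min(int(max_process_estimate * ratio), maximum))

# Example: a 52-process estimate yields min(int(52 * 0.6), 45) == 31 core workers
# and min(int(52 * 0.2), 25) == 10 secondary workers.
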
138 changes: 47 additions & 91 deletions augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py
@@ -2,9 +2,10 @@
import logging, json
import pandas as pd
import sqlalchemy as s
from datetime import datetime

from augur.tasks.init.celery_app import celery_app as celery
from augur.application.db.session import DatabaseSession
from augur.tasks.github.util.github_task_session import GithubTaskManifest
from augur.tasks.github.util.github_paginator import GithubPaginator
from augur.application.db.models import ContributorRepo

@@ -14,13 +15,6 @@
### Logic: For each unique platform contributor, gather non duplicate events, using the GitHub "id"
### for the event API (GitLab coming!)


######################
#
# IN PROGRESS
#
######################

@celery.task
def contributor_breadth_model() -> None:

@@ -33,101 +27,78 @@ def contributor_breadth_model() -> None:
data_source = 'GitHub API'


## Get all the contributors currently in the database
#!/usr/bin/env python3

#cntrb_key = gh_login

cntrb_login_query = s.sql.text("""
SELECT DISTINCT gh_login, cntrb_id
FROM augur_data.contributors
WHERE gh_login IS NOT NULL
""")


current_cntrb_logins = json.loads(pd.read_sql(cntrb_login_query, engine, params={}).to_json(orient="records"))

## We need a list of all contributors so we can iterate through them to gather events
## We need a list of event ids to avoid insertion of duplicate events. We ignore the event
## If it already exists
result = engine.execute(cntrb_login_query)

logger.info(f"Contributor Logins are: {current_cntrb_logins}")
current_cntrb_logins = [dict(row) for row in result]

########################################################
#### List of existing contributor ids and their corresponding gh_login
#### is contained in the `current_cntrb_logins` variable
########################################################

cntrb_newest_events_query = s.sql.text("""
SELECT c.gh_login, MAX(cr.created_at) as newest_event_date
FROM contributor_repo AS cr
JOIN contributors AS c ON cr.cntrb_id = c.cntrb_id
GROUP BY c.gh_login;
""")

########################################################
#### Define the action map for events to avoid duplicates
#### Query event_ids so a list of existing events are
#### Available for duplicate checking
########################################################

action_map = {
'insert': {
'source': ['id'],
'augur': ['event_id']
}
}
cntrb_newest_events_list = engine.execute(cntrb_newest_events_query)
cntrb_newest_events_list = [dict(row) for row in cntrb_newest_events_list]

# Eliminate any duplicate event_ids from what will be inserted
# Because of Bulk Insert
# keyVal = event_id
cntrb_newest_events_map = {}
for cntrb_event in cntrb_newest_events_list:

########################################################
# Query for existing event ids to avoid duplication
########################################################
gh_login = cntrb_event["gh_login"]
newest_event_date = cntrb_event["newest_event_date"]

cntrb_newest_events_map[gh_login] = newest_event_date

dup_query = s.sql.text("""
SELECT DISTINCT event_id
FROM augur_data.contributor_repo
WHERE 1 = 1
""")

current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records"))
with GithubTaskManifest(logger) as manifest:

#Convert list of dictionaries to regular list of 'event_ids'.
#The only values that the sql query returns are event_ids so
#it makes no sense to be a list of many dicts of one key.
current_event_ids = [value for elem in current_event_ids for value in elem.values()]
index = 1
total = len(current_cntrb_logins)
for cntrb in current_cntrb_logins:

logger.info(f"current event ids are: {current_event_ids}")
print(f"Processing cntrb {index} of {total}")
index += 1

for cntrb in current_cntrb_logins:
repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events"

repo_cntrb_url = f"https://api.github.com/users/{cntrb['gh_login']}/events"
# source_cntrb_repos seemed like not exactly what the variable is for; its a list of actions for
# each Github gh_login value already in our database
newest_event_in_db = datetime(1970, 1, 1)
if cntrb["gh_login"] in cntrb_newest_events_map:
newest_event_in_db = cntrb_newest_events_map[cntrb["gh_login"]]


with DatabaseSession(logger, engine) as session:
cntrb_events = []
for page_data, page in GithubPaginator(repo_cntrb_url, session.oauths, logger).iter_pages():
for page_data, page in GithubPaginator(repo_cntrb_url, manifest.key_auth, logger).iter_pages():

if page_data:
if page_data:
cntrb_events += page_data

process_contributor_events(cntrb, cntrb_events, current_event_ids, logger)
oldest_event_on_page = datetime.strptime(page_data[-1]["created_at"], "%Y-%m-%dT%H:%M:%SZ")
if oldest_event_on_page < newest_event_in_db:
print("Found cntrb events we already have...skipping the rest")
break

if len(cntrb_events) == 0:
logger.info("There are no cntrb events, or new events for this user.\n")
continue

# source_cntrb_events = self.paginate_endpoint(repo_cntrb_url, action_map=action_map,
# table=self.contributor_repo_table)
events = process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source)

def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger):
logger.info(f"Inserting {len(events)} events")
natural_keys = ["event_id", "tool_version"]
manifest.augur_db.insert_data(events, ContributorRepo, natural_keys)

if not cntrb_events:
logger.info("There are no events, or new events for this user.\n")
return

## current_event_ids are the ones ALREADY IN THE AUGUR DB. SKIP THOSE.
## source_cntrb_events are the ones the API pulls.
def process_contributor_events(cntrb, cntrb_events, logger, tool_source, tool_version, data_source):

cntrb_repos_insert = []
for event_id_api in cntrb_events:
logger.info(f"Keys of event_id_api: {event_id_api.keys()}")
#logger.info(f"Keys of current_event_ids: {current_event_ids.keys()}")
if int(event_id_api['id']) in current_event_ids:
continue


cntrb_repos_insert.append({
"cntrb_id": cntrb['cntrb_id'],
@@ -138,23 +109,8 @@ def process_contributor_events(cntrb, cntrb_events, current_event_ids, logger):
"repo_name": event_id_api['repo']['name'],
"gh_repo_id": event_id_api['repo']['id'],
"cntrb_category": event_id_api['type'],
"event_id": event_id_api['id'],
"event_id": int(event_id_api['id']),
"created_at": event_id_api['created_at']
})



# else:
# # Print the message if the value does not exist
# logger.info(f"event_id is found in JSON data {current_event_ids[event_id]}.")

########################################################
# Do the Inserts
########################################################

#cntrb_repos_insert = []
#cntrb_ids_idx = pd.Index(cntrb_ids, name=contributors)

cntrb_repo_insert_result, cntrb_repo_update_result = self.bulk_insert(self.contributor_repo_table,
unique_columns='event_id', insert=cntrb_repos_insert)

return cntrb_repos_insert
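
For readers outside the Augur codebase, here is a minimal, self-contained sketch of the incremental-collection pattern the rewritten task follows: walk a contributor's GitHub event pages and stop as soon as a page reaches events older than the newest event already stored for that login. It uses requests instead of GithubTaskManifest/GithubPaginator, and fetch_new_events, its token handling, and the per_page/page parameters are illustrative assumptions rather than part of this commit.

# Standalone sketch of the early-exit pagination idea; not Augur's paginator.
from datetime import datetime
import requests

def fetch_new_events(gh_login, newest_event_in_db, token=None):
    headers = {"Authorization": f"token {token}"} if token else {}
    url = f"https://api.github.com/users/{gh_login}/events"
    new_events, page = [], 1
    while True:
        resp = requests.get(url, headers=headers, params={"per_page": 100, "page": page})
        page_data = resp.json()
        if not isinstance(page_data, list) or not page_data:
            break
        new_events += page_data
        oldest_on_page = datetime.strptime(page_data[-1]["created_at"], "%Y-%m-%dT%H:%M:%SZ")
        if oldest_on_page < newest_event_in_db:
            # Everything beyond this page is already in the database.
            break
        page += 1
    return new_events

# Example: fetch_new_events("octocat", datetime(1970, 1, 1)) pulls every available event.

In the committed task, the collected events are then handed to process_contributor_events and upserted with event_id and tool_version as natural keys.
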
11 changes: 9 additions & 2 deletions augur/tasks/init/celery_app.py
@@ -58,7 +58,8 @@ class CollectionState(Enum):
'augur.tasks.data_analysis.clustering_worker.tasks',
'augur.tasks.data_analysis.discourse_analysis.tasks',
'augur.tasks.data_analysis.pull_request_analysis_worker.tasks',
'augur.tasks.data_analysis.insight_worker.tasks']
'augur.tasks.data_analysis.insight_worker.tasks',
'augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker']

materialized_view_tasks = ['augur.tasks.db.refresh_materialized_views']

@@ -139,7 +140,8 @@ def on_failure(self,exc,task_id,args,kwargs,einfo):
'augur.tasks.git.dependency_tasks.tasks.process_ossf_dependency_metrics': {'queue': 'secondary'},
'augur.tasks.git.dependency_tasks.tasks.process_dependency_metrics': {'queue': 'facade'},
'augur.tasks.git.dependency_libyear_tasks.tasks.process_libyear_dependency_metrics': {'queue': 'facade'},
'augur.tasks.frontend.*': {'queue': 'frontend'}
'augur.tasks.frontend.*': {'queue': 'frontend'},
'augur.tasks.data_analysis.contributor_breadth_worker.*': {'queue': 'secondary'},
}

#Setting to be able to see more detailed states of running tasks
@@ -204,6 +206,7 @@ def setup_periodic_tasks(sender, **kwargs):
from augur.tasks.start_tasks import non_repo_domain_tasks
from augur.tasks.git.facade_tasks import clone_repos
from augur.tasks.db.refresh_materialized_views import refresh_materialized_views
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model

with DatabaseEngine() as engine, DatabaseSession(logger, engine) as session:

@@ -225,6 +228,10 @@ def setup_periodic_tasks(sender, **kwargs):
logger.info(f"Scheduling update of collection weights on midnight each day")
sender.add_periodic_task(crontab(hour=0, minute=0),augur_collection_update_weights.s())

logger.info(f"Scheduling contributor breadth every 30 days")
thirty_days_in_seconds = 30*24*60*60
sender.add_periodic_task(thirty_days_in_seconds, contributor_breadth_model.s())


@after_setup_logger.connect
def setup_loggers(*args,**kwargs):
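
As a self-contained illustration of the beat registration pattern added above (an interval in seconds passed to sender.add_periodic_task), a minimal sketch might look like the following; the broker URL, app name, and task body are placeholders, not Augur's configuration.

# Minimal Celery beat sketch mirroring the 30-day schedule; values are placeholders.
from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")

@app.task
def contributor_breadth_sketch():
    print("collect contributor events across platforms")

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    thirty_days_in_seconds = 30 * 24 * 60 * 60
    sender.add_periodic_task(thirty_days_in_seconds, contributor_breadth_sketch.s())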
