diff --git a/README.md b/README.md index ca6c17a6f1..9977fc81a9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Augur NEW Release v0.53.2 +# Augur NEW Release v0.60.0 [![first-timers-only](https://img.shields.io/badge/first--timers--only-friendly-blue.svg?style=flat-square)](https://www.firsttimersonly.com/) We follow the [First Timers Only](https://www.firsttimersonly.com/) philosophy of tagging issues for first timers only, and walking one newcomer through the resolution process weekly. [You can find these issues tagged with "first timers only" on our issues list.](https://github.com/chaoss/augur/labels/first-timers-only). @@ -7,8 +7,7 @@ ## NEW RELEASE ALERT! ### [If you want to jump right in, updated docker build/compose and bare metal installation instructions are available here](docs/new-install.md) - -Augur is now releasing a dramatically improved new version to the main branch. It is also available here: https://github.com/chaoss/augur/releases/tag/v0.53.2 +Augur is now releasing a dramatically improved new version to the main branch. It is also available here: https://github.com/chaoss/augur/releases/tag/v0.60.0 - The `main` branch is a stable version of our new architecture, which features: - Dramatic improvement in the speed of large scale data collection (100,000+ repos). All data is obtained for 100k+ repos within 2 weeks. - A new job management architecture that uses Celery and Redis to manage queues, and enables users to run a Flower job monitoring dashboard diff --git a/augur/api/routes/dei.py b/augur/api/routes/dei.py index da724197df..dea79b79c2 100644 --- a/augur/api/routes/dei.py +++ b/augur/api/routes/dei.py @@ -17,7 +17,7 @@ from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig -from augur.tasks.util.collection_util import start_block_of_repos, get_enabled_phase_names_from_config, core_task_success_util +from augur.tasks.util.collection_util import CollectionRequest,AugurTaskRoutine, get_enabled_phase_names_from_config, core_task_success_util from augur.tasks.start_tasks import prelim_phase, primary_repo_collect_phase from augur.tasks.github.util.github_task_session import GithubTaskSession from augur.tasks.init.redis_connection import redis_connection as redis @@ -96,7 +96,13 @@ def core_task_success_util_gen(repo_git): record = BadgingDEI(**record) session.add(record) - start_block_of_repos(logger, session, [repo_url], primary_enabled_phases, "new") + + deiHook = CollectionRequest("core",primary_enabled_phases) + deiHook.repo_list = [repo_url] + + singleRoutine = AugurTaskRoutine(session,[deiHook]) + singleRoutine.start_data_collection() + #start_block_of_repos(logger, session, [repo_url], primary_enabled_phases, "new") session.close() diff --git a/augur/api/server.py b/augur/api/server.py index e3c9663650..d3e92ad99e 100644 --- a/augur/api/server.py +++ b/augur/api/server.py @@ -10,24 +10,30 @@ import base64 import logging import importlib +import graphene from typing import Optional, List, Any, Tuple from pathlib import Path -from flask import Flask, request, Response, redirect +from flask import Flask, request, Response, redirect, jsonify from flask_cors import CORS import pandas as pd from beaker.util import parse_cache_config_options from beaker.cache import CacheManager, Cache from sqlalchemy import create_engine from sqlalchemy.pool import StaticPool +from flask_graphql import GraphQLView +from graphene_sqlalchemy import SQLAlchemyObjectType + from augur.application.logs import AugurLogger from 
augur.application.config import AugurConfig from augur.application.db.session import DatabaseSession from augur.application.db.engine import get_database_string, create_database_engine from metadata import __version__ as augur_code_version +from augur.application.db.models import Repo, Issue, PullRequest, Message, PullRequestReview, Commit, IssueAssignee, PullRequestAssignee, PullRequestCommit, PullRequestFile, Contributor, IssueLabel, PullRequestLabel, ContributorsAlias, Release, ClientApplication + # from augur.api.routes import AUGUR_API_VERSION AUGUR_API_VERSION = "api/unstable" @@ -327,6 +333,335 @@ def get_server_cache(config, cache_manager) -> Cache: db_session = DatabaseSession(logger, engine) augur_config = AugurConfig(logger, db_session) + +def get_connection(table, cursor_field_name, connection_class, after, limit, extra_condition=False): + + cursor_field = getattr(table, cursor_field_name) + query = db_session.query(table).order_by(cursor_field) + + if after: + cursor_id = after + query = query.filter(cursor_field > cursor_id) + + if extra_condition: + field = getattr(table, extra_condition["field_name"]) + query = query.filter(field == extra_condition["value"]) + + # get one more item to determine if there is a next page + items = query.limit(limit + 1).all() + has_next_page = len(items) > limit + items = items[:limit] + + + if items: + next_cursor = getattr(items[-1], cursor_field_name) + else: + next_cursor = None + + return connection_class(items=items, page_info=PageInfoType(next_cursor=next_cursor, has_next_page=has_next_page)) + + + + +########### Repo Types ################## +class RepoType(SQLAlchemyObjectType): + class Meta: + model = Repo + use_connection = True + + issues = graphene.Field(lambda: IssueConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + prs = graphene.Field(lambda: PullRequestConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + messages = graphene.Field(lambda: MessageConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + releases = graphene.List(lambda: ReleaseType) + cursor = graphene.String() + + def resolve_cursor(self, info): + return str(self.repo_id) + + def resolve_issues(self, info, after=None, limit=None): + condition = {"field_name": "repo_id", "value": self.repo_id} + issue_connection = get_connection(Issue, "issue_id", IssueConnection, after, limit, condition) + return issue_connection + + def resolve_prs(self, info, after=None, limit=None): + condition = {"field_name": "repo_id", "value": self.repo_id} + pr_connection = get_connection(PullRequest, "pull_request_id", PullRequestConnection, after, limit, condition) + return pr_connection + + def resolve_messages(self, info, after=None, limit=None): + condition = {"field_name": "repo_id", "value": self.repo_id} + messages_connection = get_connection(Message, "msg_id", MessageConnection, after, limit,condition) + return messages_connection + + def resolve_releases(self, info): + return self.releases + +class ReleaseType(SQLAlchemyObjectType): + + class Meta: + model = Release + use_connection = True + + +############### Issue Objects ############# +class IssueType(SQLAlchemyObjectType): + class Meta: + model = Issue + use_connection = True + + repo = graphene.Field(RepoType) + messages = graphene.List(lambda: MessageType) + labels = graphene.List(lambda: IssueLabelType) + assignees = graphene.List(lambda: IssueAssigneeType) + cursor = graphene.String() + + def resolve_cursor(self, info): + return str(self.issue_id) + + 
def resolve_repo(self, info): + return self.repo + + def resolve_messages(self, info): + messages = [ref.message for ref in self.message_refs] + return messages + + def resolve_labels(self, info): + return self.labels + + def resolve_assignees(self, info): + return self.assignees + +class IssueAssigneeType(SQLAlchemyObjectType): + + class Meta: + model = IssueAssignee + use_connection = True + +class IssueLabelType(SQLAlchemyObjectType): + + class Meta: + model = IssueLabel + use_connection = True + + +################ Pull Request Objects ############ +class PullRequestType(SQLAlchemyObjectType): + class Meta: + model = PullRequest + use_connection = True + + repo = graphene.Field(RepoType) + messages = graphene.List(lambda: MessageType) + reviews = graphene.List(lambda: PullRequestReviewType) + labels = graphene.List(lambda: PrLabelType) + assignees = graphene.List(lambda: PullRequestAssigneeType) + files = graphene.List(lambda: PullRequestFileType) + cursor = graphene.String() + + def resolve_cursor(self, info): + return str(self.pull_request_id) + + def resolve_repo(self, info): + return self.repo + + def resolve_messages(self, info): + messages = [ref.message for ref in self.message_refs] + return messages + + def resolve_reviews(self, info): + return self.reviews + + def resolve_labels(self, info): + return self.labels + + def resolve_assignees(self, info): + return self.assignees + + def resolve_files(self, info): + return self.files + +class PullRequestAssigneeType(SQLAlchemyObjectType): + + class Meta: + model = PullRequestAssignee + use_connection = True + +class PullRequestReviewType(SQLAlchemyObjectType): + + class Meta: + model = PullRequestReview + use_connection = True + +class PrLabelType(SQLAlchemyObjectType): + + class Meta: + model = PullRequestLabel + use_connection = True + + +class PullRequestFileType(SQLAlchemyObjectType): + + class Meta: + model = PullRequestFile + use_connection = True + +class PullRequestCommitType(SQLAlchemyObjectType): + + class Meta: + model = PullRequestCommit + use_connection = True + + + +########### Contributor Types ############# +class ContributorType(SQLAlchemyObjectType): + + class Meta: + model = Contributor + use_connection = True + + issues_opened = graphene.List(lambda: IssueType) + pull_requests = graphene.List(lambda: PullRequestType) + pull_request_reviews = graphene.List(lambda: PullRequestReviewType) + commits = graphene.List(lambda: CommitType) + cursor = graphene.String() + + def resolve_cursor(self, info): + return str(self.cntrb_id) + + def resolve_issues_opened(self, info): + return self.issues_opened + + def resolve_pull_requests(self, info): + return self.pull_requests + + def resolve_pull_request_reviews(self, info): + return self.pull_request_reviews + + def resolve_commits(self, info): + return self.commits + +class ContributorAliasType(SQLAlchemyObjectType): + + class Meta: + model = ContributorsAlias + use_connection = True + + + +########### Other Types ################ +class MessageType(SQLAlchemyObjectType): + + class Meta: + model = Message + use_connection = True + + def resolve_repo(self, info): + return self.repo + + cursor = graphene.String() + + def resolve_cursor(self, info): + return str(self.msg_id) + +class CommitType(SQLAlchemyObjectType): + + class Meta: + model = Commit + use_connection = True + + messages = graphene.List(MessageType) + + def resolve_repo(self, info): + return self.repo + +class PageInfoType(graphene.ObjectType): + next_cursor = graphene.String() + has_next_page = graphene.Boolean() 
+ + + + +########### Connection Objects ############# +class GenericConnection(graphene.ObjectType): + page_info = graphene.Field(PageInfoType) + +class RepoConnection(GenericConnection): + items = graphene.List(RepoType) + +class IssueConnection(GenericConnection): + items = graphene.List(IssueType) + +class PullRequestConnection(GenericConnection): + items = graphene.List(PullRequestType) + +class CommitConnection(GenericConnection): + items = graphene.List(CommitType) + +class ContributorConnection(GenericConnection): + items = graphene.List(ContributorType) + +class MessageConnection(GenericConnection): + items = graphene.List(MessageType) + + +############### Base Query object ############## +class Query(graphene.ObjectType): + + repos = graphene.Field(RepoConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + repo = graphene.Field(RepoType, id=graphene.Int()) + + issues = graphene.Field(IssueConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + issue = graphene.Field(IssueType, id=graphene.Int()) + + prs = graphene.Field(PullRequestConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + pr = graphene.List(PullRequestType, id=graphene.Int()) + + messages = graphene.Field(MessageConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + commits = graphene.Field(CommitConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + + contributors = graphene.Field(ContributorConnection, after=graphene.String(), limit=graphene.Int(default_value=10)) + contributor = graphene.Field(ContributorType, id=graphene.UUID()) + + def resolve_repos(self, info, after=None, limit=None): + repo_connection = get_connection(Repo, "repo_id", RepoConnection, after, limit) + return repo_connection + + def resolve_repo(self, info, id): + return db_session.query(Repo).filter(Repo.repo_id==id).first() + + def resolve_issues(self, info, after=None, limit=None): + issue_connection = get_connection(Issue, "issue_id", IssueConnection, after, limit) + return issue_connection + + def resolve_issue(self, info, id): + return db_session.query(Issue).filter(Issue.issue_id==id).first() + + def resolve_prs(self, info, after=None, limit=None): + pr_connection = get_connection(PullRequest, "pull_request_id", PullRequestConnection, after, limit) + return pr_connection + + def resolve_pr(self, info, id): + return db_session.query(PullRequest).filter(PullRequest.pull_request_id==id).first() + + def resolve_messages(self, info, after=None, limit=None): + messages_connection = get_connection(Message, "msg_id", MessageConnection, after, limit) + return messages_connection + + def resolve_commits(self, info, after=None, limit=None): + commit_connection = get_connection(Commit, "cmt_id", CommitConnection, after, limit) + return commit_connection + + def resolve_contributors(self, info, after=None, limit=None): + contributors_connection = get_connection(Contributor, "cntrb_id", ContributorConnection, after, limit) + return contributors_connection + + def resolve_contributor(self, info, id): + return db_session.query(Contributor).filter(Contributor.cntrb_id==id).first() + + + + template_dir = str(Path(__file__).parent.parent / "templates") static_dir = str(Path(__file__).parent.parent / "static") @@ -371,6 +706,25 @@ def status(): status=200, mimetype="application/json") +schema = graphene.Schema(query=Query) + +class AuthenticatedGraphQLView(GraphQLView): + def dispatch_request(self): + + api_key = request.headers.get('x-api-key') + + 
client_applications = db_session.query(ClientApplication).all() + api_keys = [app.api_key for app in client_applications] + + if not api_key or api_key not in api_keys: + return jsonify(error="Invalid or missing API key"), 403 + + return super().dispatch_request() + +schema = graphene.Schema(query=Query) + +app.add_url_rule(f'/{app.augur_api_version}/graphql', view_func=AuthenticatedGraphQLView.as_view('graphql', schema=schema, graphiql=True)) + from .routes import * # import frontend routes diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 548c1eeff4..29afab2b0d 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -177,14 +177,14 @@ def determine_worker_processes(ratio,maximum): sleep_time += 6 #20% of estimate, Maximum value of 25 - secondary_num_processes = determine_worker_processes(.2, 25) + secondary_num_processes = determine_worker_processes(.25, 25) logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}") secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary" process_list.append(subprocess.Popen(secondary_worker.split(" "))) sleep_time += 6 #15% of estimate, Maximum value of 20 - facade_num_processes = determine_worker_processes(.2, 20) + facade_num_processes = determine_worker_processes(.15, 20) logger.info(f"Starting facade worker processes with concurrency={facade_num_processes}") facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade" diff --git a/augur/application/cli/tasks.py b/augur/application/cli/tasks.py index db31943ff1..b4bec994eb 100644 --- a/augur/application/cli/tasks.py +++ b/augur/application/cli/tasks.py @@ -36,8 +36,8 @@ def start(): secondary_worker_process = None scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling" - core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=14 -n core:{uuid.uuid4().hex}@%h" - secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=5 -n secondary:{uuid.uuid4().hex}@%h -Q secondary" + core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=45 -n core:{uuid.uuid4().hex}@%h" + secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=25 -n secondary:{uuid.uuid4().hex}@%h -Q secondary" scheduling_worker_process = subprocess.Popen(scheduling_worker.split(" ")) core_worker_process = subprocess.Popen(core_worker.split(" ")) @@ -92,4 +92,4 @@ def clear(): else: logger.error("Invalid input") - \ No newline at end of file + diff --git a/augur/application/db/models/augur_data.py b/augur/application/db/models/augur_data.py index 676a71deec..95cb0725d7 100644 --- a/augur/application/db/models/augur_data.py +++ b/augur/application/db/models/augur_data.py @@ -259,6 +259,12 @@ class Contributor(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) + issues_opened = relationship("Issue", primaryjoin="Issue.reporter_id == Contributor.cntrb_id", back_populates="reporter") + pull_requests = relationship("PullRequest", back_populates="cntrb") + pull_request_reviews = relationship("PullRequestReview", back_populates="cntrb") + commits = 
relationship("Commit", primaryjoin="Commit.cmt_author_platform_username == Contributor.cntrb_login", back_populates="contributor") + alias = relationship("ContributorsAlias", back_populates="cntrb") + @classmethod def from_github(cls, contributor, tool_source, tool_version, data_source): @@ -794,7 +800,7 @@ class ContributorsAlias(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - cntrb = relationship("Contributor") + cntrb = relationship("Contributor", back_populates="alias") class Repo(Base): @@ -862,6 +868,11 @@ class Repo(Base): repo_group = relationship("RepoGroup") user_repo = relationship("UserRepo") collection_status = relationship("CollectionStatus", back_populates="repo") + issues = relationship("Issue", back_populates="repo") + prs = relationship("PullRequest", back_populates="repo") + messages = relationship("Message", back_populates="repo") + commits = relationship("Commit", back_populates="repo") + releases = relationship("Release", back_populates="repo") @staticmethod def get_by_id(session, repo_id): @@ -1194,12 +1205,14 @@ class Commit(Base): contributor = relationship( "Contributor", primaryjoin="Commit.cmt_author_platform_username == Contributor.cntrb_login", + back_populates="commits" ) contributor1 = relationship( "Contributor", primaryjoin="Commit.cmt_author_platform_username == Contributor.cntrb_login", ) - repo = relationship("Repo") + repo = relationship("Repo", back_populates="commits") + message_ref = relationship("CommitCommentRef", back_populates="cmt") class Issue(Base): @@ -1259,12 +1272,14 @@ class Issue(Base): ) cntrb = relationship( - "Contributor", primaryjoin="Issue.cntrb_id == Contributor.cntrb_id" - ) - repo = relationship("Repo") + "Contributor", primaryjoin="Issue.cntrb_id == Contributor.cntrb_id") + repo = relationship("Repo", back_populates="issues") reporter = relationship( - "Contributor", primaryjoin="Issue.reporter_id == Contributor.cntrb_id" + "Contributor", primaryjoin="Issue.reporter_id == Contributor.cntrb_id", back_populates="issues_opened" ) + message_refs = relationship("IssueMessageRef", back_populates="issue") + assignees = relationship("IssueAssignee", back_populates="issue") + labels = relationship("IssueLabel", back_populates="issue") # @classmethod # def from_github(cls): @@ -1408,8 +1423,11 @@ class Message(Base): cntrb = relationship("Contributor") pltfrm = relationship("Platform") - repo = relationship("Repo") + repo = relationship("Repo", back_populates="messages") rgls = relationship("RepoGroupsListServe") + pr_message_ref = relationship("PullRequestMessageRef", back_populates="message") + issue_message_ref = relationship("IssueMessageRef", back_populates="message") + commit_message_ref = relationship("CommitCommentRef", back_populates="msg") # @classmethod # def from_github(cls): @@ -1582,8 +1600,13 @@ class PullRequest(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - pr_augur_contributor = relationship("Contributor") - repo = relationship("Repo") + cntrb = relationship("Contributor", back_populates="pull_requests") + repo = relationship("Repo", back_populates="prs") + message_refs = relationship("PullRequestMessageRef", back_populates="pr") + reviews = relationship("PullRequestReview", back_populates="pr") + labels = relationship("PullRequestLabel", back_populates="pull_request") + assignees = relationship("PullRequestAssignee", back_populates="pull_request") + files = relationship("PullRequestFile", back_populates="") @classmethod def from_github(cls, pr, repo_id, 
tool_source, tool_version): @@ -1661,7 +1684,7 @@ class Release(Base): TIMESTAMP(precision=6), server_default=text("CURRENT_TIMESTAMP") ) - repo = relationship("Repo") + repo = relationship("Repo", back_populates="releases") class RepoBadging(Base): @@ -2136,7 +2159,7 @@ class CommitCommentRef(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - cmt = relationship("Commit") + cmt = relationship("Commit", back_populates="message_ref") msg = relationship("Message") @@ -2236,7 +2259,7 @@ class IssueAssignee(Base): ) cntrb = relationship("Contributor") - issue = relationship("Issue") + issue = relationship("Issue", back_populates="assignees") repo = relationship("Repo") @classmethod @@ -2379,7 +2402,7 @@ class IssueLabel(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - issue = relationship("Issue") + issue = relationship("Issue", back_populates="labels") repo = relationship("Repo") @classmethod @@ -2456,8 +2479,8 @@ class IssueMessageRef(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - issue = relationship("Issue") - msg = relationship("Message") + issue = relationship("Issue", back_populates="message_refs") + message = relationship("Message", back_populates="issue_message_ref") repo = relationship("Repo") @@ -2683,7 +2706,7 @@ class PullRequestAssignee(Base): ) contrib = relationship("Contributor") - pull_request = relationship("PullRequest") + pull_request = relationship("PullRequest", back_populates="assignees") repo = relationship("Repo") @classmethod @@ -2896,7 +2919,7 @@ class PullRequestFile(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - pull_request = relationship("PullRequest") + pull_request = relationship("PullRequest", back_populates="files") repo = relationship("Repo") # @classmethod @@ -2945,7 +2968,7 @@ class PullRequestLabel(Base): ) - pull_request = relationship("PullRequest") + pull_request = relationship("PullRequest", back_populates="labels") repo = relationship("Repo") @classmethod @@ -3013,8 +3036,8 @@ class PullRequestMessageRef(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - msg = relationship("Message") - pull_request = relationship("PullRequest") + message = relationship("Message", back_populates="pr_message_ref") + pr = relationship("PullRequest", back_populates="message_refs") repo = relationship("Repo") @@ -3209,9 +3232,9 @@ class PullRequestReview(Base): TIMESTAMP(precision=0), server_default=text("CURRENT_TIMESTAMP") ) - cntrb = relationship("Contributor") + cntrb = relationship("Contributor", back_populates="pull_request_reviews") platform = relationship("Platform") - pull_request = relationship("PullRequest") + pr = relationship("PullRequest", back_populates="reviews") repo = relationship("Repo") # @classmethod diff --git a/augur/static/css/dashboard.css b/augur/static/css/dashboard.css index 1e998a10a4..08b98b3785 100644 --- a/augur/static/css/dashboard.css +++ b/augur/static/css/dashboard.css @@ -44,6 +44,20 @@ body { color: #bcd0f7; } +.circle-opaque { + border-radius: 50%; /* Make it a circle */ + display: inline-block; + position: absolute; /* Able to position it, overlaying the other image */ + left:0px; /* Customise the position, but make sure it */ + top:0px; /* is the same as .circle-transparent */ + z-index: -1; /* Makes the image sit *behind* .circle-transparent */ +} + +.circle-opaque img { + border-radius: 50%; /* Make it a circle */ + z-index: -1; +} + table { background-color: var(--color-fg); color: 
var(--color-fg-contrast); diff --git a/augur/static/css/stylesheet.css b/augur/static/css/stylesheet.css index 7ebcfc6430..59bbf07857 100644 --- a/augur/static/css/stylesheet.css +++ b/augur/static/css/stylesheet.css @@ -125,6 +125,10 @@ body { overflow: auto; } +.display-table th { + word-wrap: normal; +} + .paginationActive { background-color: var(--color-accent-dark); border-color: var(--color-accent-dark); diff --git a/augur/static/img/tswiftjet.png b/augur/static/img/tswiftjet.png new file mode 100644 index 0000000000..1d7a96ee8a Binary files /dev/null and b/augur/static/img/tswiftjet.png differ diff --git a/augur/tasks/db/refresh_materialized_views.py b/augur/tasks/db/refresh_materialized_views.py index 39ab698fd5..76420c253e 100644 --- a/augur/tasks/db/refresh_materialized_views.py +++ b/augur/tasks/db/refresh_materialized_views.py @@ -53,7 +53,7 @@ def refresh_materialized_views(): COMMIT; """) - mv_8_refresh = s.sql.text(""" + mv8_refresh = s.sql.text(""" REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_contributor_actions with data; COMMIT; diff --git a/augur/tasks/git/dependency_tasks/tasks.py b/augur/tasks/git/dependency_tasks/tasks.py index 4c540a3ea5..5e7c1d846f 100644 --- a/augur/tasks/git/dependency_tasks/tasks.py +++ b/augur/tasks/git/dependency_tasks/tasks.py @@ -3,7 +3,7 @@ from augur.application.db.session import DatabaseSession from augur.tasks.git.dependency_tasks.core import * from augur.tasks.init.celery_app import celery_app as celery -from augur.tasks.init.celery_app import AugurFacadeRepoCollectionTask, AugurCoreRepoCollectionTask +from augur.tasks.init.celery_app import AugurFacadeRepoCollectionTask, AugurCoreRepoCollectionTask, AugurSecondaryRepoCollectionTask from augur.application.db.util import execute_session_query from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_absolute_repo_path from augur.application.config import AugurConfig @@ -33,7 +33,7 @@ def process_dependency_metrics(repo_git): generate_deps_data(session,repo.repo_id,absolute_repo_path) -@celery.task(base=AugurCoreRepoCollectionTask) +@celery.task(base=AugurSecondaryRepoCollectionTask) def process_ossf_dependency_metrics(repo_git): from augur.tasks.init.celery_app import engine diff --git a/augur/tasks/git/facade_tasks.py b/augur/tasks/git/facade_tasks.py index 3f08fde974..c763a2a2c1 100644 --- a/augur/tasks/git/facade_tasks.py +++ b/augur/tasks/git/facade_tasks.py @@ -23,12 +23,12 @@ from datetime import timedelta import sqlalchemy as s -from sqlalchemy import or_, and_, update +from sqlalchemy import or_, and_, update, insert -from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import update_repo_log, trim_commit, store_working_author, trim_author +from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import update_repo_log, trim_commits, store_working_author, trim_author from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_absolute_repo_path, get_parent_commits_set, get_existing_commits_set from augur.tasks.git.util.facade_worker.facade_worker.analyzecommit import analyze_commit -from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_facade_weight_time_factor, get_repo_commit_count, update_facade_scheduling_fields, get_facade_weight_with_commit_count +from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_facade_weight_time_factor, get_repo_commit_count, update_facade_scheduling_fields, get_facade_weight_with_commit_count, 
facade_bulk_insert_commits from augur.tasks.github.facade_github.tasks import * from augur.tasks.util.collection_util import CollectionState, get_collection_status_repo_git_from_filter @@ -125,16 +125,9 @@ def update_analysis_log(repos_id,status): # If there's a commit still there, the previous run was interrupted and # the commit data may be incomplete. It should be trimmed, just in case. - for commit in working_commits: - trim_commit(session, repo_id,commit['working_commit']) - - # Remove the working commit. - remove_commit = s.sql.text("""DELETE FROM working_commits - WHERE repos_id = :repo_id AND - working_commit = :commit""").bindparams(repo_id=repo_id,commit=commit['working_commit']) - session.execute_sql(remove_commit) - session.log_activity('Debug',f"Removed working commit: {commit['working_commit']}") - + commits_to_trim = [commit['working_commit'] for commit in working_commits] + + trim_commits(session,repo_id,commits_to_trim) # Start the main analysis update_analysis_log(repo_id,'Collecting data') @@ -194,8 +187,8 @@ def update_analysis_log(repos_id,status): - for commit in trimmed_commits: - trim_commit(session,repo_id,commit) + #for commit in trimmed_commits: + trim_commits(session,repo_id,trimmed_commits) update_analysis_log(repo_id,'Commit trimming complete') @@ -256,20 +249,20 @@ def analyze_commits_in_parallel(repo_git, multithreaded: bool)-> None: missing_commits = parent_commits - existing_commits session.log_activity('Debug',f"Commits missing from repo {repo_id}: {len(missing_commits)}") + - queue = [] - if len(missing_commits) > 0: + if not len(missing_commits): #session.log_activity('Info','Type of missing_commits: %s' % type(missing_commits)) - - #encode the repo_id with the commit. - commits = list(missing_commits) - #Get all missing commits into one large list to split into task pools - queue.extend(commits) - else: return + + queue = list(missing_commits) logger.info(f"Got to analysis!") - + absoulte_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_id, repo.repo_path,repo.repo_name) + repo_loc = (f"{absoulte_path}/.git") + + pendingCommitRecordsToInsert = [] + for count, commitTuple in enumerate(queue): quarterQueue = int(len(queue) / 4) @@ -282,13 +275,27 @@ def analyze_commits_in_parallel(repo_git, multithreaded: bool)-> None: #logger.info(f"Got to analysis!") - absoulte_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_id, repo.repo_path,repo.repo_name) - repo_loc = (f"{absoulte_path}/.git") + commitRecords = analyze_commit(session, repo_id, repo_loc, commitTuple) + #logger.debug(commitRecord) + if len(commitRecords): + pendingCommitRecordsToInsert.extend(commitRecords) + if len(pendingCommitRecordsToInsert) >= 1000: + facade_bulk_insert_commits(session,pendingCommitRecordsToInsert) + pendingCommitRecordsToInsert = [] + + facade_bulk_insert_commits(session,pendingCommitRecordsToInsert) - analyze_commit(session, repo_id, repo_loc, commitTuple) + - logger.info("Analysis complete") + + # Remove the working commit. 
+ remove_commit = s.sql.text("""DELETE FROM working_commits + WHERE repos_id = :repo_id AND working_commit IN :hashes + """).bindparams(repo_id=repo_id,hashes=tuple(queue)) + session.execute_sql(remove_commit) + + logger.info("Analysis complete") return @celery.task @@ -390,7 +397,7 @@ def clone_repos(): # with FacadeSession(logger) as session: # check_for_repo_updates(session, repo_git) -@celery.task +@celery.task(base=AugurFacadeRepoCollectionTask) def git_update_commit_count_weight(repo_git): from augur.tasks.init.celery_app import engine @@ -403,7 +410,7 @@ def git_update_commit_count_weight(repo_git): update_facade_scheduling_fields(session, repo_git, facade_weight, commit_count) -@celery.task +@celery.task(base=AugurFacadeRepoCollectionTask) def git_repo_updates_facade_task(repo_git): logger = logging.getLogger(git_repo_updates_facade_task.__name__) diff --git a/augur/tasks/git/util/facade_worker/facade_worker/analyzecommit.py b/augur/tasks/git/util/facade_worker/facade_worker/analyzecommit.py index 2126d1ee9a..285ec6c780 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/analyzecommit.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/analyzecommit.py @@ -103,7 +103,7 @@ def discover_alias(email): else: return email - def store_commit(repos_id,commit,filename, + def generate_commit_record(repos_id,commit,filename, author_name,author_email,author_date,author_timestamp, committer_name,committer_email,committer_date,committer_timestamp, added,removed, whitespace): @@ -122,72 +122,31 @@ def store_commit(repos_id,commit,filename, #2021-10-11 11:57:46 -0500 placeholder_date = "1970-01-01 00:00:15 -0500" - #session.logger.info(f"Timestamp: {author_timestamp}") commit_record = { 'repo_id' : repos_id, - 'commit' : str(commit), - 'filename' : filename, - 'author_name' : str(author_name), - 'author_email_raw' : author_email, - 'author_email' : discover_alias(author_email), - 'author_date' : author_date, - 'author_timestamp' : author_timestamp if len(author_timestamp.replace(" ", "")) != 0 else placeholder_date, - 'committer_name' : committer_name, - 'committer_email_raw' : committer_email, - 'committer_email' : discover_alias(committer_email), - 'committer_date' : committer_date if len(committer_date.replace(" ", "")) != 0 else placeholder_date, - 'committer_timestamp' : committer_timestamp if len(committer_timestamp.replace(" ","")) != 0 else placeholder_date, - 'added' : added, - 'removed' : removed, - 'whitespace' : whitespace, - 'committer_date' : committer_date if len(committer_date.replace(" ","")) != 0 else placeholder_date, + 'cmt_commit_hash' : str(commit), + 'cmt_filename' : filename, + 'cmt_author_name' : str(author_name), + 'cmt_author_raw_email' : author_email, + 'cmt_author_email' : discover_alias(author_email), + 'cmt_author_date' : author_date, + 'cmt_author_timestamp' : author_timestamp if len(author_timestamp.replace(" ", "")) != 0 else placeholder_date, + 'cmt_committer_name' : committer_name, + 'cmt_committer_raw_email' : committer_email, + 'cmt_committer_email' : discover_alias(committer_email), + 'cmt_committer_date' : committer_date if len(committer_date.replace(" ", "")) != 0 else placeholder_date, + 'cmt_committer_timestamp' : committer_timestamp if len(committer_timestamp.replace(" ","")) != 0 else placeholder_date, + 'cmt_added' : added, + 'cmt_removed' : removed, + 'cmt_whitespace' : whitespace, + 'cmt_date_attempted' : committer_date if len(committer_date.replace(" ","")) != 0 else placeholder_date, 'tool_source' : "Facade", 'tool_version' : 
"0.42", 'data_source' : "git" } - #TODO: replace with a postgres on conflict do nothing. - IM 10/11/22 - store = s.sql.text("""INSERT INTO commits (repo_id,cmt_commit_hash,cmt_filename, - cmt_author_name,cmt_author_raw_email,cmt_author_email,cmt_author_date,cmt_author_timestamp, - cmt_committer_name,cmt_committer_raw_email,cmt_committer_email,cmt_committer_date,cmt_committer_timestamp, - cmt_added,cmt_removed,cmt_whitespace, cmt_date_attempted, tool_source, tool_version, data_source) - VALUES (:repo_id,:commit,:filename,:author_name,:author_email_raw,:author_email,:author_date,:author_timestamp, - :committer_name,:committer_email_raw,:committer_email,:committer_date,:committer_timestamp, - :added,:removed,:whitespace,:committer_date,:tool_source,:tool_version,:data_source) - """).bindparams(**commit_record) - - try: - session.execute_sql(store) - except DataError as e: - session.logger.error(f"Ran into bad data when trying to insert commit with values: \n {commit_record} \n Error: {e}") - - #Check for improper utc timezone offset - #UTC timezone offset should be betwen -14:00 and +14:00 - - if "time zone displacement" in f"{e}": - commit_record['author_timestamp'] = placeholder_date - commit_record['committer_timestamp'] = placeholder_date - - store = s.sql.text("""INSERT INTO commits (repo_id,cmt_commit_hash,cmt_filename, - cmt_author_name,cmt_author_raw_email,cmt_author_email,cmt_author_date,cmt_author_timestamp, - cmt_committer_name,cmt_committer_raw_email,cmt_committer_email,cmt_committer_date,cmt_committer_timestamp, - cmt_added,cmt_removed,cmt_whitespace, cmt_date_attempted, tool_source, tool_version, data_source) - VALUES (:repo_id,:commit,:filename,:author_name,:author_email_raw,:author_email,:author_date,:author_timestamp, - :committer_name,:committer_email_raw,:committer_email,:committer_date,:committer_timestamp, - :added,:removed,:whitespace,:committer_date,:tool_source,:tool_version,:data_source) - """).bindparams(**commit_record) - - session.execute_sql(store) - else: - raise e - except Exception as e: - - session.logger.error(f"Ran into issue when trying to insert commit with values: \n {commit_record} \n Error: {e}") - raise e - - - #session.log_activity('Debug',f"Stored commit: {commit}") + return commit_record ### The real function starts here ### @@ -199,6 +158,8 @@ def store_commit(repos_id,commit,filename, removed = 0 whitespace = 0 + recordsToInsert = [] + # Go get the contributors (committers) for this repo here: # curl https://api.github.com/repos/chaoss/augur/contributors # Load the contributors @@ -297,10 +258,10 @@ def store_commit(repos_id,commit,filename, if not header: - store_commit(repo_id,commit,filename, + recordsToInsert.append(generate_commit_record(repo_id,commit,filename, author_name,author_email,author_date,author_timestamp, committer_name,committer_email,committer_date,committer_timestamp, - added,removed,whitespace) + added,removed,whitespace)) header = False @@ -356,19 +317,10 @@ def store_commit(repos_id,commit,filename, whitespaceCheck.append(line[1:].strip()) # Store the last stats from the git log - store_commit(repo_id,commit,filename, + recordsToInsert.append(generate_commit_record(repo_id,commit,filename, author_name,author_email,author_date,author_timestamp, committer_name,committer_email,committer_date,committer_timestamp, - added,removed,whitespace) - - # Remove the working commit. 
- try: - remove_commit = s.sql.text("""DELETE FROM working_commits - WHERE repos_id = :repo_id AND working_commit = :hash - """).bindparams(repo_id=repo_id,hash=commit) - session.execute_sql(remove_commit) - - #session.log_activity('Debug',f"Completed and removed working commit: {commit}") - except: - session.log_activity('Info', f"Working Commit: {commit}") - # If multithreading, clean up the local database + added,removed,whitespace)) + + + return recordsToInsert diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade00mainprogram.py b/augur/tasks/git/util/facade_worker/facade_worker/facade00mainprogram.py index a88fd940a3..909c418094 100755 --- a/augur/tasks/git/util/facade_worker/facade_worker/facade00mainprogram.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/facade00mainprogram.py @@ -30,7 +30,7 @@ import sys, platform, imp, time, datetime, html.parser, subprocess, os, getopt, xlsxwriter, configparser, logging from multiprocessing import Process, Queue from .config import FacadeSession as FacadeSession -from .utilitymethods import trim_commit, store_working_author, trim_author +from .utilitymethods import trim_commits, store_working_author, trim_author from .analyzecommit import analyze_commit from .postanalysiscleanup import git_repo_cleanup from .repofetch import git_repo_initialize, check_for_repo_updates, force_repo_updates, force_repo_analysis, git_repo_updates diff --git a/augur/tasks/git/util/facade_worker/facade_worker/rebuildcache.py b/augur/tasks/git/util/facade_worker/facade_worker/rebuildcache.py index e74c33a82e..03206b0242 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/rebuildcache.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/rebuildcache.py @@ -37,7 +37,7 @@ import xlsxwriter import configparser import sqlalchemy as s -from .utilitymethods import update_repo_log, trim_commit, store_working_author, trim_author +from .utilitymethods import update_repo_log, trim_commits, store_working_author, trim_author # if platform.python_implementation() == 'PyPy': # import pymysql # else: diff --git a/augur/tasks/git/util/facade_worker/facade_worker/repofetch.py b/augur/tasks/git/util/facade_worker/facade_worker/repofetch.py index c738976e41..35110239bf 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/repofetch.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/repofetch.py @@ -38,7 +38,7 @@ import configparser import pathlib import sqlalchemy as s -from .utilitymethods import update_repo_log, trim_commit, store_working_author, trim_author, get_absolute_repo_path +from .utilitymethods import update_repo_log, trim_commits, store_working_author, trim_author, get_absolute_repo_path from augur.application.db.models.augur_data import * from augur.application.db.models.augur_operations import CollectionStatus from augur.application.db.util import execute_session_query, convert_orm_list_to_dict_list diff --git a/augur/tasks/git/util/facade_worker/facade_worker/utilitymethods.py b/augur/tasks/git/util/facade_worker/facade_worker/utilitymethods.py index e73a202e97..aef4e59989 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/utilitymethods.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/utilitymethods.py @@ -38,6 +38,7 @@ import xlsxwriter import configparser import sqlalchemy as s +from sqlalchemy.exc import IntegrityError, DataError from .config import get_database_args_from_env from augur.application.db.models import * from .config import FacadeSession as FacadeSession @@ -60,19 +61,28 @@ def 
update_repo_log(session, repos_id,status): session.logger.error(f"Ran into error in update_repo_log: {e}") pass -def trim_commit(session, repo_id,commit): +def trim_commits(session, repo_id,commits): -# Quickly remove a given commit + # Quickly remove a given commit - remove_commit = s.sql.text("""DELETE FROM commits - WHERE repo_id=:repo_id - AND cmt_commit_hash=:hash""").bindparams(repo_id=repo_id,hash=commit) - - - - session.execute_sql(remove_commit) + if len(commits): + remove_commit = s.sql.text("""DELETE FROM commits + WHERE repo_id=:repo_id + AND cmt_commit_hash IN :hashes""").bindparams(repo_id=repo_id,hashes=tuple(commits)) + + + session.execute_sql(remove_commit) + + # Remove the working commit. + remove_commit = s.sql.text("""DELETE FROM working_commits + WHERE repos_id = :repo_id AND + working_commit IN :hashes""").bindparams(repo_id=repo_id,hashes=tuple(commits)) + + session.execute_sql(remove_commit) - session.log_activity('Debug',f"Trimmed commit: {commit}") + for commit in commits: + session.log_activity('Debug',f"Trimmed commit: {commit}") + session.log_activity('Debug',f"Removed working commit: {commit}") def store_working_author(session, email): @@ -205,3 +215,43 @@ def update_facade_scheduling_fields(session, repo_git, weight, commit_count): session.execute(update_query) session.commit() + +def facade_bulk_insert_commits(session,records): + + try: + session.execute( + s.insert(Commit), + records, + ) + session.commit() + except Exception as e: + + if len(records) > 1: + session.logger.error(f"Ran into issue when trying to insert commits \n Error: {e}") + + #split list into halves and retry insert until we isolate offending record + firsthalfRecords = records[:len(records)//2] + secondhalfRecords = records[len(records)//2:] + + facade_bulk_insert_commits(session,firsthalfRecords) + facade_bulk_insert_commits(session,secondhalfRecords) + elif len(records) == 1 and isinstance(e,DataError) and "time zone displacement" in f"{e}": + commit_record = records[0] + #replace incomprehensible dates with epoch. 
+ #2021-10-11 11:57:46 -0500 + placeholder_date = "1970-01-01 00:00:15 -0500" + + #Check for improper utc timezone offset + #UTC timezone offset should be between -14:00 and +14:00 + + commit_record['cmt_author_timestamp'] = placeholder_date + commit_record['cmt_committer_timestamp'] = placeholder_date + + session.execute( + s.insert(Commit), + [commit_record], + ) + session.commit() + else: + raise e + diff --git a/augur/tasks/github/events/tasks.py b/augur/tasks/github/events/tasks.py index 54996c42cc..129afd0de5 100644 --- a/augur/tasks/github/events/tasks.py +++ b/augur/tasks/github/events/tasks.py @@ -1,6 +1,7 @@ import time import logging import traceback +import sqlalchemy as s from augur.tasks.init.celery_app import celery_app as celery from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask @@ -180,6 +181,7 @@ def process_events(events, task_name, repo_id, logger, augur_db): issue_event_natural_keys = ["issue_id", "issue_event_src_id"] augur_db.insert_data(issue_event_dicts, IssueEvent, issue_event_natural_keys) + update_issue_closed_cntrbs_from_events(augur_db.engine, repo_id) # TODO: Should we skip an event if there is no contributor to resolve it o def process_github_event_contributors(logger, event, tool_source, tool_version, data_source): @@ -194,3 +196,30 @@ def process_github_event_contributors(logger, event, tool_source, tool_version, return event, None return event, event_cntrb + + +def update_issue_closed_cntrbs_from_events(engine, repo_id): + + get_ranked_issues = s.text(f""" + WITH RankedIssues AS ( + SELECT repo_id, issue_id, cntrb_id, + ROW_NUMBER() OVER(PARTITION BY issue_id ORDER BY created_at DESC) AS rn + FROM issue_events + WHERE "action" = 'closed' + ) + + SELECT issue_id, cntrb_id from RankedIssues where rn=1 and repo_id={repo_id} and cntrb_id is not NULL + """) + result = engine.execute(get_ranked_issues).fetchall() + + update_data = [{'issue_id': row['issue_id'], 'cntrb_id': row['cntrb_id'], 'repo_id': repo_id} for row in result] + with engine.connect() as connection: + update_stmt = s.text(""" + UPDATE issues + SET cntrb_id = :cntrb_id + WHERE issue_id = :issue_id + AND repo_id = :repo_id + """) + connection.execute(update_stmt, update_data) + + diff --git a/augur/tasks/github/facade_github/tasks.py b/augur/tasks/github/facade_github/tasks.py index 04ccc16e12..577f17c32b 100644 --- a/augur/tasks/github/facade_github/tasks.py +++ b/augur/tasks/github/facade_github/tasks.py @@ -3,6 +3,7 @@ from augur.tasks.init.celery_app import celery_app as celery +from augur.tasks.init.celery_app import AugurFacadeRepoCollectionTask from augur.tasks.github.util.github_paginator import GithubPaginator, hit_api, retrieve_dict_from_endpoint from augur.tasks.github.util.github_task_session import GithubTaskSession, GithubTaskManifest from augur.tasks.github.util.util import get_owner_repo @@ -198,7 +199,7 @@ def link_commits_to_contributor(session,contributorQueue): # Update the contributors table from the data facade has gathered. -@celery.task +@celery.task(base=AugurFacadeRepoCollectionTask) def insert_facade_contributors(repo_id): from augur.tasks.init.celery_app import engine diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 6d52db8299..54068d30a0 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -143,65 +143,50 @@ def non_repo_domain_tasks(): tasks.apply_async() - - """ - The below functions define augur's collection hooks. 
- Each collection hook schedules tasks for a number of repos - """ -def start_primary_collection(session,max_repo, days_until_collect_again = 1): - - #Get list of enabled phases - enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) - - #Primary collection hook. +def build_primary_repo_collect_request(session,enabled_phase_names, days_until_collect_again = 1): + #Add all required tasks to a list and pass it to the CollectionRequest primary_enabled_phases = [] #Primary jobs if prelim_phase.__name__ in enabled_phase_names: primary_enabled_phases.append(prelim_phase) - - + primary_enabled_phases.append(primary_repo_collect_phase) #task success is scheduled no matter what the config says. def core_task_success_util_gen(repo_git): return core_task_success_util.si(repo_git) - - primary_enabled_phases.append(core_task_success_util_gen) - - start_repos_by_user(session, max_repo, primary_enabled_phases) -def start_secondary_collection(session,max_repo, days_until_collect_again = 1): + primary_enabled_phases.append(core_task_success_util_gen) - #Get list of enabled phases - enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) + primary_request = CollectionRequest("core",primary_enabled_phases,max_repo=40, days_until_collect_again=7) + primary_request.get_valid_repos(session) + return primary_request +def build_secondary_repo_collect_request(session,enabled_phase_names, days_until_collect_again = 1): #Deal with secondary collection secondary_enabled_phases = [] if prelim_phase.__name__ in enabled_phase_names: secondary_enabled_phases.append(prelim_phase_secondary) - + secondary_enabled_phases.append(secondary_repo_collect_phase) def secondary_task_success_util_gen(repo_git): return secondary_task_success_util.si(repo_git) secondary_enabled_phases.append(secondary_task_success_util_gen) + request = CollectionRequest("secondary",secondary_enabled_phases,max_repo=10, days_until_collect_again=10) - conds = f"augur_operations.collection_status.core_status = '{str(CollectionState.SUCCESS.value)}'"#[CollectionStatus.core_status == str(CollectionState.SUCCESS.value)] - start_repos_by_user( - session, max_repo, - secondary_enabled_phases,hook="secondary", - additional_conditions=conds - ) + request.get_valid_repos(session) + return request -def start_facade_collection(session,max_repo,days_until_collect_again = 1): - #Deal with secondary collection +def build_facade_repo_collect_request(session,enabled_phase_names, days_until_collect_again = 1): + #Deal with facade collection facade_enabled_phases = [] - + facade_enabled_phases.append(facade_phase) def facade_task_success_util_gen(repo_git): @@ -214,22 +199,12 @@ def facade_task_update_weight_util_gen(repo_git): facade_enabled_phases.append(facade_task_update_weight_util_gen) - #cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) - #not_pending = CollectionStatus.facade_status != str(CollectionState.PENDING.value) - #not_failed_clone = CollectionStatus.facade_status != str(CollectionState.FAILED_CLONE.value) - #not_initializing = CollectionStatus.facade_status != str(CollectionState.INITIALIZING.value) - - conds = f"augur_operations.collection_status.facade_status != '{str(CollectionState.PENDING.value)}' "#[not_pending,not_failed_clone,not_initializing] - conds += f"AND augur_operations.collection_status.facade_status != '{str(CollectionState.FAILED_CLONE.value)}' " - conds += f"AND augur_operations.collection_status.facade_status != '{str(CollectionState.INITIALIZING.value)}'" + 
request = CollectionRequest("facade",facade_enabled_phases,max_repo=30, days_until_collect_again=7) - start_repos_by_user( - session, max_repo, - facade_enabled_phases,hook="facade", - new_status=CollectionState.UPDATE.value,additional_conditions=conds - ) + request.get_valid_repos(session) + return request -def start_ml_collection(session,max_repo, days_until_collect_again=7): +def build_ml_repo_collect_request(session,enabled_phase_names, days_until_collect_again = 1): ml_enabled_phases = [] ml_enabled_phases.append(machine_learning_phase) @@ -239,13 +214,9 @@ def ml_task_success_util_gen(repo_git): ml_enabled_phases.append(ml_task_success_util_gen) - conds = f"augur_operations.collection_status.secondary_status = '{str(CollectionState.SUCCESS.value)}'" - - start_repos_by_user( - session,max_repo, - ml_enabled_phases,hook="ml",additional_conditions=conds - ) - + request = CollectionRequest("ml",ml_enabled_phases,max_repo=5, days_until_collect_again=10) + request.get_valid_repos(session) + return request @celery.task def augur_collection_monitor(): @@ -260,17 +231,27 @@ def augur_collection_monitor(): #Get list of enabled phases enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) + enabled_collection_hooks = [] + if primary_repo_collect_phase.__name__ in enabled_phase_names: - start_primary_collection(session, max_repo=30) + enabled_collection_hooks.append(build_primary_repo_collect_request(session,enabled_phase_names)) if secondary_repo_collect_phase.__name__ in enabled_phase_names: - start_secondary_collection(session, max_repo=10) + enabled_collection_hooks.append(build_secondary_repo_collect_request(session,enabled_phase_names)) + #start_secondary_collection(session, max_repo=10) if facade_phase.__name__ in enabled_phase_names: - start_facade_collection(session, max_repo=20) + #start_facade_collection(session, max_repo=30) + enabled_collection_hooks.append(build_facade_repo_collect_request(session,enabled_phase_names)) if machine_learning_phase.__name__ in enabled_phase_names: - start_ml_collection(session,max_repo=1) + enabled_collection_hooks.append(build_ml_repo_collect_request(session,enabled_phase_names)) + #start_ml_collection(session,max_repo=5) + + logger.info(f"Starting collection phases: {[h.name for h in enabled_collection_hooks]}") + main_routine = AugurTaskRoutine(session,enabled_collection_hooks) + + main_routine.start_data_collection() # have a pipe of 180 @@ -338,4 +319,4 @@ def create_collection_status_records(): repo = session.execute_sql(query).first() #Check for new repos every seven minutes to be out of step with the clone_repos task - create_collection_status_records.si().apply_async(countdown=60*7) \ No newline at end of file + create_collection_status_records.si().apply_async(countdown=60*7) diff --git a/augur/tasks/util/collection_util.py b/augur/tasks/util/collection_util.py index 288f8132e1..4d5b663a20 100644 --- a/augur/tasks/util/collection_util.py +++ b/augur/tasks/util/collection_util.py @@ -36,6 +36,173 @@ class CollectionState(Enum): UPDATE = "Update" FAILED_CLONE = "Failed Clone" +def get_list_of_all_users(session): + #Get a list of all users. 
+ query = s.sql.text(""" + SELECT + user_id + FROM augur_operations.users + """) + + users = session.execute_sql(query).fetchall() + return users + + +def get_required_conditions_for_core_repos(allow_collected_before = False, days_until_collect_again = 1): + + if not allow_collected_before: + condition_concat_string = f""" + core_status='{str(CollectionState.PENDING.value)}' AND core_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.core_data_last_collected IS NULL + AND core_status!='{str(CollectionState.COLLECTING.value)}' + """ + else: + condition_concat_string = f""" + core_status='Success' AND core_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.core_data_last_collected IS NOT NULL + AND core_status!='{str(CollectionState.COLLECTING.value)}' + AND core_data_last_collected <= NOW() - INTERVAL '{days_until_collect_again} DAYS' + """ + + return condition_concat_string + +def get_required_conditions_for_secondary_repos(allow_collected_before = False, days_until_collect_again = 1): + + if not allow_collected_before: + condition_concat_string = f""" + secondary_status='{str(CollectionState.PENDING.value)}' AND secondary_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.core_status = '{str(CollectionState.SUCCESS.value)}' + AND augur_operations.collection_status.secondary_data_last_collected IS NULL + AND secondary_status!='{str(CollectionState.COLLECTING.value)}' + """ + else: + condition_concat_string = f""" + secondary_status='Success' AND secondary_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.secondary_data_last_collected IS NOT NULL + AND augur_operations.collection_status.core_status = '{str(CollectionState.SUCCESS.value)}' + AND secondary_status!='{str(CollectionState.COLLECTING.value)}' + AND secondary_data_last_collected <= NOW() - INTERVAL '{days_until_collect_again} DAYS' + """ + + return condition_concat_string + +def get_required_conditions_for_facade_repos(allow_collected_before = False, days_until_collect_again = 1): + + if not allow_collected_before: + condition_concat_string = f""" + facade_status='{str(CollectionState.UPDATE.value)}' AND facade_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.facade_status != '{str(CollectionState.PENDING.value)}' + AND augur_operations.collection_status.facade_status != '{str(CollectionState.FAILED_CLONE.value)}' + AND augur_operations.collection_status.facade_status != '{str(CollectionState.INITIALIZING.value)}' + AND augur_operations.collection_status.facade_data_last_collected IS NULL + AND facade_status!='{str(CollectionState.COLLECTING.value)}' + """ + else: + condition_concat_string = f""" + facade_status='Success' AND facade_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.facade_data_last_collected IS NOT NULL + AND augur_operations.collection_status.facade_status != '{str(CollectionState.PENDING.value)}' + AND augur_operations.collection_status.facade_status != '{str(CollectionState.FAILED_CLONE.value)}' + AND augur_operations.collection_status.facade_status != '{str(CollectionState.INITIALIZING.value)}' + AND facade_status!='{str(CollectionState.COLLECTING.value)}' + AND facade_data_last_collected <= NOW() - INTERVAL '{days_until_collect_again} DAYS' + """ + + return condition_concat_string + +def get_required_conditions_for_ml_repos(allow_collected_before = False, days_until_collect_again = 1): + + if not 
allow_collected_before: + condition_concat_string = f""" + ml_status='{str(CollectionState.PENDING.value)}' AND ml_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.secondary_status = '{str(CollectionState.SUCCESS.value)}' + AND augur_operations.collection_status.ml_data_last_collected IS NULL + AND ml_status!='{str(CollectionState.COLLECTING.value)}' + """ + else: + condition_concat_string = f""" + ml_status='Success' AND ml_status!='{str(CollectionState.ERROR.value)}' + AND augur_operations.collection_status.ml_data_last_collected IS NOT NULL + AND ml_status!='{str(CollectionState.COLLECTING.value)}' + AND ml_data_last_collected <= NOW() - INTERVAL '{days_until_collect_again} DAYS' + """ + + return condition_concat_string + + + +class CollectionRequest: + def __init__(self,name,phases,max_repo = 10,days_until_collect_again = 1): + self.name = name + self.phases = phases + self.max_repo = max_repo + self.days_until_collect_again = days_until_collect_again + self.new_status = CollectionState.PENDING.value + self.repo_list = [] + + self.status_column = f"{name}_status" + + + if name == "facade": + self.new_status = CollectionState.UPDATE.value + + def get_active_repo_count(self,session): + return len(session.query(CollectionStatus).filter(getattr(CollectionStatus,f"{self.name}_status" ) == CollectionState.COLLECTING.value).all()) + + #Get repo urls based on passed in info. + def get_valid_repos(self,session): + #getattr(CollectionStatus,f"{hook}_status" ) represents the status of the given hook + #Get the count of repos that are currently running this collection hook + #status_column = f"{hook}_status" + active_repo_count = self.get_active_repo_count(session) + + #Will always disallow errored repos and repos that are already collecting + + #The maximum amount of repos to schedule is affected by the existing repos running tasks + limit = self.max_repo-active_repo_count + + #Extract the user id from the randomized list and split into four chunks + split_user_list = split_random_users_list(session,f"{self.name}_status",self.new_status) + + session.logger.info(f"User_list: {split_user_list}") + + #Iterate through each fourth of the users fetched + for quarter_list in split_user_list: + if limit <= 0: + return + + collection_list = get_valid_repos_for_users(session,limit,tuple(quarter_list),hook=self.name, days_to_wait_until_next_collection=self.days_until_collect_again) + + self.repo_list.extend(collection_list) + #Update limit with amount of repos started + limit -= len(collection_list) + + #Now start old repos if there is space to do so. 
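+        #"Old" repos are those whose *_data_last_collected timestamp is set and at
+        #least days_until_collect_again days in the past (the allow_old_repos=True
+        #branch of get_valid_repos_for_users); errored and currently-collecting repos
+        #remain excluded by the per-hook condition strings.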
+ if limit <= 0: + return + + + user_list = get_list_of_all_users(session) + random.shuffle(user_list) + + #Extract the user id from the randomized list and split into four chunks + split_user_list = split_list_into_chunks([row[0] for row in user_list], 4) + + for quarter_list in split_user_list: + + #Break out if limit has been reached + if limit <= 0: + return + + #only start repos older than the specified amount of days + #Query a set of valid repositories sorted by weight, also making sure that the repos aren't new or errored + #Order by the relevant weight for the collection hook + collection_list = get_valid_repos_for_users(session,limit,tuple(quarter_list),allow_old_repos=True,hook=self.name, days_to_wait_until_next_collection=self.days_until_collect_again) + + self.repo_list.extend(collection_list) + limit -= len(collection_list) + + def get_enabled_phase_names_from_config(logger, session): config = AugurConfig(logger, session) @@ -372,6 +539,7 @@ class AugurTaskRoutine: """ class to keep track of various groups of collection tasks for a group of repos. Simple version to just schedule a number of repos not worrying about repo weight. + The repo weight matters when constructing the CollectionRequest through get_valid_repos Used when scheduling repo clones/updates. @@ -382,28 +550,23 @@ class to keep track of various groups of collection tasks for a group of repos. collection_hook (str): String determining the attributes to update when collection for a repo starts. e.g. core session: Database session to use """ - def __init__(self,session,repos: List[str]=[],collection_phases: List=[],collection_hook: str="core"): + def __init__(self,session,collection_hooks): self.logger = session.logger - #self.session = TaskSession(self.logger) - self.collection_phases = collection_phases - #self.disabled_collection_tasks = disabled_collection_tasks - self.repos = repos - self.session = session - self.collection_hook = collection_hook - #Also have attribute to determine what to set repos' status as when they are run - self.start_state = CollectionState.COLLECTING.value + self.collection_hooks = collection_hooks + self.session = session - def update_status_and_id(self,repo_git, task_id): + def update_status_and_id(self,repo_git, task_id, name): repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one() #Set status in database to collecting repoStatus = repo.collection_status[0] # - setattr(repoStatus,f"{self.collection_hook}_task_id",task_id) - setattr(repoStatus,f"{self.collection_hook}_status",self.start_state) + setattr(repoStatus,f"{name}_task_id",task_id) + setattr(repoStatus,f"{name}_status", CollectionState.COLLECTING.value) self.session.commit() + def start_data_collection(self): """Start all task items and return. @@ -415,48 +578,64 @@ def start_data_collection(self): #Send messages starts each repo and yields its running info #to concurrently update the correct field in the database. - for repo_git, task_id in self.send_messages(): - self.update_status_and_id(repo_git,task_id) + for repo_git, task_id, hook_name in self.send_messages(): + self.update_status_and_id(repo_git,task_id,hook_name) def send_messages(self): augur_collection_list = [] - for repo_git in self.repos: - - #repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one() - #repo_id = repo.repo_id - - augur_collection_sequence = [] - for job in self.collection_phases: - #Add the phase to the sequence in order as a celery task. 
- #The preliminary task creates the larger task chain - augur_collection_sequence.append(job(repo_git)) - - #augur_collection_sequence.append(core_task_success_util.si(repo_git)) - #Link all phases in a chain and send to celery - augur_collection_chain = chain(*augur_collection_sequence) - task_id = augur_collection_chain.apply_async().task_id + for col_hook in self.collection_hooks: + + self.logger.info(f"Starting collection on {len(col_hook.repo_list)} {col_hook.name} repos") + + for repo_git in col_hook.repo_list: + + #repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one() + #repo_id = repo.repo_id + + augur_collection_sequence = [] + for job in col_hook.phases: + #Add the phase to the sequence in order as a celery task. + #The preliminary task creates the larger task chain + augur_collection_sequence.append(job(repo_git)) + + #augur_collection_sequence.append(core_task_success_util.si(repo_git)) + #Link all phases in a chain and send to celery + augur_collection_chain = chain(*augur_collection_sequence) + task_id = augur_collection_chain.apply_async().task_id + + self.logger.info(f"Setting repo {col_hook.name} status to collecting for repo: {repo_git}") + + #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated + yield repo_git, task_id, col_hook.name + +#def start_block_of_repos(logger,session,repo_git_identifiers,phases,repos_type,hook="core"): +# +# logger.info(f"Starting collection on {len(repo_git_identifiers)} {repos_type} {hook} repos") +# if len(repo_git_identifiers) == 0: +# return 0 +# +# logger.info(f"Collection starting for {hook}: {tuple(repo_git_identifiers)}") +# +# routine = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=phases,collection_hook=hook) +# +# routine.start_data_collection() +# +# return len(repo_git_identifiers) + +def get_valid_repos_for_users(session,limit,users,allow_old_repos = False,hook="core",days_to_wait_until_next_collection = 1): + + condition_string = "1" + + if hook == "core": + condition_string = get_required_conditions_for_core_repos(allow_collected_before=allow_old_repos,days_until_collect_again= days_to_wait_until_next_collection) + elif hook == "secondary": + condition_string = get_required_conditions_for_secondary_repos(allow_collected_before=allow_old_repos,days_until_collect_again = days_to_wait_until_next_collection) + elif hook == "facade": + condition_string = get_required_conditions_for_facade_repos(allow_collected_before=allow_old_repos,days_until_collect_again = days_to_wait_until_next_collection) + elif hook == "ml": + condition_string = get_required_conditions_for_ml_repos(allow_collected_before=allow_old_repos,days_until_collect_again = days_to_wait_until_next_collection) - self.logger.info(f"Setting repo {self.collection_hook} status to collecting for repo: {repo_git}") - - #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated - yield repo_git, task_id - -def start_block_of_repos(logger,session,repo_git_identifiers,phases,repos_type,hook="core"): - - logger.info(f"Starting collection on {len(repo_git_identifiers)} {repos_type} {hook} repos") - if len(repo_git_identifiers) == 0: - return 0 - - logger.info(f"Collection starting for {hook}: {tuple(repo_git_identifiers)}") - - routine = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=phases,collection_hook=hook) - - routine.start_data_collection() - - return len(repo_git_identifiers) - -def 
start_repos_from_given_group_of_users(session,limit,users,condition_string,phases,hook="core",repos_type="new"): #Query a set of valid repositories sorted by weight, also making sure that the repos are new #Order by the relevant weight for the collection hook repo_query = s.sql.text(f""" @@ -477,29 +656,15 @@ def start_repos_from_given_group_of_users(session,limit,users,condition_string,p session.logger.info(f"valid repo git list: {tuple(valid_repo_git_list)}") #start repos for new primary collection hook - collection_size = start_block_of_repos( - session.logger, session, - valid_repo_git_list, - phases, repos_type=repos_type, hook=hook - ) - - return collection_size - -""" - Generalized function for starting a phase of tasks for a given collection hook with options to add restrictive conditions -""" -def start_repos_by_user(session, max_repo,phase_list, days_until_collect_again = 1, hook="core",new_status=CollectionState.PENDING.value,additional_conditions=None): + #collection_size = start_block_of_repos( + # session.logger, session, + # valid_repo_git_list, + # phases, repos_type=repos_type, hook=hook + #) - #getattr(CollectionStatus,f"{hook}_status" ) represents the status of the given hook - #Get the count of repos that are currently running this collection hook - status_column = f"{hook}_status" - active_repo_count = len(session.query(CollectionStatus).filter(getattr(CollectionStatus,status_column ) == CollectionState.COLLECTING.value).all()) - - #Will always disallow errored repos and repos that are already collecting - - #The maximum amount of repos to schedule is affected by the existing repos running tasks - limit = max_repo-active_repo_count + return valid_repo_git_list +def split_random_users_list(session,status_col, status_new): #Split all users that have new repos into four lists and randomize order query = s.sql.text(f""" SELECT @@ -508,7 +673,7 @@ def start_repos_by_user(session, max_repo,phase_list, days_until_collect_again = JOIN augur_operations.user_repos ON augur_operations.user_groups.group_id = augur_operations.user_repos.group_id JOIN augur_data.repo ON augur_operations.user_repos.repo_id = augur_data.repo.repo_id JOIN augur_operations.collection_status ON augur_operations.user_repos.repo_id = augur_operations.collection_status.repo_id - WHERE {status_column}='{str(new_status)}' + WHERE {status_col}='{str(status_new)}' GROUP BY user_id """) @@ -518,55 +683,5 @@ def start_repos_by_user(session, max_repo,phase_list, days_until_collect_again = #Extract the user id from the randomized list and split into four chunks split_user_list = split_list_into_chunks([row[0] for row in user_list], 4) - session.logger.info(f"User_list: {split_user_list}") - - #Iterate through each fourth of the users fetched - for quarter_list in split_user_list: - if limit <= 0: - return - - condition_concat_string = f""" - {status_column}='{str(new_status)}' AND {status_column}!='{str(CollectionState.ERROR.value)}' - AND {additional_conditions if additional_conditions else 'TRUE'} AND augur_operations.collection_status.{hook}_data_last_collected IS NULL - AND {status_column}!='{str(CollectionState.COLLECTING.value)}' - """ - - collection_size = start_repos_from_given_group_of_users(session,limit,tuple(quarter_list),condition_concat_string,phase_list,hook=hook) - #Update limit with amount of repos started - limit -= collection_size - - #Now start old repos if there is space to do so. - if limit <= 0: - return - - #Get a list of all users. 
- query = s.sql.text(""" - SELECT - user_id - FROM augur_operations.users - """) - - user_list = session.execute_sql(query).fetchall() - random.shuffle(user_list) - - #Extract the user id from the randomized list and split into four chunks - split_user_list = split_list_into_chunks([row[0] for row in user_list], 4) - - for quarter_list in split_user_list: - - #Break out if limit has been reached - if limit <= 0: - return - - condition_concat_string = f""" - {status_column}='Success' AND {status_column}!='{str(CollectionState.ERROR.value)}' - AND {additional_conditions if additional_conditions else 'TRUE'} AND augur_operations.collection_status.{hook}_data_last_collected IS NOT NULL - AND {status_column}!='{str(CollectionState.COLLECTING.value)}' AND {hook}_data_last_collected <= NOW() - INTERVAL '{days_until_collect_again} DAYS' - """ - - #only start repos older than the specified amount of days - #Query a set of valid repositories sorted by weight, also making sure that the repos aren't new or errored - #Order by the relevant weight for the collection hook - collection_size = start_repos_from_given_group_of_users(session,limit,tuple(quarter_list),condition_concat_string,phase_list,hook=hook,repos_type="old") + return split_user_list - limit -= collection_size \ No newline at end of file diff --git a/augur/templates/settings.j2 b/augur/templates/settings.j2 index cefa4ac587..31230897c4 100644 --- a/augur/templates/settings.j2 +++ b/augur/templates/settings.j2 @@ -132,7 +132,7 @@ {"title" : "Favorite"}, {"title" : ""}] -%} -