# -*- coding: utf-8 -*-
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
#
# Source path (from the originating diff): invenio_vcs/contrib/github.py
"""Contrib provider implementation for GitHub."""

import json

# ``import dateutil`` alone does not guarantee that the ``parser`` submodule is
# loaded, so ``dateutil.parser.parse`` could fail with AttributeError.
import dateutil.parser
import github3
import requests
from flask import current_app
from github3.repos import ShortRepository
from invenio_i18n import gettext as _
from invenio_oauthclient.contrib.github import GitHubOAuthSettingsHelper
from werkzeug.utils import cached_property

from invenio_vcs.errors import ReleaseZipballFetchError, VCSTokenNotFound
from invenio_vcs.generic_models import (
    GenericContributor,
    GenericOwner,
    GenericOwnerType,
    GenericRelease,
    GenericRepository,
    GenericUser,
    GenericWebhook,
)
from invenio_vcs.providers import (
    RepositoryServiceProvider,
    RepositoryServiceProviderFactory,
)


class GitHubProviderFactory(RepositoryServiceProviderFactory):
    """Contrib implementation factory for VCS."""

    def __init__(
        self,
        base_url,
        webhook_receiver_url,
        id="github",
        name="GitHub",
        description="Automatically archive your repositories",
        credentials_key="GITHUB_APP_CREDENTIALS",
        config=None,
    ):
        """Initialise with GitHub-specific defaults.

        :param config: optional dict overriding the GitHub-specific options
            ``shared_secret`` (default ``""``) and ``insecure_ssl``
            (default ``False``). ``None`` means "no overrides".
        """
        super().__init__(
            GitHubProvider,
            base_url=base_url,
            webhook_receiver_url=webhook_receiver_url,
            id=id,
            name=name,
            description=description,
            credentials_key=credentials_key,
            icon="github",
            repository_name="repository",
            repository_name_plural="repositories",
        )

        # Start from the defaults and layer the caller's overrides on top. The
        # caller's dict is copied, never stored, so it cannot be mutated later.
        self._config = dict(shared_secret="", insecure_ssl=False)
        self._config.update(config or {})

    @property
    def remote_config(self):
        """
        Use the existing GitHub OAuth client implementation in invenio-oauthclient with some minor modifications.

        We are keeping this client in invenio-oauthclient for backwards-compatibility and because some installations
        may already be using GitHub OAuth as a login method without the full integration.
        """
        request_token_params = {
            # General `repo` scope is required for reading collaborators
            # https://docs.github.com/en/apps/oauth-apps/building-oauth-apps/scopes-for-oauth-apps
            "scope": "read:user,user:email,admin:repo_hook,read:org,repo"
        }

        helper = GitHubOAuthSettingsHelper(
            title=self.name,
            icon="fa fa-{}".format(self.icon),
            description=self.description,
            base_url=self.base_url,
            app_key=self.credentials_key,
        )
        github_app = helper.remote_app
        # Replace the stock handlers with the VCS-aware ones.
        github_app["disconnect_handler"] = self.oauth_handlers.disconnect_handler
        github_app["signup_handler"][
            "setup"
        ] = self.oauth_handlers.account_setup_handler
        github_app["params"]["request_token_params"] = request_token_params

        return github_app

    @property
    def config(self):
        """Returns the GitHub-specific config dict."""
        return self._config

    def webhook_is_create_release_event(self, event_payload):
        """Three possible event types can correspond to a create release event."""
        action = event_payload.get("action")
        is_draft_release = event_payload.get("release", {}).get("draft")

        # Draft releases do not create releases on invenio
        is_create_release_event = (
            action in ("published", "released", "created") and not is_draft_release
        )
        return is_create_release_event

    @staticmethod
    def _extract_license(gh_repo_dict):
        """
        Return the SPDX ID of the repository's license, or ``None``.

        The GitHub API returns the `license` as a simple key of the ShortRepository.
        But for some reason github3py does not include a mapping for this.
        So the only way to access it without making an additional request is to convert
        the repo to a dict.
        """
        license_obj = gh_repo_dict.get("license")
        if license_obj is None:
            return None

        spdx = license_obj.get("spdx_id")
        if spdx == "NOASSERTION":
            # For 'other' type of licenses, Github sets the spdx_id to NOASSERTION
            return None
        return spdx

    def webhook_event_to_generic(self, event_payload):
        """Convert the webhook payload to a generic release and repository without making additional API calls and using just the payload data.

        :returns: a ``(GenericRelease, GenericRepository)`` tuple.
        """
        release_payload = event_payload["release"]
        repo_payload = event_payload["repository"]

        # ``published_at`` is optional in the payload; only parse it when set.
        release_published_at = release_payload.get("published_at")
        if release_published_at is not None:
            release_published_at = dateutil.parser.parse(release_published_at)

        release = GenericRelease(
            id=str(release_payload["id"]),
            name=release_payload.get("name"),
            tag_name=release_payload["tag_name"],
            tarball_url=release_payload.get("tarball_url"),
            zipball_url=release_payload.get("zipball_url"),
            body=release_payload.get("body"),
            created_at=dateutil.parser.parse(release_payload["created_at"]),
            published_at=release_published_at,
        )

        repo = GenericRepository(
            id=str(repo_payload["id"]),
            full_name=repo_payload["full_name"],
            description=repo_payload.get("description"),
            default_branch=repo_payload["default_branch"],
            license_spdx=GitHubProviderFactory._extract_license(repo_payload),
        )

        return (release, repo)

    def url_for_repository(self, repository_name: str) -> str:
        """URL to view a repository."""
        return "{}/{}".format(self.base_url, repository_name)

    def url_for_release(
        self, repository_name: str, release_id: str, release_tag: str
    ) -> str:
        """URL to view a release (GitHub addresses releases by tag, so ``release_id`` is unused)."""
        return "{}/{}/releases/tag/{}".format(
            self.base_url, repository_name, release_tag
        )

    def url_for_tag(self, repository_name: str, tag_name: str):
        """URL to view a tag."""
        return "{}/{}/tree/{}".format(self.base_url, repository_name, tag_name)

    def url_for_new_release(self, repository_name: str):
        """URL for creating a new release."""
        return "{}/{}/releases/new".format(self.base_url, repository_name)

    def url_for_new_file(self, repository_name: str, branch_name: str, file_name: str):
        """URL for creating a new file in the web editor."""
        return "{}/{}/new/{}?filename={}".format(
            self.base_url, repository_name, branch_name, file_name
        )

    def url_for_new_repo(self) -> str:
        """URL for creating a new repository."""
        return "{}/new".format(self.base_url)


class GitHubProvider(RepositoryServiceProvider):
    """Contrib user-specific implementation for GitHub."""

    @cached_property
    def _gh(self):
        """Initialise the GitHub API object (either for public or enterprise self-hosted GitHub).

        :raises VCSTokenNotFound: if no remote token is available.
        """
        if self.remote_token is None:
            raise VCSTokenNotFound

        token = self.remote_token.access_token
        if self.factory.base_url == "https://github.com":
            _gh = github3.login(token=token)
        else:
            _gh = github3.enterprise_login(url=self.factory.base_url, token=token)

        # login can return None if it's unsuccessful.
        assert _gh is not None
        return _gh

    def list_repositories(self):
        """List the repositories on which the user has admin permission, keyed by repository ID."""
        repos: dict[str, GenericRepository] = {}
        for repo in self._gh.repositories():
            assert isinstance(repo, ShortRepository)

            if not repo.permissions["admin"]:
                continue

            repos[str(repo.id)] = GenericRepository(
                id=str(repo.id),
                full_name=repo.full_name,
                description=repo.description,
                default_branch=repo.default_branch,
                license_spdx=GitHubProviderFactory._extract_license(repo.as_dict()),
            )

        return repos

    def list_repository_webhooks(self, repository_id):
        """List a repo's webhooks, or return ``None`` if the repo lookup yields nothing."""
        assert repository_id.isdigit()
        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        return [
            GenericWebhook(
                id=str(hook.id),
                repository_id=repository_id,
                url=hook.config.get("url"),
            )
            for hook in repo.hooks()
        ]

    def list_repository_user_ids(self, repository_id: str):
        """List the admin collaborator User IDs of a repository."""
        assert repository_id.isdigit()
        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        user_ids: list[str] = [
            str(collaborator.id)
            for collaborator in repo.collaborators()
            if collaborator.permissions["admin"]
        ]
        return user_ids

    def get_repository(self, repository_id):
        """Get a single repository."""
        assert repository_id.isdigit()

        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        return GenericRepository(
            id=str(repo.id),
            full_name=repo.full_name,
            description=repo.description,
            default_branch=repo.default_branch,
            license_spdx=GitHubProviderFactory._extract_license(repo.as_dict()),
        )

    def create_webhook(self, repository_id):
        """Create a webhook using some custom GitHub-specific config options.

        If a hook pointing at ``self.webhook_url`` already exists it is edited
        in place instead of creating a duplicate.

        :returns: the hook ID as a string, or ``None`` if the repo lookup yields nothing.
        """
        assert repository_id.isdigit()

        hook_config = dict(
            url=self.webhook_url,
            content_type="json",
            secret=self.factory.config["shared_secret"],
            insecure_ssl="1" if self.factory.config["insecure_ssl"] else "0",
        )

        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        hooks = (h for h in repo.hooks() if h.config.get("url", "") == self.webhook_url)
        hook = next(hooks, None)

        if not hook:
            hook = repo.create_hook("web", hook_config, events=["release"])
        else:
            hook.edit(config=hook_config, events=["release"])

        return str(hook.id)

    def delete_webhook(self, repository_id, hook_id=None):
        """Delete a webhook.

        With ``hook_id`` the specific hook is deleted; otherwise the first hook
        whose URL passes ``self.is_valid_webhook`` is deleted.

        :returns: ``True`` if no matching hook was found or the deletion
            succeeded, ``False`` otherwise.
        """
        assert repository_id.isdigit()

        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return False

        if hook_id is not None:
            hook = repo.hook(hook_id)
        else:
            hooks = (
                h
                for h in repo.hooks()
                if self.is_valid_webhook(h.config.get("url", ""))
            )
            hook = next(hooks, None)

        # A missing hook counts as success: there is nothing left to delete.
        if not hook or hook.delete():
            return True
        return False

    def get_own_user(self):
        """Get the currently logged in user."""
        user = self._gh.me()
        if user is None:
            return None

        return GenericUser(str(user.id), user.login, user.name)

    def list_repository_contributors(self, repository_id, max):
        """List the contributors of a repo, requesting at most ``max`` entries from the API."""
        assert repository_id.isdigit()

        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        contributors = []
        for c in repo.contributors(number=max):
            # Read the count before refreshing: the refreshed object replaces
            # ``c`` and is only used for the profile fields below.
            contributions_count = c.contributions_count
            c = c.refresh()
            contributors.append(
                GenericContributor(
                    id=str(c.id),
                    username=c.login,
                    display_name=c.name,
                    contributions_count=contributions_count,
                    company=c.company,
                )
            )

        return contributors

    def get_repository_owner(self, repository_id):
        """Get the owner of a repo."""
        assert repository_id.isdigit()

        repo = self._gh.repository_with_id(int(repository_id))
        if repo is None:
            return None

        owner_type = (
            GenericOwnerType.Person
            if repo.owner.type == "User"
            else GenericOwnerType.Organization
        )

        return GenericOwner(
            id=str(repo.owner.id),
            path_name=repo.owner.login,
            type=owner_type,
            # GitHub API does not return the display name for the owner
        )

    def resolve_release_zipball_url(self, release_zipball_url):
        """Handle some GitHub-specific quirks related to URL authentication.

        :returns: the finally resolved URL the zipball can be fetched from.
        :raises ReleaseZipballFetchError: if the URL cannot be resolved to a
            ``200`` response.
        """
        url = release_zipball_url

        # Execute a HEAD request to the zipball url to test if it is accessible.
        response = self._gh.session.head(url, allow_redirects=True)

        # In case where there is a tag and branch with the same name, we might get back
        # a "300 Multiple Choices" response, which requires fetching an "alternate"
        # link.
        if response.status_code == 300:
            alternate_url = response.links.get("alternate", {}).get("url")
            if alternate_url:
                url = alternate_url  # Use the alternate URL
                response = self._gh.session.head(url, allow_redirects=True)

        # Another edge-case, is when the access token we have does not have the
        # scopes/permissions to access public links. In that rare case we fallback to a
        # non-authenticated request.
        if response.status_code == 404:
            # Lazy %-formatting so the URL actually appears in the message; the
            # previous "{url}" placeholder was never interpolated.
            current_app.logger.warning(
                "GitHub zipball URL %s not found, trying unauthenticated request.",
                response.url,
                extra={"url": response.url},
            )
            response = requests.head(url, allow_redirects=True)
            # If this response is successful we want to use the finally resolved URL to
            # fetch the ZIP from.
            if response.status_code == 200:
                return response.url

        if response.status_code != 200:
            raise ReleaseZipballFetchError()

        return response.url

    def fetch_release_zipball(self, release_zipball_url, timeout):
        """Fetch a specific release artifact file using a raw authenticated API request.

        Generator that yields the raw response stream; the response is closed
        when the generator is resumed or closed.
        """
        with self._gh.session.get(
            release_zipball_url, stream=True, timeout=timeout
        ) as resp:
            yield resp.raw

    def retrieve_remote_file(self, repository_id, ref_name, file_name):
        """Retrieve a specific file from the repo via the API, or ``None`` if it does not exist."""
        assert repository_id.isdigit()

        try:
            repo = self._gh.repository_with_id(int(repository_id))
            if repo is None:
                # Avoid an AttributeError on ``None.file_contents``.
                return None
            resp = repo.file_contents(path=file_name, ref=ref_name)
            return resp.decoded
        except github3.exceptions.NotFoundError:
            return None

    def revoke_token(self, access_token):
        """Delete the specified access token using a custom API request."""
        client_id, client_secret = self._gh.session.retrieve_client_credentials()
        url = self._gh._build_url("applications", str(client_id), "token")
        # The endpoint is called with the OAuth app's own credentials (basic
        # auth), which are only swapped in for the duration of this request.
        with self._gh.session.temporary_basic_auth(client_id, client_secret):
            response = self._gh._delete(
                url, data=json.dumps({"access_token": access_token})
            )
        return response
+"""Contrib provider implementation for GitLab.""" + +from __future__ import annotations + +from typing import Any + +import dateutil +import gitlab +import gitlab.const +import requests +from flask import current_app +from invenio_oauthclient import current_oauthclient +from werkzeug.utils import cached_property + +from invenio_vcs.errors import VCSTokenNotFound +from invenio_vcs.generic_models import ( + GenericContributor, + GenericOwner, + GenericOwnerType, + GenericRelease, + GenericRepository, + GenericUser, + GenericWebhook, +) +from invenio_vcs.providers import ( + RepositoryServiceProvider, + RepositoryServiceProviderFactory, +) + + +def _gl_response_error_handler(f): + """Handle common error codes returned by the API.""" + + def inner_function(*args, **kwargs): + try: + return f(*args, **kwargs) + except gitlab.GitlabGetError as e: + if e.response_code == 404: + return None + else: + raise e + except gitlab.GitlabCreateError as e: + if e.response_code == 404: + return None + else: + raise e + + return inner_function + + +class GitLabProviderFactory(RepositoryServiceProviderFactory): + """Contrib implementation factory for GitLab.""" + + def __init__( + self, + base_url: str, + webhook_receiver_url: str, + id="gitlab", + name="GitLab", + description="Automatically archive your repositories", + credentials_key="GITLAB_APP_CREDENTIALS", + config={}, + ): + """Initialise with GitLab-specific defaults.""" + super().__init__( + GitLabProvider, + base_url=base_url, + webhook_receiver_url=webhook_receiver_url, + id=id, + name=name, + description=description, + credentials_key=credentials_key, + icon="gitlab", + repository_name="project", + repository_name_plural="projects", + ) + self._config = dict() + self._config.update(shared_validation_token="") + self._config.update(config) + + def _account_info_handler(self, remote, resp: dict): + """Helper for the OAuth client.""" + gl = gitlab.Gitlab( + self.base_url, + oauth_token=resp["access_token"], + ) + gl.auth() + 
user_attrs = gl.user.attributes + handlers = current_oauthclient.signup_handlers[remote.name] + # Pass through `info_serializer` which converts the user to an Invenio user (and performs additional validation). + return handlers["info_serializer"](resp, user_attrs) + + def _account_info_serializer(self, remote, resp, user_info, **kwargs): + """Helper for the OAuth client.""" + return dict( + user=dict( + email=user_info["email"], + profile=dict( + username=user_info["username"], + full_name=user_info["name"], + ), + ), + external_id=str(user_info["id"]), + external_method="gitlab", + ) + + @property + def remote_config(self): + """Custom OAuth client config for GitLab.""" + return dict( + title=self.name, + description=self.description, + icon="fa fa-{}".format(self.icon), + authorized_handler="invenio_oauthclient.handlers:authorized_signup_handler", + disconnect_handler=self.oauth_handlers.disconnect_handler, + signup_handler=dict( + info=self._account_info_handler, + info_serializer=self._account_info_serializer, + setup=self.oauth_handlers.account_setup_handler, + view="invenio_oauthclient.handlers:signup_handler", + ), + params=dict( + base_url="{}/api/v4/".format(self.base_url), + request_token_url=None, + access_token_url="{}/oauth/token".format(self.base_url), + access_token_method="POST", + authorize_url="{}/oauth/authorize".format(self.base_url), + app_key=self.credentials_key, + ), + ) + + @property + def config(self): + """Returns the GitLab-specific config dict.""" + return self._config + + def url_for_repository(self, repository_name: str) -> str: + """URL for viewing a repository.""" + return "{}/{}".format(self.base_url, repository_name) + + def url_for_release( + self, repository_name: str, release_id: str, release_tag: str + ) -> str: + """URL for viewing a release.""" + return "{}/{}/-/releases/{}".format(self.base_url, repository_name, release_tag) + + def url_for_tag(self, repository_name, tag_name) -> str: + """The URL for viewing a tag.""" + 
return "{}/{}/-/tags/{}".format(self.base_url, repository_name, tag_name) + + def url_for_new_file(self, repository_name, branch_name, file_name) -> str: + """The URL for creating a new file in the web editor.""" + return "{}/{}/-/new/{}/?file_name={}".format( + self.base_url, repository_name, branch_name, file_name + ) + + def url_for_new_release(self, repository_name) -> str: + """The URL for creating a new release.""" + return "{}/{}/-/releases/new".format(self.base_url, repository_name) + + def url_for_new_repo(self) -> str: + """The URL for creating a new repository.""" + return "{}/projects/new".format(self.base_url) + + def webhook_is_create_release_event(self, event_payload: dict[str, Any]): + """Identify if the webhook payload is one we want to use.""" + # https://archives.docs.gitlab.com/17.11/user/project/integrations/webhook_events/#release-events + + # GitLab does not have unpublished/draft releases the way GitHub does. However, it does have + # "upcoming releases" (https://archives.docs.gitlab.com/17.11/api/releases/#upcoming-releases) + # meaning ones with a release date in the future. + # TODO: do we want to return False for upcoming releases? + + object_kind = event_payload.get("object_kind") + action = event_payload.get("action") + + # existing `invenio-gitlab` instead uses the `tag_push` event which is more general than the `release` + # event (https://codebase.helmholtz.cloud/rodare/invenio-gitlab/-/blob/d66181697b8a34383b333306b559d13cd6fa829a/invenio_gitlab/receivers.py#L41). + # TODO: I recommend using the `release` event as this is a more 'formal' manual action and better corresponds to the release event in GitHub. Is this okay? 
+ return object_kind == "release" and action == "create" + + def webhook_event_to_generic( + self, event_payload: dict[str, Any] + ) -> tuple[GenericRelease, GenericRepository]: + """Convert a webhook event.""" + # https://archives.docs.gitlab.com/18.0/user/project/integrations/webhook_events/#release-events + # https://archives.docs.gitlab.com/17.11/user/project/integrations/webhook_events/#release-events + # https://archives.docs.gitlab.com/16.11/ee/user/project/integrations/webhook_events.html#release-events + + zipball_url: str | None = None + tarball_url: str | None = None + + for source in event_payload["assets"]["sources"]: + format = source["format"] + url = source["url"] + if format == "zip": + zipball_url = url + elif format == "tar": + tarball_url = url + + release = GenericRelease( + id=str(event_payload["id"]), + tag_name=event_payload["tag"], + name=event_payload["name"], + body=event_payload["description"], + zipball_url=zipball_url, + tarball_url=tarball_url, + created_at=dateutil.parser.parse(event_payload["created_at"]), + published_at=dateutil.parser.parse(event_payload["released_at"]), + ) + + repo = GitLabProviderFactory._proj_to_generic(event_payload["project"]) + return (release, repo) + + @staticmethod + def _extract_license(proj_attrs: dict[str, Any]): + """Extract the SPDX ID from the license of a dict-ified project.""" + license_obj = proj_attrs.get("license") + if license_obj is not None: + return license_obj["key"].upper() + return None + + @staticmethod + def _proj_to_generic(proj_attrs: dict[str, Any]): + """Convert a dict-ified project to a GenericRepository.""" + return GenericRepository( + id=str(proj_attrs["id"]), + full_name=proj_attrs["path_with_namespace"], + default_branch=proj_attrs["default_branch"], + description=proj_attrs["description"], + license_spdx=GitLabProviderFactory._extract_license(proj_attrs), + ) + + +class GitLabProvider(RepositoryServiceProvider): + """Contrib user-specific implementation for GitLab.""" + + 
@cached_property + def _gl(self): + """Construct the GitLab API client and make a test auth request (which populates essential data).""" + if self.remote_token is None: + raise VCSTokenNotFound + gl = gitlab.Gitlab( + self.factory.base_url, oauth_token=self.remote_token.access_token + ) + gl.auth() + return gl + + @_gl_response_error_handler + def list_repositories(self) -> dict[str, GenericRepository] | None: + """List all projects.""" + repos: dict[str, GenericRepository] = {} + for project in self._gl.projects.list( + iterator=True, + simple=False, + min_access_level=gitlab.const.MAINTAINER_ACCESS, + ): + repos[str(project.id)] = GenericRepository( + id=str(project.id), + full_name=project.path_with_namespace, + default_branch=project.default_branch, + description=project.description, + # TODO: license is not returned in the projects list (only when querying an individual project). + # This would be super slow. Do we really need license here? + license_spdx=None, + ) + return repos + + @_gl_response_error_handler + def get_repository(self, repository_id: str) -> GenericRepository | None: + """Get a single project.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id)) + return GitLabProviderFactory._proj_to_generic(proj.asdict()) + + @_gl_response_error_handler + def list_repository_contributors( + self, repository_id: str, max: int + ) -> list[GenericContributor] | None: + """Get and enrich the contributor list as much as possible with the limited data returned by the API.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id), lazy=True) + + contribs: list[GenericContributor] = [] + for index, contrib in enumerate( + proj.repository_contributors(iterator=True, order_by="commits", sort="desc") + ): + email = contrib["email"] + contrib_count = contrib["commits"] + + # repository_contributors returns a very small amount of data (not even the username) + # See here 
https://archives.docs.gitlab.com/17.11/api/repositories/#contributors + # So we try to enrich the data by searching for the user with the matching email. + # We will fail to find it if a) the user doesn't exist (e.g. repos imported/forked from somewhere else) + # or b) if the user has not made their email address public. + # By default, email addresses on GitLab are private, so this is unlikely to succeed. + matching_users = self._gl.users.list(search=email) + if len(matching_users) == 0: + contribs.append( + GenericContributor( + id=email, + username=email, + display_name=contrib["name"], + contributions_count=contrib_count, + ) + ) + else: + matching_user = matching_users[0] + contribs.append( + GenericContributor( + id=str(matching_user.id), + username=matching_user.username, + display_name=matching_user.name, + contributions_count=contrib_count, + ) + ) + + if index + 1 == max: + break + + return contribs + + @_gl_response_error_handler + def get_repository_owner(self, repository_id: str): + """Get the owner of the project.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id)) + return GenericOwner( + id=str(proj.namespace["id"]), + path_name=proj.namespace["path"], + display_name=proj.namespace["name"], + type=( + GenericOwnerType.Person + if proj.namespace["kind"] == "user" + else GenericOwnerType.Organization + ), + ) + + @_gl_response_error_handler + def list_repository_webhooks( + self, repository_id: str + ) -> list[GenericWebhook] | None: + """Convert the repository's webhooks to a generic list.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id), lazy=True) + hooks: list[GenericWebhook] = [] + for hook in proj.hooks.list(iterator=True): + hooks.append( + GenericWebhook( + id=str(hook.id), + repository_id=str(hook.project_id), + url=hook.url, + ) + ) + return hooks + + def list_repository_user_ids(self, repository_id: str) -> list[str] | None: + """See 
https://docs.gitlab.com/api/members/#list-all-members-of-a-group-or-project-including-inherited-and-invited-members.""" + user_ids: list[str] = [] + for member in self._gl.projects.get( + int(repository_id), lazy=True + ).members_all.list(iterator=True): + if member.access_level >= gitlab.const.MAINTAINER_ACCESS: + user_ids.append(str(member.id)) + return user_ids + + @_gl_response_error_handler + def create_webhook(self, repository_id: str) -> str | None: + """Create a webhook with a metadata description to avoid confusion.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id), lazy=True) + + hook_data = { + "url": self.webhook_url, + "token": self.factory.config.get("shared_validation_token"), + "releases_events": True, + "description": "Managed by {}".format( + current_app.config.get("THEME_SITENAME", "Invenio") + ), + } + + resp = proj.hooks.create(hook_data) + return str(resp.id) + + @_gl_response_error_handler + def delete_webhook(self, repository_id: str, hook_id=None) -> bool: + """Delete the hook from the project if it exists.""" + assert repository_id.isdigit() + if hook_id is not None: + assert hook_id.isdigit() + + proj = self._gl.projects.get(int(repository_id), lazy=True) + if hook_id is None: + first_valid = self.get_first_valid_webhook(repository_id) + if first_valid is None: + return True + + proj.hooks.delete(int(first_valid.id)) + else: + proj.hooks.delete(int(hook_id)) + + return True + + @_gl_response_error_handler + def get_own_user(self) -> GenericUser | None: + """Return the currently signed in user.""" + user = self._gl.user + if user is None: + return None + return GenericUser( + id=str(user.id), + username=user.username, + display_name=user.name, + ) + + def resolve_release_zipball_url(self, release_zipball_url: str) -> str | None: + """No further resolution needs to be done for GitLab, so this is a no-op.""" + return release_zipball_url + + @_gl_response_error_handler + def fetch_release_zipball(self, 
release_zipball_url: str, timeout: int): + """Make a raw request with the API token to download the file.""" + resp = self._gl.http_get( + release_zipball_url, raw=True, streamed=True, timeout=timeout + ) + assert isinstance(resp, requests.Response) + with resp: + yield resp.raw + + @_gl_response_error_handler + def retrieve_remote_file(self, repository_id: str, ref_name: str, file_name: str): + """Download and decode the given file using the API.""" + assert repository_id.isdigit() + proj = self._gl.projects.get(int(repository_id), lazy=True) + try: + file = proj.files.get(file_path=file_name, ref=ref_name) + return file.decode() + except gitlab.GitlabGetError: + return None + + def revoke_token(self, access_token: str): + """TODO: GitLab implements RFC7009 for OAuth Token Revocation. We might need to do this via OAuth instead of the GitLab API.""" + pass diff --git a/invenio_vcs/generic_models.py b/invenio_vcs/generic_models.py new file mode 100644 index 00000000..39ab79b1 --- /dev/null +++ b/invenio_vcs/generic_models.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# This file is part of Invenio. +# Copyright (C) 2025 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. +""" +Generic dataclass models to represent the bare minimum necessary data from VCS providers. + +These are essentially the "lowest common factor" of +the otherwise large, complex, and heterogenous responses returned by APIs. + +These are used by higher-level calls to have a common set of data to +operate on. Provider implementations are responsible for converting API +responses into these generic classes. 
+""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass +from datetime import datetime +from enum import Enum + +from invenio_vcs.models import Repository + + +@dataclass +class GenericWebhook: + """Generic webhook representation.""" + + id: str + repository_id: str + url: str + + +@dataclass +class GenericRepository: + """Generic repository representation.""" + + id: str + full_name: str + default_branch: str + description: str | None = None + license_spdx: str | None = None + + @staticmethod + def from_model(model: Repository): + """Create a GenericRepository from a Repository model.""" + return GenericRepository( + id=model.provider_id, + full_name=model.full_name, + default_branch=model.default_branch, + description=model.description, + license_spdx=model.license_spdx, + ) + + def to_model(self, model: Repository): + """Update a Repository model with this generic repository's data.""" + changed = False + for key, value in asdict(self).items(): + if key in ["id"]: + continue + + db_value = getattr(model, key) + if db_value != value: + changed = True + setattr(model, key, value) + + return changed + + +@dataclass +class GenericRelease: + """Generic release representation.""" + + id: str + tag_name: str + created_at: datetime + name: str | None = None + body: str | None = None + tarball_url: str | None = None + zipball_url: str | None = None + published_at: datetime | None = None + """Releases may be published at a different time than when they're created. + + For example, the publication to a package repository (e.g. NPM) may have taken place + a few minutes before the maintainers published the release on the VCS. The date may + even be in the future if a release is pre-scheduled (quite common on GitLab). 
+ """ + + +@dataclass +class GenericUser: + """Generic user representation.""" + + id: str + username: str + display_name: str | None = None + + +class GenericOwnerType(Enum): + """Types of repository owners.""" + + Person = 1 + Organization = 2 + + +@dataclass +class GenericOwner: + """Generic repository owner representation.""" + + id: str + path_name: str + type: GenericOwnerType + display_name: str | None = None + + +@dataclass +class GenericContributor: + """Generic contributor representation.""" + + id: str + username: str + company: str | None = None + contributions_count: int | None = None + display_name: str | None = None diff --git a/invenio_vcs/providers.py b/invenio_vcs/providers.py new file mode 100644 index 00000000..f318d3ae --- /dev/null +++ b/invenio_vcs/providers.py @@ -0,0 +1,403 @@ +# -*- coding: utf-8 -*- +# This file is part of Invenio. +# Copyright (C) 2025 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Abstract classes to be implemented for each provider.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Generator +from urllib.parse import urlparse + +from invenio_i18n import gettext as _ +from invenio_oauth2server.models import Token as ProviderToken +from invenio_oauthclient import current_oauthclient +from invenio_oauthclient.models import RemoteAccount, RemoteToken +from urllib3 import HTTPResponse +from werkzeug.local import LocalProxy +from werkzeug.utils import cached_property + +from invenio_vcs.errors import RemoteAccountDataNotSet +from invenio_vcs.generic_models import ( + GenericContributor, + GenericOwner, + GenericRelease, + GenericRepository, + GenericUser, + GenericWebhook, +) +from invenio_vcs.oauth.handlers import OAuthHandlers + + +class RepositoryServiceProviderFactory(ABC): + """ + A factory to create user-specific VCS providers. 
+ + This class is instantiated once per instance, + usually in the `invenio.cfg` file. It contains general settings and methods that are impossible + to generalise and must be specified on a provider-specific level. + + All methods within this class (except the constructor) should be pure functions. + """ + + def __init__( + self, + provider: type["RepositoryServiceProvider"], + base_url: str, + webhook_receiver_url: str, + id: str, + name: str, + description: str, + icon: str, + credentials_key: str, + repository_name: str, + repository_name_plural: str, + ): + """Initialize the repository service provider factory.""" + self.provider = provider + self.base_url = base_url + self.webhook_receiver_url = webhook_receiver_url + self.id = id + self.name = name + self.description = description + self.icon = icon + self.credentials_key = credentials_key + self.repository_name = repository_name + self.repository_name_plural = repository_name_plural + + @property + @abstractmethod + def remote_config(self) -> dict[str, Any]: + """ + Returns a dictionary as the config of the OAuth remote app for this provider. + + The config of the app is usually based on the config variables provided + in the constructor. 
+ """ + raise NotImplementedError + + @property + def oauth_handlers(self): + """OAuth client handlers (for invenio-oauthclient) specific to the provider.""" + return OAuthHandlers(self) + + @cached_property + def remote(self): + """The corresponding remote OAuth client app.""" + return LocalProxy(lambda: current_oauthclient.oauth.remote_apps[self.id]) + + @property + @abstractmethod + def config(self) -> dict: + """Returns a configuration dictionary with options that are specific to a given provider.""" + raise NotImplementedError + + @abstractmethod + def url_for_repository(self, repository_name: str) -> str: + """Generates the URL for the UI homepage of a repository.""" + raise NotImplementedError + + @abstractmethod + def url_for_release( + self, repository_name: str, release_id: str, release_tag: str + ) -> str: + """Generates the URL for the UI page of the details of a release.""" + raise NotImplementedError + + @abstractmethod + def url_for_tag(self, repository_name: str, tag_name: str) -> str: + """ + Generates the URL for the UI page showing the file tree for the latest commit with a given named tag. + + If the VCS does not implement a separate page for the release details and its tree, then `url_for_release` may + return the same value as `url_for_tag`. + """ + raise NotImplementedError + + @abstractmethod + def url_for_new_release(self, repository_name: str) -> str: + """Generates the URL for the UI page through which the user can create a new release for a specific repository.""" + raise NotImplementedError + + @abstractmethod + def url_for_new_file( + self, repository_name: str, branch_name: str, file_name: str + ) -> str: + """ + Generates the URL for the UI pages through which a new file with a specific name on a specific branch in a specific repository can be created. + + Usually, this allows the user to type the file contents directly or upload an existing file. 
+ """ + raise NotImplementedError + + @abstractmethod + def url_for_new_repo(self) -> str: + """Generates the URL for the UI page through which a new repository can be created.""" + raise NotImplementedError + + @abstractmethod + def webhook_is_create_release_event(self, event_payload: dict[str, Any]): + """ + Returns whether the raw JSON payload of a webhook event is an event corresponding to the publication of a release. + + Returning False will end further processing of the event. + """ + raise NotImplementedError + + @abstractmethod + def webhook_event_to_generic( + self, event_payload: dict[str, Any] + ) -> tuple[GenericRelease, GenericRepository]: + """Returns the data of the release and repository as extracted from the raw JSON payload of a webhook event, in generic form.""" + raise NotImplementedError + + def for_user(self, user_id: int): + """Creates a provider for a specific user, taking the access token from the DB.""" + return self.provider(self, user_id) + + def for_access_token(self, user_id: int, access_token: str): + """Creates a provider for a specific user, taking the access token directly as an argument.""" + return self.provider(self, user_id, access_token=access_token) + + @property + def vocabulary(self): + """UI terminology (and icon) for the provider.""" + return { + "id": self.id, + "name": self.name, + "repository_name": self.repository_name, + "repository_name_plural": self.repository_name_plural, + "icon": self.icon, + } + + +class RepositoryServiceProvider(ABC): + """ + The methods to interact with the API of a VCS provider. + + This class is user-specific and is always created from a `RepositoryServiceProviderFactory`. + + While some of the default method implementations (such as `access_token`) access + the DB, overrides of the unimplemented methods should avoid doing so to minimise + unexpected behaviour. Interaction should be solely with the API of the VCS provider. 
+ + Providers must currently support all of these operations. + """ + + def __init__( + self, factory: RepositoryServiceProviderFactory, user_id: int, access_token=None + ) -> None: + """ + Internal method for constructing the provider. + + It's recommended to use `for_user` in the factory instead. + """ + self.factory = factory + self.user_id = user_id + self._access_token = access_token + + @cached_property + def remote_account(self): + """Returns the OAuth Remote Account corresponding to the user's authentication with the provider.""" + return RemoteAccount.get(self.user_id, self.factory.remote.consumer_key) + + @cached_property + def remote_token(self): + """Return OAuth remote token model.""" + if self._access_token is not None: + return self._access_token + + return RemoteToken.get(self.user_id, self.factory.remote.consumer_key) + + @cached_property + def webhook_url(self): + """ + Returns a formatted version of the webhook receiver URL specified in the provider factory. + + The `{token}` variable in this URL string is replaced with the user-specific + webhook token. + """ + if not self.remote_account.extra_data.get("tokens", {}).get("webhook"): + raise RemoteAccountDataNotSet( + self.user_id, _("Webhook data not found for user tokens (remote data).") + ) + + webhook_token = ProviderToken.query.filter_by( + id=self.remote_account.extra_data["tokens"]["webhook"] + ).first() + if webhook_token: + return self.factory.webhook_receiver_url.format( + token=webhook_token.access_token + ) + + def is_valid_webhook(self, url: str | None): + """Check if webhook url is valid. + + The webhook url is valid if it has the same host as the configured webhook url. + + :param str url: The webhook url to be checked. + :returns: True if the webhook url is valid, False otherwise. 
+ """ + if not url: + return False + configured_host = urlparse(self.webhook_url).netloc + url_host = urlparse(url).netloc + if not (configured_host and url_host): + return False + return configured_host == url_host + + @abstractmethod + def list_repositories(self) -> dict[str, GenericRepository] | None: + """ + Returns a dictionary of {repository_id: GenericRepository} for the current user. + + This should return _all_ repositories for which the user has permission + to create and delete webhooks. + + This means this function could return extremely large dictionaries in some cases, + but it will only be called during irregular sync events and stored in the DB. + """ + raise NotImplementedError + + @abstractmethod + def list_repository_webhooks( + self, repository_id: str + ) -> list[GenericWebhook] | None: + """ + Returns an arbitrarily ordered list of the current webhooks of a repository. + + This list should only include active webhooks which generate events for which + the corresponding `RepositoryServiceProviderFactory.webhook_is_create_release_event` + would return True. + """ + raise NotImplementedError + + def get_first_valid_webhook(self, repository_id: str) -> GenericWebhook | None: + """Get the first webhook for which `is_valid_webhook` is true.""" + webhooks = self.list_repository_webhooks(repository_id) + if webhooks is None: + return None + for hook in webhooks: + if self.is_valid_webhook(hook.url): + return hook + return None + + @abstractmethod + def get_repository(self, repository_id: str) -> GenericRepository | None: + """Returns the details of a specific repository by ID, or None if the repository does not exist or the user has no permission to view it.""" + raise NotImplementedError + + @abstractmethod + def list_repository_contributors( + self, repository_id: str, max: int + ) -> list[GenericContributor] | None: + """ + Returns the list of entities that have contributed to a given repository. 
+ + This list may contain entities that are not currently or have never been + registered users of the VCS provider (e.g. in the case of repos imported + from a remote source). The order of the list is arbitrary, and it may include + non-human contributors (e.g. automated tools or organisations). + + Returns None if the repository does not exist or the user has no permission + to view it or its contributors. + """ + raise NotImplementedError + + @abstractmethod + def list_repository_user_ids(self, repository_id: str) -> list[str] | None: + """ + Returns a list of the IDs of valid users registered with the VCS provider that have sufficient permission to create/delete webhooks on the given repository. + + This list should contain all users for which the corresponding + repo would be included in a `list_repositories` call. + + Returns None if the repository does not exist or the user has no permission + to view it or its member users. + """ + raise NotImplementedError + + @abstractmethod + def get_repository_owner(self, repository_id: str) -> GenericOwner | None: + """ + Returns the 'owner' of a repository, which is either a user or a group/organization. + + Returns None if the repository does not exist or the user does not have permission + to find out its owner. + """ + raise NotImplementedError + + @abstractmethod + def create_webhook(self, repository_id: str) -> str | None: + """ + Creates a new webhook for a given repository, triggered by a "create release" event. + + The URL destination is specified by `RepositoryServiceProvider.webhook_url`. + Events must be delivered via an HTTP POST request with a JSON payload. + + Returns the ID of the new webhook as returned by the provider, or None if the + creation failed due to the repository not existing or the user not having permission + to create a webhook. 
+ """ + raise NotImplementedError + + @abstractmethod + def delete_webhook(self, repository_id: str, hook_id: str | None = None) -> bool: + """ + Deletes a webhook from the specified repository. + + If `hook_id` is specified, the webhook with that ID must be deleted. + Otherwise, all webhooks with URLs for which `is_valid_webhook` would return + True should be deleted. + + Returns True if the deletion was successful, and False if it failed due to + the repository not existing or the user not having permission to delete its + webhooks. + """ + raise NotImplementedError + + @abstractmethod + def get_own_user(self) -> GenericUser | None: + """ + Returns information about the user for which this class has been instantiated, or None if the user does not exist. + + For example, if the user ID is incorrectly specified. + """ + raise NotImplementedError + + @abstractmethod + def resolve_release_zipball_url(self, release_zipball_url: str) -> str | None: + """TODO: why do we have this.""" + raise NotImplementedError + + @abstractmethod + def fetch_release_zipball( + self, release_zipball_url: str, timeout: int + ) -> Generator[HTTPResponse]: + """ + Returns the HTTP response for downloading the contents of a zipball from a given release. + + This is provider-specific functionality as it will require attaching an auth token + to the request for private repos (and even public repos to avoid rate limits sometimes). + """ + raise NotImplementedError + + @abstractmethod + def retrieve_remote_file( + self, repository_id: str, ref_name: str, file_name: str + ) -> bytes | None: + """ + Downloads the contents of a specific file in a repo for a given ref (which could be a tag, a commit ref, a branch name, etc). + + Returns the raw bytes, or None if the repo/file does not exist or the user doesn't have permission to view it. 
+ """ + raise NotImplementedError + + @abstractmethod + def revoke_token(self, access_token: str): + """Revoke the validity of a specific access token permanently.""" + raise NotImplementedError