diff --git a/.lifemonitor.yml b/.lifemonitor.yml deleted file mode 100644 index 4b1d6c4d8..000000000 --- a/.lifemonitor.yml +++ /dev/null @@ -1,44 +0,0 @@ -# worfklow name (override name defined on the RO-Crate metadata) -# name: MyWorkflow -# worfklow visibility -public: False - -# Issue Checker Settings -issues: - # Enable/Disable issue checker - # The list of issue types can be found @ /workflows/issues - # (e.g., https://api.lifemonitor.eu/workflows/issues) - check: true - # csv of issues to check (all included by default) - # include: [missing_config_file, not_initialised_repository_issue, missing_workflow_file, missing_metadata_file, missing_ro_crate_workflow_file, outdated_metadata_file, missing_workflow_name] - # csv of issues to ignore (none ignored by default) - # exclude: [missing_config_file, not_initialised_repository_issue, missing_workflow_file, missing_metadata_file, missing_ro_crate_workflow_file, outdated_metadata_file, missing_workflow_name] - - -# Github Integration Settings -push: - branches: - # Define the list of branches to watch - # - name: feature/XXX # wildcards can be used to specify branches (e.g., feature/*) - # update_registries: ["wfhubdev"] # available registries are listed - # # by the endpoint `/registries` - # # (e.g., https://api.lifemontor.eu/registries) - - name: "main" - update_registries: [] - enable_notifications: true - # - name: "develop" - # update_registries: [] - # enable_notifications: true - - tags: - # Define the list of tags to watch - # - name: v*.*.* # wildcards can be used to specify tags (e.g., feature/*) - # update_registries: ["wfhub"] # available registries are listed - # # by the endpoint `/registries` - # # (e.g., https://api.lifemontor.eu/registries) - - name: "v*.*.*" - update_registries: [wfhubdev, wfhubprod] - enable_notifications: true - - name: "*.*.*" - update_registries: [wfhubdev, wfhubprod] - enable_notifications: true \ No newline at end of file diff --git a/lifemonitor/api/models/repositories/__init__.py b/lifemonitor/api/models/repositories/__init__.py index 625f0b92f..199bc9903 100644 --- a/lifemonitor/api/models/repositories/__init__.py +++ b/lifemonitor/api/models/repositories/__init__.py @@ -28,13 +28,13 @@ from .github import (GithubWorkflowRepository, InstallationGithubWorkflowRepository, RepoCloneContextManager) -from .local import (LocalWorkflowRepository, - LocalGitWorkflowRepository, - ZippedWorkflowRepository) +from .local import (LocalGitWorkflowRepository, LocalWorkflowRepository, + Base64WorkflowRepository, ZippedWorkflowRepository) __all__ = [ "RepositoryFile", "WorkflowRepositoryConfig", "WorkflowFile", "TemplateRepositoryFile", "WorkflowRepository", "WorkflowRepositoryMetadata", "IssueCheckResult", - "LocalWorkflowRepository", "LocalGitWorkflowRepository", "ZippedWorkflowRepository", + "LocalWorkflowRepository", "LocalGitWorkflowRepository", + "Base64WorkflowRepository", "ZippedWorkflowRepository", "InstallationGithubWorkflowRepository", "GithubWorkflowRepository", "RepoCloneContextManager" ] diff --git a/lifemonitor/api/models/repositories/base.py b/lifemonitor/api/models/repositories/base.py index b8768d6c3..29648ccfe 100644 --- a/lifemonitor/api/models/repositories/base.py +++ b/lifemonitor/api/models/repositories/base.py @@ -29,16 +29,13 @@ from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Type, Union -import git -import giturlparse -import requests from rocrate.rocrate import Metadata, ROCrate import lifemonitor.api.models.issues as issues from lifemonitor.api.models.repositories.config import WorkflowRepositoryConfig -from lifemonitor.exceptions import IllegalStateException, LifeMonitorException +from lifemonitor.exceptions import IllegalStateException from lifemonitor.test_metadata import get_roc_suites, get_workflow_authors -from lifemonitor.utils import to_camel_case +from lifemonitor.utils import get_current_username, to_camel_case from .files import RepositoryFile, WorkflowFile @@ -51,18 +48,23 @@ class WorkflowRepository(): def __init__(self, local_path: str, - url: Optional[str] = None, + remote_url: Optional[str] = None, + owner: Optional[str] = None, name: Optional[str] = None, license: Optional[str] = None, - exclude: Optional[List[str]] = None) -> None: + exclude: Optional[List[str]] = None, + owner_as_system_user: bool = False) -> None: if not local_path: raise ValueError("empty local_path argument") self._local_path = local_path self._metadata = None self.exclude = exclude if exclude is not None else DEFAULT_IGNORED_FILES self._config = None - self._url = url + self._remote_url = remote_url self._name = name + self._owner = owner + if not owner and owner_as_system_user: + self._owner = get_current_username() self._license = license @property @@ -82,65 +84,44 @@ def metadata(self) -> Optional[WorkflowRepositoryMetadata]: return None return self._metadata - @property - def _remote_parser(self) -> giturlparse.GitUrlParsed: - try: - r = git.Repo(self.local_path) - remote = next((x for x in r.remotes if x.name == 'origin'), r.remotes[0]) - assert remote, "Unable to find a Git remote" - return giturlparse.parse(remote.url) - except Exception as e: - if logger.isEnabledFor(logging.DEBUG): - logger.exception(e) - raise LifeMonitorException(f"Not valid workflow repository: {e}") - @property def name(self) -> str: - if not self._name: - try: - self._name = self._remote_parser.name - except Exception as e: - if logger.isEnabledFor(logging.DEBUG): - logger.exception(e) - raise LifeMonitorException(f"Not valid workflow repository: {e}") - assert self._name, "Unable to detect repository name" return self._name + @name.setter + def name(self, value): + self._name = value + + @property + def owner(self) -> str: + return self._owner + + @owner.setter + def owner(self, value): + self._owner = value + @property def full_name(self) -> str: - try: - parser = self._remote_parser - return f"{parser.owner}/{parser.name}" - except Exception as e: - if logger.isEnabledFor(logging.DEBUG): - logger.exception(e) - raise LifeMonitorException(f"Not valid workflow repository: {e}") + if self.owner: + return f"{self.owner}/{self.name}" + return self.name @property - def https_url(self) -> str: - if not self._url: - try: - self._url = self._remote_parser.url2https.removesuffix('.git') - except Exception as e: - if logger.isEnabledFor(logging.DEBUG): - logger.exception(e) - raise LifeMonitorException(f"Not valid workflow repository: {e}") - assert self._url, "Unable to detect repository url" - return self._url + def remote_url(self) -> str: + return self._remote_url + + @remote_url.setter + def remote_url(self, value): + self._remote_url = value @property def license(self) -> Optional[str]: - if not self._license: - try: - parser = self._remote_parser - if parser.host == 'github.com': - l_info = requests.get(f"https://api.github.com/repos/{self.full_name}/license") - self._license = l_info.json()['license']['spdx_id'] - except Exception as e: - if logger.isEnabledFor(logging.DEBUG): - logger.error(e) return self._license + @license.setter + def license(self, value): + self._license = value + @abstractclassmethod def find_file_by_pattern(self, search: str, path: str = '.') -> RepositoryFile: raise NotImplementedError() diff --git a/lifemonitor/api/models/repositories/github.py b/lifemonitor/api/models/repositories/github.py index 838476d05..71ef28cbc 100644 --- a/lifemonitor/api/models/repositories/github.py +++ b/lifemonitor/api/models/repositories/github.py @@ -30,6 +30,11 @@ from typing import Any, Dict, List, Optional, Union import giturlparse +import requests +from github.ContentFile import ContentFile +from github.Repository import Repository as GithubRepository +from github.Requester import Requester + from lifemonitor.api.models.repositories.base import ( WorkflowRepository, WorkflowRepositoryMetadata) from lifemonitor.api.models.repositories.config import WorkflowRepositoryConfig @@ -41,11 +46,7 @@ from lifemonitor.utils import (checkout_ref, clone_repo, get_current_ref, get_git_repo_revision) -from github.ContentFile import ContentFile -from github.Repository import Repository as GithubRepository -from github.Requester import Requester - -from .local import ZippedWorkflowRepository +from .local import LocalGitWorkflowRepository, ZippedWorkflowRepository DEFAULT_BASE_URL = "https://api.github.com" DEFAULT_TIMEOUT = 15 @@ -163,7 +164,7 @@ def __init__(self, requester: Requester, headers: Dict[str, Union[str, int]], attributes: Dict[str, Any], completed: bool, ref: Optional[str] = None, rev: Optional[str] = None, - name: Optional[str] = None, license: Optional[str] = None, + exclude: Optional[List[str]] = None, local_path: Optional[str] = None, auto_cleanup: bool = True) -> None: super().__init__(requester, headers, attributes, completed) self._ref = ref @@ -173,8 +174,14 @@ def __init__(self, requester: Requester, self._local_repo: Optional[LocalWorkflowRepository] = None self._local_path = local_path self._config = None - self._name = name - self._license = license + self._license = None + self._exclude = exclude or [] + # Check if the local path is a git repo: + # if so, we do not need to clone it again and we can disable the auto-cleanup + if local_path and ( + not os.path.exists(local_path) or not LocalWorkflowRepository.is_git_repo(local_path)): + logger.warning("Local path %r already exists and it is a git repository. Thus, auto-cleanup is disabled.", local_path) + self.auto_cleanup = False def __repr__(self) -> str: return f"{self.__class__.__name__} bound to {self.url} (ref: {self.ref}, rev: {self.rev})" @@ -194,13 +201,33 @@ def checkout_ref(self, ref: str, token: Optional[str] = None, branch_name: Optio return checkout_ref(self.local_path, ref, auth_token=token, branch_name=branch_name) @property - def https_url(self) -> str: + def remote_url(self) -> str: return self.html_url + @property + def owner(self) -> str: + onwer = super().owner + return onwer.login if onwer else None + + @property + def license(self) -> Optional[str]: + if not self._license: + try: + l_info = requests.get(f"https://api.github.com/repos/{self.full_name}/license") + self._license = l_info.json()['license']['spdx_id'] + except Exception as e: + if logger.isEnabledFor(logging.DEBUG): + logger.error(e) + return self._license + + @property + def exclude(self) -> List[str]: + return self._exclude + @property def _remote_parser(self) -> giturlparse.GitUrlParsed: try: - return giturlparse.parse(self.https_url) + return giturlparse.parse(self.remote_url) except Exception as e: if logger.isEnabledFor(logging.DEBUG): logger.exception(e) @@ -333,12 +360,15 @@ def write_zip(self, target_path: str): return self.local_repo.write_zip(target_path=target_path) @property - def local_repo(self) -> LocalWorkflowRepository: + def local_repo(self) -> LocalGitWorkflowRepository: if not self._local_repo: local_path = self._local_path or tempfile.mkdtemp(dir=BaseConfig.BASE_TEMP_FOLDER) - logger.debug("Cloning %r", self) - clone_repo(self.clone_url, ref=self.ref, target_path=local_path) - self._local_repo = LocalWorkflowRepository(local_path=local_path) + if not os.path.exists(local_path) or not LocalWorkflowRepository.is_git_repo(local_path): + logger.debug("Cloning %r", self.clone_url) + clone_repo(self.clone_url, ref=self.ref, target_path=local_path) + else: + logger.debug("Skipping cloning of %r", self.clone_url) + self._local_repo = LocalGitWorkflowRepository(local_path=local_path) return self._local_repo @property @@ -360,13 +390,17 @@ def cleanup(self) -> None: class GithubWorkflowRepository(InstallationGithubWorkflowRepository): def __init__(self, full_name_or_id: str, token: Optional[str] = None, - ref: Optional[str] = None, rev: Optional[str] = None, local_path: Optional[str] = None, auto_cleanup: bool = True) -> None: + exclude: Optional[List[str]] = None, + ref: Optional[str] = None, rev: Optional[str] = None, + local_path: Optional[str] = None, auto_cleanup: bool = True, + ) -> None: assert isinstance(full_name_or_id, (str, int)), full_name_or_id url_base = "/repositories/" if isinstance(full_name_or_id, int) else "/repos/" url = f"{url_base}{full_name_or_id}" super().__init__( __make_requester__(token=token), headers={}, attributes={'url': url}, completed=False, - ref=ref, rev=rev, local_path=local_path, auto_cleanup=auto_cleanup) + ref=ref, rev=rev, exclude=exclude, + local_path=local_path, auto_cleanup=auto_cleanup) @classmethod def from_url(cls, url: str, token: Optional[str] = None, ref: Optional[str] = None, diff --git a/lifemonitor/api/models/repositories/local.py b/lifemonitor/api/models/repositories/local.py index 1a1a65e93..e7595de26 100644 --- a/lifemonitor/api/models/repositories/local.py +++ b/lifemonitor/api/models/repositories/local.py @@ -32,6 +32,8 @@ from pathlib import Path from typing import List, Optional +import git + from lifemonitor.api.models.repositories.base import ( WorkflowRepository, WorkflowRepositoryMetadata) from lifemonitor.api.models.repositories.files import (RepositoryFile, @@ -41,7 +43,7 @@ IllegalStateException, LifeMonitorException, NotValidROCrateException) -from lifemonitor.utils import extract_zip, walk +from lifemonitor.utils import RemoteGitRepoInfo, extract_zip, walk # set module level logger logger = logging.getLogger(__name__) @@ -51,9 +53,25 @@ class LocalWorkflowRepository(WorkflowRepository): def __init__(self, local_path: str, + remote_url: Optional[str] = None, + owner: Optional[str] = None, + name: Optional[str] = None, + license: Optional[str] = None, exclude: Optional[List[str]] = None) -> None: - super().__init__(local_path, exclude=exclude) + super().__init__(local_path=local_path, + remote_url=remote_url, + owner=owner, + name=name, + license=license, + exclude=exclude) self._transient_files = {'add': {}, 'remove': {}} + # check if the local path is defined + if not local_path: + raise ValueError("Local path not set") + # check if the local path is a git repository + # and if so, raise a warning + if self.is_git_repo(self.local_path): + logger.warning("The repository is a git repository. You should use the LocalGitWorkflowRepository instead") @staticmethod def is_git_repo(local_path: str) -> bool: @@ -124,7 +142,7 @@ def generate_metadata(self, workflow_version=workflow_version, local_repo_path=self.local_path, license=license or self.license, - repo_url=repo_url or self.https_url, **kwargs) + repo_url=repo_url or self.remote_url, **kwargs) self._metadata = WorkflowRepositoryMetadata(self, init=False, exclude=self.exclude, local_path=self.local_path) except Exception as e: @@ -168,10 +186,20 @@ class TemporaryLocalWorkflowRepository(LocalWorkflowRepository): def __init__(self, local_path: str, + remote_url: Optional[str] = None, + owner: Optional[str] = None, + name: Optional[str] = None, + license: Optional[str] = None, exclude: Optional[List[str]] = None, auto_cleanup: bool = True) -> None: self.auto_cleanup = auto_cleanup - super().__init__(local_path, exclude) + super().__init__( + local_path=local_path, + remote_url=remote_url, + owner=owner, + name=name, + license=license, + exclude=exclude) def cleanup(self) -> None: logger.debug("Cleaning temp extraction folder of zipped repository @ %s ...", self.local_path) @@ -186,9 +214,22 @@ def __del__(self): class ZippedWorkflowRepository(TemporaryLocalWorkflowRepository): - def __init__(self, archive_path: str | Path, exclude: Optional[List[str]] = None, auto_cleanup: bool = True) -> None: - local_path = tempfile.mkdtemp(dir=BaseConfig.BASE_TEMP_FOLDER) - super().__init__(local_path=local_path, exclude=exclude, auto_cleanup=auto_cleanup) + def __init__(self, archive_path: str | Path, + local_path: Optional[str] = None, + remote_url: Optional[str] = None, + owner: Optional[str] = None, + name: Optional[str] = None, + license: Optional[str] = None, + exclude: Optional[List[str]] = None, + auto_cleanup: bool = True) -> None: + local_path = local_path or tempfile.mkdtemp(dir=BaseConfig.BASE_TEMP_FOLDER) + super().__init__(local_path=local_path, + remote_url=remote_url, + owner=owner, + name=name, + license=license, + exclude=exclude, + auto_cleanup=auto_cleanup) try: extract_zip(archive_path, local_path) self.archive_path = archive_path @@ -201,10 +242,25 @@ def __init__(self, archive_path: str | Path, exclude: Optional[List[str]] = None class Base64WorkflowRepository(TemporaryLocalWorkflowRepository): - def __init__(self, base64_rocrate: str) -> None: - local_path = tempfile.mkdtemp(dir=BaseConfig.BASE_TEMP_FOLDER) - super().__init__(local_path, auto_cleanup=True) + def __init__(self, base64_rocrate: str, + local_path: Optional[str] = None, + remote_url: Optional[str] = None, + owner: Optional[str] = None, + name: Optional[str] = None, + license: Optional[str] = None, + exclude: Optional[List[str]] = None, + auto_cleanup: bool = True) -> None: + local_path = local_path or tempfile.mkdtemp(dir=BaseConfig.BASE_TEMP_FOLDER) + super().__init__( + local_path=local_path, + remote_url=remote_url, + owner=owner, + name=name, + license=license, + exclude=exclude, + auto_cleanup=auto_cleanup) try: + self._base64 = base64_rocrate rocrate = base64.b64decode(base64_rocrate) zip_file = zipfile.ZipFile(BytesIO(rocrate)) zip_file.extractall(local_path) @@ -216,17 +272,63 @@ def __init__(self, base64_rocrate: str) -> None: logger.debug(e) raise DecodeROCrateException(detail=str(e)) + @property + def base64_archive(self) -> str: + return self._base64 + class LocalGitWorkflowRepository(LocalWorkflowRepository): """ A LocalWorkflowRepository that is also a Git repository. """ - def __init__(self, local_path: str, exclude: Optional[List[str]] = None) -> None: - from git import Repo - super().__init__(local_path, exclude) - self._git_repo = Repo(self.local_path) + def __init__(self, + local_path: Optional[str] = None, + remote_url: Optional[str] = None, + owner: Optional[str] = None, + name: Optional[str] = None, + license: Optional[str] = None, + exclude: Optional[List[str]] = None) -> None: + super().__init__( + local_path=local_path, + remote_url=remote_url, + owner=owner, + name=name, + license=license, + exclude=exclude + ) + self._git_repo = git.Repo(self.local_path) + self._remote_repo_info = None + try: + self._remote_repo_info = RemoteGitRepoInfo.parse(self._git_repo.remotes.origin.url) + except git.exc.GitCommandError as e: + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) @property def main_branch(self) -> str: return self._git_repo.active_branch.name + + @property + def owner(self) -> str: + return super().owner or \ + self._remote_repo_info.owner if self._remote_repo_info else None + + @property + def name(self) -> str: + return super().name or \ + self._remote_repo_info.repo if self._remote_repo_info else None + + @property + def remote_url(self) -> str: + return super().remote_url or \ + self._remote_repo_info.url if self._remote_repo_info else None + + @property + def license(self) -> str: + return super().license or \ + self._remote_repo_info.license if self._remote_repo_info else None + + @property + def remote_info(self) -> RemoteGitRepoInfo | None: + return self._remote_repo_info diff --git a/lifemonitor/utils.py b/lifemonitor/utils.py index 232c80de8..67e3df915 100644 --- a/lifemonitor/utils.py +++ b/lifemonitor/utils.py @@ -18,6 +18,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from __future__ import annotations import base64 import fnmatch @@ -33,6 +34,7 @@ import shutil import socket import string +import subprocess import tempfile import time import urllib @@ -45,6 +47,8 @@ from urllib.parse import urlparse import flask +import git +import giturlparse import networkx as nx import pygit2 import requests @@ -431,6 +435,17 @@ def isoformat_to_datetime(iso: str) -> datetime: raise ValueError(f"Datetime string {iso} is not in ISO format") from e +def get_current_username() -> str: + try: + import pwd + return pwd.getpwuid(os.getuid()).pw_name + except Exception as e: + logger.warning("Unable to get current username: %s", e) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + return "unknown" + + def parse_date_interval(interval: str) -> Tuple[Literal['<=', '>=', '<', '>', '..'], Optional[datetime], datetime]: """Parse a date interval string. @@ -698,6 +713,84 @@ def checkout_ref(repo_path: str, ref: str, auth_token: Optional[str] = None, bra raise lm_exceptions.DownloadException(detail=str(e)) +def detect_default_remote_branch(local_repo_path: str) -> Optional[str]: + '''Return the default remote branch of the repo; None if not found''' + assert os.path.isdir(local_repo_path), "Path should be a folder" + try: + pattern = r"HEAD branch: (\w+)" + repo = git.Repo(local_repo_path) + for remote in repo.remotes: + try: + output = subprocess.run(['git', 'remote', 'show', remote.url], + check=False, stdout=subprocess.PIPE, cwd=local_repo_path).stdout.decode('utf-8') + match = re.search(pattern, output) + if match: + detected_branch = match.group(1) + logger.debug("Found default branch %r for remote %r", detected_branch, remote.url) + return detected_branch + except Exception as e: + logger.debug("Unable to get default branch for remote %r: %r", remote.url, e) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + except Exception as e: + logger.error(e) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + return None + + +def get_current_active_branch(local_repo_path: str) -> str: + assert os.path.isdir(local_repo_path), "Path should be a folder" + try: + repo = git.Repo(local_repo_path) + return repo.active_branch.name + except git.InvalidGitRepositoryError: + raise ValueError(f"Invalid git repository: {local_repo_path}") + except Exception as e: + raise lm_exceptions.LifeMonitorException(detail=f"Unable to get the current active branch: {e}") + + +class RemoteGitRepoInfo(giturlparse.result.GitUrlParsed): + pathname: str = None + protocol: str = None + owner: str = None + + def __init__(self, parsed_info): + # fix for giturlparse: protocols are not parsed correctly + del parsed_info['protocols'] + super().__init__(parsed_info) + + @property + def fullname(self): + return f"{self.owner}/{self.repo}" + + @property + def urls(self) -> Dict[str, str]: + urls = super().urls + # fix for giturlparse: https urls should not have a .git suffix + urls['https'] = urls['https'].rstrip('.git') + return urls + + @property + def protocols(self) -> List[str]: + return list(self.urls.keys()) + + @property + def license(self) -> Optional[str]: + try: + if self.host == 'github.com': + l_info = requests.get(f"https://api.github.com/repos/{self.fullname}/license") + self._license = l_info.json()['license']['spdx_id'] + except Exception as e: + if logger.isEnabledFor(logging.DEBUG): + logger.error(e) + return self._license + + @staticmethod + def parse(git_remote_url: str) -> RemoteGitRepoInfo: + return RemoteGitRepoInfo(giturlparse.parser.parse(git_remote_url)) + + def get_current_ref(local_repo_path: str) -> str: assert os.path.isdir(local_repo_path), "Path should be a folder" repo = pygit2.Repository(local_repo_path) diff --git a/prova b/prova deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/conftest.py b/tests/conftest.py index 672708efe..be7e8c8d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,7 +18,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - +import base64 import logging import os import pathlib @@ -43,6 +43,7 @@ from lifemonitor.api.models.repositories import (GithubWorkflowRepository, LocalGitWorkflowRepository, LocalWorkflowRepository, + Base64WorkflowRepository, ZippedWorkflowRepository) from lifemonitor.api.services import LifeMonitor from lifemonitor.cache import cache, clear_cache @@ -523,3 +524,25 @@ def simple_local_wf_repo(test_repo_collection_path: Path) -> Generator[LocalGitW (tmp_repo_path / '.dot-git').rename(tmp_repo_path / '.git') repo = LocalGitWorkflowRepository(str(tmp_repo_path)) yield repo + + +@pytest.fixture +def simple_zip_wf_repo(simple_local_wf_repo) -> Generator[ZippedWorkflowRepository, None, None]: + from rocrate.rocrate import ROCrate + with tempfile.NamedTemporaryFile(suffix='.zip') as tmpzip: + crate = ROCrate(simple_local_wf_repo.local_path) + crate.write_zip(tmpzip.name) + tmpzip.seek(0) + repo = ZippedWorkflowRepository(tmpzip.name) + yield repo + + +@pytest.fixture +def simple_base64_wf_repo(simple_zip_wf_repo: ZippedWorkflowRepository) -> Generator[Base64WorkflowRepository, None, None]: + base64_encoded_repo = None + with open(simple_zip_wf_repo.archive_path, mode="rb") as zip_file: + contents = zip_file.read() + base64_encoded_repo = base64.b64encode(contents) + if not base64_encoded_repo: + raise RuntimeError("Could not base64 encode the repository") + return Base64WorkflowRepository(base64_encoded_repo) diff --git a/tests/unit/api/models/repositories/test_github_repos.py b/tests/unit/api/models/repositories/test_github_repos.py new file mode 100644 index 000000000..d8cf5c707 --- /dev/null +++ b/tests/unit/api/models/repositories/test_github_repos.py @@ -0,0 +1,73 @@ +# Copyright (c) 2020-2022 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import logging +from typing import Dict + +import pytest + +import lifemonitor.api.models.repositories as repos + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def test_repo_info(simple_local_wf_repo) -> Dict[str, str]: + return { + "name": "test-galaxy-wf-repo", + "owner": "ilveroluca", + "license": "MIT", + "exclude": [".*"], + "local_path": simple_local_wf_repo.local_path, + "default_branch": "main", + "active_branch": "main", + "remote_url": 'https://github.com/ilveroluca/test-galaxy-wf-repo' + } + + +def test_github_repo(test_repo_info, simple_local_wf_repo): + repo = repos.GithubWorkflowRepository(full_name_or_id=simple_local_wf_repo.full_name, + local_path=test_repo_info['local_path'], + exclude=test_repo_info['exclude'],) + + assert repo, "Repository object is None" + assert isinstance(repo, repos.GithubWorkflowRepository), "Repository is not a WorkflowRepository" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.owner == test_repo_info['owner'], "Repository owner is not correct" + assert repo.full_name == f"{test_repo_info['owner']}/{test_repo_info['name']}", "Repository full name is not correct" + assert repo.license == test_repo_info['license'], "Repository license is not correct" + assert repo.exclude == test_repo_info['exclude'], "Repository exclude is not correct" + assert repo.local_path == test_repo_info['local_path'], "Repository local path is not correct" + assert repo.remote_url == test_repo_info['remote_url'], "Repository remote url is not correct" + + +def test_github_repo_no_local_path(test_repo_info, simple_local_wf_repo): + repo = repos.GithubWorkflowRepository(full_name_or_id=simple_local_wf_repo.full_name, + exclude=test_repo_info['exclude'],) + + assert repo, "Repository object is None" + assert isinstance(repo, repos.GithubWorkflowRepository), "Repository is not a WorkflowRepository" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.owner == test_repo_info['owner'], "Repository owner is not correct" + assert repo.full_name == f"{test_repo_info['owner']}/{test_repo_info['name']}", "Repository full name is not correct" + assert repo.license == test_repo_info['license'], "Repository license is not correct" + assert repo.exclude == test_repo_info['exclude'], "Repository exclude is not correct" + assert repo.local_path.startswith('/tmp'), "Repository local path is not correct" + assert repo.remote_url == test_repo_info['remote_url'], "Repository remote url is not correct" diff --git a/tests/unit/api/models/repositories/test_local.py b/tests/unit/api/models/repositories/test_local.py deleted file mode 100644 index d3c2d1286..000000000 --- a/tests/unit/api/models/repositories/test_local.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) 2020-2022 CRS4 -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import logging - -import lifemonitor.api.models.repositories as repos - -logger = logging.getLogger(__name__) - - -def test_local_git_repo(simple_local_wf_repo): - assert repos.LocalWorkflowRepository.is_git_repo(simple_local_wf_repo.local_path) - assert repos.LocalGitWorkflowRepository.is_git_repo(simple_local_wf_repo.local_path) - assert "main" == simple_local_wf_repo.main_branch diff --git a/tests/unit/api/models/repositories/test_local_repos.py b/tests/unit/api/models/repositories/test_local_repos.py new file mode 100644 index 000000000..5cdec5353 --- /dev/null +++ b/tests/unit/api/models/repositories/test_local_repos.py @@ -0,0 +1,197 @@ +# Copyright (c) 2020-2022 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import logging +import tempfile +from typing import Dict + +import pytest + +import lifemonitor.api.models.repositories as repos +import lifemonitor.utils as utils + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def test_repo_info(simple_local_wf_repo) -> Dict[str, str]: + return { + "name": "test_repo", + "owner": "test_owner", + "license": "MIT", + "exclude": [".*"], + "local_path": simple_local_wf_repo.local_path, + "default_branch": "main", + "active_branch": "main", + "remote_url": 'https://repo_url.git' + } + + +def test_base_repo(test_repo_info): + repo = repos.WorkflowRepository(local_path=test_repo_info['local_path'], + remote_url=test_repo_info['remote_url'], + name=test_repo_info['name'], + owner=test_repo_info['owner'], + license=test_repo_info['license'], + exclude=test_repo_info['exclude'],) + + assert repo, "Repository object is None" + assert isinstance(repo, repos.WorkflowRepository), "Repository is not a WorkflowRepository" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.owner == test_repo_info['owner'], "Repository owner is not correct" + assert repo.full_name == f"{test_repo_info['owner']}/{test_repo_info['name']}", "Repository full name is not correct" + assert repo.license == test_repo_info['license'], "Repository license is not correct" + assert repo.exclude == test_repo_info['exclude'], "Repository exclude is not correct" + assert repo.local_path == test_repo_info['local_path'], "Repository local path is not correct" + assert repo.remote_url == test_repo_info['remote_url'], "Repository remote url is not correct" + + +def test_base_repo_fullname_wo_owner(test_repo_info): + repo = repos.WorkflowRepository(local_path=test_repo_info['local_path'], + name=test_repo_info['name']) + assert isinstance(repo, repos.WorkflowRepository), "Repository is not a WorkflowRepository" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.owner is None, "Repository owner is not correct" + assert repo.full_name == test_repo_info['name'], "Repository full name is not correct" + + +def test_base_repo_system_user_as_owner(test_repo_info): + repo = repos.WorkflowRepository(local_path=test_repo_info['local_path'], + name=test_repo_info['name'], + license=test_repo_info['license'], + exclude=test_repo_info['exclude'], + owner_as_system_user=True) + + current_username = utils.get_current_username() + assert repo, "Repository object is None" + assert isinstance(repo, repos.WorkflowRepository), "Repository is not a WorkflowRepository" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.owner == current_username, "Repository owner is not correct" + assert repo.full_name == f"{current_username}/{test_repo_info['name']}", "Repository full name is not correct" + + +def test_local_git_repo(simple_local_wf_repo): + assert repos.LocalWorkflowRepository.is_git_repo(simple_local_wf_repo.local_path) + assert repos.LocalGitWorkflowRepository.is_git_repo(simple_local_wf_repo.local_path) + assert "main" == simple_local_wf_repo.main_branch + + +def test_local_git_repo_no_remote_url(simple_local_wf_repo): + logger.debug("Remote url: %s", simple_local_wf_repo.remote_url) + assert simple_local_wf_repo.remote_url is not None, "Remote url is None" + assert simple_local_wf_repo.remote_url == \ + 'https://github.com/ilveroluca/test-galaxy-wf-repo.git', \ + "Remote url is not correct" + + +def test_local_git_repo_no_name(simple_local_wf_repo): + logger.debug("Repository name: %s", simple_local_wf_repo.name) + assert simple_local_wf_repo.name is not None, "Repository name is None" + assert simple_local_wf_repo.name == 'test-galaxy-wf-repo', \ + "Repository name is not correct" + + +def test_local_git_repo_no_owner(simple_local_wf_repo): + assert simple_local_wf_repo.owner == 'ilveroluca' + + +def test_local_git_repo_owner_overwrite(simple_local_wf_repo): + assert simple_local_wf_repo.owner == 'ilveroluca' + + repo = repos.LocalGitWorkflowRepository(simple_local_wf_repo.local_path, owner='test_owner') + assert repo.owner == 'test_owner', "Repository owner is not correct" + + +def test_local_git_repo_no_license(simple_local_wf_repo): + logger.debug("Repository license: %s", simple_local_wf_repo.license) + assert simple_local_wf_repo.license is not None, "Repository license is None" + + +def test_zip_repo_empty_local_path(simple_zip_wf_repo): + # check if the repository is a zip repository + assert isinstance(simple_zip_wf_repo, repos.ZippedWorkflowRepository) + + logger.debug("Archive path: %s", simple_zip_wf_repo.archive_path) + + # create a new repository from the zip file + # NOTE: the local_path is None + repo = repos.ZippedWorkflowRepository( + simple_zip_wf_repo.archive_path, + local_path=None + ) + # check if the repository is a zip repository + assert isinstance(repo, repos.ZippedWorkflowRepository) + assert repo.archive_path == simple_zip_wf_repo.archive_path + assert repo.local_path is not None + + +def test_zip_repo(test_repo_info, simple_zip_wf_repo): + # check if the repository is a zip repository + assert isinstance(simple_zip_wf_repo, repos.ZippedWorkflowRepository) + + logger.debug("Archive path: %s", simple_zip_wf_repo.archive_path) + + with tempfile.TemporaryDirectory() as tmpdir: + # create a new repository from the zip file + repo = repos.ZippedWorkflowRepository( + simple_zip_wf_repo.archive_path, + local_path=tmpdir, + name=test_repo_info['name'], + owner=test_repo_info['owner'], + license=test_repo_info['license'], + exclude=test_repo_info['exclude'], + ) + # check if the repository is a zip repository + assert isinstance(repo, repos.ZippedWorkflowRepository) + # check paths + assert repo.archive_path == simple_zip_wf_repo.archive_path + assert repo.local_path is tmpdir, "Repository local path is not correct" + # check metadata + assert repo.owner == test_repo_info['owner'], "Repository owner is not correct" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.full_name == f"{test_repo_info['owner']}/{test_repo_info['name']}", "Repository full name is not correct" + assert repo.license == test_repo_info['license'], "Repository license is not correct" + + +def test_base64_repo(test_repo_info, simple_base64_wf_repo): + assert isinstance(simple_base64_wf_repo, repos.Base64WorkflowRepository) + + # check paths + with tempfile.TemporaryDirectory() as tmpdir: + # create a new repository from the zip file + repo = repos.Base64WorkflowRepository( + simple_base64_wf_repo.base64_archive, + local_path=tmpdir, + name=test_repo_info['name'], + owner=test_repo_info['owner'], + license=test_repo_info['license'], + exclude=test_repo_info['exclude'], + ) + # check if the repository is a base64 repository + assert isinstance(repo, repos.Base64WorkflowRepository), "Repository is not a Base64WorkflowRepository" + # check encodede archive + assert repo.base64_archive == simple_base64_wf_repo.base64_archive, "Repository base64 archive is not correct" + # check paths + assert repo.local_path is tmpdir, "Repository local path is not correct" + # check metadata + assert repo.owner == test_repo_info['owner'], "Repository owner is not correct" + assert repo.name == test_repo_info['name'], "Repository name is not correct" + assert repo.full_name == f"{test_repo_info['owner']}/{test_repo_info['name']}", "Repository full name is not correct" + assert repo.license == test_repo_info['license'], "Repository license is not correct" diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 1a7845f81..4b0aec9ed 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -18,14 +18,18 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import logging import os import tempfile +from typing import Dict import pytest import lifemonitor.exceptions as lm_exceptions import lifemonitor.utils as utils +logger = logging.getLogger(__name__) + def test_download_url_404(): with tempfile.TemporaryDirectory() as d: @@ -145,3 +149,54 @@ def test_match_ref(): assert utils.match_ref('1.0.1', ['*.*.*']) == ('1.0.1', '*.*.*') assert utils.match_ref('pippo', ['*.*.*']) is None assert utils.match_ref('v1.0.1', ['v*.*.*', '*.*.*']) == ('v1.0.1', 'v*.*.*') + + +def test_main_branch_detection_no_remote(simple_local_wf_repo): + logger.debug("Testing main branch detection... (repo: %r)", simple_local_wf_repo) + logger.debug("Repo branches: %r", simple_local_wf_repo.local_path) + assert utils.detect_default_remote_branch(simple_local_wf_repo.local_path) == 'main', "No remote, main branch detection should fail" + + +def test_default_branch_detection(simple_local_wf_repo): + logger.debug("Testing main branch detection of LifeMonitor repo... (repo: %r)", '.') + logger.debug("Current dir: %r", simple_local_wf_repo.local_path) + assert utils.detect_default_remote_branch(simple_local_wf_repo.local_path) == 'main', "main branch detection failed" + + +def test_active_branch_detection(simple_local_wf_repo): + logger.debug("Testing active branch detection... (repo: %r)", simple_local_wf_repo) + logger.debug("Repo local path: %r", simple_local_wf_repo.local_path) + assert utils.get_current_active_branch(simple_local_wf_repo.local_path) == 'main', "active branch detection failed" + + +def test_active_branch_detection_against_no_git_folder(): + with tempfile.TemporaryDirectory() as tmpdir: + logger.debug("Testing active branch detection... (repo: %r)", tmpdir) + with pytest.raises(ValueError): + assert utils.get_current_active_branch(tmpdir) is None, "active branch detection failed" + + +def __git_remote_urls__() -> Dict[str, str]: + return { + 'https': 'https://github.com/crs4/life_monitor', + 'ssh': 'git@github.com:crs4/life_monitor.git', + 'git': 'git://github.com/crs4/life_monitor.git' + } + + +@pytest.mark.parametrize("protocol,remote_git_url", list(__git_remote_urls__().items())) +def test_remote_git_info_detection(protocol, remote_git_url): + remote_info = utils.RemoteGitRepoInfo.parse(remote_git_url) + assert remote_info is not None, "remote info detection failed" + assert isinstance(remote_info, utils.RemoteGitRepoInfo), "remote info detection failed" + + assert remote_info.url == remote_git_url, "Invalid remote url" + assert remote_info.owner == 'crs4', "Invalid remote owner" + assert remote_info.repo == 'life_monitor', "Invalid remote repo" + assert remote_info.fullname == 'crs4/life_monitor', "Invalid remote fullname" + assert protocol in remote_info.protocols, "Invalid remote protocols" + assert remote_info.host == 'github.com', "Invalid remote host" + assert protocol in remote_info.urls, "Invalid remote urls" + + for p, u in __git_remote_urls__().items(): + assert u == remote_info.urls[p], "Invalid remote url for the %s protocol" % p