diff --git a/dojo_plugin/api/v1/dojos.py b/dojo_plugin/api/v1/dojos.py
index a84bc3b57..c0a9db047 100644
--- a/dojo_plugin/api/v1/dojos.py
+++ b/dojo_plugin/api/v1/dojos.py
@@ -11,7 +11,8 @@
from ...models import DojoStudents, Dojos, DojoModules, DojoChallenges, DojoUsers, Emojis, SurveyResponses
from ...utils import render_markdown, is_challenge_locked
-from ...utils.dojo import dojo_route, dojo_admins_only, dojo_create
+from ...utils.dojo import dojo_route, dojo_admins_only
+from ...dojo_creation.dojo_initializer import dojo_create
dojos_namespace = Namespace(
diff --git a/dojo_plugin/dojo_creation/builder_utils.py b/dojo_plugin/dojo_creation/builder_utils.py
new file mode 100644
index 000000000..ae7188d68
--- /dev/null
+++ b/dojo_plugin/dojo_creation/builder_utils.py
@@ -0,0 +1,89 @@
+from schema import Optional, Regex, Or, Use
+import datetime
+from sqlalchemy.orm.exc import NoResultFound
+
+
+ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$")
+UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$")
+NAME_REGEX = Regex(r"^[\S ]{1,128}$")
+IMAGE_REGEX = Regex(r"^[\S]{1,256}$")
+FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$")
+FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1")
+DATE = Use(datetime.datetime.fromisoformat)
+
+ID_NAME_DESCRIPTION = {
+ Optional("id"): ID_REGEX,
+ Optional("name"): NAME_REGEX,
+ Optional("description"): str,
+}
+
+VISIBILITY = {
+ Optional("visibility", default={}): {
+ Optional("start"): DATE,
+ Optional("stop"): DATE,
+ }
+}
+
+SURVEY = {
+ Optional("survey"): Or(
+ {
+ "type": "multiplechoice",
+ "prompt": str,
+ Optional("probability"): float,
+ "options": [str],
+ },
+ {
+ "type": "thumb",
+ "prompt": str,
+ Optional("probability"): float,
+ },
+ {
+ "type": "freeform",
+ "prompt": str,
+ Optional("probability"): float,
+ },
+ )
+}
+
+BASE_SPEC = {
+ **ID_NAME_DESCRIPTION,
+ **VISIBILITY,
+
+ Optional("image"): IMAGE_REGEX,
+ Optional("allow_privileged"): bool,
+ Optional("importable"): bool,
+ Optional("auxiliary", default={}, ignore_extra_keys=True): dict,
+
+ **SURVEY,
+}
+"""
+Dictionary for specification fields that are defined identically in all three layers of the specification schema
+"""
+
+
+def import_one(query, error_message):
+ try:
+ o = query.one()
+ assert o.importable, f"Import disallowed for {o}."
+ return o
+ except NoResultFound:
+ raise AssertionError(error_message)
+
+def first_present(key, *dicts, required=False):
+ for d in dicts:
+ if d and key in d:
+ return d[key]
+ if required:
+ raise KeyError(f"Required key '{key}' not found in data.")
+ return None
+
+def get_visibility(cls, *dicts):
+ visibility = first_present("visibility", *dicts)
+
+ if visibility:
+ start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None
+ stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None
+ assert start or stop, "`start` or `stop` value must be present under visibility"
+ return cls(start=start, stop=stop)
+
+ return None
\ No newline at end of file
diff --git a/dojo_plugin/dojo_creation/challenge_builder.py b/dojo_plugin/dojo_creation/challenge_builder.py
new file mode 100644
index 000000000..2ab9f4bd3
--- /dev/null
+++ b/dojo_plugin/dojo_creation/challenge_builder.py
@@ -0,0 +1,117 @@
+from schema import Schema, Optional, SchemaError
+
+from ..models import Dojos, DojoChallenges, DojoChallengeVisibilities, Challenges, Flags
+from CTFd.utils.user import is_admin
+from .builder_utils import (
+ ID_REGEX,
+ UNIQUE_ID_REGEX,
+ BASE_SPEC,
+ import_one,
+ first_present,
+ get_visibility,
+)
+
+
+CHALLENGE_SPEC = Schema([{
+ **BASE_SPEC,
+
+ Optional("progression_locked"): bool,
+ # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"),
+
+ Optional("import"): {
+ Optional("dojo"): UNIQUE_ID_REGEX,
+ Optional("module"): ID_REGEX,
+ "challenge": ID_REGEX,
+ },
+
+ Optional("transfer"): {
+ Optional("dojo"): UNIQUE_ID_REGEX,
+ Optional("module"): ID_REGEX,
+ "challenge": ID_REGEX,
+ },
+}])
+
+
+
+
+def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges:
+ if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first():
+ return chal
+ if transfer:
+ assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation"
+ old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"]
+ old_dojo = Dojos.from_id(old_dojo_id).first()
+ assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}"
+ old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first()
+ assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}"
+ old_challenge.category = dojo.hex_dojo_id
+ old_challenge.name = f"{module_id}:{challenge_id}"
+ return old_challenge
+ return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")])
+
+
+def import_challenge(challenge_data, module_data, dojo_data) -> DojoChallenges:
+ # Handles the heirarchy of imports
+ try:
+ import_data = (
+ first_present("dojo", challenge_data["import"], module_data.get("import"), dojo_data.get("import"), required=True),
+ first_present("module", challenge_data["import"], module_data.get("import"), required=True), # No need to check dojo_data imports because module can never be defined there
+ challenge_data["import"]["challenge"],
+ )
+ except KeyError as e:
+ raise AssertionError(f'Import Error: {e}')
+
+ imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist")
+ for attr in ["id", "name", "description"]:
+ if attr not in challenge_data:
+ challenge_data[attr] = getattr(imported_challenge, attr)
+
+ # TODO: maybe we should track the entire import
+ challenge_data["image"] = imported_challenge.data.get("image")
+ return imported_challenge
+
+
+
+def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]:
+ try:
+ challenge_list = CHALLENGE_SPEC.validate(module_data["challenges"])
+ except SchemaError as e:
+ raise AssertionError(f"Invalid challenge specification: {e}")
+
+ module_id = module_data["id"]
+
+ # This is for caching existing challenges to improve performance of updating a dojo
+ existing_module = next((module for module in dojo.modules if module.id == module_id), None)
+ existing_challenges = {challenge.id: challenge.challenge for challenge in existing_module.challenges} if existing_module else {}
+
+ result = []
+ for challenge_data in challenge_list:
+ data_priority_chain = (challenge_data, module_data, dojo_data)
+
+ path_override = None
+ challenge_id = challenge_data.get("id")
+ if "import" in challenge_data:
+ imported_challenge = import_challenge(*data_priority_chain) # import has to be done before DojoChallenges creation because it modifies challenge_data
+ path_override = str(imported_challenge.path)
+ ctfd_challenge = imported_challenge.challenge
+ elif challenge_id in existing_challenges:
+ ctfd_challenge = existing_challenges[challenge_id]
+ else:
+ ctfd_challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer"))
+
+ assert challenge_data.get("id") is not None, f"Challenge id not present in challenge data. {challenge_data=}"
+
+ result.append(
+ DojoChallenges(
+ **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]},
+ image=first_present("image", *data_priority_chain),
+ allow_privileged=first_present("allow_privileged", *data_priority_chain, DojoChallenges.data_defaults),
+ importable=first_present("importable", *data_priority_chain, DojoChallenges.data_defaults),
+ progression_locked=first_present("progression_locked", challenge_data, DojoChallenges.data_defaults),
+ survey=first_present("survey", *data_priority_chain),
+ visibility=get_visibility(DojoChallengeVisibilities, *data_priority_chain),
+ path_override=path_override,
+ challenge=ctfd_challenge,
+ )
+ )
+ return result
\ No newline at end of file
diff --git a/dojo_plugin/dojo_creation/dojo_builder.py b/dojo_plugin/dojo_creation/dojo_builder.py
new file mode 100644
index 000000000..aaf7f48c7
--- /dev/null
+++ b/dojo_plugin/dojo_creation/dojo_builder.py
@@ -0,0 +1,94 @@
+import yaml
+
+from schema import Schema, Optional, Regex, Or, SchemaError
+
+from ..models import Dojos
+from .builder_utils import (
+ ID_REGEX,
+ UNIQUE_ID_REGEX,
+ IMAGE_REGEX,
+ FILE_PATH_REGEX,
+ FILE_URL_REGEX,
+ BASE_SPEC,
+ import_one,
+)
+from .module_builder import modules_from_spec
+
+
+DOJO_SPEC = Schema({
+ **BASE_SPEC,
+
+ Optional("password"): Regex(r"^[\S ]{8,128}$"),
+
+ Optional("type"): ID_REGEX,
+ Optional("award"): {
+ Optional("emoji"): Regex(r"^\S$"),
+ Optional("belt"): IMAGE_REGEX
+ },
+
+ Optional("show_scoreboard"): bool,
+
+
+ Optional("import"): {
+ "dojo": UNIQUE_ID_REGEX,
+ },
+
+
+ Optional("pages", default=[]): [str],
+ Optional("files", default=[]): [Or(
+ {
+ "type": "download",
+ "path": FILE_PATH_REGEX,
+ "url": FILE_URL_REGEX,
+ },
+ {
+ "type": "text",
+ "path": FILE_PATH_REGEX,
+ "content": str,
+ }
+ )],
+
+ Optional("modules", default=[]): list, # Defer module validation until later
+})
+
+
+DOJO_ATTRIBUTES = ["id", "name", "description", "password", "type", "award", "pages", "show_scoreboard"]
+
+
+def import_dojo(dojo_data):
+ # TODO: we probably don't need to restrict imports to official dojos
+ imported_dojo = import_one(
+ Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True),
+ f"Import dojo `{dojo_data['import']['dojo']}` does not exist"
+ )
+
+ for attr in DOJO_ATTRIBUTES:
+ if attr not in dojo_data:
+ dojo_data[attr] = getattr(imported_dojo, attr)
+
+ # Modules will be initialized at the module layer, and challenges at the challenge layer
+ if not dojo_data["modules"]:
+ dojo_data["modules"] = [{"import": {"module": module.id}} for module in imported_dojo.modules]
+
+
+def dojo_from_spec(data: dict, *, dojo=None) -> Dojos:
+ try:
+ dojo_data = DOJO_SPEC.validate(data)
+ except SchemaError as e:
+ raise AssertionError(f"Invalid dojo specification: {e}")
+
+ if "import" in dojo_data:
+ import_dojo(dojo_data)
+
+ dojo_kwargs = {attr: dojo_data.get(attr) for attr in DOJO_ATTRIBUTES}
+ if dojo is None:
+ dojo = Dojos(**dojo_kwargs)
+ else:
+ for name, value in dojo_kwargs.items():
+ setattr(dojo, name, value)
+
+ assert dojo_data.get("id") is not None, "Dojo id must be defined"
+
+ dojo.modules = modules_from_spec(dojo, dojo_data)
+
+ return dojo
\ No newline at end of file
diff --git a/dojo_plugin/dojo_creation/dojo_initializer.py b/dojo_plugin/dojo_creation/dojo_initializer.py
new file mode 100644
index 000000000..a6c13e9a9
--- /dev/null
+++ b/dojo_plugin/dojo_creation/dojo_initializer.py
@@ -0,0 +1,288 @@
+import os
+import re
+import sys
+import subprocess
+import tempfile
+import traceback
+import pathlib
+import urllib.request
+
+import yaml
+import requests
+import typing
+from typing import Any
+from sqlalchemy.exc import IntegrityError
+from pathlib import Path
+from CTFd.models import db, Users
+from CTFd.utils.user import is_admin
+
+from ..models import DojoAdmins, Dojos
+from ..config import DOJOS_DIR
+from ..utils.dojo import dojo_git_command
+from .dojo_builder import dojo_from_spec
+
+
+DOJOS_TMP_DIR = DOJOS_DIR/"tmp"
+DOJOS_TMP_DIR.mkdir(exist_ok=True)
+
+
+
+def setdefault_name(data):
+ if "import" in data:
+ return
+ if "name" in data:
+ return
+ if "id" not in data:
+ return
+ data["name"] = data["id"].replace("-", " ").title()
+
+
+def setdefault_description(data, file_path):
+ if file_path.exists():
+ data.setdefault("description", file_path.read_text())
+
+
+def setdefault_subyaml(data: dict[str, Any], subyaml_path: Path):
+ if not subyaml_path.exists():
+ return data
+
+ topyaml_data = dict(data)
+ subyaml_data = yaml.safe_load(subyaml_path.read_text())
+ data.clear()
+ data.update(subyaml_data)
+ data.update(topyaml_data) # This overwrites any subyaml data with the "topyaml" data
+
+
+def load_dojo_subyamls(data: dict[str, Any], dojo_dir: Path) -> dict[str, Any]:
+ """
+ The dojo yaml gets augmented with additional yamls and markdown files found in the dojo repo structure.
+
+ The meta-structure is:
+
+ repo-root/dojo.yml
+ repo-root/DESCRIPTION.md <- if dojo description is missing
+ repo-root/module-id/module.yml <- fills in missing fields for module in dojo.yml (only module id *needs* to be in dojo.yml)
+ repo-root/module-id/DESCRIPTION.md <- if module description is missing
+ repo-root/module-id/challenge-id/challenge.yml <- fills in missing fields for challenge in higher-level ymls (only challenge id *needs* to be in dojo.yml/module.yml)
+ repo-root/module-id/challenge-id/DESCRIPTION.md <- if challenge description is missing
+
+ The higher-level details override the lower-level details.
+ """
+
+ setdefault_description(data, dojo_dir / "DESCRIPTION.md")
+
+ for module_data in data.get("modules", []):
+ if "id" not in module_data:
+ continue
+
+ module_dir = dojo_dir / module_data["id"]
+ setdefault_subyaml(module_data, module_dir / "module.yml")
+ setdefault_description(module_data, module_dir / "DESCRIPTION.md")
+ setdefault_name(module_data)
+
+ for challenge_data in module_data.get("challenges", []):
+ if "id" not in challenge_data:
+ continue
+
+ challenge_dir = module_dir / challenge_data["id"]
+ setdefault_subyaml(challenge_data, challenge_dir / "challenge.yml")
+ setdefault_description(challenge_data, challenge_dir / "DESCRIPTION.md")
+ setdefault_name(challenge_data)
+
+ return data
+
+
+def dojo_initialize_files(data: dict[str, Any], dojo_dir: Path):
+ for dojo_file in data.get("files", []):
+ assert is_admin(), "yml-specified files support requires admin privileges"
+ rel_path = dojo_file["path"]
+
+ abs_path = dojo_dir / rel_path
+ assert not abs_path.is_symlink(), f"{rel_path} is a symbolic link!"
+ if abs_path.exists():
+ continue
+ abs_path.parent.mkdir(parents=True, exist_ok=True)
+
+ if dojo_file["type"] == "download":
+ urllib.request.urlretrieve(dojo_file["url"], str(abs_path))
+ assert abs_path.stat().st_size >= 50*1024*1024, f"{rel_path} is small enough to fit into git ({abs_path.stat().st_size} bytes) --- put it in the repository!"
+ if dojo_file["type"] == "text":
+ with open(abs_path, "w") as o:
+ o.write(dojo_file["content"])
+
+
+def dojo_from_dir(dojo_dir: Path, *, dojo: typing.Optional[Dojos]=None) -> Dojos:
+ dojo_yml_path = dojo_dir / "dojo.yml"
+ assert dojo_yml_path.exists(), "Missing file: `dojo.yml`"
+
+ for path in dojo_dir.rglob("**"):
+ assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo"
+
+ data_raw = yaml.safe_load(dojo_yml_path.read_text())
+ data = load_dojo_subyamls(data_raw, dojo_dir)
+ dojo_initialize_files(data, dojo_dir)
+
+ built_dojo = dojo_from_spec(data, dojo=dojo)
+
+ validate_challenge_paths(built_dojo, dojo_dir)
+ initialize_course(built_dojo, dojo_dir)
+
+ return built_dojo
+
+
+
+def validate_challenge_paths(dojo, dojo_dir):
+ with dojo.located_at(dojo_dir):
+ missing_challenge_paths = [
+ challenge
+ for module in dojo.modules
+ for challenge in module.challenges
+ if not challenge.path.exists()
+ ]
+ assert not missing_challenge_paths, "".join(
+ f"Missing challenge path: {challenge.module.id}/{challenge.id}\n"
+ for challenge in missing_challenge_paths)
+
+def initialize_course(dojo, dojo_dir):
+ course_yml_path = dojo_dir / "course.yml"
+ if course_yml_path.exists():
+ course = yaml.safe_load(course_yml_path.read_text())
+
+ if "discord_role" in course and not dojo.official:
+ raise AssertionError("Unofficial dojos cannot have a discord role")
+
+ dojo.course = course
+
+ students_yml_path = dojo_dir / "students.yml"
+ if students_yml_path.exists():
+ students = yaml.safe_load(students_yml_path.read_text())
+ dojo.course["students"] = students
+
+ syllabus_path = dojo_dir / "SYLLABUS.md"
+ if "syllabus" not in dojo.course and syllabus_path.exists():
+ dojo.course["syllabus"] = syllabus_path.read_text()
+
+ grade_path = dojo_dir / "grade.py"
+ if grade_path.exists():
+ dojo.course["grade_code"] = grade_path.read_text()
+
+
+def dojo_yml_dir(spec: str) -> tempfile.TemporaryDirectory:
+ yml_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) # TODO: ignore_cleanup_errors=True
+ yml_dir_path = pathlib.Path(yml_dir.name)
+ with open(yml_dir_path / "dojo.yml", "w") as do:
+ do.write(spec)
+ return yml_dir
+
+
+def _assert_no_symlinks(dojo_dir):
+ if not isinstance(dojo_dir, pathlib.Path):
+ dojo_dir = pathlib.Path(dojo_dir)
+ for path in dojo_dir.rglob("*"):
+ assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo"
+
+
+def dojo_clone(repository, private_key):
+ tmp_dojos_dir = DOJOS_TMP_DIR
+ tmp_dojos_dir.mkdir(exist_ok=True)
+ clone_dir = tempfile.TemporaryDirectory(dir=tmp_dojos_dir) # TODO: ignore_cleanup_errors=True
+
+ key_file = tempfile.NamedTemporaryFile("w")
+ key_file.write(private_key)
+ key_file.flush()
+
+ url = f"https://github.com/{repository}"
+
+ if requests.head(url).status_code != 200:
+ url = f"git@github.com:{repository}"
+
+ subprocess.run(["git", "clone", "--depth=1", "--recurse-submodules", url, clone_dir.name],
+ env={
+ "GIT_SSH_COMMAND": f"ssh -i {key_file.name}",
+ "GIT_TERMINAL_PROMPT": "0",
+ },
+ check=True,
+ capture_output=True)
+
+ _assert_no_symlinks(clone_dir.name)
+
+ return clone_dir
+
+
+def dojo_create(user: Users, repository: str, public_key: str, private_key: str , spec: str):
+ try:
+ if repository:
+ repository_re = r"[\w\-]+/[\w\-]+"
+ repository = repository.replace("https://github.com/", "")
+ assert re.match(repository_re, repository), f"Invalid repository, expected format: {repository_re}
"
+
+ if Dojos.query.filter_by(repository=repository).first():
+ raise AssertionError("This repository already exists as a dojo")
+
+ dojo_dir = dojo_clone(repository, private_key)
+
+ elif spec:
+ assert is_admin(), "Must be an admin user to create dojos from spec rather than repositories"
+ dojo_dir = dojo_yml_dir(spec)
+ repository, public_key, private_key = None, None, None
+
+ else:
+ raise AssertionError("Repository or specification is required")
+
+ dojo_path = pathlib.Path(dojo_dir.name)
+
+ dojo = dojo_from_dir(dojo_path)
+ dojo.repository = repository
+ dojo.public_key = public_key
+ dojo.private_key = private_key
+ dojo.admins = [DojoAdmins(user=user)]
+
+ db.session.add(dojo)
+ db.session.commit()
+
+ dojo.path.parent.mkdir(exist_ok=True)
+ dojo_path.rename(dojo.path)
+ dojo_path.mkdir() # TODO: ignore_cleanup_errors=True
+
+ except subprocess.CalledProcessError as e:
+ deploy_url = f"https://github.com/{repository}/settings/keys"
+ raise RuntimeError(f"Failed to clone: add deploy key")
+
+ except IntegrityError:
+ raise RuntimeError("This repository already exists as a dojo")
+
+ except AssertionError as e:
+ raise RuntimeError(str(e))
+
+ except Exception as e:
+ traceback.print_exc(file=sys.stderr)
+ raise RuntimeError("An error occurred while creating the dojo")
+
+ return dojo
+
+
+def dojo_update(dojo):
+ if dojo.path.exists():
+ old_commit = dojo_git_command(dojo, "rev-parse", "HEAD").stdout.decode().strip()
+
+ tmp_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR)
+
+ os.rename(str(dojo.path), tmp_dir.name)
+
+ dojo_git_command(dojo, "fetch", "--depth=1", "origin", repo_path=tmp_dir.name)
+ dojo_git_command(dojo, "reset", "--hard", "origin", repo_path=tmp_dir.name)
+ dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name)
+
+ try:
+ _assert_no_symlinks(tmp_dir.name)
+ except AssertionError:
+ dojo_git_command(dojo, "reset", "--hard", old_commit, repo_path=tmp_dir.name)
+ dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name)
+ raise
+ finally:
+ os.rename(tmp_dir.name, str(dojo.path))
+ else:
+ tmpdir = dojo_clone(dojo.repository, dojo.private_key)
+ os.rename(tmpdir.name, str(dojo.path))
+ return dojo_from_dir(dojo.path, dojo=dojo)
+
diff --git a/dojo_plugin/dojo_creation/module_builder.py b/dojo_plugin/dojo_creation/module_builder.py
new file mode 100644
index 000000000..a320198fe
--- /dev/null
+++ b/dojo_plugin/dojo_creation/module_builder.py
@@ -0,0 +1,114 @@
+from schema import Schema, Optional, Or, SchemaError
+
+from ..models import DojoModules, DojoResources, DojoModuleVisibilities, DojoResourceVisibilities
+from .builder_utils import (
+ ID_REGEX,
+ UNIQUE_ID_REGEX,
+ NAME_REGEX,
+ VISIBILITY,
+ BASE_SPEC,
+ import_one,
+ first_present,
+ get_visibility,
+)
+from .challenge_builder import challenges_from_spec
+
+
+
+MODULE_SPEC = Schema([{
+ **BASE_SPEC,
+
+ Optional("show_challenges"): bool,
+ Optional("show_scoreboard"): bool,
+
+ Optional("import"): {
+ Optional("dojo"): UNIQUE_ID_REGEX,
+ "module": ID_REGEX,
+ },
+ Optional("resources", default=[]): [Or(
+ {
+ "type": "markdown",
+ "name": NAME_REGEX,
+ "content": str,
+ **VISIBILITY,
+ },
+ {
+ "type": "lecture",
+ "name": NAME_REGEX,
+ Optional("video"): str,
+ Optional("playlist"): str,
+ Optional("slides"): str,
+ **VISIBILITY,
+ },
+ )],
+
+
+ Optional("challenges", default=[]): list, # Defer challenge validation
+}])
+
+
+
+RESOURCE_ATTRIBUTES = ["name", "type", "content", "video", "playlist", "slides"]
+def build_dojo_resources(module_data, dojo_data):
+ if "resources" not in module_data:
+ return None
+
+ return [
+ DojoResources(
+ **{attr: resource_data.get(attr) for attr in RESOURCE_ATTRIBUTES},
+ visibility=get_visibility(DojoResourceVisibilities, resource_data, module_data, dojo_data),
+ )
+ for resource_data in module_data["resources"]
+ ]
+
+
+def import_module(module_data, dojo_data):
+ try:
+ import_data = (
+ first_present("dojo", module_data["import"], dojo_data.get("import"), required=True),
+ module_data["import"]["module"],
+ )
+ except KeyError as e:
+ raise AssertionError(f'Import Error: {e}')
+
+ imported_module = import_one(DojoModules.from_id(*import_data), f"{'/'.join(import_data)} does not exist")
+ for attr in ["id", "name", "description"]:
+ if attr not in module_data:
+ module_data[attr] = getattr(imported_module, attr)
+
+ # The idea here is that once it reaches challenges_from_spec it will process the actual challenge importing
+ if not module_data["challenges"]:
+ module_data["challenges"] = [{"import": {"challenge": challenge.id}} for challenge in imported_module.challenges]
+
+ if not module_data["resources"]:
+ module_data["resources"] = [
+ {
+ attr: getattr(resource, attr) for attr in RESOURCE_ATTRIBUTES if getattr(resource, attr, None) is not None
+ } for resource in imported_module.resources
+ ]
+
+
+def modules_from_spec(dojo, dojo_data):
+ try:
+ module_list = MODULE_SPEC.validate(dojo_data["modules"])
+ except SchemaError as e:
+ raise AssertionError(f"Invalid module specification: {e}")
+
+ result = []
+ for module_data in module_list:
+ if "import" in module_data:
+ import_module(module_data, dojo_data)
+
+ assert module_data.get("id") is not None, f"Module id not present in module data. {module_data=}"
+
+ result.append(
+ DojoModules(
+ **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]},
+ resources=build_dojo_resources(module_data, dojo_data),
+ visibility=get_visibility(DojoModuleVisibilities, module_data, dojo_data),
+ show_challenges=first_present("show_challenges", module_data, dojo_data, DojoModules.data_defaults),
+ show_scoreboard=first_present("show_scoreboard", module_data, dojo_data, DojoModules.data_defaults),
+ challenges=challenges_from_spec(dojo, dojo_data, module_data),
+ )
+ )
+ return result
\ No newline at end of file
diff --git a/dojo_plugin/models/__init__.py b/dojo_plugin/models/__init__.py
index 6e021348a..d27807cdb 100644
--- a/dojo_plugin/models/__init__.py
+++ b/dojo_plugin/models/__init__.py
@@ -364,34 +364,13 @@ class DojoModules(db.Model):
def __init__(self, *args, **kwargs):
- default = kwargs.pop("default", None)
- visibility = kwargs["visibility"] if "visibility" in kwargs else None
-
data = kwargs.pop("data", {})
+
for field in self.data_fields:
if field in kwargs:
data[field] = kwargs.pop(field)
kwargs["data"] = data
- if default:
- for field in ["id", "name", "description"]:
- kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None)
-
- kwargs["challenges"] = (
- kwargs.pop("challenges", None) or
- ([DojoChallenges(
- default=challenge,
- visibility=(DojoChallengeVisibilities(start=visibility.start) if visibility else None),
- ) for challenge in default.challenges] if default else [])
- )
- kwargs["resources"] = (
- kwargs.pop("resources", None) or
- ([DojoResources(
- default=resource,
- visibility=(DojoResourceVisibilities(start=visibility.start) if visibility else None),
- ) for resource in default.resources] if default else [])
- )
-
super().__init__(*args, **kwargs)
def __getattr__(self, name):
@@ -499,25 +478,13 @@ class DojoChallenges(db.Model):
# survey_responses = db.relationship("SurveyResponses", back_populates="challenge", cascade="all, delete-orphan")
def __init__(self, *args, **kwargs):
- default = kwargs.pop("default", None)
-
data = kwargs.pop("data", {})
+
for field in self.data_fields:
if field in kwargs:
data[field] = kwargs.pop(field)
kwargs["data"] = data
- if default:
- if kwargs.get("challenge") is not None:
- raise AttributeError("Import requires challenge to be None")
-
- for field in ["id", "name", "description", "challenge"]:
- kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None)
-
- # TODO: maybe we should track the entire import
- kwargs["data"]["image"] = default.data.get("image")
- kwargs["data"]["path_override"] = str(default.path)
-
super().__init__(*args, **kwargs)
def __getattr__(self, name):
@@ -663,28 +630,12 @@ class DojoResources(db.Model):
def __init__(self, *args, **kwargs):
- default = kwargs.pop("default", None)
-
data = kwargs.pop("data", {})
for field in self.data_fields:
if field in kwargs:
data[field] = kwargs.pop(field)
kwargs["data"] = data
- if default:
- if kwargs.get("data"):
- raise AttributeError("Import requires data to be empty")
-
- for field in ["type", "name"]:
- kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None)
-
- for field in self.data_fields:
- kwargs["data"][field] = (
- kwargs["data"][field]
- if kwargs["data"].get(field) is not None
- else getattr(default, field, None)
- )
-
super().__init__(*args, **kwargs)
def __getattr__(self, name):
diff --git a/dojo_plugin/pages/dojo.py b/dojo_plugin/pages/dojo.py
index 6aacd1a46..4705b5e39 100644
--- a/dojo_plugin/pages/dojo.py
+++ b/dojo_plugin/pages/dojo.py
@@ -14,8 +14,9 @@
from ..utils import get_current_container, get_all_containers, render_markdown
from ..utils.stats import get_container_stats, get_dojo_stats
-from ..utils.dojo import dojo_route, get_current_dojo_challenge, dojo_update, dojo_admins_only
+from ..utils.dojo import dojo_route, get_current_dojo_challenge, dojo_admins_only
from ..models import Dojos, DojoUsers, DojoStudents, DojoModules, DojoMembers, DojoChallenges
+from ..dojo_creation.dojo_initializer import dojo_update
dojo = Blueprint("pwncollege_dojo", __name__)
#pylint:disable=redefined-outer-name
@@ -146,7 +147,7 @@ def update_dojo(dojo, update_code=None):
dojo_update(dojo)
db.session.commit()
except Exception as e:
- print(f"ERROR: Dojo failed for {dojo}", file=sys.stderr, flush=True)
+ print(f"ERROR: Dojo update failed.", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
return {"success": False, "error": str(e)}, 400
return {"success": True}
diff --git a/dojo_plugin/utils/dojo.py b/dojo_plugin/utils/dojo.py
index 95e528eda..5a5d22d18 100644
--- a/dojo_plugin/utils/dojo.py
+++ b/dojo_plugin/utils/dojo.py
@@ -1,474 +1,16 @@
-import os
-import re
import subprocess
-import sys
import tempfile
-import traceback
-import datetime
import functools
import inspect
import pathlib
-import urllib.request
-import yaml
-import requests
-from schema import Schema, Optional, Regex, Or, Use, SchemaError
from flask import abort, g
-from sqlalchemy.exc import IntegrityError
-from sqlalchemy.orm.exc import NoResultFound
-from CTFd.models import db, Challenges, Flags
from CTFd.utils.user import get_current_user, is_admin
-from ..models import DojoAdmins, Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoResourceVisibilities, DojoModuleVisibilities
-from ..config import DOJOS_DIR
+from ..models import Dojos, DojoModules, DojoChallenges
from ..utils import get_current_container
-DOJOS_TMP_DIR = DOJOS_DIR/"tmp"
-DOJOS_TMP_DIR.mkdir(exist_ok=True)
-
-ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$")
-UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$")
-NAME_REGEX = Regex(r"^[\S ]{1,128}$")
-IMAGE_REGEX = Regex(r"^[\S]{1,256}$")
-FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$")
-FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1")
-DATE = Use(datetime.datetime.fromisoformat)
-
-ID_NAME_DESCRIPTION = {
- Optional("id"): ID_REGEX,
- Optional("name"): NAME_REGEX,
- Optional("description"): str,
-}
-
-VISIBILITY = {
- Optional("visibility", default={}): {
- Optional("start"): DATE,
- Optional("stop"): DATE,
- }
-}
-
-DOJO_SPEC = Schema({
- **ID_NAME_DESCRIPTION,
- **VISIBILITY,
-
- Optional("password"): Regex(r"^[\S ]{8,128}$"),
-
- Optional("type"): ID_REGEX,
- Optional("award"): {
- Optional("emoji"): Regex(r"^\S$"),
- Optional("belt"): IMAGE_REGEX
- },
-
- Optional("image"): IMAGE_REGEX,
- Optional("allow_privileged"): bool,
- Optional("show_scoreboard"): bool,
- Optional("importable"): bool,
-
- Optional("import"): {
- "dojo": UNIQUE_ID_REGEX,
- },
-
- Optional("auxiliary", default={}, ignore_extra_keys=True): dict,
-
- Optional("survey"): Or(
- {
- "type": "multiplechoice",
- "prompt": str,
- Optional("probability"): float,
- "options": [str],
- },
- {
- "type": "thumb",
- "prompt": str,
- Optional("probability"): float,
- },
- {
- "type": "freeform",
- "prompt": str,
- Optional("probability"): float,
- },
- ),
-
- Optional("modules", default=[]): [{
- **ID_NAME_DESCRIPTION,
- **VISIBILITY,
-
- Optional("image"): IMAGE_REGEX,
- Optional("allow_privileged"): bool,
- Optional("show_challenges"): bool,
- Optional("show_scoreboard"): bool,
- Optional("importable"): bool,
-
- Optional("import"): {
- Optional("dojo"): UNIQUE_ID_REGEX,
- "module": ID_REGEX,
- },
-
- Optional("survey"): Or(
- {
- "type": "multiplechoice",
- "prompt": str,
- Optional("probability"): float,
- "options": [str],
- },
- {
- "type": "thumb",
- "prompt": str,
- Optional("probability"): float,
- },
- {
- "type": "freeform",
- "prompt": str,
- Optional("probability"): float,
- },
- ),
-
- Optional("challenges", default=[]): [{
- **ID_NAME_DESCRIPTION,
- **VISIBILITY,
-
- Optional("image"): IMAGE_REGEX,
- Optional("allow_privileged"): bool,
- Optional("importable"): bool,
- Optional("progression_locked"): bool,
- Optional("auxiliary", default={}, ignore_extra_keys=True): dict,
- # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"),
-
- Optional("import"): {
- Optional("dojo"): UNIQUE_ID_REGEX,
- Optional("module"): ID_REGEX,
- "challenge": ID_REGEX,
- },
-
- Optional("transfer"): {
- Optional("dojo"): UNIQUE_ID_REGEX,
- Optional("module"): ID_REGEX,
- "challenge": ID_REGEX,
- },
-
- Optional("survey"): Or(
- {
- "type": "multiplechoice",
- "prompt": str,
- Optional("probability"): float,
- "options": [str],
- },
- {
- "type": "thumb",
- "prompt": str,
- Optional("probability"): float,
- },
- {
- "type": "freeform",
- "prompt": str,
- Optional("probability"): float,
- },
- )
- }],
-
- Optional("resources", default=[]): [Or(
- {
- "type": "markdown",
- "name": NAME_REGEX,
- "content": str,
- **VISIBILITY,
- },
- {
- "type": "lecture",
- "name": NAME_REGEX,
- Optional("video"): str,
- Optional("playlist"): str,
- Optional("slides"): str,
- **VISIBILITY,
- },
- )],
-
- Optional("auxiliary", default={}, ignore_extra_keys=True): dict,
- }],
- Optional("pages", default=[]): [str],
- Optional("files", default=[]): [Or(
- {
- "type": "download",
- "path": FILE_PATH_REGEX,
- "url": FILE_URL_REGEX,
- },
- {
- "type": "text",
- "path": FILE_PATH_REGEX,
- "content": str,
- }
- )],
-})
-
-
-def setdefault_name(entry):
- if "import" in entry:
- return
- if "name" in entry:
- return
- if "id" not in entry:
- return
- entry["name"] = entry["id"].replace("-", " ").title()
-
-
-def setdefault_file(data, key, file_path):
- if file_path.exists():
- data.setdefault("description", file_path.read_text())
-
-
-def setdefault_subyaml(data, subyaml_path):
- if not subyaml_path.exists():
- return data
-
- topyaml_data = dict(data)
- subyaml_data = yaml.safe_load(subyaml_path.read_text())
- data.clear()
- data.update(subyaml_data)
- data.update(topyaml_data)
-
-
-def load_dojo_subyamls(data, dojo_dir):
- """
- The dojo yaml gets augmented with additional yamls and markdown files found in the dojo repo structure.
-
- The meta-structure is:
-
- repo-root/dojo.yml
- repo-root/DESCRIPTION.md <- if dojo description is missing
- repo-root/module-id/module.yml <- fills in missing fields for module in dojo.yml (only module id *needs* to be in dojo.yml)
- repo-root/module-id/DESCRIPTION.md <- if module description is missing
- repo-root/module-id/challenge-id/challenge.yml <- fills in missing fields for challenge in higher-level ymls (only challenge id *needs* to be in dojo.yml/module.yml)
- repo-root/module-id/challenge-id/DESCRIPTION.md <- if challenge description is missing
-
- The higher-level details override the lower-level details.
- """
-
- setdefault_file(data, "description", dojo_dir / "DESCRIPTION.md")
-
- for module_data in data.get("modules", []):
- if "id" not in module_data:
- continue
-
- module_dir = dojo_dir / module_data["id"]
- setdefault_subyaml(module_data, module_dir / "module.yml")
- setdefault_file(module_data, "description", module_dir / "DESCRIPTION.md")
- setdefault_name(module_data)
-
- for challenge_data in module_data.get("challenges", []):
- if "id" not in challenge_data:
- continue
-
- challenge_dir = module_dir / challenge_data["id"]
- setdefault_subyaml(challenge_data, challenge_dir / "challenge.yml")
- setdefault_file(challenge_data, "description", challenge_dir / "DESCRIPTION.md")
- setdefault_name(challenge_data)
-
- return data
-
-
-def dojo_initialize_files(data, dojo_dir):
- for dojo_file in data.get("files", []):
- assert is_admin(), "yml-specified files support requires admin privileges"
- rel_path = dojo_dir / dojo_file["path"]
-
- abs_path = dojo_dir / rel_path
- assert not abs_path.is_symlink(), f"{rel_path} is a symbolic link!"
- if abs_path.exists():
- continue
- abs_path.parent.mkdir(parents=True, exist_ok=True)
-
- if dojo_file["type"] == "download":
- urllib.request.urlretrieve(dojo_file["url"], str(abs_path))
- assert abs_path.stat().st_size >= 50*1024*1024, f"{rel_path} is small enough to fit into git ({abs_path.stat().st_size} bytes) --- put it in the repository!"
- if dojo_file["type"] == "text":
- with open(abs_path, "w") as o:
- o.write(dojo_file["content"])
-
-
-def dojo_from_dir(dojo_dir, *, dojo=None):
- dojo_yml_path = dojo_dir / "dojo.yml"
- assert dojo_yml_path.exists(), "Missing file: `dojo.yml`"
-
- for path in dojo_dir.rglob("**"):
- assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo"
-
- data_raw = yaml.safe_load(dojo_yml_path.read_text())
- data = load_dojo_subyamls(data_raw, dojo_dir)
- dojo_initialize_files(data, dojo_dir)
- return dojo_from_spec(data, dojo_dir=dojo_dir, dojo=dojo)
-
-
-def dojo_from_spec(data, *, dojo_dir=None, dojo=None):
- try:
- dojo_data = DOJO_SPEC.validate(data)
- except SchemaError as e:
- raise AssertionError(e) # TODO: this probably shouldn't be re-raised as an AssertionError
-
- def assert_importable(o):
- assert o.importable, f"Import disallowed for {o}."
- if isinstance(o, Dojos):
- for m in o.module:
- assert_importable(m)
- if isinstance(o, DojoModules):
- for c in o.challenges:
- assert_importable(c)
-
- def assert_import_one(query, error_message):
- try:
- o = query.one()
- assert_importable(o)
- return o
- except NoResultFound:
- raise AssertionError(error_message)
-
- # TODO: we probably don't need to restrict imports to official dojos
- import_dojo = (
- assert_import_one(Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True),
- "Import dojo `{dojo_data['import']['dojo']}` does not exist")
- if "import" in dojo_data else None
- )
-
- dojo_kwargs = {
- field: dojo_data.get(field, getattr(import_dojo, field, None))
- for field in ["id", "name", "description", "password", "type", "award", "show_scoreboard"]
- }
-
- assert dojo_kwargs.get("id") is not None, "Dojo id must be defined"
-
- if dojo is None:
- dojo = Dojos(**dojo_kwargs)
- else:
- for name, value in dojo_kwargs.items():
- setattr(dojo, name, value)
-
- existing_challenges = {(challenge.module.id, challenge.id): challenge.challenge for challenge in dojo.challenges}
- def challenge(module_id, challenge_id, transfer=None):
- if (module_id, challenge_id) in existing_challenges:
- return existing_challenges[(module_id, challenge_id)]
- if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first():
- return chal
- if transfer:
- assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first())
- old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"]
- old_dojo = Dojos.from_id(old_dojo_id).first()
- old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first()
- assert old_dojo and old_challenge, f"unable to find source dojo/module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}"
- old_challenge.category = dojo.hex_dojo_id
- old_challenge.name = f"{module_id}:{challenge_id}"
- return old_challenge
- return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")])
-
- def visibility(cls, *args):
- start = None
- stop = None
- for arg in args:
- start = arg.get("visibility", {}).get("start") or start
- stop = arg.get("visibility", {}).get("stop") or stop
- if start or stop:
- start = start.astimezone(datetime.timezone.utc) if start else None
- stop = stop.astimezone(datetime.timezone.utc) if stop else None
- return cls(start=start, stop=stop)
-
- _missing = object()
- def shadow(attr, *datas, default=_missing, default_dict=None):
- for data in reversed(datas):
- if attr in data:
- return data[attr]
- if default is not _missing:
- return default
- elif default_dict and attr in default_dict:
- return default_dict[attr]
- raise KeyError(f"Missing `{attr}` in `{datas}`")
-
- def import_ids(attrs, *datas):
- datas_import = [data.get("import", {}) for data in datas]
- return tuple(shadow(id, *datas_import) for id in attrs)
-
- dojo.modules = [
- DojoModules(
- **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]},
- challenges=[
- DojoChallenges(
- **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]},
- image=shadow("image", dojo_data, module_data, challenge_data, default=None),
- allow_privileged=shadow("allow_privileged", dojo_data, module_data, challenge_data, default_dict=DojoChallenges.data_defaults),
- importable=shadow("importable", dojo_data, module_data, challenge_data, default_dict=DojoChallenges.data_defaults),
- challenge=challenge(
- module_data.get("id"), challenge_data.get("id"), transfer=challenge_data.get("transfer", None)
- ) if "import" not in challenge_data else None,
- progression_locked=challenge_data.get("progression_locked"),
- visibility=visibility(DojoChallengeVisibilities, dojo_data, module_data, challenge_data),
- survey=shadow("survey", dojo_data, module_data, challenge_data, default=None),
- default=(assert_import_one(DojoChallenges.from_id(*import_ids(["dojo", "module", "challenge"], dojo_data, module_data, challenge_data)),
- f"Import challenge `{'/'.join(import_ids(['dojo', 'module', 'challenge'], dojo_data, module_data, challenge_data))}` does not exist")
- if "import" in challenge_data else None),
- )
- for challenge_data in module_data["challenges"]
- ] if "challenges" in module_data else None,
- resources = [
- DojoResources(
- **{kwarg: resource_data.get(kwarg) for kwarg in ["name", "type", "content", "video", "playlist", "slides"]},
- visibility=visibility(DojoResourceVisibilities, dojo_data, module_data, resource_data),
- )
- for resource_data in module_data["resources"]
- ] if "resources" in module_data else None,
- default=(assert_import_one(DojoModules.from_id(*import_ids(["dojo", "module"], dojo_data, module_data)),
- f"Import module `{'/'.join(import_ids(['dojo', 'module'], dojo_data, module_data))}` does not exist")
- if "import" in module_data else None),
- visibility=visibility(DojoModuleVisibilities, dojo_data, module_data),
- show_challenges=shadow("show_challenges", dojo_data, module_data, default_dict=DojoModules.data_defaults),
- show_scoreboard=shadow("show_scoreboard", dojo_data, module_data, default_dict=DojoModules.data_defaults),
- )
- for module_data in dojo_data["modules"]
- ] if "modules" in dojo_data else [
- DojoModules(
- default=module,
- visibility=visibility(DojoModuleVisibilities, dojo_data, module_data),
- )
- for module in (import_dojo.modules if import_dojo else [])
- ]
-
- if dojo_dir:
- with dojo.located_at(dojo_dir):
- missing_challenge_paths = [
- challenge
- for module in dojo.modules
- for challenge in module.challenges
- if not challenge.path.exists()
- ]
- assert not missing_challenge_paths, "".join(
- f"Missing challenge path: {challenge.module.id}/{challenge.id}\n"
- for challenge in missing_challenge_paths)
-
- course_yml_path = dojo_dir / "course.yml"
- if course_yml_path.exists():
- course = yaml.safe_load(course_yml_path.read_text())
-
- if "discord_role" in course and not dojo.official:
- raise AssertionError("Unofficial dojos cannot have a discord role")
-
- dojo.course = course
-
- students_yml_path = dojo_dir / "students.yml"
- if students_yml_path.exists():
- students = yaml.safe_load(students_yml_path.read_text())
- dojo.course["students"] = students
-
- syllabus_path = dojo_dir / "SYLLABUS.md"
- if "syllabus" not in dojo.course and syllabus_path.exists():
- dojo.course["syllabus"] = syllabus_path.read_text()
-
- grade_path = dojo_dir / "grade.py"
- if grade_path.exists():
- dojo.course["grade_code"] = grade_path.read_text()
-
- if dojo_data.get("pages"):
- dojo.pages = dojo_data["pages"]
-
- return dojo
-
-
def generate_ssh_keypair():
temp_dir = tempfile.TemporaryDirectory()
key_dir = pathlib.Path(temp_dir.name)
@@ -487,46 +29,6 @@ def generate_ssh_keypair():
return (public_key.read_text().strip(), private_key.read_text())
-def dojo_yml_dir(spec):
- yml_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) # TODO: ignore_cleanup_errors=True
- yml_dir_path = pathlib.Path(yml_dir.name)
- with open(yml_dir_path / "dojo.yml", "w") as do:
- do.write(spec)
- return yml_dir
-
-
-def _assert_no_symlinks(dojo_dir):
- if not isinstance(dojo_dir, pathlib.Path):
- dojo_dir = pathlib.Path(dojo_dir)
- for path in dojo_dir.rglob("*"):
- assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo"
-
-
-def dojo_clone(repository, private_key):
- tmp_dojos_dir = DOJOS_TMP_DIR
- tmp_dojos_dir.mkdir(exist_ok=True)
- clone_dir = tempfile.TemporaryDirectory(dir=tmp_dojos_dir) # TODO: ignore_cleanup_errors=True
-
- key_file = tempfile.NamedTemporaryFile("w")
- key_file.write(private_key)
- key_file.flush()
-
- url = f"https://github.com/{repository}"
- if requests.head(url).status_code != 200:
- url = f"git@github.com:{repository}"
- subprocess.run(["git", "clone", "--depth=1", "--recurse-submodules", url, clone_dir.name],
- env={
- "GIT_SSH_COMMAND": f"ssh -i {key_file.name}",
- "GIT_TERMINAL_PROMPT": "0",
- },
- check=True,
- capture_output=True)
-
- _assert_no_symlinks(clone_dir.name)
-
- return clone_dir
-
-
def dojo_git_command(dojo, *args, repo_path=None):
key_file = tempfile.NamedTemporaryFile("w")
key_file.write(dojo.private_key)
@@ -544,89 +46,6 @@ def dojo_git_command(dojo, *args, repo_path=None):
capture_output=True)
-def dojo_create(user, repository, public_key, private_key, spec):
- try:
- if repository:
- repository_re = r"[\w\-]+/[\w\-]+"
- repository = repository.replace("https://github.com/", "")
- assert re.match(repository_re, repository), f"Invalid repository, expected format: {repository_re}
"
-
- if Dojos.query.filter_by(repository=repository).first():
- raise AssertionError("This repository already exists as a dojo")
-
- dojo_dir = dojo_clone(repository, private_key)
-
- elif spec:
- assert is_admin(), "Must be an admin user to create dojos from spec rather than repositories"
- dojo_dir = dojo_yml_dir(spec)
- repository, public_key, private_key = None, None, None
-
- else:
- raise AssertionError("Repository is required")
-
- dojo_path = pathlib.Path(dojo_dir.name)
-
- dojo = dojo_from_dir(dojo_path)
- dojo.repository = repository
- dojo.public_key = public_key
- dojo.private_key = private_key
- dojo.admins = [DojoAdmins(user=user)]
-
- db.session.add(dojo)
- db.session.commit()
-
- dojo.path.parent.mkdir(exist_ok=True)
- dojo_path.rename(dojo.path)
- dojo_path.mkdir() # TODO: ignore_cleanup_errors=True
-
- except subprocess.CalledProcessError as e:
- deploy_url = f"https://github.com/{repository}/settings/keys"
- raise RuntimeError(f"Failed to clone: add deploy key")
-
- except IntegrityError:
- raise RuntimeError("This repository already exists as a dojo")
-
- except AssertionError as e:
- raise RuntimeError(str(e))
-
- except Exception as e:
- traceback.print_exc(file=sys.stderr)
- raise RuntimeError("An error occurred while creating the dojo")
-
- return dojo
-
-
-def dojo_update(dojo):
- if dojo.path.exists():
- old_commit = dojo_git_command(dojo, "rev-parse", "HEAD").stdout.decode().strip()
-
- tmp_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR)
-
- os.rename(str(dojo.path), tmp_dir.name)
-
- dojo_git_command(dojo, "fetch", "--depth=1", "origin", repo_path=tmp_dir.name)
- dojo_git_command(dojo, "reset", "--hard", "origin", repo_path=tmp_dir.name)
- dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name)
-
- try:
- _assert_no_symlinks(tmp_dir.name)
- except AssertionError:
- dojo_git_command(dojo, "reset", "--hard", old_commit, repo_path=tmp_dir.name)
- dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name)
- raise
- finally:
- os.rename(tmp_dir.name, str(dojo.path))
- else:
- tmpdir = dojo_clone(dojo.repository, dojo.private_key)
- os.rename(tmpdir.name, str(dojo.path))
- return dojo_from_dir(dojo.path, dojo=dojo)
-
-
-def dojo_accessible(id):
- if is_admin():
- return Dojos.from_id(id).first()
- return Dojos.viewable(id=id, user=get_current_user()).first()
-
def dojo_admins_only(func):
signature = inspect.signature(func)
@@ -642,6 +61,12 @@ def wrapper(*args, **kwargs):
return wrapper
+def dojo_accessible(id: int) -> Dojos:
+ if is_admin():
+ return Dojos.from_id(id).first()
+ return Dojos.viewable(id=id, user=get_current_user()).first()
+
+
def dojo_route(func):
signature = inspect.signature(func)
@functools.wraps(func)