From 9a2390a74a2b4805d1030ce58710bd6f85ee7428 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Sun, 29 Jun 2025 19:57:56 -0700 Subject: [PATCH 01/11] WIP: begin refactoring dojo building into dojo_builder.py --- dojo_plugin/utils/dojo.py | 337 +---------------------- dojo_plugin/utils/dojo_builder.py | 443 ++++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+), 336 deletions(-) create mode 100644 dojo_plugin/utils/dojo_builder.py diff --git a/dojo_plugin/utils/dojo.py b/dojo_plugin/utils/dojo.py index 55921d211..0f1de42cf 100644 --- a/dojo_plugin/utils/dojo.py +++ b/dojo_plugin/utils/dojo.py @@ -3,8 +3,6 @@ import subprocess import sys import tempfile -import traceback -import datetime import functools import inspect import pathlib @@ -22,181 +20,12 @@ from ..models import DojoAdmins, Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoResourceVisibilities, DojoModuleVisibilities from ..config import DOJOS_DIR from ..utils import get_current_container +from .dojo_builder import dojo_from_spec DOJOS_TMP_DIR = DOJOS_DIR/"tmp" DOJOS_TMP_DIR.mkdir(exist_ok=True) -ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$") -UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$") -NAME_REGEX = Regex(r"^[\S ]{1,128}$") -IMAGE_REGEX = Regex(r"^[\S]{1,256}$") -FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$") -FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1") -DATE = Use(datetime.datetime.fromisoformat) - -ID_NAME_DESCRIPTION = { - Optional("id"): ID_REGEX, - Optional("name"): NAME_REGEX, - Optional("description"): str, -} - -VISIBILITY = { - Optional("visibility", default={}): { - Optional("start"): DATE, - Optional("stop"): DATE, - } -} - -DOJO_SPEC = Schema({ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("password"): Regex(r"^[\S ]{8,128}$"), - - Optional("type"): ID_REGEX, - Optional("award"): { - Optional("emoji"): Regex(r"^\S$"), - Optional("belt"): IMAGE_REGEX - }, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - - Optional("import"): { - "dojo": UNIQUE_ID_REGEX, - }, - - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ), - - Optional("modules", default=[]): [{ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - - Optional("import"): { - Optional("dojo"): UNIQUE_ID_REGEX, - "module": ID_REGEX, - }, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ), - - Optional("challenges", default=[]): [{ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - Optional("progression_locked"): bool, - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"), - - Optional("import"): { - Optional("dojo"): UNIQUE_ID_REGEX, - Optional("module"): ID_REGEX, - "challenge": ID_REGEX, - }, - - Optional("transfer"): { - Optional("dojo"): UNIQUE_ID_REGEX, - Optional("module"): ID_REGEX, - "challenge": ID_REGEX, - }, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ) - }], - - Optional("resources", default=[]): [Or( - { - "type": "markdown", - "name": NAME_REGEX, - "content": str, - **VISIBILITY, - }, - { - "type": "lecture", - "name": NAME_REGEX, - Optional("video"): str, - Optional("playlist"): str, - Optional("slides"): str, - **VISIBILITY, - }, - )], - - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - }], - Optional("pages", default=[]): [str], - Optional("files", default=[]): [Or( - { - "type": "download", - "path": FILE_PATH_REGEX, - "url": FILE_URL_REGEX, - }, - { - "type": "text", - "path": FILE_PATH_REGEX, - "content": str, - } - )], -}) def setdefault_name(entry): @@ -296,170 +125,6 @@ def dojo_from_dir(dojo_dir, *, dojo=None): return dojo_from_spec(data, dojo_dir=dojo_dir, dojo=dojo) -def dojo_from_spec(data, *, dojo_dir=None, dojo=None): - try: - dojo_data = DOJO_SPEC.validate(data) - except SchemaError as e: - raise AssertionError(e) # TODO: this probably shouldn't be re-raised as an AssertionError - - def assert_importable(o): - assert o.importable, f"Import disallowed for {o}." - if isinstance(o, Dojos): - for m in o.module: - assert_importable(m) - if isinstance(o, DojoModules): - for c in o.challenges: - assert_importable(c) - - def assert_import_one(query, error_message): - try: - o = query.one() - assert_importable(o) - return o - except NoResultFound: - raise AssertionError(error_message) - - # TODO: we probably don't need to restrict imports to official dojos - import_dojo = ( - assert_import_one(Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), - "Import dojo `{dojo_data['import']['dojo']}` does not exist") - if "import" in dojo_data else None - ) - - dojo_kwargs = { - field: dojo_data.get(field, getattr(import_dojo, field, None)) - for field in ["id", "name", "description", "password", "type", "award"] - } - - if dojo is None: - dojo = Dojos(**dojo_kwargs) - else: - for name, value in dojo_kwargs.items(): - setattr(dojo, name, value) - - existing_challenges = {(challenge.module.id, challenge.id): challenge.challenge for challenge in dojo.challenges} - def challenge(module_id, challenge_id, transfer=None): - if (module_id, challenge_id) in existing_challenges: - return existing_challenges[(module_id, challenge_id)] - if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): - return chal - if transfer: - assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()) - old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] - old_dojo = Dojos.from_id(old_dojo_id).first() - old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() - assert old_dojo and old_challenge, f"unable to find source dojo/module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" - old_challenge.category = dojo.hex_dojo_id - old_challenge.name = f"{module_id}:{challenge_id}" - return old_challenge - return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) - - def visibility(cls, *args): - start = None - stop = None - for arg in args: - start = arg.get("visibility", {}).get("start") or start - stop = arg.get("visibility", {}).get("stop") or stop - if start or stop: - start = start.astimezone(datetime.timezone.utc) if start else None - stop = stop.astimezone(datetime.timezone.utc) if stop else None - return cls(start=start, stop=stop) - - _missing = object() - def shadow(attr, *datas, default=_missing, default_dict=None): - for data in reversed(datas): - if attr in data: - return data[attr] - if default is not _missing: - return default - elif default_dict and attr in default_dict: - return default_dict[attr] - raise KeyError(f"Missing `{attr}` in `{datas}`") - - def import_ids(attrs, *datas): - datas_import = [data.get("import", {}) for data in datas] - return tuple(shadow(id, *datas_import) for id in attrs) - - dojo.modules = [ - DojoModules( - **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - challenges=[ - DojoChallenges( - **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - image=shadow("image", dojo_data, module_data, challenge_data, default=None), - allow_privileged=shadow("allow_privileged", dojo_data, module_data, challenge_data, default_dict=DojoChallenges.data_defaults), - importable=shadow("importable", dojo_data, module_data, challenge_data, default_dict=DojoChallenges.data_defaults), - challenge=challenge( - module_data.get("id"), challenge_data.get("id"), transfer=challenge_data.get("transfer", None) - ) if "import" not in challenge_data else None, - progression_locked=challenge_data.get("progression_locked"), - visibility=visibility(DojoChallengeVisibilities, dojo_data, module_data, challenge_data), - survey=shadow("survey", dojo_data, module_data, challenge_data, default=None), - default=(assert_import_one(DojoChallenges.from_id(*import_ids(["dojo", "module", "challenge"], dojo_data, module_data, challenge_data)), - f"Import challenge `{'/'.join(import_ids(['dojo', 'module', 'challenge'], dojo_data, module_data, challenge_data))}` does not exist") - if "import" in challenge_data else None), - ) - for challenge_data in module_data["challenges"] - ] if "challenges" in module_data else None, - resources = [ - DojoResources( - **{kwarg: resource_data.get(kwarg) for kwarg in ["name", "type", "content", "video", "playlist", "slides"]}, - visibility=visibility(DojoResourceVisibilities, dojo_data, module_data, resource_data), - ) - for resource_data in module_data["resources"] - ] if "resources" in module_data else None, - default=(assert_import_one(DojoModules.from_id(*import_ids(["dojo", "module"], dojo_data, module_data)), - f"Import module `{'/'.join(import_ids(['dojo', 'module'], dojo_data, module_data))}` does not exist") - if "import" in module_data else None), - visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), - ) - for module_data in dojo_data["modules"] - ] if "modules" in dojo_data else [ - DojoModules( - default=module, - visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), - ) - for module in (import_dojo.modules if import_dojo else []) - ] - - if dojo_dir: - with dojo.located_at(dojo_dir): - missing_challenge_paths = [ - challenge - for module in dojo.modules - for challenge in module.challenges - if not challenge.path.exists() - ] - assert not missing_challenge_paths, "".join( - f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" - for challenge in missing_challenge_paths) - - course_yml_path = dojo_dir / "course.yml" - if course_yml_path.exists(): - course = yaml.safe_load(course_yml_path.read_text()) - - if "discord_role" in course and not dojo.official: - raise AssertionError("Unofficial dojos cannot have a discord role") - - dojo.course = course - - students_yml_path = dojo_dir / "students.yml" - if students_yml_path.exists(): - students = yaml.safe_load(students_yml_path.read_text()) - dojo.course["students"] = students - - syllabus_path = dojo_dir / "SYLLABUS.md" - if "syllabus" not in dojo.course and syllabus_path.exists(): - dojo.course["syllabus"] = syllabus_path.read_text() - - grade_path = dojo_dir / "grade.py" - if grade_path.exists(): - dojo.course["grade_code"] = grade_path.read_text() - - if dojo_data.get("pages"): - dojo.pages = dojo_data["pages"] - - return dojo def generate_ssh_keypair(): diff --git a/dojo_plugin/utils/dojo_builder.py b/dojo_plugin/utils/dojo_builder.py new file mode 100644 index 000000000..3380ba6f6 --- /dev/null +++ b/dojo_plugin/utils/dojo_builder.py @@ -0,0 +1,443 @@ +import datetime +import typing +import yaml + +from pathlib import Path +from schema import Schema, Optional, Regex, Or, Use, SchemaError + +from typing import Any +from dojo_plugin.models import Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoModuleVisibilities, DojoResourceVisibilities, Challenges, Flags +from sqlalchemy.orm.exc import NoResultFound +from CTFd.utils.user import is_admin + + +ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$") +UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$") +NAME_REGEX = Regex(r"^[\S ]{1,128}$") +IMAGE_REGEX = Regex(r"^[\S]{1,256}$") +FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$") +FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1") +DATE = Use(datetime.datetime.fromisoformat) + +ID_NAME_DESCRIPTION = { + Optional("id"): ID_REGEX, + Optional("name"): NAME_REGEX, + Optional("description"): str, +} + +VISIBILITY = { + Optional("visibility", default={}): { + Optional("start"): DATE, + Optional("stop"): DATE, + } +} + + +DOJO_SPEC = Schema({ + **ID_NAME_DESCRIPTION, + **VISIBILITY, + + Optional("password"): Regex(r"^[\S ]{8,128}$"), + + Optional("type"): ID_REGEX, + Optional("award"): { + Optional("emoji"): Regex(r"^\S$"), + Optional("belt"): IMAGE_REGEX + }, + + Optional("image"): IMAGE_REGEX, + Optional("allow_privileged"): bool, + Optional("importable"): bool, + + Optional("import"): { + "dojo": UNIQUE_ID_REGEX, + }, + + Optional("auxiliary", default={}, ignore_extra_keys=True): dict, + + Optional("survey"): Or( + { + "type": "multiplechoice", + "prompt": str, + Optional("probability"): float, + "options": [str], + }, + { + "type": "thumb", + "prompt": str, + Optional("probability"): float, + }, + { + "type": "freeform", + "prompt": str, + Optional("probability"): float, + }, + ), + + Optional("pages", default=[]): [str], + Optional("files", default=[]): [Or( + { + "type": "download", + "path": FILE_PATH_REGEX, + "url": FILE_URL_REGEX, + }, + { + "type": "text", + "path": FILE_PATH_REGEX, + "content": str, + } + )], + + Optional("modules", default=[]): list, # Defer module validation until later +}) +""" +This is the validation Schema that parses the dojo.yaml file during dojo initialization. + +In order to create a valid dojo.yaml, it must conform to the schema defined here. +""" + + +def dojo_from_spec(data: dict[str, Any], *, dojo_dir:typing.Optional[Path]=None, dojo:typing.Optional[Dojos]=None) -> Dojos: + try: + dojo_data = DOJO_SPEC.validate(data) + except SchemaError as e: + raise AssertionError(f"Invalid dojo specification: {e}") + + # def assert_importable(o): + # assert o.importable, f"Import disallowed for {o}." + # if isinstance(o, Dojos): + # for m in o.module: + # assert_importable(m) + # if isinstance(o, DojoModules): + # for c in o.challenges: + # assert_importable(c) + + def assert_import_one(query, error_message): + """ + Since dojos are queried by id, this ensures that only one dojo matches the id, as well as making sure that dojo is importable. + """ + try: + o = query.one() + assert o.importable, f"Import disallowed for {o}." + return o + except NoResultFound: + raise AssertionError(error_message) + + # TODO: we probably don't need to restrict imports to official dojos + import_dojo = ( + assert_import_one(Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), + "Import dojo `{dojo_data['import']['dojo']}` does not exist") + if "import" in dojo_data else None + ) + + dojo_kwargs = { + field: dojo_data.get(field, getattr(import_dojo, field, None)) + for field in ["id", "name", "description", "password", "type", "award"] + } + + if dojo is None: + dojo = Dojos(**dojo_kwargs) + else: + for name, value in dojo_kwargs.items(): + setattr(dojo, name, value) + + existing_challenges = {(challenge.module.id, challenge.id): challenge.challenge for challenge in dojo.challenges} + def challenge(module_id: str, challenge_id: str, transfer: typing.Optional[dict[str, Any]]) -> Challenges: + """ + Retrieves or creates a dojo challenge object based on the given module and challenge identifiers. + + This function performs the following logic: + - If the challenge has already been retrieved (cached in `existing_challenges`), it is returned immediately. + - If a challenge matching the `module_id` and `challenge_id` exists in the database, it is returned. + - If a `transfer` is provided, the function attempts to locate the challenge in the source dojo, validate transfer permissions, + and return a modified version scoped to the current dojo. + - If no existing or transferrable challenge is found, a new challenge instance is created and returned (but not committed). + """ + if (module_id, challenge_id) in existing_challenges: # Don't re-query for challenges that are already in the dojo + return existing_challenges[(module_id, challenge_id)] + if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): + return chal + if transfer: + assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation" + old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] + old_dojo = Dojos.from_id(old_dojo_id).first() + assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() + assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge.category = dojo.hex_dojo_id + old_challenge.name = f"{module_id}:{challenge_id}" + return old_challenge + return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) + + def visibility(cls, *args): + """ + Constructs a visibility window from one or more argument dictionaries. + + This method scans the provided dictionaries for a nested "visibility" key containing + optional "start" and "stop" datetime values. The latest non-`None` values found take priority and are used + to create a new instance of `cls` with UTC-normalized timestamps. + """ + start = None + stop = None + for arg in args: + start = arg.get("visibility", {}).get("start") or start + stop = arg.get("visibility", {}).get("stop") or stop + if start or stop: + start = start.astimezone(datetime.timezone.utc) if start else None + stop = stop.astimezone(datetime.timezone.utc) if stop else None + return cls(start=start, stop=stop) + + _missing = object() + def shadow(attr, *datas, default=_missing, default_dict=None): + """ + Looks for `attr` in the given datas (in reverse order), returning the first found value. + + If not found: + - Returns `default` if explicitly provided + - Returns `default_dict[attr]` if present + - Otherwise raises KeyError. + """ + for data in reversed(datas): + if attr in data: + return data[attr] + if default is not _missing: + return default + elif default_dict and attr in default_dict: + return default_dict[attr] + raise KeyError(f"Missing `{attr}` in `{datas}`") + + def import_ids(attrs: list[str], *datas) -> tuple: + """ + Resolves the import sources by extracting the "import" attribute from the `datas` and extracting all of the attributes under `import` which are specified by `attr` + """ + datas_import = [data.get("import", {}) for data in datas] + return tuple(shadow(attr, *datas_import) for attr in attrs) + + def build_dojo_resources(module_data): + if "resources" not in module_data: + return None + return [ + DojoResources( + **{kwarg: resource_data.get(kwarg) for kwarg in ["name", "type", "content", "video", "playlist", "slides"]}, + visibility=visibility(DojoResourceVisibilities, dojo_data, module_data, resource_data), + ) + for resource_data in module_data["resources"] + ] + dojo.modules = modules_from_spec(dojo_data["modules"]) + + + + # FIXME address imports later + if "modules" in dojo_data else [ + DojoModules( + default=module, + visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), + ) + for module in (import_dojo.modules if import_dojo else []) + ] + + if dojo_dir: + with dojo.located_at(dojo_dir): + missing_challenge_paths = [ + challenge + for module in dojo.modules + for challenge in module.challenges + if not challenge.path.exists() + ] + assert not missing_challenge_paths, "".join( + f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" + for challenge in missing_challenge_paths) + + course_yml_path = dojo_dir / "course.yml" + if course_yml_path.exists(): + course = yaml.safe_load(course_yml_path.read_text()) + + if "discord_role" in course and not dojo.official: + raise AssertionError("Unofficial dojos cannot have a discord role") + + dojo.course = course + + students_yml_path = dojo_dir / "students.yml" + if students_yml_path.exists(): + students = yaml.safe_load(students_yml_path.read_text()) + dojo.course["students"] = students + + syllabus_path = dojo_dir / "SYLLABUS.md" + if "syllabus" not in dojo.course and syllabus_path.exists(): + dojo.course["syllabus"] = syllabus_path.read_text() + + grade_path = dojo_dir / "grade.py" + if grade_path.exists(): + dojo.course["grade_code"] = grade_path.read_text() + + if dojo_data.get("pages"): + dojo.pages = dojo_data["pages"] + + return dojo + + + +MODULE_SPEC = Schema([{ + **ID_NAME_DESCRIPTION, + **VISIBILITY, + + Optional("image"): IMAGE_REGEX, + Optional("allow_privileged"): bool, + Optional("importable"): bool, + + Optional("import"): { + Optional("dojo"): UNIQUE_ID_REGEX, + "module": ID_REGEX, + }, + + Optional("survey"): Or( + { + "type": "multiplechoice", + "prompt": str, + Optional("probability"): float, + "options": [str], + }, + { + "type": "thumb", + "prompt": str, + Optional("probability"): float, + }, + { + "type": "freeform", + "prompt": str, + Optional("probability"): float, + }, + ), + + + Optional("resources", default=[]): [Or( + { + "type": "markdown", + "name": NAME_REGEX, + "content": str, + **VISIBILITY, + }, + { + "type": "lecture", + "name": NAME_REGEX, + Optional("video"): str, + Optional("playlist"): str, + Optional("slides"): str, + **VISIBILITY, + }, + )], + + Optional("auxiliary", default={}, ignore_extra_keys=True): dict, + + Optional("challenges", default=[]): list, # Defer challenge validation +}]) + + +def modules_from_spec(raw_module_data): + try: + module_list = MODULE_SPEC.validate(raw_module_data) + except SchemaError as e: + raise AssertionError(f"Invalid module specification: {e}") + + + return [ + DojoModules( + **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + resources = build_dojo_resources(module_data), + default=(assert_import_one(DojoModules.from_id(*import_ids(["dojo", "module"], dojo_data, module_data)), + f"Import module `{'/'.join(import_ids(['dojo', 'module'], dojo_data, module_data))}` does not exist") + if "import" in module_data else None), + visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), + + challenges=challenges_from_spec(module_data["challenges"]), + ) + for module_data in module_list + ] + + + +CHALLENGE_SPEC = Schema([{ + **ID_NAME_DESCRIPTION, + **VISIBILITY, + + Optional("image"): IMAGE_REGEX, + Optional("allow_privileged"): bool, + Optional("importable"): bool, + Optional("progression_locked"): bool, + Optional("auxiliary", default={}, ignore_extra_keys=True): dict, + # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"), + + Optional("import"): { + Optional("dojo"): UNIQUE_ID_REGEX, + Optional("module"): ID_REGEX, + "challenge": ID_REGEX, + }, + + Optional("transfer"): { + Optional("dojo"): UNIQUE_ID_REGEX, + Optional("module"): ID_REGEX, + "challenge": ID_REGEX, + }, + + Optional("survey"): Or( + { + "type": "multiplechoice", + "prompt": str, + Optional("probability"): float, + "options": [str], + }, + { + "type": "thumb", + "prompt": str, + Optional("probability"): float, + }, + { + "type": "freeform", + "prompt": str, + Optional("probability"): float, + }, + ) +}]) + + +def first_present_or_none(key, *dicts): + for d in dicts: + if key in d: + return d[key] + return None + +def get_visibility(cls, *dicts): + visibility = first_present_or_none("visibility", *dicts) + + if visibility: + start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None + stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None + assert start or stop, "`start` or `stop` value must be present under visibility" + return cls(start=start, stop=stop) + + return None + + +def challenges_from_spec(raw_challenge_data, defaults): + try: + challenge_list = CHALLENGE_SPEC.validate(raw_challenge_data) + except SchemaError as e: + raise AssertionError(f"Invalid challenge specification: {e}") + + return [ + DojoChallenges( + **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + image=first_present_or_none("image", challenge_data, defaults), + allow_privileged=first_present_or_none("allow_privileged", challenge_data, defaults, DojoChallenges.data_defaults), + importable=first_present_or_none("importable", challenge_data, defaults, DojoChallenges.data_defaults), + challenge=challenge( + module_data.get("id"), challenge_data.get("id"), transfer=challenge_data.get("transfer", None) + ) if "import" not in challenge_data else None, + progression_locked=challenge_data.get("progression_locked"), + visibility=get_visibility(DojoChallengeVisibilities, challenge_data, defaults), + survey=first_present_or_none("survey", challenge_data, defaults), + # TODO Handle imports seperately + ) + for challenge_data in challenge_list + ] + \ No newline at end of file From 852c778be475c5a407fc1d6020c180e1628ccca8 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Wed, 2 Jul 2025 12:50:13 -0700 Subject: [PATCH 02/11] Finished module and challenge building --- dojo_plugin/models/__init__.py | 14 -- dojo_plugin/utils/dojo_builder.py | 341 +++++++++++++++--------------- 2 files changed, 171 insertions(+), 184 deletions(-) diff --git a/dojo_plugin/models/__init__.py b/dojo_plugin/models/__init__.py index f2bc1bc33..5f02fdb72 100644 --- a/dojo_plugin/models/__init__.py +++ b/dojo_plugin/models/__init__.py @@ -353,7 +353,6 @@ class DojoModules(db.Model): def __init__(self, *args, **kwargs): - default = kwargs.pop("default", None) visibility = kwargs["visibility"] if "visibility" in kwargs else None data = kwargs.pop("data", {}) @@ -488,25 +487,12 @@ class DojoChallenges(db.Model): survey_responses = db.relationship("SurveyResponses", back_populates="challenge", cascade="all, delete-orphan") def __init__(self, *args, **kwargs): - default = kwargs.pop("default", None) - data = kwargs.pop("data", {}) for field in self.data_fields: if field in kwargs: data[field] = kwargs.pop(field) kwargs["data"] = data - if default: - if kwargs.get("challenge") is not None: - raise AttributeError("Import requires challenge to be None") - - for field in ["id", "name", "description", "challenge"]: - kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None) - - # TODO: maybe we should track the entire import - kwargs["data"]["image"] = default.data.get("image") - kwargs["data"]["path_override"] = str(default.path) - super().__init__(*args, **kwargs) def __getattr__(self, name): diff --git a/dojo_plugin/utils/dojo_builder.py b/dojo_plugin/utils/dojo_builder.py index 3380ba6f6..824e2c78a 100644 --- a/dojo_plugin/utils/dojo_builder.py +++ b/dojo_plugin/utils/dojo_builder.py @@ -97,43 +97,37 @@ """ -def dojo_from_spec(data: dict[str, Any], *, dojo_dir:typing.Optional[Path]=None, dojo:typing.Optional[Dojos]=None) -> Dojos: +def import_one(query, error_message): try: - dojo_data = DOJO_SPEC.validate(data) - except SchemaError as e: - raise AssertionError(f"Invalid dojo specification: {e}") + o = query.one() + assert o.importable, f"Import disallowed for {o}." + return o + except NoResultFound: + raise AssertionError(error_message) - # def assert_importable(o): - # assert o.importable, f"Import disallowed for {o}." - # if isinstance(o, Dojos): - # for m in o.module: - # assert_importable(m) - # if isinstance(o, DojoModules): - # for c in o.challenges: - # assert_importable(c) - - def assert_import_one(query, error_message): - """ - Since dojos are queried by id, this ensures that only one dojo matches the id, as well as making sure that dojo is importable. - """ - try: - o = query.one() - assert o.importable, f"Import disallowed for {o}." - return o - except NoResultFound: - raise AssertionError(error_message) +def import_dojo(dojo_data): # TODO: we probably don't need to restrict imports to official dojos - import_dojo = ( - assert_import_one(Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), - "Import dojo `{dojo_data['import']['dojo']}` does not exist") - if "import" in dojo_data else None + imported_dojo = import_one( + Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), + f"Import dojo `{dojo_data['import']['dojo']}` does not exist" ) - dojo_kwargs = { - field: dojo_data.get(field, getattr(import_dojo, field, None)) - for field in ["id", "name", "description", "password", "type", "award"] - } + for attr in ["id", "name", "description", "password", "type", "award"]: + if attr not in dojo_data: + dojo_data[attr] = getattr(import_dojo, attr) + + + + +def dojo_from_spec(data: dict, *, dojo_dir=None, dojo=None) -> Dojos: + try: + dojo_data = DOJO_SPEC.validate(data) + except SchemaError as e: + raise AssertionError(f"Invalid dojo specification: {e}") + + if "import" in dojo_data: + import_dojo(dojo_data) if dojo is None: dojo = Dojos(**dojo_kwargs) @@ -141,100 +135,9 @@ def assert_import_one(query, error_message): for name, value in dojo_kwargs.items(): setattr(dojo, name, value) - existing_challenges = {(challenge.module.id, challenge.id): challenge.challenge for challenge in dojo.challenges} - def challenge(module_id: str, challenge_id: str, transfer: typing.Optional[dict[str, Any]]) -> Challenges: - """ - Retrieves or creates a dojo challenge object based on the given module and challenge identifiers. - - This function performs the following logic: - - If the challenge has already been retrieved (cached in `existing_challenges`), it is returned immediately. - - If a challenge matching the `module_id` and `challenge_id` exists in the database, it is returned. - - If a `transfer` is provided, the function attempts to locate the challenge in the source dojo, validate transfer permissions, - and return a modified version scoped to the current dojo. - - If no existing or transferrable challenge is found, a new challenge instance is created and returned (but not committed). - """ - if (module_id, challenge_id) in existing_challenges: # Don't re-query for challenges that are already in the dojo - return existing_challenges[(module_id, challenge_id)] - if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): - return chal - if transfer: - assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation" - old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] - old_dojo = Dojos.from_id(old_dojo_id).first() - assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" - old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() - assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" - old_challenge.category = dojo.hex_dojo_id - old_challenge.name = f"{module_id}:{challenge_id}" - return old_challenge - return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) - - def visibility(cls, *args): - """ - Constructs a visibility window from one or more argument dictionaries. - - This method scans the provided dictionaries for a nested "visibility" key containing - optional "start" and "stop" datetime values. The latest non-`None` values found take priority and are used - to create a new instance of `cls` with UTC-normalized timestamps. - """ - start = None - stop = None - for arg in args: - start = arg.get("visibility", {}).get("start") or start - stop = arg.get("visibility", {}).get("stop") or stop - if start or stop: - start = start.astimezone(datetime.timezone.utc) if start else None - stop = stop.astimezone(datetime.timezone.utc) if stop else None - return cls(start=start, stop=stop) - - _missing = object() - def shadow(attr, *datas, default=_missing, default_dict=None): - """ - Looks for `attr` in the given datas (in reverse order), returning the first found value. - - If not found: - - Returns `default` if explicitly provided - - Returns `default_dict[attr]` if present - - Otherwise raises KeyError. - """ - for data in reversed(datas): - if attr in data: - return data[attr] - if default is not _missing: - return default - elif default_dict and attr in default_dict: - return default_dict[attr] - raise KeyError(f"Missing `{attr}` in `{datas}`") - - def import_ids(attrs: list[str], *datas) -> tuple: - """ - Resolves the import sources by extracting the "import" attribute from the `datas` and extracting all of the attributes under `import` which are specified by `attr` - """ - datas_import = [data.get("import", {}) for data in datas] - return tuple(shadow(attr, *datas_import) for attr in attrs) - - def build_dojo_resources(module_data): - if "resources" not in module_data: - return None - return [ - DojoResources( - **{kwarg: resource_data.get(kwarg) for kwarg in ["name", "type", "content", "video", "playlist", "slides"]}, - visibility=visibility(DojoResourceVisibilities, dojo_data, module_data, resource_data), - ) - for resource_data in module_data["resources"] - ] - dojo.modules = modules_from_spec(dojo_data["modules"]) + dojo.modules = modules_from_spec(dojo, dojo_data) - - # FIXME address imports later - if "modules" in dojo_data else [ - DojoModules( - default=module, - visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), - ) - for module in (import_dojo.modules if import_dojo else []) - ] if dojo_dir: with dojo.located_at(dojo_dir): @@ -275,6 +178,23 @@ def build_dojo_resources(module_data): return dojo +def first_present(key, *dicts): + for d in dicts: + if key in d: + return d[key] + return None + +def get_visibility(cls, *dicts): + visibility = first_present("visibility", *dicts) + + if visibility: + start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None + stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None + assert start or stop, "`start` or `stop` value must be present under visibility" + return cls(start=start, stop=stop) + + return None + MODULE_SPEC = Schema([{ @@ -333,28 +253,65 @@ def build_dojo_resources(module_data): }]) -def modules_from_spec(raw_module_data): - try: - module_list = MODULE_SPEC.validate(raw_module_data) - except SchemaError as e: - raise AssertionError(f"Invalid module specification: {e}") +RESOURCE_ATTRIBUTES = ["name", "type", "content", "video", "playlist", "slides"] +def build_dojo_resources(module_data, dojo_data): + if "resources" not in module_data: + return None return [ - DojoModules( - **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - resources = build_dojo_resources(module_data), - default=(assert_import_one(DojoModules.from_id(*import_ids(["dojo", "module"], dojo_data, module_data)), - f"Import module `{'/'.join(import_ids(['dojo', 'module'], dojo_data, module_data))}` does not exist") - if "import" in module_data else None), - visibility=visibility(DojoModuleVisibilities, dojo_data, module_data), - - challenges=challenges_from_spec(module_data["challenges"]), + DojoResources( + **{attr: resource_data.get(attr) for attr in RESOURCE_ATTRIBUTES}, + visibility=get_visibility(DojoResourceVisibilities, resource_data, module_data, dojo_data), ) - for module_data in module_list + for resource_data in module_data["resources"] ] +def import_module(module_data, dojo_data): + import_data = ( + module_data["import"]["module"], + first_present("dojo", module_data["import"], dojo_data["import"]), + ) + + imported_module = import_one(DojoModules.from_id(*import_data), f"{'/'.join(import_data)} does not exist") + for attr in ["id", "name", "description"]: + if attr not in module_data: + module_data[attr] = getattr(imported_module, attr) + + if "challenges" not in module_data: + # The idea here is that once it reaches challenges_from_spec it will process the actual challenge importing + module_data["challenges"] = [{"import": {"challenge": challenge.id}} for challenge in import_module.challenges] + + if "resources" not in module_data: + module_data["resources"] = [ + { + attr: getattr(resource, attr) for attr in RESOURCE_ATTRIBUTES if getattr(resource, attr, None) is not None + } for resource in import_module.resources + ] + + +def modules_from_spec(dojo, dojo_data): + try: + module_list = MODULE_SPEC.validate(dojo_data["modules"]) + except SchemaError as e: + raise AssertionError(f"Invalid module specification: {e}") + + result = [] + for module_data in module_list: + if "import" in module_data: + import_module(module_data, dojo_data) + result.append( + DojoModules( + **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + resources=build_dojo_resources(module_data, dojo_data), + visibility=get_visibility(DojoModuleVisibilities, module_data, dojo_data), + challenges=challenges_from_spec(dojo, dojo_data, module_data), + ) + ) + return result + + CHALLENGE_SPEC = Schema([{ **ID_NAME_DESCRIPTION, @@ -400,44 +357,88 @@ def modules_from_spec(raw_module_data): }]) -def first_present_or_none(key, *dicts): - for d in dicts: - if key in d: - return d[key] - return None -def get_visibility(cls, *dicts): - visibility = first_present_or_none("visibility", *dicts) - if visibility: - start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None - stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None - assert start or stop, "`start` or `stop` value must be present under visibility" - return cls(start=start, stop=stop) +def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges: + """ + Retrieves or creates a dojo challenge object based on the given module and challenge identifiers. + + This function performs the following logic: + - If the challenge has already been retrieved (cached in `existing_challenges`), it is returned immediately. + - If a challenge matching the `module_id` and `challenge_id` exists in the database, it is returned. + - If a `transfer` is provided, the function attempts to locate the challenge in the source dojo, validate transfer permissions, + and return a modified version scoped to the current dojo. + - If no existing or transferrable challenge is found, a new challenge instance is created and returned (but not committed). + """ + if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): + return chal + if transfer: + assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation" + old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] + old_dojo = Dojos.from_id(old_dojo_id).first() + assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() + assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge.category = dojo.hex_dojo_id + old_challenge.name = f"{module_id}:{challenge_id}" + return old_challenge + return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) + + +def import_challenge(challenge_data, module_data, dojo_data) -> Challenges: + # Handles the heirarchy of imports + import_data = ( + challenge_data["import"]["challenge"], + first_present("module", challenge_data["import"], module_data["import"]), # No need to check dojo_data imports because module can never be defined there + first_present("dojo", challenge_data["import"], module_data["import"], dojo_data["import"]), + ) - return None + imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist") + for attr in ["id", "name", "description"]: + if attr not in challenge_data: + challenge_data[attr] = getattr(imported_challenge, attr) + + # TODO: maybe we should track the entire import + challenge_data["image"] = imported_challenge.data.get("image") + challenge_data["path_override"] = str(imported_challenge.path) + return imported_challenge.challenge -def challenges_from_spec(raw_challenge_data, defaults): + +def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]: try: - challenge_list = CHALLENGE_SPEC.validate(raw_challenge_data) + challenge_list = CHALLENGE_SPEC.validate(module_data["challenges"]) except SchemaError as e: raise AssertionError(f"Invalid challenge specification: {e}") - - return [ - DojoChallenges( - **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - image=first_present_or_none("image", challenge_data, defaults), - allow_privileged=first_present_or_none("allow_privileged", challenge_data, defaults, DojoChallenges.data_defaults), - importable=first_present_or_none("importable", challenge_data, defaults, DojoChallenges.data_defaults), - challenge=challenge( - module_data.get("id"), challenge_data.get("id"), transfer=challenge_data.get("transfer", None) - ) if "import" not in challenge_data else None, - progression_locked=challenge_data.get("progression_locked"), - visibility=get_visibility(DojoChallengeVisibilities, challenge_data, defaults), - survey=first_present_or_none("survey", challenge_data, defaults), - # TODO Handle imports seperately + + module_id = module_data["id"] + + # This is for caching existing challenges to improve performance of updating a dojo + existing_module = next((module for module in dojo.modules if module.id == module_id), None) + existing_challenges = {challenge.id: challenge.challenge for challenge in existing_module.challenges} if existing_module else {} + + result = [] + for challenge_data in challenge_list: + data_priority_chain = (challenge_data, module_data, dojo_data) + challenge_id = challenge_data.get("id") + + if "import" in challenge_data: + challenge = import_challenge(*data_priority_chain) # import has to be done before DojoChallenges creation because it modifies challenge_data + elif challenge_id in existing_challenges: + challenge = existing_challenges[challenge_id] + else: + challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer")) + + result.append( + DojoChallenges( + **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + image=first_present("image", *data_priority_chain), + allow_privileged=first_present("allow_privileged", *data_priority_chain, DojoChallenges.data_defaults), + importable=first_present("importable", *data_priority_chain, DojoChallenges.data_defaults), + progression_locked=first_present("progression_locked", challenge_data, DojoChallenges.data_defaults), + survey=first_present("survey", *data_priority_chain), + visibility=get_visibility(DojoChallengeVisibilities, *data_priority_chain), + challenge=challenge + ) ) - for challenge_data in challenge_list - ] - \ No newline at end of file + return result \ No newline at end of file From 4add2f7ba0919749a0c2226fc2f1fd3a87c9f9e4 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Wed, 2 Jul 2025 17:44:01 -0700 Subject: [PATCH 03/11] Separate dojo, module, and challenge building into seperate files --- dojo_plugin/models/__init__.py | 23 +- dojo_plugin/utils/dojo.py | 47 +- dojo_plugin/utils/dojo_builder.py | 444 ------------------ .../utils/dojo_creation/builder_utils.py | 87 ++++ .../utils/dojo_creation/challenge_builder.py | 112 +++++ .../utils/dojo_creation/dojo_builder.py | 88 ++++ .../utils/dojo_creation/module_builder.py | 104 ++++ 7 files changed, 438 insertions(+), 467 deletions(-) delete mode 100644 dojo_plugin/utils/dojo_builder.py create mode 100644 dojo_plugin/utils/dojo_creation/builder_utils.py create mode 100644 dojo_plugin/utils/dojo_creation/challenge_builder.py create mode 100644 dojo_plugin/utils/dojo_creation/dojo_builder.py create mode 100644 dojo_plugin/utils/dojo_creation/module_builder.py diff --git a/dojo_plugin/models/__init__.py b/dojo_plugin/models/__init__.py index 5f02fdb72..0719f2fe4 100644 --- a/dojo_plugin/models/__init__.py +++ b/dojo_plugin/models/__init__.py @@ -353,33 +353,13 @@ class DojoModules(db.Model): def __init__(self, *args, **kwargs): - visibility = kwargs["visibility"] if "visibility" in kwargs else None - data = kwargs.pop("data", {}) + for field in self.data_fields: if field in kwargs: data[field] = kwargs.pop(field) kwargs["data"] = data - if default: - for field in ["id", "name", "description"]: - kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None) - - kwargs["challenges"] = ( - kwargs.pop("challenges", None) or - ([DojoChallenges( - default=challenge, - visibility=(DojoChallengeVisibilities(start=visibility.start) if visibility else None), - ) for challenge in default.challenges] if default else []) - ) - kwargs["resources"] = ( - kwargs.pop("resources", None) or - ([DojoResources( - default=resource, - visibility=(DojoResourceVisibilities(start=visibility.start) if visibility else None), - ) for resource in default.resources] if default else []) - ) - super().__init__(*args, **kwargs) def __getattr__(self, name): @@ -488,6 +468,7 @@ class DojoChallenges(db.Model): def __init__(self, *args, **kwargs): data = kwargs.pop("data", {}) + for field in self.data_fields: if field in kwargs: data[field] = kwargs.pop(field) diff --git a/dojo_plugin/utils/dojo.py b/dojo_plugin/utils/dojo.py index 0f1de42cf..2a7e5659b 100644 --- a/dojo_plugin/utils/dojo.py +++ b/dojo_plugin/utils/dojo.py @@ -20,7 +20,7 @@ from ..models import DojoAdmins, Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoResourceVisibilities, DojoModuleVisibilities from ..config import DOJOS_DIR from ..utils import get_current_container -from .dojo_builder import dojo_from_spec +from .dojo_creation.dojo_builder import dojo_from_spec DOJOS_TMP_DIR = DOJOS_DIR/"tmp" @@ -122,7 +122,50 @@ def dojo_from_dir(dojo_dir, *, dojo=None): data_raw = yaml.safe_load(dojo_yml_path.read_text()) data = load_dojo_subyamls(data_raw, dojo_dir) dojo_initialize_files(data, dojo_dir) - return dojo_from_spec(data, dojo_dir=dojo_dir, dojo=dojo) + + built_dojo = dojo_from_spec(data, dojo=dojo) + + validate_challenge_paths(built_dojo, dojo_dir) + initialize_course(built_dojo, dojo_dir) + + return built_dojo + + + +def validate_challenge_paths(dojo, dojo_dir): + with dojo.located_at(dojo_dir): + missing_challenge_paths = [ + challenge + for module in dojo.modules + for challenge in module.challenges + if not challenge.path.exists() + ] + assert not missing_challenge_paths, "".join( + f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" + for challenge in missing_challenge_paths) + +def initialize_course(dojo, dojo_dir): + course_yml_path = dojo_dir / "course.yml" + if course_yml_path.exists(): + course = yaml.safe_load(course_yml_path.read_text()) + + if "discord_role" in course and not dojo.official: + raise AssertionError("Unofficial dojos cannot have a discord role") + + dojo.course = course + + students_yml_path = dojo_dir / "students.yml" + if students_yml_path.exists(): + students = yaml.safe_load(students_yml_path.read_text()) + dojo.course["students"] = students + + syllabus_path = dojo_dir / "SYLLABUS.md" + if "syllabus" not in dojo.course and syllabus_path.exists(): + dojo.course["syllabus"] = syllabus_path.read_text() + + grade_path = dojo_dir / "grade.py" + if grade_path.exists(): + dojo.course["grade_code"] = grade_path.read_text() diff --git a/dojo_plugin/utils/dojo_builder.py b/dojo_plugin/utils/dojo_builder.py deleted file mode 100644 index 824e2c78a..000000000 --- a/dojo_plugin/utils/dojo_builder.py +++ /dev/null @@ -1,444 +0,0 @@ -import datetime -import typing -import yaml - -from pathlib import Path -from schema import Schema, Optional, Regex, Or, Use, SchemaError - -from typing import Any -from dojo_plugin.models import Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoModuleVisibilities, DojoResourceVisibilities, Challenges, Flags -from sqlalchemy.orm.exc import NoResultFound -from CTFd.utils.user import is_admin - - -ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$") -UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$") -NAME_REGEX = Regex(r"^[\S ]{1,128}$") -IMAGE_REGEX = Regex(r"^[\S]{1,256}$") -FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$") -FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1") -DATE = Use(datetime.datetime.fromisoformat) - -ID_NAME_DESCRIPTION = { - Optional("id"): ID_REGEX, - Optional("name"): NAME_REGEX, - Optional("description"): str, -} - -VISIBILITY = { - Optional("visibility", default={}): { - Optional("start"): DATE, - Optional("stop"): DATE, - } -} - - -DOJO_SPEC = Schema({ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("password"): Regex(r"^[\S ]{8,128}$"), - - Optional("type"): ID_REGEX, - Optional("award"): { - Optional("emoji"): Regex(r"^\S$"), - Optional("belt"): IMAGE_REGEX - }, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - - Optional("import"): { - "dojo": UNIQUE_ID_REGEX, - }, - - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ), - - Optional("pages", default=[]): [str], - Optional("files", default=[]): [Or( - { - "type": "download", - "path": FILE_PATH_REGEX, - "url": FILE_URL_REGEX, - }, - { - "type": "text", - "path": FILE_PATH_REGEX, - "content": str, - } - )], - - Optional("modules", default=[]): list, # Defer module validation until later -}) -""" -This is the validation Schema that parses the dojo.yaml file during dojo initialization. - -In order to create a valid dojo.yaml, it must conform to the schema defined here. -""" - - -def import_one(query, error_message): - try: - o = query.one() - assert o.importable, f"Import disallowed for {o}." - return o - except NoResultFound: - raise AssertionError(error_message) - - -def import_dojo(dojo_data): - # TODO: we probably don't need to restrict imports to official dojos - imported_dojo = import_one( - Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), - f"Import dojo `{dojo_data['import']['dojo']}` does not exist" - ) - - for attr in ["id", "name", "description", "password", "type", "award"]: - if attr not in dojo_data: - dojo_data[attr] = getattr(import_dojo, attr) - - - - -def dojo_from_spec(data: dict, *, dojo_dir=None, dojo=None) -> Dojos: - try: - dojo_data = DOJO_SPEC.validate(data) - except SchemaError as e: - raise AssertionError(f"Invalid dojo specification: {e}") - - if "import" in dojo_data: - import_dojo(dojo_data) - - if dojo is None: - dojo = Dojos(**dojo_kwargs) - else: - for name, value in dojo_kwargs.items(): - setattr(dojo, name, value) - - dojo.modules = modules_from_spec(dojo, dojo_data) - - - - if dojo_dir: - with dojo.located_at(dojo_dir): - missing_challenge_paths = [ - challenge - for module in dojo.modules - for challenge in module.challenges - if not challenge.path.exists() - ] - assert not missing_challenge_paths, "".join( - f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" - for challenge in missing_challenge_paths) - - course_yml_path = dojo_dir / "course.yml" - if course_yml_path.exists(): - course = yaml.safe_load(course_yml_path.read_text()) - - if "discord_role" in course and not dojo.official: - raise AssertionError("Unofficial dojos cannot have a discord role") - - dojo.course = course - - students_yml_path = dojo_dir / "students.yml" - if students_yml_path.exists(): - students = yaml.safe_load(students_yml_path.read_text()) - dojo.course["students"] = students - - syllabus_path = dojo_dir / "SYLLABUS.md" - if "syllabus" not in dojo.course and syllabus_path.exists(): - dojo.course["syllabus"] = syllabus_path.read_text() - - grade_path = dojo_dir / "grade.py" - if grade_path.exists(): - dojo.course["grade_code"] = grade_path.read_text() - - if dojo_data.get("pages"): - dojo.pages = dojo_data["pages"] - - return dojo - -def first_present(key, *dicts): - for d in dicts: - if key in d: - return d[key] - return None - -def get_visibility(cls, *dicts): - visibility = first_present("visibility", *dicts) - - if visibility: - start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None - stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None - assert start or stop, "`start` or `stop` value must be present under visibility" - return cls(start=start, stop=stop) - - return None - - - -MODULE_SPEC = Schema([{ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - - Optional("import"): { - Optional("dojo"): UNIQUE_ID_REGEX, - "module": ID_REGEX, - }, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ), - - - Optional("resources", default=[]): [Or( - { - "type": "markdown", - "name": NAME_REGEX, - "content": str, - **VISIBILITY, - }, - { - "type": "lecture", - "name": NAME_REGEX, - Optional("video"): str, - Optional("playlist"): str, - Optional("slides"): str, - **VISIBILITY, - }, - )], - - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - - Optional("challenges", default=[]): list, # Defer challenge validation -}]) - - - -RESOURCE_ATTRIBUTES = ["name", "type", "content", "video", "playlist", "slides"] -def build_dojo_resources(module_data, dojo_data): - if "resources" not in module_data: - return None - - return [ - DojoResources( - **{attr: resource_data.get(attr) for attr in RESOURCE_ATTRIBUTES}, - visibility=get_visibility(DojoResourceVisibilities, resource_data, module_data, dojo_data), - ) - for resource_data in module_data["resources"] - ] - - -def import_module(module_data, dojo_data): - import_data = ( - module_data["import"]["module"], - first_present("dojo", module_data["import"], dojo_data["import"]), - ) - - imported_module = import_one(DojoModules.from_id(*import_data), f"{'/'.join(import_data)} does not exist") - for attr in ["id", "name", "description"]: - if attr not in module_data: - module_data[attr] = getattr(imported_module, attr) - - if "challenges" not in module_data: - # The idea here is that once it reaches challenges_from_spec it will process the actual challenge importing - module_data["challenges"] = [{"import": {"challenge": challenge.id}} for challenge in import_module.challenges] - - if "resources" not in module_data: - module_data["resources"] = [ - { - attr: getattr(resource, attr) for attr in RESOURCE_ATTRIBUTES if getattr(resource, attr, None) is not None - } for resource in import_module.resources - ] - - -def modules_from_spec(dojo, dojo_data): - try: - module_list = MODULE_SPEC.validate(dojo_data["modules"]) - except SchemaError as e: - raise AssertionError(f"Invalid module specification: {e}") - - result = [] - for module_data in module_list: - if "import" in module_data: - import_module(module_data, dojo_data) - result.append( - DojoModules( - **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - resources=build_dojo_resources(module_data, dojo_data), - visibility=get_visibility(DojoModuleVisibilities, module_data, dojo_data), - challenges=challenges_from_spec(dojo, dojo_data, module_data), - ) - ) - return result - - - -CHALLENGE_SPEC = Schema([{ - **ID_NAME_DESCRIPTION, - **VISIBILITY, - - Optional("image"): IMAGE_REGEX, - Optional("allow_privileged"): bool, - Optional("importable"): bool, - Optional("progression_locked"): bool, - Optional("auxiliary", default={}, ignore_extra_keys=True): dict, - # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"), - - Optional("import"): { - Optional("dojo"): UNIQUE_ID_REGEX, - Optional("module"): ID_REGEX, - "challenge": ID_REGEX, - }, - - Optional("transfer"): { - Optional("dojo"): UNIQUE_ID_REGEX, - Optional("module"): ID_REGEX, - "challenge": ID_REGEX, - }, - - Optional("survey"): Or( - { - "type": "multiplechoice", - "prompt": str, - Optional("probability"): float, - "options": [str], - }, - { - "type": "thumb", - "prompt": str, - Optional("probability"): float, - }, - { - "type": "freeform", - "prompt": str, - Optional("probability"): float, - }, - ) -}]) - - - - -def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges: - """ - Retrieves or creates a dojo challenge object based on the given module and challenge identifiers. - - This function performs the following logic: - - If the challenge has already been retrieved (cached in `existing_challenges`), it is returned immediately. - - If a challenge matching the `module_id` and `challenge_id` exists in the database, it is returned. - - If a `transfer` is provided, the function attempts to locate the challenge in the source dojo, validate transfer permissions, - and return a modified version scoped to the current dojo. - - If no existing or transferrable challenge is found, a new challenge instance is created and returned (but not committed). - """ - if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): - return chal - if transfer: - assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation" - old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] - old_dojo = Dojos.from_id(old_dojo_id).first() - assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" - old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() - assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" - old_challenge.category = dojo.hex_dojo_id - old_challenge.name = f"{module_id}:{challenge_id}" - return old_challenge - return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) - - -def import_challenge(challenge_data, module_data, dojo_data) -> Challenges: - # Handles the heirarchy of imports - import_data = ( - challenge_data["import"]["challenge"], - first_present("module", challenge_data["import"], module_data["import"]), # No need to check dojo_data imports because module can never be defined there - first_present("dojo", challenge_data["import"], module_data["import"], dojo_data["import"]), - ) - - imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist") - for attr in ["id", "name", "description"]: - if attr not in challenge_data: - challenge_data[attr] = getattr(imported_challenge, attr) - - # TODO: maybe we should track the entire import - challenge_data["image"] = imported_challenge.data.get("image") - challenge_data["path_override"] = str(imported_challenge.path) - return imported_challenge.challenge - - - -def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]: - try: - challenge_list = CHALLENGE_SPEC.validate(module_data["challenges"]) - except SchemaError as e: - raise AssertionError(f"Invalid challenge specification: {e}") - - module_id = module_data["id"] - - # This is for caching existing challenges to improve performance of updating a dojo - existing_module = next((module for module in dojo.modules if module.id == module_id), None) - existing_challenges = {challenge.id: challenge.challenge for challenge in existing_module.challenges} if existing_module else {} - - result = [] - for challenge_data in challenge_list: - data_priority_chain = (challenge_data, module_data, dojo_data) - challenge_id = challenge_data.get("id") - - if "import" in challenge_data: - challenge = import_challenge(*data_priority_chain) # import has to be done before DojoChallenges creation because it modifies challenge_data - elif challenge_id in existing_challenges: - challenge = existing_challenges[challenge_id] - else: - challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer")) - - result.append( - DojoChallenges( - **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, - image=first_present("image", *data_priority_chain), - allow_privileged=first_present("allow_privileged", *data_priority_chain, DojoChallenges.data_defaults), - importable=first_present("importable", *data_priority_chain, DojoChallenges.data_defaults), - progression_locked=first_present("progression_locked", challenge_data, DojoChallenges.data_defaults), - survey=first_present("survey", *data_priority_chain), - visibility=get_visibility(DojoChallengeVisibilities, *data_priority_chain), - challenge=challenge - ) - ) - return result \ No newline at end of file diff --git a/dojo_plugin/utils/dojo_creation/builder_utils.py b/dojo_plugin/utils/dojo_creation/builder_utils.py new file mode 100644 index 000000000..319677e38 --- /dev/null +++ b/dojo_plugin/utils/dojo_creation/builder_utils.py @@ -0,0 +1,87 @@ +from schema import Optional, Regex, Or, Use +import datetime +from sqlalchemy.orm.exc import NoResultFound + + +ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$") +UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$") +NAME_REGEX = Regex(r"^[\S ]{1,128}$") +IMAGE_REGEX = Regex(r"^[\S]{1,256}$") +FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$") +FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1") +DATE = Use(datetime.datetime.fromisoformat) + +ID_NAME_DESCRIPTION = { + Optional("id"): ID_REGEX, + Optional("name"): NAME_REGEX, + Optional("description"): str, +} + +VISIBILITY = { + Optional("visibility", default={}): { + Optional("start"): DATE, + Optional("stop"): DATE, + } +} + +SURVEY = { + Optional("survey"): Or( + { + "type": "multiplechoice", + "prompt": str, + Optional("probability"): float, + "options": [str], + }, + { + "type": "thumb", + "prompt": str, + Optional("probability"): float, + }, + { + "type": "freeform", + "prompt": str, + Optional("probability"): float, + }, + ) +} + +BASE_SPEC = { + **ID_NAME_DESCRIPTION, + **VISIBILITY, + + Optional("image"): IMAGE_REGEX, + Optional("allow_privileged"): bool, + Optional("importable"): bool, + Optional("auxiliary", default={}, ignore_extra_keys=True): dict, + + **SURVEY, +} +""" +Dictionary for specification fields that are defined identically in all three layers of the specification schema +""" + + +def import_one(query, error_message): + try: + o = query.one() + assert o.importable, f"Import disallowed for {o}." + return o + except NoResultFound: + raise AssertionError(error_message) + +def first_present(key, *dicts): + for d in dicts: + if key in d: + return d[key] + return None + +def get_visibility(cls, *dicts): + visibility = first_present("visibility", *dicts) + + if visibility: + start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None + stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None + assert start or stop, "`start` or `stop` value must be present under visibility" + return cls(start=start, stop=stop) + + return None \ No newline at end of file diff --git a/dojo_plugin/utils/dojo_creation/challenge_builder.py b/dojo_plugin/utils/dojo_creation/challenge_builder.py new file mode 100644 index 000000000..dcf08b1eb --- /dev/null +++ b/dojo_plugin/utils/dojo_creation/challenge_builder.py @@ -0,0 +1,112 @@ +from schema import Schema, Optional, SchemaError + +from ...models import Dojos, DojoChallenges, DojoChallengeVisibilities, Challenges, Flags +from CTFd.utils.user import is_admin +from .builder_utils import ( + ID_REGEX, + UNIQUE_ID_REGEX, + BASE_SPEC, + import_one, + first_present, + get_visibility, +) + + +CHALLENGE_SPEC = Schema([{ + **BASE_SPEC, + + Optional("progression_locked"): bool, + # Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"), + + Optional("import"): { + Optional("dojo"): UNIQUE_ID_REGEX, + Optional("module"): ID_REGEX, + "challenge": ID_REGEX, + }, + + Optional("transfer"): { + Optional("dojo"): UNIQUE_ID_REGEX, + Optional("module"): ID_REGEX, + "challenge": ID_REGEX, + }, +}]) + + + + +def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges: + if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first(): + return chal + if transfer: + assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation" + old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"] + old_dojo = Dojos.from_id(old_dojo_id).first() + assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first() + assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}" + old_challenge.category = dojo.hex_dojo_id + old_challenge.name = f"{module_id}:{challenge_id}" + return old_challenge + return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")]) + + +def import_challenge(challenge_data, module_data, dojo_data) -> DojoChallenges: + # Handles the heirarchy of imports + import_data = ( + first_present("dojo", challenge_data["import"], module_data["import"], dojo_data["import"]), + first_present("module", challenge_data["import"], module_data["import"]), # No need to check dojo_data imports because module can never be defined there + challenge_data["import"]["challenge"], + ) + + imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist") + for attr in ["id", "name", "description"]: + if attr not in challenge_data: + challenge_data[attr] = getattr(imported_challenge, attr) + + # TODO: maybe we should track the entire import + challenge_data["image"] = imported_challenge.data.get("image") + return imported_challenge + + + +def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]: + try: + challenge_list = CHALLENGE_SPEC.validate(module_data["challenges"]) + except SchemaError as e: + raise AssertionError(f"Invalid challenge specification: {e}") + + module_id = module_data["id"] + + # This is for caching existing challenges to improve performance of updating a dojo + existing_module = next((module for module in dojo.modules if module.id == module_id), None) + existing_challenges = {challenge.id: challenge.challenge for challenge in existing_module.challenges} if existing_module else {} + + result = [] + for challenge_data in challenge_list: + data_priority_chain = (challenge_data, module_data, dojo_data) + + path_override = None + challenge_id = challenge_data.get("id") + if "import" in challenge_data: + imported_challenge = import_challenge(*data_priority_chain) # import has to be done before DojoChallenges creation because it modifies challenge_data + path_override = str(imported_challenge.path) + ctfd_challenge = imported_challenge.challenge + elif challenge_id in existing_challenges: + ctfd_challenge = existing_challenges[challenge_id] + else: + ctfd_challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer")) + + result.append( + DojoChallenges( + **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + image=first_present("image", *data_priority_chain), + allow_privileged=first_present("allow_privileged", *data_priority_chain, DojoChallenges.data_defaults), + importable=first_present("importable", *data_priority_chain, DojoChallenges.data_defaults), + progression_locked=first_present("progression_locked", challenge_data, DojoChallenges.data_defaults), + survey=first_present("survey", *data_priority_chain), + visibility=get_visibility(DojoChallengeVisibilities, *data_priority_chain), + path_override=path_override, + challenge=ctfd_challenge, + ) + ) + return result \ No newline at end of file diff --git a/dojo_plugin/utils/dojo_creation/dojo_builder.py b/dojo_plugin/utils/dojo_creation/dojo_builder.py new file mode 100644 index 000000000..0e4526320 --- /dev/null +++ b/dojo_plugin/utils/dojo_creation/dojo_builder.py @@ -0,0 +1,88 @@ +import yaml + +from schema import Schema, Optional, Regex, Or, SchemaError + +from ...models import Dojos +from .builder_utils import ( + ID_REGEX, + UNIQUE_ID_REGEX, + IMAGE_REGEX, + FILE_PATH_REGEX, + FILE_URL_REGEX, + BASE_SPEC, + import_one, +) +from .module_builder import modules_from_spec + + +DOJO_SPEC = Schema({ + **BASE_SPEC, + + Optional("password"): Regex(r"^[\S ]{8,128}$"), + + Optional("type"): ID_REGEX, + Optional("award"): { + Optional("emoji"): Regex(r"^\S$"), + Optional("belt"): IMAGE_REGEX + }, + + + Optional("import"): { + "dojo": UNIQUE_ID_REGEX, + }, + + + Optional("pages", default=[]): [str], + Optional("files", default=[]): [Or( + { + "type": "download", + "path": FILE_PATH_REGEX, + "url": FILE_URL_REGEX, + }, + { + "type": "text", + "path": FILE_PATH_REGEX, + "content": str, + } + )], + + Optional("modules", default=[]): list, # Defer module validation until later +}) + + +DOJO_ATTRIBUTES = ["id", "name", "description", "password", "type", "award", "pages"] +def import_dojo(dojo_data): + # TODO: we probably don't need to restrict imports to official dojos + imported_dojo = import_one( + Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True), + f"Import dojo `{dojo_data['import']['dojo']}` does not exist" + ) + + for attr in DOJO_ATTRIBUTES: + if attr not in dojo_data: + dojo_data[attr] = getattr(imported_dojo, attr) + + # Modules will be initialized at the module layer, and challenges at the challenge layer + if not dojo_data["modules"]: + dojo_data["modules"] = [{"import": {"module": module.id}} for module in imported_dojo.modules] + + +def dojo_from_spec(data: dict, *, dojo=None) -> Dojos: + try: + dojo_data = DOJO_SPEC.validate(data) + except SchemaError as e: + raise AssertionError(f"Invalid dojo specification: {e}") + + if "import" in dojo_data: + import_dojo(dojo_data) + + dojo_kwargs = {attr: dojo_data.get(attr) for attr in DOJO_ATTRIBUTES} + if dojo is None: + dojo = Dojos(**dojo_kwargs) + else: + for name, value in dojo_kwargs.items(): + setattr(dojo, name, value) + + dojo.modules = modules_from_spec(dojo, dojo_data) + + return dojo \ No newline at end of file diff --git a/dojo_plugin/utils/dojo_creation/module_builder.py b/dojo_plugin/utils/dojo_creation/module_builder.py new file mode 100644 index 000000000..de8f56935 --- /dev/null +++ b/dojo_plugin/utils/dojo_creation/module_builder.py @@ -0,0 +1,104 @@ +from schema import Schema, Optional, Or, SchemaError + +from ...models import DojoModules, DojoResources, DojoModuleVisibilities, DojoResourceVisibilities +from .builder_utils import ( + ID_REGEX, + UNIQUE_ID_REGEX, + NAME_REGEX, + VISIBILITY, + BASE_SPEC, + import_one, + first_present, + get_visibility, +) +from .challenge_builder import challenges_from_spec + + + +MODULE_SPEC = Schema([{ + **BASE_SPEC, + + Optional("import"): { + Optional("dojo"): UNIQUE_ID_REGEX, + "module": ID_REGEX, + }, + Optional("resources", default=[]): [Or( + { + "type": "markdown", + "name": NAME_REGEX, + "content": str, + **VISIBILITY, + }, + { + "type": "lecture", + "name": NAME_REGEX, + Optional("video"): str, + Optional("playlist"): str, + Optional("slides"): str, + **VISIBILITY, + }, + )], + + + Optional("challenges", default=[]): list, # Defer challenge validation +}]) + + + +RESOURCE_ATTRIBUTES = ["name", "type", "content", "video", "playlist", "slides"] +def build_dojo_resources(module_data, dojo_data): + if "resources" not in module_data: + return None + + return [ + DojoResources( + **{attr: resource_data.get(attr) for attr in RESOURCE_ATTRIBUTES}, + visibility=get_visibility(DojoResourceVisibilities, resource_data, module_data, dojo_data), + ) + for resource_data in module_data["resources"] + ] + + +def import_module(module_data, dojo_data): + import_data = ( + first_present("dojo", module_data["import"], dojo_data["import"]), + module_data["import"]["module"], + ) + + imported_module = import_one(DojoModules.from_id(*import_data), f"{'/'.join(import_data)} does not exist") + for attr in ["id", "name", "description"]: + if attr not in module_data: + module_data[attr] = getattr(imported_module, attr) + + # The idea here is that once it reaches challenges_from_spec it will process the actual challenge importing + if not module_data["challenges"]: + module_data["challenges"] = [{"import": {"challenge": challenge.id}} for challenge in imported_module.challenges] + + if not module_data["resources"]: + module_data["resources"] = [ + { + attr: getattr(resource, attr) for attr in RESOURCE_ATTRIBUTES if getattr(resource, attr, None) is not None + } for resource in imported_module.resources + ] + + +def modules_from_spec(dojo, dojo_data): + try: + module_list = MODULE_SPEC.validate(dojo_data["modules"]) + except SchemaError as e: + raise AssertionError(f"Invalid module specification: {e}") + + result = [] + for module_data in module_list: + if "import" in module_data: + import_module(module_data, dojo_data) + + result.append( + DojoModules( + **{kwarg: module_data.get(kwarg) for kwarg in ["id", "name", "description"]}, + resources=build_dojo_resources(module_data, dojo_data), + visibility=get_visibility(DojoModuleVisibilities, module_data, dojo_data), + challenges=challenges_from_spec(dojo, dojo_data, module_data), + ) + ) + return result \ No newline at end of file From 97e4200d7c4b89b024584d68b0271f8cca222981 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Wed, 2 Jul 2025 18:28:43 -0700 Subject: [PATCH 04/11] Separate dojo creation of files into its own file --- dojo_plugin/api/v1/dojos.py | 3 +- dojo_plugin/pages/dojo.py | 3 +- dojo_plugin/utils/dojo.py | 290 +----------------- .../utils/dojo_creation/dojo_initializer.py | 288 +++++++++++++++++ 4 files changed, 299 insertions(+), 285 deletions(-) create mode 100644 dojo_plugin/utils/dojo_creation/dojo_initializer.py diff --git a/dojo_plugin/api/v1/dojos.py b/dojo_plugin/api/v1/dojos.py index a84bc3b57..aad3756f7 100644 --- a/dojo_plugin/api/v1/dojos.py +++ b/dojo_plugin/api/v1/dojos.py @@ -11,7 +11,8 @@ from ...models import DojoStudents, Dojos, DojoModules, DojoChallenges, DojoUsers, Emojis, SurveyResponses from ...utils import render_markdown, is_challenge_locked -from ...utils.dojo import dojo_route, dojo_admins_only, dojo_create +from ...utils.dojo import dojo_route, dojo_admins_only +from ...utils.dojo_creation.dojo_initializer import dojo_create dojos_namespace = Namespace( diff --git a/dojo_plugin/pages/dojo.py b/dojo_plugin/pages/dojo.py index 6aacd1a46..620358fd5 100644 --- a/dojo_plugin/pages/dojo.py +++ b/dojo_plugin/pages/dojo.py @@ -14,8 +14,9 @@ from ..utils import get_current_container, get_all_containers, render_markdown from ..utils.stats import get_container_stats, get_dojo_stats -from ..utils.dojo import dojo_route, get_current_dojo_challenge, dojo_update, dojo_admins_only +from ..utils.dojo import dojo_route, get_current_dojo_challenge, dojo_admins_only from ..models import Dojos, DojoUsers, DojoStudents, DojoModules, DojoMembers, DojoChallenges +from ..utils.dojo_creation.dojo_initializer import dojo_update dojo = Blueprint("pwncollege_dojo", __name__) #pylint:disable=redefined-outer-name diff --git a/dojo_plugin/utils/dojo.py b/dojo_plugin/utils/dojo.py index 2a7e5659b..5a5d22d18 100644 --- a/dojo_plugin/utils/dojo.py +++ b/dojo_plugin/utils/dojo.py @@ -1,173 +1,14 @@ -import os -import re import subprocess -import sys import tempfile import functools import inspect import pathlib -import urllib.request -import yaml -import requests -from schema import Schema, Optional, Regex, Or, Use, SchemaError from flask import abort, g -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import NoResultFound -from CTFd.models import db, Challenges, Flags from CTFd.utils.user import get_current_user, is_admin -from ..models import DojoAdmins, Dojos, DojoModules, DojoChallenges, DojoResources, DojoChallengeVisibilities, DojoResourceVisibilities, DojoModuleVisibilities -from ..config import DOJOS_DIR +from ..models import Dojos, DojoModules, DojoChallenges from ..utils import get_current_container -from .dojo_creation.dojo_builder import dojo_from_spec - - -DOJOS_TMP_DIR = DOJOS_DIR/"tmp" -DOJOS_TMP_DIR.mkdir(exist_ok=True) - - - -def setdefault_name(entry): - if "import" in entry: - return - if "name" in entry: - return - if "id" not in entry: - return - entry["name"] = entry["id"].replace("-", " ").title() - - -def setdefault_file(data, key, file_path): - if file_path.exists(): - data.setdefault("description", file_path.read_text()) - - -def setdefault_subyaml(data, subyaml_path): - if not subyaml_path.exists(): - return data - - topyaml_data = dict(data) - subyaml_data = yaml.safe_load(subyaml_path.read_text()) - data.clear() - data.update(subyaml_data) - data.update(topyaml_data) - - -def load_dojo_subyamls(data, dojo_dir): - """ - The dojo yaml gets augmented with additional yamls and markdown files found in the dojo repo structure. - - The meta-structure is: - - repo-root/dojo.yml - repo-root/DESCRIPTION.md <- if dojo description is missing - repo-root/module-id/module.yml <- fills in missing fields for module in dojo.yml (only module id *needs* to be in dojo.yml) - repo-root/module-id/DESCRIPTION.md <- if module description is missing - repo-root/module-id/challenge-id/challenge.yml <- fills in missing fields for challenge in higher-level ymls (only challenge id *needs* to be in dojo.yml/module.yml) - repo-root/module-id/challenge-id/DESCRIPTION.md <- if challenge description is missing - - The higher-level details override the lower-level details. - """ - - setdefault_file(data, "description", dojo_dir / "DESCRIPTION.md") - - for module_data in data.get("modules", []): - if "id" not in module_data: - continue - - module_dir = dojo_dir / module_data["id"] - setdefault_subyaml(module_data, module_dir / "module.yml") - setdefault_file(module_data, "description", module_dir / "DESCRIPTION.md") - setdefault_name(module_data) - - for challenge_data in module_data.get("challenges", []): - if "id" not in challenge_data: - continue - - challenge_dir = module_dir / challenge_data["id"] - setdefault_subyaml(challenge_data, challenge_dir / "challenge.yml") - setdefault_file(challenge_data, "description", challenge_dir / "DESCRIPTION.md") - setdefault_name(challenge_data) - - return data - - -def dojo_initialize_files(data, dojo_dir): - for dojo_file in data.get("files", []): - assert is_admin(), "yml-specified files support requires admin privileges" - rel_path = dojo_dir / dojo_file["path"] - - abs_path = dojo_dir / rel_path - assert not abs_path.is_symlink(), f"{rel_path} is a symbolic link!" - if abs_path.exists(): - continue - abs_path.parent.mkdir(parents=True, exist_ok=True) - - if dojo_file["type"] == "download": - urllib.request.urlretrieve(dojo_file["url"], str(abs_path)) - assert abs_path.stat().st_size >= 50*1024*1024, f"{rel_path} is small enough to fit into git ({abs_path.stat().st_size} bytes) --- put it in the repository!" - if dojo_file["type"] == "text": - with open(abs_path, "w") as o: - o.write(dojo_file["content"]) - - -def dojo_from_dir(dojo_dir, *, dojo=None): - dojo_yml_path = dojo_dir / "dojo.yml" - assert dojo_yml_path.exists(), "Missing file: `dojo.yml`" - - for path in dojo_dir.rglob("**"): - assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo" - - data_raw = yaml.safe_load(dojo_yml_path.read_text()) - data = load_dojo_subyamls(data_raw, dojo_dir) - dojo_initialize_files(data, dojo_dir) - - built_dojo = dojo_from_spec(data, dojo=dojo) - - validate_challenge_paths(built_dojo, dojo_dir) - initialize_course(built_dojo, dojo_dir) - - return built_dojo - - - -def validate_challenge_paths(dojo, dojo_dir): - with dojo.located_at(dojo_dir): - missing_challenge_paths = [ - challenge - for module in dojo.modules - for challenge in module.challenges - if not challenge.path.exists() - ] - assert not missing_challenge_paths, "".join( - f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" - for challenge in missing_challenge_paths) - -def initialize_course(dojo, dojo_dir): - course_yml_path = dojo_dir / "course.yml" - if course_yml_path.exists(): - course = yaml.safe_load(course_yml_path.read_text()) - - if "discord_role" in course and not dojo.official: - raise AssertionError("Unofficial dojos cannot have a discord role") - - dojo.course = course - - students_yml_path = dojo_dir / "students.yml" - if students_yml_path.exists(): - students = yaml.safe_load(students_yml_path.read_text()) - dojo.course["students"] = students - - syllabus_path = dojo_dir / "SYLLABUS.md" - if "syllabus" not in dojo.course and syllabus_path.exists(): - dojo.course["syllabus"] = syllabus_path.read_text() - - grade_path = dojo_dir / "grade.py" - if grade_path.exists(): - dojo.course["grade_code"] = grade_path.read_text() - - def generate_ssh_keypair(): @@ -188,46 +29,6 @@ def generate_ssh_keypair(): return (public_key.read_text().strip(), private_key.read_text()) -def dojo_yml_dir(spec): - yml_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) # TODO: ignore_cleanup_errors=True - yml_dir_path = pathlib.Path(yml_dir.name) - with open(yml_dir_path / "dojo.yml", "w") as do: - do.write(spec) - return yml_dir - - -def _assert_no_symlinks(dojo_dir): - if not isinstance(dojo_dir, pathlib.Path): - dojo_dir = pathlib.Path(dojo_dir) - for path in dojo_dir.rglob("*"): - assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo" - - -def dojo_clone(repository, private_key): - tmp_dojos_dir = DOJOS_TMP_DIR - tmp_dojos_dir.mkdir(exist_ok=True) - clone_dir = tempfile.TemporaryDirectory(dir=tmp_dojos_dir) # TODO: ignore_cleanup_errors=True - - key_file = tempfile.NamedTemporaryFile("w") - key_file.write(private_key) - key_file.flush() - - url = f"https://github.com/{repository}" - if requests.head(url).status_code != 200: - url = f"git@github.com:{repository}" - subprocess.run(["git", "clone", "--depth=1", "--recurse-submodules", url, clone_dir.name], - env={ - "GIT_SSH_COMMAND": f"ssh -i {key_file.name}", - "GIT_TERMINAL_PROMPT": "0", - }, - check=True, - capture_output=True) - - _assert_no_symlinks(clone_dir.name) - - return clone_dir - - def dojo_git_command(dojo, *args, repo_path=None): key_file = tempfile.NamedTemporaryFile("w") key_file.write(dojo.private_key) @@ -245,89 +46,6 @@ def dojo_git_command(dojo, *args, repo_path=None): capture_output=True) -def dojo_create(user, repository, public_key, private_key, spec): - try: - if repository: - repository_re = r"[\w\-]+/[\w\-]+" - repository = repository.replace("https://github.com/", "") - assert re.match(repository_re, repository), f"Invalid repository, expected format: {repository_re}" - - if Dojos.query.filter_by(repository=repository).first(): - raise AssertionError("This repository already exists as a dojo") - - dojo_dir = dojo_clone(repository, private_key) - - elif spec: - assert is_admin(), "Must be an admin user to create dojos from spec rather than repositories" - dojo_dir = dojo_yml_dir(spec) - repository, public_key, private_key = None, None, None - - else: - raise AssertionError("Repository is required") - - dojo_path = pathlib.Path(dojo_dir.name) - - dojo = dojo_from_dir(dojo_path) - dojo.repository = repository - dojo.public_key = public_key - dojo.private_key = private_key - dojo.admins = [DojoAdmins(user=user)] - - db.session.add(dojo) - db.session.commit() - - dojo.path.parent.mkdir(exist_ok=True) - dojo_path.rename(dojo.path) - dojo_path.mkdir() # TODO: ignore_cleanup_errors=True - - except subprocess.CalledProcessError as e: - deploy_url = f"https://github.com/{repository}/settings/keys" - raise RuntimeError(f"Failed to clone: add deploy key") - - except IntegrityError: - raise RuntimeError("This repository already exists as a dojo") - - except AssertionError as e: - raise RuntimeError(str(e)) - - except Exception as e: - traceback.print_exc(file=sys.stderr) - raise RuntimeError("An error occurred while creating the dojo") - - return dojo - - -def dojo_update(dojo): - if dojo.path.exists(): - old_commit = dojo_git_command(dojo, "rev-parse", "HEAD").stdout.decode().strip() - - tmp_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) - - os.rename(str(dojo.path), tmp_dir.name) - - dojo_git_command(dojo, "fetch", "--depth=1", "origin", repo_path=tmp_dir.name) - dojo_git_command(dojo, "reset", "--hard", "origin", repo_path=tmp_dir.name) - dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name) - - try: - _assert_no_symlinks(tmp_dir.name) - except AssertionError: - dojo_git_command(dojo, "reset", "--hard", old_commit, repo_path=tmp_dir.name) - dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name) - raise - finally: - os.rename(tmp_dir.name, str(dojo.path)) - else: - tmpdir = dojo_clone(dojo.repository, dojo.private_key) - os.rename(tmpdir.name, str(dojo.path)) - return dojo_from_dir(dojo.path, dojo=dojo) - - -def dojo_accessible(id): - if is_admin(): - return Dojos.from_id(id).first() - return Dojos.viewable(id=id, user=get_current_user()).first() - def dojo_admins_only(func): signature = inspect.signature(func) @@ -343,6 +61,12 @@ def wrapper(*args, **kwargs): return wrapper +def dojo_accessible(id: int) -> Dojos: + if is_admin(): + return Dojos.from_id(id).first() + return Dojos.viewable(id=id, user=get_current_user()).first() + + def dojo_route(func): signature = inspect.signature(func) @functools.wraps(func) diff --git a/dojo_plugin/utils/dojo_creation/dojo_initializer.py b/dojo_plugin/utils/dojo_creation/dojo_initializer.py new file mode 100644 index 000000000..05c0686c8 --- /dev/null +++ b/dojo_plugin/utils/dojo_creation/dojo_initializer.py @@ -0,0 +1,288 @@ +import os +import re +import sys +import subprocess +import tempfile +import pathlib +import urllib.request + +import yaml +import requests +import typing +from typing import Any +from sqlalchemy.exc import IntegrityError +from pathlib import Path +from CTFd.models import db, Users +from CTFd.utils.user import is_admin + +from ...models import DojoAdmins, Dojos +from ...config import DOJOS_DIR +from ..dojo import dojo_git_command +from .dojo_builder import dojo_from_spec + + +DOJOS_TMP_DIR = DOJOS_DIR/"tmp" +DOJOS_TMP_DIR.mkdir(exist_ok=True) + + + +def setdefault_name(data): + if "import" in data: + return + if "name" in data: + return + if "id" not in data: + return + data["name"] = data["id"].replace("-", " ").title() + + +def setdefault_description(data, file_path): + if file_path.exists(): + data.setdefault("description", file_path.read_text()) + + +def setdefault_subyaml(data: dict[str, Any], subyaml_path: Path): + if not subyaml_path.exists(): + return data + + topyaml_data = dict(data) + subyaml_data = yaml.safe_load(subyaml_path.read_text()) + data.clear() + data.update(subyaml_data) + data.update(topyaml_data) # This overwrites any subyaml data with the "topyaml" data + + +def load_dojo_subyamls(data: dict[str, Any], dojo_dir: Path) -> dict[str, Any]: + """ + The dojo yaml gets augmented with additional yamls and markdown files found in the dojo repo structure. + + The meta-structure is: + + repo-root/dojo.yml + repo-root/DESCRIPTION.md <- if dojo description is missing + repo-root/module-id/module.yml <- fills in missing fields for module in dojo.yml (only module id *needs* to be in dojo.yml) + repo-root/module-id/DESCRIPTION.md <- if module description is missing + repo-root/module-id/challenge-id/challenge.yml <- fills in missing fields for challenge in higher-level ymls (only challenge id *needs* to be in dojo.yml/module.yml) + repo-root/module-id/challenge-id/DESCRIPTION.md <- if challenge description is missing + + The higher-level details override the lower-level details. + """ + + setdefault_description(data, dojo_dir / "DESCRIPTION.md") + + for module_data in data.get("modules", []): + if "id" not in module_data: + continue + + module_dir = dojo_dir / module_data["id"] + setdefault_subyaml(module_data, module_dir / "module.yml") + setdefault_description(module_data, module_dir / "DESCRIPTION.md") + setdefault_name(module_data) + + for challenge_data in module_data.get("challenges", []): + if "id" not in challenge_data: + continue + + challenge_dir = module_dir / challenge_data["id"] + setdefault_subyaml(challenge_data, challenge_dir / "challenge.yml") + setdefault_description(challenge_data, challenge_dir / "DESCRIPTION.md") + setdefault_name(challenge_data) + + return data + + +def dojo_initialize_files(data: dict[str, Any], dojo_dir: Path): + for dojo_file in data.get("files", []): + assert is_admin(), "yml-specified files support requires admin privileges" + rel_path = dojo_dir / dojo_file["path"] + + abs_path = dojo_dir / rel_path + assert not abs_path.is_symlink(), f"{rel_path} is a symbolic link!" + if abs_path.exists(): + continue + abs_path.parent.mkdir(parents=True, exist_ok=True) + + if dojo_file["type"] == "download": + urllib.request.urlretrieve(dojo_file["url"], str(abs_path)) + assert abs_path.stat().st_size >= 50*1024*1024, f"{rel_path} is small enough to fit into git ({abs_path.stat().st_size} bytes) --- put it in the repository!" + if dojo_file["type"] == "text": + with open(abs_path, "w") as o: + o.write(dojo_file["content"]) + + +def dojo_from_dir(dojo_dir: Path, *, dojo: typing.Optional[Dojos]=None) -> Dojos: + dojo_yml_path = dojo_dir / "dojo.yml" + assert dojo_yml_path.exists(), "Missing file: `dojo.yml`" + + for path in dojo_dir.rglob("**"): + assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo" + + data_raw = yaml.safe_load(dojo_yml_path.read_text()) + data = load_dojo_subyamls(data_raw, dojo_dir) + dojo_initialize_files(data, dojo_dir) + + built_dojo = dojo_from_spec(data, dojo=dojo) + + validate_challenge_paths(built_dojo, dojo_dir) + initialize_course(built_dojo, dojo_dir) + + return built_dojo + + + +def validate_challenge_paths(dojo, dojo_dir): + with dojo.located_at(dojo_dir): + missing_challenge_paths = [ + challenge + for module in dojo.modules + for challenge in module.challenges + if not challenge.path.exists() + ] + assert not missing_challenge_paths, "".join( + f"Missing challenge path: {challenge.module.id}/{challenge.id}\n" + for challenge in missing_challenge_paths) + +def initialize_course(dojo, dojo_dir): + course_yml_path = dojo_dir / "course.yml" + if course_yml_path.exists(): + course = yaml.safe_load(course_yml_path.read_text()) + + if "discord_role" in course and not dojo.official: + raise AssertionError("Unofficial dojos cannot have a discord role") + + dojo.course = course + + students_yml_path = dojo_dir / "students.yml" + if students_yml_path.exists(): + students = yaml.safe_load(students_yml_path.read_text()) + dojo.course["students"] = students + + syllabus_path = dojo_dir / "SYLLABUS.md" + if "syllabus" not in dojo.course and syllabus_path.exists(): + dojo.course["syllabus"] = syllabus_path.read_text() + + grade_path = dojo_dir / "grade.py" + if grade_path.exists(): + dojo.course["grade_code"] = grade_path.read_text() + + +def dojo_yml_dir(spec: str) -> tempfile.TemporaryDirectory: + yml_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) # TODO: ignore_cleanup_errors=True + yml_dir_path = pathlib.Path(yml_dir.name) + with open(yml_dir_path / "dojo.yml", "w") as do: + do.write(spec) + return yml_dir + + +def _assert_no_symlinks(dojo_dir): + if not isinstance(dojo_dir, pathlib.Path): + dojo_dir = pathlib.Path(dojo_dir) + for path in dojo_dir.rglob("*"): + assert dojo_dir == path or dojo_dir in path.resolve().parents, f"Error: symlink `{path}` references path outside of the dojo" + + +def dojo_clone(repository, private_key): + tmp_dojos_dir = DOJOS_TMP_DIR + tmp_dojos_dir.mkdir(exist_ok=True) # Creates the DOJOS_TMP_DIR if it doesn't already exist + clone_dir = tempfile.TemporaryDirectory(dir=tmp_dojos_dir) # TODO: ignore_cleanup_errors=True + + key_file = tempfile.NamedTemporaryFile("w") + key_file.write(private_key) + key_file.flush() + + url = f"https://github.com/{repository}" + + # If the github repository isn't public, the url is set so that cloning can be done over ssh + if requests.head(url).status_code != 200: + url = f"git@github.com:{repository}" + + subprocess.run(["git", "clone", "--depth=1", "--recurse-submodules", url, clone_dir.name], + env={ + "GIT_SSH_COMMAND": f"ssh -i {key_file.name}", + "GIT_TERMINAL_PROMPT": "0", + }, + check=True, + capture_output=True) + + _assert_no_symlinks(clone_dir.name) + + return clone_dir + + +def dojo_create(user: Users, repository: str, public_key: str, private_key: str , spec: str): + try: + if repository: + repository_re = r"[\w\-]+/[\w\-]+" + repository = repository.replace("https://github.com/", "") + assert re.match(repository_re, repository), f"Invalid repository, expected format: {repository_re}" + + if Dojos.query.filter_by(repository=repository).first(): + raise AssertionError("This repository already exists as a dojo") + + dojo_dir = dojo_clone(repository, private_key) + + elif spec: + assert is_admin(), "Must be an admin user to create dojos from spec rather than repositories" + dojo_dir = dojo_yml_dir(spec) + repository, public_key, private_key = None, None, None + + else: + raise AssertionError("Repository or specification is required") + + dojo_path = pathlib.Path(dojo_dir.name) + + dojo = dojo_from_dir(dojo_path) + dojo.repository = repository + dojo.public_key = public_key + dojo.private_key = private_key + dojo.admins = [DojoAdmins(user=user)] + + db.session.add(dojo) + db.session.commit() + + dojo.path.parent.mkdir(exist_ok=True) + dojo_path.rename(dojo.path) + dojo_path.mkdir() # TODO: ignore_cleanup_errors=True + + except subprocess.CalledProcessError as e: + deploy_url = f"https://github.com/{repository}/settings/keys" + raise RuntimeError(f"Failed to clone: add deploy key") + + except IntegrityError: + raise RuntimeError("This repository already exists as a dojo") + + except AssertionError as e: + raise RuntimeError(str(e)) + + except Exception as e: + print(f"Encountered error: {e}", file=sys.stderr, flush=True) + raise RuntimeError("An error occurred while creating the dojo") + + return dojo + + +def dojo_update(dojo): + if dojo.path.exists(): + old_commit = dojo_git_command(dojo, "rev-parse", "HEAD").stdout.decode().strip() + + tmp_dir = tempfile.TemporaryDirectory(dir=DOJOS_TMP_DIR) + + os.rename(str(dojo.path), tmp_dir.name) + + dojo_git_command(dojo, "fetch", "--depth=1", "origin", repo_path=tmp_dir.name) + dojo_git_command(dojo, "reset", "--hard", "origin", repo_path=tmp_dir.name) + dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name) + + try: + _assert_no_symlinks(tmp_dir.name) + except AssertionError: + dojo_git_command(dojo, "reset", "--hard", old_commit, repo_path=tmp_dir.name) + dojo_git_command(dojo, "submodule", "update", "--init", "--recursive", repo_path=tmp_dir.name) + raise + finally: + os.rename(tmp_dir.name, str(dojo.path)) + else: + tmpdir = dojo_clone(dojo.repository, dojo.private_key) + os.rename(tmpdir.name, str(dojo.path)) + return dojo_from_dir(dojo.path, dojo=dojo) + From 7f8c387ccc2bba5871b9c05b2ec1d4b2e5bc9bbf Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Wed, 2 Jul 2025 18:39:30 -0700 Subject: [PATCH 05/11] Add Connor's error logging improvement --- dojo_plugin/utils/dojo_creation/dojo_initializer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dojo_plugin/utils/dojo_creation/dojo_initializer.py b/dojo_plugin/utils/dojo_creation/dojo_initializer.py index 05c0686c8..1ed76922f 100644 --- a/dojo_plugin/utils/dojo_creation/dojo_initializer.py +++ b/dojo_plugin/utils/dojo_creation/dojo_initializer.py @@ -3,6 +3,7 @@ import sys import subprocess import tempfile +import traceback import pathlib import urllib.request @@ -255,7 +256,7 @@ def dojo_create(user: Users, repository: str, public_key: str, private_key: str raise RuntimeError(str(e)) except Exception as e: - print(f"Encountered error: {e}", file=sys.stderr, flush=True) + traceback.print_exc(file=sys.stderr) raise RuntimeError("An error occurred while creating the dojo") return dojo From fa2dc5187a8fc6b190f9e38b3063182bc70ce9b4 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Thu, 3 Jul 2025 09:21:46 -0700 Subject: [PATCH 06/11] Re-fix a benign bug in dojo_initialize_files --- dojo_plugin/utils/dojo_creation/dojo_initializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dojo_plugin/utils/dojo_creation/dojo_initializer.py b/dojo_plugin/utils/dojo_creation/dojo_initializer.py index 1ed76922f..878e68a38 100644 --- a/dojo_plugin/utils/dojo_creation/dojo_initializer.py +++ b/dojo_plugin/utils/dojo_creation/dojo_initializer.py @@ -95,7 +95,7 @@ def load_dojo_subyamls(data: dict[str, Any], dojo_dir: Path) -> dict[str, Any]: def dojo_initialize_files(data: dict[str, Any], dojo_dir: Path): for dojo_file in data.get("files", []): assert is_admin(), "yml-specified files support requires admin privileges" - rel_path = dojo_dir / dojo_file["path"] + rel_path = dojo_file["path"] abs_path = dojo_dir / rel_path assert not abs_path.is_symlink(), f"{rel_path} is a symbolic link!" From cde61ba69014df01fdc12cd969712b28a4063796 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Thu, 3 Jul 2025 10:51:13 -0700 Subject: [PATCH 07/11] Move dojo_creation to top level of dojo_plugin --- dojo_plugin/api/v1/dojos.py | 2 +- dojo_plugin/{utils => }/dojo_creation/builder_utils.py | 0 dojo_plugin/{utils => }/dojo_creation/challenge_builder.py | 2 +- dojo_plugin/{utils => }/dojo_creation/dojo_builder.py | 2 +- dojo_plugin/{utils => }/dojo_creation/dojo_initializer.py | 6 +++--- dojo_plugin/{utils => }/dojo_creation/module_builder.py | 2 +- dojo_plugin/pages/dojo.py | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) rename dojo_plugin/{utils => }/dojo_creation/builder_utils.py (100%) rename dojo_plugin/{utils => }/dojo_creation/challenge_builder.py (98%) rename dojo_plugin/{utils => }/dojo_creation/dojo_builder.py (98%) rename dojo_plugin/{utils => }/dojo_creation/dojo_initializer.py (98%) rename dojo_plugin/{utils => }/dojo_creation/module_builder.py (96%) diff --git a/dojo_plugin/api/v1/dojos.py b/dojo_plugin/api/v1/dojos.py index aad3756f7..c0a9db047 100644 --- a/dojo_plugin/api/v1/dojos.py +++ b/dojo_plugin/api/v1/dojos.py @@ -12,7 +12,7 @@ from ...models import DojoStudents, Dojos, DojoModules, DojoChallenges, DojoUsers, Emojis, SurveyResponses from ...utils import render_markdown, is_challenge_locked from ...utils.dojo import dojo_route, dojo_admins_only -from ...utils.dojo_creation.dojo_initializer import dojo_create +from ...dojo_creation.dojo_initializer import dojo_create dojos_namespace = Namespace( diff --git a/dojo_plugin/utils/dojo_creation/builder_utils.py b/dojo_plugin/dojo_creation/builder_utils.py similarity index 100% rename from dojo_plugin/utils/dojo_creation/builder_utils.py rename to dojo_plugin/dojo_creation/builder_utils.py diff --git a/dojo_plugin/utils/dojo_creation/challenge_builder.py b/dojo_plugin/dojo_creation/challenge_builder.py similarity index 98% rename from dojo_plugin/utils/dojo_creation/challenge_builder.py rename to dojo_plugin/dojo_creation/challenge_builder.py index dcf08b1eb..721492f0f 100644 --- a/dojo_plugin/utils/dojo_creation/challenge_builder.py +++ b/dojo_plugin/dojo_creation/challenge_builder.py @@ -1,6 +1,6 @@ from schema import Schema, Optional, SchemaError -from ...models import Dojos, DojoChallenges, DojoChallengeVisibilities, Challenges, Flags +from ..models import Dojos, DojoChallenges, DojoChallengeVisibilities, Challenges, Flags from CTFd.utils.user import is_admin from .builder_utils import ( ID_REGEX, diff --git a/dojo_plugin/utils/dojo_creation/dojo_builder.py b/dojo_plugin/dojo_creation/dojo_builder.py similarity index 98% rename from dojo_plugin/utils/dojo_creation/dojo_builder.py rename to dojo_plugin/dojo_creation/dojo_builder.py index 0e4526320..4eecdbbea 100644 --- a/dojo_plugin/utils/dojo_creation/dojo_builder.py +++ b/dojo_plugin/dojo_creation/dojo_builder.py @@ -2,7 +2,7 @@ from schema import Schema, Optional, Regex, Or, SchemaError -from ...models import Dojos +from ..models import Dojos from .builder_utils import ( ID_REGEX, UNIQUE_ID_REGEX, diff --git a/dojo_plugin/utils/dojo_creation/dojo_initializer.py b/dojo_plugin/dojo_creation/dojo_initializer.py similarity index 98% rename from dojo_plugin/utils/dojo_creation/dojo_initializer.py rename to dojo_plugin/dojo_creation/dojo_initializer.py index 878e68a38..3ad9d3ef1 100644 --- a/dojo_plugin/utils/dojo_creation/dojo_initializer.py +++ b/dojo_plugin/dojo_creation/dojo_initializer.py @@ -16,9 +16,9 @@ from CTFd.models import db, Users from CTFd.utils.user import is_admin -from ...models import DojoAdmins, Dojos -from ...config import DOJOS_DIR -from ..dojo import dojo_git_command +from ..models import DojoAdmins, Dojos +from ..config import DOJOS_DIR +from ..utils.dojo import dojo_git_command from .dojo_builder import dojo_from_spec diff --git a/dojo_plugin/utils/dojo_creation/module_builder.py b/dojo_plugin/dojo_creation/module_builder.py similarity index 96% rename from dojo_plugin/utils/dojo_creation/module_builder.py rename to dojo_plugin/dojo_creation/module_builder.py index de8f56935..c0af1bbe7 100644 --- a/dojo_plugin/utils/dojo_creation/module_builder.py +++ b/dojo_plugin/dojo_creation/module_builder.py @@ -1,6 +1,6 @@ from schema import Schema, Optional, Or, SchemaError -from ...models import DojoModules, DojoResources, DojoModuleVisibilities, DojoResourceVisibilities +from ..models import DojoModules, DojoResources, DojoModuleVisibilities, DojoResourceVisibilities from .builder_utils import ( ID_REGEX, UNIQUE_ID_REGEX, diff --git a/dojo_plugin/pages/dojo.py b/dojo_plugin/pages/dojo.py index 620358fd5..52ae5504d 100644 --- a/dojo_plugin/pages/dojo.py +++ b/dojo_plugin/pages/dojo.py @@ -16,7 +16,7 @@ from ..utils.stats import get_container_stats, get_dojo_stats from ..utils.dojo import dojo_route, get_current_dojo_challenge, dojo_admins_only from ..models import Dojos, DojoUsers, DojoStudents, DojoModules, DojoMembers, DojoChallenges -from ..utils.dojo_creation.dojo_initializer import dojo_update +from ..dojo_creation.dojo_initializer import dojo_update dojo = Blueprint("pwncollege_dojo", __name__) #pylint:disable=redefined-outer-name From 3ef3300d1952bdaa1c707b62410881c3b533c081 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Thu, 3 Jul 2025 10:59:32 -0700 Subject: [PATCH 08/11] Remove dojo_resources defaults --- dojo_plugin/models/__init__.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/dojo_plugin/models/__init__.py b/dojo_plugin/models/__init__.py index 0719f2fe4..3c74c3120 100644 --- a/dojo_plugin/models/__init__.py +++ b/dojo_plugin/models/__init__.py @@ -619,28 +619,12 @@ class DojoResources(db.Model): def __init__(self, *args, **kwargs): - default = kwargs.pop("default", None) - data = kwargs.pop("data", {}) for field in self.data_fields: if field in kwargs: data[field] = kwargs.pop(field) kwargs["data"] = data - if default: - if kwargs.get("data"): - raise AttributeError("Import requires data to be empty") - - for field in ["type", "name"]: - kwargs[field] = kwargs[field] if kwargs.get(field) is not None else getattr(default, field, None) - - for field in self.data_fields: - kwargs["data"][field] = ( - kwargs["data"][field] - if kwargs["data"].get(field) is not None - else getattr(default, field, None) - ) - super().__init__(*args, **kwargs) def __getattr__(self, name): From ea9916f871c2fd37abbde38fbecdc4a207d914ae Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Thu, 3 Jul 2025 16:50:53 -0700 Subject: [PATCH 09/11] Fix missing id and import module/challenge issue --- dojo_plugin/dojo_creation/builder_utils.py | 6 ++++-- dojo_plugin/dojo_creation/challenge_builder.py | 15 ++++++++++----- dojo_plugin/dojo_creation/dojo_builder.py | 2 ++ dojo_plugin/dojo_creation/module_builder.py | 13 +++++++++---- 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/dojo_plugin/dojo_creation/builder_utils.py b/dojo_plugin/dojo_creation/builder_utils.py index 319677e38..ae7188d68 100644 --- a/dojo_plugin/dojo_creation/builder_utils.py +++ b/dojo_plugin/dojo_creation/builder_utils.py @@ -69,10 +69,12 @@ def import_one(query, error_message): except NoResultFound: raise AssertionError(error_message) -def first_present(key, *dicts): +def first_present(key, *dicts, required=False): for d in dicts: - if key in d: + if d and key in d: return d[key] + if required: + raise KeyError(f"Required key '{key}' not found in data.") return None def get_visibility(cls, *dicts): diff --git a/dojo_plugin/dojo_creation/challenge_builder.py b/dojo_plugin/dojo_creation/challenge_builder.py index 721492f0f..2ab9f4bd3 100644 --- a/dojo_plugin/dojo_creation/challenge_builder.py +++ b/dojo_plugin/dojo_creation/challenge_builder.py @@ -52,11 +52,14 @@ def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges: def import_challenge(challenge_data, module_data, dojo_data) -> DojoChallenges: # Handles the heirarchy of imports - import_data = ( - first_present("dojo", challenge_data["import"], module_data["import"], dojo_data["import"]), - first_present("module", challenge_data["import"], module_data["import"]), # No need to check dojo_data imports because module can never be defined there - challenge_data["import"]["challenge"], - ) + try: + import_data = ( + first_present("dojo", challenge_data["import"], module_data.get("import"), dojo_data.get("import"), required=True), + first_present("module", challenge_data["import"], module_data.get("import"), required=True), # No need to check dojo_data imports because module can never be defined there + challenge_data["import"]["challenge"], + ) + except KeyError as e: + raise AssertionError(f'Import Error: {e}') imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist") for attr in ["id", "name", "description"]: @@ -96,6 +99,8 @@ def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]: else: ctfd_challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer")) + assert challenge_data.get("id") is not None, f"Challenge id not present in challenge data. {challenge_data=}" + result.append( DojoChallenges( **{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]}, diff --git a/dojo_plugin/dojo_creation/dojo_builder.py b/dojo_plugin/dojo_creation/dojo_builder.py index 4eecdbbea..8443094fa 100644 --- a/dojo_plugin/dojo_creation/dojo_builder.py +++ b/dojo_plugin/dojo_creation/dojo_builder.py @@ -83,6 +83,8 @@ def dojo_from_spec(data: dict, *, dojo=None) -> Dojos: for name, value in dojo_kwargs.items(): setattr(dojo, name, value) + assert dojo_data.get("id") is not None, "Dojo id must be defined" + dojo.modules = modules_from_spec(dojo, dojo_data) return dojo \ No newline at end of file diff --git a/dojo_plugin/dojo_creation/module_builder.py b/dojo_plugin/dojo_creation/module_builder.py index c0af1bbe7..0263f372d 100644 --- a/dojo_plugin/dojo_creation/module_builder.py +++ b/dojo_plugin/dojo_creation/module_builder.py @@ -60,10 +60,13 @@ def build_dojo_resources(module_data, dojo_data): def import_module(module_data, dojo_data): - import_data = ( - first_present("dojo", module_data["import"], dojo_data["import"]), - module_data["import"]["module"], - ) + try: + import_data = ( + first_present("dojo", module_data["import"], dojo_data.get("import"), required=True), + module_data["import"]["module"], + ) + except KeyError as e: + raise AssertionError(f'Import Error: {e}') imported_module = import_one(DojoModules.from_id(*import_data), f"{'/'.join(import_data)} does not exist") for attr in ["id", "name", "description"]: @@ -92,6 +95,8 @@ def modules_from_spec(dojo, dojo_data): for module_data in module_list: if "import" in module_data: import_module(module_data, dojo_data) + + assert module_data.get("id") is not None, f"Module id not present in module data. {module_data=}" result.append( DojoModules( From ec2e0885380036672fa627f0494cca02cb08d100 Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Thu, 3 Jul 2025 17:13:04 -0700 Subject: [PATCH 10/11] Remove dojo from error message print Dojo should not be printed to the console on error because if the dojo creation failed, there are cases when the dojo is not printable, causing it to throw more errors. --- dojo_plugin/pages/dojo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dojo_plugin/pages/dojo.py b/dojo_plugin/pages/dojo.py index 52ae5504d..4705b5e39 100644 --- a/dojo_plugin/pages/dojo.py +++ b/dojo_plugin/pages/dojo.py @@ -147,7 +147,7 @@ def update_dojo(dojo, update_code=None): dojo_update(dojo) db.session.commit() except Exception as e: - print(f"ERROR: Dojo failed for {dojo}", file=sys.stderr, flush=True) + print(f"ERROR: Dojo update failed.", file=sys.stderr, flush=True) traceback.print_exc(file=sys.stderr) return {"success": False, "error": str(e)}, 400 return {"success": True} From 4d62fdc86187963ff57b673029f2b0d71163a0cf Mon Sep 17 00:00:00 2001 From: Max Trapido Date: Mon, 7 Jul 2025 20:05:39 -0700 Subject: [PATCH 11/11] Remove random comments --- dojo_plugin/dojo_creation/dojo_initializer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dojo_plugin/dojo_creation/dojo_initializer.py b/dojo_plugin/dojo_creation/dojo_initializer.py index 3ad9d3ef1..a6c13e9a9 100644 --- a/dojo_plugin/dojo_creation/dojo_initializer.py +++ b/dojo_plugin/dojo_creation/dojo_initializer.py @@ -184,7 +184,7 @@ def _assert_no_symlinks(dojo_dir): def dojo_clone(repository, private_key): tmp_dojos_dir = DOJOS_TMP_DIR - tmp_dojos_dir.mkdir(exist_ok=True) # Creates the DOJOS_TMP_DIR if it doesn't already exist + tmp_dojos_dir.mkdir(exist_ok=True) clone_dir = tempfile.TemporaryDirectory(dir=tmp_dojos_dir) # TODO: ignore_cleanup_errors=True key_file = tempfile.NamedTemporaryFile("w") @@ -193,7 +193,6 @@ def dojo_clone(repository, private_key): url = f"https://github.com/{repository}" - # If the github repository isn't public, the url is set so that cloning can be done over ssh if requests.head(url).status_code != 200: url = f"git@github.com:{repository}"