Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dojo_plugin/api/v1/dojos.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

from ...models import DojoStudents, Dojos, DojoModules, DojoChallenges, DojoUsers, Emojis, SurveyResponses
from ...utils import render_markdown, is_challenge_locked
from ...utils.dojo import dojo_route, dojo_admins_only, dojo_create
from ...utils.dojo import dojo_route, dojo_admins_only
from ...dojo_creation.dojo_initializer import dojo_create


dojos_namespace = Namespace(
Expand Down
89 changes: 89 additions & 0 deletions dojo_plugin/dojo_creation/builder_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from schema import Optional, Regex, Or, Use
import datetime
from sqlalchemy.orm.exc import NoResultFound


ID_REGEX = Regex(r"^[a-z0-9-]{1,32}$")
UNIQUE_ID_REGEX = Regex(r"^[a-z0-9-~]{1,128}$")
NAME_REGEX = Regex(r"^[\S ]{1,128}$")
IMAGE_REGEX = Regex(r"^[\S]{1,256}$")
FILE_PATH_REGEX = Regex(r"^[A-Za-z0-9_][A-Za-z0-9-_./]*$")
FILE_URL_REGEX = Regex(r"^https://www.dropbox.com/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9]*/[a-zA-Z0-9.-_]*?rlkey=[a-zA-Z0-9]*&dl=1")
DATE = Use(datetime.datetime.fromisoformat)

ID_NAME_DESCRIPTION = {
Optional("id"): ID_REGEX,
Optional("name"): NAME_REGEX,
Optional("description"): str,
}

VISIBILITY = {
Optional("visibility", default={}): {
Optional("start"): DATE,
Optional("stop"): DATE,
}
}

SURVEY = {
Optional("survey"): Or(
{
"type": "multiplechoice",
"prompt": str,
Optional("probability"): float,
"options": [str],
},
{
"type": "thumb",
"prompt": str,
Optional("probability"): float,
},
{
"type": "freeform",
"prompt": str,
Optional("probability"): float,
},
)
}

BASE_SPEC = {
**ID_NAME_DESCRIPTION,
**VISIBILITY,

Optional("image"): IMAGE_REGEX,
Optional("allow_privileged"): bool,
Optional("importable"): bool,
Optional("auxiliary", default={}, ignore_extra_keys=True): dict,

**SURVEY,
}
"""
Dictionary for specification fields that are defined identically in all three layers of the specification schema
"""


def import_one(query, error_message):
try:
o = query.one()
assert o.importable, f"Import disallowed for {o}."
return o
except NoResultFound:
raise AssertionError(error_message)

def first_present(key, *dicts, required=False):
for d in dicts:
if d and key in d:
return d[key]
if required:
raise KeyError(f"Required key '{key}' not found in data.")
return None

def get_visibility(cls, *dicts):
visibility = first_present("visibility", *dicts)

if visibility:
start = visibility["start"].astimezone(datetime.timezone.utc) if "start" in visibility else None
stop = visibility["stop"].astimezone(datetime.timezone.utc) if "stop" in visibility else None
assert start or stop, "`start` or `stop` value must be present under visibility"
return cls(start=start, stop=stop)

return None
117 changes: 117 additions & 0 deletions dojo_plugin/dojo_creation/challenge_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from schema import Schema, Optional, SchemaError

from ..models import Dojos, DojoChallenges, DojoChallengeVisibilities, Challenges, Flags
from CTFd.utils.user import is_admin
from .builder_utils import (
ID_REGEX,
UNIQUE_ID_REGEX,
BASE_SPEC,
import_one,
first_present,
get_visibility,
)


CHALLENGE_SPEC = Schema([{
**BASE_SPEC,

Optional("progression_locked"): bool,
# Optional("path"): Regex(r"^[^\s\.\/][^\s\.]{,255}$"),

Optional("import"): {
Optional("dojo"): UNIQUE_ID_REGEX,
Optional("module"): ID_REGEX,
"challenge": ID_REGEX,
},

Optional("transfer"): {
Optional("dojo"): UNIQUE_ID_REGEX,
Optional("module"): ID_REGEX,
"challenge": ID_REGEX,
},
}])




def get_challenge(dojo, module_id, challenge_id, transfer) -> Challenges:
if chal := Challenges.query.filter_by(category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}").first():
return chal
if transfer:
assert dojo.official or (is_admin() and not Dojos.from_id(dojo.id).first()), "Transfer Error: transfers can only be utilized by official dojos or by system admins during dojo creation"
old_dojo_id, old_module_id, old_challenge_id = transfer["dojo"], transfer["module"], transfer["challenge"]
old_dojo = Dojos.from_id(old_dojo_id).first()
assert old_dojo, f"Transfer Error: unable to find source dojo in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}"
old_challenge = Challenges.query.filter_by(category=old_dojo.hex_dojo_id, name=f"{old_module_id}:{old_challenge_id}").first()
assert old_challenge, f"Transfer Error: unable to find source module/challenge in database for {old_dojo_id}:{old_module_id}:{old_challenge_id}"
old_challenge.category = dojo.hex_dojo_id
old_challenge.name = f"{module_id}:{challenge_id}"
return old_challenge
return Challenges(type="dojo", category=dojo.hex_dojo_id, name=f"{module_id}:{challenge_id}", flags=[Flags(type="dojo")])


def import_challenge(challenge_data, module_data, dojo_data) -> DojoChallenges:
# Handles the heirarchy of imports
try:
import_data = (
first_present("dojo", challenge_data["import"], module_data.get("import"), dojo_data.get("import"), required=True),
first_present("module", challenge_data["import"], module_data.get("import"), required=True), # No need to check dojo_data imports because module can never be defined there
challenge_data["import"]["challenge"],
)
except KeyError as e:
raise AssertionError(f'Import Error: {e}')

imported_challenge = import_one(DojoChallenges.from_id(*import_data), f"{'/'.join(import_data)} does not exist")
for attr in ["id", "name", "description"]:
if attr not in challenge_data:
challenge_data[attr] = getattr(imported_challenge, attr)

# TODO: maybe we should track the entire import
challenge_data["image"] = imported_challenge.data.get("image")
return imported_challenge



def challenges_from_spec(dojo, dojo_data, module_data) -> list[DojoChallenges]:
try:
challenge_list = CHALLENGE_SPEC.validate(module_data["challenges"])
except SchemaError as e:
raise AssertionError(f"Invalid challenge specification: {e}")

module_id = module_data["id"]

# This is for caching existing challenges to improve performance of updating a dojo
existing_module = next((module for module in dojo.modules if module.id == module_id), None)
existing_challenges = {challenge.id: challenge.challenge for challenge in existing_module.challenges} if existing_module else {}

result = []
for challenge_data in challenge_list:
data_priority_chain = (challenge_data, module_data, dojo_data)

path_override = None
challenge_id = challenge_data.get("id")
if "import" in challenge_data:
imported_challenge = import_challenge(*data_priority_chain) # import has to be done before DojoChallenges creation because it modifies challenge_data
path_override = str(imported_challenge.path)
ctfd_challenge = imported_challenge.challenge
elif challenge_id in existing_challenges:
ctfd_challenge = existing_challenges[challenge_id]
else:
ctfd_challenge = get_challenge(dojo, module_id, challenge_data.get("id"), transfer=challenge_data.get("transfer"))

assert challenge_data.get("id") is not None, f"Challenge id not present in challenge data. {challenge_data=}"

result.append(
DojoChallenges(
**{kwarg: challenge_data.get(kwarg) for kwarg in ["id", "name", "description"]},
image=first_present("image", *data_priority_chain),
allow_privileged=first_present("allow_privileged", *data_priority_chain, DojoChallenges.data_defaults),
importable=first_present("importable", *data_priority_chain, DojoChallenges.data_defaults),
progression_locked=first_present("progression_locked", challenge_data, DojoChallenges.data_defaults),
survey=first_present("survey", *data_priority_chain),
visibility=get_visibility(DojoChallengeVisibilities, *data_priority_chain),
path_override=path_override,
challenge=ctfd_challenge,
)
)
return result
94 changes: 94 additions & 0 deletions dojo_plugin/dojo_creation/dojo_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import yaml

from schema import Schema, Optional, Regex, Or, SchemaError

from ..models import Dojos
from .builder_utils import (
ID_REGEX,
UNIQUE_ID_REGEX,
IMAGE_REGEX,
FILE_PATH_REGEX,
FILE_URL_REGEX,
BASE_SPEC,
import_one,
)
from .module_builder import modules_from_spec


DOJO_SPEC = Schema({
**BASE_SPEC,

Optional("password"): Regex(r"^[\S ]{8,128}$"),

Optional("type"): ID_REGEX,
Optional("award"): {
Optional("emoji"): Regex(r"^\S$"),
Optional("belt"): IMAGE_REGEX
},

Optional("show_scoreboard"): bool,


Optional("import"): {
"dojo": UNIQUE_ID_REGEX,
},


Optional("pages", default=[]): [str],
Optional("files", default=[]): [Or(
{
"type": "download",
"path": FILE_PATH_REGEX,
"url": FILE_URL_REGEX,
},
{
"type": "text",
"path": FILE_PATH_REGEX,
"content": str,
}
)],

Optional("modules", default=[]): list, # Defer module validation until later
})


DOJO_ATTRIBUTES = ["id", "name", "description", "password", "type", "award", "pages", "show_scoreboard"]


def import_dojo(dojo_data):
# TODO: we probably don't need to restrict imports to official dojos
imported_dojo = import_one(
Dojos.from_id(dojo_data["import"]["dojo"]).filter_by(official=True),
f"Import dojo `{dojo_data['import']['dojo']}` does not exist"
)

for attr in DOJO_ATTRIBUTES:
if attr not in dojo_data:
dojo_data[attr] = getattr(imported_dojo, attr)

# Modules will be initialized at the module layer, and challenges at the challenge layer
if not dojo_data["modules"]:
dojo_data["modules"] = [{"import": {"module": module.id}} for module in imported_dojo.modules]


def dojo_from_spec(data: dict, *, dojo=None) -> Dojos:
try:
dojo_data = DOJO_SPEC.validate(data)
except SchemaError as e:
raise AssertionError(f"Invalid dojo specification: {e}")

if "import" in dojo_data:
import_dojo(dojo_data)

dojo_kwargs = {attr: dojo_data.get(attr) for attr in DOJO_ATTRIBUTES}
if dojo is None:
dojo = Dojos(**dojo_kwargs)
else:
for name, value in dojo_kwargs.items():
setattr(dojo, name, value)

assert dojo_data.get("id") is not None, "Dojo id must be defined"

dojo.modules = modules_from_spec(dojo, dojo_data)

return dojo
Loading