Skip to content

Commit

Permalink
Add a function to save bids validator and schema version
Browse files Browse the repository at this point in the history
  • Loading branch information
tientong98 committed Dec 12, 2024
1 parent 5ba0c71 commit 38cf11b
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 1 deletion.
36 changes: 36 additions & 0 deletions cubids/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,41 @@ def _enter_validate(argv=None):
workflows.validate(**args)


def _parse_bids_version():
    """Build the argument parser for ``cubids bids-version``.

    Returns
    -------
    :obj:`argparse.ArgumentParser`
        Parser with a positional ``bids_dir`` and an optional ``--write`` flag.
    """
    parser = argparse.ArgumentParser(
        description="cubids bids-version: Get BIDS Validator and Schema version",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Validate at parse time that the dataset path actually exists.
    path_exists = partial(_path_exists, parser=parser)

    parser.add_argument(
        "bids_dir",
        action="store",
        type=path_exists,
        help=(
            "the root of a BIDS dataset. It should contain "
            "sub-X directories and dataset_description.json"
        ),
    )
    parser.add_argument(
        "--write",
        default=False,
        action="store_true",
        help=(
            "Save the validator and schema version to 'dataset_description.json' "
            "when using `cubids bids-version /bids/path --write`. "
            "By default, `cubids bids-version /bids/path` prints to the terminal."
        ),
    )
    return parser


def _enter_bids_version(argv=None):
    """Entry point for the ``cubids bids-version`` CLI command."""
    parsed = _parse_bids_version().parse_args(argv)
    # Unpack the parsed namespace into keyword arguments for the workflow.
    workflows.bids_version(**vars(parsed).copy())


def _parse_bids_sidecar_merge():
parser = argparse.ArgumentParser(
description=("bids-sidecar-merge: merge critical keys from one sidecar to another"),
Expand Down Expand Up @@ -655,6 +690,7 @@ def _enter_print_metadata_fields(argv=None):

COMMANDS = [
("validate", _parse_validate, workflows.validate),
("bids-version", _parse_bids_version, workflows.bids_version),
("sidecar-merge", _parse_bids_sidecar_merge, workflows.bids_sidecar_merge),
("group", _parse_group, workflows.group),
("apply", _parse_apply, workflows.apply),
Expand Down
134 changes: 133 additions & 1 deletion cubids/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import pathlib
import subprocess
import re

import pandas as pd

Expand All @@ -24,6 +25,22 @@ def build_validator_call(path, ignore_headers=False):
return command


def get_bids_validator_version():
    """Get the version of the BIDS validator.

    Runs ``deno run -A jsr:@bids/validator --version`` and parses the last
    whitespace-separated token of its stdout as the version string.

    Returns
    -------
    dict
        Dictionary with a single ``ValidatorVersion`` key holding the
        validator version string.

    Raises
    ------
    RuntimeError
        If the validator produced no output (e.g. the command failed).
    """
    command = ["deno", "run", "-A", "jsr:@bids/validator", "--version"]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output = result.stdout.decode("utf-8").strip()
    # Remove ANSI color codes up front so splitting sees clean tokens.
    clean_output = re.sub(r"\x1b\[[0-9;]*m", "", output).strip()
    if not clean_output:
        # Previously an empty stdout crashed with a bare IndexError on
        # split()[-1]; fail with a diagnosable message instead.
        raise RuntimeError(
            "Could not determine BIDS validator version: "
            f"'{' '.join(command)}' produced no output."
        )
    version = clean_output.split()[-1]
    return {"ValidatorVersion": version}


def build_subject_paths(bids_dir):
"""Build a list of BIDS dirs with 1 subject each."""
bids_dir = str(bids_dir)
Expand Down Expand Up @@ -52,6 +69,26 @@ def build_subject_paths(bids_dir):
return subjects_dict


def build_first_subject_path(bids_dir, subject):
    """Build a ``{subject label: [files]}`` dict for a single subject.

    Collects every file under the subject's directory (recursively) plus the
    files at the root of the BIDS dataset, so that this one-subject subset
    can be validated on its own.

    Parameters
    ----------
    bids_dir : :obj:`str` or :obj:`pathlib.Path`
        Path to the root of the BIDS dataset.
    subject : :obj:`str`
        The subject directory: either a bare label (e.g. ``sub-01``) or a
        path whose last component is the label.

    Returns
    -------
    dict
        Mapping of the subject label to the list of its file paths plus the
        dataset's root-level files.
    """
    bids_dir = str(bids_dir)
    if not bids_dir.endswith("/"):
        bids_dir += "/"

    root_files = [x for x in glob.glob(bids_dir + "*") if os.path.isfile(x)]

    subject_dict = {}

    sub_label = pathlib.PurePath(subject).name
    # Anchor the subject directory under bids_dir: callers may pass a bare
    # label (e.g. "sub-01"), which would otherwise glob relative to the CWD.
    subject_dir = os.path.join(bids_dir, sub_label)

    # Join "**" as its own path component: "subject**" is NOT treated as a
    # recursive pattern by glob, so the old form missed files in subfolders.
    files = [
        x
        for x in glob.glob(os.path.join(subject_dir, "**"), recursive=True)
        if os.path.isfile(x)
    ]
    files.extend(root_files)
    subject_dict[sub_label] = files

    return subject_dict


def run_validator(call):
"""Run the validator with subprocess.
Expand Down Expand Up @@ -103,6 +140,7 @@ def parse_issue(issue_dict):
return {
"location": issue_dict.get("location", ""),
"code": issue_dict.get("code", ""),
"issueMessage": issue_dict.get("issueMessage", ""),
"subCode": issue_dict.get("subCode", ""),
"severity": issue_dict.get("severity", ""),
"rule": issue_dict.get("rule", ""),
Expand All @@ -114,7 +152,9 @@ def parse_issue(issue_dict):
# Extract issues
issues = data.get("issues", {}).get("issues", [])
if not issues:
return pd.DataFrame(columns=["location", "code", "subCode", "severity", "rule"])
return pd.DataFrame(
columns=["location", "code", "issueMessage", "subCode", "severity", "rule"]
)

# Parse all issues
parsed_issues = [parse_issue(issue) for issue in issues]
Expand All @@ -135,7 +175,99 @@ def get_val_dictionary():
return {
"location": {"Description": "File with the validation issue."},
"code": {"Description": "Code of the validation issue."},
"issueMessage": {"Description": "Validation issue message."},
"subCode": {"Description": "Subcode providing additional issue details."},
"severity": {"Description": "Severity of the issue (e.g., warning, error)."},
"rule": {"Description": "Validation rule that triggered the issue."},
}


def extract_summary_info(output):
    """Pull the schema version out of BIDS validator JSON output.

    Parameters
    ----------
    output : :obj:`str`
        JSON string of BIDS validator output.

    Returns
    -------
    dict
        Dictionary with a single ``SchemaVersion`` key taken from the
        validator's ``summary`` section (empty string when absent).

    Raises
    ------
    ValueError
        If ``output`` is not valid JSON.
    """
    try:
        parsed = json.loads(output)
    except json.JSONDecodeError as err:
        raise ValueError("Invalid JSON provided to get SchemaVersion.") from err

    schema_version = parsed.get("summary", {}).get("schemaVersion", "")
    return {"SchemaVersion": schema_version}


def update_dataset_description(path, new_info):
    """Update or append information to dataset_description.json.

    Existing keys are overwritten by ``new_info``; all other keys are kept.
    If the dataset contains a ``.datalad`` directory, the change is saved
    with ``datalad save`` on a best-effort basis (failures are reported,
    not raised).

    Parameters
    ----------
    path : :obj:`str`
        Path to the dataset.
    new_info : :obj:`dict`
        Information to add or update.
    """
    description_path = os.path.join(path, "dataset_description.json")

    # Load existing data if the file exists
    if os.path.exists(description_path):
        with open(description_path, "r", encoding="utf-8") as f:
            existing_data = json.load(f)
    else:
        existing_data = {}

    # Update the existing data with the new info
    existing_data.update(new_info)

    # Write the updated data back to the file
    with open(description_path, "w", encoding="utf-8") as f:
        json.dump(existing_data, f, indent=4)
    print(f"Updated dataset_description.json at: {description_path}")

    # Check if .datalad directory exists before running the DataLad save command
    if os.path.isdir(os.path.join(path, ".datalad")):
        try:
            subprocess.run(
                [
                    "datalad",
                    "save",
                    "-m",
                    "Save BIDS validator and schema version to dataset_description",
                    description_path,
                ],
                check=True,
            )
            print("Changes saved with DataLad.")
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # FileNotFoundError: the datalad executable is not installed.
            # Saving is best-effort; the JSON update above already succeeded.
            print(f"Error running DataLad save: {e}")


def bids_validator_version(output, path, write=False):
    """Save or display the BIDS validator and schema versions.

    Parameters
    ----------
    output : :obj:`str`
        JSON string of BIDS validator output (the decoded stdout of a
        validator run), not a file path.
    path : :obj:`str`
        Path to the dataset.
    write : :obj:`bool`
        If True, write to dataset_description.json. If False, print to terminal.
    """
    # Get the BIDS validator version
    validator_version = get_bids_validator_version()
    # Extract schemaVersion from the validator's JSON output
    summary_info = extract_summary_info(output)

    combined_info = {**validator_version, **summary_info}

    if write:
        # Update the dataset_description.json file
        update_dataset_description(path, combined_info)
    else:
        # `write` is a plain bool, so a simple else replaces `elif not write`.
        print(combined_info)
69 changes: 69 additions & 0 deletions cubids/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
get_val_dictionary,
parse_validator_output,
run_validator,
build_first_subject_path,
bids_validator_version,
)

warnings.simplefilter(action="ignore", category=FutureWarning)
Expand Down Expand Up @@ -258,6 +260,73 @@ def validate(
sys.exit(proc.returncode)


def bids_version(
    bids_dir,
    write=False
):
    """Get BIDS validator and schema version.

    Runs the BIDS validator on a one-subject subset of the dataset (the
    first ``sub-*`` directory found) and reports the validator and schema
    versions, optionally writing them into ``dataset_description.json``.

    Parameters
    ----------
    bids_dir : :obj:`pathlib.Path`
        Path to the BIDS directory.
    write : :obj:`bool`
        If True, write to dataset_description.json. If False, print to terminal.

    Raises
    ------
    FileNotFoundError
        If ``bids_dir`` does not exist.
    ValueError
        If no directories starting with ``sub-`` are found in ``bids_dir``.
    """
    # Need to run validator to get output with schema version
    # Copy code from `validate --sequential`

    # Get all folders that start with "sub-"; use only the first one.
    try:
        sub_folders = [
            name
            for name in os.listdir(bids_dir)
            if os.path.isdir(os.path.join(bids_dir, name)) and name.startswith("sub-")
        ]
    except FileNotFoundError as err:
        # Chain the original error so the underlying cause stays visible.
        raise FileNotFoundError(f"The directory {bids_dir} does not exist.") from err

    if not sub_folders:
        raise ValueError("No folders starting with 'sub-' found. Please provide a valid BIDS.")
    subject = sub_folders[0]

    # build a dictionary with {SubjectLabel: [List of files]}
    # run first subject only
    subject_dict = build_first_subject_path(bids_dir, subject)

    # iterate over the dictionary (it holds a single subject)
    for subject, files_list in subject_dict.items():
        # create a temporary directory and copy the subset of data into it
        with tempfile.TemporaryDirectory() as tmpdirname:
            for fi in files_list:
                # cut the path down to the subject label
                bids_start = fi.find(subject)

                # maybe it's a root-level file (label absent or at position 0)
                if bids_start < 1:
                    fi_tmpdir = tmpdirname
                else:
                    # mirror the subject-relative folder structure in tmpdir
                    bids_folder = Path(fi[bids_start:]).parent
                    fi_tmpdir = os.path.join(tmpdirname, str(bids_folder))

                if not os.path.exists(fi_tmpdir):
                    os.makedirs(fi_tmpdir)
                shutil.copy2(fi, os.path.join(fi_tmpdir, Path(fi).name))

            # run the validator on the one-subject copy
            call = build_validator_call(tmpdirname)
            ret = run_validator(call)

            # Get BIDS validator and schema version from the validator output
            decoded = ret.stdout.decode("UTF-8")
            bids_validator_version(decoded, bids_dir, write=write)


def bids_sidecar_merge(from_json, to_json):
"""Merge critical keys from one sidecar to another."""
merge_status = merge_json_into_json(from_json, to_json, raise_on_error=False)
Expand Down

0 comments on commit 38cf11b

Please sign in to comment.