diff --git a/cubids/cli.py b/cubids/cli.py
index f87ffc6c..d2f8a920 100644
--- a/cubids/cli.py
+++ b/cubids/cli.py
@@ -107,6 +107,41 @@ def _enter_validate(argv=None):
     workflows.validate(**args)
 
 
+def _parse_bids_version():
+    parser = argparse.ArgumentParser(
+        description="cubids bids-version: Get BIDS Validator and Schema version",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    PathExists = partial(_path_exists, parser=parser)
+
+    parser.add_argument(
+        "bids_dir",
+        type=PathExists,
+        action="store",
+        help=(
+            "the root of a BIDS dataset. It should contain "
+            "sub-X directories and dataset_description.json"
+        ),
+    )
+    parser.add_argument(
+        "--write",
+        action="store_true",
+        default=False,
+        help=(
+            "Save the validator and schema version to 'dataset_description.json' "
+            "when using `cubids bids-version /bids/path --write`. "
+            "By default, `cubids bids-version /bids/path` prints to the terminal."
+        ),
+    )
+    return parser
+
+
+def _enter_bids_version(argv=None):
+    options = _parse_bids_version().parse_args(argv)
+    args = vars(options).copy()
+    workflows.bids_version(**args)
+
+
 def _parse_bids_sidecar_merge():
     parser = argparse.ArgumentParser(
         description=("bids-sidecar-merge: merge critical keys from one sidecar to another"),
@@ -655,6 +690,7 @@ def _enter_print_metadata_fields(argv=None):
 
 COMMANDS = [
     ("validate", _parse_validate, workflows.validate),
+    ("bids-version", _parse_bids_version, workflows.bids_version),
    ("sidecar-merge", _parse_bids_sidecar_merge, workflows.bids_sidecar_merge),
     ("group", _parse_group, workflows.group),
     ("apply", _parse_apply, workflows.apply),
diff --git a/cubids/validator.py b/cubids/validator.py
index 7fba8138..fe0e08ef 100644
--- a/cubids/validator.py
+++ b/cubids/validator.py
@@ -6,6 +6,7 @@
 import os
 import pathlib
+import re
 import subprocess
 
 import pandas as pd
 
@@ -24,6 +25,22 @@ def build_validator_call(path, ignore_headers=False):
     return command
 
 
+def get_bids_validator_version():
+    """Get the version of the BIDS validator.
+
+    Returns
+    -------
+    :obj:`dict`
+        Dictionary with the validator version, e.g. ``{"ValidatorVersion": "<version>"}``.
+    """
+    command = ["deno", "run", "-A", "jsr:@bids/validator", "--version"]
+    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    output = result.stdout.decode("utf-8").strip()
+    version = output.split()[-1]
+    clean_ver = re.sub(r"\x1b\[[0-9;]*m", "", version)  # Remove ANSI color codes
+    return {"ValidatorVersion": clean_ver}
+
+
 def build_subject_paths(bids_dir):
     """Build a list of BIDS dirs with 1 subject each."""
     bids_dir = str(bids_dir)
@@ -52,6 +69,28 @@
     return subjects_dict
 
 
+def build_first_subject_path(bids_dir, subject):
+    """Build a dictionary of a single subject's files, plus the dataset's root files."""
+    bids_dir = str(bids_dir)
+    if not bids_dir.endswith("/"):
+        bids_dir += "/"
+
+    root_files = [x for x in glob.glob(bids_dir + "*") if os.path.isfile(x)]
+
+    subject_dict = {}
+
+    purepath = pathlib.PurePath(subject)
+    sub_label = purepath.name
+
+    # "**" only recurses after a path separator; `subject` may be a bare label.
+    subject_dir = os.path.join(bids_dir, sub_label, "")
+    files = [x for x in glob.glob(subject_dir + "**", recursive=True) if os.path.isfile(x)]
+    files.extend(root_files)
+    subject_dict[sub_label] = files
+
+    return subject_dict
+
+
 def run_validator(call):
     """Run the validator with subprocess.
@@ -103,6 +142,7 @@ def parse_issue(issue_dict):
         return {
             "location": issue_dict.get("location", ""),
             "code": issue_dict.get("code", ""),
+            "issueMessage": issue_dict.get("issueMessage", ""),
             "subCode": issue_dict.get("subCode", ""),
             "severity": issue_dict.get("severity", ""),
             "rule": issue_dict.get("rule", ""),
@@ -114,7 +154,9 @@
     # Extract issues
     issues = data.get("issues", {}).get("issues", [])
     if not issues:
-        return pd.DataFrame(columns=["location", "code", "subCode", "severity", "rule"])
+        return pd.DataFrame(
+            columns=["location", "code", "issueMessage", "subCode", "severity", "rule"]
+        )
 
     # Parse all issues
     parsed_issues = [parse_issue(issue) for issue in issues]
@@ -135,7 +177,99 @@
     return {
         "location": {"Description": "File with the validation issue."},
         "code": {"Description": "Code of the validation issue."},
+        "issueMessage": {"Description": "Validation issue message."},
         "subCode": {"Description": "Subcode providing additional issue details."},
         "severity": {"Description": "Severity of the issue (e.g., warning, error)."},
         "rule": {"Description": "Validation rule that triggered the issue."},
     }
+
+
+def extract_summary_info(output):
+    """Extract summary information from the JSON output.
+
+    Parameters
+    ----------
+    output : :obj:`str`
+        JSON string of BIDS validator output.
+
+    Returns
+    -------
+    :obj:`dict`
+        Dictionary containing the SchemaVersion reported by the validator.
+    """
+    try:
+        data = json.loads(output)
+    except json.JSONDecodeError as e:
+        raise ValueError("Invalid JSON provided to get SchemaVersion.") from e
+
+    summary = data.get("summary", {})
+
+    return {"SchemaVersion": summary.get("schemaVersion", "")}
+
+
+def update_dataset_description(path, new_info):
+    """Update or append information to dataset_description.json.
+
+    Parameters
+    ----------
+    path : :obj:`str`
+        Path to the dataset.
+    new_info : :obj:`dict`
+        Information to add or update.
+    """
+    description_path = os.path.join(path, "dataset_description.json")
+
+    # Load existing data if the file exists
+    if os.path.exists(description_path):
+        with open(description_path, "r") as f:
+            existing_data = json.load(f)
+    else:
+        existing_data = {}
+
+    # Update the existing data with the new info
+    existing_data.update(new_info)
+
+    # Write the updated data back to the file
+    with open(description_path, "w") as f:
+        json.dump(existing_data, f, indent=4)
+    print(f"Updated dataset_description.json at: {description_path}")
+
+    # Check if a .datalad directory exists before running the DataLad save command
+    datalad_dir = os.path.join(path, ".datalad")
+    if os.path.exists(datalad_dir) and os.path.isdir(datalad_dir):
+        try:
+            subprocess.run(
+                ["datalad", "save", "-m",
+                 "Save BIDS validator and schema version to dataset_description",
+                 description_path],
+                check=True,
+            )
+            print("Changes saved with DataLad.")
+        except subprocess.CalledProcessError as e:
+            print(f"Error running DataLad save: {e}")
+
+
+def bids_validator_version(output, path, write=False):
+    """Print or save the BIDS validator and schema versions.
+
+    Parameters
+    ----------
+    output : :obj:`str`
+        JSON string of BIDS validator output.
+    path : :obj:`str`
+        Path to the dataset.
+    write : :obj:`bool`
+        If True, write to dataset_description.json. If False, print to terminal.
+ """ + # Get the BIDS validator version + validator_version = get_bids_validator_version() + # Extract schemaVersion + summary_info = extract_summary_info(output) + + combined_info = {**validator_version, **summary_info} + + if write: + # Update the dataset_description.json file + update_dataset_description(path, combined_info) + elif not write: + print(combined_info) \ No newline at end of file diff --git a/cubids/workflows.py b/cubids/workflows.py index 6cbc1e42..69bed501 100644 --- a/cubids/workflows.py +++ b/cubids/workflows.py @@ -22,6 +22,8 @@ get_val_dictionary, parse_validator_output, run_validator, + build_first_subject_path, + bids_validator_version, ) warnings.simplefilter(action="ignore", category=FutureWarning) @@ -258,6 +260,73 @@ def validate( sys.exit(proc.returncode) +def bids_version( + bids_dir, + write=False +): + """Get BIDS validator and schema version. + + Parameters + ---------- + bids_dir : :obj:`pathlib.Path` + Path to the BIDS directory. + write : :obj:`bool` + If True, write to dataset_description.json. If False, print to terminal. + """ + # Need to run validator to get output with schema version + # Copy code from `validate --sequential` + + try: # return first subject + # Get all folders that start with "sub-" + sub_folders = [ + name + for name in os.listdir(bids_dir) + if os.path.isdir(os.path.join(bids_dir, name)) and name.startswith("sub-") + ] + if not sub_folders: + raise ValueError("No folders starting with 'sub-' found. Please provide a valid BIDS.") + subject = sub_folders[0] + except FileNotFoundError: + raise FileNotFoundError(f"The directory {bids_dir} does not exist.") + except ValueError as ve: + raise ve + + # build a dictionary with {SubjectLabel: [List of files]} + # run first subject only + subject_dict = build_first_subject_path(bids_dir, subject) + + # iterate over the dictionary + for subject, files_list in subject_dict.items(): + # logger.info(" ".join(["Processing subject:", subject])) + # create a temporary directory and symlink the data + with tempfile.TemporaryDirectory() as tmpdirname: + for fi in files_list: + # cut the path down to the subject label + bids_start = fi.find(subject) + + # maybe it's a single file + if bids_start < 1: + bids_folder = tmpdirname + fi_tmpdir = tmpdirname + + else: + bids_folder = Path(fi[bids_start:]).parent + fi_tmpdir = tmpdirname + "/" + str(bids_folder) + + if not os.path.exists(fi_tmpdir): + os.makedirs(fi_tmpdir) + output = fi_tmpdir + "/" + str(Path(fi).name) + shutil.copy2(fi, output) + + # run the validator + call = build_validator_call(tmpdirname) + ret = run_validator(call) + + # Get BIDS validator and schema version + decoded = ret.stdout.decode("UTF-8") + bids_validator_version(decoded, bids_dir, write=write) + + def bids_sidecar_merge(from_json, to_json): """Merge critical keys from one sidecar to another.""" merge_status = merge_json_into_json(from_json, to_json, raise_on_error=False)