diff --git a/test_data_model/config.json b/test_data_model/config.json
index 849346140b..453eb8e52f 100644
--- a/test_data_model/config.json
+++ b/test_data_model/config.json
@@ -1,6 +1,6 @@
 {
-    "results_dir": "/home/aabella/PycharmProjects/data-models/test_data_model/results",
+    "results_dir": "",
     "results_dir_help": "This directory will store the results of the tests, either one or multiple. It has to be writable by the script",
-    "download_dir": "/home/aabella/transparentia/CLIENTES/EU/FIWARE/GITHUB/repo_to_test",
+    "download_dir": "",
     "download_dir_help": "This directory is used for the temporary download of files and is removed once finished. Don't point it to any directory with valuable content"
-}
\ No newline at end of file
+}
diff --git a/test_data_model/master_tests.py b/test_data_model/master_tests.py
index 69cff42f0a..7828b865bc 100644
--- a/test_data_model/master_tests.py
+++ b/test_data_model/master_tests.py
@@ -1,12 +1,30 @@
-import json
-import importlib
-import sys
-import os
-import requests
-import shutil
+#################################################################################
+# Licensed to the FIWARE Foundation (FF) under one                              #
+# or more contributor license agreements. The FF licenses this file             #
+# to you under the Apache License, Version 2.0 (the "License")                  #
+# you may not use this file except in compliance with the License.              #
+# You may obtain a copy of the License at                                       #
+#                                                                               #
+#      http://www.apache.org/licenses/LICENSE-2.0                               #
+#                                                                               #
+# Unless required by applicable law or agreed to in writing, software           #
+# distributed under the License is distributed on an "AS IS" BASIS,             #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.      #
+# See the License for the specific language governing permissions and           #
+# limitations under the License.                                                #
+# Author: Alberto Abella                                                        #
+#################################################################################
+# version 26/02/25 - 1
+
+from json import dump, dumps, load
+from importlib import import_module
+from os.path import join, dirname, exists
+from os import makedirs
+from requests import get
+from shutil import copy, rmtree
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
-import argparse
+from argparse import ArgumentParser  # Import ArgumentParser for command-line argument parsing
 from pathlib import Path
 from typing import Dict, Any
 
@@ -14,6 +32,19 @@
 def load_config(config_path: str = None) -> Dict[str, Any]:
     """
     Load configuration from a JSON file.
+
+    Loads configuration data from a JSON file, searching default locations if a path isn't provided.
+    It also validates that required keys are present and converts specific paths to absolute paths.
+
+    Parameters:
+        config_path (str, optional): The path to the configuration file. If None, default locations are searched.
+
+    Returns:
+        Dict[str, Any]: The loaded configuration data.
+
+    Raises:
+        FileNotFoundError: If no configuration file is found.
+        ValueError: If required keys are missing in the configuration.
     """
     default_locations = [
         Path("config.json"),
@@ -44,41 +75,114 @@
 def is_url(path):
+    """
+    Check if the provided path is a URL.
+
+    Parameters:
+        path (str): The path to check.
+
+    Returns:
+        bool: True if the path is a URL, False otherwise.
+    """
     return path.startswith(("http://", "https://"))
 
 
 def convert_github_url_to_raw(subject_root):
+    """
+    Convert a GitHub repository URL to the corresponding raw file URL.
+
+    Parameters:
+        subject_root (str): The GitHub repository URL (e.g., https://github.com/smart-data-models/dataModel.Weather/blob/master/WeatherObserved/schema.json).
+
+    Returns:
+        str: The raw file base URL (e.g., https://raw.githubusercontent.com/smart-data-models/dataModel.Weather/refs/heads/master/WeatherObserved/).
+    """
     try:
         if "github.com" not in subject_root:
             raise ValueError("Invalid GitHub repository URL.")
 
         if "/blob/" in subject_root:
-            raw_url = subject_root.replace("github.com", "raw.githubusercontent.com")
-            return raw_url.replace("/blob/", "/")
+            return _extracted_from_convert_github_url_to_raw(
+                subject_root, "/blob/", "/"
+            )
         elif "/tree/" in subject_root:
-            raw_url = subject_root.replace("github.com", "raw.githubusercontent.com")
-            return raw_url.replace("/tree/", "/")
+            return _extracted_from_convert_github_url_to_raw(subject_root, "/tree/", "/")
         else:
-            return subject_root.replace("github.com", "raw.githubusercontent.com") + "/master"
+            parts = subject_root.split('/')
+            url = '/'.join(parts[:-1]) + '/refs/heads/master/' + parts[-1]
+            return url.replace("github.com", "raw.githubusercontent.com")
     except Exception as e:
-        raise ValueError(f"Error converting GitHub URL to raw URL: {e}")
+        raise ValueError(f"Error converting GitHub URL to raw URL: {e}") from e
+
+
+def _extracted_from_convert_github_url_to_raw(repo_url: str, arg1: str, arg2: str) -> str:
+    """
+    Replace parts of a repository URL to convert it to a raw file URL.
+
+    Replaces specific parts of the input URL with other parts to construct the raw file URL.
+    This is a helper function used by `convert_github_url_to_raw`.
+
+    Parameters:
+        repo_url (str): The original repository URL.
+        arg1 (str): The part of the URL to be replaced.
+        arg2 (str): The replacement string.
+
+    Returns:
+        str: The modified URL.
+    """
+    raw_url = repo_url.replace("github.com", "raw.githubusercontent.com")
+    return raw_url.replace(arg1, arg2)
 
 
 def download_file(url, file_path):
+    """
+    Download a single file from a URL and save it to the specified path.
+
+    Parameters:
+        url (str): The URL of the file to download.
+        file_path (str): The path where the file should be saved.
+
+    Returns:
+        tuple: (file_path, success, message)
+    """
     try:
-        os.makedirs(os.path.dirname(file_path), exist_ok=True)
-        response = requests.get(url, timeout=10)
+        # Ensure the directory structure exists
+        makedirs(dirname(file_path), exist_ok=True)
+
+        # Download the file
+        response = get(url=url, timeout=10)
         response.raise_for_status()
+
         with open(file_path, "wb") as f:
             f.write(response.content)
-        return file_path, True, None
+
+        return file_path, True, "Download successful"
     except Exception as e:
-        return file_path, False, str(e)
+        return file_path, False, f"Error downloading {url}: {e}"
+
 
-def download_files(subject_root, download_dir):
+def download_files(subject_root: str, download_dir: str):
+    """
+    Download or copy files from a URL or local directory.
+
+    Downloads or copies a predefined set of files from a given URL or local directory to a specified download directory.
+    If the source is a URL, parallel downloads are used. If it is a local path, files are copied.
+
+    Parameters:
+        subject_root (str): The URL or local path to download/copy files from.
+        download_dir (str): The directory to save the downloaded/copied files.
+
+    Returns:
+        str: The path to the download directory.
+
+    Raises:
+        Exception: If any error occurs during download or copying.
+    """
     try:
-        os.makedirs(download_dir, exist_ok=True)
+        # Ensure the download directory exists
+        makedirs(download_dir, exist_ok=True)
+
+        # List of files to download/copy (adjust as needed)
         files_to_download = [
             "schema.json",
             "examples/example.json",
@@ -90,24 +194,32 @@
         ]
 
         if is_url(subject_root):
-            with ThreadPoolExecutor(max_workers=5) as executor:
+            # Download files from a URL using parallel threads
+            with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust max_workers as needed
                 futures = []
                 for file in files_to_download:
                     file_url = f"{subject_root.rstrip('/')}/{file}"
-                    file_path = os.path.join(download_dir, file)
+                    file_path = join(download_dir, file)
+
                     futures.append(executor.submit(download_file, file_url, file_path))
 
+                # Wait for all downloads to complete and check for errors
                 for future in as_completed(futures):
                     file_path, success, message = future.result()
                     if not success and message:
                         raise Exception(message)
         else:
+            # Copy files from a local directory (no parallelization needed)
             for file in files_to_download:
-                src_path = os.path.join(subject_root, file)
-                dest_path = os.path.join(download_dir, file)
-                os.makedirs(os.path.dirname(dest_path), exist_ok=True)
-                if os.path.exists(src_path):
-                    shutil.copy(src_path, dest_path)
+                src_path = join(subject_root, file)
+                dest_path = join(download_dir, file)
+
+                # Ensure the directory structure exists
+                makedirs(dirname(dest_path), exist_ok=True)
+
+                # Copy the file
+                if exists(src_path):
+                    copy(src_path, dest_path)
                 else:
                     raise Exception(f"File not found: {src_path}")
 
@@ -116,13 +228,29 @@
         raise Exception(f"Error downloading/copying files: {e}")
 
 
-def run_tests(test_files, repo_to_test, only_report_errors, options):
+def run_tests(test_files: list, repo_to_test: str, only_report_errors: bool, options: dict) -> dict:
+    """
+    Run a series of tests on a downloaded data model.
+
+    Parameters:
+        test_files (list): List of test module names (e.g., ["test_valid_json", "test_file_exists"]).
+        repo_to_test (str): The path to the directory where the files are located.
+        only_report_errors (bool): Whether to include only failed tests in the results.
+        options (dict): Additional options for the tests (e.g., {"published": True, "private": False}).
+
+    Returns:
+        dict: Results of the tests.
+    """
+
     results = {}
     for test_file in test_files:
         try:
-            module = importlib.import_module(f"tests.{test_file}")
+            # Import the test module
+            module = import_module(f"tests.{test_file}")
+            # Run the test function (assumes the function name matches the module name)
             test_function = getattr(module, test_file)
             test_name, success, message = test_function(repo_to_test, options)
+            # Include the test result only if it failed or if only_report_errors is False
             if not only_report_errors or not success:
                 results[test_file] = {
                     "test_name": test_name,
@@ -135,94 +263,160 @@
                     "success": False,
                     "message": f"Error running test: {e}"
                 }
+
     return results
 
 
 def main():
-    result = {
-        "success": False,
-        "error": None,
-        "test_results": None,
-        "metadata": {
-            "timestamp": datetime.now().isoformat()
-        }
-    }
+    """
+    Main function to parse command-line arguments and run the quality analysis.
+
+    Parses command-line arguments for the repository URL, email, reporting options, and additional settings.
+    Then it runs the quality analysis process using the provided parameters.
+
+    Returns:
+        dict | None: The result of the quality analysis, or None if an error occurred.
+    """
     try:
-        config = load_config()
-        results_dir = config['results_dir']
-        download_dir = config['download_dir']
+        parser = ArgumentParser(description="Run tests on a repository.")
 
-        Path(results_dir).mkdir(parents=True, exist_ok=True)
-        Path(download_dir).mkdir(parents=True, exist_ok=True)
+        # Mandatory arguments
+        parser.add_argument("subject_root", type=str, help="The subject root of the repository to check.")
+        parser.add_argument("email", type=str, help="The email address for reporting results.")
+        parser.add_argument("only_report_errors", type=str, help="Whether to report only errors (true/false or 1/0).")
 
-        parser = argparse.ArgumentParser()
-        parser.add_argument("subject_root", type=str)
-        parser.add_argument("email", type=str)
-        parser.add_argument("only_report_errors", type=str)
-        parser.add_argument("--published", type=str, default="false")
-        parser.add_argument("--private", type=str, default="false")
-        parser.add_argument("--output", type=str, default=None)
+        # Optional arguments
+        parser.add_argument("--published", type=str, help="Whether the model is officially published (true/false or 1/0).", default="false")
+        parser.add_argument("--private", type=str, help="Whether the model is private (true/false or 1/0).", default="false")
+        parser.add_argument("--output", type=str, help="Additional output file path for the test results.", default=None)
 
+        # Parse arguments
         args = parser.parse_args()
 
-        if not args.email or "@" not in args.email:
-            raise ValueError("Missing or invalid email address")
-
+        # Convert string arguments to appropriate types
         only_report_errors = args.only_report_errors.lower() in ("true", "1")
         published = args.published.lower() in ("true", "1")
         private = args.private.lower() in ("true", "1")
+        output_file = args.output
 
-        if is_url(args.subject_root):
-            raw_base_url = convert_github_url_to_raw(args.subject_root)
-        else:
-            raw_base_url = args.subject_root
+        # Validate the email (basic check)
+        if not args.email or "@" not in args.email:
+            raise ValueError("Missing or invalid email address")
 
-        repo_path = download_files(raw_base_url, download_dir)
+        return quality_analysis(base_url=args.subject_root,
+                                published=published,
+                                private=private,
+                                only_report_errors=only_report_errors,
+                                email=args.email,
+                                output_file=output_file)
+    except Exception as e:
+        raise Exception(f"Error analyzing the data model: {e}") from e
 
-        test_files = [
-            "test_file_exists",
-            "test_valid_json",
-            "test_yaml_files",
-            "test_schema_descriptions",
-            "test_schema_metadata",
-            "test_string_incorrect",
-            "test_valid_keyvalues_examples",
-            "test_valid_ngsiv2",
-            "test_valid_ngsild",
-            "test_duplicated_attributes",
-            "test_array_object_structure"
-        ]
 
-        test_results = run_tests(test_files, repo_path, only_report_errors, {
+def quality_analysis(base_url: str, email: str, only_report_errors: bool, published: bool = False,
+                     private: bool = False, output_file: str = None) -> dict | None:
+    """
+    Performs a quality analysis on a data model repository.
+
+    Downloads or copies the repository files, runs a series of tests, and saves the results to a JSON file.
+    The analysis can be customized to report only errors and to handle both published and private repositories.
+
+    Parameters:
+        base_url (str): The URL or local path of the repository.
+        email (str): The email address for reporting results.
+        only_report_errors (bool): Whether to report only errors.
+        published (bool, optional): Whether the model is published. Defaults to False.
+        private (bool, optional): Whether the model is private. Defaults to False.
+        output_file (str, optional): An additional output file path. Defaults to None.
+
+    Returns:
+        dict | None: A dictionary containing the analysis results, or None if an error occurred.
+
+    Raises:
+        ValueError: If the email address is missing or invalid, or if the GitHub URL is invalid.
+        Exception: If any error occurs during file download, copying, or test execution.
+    """
+    result = {
+        "success": False,
+        "error": None,
+        "test_results": None,
+        "metadata": {
+            "timestamp": datetime.now().isoformat()
+        }
+    }
+
+    # Validate the subject_root; if the input is a URL, convert it to a raw file base URL
+    if is_url(base_url):
+        raw_base_url = convert_github_url_to_raw(base_url)
+    else:
+        raw_base_url = base_url
+
+    config = load_config()
+    results_dir = config['results_dir']
+    download_dir = config['download_dir']
+
+    Path(results_dir).mkdir(parents=True, exist_ok=True)
+    Path(download_dir).mkdir(parents=True, exist_ok=True)
+
+    try:
+        # Download or copy the files
+        repo_path = download_files(raw_base_url, download_dir)
+
+        # List of test files to run
+        test_files = ["test_file_exists",
+                      "test_valid_json",
+                      "test_yaml_files",
+                      "test_schema_descriptions",
+                      "test_schema_metadata",
+                      "test_string_incorrect",
+                      "test_valid_keyvalues_examples",
+                      "test_valid_ngsiv2",
+                      "test_valid_ngsild",
+                      "test_duplicated_attributes",
+                      "test_array_object_structure"
+                      ]
+
+        # Create options dictionary
+        options = {
             "published": published,
             "private": private
-        })
+        }
+
+        # Run the tests with the options object
+        test_results = run_tests(test_files=test_files,
+                                 repo_to_test=repo_path,
+                                 only_report_errors=only_report_errors,
+                                 options=options)
 
-        test_results["email"] = args.email
-        result.update({
-            "success": True,
-            "test_results": test_results
-        })
+        # Add email to the results
+        test_results["email"] = email
 
-        email_name = args.email.replace("@", "_at_")
-        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
-        filename = f"{results_dir}/{timestamp}_{email_name}.json"
+        # Record the overall outcome and the test results
+        result |= {"success": True, "test_results": test_results}
+
+        # Save a file with the results
+        email_name = email.replace("@", "_at_")
+        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
+        filename = f"{results_dir}/test_results_{timestamp}_{email_name}.json"
 
         with open(filename, "w") as f:
-            json.dump(test_results, f, indent=4)
+            dump(test_results, f, indent=4)
 
-        if args.output:
-            with open(args.output, "w") as f:
-                json.dump(test_results, f, indent=4)
+        # Save an additional copy of the results if --output is provided
+        if output_file:
+            with open(output_file, "w") as f:
+                dump(test_results, f, indent=4)
 
     except Exception as e:
         result["error"] = str(e)
 
     finally:
-        if 'download_dir' in locals() and os.path.exists(download_dir):
-            shutil.rmtree(download_dir)
+        # Clean up the temporary directory
+        if 'download_dir' in locals() and exists(download_dir):
+            rmtree(download_dir)
 
     # Ensure we always print valid JSON
-    print(json.dumps(result, indent=2))
+    print(dumps(result, indent=2))
+
+    return result
 
 
 if __name__ == "__main__":
diff --git a/test_data_model/multiple_tests.py b/test_data_model/multiple_tests.py
index a60bc33c35..79d8ff07c9 100644
--- a/test_data_model/multiple_tests.py
+++ b/test_data_model/multiple_tests.py
@@ -16,11 +16,11 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import sys
-import subprocess
-import requests
+from sys import argv, exit
+from json import dump
+from requests import get
 from datetime import datetime
-import json
+from master_tests import quality_analysis
 
 
 def get_subdirectories(subject_root):
@@ -33,32 +33,65 @@
     Returns:
         list: List of subdirectory names.
     """
+    # Construct the GitHub API URL for the contents of the root directory
+    api_url = get_api_url(subject_root=subject_root)
+
     try:
-        # Extract the owner, repo, branch, and root directory from the subject_root
-        parts = subject_root.strip("/").split("/")
+        # Fetch the contents of the root directory
+        response = get(api_url)
+        if response.status_code != 200:
+            raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}")
+
+        contents = response.json()
+        return [item['name'] for item in contents if item['type'] == 'dir']
+    except Exception as e:
+        raise Exception(f"Error fetching subdirectories: {e}") from e
+
+
+def get_api_url(subject_root: str) -> str:
+    """
+    Construct the GitHub API URL to fetch the contents of a directory.
+
+    Constructs the URL based on the provided subject_root, which can point to either the master branch or a specific branch/commit.
+    The URL is used to retrieve directory contents from the GitHub API.
+
+    Parameters:
+        subject_root (str): The URL of the GitHub repository, including the root directory.
+
+    Returns:
+        str: The GitHub API URL.
+
+    Raises:
+        ValueError: If the subject_root URL is invalid.
+    """
+    # Extract the owner and repo name from the URL
+    parts = subject_root.strip("/").split("/")
+
+    owner = parts[3]  # e.g., "smart-data-models"
+    repo = parts[4]  # e.g., "incubated"
+
+    if 'tree' in parts:
         if len(parts) < 7:
             raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")
 
-        owner = parts[3]  # e.g., "smart-data-models"
-        repo = parts[4]  # e.g., "incubated"
         branch = parts[6]  # e.g., "d7b7b48f03b9b221d141e074e1d311985ab04f25"
         root_directory = "/".join(parts[7:])  # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
 
         # GitHub API URL to list contents of the root directory
         api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref={branch}"
+    else:
+        if len(parts) < 5:
+            raise ValueError("Invalid subject_root URL. It must include owner, repo, and root directory.")
 
-        # Fetch the contents of the root directory
-        response = requests.get(api_url)
-        if response.status_code == 200:
-            contents = response.json()
-            # Filter out only directories
-            subdirectories = [item['name'] for item in contents if item['type'] == 'dir']
-            return subdirectories
-        else:
-            raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}")
-    except Exception as e:
-        raise Exception(f"Error fetching subdirectories: {e}")
-def run_master_tests(subject_root, subdirectory, email, only_report_errors):
+        root_directory = "/".join(parts[5:])  # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
+
+        # GitHub API URL to list contents of the root directory
+        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref=master"
+
+    return api_url
+
+
+def run_master_tests(subject_root: str, subdirectory: str, email: str, only_report_errors: bool) -> dict:
     """
-    Run the master_tests.py script for a specific subdirectory.
+    Run the quality analysis for a specific subdirectory.
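# Editor's note - a quick sanity check of the get_api_url() helper added above,
# runnable once the patched multiple_tests.py is importable. The owner/repo
# names follow the docstring examples; they are illustrative, not mandated by
# the code.
from multiple_tests import get_api_url

# Branch-qualified subject_root ('tree' form): owner=parts[3], repo=parts[4],
# branch=parts[6], root directory=parts[7:].
print(get_api_url("https://github.com/smart-data-models/incubated/tree/master/SMARTMANUFACTURING"))
# -> https://api.github.com/repos/smart-data-models/incubated/contents/SMARTMANUFACTURING?ref=master

# Bare subject_root (no 'tree' segment): the root directory starts at parts[5]
# and the branch defaults to master.
print(get_api_url("https://github.com/smart-data-models/incubated/SMARTMANUFACTURING"))
# -> https://api.github.com/repos/smart-data-models/incubated/contents/SMARTMANUFACTURING?ref=master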
@@ -74,67 +107,52 @@
     try:
         # Construct the full URL to the subdirectory
         # Remove any trailing slashes and append the subdirectory
-        print("before directory")
-        print(subject_root)
         subject_root = subject_root.rstrip("/")
-        print(subdirectory)
         subdirectory_url = f"{subject_root}/{subdirectory}"
 
         print(f"Testing subdirectory: {subdirectory_url}")
 
-        # Run the master_tests.py script
-        result = subprocess.run(
-            [
-                "python3", "master_tests.py",
-                subdirectory_url,
-                email,
-                "1" if only_report_errors else "0"
-            ],
-            capture_output=True,
-            text=True
-        )
-
-        # Parse the output as JSON
-        return json.loads(result.stdout)
+        # Run the quality analysis directly
+        result = quality_analysis(base_url=subdirectory_url,
+                                  email=email,
+                                  only_report_errors=only_report_errors)
+
+        return result
     except Exception as e:
         print(f"Error running tests for {subdirectory}: {e}")
         return {"error": str(e)}
 
+
 def main():
-    if len(sys.argv) != 4:
+    """
+    Main function to execute tests on multiple subdirectories of a GitHub repository.
+
+    Retrieves the subdirectories from the specified GitHub repository URL and runs the quality analysis for each one.
+    The results are then saved to a JSON file.
+    """
+    if len(argv) != 4:
         print("Usage: python3 multiple_tests.py <subject_root> <email> <only_report_errors>")
-        sys.exit(1)
+        exit(1)
 
-    ### remove
-    print(sys.argv[1])
-    subject_root = sys.argv[1]
-    email = sys.argv[2]
-    only_report_errors = sys.argv[3].lower() == "true"
+    subject_root = argv[1]
+    email = argv[2]
+    only_report_errors = argv[3].lower() == "true"
 
     # Get the list of subdirectories
     subdirectories = get_subdirectories(subject_root)
 
-    results = []
-    print(subdirectories)
-    for subdirectory in subdirectories:
-        print(f"Running tests for {subdirectory}...")
-        test_result = run_master_tests(subject_root, subdirectory, email, only_report_errors)
-        ### remove
-        print(test_result)
-        # for item in test_result:
-        #     print(item)
-        #     item["datamodel"] = subdirectory
-        results.append({
-            "datamodel": subdirectory,
-            "result": test_result
-        })
+    # Run tests for each subdirectory and collect results
+    results = [{"datamodel": subdirectory,
+                "result": run_master_tests(subject_root, subdirectory, email, only_report_errors)}
+               for subdirectory in subdirectories]
 
     # Save the results to a JSON file
-    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
     output_filename = f"test_results_{timestamp}.json"
     with open(output_filename, "w") as f:
-        json.dump(results, f, indent=4)
+        dump(results, f, indent=4)
 
     print(f"Test results saved to {output_filename}")
 
+
 if __name__ == "__main__":
     main()
\ No newline at end of file
diff --git a/test_data_model/requirements.txt b/test_data_model/requirements.txt
new file mode 100644
index 0000000000..c52406d2e8
--- /dev/null
+++ b/test_data_model/requirements.txt
@@ -0,0 +1,6 @@
+# Python3.13 project
+requests==2.32.3
+pyyaml==6.0.2
+jsonpointer==3.0.0
+jsonschema==4.23.0
+jsonref==1.1.0
diff --git a/test_data_model/tests/test_array_object_structure.py b/test_data_model/tests/test_array_object_structure.py
index 26f3110b94..e0eb06ed9d 100644
--- a/test_data_model/tests/test_array_object_structure.py
+++ b/test_data_model/tests/test_array_object_structure.py
@@ -15,9 +15,9 @@
 # Author: Alberto Abella                                                        #
 #################################################################################
 # version 26/02/25 - 1
-import json
-import os
-import requests
+from json import load, JSONDecodeError
+from os.path import join
+from requests import get
 from urllib.parse import urljoin
 from jsonpointer import resolve_pointer
 
@@ -34,10 +34,7 @@
         dict: The resolved schema fragment.
     """
     try:
-        if "#" in ref:
-            url_part, pointer_part = ref.split("#", 1)
-        else:
-            url_part, pointer_part = ref, ""
+        url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "")
 
         if url_part.startswith("http"):
             # External reference (absolute URL)
@@ -48,28 +45,24 @@
         else:
             # Local reference within the same file
             # Use the base URI to determine the file name
-            if base_uri:
-                resolved_url = base_uri
-            else:
-                # Fallback to the primary schema file in the repo path
-                resolved_url = os.path.join(repo_path, "schema.json")
-
+            resolved_url = base_uri or join(repo_path, "schema.json")
+
         # Fetch the schema
         if resolved_url.startswith("http"):
-            response = requests.get(resolved_url)
+            response = get(resolved_url)
             if response.status_code != 200:
                 raise ValueError(f"Failed to fetch external schema from {resolved_url}")
             schema = response.json()
         else:
             with open(resolved_url, 'r') as file:
-                schema = json.load(file)
+                schema = load(file)
 
         # Resolve the JSON Pointer if it exists
         if pointer_part:
             try:
                 schema = resolve_pointer(schema, pointer_part)
             except Exception as e:
-                raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}")
+                raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}") from e
 
         # Recursively resolve any nested $refs in the resolved schema
         # Use the resolved URL as the base URI for nested $refs
@@ -77,7 +70,7 @@
 
         return schema
     except Exception as e:
-        raise ValueError(f"Error resolving reference {ref}: {e}")
+        raise ValueError(f"Error resolving reference {ref}: {e}") from e
 
 def resolve_nested_refs(schema, base_uri):
     """
@@ -86,16 +79,16 @@
     if isinstance(schema, dict):
         if "$ref" in schema:
             return resolve_ref("", schema["$ref"], base_uri)
-        else:
-            for key, value in schema.items():
-                schema[key] = resolve_nested_refs(value, base_uri)
+
+        for key, value in schema.items():
+            schema[key] = resolve_nested_refs(value, base_uri)
     elif isinstance(schema, list):
         for i, item in enumerate(schema):
             schema[i] = resolve_nested_refs(item, base_uri)
 
     return schema
 
-def validate_properties(repo_path, properties, base_uri, path="", success=True, output=[]):
+def validate_properties(repo_path, properties, base_uri, path="", success=True, output=None):
     """
     Recursively validate properties in the schema, ensuring that arrays have 'items' and objects have 'properties'.
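# Editor's note - why the validate_properties() signature above swaps the
# mutable default `output=[]` for `output=None`: a default list is created only
# once, at function definition time, so messages would leak between successive
# calls. A minimal, self-contained illustration (hypothetical names, same
# pattern as the guard added in the next hunk):
def collect(msg, out=[]):          # buggy: one shared list for every call
    out.append(msg)
    return out

def collect_fixed(msg, out=None):
    if out is None:                # fresh list per call, as the patch does below
        out = []
    out.append(msg)
    return out

print(collect("a"), collect("b"))              # ['a', 'b'] ['a', 'b'] - stale state
print(collect_fixed("a"), collect_fixed("b"))  # ['a'] ['b'] - independent results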
@@ -110,6 +103,9 @@
     Returns:
         tuple: (success: bool, output: list)
     """
+    if output is None:
+        output = []
+
     for key, value in properties.items():
         current_path = f"{path}.{key}" if path else key
 
@@ -135,9 +131,24 @@
 
         # Recursively check nested properties
         if "properties" in value and isinstance(value["properties"], dict):
-            success, output = validate_properties(repo_path, value["properties"], base_uri, current_path + ".", success, output)
+            success, output = validate_properties(
+                repo_path,
+                value["properties"],
+                base_uri,
+                f"{current_path}.",
+                success,
+                output,
+            )
+
         if "items" in value and isinstance(value["items"], dict):
-            success, output = validate_properties(repo_path, value["items"], base_uri, current_path + ".", success, output)
+            success, output = validate_properties(
+                repo_path,
+                value["items"],
+                base_uri,
+                f"{current_path}.",
+                success,
+                output,
+            )
 
     return success, output
 
@@ -159,7 +170,7 @@
 
     try:
         with open(f"{repo_path}/schema.json", 'r') as file:
-            schema = json.load(file)
+            schema = load(file)
 
         base_uri = schema.get("$id", "")  # Use $id as the base URI for resolving relative $refs
 
@@ -171,7 +182,7 @@
         elif "properties" in schema and isinstance(schema["properties"], dict):
             success, output = validate_properties(repo_path, schema["properties"], base_uri, "", success, output)
 
-    except json.JSONDecodeError:
+    except JSONDecodeError:
         success = False
         output.append("*** schema.json is not a valid JSON file")
     except FileNotFoundError:
diff --git a/test_data_model/tests/test_duplicated_attributes.py b/test_data_model/tests/test_duplicated_attributes.py
index 0d8f9c7635..b1db49321b 100644
--- a/test_data_model/tests/test_duplicated_attributes.py
+++ b/test_data_model/tests/test_duplicated_attributes.py
@@ -15,11 +15,11 @@
 # Author: Alberto Abella                                                        #
 #################################################################################
 # version 26/02/25 - 1
-import json
-import os
-import jsonref
-import urllib.request
-import urllib.parse
+from json import dumps, load
+from os.path import join, exists, dirname, abspath
+from jsonref import loads
+from urllib.request import pathname2url
+from urllib.parse import urljoin
 
 
 def extract_attributes_from_payload(payload, parent_path=""):
@@ -87,22 +87,22 @@
         success (bool): True if all attributes are defined, False otherwise.
         output (list): List of messages describing the results of the test.
     """
-    schema_file = os.path.join(repo_to_test, "schema.json")
-    payload_file = os.path.join(repo_to_test, "examples/example.json")
+    schema_file = join(repo_to_test, "schema.json")
+    payload_file = join(repo_to_test, "examples/example.json")
 
-    if not os.path.exists(schema_file):
+    if not exists(schema_file):
         return "Checking that all payload attributes are defined in the schema", False, ["Schema file not found."]
 
-    if not os.path.exists(payload_file):
+    if not exists(payload_file):
         return "Checking that all payload attributes are defined in the schema", False, ["Payload file not found."]
 
     # Normalize the base URI to ensure proper resolution of references
-    schema_dir = os.path.dirname(os.path.abspath(schema_file))
-    base_uri = urllib.parse.urljoin('file:', urllib.request.pathname2url(schema_dir))
+    schema_dir = dirname(abspath(schema_file))
+    base_uri = urljoin('file:', pathname2url(schema_dir))
 
     # Load the schema and fully resolve all $ref references using jsonref
     with open(schema_file, 'r') as f:
-        schema = jsonref.loads(
-            json.dumps(json.load(f)),
+        schema = loads(
+            dumps(load(f)),
             base_uri=base_uri,
             lazy_load=False,
             load_on_repr=True
@@ -110,7 +110,7 @@
 
     # Load the payload
     with open(payload_file, 'r') as f:
-        payload = json.load(f)
+        payload = load(f)
 
     output = []
 
@@ -124,17 +124,19 @@
 
     # Check for attributes in the payload that are not in the schema
-    undefined_attributes = []
-    for attribute in payload_attributes:
-        if attribute not in schema_attributes:
-            undefined_attributes.append(attribute)
+    undefined_attributes = [
+        attribute
+        for attribute in payload_attributes
+        if attribute not in schema_attributes
+    ]
 
     if undefined_attributes:
         output.append("The following attributes in the payload are not defined in the schema:")
-        for attribute in sorted(undefined_attributes):
-            output.append(f"*** Attribute '{attribute}' in the payload is not defined in the schema.")
-
+        output.extend(
+            f"*** Attribute '{attribute}' in the payload is not defined in the schema."
+            for attribute in sorted(undefined_attributes)
+        )
 
     # Determine if the test was successful
-    success = len(undefined_attributes) == 0
+    success = not undefined_attributes
 
     test_name = "Checking that all payload attributes are defined in the schema"
     return test_name, success, output
\ No newline at end of file
diff --git a/test_data_model/tests/test_file_exists.py b/test_data_model/tests/test_file_exists.py
index 4b4d2d2413..2c3a4da2ee 100644
--- a/test_data_model/tests/test_file_exists.py
+++ b/test_data_model/tests/test_file_exists.py
@@ -15,7 +15,7 @@
 # Author: Alberto Abella                                                        #
 #################################################################################
 # version 26/02/25 - 1
-import os
+from os.path import join, exists
 
 def test_file_exists(repo_path, options):
     """
@@ -52,8 +52,8 @@
 
     # Check if each mandatory file exists
     for file in mandatory_files:
-        path_to_file = os.path.join(repo_path, file)
-        exist_file = os.path.exists(path_to_file)
+        path_to_file = join(repo_path, file)
+        exist_file = exists(path_to_file)
         success = success and exist_file
 
         if exist_file:
diff --git a/test_data_model/tests/test_schema_descriptions.py b/test_data_model/tests/test_schema_descriptions.py
index c47f549322..9aad63da3d 100644
--- a/test_data_model/tests/test_schema_descriptions.py
+++ b/test_data_model/tests/test_schema_descriptions.py
@@ -15,25 +15,32 @@
 # Author: Alberto Abella                                                        #
 #################################################################################
 # version 28/02/25 - 1
-import json
-import os
-import requests
+from json import load
+from os.path import join, exists
+from requests import get
 from urllib.parse import urljoin
 from jsonpointer import resolve_pointer
+from itertools import product
 
 
 def validate_description(description):
     """
-    Validate that the description follows the required format.
-    - The description must include a mandatory NGSI type (Property, GeoProperty, or Relationship).
-    - The NGSI type must not contain extra spaces.
-    - Optional elements (Model, Units, Enum, Privacy, Multilingual) must follow the format Key:'value'.
-    - The description must be at least 15 characters long.
-    """
+    Validate the description of a schema property.
+
+    - The description must include a mandatory NGSI type (Property, GeoProperty, or Relationship).
+    - The NGSI type must not contain extra spaces.
+    - Optional elements (Model, Units, Enum, Privacy, Multilingual) must follow the format Key:'value'.
+    - The description must be at least 15 characters long.
+
+    Parameters:
+        description (str): The description string to validate.
+
+    Returns:
+        tuple: A tuple containing a boolean indicating whether the description is valid and a message explaining the validation result.
+    """
     if len(description) < 15:
         return False, "*** Description must be at least 15 characters long."
 
-    parts = [part for part in description.split(". ")]
+    parts = description.split(". ")
 
     valid_ngsi_types = ["Property", "GeoProperty", "Relationship", "LanguageProperty", "ListProperty"]
     ngsi_type_found = None
@@ -43,44 +50,56 @@
             break
 
     if not ngsi_type_found:
-        for part in parts:
-            for ngsi_type in valid_ngsi_types:
-                if ngsi_type in part and part != ngsi_type:
-                    return False, f"NGSI type '{part}' contains extra characters."
-        return False, "*** NGSI type is not described. Must be one of: Property, GeoProperty, Relationship, LanguageProperty, ListProperty"
-
+        return next(
+            (
+                (False, f"NGSI type '{part}' contains extra characters.")
+                for part, ngsi_type in product(parts, valid_ngsi_types)
+                if ngsi_type in part and part != ngsi_type
+            ),
+            (
+                False,
+                "*** NGSI type is not described. Must be one of: Property, GeoProperty, Relationship, LanguageProperty, ListProperty",
+            ),
+        )
+
     if ngsi_type_found.strip() != ngsi_type_found:
         return False, f"*** NGSI type '{ngsi_type_found}' contains extra spaces."
 
     optional_keys = ["Model:", "Units:", "Enum:", "Privacy:", "Multilingual"]
-    for part in parts:
-        for key in optional_keys:
-            if part.startswith(key):
-                if not part[len(key):].startswith("'"):
-                    return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'."
-                if not part.endswith("'"):
-                    return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'."
+    for part, key in product(parts, optional_keys):
+        if part.startswith(key):
+            if not part[len(key):].startswith("'"):
+                return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'."
+            if not part.endswith("'"):
+                return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'."
 
     return True, "Description is valid."
 
 
 def resolve_ref(ref, base_uri):
     """
-    Resolve a $ref to its external schema and return the referenced schema.
-    Handles both remote URLs and JSON Pointers, and recursively resolves nested $refs.
-    JSON Pointers (starting with #) are resolved relative to the schema being referenced.
+    Resolve a JSON Schema $ref to its corresponding schema and return the referenced schema.
+
+    - Handles both local and remote references, resolving JSON Pointers if present.
+    - Recursively resolves nested $refs within the resolved schema.
+
+    Parameters:
+        ref (str): The JSON Schema $ref string.
+        base_uri (str): The base URI to resolve relative references against.
+
+    Returns:
+        dict: The resolved schema.
+
+    Raises:
+        ValueError: If the reference cannot be resolved or if an error occurs during resolution.
     """
-    if "#" in ref:
-        url_part, pointer_part = ref.split("#", 1)
-    else:
-        url_part, pointer_part = ref, ""
+    url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "")
 
     if url_part.startswith("http"):
         resolved_url = url_part
     else:
         resolved_url = urljoin(base_uri, url_part)
 
-    response = requests.get(resolved_url)
+    response = get(resolved_url)
     if response.status_code != 200:
         raise ValueError(f"*** Failed to fetch external schema from {resolved_url}")
 
@@ -91,7 +110,9 @@
             # Resolve the JSON Pointer relative to the fetched schema
             schema = resolve_pointer(schema, pointer_part)
         except Exception as e:
-            raise ValueError(f"*** Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}")
+            raise ValueError(
+                f"*** Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}"
+            ) from e
 
     # Recursively resolve any nested $refs in the resolved schema
     schema = resolve_nested_refs(schema, resolved_url if url_part else base_uri)
@@ -101,14 +122,24 @@
 
 def resolve_nested_refs(schema, base_uri):
     """
-    Recursively resolve any nested $refs in the schema.
+    Recursively resolve nested JSON Schema $refs within a schema.
+
+    Traverses the schema object and resolves any $ref properties to their corresponding schemas.
+    Handles both dictionaries and lists.
+
+    Parameters:
+        schema (dict or list): The schema object to resolve references within.
+        base_uri (str): The base URI to resolve relative references against.
+
+    Returns:
+        dict or list: The schema with all nested $refs resolved.
     """
     if isinstance(schema, dict):
         if "$ref" in schema:
             return resolve_ref(schema["$ref"], base_uri)
-        else:
-            for key, value in schema.items():
-                schema[key] = resolve_nested_refs(value, base_uri)
+
+        for key, value in schema.items():
+            schema[key] = resolve_nested_refs(value, base_uri)
     elif isinstance(schema, list):
         for i, item in enumerate(schema):
             schema[i] = resolve_nested_refs(item, base_uri)
@@ -118,8 +149,19 @@
 
 def check_property_descriptions(properties, base_uri, output, path="", processed_refs=None):
     """
-    Recursively check descriptions for all properties, including nested ones and arrays.
-    Keeps track of processed references to avoid duplicate processing.
+    Recursively checks descriptions for all properties in a schema.
+
+    This function traverses the properties of a schema, including nested properties within objects and arrays,
+    and validates their descriptions against predefined criteria. It handles $ref references, resolving them
+    to check descriptions in external or referenced schemas. It also checks for descriptions in array items
+    and anyOf properties.
+
+    Parameters:
+        properties (dict): The properties object from the schema.
+        base_uri (str): The base URI for resolving $ref references.
+        output (list): A list to store the output messages.
+        path (str, optional): The current path within the schema being checked. Defaults to "".
+        processed_refs (set, optional): A set to keep track of processed $refs to avoid infinite recursion. Defaults to None.
     """
     if processed_refs is None:
         processed_refs = set()
@@ -141,8 +183,12 @@
             try:
                 ref_schema = resolve_ref(ref, base_uri)
                 if "properties" in ref_schema:
-                    check_property_descriptions(ref_schema["properties"], base_uri, output, current_path,
-                                                processed_refs)
+                    check_property_descriptions(properties=ref_schema["properties"],
+                                                base_uri=base_uri,
+                                                output=output,
+                                                path=current_path,
+                                                processed_refs=processed_refs)
+
                 if "description" in ref_schema:
                     description = ref_schema["description"]
                     is_valid, message = validate_description(description)
@@ -154,6 +200,7 @@
                     output.append(f"*** The attribute '{current_path}' is missing a description.")
             except ValueError as e:
                 output.append(f"*** Error resolving $ref for property '{current_path}': {e}")
+
             continue
 
         # Check description for the current property
@@ -248,21 +295,28 @@
             else:
                 output.append(f"The attribute '{current_path}.items' is properly documented.")
 
-
 def test_schema_descriptions(repo_to_test, options):
     """
-    Test that all elements in the schema.json file include a description and that the description is valid.
+    Test the descriptions in a JSON Schema.
+
+    This test checks if a schema.json file exists and validates the descriptions of all properties within the schema,
+    including nested properties and properties referenced via $ref. It ensures that descriptions meet certain criteria,
+    such as minimum length and the inclusion of specific elements (e.g., NGSI type, Model, Units).
+
+    Parameters:
+        repo_to_test (str): Path to the repository being tested.
+        options (dict): Additional options for the test (e.g., published, private).
 
     Returns:
-    test_name (str): Name of the test.
-    success (bool): True if all descriptions are valid, False otherwise.
-    output (list): List of messages describing the results of the test.
+        tuple: A tuple containing the test name, a boolean indicating success or failure, and a list of output messages.
     """
-    schema_file = os.path.join(repo_to_test, "schema.json")
-    if not os.path.exists(schema_file):
+    schema_file = join(repo_to_test, "schema.json")
+    if not exists(schema_file):
         return "Checking that the schema is properly described in all its attributes", False, ["Schema file not found."]
 
     with open(schema_file, 'r') as f:
-        schema = json.load(f)
+        schema = load(f)
 
     output = []
     base_uri = schema.get("$id", "")
diff --git a/test_data_model/tests/test_schema_metadata.py b/test_data_model/tests/test_schema_metadata.py
index 255c1aa8d2..72214c8a10 100644
--- a/test_data_model/tests/test_schema_metadata.py
+++ b/test_data_model/tests/test_schema_metadata.py
@@ -16,9 +16,9 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
-import re
-import requests
+from json import load, JSONDecodeError
+from re import compile as re_compile
+from requests import get, RequestException
 
 def test_schema_metadata(repo_path, options):
     """
@@ -35,7 +35,8 @@
     - it has a license (even if it is empty) just a warning
 
     Parameters:
-        file_path (str): The path to the schema.json file.
+        repo_path (str): The path to the directory containing the schema.json file.
+        options (dict): Additional options for the test (e.g., published, private).
 
     Returns:
         tuple: (success: bool, message: str)
@@ -55,7 +56,7 @@
 
     try:
         with open(f"{repo_path}/schema.json", 'r') as file:
-            schema = json.load(file)
+            schema = load(file)
 
         # Check for $schema and validate its value
         if "$schema" not in schema:
@@ -64,7 +65,8 @@
         else:
             if schema["$schema"] != "https://json-schema.org/draft/2020-12/schema":
                 success = False
-                output.append(f"*** $schema is not pointing to https://json-schema.org/draft/2020-12/schema (found: {schema['$schema']})")
+                output.append(f"*** $schema is not pointing to https://json-schema.org/draft/2020-12/schema "
+                              f"(found: {schema['$schema']})")
             else:
                 output.append("$schema is valid")
 
@@ -83,7 +85,7 @@
             success = False
             output.append("*** $schemaVersion is missing")
         else:
-            version_pattern = re.compile(r"^\d{1,2}\.\d{1,2}\.\d{1,2}$")
+            version_pattern = re_compile(r"^\d{1,2}\.\d{1,2}\.\d{1,2}$")
             if not version_pattern.match(schema["$schemaVersion"]):
                 success = False
                 output.append(f"*** $schemaVersion is not in the correct format (XX.XX.XX) (found: {schema['$schemaVersion']})")
@@ -94,23 +96,21 @@
         if "title" not in schema:
             success = False
             output.append("*** title is missing")
+        elif len(schema["title"]) < minTitleLength:
+            success = False
+            output.append(f"*** title is too short (minimum {minTitleLength} characters) (found: {len(schema['title'])} characters)")
         else:
-            if len(schema["title"]) < minTitleLength:
-                success = False
-                output.append(f"*** title is too short (minimum {minTitleLength} characters) (found: {len(schema['title'])} characters)")
-            else:
-                output.append("title is valid")
+            output.append("title is valid")
 
         # Check for description and ensure it is at least 50 characters long
         if "description" not in schema:
             success = False
             output.append("*** description is missing")
+        elif len(schema["description"]) < minDescriptionLength:
+            success = False
+            output.append(f"*** description is too short (minimum {minDescriptionLength} characters) (found: {len(schema['description'])} characters)")
         else:
-            if len(schema["description"]) < minDescriptionLength:
-                success = False
-                output.append(f"*** description is too short (minimum {minDescriptionLength} characters) (found: {len(schema['description'])} characters)")
-            else:
-                output.append("description is valid")
+            output.append("description is valid")
 
         # Check for $id and validate that it points to a real site
         if "$id" not in schema:
@@ -118,7 +118,7 @@
             output.append("*** $id is missing")
         else:
             try:
-                response = requests.get(schema["$id"])
+                response = get(schema["$id"])
                 if response.status_code != 200:
                     if unpublished:
                         success = True
@@ -129,7 +129,7 @@
                         output.append(f"*** $id does not point to a valid site (status code: {response.status_code})")
                 else:
                     output.append("$id is valid and points to a real site")
-            except requests.RequestException as e:
+            except RequestException as e:
                 success = False
                 output.append(f"*** $id is not reachable: {e}")
 
@@ -171,7 +171,7 @@
             else:
                 output.append("license is present and not empty")
 
-    except json.JSONDecodeError:
+    except JSONDecodeError:
         success = False
         output.append("*** schema.json is not a valid JSON file")
     except FileNotFoundError:
diff --git a/test_data_model/tests/test_string_incorrect.py b/test_data_model/tests/test_string_incorrect.py
index b2dd57831d..ed80ebb545 100644
--- a/test_data_model/tests/test_string_incorrect.py
+++ b/test_data_model/tests/test_string_incorrect.py
@@ -16,7 +16,7 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
+from json import load, JSONDecodeError
 
 def test_string_incorrect(repo_path, options):
     """
@@ -40,7 +40,7 @@
 
     try:
         with open(f"{repo_path}/schema.json", 'r') as file:
-            schema = json.load(file)
+            schema = load(file)
 
         def validate_properties(properties, path=""):
             nonlocal success
@@ -58,7 +58,7 @@
         if "properties" in schema and isinstance(schema["properties"], dict):
             validate_properties(schema["properties"])
 
-    except json.JSONDecodeError:
+    except JSONDecodeError:
         success = False
         output.append("*** schema.json is not a valid JSON file")
     except FileNotFoundError:
diff --git a/test_data_model/tests/test_valid_json.py b/test_data_model/tests/test_valid_json.py
index 78dc790f84..481bb2d113 100644
--- a/test_data_model/tests/test_valid_json.py
+++ b/test_data_model/tests/test_valid_json.py
@@ -16,7 +16,7 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
+from json import load, JSONDecodeError
 
 def test_valid_json(file_path, options):
     """
@@ -29,29 +29,25 @@
         tuple: (success: bool, message: str)
     """
     test_name = "Checking that the mandatory json files are valid json files"
-    mandatory_json_files = ["schema.json", "examples/example.json", "examples/example-normalized.json", "examples/example.jsonld", "examples/example-normalized.jsonld"]
+    mandatory_json_files = ["schema.json",
+                            "examples/example.json",
+                            "examples/example-normalized.json",
+                            "examples/example.jsonld",
+                            "examples/example-normalized.jsonld"
+                            ]
 
     success = True
     output = []
 
-    # Example usage of the options parameter (optional, for future flexibility)
-#    if options.get("published", False):
-#        unpublished = True
-#    if options.get("private", False):
-#        output.append("This is a private model.")
-
-
-    for file in mandatory_json_files:
-        try:
-            local_path = file_path + "/" + file
+    for file in mandatory_json_files:
+        try:
+            local_path = f"{file_path}/{file}"
             # print(f"The local path to the file is {local_path}")
             with open(local_path, 'r') as local_file:
-                json.load(local_file)
+                load(local_file)
                 success = success and True
                 output.append(f"file {file} is a valid json")
-        except json.JSONDecodeError as e:
+        except JSONDecodeError as e:
             success = success and False
             output.append(f"*** file {file} is NOT a valid json")
diff --git a/test_data_model/tests/test_valid_keyvalues_examples.py b/test_data_model/tests/test_valid_keyvalues_examples.py
index 948fadabfa..646e8551d1 100644
--- a/test_data_model/tests/test_valid_keyvalues_examples.py
+++ b/test_data_model/tests/test_valid_keyvalues_examples.py
@@ -16,9 +16,9 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
-import os
-import requests
+from json import load
+from os.path import join, exists
+from requests import get
 from jsonschema import validate, ValidationError
 
 def validate_json_against_schema(json_data, schema):
@@ -56,7 +56,7 @@
     if isinstance(context, str):
         # Single URL case
         try:
-            response = requests.get(context)
+            response = get(context)
             if response.status_code == 200:
                 return True, f"The @context URL '{context}' is valid."
             else:
@@ -68,7 +68,7 @@
         warnings = []
         for url in context:
             try:
-                response = requests.get(url)
+                response = get(url)
                 if response.status_code != 200:
                     warnings.append(f"*** The @context URL '{url}' does not return a valid response (HTTP {response.status_code}).")
             except Exception as e:
@@ -96,9 +96,9 @@
         output (list): List of messages describing the results of the test.
     """
     # Paths to the files
-    schema_file = os.path.join(repo_to_test, "schema.json")
-    example_json_file = os.path.join(repo_to_test, "examples", "example.json")
-    example_jsonld_file = os.path.join(repo_to_test, "examples", "example.jsonld")
+    schema_file = join(repo_to_test, "schema.json")
+    example_json_file = join(repo_to_test, "examples", "example.json")
+    example_jsonld_file = join(repo_to_test, "examples", "example.jsonld")
 
     output = []
     success = True
@@ -111,17 +111,17 @@
 
     # Check if the schema file exists
-    if not os.path.exists(schema_file):
+    if not exists(schema_file):
         return "Checking that example files are valid against the schema", False, ["Schema file not found."]
 
     # Load the schema
     with open(schema_file, 'r') as f:
-        schema = json.load(f)
+        schema = load(f)
 
     # Validate example.json
-    if os.path.exists(example_json_file):
+    if exists(example_json_file):
         with open(example_json_file, 'r') as f:
-            example_json = json.load(f)
+            example_json = load(f)
         is_valid, message = validate_json_against_schema(example_json, schema)
         output.append(f"example.json: {message}")
         if not is_valid:
@@ -131,9 +131,9 @@
         success = False
 
     # Validate example.jsonld
-    if os.path.exists(example_jsonld_file):
+    if exists(example_jsonld_file):
         with open(example_jsonld_file, 'r') as f:
-            example_jsonld = json.load(f)
+            example_jsonld = load(f)
         is_valid, message = validate_json_against_schema(example_jsonld, schema)
         output.append(f"example.jsonld: {message}")
         if not is_valid:
@@ -143,10 +143,7 @@
         if "@context" in example_jsonld:
             context = example_jsonld["@context"]
             is_context_valid, context_message = check_context_url(context)
-            if not is_context_valid:
-                output.append(context_message)  # Warning message
-            else:
-                output.append(context_message)
+            output.append(context_message)  # May be a success message or a warning
         else:
             output.append("*** example.jsonld is missing the mandatory '@context' attribute.")
             success = False
diff --git a/test_data_model/tests/test_valid_ngsild.py b/test_data_model/tests/test_valid_ngsild.py
index f536979e21..10b7a56143 100644
--- a/test_data_model/tests/test_valid_ngsild.py
+++ b/test_data_model/tests/test_valid_ngsild.py
@@ -16,8 +16,8 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
-import requests
+from json import load, JSONDecodeError
+from requests import get
 
 
 def check_context_url(context):
@@ -36,7 +36,7 @@
     if isinstance(context, str):
         # Single URL case
         try:
-            response = requests.get(context)
+            response = get(context)
             if response.status_code == 200:
                 return True, f"The @context URL '{context}' is valid."
             else:
@@ -48,7 +48,7 @@
         warnings = []
         for url in context:
             try:
-                response = requests.get(url)
+                response = get(url)
                 if response.status_code != 200:
                     warnings.append(
                         f"*** The @context URL '{url}' does not return a valid response (HTTP {response.status_code}).")
@@ -83,7 +83,7 @@
     try:
         # Load the example-normalized.jsonld file
         with open(f"{repo_path}/examples/example-normalized.jsonld", 'r') as file:
-            entity = json.load(file)
+            entity = load(file)
 
         # Validate that the root element is a single entity (a dictionary)
         if not isinstance(entity, dict):
@@ -152,7 +152,7 @@
 
                     output.append(f"*** Property '{key}' is missing the 'value' field")
 
-    except json.JSONDecodeError:
+    except JSONDecodeError:
         success = False
         output.append("*** example-normalized.jsonld is not a valid JSON file")
     except FileNotFoundError:
diff --git a/test_data_model/tests/test_valid_ngsiv2.py b/test_data_model/tests/test_valid_ngsiv2.py
index bd5159e6d0..338a22590c 100644
--- a/test_data_model/tests/test_valid_ngsiv2.py
+++ b/test_data_model/tests/test_valid_ngsiv2.py
@@ -16,7 +16,7 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import json
+from json import load, JSONDecodeError
 
 def validate_entity(entity):
     """
@@ -77,7 +77,7 @@
     try:
         # Load the example-normalized.json file
         with open(f"{repo_path}/examples/example-normalized.json", 'r') as file:
-            data = json.load(file)
+            data = load(file)
 
         success, output = validate_entity(data)
 
@@ -97,7 +97,7 @@
             if "value" not in data[entity]:
                 success = False
                 output.append(f"*** {entity} has no value")
-    except json.JSONDecodeError:
+    except JSONDecodeError:
         success = False
         output.append("*** example-normalized.json is not a valid JSON file")
     except FileNotFoundError:
diff --git a/test_data_model/tests/test_yaml_files.py b/test_data_model/tests/test_yaml_files.py
index d93279876b..290cd1060b 100644
--- a/test_data_model/tests/test_yaml_files.py
+++ b/test_data_model/tests/test_yaml_files.py
@@ -16,8 +16,8 @@
 #################################################################################
 # version 26/02/25 - 1
 
-import os
-import yaml
+from os.path import join, exists, basename
+from yaml import safe_load, YAMLError
 
 def validate_yaml_file(file_path):
     """
@@ -33,17 +33,17 @@
     """
     try:
         with open(file_path, 'r') as file:
-            yaml.safe_load(file)
+            safe_load(file)
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return True, f"The file '{file_name}' is a valid YAML file."
-    except yaml.YAMLError as e:
+    except YAMLError as e:
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return False, f"*** The file '{file_name}' is not a valid YAML file: {e}"
     except Exception as e:
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return False, f"*** An error occurred while reading '{file_name}': {e}"
 
 def test_yaml_files(repo_to_test, options):
@@ -72,8 +72,8 @@
 
     for yaml_file in yaml_files:
-        file_path = os.path.join(repo_to_test, yaml_file)
-        if not os.path.exists(file_path):
+        file_path = join(repo_to_test, yaml_file)
+        if not exists(file_path):
             output.append(f"*** The file '{yaml_file}' does not exist.")
             success = False
         else:
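# Editor's note - an end-to-end usage sketch for the patched harness. The
# directories and the data model URLs are placeholders, not values required by
# the code; config.json now ships with empty values, so each user must fill in
# writable paths first, for example:
#
#   {"results_dir": "/tmp/sdm_results", "download_dir": "/tmp/sdm_download", ...}
#
# Single data model, via the CLI defined in master_tests.py:
#
#   python3 master_tests.py \
#       https://github.com/smart-data-models/dataModel.Weather/blob/master/WeatherObserved \
#       user@example.org true --published false
#
# or programmatically through quality_analysis(), which multiple_tests.py now
# also calls directly instead of spawning a subprocess:
from master_tests import quality_analysis

result = quality_analysis(base_url="https://github.com/smart-data-models/dataModel.Weather/blob/master/WeatherObserved",
                          email="user@example.org",
                          only_report_errors=True)
print(result["success"], result["metadata"]["timestamp"])

# Every data model under a subject root (GitHub tree URL):
#
#   python3 multiple_tests.py \
#       https://github.com/smart-data-models/incubated/tree/master/SMARTMANUFACTURING \
#       user@example.org true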