Refactoring of the code #72

Open · wants to merge 4 commits into base: master

6 changes: 3 additions & 3 deletions test_data_model/config.json
@@ -1,6 +1,6 @@
{
"results_dir": "/home/aabella/PycharmProjects/data-models/test_data_model/results",
"results_dir": "<result directory>",
"results_dir_help": "This directory will store the results of the tests either one or multiple. It has to be writable by the script",
"download_dir": "/home/aabella/transparentia/CLIENTES/EU/FIWARE/GITHUB/repo_to_test",
"download_dir": "<temporal directory>",
"download_dir_help": "this directory is use for temporal download of files and removed once finished. Don't point to any directory with valuable content"
}
}
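For orientation, a minimal sketch of how a runner script could load and sanity-check this configuration; the load_config helper below is illustrative and not part of this PR:

import json
import os

def load_config(path="test_data_model/config.json"):
    """Load the test configuration and check that both directories are usable."""
    with open(path) as f:
        cfg = json.load(f)
    for key in ("results_dir", "download_dir"):
        # Both directories must exist and be writable, per the *_help strings above.
        if not os.path.isdir(cfg[key]) or not os.access(cfg[key], os.W_OK):
            raise ValueError(f"{key} ({cfg[key]}) must be an existing writable directory")
    return cfg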
366 changes: 280 additions & 86 deletions test_data_model/master_tests.py

Large diffs are not rendered by default.

136 changes: 78 additions & 58 deletions test_data_model/multiple_tests.py
@@ -16,11 +16,11 @@
#################################################################################
# version 26/02/25 - 1

import sys
import subprocess
import requests
from sys import argv
from json import dump
from requests import get
from datetime import datetime
import json
from master_tests import quality_analysis


def get_subdirectories(subject_root):
@@ -33,32 +33,65 @@ def get_subdirectories(subject_root):
Returns:
list: List of subdirectory names.
"""
# Extract the owner and repo name from the URL
api_url = get_api_url(subject_root=subject_root)

try:
# Extract the owner, repo, branch, and root directory from the subject_root
parts = subject_root.strip("/").split("/")
# Fetch the contents of the root directory
response = get(api_url)
if response.status_code != 200:
raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}")

contents = response.json()
return [item['name'] for item in contents if item['type'] == 'dir']
except Exception as e:
raise Exception(f"Error fetching subdirectories: {e}") from e


def get_api_url(subject_root: str) -> str:
"""
Construct the GitHub API URL to fetch the contents of a directory.

Constructs the URL based on the provided subject_root, which can point to either the master branch or a specific branch/commit.
The URL is used to retrieve directory contents from the GitHub API.

Parameters:
subject_root (str): The URL of the GitHub repository, including the root directory.

Returns:
str: The GitHub API URL.

Raises:
ValueError: If the subject_root URL is invalid.
"""
# Extract the owner and repo name from the URL
parts = subject_root.strip("/").split("/")

owner = parts[3] # e.g., "smart-data-models"
repo = parts[4] # e.g., "incubated"

if 'tree' in parts:
if len(parts) < 7:
raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")

owner = parts[3] # e.g., "smart-data-models"
repo = parts[4] # e.g., "incubated"
branch = parts[6] # e.g., "d7b7b48f03b9b221d141e074e1d311985ab04f25"
root_directory = "/".join(parts[7:]) # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"

# GitHub API URL to list contents of the root directory
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref={branch}"
else:
if len(parts) < 5:
raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")

# Fetch the contents of the root directory
response = requests.get(api_url)
if response.status_code == 200:
contents = response.json()
# Filter out only directories
subdirectories = [item['name'] for item in contents if item['type'] == 'dir']
return subdirectories
else:
raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}")
except Exception as e:
raise Exception(f"Error fetching subdirectories: {e}")
def run_master_tests(subject_root, subdirectory, email, only_report_errors):
root_directory = "/".join(parts[5:]) # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"

# GitHub API URL to list contents of the root directory
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref=master"

return api_url


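As a worked example of the mapping implemented by get_api_url (the repository URL below is illustrative):

# With a branch/commit segment ('tree' in the URL):
get_api_url("https://github.com/smart-data-models/incubated/tree/master/SMARTMANUFACTURING")
# -> "https://api.github.com/repos/smart-data-models/incubated/contents/SMARTMANUFACTURING?ref=master"

# Without one, the branch defaults to master:
get_api_url("https://github.com/smart-data-models/incubated/SMARTMANUFACTURING")
# -> "https://api.github.com/repos/smart-data-models/incubated/contents/SMARTMANUFACTURING?ref=master"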
def run_master_tests(subject_root: str, subdirectory: str, email: str, only_report_errors: bool) -> dict:
"""
Run the quality analysis from master_tests.py for a specific subdirectory.

@@ -74,67 +74,107 @@ def run_master_tests(subject_root, subdirectory, email, only_report_errors):
try:
# Construct the full URL to the subdirectory
# Remove any trailing slashes and append the subdirectory
print("before directory")
print(subject_root)
subject_root = subject_root.rstrip("/")
print(subdirectory)
subdirectory_url = f"{subject_root}/{subdirectory}"
print(f"Testing subdirectory: {subdirectory_url}")

# Run the master_tests.py script
result = subprocess.run(
[
"python3", "master_tests.py",
subdirectory_url,
email,
"1" if only_report_errors else "0"
],
capture_output=True,
text=True
)

# Parse the output as JSON
return json.loads(result.stdout)
result = quality_analysis(base_url=subdirectory_url,
email=email,
only_report_errors=only_report_errors)

return result
except Exception as e:
print(f"Error running tests for {subdirectory}: {e}")
return {"error": str(e)}


def main():
if len(sys.argv) != 4:
"""
Main function to execute tests on multiple subdirectories of a GitHub repository.

Retrieves the subdirectories from the specified GitHub repository URL and runs quality analysis for each subdirectory.
The results are then saved to a JSON file.
"""
if len(argv) != 4:
print("Usage: python3 multiple_tests.py <subject_root> <email> <only_report_errors>")
sys.exit(1)
exit(1)

### remove
print(sys.argv[1])
subject_root = sys.argv[1]
email = sys.argv[2]
only_report_errors = sys.argv[3].lower() == "true"
subject_root = argv[1]
email = argv[2]
only_report_errors = argv[3].lower() == "true"

# Get the list of subdirectories
subdirectories = get_subdirectories(subject_root)

# Run tests for each subdirectory and collect results
results = []
print(subdirectories)
for subdirectory in subdirectories:
print(f"Running tests for {subdirectory}...")
test_result = run_master_tests(subject_root, subdirectory, email, only_report_errors)
### remove
print(test_result)
# for item in test_result:
# print(item)
# item["datamodel"] = subdirectory
results.append({
"datamodel": subdirectory,
"result": test_result
})
results = \
[{"datamodel": subdirectory,
"result": run_master_tests(subject_root, subdirectory, email, only_report_errors)}
for subdirectory in subdirectories]

# Save the results to a JSON file
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"test_results_{timestamp}.json"
with open(output_filename, "w") as f:
json.dump(results, f, indent=4)
dump(results, f, indent=4)

print(f"Test results saved to {output_filename}")


if __name__ == "__main__":
main()
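For reference, a typical invocation of the refactored runner (the repository URL and email below are illustrative):

python3 multiple_tests.py "https://github.com/smart-data-models/incubated/tree/master/SMARTMANUFACTURING" user@example.org true

This writes test_results_<timestamp>.json with one {"datamodel": ..., "result": ...} entry per subdirectory.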
6 changes: 6 additions & 0 deletions test_data_model/requirements.txt
@@ -0,0 +1,6 @@
# Python3.13 project
requests==2.32.3
pyyaml==6.0.2
jsonpointer==3.0.0
jsonschema==4.23.0
jsonref==1.1.0
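The pinned dependencies can be installed in the usual way:

pip install -r test_data_model/requirements.txt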
61 changes: 36 additions & 25 deletions test_data_model/tests/test_array_object_structure.py
@@ -15,9 +15,9 @@
# Author: Alberto Abella #
#################################################################################
# version 26/02/25 - 1
import json
import os
import requests
from json import load, JSONDecodeError
from os.path import join
from requests import get
from urllib.parse import urljoin
from jsonpointer import resolve_pointer

@@ -34,10 +34,7 @@ def resolve_ref(repo_path, ref, base_uri=""):
dict: The resolved schema fragment.
"""
try:
if "#" in ref:
url_part, pointer_part = ref.split("#", 1)
else:
url_part, pointer_part = ref, ""
url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "")

if url_part.startswith("http"):
# External reference (absolute URL)
@@ -48,36 +48,32 @@
else:
# Local reference within the same file
# Use the base URI to determine the file name
if base_uri:
resolved_url = base_uri
else:
# Fallback to the primary schema file in the repo path
resolved_url = os.path.join(repo_path, "schema.json")

resolved_url = base_uri or join(repo_path, "schema.json")

# Fetch the schema
if resolved_url.startswith("http"):
response = requests.get(resolved_url)
response = get(resolved_url)
if response.status_code != 200:
raise ValueError(f"Failed to fetch external schema from {resolved_url}")
schema = response.json()
else:
with open(resolved_url, 'r') as file:
schema = json.load(file)
schema = load(file)

# Resolve the JSON Pointer if it exists
if pointer_part:
try:
schema = resolve_pointer(schema, pointer_part)
except Exception as e:
raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}")
raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}") from e

# Recursively resolve any nested $refs in the resolved schema
# Use the resolved URL as the base URI for nested $refs
schema = resolve_nested_refs(schema, resolved_url if url_part else base_uri)

return schema
except Exception as e:
raise ValueError(f"Error resolving reference {ref}: {e}")
raise ValueError(f"Error resolving reference {ref}: {e}") from e

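To make the branching in resolve_ref concrete, here is how two $ref values would be handled (the reference strings are illustrative):

# External absolute reference: url_part starts with "http", so the schema
# is fetched over HTTP and the JSON Pointer is resolved afterwards.
resolve_ref("", "https://smart-data-models.github.io/data-models/common-schema.json#/definitions/GSMA-Commons")

# Local reference: url_part is empty, so base_uri (or <repo_path>/schema.json
# as a fallback) supplies the document before the pointer is applied.
resolve_ref("/path/to/repo", "#/definitions/Location")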
def resolve_nested_refs(schema, base_uri):
"""
@@ -86,16 +79,16 @@ def resolve_nested_refs(schema, base_uri):
if isinstance(schema, dict):
if "$ref" in schema:
return resolve_ref("", schema["$ref"], base_uri)
else:
for key, value in schema.items():
schema[key] = resolve_nested_refs(value, base_uri)

for key, value in schema.items():
schema[key] = resolve_nested_refs(value, base_uri)
elif isinstance(schema, list):
for i, item in enumerate(schema):
schema[i] = resolve_nested_refs(item, base_uri)

return schema

def validate_properties(repo_path, properties, base_uri, path="", success=True, output=[]):
def validate_properties(repo_path, properties, base_uri, path="", success=True, output=None):
"""
Recursively validate properties in the schema, ensuring that arrays have 'items' and objects have 'properties'.

@@ -110,6 +103,9 @@
Returns:
tuple: (success: bool, output: list)
"""
if output is None:
output = []

for key, value in properties.items():
current_path = f"{path}.{key}" if path else key

@@ -135,9 +131,24 @@

# Recursively check nested properties
if "properties" in value and isinstance(value["properties"], dict):
success, output = validate_properties(repo_path, value["properties"], base_uri, current_path + ".", success, output)
success, output = validate_properties(
repo_path,
value["properties"],
base_uri,
f"{current_path}.",
success,
output,
)

if "items" in value and isinstance(value["items"], dict):
success, output = validate_properties(repo_path, value["items"], base_uri, current_path + ".", success, output)
success, output = validate_properties(
repo_path,
value["items"],
base_uri,
f"{current_path}.",
success,
output,
)

return success, output

@@ -159,7 +170,7 @@ def test_array_object_structure(repo_path, options):

try:
with open(f"{repo_path}/schema.json", 'r') as file:
schema = json.load(file)
schema = load(file)

base_uri = schema.get("$id", "") # Use $id as the base URI for resolving relative $refs

@@ -171,7 +182,7 @@
elif "properties" in schema and isinstance(schema["properties"], dict):
success, output = validate_properties(repo_path, schema["properties"], base_uri, "", success, output)

except json.JSONDecodeError:
except JSONDecodeError:
success = False
output.append("*** schema.json is not a valid JSON file")
except FileNotFoundError:
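To illustrate what this test catches, a minimal sketch with a hypothetical properties fragment, assuming the checks elided from this diff behave as the docstring describes:

properties = {
    "tags": {"type": "array"},       # flagged: array without "items"
    "location": {"type": "object"},  # flagged: object without "properties"
    "name": {"type": "string"},      # fine: primitive types need no substructure
}
success, output = validate_properties(repo_path="", properties=properties, base_uri="")
# success is False and output lists the offending property paths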