Skip to content

Commit

Permalink
use get_streamed, execution report, define output port
Browse files Browse the repository at this point in the history
  • Loading branch information
muddymudskipper committed Dec 5, 2024
1 parent c398e90 commit 9e822df
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 73 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p

## [unreleased]

### Changed

- Load graphs with get_streamed instead of get
- Defined output port
- Update execution report

## [5.1.0] 2024-12-03

### Added
Expand Down
80 changes: 57 additions & 23 deletions cmem_plugin_pyshacl/plugin_pyshacl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from warnings import simplefilter

import validators.url
from cmem.cmempy.dp.proxy.graph import get, post_streamed
from cmem_plugin_base.dataintegration.context import ExecutionContext
from cmem.cmempy.dp.proxy.graph import get_streamed, post_streamed
from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport
from cmem_plugin_base.dataintegration.description import Icon, Plugin, PluginParameter
from cmem_plugin_base.dataintegration.entity import (
Entities,
Expand All @@ -23,6 +23,7 @@
get_graphs_list,
)
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort
from cmem_plugin_base.dataintegration.types import (
BoolParameterType,
IntParameterType,
Expand Down Expand Up @@ -57,6 +58,16 @@
"http://www.w3.org/2002/07/owl#Ontology",
"https://vocab.eccenca.com/dsm/ThesaurusProject",
]
SH_PROPERTIES = [
"focusNode",
"resultPath",
"value",
"sourceShape",
"sourceConstraintComponent",
# "detail",
"resultMessage",
"resultSeverity",
]


def e_t(start: float) -> float:
Expand Down Expand Up @@ -360,6 +371,33 @@ def __init__( # noqa: PLR0913
self.remove_shape_catalog_graph_type = remove_shape_catalog_graph_type
self.max_validation_depth = max_validation_depth

self.input_ports = FixedNumberOfInputs([])
if self.output_entities:
self.schema = self.generate_output_schema()
self.output_port = FixedSchemaPort(self.schema)
else:
self.output_port = None

def generate_output_schema(self) -> EntitySchema:
"""Generate output entity schema."""
paths = [EntityPath(path=SH[p]) for p in SH_PROPERTIES] + [
EntityPath(path=SH.conforms),
EntityPath(path=PROV.wasDerivedFrom),
EntityPath(path=PROV.wasInformedBy),
EntityPath(path=PROV.generatedAtTime),
]
return EntitySchema(type_uri=SH.ValidationResult, paths=paths)

def update_report(self, operation: str, operation_desc: str, entity_count: int) -> None:
"""Update execution report"""
self.context.report.update(
ExecutionReport(
operation=operation,
operation_desc=operation_desc,
entity_count=entity_count,
)
)

def add_prov(self, validation_graph: Graph, utctime: str) -> Graph:
"""Add provenance data"""
self.log.info("Adding PROV information validation graph")
Expand Down Expand Up @@ -507,16 +545,6 @@ def make_entities(
) -> Entities:
"""Create entities"""
self.log.info("Creating entities")
shp = [
"focusNode",
"resultPath",
"value",
"sourceShape",
"sourceConstraintComponent",
# "detail",
"resultMessage",
"resultSeverity",
]
entities = []
conforms = next(iter(validation_graph.objects(predicate=SH.conforms)))
for validation_result in list(validation_graph.subjects(RDF.type, SH.ValidationResult)):
Expand All @@ -530,24 +558,21 @@ def make_entities(
shacl_graph,
)
]
for p in shp
for p in SH_PROPERTIES
] + [[conforms], [self.data_graph_uri], [self.shacl_graph_uri], [utctime]]
entities.append(Entity(uri=validation_result, values=values))
paths = [EntityPath(path=SH[p]) for p in shp] + [
EntityPath(path=SH.conforms),
EntityPath(path=PROV.wasDerivedFrom),
EntityPath(path=PROV.wasInformedBy),
EntityPath(path=PROV.generatedAtTime),
]

return Entities(
entities=entities,
schema=EntitySchema(type_uri=SH.ValidationResult, paths=paths),
schema=self.generate_output_schema(),
)

def get_graph(self, uri: str) -> Graph:
"""Get graph from cmem"""
graph = Graph()
graph.parse(data=get(uri, owl_imports_resolution=self.owl_imports).text, format="turtle")
graph.parse(
data=get_streamed(uri, owl_imports_resolution=self.owl_imports).text, format="turtle"
)
return graph

def check_parameters( # noqa: C901 PLR0912
Expand Down Expand Up @@ -602,14 +627,17 @@ def remove_graph_type(self, data_graph: Graph, iri: str) -> None:
self.log.info(f"Removing graph type <{iri}> from data graph")
data_graph.remove((URIRef(self.data_graph_uri), RDF.type, URIRef(iri)))

def execute( # noqa: C901
def execute( # noqa: C901 PLR0915
self,
inputs: tuple, # noqa: ARG002
inputs: None, # noqa: ARG002
context: ExecutionContext = ExecutionContext,
) -> Entities | None:
"""Execute plugin"""
self.context = context
setup_cmempy_user_access(context.user)
self.check_parameters()
self.update_report("validate", "graphs validated.", 0)

self.log.info(f"Loading data graph <{self.data_graph_uri}> into memory...")
start = time()
data_graph = self.get_graph(self.data_graph_uri)
Expand All @@ -621,16 +649,19 @@ def execute( # noqa: C901
self.remove_graph_type(data_graph, "https://vocab.eccenca.com/dsm/ThesaurusProject")
if self.remove_shape_catalog_graph_type:
self.remove_graph_type(data_graph, "https://vocab.eccenca.com/shui/ShapeCatalog")
self.update_report("load", "graphs loaded", 1)

self.log.info(f"Loading SHACL graph <{self.shacl_graph_uri}> into memory...")
start = time()
shacl_graph = self.get_graph(self.shacl_graph_uri)
self.log.info(f"Finished loading SHACL graph in {e_t(start)} seconds")
self.update_report("load", "graphs loaded", 2)

if self.ontology_graph_uri:
self.log.info(f"Loading ontology graph <{self.ontology_graph_uri}> into memory...")
ontology_graph = self.get_graph(self.ontology_graph_uri)
self.log.info(f"Finished loading ontology graph in {e_t(start)} seconds")
self.update_report("load", "graphs loaded", 3)
else:
ontology_graph = None

Expand Down Expand Up @@ -670,7 +701,10 @@ def execute( # noqa: C901

self.post_graph(validation_graph)

self.update_report("validate", "graph validated.", 1)

if self.output_entities:
self.log.info("Outputting entities")
return entities

return None
Loading

0 comments on commit 9e822df

Please sign in to comment.