-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: small refactor + init of processor service * feat: processing service finished * feat: processing service implemented * feat: bugfixes and cleanup of new service * feat: additional test added * docs: little remark added * fix: invalid logging levels * feat: PR remarks * feat: small improvements * fix: broken ci pipeline
- Loading branch information
Showing
40 changed files
with
1,159 additions
and
397 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
<?xml version="1.0" encoding="UTF-8" ?> | ||
<scriptlet id="validate-ldes" xmlns="http://www.gitb.com/tdl/v1/"> | ||
<params> | ||
<var name="ldesUrl" type="string"/> | ||
<var name="shaclShape" type="string"/> | ||
<var name="delayDuration" type="number"> | ||
<value>60000</value> | ||
</var> | ||
<var name="addresses" type="map" /> | ||
</params> | ||
<steps stopOnError="true"> | ||
<process output="startupReport" desc="Start Replication" id="startReplicatingProcess" | ||
handler="$addresses{processing}"> | ||
<operation>startReplicating</operation> | ||
<input name="ldes-url">$ldesUrl</input> | ||
</process> | ||
<log>"Waiting for the LDES Client status to be available"</log> | ||
<process desc="Wait until client is available" handler="DelayProcessor"> | ||
<operation>delay</operation> | ||
<input>5000</input> | ||
</process> | ||
|
||
<log>"Start checking LDES Client status"</log> | ||
<assign to="replicationOutput"/> | ||
<repuntil desc="Check if replication has ended"> | ||
<do> | ||
<process desc="Wait" handler="DelayProcessor"> | ||
<operation>delay</operation> | ||
<input>$delayDuration</input> | ||
</process> | ||
<process output="replicationOutput" handler="$addresses{processing}"> | ||
<operation>haltWhenReplicated</operation> | ||
</process> | ||
<log level="DEBUG">$replicationOutput{STATUS}</log> | ||
</do> | ||
<cond>$replicationOutput{STATUS} = 'REPLICATING'</cond> | ||
</repuntil> | ||
<log>"Starting shacl validation"</log> | ||
|
||
<verify output="validatorOutput" id="shaclValidationStep" desc="validate against shacl" | ||
handler="$addresses{validation}"> | ||
<input name="shacl-shape">$shaclShape</input> | ||
</verify> | ||
<log>"shacl verification finished"</log> | ||
<process desc="Delete pipeline" handler="$addresses{processing}" operation="destroyPipeline"/> | ||
</steps> | ||
<output name="validatorOutput"> | ||
$validatorOutput | ||
</output> | ||
</scriptlet> |
77 changes: 77 additions & 0 deletions
77
src/main/java/be/vlaanderen/informatievlaanderen/ldes/gitb/ReplicationProcessingService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package be.vlaanderen.informatievlaanderen.ldes.gitb; | ||
|
||
import be.vlaanderen.informatievlaanderen.ldes.gitb.services.replication.ProcessExecutors; | ||
import be.vlaanderen.informatievlaanderen.ldes.gitb.valueobjects.ParameterDefinition; | ||
import be.vlaanderen.informatievlaanderen.ldes.gitb.valueobjects.ProcessParameters; | ||
import be.vlaanderen.informatievlaanderen.ldes.gitb.valueobjects.ProcessResult; | ||
import com.gitb.core.ConfigurationParameters; | ||
import com.gitb.core.Metadata; | ||
import com.gitb.core.TypedParameters; | ||
import com.gitb.ps.Void; | ||
import com.gitb.ps.*; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.stereotype.Component; | ||
|
||
@Component | ||
public class ReplicationProcessingService implements ProcessingService { | ||
private static final String SERVICE_NAME = "ReplicationProcessingService"; | ||
private static final Logger log = LoggerFactory.getLogger(ReplicationProcessingService.class); | ||
private final ProcessExecutors processExecutors; | ||
|
||
public ReplicationProcessingService(ProcessExecutors processExecutors) { | ||
this.processExecutors = processExecutors; | ||
} | ||
|
||
@Override | ||
public GetModuleDefinitionResponse getModuleDefinition(Void parameters) { | ||
final ProcessingModule processingModule = new ProcessingModule(); | ||
processingModule.setId(SERVICE_NAME); | ||
|
||
final Metadata metadata = new Metadata(); | ||
metadata.setName(SERVICE_NAME); | ||
processingModule.setMetadata(metadata); | ||
|
||
processingModule.setConfigs(new ConfigurationParameters()); | ||
|
||
processExecutors.getProcessExecutors().stream() | ||
.map(processExecutor -> { | ||
final var processingOperation = new ProcessingOperation(); | ||
final var typedParameters = new TypedParameters(); | ||
processingOperation.setName(processExecutor.getName()); | ||
typedParameters.getParam().addAll(processExecutor | ||
.getParameterDefinitions() | ||
.stream() | ||
.map(ParameterDefinition::convertToTypedParameter) | ||
.toList()); | ||
processingOperation.setInputs(typedParameters); | ||
return processingOperation; | ||
}) | ||
.forEach(processingModule.getOperation()::add); | ||
|
||
final GetModuleDefinitionResponse getModuleDefinitionResponse = new GetModuleDefinitionResponse(); | ||
getModuleDefinitionResponse.setModule(processingModule); | ||
return getModuleDefinitionResponse; | ||
} | ||
|
||
@Override | ||
public ProcessResponse process(ProcessRequest parameters) { | ||
log.info("Received 'process' command with '{}' operation from test bed for session [{}]", parameters.getOperation(), parameters.getSessionId()); | ||
return processExecutors.getProcessExecutor(parameters.getOperation()) | ||
.map(processExecutor -> processExecutor.execute(new ProcessParameters(parameters.getSessionId(), parameters.getInput()))) | ||
.orElseGet(() -> ProcessResult.invalidOperation(parameters.getOperation())) | ||
.convertToResponse(); | ||
} | ||
|
||
@Override | ||
public BeginTransactionResponse beginTransaction(BeginTransactionRequest parameters) { | ||
return new BeginTransactionResponse(); | ||
} | ||
|
||
@Override | ||
public Void endTransaction(BasicRequest parameters) { | ||
return new Void(); | ||
} | ||
|
||
|
||
} |
Oops, something went wrong.