Skip to content

Commit

Permalink
Merge pull request #141 from catenax-ng/release/v1.3.0-delete-cache
Browse files Browse the repository at this point in the history
[2º] - Release/v1.3.0 delete cache: Implemented Mechanism to Delete Cache
  • Loading branch information
matbmoser authored Nov 3, 2023
2 parents 697d380 + 23d4a6c commit 32fb75e
Show file tree
Hide file tree
Showing 25 changed files with 580 additions and 233 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ The changelog format is based on [Keep a Changelog](https://keepachangelog.com/e
- Added structure to manage the information coming from the IRS and jobs initiated
- Enabled callback mechanism with the IRS component
- Created `/api/irs/{processId}/tree` and `/api/irs/{processId}/components` APIs
- Added process to refresh the cache when the transfer has failed
- Added timestamp to every known DTR in the cache for refreshing the contract id every time it is reached.
- Added a mechanism to parse/update file system json files by specific properties, avoiding conflicts

## Issues Fixed
- Fix IRS tree component bugs related to the Digital Twin parsing
- Fix IRS job tracker to one single job.
- Fix bug related to the broadcast search of digital twin registry
- Fix minor bugs related to the digital twin search and the caching mechanism


## [released]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class DtrConfig {
/** ATTRIBUTES **/
String internalDtr;
Timeouts timeouts;
Boolean temporaryStorage;
TemporaryStorage temporaryStorage;
DecentralApis decentralApis;
String assetType;
String endpointInterface;
Expand All @@ -52,7 +52,7 @@ public class DtrConfig {
public DtrConfig() {
}

public DtrConfig(String internalDtr, Timeouts timeouts, Boolean temporaryStorage, DecentralApis decentralApis, String assetType, String endpointInterface, String dspEndpointKey, String semanticIdTypeKey) {
public DtrConfig(String internalDtr, Timeouts timeouts, TemporaryStorage temporaryStorage, DecentralApis decentralApis, String assetType, String endpointInterface, String dspEndpointKey, String semanticIdTypeKey) {
this.internalDtr = internalDtr;
this.timeouts = timeouts;
this.temporaryStorage = temporaryStorage;
Expand All @@ -77,12 +77,6 @@ public String getInternalDtr() {
public void setInternalDtr(String internalDtr) {
this.internalDtr = internalDtr;
}
public Boolean getTemporaryStorage() {
return temporaryStorage;
}
public void setTemporaryStorage(Boolean temporaryStorage) {
this.temporaryStorage = temporaryStorage;
}
public Timeouts getTimeouts() {
return timeouts;
}
Expand Down Expand Up @@ -116,11 +110,49 @@ public void setSemanticIdTypeKey(String semanticIdTypeKey) {
this.semanticIdTypeKey = semanticIdTypeKey;
}

public TemporaryStorage getTemporaryStorage() {
return temporaryStorage;
}

public void setTemporaryStorage(TemporaryStorage temporaryStorage) {
this.temporaryStorage = temporaryStorage;
}

/** INNER CLASSES **/

/**
* This class consists exclusively to define the attributes and methods needed for the DTR's timeouts configuration.
**/

public static class TemporaryStorage{

/** ATTRIBUTES **/
Boolean enabled;

Integer lifetime;

public TemporaryStorage(Boolean enabled, Integer lifetime) {
this.enabled = enabled;
this.lifetime = lifetime;
}

/** GETTERS AND SETTERS **/
public Boolean getEnabled() {
return enabled;
}

public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}

public Integer getLifetime() {
return lifetime;
}

public void setLifetime(Integer lifetime) {
this.lifetime = lifetime;
}
}
public static class Timeouts{

/** ATTRIBUTES **/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@Configuration
@ConfigurationProperties(prefix="configuration.passport")
public class PassportConfig {
private String searchIdSchema;

private List<String> aspects;

Expand All @@ -44,6 +45,11 @@ public PassportConfig(List<String> aspects) {
this.aspects = aspects;
}

public PassportConfig(String searchIdSchema, List<String> aspects) {
this.searchIdSchema = searchIdSchema;
this.aspects = aspects;
}

public List<String> getAspects() {
return aspects;
}
Expand All @@ -52,4 +58,11 @@ public void setAspects(List<String> aspects) {
}


public String getSearchIdSchema() {
return searchIdSchema;
}

public void setSearchIdSchema(String searchIdSchema) {
this.searchIdSchema = searchIdSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.tractusx.productpass.config.DtrConfig;
import org.eclipse.tractusx.productpass.config.IrsConfig;
import org.eclipse.tractusx.productpass.config.PassportConfig;
import org.eclipse.tractusx.productpass.config.ProcessConfig;
import org.eclipse.tractusx.productpass.exceptions.ControllerException;
import org.eclipse.tractusx.productpass.managers.ProcessManager;
Expand Down Expand Up @@ -101,6 +102,10 @@ public class AppController {
IrsConfig irsConfig;
@Autowired
DtrConfig dtrConfig;

@Autowired
PassportConfig passportConfig;

@SuppressWarnings("Unused")
private @Autowired ProcessConfig processConfig;

Expand Down Expand Up @@ -220,6 +225,11 @@ public Response getDigitalTwin(@RequestBody Object body, @PathVariable String pr
if (connectorAddress.isEmpty() || assetId.isEmpty()) {
LogUtil.printError("Failed to parse endpoint [" + connectorAddress + "] or the assetId is not found!");
}
LogUtil.printDebug("[PROCESS " + processId + "] Digital Twin [" + digitalTwin.getIdentification() + "] and Submodel [" + subModel.getIdentification() + "] with EDC endpoint [" + connectorAddress + "] retrieved from DTR");
processManager.setStatus(processId, "digital-twin-found", new History(
assetId,
"READY"
));
String bpn = dtr.getBpn();
Boolean childrenCondition = search.getChildren();
processManager.saveTransferInfo(processId, connectorAddress, semanticId, dataPlaneUrl, bpn, childrenCondition);
Expand All @@ -231,17 +241,12 @@ public Response getDigitalTwin(@RequestBody Object body, @PathVariable String pr
String globalAssetId = digitalTwin.getGlobalAssetId();
String actualPath = status.getTreeState() + "/" + globalAssetId;
processManager.setTreeState(processId, actualPath);
this.treeManager.setNodeByPath(processId, actualPath, new Node(digitalTwin));
this.treeManager.setNodeByPath(processId, actualPath, new Node(digitalTwin, this.passportConfig.getSearchIdSchema()));

// Get children from the node
this.irsService.getChildren(processId, actualPath, globalAssetId, bpn);
}

LogUtil.printDebug("[PROCESS " + processId + "] Digital Twin [" + digitalTwin.getIdentification() + "] and Submodel [" + subModel.getIdentification() + "] with EDC endpoint [" + connectorAddress + "] retrieved from DTR");
processManager.setStatus(processId, "digital-twin-found", new History(
assetId,
"READY"
));

} catch (Exception e) {
LogUtil.printException(e, "This request is not allowed! It must contain the valid attributes from an EDC endpoint");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ public Response getData(@Valid @RequestBody TokenRequest tokenRequestBody) {
}

if (!status.historyExists("data-received")) {
response = httpUtil.getNotFound("The data is not available!");
return httpUtil.buildResponse(response, httpResponse);
status = processManager.getStatus(processId); // Retry to get the status before giving an error
if(!status.historyExists("data-received")) {
response = httpUtil.getNotFound("The data is not available!");
return httpUtil.buildResponse(response, httpResponse);
}
}

if (status.historyExists("data-retrieved")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.validation.Valid;
import org.apache.commons.logging.Log;
import org.eclipse.tractusx.productpass.config.DiscoveryConfig;
import org.eclipse.tractusx.productpass.config.DtrConfig;
import org.eclipse.tractusx.productpass.config.PassportConfig;
Expand Down Expand Up @@ -147,33 +148,26 @@ public Response create(@Valid @RequestBody DiscoverySearch searchBody) {
for(BpnDiscovery bpnDiscovery : bpnDiscoveries){
bpnList.addAll(bpnDiscovery.getBpnNumbers());
}
if(bpnList.size() == 0){
response.message = "The asset was not found in the BPN Discovery!";
response.status = 404;
response.statusText = "Not Found";
return httpUtil.buildResponse(response, httpResponse);
}
String processId = processManager.initProcess();
ConcurrentHashMap<String, List<Dtr>> dataModel = null;
List<EdcDiscoveryEndpoint> edcEndpointBinded = null;
if(dtrConfig.getTemporaryStorage()) {
if(dtrConfig.getTemporaryStorage().getEnabled()) {
try {
dataModel = this.dtrSearchManager.loadDataModel();
} catch (Exception e) {
LogUtil.printWarning("Failed to load data model from disk!");
}
}
// This checks if the cache is deactivated or if the bns are not in thedataModel, if one of them is not in the data model then we need to check for them
if(!dtrConfig.getTemporaryStorage() || ((dataModel==null) || !jsonUtil.checkJsonKeys(dataModel, bpnList, ".", false))){
List<EdcDiscoveryEndpoint> edcEndpoints = catenaXService.getEdcDiscovery(bpnList);
try {
edcEndpointBinded = (List<EdcDiscoveryEndpoint>) jsonUtil.bindReferenceType(edcEndpoints, new TypeReference<List<EdcDiscoveryEndpoint>>() {});
} catch (Exception e) {
throw new ControllerException(this.getClass().getName(), e, "Could not bind the reference type!");
}
if(!this.dtrConfig.getInternalDtr().isEmpty()) {
edcEndpointBinded.stream().filter(endpoint -> endpoint.getBpn().equals(vaultService.getLocalSecret("edc.participantId"))).forEach(endpoint -> {
endpoint.getConnectorEndpoint().add(this.dtrConfig.getInternalDtr());
});
}

catenaXService.searchDTRs(edcEndpointBinded, processId);
if(!dtrConfig.getTemporaryStorage().getEnabled() || ((dataModel==null) || !jsonUtil.checkJsonKeys(dataModel, bpnList, ".", false))){
catenaXService.searchDTRs(bpnList, processId);
}else{

boolean requestDtrs = false;
// Take the results from cache
for(String bpn: bpnList){
List<Dtr> dtrs = null;
Expand All @@ -182,14 +176,29 @@ public Response create(@Valid @RequestBody DiscoverySearch searchBody) {
} catch (Exception e) {
throw new ControllerException(this.getClass().getName(), e, "Could not bind the reference type!");
}

if(dtrs.isEmpty()){
response.message = "Failed to get the bpns from the datamodel";
return httpUtil.buildResponse(response, httpResponse);
}
// Interate over every DTR and add it to the file
Long currentTimestamp = DateTimeUtil.getTimestamp();

// Iterate over every DTR and add it to the file
for(Dtr dtr: dtrs){

Long validUntil = dtr.getValidUntil();
if(validUntil == null || validUntil < currentTimestamp){
requestDtrs = true; // If the cache invalidation time has come request Dtrs
break;
}

processManager.addSearchStatusDtr(processId, dtr);
}
if(requestDtrs){
dtrSearchManager.deleteBpns(dataModel, bpnList); // Delete BPN numbers
catenaXService.searchDTRs(bpnList, processId); // Start again the search
break;
}
}
}

Expand Down Expand Up @@ -289,9 +298,51 @@ public Response search(@Valid @RequestBody Search searchBody) {
}
assetSearch = aasService.decentralDtrSearch(process.id, searchBody);


if(assetSearch == null){
response = httpUtil.getBadRequest("No digital twin was found!");
return httpUtil.buildResponse(response, httpResponse);
status = processManager.getStatus(processId);
// Here start the algorithm to refresh the dtrs in the cache if the transfer was incompleted
List<Dtr> dtrList = new ArrayList<Dtr>();
Map<String, Dtr> dtrs = searchStatus.getDtrs();
List<String> bpnList = new ArrayList<String>();
for(String dtrId: searchStatus.getDtrs().keySet()){
// Check if any dtr search was incomplete
if(!status.historyExists("dtr-"+dtrId+"-transfer-incomplete")) {
continue;
}
// Add the dtr bpn to the update cache list
Dtr dtr = dtrs.get(dtrId);
String bpn = dtr.getBpn();
if(!bpnList.contains(bpn)) {
bpnList.add(dtr.getBpn()); // Add bpn to delete in the cache
}
dtrList.add(dtr);
}

// If no bpn numbers need to be updated is because there is no digital twin found
if(bpnList.size() == 0){
response = httpUtil.getBadRequest("No digital twin was found!");
return httpUtil.buildResponse(response, httpResponse);
}

LogUtil.printWarning("["+dtrList.size()+"] Digital Twin Registries Contracts are invalid and need to be refreshed! For the following BPN Number(s): "+ bpnList.toString());
// Refresh cache or search id
if(dtrConfig.getTemporaryStorage().getEnabled()) {
ConcurrentHashMap<String, List<Dtr>> dataModel = null;
try {
dataModel = this.dtrSearchManager.loadDataModel();
} catch (Exception e) {
LogUtil.printWarning("Failed to load data model from disk!");
}
dtrSearchManager.deleteBpns(dataModel, bpnList); // Delete BPN numbers
}
LogUtil.printMessage("Refreshing ["+bpnList.size()+"] BPN Number Endpoints...");
catenaXService.searchDTRs(bpnList, processId); // Start again the search for refreshing the dtrs
assetSearch = aasService.decentralDtrSearch(process.id, searchBody); // Start again the search
if(assetSearch == null) { // If again was not found then we give an error
response = httpUtil.getBadRequest("No digital twin was found! Even after retrying the digital twin transfer!");
return httpUtil.buildResponse(response, httpResponse);
}
}
// Assing the variables with the content
String assetId = assetSearch.getAssetId();
Expand All @@ -313,10 +364,15 @@ public Response search(@Valid @RequestBody Search searchBody) {

// Check if contract offer was not received
if (dataset == null) {
response.message = "Asset Id not found in any contract!";
response.status = 404;
response.statusText = "Not Found";
return httpUtil.buildResponse(response, httpResponse);
// Retry again...
LogUtil.printWarning("[PROCESS " + process.id + "] No asset id found for the dataset contract offers in the catalog! Requesting catalog again...");
dataset = dataService.getContractOfferByAssetId(assetId, connectorAddress);
if (dataset == null) { // If the contract catalog is not reachable retry...
response.message = "Asset Id not found in any contract!";
response.status = 404;
response.statusText = "Not Found";
return httpUtil.buildResponse(response, httpResponse);
}
}
LogUtil.printDebug("[PROCESS " + process.id + "] Contract found for asset [" + assetId + "] in EDC Endpoint [" + connectorAddress + "]");

Expand Down
Loading

0 comments on commit 32fb75e

Please sign in to comment.