Skip to content

Commit

Permalink
Merge branch 'feature/436856-cambios-conector-registration' into 'dev…
Browse files Browse the repository at this point in the history
…elop'

Feature/436856 cambios conector registration

See merge request upm-inesdata/inesdata-connector!29
  • Loading branch information
ralconada-gmv committed Jul 18, 2024
2 parents 1c14d02 + bf385e9 commit 87e3c38
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 2 deletions.
29 changes: 29 additions & 0 deletions extensions/participants-from-registration-service/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
### Catalog Service Participants Extension

This extension has the functionality for getting participant data within a service catalog.

#### Overview

The `FromRegistrationServiceParticipantsExtension` class implements `ServiceExtension` to facilitate periodic updates of participant data from a registration service. It utilizes the Eclipse EDC framework for managing and processing metadata.

#### Features

- **Periodic Updates**: Periodically retrieves participant data from a configurable registration service.
- **Integration**: Integrates with Eclipse EDC for metadata management and service integration.
- **Concurrency**: Uses scheduled tasks for efficient and timely data updates.

#### Setup

1. **Dependencies**: Ensure dependencies like Eclipse EDC are included in the project.

2. **Configuration**: Adjust settings such as `edc.participants.cache.execution.period.seconds` based on operational requirements.

#### Usage

The extension initializes by retrieving participant configurations and scheduling periodic updates using an in-memory directory (`InMemoryNodeDirectory`). Participant data is obtained via HTTP GET requests and transformed into `TargetNode` objects for ingestion into the directory.

#### Components

- **ParticipantConfiguration**: Manages HTTP requests to the registration service and transforms responses into `TargetNode` objects.

- **SharedNodeDirectory**: Implements `TargetNodeDirectory` to maintain and update participant nodes across the application.
10 changes: 10 additions & 0 deletions extensions/participants-from-registration-service/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
`java-library`
id("com.gmv.inesdata.edc-application")
}

dependencies {
implementation(libs.edc.federated.catalog.spi)
implementation(libs.edc.federated.catalog.core)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.upm.inesdata.catalog;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.catalog.directory.InMemoryNodeDirectory;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.iam.TokenParameters;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.eclipse.edc.catalog.spi.CacheSettings.DEFAULT_EXECUTION_PERIOD_SECONDS;

public class FromRegistrationServiceParticipantsExtension implements ServiceExtension {

@Setting("The time to elapse between two crawl runs")
public static final String EXECUTION_PLAN_PERIOD_SECONDS = "edc.participants.cache.execution.period.seconds";

@Inject
private IdentityService identityService;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private SharedNodeDirectory sharedNodeDirectory = new SharedNodeDirectory();

@Override
public void initialize(ServiceExtensionContext context) {
var periodSeconds = context.getSetting(EXECUTION_PLAN_PERIOD_SECONDS, DEFAULT_EXECUTION_PERIOD_SECONDS);
var monitor = context.getMonitor();
var participantRegistrationService = new ParticipantRegistrationService(monitor, new ObjectMapper());

// Initial update
updateTargetNodeDirectory(context, participantRegistrationService);

// Schedule periodic updates
scheduler.scheduleAtFixedRate(() -> {
try {
updateTargetNodeDirectory(context, participantRegistrationService);
} catch (Exception e) {
monitor.severe("Error updating TargetNodeDirectory", e);
}
}, periodSeconds, periodSeconds, TimeUnit.SECONDS);
}

private void updateTargetNodeDirectory(ServiceExtensionContext context, ParticipantRegistrationService participantRegistrationService) {
var newDir = new InMemoryNodeDirectory();

Result<TokenRepresentation> tokenRepresentationResult = identityService.obtainClientCredentials(
TokenParameters.Builder.newInstance().build());

for (var target : participantRegistrationService.getTargetNodes(context.getConfig(), tokenRepresentationResult)) {
// skipping null target nodes
if (target != null){
newDir.insert(target);
}
}

updateDirectoryInContext(newDir);
}

private void updateDirectoryInContext(InMemoryNodeDirectory newDir) {
sharedNodeDirectory.update(newDir);
}

@Provider
public TargetNodeDirectory federatedCacheNodeDirectory(ServiceExtensionContext context) {
return sharedNodeDirectory;
}

@Override
public void shutdown() {
scheduler.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.upm.inesdata.catalog;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.monitor.Monitor;

import org.eclipse.edc.crawler.spi.TargetNode;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.configuration.Config;

import static java.lang.String.format;

/**
* Handles participants in configuration and transforms them into TargetNodes
*/
public class ParticipantRegistrationService {

public static final List<String> SUPPORTED_PROTOCOLS = List.of("dataspace-protocol-http");

@Setting
public static final String EDC_CATALOG_REGISTRATION_SERVICE_HOST = "edc.catalog.registration.service.host";
public static final String RESOURCE_URL = "/public/participants";

private final Monitor monitor;
private final Client client = ClientBuilder.newClient();
private final ObjectMapper objectMapper;


/**
* Constructor
*
* @param monitor monitor
* @param objectMapper mapper
*/
public ParticipantRegistrationService(Monitor monitor, ObjectMapper objectMapper) {
this.monitor = monitor;
this.objectMapper = objectMapper;
}




/**
* Makes an HTTP GET request to the specified URL and returns the response as a string.
*
* @param url the URL to make the GET request to
* @param tokenRepresentationResult token
* @return the response from the GET request
*/
public String makeHttpGetRequest(String url, Result<TokenRepresentation> tokenRepresentationResult) {
String token = tokenRepresentationResult.getContent().getToken();
WebTarget target = client.target(url);
return target.request(MediaType.APPLICATION_JSON)
.header("Authorization", "Bearer " + token).get(String.class);
}

/**
* Retrieve TargetNodes from configuration
*
* @param baseConfig EDC Configuration
* @param tokenRepresentationResult token
* @return list of TargetNodes from configuration
*/
public List<TargetNode> getTargetNodes(Config baseConfig, Result<TokenRepresentation> tokenRepresentationResult) {
var participantsConfig = baseConfig.getConfig(EDC_CATALOG_REGISTRATION_SERVICE_HOST);

if (participantsConfig.getEntries().isEmpty()) {
monitor.severe("Error processing url registration service.");
return new ArrayList<>();
} else {
var url = participantsConfig.getEntries().get(EDC_CATALOG_REGISTRATION_SERVICE_HOST) + RESOURCE_URL;

try {
String response = makeHttpGetRequest(url, tokenRepresentationResult);
if(response==null){
return new ArrayList<>();
}
// Process the response and convert it to TargetNodes
// Assuming a method processResponseToTargetNodes(response)
return processResponseToTargetNodes(response);
} catch (Exception e) {
monitor.severe("Exception occurred while making HTTP GET request: " + e.getMessage());
return new ArrayList<>();
}
}
}

private List<TargetNode> processResponseToTargetNodes(String response) {
List<TargetNode> targetNodes = new ArrayList<>();

try {
List<JsonNode> nodes = objectMapper.readValue(response, new TypeReference<>() {});

for (JsonNode node : nodes) {
String participantId = node.get("participantId").asText();
String url = node.get("url").asText();
TargetNode targetNode = new TargetNode(participantId, participantId, url, SUPPORTED_PROTOCOLS);
targetNodes.add(targetNode);
}
} catch (Exception e) {
monitor.severe("Failed to deserialize the registration service response");
}

return targetNodes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.upm.inesdata.catalog;

import org.eclipse.edc.catalog.directory.InMemoryNodeDirectory;
import org.eclipse.edc.crawler.spi.TargetNode;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;

import java.util.List;

public class SharedNodeDirectory implements TargetNodeDirectory {

private volatile InMemoryNodeDirectory nodeDirectory = new InMemoryNodeDirectory();

public synchronized void update(InMemoryNodeDirectory newDirectory) {
this.nodeDirectory = newDirectory;
}

@Override
public List<TargetNode> getAll() {
return nodeDirectory.getAll();
}

@Override
public void insert(TargetNode node) {
nodeDirectory.insert(node);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.upm.inesdata.catalog.FromRegistrationServiceParticipantsExtension
2 changes: 1 addition & 1 deletion launchers/connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ dependencies {
implementation(libs.edc.observability.api)

// Federated Catalog
implementation(project(":extensions:participants-from-configuration"))
implementation(project(":extensions:participants-from-registration-service"))
implementation(libs.edc.federated.catalog.spi)
implementation(libs.edc.federated.catalog.core)
implementation(libs.edc.federated.catalog.api)
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ include(":spi:count-elements-spi")

// Extensions
include(":extensions:auth-oauth2-jwt")
include(":extensions:participants-from-configuration")
include(":extensions:participants-from-registration-service")
include(":extensions:policy-always-true")
include(":extensions:policy-time-interval")
include(":extensions:vocabulary-api")
Expand Down

0 comments on commit 87e3c38

Please sign in to comment.