Skip to content

Commit

Permalink
✨ Pass contract details to API backend in Consumer Pull HTTP requests
Browse files Browse the repository at this point in the history
  • Loading branch information
agmangas committed May 13, 2024
1 parent 6279ca2 commit fb10e7a
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 23 deletions.
1 change: 1 addition & 0 deletions connector/openapi-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation(libs.swaggerParser)
implementation(libs.slugify)
implementation(libs.json)
implementation(libs.okhttp3.okhttp)

if (
project.hasProperty("useOauthIdentity") &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package eu.datacellar.connector;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.dataplane.http.spi.HttpParamsDecorator;
import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams.Builder;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;

import okhttp3.HttpUrl;

/**
* Decorator class for adding contract details to the proxied HTTP requests.
*/
public class ContractDetailsHttpParamsDecorator implements HttpParamsDecorator {

private final Monitor monitor;
private final ContractNegotiationStore contractNegotiationStore;
private static final String QUERY_PARAM_CONTRACT_ID = "contractId";
private static final String HEADER_CONNECTOR_EXT = "X-Connector-Extension";
private static final String HEADER_ASSET_ID = "X-Connector-Asset-Id";
private static final String HEADER_CONSUMER_ID = "X-Connector-Consumer-Id";
private static final String HEADER_CONTRACT_SIGNING_DATE = "X-Connector-Contract-Signing-Date";
private static final String PROP_QUERY_PARAMS = "queryParams";

/**
* Constructs a new instance of the ContractDetailsHttpParamsDecorator class.
*
* @param monitor The monitor object used for monitoring.
* @param contractNegotiationStore The contract negotiation store object used
* for storing contract details.
*/
public ContractDetailsHttpParamsDecorator(Monitor monitor, ContractNegotiationStore contractNegotiationStore) {
this.monitor = monitor;
this.contractNegotiationStore = contractNegotiationStore;
}

private Map<String, List<String>> queryParamsStringToMap(String queryParams) {
Map<String, List<String>> queryMap = new HashMap<>();

HttpUrl url = HttpUrl.parse("https://example.com?%s".formatted(queryParams));

for (int i = 0, size = url.querySize(); i < size; i++) {
String key = url.queryParameterName(i);
String value = url.queryParameterValue(i);

if (queryMap.containsKey(key)) {
queryMap.get(key).add(value);
} else {
queryMap.put(key, List.of(value));
}
}

return queryMap;
}

private String mapToQueryParamsString(Map<String, List<String>> queryMap) {
HttpUrl.Builder urlBuilder = new HttpUrl.Builder()
.scheme("https")
.host("example.com");

for (Map.Entry<String, List<String>> entry : queryMap.entrySet()) {
for (String value : entry.getValue()) {
urlBuilder.addQueryParameter(entry.getKey(), value);
}
}

HttpUrl url = urlBuilder.build();

return url.query();
}

/**
* Add contract details to the proxied request.
*/
@Override
public Builder decorate(DataFlowStartMessage request, HttpDataAddress address, Builder builder) {
Package pkg = ContractDetailsHttpParamsDecorator.class.getPackage();
String packageName = pkg.getName();
builder.header(HEADER_CONNECTOR_EXT, packageName);

String queryParams = request.getProperties().getOrDefault(PROP_QUERY_PARAMS, null);

if (queryParams == null || queryParams.isEmpty()) {
monitor.debug("Query parameters not found in request properties");
return builder;
}

monitor.debug("Request properties: %s".formatted(request.getProperties()));
Map<String, List<String>> queryMap = queryParamsStringToMap(queryParams);
monitor.debug("Parsed query map: %s".formatted(queryMap));

String contractId = Optional.ofNullable(queryMap.get(QUERY_PARAM_CONTRACT_ID))
.map(list -> list.get(0))
.orElse(null);

if (contractId == null) {
monitor.debug("Contract ID not found in query parameters");
return builder;
}

monitor.debug("Contract ID: %s".formatted(contractId));
ContractAgreement contractAgreement = contractNegotiationStore.findContractAgreement(contractId);

if (contractAgreement == null) {
monitor.debug("Contract agreement with id '%s' not found in store".formatted(contractId));
return builder;
}

monitor.debug("Contract agreement: %s".formatted(contractAgreement));

builder.header(HEADER_ASSET_ID, contractAgreement.getAssetId());
builder.header(HEADER_CONSUMER_ID, contractAgreement.getConsumerId());

String contractSigningDate = DateTimeFormatter.ISO_INSTANT
.withZone(ZoneOffset.UTC)
.format(Instant.ofEpochSecond(contractAgreement.getContractSigningDate()));

builder.header(HEADER_CONTRACT_SIGNING_DATE, contractSigningDate);

queryMap.remove(QUERY_PARAM_CONTRACT_ID);
String updatedQueryString = mapToQueryParamsString(queryMap);
monitor.debug("Updated query string: %s".formatted(updatedQueryString));
builder.queryParams(updatedQueryString);

return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import javax.sql.DataSource;

import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.offer.store.ContractDefinitionStore;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class OpenAPICoreExtension implements ServiceExtension {
@Inject
private ContractDefinitionStore contractStore;

@Inject
private ContractNegotiationStore contractNegotiationStore;

@Inject
private DataPlaneInstanceStore dataPlaneStore;

Expand Down Expand Up @@ -404,28 +408,8 @@ public void initialize(ServiceExtensionContext context) {
monitor.warning(String.format("OpenAPI URL (property '%s') is not set", OPENAPI_URL));
}

Package pkg = OpenAPICoreExtension.class.getPackage();
String pkgVersion = pkg.getImplementationVersion();

paramsProvider.registerSourceDecorator((request, address, builder) -> {
if (pkgVersion != null) {
builder.header("X-OpenAPI-Connector-Source-Version", pkgVersion);
}

builder.header("X-OpenAPI-Connector", "source");

return builder;
});

paramsProvider.registerSinkDecorator((request, address, builder) -> {
if (pkgVersion != null) {
builder.header("X-OpenAPI-Connector-Sink-Version", pkgVersion);
}

builder.header("X-OpenAPI-Connector", "sink");

return builder;
});
paramsProvider
.registerSourceDecorator(new ContractDetailsHttpParamsDecorator(monitor, contractNegotiationStore));

monitor.info(String.format("Initialized extension: %s", this.getClass().getName()));
}
Expand Down
2 changes: 2 additions & 0 deletions edcpy/edcpy/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class EndpointDataReference(BaseModel):
authKey: str
authCode: str
properties: dict
contractId: str


async def get_messaging_app() -> AsyncGenerator[MessagingApp, None]:
Expand Down Expand Up @@ -127,6 +128,7 @@ async def http_pull_endpoint(
endpoint=item.endpoint,
id=item.id,
properties=item.properties,
contract_id=item.contractId,
)

# The provider hostname is included in the routing key to facilitate
Expand Down
2 changes: 2 additions & 0 deletions edcpy/edcpy/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class HttpPullMessage(BaseModel):
endpoint: str
id: str
properties: dict
contract_id: str

@property
def http_method(self) -> str:
Expand All @@ -46,6 +47,7 @@ def request_args(self) -> dict:
"method": self.http_method,
"url": self.endpoint,
"headers": {self.auth_key: self.auth_code},
"params": {"contractId": self.contract_id},
}

@property
Expand Down
2 changes: 1 addition & 1 deletion edcpy/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "edcpy"
version = "0.3.0a0"
version = "0.4.0a0"
description = "Package that provides a series of utilities to facilitate interaction with the Management and Control APIs of an EDC connector"
authors = ["Andres Garcia Mangas <[email protected]>"]
license = "EUPL-1.2"
Expand Down

0 comments on commit fb10e7a

Please sign in to comment.