Skip to content

Commit

Permalink
feat: logging house messages store
Browse files Browse the repository at this point in the history
  • Loading branch information
Glaucio Jannotti committed Jun 28, 2024
1 parent 7f86700 commit 3df5a4b
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 181 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* truzzt GmbH - Initial implementation
*
*/

Expand All @@ -21,6 +22,8 @@
import com.truzzt.extension.logginghouse.client.messages.LogMessageSender;
import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer;
import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements;
import de.fraunhofer.iais.eis.LogMessage;
import de.fraunhofer.iais.eis.RequestMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
Expand Down Expand Up @@ -48,14 +51,16 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;

public class LoggingHouseClientExtension implements ServiceExtension {


public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension";
private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse";

Expand All @@ -71,6 +76,13 @@ public class LoggingHouseClientExtension implements ServiceExtension {
@Inject
private Hostname hostname;

@Inject
private DataSourceRegistry dataSourceRegistry;
@Inject
private TransactionContext transactionContext;
@Inject
private QueryExecutor queryExecutor;

@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
Expand Down Expand Up @@ -101,7 +113,6 @@ public class LoggingHouseClientExtension implements ServiceExtension {
public Monitor monitor;
private boolean enabled;


@Override
public String name() {
return LOGGINGHOUSE_CLIENT_EXTENSION;
Expand All @@ -127,7 +138,9 @@ public void initialize(ServiceExtensionContext context) {
registerSerializerClearingHouseMessages(context);
registerClearingHouseMessageSenders(context);

registerEventSubscriber(context);
var loggingHouseMessageStore = initializeLoggingHouseMessageStore(typeManager);

registerEventSubscriber(context, loggingHouseMessageStore);
}

private URL readUrlFromSettings(ServiceExtensionContext context) {
Expand All @@ -146,7 +159,7 @@ private URL readUrlFromSettings(ServiceExtensionContext context) {
}
}

public void runFlywayMigrations(ServiceExtensionContext context) {
private void runFlywayMigrations(ServiceExtensionContext context) {
var flywayService = new FlywayService(
context.getMonitor(),
context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false),
Expand All @@ -157,13 +170,22 @@ public void runFlywayMigrations(ServiceExtensionContext context) {
migrationManager.migrate();
}

private void registerEventSubscriber(ServiceExtensionContext context) {
private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) {
return new SqlLoggingHouseMessageStore(
dataSourceRegistry,
DataSourceRegistry.DEFAULT_DATASOURCE,
transactionContext,
typeManager.getMapper(),
new PostgresDialectStatements(),
queryExecutor
);
}

private void registerEventSubscriber(ServiceExtensionContext context, SqlLoggingHouseMessageStore loggingHouseMessageStore) {
monitor.debug("Registering event subscriber for LoggingHouseClientExtension");

var eventSubscriber = new IdsClearingHouseServiceImpl(
dispatcherRegistry,
hostname,
loggingHouseLogUrl,
var eventSubscriber = new LoggingHouseEventSubscriber(
loggingHouseMessageStore,
contractNegotiationStore,
transferProcessStore,
monitor);
Expand All @@ -178,7 +200,7 @@ private void registerEventSubscriber(ServiceExtensionContext context) {
eventRouter.registerSync(TransferProcessCompleted.class, eventSubscriber);
eventRouter.registerSync(TransferProcessFailed.class, eventSubscriber);
eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber);
context.registerService(IdsClearingHouseServiceImpl.class, eventSubscriber);
context.registerService(LoggingHouseEventSubscriber.class, eventSubscriber);

monitor.debug("Registered event subscriber for LoggingHouseClientExtension");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2022 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*
*/

package com.truzzt.extension.logginghouse.client;

import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventSubscriber;
import org.eclipse.edc.spi.monitor.Monitor;

import java.time.ZonedDateTime;
import java.util.Objects;

public class LoggingHouseEventSubscriber implements EventSubscriber {

private final LoggingHouseMessageStore loggingHouseMessageStore;
private final ContractNegotiationStore contractNegotiationStore;
private final TransferProcessStore transferProcessStore;
private final Monitor monitor;

public LoggingHouseEventSubscriber(
LoggingHouseMessageStore loggingHouseMessageStore,
ContractNegotiationStore contractNegotiationStore,
TransferProcessStore transferProcessStore,
Monitor monitor) {
this.loggingHouseMessageStore = loggingHouseMessageStore;
this.contractNegotiationStore = contractNegotiationStore;
this.transferProcessStore = transferProcessStore;
this.monitor = monitor;
}

public void storeContractAgreement(ContractAgreement contractAgreement) {
monitor.info("Storing ContractAgreement to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventToLog(contractAgreement)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}

public void storeTransferProcess(TransferProcess transferProcess) {
monitor.info("Storing TransferProcess to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventToLog(transferProcess)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}

@Override
public <E extends Event> void on(EventEnvelope<E> event) {
if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) {
monitor.debug("Storing ContractNegotiationFinalized event with id " + event.getId());

var contractAgreement = resolveContractAgreement(contractNegotiationFinalized);
try {
storeContractAgreement(contractAgreement);
} catch (Exception e) {
monitor.warning("Could not store ContractNegotiation: " + e.getMessage());
}
} else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) {
monitor.debug("Storing TransferProcess event with id " + event.getId());

var transferProcess = resolveTransferProcess(transferProcessEvent);
try {
storeTransferProcess(transferProcess);
} catch (Exception e) {
monitor.warning("Could not store TransferProcess: " + e.getMessage());
}
}
}

private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException {
var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId();
var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId);
return Objects.requireNonNull(contractNegotiation).getContractAgreement();
}

private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) {
var transferProcessId = transferProcessEvent.getTransferProcessId();
return transferProcessStore.findById(transferProcessId);
}
}
Loading

0 comments on commit 3df5a4b

Please sign in to comment.