Skip to content

Commit

Permalink
feat: logging house sender worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Glaucio Jannotti committed Jul 3, 2024
1 parent 2e3489e commit 817ef84
Show file tree
Hide file tree
Showing 34 changed files with 459 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public class ConfigConstants {
static final String LOGGINGHOUSE_FLYWAY_CLEAN_SETTING = "edc.logginghouse.client.flyway.clean";

static final String LOGGINGHOUSE_EXTENSION_MAX_WORKERS = "edc.logginghouse.client.workers.max";

static final String LOGGINGHOUSE_EXTENSION_WORKERS_DELAY = "edc.logginghouse.client.workers.delay";

static final String LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD = "edc.logginghouse.client.workers.period";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@

package com.truzzt.extension.logginghouse.client;

import com.truzzt.extension.logginghouse.client.events.LoggingHouseEventSubscriber;
import com.truzzt.extension.logginghouse.client.flyway.FlywayService;
import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties;
import com.truzzt.extension.logginghouse.client.flyway.migration.DatabaseMigrationManager;
import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender;
import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessageSender;
import com.truzzt.extension.logginghouse.client.messages.LogMessageSender;
import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender;
import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessageSender;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessageSender;
import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer;
import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements;
import com.truzzt.extension.logginghouse.client.worker.WorkersManager;
import com.truzzt.extension.logginghouse.client.worker.LoggingHouseWorkersManager;
import com.truzzt.extension.logginghouse.client.worker.WorkersExecutor;
import de.fraunhofer.iais.eis.LogMessage;
import de.fraunhofer.iais.eis.RequestMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationTerminated;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessFailed;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated;
Expand All @@ -41,7 +41,9 @@
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;

import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Requires;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.event.EventRouter;
Expand All @@ -59,18 +61,39 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.Map;

import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_ENABLED;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_SERVER_URL_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_REPAIR_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_CLEAN_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_MAX_WORKERS;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.*;

@Extension(value = LoggingHouseClientExtension.NAME)
@Requires(value = {
Hostname.class,

TypeManager.class,
EventRouter.class,
IdentityService.class,
RemoteMessageDispatcherRegistry.class,

DataSourceRegistry.class,
TransactionContext.class,
QueryExecutor.class,

ContractNegotiationStore.class,
TransferProcessStore.class,
AssetIndex.class
})
public class LoggingHouseClientExtension implements ServiceExtension {

public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension";
public static final String NAME = "LoggingHouseClientExtension";
private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse";
private static final Map<String, String> CONTEXT_MAP = Map.of(
"cat", "http://w3id.org/mds/data-categories#",
"ids", "https://w3id.org/idsa/core/",
"idsc", "https://w3id.org/idsa/code/");

@Inject
private Hostname hostname;

@Inject
private TypeManager typeManager;
Expand All @@ -81,9 +104,6 @@ public class LoggingHouseClientExtension implements ServiceExtension {
@Inject
private RemoteMessageDispatcherRegistry dispatcherRegistry;

@Inject
private Hostname hostname;

@Inject
private DataSourceRegistry dataSourceRegistry;
@Inject
Expand All @@ -98,20 +118,14 @@ public class LoggingHouseClientExtension implements ServiceExtension {
@Inject
private AssetIndex assetIndex;

private static final Map<String, String> CONTEXT_MAP = Map.of(
"cat", "http://w3id.org/mds/data-categories#",
"ids", "https://w3id.org/idsa/core/",
"idsc", "https://w3id.org/idsa/code/");

public Monitor monitor;
private boolean enabled;
private URL loggingHouseLogUrl;

private WorkersManager workersManager;
private LoggingHouseWorkersManager workersManager;

@Override
public String name() {
return LOGGINGHOUSE_CLIENT_EXTENSION;
return NAME;
}

@Override
Expand Down Expand Up @@ -168,7 +182,7 @@ private void runFlywayMigrations(ServiceExtensionContext context) {
private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) {
return new SqlLoggingHouseMessageStore(
dataSourceRegistry,
DataSourceRegistry.DEFAULT_DATASOURCE,
DatasourceProperties.LOGGING_HOUSE_DATASOURCE,
transactionContext,
typeManager.getMapper(),
new PostgresDialectStatements(),
Expand All @@ -186,9 +200,7 @@ private void registerEventSubscriber(ServiceExtensionContext context, LoggingHou
monitor);

eventRouter.registerSync(ContractNegotiationFinalized.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationAgreed.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationAccepted.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationTerminated.class, eventSubscriber); // TODO: check pid

eventRouter.registerSync(TransferProcessRequested.class, eventSubscriber);
eventRouter.registerSync(TransferProcessInitiated.class, eventSubscriber);
eventRouter.registerSync(TransferProcessStarted.class, eventSubscriber);
Expand Down Expand Up @@ -220,8 +232,13 @@ private void registerCommonTypes(TypeManager typeManager) {
monitor.debug("Registered serializers for LoggingHouseClientExtension");
}

private WorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) {
return new WorkersManager(this.monitor,
private LoggingHouseWorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) {
var periodSeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_DELAY, 30);
var initialDelaySeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD, 10);
var executor = new WorkersExecutor(Duration.ofSeconds(periodSeconds), Duration.ofSeconds(initialDelaySeconds), monitor);

return new LoggingHouseWorkersManager(executor,
monitor,
context.getSetting(LOGGINGHOUSE_EXTENSION_MAX_WORKERS, 1),
store,
dispatcherRegistry,
Expand Down Expand Up @@ -254,6 +271,8 @@ public void start() {
} else {
monitor.info("Starting Logginghouse client extension.");
}

workersManager.execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
*
*/

package com.truzzt.extension.logginghouse.client;
package com.truzzt.extension.logginghouse.client.events;

import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessageStatus;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
Expand Down Expand Up @@ -48,26 +49,6 @@ public LoggingHouseEventSubscriber(
this.monitor = monitor;
}

public void storeContractAgreement(ContractAgreement contractAgreement) {
monitor.info("Storing ContractAgreement to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventToLog(contractAgreement)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}

public void storeTransferProcess(TransferProcess transferProcess) {
monitor.info("Storing TransferProcess to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventToLog(transferProcess)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}

@Override
public <E extends Event> void on(EventEnvelope<E> event) {
if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) {
Expand All @@ -91,7 +72,7 @@ public <E extends Event> void on(EventEnvelope<E> event) {
}
}

private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException {
private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) {
var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId();
var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId);
return Objects.requireNonNull(contractNegotiation).getContractAgreement();
Expand All @@ -101,4 +82,36 @@ private TransferProcess resolveTransferProcess(TransferProcessEvent transferProc
var transferProcessId = transferProcessEvent.getTransferProcessId();
return transferProcessStore.findById(transferProcessId);
}

public void storeContractAgreement(ContractAgreement contractAgreement) {
monitor.info("Storing ContractAgreement to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventType(contractAgreement.getClass())
.eventId(contractAgreement.getId())
.eventToLog(contractAgreement)
.createProcess(true)
.processId(contractAgreement.getId())
.consumerId(contractAgreement.getConsumerId())
.providerId(contractAgreement.getProviderId())
.status(LoggingHouseMessageStatus.PENDING)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}

public void storeTransferProcess(TransferProcess transferProcess) {
monitor.info("Storing TransferProcess to send to LoggingHouse");

var message = LoggingHouseMessage.Builder.newInstance()
.eventType(transferProcess.getClass())
.eventId(transferProcess.getId())
.eventToLog(transferProcess)
.createProcess(false)
.processId(transferProcess.getContractId())
.status(LoggingHouseMessageStatus.PENDING)
.createdAt(ZonedDateTime.now())
.build();
loggingHouseMessageStore.save(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package com.truzzt.extension.logginghouse.client.messages;
package com.truzzt.extension.logginghouse.client.events.messages;

import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
*
*/

package com.truzzt.extension.logginghouse.client.messages;
package com.truzzt.extension.logginghouse.client.events.messages;

import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil;
import de.fraunhofer.iais.eis.DynamicAttributeToken;
import de.fraunhofer.iais.eis.Message;
import de.fraunhofer.iais.eis.MessageProcessedNotificationMessageImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*
*/

package com.truzzt.extension.logginghouse.client.messages;
package com.truzzt.extension.logginghouse.client.events.messages;

import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
*
*/

package com.truzzt.extension.logginghouse.client.messages;

import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil;
package com.truzzt.extension.logginghouse.client.events.messages;

import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil;
import de.fraunhofer.iais.eis.DynamicAttributeToken;
import de.fraunhofer.iais.eis.LogMessageBuilder;
import de.fraunhofer.iais.eis.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,26 @@

package com.truzzt.extension.logginghouse.client.flyway.connection;

import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.system.configuration.Config;

import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE;

public class DatasourceProperties {
private static final String LOGGING_HOUSE_DATASOURCE = "logginghouse";
public static final String LOGGING_HOUSE_DATASOURCE = "logginghouse";

private static final String DATASOURCE_SETTING_NAME = "edc.datasource.%s.name";
private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.%s.url";
private static final String DATASOURCE_SETTING_USER = "edc.datasource.%s.user";
private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.%s.password";
private static final String DATASOURCE_SETTING_NAME = "edc.datasource.logginghouse.name";
private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.logginghouse.url";
private static final String DATASOURCE_SETTING_USER = "edc.datasource.logginghouse.user";
private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.logginghouse.password";

private final String name;
private final String jdbcUrl;
private final String user;
private final String password;

public DatasourceProperties(Config config) {
name = getSetting(config, String.format(DATASOURCE_SETTING_NAME, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE));

jdbcUrl = getSetting(config, String.format(DATASOURCE_SETTING_JDBC_URL, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_JDBC_URL, DEFAULT_DATASOURCE));

user = getSetting(config, String.format(DATASOURCE_SETTING_USER, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_USER, DEFAULT_DATASOURCE));

password = getSetting(config, String.format(DATASOURCE_SETTING_PASSWORD, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_PASSWORD, DEFAULT_DATASOURCE));
}

private String getSetting(Config config, String setting, String defaultSetting) {
try {
return config.getString(setting);
} catch (EdcException e) {
return config.getString(defaultSetting);
}
name = config.getString(DATASOURCE_SETTING_NAME);
jdbcUrl = config.getString(DATASOURCE_SETTING_JDBC_URL);
user = config.getString(DATASOURCE_SETTING_USER);
password = config.getString(DATASOURCE_SETTING_PASSWORD);
}

public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package com.truzzt.extension.logginghouse.client.multipart;

import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender;

public class IdsMultipartClearingRemoteMessageDispatcher extends IdsMultipartRemoteMessageDispatcher {

Expand Down
Loading

0 comments on commit 817ef84

Please sign in to comment.