Skip to content

Commit

Permalink
feat: logging house sender worker
Browse files Browse the repository at this point in the history
  • Loading branch information
jannotti-glaucio committed Jul 1, 2024
1 parent cb9127c commit 2e3489e
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.truzzt.extension.logginghouse.client;

public class ConfigConstants {

static final String LOGGINGHOUSE_ENABLED = "edc.logginghouse.client.enabled";

static final String LOGGINGHOUSE_SERVER_URL_SETTING = "edc.logginghouse.client.server.url";

static final String LOGGINGHOUSE_FLYWAY_REPAIR_SETTING = "edc.logginghouse.client.flyway.repair";

static final String LOGGINGHOUSE_FLYWAY_CLEAN_SETTING = "edc.logginghouse.client.flyway.clean";

static final String LOGGINGHOUSE_EXTENSION_MAX_WORKERS = "edc.logginghouse.client.workers.max";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,26 @@
import com.truzzt.extension.logginghouse.client.messages.LogMessageSender;
import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer;
import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements;
import com.truzzt.extension.logginghouse.client.worker.WorkersManager;
import de.fraunhofer.iais.eis.LogMessage;
import de.fraunhofer.iais.eis.RequestMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationTerminated;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessFailed;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;

import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.event.EventRouter;
Expand All @@ -59,6 +61,12 @@
import java.net.URL;
import java.util.Map;

import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_ENABLED;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_SERVER_URL_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_REPAIR_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_CLEAN_SETTING;
import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_MAX_WORKERS;

public class LoggingHouseClientExtension implements ServiceExtension {

public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension";
Expand Down Expand Up @@ -94,21 +102,12 @@ public class LoggingHouseClientExtension implements ServiceExtension {
"cat", "http://w3id.org/mds/data-categories#",
"ids", "https://w3id.org/idsa/core/",
"idsc", "https://w3id.org/idsa/code/");
@Setting
public static final String LOGGINGHOUSE_LOG_URL_SETTING = "edc.logginghouse.extension.url";

@Setting
public static final String LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED = "edc.logginghouse.extension.enabled";

@Setting
private static final String EDC_DATASOURCE_REPAIR_SETTING = "edc.flyway.repair";

@Setting
private static final String FLYWAY_CLEAN = "edc.flyway.clean";

private URL loggingHouseLogUrl;
public Monitor monitor;
private boolean enabled;
private URL loggingHouseLogUrl;

private WorkersManager workersManager;

@Override
public String name() {
Expand All @@ -117,50 +116,50 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
this.monitor = context.getMonitor();
monitor = context.getMonitor();

var extensionEnabled = context.getSetting(LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED, true);
var extensionEnabled = context.getSetting(LOGGINGHOUSE_ENABLED, true);
if (!extensionEnabled) {
this.enabled = false;
this.monitor.info("Logginghouse client extension is disabled.");
enabled = false;
monitor.info("Logginghouse client extension is disabled.");
return;
}
this.enabled = true;
this.monitor.info("Logginghouse client extension is enabled.");
enabled = true;
monitor.info("Logginghouse client extension is enabled.");

this.loggingHouseLogUrl = readUrlFromSettings(context);
loggingHouseLogUrl = readUrlFromSettings(context);

runFlywayMigrations(context);

registerSerializerClearingHouseMessages(context);
registerClearingHouseMessageSenders(context);

var loggingHouseMessageStore = initializeLoggingHouseMessageStore(typeManager);
var store = initializeLoggingHouseMessageStore(typeManager);
registerEventSubscriber(context, store);

registerEventSubscriber(context, loggingHouseMessageStore);
registerDispatcher(context);
workersManager = initializeWorkersManager(context, store);
}

private URL readUrlFromSettings(ServiceExtensionContext context) {
try {
var urlString = context.getSetting(LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING, null);
var urlString = context.getSetting(LOGGINGHOUSE_SERVER_URL_SETTING, null);
if (urlString == null) {
throw new EdcException(String.format("Could not initialize " +
"LoggingHouseClientExtension: " +
"No url specified using setting %s", LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING));
throw new EdcException(String.format("Could not initialize LoggingHouseClientExtension: " +
"No url specified using setting %s", LOGGINGHOUSE_SERVER_URL_SETTING));
}

return new URL(urlString);
} catch (MalformedURLException e) {
throw new EdcException(String.format("Could not parse setting %s to Url",
LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING), e);
LOGGINGHOUSE_SERVER_URL_SETTING), e);
}
}

private void runFlywayMigrations(ServiceExtensionContext context) {
var flywayService = new FlywayService(
context.getMonitor(),
context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false),
context.getSetting(FLYWAY_CLEAN, false)
context.getSetting(LOGGINGHOUSE_FLYWAY_REPAIR_SETTING, false),
context.getSetting(LOGGINGHOUSE_FLYWAY_CLEAN_SETTING, false)
);
var migrationManager = new DatabaseMigrationManager(context.getConfig(), context.getMonitor(), flywayService);
migrationManager.migrate();
Expand All @@ -177,7 +176,7 @@ private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManag
);
}

private void registerEventSubscriber(ServiceExtensionContext context, SqlLoggingHouseMessageStore loggingHouseMessageStore) {
private void registerEventSubscriber(ServiceExtensionContext context, LoggingHouseMessageStore loggingHouseMessageStore) {
monitor.debug("Registering event subscriber for LoggingHouseClientExtension");

var eventSubscriber = new LoggingHouseEventSubscriber(
Expand Down Expand Up @@ -221,8 +220,18 @@ private void registerCommonTypes(TypeManager typeManager) {
monitor.debug("Registered serializers for LoggingHouseClientExtension");
}

private void registerClearingHouseMessageSenders(ServiceExtensionContext context) {
monitor.debug("Registering message senders for LoggingHouseClientExtension");
private WorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) {
return new WorkersManager(this.monitor,
context.getSetting(LOGGINGHOUSE_EXTENSION_MAX_WORKERS, 1),
store,
dispatcherRegistry,
hostname,
loggingHouseLogUrl
);
}

private void registerDispatcher(ServiceExtensionContext context) {
monitor.debug("Registering IDS dispatch sender for LoggingHouseClientExtension");

var httpClient = context.getService(EdcHttpClient.class);
var objectMapper = typeManager.getMapper(TYPE_MANAGER_SERIALIZER_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;

import java.util.List;

public interface LoggingHouseMessageStore {

void save(LoggingHouseMessage message);

List<LoggingHouseMessage> listNotSent();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,36 @@

public class LoggingHouseMessage {
private Long id;
private String eventType;
private String eventId;
private Object eventToLog;
private String processId;
private String consumerId;
private String providerId;
private ZonedDateTime createdAt;
private ZonedDateTime sentAt;

public Long getId() {
return id;
}
public String getEventType() {
return eventType;
}
public String getEventId() {
return eventId;
}
public Object getEventToLog() {
return eventToLog;
}
public String getProcessId() {
return processId;
}
public String getConsumerId() {
return consumerId;
}
public String getProviderId() {
return providerId;
}
public ZonedDateTime getCreatedAt() {
return createdAt;
}
Expand All @@ -50,10 +70,30 @@ public LoggingHouseMessage.Builder id(Long id) {
this.event.id = id;
return this;
}
public LoggingHouseMessage.Builder eventType(String eventType) {
this.event.eventType = eventType;
return this;
}
public LoggingHouseMessage.Builder eventId(String eventId) {
this.event.eventId = eventId;
return this;
}
public LoggingHouseMessage.Builder eventToLog(Object eventToLog) {
this.event.eventToLog = eventToLog;
return this;
}
public LoggingHouseMessage.Builder processId(String processId) {
this.event.processId = processId;
return this;
}
public LoggingHouseMessage.Builder consumerId(String consumerId) {
this.event.consumerId = consumerId;
return this;
}
public LoggingHouseMessage.Builder providerId(String providerId) {
this.event.providerId = providerId;
return this;
}
public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) {
this.event.createdAt = createdAt;
return this;
Expand All @@ -64,7 +104,12 @@ public LoggingHouseMessage.Builder sentAt(ZonedDateTime sentAt) {
}

public LoggingHouseMessage build() {
Objects.requireNonNull(this.event.eventType, "Message eventType must not be null");
Objects.requireNonNull(this.event.eventId, "Message eventId must not be null");
Objects.requireNonNull(this.event.eventToLog, "Message eventToLog must not be null");
Objects.requireNonNull(this.event.processId, "Message processId must not be null");
Objects.requireNonNull(this.event.consumerId, "Message consumerId must not be null");
Objects.requireNonNull(this.event.providerId, "Message providerId must not be null");
Objects.requireNonNull(this.event.createdAt, "Message createdAt must not be null");
return this.event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class SqlLoggingHouseMessageStore extends AbstractSqlStore implements LoggingHouseMessageStore {
Expand Down Expand Up @@ -62,6 +64,11 @@ public void save(LoggingHouseMessage event) {
});
}

@Override
public List<LoggingHouseMessage> listNotSent() {
return new ArrayList<LoggingHouseMessage>();
}

private Long mapFromZonedDateTime(ZonedDateTime zonedDateTime) {
return zonedDateTime != null ? zonedDateTime.toEpochSecond() : null;
}
Expand Down
Loading

0 comments on commit 2e3489e

Please sign in to comment.