Skip to content

Commit

Permalink
MODSOURCE-752: fix kafka headers and event model
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Aug 6, 2024
1 parent 8208cde commit d11f6ac
Show file tree
Hide file tree
Showing 29 changed files with 432 additions and 576 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.debug("handle:: RecordCollection has been received with event: '{}', jobExecutionId '{}', chunkId: '{}', starting processing... chunkNumber '{}'-'{}'",
event.getEventType(), jobExecutionId, chunkId, chunkNumber, key);
setUserMetadata(recordCollection, userId);
return recordService.saveRecords(recordCollection, toOkapiHeaders(kafkaHeaders, null))
return recordService.saveRecords(recordCollection, toOkapiHeaders(kafkaHeaders))
.compose(recordsBatchResponse -> sendBackRecordsBatchResponse(recordsBatchResponse, kafkaHeaders, tenantId, chunkNumber, event.getEventType(), targetRecord));
} catch (Exception e) {
LOGGER.warn("handle:: RecordCollection processing has failed with errors jobExecutionId '{}', chunkId: '{}', chunkNumber '{}'-'{}'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static org.folio.dao.util.QMEventTypes.QM_ERROR;
import static org.folio.dao.util.QMEventTypes.QM_SRS_MARC_RECORD_UPDATED;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.createProducerRecord;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
Expand Down Expand Up @@ -65,12 +65,12 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
log.trace("handle:: Handling kafka consumerRecord {}", consumerRecord);

var kafkaHeaders = consumerRecord.headers();
var okapiHeaders = toOkapiHeaders(kafkaHeaders, null);
var okapiHeaders = toOkapiHeaders(kafkaHeaders);

return getEventPayload(consumerRecord)
.compose(eventPayload -> {
String snapshotId = eventPayload.getOrDefault(SNAPSHOT_ID_KEY, UUID.randomUUID().toString());
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
return getRecordDto(eventPayload)
.compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, okapiHeaders))
.compose(updatedRecord -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import static org.folio.dao.util.RecordDaoUtil.getExternalId;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING;
import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB;
import static org.folio.rest.jooq.Tables.RECORDS_LB;
import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB;
import static org.folio.rest.jooq.enums.RecordType.MARC_BIB;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.QueryParamUtil.toRecordType;
import static org.jooq.impl.DSL.condition;
import static org.jooq.impl.DSL.countDistinct;
Expand Down Expand Up @@ -733,7 +733,7 @@ public Future<Optional<Record>> getRecordByCondition(ReactiveClassicGenericQuery

@Override
public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders));
}
Expand All @@ -748,7 +748,7 @@ public Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Recor

@Override
public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
logRecordCollection("saveRecords:: Saving", recordCollection, tenantId);
Promise<RecordsBatchResponse> finalPromise = Promise.promise();
Context context = Vertx.currentContext();
Expand Down Expand Up @@ -966,7 +966,7 @@ public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollectio

@Override
public Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId())
.compose(optionalRecord -> optionalRecord
Expand Down Expand Up @@ -1078,7 +1078,7 @@ public Future<Integer> calculateGeneration(ReactiveClassicGenericQueryExecutor t

@Override
public Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(),
record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList(
Expand All @@ -1090,7 +1090,7 @@ public Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String

@Override
public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
logRecordCollection("updateParsedRecords:: Updating", recordCollection, tenantId);
Promise<ParsedRecordsBatchResponse> promise = Promise.promise();
Context context = Vertx.currentContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package org.folio.rest.impl;

import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.Map;

import javax.ws.rs.core.Response;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dataimport.util.ExceptionHelper;
import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest;
import org.folio.rest.jaxrs.model.RecordCollection;
Expand All @@ -17,14 +22,6 @@
import org.folio.spring.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SourceStorageBatchImpl implements SourceStorageBatch {

private static final Logger LOG = LogManager.getLogger();
Expand Down Expand Up @@ -59,7 +56,7 @@ public void postSourceStorageBatchVerifiedRecords(List<String> marcBibIds, Map<S
public void postSourceStorageBatchRecords(RecordCollection entity, Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
okapiHeaders.put(TENANT, tenantId);
okapiHeaders.put(OKAPI_TENANT_HEADER, tenantId);
try {
MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders);
recordService.saveRecords(entity, okapiHeaders)
Expand All @@ -85,7 +82,7 @@ public void postSourceStorageBatchRecords(RecordCollection entity, Map<String, S
public void putSourceStorageBatchParsedRecords(RecordCollection entity, Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
okapiHeaders.put(TENANT, tenantId);
okapiHeaders.put(OKAPI_TENANT_HEADER, tenantId);
try {
MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders);
recordService.updateParsedRecords(entity, okapiHeaders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
import static org.folio.dao.util.RecordDaoUtil.getExternalIdsConditionWithQualifier;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.QueryParamUtil.toRecordType;
import static org.folio.services.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.services.util.AdditionalFieldsUtil.addFieldToMarcRecord;
import static org.folio.services.util.AdditionalFieldsUtil.getFieldFromMarcRecord;

import io.reactivex.Flowable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgException;
import io.vertx.sqlclient.Row;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -32,57 +40,48 @@
import java.util.stream.Collectors;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException;

import io.reactivex.Flowable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgException;
import io.vertx.sqlclient.Row;
import net.sf.jsqlparser.JSQLParserException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.RecordDao;
import org.folio.dao.util.IdType;
import org.folio.dao.util.ParsedRecordDaoUtil;
import org.folio.dao.util.MatchField;
import org.folio.dao.util.ParsedRecordDaoUtil;
import org.folio.dao.util.RecordDaoUtil;
import org.folio.dao.util.RecordType;
import org.folio.dao.util.SnapshotDaoUtil;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.value.ListValue;
import org.folio.rest.jaxrs.model.Filter;
import org.folio.rest.jaxrs.model.RecordIdentifiersDto;
import org.folio.rest.jaxrs.model.RecordMatchingDto;
import org.folio.rest.jaxrs.model.RecordsIdentifiersCollection;
import org.folio.services.exceptions.DuplicateRecordException;
import org.folio.services.util.AdditionalFieldsUtil;
import org.folio.services.util.TypeConnection;
import org.jooq.Condition;
import org.jooq.OrderField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.folio.dao.RecordDao;
import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest;
import org.folio.rest.jaxrs.model.FieldRange;
import org.folio.rest.jaxrs.model.Filter;
import org.folio.rest.jaxrs.model.MarcBibCollection;
import org.folio.rest.jaxrs.model.ParsedRecord;
import org.folio.rest.jaxrs.model.ParsedRecordDto;
import org.folio.rest.jaxrs.model.ParsedRecordsBatchResponse;
import org.folio.rest.jaxrs.model.RawRecord;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.RecordCollection;
import org.folio.rest.jaxrs.model.RecordIdentifiersDto;
import org.folio.rest.jaxrs.model.RecordMatchingDto;
import org.folio.rest.jaxrs.model.RecordsBatchResponse;
import org.folio.rest.jaxrs.model.RecordsIdentifiersCollection;
import org.folio.rest.jaxrs.model.Snapshot;
import org.folio.rest.jaxrs.model.SourceRecord;
import org.folio.rest.jaxrs.model.SourceRecordCollection;
import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.exceptions.DuplicateRecordException;
import org.folio.services.util.AdditionalFieldsUtil;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
import org.folio.services.util.parser.SearchExpressionParser;
import org.jooq.Condition;
import org.jooq.OrderField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RecordServiceImpl implements RecordService {
Expand Down Expand Up @@ -125,7 +124,7 @@ public Future<Optional<Record>> getRecordById(String id, String tenantId) {

@Override
public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
LOG.debug("saveRecord:: Saving record with id: {} for tenant: {}", record.getId(), tenantId);
ensureRecordHasId(record);
ensureRecordHasSuppressDiscovery(record);
Expand Down Expand Up @@ -168,7 +167,7 @@ public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollectio
}
List<Future> setMatchedIdsFutures = new ArrayList<>();
recordCollection.getRecords().forEach(record -> setMatchedIdsFutures.add(setMatchedIdForRecord(record,
okapiHeaders.get(TENANT))));
okapiHeaders.get(OKAPI_TENANT_HEADER))));
return GenericCompositeFuture.all(setMatchedIdsFutures)
.compose(ar -> ar.succeeded() ?
recordDao.saveRecords(recordCollection, okapiHeaders)
Expand All @@ -189,7 +188,7 @@ public Future<Record> updateRecordGeneration(String matchedId, Record record, Ma
}
record.setId(UUID.randomUUID().toString());

return recordDao.getRecordByMatchedId(matchedId, okapiHeaders.get(TENANT))
return recordDao.getRecordByMatchedId(matchedId, okapiHeaders.get(OKAPI_TENANT_HEADER))
.map(r -> r.orElseThrow(() -> new NotFoundException(format(RECORD_WITH_GIVEN_MATCHED_ID_NOT_FOUND, matchedId))))
.compose(v -> saveRecord(record, okapiHeaders))
.recover(throwable -> {
Expand Down Expand Up @@ -316,7 +315,7 @@ public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String
.withAdditionalInfo(parsedRecordDto.getAdditionalInfo())
.withMetadata(parsedRecordDto.getMetadata()), existingRecord.withState(Record.State.OLD), okapiHeaders)))
.orElse(Future.failedFuture(new NotFoundException(
format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), okapiHeaders.get(TENANT));
format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), okapiHeaders.get(OKAPI_TENANT_HEADER));
}

@Override
Expand Down Expand Up @@ -346,7 +345,7 @@ public Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(RecordM

@Override
public Future<Void> deleteRecordById(String id, IdType idType, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
return recordDao.getRecordByExternalId(id, idType, tenantId)
.map(recordOptional -> recordOptional.orElseThrow(() -> new NotFoundException(format(NOT_FOUND_MESSAGE, Record.class.getSimpleName(), id))))
.map(record -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package org.folio.services.domainevent;

import static java.util.Objects.isNull;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
import static org.folio.okapi.common.XOkapiHeaders.URL;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_CREATED;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_UPDATED;

import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.services.kafka.KafkaSender;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
import org.folio.services.kafka.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
Expand All @@ -40,15 +37,15 @@ public void publishRecordUpdated(Record updated, Map<String, String> okapiHeader
publishRecord(updated, okapiHeaders, SOURCE_RECORD_UPDATED);
}

private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, EventType eventType) {
private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, SourceRecordDomainEventType eventType) {
if (!domainEventsEnabled || notValidForPublishing(aRecord)) {
return;
}
try {
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
kafkaSender.sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType), eventType.value(),
kafkaHeaders, key);
kafkaSender.sendEventToKafka(okapiHeaders.get(OKAPI_TENANT_HEADER), aRecord.getRawRecord().getContent(),
eventType.name(), kafkaHeaders, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
}
Expand All @@ -73,19 +70,11 @@ private boolean notValidForPublishing(Record aRecord) {

private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, Record.RecordType recordType) {
return List.of(
KafkaHeader.header(URL, okapiHeaders.get(URL)),
KafkaHeader.header(TENANT, okapiHeaders.get(TENANT)),
KafkaHeader.header(TOKEN, okapiHeaders.get(TOKEN)),
KafkaHeader.header(OKAPI_URL_HEADER, okapiHeaders.get(OKAPI_URL_HEADER)),
KafkaHeader.header(OKAPI_TENANT_HEADER, okapiHeaders.get(OKAPI_TENANT_HEADER)),
KafkaHeader.header(OKAPI_TOKEN_HEADER, okapiHeaders.get(OKAPI_TOKEN_HEADER)),
KafkaHeader.header(RECORD_TYPE, recordType.value())
);
}

// Builds a SourceRecordDomainEvent for the given record and serializes it to a
// JSON string for publishing. The event id mirrors the record id, and the event
// payload carries the raw record content verbatim (presumably the raw MARC
// source — confirm against RawRecord's contract).
private String getEvent(Record eventRecord, EventType type) {
var event = new SourceRecordDomainEvent()
.withId(eventRecord.getId())
.withEventType(type)
.withEventPayload(eventRecord.getRawRecord().getContent());
// Json.encode produces the wire representation sent to Kafka by the caller.
return Json.encode(event);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.folio.services.domainevent;

/**
 * Types of domain events published when source records change.
 */
public enum SourceRecordDomainEventType {

  /** Emitted after a new source record has been created. */
  SOURCE_RECORD_CREATED,

  /** Emitted after an existing source record has been updated. */
  SOURCE_RECORD_UPDATED
}
Loading

0 comments on commit d11f6ac

Please sign in to comment.