Skip to content

Commit

Permalink
MODSOURCE-752: updateParsedRecord & updateParsedRecords methods handled
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Aug 1, 2024
1 parent 57d354f commit 8208cde
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,19 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Updates {@link ParsedRecord} in the db
*
* @param record record dto from which {@link ParsedRecord} will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated ParsedRecord
*/
Future<ParsedRecord> updateParsedRecord(Record record, String tenantId);
Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders);

/**
* Update parsed records from collection of records and external relations ids in one transaction
*
* @param recordCollection collection of records from which parsed records will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with response containing list of successfully updated records and error messages for records that were not updated
*/
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, String tenantId);
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Searches for {@link Record} by id of external entity which was created from desired record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,21 +1077,26 @@ public Future<Integer> calculateGeneration(ReactiveClassicGenericQueryExecutor t
}

@Override
public Future<ParsedRecord> updateParsedRecord(Record record, String tenantId) {
LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
public Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(),
record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList(
updateExternalIdsForRecord(txQE, record),
ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record))
)).map(res -> record.getParsedRecord()));
)).onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(record, okapiHeaders))
.map(res -> record.getParsedRecord()));
}

@Override
public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, String tenantId) {
public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
logRecordCollection("updateParsedRecords:: Updating", recordCollection, tenantId);
Promise<ParsedRecordsBatchResponse> promise = Promise.promise();
Context context = Vertx.currentContext();
if(context == null) return Future.failedFuture("updateParsedRecords must be called by a vertx thread");

var recordsUpdated = new ArrayList<Record>();
context.owner().<ParsedRecordsBatchResponse>executeBlocking(blockingPromise ->
{
Set<String> recordTypes = new HashSet<>();
Expand All @@ -1105,7 +1110,7 @@ public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection r
Field<UUID> prtId = field(name(ID), UUID.class);
Field<JSONB> prtContent = field(name(CONTENT), JSONB.class);

List<ParsedRecord> parsedRecords = recordCollection.getRecords()
List<Record> processedRecords = recordCollection.getRecords()
.stream()
.map(this::validateParsedRecordId)
.peek(record -> {
Expand Down Expand Up @@ -1187,9 +1192,9 @@ public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection r
.setId(null);
}

}).map(Record::getParsedRecord)
.filter(parsedRecord -> Objects.nonNull(parsedRecord.getId()))
.collect(Collectors.toList());
})
.filter(processedRecord -> Objects.nonNull(processedRecord.getParsedRecord().getId()))
.toList();

try (Connection connection = getConnection(tenantId)) {
DSL.using(connection).transaction(ctx -> {
Expand All @@ -1210,21 +1215,21 @@ public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection r
int[] parsedRecordUpdateResults = dsl.batch(parsedRecordUpdates).execute();

// check parsed record update results
List<ParsedRecord> parsedRecordsUpdated = new ArrayList<>();
for (int i = 0; i < parsedRecordUpdateResults.length; i++) {
int result = parsedRecordUpdateResults[i];
ParsedRecord parsedRecord = parsedRecords.get(i);
var processedRecord = processedRecords.get(i);
if (result == 0) {
errorMessages.add(format("Parsed Record with id '%s' was not updated", parsedRecord.getId()));
errorMessages.add(format("Parsed Record with id '%s' was not updated",
processedRecord.getParsedRecord().getId()));
} else {
parsedRecordsUpdated.add(parsedRecord);
recordsUpdated.add(processedRecord);
}
}

blockingPromise.complete(new ParsedRecordsBatchResponse()
.withErrorMessages(errorMessages)
.withParsedRecords(parsedRecordsUpdated)
.withTotalRecords(parsedRecordsUpdated.size()));
.withParsedRecords(recordsUpdated.stream().map(Record::getParsedRecord).collect(Collectors.toList()))
.withTotalRecords(recordsUpdated.size()));
});
} catch (SQLException e) {
LOG.warn("updateParsedRecords:: Failed to update records", e);
Expand All @@ -1242,7 +1247,10 @@ public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection r
}
});

return promise.future();
return promise.future()
.onSuccess(response ->
recordsUpdated.forEach(updated -> recordDomainEventPublisher.publishRecordUpdated(updated, okapiHeaders))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.folio.rest.impl;

import static org.folio.okapi.common.XOkapiHeaders.TENANT;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -57,6 +59,7 @@ public void postSourceStorageBatchVerifiedRecords(List<String> marcBibIds, Map<S
public void postSourceStorageBatchRecords(RecordCollection entity, Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
okapiHeaders.put(TENANT, tenantId);
try {
MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders);
recordService.saveRecords(entity, okapiHeaders)
Expand All @@ -82,9 +85,10 @@ public void postSourceStorageBatchRecords(RecordCollection entity, Map<String, S
public void putSourceStorageBatchParsedRecords(RecordCollection entity, Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
okapiHeaders.put(TENANT, tenantId);
try {
MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders);
recordService.updateParsedRecords(entity, tenantId)
recordService.updateParsedRecords(entity, okapiHeaders)
.map(parsedRecordsBatchResponse -> {
if (!parsedRecordsBatchResponse.getParsedRecords().isEmpty()) {
return PutSourceStorageBatchParsedRecordsResponse.respond200WithApplicationJson(parsedRecordsBatchResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,19 @@ public interface RecordService {
* Updates {@link ParsedRecord} in the db
*
* @param record record dto from which {@link ParsedRecord} will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated ParsedRecord
*/
Future<ParsedRecord> updateParsedRecord(Record record, String tenantId);
Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders);

/**
* Update parsed records from collection of records and external relations ids in one transaction
*
* @param recordCollection collection of records from which parsed records will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with response containing list of successfully updated records and error messages for records that were not updated
*/
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, String tenantId);
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Fetch stripped parsed records by ids and filter marc fields by provided range of fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,18 @@ public Future<Optional<SourceRecord>> getSourceRecordById(String id, IdType idTy
}

@Override
public Future<ParsedRecord> updateParsedRecord(Record record, String tenantId) {
return recordDao.updateParsedRecord(record, tenantId);
public Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders) {
return recordDao.updateParsedRecord(record, okapiHeaders);
}

@Override
public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, String tenantId) {
public Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders) {
if (recordCollection.getRecords().isEmpty()) {
Promise<ParsedRecordsBatchResponse> promise = Promise.promise();
promise.complete(new ParsedRecordsBatchResponse().withTotalRecords(0));
return promise.future();
}
return recordDao.updateParsedRecords(recordCollection, tenantId);
return recordDao.updateParsedRecords(recordCollection, okapiHeaders);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private Future<Record> saveRecord(Record record, Map<String, String> okapiHeader
return recordService.getRecordById(record.getId(), tenantId)
.compose(r -> {
if (r.isPresent()) {
return recordService.updateParsedRecord(record, tenantId).map(record.withGeneration(r.get().getGeneration()));
return recordService.updateParsedRecord(record, okapiHeaders).map(record.withGeneration(r.get().getGeneration()));
} else {
record.getRawRecord().setId(record.getId());
return recordService.saveRecord(record, okapiHeaders).map(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1938,7 +1938,8 @@ private void updateParsedMarcRecords(TestContext context, Record.RecordType reco
List<ParsedRecord> expected = updated.stream()
.map(Record::getParsedRecord)
.collect(Collectors.toList());
recordService.updateParsedRecords(recordCollection, TENANT_ID).onComplete(update -> {
var okapiHeaders = Map.of(TENANT, TENANT_ID);
recordService.updateParsedRecords(recordCollection, okapiHeaders).onComplete(update -> {
if (update.failed()) {
context.fail(update.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void shouldProceedIfConsortiumTrackExists(TestContext context) {

doAnswer(invocationOnMock -> Future.succeededFuture(Optional.of(record))).when(mockedRecordService).getRecordById(anyString(), anyString());

doAnswer(invocationOnMock -> Future.succeededFuture(record.getParsedRecord())).when(mockedRecordService).updateParsedRecord(any(), anyString());
doAnswer(invocationOnMock -> Future.succeededFuture(record.getParsedRecord())).when(mockedRecordService).updateParsedRecord(any(), any());

doAnswer(invocationOnMock -> Future.succeededFuture(recordCollection)).when(mockedRecordService).getRecords(any(), any(), any(), anyInt(), anyInt(), anyString());

Expand Down Expand Up @@ -237,7 +237,7 @@ public void shouldProceedIfConsortiumTrackExists(TestContext context) {
if (e != null) {
context.fail(e);
}
verify(mockedRecordService, times(1)).updateParsedRecord(any(), anyString());
verify(mockedRecordService, times(1)).updateParsedRecord(any(), any());
context.assertNull(payload.getContext().get(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG));
context.assertEquals(expectedCentralTenantId, payload.getContext().get(CENTRAL_TENANT_ID));
async.complete();
Expand Down

0 comments on commit 8208cde

Please sign in to comment.