From 86df3a0ea8760094e09d9fdae07674b4b566c3f8 Mon Sep 17 00:00:00 2001
From: Azizbek Khushvakov
Date: Tue, 21 May 2024 18:23:57 +0500
Subject: [PATCH 1/2] Updated acq-models to include fix for isBindaryActive field

---
 ramls/acq-models | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ramls/acq-models b/ramls/acq-models
index 1e3e96d6..8510ad54 160000
--- a/ramls/acq-models
+++ b/ramls/acq-models
@@ -1 +1 @@
-Subproject commit 1e3e96d69134cbcc7418acbba8bb49c06b52e152
+Subproject commit 8510ad54577cd44c29b1bcf0f598ff6fa497c78a

From 554ed5c8fd0fe80dad5acb3ff296105b618786d1 Mon Sep 17 00:00:00 2001
From: Serhii_Nosko
Date: Tue, 21 May 2024 19:00:35 +0300
Subject: [PATCH 2/2] MODINVOSTO-172. Make Kafka tests more stable

---
 .../handlers/actions/CreateInvoiceEventHandlerTest.java | 120 +++++++++---------
 1 file changed, 61 insertions(+), 59 deletions(-)

diff --git a/src/test/java/org/folio/dataimport/handlers/actions/CreateInvoiceEventHandlerTest.java b/src/test/java/org/folio/dataimport/handlers/actions/CreateInvoiceEventHandlerTest.java
index 896bb3ca..0c7d8f36 100644
--- a/src/test/java/org/folio/dataimport/handlers/actions/CreateInvoiceEventHandlerTest.java
+++ b/src/test/java/org/folio/dataimport/handlers/actions/CreateInvoiceEventHandlerTest.java
@@ -59,8 +59,9 @@
 import lombok.SneakyThrows;
 import net.mguenther.kafka.junit.KeyValue;
 import net.mguenther.kafka.junit.ObserveKeyValues;
+import net.mguenther.kafka.junit.ReadKeyValues;
 import net.mguenther.kafka.junit.SendKeyValues;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.commons.collections4.CollectionUtils;
 import org.folio.ActionProfile;
 import org.folio.ApiTestSuite;
 import org.folio.DataImportEventPayload;
@@ -276,10 +277,12 @@ public void shouldCreateInvoiceAndPublishDiCompletedEvent() throws InterruptedEx
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -301,13 +304,9 @@ public void shouldCreateInvoiceAndPublishDiCompletedEvent() throws InterruptedEx
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<KeyValue<String, String>> observedRecords = kafkaCluster.observe(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(40, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
-    assertEquals(record.getId(), new String(observedRecords.get(0).getHeaders().lastHeader(RECORD_ID_HEADER).value(), UTF_8));
-    Event obtainedEvent = Json.decodeValue(observedRecords.get(0).getValue(), Event.class);
+    Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
     assertEquals(DI_INVOICE_CREATED.value(),
       eventPayload.getEventsChain().get(eventPayload.getEventsChain().size() - 1));
@@ -343,10 +342,12 @@ public void shouldNotProcessEventWhenRecordToInvoiceFutureFails()
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -368,10 +369,7 @@ public void shouldNotProcessEventWhenRecordToInvoiceFutureFails()
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_ERROR.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event publishedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(publishedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -388,12 +386,14 @@ public void shouldCreateInvoiceLinesWithCorrectOrderFromEdifactFile() throws IOE
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(parsedEdifact)).withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithPoLineSyntax);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
     payloadContext.put(DATA_IMPORT_PAYLOAD_OKAPI_PERMISSIONS, Json.encode(Collections.singletonList(AcqDesiredPermissions.ASSIGN.getPermission())));
     payloadContext.put(DATA_IMPORT_PAYLOAD_OKAPI_USER_ID, USER_ID);
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -414,10 +414,7 @@ public void shouldCreateInvoiceLinesWithCorrectOrderFromEdifactFile() throws IOE
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -449,12 +446,14 @@ public void shouldMatchPoLinesByPoLineNumberAndCreateInvoiceLinesWithDescription
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithPoLineSyntax);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
     payloadContext.put(DATA_IMPORT_PAYLOAD_OKAPI_PERMISSIONS, Json.encode(Collections.singletonList(AcqDesiredPermissions.ASSIGN.getPermission())));
     payloadContext.put(DATA_IMPORT_PAYLOAD_OKAPI_USER_ID, USER_ID);
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -475,10 +474,7 @@ public void shouldMatchPoLinesByPoLineNumberAndCreateInvoiceLinesWithDescription
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -515,11 +511,13 @@ public void shouldMatchPoLinesByRefNumberAndCreateInvoiceLinesWithDescriptionFro
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithPoLineSyntax);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -540,10 +538,7 @@ public void shouldMatchPoLinesByRefNumberAndCreateInvoiceLinesWithDescriptionFro
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -588,11 +583,13 @@ public void shouldMatchPoLinesByPoLineNumberAndCreateInvoiceLinesWithPoLinesFund
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithPoLineFundDistribution);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -613,10 +610,7 @@ public void shouldMatchPoLinesByPoLineNumberAndCreateInvoiceLinesWithPoLinesFund
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -653,11 +647,13 @@ public void shouldNotLinkInvoiceLinesToPoLinesWhenMultiplePoLinesAreMatchedByRef
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithPoLineSyntax);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -678,10 +674,7 @@ public void shouldNotLinkInvoiceLinesToPoLinesWhenMultiplePoLinesAreMatchedByRef
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -713,11 +706,13 @@ public void shouldMatchPoLineByPoLineNumberAndLeaveEmptyInvoiceLineFundDistribut
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithMixedFundDistributionMapping);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -738,10 +733,7 @@ public void shouldMatchPoLineByPoLineNumberAndLeaveEmptyInvoiceLineFundDistribut
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_COMPLETED.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -774,15 +766,17 @@ public void shouldPublishDiErrorEventWhenHasNoSourceRecord() throws InterruptedE
     // given
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
+    HashMap<String, String> payloadContext = new HashMap<>();
+    payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
       .withTenant(TENANT_ID)
       .withOkapiUrl(OKAPI_URL)
       .withToken(TOKEN)
-      .withContext(new HashMap<>() {{
-        put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
-      }});
+      .withContext(payloadContext);
 
     String topic = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), TENANT_ID, dataImportEventPayload.getEventType());
     Event event = new Event().withEventPayload(Json.encode(dataImportEventPayload));
@@ -796,10 +790,7 @@ public void shouldPublishDiErrorEventWhenHasNoSourceRecord() throws InterruptedE
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), TENANT_ID, DI_ERROR.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event publishedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(publishedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -812,11 +803,13 @@ public void shouldPublishDiErrorEventWhenPostInvoiceToStorageFailed() throws Int
     // given
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
    Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -837,10 +830,7 @@ public void shouldPublishDiErrorEventWhenPostInvoiceToStorageFailed() throws Int
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), ERROR_TENANT, DI_ERROR.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event publishedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(publishedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -864,11 +854,13 @@ public void shouldNotPublishDiErrorEventWhenDuplicateException() throws Interrup
     // given
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
     Record record = new Record().withParsedRecord(new ParsedRecord().withContent(edifactParsedContent)).withId(RECORD_ID);
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -889,10 +881,7 @@ public void shouldNotPublishDiErrorEventWhenDuplicateException() throws Interrup
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DUPLICATE_ERROR_TENANT, DI_ERROR.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 0)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 0);
 
     assertEquals(0, observedValues.size());
   }
@@ -904,10 +893,12 @@ public void shouldPublishDiErrorWithInvoiceLineErrorWhenOneOfInvoiceLinesCreatio
       .withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfile);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -929,10 +920,7 @@ public void shouldPublishDiErrorWithInvoiceLineErrorWhenOneOfInvoiceLinesCreatio
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_ERROR.value());
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(30, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -962,10 +950,12 @@ public void shouldPublishDiErrorWhenMappingProfileHasInvalidMappingSyntax() thro
       .withId(RECORD_ID);
     ProfileSnapshotWrapper profileSnapshotWrapper = buildProfileSnapshotWrapper(jobProfile, actionProfile, mappingProfileWithInvalidMappingSyntax);
     addMockEntry(JOB_PROFILE_SNAPSHOTS_MOCK, profileSnapshotWrapper);
+    String testId = UUID.randomUUID().toString();
 
     HashMap<String, String> payloadContext = new HashMap<>();
     payloadContext.put(EDIFACT_INVOICE.value(), Json.encode(record));
     payloadContext.put(JOB_PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId());
+    payloadContext.put("testId", testId);
 
     DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
       .withEventType(DI_INCOMING_EDIFACT_RECORD_PARSED.value())
@@ -986,11 +976,7 @@ public void shouldPublishDiErrorWhenMappingProfileHasInvalidMappingSyntax() thro
     // then
     String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), DI_POST_INVOICE_LINES_SUCCESS_TENANT, DI_ERROR.value());
-
-    List<String> observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1)
-      .with(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
-      .observeFor(60, TimeUnit.SECONDS)
-      .build());
+    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);
 
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
@@ -1058,4 +1044,20 @@ private ProfileSnapshotWrapper buildProfileSnapshotWrapper(JobProfile jobProfile
           .withContent(JsonObject.mapFrom(mappingProfile).getMap())))));
   }
 
+  public List<String> observeValuesAndFilterByTestId(String testId, String topicToObserve, Integer countToObserve) throws InterruptedException {
+    List<String> result = new ArrayList<>();
+    List<String> observedValues = kafkaCluster.readValues(ReadKeyValues.from(topicToObserve).build());
+    if (CollectionUtils.isEmpty(observedValues)) {
+      observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, countToObserve)
+        .observeFor(30, TimeUnit.SECONDS)
+        .build());
+    }
+    for (String observedValue : observedValues) {
+      if (observedValue.contains(testId)) {
+        result.add(observedValue);
+      }
+    }
+    return result;
+  }
+
 }
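
Why this stabilizes the tests: each test now generates a unique testId, puts it into the event payload context before publishing, and asserts only on Kafka messages that contain that id. The helper first drains whatever is already on the topic with readValues, and only falls back to the blocking observeValues (capped at 30 seconds) when nothing has arrived yet. Because assertions filter by testId rather than relying on the shared GROUP_ID consumer group that the removed code used, leftover messages from other tests on the same embedded cluster can no longer satisfy or starve an expectation.

A minimal usage sketch for a test adopting the pattern; the expected count and assertion targets below are illustrative rather than taken from the patch, and it assumes the payload context (and thus the serialized event value) carries the testId, as the tests above arrange:

    String testId = UUID.randomUUID().toString();
    payloadContext.put("testId", testId); // tag the payload so resulting events are attributable to this test
    // ... build and send the DI_INCOMING_EDIFACT_RECORD_PARSED event as the tests above do ...

    String topicToObserve = KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(),
      TENANT_ID, DI_COMPLETED.value());
    List<String> observedValues = observeValuesAndFilterByTestId(testId, topicToObserve, 1);

    assertEquals(1, observedValues.size()); // only this test's messages survive the filter
    Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);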