Skip to content

Commit 606c8f0

Browse files
authored
Udpate kafka tests (eclipse-basyx#700)
* Udpate kafka tests * Fix compilation error * Fix missing get Type * Trigger rebuilt * Try to handle all events directly * Fix AfterClass was not static * Trigger test * Fix wrong assert * Trigger build * trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Trigger build * Remove empty await no events at test end * Fix wrong event type checked
1 parent 4ee04e8 commit 606c8f0

File tree

10 files changed

+144
-47
lines changed

10 files changed

+144
-47
lines changed

basyx.aasenvironment/basyx.aasenvironment.component/src/test/java/org/eclipse/digitaltwin/basyx/aasenvironment/component/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
******************************************************************************/
2626
package org.eclipse.digitaltwin.basyx.aasenvironment.component;
2727

28-
import java.util.concurrent.TimeUnit;
29-
3028
import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonSerializer;
3129
import org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell;
3230
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
@@ -35,12 +33,14 @@
3533
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.KafkaAasRepositoryFeature;
3634
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.TestShells;
3735
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
36+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEventType;
3837
import org.eclipse.digitaltwin.basyx.core.pagination.PaginationInfo;
3938
import org.eclipse.digitaltwin.basyx.submodelrepository.SubmodelRepository;
4039
import org.eclipse.digitaltwin.basyx.submodelrepository.feature.kafka.KafkaSubmodelRepositoryFeature;
4140
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.SubmodelEventKafkaListener;
4241
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.TestSubmodels;
4342
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
43+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEventType;
4444
import org.junit.After;
4545
import org.junit.Assert;
4646
import org.junit.Before;
@@ -145,11 +145,14 @@ public void testFeatureIsEnabled() {
145145
public void cleanup() throws InterruptedException {
146146
for (AssetAdministrationShell aas : aasRepo.getAllAas(new PaginationInfo(null, null)).getResult()) {
147147
aasRepo.deleteAas(aas.getId());
148+
AasEvent aasEvt = aasEventListener.next();
149+
Assert.assertEquals(AasEventType.AAS_DELETED, aasEvt.getType());
148150
}
149151

150152
for (Submodel sm : smRepo.getAllSubmodels(new PaginationInfo(null, null)).getResult()) {
151153
smRepo.deleteSubmodel(sm.getId());
154+
SubmodelEvent smEvt = submodelEventListener.next();
155+
Assert.assertEquals(SubmodelEventType.SM_DELETED, smEvt.getType());
152156
}
153-
while(submodelEventListener.next(300, TimeUnit.MICROSECONDS) != null);
154157
}
155158
}

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/AasEventKafkaListener.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
******************************************************************************/
2626
package org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka;
2727

28-
import java.util.List;
2928
import java.util.Map;
3029
import java.util.concurrent.CountDownLatch;
3130
import java.util.concurrent.LinkedBlockingDeque;
@@ -42,11 +41,9 @@
4241
/**
4342
* @author geso02 (Sonnenberg DFKI GmbH)
4443
*/
45-
@KafkaListener(topics = AasEventKafkaListener.TOPIC_NAME, batch = "false", groupId = "kafka-test-aas", autoStartup = "true")
44+
@KafkaListener(topics = TestApplication.KAFKA_AAS_TOPIC, batch = "false", groupId = TestApplication.KAFKA_GROUP_ID, autoStartup = "true")
4645
@Component
4746
public class AasEventKafkaListener implements ConsumerSeekAware {
48-
49-
public static final String TOPIC_NAME = "aas-events";
5047

5148
private final LinkedBlockingDeque<AasEvent> evt = new LinkedBlockingDeque<AasEvent>();
5249
private final JsonDeserializer deserializer;
@@ -71,20 +68,20 @@ public AasEvent next(int value, TimeUnit unit) throws InterruptedException {
7168
}
7269

7370
public AasEvent next() throws InterruptedException {
74-
return next(1, TimeUnit.MINUTES);
71+
return next(5, TimeUnit.MINUTES);
7572
}
7673

7774
@Override
7875
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
7976
for (TopicPartition eachPartition : assignments.keySet()) {
80-
if (TOPIC_NAME.equals(eachPartition.topic())) {
77+
if (TestApplication.KAFKA_AAS_TOPIC.equals(eachPartition.topic())) {
8178
latch.countDown();
8279
}
8380
}
8481
}
8582

8683
public void awaitTopicAssignment() throws InterruptedException {
87-
if (!latch.await(1, TimeUnit.MINUTES)) {
84+
if (!latch.await(5, TimeUnit.MINUTES)) {
8885
throw new RuntimeException("Timeout occured while waiting for partition assignment. Is kafka running?");
8986
}
9087
}

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@
7171
@RunWith(SpringRunner.class)
7272
@TestPropertySource(properties = { KafkaAasRepositoryFeature.FEATURENAME + ".enabled=true",
7373
"spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092",
74-
KafkaAasRepositoryFeature.FEATURENAME + ".topic.name=aas-events"
75-
74+
KafkaAasRepositoryFeature.FEATURENAME + ".topic.name="+TestApplication.KAFKA_AAS_TOPIC
7675
})
7776
public class KafkaEventsInMemoryStorageIntegrationTest {
7877

@@ -242,8 +241,13 @@ public void testFeatureIsEnabled() {
242241
public void cleanup() throws InterruptedException {
243242
for (AssetAdministrationShell aas : repo.getAllAas(new PaginationInfo(null, null)).getResult()) {
244243
repo.deleteAas(aas.getId());
244+
AasEvent deletedEvt = listener.next();
245+
Assert.assertEquals(AasEventType.AAS_DELETED, deletedEvt.getType());
246+
Assert.assertEquals(aas.getId(), deletedEvt.getId());
245247
}
246-
while(listener.next(100, TimeUnit.MICROSECONDS) != null);
247248
}
248-
249-
}
249+
@After
250+
public void assertNoAdditionalEvent() throws InterruptedException {
251+
Assert.assertNull(listener.next(1, TimeUnit.SECONDS));
252+
}
253+
}

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/TestApplication.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
@SpringBootApplication
3535
public class TestApplication {
3636

37+
public static final String KAFKA_AAS_TOPIC = "aas-events";
38+
public static final String KAFKA_GROUP_ID = "kafka-tests";
39+
40+
3741
@Bean
3842
public JsonDeserializer getDeserializer() {
3943
return new JsonDeserializer();

basyx.aasrepository/basyx.aasrepository.component/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/component/KafkaFeatureEnabledSmokeTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
3434
import org.eclipse.digitaltwin.aas4j.v3.model.impl.DefaultAssetAdministrationShell;
3535
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.KafkaAasRepositoryFeature;
36+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.TestApplication;
3637
import org.eclipse.digitaltwin.basyx.aasrepository.AasRepository;
3738
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.AasEventKafkaListener;
3839
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
@@ -66,7 +67,7 @@
6667
@TestPropertySource(properties = { "basyx.feature.kafka.enabled=true",
6768
"spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092",
6869
KafkaAasRepositoryFeature.FEATURENAME + "kafka.enabled=true",
69-
KafkaAasRepositoryFeature.FEATURENAME + ".topic.name=" + AasEventKafkaListener.TOPIC_NAME })
70+
KafkaAasRepositoryFeature.FEATURENAME + ".topic.name=" + TestApplication.KAFKA_AAS_TOPIC })
7071
public class KafkaFeatureEnabledSmokeTest {
7172

7273
@LocalServerPort

basyx.submodelrepository/basyx.submodelrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelrepository/feature/kafka/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ public void testSubmodelElementUpdated() throws InterruptedException {
227227
Assert.assertEquals(TestSubmodels.IDSHORT_PROP_0, evt.getSmElementPath());
228228
Assert.assertNull(evt.getSubmodel());
229229
Assert.assertEquals(elem, evt.getSmElement());
230-
231230
}
232231

233232
@Test
@@ -239,7 +238,6 @@ public void testSubmodelElementAddedUnderPath() throws InterruptedException {
239238

240239
SubmodelElement elem = TestSubmodels.submodelElement(TestSubmodels.IDSHORT_PROP_1, "88");
241240
repo.createSubmodelElement(ID_SM1, TestSubmodels.IDSHORT_COLL, elem);
242-
243241
evt = listener.next();
244242
Assert.assertEquals(SubmodelEventType.SME_CREATED, evt.getType());
245243
Assert.assertEquals(ID_SM1, evt.getId());
@@ -249,8 +247,7 @@ public void testSubmodelElementAddedUnderPath() throws InterruptedException {
249247
}
250248

251249
@Test
252-
public void testSubmodelElementAddedAndBlobValueNotPartOfTheEvent() throws InterruptedException {
253-
250+
public void testSubmodelElementAddedAndBlobValueNotPartOfTheEvent() throws InterruptedException {
254251
Submodel sm = TestSubmodels.submodel();
255252
repo.createSubmodel(sm);
256253
SubmodelEvent evt = listener.next();
@@ -315,9 +312,12 @@ public void testSubmodelElementDeleted() throws InterruptedException {
315312
}
316313

317314
@Test
318-
public void testGetterAreWorking() throws ElementDoesNotExistException, SerializationException {
315+
public void testGetterAreWorking() throws ElementDoesNotExistException, SerializationException, InterruptedException {
319316
Submodel expectedSm = TestSubmodels.submodel();
320317
repo.createSubmodel(expectedSm);
318+
SubmodelEvent evt = listener.next();
319+
Assert.assertEquals(SubmodelEventType.SM_CREATED , evt.getType());
320+
321321
List<Submodel> result = repo.getAllSubmodels(new PaginationInfo(null, null)).getResult();
322322
Assert.assertEquals(1, result.size());
323323
Assert.assertEquals(expectedSm, result.get(0));
@@ -346,9 +346,12 @@ public void cleanup() throws InterruptedException {
346346
if (repo != null) {
347347
for (Submodel sm : repo.getAllSubmodels(new PaginationInfo(null, null)).getResult()) {
348348
repo.deleteSubmodel(sm.getId());
349-
}
349+
SubmodelEvent evt = listener.next();
350+
Assert.assertEquals(SubmodelEventType.SM_DELETED, evt.getType());
351+
Assert.assertEquals(sm.getId(), evt.getId());
352+
}
350353
}
351-
while (listener.next(300, TimeUnit.MICROSECONDS) != null);
352-
353354
}
355+
356+
354357
}

basyx.submodelservice/basyx.submodelservice-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelservice/feature/kafka/KafkaSubmodelServiceIdsOnlySmokeTest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.TimeUnit;
3030

3131
import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.SerializationException;
32-
import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonSerializer;
3332
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
3433
import org.eclipse.digitaltwin.aas4j.v3.model.SubmodelElement;
3534
import org.eclipse.digitaltwin.basyx.core.filerepository.FileRepository;
@@ -85,15 +84,11 @@ public class KafkaSubmodelServiceIdsOnlySmokeTest {
8584

8685
private SubmodelService service;
8786

88-
@Autowired
89-
JsonSerializer serializer;
9087

9188
@Before
9289
public void awaitAssignment() throws InterruptedException, SerializationException {
9390
listener.awaitTopicAssignment();
9491

95-
while(listener.next(100, TimeUnit.MICROSECONDS) != null);
96-
9792
FileRepository repository = new InMemoryFileRepository();
9893
SubmodelBackend backend = new InMemorySubmodelBackend();
9994
SubmodelServiceFactory smFactory = new CrudSubmodelServiceFactory(backend ,repository);
@@ -133,8 +128,4 @@ public void testSubmodelElementPatched() throws InterruptedException, Serializat
133128

134129
}
135130

136-
@After
137-
public void assertNoAdditionalKafkaMessageOnTopic() throws InterruptedException, SerializationException {
138-
Assert.assertNull(listener.next(300, TimeUnit.MILLISECONDS));
139-
}
140131
}

basyx.submodelservice/basyx.submodelservice-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelservice/feature/kafka/KafkaSubmodelServiceSubmodelElementsEventsIntegrationTest.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ public class KafkaSubmodelServiceSubmodelElementsEventsIntegrationTest {
9090
public void awaitAssignment() throws InterruptedException {
9191
listener.awaitTopicAssignment();
9292

93-
while(listener.next(100, TimeUnit.MICROSECONDS) != null);
94-
9593
FileRepository repository = new InMemoryFileRepository();
9694
SubmodelBackend backend = new InMemorySubmodelBackend();
9795
SubmodelServiceFactory smFactory = new CrudSubmodelServiceFactory(backend ,repository);
@@ -102,13 +100,10 @@ public void awaitAssignment() throws InterruptedException {
102100
public void testToplevelSubmodelElementAdded() throws InterruptedException, SerializationException {
103101
Assert.assertTrue(feature.isEnabled());
104102

105-
SubmodelEvent evt = listener.next(2, TimeUnit.SECONDS);
106-
107-
108103
SubmodelElement elem = TestSubmodels.submodelElement(TestSubmodels.IDSHORT_PROP_1, "ID");
109104
service.createSubmodelElement(elem);
110105

111-
evt = listener.next();
106+
SubmodelEvent evt = listener.next();
112107
Assert.assertEquals(SubmodelEventType.SME_CREATED, evt.getType());
113108
Assert.assertEquals(submodel.getId(), evt.getId());
114109
Assert.assertEquals(TestSubmodels.IDSHORT_PROP_1, evt.getSmElementPath());
@@ -171,8 +166,4 @@ public void testSubmodelElementDeleted() throws InterruptedException {
171166
Assert.assertNull(evtUpdated.getSmElement());
172167
}
173168

174-
@After
175-
public void assertNoAdditionalKafkaMessageOnTopic() throws InterruptedException, SerializationException {
176-
Assert.assertNull(listener.next(300, TimeUnit.MILLISECONDS));
177-
}
178169
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2024 DFKI GmbH (https://www.dfki.de/en/web)
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining
5+
* a copy of this software and associated documentation files (the
6+
* "Software"), to deal in the Software without restriction, including
7+
* without limitation the rights to use, copy, modify, merge, publish,
8+
* distribute, sublicense, and/or sell copies of the Software, and to
9+
* permit persons to whom the Software is furnished to do so, subject to
10+
* the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be
13+
* included in all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17+
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19+
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20+
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21+
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
*
23+
* SPDX-License-Identifier: MIT
24+
*
25+
******************************************************************************/
26+
package org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka;
27+
28+
import java.util.concurrent.TimeUnit;
29+
30+
import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.SerializationException;
31+
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
32+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
33+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEventType;
34+
import org.junit.After;
35+
import org.junit.Assert;
36+
import org.junit.Before;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.boot.test.context.SpringBootTest;
41+
import org.springframework.context.ApplicationContext;
42+
import org.springframework.context.annotation.ComponentScan;
43+
import org.springframework.context.annotation.Import;
44+
import org.springframework.context.event.ContextClosedEvent;
45+
import org.springframework.test.annotation.DirtiesContext;
46+
import org.springframework.test.annotation.DirtiesContext.ClassMode;
47+
import org.springframework.test.context.ActiveProfiles;
48+
import org.springframework.test.context.ContextConfiguration;
49+
import org.springframework.test.context.TestPropertySource;
50+
import org.springframework.test.context.junit4.SpringRunner;
51+
52+
/**
53+
* @author geso02 (Sonnenberg DFKI GmbH)
54+
*/
55+
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
56+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
57+
@ComponentScan(basePackages = { "org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka" })
58+
@RunWith(SpringRunner.class)
59+
@ActiveProfiles("test-submodel")
60+
@ContextConfiguration(classes = {SubmodelServiceTestComponent.class})
61+
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092",
62+
KafkaSubmodelServiceFeature.FEATURENAME + ".preservationlevel=REMOVE_BLOB_VALUE",
63+
KafkaSubmodelServiceFeature.FEATURENAME + ".enabled=true",
64+
KafkaSubmodelServiceFeature.FEATURENAME + ".topic.name=" + SubmodelEventKafkaListener.TOPIC_NAME,
65+
KafkaSubmodelServiceApplicationListener.SUBMODEL_EVENTS_ACTIVATED + "=true"
66+
})
67+
@Import(SubmodelEventKafkaListener.class)
68+
public class KafkaSubmodelServiceSubmodelEventsIntegrationTest {
69+
70+
@Autowired
71+
private SubmodelEventKafkaListener listener;
72+
73+
@Autowired
74+
private Submodel submodel;
75+
76+
@Autowired
77+
private ApplicationContext context;
78+
79+
@Before
80+
public void awaitAssignment() throws InterruptedException {
81+
listener.awaitTopicAssignment();
82+
}
83+
84+
@Test
85+
public void testSubmodelEvents() throws InterruptedException {
86+
// we expect the "onStartup" submodel created event
87+
SubmodelEvent evt = listener.next();
88+
Assert.assertEquals(SubmodelEventType.SM_CREATED, evt.getType());
89+
Assert.assertEquals(submodel.getId(), evt.getId());
90+
Assert.assertEquals(submodel, evt.getSubmodel());
91+
Assert.assertNull(evt.getSmElementPath());
92+
Assert.assertNull(evt.getSmElement());
93+
94+
// simulate closing
95+
context.publishEvent(new ContextClosedEvent(context));
96+
evt = listener.next();
97+
Assert.assertEquals(SubmodelEventType.SM_DELETED, evt.getType());
98+
Assert.assertEquals(submodel.getId(), evt.getId());
99+
Assert.assertNull(evt.getSubmodel());
100+
Assert.assertNull(evt.getSmElementPath());
101+
Assert.assertNull(evt.getSmElement());
102+
}
103+
104+
105+
}

0 commit comments

Comments
 (0)