Skip to content

Commit f530915

Browse files
authored
Add Apache Kafka Event Interface For Repositories (eclipse-basyx#523)
* Add Kafka repository event integration refs eclipse-basyx#522 * Merge the main branch and implement missing method * Add test properties again and fix wrong beans construction * Fixed automatic upload handling and improved test case path management - Corrected automatic upload of files by ensuring paths are annotated with @value, so the correct files are uploaded as expected. - Removed the `basyx.environment` property from tests, as it is unrelated to the test scope and streamlines the configuration. * Fix incorrect bean registration in test cases This commit resolves an issue where a Submodel, which should only be used in submodel-repository tests, was mistakenly registered in environment-related tests. The Submodel is now conditionally provided via profiles, ensuring it is only available in the appropriate tests and no longer causes failures in the environment tests. * Fix missing deserializer bean test * Add @value annotation back again for the member variable basyx environment is not assigned in auth preloader otherwise * Trigger build * assign another consumer group group could already be handled by another listener * Add newest kafka version and use single broker setup with kRaft and without zookeeper * Update docker-compose.yml * Add Kafka Example Project (eclipse-basyx#522) - Add default topic names for Kafka setup - Update and improve README files for better clarity and structure - Adjust tests and code to resolve merge conflicts - Fix failing test case that was breaking the CI process * Cleanup events before teststart * Cleanup topics on teststart and add uncommented test * Revert changes and fix README file nameing * Add newline * Add newline * Add newlines * Add newlines * Add cleanup methods for tests * Fix imports and use cleanup methods * Cleanup registries before test run * Update topic names and wait intervals * alwayx use seekToEnd for kafka tests * Remove seek-to-end method and use the same group-id for the tests intead
1 parent 32d5165 commit f530915

File tree

78 files changed

+5009
-113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+5009
-113
lines changed

basyx.aasenvironment/basyx.aasenvironment.component/pom.xml

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,32 @@
8383
<groupId>org.eclipse.digitaltwin.basyx</groupId>
8484
<artifactId>basyx.aasenvironment-feature-authorization</artifactId>
8585
</dependency>
86+
<dependency>
87+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
88+
<artifactId>basyx.aasrepository-feature-kafka</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
92+
<artifactId>basyx.submodelrepository-feature-kafka</artifactId>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
96+
<artifactId>basyx.aasrepository-feature-kafka</artifactId>
97+
<classifier>tests</classifier>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
102+
<artifactId>basyx.submodelservice-feature-kafka</artifactId>
103+
<classifier>tests</classifier>
104+
<scope>test</scope>
105+
</dependency>
106+
<dependency>
107+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
108+
<artifactId>basyx.submodelrepository-feature-kafka</artifactId>
109+
<classifier>tests</classifier>
110+
<scope>test</scope>
111+
</dependency>
86112
<dependency>
87113
<groupId>org.eclipse.digitaltwin.basyx</groupId>
88114
<artifactId>basyx.http</artifactId>
@@ -100,8 +126,14 @@
100126
<scope>test</scope>
101127
</dependency>
102128
<dependency>
103-
<groupId>org.eclipse.digitaltwin.basyx</groupId>
104-
<artifactId>basyx.mongodbcore</artifactId>
129+
<groupId>org.springframework.boot</groupId>
130+
<artifactId>spring-boot-starter-test</artifactId>
131+
<scope>test</scope>
132+
</dependency>
133+
<dependency>
134+
<groupId>org.junit.vintage</groupId>
135+
<artifactId>junit-vintage-engine</artifactId>
136+
<scope>test</scope>
105137
</dependency>
106138
<dependency>
107139
<groupId>org.springframework.boot</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2024 DFKI GmbH (https://www.dfki.de/en/web)
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining
5+
* a copy of this software and associated documentation files (the
6+
* "Software"), to deal in the Software without restriction, including
7+
* without limitation the rights to use, copy, modify, merge, publish,
8+
* distribute, sublicense, and/or sell copies of the Software, and to
9+
* permit persons to whom the Software is furnished to do so, subject to
10+
* the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be
13+
* included in all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17+
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19+
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20+
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21+
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
*
23+
* SPDX-License-Identifier: MIT
24+
*
25+
******************************************************************************/
26+
package org.eclipse.digitaltwin.basyx.aasenvironment.component;
27+
28+
import java.util.concurrent.TimeUnit;
29+
30+
import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonSerializer;
31+
import org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell;
32+
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
33+
import org.eclipse.digitaltwin.basyx.aasrepository.AasRepository;
34+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.AasEventKafkaListener;
35+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.KafkaAasRepositoryFeature;
36+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.TestShells;
37+
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
38+
import org.eclipse.digitaltwin.basyx.core.pagination.PaginationInfo;
39+
import org.eclipse.digitaltwin.basyx.submodelrepository.SubmodelRepository;
40+
import org.eclipse.digitaltwin.basyx.submodelrepository.feature.kafka.KafkaSubmodelRepositoryFeature;
41+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.SubmodelEventKafkaListener;
42+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.TestSubmodels;
43+
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
44+
import org.junit.After;
45+
import org.junit.Assert;
46+
import org.junit.Before;
47+
import org.junit.Test;
48+
import org.junit.runner.RunWith;
49+
import org.springframework.beans.factory.annotation.Autowired;
50+
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
51+
import org.springframework.boot.test.context.SpringBootTest;
52+
import org.springframework.context.annotation.ComponentScan;
53+
import org.springframework.context.annotation.Import;
54+
import org.springframework.http.MediaType;
55+
import org.springframework.test.annotation.DirtiesContext;
56+
import org.springframework.test.annotation.DirtiesContext.ClassMode;
57+
import org.springframework.test.context.TestPropertySource;
58+
import org.springframework.test.context.junit4.SpringRunner;
59+
import org.springframework.test.web.servlet.MockMvc;
60+
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
61+
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
62+
63+
/**
64+
* @author sonnenberg (DFKI GmbH)
65+
*/
66+
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
67+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
68+
@ComponentScan(basePackages = { "org.eclipse.digitaltwin.basyx"})
69+
@RunWith(SpringRunner.class)
70+
@TestPropertySource(properties = {
71+
"basyx.environment=",
72+
"basyx.feature.kafka.enabled=true",
73+
"spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092"
74+
})
75+
@AutoConfigureMockMvc
76+
@Import({ SubmodelEventKafkaListener.class, AasEventKafkaListener.class})
77+
public class KafkaEventsInMemoryStorageIntegrationTest {
78+
79+
@Autowired
80+
private AasEventKafkaListener aasEventListener;
81+
82+
@Autowired
83+
private SubmodelEventKafkaListener submodelEventListener;
84+
85+
@Autowired
86+
private KafkaAasRepositoryFeature aasFeature;
87+
88+
@Autowired
89+
private KafkaSubmodelRepositoryFeature submodelFeature;
90+
91+
@Autowired
92+
private MockMvc mvc;
93+
94+
@Autowired
95+
private JsonSerializer serializer;
96+
97+
@Autowired
98+
private SubmodelRepository smRepo;
99+
100+
@Autowired
101+
private AasRepository aasRepo;
102+
103+
@Before
104+
public void awaitAssignment() throws InterruptedException {
105+
aasEventListener.awaitTopicAssignment();
106+
submodelEventListener.awaitTopicAssignment();
107+
108+
cleanup();
109+
}
110+
111+
@Test
112+
public void testCreateAas() throws Exception {
113+
AssetAdministrationShell shell = TestShells.shell();
114+
String body = serializer.write(shell);
115+
116+
mvc.perform(MockMvcRequestBuilders.post("/shells").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
117+
.andExpect(MockMvcResultMatchers.status().isCreated())
118+
.andExpect(MockMvcResultMatchers.content().json(body));
119+
AasEvent aasEvt = aasEventListener.next();
120+
Assert.assertEquals(shell, aasEvt.getAas());
121+
Assert.assertEquals(shell.getId(), aasEvt.getId());
122+
Assert.assertNull(aasEvt.getSubmodelId());
123+
Assert.assertNull(aasEvt.getAssetInformation());
124+
Assert.assertNull(aasEvt.getReference());
125+
126+
Submodel sm = TestSubmodels.createSubmodel("http://submodels/123", "123", "hello");
127+
body = serializer.write(sm);
128+
mvc.perform(MockMvcRequestBuilders.post("/submodels").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
129+
.andExpect(MockMvcResultMatchers.status().isCreated());
130+
SubmodelEvent smEvt = submodelEventListener.next();
131+
Assert.assertEquals(sm, smEvt.getSubmodel());
132+
Assert.assertEquals(sm.getId(), smEvt.getId());
133+
Assert.assertNull(smEvt.getSmElement());
134+
Assert.assertNull(smEvt.getSmElementPath());
135+
}
136+
137+
138+
@Test
139+
public void testFeatureIsEnabled() {
140+
Assert.assertTrue(aasFeature.isEnabled());
141+
Assert.assertTrue(submodelFeature.isEnabled());
142+
}
143+
144+
@After
145+
public void cleanup() throws InterruptedException {
146+
for (AssetAdministrationShell aas : aasRepo.getAllAas(new PaginationInfo(null, null)).getResult()) {
147+
aasRepo.deleteAas(aas.getId());
148+
}
149+
150+
for (Submodel sm : smRepo.getAllSubmodels(new PaginationInfo(null, null)).getResult()) {
151+
smRepo.deleteSubmodel(sm.getId());
152+
}
153+
while(submodelEventListener.next(300, TimeUnit.MICROSECONDS) != null);
154+
}
155+
}

basyx.aasregistry/basyx.aasregistry-service-basetests/src/main/java/org/eclipse/digitaltwin/basyx/aasregistry/service/tests/integration/BaseIntegrationTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,20 +139,23 @@ public abstract class BaseIntegrationTest {
139139
protected RegistryAndDiscoveryInterfaceApi api;
140140

141141
@Before
142-
public void initClient() throws ApiException {
142+
public void setUp() throws Exception {
143+
initClient();
144+
cleanup();
145+
}
146+
147+
protected void initClient() throws Exception {
143148
api = new RegistryAndDiscoveryInterfaceApi("http", "127.0.0.1", port);
144-
api.deleteAllShellDescriptors();
145-
queue().assertNoAdditionalMessage();
146149
}
147150

148-
@After
149-
public void cleanup() throws ApiException {
150-
queue().assertNoAdditionalMessage();
151+
protected void cleanup() throws ApiException, InterruptedException {
152+
queue().pullAdditionalMessages();
151153
GetAssetAdministrationShellDescriptorsResult result = api.getAllAssetAdministrationShellDescriptors(null, null, null, null);
152154
for (AssetAdministrationShellDescriptor eachDescriptor : result.getResult()) {
153155
api.deleteAssetAdministrationShellDescriptorById(eachDescriptor.getId());
154156
assertThatEventWasSend(RegistryEvent.builder().id(eachDescriptor.getId()).type(EventType.AAS_UNREGISTERED).build());
155157
}
158+
queue().pullAdditionalMessages();
156159
}
157160

158161
@Test
@@ -230,7 +233,6 @@ public void whenDeleteAll_thenAllDescriptorsAreRemoved() throws ApiException {
230233
assertThat(events.remove(RegistryEvent.builder().id("id_" + i).type(EventType.AAS_UNREGISTERED).build())).isTrue();
231234
}
232235
assertThat(events.isEmpty());
233-
queue().assertNoAdditionalMessage();
234236
}
235237

236238
@Test
@@ -246,7 +248,7 @@ public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws
246248
all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
247249
assertThat(all).isEmpty();
248250

249-
queue().assertNoAdditionalMessage();
251+
queue().pullAdditionalMessages();
250252
}
251253

252254
@Test
@@ -279,7 +281,7 @@ public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted()
279281
aasDescriptor = api.getAssetAdministrationShellDescriptorById(aasId);
280282
assertThat(aasDescriptor.getSubmodelDescriptors()).doesNotContain(toRegister);
281283

282-
queue().assertNoAdditionalMessage();
284+
queue().pullAdditionalMessages();
283285
}
284286

285287
@Test

basyx.aasregistry/basyx.aasregistry-service-basetests/src/main/java/org/eclipse/digitaltwin/basyx/aasregistry/service/tests/integration/EventQueue.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,8 @@ public void reset() {
5555
}
5656
}
5757

58-
public void assertNoAdditionalMessage() {
59-
try {
60-
String message = messageQueue.poll(1, TimeUnit.SECONDS);
61-
if (message != null) {
62-
throw new EventListenerException("Got additional message: " + message);
63-
}
64-
} catch (InterruptedException e) {
65-
Thread.currentThread().interrupt();
66-
throw new EventListenerException(e);
67-
}
58+
public void pullAdditionalMessages() throws InterruptedException {
59+
while(messageQueue.poll(100, TimeUnit.MILLISECONDS) != null);
6860
}
6961

7062
public RegistryEvent poll() {
@@ -81,11 +73,11 @@ public RegistryEvent poll() {
8173
throw new EventListenerException(e);
8274
}
8375
}
84-
76+
8577
public static final class EventListenerException extends RuntimeException {
8678

8779
private static final long serialVersionUID = 1L;
88-
80+
8981
public EventListenerException(Throwable e) {
9082
super(e);
9183
}

basyx.aasregistry/basyx.aasregistry-service-release-kafka-mem/src/test/java/org/eclipse/digitaltwin/basyx/aasregistry/service/storage/memory/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,14 @@
2424
******************************************************************************/
2525
package org.eclipse.digitaltwin.basyx.aasregistry.service.storage.memory;
2626

27-
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.CountDownLatch;
3029
import java.util.concurrent.TimeUnit;
3130

3231
import org.apache.kafka.common.TopicPartition;
3332
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.BaseIntegrationTest;
3433
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.EventQueue;
35-
import org.junit.Before;
3634
import org.springframework.beans.factory.annotation.Autowired;
37-
import org.springframework.beans.factory.annotation.Value;
3835
import org.springframework.kafka.annotation.KafkaHandler;
3936
import org.springframework.kafka.annotation.KafkaListener;
4037
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -51,10 +48,12 @@ public class KafkaEventsInMemoryStorageIntegrationTest extends BaseIntegrationTe
5148
@Autowired
5249
private RegistrationEventKafkaListener listener;
5350

54-
@Before
55-
public void awaitAssignment() throws InterruptedException {
51+
@Override
52+
public void setUp() throws Exception {
5653
listener.awaitTopicAssignment();
54+
super.setUp();
5755
}
56+
5857

5958
@Override
6059
public EventQueue queue() {
@@ -68,9 +67,6 @@ private static class RegistrationEventKafkaListener implements ConsumerSeekAware
6867
private final EventQueue queue;
6968
private final CountDownLatch latch = new CountDownLatch(1);
7069

71-
@Value("${spring.kafka.template.default-topic}")
72-
private String topicName;
73-
7470
@SuppressWarnings("unused")
7571
public RegistrationEventKafkaListener(ObjectMapper mapper) {
7672
this.queue = new EventQueue(mapper);
@@ -85,8 +81,7 @@ public void receiveMessage(String content) {
8581
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
8682
ConsumerSeekCallback callback) {
8783
for (TopicPartition eachPartition : assignments.keySet()) {
88-
if (topicName.equals(eachPartition.topic())) {
89-
callback.seekToEnd(List.of(eachPartition));
84+
if ("aas-registry".equals(eachPartition.topic())) {
9085
latch.countDown();
9186
}
9287
}

basyx.aasregistry/basyx.aasregistry-service-release-kafka-mongodb/src/test/java/org/eclipse/digitaltwin/basyx/aasregistry/service/storage/mongodb/AuthorizedClientTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,23 @@ public class AuthorizedClientTest extends BaseIntegrationTest {
6161
@Value("${local.server.port}")
6262
private int port;
6363

64-
@Before
65-
public void awaitAssignment() throws InterruptedException {
64+
@Override
65+
public void setUp() throws Exception {
6666
listener.awaitTopicAssignment();
67+
super.setUp();
6768
}
6869

6970
@Override
7071
public EventQueue queue() {
7172
return listener.getQueue();
7273
}
7374

74-
@Before
7575
@Override
76-
public void initClient() throws ApiException {
76+
public void initClient() throws Exception {
7777
api = new AuthorizedConnectedAasRegistry("http://127.0.0.1:" + port, new TokenManager("http://localhost:9096/realms/BaSyx/protocol/openid-connect/token", new ClientCredentialAccessTokenProvider(new ClientCredential("workstation-1", "nY0mjyECF60DGzNmQUjL81XurSl8etom"))));
7878

7979
api.deleteAllShellDescriptors();
80-
queue().assertNoAdditionalMessage();
80+
queue().pullAdditionalMessages();
8181
}
8282

8383
@Test
@@ -110,4 +110,4 @@ public void whenPostShellDescriptor_LocationIsReturned() throws ApiException, IO
110110
// TODO: It uses normal GET unauthorized request, need to override and refactor
111111
}
112112

113-
}
113+
}

0 commit comments

Comments
 (0)