IKC-316 Manage topics (CRUD)
Piotr Belke authored and Piotr Belke committed Jan 6, 2024
1 parent e48421a commit aba0116
Showing 44 changed files with 854 additions and 123 deletions.
@@ -3,13 +3,16 @@
import static com.consdata.kouncil.config.security.RoleNames.EDITOR_ROLE;
import static com.consdata.kouncil.config.security.RoleNames.VIEWER_ROLE;

import com.consdata.kouncil.KouncilRuntimeException;
import com.consdata.kouncil.logging.EntryExitLogger;
import javax.annotation.security.RolesAllowed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -58,4 +61,32 @@ public void resend(@RequestBody TopicResendEventsModel resendData,
log.debug("TCS01 topicName={}, serverId={}, message={}", resendData.getSourceTopicName(), serverId, resendData);
topicService.resend(resendData, serverId);
}

@RolesAllowed({EDITOR_ROLE})
@PostMapping("/api/topic/create")
@EntryExitLogger
public void create(@RequestBody TopicData newTopic, @RequestParam("serverId") String serverId) throws KouncilRuntimeException {
topicService.create(newTopic, serverId);
}

@RolesAllowed({EDITOR_ROLE})
@PutMapping("/api/topic/partitions/update")
@EntryExitLogger
public void updateTopicPartitions(@RequestBody TopicData newTopic, @RequestParam("serverId") String serverId) throws KouncilRuntimeException {
topicService.updateTopicPartitions(newTopic, serverId);
}

@RolesAllowed({EDITOR_ROLE})
@GetMapping("/api/topic/{topicName}")
@EntryExitLogger
public TopicData getTopicData(@PathVariable("topicName") String topicName, @RequestParam("serverId") String serverId) throws KouncilRuntimeException {
return topicService.getTopicData(topicName, serverId);
}

@RolesAllowed({EDITOR_ROLE})
@DeleteMapping("/api/topic/{topicName}")
@EntryExitLogger
public void removeTopic(@PathVariable("topicName") String topicName, @RequestParam("serverId") String serverId) throws KouncilRuntimeException {
topicService.removeTopic(topicName, serverId);
}
}
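
Taken together, the new handler methods give the controller full topic CRUD: create (POST), partition update (PUT), read (GET) and delete (DELETE), each restricted to the EDITOR role and parameterised by the serverId of the target cluster. A minimal caller sketch — the host/port, the "local" serverId, the "orders" topic name and the omission of authentication are all assumptions for illustration, not part of this commit:

import com.consdata.kouncil.topic.TopicData;
import org.springframework.web.client.RestTemplate;

class TopicCrudClientSketch {

    private final RestTemplate restTemplate = new RestTemplate();

    void exerciseTopicCrud() {
        // Body for POST /api/topic/create — mirrors the TopicData DTO introduced below
        TopicData newTopic = TopicData.builder()
                .name("orders")                 // hypothetical topic name
                .partitions(3)
                .replicationFactor((short) 1)
                .build();

        // Create the topic (EDITOR role required)
        restTemplate.postForEntity(
                "http://localhost:8080/api/topic/create?serverId={id}", newTopic, Void.class, "local");

        // Increase the partition count — Kafka only allows growing it, never shrinking
        newTopic.setPartitions(6);
        restTemplate.put(
                "http://localhost:8080/api/topic/partitions/update?serverId={id}", newTopic, "local");

        // Read it back (GET /api/topic/{topicName}) — raw JSON body for simplicity
        String json = restTemplate.getForObject(
                "http://localhost:8080/api/topic/{topicName}?serverId={id}", String.class, "orders", "local");

        // Delete the topic again
        restTemplate.delete(
                "http://localhost:8080/api/topic/{topicName}?serverId={id}", "orders", "local");
    }
}
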
@@ -0,0 +1,15 @@
package com.consdata.kouncil.topic;

import lombok.Builder;
import lombok.Data;
import lombok.ToString;

@Data
@Builder
@ToString
public class TopicData {

private String name;
private int partitions;
private short replicationFactor;
}
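
The DTO itself is pure Lombok: @Builder generates a fluent builder, @Data the getters, setters and equals/hashCode, and @ToString a readable string form. A tiny illustrative sketch (class name and values are made up):

import com.consdata.kouncil.topic.TopicData;

class TopicDataSketch {
    public static void main(String[] args) {
        TopicData topic = TopicData.builder()      // builder generated by @Builder
                .name("example-topic")
                .partitions(3)
                .replicationFactor((short) 2)
                .build();
        topic.setPartitions(6);                    // setter generated by @Data
        System.out.println(topic);                 // toString() generated by @ToString
    }
}
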
@@ -1,5 +1,8 @@
package com.consdata.kouncil.topic;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

import com.consdata.kouncil.KafkaConnectionService;
import com.consdata.kouncil.KouncilRuntimeException;
import com.consdata.kouncil.MessagesHelper;
@@ -9,10 +12,29 @@
import com.consdata.kouncil.topic.util.FieldType;
import com.consdata.kouncil.topic.util.PlaceholderFormatUtil;
import com.consdata.kouncil.track.TopicMetadata;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -28,22 +50,6 @@
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

@Slf4j
@Service
@RequiredArgsConstructor
@@ -59,13 +65,13 @@ public class TopicService {
private String[] resendHeadersToKeep;

TopicMessagesDto getTopicMessages(@PathVariable("topicName") String topicName,
@PathVariable("partition") String partitions,
@RequestParam("page") String pageParam,
@RequestParam("limit") String limitParam,
@RequestParam(value = "beginningTimestampMillis", required = false) Long beginningTimestampMillis,
@RequestParam(value = "endTimestampMillis", required = false) Long endTimestampMillis,
@RequestParam(value = "offset", required = false) Long offset,
@RequestParam("serverId") String serverId) {
@PathVariable("partition") String partitions,
@RequestParam("page") String pageParam,
@RequestParam("limit") String limitParam,
@RequestParam(value = "beginningTimestampMillis", required = false) Long beginningTimestampMillis,
@RequestParam(value = "endTimestampMillis", required = false) Long endTimestampMillis,
@RequestParam(value = "offset", required = false) Long offset,
@RequestParam("serverId") String serverId) {
messagesHelper.validateTopics(serverId, singletonList(topicName));
int limit = Integer.parseInt(limitParam); // per partition!
long page = Long.parseLong(pageParam); // per partition!
@@ -82,7 +88,8 @@ TopicMessagesDto getTopicMessages(@PathVariable("topicName") String topicName,

Long startOffsetForPartition = metadata.getBeginningOffsets().get(partitionIndex);
Long endOffsetForPartition = metadata.getEndOffsets().get(partitionIndex);
log.debug("TCM50 partition={}, startOffsetForPartition={}, endOffsetForPartition={}", partitionIndex, startOffsetForPartition, endOffsetForPartition);
log.debug("TCM50 partition={}, startOffsetForPartition={}, endOffsetForPartition={}", partitionIndex, startOffsetForPartition,
endOffsetForPartition);
if (startOffsetForPartition < 0) {
log.debug("TCM51 startOffsetForPartition is -1, seekToEnd");
consumer.seekToEnd(singletonList(partition));
@@ -110,14 +117,16 @@ TopicMessagesDto getTopicMessages(@PathVariable("topicName") String topicName,
log.debug("TCM90 poll completed records.size={}", messages.size());
messages.sort(Comparator.comparing(TopicMessage::getTimestamp));

long totalResult = metadata.getEndOffsets().keySet().stream().map(index -> metadata.getEndOffsets().get(index) - metadata.getBeginningOffsets().get(index)).reduce(0L, Long::max);
long totalResult = metadata.getEndOffsets().keySet().stream()
.map(index -> metadata.getEndOffsets().get(index) - metadata.getBeginningOffsets().get(index)).reduce(0L, Long::max);
TopicMessagesDto topicMessagesDto = TopicMessagesDto.builder()
.messages(messages)
.partitionOffsets(metadata.getBeginningOffsets())
.partitionEndOffsets(metadata.getEndOffsets())
.totalResults(totalResult)
.build();
log.debug("TCM99 topicName={}, partition={}, page={} topicMessages.size={}, totalResult={}", topicName, partitions, page, topicMessagesDto.getMessages().size(), totalResult);
log.debug("TCM99 topicName={}, partition={}, page={} topicMessages.size={}, totalResult={}", topicName, partitions, page,
topicMessagesDto.getMessages().size(), totalResult);
return topicMessagesDto;
}
}
@@ -156,10 +165,11 @@ private TopicMetadata prepareMetadata(
}

/**
* Sometimes poll after seek returns no results.
* So we try to call it until we receive 5 consecutive empty polls or have enough messages or received last message
* Sometimes poll after seek returns no results. So we try to call it until we receive 5 consecutive empty polls or have enough messages or received last
* message
*/
private void pollMessages(String clusterId, int limit, KafkaConsumer<Bytes, Bytes> consumer, Long endOffset, TopicPartition partition, List<TopicMessage> messages) {
private void pollMessages(String clusterId, int limit, KafkaConsumer<Bytes, Bytes> consumer, Long endOffset, TopicPartition partition,
List<TopicMessage> messages) {
int emptyPolls = 0;
int messagesCount = 0;
long lastOffset = 0;
@@ -172,7 +182,8 @@ private void pollMessages(String clusterId, int limit, KafkaConsumer<Bytes, Byte
}
for (ConsumerRecord<Bytes, Bytes> consumerRecord : records) {
if (consumerRecord.offset() >= endOffset) {
log.debug("TCM70 record offset greater than endOffset! partition={}, offset={}, endOffset={}", consumerRecord.partition(), consumerRecord.offset(), endOffset);
log.debug("TCM70 record offset greater than endOffset! partition={}, offset={}, endOffset={}", consumerRecord.partition(),
consumerRecord.offset(), endOffset);
messagesCount = limit;
continue;
}
@@ -235,7 +246,8 @@ public void resend(TopicResendEventsModel resendParams, String serverId) {
break;
}
resentMessagesCount++;
resendOneRecord(consumerRecord, kafkaTemplate, resendParams.getDestinationTopicName(), resendParams.getDestinationTopicPartition(), resendParams.isShouldFilterOutHeaders());
resendOneRecord(consumerRecord, kafkaTemplate, resendParams.getDestinationTopicName(), resendParams.getDestinationTopicPartition(),
resendParams.isShouldFilterOutHeaders());
}
log.info("Resent {}% completed", resentMessagesCount * 100 / (resendParams.getOffsetEnd() - resendParams.getOffsetBeginning() + 1));
}
@@ -259,16 +271,18 @@ private static void validateOffsetRange(TopicResendEventsModel resendParams, Kaf
}
}

private void resendOneRecord(ConsumerRecord<Bytes, Bytes> consumerRecord, KafkaTemplate<Bytes, Bytes> kafkaTemplate, String destinationTopic, Integer destinationTopicPartition, boolean shouldFilterOutHeaders) {
ProducerRecord<Bytes, Bytes> producerRecord = new ProducerRecord<>(destinationTopic, destinationTopicPartition, consumerRecord.key(), consumerRecord.value(),
private void resendOneRecord(ConsumerRecord<Bytes, Bytes> consumerRecord, KafkaTemplate<Bytes, Bytes> kafkaTemplate, String destinationTopic,
Integer destinationTopicPartition, boolean shouldFilterOutHeaders) {
ProducerRecord<Bytes, Bytes> producerRecord = new ProducerRecord<>(destinationTopic, destinationTopicPartition, consumerRecord.key(),
consumerRecord.value(),
shouldFilterOutHeaders ? getFilteredOutHeaders(consumerRecord.headers()) : consumerRecord.headers());
kafkaTemplate.send(producerRecord);
}

private List<Header> getFilteredOutHeaders(Headers headers) {
return Arrays.stream(headers.toArray())
.filter(header -> ArrayUtils.contains(resendHeadersToKeep, header.key()))
.collect(Collectors.toList());
.toList();
}

private String replaceTokens(String data, int i) {
@@ -288,10 +302,62 @@ public void send(String topicName, int count, TopicMessage message, String serve
for (TopicMessageHeader header : message.getHeaders()) {
producerRecord
.headers()
.add(replaceTokens(header.getKey(), i), header.getValue() != null ? replaceTokens(header.getValue(), i).getBytes(StandardCharsets.UTF_8) : null);
.add(replaceTokens(header.getKey(), i),
header.getValue() != null ? replaceTokens(header.getValue(), i).getBytes(StandardCharsets.UTF_8) : null);
}
kafkaTemplate.send(producerRecord);
}
kafkaTemplate.flush();
}

public void create(TopicData newTopic, String serverId) throws KouncilRuntimeException {
log.info("Create new topic with name: {}", newTopic.getName());
try {
CreateTopicsResult topics = kafkaConnectionService.getAdminClient(serverId).createTopics(List.of(
new NewTopic(newTopic.getName(), newTopic.getPartitions(), newTopic.getReplicationFactor())
));

topics.all().get();
} catch (ExecutionException e) {
throw new KouncilRuntimeException(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KouncilRuntimeException(e.getMessage());
}
}

public void updateTopicPartitions(TopicData newTopic, String serverId) {
log.info("Updating topic {} partitions", newTopic.getName());
kafkaConnectionService.getAdminClient(serverId).createPartitions(Map.of(newTopic.getName(), NewPartitions.increaseTo(newTopic.getPartitions())));
}

public TopicData getTopicData(String topicName, String serverId) {
log.info("Get topic {} from server {}", topicName, serverId);
DescribeTopicsResult describeTopicsResult = kafkaConnectionService.getAdminClient(serverId).describeTopics(List.of(topicName));
TopicData topicData;
try {
TopicDescription topicDescription = describeTopicsResult.allTopicNames().get().get(topicName);
topicData = new TopicData(topicDescription.name(), topicDescription.partitions().size(),
(short) topicDescription.partitions().get(0).replicas().size());
} catch (ExecutionException e) {
throw new KouncilRuntimeException(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KouncilRuntimeException(e.getMessage());
}
return topicData;
}

public void removeTopic(String topicName, String serverId) {
log.info("Delete topic {}", topicName);
DeleteTopicsResult deleteTopicsResult = kafkaConnectionService.getAdminClient(serverId).deleteTopics(List.of(topicName));
try {
deleteTopicsResult.all().get();
} catch (ExecutionException e) {
throw new KouncilRuntimeException(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KouncilRuntimeException(e.getMessage());
}
}
}
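
All four new service methods go through the Kafka AdminClient obtained from kafkaConnectionService.getAdminClient(serverId), block on the returned futures, and wrap ExecutionException/InterruptedException in KouncilRuntimeException. A condensed, standalone sketch of the same AdminClient calls — the broker address and topic name are placeholders, not values from this commit:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

class TopicAdminSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // create — NewTopic(name, partitions, replicationFactor), as in TopicService.create
            admin.createTopics(List.of(new NewTopic("example-topic", 3, (short) 1))).all().get();

            // update partitions — NewPartitions.increaseTo can only grow the count, as in updateTopicPartitions
            admin.createPartitions(Map.of("example-topic", NewPartitions.increaseTo(6))).all().get();

            // read — partition count plus replication factor of the first partition, as in getTopicData
            TopicDescription description = admin.describeTopics(List.of("example-topic"))
                    .allTopicNames().get().get("example-topic");
            System.out.printf("partitions=%d, replicationFactor=%d%n",
                    description.partitions().size(), description.partitions().get(0).replicas().size());

            // delete — as in removeTopic
            admin.deleteTopics(List.of("example-topic")).all().get();
        }
    }
}
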
1 change: 1 addition & 0 deletions kouncil-frontend/angular.json
@@ -1,6 +1,7 @@
{
"version": 2,
"projects": {
"common-auth": "libs/common-auth",
"common-components": "libs/common-components",
"common-login": "libs/common-login",
"common-model": "libs/common-model",
20 changes: 19 additions & 1 deletion kouncil-frontend/apps/kouncil/src/app/app-factories.ts
@@ -1,7 +1,14 @@
import {HttpClient} from '@angular/common/http';
import {Backend} from '@app/common-model';
import {environment} from '../environments/environment';
import {TopicsBackendService, TopicsDemoService, TopicsService} from '@app/feat-topics';
import {
TopicsBackendService,
TopicsDemoService,
TopicService,
TopicsService,
TopicBackendService,
TopicDemoService
} from '@app/feat-topics';
import {SendBackendService, SendDemoService, SendService} from '@app/feat-send';
import {ResendBackendService, ResendDemoService, ResendService} from '@app/resend-events';

@@ -16,6 +23,17 @@ export function topicsServiceFactory(http: HttpClient): TopicsService {
}
}

export function topicServiceFactory(http: HttpClient): TopicService {
switch (environment.backend) {
case Backend.SERVER: {
return new TopicBackendService(http);
}
case Backend.DEMO:
default:
return new TopicDemoService();
}
}
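
The new topicServiceFactory mirrors the existing factories in this file: environment.backend selects either the HTTP-backed TopicBackendService or the in-memory TopicDemoService. The provider registration is not part of this excerpt; a typical wiring in the application module might look like this (assumed, and the './app-factories' import path is illustrative):

// Assumed provider entry — not shown in this commit.
import {HttpClient} from '@angular/common/http';
import {TopicService} from '@app/feat-topics';
import {topicServiceFactory} from './app-factories';

export const topicServiceProvider = {
  provide: TopicService,
  useFactory: topicServiceFactory,
  deps: [HttpClient],
};
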

export function sendServiceFactory(http: HttpClient): SendService {
switch (environment.backend) {
case Backend.SERVER: {