Skip to content

Commit

Permalink
#59 add topicList context menu item to open LagViewer only showing co…
Browse files Browse the repository at this point in the history
…nsuming groups
  • Loading branch information
patschuh committed Mar 15, 2024
1 parent e0f23b1 commit af6c740
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

group = 'at.esque.kafka'
version = '2.8.0'
version = '2.9.0'

repositories {
mavenCentral()
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/at/esque/kafka/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,16 @@ private ListCell<String> topicListCellFactory() {

ContextMenu contextMenu = new ContextMenu();


MenuItem infoItem = new MenuItem();
infoItem.textProperty().set("describe");
infoItem.setGraphic(new FontIcon(FontAwesome.INFO));
infoItem.setOnAction(event -> showDescribeTopicDialog(selectedTopic()));

MenuItem consumingGroupsItem = new MenuItem();
consumingGroupsItem.textProperty().set("show consumer groups");
consumingGroupsItem.setGraphic(new FontIcon(FontAwesome.OBJECT_GROUP));
consumingGroupsItem.setOnAction(event -> showLagViewerForTopic(selectedTopic()));

MenuItem configMessageTypesItem = new MenuItem();
configMessageTypesItem.textProperty().set("configure message types");
configMessageTypesItem.setGraphic(new FontIcon(FontAwesome.WRENCH));
Expand Down Expand Up @@ -533,7 +537,7 @@ private ListCell<String> topicListCellFactory() {
}
}
});
contextMenu.getItems().addAll(infoItem, configMessageTypesItem, traceItem, deleteItem);
contextMenu.getItems().addAll(infoItem, configMessageTypesItem, consumingGroupsItem, traceItem, deleteItem);

cell.textProperty().bind(cell.itemProperty());

Expand Down Expand Up @@ -753,6 +757,26 @@ public void lagViewerClick(ActionEvent actionEvent) {
}
}

public void showLagViewerForTopic(String topic) {
try {
FXMLLoader fxmlLoader = injector.getInstance(FXMLLoader.class);
fxmlLoader.setLocation(getClass().getResource("/fxml/lagViewer.fxml"));
Parent root1 = fxmlLoader.load();
LagViewerController controller = fxmlLoader.getController();
controller.setup(adminClient, Set.of(topic));
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH)));
stage.initOwner(controlledStage);
stage.initModality(Modality.NONE);
stage.setTitle("Groups consuming " + topic + " - " + selectedCluster().getIdentifier());
stage.setScene(Main.createStyledScene(root1, 1200, 800));
stage.show();
centerStageOnControlledStage(stage);
} catch (Exception e) {
ErrorAlert.show(e, controlledStage);
}
}

@FXML
public void aclViewer(ActionEvent actionEvent) {
try {
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,21 @@ public DescribeTopicWrapper describeTopic(String topic) {
}
}

public List<Lag> getConsumerGroupLags() {
public List<Lag> getConsumerGroupLags(Set<String> consumedTopicFilterParameters) {
ListConsumerGroupsResult result = adminClient.listConsumerGroups();
try {
Collection<ConsumerGroupListing> consumerGroupListings = result.all().get();
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(buildConsumerGroupsOffsetSpecMap(consumerGroupListings));
Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupTopicPartitionOffsetMap = listConsumerGroupOffsetsResult.all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = adminClient.listOffsets(buildListEndOffsetParameters(consumerGroupTopicPartitionOffsetMap)).all().get();
return consumerGroupTopicPartitionOffsetMap.entrySet().stream().map(consumerGroup -> {
Map<String, Map<TopicPartition, OffsetAndMetadata>> filteredConsumerGroups = consumerGroupTopicPartitionOffsetMap.entrySet().stream().filter(stringMapEntry -> {
if (consumedTopicFilterParameters == null) {
return true;
}
return stringMapEntry.getValue().keySet().stream()
.anyMatch(topicPartition -> consumedTopicFilterParameters.contains(topicPartition.topic()));
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = adminClient.listOffsets(buildListEndOffsetParameters(filteredConsumerGroups)).all().get();
return filteredConsumerGroups.entrySet().stream().map(consumerGroup -> {
Map<TopicPartition, Long> relevantEndOffsets = buildRelevantOffsetsMap(topicPartitionListOffsetsResultInfoMap, consumerGroup.getValue().keySet());
Lag lag = new Lag(
consumerGroup.getKey(),
Expand Down
19 changes: 17 additions & 2 deletions src/main/java/at/esque/kafka/lag/viewer/LagViewerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class LagViewerController {
Expand All @@ -35,6 +36,8 @@ public class LagViewerController {

private KafkaesqueAdminClient adminClient;

private Set<String> consumedTopicFilterParameters;

private BooleanProperty refreshRunning = new SimpleBooleanProperty(false);

@FXML
Expand All @@ -45,7 +48,12 @@ public void initialize() {
}

public void setup(KafkaesqueAdminClient adminClient) {
setup(adminClient, null);
}

public void setup(KafkaesqueAdminClient adminClient, Set<String> consumedTopicFilterParameters) {
this.adminClient = adminClient;
this.consumedTopicFilterParameters = consumedTopicFilterParameters;
titleLabel.textProperty().bind(Bindings.createStringBinding(() -> {
Lag selectedItem = consumerGroupList.getListView().getSelectionModel().getSelectedItem();
if (selectedItem != null) {
Expand Down Expand Up @@ -85,7 +93,14 @@ public void setup(KafkaesqueAdminClient adminClient) {
});

partitionOffsetList.setStringifierFunction(Lag::getTitle);
partitionOffsetList.setListComparator(Comparator.comparing(Lag::getTitle));
partitionOffsetList.setListComparator(Comparator.comparing(lag -> {
try {
return Integer.parseInt(lag.getTitle());
} catch (NumberFormatException e) {
logger.warn("partion lag title " + lag.getTitle() +" could not be parsed as int", e);
return Integer.MAX_VALUE;
}
}));
partitionOffsetList.filterTextFieldDisableProperty().bind(refreshRunning);

this.adminClient = adminClient;
Expand All @@ -98,7 +113,7 @@ private void startRefreshList() {
runInDaemonThread(() -> {
Platform.runLater(() -> {
refreshRunning.setValue(true);
consumerGroupList.setItems(FXCollections.observableArrayList(adminClient.getConsumerGroupLags()));
consumerGroupList.setItems(FXCollections.observableArrayList(adminClient.getConsumerGroupLags(consumedTopicFilterParameters)));
refreshConsumerGroupsInFxThreadDone.set(true);
});
while (!refreshConsumerGroupsInFxThreadDone.get()) {
Expand Down

0 comments on commit af6c740

Please sign in to comment.