Skip to content

Commit

Permalink
fix: Use durable consumer for GitHub webhook events (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
egekocabas authored Feb 8, 2025
1 parent 4a3d005 commit 5aa9cc6
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/deploy_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ jobs:
echo "DATASOURCE_PASSWORD=${{ secrets.DATASOURCE_PASSWORD }}" >> .env
echo "NATS_SERVER=${{ secrets.NATS_SERVER }}" >> .env
echo "NATS_AUTH_TOKEN=${{ secrets.NATS_AUTH_TOKEN }}" >> .env
echo "NATS_DURABLE_CONSUMER_NAME=${{ vars.NATS_DURABLE_CONSUMER_NAME }}" >> .env
echo "NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES=${{ vars.NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES }}" >> .env
echo "NATS_CONSUMER_ACK_WAIT_SECONDS=${{ vars.NATS_CONSUMER_ACK_WAIT_SECONDS }}" >> .env
echo "WEBHOOK_SECRET=${{ secrets.WEBHOOK_SECRET }}" >> .env
echo "REPOSITORY_NAME=${{ vars.REPOSITORY_NAME }}" >> .env
echo "ORGANIZATION_NAME=${{ vars.ORGANIZATION_NAME }}" >> .env
Expand Down
5 changes: 4 additions & 1 deletion compose.prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
- helios-network

nats-server:
image: nats:alpine
image: nats:2.10.25-alpine
ports:
- "4222:4222"
- "8222:8222"
Expand Down Expand Up @@ -88,6 +88,9 @@ services:
- DATASOURCE_PASSWORD=${DATASOURCE_PASSWORD}
- NATS_SERVER=${NATS_SERVER}
- NATS_AUTH_TOKEN=${NATS_AUTH_TOKEN}
- NATS_DURABLE_CONSUMER_NAME=${NATS_DURABLE_CONSUMER_NAME}
- NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES=${NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES}
- NATS_CONSUMER_ACK_WAIT_SECONDS=${NATS_CONSUMER_ACK_WAIT_SECONDS}
- ORGANIZATION_NAME=${ORGANIZATION_NAME}
- GITHUB_AUTH_TOKEN=${GITHUB_AUTH_TOKEN}
- REPOSITORY_NAME=${REPOSITORY_NAME}
Expand Down
2 changes: 1 addition & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ services:
- helios-network
nats-server:
image: nats:alpine
image: nats:2.10.25-alpine
ports:
- "4222:4222"
- "8222:8222"
Expand Down
3 changes: 3 additions & 0 deletions docs/development/local/setup.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ When Helios starts, it will look for all the **GitHub App** variables above:
- ``DATASOURCE_PASSWORD``: Database password
- ``NATS_SERVER``: NATS server URL
- ``NATS_AUTH_TOKEN``: Authorization token for NATS server. This token is used to authenticate different services with the NATS server.
- ``NATS_DURABLE_CONSUMER_NAME``: Name of the durable consumer for NATS server. With durable consumers, NATS remembers where it left off when the last event was acknowledged.
- ``NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES``: (Optional, default: 30) Specifies the time (in minutes) after which an inactive consumer is removed.
- ``NATS_CONSUMER_ACK_WAIT_SECONDS``: (Optional, default: 60) Specifies the time (in seconds) that NATS waits for a message acknowledgment before resending the message.
- ``REPOSITORY_NAME``: Name of the repository that should be synced (e.g. `ls1intum/Helios`)
- ``RUN_ON_STARTUP_COOLDOWN``: When server starts, it first checks the latest run of sync, if it is less than this value in minutes, it will not run the sync again
- ``OAUTH_ISSUER_URL``: URL to Keycloak realm
Expand Down
3 changes: 3 additions & 0 deletions server/application-server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ DATASOURCE_USERNAME=helios
DATASOURCE_PASSWORD=helios
NATS_SERVER=localhost:4222
NATS_AUTH_TOKEN=5760e8ae09adfb2756f9f8cd5cb2caa704cd3f549eaa9298be843ceb165185d815b81f90c680fa7f626b7cd63abf6ac9
NATS_DURABLE_CONSUMER_NAME=helios-github-consumer
NATS_CONSUMER_INACTIVE_THRESHOLD_MINUTES=30
NATS_CONSUMER_ACK_WAIT_SECONDS=60
REPOSITORY_NAME=<repository_name e.g. ls1intum/Helios>
RUN_ON_STARTUP_COOLDOWN=0 <when server starts, it first checks the latest run of sync, if it is less than this value in minutes, it will not run the sync again>
OAUTH_ISSUER_URL=http://localhost:8081/realms/helios-example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@
import io.nats.client.ConsumerContext;
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.MessageConsumer;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.DeliverPolicy;
import io.sentry.Sentry;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.stream.Stream;
import lombok.extern.log4j.Log4j2;
import org.kohsuke.github.GHEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
Expand All @@ -46,27 +50,45 @@ public class NatsConsumerService {
@Value("${nats.durableConsumerName}")
private String durableConsumerName;

@Value("${nats.consumerInactiveThresholdMinutes}")
private int consumerInactiveThresholdMinutes;

@Value("${nats.consumerAckWaitSeconds}")
private int consumerAckWaitSeconds;

@Value("${monitoring.repositories}")
private String[] repositoriesToMonitor;

@Value("${nats.auth.token}")
private String natsAuthToken;

private final Environment environment;

private Connection natsConnection;
private ConsumerContext consumerContext;

/**
* Active NATS message consumer.
*/
private MessageConsumer messageConsumer;

private final Environment environment;

private final GitHubMessageHandlerRegistry handlerRegistry;

private final GitHubCustomMessageHandlerRegistry customHandlerRegistry;

private final NatsErrorListener natsErrorListener;

@Autowired
public NatsConsumerService(
Environment environment, GitHubMessageHandlerRegistry handlerRegistry,
GitHubCustomMessageHandlerRegistry customHandlerRegistry) {
Environment environment,
GitHubMessageHandlerRegistry handlerRegistry,
GitHubCustomMessageHandlerRegistry customHandlerRegistry,
@Lazy NatsErrorListener natsErrorListener) {
this.environment = environment;
this.handlerRegistry = handlerRegistry;
this.customHandlerRegistry = customHandlerRegistry;
this.natsErrorListener = natsErrorListener;
}

@EventListener(ApplicationReadyEvent.class)
Expand All @@ -87,9 +109,9 @@ public void init() {
while (true) {
try {
natsConnection = Nats.connect(options);
setupConsumer(natsConnection);
setupOrUpdateConsumer(natsConnection);
return;
} catch (IOException | InterruptedException e) {
} catch (IOException | InterruptedException | RuntimeException e) {
log.error("NATS connection error: {}", e.getMessage(), e);
}
}
Expand All @@ -111,48 +133,79 @@ private Options buildNatsOptions() {
.connectionListener(
(conn, type) ->
log.info("Connection event - Server: {}, {}", conn.getServerInfo().getPort(), type))
.errorListener(natsErrorListener)
.maxReconnects(-1)
.reconnectWait(Duration.ofSeconds(INITIAL_RECONNECT_DELAY_SECONDS))
.build();
}

private void setupConsumer(Connection connection) throws IOException, InterruptedException {
private synchronized void setupOrUpdateConsumer(Connection connection)
throws IOException, InterruptedException, RuntimeException {
try {
// Close old consumer if it exists
if (messageConsumer != null) {
messageConsumer.close();
log.info("Closed previous MessageConsumer.");
messageConsumer = null;
}

// Get the stream context for the "github" stream
StreamContext streamContext = connection.getStreamContext("github");
// Get the subjects to monitor
String[] subjects = getSubjects();
log.info("Setting up consumer for stream 'github' with subjects: {}",
Arrays.toString(subjects));


ConsumerConfiguration.Builder consumerConfigBuilder = null;

// Check if consumer already exists
// Check if a consumer with the given durable name already exists
ConsumerInfo existingConsumer = null;
if (durableConsumerName != null && !durableConsumerName.isEmpty()) {
try {
consumerContext = streamContext.getConsumerContext(durableConsumerName);
} catch (JetStreamApiException e) {
consumerContext = null;
existingConsumer = streamContext.getConsumerInfo(durableConsumerName);
log.info("Consumer '{}' found.", durableConsumerName);
} catch (Exception e) {
log.info("Consumer '{}' not found; a new one will be created.", durableConsumerName);
}
}

if (consumerContext == null) {
log.info("Setting up consumer for subjects: {}", Arrays.toString(getSubjects()));
ConsumerConfiguration.Builder consumerConfigBuilder =
ConsumerConfiguration.builder()
.filterSubjects(getSubjects())
.deliverPolicy(DeliverPolicy.ByStartTime)
.startTime(ZonedDateTime.now().minusDays(timeframe));
if (existingConsumer != null) {
// Use existing configuration as base
ConsumerConfiguration existingConfig = existingConsumer.getConsumerConfiguration();
log.info("Consumer '{}' already exists. Updating subjects.", durableConsumerName);
consumerConfigBuilder = ConsumerConfiguration.builder(existingConfig)
.inactiveThreshold(Duration.ofMinutes(consumerInactiveThresholdMinutes))
.ackWait(Duration.ofSeconds(consumerAckWaitSeconds))
.filterSubjects(subjects);
} else {
log.info("Creating new configuration for consumer.");
// Create new configuration with deliver policy and start time
consumerConfigBuilder = ConsumerConfiguration.builder()
.deliverPolicy(DeliverPolicy.ByStartTime)
.startTime(ZonedDateTime.now().minusDays(timeframe))
.inactiveThreshold(Duration.ofMinutes(consumerInactiveThresholdMinutes))
.ackWait(Duration.ofSeconds(consumerAckWaitSeconds))
.filterSubjects(subjects);

if (durableConsumerName != null && !durableConsumerName.isEmpty()) {
consumerConfigBuilder.durable(durableConsumerName);
consumerConfigBuilder = consumerConfigBuilder.durable(durableConsumerName);
}

ConsumerConfiguration consumerConfig = consumerConfigBuilder.build();
consumerContext = streamContext.createOrUpdateConsumer(consumerConfig);
} else {
log.info("Consumer already exists. Skipping consumer setup.");
}

MessageHandler handler = this::handleMessage;
consumerContext.consume(handler);
ConsumerConfiguration consumerConfig = consumerConfigBuilder.build();
consumerContext = streamContext.createOrUpdateConsumer(consumerConfig);
log.info("Consumer created or updated with name '{}' and configuration: {}",
consumerContext.getConsumerInfo().getName(), consumerConfig);

messageConsumer = consumerContext.consume(this::handleMessage);
log.info("Successfully started consuming messages.");
} catch (JetStreamApiException e) {
log.error("JetStream API exception: {}", e.getMessage(), e);
throw new IOException("Failed to set up consumer.", e);
} catch (Exception e) {
log.error("Error setting up consumer: {}", e.getMessage(), e);
throw new RuntimeException("Failed to set up consumer.", e);
}
}

Expand Down Expand Up @@ -190,6 +243,26 @@ private void handleMessage(Message msg) {
}
}

/**
* Reinitialize the NATS consumer.
*/
public synchronized void reinitializeConsumer() {
log.info("NATS Consumer reinitialization process started.");
if (natsConnection != null) {
try {
log.info("Attempting to reinitialize the NATS consumer...");
setupOrUpdateConsumer(natsConnection);
log.info("NATS consumer reinitialized successfully.");
} catch (Exception e) {
log.error("Failed to reinitialize the NATS consumer: {}", e.getMessage(), e);
// Log error to Sentry
Sentry.captureException(e);
}
} else {
log.warn("NATS connection is null. Can not reinitialize consumer.");
}
}

/**
* Subjects to monitor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package de.tum.cit.aet.helios.nats;

import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.ErrorListener;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Subscription;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.nats.client.support.Status;
import io.sentry.Sentry;
import io.sentry.SentryLevel;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* Custom error listener for NATS connection.
* Inspired by: <a href="https://natsbyexample.com/examples/os/intro/java">natsbyexample</a>
*/
@Log4j2
@Component
public class NatsErrorListener extends ErrorListenerConsoleImpl {

private final NatsConsumerService natsConsumerService;

@Autowired
public NatsErrorListener(@Lazy NatsConsumerService natsConsumerService) {
this.natsConsumerService = natsConsumerService;
}


/**
* Handles pull status errors for the NATS connection.
* Logs the error and trys consumer reinitialization if the consumer was deleted.
*/
@Override
public void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) {
String message = this.supplyMessage("[SEVERE] pullStatusError", conn, (Consumer) null, sub,
new Object[] {"Status: ", status});
log.error(message);
// Check if the consumer was deleted
if (status.getCode() == 409 && "Consumer Deleted".equals(status.getMessage())) {
log.error("Consumer '{}' was deleted, triggering reinitialization of consumer...",
sub.getConsumerName());

// Log error to Sentry
Sentry.captureMessage(message, SentryLevel.ERROR);
// Trigger consumer recreation
natsConsumerService.reinitializeConsumer();
}
}

@Override
public void heartbeatAlarm(Connection conn, JetStreamSubscription sub, long lastStreamSequence,
long lastConsumerSequence) {
String message = this.supplyMessage("[SEVERE] heartbeatAlarm", conn, (Consumer) null, sub,
new Object[] {"lastStreamSequence: ", lastStreamSequence, "lastConsumerSequence: ",
lastConsumerSequence});
log.error(message);
}

@Override
public void errorOccurred(Connection conn, String error) {
String message =
this.supplyMessage("[SEVERE] errorOccurred", conn, (Consumer) null, (Subscription) null,
new Object[] {"Error: ", error});
log.error(message);
}

@Override
public void exceptionOccurred(Connection conn, Exception exp) {
String message =
this.supplyMessage("[SEVERE] exceptionOccurred", conn, (Consumer) null, (Subscription) null,
new Object[] {"Exception: ", exp});
log.error(message);
}

@Override
public void slowConsumerDetected(Connection conn, Consumer consumer) {
String message =
this.supplyMessage("[WARN] slowConsumerDetected", conn, consumer, (Subscription) null,
new Object[0]);
log.warn(message);
}

@Override
public void messageDiscarded(Connection conn, Message msg) {
String message =
this.supplyMessage("[INFO] messageDiscarded", conn, (Consumer) null, (Subscription) null,
new Object[] {"Message: ", msg});
log.info(message);
}

@Override
public void unhandledStatus(Connection conn, JetStreamSubscription sub, Status status) {
String message = this.supplyMessage("[WARN] unhandledStatus", conn, (Consumer) null, sub,
new Object[] {"Status: ", status});
log.warn(message);
}

@Override
public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) {
String message = this.supplyMessage("[WARN] pullStatusWarning", conn, (Consumer) null, sub,
new Object[] {"Status: ", status});
log.warn(message);
}

@Override
public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String id,
ErrorListener.FlowControlSource source) {
String message = this.supplyMessage("[INFO] flowControlProcessed", conn, (Consumer) null, sub,
new Object[] {"FlowControlSource: ", source});
log.info(message);
}

@Override
public void socketWriteTimeout(Connection conn) {
String message = this.supplyMessage("[SEVERE] socketWriteTimeout", conn, (Consumer) null,
(Subscription) null, new Object[0]);
log.error(message);
}
}
Loading

0 comments on commit 5aa9cc6

Please sign in to comment.