Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10816: Add health check endpoint for Kafka Connect #16477

Merged
merged 5 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clients/src/main/java/org/apache/kafka/common/utils/Time.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.kafka.common.utils;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -98,7 +98,7 @@ default Timer timer(Duration timeout) {
* @param <T> The type of the future.
*/
default <T> T waitForFuture(
CompletableFuture<T> future,
Future<T> future,
long deadlineNs
) throws TimeoutException, InterruptedException, ExecutionException {
TimeoutException timeoutException = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
/**
* Common initialization logic for Kafka Connect, intended for use by command line utilities
*
* @param <H> the type of {@link Herder} to be used
* @param <T> the type of {@link WorkerConfig} to be used
*/
public abstract class AbstractConnectCli<T extends WorkerConfig> {
public abstract class AbstractConnectCli<H extends Herder, T extends WorkerConfig> {

private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
private final String[] args;
Expand All @@ -52,7 +53,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
*
* @param args the CLI arguments to be processed. Note that if one or more arguments are passed, the first argument is
* assumed to be the Connect worker properties file and is processed in {@link #run()}. The remaining arguments
* can be handled in {@link #processExtraArgs(Herder, Connect, String[])}
* can be handled in {@link #processExtraArgs(Connect, String[])}
*/
protected AbstractConnectCli(String... args) {
this.args = args;
Expand All @@ -64,15 +65,14 @@ protected AbstractConnectCli(String... args) {
* The first CLI argument is assumed to be the Connect worker properties file and is processed by default. This method
* can be overridden if there are more arguments that need to be processed.
*
* @param herder the {@link Herder} instance that can be used to perform operations on the Connect cluster
* @param connect the {@link Connect} instance that can be stopped (via {@link Connect#stop()}) if there's an error
* encountered while processing the additional CLI arguments.
* @param extraArgs the extra CLI arguments that need to be processed
*/
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
public void processExtraArgs(Connect<H> connect, String[] extraArgs) {
}

protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
protected abstract H createHerder(T config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient);

Expand All @@ -92,7 +92,8 @@ public void run() {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
Connect connect = startConnect(workerProps, extraArgs);
Connect<H> connect = startConnect(workerProps);
processExtraArgs(connect, extraArgs);

// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
Expand All @@ -107,11 +108,9 @@ public void run() {
* Initialize and start an instance of {@link Connect}
*
* @param workerProps the worker properties map used to initialize the {@link WorkerConfig}
* @param extraArgs any additional CLI arguments that may need to be processed via
* {@link #processExtraArgs(Herder, Connect, String[])}
* @return a started instance of {@link Connect}
*/
public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {
public Connect<H> startConnect(Map<String, String> workerProps) {
log.info("Kafka Connect worker initializing ...");
long initStart = time.hiResClockMs();

Expand All @@ -136,9 +135,9 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);

Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
H herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);

final Connect connect = new Connect(herder, restServer);
final Connect<H> connect = new Connect<>(herder, restServer);
log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
Expand All @@ -148,8 +147,6 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
Exit.exit(3);
}

processExtraArgs(herder, connect, extraArgs);

return connect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
Expand Down Expand Up @@ -52,7 +51,7 @@
* stopping worker instances.
* </p>
*/
public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
public class ConnectDistributed extends AbstractConnectCli<DistributedHerder, DistributedConfig> {

public ConnectDistributed(String... args) {
super(args);
Expand All @@ -64,7 +63,7 @@ protected String usage() {
}

@Override
protected Herder createHerder(DistributedConfig config, String workerId, Plugins plugins,
protected DistributedHerder createHerder(DistributedConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
* since it uses file storage (configurable via {@link StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
* </p>
*/
public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, StandaloneConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);

public ConnectStandalone(String... args) {
Expand All @@ -78,7 +78,7 @@ protected String usage() {
}

@Override
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
public void processExtraArgs(Connect<StandaloneHerder> connect, String[] extraArgs) {
try {
for (final String connectorConfigFile : extraArgs) {
CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);
Expand All @@ -88,12 +88,13 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
connect.herder().putConnectorConfig(
createConnectorRequest.name(), createConnectorRequest.config(),
createConnectorRequest.initialTargetState(),
false, cb);
cb.get();
}
connect.herder().ready();
} catch (Throwable t) {
log.error("Stopping after connector error", t);
connect.stop();
Expand Down Expand Up @@ -160,7 +161,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I
}

@Override
protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
protected StandaloneHerder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final String kafkaClusterId;
protected final StatusBackingStore statusBackingStore;
protected final ConfigBackingStore configBackingStore;
private volatile boolean ready = false;
private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
protected volatile boolean running = false;
private final ExecutorService connectorExecutor;
private final Time time;
protected final Loggers loggers;
Expand Down Expand Up @@ -180,9 +180,13 @@ protected void stopServices() {
Utils.closeQuietly(this.connectorClientConfigOverridePolicy, "connector client config override policy");
}

/**
 * Mark this herder as ready, i.e. signal that it has finished all of its
 * initialization and startup steps. Visible to subclasses only; once set,
 * readiness is never cleared for the lifetime of the herder.
 */
protected void ready() {
    // Direct field write; the field is volatile, so the flag is immediately
    // visible to readers on other threads.
    ready = true;
}

@Override
public boolean isRunning() {
return running;
public boolean isReady() {
return ready;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,27 @@
* This class ties together all the components of a Kafka Connect process (herder, worker,
* storage, command interface), managing their lifecycle.
*/
public class Connect {
public class Connect<H extends Herder> {
private static final Logger log = LoggerFactory.getLogger(Connect.class);

private final Herder herder;
private final H herder;
private final ConnectRestServer rest;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;

public Connect(Herder herder, ConnectRestServer rest) {
public Connect(H herder, ConnectRestServer rest) {
log.debug("Kafka Connect instance created");
this.herder = herder;
this.rest = rest;
shutdownHook = new ShutdownHook();
}

/**
 * @return the {@link Herder} instance managed by this Connect process
 */
public H herder() {
    return this.herder;
}

public void start() {
try {
log.info("Kafka Connect starting");
Expand Down Expand Up @@ -85,10 +89,6 @@ public void awaitStop() {
}
}

public boolean isRunning() {
return herder.isRunning();
}

// Visible for testing
public RestServer rest() {
return rest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,18 @@ public interface Herder {

void stop();

boolean isRunning();
/**
* @return whether the worker is ready; i.e., it has completed all initialization and startup
* steps such as creating internal topics, joining a cluster, etc.
*/
boolean isReady();

/**
* Check for worker health; i.e., its ability to service external requests from the user such
* as creating, reconfiguring, and deleting connectors
* @param callback callback to invoke once worker health is assured
*/
void healthCheck(Callback<Void> callback);

/**
* Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,17 @@ public void run() {
log.info("Herder starting");
herderThread = Thread.currentThread();

try (TickThreadStage stage = new TickThreadStage("reading to the end of internal topics")) {
try (TickThreadStage stage = new TickThreadStage("initializing and reading to the end of internal topics")) {
startServices();
}

log.info("Herder started");
running = true;

while (!stopping.get()) {
tick();

if (!isReady()) {
ready();
log.info("Herder started");
}
}

recordTickThreadStage("shutting down");
Expand All @@ -391,8 +393,6 @@ public void run() {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Utils.closeQuietly(this::stopServices, "herder services");
Exit.exit(1);
} finally {
running = false;
}
}

Expand Down Expand Up @@ -848,7 +848,17 @@ public void stop() {
ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor, FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor, START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("Herder stopped");
running = false;
}

/**
 * Check worker health by enqueuing a trivial request via {@code addRequest}.
 * The callback is completed successfully only once the herder actually services
 * the request, so a herder whose request-processing thread is blocked or
 * backlogged will simply leave the callback pending (surfacing to the caller as
 * a timeout rather than a successful health check).
 *
 * @param callback callback to invoke once worker health is assured
 */
@Override
public void healthCheck(Callback<Void> callback) {
    addRequest(
            // The request body does no work; merely reaching it demonstrates that
            // the herder is servicing its request queue.
            () -> {
                callback.onCompletion(null, null);
                return null;
            },
            // NOTE(review): presumably wraps the callback so that failures are
            // reported with the herder's current tick-thread stage attached for a
            // more descriptive error — confirm against forwardErrorAndTickThreadStages.
            forwardErrorAndTickThreadStages(callback)
    );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,18 +63,8 @@ public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
} catch (ExecutionException e) {
throw e.getCause();
} catch (StagedTimeoutException e) {
String message;
Stage stage = e.stage();
if (stage.completed() != null) {
message = "Request timed out. The last operation the worker completed was "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started()) + " and completed at "
+ Instant.ofEpochMilli(stage.completed());
} else {
message = "Request timed out. The worker is currently "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started());
}
String message = "Request timed out. " + stage.summarize();
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,23 @@ public interface RestRequestTimeout {
*/
long timeoutMs();

/**
* @return the current timeout that should be used for health check REST requests, in milliseconds
*/
long healthCheckTimeoutMs();

/**
 * Create a {@link RestRequestTimeout} whose timeouts are fixed and never change.
 *
 * @param timeoutMs the timeout to report for regular REST requests, in milliseconds
 * @param healthCheckTimeoutMs the timeout to report for health check REST requests,
 *                             in milliseconds
 * @return a {@link RestRequestTimeout} that always returns the two given values
 */
static RestRequestTimeout constant(long timeoutMs, long healthCheckTimeoutMs) {
    // Anonymous class rather than a lambda since the interface declares two
    // abstract methods and is therefore not a functional interface.
    return new RestRequestTimeout() {
        @Override
        public long timeoutMs() {
            return timeoutMs;
        }

        @Override
        public long healthCheckTimeoutMs() {
            return healthCheckTimeoutMs;
        }
    };
}

}
Loading