KAFKA-10816: Add health check endpoint for Kafka Connect #16477
@gharris1727, if you have time would you mind taking a look? I know the diff is fairly large but most of that is either small-but-far-reaching changes (i.e., getting rid of the |
Thanks for the feature, Chris!
It mostly looks good to me. I had a few minor comments I'd like your thoughts on.
    }

    public static WorkerStatus healthy() {
        return new WorkerStatus(
nit: have a single instance that is always returned? I like the method, rather than having a final constant.
👍 done
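For readers following along, the suggestion above amounts to keeping the factory method but having it hand back one cached instance. A minimal sketch, assuming a hypothetical immutable class named WorkerStatusSketch with made-up field names and message text (none of these come from the PR itself):

```java
// Hypothetical stand-in for the REST entity discussed above; not the PR's code.
final class WorkerStatusSketch {
    // Created once; safe to share because the object is immutable.
    private static final WorkerStatusSketch HEALTHY =
            new WorkerStatusSketch("healthy", "Worker is ready to handle requests.");

    private final String status;
    private final String message;

    private WorkerStatusSketch(String status, String message) {
        this.status = status;
        this.message = message;
    }

    // Callers still go through a method, but every call returns the same instance.
    static WorkerStatusSketch healthy() {
        return HEALTHY;
    }

    String status() {
        return status;
    }

    String message() {
        return message;
    }
}
```

Keeping the method (rather than exposing the constant) leaves room to change how the instance is produced later without touching call sites.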
        );
    }

    public static WorkerStatus starting(Stage stage) {
These are pretty natural methods, and I like them. And even though these are not part of the public API...
I think it is a bit odd that these REST entities would be aware of an internal abstraction, Stage. It's not the first time, as TaskInfo depends on ConnectorTaskId, ServerInfo on AppInfoParser, etc.
Just a thought that crossed my mind, but I don't have an alternative that I like better so this can probably stay as-is.
It should be easy enough to change the parameter to something like String statusDetail and leave it to the caller to invoke Stage::summarize if they're using the stage class to track progress. Did that not seem better to you? (I'd be fine with it.)
I gave this a shot, let me know what you think.
Ah yeah this is very reasonable. For some reason I thought there were a lot of call-sites that would need an extra null guard, but that wasn't the case. This is great.
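The decoupling agreed on in this thread can be illustrated roughly as follows. This is a sketch under assumptions: StageSketch, StatusEntitySketch, HerderSideSketch, and the message wording are all hypothetical names invented here; only the idea of a String statusDetail parameter and a caller-side summarize() call comes from the discussion.

```java
// Hypothetical internal progress tracker, standing in for the internal Stage class.
final class StageSketch {
    private final String description;

    StageSketch(String description) {
        this.description = description;
    }

    String summarize() {
        return "The worker is currently " + description;
    }
}

// Hypothetical REST entity: it only sees a plain String, never the internal type.
final class StatusEntitySketch {
    private final String status;
    private final String message;

    private StatusEntitySketch(String status, String message) {
        this.status = status;
        this.message = message;
    }

    static StatusEntitySketch starting(String statusDetail) {
        String message = "Worker is still starting up.";
        if (statusDetail != null) {
            message += " " + statusDetail;
        }
        return new StatusEntitySketch("starting", message);
    }

    String status() {
        return status;
    }

    String message() {
        return message;
    }
}

// Hypothetical caller: the single place that needs the null guard.
final class HerderSideSketch {
    static StatusEntitySketch describeStartup(StageSketch stage) {
        String detail = stage != null ? stage.summarize() : null;
        return StatusEntitySketch.starting(detail);
    }
}
```

With this shape the null guard lives at the call site, which matches the observation that only a few callers would need one.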
@@ -124,10 +127,21 @@ public synchronized void stop() {
            worker.stopAndAwaitConnector(connName);
        }
        stopServices();
        running = false;
        healthCheckThread.shutDown();
Please call Thread#join() at some point to ensure the thread gets cleaned up :)
Ah, good call. Done 👍
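As an illustration of the request above, a shutdown method can signal the thread and then join it so the caller knows it has exited. A minimal sketch, assuming a hypothetical HealthCheckThreadSketch class with a latch-based stop signal and a caller-supplied timeout; the real thread's internals are not shown in this conversation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical background thread; only the signal-then-join pattern is the point.
final class HealthCheckThreadSketch extends Thread {
    private final CountDownLatch shutDownLatch = new CountDownLatch(1);

    HealthCheckThreadSketch() {
        super("health-check-sketch");
    }

    @Override
    public void run() {
        try {
            // Loop until shutDown() releases the latch; real work would go in the loop body.
            while (!shutDownLatch.await(50, TimeUnit.MILLISECONDS)) {
                // placeholder for periodic work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Signal the loop to stop, then wait up to timeoutMs for the thread to exit.
    void shutDown(long timeoutMs) {
        shutDownLatch.countDown();
        try {
            join(timeoutMs);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
```

Usage would be `thread.start()` during startup and `thread.shutDown(5000)` during stop; afterwards `thread.isAlive()` should be false unless the timeout elapsed.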
                workerStatus = WorkerStatus.unhealthy(stage);
            }
        } catch (ExecutionException e) {
            throw e.getCause();
Should this be wrapped in a ConnectRestException, so it gets picked up by the ConnectExceptionMapper? It falls back to a 500 otherwise so it doesn't have a lot of difference, and it should never throw an ExecutionException, but 🤷
The exception mapper will catch this anyways, won't it? The intent was to hit this part of the code path and generate the standard 500-on-unexpected-error response.
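To make the code path under discussion concrete: a probe waits on a future with a timeout, treats a timeout as an unhealthy worker, and rethrows the underlying cause of an ExecutionException so that generic error handling (a 500 response, per the comment above) takes over. A minimal sketch, assuming a hypothetical HealthProbeSketch class, a CompletableFuture standing in for the request to the worker, and plain strings in place of the real status entity:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical probe; the names and String results are illustrative only.
final class HealthProbeSketch {
    static String probe(CompletableFuture<?> request, long timeoutMs) throws Throwable {
        try {
            request.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "healthy";
        } catch (TimeoutException e) {
            // No answer in time: report a status instead of failing the request.
            return "unhealthy";
        } catch (ExecutionException e) {
            // Unexpected failure: surface the real cause to the caller's error handling.
            throw e.getCause();
        }
    }
}
```

Whether the cause is wrapped in a more specific exception first is the open question in this thread; the sketch follows the unwrapped variant shown in the diff.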
        try {
            httpClient.start();
        } catch (Exception e) {
            throw new ConnectException("Failed to start HTTP client", e);
        }

        startConnect();
Does the order here matter? An exception starting httpClient still leaves the kafkaCluster started.
We need to start the HTTP client before we start the Connect cluster in order to be able to hit the cluster's REST API before startConnect has returned (which is only possible in the standalone integration test because we run EmbeddedConnectStandalone::start on a separate thread from the one we use to make REST requests).
This is dirty-bordering-on-filthy, but I couldn't think of a better way to probe a worker's health check endpoint during simulated blocked startup. Thoughts?
            log.error(msg, t);
            throw new RuntimeException(msg, t);
        }

    private void awaitBrokerShutdown() {
Was there a problem where the log directories would be deleted before the brokers shut down? Nice find!
Nah, this was just because there was a race in one of our integration tests where we shut down the underlying Kafka cluster and then wait for a fixed time period before asserting that the Connect cluster is unavailable. Making Kafka cluster shutdown synchronous fixed that problem.
(And the method decomposition was just to appease Checkstyle, although I do have to admit I also find it easier to read now.)
     *
     * @return the worker's name
     */
    public String name() {
This appears to have been unused. I can't imagine a situation where you'd want to programmatically search for the name, so it should probably only be visible in the toString() 👍
… in HealthCheckThread::shutDown
Thanks for the review @gharris1727! This is ready for another pass when you have time.
LGTM, thanks @C0urante!
There is no need to increase the REST request timeout, since we're asserting that the worker is unavailable when we issue the request anyways. There is also no risk that the REST server has not been started or initialized yet, since both of those operations take place synchronously during EmbeddedConnectCluster::addWorker.
Thanks Greg! I've realized the TODO in the code base was unnecessary and this should be safe to merge without waiting for more green CI runs (see the latest commit message for more details). Merging...
Reviewers: Greg Harris <[email protected]>
Jira, KIP-1017
Adds the GET /health endpoint to the Kafka Connect REST API, and covers all three states (starting, healthy, unhealthy) in both modes (distributed, standalone) with integration tests. Some unit tests are also added.