Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10816: Add health check endpoint for Kafka Connect #16477

Merged
merged 5 commits into from
Jul 3, 2024

Conversation

C0urante
Copy link
Contributor

Jira, KIP-1017

Adds the GET /health endpoint to the Kafka Connect REST API, and covers all three states (starting, healthy, unhealthy) in both modes (distributed, standalone) with integration tests. Some unit tests are also added.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante C0urante added connect kip Requires or implements a KIP labels Jun 27, 2024
@C0urante
Copy link
Contributor Author

@gharris1727, if you have time would you mind taking a look? I know the diff is fairly large but most of that is either small-but-far-reaching changes (i.e., getting rid of the DistributedHerder::running field and replacing its uses in our integration tests with the health check endpoint) or newly-added unit and integration tests. Still, if this is too much to do in one round, please let me know and I can try to break it out into smaller PRs.

Copy link
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feature, Chris!

It mostly looks good to me. I had a few minor comments i'd like your thoughts on.

}

public static WorkerStatus healthy() {
return new WorkerStatus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: have a single instance that is always returned? I like the method, rather than having a final constant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done

);
}

public static WorkerStatus starting(Stage stage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are pretty natural methods, and I like them. And even though these are not part of the public api...

I think it is a bit odd that these rest entities would be aware of an internal abstraction Stage. It's not the first time, as TaskInfo depends on ConnectorTaskId, ServerInfo on AppInfoParser, etc.

Just a thought that crossed my mind, but I don't have an alternative that I like better so this can probably stay as-is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be easy enough to change the parameter to something like String statusDetail and leave it to the caller to invoke Stage::summarize if they're using the stage class to track progress. Did that not seem better to you? (I'd be fine with it.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this a shot, let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah this is very reasonable. For some reason I thought there were a lot of call-sites that would need an extra null guard, but that wasn't the case. This is great.

@@ -124,10 +127,21 @@ public synchronized void stop() {
worker.stopAndAwaitConnector(connName);
}
stopServices();
running = false;
healthCheckThread.shutDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call Thread#join() at some point to ensure the thread gets cleaned up :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good call. Done 👍

workerStatus = WorkerStatus.unhealthy(stage);
}
} catch (ExecutionException e) {
throw e.getCause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be wrapped in a ConnectRestException, so it gets picked up by the ConnectExceptionMapper? It falls back to a 500 otherwise so it doesn't have a lot of difference, and it should never throw an ExecutionException, but 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception mapper will catch this anyways, won't it? The intent was to hit this part of the code path and generate the standard 500-on-unexpected-error response.

try {
httpClient.start();
} catch (Exception e) {
throw new ConnectException("Failed to start HTTP client", e);
}

startConnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the order here matter? An exception starting httpClient still leaves the kafkaCluster started.

Copy link
Contributor Author

@C0urante C0urante Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to start the HTTP client before we start the Connect cluster in order to be able to hit the cluster's REST API before startConnect has returned (which is only possible in the standalone integration test because we run EmbeddedConnectStandalone::start on a separate thread from the one we use to make REST requests).

This is dirty-bordering-on-filthy, but I couldn't think of a better way to probe a worker's health check endpoint during simulated blocked startup. Thoughts?

log.error(msg, t);
throw new RuntimeException(msg, t);
}
private void awaitBrokerShutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a problem where the log directories would be deleted before the brokers shut down? Nice find!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, this was just because there was a race in one of our integration tests where we shut down the underlying Kafka cluster and then wait for a fixed time period before asserting that the Connect cluster is unavailable. Making Kafka cluster shutdown synchronous fixed that problem.

(And the method decomposition was just to appease Checkstyle, although I do have to admit I also find it easier to read now.)

*
* @return the worker's name
*/
public String name() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to have been unused. I can't imagine a situation where you'd want to programmatically search for the name, so it should probably only be visible in the toString() 👍

@C0urante
Copy link
Contributor Author

C0urante commented Jul 3, 2024

Thanks for the review @gharris1727! This is ready for another pass when you have time.

Copy link
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @C0urante!

There is no need to increase the REST request timeout, since we're asserting
that the worker is unavailable when we issue the request anyways. There is also
no risk that the REST server has not been started or initialized yet, since both
of those operations take place synchronously during
EmbeddedConnectCluster::addWorker.
@C0urante
Copy link
Contributor Author

C0urante commented Jul 3, 2024

Thanks Greg!

I've realized the TODO in the code base was unnecessary and this should be safe to merge without waiting for more green CI runs (see the latest commit message for more details).

Merging...

@C0urante C0urante merged commit 27220d1 into apache:trunk Jul 3, 2024
1 check was pending
@C0urante C0urante deleted the kafka-10816 branch July 3, 2024 18:15
abhi-ksolves pushed a commit to ksolves/kafka that referenced this pull request Jul 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
connect kip Requires or implements a KIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants