diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index d7343098b4e2a..2f38eee4db942 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -433,6 +433,16 @@ private void validatePushRequest(PushTelemetryRequest request, ClientMetricsInst request.data().clientInstanceId(), clientTelemetryMaxBytes); throw new TelemetryTooLargeException(msg); } + + // Client library picks the first compression type from the list of supported compression types send by the broker. + // If compression type send in push telemetry request is different from the preferred compression type by broker, + // it means that client has ignored the preferred compression type. + // This is not an error, so log a warning and continue with the push telemetry request. + if (request.data().compressionType() != SUPPORTED_COMPRESSION_TYPES.get(0)) { + String msg = String.format("Compression type [%s] is received in push telemetry is different from the preferred compression type [%s] for client instance id: [%s]. Continuing with the push telemetry request.", + CompressionType.forId(request.data().compressionType()), CompressionType.forId(SUPPORTED_COMPRESSION_TYPES.get(0)), request.data().clientInstanceId()); + log.info(msg); + } } private static boolean isSupportedCompressionType(int id) {