Skip to content

Commit 5318df7

Browse files
committed
Fixes classloading for sasl callbacl handler, when the handler needs to
be loaded via the spring-boot fat-jar classloader
1 parent b6f78bc commit 5318df7

File tree

2 files changed

+221
-6
lines changed

2 files changed

+221
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
* @author Adrian Gygax
118118
* @author Soby Chacko
119119
* @author Jaeyeon Kim
120+
* @author Alexandros Papadakis
120121
*/
121122
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
122123
implements ProducerFactory<K, V>, ApplicationContextAware,
@@ -947,13 +948,34 @@ private CloseSafeProducer<K, V> doCreateTxProducer(@Nullable String prefix, Stri
947948
}
948949

949950
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
950-
Producer<K, V> kafkaProducer =
951-
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier == null ? null : this.keySerializerSupplier.get(),
952-
this.valueSerializerSupplier == null ? null : this.valueSerializerSupplier.get());
953-
for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
954-
kafkaProducer = pp.apply(kafkaProducer);
951+
// store and restore the context class loader
952+
ClassLoader original = Thread.currentThread().getContextClassLoader();
953+
954+
// Safely pick a loader (applicationContext is @Nullable)
955+
ClassLoader target = null;
956+
if (this.applicationContext != null) { // @Nullable guard
957+
target = this.applicationContext.getClassLoader(); // from ResourceLoader
958+
}
959+
boolean switched = false;
960+
961+
try {
962+
if (target != null && target != original) {
963+
Thread.currentThread().setContextClassLoader(target);
964+
switched = true;
965+
}
966+
Producer<K, V> kafkaProducer =
967+
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier == null ? null : this.keySerializerSupplier.get(),
968+
this.valueSerializerSupplier == null ? null : this.valueSerializerSupplier.get());
969+
for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
970+
kafkaProducer = pp.apply(kafkaProducer);
971+
}
972+
return kafkaProducer;
973+
}
974+
finally {
975+
if (switched) {
976+
Thread.currentThread().setContextClassLoader(original);
977+
}
955978
}
956-
return kafkaProducer;
957979
}
958980

959981
@Nullable
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.net.URL;
20+
import java.net.URLClassLoader;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.util.Map;
24+
import java.util.UUID;
25+
import java.util.jar.JarEntry;
26+
import java.util.jar.JarOutputStream;
27+
28+
import javax.tools.JavaCompiler;
29+
import javax.tools.ToolProvider;
30+
31+
import org.apache.kafka.clients.producer.ProducerConfig;
32+
import org.apache.kafka.common.config.ConfigException;
33+
import org.apache.kafka.common.serialization.StringSerializer;
34+
import org.junit.jupiter.api.AfterAll;
35+
import org.junit.jupiter.api.BeforeAll;
36+
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.io.TempDir;
38+
39+
import org.springframework.context.support.GenericApplicationContext;
40+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
41+
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
42+
import org.springframework.kafka.test.context.EmbeddedKafka;
43+
import org.springframework.kafka.test.utils.KafkaTestUtils;
44+
45+
import static org.assertj.core.api.Assertions.assertThat;
46+
import static org.assertj.core.api.Assertions.assertThatCode;
47+
import static org.assertj.core.api.Assertions.catchThrowable;
48+
49+
/**
50+
* Reproducer for GH:
51+
* https://github.com/spring-projects/spring-kafka/issues/4109 Tests the
52+
* class-loading behavior when a SASL callback handler class is packaged with
53+
* the application
54+
*
55+
* @author Alexandros Papadakis
56+
*/
57+
@EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC })
58+
class KafkaSaslHandlerClassloadingTest {
59+
60+
public static final String INT_KEY_TOPIC = "intKeyTopic";
61+
62+
public static final String STRING_KEY_TOPIC = "stringKeyTopic";
63+
64+
private static EmbeddedKafkaBroker embeddedKafka;
65+
66+
private static final String FQCN = "xxx.yyy.kafka.auth.XyzAuthenticateCallbackHandler";
67+
68+
private static final String SOURCE = "package xxx.yyy.kafka.auth;\n" + "import javax.security.auth.callback.*;\n"
69+
+ "import java.util.List;\n" + "import java.util.Map;\n"
70+
+ "import javax.security.auth.login.AppConfigurationEntry;\n"
71+
+ "import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;\n"
72+
+ "public class XyzAuthenticateCallbackHandler implements AuthenticateCallbackHandler {\n"
73+
+ " @Override public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { }\n"
74+
+ " @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { }\n"
75+
+ " @Override public void close() { }\n" + "}\n";
76+
77+
/**
78+
* Build a tiny jar that contains only the callback handler class; return the
79+
* child ClassLoader that can see it.
80+
*/
81+
private URLClassLoader makeChildLoaderWithHandler(@TempDir Path tempDir) throws Exception {
82+
// 1) write source
83+
Path srcDir = tempDir.resolve("src/xxx/yyy/kafka/auth");
84+
Files.createDirectories(srcDir);
85+
Path javaFile = srcDir.resolve("XyzAuthenticateCallbackHandler.java");
86+
Files.writeString(javaFile, SOURCE);
87+
88+
// 2) compile using current test classpath
89+
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
90+
assertThat(compiler).as("Tests must run on a JDK (not a JRE) to get the JavaCompiler").isNotNull();
91+
92+
Path classesDir = tempDir.resolve("classes");
93+
Files.createDirectories(classesDir);
94+
int rc = compiler.run(null, null, null, "-classpath", System.getProperty("java.class.path"), "-d",
95+
classesDir.toString(), javaFile.toString());
96+
assertThat(rc).as("Compilation failed").isZero();
97+
98+
// 3) jar it
99+
Path jar = tempDir.resolve("handler.jar");
100+
try (JarOutputStream jos = new JarOutputStream(Files.newOutputStream(jar))) {
101+
String entryName = "xxx/yyy/kafka/auth/XyzAuthenticateCallbackHandler.class";
102+
jos.putNextEntry(new JarEntry(entryName));
103+
byte[] bytes = Files.readAllBytes(classesDir.resolve(entryName));
104+
jos.write(bytes);
105+
jos.closeEntry();
106+
}
107+
108+
// 4) child loader that can see ONLY the handler jar (parent is the current TCCL
109+
// for Kafka & Spring classes)
110+
URL[] urls = new URL[] { jar.toUri().toURL() };
111+
return new URLClassLoader(urls, Thread.currentThread().getContextClassLoader());
112+
}
113+
114+
@BeforeAll
115+
public static void setUp() {
116+
embeddedKafka = EmbeddedKafkaCondition.getBroker();
117+
}
118+
119+
@AfterAll
120+
public static void tearDown() {
121+
}
122+
123+
private Map<String, Object> producerProps() {
124+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka,
125+
"KafkaSaslHandlerTests" + UUID.randomUUID(), false);
126+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
127+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
128+
// SASL wiring to reach the callback-handler parsing path
129+
props.put("security.protocol", "SASL_SSL");
130+
props.put("sasl.login.callback.handler.class", FQCN);
131+
props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
132+
props.put("sasl.mechanism", "OAUTHBEARER");
133+
return props;
134+
}
135+
136+
@Test
137+
void reproduces_failure_when_handler_is_only_visible_to_child_loader(@TempDir Path temp) throws Exception {
138+
URLClassLoader child = makeChildLoaderWithHandler(temp);
139+
140+
// TCCL remains the parent (cannot see the handler class)
141+
ClassLoader original = Thread.currentThread().getContextClassLoader();
142+
try {
143+
Thread.currentThread().setContextClassLoader(original); // explicit for clarity
144+
145+
Map<String, Object> props = producerProps();
146+
147+
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
148+
149+
Throwable thrown = catchThrowable(factory::createProducer);
150+
assertThat(thrown).as("Expected class-loading failure when only the child loader has the handler")
151+
.isInstanceOf(ConfigException.class);
152+
153+
assertThat(thrown.getMessage()).as("Exception message should indicate missing class")
154+
.matches("(?s).*could not be found.*|(?s).*cannot be found.*");
155+
156+
thrown.printStackTrace();
157+
}
158+
finally {
159+
Thread.currentThread().setContextClassLoader(original);
160+
child.close();
161+
}
162+
}
163+
164+
@Test
165+
void succeeds_when_fix_provides_Class_instance_from_correct_loader(@TempDir Path temp) throws Exception {
166+
URLClassLoader appCtxLoader = makeChildLoaderWithHandler(temp);
167+
168+
ClassLoader original = Thread.currentThread().getContextClassLoader();
169+
try {
170+
// TCCL still can't see the class (remains parent)
171+
Thread.currentThread().setContextClassLoader(original);
172+
173+
Map<String, Object> props = producerProps();
174+
175+
// Create an ApplicationContext whose ClassLoader == appCtxLoader
176+
try (GenericApplicationContext ctx = new GenericApplicationContext()) {
177+
ctx.setClassLoader(appCtxLoader); // <-- critical: handler only here
178+
// Register the factory as a bean so it receives the ApplicationContext
179+
ctx.registerBean(DefaultKafkaProducerFactory.class, () -> new DefaultKafkaProducerFactory<>(props));
180+
ctx.refresh();
181+
182+
DefaultKafkaProducerFactory<?, ?> factory = ctx.getBean(DefaultKafkaProducerFactory.class);
183+
184+
assertThatCode(factory::createProducer)
185+
.as("Passing a Class<?> bypasses name lookups and should succeed").doesNotThrowAnyException();
186+
}
187+
}
188+
finally {
189+
Thread.currentThread().setContextClassLoader(original);
190+
appCtxLoader.close();
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)