Skip to content

KAFKA-18182: KIP-891 Connect Multiversion Support (Base PR with Plugin Loading Isolation Changes) #16984

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
49b37c7
Add multiversioning apis to isolation
snehashisp Aug 23, 2024
1b740cc
add method for header converter
snehashisp Aug 23, 2024
017df62
add getter for all plugins for a class
snehashisp Sep 19, 2024
4744f4f
Allow super to load class but do a version check
snehashisp Sep 21, 2024
8beac31
validate the version when loaded from parent correctly
snehashisp Sep 21, 2024
814ddb7
bug in loadclass, should use fullname
snehashisp Sep 22, 2024
7497ef7
add available version to exception and minor changes
snehashisp Sep 22, 2024
2119a91
Add default version getter
snehashisp Oct 28, 2024
f973fd0
add a null check
snehashisp Oct 29, 2024
baea3cf
Update log and comment
snehashisp Oct 29, 2024
ddec0d8
Rename default version to latest version
snehashisp Oct 30, 2024
9ded1af
fix header converter version
snehashisp Oct 31, 2024
74a0d8f
Use current classloader when version is not present
snehashisp Oct 31, 2024
bfa73e7
Remove transformation and predicate getters
snehashisp Nov 9, 2024
fbbb0b8
add rawtype annotation
snehashisp Nov 9, 2024
8c334b9
Add a static loader swap method
snehashisp Nov 13, 2024
5fef3f4
Resolve comments on 1st review
snehashisp Dec 5, 2024
5e9f3c7
Remove extra code
snehashisp Dec 5, 2024
b2b51c6
Add plugin version utils for parsing versions
snehashisp Dec 5, 2024
1c35aa9
Should return loader
snehashisp Dec 5, 2024
155271b
fix incorrect classloader equality check
snehashisp Dec 5, 2024
8c88d4c
fix whitespace issue
snehashisp Dec 5, 2024
d6e3392
Update version loading logic and add return delegating loader
snehashisp Dec 5, 2024
b6e2226
Combine loader logic
snehashisp Dec 5, 2024
821a7fe
Don't expose some plugins methods
snehashisp Dec 6, 2024
5b41efe
add safe swap loader instead of static swaploader
snehashisp Dec 6, 2024
158ad0f
Resolve comments
snehashisp Dec 6, 2024
24c95ea
resolve comments and fix some checkstyle
snehashisp Dec 7, 2024
757caba
fix more checkstyle errors
snehashisp Dec 7, 2024
fefc9e1
Merge remote-tracking branch 'origin' into multiversioning
snehashisp Dec 7, 2024
c12a042
Remove classpath helper
snehashisp Dec 7, 2024
1caa6f8
fix tests
snehashisp Dec 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3518,6 +3518,7 @@ project(':connect:runtime') {

implementation libs.slf4jApi
implementation libs.reload4j
implementation libs.slf4jReload4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonAnnotations
implementation libs.jacksonJaxrsJsonProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* A custom classloader dedicated to loading Connect plugin classes in classloading isolation.
Expand Down Expand Up @@ -69,36 +75,108 @@ public DelegatingClassLoader() {

/**
* Retrieve the PluginClassLoader associated with a plugin class
*
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
*/
// VisibleForTesting
PluginClassLoader pluginClassLoader(String name) {
PluginClassLoader pluginClassLoader(String name, VersionRange range) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}

SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
ClassLoader pluginLoader = inner.get(inner.lastKey());


ClassLoader pluginLoader = findPluginLoader(inner, name, range);
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
? (PluginClassLoader) pluginLoader
: null;
}

ClassLoader connectorLoader(String connectorClassOrAlias) {
String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName);
if (classLoader == null) classLoader = this;
PluginClassLoader pluginClassLoader(String name) {
return pluginClassLoader(name, null);
}

ClassLoader loader(String classOrAlias, VersionRange range) {
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName, range);
if (classLoader == null) {
classLoader = this;
}
log.debug(
"Getting plugin class loader: '{}' for connector: {}",
classLoader,
connectorClassOrAlias
"Got plugin class loader: '{}' for connector: {}",
classLoader,
classOrAlias
);
return classLoader;
}

ClassLoader loader(String classOrAlias) {
return loader(classOrAlias, null);
}

ClassLoader connectorLoader(String connectorClassOrAlias) {
return loader(connectorClassOrAlias);
}

String resolveFullClassName(String classOrAlias) {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}

String latestVersion(String classOrAlias) {
if (classOrAlias == null) {
return null;
}
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
return null;
}
return inner.lastKey().version();
}

private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
VersionRange range
) {

if (range != null) {

if (null != range.getRecommendedVersion()) {
throw new VersionedPluginLoadingException(String.format("A soft version range is not supported for plugin loading, "
+ "this is an internal error as connect should automatically convert soft ranges to hard ranges. "
+ "Provided soft version: %s ", range));
}

ArtifactVersion version = null;
ClassLoader loader = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) {
// the entries should be in sorted order of versions so this should end up picking the latest version which matches the range
if (range.containsVersion(entry.getKey().encodedVersion())) {
loader = entry.getValue();
}
}

if (loader == null) {
List<String> availableVersions = loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
throw new VersionedPluginLoadingException(String.format(
"Plugin %s not found that matches the version range %s, available versions: %s",
pluginName,
range,
availableVersions
), availableVersions);
}
return loader;
}

return loaders.get(loaders.lastKey());
}

public void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
Expand All @@ -112,21 +190,72 @@ public void installDiscoveredPlugins(PluginScanResult scanResult) {

@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
return loadVersionedPluginClass(name, null, resolve);
}

protected Class<?> loadVersionedPluginClass(
String name,
VersionRange range,
boolean resolve
) throws VersionedPluginLoadingException, ClassNotFoundException {

String fullName = aliases.getOrDefault(name, name);
PluginClassLoader pluginLoader = pluginClassLoader(fullName);
PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
Class<?> plugin;
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
return pluginLoader.loadClass(fullName, resolve);
log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader);
plugin = pluginLoader.loadClass(fullName, resolve);
} else {
plugin = super.loadClass(fullName, resolve);
if (range == null) {
return plugin;
}
verifyClasspathVersionedPlugin(name, plugin, range);
}
return plugin;
}

private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
String pluginVersion;
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(name);

if (scannedPlugin == null) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)",
name
));
}

return super.loadClass(fullName, resolve);
List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
.filter(pluginDesc -> pluginDesc.location().equals("classpath"))
.collect(Collectors.toList());

if (classpathPlugins.size() > 1) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has multiple versions specified in class path, "
+ "only one version is allowed in class path for loading a plugin with version range",
name
));
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
} else {
pluginVersion = classpathPlugins.get(0).version();
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
name,
pluginVersion,
range
), Collections.singletonList(pluginVersion));
}
}
}

private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> computePluginLoaders(PluginScanResult plugins) {
Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new HashMap<>();
plugins.forEach(pluginDesc ->
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
Expand Down Expand Up @@ -59,6 +60,10 @@ public String toString() {
", location='" + location + '\'' +
'}';
}
@JsonIgnore
DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}

public Class<? extends T> pluginClass() {
return klass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -469,7 +471,7 @@ private static Collection<URL> forJavaClassPath() {
}
return distinctUrls(urls);
}

private static Collection<URL> forClassLoader(ClassLoader classLoader) {
final Collection<URL> result = new ArrayList<>();
while (classLoader != null) {
Expand All @@ -483,12 +485,28 @@ private static Collection<URL> forClassLoader(ClassLoader classLoader) {
}
return distinctUrls(result);
}

private static Collection<URL> distinctUrls(Collection<URL> urls) {
Map<String, URL> distinct = new HashMap<>(urls.size());
for (URL url : urls) {
distinct.put(url.toExternalForm(), url);
}
return distinct.values();
}
public static VersionRange connectorVersionRequirement(String version) throws InvalidVersionSpecificationException {
if (version == null || version.equals("latest")) {
return null;
}
version = version.trim();

// check first if the given version is valid
VersionRange.createFromVersionSpec(version);

// now if the version is not enclosed we consider it as a hard requirement and enclose it in []
if (!version.startsWith("[") && !version.startsWith("(")) {
version = "[" + version + "]";
}
return VersionRange.createFromVersionSpec(version);
}

}
Loading
Loading