Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] KIP-891: Connect Multiversion Support (Base PR with Plugin Loading Isolation Changes) #16984

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3190,6 +3190,7 @@ project(':connect:runtime') {

implementation libs.slf4jApi
implementation libs.reload4j
implementation libs.slf4jReload4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonAnnotations
implementation libs.jacksonJaxrsJsonProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* A custom classloader dedicated to loading Connect plugin classes in classloading isolation.
Expand Down Expand Up @@ -69,36 +70,109 @@ public DelegatingClassLoader() {

/**
* Retrieve the PluginClassLoader associated with a plugin class
*
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
*/
// VisibleForTesting
PluginClassLoader pluginClassLoader(String name) {
PluginClassLoader pluginClassLoader(String name, VersionRange range) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}

SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
ClassLoader pluginLoader = inner.get(inner.lastKey());


ClassLoader pluginLoader = findPluginLoader(inner, name, range);
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
? (PluginClassLoader) pluginLoader
: null;
}

PluginClassLoader pluginClassLoader(String name) {
return pluginClassLoader(name, null);
}

ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException {
String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias);
// if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating
// classloader, in order to check if the version satisfies the requirement we need to load the plugin class here
ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader();
log.debug(
"Got plugin class loader: '{}' for connector: {}",
classLoader,
connectorClassOrAlias
);
return classLoader;
}

ClassLoader connectorLoader(String connectorClassOrAlias) {
String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName);
if (classLoader == null) classLoader = this;
log.debug(
"Getting plugin class loader: '{}' for connector: {}",
classLoader,
connectorClassOrAlias
"Getting plugin class loader: '{}' for connector: {}",
classLoader,
connectorClassOrAlias
);
return classLoader;
}

String resolveFullClassName(String classOrAlias) {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}

String latestVersion(String classOrAlias) {
if (classOrAlias == null) {
return null;
}
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
return null;
}
return inner.lastKey().version();
}

private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
VersionRange range
) {

if (range != null) {

ArtifactVersion version = range.getRecommendedVersion();

if (range.hasRestrictions()) {
List<ArtifactVersion> versions = loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList());
version = range.matchVersion(versions);
if (version == null) {
List<String> availableVersions = loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
throw new VersionedPluginLoadingException(String.format(
"Plugin loader for %s not found that matches the version range %s, available versions: %s",
pluginName,
range,
availableVersions
), availableVersions);
}
}

if (version != null) {
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) {
if (entry.getKey().encodedVersion().equals(version)) {
return entry.getValue();
}
}
}
}

return loaders.get(loaders.lastKey());
}

public void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
Expand All @@ -122,11 +196,54 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE
return super.loadClass(fullName, resolve);
}

protected Class<?> loadVersionedPluginClass(
String name,
VersionRange range,
boolean resolve
) throws VersionedPluginLoadingException, ClassNotFoundException {

String fullName = aliases.getOrDefault(name, name);
PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
Class<?> plugin;
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader);
plugin = pluginLoader.loadClass(fullName, resolve);
} else {
plugin = super.loadClass(fullName, resolve);
// if we are loading a plugin class from the parent classloader, we need to check if the version
// matches the range
String pluginVersion;
try (LoaderSwap classLoader = PluginScanner.withClassLoader(plugin.getClassLoader())) {
pluginVersion = PluginScanner.versionFor(plugin.getDeclaredConstructor().newInstance());
} catch (ReflectiveOperationException | LinkageError e) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s was loaded with %s but failed to determine its version",
name,
plugin.getClassLoader()
), e);
}

if (range != null && range.hasRestrictions() && !range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
name,
pluginVersion,
range
), Collections.singletonList(pluginVersion));
}
}


return plugin;
}



private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> computePluginLoaders(PluginScanResult plugins) {
Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new HashMap<>();
plugins.forEach(pluginDesc ->
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
Expand Down Expand Up @@ -59,6 +60,10 @@ public String toString() {
", location='" + location + '\'' +
'}';
}
@JsonIgnore
DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}

public Class<? extends T> pluginClass() {
return klass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected static String reflectiveErrorDescription(Throwable t) {
}
}

protected LoaderSwap withClassLoader(ClassLoader loader) {
protected static LoaderSwap withClassLoader(ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
return new LoaderSwap(savedLoader);
Expand Down
Loading