Skip to content

fix(QTDI-1638): thread safe component manager #1053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -374,6 +377,8 @@

private final DefaultServiceProvider defaultServiceProvider;

private final ReentrantReadWriteLock containerLock = new ReentrantReadWriteLock();

public ComponentManager(final File m2) {
this(m2.toPath());
}
Expand Down Expand Up @@ -655,7 +660,7 @@
}
}

protected List<String> addJarContaining(final ClassLoader loader, final String resource) {

Check failure on line 663 in component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java

View check run for this annotation

sonar-eks / Component Runtime Sonarqube Results

component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java#L663

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.
final URL url = loader.getResource(resource);
if (url != null) {
Path plugin = null;
Expand Down Expand Up @@ -690,10 +695,10 @@
.of(plugin)
// just a small workaround for maven/gradle
.flatMap(this::toPluginLocations)
.filter(path -> !container.find(path.getFileName().toString()).isPresent())
.filter(path -> !findPlugin(path.getFileName().toString()).isPresent())
.map(file -> {
final String id = addPlugin(file.toAbsolutePath().toString());
if (container.find(id).get().get(ContainerComponentRegistry.class).getComponents().isEmpty()) {
if (findPlugin(id).get().get(ContainerComponentRegistry.class).getComponents().isEmpty()) {
removePlugin(id);
return null;
}
Expand Down Expand Up @@ -742,9 +747,7 @@
}

public ParameterMeta findConfigurationType(final String plugin, final String name, final String configurationType) {
return container
.findAll()
.stream()
return findAll()
.map(c -> c.get(ContainerComponentRegistry.class))
.map(registry -> registry.findComponentFamily(plugin))
.filter(Objects::nonNull)
Expand Down Expand Up @@ -838,7 +841,7 @@
}

public <T> Stream<T> find(final Function<Container, Stream<T>> mapper) {
return container.findAll().stream().flatMap(mapper);
return findAll().flatMap(mapper);
}

// really a DIY entry point for custom flow, it creates component instances but
Expand All @@ -852,17 +855,44 @@

private Optional<Object> findComponentInternal(final String plugin, final String name,
final ComponentType componentType, final int version, final Map<String, String> configuration) {
if (container.findAll().isEmpty()) {
autoDiscoverPlugins(false, true);
}
autoDiscoverPluginsIfEmpty(false, true);

final Map<String, String> conf = mergeCheckpointConfiguration(plugin, name, componentType, configuration);
return find(pluginContainer -> Stream
.of(findInstance(plugin, name, componentType, version, conf, pluginContainer)))
.filter(Objects::nonNull)
.findFirst();
}

public void autoDiscoverPluginsIfEmpty(final boolean callers, final boolean classpath) {
if (hasPlugins()) {
return;
}

final WriteLock writeLock = containerLock.writeLock();
writeLock.lock();
try {
if (hasPlugins()) {
return;
}

autoDiscoverPlugins0(callers, classpath);
} finally {
writeLock.unlock();
}
}

public void autoDiscoverPlugins(final boolean callers, final boolean classpath) {
final WriteLock writeLock = containerLock.writeLock();
writeLock.lock();
try {
autoDiscoverPlugins0(callers, classpath);
} finally {
writeLock.unlock();
}
}

private void autoDiscoverPlugins0(final boolean callers, final boolean classpath) {
if (callers && !Boolean.getBoolean("component.manager.callers.skip")) {
addCallerAsPlugin();
}
Expand Down Expand Up @@ -914,12 +944,10 @@
final Container pluginContainer) {
return ofNullable(pluginContainer.get(GenericComponentExtension.class))
.filter(ext -> ext.canHandle(componentType.runtimeType(), plugin, name))
.map(ext -> Object.class
.cast(ext
.createInstance(componentType.runtimeType(), plugin, name, version, configuration,
ofNullable(pluginContainer.get(AllServices.class))
.map(AllServices::getServices)
.orElseGet(Collections::emptyMap))));
.map(ext -> ext.createInstance(componentType.runtimeType(), plugin, name, version, configuration,
ofNullable(pluginContainer.get(AllServices.class))
.map(AllServices::getServices)
.orElseGet(Collections::emptyMap)));
}

public Optional<Mapper> findMapper(final String plugin, final String name, final int version,
Expand All @@ -940,26 +968,43 @@
}

public boolean hasPlugin(final String plugin) {
return container.find(plugin).isPresent();
return findPlugin(plugin).isPresent();
}

public Optional<Container> findPlugin(final String plugin) {
return container.find(plugin);
final ReadLock readLock = containerLock.readLock();
readLock.lock();
try {
return container.find(plugin);
} finally {
readLock.unlock();
}
}

public synchronized String addPlugin(final String pluginRootFile) {
final Optional<Container> pl = findPlugin(pluginRootFile);
if (pl.isPresent()) {
return pl.get().getId();
public String addPlugin(final String pluginRootFile) {
final WriteLock writeLock = containerLock.writeLock();
writeLock.lock();
try {
final String pluginId = findPluginId(pluginRootFile);
if (pluginId != null) {
return pluginId;
}

final String id = this.container
.builder(pluginRootFile)
.withCustomizer(createContainerCustomizer(pluginRootFile))
.withAdditionalClasspath(findAdditionalClasspathFor(container.buildAutoIdFromName(pluginRootFile)))
.create()
.getId();
info("Adding plugin: " + pluginRootFile + ", as " + id);
return id;
} finally {
writeLock.unlock();
}
final String id = this.container
.builder(pluginRootFile)
.withCustomizer(createContainerCustomizer(pluginRootFile))
.withAdditionalClasspath(findAdditionalClasspathFor(container.buildAutoIdFromName(pluginRootFile)))
.create()
.getId();
info("Adding plugin: " + pluginRootFile + ", as " + id);
return id;
}

private String findPluginId(final String pluginRootFile) {
return findPlugin(pluginRootFile).map(Container::getId).orElse(null);
}

public String addWithLocationPlugin(final String location, final String pluginRootFile) {
Expand Down Expand Up @@ -993,7 +1038,7 @@
}

public void removePlugin(final String id) {
container.find(id).ifPresent(Container::close);
findPlugin(id).ifPresent(Container::close);
info("Removed plugin: " + id);
}

Expand All @@ -1019,16 +1064,42 @@
final ClassLoader old = thread.getContextClassLoader();
thread
.setContextClassLoader(
container.find(plugin).map(Container::getLoader).map(ClassLoader.class::cast).orElse(old));
findPlugin(plugin).map(Container::getLoader).map(ClassLoader.class::cast).orElse(old));
try {
return supplier.get();
} finally {
thread.setContextClassLoader(old);
}
}

private boolean hasPlugins() {
final ReadLock readLock = containerLock.readLock();
readLock.lock();
try {
return !container.findAll().isEmpty();
} finally {
readLock.unlock();
}
}

private Stream<Container> findAll() {
final ReadLock readLock = containerLock.readLock();
readLock.lock();
try {
return container.findAll().stream();
} finally {
readLock.unlock();
}
}

public List<String> availablePlugins() {
return container.findAll().stream().map(Container::getId).collect(toList());
final ReadLock readLock = containerLock.readLock();
readLock.lock();
try {
return container.getPluginsList();
} finally {
readLock.unlock();
}
}

protected void containerServices(final Container container, final Map<Class<?>, Object> services) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Stream;
Expand All @@ -61,6 +63,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.AssertionFailedError;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.configuration.LocalConfiguration;
Expand All @@ -74,6 +79,9 @@
import org.talend.sdk.component.runtime.record.RecordBuilderFactoryImpl;
import org.talend.sdk.component.runtime.serialization.EnhancedObjectInputStream;

import lombok.extern.slf4j.Slf4j;

@Slf4j
class ComponentManagerTest {

private final PluginGenerator pluginGenerator = new PluginGenerator();
Expand Down Expand Up @@ -219,6 +227,75 @@ void addPluginMultiThread(@TempDir final File temporaryFolder) throws Interrupte
}
}

@MethodSource("autoDiscoveryMultiThreadSource")
@ParameterizedTest
void autoDiscoveryMultiThread(final Consumer<ComponentManager> consumer, @TempDir final File temporaryFolder)
throws InterruptedException, IOException {
final File pluginFolder = new File(temporaryFolder, "test-plugins_" + UUID.randomUUID());
pluginFolder.mkdirs();
final File plugin1 = pluginGenerator.createChainPlugin(pluginFolder, "plugin1.jar");
final File plugin2 = pluginGenerator.createChainPlugin(pluginFolder, "plugin2.jar");

final int threadCount = 50;
final AtomicBoolean intermittentState = new AtomicBoolean(false);

final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
final URL[] array = new URL[2];
array[0] = plugin1.toURI().toURL();
array[1] = plugin2.toURI().toURL();

try (final URLClassLoader tmpLoader = new URLClassLoader(array, contextLoader)) {
Thread.currentThread().setContextClassLoader(tmpLoader);
try (final ComponentManager manager =
new ComponentManager(new File("target/test-dependencies"), "META-INF/test/dependencies", null)) {

Assertions.assertTrue(manager.availablePlugins().isEmpty());

final Thread monitor = new Thread(() -> {
final Thread thread = Thread.currentThread();
while (!thread.isInterrupted()) {
// we want to check this thread as frequent as possible, to catch the probable problem
// it shouldn't be long
if (manager.availablePlugins().size() % 2 != 0) {
intermittentState.set(true);
}
}
});
monitor.setDaemon(true);
monitor.start();

final Thread[] th = new Thread[threadCount];
for (int ind = 0; ind < th.length; ind++) {
th[ind] = new Thread(() -> consumer.accept(manager));
}
for (final Thread thread : th) {
thread.start();
}
for (final Thread thread : th) {
thread.join();
}
monitor.interrupt();
monitor.join(); // it's daemon, but let's wait for it's end

Assertions.assertEquals(2, manager.availablePlugins().size());
} finally { // clean temp files
DynamicContainerFinder.SERVICES.clear();
doCleanup(pluginFolder);
}
} finally {
Thread.currentThread().setContextClassLoader(contextLoader);
}

// it should be either 2 or 0, depending on the timing of the threads
Assertions.assertFalse(intermittentState.get());
}

public static Stream<Arguments> autoDiscoveryMultiThreadSource() {
return Stream.of(
Arguments.of((Consumer<ComponentManager>) manager -> manager.autoDiscoverPluginsIfEmpty(false, true)),
Arguments.of((Consumer<ComponentManager>) manager -> manager.autoDiscoverPlugins(false, true)));
}

@Test
void run(@TempDir final File temporaryFolder) throws Exception {
final File pluginFolder = new File(temporaryFolder, "test-plugins_" + UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public String map(final String key) {
});
outputStream.putNextEntry(new JarEntry(toPack + "/Messages.properties"));
outputStream.write(("error=An error occured").getBytes(StandardCharsets.UTF_8));
outputStream.closeEntry();
outputStream.putNextEntry(new JarEntry("TALEND-INF/dependencies.txt"));
outputStream.write(("test.group:" + plugin + ":jar:1:compile").getBytes(StandardCharsets.UTF_8));
outputStream.closeEntry();
outputStream.putNextEntry(new JarEntry("TALEND-INF/local-configuration.properties"));
outputStream.write(("_maxBatchSize.value=" + maxBatchSize).getBytes(StandardCharsets.UTF_8));
outputStream.closeEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.talend.sdk.component.classloader.ConfigurableClassLoader;
Expand Down Expand Up @@ -330,13 +329,13 @@ public Collection<Container> findAll() {
public List<String> getPluginsList() {
return findAll()
.stream()
.map(container -> container.getId())
.map(Container::getId)
.sorted()
.collect(toList());
}

public String getPluginsHash() {
final String plugins = getPluginsList().stream().collect(Collectors.joining());
final String plugins = String.join("", getPluginsList());
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(plugins.getBytes(StandardCharsets.UTF_8));
Expand Down
Loading