diff --git a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java index 01009fc23dfc1..7693d52fae57f 100644 --- a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java +++ b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/ComponentManager.java @@ -74,6 +74,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -374,6 +377,8 @@ public String[] categories() { private final DefaultServiceProvider defaultServiceProvider; + private final ReentrantReadWriteLock containerLock = new ReentrantReadWriteLock(); + public ComponentManager(final File m2) { this(m2.toPath()); } @@ -690,10 +695,10 @@ protected List addJarContaining(final ClassLoader loader, final String r .of(plugin) // just a small workaround for maven/gradle .flatMap(this::toPluginLocations) - .filter(path -> !container.find(path.getFileName().toString()).isPresent()) + .filter(path -> !findPlugin(path.getFileName().toString()).isPresent()) .map(file -> { final String id = addPlugin(file.toAbsolutePath().toString()); - if (container.find(id).get().get(ContainerComponentRegistry.class).getComponents().isEmpty()) { + if (findPlugin(id).get().get(ContainerComponentRegistry.class).getComponents().isEmpty()) { removePlugin(id); return null; } @@ -742,9 +747,7 @@ public Map mergeCheckpointConfiguration(final String plugin, fin } public ParameterMeta findConfigurationType(final String plugin, final String name, final String configurationType) { - return container - .findAll() - .stream() + return findAll() .map(c -> c.get(ContainerComponentRegistry.class)) .map(registry -> registry.findComponentFamily(plugin)) .filter(Objects::nonNull) @@ -838,7 +841,7 @@ public static Map replaceKeys(final Map configur } public Stream find(final Function> mapper) { - return container.findAll().stream().flatMap(mapper); + return findAll().flatMap(mapper); } // really a DIY entry point for custom flow, it creates component instances but @@ -852,9 +855,8 @@ public Optional createComponent(final String plugin, final String name, private Optional findComponentInternal(final String plugin, final String name, final ComponentType componentType, final int version, final Map configuration) { - if (container.findAll().isEmpty()) { - autoDiscoverPlugins(false, true); - } + autoDiscoverPluginsIfEmpty(false, true); + final Map conf = mergeCheckpointConfiguration(plugin, name, componentType, configuration); return find(pluginContainer -> Stream .of(findInstance(plugin, name, componentType, version, conf, pluginContainer))) @@ -862,7 +864,35 @@ private Optional findComponentInternal(final String plugin, final String .findFirst(); } + public void autoDiscoverPluginsIfEmpty(final boolean callers, final boolean classpath) { + if (hasPlugins()) { + return; + } + + final WriteLock writeLock = containerLock.writeLock(); + writeLock.lock(); + try { + if (hasPlugins()) { + return; + } + + autoDiscoverPlugins0(callers, classpath); + } finally { + writeLock.unlock(); + } + } + public void autoDiscoverPlugins(final boolean callers, final boolean classpath) { + final WriteLock writeLock = containerLock.writeLock(); + writeLock.lock(); + try { + autoDiscoverPlugins0(callers, classpath); + } finally { + writeLock.unlock(); + } + } + + private void autoDiscoverPlugins0(final boolean callers, final boolean classpath) { if (callers && !Boolean.getBoolean("component.manager.callers.skip")) { addCallerAsPlugin(); } @@ -914,12 +944,10 @@ private Optional findGenericInstance(final String plugin, final String n final Container pluginContainer) { return ofNullable(pluginContainer.get(GenericComponentExtension.class)) .filter(ext -> ext.canHandle(componentType.runtimeType(), plugin, name)) - .map(ext -> Object.class - .cast(ext - .createInstance(componentType.runtimeType(), plugin, name, version, configuration, - ofNullable(pluginContainer.get(AllServices.class)) - .map(AllServices::getServices) - .orElseGet(Collections::emptyMap)))); + .map(ext -> ext.createInstance(componentType.runtimeType(), plugin, name, version, configuration, + ofNullable(pluginContainer.get(AllServices.class)) + .map(AllServices::getServices) + .orElseGet(Collections::emptyMap))); } public Optional findMapper(final String plugin, final String name, final int version, @@ -940,26 +968,43 @@ public Optional findProcessor } public boolean hasPlugin(final String plugin) { - return container.find(plugin).isPresent(); + return findPlugin(plugin).isPresent(); } public Optional findPlugin(final String plugin) { - return container.find(plugin); + final ReadLock readLock = containerLock.readLock(); + readLock.lock(); + try { + return container.find(plugin); + } finally { + readLock.unlock(); + } } - public synchronized String addPlugin(final String pluginRootFile) { - final Optional pl = findPlugin(pluginRootFile); - if (pl.isPresent()) { - return pl.get().getId(); + public String addPlugin(final String pluginRootFile) { + final WriteLock writeLock = containerLock.writeLock(); + writeLock.lock(); + try { + final String pluginId = findPluginId(pluginRootFile); + if (pluginId != null) { + return pluginId; + } + + final String id = this.container + .builder(pluginRootFile) + .withCustomizer(createContainerCustomizer(pluginRootFile)) + .withAdditionalClasspath(findAdditionalClasspathFor(container.buildAutoIdFromName(pluginRootFile))) + .create() + .getId(); + info("Adding plugin: " + pluginRootFile + ", as " + id); + return id; + } finally { + writeLock.unlock(); } - final String id = this.container - .builder(pluginRootFile) - .withCustomizer(createContainerCustomizer(pluginRootFile)) - .withAdditionalClasspath(findAdditionalClasspathFor(container.buildAutoIdFromName(pluginRootFile))) - .create() - .getId(); - info("Adding plugin: " + pluginRootFile + ", as " + id); - return id; + } + + private String findPluginId(final String pluginRootFile) { + return findPlugin(pluginRootFile).map(Container::getId).orElse(null); } public String addWithLocationPlugin(final String location, final String pluginRootFile) { @@ -993,7 +1038,7 @@ private Collection findAdditionalClasspathFor(final String pluginId) { } public void removePlugin(final String id) { - container.find(id).ifPresent(Container::close); + findPlugin(id).ifPresent(Container::close); info("Removed plugin: " + id); } @@ -1019,7 +1064,7 @@ private T executeInContainer(final String plugin, final Supplier supplier final ClassLoader old = thread.getContextClassLoader(); thread .setContextClassLoader( - container.find(plugin).map(Container::getLoader).map(ClassLoader.class::cast).orElse(old)); + findPlugin(plugin).map(Container::getLoader).map(ClassLoader.class::cast).orElse(old)); try { return supplier.get(); } finally { @@ -1027,8 +1072,34 @@ private T executeInContainer(final String plugin, final Supplier supplier } } + private boolean hasPlugins() { + final ReadLock readLock = containerLock.readLock(); + readLock.lock(); + try { + return !container.findAll().isEmpty(); + } finally { + readLock.unlock(); + } + } + + private Stream findAll() { + final ReadLock readLock = containerLock.readLock(); + readLock.lock(); + try { + return container.findAll().stream(); + } finally { + readLock.unlock(); + } + } + public List availablePlugins() { - return container.findAll().stream().map(Container::getId).collect(toList()); + final ReadLock readLock = containerLock.readLock(); + readLock.lock(); + try { + return container.getPluginsList(); + } finally { + readLock.unlock(); + } } protected void containerServices(final Container container, final Map, Object> services) { diff --git a/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/ComponentManagerTest.java b/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/ComponentManagerTest.java index 847d0ac43833c..585e197c1ce3e 100644 --- a/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/ComponentManagerTest.java +++ b/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/ComponentManagerTest.java @@ -45,7 +45,9 @@ import java.util.Enumeration; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; import java.util.stream.Stream; @@ -61,6 +63,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.opentest4j.AssertionFailedError; import org.talend.sdk.component.api.record.Record; import org.talend.sdk.component.api.service.configuration.LocalConfiguration; @@ -74,6 +79,9 @@ import org.talend.sdk.component.runtime.record.RecordBuilderFactoryImpl; import org.talend.sdk.component.runtime.serialization.EnhancedObjectInputStream; +import lombok.extern.slf4j.Slf4j; + +@Slf4j class ComponentManagerTest { private final PluginGenerator pluginGenerator = new PluginGenerator(); @@ -219,6 +227,75 @@ void addPluginMultiThread(@TempDir final File temporaryFolder) throws Interrupte } } + @MethodSource("autoDiscoveryMultiThreadSource") + @ParameterizedTest + void autoDiscoveryMultiThread(final Consumer consumer, @TempDir final File temporaryFolder) + throws InterruptedException, IOException { + final File pluginFolder = new File(temporaryFolder, "test-plugins_" + UUID.randomUUID()); + pluginFolder.mkdirs(); + final File plugin1 = pluginGenerator.createChainPlugin(pluginFolder, "plugin1.jar"); + final File plugin2 = pluginGenerator.createChainPlugin(pluginFolder, "plugin2.jar"); + + final int threadCount = 50; + final AtomicBoolean intermittentState = new AtomicBoolean(false); + + final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); + final URL[] array = new URL[2]; + array[0] = plugin1.toURI().toURL(); + array[1] = plugin2.toURI().toURL(); + + try (final URLClassLoader tmpLoader = new URLClassLoader(array, contextLoader)) { + Thread.currentThread().setContextClassLoader(tmpLoader); + try (final ComponentManager manager = + new ComponentManager(new File("target/test-dependencies"), "META-INF/test/dependencies", null)) { + + Assertions.assertTrue(manager.availablePlugins().isEmpty()); + + final Thread monitor = new Thread(() -> { + final Thread thread = Thread.currentThread(); + while (!thread.isInterrupted()) { + // we want to check this thread as frequent as possible, to catch the probable problem + // it shouldn't be long + if (manager.availablePlugins().size() % 2 != 0) { + intermittentState.set(true); + } + } + }); + monitor.setDaemon(true); + monitor.start(); + + final Thread[] th = new Thread[threadCount]; + for (int ind = 0; ind < th.length; ind++) { + th[ind] = new Thread(() -> consumer.accept(manager)); + } + for (final Thread thread : th) { + thread.start(); + } + for (final Thread thread : th) { + thread.join(); + } + monitor.interrupt(); + monitor.join(); // it's daemon, but let's wait for it's end + + Assertions.assertEquals(2, manager.availablePlugins().size()); + } finally { // clean temp files + DynamicContainerFinder.SERVICES.clear(); + doCleanup(pluginFolder); + } + } finally { + Thread.currentThread().setContextClassLoader(contextLoader); + } + + // it should be either 2 or 0, depending on the timing of the threads + Assertions.assertFalse(intermittentState.get()); + } + + public static Stream autoDiscoveryMultiThreadSource() { + return Stream.of( + Arguments.of((Consumer) manager -> manager.autoDiscoverPluginsIfEmpty(false, true)), + Arguments.of((Consumer) manager -> manager.autoDiscoverPlugins(false, true))); + } + @Test void run(@TempDir final File temporaryFolder) throws Exception { final File pluginFolder = new File(temporaryFolder, "test-plugins_" + UUID.randomUUID().toString()); diff --git a/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/asm/PluginGenerator.java b/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/asm/PluginGenerator.java index dba9721a5e175..df7843bcaa39b 100644 --- a/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/asm/PluginGenerator.java +++ b/component-runtime-manager/src/test/java/org/talend/sdk/component/runtime/manager/asm/PluginGenerator.java @@ -108,6 +108,10 @@ public String map(final String key) { }); outputStream.putNextEntry(new JarEntry(toPack + "/Messages.properties")); outputStream.write(("error=An error occured").getBytes(StandardCharsets.UTF_8)); + outputStream.closeEntry(); + outputStream.putNextEntry(new JarEntry("TALEND-INF/dependencies.txt")); + outputStream.write(("test.group:" + plugin + ":jar:1:compile").getBytes(StandardCharsets.UTF_8)); + outputStream.closeEntry(); outputStream.putNextEntry(new JarEntry("TALEND-INF/local-configuration.properties")); outputStream.write(("_maxBatchSize.value=" + maxBatchSize).getBytes(StandardCharsets.UTF_8)); outputStream.closeEntry(); diff --git a/container/container-core/src/main/java/org/talend/sdk/component/container/ContainerManager.java b/container/container-core/src/main/java/org/talend/sdk/component/container/ContainerManager.java index c88b4aaeb59d8..eddcc25799830 100644 --- a/container/container-core/src/main/java/org/talend/sdk/component/container/ContainerManager.java +++ b/container/container-core/src/main/java/org/talend/sdk/component/container/ContainerManager.java @@ -53,7 +53,6 @@ import java.util.logging.Level; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.talend.sdk.component.classloader.ConfigurableClassLoader; @@ -330,13 +329,13 @@ public Collection findAll() { public List getPluginsList() { return findAll() .stream() - .map(container -> container.getId()) + .map(Container::getId) .sorted() .collect(toList()); } public String getPluginsHash() { - final String plugins = getPluginsList().stream().collect(Collectors.joining()); + final String plugins = String.join("", getPluginsList()); try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); byte[] hash = digest.digest(plugins.getBytes(StandardCharsets.UTF_8));