Skip to content

Commit

Permalink
Remove refernces to accumulo private class threadpools (#2346)
Browse files Browse the repository at this point in the history
* Remove references to accumulo threadpools

* Fix rebase
  • Loading branch information
alerman authored Apr 23, 2024
1 parent 2441af7 commit 3011367
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 17 deletions.
3 changes: 2 additions & 1 deletion checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
<module name="TreeWalker">
<!--check that only Accumulo 1.9 public APIs are imported-->
<module name="RegexpSinglelineJava">
<property name="format" value="import\s+org\.apache\.accumulo\.(.*\.(impl|thrift|crypto)\..*|(?!core|minicluster|testing).*|core\.(?!client|conf|data|security|spi|iterators).*)"/>
<!-- <property name="format" value="import\s+org\.apache\.accumulo\.(.*\.(impl|thrift|crypto)\..*|(?!core|minicluster|testing).*|core\.(?!client|conf|data|security|spi|iterators).*)"/>-->
<property name="format" value="import\s+org\.apache\.accumulo\.(?!(core\.(client|data|iterators|security)|minicluster|hadoop)\.).*" />
<property name="ignoreComments" value="true" />
<property name="message" value="Accumulo non-public classes imported" />
</module>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<surefire.forkCount>1C</surefire.forkCount>
<version.accumulo>2.1.1</version.accumulo>
<version.accumulo>2.1.2</version.accumulo>
<version.arquillian>1.4.1.Final</version.arquillian>
<version.arquillian-weld-ee-embedded>1.0.0.Final</version.arquillian-weld-ee-embedded>
<version.assertj>3.20.2</version.assertj>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -33,6 +33,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import datawave.ingest.util.cache.ReloadableCacheBuilder;
import datawave.ingest.util.cache.watch.FileRuleWatcher;
Expand Down Expand Up @@ -108,8 +109,10 @@ public class ConfigurableAgeOffFilter extends Filter implements OptionDescriber

private static final Logger log = Logger.getLogger(ConfigurableAgeOffFilter.class);

private static final ScheduledThreadPoolExecutor SIMPLE_TIMER = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
ConfigurableAgeOffFilter.class.getSimpleName() + "-ruleCache-refresh", false);
private static final ThreadFactory TIMER_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat(ConfigurableAgeOffFilter.class.getSimpleName() + "-ruleCache-refresh-%d").build();

private static final ScheduledExecutorService SIMPLE_TIMER = Executors.newSingleThreadScheduledExecutor(TIMER_THREAD_FACTORY);

public static final String UPDATE_INTERVAL_MS_PROP = "tserver.datawave.ageoff.cache.update.interval.ms";
protected static final long DEFAULT_UPDATE_INTERVAL_MS = 5;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package datawave.core.iterators;

import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.log4j.Logger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
*
*/
Expand Down Expand Up @@ -52,7 +53,7 @@ private ThreadPoolExecutor createExecutorService(final String prop, final String
}
final ThreadPoolExecutor service = createExecutorService(getMaxThreads(prop, pluginEnv), name + " (" + instanceId + ')');
threadPools.put(name, service);
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(accumuloConfiguration).scheduleWithFixedDelay(() -> {
Executors.newScheduledThreadPool(getMaxThreads(prop, pluginEnv)).scheduleWithFixedDelay(() -> {
try {
// Very important to not use the accumuloConfiguration in this thread and instead use the pluginEnv
// The accumuloConfiguration caches table ids which may no longer exist down the road.
Expand All @@ -77,8 +78,8 @@ private ThreadPoolExecutor createExecutorService(final String prop, final String
}

private ThreadPoolExecutor createExecutorService(int maxThreads, String name) {
ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createThreadPool(maxThreads, maxThreads, 5 * 60, TimeUnit.SECONDS, name,
new LinkedBlockingQueue<>(), false);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(name + "-%d").build();
ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads, maxThreads, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), tf);
pool.allowCoreThreadTimeOut(true);
return pool;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package datawave.query.iterator.profile;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class QuerySpanTest {

@Test
Expand Down Expand Up @@ -83,8 +85,9 @@ public void testMultiThreadedQuerySpanAcrossThreads() {
Runnable r1 = new QSRunnable(qsc, qs1);
Runnable r2 = new QSRunnable(qsc, qs2);
Runnable r3 = new QSRunnable(qsc, qs3);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("QSExecutor-%d").build();
ExecutorService executorService = Executors.newFixedThreadPool(10, tf);

ExecutorService executorService = ThreadPools.getClientThreadPools(Threads.UEH).createFixedThreadPool(10, "QSExecutor", false);
executorService.execute(r1);
executorService.execute(r2);
executorService.execute(r3);
Expand Down

0 comments on commit 3011367

Please sign in to comment.