diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java index 6145ba40116..034f1c890a4 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java @@ -6,6 +6,7 @@ import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; @@ -33,33 +34,31 @@ public class ScannerFactory { protected int maxQueue = 1000; - protected HashSet instances = new HashSet<>(); - protected HashSet sessionInstances = new HashSet<>(); - protected AccumuloClient cxn; - protected boolean open = true; + protected final Set instances = Collections.synchronizedSet(new HashSet<>()); + protected final Set sessionInstances = Collections.synchronizedSet(new HashSet<>()); + protected final AccumuloClient cxn; + // using an AtomicBoolean to give us a separate monitor for synchronization + protected final AtomicBoolean open = new AtomicBoolean(true); + protected boolean accrueStats = false; - protected Query settings; + protected Query settings = null; protected ResourceQueue scanQueue = null; - ShardQueryConfiguration config = null; + protected ShardQueryConfiguration config = null; private static final Logger log = Logger.getLogger(ScannerFactory.class); public ScannerFactory(GenericQueryConfiguration queryConfiguration) { this.cxn = queryConfiguration.getClient(); - - if (queryConfiguration instanceof ShardQueryConfiguration) { - this.settings = ((ShardQueryConfiguration) queryConfiguration).getQuery(); - this.accrueStats = ((ShardQueryConfiguration) queryConfiguration).getAccrueStats(); - } log.debug("Created scanner factory " + System.identityHashCode(this) + " is wrapped ? " + (cxn instanceof WrappedConnector)); if (queryConfiguration instanceof ShardQueryConfiguration) { - config = ((ShardQueryConfiguration) queryConfiguration); - maxQueue = ((ShardQueryConfiguration) queryConfiguration).getMaxScannerBatchSize(); - this.settings = ((ShardQueryConfiguration) queryConfiguration).getQuery(); + this.config = ((ShardQueryConfiguration) queryConfiguration); + this.maxQueue = this.config.getMaxScannerBatchSize(); + this.settings = this.config.getQuery(); + this.accrueStats = this.config.getAccrueStats(); try { - scanQueue = new ResourceQueue(((ShardQueryConfiguration) queryConfiguration).getNumQueryThreads(), this.cxn); + scanQueue = new ResourceQueue(this.config.getNumQueryThreads(), this.cxn); } catch (Exception e) { throw new RuntimeException(e); } @@ -74,50 +73,71 @@ public ScannerFactory(AccumuloClient client) { public ScannerFactory(AccumuloClient client, int queueSize) { try { this.cxn = client; - scanQueue = new ResourceQueue(queueSize, client); + this.scanQueue = new ResourceQueue(queueSize, client); } catch (Exception e) { throw new RuntimeException(e); } } - public synchronized Scanner newSingleScanner(String tableName, Set auths, Query query) throws TableNotFoundException { - if (open) { + public Scanner newSingleScanner(String tableName, Set auths, Query query) throws TableNotFoundException { + if (open.get()) { Scanner bs = QueryScannerHelper.createScannerWithoutInfo(cxn, tableName, auths, query); log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); } - return bs; + synchronized (open) { + if (open.get()) { + instances.add(bs); + return bs; + } else { + bs.close(); + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } + } } else { throw new IllegalStateException("Factory has been locked. No new scanners can be created."); } } - public synchronized BatchScanner newScanner(String tableName, Set auths, int threads, Query query) throws TableNotFoundException { - if (open) { + public BatchScanner newScanner(String tableName, Set auths, int threads, Query query) throws TableNotFoundException { + if (open.get()) { BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query); log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); } - instances.add(bs); - return bs; + synchronized (open) { + if (open.get()) { + instances.add(bs); + return bs; + } else { + bs.close(); + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } + } } else { throw new IllegalStateException("Factory has been locked. No new scanners can be created."); } } - public synchronized BatchScanner newScanner(String tableName, Set auths, int threads, Query query, boolean reportErrors) - throws TableNotFoundException { - if (open) { + public BatchScanner newScanner(String tableName, Set auths, int threads, Query query, boolean reportErrors) throws TableNotFoundException { + if (open.get()) { BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query, reportErrors); log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); } - instances.add(bs); - return bs; + synchronized (open) { + if (open.get()) { + instances.add(bs); + return bs; + } else { + bs.close(); + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } + } } else { throw new IllegalStateException("Factory has been locked. No new scanners can be created."); } @@ -145,7 +165,7 @@ public BatchScanner newScanner(String tableName, Query query) throws TableNotFou * @throws Exception * if there are issues */ - public synchronized BatchScannerSession newQueryScanner(final String tableName, final Set auths, Query settings) throws Exception { + public BatchScannerSession newQueryScanner(final String tableName, final Set auths, Query settings) throws Exception { return newLimitedScanner(BatchScannerSession.class, tableName, auths, settings).setThreads(scanQueue.getCapacity()); } @@ -169,11 +189,11 @@ public synchronized BatchScannerSession newQueryScanner(final String tableName, * if there are issues * */ - public synchronized T newLimitedScanner(Class wrapper, final String tableName, final Set auths, - final Query settings) throws Exception { + public T newLimitedScanner(Class wrapper, final String tableName, final Set auths, final Query settings) + throws Exception { Preconditions.checkNotNull(scanQueue); Preconditions.checkNotNull(wrapper); - Preconditions.checkArgument(open, "Factory has been locked. No New scanners can be created"); + Preconditions.checkArgument(open.get(), "Factory has been locked. No New scanners can be created"); log.debug("Creating limited scanner whose max threads is is " + scanQueue.getCapacity() + " and max capacity is " + maxQueue); @@ -194,9 +214,15 @@ public synchronized T newLimitedScanner(Class wrap if (log.isTraceEnabled()) { log.trace("Adding instance " + session.hashCode()); } - sessionInstances.add(session); - - return session; + synchronized (open) { + if (open.get()) { + sessionInstances.add(session); + return session; + } else { + session.close(); + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } + } } /** @@ -213,7 +239,7 @@ public synchronized T newLimitedScanner(Class wrap * @throws Exception * if there are issues */ - public synchronized RangeStreamScanner newRangeScanner(final String tableName, final Set auths, final Query settings) throws Exception { + public RangeStreamScanner newRangeScanner(final String tableName, final Set auths, final Query settings) throws Exception { return newRangeScanner(tableName, auths, settings, Integer.MAX_VALUE); } @@ -221,16 +247,21 @@ public RangeStreamScanner newRangeScanner(String tableName, Set return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setShardsPerDayThreshold(shardsPerDayThreshold).setScannerFactory(this); } - public synchronized boolean close(ScannerBase bs) { - boolean removed = instances.remove(bs); - if (removed) { + public boolean close(ScannerBase bs) { + try { log.debug("Closed scanner " + System.identityHashCode(bs)); - if (log.isTraceEnabled()) { - log.trace("Closing instance " + bs.hashCode()); + if (instances.remove(bs)) { + if (log.isTraceEnabled()) { + log.trace("Closing instance " + bs.hashCode()); + } + bs.close(); + return true; } - bs.close(); + } catch (Exception e) { + // ANY EXCEPTION HERE CAN SAFELY BE IGNORED + log.trace("Exception closing ScannerBase, can be safely ignored: {}", e); } - return removed; + return false; } /** @@ -238,7 +269,7 @@ public synchronized boolean close(ScannerBase bs) { * * @return a NEW collection of scanners */ - public synchronized Collection currentScanners() { + public Collection currentScanners() { return new ArrayList<>(instances); } @@ -247,28 +278,30 @@ public synchronized Collection currentScanners() { * * @return a NEW collection of scanner session instances */ - public synchronized Collection currentSessions() { + public Collection currentSessions() { return new ArrayList<>(sessionInstances); } - public synchronized boolean lockdown() { + public boolean lockdown() { log.debug("Locked scanner factory " + System.identityHashCode(this)); if (log.isTraceEnabled()) { log.trace("Locked down with following stacktrace", new Exception("stacktrace for debugging")); } - open = false; - return open; + synchronized (open) { + return open.getAndSet(false); + } } - public synchronized void close(ScannerSession bs) { + public void close(ScannerSession bs) { try { log.debug("Closed session " + System.identityHashCode(bs)); - sessionInstances.remove(bs); - if (log.isTraceEnabled()) { - log.trace("Closing instance " + bs.hashCode()); + if (sessionInstances.remove(bs)) { + if (log.isTraceEnabled()) { + log.trace("Closing instance " + bs.hashCode()); + } + bs.close(); } - bs.close(); } catch (Exception e) { // ANY EXCEPTION HERE CAN SAFELY BE IGNORED log.trace("Exception closing ScannerSession, can be safely ignored: {}", e); @@ -279,30 +312,40 @@ public void setMaxQueue(int size) { this.maxQueue = size; } - public synchronized ScannerBase newRfileScanner(String tableName, Set auths, Query setting) { - Configuration conf = new Configuration(); + public ScannerBase newRfileScanner(String tableName, Set auths, Query setting) { + if (open.get()) { + Configuration conf = new Configuration(); - AccumuloClient con = cxn; + AccumuloClient con = cxn; - Properties clientProps = con.properties(); - final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey()); - final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey()); + Properties clientProps = con.properties(); + final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey()); + final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey()); - AccumuloHelper.setInstanceName(conf, instanceName); - AccumuloHelper.setUsername(conf, con.whoami()); + AccumuloHelper.setInstanceName(conf, instanceName); + AccumuloHelper.setUsername(conf, con.whoami()); - AccumuloHelper.setZooKeepers(conf, zookeepers); - BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + AccumuloHelper.setZooKeepers(conf, zookeepers); + BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); - AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes()); - BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next()); + AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes()); + BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next()); - conf.set(MultiRfileInputformat.CACHE_METADATA, "true"); + conf.set(MultiRfileInputformat.CACHE_METADATA, "true"); - ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1); + ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1); - instances.add(baseScanner); - - return baseScanner; + synchronized (open) { + if (open.get()) { + instances.add(baseScanner); + return baseScanner; + } else { + baseScanner.close(); + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } + } + } else { + throw new IllegalStateException("Factory has been locked. No new scanners can be created."); + } } }