diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/DataPointStagingStore.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/DataPointStagingStore.java index ad70c08d..ece22789 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/DataPointStagingStore.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/DataPointStagingStore.java @@ -149,16 +149,9 @@ public void propagate(String dbName) log.info( "propagating points from staged files for db {}.", dbName ); try { - List files = stagingFiles.collectEligibleFiles(dbName); - int count = 0; - for (SortedStagingFile file: files) { - executorCompletionService.submit(new IntervalProcessorTask(this, file)); - count++; - } + List> statsList = stagingFiles.collectEligibleFiles(dbName, this); - while (count > 0) { - Future result = executorCompletionService.take(); - count--; + for (Future result : statsList) { IntervalProcessors.Stats stats = result.get(); @@ -168,8 +161,9 @@ public void propagate(String dbName) MetricRegistry.name("staging", "intervalprocessor", stats.dbName, "metrics", "raw")).inc(stats.nLines); metricRegistry.counter( MetricRegistry.name("staging", "intervalprocessor", stats.dbName, "metrics", "aggr")).inc(stats.nRecords); + + log.info( String.format("finished propagating points from staged files. processed [%s] files.", stats.sortedStagingFile) ); } - log.info( String.format("finished propagating points from staged files. processed [%s] files.", files.size()) ); } catch(Throwable t) { @@ -187,7 +181,7 @@ public void add( String dbName, int from, long metricId, double val, String metr { received.mark(); - StagingFileSet stagingFile = stagingFileSetProvider.get( dbName, from ); + StagingFileSet stagingFile = stagingFileSetProvider.get( dbName, from, (int) (metricId % 10)); StagingFileRecord r = new StagingFileRecord( stagingFile, metricId, DataPoint.strValue(val), metricName ); if ( queue.offer( r ) ) //TODO: slow down instead of dropping? @@ -304,6 +298,9 @@ void close() this.intervalProcessors.shutdown(); } + public Future submitIntervalProcessorTask(SortedStagingFile sortedFile) { + return this.executorCompletionService.submit(new IntervalProcessorTask(this, sortedFile)); + } private static class IntervalProcessorTask implements Callable { diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileRecord.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileRecord.java index 8cd084bb..31e70c08 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileRecord.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileRecord.java @@ -6,8 +6,6 @@ */ package com.demandware.carbonj.service.db.points; -import com.demandware.carbonj.service.strings.StringsCache; - public class StagingFileRecord { public final StagingFileSet fileName; @@ -39,7 +37,7 @@ public class StagingFileRecord metricName = null; } else { strValue = line.substring(idStart, idEnd); - metricName = StringsCache.get(line.substring(idEnd + 1)); + metricName = line.substring(idEnd + 1); } } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSet.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSet.java index f828606d..91cb9de5 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSet.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSet.java @@ -17,8 +17,6 @@ import java.util.Optional; import java.util.Set; -import com.google.common.base.Joiner; - import com.demandware.carbonj.service.db.util.time.TimeSource; /** @@ -58,6 +56,8 @@ public class StagingFileSet */ final int from; + final int group; + final private int hashCode; private int collectWhenUnmodifiedFor = 600; @@ -66,33 +66,37 @@ public class StagingFileSet StagingFileSet(File stagingFile) { - String stagingFileName = stagingFile.getName(); - this.id = stagingFileName.substring( 0, stagingFileName.indexOf( '.' )); - - int delPos = id.indexOf( PARTS_DELIMITER ); - dbName = id.substring( 0, id.indexOf( PARTS_DELIMITER ) ); - from = Integer.parseInt( id.substring( delPos + 1 ) ); - this.hashCode = calcHashCode(); + this(stagingFile.getName().substring(0, stagingFile.getName().indexOf( '.' ))); } public StagingFileSet( String id ) { this.id = id; - int delPos = id.indexOf( PARTS_DELIMITER ); - dbName = id.substring( 0, id.indexOf( PARTS_DELIMITER ) ); + String[] elements = id.split(PARTS_DELIMITER); + dbName = elements[0]; + from = Integer.parseInt(elements[1]); + group = elements.length > 2 ? Integer.parseInt(elements[2]) : Integer.MAX_VALUE; this.collectWhenUnmodifiedFor = getCollectionIntervalSeconds( dbName ); - from = Integer.parseInt( id.substring( delPos + 1 ) ); this.hashCode = calcHashCode(); } - public StagingFileSet( String dbName, int from) - { - this.id = Joiner.on( PARTS_DELIMITER ).join( dbName, from ); + + public StagingFileSet( String dbName, int from) { + this(dbName, from, Integer.MAX_VALUE); + } + + public StagingFileSet( String dbName, int from, int group) { + this.id = getId(dbName, from, group); this.dbName = dbName; this.from = from; + this.group = group; this.collectWhenUnmodifiedFor = getCollectionIntervalSeconds( dbName ); this.hashCode = calcHashCode(); } + public static String getId(String dbName, int from, int group) { + return dbName + PARTS_DELIMITER + from + PARTS_DELIMITER + group; + } + private int getCollectionIntervalSeconds(String dbName) { return switch (dbName) { @@ -160,19 +164,16 @@ private List existingFiles( File dir, String fileId, boolean sorted) private Optional extractSequence( Path path, String fileId, boolean sorted) { String name = path.getFileName().toString(); + int dot = name.indexOf( '.' ); // ignore unrelated file - if( !name.startsWith( fileId ) ) + if (dot <= 0 || !name.substring(0, dot).equals( fileId )) { return Optional.empty(); } // "." for unsorted files or "..s" for sorted files - int seqStart = name.indexOf( '.' ) + 1; - if( seqStart < 2 ) - { - return Optional.empty(); - } + int seqStart = dot + 1; String seq; if( sorted ) { diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetCollector.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetCollector.java index 7c48a77a..c1cd622c 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetCollector.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetCollector.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Future; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -27,9 +28,10 @@ public StagingFileSetCollector(File dir) this.dir = Preconditions.checkNotNull( dir ); } - synchronized public List collectEligibleFiles(Map files, String dbName) + synchronized public List> collectEligibleFiles( + Map files, String dbName, DataPointStagingStore dataPointStagingStore) { - List sortedFiles = new ArrayList<>(); + List> stats = new ArrayList<>(); List names = new ArrayList<>( files.keySet().stream().filter(fs -> fs.dbName.equals(dbName)).toList() ); names.sort(Comparator.comparingInt(o -> o.from)); @@ -37,17 +39,16 @@ synchronized public List collectEligibleFiles(Map fs.needsCollection( files.get( fs ).lastModified() ) ) .forEach( fs -> { - log.debug( "processing staging file: [" + fs + "]" ); + log.info( "processing staging file: [" + fs + "]" ); StagingFile f = files.remove( fs ); f.close(); Optional lastSorted = fs.getLastSortedFileName( dir ); log.debug( "sorting ..." ); SortedStagingFile sortedFile = f.sort(lastSorted, dbName); log.debug("sorted file: [" + sortedFile + "]"); - sortedFiles.add( sortedFile ); + stats.add(dataPointStagingStore.submitIntervalProcessorTask(sortedFile)); } ); - log.debug("sorted files: [" + sortedFiles + "]"); - return sortedFiles; + return stats; } } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetProvider.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetProvider.java index d1ec3c29..279f2c6c 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetProvider.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFileSetProvider.java @@ -9,43 +9,34 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; public class StagingFileSetProvider { - private int cacheSize = 20; // 3 archive types, usually there will be only one (current) interval per archive. - private LoadingCache cache = CacheBuilder.newBuilder() + private final int cacheSize = 300; + + private final LoadingCache cache = CacheBuilder.newBuilder() .maximumSize( cacheSize ) .expireAfterWrite( 30,TimeUnit.MINUTES ) - .build( new CacheLoader() - { - @Override - public StagingFileSet load( String key ) - throws Exception - { - return new StagingFileSet( key ); - } - } ); - + .build(new CacheLoader<>() { + @SuppressWarnings("NullableProblems") + @Override + public StagingFileSet load(String key) { + return new StagingFileSet(key); + } + } ); - StagingFileSet get(String dbName, int from) + StagingFileSet get(String dbName, int from, int group) { try { - return cache.get( toId( dbName, from ) ); + return cache.get(StagingFileSet.getId(dbName, from, group)); } catch(ExecutionException e) { - throw Throwables.propagate( e ); + throw new RuntimeException(e); } } - - //TODO: refactor - id formatting logic is in StagingFilesSet and here now. - private String toId(String dbName, int from) - { - return dbName + StagingFileSet.PARTS_DELIMITER + from; - } } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFiles.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFiles.java index 60db9103..59cb3597 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFiles.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/db/points/StagingFiles.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import com.codahale.metrics.MetricRegistry; import com.demandware.carbonj.service.db.model.MetricProvider; @@ -30,7 +31,7 @@ public class StagingFiles /** * Tracks current open files. */ - private volatile ConcurrentMap files = new ConcurrentHashMap<>(); + private final ConcurrentMap files = new ConcurrentHashMap<>(); private final File dir; @@ -109,9 +110,9 @@ public void deleteFilesOlderThan(int time) } } - public List collectEligibleFiles(String dbName) + public List> collectEligibleFiles(String dbName, DataPointStagingStore dataPointStagingStore) { - return fileSetCollector.collectEligibleFiles(files, dbName); + return fileSetCollector.collectEligibleFiles(files, dbName, dataPointStagingStore); } public void write(StagingFileRecord r) diff --git a/carbonj.service/src/test/java/com/demandware/carbonj/service/db/points/_StagingFileName.java b/carbonj.service/src/test/java/com/demandware/carbonj/service/db/points/_StagingFileName.java index 9cdfd3c8..02b3f7fa 100644 --- a/carbonj.service/src/test/java/com/demandware/carbonj/service/db/points/_StagingFileName.java +++ b/carbonj.service/src/test/java/com/demandware/carbonj/service/db/points/_StagingFileName.java @@ -90,7 +90,9 @@ public void canParseSortedFileId() public void canBuildFileId() { StagingFileSet fn = new StagingFileSet( "5m7d", 100000); - assertAttributes( fn, "5m7d-100000", "5m7d", 100000); + assertAttributes( fn, "5m7d-100000-2147483647", "5m7d", 100000); + fn = new StagingFileSet( "5m7d", 100000, 1); + assertAttributes( fn, "5m7d-100000-1", "5m7d", 100000); } private void assertAttributes( StagingFileSet fn, String fileId, String dbName, int from)