Skip to content

Commit

Permalink
W-16093303: Split staging file into group
Browse files Browse the repository at this point in the history
  • Loading branch information
peterzxu-crm committed Jul 1, 2024
1 parent d996f36 commit baba66d
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,9 @@ public void propagate(String dbName)
log.info( "propagating points from staged files for db {}.", dbName );
try
{
List<SortedStagingFile> files = stagingFiles.collectEligibleFiles(dbName);
int count = 0;
for (SortedStagingFile file: files) {
executorCompletionService.submit(new IntervalProcessorTask(this, file));
count++;
}
List<Future<IntervalProcessors.Stats>> statsList = stagingFiles.collectEligibleFiles(dbName, this);

while (count > 0) {
Future<IntervalProcessors.Stats> result = executorCompletionService.take();
count--;
for (Future<IntervalProcessors.Stats> result : statsList) {

IntervalProcessors.Stats stats = result.get();

Expand All @@ -168,8 +161,9 @@ public void propagate(String dbName)
MetricRegistry.name("staging", "intervalprocessor", stats.dbName, "metrics", "raw")).inc(stats.nLines);
metricRegistry.counter(
MetricRegistry.name("staging", "intervalprocessor", stats.dbName, "metrics", "aggr")).inc(stats.nRecords);

log.info( String.format("finished propagating points from staged files. processed [%s] files.", stats.sortedStagingFile) );
}
log.info( String.format("finished propagating points from staged files. processed [%s] files.", files.size()) );
}
catch(Throwable t)
{
Expand All @@ -187,7 +181,7 @@ public void add( String dbName, int from, long metricId, double val, String metr
{
received.mark();

StagingFileSet stagingFile = stagingFileSetProvider.get( dbName, from );
StagingFileSet stagingFile = stagingFileSetProvider.get( dbName, from, (int) (metricId % 10));
StagingFileRecord r = new StagingFileRecord( stagingFile, metricId, DataPoint.strValue(val), metricName );

if ( queue.offer( r ) ) //TODO: slow down instead of dropping?
Expand Down Expand Up @@ -304,6 +298,9 @@ void close()
this.intervalProcessors.shutdown();
}

/**
 * Wraps the given sorted staging file in an {@code IntervalProcessorTask} bound to this
 * store and hands it to {@code executorCompletionService} for execution.
 *
 * @param sortedFile sorted staging file the task will be created for
 * @return the future returned by the completion service for the submitted task
 */
public Future<IntervalProcessors.Stats> submitIntervalProcessorTask(SortedStagingFile sortedFile)
{
    IntervalProcessorTask task = new IntervalProcessorTask(this, sortedFile);
    return executorCompletionService.submit(task);
}

private static class IntervalProcessorTask implements Callable<IntervalProcessors.Stats> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
*/
package com.demandware.carbonj.service.db.points;

import com.demandware.carbonj.service.strings.StringsCache;

public class StagingFileRecord
{
public final StagingFileSet fileName;
Expand Down Expand Up @@ -39,7 +37,7 @@ public class StagingFileRecord
metricName = null;
} else {
strValue = line.substring(idStart, idEnd);
metricName = StringsCache.get(line.substring(idEnd + 1));
metricName = line.substring(idEnd + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.Optional;
import java.util.Set;

import com.google.common.base.Joiner;

import com.demandware.carbonj.service.db.util.time.TimeSource;

/**
Expand Down Expand Up @@ -58,6 +56,8 @@ public class StagingFileSet
*/
final int from;

final int group;

final private int hashCode;

private int collectWhenUnmodifiedFor = 600;
Expand All @@ -66,33 +66,37 @@ public class StagingFileSet

StagingFileSet(File stagingFile)
{
String stagingFileName = stagingFile.getName();
this.id = stagingFileName.substring( 0, stagingFileName.indexOf( '.' ));

int delPos = id.indexOf( PARTS_DELIMITER );
dbName = id.substring( 0, id.indexOf( PARTS_DELIMITER ) );
from = Integer.parseInt( id.substring( delPos + 1 ) );
this.hashCode = calcHashCode();
this(stagingFile.getName().substring(0, stagingFile.getName().indexOf( '.' )));
}

public StagingFileSet( String id )
{
this.id = id;
int delPos = id.indexOf( PARTS_DELIMITER );
dbName = id.substring( 0, id.indexOf( PARTS_DELIMITER ) );
String[] elements = id.split(PARTS_DELIMITER);
dbName = elements[0];
from = Integer.parseInt(elements[1]);
group = elements.length > 2 ? Integer.parseInt(elements[2]) : Integer.MAX_VALUE;
this.collectWhenUnmodifiedFor = getCollectionIntervalSeconds( dbName );
from = Integer.parseInt( id.substring( delPos + 1 ) );
this.hashCode = calcHashCode();
}
public StagingFileSet( String dbName, int from)
{
this.id = Joiner.on( PARTS_DELIMITER ).join( dbName, from );

/**
 * Creates a file set for the given database and interval start without an explicit group.
 * Delegates to the three-argument constructor with {@code Integer.MAX_VALUE} as the group,
 * the same value the id-parsing constructor uses when the id carries no group part.
 *
 * @param dbName database name part of the id
 * @param from   interval start part of the id
 */
public StagingFileSet( String dbName, int from)
{
    this( dbName, from, Integer.MAX_VALUE );
}

/**
 * Creates a file set from its individual parts and derives the id from them.
 *
 * @param dbName database name part of the id
 * @param from   interval start part of the id
 * @param group  group part of the id
 */
public StagingFileSet( String dbName, int from, int group)
{
    // Plain parts first, then everything that is derived from them.
    this.dbName = dbName;
    this.from = from;
    this.group = group;
    this.id = getId( dbName, from, group );
    this.collectWhenUnmodifiedFor = getCollectionIntervalSeconds( dbName );
    // Kept last so the hash is calculated after all other fields are assigned.
    this.hashCode = calcHashCode();
}

/**
 * Builds a staging file set id by joining the database name, interval start and group
 * with {@code PARTS_DELIMITER}, in that order.
 *
 * @param dbName database name part of the id
 * @param from   interval start part of the id
 * @param group  group part of the id
 * @return the assembled id
 */
public static String getId(String dbName, int from, int group)
{
    StringBuilder id = new StringBuilder();
    id.append( dbName );
    id.append( PARTS_DELIMITER ).append( from );
    id.append( PARTS_DELIMITER ).append( group );
    return id.toString();
}

private int getCollectionIntervalSeconds(String dbName)
{
return switch (dbName) {
Expand Down Expand Up @@ -160,19 +164,16 @@ private List<Integer> existingFiles( File dir, String fileId, boolean sorted)
private Optional<Integer> extractSequence( Path path, String fileId, boolean sorted)
{
String name = path.getFileName().toString();
int dot = name.indexOf( '.' );

// ignore unrelated file
if( !name.startsWith( fileId ) )
if (dot <= 0 || !name.substring(0, dot).equals( fileId ))
{
return Optional.empty();
}

// "<file-id>.<seq>" for unsorted files or "<file-id>.<seq>.s" for sorted files
int seqStart = name.indexOf( '.' ) + 1;
if( seqStart < 2 )
{
return Optional.empty();
}
int seqStart = dot + 1;
String seq;
if( sorted )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
Expand All @@ -27,27 +28,27 @@ public StagingFileSetCollector(File dir)
this.dir = Preconditions.checkNotNull( dir );
}

synchronized public List<SortedStagingFile> collectEligibleFiles(Map<StagingFileSet, StagingFile> files, String dbName)
synchronized public List<Future<IntervalProcessors.Stats>> collectEligibleFiles(
Map<StagingFileSet, StagingFile> files, String dbName, DataPointStagingStore dataPointStagingStore)
{
List<SortedStagingFile> sortedFiles = new ArrayList<>();
List<Future<IntervalProcessors.Stats>> stats = new ArrayList<>();
List<StagingFileSet> names = new ArrayList<>( files.keySet().stream().filter(fs -> fs.dbName.equals(dbName)).toList() );
names.sort(Comparator.comparingInt(o -> o.from));

names.stream()
.filter( fs -> fs.needsCollection( files.get( fs ).lastModified() ) )
.forEach( fs ->
{
log.debug( "processing staging file: [" + fs + "]" );
log.info( "processing staging file: [" + fs + "]" );
StagingFile f = files.remove( fs );
f.close();
Optional<String> lastSorted = fs.getLastSortedFileName( dir );
log.debug( "sorting ..." );
SortedStagingFile sortedFile = f.sort(lastSorted, dbName);
log.debug("sorted file: [" + sortedFile + "]");
sortedFiles.add( sortedFile );
stats.add(dataPointStagingStore.submitIntervalProcessorTask(sortedFile));
}
);
log.debug("sorted files: [" + sortedFiles + "]");
return sortedFiles;
return stats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,34 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class StagingFileSetProvider
{
private int cacheSize = 20; // 3 archive types, usually there will be only one (current) interval per archive.
private LoadingCache<String, StagingFileSet> cache = CacheBuilder.newBuilder()
private final int cacheSize = 300;

private final LoadingCache<String, StagingFileSet> cache = CacheBuilder.newBuilder()
.maximumSize( cacheSize )
.expireAfterWrite( 30,TimeUnit.MINUTES )
.build( new CacheLoader<String, StagingFileSet>()
{
@Override
public StagingFileSet load( String key )
throws Exception
{
return new StagingFileSet( key );
}
} );

.build(new CacheLoader<>() {
@SuppressWarnings("NullableProblems")
@Override
public StagingFileSet load(String key) {
return new StagingFileSet(key);
}
} );

StagingFileSet get(String dbName, int from)
StagingFileSet get(String dbName, int from, int group)
{
try
{
return cache.get( toId( dbName, from ) );
return cache.get(StagingFileSet.getId(dbName, from, group));
}
catch(ExecutionException e)
{
throw Throwables.propagate( e );
throw new RuntimeException(e);
}
}

//TODO: refactor - id formatting logic is in StagingFilesSet and here now.
private String toId(String dbName, int from)
{
return dbName + StagingFileSet.PARTS_DELIMITER + from;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.db.model.MetricProvider;
Expand All @@ -30,7 +31,7 @@ public class StagingFiles
/**
* Tracks current open files.
*/
private volatile ConcurrentMap<StagingFileSet, StagingFile> files = new ConcurrentHashMap<>();
private final ConcurrentMap<StagingFileSet, StagingFile> files = new ConcurrentHashMap<>();

private final File dir;

Expand Down Expand Up @@ -109,9 +110,9 @@ public void deleteFilesOlderThan(int time)
}
}

public List<SortedStagingFile> collectEligibleFiles(String dbName)
public List<Future<IntervalProcessors.Stats>> collectEligibleFiles(String dbName, DataPointStagingStore dataPointStagingStore)
{
return fileSetCollector.collectEligibleFiles(files, dbName);
return fileSetCollector.collectEligibleFiles(files, dbName, dataPointStagingStore);
}

public void write(StagingFileRecord r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public void canParseSortedFileId()
public void canBuildFileId()
{
StagingFileSet fn = new StagingFileSet( "5m7d", 100000);
assertAttributes( fn, "5m7d-100000", "5m7d", 100000);
assertAttributes( fn, "5m7d-100000-2147483647", "5m7d", 100000);
fn = new StagingFileSet( "5m7d", 100000, 1);
assertAttributes( fn, "5m7d-100000-1", "5m7d", 100000);
}

private void assertAttributes( StagingFileSet fn, String fileId, String dbName, int from)
Expand Down

0 comments on commit baba66d

Please sign in to comment.