MODHAADM-6 Experiments continued
 - move queue idle/active handling to JobHandler
nielserik committed Dec 30, 2024
1 parent 300c794 commit 6db24a7
Showing 4 changed files with 49 additions and 61 deletions.
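The gist of the change: the queue idle/active flag that previously lived in Reporting (betweenQueuesOfFiles) moves into JobHandler as an AtomicBoolean named passive. A minimal sketch of the resulting state handoff, with hypothetical class and method names and none of the Vert.x wiring:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: the idle/active handling this commit moves into JobHandler.
class JobState {
    private final AtomicBoolean passive = new AtomicBoolean(true);

    // Called when a file is picked up. Returns true exactly once per
    // idle-to-active transition, so counters are reset only on activation.
    boolean activate() {
        return passive.getAndSet(false);
    }

    // Called after a file's last batch: go passive only when the queue is
    // drained and no file stats are pending, and report the current state.
    boolean fileQueueDone(boolean lastBatch, boolean queueEmpty, boolean statsPending) {
        if (lastBatch && queueEmpty && !statsPending) {
            passive.set(true);
        }
        return passive.get();
    }
}
```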
@@ -939,11 +939,11 @@ private void generateIds(RoutingContext routingContext) {
responseText(routingContext, 200).end(response.toString());
}

private final static ConcurrentMap<String, ConcurrentMap<String, String>> fileProcessorVerticles = new ConcurrentHashMap<>();
private final static ConcurrentMap<String, ConcurrentMap<String, String>> fileImportVerticles = new ConcurrentHashMap<>();

private void initializeFileQueueProcessor(String tenant, String jobId, RoutingContext routingContext) {
fileProcessorVerticles.putIfAbsent(tenant, new ConcurrentHashMap<>());
String previousMapping = fileProcessorVerticles.get(tenant).putIfAbsent(jobId, "initializing");
private void startImportVerticle(String tenant, String jobId, RoutingContext routingContext) {
fileImportVerticles.putIfAbsent(tenant, new ConcurrentHashMap<>());
String previousMapping = fileImportVerticles.get(tenant).putIfAbsent(jobId, "initializing");
if (previousMapping == null) {
Vertx vertx = Vertx.vertx(new VertxOptions().setMaxWorkerExecuteTime(10).setMaxWorkerExecuteTimeUnit(TimeUnit.MINUTES));
vertx.deployVerticle(new JobHandler(tenant, jobId, vertx, routingContext), new DeploymentOptions()
@@ -952,7 +952,7 @@ private void initializeFileQueueProcessor(String tenant, String jobId, RoutingContext routingContext) {
if (started.succeeded()) {
System.out.println("ID-NE: started verticle for " + tenant + " and job ID " + jobId);
System.out.println("ID-NE: current thread " + Thread.currentThread().getName());
fileProcessorVerticles.get(tenant).put(jobId, started.result());
fileImportVerticles.get(tenant).put(jobId, started.result());
System.out.println("ID-NE: deployed verticles: " + vertx.deploymentIDs());
} else {
System.out.println("ID-NE: Couldn't start file processor threads for tenant " + tenant + " and jobID " + jobId);
@@ -976,7 +976,7 @@ private void stageXmlRecords(Vertx vertx, RoutingContext routingContext) {
.onComplete(resp -> {
if (resp.result().found()) {
new FileQueue(vertx, tenant, jobId).addNewFile(fileName, xmlContent);
initializeFileQueueProcessor(tenant, jobId, routingContext);
startImportVerticle(tenant, jobId, routingContext);
responseText(routingContext, 200).end("File queued for processing in ms " + (System.currentTimeMillis() - fileStartTime));
} else {
responseError(routingContext, 404, "Error: No harvest config with id [" + jobId + "] found.");
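An aside on the guard in startImportVerticle above: the two-level putIfAbsent ensures at most one verticle deployment per (tenant, jobId) pair even under concurrent requests. A condensed, self-contained sketch of the idiom (class and method names invented for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DeployOnce {
    private static final ConcurrentMap<String, ConcurrentMap<String, String>> verticles =
            new ConcurrentHashMap<>();

    // True only for the single caller that wins the right to deploy:
    // putIfAbsent returns null for the first writer and the existing
    // "initializing" placeholder for everyone else.
    static boolean claimSlot(String tenant, String jobId) {
        verticles.putIfAbsent(tenant, new ConcurrentHashMap<>());
        return verticles.get(tenant).putIfAbsent(jobId, "initializing") == null;
    }
}
```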

InventoryBatchUpdating.java
@@ -46,9 +46,7 @@ public void put(String jsonRecord) {

private void releaseBatch(BatchOfRecords batch) {
try {
//System.out.println("releaseBatch() # "+batch.getBatchNumber() +" with " + batch.size() + " records, batch queue size before put: " + batchQueue.size());
batchQueue.put(batch);
//System.out.println("releaseBatch() # "+batch.getBatchNumber() +" with " + batch.size() + ", batch queue size after put: " + batchQueue.size());
persistBatch();
} catch (InterruptedException ie) {
System.out.println("Error: Queue put operation was interrupted.");
@@ -57,51 +55,45 @@ private void releaseBatch(BatchOfRecords batch) {

@Override
public void endOfDocument() {
//System.out.println("endOfDocument() putting null record ");
put(null);
//System.out.println("endOfDocument() null record put");
}

/**
* This is the last function of the import pipeline, and since it's asynchronous
* it must be in charge of when to invoke reporting. JobHandler will not
* otherwise know when the last upsert of a source file of records is done, for example.
*/
private void persistBatch() {
BatchOfRecords batch = batchQueue.peek();
//System.out.println("persistBatch, peeked batch #" + (batch == null ? " null " : batch.getBatchNumber() + " with " + batch.size() + "records, last batch? " + batch.isLastBatchOfFile()));
if (batch != null) {
if (batch.size() > 0) {
// long batchUpsertStarted = System.currentTimeMillis();
updateClient.inventoryUpsert(batch.getUpsertRequestBody()).onComplete(response -> {
job.reporting().incrementRecordsProcessed(batch.size());
//System.out.println("persistBatch(), upsert done for batch #" + batch.getBatchNumber() + " with " + batch.size() + " records");
reporting(batch);
if (batch.isLastBatchOfFile()) {
report(batch);
}
try {
//System.out.println("Taking batch from queue with " + batchQueue.size() + " batches");
batchQueue.take();
//System.out.println("Took batch #" + takebatch.getBatchNumber() + " from queue, last batch? " + takebatch.isLastBatchOfFile());

} catch (InterruptedException ignored) {}
});
} else { // we get here when the last set of records is exactly 100. We just need to report
//System.out.println("persistBatch(), batch #" + batch.getBatchNumber() + " with " + batch.size() + " records, is last batch? " + batch.isLastBatchOfFile());
reporting(batch);
if (batch.isLastBatchOfFile()) {
report(batch);
}
try {
//System.out.println("Taking batch from queue with " + batchQueue.size() + " batches");
batchQueue.take();
//System.out.println("Took batch #" + takebatch.getBatchNumber() + " from queue, last batch? " + takebatch.isLastBatchOfFile());
} catch (InterruptedException ignored) {}

}
}
}
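The Javadoc above states the key constraint: inventoryUpsert is asynchronous, so only its completion handler can know when a file's last batch has actually been persisted. A stripped-down Vert.x-style illustration of that shape (stand-in names, not this module's real API):

```java
import io.vertx.core.Future;
import io.vertx.core.Promise;

class AsyncTail {
    // Stand-in for updateClient.inventoryUpsert(...).
    Future<Void> upsert(String requestBody) {
        Promise<Void> promise = Promise.promise();
        // ...request goes out; the promise completes later, on another event...
        return promise.future();
    }

    void persist(String requestBody, boolean lastBatchOfFile) {
        upsert(requestBody).onComplete(done -> {
            // Reporting must happen here, in the handler: by the time the
            // persist(...) call itself returns, the upsert may still be in flight.
            if (lastBatchOfFile) {
                // report(...)
            }
        });
    }
}
```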

private void reporting(BatchOfRecords batch) {
//System.out.println("Report if last batch, last batch? " + batch.isLastBatchOfFile());
if (batch.isLastBatchOfFile()) {
job.reporting().incrementFilesProcessed();
job.reporting().reportFileStats();
job.reporting().reportFileQueueStats();
if (job.reporting().fileQueueDone()) {
//System.out.println("reporting() no more files in queue, no pending file stats, reset batch counter");
batchCounter = 0;
}
private void report(BatchOfRecords batch) {
job.reporting().incrementFilesProcessed();
job.reporting().reportFileStats();
var queueDone = job.fileQueueDone(batch.isLastBatchOfFile());
job.reporting().reportFileQueueStats(queueDone);
if (queueDone) {
batchCounter = 0;
}
}


JobHandler.java
@@ -15,6 +15,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.folio.harvesteradmin.legacydata.statics.ApiPaths.HARVESTER_HARVESTABLES_PATH;

@@ -25,13 +26,14 @@ public class JobHandler extends AbstractVerticle {
private final FileQueue fileQueue;
private final Reporting reporting;
private final InventoryBatchUpdating inventoryUpdater;
private final AtomicBoolean passive = new AtomicBoolean(true);
public static final Logger logger = LogManager.getLogger("queued-files-processing");

public JobHandler(String tenant, String jobId, Vertx vertx, RoutingContext routingContext) {
this.tenant = tenant;
this.jobId = jobId;
fileQueue = new FileQueue(vertx, tenant, jobId);
reporting = new Reporting(fileQueue);
reporting = new Reporting(this);
inventoryUpdater = new InventoryBatchUpdating(this, routingContext);
}

@@ -40,10 +42,11 @@ public void start() {
System.out.println("ID-NE: starting file processor for tenant " + tenant + " and job ID " + jobId);
vertx.setPeriodic(200, (r) -> {
File currentFile = nextFileIfPossible();
if (currentFile != null) { // null if queue is empty or the previous file is still processing
var refreshPipeline = reporting.betweenQueuesOfFiles.get(); // Rebuild pipeline once per job
reporting.markNextFileProcessing(currentFile.getName());
processFile(currentFile,refreshPipeline).onComplete(na -> fileQueue.deleteFile(currentFile))
if (currentFile != null) { // null if queue is empty or a previous file is still processing
boolean activating = passive.getAndSet(false); // check if job was passive before this file
reporting.nowProcessing(currentFile.getName(), activating); // reset stats if new job was just activated
processFile(currentFile, activating) // refresh style sheets from db if new job
.onComplete(na -> fileQueue.deleteFile(currentFile))
.onFailure(f -> System.out.println("Error processing file: " + f.getMessage()));
}
});
@@ -58,6 +61,13 @@ private File nextFileIfPossible() {
return null;
}

public boolean fileQueueDone(boolean possibly) {
if (possibly && !fileQueue.hasNextFile() && !reporting.pendingFileStats()) {
passive.set(true);
}
return passive.get();
}

private boolean resumeHaltedProcessing() {
return fileQueue.processingSlotTaken() && inventoryUpdater.noPendingBatches(10);
}
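The start() method above drives everything from a single vertx.setPeriodic timer, skipping ticks while a file is still in flight. A bare-bones sketch of that polling gate (hypothetical names; the real gating conditions live in nextFileIfPossible and resumeHaltedProcessing):

```java
import io.vertx.core.AbstractVerticle;

// Sketch: poll a work source on a timer and skip ticks while busy.
class PollingVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.setPeriodic(200, timerId -> {
            Runnable work = nextWorkIfPossible(); // null while busy or queue empty
            if (work != null) {
                work.run();
            }
        });
    }

    private Runnable nextWorkIfPossible() {
        return null; // stand-in: consult queue state and pending batches
    }
}
```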
@@ -87,10 +97,8 @@ private Future<Void> processFile(File xmlFile, boolean refreshPipeline) {
private Future<TransformationPipeline> getTransformationPipeline(String jobId, boolean refresh) {
Promise<TransformationPipeline> promise = Promise.promise();
if (TransformationPipeline.hasInstance(tenant, jobId) && !refresh) {
//System.out.println("Getting cached transformation pipeline.");
promise.complete(TransformationPipeline.getInstance(tenant, jobId));
} else {
//System.out.println("Building transformation pipeline from configuration database.");
new LegacyHarvesterStorage(vertx, tenant).getConfigRecordById(HARVESTER_HARVESTABLES_PATH, jobId)
.compose(resp -> {
if (resp.wasOK()) {
@@ -115,4 +123,8 @@ public Reporting reporting() {
return reporting;
}

public String getJobId() {
return jobId;
}

}

Reporting.java
@@ -2,38 +2,30 @@

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Reporting {

private final String jobId;
private final FileQueue fileQueue;
public final AtomicBoolean betweenQueuesOfFiles = new AtomicBoolean(true);
private final AtomicLong startTime = new AtomicLong();
private final AtomicInteger filesProcessed = new AtomicInteger(0);
private final AtomicInteger recordsProcessed = new AtomicInteger(0);
private final BlockingQueue<FileStats> fileStats = new ArrayBlockingQueue<>(2);

public Reporting (FileQueue fileQueue) {
this.jobId = fileQueue.getJobId();
this.fileQueue = fileQueue;
public Reporting (JobHandler handler) {
this.jobId = handler.getJobId();
}

public void markNextFileProcessing(String fileName) {
if (betweenQueuesOfFiles.get()) {
//System.out.println("nextFileProcessing() queue marked idle, resetting records, files processed, star time ");
public void nowProcessing(String fileName, boolean resetCounters) {
if (resetCounters) {
recordsProcessed.set(0);
filesProcessed.set(0);
startTime.set(System.currentTimeMillis());
betweenQueuesOfFiles.set(false);
}
try {
//System.out.println("Put FileStats for " + fileName + " in queue.");
fileStats.put(new FileStats(fileName));
} catch (InterruptedException ignore) {}
//System.out.println(fileStats.size() + " FileStats in queue.");
}

public void incrementFilesProcessed() {
@@ -64,17 +56,9 @@ public void reportFileStats() {
} catch (InterruptedException ie) { System.out.println(ie.getMessage());}
}

public boolean fileQueueDone() {
if (!fileQueue.hasNextFile() && !pendingFileStats()) {
betweenQueuesOfFiles.set(true);
return true;
}
return false;
}

public void reportFileQueueStats() {
public void reportFileQueueStats(boolean queueDone) {
long processingTime = (System.currentTimeMillis() - startTime.get());
System.out.println((fileQueueDone()? "Done processing queue for job " : "Job ") + jobId + ": " + filesProcessed + " file(s) with " + recordsProcessed.get() +
System.out.println((queueDone ? "Done processing queue for job " : "Job ") + jobId + ": " + filesProcessed + " file(s) with " + recordsProcessed.get() +
" records processed in " + processingTimeAsString(processingTime) + " (" +
(recordsProcessed.get() * 1000L / processingTime) + " recs/s.)");
}
