18 commits
14be875  Fixing synchronization issues in TroughCrawlLogFeed.java to reduce bl… (adam-miller, Jan 28, 2021)
d22ef12  Trying to avoid hard locks on updating url caches (adam-miller, Feb 2, 2021)
79e5633  Fix error log reporting of batch size for trough crawl logs (adam-miller, Feb 18, 2021)
86e238e  Create and insert into local sqlite dedup db shards (adam-miller, Feb 23, 2021)
09485a5  Merge branch 'fixes-trough-crawl-log-feed-synchronization' into adds-… (adam-miller, Feb 23, 2021)
e300051  Merge branch 'adds-trough-local-dedup-write-cache' into adds-trough-d… (adam-miller, Feb 23, 2021)
00947b0  Fixing sync issues on uncrawled urls batch as well (adam-miller, Feb 25, 2021)
88b7eea  On disk - per-seed sqlite dedup (adam-miller, Feb 25, 2021)
e700f38  Switching to in memory sqlite for dedup cache (adam-miller, Feb 26, 2021)
8724b14  Replacing some synchronized lists with threadsafe objects and fixing … (adam-miller, Mar 10, 2021)
54abea3  Merge branch 'adds-trough-local-dedup-write-cache' into adds-trough-d… (adam-miller, Mar 10, 2021)
466c679  Merge branch 'fixes-trough-crawl-log-feed-synchronization' into adds-… (adam-miller, Mar 10, 2021)
d721756  Ensure that the final batch is posted during shutdown. Make posting o… (adam-miller, Apr 5, 2021)
0b09340  Fix inconsistency with cached batch sizes and refine logic for forcin… (adam-miller, Apr 5, 2021)
18edb3e  Refactor crawl log batch posting. Add configuration options for troug… (adam-miller, May 13, 2021)
6fad293  Ensure we set dedup schema when writing dedup shards (adam-miller, May 17, 2021)
7fc648c  Add dedup load from in-memory dedup cache db (adam-miller, May 20, 2021)
58bf0cf  Refactor Trough client URL cache put. Limit TroughContentDigestHistor… (adam-miller, May 28, 2021)
contrib/pom.xml: 5 changes (5 additions, 0 deletions)
@@ -78,6 +78,11 @@
       <artifactId>gson</artifactId>
       <version>2.8.6</version>
     </dependency>
+    <dependency>
+      <groupId>org.xerial</groupId>
+      <artifactId>sqlite-jdbc</artifactId>
+      <version>3.34.0</version>
+    </dependency>
   </dependencies>
   <repositories>
     <repository>
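The sqlite-jdbc driver added above is what backs the PR's local dedup shards and in-memory dedup cache (commits 86e238e, 88b7eea, e700f38). As a rough sketch of the driver usage, and not the PR's actual schema or code, an in-memory SQLite dedup table via plain JDBC might look like this; the table and column names are hypothetical:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class InMemoryDedupSketch {
    public static void main(String[] args) throws SQLException {
        // sqlite-jdbc registers itself with DriverManager; ":memory:" creates a
        // private in-memory database that lives as long as this connection.
        try (Connection conn = DriverManager.getConnection("jdbc:sqlite::memory:")) {
            try (Statement st = conn.createStatement()) {
                // hypothetical dedup schema -- the PR sets its real schema elsewhere
                st.executeUpdate("create table dedup ("
                        + "digest_key varchar(100) primary key, url varchar(2100), date varchar(14))");
            }
            try (PreparedStatement ins = conn.prepareStatement(
                    "insert or ignore into dedup (digest_key, url, date) values (?, ?, ?)")) {
                ins.setString(1, "sha1:EXAMPLEDIGEST");
                ins.setString(2, "http://example.com/");
                ins.setString(3, "20210226000000");
                ins.executeUpdate();
            }
            try (PreparedStatement sel = conn.prepareStatement(
                    "select url, date from dedup where digest_key = ?")) {
                sel.setString(1, "sha1:EXAMPLEDIGEST");
                try (ResultSet rs = sel.executeQuery()) {
                    if (rs.next()) {
                        System.out.println("dedup hit: " + rs.getString("url") + " " + rs.getString("date"));
                    }
                }
            }
        }
    }
}
```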
contrib/src/main/java/org/archive/modules/postprocessor/TroughCrawlLogFeed.java

@@ -18,10 +18,12 @@
  */
 package org.archive.modules.postprocessor;
 
+
 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Date;
-import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -89,8 +91,12 @@ public class TroughCrawlLogFeed extends Processor implements Lifecycle {

     protected static final Logger logger = Logger.getLogger(TroughCrawlLogFeed.class.getName());
 
-    protected static final int BATCH_MAX_TIME_MS = 20 * 1000;
+    protected static final int BATCH_MAX_TIME_MS = 60 * 1000;
     protected static final int BATCH_MAX_SIZE = 400;
+    protected static final String CRAWLED_BATCH = "crawled";
+    protected static final String UNCRAWLED_BATCH = "uncrawled";
+    protected AtomicInteger crawledBatchSize = new AtomicInteger(0);
+    protected AtomicInteger uncrawledBatchSize = new AtomicInteger(0);
 
     protected KeyedProperties kp = new KeyedProperties();
     public KeyedProperties getKeyedProperties() {
@@ -119,15 +125,15 @@ public String getRethinkUrl() {

     protected TroughClient troughClient() throws MalformedURLException {
         if (troughClient == null) {
-            troughClient = new TroughClient(getRethinkUrl(), 60 * 60);
+            troughClient = new TroughClient(getRethinkUrl(), null);
             troughClient.start();
         }
         return troughClient;
     }
 
-    protected List<Object[]> crawledBatch = new ArrayList<Object[]>();
+    protected ConcurrentLinkedQueue<Object[]> crawledBatch = new ConcurrentLinkedQueue<Object[]>();
     protected long crawledBatchLastTime = System.currentTimeMillis();
-    protected List<Object[]> uncrawledBatch = new ArrayList<Object[]>();
+    protected ConcurrentLinkedQueue<Object[]> uncrawledBatch = new ConcurrentLinkedQueue<Object[]>();
     protected long uncrawledBatchLastTime = System.currentTimeMillis();
 
     protected Frontier frontier;
@@ -157,14 +163,32 @@ protected boolean shouldProcess(CrawlURI curi) {
             return false;
         }
     }
 
+    protected boolean dumpPendingAtClose = true;
+    public boolean getDumpPendingAtClose() {
+        return dumpPendingAtClose;
+    }
+    public void setDumpPendingAtClose(boolean dumpPendingAtClose) {
+        this.dumpPendingAtClose = dumpPendingAtClose;
+    }
+
+    protected boolean forceBatchPosting = false;
+    protected boolean getForceBatchPosting() {
+        return forceBatchPosting;
+    }
+    protected void setForceBatchPosting(boolean forceBatchPosting) {
+        this.forceBatchPosting = forceBatchPosting;
+    }
+
     @Override
     public synchronized void stop() {
         if (!isRunning) {
             return;
         }
         if (!crawledBatch.isEmpty()) {
-            postCrawledBatch();
+            setForceBatchPosting(true);
+            postBatch(crawledBatch, crawledBatchSize, CRAWLED_BATCH);
+            setForceBatchPosting(false);
         }
 
         if (frontier instanceof BdbFrontier) {
@@ -177,9 +201,14 @@ public void execute(Object o) {
                 }
             };
 
-            logger.info("dumping " + frontier.queuedUriCount() + " queued urls to trough feed");
-            ((BdbFrontier) frontier).forAllPendingDo(closure);
-            logger.info("dumped " + frontier.queuedUriCount() + " queued urls to trough feed");
+            if(getDumpPendingAtClose()) {
+                logger.info("dumping " + frontier.queuedUriCount() + " queued urls to trough feed");
+                setForceBatchPosting(true);
+                ((BdbFrontier) frontier).forAllPendingDo(closure);
+                postBatch(uncrawledBatch, uncrawledBatchSize, UNCRAWLED_BATCH);
+                setForceBatchPosting(false);
+                logger.info("dumped " + frontier.queuedUriCount() + " queued urls to trough feed");
+            }
         } else {
             logger.warning("frontier is not a BdbFrontier, cannot dump queued urls to trough feed");
         }
@@ -223,12 +252,10 @@ protected void innerProcess(CrawlURI curi) throws InterruptedException {
                 serverCache.getHostFor(curi.getUURI()).getHostName(),
             };
 
-            synchronized (crawledBatch) {
-                crawledBatch.add(values);
-            }
+            crawledBatch.add(values);
 
-            if (crawledBatch.size() >= BATCH_MAX_SIZE || System.currentTimeMillis() - crawledBatchLastTime > BATCH_MAX_TIME_MS) {
-                postCrawledBatch();
+            if (crawledBatchSize.incrementAndGet() >= BATCH_MAX_SIZE || System.currentTimeMillis() - crawledBatchLastTime > BATCH_MAX_TIME_MS) {
+                postBatch(crawledBatch, crawledBatchSize, CRAWLED_BATCH);
             }
         } else {
             Object[] values = new Object[] {
@@ -241,72 +268,78 @@ protected void innerProcess(CrawlURI curi) throws InterruptedException {
                 serverCache.getHostFor(curi.getUURI()).getHostName(),
             };
 
-            synchronized (uncrawledBatch) {
-                uncrawledBatch.add(values);
-            }
-
-            if (uncrawledBatch.size() >= BATCH_MAX_SIZE || System.currentTimeMillis() - uncrawledBatchLastTime > BATCH_MAX_TIME_MS) {
-                postUncrawledBatch();
+            uncrawledBatch.add(values);
+            if (uncrawledBatchSize.incrementAndGet() >= BATCH_MAX_SIZE || System.currentTimeMillis() - uncrawledBatchLastTime > BATCH_MAX_TIME_MS) {
+                postBatch(uncrawledBatch, uncrawledBatchSize, UNCRAWLED_BATCH);
             }
         }
     }
 
-    protected void postCrawledBatch() {
-        logger.info("posting batch of " + crawledBatch.size() + " crawled urls trough segment " + getSegmentId());
-        synchronized (crawledBatch) {
-            if (!crawledBatch.isEmpty()) {
-                StringBuffer sqlTmpl = new StringBuffer();
-                sqlTmpl.append("insert into crawled_url ("
-                        + "timestamp, status_code, size, payload_size, url, hop_path, is_seed_redirect, "
-                        + "via, mimetype, content_digest, seed, is_duplicate, warc_filename, "
-                        + "warc_offset, warc_content_bytes, host) values "
-                        + "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)");
-                for (int i = 1; i < crawledBatch.size(); i++) {
-                    sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)");
+    protected void postBatch(ConcurrentLinkedQueue<Object[]> batch, AtomicInteger batchSize, String batchType) {
+        ArrayList<Object[]> crawlLogLines = new ArrayList<Object[]>();
+        synchronized (batch) {
+            //read and remove log lines from batch into local variable so we can exit the synchronized block asap
+            if (batchSize.get() >= BATCH_MAX_SIZE || getForceBatchPosting()) {
+                while(!batch.isEmpty()) {
+                    Object[] crawlLine = batch.poll();
+                    if(crawlLine != null)
+                        crawlLogLines.add(crawlLine);
+                    else
+                        break;
                 }
 
-                Object[] flattenedValues = new Object[16 * crawledBatch.size()];
-                for (int i = 0; i < crawledBatch.size(); i++) {
-                    System.arraycopy(crawledBatch.get(i), 0, flattenedValues, 16 * i, 16);
-                }
-
-                try {
-                    troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues);
-                } catch (Exception e) {
-                    logger.log(Level.WARNING, "problem posting batch of " + crawledBatch.size() + " crawled urls to trough segment " + getSegmentId(), e);
-                }
-
-                crawledBatchLastTime = System.currentTimeMillis();
-                crawledBatch.clear();
+                batchSize.getAndSet(batch.size()); //size() is O(n). Use sparingly. Should be near zero right now.
+                logger.info("posting batch of " + crawlLogLines.size() + " " + batchType + " urls to trough segment " + getSegmentId());
             }
         }
-    }
-    protected void postUncrawledBatch() {
-        logger.info("posting batch of " + uncrawledBatch.size() + " uncrawled urls trough segment " + getSegmentId());
-        synchronized (uncrawledBatch) {
-            if (!uncrawledBatch.isEmpty()) {
-                StringBuffer sqlTmpl = new StringBuffer();
-                sqlTmpl.append(
-                        "insert into uncrawled_url (timestamp, url, hop_path, status_code, via, seed, host)"
-                        + " values (%s, %s, %s, %s, %s, %s, %s)");
-
-                for (int i = 1; i < uncrawledBatch.size(); i++) {
-                    sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s)");
-                }
+        if( crawlLogLines != null && crawlLogLines.size() > 0){
+            StringBuffer sqlTmpl = new StringBuffer();
+            int numCols=0;
+            switch(batchType){
+                case CRAWLED_BATCH:
+                    sqlTmpl.append("insert into crawled_url ("
+                            + "timestamp, status_code, size, payload_size, url, hop_path, is_seed_redirect, "
+                            + "via, mimetype, content_digest, seed, is_duplicate, warc_filename, "
+                            + "warc_offset, warc_content_bytes, host) values "
+                            + "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)");
+                    for (int i = 1; i < crawlLogLines.size(); i++) {
+                        sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)");
+                    }
+                    numCols=16;
+                    break;
+                case UNCRAWLED_BATCH:
+                    sqlTmpl.append(
+                            "insert into uncrawled_url (timestamp, url, hop_path, status_code, via, seed, host)"
+                            + " values (%s, %s, %s, %s, %s, %s, %s)");
+
+                    for (int i = 1; i < crawlLogLines.size(); i++) {
+                        sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s)");
+                    }
+                    numCols=7;
+                    break;
+            }
 
-            Object[] flattenedValues = new Object[7 * uncrawledBatch.size()];
-            for (int i = 0; i < uncrawledBatch.size(); i++) {
-                System.arraycopy(uncrawledBatch.get(i), 0, flattenedValues, 7 * i, 7);
-            }
+            /*
+            Trough takes a single object array for the values we insert, so an array of size (N columns * X rows).
+            We read each row in column by column, then repeat for each row.
+            */
+            Object[] flattenedValues = new Object[numCols * crawlLogLines.size()];
+            for (int i = 0; i < crawlLogLines.size(); i++) {
+                System.arraycopy(crawlLogLines.get(i),0,flattenedValues, numCols * i,numCols);
+            }
 
-            try {
-                troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues);
-            } catch (Exception e) {
-                logger.log(Level.WARNING, "problem posting batch of " + uncrawledBatch.size() + " uncrawled urls to trough segment " + getSegmentId(), e);
-            }
+            try {
+                troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues);
+            } catch (Exception e) {
+                logger.log(Level.WARNING, "problem posting batch of " + crawlLogLines.size() + " " + batchType + " urls to trough segment " + getSegmentId(), e);
+            }
 
-            uncrawledBatchLastTime = System.currentTimeMillis();
-            uncrawledBatch.clear();
-        }
+            switch(batchType) {
+                case CRAWLED_BATCH:
+                    crawledBatchLastTime = System.currentTimeMillis();
+                    break;
+                case UNCRAWLED_BATCH:
+                    uncrawledBatchLastTime = System.currentTimeMillis();
+                    break;
+            }
         }
     }
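Taken together, the refactor replaces the two lock-heavy postCrawledBatch()/postUncrawledBatch() methods with one postBatch() that drains a ConcurrentLinkedQueue inside a short synchronized block and performs the slow Trough write outside it; an AtomicInteger tracks the batch size because ConcurrentLinkedQueue.size() is O(n). A condensed, self-contained sketch of that drain-then-post pattern, with an illustrative class name and a stubbed write():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchFeedSketch {
    static final int BATCH_MAX_SIZE = 400;

    private final ConcurrentLinkedQueue<Object[]> batch = new ConcurrentLinkedQueue<>();
    private final AtomicInteger batchSize = new AtomicInteger(0);

    /** Called concurrently by many producer threads; never blocks behind a slow post. */
    public void add(Object[] row) {
        batch.add(row);
        if (batchSize.incrementAndGet() >= BATCH_MAX_SIZE) {
            post(false);
        }
    }

    /** Drain quickly under the lock, then do the expensive write outside it. */
    public void post(boolean force) {
        List<Object[]> rows = new ArrayList<>();
        synchronized (batch) {
            if (batchSize.get() >= BATCH_MAX_SIZE || force) {
                Object[] row;
                while ((row = batch.poll()) != null) {
                    rows.add(row);
                }
                // size() walks the queue (O(n)); it is near zero right after draining
                batchSize.set(batch.size());
            }
        }
        if (!rows.isEmpty()) {
            write(rows); // network write happens with no lock held
        }
    }

    private void write(List<Object[]> rows) {
        System.out.println("posting batch of " + rows.size() + " rows");
    }
}
```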