Skip to content

Commit

Permalink
Add additional checks to ensure complete is not called early
Browse files Browse the repository at this point in the history
  • Loading branch information
wwelling committed Aug 27, 2024
1 parent d1fbaaa commit 5a4a06d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ public static Individual from(Map<String, Object> content) {
return new Individual(content);
}

@Override
public String toString() {
return "Individual [content=" + content + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -215,26 +216,31 @@ public Flux<Individual> export(QueryArg query, List<FilterArg> filters, List<Boo
try {
solrClient.queryAndStreamResponse(collectionName, builder.query(), new StreamingResponseCallback() {
private final AtomicLong remaining = new AtomicLong(0);
private final AtomicBoolean docListInfoReceived = new AtomicBoolean(false);

@Override
public void streamSolrDocument(SolrDocument document) {
emitter.next(Individual.from(document));
Individual individual = Individual.from(document);
logger.info("{}: streamSolrDocument: {}", builder.getId(), individual);
emitter.next(individual);

long numRemaining = remaining.decrementAndGet();
logger.debug("{}: streamSolrDocument {}", builder.getId(), numRemaining);
if (numRemaining == 0) {
logger.info("{}: COMPLETE", builder.getId());
logger.info("{}: streamSolrDocument remaining: {}", builder.getId(), numRemaining);
if (numRemaining == 0 && docListInfoReceived.get()) {
logger.info("{}: streamSolrDocument COMPLETE", builder.getId());
emitter.complete();
}
}

@Override
public void streamDocListInfo(long numFound, long start, Float maxScore) {
logger.debug("{}: streamDocListInfo {} {} {}", builder.getId(), numFound, start, maxScore);
if (numFound > 0) {
remaining.set(numFound);
} else {
logger.info("{}: COMPLETE", builder.getId());
logger.info("{}: streamDocListInfo {} {} {}", builder.getId(), numFound, start, maxScore);

remaining.set(numFound);
docListInfoReceived.set(true);

if (numFound == 0) {
logger.info("{}: streamDocListInfo COMPLETE", builder.getId());
emitter.complete();
}
}
Expand Down

0 comments on commit 5a4a06d

Please sign in to comment.