Skip to content

Commit

Permalink
Merge pull request #982 from IDgis/import-timeout-fix
Browse files Browse the repository at this point in the history
Import timeout fix
  • Loading branch information
KevinvdBosch authored Mar 22, 2021
2 parents 58ed6ce + 8c23328 commit 0e69a6c
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public final CompletableFuture<Object> ask(Object message) {
return f.ask(actorRef, message);
}

@Override
public final CompletableFuture<Object> ask(Object message, long timeout) {
return f.ask(actorRef, message, timeout);
}

@Override
public final void tell(Object message, ActorRef sender) {
actorRef.tell(message, sender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface AsyncHelper {
AsyncTransactionRef getTransactionRef();

CompletableFuture<Object> ask(Object message);

CompletableFuture<Object> ask(Object message, long timeout);

void tell(Object message, ActorRef sender);
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private void handleEnd(final End end) {
return new FinalizeSession(JobState.FAILED);
}
}).exceptionally(t -> {
log.error("couldn't properly finalize session: {}", t);
log.error(t, "couldn't properly finalize session");

return new FinalizeSession(JobState.FAILED);
}).thenAccept(msg -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void preStart() throws Exception {
public void onReceive(Object msg) throws Exception {
if(msg instanceof ImportJobInfo) {
handleImportJob((ImportJobInfo)msg);
} if(msg instanceof RemoveJobInfo) {
} else if(msg instanceof RemoveJobInfo) {
handleRemoveJob((RemoveJobInfo)msg);
} else if(msg instanceof SessionStarted) {
handleSessionStarted((SessionStarted)msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ protected CompletableFuture<Object> importSucceeded() {
.filter(column -> column.getDataType().equals(Type.GEOMETRY))
.collect(Collectors.toList());

return tx.ask(new CreateIndices("staging_data", tmpTable, geometryColumns)).thenCompose(createIndicesMsg -> {
long timeout = insertCount * 3 + 15000;
log.debug("Creating indices on table {} with a timeout of: {} ms", tmpTable, timeout);

return tx.ask(new CreateIndices("staging_data", tmpTable, geometryColumns), timeout).thenCompose(createIndicesMsg -> {
log.debug("indices created");

if(createIndicesMsg instanceof Ack) {
Expand Down

0 comments on commit 0e69a6c

Please sign in to comment.