Skip to content

Commit 1ef4245

Browse files
committed
Reworking Threading and Return Handling.
1 parent f1d1997 commit 1ef4245

File tree

3 files changed

+525
-471
lines changed

3 files changed

+525
-471
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
<groupId>com.cloudera.utils.hadoop</groupId>
3030
<artifactId>hms-mirror</artifactId>
31-
<version>3.0.0.3</version>
31+
<version>3.0.0.4</version>
3232
<packaging>jar</packaging>
3333

3434
<name>hms-mirror</name>

src/main/java/com/cloudera/utils/hms/mirror/service/HMSMirrorAppService.java

Lines changed: 94 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -370,30 +370,29 @@ public CompletableFuture<Boolean> run() {
370370
}
371371
runStatus.setStage(StageEnum.DATABASES, CollectionEnum.COMPLETED);
372372

373-
// Collect Table Information and ensure process is complete before moving on.
374-
while (true) {
375-
boolean check = true;
376-
for (CompletableFuture<ReturnStatus> sf : gtf) {
377-
if (!sf.isDone()) {
378-
check = false;
379-
break;
380-
}
381-
try {
382-
if (sf.isDone() && sf.get() != null) {
383-
if (sf.get().getStatus() == ReturnStatus.Status.ERROR) {
384-
rtn = Boolean.FALSE;
385-
// throw new RuntimeException(sf.get().getException());
386-
}
373+
// Wait for all the CompletableFutures to finish.
374+
CompletableFuture.allOf(gtf.toArray(new CompletableFuture[0])).join();
375+
376+
// Check that all the CompletableFutures in 'gtf' passed with ReturnStatus.Status.SUCCESS.
377+
for (CompletableFuture<ReturnStatus> sf : gtf) {
378+
try {
379+
ReturnStatus rs = sf.get();
380+
if (nonNull(rs)) {
381+
if (rs.getStatus() == ReturnStatus.Status.SUCCESS) {
382+
runStatus.getOperationStatistics().getSuccesses().incrementDatabases();
383+
} else {
384+
rtn = Boolean.FALSE;
385+
runStatus.getOperationStatistics().getFailures().incrementDatabases();
387386
}
388-
} catch (InterruptedException | ExecutionException e) {
389-
log.error("Interrupted Table collection", e);
390-
rtn = Boolean.FALSE;
391-
// throw new RuntimeException(e);
387+
} else {
388+
log.error("ReturnStatus is null in gathering table.");
392389
}
390+
} catch (InterruptedException | ExecutionException e) {
391+
log.error("Interrupted Table collection", e);
392+
rtn = Boolean.FALSE;
393393
}
394-
if (check)
395-
break;
396394
}
395+
397396
runStatus.setStage(StageEnum.TABLES, CollectionEnum.COMPLETED);
398397
gtf.clear(); // reset
399398

@@ -497,58 +496,46 @@ public CompletableFuture<Boolean> run() {
497496
// Check that a tables metadata has been retrieved. When it has (ReturnStatus.Status.CALCULATED_SQL),
498497
// move on to the NEXTSTEP and actual do the transfer.
499498
// ========================================
500-
while (true) {
501-
boolean check = true;
502-
for (CompletableFuture<ReturnStatus> sf : gtf) {
503-
if (!sf.isDone()) {
504-
check = false;
505-
break;
506-
}
507-
try {
508-
if (sf.isDone() && sf.get() != null) {
509-
switch (sf.get().getStatus()) {
510-
case SUCCESS:
511-
runStatus.getOperationStatistics().getCounts().incrementTables();
512-
// Trigger next step and set status.
513-
// TODO: Next Step
514-
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
515-
// Launch the next step, which is the transfer.
516-
runStatus.getOperationStatistics().getSuccesses().incrementTables();
517-
518-
migrationFuture.add(getTransferService().build(sf.get().getTableMirror()));
519-
break;
520-
case ERROR:
521-
runStatus.getOperationStatistics().getCounts().incrementTables();
522-
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
523-
break;
524-
case FATAL:
525-
runStatus.getOperationStatistics().getCounts().incrementTables();
526-
runStatus.getOperationStatistics().getFailures().incrementTables();
527-
rtn = Boolean.FALSE;
528-
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
529-
log.error("FATAL: ", sf.get().getException());
530-
case NEXTSTEP:
531-
break;
532-
case SKIP:
533-
runStatus.getOperationStatistics().getCounts().incrementTables();
534-
// Set for tables that are being removed.
535-
runStatus.getOperationStatistics().getSkipped().incrementTables();
536-
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
537-
break;
538-
}
499+
CompletableFuture.allOf(gtf.toArray(new CompletableFuture[0])).join();
500+
// Check that all the CompletableFutures in 'gtf' passed with ReturnStatus.Status.SUCCESS.
501+
for (CompletableFuture<ReturnStatus> sf : gtf) {
502+
try {
503+
ReturnStatus returnStatus = sf.get();
504+
if (nonNull(returnStatus)) {
505+
switch (returnStatus.getStatus()) {
506+
case SUCCESS:
507+
runStatus.getOperationStatistics().getCounts().incrementTables();
508+
// Trigger next step and set status.
509+
// TODO: Next Step
510+
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
511+
// Launch the next step, which is the transfer.
512+
runStatus.getOperationStatistics().getSuccesses().incrementTables();
513+
514+
migrationFuture.add(getTransferService().build(sf.get().getTableMirror()));
515+
break;
516+
case ERROR:
517+
runStatus.getOperationStatistics().getCounts().incrementTables();
518+
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
519+
break;
520+
case FATAL:
521+
runStatus.getOperationStatistics().getCounts().incrementTables();
522+
runStatus.getOperationStatistics().getFailures().incrementTables();
523+
rtn = Boolean.FALSE;
524+
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
525+
log.error("FATAL: ", sf.get().getException());
526+
case NEXTSTEP:
527+
break;
528+
case SKIP:
529+
runStatus.getOperationStatistics().getCounts().incrementTables();
530+
// Set for tables that are being removed.
531+
runStatus.getOperationStatistics().getSkipped().incrementTables();
532+
sf.get().setStatus(ReturnStatus.Status.NEXTSTEP);
533+
break;
539534
}
540-
} catch (InterruptedException | ExecutionException e) {
541-
rtn = Boolean.FALSE;
542-
log.error("Interrupted", e);
543535
}
544-
}
545-
if (check)
546-
break;
547-
try {
548-
// Slow down the loop.
549-
sleep(2000);
550-
} catch (InterruptedException e) {
551-
throw new RuntimeException(e);
536+
} catch (InterruptedException | ExecutionException | RuntimeException e) {
537+
log.error("Interrupted Table collection", e);
538+
rtn = Boolean.FALSE;
552539
}
553540
}
554541

@@ -587,42 +574,32 @@ public CompletableFuture<Boolean> run() {
587574
Set<TableMirror> migrationExecutions = new HashSet<>();
588575

589576
// Check the Migration Futures are done.
590-
while (true) {
591-
boolean check = true;
592-
for (CompletableFuture<ReturnStatus> sf : migrationFuture) {
593-
if (!sf.isDone()) {
594-
check = false;
595-
continue;
596-
}
597-
try {
598-
if (sf.isDone() && sf.get() != null) {
599-
TableMirror tableMirror = sf.get().getTableMirror();
600-
// Only push SUCCESSFUL tables to the migrationExecutions list.
601-
if (sf.get().getStatus() == ReturnStatus.Status.SUCCESS) {
602-
// Success means add table the execution list.
603-
migrationExecutions.add(tableMirror);
604-
}
577+
CompletableFuture.allOf(migrationFuture.toArray(new CompletableFuture[0])).join();
578+
579+
// Check that all the CompletableFutures in 'migrationFuture' passed with ReturnStatus.Status.SUCCESS.
580+
for (CompletableFuture<ReturnStatus> sf : migrationFuture) {
581+
try {
582+
ReturnStatus rs = sf.get();
583+
if (nonNull(rs)) {
584+
TableMirror tableMirror = rs.getTableMirror();
585+
// Only push SUCCESSFUL tables to the migrationExecutions list.
586+
if (rs.getStatus() == ReturnStatus.Status.SUCCESS) {
587+
// Success means add table the execution list.
588+
migrationExecutions.add(tableMirror);
605589
}
606-
} catch (InterruptedException | ExecutionException e) {
607-
log.error("Interrupted", e);
608-
rtn = Boolean.FALSE;
609-
// throw new RuntimeException(e);
590+
} else {
591+
log.error("ReturnStatus is NULL in migration build");
610592
}
611-
}
612-
if (check)
613-
break;
614-
try {
615-
// Slow down the loop.
616-
sleep(2000);
617-
} catch (InterruptedException e) {
618-
throw new RuntimeException(e);
593+
} catch (InterruptedException | ExecutionException | RuntimeException e) {
594+
log.error("Interrupted Building Migrations", e);
595+
rtn = Boolean.FALSE;
619596
}
620597
}
598+
621599
if (rtn) {
622600
runStatus.setStage(StageEnum.BUILDING_TABLES, CollectionEnum.COMPLETED);
623601
} else {
624602
runStatus.setStage(StageEnum.BUILDING_TABLES, CollectionEnum.ERRORED);
625-
// runStatus.addError(MessageCode.BUILDING_TABLES_ISSUE);
626603
}
627604

628605
migrationFuture.clear(); // reset
@@ -682,41 +659,31 @@ public CompletableFuture<Boolean> run() {
682659
migrationFuture.add(getTransferService().execute(tableMirror));
683660
}
684661

685-
// Check the Migration Futures are done.
686-
while (true) {
687-
boolean check = true;
688-
for (CompletableFuture<ReturnStatus> sf : migrationFuture) {
689-
if (!sf.isDone()) {
690-
check = false;
691-
break;
692-
}
693-
try {
694-
if (sf.isDone() && sf.get() != null) {
695-
TableMirror tableMirror = sf.get().getTableMirror();
696-
if (sf.get().getStatus() == ReturnStatus.Status.ERROR) {
697-
// Check if the table was removed, so that's not a processing error.
698-
if (tableMirror != null) {
699-
if (!tableMirror.isRemove()) {
700-
rtn = Boolean.FALSE;
701-
}
662+
// Wait for all the CompletableFutures to finish.
663+
CompletableFuture.allOf(migrationFuture.toArray(new CompletableFuture[0])).join();
664+
// Check that all the CompletableFutures in 'migrationFuture' passed with ReturnStatus.Status.SUCCESS.
665+
for (CompletableFuture<ReturnStatus> sf : migrationFuture) {
666+
try {
667+
ReturnStatus rs = sf.get();
668+
if (nonNull(rs)) {
669+
TableMirror tableMirror = rs.getTableMirror();
670+
if (rs.getStatus() == ReturnStatus.Status.ERROR) {
671+
// Check if the table was removed, so that's not a processing error.
672+
if (tableMirror != null) {
673+
if (!tableMirror.isRemove()) {
674+
rtn = Boolean.FALSE;
702675
}
703676
}
704677
}
705-
} catch (InterruptedException | ExecutionException e) {
706-
log.error("Interrupted", e);
707-
rtn = Boolean.FALSE;
708-
// throw new RuntimeException(e);
678+
} else {
679+
log.error("ReturnStatus is NULL in migrationFuture");
709680
}
710-
}
711-
if (check)
712-
break;
713-
try {
714-
// Slow down the loop.
715-
sleep(2000);
716-
} catch (InterruptedException e) {
717-
throw new RuntimeException(e);
681+
} catch (InterruptedException | ExecutionException | RuntimeException e) {
682+
log.error("Interrupted Migration Executions", e);
683+
rtn = Boolean.FALSE;
718684
}
719685
}
686+
720687
// If still TRUE, then we're good.
721688
if (rtn) {
722689
runStatus.setStage(StageEnum.PROCESSING_TABLES, CollectionEnum.COMPLETED);

0 commit comments

Comments
 (0)