Skip to content

Commit af4487c

Browse files
authored
Merge branch 'master' into last_partition_optimize
2 parents 1496b4c + ccad338 commit af4487c

File tree

24 files changed

+845
-74
lines changed

24 files changed

+845
-74
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public boolean cancelProcess(long processId) throws TException {
299299
if (process == null || process.getProcessId() != processId) {
300300
return false;
301301
}
302-
process.close();
302+
process.close(true);
303303
return true;
304304
}
305305

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface OptimizingProcess {
2828

2929
long getProcessId();
3030

31-
void close();
31+
void close(boolean needCommit);
3232

3333
boolean isClosed();
3434

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.amoro.server.AmoroServiceConstants;
3636
import org.apache.amoro.server.catalog.CatalogManager;
3737
import org.apache.amoro.server.manager.MetricManager;
38+
import org.apache.amoro.server.optimizing.TaskRuntime.Status;
3839
import org.apache.amoro.server.persistence.OptimizingProcessState;
3940
import org.apache.amoro.server.persistence.PersistentBase;
4041
import org.apache.amoro.server.persistence.TaskFilesPersistence;
@@ -149,7 +150,7 @@ private void initTableRuntime(DefaultTableRuntime tableRuntime) {
149150
"Close the committing process {} on table {}",
150151
process.getProcessId(),
151152
tableRuntime.getTableIdentifier());
152-
process.close();
153+
process.close(false);
153154
}
154155
}
155156
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
@@ -159,7 +160,7 @@ private void initTableRuntime(DefaultTableRuntime tableRuntime) {
159160
}
160161
} else {
161162
if (process != null) {
162-
process.close();
163+
process.close(false);
163164
}
164165
}
165166
}
@@ -188,7 +189,7 @@ public void releaseTable(DefaultTableRuntime tableRuntime) {
188189
.filter(process -> process.getTableId() == tableRuntime.getTableIdentifier().getId())
189190
.collect(Collectors.toList());
190191
for (OptimizingProcess process : processList) {
191-
process.close();
192+
process.close(false);
192193
clearProcess(process);
193194
}
194195
LOG.info(
@@ -524,15 +525,19 @@ public ProcessStatus getStatus() {
524525
}
525526

526527
@Override
527-
public void close() {
528+
public void close(boolean needCommit) {
528529
lock.lock();
529530
try {
530531
if (this.status != ProcessStatus.RUNNING) {
531532
return;
532533
}
533-
this.status = ProcessStatus.CLOSED;
534-
this.endTime = System.currentTimeMillis();
535-
persistAndSetCompleted(false);
534+
if (tableRuntime.isAllowPartialCommit() && needCommit) {
535+
tableRuntime.beginCommitting();
536+
} else {
537+
this.status = ProcessStatus.CLOSED;
538+
this.endTime = System.currentTimeMillis();
539+
persistAndSetCompleted(false);
540+
}
536541
} finally {
537542
lock.unlock();
538543
}
@@ -581,14 +586,25 @@ private void acceptResult(TaskRuntime<?> taskRuntime) {
581586
taskRuntime.getFailReason());
582587
retryTask(taskRuntime);
583588
} else {
584-
LOG.info(
585-
"Task {} has reached the max execute retry count. Process {} failed.",
586-
taskRuntime.getTaskId(),
587-
processId);
588-
this.failedReason = taskRuntime.getFailReason();
589-
this.status = ProcessStatus.FAILED;
590-
this.endTime = taskRuntime.getEndTime();
591-
persistAndSetCompleted(false);
589+
if (tableRuntime.isAllowPartialCommit()
590+
&& tableRuntime.getOptimizingStatus().isProcessing()
591+
&& tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
592+
LOG.info(
593+
"Task {} has reached the max execute retry count. Process {} cancels unfinished tasks and commits SUCCESS tasks.",
594+
taskRuntime.getTaskId(),
595+
processId);
596+
failedReason = taskRuntime.getFailReason();
597+
tableRuntime.beginCommitting();
598+
} else {
599+
LOG.info(
600+
"Task {} has reached the max execute retry count. Process {} failed.",
601+
taskRuntime.getTaskId(),
602+
processId);
603+
this.failedReason = taskRuntime.getFailReason();
604+
this.status = ProcessStatus.FAILED;
605+
this.endTime = taskRuntime.getEndTime();
606+
persistAndSetCompleted(false);
607+
}
592608
}
593609
}
594610
} finally {
@@ -666,11 +682,18 @@ public int getActualQuota() {
666682

667683
@Override
668684
public void commit() {
685+
List<TaskRuntime<RewriteStageTask>> successTasks =
686+
taskMap.values().stream()
687+
.filter(task -> task.getStatus() == Status.SUCCESS)
688+
.collect(Collectors.toList());
669689
LOG.debug(
670690
"{} get {} tasks of {} partitions to commit",
671691
tableRuntime.getTableIdentifier(),
672-
taskMap.size(),
673-
taskMap.values());
692+
successTasks.size(),
693+
successTasks.stream()
694+
.map(task -> task.getTaskDescriptor().getPartition())
695+
.distinct()
696+
.count());
674697

675698
lock.lock();
676699
try {
@@ -685,9 +708,16 @@ public void commit() {
685708
try {
686709
hasCommitted = true;
687710
buildCommit().commit();
688-
status = ProcessStatus.SUCCESS;
711+
if (allTasksPrepared()) {
712+
status = ProcessStatus.SUCCESS;
713+
} else if (taskMap.values().stream()
714+
.anyMatch(task -> task.getStatus() == TaskRuntime.Status.FAILED)) {
715+
status = ProcessStatus.FAILED;
716+
} else {
717+
status = ProcessStatus.CLOSED;
718+
}
689719
endTime = System.currentTimeMillis();
690-
persistAndSetCompleted(true);
720+
persistAndSetCompleted(status == ProcessStatus.SUCCESS);
691721
} catch (PersistenceException e) {
692722
LOG.warn(
693723
"{} failed to persist process completed, will retry next commit",
@@ -838,7 +868,7 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
838868
"Load task inputs failed, close the optimizing process : {}",
839869
optimizingProcess.getProcessId(),
840870
e);
841-
optimizingProcess.close();
871+
optimizingProcess.close(false);
842872
}
843873
}
844874

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.amoro.optimizing.RewriteFilesOutput;
3636
import org.apache.amoro.optimizing.RewriteStageTask;
3737
import org.apache.amoro.properties.HiveTableProperties;
38+
import org.apache.amoro.server.optimizing.TaskRuntime.Status;
3839
import org.apache.amoro.server.utils.IcebergTableUtil;
3940
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
4041
import org.apache.amoro.table.MixedTable;
@@ -86,6 +87,25 @@ public UnKeyedTableCommit(
8687
this.tasks = tasks;
8788
}
8889

90+
private Set<ContentFile<?>> getExcludedDeleteFiles(
91+
List<TaskRuntime<RewriteStageTask>> successTasks) {
92+
Set<ContentFile<?>> excludedDeleteFiles = new HashSet<>();
93+
tasks.stream()
94+
.filter(task -> !successTasks.contains(task))
95+
.map(TaskRuntime::getTaskDescriptor)
96+
.filter(task -> task.getInput().rewrittenDeleteFiles() != null)
97+
.forEach(
98+
task ->
99+
excludedDeleteFiles.addAll(
100+
Arrays.stream(task.getInput().rewrittenDeleteFiles())
101+
.collect(Collectors.toSet())));
102+
return excludedDeleteFiles;
103+
}
104+
105+
/**
 * Tells whether a rewritten delete file may be added to the set of delete files removed by the
 * commit.
 *
 * @param excludedDeleteFiles delete files that must be kept; {@code commit()} passes the set
 *     built by {@code getExcludedDeleteFiles}
 * @param deleteFile the candidate delete file taken from a committed task's rewrite input
 * @return {@code true} when {@code deleteFile} is not contained in {@code excludedDeleteFiles}
 */
private boolean needRemove(Set<ContentFile<?>> excludedDeleteFiles, ContentFile<?> deleteFile) {
106+
return !excludedDeleteFiles.contains(deleteFile);
107+
}
108+
89109
protected List<DataFile> moveFile2HiveIfNeed() {
90110
if (!needMoveFile2Hive()) {
91111
return null;
@@ -186,16 +206,26 @@ private String getIcebergPartitionLocation(StructLike partitionData) {
186206
}
187207

188208
public void commit() throws OptimizingCommitException {
209+
List<TaskRuntime<RewriteStageTask>> successTasks =
210+
tasks.stream()
211+
.filter(task -> task.getStatus() == Status.SUCCESS)
212+
.collect(Collectors.toList());
213+
if (successTasks.isEmpty()) {
214+
LOG.info("No tasks to commit for table {}", table.id());
215+
return;
216+
}
217+
189218
long startTime = System.currentTimeMillis();
190219
LOG.info("Starting to commit table {} with {} tasks.", table.id(), tasks.size());
191220

221+
Set<ContentFile<?>> excludedDeleteFiles = getExcludedDeleteFiles(successTasks);
192222
List<DataFile> hiveNewDataFiles = moveFile2HiveIfNeed();
193223
// collect files
194224
Set<DataFile> addedDataFiles = Sets.newHashSet();
195225
Set<DataFile> removedDataFiles = Sets.newHashSet();
196226
Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
197227
Set<DeleteFile> removedDeleteFiles = Sets.newHashSet();
198-
tasks.stream()
228+
successTasks.stream()
199229
.map(TaskRuntime::getTaskDescriptor)
200230
.forEach(
201231
task -> {
@@ -213,6 +243,7 @@ public void commit() throws OptimizingCommitException {
213243
if (task.getInput().rewrittenDeleteFiles() != null) {
214244
removedDeleteFiles.addAll(
215245
Arrays.stream(task.getInput().rewrittenDeleteFiles())
246+
.filter(deleteFile -> needRemove(excludedDeleteFiles, deleteFile))
216247
.map(ContentFiles::asDeleteFile)
217248
.collect(Collectors.toSet()));
218249
}

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@
3232
import org.apache.amoro.server.table.TableConfigurations;
3333
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
3434
import org.apache.amoro.server.utils.IcebergTableUtil;
35+
import org.apache.amoro.server.utils.RollingFileCleaner;
3536
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
3637
import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
3738
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
3839
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
3940
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
4041
import org.apache.amoro.table.TableIdentifier;
4142
import org.apache.amoro.utils.TableFileUtil;
42-
import org.apache.hadoop.fs.Path;
4343
import org.apache.iceberg.ContentFile;
4444
import org.apache.iceberg.ContentScanTask;
4545
import org.apache.iceberg.DataFile;
@@ -67,8 +67,8 @@
6767
import org.apache.iceberg.types.Conversions;
6868
import org.apache.iceberg.types.Type;
6969
import org.apache.iceberg.types.Types;
70+
import org.apache.iceberg.util.DateTimeUtil;
7071
import org.apache.iceberg.util.SerializableFunction;
71-
import org.apache.iceberg.util.ThreadPools;
7272
import org.slf4j.Logger;
7373
import org.slf4j.LoggerFactory;
7474

@@ -91,7 +91,6 @@
9191
import java.util.Queue;
9292
import java.util.Set;
9393
import java.util.concurrent.LinkedTransferQueue;
94-
import java.util.concurrent.atomic.AtomicInteger;
9594
import java.util.regex.Pattern;
9695
import java.util.stream.Collectors;
9796
import java.util.stream.Stream;
@@ -201,54 +200,31 @@ private void expireSnapshots(long olderThan, int minCount, Set<String> exclude)
201200
olderThan,
202201
minCount,
203202
exclude);
204-
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
205-
Set<String> parentDirectories = new HashSet<>();
206-
Set<String> expiredFiles = new HashSet<>();
203+
RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), exclude);
207204
table
208205
.expireSnapshots()
209206
.retainLast(Math.max(minCount, 1))
210207
.expireOlderThan(olderThan)
211-
.deleteWith(
212-
file -> {
213-
if (exclude.isEmpty()) {
214-
expiredFiles.add(file);
215-
} else {
216-
String fileUriPath = TableFileUtil.getUriPath(file);
217-
if (!exclude.contains(fileUriPath)
218-
&& !exclude.contains(new Path(fileUriPath).getParent().toString())) {
219-
expiredFiles.add(file);
220-
}
221-
}
222-
223-
parentDirectories.add(new Path(file).getParent().toString());
224-
toDeleteFiles.incrementAndGet();
225-
})
208+
.deleteWith(expiredFileCleaner::addFile)
226209
.cleanExpiredFiles(
227210
true) /* enable clean only for collecting the expired files, will delete them later */
228211
.commit();
229212

230-
// try to batch delete files
231-
int deletedFiles =
232-
TableFileUtil.parallelDeleteFiles(fileIO(), expiredFiles, ThreadPools.getWorkerPool());
233-
234-
parentDirectories.forEach(
235-
parent -> {
236-
try {
237-
TableFileUtil.deleteEmptyDirectory(fileIO(), parent, exclude);
238-
} catch (Exception e) {
239-
// Ignore exceptions to remove as many directories as possible
240-
LOG.warn("Failed to delete empty directory {} for table {}", parent, table.name(), e);
241-
}
242-
});
243-
244-
runWithCondition(
245-
toDeleteFiles.get() > 0,
246-
() ->
247-
LOG.info(
248-
"Deleted {}/{} files for table {}",
249-
deletedFiles,
250-
toDeleteFiles.get(),
251-
getTable().name()));
213+
int collectedFiles = expiredFileCleaner.fileCount();
214+
expiredFileCleaner.clear();
215+
if (collectedFiles > 0) {
216+
LOG.info(
217+
"Expired {}/{} files for table {} order than {}",
218+
collectedFiles,
219+
expiredFileCleaner.cleanedFileCount(),
220+
table.name(),
221+
DateTimeUtil.formatTimestampMillis(olderThan));
222+
} else {
223+
LOG.debug(
224+
"No expired files found for table {} order than {}",
225+
table.name(),
226+
DateTimeUtil.formatTimestampMillis(olderThan));
227+
}
252228
}
253229

254230
@Override

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
2525
import org.apache.amoro.process.ProcessStatus;
2626
import org.apache.amoro.server.optimizing.OptimizingProcess;
27+
import org.apache.amoro.server.optimizing.OptimizingStatus;
2728
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
2829
import org.apache.amoro.server.table.DefaultTableRuntime;
2930
import org.apache.amoro.server.table.TableService;
@@ -58,8 +59,9 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {
5859
}
5960

6061
private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
62+
// only evaluate pending input when optimizing is enabled and in idle state
6163
if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()
62-
&& !tableRuntime.getOptimizingStatus().isProcessing()) {
64+
&& tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
6365
AbstractOptimizingEvaluator evaluator =
6466
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
6567
if (evaluator.isNecessary()) {
@@ -86,7 +88,7 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
8688
&& !tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
8789
OptimizingProcess optimizingProcess = defaultTableRuntime.getOptimizingProcess();
8890
if (optimizingProcess != null && optimizingProcess.getStatus() == ProcessStatus.RUNNING) {
89-
optimizingProcess.close();
91+
optimizingProcess.close(false);
9092
}
9193
}
9294
}

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ public double calculateQuotaOccupy() {
247247
return (double) getQuotaTime() / AmoroServiceConstants.QUOTA_LOOK_BACK_TIME / targetQuotaLimit;
248248
}
249249

250+
/**
 * Returns the allow-partial-commit flag of this table's optimizing configuration.
 *
 * @return the value of {@code getOptimizingConfig().isAllowPartialCommit()}
 */
public boolean isAllowPartialCommit() {
251+
return getOptimizingConfig().isAllowPartialCommit();
252+
}
253+
250254
public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInput) {
251255
long pendingFileSize =
252256
pendingInput.getDataFileSize()
@@ -303,7 +307,7 @@ public DefaultTableRuntime refresh(AmoroTable<?> table) {
303307
if (!Objects.equals(
304308
getGroupName(), newConfiguration.getOptimizingConfig().getOptimizerGroup())) {
305309
if (optimizingProcess != null) {
306-
optimizingProcess.close();
310+
optimizingProcess.close(false);
307311
}
308312
this.optimizingMetrics.optimizerGroupChanged(getGroupName());
309313
}
@@ -429,7 +433,7 @@ public void dispose() {
429433
store()
430434
.synchronizedInvoke(
431435
() -> {
432-
Optional.ofNullable(optimizingProcess).ifPresent(OptimizingProcess::close);
436+
Optional.ofNullable(optimizingProcess).ifPresent(process -> process.close(false));
433437
});
434438
super.dispose();
435439
}

0 commit comments

Comments
 (0)