Skip to content

Commit

Permalink
Spark: Use bulk deletes in rewrite manifests action (apache#10343)
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar authored Jun 17, 2024
1 parent 52d82f9 commit d8f26ca
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand All @@ -54,7 +55,6 @@
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -352,12 +352,14 @@ private void replaceManifests(
}

/**
 * Deletes the given manifest file locations.
 *
 * <p>Uses a single bulk delete when the table's {@code FileIO} implements
 * {@link SupportsBulkOperations}; otherwise falls back to parallel per-file deletes on the
 * shared worker pool. Delete failures are logged rather than thrown, matching the previous
 * best-effort behavior.
 *
 * @param locations locations of manifest files to delete
 */
private void deleteFiles(Iterable<String> locations) {
  // Wrap each location as a FileInfo tagged MANIFEST so the shared delete helpers can
  // attribute failures to the correct content type.
  Iterable<FileInfo> files =
      Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
  if (table.io() instanceof SupportsBulkOperations) {
    // One bulk call instead of N individual deletes; the removed Tasks.foreach loop is
    // superseded by this path and must not run in addition to it (it would double-delete).
    deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
  } else {
    deleteFiles(
        ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
  }
}

private static ManifestFile writeManifest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand All @@ -60,7 +61,6 @@
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -362,12 +362,14 @@ private void replaceManifests(
}

/**
 * Deletes the given manifest file locations.
 *
 * <p>Prefers a bulk delete when {@code table.io()} implements {@link SupportsBulkOperations};
 * otherwise deletes files individually in parallel on the shared worker pool. Failures are
 * handled by the shared delete helpers (logged, best-effort), not propagated.
 *
 * @param locations locations of manifest files to delete
 */
private void deleteFiles(Iterable<String> locations) {
  // Tag every location as MANIFEST content so failure reporting is accurate.
  Iterable<FileInfo> files =
      Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
  if (table.io() instanceof SupportsBulkOperations) {
    // Bulk path replaces the old Tasks.foreach per-file loop; running both would issue
    // duplicate deletes for every manifest.
    deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
  } else {
    deleteFiles(
        ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
  }
}

private ManifestWriterFactory manifestWriters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand All @@ -60,7 +61,6 @@
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -362,12 +362,14 @@ private void replaceManifests(
}

/**
 * Deletes the given manifest file locations.
 *
 * <p>When the table's {@code FileIO} supports bulk operations, all files are removed in a
 * single bulk call; otherwise they are deleted one at a time in parallel on the shared worker
 * pool. Either way deletion is best-effort — failures are logged by the shared helpers.
 *
 * @param locations locations of manifest files to delete
 */
private void deleteFiles(Iterable<String> locations) {
  // Lazily adapt locations into FileInfo entries marked as MANIFEST content.
  Iterable<FileInfo> files =
      Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
  if (table.io() instanceof SupportsBulkOperations) {
    // Single bulk request supersedes the legacy Tasks.foreach loop; keeping both would
    // delete each manifest twice.
    deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
  } else {
    deleteFiles(
        ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
  }
}

private ManifestWriterFactory manifestWriters() {
Expand Down

0 comments on commit d8f26ca

Please sign in to comment.