From fdc34100a03b5a35455d121494f55653d9bdce8a Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 25 Oct 2024 13:47:10 +0100 Subject: [PATCH] some PoC changes --- .../telemetry/TelemetryConfiguration.java | 2 +- .../io/logical/LogicalIOConfiguration.java | 3 +- .../io/logical/impl/ParquetLogicalIOImpl.java | 28 ++++++++++++++++++- .../io/logical/impl/ParquetPrefetcher.java | 7 +++-- .../io/logical/parquet/ColumnMetadata.java | 2 ++ .../parquet/ParquetMetadataParsingTask.java | 4 +++ .../ParquetPredictivePrefetchingTask.java | 25 +++++++++++++++-- .../io/physical/PhysicalIOConfiguration.java | 2 +- .../io/physical/data/BlobStore.java | 11 ++++++++ .../io/physical/data/BlockStore.java | 1 + .../io/physical/impl/PhysicalIOImpl.java | 4 ++- 11 files changed, 79 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/dataaccelerator/common/telemetry/TelemetryConfiguration.java b/common/src/main/java/software/amazon/s3/dataaccelerator/common/telemetry/TelemetryConfiguration.java index 96cfd4b8..77dfb3ec 100644 --- a/common/src/main/java/software/amazon/s3/dataaccelerator/common/telemetry/TelemetryConfiguration.java +++ b/common/src/main/java/software/amazon/s3/dataaccelerator/common/telemetry/TelemetryConfiguration.java @@ -42,7 +42,7 @@ public class TelemetryConfiguration { // Logging reporting is on by default public static final String LOGGING_ENABLED_KEY = "logging.enabled"; - public static final boolean DEFAULT_LOGGING_ENABLED = true; + public static final boolean DEFAULT_LOGGING_ENABLED = false; // Aggregations are off by default public static final String AGGREGATIONS_ENABLED_KEY = "aggregations.enabled"; diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/LogicalIOConfiguration.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/LogicalIOConfiguration.java index 0efbf601..6bae99c9 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/LogicalIOConfiguration.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/LogicalIOConfiguration.java @@ -15,6 +15,7 @@ */ package software.amazon.s3.dataaccelerator.io.logical; +import static software.amazon.s3.dataaccelerator.util.Constants.ONE_KB; import static software.amazon.s3.dataaccelerator.util.Constants.ONE_MB; import lombok.Builder; @@ -29,7 +30,7 @@ @EqualsAndHashCode public class LogicalIOConfiguration { private static final boolean DEFAULT_FOOTER_CACHING_ENABLED = true; - private static final long DEFAULT_FOOTER_CACHING_SIZE = ONE_MB; + private static final long DEFAULT_FOOTER_CACHING_SIZE = 32 * ONE_KB; private static final boolean DEFAULT_SMALL_OBJECT_PREFETCHING_ENABLED = true; private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 3 * ONE_MB; private static final int DEFAULT_PARQUET_METADATA_STORE_SIZE = 45; diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetLogicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetLogicalIOImpl.java index ad3dc506..0a532dcf 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetLogicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetLogicalIOImpl.java @@ -16,9 +16,14 @@ package software.amazon.s3.dataaccelerator.io.logical.impl; import java.io.IOException; +import java.util.Optional; + import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry; import software.amazon.s3.dataaccelerator.io.logical.LogicalIOConfiguration; +import software.amazon.s3.dataaccelerator.io.logical.parquet.ParquetMetadataParsingTask; import software.amazon.s3.dataaccelerator.io.physical.PhysicalIO; import software.amazon.s3.dataaccelerator.util.S3URI; @@ -29,6 +34,11 @@ public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl { // Dependencies private final ParquetPrefetcher parquetPrefetcher; + private final PhysicalIO physicalIO; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetLogicalIOImpl.class); + + boolean columnsRead = false; /** * Constructs an instance of LogicalIOImpl. @@ -51,6 +61,8 @@ public ParquetLogicalIOImpl( this.parquetPrefetcher = new ParquetPrefetcher( s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore); + + this.physicalIO = physicalIO; this.parquetPrefetcher.prefetchFooterAndBuildMetadata(); } @@ -68,8 +80,22 @@ public ParquetLogicalIOImpl( public int read(byte[] buf, int off, int len, long position) throws IOException { // Perform async prefetching before doing the blocking read this.parquetPrefetcher.prefetchRemainingColumnChunk(position, len); - this.parquetPrefetcher.addToRecentColumnList(position); + Optional columnName = this.parquetPrefetcher.addToRecentColumnList(position, len); + + if (columnName.isPresent()) { + columnsRead = true; + } return super.read(buf, off, len, position); } + + @Override + public void close() throws IOException { + if (columnsRead) { + LOG.info("Columns read!, removing from cache"); + physicalIO.close(); + } + + } + } diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetPrefetcher.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetPrefetcher.java index cbbd989a..d26d27cb 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetPrefetcher.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/impl/ParquetPrefetcher.java @@ -15,6 +15,7 @@ */ package software.amazon.s3.dataaccelerator.io.logical.impl; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.AccessLevel; import lombok.AllArgsConstructor; @@ -185,10 +186,12 @@ private CompletableFuture prefetchPredictedColumns( * * @param position the position to record */ - public void addToRecentColumnList(long position) { + public Optional addToRecentColumnList(long position, int len) { if (logicalIOConfiguration.getPrefetchingMode() != PrefetchMode.OFF) { - this.parquetPredictivePrefetchingTask.addToRecentColumnList(position); + return this.parquetPredictivePrefetchingTask.addToRecentColumnList(position, len); } + + return Optional.empty(); } private boolean shouldPrefetch() { diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ColumnMetadata.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ColumnMetadata.java index 3c26f0eb..35d53a98 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ColumnMetadata.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ColumnMetadata.java @@ -22,6 +22,8 @@ public class ColumnMetadata { private final int rowGroupIndex; private final String columnName; + private final long dataPageOffset; + private final long dictionaryOffset; private final long startPos; private final long compressedSize; private final int schemaHash; diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java index 6423ed6b..1f9564f5 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java @@ -108,6 +108,8 @@ private ColumnMappers buildColumnMaps(FileMetaData fileMetaData) { new ColumnMetadata( rowGroupIndex, columnName, + columnChunk.getMeta_data().getData_page_offset(), + columnChunk.getMeta_data().getDictionary_page_offset(), columnChunk.getMeta_data().getDictionary_page_offset(), columnChunk.getMeta_data().getTotal_compressed_size(), concatenatedColumnNames.hashCode()); @@ -121,6 +123,8 @@ private ColumnMappers buildColumnMaps(FileMetaData fileMetaData) { new ColumnMetadata( rowGroupIndex, columnName, + columnChunk.getMeta_data().getData_page_offset(), + 0, columnChunk.getFile_offset(), columnChunk.getMeta_data().getTotal_compressed_size(), concatenatedColumnNames.hashCode()); diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java index 53f9fe4f..2d251369 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java @@ -112,18 +112,24 @@ public ParquetPredictivePrefetchingTask( * @param position current read position * @return name of column added as recent column */ - public Optional addToRecentColumnList(long position) { + public Optional addToRecentColumnList(long position, int len) { if (parquetColumnPrefetchStore.getColumnMappers(s3Uri) != null) { ColumnMappers columnMappers = parquetColumnPrefetchStore.getColumnMappers(s3Uri); if (columnMappers.getOffsetIndexToColumnMap().containsKey(position)) { ColumnMetadata columnMetadata = columnMappers.getOffsetIndexToColumnMap().get(position); parquetColumnPrefetchStore.addRecentColumn(columnMetadata); + LOG.info("READING COLUMN {}", columnMetadata.getColumnName()); + // Maybe prefetch all recent columns for the current row group, if they have not been // prefetched already. prefetchCurrentRowGroup(columnMappers, columnMetadata); - return Optional.of(columnMetadata.getColumnName()); + if (len / (double) columnMetadata.getCompressedSize() > 0.3) { + return Optional.of(columnMetadata.getColumnName()); + } + + return Optional.empty(); } } @@ -171,14 +177,27 @@ public IOPlanExecution prefetchRecentColumns( List prefetchRanges = new ArrayList<>(); for (String recentColumn : getRecentColumns(columnMappers.getOffsetIndexToColumnMap())) { if (columnMappers.getColumnNameToColumnMap().containsKey(recentColumn)) { - LOG.debug( + LOG.info( "Column {} found in schema for {}, adding to prefetch list", recentColumn, this.s3Uri.getKey()); + List columnMetadataList = columnMappers.getColumnNameToColumnMap().get(recentColumn); for (ColumnMetadata columnMetadata : columnMetadataList) { if (rowGroupsToPrefetch.contains(columnMetadata.getRowGroupIndex())) { + + // A separate request for the dictionary + if (columnMetadata.getDictionaryOffset() != 0) { + prefetchRanges.add( + new Range( + columnMetadata.getDictionaryOffset(), + columnMetadata.getDictionaryOffset() + (columnMetadata.getDataPageOffset() - columnMetadata.getDictionaryOffset() - 1)) + ); + + LOG.info("PREFETCHING DICTIONARY {} - {}", columnMetadata.getDictionaryOffset(), columnMetadata.getDictionaryOffset() + (columnMetadata.getDataPageOffset() - columnMetadata.getDictionaryOffset() - 1)); + } + prefetchRanges.add( new Range( columnMetadata.getStartPos(), diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/PhysicalIOConfiguration.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/PhysicalIOConfiguration.java index 2ae9e680..fc041475 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/PhysicalIOConfiguration.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/PhysicalIOConfiguration.java @@ -30,7 +30,7 @@ @Builder @EqualsAndHashCode public class PhysicalIOConfiguration { - private static final int DEFAULT_CAPACITY_BLOB_STORE = 50; + private static final int DEFAULT_CAPACITY_BLOB_STORE = 18; private static final int DEFAULT_CAPACITY_METADATA_STORE = 50; private static final boolean DEFAULT_USE_SINGLE_CACHE = true; private static final long DEFAULT_BLOCK_SIZE_BYTES = 8 * ONE_MB; diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlobStore.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlobStore.java index 56021294..08fb96c4 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlobStore.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlobStore.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.Map; import lombok.NonNull; +import org.omg.CORBA.TypeCodePackage.BadKind; import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry; import software.amazon.s3.dataaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.dataaccelerator.request.ObjectClient; @@ -82,6 +83,16 @@ public Blob get(S3URI s3URI) { telemetry)); } + public void removeFromBlob(S3URI s3URI) { + Blob blob = blobMap.get(s3URI); + + if (blob != null) { + blob.close(); + } + + blobMap.remove(s3URI); + } + /** Closes the {@link BlobStore} and frees up all resources it holds. */ @Override public void close() { diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlockStore.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlockStore.java index 0aaa01e1..0fb4e6d2 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlockStore.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/data/BlockStore.java @@ -127,6 +127,7 @@ private void safeClose(Block block) { @Override public void close() { + LOG.info("Closing BlockStore {}", s3URI); blocks.forEach(this::safeClose); } } diff --git a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/impl/PhysicalIOImpl.java index e021393b..f76e7334 100644 --- a/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/dataaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -164,5 +164,7 @@ private long contentLength() { } @Override - public void close() throws IOException {} + public void close() throws IOException { + this.blobStore.removeFromBlob(s3URI); + } }