some PoC changes #149

Open · wants to merge 1 commit into base: main
File: TelemetryConfiguration.java
@@ -42,7 +42,7 @@ public class TelemetryConfiguration {

// Logging reporting is on by default
public static final String LOGGING_ENABLED_KEY = "logging.enabled";
- public static final boolean DEFAULT_LOGGING_ENABLED = true;
+ public static final boolean DEFAULT_LOGGING_ENABLED = false;

// Aggregations are off by default
public static final String AGGREGATIONS_ENABLED_KEY = "aggregations.enabled";
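Reviewer note: flipping DEFAULT_LOGGING_ENABLED to false silently turns off telemetry log reporting for anyone who relied on the old default. A minimal sketch of opting back in, assuming TelemetryConfiguration exposes a Lombok-style builder with a loggingEnabled field (the builder and setter name are assumptions, not shown in this diff):

    // Hypothetical opt-in; builder and setter name are assumptions.
    TelemetryConfiguration telemetryConfiguration =
        TelemetryConfiguration.builder()
            .loggingEnabled(true) // restore the pre-change default explicitly
            .build();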
File: LogicalIOConfiguration.java
@@ -15,6 +15,7 @@
*/
package software.amazon.s3.dataaccelerator.io.logical;

+ import static software.amazon.s3.dataaccelerator.util.Constants.ONE_KB;
import static software.amazon.s3.dataaccelerator.util.Constants.ONE_MB;

import lombok.Builder;
@@ -29,7 +30,7 @@
@EqualsAndHashCode
public class LogicalIOConfiguration {
private static final boolean DEFAULT_FOOTER_CACHING_ENABLED = true;
- private static final long DEFAULT_FOOTER_CACHING_SIZE = ONE_MB;
+ private static final long DEFAULT_FOOTER_CACHING_SIZE = 32 * ONE_KB;
private static final boolean DEFAULT_SMALL_OBJECT_PREFETCHING_ENABLED = true;
private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 3 * ONE_MB;
private static final int DEFAULT_PARQUET_METADATA_STORE_SIZE = 45;
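Reviewer note: shrinking the default footer cache from ONE_MB to 32 * ONE_KB assumes footers fit in 32 KiB, which can fail for files with wide schemas or many row groups. A sketch of restoring the old size per configuration, assuming the @Builder shown on LogicalIOConfiguration generates a footerCachingSize setter (the setter name is inferred from DEFAULT_FOOTER_CACHING_SIZE, not confirmed by this diff):

    // Hypothetical override; setter name is an assumption.
    LogicalIOConfiguration logicalIOConfiguration =
        LogicalIOConfiguration.builder()
            .footerCachingSize(ONE_MB) // keep the previous 1 MiB footer cache
            .build();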
File: ParquetLogicalIOImpl.java
@@ -16,9 +16,14 @@
package software.amazon.s3.dataaccelerator.io.logical.impl;

import java.io.IOException;
+ import java.util.Optional;

import lombok.NonNull;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.dataaccelerator.io.logical.LogicalIOConfiguration;
+ import software.amazon.s3.dataaccelerator.io.logical.parquet.ParquetMetadataParsingTask;
import software.amazon.s3.dataaccelerator.io.physical.PhysicalIO;
import software.amazon.s3.dataaccelerator.util.S3URI;

@@ -29,6 +34,11 @@
public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl {
// Dependencies
private final ParquetPrefetcher parquetPrefetcher;
+ private final PhysicalIO physicalIO;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ParquetLogicalIOImpl.class);
+
+ private boolean columnsRead = false;

/**
* Constructs an instance of LogicalIOImpl.
@@ -51,6 +61,8 @@ public ParquetLogicalIOImpl(
this.parquetPrefetcher =
new ParquetPrefetcher(
s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore);

+ this.physicalIO = physicalIO;
this.parquetPrefetcher.prefetchFooterAndBuildMetadata();
}

@@ -68,8 +80,22 @@ public ParquetLogicalIOImpl(
public int read(byte[] buf, int off, int len, long position) throws IOException {
// Perform async prefetching before doing the blocking read
this.parquetPrefetcher.prefetchRemainingColumnChunk(position, len);
- this.parquetPrefetcher.addToRecentColumnList(position);
+ Optional<String> columnName = this.parquetPrefetcher.addToRecentColumnList(position, len);
+
+ if (columnName.isPresent()) {
+   columnsRead = true;
+ }

return super.read(buf, off, len, position);
}

+ @Override
+ public void close() throws IOException {
+   // PoC: evict the object's cached data once column data has actually been read.
+   // Note that super.close() is not invoked here.
+   if (columnsRead) {
+     LOG.info("Columns were read; evicting object data from the cache");
+     physicalIO.close();
+   }
+ }

}
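Reviewer note: the override ties cache lifetime to read behaviour: a stream that consumed column data evicts the object's cached blocks on close, while a footer-only open (for example a pure metadata scan) leaves the cache warm for later readers. A minimal sketch of the intended flow, assuming close() on the factory-created stream reaches this LogicalIO (the factory and stream names below are assumptions, not shown in this diff):

    // Hypothetical caller; factory and stream names are assumptions.
    try (S3SeekableInputStream stream = factory.createStream(s3Uri)) {
      stream.read(buffer, 0, buffer.length); // a large-enough column read sets columnsRead
    } // close() then evicts the object's blob via physicalIO.close()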
File: ParquetPrefetcher.java
@@ -15,6 +15,7 @@
*/
package software.amazon.s3.dataaccelerator.io.logical.impl;

+ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -185,10 +186,12 @@ private CompletableFuture<IOPlanExecution> prefetchPredictedColumns(
*
* @param position the position to record
+ * @param len the length of the current read
+ * @return the name of the column read at this position, if any
*/
- public void addToRecentColumnList(long position) {
+ public Optional<String> addToRecentColumnList(long position, int len) {
if (logicalIOConfiguration.getPrefetchingMode() != PrefetchMode.OFF) {
- this.parquetPredictivePrefetchingTask.addToRecentColumnList(position);
+ return this.parquetPredictivePrefetchingTask.addToRecentColumnList(position, len);
}

+ return Optional.empty();
}

private boolean shouldPrefetch() {
File: ColumnMetadata.java
@@ -22,6 +22,8 @@
public class ColumnMetadata {
private final int rowGroupIndex;
private final String columnName;
+ private final long dataPageOffset;
+ private final long dictionaryOffset;
private final long startPos;
private final long compressedSize;
private final int schemaHash;
File: ParquetMetadataParsingTask.java
@@ -108,6 +108,8 @@ private ColumnMappers buildColumnMaps(FileMetaData fileMetaData) {
new ColumnMetadata(
rowGroupIndex,
columnName,
+ columnChunk.getMeta_data().getData_page_offset(),
+ columnChunk.getMeta_data().getDictionary_page_offset(),
columnChunk.getMeta_data().getDictionary_page_offset(),
columnChunk.getMeta_data().getTotal_compressed_size(),
concatenatedColumnNames.hashCode());
@@ -121,6 +123,8 @@ private ColumnMappers buildColumnMaps(FileMetaData fileMetaData) {
new ColumnMetadata(
rowGroupIndex,
columnName,
+ columnChunk.getMeta_data().getData_page_offset(),
+ 0, // no dictionary page for this chunk; 0 marks the dictionary offset as absent
columnChunk.getFile_offset(),
columnChunk.getMeta_data().getTotal_compressed_size(),
concatenatedColumnNames.hashCode());
File: ParquetPredictivePrefetchingTask.java
@@ -112,18 +112,24 @@ public ParquetPredictivePrefetchingTask(
* @param position current read position
+ * @param len the length of the current read
* @return name of column added as recent column
*/
- public Optional<String> addToRecentColumnList(long position) {
+ public Optional<String> addToRecentColumnList(long position, int len) {
if (parquetColumnPrefetchStore.getColumnMappers(s3Uri) != null) {
ColumnMappers columnMappers = parquetColumnPrefetchStore.getColumnMappers(s3Uri);
if (columnMappers.getOffsetIndexToColumnMap().containsKey(position)) {
ColumnMetadata columnMetadata = columnMappers.getOffsetIndexToColumnMap().get(position);
parquetColumnPrefetchStore.addRecentColumn(columnMetadata);

LOG.info("READING COLUMN {}", columnMetadata.getColumnName());

// Maybe prefetch all recent columns for the current row group, if they have not been
// prefetched already.
prefetchCurrentRowGroup(columnMappers, columnMetadata);

- return Optional.of(columnMetadata.getColumnName());
+ // Only count the column as read when the read covers a meaningful share (>30%)
+ // of the chunk's compressed bytes; short header or page reads are ignored.
+ if (len / (double) columnMetadata.getCompressedSize() > 0.3) {
+   return Optional.of(columnMetadata.getColumnName());
+ }
+
+ return Optional.empty();
}
}
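Reviewer note: the len / compressedSize > 0.3 guard means a read only counts as a column read when it covers more than 30% of the column chunk's compressed bytes, so short header or page-index reads no longer mark a column as consumed. A worked example with illustrative numbers:

    long compressedSize = 8L * 1024 * 1024; // an 8 MiB column chunk
    boolean small = (1024 * 1024) / (double) compressedSize > 0.3;     // 0.125 -> not counted
    boolean large = (4 * 1024 * 1024) / (double) compressedSize > 0.3; // 0.5   -> counted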

@@ -171,14 +177,27 @@ public IOPlanExecution prefetchRecentColumns(
List<Range> prefetchRanges = new ArrayList<>();
for (String recentColumn : getRecentColumns(columnMappers.getOffsetIndexToColumnMap())) {
if (columnMappers.getColumnNameToColumnMap().containsKey(recentColumn)) {
- LOG.debug(
+ LOG.info(
"Column {} found in schema for {}, adding to prefetch list",
recentColumn,
this.s3Uri.getKey());

List<ColumnMetadata> columnMetadataList =
columnMappers.getColumnNameToColumnMap().get(recentColumn);
for (ColumnMetadata columnMetadata : columnMetadataList) {
if (rowGroupsToPrefetch.contains(columnMetadata.getRowGroupIndex())) {

+ // A separate request for the dictionary page, which (when present) occupies
+ // the bytes [dictionaryOffset, dataPageOffset - 1] at the start of the column chunk.
+ if (columnMetadata.getDictionaryOffset() != 0) {
+   prefetchRanges.add(
+       new Range(
+           columnMetadata.getDictionaryOffset(),
+           columnMetadata.getDataPageOffset() - 1));
+
+   LOG.info(
+       "Prefetching dictionary range {} - {}",
+       columnMetadata.getDictionaryOffset(),
+       columnMetadata.getDataPageOffset() - 1);
+ }

prefetchRanges.add(
new Range(
columnMetadata.getStartPos(),
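Reviewer note: the dictionary range relies on the Parquet layout in which a dictionary page, when present, is written at the start of the column chunk directly before the first data page, so its bytes span [dictionaryOffset, dataPageOffset - 1]. A layout sketch with illustrative offsets (the inclusive-end Range semantics are assumed from the surrounding code):

    // dictionaryOffset = 1000, dataPageOffset = 1400
    // bytes 1000..1399 -> dictionary page (issued as its own Range above)
    // bytes 1400..     -> data pages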
File: PhysicalIOConfiguration.java
@@ -30,7 +30,7 @@
@Builder
@EqualsAndHashCode
public class PhysicalIOConfiguration {
- private static final int DEFAULT_CAPACITY_BLOB_STORE = 50;
+ private static final int DEFAULT_CAPACITY_BLOB_STORE = 18;
private static final int DEFAULT_CAPACITY_METADATA_STORE = 50;
private static final boolean DEFAULT_USE_SINGLE_CACHE = true;
private static final long DEFAULT_BLOCK_SIZE_BYTES = 8 * ONE_MB;
File: BlobStore.java
@@ -21,6 +21,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import lombok.NonNull;
import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.dataaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.dataaccelerator.request.ObjectClient;
@@ -82,6 +83,16 @@ public Blob get(S3URI s3URI) {
telemetry));
}

+ /** Closes and evicts the cached {@link Blob} for the given object, if present. */
+ public void removeFromBlob(S3URI s3URI) {
+   Blob blob = blobMap.remove(s3URI);
+
+   if (blob != null) {
+     blob.close();
+   }
+ }

/** Closes the {@link BlobStore} and frees up all resources it holds. */
@Override
public void close() {
File: BlockStore.java
@@ -127,6 +127,7 @@ private void safeClose(Block block) {

@Override
public void close() {
LOG.info("Closing BlockStore {}", s3URI);
blocks.forEach(this::safeClose);
}
}
File: PhysicalIOImpl.java
@@ -164,5 +164,7 @@ private long contentLength() {
}

@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+   // Evict this object's cached data when the stream is closed.
+   this.blobStore.removeFromBlob(s3URI);
+ }
}
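Reviewer note: taken together, these hunks wire eviction through the whole stack. A condensed sketch of the close chain as it reads across the files above (the enclosing class of the last hunk is inferred to be PhysicalIOImpl, and the Blob-to-BlockStore step is inferred from the BlockStore hunk rather than shown directly):

    // ParquetLogicalIOImpl.close()    -> physicalIO.close() when columnsRead
    // PhysicalIOImpl.close()          -> blobStore.removeFromBlob(s3URI)
    // BlobStore.removeFromBlob(s3URI) -> blob.close(), then drop the map entry
    // Blob.close()                    -> BlockStore.close() -> safeClose(block) per block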