diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 304ba032b416a..1fdd6815fea73 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen( .withCallbacks(createInputStreamCallbacks(auditSpan)) .withContext(readContext.build()) .withObjectAttributes(createObjectAttributes(path, fileStatus)) - .withStreamStatistics(inputStreamStats); + .withStreamStatistics(inputStreamStats) + .withAuditSpan(auditSpan); + return new FSDataInputStream(getStore().readObject(parameters)); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index e91710a0af3a0..780875d93c55b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -21,6 +21,7 @@ import java.util.List; import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; @@ -50,6 +51,8 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE; +import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME; 
+import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * Extract information from a request. @@ -193,6 +196,18 @@ private RequestInfo writing(final String verb, || request instanceof CreateSessionRequest; } + /** + * If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing + * of requests which are made outside S3A's requestFactory. + * + * @param executionAttributes request execution attributes + * @return true if request is audited outside of current span + */ + public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) { + return executionAttributes.getAttribute(SPAN_ID) != null + && executionAttributes.getAttribute(OPERATION_NAME) != null; + } + /** * Predicate which returns true if the request is part of the * multipart upload API -and which therefore must be rejected diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index 840ce5ffd3084..e1d28bea5f31a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan; +import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT; @@ -69,6 
+70,8 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER; import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName; +import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME; +import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * The LoggingAuditor logs operations at DEBUG (in SDK Request) and @@ -85,7 +88,6 @@ public class LoggingAuditor private static final Logger LOG = LoggerFactory.getLogger(LoggingAuditor.class); - /** * Some basic analysis for the logs. */ @@ -267,8 +269,9 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) { */ private class LoggingAuditSpan extends AbstractAuditSpanImpl { - private final HttpReferrerAuditHeader referrer; + private HttpReferrerAuditHeader referrer; + private final HttpReferrerAuditHeader.Builder headerBuilder; /** * Attach Range of data for GetObject Request. 
* @param request the sdk request to be modified @@ -300,7 +303,7 @@ private LoggingAuditSpan( final String path2) { super(spanId, operationName); - this.referrer = HttpReferrerAuditHeader.builder() + this.headerBuilder = HttpReferrerAuditHeader.builder() .withContextId(getAuditorId()) .withSpanId(spanId) .withOperationName(operationName) @@ -312,8 +315,9 @@ private LoggingAuditSpan( currentThreadID()) .withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp())) .withEvaluated(context.getEvaluatedEntries()) - .withFilter(filters) - .build(); + .withFilter(filters); + + this.referrer = this.headerBuilder.build(); this.description = referrer.buildHttpReferrer(); } @@ -384,12 +388,33 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, SdkHttpRequest httpRequest = context.httpRequest(); SdkRequest sdkRequest = context.request(); + // If spanId and operationName are set in execution attributes, then use these values, + // instead of the ones in the current span. This is useful when requests are happening in dependencies such as + // the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL + // will attach the current spanId and operationName via execution attributes during its request creation. These + // can then be used to update the values in the logger and referrer header. Without this overwriting, the operation + // name and corresponding span will be whichever is active on the thread the request is getting executed on. + boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes); + + String spanId = isRequestAuditedOutsideCurrentSpan ? + executionAttributes.getAttribute(SPAN_ID) : getSpanId(); + + String operationName = isRequestAuditedOutsideCurrentSpan ?
+ executionAttributes.getAttribute(OPERATION_NAME) : getOperationName(); + + if (isRequestAuditedOutsideCurrentSpan) { + this.headerBuilder.withSpanId(spanId); + this.headerBuilder.withOperationName(operationName); + this.referrer = this.headerBuilder.build(); + } + // attach range for GetObject requests attachRangeFromRequest(httpRequest, executionAttributes); // for delete op, attach the number of files to delete attachDeleteKeySizeAttribute(sdkRequest); + // build the referrer header final String header = referrer.buildHttpReferrer(); // update the outer class's field. @@ -400,11 +425,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, .appendHeader(HEADER_REFERRER, header) .build(); } + if (LOG.isDebugEnabled()) { LOG.debug("[{}] {} Executing {} with {}; {}", currentThreadID(), - getSpanId(), - getOperationName(), + spanId, + operationName, analyzer.analyze(context.request()), header); } @@ -533,10 +559,12 @@ public void beforeExecution(Context.BeforeExecution context, + analyzer.analyze(context.request()); final String unaudited = getSpanId() + " " + UNAUDITED_OPERATION + " " + error; + // If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it + // as an audited request. if (isRequestNotAlwaysInSpan(context.request())) { - // can get by auditing during a copy, so don't overreact + // can get by auditing during a copy, so don't overreact. 
LOG.debug(unaudited); - } else { + } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) { final RuntimeException ex = new AuditFailureException(unaudited); LOG.debug(unaudited, ex); if (isRejectOutOfSpan()) { @@ -547,5 +575,4 @@ public void beforeExecution(Context.BeforeExecution context, super.beforeExecution(context, executionAttributes); } } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 6b910c6538070..551fa08c50ec9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -25,6 +25,7 @@ import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext; import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -205,6 +206,11 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa .etag(parameters.getObjectAttributes().getETag()).build()); } + openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder() + .operationName(parameters.getAuditSpan().getOperationName()) + .spanId(parameters.getAuditSpan().getSpanId()) + .build()); + return openStreamInformationBuilder.build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index 
c67c08be7b986..d8377fed94766 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -96,8 +96,7 @@ public StreamFactoryRequirements factoryRequirements() { vectorContext.setMinSeekForVectoredReads(0); return new StreamFactoryRequirements(0, - 0, vectorContext, - StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests); + 0, vectorContext); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java index e784dadcb651a..da4f517090bb7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpan; import static java.util.Objects.requireNonNull; @@ -69,6 +70,11 @@ public final class ObjectReadParameters { */ private LocalDirAllocator directoryAllocator; + /** + * Span for which this stream is being created. + */ + private AuditSpan auditSpan; + /** * @return Read operation context. */ @@ -172,6 +178,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value return this; } + /** + * Getter. + * @return Audit span. + */ + public AuditSpan getAuditSpan() { + return auditSpan; + } + + /** + * Set audit span. 
+ * @param value new value + * @return the builder + */ + public ObjectReadParameters withAuditSpan(final AuditSpan value) { + auditSpan = value; + return this; + } + /** * Validate that all attributes are as expected. * Mock tests can skip this if required. @@ -185,6 +209,7 @@ public ObjectReadParameters validate() { requireNonNull(directoryAllocator, "directoryAllocator"); requireNonNull(objectAttributes, "objectAttributes"); requireNonNull(streamStatistics, "streamStatistics"); + requireNonNull(auditSpan, "auditSpan"); return this; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index dff171bbdd8eb..6b44c5d5f70c2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable { verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); fs.close(); verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1); + + // Expect 4 audited requests. 
One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, + in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here, + s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETs: + [5-8388612, 8388613-16777220, 16777221-21511173]. + verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); } @Test