Skip to content

HADOOP-19365. AAL support for auditing. #7723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen(
.withCallbacks(createInputStreamCallbacks(auditSpan))
.withContext(readContext.build())
.withObjectAttributes(createObjectAttributes(path, fileStatus))
.withStreamStatistics(inputStreamStats);
.withStreamStatistics(inputStreamStats)
.withAuditSpan(auditSpan);

return new FSDataInputStream(getStore().readObject(parameters));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
Expand Down Expand Up @@ -50,6 +51,8 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

Check failure on line 55 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java#L55

javadoc: error: cannot find symbol

Check failure on line 55 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java#L55

javadoc: error: static import only from classes and interfaces

/**
* Extract information from a request.
Expand Down Expand Up @@ -193,6 +196,18 @@
|| request instanceof CreateSessionRequest;
}

/**
* If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
* of requests which are made outside S3A's requestFactory.
*
* @param executionAttributes request execution attributes
* @return true if request is audited outside of current span
*/
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
return executionAttributes.getAttribute(SPAN_ID) != null
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
}

/**
* Predicate which returns true if the request is part of the
* multipart upload API -and which therefore must be rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
Expand All @@ -69,6 +70,8 @@
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;

Check failure on line 73 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java#L73

javadoc: error: cannot find symbol

Check failure on line 73 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java#L73

javadoc: error: static import only from classes and interfaces
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

Check failure on line 74 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java#L74

javadoc: error: cannot find symbol

Check failure on line 74 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java#L74

javadoc: error: static import only from classes and interfaces

/**
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
Expand All @@ -85,7 +88,6 @@
private static final Logger LOG =
LoggerFactory.getLogger(LoggingAuditor.class);


/**
* Some basic analysis for the logs.
*/
Expand Down Expand Up @@ -267,8 +269,9 @@
*/
private class LoggingAuditSpan extends AbstractAuditSpanImpl {

private final HttpReferrerAuditHeader referrer;
private HttpReferrerAuditHeader referrer;

private final HttpReferrerAuditHeader.Builder headerBuilder;
/**
* Attach Range of data for GetObject Request.
* @param request the sdk request to be modified
Expand Down Expand Up @@ -300,7 +303,7 @@
final String path2) {
super(spanId, operationName);

this.referrer = HttpReferrerAuditHeader.builder()
this.headerBuilder = HttpReferrerAuditHeader.builder()
.withContextId(getAuditorId())
.withSpanId(spanId)
.withOperationName(operationName)
Expand All @@ -312,8 +315,9 @@
currentThreadID())
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
.withEvaluated(context.getEvaluatedEntries())
.withFilter(filters)
.build();
.withFilter(filters);

this.referrer = this.headerBuilder.build();

this.description = referrer.buildHttpReferrer();
}
Expand Down Expand Up @@ -384,12 +388,33 @@
SdkHttpRequest httpRequest = context.httpRequest();
SdkRequest sdkRequest = context.request();

// If spanId and operationName are set in execution attributes, then use these values,
// instead of the ones in the current span. This is useful when requests are happening in dependencies such as
// the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
// will attach the current spanId and operationName via execution attributes during it's request creation. These
// can then used to update the values in the logger and referrer header. Without this overwriting, the operation
// name and corresponding span will be whichever is active on the thread the request is getting executed on.
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);

String spanId = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(SPAN_ID) : getSpanId();

String operationName = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();

if (isRequestAuditedOutsideCurrentSpan) {
this.headerBuilder.withSpanId(spanId);
this.headerBuilder.withOperationName(operationName);
this.referrer = this.headerBuilder.build();
}

// attach range for GetObject requests
attachRangeFromRequest(httpRequest, executionAttributes);

// for delete op, attach the number of files to delete
attachDeleteKeySizeAttribute(sdkRequest);


// build the referrer header
final String header = referrer.buildHttpReferrer();
// update the outer class's field.
Expand All @@ -400,11 +425,12 @@
.appendHeader(HEADER_REFERRER, header)
.build();
}

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] {} Executing {} with {}; {}",
currentThreadID(),
getSpanId(),
getOperationName(),
spanId,
operationName,
analyzer.analyze(context.request()),
header);
}
Expand Down Expand Up @@ -533,10 +559,12 @@
+ analyzer.analyze(context.request());
final String unaudited = getSpanId() + " "
+ UNAUDITED_OPERATION + " " + error;
// If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it
// as an audited request.
if (isRequestNotAlwaysInSpan(context.request())) {
// can get by auditing during a copy, so don't overreact
// can get by auditing during a copy, so don't overreact.
LOG.debug(unaudited);
} else {
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
final RuntimeException ex = new AuditFailureException(unaudited);
LOG.debug(unaudited, ex);
if (isRejectOutOfSpan()) {
Expand All @@ -547,5 +575,4 @@
super.beforeExecution(context, executionAttributes);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;

Check failure on line 28 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java#L28

javadoc: error: cannot find symbol
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
Expand Down Expand Up @@ -205,6 +206,11 @@
.etag(parameters.getObjectAttributes().getETag()).build());
}

openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
.operationName(parameters.getAuditSpan().getOperationName())
.spanId(parameters.getAuditSpan().getSpanId())
.build());

return openStreamInformationBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public StreamFactoryRequirements factoryRequirements() {
vectorContext.setMinSeekForVectoredReads(0);

return new StreamFactoryRequirements(0,
0, vectorContext,
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
0, vectorContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.store.audit.AuditSpan;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -69,6 +70,11 @@ public final class ObjectReadParameters {
*/
private LocalDirAllocator directoryAllocator;

/**
* Span for which this stream is being created.
*/
private AuditSpan auditSpan;

/**
* @return Read operation context.
*/
Expand Down Expand Up @@ -172,6 +178,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
return this;
}

/**
* Getter.
* @return Audit span.
*/
public AuditSpan getAuditSpan() {
return auditSpan;
}

/**
* Set audit span.
* @param value new value
* @return the builder
*/
public ObjectReadParameters withAuditSpan(final AuditSpan value) {
auditSpan = value;
return this;
}

/**
* Validate that all attributes are as expected.
* Mock tests can skip this if required.
Expand All @@ -185,6 +209,7 @@ public ObjectReadParameters validate() {
requireNonNull(directoryAllocator, "directoryAllocator");
requireNonNull(objectAttributes, "objectAttributes");
requireNonNull(streamStatistics, "streamStatistics");
requireNonNull(auditSpan, "auditSpan");
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
Expand Down Expand Up @@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
fs.close();
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);

// Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
// [5-8388612, 8388613-16777220, 16777221-21511173].
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
}

@Test
Expand Down