Add x-amz-server-side-encryption header for AES256 SSE without customer keys (non SSE-C) #771

Closed · wants to merge 84 commits

Commits (84)
9bddff4
[maven-release-plugin] prepare for next development iteration
pbadani May 16, 2023
a184ac7
Merge branch '10.5.x' into master (using strategy ours)
pbadani May 16, 2023
19a81f5
Merge branch '10.5.x' into master by pbadani
ConfluentJenkins May 31, 2023
31f4dd9
Merge branch '10.5.x' into master (using strategy ours)
pbadani Jun 1, 2023
fc964ac
Merge branch '10.5.x' into master (using strategy ours)
pbadani Jun 1, 2023
a4f13f0
Merge branch '10.5.x' into master (using strategy ours)
pbadani Jun 2, 2023
b1f1774
[CC-20979] [CC-20329] Updated CVEs (jackson and snappy package)
sp-gupta Aug 10, 2023
5ea1b6f
Merge pull request #668 from confluentinc/CC-21844
sp-gupta Aug 10, 2023
7d0a66d
Merge branch '10.5.x' into master by sp-gupta
ConfluentJenkins Aug 10, 2023
61f85d6
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 11, 2023
41c16b7
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 11, 2023
343c47e
Merge branch 'master' into pr_merge_from_10_5_x_to_master
aniketshrimal Aug 15, 2023
4440802
Merge pull request #672 from confluentinc/pr_merge_from_10_5_x_to_master
aniketshrimal Aug 16, 2023
1ac12c4
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 16, 2023
bf1ddd6
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 16, 2023
3e5a8c9
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 16, 2023
018b348
Merge branch '10.5.x' into master by pbadani
ConfluentJenkins Aug 18, 2023
4c7852a
Merge branch '10.5.x' into master (using strategy ours)
pbadani Aug 18, 2023
466b471
Merge branch '10.5.x' into master (using strategy ours)
pbadani Aug 18, 2023
82fda82
Merge branch '10.5.x' into master (using strategy ours)
pbadani Aug 18, 2023
7b1277a
Merge branch '10.5.x' into master
ConfluentJenkins Sep 7, 2023
a71e6e3
Merge branch '10.5.x' into master by ashoke-cube
ConfluentJenkins Sep 12, 2023
95a6c05
Merge branch '10.5.x' into master (using strategy ours)
ashoke-cube Sep 12, 2023
a4abd18
Merge branch '10.5.x' into master (using strategy ours)
ashoke-cube Sep 12, 2023
f811c2e
Merge branch '10.5.x' into master (using strategy ours)
ashoke-cube Sep 12, 2023
91eab50
Merge branch '10.5.x' into master
ConfluentJenkins Sep 12, 2023
7bbf581
Merge branch '10.5.x' into master (using strategy ours)
ashoke-cube Sep 12, 2023
cc8f85c
Merge branch '10.5.x' into master (using strategy ours)
ashoke-cube Sep 12, 2023
7c20ca3
Merge branch '10.5.x' into master
ConfluentJenkins Sep 27, 2023
f4e3d61
Merge branch '10.5.x' into master (using strategy ours)
Enigma25 Sep 27, 2023
8483821
Merge branch '10.5.x' into master (using strategy ours)
Enigma25 Sep 27, 2023
daf614b
Merge branch '10.5.x' into master (using strategy ours)
Enigma25 Sep 28, 2023
a789d12
Add support for custom S3 object tagging
dttung2905 Sep 30, 2023
f6efde4
Fix Failed CI
dttung2905 Sep 30, 2023
03e010e
Fix import format structure
dttung2905 Oct 6, 2023
a0b3d39
Remove extra space
dttung2905 Oct 6, 2023
79f575d
Refactor the code to use a single config field instead
dttung2905 Oct 16, 2023
e6c7ef6
Merge branch '10.5.x'
sp-gupta Oct 27, 2023
9713035
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Oct 29, 2023
212954d
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Oct 29, 2023
0ace4a0
Merge branch '10.5.x' into master
ConfluentJenkins Oct 30, 2023
4484e2c
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Oct 30, 2023
d2464ad
Merge pull request #690 from dttung2905/feat-s3-object-tagging
pbadani Oct 30, 2023
497c8c8
MINOR: Fix checkstyle error.
pbadani Oct 30, 2023
92d2865
Merge pull request #698 from confluentinc/pbadani/fix-checkstyle
pbadani Oct 30, 2023
dbde43e
Merge branch '10.5.x' into master
ConfluentJenkins Nov 8, 2023
ab84ec7
Merge branch '10.5.x' into master
ConfluentSemaphore Feb 9, 2024
b642f60
Merge branch '10.5.x' into master
ConfluentSemaphore Feb 16, 2024
80dcb20
Merge branch '10.5.x' into master
ConfluentSemaphore Feb 22, 2024
f924562
Merge branch '10.5.x' into master
ConfluentSemaphore Feb 22, 2024
281d2ff
Merge branch '10.5.x' into master (using strategy ours)
b-goyal Feb 23, 2024
3212bc6
Merge branch '10.5.x' into master
ConfluentSemaphore Feb 23, 2024
19b7565
Merge branch '10.5.x' into master
ConfluentSemaphore Mar 13, 2024
2415862
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 13, 2024
c434167
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 13, 2024
0fe797c
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 13, 2024
020a389
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 14, 2024
edb3c2d
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 14, 2024
034345f
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Mar 14, 2024
7041fce
DP-13727 Add CodeArtifact package paths to service.yml (#724)
huizheng278 Mar 14, 2024
e5b3adc
Update CODEOWNERS to connect-cloud (#736)
rohits64 Apr 3, 2024
d2ec4e8
Merge branch '10.5.x' into master
ConfluentSemaphore Apr 3, 2024
9570925
Merge branch '10.5.x' into master
ConfluentSemaphore Apr 3, 2024
825f298
Merge branch '10.5.x' into master (using strategy ours)
rohits64 Apr 3, 2024
c7d6177
Merge branch '10.5.x' into master (using strategy ours)
rohits64 Apr 4, 2024
1d267f0
Merge branch '10.5.x' into master
ConfluentSemaphore Apr 18, 2024
5048f1b
Merge branch '10.5.x' into master
ConfluentSemaphore Apr 19, 2024
411e7c4
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Apr 19, 2024
efce72a
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Apr 22, 2024
8651d10
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Apr 23, 2024
fd9c87f
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Apr 24, 2024
0932966
Merge branch '10.5.x' into master
ConfluentSemaphore Apr 30, 2024
3ad4679
Merge branch '10.5.x' into master
ConfluentSemaphore May 6, 2024
a7820be
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 May 9, 2024
b1efca8
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 May 9, 2024
0c2fb36
Merge branch '10.5.x' into master
ConfluentSemaphore May 30, 2024
49d9224
Merge branch '10.5.x' into master by patrick-premont (using strategy…
ConfluentSemaphore Jun 1, 2024
20077cc
Merge branch '10.5.x' into master
ConfluentSemaphore Jun 12, 2024
bc7098d
Merge branch '10.5.x' into master
ConfluentSemaphore Jun 18, 2024
c90780d
Merge branch '10.5.x' into master (using strategy ours)
subhashiyer9 Jun 20, 2024
8e490c8
Merge branch '10.5.x' into master
ConfluentSemaphore Aug 5, 2024
a40e3a4
Merge branch '10.5.x' into master (using strategy ours)
sp-gupta Aug 6, 2024
eaa37cc
Merge branch '10.5.x' into master
ConfluentSemaphore Aug 6, 2024
43e7c7b
Add x-amz-server-side-encryption header for AES256 SSE without custom…
85danf Aug 7, 2024
2 changes: 1 addition & 1 deletion kafka-connect-s3/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-storage-cloud</artifactId>
<version>10.5.15-SNAPSHOT</version>
<version>10.6.0-SNAPSHOT</version>
</parent>

<artifactId>kafka-connect-s3</artifactId>

@@ -84,6 +84,8 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {

public static final String S3_OBJECT_TAGGING_CONFIG = "s3.object.tagging";
public static final boolean S3_OBJECT_TAGGING_DEFAULT = false;
public static final String S3_OBJECT_TAGGING_EXTRA_KV = "s3.object.tagging.key.value.pairs";
public static final String S3_OBJECT_TAGGING_EXTRA_KV_DEFAULT = "";

public static final String S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG =
"s3.object.behavior.on.tagging.error";
@@ -321,6 +323,18 @@ public static ConfigDef newConfigDef() {
"S3 Object Tagging"
);

configDef.define(
S3_OBJECT_TAGGING_EXTRA_KV,
Type.LIST,
S3_OBJECT_TAGGING_EXTRA_KV_DEFAULT,
Importance.LOW,
"Additional S3 tag key value pairs",
group,
++orderInGroup,
Width.LONG,
"S3 Object Tagging Extra Key Value pairs"
);

configDef.define(
S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG,
Type.STRING,
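
For context, a minimal sketch of how the two new tagging properties defined above could be supplied to the connector. The property names come from the constants in this diff; everything else here is illustrative only, and a real sink config would also need bucket, format, and the other required settings.

```java
import java.util.HashMap;
import java.util.Map;

public class TaggingConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Enable per-object tagging (existing option).
    props.put("s3.object.tagging", "true");
    // New option from this PR: comma-separated key:value pairs,
    // parsed by the framework as a ConfigDef.Type.LIST.
    props.put("s3.object.tagging.key.value.pairs", "key1:value1,key2:value2");

    // With the rest of the required S3 sink settings present, the connector
    // would expose the pairs via getList(S3_OBJECT_TAGGING_EXTRA_KV),
    // yielding ["key1:value1", "key2:value2"].
    System.out.println(props);
  }
}
```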

@@ -41,6 +41,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
@@ -78,6 +79,8 @@ public class TopicPartitionWriter {
private final Queue<SinkRecord> buffer;
private final SinkTaskContext context;
private final boolean isTaggingEnabled;
private final List<String> extraTagKeyValuePair;
private HashMap<String, String> hashMapTag;
private final boolean ignoreTaggingErrors;
private int recordCount;
private final int flushSize;
@@ -146,6 +149,9 @@ public TopicPartitionWriter(TopicPartition tp,
}

isTaggingEnabled = connectorConfig.getBoolean(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG);
extraTagKeyValuePair =
connectorConfig.getList(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV);
getS3Tag();
ignoreTaggingErrors = connectorConfig.getString(
S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG)
.equalsIgnoreCase(S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString());
@@ -192,6 +198,16 @@ public TopicPartitionWriter(TopicPartition tp,
setNextScheduledRotation();
}

private void getS3Tag() {
hashMapTag = new HashMap<>();
if (extraTagKeyValuePair.size() != 0) {
for (int i = 0; i < extraTagKeyValuePair.size(); i++) {
String[] singleKv = extraTagKeyValuePair.get(i).split(":");
hashMapTag.put(singleKv[0], singleKv[1]);
}
}
}

private enum State {
WRITE_STARTED,
WRITE_PARTITION_PAUSED,
@@ -639,7 +655,11 @@ private void commitFiles() {
String encodedPartition = entry.getKey();
commitFile(encodedPartition);
if (isTaggingEnabled) {
RetryUtil.exponentialBackoffRetry(() -> tagFile(encodedPartition, entry.getValue()),
RetryUtil.exponentialBackoffRetry(() -> tagFile(
encodedPartition,
entry.getValue(),
hashMapTag
),
ConnectException.class,
connectorConfig.getInt(S3_PART_RETRIES_CONFIG),
connectorConfig.getLong(S3_RETRY_BACKOFF_CONFIG)
@@ -674,7 +694,10 @@ private void commitFile(String encodedPartition) {
}
}

private void tagFile(String encodedPartition, String s3ObjectPath) {
private void tagFile(
String encodedPartition,
String s3ObjectPath,
Map<String,String> extraHashMapTag) {
Long startOffset = startOffsets.get(encodedPartition);
Long endOffset = endOffsets.get(encodedPartition);
Long recordCount = recordCounts.get(encodedPartition);
@@ -697,7 +720,9 @@ private void tagFile(String encodedPartition, String s3ObjectPath) {
tags.put("startOffset", Long.toString(startOffset));
tags.put("endOffset", Long.toString(endOffset));
tags.put("recordCount", Long.toString(recordCount));

if (extraHashMapTag != null) {
tags.putAll(extraHashMapTag);
}
try {
storage.addTags(s3ObjectPath, tags);
log.info("Tagged S3 object {} with starting offset {}, ending offset {}, record count {}",
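
To make the tagging path above concrete, here is a self-contained sketch of what getS3Tag() and the tags.putAll(...) call in tagFile() amount to. The helper name and sample input are illustrative, and, as in the PR code, each entry is assumed to be a well-formed "key:value" string; an entry without a colon would fail on the second array index.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExtraTagParsingSketch {
  // Mirrors getS3Tag() above: each list entry is split on ':' into one tag.
  static Map<String, String> parseExtraTags(List<String> pairs) {
    Map<String, String> tags = new HashMap<>();
    for (String pair : pairs) {
      String[] kv = pair.split(":");
      tags.put(kv[0], kv[1]);
    }
    return tags;
  }

  public static void main(String[] args) {
    Map<String, String> tags =
        parseExtraTags(Arrays.asList("key1:value1", "key2:value2"));
    // In tagFile(), these extra tags are merged (tags.putAll) with the
    // startOffset/endOffset/recordCount tags before storage.addTags(...).
    System.out.println(tags); // e.g. {key1=value1, key2=value2}
  }
}
```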

@@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
@@ -205,12 +206,24 @@ private void internalClose() throws IOException {
super.close();
}

private ObjectMetadata newObjectMetadata() {
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotBlank(ssea)) {
meta.setSSEAlgorithm(ssea);
}
return meta;
}

private MultipartUpload newMultipartUpload() throws IOException {
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(
bucket,
key
).withCannedACL(cannedAcl);

if (SSEAlgorithm.AES256.toString().equalsIgnoreCase(ssea) && sseCustomerKey == null) {
initRequest.setObjectMetadata(newObjectMetadata());
}

if (SSEAlgorithm.KMS.toString().equalsIgnoreCase(ssea)
&& StringUtils.isNotBlank(sseKmsKeyId)) {
log.debug("Using KMS Key ID: {}", sseKmsKeyId);
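
For reference, a standalone sketch (AWS SDK for Java v1, the same classes imported above) of what the new AES256 branch does: when the configured SSE algorithm is AES256 and no SSE-C customer key is supplied, the initiate-multipart-upload request carries ObjectMetadata with the algorithm set, which the SDK sends as the x-amz-server-side-encryption: AES256 header. The bucket, key, and ssea/sseCustomerKey parameters here stand in for values the connector reads from its configuration.

```java
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.SSEAlgorithm;

public class Aes256MultipartSketch {
  // Illustrative only: parameters mirror the fields used in newMultipartUpload().
  static InitiateMultipartUploadRequest newRequest(String bucket, String key,
                                                   String ssea, Object sseCustomerKey) {
    InitiateMultipartUploadRequest initRequest =
        new InitiateMultipartUploadRequest(bucket, key);

    // Same condition as the PR: AES256 requested and no SSE-C customer key supplied.
    if (SSEAlgorithm.AES256.toString().equalsIgnoreCase(ssea) && sseCustomerKey == null) {
      ObjectMetadata meta = new ObjectMetadata();
      meta.setSSEAlgorithm(SSEAlgorithm.AES256.toString());
      // The SDK emits this as the x-amz-server-side-encryption: AES256 header.
      initRequest.setObjectMetadata(meta);
    }
    return initRequest;
  }
}
```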

@@ -30,6 +30,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -338,6 +339,15 @@ public void testConfigurableS3ObjectTaggingConfigs() {
properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true");
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(true, connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG));
assertEquals(new ArrayList<String>(), connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV));

properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV, "key1:value1,key2:value2");
List<String> expectedConfigKeyValuePair = new ArrayList<String>() {{
add("key1:value1");
add("key2:value2");
}};
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(expectedConfigKeyValuePair, connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV));

properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "false");
connectorConfig = new S3SinkConnectorConfig(properties);

@@ -1153,6 +1153,44 @@ public void testAddingS3ObjectTags() throws Exception{
verifyTags(expectedTaggedFiles);
}

@Test
public void testAddingAdditionalS3ObjectTags() throws Exception{
// Setting size-based rollup to 10 but will produce fewer records. Commit should not happen.
localProps.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true");
localProps.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV, "key1:value1,key2:value2");
setUp();

// Define the partitioner
Partitioner<?> partitioner = new DefaultPartitioner<>();
partitioner.configure(parsedConfig);
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);

String key = "key";
Schema schema = createSchema();
List<Struct> records = createRecordBatches(schema, 3, 3);

Collection<SinkRecord> sinkRecords = createSinkRecords(records, key, schema);

for (SinkRecord record : sinkRecords) {
topicPartitionWriter.buffer(record);
}

// Test actual write
topicPartitionWriter.write();
topicPartitionWriter.close();

// Check expected s3 object tags
String dirPrefix = partitioner.generatePartitionedPath(TOPIC, "partition=" + PARTITION);
Map<String, List<Tag>> expectedTaggedFiles = new HashMap<>();
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 0, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "0"), new Tag("endOffset", "2"), new Tag("recordCount", "3"), new Tag("key1", "value1"), new Tag("key2", "value2")));
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 3, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "3"), new Tag("endOffset", "5"), new Tag("recordCount", "3"), new Tag("key1", "value1"), new Tag("key2", "value2")));
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 6, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "6"), new Tag("endOffset", "8"), new Tag("recordCount", "3")));
verifyTags(expectedTaggedFiles);
}
@Test
public void testIgnoreS3ObjectTaggingSdkClientException() throws Exception {
// Tagging error occurred (SdkClientException) but getting ignored.

@@ -146,7 +146,7 @@ public void testBasicRecordsWrittenAvro() throws Throwable {
props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
final String topicNameWithExt = "other." + AVRO_EXTENSION + ".topic." + AVRO_EXTENSION;

// Add an extra topic with this extension inside of the name
// Add an extra topic with this extension inside the name
// Use a TreeSet for test determinism
Set<String> topicNames = new TreeSet<>(Collections.singletonList(topicName));


4 changes: 2 additions & 2 deletions pom.xml
@@ -27,7 +27,7 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-storage-cloud</artifactId>
<packaging>pom</packaging>
<version>10.5.15-SNAPSHOT</version>
<version>10.6.0-SNAPSHOT</version>
<name>kafka-connect-storage-cloud</name>
<organization>
<name>Confluent, Inc.</name>
@@ -50,7 +50,7 @@
<connection>scm:git:git://github.com/confluentinc/kafka-connect-storage-cloud.git</connection>
<developerConnection>scm:git:[email protected]:confluentinc/kafka-connect-storage-cloud.git</developerConnection>
<url>https://github.com/confluentinc/kafka-connect-storage-cloud</url>
<tag>10.5.x</tag>
<tag>HEAD</tag>
</scm>

<modules>

5 changes: 5 additions & 0 deletions service.yml
@@ -14,3 +14,8 @@ semaphore:
extra_build_args: "-Dcloud -Pjenkins"
run_pint_merge: true
generate_connect_changelogs: true
code_artifact:
enable: true
package_paths:
- maven-snapshots/maven/io.confluent/kafka-connect-s3
- maven-snapshots/maven/io.confluent/kafka-connect-storage-cloud