Skip to content

Commit 0797163

Browse files
committed
[FLINK-37688] Implement Amazon CloudWatch Metric Sink Connector
1 parent 9d6746b commit 0797163

File tree

55 files changed

+5122
-138
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+5122
-138
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one
4+
~ or more contributor license agreements. See the NOTICE file
5+
~ distributed with this work for additional information
6+
~ regarding copyright ownership. The ASF licenses this file
7+
~ to you under the Apache License, Version 2.0 (the
8+
~ "License"); you may not use this file except in compliance
9+
~ with the License. You may obtain a copy of the License at
10+
~
11+
~ http://www.apache.org/licenses/LICENSE-2.0
12+
~
13+
~ Unless required by applicable law or agreed to in writing, software
14+
~ distributed under the License is distributed on an "AS IS" BASIS,
15+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
~ See the License for the specific language governing permissions and
17+
~ limitations under the License.
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
24+
<parent>
25+
<artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
26+
<groupId>org.apache.flink</groupId>
27+
<version>5.1-SNAPSHOT</version>
28+
</parent>
29+
30+
<modelVersion>4.0.0</modelVersion>
31+
32+
<artifactId>flink-connector-aws-cloudwatch-e2e-tests</artifactId>
33+
<name>Flink : Connectors : AWS : E2E Tests : Amazon CloudWatch</name>
34+
<packaging>jar</packaging>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>flink-streaming-java</artifactId>
40+
<version>${flink.version}</version>
41+
</dependency>
42+
43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-sql-connector-cloudwatch</artifactId>
46+
<version>${project.version}</version>
47+
<scope>test</scope>
48+
</dependency>
49+
50+
<dependency>
51+
<groupId>org.apache.flink</groupId>
52+
<artifactId>flink-connector-cloudwatch</artifactId>
53+
<version>${project.version}</version>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>org.apache.flink</groupId>
58+
<artifactId>flink-connector-aws-base</artifactId>
59+
<version>${project.version}</version>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>org.apache.flink</groupId>
64+
<artifactId>flink-connector-aws-base</artifactId>
65+
<version>${project.version}</version>
66+
<type>test-jar</type>
67+
</dependency>
68+
69+
<dependency>
70+
<groupId>org.apache.flink</groupId>
71+
<artifactId>flink-connector-cloudwatch</artifactId>
72+
<version>${project.version}</version>
73+
<type>test-jar</type>
74+
</dependency>
75+
76+
<!-- Other third-party dependencies -->
77+
<dependency>
78+
<groupId>com.google.guava</groupId>
79+
<artifactId>guava</artifactId>
80+
<scope>test</scope>
81+
</dependency>
82+
83+
<dependency>
84+
<groupId>com.fasterxml.jackson.core</groupId>
85+
<artifactId>jackson-databind</artifactId>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>com.fasterxml.jackson.datatype</groupId>
91+
<artifactId>jackson-datatype-jsr310</artifactId>
92+
<scope>test</scope>
93+
</dependency>
94+
95+
<dependency>
96+
<groupId>software.amazon.awssdk</groupId>
97+
<artifactId>s3</artifactId>
98+
<scope>test</scope>
99+
</dependency>
100+
101+
<dependency>
102+
<groupId>software.amazon.awssdk</groupId>
103+
<artifactId>cloudwatch</artifactId>
104+
<scope>test</scope>
105+
</dependency>
106+
</dependencies>
107+
108+
<build>
109+
<plugins>
110+
<plugin>
111+
<groupId>org.apache.maven.plugins</groupId>
112+
<artifactId>maven-dependency-plugin</artifactId>
113+
<executions>
114+
<execution>
115+
<id>copy</id>
116+
<phase>pre-integration-test</phase>
117+
<goals>
118+
<goal>copy</goal>
119+
</goals>
120+
</execution>
121+
</executions>
122+
<configuration>
123+
<artifactItems>
124+
<artifactItem>
125+
<groupId>org.apache.flink</groupId>
126+
<artifactId>flink-sql-connector-cloudwatch</artifactId>
127+
<version>${project.version}</version>
128+
<destFileName>sql-cloudwatch.jar</destFileName>
129+
<type>jar</type>
130+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
131+
</artifactItem>
132+
</artifactItems>
133+
</configuration>
134+
</plugin>
135+
</plugins>
136+
</build>
137+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.cloudwatch.sink.test;
19+
20+
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
21+
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
22+
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
23+
import org.apache.flink.connector.cloudwatch.sink.CloudWatchSink;
24+
import org.apache.flink.connector.cloudwatch.sink.MetricWriteRequest;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.test.junit5.MiniClusterExtension;
27+
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.extension.ExtendWith;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import org.testcontainers.containers.Network;
35+
import org.testcontainers.junit.jupiter.Container;
36+
import org.testcontainers.junit.jupiter.Testcontainers;
37+
import org.testcontainers.utility.DockerImageName;
38+
import software.amazon.awssdk.core.SdkSystemSetting;
39+
import software.amazon.awssdk.http.SdkHttpClient;
40+
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
41+
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest;
42+
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse;
43+
import software.amazon.awssdk.services.cloudwatch.model.Metric;
44+
import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery;
45+
import software.amazon.awssdk.services.cloudwatch.model.MetricStat;
46+
47+
import java.time.Instant;
48+
import java.util.UUID;
49+
50+
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
53+
/** Integration test for {@link CloudWatchSink}. */
54+
@Testcontainers
55+
@ExtendWith(MiniClusterExtension.class)
56+
public class CloudWatchSinkITCase {
57+
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchSinkITCase.class);
58+
59+
private static String testMetricName;
60+
private static final int NUMBER_OF_ELEMENTS = 50;
61+
62+
private static StreamExecutionEnvironment env;
63+
64+
private CloudWatchClient cloudWatchClient;
65+
private SdkHttpClient httpClient;
66+
private static final Network network = Network.newNetwork();
67+
private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
68+
private static final String TEST_NAMESPACE = "test_namespace";
69+
70+
@Container
71+
private static final LocalstackContainer MOCK_CLOUDWATCH_CONTAINER =
72+
new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
73+
.withNetwork(network)
74+
.withNetworkAliases("localstack");
75+
76+
@BeforeEach
77+
public void setup() {
78+
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
79+
80+
testMetricName = UUID.randomUUID().toString();
81+
82+
env = StreamExecutionEnvironment.getExecutionEnvironment();
83+
env.setParallelism(1);
84+
85+
httpClient = AWSServicesTestUtils.createHttpClient();
86+
87+
cloudWatchClient =
88+
AWSServicesTestUtils.createAwsSyncClient(
89+
MOCK_CLOUDWATCH_CONTAINER.getEndpoint(),
90+
httpClient,
91+
CloudWatchClient.builder());
92+
93+
LOG.info("Done setting up the localstack.");
94+
}
95+
96+
@AfterEach
97+
public void teardown() {
98+
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
99+
AWSGeneralUtil.closeResources(httpClient, cloudWatchClient);
100+
}
101+
102+
@Test
103+
public void testRandomDataSuccessfullyWritten() throws Exception {
104+
CloudWatchSink<MetricWriteRequest> cloudWatchSink =
105+
CloudWatchSink.<MetricWriteRequest>builder()
106+
.setNamespace(TEST_NAMESPACE)
107+
.setCloudWatchClientProperties(
108+
createConfig(MOCK_CLOUDWATCH_CONTAINER.getEndpoint()))
109+
.build();
110+
111+
Instant testTimestamp = Instant.now();
112+
113+
env.fromSequence(1, NUMBER_OF_ELEMENTS)
114+
.map(
115+
data ->
116+
MetricWriteRequest.builder()
117+
.withMetricName(testMetricName)
118+
.addValue(1.0d)
119+
.withTimestamp(testTimestamp)
120+
.build())
121+
.sinkTo(cloudWatchSink);
122+
123+
env.execute("Integration Test");
124+
125+
GetMetricDataResponse response =
126+
cloudWatchClient.getMetricData(
127+
GetMetricDataRequest.builder()
128+
.metricDataQueries(
129+
MetricDataQuery.builder()
130+
.metricStat(getMetricStat("Sum"))
131+
.build(),
132+
MetricDataQuery.builder()
133+
.metricStat(getMetricStat("SampleCount"))
134+
.build())
135+
.startTime(testTimestamp.minusSeconds(300))
136+
.endTime(testTimestamp.plusSeconds(300))
137+
.build());
138+
139+
response.metricDataResults()
140+
.forEach(
141+
result ->
142+
assertThat(result.values())
143+
.containsExactly(Double.valueOf(NUMBER_OF_ELEMENTS)));
144+
}
145+
146+
private static MetricStat getMetricStat(String stat) {
147+
return MetricStat.builder()
148+
.metric(
149+
Metric.builder()
150+
.namespace(TEST_NAMESPACE)
151+
.metricName(testMetricName)
152+
.build())
153+
.stat(stat)
154+
.period(300)
155+
.build();
156+
}
157+
}

0 commit comments

Comments
 (0)