S3 Table SQL Sink #112
base: main
README.md
@@ -0,0 +1,85 @@
# Flink Iceberg Sink using SQL API with S3 Tables

* Flink version: 1.19.0
* Flink API: SQL API
* Iceberg 1.8.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
  and [S3 Tables](https://docs.aws.amazon.com/s3/latest/userguide/s3-tables.html)

This example demonstrates how to use the
[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Amazon S3 Tables Catalog.

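As a rough sketch of what registering the S3 Tables catalog through the SQL API can look like (the catalog name, bucket ARN, and options are illustrative and assume the `S3TablesCatalog` implementation provided by the `s3-tables-catalog-for-iceberg` dependency):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class S3TablesCatalogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register an Iceberg catalog backed by S3 Tables (catalog name and bucket ARN are placeholders)
        tableEnv.executeSql(
                "CREATE CATALOG s3_tables_catalog WITH ("
                        + " 'type' = 'iceberg',"
                        + " 'catalog-impl' = 'software.amazon.s3tables.iceberg.S3TablesCatalog',"
                        + " 'warehouse' = 'arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example')");
        tableEnv.useCatalog("s3_tables_catalog");
    }
}
```
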
For simplicity, the application internally generates synthetic data (random stock prices).
Data is generated as POJO objects, simulating a real source, for example a Kafka source, that receives records
which can be converted to table format for SQL operations.

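As a rough sketch of this pattern (not the actual application code: the `StockPrice` POJO and its fields are hypothetical, and `env`/`tableEnv` are the environments set up in the previous sketch), a DataStream DataGen source producing POJOs can be registered as a table for SQL:

```java
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;

// ...

// Generate random StockPrice POJOs at a fixed rate
// (StockPrice is a hypothetical POJO with symbol and price fields;
//  the actual rate is configurable via the records.per.sec parameter described below)
DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
        index -> new StockPrice("AMZN", ThreadLocalRandom.current().nextDouble(1, 100)),
        Long.MAX_VALUE,
        RateLimiterStrategy.perSecond(10.0),
        TypeInformation.of(StockPrice.class));

DataStream<StockPrice> prices = env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen");

// Register the stream as a table so it can be referenced in SQL statements
Table pricesTable = tableEnv.fromDataStream(prices);
tableEnv.createTemporaryView("prices", pricesTable);
```
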
### Prerequisites

#### Create a Table Bucket
The sample application expects the S3 Table Bucket to exist, and its ARN must be available in the local environment:
```bash
aws s3tables create-table-bucket --name flink-example
{
    "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example"
}
```

If you have already created the table bucket, you can query for its ARN:

```bash
aws s3tables list-table-buckets
```

This lists your table buckets. Select the ARN of the bucket you want to write to and paste it into the configuration file in this project.

#### Create a Namespace in the Table Bucket (Database)
The sample application expects the Namespace in the Table Bucket to exist:
```bash
aws s3tables create-namespace \
    --table-bucket-arn arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example \
    --namespace default
```

#### IAM Permissions

The application must have IAM permissions to:
* Read from and write to the S3 Table

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.

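A common pattern for supporting both modes, shown here only as a sketch (the class and constant names are illustrative, not necessarily identical to the application code), is to read Runtime Properties when deployed and fall back to the local JSON file otherwise:

```java
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PropertiesLoaderSketch {
    private static final String LOCAL_PROPERTIES_FILE = "flink-application-properties-dev.json";

    static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (env instanceof LocalStreamEnvironment) {
            // Running locally (e.g. in the IDE): read the property groups from the resource file
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    Objects.requireNonNull(PropertiesLoaderSketch.class.getClassLoader()
                            .getResource(LOCAL_PROPERTIES_FILE)).getPath());
        } else {
            // Running on Amazon Managed Service for Apache Flink: read from Runtime Properties
            return KinesisAnalyticsRuntime.getApplicationProperties();
        }
    }
}
```
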
Runtime parameters:

| Group ID  | Key                | Default          | Description                                                                                  |
|-----------|--------------------|------------------|----------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec`  | `10.0`           | Records per second generated.                                                                |
| `Iceberg` | `table.bucket.arn` | (mandatory)      | ARN of the S3 Tables bucket, for example `arn:aws:s3tables:region:account:bucket/my-bucket`. |
| `Iceberg` | `catalog.db`       | `iceberg`        | Name of the S3 Tables Catalog database.                                                      |
| `Iceberg` | `catalog.table`    | `prices_iceberg` | Name of the S3 Tables Catalog table.                                                         |

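For local runs, a `flink-application-properties-dev.json` matching these parameters might look like the following (the bucket ARN is illustrative):

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  },
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "table.bucket.arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example",
      "catalog.db": "iceberg",
      "catalog.table": "prices_iceberg"
    }
  }
]
```
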
### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 30 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.

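A minimal sketch of this behavior (the local-environment check and the interval shown here are assumptions based on the description above, not necessarily the exact application code):

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// ...

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (env instanceof LocalStreamEnvironment) {
    // Iceberg only commits data on checkpoint, so checkpoints must run frequently enough locally
    env.enableCheckpointing(30_000L); // every 30 seconds
}
// When deployed to Managed Service for Apache Flink, checkpointing is not enabled in code:
// it is controlled by the application configuration.
```
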
### Known limitations

The Flink Iceberg integration currently has the following limitations:
* It does not support Iceberg tables with hidden partitioning.
* It does not support adding, removing, renaming, or changing columns.

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
pom.xml
@@ -0,0 +1,238 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>s3-table-sql-flink</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <flink.version>1.19.0</flink.version>
Review comment: If it only works with Flink 1.19 and not 1.20 we must say it, here as a comment and in the README as a big warning.
        <avro.version>1.11.3</avro.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.4.0</hadoop.version>
        <iceberg.version>1.6.1</iceberg.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencies>
        <!-- Flink Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
Review comment: Should be
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
Review comment: Must be
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
Review comment: Is flink-metrics-dropwizard actually used?
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table Dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- MSF Dependencies -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- S3 Tables Dependencies -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3tables</artifactId>
            <version>2.31.50</version>
Review comment: Better to make this version a property.
        </dependency>
        <dependency>
            <groupId>software.amazon.s3tables</groupId>
            <artifactId>s3-tables-catalog-for-iceberg</artifactId>
            <version>0.1.6</version>
Review comment: Better to make this version a property.
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Iceberg Dependencies -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <!-- Remove duplicate iceberg-flink dependency -->
Review comment: What does this comment mean?
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws-bundle</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.19</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Testing Dependencies -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging Dependencies - These should be the ONLY SLF4J binding -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>S3TableSQLJSONExample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Review comment: We need to explain why 1.19.