S3 Table SQL Sink #112

Open

wants to merge 1 commit into base: main
85 changes: 85 additions & 0 deletions java/Iceberg/S3TableSQLJSON/README.md
@@ -0,0 +1,85 @@
# Flink Iceberg Sink using SQL API with S3 Tables

* Flink version: 1.19.0
  > **Reviewer comment (Contributor):** We need to explain why 1.19
* Flink API: SQL API
* Iceberg 1.8.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [S3 Tables](https://docs.aws.amazon.com/s3/latest/userguide/s3-tables.html)

This example demonstrates how to use
[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Amazon S3 Tables Catalog.

For simplicity, the application internally generates synthetic data (random stock prices).
Data is generated as POJO objects, simulating a real source, for example a Kafka source, that receives records
which can be converted to table format for SQL operations. A minimal sketch of this pattern is shown below.
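
The sketch below illustrates the overall shape under stated assumptions: the `StockPrice` POJO, the catalog name `s3_tables`, and the literal ARN are illustrative, and the `S3TablesCatalog` class name is the one provided by the `s3-tables-catalog-for-iceberg` dependency declared in the pom. It is a sketch, not the exact code of this application.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class S3TablesSqlSketch {

    /** Illustrative POJO standing in for the generated stock price records. */
    public static class StockPrice {
        public String ticker;
        public double price;

        public StockPrice() {}

        public StockPrice(String ticker, double price) {
            this.ticker = ticker;
            this.price = price;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000); // Iceberg commits writes only on checkpoint
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stand-in for the DataGen source used by the real application
        DataStream<StockPrice> prices =
                env.fromElements(new StockPrice("AMZN", 100.0), new StockPrice("MSFT", 200.0));
        tableEnv.createTemporaryView("prices", prices);

        // Register a Flink catalog backed by the S3 Tables Iceberg catalog implementation.
        // The warehouse is the ARN of the table bucket created in the prerequisites.
        tableEnv.executeSql(
                "CREATE CATALOG s3_tables WITH ("
                        + "'type'='iceberg',"
                        + "'catalog-impl'='software.amazon.s3tables.iceberg.S3TablesCatalog',"
                        + "'warehouse'='arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example')");

        // Write the stream into the Iceberg table with plain SQL
        // (assumes the table already exists in the 'default' namespace)
        tableEnv.executeSql(
                "INSERT INTO s3_tables.`default`.prices_iceberg SELECT * FROM prices");
    }
}
```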

### Prerequisites

#### Create a Table Bucket
The sample application expects the S3 Table Bucket to exist and needs its ARN in the local configuration:
```bash
aws s3tables create-table-bucket --name flink-example
{
    "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example"
}
```

If you already did this, you can query to get the ARN like this:

```bash
aws s3tables list-table-buckets
```

This lists your table buckets. Select the one you wish to write to and paste its ARN into the configuration file of this project.

#### Create a Namespace in the Table Bucket (Database)
The sample application expects the Namespace in the Table Bucket to exist:
```bash
aws s3tables create-namespace \
--table-bucket-arn arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example \
--namespace default
```

#### IAM Permissions

The application must have IAM permissions to:
* Read from and write to the S3 Table

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID | Key | Default | Description |
|-----------|--------------------------|------------------|---------------------------------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. |
| `Iceberg` | `table.bucket.arn` | (mandatory) | ARN of the S3 Tables bucket, for example `arn:aws:s3tables:region:account:bucket/my-bucket`. |
| `Iceberg` | `catalog.db` | `iceberg` | Name of the S3 Tables Catalog database. |
| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the S3 Tables Catalog table. |
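
As a hedged illustration of how these grouped properties can be read with the `aws-kinesisanalytics-runtime` library declared in the pom (the class name and error handling here are assumptions, not this project's exact code):

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class RuntimeConfigSketch {
    public static void main(String[] args) throws IOException {
        // On Managed Service for Apache Flink this returns the Runtime Properties;
        // when running locally, the application reads the JSON file mentioned above instead.
        Map<String, Properties> groups = KinesisAnalyticsRuntime.getApplicationProperties();

        Properties dataGen = groups.getOrDefault("DataGen", new Properties());
        double recordsPerSec = Double.parseDouble(dataGen.getProperty("records.per.sec", "10.0"));

        Properties iceberg = groups.getOrDefault("Iceberg", new Properties());
        String tableBucketArn = iceberg.getProperty("table.bucket.arn"); // mandatory, no default
        String catalogDb = iceberg.getProperty("catalog.db", "iceberg");
        String catalogTable = iceberg.getProperty("catalog.table", "prices_iceberg");

        if (tableBucketArn == null) {
            throw new IllegalArgumentException("Iceberg table.bucket.arn is mandatory");
        }
        System.out.printf("Writing %s.%s at %.1f rec/s%n", catalogDb, catalogTable, recordsPerSec);
    }
}
```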

### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 30 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
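
A minimal sketch of this conditional setup, assuming the local run is detected via the environment type (an assumption, not necessarily how this application does it):

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpoints programmatically only for local runs (e.g. in the IDE).
        // On Managed Service for Apache Flink, checkpointing is controlled by the
        // application configuration, so nothing is overridden here.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(30_000); // every 30 seconds
        }
    }
}
```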

### Known limitations

There are currently some limitations in the Flink Iceberg integration:
* It doesn't support Iceberg tables with hidden partitioning.
* It doesn't support adding, removing, renaming, or changing columns.

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
238 changes: 238 additions & 0 deletions java/Iceberg/S3TableSQLJSON/pom.xml
@@ -0,0 +1,238 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>s3-table-sql-flink</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>

<flink.version>1.19.0</flink.version>
<!-- Reviewer comment (Contributor): If it only works with Flink 1.19 and not 1.20 we must say it, here as a comment and in the README as a big warning -->
<avro.version>1.11.3</avro.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.6.1</iceberg.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
</properties>

<dependencies>
<!-- Flink Core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be provided

</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!-- Reviewer comment (Contributor): Must be provided -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<!-- Reviewer comment (Contributor): Is flink-metrics-dropwizard actually used? -->
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Flink Table Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- MSF Dependencies -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.runtime.version}</version>
<scope>provided</scope>
</dependency>

<!-- S3 Tables Dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>2.31.50</version>
<!-- Reviewer comment (Contributor): Better to make this version a property -->
</dependency>
<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>0.1.6</version>
<!-- Reviewer comment (Contributor): Better to make this version a property -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>

<!-- Iceberg Dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Remove duplicate iceberg-flink dependency -->
<!-- Reviewer comment (Contributor): What does this comment mean? -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- Testing Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Logging Dependencies - These should be the ONLY SLF4J binding -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>

<!-- Shade plugin to build the fat-jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>S3TableSQLJSONExample</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>