Add example of data streaming with apache kafka + apache flink
Add missing requirements.txt + fix .env

Add example.json and fill in readme.

Add proper email.

Specify CrateDB version

Add missing .env

Rename directory + improve kafka environment

Add missing weather_producer

Add example of data streaming with apache kafka + apache flink
surister committed Mar 12, 2024
1 parent 4c9056e commit df13db1
Showing 10 changed files with 480 additions and 0 deletions.
20 changes: 20 additions & 0 deletions application/apache-kafka-flink-streaming/.env
@@ -0,0 +1,20 @@
CRATEDB_HOST=crate
CRATEDB_PORT=4200
CRATEDB_PG_PORT=5432

WEATHER_PRODUCER_CITY=Vienna
WEATHER_PRODUCER_API_KEY=#GET THE API KEY - https://www.weatherapi.com/
WEATHER_PRODUCER_FETCH_EVERY_SECONDS=30
WEATHER_PRODUCER_KAFKA_TOPIC=weather_topic
WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER=kafka

FLINK_CONSUMER_KAFKA_TOPIC=weather_topic
FLINK_CONSUMER_BOOTSTRAP_SERVER=kafka
FLINK_CONSUMER_CRATEDB_PG_URI=jdbc:postgresql://crate:5432/crate
FLINK_CONSUMER_CRATE_USER=crate
FLINK_CONSUMER_CRATE_PASSWORD=empty

# Jar versions.
POSTGRESQL_JAR_VERSION=42.7.2
FLINK_CONNECTOR_JDBC_VERSION=3.1.2-1.18
FLINK_KAFKA_JAR_URL_VERSION=3.1.0-1.18
7 changes: 7 additions & 0 deletions application/apache-kafka-flink-streaming/Dockerfile
@@ -0,0 +1,7 @@
FROM python:3.10

WORKDIR /app
COPY * /app

RUN pip install poetry
RUN poetry config virtualenvs.create false && poetry install
135 changes: 135 additions & 0 deletions application/apache-kafka-flink-streaming/README.md
@@ -0,0 +1,135 @@
# Streaming data with Apache Kafka, Apache Flink and CrateDB.

## About

This example showcases what a data-streaming architecture leveraging Kafka and Flink could look
like.

We use:

- Kafka (Confluent)
- Apache Flink
- CrateDB
- Python >=3.7, <=3.11

[img of the thing]

## Overview

`weather_producer` schedules an HTTP call at a fixed interval, set by
`WEATHER_PRODUCER_FETCH_EVERY_SECONDS` (30 seconds in the provided `.env`). The API returns a JSON
document with the specified city's weather, which is then sent to a `Kafka` topic.

`flink_consumer` is a Flink application consuming the same Kafka topic;
upon receiving data, it sends the resulting datastream to the sink, which is `CrateDB`.

Both `flink_consumer` and `weather_producer` are written using their respective Python wrappers:

[kafka-python](https://kafka-python.readthedocs.io/en/master/)

[apache-flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/overview/)

Everything is customizable via environment variables: the API schedule, the topic, the credentials,
etc.

See `.env` for more details.
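
As a rough illustration of this configuration style, the sketch below reads the `WEATHER_PRODUCER_*` variables listed in `.env`. The `ProducerSettings` class and `load_producer_settings` function are hypothetical names used only for this example, and the fallback values simply mirror the ones shipped in `.env`; the real `weather_producer` may read its settings differently.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ProducerSettings:
    """Hypothetical container for the WEATHER_PRODUCER_* variables from .env."""
    city: str
    api_key: str
    fetch_every_seconds: int
    kafka_topic: str
    bootstrap_server: str


def load_producer_settings(environ=os.environ) -> ProducerSettings:
    # Fallbacks mirror the values in the provided .env (an assumption of this sketch).
    return ProducerSettings(
        city=environ.get("WEATHER_PRODUCER_CITY", "Vienna"),
        api_key=environ.get("WEATHER_PRODUCER_API_KEY", ""),
        # Environment variables are always strings, so convert explicitly.
        fetch_every_seconds=int(environ.get("WEATHER_PRODUCER_FETCH_EVERY_SECONDS", "30")),
        kafka_topic=environ.get("WEATHER_PRODUCER_KAFKA_TOPIC", "weather_topic"),
        bootstrap_server=environ.get("WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER", "kafka"),
    )
```

Passing a plain dictionary instead of `os.environ` makes it easy to try different settings without touching the real environment.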

## How to use

There is a ready-to-use Docker Compose setup. Fill in the `.env` file with the API
key (get it at https://www.weatherapi.com/).

### Run the docker compose (and build the images)

`docker compose up -d --build`

### Stop the docker compose

`docker compose down`

### Poetry

`poetry install`

### Pip

`pip install -r requirements.txt`

## Notes

### CrateDB initial settings.

CrateDB stores the shard indexes on the file system by mapping a file into memory (mmap).
You might need to set `vm.max_map_count` to something higher than the usual default, like `262144`.

You can do it by running `sysctl -w vm.max_map_count=262144`.
For more information, see the [bootstrap checks documentation](https://cratedb.com/docs/guide/admin/bootstrap-checks.html#linux).

### Mock API call.

If you don't want to register with the weather API we use, you can use the
provided `mock_fetch_weather_data` and pass it to the scheduler call instead.

This is what it would look like:

```python
scheduler.enter(
    RUN_EVERY_SECONDS,
    1,
    schedule_every,
    (RUN_EVERY_SECONDS, mock_fetch_weather_data, scheduler)
)
```

*After changing this, rebuild the Docker images (`docker compose up -d --build`).*
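
For context, a self-rescheduling loop built on the standard library `sched` module could look like the sketch below. This is an assumption about the shape of `schedule_every`, inferred only from the call shown above; the `mock_fetch_weather_data` body and the `run_forever` helper are hypothetical stand-ins, not the project's actual code.

```python
import sched
import time


def mock_fetch_weather_data():
    # Hypothetical stand-in: returns a tiny payload shaped like example.json.
    return {"location": {"name": "Vienna"}, "current": {"temp_c": 12}}


def schedule_every(interval, func, scheduler):
    # Re-arm first, so the next run is queued `interval` seconds after this one starts.
    scheduler.enter(interval, 1, schedule_every, (interval, func, scheduler))
    func()


def run_forever(interval, func):
    # Hypothetical entry point: schedule the first run, then block in the scheduler loop.
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(interval, 1, schedule_every, (interval, func, scheduler))
    scheduler.run()
```

Because every run queues the next one before calling `func`, the scheduler's queue never drains and `scheduler.run()` keeps blocking until the process is stopped or `func` raises.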

### Initial kafka topic.

In this example the `Kafka` topic is only created when the first message is sent to it. Because of
this, the Flink job could fail if the topic does not exist before the default timeout (60 seconds)
expires. This should only happen if the API takes too long to respond *the very first time this
project runs*.

To solve this, you should [configure](https://kafka.apache.org/quickstart#quickstart_createtopic)
the topics at boot time. This is recommended for production scenarios.

If you are just testing things out, you can solve this by re-running `docker compose up -d`. It
will only start `flink_job` and, assuming everything else went OK, the topic should already exist
and the job should work as expected.

If it still fails, check whether any other container/service is down;
it could be a symptom of a wrong API token or an unresponsive Kafka server, for example.

## Data and schema

See `example.json` for the schema. As you can see in `weather_producer` and `flink_consumer`, schema
manipulation is minimal:
thanks to CrateDB's dynamic objects we only need to map the `location` and `current` keys.

For more information on dynamic objects, see
[this blog post](https://cratedb.com/blog/handling-dynamic-objects-in-cratedb).
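
The sink table itself is not shown in this README. As a minimal sketch, assuming the table has two dynamic object columns named after the keys above, it could be created through CrateDB's HTTP endpoint (`/_sql` on port 4200) as follows. The `CREATE TABLE` statement and the helper names `build_sql_request` and `create_sink_table` are assumptions made for illustration, not something taken from the project.

```python
import json
import urllib.request

# Assumed table definition: two dynamic object columns, matching the insert
# statement used by flink_consumer (doc.weather_flink_sink).
CREATE_TABLE_STMT = """
CREATE TABLE IF NOT EXISTS doc.weather_flink_sink (
    "location" OBJECT(DYNAMIC),
    "current" OBJECT(DYNAMIC)
)
"""


def build_sql_request(stmt, host="localhost", port=4200):
    # CrateDB's HTTP endpoint accepts a JSON body with the statement under "stmt".
    payload = json.dumps({"stmt": stmt}).encode("utf-8")
    return urllib.request.Request(
        f"http://{host}:{port}/_sql",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_sink_table(host="localhost", port=4200):
    # Sends the request; requires a reachable CrateDB instance.
    with urllib.request.urlopen(build_sql_request(CREATE_TABLE_STMT, host, port)) as resp:
        return json.load(resp)
```

With the provided docker-compose setup, port 4200 is published on the host, so `create_sink_table()` with its default arguments should reach the `crate` container once it is up.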

In `weather_producer` the `Kafka` producer serializes the JSON payload directly into a UTF-8 encoded string.

```python
KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
              value_serializer=lambda m: json.dumps(m).encode('utf-8'))
```

In `flink_consumer` we use a `JSON` deserializer and only specify the two main keys,
`location` and `current`.

```python
row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
```

If your format is not JSON, or if you want to specify the whole schema, adapt it as needed.

[Here](https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html)
you can find examples of other formats like `csv` or `avro`.
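
To make the two-column mapping more concrete, here is a plain-Python illustration (not PyFlink itself) of what happens to one message: the producer encodes the whole payload as UTF-8 JSON, and the consumer side keeps only `location` and `current`, each as JSON text. The helper names `serialize` and `to_sink_row` are hypothetical, and the sketch assumes that nested objects reach the sink as JSON strings.

```python
import json


def serialize(message):
    # Same idea as the producer's value_serializer: dict -> UTF-8 encoded JSON bytes.
    return json.dumps(message).encode("utf-8")


def to_sink_row(message_bytes):
    # Keep only the two top-level keys; everything nested stays inside the JSON text.
    doc = json.loads(message_bytes.decode("utf-8"))
    return json.dumps(doc["location"]), json.dumps(doc["current"])


payload = {
    "location": {"name": "Nonette", "country": "France"},
    "current": {"temp_c": 12, "condition": {"text": "Sunny"}},
}
location, current = to_sink_row(serialize(payload))
```

Both values are ordinary strings at this point, which matches the `Types.STRING()` columns declared in `row_type_info`; any further structure is left to the database.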

## Jars and versions.

Jars are downloaded at build time to `/app/jars`; versions are pinned in the `.env` file.

There is a `JARS_PATH` in `flink_consumer`; change it if you have the jars somewhere else.
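
If the jars live in a different directory, one possible way to collect them as `file://` URIs is sketched below. The `jar_uris` helper is a hypothetical name; it uses `Path.as_uri()` from the standard library rather than string concatenation, which is a small variation on what `flink_consumer` does.

```python
from pathlib import Path


def jar_uris(jars_path):
    # Collect every *.jar in the directory as an absolute file:// URI, in a stable order.
    return sorted(p.resolve().as_uri() for p in Path(jars_path).glob("*.jar"))
```

The resulting list can then be unpacked into `env.add_jars(*jar_uris(JARS_PATH))`, the same call `flink_consumer` already uses.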
58 changes: 58 additions & 0 deletions application/apache-kafka-flink-streaming/docker-compose.yml
@@ -0,0 +1,58 @@
services:
  weather_producer:
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    command: python -m weather_producer
    depends_on:
      - kafka

  flink_job:
    env_file:
      - .env
    build:
      context: .
      dockerfile: flink_job.Dockerfile
      args:
        - POSTGRESQL_JAR_URL=https://jdbc.postgresql.org/download/postgresql-${POSTGRESQL_JAR_VERSION}.jar
        - FLINK_SQL_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_CONNECTOR_JDBC_VERSION}/flink-connector-jdbc-${FLINK_CONNECTOR_JDBC_VERSION}.jar
        - FLINK_KAFKA_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_KAFKA_JAR_URL_VERSION}/flink-sql-connector-kafka-${FLINK_KAFKA_JAR_URL_VERSION}.jar
    command: python -m flink_consumer
    depends_on:
      - kafka

  crate:
    image: crate:5.6.2
    ports:
      - "4200:4200"
    command: [ "crate",
               "-Cdiscovery.type=single-node",
             ]
    environment:
      - CRATE_HEAP_SIZE=2g

  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-server:6.2.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
41 changes: 41 additions & 0 deletions application/apache-kafka-flink-streaming/example.json
@@ -0,0 +1,41 @@
{
  "location": {
    "localtime": "2024-03-07 18:20",
    "country": "France",
    "localtime_epoch": 1709832024,
    "name": "Nonette",
    "lon": 3.28,
    "region": "Auvergne",
    "lat": 45.48,
    "tz_id": "Europe/Paris"
  },
  "current": {
    "feelslike_c": 11,
    "uv": 3,
    "last_updated": "2024-03-07 18:15",
    "feelslike_f": 51.7,
    "wind_degree": 30,
    "last_updated_epoch": 1709831700,
    "is_day": 1,
    "precip_in": 0,
    "wind_dir": "NNE",
    "gust_mph": 12.1,
    "temp_c": 12,
    "pressure_in": 29.83,
    "gust_kph": 19.5,
    "temp_f": 53.6,
    "precip_mm": 0,
    "cloud": 0,
    "wind_kph": 6.8,
    "condition": {
      "code": 1000,
      "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
      "text": "Sunny"
    },
    "wind_mph": 4.3,
    "vis_km": 10,
    "humidity": 50,
    "pressure_mb": 1010,
    "vis_miles": 6
  }
}
67 changes: 67 additions & 0 deletions application/apache-kafka-flink-streaming/flink_consumer.py
@@ -0,0 +1,67 @@
import os
import logging

from pathlib import Path

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

logging.basicConfig(
    format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.DEBUG
)

JARS_PATH = Path(__file__).parent / 'jars'

KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER')
KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC')
CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate')
CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATE_USER')
CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATE_PASSWORD')


def kafka_to_cratedb(env: StreamExecutionEnvironment):
    row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
    json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

    # Consumes data from Kafka.
    kafka_consumer = FlinkKafkaConsumer(
        topics=KAFKA_TOPIC,
        deserialization_schema=json_format,
        properties={'bootstrap.servers': f'{KAFKA_BOOTSTRAP_SERVER}:9092'}
    )
    kafka_consumer.set_start_from_latest()

    ds = env.add_source(kafka_consumer, source_name='kafka')

    # Writes data to cratedb.
    ds.add_sink(
        JdbcSink.sink(
            "insert into doc.weather_flink_sink (location, current) values (?, ?)",
            row_type_info,
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url(CRATEDB_PG_URI)
            .with_driver_name('org.postgresql.Driver')
            .with_user_name(CRATEDB_USER)
            .with_password(CRATEDB_PASSWORD)
            .build(),
            JdbcExecutionOptions.builder()
            .with_batch_interval_ms(1000)
            .with_batch_size(200)
            .with_max_retries(5)
            .build()
        )
    )
    env.execute()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    jars = list(map(lambda x: 'file://' + str(x), (JARS_PATH.glob('*.jar'))))
    env.add_jars(*jars)

    logging.info("Reading data from kafka")
    kafka_to_cratedb(env)
18 changes: 18 additions & 0 deletions application/apache-kafka-flink-streaming/flink_job.Dockerfile
@@ -0,0 +1,18 @@
FROM python:3.10
# Python version is important, because as of today (2024-03-04) kafka-flink is only
# supported for python<=3.10

ARG POSTGRESQL_JAR_URL
ARG FLINK_SQL_JAR_URL
ARG FLINK_KAFKA_JAR_URL

WORKDIR /app
COPY * /app
RUN wget ${POSTGRESQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_SQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_KAFKA_JAR_URL} --directory-prefix=/app/jars

RUN apt update && apt install -y openjdk-11-jdk
RUN pip install poetry

RUN poetry config virtualenvs.create false && poetry install
16 changes: 16 additions & 0 deletions application/apache-kafka-flink-streaming/pyproject.toml
@@ -0,0 +1,16 @@
[tool.poetry]
name = "cratedb-weather-data"
version = "0.1.0"
description = ""
authors = ["ivan.sanchez <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.31.0"
kafka-python = "^2.0.2"
apache-flink = "^1.18.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"