xushiyan/kafka-connect-file2es

ETL file data to Elasticsearch

Overview

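[Figure: pipeline overview]

Kafka Connect file source connectors read JSON records, one per line, from files under data/ and publish them to Kafka topics; an Elasticsearch sink connector then indexes those topics into Elasticsearch.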

Run the Pipeline

Start Services

docker-compose up -d
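`docker-compose up -d` returns before the containers are ready to accept requests. The sketch below polls an HTTP endpoint until it responds, so you can wait for Kafka Connect before creating connectors. It assumes the Kafka Connect REST API is published on localhost:8083, as in the `curl` commands in this README; the function name `wait_for_http` is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: poll an HTTP endpoint until it answers without an HTTP error.
# Assumption: Kafka Connect's REST API is reachable at http://localhost:8083.

# wait_for_http URL [MAX_ATTEMPTS] [DELAY_SECONDS]
# Returns 0 as soon as URL responds with a status below 400, 1 if it never does.
wait_for_http() {
  local url="$1" max_attempts="${2:-30}" delay="${3:-5}" attempt
  for ((attempt = 1; attempt <= max_attempts; attempt++)); do
    # -s: silent, -f: fail on HTTP status >= 400, --max-time: never hang
    if curl -sf --max-time 5 -o /dev/null "$url"; then
      echo "ready: $url (attempt $attempt)"
      return 0
    fi
    # Do not sleep after the final attempt
    if ((attempt < max_attempts)); then
      sleep "$delay"
    fi
  done
  echo "not ready after $max_attempts attempts: $url" >&2
  return 1
}
```

For example, `wait_for_http http://localhost:8083/connectors && echo "Kafka Connect is up"` blocks for up to about 30 × 5 seconds by default; tune the attempt count and delay to how long your machine takes to start the stack.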

After services fully start, visit

Create Connectors

# file source connector for topic: analytics.server.logs
curl -X POST http://localhost:8083/connectors \
    -H 'Content-Type:application/json' \
    -H 'Accept:application/json' \
    -d @connectors/connect.source.file.analytics.server.logs.json

# file source connector for topic: analytics.user.events
curl -X POST http://localhost:8083/connectors \
    -H 'Content-Type:application/json' \
    -H 'Accept:application/json' \
    -d @connectors/connect.source.file.analytics.user.events.json

# elasticsearch sink connector for all topics
curl -X POST http://localhost:8083/connectors \
    -H 'Content-Type:application/json' \
    -H 'Accept:application/json' \
    -d @connectors/connect.sink.elasticsearch.json
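The three requests above can also be issued in one loop, followed by a check of each connector's state through the Kafka Connect REST endpoint `GET /connectors/<name>/status`. This is a minimal sketch, assuming the configs are the connectors/*.json files used above and the REST API is at http://localhost:8083; the helper names `register_all` and `show_status` and the `CONNECT_URL` variable are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: register every connector config in a directory, then show connector status.
# Assumptions: configs are connectors/*.json and Kafka Connect listens on
# http://localhost:8083 (override with CONNECT_URL).

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# register_all DIR
# POSTs each DIR/*.json to /connectors. Returns 1 if any request fails,
# including HTTP errors such as 409 Conflict for an already-existing connector.
register_all() {
  local dir="$1" file failed=0
  for file in "$dir"/*.json; do
    [ -e "$file" ] || continue  # no .json files: the glob stays unexpanded
    echo "registering $file"
    if ! curl -sf --max-time 10 -X POST "$CONNECT_URL/connectors" \
        -H 'Content-Type: application/json' \
        -H 'Accept: application/json' \
        -d @"$file" > /dev/null; then
      echo "failed: $file" >&2
      failed=1
    fi
  done
  return "$failed"
}

# show_status NAME...
# Prints the JSON returned by GET /connectors/NAME/status for each name.
show_status() {
  local name
  for name in "$@"; do
    if ! curl -sf --max-time 10 "$CONNECT_URL/connectors/$name/status"; then
      echo "no status for connector: $name" >&2
    fi
    echo
  done
}
```

For example, `register_all connectors` posts every config in that directory. `show_status` expects the `name` values from those JSON files; `curl -s http://localhost:8083/connectors` lists the registered names. A healthy connector reports `"state":"RUNNING"` both for the connector and for each of its tasks.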

Generate Data

Append JSON records, one per line, to the source files:

# messages for topic: analytics.server.logs
echo '{"ts":1528697281001,"id":"ac1393","log":"Nam congue pretium ligula, ac susc ."}' >> data/analytics.server.logs.txt
echo '{"ts":1528697281002,"id":"ac1393","log":"Quisque pretium justo massa, ac laet"}' >> data/analytics.server.logs.txt

# messages for topic: analytics.user.events
echo '{"ts":1528591539001,"id":"12bfc4","event":"login"}'  >> data/analytics.user.events.txt
echo '{"ts":1528592539001,"id":"12bfc4","event":"logout"}' >> data/analytics.user.events.txt
echo '{"ts":1528593539001,"id":"12bfc4","event":"login"}'  >> data/analytics.user.events.txt
echo '{"ts":1528594539001,"id":"12bfc4","event":"logout"}' >> data/analytics.user.events.txt
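To produce more than a handful of records, the `echo` lines can be generated in a loop. The sketch below appends records in the same shape as the user events above (`ts` in epoch milliseconds, `id`, `event`); the function name, the default `id`, and the login/logout alternation are illustrative assumptions. It appends with `>>` rather than overwriting, which matters if these are Kafka's stock FileStream source connectors, since they track their read position in the file.

```shell
#!/usr/bin/env bash
# Sketch: append COUNT synthetic user events to FILE, one JSON object per line,
# matching the {"ts":...,"id":...,"event":...} shape of the sample data.
# The function name, default id, and event pattern are hypothetical.

# append_user_events FILE COUNT [ID]
append_user_events() {
  local file="$1" count="$2" id="${3:-12bfc4}" i event ts
  # Epoch milliseconds at whole-second precision (works with GNU and BSD date)
  ts=$(( $(date +%s) * 1000 ))
  for ((i = 0; i < count; i++)); do
    # Alternate login/logout like the sample records
    if ((i % 2 == 0)); then event=login; else event=logout; fi
    # Offset ts by i so every record gets a distinct timestamp
    printf '{"ts":%d,"id":"%s","event":"%s"}\n' "$((ts + i))" "$id" "$event" >> "$file"
  done
}
```

For example, `append_user_events data/analytics.user.events.txt 100` appends 100 events that the file source connector for analytics.user.events should pick up.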

View Data in Elasticsearch
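One way to confirm that the sink connector indexed the file data is to query Elasticsearch directly with its `_cat/indices`, `_count`, and `_search` APIs. This sketch assumes Elasticsearch is reachable at http://localhost:9200 (the docker-compose port mapping is an assumption) and that each topic is written to an index with the same name, which is the Confluent Elasticsearch sink's default; the helper names and `ES_URL` variable are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: inspect what the Elasticsearch sink has indexed.
# Assumptions: Elasticsearch at http://localhost:9200 (override with ES_URL)
# and one index per topic, named after the topic.

ES_URL="${ES_URL:-http://localhost:9200}"

# es_indices: list all indices with document counts
es_indices() {
  curl -sf --max-time 10 "$ES_URL/_cat/indices?v"
}

# es_count INDEX: print the document count response for INDEX
es_count() {
  curl -sf --max-time 10 "$ES_URL/$1/_count"
}

# es_search INDEX [SIZE]: print up to SIZE documents from INDEX, pretty-printed
es_search() {
  curl -sf --max-time 10 "$ES_URL/$1/_search?size=${2:-10}&pretty"
}
```

For example, after appending the six sample records, `es_count analytics.user.events` should eventually report a count of 4 and `es_search analytics.server.logs` should show the two log entries, provided the sink has caught up.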

# custom source connector that generates random data
curl -X POST http://localhost:8083/connectors \
    -H 'Content-Type:application/json' \
    -H 'Accept:application/json' \
    -d @connectors/connect.source.datagen.json

With the configuration in connectors/connect.source.datagen.json, the connector task generates 10 messages every 5 seconds and sends them to the topic generated.events.
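At 10 messages every 5 seconds the generator produces 2 messages per second, or 120 per minute. The helper below turns that rate into an expected total for a given elapsed time, counting only completed intervals; it is a sketch for sanity-checking the pipeline, and the function name and defaults (taken from the rate stated above) are assumptions. Real counts can differ by a batch or so, depending on when the first batch is emitted.

```shell
#!/usr/bin/env bash
# Sketch: expected number of datagen messages after ELAPSED_SECONDS,
# assuming BATCH_SIZE messages per INTERVAL_SECONDS (defaults: 10 per 5 s).

# expected_messages ELAPSED_SECONDS [BATCH_SIZE] [INTERVAL_SECONDS]
# Computes floor(elapsed / interval) * batch with integer arithmetic.
expected_messages() {
  local elapsed="$1" batch="${2:-10}" interval="${3:-5}"
  echo $(( elapsed / interval * batch ))
}
```

For example, `expected_messages 60` prints 120. About a minute after registering the datagen connector you could compare that with `curl -s http://localhost:9200/generated.events/_count`, assuming Elasticsearch is on port 9200 and the sink connector also consumes generated.events.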
