-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.py
28 lines (21 loc) · 827 Bytes
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import sys
from kafka import KafkaConsumer
from confluent_avro.schema_registry import SchemaRegistry
from confluent_avro.serde import AvroKeyValueSerde
SCHEMA_REGISTRY_URL = "http://localhost:8081"
KAFKA_TOPIC = "telecom_italia_data"
registry_client = SchemaRegistry(SCHEMA_REGISTRY_URL)
avro_serde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)
if __name__ == "__main__":
group_id = ".".join(sys.argv)
consumer = KafkaConsumer(
KAFKA_TOPIC,
group_id=group_id,
bootstrap_servers=["localhost:9092"],
)
print("{} start consuming!...".format(group_id))
for msg in consumer:
print("\nmessage recieved!" + "=" * 50 + "\n")
k = avro_serde.key.deserialize(msg.key)
v = avro_serde.value.deserialize(msg.value)
print(msg.offset, msg.partition, k, v)