Skip to content

Commit 55265da

Browse files
Initial Commit
1 parent bebdd70 commit 55265da

8 files changed

+1512
-0
lines changed

AvroConsumer-V2/ClickRecord.java

+686
Large diffs are not rendered by default.
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.consumer.*;
3+
4+
5+
public class ClickRecordConsumerV2{
6+
7+
public static void main(String[] args) throws Exception{
8+
9+
String topicName = "AvroClicks";
10+
11+
String groupName = "RG";
12+
Properties props = new Properties();
13+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
14+
props.put("group.id", groupName);
15+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16+
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
17+
props.put("schema.registry.url", "http://localhost:8081");
18+
props.put("specific.avro.reader", "true");
19+
20+
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
21+
consumer.subscribe(Arrays.asList(topicName));
22+
try{
23+
while (true){
24+
ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
25+
for (ConsumerRecord<String, ClickRecord> record : records){
26+
System.out.println("Session id="+ record.value().getSessionId()
27+
+ " Channel=" + record.value().getChannel()
28+
+ " Entry URL=" + record.value().getEntryUrl()
29+
+ " Language=" + record.value().getLanguage());
30+
}
31+
}
32+
}catch(Exception ex){
33+
ex.printStackTrace();
34+
}
35+
finally{
36+
consumer.close();
37+
}
38+
}
39+
40+
}

AvroConsumer-V2/ClickRecordV2.avsc

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{"type": "record",
2+
"name": "ClickRecord",
3+
"fields": [
4+
{"name": "session_id", "type": "string"},
5+
{"name": "browser", "type": ["string", "null"]},
6+
{"name": "campaign", "type": ["string", "null"]},
7+
{"name": "channel", "type": "string"},
8+
{"name": "entry_url", "type": ["string", "null"], "default": "None"},
9+
{"name": "ip", "type": ["string", "null"]},
10+
{"name": "language", "type": ["string", "null"], "default": "None"},
11+
{"name": "os", "type": ["string", "null"],"default": "None"}
12+
]
13+
}

AvroConsumer-V2/build.sbt

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
name := "AvroTest"
2+
3+
val repositories = Seq(
4+
"confluent" at "http://packages.confluent.io/maven/",
5+
Resolver.sonatypeRepo("public")
6+
)
7+
8+
libraryDependencies ++= Seq(
9+
"org.apache.avro" % "avro" % "1.8.1",
10+
"io.confluent" % "kafka-avro-serializer" % "3.1.1",
11+
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
12+
exclude("javax.jms", "jms")
13+
exclude("com.sun.jdmk", "jmxtools")
14+
exclude("com.sun.jmx", "jmxri")
15+
exclude("org.slf4j", "slf4j-simple")
16+
)
17+
18+
resolvers += "confluent" at "http://packages.confluent.io/maven/"

0 commit comments

Comments
 (0)