spark_connector.py
import logging
import os
import shutil
import sys

import nltk
import spacy
from nltk.corpus import stopwords
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, date_trunc, desc, explode, lit, lower,
    regexp_replace, split, trim, udf,
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType, TimestampType,
)
args = sys.argv
if len(args) != 5:
    print("Pass sufficient arguments.")
    print('Help: spark_connector.py <CHECKPOINT_DIR> <BOOTSTRAP_SERVER> <READ_TOPIC> <WRITE_TOPIC>')
    sys.exit(1)

# command-line arguments
CHECKPOINT_DIR = args[1]
BOOTSTRAP_SERVER = args[2]
READ_TOPIC = args[3]
WRITE_TOPIC = args[4]

# delete the checkpoint directory if it exists so the stream starts from scratch
if os.path.exists(CHECKPOINT_DIR):
    print('Clearing checkpoint dir ', CHECKPOINT_DIR)
    shutil.rmtree(CHECKPOINT_DIR)
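
# example invocation (host, paths, and topic names are placeholders; the connector
# package version is an assumption matching the bundled kafka-clients jar):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
#       spark_connector.py /tmp/spark-checkpoint localhost:9092 <READ_TOPIC> <WRITE_TOPIC>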

# download NLTK stop words and load the spaCy English model used for NER
nltk.download('stopwords')
NER = spacy.load("en_core_web_sm")
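# en_core_web_sm must already be installed, e.g.:
#   python -m spacy download en_core_web_sm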

# note: this SparkConf is built but never passed to the SparkSession created below
conf = SparkConf().setAppName("MyApp").set("spark.jars", "kafka-clients-3.4.0.jar")
# test code to check if a file is loaded
# input_file_path = "/Users/vigneshthirunavukkarasu/Downloads/sample.txt"
# rdd = spark.sparkContext.textFile(input_file_path)
# print(rdd.collect())
# spark = SparkSession \
#     .builder \
#     .config(conf=conf) \
#     .appName("test") \
#     .master("local") \
#     .getOrCreate()
spark = SparkSession.builder.appName("reddit-comments-stream-application").getOrCreate()
# suppress the unwanted logging
spark.sparkContext.setLogLevel("WARN")
logging.getLogger("py4j").setLevel(logging.ERROR)
# Create a DataFrame representing the stream of input lines from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .option("subscribe", READ_TOPIC) \
    .load() \
    .selectExpr("CAST(value AS STRING)")
df.printSchema()
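
# each Kafka record's value arrives as raw bytes; the CAST above exposes it as a
# plain string column named "value" holding the comment text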
words = df

# spaCy entity labels to keep
NER_LIST = ["PERSON", "NORP", "FAC", "ORG", "GPE", "LOC", "PRODUCT", "EVENT",
            "WORK_OF_ART", "LAW", "LANGUAGE"]

# @udf(returnType=ner_schema)
# @udf(returnType=ArrayType(StringType()))
@udf(returnType=StringType())
def perform_ner(comments):
    # run spaCy NER over the comment text and return the entities whose label
    # is in NER_LIST as a single comma-separated string
    text = NER(comments)
    return ','.join([ent.text for ent in text.ents if ent.label_ in NER_LIST])
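# illustrative example (actual output depends on the spaCy model): for the text
# "Apple opened a new store in Paris", perform_ner would typically return "Apple,Paris"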

stop_words = stopwords.words('english')
# extract named entities from the raw comment text
words = words.select(perform_ner("value").alias("words"))
# split the comma-separated entity string into one row per entity and rename
# the exploded column (named "col" by default) back to "words"
words = words.select(explode(split(words.words, ',')))
words = words.selectExpr("col as words")
# remove empty & null words
words = words.filter(col("words").isNotNull() & (col("words") != ""))
# remove stop words
words = words.filter(~col("words").isin(stop_words))
words.printSchema()
# normalise the entities: lower-case, strip non-alphanumeric characters,
# trim whitespace, and attach a unit count column
words = words.withColumn("words", lower("words")) \
    .withColumn("words", regexp_replace("words", "[^a-zA-Z0-9\\s]+", "")) \
    .withColumn("words", trim("words")) \
    .withColumn("count", lit(1))
# remove empty & null words
words = words.filter(col("words").isNotNull() & (col("words") != ""))
words.printSchema()
# words = words.withColumn("timestamp", date_trunc("second", current_timestamp()))
# words = words.withColumn("timestamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
# words = words.withColumn("count", lit(1))
window_size = "5 seconds"
# running word count over the whole stream
words_count = words.groupBy('words').count()
# wordCounts = words.groupBy('words').count()
words_count = words_count.withColumn("timestamp", date_trunc("second", current_timestamp()))
words_count.printSchema()
# words_tumbling = words.groupBy(window("timestamp", windowDuration=window_size), "words").count().alias("count")
# words = words.withColumn("timestamp", current_timestamp())
# Print the output to the console
# query = words.writeStream.outputMode("append").format("console").start()
# use this schema for pushing to the final topic
final_schema = StructType([
    StructField("words", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("count", IntegerType(), True)
])
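# note: final_schema documents the intended record shape; it is not applied to the
# query below, so the Kafka sink serialises whatever columns `words` carries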
# sink 1: print the aggregated counts to the console, highest count first
console_query = words_count.orderBy(desc("count")).writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# sink 2: publish each cleaned word to the output Kafka topic as a JSON value,
# keyed by the word itself
query = words \
    .selectExpr("CAST(words AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .outputMode("update") \
    .option("topic", WRITE_TOPIC) \
    .option("checkpointLocation", CHECKPOINT_DIR) \
    .start()
query.awaitTermination()
console_query.awaitTermination()
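
# both streaming queries run until stopped; awaitTermination() blocks the driver
# on the Kafka sink first and then on the console sink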