tweet_reader.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import requests
from textblob import TextBlob
import os

# Pull in the Kafka source for Structured Streaming; this must be set before
# the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell'
def send_top_count_words(df, epoch_id, num, api):
    """POST the `num` most frequent words in this micro-batch to `api`."""
    rows = df.head(num)
    words, counts = [], []
    for row in rows:
        row = row.asDict()
        words.append(row.get('word'))
        counts.append(row.get('count'))
    stats_json = {'words': words, 'count': counts}
    print('Sent data at epoch_id=', epoch_id, ', data=', stats_json)
    response = requests.post(api, json=stats_json)
    print('Sent response', response)
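
# For reference, the dashboard endpoint receives a JSON body shaped like
# (illustrative values, assuming hashtags such as these appear in the stream):
#   {'words': ['#spark', '#kafka'], 'count': [42, 17]}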
def remove_punctuation(column):
    """Strip punctuation from a string column, then trim and lowercase it."""
    punct_string = ''.join(r'\{}'.format(ch)
                           for ch in r"""()[]{}""'':;,.!?\-/+*<=>&|""")
    return F.lower(F.trim(F.regexp_replace(column, '[{}]'.format(punct_string), '')))
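
# Illustrative example (not executed): applied to a column holding
#   "Check THIS out!!! #Spark, #BigData"
# the transform above yields
#   "check this out #spark #bigdata"
# Note that '#' is deliberately absent from the punctuation set, so hashtags
# survive the cleanup.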
def get_top_hashtags(df):
    """Count hashtag occurrences and stream the running totals to the dashboard."""
    words = df \
        .select(F.explode(F.split(remove_punctuation(df.value), " +")).alias("word")) \
        .filter("word like '#%'")
    words = words.groupBy("word").count()
    top_words = words.orderBy(F.col("count").desc())
    query = top_words \
        .writeStream \
        .outputMode("complete") \
        .foreachBatch(lambda df, epoch_id: send_top_count_words(
            df, epoch_id, 10, 'http://localhost:10000/update_hashtags_count')) \
        .trigger(processingTime='2 seconds') \
        .start()
    return query
def get_sentiment(text):
    """Classify `text` into a one-hot (positive, neutral, negative) triple."""
    sent = TextBlob(text).sentiment.polarity
    neutral_limit = 0.05
    if sent >= neutral_limit:
        return (1, 0, 0)  # positive
    elif sent > -neutral_limit:
        return (0, 1, 0)  # neutral
    else:
        return (0, 0, 1)  # negative
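
# Rough sanity check (illustrative; exact polarities depend on the TextBlob
# lexicon version):
#   get_sentiment("I love this")  -> (1, 0, 0)   polarity well above  0.05
#   get_sentiment("The bus left") -> (0, 1, 0)   polarity near        0.0
#   get_sentiment("I hate this")  -> (0, 0, 1)   polarity well below -0.05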
def send_sentiment_counter(df, epoch_id, api):
    """POST the aggregated sentiment shares for this micro-batch to `api`."""
    first = df.first()
    if first is None:
        return
    pos, neutral, neg = first[0]
    # Sums over an empty batch come back as nulls; treat them as zero.
    total = (pos or 0) + (neutral or 0) + (neg or 0)
    if total == 0:
        return
    sentiment_json = {'positive': pos / total, 'neutral': neutral / total,
                      'negative': neg / total, 'total': total}
    print('Sent data at epoch_id=', epoch_id, ', data=', sentiment_json)
    response = requests.post(api, json=sentiment_json)
    print('Sent response', response)
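
# For reference, the dashboard endpoint receives a JSON body shaped like
# (illustrative values):
#   {'positive': 0.5, 'neutral': 0.3, 'negative': 0.2, 'total': 10}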
def get_sentiment_counter(df):
    """Tag each tweet with a sentiment one-hot vector, sum the vectors, and stream the totals."""
    get_sentiment_udf = F.udf(
        get_sentiment, T.ArrayType(T.IntegerType(), False))
    tweets_sentiment_df = df \
        .select(get_sentiment_udf('value').alias('sentiment'))
    sum_sentiment = tweets_sentiment_df \
        .agg(F.array(*[F.sum(F.col("sentiment").getItem(i)) for i in range(3)]))
    query = sum_sentiment \
        .writeStream \
        .outputMode("complete") \
        .foreachBatch(lambda df, epoch_id: send_sentiment_counter(
            df, epoch_id, 'http://localhost:10000/update_sentiment_counters')) \
        .trigger(processingTime='2 seconds') \
        .start()
    return query
def main():
    spark = SparkSession \
        .builder \
        .appName("APP") \
        .getOrCreate()

    # Read the raw tweet stream from Kafka and keep only the message payload.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "tweet-stream") \
        .option("startingOffsets", "earliest") \
        .load()
    df = df.selectExpr("CAST(value AS STRING)")

    # Run both streaming queries concurrently and block until they stop.
    query1 = get_top_hashtags(df)
    query2 = get_sentiment_counter(df)
    query1.awaitTermination()
    query2.awaitTermination()


if __name__ == '__main__':
    main()
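
# Usage sketch (assumes a Kafka broker on localhost:9092 with a 'tweet-stream'
# topic being fed tweet text, and a dashboard server listening on
# localhost:10000 for the two update endpoints used above):
#   python tweet_reader.py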