-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtweepy_tweet_streamer.py
56 lines (38 loc) · 1.41 KB
/
tweepy_tweet_streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import kafka
import tweepy
import configparser
def GetTokensV1(fpath):
    """Read the consumer and access key pairs from an INI-style config file.

    The file must contain a ``[MAIN]`` section with the options
    ``consumer_key``, ``consumer_secret``, ``access_token`` and
    ``access_token_secret``.

    Args:
        fpath: Path to the credentials file.

    Returns:
        A tuple ``(consumer_key, consumer_secret, access_key, access_secret)``
        of strings, in that order.

    Raises:
        OSError: If the file cannot be opened (``FileNotFoundError`` when it
            does not exist).
        KeyError: If the ``[MAIN]`` section or one of its options is missing.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips paths it cannot open, which would
    # only show up later as a confusing KeyError('MAIN'). Opening the file
    # ourselves lets the real I/O error propagate.
    with open(fpath) as config_file:
        config.read_file(config_file)

    main_config = config['MAIN']
    return (
        main_config['consumer_key'],
        main_config['consumer_secret'],
        main_config['access_token'],
        main_config['access_token_secret'],
    )
def GetTokenV2(fpath):
    """Read a token from a plain-text file.

    Args:
        fpath: Path to a file whose content is the token.

    Returns:
        The file content as a string, with leading and trailing whitespace
        removed (text editors usually append a newline, which must not become
        part of the token).

    Raises:
        OSError: If the file cannot be opened.
    """
    # The context manager closes the handle instead of leaving it to the GC.
    with open(fpath) as token_file:
        return token_file.read().strip()
class TweetStreamer(tweepy.StreamingClient):
    """Streaming client that publishes the text of selected tweets to Kafka.

    Every received tweet is reported on stdout; tweets whose ``lang`` is one
    of ``FORWARDED_LANGS`` are additionally sent, UTF-8 encoded, to the
    ``KAFKA_TOPIC`` topic.
    """

    # Kafka topic the tweet text is published to.
    KAFKA_TOPIC = 'tweet-stream'
    # Values of ``tweet.lang`` that are forwarded to Kafka.
    FORWARDED_LANGS = ('en', 'uk', 'ru')

    def __init__(self, *args, bootstrap_servers='localhost:9092', **kwargs):
        """Create the streaming client and its Kafka producer.

        Args:
            *args: Positional arguments passed through to
                ``tweepy.StreamingClient``.
            bootstrap_servers: Broker address(es) handed to
                ``kafka.KafkaProducer``; defaults to the local broker.
            **kwargs: Keyword arguments passed through to
                ``tweepy.StreamingClient``.
        """
        super().__init__(*args, **kwargs)
        self.producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_servers)

    def on_errors(self, errors):
        """Print the received errors, then defer to the base-class handler."""
        print("Error received", errors)
        return super().on_errors(errors)

    def on_tweet(self, tweet):
        """Report a tweet and forward its text to Kafka if its language matches.

        Args:
            tweet: Tweet object delivered by the stream; its ``lang``,
                ``data`` and ``text`` attributes are read.

        Returns:
            Whatever the base-class ``on_tweet`` returns.
        """
        print('Received tweet with lang=', tweet.lang)
        if tweet.lang in self.FORWARDED_LANGS:
            print("On tweet:", tweet.data)
            self.producer.send(self.KAFKA_TOPIC, tweet.text.encode('utf-8'))
        return super().on_tweet(tweet)
def test1():
    """Fetch one user through ``tweepy.Client`` and print the response."""
    bearer = GetTokenV2('data/credentials_v2.txt')
    api = tweepy.Client(bearer)
    print(api.get_user(username='gleb_pilipets'))
def test2():
    """Start ``TweetStreamer`` on the sample stream, requesting the ``lang`` field."""
    streamer = TweetStreamer(GetTokenV2('data/credentials_v2.txt'))
    streamer.sample(tweet_fields=['lang'])
# Script entry point: run the streaming demo when executed directly.
if __name__ == "__main__":
    test2()