forked from xdevplatform/parking
-
Notifications
You must be signed in to change notification settings - Fork 0
/
parking_filtered_stream.py
129 lines (89 loc) · 3.38 KB
/
parking_filtered_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import requests
import json
from requests.auth import AuthBase
from twilio_connect_demo import twilio_connect, send_message
# Twitter app credentials, read from the environment. Each is None when the
# corresponding variable is not set.
consumer_key = os.environ.get("CONSUMER_KEY")
consumer_secret = os.environ.get("CONSUMER_SECRET")
# Filtered-stream endpoint and the endpoint used to read/add/delete its rules.
stream_url = "https://api.twitter.com/labs/1/tweets/stream/filter"
rules_url = "https://api.twitter.com/labs/1/tweets/stream/filter/rules"
# Rules sent in the "add" payload when the stream rules are (re)created.
# NOTE(review): the value has a space after "from:" -- confirm the API treats
# this the same as "from:nycasp".
sample_rules = [{"value": "from: nycasp"}]
class BearerTokenAuth(AuthBase):
    """Requests auth hook that adds an app-only Bearer token to each request.

    The token is obtained once, when the object is constructed, by posting
    the consumer key and secret to the OAuth2 token endpoint.
    """

    # Sent both when requesting the token and on every authenticated request.
    USER_AGENT = "TwitterDevFilteredStreamQuickStartPython"

    def __init__(self, consumer_key, consumer_secret):
        """Store the credentials and fetch the Bearer token immediately.

        Raises:
            Exception: if the token endpoint does not answer with HTTP 200.
        """
        self.bearer_token_url = "https://api.twitter.com/oauth2/token"
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.bearer_token = self.get_bearer_token()

    def get_bearer_token(self):
        """Request a Bearer token and return its ``access_token`` string.

        Raises:
            Exception: if the token endpoint does not answer with HTTP 200.
        """
        response = requests.post(
            self.bearer_token_url,
            auth=(self.consumer_key, self.consumer_secret),
            data={"grant_type": "client_credentials"},
            headers={"User-Agent": self.USER_AGENT},
        )
        # Compare by value: `is not 200` tests object identity and only
        # happens to work because of interpreter-level int caching.
        if response.status_code != 200:
            raise Exception(
                f"Cannot get a Bearer token (HTTP {response.status_code}): "
                f"{response.text}"
            )
        return response.json()["access_token"]

    def __call__(self, r):
        """Attach the Authorization and User-Agent headers to request ``r``."""
        r.headers["Authorization"] = f"Bearer {self.bearer_token}"
        r.headers["User-Agent"] = self.USER_AGENT
        return r
def get_all_rules(auth):
    """Fetch the rules currently registered on the filtered stream.

    Args:
        auth: Requests auth object applied to the call.

    Returns:
        The decoded JSON body of the rules endpoint's response.

    Raises:
        Exception: if the endpoint does not answer with HTTP 200.
    """
    response = requests.get(rules_url, auth=auth)
    # Value comparison; `is not 200` would compare object identity.
    if response.status_code != 200:
        raise Exception(
            f"Cannot get rules (HTTP {response.status_code}): {response.text}"
        )
    return response.json()
def delete_all_rules(rules, auth):
    """Delete every rule listed in ``rules`` from the filtered stream.

    Args:
        rules: Mapping with a "data" list of rule objects, each carrying an
            "id". ``None``, a mapping without "data", or an empty "data"
            list means there is nothing to delete.
        auth: Requests auth object applied to the call.

    Returns:
        None.

    Raises:
        Exception: if the endpoint does not answer with HTTP 200.
    """
    if rules is None or "data" not in rules:
        return None

    ids = [rule["id"] for rule in rules["data"]]
    if not ids:
        # Nothing registered: skip the request instead of posting an empty
        # id list.
        return None

    payload = {"delete": {"ids": ids}}
    response = requests.post(rules_url, auth=auth, json=payload)
    # Value comparison; `is not 200` would compare object identity.
    if response.status_code != 200:
        raise Exception(
            f"Cannot delete rules (HTTP {response.status_code}): {response.text}"
        )
    return None
def set_rules(rules, auth):
    """Register ``rules`` on the filtered stream.

    Args:
        rules: List of rule objects sent as the "add" payload; ``None``
            makes the call a no-op.
        auth: Requests auth object applied to the call.

    Raises:
        Exception: if the endpoint does not answer with HTTP 201.
    """
    if rules is None:
        return

    payload = {"add": rules}
    response = requests.post(rules_url, auth=auth, json=payload)
    # Value comparison; `is not 201` would compare object identity.
    if response.status_code != 201:
        raise Exception(
            f"Cannot create rules (HTTP {response.status_code}): {response.text}"
        )
def stream_connect(auth):
    """Open the filtered stream and return its first non-empty payload.

    Args:
        auth: Requests auth object applied to the call.

    Returns:
        The first non-empty line of the stream decoded from JSON, or None
        if the stream ends before any non-empty line arrives.
    """
    # The context manager closes the streaming connection when we return
    # after the first payload; otherwise the connection stays open.
    with requests.get(stream_url, auth=auth, stream=True) as response:
        for response_line in response.iter_lines():
            # Empty lines carry no payload; skip them.
            if response_line:
                return json.loads(response_line)
    return None
# Module-level auth object used by the rule setup and by main(). Constructing
# it requests a Bearer token, so this line performs a network call on import.
bearer_token = BearerTokenAuth(consumer_key, consumer_secret)
def setup_rules(auth):
    """Replace whatever rules the stream has with ``sample_rules``.

    Existing rules are fetched and deleted first, then the sample rules are
    registered.
    """
    delete_all_rules(get_all_rules(auth), auth)
    set_rules(sample_rules, auth)
# Executed at module level: resets the stream rules as soon as the file is
# run or imported.
setup_rules(bearer_token)
def parking_logic(pl_text, client):
    """Send a text when the Tweet mentions both "suspended" and "tomorrow".

    Matching is a case-sensitive substring check on ``pl_text``.

    Args:
        pl_text: Tweet text to inspect.
        client: Passed through unchanged to ``send_message``.

    Returns:
        True if ``send_message`` was called, False otherwise.
    """
    # A single guard replaces the nested if/else whose two `else` branches
    # printed the same message.
    if "suspended" in pl_text and "tomorrow" in pl_text:
        send_message(client=client)
        print("text sent")
        return True

    print("not today, friend")
    return False
def main():
    """Read one payload from the stream and run the parking check on it.

    Raises:
        RuntimeError: if the stream produced no payload, or a payload that
            has no ``data.text`` field.
    """
    pl = stream_connect(bearer_token)
    try:
        pl_text = pl["data"]["text"]
    except (TypeError, KeyError) as err:
        # stream_connect may hand back None (stream ended) or a payload
        # without Tweet data; fail with a message that shows what arrived.
        raise RuntimeError(f"Stream returned no Tweet text: {pl!r}") from err
    print(pl_text)
    client = twilio_connect()
    parking_logic(pl_text, client)
# Only start the stream check when the file is executed as a script.
if __name__ == "__main__":
    main()