-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_service.py
197 lines (143 loc) · 6.05 KB
/
db_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import base64
import datetime
import json
import os
import time
from functools import lru_cache
import cachetools.func
import requests
from firebase_admin import firestore
# Use the application default credentials
import firebase_admin
from firebase_admin import credentials
from firebase_admin import messaging
from pytz import timezone
from api_service import get_all_dist_codes_api, get_dist_vaccination_calendar
os.environ['TZ'] = 'Asia/Kolkata'
time.tzset()
def _get_firebase_credentials():
if 'FIREBASE_CONFIG_BASE64' in os.environ:
# created encoded config and saved in var in heroku
# openssl base64 -in <firebaseConfig.json> -out <firebaseConfigBase64.txt> -A
config_json_base64 = os.environ['FIREBASE_CONFIG_BASE64']
base64_bytes = config_json_base64.encode('ascii')
message_bytes = base64.b64decode(base64_bytes)
message = message_bytes.decode('ascii')
cred = credentials.Certificate(json.loads(message))
else:
cred = credentials.Certificate("vaccineslotavailability-firebase-adminsdk-6jiff-78515b332d.json")
return cred
firebase_admin.initialize_app(_get_firebase_credentials())
db = firestore.client()
def _get_topic_from_dist_id(dist_id):
return "/topics/{}".format(dist_id)
def _get_slot_document_key(dist_id):
return "dist_{}".format(dist_id)
@cachetools.func.ttl_cache(maxsize=1, ttl=10 * 60 * 60)
def get_all_dist_codes_db():
doc_ref = db.collection(u'static').document(u'dist_info')
document = doc_ref.get().to_dict()
return document["dist_info_list"]
@lru_cache(maxsize=100)
def get_dist_id_from_name_db(dist_name):
dist_codes = get_all_dist_codes_db()
name_code_dict = dict((d["dist_name"], d["dist_id"]) for d in dist_codes)
return name_code_dict.get(dist_name)
@lru_cache(maxsize=100)
def get_dist_name_from_id_db(dist_id):
dist_codes = get_all_dist_codes_db()
name_code_dict = dict((d["dist_id"], d["dist_name"]) for d in dist_codes)
return name_code_dict.get(dist_id)
@cachetools.func.ttl_cache(maxsize=300, ttl=10 * 60)
def get_slots_by_dist_id_db(dist_id):
key = _get_slot_document_key(dist_id)
slots_doc = db.collection(u'slots').document(key).get()
if slots_doc.exists:
db_slots = slots_doc.to_dict()["vaccine_slots"]
datetime_format = "%d-%m-%Y %H:%M:%S"
update_ts = slots_doc.to_dict()["update_ts"]
update_ts = update_ts.astimezone(timezone('Asia/Kolkata'))
update_ts = update_ts.strftime(datetime_format)
res_slots = []
for slot in db_slots:
slot["update_ts"] = update_ts
res_slots.append(slot)
return res_slots
return []
def add_subscriber_to_topic(token, dist_id):
registration_tokens = [token]
topic = _get_topic_from_dist_id(dist_id)
response = messaging.subscribe_to_topic(registration_tokens, topic)
if dist_id not in _get_all_subscribed_dists_from_db():
doc_ref = db.collection(u'static').document(u'topics_subscribed')
dist_name = get_dist_name_from_id_db(dist_id)
doc = {str(dist_id): {"name": dist_name, "count": 0}}
doc_ref.set(doc, merge=True)
return response.success_count
def delete_subscriber_from_topic(token, dist_id):
registration_tokens = [token]
topic = _get_topic_from_dist_id(dist_id)
response = messaging.unsubscribe_from_topic(registration_tokens, topic)
return str(response.success_count)
def send_test_notification(token, dist_name):
time.sleep(6)
title = "Test Notification"
body = "This is how we will notify you when slots would be available in {}".format(dist_name)
message = messaging.Message(
notification=messaging.Notification(
title=title,
body=body,
),
token=token
)
response = messaging.send(message)
return response
def notify_all_subscribers(dist_id, dist_name, date, num_slots):
title = "Vaccine Slots Available"
body = "{} has {} slots on {}".format(dist_name, num_slots, date)
message = messaging.Message(
notification=messaging.Notification(
title=title,
body=body,
),
topic=_get_topic_from_dist_id(dist_id)
)
response = messaging.send(message)
print("Notifying : {} ... Response : {}".format(dist_name, str(response)))
return response
@cachetools.func.ttl_cache(maxsize=200, ttl=1 * 60 * 60)
def get_trend_for_dist_id(dist_id):
key = _get_slot_document_key(dist_id)
doc_ref = db.collection(u'trend').document(key).get()
datetime_format = "%d-%m-%Y %H:%M:%S"
past_data = doc_ref.to_dict().get("past", []) if doc_ref.exists else []
past_data = list(sorted(past_data, key=lambda x: datetime.datetime.strptime(x["ts"], datetime_format)))
today = datetime.datetime.now().date()
response = dict()
for data in past_data:
time_str = data["ts"]
num_slots = data["num_slots"]
datetime_obj = datetime.datetime.strptime(time_str, datetime_format)
today_date = datetime.datetime.now().date()
forced_ts = datetime.datetime.combine(today_date, datetime_obj.time())
delta_days = (today - datetime_obj.date()).days
if delta_days > 6:
continue
array = response.get(delta_days, [])
array.append({"ts": forced_ts.timestamp(), # datetime_obj.time().strftime("%H:%M:%S"),
"num_slots": num_slots})
response[delta_days] = array
return response
def get_all_subscribed_dist_ids(token):
with open("fcm_auth_key.txt") as f:
auth_token = f.read()
auth_key = "key={}".format(auth_token)
url = "https://iid.googleapis.com/iid/info/{}".format(token)
response = requests.get(url, params={"details": True}, headers={"Authorization": auth_key})
print(response.json())
return list(response.json()["rel"]["topics"].keys())
@cachetools.func.ttl_cache(maxsize=2, ttl=10 * 60)
def _get_all_subscribed_dists_from_db():
doc = db.collection(u'static').document(u'topics_subscribed').get()
if doc.exists:
return list(map(lambda x: int(x), doc.to_dict().keys()))