-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathutils.py
executable file
·585 lines (543 loc) · 23.1 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
import json
import hashlib
import pyproj
import requests
import shapely
import jwt
from functools import wraps, partial
from flask import request, jsonify
from shapely import ops
from shapely.wkt import loads
from shapely.geometry import mapping
import geojson
from sqlalchemy import and_
from localStoragePy import localStoragePy
from dbms import app, db
from dbms.models import geoIdsModel
from dbms.models.geoIdsModel import GeoIds
from dbms.models.s2CellTokensModel import S2CellTokens
from dbms.models.cellsGeosMiddleModel import CellsGeosMiddle
from sqlalchemy import func
from datetime import date, timedelta
import geopandas as gpd
localStorage = localStoragePy('asset-registry', 'text')
class Utils:
"""
Utils class for helper functions
"""
# decorator for verifying and fetching the JWT
@staticmethod
def fetch_token(f):
    """
    Decorator that resolves the access and refresh tokens of the current request,
    verifies the access token and hands both tokens to the wrapped function.

    The access token is read from the ``Authorization`` bearer header and the refresh
    token from the ``X-Refresh-Token`` header. When no bearer token is present, the
    ``access_token_cookie`` / ``refresh_token_cookie`` cookies are used instead, but
    only when both cookies are set.
    :param f: function to wrap, called as ``f(token, refresh_token, *args, **kwargs)``
    :return: the wrapped function; it answers with a 401 JSON response when the token
        is missing or invalid, or when anything (including ``f`` itself) raises
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            token = Utils.get_bearer_token()
            refresh_token = request.headers.get('X-Refresh-Token')
            # fall back to the cookies only when the header carried no token
            if not token:
                cookie_access_token = request.cookies.get('access_token_cookie')
                cookie_refresh_token = request.cookies.get('refresh_token_cookie')
                if cookie_access_token and cookie_refresh_token:
                    token = cookie_access_token
                    refresh_token = cookie_refresh_token
            # return 401 if token is not passed
            if not token:
                return jsonify({'message': 'Token is missing !!'}), 401
            try:
                # decoding only to validate signature/claims, the payload itself is not used here
                jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            except Exception:
                # `Exception` instead of a bare except so SystemExit/KeyboardInterrupt are not swallowed
                return jsonify({
                    'message': 'Token is invalid !!'
                }), 401
            return f(token, refresh_token, *args, **kwargs)
        except Exception as e:
            return jsonify({
                'message': 'Authentication Error',
                'error': f'{e}'
            }), 401

    return decorated
# decorator for checking if valid token provided
@staticmethod
def token_required(f):
    """
    Decorator that lets the wrapped function run only for authenticated requests.

    Two mechanisms are supported:
    * API keys: when the ``API-KEYS-AUTHENTICATION`` header is set, the ``API-KEY`` and
      ``CLIENT-SECRET`` headers are verified through ``Utils.verify_api_secret_keys``.
    * JWT: otherwise the bearer token (or the token kept in ``localStorage``) is decoded
      and, except for URLs containing ``logout``, its ``is_activated`` claim must be truthy.
    :param f: function to wrap, called with its original arguments
    :return: the wrapped function; it answers with a 401 JSON response when
        authentication fails
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        # check for api_key and client_secret before jwt
        if request.headers.get('API-KEYS-AUTHENTICATION'):
            api_key = request.headers.get('API-KEY')
            client_secret = request.headers.get('CLIENT-SECRET')
            if not api_key or not client_secret:
                return jsonify({'message': 'API Key or Client Secret missing!!'}), 401
            if Utils.verify_api_secret_keys(api_key, client_secret):
                return f(*args, **kwargs)
            return jsonify({
                'message': 'Invalid API Key or Client Secret.'
            }), 401
        token = Utils.get_bearer_token()
        if not token:
            # NOTE(review): `localStorage` is a module-level store, not tied to the current
            # request - confirm that falling back to it is intended for every caller.
            token = localStorage.getItem('access_token')
        try:
            # decoding the payload to check for valid token
            decoded_token = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            if 'logout' not in request.url:
                # check if user account is activated
                if not decoded_token['is_activated']:
                    return jsonify({
                        'message': 'User account not activated. Activate your account for the services.',
                    }), 401
        except Exception:
            # covers a missing/invalid token as well as a token without the `is_activated` claim;
            # `Exception` instead of a bare except so SystemExit/KeyboardInterrupt are not swallowed
            return jsonify({
                'message': 'Need to Login.'
            }), 401
        return f(*args, **kwargs)

    return decorated
@staticmethod
def records_s2_cell_tokens(s2_cell_tokens_dict: dict):
    """
    Build (unsaved) S2CellTokens records for every resolution level
    :param s2_cell_tokens_dict: mapping of resolution level -> iterable of s2 cell tokens
    :return: dict with the same keys, each holding the list of S2CellTokens records
        created for that level, e.g. {res_level: s2_cell_token_records_for_the_db}
    """
    return {
        res_level: [S2CellTokens(cell_token=cell_token) for cell_token in cell_tokens]
        for res_level, cell_tokens in s2_cell_tokens_dict.items()
    }
@staticmethod
def generate_geo_id(s2_cell_tokens):
    """
    Derive the GEO_ID for a list of s2 cell tokens.
    The id is the SHA-256 hex digest of the encoded tokens fed to the hash in the
    given order, so the order of the tokens matters.
    :param s2_cell_tokens: iterable of s2 cell token strings
    :return: hex digest string used as geo id
    """
    # hashing the concatenation is the same as updating the hash token by token
    encoded_tokens = b"".join(cell_token.encode() for cell_token in s2_cell_tokens)
    return hashlib.sha256(encoded_tokens).hexdigest()
@staticmethod
def lookup_geo_ids(geo_id_to_lookup):
    """
    Check whether the geo id (field boundary) is already registered
    :param geo_id_to_lookup: geo id to search for
    :return: the ``wkt`` entry of the stored geo data when a record exists, else None
    """
    record = GeoIds.query.filter(GeoIds.geo_id == geo_id_to_lookup).first()
    if not record:
        return None
    return json.loads(record.geo_data)['wkt']
@staticmethod
def fetch_domain_from_client_secret():
    """
    Decode the ``CLIENT-SECRET`` request header as a JWT and return its ``sub`` claim.
    Any error raised while reading or decoding the header propagates to the caller.
    :return: value of the ``sub`` claim
    """
    client_secret = request.headers.get('CLIENT-SECRET')
    claims = jwt.decode(client_secret, app.config['SECRET_KEY'], algorithms="HS256")
    return claims['sub']
@staticmethod
def register_field_boundary(geo_id, indices, records_list_s2_cell_tokens_middle_table_dict, field_wkt, country,
                            boundary_type):
    """
    Register the geo id (field boundary) in the database together with its s2 cell
    tokens and the rows of the middle table linking both.
    :param geo_id: geo id of the field
    :param indices: mapping of resolution level -> s2 cell tokens, stored in the geo data
    :param records_list_s2_cell_tokens_middle_table_dict: mapping of resolution level ->
        S2CellTokens records (see ``records_s2_cell_tokens``)
    :param field_wkt: WKT of the field, stored under the ``wkt`` key of the geo data
    :param country: country passed on to the GeoIds record
    :param boundary_type: boundary type passed on to the GeoIds record
    :return: the JSON encoded geo data that was stored
    :raises Exception: any error is re-raised after the session has been rolled back
    """
    try:
        geo_data = {'wkt': field_wkt}
        authority_token = None
        # the domain comes from the JWT if there is one, else from the client secret
        domain = Utils.get_domain_from_jwt()
        if not domain:
            domain = Utils.fetch_domain_from_client_secret()
        if domain:
            authority_token = Utils.get_authority_token_for_domain(domain)
        geo_id_record = GeoIds(geo_id, geo_data, authority_token, country, boundary_type)
        # creating the json encoded geo_data for different resolution levels
        for res_level, s2_cell_tokens_records in records_list_s2_cell_tokens_middle_table_dict.items():
            geo_data[res_level] = indices[res_level]
            # linking the s2 cell token records with the geo id for the middle table;
            # the lookup is materialised once so the query is not executed twice per level
            existing_records = S2CellTokens.query.filter(
                S2CellTokens.cell_token.in_(
                    [s2_cell_tokens_record.cell_token for s2_cell_tokens_record in s2_cell_tokens_records])).all()
            existing_cell_tokens = {existing_record.cell_token for existing_record in existing_records}
            # only tokens that are not stored yet need a new record
            ls_records_to_create = [s2_cell_tokens_record for s2_cell_tokens_record in s2_cell_tokens_records if
                                    s2_cell_tokens_record.cell_token not in existing_cell_tokens]
            geo_id_record.s2_cell_tokens = geo_id_record.s2_cell_tokens + ls_records_to_create + existing_records
        geo_data = json.dumps(geo_data)
        geo_id_record.geo_data = geo_data
        # populating the cell tokens, geo id and the middle table in the database
        # bulk insertions for tables
        # return_defaults as True sets the Id for the record to be inserted
        db.session.bulk_save_objects([geo_id_record], return_defaults=True)
        db.session.bulk_save_objects(geo_id_record.s2_cell_tokens, return_defaults=True)
        ls_middle_table_records = [CellsGeosMiddle(geo_id=geo_id_record.id, cell_id=s2_cell_token_record.id) for
                                   s2_cell_token_record in geo_id_record.s2_cell_tokens]
        db.session.bulk_save_objects(ls_middle_table_records)
        db.session.commit()
        return geo_data
    except Exception:
        # do not leave a half-written transaction behind on the shared session
        db.session.rollback()
        raise
@staticmethod
def fetch_geo_ids_for_cell_tokens(s2_cell_tokens, domain, boundary_type=None):
    """
    Fetch the distinct geo ids which have at least one token of the given tokens list
    :param s2_cell_tokens: s2 cell tokens to match
    :param domain: optional domain filter; when given, only geo ids registered with the
        authority token of that domain are returned (none if the domain has no token)
    :param boundary_type: optional boundary type filter, ignored when falsy or "all"
    :return: list of geo ids
    """
    extra_filters = []
    if domain:
        authority_token = Utils.get_authority_token_for_domain(domain)
        if not authority_token:
            # a domain without authority token cannot own any geo id
            return []
        extra_filters.append(GeoIds.authority_token == authority_token)
    matches = db.session.query(GeoIds.geo_id).distinct().join(CellsGeosMiddle).join(S2CellTokens).filter(
        S2CellTokens.cell_token.in_(set(s2_cell_tokens)), *extra_filters)
    if boundary_type and boundary_type != "all":
        matches = matches.filter(GeoIds.boundary_type == boundary_type)
    return [row.geo_id for row in matches]
@staticmethod
def check_percentage_match(matched_geo_ids, s2_index__l13_list, resolution_level, threshold):
    """
    Return the Geo Ids which overlap the given tokens for more than a certain threshold.
    The match is the size of the intersection divided by the size of the union of both
    token sets, as a percentage. Two empty token sets count as a 0% match.
    :param matched_geo_ids: geo ids to check, each must exist in the database
    :param s2_index__l13_list: s2 cell tokens to compare against
    :param resolution_level: resolution level whose stored tokens are compared
    :param threshold: percentage that has to be exceeded
    :return: list of the geo ids whose match percentage is greater than the threshold
    """
    # both values are loop invariant, build them once
    tokens_to_match = set(s2_index__l13_list)
    level_key = str(resolution_level)
    percentage_matched_geo_ids = []
    for matched_geo_id in matched_geo_ids:
        # fetch s2 cell tokens against a geo id
        record = GeoIds.query.filter(GeoIds.geo_id == matched_geo_id).first()
        geo_id_cell_tokens = set(json.loads(record.geo_data)[level_key])
        union_size = len(tokens_to_match | geo_id_cell_tokens)
        if union_size:
            percentage_match = len(tokens_to_match & geo_id_cell_tokens) / float(union_size) * 100
        else:
            # nothing on either side, avoid dividing by zero
            percentage_match = 0.0
        if percentage_match > threshold:
            percentage_matched_geo_ids.append(matched_geo_id)
    return percentage_matched_geo_ids
@staticmethod
def is_valid_polygon(field_wkt):
    """
    Check if the WKT parses to a geometry of type ``Polygon``
    :param field_wkt: WKT string to check
    :return: True for a Polygon, False for any other geometry type or when parsing
        fails (the error is printed)
    """
    try:
        geometry = shapely.wkt.loads(field_wkt)
        return geometry.geom_type == 'Polygon'
    except Exception as e:
        print(e)
        return False
@staticmethod
def get_percentage_overlap_two_fields(geo_id_field_1, geo_id_field_2):
    """
    Determine what is the % overlap of the 2 fields
    For Resolution Level 20
    The number of shared tokens is divided by the token count of the larger field.
    :param geo_id_field_1: geo id of the first field
    :param geo_id_field_2: geo id of the second field
    :return: overlap percentage; 0.0 when neither field has any L20 token
    :raises AttributeError: when one of the geo ids is not registered
    """
    try:
        field_1 = set(json.loads(GeoIds.query.filter(GeoIds.geo_id == geo_id_field_1).first().geo_data)['20'])
        field_2 = set(json.loads(GeoIds.query.filter(GeoIds.geo_id == geo_id_field_2).first().geo_data)['20'])
    except AttributeError:
        # `.first()` returned None for an unknown geo id
        raise AttributeError('Please provide valid Geo Ids.')
    larger_field_size = max(len(field_1), len(field_2))
    if not larger_field_size:
        # both fields are empty, avoid dividing by zero
        return 0.0
    return (len(field_1 & field_2) / larger_field_size) * 100
@staticmethod
def fetch_fields_for_cell_tokens(s2_cell_tokens_13, s2_cell_tokens_20, s2_index=None):
    """
    Two way search: fetch the fields which have a token in the given L13 tokens
    and also a token in the given L20 tokens
    :param s2_cell_tokens_13: L13 s2 cell tokens
    :param s2_cell_tokens_20: L20 s2 cell tokens
    :param s2_index: optional comma separated S2 indexes to include in the response
    :return: list of {geo_id: geo data} dicts; the geo data always holds ``Geo JSON`` and,
        when valid S2 indexes were requested, the stored geo data without the other indexes
    """
    # -1 is what get_s2_indexes_to_remove returns when there is nothing to filter on
    indexes_to_remove = -1
    if s2_index:
        requested_indexes = [int(index) for index in s2_index.split(',')]
        indexes_to_remove = Utils.get_s2_indexes_to_remove(requested_indexes)
    l13_rows = db.session.query(GeoIds.geo_id).distinct().join(CellsGeosMiddle).join(S2CellTokens).filter(
        S2CellTokens.cell_token.in_(s2_cell_tokens_13)
    )
    geo_ids_l13 = [row.geo_id for row in l13_rows]
    # two way check for L20
    l20_rows = db.session.query(GeoIds.geo_id).distinct().join(CellsGeosMiddle).join(S2CellTokens).filter(
        and_(S2CellTokens.cell_token.in_(s2_cell_tokens_20), GeoIds.geo_id.in_(geo_ids_l13))
    )
    matched_geo_ids = [row.geo_id for row in l20_rows]
    fields_to_return = []
    for geo_id in matched_geo_ids:
        geo_data = json.loads(GeoIds.query.filter(GeoIds.geo_id == geo_id).first().geo_data)
        if indexes_to_remove != -1:
            field = Utils.get_specific_s2_index_geo_data(json.dumps(geo_data), indexes_to_remove)
        else:
            field = {}
        field['Geo JSON'] = Utils.get_geo_json(geo_data['wkt'])
        fields_to_return.append({geo_id: field})
    return fields_to_return
@staticmethod
def fetch_fields_for_a_point_two_way(s2_cell_token_13, s2_cell_token_20, domain, s2_index=None, boundary_type=None):
    """
    Checks if token exists in L13, then further checks for L20
    Returns the fields if token exists at both the levels
    :param s2_cell_token_13: L13 s2 cell token of the point
    :param s2_cell_token_20: L20 s2 cell token of the point
    :param domain: optional domain filter; when given, only fields registered with the
        authority token of that domain are returned (none if the domain has no token)
    :param s2_index: optional comma separated S2 indexes to include in the response
    :param boundary_type: optional boundary type filter, ignored when falsy or "all"
    :return: list of {geo_id: geo data} dicts; the geo data always holds ``Geo JSON`` and,
        when valid S2 indexes were requested, the stored geo data (plus ``boundary_type``)
        without the other indexes
    """
    # -1 is what get_s2_indexes_to_remove returns when there is nothing to filter on
    s2_indexes_to_remove = -1
    if s2_index:
        s2_index_to_fetch = [int(i) for i in s2_index.split(',')]
        s2_indexes_to_remove = Utils.get_s2_indexes_to_remove(s2_index_to_fetch)
    domain_filters = []
    if domain:
        authority_token = Utils.get_authority_token_for_domain(domain)
        if not authority_token:
            # no authority token means no fields for this domain; returning here also keeps
            # the boundary type filter below from being applied to a plain list
            return []
        domain_filters.append(GeoIds.authority_token == authority_token)
    geo_ids_query = db.session.query(GeoIds.geo_id).distinct().join(CellsGeosMiddle).join(S2CellTokens).filter(
        S2CellTokens.cell_token == s2_cell_token_13, *domain_filters)
    if boundary_type and boundary_type != "all":
        geo_ids_query = geo_ids_query.filter(GeoIds.boundary_type == boundary_type)
    geo_ids = [r.geo_id for r in geo_ids_query]
    fields_to_return = []
    for geo_id in geo_ids:
        geo_data_to_return = {}
        geo_data_obj = GeoIds.query.filter(GeoIds.geo_id == geo_id).first()
        geo_data = json.loads(geo_data_obj.geo_data)
        geo_data['boundary_type'] = geo_data_obj.boundary_type
        if s2_indexes_to_remove != -1:
            geo_data_to_return = Utils.get_specific_s2_index_geo_data(json.dumps(geo_data), s2_indexes_to_remove)
        # the membership check runs on the unfiltered geo data, L13/L20 may have been stripped above
        if s2_cell_token_13 in geo_data['13'] and s2_cell_token_20 in geo_data['20']:
            geo_data_to_return['Geo JSON'] = Utils.get_geo_json(geo_data['wkt'])
            fields_to_return.append({geo_id: geo_data_to_return})
    return fields_to_return
@staticmethod
def fetch_fields_for_geo_ids(geo_ids, s2_index=None):
    """
    Fetch the fields for the given geo ids
    :param geo_ids: geo ids to fetch, each must exist in the database
    :param s2_index: optional comma separated S2 indexes to include in the response
    :return: list of {geo_id: geo data} dicts; the geo data always holds ``Geo JSON`` and
        ``boundary_type`` and, when valid S2 indexes were requested, the stored geo data
        without the other indexes
    """
    # -1 is what get_s2_indexes_to_remove returns when there is nothing to filter on
    indexes_to_remove = -1
    if s2_index:
        requested_indexes = [int(index) for index in s2_index.split(',')]
        indexes_to_remove = Utils.get_s2_indexes_to_remove(requested_indexes)
    fields_to_return = []
    for geo_id in geo_ids:
        record = GeoIds.query.filter(GeoIds.geo_id == geo_id).first()
        geo_data = json.loads(record.geo_data)
        if indexes_to_remove != -1:
            field = Utils.get_specific_s2_index_geo_data(json.dumps(geo_data), indexes_to_remove)
        else:
            field = {}
        field['Geo JSON'] = Utils.get_geo_json(geo_data['wkt'])
        field['boundary_type'] = record.boundary_type
        fields_to_return.append({geo_id: field})
    return fields_to_return
@staticmethod
def get_domain_from_jwt():
    """
    Get domain of the logged-in user from the ``domain`` claim of the bearer token
    :return: the domain, or False when the token is missing, cannot be decoded or
        anything else goes wrong
    """
    try:
        claims = jwt.decode(Utils.get_bearer_token(), app.config['SECRET_KEY'], algorithms="HS256")
        return claims['domain']
    except Exception:
        return False
@staticmethod
def get_authority_token_for_domain(domain):
    """
    Fetch the authority token against a domain from User Registry
    :param domain: domain to look up, sent as the ``domain`` query parameter
    :return: value of the ``Authority Token`` key of the JSON response, or None when
        the response is not successful or does not carry that key
    """
    # `params` lets requests URL-encode the domain instead of pasting it into the URL
    res = requests.get(app.config['USER_REGISTRY_BASE_URL'] + '/authority-token/', params={'domain': domain},
                       timeout=2)
    if not res:
        # a Response is falsy for error status codes
        return None
    # parse the body once instead of on every check
    payload = res.json()
    if isinstance(payload, dict):
        return payload.get('Authority Token')
    return None
@staticmethod
def get_s2_indexes_to_remove(s2_indexes):
    """
    Work out which of the supported S2 indexes (8, 13, 15, 18, 19, 20) were not
    requested and therefore are not required in the JSON response
    :param s2_indexes: requested S2 indexes
    :return: list of the supported indexes missing from ``s2_indexes``, or -1 when
        none of the requested indexes is a supported one
    """
    supported_indexes = {8, 13, 15, 18, 19, 20}
    requested_indexes = set(s2_indexes)
    if supported_indexes.isdisjoint(requested_indexes):
        return -1
    return list(supported_indexes - requested_indexes)
@staticmethod
def get_specific_s2_index_geo_data(geo_data, s2_indexes_to_remove):
    """
    Get only specific S2 indexes data in geo_data (json data)
    :param geo_data: JSON encoded geo data
    :param s2_indexes_to_remove: S2 indexes whose entries are dropped from the geo data
    :return: the decoded geo data without the given S2 indexes
    """
    geo_data = json.loads(geo_data)
    for key in s2_indexes_to_remove:
        # a record does not have to carry every level, skip the missing ones
        geo_data.pop(str(key), None)
    return geo_data
@staticmethod
def get_are_in_acres(wkt):
    """
    Fetch the area in acres for the given field (wkt).
    The geometry is transformed from EPSG:4326 to an ``aea`` projection whose two
    latitude parameters are taken from the bounds of the geometry.
    :param wkt: WKT of the field
    :return: area in acres
    """
    field = loads(wkt)
    # NOTE(review): `pyproj.transform` and `Proj(init=...)` are the older pyproj API -
    # confirm they are still available in the pyproj version pinned for this project.
    to_equal_area = partial(
        pyproj.transform,
        pyproj.Proj(init='EPSG:4326'),
        pyproj.Proj(
            proj='aea',
            lat_1=field.bounds[1],
            lat_2=field.bounds[3]))
    projected_field = ops.transform(to_equal_area, field)
    # the division assumes the projected area is in square metres - TODO confirm
    area_in_sq_km = projected_field.area / 1000000
    # converted to acres with the factor 247.105
    return area_in_sq_km * 247.105
@staticmethod
def get_geo_json(field_wkt):
    """
    Fetch the Geo JSON for the given field WKT
    :param field_wkt: WKT of the field
    :return: dict of ``type`` "Feature" whose ``geometry`` is the geometry parsed from the WKT
    """
    # round trip through geojson.dumps / json.loads to end up with plain JSON types
    geometry = json.loads(geojson.dumps(mapping(loads(field_wkt))))
    return {"type": "Feature", "geometry": geometry}
@staticmethod
def get_row_count_by_month():
    """
    Fetch the count of the GeoIds rows created during the last 365 days, by month
    :return: list of {'month': month name, 'count': rows created in that month},
        ordered by month
    """
    end_date = date.today()
    start_date = end_date - timedelta(days=365)
    created_month = func.date_trunc('month', GeoIds.created_at)
    rows = (
        db.session.query(
            created_month.label('month'),
            func.count().label('count')
        )
        .filter(GeoIds.created_at >= start_date)
        .group_by(created_month)
        .order_by(created_month)
        .all()
    )
    # Rows are unpacked by position: depending on the SQLAlchemy version, `row.count` may
    # resolve to the sequence method `count()` of the row instead of the labelled column.
    # NOTE(review): '%B' drops the year, so the first and the last month of the window can
    # show up under the same name.
    return [{'month': month.strftime('%B'), 'count': count} for month, count in rows]
@staticmethod
def get_row_count_by_country():
    """
    Fetch the count of the GeoIds rows by country
    :return: list of {'country': country, 'count': rows registered for that country}
    """
    rows = (
        db.session.query(GeoIds.country.label('country'), db.func.count().label('count')).group_by(
            GeoIds.country).all()
    )
    # Rows are unpacked by position: depending on the SQLAlchemy version, `row.count` may
    # resolve to the sequence method `count()` of the row instead of the labelled column.
    return [{'country': country, 'count': count} for country, count in rows]
@staticmethod
def get_country_from_point(p):
    """
    Fetch the name of the country whose shape contains the given point
    :param p: geometry passed to ``contains`` of the country shapes
    :return: ``CNTRY_NAME`` of the first matching country, or '' when the lookup fails
        (e.g. no country contains the point)
    """
    # read shp file for country
    # NOTE(review): the shapefile is read again on every call - consider caching it.
    shapefile_path = app.static_folder + '/99bfd9e7-bb42-4728-87b5-07f8c8ac631c2020328-1-1vef4ev.lu5nk.shp'
    countries = gpd.read_file(shapefile_path).to_crs(4326)
    try:
        containing_countries = countries[countries.contains(p)].reset_index(drop=True)
        return containing_countries.CNTRY_NAME.iloc[0]
    except Exception:
        return ''
@staticmethod
def get_fields_count_by_domain():
    """
    Fetch the fields count registered against the authority tokens (domains)
    :return: list of {'authority_token': token, 'count': fields registered with it};
        fields without an authority token are left out
    """
    rows = (
        db.session.query(GeoIds.authority_token.label('authority_token'),
                         db.func.count().label('count')).group_by(GeoIds.authority_token).all()
    )
    # Rows are unpacked by position: depending on the SQLAlchemy version, `row.count` may
    # resolve to the sequence method `count()` of the row instead of the labelled column.
    return [{'authority_token': authority_token, 'count': count} for authority_token, count in rows if
            authority_token is not None]
@staticmethod
def fetch_field_by_geoid(geo_id):
    """
    Fetch the GeoIds record of a geo id
    :param geo_id: geo id to fetch
    :return: the first GeoIds record with that geo id
    :raises Exception: when no record exists for the geo id
    """
    field = geoIdsModel.GeoIds.query.filter_by(geo_id=geo_id).first()
    if field:
        return field
    raise Exception("Field not found, invalid Geo Id.")
@staticmethod
def fetch_field_centroid_by_wkt(wkt):
    """
    Fetch the centroid of the geometry described by the WKT
    :param wkt: WKT of the field
    :return: list of the centroid's y and x, in that order ([lat, lon])
    """
    centroid = shapely.wkt.loads(wkt).centroid
    lon, lat = centroid.x, centroid.y
    return [lat, lon]
@staticmethod
def get_bearer_token():
    """
    Read the token from the ``Authorization`` header of the current request
    :return: the second whitespace separated part of the header (``Bearer <token>``),
        or None when the header is missing or has a single part only
    """
    bearer = request.headers.get('Authorization')  # Bearer JWT token here
    if not bearer:
        return None
    header_parts = bearer.split()
    return header_parts[1] if len(header_parts) > 1 else None
@staticmethod
def verify_api_secret_keys(api_key, client_secret):
    """
    Ask the User Registry whether the API Key and the Client Secret are valid for a user.
    Any error (request failure, non JSON body, missing key) propagates to the caller.
    :param api_key: sent as the ``API-KEY`` header
    :param client_secret: sent as the ``CLIENT-SECRET`` header
    :return: value of the ``message`` key of the JSON response
        (NOTE(review): callers use it as a boolean - confirm the registry returns one)
    """
    credentials = {'API-KEY': api_key, 'CLIENT-SECRET': client_secret}
    verify_url = app.config['USER_REGISTRY_BASE_URL'] + '/verify-api-secret-keys'
    res = requests.get(verify_url, headers=credentials, timeout=2)
    return res.json()["message"]