-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils_auth.py
169 lines (133 loc) · 5.45 KB
/
utils_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import datetime
import os
import requests
from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient
from requests_oauthlib import OAuth2Session
import config
def get_authorization_token(local=False) -> dict:
    """
    Fetch the authorization token of the configured API_USER.

    @param local: if True the local password (config.API_USER_LOCAL_PASS)
        is used and the token is requested in local mode; otherwise
        config.API_USER_PASS is used
    @return: token as dict
    """
    api_password = (
        config.API_USER_LOCAL_PASS if local else config.API_USER_PASS)
    return get_auth_token_of_user(config.API_USER, api_password, local=local)
def get_auth_token_of_user(email, password, local=False):
    """
    Fetch an OAuth2 token for a user with the legacy application
    (username + password) client.

    @param email: user name sent to the token endpoint
    @param password: password of that user
    @param local: if True, insecure transport is enabled for oauthlib and
        config.token_local is used as token URL; otherwise
        config.token_staging is used
    @return: token as returned by OAuth2Session.fetch_token
    @raise Exception: any error raised while fetching the token is printed
        and then re-raised unchanged
    """
    if not local:
        token_url = config.token_staging
    else:
        set_env_for_local_oauthlib()
        token_url = config.token_local

    legacy_client = LegacyApplicationClient(client_id=config.CLIENT_ID)
    session = OAuth2Session(client=legacy_client)
    try:
        return session.fetch_token(
            token_url=token_url,
            username=email,
            password=password,
            client_id=config.CLIENT_ID,
            client_secret=config.CLIENT_SECRET,
        )
    except Exception as error:
        # Report the failure on stdout, then let the caller handle it.
        error_type = str(type(error))
        print('Fetching token caused exception, type: ' + error_type)
        print(str(error))
        raise
def get_auth_token_secret() -> dict:
    """
    Fetch an authorization token with the backend application client,
    i.e. when only CLIENT_ID and CLIENT_SECRET are needed for authorization.

    The token is requested from config.token_staging.

    @return: token as dict
    @raise Exception: any error raised while fetching the token is printed
        and then re-raised unchanged
    """
    backend_client = BackendApplicationClient(client_id=config.CLIENT_ID)
    session = OAuth2Session(client=backend_client)
    try:
        return session.fetch_token(
            token_url=config.token_staging,
            client_id=config.CLIENT_ID,
            client_secret=config.CLIENT_SECRET,
        )
    except Exception as error:
        # Report the failure on stdout, then let the caller handle it.
        error_type = str(type(error))
        print('Fetching token caused exception, type: ' + error_type)
        print(str(error))
        raise
def get_protected_resource(endpoint, token, headers=None):
    """
    GET `endpoint` authorized with `token`, using config.CLIENT_ID.

    @param endpoint: URL to request
    @param token: OAuth2 token used for the session
    @param headers: optional extra request headers
    @return: whatever _get_protected_resource returns for these arguments
    """
    return _get_protected_resource(
        endpoint, config.CLIENT_ID, token, headers=headers)
def _get_protected_resource(endpoint, client_id, token,
                            get_timeout=config.TIMEOUT, headers=None):
    """
    GET `endpoint` through an OAuth2 session built from `client_id`
    and `token`.

    @param endpoint: URL to request
    @param client_id: OAuth2 client id for the session
    @param token: OAuth2 token for the session
    @param get_timeout: request timeout, defaults to config.TIMEOUT
    @param headers: optional request headers; only passed on when not None
    @return: response of the GET request; on a read or connect timeout the
        error response built by _create_error_response(get_timeout)
    """
    request_kwargs = {'timeout': get_timeout}
    if headers is not None:
        # Forward headers only when the caller actually supplied some.
        request_kwargs['headers'] = headers
    try:
        session = OAuth2Session(client_id, token=token)
        return session.get(endpoint, **request_kwargs)
    except (requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout):
        return _create_error_response(get_timeout)
def create_protected_resource(endpoint, token, payload=None):
    """
    POST `payload` to `endpoint` authorized with `token`,
    using config.CLIENT_ID.

    @param endpoint: URL to post to
    @param token: OAuth2 token used for the session
    @param payload: request body; an empty dict is sent when None
    @return: whatever _create_protected_resource returns for these arguments
    """
    body = {} if payload is None else payload
    return _create_protected_resource(endpoint, config.CLIENT_ID, token, body)
def _create_protected_resource(endpoint, client_id, token, body,
                               post_timeout=config.TIMEOUT_POST):
    """
    POST `body` as JSON to `endpoint` through an OAuth2 session built
    from `client_id` and `token`.

    @param endpoint: URL to post to
    @param client_id: OAuth2 client id for the session
    @param token: OAuth2 token for the session
    @param body: object sent as the JSON body
    @param post_timeout: request timeout, defaults to config.TIMEOUT_POST
    @return: response of the POST request; on a read or connect timeout the
        error response built by _create_error_response(post_timeout)
    """
    try:
        session = OAuth2Session(client_id, token=token)
        return session.post(url=endpoint, json=body, timeout=post_timeout)
    except (requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout):
        return _create_error_response(post_timeout)
def put_protected_resource(endpoint, token, payload=None):
    """
    PUT `payload` to `endpoint` authorized with `token`,
    using config.CLIENT_ID.

    @param endpoint: URL to put to
    @param token: OAuth2 token used for the session
    @param payload: request body; an empty dict is sent when None
    @return: whatever _put_protected_resource returns for these arguments
    """
    body = {} if payload is None else payload
    return _put_protected_resource(endpoint, config.CLIENT_ID, token, body)
def _put_protected_resource(endpoint, client_id, token, body,
                            put_timeout=config.TIMEOUT_POST):
    """
    PUT `body` as JSON to `endpoint` through an OAuth2 session built
    from `client_id` and `token`.

    @param endpoint: URL to put to
    @param client_id: OAuth2 client id for the session
    @param token: OAuth2 token for the session
    @param body: object sent as the JSON body
    @param put_timeout: request timeout, defaults to config.TIMEOUT_POST
    @return: response of the PUT request; on a read or connect timeout the
        error response built by _create_error_response(put_timeout)
    """
    try:
        session = OAuth2Session(client_id, token=token)
        return session.put(url=endpoint, json=body, timeout=put_timeout)
    except (requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout):
        return _create_error_response(put_timeout)
def patch_protected_resource(endpoint, token):
    """
    Send a PATCH request to `endpoint` authorized with `token`,
    using config.CLIENT_ID.

    @param endpoint: URL to patch
    @param token: OAuth2 token used for the session
    @return: whatever _patch_protected_resource returns for these arguments
    """
    return _patch_protected_resource(endpoint, config.CLIENT_ID, token)
def _patch_protected_resource(endpoint, client_id, token,
                              patch_timeout=config.TIMEOUT_POST):
    """
    Send a PATCH request (no body is passed) to `endpoint` through an
    OAuth2 session built from `client_id` and `token`.

    @param endpoint: URL to patch
    @param client_id: OAuth2 client id for the session
    @param token: OAuth2 token for the session
    @param patch_timeout: request timeout, defaults to config.TIMEOUT_POST
    @return: response of the PATCH request; on a read or connect timeout
        the error response built by _create_error_response(patch_timeout)
    """
    try:
        session = OAuth2Session(client_id, token=token)
        return session.patch(url=endpoint, timeout=patch_timeout)
    except (requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout):
        return _create_error_response(patch_timeout)
def delete_protected_resource(endpoint, token,
                              delete_timeout=config.TIMEOUT_POST):
    """
    Send a DELETE request to `endpoint` through an OAuth2 session built
    from config.CLIENT_ID and `token`.

    @param endpoint: URL to delete
    @param token: OAuth2 token for the session
    @param delete_timeout: request timeout, defaults to config.TIMEOUT_POST
    @return: response of the DELETE request; on a read or connect timeout
        the error response built by _create_error_response(delete_timeout)
    """
    try:
        session = OAuth2Session(config.CLIENT_ID, token=token)
        return session.delete(endpoint, timeout=delete_timeout)
    except (requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout):
        return _create_error_response(delete_timeout)
def set_env_for_local_oauthlib():
    """
    Set the OAUTHLIB_INSECURE_TRANSPORT environment variable to '1'.

    This has to be set if your API uses HTTP instead of HTTPS.
    """
    os.environ.update({'OAUTHLIB_INSECURE_TRANSPORT': '1'})
def _create_error_response(timeout):
    """
    Build the response object returned in place of a request that timed out.

    @param timeout: timeout in seconds, stored as the elapsed time of the
        response
    @return: the response from _create_408_response with its elapsed time
        set by _set_elapsed_time_in_response
    """
    return _set_elapsed_time_in_response(_create_408_response(), timeout)
def _create_408_response():
    """
    Build a synthetic HTTP 408 response describing a request timeout.

    @return: requests Response with status code 408 and a JSON error body
    """
    resp = requests.models.Response()
    resp.status_code = 408
    # The body must be bytes, like the content of a real response: with a
    # str here resp.text and resp.json() break, because requests runs its
    # encoding detection on the raw content and that expects bytes.
    resp._content = b'{"errors":"requests.exceptions.Timeout"}'
    return resp
def _set_elapsed_time_in_response(response, timeout):
response.elapsed = datetime.timedelta(seconds=timeout)
return response