2
2
import time
3
3
import typing
4
4
from io import BytesIO
5
- from typing import Any , Dict , List , Optional , Tuple
5
+ from typing import Dict , List , Optional , Tuple
6
6
from urllib .parse import urlparse
7
7
8
8
from django .conf import settings
9
9
10
10
import requests
11
+ from requests import Response
11
12
from requests .adapters import HTTPAdapter
12
13
from requests .exceptions import RetryError
13
14
from requests .packages .urllib3 .util .retry import Retry
14
15
15
- from hct_mis_api .apps .core .kobo .common import filter_by_owner
16
- from hct_mis_api .apps .core .models import BusinessArea , XLSXKoboTemplate
16
+ from hct_mis_api .apps .core .models import XLSXKoboTemplate
17
17
from hct_mis_api .apps .utils .exceptions import log_and_raise
18
18
19
19
logger = logging .getLogger (__name__ )
20
20
21
21
22
- class TokenNotProvided (Exception ):
23
- pass
24
-
25
-
26
- class TokenInvalid (Exception ):
22
+ class CountryCodeNotProvided (Exception ):
27
23
pass
28
24
29
25
30
26
class KoboRequestsSession (requests .Session ):
31
- AUTH_DOMAINS = [urlparse (settings .KOBO_URL ).hostname ]
27
+ AUTH_DOMAINS = [urlparse (settings .KOBO_URL ).hostname , urlparse ( settings . KOBO_URL ). hostname ]
32
28
33
29
def should_strip_auth (self , old_url : str , new_url : str ) -> bool :
34
30
new_parsed = urlparse (new_url )
35
- if new_parsed .hostname in KoboRequestsSession .AUTH_DOMAINS :
31
+ if new_parsed .hostname in KoboRequestsSession .AUTH_DOMAINS : # pragma: no cover
36
32
return False
37
- return super ().should_strip_auth (old_url , new_url ) # type: ignore # FIXME: Call to untyped function "should_strip_auth" in typed context
33
+ return super ().should_strip_auth (
34
+ old_url , new_url
35
+ ) # type: ignore # FIXME: Call to untyped function "should_strip_auth" in typed context
38
36
39
37
40
38
class KoboAPI :
41
- def __init__ (self , business_area_slug : Optional [str ] = None ):
42
- if business_area_slug is not None :
43
- self .business_area = BusinessArea .objects .get (slug = business_area_slug )
44
- self .KPI_URL = self .business_area .kobo_url or settings .KOBO_URL
45
- else :
46
- self .business_area = None
47
- self .KPI_URL = settings .KOBO_URL
39
+ LIMIT = 30_000
40
+ FORMAT = "json"
48
41
49
- self ._get_token ()
42
+ def __init__ (
43
+ self , kpi_url : Optional [str ] = None , token : Optional [str ] = None , project_views_id : Optional [str ] = None
44
+ ) -> None :
45
+ self ._kpi_url = kpi_url or settings .KOBO_URL
46
+ self ._token = token or settings .KOBO_MASTER_API_TOKEN
47
+ self ._project_views_id = project_views_id or settings .KOBO_PROJECT_VIEWS_ID
48
+
49
+ self ._client = KoboRequestsSession ()
50
+ self ._set_token ()
51
+
52
+ def _set_token (self ) -> None :
53
+ retries = Retry (total = 5 , backoff_factor = 1 , status_forcelist = [502 , 503 , 504 ], allowed_methods = False )
54
+ self ._client .mount (self ._kpi_url , HTTPAdapter (max_retries = retries ))
55
+ self ._client .headers .update ({"Authorization" : f"token { self ._token } " })
50
56
51
- def _handle_paginated_results (self , url : str ) -> List [Dict ]:
57
+ def _get_paginated_request (self , url : str ) -> List [Dict ]:
52
58
next_url = url
53
59
results : List = []
54
60
55
- # if there will be more than 30000 results,
56
- # we need to make additional queries
57
61
while next_url :
58
- data = self ._handle_request (next_url )
62
+ response = self ._get_request (next_url )
63
+ data = response .json ()
59
64
next_url = data ["next" ]
60
65
results .extend (data ["results" ])
61
66
return results
62
67
63
- def _get_url (
64
- self ,
65
- endpoint : str ,
66
- append_api : bool = True ,
67
- add_limit : bool = True ,
68
- additional_query_params : Optional [Any ] = None ,
69
- ) -> str :
70
- endpoint .strip ("/" )
71
- if endpoint != "token" and append_api is True :
72
- endpoint = f"api/v2/{ endpoint } "
73
- # According to the Kobo API documentation,
74
- # the maximum limit per page is 30000
75
- query_params = f"format=json{ '&limit=30000' if add_limit else '' } "
76
- if additional_query_params is not None :
77
- query_params += f"&{ additional_query_params } "
78
- return f"{ self .KPI_URL } /{ endpoint } ?{ query_params } "
79
-
80
- def _get_token (self ) -> None :
81
- self ._client = KoboRequestsSession ()
82
- retries = Retry (total = 5 , backoff_factor = 1 , status_forcelist = [502 , 503 , 504 ], allowed_methods = False )
83
- self ._client .mount (self .KPI_URL , HTTPAdapter (max_retries = retries ))
84
-
85
- if self .business_area is None :
86
- token = settings .KOBO_MASTER_API_TOKEN
87
- else :
88
- token = self .business_area .kobo_token
89
-
90
- if not token :
91
- msg = f"KOBO Token is not set for business area { self .business_area } "
92
- logger .warning (msg )
93
- raise TokenNotProvided (msg )
94
-
95
- self ._client .headers .update ({"Authorization" : f"token { token } " })
96
-
97
- def _handle_request (self , url : str ) -> Dict :
68
+ def _get_request (self , url : str ) -> Response :
98
69
response = self ._client .get (url = url )
99
70
try :
100
71
response .raise_for_status ()
101
- except requests .exceptions .HTTPError as e :
102
- logger .warning (e )
72
+ except requests .exceptions .HTTPError as e : # pragma: no cover
73
+ logger .exception (e )
103
74
raise
104
- return response . json ()
75
+ return response
105
76
106
77
def _post_request (
107
78
self , url : str , data : Optional [Dict ] = None , files : Optional [typing .IO ] = None
108
- ) -> requests . Response :
79
+ ) -> Response : # pragma: no cover
109
80
return self ._client .post (url = url , data = data , files = files )
110
81
111
- def _patch_request (
112
- self , url : str , data : Optional [Dict ] = None , files : Optional [typing .IO ] = None
113
- ) -> requests .Response :
114
- return self ._client .patch (url = url , data = data , files = files )
115
-
116
82
def create_template_from_file (
117
- self , bytes_io_file : Optional [typing .IO ], xlsx_kobo_template_object : XLSXKoboTemplate , template_id : str = ""
118
- ) -> Optional [Tuple [Dict , str ]]:
119
- data = {
120
- "name" : "Untitled" ,
121
- "asset_type" : "template" ,
122
- "description" : "" ,
123
- "sector" : "" ,
124
- "country" : "" ,
125
- "share-metadata" : False ,
126
- }
83
+ self , bytes_io_file : typing .IO , xlsx_kobo_template_object : XLSXKoboTemplate , template_id : str = ""
84
+ ) -> Optional [Tuple [Dict , str ]]: # pragma: no cover
85
+ # TODO: not sure if this actually works
127
86
if not template_id :
128
- asset_response = self ._post_request (url = self ._get_url ("assets/" , add_limit = False ), data = data )
87
+ data = {
88
+ "name" : "Untitled" ,
89
+ "asset_type" : "template" ,
90
+ "description" : "" ,
91
+ "sector" : "" ,
92
+ "country" : "" ,
93
+ "share-metadata" : False ,
94
+ }
95
+ endpoint = "api/v2/assets"
96
+ query_params = f"format={ self .FORMAT } "
97
+ url = f"{ self ._kpi_url } /{ endpoint } ?{ query_params } "
98
+ asset_response = self ._post_request (url = url , data = data )
129
99
try :
130
100
asset_response .raise_for_status ()
131
101
except requests .exceptions .HTTPError as e :
@@ -135,12 +105,13 @@ def create_template_from_file(
135
105
asset_uid = asset_response_dict .get ("uid" )
136
106
else :
137
107
asset_uid = template_id
108
+
138
109
file_import_data = {
139
110
"assetUid" : asset_uid ,
140
- "destination" : self . _get_url ( f" assets/{ asset_uid } /" , append_api = False , add_limit = False ) ,
111
+ "destination" : f" { self . _kpi_url } / assets/{ asset_uid } ?format= { self . FORMAT } " ,
141
112
}
142
113
file_import_response = self ._post_request (
143
- url = self ._get_url ( "imports/" , append_api = False , add_limit = False ) ,
114
+ url = f" { self ._kpi_url } /imports?format= { self . FORMAT } " ,
144
115
data = file_import_data ,
145
116
files = {"file" : bytes_io_file }, # type: ignore # FIXME
146
117
)
@@ -149,7 +120,8 @@ def create_template_from_file(
149
120
150
121
attempts = 5
151
122
while attempts >= 0 :
152
- response_dict = self ._handle_request (url )
123
+ response = self ._get_request (url )
124
+ response_dict = response .json ()
153
125
import_status = response_dict .get ("status" )
154
126
if import_status == "processing" :
155
127
xlsx_kobo_template_object .status = XLSXKoboTemplate .PROCESSING
@@ -162,36 +134,31 @@ def create_template_from_file(
162
134
log_and_raise ("Fetching import data took too long" , error_type = RetryError )
163
135
return None
164
136
165
- def get_all_projects_data (self ) -> List :
166
- if not self . business_area :
167
- logger . warning ( "Business area is not provided" )
168
- raise ValueError ( "Business area is not provided" )
169
- projects_url = self ._get_url ( "assets/" )
170
-
171
- results = self ._handle_paginated_results ( projects_url )
172
- return filter_by_owner ( results , self .business_area )
137
+ def get_all_projects_data (self , country_code : str ) -> List :
138
+ if not country_code :
139
+ raise CountryCodeNotProvided ( "No country code provided" )
140
+ endpoint = f"api/v2/project-views/ { self . _project_views_id } /assets/"
141
+ query_params = f"format= { self .FORMAT } &limit= { self . LIMIT } "
142
+ query_params += f"&q=settings__country_codes__icontains: { country_code . upper () } "
143
+ url = f" { self ._kpi_url } / { endpoint } ? { query_params } "
144
+ return self ._get_paginated_request ( url )
173
145
174
146
def get_single_project_data (self , uid : str ) -> Dict :
175
- projects_url = self ._get_url (f"assets/{ uid } " )
176
-
177
- return self ._handle_request (projects_url )
147
+ endpoint = f"api/v2/assets/{ uid } /"
148
+ query_params = f"format={ self .FORMAT } &limit={ self .LIMIT } "
149
+ url = f"{ self ._kpi_url } /{ endpoint } ?{ query_params } "
150
+ response = self ._get_request (url )
151
+ return response .json ()
178
152
179
- def get_project_submissions (self , uid : str , only_active_submissions : bool ) -> List :
180
- additional_query_params = None
153
+ def get_project_submissions (self , uid : str , only_active_submissions : bool ) -> List [Dict ]:
154
+ endpoint = f"api/v2/assets/{ uid } /data/"
155
+ query_params = f"format={ self .FORMAT } &limit={ self .LIMIT } "
181
156
if only_active_submissions :
182
157
additional_query_params = 'query={"_validation_status.uid":"validation_status_approved"}'
183
- submissions_url = self ._get_url (
184
- f"assets/{ uid } /data/" ,
185
- additional_query_params = additional_query_params ,
186
- )
187
-
188
- return self ._handle_paginated_results (submissions_url )
158
+ query_params += f"&{ additional_query_params } "
159
+ url = f"{ self ._kpi_url } /{ endpoint } ?{ query_params } "
160
+ return self ._get_paginated_request (url )
189
161
190
- def get_attached_file (self , url : str ) -> BytesIO :
191
- response = self ._client .get (url = url )
192
- try :
193
- response .raise_for_status ()
194
- except requests .exceptions .HTTPError as e :
195
- logger .warning (e )
196
- raise
162
+ def get_attached_file (self , url : str ) -> BytesIO : # pragma: no cover
163
+ response = self ._get_request (url )
197
164
return BytesIO (response .content )
0 commit comments