77
88import requests
99
10- from pystatis import config , db
11- from pystatis import cache
10+ from pystatis import cache , config , db
1211from pystatis .exception import DestatisStatusError , NoNewerDataError , TableNotFoundError
1312from pystatis .types import ParamDict
1413
@@ -51,9 +50,7 @@ def load_data(
5150 logger .info ("Data was loaded from cache." )
5251 else :
5352 response = get_data_from_endpoint (endpoint , method , params , db_name )
54- content_type = response .headers .get ("Content-Type" , "text/csv" ).split ("/" )[
55- - 1
56- ]
53+ content_type = response .headers .get ("Content-Type" , "text/csv" ).split ("/" )[- 1 ]
5754 data = response .content
5855
5956 # status code 98 means that the table is too big
@@ -62,7 +59,7 @@ def load_data(
6259 try :
6360 # test for job-relevant status code
6461 response_status_code = response .json ().get ("Status" ).get ("Code" )
65- except json .decoder .JSONDecodeError :
62+ except ( json .decoder .JSONDecodeError , requests . exceptions . JSONDecodeError ) :
6663 pass
6764
6865 if response_status_code == 98 :
@@ -72,14 +69,9 @@ def load_data(
7269 "Verarbeitung im Hintergrund erfolgreich gestartet. Job-ID: %s." ,
7370 job_id ,
7471 )
75- # in rare cases it seems that asking catalogue/jobs endpoint for the state of the newly created job fails because no job could be found
76- # so we add 5 seconds here to make sure that the job was created in the meantime
77- time .sleep (5 )
78- response = get_data_from_resultfile (job_id , db_name )
72+ response = get_data_from_resultfile (job_id , params , db_name )
7973 assert isinstance (response .content , bytes ) # nosec assert_used
80- content_type = response .headers .get ("Content-Type" , "text/csv" ).split (
81- "/"
82- )[- 1 ]
74+ content_type = response .headers .get ("Content-Type" , "text/csv" ).split ("/" )[- 1 ]
8375 data = response .content
8476
8577 cache .cache_data (cache_dir , name , params , data , content_type )
@@ -122,7 +114,7 @@ def get_response(db_name: str, params: ParamDict) -> requests.Response:
122114 "password" : db_pw ,
123115 }
124116
125- return requests .post (url , headers = headers , data = params , timeout = (5 , 300 ))
117+ return requests .post (url , headers = headers , data = params , timeout = (30 , 300 ))
126118
127119 # Determine database by matching regex to item code
128120 if db_name is None :
@@ -162,18 +154,19 @@ def start_job(endpoint: str, method: str, params: ParamDict) -> requests.Respons
162154 Args:
163155 endpoint (str): Destatis endpoint (eg. data, catalogue, ..)
164156 method (str): Destatis method (eg. tablefile, ...)
165- params (dict): dictionary of query parameters
157+ params (dict): Dictionary of query parameters.
166158
167159 Returns:
168160 requests.Response: the response object holding the response from calling the Destatis endpoint.
169161 """
170162 logger .warning (
171163 "Die Tabelle ist zu groß, um direkt abgerufen zu werden. Es wird eine Verarbeitung im Hintergrund gestartet."
172164 )
173- params ["job" ] = "true"
165+ job_params = params .copy ()
166+ job_params ["job" ] = "true"
174167
175168 # starting a job
176- response = get_data_from_endpoint (endpoint = endpoint , method = method , params = params )
169+ response = get_data_from_endpoint (endpoint = endpoint , method = method , params = job_params )
177170
178171 return response
179172
@@ -201,58 +194,57 @@ def get_job_id_from_response(response: requests.Response) -> str:
201194
202195
203196def get_data_from_resultfile (
204- job_id : str , db_name : str | None = None
197+ job_id : str , params : ParamDict , db_name : str | None = None
205198) -> requests .Response :
206199 """Get data from a job once it is finished or when the timeout is reached.
207200
208201 Args:
209202 job_id (str): Job ID generated by Destatis API.
203+ params (dict): Dictionary of query parameters.
210204 db_name (str, optional): The database to use for this data request.
211205 One of "genesis", "zensus", "regio". Defaults to None.
212206
213207 Returns:
214208 requests.Response: the response object holding the response from calling the Destatis endpoint.
215209 """
216- params = {
217- "selection" : "*" + job_id ,
218- "searchcriterion" : "code" ,
219- "sortcriterion" : "code" ,
220- "type" : "all" ,
210+ job_params = {
211+ "selection" : job_id ,
212+ "area" : "user" ,
213+ "pagelength" : "1" ,
221214 }
222215
223216 time_ = time .perf_counter ()
224217
225218 while (time .perf_counter () - time_ ) < JOB_TIMEOUT :
226- response = get_data_from_endpoint (
227- endpoint = "catalogue" , method = "jobs" , params = params , db_name = db_name
228- )
229-
230- jobs = response .json ().get ("List" )
231- if len (jobs ) > 0 and jobs [0 ].get ("State" ) == "Fertig" :
232- logger .info (
233- (
234- "Verarbeitung im Hintergrund abgeschlossen. "
235- "Ergebnis kann jetzt abgerufen werden über "
236- "/data/resultfile und Job-ID: %s."
237- ),
238- job_id ,
219+ try :
220+ response = get_data_from_endpoint (
221+ endpoint = "catalogue" , method = "results" , params = job_params , db_name = db_name
239222 )
240- break
241- else :
223+
224+ jobs = response .json ().get ("List" )
225+ if len (jobs ) > 0 :
226+ logger .info (
227+ (
228+ "Verarbeitung im Hintergrund abgeschlossen. "
229+ "Ergebnis kann jetzt abgerufen werden über "
230+ "/data/resultfile und Job-ID: %s."
231+ ),
232+ job_id ,
233+ )
234+ break
235+ except DestatisStatusError :
242236 logger .info ("Verarbeitung im Hintergrund läuft noch..." )
243237
244238 time .sleep (5 )
245239 else :
246- print ("Time out exceeded! Aborting..." )
240+ logger .error (
241+ "Verarbeitungsfenster von %s Minuten überschritten. Job-Datei konnte nicht heruntergeladen werden." ,
242+ JOB_TIMEOUT // 60 ,
243+ )
247244 return bytes ()
248245
249- time .sleep (5 )
250- params = {
251- "name" : job_id ,
252- "area" : "all" ,
253- "compress" : "false" ,
254- "format" : "ffcsv" ,
255- }
246+ params = params .copy ()
247+ params ["name" ] = job_id
256248 response = get_data_from_endpoint (
257249 endpoint = "data" , method = "resultfile" , params = params , db_name = db_name
258250 )
@@ -325,36 +317,47 @@ def _check_destatis_status(destatis_status: dict) -> None: # type: ignore
325317 Raises:
326318 DestatisStatusError: If the status code or type displays an error (caused by the user inputs)
327319 """
328- # -1 status code for unexpected errors and if no status code is given (faulty response)
329- destatis_status_code = destatis_status .get ("Code" , - 1 )
330- destatis_status_type = destatis_status .get ("Type" , "Information" )
320+ # Status codes
321+ STATUS_CODE_PARAM_ADJUSTED = 22
322+ STATUS_CODE_NO_NEW_DATA = 50
323+ STATUS_CODE_TABLE_NOT_FOUND = 90
324+ STATUS_CODE_LARGE_TABLE = 98
325+ STATUS_CODE_NO_MATCHING_OBJECT = 104
326+ STATUS_CODE_ERROR = - 1 # For unexpected errors and if no status code is given
327+
328+ # Status types
329+ STATUS_TYPE_INFORMATION = "Information"
330+ ERROR_TYPES = ["Error" , "Fehler" ]
331+ WARNING_TYPES = ["Warning" , "Warnung" ]
332+
333+ destatis_status_code = destatis_status .get ("Code" , STATUS_CODE_ERROR )
334+ destatis_status_type = destatis_status .get ("Type" , STATUS_TYPE_INFORMATION )
331335 destatis_status_content = destatis_status .get ("Content" )
332336
333- # define status types
334- error_en_de = ["Error" , "Fehler" ]
335- warning_en_de = ["Warning" , "Warnung" ]
336-
337- # check for generic/ system error
338- if destatis_status_code == - 1 :
337+ # Check for generic/system error
338+ if destatis_status_code == STATUS_CODE_ERROR :
339339 raise DestatisStatusError (destatis_status_content )
340340
341- # check for destatis/ query errors
342- elif (destatis_status_code in [104 , 50 , 90 ]) or (
343- destatis_status_type in error_en_de
344- ):
345- if destatis_status_code == 98 :
341+ # Check for destatis/query errors
342+ elif (
343+ destatis_status_code
344+ in [STATUS_CODE_NO_MATCHING_OBJECT , STATUS_CODE_NO_NEW_DATA , STATUS_CODE_TABLE_NOT_FOUND ]
345+ ) or (destatis_status_type in ERROR_TYPES ):
346+ if destatis_status_code == STATUS_CODE_LARGE_TABLE :
346347 pass
347- elif destatis_status_code == 50 :
348+ elif destatis_status_code == STATUS_CODE_NO_NEW_DATA :
348349 raise NoNewerDataError (destatis_status_content )
349- elif destatis_status_code == 90 :
350+ elif destatis_status_code == STATUS_CODE_TABLE_NOT_FOUND :
350351 raise TableNotFoundError (destatis_status_content )
351352 else :
352353 raise DestatisStatusError (destatis_status_content )
353354
354- # output warnings to user
355- elif (destatis_status_code == 22 ) or (destatis_status_type in warning_en_de ):
355+ # Output warnings to user
356+ elif (destatis_status_code == STATUS_CODE_PARAM_ADJUSTED ) or (
357+ destatis_status_type in WARNING_TYPES
358+ ):
356359 logger .warning (destatis_status_content )
357360
358- # output information to user
359- elif destatis_status_type .lower () == "information" :
361+ # Output information to user
362+ elif destatis_status_type .lower () == STATUS_TYPE_INFORMATION . lower () :
360363 logger .info ("Code %d: %s" , destatis_status_code , destatis_status_content )
0 commit comments