from datetime import datetime, timezone
from DiveDB.services.utils.openstack import SwiftClient

- duckpond = DuckPond()
- swift_client = SwiftClient()

django_prefix = os.environ.get("DJANGO_PREFIX", "DiveDB")
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE", f"{django_prefix}.server.django_app.settings"
)
django.setup()

- from DiveDB.server.metadata.models import Files, Recordings  # noqa: E402
+ from DiveDB.server.metadata.models import (  # noqa: E402
+     Files,
+     Recordings,
+     Deployments,
+     Animals,
+     Loggers,
+     AnimalDeployments,
+ )


@dataclass
@@ -55,6 +60,11 @@ class NetCDFValidationError(Exception):
class DataUploader:
    """Data Uploader"""

+     def __init__(self, duckpond: DuckPond = None, swift_client: SwiftClient = None):
+         """Initialize DataUploader with optional DuckPond and SwiftClient instances."""
+         self.duckpond = duckpond or DuckPond()
+         self.swift_client = swift_client or SwiftClient()
+
    def _read_edf_signal(self, edf: edfio.Edf, label: str):
        """Function to read a single signal from an EDF file."""
        signal = edf.get_signal(label)
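
With the new constructor, callers can inject their own DuckPond and SwiftClient (for example, test doubles) instead of relying on the old module-level singletons. A minimal usage sketch, where FakeDuckPond and FakeSwiftClient are hypothetical stand-ins:

    # Inject fakes for testing; omit the arguments to get the default clients.
    uploader = DataUploader(duckpond=FakeDuckPond(), swift_client=FakeSwiftClient())
    default_uploader = DataUploader()  # constructs DuckPond() and SwiftClient() itself
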
@@ -119,15 +129,15 @@ def _create_value_structs(self, values):
            lambda x: (float(x) if isinstance(x, (int, float)) else np.nan)
        )(values)

-         # Determine if any value has a decimal place
-         if np.any(numeric_values % 1 != 0):
-             float_values = np.where(np.isfinite(numeric_values), numeric_values, None)
-             int_values = None
-         else:
+         # Check if the data type is integer
+         if np.issubdtype(numeric_values.dtype, np.integer):
            float_values = None
            int_values = np.where(
                np.isfinite(numeric_values), numeric_values.astype(int), None
            )
+         else:
+             float_values = np.where(np.isfinite(numeric_values), numeric_values, None)
+             int_values = None

        string_values = np.where(
            ~np.isin(values, [True, False])
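
The branching above now keys off the array's dtype rather than scanning element values. A small illustration of the difference (assuming NumPy is imported as np):

    import numpy as np

    ints = np.array([1, 2, 3])          # integer dtype -> routed to int_values
    floats = np.array([1.0, 2.0, 3.0])  # float dtype -> routed to float_values

    np.issubdtype(ints.dtype, np.integer)    # True
    np.issubdtype(floats.dtype, np.integer)  # False
    np.any(floats % 1 != 0)                  # False: the old value-based check
                                             # would have treated these as ints
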
@@ -186,7 +196,9 @@ def _write_data_to_duckpond(
                    [metadata["animal"]] * len(values), type=pa.string()
                ),
                "deployment": pa.array(
-                     [metadata["deployment"]] * len(values), type=pa.string()
+                     # Cast to str so non-string deployment IDs serialize as strings
+                     [str(metadata["deployment"])] * len(values),
+                     type=pa.string(),
                ),
                "recording": pa.array(
                    [metadata["recording"]] * len(values), type=pa.string()
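
The str() cast matters because the array is built with an explicit pa.string() type, so non-string deployment identifiers are stringified up front. A quick sketch (the deployment ID 2024 is a made-up value):

    import pyarrow as pa

    deployment = 2024  # hypothetical non-string deployment ID
    # pa.array([deployment] * 3, type=pa.string()) would not convert the ints
    # implicitly, so the values are stringified first:
    arr = pa.array([str(deployment)] * 3, type=pa.string())
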
@@ -199,7 +211,7 @@ def _write_data_to_duckpond(
            },
            schema=LAKE_CONFIGS["DATA"]["schema"],
        )
-         duckpond.write_to_delta(
+         self.duckpond.write_to_delta(
            data=batch_table,
            lake="DATA",
            mode="append",
@@ -241,7 +253,7 @@ def _write_event_to_duckpond(
                    [metadata["animal"]] * len(event_keys), type=pa.string()
                ),
                "deployment": pa.array(
-                     [metadata["deployment"]] * len(event_keys), type=pa.string()
+                     [str(metadata["deployment"])] * len(event_keys), type=pa.string()
                ),
                "recording": pa.array(
                    [metadata["recording"]] * len(event_keys), type=pa.string()
@@ -256,7 +268,7 @@ def _write_event_to_duckpond(
            },
            schema=LAKE_CONFIGS["STATE_EVENTS"]["schema"],
        )
-         duckpond.write_to_delta(
+         self.duckpond.write_to_delta(
            data=batch_table,
            lake="STATE_EVENTS",
            mode="append",
@@ -266,7 +278,6 @@ def _write_event_to_duckpond(
        )
        del batch_table
        gc.collect()
-
    def _validate_netcdf(self, ds: xr.Dataset):
        """
        Validates netCDF file before upload.
@@ -356,8 +367,84 @@ def _validate_netcdf(self, ds: xr.Dataset):
        )
        return True

+     def get_or_create_logger(self, logger_data):
+         logger, created = Loggers.objects.get_or_create(
+             id=logger_data["logger_id"],
+             defaults={
+                 "manufacturer": logger_data.get("manufacturer"),
+                 "manufacturer_name": logger_data.get("manufacturer_name"),
+                 "serial_no": logger_data.get("serial_no"),
+                 "ptt": logger_data.get("ptt"),
+                 "type": logger_data.get("type"),
+                 "notes": logger_data.get("notes"),
+             },
+         )
+         return logger, created
+
+     def get_or_create_recording(self, recording_data):
+         animal_deployment, _ = AnimalDeployments.objects.get_or_create(
+             animal=recording_data["animal"], deployment=recording_data["deployment"]
+         )
+         recording, created = Recordings.objects.get_or_create(
+             id=recording_data["recording_id"],
+             defaults={
+                 "name": recording_data.get("name"),
+                 "animal_deployment": animal_deployment,
+                 "logger": recording_data.get("logger"),
+                 "start_time": recording_data.get("start_time"),
+                 "end_time": recording_data.get("end_time"),
+                 "timezone": recording_data.get("timezone"),
+                 "quality": recording_data.get("quality"),
+                 "attachment_location": recording_data.get("attachment_location"),
+                 "attachment_type": recording_data.get("attachment_type"),
+             },
+         )
+         return recording, created
+
+     def get_or_create_deployment(self, deployment_data):
+         deployment, created = Deployments.objects.get_or_create(
+             id=deployment_data["deployment_id"],
+             defaults={
+                 "domain_deployment_id": deployment_data.get("domain_deployment_id"),
+                 "animal_age_class": deployment_data.get("animal_age_class"),
+                 "animal_age": deployment_data.get("animal_age"),
+                 "deployment_type": deployment_data.get("deployment_type"),
+                 "deployment_name": deployment_data.get("deployment_name"),
+                 "rec_date": deployment_data.get("rec_date"),
+                 "deployment_latitude": deployment_data.get("deployment_latitude"),
+                 "deployment_longitude": deployment_data.get("deployment_longitude"),
+                 "deployment_location": deployment_data.get("deployment_location"),
+                 "departure_datetime": deployment_data.get("departure_datetime"),
+                 "recovery_latitude": deployment_data.get("recovery_latitude"),
+                 "recovery_longitude": deployment_data.get("recovery_longitude"),
+                 "recovery_location": deployment_data.get("recovery_location"),
+                 "arrival_datetime": deployment_data.get("arrival_datetime"),
+                 "notes": deployment_data.get("notes"),
+             },
+         )
+         return deployment, created
+
+     def get_or_create_animal(self, animal_data):
+         animal, created = Animals.objects.get_or_create(
+             id=animal_data["animal_id"],
+             defaults={
+                 "project_id": animal_data.get("project_id"),
+                 "common_name": animal_data.get("common_name"),
+                 "scientific_name": animal_data.get("scientific_name"),
+                 "lab_id": animal_data.get("lab_id"),
+                 "birth_year": animal_data.get("birth_year"),
+                 "sex": animal_data.get("sex"),
+                 "domain_ids": animal_data.get("domain_ids"),
+             },
+         )
+         return animal, created
+
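
These helpers wrap Django's get_or_create, so repeated calls with the same ID are idempotent: the first call creates the row, later calls return the existing one and ignore the defaults. A short sketch (the field values are made up):

    uploader = DataUploader()
    logger, created = uploader.get_or_create_logger(
        {"logger_id": "CC-96", "manufacturer": "CATS", "type": "video"}
    )  # created is True on first run
    logger, created = uploader.get_or_create_logger({"logger_id": "CC-96"})
    # created is False: matched on id, defaults not reapplied
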
    def upload_netcdf(
-         self, netcdf_file_path: str, metadata: dict, batch_size: int = 1000000
+         self,
+         netcdf_file_path: str,
+         metadata: dict,
+         batch_size: int = 1000000,
+         rename_map: dict = None,
    ):
        """
        Uploads a netCDF file to the database and DuckPond.
@@ -370,9 +457,22 @@ def upload_netcdf(
                - deployment: Deployment Name (str)
                - recording: Recording Name (str)
            batch_size (int, optional): Size of data batches for processing. Defaults to 1 million.
+             rename_map (dict, optional): A dictionary mapping original variable names to new names. Matching is case-insensitive.
        """
+
        ds = xr.open_dataset(netcdf_file_path)

+         # Apply renaming if rename_map is provided
+         if rename_map:
+             # Lowercase the map keys so variable names match case-insensitively
+             lower_case_rename_map = {k.lower(): v for k, v in rename_map.items()}
+             ds = ds.rename(
+                 {
+                     var: lower_case_rename_map.get(var.lower(), var)
+                     for var in ds.data_vars
+                 }
+             )
+
        print(
            f"Creating file record for {os.path.basename(netcdf_file_path)} and uploading to OpenStack..."
        )
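
With rename_map, variable names can be normalized at upload time without editing the source file. A minimal sketch (the file path, metadata values, and variable names are hypothetical):

    uploader = DataUploader()
    uploader.upload_netcdf(
        netcdf_file_path="deployment_2021.nc",
        metadata={
            "animal": "mian-001",
            "deployment": "dep-07",
            "recording": "rec-01",
        },
        rename_map={"Depth": "depth", "TEMP_C": "temperature"},
    )
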
@@ -431,7 +531,7 @@ def __init__(self, name):
                group=coord.replace("_samples", ""),
                event_keys=event_keys,
                event_data=event_data,
-                 file_name=file.file["name"],
+                 file_name=file.file.name,
            )
            for var_name, var_data in ds[variables_with_coord].items():
                if (
@@ -453,7 +553,9 @@ def __init__(self, name):

                        group = var_data.attrs.get("group", "ungrouped")
                        class_name = var_name
-                         label = sub_var_name
+                         label = (rename_map or {}).get(
+                             sub_var_name.lower(), sub_var_name
+                         )

                        values = var_data.values[start:end, var_index]
                        self._write_data_to_duckpond(
@@ -487,6 +589,7 @@ def __init__(self, name):
                    if "variable" in var_data.attrs
                    else var_name
                )
+                 label = (rename_map or {}).get(label.lower(), label)

                values = var_data.values[start:end]
                self._write_data_to_duckpond(