diff --git a/src/datapump/sync/fire_alerts.py b/src/datapump/sync/fire_alerts.py
index fb8f9eb4..ed7a1732 100644
--- a/src/datapump/sync/fire_alerts.py
+++ b/src/datapump/sync/fire_alerts.py
@@ -1,10 +1,7 @@
 import csv
-import io
 import os
-import shutil
-import zipfile
+from tempfile import TemporaryDirectory
 
-import requests
 import shapefile
 
 from ..clients.aws import get_s3_client
@@ -30,96 +27,77 @@ def process_active_fire_alerts(alert_type):
-    LOGGER.info(f"Retrieving fire alerts for {alert_type}")
-    response = requests.get(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type])
-
-    if response.status_code != 200:
-        raise Exception(
-            f"Unable to get active {alert_type} fire alerts, FIRMS returned status code {response.status_code}"
-        )
+    nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time"
+    last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory)
+    LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}")
 
-    LOGGER.info("Successfully downloaded alerts from NASA")
+    LOGGER.info(f"Retrieving fire alerts for {alert_type}")
+    rows = []
+    with shapefile.Reader(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type]) as sf:
+        LOGGER.info(f"Shapefile has {len(sf)} records")
+        for shape_record in sf.iterShapeRecords():
+            row = shape_record.record.as_dict()
+            row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d")
+            row["LATITUDE"] = shape_record.shape.points[0][1]
+            row["LONGITUDE"] = shape_record.shape.points[0][0]
+            # For VIIRS we only want first letter of confidence category,
+            # to make NRT category same as scientific
+            if alert_type == "viirs":
+                row["CONFIDENCE"] = row["CONFIDENCE"][0]
 
-    zip = zipfile.ZipFile(io.BytesIO(response.content))
-    shp_dir = f"{TEMP_DIR}/fire_alerts_{alert_type}"
-    zip.extractall(shp_dir)
+            if row["ACQ_DATE"] > last_saved_date or (
+                row["ACQ_DATE"] == last_saved_date and
+                row["ACQ_TIME"] > last_saved_min
+            ):
+                rows.append(row)
 
-    if not os.path.isfile(f"{shp_dir}/{SHP_NAMES[alert_type]}"):
+    if not rows:
         raise Exception(
-            f"{alert_type} fire alerts zip downloaded, but contains no .shp file!"
+            f"{alert_type} shapefile contained no new records since {last_saved_date} {last_saved_min}"
         )
-
-    sf = shapefile.Reader(f"{shp_dir}/{SHP_NAMES[alert_type]}")
-
-    rows = []
-    for shape_record in sf.iterShapeRecords():
-        row = shape_record.record.as_dict()
-        row["LATITUDE"] = shape_record.shape.points[0][1]
-        row["LONGITUDE"] = shape_record.shape.points[0][0]
-        row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d")
-        rows.append(row)
+    else:
+        LOGGER.info(f"Found {len(rows)} new records for {alert_type}")
 
     sorted_rows = sorted(rows, key=lambda row: f"{row['ACQ_DATE']}_{row['ACQ_TIME']}")
+    first_row = sorted_rows[0]
     last_row = sorted_rows[-1]
 
+    LOGGER.info(f"First new record datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}")
+
     fields = [
         "latitude",
         "longitude",
         "acq_date",
         "acq_time",
         "confidence",
+        *(BRIGHTNESS_FIELDS[alert_type]),
+        "frp"
     ]
-    fields += BRIGHTNESS_FIELDS[alert_type]
-    fields.append("frp")
 
-    result_path = get_tmp_result_path(alert_type)
+    with TemporaryDirectory() as temp_dir:
+        result_path = f"{temp_dir}/fire_alerts_{alert_type.lower()}.tsv"
 
-    tsv_file = open(result_path, "w", newline="")
-    tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t")
-    tsv_writer.writeheader()
+        with open(result_path, "w", newline="") as tsv_file:
+            tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t")
+            tsv_writer.writeheader()
 
-    nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time"
-    last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory)
-    LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}")
+            for row in sorted_rows:
+                _write_row(row, fields, tsv_writer)
 
-    first_row = None
-    for row in sorted_rows:
-        # only start once we confirm we're past the overlap with the last dataset
-        if row["ACQ_DATE"] > last_saved_date or (
-            row["ACQ_DATE"] == last_saved_date and row["ACQ_TIME"] > last_saved_min
-        ):
-            if not first_row:
-                first_row = row
-                LOGGER.info(
-                    f"First row datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}"
-                )
-
-            # for VIIRS, we only want first letter of confidence category, to make NRT category same as scientific
-            if alert_type == "viirs":
-                row["CONFIDENCE"] = row["CONFIDENCE"][0]
-
-            _write_row(row, fields, tsv_writer)
-
-    LOGGER.info(f"Last row datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}")
-    LOGGER.info("Successfully wrote TSV")
+        LOGGER.info("Successfully wrote TSV")
+        LOGGER.info(f"Last new record datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}")
 
-    tsv_file.close()
+        # Upload file to s3
+        pipeline_key = f"{nrt_s3_directory}/{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv"
 
-    # upload both files to s3
-    file_name = f"{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv"
-
-    with open(result_path, "rb") as tsv_result:
-        pipeline_key = f"{nrt_s3_directory}/{file_name}"
-        get_s3_client().upload_fileobj(
-            tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key
-        )
+        with open(result_path, "rb") as tsv_result:
+            get_s3_client().upload_fileobj(
+                tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key
+            )
 
     LOGGER.info(f"Successfully uploaded to s3://{DATA_LAKE_BUCKET}/{pipeline_key}")
 
-    # remove raw shapefile, since it can be big and hit max lambda storage size of 512 MB
-    shutil.rmtree(shp_dir)
-
     return (f"s3a://{DATA_LAKE_BUCKET}/{pipeline_key}", last_row["ACQ_DATE"])