From 65b673b3bcf55b61aa805e26a82aca65c98ed4d0 Mon Sep 17 00:00:00 2001
From: David Oesch
Date: Tue, 17 Sep 2024 13:35:16 +0200
Subject: [PATCH] Dev 20240717 to v.1.1.0 (#94)

* Create util_upload_dxdy.py
* Fix Drive timeout and clean up satromo_publish: we re-initialize gdrive, since it might time out (already applied as hotfix on prod)
* MSG / MFG adaptation to the operational CLIMA delivery - added a descriptor of the original filename to lst and lst_clima - lst_clima: adapted extraction for the new delivery from MeteoSwiss, time extraction, set path names to M instead of MSG or MFG
* starting the processing of the LST climatology
* Update util_upload_dxdy.py: adapted to ror output, merging dx and dy into one file and creating a COG TIFF
* Update util_upload_dxdy.py: fixing documentation issue
* S2 SR precalculated coregistration: switch to DX/DY usage - shifts are now provided by an asset collection which has been calculated by AROSICS. Initial version to test October 2023 data
* added oed_config
* Update step0_processor_s2_sr.py: added info regarding METHOD of terrain shadow / coregistration to the image properties
* Handle STAC timeouts
* Create util_deleteassets.py to delete assets from an image collection by an asset list (.txt) containing the IDs of the assets to delete
* Update step0_processor_msg_lst_clima.py to use MSG instead of MFG
* added util_checkassets
* Update satromo_processor.drawio
* Update satromo_processor.drawio
* Added satromo_processor.drawio.svg
* fix for https://github.com/swisstopo/topo-satromo/issues/92 - reduced printing of STAC commands and GDAL output
* Added Google Cloud Storage option to dev and prod - added GCS export options to main_utils, satromo_publish
* Update util_moveassets.py
* Added check_asset_size function to publish.py - new function to look up the 'asset_size' of the product to be published via the config.py file - check if the current asset size matches the defined asset_size - adapt dev_config.py to contain asset_size variables for S2-SR and VHI
* Update step1_processor_vhi.py - commented out the functions dealing with VIIRS LST data (could be deleted later on) - changed the LST data source references to use Meteosat data instead of VIIRS data - changed band name and scaling factor to match Meteosat data
* Update dev_config.py - change from DRIVE to GCS - added asset_size to S2-SR and VHI - adjust LST products for VHI
* updated documentation in the MSG processor
* VHI is calculated for every day even if there is no S2_SR for that specific day: checks, for the date provided, whether the S2_SR product is available and generates the VHI for that day - added the is_date_in_empty_asset_list function in main_utils - added this check in step1_processor_vhi
* Added check for existence of DX/DY in S2_SR: is a dx/dy available for this date? -> Yes: continue / No: abort ('No dx dy available')
* Fixing vulnerabilities https://github.com/swisstopo/topo-satromo/pull/93 https://github.com/swisstopo/topo-satromo/security/dependabot/9 https://github.com/swisstopo/topo-satromo/security/dependabot/8 https://github.com/swisstopo/topo-satromo/security/dependabot/6
* fixed fiona dependency in geopandas
* added checks for VHI: asset already exists or is in the empty asset list - VHI GEE asset already exists? then skip - VHI in empty_asset list? then skip
* VHI update for the operational daily run - added a 'LST_current_data' collection to the VHI product in config - removed PRODUCT_MSG,
which is not needed since we now upload the daily LST from within PRODUCT_VHI - step0_processor_msg_lst.py: set target asset to 'LST_current_data' and take the no-data definition from PRODUCT_MSG_CLIMA - VHI step1_processor_vhi.py: rearranged base variables to the top of the function def process_PRODUCT_VHI - test PRE conditions: 1) TEST LST: LST asset available? We first check if we have the necessary temporal coverage 2) TEST VHI GEE: VHI GEE asset already exists? then skip 3) TEST VHI empty asset: VHI in empty_asset list? then skip 4) TEST: only process if newer than the last product update OR no S2 SR for the specific date is in the empty list
* reduced printing statements - no more printing of warnregions stats
* update prod_config for vhi

---------

Co-authored-by: Joan Sturm
---
 README.md | 2 +-
 configuration/dev_config.py | 36 +-
 configuration/oed_config.py | 309 ++++++++++++
 configuration/prod_config.py | 21 +-
 main_functions/main_extract_warnregions.py | 23 +-
 main_functions/main_publish_stac_fsdi.py | 63 ++-
 main_functions/main_thumbnails.py | 10 +-
 main_functions/main_utils.py | 76 ++-
 main_functions/util_checkassets.py | 220 +++++++++
 main_functions/util_deleteassets.py | 76 +++
 main_functions/util_moveassets.py | 9 +-
 main_functions/util_upload_dxdy.py | 299 ++++++++++++
 requirements.txt | 6 +-
 satromo_processor.drawio | 127 ++---
 satromo_processor.drawio.svg | 3 +-
 satromo_processor.py | 5 +-
 satromo_publish.py | 447 +++++++++++-------
 step0_processors/step0_processor_msg_lst.py | 31 +-
 .../step0_processor_msg_lst_clima.py | 58 ++-
 step0_processors/step0_processor_s2_sr.py | 109 ++++-
 step1_processors/step1_processor_vhi.py | 218 ++++++---
 tools/last_updates.csv | 2 +-
 22 files changed, 1741 insertions(+), 409 deletions(-)
 create mode 100644 configuration/oed_config.py
 create mode 100644 main_functions/util_checkassets.py
 create mode 100644 main_functions/util_deleteassets.py
 create mode 100644 main_functions/util_upload_dxdy.py

diff --git a/README.md b/README.md index 21a08d5c..b4b7794e 100644 --- a/README.md +++ b/README.md @@ -235,7 +235,7 @@ Update the step0_empty_assets.csv directly on GitHub PROD: https://github.com/sw - [ ] M1 Vitalität – NDMI Anomalien - [ ] N2 Veränderung – NDVI Anomalien - [ ] B2 Natürliche Störungen – NBR - - [ ] V1 Vegetation Health Index + - [X] V1 Vegetation Health Index ## Contributing

diff --git a/configuration/dev_config.py b/configuration/dev_config.py index 0b0f4e06..f511f9aa 100644 --- a/configuration/dev_config.py +++ b/configuration/dev_config.py @@ -19,10 +19,15 @@ EMPTY_ASSET_LIST = os.path.join("tools", "step0_empty_assets.csv") PROCESSING_DIR = "processing" LAST_PRODUCT_UPDATES = os.path.join("tools", "last_updates.csv") +# Set GDRIVE Type: GCS for Google Cloud Storage and DRIVE for Google Drive +GDRIVE_TYPE = "GCS" +# Set GCS Bucket name of Google Cloud Storage +GCLOUD_BUCKET = "satromo_export" # Local Machine GDRIVE_SOURCE_DEV = "geedrivetest:" # under Windows, add \\ to escape the backslash like r'Y:\\' -GDRIVE_MOUNT_DEV = r'Y:\\' +# GDRIVE_MOUNT_DEV = r'Y:\\' #for DRIVE +GDRIVE_MOUNT_DEV = r'M:\\satromo_export' # for GCS # under Windows, add \\ to escape the backslash like r'X:\\' S3_DESTINATION_DEV = r'X:\\' @@ -81,7 +86,7 @@ "geocat_id": "7ae5cd5b-e872-4719-92c0-dc2f86c4d471", "temporal_coverage": 1, # Days "spatial_scale_export": 10, # Meters # TODO: check if needed in context with step0 - # Meters # TODO: check if needed in context with step0 + "asset_size": 5, "spatial_scale_export_mask": 10, "product_name":
"ch.swisstopo.swisseo_s2-sr_v100", "no_data": 9999, @@ -99,29 +104,32 @@ "product_name": "ch.swisstopo.swisseo_vhi_v100", "no_data": 255, "missing_data": 110, + "asset_size": 2, 'NDVI_reference_data': 'projects/satromo-prod/assets/col/1991-2020_NDVI_SWISS', - 'LST_reference_data': 'projects/satromo-prod/assets/col/2012-2020_LST_SWISS', + 'LST_reference_data': 'projects/satromo-prod/assets/col/1991-2020_LST_SWISS', + # prod:'projects/satromo-prod/assets/col/LST_SWISS', + 'LST_current_data': 'projects/satromo-int/assets/LST_CLIMA_SWISS', "step1_collection": 'projects/satromo-int/assets/VHI_SWISS', "step0_collection": "projects/satromo-int/assets/COL_S2_SR_HARMONIZED_SWISS" } -# MSG – MeteoSchweiz -PRODUCT_MSG = { - # - # this is placeholder, needed for the step0 function, - "image_collection": "METEOSCHWEIZ/MSG", - "temporal_coverage": 7, # Days - "product_name": "ch.meteoschweiz.landoberflaechentemperatur", - "no_data": 0, - # 'step0_collection': 'projects/satromo-int/assets/LST_SWISS' -} +# # MSG – MeteoSchweiz +# PRODUCT_MSG = { +# # +# # this is placeholder, needed for the step0 function, +# "image_collection": "METEOSCHWEIZ/MSG", +# "temporal_coverage": 7, # Days +# "product_name": "ch.meteoschweiz.landoberflaechentemperatur", +# "no_data": 0, +# 'step0_collection': 'projects/satromo-int/assets/LST_CLIMA_SWISS' +# } # MSG – MeteoSchweiz: only used for repreocessing PRODUCT_MSG_CLIMA = { # # this is placeholder, needed for the step0 function, "image_collection": "METEOSCHWEIZ/MSG", - "temporal_coverage": 7, # Days + "temporal_coverage": 1, # Days "product_name": "ch.meteoschweiz.landoberflaechentemperatur", "no_data": 0, # 'step0_collection': 'projects/satromo-int/assets/LST_CLIMA_SWISS' diff --git a/configuration/oed_config.py b/configuration/oed_config.py new file mode 100644 index 00000000..994b2437 --- /dev/null +++ b/configuration/oed_config.py @@ -0,0 +1,309 @@ +# -*- coding: utf-8 -*- +import os + +# General variables +# -------------------------- + +# GitHub repository +GITHUB_OWNER = "swisstopo" +GITHUB_REPO = "topo-satromo" + +# Secrets +GDRIVE_SECRETS = os.path.join("secrets", "geetest-credentials.secret") +RCLONE_SECRETS = os.path.join("secrets", "rclone.conf") +FSDI_SECRETS = os.path.join("secrets", "stac_fsdi.json") + +# File and directory paths +GEE_RUNNING_TASKS = os.path.join("processing", "running_tasks.csv") +GEE_COMPLETED_TASKS = os.path.join("tools", "completed_tasks.csv") +EMPTY_ASSET_LIST = os.path.join("tools", "step0_empty_assets.csv") +PROCESSING_DIR = "processing" +LAST_PRODUCT_UPDATES = os.path.join("tools", "last_updates.csv") +# Local Machine +GDRIVE_SOURCE_DEV = "geedrivetest:" +# under Windows, add \\ to escape the backslash like r'Y:\\' +GDRIVE_MOUNT_DEV = r'Y:\\' +# under Windows, add \\ to escape the backslash like r'X:\\' +S3_DESTINATION_DEV = r'X:\\' + +# GITHUB +GDRIVE_SOURCE_INT = "geedriveINT:" +GDRIVE_MOUNT_INT = "localgdrive" +S3_DESTINATION_INT = os.path.join("s3INT:satromoint", "data") + + +# General GEE parameters + +# TODO: check if needed +SHARD_SIZE = 256 + +# Development environment parameters +RESULTS = os.path.join("results") # Local path for results + +# General product parameters +# --------------------------- + +# Coordinate Reference System (EPSG:4326 for WGS84, EPSG:2056 for CH1903+, see epsg.io) +OUTPUT_CRS = "EPSG:2056" + +# Desired buffer in m width around ROI, e.g., 25000, this defines the final extent +# TODO: check if needed in context with step0 +BUFFER = os.path.join("assets", "ch_buffer_5000m.shp") +OVERVIEW_LAKES = 
os.path.join("assets", "overview_lakes_2056.shp") +OVERVIEW_RIVERS = os.path.join("assets", "overview_rivers_2056.shp") +WARNREGIONS = os.path.join("assets", "warnregionen_vhi_2056.shp") + + +# Switzerland border with 10km buffer: [5.78, 45.70, 10.69, 47.89] , Schönbühl [ 7.471940, 47.011335, 7.497431, 47.027602] Martigny [ 7.075402, 46.107098, 7.100894, 46.123639] +# Defines the initial extent to search for image tiles This is not the final extent is defined by BUFFER +# TODO: check if needed in context with step0 +ROI_RECTANGLE = [5.78, 45.70, 10.69, 47.89] +ROI_BORDER_BUFFER = 5000 # Buffer around Switzerland + +# No data value TODO : needs to be defined per product +NODATA = 9999 + + +## PRODUCTS, INDICES and custom COLLECTIONS ### +# --------------------------- +# See https://github.com/swisstopo/topo-satromo/tree/main?tab=readme-ov-file#configuration-in-_configpy for details +# TL;DR : First define in A) PRODUCTS, INDICES: for step0 (cloud, shadow, co-register, mosaic) the TOA SR data custom "step0_collection" to be generated / used +# then + +# A) PRODUCTS, INDICES +# ******************** + +# ch.swisstopo.swisseo_s2-sr +PRODUCT_S2_LEVEL_2A = { + # "prefix": "S2_L2A_SR", + # TODO: check if needed in context with step0 + "image_collection": "COPERNICUS/S2_SR_HARMONIZED", + "geocat_id": "7ae5cd5b-e872-4719-92c0-dc2f86c4d471", + "temporal_coverage": 1, # Days + "spatial_scale_export": 10, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "spatial_scale_export_mask": 10, + "product_name": "ch.swisstopo.swisseo_s2-sr_v100", + "no_data": 9999, + "step0_collection": "projects/geetest-386915/assets/col_s2_sr" +} + +# VHI – Trockenstress +PRODUCT_VHI = { + # TODO: check if needed in context with step0 + "image_collection": "COPERNICUS/S2_SR_HARMONIZED", + "geocat_id": "bc4d0e6b-e92e-4f28-a7d2-f41bf61e98bc", + "temporal_coverage": 7, # Days + "spatial_scale_export": 10, # Meters + # "band_names": [{'NIR': "B8", 'RED': "B4"}], + "product_name": "ch.swisstopo.swisseo_vhi_v100", + "no_data": 255, + "missing_data": 110, + 'NDVI_reference_data': 'projects/satromo-prod/assets/col/1991-2020_NDVI_SWISS', + 'LST_reference_data': 'projects/satromo-prod/assets/col/2012-2020_LST_SWISS', + # "step1_collection": 'projects/satromo-int/assets/VHI_SWISS', + # "step0_collection": "projects/satromo-int/assets/COL_S2_SR_HARMONIZED_SWISS" +} + +# MSG – MeteoSchweiz +PRODUCT_MSG = { + # + # this is placeholder, needed for the step0 function, + "image_collection": "METEOSCHWEIZ/MSG", + "temporal_coverage": 7, # Days + "product_name": "ch.meteoschweiz.landoberflaechentemperatur", + "no_data": 0, + # 'step0_collection': 'projects/satromo-int/assets/LST_SWISS' +} + +# MSG – MeteoSchweiz: only used for repreocessing +PRODUCT_MSG_CLIMA = { + # + # this is placeholder, needed for the step0 function, + "image_collection": "METEOSCHWEIZ/MSG", + "temporal_coverage": 365, # Days + "product_name": "ch.meteoschweiz.landoberflaechentemperatur", + "no_data": 0, + # 'step0_collection': 'projects/satromo-int/assets/LST_CLIMA_SWISS' +} + +# TEST datasets +# TEST NDVI +PRODUCT_NDVI_MAX = { + # "prefix": "Sentinel_NDVI-MAX_SR_CloudFree_crop", + # TODO: check if needed in context with step0 + "image_collection": "COPERNICUS/S2_SR_HARMONIZED", + "temporal_coverage": 3, # Days + "spatial_scale_export": 10, # Meters + "band_names": [{'NIR': "B8", 'RED': "B4"}], + "product_name": "NDVI-MAX", + "no_data": 255, + # "step0_collection": 
"projects/satromo-int/assets/COL_S2_SR_HARMONIZED_SWISS" +} + +# TEST S2 -TOA: TEST +PRODUCT_S2_LEVEL_1C = { + # "prefix": "S2_L1C_TOA", + "image_collection": "COPERNICUS/S2_HARMONIZED", + "temporal_coverage": 30, # Days + "spatial_scale_export": 10, # Meters + "spatial_scale_export_mask": 60, + "product_name": "S2_LEVEL_1C", + "no_data": 9999, + # "step0_collection": "projects/geetest-386915/assets/col_s2_toa" +} + +# TEST S2 -TOA- NDVI p +PRODUCT_NDVI_MAX_TOA = { + # "prefix": "Sentinel_NDVI-MAX_TOA_CloudFree_crop", + # TODO: check if needed in context with step0 + "image_collection": "COPERNICUS/S2_HARMONIZED", + "temporal_coverage": 1, # Days + "spatial_scale_export": 1, # Meters + "band_names": [{'NIR': "B8", 'RED': "B4"}], + "product_name": "NDVI-MAX_TOA", + "no_data": 255, + # "step0_collection": "projects/geetest-386915/assets/col_s2_toa" +} + +# ch.swisstopo.swisseo_l57-sr +PRODUCT_L57_LEVEL_2 = { + + # TODO: check if needed in context with step0 + "image_collection": "LANDSAT/LT05/C02/T1_L2", + "geocat_id": "tbd", + "temporal_coverage": 1, # Days + "spatial_scale_export": 30, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "product_name": "ch.swisstopo.swisseo_l57-sr_v100", + "no_data": 9999, + # "step0_collection": "projects/satromo-int/assets/COL_LANDSAT_SR_SWISS" +} + +# ch.swisstopo.swisseo_l57-toa +PRODUCT_L57_LEVEL_1 = { + + # TODO: check if needed in context with step0 + "image_collection": "LANDSAT/LT05/C02/T1_TOA", + "geocat_id": "tbd", + "temporal_coverage": 1, # Days + "spatial_scale_export": 30, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "product_name": "ch.swisstopo.swisseo_l57-toa_v100", + "no_data": 9999, + # "step0_collection": "projects/satromo-int/assets/COL_LANDSAT_TOA_SWISS" +} + +# ch.swisstopo.swisseo_l57-sr +PRODUCT_L89_LEVEL_2 = { + + # TODO: check if needed in context with step0 + "image_collection": "LANDSAT/LC08/C02/T1_L2", + "geocat_id": "tbd", + "temporal_coverage": 1, # Days + "spatial_scale_export": 30, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "product_name": "ch.swisstopo.swisseo_l89-sr_v100", + "no_data": 9999, + # "step0_collection": "projects/satromo-int/assets/COL_LANDSAT_SR_SWISS" +} + +# ch.swisstopo.swisseo_l89-toa +PRODUCT_L89_LEVEL_1 = { + + # TODO: check if needed in context with step0 + "image_collection": "LANDSAT/LC08/C02/T1_TOA", + "geocat_id": "tbd", + "temporal_coverage": 1, # Days + "spatial_scale_export": 30, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "product_name": "ch.swisstopo.swisseo_l89-toa_v100", + "no_data": 9999, + # "step0_collection": "projects/satromo-int/assets/COL_LANDSAT_TOA_SWISS" +} + +# ch.swisstopo.swisseo_s3-toa +PRODUCT_S3_LEVEL_1 = { + + # TODO: check if needed in context with step0 + "image_collection": "COPERNICUS/S3/OLCI", + "geocat_id": "tbd", + "temporal_coverage": 1, # Days + "spatial_scale_export": 300, # Meters # TODO: check if needed in context with step0 + # Meters # TODO: check if needed in context with step0 + "product_name": "ch.swisstopo.swisseo_s3-toa_v100", + "no_data": 9999, + # "step0_collection": "projects/satromo-int/assets/COL_S3_TOA_SWISS" +} + +# B custom COLLECTION +# ******************** +# Contains dictionary used to manage custom collection (asset) in GEE, +# for example to clear old images not used 
anymore. + +# Configure the dict containing +# - the name of the custom collection (asset) in GEE, (eg: projects/satromo-int/assets/COL_S2_SR_HARMONIZED_SWISS ) +# - the function to process the raw data for teh collection (eg:step0_processor_s2_sr.generate_s2_sr_mosaic_for_single_date ) + +# Make sure that the products above use the corresponding custom collection (assets) + +step0 = { + # 'projects/satromo-exolabs/assets/col_s2_toa': { + # 'step0_function': 'step0_processor_s2_toa.generate_s2_toa_mosaic_for_single_date', + # # cleaning_older_than: 2 # entry used to clean assets + # }, + 'projects/geetest-386915/assets/col_s2_sr': { + 'step0_function': 'step0_processor_s2_sr.generate_s2_sr_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/COL_LANDSAT_SR_SWISS': { + 'step0_function': 'step0_processor_l57_sr.generate_l57_sr_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/COL_LANDSAT_TOA_SWISS': { + 'step0_function': 'step0_processor_l57_toa.generate_l57_toa_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/COL_LANDSAT_SR_SWISS': { + 'step0_function': 'step0_processor_l89_sr.generate_l89_sr_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/COL_LANDSAT_TOA_SWISS': { + 'step0_function': 'step0_processor_l89_toa.generate_l89_toa_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/COL_S3_TOA_SWISS': { + 'step0_function': 'step0_processor_s3_toa.generate_s3_toa_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/LST_SWISS': { + 'step0_function': 'step0_processor_msg_lst.generate_msg_lst_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + }, + 'projects/satromo-int/assets/LST_CLIMA_SWISS': { + 'step0_function': 'step0_processor_msg_lst_clima.generate_msg_lst_mosaic_for_single_date' + # cleaning_older_than: 2 # entry used to clean assets + } +} + + +# STAC Integration +# --------------- + +STAC_FOLDER = "stac-collection" +# Use the AWS Cloudfront distribution instead of "https://satromoint.s3.eu-central-2.amazonaws.com/" +STAC_BASE_URL = "https://d29cp2gnktw6by.cloudfront.net/" +STAC_PRODUCT = ["S2_LEVEL_2A", "NDVI-MAX"] + +# under Windows, add \\ to escape the backslash like r'X:\\' +STAC_DESTINATION_DEV = r'X:\\' + +GDRIVE_SOURCE_INT = "geedriveINT:" +GDRIVE_MOUNT_INT = "localgdrive" +STAC_DESTINATION_INT = "s3INT:satromoint" + +# STAC FSDI +# --------------- +STAC_FSDI_SCHEME = 'https' +STAC_FSDI_HOSTNAME = 'sys-data.int.bgdi.ch' +STAC_FSDI_API = '/api/stac/v0.9/' diff --git a/configuration/prod_config.py b/configuration/prod_config.py index 0737024b..ca0d2364 100644 --- a/configuration/prod_config.py +++ b/configuration/prod_config.py @@ -19,10 +19,15 @@ EMPTY_ASSET_LIST = os.path.join("tools", "step0_empty_assets.csv") PROCESSING_DIR = "processing" LAST_PRODUCT_UPDATES = os.path.join("tools", "last_updates.csv") +# Set GDRIVE Type: GCS for Google Cloud Storage and DRIVE for Google Drive +GDRIVE_TYPE = "DRIVE" +# Set GCS Bucket name of Google Cloud Storage +GCLOUD_BUCKET = "satromo_export" # Local Machine GDRIVE_SOURCE_DEV = "geedriveINT:" # under Windows, add \\ to escape the backslash like r'Y:\\' GDRIVE_MOUNT_DEV = r'G:\\' +# GDRIVE_MOUNT_DEV = r'M:\\satromo_export' # for GCS 
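A minimal sketch of how the new GDRIVE_TYPE / GCLOUD_BUCKET keys are consumed; it mirrors the start_export branch added to main_functions/main_utils.py later in this patch (the wrapper name export_image is illustrative only, and an initialized earthengine-api session plus the repo's usual "import configuration as config" are assumed):

    import ee
    import configuration as config

    def export_image(image, description, scale, region, filename_prefix, crs):
        # Choose the export target based on the new config switch
        if config.GDRIVE_TYPE == "GCS":
            task = ee.batch.Export.image.toCloudStorage(
                image=image, description=description, scale=scale, region=region,
                fileNamePrefix=filename_prefix, maxPixels=1e13, crs=crs,
                fileFormat="GeoTIFF", bucket=config.GCLOUD_BUCKET)
        else:
            task = ee.batch.Export.image.toDrive(
                image=image, description=description, scale=scale, region=region,
                fileNamePrefix=filename_prefix, maxPixels=1e13, crs=crs,
                fileFormat="GeoTIFF")
        task.start()
        return task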
# under Windows, add \\ to escape the backslash like r'X:\\' S3_DESTINATION_DEV = r'X:\\' # GITHUB @@ -79,7 +84,7 @@ "geocat_id": "7ae5cd5b-e872-4719-92c0-dc2f86c4d471", "temporal_coverage": 1, # Days "spatial_scale_export": 10, # Meters # TODO: check if needed in context with step0 - # Meters # TODO: check if needed in context with step0 + "asset_size": 5, "spatial_scale_export_mask": 10, "product_name": "ch.swisstopo.swisseo_s2-sr_v100", "no_data": 9999, @@ -96,21 +101,23 @@ "product_name": "ch.swisstopo.swisseo_vhi_v100", "no_data": 255, "missing_data": 110, + "asset_size": 2, 'NDVI_reference_data': 'projects/satromo-prod/assets/col/1991-2020_NDVI_SWISS', 'LST_reference_data': 'projects/satromo-prod/assets/col/2012-2020_LST_SWISS', - # "step1_collection": 'projects/satromo-prod/assets/VHI_SWISS', - # "step0_collection": "projects/satromo-prod/assets/col/S2_SR_HARMONIZED_SWISS" + 'LST_current_data': 'projects/satromo-prod/assets/col/LST_SWISS', + "step1_collection": 'projects/satromo-prod/assets/col/VHI_SWISS', + "step0_collection": "projects/satromo-prod/assets/col/S2_SR_HARMONIZED_SWISS" } -# MSG – MeteoSchweiz -PRODUCT_MSG = { +# MSG – MeteoSchweiz: only used for repreocessing +PRODUCT_MSG_CLIMA = { # # this is placeholder, needed for the step0 function, "image_collection": "METEOSCHWEIZ/MSG", - "temporal_coverage": 7, # Days + "temporal_coverage": 1, # Days "product_name": "ch.meteoschweiz.landoberflaechentemperatur", "no_data": 0, - # 'step0_collection': 'projects/satromo-prod/assets/col/VHI_SWISS' + # 'step0_collection': 'projects/satromo-int/assets/LST_CLIMA_SWISS' } diff --git a/main_functions/main_extract_warnregions.py b/main_functions/main_extract_warnregions.py index 53b4b1a5..35d25a94 100644 --- a/main_functions/main_extract_warnregions.py +++ b/main_functions/main_extract_warnregions.py @@ -38,19 +38,16 @@ """ - - # ---------------------------------------- def export(raster_url, shape_file, filename, dateISO8601, missing_values): - - # Parameters : - regionnr = "REGION_NR" #depend on the SHP file delivered by FOEN - regionname = "Name" #depend on the SHP file delivered by FOEN + + # Parameters : + regionnr = "REGION_NR" # depend on the SHP file delivered by FOEN + regionname = "Name" # depend on the SHP file delivered by FOEN vhimean = "vhi_mean" availpercen = "availability_percentage" date_column = "date" - gdf = gpd.read_file(shape_file) @@ -104,12 +101,12 @@ def export(raster_url, shape_file, filename, dateISO8601, missing_values): availability_percentage_rounded) # Print statistics - print(f"Region {region}: {region_name}") - print(f" Min value: {min_value}") - print(f" Max value: {max_value}") - print(f" Mean value: {mean_value_rounded}") - print( - f" Percentage of availability: {availability_percentage:.1f}%") + # print(f"Region {region}: {region_name}") + # print(f" Min value: {min_value}") + # print(f" Max value: {max_value}") + # print(f" Mean value: {mean_value_rounded}") + # print( + # f" Percentage of availability: {availability_percentage:.1f}%") except ValueError: # Handle empty intersection (assign missing_values) raster_values.append(missing_values) diff --git a/main_functions/main_publish_stac_fsdi.py b/main_functions/main_publish_stac_fsdi.py index a46c28c2..4f21b963 100644 --- a/main_functions/main_publish_stac_fsdi.py +++ b/main_functions/main_publish_stac_fsdi.py @@ -12,7 +12,7 @@ from datetime import datetime import pyproj import re - +import time import configuration as config """ @@ -107,9 +107,8 @@ def initialize_fsdi(): password = os.environ.get( 
'STAC_PASSWORD', config_data["FSDI"]["password"]) - # INT (github action) + # PROD (github action) else: - # TODO add PROD PW in GA and in config when going live user = os.environ['FSDI_STAC_USER'] password = os.environ['FSDI_STAC_PASSWORD'] @@ -362,20 +361,54 @@ def create_asset(stac_asset_url, payload): Returns: bool: True if the creation was successful, False otherwise. """ - response = requests.put( - url=stac_asset_url, - auth=(user, password), - # auth=HTTPBasicAuth(user, password), - # proxies={"https": proxy.guess_proxy()}, - # verify=False, - json=payload - ) - if response.status_code // 200 == 1: - return True - else: - print(response.json()) + # Maximum number of retries + max_retries = 3 + # Delay between retries in seconds + delay = 20 + # Flag to indicate success or failure + success = False + + for attempt in range(max_retries): + try: + # Send PUT request + response = requests.put( + url=stac_asset_url, + auth=(user, password), + json=payload + ) + + # Check the status code + if response.status_code == 200 or response.status_code == 201: + try: + # Try to decode the JSON response + data = response.json() + #print(data) + success = True + break + except requests.exceptions.JSONDecodeError as e: + print("Error decoding JSON:", e) + print("Response content:", response.text) + else: + print( + f"Attempt {attempt + 1}: Received status code {response.status_code}") + print("Response content:", response.text) + if attempt < max_retries - 1: + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + + except requests.exceptions.RequestException as e: + # Handle other request-related exceptions + print(f"An error occurred: {e}") + if attempt < max_retries - 1: + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + + if not success: + print("Failed to receive a successful response after multiple attempts.") return False + return True + def upload_asset_multipart(stac_asset_filename, stac_asset_url, part_size=part_size_mb * 1024 ** 2): """ diff --git a/main_functions/main_thumbnails.py b/main_functions/main_thumbnails.py index 8efb1f33..9961f658 100644 --- a/main_functions/main_thumbnails.py +++ b/main_functions/main_thumbnails.py @@ -61,11 +61,11 @@ def fill_buffer_switzerland(shapefile_dir, shapefile_name, target_width): result = subprocess.run(command, check=True, capture_output=True, text=True) - # Check and print the command output - if result.returncode == 0: - print("Rasterization successful!") - else: - print(f"Error: {result.stderr}") + # # Check and print the command output + # if result.returncode == 0: + # print("Rasterization successful!") + # else: + # print(f"Error: {result.stderr}") def apply_overlay(input_file, output_file): diff --git a/main_functions/main_utils.py b/main_functions/main_utils.py index d29627e1..b63dcea2 100644 --- a/main_functions/main_utils.py +++ b/main_functions/main_utils.py @@ -5,6 +5,40 @@ import csv import os import json +import pandas as pd + + +def is_date_in_empty_asset_list(collection, check_date_str): + """ + Check if a given date for a collection is in the empty asset list. + + Args: + collection_basename (str): The basename of the collection. + check_date_str (str): The date to check in string format. + config (object): Configuration object containing EMPTY_ASSET_LIST path. + + Returns: + bool: True if the date is found in the empty asset list, False otherwise. 
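    Example (illustrative only; the call site added by this patch lives in
    step1_processor_vhi.py, and the date shown here is a placeholder):

        if is_date_in_empty_asset_list(
                config.PRODUCT_VHI['step0_collection'], '2023-10-24'):
            print('S2_SR is in the empty asset list for this date, skipping VHI')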
+ """ + try: + collection_basename = os.path.basename(collection) + # Read the empty asset list + df = pd.read_csv(config.EMPTY_ASSET_LIST) + + # Filter the dataframe for the given collection and date + df_selection = df[(df.collection == collection_basename) & + (df.date == check_date_str)] + + # Check if any rows match the criteria + if len(df_selection) > 0: + print(check_date_str+' is in empty_asset_list for '+collection) + return True + else: + return False + + except Exception as e: + print(f"Error checking empty asset list: {e}") + return False # Return False in case of any error to allow further processing def get_github_info(): @@ -203,7 +237,7 @@ def get_quadrants(roi): def start_export(image, scale, description, region, filename_prefix, crs): """ - Starts an export task to export an image to Google Drive. + Starts an export task to export an image to Google Drive or Google Cloud Storage Args: @@ -213,6 +247,7 @@ def start_export(image, scale, description, region, filename_prefix, crs): region: The region of interest (ROI) to export. filename_prefix: The prefix to be used for the exported file. crs: The coordinate reference system (CRS) of the exported image. + GCS=False : If set to true, an GCS will be used for output Returns: None @@ -223,17 +258,31 @@ def start_export(image, scale, description, region, filename_prefix, crs): # Use projection() from one of the original images instead, e.g., S2_collection.first().projection(), *after the aoi/date filters but before mapping any transformation function* then # work with the corresponding CrsTtransform derived from it crs:'EPSG:32632', crsTransform: '[10,0,0,0,10,0]' - task = ee.batch.Export.image.toDrive( - image=image, - description=description, - scale=scale, - region=region, - fileNamePrefix=filename_prefix, - maxPixels=1e13, - crs=crs, - fileFormat="GeoTIFF" - ) - + if config.GDRIVE_TYPE == "GCS": + # print("GCS export") + task = ee.batch.Export.image.toCloudStorage( + image=image, + description=description, + scale=scale, + region=region, + fileNamePrefix=filename_prefix, + maxPixels=1e13, + crs=crs, + fileFormat="GeoTIFF", + bucket=config.GCLOUD_BUCKET + ) + else: + # print("Drive export") + task = ee.batch.Export.image.toDrive( + image=image, + description=description, + scale=scale, + region=region, + fileNamePrefix=filename_prefix, + maxPixels=1e13, + crs=crs, + fileFormat="GeoTIFF" + ) # OPTION Export in GEE with UTM32 # for images covering that UTM zone this will be the best, but for the neighbouring UTM zones, images will be reprojected. So, for mosaics for larger areas spanning multiple UTM zones maybe some alternative projection is more convenient. # task = ee.batch.Export.image.toDrive( @@ -265,7 +314,8 @@ def start_export(image, scale, description, region, filename_prefix, crs): # Get Task ID task_id = task.status()["id"] - print("Exporting with Task ID:", task_id + f" file {filename_prefix}...") + print("Exporting with Task ID:", task_id + + f" file {filename_prefix} to {config.GDRIVE_TYPE}...") # Save Task ID and filename to a text file header = ["Task ID", "Filename"] diff --git a/main_functions/util_checkassets.py b/main_functions/util_checkassets.py new file mode 100644 index 00000000..3cbbfece --- /dev/null +++ b/main_functions/util_checkassets.py @@ -0,0 +1,220 @@ +""" +This script is designed to work with Google Earth Engine (GEE) and Google Drive. 
It authenticates GEE and Google Drive, lists assets from a GEE collection, extracts timestamps from filenames, identifies missing timestamps based on a specified date range, and optionally exports the missing timestamps to a CSV fil + + +Modules Used + + import ee: Google Earth Engine API for interacting with GEE resources. + import os: Module for interacting with the operating system, particularly for directory and file operations. + import re: Regular expressions module for string pattern matching and extraction. + import datetime and import timedelta: Modules for handling dates and time arithmetic. + import pandas as pd: Pandas library, commonly used for data manipulation (imported but not used in this script). + import csv: Module for CSV file operations. + from pydrive.auth import GoogleAuth: Google Drive authentication module. + from oauth2client.service_account import ServiceAccountCredentials: OAuth2 credentials for authenticating service accounts. + +Functions + + initialize_gee_and_drive(credentials_file) + Initializes Google Earth Engine and Google Drive authentication using the provided service account credentials. + Args: + credentials_file (str): Path to the service account credentials JSON file. + Returns: + True if initialization is successful, False otherwise. + list_assets(path) + Lists assets in the specified GEE collection. + Args: + path (str): The path to the GEE collection. + Returns: + A list of assets in the collection. + extract_timestamps(directory_path, start_date, end_date) + Extracts timestamps from filenames in the specified directory that match a specific pattern and are within the given date range. + Args: + directory_path (str): The path to the directory containing the files. + start_date (str): The start date in YYYY-MM-DD format. + end_date (str): The end date in YYYY-MM-DD format. + Returns: + A list of timestamps extracted from the filenames. + find_missing_assets(source_timestamps, asset_names) + Identifies timestamps that are present in the source_timestamps but missing from the asset_names list. + Args: + source_timestamps (list): List of timestamps to check. + asset_names (list): List of asset names containing dates. + Returns: + A sorted list of missing timestamps. + doy_to_date(doy, year=2023) + Converts a day-of-year (DOY) to a date for the specified year. + Args: + doy (int): The day of the year (1-365). + year (int): The year for the conversion (default is 2023). + Returns: + A datetime object representing the date. + export_to_csv(missing_numbers, filename) + Exports a list of missing timestamps to a CSV file. + Args: + missing_numbers (list): List of missing timestamps. + filename (str): The name of the CSV file to create. + Returns: + None. + +Usage + + Initialization: + The script starts by initializing GEE and Google Drive authentication using the provided credentials file. + + Asset Listing: + It lists all assets in the specified GEE collection path. + + Timestamp Extraction: + Timestamps are extracted from files in the specified directory that match the date range. + + Finding Missing Assets: + The script checks if there are any timestamps missing between the source files and the GEE assets. + + Exporting Missing Assets: + If any timestamps are missing, they are exported to a CSV file. + + Output: + The script outputs a message indicating whether synchronization is "OK" or "NOT OK" based on the presence of missing assets. 
+ +Example: + $ python util_checkassets.py + + Ensure to update the 'credentials_file' and "data_path" and 'collection_path' variables accordingly. +""" +import ee +import os +import re +from datetime import datetime +import pandas as pd +from pydrive.auth import GoogleAuth +from oauth2client.service_account import ServiceAccountCredentials +import csv +from datetime import datetime, timedelta + + +def initialize_gee_and_drive(credentials_file): + """ + Initializes Google Earth Engine (GEE) and Google Drive authentication. + + Args: + credentials_file (str): Path to the service account credentials JSON file. + + Returns: + bool: True if initialization is successful, False otherwise. + """ + # Set scopes for Google Drive + scopes = ["https://www.googleapis.com/auth/drive"] + + try: + # Initialize Google Earth Engine + ee.Initialize() + + # Authenticate with Google Drive + gauth = GoogleAuth() + gauth.service_account_file = credentials_file + gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name( + gauth.service_account_file, scopes=scopes + ) + gauth.ServiceAuth() + + print("Google Earth Engine and Google Drive authentication successful.") + return True + + except Exception as e: + print( + f"Failed to initialize Google Earth Engine and Google Drive: {str(e)}") + return False + + +# Function to list assets in a collection +def list_assets(path): + asset_list = ee.data.listAssets({'parent': path}) + return asset_list.get('assets', []) + +# Function to extract numbers from asset names + + +def extract_timestamps(directory_path, start_date, end_date): + + # Initialize an empty list to store the timestamps + timestamps = [] + + # Loop through all files in the directory + for file_name in os.listdir(directory_path): + # Check if the file ends with '10m_dx.tif' + if file_name.endswith('10m_dx.tif'): + # Retrieve the date from the filename using regex + match = re.search(r'(\d{4}-\d{2}-\d{2}T\d{6})', file_name) + if match: + timestamp = match.group(0).upper() + timestamps.append(timestamp) + return timestamps + +# Function to determine missing numbers between 1 and 365 + + +def find_missing_assets(source_timestamps, asset_names): + # Extract dates from asset names + asset_dates = set() + for asset in asset_names: + match = re.search( + r'S2-SR_mosaic_(\d{4}-\d{2}-\d{2}T\d{6})_registration-10m', asset) + if match: + asset_date = match.group(1).upper() + asset_dates.add(asset_date) + + # Find missing timestamps + missing_timestamps = [ + timestamp for timestamp in source_timestamps if timestamp not in asset_dates] + + return sorted(missing_timestamps) + +# Function to convert day-of-year to date for the year 2023 + + +def doy_to_date(doy, year=2023): + return datetime(year, 1, 1) + timedelta(days=doy - 1) + +# Function to export missing numbers and corresponding dates to CSV + + +def export_to_csv(missing_numbers, filename): + with open(filename, mode='w', newline='') as file: + writer = csv.writer(file) + writer.writerow(['Missing Date']) + for number in missing_numbers: + writer.writerow([number]) + + +if __name__ == "__main__": + # Path to the service account credentials JSON file + credentials_file = r'C:\path\xxx' + + start_date = "2023-10-01" + end_date = "2023-10-10" + + # Specify the originla data path + data_path = r'\\path\2023-10' + + # Specify the collection path + collection_path = 'projects/satromo-432405/assets/COL_S2_SR_DXDY' + + # Initialize Google Earth Engine and Google Drive authentication + if initialize_gee_and_drive(credentials_file): + asset_list = 
list_assets(collection_path) + asset_names = [asset['name'] for asset in asset_list] + source_timestamps = extract_timestamps(data_path, start_date, end_date) + missing_assets = find_missing_assets(source_timestamps, asset_names) + + # Check if missing_assets is empty + if not missing_assets: + print("OK sync for "+start_date+" to "+end_date + + " for "+data_path+" and "+collection_path) + else: + print("NOT OK: Missing assets:", missing_assets) + # Specify the output CSV file name + csv_filename = 'missing_asset_numbers.csv' + export_to_csv(missing_assets, csv_filename) + + print(f'Missing asset exported to {csv_filename}') diff --git a/main_functions/util_deleteassets.py b/main_functions/util_deleteassets.py new file mode 100644 index 00000000..1276e888 --- /dev/null +++ b/main_functions/util_deleteassets.py @@ -0,0 +1,76 @@ +from oauth2client.service_account import ServiceAccountCredentials +from pydrive.auth import GoogleAuth +import ee +import json +import os +# Assuming configuration.py is in ../configuration directory + + +config = r'C:\temp\topo-satromo\secrets\geetest-credentials-int.secret' +asset_list = r'C:\temp\topo-satromo\main_functions\asset_list.txt' + +def initialize_gee_and_drive(): + """ + Initializes Google Earth Engine (GEE) and Google Drive based on the run type. + + If the run type is 2, initializes GEE and authenticates using the service account key file. + If the run type is 1, initializes GEE and authenticates using secrets from GitHub Action. + + Prints a success or failure message after initializing GEE. + + Note: This function assumes the required credentials and scopes are properly set. + + Returns: + None + """ + # Set scopes for Google Drive + scopes = ["https://www.googleapis.com/auth/drive"] + + # Initialize GEE and authenticate using the service account key file + + # Read the service account key file + with open(config, "r") as f: + data = json.load(f) + + # Authenticate with Google using the service account key file + gauth = GoogleAuth() + gauth.service_account_file = config + gauth.service_account_email = data["client_email"] + gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name( + gauth.service_account_file, scopes=scopes + ) + + # Initialize Google Earth Engine + credentials = ee.ServiceAccountCredentials( + gauth.service_account_email, gauth.service_account_file + ) + ee.Initialize(credentials) + + # Test if GEE initialization is successful + image = ee.Image("NASA/NASADEM_HGT/001") + title = image.get("title").getInfo() + + if title == "NASADEM: NASA NASADEM Digital Elevation 30m": + print("GEE initialization successful") + else: + print("GEE initialization FAILED") + + +# Authenticate with GEE and GDRIVE +initialize_gee_and_drive() + +# Read asset names from the text file into a list +with open(asset_list, 'r') as file: + asset_names = file.readlines() + +# Remove whitespace and newline characters from the asset names +asset_names = [name.strip() for name in asset_names] + +# Delete assets from ImageCollection +for asset_name in asset_names: + try: + # Delete the asset + ee.data.deleteAsset(asset_name) + print(f'Deleted asset: {asset_name}') + except Exception as e: + print(f'Error deleting asset: {asset_name}, {e}') diff --git a/main_functions/util_moveassets.py b/main_functions/util_moveassets.py index 225ea638..c45240f2 100644 --- a/main_functions/util_moveassets.py +++ b/main_functions/util_moveassets.py @@ -67,21 +67,18 @@ def initialize_gee_and_drive(): # Remove whitespace and newline characters from the asset names 
asset_names = [name.strip() for name in asset_names] -# Define the Earth Engine username for the source account - - # Define the Earth Engine username for the destination account -destination_username = "satromo-int" +destination_username = "satromo-prod" # Copy each asset from the source to the destination for asset_name in asset_names: # Construct the source and destination asset paths # source_asset = "users/{}/SATROMO/{}".format(source_username, asset_name) - source_asset = "projects/geetest-386915/assets/terrain_shadow/{}".format( + source_asset = "projects/satromo-int/assets/1991-2020_NDVI_STATS15/{}".format( asset_name) # destination_asset = 'projects/{}/assets/res/{}'.format( # destination_username, asset_name) - destination_asset = 'projects/{}/assets/TERRAINSHADOW_SWISS/{}'.format( + destination_asset = 'projects/{}/assets/col/1991-2020_NDVI_SWISS/{}'.format( destination_username, asset_name) # Copy the asset from the source to the destination diff --git a/main_functions/util_upload_dxdy.py b/main_functions/util_upload_dxdy.py new file mode 100644 index 00000000..c12a6f07 --- /dev/null +++ b/main_functions/util_upload_dxdy.py @@ -0,0 +1,299 @@ +import ee +import os +import argparse +import numpy as np +from datetime import datetime, timezone +from pydrive.auth import GoogleAuth +import json +import time +import subprocess +from oauth2client.service_account import ServiceAccountCredentials +from google.cloud import storage +import re + +""" +Header: +------- +Script Name: util_upload_dxdy.py +Description: This script automates the process of uploading a dx dy mosaic image to Google Cloud Storage (GCS) and then to Google Earth Engine (GEE) collection. It checks the environment (local or production), initializes the necessary credentials, and processes the upload and configuration of the image asset. + +Introduction: +------------- +The primary purpose of this script is to streamline the workflow for uploading and managing mosaic images within the Google Cloud and Google Earth Engine ecosystems. This script is particularly made for uploading coregistration shift matrix for s2 to be used in GEE. + +Content: +-------- +1. determine_run_type(): + - Determines the run environment (local or production). + - Initializes Google Cloud and Google Earth Engine credentials. +2. upload_dx_dy_mosaic_for_single_date(day_to_process, collection): + - Uploads a specified GeoTIFF file to Google Cloud Storage. + - Configures and processes the image within Google Earth Engine. + - Manages metadata, checks for existing assets, and starts task execution for upload to GEE colleaction as asset. + +Example: +---------- +python util_upload_dxdy.py -d "/path/to/your/dx file.tif" + + + +""" + + +def determine_run_type(): + """ + Determines the run type based on the existence of the SECRET on the local machine file. + + If the file `config.GDRIVE_SECRETS` exists, sets the run type to 2 (DEV) and prints a corresponding message. + Otherwise, sets the run type to 1 (PROD) and prints a corresponding message. 
+ """ + global run_type + # Set scopes for Google Drive + scopes = ["https://www.googleapis.com/auth/drive"] + + if os.path.exists(config_GDRIVE_SECRETS): + run_type = 2 + + # Read the service account key file + with open(config_GDRIVE_SECRETS, "r") as f: + data = json.load(f) + + # Authenticate with Google using the service account key file + gauth = GoogleAuth() + gauth.service_account_file = os.path.join( + "secrets", "geetest-credentials.secret") + gauth.service_account_email = data["client_email"] + print("\nType 2 run PROCESSOR: We are on a local machine") + else: + run_type = 1 + gauth = GoogleAuth() + google_client_secret = os.environ.get('GOOGLE_CLIENT_SECRET') + google_client_secret = json.loads(google_client_secret) + gauth.service_account_email = google_client_secret["client_email"] + google_client_secret_str = json.dumps(google_client_secret) + + # Write the JSON string to a temporary key file + gauth.service_account_file = "keyfile.json" + with open(gauth.service_account_file, "w") as f: + f.write(google_client_secret_str) + + gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name( + gauth.service_account_file, scopes=scopes + ) + print("\nType 1 run PROCESSOR: We are on GitHub") + + # Test if GEE initialization is successful + # Initialize Google Earth Engine + credentials = ee.ServiceAccountCredentials( + gauth.service_account_email, gauth.service_account_file + ) + ee.Initialize(credentials) + + image = ee.Image("NASA/NASADEM_HGT/001") + title = image.get("title").getInfo() + + if title == "NASADEM: NASA NASADEM Digital Elevation 30m": + print("GEE initialization successful") + else: + print("GEE initialization FAILED") + + # Initialize GCS + global storage_client + storage_client = storage.Client.from_service_account_json( + gauth.service_account_file) + + +def upload_dx_dy_mosaic_for_single_date(day_to_process: str, collection: str) -> None: + """ + upload a dX dy mosaic for a single date to Google Cloud Storage and Google Earth Engine. + + Args: + day_to_process (str): The date to process (format: YYYY-MM-DD). + collection (str): The collection name for the asset. + task_description (str): Description of the task. 
+ """ + + # CONFIGURATION START + # -------------------- + # Filename + file_name = os.path.basename(day_to_process) + + # Timestamp + timestamp = re.search(r'(\d{4}-\d{2}-\d{2}T\d{6})', + file_name).group(0).upper() + + # EPSG traget colllection: shoudl be EPSG:32632 + epsg = 'EPSG:32632' + + # assetname + # asset_name = re.sub(r'.*?(s2-sr.*)\.tif', r'\1', file_name) + asset_name = "S2-SR_mosaic_"+timestamp+"_registration-10m" + + # GCS Google cloud storage bucket + bucket_name = "s2_sr_registration_swiss" + + # new_band_names = + band_names = ['reg_dx', 'reg_dy'] + + # Switch to Wait till upload is complete + wait_for_upload = True + + # Switch to delete local file + delete_local = True + + # CONFIGURATION END + # -------------------- + + print("processing DXDY of: "+timestamp) + + # Merging the corresponding DX and DY into one file + + command = ["gdalbuildvrt", + "-separate", + asset_name+".vrt", + day_to_process, + day_to_process.replace('_dx', '_dy') + ] + # print(command) + result = subprocess.run(command, check=True, + capture_output=True, text=True) + # print(result) + + command = ["gdal_translate", + "-of", "COG", + # "-co", "TILING_SCHEME=GoogleMapsCompatible", + "-co", "COMPRESS=DEFLATE", + "-co", "PREDICTOR=2", + "-co", "NUM_THREADS=ALL_CPUS", + asset_name+".vrt", + asset_name+".tif", + ] + # print(command) + result = subprocess.run(command, check=True, + capture_output=True, text=True) + # print(result) + + # check if we are on a local machine or on github + determine_run_type() + + # initialize_bucket(bucket_name) + bucket = storage_client.bucket(bucket_name) + + # Upload the file to the bucket + try: + blob = bucket.blob(asset_name+".tif") + blob.upload_from_filename( + asset_name+".tif") + print("SUCCESS: uploaded to gs://"+bucket_name+"/"+asset_name+".tif") + + # delete file on GCS + print("Starting export task "+asset_name+" to GEE ...") + except Exception as e: + # Handle any exceptions raised during the upload process + print(f"ERROR: uploading file to GCS: {e}") + + # Load the GeoTIFF file as an Earth Engine Image + image = ee.Image.loadGeoTIFF( + f"gs://{bucket_name}/"+asset_name+".tif") + + # rename band + image = image.rename(band_names) + + # Parse the datetime in UTC + date_time = datetime.strptime( + timestamp + "Z", '%Y-%m-%dT%H%M%SZ') + date_time = date_time.replace(tzinfo=timezone.utc) + + # Set metadata properties + + image = image.set({ + # Convert to milliseconds Start timestamp + 'system:time_start': int(date_time.timestamp()) * 1000, + # Assuming single timestamp Convert to milliseconds End timestamp + 'system:time_end': int(date_time.timestamp()) * 1000, + # Set the name of the image + 'system:name': asset_name+".tif", + # Set the date + 'date': timestamp, + # Method + 'method': "AROSICS", + # Version + 'product_version': "v1.0.0", + # Arosics orig_name X + 'orig_file_dx': file_name, + # Arosics orig_name Y + 'orig_file_dy': file_name.replace('_dx', '_dy'), + }) + + # Check if the asset already exists + asset_exists = ee.data.getInfo(collection+"/"+asset_name) + + # If the asset exists, delete it, otherwise upload failes + if asset_exists: + ee.data.deleteAsset(collection+"/"+asset_name) + print(f"Deleted existing asset: {asset_name}") + + # Export the image to the asset folder + task = ee.batch.Export.image.toAsset( + image, + scale=10, + description="regDX_regDY_"+timestamp, + crs=epsg, + maxPixels=1e10, + assetId=collection+"/"+asset_name + ) # force Enable overwrite + task.start() + + if wait_for_upload is True: + # Wait for 60 seconds + 
time.sleep(60) + + # Check the export task status: + # If a bulk upload is required, no while loop is needed; parse the task stats into a dict and then check for FAILED tasks + while task.active(): + print("Export task "+asset_name+" is still active. Waiting...") + time.sleep(60) # Check status every 60 seconds + + # If the task is completed, continue with cleaning up GCS + if task.status()['state'] == 'COMPLETED': + print("SUCCESS: uploaded to collection "+collection+" finished") + + # delete file on GCS + blob.delete() + + # If the task has failed, print the error message + elif task.status()['state'] == 'FAILED': + error_message = task.status()['error_message'] + print("ERROR: Export task "+asset_name + + " failed with error message:", error_message) + + # remove the local file + if delete_local is True: + os.remove(asset_name+".tif") + os.remove(asset_name+".vrt") + else: + pass + + +if __name__ == "__main__": + global config_GDRIVE_SECRETS + config_GDRIVE_SECRETS = r'C:\temp\topo-satromo\secrets\geetest-credentials-int.secret' + + # Define the default path for `day_to_process` + default_day_to_process = os.path.join( + 'C:', 'temp', 'temp', '2023-10', 'S2-L2A-mosaic_2023-10-24T104019_registration_swiss-10m_dx.tif') + collection = "projects/satromo-int/assets/COL_S2_SR_DXDY" + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Process DX DY for upload.") + parser.add_argument('-d', '--day', type=str, default=default_day_to_process, + help="Path to the dx file. Defaults to the file defined in the code.") + args = parser.parse_args() + + # Use the argument if provided, otherwise fall back to the default + day_to_process = args.day + + # Call the function with the appropriate day_to_process value + upload_dx_dy_mosaic_for_single_date(day_to_process, collection)

diff --git a/requirements.txt b/requirements.txt index 642e0d0a..de309249 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ click-plugins==1.1.1 cligj==0.7.2 colorama==0.4.6 earthengine-api==0.1.395 -fiona==1.9.6 -geopandas==0.14.3 +fiona>=1.10b2 +geopandas==0.14.4 google-api-core==2.11.0 google-api-python-client==2.88.0 google-auth==2.19.1 @@ -54,5 +54,5 @@ six==1.16.0 snuggs==1.4.7 tzdata==2023.3 uritemplate==4.1.1 -urllib3==1.26.18 +urllib3==1.26.19 varint==1.0.2

diff --git a/satromo_processor.drawio b/satromo_processor.drawio index dae073d9..c0b4e973 100644 --- a/satromo_processor.drawio +++ b/satromo_processor.drawio [drawio XML hunks not reproduced: the diagram markup was stripped during extraction and only empty diff markers remained; the diagram changes are summarized with the SVG labels below]

diff --git a/satromo_processor.drawio.svg b/satromo_processor.drawio.svg index 4e2fb6e5..978d1e2a 100644 --- a/satromo_processor.drawio.svg +++ b/satromo_processor.drawio.svg @@ -1,3 +1,4 @@
[satromo_processor.drawio.svg: re-exported flowchart; only the diagram's text labels survive in this patch extract. Recoverable labels of the updated diagram: step0_functions.py (GET COLLECTION INFO via get_step0_dict: collection names, product names, temporal coverages; GET ASSET STATUS INFO via step0_main, step0_check_collection, check_if_asset_prepared, write_task_metadata_if_needed; START ASSET GENERATION), step0_processor_<sensor>_<level>.py single-scene configuration now including the switches cloudScorePlus, terrainShadowDetectionPrecalculated and coRegistrationPrecalculated next to cloudMasking, swathMosaic, coRegistration and the export switches; CLOUD & CLOUD SHADOW (maskCloudsAndShadow, Cloudscore plus), TERRAIN SHADOW (addTerrainShadow / addTerrainShadow_predefined, precalculated masks as assets per DOY), MOSAIC (mosaic_collection), COREGISTER to REFERENCE IMAGE (S2regFunc / S2regprecalcFunc, precalculated dx/dy assets per orbit), EXPORT ASSET (ee.batch.Export.image.toAsset, EPSG:2056, 10 m and 20 m band stacks); step0_utils.py write_asset_as_empty to step0_empty_assets.csv when no candidate scene exists; processor.py (filter and clip collection, create ARD / indices, split into quadrants, export, metadata.json, running_tasks.csv, last_updates.csv); publisher.py now asks per date "A) are there completed tasks? AND B) five assets for each day?" before merging files, creating the thumbnail (.jpg), uploading metadata.json and .tif to STAC / Amazon S3 / Cloudfront, and updating completed_tasks.csv.]
\ No newline at end of file diff --git a/satromo_processor.py b/satromo_processor.py index 45fe2f62..4528ea49 100644 --- a/satromo_processor.py +++ b/satromo_processor.py @@ -474,7 +474,7 @@ def process_NDVI_MAX_TOA(roi): # For debugging # -------------- - # current_date_str = "2023-07-15" + # current_date_str = "2024-09-13" # print("*****************************\n") # print("using a manual set Date: " + current_date_str) @@ -512,6 +512,7 @@ def process_NDVI_MAX_TOA(roi): print('Collection ready: {}'.format(collection_ready)) for product_to_be_processed in step0_product_dict[collection_ready][0]: print('Launching product {}'.format(product_to_be_processed)) + if product_to_be_processed == 'PRODUCT_NDVI_MAX': # TODO Needs to be checked if needed roi = ee.Geometry.Rectangle(config.ROI_RECTANGLE) result = process_NDVI_MAX(roi) @@ -592,6 +593,6 @@ def process_NDVI_MAX_TOA(roi): else: raise BrokenPipeError('Inconsitent configuration') - print("Result:", result) + # print("Result:", result) print("Processing done!") diff --git a/satromo_publish.py b/satromo_publish.py index 29542617..0ac92623 100644 --- a/satromo_publish.py +++ b/satromo_publish.py @@ -15,6 +15,8 @@ import requests import time from datetime import datetime +from collections import defaultdict +from google.cloud import storage from main_functions import main_thumbnails, main_publish_stac_fsdi, main_extract_warnregions @@ -84,6 +86,11 @@ def initialize_gee_and_drive(): rclone_config_file = config.RCLONE_SECRETS google_secret_file = config.GDRIVE_SECRETS + + # TODO GCS: later move below so it works as well on GITHUB + global storage_client + storage_client = storage.Client.from_service_account_json( + gauth.service_account_file) else: # Initialize GEE and Google Drive using GitHub secrets @@ -147,6 +154,7 @@ def initialize_gee_and_drive(): else: print("GEE initialization FAILED") + def initialize_drive(): """ Re-Initialize Google Drive since it times out. @@ -175,7 +183,6 @@ def initialize_drive(): gauth.service_account_file, scopes=scopes ) - else: # Initialize Google Drive using GitHub secrets @@ -191,11 +198,11 @@ def initialize_drive(): gauth.service_account_file, scopes=scopes ) - # Create the Google Drive client global drive drive = GoogleDrive(gauth) + def download_and_delete_file(file): """ DEV/local machine only Download a file from Google Drive and delete it afterwards. 
@@ -309,7 +316,7 @@ def merge_files_with_gdal_warp(source): # print(command) result = subprocess.run(command, check=True, capture_output=True, text=True) - print(result) + # print(result) # run gdal translate command = ["gdalwarp", @@ -333,7 +340,7 @@ def merge_files_with_gdal_warp(source): # print(command) result = subprocess.run(command, check=True, capture_output=True, text=True) - print(result) + # print(result) # For Debugging uncomment below # print("Standard Output:") @@ -462,8 +469,20 @@ def clean_up_gdrive(filename): # }).GetList() # The approach above does not work if there are a lot of files - filtered_files = drive.ListFile({"q": "trashed=false"}).GetList() - file_list = [file for file in filtered_files if filename in file['title']] + # TODO GCS HERE:: List forl all files + if config.GDRIVE_TYPE != "GCS": + filtered_files = drive.ListFile({"q": "trashed=false"}).GetList() + file_list = [ + file for file in filtered_files if filename in file['title']] + else: + # TODO GCS HERE:: List forl all files + # initialize_bucket(bucket_name) + bucket = storage_client.bucket(config.GCLOUD_BUCKET) + blobs = bucket.list_blobs() + file_list = [] + for blob in blobs: + if filename in blob.name: + file_list.append(blob.name) # Check if the file is found if len(file_list) > 0: @@ -472,7 +491,10 @@ def clean_up_gdrive(filename): for file in file_list: # Get the current Task id - file_on_drive = file['title'] + if config.GDRIVE_TYPE != "GCS": + file_on_drive = file['title'] + else: + file_on_drive = file file_task_id = extract_value_from_csv( config.GEE_RUNNING_TASKS, file_on_drive.replace(".tif", ""), "Filename", "Task ID") @@ -484,7 +506,16 @@ def clean_up_gdrive(filename): file_task_status['description']) # Delete file on gdrive with muliple attempt - delete_gdrive(file) + if config.GDRIVE_TYPE != "GCS": + delete_gdrive(file) + else: + # Get the blob (file) object + blob = bucket.blob(file) + + # Delete the blob + blob.delete() + + print(f"File {file} deleted from bucket.") # Add DATA GEE PROCESSING info to stats write_file(file_task_status, config.GEE_COMPLETED_TASKS) @@ -686,6 +717,31 @@ def check_substrings_presence(file_merged, substring_to_check, additional_substr else: return False +def check_asset_size(filename): + """ + Checks the asset size of the product defined in the configuration + + Args: + - filename (str): The filename containing the product name to match. + + Returns: + - asset_size (int or None): The needed asset size if a matching product is found, + otherwise None. 
+ """ + # Iterate through all items in the config file + for product_name in dir(config): + # Get the product dictionary + product_info = getattr(config, product_name) + + # Check if it's a dictionary and has the 'product_name' key + if isinstance(product_info, dict) and 'product_name' in product_info: + if product_info['product_name'] in filename: + # Return the expected asset size + return product_info['asset_size'] + + print("No matching product found in the configuration.") + return None # Return None if no matching product is found + if __name__ == "__main__": @@ -696,11 +752,12 @@ def check_substrings_presence(file_merged, substring_to_check, additional_substr initialize_gee_and_drive() # empty temp files on GDrive - file_list = drive.ListFile({'q': "trashed=true"}).GetList() - for file in file_list: - # Delete file on gdrive with muliple attempt - delete_gdrive(file) - # print('GDRIVE TRASH: Deleted file: %s' % file['title']) + if config.GDRIVE_TYPE != "GCS": + file_list = drive.ListFile({'q': "trashed=true"}).GetList() + for file in file_list: + # Delete file on gdrive with muliple attempt + delete_gdrive(file) + # print('GDRIVE TRASH: Deleted file: %s' % file['title']) # Read the status file with open(config.GEE_RUNNING_TASKS, "r") as f: @@ -717,175 +774,213 @@ def check_substrings_presence(file_merged, substring_to_check, additional_substr unique_filenames = list(unique_filenames) - # Check if each quandrant is complete then process - # Iterate over unique filenames + # Step 1: Group by date + grouped_files = defaultdict(list) + for filename in unique_filenames: + # Extract the date part + date_part = filename.split('_mosaic_')[1].split('T')[0] + # Group the filenames by date + grouped_files[date_part].append(filename) - # Keep track of completion status - all_completed = True - # You need to change this if we have more than 4 quadrants - for quadrant_num in range(1, 5): - # Construct the filename with the quadrant - full_filename = filename + "quadrant" + str(quadrant_num) - - # Find the corresponding task ID in the lines list - task_id = None - for line in lines[1:]: - if full_filename in line: - task_id = line.strip().split(",")[0] - break - - if task_id: - # Check task status - task_status = ee.data.getTaskStatus(task_id)[0] - - if task_status["state"] != "COMPLETED": - # Task is not completed - all_completed = False - print(f"{full_filename} - {task_status['state']}") - - # Check overall completion status - if all_completed: - - print(filename+" is ready to process") - - # read metadata from json - with open(os.path.join( - config.PROCESSING_DIR, (filename+"_metadata.json")), 'r') as f: - metadata = json.load(f) - - # Set the buffer based on orbit or use Switzerland wide buffer - if 'GEE_PROPERTIES' in metadata and 'SENSING_ORBIT_NUMBER' in metadata['GEE_PROPERTIES']: - config.BUFFER = os.path.join("assets", "ch_buffer_5000m_2056_" + str( - metadata['GEE_PROPERTIES']['SENSING_ORBIT_NUMBER']) + ".shp") - else: - config.BUFFER = os.path.join("assets", "ch_buffer_5000m.shp") + # Step 2: Create the unique_filename_day list + unique_filename_day = list(grouped_files.values()) - # merge files - file_merged = merge_files_with_gdal_warp(filename) + # Step 3: Loop through unique_filename_day, Start the processing and remove groups from unique_filename + for group in unique_filename_day: + print("Date:", + group[0].split('_mosaic_')[1].split('T')[0], "checking export status") - # check if there is a need to create thumbnail , if yes create it - thumbnail = 
main_thumbnails.create_thumbnail( - file_merged, metadata['SWISSTOPO']['PRODUCT']) + # Check if each quandrant is complete then process + # Iterate over unique filenames + + # Set asset counter to 0 + all_assets = 0 + for filename in group: + + # Keep track of completion status + all_completed = True + + # You need to change this if we have more than 4 quadrants + for quadrant_num in range(1, 5): + # Construct the filename with the quadrant + full_filename = filename + "quadrant" + str(quadrant_num) + + # Find the corresponding task ID in the lines list + task_id = None + for line in lines[1:]: + if full_filename in line: + task_id = line.strip().split(",")[0] + break + + if task_id: + # Check task status + task_status = ee.data.getTaskStatus(task_id)[0] + + if task_status["state"] != "COMPLETED": + # Task is not completed + all_completed = False + print(f"{full_filename} - {task_status['state']}") + + # Check overall completion status of files + + if all_completed: + all_assets = all_assets + 1 + + # Check overall completion status of all assets for date. + asset_size = check_asset_size(filename) + if all_assets == asset_size: + print(" ... checking status of asset: "+filename) + print(" --> ", + group[0].split('_mosaic_')[1].split('T')[0], "all assets exported and READY ...") + + for filename in group: + + print(filename+" starting processing ... ") + + # read metadata from json + with open(os.path.join( + config.PROCESSING_DIR, (filename+"_metadata.json")), 'r') as f: + metadata = json.load(f) + + # Set the buffer based on orbit or use Switzerland wide buffer + if 'GEE_PROPERTIES' in metadata and 'SENSING_ORBIT_NUMBER' in metadata['GEE_PROPERTIES']: + config.BUFFER = os.path.join("assets", "ch_buffer_5000m_2056_" + str( + metadata['GEE_PROPERTIES']['SENSING_ORBIT_NUMBER']) + ".shp") + else: + config.BUFFER = os.path.join( + "assets", "ch_buffer_5000m.shp") + + # merge files + + file_merged = merge_files_with_gdal_warp(filename) + + # check if there is a need to create thumbnail , if yes create it + thumbnail = main_thumbnails.create_thumbnail( + file_merged, metadata['SWISSTOPO']['PRODUCT']) + + # upload file to FSDI STAC - # upload file to FSDI STAC - main_publish_stac_fsdi.publish_to_stac( - file_merged, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) - - # Warnregions: - # swisseo-vhi warnregions: create - - # Check if we deal with VHI Vegetation or Forest files - if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 'forest-10m.tif']) is True: - print("Extracting warnregions stats...") - warnregionfilename = metadata['SWISSTOPO']['PRODUCT']+"_"+metadata['SWISSTOPO']['ITEM'] + \ - "_" + \ - file_merged[file_merged.rfind( - "_") + 1:file_merged.rfind("-")]+"-warnregions" - - # Extracting warnregions - main_extract_warnregions.export(file_merged, config.WARNREGIONS, warnregionfilename, - metadata['SWISSTOPO']['DATEITEMGENERATION']+"T23:59:59Z", config.PRODUCT_VHI['missing_data']) - - # Pushing CSV , GEOJSON and PARQUET - warnformats = [".csv", ".geojson", ".parquet"] # - for format in warnformats: - main_publish_stac_fsdi.publish_to_stac( - warnregionfilename+format, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) - # Define the new metadata entry - new_entry_key = (file_merged[file_merged.rfind( - "_") + 1:file_merged.rfind("-")] + "-warnregions" + format.replace(".", "-")).upper() - new_entry_value = { - "PRODUCT": 
metadata['SWISSTOPO']['PRODUCT'], - "ITEM": metadata['SWISSTOPO']['ITEM'], - "ASSET": warnregionfilename + format, - "SOURCE": file_merged, - "format": format, - "regionId": "RegionID", - "vhiMean": "VHI Mean Region", - "availabilityPercentage": "percentage of available pixels with information within region" - } - - # Update the metadata dictionary with the new entry - metadata[new_entry_key] = new_entry_value - - # Write the updated metadata back to the JSON file - with open(os.path.join( - config.PROCESSING_DIR, file_merged.replace(".tif", "_metadata.json")), 'w') as f: - json.dump(metadata, f) - - # Create a current version and upload file to FSDI STAC, only if the latest item on STAC is newer or of the same age - collection = metadata['SWISSTOPO']['PRODUCT'] - result = extract_and_compare_datetime_from_url(config.STAC_FSDI_SCHEME+"://"+config.STAC_FSDI_HOSTNAME+config.STAC_FSDI_API + - "collections/"+collection+"/items/"+collection.replace("ch.swisstopo.", ""), metadata['SWISSTOPO']['ITEM']) - if result == True: - print("Newest dataset detected: updating CURRENT") - - file_merged_current = re.sub( - r'\d{4}-\d{2}-\d{2}T\d{6}', 'current', file_merged) - # Rename the file - os.rename(file_merged, file_merged_current) - - # Publish current dataset to stac - main_publish_stac_fsdi.publish_to_stac( - file_merged_current, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) - - # Publish current thumbnail if a thumbnail is required - if thumbnail is not False: - main_publish_stac_fsdi.publish_to_stac( - thumbnail, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) - - # Rename the file back - os.rename(file_merged_current, file_merged) - - # Pushing Warnregions CSV , GEOJSON and PARQUET - if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 'forest-10m.tif']) is True: - # create filepath - warnregionfilename_current = re.sub( - r'\d{4}-\d{2}-\d{2}T\d{6}', 'current', warnregionfilename) - for format in warnformats: - - # Rename the file - os.rename(warnregionfilename+format, - warnregionfilename_current+format) - - # Publish current dataset to stac main_publish_stac_fsdi.publish_to_stac( - warnregionfilename_current+format, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) - - # Rename the file back - os.rename(warnregionfilename_current + - format, warnregionfilename+format) - - # move file to INT STAC : in case reproejction is done here: move file_reprojected - move_files_with_rclone( - file_merged, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) - - # Pushing Warnregions CSV , GEOJSON and PARQUET - if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 'forest-10m.tif']) is True: - for format in warnformats: - move_files_with_rclone( - warnregionfilename+format, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) - - # Upload and move thumbnail if a thumbnail is required - if thumbnail is not False: - main_publish_stac_fsdi.publish_to_stac( - thumbnail, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) - move_files_with_rclone( - thumbnail, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) - - # clean up GDrive and local drive, move 
JSON to STAC - # Re-Test if we are on a local machine or if we are on Github: Redo, since GDRIVE might have a timeout - determine_run_type() - - # Authenticate GDRIVE - initialize_drive() - - # os.remove(file_merged) - clean_up_gdrive(filename) - - else: - print(filename+" is NOT ready to process") + file_merged, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) + + # Warnregions: + # swisseo-vhi warnregions: create + + # Check if we deal with VHI Vegetation or Forest files + if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 'forest-10m.tif']) is True: + print("Extracting warnregions stats...") + warnregionfilename = metadata['SWISSTOPO']['PRODUCT']+"_"+metadata['SWISSTOPO']['ITEM'] + \ + "_" + \ + file_merged[file_merged.rfind( + "_") + 1:file_merged.rfind("-")]+"-warnregions" + + # Extracting warnregions + main_extract_warnregions.export(file_merged, config.WARNREGIONS, warnregionfilename, + metadata['SWISSTOPO']['DATEITEMGENERATION']+"T23:59:59Z", config.PRODUCT_VHI['missing_data']) + + # Pushing CSV , GEOJSON and PARQUET + warnformats = [".csv", ".geojson", ".parquet"] # + for format in warnformats: + main_publish_stac_fsdi.publish_to_stac( + warnregionfilename+format, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) + # Define the new metadata entry + new_entry_key = (file_merged[file_merged.rfind( + "_") + 1:file_merged.rfind("-")] + "-warnregions" + format.replace(".", "-")).upper() + new_entry_value = { + "PRODUCT": metadata['SWISSTOPO']['PRODUCT'], + "ITEM": metadata['SWISSTOPO']['ITEM'], + "ASSET": warnregionfilename + format, + "SOURCE": file_merged, + "format": format, + "regionId": "RegionID", + "vhiMean": "VHI Mean Region", + "availabilityPercentage": "percentage of available pixels with information within region" + } + + # Update the metadata dictionary with the new entry + metadata[new_entry_key] = new_entry_value + + # Write the updated metadata back to the JSON file + with open(os.path.join( + config.PROCESSING_DIR, file_merged.replace(".tif", "_metadata.json")), 'w') as f: + json.dump(metadata, f) + + # Create a current version and upload file to FSDI STAC, only if the latest item on STAC is newer or of the same age + collection = metadata['SWISSTOPO']['PRODUCT'] + result = extract_and_compare_datetime_from_url(config.STAC_FSDI_SCHEME+"://"+config.STAC_FSDI_HOSTNAME+config.STAC_FSDI_API + + "collections/"+collection+"/items/"+collection.replace("ch.swisstopo.", ""), metadata['SWISSTOPO']['ITEM']) + if result == True: + print("Newest dataset detected: updating CURRENT") + + file_merged_current = re.sub( + r'\d{4}-\d{2}-\d{2}T\d{6}', 'current', file_merged) + # Rename the file + os.rename(file_merged, file_merged_current) + + # Publish current dataset to stac + main_publish_stac_fsdi.publish_to_stac( + file_merged_current, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) + + # Publish current thumbnail if a thumbnail is required + if thumbnail is not False: + main_publish_stac_fsdi.publish_to_stac( + thumbnail, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) + + # Rename the file back + os.rename(file_merged_current, file_merged) + + # Pushing Warnregions CSV , GEOJSON and PARQUET + if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 
'forest-10m.tif']) is True: + # create filepath + warnregionfilename_current = re.sub( + r'\d{4}-\d{2}-\d{2}T\d{6}', 'current', warnregionfilename) + for format in warnformats: + + # Rename the file + os.rename(warnregionfilename+format, + warnregionfilename_current+format) + + # Publish current dataset to stac + main_publish_stac_fsdi.publish_to_stac( + warnregionfilename_current+format, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID'], current=True) + + # Rename the file back + os.rename(warnregionfilename_current + + format, warnregionfilename+format) + + # move file to INT STAC : in case reproejction is done here: move file_reprojected + move_files_with_rclone( + file_merged, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) + + # Pushing Warnregions CSV , GEOJSON and PARQUET + if check_substrings_presence(file_merged, metadata['SWISSTOPO']['PRODUCT'], ['vegetation-10m.tif', 'forest-10m.tif']) is True: + for format in warnformats: + move_files_with_rclone( + warnregionfilename+format, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) + + # Upload and move thumbnail if a thumbnail is required + if thumbnail is not False: + main_publish_stac_fsdi.publish_to_stac( + thumbnail, metadata['SWISSTOPO']['ITEM'], metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['GEOCATID']) + move_files_with_rclone( + thumbnail, os.path.join(S3_DESTINATION, metadata['SWISSTOPO']['PRODUCT'], metadata['SWISSTOPO']['ITEM'])) + + # clean up GDrive and local drive, move JSON to STAC + # Re -Test if we are on a local machine or if we are on Github: Redo, since GDRIVE might have a timeout + determine_run_type() + + # Authenticate with GDRIVE + initialize_drive() + + # os.remove(file_merged) + clean_up_gdrive(filename) + + # Remove each filename from the original list + unique_filenames.remove(filename) + + else: + print(" ... checking status of asset: "+filename) # delete consolidated META file [os.remove(file) for file in glob.glob("*_metadata.json")] diff --git a/step0_processors/step0_processor_msg_lst.py b/step0_processors/step0_processor_msg_lst.py index 7f39b7e2..dcc42e18 100644 --- a/step0_processors/step0_processor_msg_lst.py +++ b/step0_processors/step0_processor_msg_lst.py @@ -16,13 +16,14 @@ from oauth2client.service_account import ServiceAccountCredentials from google.cloud import storage -# Processing pipeline for daily LandSurfce mosaics over Switzerland. +# Processing pipeline for DAILY NETCDF for daily LandSurfce mosaics over Switzerland. ############################## # INTRODUCTION # This script provides a tool to Access and upload Landsurface (LST) data over Switzerland to GEE. -# It uses CMS SAF data provided by MeteoSwiss LST to be stored as SATROMO assets and -# to calculate VCI and TCI and combine them to the VHI. The CM SAF data are owned by EUMETSAT and are +# It uses CMS SAF data provided by MeteoSwiss LST as +# -> DAILY FILES +# to be stored as SATROMO assets and to calculate VCI and TCI and combine them to the VHI. The CM SAF data are owned by EUMETSAT and are # available to all users free of charge and with no conditions to use. If you wish to use these products, # EUMETSAT's copyright credit must be shown by displaying the words "Copyright (c) (2022) EUMETSAT" under/in # each of these SAF Products used in a project or shown in a publication or website. 
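
For context on the VCI/TCI/VHI combination referenced in the introductory comment above, the sketch below shows the standard formulation only; the function and variable names are illustrative and are not taken from this repository. In the pipeline, the minima and maxima come from the reference data sets (optionally their p05/p95 percentiles, see CI_method in step1_processor_vhi.py).

# Standard VCI/TCI/VHI formulation (illustrative names, not this repository's code).
def compute_vhi(ndvi, ndvi_min, ndvi_max, lst, lst_min, lst_max, alpha=0.5):
    """Combine the Vegetation and Temperature Condition Indices into a VHI in [0, 100]."""
    vci = 100.0 * (ndvi - ndvi_min) / (ndvi_max - ndvi_min)  # greener than reference -> higher VCI
    tci = 100.0 * (lst_max - lst) / (lst_max - lst_min)      # cooler than reference -> higher TCI
    return alpha * vci + (1.0 - alpha) * tci
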
@@ -321,9 +322,17 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # CONFIGURATION START # -------------------- + # WORKING WITH 1 FILE PER DAY (operational delivery) # netcdf files: download data from data.geo.admin.ch location , check if file exist raw_filename = day_to_process.replace("-", "")+"000000.nc" - data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.landoberflaechentemperatur/dev/msg.LST_PMW.H_ch02.lonlat_"+raw_filename + + # WORKING WITH 1 FILE PER MONTH (one time delivery) + # modified_date_str = day_to_process.replace("-", "") + # Replace the day part (DD) with "01" + # raw_filename = modified_date_str[:6] + "01"+"000000.nc" + + data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.globalstrahlung-monatlich/landoberflaechentemperatur/msg.LST_PMW.H_ch02.lonlat_"+raw_filename + # data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.landoberflaechentemperatur/MSG2004-2023/msg.LST_PMW.H_ch02.lonlat_"+raw_filename # UTC Hour of LST LST_hour = 11 @@ -344,6 +353,9 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # CONFIGURATION END # -------------------- + + # Convert the string to an ee.Date object + # Send a HEAD request to check if the file exists try: # Send a HEAD request to check if the file exists @@ -400,7 +412,7 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str print(f"Error uploading file to GCS: {e}") # Define the asset name for Earth Engine - asset_name = config.PRODUCT_MSG['step0_collection']+"/"+asset_prefix+day_to_process + \ + asset_name = config.PRODUCT_VHI['LST_current_data']+"/"+asset_prefix+day_to_process + \ "T"+str(LST_hour)+"0000"+'_bands-1721m' # Load the GeoTIFF file as an Earth Engine Image @@ -428,6 +440,8 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str 'date': day_to_process, # HourMin Sec 'hour': str(LST_hour), + # Orig filename + 'orig_filename': os.path.basename(data_import_url), # Set the date 'spacecraft_name': info_raw_file['global_attributes']['platform'], # Set the date @@ -440,7 +454,7 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str 'date_created': info_raw_file['global_attributes']['date_created'], # Set the no data value, you can add more properties like baselines etc # '_FillValue': str(info_raw_file['variables']['LST_PMW']['attributes']['_FillValue']) - 'no_data': str(config.PRODUCT_MSG["no_data"]) + 'no_data': str(config.PRODUCT_MSG_CLIMA["no_data"]) }) # Check if the asset already exists @@ -453,7 +467,9 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # Export the image to the asset folder task = ee.batch.Export.image.toAsset( - image, assetId=asset_name) # force Enable overwrite + image, + assetId=asset_name, + description=task_description) # force Enable overwrite task.start() if wait_for_upload is True: @@ -484,3 +500,4 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # remove the local file os.remove("MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") os.remove(raw_filename) + return True diff --git a/step0_processors/step0_processor_msg_lst_clima.py b/step0_processors/step0_processor_msg_lst_clima.py index 8013216e..93b0d0a8 100644 --- a/step0_processors/step0_processor_msg_lst_clima.py +++ b/step0_processors/step0_processor_msg_lst_clima.py @@ -16,13 +16,14 @@ from oauth2client.service_account import ServiceAccountCredentials from google.cloud import storage -# 
Processing pipeline for daily LandSurfce mosaics over Switzerland. +# Processing pipeline for MONTHLY NETCDF for daily LandSurfce mosaics over Switzerland. ############################## # INTRODUCTION # This script provides a tool to Access and upload Landsurface (LST) data over Switzerland to GEE. -# It uses CMS SAF data provided by MeteoSwiss LST to be stored as SATROMO assets and -# to calculate VCI and TCI and combine them to the VHI. The CM SAF data are owned by EUMETSAT and are +# It uses CMS SAF data provided by MeteoSwiss LST as +# -> MONTHLY FILES +# to be stored as SATROMO assets and to calculate VCI and TCI and combine them to the VHI. The CM SAF data are owned by EUMETSAT and are # available to all users free of charge and with no conditions to use. If you wish to use these products, # EUMETSAT's copyright credit must be shown by displaying the words "Copyright (c) (2022) EUMETSAT" under/in # each of these SAF Products used in a project or shown in a publication or website. @@ -153,11 +154,18 @@ def get_netcdf_info(file_path, epoch_time=None): } # If epoch_time is provided and found in the file, get the variables for that time + if time_index is not None: time_variables = {} for var_name in dataset.variables: var = dataset.variables[var_name] - var_data = var[time_index].tolist() + if 'time' in var.dimensions: + # Find the correct index in all dimensions + index = tuple(time_index if dim == 'time' else slice(None) + for dim in var.dimensions) + var_data = var[index].tolist() + else: + var_data = var[:].tolist() # Non-time variables time_variables[var_name] = { 'dimensions': var.dimensions, 'shape': var.shape, @@ -311,7 +319,7 @@ def export_netcdf_band_to_geotiff(file_path, time_value, output_tiff): def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str, task_description: str) -> None: """ - Generates a MSG LST mosaic for a single date and uploads it to Google Cloud Storage and Google Earth Engine. + Generates a MSG MFG LST mosaic for a single date and uploads it to Google Cloud Storage and Google Earth Engine. Args: day_to_process (str): The date to process (format: YYYY-MM-DD). 
@@ -322,8 +330,15 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # -------------------- # netcdf files: download data from data.geo.admin.ch location , check if file exist - raw_filename = day_to_process.replace("-", "")+"000000.nc" - data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.landoberflaechentemperatur/dev/msg.LST_PMW.H_ch02.lonlat_"+raw_filename + modified_date_str = day_to_process.replace("-", "") + # Replace the day part (DD) with "01" + raw_filename = modified_date_str[:6] + "01"+"000000.nc" + + # MFG + data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.landoberflaechentemperatur/MFG1991-2005/mfg.LST_PMW.H_ch02.lonlat_"+raw_filename + + # MSG + # data_import_url = "https://data.geo.admin.ch/ch.meteoschweiz.landoberflaechentemperatur/MSG2004-2023/msg.LST_PMW.H_ch02.lonlat_"+raw_filename # UTC Hour of LST LST_hour = 11 @@ -335,7 +350,7 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str bucket_name = "viirs_lst_meteoswiss" # GEE asset NAME prefix - asset_prefix = "MSG_METEOSWISS_mosaic_" + asset_prefix = "M_METEOSWISS_mosaic_" # Switch to Wait till upload is complete # if set to false, The deletion of the file on GCS (delete blob) has to be implemented yet @@ -374,7 +389,7 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # Create the TIFF dataset export_netcdf_band_to_geotiff( - raw_filename, epochtime, "MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + raw_filename, epochtime, "M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") # Get the metdata for the epoch info_raw_file = get_netcdf_info(raw_filename, epoch_time=epochtime) @@ -386,26 +401,26 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str bucket = storage_client.bucket(bucket_name) # upload local file to GCS - # gs_path = upload_to_gcs("MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif", "MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + # gs_path = upload_to_gcs("M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif", "M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") # Upload the file to the bucket try: - blob = bucket.blob("MSG_LST_"+day_to_process + + blob = bucket.blob("M_LST_"+day_to_process + "T"+str(LST_hour)+"0000.tif") blob.upload_from_filename( - "MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") - print(f"File MSG_LST_"+day_to_process+"T"+str(LST_hour) + - "0000.tif uploaded to gs://"+bucket_name+"/MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + "M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + print(f"File M_LST_"+day_to_process+"T"+str(LST_hour) + + "0000.tif uploaded to gs://"+bucket_name+"/M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") except Exception as e: # Handle any exceptions raised during the upload process print(f"Error uploading file to GCS: {e}") # Define the asset name for Earth Engine asset_name = config.PRODUCT_MSG_CLIMA['step0_collection']+"/"+asset_prefix+day_to_process + \ - "T"+str(LST_hour)+"0000"+'_bands-4415m' + "T"+str(LST_hour)+"0000"+'_bands-02d' # Load the GeoTIFF file as an Earth Engine Image image = ee.Image.loadGeoTIFF( - f"gs://{bucket_name}/MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + f"gs://{bucket_name}/M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") # rename band image = image.rename([band_name]) @@ -423,11 +438,13 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # Assuming single timestamp Convert to milliseconds End 
timestamp 'system:time_end': int(date_time.timestamp()) * 1000, # Set the name of the image - 'system:name': asset_prefix+day_to_process+"T"+str(LST_hour)+"0000"+'_bands-4415m', + 'system:name': asset_prefix+day_to_process+"T"+str(LST_hour)+"0000"+'_bands-02d', # Set the date 'date': day_to_process, # HourMin Sec 'hour': str(LST_hour), + # Orig filename + 'orig_filename': os.path.basename(data_import_url), # Set the date 'spacecraft_name': info_raw_file['global_attributes']['platform'], # Set the date @@ -453,7 +470,8 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # Export the image to the asset folder task = ee.batch.Export.image.toAsset( - image, assetId=asset_name) # force Enable overwrite + image, assetId=asset_name, + description=task_description) # force Enable overwrite task.start() if wait_for_upload is True: @@ -471,7 +489,7 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str # Continue with your code here pass print("upload finished:" + asset_prefix+day_to_process + - "T"+str(LST_hour)+"0000"+'_bands-4415m') + "T"+str(LST_hour)+"0000"+'_bands-02d') # delete file on GCS blob.delete() @@ -482,5 +500,5 @@ def generate_msg_lst_mosaic_for_single_date(day_to_process: str, collection: str print("Export task failed with error message:", error_message) # remove the local file - os.remove("MSG_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") + os.remove("M_LST_"+day_to_process+"T"+str(LST_hour)+"0000.tif") os.remove(raw_filename) diff --git a/step0_processors/step0_processor_s2_sr.py b/step0_processors/step0_processor_s2_sr.py index 2a62a2c9..f30a7e0a 100644 --- a/step0_processors/step0_processor_s2_sr.py +++ b/step0_processors/step0_processor_s2_sr.py @@ -41,7 +41,9 @@ def generate_s2_sr_mosaic_for_single_date(day_to_process: str, collection: str, # options': True, False - defines if individual scenes get mosaiced to an image swath swathMosaic = True # options': True, False - defines if the coregistration is applied - coRegistration = True + coRegistration = False + # options': True, False - defines if the coregistration is applied + coRegistrationPrecalculated = True # Export switches # options': True, 'False - defines if 10-m-bands are exported': 'B2','B3','B4','B8' @@ -94,6 +96,11 @@ def generate_s2_sr_mosaic_for_single_date(day_to_process: str, collection: str, # processing:The shadow masks are combined into a single image with multiple bands as asset per DOY. terrain_shadow_collection = "projects/satromo-prod/assets/col/TERRAINSHADOW_SWISS/" + # DX DY - Precalculated DX DY shifts + # source: https://github.com/SARcycle/AROSICS/ + # processing: The DX DY are combined into a single image with multiple bands as asset per DATE. 
+ dxdy_collection = "projects/satromo-432405/assets/COL_S2_SR_DXDY" + ############################## # SATELLITE DATA @@ -565,11 +572,26 @@ def mosaic_collection(img): # Getting swisstopo Processor Version processor_version = main_utils.get_github_info() + + # Set TerrainShadow Properties + if terrainShadowDetectionPrecalculated: + terrainshadow_method = terrain_shadow_collection + else: + terrainshadow_method = 'ee.Terrain.hillShadow' + + # Set TerrainShadow Properties + if coRegistrationPrecalculated: + coreg_method = dxdy_collection + else: + coreg_method = 'GEE displacement' + # set the extracted properties to the mosaic mosaic = mosaic.set('system:time_start', time_start) \ .set('system:time_end', time_end) \ .set('index_list', index_list) \ .set('scene_count', scene_count) \ + .set('COREGISTRATION', coreg_method) \ + .set('TERRAIN_SHADOW', terrainshadow_method) \ .set('SWISSTOPO_PROCESSOR', processor_version['GithubLink']) \ .set('SWISSTOPO_RELEASE_VERSION', processor_version['ReleaseVersion']) @@ -640,11 +662,96 @@ def S2regFunc(image): return registered + def S2regprecalcFunc(image, day, collection): + + # Load the collction + dxdy_coll = ee.ImageCollection(collection) + + # Define the precise start and end timestamps for '2023-10-01' + start_datetime = day+'T00:00:00' + end_datetime = day+'T23:59:59' + + # Filter the collection by the precise date and time range + filtered_collection = dxdy_coll.filterDate( + start_datetime, end_datetime) + + # Is a dx dy available for this date -> Yes: continue / No: abort ('No dx dy available') + image_list_size = filtered_collection.size().getInfo() + if image_list_size == 0: + write_asset_as_empty( + collection, day, 'No dx dy available') + return + + # Get the first image that meets the criteria + dxdy = filtered_collection.first() + + # Check if the image exists + if dxdy: + # Get the image ID + dxdy_id = dxdy.get('system:id').getInfo() + print('-> dxdy ID:', dxdy_id) + else: + print('ERROR: No precalculated dxdy found for the specified date.') + + # Extract relevant displacement parameters + # Select the bands 'reg_dx' and 'reg_dy' and divide by 100 + displacement = dxdy.select(['reg_dx', 'reg_dy']).divide(100) + + # Extract relevant displacement parameters + reg_dx = dxdy.select('reg_dx') + reg_dy = dxdy.select('reg_dy') + reg_confidence = dxdy.select( + 'reg_dy').rename('reg_confidence') + # TODO This band is not needed change whole processing chain since now all are 0, till the export + reg_confidence = reg_confidence.multiply(0).round().toUint8() + + # # Use bicubic resampling during registration. + # imageOrig = image.resample('bicubic') + + # # Choose to register using only the 'R' band. + # imageRedBand = imageOrig.select('B4') + + # # Determine the displacement by matching only the 'R' bands. + # displacement = imageRedBand.displacement( + # referenceImage=S2_gri, + # maxOffset=10, + # patchWidth=300, + # stiffness=8 + # ) + + # # Extract relevant displacement parameters + # reg_dx = displacement.select('dx').rename('reg_dx') + # reg_dx = reg_dx.multiply(100).round().toInt16() + # reg_dy = displacement.select('dy').rename('reg_dy') + # reg_dy = reg_dy.multiply(100).round().toInt16() + # reg_confidence = displacement.select( + # 'confidence').rename('reg_confidence') + # reg_confidence = reg_confidence.multiply(100).round().toUint8() + + # Compute image offset and direction. 
+ reg_offset = reg_dx.hypot(reg_dy).rename('reg_offset') + reg_angle = reg_dx.atan2(reg_dy).rename('reg_offsetAngle') + + # Use the computed displacement to register all original bands. + registered = image.displace(displacement) \ + .addBands(reg_dx) \ + .addBands(reg_dy) \ + .addBands(reg_confidence) \ + .addBands(reg_offset) \ + .addBands(reg_angle) + + return registered + # SWITCH if coRegistration is True: print('--- Image swath co-registration applied ---') # apply the registration function S2_sr = S2regFunc(S2_sr) + if coRegistrationPrecalculated is True: + print('--- Image swath co-registration from precalculated dx dy is applied ---') + # apply the registration function + + S2_sr = S2regprecalcFunc(S2_sr, day_to_process, dxdy_collection) ############################## # EXPORT diff --git a/step1_processors/step1_processor_vhi.py b/step1_processors/step1_processor_vhi.py index 9486f360..22587ae5 100644 --- a/step1_processors/step1_processor_vhi.py +++ b/step1_processors/step1_processor_vhi.py @@ -1,7 +1,10 @@ import ee import datetime +from datetime import timedelta import configuration as config from main_functions import main_utils +from step0_processors.step0_utils import write_asset_as_empty +from step0_processors.step0_processor_msg_lst import generate_msg_lst_mosaic_for_single_date # Processing pipeline for daily vegetation health index (VHI) mosaics over Switzerland @@ -112,52 +115,81 @@ def loadLstRefData(doy): LSTref = LSTref.divide(scale) return LSTref -# Helper function to extract the values from specific bits - - -def bitwiseExtract(input, fromBit, toBit): - """ - Extracts values from specific bits in an input integer. - - Args: - input (ee.Number): Input integer. - fromBit (int): Starting bit position. - toBit (int): Ending bit position. - - Returns: - ee.Number: Extracted value. - """ - maskSize = ee.Number(1).add(toBit).subtract(fromBit) - mask = ee.Number(1).leftShift(maskSize).subtract(1) - return input.rightShift(fromBit).bitwiseAnd(mask) - -# function to mask clouds from a "NOAA/VIIRS/001/VNP21A1D" dataset - - -def maskCloudsAndLowQuality(image): - """ - Masks clouds and low-quality pixels in a VIIRS dataset. - - Args: - image (ee.Image): VIIRS image. - - Returns: - ee.Image: Masked image. - """ - # extract the quality band - QC = image.select('QC') - # only keep pixels from the input image where - # Bits 0-1 = 0 (Pixel produced, good quality, no further QA info necessary) - qaMask = bitwiseExtract(QC, 0, 1).eq(0) - image = image.updateMask(qaMask) - return image +# # Helper function to extract the values from specific bits +# def bitwiseExtract(input, fromBit, toBit): +# """ +# Extracts values from specific bits in an input integer. + +# Args: +# input (ee.Number): Input integer. +# fromBit (int): Starting bit position. +# toBit (int): Ending bit position. + +# Returns: +# ee.Number: Extracted value. +# """ +# maskSize = ee.Number(1).add(toBit).subtract(fromBit) +# mask = ee.Number(1).leftShift(maskSize).subtract(1) +# return input.rightShift(fromBit).bitwiseAnd(mask) + +# # function to mask clouds from a "NOAA/VIIRS/001/VNP21A1D" dataset +# def maskCloudsAndLowQuality(image): +# """ +# Masks clouds and low-quality pixels in a VIIRS dataset. + +# Args: +# image (ee.Image): VIIRS image. + +# Returns: +# ee.Image: Masked image. 
+# """ +# # extract the quality band +# QC = image.select('QC') +# # only keep pixels from the input image where +# # Bits 0-1 = 0 (Pixel produced, good quality, no further QA info necessary) +# qaMask = bitwiseExtract(QC, 0, 1).eq(0) +# image = image.updateMask(qaMask) +# return image + +# # This function loads the current LST data from VIIRS imagery +# def loadLstCurrentData(date, d, aoi): +# """ +# Loads the current Land Surface Temperature (LST) data from VIIRS imagery. +# Takes the most recent pixels from the ee.ImageCollection. + +# Args: +# date (ee.Date): Date of interest. +# d (int): Number of days to cover in the time window. +# aoi (ee.Geometry): Area of interest. + +# Returns: +# Tuple[ee.Image, str, int]: Tuple containing LST image, index list, and scene count. +# """ +# start_date = date.advance((-1*d), 'day') +# end_date = date.advance(1, 'day') +# LST_col = ee.ImageCollection("NOAA/VIIRS/001/VNP21A1D") \ +# .filterDate(start_date, end_date) \ +# .filterBounds(aoi) +# # apply cloud and low quality masks +# LST_col_masked = LST_col.map(maskCloudsAndLowQuality) +# # Sort the collection by time in descending order +# sortedCollection = LST_col_masked.sort('system:time_start', False) +# # Create list with indices of all used data +# LST_index_list = sortedCollection.aggregate_array('system:index') +# LST_index_list = LST_index_list.join(',') +# LST_scene_count = sortedCollection.size() +# # Create a mosaic using the latest pixel values +# latestMosaic = sortedCollection.mosaic() +# # Select LST for the mosaic +# LSTj = latestMosaic.select('LST_1KM').rename('lst') +# return LSTj, LST_index_list, LST_scene_count # This function loads the current LST data def loadLstCurrentData(date, d, aoi): """ - Loads the current Land Surface Temperature (LST) data from VIIRS imagery. + Loads the current Land Surface Temperature (LST) data from Meteosat data. Takes the most recent pixels from the ee.ImageCollection. 
Args: @@ -170,21 +202,23 @@ def loadLstCurrentData(date, d, aoi): """ start_date = date.advance((-1*d), 'day') end_date = date.advance(1, 'day') - LST_col = ee.ImageCollection("NOAA/VIIRS/001/VNP21A1D") \ + LST_col = ee.ImageCollection(config.PRODUCT_VHI['LST_current_data']) \ .filterDate(start_date, end_date) \ .filterBounds(aoi) - # apply cloud and low quality masks - LST_col_masked = LST_col.map(maskCloudsAndLowQuality) + # Sort the collection by time in descending order - sortedCollection = LST_col_masked.sort('system:time_start', False) + sortedCollection = LST_col.sort('system:time_start', False) # Create list with indices of all used data LST_index_list = sortedCollection.aggregate_array('system:index') LST_index_list = LST_index_list.join(',') LST_scene_count = sortedCollection.size() # Create a mosaic using the latest pixel values latestMosaic = sortedCollection.mosaic() - # Calculate NDVI for the mosaic - LSTj = latestMosaic.select('LST_1KM').rename('lst') + # Select LST for the mosaic + LST_mosaic = latestMosaic.select('LST_PMW').rename('lst') + # Divide by the scale + scale = ee.Number(100) + LSTj = LST_mosaic.divide(scale) return LSTj, LST_index_list, LST_scene_count @@ -200,6 +234,16 @@ def process_PRODUCT_VHI(roi, collection_ready, current_date_str): Returns: None """ + + ############################## + # MASKS + # Mask for vegetation + vegetation_mask = ee.Image( + 'projects/satromo-prod/assets/res/ch_bafu_lebensraumkarte_mask_vegetation_epsg32632') + # Mask for the forest + forest_mask = ee.Image( + 'projects/satromo-prod/assets/res/ch_bafu_lebensraumkarte_mask_forest_epsg32632') + ############################## # SWITCHES # The switches enable / disable the execution of individual steps in this script @@ -214,6 +258,10 @@ def process_PRODUCT_VHI(roi, collection_ready, current_date_str): # options: True, False - defines if the p05 and p95 percentiles of the reference data sets are used, # otherwise the min and max will be used (False) + ############################## + # SPACE + aoi = roi + ############################## # PRODUCT product_name = config.PRODUCT_VHI['product_name'] @@ -240,33 +288,75 @@ def process_PRODUCT_VHI(roi, collection_ready, current_date_str): CI_method = 'min_and_max' ############################## - # SPACE - aoi = roi - - ############################## - # MASKS - # Mask for vegetation - vegetation_mask = ee.Image( - 'projects/satromo-prod/assets/res/ch_bafu_lebensraumkarte_mask_vegetation_epsg32632') - # Mask for the forest - forest_mask = ee.Image( - 'projects/satromo-prod/assets/res/ch_bafu_lebensraumkarte_mask_forest_epsg32632') - - ############################## - # DATA + # Sentinel S2 SR Data S2_col = ee.ImageCollection(collection_ready) \ .filterDate(start_date, end_date) \ .filterBounds(aoi) \ .filter(ee.Filter.stringEndsWith('system:index', '10m')) ########################################### - # PRE-PROCESSING + # Test PRE conditions + + # TEST LST: LST Asset avilable? 
We first check if we have the neccessary termporal coverage + + LST_col = ee.ImageCollection(config.PRODUCT_VHI['LST_current_data']) \ + .filterDate(start_date, end_date) + LST_count = LST_col.size().getInfo() + + # If wee don't have LST coverage we start to process it for each day + if LST_count != config.PRODUCT_VHI['temporal_coverage']: + + # Function to check if an asset exists for a given date + def check_asset_exists(date): + next_date = date.advance(1, 'day') + filtered_col = LST_col.filterDate(date, next_date) + count = filtered_col.size().getInfo() + return count > 0 + + # Check each day + check_date = start_date + + while check_date.millis().getInfo() <= end_date.millis().getInfo(): + if not check_asset_exists(check_date): + print( + '... starting import of LST for '+check_date.format('YYYY-MM-dd').getInfo()+" from MeteoSwiss raw data") + result = generate_msg_lst_mosaic_for_single_date(check_date.format('YYYY-MM-dd').getInfo( + ), config.PRODUCT_VHI['LST_current_data'], "LST-"+check_date.format('YYYY-MM-dd').getInfo()) + if result == False: + print('Cutting asset create for VHI ' + + current_date_str + ': missing LST Data') + return + + check_date = check_date.advance(1, 'day') + + # TEST VHI GEE: VHI GEE Asset already exists ?? then skip + VHI_col = ee.ImageCollection(config.PRODUCT_VHI['step1_collection']) \ + .filterMetadata('system:index', 'contains', current_date_str) \ + .filterBounds(aoi) + VHI_count = VHI_col.size().getInfo() + if VHI_count == 2: + print(current_date_str+' is already in ' + + config.PRODUCT_VHI['step1_collection']) + return + + # TEST VHI empty asset? VHI in empty_asset list? then skip + if main_utils.is_date_in_empty_asset_list(config.PRODUCT_VHI['step1_collection'], current_date_str): + return + # Get information about the available sensor data for the range + # Get the number of images in the filtered collection + image_count = S2_col.size().getInfo() + + if image_count == 0: + write_asset_as_empty( + config.PRODUCT_VHI['step1_collection'], current_date_str, 'No S2 SR data available') + return + + # TEST only Process if newer than last product update OR no S2 SR fro specific date is in empty list sensor_stats = main_utils.get_collection_info(S2_col) + if main_utils.check_product_update(config.PRODUCT_VHI['product_name'], sensor_stats[1]) or main_utils.is_date_in_empty_asset_list(collection_ready, current_date_str): - # Check if there is new sensor data compared to the stored dataset - if main_utils.check_product_update(config.PRODUCT_VHI['product_name'], sensor_stats[1]) is True: - print("new imagery from: " + sensor_stats[1]) + print("new/latest imagery from: " + sensor_stats[1]) ########################################### # PROCESSING diff --git a/tools/last_updates.csv b/tools/last_updates.csv index 3f693202..78f46c23 100644 --- a/tools/last_updates.csv +++ b/tools/last_updates.csv @@ -1,4 +1,4 @@ Product,LastSceneDate,RunDate,Status NDVI-MAX,2024-02-15,2024-02-15,complete ch.swisstopo.swisseo_vhi_v100,2020-07-18,2020-07-18,RUNNING -ch.swisstopo.swisseo_s2-sr_v100,2024-09-07,2024-09-10T22:10,complete +ch.swisstopo.swisseo_s2-sr_v100,2024-09-07,2024-09-10T22:10,complete \ No newline at end of file
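
A standalone sketch of the per-date grouping that satromo_publish.py introduces above, so the intended behaviour is visible outside the patch; the filenames are illustrative, following the 'S2-L2A_mosaic_<date>T<time>_bands-10m' pattern used elsewhere in this commit.

from collections import defaultdict

filenames = [
    "S2-L2A_mosaic_2024-09-07T103031_bands-10m",
    "S2-L2A_mosaic_2024-09-07T103031_bands-20m",
    "S2-L2A_mosaic_2024-09-08T104019_bands-10m",
]

# Group exported filenames by sensing date ('YYYY-MM-DD' between '_mosaic_' and 'T').
grouped_files = defaultdict(list)
for filename in filenames:
    date_part = filename.split('_mosaic_')[1].split('T')[0]
    grouped_files[date_part].append(filename)

# One list of filenames per date; a date is only published once the number of
# completed assets matches the 'asset_size' configured for the product.
unique_filename_day = list(grouped_files.values())
print(unique_filename_day)
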