
feat: save COGs with dask to azure #195

Open · wants to merge 2 commits into develop from feat/save-cog-to-azure
Conversation


@wietzesuijker wietzesuijker commented Dec 13, 2024

This PR introduces Azure Blob Storage support for saving COGs with Dask, complementing the existing AWS S3 support.

  • Implements a MultiPartUpload class in _az.py to handle Azure-specific multipart upload operations, including block staging, finalization, and cancellation (a rough sketch of this workflow follows the list below).
  • Adds MultiPartUploadBase in _multipart.py to provide a shared interface for multipart uploads, applicable to both Azure and S3 implementations.
  • Updates save_cog_with_dask to recognize Azure URLs and integrate the Azure multipart upload workflow.
  • Adds unit tests in tests/test_az.py using mocked Azure SDK to verify functionality specific to Azure Blob Storage.
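
For context, here is a minimal sketch of what a block-blob multipart workflow looks like against the Azure SDK (the function and variable names are illustrative, not the actual API of the new MultiPartUpload class): blocks are staged individually, the blob only materialises once the block list is committed, and cancelling amounts to never committing.

import base64
from azure.storage.blob import BlobBlock, BlobClient

def upload_in_blocks(blob: BlobClient, chunks):
    """Illustrative multipart upload: stage each chunk as a block, then commit."""
    block_ids = []
    for idx, chunk in enumerate(chunks):
        # Block IDs must be base64-encoded strings of equal length within a blob
        block_id = base64.b64encode(f"{idx:08d}".encode()).decode()
        blob.stage_block(block_id=block_id, data=chunk)
        block_ids.append(block_id)
    # Finalization: committing the staged blocks in order materialises the blob
    blob.commit_block_list([BlobBlock(block_id=bid) for bid in block_ids])
    # Cancellation amounts to not committing; uncommitted blocks expire server-side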

Here’s a simple test I ran to verify the Azure implementation:

import dask.distributed
import planetary_computer as pc
from odc.geo.cog._tifffile import save_cog_with_dask
from odc.stac import configure_rio, stac_load
from pystac_client import Client

# Setup Dask client for efficient processing
client = dask.distributed.Client()
configure_rio(cloud_defaults=True, client=client)

# Query STAC API for Sentinel-2 data
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
query = catalog.search(
    collections=["sentinel-2-l2a"],
    datetime="2019-06",
    query={"s2:mgrs_tile": dict(eq="06VVN")},
)
items = list(query.items())
print(f"Found {len(items)} datasets")

# Load Sentinel-2 data with specific bands
resolution = 40 
xx = stac_load(
    items,
    bands=["SCL"],
    resolution=resolution,
    chunks={"x": 2048, "y": 2048},
    patch_url=pc.sign,
    dtype="uint16",
    nodata=0,
)

# Select a single time slice for saving
to_save = xx.SCL.isel(time=3)

# Azure Blob Storage details (placeholders: fill in your own values)
storage_account = "<storage-account-name>"
container = "<container-name>"
sas_token = "<sas-token>"

blob_name = f"SCL-{to_save.time.dt.strftime('%Y%m%d').item()}.tif"
azure_url = f"az://{container}/{blob_name}"

result = save_cog_with_dask(
    to_save,
    dst=azure_url,
    compression="DEFLATE",
    predictor=2,
    overview_resampling="nearest",
    azure={
        "account_url": f"https://{storage_account}.blob.core.windows.net",
        "credential": sas_token,  # can be blob-specific or container-wide
    },
    blocksize=[512, 512],
)

# Compute the result to trigger the actual upload
result.compute()
print(f"COG successfully uploaded to: {azure_url}")

While implementing this, I noticed a couple of quirks in the original code:

  • Spelling Inconsistencies: both finalise and finalize appear in the existing code; I standardised on -ise rather than -ize.
  • Typing Updates: the code mixes Dict/List from typing with the built-in dict/list generics. I changed everything to Dict and List, since dev-env.yml specifies python=3.8 and the built-in generic syntax is only usable in annotations from Python 3.9 onwards (though I hadn't used Python 3.8 in ages); see the short snippet after this list.
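
To illustrate the typing point (the function here is made up for the example):

from typing import Dict, List

# Works on Python 3.8; the built-in generic form `def f(ids: list[str]) -> dict[str, int]`
# raises a TypeError at definition time before 3.9 (unless `from __future__ import annotations`).
def count_blocks(block_ids: List[str]) -> Dict[str, int]:
    return {block_id: len(block_id) for block_id in block_ids}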

As this is my first PR here, I look forward to feedback and suggestions for improvement!

@wietzesuijker force-pushed the feat/save-cog-to-azure branch 2 times, most recently from 74e2c99 to bc0bbc2 on December 13, 2024 at 16:31