diff --git a/earthaccess/api.py b/earthaccess/api.py index a10ee0e2..5961b540 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -184,6 +184,7 @@ def download( def open( granules: Union[List[str], List[earthaccess.results.DataGranule]], provider: Optional[str] = None, + sizes: Optional[List[int]] = None, ) -> List[AbstractFileSystem]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -194,7 +195,9 @@ def open( Returns: a list of s3fs "file pointers" to s3 files. """ - results = earthaccess.__store__.open(granules=granules, provider=provider) + results = earthaccess.__store__.open( + granules=granules, provider=provider, sizes=sizes + ) return results diff --git a/earthaccess/store.py b/earthaccess/store.py index cfe7bc79..b8cab700 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import os import shutil @@ -37,6 +39,7 @@ def __reduce__(self) -> Any: self.granule, earthaccess.__auth__, dumps(self.f), + self.f.size, ) def __repr__(self) -> str: @@ -48,9 +51,16 @@ def _open_files( granules: Union[List[str], List[DataGranule]], fs: fsspec.AbstractFileSystem, threads: Optional[int] = 8, + sizes: Optional[List[int]] = None, ) -> List[fsspec.AbstractFileSystem]: + file_sizes: Union[List[int], List[None]] + if sizes is None: + file_sizes = [None] * len(data_links) + else: + file_sizes = sizes + def multi_thread_open(data: tuple) -> EarthAccessFile: - urls, granule = data + urls, granule, size = data if type(granule) is not str: if len(granule.data_links()) > 1: print( @@ -58,14 +68,20 @@ def multi_thread_open(data: tuple) -> EarthAccessFile: "earthaccess will only open the first data link, " "try filtering the links before opening them." 
) - return EarthAccessFile(fs.open(urls), granule) + return EarthAccessFile(fs.open(urls, size=size), granule) - fileset = pqdm(zip(data_links, granules), multi_thread_open, n_jobs=threads) + fileset = pqdm( + zip(data_links, granules, file_sizes), multi_thread_open, n_jobs=threads + ) return fileset def make_instance( - cls: Any, granule: DataGranule, auth: Auth, data: Any + cls: Any, + granule: DataGranule, + auth: Auth, + data: Any, + size: int | None, ) -> EarthAccessFile: # Attempt to re-authenticate if not earthaccess.__auth__.authenticated: @@ -79,7 +95,8 @@ def make_instance( ): # NOTE: This uses the first data_link listed in the granule. That's not # guaranteed to be the right one. - return EarthAccessFile(earthaccess.open([granule])[0], granule) + sizes = [size] if size is not None else None + return EarthAccessFile(earthaccess.open([granule], sizes=sizes)[0], granule) else: return EarthAccessFile(loads(data), granule) @@ -269,6 +286,7 @@ def open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -279,7 +297,7 @@ def open( a list of s3fs "file pointers" to s3 files. """ if len(granules): - return self._open(granules, provider) + return self._open(granules, provider, sizes=sizes) print("The granules list is empty, moving on...") return None @@ -288,6 +306,7 @@ def _open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. 
@@ -305,6 +324,7 @@ def _open_granules( granules: List[DataGranule], provider: Optional[str] = None, threads: Optional[int] = 8, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -346,6 +366,7 @@ def _open_granules( granules=granules, fs=s3_fs, threads=threads, + sizes=sizes, ) except Exception: print( @@ -355,7 +376,9 @@ def _open_granules( ) return None else: - fileset = self._open_urls_https(data_links, granules, threads=threads) + fileset = self._open_urls_https( + data_links, granules, threads=threads, sizes=sizes + ) return fileset else: access_method = "on_prem" @@ -364,7 +387,9 @@ def _open_granules( granule.data_links(access=access_method) for granule in granules ) ) - fileset = self._open_urls_https(data_links, granules, threads=threads) + fileset = self._open_urls_https( + data_links, granules, threads=threads, sizes=sizes + ) return fileset @_open.register @@ -373,6 +398,7 @@ def _open_urls( granules: List[str], provider: Optional[str] = None, threads: Optional[int] = 8, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -404,6 +430,7 @@ def _open_urls( granules=granules, fs=s3_fs, threads=threads, + sizes=sizes, ) except Exception: print( @@ -426,7 +453,9 @@ def _open_urls( "We cannot open S3 links when we are not in-region, try using HTTPS links" ) return None - fileset = self._open_urls_https(data_links, granules, threads) + + fileset = self._open_urls_https(data_links, granules, threads, sizes) + return fileset def get( @@ -639,11 +668,12 @@ def _open_urls_https( urls: List[str], granules: Union[List[str], List[DataGranule]], threads: Optional[int] = 8, + sizes: Optional[List[int]] = None, ) -> List[fsspec.AbstractFileSystem]: https_fs = self.get_fsspec_session() if https_fs is not None: try: - fileset = _open_files(urls, granules, https_fs, threads) + fileset = _open_files(urls, granules, https_fs, threads, sizes) except Exception: 
print( "An exception occurred while trying to access remote files via HTTPS: "