diff --git a/neonwranglerpy/fetcher/fetcher.py b/neonwranglerpy/fetcher/fetcher.py
index 9c829dd..087a670 100644
--- a/neonwranglerpy/fetcher/fetcher.py
+++ b/neonwranglerpy/fetcher/fetcher.py
@@ -3,7 +3,10 @@
 import os
 from concurrent.futures import ThreadPoolExecutor
 from os.path import join as pjoin
+import requests
 from itertools import repeat
+
+
 if 'NEONWRANGLER_HOME' in os.environ:
     fury_home = os.environ['NEONWRANGLER_HOME']
 else:
@@ -32,7 +35,7 @@ async def _request(session, url):
         return await response.json()
 
 
-async def _download(session, url, filename, sem, size=None):
+async def _download(session, url, filename, sem, month, size=None):
     """An asynchronous function to download file from url.
 
     Parameters
@@ -46,8 +49,8 @@ async def _download(session, url, filename, sem, size=None):
     size : int, optional
         Length of the content in bytes
     """
+    # print(month)
     if not os.path.exists(filename):
-        print(f'Downloading: {filename}')
         async with sem:
             async with session.get(url) as response:
                 size = response.content_length if not size else size
@@ -61,37 +64,55 @@ async def _download(session, url, filename, sem, size=None):
                     # update_progressbar(progress, size)
 
 
-async def _fetcher(batch, rate_limit, headers):
+async def _fetcher(data, rate_limit, headers, files_to_stack_path="filesToStack"):
     """Fetcher for downloading files."""
     sem = asyncio.Semaphore(rate_limit)
+    data = data['data']
     dir_name = '.'.join([
-        'NEON', batch['productCode'], batch['siteCode'], batch['month'], batch['release']
+        'NEON', data['productCode'], data['siteCode'], data['month'], data['release']
     ])
-    d_urls = [file['url'] for file in batch["files"]]
-    sizes = [file['size'] for file in batch["files"]]
-    f_names = [file['name'] for file in batch["files"]]
-    f_paths = [pjoin(dir_name, name) for name in f_names]
+    print(f"{data['siteCode']}" + "-" + f"{data['month']}")
+    zip_dir_path = os.path.join(files_to_stack_path, f'{dir_name}')
+    os.mkdir(zip_dir_path)
+
+    d_urls = [f['url'] for f in data["files"]]
+    sizes = [f['size'] for f in data["files"]]
+    f_names = [f['name'] for f in data["files"]]
+    f_paths = [pjoin(zip_dir_path, name) for name in f_names]
+    month = [data['month']]
     zip_url = zip(d_urls, f_paths, sizes)
     async with aiohttp.ClientSession() as session:
         tasks = []
         for url, name, sz in zip_url:
-            task = asyncio.create_task(_download(session, url, name, sem, sz))
+            task = asyncio.create_task(_download(session, url, name, sem, month, sz))
             tasks.append(task)
 
         await asyncio.gather(*tasks)
 
 
-def fetcher(batch, rate_limit, headers):
+async def vst_fetcher(item, rate_limit, headers, files_to_stack_path="filesToStack"):
+    data = requests.get(item).json()
+    await _fetcher(data, rate_limit, headers, files_to_stack_path)
+
+
+def fetcher(batch, data_type, rate_limit, headers, files_to_stack_path):
     try:
-        asyncio.run(_fetcher(batch, rate_limit, headers))
+        if data_type == 'vst':
+            asyncio.run(vst_fetcher(batch, rate_limit, headers, files_to_stack_path))
+        elif data_type == 'aop':
+            asyncio.run(_fetcher(batch, rate_limit, headers, files_to_stack_path))
+
     except Exception as e:
         print(f"Error processing URLs: {e}")
 
 
-def run_threaded_batches(batches, batch_size, rate_limit, headers=None):
-    max_thread = 2
-    num_threads = (len(batches) + batch_size - 1) // batch_size
-    with ThreadPoolExecutor(max_workers=max_thread) as executor:
+def run_threaded_batches(batches, data_type, rate_limit, headers=None, savepath='/filesToStack'):
+    num_cores = os.cpu_count()  # Get the number of CPU cores
+    num_threads = min(num_cores, len(batches))  # Limit threads to CPU cores or the number of batches, whichever is smaller
+
+    with ThreadPoolExecutor(max_workers=num_threads) as executor:
         for i in range(num_threads):
-            batch = batches[i * batch_size:min((i + 1) * batch_size, len(batches))]
-            executor.map(fetcher, batch, repeat(rate_limit), repeat(headers))
+            # Distribute the batches evenly among threads
+            batch = batches[i::int(num_threads)]
+            # executor.submit(fetcher, batch, rate_limit, headers)
+            executor.map(fetcher, batch, repeat(data_type), repeat(rate_limit), repeat(headers), repeat(savepath))
diff --git a/neonwranglerpy/lib/retrieve_coords_itc.py b/neonwranglerpy/lib/retrieve_coords_itc.py
index 83a20a8..1cfc08b 100644
--- a/neonwranglerpy/lib/retrieve_coords_itc.py
+++ b/neonwranglerpy/lib/retrieve_coords_itc.py
@@ -32,17 +32,18 @@ def retrieve_coords_itc(dat):
     plots_df = plots.loc[vst_rows]
 
     convert_dict = {
-        'pointID': str,
+        'pointID': 'string',
     }
     # converting the pointID dtype from string to float64
-    plots_df = plots_df.astype({'pointID': 'float64'})
+    plots_df = plots_df.astype({'pointID': 'Int64'}).astype(convert_dict)
+    data = dat.astype({'pointID': 'Int64'}).astype(convert_dict)
 
-    vst_df = dat.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
-    vst_df = vst_df.astype(convert_dict)
-    na_values = vst_df['stemAzimuth'].isnull().values.any()
-    if na_values:
+    vst_df = data.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
+    na_values = vst_df['stemAzimuth'].isnull().values.sum()
+
+    if na_values > 0:
         print(
-            f"{len(na_values)} entries could not be georeferenced and will be discarded.")
+            f"{na_values} entries could not be georeferenced and will be discarded.")
         vst_df.dropna(subset=['stemAzimuth'], axis=0, inplace=True)
         vst_df.reset_index(drop=True, inplace=True)
     # if retrieve_dist_to_utm doesn't work add p[0] as an extra argument to
diff --git a/neonwranglerpy/lib/retrieve_vst_data.py b/neonwranglerpy/lib/retrieve_vst_data.py
index ca427d0..50d6f26 100644
--- a/neonwranglerpy/lib/retrieve_vst_data.py
+++ b/neonwranglerpy/lib/retrieve_vst_data.py
@@ -8,10 +8,11 @@
 def retrieve_vst_data(dpId="DP1.10098.001",
                       site="all",
-                      start_date="",
-                      end_date="",
+                      start_date=None,
+                      end_date=None,
                       method="shp",
                       savepath="",
+                      attributes=None,
                       save_files=False,
                       stacked_df=True):
     """Retrieve Vegetation Structure Data From NEON and Add Individual ID coordinates.
@@ -61,14 +62,14 @@ def retrieve_vst_data(dpId="DP1.10098.001",
     # Adds the UTM coordinates of vst entries based on azimuth and distance
     vst["vst_mappingandtagging"] = retrieve_coords_itc(vst_mappingandtagging)
 
-    attributes = vst_apparentindividual[[
-        'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
-        'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
-        'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
-        'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
-        'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
-    ]]
-
+    if attributes is None:
+        attributes = vst_apparentindividual[[
+            'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
+            'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
+            'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
+            'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
+            'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
+        ]]
     vst['vst_mappingandtagging'].rename(columns={'eventID': 'tagEventID'}, inplace=True)
     csv_vst = pd.merge(attributes,
                        vst["vst_mappingandtagging"],
diff --git a/neonwranglerpy/utilities/loadByProduct.py b/neonwranglerpy/utilities/loadByProduct.py
index ce7815c..66d76e9 100644
--- a/neonwranglerpy/utilities/loadByProduct.py
+++ b/neonwranglerpy/utilities/loadByProduct.py
@@ -72,12 +72,12 @@ def load_by_product(dpID,
               f'directly to R with this function.Use the byFileAOP() or ' \
               f'byTileAOP() function to download locally." '
 
-    if len(start_date):
+    if start_date is not None:
         if not match(DATE_PATTERN, start_date):
             return 'startdate and enddate must be either NA or valid dates in' \
                    ' the form YYYY-MM'
 
-    if len(end_date):
+    if end_date is not None:
         if not match(DATE_PATTERN, end_date):
             return 'startdate and enddate must be either NA or valid dates in' \
                    ' the form YYYY-MM'
diff --git a/neonwranglerpy/utilities/zipsByProduct.py b/neonwranglerpy/utilities/zipsByProduct.py
index abc6e2b..7008e55 100644
--- a/neonwranglerpy/utilities/zipsByProduct.py
+++ b/neonwranglerpy/utilities/zipsByProduct.py
@@ -6,14 +6,15 @@
 from neonwranglerpy.utilities.tools import get_api, get_month_year_urls
 from neonwranglerpy.utilities.defaults import NEON_API_BASE_URL
 from neonwranglerpy.utilities.getzipurls import get_zip_urls
+import neonwranglerpy.fetcher.fetcher as fetcher
 
 DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')
 
 
 def zips_by_product(dpID,
                     site='all',
-                    start_date='',
-                    end_date='',
+                    start_date=None,
+                    end_date=None,
                     package="basic",
                     release="current",
                     savepath='',
@@ -65,12 +66,12 @@ def zips_by_product(dpID,
         return f"{dpID} is not a properly formatted data product ID. The correct format" \
               f" is DP#.#####.00#, where the first placeholder must be between 1 and 4."
 
-    if len(start_date):
+    if start_date is not None:
         if not re.match(DATE_PATTERN, start_date):
             return 'startdate and enddate must be either NA or valid dates in the form '\
                    'YYYY-MM'
 
-    if len(end_date):
+    if end_date is not None:
         if not re.match(DATE_PATTERN, end_date):
             return 'startdate and enddate must be either NA or valid dates in the form ' \
                    'YYYY-MM'
@@ -109,21 +110,18 @@ def zips_by_product(dpID,
         print(f"There is no data for site {site}")
 
     # extracting urls for specified start and end dates
-    if len(start_date):
+    if start_date is not None:
         month_urls = get_month_year_urls(start_date, month_urls, 'start')
 
     if not len(month_urls):
         print("There is no data for selected dates")
 
-    if len(end_date):
+    if end_date is not None:
         month_urls = get_month_year_urls(end_date, month_urls, 'end')
 
     if not len(month_urls):
         print("There is no data for selected dates")
 
-    # list of all the urls of the files
-    temp = get_zip_urls(month_urls, package, dpID, release, token)
-
     # TODO: calculate download size
     # TODO: user input for downloading or not
     if not savepath:
@@ -135,25 +133,10 @@ def zips_by_product(dpID,
         os.makedirs(savepath)
 
     files_to_stack_path = os.path.join(savepath, "filesToStack")
-    os.mkdir(files_to_stack_path)
-
-    # TODO: add progress bar
+    if not os.path.isdir(files_to_stack_path):
+        os.mkdir(files_to_stack_path)
 
     if files_to_stack_path:
-        for zips in temp:
-            dirname = '.'.join([
-                'NEON', zips['productCode'], zips['siteCode'], zips['month'],
-                zips['release']
-            ])
-            zip_dir_path = os.path.join(files_to_stack_path, f'{dirname}')
-            os.mkdir(zip_dir_path)
-            for file in zips['files']:
-                try:
-                    save_path = os.path.join(zip_dir_path, f"{file['name']}")
-                    file_path, _ = urlretrieve(file['url'], save_path)
-
-                except HTTPError as e:
-                    print("HTTPError :", e)
-                    return None
+        fetcher.run_threaded_batches(month_urls, 'vst', rate_limit=2, headers=None, savepath=files_to_stack_path)
 
     # returns the path to /filestostack directory
     return files_to_stack_path
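
Example usage (not part of the patch): a minimal sketch of how the reworked download path might be exercised, assuming the module paths match the file layout shown above; the site code, dates, and savepath are illustrative placeholders only.

    from neonwranglerpy.utilities.zipsByProduct import zips_by_product
    from neonwranglerpy.lib.retrieve_vst_data import retrieve_vst_data

    # Download the monthly zip packages for a product/site; start_date and
    # end_date now default to None instead of "", so they can simply be omitted.
    stack_dir = zips_by_product("DP1.10098.001", site="DELA",
                                start_date="2021-01", end_date="2022-12",
                                savepath="./data")

    # Or run the full vegetation-structure retrieval, optionally passing a
    # pre-selected `attributes` DataFrame instead of the built-in column subset.
    vst = retrieve_vst_data(dpId="DP1.10098.001", site="DELA",
                            start_date="2021-01", end_date="2022-12",
                            savepath="./data", save_files=True, stacked_df=True)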