Integrates fetcher with zipsByProduct
Signed-off-by: nagesh bansal <[email protected]>
Nageshbansal committed Sep 9, 2023
1 parent 85fabd8 commit 369f430
Showing 5 changed files with 69 additions and 63 deletions.
55 changes: 38 additions & 17 deletions neonwranglerpy/fetcher/fetcher.py
@@ -3,7 +3,10 @@
import os
from concurrent.futures import ThreadPoolExecutor
from os.path import join as pjoin
import requests
from itertools import repeat


if 'NEONWRANGLER_HOME' in os.environ:
fury_home = os.environ['NEONWRANGLER_HOME']
else:
@@ -32,7 +35,7 @@ async def _request(session, url):
return await response.json()


async def _download(session, url, filename, sem, size=None):
async def _download(session, url, filename, sem, month, size=None):
"""An asynchronous function to download file from url.
Parameters
@@ -46,8 +49,8 @@ async def _download(session, url, filename, sem, size=None):
size : int, optional
Length of the content in bytes
"""
# print(month)
if not os.path.exists(filename):
print(f'Downloading: {filename}')
async with sem:
async with session.get(url) as response:
size = response.content_length if not size else size
@@ -61,37 +64,55 @@ async def _download(session, url, filename, sem, size=None):
# update_progressbar(progress, size)


async def _fetcher(batch, rate_limit, headers):
async def _fetcher(data, rate_limit, headers, files_to_stack_path="filesToStack"):
"""Fetcher for downloading files."""
sem = asyncio.Semaphore(rate_limit)
data = data['data']
dir_name = '.'.join([
'NEON', batch['productCode'], batch['siteCode'], batch['month'], batch['release']
'NEON', data['productCode'], data['siteCode'], data['month'], data['release']
])
d_urls = [file['url'] for file in batch["files"]]
sizes = [file['size'] for file in batch["files"]]
f_names = [file['name'] for file in batch["files"]]
f_paths = [pjoin(dir_name, name) for name in f_names]
print(f"{data['siteCode']}" + "-" + f"{data['month']}" )
zip_dir_path = os.path.join(files_to_stack_path, f'{dir_name}')
os.mkdir(zip_dir_path)

d_urls = [f['url'] for f in data["files"]]
sizes = [f['size'] for f in data["files"]]
f_names = [f['name'] for f in data["files"]]
f_paths = [pjoin(zip_dir_path, name) for name in f_names]
month = [data['month']]
zip_url = zip(d_urls, f_paths, sizes)
async with aiohttp.ClientSession() as session:
tasks = []
for url, name, sz in zip_url:
task = asyncio.create_task(_download(session, url, name, sem, sz))
task = asyncio.create_task(_download(session, url, name, sem, month, sz))
tasks.append(task)

await asyncio.gather(*tasks)


def fetcher(batch, rate_limit, headers):
async def vst_fetcher(item, rate_limit, headers, files_to_stack_path="filesToStack"):
data = requests.get(item).json()
await _fetcher(data, rate_limit, headers, files_to_stack_path)


def fetcher(batch, data_type, rate_limit, headers, files_to_stack_path):
try:
asyncio.run(_fetcher(batch, rate_limit, headers))
if data_type == 'vst':
asyncio.run(vst_fetcher(batch, rate_limit, headers, files_to_stack_path))
elif data_type == 'aop':
asyncio.run(_fetcher(batch, rate_limit, headers, files_to_stack_path))

except Exception as e:
print(f"Error processing URLs: {e}")


def run_threaded_batches(batches, batch_size, rate_limit, headers=None):
max_thread = 2
num_threads = (len(batches) + batch_size - 1) // batch_size
with ThreadPoolExecutor(max_workers=max_thread) as executor:
def run_threaded_batches(batches, data_type, rate_limit, headers=None, savepath='/filesToStack'):
num_cores = os.cpu_count() # Get the number of CPU cores
num_threads = min(num_cores, len(batches)) # Limit threads to CPU cores or the number of batches, whichever is smaller

with ThreadPoolExecutor(max_workers=num_threads) as executor:
for i in range(num_threads):
batch = batches[i * batch_size:min((i + 1) * batch_size, len(batches))]
executor.map(fetcher, batch, repeat(rate_limit), repeat(headers))
# Distribute the batches evenly among threads
batch = batches[i::int(num_threads)]
# executor.submit(fetcher, batch, rate_limit, headers)
executor.map(fetcher, batch, repeat(data_type), repeat(rate_limit), repeat(headers), repeat(savepath))
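
A minimal usage sketch of the reworked entry point, in the way zips_by_product now invokes it; the example URL, site, and rate_limit value below are illustrative assumptions, not part of this commit.

import neonwranglerpy.fetcher.fetcher as fetcher

# Illustrative batch list: one NEON /data API URL per site-month (assumed shape,
# matching what zips_by_product passes as month_urls).
month_urls = [
    "https://data.neonscience.org/api/v0/data/DP1.10098.001/DELA/2022-06",
]

# data_type='vst' routes each URL through vst_fetcher (fetch the JSON listing,
# then download its files asynchronously); 'aop' hands the batch straight to _fetcher.
fetcher.run_threaded_batches(
    month_urls,
    'vst',
    rate_limit=2,             # size of the asyncio.Semaphore limiting concurrent downloads
    headers=None,
    savepath="filesToStack",  # per-month NEON.* folders are created under this directory
)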
15 changes: 8 additions & 7 deletions neonwranglerpy/lib/retrieve_coords_itc.py
@@ -32,17 +32,18 @@ def retrieve_coords_itc(dat):
plots_df = plots.loc[vst_rows]

convert_dict = {
'pointID': str,
'pointID': 'string',
}
# converting the pointID dtype from string to float64
plots_df = plots_df.astype({'pointID': 'float64'})
plots_df = plots_df.astype({'pointID': 'Int64'}).astype(convert_dict)
data = dat.astype({'pointID': 'Int64'}).astype(convert_dict)

vst_df = dat.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
vst_df = vst_df.astype(convert_dict)
na_values = vst_df['stemAzimuth'].isnull().values.any()
if na_values:
vst_df = data.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
na_values = vst_df['stemAzimuth'].isnull().values.sum()

if na_values > 0:
print(
f"{len(na_values)} entries could not be georeferenced and will be discarded.")
f"{na_values} entries could not be georeferenced and will be discarded.")
vst_df.dropna(subset=['stemAzimuth'], axis=0, inplace=True)
vst_df.reset_index(drop=True, inplace=True)
# if retrieve_dist_to_utm doesn't work add p[0] as an extra argument to
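For context, a small self-contained sketch of the dtype round-trip this change relies on (the frames and values are made up for illustration): pandas' nullable 'Int64' keeps missing pointID entries as <NA>, and the subsequent 'string' cast yields keys like "41" rather than "41.0", so the merge keys on both tables line up.

import pandas as pd

plots_df = pd.DataFrame({"plotID": ["A", "B"], "siteID": ["S1", "S1"], "pointID": [41.0, None]})
dat = pd.DataFrame({"plotID": ["A", "B"], "siteID": ["S1", "S1"], "pointID": [41.0, 43.0]})

convert_dict = {"pointID": "string"}
# Float -> nullable Int64 tolerates the missing value, then the string cast gives "41", not "41.0"
plots_df = plots_df.astype({"pointID": "Int64"}).astype(convert_dict)
dat = dat.astype({"pointID": "Int64"}).astype(convert_dict)

merged = dat.merge(plots_df, how="inner", on=["plotID", "pointID", "siteID"])
print(merged)  # one matching row: plot A, point 41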
21 changes: 11 additions & 10 deletions neonwranglerpy/lib/retrieve_vst_data.py
@@ -8,10 +8,11 @@

def retrieve_vst_data(dpId="DP1.10098.001",
site="all",
start_date="",
end_date="",
start_date=None,
end_date=None,
method="shp",
savepath="",
attributes=None,
save_files=False,
stacked_df=True):
"""Retrieve Vegetation Structure Data From NEON and Add Individual ID coordinates.
@@ -61,14 +62,14 @@ def retrieve_vst_data(dpId="DP1.10098.001",
# Adds the UTM coordinates of vst entries based on azimuth and distance
vst["vst_mappingandtagging"] = retrieve_coords_itc(vst_mappingandtagging)

attributes = vst_apparentindividual[[
'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
]]

if attributes is None:
attributes = vst_apparentindividual[[
'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
]]
vst['vst_mappingandtagging'].rename(columns={'eventID': 'tagEventID'}, inplace=True)
csv_vst = pd.merge(attributes,
vst["vst_mappingandtagging"],
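The new attributes parameter lets callers supply their own attribute table instead of the hard-coded column subset. A minimal sketch of that fallback pattern (the column names and frames below are illustrative, not the full NEON vst_apparentindividual schema):

import pandas as pd

DEFAULT_COLUMNS = ["uid", "individualID", "eventID", "stemDiameter", "height"]

def select_attributes(vst_apparentindividual, attributes=None):
    # Mirrors the new behaviour: use the caller's table if given, otherwise
    # fall back to a default subset of vst_apparentindividual columns.
    if attributes is None:
        attributes = vst_apparentindividual[DEFAULT_COLUMNS]
    return attributes

vst_ai = pd.DataFrame(columns=DEFAULT_COLUMNS + ["plantStatus"])
print(select_attributes(vst_ai).columns.tolist())                             # default subset
print(select_attributes(vst_ai, vst_ai[["uid", "height"]]).columns.tolist())  # caller override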
4 changes: 2 additions & 2 deletions neonwranglerpy/utilities/loadByProduct.py
@@ -72,12 +72,12 @@ def load_by_product(dpID,
f'directly to R with this function. Use the byFileAOP() or ' \
f'byTileAOP() function to download locally." '

if len(start_date):
if start_date is not None:
if not match(DATE_PATTERN, start_date):
return 'startdate and enddate must be either NA or valid dates in' \
' the form YYYY-MM'

if len(end_date):
if end_date is not None:
if not match(DATE_PATTERN, end_date):
return 'startdate and enddate must be either NA or valid dates in' \
' the form YYYY-MM'
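The date guards now treat None as "no filter" instead of calling len() on a default empty string, which would raise a TypeError once the defaults become None. A small standalone sketch of the guard (error text copied from the function above; the helper name is made up):

import re

DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')

def check_date(value):
    # None means the caller did not set a date filter; anything else must look like YYYY-MM
    if value is not None and not re.match(DATE_PATTERN, value):
        return 'startdate and enddate must be either NA or valid dates in the form YYYY-MM'
    return None

print(check_date(None))       # None: no filter applied
print(check_date("2022-06"))  # None: valid month
print(check_date("06-2022"))  # error message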
37 changes: 10 additions & 27 deletions neonwranglerpy/utilities/zipsByProduct.py
@@ -6,14 +6,15 @@
from neonwranglerpy.utilities.tools import get_api, get_month_year_urls
from neonwranglerpy.utilities.defaults import NEON_API_BASE_URL
from neonwranglerpy.utilities.getzipurls import get_zip_urls
import neonwranglerpy.fetcher.fetcher as fetcher

DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')


def zips_by_product(dpID,
site='all',
start_date='',
end_date='',
start_date=None,
end_date=None,
package="basic",
release="current",
savepath='',
@@ -65,12 +66,12 @@ def zips_by_product(dpID,
return f"{dpID} is not a properly formatted data product ID. The correct format" \
f" is DP#.#####.00#, where the first placeholder must be between 1 and 4."

if len(start_date):
if start_date is not None:
if not re.match(DATE_PATTERN, start_date):
return 'startdate and enddate must be either NA or valid dates in the form '\
'YYYY-MM'

if len(end_date):
if end_date is not None:
if not re.match(DATE_PATTERN, end_date):
return 'startdate and enddate must be either NA or valid dates in the form ' \
'YYYY-MM'
@@ -109,21 +110,18 @@ def zips_by_product(dpID,
print(f"There is no data for site {site}")

# extracting urls for specified start and end dates
if len(start_date):
if start_date is not None:
month_urls = get_month_year_urls(start_date, month_urls, 'start')

if not len(month_urls):
print("There is no data for selected dates")

if len(end_date):
if end_date is not None:
month_urls = get_month_year_urls(end_date, month_urls, 'end')

if not len(month_urls):
print("There is no data for selected dates")

# list of all the urls of the files
temp = get_zip_urls(month_urls, package, dpID, release, token)

# TODO: calculate download size
# TODO: user input for downloading or not
if not savepath:
@@ -135,25 +133,10 @@
os.makedirs(savepath)

files_to_stack_path = os.path.join(savepath, "filesToStack")
os.mkdir(files_to_stack_path)

# TODO: add progress bar
if not os.path.isdir(files_to_stack_path):
os.mkdir(files_to_stack_path)

if files_to_stack_path:
for zips in temp:
dirname = '.'.join([
'NEON', zips['productCode'], zips['siteCode'], zips['month'],
zips['release']
])
zip_dir_path = os.path.join(files_to_stack_path, f'{dirname}')
os.mkdir(zip_dir_path)
for file in zips['files']:
try:
save_path = os.path.join(zip_dir_path, f"{file['name']}")
file_path, _ = urlretrieve(file['url'], save_path)

except HTTPError as e:
print("HTTPError :", e)
return None
fetcher.run_threaded_batches(month_urls, 'vst', rate_limit=2, headers=None, savepath=files_to_stack_path)
# returns the path to the filesToStack directory
return files_to_stack_path
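
Taken together, an end-to-end call would now look roughly like the sketch below; the product, site, dates, and save path are placeholders, and actually running it needs network access to the NEON API.

from neonwranglerpy.utilities.zipsByProduct import zips_by_product

# Downloads are delegated to fetcher.run_threaded_batches instead of serial urlretrieve calls.
stack_dir = zips_by_product(
    "DP1.10098.001",       # vegetation structure product (placeholder choice)
    site="DELA",
    start_date="2022-01",  # None (the new default) skips the date filtering entirely
    end_date="2022-12",
    savepath="./neon_data",
)
print(stack_dir)           # .../neon_data/filesToStack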
