Update parallelized hsload, but should probably avoid using it
edraizen committed Sep 15, 2023
1 parent 1c98693 commit 7d6964d
Showing 1 changed file with 177 additions and 79 deletions.
256 changes: 177 additions & 79 deletions Prop3D/generate_data/hsds.py
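
This commit reworks load_h5 so that, instead of iterating over all children of /{prefix} inside a single job, each call copies exactly one object (group or dataset) from the local HDF5 file into the HSDS domain and schedules further Toil jobs for its children. As a rough, self-contained sketch of that per-object copy step (no Toil scheduling, no hard-link handling; the function name mirror_object and the chunks=True choice are illustrative, not taken from the repository):

import h5py
import h5pyd

def mirror_object(h5_file: str, hsds_file: str, path: str) -> None:
    """Copy a single group or dataset (plus its attributes) from a local HDF5 file into HSDS."""
    with h5py.File(h5_file, "r") as src, \
         h5pyd.File(hsds_file, mode="a", use_cache=False) as dst:
        obj = src[path]
        if isinstance(obj, h5py.Group):
            out = dst.require_group(path)
        else:  # h5py.Dataset
            out = dst.require_dataset(path, obj.shape, obj.dtype, data=obj[:], chunks=True)
        for name, value in obj.attrs.items():
            out.attrs[name] = value
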
@@ -36,101 +36,199 @@ def load_h5(job: Job, h5_file: str, hsds_file: str, prefix: str = "", hard_link_
if prefix.startswith('/'):
prefix = prefix[1:]
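
Note that hsds_file is an HSDS domain path rather than a local filesystem path, and the function re-opens it with h5pyd.File(..., use_cache=False, retries=100) around every operation so that each short-lived Toil task talks to the server directly. A minimal open looks like the following; the domain name is a placeholder and the endpoint/credentials are assumed to come from ~/.hscfg or the HS_ENDPOINT/HS_USERNAME/HS_PASSWORD environment variables:

import h5pyd

# Endpoint and credentials are read from ~/.hscfg or HS_* environment variables.
with h5pyd.File("/home/user/Prop3D.h5", mode="r", use_cache=False, retries=100) as store:
    print(list(store.keys()))
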

with h5py.File(h5_file, 'r') as f:
for key in f[f'/{prefix}'].keys():
full_key = f"/{prefix}/{key}"
if all([k not in full_key for k in hard_link_map.keys()]):
#if "domain" not in full_key or "data_splits" not in full_key or "representatives" not in full_key:
RealtimeLogger.info(f"Adding {prefix}/{key}")
elif hardlink_items:
#old_kwd, new_kwd = next(hard_link_map.items())
for old_kwd, new_kwd in hard_link_map.items():
if old_kwd in full_key:
store[full_key] = store[f"{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{key}"]
RealtimeLogger.info(f"Hardlinked {full_key}")
break
else:
raise RuntimeError(f"'{full_key}' does not match any hard_link_map keys ({list(hard_link_map.keys())})")
continue
if all([k not in prefix for k in hard_link_map.keys()]):
#if "domain" not in full_key or "data_splits" not in full_key or "representatives" not in full_key:
RealtimeLogger.info(f"Adding /{prefix}/{key}")

elif hardlink_items:
#old_kwd, new_kwd = next(hard_link_map.items())
for old_kwd, new_kwd in hard_link_map.items():
if old_kwd in prefix:
base_key = Path(prefix).stem
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
store['/{prefix}'] = store[f"/{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{base_key}"]
RealtimeLogger.info(f"Hardlinked /{prefix}")
break
else:
raise RuntimeError(f"'{prefix}' does not match any hard_link_map keys ({list(hard_link_map.keys())})")

return
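
The hard-link branch above avoids duplicating data for the data_splits and representatives hierarchies: it rewrites the incoming prefix so it points at the canonical copy under domains (see the defaults on the save_h5 signature at the bottom of this diff) and then assigns that existing object to the new path, which h5py/h5pyd treat as creating a hard link. A small illustration of the path arithmetic, using a hypothetical prefix:

from pathlib import Path

hard_link_map = {"data_splits": "domains", "representatives": "domains"}

prefix = "cath/1.10.10.10/data_splits/train"          # hypothetical incoming prefix
old_kwd, new_kwd = "data_splits", hard_link_map["data_splits"]

base_key = Path(prefix).stem                          # -> "train"
target = f"/{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{base_key}"
print(target)  # -> "/cath/1.10.10.10//domains/train" (the split keeps the trailing "/")

# store[f"/{prefix}"] = store[target]  # assigning an existing object creates a hard link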

h5_object = f[full_key]
with h5py.File(h5_file, 'r') as f:
h5_object = f[f'/{prefix}']

if isinstance(h5_object, h5py.Group):
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
store_object = store.require_group(full_key)
if isinstance(h5_object, h5py.Group):
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
store_object = store.require_group(f'/{prefix}')

for attr, attr_value in h5_object.attrs.items():
store_object.attrs[attr] = attr_value
for attr, attr_value in h5_object.attrs.items():
store_object.attrs[attr] = attr_value

for key in h5_object.keys():
if key not in hard_link_keys:
job.addChildJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=False)
hard_link_keys=hard_link_keys, hard_link_map=hard_link_map,
hardlink_items=False)
else:
#Add hard links last
job.addFollowOnJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=True)

elif isinstance(h5_object, h5py.Dataset):
dset = h5_object[:]
shape = h5_object.shape
dtype = h5_object.dtype
dset_parameters = {k:getattr(h5_object, k) for k in ('compression',
'compression_opts', 'scaleoffset', 'shuffle', 'fletcher32', 'fillvalue')}
dset_parameters['chunks'] = True

retry = False
should_remove = False
hard_link_keys=hard_link_keys, hard_link_map=hard_link_map,
hardlink_items=True)
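
The split between addChildJobFn and addFollowOnJobFn is what defers hard-link creation until the real groups and datasets exist: in Toil, a job's children may run in parallel, while its follow-ons only start once the job and all of its children (and their successors) have finished. A stripped-down sketch of that ordering, with placeholder job functions:

from toil.realtimeLogger import RealtimeLogger

def copy_one(job, key):
    RealtimeLogger.info(f"copying {key}")

def link_one(job, key):
    RealtimeLogger.info(f"hard-linking {key}")

def parent(job, copy_keys, link_keys):
    for key in copy_keys:
        job.addChildJobFn(copy_one, key)      # children may run concurrently
    for key in link_keys:
        job.addFollowOnJobFn(link_one, key)   # runs only after every child has finished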

elif isinstance(h5_object, h5py.Dataset):
dset = h5_object[:]
shape = h5_object.shape
dtype = h5_object.dtype
dset_parameters = {k:getattr(h5_object, k) for k in ('compression',
'compression_opts', 'scaleoffset', 'shuffle', 'fletcher32', 'fillvalue')}
dset_parameters['chunks'] = True

retry = False
should_remove = False
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
try:
store.require_dataset(f'/{prefix}', shape, dtype, data=dset, **dset_parameters)
except (OSError, TypeError) as e:
if isinstance(e, OSError) and "Request Entity Too Large" in str(e):
#First time loading dataset and it is too large
retry = True
should_remove = True

elif isinstance(e, TypeError) or (isinstance(e, OSError) and "Dataset is not extensible" in str(e)):
#Previous error and datasets aren't the same size. Remove everything and try again
retry = True
should_remove = True
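
Two failure modes are handled above: an OSError containing "Request Entity Too Large" (HTTP 413, i.e. the single request carrying the whole dataset was too big for the server), and a TypeError or an OSError mentioning "Dataset is not extensible", which typically mean an object already exists at that path with an incompatible shape or dtype, presumably left over from an earlier run. The mismatch case mirrors h5py's documented require_dataset behaviour, for example:

import h5py
import numpy as np

with h5py.File("scratch.h5", "w") as f:                  # throwaway local file, for illustration only
    f.create_dataset("x", data=np.zeros(10, dtype="f4"))
    try:
        f.require_dataset("x", shape=(20,), dtype="f4")  # conflicts with the existing (10,) dataset
    except TypeError as err:
        print("require_dataset mismatch:", err)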

if should_remove:
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
try:
del store[f'/{prefix}']
except IOError:
pass

for _ in range(20):
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
try:
store[f'/{prefix}']
sleep(1)
continue
except KeyError:
break
else:
RealtimeLogger.info(f"Unable to remove /{prefix} to start over")
return

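The polling loop above re-opens the domain and checks whether /{prefix} still resolves, up to 20 times, presumably to tolerate the server processing the delete asynchronously; only once the old object is gone does the job fall through to the retry below. The same idea as a self-contained helper (the function name is hypothetical):

from time import sleep
import h5pyd

def wait_until_removed(domain: str, path: str, attempts: int = 20) -> bool:
    """Poll an HSDS domain until `path` no longer resolves. Hypothetical helper."""
    for _ in range(attempts):
        with h5pyd.File(domain, mode="r", use_cache=False) as store:
            try:
                store[path]
            except KeyError:
                return True       # the object is gone
        sleep(1)
    return False                  # still present after all attempts
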
if retry:
#Dataset too large to send in a single HTTP PUT; create it first, then fill it in slices

#dset_parameters["chunks"] = (chunk_size,)
dset_parameters["maxshape"] = None

with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
new_dset = store.create_dataset(f'/{prefix}', shape, dtype, **dset_parameters)
chunk_size = min(new_dset.chunks[0], 500) #upload at most ~500 rows (roughly one structure's atoms) per request

for start in range(0, len(dset), chunk_size):
small_data = dset[start:start+chunk_size]
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
new_dset = store[f'/{prefix}']
new_dset[start:start + len(small_data)] = small_data
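
For completeness, one way this job function could be launched under Toil; the jobstore path and the two file arguments are placeholders, and, as the commit message says, the non-parallel loader is probably preferable to this one. Everything after this sketch is the previous, full_key-based implementation kept as commented-out code.

from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./load_h5_jobstore")   # placeholder jobstore
    options.logLevel = "INFO"
    with Toil(options) as workflow:
        # local HDF5 file -> HSDS domain (both names are illustrative)
        workflow.start(Job.wrapJobFn(load_h5, "Prop3D-local.h5", "/home/user/Prop3D.h5"))
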
# with h5py.File(h5_file, 'r') as f:
# for key in f[f'/{prefix}'].keys():
# full_key = f"/{prefix}/{key}"
# if all([k not in full_key for k in hard_link_map.keys()]):
# #if "domain" not in full_key or "data_splits" not in full_key or "representatives" not in full_key:
# RealtimeLogger.info(f"Adding {prefix}/{key}")
# elif hardlink_items:
# #old_kwd, new_kwd = next(hard_link_map.items())
# for old_kwd, new_kwd in hard_link_map.items():
# if old_kwd in full_key:
# store[full_key] = store[f"{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{key}"]
# RealtimeLogger.info(f"Hardlinked {full_key}")
# break
# else:
# raise RuntimeError(f"hard_link_map keys ('{list(old_kwd.keys())}')")

# continue


# h5_object = f[full_key]

# if isinstance(h5_object, h5py.Group):
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# store_object = store.require_group(full_key)

# for attr, attr_value in h5_object.attrs.items():
# store_object.attrs[attr] = attr_value

# if key not in hard_link_keys:
# job.addChildJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
# hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=False)
# else:
# #Add hard links last
# job.addFollowOnJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
# hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=True)

# elif isinstance(h5_object, h5py.Dataset):
# dset = h5_object[:]
# shape = h5_object.shape
# dtype = h5_object.dtype
# dset_parameters = {k:getattr(h5_object, k) for k in ('compression',
# 'compression_opts', 'scaleoffset', 'shuffle', 'fletcher32', 'fillvalue')}
# dset_parameters['chunks'] = True

# retry = False
# should_remove = False
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# try:
# store.require_dataset(full_key, shape, dtype, data=dset, **dset_parameters)
# except (OSError, TypeError) as e:
# if isinstance(e, OSError) and "Request Entity Too Large" in str(e):
# #First time loading dataset and it is too large
# retry = True
# should_remove = True

# elif isinstance(e, TypeError) or (isinstance(e, OSError) and "Dataset is not extensible" in str(e)):
# #Previous error and datasets aren't the same size. Remove everything and try again
# retry = True
# should_remove = True

# if should_remove:
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# try:
# del store[full_key]
# except IOError:
# pass

for _ in range(20):
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
try:
store[full_key]
sleep(1)
continue
except KeyError:
break
else:
continue
raise RuntimeError(f"Unable to remove {full_key} to start over")

if retry:
#Dataset too large to pass over http PUT
# for _ in range(20):
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# try:
# store[full_key]
# sleep(1)
# continue
# except KeyError:
# break
# else:
# continue
# raise RuntimeError(f"Unable to remove {full_key} to start over")

# if retry:
# #Dataset too large to pass over http PUT

#dset_parameters["chunks"] = (chunk_size,)
dset_parameters["maxshape"] = None
# #dset_parameters["chunks"] = (chunk_size,)
# dset_parameters["maxshape"] = None

with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
new_dset = store.create_dataset(full_key, dset.shape[0], dtype, **dset_parameters)
chunk_size = min(new_dset.chunks[0], 500) #atoms in structure int(len(rec_arr)/4)

for start in range(0, len(dset), chunk_size):
small_data = dset[start:start+chunk_size]
with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
new_dset = store[full_key]
new_dset.resize(dset.shape[0] + small_data.shape[0], axis=0)
new_dset[-1:] = small_data
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# new_dset = store.create_dataset(full_key, dset.shape[0], dtype, **dset_parameters)
# chunk_size = min(new_dset.chunks[0], 500) #atoms in structure int(len(rec_arr)/4)

# for start in range(0, len(dset), chunk_size):
# small_data = dset[start:start+chunk_size]
# with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
# new_dset = store[full_key]
# new_dset.resize(dset.shape[0] + small_data.shape[0], axis=0)
# new_dset[-1:] = small_data

def save_h5(hsds_file: str, h5_file: str, prefix: str = "", hard_link_keys: list[str] = ["validation", "train", "test"],
hard_link_map: dict[str,str] = {"data_splits":"domains", "representatives":"domains"}, hardlink_items: bool = False) -> None:
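
save_h5 (its body is collapsed in this view) goes in the opposite direction, pulling a hierarchy out of an HSDS domain into a local HDF5 file. Purely as an illustration of that direction, not the actual implementation, a recursive export could look like:

import h5py
import h5pyd

def export_object(store: h5pyd.File, out: h5py.File, path: str) -> None:
    """Recursively copy a group or dataset from an HSDS domain into a local file (illustrative only)."""
    obj = store[path]
    if isinstance(obj, h5pyd.Group):
        grp = out.require_group(path)
        for name, value in obj.attrs.items():
            grp.attrs[name] = value
        for key in obj.keys():
            export_object(store, out, f"{path.rstrip('/')}/{key}")
    else:
        dset = out.create_dataset(path, data=obj[:])
        for name, value in obj.attrs.items():
            dset.attrs[name] = value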
