diff --git a/Prop3D/generate_data/hsds.py b/Prop3D/generate_data/hsds.py
index 3525e28..c78672c 100644
--- a/Prop3D/generate_data/hsds.py
+++ b/Prop3D/generate_data/hsds.py
@@ -36,101 +36,199 @@ def load_h5(job: Job, h5_file: str, hsds_file: str, prefix: str = "", hard_link_
     if prefix.startswith('/'):
         prefix = prefix[1:]
 
-    with h5py.File(h5_file, 'r') as f:
-        for key in f[f'/{prefix}'].keys():
-            full_key = f"/{prefix}/{key}"
-            if all([k not in full_key for k in hard_link_map.keys()]):
-                #if "domain" not in full_key or "data_splits" not in full_key or "representatives" not in full_key:
-                RealtimeLogger.info(f"Adding {prefix}/{key}")
-            elif hardlink_items:
-                #old_kwd, new_kwd = next(hard_link_map.items())
-                for old_kwd, new_kwd in hard_link_map.items():
-                    if old_kwd in full_key:
-                        store[full_key] = store[f"{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{key}"]
-                        RealtimeLogger.info(f"Hardlinked {full_key}")
-                        break
-                else:
-                    raise RuntimeError(f"hard_link_map keys ('{list(old_kwd.keys())}')")
-
-                continue
-
-            h5_object = f[full_key]
-
-            if isinstance(h5_object, h5py.Group):
-                with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                    store_object = store.require_group(full_key)
-
-                    for attr, attr_value in h5_object.attrs.items():
-                        store_object.attrs[attr] = attr_value
-
-                if key not in hard_link_keys:
-                    job.addChildJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
-                        hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=False)
-                else:
-                    #Add hard links last
-                    job.addFollowOnJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
-                        hard_link_keys=hard_link_keys, hard_link_map=hard_link_map, hardlink_items=True)
-
-            elif isinstance(h5_object, h5py.Dataset):
-                dset = h5_object[:]
-                shape = h5_object.shape
-                dtype = h5_object.dtype
-                dset_parameters = {k:getattr(h5_object, k) for k in ('compression',
-                    'compression_opts', 'scaleoffset', 'shuffle', 'fletcher32', 'fillvalue')}
-                dset_parameters['chunks'] = True
-
-                retry = False
-                should_remove = False
-                with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                    try:
-                        store.require_dataset(full_key, shape, dtype, data=dset, **dset_parameters)
-                    except (OSError, TypeError) as e:
-                        if isinstance(e, OSError) and "Request Entity Too Large" in str(e):
-                            #First time loading dataset and it is too large
-                            retry = True
-                            should_remove = True
-
-                        elif isinstance(e, TypeError) or (isinstance(e, OSError) and "Dataset is not extensible" in str(e)):
-                            #Previous error and datasets aren't the same size. Remove everything and try again
-                            retry = True
-                            should_remove = True
-
-                if should_remove:
-                    with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                        try:
-                            del store[full_key]
-                        except IOError:
-                            pass
-
-                    for _ in range(20):
-                        with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                            try:
-                                store[full_key]
-                                sleep(1)
-                                continue
-                            except KeyError:
-                                break
-                    else:
-                        continue
-                        raise RuntimeError(f"Unable to remove {full_key} to start over")
-
-                if retry:
-                    #Dataset too lareg to pass over http PUT
-
-                    #dset_parameters["chunks"] = (chunk_size,)
-                    dset_parameters["maxshape"] = None
-
-                    with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                        new_dset = store.create_dataset(full_key, dset.shape[0], dtype, **dset_parameters)
-                        chunk_size = min(new_dset.chunks[0], 500) #atoms in structure int(len(rec_arr)/4)
-
-                    for start in range(0, len(dset), chunk_size):
-                        small_data = dset[start:start+chunk_size]
-                        with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
-                            new_dset = store[full_key]
-                            new_dset.resize(dset.shape[0] + small_data.shape[0], axis=0)
-                            new_dset[-1:] = small_data
+    if all([k not in prefix for k in hard_link_map.keys()]):
+        #if "domain" not in full_key or "data_splits" not in full_key or "representatives" not in full_key:
+        RealtimeLogger.info(f"Adding /{prefix}")
+
+    elif hardlink_items:
+        #old_kwd, new_kwd = next(hard_link_map.items())
+        for old_kwd, new_kwd in hard_link_map.items():
+            if old_kwd in prefix:
+                base_key = Path(prefix).stem
+                with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                    store[f'/{prefix}'] = store[f"/{prefix.split(old_kwd, 1)[0]}/{new_kwd}/{base_key}"]
+                RealtimeLogger.info(f"Hardlinked /{prefix}")
+                break
+        else:
+            raise RuntimeError(f"'{prefix}' did not match any hard_link_map keys ({list(hard_link_map.keys())})")
+
+        return
+
+    with h5py.File(h5_file, 'r') as f:
+        h5_object = f[f'/{prefix}']
+
+        if isinstance(h5_object, h5py.Group):
+            with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                store_object = store.require_group(f'/{prefix}')
+
+                for attr, attr_value in h5_object.attrs.items():
+                    store_object.attrs[attr] = attr_value
+
+            for key in h5_object.keys():
+                if key not in hard_link_keys:
+                    job.addChildJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
+                        hard_link_keys=hard_link_keys, hard_link_map=hard_link_map,
+                        hardlink_items=False)
+                else:
+                    #Add hard links last
+                    job.addFollowOnJobFn(load_h5, h5_file, hsds_file, prefix=f"{prefix}/{key}",
+                        hard_link_keys=hard_link_keys, hard_link_map=hard_link_map,
+                        hardlink_items=True)
+
+        elif isinstance(h5_object, h5py.Dataset):
+            dset = h5_object[:]
+            shape = h5_object.shape
+            dtype = h5_object.dtype
+            dset_parameters = {k:getattr(h5_object, k) for k in ('compression',
+                'compression_opts', 'scaleoffset', 'shuffle', 'fletcher32', 'fillvalue')}
+            dset_parameters['chunks'] = True
+
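+            # Try to upload the whole dataset with a single require_dataset call;
+            # if HSDS rejects the payload (or an earlier partial upload left the
+            # dataset in a bad state), flag it for removal and retry with the
+            # chunked re-upload below.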
+            retry = False
+            should_remove = False
+            with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                try:
+                    store.require_dataset(f'/{prefix}', shape, dtype, data=dset, **dset_parameters)
+                except (OSError, TypeError) as e:
+                    if isinstance(e, OSError) and "Request Entity Too Large" in str(e):
+                        #First time loading dataset and it is too large
+                        retry = True
+                        should_remove = True
+
+                    elif isinstance(e, TypeError) or (isinstance(e, OSError) and "Dataset is not extensible" in str(e)):
+                        #Previous error and datasets aren't the same size. Remove everything and try again
+                        retry = True
+                        should_remove = True
+
+            if should_remove:
+                with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                    try:
+                        del store[f'/{prefix}']
+                    except IOError:
+                        pass
+
+                #Poll until the old dataset is actually gone; give up after 20 tries
+                for _ in range(20):
+                    with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                        try:
+                            store[f'/{prefix}']
+                            sleep(1)
+                            continue
+                        except KeyError:
+                            break
+                else:
+                    RealtimeLogger.info(f"Unable to remove /{prefix} to start over")
+                    return
+
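+            # Fallback path: the dataset was too large for one HTTP request, so
+            # create it empty at its full size and write the rows in chunk-sized
+            # slices (capped at 500 rows per request).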
+            if retry:
+                #Dataset too large to pass over a single http PUT
+
+                #dset_parameters["chunks"] = (chunk_size,)
+                dset_parameters["maxshape"] = (None,)
+
+                with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                    new_dset = store.create_dataset(f'/{prefix}', dset.shape[0], dtype, **dset_parameters)
+                    chunk_size = min(new_dset.chunks[0], 500) #atoms in structure int(len(rec_arr)/4)
+
+                for start in range(0, len(dset), chunk_size):
+                    small_data = dset[start:start+chunk_size]
+                    with h5pyd.File(hsds_file, mode=file_mode, use_cache=False, retries=100) as store:
+                        store[f'/{prefix}'][start:start + small_data.shape[0]] = small_data
 
 def save_h5(hsds_file: str, h5_file: str, prefix: str = "", hard_link_keys: list[str] = ["validation", "train", "test"],
             hard_link_map: dict[str,str] = {"data_splits":"domains", "representatives":"domains"},
             hardlink_items: bool = False) -> None: