|
28 | 28 | serialize_to_tensor,
|
29 | 29 | upload,
|
30 | 30 | )
|
31 |
| -from olmo_core.utils import TORCH_DTYPE_TO_STR, TORCH_DTYPES, wait_for |
| 31 | +from olmo_core.utils import TORCH_DTYPE_TO_STR, TORCH_DTYPES |
32 | 32 |
|
33 | 33 | from .sharded_flat_tensor import ShardedFlatTensor, ShardingSpec
|
34 | 34 | from .utils import all_gather_object, barrier, get_rank, get_world_size, scatter_object
|
@@ -258,34 +258,20 @@ def save(
|
258 | 258 | clean_up_local_dir = False
|
259 | 259 | if not is_url(dir):
|
260 | 260 | local_dir = Path(dir)
|
261 |
| - if local_rank == 0: |
262 |
| - if save_overwrite and not dir_is_empty(local_dir): |
263 |
| - clear_directory(local_dir) |
264 |
| - local_dir.mkdir(parents=True, exist_ok=True) |
| 261 | + if save_overwrite and not dir_is_empty(local_dir): |
| 262 | + clear_directory(local_dir) |
265 | 263 |
|
266 | 264 | barrier()
|
267 |
| - |
268 |
| - # All ranks wait for rank 0 to create the directory. On NFS the directory might |
269 |
| - # not be available immediately. This also ensures all ranks share the filesystem. |
270 |
| - description = f"Waiting for '{local_dir}' to be created" |
271 |
| - try: |
272 |
| - wait_for(local_dir.exists, description) |
273 |
| - except TimeoutError as e: |
274 |
| - raise RuntimeError( |
275 |
| - f"{description} timed out, please ensure each rank is saving to the same directory on a shared filesystem." |
276 |
| - ) from e |
| 265 | + local_dir.mkdir(parents=True, exist_ok=True) |
277 | 266 | else:
|
278 | 267 | local_dir = Path(tempfile.mkdtemp())
|
279 | 268 | remote_dir = str(dir).rstrip("/")
|
280 | 269 | clean_up_local_dir = True
|
281 | 270 | # NOTE: we do have the ability to clear bucket storage "folders" via `clear_directory`,
|
282 | 271 | # but that's super dangerous. All it takes is one person passing in the wrong folder
|
283 | 272 | # name and they could wipe out a ton of very important checkpoints.
|
284 |
| - if local_rank == 0: |
285 |
| - if not save_overwrite and file_exists(f"{remote_dir}/{self.METADATA_FILENAME}"): |
286 |
| - raise FileExistsError( |
287 |
| - f"Remote checkpoint directory '{remote_dir}' already contains a checkpoint!" |
288 |
| - ) |
| 273 | + if not save_overwrite and file_exists(f"{remote_dir}/{self.METADATA_FILENAME}"): |
| 274 | + raise FileExistsError(f"Remote checkpoint directory '{remote_dir}' already contains a checkpoint!") |
289 | 275 |
|
290 | 276 | try:
|
291 | 277 | if not dir_is_empty(local_dir):
|
|
0 commit comments