From e0647db21f213781ce4edca2ccdbed7a23b8e208 Mon Sep 17 00:00:00 2001 From: puhuk Date: Wed, 5 Oct 2022 18:31:18 +0900 Subject: [PATCH 1/3] Update horovod to take all_reduce with group --- ignite/distributed/comp_models/horovod.py | 11 ++++++----- tests/ignite/distributed/utils/__init__.py | 18 +++++------------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/ignite/distributed/comp_models/horovod.py b/ignite/distributed/comp_models/horovod.py index 35c45bff7dd..8c1ecbc6102 100644 --- a/ignite/distributed/comp_models/horovod.py +++ b/ignite/distributed/comp_models/horovod.py @@ -68,8 +68,7 @@ def __init__(self, backend: Optional[str] = None, **kwargs: Any) -> None: def _create_from_backend(self, backend: str, **kwargs: Any) -> None: self._backend = backend # type: str - comm = kwargs.get("comm", None) - hvd.init(comm=comm) + hvd.init(process_sets="dynamic") self._setup_attrs() if torch.cuda.is_available(): torch.cuda.set_device(self.get_local_rank()) @@ -165,14 +164,16 @@ def spawn( _manual_reduce_op_map = {"MIN": torch.min, "MAX": torch.max, "PRODUCT": torch.prod} def _do_all_reduce(self, tensor: torch.Tensor, op: str = "SUM", group: Optional[Any] = None) -> torch.Tensor: - if group is not None: - raise NotImplementedError("all_reduce with group for horovod is not implemented") + if group is not None and not isinstance(group, hvd.ProcessSet): + raise ValueError("Argument group should be list of int or ProcessSet") if op in self._manual_reduce_op_map: op_fn = self._manual_reduce_op_map[op] return self._do_manual_all_reduce(tensor, op_fn) if op not in self._reduce_op_map: raise ValueError(f"Unsupported reduction operation: '{op}'") op = self._reduce_op_map[op] + if group is not None: + return hvd.allreduce(tensor, op=op, process_set=group) return hvd.allreduce(tensor, op=op) def _do_manual_all_reduce(self, tensor: torch.Tensor, op: Any) -> torch.Tensor: @@ -194,7 +195,7 @@ def _do_all_gather(self, tensor: torch.Tensor, group: Optional[Any] = None) 
-> t return hvd.allgather(tensor) def _do_new_group(self, ranks: List[int], **kwargs: Any) -> Any: - return hvd.ProcessSet(ranks) + return hvd.add_process_set(ranks) def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor: return hvd.broadcast(tensor, root_rank=src) diff --git a/tests/ignite/distributed/utils/__init__.py b/tests/ignite/distributed/utils/__init__.py index 90126223c86..ef8452478e1 100644 --- a/tests/ignite/distributed/utils/__init__.py +++ b/tests/ignite/distributed/utils/__init__.py @@ -128,20 +128,12 @@ def _test_distrib_all_reduce_group(device): bnd = idist.backend() group = idist.new_group(ranks) - if bnd in ("horovod"): - with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"): - res = idist.all_reduce(t, group=group) - else: - res = idist.all_reduce(t, group=group) - assert res == torch.tensor([sum(ranks)], device=device) + res = idist.all_reduce(t, group=group) + assert res == torch.tensor([sum(ranks)], device=device) t = torch.tensor([rank], device=device) - if bnd in ("horovod"): - with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"): - res = idist.all_reduce(t, group=ranks) - else: - res = idist.all_reduce(t, group=ranks) - assert res == torch.tensor([sum(ranks)], device=device) + res = idist.all_reduce(t, group=ranks) + assert res == torch.tensor([sum(ranks)], device=device) ranks = "abc" @@ -152,7 +144,7 @@ def _test_distrib_all_reduce_group(device): with pytest.raises(ValueError, match=r"Argument group should be list of int"): res = idist.all_reduce(t, group="abc") elif bnd in ("horovod"): - with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"): + with pytest.raises(ValueError, match=r"Argument group should be list of int or ProcessSet"): res = idist.all_reduce(t, group="abc") From 8497248cd5cdcffb9ff64bcf9a2ad35383983a7e Mon Sep 17 00:00:00 2001 From: puhuk Date: Wed,
5 Oct 2022 20:15:26 +0900 Subject: [PATCH 2/3] Update horovod.py --- ignite/distributed/comp_models/horovod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ignite/distributed/comp_models/horovod.py b/ignite/distributed/comp_models/horovod.py index 8c1ecbc6102..dd2839319ba 100644 --- a/ignite/distributed/comp_models/horovod.py +++ b/ignite/distributed/comp_models/horovod.py @@ -195,7 +195,7 @@ def _do_all_gather(self, tensor: torch.Tensor, group: Optional[Any] = None) -> t return hvd.allgather(tensor) def _do_new_group(self, ranks: List[int], **kwargs: Any) -> Any: - return hvd.add_process_set(ranks) + return hvd.add_process_set(hvd.ProcessSet(ranks)) def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor: return hvd.broadcast(tensor, root_rank=src) From 1e5a519d3a41b6b340c5a59bb7acd91abb03b36f Mon Sep 17 00:00:00 2001 From: puhuk Date: Wed, 5 Oct 2022 20:57:28 +0900 Subject: [PATCH 3/3] Update horovod.py --- ignite/distributed/comp_models/horovod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ignite/distributed/comp_models/horovod.py b/ignite/distributed/comp_models/horovod.py index dd2839319ba..e4fd168e18f 100644 --- a/ignite/distributed/comp_models/horovod.py +++ b/ignite/distributed/comp_models/horovod.py @@ -195,7 +195,7 @@ def _do_all_gather(self, tensor: torch.Tensor, group: Optional[Any] = None) -> t return hvd.allgather(tensor) def _do_new_group(self, ranks: List[int], **kwargs: Any) -> Any: - return hvd.add_process_set(hvd.ProcessSet(ranks)) + return hvd.ProcessSet(ranks) def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor: return hvd.broadcast(tensor, root_rank=src)