Subject: Support the `group` argument in horovod all_reduce via ProcessSet

Initialize Horovod with dynamic process sets so `idist.all_reduce` can reduce
over a subset of ranks. Keeps the existing `comm` kwarg for backward
compatibility, and aligns the tests with the new behavior: grouped all_reduce
now succeeds on horovod, and an invalid `group` raises ValueError (the
expected exception type in the test is updated to match the raise).

diff --git a/ignite/distributed/comp_models/horovod.py b/ignite/distributed/comp_models/horovod.py
index 35c45bff7dd..e4fd168e18f 100644
--- a/ignite/distributed/comp_models/horovod.py
+++ b/ignite/distributed/comp_models/horovod.py
@@ -68,8 +68,8 @@ def __init__(self, backend: Optional[str] = None, **kwargs: Any) -> None:
     def _create_from_backend(self, backend: str, **kwargs: Any) -> None:
         self._backend = backend  # type: str
-        comm = kwargs.get("comm", None)
-        hvd.init(comm=comm)
+        comm = kwargs.get("comm", None)
+        hvd.init(comm=comm, process_sets="dynamic")
         self._setup_attrs()
         if torch.cuda.is_available():
             torch.cuda.set_device(self.get_local_rank())
 
@@ -165,14 +165,16 @@ def spawn(
     _manual_reduce_op_map = {"MIN": torch.min, "MAX": torch.max, "PRODUCT": torch.prod}
 
     def _do_all_reduce(self, tensor: torch.Tensor, op: str = "SUM", group: Optional[Any] = None) -> torch.Tensor:
-        if group is not None:
-            raise NotImplementedError("all_reduce with group for horovod is not implemented")
+        if group is not None and not isinstance(group, hvd.ProcessSet):
+            raise ValueError("Argument group should be list of int or ProcessSet")
         if op in self._manual_reduce_op_map:
             op_fn = self._manual_reduce_op_map[op]
             return self._do_manual_all_reduce(tensor, op_fn)
         if op not in self._reduce_op_map:
             raise ValueError(f"Unsupported reduction operation: '{op}'")
         op = self._reduce_op_map[op]
+        if group is not None:
+            return hvd.allreduce(tensor, op=op, process_set=group)
         return hvd.allreduce(tensor, op=op)
 
     def _do_manual_all_reduce(self, tensor: torch.Tensor, op: Any) -> torch.Tensor:
diff --git a/tests/ignite/distributed/utils/__init__.py b/tests/ignite/distributed/utils/__init__.py
index 90126223c86..ef8452478e1 100644
--- a/tests/ignite/distributed/utils/__init__.py
+++ b/tests/ignite/distributed/utils/__init__.py
@@ -128,20 +128,12 @@ def _test_distrib_all_reduce_group(device):
     bnd = idist.backend()
 
     group = idist.new_group(ranks)
-    if bnd in ("horovod"):
-        with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"):
-            res = idist.all_reduce(t, group=group)
-    else:
-        res = idist.all_reduce(t, group=group)
-        assert res == torch.tensor([sum(ranks)], device=device)
+    res = idist.all_reduce(t, group=group)
+    assert res == torch.tensor([sum(ranks)], device=device)
 
     t = torch.tensor([rank], device=device)
-    if bnd in ("horovod"):
-        with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"):
-            res = idist.all_reduce(t, group=ranks)
-    else:
-        res = idist.all_reduce(t, group=ranks)
-        assert res == torch.tensor([sum(ranks)], device=device)
+    res = idist.all_reduce(t, group=ranks)
+    assert res == torch.tensor([sum(ranks)], device=device)
 
     ranks = "abc"
 
@@ -152,7 +144,7 @@ def _test_distrib_all_reduce_group(device):
         with pytest.raises(ValueError, match=r"Argument group should be list of int"):
             res = idist.all_reduce(t, group="abc")
     elif bnd in ("horovod"):
-        with pytest.raises(NotImplementedError, match=r"all_reduce with group for horovod is not implemented"):
+        with pytest.raises(ValueError, match=r"Argument group should be list of int or ProcessSet"):
             res = idist.all_reduce(t, group="abc")