Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce PerProcessContext to adjust DDP per process behavior (#913) #955

Merged
merged 1 commit into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions alf/bin/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import torch.multiprocessing as mp

from alf.utils import common
from alf.utils.per_process_context import PerProcessContext
import alf.utils.external_configurables
from alf.trainers import policy_trainer

Expand Down Expand Up @@ -130,6 +131,11 @@ def training_worker(rank: int, world_size: int, conf_file: str, root_dir: str):
_define_flags()
FLAGS(sys.argv, known_only=True)
FLAGS.mark_as_parsed()
# Set the rank and total number of processes for distributed training.
PerProcessContext().set_distributed(
rank=rank, num_processes=world_size)
# Make PerProcessContext read-only.
PerProcessContext().finalize()

# Parse the configuration file, which will also implicitly bring up the environments.
common.parse_conf_file(conf_file)
Expand Down
34 changes: 33 additions & 1 deletion alf/config_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
"""

import runpy

import math
from alf.algorithms.config import TrainerConfig
from alf.algorithms.data_transformer import create_data_transformer
from alf.config_util import config1, get_config_value, pre_config, validate_pre_configs
from alf.environments.utils import create_environment
from alf.utils.common import set_random_seed
from alf.utils.per_process_context import PerProcessContext

__all__ = [
'close_env', 'get_raw_observation_spec', 'get_observation_spec',
Expand Down Expand Up @@ -102,6 +103,32 @@ def get_action_spec():
_is_parsing = False


def adjust_config_by_multi_process_divider(multi_process_divider: int = 1):
    """Adjust specific configuration values for multi-process settings.

    Alf assumes all configuration files are geared towards single-process
    training. In multi-process settings such as DDP some configuration
    values therefore need to be adjusted to achieve parity with the
    single-process setup.

    For example, if a configuration asks for 64 parallel environments in
    the single-process setting, the value is overridden with 16 when 4
    identical DDP training processes are running.

    Args:
        multi_process_divider (int): equivalent to the number of processes
    """
    # A single process (or an invalid divider) needs no adjustment.
    if multi_process_divider <= 1:
        return

    # Split the single-process number of parallel environments evenly
    # across processes, rounding up so no process ends up with zero.
    tag = 'create_environment.num_parallel_environments'
    per_process = math.ceil(
        get_config_value(tag) / multi_process_divider)
    config1(tag, per_process, raise_if_used=False)


def parse_config(conf_file, conf_params):
"""Parse config file and config parameters

Expand Down Expand Up @@ -165,6 +192,11 @@ def get_env():
# random seed we are using.
if random_seed is None:
config1('TrainerConfig.random_seed', seed, raise_if_used=False)

# In case when running in multi-process mode, the number of environments
# per process need to be adjusted (divided by number of processes).
adjust_config_by_multi_process_divider(
PerProcessContext().num_processes)
_env = create_environment(seed=random_seed)
return _env

Expand Down
7 changes: 7 additions & 0 deletions alf/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import alf.nest as nest
from alf.tensor_specs import TensorSpec, BoundedTensorSpec
from alf.utils.spec_utils import zeros_from_spec as zero_tensor_from_nested_spec
from alf.utils.per_process_context import PerProcessContext
from . import dist_utils, gin_utils


Expand Down Expand Up @@ -255,6 +256,12 @@ def run_under_record_context(func,
summary_max_queue (int): the largest number of summaries to keep in a queue;
will flush once the queue gets bigger than this. Defaults to 10.
"""
# Disable summary if in distributed mode and the running process isn't the
# master process (i.e. rank = 0)
if PerProcessContext().ddp_rank > 0:
func()
return

summary_dir = os.path.expanduser(summary_dir)
summary_writer = alf.summary.create_summary_writer(
summary_dir, flush_secs=flush_secs, max_queue=summary_max_queue)
Expand Down
66 changes: 66 additions & 0 deletions alf/utils/per_process_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class PerProcessContext(object):
    """A singleton that maintains the per-process runtime properties.

    It is used mainly in multi-process distributed training mode,
    where properties such as the rank of the process and the total
    number of processes can be accessed via this interface.
    """
    # The single shared instance, created lazily on first construction.
    _instance = None

    def __new__(cls):
        """Construct the singleton instance.

        The first call creates the instance and assigns the defaults for
        non-distributed mode (rank -1, one process, mutable); every
        subsequent call returns the same instance unchanged.
        """
        if cls._instance is None:
            cls._instance = super(PerProcessContext, cls).__new__(cls)
            cls._instance._read_only = False
            cls._instance._ddp_rank = -1
            cls._instance._num_processes = 1
        return cls._instance

    def finalize(self) -> None:
        """Lock the context so that it becomes read only."""
        self._read_only = True

    def set_distributed(self, rank: int, num_processes: int) -> None:
        """Set the distributed properties.

        Args:
            rank (int): the ID of the process
            num_processes (int): the total number of processes

        Raises:
            AttributeError: if called after ``finalize()`` locked the
                context.
        """
        if self._read_only:
            raise AttributeError(
                'Cannot mutate PerProcessContext after it is finalized')
        self._ddp_rank = rank
        self._num_processes = num_processes

    @property
    def is_distributed(self) -> bool:
        """Whether this process runs in distributed (e.g. DDP) mode.

        True iff ``set_distributed`` has been called with a non-negative
        rank; the default rank of -1 means non-distributed.
        """
        return self._ddp_rank >= 0

    @property
    def ddp_rank(self) -> int:
        """The rank of this process, or -1 when not distributed."""
        return self._ddp_rank

    @property
    def num_processes(self) -> int:
        """Total number of training processes (1 when not distributed)."""
        return self._num_processes