diff --git a/examples/notebooks/XLA_jax.ipynb b/examples/notebooks/XLA_jax.ipynb index 1adb81f..c801b99 100644 --- a/examples/notebooks/XLA_jax.ipynb +++ b/examples/notebooks/XLA_jax.ipynb @@ -33,6 +33,7 @@ "outputs": [], "source": [ "from jax import __version__\n", + "\n", "print(__version__)" ] }, diff --git a/examples/notebooks/XLA_torch.ipynb b/examples/notebooks/XLA_torch.ipynb index 0234a6d..6ee13af 100644 --- a/examples/notebooks/XLA_torch.ipynb +++ b/examples/notebooks/XLA_torch.ipynb @@ -48,6 +48,7 @@ ], "source": [ "from torch import __version__\n", + "\n", "print(__version__)" ] }, diff --git a/examples/torch_ppo/evaluate.py b/examples/torch_ppo/evaluate.py new file mode 100644 index 0000000..60aa580 --- /dev/null +++ b/examples/torch_ppo/evaluate.py @@ -0,0 +1,147 @@ +import argparse +import os +import time + +import gym +import torch + +from animus import set_global_seed + +from src.acmodel import ACModel +from src.agent import Agent +from src.settings import LOGDIR +from src.utils import ParallelEnv, synthesize + +if __name__ == "__main__": + # Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", required=True, help="name of the environment (REQUIRED)" + ) + # parser.add_argument( + # "--model", required=True, help="name of the trained model (REQUIRED)" + # ) + parser.add_argument( + "--episodes", + type=int, + default=100, + help="number of episodes of evaluation (default: 100)", + ) + parser.add_argument("--seed", type=int, default=0, help="random seed (default: 0)") + parser.add_argument( + "--procs", type=int, default=16, help="number of processes (default: 16)" + ) + parser.add_argument( + "--argmax", + action="store_true", + default=False, + help="action with highest probability is selected", + ) + parser.add_argument( + "--worst-episodes-to-show", + type=int, + default=10, + help="how many worst episodes to show", + ) + parser.add_argument( + "--recurrent", action="store_true", default=False, help="add a LSTM to the model" + ) + args = parser.parse_args() + + # Set seed for all randomness sources + set_global_seed(args.seed) + + # Set device + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f"Device: {device}\n") + + # Load environments + + envs = [] + for i in range(args.procs): + env = gym.make(args.env) + envs.append(env) + env = ParallelEnv(envs) + print("Environments loaded\n") + + # Load agent + acmodel = ACModel( + observation_space=env.observation_space, + action_space=env.action_space, + recurrent=args.recurrent, + ) + checkpoint = torch.load( + os.path.join(LOGDIR, "acmodel.best.pth"), + map_location=lambda storage, loc: storage, + ) + acmodel.load_state_dict(checkpoint) + agent = Agent( + acmodel=acmodel, + device=device, + argmax=args.argmax, + ) + print("Agent loaded\n") + + # Initialize logs + logs = {"num_steps_per_episode": [], "return_per_episode": []} + + # Run agent + start_time = time.time() + obss = env.reset() + + log_done_counter = 0 + log_episode_return = torch.zeros(args.procs, device=device) + log_episode_num_steps = torch.zeros(args.procs, device=device) + + while log_done_counter < args.episodes: + actions = agent.get_actions(obss) + obss, rewards, dones, _ = env.step(actions) + agent.analyze_feedbacks(rewards, dones) + + log_episode_return += torch.tensor(rewards, device=device, dtype=torch.float) + log_episode_num_steps += torch.ones(args.procs, device=device) + + for i, done in enumerate(dones): + if done: + log_done_counter += 1 + 
logs["return_per_episode"].append(log_episode_return[i].item()) + logs["num_steps_per_episode"].append(log_episode_num_steps[i].item()) + + mask = 1 - torch.tensor(dones, device=device, dtype=torch.float) + log_episode_return *= mask + log_episode_num_steps *= mask + + end_time = time.time() + + # Print logs + num_steps = sum(logs["num_steps_per_episode"]) + fps = num_steps / (end_time - start_time) + duration = int(end_time - start_time) + return_per_episode = synthesize(logs["return_per_episode"]) + num_steps_per_episode = synthesize(logs["num_steps_per_episode"]) + + print( + "S {} | FPS {:.0f} | D {} | R:μσmM {:.2f} {:.2f} {:.2f} {:.2f} | F:μσmM {:.1f} {:.1f} {} {}".format( + num_steps, + fps, + duration, + *return_per_episode.values(), + *num_steps_per_episode.values(), + ) + ) + + # Print worst episodes + n = args.worst_episodes_to_show + if n > 0: + print("\n{} worst episodes:".format(n)) + + indexes = sorted( + range(len(logs["return_per_episode"])), + key=lambda k: logs["return_per_episode"][k], + ) + for i in indexes[:n]: + print( + "- episode {}: R={}, F={}".format( + i, logs["return_per_episode"][i], logs["num_steps_per_episode"][i] + ) + ) diff --git a/examples/torch_ppo/src/acmodel.py b/examples/torch_ppo/src/acmodel.py new file mode 100644 index 0000000..bc3b3d4 --- /dev/null +++ b/examples/torch_ppo/src/acmodel.py @@ -0,0 +1,94 @@ +import torch +from torch.distributions.categorical import Categorical +import torch.nn as nn +import torch.nn.functional as F + + +# Function from https://github.com/ikostrikov/pytorch-a2c-ppo-acktr/blob/master/model.py +def init_params(m): + classname = m.__class__.__name__ + if classname.find("Linear") != -1: + m.weight.data.normal_(0, 1) + m.weight.data *= 1 / torch.sqrt(m.weight.data.pow(2).sum(1, keepdim=True)) + if m.bias is not None: + m.bias.data.fill_(0) + + +class ACModel(nn.Module): + def __init__(self, observation_space, action_space, recurrent=False): + super().__init__() + + # Decide which components are enabled + self.recurrent = recurrent + + # Define embedder + self.embedder = nn.Sequential( + nn.Linear(observation_space.shape[0], 128), + nn.ReLU(), + nn.Linear(128, 128), + nn.ReLU(), + ) + self.embedding_size = 128 + # TODO: add image support + # self.embedder = nn.Sequential( + # nn.Conv2d(3, 16, (2, 2)), + # nn.ReLU(), + # nn.MaxPool2d((2, 2)), + # nn.Conv2d(16, 32, (2, 2)), + # nn.ReLU(), + # nn.Conv2d(32, 64, (2, 2)), + # nn.ReLU(), + # ) + # n = obs_space["image"][0] + # m = obs_space["image"][1] + # self.embedding_size = ((n - 1) // 2 - 2) * ((m - 1) // 2 - 2) * 64 + + # Define memory + if self.recurrent: + self.memory_rnn = nn.LSTMCell(self.embedding_size, self.semi_memory_size) + + # Resize embedding + self.embedding_size = self.semi_memory_size + + # Define actor's model + self.actor = nn.Sequential( + nn.Linear(self.embedding_size, 64), nn.Tanh(), nn.Linear(64, action_space.n) + ) + + # Define critic's model + self.critic = nn.Sequential( + nn.Linear(self.embedding_size, 64), nn.Tanh(), nn.Linear(64, 1) + ) + + # Initialize parameters correctly + self.apply(init_params) + + @property + def memory_size(self): + return 2 * self.semi_memory_size + + @property + def semi_memory_size(self): + return self.embedding_size + + def forward(self, x, memory=None): + x = self.embedder(x) + + if self.recurrent: + hidden = ( + memory[:, : self.semi_memory_size], + memory[:, self.semi_memory_size :], + ) + hidden = self.memory_rnn(x, hidden) + embedding = hidden[0] + memory = torch.cat(hidden, dim=1) + else: + embedding = x + + x = 
self.actor(embedding) + dist = Categorical(logits=F.log_softmax(x, dim=1)) + + x = self.critic(embedding) + value = x.squeeze(1) + + return dist, value, memory diff --git a/examples/torch_ppo/src/agent.py b/examples/torch_ppo/src/agent.py new file mode 100644 index 0000000..0c69a55 --- /dev/null +++ b/examples/torch_ppo/src/agent.py @@ -0,0 +1,51 @@ +import numpy as np +import torch + + +class Agent: + """An agent. Used for model inference. + + It is able: + - to choose an action given an observation, + - to analyze the feedback (i.e. reward and done state) of its action.""" + + def __init__(self, acmodel, device, argmax=False, num_envs=1): + self.acmodel = acmodel + self.device = device + self.argmax = argmax + self.num_envs = num_envs + + if self.acmodel.recurrent: + self.memories = torch.zeros( + self.num_envs, self.acmodel.memory_size, device=self.device + ) + else: + self.memories = None + + self.acmodel.to(self.device) + self.acmodel.eval() + + def get_actions(self, obss): + with torch.no_grad(): + obss = torch.tensor(np.array(obss), device=self.device, dtype=torch.float) + dist, _, self.memories = self.acmodel(obss, self.memories) + + if self.argmax: + actions = dist.probs.max(1, keepdim=True)[1] + else: + actions = dist.sample() + + return actions.cpu().numpy() + + def get_action(self, obs): + return self.get_actions([obs])[0] + + def analyze_feedbacks(self, rewards, dones): + if self.acmodel.recurrent: + masks = 1 - torch.tensor( + dones, dtype=torch.float, device=self.device + ).unsqueeze(1) + self.memories *= masks + + def analyze_feedback(self, reward, done): + return self.analyze_feedbacks([reward], [done]) diff --git a/examples/torch_ppo/src/settings.py b/examples/torch_ppo/src/settings.py new file mode 100644 index 0000000..fdf62b1 --- /dev/null +++ b/examples/torch_ppo/src/settings.py @@ -0,0 +1 @@ +LOGDIR = "./logs_ppo" diff --git a/examples/torch_ppo/src/trainer.py b/examples/torch_ppo/src/trainer.py new file mode 100644 index 0000000..abe25c3 --- /dev/null +++ b/examples/torch_ppo/src/trainer.py @@ -0,0 +1,592 @@ +from abc import ABC, abstractmethod + +import numpy as np +import torch + +from src.utils import DictList, ParallelEnv + + +def default_preprocess_obss(obss, device=None): + return torch.tensor(np.array(obss), device=device) + + +class ITrainer(ABC): + """The base class for RL algorithms.""" + + def __init__( + self, + envs, + acmodel, + device, + num_steps_per_proc, + discount, + lr, + gae_lambda, + entropy_coef, + value_loss_coef, + max_grad_norm, + recurrence, + preprocess_obss, + reshape_reward, + ): + """ + Initializes an `ITrainer` instance.
+ + Parameters: + ---------- + envs : list + a list of environments that will be run in parallel + acmodel : torch.Module + the model + num_steps_per_proc : int + the number of steps collected by every process for an update + discount : float + the discount for future rewards + lr : float + the learning rate for optimizers + gae_lambda : float + the lambda coefficient in the GAE formula + ([Schulman et al., 2015](https://arxiv.org/abs/1506.02438)) + entropy_coef : float + the weight of the entropy cost in the final objective + value_loss_coef : float + the weight of the value loss in the final objective + max_grad_norm : float + gradient will be clipped to be at most this value + recurrence : int + the number of steps the gradient is propagated back in time + preprocess_obss : function + a function that takes observations returned by the environment + and converts them into the format that the model can handle + reshape_reward : function + a function that shapes the reward, takes an + (observation, action, reward, done) tuple as an input + """ + + # Store parameters + + self.env = ParallelEnv(envs) + self.acmodel = acmodel + self.device = device + self.num_steps_per_proc = num_steps_per_proc + self.discount = discount + self.lr = lr + self.gae_lambda = gae_lambda + self.entropy_coef = entropy_coef + self.value_loss_coef = value_loss_coef + self.max_grad_norm = max_grad_norm + self.recurrence = recurrence + self.preprocess_obss = preprocess_obss or default_preprocess_obss + self.reshape_reward = reshape_reward + + # Control parameters + assert self.acmodel.recurrent or self.recurrence == 1 + assert self.num_steps_per_proc % self.recurrence == 0 + + # Configure acmodel + self.acmodel.to(self.device) + self.acmodel.train() + + # Store helpers values + self.num_procs = len(envs) + self.num_steps = self.num_steps_per_proc * self.num_procs + + # Initialize experience values + shape = (self.num_steps_per_proc, self.num_procs) + + self.obs = self.env.reset() + self.obss = [None] * (shape[0]) + if self.acmodel.recurrent: + self.memory = torch.zeros( + shape[1], self.acmodel.memory_size, device=self.device + ) + self.memories = torch.zeros( + *shape, self.acmodel.memory_size, device=self.device + ) + self.mask = torch.ones(shape[1], device=self.device) + self.masks = torch.zeros(*shape, device=self.device) + self.actions = torch.zeros(*shape, device=self.device, dtype=torch.int) + self.values = torch.zeros(*shape, device=self.device) + self.rewards = torch.zeros(*shape, device=self.device) + self.advantages = torch.zeros(*shape, device=self.device) + self.log_probs = torch.zeros(*shape, device=self.device) + + # Initialize log values + self.log_episode_return = torch.zeros(self.num_procs, device=self.device) + self.log_episode_reshaped_return = torch.zeros( + self.num_procs, device=self.device + ) + self.log_episode_num_steps = torch.zeros(self.num_procs, device=self.device) + + self.log_done_counter = 0 + self.log_return = [0] * self.num_procs + self.log_reshaped_return = [0] * self.num_procs + self.log_num_steps = [0] * self.num_procs + + def collect_experiences(self): + """Collects rollouts and computes advantages. + + Runs several environments concurrently. The next actions are computed + in a batch mode for all environments at the same time. The rollouts + and advantages from all environments are concatenated together. + + Returns + ------- + exps : DictList + Contains actions, rewards, advantages etc as attributes. + Each attribute, e.g. 
`exps.reward` has a shape + (self.num_steps_per_proc * num_envs, ...). k-th block + of consecutive `self.num_steps_per_proc` steps contains + data obtained from the k-th environment. Be careful not to mix + data from different environments! + logs : dict + Useful stats about the training process, including the average + reward, policy loss, value loss, etc. + """ + + for i in range(self.num_steps_per_proc): + # Do one agent-environment interaction + preprocessed_obs = self.preprocess_obss(self.obs, device=self.device) + with torch.no_grad(): + if self.acmodel.recurrent: + dist, value, memory = self.acmodel( + preprocessed_obs, self.memory * self.mask.unsqueeze(1) + ) + else: + dist, value, _ = self.acmodel(preprocessed_obs) + action = dist.sample() + + obs, reward, done, _ = self.env.step(action.cpu().numpy()) + + # Update experiences values + self.obss[i] = self.obs + self.obs = obs + if self.acmodel.recurrent: + self.memories[i] = self.memory + self.memory = memory + self.masks[i] = self.mask + self.mask = 1 - torch.tensor(done, device=self.device, dtype=torch.float) + self.actions[i] = action + self.values[i] = value + if self.reshape_reward is not None: + self.rewards[i] = torch.tensor( + [ + self.reshape_reward(obs_, action_, reward_, done_) + for obs_, action_, reward_, done_ in zip( + obs, action, reward, done + ) + ], + device=self.device, + ) + else: + self.rewards[i] = torch.tensor(reward, device=self.device) + self.log_probs[i] = dist.log_prob(action) + + # Update log values + self.log_episode_return += torch.tensor( + reward, device=self.device, dtype=torch.float + ) + self.log_episode_reshaped_return += self.rewards[i] + self.log_episode_num_steps += torch.ones(self.num_procs, device=self.device) + + for i, done_ in enumerate(done): + if done_: + self.log_done_counter += 1 + self.log_return.append(self.log_episode_return[i].item()) + self.log_reshaped_return.append( + self.log_episode_reshaped_return[i].item() + ) + self.log_num_steps.append(self.log_episode_num_steps[i].item()) + + self.log_episode_return *= self.mask + self.log_episode_reshaped_return *= self.mask + self.log_episode_num_steps *= self.mask + + # Add advantage and return to experiences + preprocessed_obs = self.preprocess_obss(self.obs, device=self.device) + with torch.no_grad(): + if self.acmodel.recurrent: + _, next_value, _ = self.acmodel( + preprocessed_obs, self.memory * self.mask.unsqueeze(1) + ) + else: + _, next_value, _ = self.acmodel(preprocessed_obs) + + for i in reversed(range(self.num_steps_per_proc)): + next_mask = ( + self.masks[i + 1] if i < self.num_steps_per_proc - 1 else self.mask + ) + next_value = ( + self.values[i + 1] if i < self.num_steps_per_proc - 1 else next_value + ) + next_advantage = ( + self.advantages[i + 1] if i < self.num_steps_per_proc - 1 else 0 + ) + + delta = ( + self.rewards[i] + self.discount * next_value * next_mask - self.values[i] + ) + self.advantages[i] = ( + delta + self.discount * self.gae_lambda * next_advantage * next_mask + ) + + # Define experiences: + # the whole experience is the concatenation of the experience + # of each process. + # In comments below: + # - T is self.num_steps_per_proc, + # - P is self.num_procs, + # - D is the dimensionality. 
+ + exps = DictList() + exps.obs = [ + self.obss[i][j] + for j in range(self.num_procs) + for i in range(self.num_steps_per_proc) + ] + if self.acmodel.recurrent: + # T x P x D -> P x T x D -> (P * T) x D + exps.memory = self.memories.transpose(0, 1).reshape( + -1, *self.memories.shape[2:] + ) + # T x P -> P x T -> (P * T) x 1 + exps.mask = self.masks.transpose(0, 1).reshape(-1).unsqueeze(1) + # for all tensors below, T x P -> P x T -> P * T + exps.action = self.actions.transpose(0, 1).reshape(-1) + exps.value = self.values.transpose(0, 1).reshape(-1) + exps.reward = self.rewards.transpose(0, 1).reshape(-1) + exps.advantage = self.advantages.transpose(0, 1).reshape(-1) + exps.returnn = exps.value + exps.advantage + exps.log_prob = self.log_probs.transpose(0, 1).reshape(-1) + + # Preprocess experiences + exps.obs = self.preprocess_obss(exps.obs, device=self.device) + + # Log some values + keep = max(self.log_done_counter, self.num_procs) + + logs = { + "return_per_episode": self.log_return[-keep:], + "reshaped_return_per_episode": self.log_reshaped_return[-keep:], + "num_steps_per_episode": self.log_num_steps[-keep:], + "num_steps": self.num_steps, + } + + self.log_done_counter = 0 + self.log_return = self.log_return[-self.num_procs :] + self.log_reshaped_return = self.log_reshaped_return[-self.num_procs :] + self.log_num_steps = self.log_num_steps[-self.num_procs :] + + return exps, logs + + @abstractmethod + def update_parameters(self): + pass + + +class A2CTrainer(ITrainer): + """The Advantage Actor-Critic algorithm.""" + + def __init__( + self, + envs, + acmodel, + device=None, + num_steps_per_proc=None, + discount=0.99, + lr=0.01, + gae_lambda=0.95, + entropy_coef=0.01, + value_loss_coef=0.5, + max_grad_norm=0.5, + recurrence=4, + rmsprop_alpha=0.99, + rmsprop_eps=1e-8, + preprocess_obss=None, + reshape_reward=None, + ): + num_steps_per_proc = num_steps_per_proc or 8 + super().__init__( + envs, + acmodel, + device, + num_steps_per_proc, + discount, + lr, + gae_lambda, + entropy_coef, + value_loss_coef, + max_grad_norm, + recurrence, + preprocess_obss, + reshape_reward, + ) + + self.optimizer = torch.optim.RMSprop( + self.acmodel.parameters(), lr, alpha=rmsprop_alpha, eps=rmsprop_eps + ) + + def _get_starting_indexes(self): + """Gives the indexes of the observations given to the model and the + experiences used to compute the loss at first. + + The indexes are the integers from 0 to `self.num_steps` with a step of + `self.recurrence`. If the model is not recurrent, they are all the + integers from 0 to `self.num_steps`. 
+ + Returns + ------- + starting_indexes : list of int + the indexes of the experiences to be used at first + """ + starting_indexes = np.arange(0, self.num_steps, self.recurrence) + return starting_indexes + + def update_parameters(self, exps): + # Compute starting indexes + inds = self._get_starting_indexes() + + # Initialize update values + update_entropy = 0 + update_value = 0 + update_policy_loss = 0 + update_value_loss = 0 + update_loss = 0 + + # Initialize memory + if self.acmodel.recurrent: + memory = exps.memory[inds] + + for i in range(self.recurrence): + # Create a sub-batch of experience + sb = exps[inds + i] + + # Compute loss + if self.acmodel.recurrent: + dist, value, memory = self.acmodel(sb.obs, memory * sb.mask) + else: + dist, value, _ = self.acmodel(sb.obs) + entropy = dist.entropy().mean() + policy_loss = -(dist.log_prob(sb.action) * sb.advantage).mean() + value_loss = (value - sb.returnn).pow(2).mean() + loss = ( + policy_loss + - self.entropy_coef * entropy + + self.value_loss_coef * value_loss + ) + + # Update batch values + update_entropy += entropy.item() + update_value += value.mean().item() + update_policy_loss += policy_loss.item() + update_value_loss += value_loss.item() + update_loss += loss + + # Update update values + update_entropy /= self.recurrence + update_value /= self.recurrence + update_policy_loss /= self.recurrence + update_value_loss /= self.recurrence + update_loss /= self.recurrence + + # Update actor-critic + self.optimizer.zero_grad() + update_loss.backward() + update_grad_norm = ( + sum(p.grad.data.norm(2) ** 2 for p in self.acmodel.parameters()) ** 0.5 + ) + torch.nn.utils.clip_grad_norm_(self.acmodel.parameters(), self.max_grad_norm) + self.optimizer.step() + + # Log some values + logs = { + "entropy": update_entropy, + "value": update_value, + "policy_loss": update_policy_loss, + "value_loss": update_value_loss, + "grad_norm": update_grad_norm, + } + return logs + + +class PPOTrainer(ITrainer): + """The Proximal Policy Optimization algorithm + ([Schulman et al., 2017](https://arxiv.org/abs/1707.06347)).""" + + def __init__( + self, + envs, + acmodel, + device=None, + num_steps_per_proc=None, + discount=0.99, + lr=0.001, + gae_lambda=0.95, + entropy_coef=0.01, + value_loss_coef=0.5, + max_grad_norm=0.5, + recurrence=4, + adam_eps=1e-8, + clip_eps=0.2, + epochs=4, + batch_size=256, + preprocess_obss=None, + reshape_reward=None, + ): + num_steps_per_proc = num_steps_per_proc or 128 + super().__init__( + envs, + acmodel, + device, + num_steps_per_proc, + discount, + lr, + gae_lambda, + entropy_coef, + value_loss_coef, + max_grad_norm, + recurrence, + preprocess_obss, + reshape_reward, + ) + + self.clip_eps = clip_eps + self.epochs = epochs + self.batch_size = batch_size + + assert self.batch_size % self.recurrence == 0 + + self.optimizer = torch.optim.Adam(self.acmodel.parameters(), lr, eps=adam_eps) + self.batch_num = 0 + + def _get_batches_starting_indexes(self): + """Gives, for each batch, the indexes of the observations given to + the model and the experiences used to compute the loss at first. + + First, the indexes are the integers from 0 to `self.num_steps` with a step of + `self.recurrence`, shifted by `self.recurrence//2` every other call to give + more diverse batches. Then, the indexes are split into the different batches.
+ + Returns + ------- + batches_starting_indexes : list of list of int + the indexes of the experiences to be used at first for each batch + """ + indexes = np.arange(0, self.num_steps, self.recurrence) + indexes = np.random.permutation(indexes) + + # Shift starting indexes by self.recurrence//2 half the time + if self.batch_num % 2 == 1: + indexes = indexes[(indexes + self.recurrence) % self.num_steps_per_proc != 0] + indexes += self.recurrence // 2 + self.batch_num += 1 + + num_indexes = self.batch_size // self.recurrence + batches_starting_indexes = [ + indexes[i : i + num_indexes] for i in range(0, len(indexes), num_indexes) + ] + return batches_starting_indexes + + def update_parameters(self, exps): + # Collect experiences + for _ in range(self.epochs): + # Initialize log values + log_entropies = [] + log_values = [] + log_policy_losses = [] + log_value_losses = [] + log_grad_norms = [] + + for inds in self._get_batches_starting_indexes(): + # Initialize batch values + batch_entropy = 0 + batch_value = 0 + batch_policy_loss = 0 + batch_value_loss = 0 + batch_loss = 0 + + # Initialize memory + if self.acmodel.recurrent: + memory = exps.memory[inds] + + for i in range(self.recurrence): + # Create a sub-batch of experience + sb = exps[inds + i] + + # Compute loss + if self.acmodel.recurrent: + dist, value, memory = self.acmodel(sb.obs, memory * sb.mask) + else: + dist, value, _ = self.acmodel(sb.obs) + + entropy = dist.entropy().mean() + + ratio = torch.exp(dist.log_prob(sb.action) - sb.log_prob) + surr1 = ratio * sb.advantage + surr2 = ( + torch.clamp(ratio, 1.0 - self.clip_eps, 1.0 + self.clip_eps) + * sb.advantage + ) + policy_loss = -torch.min(surr1, surr2).mean() + + value_clipped = sb.value + torch.clamp( + value - sb.value, -self.clip_eps, self.clip_eps + ) + surr1 = (value - sb.returnn).pow(2) + surr2 = (value_clipped - sb.returnn).pow(2) + value_loss = torch.max(surr1, surr2).mean() + loss = ( + policy_loss + - self.entropy_coef * entropy + + self.value_loss_coef * value_loss + ) + + # Update batch values + batch_entropy += entropy.item() + batch_value += value.mean().item() + batch_policy_loss += policy_loss.item() + batch_value_loss += value_loss.item() + batch_loss += loss + + # Update memories for next epoch + if self.acmodel.recurrent and i < self.recurrence - 1: + exps.memory[inds + i + 1] = memory.detach() + + # Update batch values + batch_entropy /= self.recurrence + batch_value /= self.recurrence + batch_policy_loss /= self.recurrence + batch_value_loss /= self.recurrence + batch_loss /= self.recurrence + + # Update actor-critic + self.optimizer.zero_grad() + batch_loss.backward() + grad_norm = ( + sum( + p.grad.data.norm(2).item() ** 2 + for p in self.acmodel.parameters() + ) + ** 0.5 + ) + torch.nn.utils.clip_grad_norm_( + self.acmodel.parameters(), self.max_grad_norm + ) + self.optimizer.step() + + # Update log values + log_entropies.append(batch_entropy) + log_values.append(batch_value) + log_policy_losses.append(batch_policy_loss) + log_value_losses.append(batch_value_loss) + log_grad_norms.append(grad_norm) + + # Log some values + logs = { + "entropy": np.mean(log_entropies), + "value": np.mean(log_values), + "policy_loss": np.mean(log_policy_losses), + "value_loss": np.mean(log_value_losses), + "grad_norm": np.mean(log_grad_norms), + } + return logs diff --git a/examples/torch_ppo/src/utils.py b/examples/torch_ppo/src/utils.py new file mode 100644 index 0000000..06c7cab --- /dev/null +++ b/examples/torch_ppo/src/utils.py @@ -0,0 +1,109 @@ +from collections import 
OrderedDict +import logging +from multiprocessing import Pipe, Process +import os +import sys + +import gym +import numpy as np + + +def synthesize(array): + d = OrderedDict() + d["mean"] = np.mean(array) + d["std"] = np.std(array) + d["min"] = np.amin(array) + d["max"] = np.amax(array) + return d + + +def get_txt_logger(model_dir): + path = os.path.join(model_dir, "log.txt") + os.makedirs(model_dir, exist_ok=True) + logging.basicConfig( + level=logging.INFO, + format="%(message)s", + handlers=[logging.FileHandler(filename=path), logging.StreamHandler(sys.stdout)], + ) + return logging.getLogger() + + +class DictList(dict): + """A dictionary of lists of the same size. Dictionary items can be + accessed using `.` notation and list items using `[]` notation. + + Example: + >>> d = DictList({"a": [[1, 2], [3, 4]], "b": [[5], [6]]}) + >>> d.a + [[1, 2], [3, 4]] + >>> d[0] + DictList({"a": [1, 2], "b": [5]}) + """ + + __getattr__ = dict.__getitem__ + __setattr__ = dict.__setitem__ + + def __len__(self): + return len(next(iter(dict.values(self)))) + + def __getitem__(self, index): + return DictList({key: value[index] for key, value in dict.items(self)}) + + def __setitem__(self, index, d): + for key, value in d.items(): + dict.__getitem__(self, key)[index] = value + + +def worker(conn, env): + while True: + cmd, data = conn.recv() + if cmd == "step": + obs, reward, done, info = env.step(data) + if done: + obs = env.reset() + conn.send((obs, reward, done, info)) + elif cmd == "reset": + obs = env.reset() + conn.send(obs) + else: + raise NotImplementedError + + +class ParallelEnv(gym.Env): + """A concurrent execution of environments in multiple processes.""" + + def __init__(self, envs): + assert len(envs) >= 1, "No environment given." + + self.envs = envs + self.observation_space = self.envs[0].observation_space + self.action_space = self.envs[0].action_space + + self.locals = [] + for env in self.envs[1:]: + local, remote = Pipe() + self.locals.append(local) + p = Process(target=worker, args=(remote, env)) + p.daemon = True + p.start() + remote.close() + + def reset(self): + for local in self.locals: + local.send(("reset", None)) + results = [self.envs[0].reset()] + [local.recv() for local in self.locals] + return results + + def step(self, actions): + for local, action in zip(self.locals, actions[1:]): + local.send(("step", action)) + obs, reward, done, info = self.envs[0].step(actions[0]) + if done: + obs = self.envs[0].reset() + results = zip( + *[(obs, reward, done, info)] + [local.recv() for local in self.locals] + ) + return results + + def render(self): + raise NotImplementedError diff --git a/examples/torch_ppo/train.py b/examples/torch_ppo/train.py new file mode 100644 index 0000000..de11dbf --- /dev/null +++ b/examples/torch_ppo/train.py @@ -0,0 +1,150 @@ +import argparse +import time + +import gym +import tensorboardX +import torch + +from animus import IExperiment, set_global_seed +from animus.torch.callbacks import TorchCheckpointerCallback + +from src.acmodel import ACModel +from src.settings import LOGDIR +from src.trainer import A2CTrainer, PPOTrainer +from src.utils import get_txt_logger, synthesize + +TRAINERS = { + "a2c": A2CTrainer, + "ppo": PPOTrainer, +} + + +class Experiment(IExperiment): + def __init__( + self, + *, + # general + num_steps: int, + num_envs: int, + alg_name: str = "ppo", + env_name: str = "CartPole-v1", + recurrent: bool = False, + **trainer_kwargs, + ): + super().__init__() + self.update_step = 0 + self.num_epochs = num_steps + self.num_envs = num_envs +
self.recurrent = recurrent + self.env_name = env_name + self.alg_name = alg_name + self.trainer_kwargs = trainer_kwargs + + def on_experiment_start(self, exp: "IExperiment"): + super().on_experiment_start(exp) + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + envs = [] + for i in range(self.num_envs): + envs.append(gym.make(self.env_name)) + observation_space, action_space = envs[0].observation_space, envs[0].action_space + self.acmodel = ACModel( + observation_space=observation_space, + action_space=action_space, + recurrent=self.recurrent, + ) + self.trainer = TRAINERS[self.alg_name]( + envs=envs, acmodel=self.acmodel, device=self.device, **self.trainer_kwargs + ) + self.callbacks = { + "checkpointer": TorchCheckpointerCallback( + exp_attr="acmodel", + logdir=LOGDIR, + metric_key="return_mean", + minimize=False, + ), + } + self.txt_logger = get_txt_logger(LOGDIR) + self.tb_logger = tensorboardX.SummaryWriter(LOGDIR) + + def on_epoch_start(self, exp: "IExperiment"): + self.epoch_metrics = {} + set_global_seed(self.seed + self.epoch_step) + + def run_epoch(self) -> None: + update_start_time = time.time() + exps, logs1 = self.trainer.collect_experiences() + logs2 = self.trainer.update_parameters(exps) + logs = {**logs1, **logs2} + update_end_time = time.time() + + self.update_step += 1 + self.epoch_step += logs["num_steps"] + fps = logs["num_steps"] / (update_end_time - update_start_time) + + return_per_episode = synthesize(logs["return_per_episode"]) + rreturn_per_episode = synthesize(logs["reshaped_return_per_episode"]) + num_steps_per_episode = synthesize(logs["num_steps_per_episode"]) + + header = ["update", "steps", "FPS"] + data = [self.update_step, self.epoch_step, fps] + header += ["rreturn_" + key for key in rreturn_per_episode.keys()] + data += rreturn_per_episode.values() + header += ["num_steps_" + key for key in num_steps_per_episode.keys()] + data += num_steps_per_episode.values() + header += ["entropy", "value", "policy_loss", "value_loss", "grad_norm"] + data += [ + logs["entropy"], + logs["value"], + logs["policy_loss"], + logs["value_loss"], + logs["grad_norm"], + ] + self.txt_logger.info( + "U {} | S {:06} | FPS {:04.0f} | rR:μσmM {:.2f} {:.2f} {:.2f} {:.2f} | F:μσmM {:.1f} {:.1f} {} {} | H {:.3f} | V {:.3f} | pL {:.3f} | vL {:.3f} | ∇ {:.3f}".format( + *data + ) + ) + header += ["return_" + key for key in return_per_episode.keys()] + data += return_per_episode.values() + + self.epoch_metrics = dict(zip(header, data)) + for field, value in self.epoch_metrics.items(): + self.tb_logger.add_scalar(field, value, self.epoch_step) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", required=True, help="name of the environment to be run (REQUIRED)" + ) + parser.add_argument( + "--procs", type=int, default=16, help="number of processes (default: 16)" + ) + parser.add_argument( + "--steps", + type=int, + default=int(1e6), + help="number of frames of training (default: 1e6)", + ) + args = parser.parse_args() + + exp = Experiment( + num_steps=args.steps, + num_envs=args.procs, + env_name=args.env, + recurrent=False, + alg_name="ppo", + # ppo + num_steps_per_proc=None, + discount=0.99, + lr=0.001, + gae_lambda=0.95, + entropy_coef=0.01, + value_loss_coef=0.5, + max_grad_norm=0.5, + recurrence=1, + adam_eps=1e-8, + clip_eps=0.2, + epochs=4, + batch_size=256, + ).run() diff --git a/examples/torch_ppo/visualize.py b/examples/torch_ppo/visualize.py new file mode 100644 index 0000000..b833cbc --- /dev/null +++ 
b/examples/torch_ppo/visualize.py @@ -0,0 +1,119 @@ +import argparse +import os + +import gym +import numpy as np +import torch + +from animus import set_global_seed + +from src.acmodel import ACModel +from src.agent import Agent +from src.settings import LOGDIR + +if __name__ == "__main__": + # Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", required=True, help="name of the environment to be run (REQUIRED)" + ) + # parser.add_argument( + # "--model", required=True, help="name of the trained model (REQUIRED)" + # ) + parser.add_argument("--seed", type=int, default=0, help="random seed (default: 0)") + parser.add_argument( + "--shift", + type=int, + default=0, + help="number of times the environment is reset at the beginning (default: 0)", + ) + parser.add_argument( + "--argmax", + action="store_true", + default=False, + help="select the action with highest probability (default: False)", + ) + parser.add_argument( + "--pause", + type=float, + default=0.1, + help="pause duration between two consequent actions of the agent (default: 0.1)", + ) + parser.add_argument( + "--gif", + type=str, + default=None, + help="store output as gif with the given filename", + ) + parser.add_argument( + "--episodes", type=int, default=10, help="number of episodes to visualize" + ) + parser.add_argument( + "--recurrent", action="store_true", default=False, help="add a LSTM to the model" + ) + args = parser.parse_args() + + # Set seed for all randomness sources + set_global_seed(args.seed) + + # Set device + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f"Device: {device}\n") + + # Load environment + + env = gym.make(args.env) + for _ in range(args.shift): + env.reset() + print("Environment loaded\n") + + # Load agent + acmodel = ACModel( + observation_space=env.observation_space, + action_space=env.action_space, + recurrent=args.recurrent, + ) + checkpoint = torch.load( + os.path.join(LOGDIR, "acmodel.best.pth"), + map_location=lambda storage, loc: storage, + ) + acmodel.load_state_dict(checkpoint) + agent = Agent( + acmodel=acmodel, + device=device, + argmax=args.argmax, + ) + print("Agent loaded\n") + + # Run the agent + + if args.gif: + from array2gif import write_gif + + frames = [] + + # Create a window to view the environment + # env.render("human") + + for episode in range(args.episodes): + obs = env.reset() + + while True: + env.render("human") + if args.gif: + frames.append(np.moveaxis(env.render("rgb_array"), 2, 0)) + + action = agent.get_action(obs) + obs, reward, done, _ = env.step(action) + agent.analyze_feedback(reward, done) + + if done: # or env.window.closed: + break + + # if env.window.closed: + # break + + if args.gif: + print("Saving gif... ", end="") + write_gif(np.array(frames), args.gif + ".gif", fps=1 / args.pause) + print("Done.") diff --git a/examples/torch_rl/README.md b/examples/torch_rl/README.md index 6c5a01c..082e050 100644 --- a/examples/torch_rl/README.md +++ b/examples/torch_rl/README.md @@ -1,6 +1,6 @@ ## Tl;dr ```bash -pip install animus gym numpy torch tqdm +pip install animus gym pygame numpy torch tqdm python torch_dqn.py python torch_ddpg.py python torch_reinforce.py
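For a quick smoke test of the new `examples/torch_ppo` scripts, the commands below drive the `argparse` flags defined in `train.py`, `evaluate.py`, and `visualize.py` above. This is a minimal sketch: it assumes the commands are run from `examples/torch_ppo/`, uses the default `CartPole-v1` environment from `train.py`, and cuts `--steps` down from the 1e6 default for a dry run; the dependency list is inferred from the imports in the new files.

```bash
# inferred dependencies; tensorboardX is used by train.py, array2gif only if visualize.py is run with --gif
pip install animus gym pygame numpy torch tensorboardX

# train PPO on CartPole-v1; checkpoints and logs go to ./logs_ppo (see src/settings.py)
python train.py --env CartPole-v1 --procs 16 --steps 100000

# evaluate the best checkpoint (logs_ppo/acmodel.best.pth) over 100 episodes
python evaluate.py --env CartPole-v1 --procs 16 --episodes 100

# render a few episodes; add --gif <name> to save an animation
python visualize.py --env CartPole-v1 --episodes 10
```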