# -*- coding: utf-8 -*-
"""ABK - A3C for Kung Fu
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1Nuy73Qg8We1HzCmL63_Jd46YiOzMASU1
# A3C for Kung Fu
## Part 0 - Installing the required packages and importing the libraries
### Installing Gymnasium
"""
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]
"""### Importing the libraries"""
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import gymnasium as gym
from gymnasium import ObservationWrapper
from gymnasium.spaces import Box
"""## Part 1 - Building the AI
### Creating the architecture of the Neural Network
"""
class Network(nn.Module):

    def __init__(self, action_size):
        super(Network, self).__init__()
        # Three stride-2 convolutions progressively downsample the 4 stacked 42x42 frames
        self.conv1 = torch.nn.Conv2d(in_channels = 4, out_channels = 32, kernel_size = (3, 3), stride = 2)
        self.conv2 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3, 3), stride = 2)
        self.conv3 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3, 3), stride = 2)
        self.flatten = torch.nn.Flatten()
        self.fc1 = torch.nn.Linear(512, 128)
        # Two heads: fc2a outputs one value per action (actor), fc2s outputs the state value (critic)
        self.fc2a = torch.nn.Linear(128, action_size)
        self.fc2s = torch.nn.Linear(128, 1)

    def forward(self, state):
        x = self.conv1(state)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = F.relu(x)
        action_values = self.fc2a(x)
        # squeeze(-1) turns the (batch, 1) critic output into a (batch,) vector;
        # indexing with [0] would silently keep only the first element of the batch
        state_value = self.fc2s(x).squeeze(-1)
        return action_values, state_value
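# Quick sanity check of the architecture (illustrative, not part of training):
# with 4-channel 42x42 inputs, the three stride-2 convolutions give
# 20x20 -> 9x9 -> 4x4 feature maps, so flattening yields 32 * 4 * 4 = 512
# features, exactly fc1's input size. action_size = 14 below is an assumed
# value matching the Kung Fu action count printed later.
_check_net = Network(action_size = 14)
_check_actions, _check_value = _check_net(torch.zeros(1, 4, 42, 42))
print(_check_actions.shape, _check_value.shape)  # torch.Size([1, 14]) torch.Size([1])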
"""## Part 2 - Training the AI
### Setting up the environment
"""
# ObservationWrapper lets us override observation() so every raw frame the
# environment returns is pre-processed before the agent sees it
class PreprocessAtari(ObservationWrapper):

    def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
        super(PreprocessAtari, self).__init__(env)
        # Target size for resizing frames
        self.img_size = (height, width)
        # Cropping function applied to each raw frame
        self.crop = crop
        # Channel ordering: 'pytorch' puts channels first, 'tensorflow' puts them last
        self.dim_order = dim_order
        # Whether to keep color information in the pre-processed frames
        self.color = color
        # Number of consecutive frames to stack, giving the agent temporal context
        self.frame_stack = n_frames
        # Total channel count: 3 per frame in color, 1 per frame in grayscale (here: 4 grayscale, 12 color)
        n_channels = 3 * n_frames if color else n_frames
        # Observation shape depends on the chosen channel ordering
        obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
        # Observations are float images with pixel values in [0, 1]
        self.observation_space = Box(0.0, 1.0, obs_shape)
        # Frame buffer holding the stacked, pre-processed frames
        self.frames = np.zeros(obs_shape, dtype = np.float32)

    def reset(self, **kwargs):
        # Clear the frame buffer of observations from the previous episode
        self.frames = np.zeros_like(self.frames)
        # Restart the underlying environment and grab the initial observation and info
        obs, info = self.env.reset(**kwargs)
        # Pre-process the initial observation and store it in the buffer
        self.update_buffer(obs)
        # Return the pre-processed initial observation and the environment info
        return self.frames, info

    def observation(self, img):
        # Crop, then resize to the target dimensions (width and height are both 42 here,
        # so the (width, height) order expected by cv2.resize does not matter)
        img = self.crop(img)
        img = cv2.resize(img, self.img_size)
        if not self.color:
            # Atari frames come in RGB; collapse the three channels to grayscale
            if len(img.shape) == 3 and img.shape[2] == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        # Convert to float32 and normalize pixel values to [0, 1]
        img = img.astype('float32') / 255.
        # Shift the buffer so the oldest frame drops out: by 3 channels in color, by 1 in grayscale
        if self.color:
            self.frames = np.roll(self.frames, shift = -3, axis = 0)
        else:
            self.frames = np.roll(self.frames, shift = -1, axis = 0)
        # Write the new frame into the freed slot(s) at the end of the buffer
        if self.color:
            self.frames[-3:] = img
        else:
            self.frames[-1] = img
        return self.frames

    def update_buffer(self, obs):
        self.frames = self.observation(obs)
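# Toy illustration (assumed 2x2 stand-in "frames") of the np.roll buffering
# used in observation() above: the oldest frame falls off the front of the
# stack and the newest frame is written into the last slot.
_toy_buffer = np.zeros((4, 2, 2), dtype = np.float32)
for _t in range(1, 6):
    _toy_buffer = np.roll(_toy_buffer, shift = -1, axis = 0)
    _toy_buffer[-1] = _t  # stand-in for a freshly pre-processed frame
print(_toy_buffer[:, 0, 0])  # [2. 3. 4. 5.] - the four most recent frames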
# Builds a fresh, pre-processed Kung Fu environment (not part of PreprocessAtari)
def make_env():
    env = gym.make("KungFuMasterDeterministic-v0", render_mode = 'rgb_array')
    env = PreprocessAtari(env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
    return env

env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("Observation shape:", state_shape)
print("Number actions:", number_actions)
# unwrapped reaches through all wrappers to the base ALE environment
print("Action names:", env.unwrapped.get_action_meanings())
"""### Initializing the hyperparameters"""
learning_rate = 1e-4
discount_factor = 0.99
number_environments = 10
"""### Implementing the A3C class"""
class Agent():

    def __init__(self, action_size):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size
        self.network = Network(action_size).to(self.device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr = learning_rate)

    def act(self, state):
        # state is a stack of 4 grayscale frames, shape (4, 42, 42): one dimension
        # indexes the frame within the stack, the other two are the image size.
        # A single observation (ndim == 3) is wrapped into a batch of size 1.
        if state.ndim == 3:
            state = [state]
        state = torch.tensor(np.array(state), dtype = torch.float32, device = self.device)
        action_values, _ = self.network(state)
        # Softmax turns the actor head's raw action values into a policy,
        # i.e. a probability distribution over actions
        policy = F.softmax(action_values, dim = -1)
        # np.random.choice samples one action per environment from its policy;
        # detach() removes the tensor from the computational graph (no gradients)
        return np.array([np.random.choice(len(p), p = p) for p in policy.detach().cpu().numpy()])

    # All of the parameters arrive as batches (NumPy arrays), one entry per environment
    def step(self, state, action, reward, next_state, done):
        # The batch size is read off the first dimension of state; all inputs share it
        batch_size = state.shape[0]
        # Convert the NumPy batches to PyTorch tensors on the training device
        state = torch.tensor(state, dtype = torch.float32, device = self.device)
        reward = torch.tensor(reward, dtype = torch.float32, device = self.device)
        next_state = torch.tensor(next_state, dtype = torch.float32, device = self.device)
        done = torch.tensor(done, dtype = torch.bool, device = self.device).to(dtype = torch.float32)
        action_values, state_value = self.network(state)
        _, next_state_value = self.network(next_state)
        # Actor-critic in one network: the actor head chooses actions, the critic head
        # estimates state values. The Bellman target below is a better, bootstrapped
        # estimate of the state's long-term value: the immediate reward plus the
        # discounted value of the next state (zeroed out when the episode is done).
        target_state_value = reward + discount_factor * next_state_value * (1 - done)
        # The advantage compares the critic's current estimate against that target:
        # positive means the outcome was better than expected, negative means worse
        advantage = target_state_value - state_value
        # Policy (probabilities) and log-probabilities over actions;
        # log_softmax is numerically stabler than taking log(softmax) during backpropagation
        probs = F.softmax(action_values, dim = -1)
        logprobs = F.log_softmax(action_values, dim = -1)
        # Entropy measures how spread out the policy is. High entropy (many plausible
        # actions) encourages exploration; low entropy (one dominant action) means a
        # near-deterministic policy that may exploit known actions but miss better ones.
        entropy = -torch.sum(probs * logprobs, axis = -1)
        batch_idx = np.arange(batch_size)
        # Pick out, for each element of the batch, the log probability of the action
        # that was actually taken: batch_idx selects the row, action selects the column
        logp_actions = logprobs[batch_idx, action]
        # Actor loss: raise the log probability of actions with positive advantage, minus a
        # small entropy bonus (0.001) that keeps the policy from collapsing too early.
        # detach() on the advantage keeps actor gradients out of the critic.
        actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()
        # Critic loss: mean squared error between the Bellman target and the critic's
        # estimate; detach() stops gradients from flowing through the target
        critic_loss = F.mse_loss(target_state_value.detach(), state_value)
        total_loss = actor_loss + critic_loss
        # Zero out gradients accumulated in the previous iteration, backpropagate, update weights
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
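# Worked numbers (illustrative, with assumed values) for the quantities in
# step(): for a 2-action policy with probabilities [0.9, 0.1], the entropy is
# -(0.9 * log 0.9 + 0.1 * log 0.1) ~= 0.325, and taking action 0 with an
# advantage of +1 contributes an actor loss of -log 0.9 ~= 0.105 before the
# entropy bonus is subtracted.
_toy_probs = torch.tensor([0.9, 0.1])
_toy_entropy = -torch.sum(_toy_probs * torch.log(_toy_probs))
print(round(_toy_entropy.item(), 3), round(-math.log(0.9), 3))  # 0.325 0.105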
"""### Initializing the A3C agent"""
agent = Agent(number_actions)
"""### Evaluating our A3C agent on a single episode"""
def evaluate(agent, env, n_episodes = 1):
    episodes_rewards = []
    for _ in range(n_episodes):
        # Initialize the state
        state, _ = env.reset()
        total_reward = 0
        while True:
            action = agent.act(state)
            # Gymnasium's step returns (obs, reward, terminated, truncated, info)
            state, reward, done, truncated, _ = env.step(action[0])
            total_reward += reward
            if done or truncated:
                break
        episodes_rewards.append(total_reward)
    return episodes_rewards
"""### Testing multiple agents on multiple environments at the same time"""
class EnvBatch:

    def __init__(self, n_envs = 10):
        self.envs = [make_env() for _ in range(n_envs)]

    def reset(self):
        _states = []
        # Collect the initial state of each of the multiple environments
        for env in self.envs:
            _states.append(env.reset()[0])
        return np.array(_states)

    def step(self, actions):
        # Step every environment with its own action. Each env.step(a) returns a 5-tuple
        # (next_state, reward, terminated, truncated, info); zip(*...) transposes the list
        # of per-environment tuples into per-field sequences, and map(np.array, ...) stacks
        # each field into one NumPy array across environments.
        next_states, rewards, dones, truncateds, infos = map(np.array, zip(*[env.step(a) for env, a in zip(self.envs, actions)]))
        # Treat truncation (time limit) like termination for the purpose of resetting
        dones = dones | truncateds
        # Restart any environment whose episode just ended so the batch keeps running
        for i in range(len(self.envs)):
            if dones[i]:
                next_states[i] = self.envs[i].reset()[0]
        return next_states, rewards, dones, infos
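# Tiny illustration (made-up data) of the zip(*...) transpose used in
# EnvBatch.step(): a list of per-environment 5-tuples becomes five per-field
# arrays, one entry per environment.
_fake_results = [("s1", 1.0, False, False, {}), ("s2", 0.5, True, False, {})]
_o, _r, _term, _trunc, _inf = map(np.array, zip(*_fake_results))
print(_r, _term)  # [1.  0.5] [False  True]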
"""### Training the A3C agent"""
import tqdm

env_batch = EnvBatch(number_environments)
# Reset every environment and collect the batch of initial states
batch_states = env_batch.reset()

with tqdm.trange(0, 3001) as progress_bar:
    for i in progress_bar:
        # One action per environment, sampled from the current policy
        batch_actions = agent.act(batch_states)
        batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)
        # Scale down the Kung Fu rewards (the raw game scores are large) so the Bellman
        # targets, and hence the critic loss and gradients, stay small and training stays stable
        batch_rewards *= 0.01
        # One training step over the whole batch of transitions
        agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)
        batch_states = batch_next_states
        if i % 1000 == 0:
            print("Average agent reward: ", np.mean(evaluate(agent, env, n_episodes = 10)))
"""## Part 3 - Visualizing the results"""
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        # Gymnasium's step returns (obs, reward, terminated, truncated, info)
        state, reward, terminated, truncated, _ = env.step(action[0])
        done = terminated or truncated
    env.close()
    imageio.mimsave('video.mp4', frames, fps = 30)

show_video_of_model(agent, env)

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
            loop controls style="height: 400px;">
            <source src="data:video/mp4;base64,{0}" type="video/mp4" />
        </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()