import logging
from datetime import datetime
import numpy as np
import tensorflow as tf
from utils.replay_buffer import ReplayBuffer
from utils.action_noise import OUActionNoise
from network.actor import Actor
from network.critic import Critic


class Agent(object):
    """
    DDPG agent
    """
    def __init__(self, state_dims, action_dims, action_boundaries,
                 actor_lr=1e-5, critic_lr=1e-4, batch_size=64, gamma=0.99,
                 rand_steps=1, buf_size=10000, tau=1e-3, fcl1_size=400, fcl2_size=600):
        # state size
        self.n_states = state_dims[0]
        # action size
        self.n_actions = action_dims[0]
        self.batch_size = batch_size
        # experience replay buffer
        self._memory = ReplayBuffer(buf_size, state_dims, action_dims)
        # noise generator
        self._noise = OUActionNoise(mu=np.zeros(action_dims))
        # Bellman discount factor
        self.gamma = gamma
        # environment action boundaries
        self.lower_bound = action_boundaries[0]
        self.upper_bound = action_boundaries[1]
        # number of episodes of random action exploration
        self.rand_steps = rand_steps - 1
        # turn off most logging
        logging.getLogger("tensorflow").setLevel(logging.FATAL)
        # date = datetime.now().strftime("%m%d%Y_%H%M%S")
        # path_actor = "./models/actor/actor" + date + ".h5"
        # path_critic = "./models/critic/critic" + date + ".h5"
        # actor network
        self.actor = Actor(state_dims=state_dims, action_dims=action_dims,
                           lr=actor_lr, batch_size=batch_size, tau=tau,
                           upper_bound=self.upper_bound,
                           fcl1_size=fcl1_size, fcl2_size=fcl2_size)
        # critic network
        self.critic = Critic(state_dims=state_dims, action_dims=action_dims,
                             lr=critic_lr, batch_size=batch_size, tau=tau,
                             fcl1_size=fcl1_size, fcl2_size=fcl2_size)

    def get_action(self, state, step):
        """
        Return the best action in the passed state, according to the model
        in training. Noise is added for exploration.
        """
        # take only random actions for the first rand_steps episodes
        if step > self.rand_steps:
            noise = self._noise()
            # reshape the state to a (1, n_states) row batch for the network
            state = state.reshape(self.n_states, 1).T
            action = self.actor.model.predict(state)[0]
            action_p = action + noise
        else:
            # explore the action space quickly with uniform random actions
            action_p = np.random.uniform(self.lower_bound, self.upper_bound, self.n_actions)
        # clip the resulting action to the environment bounds
        action_p = np.clip(action_p, self.lower_bound, self.upper_bound)
        return action_p

    def learn(self):
        """
        Fill the buffer up to the batch size, then train both networks with
        experience from the replay buffer.
        """
        if self._memory.isReady(self.batch_size):
            self.train_helper()

    """
    Train helper methods:
    train_helper
    train_critic
    train_actor
    get_q_targets   Q values to train the critic
    get_gradients   policy gradients to train the actor
    """
    def train_helper(self):
        # sample an experience batch from the replay buffer
        states, actions, rewards, terminal, states_n = self._memory.sample(self.batch_size)
        states = tf.convert_to_tensor(states)
        actions = tf.convert_to_tensor(actions)
        rewards = tf.convert_to_tensor(rewards)
        rewards = tf.cast(rewards, dtype=tf.float32)
        states_n = tf.convert_to_tensor(states_n)
        # train the critic before the actor
        self.train_critic(states, actions, rewards, terminal, states_n)
        self.train_actor(states)
        # update the target models
        self.critic.update_target()
        self.actor.update_target()
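
    # Illustrative only, not called anywhere: a minimal sketch of the Polyak
    # soft update that Actor.update_target / Critic.update_target are assumed
    # to perform with the tau passed to __init__, i.e.
    # theta_target <- tau * theta + (1 - tau) * theta_target.
    @staticmethod
    def _soft_update_sketch(model, target_model, tau):
        weights = [tau * w + (1.0 - tau) * w_t
                   for w, w_t in zip(model.get_weights(), target_model.get_weights())]
        target_model.set_weights(weights)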

    def train_critic(self, states, actions, rewards, terminal, states_n):
        """
        Use updated Q targets to train the critic network
        """
        # TODO cleaner code, ugly passing of the actor target model
        self.critic.train(states, actions, rewards, terminal, states_n, self.actor.target_model, self.gamma)
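
    # Illustrative only, not called anywhere: a minimal sketch of the Q targets
    # that Critic.train is assumed to regress on, following the standard DDPG
    # update y = r + gamma * Q'(s', mu'(s')) * (1 - terminal). The target
    # networks' call signatures here are assumptions about network/actor.py
    # and network/critic.py, not code from this repo.
    def _q_targets_sketch(self, rewards, terminal, states_n, actor_target, critic_target):
        target_actions = actor_target(states_n, training=False)
        target_q = critic_target([states_n, target_actions], training=False)
        # zero the bootstrap term on terminal transitions
        done = tf.cast(terminal, dtype=tf.float32)
        return rewards + self.gamma * tf.squeeze(target_q) * (1.0 - done)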

    def train_actor(self, states):
        """
        Train the actor network with the critic evaluation
        """
        # TODO cleaner code, ugly passing of the critic model
        self.actor.train(states, self.critic.model)

    def remember(self, state, state_new, action, reward, terminal):
        """
        Replay buffer interface to the outside.
        """
        self._memory.remember(state, state_new, action, reward, terminal)
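

# A hypothetical usage sketch, not part of the original file: drives the Agent
# on a Gym continuous-control task. Assumes the classic `gym` API, where
# env.reset() returns an observation and env.step() returns a 4-tuple; the
# env id "Pendulum-v1" and the episode count are placeholders.
if __name__ == "__main__":
    import gym

    env = gym.make("Pendulum-v1")
    bounds = [env.action_space.low, env.action_space.high]
    agent = Agent(state_dims=env.observation_space.shape,
                  action_dims=env.action_space.shape,
                  action_boundaries=bounds)
    for episode in range(10):
        state = env.reset()
        done = False
        while not done:
            action = agent.get_action(state, episode)
            state_new, reward, done, _ = env.step(action)
            agent.remember(state, state_new, action, reward, done)
            agent.learn()
            state = state_new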