Utils.py
import copy

import numpy as np
import torch
import yaml
from types import SimpleNamespace as SN

from Modules.Pytoch.Discrete import Actor as torchActor, Critic as torchCritic, DQN as torchDQN
from Modules.Tensorflow.discrete import DQN as tensorflowDQN, Actor as tensorflowActor, Critic as tensorflowCritic


def get_state_by_visual(data, framework):
    # For PyTorch, scale pixel values (assumed to be in [0, 1]) to uint8 and move the
    # channel axis first (NHWC -> NCHW); TensorFlow keeps the channel-last layout.
    if framework == 'torch':
        data = np.uint8(255 * np.array(data))
        return np.transpose(data, (0, 3, 1, 2))
    else:
        return data
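# Illustrative shape example (hypothetical sizes): a batch of visual observations shaped
# (32, 84, 84, 4) in NHWC order comes back as (32, 4, 84, 84) on the 'torch' path, and
# unchanged on the TensorFlow path.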


def advantage_td_target(reward, v_value, next_v_value, done, GAMMA, framework, device):
    # Rescale the reward to roughly [-1, 1]; this assumes raw rewards lie in about [-16, 0].
    reward = (reward + 8) / 8
    if framework == 'torch':
        reward = torch.FloatTensor(reward).to(device)
    # One-step TD target: q = r + gamma * V(s') * (1 - done); advantage = q - V(s).
    q_val = reward + GAMMA * next_v_value * (1 - done)
    advantage = q_val - v_value
    return advantage, q_val
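# Illustrative example with hypothetical numbers: for a single transition with reward=0.0,
# v_value=1.0, next_v_value=1.2, done=0 and GAMMA=0.99, the scaled reward is (0.0 + 8) / 8 = 1.0,
# the TD target is 1.0 + 0.99 * 1.2 = 2.188, and the returned advantage is 2.188 - 1.0 = 1.188.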


def get_discrete_actor(args, state_dim, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden):
    # state_dim is None for visual observations; otherwise the observation is a flat vector.
    if state_dim is None:
        if args.framework == 'torch':
            actor = torchActor.visual_obs_actor(args, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden)
        else:
            actor = tensorflowActor.visual_obs_actor(args, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden)
    else:
        # Vector observations currently use the PyTorch actor regardless of args.framework.
        actor = torchActor.vector_obs_actor(args, state_dim, action_dim, ACTOR_LEARNING_RATE, device, hidden)
    return actor


def get_discrete_critic(args, state_dim, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden):
    # state_dim is None for visual observations; otherwise the observation is a flat vector.
    if state_dim is None:
        if args.framework == 'torch':
            critic = torchCritic.visual_obs_critic(args, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden)
        else:
            critic = tensorflowCritic.visual_obs_critic(args, action_dim, ACTOR_LEARNING_RATE, device, env_info, hidden)
    else:
        # Vector observations currently use the PyTorch critic regardless of args.framework.
        critic = torchCritic.vector_obs_critic(args, state_dim, action_dim, ACTOR_LEARNING_RATE, device, hidden)
    return critic


def get_discrete_dqn(args, state_dim, action_dim, LEARNING_RATE, device, env_info, hidden):
    # state_dim is None for visual observations; otherwise the observation is a flat vector.
    if state_dim is None:
        if args.framework == 'torch':
            dqn = torchDQN.visual_obs_dqn(args, action_dim, LEARNING_RATE, device, env_info, hidden)
        else:
            dqn = tensorflowDQN.visual_obs_dqn(args, action_dim, LEARNING_RATE, device, env_info, hidden)
    else:
        # Vector observations currently use the PyTorch DQN regardless of args.framework.
        dqn = torchDQN.vector_obs_dqn(args, state_dim, action_dim, LEARNING_RATE, device, hidden)
    return dqn


def init_target_network(args, model, env_info, hidden):
    # Build a target network that starts with the same weights as the online model.
    if args.framework == 'torch':
        target_model = copy.deepcopy(model)
    else:
        target_model = get_discrete_dqn(args, None, model.action_space, model.learning_rate, model.device, env_info, hidden)
        target_model.set_weights(model.get_weights())
    return target_model


def convertToTensorInput(input, input_size, batsize=1):
    # Reshape a flat observation into a (batch, input_size) float tensor.
    input = np.reshape(input, [batsize, input_size])
    return torch.FloatTensor(input)
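# Illustrative usage (hypothetical values): convertToTensorInput(np.zeros(4), 4) returns a
# torch.FloatTensor of shape (1, 4).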


def get_config(algorithm):
    # Load config/<algorithm>.yaml and expose its keys as attributes via SimpleNamespace.
    config_dir = '{0}/{1}'
    with open(config_dir.format('config', "{}.yaml".format(algorithm)), "r") as f:
        try:
            config = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            assert False, "{}.yaml error: {}".format(algorithm, exc)
    return SN(**config)
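# Illustrative usage (hypothetical file contents): assuming config/dqn.yaml defines keys such
# as "framework", get_config('dqn') returns a SimpleNamespace, so values are read as
# attributes, e.g. args = get_config('dqn'); args.framework.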


def get_device(device_name):
    # Fall back to the CPU when CUDA is not available.
    device = device_name if torch.cuda.is_available() else 'cpu'
    return device


def update_target(mainDQN, targetDQN, framework):
    # Hard-copy the online network's weights into the target network.
    if framework == 'torch':
        targetDQN.load_state_dict(mainDQN.state_dict())
    else:
        targetDQN.set_weights(mainDQN.get_weights())


class OU_noise:
    # Ornstein-Uhlenbeck exploration noise: a temporally correlated random walk around mu.
    def __init__(self, action_size):
        self.action_size = action_size
        self.mu = 0.6
        self.theta = 1e-5
        self.sigma = 1e-2
        # reset() reads action_size and mu, so the attributes are set before calling it.
        self.reset()

    def reset(self):
        self.X = np.ones(self.action_size) * self.mu

    def sample(self):
        # dx = theta * (mu - X) + sigma * N(0, 1); the state drifts toward mu with Gaussian jitter.
        dx = self.theta * (self.mu - self.X) + self.sigma * np.random.randn(len(self.X))
        self.X += dx
        return self.X
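

# Minimal usage sketch (illustrative only; the actual training scripts live elsewhere in this
# repository): draw a few OU noise samples for a hypothetical 2-dimensional action space.
if __name__ == '__main__':
    noise = OU_noise(action_size=2)
    for _ in range(3):
        print(noise.sample())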