-
Notifications
You must be signed in to change notification settings - Fork 0
/
EnvWrappers.py
45 lines (39 loc) · 1.63 KB
/
EnvWrappers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from re import match
import gym
from gym.vector.vector_env import VectorEnv
import numpy as np
class MaskVelocityWrapper(gym.ObservationWrapper):
    """
    Gym environment observation wrapper used to mask velocity terms in
    observations. The intention is to make the MDP partially observable.

    Masking is done by element-wise multiplication with a fixed 0/1 vector
    chosen from the wrapped environment's spec id, so masked components are
    reported as zero while the observation keeps its original length.
    """

    # Multiplicative mask per supported environment id: entries set to 0. are
    # the observation components this wrapper hides, entries set to 1. pass
    # through unchanged. Environments sharing an observation layout share a
    # mask.
    _CARTPOLE_MASK = (1., 0., 1., 0.)
    _PENDULUM_MASK = (1., 1., 0.)
    _LUNAR_LANDER_MASK = (1., 1., 0., 0., 1., 0., 1., 1.)
    _VELOCITY_MASKS = {
        "CartPole-v0": _CARTPOLE_MASK,
        "CartPole-v1": _CARTPOLE_MASK,
        "Pendulum-v0": _PENDULUM_MASK,
        "LunarLander-v2": _LUNAR_LANDER_MASK,
        "LunarLanderContinuous-v2": _LUNAR_LANDER_MASK,
    }

    def __init__(self, env: gym.Env):
        """
        Select the mask matching the wrapped environment.

        Args:
            env: Environment to wrap. For a ``VectorEnv`` the id of its first
                sub-environment is used to pick the mask.

        Raises:
            NotImplementedError: If no mask is defined for the environment id.
        """
        super().__init__(env)

        # A vectorized env exposes its sub-environments through ``envs``; the
        # spec id is read from the first one.
        if isinstance(env, VectorEnv):
            env_name: str = env.envs[0].unwrapped.spec.id
        else:
            env_name = env.unwrapped.spec.id

        mask = self._VELOCITY_MASKS.get(env_name)
        if mask is None:
            supported = ", ".join(sorted(self._VELOCITY_MASKS))
            raise NotImplementedError(
                f"No velocity mask defined for environment '{env_name}'. "
                f"Supported environments: {supported}"
            )
        self.mask = np.array(mask)

    def observation(self, observation):
        """Return ``observation`` with the masked components zeroed out."""
        return observation * self.mask
class PerturbationWrapper(gym.ObservationWrapper):
    """
    Gym environment observation wrapper used to perturb observations with
    additive Gaussian noise.

    Every observation returned by the wrapped environment has independent
    standard-normal noise, scaled by ``sigma``, added to each of its elements.
    """

    def __init__(self, env: gym.Env, sigma: float):
        """
        Store the noise scale.

        Args:
            env: Environment whose observations are perturbed.
            sigma: Factor multiplying the standard-normal samples, i.e. the
                standard deviation of the added noise.
        """
        super().__init__(env)
        self.sigma = sigma

    def observation(self, observation):
        """
        Return ``observation`` plus freshly sampled noise of the same shape.

        The noise is drawn via ``np.random.randn``, i.e. from NumPy's global
        random state.
        """
        noise = np.random.randn(*observation.shape) * self.sigma
        return observation + noise