Use with Stable Baselines3
The following code defines an environment dynamic called Image, which integrates camera data into a MuJoCo reinforcement learning environment. The class captures camera data, preprocesses it, and generates observations that reinforcement learning agents can consume.
import cv2
import torch as th


class Image:
    def __init__(self, environment):
        """
        Initializes the Image environment dynamic with the given environment.

        Parameters:
        - environment: An instance of the environment class that provides camera data.
        """
        self.environment = environment
        shape = 64 * 64 * 3
        # Bounds for each value of the flattened 64x64 RGB image.
        self.observation_space = {"low": [0 for _ in range(shape)], "high": [257 for _ in range(shape)]}
        self.action_space = {"low": [], "high": []}

    def dynamic(self, agent, actions):
        """
        Generates an observation based on camera data for the given agent and actions.

        Parameters:
        - agent: Name of the agent for which the observation is generated.
        - actions: The set of actions taken by the agent.

        Returns:
        - reward: A placeholder value (0) indicating the reward for the agent's current state.
        - observation: A flattened NumPy array representing the preprocessed camera image.
        - done: Always False, since this dynamic never terminates the episode.
        - info: An empty dictionary for additional information.
        """
        # Capture the raw camera image for this agent and resize it to 64x64 pixels.
        image = self.environment.get_camera_data(agent)
        image = cv2.resize(image[0], (64, 64))
        # Flatten the image into a 1D array and return it as a NumPy observation.
        image = th.from_numpy(image)
        image = th.flatten(image)
        observation = image.cpu().detach().numpy()
        return 0, observation, False, {}
The Image class has an __init__ method that initializes the environment dynamic instance with the required environment parameter. This environment is the source from which camera data will be obtained. The method also defines the observation space and action space for the reinforcement learning agent. In this case, the observation space is a dictionary with "low" and "high" keys, representing the lower and upper bounds of the observation values.
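As a rough illustration of this convention, the "low"/"high" lists could be translated into a standard Gymnasium Box space as in the sketch below. This is only an assumption about how the framework interprets the dictionary; the helper name dict_to_box is hypothetical and the actual conversion happens inside MuJoCo_RL.

import numpy as np
from gymnasium import spaces

def dict_to_box(space_dict):
    # Hypothetical helper: converts the {"low": [...], "high": [...]} convention
    # used by environment dynamics into a gymnasium Box space.
    low = np.array(space_dict["low"], dtype=np.float32)
    high = np.array(space_dict["high"], dtype=np.float32)
    return spaces.Box(low=low, high=high, dtype=np.float32)

# For the Image dynamic this yields a flat Box with 64 * 64 * 3 = 12288 entries.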
The dynamic method is the core of the Image environment dynamic. It takes two parameters: agent, the name of the agent for which the observation is generated, and actions, the set of actions taken by that agent. The method starts by capturing camera data from the environment using the get_camera_data method, specific to the provided agent.
Next, the captured image is preprocessed. It is resized to the desired shape (in this case, 64x64) using the cv2.resize function from the OpenCV library. The resized image is then converted to a PyTorch tensor using th.from_numpy, assuming the PyTorch library is available. The tensor is flattened into a 1D array with th.flatten and finally converted back to a NumPy array via .cpu().detach().numpy().
The method returns a tuple (reward, observation, done, info), where reward is a placeholder value of 0, observation is the flattened NumPy array representing the preprocessed camera image, done is always False because this dynamic never terminates the episode, and info is an empty dictionary.
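To make the shape of the resulting observation concrete, the same preprocessing steps can be run on a dummy camera frame. This is a minimal sketch; the 128x128 input resolution is an arbitrary assumption for illustration, and the real frame comes from environment.get_camera_data(agent).

import cv2
import numpy as np
import torch as th

# Dummy RGB camera frame standing in for the real camera data.
frame = np.random.randint(0, 256, size=(128, 128, 3), dtype=np.uint8)

resized = cv2.resize(frame, (64, 64))           # (64, 64, 3)
flattened = th.flatten(th.from_numpy(resized))  # 64 * 64 * 3 = 12288 values
observation = flattened.cpu().detach().numpy()
print(observation.shape)                        # (12288,)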
The following code consists of two functions, reward and collision_reward, which calculate the reward for a reinforcement learning agent based on its interaction with the environment in the context of a MuJoCo gym.
def reward(mujoco_gym, agent):
    """Returns 1 if the agent touches the target, otherwise 0."""
    target = "target"
    reward = 0
    if mujoco_gym.collision(agent + "_geom0", target + "_geom0"):
        reward = 1
    return reward


def collision_reward(mujoco_gym, agent):
    """Returns -0.5 if the agent touches any of the arena borders, otherwise 0."""
    borders = ["border1", "border2", "border3", "border4"]
    reward = 0
    for border in borders:
        if mujoco_gym.collision(agent + "_geom0", border):
            reward = -0.5
    return reward
The reward function takes two parameters: mujoco_gym, which represents the MuJoCo gym environment, and agent, which represents the agent in the environment. The function begins by defining a target object using the string "target" and initializes the reward to 0. It then checks whether there is a collision between the agent's geometry (identified by agent + "_geom0") and the target's geometry (identified by target + "_geom0"). If a collision occurs, the reward is set to 1, indicating a positive outcome. Finally, the function returns the reward.
The collision_reward function also takes the mujoco_gym and agent parameters. It defines a list of borders with the names "border1", "border2", "border3", and "border4", and initially sets the reward to 0. The function then iterates over each border in the list and checks whether there is a collision between the agent's geometry and the current border's geometry using mujoco_gym.collision. If a collision is detected with any of the borders, the reward is set to -0.5, a penalty for colliding with a border. Finally, the function returns the reward.
In the context of reinforcement learning, these functions define the reward signal for the agent's interactions with the environment. The reward function assigns a positive reward when the agent collides with the target object, potentially indicating a successful task completion. The collision_reward function, on the other hand, assigns a negative reward when the agent collides with any of the specified borders, penalizing undesired behavior such as hitting the boundaries.
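Assuming the framework evaluates every function listed under "rewardFunctions" each step and sums their results (an assumption about MuJoCo_RL's internals, not something shown in this code), the per-step reward would behave roughly like this:

def total_step_reward(mujoco_gym, agent):
    # Hypothetical illustration: sum the outputs of all configured reward functions.
    return reward(mujoco_gym, agent) + collision_reward(mujoco_gym, agent)

# Touching the target and a border in the same step would then yield 1 + (-0.5) = 0.5.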
def done(mujoco_gym, agent):
    """Terminates the episode as soon as the agent collides with any border."""
    borders = ["border1", "border2", "border3", "border4"]
    for border in borders:
        if mujoco_gym.collision(agent + "_geom0", border):
            return True
    return False
The done function takes two parameters: mujoco_gym, which represents the MuJoCo gym environment, and agent, which represents the agent in the environment. It begins by defining a list of borders with the names "border1", "border2", "border3", and "border4".
The function then iterates over each border in the list using a for loop. Inside the loop, it checks whether there is a collision between the agent's geometry (identified by agent + "_geom0") and the current border's geometry using the mujoco_gym.collision function. If a collision is detected between the agent and any of the borders, the function immediately returns True, indicating that the episode is considered done or terminated.
If the loop completes without finding any collision between the agent and the borders, the function returns False, indicating that the episode is not yet finished.
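Additional termination conditions can follow the same signature. For example, a hypothetical success-based condition could look like the sketch below; target_reached_done is not part of the original code and only illustrates the pattern, reusing the "target_geom0" name from the reward function above.

def target_reached_done(mujoco_gym, agent):
    # Hypothetical termination condition: end the episode once the agent reaches the target.
    return bool(mujoco_gym.collision(agent + "_geom0", "target_geom0"))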
def createEnvironment():
    """
    Creates and initializes a MuJoCo_RL environment with a single agent.

    Returns:
    - environment: Initialized MuJoCo_RL environment with a single agent.

    The function creates a MuJoCo_RL environment using the provided configuration dictionary. It sets the
    environment path and defines a single agent named "agent". The configuration dictionary includes various
    parameters such as reward functions, termination conditions, frame skipping, environment dynamics, free
    joint, rendering mode, maximum number of steps, and agent cameras.

    The initialized environment is then wrapped with the GymWrapper to focus on a single agent. Finally, the
    initialized and wrapped environment is returned.
    """
    environment_path = ["pita_reference/mujoco/Example.xml"]
    agents = ["agent"]
    config_dict = {"xmlPath": environment_path, "agents": agents, "rewardFunctions": [reward, collision_reward],
                   "doneFunctions": [done], "skipFrames": 30, "environmentDynamics": [Image], "freeJoint": True,
                   "renderMode": False, "maxSteps": 4096, "agentCameras": True}
    environment = MuJoCoRL(config_dict)
    environment = GymWrapper(environment, agent="agent")
    return environment
First, the environment path and agent names are assigned to variables. These variables are then used to construct the config_dict, a dictionary containing the configuration parameters for the MuJoCo_RL environment. The reward and collision_reward functions are included as the reward functions in the configuration dictionary, and the done function is added as the termination condition.
The code then creates an instance of the MuJoCoRL environment by passing the config_dict to the constructor, which initializes the MuJoCo_RL environment with the specified configuration.
Subsequently, the initialized environment is wrapped with the GymWrapper class, a custom wrapper specific to the MuJoCo_RL environment. The purpose of this wrapper is to adapt the MuJoCo_RL environment to the OpenAI Gym interface, a widely used interface for reinforcement learning environments. The GymWrapper is configured for the agent named "agent". Finally, the initialized and wrapped environment is returned by the function.
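Before starting a long training run, it can be useful to sanity-check the wrapped environment with a few random actions. The sketch below assumes the wrapper exposes the classic Gym reset/step API with four return values; adjust the unpacking if it follows the newer five-value Gymnasium API.

env = createEnvironment()
observation = env.reset()

for _ in range(10):
    action = env.action_space.sample()  # random action for a quick smoke test
    observation, reward, done, info = env.step(action)
    if done:
        observation = env.reset()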
The provided code defines a custom convolutional neural network (CNN) class called CustomCNN that serves as a feature extractor for Stable Baselines3.
import torch as th
from torch import nn
from gymnasium import spaces  # for older Stable Baselines3 versions: from gym import spaces
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor


class CustomCNN(BaseFeaturesExtractor):
    """
    :param observation_space: (gym.Space)
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of units in the last layer.
    """

    def __init__(self, observation_space: spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        # We assume CxHxW images (channels first).
        # Re-ordering will be done by preprocessing or a wrapper.
        self.cnn = nn.Sequential(
            # Reshape the flat observation back into a 3x64x64 image.
            nn.Unflatten(1, (3, 64, 64)),
            nn.Conv2d(3, 16, kernel_size=3, stride=3, padding=0),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=3, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=3, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )
        # Compute the flattened output size by doing one forward pass.
        with th.no_grad():
            n_flatten = self.cnn(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]
        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        result = self.linear(self.cnn(observations))
        return result
The CustomCNN class inherits from the BaseFeaturesExtractor class, a parent class providing common functionality for feature extraction. The class takes two parameters in its constructor: observation_space, which represents the input observation space (gym.Space) of the RL environment, and features_dim, an optional parameter specifying the number of features to be extracted by the network (default value is 256).
Inside the __init__ method, the constructor starts by calling the constructor of the BaseFeaturesExtractor class using the super() function. This ensures that the base class is properly initialized with the provided observation_space and features_dim.
The main architecture of the CNN is defined using the nn.Sequential module from PyTorch. The CNN consists of several layers, including convolutional layers (nn.Conv2d), activation functions (nn.ReLU), and a flattening layer (nn.Flatten). The network is designed to process images with shape CxHxW (channels first), assuming an image size of 64x64. The number of channels is set to 3, representing RGB images.
After defining the CNN layers, a forward pass is performed on a sample observation to determine the output shape of the CNN. This is done using the th.as_tensor function to convert a sample drawn from the environment's observation space into a PyTorch tensor. The tensor is then passed through the CNN layers, and its shape is extracted to obtain the number of flattened features (n_flatten).
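For the 64x64 RGB observations used here, this number can also be worked out by hand with the standard convolution output formula, out = floor((in + 2*padding - kernel) / stride) + 1. The following lines are only a quick check of the automatic computation, not part of the original code:

def conv_out(size, kernel=3, stride=3, padding=0):
    # Spatial output size of one convolution layer.
    return (size + 2 * padding - kernel) // stride + 1

size = conv_out(conv_out(conv_out(64)))  # 64 -> 21 -> 7 -> 2
n_flatten = 64 * size * size             # 64 channels * 2 * 2 = 256
print(n_flatten)                         # 256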
Finally, the output of the CNN layers is passed through a linear layer (nn.Linear) followed by an activation function (nn.ReLU) to produce the final feature representation. The forward method takes an input tensor observations and applies the CNN layers and the linear layer to compute the feature representation, which is returned as the output of the forward method.
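A quick way to verify the extractor in isolation is to instantiate it on a flat observation space of the expected size and pass a dummy batch through it. This is a minimal sketch; the bounds 0 and 255 and the batch size of 4 are assumptions chosen only for illustration.

import numpy as np
import torch as th
from gymnasium import spaces

obs_space = spaces.Box(low=0, high=255, shape=(64 * 64 * 3,), dtype=np.float32)
extractor = CustomCNN(obs_space, features_dim=64)

batch = th.as_tensor(np.stack([obs_space.sample() for _ in range(4)]))
features = extractor(batch)
print(features.shape)  # torch.Size([4, 64])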
With the environment and feature extractor in place, the training script sets the hyperparameters and trains a recurrent PPO agent:

from sb3_contrib import RecurrentPPO

env = createEnvironment()

# Hyperparameters for training.
learning_rate = 1e-5
network_size = [128, 128]
features_dim = 64
batch_size = 64
timesteps = 150000

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
    features_extractor_kwargs=dict(features_dim=features_dim),
    net_arch=dict(pi=network_size, vf=network_size),
    lstm_hidden_size=64,
)

model = RecurrentPPO("CnnLstmPolicy", env, policy_kwargs=policy_kwargs, verbose=1, batch_size=batch_size,
                     device="cuda", learning_rate=learning_rate)
model.learn(timesteps, progress_bar=True)
model.save("model")
Developed by Microcosm.AI