diff --git a/docs/source/_static/setup/walkthrough_gear_assembly_sim_real.gif b/docs/source/_static/setup/walkthrough_gear_assembly_sim_real.gif new file mode 100644 index 00000000000..6f669dc71ce --- /dev/null +++ b/docs/source/_static/setup/walkthrough_gear_assembly_sim_real.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4c7f09fd5a02123b705ed2ed9e2ad129f5a64d9c410d9d47c2bf69fb6cb61caa +size 2748857 diff --git a/docs/source/_static/setup/walkthrough_sim_real_gear_assembly_train_4.png b/docs/source/_static/setup/walkthrough_sim_real_gear_assembly_train_4.png new file mode 100644 index 00000000000..9cc11225ed6 Binary files /dev/null and b/docs/source/_static/setup/walkthrough_sim_real_gear_assembly_train_4.png differ diff --git a/docs/source/refs/troubleshooting.rst b/docs/source/refs/troubleshooting.rst index 8f3a82f3f15..18a88da7c69 100644 --- a/docs/source/refs/troubleshooting.rst +++ b/docs/source/refs/troubleshooting.rst @@ -75,8 +75,8 @@ For instance, to run a standalone script with verbose logging, you can use the f # Run the standalone script with info logging ./isaaclab.sh -p scripts/tutorials/00_sim/create_empty.py --headless --info -For more fine-grained control, you can modify the logging channels through the ``logger`` module. -For more information, please refer to its `documentation `__. +For more fine-grained control, you can modify the logging channels through the ``omni.log`` module. +For more information, please refer to its `documentation `__. Observing long load times at the start of the simulation diff --git a/docs/source/setup/walkthrough/index.rst b/docs/source/setup/walkthrough/index.rst index 2ba22662558..169a56d988d 100644 --- a/docs/source/setup/walkthrough/index.rst +++ b/docs/source/setup/walkthrough/index.rst @@ -22,3 +22,4 @@ represents a different stage of modifying the default template project to achiev technical_env_design training_jetbot_gt training_jetbot_reward_exploration + sim_to_real_training diff --git a/docs/source/setup/walkthrough/sim_to_real_training.rst b/docs/source/setup/walkthrough/sim_to_real_training.rst new file mode 100644 index 00000000000..4b7f018c01d --- /dev/null +++ b/docs/source/setup/walkthrough/sim_to_real_training.rst @@ -0,0 +1,495 @@ +.. _walkthrough_sim_to_real: + +Training for Sim-to-Real Transfer +================================== + +This tutorial walks you through how to train a gear insertion assembly reinforcement learning (RL) policy that transfers from simulation to a real robot. The workflow consists of two main stages: + +1. **Simulation Training in Isaac Lab**: Train the policy in a high-fidelity physics simulation with domain randomization +2. **Real Robot Deployment with Isaac ROS**: Deploy the trained policy on real hardware using Isaac ROS and a custom ROS inference node + +This walkthrough covers the key principles and best practices for sim-to-real transfer using Isaac Lab, illustrated with a real-world example: + +- the Gear Assembly task for the UR10e robot with the Robotiq 2F-140 gripper or 2F-85 gripper + +**Task Details:** + +The gear assembly policy operates as follows: + +1. **Initial State**: The policy assumes the gear is already grasped by the gripper at the start of the episode +2. **Input Observations**: The policy receives the pose of the gear shaft (position and orientation) in which the gear should be inserted, obtained from a separate perception pipeline +3. 
**Policy Output**: The policy outputs delta joint positions (incremental changes to joint angles) to control the robot arm and perform the insertion
+4. **Generalization**: The trained policy generalizes across 3 different gear sizes without requiring retraining for each size
+
+
+.. figure:: ../../_static/setup/walkthrough_gear_assembly_sim_real.gif
+    :align: center
+    :figwidth: 100%
+    :alt: Comparison of gear assembly in simulation versus real hardware
+
+    Sim-to-real transfer: Gear assembly policy trained in Isaac Lab (left) successfully deployed on real UR10e robot (right).
+
+This environment has been successfully deployed on real UR10e robots without an Isaac Lab dependency.
+
+**Scope of This Tutorial:**
+
+This tutorial focuses exclusively on the **training part** of the sim-to-real transfer workflow in Isaac Lab. For the complete deployment workflow on the real robot, including the exact steps to set up the vision pipeline, the robot interface, and the ROS inference node to run your trained policy on real hardware, please refer to the `Isaac ROS Documentation `_.
+
+Overview
+--------
+
+Successful sim-to-real transfer requires addressing three fundamental aspects:
+
+1. **Input Consistency**: Ensuring the observations your policy receives in simulation match those available on the real robot
+2. **System Response Consistency**: Ensuring the robot and environment respond to actions in simulation the same way they do in reality
+3. **Output Consistency**: Ensuring any post-processing applied to policy outputs in Isaac Lab is also applied during real-world inference
+
+When all three aspects are properly addressed, policies trained purely in simulation can achieve robust performance on real hardware without any real-world training data.
+
+**Debugging Tip**: When your policy fails on the real robot, the best way to debug is to set up the real robot with the same initial observations as in simulation and then compare how the controller and system respond. This isolates whether the problem stems from an observation mismatch (Input Consistency) or a physics/controller mismatch (System Response Consistency).
+
+Part 1: Input Consistency
+--------------------------
+
+The observations your policy receives must be consistent between simulation and reality. This means:
+
+1. The observation space should only include information available from real sensors
+2. Sensor noise and delays should be modeled appropriately
+
+Using Real-Robot-Available Observations
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Your simulation environment should only use observations that are available on the real robot and not use "privileged" information that wouldn't be available in deployment.
+
+
+Observation Specification: Isaac-Deploy-GearAssembly-UR10e-2F140-v0
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The Gear Assembly environment uses both proprioceptive and exteroceptive (vision) observations:
+
+.. 
list-table:: Gear Assembly Environment Observations + :widths: 25 25 25 25 + :header-rows: 1 + + * - Observation + - Dimension + - Real-World Source + - Noise in Training + * - ``joint_pos`` + - 6 (UR10e arm joints) + - UR10e controller + - None (proprioceptive) + * - ``joint_vel`` + - 6 (UR10e arm joints) + - UR10e controller + - None (proprioceptive) + * - ``gear_shaft_pos`` + - 3 (x, y, z position) + - FoundationPose + RealSense depth + - ±0.005 m (5mm, estimated error from FoundationPose + RealSense depth pipeline) + * - ``gear_shaft_quat`` + - 4 (quaternion orientation) + - FoundationPose + RealSense depth + - ±0.01 per component (~5° angular error, estimated error from FoundationPose + RealSense depth pipeline) + +**Implementation:** + +.. code-block:: python + + from isaaclab.utils.noise import AdditiveUniformNoiseCfg as Unoise + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group.""" + + # Robot joint states - NO noise for proprioceptive observations + joint_pos = ObsTerm( + func=mdp.joint_pos, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=["shoulder_pan_joint", ...])}, + ) + + joint_vel = ObsTerm( + func=mdp.joint_vel, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=["shoulder_pan_joint", ...])}, + ) + + # Gear shaft pose from FoundationPose perception + # ADD noise for exteroceptive (vision-based) observations + # Calibrated to match FoundationPose + RealSense D435 error + # Typical error: 3-8mm position, 3-7° orientation + gear_shaft_pos = ObsTerm( + func=mdp.gear_shaft_pos_w, + params={"asset_cfg": SceneEntityCfg("factory_gear_base")}, + noise=Unoise(n_min=-0.005, n_max=0.005), # ±5mm + ) + + # Quaternion noise: small uniform noise on each component + # Results in ~5° orientation error + gear_shaft_quat = ObsTerm( + func=mdp.gear_shaft_quat_w, + params={"asset_cfg": SceneEntityCfg("factory_gear_base")}, + noise=Unoise(n_min=-0.01, n_max=0.01), + ) + + def __post_init__(self): + self.enable_corruption = True # Enable for perception observations only + self.concatenate_terms = True + +**Why No Noise for Proprioceptive Observations?** + +Empirically, we found that policies trained without noise on proprioceptive observations (joint positions and velocities) transfer well to real hardware. The UR10e controller provides sufficiently accurate joint state feedback that modeling sensor noise doesn't improve sim-to-real transfer for these tasks. + + +Part 2: System Response Consistency +------------------------------------ + +Once your observations are consistent, you need to ensure the simulated robot and environment respond to actions the same way the real system does. In this use case, this involves three main aspects: + +1. Physics simulation parameters (friction, contact properties) +2. Actuator modeling (PD controller gains, effort limits) +3. Domain randomization + +Physics Parameter Tuning +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Accurate physics simulation is critical for contact-rich tasks. Key parameters include: + +- Friction coefficients (static and dynamic) +- Contact solver parameters +- Material properties +- Rigid body properties + +**Example: Gear Assembly Physics Configuration** + +The Gear Assembly task requires accurate contact modeling for insertion. Here's how friction is configured: + +.. 
code-block:: python + + # From joint_pos_env_cfg.py in Isaac-Deploy-GearAssembly-UR10e-2F140-v0 + + @configclass + class EventCfg: + """Configuration for events including physics randomization.""" + + # Randomize friction for gear objects + small_gear_physics_material = EventTerm( + func=mdp.randomize_rigid_body_material, + mode="startup", + params={ + "asset_cfg": SceneEntityCfg("factory_gear_small", body_names=".*"), + "static_friction_range": (0.75, 0.75), # Calibrated to real gear material + "dynamic_friction_range": (0.75, 0.75), + "restitution_range": (0.0, 0.0), # No bounce + "num_buckets": 16, + }, + ) + + # Similar configuration for gripper fingers + robot_physics_material = EventTerm( + func=mdp.randomize_rigid_body_material, + mode="startup", + params={ + "asset_cfg": SceneEntityCfg("robot", body_names=".*finger"), + "static_friction_range": (0.75, 0.75), # Calibrated to real gripper + "dynamic_friction_range": (0.75, 0.75), + "restitution_range": (0.0, 0.0), + "num_buckets": 16, + }, + ) + +These friction values (0.75) were determined through iterative visual comparison: + +1. Record videos of the gear being grasped and manipulated on real hardware +2. Start training in simulation and observe the live simulation viewer +3. Look for physics issues (penetration, unrealistic slipping, poor contact) +4. Adjust friction coefficients and solver parameters and retry +5. Compare the gear's behavior in the gripper visually between sim and real +6. Repeat adjustments until behavior matches (no need to wait for full policy training) +7. Once physics looks good, train in headless mode with video recording: + + .. code-block:: bash + + python scripts/reinforcement_learning/train.py \ + --task Isaac-Deploy-GearAssembly-UR10e-2F140-v0 \ + --headless \ + --video --video_length 800 --video_interval 5000 + +8. Review the recorded videos and compare with real hardware videos to verify physics behavior + +**Contact Solver Configuration** + +Contact-rich manipulation requires careful solver tuning. These parameters were calibrated through the same iterative visual comparison process as the friction coefficients: + +.. code-block:: python + + # Robot rigid body properties + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=True, # Robot is mounted, no gravity + max_depenetration_velocity=5.0, # Control interpenetration resolution + linear_damping=0.0, # No artificial damping + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, # Important for accurate dynamics + solver_position_iteration_count=4, # Balance accuracy vs performance + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, # Allow large contact forces + ), + +**Important**: The ``solver_position_iteration_count`` is a critical parameter for contact-rich tasks. Increasing this value improves collision simulation stability and reduces penetration issues, but it also increases simulation and training time. For the gear assembly task, we use ``solver_position_iteration_count=4`` as a balance between physics accuracy and computational performance. If you observe penetration or unstable contacts, try increasing to 8 or 16, but expect slower training. + +.. 
code-block:: python + + # Articulation properties + articulation_props=sim_utils.ArticulationRootPropertiesCfg( + enabled_self_collisions=False, + solver_position_iteration_count=4, + solver_velocity_iteration_count=1, + ), + + # Contact properties + collision_props=sim_utils.CollisionPropertiesCfg( + contact_offset=0.005, # 5mm contact detection distance + rest_offset=0.0, # Objects touch at 0 distance + ), + +Actuator Modeling +~~~~~~~~~~~~~~~~~ + +Accurate actuator modeling ensures the simulated robot moves like the real one. This includes: + +- PD controller gains (stiffness and damping) +- Effort and velocity limits +- Joint friction + +**Controller Choice: Impedance Control** + +For the UR10e deployment, we use an impedance controller interface. Using a simpler controller like impedance control reduces the chances of variation between simulation and reality compared to more complex controllers (e.g., operational space control, hybrid force-position control). Simpler controllers: + +- Have fewer parameters that can mismatch between sim and real +- Are easier to model accurately in simulation +- Have more predictable behavior that's easier to replicate +- Reduce the controller complexity as a source of sim-real gap + +**Example: UR10e Actuator Configuration** + +.. code-block:: python + + # Default UR10e actuator configuration + actuators = { + "arm": ImplicitActuatorCfg( + joint_names_expr=["shoulder_pan_joint", "shoulder_lift_joint", + "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint"], + effort_limit=87.0, # From UR10e specifications + velocity_limit=2.0, # From UR10e specifications + stiffness=800.0, # Calibrated to match real behavior + damping=40.0, # Calibrated to match real behavior + ), + } + +**Domain Randomization of Actuator Parameters** + +To account for variations in real robot behavior, randomize actuator gains during training: + +.. code-block:: python + + # From EventCfg in the Gear Assembly environment + robot_joint_stiffness_and_damping = EventTerm( + func=mdp.randomize_actuator_gains, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=["shoulder_.*", "elbow_.*", "wrist_.*"]), + "stiffness_distribution_params": (0.75, 1.5), # 75% to 150% of nominal + "damping_distribution_params": (0.3, 3.0), # 30% to 300% of nominal + "operation": "scale", + "distribution": "log_uniform", + }, + ) + + +**Joint Friction Randomization** + +Real robots have friction in their joints that varies with position, velocity, and temperature. For the UR10e with impedance controller interface, we observed significant stiction (static friction) causing the controller to not reach target joint positions. + +**Characterizing Real Robot Behavior:** + +To quantify this behavior, we plotted the step response of the impedance controller on the real robot and observed contact offsets of approximately 0.25 degrees from the commanded setpoint. This steady-state error is caused by joint friction opposing the controller's commanded motion. Based on these measurements, we added joint friction modeling in simulation to replicate this behavior: + +.. 
code-block:: python + + joint_friction = EventTerm( + func=mdp.randomize_joint_parameters, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=["shoulder_.*", "elbow_.*", "wrist_.*"]), + "friction_distribution_params": (0.3, 0.7), # Add 0.3 to 0.7 Nm friction + "operation": "add", + "distribution": "uniform", + }, + ) + +**Why Joint Friction Matters**: Without modeling joint friction in simulation, the policy learns to expect that commanded joint positions are always reached. On the real robot, stiction prevents small movements and causes steady-state errors. By adding friction during training, the policy learns to account for these effects and commands appropriately larger motions to overcome friction. + +**Compensating for Stiction with Action Scaling:** + +To help the policy overcome stiction on the real robot, we also increased the output action scaling. The Isaac ROS documentation notes that a higher action scale (0.0325 vs 0.025) is needed to overcome the higher static friction (stiction) compared to the 2F-85 gripper. This increased scaling ensures the policy commands are large enough to overcome the friction forces observed in the step response analysis. + +Action Space Design +~~~~~~~~~~~~~~~~~~~ + +Your action space should match what the real robot controller can execute. For this task we found that **incremental joint position control** is the most reliable approach. + +**Example: Gear Assembly Action Configuration** + +.. code-block:: python + + # For contact-rich manipulation, smaller action scale for more precise control + self.joint_action_scale = 0.025 # ±2.5 degrees per step + + self.actions.arm_action = mdp.RelativeJointPositionActionCfg( + asset_name="robot", + joint_names=["shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", + "wrist_1_joint", "wrist_2_joint", "wrist_3_joint"], + scale=self.joint_action_scale, + use_zero_offset=True, + ) + +The action scale is a critical hyperparameter that should be tuned based on: + +- Task precision requirements (smaller for contact-rich tasks) +- Control frequency (higher frequency allows larger steps) + +Domain Randomization Strategy +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Domain randomization should cover the range of conditions in which you want the real robot to perform. Increasing randomization ranges makes it harder for the policy to learn, but allows for larger variations in inputs and system parameters. The key is to balance training difficulty with robustness: randomize enough to cover real-world variations, but not so much that the policy cannot learn effectively. + +**Pose Randomization** + +For manipulation tasks, randomize object poses to ensure the policy works across the workspace: + +.. 
code-block:: python + + # From Gear Assembly environment + randomize_gears_and_base_pose = EventTerm( + func=gear_assembly_events.randomize_gears_and_base_pose, + mode="reset", + params={ + "pose_range": { + "x": [-0.1, 0.1], # ±10cm + "y": [-0.25, 0.25], # ±25cm + "z": [-0.1, 0.1], # ±10cm + "roll": [-math.pi/90, math.pi/90], # ±2 degrees + "pitch": [-math.pi/90, math.pi/90], # ±2 degrees + "yaw": [-math.pi/6, math.pi/6], # ±30 degrees + }, + "gear_pos_range": { + "x": [-0.02, 0.02], # ±2cm relative to base + "y": [-0.02, 0.02], + "z": [0.0575, 0.0775], # 5.75-7.75cm above base + }, + "rot_randomization_range": { + "roll": [-math.pi/36, math.pi/36], # ±5 degrees + "pitch": [-math.pi/36, math.pi/36], + "yaw": [-math.pi/36, math.pi/36], + }, + }, + ) + +**Initial State Randomization** + +Randomizing the robot's initial configuration helps the policy handle different starting conditions: + +.. code-block:: python + + set_robot_to_grasp_pose = EventTerm( + func=gear_assembly_events.set_robot_to_grasp_pose, + mode="reset", + params={ + "robot_asset_cfg": SceneEntityCfg("robot"), + "rot_offset": [0.0, math.sqrt(2)/2, math.sqrt(2)/2, 0.0], # Base gripper orientation + "pos_randomization_range": { + "x": [-0.0, 0.0], + "y": [-0.005, 0.005], # ±5mm variation + "z": [-0.003, 0.003], # ±3mm variation + }, + "gripper_type": "2f_140", + }, + ) + +Part 3: Training the Policy in Isaac Lab +----------------------------------------- + +Now that we've covered the key principles for sim-to-real transfer, let's train the gear assembly policy in Isaac Lab. + +Step 1: Visualize the Environment +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +First, launch the training with a small number of environments and visualization enabled to verify that the environment is set up correctly: + +.. code-block:: bash + + # Launch training with visualization + python scripts/reinforcement_learning/train.py \ + --task Isaac-Deploy-GearAssembly-UR10e-2F140-v0 \ + --num_envs 4 + +This will open the Isaac Sim viewer where you can observe the training process in real-time. + +.. figure:: ../../_static/setup/walkthrough_sim_real_gear_assembly_train_4.png + :align: center + :figwidth: 100% + :alt: Gear assembly training visualization in Isaac Lab + + Training visualization showing multiple parallel environments with robots grasping gears. + +**What to Expect:** + +In the early stages of training, you should see the robots moving around with the gears grasped by the grippers, but they won't be successfully inserting the gears yet. This is expected behavior as the policy is still learning. The robots will move the grasped gear in various directions. Once you've verified the environment looks correct, stop the training (Ctrl+C) and proceed to full-scale training. + +Step 2: Full-Scale Training with Video Recording +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Now launch the full training run with more parallel environments in headless mode for faster training. We'll also enable video recording to monitor progress: + +.. 
code-block:: bash + + # Full training with video recording + python scripts/reinforcement_learning/train.py \ + --task Isaac-Deploy-GearAssembly-UR10e-2F140-v0 \ + --headless \ + --num_envs 1024 \ + --video --video_length 800 --video_interval 5000 + +This command will: + +- Run 1024 parallel environments for efficient training +- Run in headless mode (no visualization) for maximum performance +- Record videos every 5000 steps to monitor training progress +- Save videos with 800 frames each + +Training typically takes ~6-12 hours for a robust insertion policy. The videos will be saved in the ``logs`` directory and can be reviewed to assess policy performance during training. + +Step 3: Deploy on Real Robot +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Once training is complete, follow the `Isaac ROS inference documentation `_ to deploy your policy. + +The Isaac ROS deployment pipeline directly uses the trained model checkpoint (``.pt`` file) along with the ``agent.yaml`` and ``env.yaml`` configuration files generated during training. No additional export step is required. + +The deployment pipeline uses Isaac ROS and a custom ROS inference node to run the policy on real hardware. The pipeline includes: + +1. **Perception**: Camera-based pose estimation (FoundationPose, Segment Anything) +2. **Motion Planning**: cuMotion for collision-free trajectories +3. **Policy Inference**: Your trained policy running at control frequency in a custom ROS inference node +4. **Robot Control**: Low-level controller executing commands + + +Further Resources +----------------- + +- Sim-to-Real Policy Transfer for Whole Body Controllers: :ref:`sim2real` - Shows how to train and deploy a whole body controller for legged robots using Isaac Lab with the Newton backend +- `Isaac ROS Manipulation Documentation `_ +- `Isaac ROS Gear Assembly Tutorial `_ +- Isaac Lab API Documentation: :ref:`api_lab` +- RL Training Tutorial: :ref:`tutorial_03_envs_rl_training` diff --git a/source/isaaclab/isaaclab/envs/mdp/rewards.py b/source/isaaclab/isaaclab/envs/mdp/rewards.py index 20df2a557eb..4232c0a8895 100644 --- a/source/isaaclab/isaaclab/envs/mdp/rewards.py +++ b/source/isaaclab/isaaclab/envs/mdp/rewards.py @@ -241,11 +241,17 @@ def applied_torque_limits(env: ManagerBasedRLEnv, asset_cfg: SceneEntityCfg = Sc ) return torch.sum(out_of_limits, dim=1) +def action_rate(env: ManagerBasedRLEnv) -> torch.Tensor: + """Penalize the rate of change of the actions using L2 norm.""" + return torch.norm(env.action_manager.action - env.action_manager.prev_action, p=2, dim=-1) def action_rate_l2(env: ManagerBasedRLEnv) -> torch.Tensor: """Penalize the rate of change of the actions using L2 squared kernel.""" return torch.sum(torch.square(env.action_manager.action - env.action_manager.prev_action), dim=1) +def action(env: ManagerBasedRLEnv) -> torch.Tensor: + """Penalize the actions using L2 squared kernel (summed for each environment).""" + return torch.sum(env.action_manager.action**2, dim=-1) def action_l2(env: ManagerBasedRLEnv) -> torch.Tensor: """Penalize the actions using L2 squared kernel.""" diff --git a/source/isaaclab/isaaclab/utils/noise/__init__.py b/source/isaaclab/isaaclab/utils/noise/__init__.py index d2f703758b0..ef2cea78582 100644 --- a/source/isaaclab/isaaclab/utils/noise/__init__.py +++ b/source/isaaclab/isaaclab/utils/noise/__init__.py @@ -26,8 +26,8 @@ """ from .noise_cfg import NoiseCfg # noqa: F401 -from .noise_cfg import ConstantNoiseCfg, GaussianNoiseCfg, NoiseModelCfg, NoiseModelWithAdditiveBiasCfg, UniformNoiseCfg -from 
.noise_model import NoiseModel, NoiseModelWithAdditiveBias, constant_noise, gaussian_noise, uniform_noise
+from .noise_cfg import ConstantNoiseCfg, GaussianNoiseCfg, NoiseModelCfg, NoiseModelWithAdditiveBiasCfg, UniformNoiseCfg, ResetSampledNoiseModelCfg
+from .noise_model import NoiseModel, NoiseModelWithAdditiveBias, ResetSampledNoiseModel, constant_noise, gaussian_noise, uniform_noise
 
 # Backward compatibility
 ConstantBiasNoiseCfg = ConstantNoiseCfg
diff --git a/source/isaaclab/isaaclab/utils/noise/noise_cfg.py b/source/isaaclab/isaaclab/utils/noise/noise_cfg.py
index 0c49828b3ff..3de46bc89f1 100644
--- a/source/isaaclab/isaaclab/utils/noise/noise_cfg.py
+++ b/source/isaaclab/isaaclab/utils/noise/noise_cfg.py
@@ -109,3 +109,15 @@ class NoiseModelWithAdditiveBiasCfg(NoiseModelCfg):
 
     Defaults to True.
     """
+
+@configclass
+class ResetSampledNoiseModelCfg(NoiseModelCfg):
+    """Configuration for a noise model that samples noise ONLY during reset."""
+
+    class_type: type = noise_model.ResetSampledNoiseModel
+
+    noise_cfg: NoiseCfg = MISSING
+    """The noise configuration for the noise.
+
+    Based on this configuration, the noise is sampled at every reset of the noise model.
+    """
diff --git a/source/isaaclab/isaaclab/utils/noise/noise_model.py b/source/isaaclab/isaaclab/utils/noise/noise_model.py
index dae36b55c72..d780d916067 100644
--- a/source/isaaclab/isaaclab/utils/noise/noise_model.py
+++ b/source/isaaclab/isaaclab/utils/noise/noise_model.py
@@ -189,3 +189,71 @@ def __call__(self, data: torch.Tensor) -> torch.Tensor:
         # now re-sample that expanded bias in-place
         self.reset()
         return super().__call__(data) + self._bias
+
+class ResetSampledNoiseModel(NoiseModel):
+    """Noise model that samples noise ONLY during reset and applies it consistently.
+
+    The noise is sampled from the configured distribution ONLY during reset and applied consistently
+    until the next reset. Unlike regular noise that generates new random values every step,
+    this model maintains the same noise values throughout an episode.
+    """
+
+    def __init__(self, noise_model_cfg: noise_cfg.NoiseModelCfg, num_envs: int, device: str):
+        # initialize parent class
+        super().__init__(noise_model_cfg, num_envs, device)
+        # store the noise configuration
+        self._noise_cfg = noise_model_cfg.noise_cfg
+        self._sampled_noise = torch.zeros((num_envs, 1), device=self._device)
+        self._num_components: int | None = None
+
+    def reset(self, env_ids: Sequence[int] | None = None):
+        """Reset the noise model by sampling NEW noise values.
+
+        This method samples new noise for the specified environments using the configured noise function.
+        The sampled noise will remain constant until the next reset.
+
+        Args:
+            env_ids: The environment ids to reset the noise model for. Defaults to None,
+                in which case all environments are considered.
+        """
+        # resolve the environment ids and count how many environments are being reset
+        if env_ids is None:
+            env_ids = slice(None)
+        num_resets = self._sampled_noise[env_ids].shape[0]
+
+        # Use the existing noise function to sample new noise
+        # Create dummy data to sample from the noise function
+        dummy_data = torch.zeros((num_resets, 1), device=self._device)
+
+        # Sample noise using the configured noise function
+        sampled_noise = self._noise_model_cfg.noise_cfg.func(dummy_data, self._noise_model_cfg.noise_cfg)
+
+        self._sampled_noise[env_ids] = sampled_noise
+
+    def __call__(self, data: torch.Tensor) -> torch.Tensor:
+        """Apply the pre-sampled noise to the data.
+
+        This method applies the noise that was sampled during the last reset.
+ No new noise is generated - the same values are used consistently. + + Args: + data: The data to apply the noise to. Shape is (num_envs, ...). + + Returns: + The data with the noise applied. Shape is the same as the input data. + """ + # on first apply, expand noise to match last dim of data + if self._num_components is None: + *_, self._num_components = data.shape + # expand noise from (num_envs,1) to (num_envs, num_components) + self._sampled_noise = self._sampled_noise.repeat(1, self._num_components) + + # apply the noise based on operation + if self._noise_cfg.operation == "add": + return data + self._sampled_noise + elif self._noise_cfg.operation == "scale": + return data * self._sampled_noise + elif self._noise_cfg.operation == "abs": + return self._sampled_noise + else: + raise ValueError(f"Unknown operation in noise: {self._noise_cfg.operation}") diff --git a/source/isaaclab_assets/isaaclab_assets/robots/universal_robots.py b/source/isaaclab_assets/isaaclab_assets/robots/universal_robots.py index 4433b824235..2d6f0bae1b1 100644 --- a/source/isaaclab_assets/isaaclab_assets/robots/universal_robots.py +++ b/source/isaaclab_assets/isaaclab_assets/robots/universal_robots.py @@ -10,6 +10,7 @@ * :obj:`UR10_CFG`: The UR10 arm without a gripper. * :obj:`UR10E_ROBOTIQ_GRIPPER_CFG`: The UR10E arm with Robotiq_2f_140 gripper. +* :obj:`UR10e_ROBOTIQ_2F_85_CFG`: The UR10E arm with Robotiq 2F-85 gripper. Reference: https://github.com/ros-industrial/universal_robot """ @@ -163,3 +164,43 @@ ) """Configuration of UR-10E arm with Robotiq_2f_140 gripper.""" + +UR10e_ROBOTIQ_2F_85_CFG = UR10e_CFG.copy() +UR10e_ROBOTIQ_2F_85_CFG.spawn.variants = {"Gripper": "Robotiq_2f_85"} +UR10e_ROBOTIQ_2F_85_CFG.spawn.rigid_props.disable_gravity = True +UR10e_ROBOTIQ_2F_85_CFG.init_state.joint_pos["finger_joint"] = 0.0 +UR10e_ROBOTIQ_2F_85_CFG.init_state.joint_pos[".*_inner_finger_joint"] = 0.0 +UR10e_ROBOTIQ_2F_85_CFG.init_state.joint_pos[".*_inner_finger_knuckle_joint"] = 0.0 +UR10e_ROBOTIQ_2F_85_CFG.init_state.joint_pos[".*_outer_.*_joint"] = 0.0 +# the major actuator joint for gripper +UR10e_ROBOTIQ_2F_85_CFG.actuators["gripper_drive"] = ImplicitActuatorCfg( + joint_names_expr=["finger_joint"], # "right_outer_knuckle_joint" is its mimic joint + effort_limit_sim=10.0, + velocity_limit_sim=1.0, + stiffness=11.25, + damping=0.1, + friction=0.0, + armature=0.0, +) +# enable the gripper to grasp in a parallel manner +UR10e_ROBOTIQ_2F_85_CFG.actuators["gripper_finger"] = ImplicitActuatorCfg( + joint_names_expr=[".*_inner_finger_joint"], + effort_limit_sim=1.0, + velocity_limit_sim=1.0, + stiffness=0.2, + damping=0.001, + friction=0.0, + armature=0.0, +) +# set PD to zero for passive joints in close-loop gripper +UR10e_ROBOTIQ_2F_85_CFG.actuators["gripper_passive"] = ImplicitActuatorCfg( + joint_names_expr=[".*_inner_finger_knuckle_joint", "right_outer_knuckle_joint"], + effort_limit_sim=1.0, + velocity_limit_sim=1.0, + stiffness=0.0, + damping=0.0, + friction=0.0, + armature=0.0, +) + +"""Configuration of UR-10E arm with Robotiq 2F-85 gripper.""" diff --git a/source/isaaclab_rl/isaaclab_rl/rsl_rl/rl_cfg.py b/source/isaaclab_rl/isaaclab_rl/rsl_rl/rl_cfg.py index 5b03a7c639b..bfc212f4006 100644 --- a/source/isaaclab_rl/isaaclab_rl/rsl_rl/rl_cfg.py +++ b/source/isaaclab_rl/isaaclab_rl/rsl_rl/rl_cfg.py @@ -31,6 +31,9 @@ class RslRlPpoActorCriticCfg: noise_std_type: Literal["scalar", "log"] = "scalar" """The type of noise standard deviation for the policy. 
Default is scalar.""" + state_dependent_std: bool = False + """Whether to use state-dependent standard deviation for the policy. Default is False.""" + actor_obs_normalization: bool = MISSING """Whether to normalize the observation for the actor network.""" diff --git a/source/isaaclab_rl/setup.py b/source/isaaclab_rl/setup.py index 705863eb9f2..dbb7bad4b94 100644 --- a/source/isaaclab_rl/setup.py +++ b/source/isaaclab_rl/setup.py @@ -50,7 +50,6 @@ } # Add the names with hyphens as aliases for convenience EXTRAS_REQUIRE["rl_games"] = EXTRAS_REQUIRE["rl-games"] -EXTRAS_REQUIRE["rsl_rl"] = EXTRAS_REQUIRE["rsl-rl"] # Cumulation of all extra-requires EXTRAS_REQUIRE["all"] = list(itertools.chain.from_iterable(EXTRAS_REQUIRE.values())) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/__init__.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/__init__.py new file mode 100644 index 00000000000..525f8c7d27a --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Assemble 3 gears into a base.""" diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/__init__.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/__init__.py new file mode 100644 index 00000000000..257dea87052 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Configurations for arm-based gear assembly environments.""" + +# We leave this file empty since we don't want to expose any configs in this package directly. +# We still need this file to import the "config" module in the parent package. diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/__init__.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/__init__.py new file mode 100644 index 00000000000..d10731973b8 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/__init__.py @@ -0,0 +1,75 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +import gymnasium as gym + +from . import agents + +## +# Register Gym environments. 
+## + + +# UR10e with 2F-140 gripper +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F140-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.joint_pos_env_cfg:UR10e2F140GearAssemblyEnvCfg", + "rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10GearAssemblyRNNPPORunnerCfg", + }, +) + +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F140-Play-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.joint_pos_env_cfg:UR10e2F140GearAssemblyEnvCfg_PLAY", + }, +) + +# UR10e with 2F-85 gripper +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F85-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.joint_pos_env_cfg:UR10e2F85GearAssemblyEnvCfg", + "rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10GearAssemblyRNNPPORunnerCfg", + }, +) + +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F85-Play-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.joint_pos_env_cfg:UR10e2F85GearAssemblyEnvCfg_PLAY", + }, +) + +# UR10e with 2F-140 gripper - ROS Inference +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F140-ROS-Inference-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.ros_inference_env_cfg:UR10e2F140GearAssemblyROSInferenceEnvCfg", + "rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10GearAssemblyRNNPPORunnerCfg", + }, +) + +# UR10e with 2F-85 gripper - ROS Inference +gym.register( + id="Isaac-Deploy-GearAssembly-UR10e-2F85-ROS-Inference-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + disable_env_checker=True, + kwargs={ + "env_cfg_entry_point": f"{__name__}.ros_inference_env_cfg:UR10e2F85GearAssemblyROSInferenceEnvCfg", + "rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10GearAssemblyRNNPPORunnerCfg", + }, +) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/__init__.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/__init__.py new file mode 100644 index 00000000000..bcc238c84a9 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/rsl_rl_ppo_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/rsl_rl_ppo_cfg.py new file mode 100644 index 00000000000..800905e51f9 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/agents/rsl_rl_ppo_cfg.py @@ -0,0 +1,51 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause + +from isaaclab.utils import configclass + +from isaaclab_rl.rsl_rl import RslRlOnPolicyRunnerCfg, RslRlPpoActorCriticRecurrentCfg, RslRlPpoAlgorithmCfg + + +@configclass +class UR10GearAssemblyRNNPPORunnerCfg(RslRlOnPolicyRunnerCfg): + seed = 7858 + num_steps_per_env = 512 + max_iterations = 1500 + save_interval = 50 + experiment_name = "gear_assembly_ur10e" + clip_actions = 1.0 + resume = False + value_normalization = False + obs_groups = { + "policy": ["policy"], + "critic": ["critic"], + } + policy = RslRlPpoActorCriticRecurrentCfg( + state_dependent_std=True, + init_noise_std=1.0, + actor_obs_normalization=True, + critic_obs_normalization=True, + actor_hidden_dims=[256, 128, 64], + critic_hidden_dims=[256, 128, 64], + noise_std_type="log", + activation="elu", + rnn_type="lstm", + rnn_hidden_dim=256, + rnn_num_layers=2, + ) + algorithm = RslRlPpoAlgorithmCfg( + value_loss_coef=1.0, + use_clipped_value_loss=True, + clip_param=0.2, + entropy_coef=0.0, + num_learning_epochs=8, + num_mini_batches=16, + learning_rate=5.0e-4, + schedule="adaptive", + gamma=0.99, + lam=0.95, + desired_kl=0.008, + max_grad_norm=1.0, + ) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/joint_pos_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/joint_pos_env_cfg.py new file mode 100644 index 00000000000..46c93df560b --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/joint_pos_env_cfg.py @@ -0,0 +1,522 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +import math +import torch + +import isaaclab.sim as sim_utils +from isaaclab.actuators import ImplicitActuatorCfg +from isaaclab.assets import ArticulationCfg +from isaaclab.managers import EventTermCfg as EventTerm +from isaaclab.managers import SceneEntityCfg +from isaaclab.utils import configclass + +import isaaclab_tasks.manager_based.manipulation.deploy.mdp as mdp +import isaaclab_tasks.manager_based.manipulation.deploy.mdp.events as gear_assembly_events +from isaaclab_tasks.manager_based.manipulation.deploy.gear_assembly.gear_assembly_env_cfg import GearAssemblyEnvCfg + +## +# Pre-defined configs +## +from isaaclab_assets.robots.universal_robots import UR10e_ROBOTIQ_GRIPPER_CFG, UR10e_ROBOTIQ_2F_85_CFG # isort: skip + + +## +# Gripper-specific helper functions +## + + +def set_finger_joint_pos_robotiq_2f140( + joint_pos: torch.Tensor, + reset_ind_joint_pos: list[int], + finger_joints: list[int], + finger_joint_position: float, +): + """Set finger joint positions for Robotiq 2F-140 gripper. 
+ + Args: + joint_pos: Joint positions tensor + reset_ind_joint_pos: Row indices into the sliced joint_pos tensor + finger_joints: List of finger joint indices + finger_joint_position: Target position for finger joints + """ + for idx in reset_ind_joint_pos: + # For 2F-140 gripper (8 joints expected) + # Joint structure: [finger_joint, finger_joint, outer_joints x2, inner_finger_joints x2, pad_joints x2] + if len(finger_joints) < 8: + raise ValueError(f"2F-140 gripper requires at least 8 finger joints, got {len(finger_joints)}") + + joint_pos[idx, finger_joints[0]] = finger_joint_position + joint_pos[idx, finger_joints[1]] = finger_joint_position + + # outer finger joints set to 0 + joint_pos[idx, finger_joints[2]] = 0 + joint_pos[idx, finger_joints[3]] = 0 + + # inner finger joints: multiply by -1 + joint_pos[idx, finger_joints[4]] = -finger_joint_position + joint_pos[idx, finger_joints[5]] = -finger_joint_position + + joint_pos[idx, finger_joints[6]] = finger_joint_position + joint_pos[idx, finger_joints[7]] = finger_joint_position + + +def set_finger_joint_pos_robotiq_2f85( + joint_pos: torch.Tensor, + reset_ind_joint_pos: list[int], + finger_joints: list[int], + finger_joint_position: float, +): + """Set finger joint positions for Robotiq 2F-85 gripper. + + Args: + joint_pos: Joint positions tensor + reset_ind_joint_pos: Row indices into the sliced joint_pos tensor + finger_joints: List of finger joint indices + finger_joint_position: Target position for finger joints + """ + for idx in reset_ind_joint_pos: + # For 2F-85 gripper (6 joints expected) + # Joint structure: [finger_joint, finger_joint, inner_finger_joints x2, inner_finger_knuckle_joints x2] + if len(finger_joints) < 6: + raise ValueError(f"2F-85 gripper requires at least 6 finger joints, got {len(finger_joints)}") + + # Multiply specific indices by -1: [2, 4, 5] + # These correspond to ['left_inner_finger_joint', 'right_inner_finger_knuckle_joint', 'left_inner_finger_knuckle_joint'] + joint_pos[idx, finger_joints[0]] = finger_joint_position + joint_pos[idx, finger_joints[1]] = finger_joint_position + joint_pos[idx, finger_joints[2]] = -finger_joint_position + joint_pos[idx, finger_joints[3]] = finger_joint_position + joint_pos[idx, finger_joints[4]] = -finger_joint_position + joint_pos[idx, finger_joints[5]] = -finger_joint_position + + +## +# Environment configuration +## + + +@configclass +class EventCfg: + """Configuration for events.""" + + robot_joint_stiffness_and_damping = EventTerm( + func=mdp.randomize_actuator_gains, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg( + "robot", joint_names=["shoulder_.*", "elbow_.*", "wrist_.*"] + ), # only the arm joints are randomized + "stiffness_distribution_params": (0.75, 1.5), + "damping_distribution_params": (0.3, 3.0), + "operation": "scale", + "distribution": "log_uniform", + }, + ) + + joint_friction = EventTerm( + func=mdp.randomize_joint_parameters, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=["shoulder_.*", "elbow_.*", "wrist_.*"]), + "friction_distribution_params": (0.3, 0.7), + "operation": "add", + "distribution": "uniform", + }, + ) + + small_gear_physics_material = EventTerm( + func=mdp.randomize_rigid_body_material, + mode="startup", + params={ + "asset_cfg": SceneEntityCfg("factory_gear_small", body_names=".*"), + "static_friction_range": (0.75, 0.75), + "dynamic_friction_range": (0.75, 0.75), + "restitution_range": (0.0, 0.0), + "num_buckets": 16, + }, + ) + + medium_gear_physics_material = EventTerm( + 
func=mdp.randomize_rigid_body_material,
+        mode="startup",
+        params={
+            "asset_cfg": SceneEntityCfg("factory_gear_medium", body_names=".*"),
+            "static_friction_range": (0.75, 0.75),
+            "dynamic_friction_range": (0.75, 0.75),
+            "restitution_range": (0.0, 0.0),
+            "num_buckets": 16,
+        },
+    )
+
+    large_gear_physics_material = EventTerm(
+        func=mdp.randomize_rigid_body_material,
+        mode="startup",
+        params={
+            "asset_cfg": SceneEntityCfg("factory_gear_large", body_names=".*"),
+            "static_friction_range": (0.75, 0.75),
+            "dynamic_friction_range": (0.75, 0.75),
+            "restitution_range": (0.0, 0.0),
+            "num_buckets": 16,
+        },
+    )
+
+    gear_base_physics_material = EventTerm(
+        func=mdp.randomize_rigid_body_material,
+        mode="startup",
+        params={
+            "asset_cfg": SceneEntityCfg("factory_gear_base", body_names=".*"),
+            "static_friction_range": (0.75, 0.75),
+            "dynamic_friction_range": (0.75, 0.75),
+            "restitution_range": (0.0, 0.0),
+            "num_buckets": 16,
+        },
+    )
+
+    robot_physics_material = EventTerm(
+        func=mdp.randomize_rigid_body_material,
+        mode="startup",
+        params={
+            "asset_cfg": SceneEntityCfg("robot", body_names=".*finger"),
+            "static_friction_range": (5.0, 5.0),
+            "dynamic_friction_range": (5.0, 5.0),
+            "restitution_range": (0.0, 0.0),
+            "num_buckets": 16,
+        },
+    )
+
+    randomize_gear_type = EventTerm(
+        func=gear_assembly_events.RandomizeGearType,
+        mode="reset",
+        params={
+            "gear_types": ["gear_small", "gear_medium", "gear_large"]
+            # "gear_types": ["gear_small"]
+        },
+    )
+
+    reset_all = EventTerm(func=mdp.reset_scene_to_default, mode="reset")
+
+    randomize_gears_and_base_pose = EventTerm(
+        func=gear_assembly_events.RandomizeGearsAndBasePose,
+        mode="reset",
+        params={
+            "pose_range": {
+                "x": [-0.1, 0.1],
+                "y": [-0.25, 0.25],
+                "z": [-0.1, 0.1],
+                "roll": [-math.pi / 90, math.pi / 90],  # ±2 degrees
+                "pitch": [-math.pi / 90, math.pi / 90],  # ±2 degrees
+                "yaw": [-math.pi / 6, math.pi / 6],  # ±30 degrees
+            },
+            "gear_pos_range": {
+                "x": [-0.02, 0.02],
+                "y": [-0.02, 0.02],
+                "z": [0.0575, 0.0775],  # centered at 0.045 + 0.0225 = 0.0675
+            },
+            "velocity_range": {},
+        },
+    )
+
+    set_robot_to_grasp_pose = EventTerm(
+        func=gear_assembly_events.SetRobotToGraspPose,
+        mode="reset",
+        params={
+            "robot_asset_cfg": SceneEntityCfg("robot"),
+            "pos_randomization_range": {"x": [-0.0, 0.0], "y": [-0.005, 0.005], "z": [-0.003, 0.003]},
+        },
+    )
+
+
+@configclass
+class UR10eGearAssemblyEnvCfg(GearAssemblyEnvCfg):
+    """Base configuration for UR10e Gear Assembly Environment.
+
+    This class contains common setup shared across different gripper configurations.
+    Subclasses should configure gripper-specific parameters.
+ """ + + def __post_init__(self): + # post init of parent + super().__post_init__() + + # Robot-specific parameters (can be overridden for other robots) + self.end_effector_body_name = "wrist_3_link" # End effector body name for IK and termination checks + self.num_arm_joints = 6 # Number of arm joints (excluding gripper) + self.grasp_rot_offset = [ + 0.0, + math.sqrt(2) / 2, + math.sqrt(2) / 2, + 0.0, + ] # Rotation offset for grasp pose (quaternion [w, x, y, z]) + self.gripper_joint_setter_func = None # Gripper-specific joint setter function (set in subclass) + + # Gear orientation termination thresholds (in degrees) + self.gear_orientation_roll_threshold_deg = 15.0 # Maximum allowed roll deviation + self.gear_orientation_pitch_threshold_deg = 15.0 # Maximum allowed pitch deviation + self.gear_orientation_yaw_threshold_deg = 180.0 # Maximum allowed yaw deviation + + # Common observation configuration + self.observations.policy.joint_pos.params["asset_cfg"].joint_names = [ + "shoulder_pan_joint", + "shoulder_lift_joint", + "elbow_joint", + "wrist_1_joint", + "wrist_2_joint", + "wrist_3_joint", + ] + self.observations.policy.joint_vel.params["asset_cfg"].joint_names = [ + "shoulder_pan_joint", + "shoulder_lift_joint", + "elbow_joint", + "wrist_1_joint", + "wrist_2_joint", + "wrist_3_joint", + ] + + # override events + self.events = EventCfg() + + # Update termination thresholds from config + self.terminations.gear_orientation_exceeded.params["roll_threshold_deg"] = ( + self.gear_orientation_roll_threshold_deg + ) + self.terminations.gear_orientation_exceeded.params["pitch_threshold_deg"] = ( + self.gear_orientation_pitch_threshold_deg + ) + self.terminations.gear_orientation_exceeded.params["yaw_threshold_deg"] = ( + self.gear_orientation_yaw_threshold_deg + ) + + # override command generator body + self.joint_action_scale = 0.025 + self.actions.arm_action = mdp.RelativeJointPositionActionCfg( + asset_name="robot", + joint_names=[ + "shoulder_pan_joint", + "shoulder_lift_joint", + "elbow_joint", + "wrist_1_joint", + "wrist_2_joint", + "wrist_3_joint", + ], + scale=self.joint_action_scale, + use_zero_offset=True, + ) + + +@configclass +class UR10e2F140GearAssemblyEnvCfg(UR10eGearAssemblyEnvCfg): + """Configuration for UR10e with Robotiq 2F-140 gripper.""" + + def __post_init__(self): + # post init of parent + super().__post_init__() + + # switch robot to ur10e with 2F-140 gripper + self.scene.robot = UR10e_ROBOTIQ_GRIPPER_CFG.replace( + prim_path="{ENV_REGEX_NS}/Robot", + spawn=UR10e_ROBOTIQ_GRIPPER_CFG.spawn.replace( + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=True, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=4, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + articulation_props=sim_utils.ArticulationRootPropertiesCfg( + enabled_self_collisions=False, solver_position_iteration_count=4, solver_velocity_iteration_count=1 + ), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0), + ), + # Joint positions based on IK from center of distribution for randomized gear positions + # This is done so that the start for the differential IK search after randomizing + # is close to the optimal grasp pose + init_state=ArticulationCfg.InitialStateCfg( + joint_pos={ + "shoulder_pan_joint": 2.7228, + "shoulder_lift_joint": -8.3962e-01, + "elbow_joint": 1.3684, + 
"wrist_1_joint": -2.1048, + "wrist_2_joint": -1.5691, + "wrist_3_joint": -1.9896, + "finger_joint": 0.0, + ".*_inner_finger_joint": 0.0, + ".*_inner_finger_pad_joint": 0.0, + ".*_outer_.*_joint": 0.0, + }, + pos=(0.0, 0.0, 0.0), + rot=(1.0, 0.0, 0.0, 0.0), + ), + ) + + # 2F-140 gripper actuator configuration + self.scene.robot.actuators["gripper_finger"] = ImplicitActuatorCfg( + joint_names_expr=[".*_inner_finger_joint"], + effort_limit_sim=10.0, + velocity_limit_sim=10.0, + stiffness=10.0, + damping=0.05, + friction=0.0, + armature=0.0, + ) + + # Set gripper-specific joint setter function + self.gripper_joint_setter_func = set_finger_joint_pos_robotiq_2f140 + + # gear offsets and grasp positions for the 2F-140 gripper + self.gear_offsets_grasp = { + "gear_small": [0.0, self.gear_offsets["gear_small"][0], -0.26], + "gear_medium": [0.0, self.gear_offsets["gear_medium"][0], -0.26], + "gear_large": [0.0, self.gear_offsets["gear_large"][0], -0.26], + } + + # Grasp widths for 2F-140 gripper + self.hand_grasp_width = {"gear_small": 0.64, "gear_medium": 0.54, "gear_large": 0.51} + + # Close widths for 2F-140 gripper + self.hand_close_width = {"gear_small": 0.69, "gear_medium": 0.59, "gear_large": 0.56} + + # Populate event term parameters + self.events.set_robot_to_grasp_pose.params["gear_offsets_grasp"] = self.gear_offsets_grasp + self.events.set_robot_to_grasp_pose.params["end_effector_body_name"] = self.end_effector_body_name + self.events.set_robot_to_grasp_pose.params["num_arm_joints"] = self.num_arm_joints + self.events.set_robot_to_grasp_pose.params["grasp_rot_offset"] = self.grasp_rot_offset + self.events.set_robot_to_grasp_pose.params["gripper_joint_setter_func"] = self.gripper_joint_setter_func + + # Populate termination term parameters + self.terminations.gear_dropped.params["gear_offsets_grasp"] = self.gear_offsets_grasp + self.terminations.gear_dropped.params["end_effector_body_name"] = self.end_effector_body_name + self.terminations.gear_dropped.params["grasp_rot_offset"] = self.grasp_rot_offset + + self.terminations.gear_orientation_exceeded.params["end_effector_body_name"] = self.end_effector_body_name + self.terminations.gear_orientation_exceeded.params["grasp_rot_offset"] = self.grasp_rot_offset + + +@configclass +class UR10e2F85GearAssemblyEnvCfg(UR10eGearAssemblyEnvCfg): + """Configuration for UR10e with Robotiq 2F-85 gripper.""" + + def __post_init__(self): + # post init of parent + super().__post_init__() + + # switch robot to ur10e with 2F-85 gripper + self.scene.robot = UR10e_ROBOTIQ_2F_85_CFG.replace( + prim_path="{ENV_REGEX_NS}/Robot", + spawn=UR10e_ROBOTIQ_2F_85_CFG.spawn.replace( + # TODO: @ashwinvk: Revert to default USD after https://jirasw.nvidia.com/browse/ISIM-4733 is resolved + usd_path="omniverse://isaac-dev.ov.nvidia.com/Projects/isaac_ros_gear_insertion/ur10e_default_2f85.usd", + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=True, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=4, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + articulation_props=sim_utils.ArticulationRootPropertiesCfg( + enabled_self_collisions=False, solver_position_iteration_count=4, solver_velocity_iteration_count=1 + ), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0), + ), + # Joint positions based on IK from center of distribution for randomized 
gear positions + # This is done so that the start for the differential IK search after randomizing + # is close to the optimal grasp pose + init_state=ArticulationCfg.InitialStateCfg( + joint_pos={ + "shoulder_pan_joint": 2.7228, + "shoulder_lift_joint": -8.3962e-01, + "elbow_joint": 1.3684, + "wrist_1_joint": -2.1048, + "wrist_2_joint": -1.5691, + "wrist_3_joint": -1.9896, + "finger_joint": 0.0, + ".*_inner_finger_joint": 0.0, + ".*_inner_finger_knuckle_joint": 0.0, + ".*_outer_.*_joint": 0.0, + }, + pos=(0.0, 0.0, 0.0), + rot=(1.0, 0.0, 0.0, 0.0), + ), + ) + + # 2F-85 gripper actuator configuration (higher effort limits than 2F-140) + self.scene.robot.actuators["gripper_finger"] = ImplicitActuatorCfg( + joint_names_expr=[".*_inner_finger_joint"], + effort_limit_sim=10.0, + velocity_limit_sim=10.0, + stiffness=10.0, + damping=0.05, + friction=0.0, + armature=0.0, + ) + + # Set gripper-specific joint setter function + self.gripper_joint_setter_func = set_finger_joint_pos_robotiq_2f85 + + # gear offsets and grasp positions for the 2F-85 gripper + self.gear_offsets_grasp = { + "gear_small": [0.0, self.gear_offsets["gear_small"][0], -0.18], + "gear_medium": [0.0, self.gear_offsets["gear_medium"][0], -0.18], + "gear_large": [0.0, self.gear_offsets["gear_large"][0], -0.18], + } + + # Grasp widths for 2F-85 gripper + self.hand_grasp_width = {"gear_small": 0.64, "gear_medium": 0.46, "gear_large": 0.4} + + # Close widths for 2F-85 gripper + self.hand_close_width = {"gear_small": 0.69, "gear_medium": 0.51, "gear_large": 0.45} + + # Populate event term parameters + self.events.set_robot_to_grasp_pose.params["gear_offsets_grasp"] = self.gear_offsets_grasp + self.events.set_robot_to_grasp_pose.params["end_effector_body_name"] = self.end_effector_body_name + self.events.set_robot_to_grasp_pose.params["num_arm_joints"] = self.num_arm_joints + self.events.set_robot_to_grasp_pose.params["grasp_rot_offset"] = self.grasp_rot_offset + self.events.set_robot_to_grasp_pose.params["gripper_joint_setter_func"] = self.gripper_joint_setter_func + + # Populate termination term parameters + self.terminations.gear_dropped.params["gear_offsets_grasp"] = self.gear_offsets_grasp + self.terminations.gear_dropped.params["end_effector_body_name"] = self.end_effector_body_name + self.terminations.gear_dropped.params["grasp_rot_offset"] = self.grasp_rot_offset + + self.terminations.gear_orientation_exceeded.params["end_effector_body_name"] = self.end_effector_body_name + self.terminations.gear_orientation_exceeded.params["grasp_rot_offset"] = self.grasp_rot_offset + + +@configclass +class UR10e2F140GearAssemblyEnvCfg_PLAY(UR10e2F140GearAssemblyEnvCfg): + """Play configuration for UR10e with Robotiq 2F-140 gripper.""" + + def __post_init__(self): + # post init of parent + super().__post_init__() + # make a smaller scene for play + self.scene.num_envs = 50 + self.scene.env_spacing = 2.5 + # disable randomization for play + self.observations.policy.enable_corruption = False + + +@configclass +class UR10e2F85GearAssemblyEnvCfg_PLAY(UR10e2F85GearAssemblyEnvCfg): + """Play configuration for UR10e with Robotiq 2F-85 gripper.""" + + def __post_init__(self): + # post init of parent + super().__post_init__() + # make a smaller scene for play + self.scene.num_envs = 50 + self.scene.env_spacing = 2.5 + # disable randomization for play + self.observations.policy.enable_corruption = False diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/ros_inference_env_cfg.py 
b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/ros_inference_env_cfg.py new file mode 100644 index 00000000000..3ed149355c8 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/config/ur_10e/ros_inference_env_cfg.py @@ -0,0 +1,180 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from isaaclab.utils import configclass +from isaaclab.assets import ArticulationCfg +from isaaclab.assets import RigidObjectCfg + +from .joint_pos_env_cfg import UR10e2F140GearAssemblyEnvCfg, UR10e2F85GearAssemblyEnvCfg + + +@configclass +class UR10e2F140GearAssemblyROSInferenceEnvCfg(UR10e2F140GearAssemblyEnvCfg): + """Configuration for ROS inference with UR10e and Robotiq 2F-140 gripper. + + This configuration: + - Exposes variables needed for ROS inference + - Overrides robot and gear initial poses for fixed/deterministic setup + """ + + def __post_init__(self): + # post init of parent + super().__post_init__() + + # Variables used by Isaac Manipulator for on robot inference + self.obs_order = ["arm_dof_pos", "arm_dof_vel", "shaft_pos", "shaft_quat"] + self.policy_action_space = "joint" + self.arm_joint_names = [ + "shoulder_pan_joint", + "shoulder_lift_joint", + "elbow_joint", + "wrist_1_joint", + "wrist_2_joint", + "wrist_3_joint", + ] + self.action_space = 6 + self.state_space = 42 + self.observation_space = 19 + + # Set joint_action_scale from the existing arm_action.scale + self.joint_action_scale = self.actions.arm_action.scale + + self.action_scale_joint_space = [ + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + ] + + # Override robot initial pose for ROS inference (fixed pose, no randomization) + # These joint positions are tuned for a good starting configuration + self.scene.robot.init_state = ArticulationCfg.InitialStateCfg( + joint_pos={ + "shoulder_pan_joint": 2.7228e+00, + "shoulder_lift_joint": -8.3962e-01, + "elbow_joint": 1.3684e+00, + "wrist_1_joint": -2.1048e+00, + "wrist_2_joint": -1.5691e+00, + "wrist_3_joint": -1.9896e+00, + "finger_joint": 0.0, + ".*_inner_finger_joint": 0.0, + ".*_inner_finger_pad_joint": 0.0, + ".*_outer_.*_joint": 0.0, + }, + pos=(0.0, 0.0, 0.0), + rot=(0.0, 0.0, 0.0, 1.0), + ) + + # Override gear base initial pose (fixed pose for ROS inference) + self.scene.factory_gear_base.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Override gear initial poses (fixed poses for ROS inference) + # Small gear + self.scene.factory_gear_small.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), # z = base_z + 0.1675 (above base) + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Medium gear + self.scene.factory_gear_medium.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Large gear + self.scene.factory_gear_large.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + +@configclass +class UR10e2F85GearAssemblyROSInferenceEnvCfg(UR10e2F85GearAssemblyEnvCfg): + """Configuration for ROS inference with UR10e and Robotiq 2F-85 gripper. 
+ + This configuration: + - Exposes variables needed for ROS inference + - Overrides robot and gear initial poses for fixed/deterministic setup + """ + + def __post_init__(self): + # post init of parent + super().__post_init__() + + # Variables used by Isaac Manipulator for on robot inference + # TODO: @ashwinvk: Remove these from env cfg once the generic inference node has been implemented + self.obs_order = ["arm_dof_pos", "arm_dof_vel", "shaft_pos", "shaft_quat"] + self.policy_action_space = "joint" + self.arm_joint_names = [ + "shoulder_pan_joint", + "shoulder_lift_joint", + "elbow_joint", + "wrist_1_joint", + "wrist_2_joint", + "wrist_3_joint", + ] + self.action_space = 6 + self.state_space = 42 + self.observation_space = 19 + + # Set joint_action_scale from the existing arm_action.scale + self.joint_action_scale = self.actions.arm_action.scale + + self.action_scale_joint_space = [ + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + self.joint_action_scale, + ] + + # Override robot initial pose for ROS inference (fixed pose, no randomization) + # These joint positions are tuned for a good starting configuration + self.scene.robot.init_state = ArticulationCfg.InitialStateCfg( + joint_pos={ + "shoulder_pan_joint": 2.7228e+00, + "shoulder_lift_joint": -8.3962e-01, + "elbow_joint": 1.3684e+00, + "wrist_1_joint": -2.1048e+00, + "wrist_2_joint": -1.5691e+00, + "wrist_3_joint": -1.9896e+00, + "finger_joint": 0.0, + ".*_inner_finger_joint": 0.0, + ".*_inner_finger_knuckle_joint": 0.0, + ".*_outer_.*_joint": 0.0, + }, + pos=(0.0, 0.0, 0.0), + rot=(0.0, 0.0, 0.0, 1.0), + ) + + # Override gear base initial pose (fixed pose for ROS inference) + self.scene.factory_gear_base.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Override gear initial poses (fixed poses for ROS inference) + # Small gear + self.scene.factory_gear_small.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), # z = base_z + 0.1675 (above base) + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Medium gear + self.scene.factory_gear_medium.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) + + # Large gear + self.scene.factory_gear_large.init_state = RigidObjectCfg.InitialStateCfg( + pos=(1.0200, -0.2100, -0.1), + rot=(-0.70711, 0.0, 0.0, 0.70711), + ) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/gear_assembly_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/gear_assembly_env_cfg.py new file mode 100644 index 00000000000..5fde5eb57e3 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/gear_assembly/gear_assembly_env_cfg.py @@ -0,0 +1,323 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause + +from dataclasses import MISSING +import os + +import isaaclab.sim as sim_utils +from isaaclab.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg +from isaaclab.envs import ManagerBasedRLEnvCfg +from isaaclab.managers import ActionTermCfg as ActionTerm +from isaaclab.managers import EventTermCfg as EventTerm + +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.managers import RewardTermCfg as RewTerm +from isaaclab.managers import SceneEntityCfg +from isaaclab.managers import TerminationTermCfg as DoneTerm +from isaaclab.sim.simulation_cfg import PhysxCfg, SimulationCfg +from isaaclab.scene import InteractiveSceneCfg +from isaaclab.utils import configclass +from isaaclab.utils.noise import ResetSampledNoiseModelCfg, UniformNoiseCfg +import isaaclab_tasks.manager_based.manipulation.deploy.mdp as mdp +import isaaclab_tasks.manager_based.manipulation.deploy.mdp.terminations as gear_assembly_terminations + +# Get the directory where this configuration file is located +CONFIG_DIR = os.path.dirname(os.path.abspath(__file__)) +ASSETS_DIR = os.path.join(CONFIG_DIR, "assets") + +## +# Environment configuration +## + +@configclass +class GearAssemblySceneCfg(InteractiveSceneCfg): + """Configuration for the scene with a robotic arm.""" + + # Disable scene replication to allow USD-level randomization + replicate_physics = False + + # world + ground = AssetBaseCfg( + prim_path="/World/ground", + spawn=sim_utils.GroundPlaneCfg(), + init_state=AssetBaseCfg.InitialStateCfg(pos=(0.0, 0.0, -1.05)), + ) + + factory_gear_base = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/FactoryGearBase", + # TODO: change to common isaac sim directory + spawn=sim_utils.UsdFileCfg( + usd_path="omniverse://isaac-dev.ov.nvidia.com/Isaac/Props/Factory/gear_assets/factory_gear_base/factory_gear_base.usd", + activate_contact_sensors=False, + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + kinematic_enabled=True, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=32, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + mass_props=sim_utils.MassPropertiesCfg(mass=None), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.02, rest_offset=0.0), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(-1.0200, 0.2100, -0.1), rot=(0.70711, 0.0, 0.0, 0.70711)), + ) + + factory_gear_small = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/FactoryGearSmall", + # TODO: change to common isaac sim directory + spawn=sim_utils.UsdFileCfg( + usd_path="omniverse://isaac-dev.ov.nvidia.com/Isaac/Props/Factory/gear_assets/factory_gear_small/factory_gear_small.usd", + activate_contact_sensors=False, + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + kinematic_enabled=False, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=32, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + mass_props=sim_utils.MassPropertiesCfg(mass=None), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.02, rest_offset=0.0), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(-1.0200, 0.2100, -0.1), rot=(0.70711, 
0.0, 0.0, 0.70711)), + ) + + factory_gear_medium = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/FactoryGearMedium", + # TODO: change to common isaac sim directory + spawn=sim_utils.UsdFileCfg( + usd_path="omniverse://isaac-dev.ov.nvidia.com/Isaac/Props/Factory/gear_assets/factory_gear_medium/factory_gear_medium.usd", + activate_contact_sensors=False, + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + kinematic_enabled=False, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=32, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + mass_props=sim_utils.MassPropertiesCfg(mass=None), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.02, rest_offset=0.0), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(-1.0200, 0.2100, -0.1), rot=(0.70711, 0.0, 0.0, 0.70711)), + ) + + factory_gear_large = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/FactoryGearLarge", + # TODO: change to common isaac sim directory + spawn=sim_utils.UsdFileCfg( + usd_path="omniverse://isaac-dev.ov.nvidia.com/Isaac/Props/Factory/gear_assets/factory_gear_large/factory_gear_large.usd", + activate_contact_sensors=False, + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + kinematic_enabled=False, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=32, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, + ), + mass_props=sim_utils.MassPropertiesCfg(mass=None), + collision_props=sim_utils.CollisionPropertiesCfg(contact_offset=0.02, rest_offset=0.0), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(-1.0200, 0.2100, -0.1), rot=(0.70711, 0.0, 0.0, 0.70711)), + ) + + + # robots + robot: ArticulationCfg = MISSING + + + # lights + light = AssetBaseCfg( + prim_path="/World/light", + spawn=sim_utils.DomeLightCfg(color=(0.75, 0.75, 0.75), intensity=2500.0), + ) + +@configclass +class ActionsCfg: + """Action specifications for the MDP.""" + + arm_action: ActionTerm = MISSING + gripper_action: ActionTerm | None = None + + +@configclass +class ObservationsCfg: + """Observation specifications for the MDP.""" + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group.""" + + # observation terms (order preserved) + joint_pos = ObsTerm(func=mdp.joint_pos, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=[".*"])}) + joint_vel = ObsTerm(func=mdp.joint_vel, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=[".*"])}) + gear_shaft_pos = ObsTerm( + func=mdp.GearShaftPosW, + params={}, # Will be populated in __post_init__ + noise=ResetSampledNoiseModelCfg( + noise_cfg=UniformNoiseCfg(n_min=-0.005, n_max=0.005, operation="add") + ) + ) + gear_shaft_quat = ObsTerm(func=mdp.GearShaftQuatW) + + def __post_init__(self): + self.enable_corruption = True + self.concatenate_terms = True + + @configclass + class CriticCfg(ObsGroup): + """Observations for critic group.""" + + # observation terms (order preserved) + joint_pos = ObsTerm(func=mdp.joint_pos, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=[".*"])}) + joint_vel = ObsTerm(func=mdp.joint_vel, + params={"asset_cfg": SceneEntityCfg("robot", joint_names=[".*"])}) + gear_shaft_pos = ObsTerm(func=mdp.GearShaftPosW, params={}) # Will be populated in 
__post_init__ + gear_shaft_quat = ObsTerm(func=mdp.GearShaftQuatW) + + gear_pos = ObsTerm(func=mdp.GearPosW) + gear_quat = ObsTerm(func=mdp.GearQuatW) + + # observation groups + policy: PolicyCfg = PolicyCfg() + critic: CriticCfg = CriticCfg() + + +@configclass +class EventCfg: + """Configuration for events.""" + + + reset_all = EventTerm(func=mdp.reset_scene_to_default, mode="reset") + + reset_gear = EventTerm( + func=mdp.reset_root_state_uniform, + mode="reset", + params={ + "pose_range": { + "x": [-0.05, 0.05], + "y": [-0.05, 0.05], + "z": [0.1, 0.15], + }, + "velocity_range": {}, + "asset_cfg": SceneEntityCfg("factory_gear_small"), + }, + ) + + +@configclass +class RewardsCfg: + """Reward terms for the MDP.""" + + end_effector_gear_keypoint_tracking = RewTerm( + func=mdp.KeypointEntityError, + weight=-1.5, + params={ + "asset_cfg_1": SceneEntityCfg("factory_gear_base"), + "keypoint_scale": 0.15, + }, + ) + + end_effector_gear_keypoint_tracking_exp = RewTerm( + func=mdp.KeypointEntityErrorExp, + weight=1.5, + params={ + "asset_cfg_1": SceneEntityCfg("factory_gear_base"), + "kp_exp_coeffs": [(50, 0.0001), (300, 0.0001)], + "kp_use_sum_of_exps": False, + "keypoint_scale": 0.15, + }, + ) + + action_rate = RewTerm(func=mdp.action_rate, weight=-5.0e-06) + + +@configclass +class TerminationsCfg: + """Termination terms for the MDP.""" + + time_out = DoneTerm(func=mdp.time_out, time_out=True) + + gear_dropped = DoneTerm( + func=gear_assembly_terminations.ResetWhenGearDropped, + params={ + "distance_threshold": 0.15, # 15cm from gripper + "robot_asset_cfg": SceneEntityCfg("robot"), + }, + ) + + gear_orientation_exceeded = DoneTerm( + func=gear_assembly_terminations.ResetWhenGearOrientationExceedsThreshold, + params={ + "roll_threshold_deg": 7.0, # Maximum roll deviation in degrees + "pitch_threshold_deg": 7.0, # Maximum pitch deviation in degrees + "yaw_threshold_deg": 180.0, # Maximum yaw deviation in degrees + "robot_asset_cfg": SceneEntityCfg("robot"), + }, + ) + +@configclass +class GearAssemblyEnvCfg(ManagerBasedRLEnvCfg): + # Scene settings + scene: GearAssemblySceneCfg = GearAssemblySceneCfg(num_envs=4096, env_spacing=2.5) + # Basic settings + observations: ObservationsCfg = ObservationsCfg() + actions: ActionsCfg = ActionsCfg() + # MDP settings + rewards: RewardsCfg = RewardsCfg() + terminations: TerminationsCfg = TerminationsCfg() + events: EventCfg = EventCfg() + sim: SimulationCfg = SimulationCfg( + + physx=PhysxCfg( + gpu_collision_stack_size=2**28, # Important to prevent collisionStackSize buffer overflow in contact-rich environments. 
+ gpu_max_rigid_contact_count=2**23, + gpu_max_rigid_patch_count=2**23 + ), + ) + + def __post_init__(self): + """Post initialization.""" + # general settings + self.episode_length_s = 6.66 + self.viewer.eye = (3.5, 3.5, 3.5) + # simulation settings + self.decimation = 4 + self.sim.render_interval = self.decimation + self.sim.dt = 1.0 / 120.0 + + self.gear_offsets = {'gear_small': [0.076125, 0.0, 0.0], + 'gear_medium': [0.030375, 0.0, 0.0], + 'gear_large': [-0.045375, 0.0, 0.0]} + + # Populate observation term parameters with gear offsets + self.observations.policy.gear_shaft_pos.params["gear_offsets"] = self.gear_offsets + self.observations.critic.gear_shaft_pos.params["gear_offsets"] = self.gear_offsets diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/__init__.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/__init__.py index 6686f9f5276..199e38f54e9 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/__init__.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/__init__.py @@ -7,4 +7,7 @@ from isaaclab.envs.mdp import * # noqa: F401, F403 +from .events import * # noqa: F401, F403 +from .observations import * # noqa: F401, F403 from .rewards import * # noqa: F401, F403 +from .terminations import * # noqa: F401, F403 diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/events.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/events.py new file mode 100644 index 00000000000..f17642877f4 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/events.py @@ -0,0 +1,461 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Class-based event terms specific to the gear assembly manipulation environments.""" + + +from __future__ import annotations + +import random +import torch +from typing import TYPE_CHECKING + +import isaaclab.utils.math as math_utils +from isaaclab.assets import Articulation, RigidObject +from isaaclab.managers import EventTermCfg, ManagerTermBase, SceneEntityCfg + +from isaaclab_tasks.direct.automate import factory_control as fc + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + + +class RandomizeGearType(ManagerTermBase): + """Randomize and manage the gear type being used for each environment. + + This class stores the current gear type for each environment and provides a mapping + from gear type names to indices. It serves as the central manager for gear type state + that other MDP terms depend on. + """ + + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the gear type randomization term. 
+ + Args: + cfg: Event term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Extract gear types from config (required parameter) + if "gear_types" not in cfg.params: + raise ValueError("'gear_types' parameter is required in RandomizeGearType configuration") + self.gear_types: list[str] = cfg.params["gear_types"] + + # Create gear type mapping (shared across all terms) + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + + # Store current gear type for each environment (as list for easy access) + # Initialize all to first gear type in the list + self._current_gear_type = [self.gear_types[0]] * env.num_envs + + # Store reference on environment for other terms to access + env._gear_type_manager = self + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor, + gear_types: list[str] = ["gear_small", "gear_medium", "gear_large"], + ): + """Randomize the gear type for specified environments. + + Args: + env: The environment containing the assets + env_ids: Environment IDs to randomize + gear_types: List of available gear types to choose from + """ + # Randomly select gear type for each environment + # Use the parameter passed to __call__ (not self.gear_types) to allow runtime overrides + for env_id in env_ids.tolist(): + self._current_gear_type[env_id] = random.choice(gear_types) + + def get_gear_type(self, env_id: int) -> str: + """Get the current gear type for a specific environment.""" + return self._current_gear_type[env_id] + + def get_all_gear_types(self) -> list[str]: + """Get current gear types for all environments.""" + return self._current_gear_type + + +class SetRobotToGraspPose(ManagerTermBase): + """Set robot to grasp pose using IK with pre-cached tensors. + + This class-based term caches all required tensors and gear offsets during initialization, + avoiding repeated allocations and lookups during execution. + """ + + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the set robot to grasp pose term. + + Args: + cfg: Event term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Get robot asset configuration + self.robot_asset_cfg: SceneEntityCfg = cfg.params.get("robot_asset_cfg", SceneEntityCfg("robot")) + self.robot_asset: Articulation = env.scene[self.robot_asset_cfg.name] + + # Get robot-specific parameters from environment config (all required) + # Validate required parameters + if "end_effector_body_name" not in cfg.params: + raise ValueError( + "'end_effector_body_name' parameter is required in SetRobotToGraspPose configuration. " + "Example: 'wrist_3_link'" + ) + if "num_arm_joints" not in cfg.params: + raise ValueError( + "'num_arm_joints' parameter is required in SetRobotToGraspPose configuration. Example: 6 for UR10e" + ) + if "grasp_rot_offset" not in cfg.params: + raise ValueError( + "'grasp_rot_offset' parameter is required in SetRobotToGraspPose configuration. " + "It should be a quaternion [w, x, y, z]. Example: [0.0, 0.707, 0.707, 0.0]" + ) + if "gripper_joint_setter_func" not in cfg.params: + raise ValueError( + "'gripper_joint_setter_func' parameter is required in SetRobotToGraspPose configuration. " + "It should be a function to set gripper joint positions." 
+ ) + + self.end_effector_body_name = cfg.params["end_effector_body_name"] + self.num_arm_joints = cfg.params["num_arm_joints"] + self.gripper_joint_setter_func = cfg.params["gripper_joint_setter_func"] + + # Pre-cache gear grasp offsets as tensors (required parameter) + if "gear_offsets_grasp" not in cfg.params: + raise ValueError( + "'gear_offsets_grasp' parameter is required in SetRobotToGraspPose configuration. " + "It should be a dict with keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + gear_offsets_grasp = cfg.params["gear_offsets_grasp"] + if not isinstance(gear_offsets_grasp, dict): + raise TypeError( + f"'gear_offsets_grasp' parameter must be a dict, got {type(gear_offsets_grasp).__name__}. " + "It should have keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + + self.gear_grasp_offset_tensors = {} + for gear_type in ["gear_small", "gear_medium", "gear_large"]: + if gear_type not in gear_offsets_grasp: + raise ValueError( + f"'{gear_type}' offset is required in 'gear_offsets_grasp' parameter. " + f"Found keys: {list(gear_offsets_grasp.keys())}" + ) + self.gear_grasp_offset_tensors[gear_type] = torch.tensor( + gear_offsets_grasp[gear_type], device=env.device, dtype=torch.float32 + ) + + # Pre-cache grasp rotation offset tensor + grasp_rot_offset = cfg.params["grasp_rot_offset"] + self.grasp_rot_offset_tensor = ( + torch.tensor(grasp_rot_offset, device=env.device, dtype=torch.float32).unsqueeze(0).repeat(env.num_envs, 1) + ) + + # Pre-allocate buffers for batch operations + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.local_env_indices = torch.arange(env.num_envs, device=env.device) + self.gear_grasp_offsets_buffer = torch.zeros(env.num_envs, 3, device=env.device, dtype=torch.float32) + + # Cache hand grasp/close widths + self.hand_grasp_width = env.cfg.hand_grasp_width + self.hand_close_width = env.cfg.hand_close_width + + # Find end effector index once + eef_indices, _ = self.robot_asset.find_bodies([self.end_effector_body_name]) + if len(eef_indices) == 0: + raise ValueError(f"End effector body '{self.end_effector_body_name}' not found in robot") + self.eef_idx = eef_indices[0] + + # Find jacobian body index (for fixed-base robots, subtract 1) + self.jacobi_body_idx = self.eef_idx - 1 + + # Find all joints once + all_joints, all_joints_names = self.robot_asset.find_joints([".*"]) + self.all_joints = all_joints + self.finger_joints = all_joints[self.num_arm_joints :] + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor, + robot_asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + pos_threshold: float = 1e-6, + rot_threshold: float = 1e-6, + max_iterations: int = 10, + pos_randomization_range: dict | None = None, + gear_offsets_grasp: dict | None = None, + end_effector_body_name: str | None = None, + num_arm_joints: int | None = None, + grasp_rot_offset: list | None = None, + gripper_joint_setter_func: callable | None = None, + ): + """Set robot to grasp pose using IK. 
+ + Args: + env: Environment instance + env_ids: Environment IDs to reset + robot_asset_cfg: Robot asset configuration (unused, kept for compatibility) + pos_threshold: Position convergence threshold + rot_threshold: Rotation convergence threshold + max_iterations: Maximum IK iterations + pos_randomization_range: Optional position randomization range + """ + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this event term is used." + ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + + # Get current joint state + joint_pos = self.robot_asset.data.joint_pos[env_ids].clone() + joint_vel = self.robot_asset.data.joint_vel[env_ids].clone() + + # Slice buffers for current batch size + num_reset_envs = len(env_ids) + gear_type_indices = self.gear_type_indices[:num_reset_envs] + local_env_indices = self.local_env_indices[:num_reset_envs] + gear_grasp_offsets = self.gear_grasp_offsets_buffer[:num_reset_envs] + grasp_rot_offset_tensor = self.grasp_rot_offset_tensor[env_ids] + + # IK loop + for i in range(max_iterations): + # Get current joint state + joint_pos = self.robot_asset.data.joint_pos[env_ids].clone() + joint_vel = self.robot_asset.data.joint_vel[env_ids].clone() + + env_ids_list = env_ids.tolist() + + # Stack all gear positions and quaternions + all_gear_pos = torch.stack( + [ + env.scene["factory_gear_small"].data.root_link_pos_w, + env.scene["factory_gear_medium"].data.root_link_pos_w, + env.scene["factory_gear_large"].data.root_link_pos_w, + ], + dim=1, + )[env_ids] + + all_gear_quat = torch.stack( + [ + env.scene["factory_gear_small"].data.root_link_quat_w, + env.scene["factory_gear_medium"].data.root_link_quat_w, + env.scene["factory_gear_large"].data.root_link_quat_w, + ], + dim=1, + )[env_ids] + + # Update gear_type_indices using pre-allocated buffer + for idx, env_id in enumerate(env_ids_list): + gear_type_indices[idx] = gear_type_manager.gear_type_map[gear_type_manager.get_gear_type(env_id)] + + # Select gear data using advanced indexing + grasp_object_pos_world = all_gear_pos[local_env_indices, gear_type_indices] + grasp_object_quat = all_gear_quat[local_env_indices, gear_type_indices] + + # Apply rotation offset + grasp_object_quat = math_utils.quat_mul(grasp_object_quat, grasp_rot_offset_tensor) + + # Get grasp offsets using cached tensors + for idx, env_id in enumerate(env_ids_list): + gear_type = gear_type_manager.get_gear_type(env_id) + gear_grasp_offsets[idx] = self.gear_grasp_offset_tensors[gear_type] + + # Add position randomization if specified + if pos_randomization_range is not None: + pos_keys = ["x", "y", "z"] + range_list_pos = [pos_randomization_range.get(key, (0.0, 0.0)) for key in pos_keys] + ranges_pos = torch.tensor(range_list_pos, device=env.device) + rand_pos_offsets = math_utils.sample_uniform( + ranges_pos[:, 0], ranges_pos[:, 1], (len(env_ids), 3), device=env.device + ) + gear_grasp_offsets = gear_grasp_offsets + rand_pos_offsets + + # Transform offsets from gear frame to world frame + grasp_object_pos_world = grasp_object_pos_world + math_utils.quat_apply( + grasp_object_quat, gear_grasp_offsets + ) + + # Get end effector pose + eef_pos = self.robot_asset.data.body_pos_w[env_ids, self.eef_idx] + eef_quat = self.robot_asset.data.body_quat_w[env_ids, self.eef_idx] + + # Compute pose error + pos_error, axis_angle_error = fc.get_pose_error( + 
fingertip_midpoint_pos=eef_pos, + fingertip_midpoint_quat=eef_quat, + ctrl_target_fingertip_midpoint_pos=grasp_object_pos_world, + ctrl_target_fingertip_midpoint_quat=grasp_object_quat, + jacobian_type="geometric", + rot_error_type="axis_angle", + ) + delta_hand_pose = torch.cat((pos_error, axis_angle_error), dim=-1) + + # Check convergence + pos_error_norm = torch.norm(pos_error, dim=-1) + rot_error_norm = torch.norm(axis_angle_error, dim=-1) + + if torch.all(pos_error_norm < pos_threshold) and torch.all(rot_error_norm < rot_threshold): + break + + # Solve IK using jacobian + jacobians = self.robot_asset.root_physx_view.get_jacobians().clone() + jacobian = jacobians[env_ids, self.jacobi_body_idx, :, :] + + delta_dof_pos = fc._get_delta_dof_pos( + delta_pose=delta_hand_pose, + ik_method="dls", + jacobian=jacobian, + device=env.device, + ) + + # Update joint positions + joint_pos = joint_pos + delta_dof_pos + joint_vel = torch.zeros_like(joint_pos) + + # Write to sim + self.robot_asset.set_joint_position_target(joint_pos, env_ids=env_ids) + self.robot_asset.set_joint_velocity_target(joint_vel, env_ids=env_ids) + self.robot_asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids) + + # Set gripper to grasp position + joint_pos = self.robot_asset.data.joint_pos[env_ids].clone() + + for row_idx, env_id in enumerate(env_ids_list): + gear_key = gear_type_manager.get_gear_type(env_id) + hand_grasp_width = self.hand_grasp_width[gear_key] + self.gripper_joint_setter_func(joint_pos, [row_idx], self.finger_joints, hand_grasp_width) + + self.robot_asset.set_joint_position_target(joint_pos, joint_ids=self.all_joints, env_ids=env_ids) + self.robot_asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids) + + # Set gripper to closed position + for row_idx, env_id in enumerate(env_ids_list): + gear_key = gear_type_manager.get_gear_type(env_id) + hand_close_width = self.hand_close_width[gear_key] + self.gripper_joint_setter_func(joint_pos, [row_idx], self.finger_joints, hand_close_width) + + self.robot_asset.set_joint_position_target(joint_pos, joint_ids=self.all_joints, env_ids=env_ids) + + +class RandomizeGearsAndBasePose(ManagerTermBase): + """Randomize both the gear base pose and individual gear poses. + + This class-based term pre-caches all tensors needed for randomization. + """ + + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the randomize gears and base pose term. + + Args: + cfg: Event term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Pre-allocate gear type mapping and indices + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + + # Cache asset names + self.gear_asset_names = ["factory_gear_small", "factory_gear_medium", "factory_gear_large"] + self.base_asset_name = "factory_gear_base" + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor, + pose_range: dict = {}, + velocity_range: dict = {}, + gear_pos_range: dict = {}, + ): + """Randomize gear base and gear poses. + + Args: + env: Environment instance + env_ids: Environment IDs to randomize + pose_range: Pose randomization range for base and all gears + velocity_range: Velocity randomization range + gear_pos_range: Additional position randomization for selected gear only + """ + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. 
Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this event term is used." + ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + device = env.device + + # Shared pose samples for all assets + pose_keys = ["x", "y", "z", "roll", "pitch", "yaw"] + range_list_pose = [pose_range.get(key, (0.0, 0.0)) for key in pose_keys] + ranges_pose = torch.tensor(range_list_pose, device=device) + rand_pose_samples = math_utils.sample_uniform( + ranges_pose[:, 0], ranges_pose[:, 1], (len(env_ids), 6), device=device + ) + + orientations_delta = math_utils.quat_from_euler_xyz( + rand_pose_samples[:, 3], rand_pose_samples[:, 4], rand_pose_samples[:, 5] + ) + + # Shared velocity samples + range_list_vel = [velocity_range.get(key, (0.0, 0.0)) for key in pose_keys] + ranges_vel = torch.tensor(range_list_vel, device=device) + rand_vel_samples = math_utils.sample_uniform( + ranges_vel[:, 0], ranges_vel[:, 1], (len(env_ids), 6), device=device + ) + + # Prepare poses for all assets + positions_by_asset = {} + orientations_by_asset = {} + velocities_by_asset = {} + + asset_names_to_process = [self.base_asset_name] + self.gear_asset_names + for asset_name in asset_names_to_process: + asset: RigidObject | Articulation = env.scene[asset_name] + root_states = asset.data.default_root_state[env_ids].clone() + positions = root_states[:, 0:3] + env.scene.env_origins[env_ids] + rand_pose_samples[:, 0:3] + orientations = math_utils.quat_mul(root_states[:, 3:7], orientations_delta) + velocities = root_states[:, 7:13] + rand_vel_samples + positions_by_asset[asset_name] = positions + orientations_by_asset[asset_name] = orientations + velocities_by_asset[asset_name] = velocities + + # Per-env gear offset (gear_pos_range) applied only to selected gear + range_list_gear = [gear_pos_range.get(key, (0.0, 0.0)) for key in ["x", "y", "z"]] + ranges_gear = torch.tensor(range_list_gear, device=device) + rand_gear_offsets = math_utils.sample_uniform( + ranges_gear[:, 0], ranges_gear[:, 1], (len(env_ids), 3), device=device + ) + + # Update gear_type_indices + env_ids_list = env_ids.tolist() + num_reset_envs = len(env_ids) + gear_type_indices = self.gear_type_indices[:num_reset_envs] + + for idx, env_id in enumerate(env_ids_list): + gear_type_indices[idx] = self.gear_type_map[gear_type_manager.get_gear_type(env_id)] + + # Apply offsets using vectorized operations with masks + for gear_idx, asset_name in enumerate(self.gear_asset_names): + if asset_name in positions_by_asset: + mask = gear_type_indices == gear_idx + positions_by_asset[asset_name][mask] = positions_by_asset[asset_name][mask] + rand_gear_offsets[mask] + + # Write to sim + for asset_name in positions_by_asset.keys(): + asset = env.scene[asset_name] + positions = positions_by_asset[asset_name] + orientations = orientations_by_asset[asset_name] + velocities = velocities_by_asset[asset_name] + asset.write_root_pose_to_sim(torch.cat([positions, orientations], dim=-1), env_ids=env_ids) + asset.write_root_velocity_to_sim(velocities, env_ids=env_ids) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/observations.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/observations.py new file mode 100644 index 00000000000..ad29e2f849e --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/observations.py @@ -0,0 +1,290 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers 
(https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Class-based observation terms for the gear assembly manipulation environment.""" + +from __future__ import annotations + +import torch +from typing import TYPE_CHECKING + +from isaaclab.assets import RigidObject +from isaaclab.managers import ManagerTermBase, ObservationTermCfg, SceneEntityCfg +from isaacsim.core.utils.torch.transformations import tf_combine + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedRLEnv + from .events import RandomizeGearType + + +class GearShaftPosW(ManagerTermBase): + """Gear shaft position in world frame with offset applied. + + This class-based term caches gear offset tensors and identity quaternions. + """ + + def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedRLEnv): + """Initialize the gear shaft position observation term. + + Args: + cfg: Observation term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset + self.asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg", SceneEntityCfg("factory_gear_base")) + self.asset: RigidObject = env.scene[self.asset_cfg.name] + + # Pre-cache gear offset tensors (required parameter) + if "gear_offsets" not in cfg.params: + raise ValueError( + "'gear_offsets' parameter is required in GearShaftPosW configuration. " + "It should be a dict with keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + gear_offsets = cfg.params["gear_offsets"] + if not isinstance(gear_offsets, dict): + raise TypeError( + f"'gear_offsets' parameter must be a dict, got {type(gear_offsets).__name__}. " + "It should have keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + + self.gear_offset_tensors = {} + for gear_type in ["gear_small", "gear_medium", "gear_large"]: + if gear_type not in gear_offsets: + raise ValueError( + f"'{gear_type}' offset is required in 'gear_offsets' parameter. " + f"Found keys: {list(gear_offsets.keys())}" + ) + self.gear_offset_tensors[gear_type] = torch.tensor( + gear_offsets[gear_type], device=env.device, dtype=torch.float32 + ) + + # Pre-allocate buffers + self.offsets_buffer = torch.zeros(env.num_envs, 3, device=env.device, dtype=torch.float32) + self.identity_quat = ( + torch.tensor([[1.0, 0.0, 0.0, 0.0]], device=env.device, dtype=torch.float32) + .repeat(env.num_envs, 1) + .contiguous() + ) + + def __call__( + self, + env: ManagerBasedRLEnv, + asset_cfg: SceneEntityCfg = SceneEntityCfg("factory_gear_base"), + gear_offsets: dict | None = None, + ) -> torch.Tensor: + """Compute gear shaft position in world frame. + + Args: + env: Environment instance + asset_cfg: Configuration of the gear base asset (unused, kept for compatibility) + + Returns: + Gear shaft position tensor of shape (num_envs, 3) + """ + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this observation term is used." 
+ ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Get base gear position and orientation + base_pos = self.asset.data.root_pos_w + base_quat = self.asset.data.root_quat_w + + # Update offsets using cached gear offset tensors + for i in range(env.num_envs): + self.offsets_buffer[i] = self.gear_offset_tensors[current_gear_types[i]] + + # Transform offsets + _, shaft_pos = tf_combine(base_quat, base_pos, self.identity_quat, self.offsets_buffer) + + return shaft_pos - env.scene.env_origins + + +class GearShaftQuatW(ManagerTermBase): + """Gear shaft orientation in world frame. + + This class-based term caches the asset reference. + """ + + def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedRLEnv): + """Initialize the gear shaft orientation observation term. + + Args: + cfg: Observation term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset + self.asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg", SceneEntityCfg("factory_gear_base")) + self.asset: RigidObject = env.scene[self.asset_cfg.name] + + def __call__( + self, + env: ManagerBasedRLEnv, + asset_cfg: SceneEntityCfg = SceneEntityCfg("factory_gear_base"), + ) -> torch.Tensor: + """Compute gear shaft orientation in world frame. + + Args: + env: Environment instance + asset_cfg: Configuration of the gear base asset (unused, kept for compatibility) + + Returns: + Gear shaft orientation tensor of shape (num_envs, 4) + """ + # Get base quaternion + base_quat = self.asset.data.root_quat_w + + # Ensure w component is positive + w_negative = base_quat[:, 0] < 0 + positive_quat = base_quat.clone() + positive_quat[w_negative] = -base_quat[w_negative] + + return positive_quat + + +class GearPosW(ManagerTermBase): + """Gear position in world frame. + + This class-based term caches gear type mapping and index tensors. + """ + + def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedRLEnv): + """Initialize the gear position observation term. + + Args: + cfg: Observation term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Pre-allocate gear type mapping and indices + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + def __call__(self, env: ManagerBasedRLEnv) -> torch.Tensor: + """Compute gear position in world frame. + + Args: + env: Environment instance + + Returns: + Gear position tensor of shape (num_envs, 3) + """ + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this observation term is used." 
+ ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Stack all gear positions + all_gear_positions = torch.stack( + [ + self.gear_assets["gear_small"].data.root_pos_w, + self.gear_assets["gear_medium"].data.root_pos_w, + self.gear_assets["gear_large"].data.root_pos_w, + ], + dim=1, + ) + + # Update gear_type_indices + for i in range(env.num_envs): + self.gear_type_indices[i] = self.gear_type_map[current_gear_types[i]] + + # Select gear positions using advanced indexing + gear_positions = all_gear_positions[self.env_indices, self.gear_type_indices] + + return gear_positions - env.scene.env_origins + + +class GearQuatW(ManagerTermBase): + """Gear orientation in world frame. + + This class-based term caches gear type mapping and index tensors. + """ + + def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedRLEnv): + """Initialize the gear orientation observation term. + + Args: + cfg: Observation term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Pre-allocate gear type mapping and indices + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + def __call__(self, env: ManagerBasedRLEnv) -> torch.Tensor: + """Compute gear orientation in world frame. + + Args: + env: Environment instance + + Returns: + Gear orientation tensor of shape (num_envs, 4) + """ + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this observation term is used." 
+ ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Stack all gear quaternions + all_gear_quat = torch.stack( + [ + self.gear_assets["gear_small"].data.root_quat_w, + self.gear_assets["gear_medium"].data.root_quat_w, + self.gear_assets["gear_large"].data.root_quat_w, + ], + dim=1, + ) + + # Update gear_type_indices + for i in range(env.num_envs): + self.gear_type_indices[i] = self.gear_type_map[current_gear_types[i]] + + # Select gear quaternions using advanced indexing + gear_quat = all_gear_quat[self.env_indices, self.gear_type_indices] + + # Ensure w component is positive + w_negative = gear_quat[:, 0] < 0 + gear_positive_quat = gear_quat.clone() + gear_positive_quat[w_negative] = -gear_quat[w_negative] + + return gear_positive_quat diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/rewards.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/rewards.py index 0d14620e225..b0f7f2c8979 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/rewards.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/rewards.py @@ -3,6 +3,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +"""Class-based reward terms for the gear assembly manipulation environment.""" + from __future__ import annotations import torch @@ -10,12 +12,14 @@ from isaacsim.core.utils.torch.transformations import tf_combine -from isaaclab.managers import SceneEntityCfg +from isaaclab.managers import ManagerTermBase, RewardTermCfg, SceneEntityCfg from isaaclab.sensors.frame_transformer.frame_transformer import FrameTransformer if TYPE_CHECKING: from isaaclab.envs import ManagerBasedRLEnv + from .events import RandomizeGearType + def get_keypoint_offsets_full_6d(add_cube_center_kp: bool = False, device: torch.device | None = None) -> torch.Tensor: """Get keypoints for pose alignment comparison. Pose is aligned if all axis are aligned. @@ -33,199 +37,492 @@ def get_keypoint_offsets_full_6d(add_cube_center_kp: bool = False, device: torch keypoint_corners = [[1, 0, 0], [0, 1, 0], [0, 0, 1]] keypoint_corners = torch.tensor(keypoint_corners, device=device, dtype=torch.float32) - keypoint_corners = torch.cat((keypoint_corners, -keypoint_corners[-3:]), dim=0) # use both negative and positive + keypoint_corners = torch.cat((keypoint_corners, -keypoint_corners[-3:]), dim=0) return keypoint_corners -def compute_keypoint_distance( - current_pos: torch.Tensor, - current_quat: torch.Tensor, - target_pos: torch.Tensor, - target_quat: torch.Tensor, - keypoint_scale: float = 1.0, - add_cube_center_kp: bool = True, - device: torch.device | None = None, -) -> torch.Tensor: +class ComputeKeypointDistance(ManagerTermBase): """Compute keypoint distance between current and target poses. - This function creates keypoints from the current and target poses and calculates - the L2 norm distance between corresponding keypoints. The keypoints are created - by applying offsets to the poses and transforming them to world coordinates. 
- - Args: - current_pos: Current position tensor of shape (num_envs, 3) - current_quat: Current quaternion tensor of shape (num_envs, 4) - target_pos: Target position tensor of shape (num_envs, 3) - target_quat: Target quaternion tensor of shape (num_envs, 4) - keypoint_scale: Scale factor for keypoint offsets - add_cube_center_kp: Whether to include the center keypoint (0, 0, 0) - device: Device to create tensors on - - Returns: - Keypoint distance tensor of shape (num_envs, num_keypoints) where each element - is the L2 norm distance between corresponding keypoints + This class-based term pre-caches keypoint offsets and identity quaternions + to avoid repeated allocations during reward computation. """ - if device is None: - device = current_pos.device - - num_envs = current_pos.shape[0] - - # Get keypoint offsets - keypoint_offsets = get_keypoint_offsets_full_6d(add_cube_center_kp, device) - keypoint_offsets = keypoint_offsets * keypoint_scale - num_keypoints = keypoint_offsets.shape[0] - - # Create identity quaternion for transformations - identity_quat = torch.tensor([1.0, 0.0, 0.0, 0.0], device=device).unsqueeze(0).repeat(num_envs, 1) - - # Initialize keypoint tensors - keypoints_current = torch.zeros((num_envs, num_keypoints, 3), device=device) - keypoints_target = torch.zeros((num_envs, num_keypoints, 3), device=device) - - # Compute keypoints for current and target poses - for idx, keypoint_offset in enumerate(keypoint_offsets): - # Transform keypoint offset to world coordinates for current pose - keypoints_current[:, idx] = tf_combine( - current_quat, current_pos, identity_quat, keypoint_offset.repeat(num_envs, 1) - )[1] - - # Transform keypoint offset to world coordinates for target pose - keypoints_target[:, idx] = tf_combine( - target_quat, target_pos, identity_quat, keypoint_offset.repeat(num_envs, 1) - )[1] - # Calculate L2 norm distance between corresponding keypoints - keypoint_dist_sep = torch.norm(keypoints_target - keypoints_current, p=2, dim=-1) - - return keypoint_dist_sep - - -def keypoint_command_error( - env: ManagerBasedRLEnv, - command_name: str, - asset_cfg: SceneEntityCfg, - keypoint_scale: float = 1.0, - add_cube_center_kp: bool = True, -) -> torch.Tensor: - """Compute keypoint distance between current and desired poses from command. - The function computes the keypoint distance between the current pose of the end effector from - the frame transformer sensor and the desired pose from the command. Keypoints are created by - applying offsets to both poses and the distance is computed as the L2-norm between corresponding keypoints. - - Args: - env: The environment containing the asset - command_name: Name of the command containing desired pose - asset_cfg: Configuration of the asset to track (not used, kept for compatibility) - keypoint_scale: Scale factor for keypoint offsets - add_cube_center_kp: Whether to include the center keypoint (0, 0, 0) + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + """Initialize the compute keypoint distance term. 
+ + Args: + cfg: Reward term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Get keypoint configuration + add_cube_center_kp = cfg.params.get("add_cube_center_kp", True) + + # Pre-compute base keypoint offsets (unscaled) + self.keypoint_offsets_base = get_keypoint_offsets_full_6d( + add_cube_center_kp=add_cube_center_kp, device=env.device + ) + self.num_keypoints = self.keypoint_offsets_base.shape[0] + + # Pre-allocate identity quaternion for keypoint transforms + self.identity_quat_keypoints = ( + torch.tensor([[1.0, 0.0, 0.0, 0.0]], device=env.device, dtype=torch.float32) + .repeat(env.num_envs * self.num_keypoints, 1) + .contiguous() + ) + + # Pre-allocate buffer for batched keypoint offsets + self.keypoint_offsets_buffer = torch.zeros( + env.num_envs, self.num_keypoints, 3, device=env.device, dtype=torch.float32 + ) + + def compute( + self, + current_pos: torch.Tensor, + current_quat: torch.Tensor, + target_pos: torch.Tensor, + target_quat: torch.Tensor, + keypoint_scale: float = 1.0, + ) -> torch.Tensor: + """Compute keypoint distance between current and target poses. + + Args: + current_pos: Current position tensor of shape (num_envs, 3) + current_quat: Current quaternion tensor of shape (num_envs, 4) + target_pos: Target position tensor of shape (num_envs, 3) + target_quat: Target quaternion tensor of shape (num_envs, 4) + keypoint_scale: Scale factor for keypoint offsets + + Returns: + Keypoint distance tensor of shape (num_envs, num_keypoints) + """ + num_envs = current_pos.shape[0] + + # Scale keypoint offsets + keypoint_offsets = self.keypoint_offsets_base * keypoint_scale + + # Create batched keypoints (in-place operation) + self.keypoint_offsets_buffer[:num_envs] = keypoint_offsets.unsqueeze(0) + + # Flatten for batch processing + keypoint_offsets_flat = self.keypoint_offsets_buffer[:num_envs].reshape(-1, 3) + identity_quat = self.identity_quat_keypoints[: num_envs * self.num_keypoints] + + # Expand quaternions and positions for all keypoints + current_quat_expanded = current_quat.unsqueeze(1).expand(-1, self.num_keypoints, -1).reshape(-1, 4) + current_pos_expanded = current_pos.unsqueeze(1).expand(-1, self.num_keypoints, -1).reshape(-1, 3) + target_quat_expanded = target_quat.unsqueeze(1).expand(-1, self.num_keypoints, -1).reshape(-1, 4) + target_pos_expanded = target_pos.unsqueeze(1).expand(-1, self.num_keypoints, -1).reshape(-1, 3) + + # Transform all keypoints at once + _, keypoints_current_flat = tf_combine( + current_quat_expanded, current_pos_expanded, identity_quat, keypoint_offsets_flat + ) + _, keypoints_target_flat = tf_combine( + target_quat_expanded, target_pos_expanded, identity_quat, keypoint_offsets_flat + ) + + # Reshape back + keypoints_current = keypoints_current_flat.reshape(num_envs, self.num_keypoints, 3) + keypoints_target = keypoints_target_flat.reshape(num_envs, self.num_keypoints, 3) + + # Calculate L2 norm distance + keypoint_dist_sep = torch.norm(keypoints_target - keypoints_current, p=2, dim=-1) + + return keypoint_dist_sep + + +class KeypointCommandError(ManagerTermBase): + """Compute keypoint distance between current and desired poses from command. - Returns: - Keypoint distance tensor of shape (num_envs, num_keypoints) where each element - is the L2 norm distance between corresponding keypoints + This class-based term uses ComputeKeypointDistance internally. 
""" - # extract the frame transformer sensor - asset: FrameTransformer = env.scene[asset_cfg.name] - command = env.command_manager.get_command(command_name) - - # obtain the desired pose from command (position and orientation) - des_pos_b = command[:, :3] - des_quat_b = command[:, 3:7] - - # transform desired pose to world frame using source frame from frame transformer - des_pos_w = des_pos_b - des_quat_w = des_quat_b - - # get current pose in world frame from frame transformer (end effector pose) - curr_pos_w = asset.data.target_pos_source[:, 0] # First target frame is end_effector - curr_quat_w = asset.data.target_quat_source[:, 0] # First target frame is end_effector - - # compute keypoint distance - keypoint_dist_sep = compute_keypoint_distance( - current_pos=curr_pos_w, - current_quat=curr_quat_w, - target_pos=des_pos_w, - target_quat=des_quat_w, - keypoint_scale=keypoint_scale, - add_cube_center_kp=add_cube_center_kp, - device=curr_pos_w.device, - ) - - # Return mean distance across keypoints to match expected reward shape (num_envs,) - return keypoint_dist_sep.mean(-1) - - -def keypoint_command_error_exp( - env: ManagerBasedRLEnv, - command_name: str, - asset_cfg: SceneEntityCfg, - kp_exp_coeffs: list[tuple[float, float]] = [(1.0, 0.1)], - kp_use_sum_of_exps: bool = True, - keypoint_scale: float = 1.0, - add_cube_center_kp: bool = True, -) -> torch.Tensor: + + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + """Initialize the keypoint command error term. + + Args: + cfg: Reward term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset configuration + self.asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg", SceneEntityCfg("ee_frame")) + self.command_name: str = cfg.params.get("command_name", "ee_pose") + + # Create keypoint distance computer + self.keypoint_computer = ComputeKeypointDistance(cfg, env) + + def __call__( + self, + env: ManagerBasedRLEnv, + command_name: str, + asset_cfg: SceneEntityCfg, + keypoint_scale: float = 1.0, + add_cube_center_kp: bool = True, + ) -> torch.Tensor: + """Compute keypoint distance error. + + Args: + env: Environment instance + command_name: Name of the command containing desired pose + asset_cfg: Configuration of the asset to track + keypoint_scale: Scale factor for keypoint offsets + add_cube_center_kp: Whether to include center keypoint + + Returns: + Mean keypoint distance tensor of shape (num_envs,) + """ + # Extract frame transformer sensor + asset: FrameTransformer = env.scene[asset_cfg.name] + command = env.command_manager.get_command(command_name) + + # Get desired pose from command + des_pos_w = command[:, :3] + des_quat_w = command[:, 3:7] + + # Get current pose from frame transformer + curr_pos_w = asset.data.target_pos_source[:, 0] + curr_quat_w = asset.data.target_quat_source[:, 0] + + # Compute keypoint distance + keypoint_dist_sep = self.keypoint_computer.compute( + current_pos=curr_pos_w, + current_quat=curr_quat_w, + target_pos=des_pos_w, + target_quat=des_quat_w, + keypoint_scale=keypoint_scale, + ) + + return keypoint_dist_sep.mean(-1) + + +class KeypointCommandErrorExp(ManagerTermBase): """Compute exponential keypoint reward between current and desired poses from command. - The function computes the keypoint distance between the current pose of the end effector from - the frame transformer sensor and the desired pose from the command, then applies an exponential - reward function. 
The reward is computed using the formula: 1 / (exp(a * distance) + b + exp(-a * distance)) - where a and b are coefficients. + This class-based term uses ComputeKeypointDistance internally and applies + exponential reward transformation. + """ - Args: - env: The environment containing the asset - command_name: Name of the command containing desired pose - asset_cfg: Configuration of the asset to track (not used, kept for compatibility) - kp_exp_coeffs: List of (a, b) coefficient pairs for exponential reward - kp_use_sum_of_exps: Whether to use sum of exponentials (True) or single exponential (False) - keypoint_scale: Scale factor for keypoint offsets - add_cube_center_kp: Whether to include the center keypoint (0, 0, 0) + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + """Initialize the keypoint command error exponential term. + + Args: + cfg: Reward term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset configuration + self.asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg", SceneEntityCfg("ee_frame")) + self.command_name: str = cfg.params.get("command_name", "ee_pose") + + # Create keypoint distance computer + self.keypoint_computer = ComputeKeypointDistance(cfg, env) + + def __call__( + self, + env: ManagerBasedRLEnv, + command_name: str, + asset_cfg: SceneEntityCfg, + kp_exp_coeffs: list[tuple[float, float]] = [(1.0, 0.1)], + kp_use_sum_of_exps: bool = True, + keypoint_scale: float = 1.0, + add_cube_center_kp: bool = True, + ) -> torch.Tensor: + """Compute exponential keypoint reward. + + Args: + env: Environment instance + command_name: Name of the command containing desired pose + asset_cfg: Configuration of the asset to track + kp_exp_coeffs: List of (a, b) coefficient pairs for exponential reward + kp_use_sum_of_exps: Whether to use sum of exponentials + keypoint_scale: Scale factor for keypoint offsets + add_cube_center_kp: Whether to include center keypoint + + Returns: + Exponential keypoint reward tensor of shape (num_envs,) + """ + # Extract frame transformer sensor + asset: FrameTransformer = env.scene[asset_cfg.name] + command = env.command_manager.get_command(command_name) + + # Get desired pose from command + des_pos_w = command[:, :3] + des_quat_w = command[:, 3:7] + + # Get current pose from frame transformer + curr_pos_w = asset.data.target_pos_source[:, 0] + curr_quat_w = asset.data.target_quat_source[:, 0] + + # Compute keypoint distance + keypoint_dist_sep = self.keypoint_computer.compute( + current_pos=curr_pos_w, + current_quat=curr_quat_w, + target_pos=des_pos_w, + target_quat=des_quat_w, + keypoint_scale=keypoint_scale, + ) + + # Compute exponential reward + keypoint_reward_exp = torch.zeros_like(keypoint_dist_sep[:, 0]) + + if kp_use_sum_of_exps: + for coeff in kp_exp_coeffs: + a, b = coeff + keypoint_reward_exp += ( + 1.0 / (torch.exp(a * keypoint_dist_sep) + b + torch.exp(-a * keypoint_dist_sep)) + ).mean(-1) + else: + keypoint_dist = keypoint_dist_sep.mean(-1) + for coeff in kp_exp_coeffs: + a, b = coeff + keypoint_reward_exp += 1.0 / (torch.exp(a * keypoint_dist) + b + torch.exp(-a * keypoint_dist)) + + return keypoint_reward_exp + + +class KeypointEntityError(ManagerTermBase): + """Compute keypoint distance between a RigidObject and the dynamically selected gear. + + This class-based term pre-caches gear type mapping and asset references. 
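    A minimal configuration sketch (illustrative only: the term name, weight, and scale
    are placeholders, and it assumes this class is exposed through the same ``mdp``
    module as the command-error terms):

    .. code-block:: python

        gear_base_keypoint_tracking = RewTerm(
            func=mdp.KeypointEntityError,
            weight=-1.0,
            params={
                "asset_cfg_1": SceneEntityCfg("factory_gear_base"),
                "keypoint_scale": 1.0,
                "add_cube_center_kp": True,
            },
        )

    The second pose in the keypoint pair is not configured here: it is resolved at run
    time from the gear type selected by the ``RandomizeGearType`` event.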
+ """ - Returns: - Exponential keypoint reward tensor of shape (num_envs,) where each element - is the exponential reward value + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + """Initialize the keypoint entity error term. + + Args: + cfg: Reward term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset configuration + self.asset_cfg_1: SceneEntityCfg = cfg.params.get("asset_cfg_1", SceneEntityCfg("factory_gear_base")) + self.asset_1 = env.scene[self.asset_cfg_1.name] + + # Pre-allocate gear type mapping and indices + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + # Create keypoint distance computer + self.keypoint_computer = ComputeKeypointDistance(cfg, env) + + def __call__( + self, + env: ManagerBasedRLEnv, + asset_cfg_1: SceneEntityCfg, + keypoint_scale: float = 1.0, + add_cube_center_kp: bool = True, + ) -> torch.Tensor: + """Compute keypoint distance error. + + Args: + env: Environment instance + asset_cfg_1: Configuration of the first asset (RigidObject) + keypoint_scale: Scale factor for keypoint offsets + add_cube_center_kp: Whether to include center keypoint + + Returns: + Mean keypoint distance tensor of shape (num_envs,) + """ + # Get current pose of asset_1 (RigidObject) + curr_pos_1 = self.asset_1.data.body_pos_w[:, 0] + curr_quat_1 = self.asset_1.data.body_quat_w[:, 0] + + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this reward term is used." + ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Stack all gear positions and quaternions + all_gear_pos = torch.stack( + [ + self.gear_assets["gear_small"].data.body_pos_w[:, 0], + self.gear_assets["gear_medium"].data.body_pos_w[:, 0], + self.gear_assets["gear_large"].data.body_pos_w[:, 0], + ], + dim=1, + ) + + all_gear_quat = torch.stack( + [ + self.gear_assets["gear_small"].data.body_quat_w[:, 0], + self.gear_assets["gear_medium"].data.body_quat_w[:, 0], + self.gear_assets["gear_large"].data.body_quat_w[:, 0], + ], + dim=1, + ) + + # Update gear_type_indices + for i in range(env.num_envs): + gear_type = current_gear_types[i] + if gear_type not in self.gear_type_map: + raise ValueError( + f"Invalid gear type '{gear_type}' for environment {i}. 
" + f"Valid types are: {list(self.gear_type_map.keys())}" + ) + self.gear_type_indices[i] = self.gear_type_map[gear_type] + + # Select positions and quaternions using advanced indexing + curr_pos_2 = all_gear_pos[self.env_indices, self.gear_type_indices] + curr_quat_2 = all_gear_quat[self.env_indices, self.gear_type_indices] + + # Compute keypoint distance + keypoint_dist_sep = self.keypoint_computer.compute( + current_pos=curr_pos_1, + current_quat=curr_quat_1, + target_pos=curr_pos_2, + target_quat=curr_quat_2, + keypoint_scale=keypoint_scale, + ) + + return keypoint_dist_sep.mean(-1) + + +class KeypointEntityErrorExp(ManagerTermBase): + """Compute exponential keypoint reward between a RigidObject and the dynamically selected gear. + + This class-based term pre-caches gear type mapping and asset references. """ - # extract the frame transformer sensor - asset: FrameTransformer = env.scene[asset_cfg.name] - command = env.command_manager.get_command(command_name) - - # obtain the desired pose from command (position and orientation) - des_pos_b = command[:, :3] - des_quat_b = command[:, 3:7] - - # transform desired pose to world frame using source frame from frame transformer - des_pos_w = des_pos_b - des_quat_w = des_quat_b - - # get current pose in world frame from frame transformer (end effector pose) - curr_pos_w = asset.data.target_pos_source[:, 0] # First target frame is end_effector - curr_quat_w = asset.data.target_quat_source[:, 0] # First target frame is end_effector - - # compute keypoint distance - keypoint_dist_sep = compute_keypoint_distance( - current_pos=curr_pos_w, - current_quat=curr_quat_w, - target_pos=des_pos_w, - target_quat=des_quat_w, - keypoint_scale=keypoint_scale, - add_cube_center_kp=add_cube_center_kp, - device=curr_pos_w.device, - ) - - # compute exponential reward - keypoint_reward_exp = torch.zeros_like(keypoint_dist_sep[:, 0]) # shape: (num_envs,) - - if kp_use_sum_of_exps: - # Use sum of exponentials: average across keypoints for each coefficient - for coeff in kp_exp_coeffs: - a, b = coeff - keypoint_reward_exp += ( - 1.0 / (torch.exp(a * keypoint_dist_sep) + b + torch.exp(-a * keypoint_dist_sep)) - ).mean(-1) - else: - # Use single exponential: average keypoint distance first, then apply exponential - keypoint_dist = keypoint_dist_sep.mean(-1) # shape: (num_envs,) - for coeff in kp_exp_coeffs: - a, b = coeff - keypoint_reward_exp += 1.0 / (torch.exp(a * keypoint_dist) + b + torch.exp(-a * keypoint_dist)) - return keypoint_reward_exp + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + """Initialize the keypoint entity error exponential term. 
+ + Args: + cfg: Reward term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Cache asset configuration + self.asset_cfg_1: SceneEntityCfg = cfg.params.get("asset_cfg_1", SceneEntityCfg("factory_gear_base")) + self.asset_1 = env.scene[self.asset_cfg_1.name] + + # Pre-allocate gear type mapping and indices + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + # Create keypoint distance computer + self.keypoint_computer = ComputeKeypointDistance(cfg, env) + + def __call__( + self, + env: ManagerBasedRLEnv, + asset_cfg_1: SceneEntityCfg, + kp_exp_coeffs: list[tuple[float, float]] = [(1.0, 0.1)], + kp_use_sum_of_exps: bool = True, + keypoint_scale: float = 1.0, + add_cube_center_kp: bool = True, + ) -> torch.Tensor: + """Compute exponential keypoint reward. + + Args: + env: Environment instance + asset_cfg_1: Configuration of the first asset (RigidObject) + kp_exp_coeffs: List of (a, b) coefficient pairs for exponential reward + kp_use_sum_of_exps: Whether to use sum of exponentials + keypoint_scale: Scale factor for keypoint offsets + add_cube_center_kp: Whether to include center keypoint + + Returns: + Exponential keypoint reward tensor of shape (num_envs,) + """ + # Get current pose of asset_1 (RigidObject) + curr_pos_1 = self.asset_1.data.body_pos_w[:, 0] + curr_quat_1 = self.asset_1.data.body_quat_w[:, 0] + + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this reward term is used." + ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Stack all gear positions and quaternions + all_gear_pos = torch.stack( + [ + self.gear_assets["gear_small"].data.body_pos_w[:, 0], + self.gear_assets["gear_medium"].data.body_pos_w[:, 0], + self.gear_assets["gear_large"].data.body_pos_w[:, 0], + ], + dim=1, + ) + + all_gear_quat = torch.stack( + [ + self.gear_assets["gear_small"].data.body_quat_w[:, 0], + self.gear_assets["gear_medium"].data.body_quat_w[:, 0], + self.gear_assets["gear_large"].data.body_quat_w[:, 0], + ], + dim=1, + ) + + # Update gear_type_indices + for i in range(env.num_envs): + gear_type = current_gear_types[i] + if gear_type not in self.gear_type_map: + raise ValueError( + f"Invalid gear type '{gear_type}' for environment {i}. 
" + f"Valid types are: {list(self.gear_type_map.keys())}" + ) + self.gear_type_indices[i] = self.gear_type_map[gear_type] + + # Select positions and quaternions using advanced indexing + curr_pos_2 = all_gear_pos[self.env_indices, self.gear_type_indices] + curr_quat_2 = all_gear_quat[self.env_indices, self.gear_type_indices] + + # Compute keypoint distance + keypoint_dist_sep = self.keypoint_computer.compute( + current_pos=curr_pos_1, + current_quat=curr_quat_1, + target_pos=curr_pos_2, + target_quat=curr_quat_2, + keypoint_scale=keypoint_scale, + ) + + # Compute exponential reward + keypoint_reward_exp = torch.zeros_like(keypoint_dist_sep[:, 0]) + + if kp_use_sum_of_exps: + for coeff in kp_exp_coeffs: + a, b = coeff + keypoint_reward_exp += ( + 1.0 / (torch.exp(a * keypoint_dist_sep) + b + torch.exp(-a * keypoint_dist_sep)) + ).mean(-1) + else: + keypoint_dist = keypoint_dist_sep.mean(-1) + for coeff in kp_exp_coeffs: + a, b = coeff + keypoint_reward_exp += 1.0 / (torch.exp(a * keypoint_dist) + b + torch.exp(-a * keypoint_dist)) + + return keypoint_reward_exp diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/terminations.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/terminations.py new file mode 100644 index 00000000000..11c32371a42 --- /dev/null +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/mdp/terminations.py @@ -0,0 +1,326 @@ +# Copyright (c) 2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Class-based termination terms specific to the gear assembly manipulation environments.""" + +from __future__ import annotations + +import torch +from typing import TYPE_CHECKING + +import carb + +import isaaclab.utils.math as math_utils +from isaaclab.assets import Articulation +from isaaclab.managers import ManagerTermBase, SceneEntityCfg, TerminationTermCfg + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + + from .events import RandomizeGearType + + +class ResetWhenGearDropped(ManagerTermBase): + """Check if the gear has fallen out of the gripper and return reset flags. + + This class-based term pre-caches all required tensors and gear offsets. + """ + + def __init__(self, cfg: TerminationTermCfg, env: ManagerBasedEnv): + """Initialize the reset when gear dropped term. + + Args: + cfg: Termination term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Get robot asset configuration + self.robot_asset_cfg: SceneEntityCfg = cfg.params.get("robot_asset_cfg", SceneEntityCfg("robot")) + self.robot_asset: Articulation = env.scene[self.robot_asset_cfg.name] + + # Validate required parameters + if "end_effector_body_name" not in cfg.params: + raise ValueError( + "'end_effector_body_name' parameter is required in ResetWhenGearDropped configuration. " + "Example: 'wrist_3_link'" + ) + if "grasp_rot_offset" not in cfg.params: + raise ValueError( + "'grasp_rot_offset' parameter is required in ResetWhenGearDropped configuration. " + "It should be a quaternion [w, x, y, z]. Example: [0.0, 0.707, 0.707, 0.0]" + ) + + self.end_effector_body_name = cfg.params["end_effector_body_name"] + + # Pre-cache gear grasp offsets as tensors (required parameter) + if "gear_offsets_grasp" not in cfg.params: + raise ValueError( + "'gear_offsets_grasp' parameter is required in ResetWhenGearDropped configuration. 
" + "It should be a dict with keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + gear_offsets_grasp = cfg.params["gear_offsets_grasp"] + if not isinstance(gear_offsets_grasp, dict): + raise TypeError( + f"'gear_offsets_grasp' parameter must be a dict, got {type(gear_offsets_grasp).__name__}. " + "It should have keys 'gear_small', 'gear_medium', 'gear_large' mapping to [x, y, z] offsets." + ) + + self.gear_grasp_offset_tensors = {} + for gear_type in ["gear_small", "gear_medium", "gear_large"]: + if gear_type not in gear_offsets_grasp: + raise ValueError( + f"'{gear_type}' offset is required in 'gear_offsets_grasp' parameter. " + f"Found keys: {list(gear_offsets_grasp.keys())}" + ) + self.gear_grasp_offset_tensors[gear_type] = torch.tensor( + gear_offsets_grasp[gear_type], device=env.device, dtype=torch.float32 + ) + + # Pre-cache grasp rotation offset tensor + grasp_rot_offset = cfg.params["grasp_rot_offset"] + self.grasp_rot_offset_tensor = ( + torch.tensor(grasp_rot_offset, device=env.device, dtype=torch.float32).unsqueeze(0).repeat(env.num_envs, 1) + ) + + # Pre-allocate buffers + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + self.gear_grasp_offsets_buffer = torch.zeros(env.num_envs, 3, device=env.device, dtype=torch.float32) + self.all_gear_pos_buffer = torch.zeros(env.num_envs, 3, 3, device=env.device, dtype=torch.float32) + self.all_gear_quat_buffer = torch.zeros(env.num_envs, 3, 4, device=env.device, dtype=torch.float32) + self.reset_flags = torch.zeros(env.num_envs, dtype=torch.bool, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + # Find end effector index once + eef_indices, _ = self.robot_asset.find_bodies([self.end_effector_body_name]) + if len(eef_indices) == 0: + carb.log_warn( + f"{self.end_effector_body_name} not found in robot body names. Cannot check gear drop condition." + ) + self.eef_idx = None + else: + self.eef_idx = eef_indices[0] + + def __call__( + self, + env: ManagerBasedEnv, + distance_threshold: float = 0.1, + robot_asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + gear_offsets_grasp: dict | None = None, + end_effector_body_name: str | None = None, + grasp_rot_offset: list | None = None, + ) -> torch.Tensor: + """Check if gear has dropped and return reset flags. + + Args: + env: Environment instance + distance_threshold: Maximum allowed distance between gear grasp point and gripper + robot_asset_cfg: Configuration for the robot asset (unused, kept for compatibility) + + Returns: + Boolean tensor indicating which environments should be reset + """ + # Reset flags + self.reset_flags.fill_(False) + + if self.eef_idx is None: + return self.reset_flags + + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this termination term is used." 
+ ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Get end effector position + eef_pos_world = self.robot_asset.data.body_link_pos_w[:, self.eef_idx] + + # Update gear positions and quaternions in buffers + self.all_gear_pos_buffer[:, 0, :] = self.gear_assets["gear_small"].data.root_link_pos_w + self.all_gear_pos_buffer[:, 1, :] = self.gear_assets["gear_medium"].data.root_link_pos_w + self.all_gear_pos_buffer[:, 2, :] = self.gear_assets["gear_large"].data.root_link_pos_w + + self.all_gear_quat_buffer[:, 0, :] = self.gear_assets["gear_small"].data.root_link_quat_w + self.all_gear_quat_buffer[:, 1, :] = self.gear_assets["gear_medium"].data.root_link_quat_w + self.all_gear_quat_buffer[:, 2, :] = self.gear_assets["gear_large"].data.root_link_quat_w + + # Update gear_type_indices + for i in range(env.num_envs): + self.gear_type_indices[i] = self.gear_type_map[current_gear_types[i]] + + # Select gear data using advanced indexing + gear_pos_world = self.all_gear_pos_buffer[self.env_indices, self.gear_type_indices] + gear_quat_world = self.all_gear_quat_buffer[self.env_indices, self.gear_type_indices] + + # Apply rotation offset + gear_quat_world = math_utils.quat_mul(gear_quat_world, self.grasp_rot_offset_tensor) + + # Get grasp offsets + for i in range(env.num_envs): + self.gear_grasp_offsets_buffer[i] = self.gear_grasp_offset_tensors[current_gear_types[i]] + + # Transform grasp offsets to world frame + gear_grasp_pos_world = gear_pos_world + math_utils.quat_apply(gear_quat_world, self.gear_grasp_offsets_buffer) + + # Compute distances + distances = torch.norm(gear_grasp_pos_world - eef_pos_world, dim=-1) + + # Check distance threshold + self.reset_flags[:] = distances > distance_threshold + + return self.reset_flags + + +class ResetWhenGearOrientationExceedsThreshold(ManagerTermBase): + """Check if the gear's orientation relative to the gripper exceeds thresholds. + + This class-based term pre-caches all required tensors and thresholds. + """ + + def __init__(self, cfg: TerminationTermCfg, env: ManagerBasedEnv): + """Initialize the reset when gear orientation exceeds threshold term. + + Args: + cfg: Termination term configuration + env: Environment instance + """ + super().__init__(cfg, env) + + # Get robot asset configuration + self.robot_asset_cfg: SceneEntityCfg = cfg.params.get("robot_asset_cfg", SceneEntityCfg("robot")) + self.robot_asset: Articulation = env.scene[self.robot_asset_cfg.name] + + # Validate required parameters + if "end_effector_body_name" not in cfg.params: + raise ValueError( + "'end_effector_body_name' parameter is required in ResetWhenGearOrientationExceedsThreshold" + " configuration. Example: 'wrist_3_link'" + ) + if "grasp_rot_offset" not in cfg.params: + raise ValueError( + "'grasp_rot_offset' parameter is required in ResetWhenGearOrientationExceedsThreshold configuration. " + "It should be a quaternion [w, x, y, z]. 
Example: [0.0, 0.707, 0.707, 0.0]" + ) + + self.end_effector_body_name = cfg.params["end_effector_body_name"] + + # Pre-cache grasp rotation offset tensor + grasp_rot_offset = cfg.params["grasp_rot_offset"] + self.grasp_rot_offset_tensor = ( + torch.tensor(grasp_rot_offset, device=env.device, dtype=torch.float32).unsqueeze(0).repeat(env.num_envs, 1) + ) + + # Pre-allocate buffers + self.gear_type_map = {"gear_small": 0, "gear_medium": 1, "gear_large": 2} + self.gear_type_indices = torch.zeros(env.num_envs, device=env.device, dtype=torch.long) + self.env_indices = torch.arange(env.num_envs, device=env.device) + self.all_gear_quat_buffer = torch.zeros(env.num_envs, 3, 4, device=env.device, dtype=torch.float32) + self.reset_flags = torch.zeros(env.num_envs, dtype=torch.bool, device=env.device) + + # Cache gear assets + self.gear_assets = { + "gear_small": env.scene["factory_gear_small"], + "gear_medium": env.scene["factory_gear_medium"], + "gear_large": env.scene["factory_gear_large"], + } + + # Find end effector index once + eef_indices, _ = self.robot_asset.find_bodies([self.end_effector_body_name]) + if len(eef_indices) == 0: + carb.log_warn( + f"{self.end_effector_body_name} not found in robot body names. Cannot check gear orientation condition." + ) + self.eef_idx = None + else: + self.eef_idx = eef_indices[0] + + def __call__( + self, + env: ManagerBasedEnv, + roll_threshold_deg: float = 30.0, + pitch_threshold_deg: float = 30.0, + yaw_threshold_deg: float = 180.0, + robot_asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + end_effector_body_name: str | None = None, + grasp_rot_offset: list | None = None, + ) -> torch.Tensor: + """Check if gear orientation exceeds thresholds and return reset flags. + + Args: + env: Environment instance + roll_threshold_deg: Maximum allowed roll angle deviation in degrees + pitch_threshold_deg: Maximum allowed pitch angle deviation in degrees + yaw_threshold_deg: Maximum allowed yaw angle deviation in degrees + robot_asset_cfg: Configuration for the robot asset (unused, kept for compatibility) + + Returns: + Boolean tensor indicating which environments should be reset + """ + # Reset flags + self.reset_flags.fill_(False) + + if self.eef_idx is None: + return self.reset_flags + + # Check if gear type manager exists + if not hasattr(env, "_gear_type_manager"): + raise RuntimeError( + "Gear type manager not initialized. Ensure RandomizeGearType event is configured " + "in your environment's event configuration before this termination term is used." 
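        # Illustrative configuration sketches for the two termination terms in this
        # module. Threshold and distance values mirror the call defaults; the body
        # name, rotation offset, and offset keys reuse the examples from the
        # validation messages; the gear grasp offsets are placeholders; exposure
        # through the task's ``mdp`` module is assumed.
        #
        #     gear_dropped = TerminationTermCfg(
        #         func=mdp.ResetWhenGearDropped,
        #         params={
        #             "distance_threshold": 0.1,
        #             "robot_asset_cfg": SceneEntityCfg("robot"),
        #             "end_effector_body_name": "wrist_3_link",
        #             "grasp_rot_offset": [0.0, 0.707, 0.707, 0.0],
        #             "gear_offsets_grasp": {
        #                 "gear_small": [0.0, 0.0, 0.0],
        #                 "gear_medium": [0.0, 0.0, 0.0],
        #                 "gear_large": [0.0, 0.0, 0.0],
        #             },
        #         },
        #     )
        #
        #     gear_orientation_limit = TerminationTermCfg(
        #         func=mdp.ResetWhenGearOrientationExceedsThreshold,
        #         params={
        #             "roll_threshold_deg": 30.0,
        #             "pitch_threshold_deg": 30.0,
        #             "yaw_threshold_deg": 180.0,
        #             "robot_asset_cfg": SceneEntityCfg("robot"),
        #             "end_effector_body_name": "wrist_3_link",
        #             "grasp_rot_offset": [0.0, 0.707, 0.707, 0.0],
        #         },
        #     )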
+ ) + + gear_type_manager: RandomizeGearType = env._gear_type_manager + current_gear_types = gear_type_manager.get_all_gear_types() + + # Convert thresholds to radians + roll_threshold_rad = torch.deg2rad(torch.tensor(roll_threshold_deg, device=env.device)) + pitch_threshold_rad = torch.deg2rad(torch.tensor(pitch_threshold_deg, device=env.device)) + yaw_threshold_rad = torch.deg2rad(torch.tensor(yaw_threshold_deg, device=env.device)) + + # Get end effector orientation + eef_quat_world = self.robot_asset.data.body_link_quat_w[:, self.eef_idx] + + # Update gear quaternions in buffer + self.all_gear_quat_buffer[:, 0, :] = self.gear_assets["gear_small"].data.root_link_quat_w + self.all_gear_quat_buffer[:, 1, :] = self.gear_assets["gear_medium"].data.root_link_quat_w + self.all_gear_quat_buffer[:, 2, :] = self.gear_assets["gear_large"].data.root_link_quat_w + + # Update gear_type_indices + for i in range(env.num_envs): + self.gear_type_indices[i] = self.gear_type_map[current_gear_types[i]] + + # Select gear data using advanced indexing + gear_quat_world = self.all_gear_quat_buffer[self.env_indices, self.gear_type_indices] + + # Apply rotation offset + gear_quat_world = math_utils.quat_mul(gear_quat_world, self.grasp_rot_offset_tensor) + + # Compute relative orientation: q_rel = q_gear * q_eef^-1 + eef_quat_inv = math_utils.quat_conjugate(eef_quat_world) + relative_quat = math_utils.quat_mul(gear_quat_world, eef_quat_inv) + + # Convert relative quaternion to Euler angles + roll, pitch, yaw = math_utils.euler_xyz_from_quat(relative_quat) + + # Check if any angle exceeds its threshold + self.reset_flags[:] = ( + (torch.abs(roll) > roll_threshold_rad) + | (torch.abs(pitch) > pitch_threshold_rad) + | (torch.abs(yaw) > yaw_threshold_rad) + ) + + return self.reset_flags diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/reach/reach_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/reach/reach_env_cfg.py index 767de2160e5..b3106110fd0 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/reach/reach_env_cfg.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/deploy/reach/reach_env_cfg.py @@ -153,7 +153,7 @@ class RewardsCfg: """Reward terms for the MDP.""" end_effector_keypoint_tracking = RewTerm( - func=mdp.keypoint_command_error, + func=mdp.KeypointCommandError, weight=-1.5, params={ "asset_cfg": SceneEntityCfg("ee_frame"), @@ -162,7 +162,7 @@ class RewardsCfg: }, ) end_effector_keypoint_tracking_exp = RewTerm( - func=mdp.keypoint_command_error_exp, + func=mdp.KeypointCommandErrorExp, weight=1.5, params={ "asset_cfg": SceneEntityCfg("ee_frame"),