Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delay #1621

Closed
wants to merge 5 commits into from
Closed

delay #1621

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,41 +1,14 @@
from pathlib import Path
from typing import Any, ClassVar, override

import mujoco
import numpy as np
from gymnasium.envs.mujoco.mujoco_env import MujocoEnv
from gymnasium.spaces import Box
from nao_interface import Nao
from numpy.typing import NDArray
from throwing import ThrowableObject

SENSOR_NAMES = [
"accelerometer",
"gyroscope",
"head.yaw",
"head.pitch",
"left_leg.hip_yaw_pitch",
"left_leg.hip_roll",
"left_leg.hip_pitch",
"left_leg.knee_pitch",
"left_leg.ankle_pitch",
"left_leg.ankle_roll",
"right_leg.hip_roll",
"right_leg.hip_pitch",
"right_leg.knee_pitch",
"right_leg.ankle_pitch",
"right_leg.ankle_roll",
"left_arm.shoulder_pitch",
"left_arm.shoulder_roll",
"left_arm.elbow_yaw",
"left_arm.elbow_roll",
"left_arm.wrist_yaw",
"right_arm.shoulder_pitch",
"right_arm.shoulder_roll",
"right_arm.elbow_yaw",
"right_arm.elbow_roll",
"right_arm.wrist_yaw",
]

ACTUATOR_NAMES = [
"left_leg.hip_yaw_pitch",
"left_leg.hip_roll",
Expand Down Expand Up @@ -83,6 +56,7 @@ def __init__(
*,
throw_tomatoes: bool = False,
fsr_scale: float = 0.019,
sensor_delay: int = 0,
**kwargs: Any,
) -> None:
observation_space = Box(
Expand All @@ -109,7 +83,15 @@ def __init__(
)
self._actuation_mask = self._get_actuation_mask()
self.action_space_size = len(ACTUATOR_NAMES)
self.nao = Nao(self.model, self.data, fsr_scale=fsr_scale)
self.nao = Nao(
self.model,
self.data,
fsr_scale=fsr_scale,
position_sensor_delay=sensor_delay,
fsr_sensor_delay=sensor_delay,
gyroscope_sensor_delay=sensor_delay,
accelerometer_sensor_delay=sensor_delay,
)

def _get_actuation_mask(self) -> NDArray[np.bool_]:
actuation_mask = np.zeros(self.model.nu, dtype=np.bool_)
Expand All @@ -131,26 +113,39 @@ def _set_action_space(self) -> Box:
return self.action_space

def _get_obs(self) -> NDArray[np.floating]:
force_sensing_resistors_right = self.nao.right_fsr_values().sum()
force_sensing_resistors_left = self.nao.left_fsr_values().sum()
force_sensing_resistors_left = self.nao.left_fsr().sum(keepdims=True)
force_sensing_resistors_right = self.nao.right_fsr().sum(keepdims=True)

sensors = np.concatenate(
positions = self.nao.position_encoders()
gyroscopes = self.nao.gyroscope()
accelerometer = self.nao.accelerometer()

return np.concatenate(
[
self.data.sensor(sensor_name).data
for sensor_name in SENSOR_NAMES
],
)
fsrs = np.array(
[force_sensing_resistors_right, force_sensing_resistors_left],
positions,
gyroscopes,
accelerometer,
force_sensing_resistors_left,
force_sensing_resistors_right,
]
)

return np.concatenate([sensors, fsrs])
@override
def _step_mujoco_simulation(self, ctrl: NDArray, n_frames: int) -> None:
self.nao.data.ctrl[self._actuation_mask] = ctrl

mujoco.mj_step(self.model, self.data, nstep=n_frames)

# As of MuJoCo 2.0, force-related quantities like cacc are not computed
# unless there's a force sensor in the model.
# See https://github.com/openai/gym/issues/1541
mujoco.mj_rnePostConstraint(self.model, self.data)

@override
def do_simulation(
self,
ctrl: NDArray[np.floating],
n_frames: int,
) -> None:
self.data.ctrl[self._actuation_mask] = ctrl
self._step_mujoco_simulation(self.data.ctrl, n_frames)
self._step_mujoco_simulation(ctrl, n_frames)
self.nao.update_sensors()
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import numpy as np
from gymnasium import utils
from nao_interface.poses import PENALIZED_POSE
from nao_interface.poses import PENALIZED_POSE, READY_POSE
from numpy.typing import NDArray
from rewards import (
ConstantReward,
Expand All @@ -17,16 +17,16 @@
OFFSET_QPOS = np.array(
[
0.0,
0.0,
0.09,
-0.06,
0.01,
-0.002,
0.0,
0.09,
-0.06,
0.01,
0.002,
0.010821502783627257,
-0.3107718500421241,
0.8246279211008891,
-0.513856071058765,
-0.010821502783627453,
-0.010821502783627257,
-0.3107718500421241,
0.8246279211008891,
-0.513856071058765,
0.010821502783627453,
1.57,
0.1,
-1.57,
Expand All @@ -52,6 +52,7 @@ def __init__(
) -> None:
super().__init__(
throw_tomatoes=throw_tomatoes,
sensor_delay=3,
**kwargs,
)

Expand Down Expand Up @@ -110,5 +111,5 @@ def reset_model(self) -> NDArray[np.floating]:
self.init_qpos,
self.init_qvel,
)
self.nao.reset(PENALIZED_POSE)
self.nao.reset(READY_POSE)
return self._get_obs()
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class NaoWalking(NaoBaseEnv, utils.EzPickle):
def __init__(self, *, throw_tomatoes: bool, **kwargs: Any) -> None:
super().__init__(
throw_tomatoes=throw_tomatoes,
sensor_delay=3,
**kwargs,
)

Expand Down Expand Up @@ -116,7 +117,15 @@ def __init__(self, *, throw_tomatoes: bool, **kwargs: Any) -> None:
utils.EzPickle.__init__(self, **kwargs)

@override
def step(self, action: NDArray[np.floating]) -> tuple:
def step(
self, action: NDArray[np.floating]
) -> tuple[
NDArray[np.float64],
np.float64,
bool,
bool,
dict[str, np.float64],
]:
robot_position = self.data.site("Robot").xpos

if self.projectile.has_ground_contact() and self.throw_tomatoes:
Expand All @@ -138,7 +147,7 @@ def step(self, action: NDArray[np.floating]) -> tuple:
distinct_rewards = self.reward.rewards(
RewardContext(self.nao, action, self.state)
)
reward = sum(distinct_rewards.values())
reward = sum(distinct_rewards.values(), np.floating(0.0))

head_center_z = self.data.site("head_center").xpos[2]
terminated = head_center_z < HEAD_THRESHOLD_HEIGHT
Expand All @@ -155,8 +164,8 @@ def do_simulation(
ctrl: NDArray[np.floating],
n_frames: int,
) -> None:
right_pressure = self.nao.right_fsr_values().sum()
left_pressure = self.nao.left_fsr_values().sum()
right_pressure = self.nao.right_fsr().sum()
left_pressure = self.nao.left_fsr().sum()

measurements = Measurements(left_pressure, right_pressure)
self.nao.data.ctrl[:] = OFFSET_QPOS
Expand All @@ -180,7 +189,7 @@ def do_simulation(
dt=dt,
)
self.nao.data.ctrl[self._actuation_mask] += ctrl
self._step_mujoco_simulation(self.nao.data.ctrl, n_frames)
super().do_simulation(self.nao.data.ctrl, n_frames)

@override
def reset_model(self) -> NDArray[np.floating]:
Expand All @@ -194,8 +203,8 @@ def reset_model(self) -> NDArray[np.floating]:
self.nao.reset(READY_POSE)

measurements = Measurements(
self.nao.left_fsr_values().sum(),
self.nao.right_fsr_values().sum(),
self.nao.left_fsr().sum(),
self.nao.right_fsr().sum(),
)

apply_walking(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import numpy as np
from numpy.typing import NDArray


class RingBuffer:
data: NDArray
index: int = 0

def __init__(self, size: int, initial_value: NDArray) -> None:
self.data = np.broadcast_to(
initial_value, (size, initial_value.shape[0])
).copy()

def push(self, value: NDArray) -> None:
self.index = (self.index + 1) % self.data.shape[0]
self.data[self.index] = value

def right(self) -> NDArray:
return self.data[self.index]

def left(self) -> NDArray:
return self.data[(self.index + 1) % self.data.shape[0]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import numpy as np

from .ring_buffer import RingBuffer


def test_initialization() -> None:
buffer = RingBuffer(1, np.array([1, 2, 3]))
assert np.all(buffer.data == np.array([[1, 2, 3]]))


def test_buffer_initialization() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
assert np.all(buffer.data == np.array([[1, 2, 3], [1, 2, 3]]))


def test_push() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
assert np.all(buffer.data == np.array([[1, 2, 3], [4, 5, 6]]))


def test_push_overflow() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
buffer.push(np.array([7, 8, 9]))
assert np.all(buffer.data == np.array([[7, 8, 9], [4, 5, 6]]))


def test_right() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
assert np.all(buffer.right() == np.array([4, 5, 6]))


def test_left() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
assert np.all(buffer.left() == np.array([1, 2, 3]))


def test_right_with_overflow() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
buffer.push(np.array([7, 8, 9]))
assert np.all(buffer.right() == np.array([7, 8, 9]))


def test_left_with_overflow() -> None:
buffer = RingBuffer(2, np.array([1, 2, 3]))
buffer.push(np.array([4, 5, 6]))
buffer.push(np.array([7, 8, 9]))
assert np.all(buffer.left() == np.array([4, 5, 6]))
Loading
Loading