Deploying RL Policy Example
This document demonstrates how to deploy a pre-trained reinforcement learning walking policy in the UserCmd state through the AuroraClient API, achieving:
- FSM state switching to the user command state
- Walking velocity commands input via joystick
- Real-time acquisition of the robot state and construction of policy observations
- Model inference using torch
- Sending joint position control commands
FSM State Switching
Deploying a whole-body locomotion policy requires switching to User Command State (FSM state 10). In this state, users can send whole-body joint position commands and PD configuration commands through the client interface to control the robot.
# Switch to PD-Stand mode (FSM state 2)
input("Please Press Enter to switch to FSM state 2 (PDstand)")
client.set_fsm_state(2)
# After robot stabilizes, switch to user command task mode (FSM state 10)
input("After the robot is standing firmly on the ground, Please press Enter to switch to FSM state 10 (User command task) ")
client.set_fsm_state(10)
time.sleep(0.5)  # give the FSM transition time to settle before sending commands
Configuring Motor PD Parameters
In the user command state, users can configure motor PD parameters through the set_motor_cfg interface:
# Proportional (kp) gains, one value per joint in each group.
kp_config = {
    "left_leg": [180.0, 120.0, 90.0, 120.0, 45.0, 45.0],
    "right_leg": [180.0, 120.0, 90.0, 120.0, 45.0, 45.0],
    "waist": [90.0],
    "left_manipulator": [90.0, 45.0, 45.0, 45.0, 45.0],
    "right_manipulator": [90.0, 45.0, 45.0, 45.0, 45.0]
}
# Derivative (kd) gains, matching the kp_config group layout.
kd_config = {
    "left_leg": [10.0, 10.0, 8.0, 8.0, 2.5, 2.5],
    "right_leg": [10.0, 10.0, 8.0, 8.0, 2.5, 2.5],
    "waist": [8.0],
    "left_manipulator": [ 8.0, 2.5, 2.5, 2.5, 2.5],
    "right_manipulator": [8.0, 2.5, 2.5, 2.5, 2.5]
}
client.set_motor_cfg(kp_config, kd_config)
Monitoring Joystick Input
This example uses the pygame library to listen to joystick input, obtaining left and right stick inputs to control the robot's walking velocity:
def joystick_listener():
    """Background thread: poll joystick axes into the shared axis globals."""
    global joystick, axis_left, axis_right
    # NOTE(review): this excerpt checks stop_event, but the complete example
    # below loops with `while True` and never defines stop_event — confirm
    # which variant is intended.
    while not stop_event.is_set():
        pygame.event.get()  # pump the event queue so axis readings refresh
        axis_left = joystick.get_axis(0), joystick.get_axis(1)
        axis_right = joystick.get_axis(3), 0  # only the horizontal axis is used
        time.sleep(0.02)  # ~50 Hz polling
joystick_count = pygame.joystick.get_count()
if joystick_count == 0:
    # No joystick detected → exit the program
    print("❌ ❌ Error: A joystick is required")
    pygame.quit()
    client.close()
    sys.exit(1)
else:
    print(f"✅ Detected {joystick_count} joystick(s)")
    joystick = pygame.joystick.Joystick(0)
    joystick.init()
    # Poll the joystick in a separate thread so the control loop is not blocked.
    thread_joystick_listener = threading.Thread(target=joystick_listener)
    thread_joystick_listener.start()
Obtaining Robot State Information and Constructing Observation Vectors
Obtain robot base data and joint states through the get_base_data() and get_group_state interfaces, and construct observation vectors combined with velocity commands from joystick input as policy inputs:
# ============================================================
# 2. Obtain robot states
# ============================================================
# assumes 'quat_xyzw' returns the base orientation as [x, y, z, w] and
# 'omega_B' the body-frame angular velocity — per the key names; confirm in API docs
imu_measured_quat = client.get_base_data('quat_xyzw')
imu_measured_angular_velocity = client.get_base_data('omega_B')
group_names = ['left_leg', 'right_leg', 'waist']
joint_measured_position = []
joint_measured_velocity = []
for name in group_names:
    joint_measured_position.append(client.get_group_state(name, 'position'))
    joint_measured_velocity.append(client.get_group_state(name, 'velocity'))
# Flatten the per-group readings into single whole-body vectors.
joint_measured_position = numpy.concatenate(joint_measured_position)
joint_measured_velocity = numpy.concatenate(joint_measured_velocity)
# ============================================================
# 3. Construct velocity commands (from joystick) [lin_vel_x, lin_vel_y, ang_vel_yaw], unit: m/s, m/s, rad/s
# ============================================================
commands_safety_ratio = 0.9  # shrink the command ranges by 10% as a safety margin
command_linear_velocity_x_range = torch.tensor(numpy.array([[-0.2, 0.20]]), dtype=torch.float) \
    * commands_safety_ratio
command_linear_velocity_y_range = torch.tensor(numpy.array([[-0.5, 0.5]]), dtype=torch.float) \
    * commands_safety_ratio
command_angular_velocity_yaw_range = torch.tensor(numpy.array([[-1.00, 1.00]]), dtype=torch.float) \
    * commands_safety_ratio
commands = numpy.array([0.0, 0.0, 0.0, ])
# Stick axes are in [-1, 1]; negation maps stick-forward / stick-left /
# twist-left to positive command values.
commands_norm = \
    numpy.array([
        -1 * axis_left[1],
        -1 * axis_left[0],
        -1 * axis_right[0],
    ])
# Scale each normalized command by the matching range bound: the upper
# bound when the command is non-negative, the lower bound otherwise.
commands = \
    numpy.array([
        commands_norm[0] * numpy.abs(command_linear_velocity_x_range[0, 1].numpy())
        if commands_norm[0] >= 0 else
        commands_norm[0] * numpy.abs(command_linear_velocity_x_range[0, 0].numpy()),
        commands_norm[1] * numpy.abs(command_linear_velocity_y_range[0, 1].numpy())
        if commands_norm[1] >= 0 else
        commands_norm[1] * numpy.abs(command_linear_velocity_y_range[0, 0].numpy()),
        commands_norm[2] * numpy.abs(command_angular_velocity_yaw_range[0, 1].numpy())
        if commands_norm[2] >= 0 else
        commands_norm[2] * numpy.abs(command_angular_velocity_yaw_range[0, 0].numpy())
    ])
print("commands:", commands)
# ============================================================
# 4. Construct observations (obs buffer)
# ============================================================
# Copy the IMU readings element-wise into fixed-size buffers
# (initialized to identity quaternion / zero angular velocity).
base_measured_quat = numpy.array([0.0, 0.0, 0.0, 1.0, ])
base_measured_angular_velocity = numpy.array([0.0, 0.0, 0.0, ])
for i in range(4):
    base_measured_quat[i] = imu_measured_quat[i]
for i in range(3):
    base_measured_angular_velocity[i] = imu_measured_angular_velocity[i]
# Gather only the policy-controlled joints from the whole-body state.
joint_measured_position_for_policy = numpy.zeros(policy_control_num_of_joints)
joint_measured_velocity_for_policy = numpy.zeros(policy_control_num_of_joints)
for i in range(policy_control_num_of_joints):
    index = policy_control_index_of_joints[i]
    joint_measured_position_for_policy[i] = joint_measured_position[index]
    joint_measured_velocity_for_policy[i] = joint_measured_velocity[index]
if policy_action is None:
    # First tick: no previous action yet, feed zeros to the policy.
    policy_action = numpy.zeros(policy_control_num_of_joints)
# Promote everything to float32 torch tensors with a leading batch dim of 1.
torch_commands = torch.from_numpy(commands).float().unsqueeze(0)
torch_base_measured_quat = torch.from_numpy(base_measured_quat).float().unsqueeze(0)
torch_base_measured_angular_velocity = torch.from_numpy(base_measured_angular_velocity).float().unsqueeze(0)
torch_joint_measured_position_for_policy = torch.from_numpy(joint_measured_position_for_policy).float().unsqueeze(0)
torch_joint_measured_velocity_for_policy = torch.from_numpy(joint_measured_velocity_for_policy).float().unsqueeze(0)
torch_default_joint_position = torch.from_numpy(default_joint_position).float().unsqueeze(0)
torch_gravity_vector = torch.from_numpy(gravity_vector).float().unsqueeze(0)
# Gravity direction expressed in the base frame (projected gravity observation).
torch_base_project_gravity = torch_quat_rotate_inverse(torch_base_measured_quat, torch_gravity_vector)
# Joint positions are observed as offsets from the default standing pose.
torch_measured_position_offset_for_policy = torch_joint_measured_position_for_policy \
    - torch_default_joint_position
torch_action = torch.from_numpy(policy_action).float().unsqueeze(0)
obs_buf = torch.cat([
    torch_commands,
    torch_base_measured_angular_velocity,
    torch_base_project_gravity,
    torch_measured_position_offset_for_policy,
    torch_joint_measured_velocity_for_policy * 0.1,  # velocity scaled by 0.1
    torch_action,
], dim=-1)
obs_len = obs_buf.shape[-1]
stack_size = 5  # the policy consumes the last 5 stacked observation frames
if obs_buf_stack is None:
    # First tick: seed the history with copies of the current observation.
    obs_buf_stack = torch.cat([obs_buf] * stack_size, dim=1).float()
# Slide the window: drop the oldest frame, append the newest.
obs_buf_stack = torch.cat([
    obs_buf_stack[:, obs_len:],
    obs_buf,
], dim=1).float()
Model Inference and Sending Joint Control Commands
Input the constructed observation vectors into the reinforcement learning policy model for inference, obtain joint policy actions, generate joint target positions, and send them to the robot for execution through the set_joint_positions interface:
# ============================================================
# 5. Generate action
# ============================================================
torch_policy_action = policy_model(obs_buf_stack).detach()
# Clip the raw policy output to per-joint safety bounds.
torch_policy_action = torch.clip(
    torch_policy_action,
    min=torch.from_numpy(action_clip_min).float().unsqueeze(0),
    max=torch.from_numpy(action_clip_max).float().unsqueeze(0),
)
# Remember the action so it can be fed back into the next observation.
policy_action = torch_policy_action.numpy().squeeze(0)
# Actions are offsets around the default standing pose.
torch_joint_target_position_from_policy = torch_policy_action \
    + torch_default_joint_position
joint_target_position_from_policy = torch_joint_target_position_from_policy.numpy().squeeze(0)  # unit : rad
# Scatter the policy-controlled joints into the whole-body command vector;
# uncontrolled joints stay at 0.
joint_target_position = numpy.zeros(robot_num_of_joints)
for i in range(policy_control_num_of_joints):
    index = policy_control_index_of_joints[i]
    joint_target_position[index] = joint_target_position_from_policy[i]
# ============================================================
# 6. Set position command
# ============================================================
whole_body_pos = {"whole_body": joint_target_position}
client.set_joint_positions(whole_body_pos)
Complete Example
The following is a complete example of deploying an RL policy. Example code location: demo_walk
# ============================================================
# In this demo, a joystick is required to control the walking direction and velocity
# ============================================================
import time
from fourier_aurora_client import AuroraClient
import os
import numpy
import torch
import threading
import sys
import pygame
from ischedule import run_loop, schedule
# Initialize client
client = AuroraClient.get_instance(domain_id=123, robot_name="fouriern1")
time.sleep(1)  # allow the client time to establish communication
# Module-level state shared between main(), algorithm(), and the joystick thread.
policy_file_path = None
policy_model = None
policy_action = None   # last policy output, fed back into the next observation
obs_buf_stack = None   # rolling stack of the most recent observation frames
def algorithm():
    """One control tick: read robot state, build the policy observation,
    run inference, and send whole-body joint position commands.

    Scheduled at the target control frequency (50 Hz in main). Relies on
    module-level globals: client, joystick axes, the loaded policy model,
    and persistent policy state carried between ticks.
    """
    # NOTE(review): commands_filtered is declared global but never used here.
    global policy_model, policy_action, obs_buf_stack, commands_filtered
    # ============================================================
    # 1. Definition
    # ============================================================
    robot_num_of_joints = 23
    policy_control_num_of_joints = 13  # left leg + right leg + waist
    # Indices (into the whole-body joint vector) controlled by the policy.
    policy_control_index_of_joints = numpy.array([
        0, 1, 2, 3, 4, 5,  # left leg
        6, 7, 8, 9, 10, 11,  # right leg
        12,  # waist
    ])
    # Nominal standing pose (rad) for the 13 policy-controlled joints;
    # policy actions are applied as offsets around these values.
    default_joint_position = numpy.array([
        # left leg
        -0.2468, 0.0, 0.0, 0.5181, 0.0, -0.2408,
        # right leg
        -0.2468, 0.0, 0.0, 0.5181, 0.0, -0.2408,
        # waist
        0.0,
    ])
    gravity_vector = numpy.array([
        0.0, 0.0, -1.0
    ])
    # Per-joint clipping bounds (rad) for the raw policy output.
    action_clip_max = numpy.array([
        3.1416, 2.0946, 2.0946, 2.8796, 0.9596, 1.3616,
        3.1416, 0.7856, 2.0946, 2.8796, 0.9596, 1.3616,
        3.1416
    ])
    action_clip_min = numpy.array([
        -3.1416, -0.7856, -2.0946, -0.6106, -0.9596, -1.4836,
        -3.1416, -2.0946, -2.0946, -0.6106, -0.9596, -1.4836,
        -3.1416
    ])
    # ============================================================
    # 2. Obtain robot states
    # ============================================================
    # assumes 'quat_xyzw' is [x, y, z, w] and 'omega_B' is body-frame angular
    # velocity — per the key names; confirm against the AuroraClient API docs
    imu_measured_quat = client.get_base_data('quat_xyzw')
    imu_measured_angular_velocity = client.get_base_data('omega_B')
    group_names = ['left_leg', 'right_leg', 'waist']
    joint_measured_position = []
    joint_measured_velocity = []
    for name in group_names:
        joint_measured_position.append(client.get_group_state(name, 'position'))
        joint_measured_velocity.append(client.get_group_state(name, 'velocity'))
    # Flatten the per-group readings into single whole-body vectors.
    joint_measured_position = numpy.concatenate(joint_measured_position)
    joint_measured_velocity = numpy.concatenate(joint_measured_velocity)
    # ============================================================
    # 3. Construct velocity commands (from joystick) [lin_vel_x, lin_vel_y, ang_vel_yaw], unit: m/s, m/s, rad/s
    # ============================================================
    commands_safety_ratio = 0.9  # shrink command ranges by 10% as a safety margin
    command_linear_velocity_x_range = torch.tensor(numpy.array([[-0.2, 0.20]]), dtype=torch.float) \
        * commands_safety_ratio
    command_linear_velocity_y_range = torch.tensor(numpy.array([[-0.5, 0.5]]), dtype=torch.float) \
        * commands_safety_ratio
    command_angular_velocity_yaw_range = torch.tensor(numpy.array([[-1.00, 1.00]]), dtype=torch.float) \
        * commands_safety_ratio
    commands = numpy.array([0.0, 0.0, 0.0, ])
    # Stick axes are in [-1, 1]; negation maps stick-forward / stick-left /
    # twist-left to positive command values.
    commands_norm = \
        numpy.array([
            -1 * axis_left[1],
            -1 * axis_left[0],
            -1 * axis_right[0],
        ])
    # Scale each normalized command by the matching range bound: the upper
    # bound when the command is non-negative, the lower bound otherwise.
    commands = \
        numpy.array([
            commands_norm[0] * numpy.abs(command_linear_velocity_x_range[0, 1].numpy())
            if commands_norm[0] >= 0 else
            commands_norm[0] * numpy.abs(command_linear_velocity_x_range[0, 0].numpy()),
            commands_norm[1] * numpy.abs(command_linear_velocity_y_range[0, 1].numpy())
            if commands_norm[1] >= 0 else
            commands_norm[1] * numpy.abs(command_linear_velocity_y_range[0, 0].numpy()),
            commands_norm[2] * numpy.abs(command_angular_velocity_yaw_range[0, 1].numpy())
            if commands_norm[2] >= 0 else
            commands_norm[2] * numpy.abs(command_angular_velocity_yaw_range[0, 0].numpy())
        ])
    print("commands:", commands)
    # ============================================================
    # 4. Construct observations (obs buffer)
    # ============================================================
    # Copy IMU readings element-wise into fixed-size buffers
    # (initialized to identity quaternion / zero angular velocity).
    base_measured_quat = numpy.array([0.0, 0.0, 0.0, 1.0, ])
    base_measured_angular_velocity = numpy.array([0.0, 0.0, 0.0, ])
    for i in range(4):
        base_measured_quat[i] = imu_measured_quat[i]
    for i in range(3):
        base_measured_angular_velocity[i] = imu_measured_angular_velocity[i]
    # Gather only the policy-controlled joints from the whole-body state.
    joint_measured_position_for_policy = numpy.zeros(policy_control_num_of_joints)
    joint_measured_velocity_for_policy = numpy.zeros(policy_control_num_of_joints)
    for i in range(policy_control_num_of_joints):
        index = policy_control_index_of_joints[i]
        joint_measured_position_for_policy[i] = joint_measured_position[index]
        joint_measured_velocity_for_policy[i] = joint_measured_velocity[index]
    if policy_action is None:
        # First tick: no previous action yet, feed zeros to the policy.
        policy_action = numpy.zeros(policy_control_num_of_joints)
    # Promote everything to float32 torch tensors with a leading batch dim of 1.
    torch_commands = torch.from_numpy(commands).float().unsqueeze(0)
    torch_base_measured_quat = torch.from_numpy(base_measured_quat).float().unsqueeze(0)
    torch_base_measured_angular_velocity = torch.from_numpy(base_measured_angular_velocity).float().unsqueeze(0)
    torch_joint_measured_position_for_policy = torch.from_numpy(joint_measured_position_for_policy).float().unsqueeze(0)
    torch_joint_measured_velocity_for_policy = torch.from_numpy(joint_measured_velocity_for_policy).float().unsqueeze(0)
    torch_default_joint_position = torch.from_numpy(default_joint_position).float().unsqueeze(0)
    torch_gravity_vector = torch.from_numpy(gravity_vector).float().unsqueeze(0)
    # Gravity direction expressed in the base frame (projected gravity observation).
    torch_base_project_gravity = torch_quat_rotate_inverse(torch_base_measured_quat, torch_gravity_vector)
    # Joint positions are observed as offsets from the default standing pose.
    torch_measured_position_offset_for_policy = torch_joint_measured_position_for_policy \
        - torch_default_joint_position
    torch_action = torch.from_numpy(policy_action).float().unsqueeze(0)
    obs_buf = torch.cat([
        torch_commands,
        torch_base_measured_angular_velocity,
        torch_base_project_gravity,
        torch_measured_position_offset_for_policy,
        torch_joint_measured_velocity_for_policy * 0.1,  # velocity scaled by 0.1
        torch_action,
    ], dim=-1)
    obs_len = obs_buf.shape[-1]
    stack_size = 5  # the policy consumes the last 5 stacked observation frames
    if obs_buf_stack is None:
        # First tick: seed the history with copies of the current observation.
        obs_buf_stack = torch.cat([obs_buf] * stack_size, dim=1).float()
    # Slide the window: drop the oldest frame, append the newest.
    obs_buf_stack = torch.cat([
        obs_buf_stack[:, obs_len:],
        obs_buf,
    ], dim=1).float()
    # ============================================================
    # 5. Generate action
    # ============================================================
    torch_policy_action = policy_model(obs_buf_stack).detach()
    # Clip the raw policy output to per-joint safety bounds.
    torch_policy_action = torch.clip(
        torch_policy_action,
        min=torch.from_numpy(action_clip_min).float().unsqueeze(0),
        max=torch.from_numpy(action_clip_max).float().unsqueeze(0),
    )
    # Remember the action so it can be fed back into the next observation.
    policy_action = torch_policy_action.numpy().squeeze(0)
    # Actions are offsets around the default standing pose.
    torch_joint_target_position_from_policy = torch_policy_action \
        + torch_default_joint_position
    joint_target_position_from_policy = torch_joint_target_position_from_policy.numpy().squeeze(0)  # unit : rad
    # Scatter the policy-controlled joints into the whole-body command vector;
    # uncontrolled joints stay at 0.
    joint_target_position = numpy.zeros(robot_num_of_joints)
    for i in range(policy_control_num_of_joints):
        index = policy_control_index_of_joints[i]
        joint_target_position[index] = joint_target_position_from_policy[i]
    # ============================================================
    # 6. Set position command
    # ============================================================
    whole_body_pos = {"whole_body": joint_target_position}
    client.set_joint_positions(whole_body_pos)
def joystick_listener():
    """Background thread: poll joystick axes at ~50 Hz into the shared axis globals."""
    global joystick, axis_left, axis_right
    # NOTE(review): loops forever and the thread started in main() is not a
    # daemon, so the process cannot exit cleanly — confirm intended.
    while True:
        pygame.event.get()  # pump the event queue so axis readings refresh
        axis_left = joystick.get_axis(0), joystick.get_axis(1)
        axis_right = joystick.get_axis(3), 0  # only the horizontal axis is used (yaw)
        time.sleep(0.02)
def torch_quat_rotate_inverse(q, v):
    """
    Rotate a vector (tensor) by the inverse of a quaternion (tensor).
    :param q: A quaternion tensor in the form of [x, y, z, w] in shape of [N, 4].
    :param v: A vector tensor in the form of [x, y, z] in shape of [N, 3].
    :return: The rotated vector tensor in shape of [N, 3].
    """
    w = q[:, -1:]     # scalar part, kept as [N, 1] for broadcasting
    xyz = q[:, :3]    # vector part, [N, 3]
    # Rodrigues-style expansion of q^{-1} * v * q:
    #   v' = v*(2w^2 - 1) - 2w*(xyz x v) + 2*xyz*(xyz . v)
    dot = (xyz * v).sum(dim=-1, keepdim=True)
    cross = torch.cross(xyz, v, dim=-1)
    return v * (2.0 * w * w - 1.0) - 2.0 * w * cross + 2.0 * xyz * dot
def main():
    """Entry point: initialize the joystick, switch FSM states, configure
    motor PD gains, load the TorchScript walking policy, and run the 50 Hz
    control loop.

    Exits with status 1 if no joystick is connected. Blocks in run_loop()
    once the control loop starts.
    """
    # ============================================================
    # 1. Global variable initialization
    # ============================================================
    global policy_file_path, policy_model, joystick, axis_left, axis_right, commands_filtered
    joystick = None
    axis_left = (0.0, 0.0)   # (axis 0, axis 1) of the left stick
    axis_right = (0.0, 0.0)  # (axis 3, 0) of the right stick
    commands_filtered = numpy.array([0.0, 0.0, 0.0])
    # ============================================================
    # 2. Initialize joystick and start listener thread
    # ============================================================
    pygame.init()
    pygame.joystick.init()
    joystick_count = pygame.joystick.get_count()
    if joystick_count == 0:
        # No joystick detected → exit the program
        print("❌ ❌ Error: A joystick is required")
        pygame.quit()
        client.close()
        sys.exit(1)
    else:
        # Fixed: the success message was missing its closing parenthesis
        # ("joystick(s" → "joystick(s)").
        print(f"✅ Detected {joystick_count} joystick(s)")
        joystick = pygame.joystick.Joystick(0)
        joystick.init()
        # Poll the joystick in a separate thread so the control loop is not blocked.
        thread_joystick_listener = threading.Thread(target=joystick_listener)
        thread_joystick_listener.start()
    # ============================================================
    # 3. Switch FSM states
    # ============================================================
    # Switch to PD-Stand mode (FSM state 2)
    input("Please Press Enter to switch to FSM state 2 (PDstand)")
    client.set_fsm_state(2)
    # After robot stabilizes, switch to user command task mode (FSM state 10)
    input("After the robot is standing firmly on the ground, Please press Enter to switch to FSM state 10 (User command task) ")
    client.set_fsm_state(10)
    time.sleep(0.5)  # allow the FSM transition to settle
    # ============================================================
    # 4. Motor parameter configuration
    # ============================================================
    # Proportional (kp) gains per joint group.
    kp_config = {
        "left_leg": [180.0, 120.0, 90.0, 120.0, 45.0, 45.0],
        "right_leg": [180.0, 120.0, 90.0, 120.0, 45.0, 45.0],
        "waist": [90.0],
        "left_manipulator": [90.0, 45.0, 45.0, 45.0, 45.0],
        "right_manipulator": [90.0, 45.0, 45.0, 45.0, 45.0]
    }
    # Derivative (kd) gains per joint group.
    kd_config = {
        "left_leg": [10.0, 10.0, 8.0, 8.0, 2.5, 2.5],
        "right_leg": [10.0, 10.0, 8.0, 8.0, 2.5, 2.5],
        "waist": [8.0],
        "left_manipulator": [ 8.0, 2.5, 2.5, 2.5, 2.5],
        "right_manipulator": [8.0, 2.5, 2.5, 2.5, 2.5]
    }
    client.set_motor_cfg(kp_config, kd_config)
    # ============================================================
    # 5. Load policy model and start control loop
    # ============================================================
    # The TorchScript policy is expected next to this script.
    policy_file_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "policy_jit_walk.pt",
    )
    policy_model = torch.jit.load(policy_file_path, map_location=torch.device('cpu'))
    # Configure robot control frequency
    target_control_frequency = 50
    target_control_period_in_s = 1.0 / target_control_frequency
    schedule(algorithm, interval=target_control_period_in_s)
    run_loop()  # blocks, dispatching algorithm() at the scheduled interval
if __name__ == "__main__":
main()
This example completely demonstrates the workflow of deploying a reinforcement learning walking policy in the UserCmd state, including key steps such as FSM state switching, PD parameter configuration, joystick input listening, robot state acquisition, observation construction, model inference, and joint control command sending. Users can refer to this example code to develop and deploy their own walking policies.
Running Effect
