first commit

This commit is contained in:
“jiawei” 2025-08-09 15:44:49 +08:00
commit bf461ab934
51 changed files with 3408 additions and 0 deletions

View File

@ -0,0 +1,4 @@
__pycache__/
mini_bdx_runtime.egg-info/
build
dist

View File

@ -0,0 +1,182 @@
# Open Duck Mini Runtime
## Raspberry Pi zero 2W setup
### Install Raspberry Pi OS
Download Raspberry Pi OS Lite (64-bit) from here : https://www.raspberrypi.com/software/operating-systems/
Follow the instructions here to install the OS on the SD card : https://www.raspberrypi.com/documentation/computers/getting-started.html
With the Raspberry Pi Imager, you can pre-configure session, wifi and ssh. Do it like below :
![imager_setup](https://github.com/user-attachments/assets/7a4987b2-de83-41dd-ab7f-585259685f16)
> Tip: I configure the rasp to connect to my phone's hotspot, this way I can connect to it from anywhere.
### Setup SSH (If not setup during the installation)
When first booting on the rasp, you will need to connect a screen and a keyboard. The first thing you should do is connect to a wifi network and enable SSH.
To do so, you can follow this guide : https://www.raspberrypi.com/documentation/computers/configuration.html#setting-up-wifi
Then, you can connect to your rasp using SSH without having to plug a screen and a keyboard.
### Update the system and install necessary stuff
```bash
sudo apt update
sudo apt upgrade
sudo apt install git
sudo apt install python3-pip
sudo apt install python3-virtualenvwrapper
(optional) sudo apt install python3-picamzero
```
Add this to the end of the `.bashrc`:
```bash
export WORKON_HOME=$HOME/.virtualenvs
export PROJECT_HOME=$HOME/Devel
source /usr/share/virtualenvwrapper/virtualenvwrapper.sh
```
### Enable I2C
`sudo raspi-config` -> `Interface Options` -> `I2C`
TODO set 400KHz ?
### Set the usbserial latency timer
```bash
cd /etc/udev/rules.d/
sudo touch 99-usb-serial.rules
sudo nano 99-usb-serial.rules
# copy the following line in the file
SUBSYSTEM=="usb-serial", DRIVER=="ftdi_sio", ATTR{latency_timer}="1"
```
### Set the udev rules for the motor control board
TODO
### Setup xbox one controller over bluetooth
Turn your xbox one controller on and set it in pairing mode by long pressing the sync button on the top of the controller.
Run the following commands on the rasp :
```bash
bluetoothctl
scan on
```
Wait for the controller to appear in the list, then run :
```bash
pair <controller_mac_address>
trust <controller_mac_address>
connect <controller_mac_address>
```
The led on the controller should stop blinking and stay on.
You can test that it's working by running
```bash
python3 mini_bdx_runtime/mini_bdx_runtime/xbox_controller.py
```
## Speaker wiring and configuration
Follow this tutorial
> For now, don't activate `/dev/zero` when they ask
https://learn.adafruit.com/adafruit-max98357-i2s-class-d-mono-amp?view=all
## Install the runtime
### Make a virtual environment and activate it
```bash
mkvirtualenv -p python3 open-duck-mini-runtime
workon open-duck-mini-runtime
```
Clone this repository on your rasp, cd into the repo, then :
```bash
git clone https://github.com/apirrone/Open_Duck_Mini_Runtime
cd Open_Duck_Mini_Runtime
git checkout v2
pip install -e .
```
In Raspberry Pi 5, you need to perform the following operations
```bash
pip uninstall -y RPi.GPIO
pip install lgpio
```
## Test the IMU
```bash
python3 mini_bdx_runtime/mini_bdx_runtime/raw_imu.py
```
You can also run `python3 scripts/imu_server.py` on the robot and `python3 scripts/imu_client.py --ip <robot_ip>` on your computer to check that the frame is oriented correctly.
> To find the ip address of the robot, run `ifconfig` on the robot
## Test motors
This will allow you to verify all your motors are connected and configured.
```bash
python3 scripts/check_motors.py
```
## Make your duck_config.json
Copy `example_config.json` in the home directory of your duck and rename it `duck_config.json`.
`cp example_config.json ~/duck_config.json`
In this file, you can configure some stuff, like registering if you installed the expression features, installed the imu upside down or and other stuff. You also write the joints offsets of your duck here
## Find the joints offsets
This script will guide you through finding the joints offsets of your robot that you can then write in your `duck_config.json`
> This procedure won't be necessary in the future as we will be flashing the offsets directly in each motor's eeprom.
```bash
cd scripts/
python find_soft_offsets.py
```
## Run the walk !
Download the [latest policy checkpoint ](https://github.com/apirrone/Open_Duck_Mini/blob/v2/BEST_WALK_ONNX_2.onnx) and copy it to your duck.
`cd scripts/`
`python v2_rl_walk_mujoco.py --onnx_model_path <path_to>/BEST_WALK_ONNX_2.onnx`
```
- The commands are :
- A to pause/unpause
- X to turn on/off the projector
- B to play a random sound
- Y to turn on/off head control (very experimental, I don't recommend trying that, it can break your duck's head)
- left and right triggers to control the left and right antennas
- LB (new!) press and hold to increase the walking frequency, kind of a sprint mode 🙂
```

View File

@ -0,0 +1,3 @@
- [] Better handle xbox controller
- It's a little bit of a mess right now, how we handle directions and buttons etc
- [] Make the offsets flashing work. This will be in the motor configuration script

View File

@ -0,0 +1,15 @@
# Checklist
To check that all works and is properly configured before running a policy.
TODO (Antoine)
Make a script that goes through all this automatically
- joints positions and offsets
- imu orientation
- feet switches
make optional
- eyes/projector
- antennas
- speaker
- camera

View File

@ -0,0 +1,29 @@
{
"start_paused": false,
"imu_upside_down": false,
"phase_frequency_factor_offset": 0.0,
"expression_features": {
"eyes": false,
"projector": false,
"antennas": false,
"speaker": false,
"microphone": false,
"camera": false
},
"joints_offsets": {
"left_hip_yaw": 0.0,
"left_hip_roll": 0.0,
"left_hip_pitch": 0.0,
"left_knee": 0.0,
"left_ankle": 0.0,
"neck_pitch": 0.0,
"head_pitch": 0.0,
"head_yaw": 0.0,
"head_roll": 0.00,
"right_hip_yaw": 0.0,
"right_hip_roll": 0.0,
"right_hip_pitch": 0.0,
"right_knee": 0.0,
"right_ankle": 0.0
}
}

View File

@ -0,0 +1,64 @@
import board
import pwmio
import math
import time
LEFT_ANTENNA_PIN = board.D13
RIGHT_ANTENNA_PIN = board.D12
LEFT_SIGN = 1
RIGHT_SIGN = -1
MIN_UPDATE_INTERVAL = 1 / 50 # 20ms
def value_to_duty_cycle(v):
pulse_width_ms = 1.5 + (v * 0.5) # 1ms to 2ms
duty_cycle = int((pulse_width_ms / 20) * 65535)
return min(max(duty_cycle, 3277), 6553)
class Antennas:
def __init__(self):
neutral_duty = value_to_duty_cycle(0)
self.pwm_left = pwmio.PWMOut(LEFT_ANTENNA_PIN, frequency=50, duty_cycle=neutral_duty)
self.pwm_right = pwmio.PWMOut(RIGHT_ANTENNA_PIN, frequency=50, duty_cycle=neutral_duty)
def set_position_left(self, position):
self.set_position(self.pwm_left, position, LEFT_SIGN)
def set_position_right(self, position):
self.set_position(self.pwm_right, position, RIGHT_SIGN)
def set_position(self, pwm, value, sign=1):
# if value == 0:
# return
if -1 <= value <= 1:
duty_cycle = value_to_duty_cycle(value * sign) # Convert value to duty cycle (1ms-2ms)
pwm.duty_cycle = duty_cycle
else:
print("Invalid input! Enter a value between -1 and 1.")
def stop(self):
time.sleep(MIN_UPDATE_INTERVAL)
self.set_position_left(0)
self.set_position_right(0)
time.sleep(MIN_UPDATE_INTERVAL)
self.pwm_left.deinit()
self.pwm_right.deinit()
if __name__ == "__main__":
antennas = Antennas()
try:
start_time = time.monotonic()
current_time = start_time
while current_time - start_time < 5:
value = math.sin(2 * math.pi * 1 * current_time)
antennas.set_position_left(value)
antennas.set_position_right(value)
time.sleep(MIN_UPDATE_INTERVAL)
current_time = time.monotonic()
finally:
antennas.stop()

View File

@ -0,0 +1,85 @@
import time
class Button:
def __init__(self):
self.last_pressed_time = time.time()
self.timeout = 0.2
self.is_pressed = False
self.triggered = False
self.released = True
def update(self, value):
if self.is_pressed and not value:
self.released = True
self.is_pressed = value
if (
self.released
and self.is_pressed
and (time.time() - self.last_pressed_time > self.timeout)
):
self.triggered = True
self.last_pressed_time = time.time()
else:
self.triggered = False
if self.is_pressed:
self.released = False
class Buttons:
def __init__(self):
self.A = Button()
self.B = Button()
self.X = Button()
self.Y = Button()
self.LB = Button()
self.RB = Button()
self.dpad_up = Button()
self.dpad_down = Button()
def update(self, A, B, X, Y, LB, RB, dpad_up, dpad_down):
self.A.update(A)
self.B.update(B)
self.X.update(X)
self.Y.update(Y)
self.LB.update(LB)
self.RB.update(RB)
self.dpad_up.update(dpad_up)
self.dpad_down.update(dpad_down)
if __name__ == "__main__":
from mini_bdx_runtime.xbox_controller import XBoxController
xbox_controller = XBoxController(30)
buttons = Buttons()
while True:
(
_,
A_pressed,
B_pressed,
X_pressed,
Y_pressed,
LB_pressed,
RB_pressed,
_,
_,
up_down,
) = xbox_controller.get_last_command()
buttons.update(
A_pressed,
B_pressed,
X_pressed,
Y_pressed,
LB_pressed,
RB_pressed,
up_down == 1,
up_down == -1,
)
print(buttons.dpad_up.triggered, buttons.dpad_up.is_pressed, buttons.dpad_down.triggered, buttons.dpad_down.is_pressed)
time.sleep(0.05)

View File

@ -0,0 +1,30 @@
from picamzero import Camera
import cv2
import base64
import os
class Cam:
def __init__(self):
self.cam = Camera()
def get_encoded_image(self):
im = self.cam.capture_array()
im = cv2.resize(im, (512, 512))
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
im = cv2.rotate(im, cv2.ROTATE_90_CLOCKWISE)
cv2.imwrite("/home/bdxv2/aze.jpg", im)
return self.encode_image("/home/bdxv2/aze.jpg")
# def encode_image(self, image):
# return base64.b64encode(image).decode("utf-8")
def encode_image(self, image_path: str):
# check if the image exists
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")

View File

@ -0,0 +1,83 @@
import json
from typing import Optional
import os
HOME_DIR = os.path.expanduser("~")
class DuckConfig:
def __init__(
self,
config_json_path: Optional[str] = f"{HOME_DIR}/duck_config.json",
ignore_default: bool = False,
):
"""
Looks for duck_config.json in the home directory by default.
If not found, uses default values.
"""
self.default = False
try:
self.json_config = (
json.load(open(config_json_path, "r")) if config_json_path else {}
)
except FileNotFoundError:
print(
f"Warning : didn't find the config json file at {config_json_path}, using default values"
)
self.json_config = {}
self.default = True
if config_json_path is None:
print("Warning : didn't provide a config json path, using default values")
self.default = True
if self.default and not ignore_default:
print("")
print("")
print("")
print("")
print("======")
print(
"WARNING : Running with default values probably won't work well. Please make a duck_config.json file and set the parameters."
)
res = input("Do you still want to run ? (y/N)")
if res.lower() != "y":
print("Exiting...")
exit(1)
self.start_paused = self.json_config.get("start_paused", False)
self.imu_upside_down = self.json_config.get("imu_upside_down", False)
self.phase_frequency_factor_offset = self.json_config.get(
"phase_frequency_factor_offset", 0.0
)
expression_features = self.json_config.get("expression_features", {})
self.eyes = expression_features.get("eyes", False)
self.projector = expression_features.get("projector", False)
self.antennas = expression_features.get("antennas", False)
self.speaker = expression_features.get("speaker", False)
self.microphone = expression_features.get("microphone", False)
self.camera = expression_features.get("camera", False)
# default joints offsets are 0.0
self.joints_offset = self.json_config.get(
"joints_offsets",
{
"left_hip_yaw": 0.0,
"left_hip_roll": 0.0,
"left_hip_pitch": 0.0,
"left_knee": 0.0,
"left_ankle": 0.0,
"neck_pitch": 0.0,
"head_pitch": 0.0,
"head_yaw": 0.0,
"head_roll": 0.00,
"right_hip_yaw": 0.0,
"right_hip_roll": 0.0,
"right_hip_pitch": 0.0,
"right_knee": 0.0,
"right_ankle": 0.0,
},
)

View File

@ -0,0 +1,58 @@
import board
import digitalio
import random
import time
from threading import Thread, Event
LEFT_EYE_PIN = board.D24
RIGHT_EYE_PIN = board.D23
class Eyes:
def __init__(self, blink_duration=0.1, min_interval=1.0, max_interval=4.0):
self.left_eye = digitalio.DigitalInOut(LEFT_EYE_PIN)
self.left_eye.direction = digitalio.Direction.OUTPUT
self.right_eye = digitalio.DigitalInOut(RIGHT_EYE_PIN)
self.right_eye.direction = digitalio.Direction.OUTPUT
self.blink_duration = blink_duration
self.min_interval = min_interval
self.max_interval = max_interval
self._stop_event = Event()
self._thread = Thread(target=self.run, daemon=True)
self._thread.start()
def _set_eyes(self, state):
self.left_eye.value = state
self.right_eye.value = state
def run(self):
try:
while not self._stop_event.is_set():
self._set_eyes(False)
time.sleep(self.blink_duration)
self._set_eyes(True)
next_blink = random.uniform(self.min_interval, self.max_interval)
time.sleep(next_blink)
except Exception as err:
print(f"Error in eye thread: {err}")
self._stop_event.set()
def stop(self):
self._stop_event.set()
self._thread.join()
self._set_eyes(False)
self.left_eye.deinit()
self.right_eye.deinit()
if __name__ == "__main__":
e = Eyes()
try:
while True:
time.sleep(1)
finally:
e.stop()

View File

@ -0,0 +1,34 @@
import board
import digitalio
import time
LEFT_FOOT_PIN = board.D22
RIGHT_FOOT_PIN = board.D27
class FeetContacts:
def __init__(self):
self.left_foot = digitalio.DigitalInOut(LEFT_FOOT_PIN)
self.left_foot.direction = digitalio.Direction.INPUT
self.left_foot.pull = digitalio.Pull.UP
self.right_foot = digitalio.DigitalInOut(RIGHT_FOOT_PIN)
self.right_foot.direction = digitalio.Direction.INPUT
self.right_foot.pull = digitalio.Pull.UP
def get(self):
left = not self.left_foot.value
right = not self.right_foot.value
return [left, right]
def stop(self):
self.left_foot.deinit()
self.right_foot.deinit()
if __name__ == "__main__":
feet_contacts = FeetContacts()
try:
while True:
print(feet_contacts.get())
time.sleep(0.05)
finally:
feet_contacts.stop()

View File

@ -0,0 +1,161 @@
import adafruit_bno055
import board
import busio
import numpy as np
import pickle
import os
# import serial
from queue import Queue
from threading import Thread
import time
from scipy.spatial.transform import Rotation as R
# TODO filter spikes
class Imu:
def __init__(
self, sampling_freq, user_pitch_bias=0, calibrate=False, upside_down=True
):
self.sampling_freq = sampling_freq
self.user_pitch_bias = user_pitch_bias
self.nominal_pitch_bias = 0
self.calibrate = calibrate
# self.uart = serial.Serial("/dev/ttyS0", baudrate=9600)
# self.imu = adafruit_bno055.BNO055_UART(self.uart)
i2c = busio.I2C(board.SCL, board.SDA)
self.imu = adafruit_bno055.BNO055_I2C(i2c)
self.imu.mode = adafruit_bno055.IMUPLUS_MODE
# self.imu.mode = adafruit_bno055.ACCGYRO_MODE
# self.imu.mode = adafruit_bno055.GYRONLY_MODE
# self.imu.mode = adafruit_bno055.NDOF_MODE
# self.imu.mode = adafruit_bno055.NDOF_FMC_OFF_MODE
if upside_down:
self.imu.axis_remap = (
adafruit_bno055.AXIS_REMAP_Y,
adafruit_bno055.AXIS_REMAP_X,
adafruit_bno055.AXIS_REMAP_Z,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
)
else:
self.imu.axis_remap = (
adafruit_bno055.AXIS_REMAP_Y,
adafruit_bno055.AXIS_REMAP_X,
adafruit_bno055.AXIS_REMAP_Z,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_POSITIVE,
adafruit_bno055.AXIS_REMAP_POSITIVE,
)
self.pitch_bias = self.nominal_pitch_bias + self.user_pitch_bias
if self.calibrate:
self.imu.mode = adafruit_bno055.NDOF_MODE
calibrated = self.imu.calibrated
while not calibrated:
print("Calibration status: ", self.imu.calibration_status)
print("Calibrated : ", self.imu.calibrated)
calibrated = self.imu.calibrated
time.sleep(0.1)
print("CALIBRATION DONE")
offsets_accelerometer = self.imu.offsets_accelerometer
offsets_gyroscope = self.imu.offsets_gyroscope
offsets_magnetometer = self.imu.offsets_magnetometer
imu_calib_data = {
"offsets_accelerometer": offsets_accelerometer,
"offsets_gyroscope": offsets_gyroscope,
"offsets_magnetometer": offsets_magnetometer,
}
for k, v in imu_calib_data.items():
print(k, v)
pickle.dump(imu_calib_data, open("imu_calib_data.pkl", "wb"))
print("Saved", "imu_calib_data.pkl")
exit()
if os.path.exists("imu_calib_data.pkl"):
imu_calib_data = pickle.load(open("imu_calib_data.pkl", "rb"))
self.imu.mode = adafruit_bno055.CONFIG_MODE
time.sleep(0.1)
self.imu.offsets_accelerometer = imu_calib_data["offsets_accelerometer"]
self.imu.offsets_gyroscope = imu_calib_data["offsets_gyroscope"]
self.imu.offsets_magnetometer = imu_calib_data["offsets_magnetometer"]
self.imu.mode = adafruit_bno055.IMUPLUS_MODE
time.sleep(0.1)
else:
print("imu_calib_data.pkl not found")
print("Imu is running uncalibrated")
self.last_imu_data = [0, 0, 0, 0]
self.imu_queue = Queue(maxsize=1)
Thread(target=self.imu_worker, daemon=True).start()
def convert_axes(self, euler):
euler = [np.pi + euler[1], euler[0], euler[2]]
return euler
def imu_worker(self):
while True:
s = time.time()
try:
# imu returns scalar first
raw_orientation = np.array(self.imu.quaternion).copy() # quat
euler = (
R.from_quat(raw_orientation, scalar_first=True)
.as_euler("xyz")
.copy()
)
except Exception as e:
print("[IMU]:", e)
continue
# Converting to correct axes
# euler = self.convert_axes(euler)
euler[1] -= np.deg2rad(self.pitch_bias)
# euler[2] = 0 # ignoring yaw
# gives scalar last, which is what isaac wants
final_orientation_quat = R.from_euler("xyz", euler).as_quat()
self.imu_queue.put(final_orientation_quat.copy())
took = time.time() - s
time.sleep(max(0, 1 / self.sampling_freq - took))
def get_data(self, euler=False, mat=False):
try:
self.last_imu_data = self.imu_queue.get(False) # non blocking
except Exception:
pass
try:
if not euler and not mat:
return self.last_imu_data
elif euler:
return R.from_quat(self.last_imu_data).as_euler("xyz")
elif mat:
return R.from_quat(self.last_imu_data).as_matrix()
except Exception as e:
print("[IMU]: ", e)
return None
if __name__ == "__main__":
imu = Imu(50, calibrate=True, upside_down=False)
# imu = Imu(50, upside_down=False)
while True:
data = imu.get_data()
# print(data)
print("gyro", np.around(data["gyro"], 3))
print("accelero", np.around(data["accelero"], 3))
print("---")
time.sleep(1 / 25)

View File

@ -0,0 +1,43 @@
import onnxruntime
class OnnxInfer:
def __init__(self, onnx_model_path, input_name="obs", awd=False):
self.onnx_model_path = onnx_model_path
self.ort_session = onnxruntime.InferenceSession(
self.onnx_model_path, providers=["CPUExecutionProvider"]
)
self.input_name = input_name
self.awd = awd
def infer(self, inputs):
if self.awd:
outputs = self.ort_session.run(None, {self.input_name: [inputs]})
return outputs[0][0]
else:
outputs = self.ort_session.run(
None, {self.input_name: inputs.astype("float32")}
)
return outputs[0]
if __name__ == "__main__":
import argparse
import numpy as np
import time
parser = argparse.ArgumentParser()
parser.add_argument("-o", "--onnx_model_path", type=str, required=True)
args = parser.parse_args()
oi = OnnxInfer(args.onnx_model_path, awd=True)
inputs = np.random.uniform(size=54).astype(np.float32)
inputs = np.arange(47).astype(np.float32)
times = []
for i in range(1000):
start = time.time()
print(oi.infer(inputs))
times.append(time.time() - start)
print("Average time: ", sum(times) / len(times))
print("Average fps: ", 1 / (sum(times) / len(times)))

View File

@ -0,0 +1,118 @@
import numpy as np
import pickle
class PolyReferenceMotion:
def __init__(self, polynomial_coefficients: str):
data = pickle.load(open(polynomial_coefficients, "rb"))
self.dx_range = [0, 0]
self.dy_range = [0, 0]
self.dtheta_range = [0, 0]
self.dxs = []
self.dys = []
self.dthetas = []
self.data_array = []
self.period = None
self.fps = None
self.frame_offsets = None
self.startend_double_support_ratio = None
self.start_offset = None
self.nb_steps_in_period = None
self.process(data)
def process(self, data):
print("[Poly ref data] Processing ...")
_data = {}
for name in data.keys():
split = name.split("_")
dx = float(split[0])
dy = float(split[1])
dtheta = float(split[2])
if self.period is None:
self.period = data[name]["period"]
self.fps = data[name]["fps"]
self.frame_offsets = data[name]["frame_offsets"]
self.startend_double_support_ratio = data[name][
"startend_double_support_ratio"
]
self.start_offset = int(self.startend_double_support_ratio * self.fps)
self.nb_steps_in_period = int(self.period * self.fps)
if dx not in self.dxs:
self.dxs.append(dx)
if dy not in self.dys:
self.dys.append(dy)
if dtheta not in self.dthetas:
self.dthetas.append(dtheta)
self.dx_range = [min(dx, self.dx_range[0]), max(dx, self.dx_range[1])]
self.dy_range = [min(dy, self.dy_range[0]), max(dy, self.dy_range[1])]
self.dtheta_range = [
min(dtheta, self.dtheta_range[0]),
max(dtheta, self.dtheta_range[1]),
]
if dx not in _data:
_data[dx] = {}
if dy not in _data[dx]:
_data[dx][dy] = {}
if dtheta not in _data[dx][dy]:
_data[dx][dy][dtheta] = data[name]
_coeffs = data[name]["coefficients"]
coeffs = []
for k, v in _coeffs.items():
coeffs.append(v)
_data[dx][dy][dtheta] = coeffs
self.dxs = sorted(self.dxs)
self.dys = sorted(self.dys)
self.dthetas = sorted(self.dthetas)
nb_dx = len(self.dxs)
nb_dy = len(self.dys)
nb_dtheta = len(self.dthetas)
self.data_array = nb_dx * [None]
for x, dx in enumerate(self.dxs):
self.data_array[x] = nb_dy * [None]
for y, dy in enumerate(self.dys):
self.data_array[x][y] = nb_dtheta * [None]
for th, dtheta in enumerate(self.dthetas):
self.data_array[x][y][th] = _data[dx][dy][dtheta]
self.data_array = self.data_array
print("[Poly ref data] Done processing")
def vel_to_index(self, dx, dy, dtheta):
dx = np.clip(dx, self.dx_range[0], self.dx_range[1])
dy = np.clip(dy, self.dy_range[0], self.dy_range[1])
dtheta = np.clip(dtheta, self.dtheta_range[0], self.dtheta_range[1])
ix = np.argmin(np.abs(np.array(self.dxs) - dx))
iy = np.argmin(np.abs(np.array(self.dys) - dy))
itheta = np.argmin(np.abs(np.array(self.dthetas) - dtheta))
return int(ix), int(iy), int(itheta)
def sample_polynomial(self, t, coeffs):
ret = []
for c in coeffs:
ret.append(np.polyval(np.flip(c), t))
return ret
def get_reference_motion(self, dx, dy, dtheta, i):
ix, iy, itheta = self.vel_to_index(dx, dy, dtheta)
t = i % self.nb_steps_in_period / self.nb_steps_in_period
t = np.clip(t, 0.0, 1.0) # safeguard
ret = self.sample_polynomial(t, self.data_array[ix][iy][itheta])
return ret

View File

@ -0,0 +1,31 @@
import board
import digitalio
import time
PROJECTOR_GPIO = board.D25
class Projector:
def __init__(self):
self.project = digitalio.DigitalInOut(PROJECTOR_GPIO)
self.project.direction = digitalio.Direction.OUTPUT
self.on = False
def switch(self):
self.on = not self.on
self.project.value = self.on
def stop(self):
self.project.value = False
self.project.deinit()
if __name__ == "__main__":
p = Projector()
try:
while True:
p.switch()
time.sleep(1)
finally:
p.stop()

View File

@ -0,0 +1,167 @@
import adafruit_bno055
import board
import busio
import numpy as np
import os
import pickle
from queue import Queue
from threading import Thread
import time
# TODO filter spikes
class Imu:
def __init__(
self, sampling_freq, user_pitch_bias=0, calibrate=False, upside_down=True
):
self.sampling_freq = sampling_freq
self.calibrate = calibrate
i2c = busio.I2C(board.SCL, board.SDA)
self.imu = adafruit_bno055.BNO055_I2C(i2c)
# self.imu.mode = adafruit_bno055.IMUPLUS_MODE
# self.imu.mode = adafruit_bno055.ACCGYRO_MODE
# self.imu.mode = adafruit_bno055.GYRONLY_MODE
self.imu.mode = adafruit_bno055.NDOF_MODE
# self.imu.mode = adafruit_bno055.NDOF_FMC_OFF_MODE
if upside_down:
self.imu.axis_remap = (
adafruit_bno055.AXIS_REMAP_Y,
adafruit_bno055.AXIS_REMAP_X,
adafruit_bno055.AXIS_REMAP_Z,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
)
else:
self.imu.axis_remap = (
adafruit_bno055.AXIS_REMAP_Y,
adafruit_bno055.AXIS_REMAP_X,
adafruit_bno055.AXIS_REMAP_Z,
adafruit_bno055.AXIS_REMAP_NEGATIVE,
adafruit_bno055.AXIS_REMAP_POSITIVE,
adafruit_bno055.AXIS_REMAP_POSITIVE,
)
if self.calibrate:
self.imu.mode = adafruit_bno055.NDOF_MODE
calibrated = self.imu.calibrated
while not calibrated:
print("Calibration status: ", self.imu.calibration_status)
print("Calibrated : ", self.imu.calibrated)
calibrated = self.imu.calibrated
time.sleep(0.1)
print("CALIBRATION DONE")
offsets_accelerometer = self.imu.offsets_accelerometer
offsets_gyroscope = self.imu.offsets_gyroscope
offsets_magnetometer = self.imu.offsets_magnetometer
imu_calib_data = {
"offsets_accelerometer": offsets_accelerometer,
"offsets_gyroscope": offsets_gyroscope,
"offsets_magnetometer": offsets_magnetometer,
}
for k, v in imu_calib_data.items():
print(k, v)
pickle.dump(imu_calib_data, open("imu_calib_data.pkl", "wb"))
print("Saved", "imu_calib_data.pkl")
exit()
if os.path.exists("imu_calib_data.pkl"):
imu_calib_data = pickle.load(open("imu_calib_data.pkl", "rb"))
self.imu.mode = adafruit_bno055.CONFIG_MODE
time.sleep(0.1)
self.imu.offsets_accelerometer = imu_calib_data["offsets_accelerometer"]
self.imu.offsets_gyroscope = imu_calib_data["offsets_gyroscope"]
self.imu.offsets_magnetometer = imu_calib_data["offsets_magnetometer"]
self.imu.mode = adafruit_bno055.NDOF_MODE
time.sleep(0.1)
else:
print("imu_calib_data.pkl not found")
print("Imu is running uncalibrated")
self.x_offset = 0
# self.tare_x()
self.last_imu_data = [0, 0, 0, 0]
self.last_imu_data = {
"gyro": [0, 0, 0],
"accelero": [0, 0, 0],
}
self.imu_queue = Queue(maxsize=1)
Thread(target=self.imu_worker, daemon=True).start()
def tare_x(self):
print("Taring x ...")
x_values = []
num_values = 100
ok = False
while not ok:
x_values.append(np.array(self.imu.acceleration)[0])
x_values = x_values[-num_values:]
if len(x_values) == num_values:
mean = np.mean(x_values)
std = np.std(x_values)
if std < 0.05:
ok = True
self.x_offset = mean
print("Tare x done")
else:
print(std)
time.sleep(0.01)
def imu_worker(self):
while True:
s = time.time()
try:
gyro = np.array(self.imu.gyro).copy()
accelero = np.array(self.imu.acceleration).copy()
except Exception as e:
print("[IMU]:", e)
continue
if gyro is None or accelero is None:
continue
if gyro.any() is None or accelero.any() is None:
continue
accelero[0] -= self.x_offset
data = {
"gyro": gyro,
"accelero": accelero,
}
self.imu_queue.put(data)
took = time.time() - s
time.sleep(max(0, 1 / self.sampling_freq - took))
def get_data(self):
try:
self.last_imu_data = self.imu_queue.get(False) # non blocking
except Exception:
pass
return self.last_imu_data
if __name__ == "__main__":
imu = Imu(50, upside_down=False)
while True:
data = imu.get_data()
# print(data)
print("gyro", np.around(data["gyro"], 3))
print("accelero", np.around(data["accelero"], 3))
print("---")
time.sleep(1 / 25)

View File

@ -0,0 +1,161 @@
# This is the joints order when loading using IsaacGymEnvs
# ['left_hip_yaw', 'left_hip_roll', 'left_hip_pitch', 'left_knee', 'left_ankle', 'neck_pitch', 'head_pitch', 'head_yaw', 'left_antenna', 'right_antenna', 'right_hip_yaw', 'right_hip_roll', 'right_hip_pitch', 'right_knee', 'right_ankle']
# This is the "standard" order (from mujoco)
# ['right_hip_yaw', 'right_hip_roll', 'right_hip_pitch', 'right_knee', 'right_ankle', 'left_hip_yaw', 'left_hip_roll', 'left_hip_pitch', 'left_knee', 'left_ankle', 'neck_pitch', 'head_pitch', 'head_yaw', 'left_antenna', 'right_antenna']
#
# We need to reorder the joints to match the IsaacGymEnvs order
#
import numpy as np
mujoco_joints_order = [
"left_hip_yaw",
"left_hip_roll",
"left_hip_pitch",
"left_knee",
"left_ankle",
"neck_pitch",
"head_pitch",
"head_yaw",
"head_roll",
"left_antenna",
"right_antenna",
"right_hip_yaw",
"right_hip_roll",
"right_hip_pitch",
"right_knee",
"right_ankle",
]
isaac_joints_order = [
"left_hip_yaw",
"left_hip_roll",
"left_hip_pitch",
"left_knee",
"left_ankle",
"neck_pitch",
"head_pitch",
"head_yaw",
"head_roll",
"left_antenna",
"right_antenna",
"right_hip_yaw",
"right_hip_roll",
"right_hip_pitch",
"right_knee",
"right_ankle",
]
def isaac_to_mujoco(joints):
new_joints = [
# left leg
joints[0],
joints[1],
joints[2],
joints[3],
joints[4],
# right leg
joints[11],
joints[12],
joints[13],
joints[14],
joints[15],
# head
joints[5],
joints[6],
joints[7],
joints[8],
joints[9],
joints[10],
]
return new_joints
def mujoco_to_isaac(joints):
new_joints = [
# left leg
joints[0],
joints[1],
joints[2],
joints[3],
joints[4],
# head
joints[10],
joints[11],
joints[12],
joints[13],
joints[14],
joints[15],
# right leg
joints[5],
joints[6],
joints[7],
joints[8],
joints[9],
]
return new_joints
# TODO ADD BACK
def action_to_pd_targets(action, offset, scale):
return offset + scale * action
def make_action_dict(action, joints_order):
action_dict = {}
for i, a in enumerate(action):
if "antenna" not in joints_order[i]:
action_dict[joints_order[i]] = a
return action_dict
def quat_rotate_inverse(q, v):
q = np.array(q)
v = np.array(v)
q_w = q[-1]
q_vec = q[:3]
a = v * (2.0 * q_w**2 - 1.0)
b = np.cross(q_vec, v) * q_w * 2.0
c = q_vec * (np.dot(q_vec, v)) * 2.0
return a - b + c
class ActionFilter:
def __init__(self, window_size=10):
self.window_size = window_size
self.action_buffer = []
def push(self, action):
self.action_buffer.append(action)
if len(self.action_buffer) > self.window_size:
self.action_buffer.pop(0)
def get_filtered_action(self):
return np.mean(self.action_buffer, axis=0)
class LowPassActionFilter:
def __init__(self, control_freq, cutoff_frequency=30.0):
self.last_action = 0
self.current_action = 0
self.control_freq = float(control_freq)
self.cutoff_frequency = float(cutoff_frequency)
self.alpha = self.compute_alpha()
def compute_alpha(self):
return (1.0 / self.cutoff_frequency) / (
1.0 / self.control_freq + 1.0 / self.cutoff_frequency
)
def push(self, action):
self.current_action = action
def get_filtered_action(self):
self.last_action = (
self.alpha * self.last_action + (1 - self.alpha) * self.current_action
)
return self.last_action

View File

@ -0,0 +1,166 @@
import time
import numpy as np
import rustypot
from mini_bdx_runtime.duck_config import DuckConfig
class HWI:
def __init__(self, duck_config: DuckConfig, usb_port: str = "/dev/ttyACM0"):
self.duck_config = duck_config
# Order matters here
self.joints = {
"left_hip_yaw": 20,
"left_hip_roll": 21,
"left_hip_pitch": 22,
"left_knee": 23,
"left_ankle": 24,
"neck_pitch": 30,
"head_pitch": 31,
"head_yaw": 32,
"head_roll": 33,
# "left_antenna": None,
# "right_antenna": None,
"right_hip_yaw": 10,
"right_hip_roll": 11,
"right_hip_pitch": 12,
"right_knee": 13,
"right_ankle": 14,
}
self.zero_pos = {
"left_hip_yaw": 0,
"left_hip_roll": 0,
"left_hip_pitch": 0,
"left_knee": 0,
"left_ankle": 0,
"neck_pitch": 0,
"head_pitch": 0,
"head_yaw": 0,
"head_roll": 0,
# "left_antenna":0,
# "right_antenna":0,
"right_hip_yaw": 0,
"right_hip_roll": 0,
"right_hip_pitch": 0,
"right_knee": 0,
"right_ankle": 0,
}
self.init_pos = {
"left_hip_yaw": 0.002,
"left_hip_roll": 0.053,
"left_hip_pitch": -0.63,
"left_knee": 1.368,
"left_ankle": -0.784,
"neck_pitch": 0.0,
"head_pitch": 0.0,
"head_yaw": 0,
"head_roll": 0,
# "left_antenna": 0,
# "right_antenna": 0,
"right_hip_yaw": -0.003,
"right_hip_roll": -0.065,
"right_hip_pitch": 0.635,
"right_knee": 1.379,
"right_ankle": -0.796,
}
self.joints_offsets = self.duck_config.joints_offset
self.kps = np.ones(len(self.joints)) * 32 # default kp
self.kds = np.ones(len(self.joints)) * 0 # default kd
self.low_torque_kps = np.ones(len(self.joints)) * 2
self.io = rustypot.feetech(usb_port, 1000000)
def set_kps(self, kps):
self.kps = kps
self.io.set_kps(list(self.joints.values()), self.kps)
def set_kds(self, kds):
self.kds = kds
self.io.set_kds(list(self.joints.values()), self.kds)
def set_kp(self, id, kp):
self.io.set_kps([id], [kp])
def turn_on(self):
self.io.set_kps(list(self.joints.values()), self.low_torque_kps)
print("turn on : low KPS set")
time.sleep(1)
self.set_position_all(self.init_pos)
print("turn on : init pos set")
time.sleep(1)
self.io.set_kps(list(self.joints.values()), self.kps)
print("turn on : high kps")
def turn_off(self):
self.io.disable_torque(list(self.joints.values()))
def set_position(self, joint_name, pos):
"""
pos is in radians
"""
id = self.joints[joint_name]
pos = pos + self.joints_offsets[joint_name]
self.io.write_goal_position([id], [pos])
def set_position_all(self, joints_positions):
"""
joints_positions is a dictionary with joint names as keys and joint positions as values
Warning: expects radians
"""
ids_positions = {
self.joints[joint]: position + self.joints_offsets[joint]
for joint, position in joints_positions.items()
}
self.io.write_goal_position(
list(self.joints.values()), list(ids_positions.values())
)
def get_present_positions(self, ignore=[]):
"""
Returns the present positions in radians
"""
try:
present_positions = self.io.read_present_position(
list(self.joints.values())
)
except Exception as e:
print(e)
return None
present_positions = [
pos - self.joints_offsets[joint]
for joint, pos in zip(self.joints.keys(), present_positions)
if joint not in ignore
]
return np.array(np.around(present_positions, 3))
def get_present_velocities(self, rad_s=True, ignore=[]):
"""
Returns the present velocities in rad/s (default) or rev/min
"""
try:
present_velocities = self.io.read_present_velocity(
list(self.joints.values())
)
except Exception as e:
print(e)
return None
present_velocities = [
vel
for joint, vel in zip(self.joints.keys(), present_velocities)
if joint not in ignore
]
return np.array(np.around(present_velocities, 3))

View File

@ -0,0 +1,58 @@
import pygame
import time
import os
import random
class Sounds:
def __init__(self, volume=1.0, sound_directory="./"):
pygame.mixer.init()
pygame.mixer.music.set_volume(volume)
self.sounds = {}
self.ok = True
try:
for file in os.listdir(sound_directory):
if file.endswith(".wav"):
sound_path = os.path.join(sound_directory, file)
try:
self.sounds[file] = pygame.mixer.Sound(sound_path)
print(f"Loaded: {file}")
except pygame.error as e:
print(f"Failed to load {file}: {e}")
except FileNotFoundError:
print(f"Directory {sound_directory} not found.")
self.ok = False
if len(self.sounds) == 0:
print("No sound files found in the directory.")
self.ok = False
def play(self, sound_name):
if not self.ok:
print("Sounds not initialized properly.")
return
if sound_name in self.sounds:
self.sounds[sound_name].play()
print(f"Playing: {sound_name}")
else:
print(f"Sound '{sound_name}' not found!")
def play_random_sound(self):
if not self.ok:
print("Sounds not initialized properly.")
return
sound_name = random.choice(list(self.sounds.keys()))
self.sounds[sound_name].play()
def play_happy(self):
self.sounds["happy1.wav"].play()
# Example usage
if __name__ == "__main__":
sound_player = Sounds(1.0, "../assets/")
time.sleep(1)
while True:
# sound_player.play_random_sound()
sound_player.play_happy()
time.sleep(3)

View File

@ -0,0 +1,222 @@
import pygame
from threading import Thread
from queue import Queue
import time
import numpy as np
import sys
sys.path.append('D:\Open_Duck_Mini_Runtime-2\Open_Duck_Mini_Runtime-2\mini_bdx_runtime\mini_bdx_runtime')
from mini_bdx_runtime.buttons import Buttons
X_RANGE = [-0.15, 0.15]
Y_RANGE = [-0.2, 0.2]
YAW_RANGE = [-1.0, 1.0]
# rads
NECK_PITCH_RANGE = [-0.34, 1.1]
HEAD_PITCH_RANGE = [-0.78, 0.3]
HEAD_YAW_RANGE = [-0.5, 0.5]
HEAD_ROLL_RANGE = [-0.5, 0.5]
class XBoxController:
def __init__(self, command_freq, only_head_control=False):
self.command_freq = command_freq
self.head_control_mode = only_head_control
self.only_head_control = only_head_control
self.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.last_left_trigger = 0.0
self.last_right_trigger = 0.0
pygame.init()
self.p1 = pygame.joystick.Joystick(0)
self.p1.init()
print(f"Loaded joystick with {self.p1.get_numaxes()} axes.")
self.cmd_queue = Queue(maxsize=1)
self.A_pressed = False
self.B_pressed = False
self.X_pressed = False
self.Y_pressed = False
self.LB_pressed = False
self.RB_pressed = False
self.buttons = Buttons()
Thread(target=self.commands_worker, daemon=True).start()
def commands_worker(self):
while True:
self.cmd_queue.put(self.get_commands())
time.sleep(1 / self.command_freq)
def get_commands(self):
last_commands = self.last_commands
left_trigger = self.last_left_trigger
right_trigger = self.last_right_trigger
l_x = -1 * self.p1.get_axis(0)
l_y = -1 * self.p1.get_axis(1)
r_x = -1 * self.p1.get_axis(2)
r_y = -1 * self.p1.get_axis(3)
right_trigger = np.around((self.p1.get_axis(4) + 1) / 2, 3)
left_trigger = np.around((self.p1.get_axis(5) + 1) / 2, 3)
if left_trigger < 0.1:
left_trigger = 0
if right_trigger < 0.1:
right_trigger = 0
if not self.head_control_mode:
lin_vel_y = l_x
lin_vel_x = l_y
ang_vel = r_x
if lin_vel_x >= 0:
lin_vel_x *= np.abs(X_RANGE[1])
else:
lin_vel_x *= np.abs(X_RANGE[0])
if lin_vel_y >= 0:
lin_vel_y *= np.abs(Y_RANGE[1])
else:
lin_vel_y *= np.abs(Y_RANGE[0])
if ang_vel >= 0:
ang_vel *= np.abs(YAW_RANGE[1])
else:
ang_vel *= np.abs(YAW_RANGE[0])
last_commands[0] = lin_vel_x
last_commands[1] = lin_vel_y
last_commands[2] = ang_vel
else:
last_commands[0] = 0.0
last_commands[1] = 0.0
last_commands[2] = 0.0
last_commands[3] = 0.0 # neck pitch 0 for now
head_yaw = l_x
head_pitch = l_y
head_roll = r_x
if head_yaw >= 0:
head_yaw *= np.abs(HEAD_YAW_RANGE[0])
else:
head_yaw *= np.abs(HEAD_YAW_RANGE[1])
if head_pitch >= 0:
head_pitch *= np.abs(HEAD_PITCH_RANGE[0])
else:
head_pitch *= np.abs(HEAD_PITCH_RANGE[1])
if head_roll >= 0:
head_roll *= np.abs(HEAD_ROLL_RANGE[0])
else:
head_roll *= np.abs(HEAD_ROLL_RANGE[1])
last_commands[4] = head_pitch
last_commands[5] = head_yaw
last_commands[6] = head_roll
for event in pygame.event.get():
if event.type == pygame.JOYBUTTONDOWN:
if self.p1.get_button(0): # A button
self.A_pressed = True
if self.p1.get_button(1): # B button
self.B_pressed = True
if self.p1.get_button(3): # X button
self.X_pressed = True
if self.p1.get_button(4): # Y button
self.Y_pressed = True
if not self.only_head_control:
self.head_control_mode = not self.head_control_mode
if self.p1.get_button(6): # LB button
self.LB_pressed = True
if self.p1.get_button(7): # RB button
self.RB_pressed = True
if event.type == pygame.JOYBUTTONUP:
self.A_pressed = False
self.B_pressed = False
self.X_pressed = False
self.Y_pressed = False
self.LB_pressed = False
self.RB_pressed = False
# for i in range(10):
# if self.p1.get_button(i):
# print(f"Button {i} pressed")
up_down = self.p1.get_hat(0)[1]
pygame.event.pump() # process event queue
return (
np.around(last_commands, 3),
self.A_pressed,
self.B_pressed,
self.X_pressed,
self.Y_pressed,
self.LB_pressed,
self.RB_pressed,
left_trigger,
right_trigger,
up_down,
)
def get_last_command(self):
A_pressed = False
B_pressed = False
X_pressed = False
Y_pressed = False
LB_pressed = False
RB_pressed = False
up_down = 0
try:
(
self.last_commands,
A_pressed,
B_pressed,
X_pressed,
Y_pressed,
LB_pressed,
RB_pressed,
self.last_left_trigger,
self.last_right_trigger,
up_down,
) = self.cmd_queue.get(
False
) # non blocking
except Exception:
pass
self.buttons.update(
A_pressed,
B_pressed,
X_pressed,
Y_pressed,
LB_pressed,
RB_pressed,
up_down == 1,
up_down == -1,
)
return (
self.last_commands,
self.buttons,
self.last_left_trigger,
self.last_right_trigger,
)
if __name__ == "__main__":
controller = XBoxController(20)
while True:
print(controller.get_last_command())
time.sleep(0.05)

View File

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

View File

@ -0,0 +1,18 @@
from mini_bdx_runtime.xbox_controller import XBoxController
from mini_bdx_runtime.antennas import Antennas
import time
controller = XBoxController(60)
antennas = Antennas()
while True:
_, _, left_trigger, right_trigger = controller.get_last_command()
antennas.set_position_left(right_trigger)
antennas.set_position_right(left_trigger)
# print(left_trigger, right_trigger)
time.sleep(1 / 50)

View File

@ -0,0 +1,5 @@
from mini_bdx_runtime.raw_imu import Imu
if __name__ == "__main__":
imu = Imu(50, calibrate=True, upside_down=False)

View File

@ -0,0 +1,18 @@
from picamzero import Camera
import cv2
print("Initializing camera ...")
cam = Camera()
# cam.still_size = (512, 512)
print("Camera initialized")
im = cam.capture_array()
im = cv2.resize(im, (512, 512))
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
cv2.imwrite("/home/bdxv2/aze.jpg", im)
# cv2.imshow("Image", im)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
# cam.take_photo("/home/bdxv2/aze.jpg")

View File

@ -0,0 +1,168 @@
"""
Debug script to check all motors in the robot.
Verifies each motor is accessible and allows testing movement.
"""
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.duck_config import DuckConfig
import time
import numpy as np
import traceback
def main():
print("Initializing hardware interface...")
try:
# Initialize with default USB port - you might need to modify this
print("Attempting to connect to motor controller...")
duck_config = DuckConfig() # Create default configuration
# Initialize with duck_config
print("Attempting to connect to motor controller...")
hwi = HWI(duck_config=duck_config)
print("Successfully connected to hardware!")
except Exception as e:
print(f"Error connecting to hardware: {e}")
print(f"Error details: {traceback.format_exc()}")
print("Check that the robot is powered on and USB connection is correct.")
return
# Turn on with low torque for safety - ONE BY ONE
print("\nTurning on motors with low torque (one by one)...")
unresponsive_motors = []
for joint_name, joint_id in hwi.joints.items():
try:
print(f"Setting low torque for motor '{joint_name}' (ID: {joint_id})...")
hwi.io.set_kps([joint_id], [hwi.low_torque_kps[0]])
print(f"✓ Low torque set successfully for motor '{joint_name}' (ID: {joint_id}).")
except Exception as e:
print(f"✗ Error setting low torque for motor '{joint_name}' (ID: {joint_id}): {e}")
print(f"Error details: {traceback.format_exc()}")
unresponsive_motors.append((joint_name, joint_id))
# Check if all motors are responsive
print("\nChecking if all motors are responsive...")
for joint_name, joint_id in hwi.joints.items():
# Skip motors that already failed
if (joint_name, joint_id) in unresponsive_motors:
print(f"Skipping previously unresponsive motor: '{joint_name}' (ID: {joint_id})")
continue
print(f"Attempting to read position from motor '{joint_name}' (ID: {joint_id})...")
try:
# Try to read the position to check if motor is responsive
position = hwi.io.read_present_position([joint_id])
print(f"✓ Motor '{joint_name}' (ID: {joint_id}) is responsive. Position: {position[0]:.3f}")
except Exception as e:
print(f"✗ Error accessing motor '{joint_name}' (ID: {joint_id}): {e}")
print(f"Error details for motor {joint_id}: {traceback.format_exc()}")
unresponsive_motors.append((joint_name, joint_id))
if unresponsive_motors:
print("\nWARNING: Some motors are not responsive!")
print("Unresponsive motors:", unresponsive_motors)
continue_anyway = input("Do you want to continue anyway? (y/n): ").lower()
if continue_anyway != 'y':
print("Exiting...")
try:
print("Attempting to turn off responsive motors before exiting...")
for joint_name, joint_id in hwi.joints.items():
if (joint_name, joint_id) not in unresponsive_motors:
try:
hwi.io.disable_torque([joint_id])
print(f"Disabled torque for motor '{joint_name}' (ID: {joint_id})")
except:
pass
except:
pass
return
# Test moving each motor individually
print("\n--- Motor Movement Test ---")
print("This will move each motor by a small amount to check if it's working correctly.")
input("Press Enter to begin the movement test...")
for joint_name, joint_id in hwi.joints.items():
# Skip unresponsive motors
if (joint_name, joint_id) in unresponsive_motors:
print(f"Skipping unresponsive motor: '{joint_name}' (ID: {joint_id})")
continue
print(f"\nTesting motor: '{joint_name}' (ID: {joint_id})")
test_this_motor = input(f"Test this motor? (Enter/y for yes, n to skip, q to quit): ").lower()
if test_this_motor == 'q':
print("Exiting movement test...")
break
if test_this_motor == 'n':
print(f"Skipping '{joint_name}' (ID: {joint_id})")
continue
try:
# Get current position
print(f"Reading current position from motor '{joint_name}' (ID: {joint_id})...")
current_position = hwi.io.read_present_position([joint_id])[0]
print(f"Current position: {current_position:.3f}")
# Calculate test position (move by 0.1 radians)
test_position = current_position + 0.1
# Move to test position
print(f"Moving motor '{joint_name}' (ID: {joint_id}) to test position: {test_position:.3f}...")
hwi.io.write_goal_position([joint_id], [test_position])
time.sleep(1) # Wait for movement
# Read new position
print(f"Reading new position from motor '{joint_name}' (ID: {joint_id})...")
new_position = hwi.io.read_present_position([joint_id])[0]
print(f"New position: {new_position:.3f}")
# Return to original position
print(f"Returning motor '{joint_name}' (ID: {joint_id}) to original position...")
hwi.io.write_goal_position([joint_id], [current_position])
time.sleep(1) # Wait for movement
# No confirmation question, just assume success
print(f"✓ Motor '{joint_name}' (ID: {joint_id}) movement test completed.")
except Exception as e:
print(f"Error testing motor '{joint_name}' (ID: {joint_id}): {e}")
print(f"Error details: {traceback.format_exc()}")
# Turn off motors
print("\nTurning off motors one by one...")
for joint_name, joint_id in hwi.joints.items():
if (joint_name, joint_id) in unresponsive_motors:
print(f"Skipping turning off unresponsive motor: '{joint_name}' (ID: {joint_id})")
continue
try:
print(f"Disabling torque for motor '{joint_name}' (ID: {joint_id})...")
hwi.io.disable_torque([joint_id])
print(f"✓ Motor '{joint_name}' (ID: {joint_id}) turned off successfully.")
except Exception as e:
print(f"✗ Error turning off motor '{joint_name}' (ID: {joint_id}): {e}")
print(f"Error details: {traceback.format_exc()}")
print("\nMotor test completed.")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nScript interrupted by user. Attempting to turn off motors...")
try:
print("Initializing HWI to turn off motors...")
hwi = HWI()
for joint_name, joint_id in hwi.joints.items():
try:
print(f"Turning off motor '{joint_name}' (ID: {joint_id})...")
hwi.io.disable_torque([joint_id])
print(f"✓ Motor '{joint_name}' (ID: {joint_id}) turned off successfully.")
except Exception as e:
print(f"✗ Error turning off motor '{joint_name}' (ID: {joint_id}): {e}")
except Exception as e:
print(f"Error initializing HWI to turn off motors: {e}")
print(f"Error details: {traceback.format_exc()}")

View File

@ -0,0 +1,31 @@
from pypot.feetech import FeetechSTS3215IO
io = FeetechSTS3215IO(
"/dev/ttyACM0",
baudrate=1000000,
use_sync_read=True,
)
joints = {
"left_hip_yaw": 20,
"left_hip_roll": 21,
"left_hip_pitch": 22,
"left_knee": 23,
"left_ankle": 24,
"neck_pitch": 30,
"head_pitch": 31,
"head_yaw": 32,
"head_roll": 33,
# "left_antenna": None,
# "right_antenna": None,
"right_hip_yaw": 10,
"right_hip_roll": 11,
"right_hip_pitch": 12,
"right_knee": 13,
"right_ankle": 14,
}
voltages = io.get_present_voltage(list(joints.values()))
for i, name in enumerate(joints.keys()):
print(name, round(voltages[i] * 0.1, 2), "V")

View File

@ -0,0 +1,40 @@
from pypot.feetech import FeetechSTS3215IO
import time
joints = {
"left_hip_yaw": 20,
"left_hip_roll": 21,
"left_hip_pitch": 22,
"left_knee": 23,
"left_ankle": 24,
"neck_pitch": 30,
"head_pitch": 31,
"head_yaw": 32,
"head_roll": 33,
"right_hip_yaw": 10,
"right_hip_roll": 11,
"right_hip_pitch": 12,
"right_knee": 13,
"right_ankle": 14,
}
joints_inv = {v: k for k, v in joints.items()}
ids = list(joints.values())
io = FeetechSTS3215IO("/dev/ttyACM0")
for current_id in ids:
print("Configuring", joints_inv[current_id])
io.set_lock({current_id: 0})
# io.set_mode({current_id: 0})
io.set_maximum_acceleration({current_id: 0})
io.set_acceleration({current_id: 0})
# io.set_maximum_velocity({current_id: 0})
# io.set_goal_speed({current_id: 0})
io.set_P_coefficient({current_id: 32})
io.set_I_coefficient({current_id: 0})
io.set_D_coefficient({current_id: 0})
io.set_lock({current_id: 1})
time.sleep(1)
input("Press any key to set this dof to 0 position ... Or press Ctrl+C to cancel")
io.set_goal_position({current_id: 0})

View File

@ -0,0 +1,89 @@
from pypot.feetech import FeetechSTS3215IO
import argparse
import time
DEFAULT_ID = 1 # A brand new motor should have id 1
parser = argparse.ArgumentParser()
parser.add_argument(
"--port",
help="The port the motor is connected to. Default is /dev/ttyACM0. Use `ls /dev/tty* | grep usb` to find the port.",
default="/dev/ttyACM0",
)
parser.add_argument("--id", help="The id to set to the motor.", type=str, required=True)
args = parser.parse_args()
io = FeetechSTS3215IO(args.port)
current_id = DEFAULT_ID
def scan():
id = None
for i in range(255):
print(f"scanning for id {i} ...")
try:
io.get_present_position([i])
id = i
print(f"Found motor with id {id}")
break
except Exception:
pass
return id
try:
io.get_present_position([DEFAULT_ID])
except Exception:
print(
f"Could not find motor with default id ({DEFAULT_ID}). Scanning for motor ..."
)
res = scan()
if res is not None:
current_id = res
else:
print("Could not find motor. Exiting ...")
exit()
# print("current id: ", current_id)
kp = io.get_P_coefficient([current_id])
ki = io.get_I_coefficient([current_id])
kd = io.get_D_coefficient([current_id])
max_acceleration = io.get_maximum_acceleration([current_id])
acceleration = io.get_acceleration([current_id])
mode = io.get_mode([current_id])
# print(f"PID : {kp}, {ki}, {kd}")
# print(f"max_acceleration: {max_acceleration}")
# print(f"acceleration: {acceleration}")
# print(f"mode: {mode}")
io.set_lock({current_id: 0})
io.set_mode({current_id: 0})
io.set_maximum_acceleration({current_id: 0})
io.set_acceleration({current_id: 0})
io.set_P_coefficient({current_id: 32})
io.set_I_coefficient({current_id: 0})
io.set_D_coefficient({current_id: 0})
io.change_id({current_id: int(args.id)})
current_id = int(args.id)
time.sleep(1)
io.set_goal_position({current_id: 0})
time.sleep(1)
print("===")
print("Done configuring motor.")
print(f"Motor id: {current_id}")
print(f"P coefficient : {io.get_P_coefficient([current_id])}")
print(f"I coefficient : {io.get_I_coefficient([current_id])}")
print(f"D coefficient : {io.get_D_coefficient([current_id])}")
print(f"acceleration: {io.get_acceleration([current_id])}")
print(f"max_acceleration: {io.get_maximum_acceleration([current_id])}")
print(f"mode: {io.get_mode([current_id])}")
print("===")

View File

@ -0,0 +1,302 @@
from openai import OpenAI
import time
import json
import os
from io import BytesIO
import base64
from v2_rl_walk_mujoco import RLWalk
from threading import Thread
import cv2
from mini_bdx_runtime.camera import Cam
# TODO mission : find an object ?
# Your Tools class
class Tools:
def __init__(self):
self.cam = Cam()
self.rl_walk = RLWalk(
"/home/bdxv2/BEST_WALK_ONNX_2.onnx",
cutoff_frequency=40,
)
Thread(target=self.rl_walk.run, daemon=True).start()
# def upload_image(self, image_path: str):
# image_name = os.path.basename(image_path)
# im = cv2.imread(image_path)
# im = cv2.resize(im, (512, 512))
# cv2.imwrite(image_path, im)
# # command = (
# # f"scp {image_path} apirrone@s-nguyen.net:/home/apirrone/webserv/images/"
# # )
# command = (
# f"scp {image_path} apirrone@192.168.10.103:/home/apirrone/webserv/images/"
# )
# print(command)
# url = f"http://s-nguyen.net:4444/images/{image_name}"
# os.system(command)
# return url
def move_forward(self, seconds=2):
seconds = max(2, min(seconds, 5))
print(f"Moving forward for {seconds} seconds")
self.rl_walk.last_commands[0] = 0.15
time.sleep(seconds)
self.rl_walk.last_commands[0] = 0.0
print("Stopped moving forward")
return f"Moved forward for {seconds} seconds successfully"
def turn_left(self, seconds=2):
seconds = max(2, min(seconds, 5))
print(f"Turning left for {seconds} seconds")
self.rl_walk.last_commands[2] = 1.0
time.sleep(seconds)
self.rl_walk.last_commands[2] = 0.0
print("Stopped turning left")
return f"Turned left for {seconds} seconds successfully"
def turn_right(self, seconds=2):
seconds = max(2, min(seconds, 5))
print(f"Turning right for {seconds} seconds")
self.rl_walk.last_commands[2] = -1.0
time.sleep(seconds)
self.rl_walk.last_commands[2] = 0.0
print("Stopped turning right")
return f"Turned right for {seconds} seconds successfully"
def move_backward(self, seconds=2):
seconds = max(2, min(seconds, 5))
print(f"Moving backward for {seconds} seconds")
self.rl_walk.last_commands[0] = -0.15
time.sleep(seconds)
self.rl_walk.last_commands[0] = 0.0
print("Stopped moving backward")
return f"Moved backward for {seconds} seconds successfully"
def take_picture(self):
# https://projects.raspberrypi.org/en/projects/getting-started-with-picamera/5
print("Taking a picture...")
image64 = self.cam.get_encoded_image()
url = ("data:image/jpeg;base64," + image64,)
time.sleep(1)
print("Picture taken")
return url
def play_happy_sound(self):
self.rl_walk.sounds.play_happy()
return "Played happy sound"
# Tool instance
tools_instance = Tools()
openai_tools = [
{
"type": "function",
"name": "move_forward",
"description": "Move forward for a number of seconds",
"parameters": {
"type": "object",
"properties": {
"seconds": {
"type": "integer",
"description": "Number of seconds to move forward (min 2, max 5)",
}
},
"required": ["seconds"],
"additionalProperties": False,
},
},
{
"type": "function",
"name": "move_backward",
"description": "Move backward for a number of seconds",
"parameters": {
"type": "object",
"properties": {
"seconds": {
"type": "integer",
"description": "Number of seconds to move backward (min 2, max 5)",
}
},
"required": ["seconds"],
"additionalProperties": False,
},
},
{
"type": "function",
"name": "turn_left",
"description": "Turn left on the spot",
"parameters": {
"type": "object",
"properties": {
"seconds": {
"type": "integer",
"description": "Number of seconds to turn left (min 2, max 5)",
}
},
"required": ["seconds"],
"additionalProperties": False,
},
},
{
"type": "function",
"name": "turn_right",
"description": "Turn right on the spot",
"parameters": {
"type": "object",
"properties": {
"seconds": {
"type": "integer",
"description": "Number of seconds to turn right (min 2, max 5)",
}
},
"required": ["seconds"],
"additionalProperties": False,
},
},
{
"type": "function",
"name": "take_picture",
"description": "Take a picture",
"parameters": {
"type": "object",
"properties": {},
# No required properties for taking a picture
},
},
{
"type": "function",
"name": "play_happy_sound",
"description": "Play a happy sound",
"parameters": {
"type": "object",
"properties": {},
# No required properties for playing a sound
},
}
]
# Mapping function names to actual methods
function_map = {
"move_forward": tools_instance.move_forward,
"move_backward": tools_instance.move_backward,
"turn_left": tools_instance.turn_left,
"turn_right": tools_instance.turn_right,
"take_picture": tools_instance.take_picture,
}
messages = [
{
"role": "system",
"content": (
"You are a cute little biped robot that can move around using the following tools: "
"`move_forward`, `move_backward`, `turn_left`, `turn_right`, 'play_happy_sound' and 'take_picture'. "
"moving forward for 1 second will make you move forward by about 15 centimeters"
"turning for 1 second will make you turn about 45 degrees"
"You can only perform one action at a time. If multiple actions are needed, call them step by step."
"Explain what you are doing along the way"
"Always start by taking a picture of the environment so you can see where you are. "
"When you take a picture, describe what you see in the image. "
"make sure not to hit any walls or objects. Take pictures regularly to know where you are."
"Maybe it's a good idea to turn 360 degrees to check all directions. (no need if you already found it)"
"When given a goal to find something, if you found it, navigate to be in front of it, facing it. You want to be about 1 meter close to it"
"When you are 1 meter close to the object, play the happy sound"
""
),
},
# {
# "role": "user",
# "content": "Find the yellow vaccum cleaner !",
# },
{
"role": "user",
"content": "Find the waste bin and turn around it. Play the happy sound when you are done",
},
]
# Mapping function names to actual methods
function_map = {
"move_forward": tools_instance.move_forward,
"move_backward": tools_instance.move_backward,
"turn_left": tools_instance.turn_left,
"turn_right": tools_instance.turn_right,
"take_picture": tools_instance.take_picture,
"play_happy_sound": tools_instance.play_happy_sound,
}
client = OpenAI()
def call_function(name, args):
if name == "move_forward":
return function_map[name](args["seconds"])
elif name == "move_backward":
return function_map[name](args["seconds"])
elif name == "turn_left":
return function_map[name](args["seconds"])
elif name == "turn_right":
return function_map[name](args["seconds"])
elif name == "take_picture":
return function_map[name]()
elif name == "play_happy_sound":
return function_map[name]()
else:
raise ValueError(f"Unknown function name: {name}")
while True:
response = client.responses.create(
model="gpt-4o-mini",
input=messages,
tools=openai_tools,
)
if len(response.output) == 1 and response.output[0].type == "function_call":
print("Only function call, no text response")
else:
try:
print(response.output[0].content[0].text)
except:
print("Error occurred while processing response")
for tool_call in response.output:
if tool_call.type != "function_call":
continue
name = tool_call.name
args = json.loads(tool_call.arguments)
result = call_function(name, args)[0]
messages.append(tool_call)
if tool_call.name == "take_picture":
# result is an image URL
# Add an optional prompt or let GPT interpret the image
messages.append(
{
"role": "user",
"content": [{"type": "input_image", "image_url": result}],
}
)
messages.append(
{
"type": "function_call_output",
"call_id": tool_call.call_id,
"output": "Image taken and provided above.",
}
)
else:
messages.append(
{
"type": "function_call_output",
"call_id": tool_call.call_id,
"output": str(result),
}
)

View File

@ -0,0 +1,84 @@
"""
Find the offsets to set in self.joints_offsets in hwi_feetech_pwm_control.py
"""
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.duck_config import DuckConfig
import time
dummy_config = DuckConfig(config_json_path=None, ignore_default=True)
print("======")
print(
"Warning : this script will move the robot to its zero position quiclky, make sure it is safe to do so"
)
print("======")
print("")
input(
"Press any key to start. The robot will move to its zero position. Make sure it is safe to do so. At any time, press ctrl+c to stop, the motors will be turned off."
)
hwi = HWI(dummy_config)
hwi.init_pos = hwi.zero_pos
hwi.set_kds([0] * len(hwi.joints))
hwi.turn_on()
print("")
print("")
print("")
print("")
hwi.set_position_all(hwi.zero_pos)
time.sleep(1)
try:
for i, joint_name in enumerate(hwi.joints.keys()):
joint_id = hwi.joints[joint_name]
ok = False
while not ok:
res = input(f" === Setting up {joint_name} === (Y/(s)kip : ").lower()
if res == "s":
break
hwi.set_position_all(hwi.zero_pos)
time.sleep(0.5)
current_pos = hwi.get_present_positions()[i]
if current_pos is None:
continue
# hwi.control.kps[i] = 0
hwi.io.disable_torque([joint_id])
input(
f"{joint_name} is now turned off. Move it to the desired zero position and press any key to confirm the offset"
)
new_pos = hwi.get_present_positions()[i]
offset = new_pos - current_pos
print(f" ---> Offset is {offset}")
hwi.joints_offsets[joint_name] = offset
input(
"Press any key to move the motor to its zero position with offset taken into account"
)
hwi.set_position_all(hwi.zero_pos)
time.sleep(0.5)
hwi.io.enable_torque([joint_id])
# hwi.control.kps[i] = 32
res = input("Is that ok ? (Y/n)").lower()
if res == "y" or res == "":
print("Ok, setting offset")
hwi.joints_offsets[joint_name] = offset
ok = True
print("------")
print("Current offsets : ")
for k, v in hwi.joints_offsets.items():
print(f"{k} : {v}")
print("------")
print("")
else:
print("Ok, let's try again")
hwi.joints_offsets[joint_name] = 0
print("===")
print("Done ! ")
print("Now you can copy the offsets in your duck_config.json")
except KeyboardInterrupt:
hwi.turn_off()

View File

@ -0,0 +1,103 @@
"""
Sets up the robot in init position, you control the head with the xbox controller
"""
import time
import numpy as np
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.duck_config import DuckConfig
from mini_bdx_runtime.xbox_controller import XBoxController
from mini_bdx_runtime.eyes import Eyes
from mini_bdx_runtime.sounds import Sounds
from mini_bdx_runtime.antennas import Antennas
from mini_bdx_runtime.projector import Projector
duck_config = DuckConfig()
xbox_controller = XBoxController(50, only_head_control=True)
if duck_config.speaker:
sounds = Sounds(volume=1.0, sound_directory="../mini_bdx_runtime/assets/")
if duck_config.antennas:
antennas = Antennas()
if duck_config.eyes:
eyes = Eyes()
if duck_config.projector:
projector = Projector()
hwi = HWI(duck_config)
kps = [8] * 14
kds = [0] * 14
hwi.set_kps(kps)
hwi.set_kds(kds)
hwi.turn_on()
limits = {
"neck_pitch": [-20, 60],
"head_pitch": [-60, 45],
"head_yaw": [-60, 60],
"head_roll": [-20, 20],
}
try:
while True:
last_commands, buttons, left_trigger, right_trigger = (
xbox_controller.get_last_command()
)
l_x = last_commands[5]
l_y = last_commands[4]
r_x = last_commands[6]
# r_y = last_commands[3]
head_yaw_deg = (
l_x * (limits["head_yaw"][1] - limits["head_yaw"][0]) / 2
+ (limits["head_yaw"][1] + limits["head_yaw"][0]) / 2
)
head_yaw_pos_rad = np.deg2rad(head_yaw_deg)
head_roll_deg = (
r_x * (limits["head_roll"][1] - limits["head_roll"][0]) / 2
+ (limits["head_roll"][1] + limits["head_roll"][0]) / 2
)
head_roll_pos_rad = np.deg2rad(head_roll_deg)
head_pitch_deg = (
l_y * (limits["head_pitch"][1] - limits["head_pitch"][0]) / 2
+ (limits["head_pitch"][1] + limits["head_pitch"][0]) / 2
)
head_pitch_pos_rad = np.deg2rad(head_pitch_deg)
# neck_pitch_deg = (
# -r_y * (limits["neck_pitch"][1] - limits["neck_pitch"][0]) / 2
# + (limits["neck_pitch"][1] + limits["neck_pitch"][0]) / 2
# )
# neck_pitch_pos_rad = np.deg2rad(neck_pitch_deg)
hwi.set_position("head_yaw", head_yaw_pos_rad)
hwi.set_position("head_roll", head_roll_pos_rad)
hwi.set_position("head_pitch", head_pitch_pos_rad)
# hwi.set_position("neck_pitch", neck_pitch_pos_rad)
if duck_config.antennas:
antennas.set_position_left(right_trigger)
antennas.set_position_right(left_trigger)
if buttons.B.triggered:
if duck_config.speaker:
sounds.play_random_sound()
if buttons.X.triggered:
if duck_config.projector:
projector.switch()
# pygame.event.pump() # process event queue
time.sleep(1 / 60)
except KeyboardInterrupt:
if duck_config.antennas:
antennas.stop()

View File

@ -0,0 +1,76 @@
import socket
import time
import numpy as np
import pickle
from queue import Queue
from threading import Thread
from scipy.spatial.transform import Rotation as R
from FramesViewer.viewer import Viewer
import argparse
class IMUClient:
def __init__(self, host, port=1234, freq=30):
self.host = host
self.port = port
self.freq = freq
self.client_socket = socket.socket()
self.connected = False
while not self.connected:
try:
self.client_socket.connect((self.host, self.port))
self.connected = True
except Exception as e:
print(e)
time.sleep(0.5)
self.imu_queue = Queue(maxsize=1)
self.last_imu = [0, 0, 0, 0]
Thread(target=self.imu_worker, daemon=True).start()
def imu_worker(self):
while True:
try:
data = self.client_socket.recv(1024) # receive response
data = pickle.loads(data)
self.imu_queue.put(data)
except:
print("missed imu")
time.sleep(1 / self.freq)
def get_imu(self):
try:
self.last_imu = self.imu_queue.get(False)
except:
pass
return self.last_imu
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, required=True, help="IP address of the robot")
args = parser.parse_args()
client = IMUClient(args.ip)
fv = Viewer()
fv.start()
pose = np.eye(4)
pose[:3, 3] = [0.1, 0.1, 0.1]
try:
while True:
quat = client.get_imu()
try:
rot_mat = R.from_quat(quat).as_matrix()
pose[:3, :3] = rot_mat
fv.pushFrame(pose, "pose")
except Exception as e:
print("error", e)
pass
time.sleep(1 / 30)
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,62 @@
import socket
import time
import pickle
from mini_bdx_runtime.imu import Imu
from threading import Thread
import time
import argparse
class IMUServer:
def __init__(self, imu=None):
self.host = "0.0.0.0"
self.port = 1234
self.server_socket = socket.socket()
self.server_socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
) # enable address reuse
self.server_socket.bind((self.host, self.port))
if imu is None:
self.imu = Imu(50, user_pitch_bias=args.pitch_bias, upside_down=False)
else:
self.imu = imu
self.stop = False
Thread(target=self.run, daemon=True).start()
def run(self):
while not self.stop:
self.server_socket.listen(1)
conn, address = self.server_socket.accept() # accept new connection
print("Connection from: " + str(address))
try:
while True:
data = self.imu.get_data()
data = pickle.dumps(data)
conn.send(data) # send data to the client
time.sleep(1 / 30)
except:
pass
self.server_socket.close()
print("thread closed")
time.sleep(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--pitch_bias", type=float, default=0, help="deg")
args = parser.parse_args()
imu_server = IMUServer()
try:
while True:
time.sleep(0.01)
except KeyboardInterrupt:
print("Closing server")
imu_server.stop = True
time.sleep(2)

View File

@ -0,0 +1,104 @@
from pypot.feetech import FeetechSTS3215IO
import pickle
import numpy as np
import time
import argparse
parser = argparse.ArgumentParser(description="Record data from Feetech motor")
parser.add_argument(
"--new_firmware",
action="store_true",
help="Use new firmware for the motor",
default=False,
)
args = parser.parse_args()
io = FeetechSTS3215IO("/dev/ttyACM0")
def convert_load(raw_load):
sign = -1
if raw_load > 1023:
raw_load -= 1024
sign = 1
return sign * raw_load * 0.001
id = 1 if args.new_firmware else 11
kp = 32
kd = 0
acceleration = 0
maximum_acceleration = 0
maximum_velocity = 0
goal_speed = 0
time.sleep(1)
io.set_maximum_acceleration({id: maximum_acceleration})
io.set_acceleration({id: acceleration})
io.set_maximum_velocity({id: maximum_velocity})
io.set_goal_speed({id: goal_speed})
io.set_P_coefficient({id: kp})
io.set_D_coefficient({id: kd})
time.sleep(1)
goal_position = 90
io.set_goal_position({id: 0})
time.sleep(3)
exit()
times = []
positions = []
goal_positions = []
speeds = []
loads = []
currents = []
io.set_goal_position({id: goal_position})
s = time.time()
set = False
while True:
t = time.time() - s
# goal_position = np.rad2deg(np.sin(t**2))
io.set_goal_position({id: goal_position})
present_position = np.deg2rad(io.get_present_position([id])[0])
present_speed = np.deg2rad(io.get_present_speed([id])[0])
present_load = convert_load(io.get_present_load([id])[0])
present_current = io.get_present_current([id])[0]
times.append(t)
positions.append(present_position)
goal_positions.append(np.deg2rad(goal_position))
speeds.append(present_speed)
loads.append(present_load)
currents.append(present_current)
if t > 3:
break
time.sleep(0.01)
fw_version = "new" if args.new_firmware else "old"
data = {
"maximum_acceleration": maximum_acceleration,
"acceleration": acceleration,
"maximum_velocity": maximum_velocity,
"kp": kp,
"kd": kd,
"times": times,
"positions": positions,
"goal_positions": goal_positions,
"speeds": speeds,
"loads": loads,
"currents": currents,
"fw_version": fw_version,
}
pickle.dump(
data,
open(
f"data_KP_{kp}_{fw_version}.pkl",
"wb",
),
)

View File

@ -0,0 +1,35 @@
import pickle
import argparse
import matplotlib.pyplot as plt
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--file", type=str, required=True)
args = parser.parse_args()
data = pickle.load(open(args.file, "rb"))
plt.figure()
plt.subplot(4, 1, 1)
plt.plot(data["times"], data["positions"], label="positions (rad)")
plt.plot(data["times"], data["goal_positions"], label="goal_positions (rad)")
plt.legend()
plt.subplot(4, 1, 2)
plt.plot(data["times"], data["speeds"], label="speed (rad/s)")
plt.legend()
plt.subplot(4, 1, 3)
plt.plot(data["times"], data["loads"], label="load")
plt.legend()
plt.subplot(4, 1, 4)
plt.plot(data["times"], data["currents"], label="current (mA)")
plt.legend()
# plt.suptitle(f"Acceleration: {data['acceleration']}, KP: {data['kp']}, KD: {data['kd']}")
plt.suptitle(f"KP: {data['kp']}, firmware: {data['fw_version']}")
plt.tight_layout()
plt.show()

View File

@ -0,0 +1,100 @@
from pypot.feetech import FeetechSTS3215IO
import pickle
import numpy as np
import time
io = FeetechSTS3215IO("/dev/ttyACM0")
# accelerations = [0, 10, 50, 100, 200, 255]
accelerations = [0]
switch=False
# kps = [4, 8, 16, 32]
# kds = [0, 4, 8, 16, 32]
# accelerations = [0]
kps = [32]
kds = [0]
for acceleration in accelerations:
for kp in kps:
for kd in kds:
print(f"acceleration: {acceleration}, kp: {kp}, kd: {kd}")
# acceleration = 50
# kp = 32
# kd = 0
# io.set_mode({1: 0})
# io.set_lock({1: 1})
# time.sleep(1)
# io.set_maximum_acceleration({1: acceleration})
# io.set_acceleration({1: acceleration})
# time.sleep(1)
io.set_P_coefficient({1: kp})
io.set_D_coefficient({1: kd})
# time.sleep(1)
goal_position = 90
io.set_goal_position({1: 0})
time.sleep(3)
times = []
positions = []
goal_positions = []
speeds = []
loads = []
currents = []
def convert_load(raw_load):
sign = -1
if raw_load > 1023:
raw_load -= 1024
sign = 1
return sign * raw_load * 0.001
io.set_goal_position({1: goal_position})
s = time.time()
set = False
while True:
t = time.time() - s
# goal_position = np.rad2deg(np.sin(t**2))
io.set_goal_position({1: goal_position})
present_position = np.deg2rad(io.get_present_position([1])[0])
present_speed = np.deg2rad(io.get_present_speed([1])[0])
present_load = convert_load(io.get_present_load([1])[0])
present_current = io.get_present_current([1])[0]
times.append(t)
positions.append(present_position)
goal_positions.append(np.deg2rad(goal_position))
speeds.append(present_speed)
loads.append(present_load)
currents.append(present_current)
# if switch and t > 0.2 and not set:
# goal_position = -goal_position
# io.set_goal_position({1: goal_position})
# set = True
if t > 3:
break
time.sleep(0.01)
data = {
"acceleration": acceleration,
"kp": kp,
"kd": kd,
"times": times,
"positions": positions,
"goal_positions": goal_positions,
"speeds": speeds,
"loads": loads,
"currents": currents,
}
pickle.dump(
data,
open(f"data_{'switch_' if switch else ''}acceleration_{acceleration}_kp_{kp}_kd_{kd}.pkl", "wb"),
)

View File

@ -0,0 +1,9 @@
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.duck_config import DuckConfig
import time
duck_config = DuckConfig()
hwi = HWI(duck_config)
hwi.turn_off()
time.sleep(1)

View File

@ -0,0 +1,9 @@
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.duck_config import DuckConfig
import time
duck_config = DuckConfig()
hwi = HWI(duck_config)
hwi.turn_on()
time.sleep(1)

View File

@ -0,0 +1,400 @@
import time
import pickle
import numpy as np
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.onnx_infer import OnnxInfer
from mini_bdx_runtime.raw_imu import Imu
from mini_bdx_runtime.poly_reference_motion import PolyReferenceMotion
from mini_bdx_runtime.xbox_controller import XBoxController
from mini_bdx_runtime.feet_contacts import FeetContacts
from mini_bdx_runtime.eyes import Eyes
from mini_bdx_runtime.sounds import Sounds
from mini_bdx_runtime.antennas import Antennas
from mini_bdx_runtime.projector import Projector
from mini_bdx_runtime.rl_utils import make_action_dict, LowPassActionFilter
from mini_bdx_runtime.duck_config import DuckConfig
import os
HOME_DIR = os.path.expanduser("~")
class RLWalk:
def __init__(
self,
onnx_model_path: str,
duck_config_path: str = f"{HOME_DIR}/duck_config.json",
serial_port: str = "/dev/ttyACM0",
control_freq: float = 50,
pid=[30, 0, 0],
action_scale=0.25,
commands=False,
pitch_bias=0,
save_obs=False,
replay_obs=None,
cutoff_frequency=None,
):
self.duck_config = DuckConfig(config_json_path=duck_config_path)
self.commands = commands
self.pitch_bias = pitch_bias
self.onnx_model_path = onnx_model_path
self.policy = OnnxInfer(self.onnx_model_path, awd=True)
self.num_dofs = 14
self.max_motor_velocity = 5.24 # rad/s
# Control
self.control_freq = control_freq
self.pid = pid
self.save_obs = save_obs
if self.save_obs:
self.saved_obs = []
self.replay_obs = replay_obs
if self.replay_obs is not None:
self.replay_obs = pickle.load(open(self.replay_obs, "rb"))
self.action_filter = None
if cutoff_frequency is not None:
self.action_filter = LowPassActionFilter(
self.control_freq, cutoff_frequency
)
self.hwi = HWI(self.duck_config, serial_port)
self.start()
self.imu = Imu(
sampling_freq=int(self.control_freq),
user_pitch_bias=self.pitch_bias,
upside_down=self.duck_config.imu_upside_down,
)
self.feet_contacts = FeetContacts()
# Scales
self.action_scale = action_scale
self.last_action = np.zeros(self.num_dofs)
self.last_last_action = np.zeros(self.num_dofs)
self.last_last_last_action = np.zeros(self.num_dofs)
self.init_pos = list(self.hwi.init_pos.values())
self.motor_targets = np.array(self.init_pos.copy())
self.prev_motor_targets = np.array(self.init_pos.copy())
self.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.paused = self.duck_config.start_paused
self.command_freq = 20 # hz
if self.commands:
self.xbox_controller = XBoxController(self.command_freq)
# Reference motion, but we only really need the length of one phase
# TODO
self.PRM = PolyReferenceMotion("./polynomial_coefficients.pkl")
self.imitation_i = 0
self.imitation_phase = np.array([0, 0])
self.phase_frequency_factor = 1.0
self.phase_frequency_factor_offset = (
self.duck_config.phase_frequency_factor_offset
)
# Optional expression features
if self.duck_config.eyes:
self.eyes = Eyes()
if self.duck_config.projector:
self.projector = Projector()
if self.duck_config.speaker:
self.sounds = Sounds(
volume=1.0, sound_directory="../mini_bdx_runtime/assets/"
)
if self.duck_config.antennas:
self.antennas = Antennas()
def get_obs(self):
imu_data = self.imu.get_data()
dof_pos = self.hwi.get_present_positions(
ignore=[
"left_antenna",
"right_antenna",
]
) # rad
dof_vel = self.hwi.get_present_velocities(
ignore=[
"left_antenna",
"right_antenna",
]
) # rad/s
if dof_pos is None or dof_vel is None:
return None
if len(dof_pos) != self.num_dofs:
print(f"ERROR len(dof_pos) != {self.num_dofs}")
return None
if len(dof_vel) != self.num_dofs:
print(f"ERROR len(dof_vel) != {self.num_dofs}")
return None
cmds = self.last_commands
feet_contacts = self.feet_contacts.get()
obs = np.concatenate(
[
imu_data["gyro"],
imu_data["accelero"],
cmds,
dof_pos - self.init_pos,
dof_vel * 0.05,
self.last_action,
self.last_last_action,
self.last_last_last_action,
self.motor_targets,
feet_contacts,
self.imitation_phase,
]
)
return obs
def start(self):
kps = [self.pid[0]] * 14
kds = [self.pid[2]] * 14
# lower head kps
kps[5:9] = [8, 8, 8, 8]
self.hwi.set_kps(kps)
self.hwi.set_kds(kds)
self.hwi.turn_on()
time.sleep(2)
def get_phase_frequency_factor(self, x_velocity):
max_phase_frequency = 1.2
min_phase_frequency = 1.0
# Perform linear interpolation
freq = min_phase_frequency + (abs(x_velocity) / 0.15) * (
max_phase_frequency - min_phase_frequency
)
return freq
def run(self):
i = 0
try:
print("Starting")
start_t = time.time()
while True:
left_trigger = 0
right_trigger = 0
t = time.time()
if self.commands:
self.last_commands, self.buttons, left_trigger, right_trigger = (
self.xbox_controller.get_last_command()
)
if self.buttons.dpad_up.triggered:
self.phase_frequency_factor_offset += 0.05
print(
f"Phase frequency factor offset {round(self.phase_frequency_factor_offset, 3)}"
)
if self.buttons.dpad_down.triggered:
self.phase_frequency_factor_offset -= 0.05
print(
f"Phase frequency factor offset {round(self.phase_frequency_factor_offset, 3)}"
)
if self.buttons.LB.is_pressed:
self.phase_frequency_factor = 1.3
else:
self.phase_frequency_factor = 1.0
if self.buttons.X.triggered:
if self.duck_config.projector:
self.projector.switch()
if self.buttons.B.triggered:
if self.duck_config.speaker:
self.sounds.play_random_sound()
if self.duck_config.antennas:
self.antennas.set_position_left(right_trigger)
self.antennas.set_position_right(left_trigger)
if self.buttons.A.triggered:
self.paused = not self.paused
if self.paused:
print("PAUSE")
else:
print("UNPAUSE")
if self.paused:
time.sleep(0.1)
continue
obs = self.get_obs()
if obs is None:
continue
self.imitation_i += 1 * (
self.phase_frequency_factor + self.phase_frequency_factor_offset
)
self.imitation_i = self.imitation_i % self.PRM.nb_steps_in_period
self.imitation_phase = np.array(
[
np.cos(
self.imitation_i / self.PRM.nb_steps_in_period * 2 * np.pi
),
np.sin(
self.imitation_i / self.PRM.nb_steps_in_period * 2 * np.pi
),
]
)
if self.save_obs:
self.saved_obs.append(obs)
if self.replay_obs is not None:
if i < len(self.replay_obs):
obs = self.replay_obs[i]
else:
print("BREAKING ")
break
action = self.policy.infer(obs)
self.last_last_last_action = self.last_last_action.copy()
self.last_last_action = self.last_action.copy()
self.last_action = action.copy()
# action = np.zeros(10)
self.motor_targets = self.init_pos + action * self.action_scale
# self.motor_targets = np.clip(
# self.motor_targets,
# self.prev_motor_targets
# - self.max_motor_velocity * (1 / self.control_freq), # control dt
# self.prev_motor_targets
# + self.max_motor_velocity * (1 / self.control_freq), # control dt
# )
if self.action_filter is not None:
self.action_filter.push(self.motor_targets)
filtered_motor_targets = self.action_filter.get_filtered_action()
if (
time.time() - start_t > 1
): # give time to the filter to stabilize
self.motor_targets = filtered_motor_targets
self.prev_motor_targets = self.motor_targets.copy()
head_motor_targets = self.last_commands[3:] + self.motor_targets[5:9]
self.motor_targets[5:9] = head_motor_targets
action_dict = make_action_dict(
self.motor_targets, list(self.hwi.joints.keys())
)
self.hwi.set_position_all(action_dict)
i += 1
took = time.time() - t
# print("Full loop took", took, "fps : ", np.around(1 / took, 2))
if (1 / self.control_freq - took) < 0:
print(
"Policy control budget exceeded by",
np.around(took - 1 / self.control_freq, 3),
)
time.sleep(max(0, 1 / self.control_freq - took))
except KeyboardInterrupt:
if self.duck_config.antennas:
self.antennas.stop()
if self.duck_config.eyes:
self.eyes.stop()
if self.duck_config.projector:
self.projector.stop()
self.feet_contacts.stop()
if self.save_obs:
pickle.dump(self.saved_obs, open("robot_saved_obs.pkl", "wb"))
print("TURNING OFF")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--onnx_model_path", type=str, required=True)
parser.add_argument(
"--duck_config_path",
type=str,
required=False,
default=f"{HOME_DIR}/duck_config.json",
)
parser.add_argument("-a", "--action_scale", type=float, default=0.25)
parser.add_argument("-p", type=int, default=30)
parser.add_argument("-i", type=int, default=0)
parser.add_argument("-d", type=int, default=0)
parser.add_argument("-c", "--control_freq", type=int, default=50)
parser.add_argument("--pitch_bias", type=float, default=0, help="deg")
parser.add_argument(
"--commands",
action="store_true",
default=True,
help="external commands, keyboard or gamepad. Launch control_server.py on host computer",
)
parser.add_argument(
"--save_obs",
type=str,
required=False,
default=False,
help="save the run's observations",
)
parser.add_argument(
"--replay_obs",
type=str,
required=False,
default=None,
help="replay the observations from a previous run (can be from the robot or from mujoco)",
)
parser.add_argument("--cutoff_frequency", type=float, default=None)
args = parser.parse_args()
pid = [args.p, args.i, args.d]
print("Done parsing args")
rl_walk = RLWalk(
args.onnx_model_path,
duck_config_path=args.duck_config_path,
action_scale=args.action_scale,
pid=pid,
control_freq=args.control_freq,
commands=args.commands,
pitch_bias=args.pitch_bias,
save_obs=args.save_obs,
replay_obs=args.replay_obs,
cutoff_frequency=args.cutoff_frequency,
)
print("Done instantiating RLWalk")
rl_walk.run()

View File

@ -0,0 +1,38 @@
[metadata]
name = mini-bdx-runtime
version = 0.0.1
author = Antoine Pirrone
author_email = antoine.pirrone@gmail.com
url = https://github.com/apirrone/mini_BDX_runtime
description = Runtime code for mini BDX
long_description = file: README.md
long_description_content_type = text/markdown
[options]
packages = find:
zip_safe = True
include_package_data = True
package_dir=
=mini_bdx_runtime
install_requires =
rustypot==0.1.0
onnxruntime==1.18.1
numpy==1.26.4
adafruit-circuitpython-bno055==5.4.13
scipy==1.15.1
pygame==2.6.0
pypot @ git+https://github.com/pollen-robotics/pypot@support-feetech-sts3215
openai==1.70.0
# adafruit_extended_bus
[options.packages.find]
where=mini_bdx_runtime
[options.package_data]
[options.extras_require]
[options.entry_points]
console_scripts =