last updated: 2024-08-27
I used a simple ESP32 circuit (Arduino) with potentiometer (sensor) and servo (actuator) to create 2 ROS2 nodes here: https://www.weigu.lu/sb-computer/ros2_basics/ros2_esp32_pot_servo/index.html.
For robots that need feedback a normal servo is not the best solution. So here I will use a Dynamixel servo as actuator.
For more information about ROS2: https://www.weigu.lu/sb-computer/ros2_basics/index.html
Dynamixel servos have a sophisticated controller on-board, that uses a proprietary digital communication protocol over a bus (no PWM). This allows for daisy-chaining multiple servos. They provide telemetry feedback (integrated sensors) and you can tune the position control loop and set parameters like current limiting. They are more robust (encoder instead a potentiometer) and easier to integrate into a robot than a common hobby servo.
I will use an AX-12A Servo (now replaced with an XL430-W250). This AX-12A needs 11.1 V , can turn 0-300 ° (endless) and has a resolution of 0.29 °. The communication uses half duplex asynchronous serial communication (8N1) with TTL level (5 V). The stall torque is 1.5 Nm (12 V, 1.5 A). More infos here: https://emanual.robotis.com/docs/en/dxl/ax/ax-12a/.
To connect the servo to the ESP32 I use a little interface board with level shifter and tri-state buffer to get a good communication: https://github.com/JosueAGtz/dynamixelInterface. The schematic is here: https://github.com/JosueAGtz/dynamixelInterface/blob/main/Hardware/Dynamixel%20Interface%20Sch.pdf.
BOM:
Platform:
A 10 kΩ potentiometer (variable resistance) will be connected to 3.3 V and deliver a voltage between 0 V and 3.3 V. The ADC of the ESP32 (GPIO18) will convert this voltage with 12 bit in a value between 0
and 4095
.
The actuator will be a Dynamixel Servo. The value for the servo ranges from 0
to 280
corresponding to an angle of 0° to 280°. We need the Dynamixel2Arduino
library (Robotis). Install it with the Tools > Manage libraries...
.
Use this circuit:
First let's test the hardware. Her a little sketch to turn the Dynamixel using the Dynamixel2Arduino library (look also at the Arduino examples):
/*
dynamixel_read_write_ax_test.ino
Little test program for Dynamixel AX-12A on ESP32 and little interface
weigu.lu
https://www.weigu.lu/sb-computer/ros2_basics/ros2_esp32_dynamixel/index.html
https://github.com/JosueAGtz/dynamixelInterface
More functions in:
https://github.com/ROBOTIS-GIT/Dynamixel2Arduino/blob/master/src/Dynamixel2Arduino.h
*/
#include <Dynamixel2Arduino.h>
#define DXL_SERIAL Serial1 // Serial to Dynamixel
#define DEBUG_SERIAL Serial // Arduino serial monitor
const byte DXL_DIR_PIN = 19; // Dynamixel interface board DIR PIN
const byte DXL_ID = 1; // default AX-12A
const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
const unsigned long DXL_BPS = 1000000; // default AX-12A
const unsigned long DEBUG_BPS = 115200;
Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);
void setup() {
DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22); // SERIAL1 on ESP32 to Pin 21/22
DEBUG_SERIAL.begin(DEBUG_BPS); // Set debugging port baudrate to 115200bps
delay(2000);
dxl.begin(DXL_BPS); // Set Port baudrate to default AX-12A
dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Port Protocol Version
DEBUG_SERIAL.println("Refer to eManual for more details: https://emanual.robotis.com/docs/en/dxl/");
DEBUG_SERIAL.println("Read for PROTOCOL " + String(DXL_PROTOCOL_VERSION) + ", ID: " + String(DXL_ID));
DEBUG_SERIAL.println("Model Number: " + String(dxl.getModelNumber(DXL_ID))); // Read DYNAMIXEL Model Number
DEBUG_SERIAL.println("Bitrate: " + String(dxl. getPortBaud())); // Read DYNAMIXEL Baudrate
// Read DYNAMIXEL Present Position
DEBUG_SERIAL.println("Present position: " + String(dxl.getPresentPosition(DXL_ID, UNIT_DEGREE)));
DEBUG_SERIAL.println("Setup done!");
delay(2000);
}
void loop() {
dxl.ledOn(DXL_ID);
for (int i = 0; i<290; i=i+5) {
DEBUG_SERIAL.print(String(i) + '\t');
dxl.setGoalPosition(DXL_ID, i, UNIT_DEGREE);
delay(100);
int pos = dxl.getPresentPosition(DXL_ID, UNIT_DEGREE);
if (((pos==0) && (i!=0)) || (pos>300) || (pos<0)) {
delay(100);
pos = dxl.getPresentPosition(DXL_ID, UNIT_DEGREE);
}
DEBUG_SERIAL.println(pos);
}
dxl.setGoalPosition(DXL_ID, 0, UNIT_DEGREE);
dxl.ledOff(DXL_ID);
delay(5000);
}
I will use a similar packet structure as it was used for the JPL NASA open source robot (osr). The communication goes over a good old serial interface. On ESP we use Serial2 on the pins 16 (RxD2) and 17 (TxD2). RxD2 is connected to TxD and TxD2 to RxD of an 3V USB to Serial adapter that uses an USB port of the Raspi. Don't forget to connect ground.
The serial data will have a two byte preamble (or syncword). These are the teo first bytes of every single message. The preamble is 0xABCD
. A 1 byte CRC-8 Checksum provides verification that the frame was received correctly. It is computed of only the data bytes and is stored at the end of the packet. It can be re-calculated on both sides of the data stream to check if the data transmission was correct. As we need here only one data value, the data consists in this example from 2 bytes. Here is an example of the frame (pot. value 1562):
Preamblel HByte | Preamblel LByte | Data HByte | Data LByte | CRC |
---|---|---|---|---|
0xAB | 0xCD | 0x06 | 0x1A | 0x1C |
First let's test the serial connection. We use 1000000 bit/s :).
// esp32_hardware_serial_test.ino
byte incoming_byte;
long counter = 0;
void setup() {
Serial.begin(115200); // initialize serial monitor
Serial2.begin(1000000,SERIAL_8N1, 16, 17); //init hardware serial
delay(500);
Serial.println("Setup done!");
}
void loop() {
Serial.println(String("Hello! ") + counter); // msg to Raspi
Serial2.println(String("Hello! ") + counter); // msg to Raspi
while (Serial2.available() > 0) { // if any serial available, read it
incoming_byte = Serial2.read(); // and write it to the terminal
Serial.write(incoming_byte); // alt: Serial.write(Serial2.read());
}
Serial.print(".");
delay(2000);
counter++;
}
On the Raspi look with sudo dmesg
for the name of your adapter (e.g. ttyUSB0, ttyAMA0). Then we configure the interface and wait for 15 bytes.
stty -F /dev/ttyUSB0 speed 10000000 cs8 -cstopb -parenb -echo
hexdump -C -n 15 /dev/ttyUSB0
We should see a "Hello!". To send something to the ESP32 (Arduino Serial monitor) we use:
echo -en '\x6f\x6f\x6f\x6f\x0a0' > /dev/ttyUSB0
We should see a "oooo".
Next let's test the potentiometer and Dynamixel connections:
// esp32_hardware_pot_and_dynamixel_test.ino
#include <Dynamixel2Arduino.h>
#define DXL_SERIAL Serial1 // Serial to Dynamixel
#define DEBUG_SERIAL Serial // Arduino serial monitor
const unsigned long DEBUG_BPS = 115200;
const unsigned long DXL_BPS = 1000000; // default AX-12A
const byte DXL_ID = 1; // default AX-12A
const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
const byte DXL_DIR_PIN = 19; // Dynamixel interface board DIR PIN
const byte PIN_POT = 26;
unsigned short pot_value = 0;
int dxl_angle = 0; // 0-180° variable to store the servo position
Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);
void setup() {
DEBUG_SERIAL.begin(DEBUG_BPS); // serial monitor
delay(1000);
DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22); // DXL serial ESP32 Pin 21/22
dxl.begin(DXL_BPS); // Set speed to default AX-12A
dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Protocol Version
DEBUG_SERIAL.println("Setup done!");
delay(1000);
}
void loop() {
pot_value = analogRead(PIN_POT);
DEBUG_SERIAL.print(String("Pot value: ") + pot_value + '\t');
dxl_angle = map(pot_value,0,4095,0,280);
DEBUG_SERIAL.println(String("DXL angle: ") + dxl_angle + "°");
dxl.setGoalPosition(DXL_ID, dxl_angle, UNIT_DEGREE);
delay(10);
}
And last but not least, lets flash our Arduino program:
// ros2_esp32_dynamixel_pot.ino weigu.lu
// Using Dynamixel2Arduino (install with lib manager) and Dynamixel interface
// https://github.com/JosueAGtz/dynamixelInterface
#include <Dynamixel2Arduino.h>
#define DEBUG_SERIAL Serial // Arduino serial monitor
#define DXL_SERIAL Serial1 // Serial to Dynamixel
#define USB_SERIAL Serial2 // Serial to Raspi
const unsigned long DEBUG_BPS = 115200;
const unsigned long DXL_BPS = 1000000; // default AX-12A
const unsigned long USB_BPS = 1000000;
const byte DXL_ID = 1; // default AX-12A
const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
const byte DXL_DIR_PIN = 19; // Dynamixel interface board DIR PIN
const byte PIN_POT = 26;
const unsigned short PREAMBLE = 0xABCD;
const unsigned long DELAY_MS = 500; // time in ms between sends
unsigned short pot_value = 0; // 0-4095 variable pot position
int servo_angle = 0; // 0-180° variable servo position
byte checksum = 0;
byte data_out[5] = {0}; // consists for the moment only of pot_value
byte data_out_length = sizeof(data_out) / sizeof(byte);
byte data_in[5] = {0}; // consists for the moment only of servo_angle
byte data_in_length = sizeof(data_in) / sizeof(byte);
Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);
void setup() {
DEBUG_SERIAL.begin(DEBUG_BPS); // serial monitor
delay(1000);
DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22); // DXL serial ESP32 Pin 21/22
USB_SERIAL.begin(USB_BPS,SERIAL_8N1, 16, 17); // USB serial ESP32 Pin 16/17
dxl.begin(DXL_BPS); // Set speed to default AX-12A
dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Protocol Version
DEBUG_SERIAL.println("Setup done!");
delay(1000);
}
void loop() {
if (non_blocking_delay(DELAY_MS)) {
send_pot_value();
}
//DEBUG_SERIAL.println(read_telegram());
if (read_telegram() == data_in_length) {
set_dynamixel();
}
delay(100); // for the dog
}
unsigned short get_pot_value() {
const byte ADC_SAMPLES = 5;
unsigned long pot_adc_sum = 0;
for (byte i = 0; i<ADC_SAMPLES; i++) {
pot_adc_sum += analogRead(PIN_POT);
}
return pot_adc_sum/ADC_SAMPLES;
}
void send_pot_value() {
data_out[0] = highByte(PREAMBLE);
data_out[1] = lowByte(PREAMBLE);
pot_value = get_pot_value();
DEBUG_SERIAL.println(pot_value);
data_out[2] = highByte(pot_value);
data_out[3] = lowByte(pot_value);
data_out[4] = xor_checksum(data_out,2,3);
for (byte i = 0; i < data_out_length; i++) {
USB_SERIAL.write(data_out[i]);
}
for (byte i = 0; i < data_out_length; i++) {
DEBUG_SERIAL.print(data_out[i],HEX);
}
DEBUG_SERIAL.println();
}
byte read_telegram() {
unsigned short serial_cnt = 0;
if (USB_SERIAL.available()) { // wait for the whole stream
delay(1); // 1ms = 100 byte with 1000000bps
}
while (USB_SERIAL.available()) {
data_in[serial_cnt] = USB_SERIAL.read();
serial_cnt++;
}
if ((data_in[0] != highByte(PREAMBLE)) || (data_in[1] != lowByte(PREAMBLE))) {
while (USB_SERIAL.available() > 0) {USB_SERIAL.read();} //clear the buffer!
return -1;
}
return serial_cnt;
}
void set_dynamixel() {
byte checksum = xor_checksum(data_in,2,3);
if (data_in[4] == checksum) {
servo_angle = data_in[2]*256+data_in[3];
DEBUG_SERIAL.println(servo_angle);
dxl.setGoalPosition(DXL_ID, servo_angle, UNIT_DEGREE);
}
}
byte xor_checksum(byte *data, byte index1, byte index2) {
checksum = 0;
for (byte i = index1; i <= index2; i++) {
checksum ^= data[i]; // XOR the current byte with the checksum
}
return checksum;
}
bool non_blocking_delay(unsigned long milliseconds) {
static unsigned long nb_delay_prev_time = 0;
if(millis() >= nb_delay_prev_time + milliseconds) {
nb_delay_prev_time += milliseconds;
return true;
}
return false;
}
Now let's create our workspace (if not already done) and node:
cd ~
mkdir -p myros2_ws/src
cd myros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 py_esp32_pot_servo \ --dependencies rclpy std_msgs
In the package.xml
file change the setup.py
change the same variables and add the entry point:
...
entry_points={
'console_scripts': [
'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
#'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
],
},
We need the Python serial
library and install it with:
sudo apt install python3-serial
Next we create our python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo
.
I had problems to use a use an external module (serial communication) with the node program, so I integrated the communication class in the node program. It contains also some functions not used, but helpful if debugging is necessary (I omit them here).
The name of the script is ros2_esp32_get_pot.py
:
#!/usr/bin/env python
import rclpy # the imports match the dependencies from package. xml
from rclpy.node import Node # import python node class
from std_msgs.msg import String # import the built-in string message type
import serial
SER_PORT = "/dev/ttyUSB0"
SER_SPEED = 1000000
""" I could not import an external module, so I added the usb_serial code here"""
class SerialComm():
""" Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
(2 byte for now) + CRC (1byte) """
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
self.crc_index = 4 # crc index in data stream (1 byte)
def check_if_crc_ok(self, crc, crc_data):
""" xor checksum over bytes from index1 to index2 included
and compare with data checksum """
checksum = 0
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
if crc == checksum:
return True
else:
return False
def get_pot_value(self):
ser = serial.Serial(self.port, self.speed) # open the serial port
data = ser.read(5)
#print(data.hex())
if data[0:2] == self.preambel:
pot_value = data[self.pot_val_index1]*256+data[self.pot_val_index2]
#print("Pot value: ", pot_value, end="\t")
crc = data[self.crc_index]
crc_data = data[self.data_index1:self.data_index2+1] # checksum data values
if self.check_if_crc_ok(crc, crc_data):
#print("Checksum ok")
flag = True
else:
#print("Checksum error")
flag = False
else:
#print("Data acquisition error!")
flag = False
ser.close()
if flag:
return pot_value
else:
return -1
class PotPublisher(Node): # our pub node class
def __init__(self):
super().__init__('pot_publisher') # pass name to node class
self.publisher_ = self.create_publisher(String, 'pot_value', 10)
timer_period = 0.5 # 0.5 seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0 # counter for message
self.serial_comm = SerialComm(SER_PORT, SER_SPEED)
def timer_callback(self): # is called every 0.5s from timer
pot_value = self.serial_comm.get_pot_value()
msg = String()
if pot_value != -1:
msg.data = str(pot_value)
else:
msg.data = "error"
self.get_logger().info('Communication or CRC error') # log to console
self.publisher_.publish(msg) # publish to topic
self.get_logger().info('Publishing: "%s"' % msg.data) # log to console
self.i += 1 # increment counter
def main(args=None):
rclpy.init(args=args) # init rclpy
pot_publisher = PotPublisher() # create publisher node
rclpy.spin(pot_publisher) # loop the node so callbacks occur
pot_publisher.destroy_node() # destroy the node explicitly
rclpy.shutdown() # shutdown rclpy
if __name__ == '__main__':
main()
Check in the setup script that your path is $base/lib/py_pubsub
. Then we run rosdep
in the root of our workspace to check for missing dependencies before building. Then we built the package and run the node:
cd myros2_ws
rosdep install -i --from-path src --rosdistro jazzy -y
#rosdep install -i --from-path src --rosdistro humble -y
colcon build --packages-select py_esp32_pot_servo
source install/setup.bash
ros2 run py_esp32_pot_servo potentiometer
Now let's create a second node residing in the same package.
We add in setup.py
the second entry point:
...
entry_points={
'console_scripts': [
'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
],
},
Then we create our second python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo
.
The name of the script is ros2_esp32_set_servo_pot.py
:
#!/usr/bin/env python
import rclpy # the imports match the dependencies from package. xml
from rclpy.node import Node # import python node class
from std_msgs.msg import String # import the built-in string message type
import serial
from time import sleep
SER_PORT = "/dev/ttyUSB0"
SER_SPEED = 1000000
""" I could not import an external module, so I added the usb_serial code here"""
class SerialComm():
""" Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
(2 byte for now) + CRC (1byte) """
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
self.servo_val_index1 = 2 # hbyte index of servo value in data_stream
self.servo_val_index2 = 3 # lbyte index of servo value in data_stream
self.crc_index = 4 # crc index in data stream (1 byte)
self.data_bytes_pot = 5
self.data_bytes_servo = 5
def check_if_crc_ok(self, crc, crc_data):
""" Compare data crc with with checksum """
checksum = self.calculate_checksum(crc_data)
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
if crc == checksum:
return True
else:
return False
def calculate_checksum(self, crc_data):
""" xor checksum over bytes from index1 to index2 included"""
checksum = 0
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
return checksum
def send_angle_2_servo(self, angle):
ser = serial.Serial(self.port, self.speed) # open the serial port
data = [None]*5
data[0:2] = self.preambel
data[self.servo_val_index1:self.servo_val_index2+1] = angle.to_bytes(2, 'big')
data[self.crc_index] = self.calculate_checksum(data[2:4])
ser.write(bytearray(data))
ser.close()
class Send2Servo(Node): # another pub node class
def __init__(self):
self.angle = 0
super().__init__('send_angle_2_servo')
self.subscription = self.create_subscription(
String,
'pot_value',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
timer_period = 0.5 # 0.5 seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.serial_comm = SerialComm(SER_PORT, SER_SPEED)
def timer_callback(self): # is called every 0.5s from timer
self.serial_comm.send_angle_2_servo(self.angle)
self.get_logger().info('I send: "%s"' % str(self.angle))
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
self.angle = int(int(msg.data)*180/4095)
def main(args=None):
rclpy.init(args=args) # init rclpy
send_angle_to_servo = Send2Servo() # create publisher node
rclpy.spin(send_angle_to_servo) # loop the node so callbacks occur
send_angle_to_servo.destroy_node() # destroy the node explicitly
rclpy.shutdown() # shutdown rclpy
if __name__ == '__main__':
main()
cd myros2_ws
rosdep install -i --from-path src --rosdistro jazzy -y
#rosdep install -i --from-path src --rosdistro humble -y
colcon build --packages-select py_esp32_pot_servo
source install/setup.bash
ros2 run py_esp32_pot_servo servo
If both nodes are running we should see something like this on the screen (2 ssh terminal sessions and the output od Arduino serial monitor):