last updated: 2024-08-27
I will use a simple ESP32 circuit (Arduino) with potentiometer (sensor) and servo (actuator) and create two ROS2 nodes to access these.
Tested with Jazzy and Humble
For more information about ROS2: https://www.weigu.lu/sb-computer/ros2_basics/index.html
We need a device with a variable input (sensor) and output (servo) to test two nodes, so let's quickly built one.
BOM:
Platform:
A 10 kΩ potentiometer (variable resistance) will be connected to 3.3 V and deliver a voltage between 0 V and 3.3 V. The ADC of the ESP32 (GPIO18) will convert this voltage with 12 bit in a value between 0
and 4095
.
The actuator will be a servo (GPIO26). The value for the servo ranges from 0
to 180
corresponding to an angle of 0° to 180°. We need the ESP32servo
library (Kevin Harrington). Install it with the Tools > Manage libraries...
.
Use this circuit:
I will use a similar packet structure as it was used for the JPL NASA open source robot (osr). The communication goes over a good old serial interface. On ESP we use Serial2 on the pins 16 (RxD2) and 17 (TxD2). RxD2 is connected to TxD and TxD2 to RxD of an 3V USB to Serial adapter that uses an USB port of the Raspi. Don't forget to connect ground.
The serial data will have a two byte preamble (or syncword). These are the teo first bytes of every single message. The preamble is 0xABCD
. A 1 byte CRC-8 Checksum provides verification that the frame was received correctly. It is computed of only the data bytes and is stored at the end of the packet. It can be re-calculated on both sides of the data stream to check if the data transmission was correct. As we need here only one data value, the data consists in this example from 2 bytes. Here is an example of the frame (pot. value 1562):
Preamblel HByte | Preamblel LByte | Data HByte | Data LByte | CRC |
---|---|---|---|---|
0xAB | 0xCD | 0x06 | 0x1A | 0x1C |
First let's test the serial connection. We use 1000000 bit/s :).
// esp32_hardware_serial_test.ino
byte incoming_byte;
long counter = 0;
void setup() {
Serial.begin(115200); // initialize serial monitor
Serial2.begin(1000000,SERIAL_8N1, 16, 17); //init hardware serial
delay(500);
Serial.println("Setup done!");
}
void loop() {
Serial.println(String("Hello! ") + counter); // msg to Raspi
Serial2.println(String("Hello! ") + counter); // msg to Raspi
while (Serial2.available() > 0) { // if any serial available, read it
incoming_byte = Serial2.read(); // and write it to the terminal
Serial.write(incoming_byte); // alt: Serial.write(Serial2.read());
}
Serial.print(".");
delay(2000);
counter++;
}
On the Raspi look with sudo dmesg
for the name of your adapter (e.g. ttyUSB0, ttyAMA0). Then we configure the interface and wait for 15 bytes.
stty -F /dev/ttyUSB0 speed 10000000 cs8 -cstopb -parenb -echo
hexdump -C -n 15 /dev/ttyUSB0
We should see a "Hello!". To send something to the ESP32 (Arduino Serial monitor) we use:
echo -en '\x6f\x6f\x6f\x6f\x0a0' > /dev/ttyUSB0
We should see a "oooo".
Next let's test the potentiometer and servo connections:
// esp32_hardware_pot_and_servo_test.ino
#include <ESP32Servo.h>
const byte PIN_POT = 26;
const byte PIN_SERVO = 18;
unsigned short pot_value = 0;
int servo_angle = 0; // 0-180° variable to store the servo position
Servo My_Servo;
void setup() {
Serial.begin(115200); // initialize serial for terminal
My_Servo.setPeriodHertz(50); // see lib example
My_Servo.attach(PIN_SERVO, 1000, 2000); // see lib example
delay(500);
Serial.println("Setup done!");
}
void loop() {
pot_value = analogRead(PIN_POT);
Serial.print(String("Pot value: ") + pot_value + '\t');
servo_angle = map(pot_value,0,4095,0,180);
Serial.println(String("Servo angle: ") + servo_angle);
My_Servo.write(servo_angle);
delay(10);
}
And last but not least, lets flash our Arduino program:
// ros2_esp32_servo_pot.ino weigu.lu
// Using ESP32Servo (install with lib manager)
// https://github.com/jkb-git/ESP32Servo
#include <ESP32Servo.h>
const long SER_SPEED = 1000000;
const byte PIN_POT = 26;
const byte PIN_SERVO = 18;
const unsigned short PREAMBLE = 0xABCD;
const unsigned long DELAY_MS = 500; // time in ms between sends
unsigned short pot_value = 0; // 0-4095 variable to store pot position
int servo_angle = 0; // 0-180° variable to store the servo position
byte checksum = 0;
byte data_out[5] = {0}; // consists for the moment only of pot_value
byte data_out_length = sizeof(data_out) / sizeof(byte);
byte data_in[5] = {0}; // consists for the moment only of servo_angle
byte data_in_length = sizeof(data_in) / sizeof(byte);
Servo My_Servo;
void setup() {
Serial.begin(115200); // initialize serial monitor
Serial2.begin(SER_SPEED,SERIAL_8N1, 16, 17); //init hardware serial
My_Servo.setPeriodHertz(50); // see lib example
My_Servo.attach(PIN_SERVO, 1000, 2000); // see lib example
delay(500);
Serial.println("Setup done!");
}
void loop() {
if (non_blocking_delay(DELAY_MS)) {
send_pot_value();
}
if (read_telegram() == data_in_length) {
set_servo();
}
delay(1); // for the dog
}
unsigned short get_pot_value() {
const byte ADC_SAMPLES = 5;
unsigned long pot_adc_sum = 0;
for (byte i = 0; i<ADC_SAMPLES; i++) {
pot_adc_sum += analogRead(PIN_POT);
}
return pot_adc_sum/ADC_SAMPLES;
}
void send_pot_value() {
data_out[0] = highByte(PREAMBLE);
data_out[1] = lowByte(PREAMBLE);
pot_value = get_pot_value();
Serial.println(pot_value);
data_out[2] = highByte(pot_value);
data_out[3] = lowByte(pot_value);
data_out[4] = xor_checksum(data_out,2,3);
for (byte i = 0; i < data_out_length; i++) {
Serial2.write(data_out[i]);
}
for (byte i = 0; i < data_out_length; i++) {
Serial.print(data_out[i],HEX);
}
Serial.println();
}
byte read_telegram() {
unsigned short serial_cnt = 0;
if (Serial2.available()) { // wait for the whole stream
delay(1); // 1ms = 100 byte with 1000000bps
}
while (Serial2.available()) {
data_in[serial_cnt] = Serial2.read();
serial_cnt++;
}
if ((data_in[0] != highByte(PREAMBLE)) || (data_in[1] != lowByte(PREAMBLE))) {
while (Serial2.available() > 0) {Serial2.read();} //clear the buffer!
return -1;
}
return serial_cnt;
}
void set_servo() {
byte checksum = xor_checksum(data_in,2,3);
if (data_in[4] == checksum) {
servo_angle = data_in[2]*256+data_in[3];
Serial.println(servo_angle);
My_Servo.write(servo_angle);
}
}
byte xor_checksum(byte *data, byte index1, byte index2) {
checksum = 0;
for (byte i = index1; i <= index2; i++) {
checksum ^= data[i]; // XOR the current byte with the checksum
}
return checksum;
}
bool non_blocking_delay(unsigned long milliseconds) {
static unsigned long nb_delay_prev_time = 0;
if(millis() >= nb_delay_prev_time + milliseconds) {
nb_delay_prev_time += milliseconds;
return true;
}
return false;
}
Now let's create our workspace (if not already done) and node:
cd ~
mkdir -p myros2_ws/src
cd myros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 py_esp32_pot_servo \ --dependencies rclpy std_msgs
In the package.xml
file change the setup.py
change the same variables and add the entry point:
...
entry_points={
'console_scripts': [
'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
#'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
],
},
We need the Python serial
library and install it with:
sudo apt install python3-serial
Next we create our python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo
.
I had problems to use a use an external module (serial communication) with the node program, so I integrated the communication class in the node program. It contains also some functions not used, but helpful if debugging is necessary (I omit them here).
The name of the script is ros2_esp32_get_pot.py
:
#!/usr/bin/env python
import rclpy # the imports match the dependencies from package. xml
from rclpy.node import Node # import python node class
from std_msgs.msg import String # import the built-in string message type
import serial
SER_PORT = "/dev/ttyUSB0"
SER_SPEED = 1000000
""" I could not import an external module, so I added the usb_serial code here"""
class SerialComm():
""" Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
(2 byte for now) + CRC (1byte) """
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
self.crc_index = 4 # crc index in data stream (1 byte)
def check_if_crc_ok(self, crc, crc_data):
""" xor checksum over bytes from index1 to index2 included
and compare with data checksum """
checksum = 0
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
if crc == checksum:
return True
else:
return False
def get_pot_value(self):
ser = serial.Serial(self.port, self.speed) # open the serial port
data = ser.read(5)
#print(data.hex())
if data[0:2] == self.preambel:
pot_value = data[self.pot_val_index1]*256+data[self.pot_val_index2]
#print("Pot value: ", pot_value, end="\t")
crc = data[self.crc_index]
crc_data = data[self.data_index1:self.data_index2+1] # checksum data values
if self.check_if_crc_ok(crc, crc_data):
#print("Checksum ok")
flag = True
else:
#print("Checksum error")
flag = False
else:
#print("Data acquisition error!")
flag = False
ser.close()
if flag:
return pot_value
else:
return -1
class PotPublisher(Node): # our pub node class
def __init__(self):
super().__init__('pot_publisher') # pass name to node class
self.publisher_ = self.create_publisher(String, 'pot_value', 10)
timer_period = 0.5 # 0.5 seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0 # counter for message
self.serial_comm = SerialComm(SER_PORT, SER_SPEED)
def timer_callback(self): # is called every 0.5s from timer
pot_value = self.serial_comm.get_pot_value()
msg = String()
if pot_value != -1:
msg.data = str(pot_value)
else:
msg.data = "error"
self.get_logger().info('Communication or CRC error') # log to console
self.publisher_.publish(msg) # publish to topic
self.get_logger().info('Publishing: "%s"' % msg.data) # log to console
self.i += 1 # increment counter
def main(args=None):
rclpy.init(args=args) # init rclpy
pot_publisher = PotPublisher() # create publisher node
rclpy.spin(pot_publisher) # loop the node so callbacks occur
pot_publisher.destroy_node() # destroy the node explicitly
rclpy.shutdown() # shutdown rclpy
if __name__ == '__main__':
main()
Check in the setup script that your path is $base/lib/py_pubsub
. Then we run rosdep
in the root of our workspace to check for missing dependencies before building. Then we built the package and run the node:
cd myros2_ws
rosdep install -i --from-path src --rosdistro jazzy -y
#rosdep install -i --from-path src --rosdistro humble -y
colcon build --packages-select py_esp32_pot_servo
source install/setup.bash
ros2 run py_esp32_pot_servo potentiometer
Now let's create a second node residing in the same package.
We add in setup.py
the second entry point:
...
entry_points={
'console_scripts': [
'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
],
},
Then we create our second python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo
.
The name of the script is ros2_esp32_set_servo_pot.py
:
#!/usr/bin/env python
import rclpy # the imports match the dependencies from package. xml
from rclpy.node import Node # import python node class
from std_msgs.msg import String # import the built-in string message type
import serial
from time import sleep
SER_PORT = "/dev/ttyUSB0"
SER_SPEED = 1000000
""" I could not import an external module, so I added the usb_serial code here"""
class SerialComm():
""" Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
(2 byte for now) + CRC (1byte) """
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
self.servo_val_index1 = 2 # hbyte index of servo value in data_stream
self.servo_val_index2 = 3 # lbyte index of servo value in data_stream
self.crc_index = 4 # crc index in data stream (1 byte)
self.data_bytes_pot = 5
self.data_bytes_servo = 5
def check_if_crc_ok(self, crc, crc_data):
""" Compare data crc with with checksum """
checksum = self.calculate_checksum(crc_data)
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
if crc == checksum:
return True
else:
return False
def calculate_checksum(self, crc_data):
""" xor checksum over bytes from index1 to index2 included"""
checksum = 0
for byte in crc_data: # iterate over each byte in the data
checksum ^= byte # XOR the byte with the checksum
return checksum
def send_angle_2_servo(self, angle):
ser = serial.Serial(self.port, self.speed) # open the serial port
data = [None]*5
data[0:2] = self.preambel
data[self.servo_val_index1:self.servo_val_index2+1] = angle.to_bytes(2, 'big')
data[self.crc_index] = self.calculate_checksum(data[2:4])
ser.write(bytearray(data))
ser.close()
class Send2Servo(Node): # another pub node class
def __init__(self):
self.angle = 0
super().__init__('send_angle_2_servo')
self.subscription = self.create_subscription(
String,
'pot_value',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
timer_period = 0.5 # 0.5 seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.serial_comm = SerialComm(SER_PORT, SER_SPEED)
def timer_callback(self): # is called every 0.5s from timer
self.serial_comm.send_angle_2_servo(self.angle)
self.get_logger().info('I send: "%s"' % str(self.angle))
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
self.angle = int(int(msg.data)*180/4095)
def main(args=None):
rclpy.init(args=args) # init rclpy
send_angle_to_servo = Send2Servo() # create publisher node
rclpy.spin(send_angle_to_servo) # loop the node so callbacks occur
send_angle_to_servo.destroy_node() # destroy the node explicitly
rclpy.shutdown() # shutdown rclpy
if __name__ == '__main__':
main()
cd myros2_ws
rosdep install -i --from-path src --rosdistro jazzy -y
#rosdep install -i --from-path src --rosdistro humble -y
colcon build --packages-select py_esp32_pot_servo
source install/setup.bash
ros2 run py_esp32_pot_servo servo
If both nodes are running we should see something like this on the screen (2 ssh terminal sessions and the output od Arduino serial monitor):