Single board computer projects

ROS2 nodes for ESP32 with potentiometer (sensor) and servo (actuator)

last updated: 2024-08-27

Quick links

Intro

I will use a simple ESP32 circuit (Arduino) with potentiometer (sensor) and servo (actuator) and create two ROS2 nodes to access these.

Tested with Jazzy and Humble

For more information about ROS2: https://www.weigu.lu/sb-computer/ros2_basics/index.html

Building and testing the hardware

We need a device with a variable input (sensor) and output (servo) to test two nodes, so let's quickly built one.

Bom and circuit

BOM:

Platform:

A 10 kΩ potentiometer (variable resistance) will be connected to 3.3 V and deliver a voltage between 0 V and 3.3 V. The ADC of the ESP32 (GPIO18) will convert this voltage with 12 bit in a value between 0 and 4095.

The actuator will be a servo (GPIO26). The value for the servo ranges from 0 to 180 corresponding to an angle of 0° to 180°. We need the ESP32servo library (Kevin Harrington). Install it with the Tools > Manage libraries... .

Use this circuit:


ROS2 device

ESP32 Hardware

Communication between ESP32 and Raspi

I will use a similar packet structure as it was used for the JPL NASA open source robot (osr). The communication goes over a good old serial interface. On ESP we use Serial2 on the pins 16 (RxD2) and 17 (TxD2). RxD2 is connected to TxD and TxD2 to RxD of an 3V USB to Serial adapter that uses an USB port of the Raspi. Don't forget to connect ground.

The serial data will have a two byte preamble (or syncword). These are the teo first bytes of every single message. The preamble is 0xABCD. A 1 byte CRC-8 Checksum provides verification that the frame was received correctly. It is computed of only the data bytes and is stored at the end of the packet. It can be re-calculated on both sides of the data stream to check if the data transmission was correct. As we need here only one data value, the data consists in this example from 2 bytes. Here is an example of the frame (pot. value 1562):

Preamblel HByte Preamblel LByte Data HByte Data LByte CRC
0xAB 0xCD 0x06 0x1A 0x1C
Testing basic serial communication

First let's test the serial connection. We use 1000000 bit/s :).

    // esp32_hardware_serial_test.ino

    byte incoming_byte;
    long counter = 0;

    void setup() {
      Serial.begin(115200);                     // initialize serial monitor
      Serial2.begin(1000000,SERIAL_8N1, 16, 17); //init hardware serial
      delay(500);
      Serial.println("Setup done!");
    }

    void loop() {
      Serial.println(String("Hello! ") + counter); // msg to Raspi
      Serial2.println(String("Hello! ") + counter); // msg to Raspi
      while (Serial2.available() > 0) {  // if any serial available, read it
        incoming_byte = Serial2.read();  // and write it to the terminal
        Serial.write(incoming_byte);     // alt: Serial.write(Serial2.read());
      }
      Serial.print(".");
      delay(2000);
      counter++;
    }

On the Raspi look with sudo dmesg for the name of your adapter (e.g. ttyUSB0, ttyAMA0). Then we configure the interface and wait for 15 bytes.

    stty -F /dev/ttyUSB0 speed 10000000 cs8 -cstopb -parenb -echo
    hexdump -C -n 15 /dev/ttyUSB0

We should see a "Hello!". To send something to the ESP32 (Arduino Serial monitor) we use:

    echo -en '\x6f\x6f\x6f\x6f\x0a0' > /dev/ttyUSB0

We should see a "oooo".

Testing the potentiometer and servo

Next let's test the potentiometer and servo connections:

    // esp32_hardware_pot_and_servo_test.ino

    #include <ESP32Servo.h>

    const byte PIN_POT = 26;
    const byte PIN_SERVO = 18;
    unsigned short pot_value = 0;
    int servo_angle = 0;    // 0-180° variable to store the servo position

    Servo My_Servo;

    void setup() {
      Serial.begin(115200);              // initialize serial for terminal  
      My_Servo.setPeriodHertz(50);            // see lib example
      My_Servo.attach(PIN_SERVO, 1000, 2000); // see lib example
      delay(500);
      Serial.println("Setup done!");
    }

    void loop() {
      pot_value = analogRead(PIN_POT);    
      Serial.print(String("Pot value: ") + pot_value + '\t');
      servo_angle = map(pot_value,0,4095,0,180);
      Serial.println(String("Servo angle: ") + servo_angle);  
      My_Servo.write(servo_angle);  
      delay(10);
    }

Flashing the Arduino program

And last but not least, lets flash our Arduino program:

    // ros2_esp32_servo_pot.ino  weigu.lu
    // Using ESP32Servo (install with lib manager) 
    // https://github.com/jkb-git/ESP32Servo

    #include <ESP32Servo.h>

    const long SER_SPEED = 1000000;
    const byte PIN_POT = 26;
    const byte PIN_SERVO = 18;
    const unsigned short PREAMBLE = 0xABCD;
    const unsigned long DELAY_MS = 500; // time in ms between sends
    unsigned short pot_value = 0;       // 0-4095 variable to store pot position
    int servo_angle = 0;                // 0-180° variable to store the servo position
    byte checksum = 0;
    byte data_out[5] = {0};  // consists for the moment only of pot_value 
    byte data_out_length = sizeof(data_out) / sizeof(byte);
    byte data_in[5] = {0};  // consists for the moment only of servo_angle 
    byte data_in_length = sizeof(data_in) / sizeof(byte);

    Servo My_Servo;

    void setup() {
      Serial.begin(115200);                        // initialize serial monitor
      Serial2.begin(SER_SPEED,SERIAL_8N1, 16, 17); //init hardware serial  
      My_Servo.setPeriodHertz(50);                 // see lib example
      My_Servo.attach(PIN_SERVO, 1000, 2000);      // see lib example
      delay(500);
      Serial.println("Setup done!");
    }

    void loop() {
      if (non_blocking_delay(DELAY_MS)) {
        send_pot_value();    
      }
      if (read_telegram() == data_in_length) {
        set_servo();

      }
      delay(1); // for the dog
    }

    unsigned short get_pot_value() {  
      const byte ADC_SAMPLES = 5;
      unsigned long pot_adc_sum = 0;    
      for (byte i = 0; i<ADC_SAMPLES; i++) {
        pot_adc_sum += analogRead(PIN_POT);
      }
      return pot_adc_sum/ADC_SAMPLES;
    }

    void send_pot_value() {
      data_out[0] = highByte(PREAMBLE);  
      data_out[1] = lowByte(PREAMBLE);    
      pot_value = get_pot_value();
      Serial.println(pot_value);
      data_out[2] = highByte(pot_value);
      data_out[3] = lowByte(pot_value);
      data_out[4] = xor_checksum(data_out,2,3);
      for (byte i = 0; i < data_out_length; i++) {
        Serial2.write(data_out[i]);    
      }  
      for (byte i = 0; i < data_out_length; i++) {
        Serial.print(data_out[i],HEX);
      }  
      Serial.println();
    }

    byte read_telegram() {
      unsigned short serial_cnt = 0;  
      if (Serial2.available()) {   // wait for the whole stream
        delay(1); // 1ms = 100 byte with 1000000bps
        }  
      while (Serial2.available()) {
        data_in[serial_cnt] = Serial2.read();
        serial_cnt++;
      }
      if ((data_in[0] != highByte(PREAMBLE)) || (data_in[1] != lowByte(PREAMBLE))) {    
        while (Serial2.available() > 0) {Serial2.read();} //clear the buffer!    
        return -1;
      }
      return serial_cnt;
    }

    void set_servo() {
    byte checksum = xor_checksum(data_in,2,3);
      if (data_in[4] == checksum) {
        servo_angle = data_in[2]*256+data_in[3];
        Serial.println(servo_angle);           
        My_Servo.write(servo_angle);        
      }
    }

    byte xor_checksum(byte *data, byte index1, byte index2) {
      checksum = 0;
      for (byte i = index1; i <= index2; i++) {
        checksum ^= data[i]; // XOR the current byte with the checksum
      }
      return checksum; 
    }

    bool non_blocking_delay(unsigned long milliseconds) {
      static unsigned long nb_delay_prev_time = 0;
      if(millis() >= nb_delay_prev_time + milliseconds) {
        nb_delay_prev_time += milliseconds;
        return true;
      }
      return false;
    } 

Creating a real sensor node (potentiometer on ESP32)

Now let's create our workspace (if not already done) and node:

    cd ~
    mkdir -p myros2_ws/src
    cd myros2_ws/src
    ros2 pkg create --build-type ament_python --license Apache-2.0 py_esp32_pot_servo \ --dependencies rclpy std_msgs 

In the package.xml file change the , , and tags. In setup.py change the same variables and add the entry point:

    ...
    entry_points={
       'console_scripts': [
            'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
            #'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
        ],
    },

We need the Python serial library and install it with:

    sudo apt install python3-serial

Next we create our python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo. I had problems to use a use an external module (serial communication) with the node program, so I integrated the communication class in the node program. It contains also some functions not used, but helpful if debugging is necessary (I omit them here).

The name of the script is ros2_esp32_get_pot.py:

    #!/usr/bin/env python
    import rclpy       # the imports match the dependencies from package. xml
    from rclpy.node import Node     # import python node class
    from std_msgs.msg import String # import the built-in string message type
    import serial

    SER_PORT = "/dev/ttyUSB0"
    SER_SPEED = 1000000

    """ I could not import an external module, so I added the usb_serial code here"""
    class SerialComm():
        """ Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
            (2 byte for now) + CRC (1byte) """
        def __init__(self, port, speed):
            self.port = port
            self.speed = speed
            self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
            self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
            self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
            self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
            self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
            self.crc_index = 4   # crc index in data stream (1 byte)

        def check_if_crc_ok(self, crc, crc_data):
            """ xor checksum over bytes from index1 to index2 included
                and compare with data checksum """
            checksum = 0
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            if crc == checksum:
                return True
            else:
                return False

        def get_pot_value(self):
            ser = serial.Serial(self.port, self.speed) # open the serial port
            data = ser.read(5)
            #print(data.hex())
            if data[0:2]  == self.preambel:
                pot_value = data[self.pot_val_index1]*256+data[self.pot_val_index2]
                #print("Pot value: ", pot_value, end="\t")
                crc = data[self.crc_index]
                crc_data = data[self.data_index1:self.data_index2+1]    # checksum data values
                if self.check_if_crc_ok(crc, crc_data):
                    #print("Checksum ok")
                    flag = True
                else:
                    #print("Checksum error")
                    flag = False
            else:
                #print("Data acquisition error!")
                flag = False
            ser.close()
            if flag:
                return pot_value
            else:
                return -1

    class PotPublisher(Node): # our pub node class
        def __init__(self):
            super().__init__('pot_publisher') # pass name to node class
            self.publisher_ = self.create_publisher(String, 'pot_value', 10)
            timer_period = 0.5    # 0.5 seconds
            self.timer = self.create_timer(timer_period, self.timer_callback)
            self.i = 0            # counter for message
            self.serial_comm = SerialComm(SER_PORT, SER_SPEED)

        def timer_callback(self): # is called every 0.5s from timer
            pot_value = self.serial_comm.get_pot_value()
            msg = String()
            if pot_value != -1:
                msg.data = str(pot_value)
            else:
                msg.data = "error"
                self.get_logger().info('Communication or CRC error') # log to console
            self.publisher_.publish(msg) # publish to topic
            self.get_logger().info('Publishing: "%s"' % msg.data) # log to console
            self.i += 1           # increment counter


    def main(args=None):
        rclpy.init(args=args)            # init rclpy
        pot_publisher = PotPublisher()   # create publisher node
        rclpy.spin(pot_publisher)        # loop the node so callbacks occur
        pot_publisher.destroy_node()     # destroy the node explicitly
        rclpy.shutdown()                 # shutdown rclpy

    if __name__ == '__main__':
        main()

Check in the setup script that your path is $base/lib/py_pubsub. Then we run rosdep in the root of our workspace to check for missing dependencies before building. Then we built the package and run the node:

    cd myros2_ws
    rosdep install -i --from-path src --rosdistro jazzy -y
    #rosdep install -i --from-path src --rosdistro humble -y
    colcon build --packages-select py_esp32_pot_servo
    source install/setup.bash
    ros2 run py_esp32_pot_servo potentiometer    
Creating a real actuator node (servo on ESP32)

Now let's create a second node residing in the same package.

We add in setup.py the second entry point:

    ...
    entry_points={
       'console_scripts': [
            'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
            'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
        ],
    },

Then we create our second python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo.

The name of the script is ros2_esp32_set_servo_pot.py:

    #!/usr/bin/env python

    import rclpy       # the imports match the dependencies from package. xml
    from rclpy.node import Node     # import python node class
    from std_msgs.msg import String # import the built-in string message type
    import serial
    from time import sleep

    SER_PORT = "/dev/ttyUSB0"
    SER_SPEED = 1000000

    """ I could not import an external module, so I added the usb_serial code here"""
    class SerialComm():
        """ Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
            (2 byte for now) + CRC (1byte) """
        def __init__(self, port, speed):
            self.port = port
            self.speed = speed
            self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
            self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
            self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
            self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
            self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
            self.servo_val_index1 = 2 # hbyte index of servo value in data_stream
            self.servo_val_index2 = 3 # lbyte index of servo value in data_stream
            self.crc_index = 4   # crc index in data stream (1 byte)
            self.data_bytes_pot = 5
            self.data_bytes_servo = 5

        def check_if_crc_ok(self, crc, crc_data):
            """ Compare data crc with with checksum """
            checksum = self.calculate_checksum(crc_data)
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            if crc == checksum:
                return True
            else:
                return False

        def calculate_checksum(self, crc_data):
            """ xor checksum over bytes from index1 to index2 included"""
            checksum = 0
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            return checksum

        def send_angle_2_servo(self, angle):
            ser = serial.Serial(self.port, self.speed) # open the serial port
            data = [None]*5
            data[0:2] = self.preambel
            data[self.servo_val_index1:self.servo_val_index2+1] = angle.to_bytes(2, 'big')
            data[self.crc_index] = self.calculate_checksum(data[2:4])
            ser.write(bytearray(data))
            ser.close()

    class Send2Servo(Node): # another pub node class
        def __init__(self):
            self.angle = 0
            super().__init__('send_angle_2_servo')
            self.subscription = self.create_subscription(
                String,
                'pot_value',
                self.listener_callback,
                10)
            self.subscription  # prevent unused variable warning
            timer_period = 0.5    # 0.5 seconds
            self.timer = self.create_timer(timer_period, self.timer_callback)
            self.serial_comm = SerialComm(SER_PORT, SER_SPEED)

        def timer_callback(self): # is called every 0.5s from timer
            self.serial_comm.send_angle_2_servo(self.angle)
            self.get_logger().info('I send: "%s"' % str(self.angle))

        def listener_callback(self, msg):
            self.get_logger().info('I heard: "%s"' % msg.data)
            self.angle = int(int(msg.data)*180/4095)

    def main(args=None):
        rclpy.init(args=args)               # init rclpy
        send_angle_to_servo = Send2Servo()  # create publisher node
        rclpy.spin(send_angle_to_servo)    # loop the node so callbacks occur
        send_angle_to_servo.destroy_node()  # destroy the node explicitly
        rclpy.shutdown()                    # shutdown rclpy

    if __name__ == '__main__':
        main()
    cd myros2_ws
    rosdep install -i --from-path src --rosdistro jazzy -y
    #rosdep install -i --from-path src --rosdistro humble -y
    colcon build --packages-select py_esp32_pot_servo
    source install/setup.bash
    ros2 run py_esp32_pot_servo servo

If both nodes are running we should see something like this on the screen (2 ssh terminal sessions and the output od Arduino serial monitor):

Screenshot 2 terminal
Click for bigger picture

Downloads

Interesting links