Single board computer projects

ROS2 node for ESP32 with potentiometer (sensor) and Dynamixel servo (actuator)

last updated: 2024-08-27

Quick links

Intro

I used a simple ESP32 circuit (Arduino) with potentiometer (sensor) and servo (actuator) to create 2 ROS2 nodes here: https://www.weigu.lu/sb-computer/ros2_basics/ros2_esp32_pot_servo/index.html.

For robots that need feedback a normal servo is not the best solution. So here I will use a Dynamixel servo as actuator.

For more information about ROS2: https://www.weigu.lu/sb-computer/ros2_basics/index.html

Dynamixel AX-12A

Dynamixel servos have a sophisticated controller on-board, that uses a proprietary digital communication protocol over a bus (no PWM). This allows for daisy-chaining multiple servos. They provide telemetry feedback (integrated sensors) and you can tune the position control loop and set parameters like current limiting. They are more robust (encoder instead a potentiometer) and easier to integrate into a robot than a common hobby servo.

I will use an AX-12A Servo (now replaced with an XL430-W250). This AX-12A needs 11.1 V , can turn 0-300 ° (endless) and has a resolution of 0.29 °. The communication uses half duplex asynchronous serial communication (8N1) with TTL level (5 V). The stall torque is 1.5 Nm (12 V, 1.5 A). More infos here: https://emanual.robotis.com/docs/en/dxl/ax/ax-12a/.

To connect the servo to the ESP32 I use a little interface board with level shifter and tri-state buffer to get a good communication: https://github.com/JosueAGtz/dynamixelInterface. The schematic is here: https://github.com/JosueAGtz/dynamixelInterface/blob/main/Hardware/Dynamixel%20Interface%20Sch.pdf.

Building and testing the hardware

BOM:

Platform:

A 10 kΩ potentiometer (variable resistance) will be connected to 3.3 V and deliver a voltage between 0 V and 3.3 V. The ADC of the ESP32 (GPIO18) will convert this voltage with 12 bit in a value between 0 and 4095.

The actuator will be a Dynamixel Servo. The value for the servo ranges from 0 to 280 corresponding to an angle of 0° to 280°. We need the Dynamixel2Arduino library (Robotis). Install it with the Tools > Manage libraries... .

Use this circuit:


ROS2 device

ESP32 Hardware

Dynamixel with Arduino

First let's test the hardware. Her a little sketch to turn the Dynamixel using the Dynamixel2Arduino library (look also at the Arduino examples):

    /*
      dynamixel_read_write_ax_test.ino
      Little test program for Dynamixel AX-12A on ESP32 and little interface
      weigu.lu
      https://www.weigu.lu/sb-computer/ros2_basics/ros2_esp32_dynamixel/index.html  
      https://github.com/JosueAGtz/dynamixelInterface
      More functions in:
      https://github.com/ROBOTIS-GIT/Dynamixel2Arduino/blob/master/src/Dynamixel2Arduino.h
    */

    #include <Dynamixel2Arduino.h>

    #define DXL_SERIAL   Serial1            // Serial to Dynamixel
    #define DEBUG_SERIAL Serial             // Arduino serial monitor
    const byte DXL_DIR_PIN = 19;            // Dynamixel interface board  DIR PIN
    const byte DXL_ID = 1;                  // default AX-12A
    const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
    const unsigned long DXL_BPS = 1000000;  // default AX-12A
    const unsigned long DEBUG_BPS = 115200;

    Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);

    void setup() {
      DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22);    // SERIAL1 on ESP32 to Pin 21/22 
      DEBUG_SERIAL.begin(DEBUG_BPS);                    // Set debugging port baudrate to 115200bps
      delay(2000);
      dxl.begin(DXL_BPS);                               // Set Port baudrate to default AX-12A
      dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Port Protocol Version  
      DEBUG_SERIAL.println("Refer to eManual for more details: https://emanual.robotis.com/docs/en/dxl/");
      DEBUG_SERIAL.println("Read for PROTOCOL " + String(DXL_PROTOCOL_VERSION) + ", ID: " + String(DXL_ID));
      DEBUG_SERIAL.println("Model Number: " + String(dxl.getModelNumber(DXL_ID))); // Read DYNAMIXEL Model Number
      DEBUG_SERIAL.println("Bitrate: " + String(dxl. getPortBaud()));              // Read DYNAMIXEL Baudrate  
      // Read DYNAMIXEL Present Position    
      DEBUG_SERIAL.println("Present position: " + String(dxl.getPresentPosition(DXL_ID, UNIT_DEGREE)));   
      DEBUG_SERIAL.println("Setup done!");
      delay(2000);
    }

    void loop() {  
      dxl.ledOn(DXL_ID);    
      for (int i = 0; i<290; i=i+5) {
        DEBUG_SERIAL.print(String(i) + '\t');
        dxl.setGoalPosition(DXL_ID, i, UNIT_DEGREE);
        delay(100);
        int pos = dxl.getPresentPosition(DXL_ID, UNIT_DEGREE);
        if (((pos==0) && (i!=0)) || (pos>300) || (pos<0)) {
          delay(100);
          pos = dxl.getPresentPosition(DXL_ID, UNIT_DEGREE);
        } 
        DEBUG_SERIAL.println(pos);    
      }
      dxl.setGoalPosition(DXL_ID, 0, UNIT_DEGREE);
      dxl.ledOff(DXL_ID);  
      delay(5000);  
    }

Communication between ESP32 and Raspi

I will use a similar packet structure as it was used for the JPL NASA open source robot (osr). The communication goes over a good old serial interface. On ESP we use Serial2 on the pins 16 (RxD2) and 17 (TxD2). RxD2 is connected to TxD and TxD2 to RxD of an 3V USB to Serial adapter that uses an USB port of the Raspi. Don't forget to connect ground.

The serial data will have a two byte preamble (or syncword). These are the teo first bytes of every single message. The preamble is 0xABCD. A 1 byte CRC-8 Checksum provides verification that the frame was received correctly. It is computed of only the data bytes and is stored at the end of the packet. It can be re-calculated on both sides of the data stream to check if the data transmission was correct. As we need here only one data value, the data consists in this example from 2 bytes. Here is an example of the frame (pot. value 1562):

Preamblel HByte Preamblel LByte Data HByte Data LByte CRC
0xAB 0xCD 0x06 0x1A 0x1C
Testing basic serial communication

First let's test the serial connection. We use 1000000 bit/s :).

    // esp32_hardware_serial_test.ino

    byte incoming_byte;
    long counter = 0;

    void setup() {
      Serial.begin(115200);                     // initialize serial monitor
      Serial2.begin(1000000,SERIAL_8N1, 16, 17); //init hardware serial
      delay(500);
      Serial.println("Setup done!");
    }

    void loop() {
      Serial.println(String("Hello! ") + counter); // msg to Raspi
      Serial2.println(String("Hello! ") + counter); // msg to Raspi
      while (Serial2.available() > 0) {  // if any serial available, read it
        incoming_byte = Serial2.read();  // and write it to the terminal
        Serial.write(incoming_byte);     // alt: Serial.write(Serial2.read());
      }
      Serial.print(".");
      delay(2000);
      counter++;
    }

On the Raspi look with sudo dmesg for the name of your adapter (e.g. ttyUSB0, ttyAMA0). Then we configure the interface and wait for 15 bytes.

    stty -F /dev/ttyUSB0 speed 10000000 cs8 -cstopb -parenb -echo
    hexdump -C -n 15 /dev/ttyUSB0

We should see a "Hello!". To send something to the ESP32 (Arduino Serial monitor) we use:

    echo -en '\x6f\x6f\x6f\x6f\x0a0' > /dev/ttyUSB0

We should see a "oooo".

Testing the potentiometer and the Dynamixel

Next let's test the potentiometer and Dynamixel connections:

    // esp32_hardware_pot_and_dynamixel_test.ino

    #include <Dynamixel2Arduino.h>

    #define DXL_SERIAL   Serial1            // Serial to Dynamixel
    #define DEBUG_SERIAL Serial             // Arduino serial monitor
    const unsigned long DEBUG_BPS = 115200;
    const unsigned long DXL_BPS = 1000000;  // default AX-12A
    const byte DXL_ID = 1;                  // default AX-12A
    const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
    const byte DXL_DIR_PIN = 19;            // Dynamixel interface board  DIR PIN
    const byte PIN_POT = 26;
    unsigned short pot_value = 0;
    int dxl_angle = 0;    // 0-180° variable to store the servo position

    Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);

    void setup() {
      DEBUG_SERIAL.begin(DEBUG_BPS);                 // serial monitor
      delay(1000);
      DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22); // DXL serial ESP32 Pin 21/22  
      dxl.begin(DXL_BPS);                            // Set speed to default AX-12A
      dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Protocol Version
      DEBUG_SERIAL.println("Setup done!");
      delay(1000);  
    }

    void loop() {
      pot_value = analogRead(PIN_POT);    
      DEBUG_SERIAL.print(String("Pot value: ") + pot_value + '\t');
      dxl_angle = map(pot_value,0,4095,0,280);  
      DEBUG_SERIAL.println(String("DXL angle: ") + dxl_angle + "°");  
      dxl.setGoalPosition(DXL_ID, dxl_angle, UNIT_DEGREE);
      delay(10);  
    }

Flashing the Arduino program

And last but not least, lets flash our Arduino program:

    // ros2_esp32_dynamixel_pot.ino  weigu.lu
    // Using Dynamixel2Arduino (install with lib manager) and Dynamixel interface
    // https://github.com/JosueAGtz/dynamixelInterface

    #include <Dynamixel2Arduino.h>

    #define DEBUG_SERIAL Serial             // Arduino serial monitor
    #define DXL_SERIAL   Serial1            // Serial to Dynamixel
    #define USB_SERIAL   Serial2            // Serial to Raspi
    const unsigned long DEBUG_BPS = 115200;
    const unsigned long DXL_BPS = 1000000;  // default AX-12A
    const unsigned long USB_BPS = 1000000;
    const byte DXL_ID = 1;                  // default AX-12A
    const float DXL_PROTOCOL_VERSION = 1.0; // default AX-12A
    const byte DXL_DIR_PIN = 19;            // Dynamixel interface board  DIR PIN
    const byte PIN_POT = 26;

    const unsigned short PREAMBLE = 0xABCD;
    const unsigned long DELAY_MS = 500; // time in ms between sends
    unsigned short pot_value = 0;       // 0-4095 variable pot position
    int servo_angle = 0;                // 0-180° variable servo position
    byte checksum = 0;
    byte data_out[5] = {0};  // consists for the moment only of pot_value 
    byte data_out_length = sizeof(data_out) / sizeof(byte);
    byte data_in[5] = {0};  // consists for the moment only of servo_angle 
    byte data_in_length = sizeof(data_in) / sizeof(byte);

    Dynamixel2Arduino dxl(DXL_SERIAL, DXL_DIR_PIN);

    void setup() {
      DEBUG_SERIAL.begin(DEBUG_BPS);                 // serial monitor
      delay(1000);
      DXL_SERIAL.begin(DXL_BPS, SERIAL_8N1, 21, 22); // DXL serial ESP32 Pin 21/22  
      USB_SERIAL.begin(USB_BPS,SERIAL_8N1, 16, 17);  // USB serial ESP32 Pin 16/17   
      dxl.begin(DXL_BPS);                            // Set speed to default AX-12A
      dxl.setPortProtocolVersion(DXL_PROTOCOL_VERSION); // Set Protocol Version
      DEBUG_SERIAL.println("Setup done!");
      delay(1000);
    }

    void loop() {
      if (non_blocking_delay(DELAY_MS)) {
        send_pot_value();    
      }
      //DEBUG_SERIAL.println(read_telegram());
      if (read_telegram() == data_in_length) {
        set_dynamixel();

      }
      delay(100); // for the dog
    }

    unsigned short get_pot_value() {  
      const byte ADC_SAMPLES = 5;
      unsigned long pot_adc_sum = 0;    
      for (byte i = 0; i<ADC_SAMPLES; i++) {
        pot_adc_sum += analogRead(PIN_POT);
      }
      return pot_adc_sum/ADC_SAMPLES;
    }

    void send_pot_value() {
      data_out[0] = highByte(PREAMBLE);  
      data_out[1] = lowByte(PREAMBLE);    
      pot_value = get_pot_value();
      DEBUG_SERIAL.println(pot_value);
      data_out[2] = highByte(pot_value);
      data_out[3] = lowByte(pot_value);
      data_out[4] = xor_checksum(data_out,2,3);
      for (byte i = 0; i < data_out_length; i++) {
        USB_SERIAL.write(data_out[i]);    
      }  
      for (byte i = 0; i < data_out_length; i++) {
        DEBUG_SERIAL.print(data_out[i],HEX);
      }  
      DEBUG_SERIAL.println();
    }

    byte read_telegram() {
      unsigned short serial_cnt = 0;  
      if (USB_SERIAL.available()) {   // wait for the whole stream
        delay(1); // 1ms = 100 byte with 1000000bps
        }  
      while (USB_SERIAL.available()) {
        data_in[serial_cnt] = USB_SERIAL.read();
        serial_cnt++;
      }
      if ((data_in[0] != highByte(PREAMBLE)) || (data_in[1] != lowByte(PREAMBLE))) {    
        while (USB_SERIAL.available() > 0) {USB_SERIAL.read();} //clear the buffer!    
        return -1;
      }
      return serial_cnt;
    }

    void set_dynamixel() {
    byte checksum = xor_checksum(data_in,2,3);
      if (data_in[4] == checksum) {
        servo_angle = data_in[2]*256+data_in[3];
        DEBUG_SERIAL.println(servo_angle);           
        dxl.setGoalPosition(DXL_ID, servo_angle, UNIT_DEGREE);
      }
    }

    byte xor_checksum(byte *data, byte index1, byte index2) {
      checksum = 0;
      for (byte i = index1; i <= index2; i++) {
        checksum ^= data[i]; // XOR the current byte with the checksum
      }
      return checksum; 
    }

    bool non_blocking_delay(unsigned long milliseconds) {
      static unsigned long nb_delay_prev_time = 0;
      if(millis() >= nb_delay_prev_time + milliseconds) {
        nb_delay_prev_time += milliseconds;
        return true;
      }
      return false;
    } 

Creating a real sensor node (potentiometer on ESP32)

Now let's create our workspace (if not already done) and node:

    cd ~
    mkdir -p myros2_ws/src
    cd myros2_ws/src
    ros2 pkg create --build-type ament_python --license Apache-2.0 py_esp32_pot_servo \ --dependencies rclpy std_msgs 

In the package.xml file change the , , and tags. In setup.py change the same variables and add the entry point:

    ...
    entry_points={
       'console_scripts': [
            'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
            #'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
        ],
    },

We need the Python serial library and install it with:

    sudo apt install python3-serial

Next we create our python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo. I had problems to use a use an external module (serial communication) with the node program, so I integrated the communication class in the node program. It contains also some functions not used, but helpful if debugging is necessary (I omit them here).

The name of the script is ros2_esp32_get_pot.py:

    #!/usr/bin/env python
    import rclpy       # the imports match the dependencies from package. xml
    from rclpy.node import Node     # import python node class
    from std_msgs.msg import String # import the built-in string message type
    import serial

    SER_PORT = "/dev/ttyUSB0"
    SER_SPEED = 1000000

    """ I could not import an external module, so I added the usb_serial code here"""
    class SerialComm():
        """ Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
            (2 byte for now) + CRC (1byte) """
        def __init__(self, port, speed):
            self.port = port
            self.speed = speed
            self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
            self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
            self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
            self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
            self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
            self.crc_index = 4   # crc index in data stream (1 byte)

        def check_if_crc_ok(self, crc, crc_data):
            """ xor checksum over bytes from index1 to index2 included
                and compare with data checksum """
            checksum = 0
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            if crc == checksum:
                return True
            else:
                return False

        def get_pot_value(self):
            ser = serial.Serial(self.port, self.speed) # open the serial port
            data = ser.read(5)
            #print(data.hex())
            if data[0:2]  == self.preambel:
                pot_value = data[self.pot_val_index1]*256+data[self.pot_val_index2]
                #print("Pot value: ", pot_value, end="\t")
                crc = data[self.crc_index]
                crc_data = data[self.data_index1:self.data_index2+1]    # checksum data values
                if self.check_if_crc_ok(crc, crc_data):
                    #print("Checksum ok")
                    flag = True
                else:
                    #print("Checksum error")
                    flag = False
            else:
                #print("Data acquisition error!")
                flag = False
            ser.close()
            if flag:
                return pot_value
            else:
                return -1

    class PotPublisher(Node): # our pub node class
        def __init__(self):
            super().__init__('pot_publisher') # pass name to node class
            self.publisher_ = self.create_publisher(String, 'pot_value', 10)
            timer_period = 0.5    # 0.5 seconds
            self.timer = self.create_timer(timer_period, self.timer_callback)
            self.i = 0            # counter for message
            self.serial_comm = SerialComm(SER_PORT, SER_SPEED)

        def timer_callback(self): # is called every 0.5s from timer
            pot_value = self.serial_comm.get_pot_value()
            msg = String()
            if pot_value != -1:
                msg.data = str(pot_value)
            else:
                msg.data = "error"
                self.get_logger().info('Communication or CRC error') # log to console
            self.publisher_.publish(msg) # publish to topic
            self.get_logger().info('Publishing: "%s"' % msg.data) # log to console
            self.i += 1           # increment counter


    def main(args=None):
        rclpy.init(args=args)            # init rclpy
        pot_publisher = PotPublisher()   # create publisher node
        rclpy.spin(pot_publisher)        # loop the node so callbacks occur
        pot_publisher.destroy_node()     # destroy the node explicitly
        rclpy.shutdown()                 # shutdown rclpy

    if __name__ == '__main__':
        main()

Check in the setup script that your path is $base/lib/py_pubsub. Then we run rosdep in the root of our workspace to check for missing dependencies before building. Then we built the package and run the node:

    cd myros2_ws
    rosdep install -i --from-path src --rosdistro jazzy -y
    #rosdep install -i --from-path src --rosdistro humble -y
    colcon build --packages-select py_esp32_pot_servo
    source install/setup.bash
    ros2 run py_esp32_pot_servo potentiometer    
Creating a real actuator node (servo on ESP32)

Now let's create a second node residing in the same package.

We add in setup.py the second entry point:

    ...
    entry_points={
       'console_scripts': [
            'potentiometer = py_esp32_pot_servo.ros2_esp32_get_pot:main',
            'servo = py_esp32_pot_servo.ros2_esp32_set_servo:main',
        ],
    },

Then we create our second python script in the folder src/py_esp32_pot_servo/py_esp32_pot_servo.

The name of the script is ros2_esp32_set_servo_pot.py:

    #!/usr/bin/env python

    import rclpy       # the imports match the dependencies from package. xml
    from rclpy.node import Node     # import python node class
    from std_msgs.msg import String # import the built-in string message type
    import serial
    from time import sleep

    SER_PORT = "/dev/ttyUSB0"
    SER_SPEED = 1000000

    """ I could not import an external module, so I added the usb_serial code here"""
    class SerialComm():
        """ Get a value over Serial from ESP32, Frame = Preamble (2 byte) + data
            (2 byte for now) + CRC (1byte) """
        def __init__(self, port, speed):
            self.port = port
            self.speed = speed
            self.preambel = b'\xAB\xCD' # define the sync word as a bytes object
            self.data_index1 = 2 # first byte index of data values in data_stream (for CRC)
            self.data_index2 = 3 # last byte index of data values in data stream (for CRC)
            self.pot_val_index1 = 2 # hbyte index of pot value in data_stream
            self.pot_val_index2 = 3 # lbyte index of pot value in data_stream
            self.servo_val_index1 = 2 # hbyte index of servo value in data_stream
            self.servo_val_index2 = 3 # lbyte index of servo value in data_stream
            self.crc_index = 4   # crc index in data stream (1 byte)
            self.data_bytes_pot = 5
            self.data_bytes_servo = 5

        def check_if_crc_ok(self, crc, crc_data):
            """ Compare data crc with with checksum """
            checksum = self.calculate_checksum(crc_data)
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            if crc == checksum:
                return True
            else:
                return False

        def calculate_checksum(self, crc_data):
            """ xor checksum over bytes from index1 to index2 included"""
            checksum = 0
            for byte in crc_data:   # iterate over each byte in the data
                checksum ^= byte    # XOR the byte with the checksum
            return checksum

        def send_angle_2_servo(self, angle):
            ser = serial.Serial(self.port, self.speed) # open the serial port
            data = [None]*5
            data[0:2] = self.preambel
            data[self.servo_val_index1:self.servo_val_index2+1] = angle.to_bytes(2, 'big')
            data[self.crc_index] = self.calculate_checksum(data[2:4])
            ser.write(bytearray(data))
            ser.close()

    class Send2Servo(Node): # another pub node class
        def __init__(self):
            self.angle = 0
            super().__init__('send_angle_2_servo')
            self.subscription = self.create_subscription(
                String,
                'pot_value',
                self.listener_callback,
                10)
            self.subscription  # prevent unused variable warning
            timer_period = 0.5    # 0.5 seconds
            self.timer = self.create_timer(timer_period, self.timer_callback)
            self.serial_comm = SerialComm(SER_PORT, SER_SPEED)

        def timer_callback(self): # is called every 0.5s from timer
            self.serial_comm.send_angle_2_servo(self.angle)
            self.get_logger().info('I send: "%s"' % str(self.angle))

        def listener_callback(self, msg):
            self.get_logger().info('I heard: "%s"' % msg.data)
            self.angle = int(int(msg.data)*180/4095)

    def main(args=None):
        rclpy.init(args=args)               # init rclpy
        send_angle_to_servo = Send2Servo()  # create publisher node
        rclpy.spin(send_angle_to_servo)    # loop the node so callbacks occur
        send_angle_to_servo.destroy_node()  # destroy the node explicitly
        rclpy.shutdown()                    # shutdown rclpy

    if __name__ == '__main__':
        main()
    cd myros2_ws
    rosdep install -i --from-path src --rosdistro jazzy -y
    #rosdep install -i --from-path src --rosdistro humble -y
    colcon build --packages-select py_esp32_pot_servo
    source install/setup.bash
    ros2 run py_esp32_pot_servo servo

If both nodes are running we should see something like this on the screen (2 ssh terminal sessions and the output od Arduino serial monitor):

Screenshot 2 terminal
Click for bigger picture

Downloads

Interesting links