Tutoriales

Creación de un espacio de trabajo

A continuación, se realiza la creación de un espacio de trabajo denominado ros2_ws

Creación del espacio de trabajo

mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build
echo "source ~/ros2_ws/install/setup.bash" >> ~/.bashrc
source ~/.bashrc

Creación Paquetes

Una vez construido el espacio de trabajo, se debe realizar la construcción de un paquete de ROS2, en este caso se utilizará un paquete compatible con python, y tendrá las dependencias de std_msgs

  1. Crear el paquete

cd ~/ros2_ws/src
ros2 pkg create mi_pkg_python --build-type ament_python --dependencies rclpy std_msgs
  1. Estructura generada

mi_pkg_python/
├── mi_pkg_python/
│   └── __init__.py
├── package.xml
├── resource/
│   └── mi_pkg_python
├── setup.py
├── setup.cfg
└── test/
  1. Compilar el paquete

cd ~/ros2_ws
colcon build --packages-select mi_pkg_python

Nodos publicador y Suscriptor

Una vez compilado el paquete, se realiza la creación de los nodos publicador y suscriptor.

  1. Crear el archivo del nodo publicador

touch mi_pkg_python/mi_pkg_python/publicador.py
touch mi_pkg_python/mi_pkg_python/suscriptor.py
  1. Agregar el código

# Importa la librería principal de ROS 2 en Python
import rclpy

# Importa la clase base Node, que representa un nodo en ROS 2
from rclpy.node import Node

# Importa el tipo de mensaje estándar String del paquete std_msgs
from std_msgs.msg import String

# Define una clase que extiende Node, representando un nodo que publica mensajes
class MinimalPublisher(Node):

      def __init__(self):
         # Llama al constructor de la clase padre (Node) con el nombre del nodo
         super().__init__('minimal_publisher')

         # Crea un publicador que publica mensajes del tipo String en el topic 'topic' con una cola de 10 mensajes
         self.publisher_ = self.create_publisher(String, 'topic', 10)

         # Crea un temporizador que ejecuta la función `timer_callback` cada 0.5 segundos
         self.timer = self.create_timer(0.5, self.timer_callback)

         # Contador para numerar los mensajes
         self.i = 0

      # Función que se ejecuta cada 0.5 segundos
      def timer_callback(self):
         # Crea un mensaje tipo String
         msg = String()

         # Asigna el contenido del mensaje
         msg.data = f'Hello World: {self.i}'

         # Publica el mensaje en el topic
         self.publisher_.publish(msg)

         # Muestra por consola el mensaje publicado
         self.get_logger().info(f'Publishing: "{msg.data}"')

         # Incrementa el contador
         self.i += 1

# Punto de entrada principal del programa
def main(args=None):
      # Inicializa el sistema de nodos de ROS 2
      rclpy.init(args=args)

      try:
         # Instancia el nodo y empieza a ejecutarlo
         node = MinimalPublisher()

         # Mantiene al nodo activo, escuchando eventos y timers
         rclpy.spin(node)

      except KeyboardInterrupt:
         # Permite salir con Ctrl+C sin error
         pass

      finally:
         # Destruye el nodo de forma limpia y apaga el sistema ROS 2
         node.destroy_node()
         rclpy.shutdown()

# Verifica si el archivo se está ejecutando directamente (no importado)
if __name__ == '__main__':
      main()
  1. Registrar los nodos en setup.py

entry_points={
    'console_scripts': [
        'publicador1 = mi_pkg_python.publicador:main',
        'suscriptor1 = mi_pkg_python.suscriptor:main',
    ],
},
  1. Compilar el paquete

cd ~/ros2_ws
colcon build --packages-select mi_pkg_python
  1. Ejecutar el nodo publicador

ros2 run mi_pkg_python publicador1

en una terminal diferente ejecutar el nodo suscriptor

ros2 run mi_pkg_python suscriptor1

Paquetes con mensajes personalizados

Los mensajes personalizados permiten crear un paquete exclusivo para generar estructuras personales de mensajes en ROS2 usando ament_cmake, ademas de poder integrarlos a otros paquetes.

Antes de continuar con la generación de mensajes es necesario revisar, que tipos de mensajes están disponibles:

ros2 interface list | grep msg

y la revisión de los parámetros del mensaje se realiza mediante el comand:

ros2 interface show [tipo de mensaje]

Ahora, se va a crear un paquete llamado avig_msg que defina un mensaje personalizado AprilTagPixel.msg con los siguientes campos:

string id
int32 posx
int32 posy

  1. Crear el paquete

Desde la carpeta src del workspace:

ros2 pkg create avig_msg --build-type ament_cmake

Este comando genera la estructura básica del paquete avig_msg.


  1. Crear el mensaje personalizado

cd avig_msg
mkdir msg
gedit msg/AprilTagPixel.msg

Contenido:

int32 id
float32 posx
float32 posy
int32 orden
float32 dist

Este archivo define un mensaje simple para enviar datos de detección de un tag.


  1. Mensajes compuestos

Se puede crear mensajes que tengan anidada mas información en este caso se va a crear un array de AprilTagPixel.msg.

Dentro del paquete avig_msg

cd msg
gedit AprilTagPixelArray.msg

Contenido:

AprilTagPixel[] tags
  1. Editar CMakeLists.txt

Editar CMakeLists.txt para incluir soporte de mensajes:

cmake_minimum_required(VERSION 3.8)
project(avig_msg)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AprilTagPixel.msg"
  "msg/AprilTagPixelArray.msg"
  DEPENDENCIES builtin_interfaces
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()

¿Por qué? - rosidl_default_generators genera los bindings del mensaje. - builtin_interfaces es requerido si se usan tipos nativos como string, int32. - ament_export_dependencies permite que otros paquetes importen estos mensajes.


  1. Editar package.xml

Agregar las dependencias necesarias:

<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

¿Por qué? Estas etiquetas aseguran que el sistema de compilación de ROS 2 reconozca este paquete como generador de interfaces.


  1. Compilar e instalar

Desde la raíz del workspace:

cd ~/ros2_ws
colcon build --packages-select avig_msg
source install/setup.bash

  1. Verificar el mensaje

ros2 interface show avig_msg/msg/AprilTagPixel

  1. Usar el mensaje en otro paquete

En el paquete creado anteriormente de Python mi_pkg_python:

En package.xml:

<exec_depend>avig_msg</exec_depend>

En el código Python:

from avig_msg.msg import AprilTagPixel

Se puede usar en un publicador o suscriptor como cualquier otro mensaje.

April-Tags

Un AprilTag es un tipo de marcador visual 2D diseñado para permitir la detección robusta, precisa y rápida en entornos de visión por computadora y robótica. Fue desarrollado en el Laboratorio de Robótica de la Universidad de Michigan.

Características principales:

  • Patrón en blanco y negro con bordes codificados.

  • Cada tag tiene un ID único.

  • Puede ser detectado desde distintos ángulos y con iluminación variable.

  • Su detección es más rápida y robusta que QR o ARTags para propósitos robóticos.


Usos comunes de AprilTags en Robótica

Aplicación

Descripción

Localización absoluta

Identificar una posición fija en el mapa mediante un tag conocido.

Seguimiento de objetos

Seguir la posición de un tag en tiempo real (e.g. manipulación, drones).

Calibración de cámaras

Calibrar parámetros intrínsecos y extrínsecos de una cámara.

SLAM / Navegación

Complementar sensores como LiDAR o IMU en mapeo y navegación autónoma.

Interacción humano-robot

Identificar objetos, posiciones, o áreas accesibles visualmente.


Librería usada

Usamos la librería pupil_apriltags, una implementación rápida del detector.
Es eficiente, moderna, y puede ser utilizada en aplicaciones en tiempo real.

El resto de aplicaciones de AprilTag se puede revisar en los siguintes repositorios:

April-Tags Git-hub-Tags

Instalación:

pip install pupil-apriltags opencv-python

Ejemplos

Detección en Python puro (sin ROS)

import cv2
   from pupil_apriltags import Detector

   def main():
      # Abre la cámara (0 = cámara predeterminada)
      cap = cv2.VideoCapture(0)

      if not cap.isOpened():
         print("No se pudo acceder a la cámara.")
         return

      # Configurar el detector de AprilTags
      at_detector = Detector(
         families='tag36h11',
         nthreads=1,
         quad_decimate=1.0,
         quad_sigma=0.0,
         refine_edges=True,
         decode_sharpening=0.25,
         debug=False
      )

      # Nombre de la ventana
      window_name = 'AprilTag Detector - tag36h11'

      # Hacer la ventana redimensionable
      cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
      cv2.resizeWindow(window_name, 800, 600)  # Ancho x Alto en píxeles

      print("Cámara activa. Presiona 'q' para salir.")

      while True:
         ret, frame = cap.read()
         if not ret:
               print("No se pudo leer el frame.")
               break

         # Convertir imagen a escala de grises
         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

         # Detectar etiquetas AprilTag
         detections = at_detector.detect(gray)

         for detection in detections:
               tag_id = detection.tag_id
               print(f"Tag detectado: {tag_id}")

               # Dibujar un círculo en el centro del tag
               center = (int(detection.center[0]), int(detection.center[1]))
               cv2.circle(frame, center, 10, (0, 255, 0), 2)

         # Mostrar imagen con detecciones
         cv2.imshow(window_name, frame)

         # Salir si se presiona la tecla 'q'
         if cv2.waitKey(1) & 0xFF == ord('q'):
               break

      # Liberar recursos
      cap.release()
      cv2.destroyAllWindows()
      print("Cámara cerrada.")

   if __name__ == '__main__':
      main()

Servicios personalizados

En el servicio personalizado se agrega la funcionalidad de una Heuristica al sistema de control del robot.

Definición general Una heurística es una regla, método o estrategia que simplifica la toma de decisiones y permite encontrar soluciones aproximadas en situaciones complejas, donde el cálculo exacto sería muy costoso o imposible.

Para ello, se establece un servicio que tiene como requerimiento el tipo de mensaje AprilTagPixelArray y como respuesta un AprilTagPixel

  1. Creación del archivo srv

Dentro del paquete avig_msg

mkdir srv
cd srv
gedit Heuristica.srv

Contenido:

avig_msg/AprilTagPixelArray tags_in
---
avig_msg/AprilTagPixel tag_out

Esto indica que el servicio recibirá una lista de tags y devolverá solo uno como resultado.


  1. Configurar CMakeLists.txt

Agrega lo siguiente si no está presente:

find_package(rosidl_default_generators REQUIRED)

Agrega todos los archivos .msg y .srv:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AprilTagPixel.msg"
  "msg/AprilTagPixelArray.msg"
  "srv/Heuristica.srv"
  DEPENDENCIES builtin_interfaces
)

revisar que se encuentren exportadas las dependencias:

ament_export_dependencies(rosidl_default_runtime)

  1. Configurar package.xml

Incluye los siguientes bloques:

<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

  1. Compilar

Desde la raíz del workspace:

colcon build --packages-select avig_msg
source install/setup.bash

Verifica el servicio:

ros2 interface show avig_msg/srv/Heuristica

  1. Crear el Servidor

Dentro del paquete mi_pkg_python

mkdir servicios
cd servicios
touch __initi__.py

Guarda el siguiente código como srv_servidor.py y srv_cliente.py:

import rclpy
from rclpy.node import Node
from avig_msg.srv import Heuristica
from avig_msg.msg import AprilTagPixel
import math

class EuristicaServer(Node):
   def __init__(self):
      super().__init__('euristica_server')
      self.srv = self.create_service(Heuristica, 'Heuristica', self.heuristica_callback)
      self.get_logger().info('Servicio Euristica listo.')

   def heuristica_callback(self, request, response):
      tags = request.tags_in.tags
      self.get_logger().info(f"Recibidos {tags} tags.")

      if not tags:
            self.get_logger().warn("No se recibió ningún tag.")
            return response

      tag1 = next((tag for tag in tags if tag.id == 1), None)
      tag2 = next((tag for tag in tags if tag.id == 2), None)

      if tag1 is None or tag2 is None:
            self.get_logger().warn("Faltan tag1 o tag2, no se puede continuar.")
            return response


      tags_ordenados = [tag for tag in tags if tag.id != 0 and tag.id != 1 and tag.id != 2]

      for i, tag in enumerate(tags_ordenados):
            self.get_logger().info(f"Revisando el tag: {tag.id}")
            if 10 <= tag.id < 20:
               tag.dist = math.sqrt((tag.posx - tag1.posx)**2 + (tag.posy - tag1.posy)**2)
            else:
               tag.dist = math.sqrt((tag.posx - tag2.posx)**2 + (tag.posy - tag2.posy)**2)

            self.get_logger().info(f"Distancia del Tag: {tag.id} es {tag.dist}")

      # Heurística: devolver el tag con menor coordenada posx
      tag_ordenado = sorted(tags_ordenados, key=lambda t: t.dist)[0]
      response.tag_out = tag_ordenado
      self.get_logger().info(f"Tag elegido: {tag_ordenado.id}")
      return response

def main(args=None):
   rclpy.init(args=args)
   node = EuristicaServer()
   rclpy.spin(node)
   rclpy.shutdown()

if __name__ == '__main__':
   main()

El nodo servidor

  • Responde a solicitudes del servicio Heuristica

  • Elige el tag con menor distancia al tag 1 o al tag 2 (dist)

El nodo cliente:

  • Crea y envía una lista de AprilTagPixel

  • Solicita una respuesta del servidor

  • Muestra el tag seleccionado


  1. Registrar los nodos en setup.py

En tu setup.py agrega:

entry_points={
    'console_scripts': [
        'servidor = servicio.srv_servidor:main',
        'cliente = servicio.srv_cliente:main',
    ],
},
  1. Compilar el paquete

  2. Ejecutar

En dos terminales diferentes:

bash ros2 run mi_pkg_python servidor
bash ros2 run mi_pkg_python cliente

Implementación de una Acción Personalizada

En esta acción personalizada llamada MoverA se simula el movimiento de un robot hacia una posición (x, y) objetivo, utilizando el sistema de acciones de ROS 2.

La acción representa como el robot se mueve gradualmente en línea recta hacia la meta, acercándose poco a poco hasta que la distancia sea menor a un umbral (por ejemplo, 0.1).

La trayectoría que se simulará en este ejercicio obedece los siguientes puntos:

mat

gif

Estructura general

  • Paquete de mensajes: avig_msg

  • Archivo de acción: MoverA.action

  • Paquete de código (cliente/servidor): mi_pkg_python


  1. Definición de MoverA.action

Ubicar en avig_msg/action/MoverA.action:

# Objetivo
float32 x_actual
float32 y_actual
float32 x_meta
float32 y_meta

---
# Resultado
bool success
---
# Feedback
float32 distancia_restante

  1. Configurar CMakeLists.txt en avig_msg

Agregar:

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/MoverA.action"
  DEPENDENCIES builtin_interfaces
)

ament_export_dependencies(rosidl_default_runtime)

  1. Configurar package.xml en avig_msg

Agregar:

<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

  1. Dentro del paquete mi_pkg_python crar los programas: action_server.py y action_client.py

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from avig_msg.action import MoverA
import math

# Definición de la clase del servidor de acción
class MoverAServer(Node):
   def __init__(self):
      # Inicializa el nodo con nombre 'movera_server'
      super().__init__('movera_server')

      # Crea un servidor de acción para la acción personalizada 'MoverA'
      self._action_server = ActionServer(
            self,
            MoverA,
            'mover_a',  # Nombre del tópico de acción
            execute_callback=self.execute_callback,  # Lógica principal de ejecución
            goal_callback=self.goal_callback,        # Qué hacer cuando llega una meta
            cancel_callback=self.cancel_callback     # Qué hacer si el cliente pide cancelar
      )

      self.goal_handle = None  # Guardará el manejador de la meta actual
      self.timer = None        # Temporizador usado para simular movimiento

   # Callback que se ejecuta cuando se recibe una nueva meta
   def goal_callback(self, goal_request):
      self.get_logger().info('Objetivo recibido.')
      return GoalResponse.ACCEPT  # Acepta todas las metas que recibe

   # Callback que se ejecuta cuando se solicita cancelar una meta
   def cancel_callback(self, goal_handle):
      self.get_logger().info('Cancelación solicitada.')
      return CancelResponse.ACCEPT  # Acepta siempre la cancelación

   # Lógica principal que se ejecuta cuando la meta ha sido aceptada
   async def execute_callback(self, goal_handle):
      self.goal_handle = goal_handle

      # Extrae posición inicial y meta desde la solicitud
      self.x_actual = goal_handle.request.x_actual
      self.y_actual = goal_handle.request.y_actual
      self.x_goal = goal_handle.request.x_meta
      self.y_goal = goal_handle.request.y_meta
      self.step_size = 0.1  # Tamaño de paso por iteración

      # Crea el objeto de feedback que será enviado al cliente
      self.feedback_msg = MoverA.Feedback()

      self.get_logger().info(f'Iniciando movimiento: ({self.x_actual:.2f}, {self.y_actual:.2f}) → ({self.x_goal:.2f}, {self.y_goal:.2f})')

      # Crea un temporizador que se ejecuta cada 0.5 segundos
      self.timer = self.create_timer(0.5, self.update_position)

      # Bucle que espera mientras se ejecuta el temporizador
      while not goal_handle.is_cancel_requested and self.distance(self.x_actual, self.y_actual) > 0.1:
            # Procesa eventos del temporizador
            rclpy.spin_once(self, timeout_sec=0.1)

      # Si el cliente cancela la acción
      if goal_handle.is_cancel_requested:
            self.timer.cancel()            # Detiene el temporizador
            goal_handle.canceled()        # Informa al cliente que fue cancelado
            self.get_logger().warn("Objetivo cancelado por el cliente")
            return MoverA.Result(success=False)

      # Si se llegó al destino
      self.timer.cancel()                # Detiene el temporizador
      goal_handle.succeed()             # Marca como completado con éxito
      self.get_logger().info("Objetivo alcanzado")
      return MoverA.Result(success=True)

   # Función ejecutada periódicamente por el temporizador
   def update_position(self):
      if not self.goal_handle:
            return

      # Calcula la distancia restante
      dist = self.distance(self.x_actual, self.y_actual)
      if dist <= 0.1:
            return  # Ya está suficientemente cerca

      # Simula movimiento en línea recta hacia la meta
      self.x_actual += self.step_size * (self.x_goal - self.x_actual) / dist
      self.y_actual += self.step_size * (self.y_goal - self.y_actual) / dist

      # Calcula nueva distancia y envía feedback
      dist = self.distance(self.x_actual, self.y_actual)
      self.feedback_msg.distancia_restante = float(dist)
      self.goal_handle.publish_feedback(self.feedback_msg)
      self.get_logger().info(f"Distancia restante: {dist:.2f}")

   # Función auxiliar para calcular la distancia euclidiana al objetivo
   def distance(self, x, y):
      return math.sqrt((x - self.x_goal)**2 + (y - self.y_goal)**2)

# Función principal para ejecutar el servidor de acción
def main(args=None):
   rclpy.init(args=args)
   node = MoverAServer()  # Crea el servidor
   rclpy.spin(node)       # Mantiene el nodo activo
   rclpy.shutdown()       # Apaga ROS 2 cuando termina

if __name__ == '__main__':
   main()

El cliente:

  • Envía un objetivo (x, y) con la posición inicial y un objetivo (x, y) con la posición meta al servidor.

  • Imprime el feedback recibido.

  • Muestra el resultado final.

El servidor:

  • Recibe un objetivo de la posición inicial(x, y) y de la posiciónobjetivo (x, y).

  • Simula el movimiento hacia la meta.

  • Envía feedback de la distancia restante.

  • Devuelve success = true si llega al destino.


  1. setup.py Registrar scripts en el setup.py de acciones:

entry_points={
    'console_scripts': [
        'action_server = acciones.action_server:main',
        'action_client = acciones.action_client:main',
    ],
}

  1. Compilación

Desde la raíz del workspace:

colcon build
source install/setup.bash

  1. Ejecución

En una terminal, ejecutar el servidor:

ros2 run mi_pkg_python action_server

En otra terminal, ejecutar el cliente:

ros2 run mi_pkg_python action_client