Behavior Trees en ROS 2 para el control de dos robots SCARA

Objetivo del documento

Este documento explica cómo aplicar Behavior Trees (BT) en ROS 2 para controlar la secuencia de trabajo de dos robots SCARA de 3 grados de libertad:

  • Robot 1: ra1

  • Robot 2: ra2

  • Robot 2: ra3

El sistema base considerado es:

  • Robots SCARA propios de 3 GDL: rotacional, rotacional, prismático.

  • Modelo CAD convertido a xacro/URDF.

  • Dos robots cargados simultáneamente en RViz2 usando namespaces.

  • Puente ROS 2 - MQTT para enviar objetivos de juntas a ESP32.

  • ESP32 ejecutando movimiento con AccelStepper.

  • El gemelo digital se actualiza con los movimientos reales o estimados.

  • No se utiliza cinemática inversa; ya se conocen los objetivos articulares [q1, q2, q3].

La tarea que se quiere automatizar es:

Home -> Pick -> Activar efector -> Place -> Apagar efector

En el caso de dos robots, los ciclos no siempre se ejecutan en paralelo. El activador de máquina puede disparar el ciclo del robot 1 o del robot 2 de forma independiente. También puede ocurrir que, mientras el robot 1 está ejecutando su ciclo, se active el ciclo del robot 2.

Contexto general

En ROS 1 se utilizaba con frecuencia SMACH para crear máquinas de estado en Python. En ROS 2 no existe un reemplazo oficial único de SMACH. Las alternativas más utilizadas son:

  • FSM manual en Python.

  • SMACC2 para máquinas de estado en C++.

  • YASMIN para FSM en Python/C++.

  • Behavior Trees para orquestación reactiva de tareas.

  • Planificación simbólica con PlanSys2 para tareas más complejas.

En este documento se desarrolla el camino de Behavior Trees en Python, usando py_trees y py_trees_ros.

Referencias útiles:

Concepto de Behavior Tree

Un Behavior Tree es una estructura jerárquica que define cómo se ejecuta una tarea. A diferencia de una máquina de estados clásica, un BT no depende únicamente de un estado activo y transiciones explícitas, sino que evalúa periódicamente el árbol mediante ciclos llamados ticks.

Cada nodo del árbol devuelve uno de tres estados:

Estados de retorno de un nodo BT

Estado

Significado

SUCCESS

La acción o condición terminó correctamente.

FAILURE

La acción o condición falló.

RUNNING

La acción todavía está en ejecución.

Ejemplo aplicado al robot:

Ejemplos de retorno

Nodo

Estado devuelto

Interpretación

MoveHome

RUNNING

El robot todavía se está moviendo hacia Home.

MoveHome

SUCCESS

El robot llegó a Home.

MovePick

FAILURE

El robot no completó el movimiento dentro del timeout.

Elementos principales de un Behavior Tree

Behavior / Action node

Un Behaviour es un nodo básico del árbol. Puede representar una acción o una condición.

Ejemplos para el SCARA:

MoveHome
MovePick
EffectorOn
MovePlace
EffectorOff
CheckPieceDetected
CheckEmergencyStop

Sequence

Una Sequence ejecuta sus hijos en orden.

Reglas:

  • Si un hijo devuelve SUCCESS, pasa al siguiente.

  • Si un hijo devuelve RUNNING, la secuencia devuelve RUNNING.

  • Si un hijo devuelve FAILURE, la secuencia devuelve FAILURE.

Para la tarea principal:

Sequence: PalletizingTask
├── WaitStart
├── MoveHome
├── MovePick
├── EffectorOn
├── MovePlace
└── EffectorOff

Selector / Fallback

Un Selector o Fallback prueba alternativas.

Reglas:

  • Si un hijo devuelve FAILURE, prueba el siguiente.

  • Si un hijo devuelve SUCCESS, el selector devuelve SUCCESS.

  • Si un hijo devuelve RUNNING, el selector devuelve RUNNING.

Ejemplo con recuperación:

Selector: Root
├── MainTask
└── Recovery

Si MainTask falla, se ejecuta Recovery.

Decorator

Un Decorator modifica el comportamiento de un nodo hijo.

Ejemplos:

  • Reintentar una acción.

  • Invertir el resultado.

  • Aplicar timeout.

  • Repetir una acción.

Ejemplo conceptual:

Retry
└── MovePick

Blackboard

El Blackboard es una memoria compartida entre nodos del árbol. Puede almacenar:

  • Estado del robot.

  • Último motion_state recibido.

  • Resultado de sensores.

  • Parámetros de receta.

  • Variables de coordinación entre robots.

En este primer ejercicio usaremos una estructura simple de Python como estado compartido:

self.robot_status = {
    "motion_state": None,
    "start_requested": False,
}

Diferencia entre FSM y Behavior Tree

Una FSM manual para tu secuencia se ve así:

IDLE -> MOVE_HOME -> MOVE_PICK -> EFFECTOR_ON -> MOVE_PLACE -> EFFECTOR_OFF -> DONE

Un Behavior Tree equivalente se ve así:

Sequence: PalletizingTask
├── WaitStart
├── MoveHome
├── MovePick
├── EffectorOn
├── MovePlace
└── EffectorOff

La FSM es muy clara para secuencias pequeñas. El BT empieza a ser más conveniente cuando se agregan:

  • Reintentos.

  • Condiciones de sensores.

  • Fallos y recuperación.

  • Bloqueo de zonas compartidas.

  • Coordinación entre robots.

  • Tareas alternativas.

  • Comportamiento reactivo.

Instalación

BT se puede utilizar directamente con python.

pip install py-trees
apt-get install -y graphviz
pip install py-trees pydot

Diccionarios Python

Para agregar un mayor nivel de control con respecto a las variables del sistema es importante manejar una memoria de datos, en este caso se utiliza un diccionario con las variables más importantes,

CAMINOS = {
        "malla_A": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },

        "malla_B": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },

        "malla_C": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },
}

def obtener_camino(nombre_malla, codigo_camino):
        """
        Busca un camino dentro del diccionario CAMINOS.

        Parámetros:
                nombre_malla: nombre de la malla. Ejemplo: "malla_A"
                codigo_camino: código de camino. Ejemplo: "0B"

        Retorna:
                Lista con los puntos del camino. Ejemplo: [0, 3, 4, 7]
        """

        nombre_malla = nombre_malla.strip()
        codigo_camino = codigo_camino.upper().strip()

        if nombre_malla not in CAMINOS:
                raise ValueError(
                                f"La malla '{nombre_malla}' no existe. "
                                f"Mallas disponibles: {list(CAMINOS.keys())}"
                )

        if codigo_camino not in CAMINOS[nombre_malla]:
                raise ValueError(
                                f"El camino '{codigo_camino}' no existe en '{nombre_malla}'. "
                                f"Caminos disponibles: {list(CAMINOS[nombre_malla].keys())}"
                )

        return CAMINOS[nombre_malla][codigo_camino]


# ============================================================
# PRUEBAS
# ============================================================

camino_1 = obtener_camino("malla_A", "0B")
camino_2 = obtener_camino("malla_B", "2C")
camino_3 = obtener_camino("malla_C", "1A")

print("Camino malla_A 0B:", camino_1)
print("Camino malla_B 2C:", camino_2)
print("Camino malla_C 1A:", camino_3)

caminos_b = CAMINOS

caminos_b["malla_A"]["0B"] = [0,0,1]

print(CAMINOS["malla_A"]["0B"])

BT para el control

En este ejemplo se utiliza usa secuenca de control simple para un robot, simulando cada movimiento a través de un timer.

# ============================================================
# IMPORTACIÓN DE LIBRERÍAS
# ============================================================

import time
import os
import py_trees


# ============================================================
# DATO PRINCIPAL: caminos de todas las mallas 3x3
# ============================================================

CAMINOS = {
        "malla_A": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },

        "malla_B": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },

        "malla_C": {
                "0A": [0, 3, 6],
                "0B": [0, 3, 4, 7],
                "0C": [0, 3, 4, 5, 8],

                "1A": [1, 4, 3, 6],
                "1B": [1, 4, 7],
                "1C": [1, 4, 5, 8],

                "2A": [2, 5, 4, 3, 6],
                "2B": [2, 5, 4, 7],
                "2C": [2, 5, 8],
        },
}


# ============================================================
# FUNCIÓN: buscar camino según nombre de malla y código
# ============================================================

def obtener_camino(nombre_malla, codigo_camino):
        """
        Busca un camino dentro del diccionario CAMINOS.

        Ejemplo:
                obtener_camino("malla_A", "0B")

        Retorna:
                [0, 3, 4, 7]
        """

        nombre_malla = nombre_malla.strip()
        codigo_camino = codigo_camino.upper().strip()

        if nombre_malla not in CAMINOS:
                raise ValueError(
                                f"La malla '{nombre_malla}' no existe. "
                                f"Mallas disponibles: {list(CAMINOS.keys())}"
                )

        if codigo_camino not in CAMINOS[nombre_malla]:
                raise ValueError(
                                f"El camino '{codigo_camino}' no existe en '{nombre_malla}'. "
                                f"Caminos disponibles: {list(CAMINOS[nombre_malla].keys())}"
                )

        return CAMINOS[nombre_malla][codigo_camino]


# ============================================================
# CLASE: SOLICITAR CAMINO
# ============================================================

class SolicitarCamino(py_trees.behaviour.Behaviour):
        """
        Primer comportamiento del árbol.

        Este nodo pide al usuario:
        - nombre de la malla
        - código del camino

        Si el camino existe:
                retorna SUCCESS

        Si el camino no existe:
                retorna FAILURE
        """

        def __init__(self, name, memoria):
                super().__init__(name=name)

                # memoria es un diccionario compartido entre comportamientos
                self.memoria = memoria

                # Variable para evitar pedir datos más de una vez
                self.consulta_realizada = False

        def initialise(self):
                print("\n=== Solicitud de camino ===")

        def update(self):

                # Si ya se hizo la consulta, no la repetimos
                if self.consulta_realizada:
                                return py_trees.common.Status.SUCCESS

                try:
                                # Pedir datos al usuario
                                nombre_malla = input("Ingrese nombre de la malla (malla_A, malla_B, malla_C): ")
                                codigo_camino = input("Ingrese código del camino (ejemplo: 0A, 0B, 2C): ")

                                # Buscar el camino en el diccionario
                                camino = obtener_camino(nombre_malla, codigo_camino)

                                # Guardar datos en memoria compartida
                                self.memoria["nombre_malla"] = nombre_malla.strip()
                                self.memoria["codigo_camino"] = codigo_camino.upper().strip()
                                self.memoria["camino"] = camino

                                print("\nCamino encontrado correctamente:")
                                print(f"Malla: {self.memoria['nombre_malla']}")
                                print(f"Código: {self.memoria['codigo_camino']}")
                                print(f"Camino: {self.memoria['camino']}")

                                # Marcar que ya se consultó
                                self.consulta_realizada = True

                                # Si el camino existe, el comportamiento termina con éxito
                                return py_trees.common.Status.SUCCESS

                except ValueError as error:
                                print("\nError en la búsqueda del camino:")
                                print(error)

                                # Si hay error, el comportamiento falla
                                return py_trees.common.Status.FAILURE


# ============================================================
# CLASE: ACCIÓN SIMPLE
# ============================================================

class AccionSimple(py_trees.behaviour.Behaviour):
        """
        Comportamiento simple que simula una acción con tiempo.

        Ejemplo:
        - Mover a HOME
        - Activar efector
        - Apagar efector
        """

        def __init__(self, name, tiempo=1.0):
                super().__init__(name=name)
                self.tiempo = tiempo
                self.tiempo_inicio = None

        def initialise(self):
                self.tiempo_inicio = time.time()
                print(f"Iniciando: {self.name}")

        def update(self):
                tiempo_actual = time.time()
                tiempo_transcurrido = tiempo_actual - self.tiempo_inicio

                if tiempo_transcurrido >= self.tiempo:
                                print(f"Finalizado: {self.name}")
                                return py_trees.common.Status.SUCCESS

                return py_trees.common.Status.RUNNING


# ============================================================
# CLASE: RECORRER CAMINO
# ============================================================

class RecorrerCamino(py_trees.behaviour.Behaviour):
        """
        Este comportamiento recorre los puntos del camino encontrado.

        Por ejemplo, si el usuario ingresó:
                malla_A, 0B

        El camino será:
                [0, 3, 4, 7]

        Este nodo simula mover el robot punto por punto.
        """

        def __init__(self, name, memoria, tiempo_por_punto=1.0):
                super().__init__(name=name)

                self.memoria = memoria
                self.tiempo_por_punto = tiempo_por_punto

                self.indice_actual = 0
                self.tiempo_inicio = None

        def initialise(self):
                self.indice_actual = 0
                self.tiempo_inicio = time.time()

                print("\nIniciando recorrido del camino...")

        def update(self):

                # Verificar que exista un camino guardado
                if "camino" not in self.memoria:
                                print("No existe un camino cargado en memoria.")
                                return py_trees.common.Status.FAILURE

                camino = self.memoria["camino"]

                # Si ya recorrimos todos los puntos, terminamos
                if self.indice_actual >= len(camino):
                                print("Camino recorrido completamente.")
                                return py_trees.common.Status.SUCCESS

                tiempo_actual = time.time()
                tiempo_transcurrido = tiempo_actual - self.tiempo_inicio

                # Cada cierto tiempo avanzamos al siguiente punto
                if tiempo_transcurrido >= self.tiempo_por_punto:
                                punto = camino[self.indice_actual]

                                print(f"Robot moviéndose al punto: {punto}")

                                # Aquí en un robot real publicarías:
                                # /raX/joint_goals_q
                                # y esperarías /raX/motion_state

                                self.indice_actual += 1
                                self.tiempo_inicio = time.time()

                return py_trees.common.Status.RUNNING


# ============================================================
# FUNCIÓN: CREAR ÁRBOL DE COMPORTAMIENTO
# ============================================================

def crear_arbol():

        # Memoria compartida entre nodos del árbol
        memoria = {}

        # Nodo raíz tipo Sequence
        root = py_trees.composites.Sequence(
                name="Secuencia_SCARA",
                memory=True
        )

        # Primer comportamiento:
        # El usuario ingresa la malla y el camino
        solicitar_camino = SolicitarCamino(
                name="Solicitar camino",
                memoria=memoria
        )

        # Segundo comportamiento:
        # Simula mover a HOME
        mover_home = AccionSimple(
                name="Mover a HOME",
                tiempo=2.0
        )

        # Tercer comportamiento:
        # Recorre los puntos del camino encontrado
        recorrer_camino = RecorrerCamino(
                name="Recorrer camino",
                memoria=memoria,
                tiempo_por_punto=1.0
        )

        # Cuarto comportamiento:
        # Activa efector
        activar_efector = AccionSimple(
                name="Activar efector",
                tiempo=1.0
        )

        # Quinto comportamiento:
        # Apaga efector
        apagar_efector = AccionSimple(
                name="Apagar efector",
                tiempo=1.0
        )

        # Agregar comportamientos al árbol
        root.add_children([
                solicitar_camino,
                mover_home,
                recorrer_camino,
                activar_efector,
                apagar_efector
        ])

        return root


# ============================================================
# FUNCIÓN: GRAFICAR ÁRBOL EN PNG
# ============================================================

def graficar_arbol_png(arbol, nombre_archivo="arbol_scara"):

        print("\nÁrbol en formato texto:")
        print(py_trees.display.ascii_tree(arbol))

        py_trees.display.render_dot_tree(
                root=arbol,
                name=nombre_archivo,
                target_directory="."
        )

        archivo_dot = f"{nombre_archivo}.dot"
        archivo_svg = f"{nombre_archivo}.svg"
        archivo_png = f"{nombre_archivo}.png"

        if os.path.exists(archivo_dot):
                os.remove(archivo_dot)

        if os.path.exists(archivo_svg):
                os.remove(archivo_svg)

        print(f"\nImagen PNG generada: {archivo_png}")

        try:
                from IPython.display import Image, display
                display(Image(filename=archivo_png))
        except Exception:
                print(f"Abre manualmente el archivo: {archivo_png}")


# ============================================================
# FUNCIÓN PRINCIPAL
# ============================================================

def main():

        arbol = crear_arbol()

        # Graficar estructura inicial
        graficar_arbol_png(arbol, "arbol_scara_con_busqueda")

        print("\nEjecutando árbol de comportamiento...\n")

        while arbol.status not in [
                py_trees.common.Status.SUCCESS,
                py_trees.common.Status.FAILURE
        ]:
                arbol.tick_once()
                time.sleep(0.1)

        if arbol.status == py_trees.common.Status.SUCCESS:
                print("\nTarea completa correctamente.")

        elif arbol.status == py_trees.common.Status.FAILURE:
                print("\nLa tarea falló.")

        print("\nÁrbol con estados finales:")
        print(py_trees.display.ascii_tree(arbol, show_status=True))


# ============================================================
# PUNTO DE ENTRADA
# ============================================================

if __name__ == "__main__":
        main()

BT y ROS2

sudo apt update
sudo apt install ros-humble-py-trees

Integración del proceso con BT y ROS2.

import rclpy
from rclpy.node import Node

from std_msgs.msg import String
from std_msgs.msg import Float32MultiArray
from std_msgs.msg import UInt8

import py_trees
import time

# ============================================================
# BASE DE DATOS DEL SISTEMA
# ============================================================

SISTEMAS_DB = {
        "sis_a": {
                "robot_articular": "ra1",
                "robot_movil": "rm_sis_a",

                "home_q": [0.0, 0.0, 0.0],
                "recolectar_q": [45.0, 10.0, 0.0],

                "estaciones_storage_q": {
                                0: [0.0, 0.0, 0.0],
                                1: [10.0, 0.0, 0.0],
                                2: [20.0, 0.0, 0.0],
                                3: [0.0, 10.0, 0.0],
                                4: [10.0, 10.0, 0.0],
                                5: [20.0, 10.0, 0.0],
                                6: [0.0, 20.0, 0.0],
                                7: [10.0, 20.0, 0.0],
                                8: [20.0, 20.0, 0.0],
                },

                "caminos_movil": {
                                "0A": [0, 3, 6],
                                "0B": [0, 3, 4, 7],
                                "0C": [0, 3, 4, 5, 8],
                                "1A": [1, 4, 3, 6],
                                "1B": [1, 4, 7],
                                "1C": [1, 4, 5, 8],
                                "2A": [2, 5, 4, 3, 6],
                                "2B": [2, 5, 4, 7],
                                "2C": [2, 5, 8],
                },
        },

        "sis_b": {
                "robot_articular": "ra2",
                "robot_movil": "rm_sis_b",

                "home_q": [0.0, 0.0, 0.0],
                "recolectar_q": [35.0, 10.0, 0.0],

                "estaciones_storage_q": {
                                0: [0.0, 0.0, 0.0],
                                1: [10.0, 0.0, 0.0],
                                2: [20.0, 0.0, 0.0],
                                3: [0.0, 10.0, 0.0],
                                4: [10.0, 10.0, 0.0],
                                5: [20.0, 10.0, 0.0],
                                6: [0.0, 20.0, 0.0],
                                7: [10.0, 20.0, 0.0],
                                8: [20.0, 20.0, 0.0],
                },

                "caminos_movil": {
                                "0A": [0, 3, 6],
                                "0B": [0, 3, 4, 7],
                                "0C": [0, 3, 4, 5, 8],
                                "1A": [1, 4, 3, 6],
                                "1B": [1, 4, 7],
                                "1C": [1, 4, 5, 8],
                                "2A": [2, 5, 4, 3, 6],
                                "2B": [2, 5, 4, 7],
                                "2C": [2, 5, 8],
                },
        },

        "sis_c": {
                "robot_articular": "ra3",
                "robot_movil": "rm_sis_c",

                "home_q": [0.0, 0.0, 0.0],
                "recolectar_q": [35.0, 10.0, 0.0],

                "estaciones_storage_q": {
                                0: [0.0, 0.0, 0.0],
                                1: [10.0, 0.0, 0.0],
                                2: [20.0, 0.0, 0.0],
                                3: [0.0, 10.0, 0.0],
                                4: [10.0, 10.0, 0.0],
                                5: [20.0, 10.0, 0.0],
                                6: [0.0, 20.0, 0.0],
                                7: [10.0, 20.0, 0.0],
                                8: [20.0, 20.0, 0.0],
                },

                "caminos_movil": {
                                "0A": [0, 3, 6],
                                "0B": [0, 3, 4, 7],
                                "0C": [0, 3, 4, 5, 8],
                                "1A": [1, 4, 3, 6],
                                "1B": [1, 4, 7],
                                "1C": [1, 4, 5, 8],
                                "2A": [2, 5, 4, 3, 6],
                                "2B": [2, 5, 4, 7],
                                "2C": [2, 5, 8],
                },
        },
}


# ============================================================
# FUNCIÓN: CONSULTAR TAREA
# ============================================================

def consultar_tarea(nombre_sistema, codigo_camino, estacion_storage):
        nombre_sistema = nombre_sistema.strip().lower()
        codigo_camino = codigo_camino.upper().strip()

        try:
                estacion_storage = int(estacion_storage)
        except ValueError:
                raise ValueError("La estación del storage debe ser un número entre 0 y 8.")

        if nombre_sistema not in SISTEMAS_DB:
                raise ValueError(f"El sistema '{nombre_sistema}' no existe.")

        sistema = SISTEMAS_DB[nombre_sistema]

        if codigo_camino not in sistema["caminos_movil"]:
                raise ValueError(
                                f"El camino móvil '{codigo_camino}' no existe en '{nombre_sistema}'."
                )

        if estacion_storage not in sistema["estaciones_storage_q"]:
                raise ValueError(
                                f"La estación '{estacion_storage}' no existe en '{nombre_sistema}'."
                )

        tarea = {
                "nombre_sistema": nombre_sistema,
                "codigo_camino": codigo_camino,
                "estacion_storage": estacion_storage,

                "robot_articular": sistema["robot_articular"],
                "robot_movil": sistema["robot_movil"],

                "camino_movil": sistema["caminos_movil"][codigo_camino],

                "home_q": sistema["home_q"],
                "recolectar_q": sistema["recolectar_q"],
                "storage_q": sistema["estaciones_storage_q"][estacion_storage],
        }

        return tarea


# ============================================================
# BEHAVIOR: ESPERAR Y VALIDAR TAREA
# ============================================================

class EsperarTarea(py_trees.behaviour.Behaviour):

        def __init__(self, name, node, memoria):
                super().__init__(name=name)
                self.node = node
                self.memoria = memoria

        def update(self):
                if not self.memoria["nueva_tarea"]:
                                return py_trees.common.Status.RUNNING

                texto_tarea = self.memoria["texto_tarea"]
                self.node.get_logger().info(f"Tarea recibida: {texto_tarea}")

                partes = texto_tarea.split()

                if len(partes) != 3:
                                self.node.get_logger().error(
                                        "Formato incorrecto. Use: 'sis_a 0B 7'"
                                )
                                self.memoria["nueva_tarea"] = False
                                return py_trees.common.Status.FAILURE

                nombre_sistema = partes[0]
                codigo_camino = partes[1]
                estacion_storage = partes[2]

                try:
                                tarea = consultar_tarea(
                                        nombre_sistema,
                                        codigo_camino,
                                        estacion_storage
                                )

                                self.memoria["tarea"] = tarea
                                self.memoria["robot_articular"] = tarea["robot_articular"]
                                self.memoria["robot_movil"] = tarea["robot_movil"]
                                self.memoria["nueva_tarea"] = False

                                self.node.get_logger().info("Tarea validada correctamente.")
                                self.node.get_logger().info(
                                        f"Robot móvil: {tarea['robot_movil']}"
                                )
                                self.node.get_logger().info(
                                        f"Robot articular: {tarea['robot_articular']}"
                                )
                                self.node.get_logger().info(
                                        f"Camino móvil: {tarea['camino_movil']}"
                                )
                                self.node.get_logger().info(
                                        f"Storage estación {tarea['estacion_storage']} -> q={tarea['storage_q']}"
                                )

                                return py_trees.common.Status.SUCCESS

                except ValueError as error:
                                self.node.get_logger().error(str(error))
                                self.memoria["nueva_tarea"] = False
                                return py_trees.common.Status.FAILURE


# ============================================================
# BEHAVIOR: INICIAR SISTEMA
# ============================================================

class IniciarSistema(py_trees.behaviour.Behaviour):

        def __init__(self, name, node, memoria, tiempo_espera=0.2):
                super().__init__(name=name)
                self.node = node
                self.memoria = memoria
                self.tiempo_espera = tiempo_espera
                self.tiempo_inicio = None

        def initialise(self):
                self.tiempo_inicio = time.time()

                msg = UInt8()
                msg.data = 1

                self.node.publisher_system_start.publish(msg)
                self.node.get_logger().info("Inicio del sistema enviado.")

        def update(self):
                if time.time() - self.tiempo_inicio >= self.tiempo_espera:
                                return py_trees.common.Status.SUCCESS

                return py_trees.common.Status.RUNNING


# ============================================================
# BEHAVIOR: ENVIAR CAMINO AL ROBOT MÓVIL
# ============================================================

class EnviarCaminoMovil(py_trees.behaviour.Behaviour):

        def __init__(self, name, node, memoria, timeout=20.0):
                super().__init__(name=name)
                self.node = node
                self.memoria = memoria
                self.timeout = timeout
                self.tiempo_inicio = None
                self.seen_moving = False

        def initialise(self):
                self.tiempo_inicio = time.time()
                self.seen_moving = False

                tarea = self.memoria["tarea"]
                robot_movil = tarea["robot_movil"]

                msg = Float32MultiArray()
                msg.data = [float(punto) for punto in tarea["camino_movil"]]

                self.node.publishers_mobile_path[robot_movil].publish(msg)

                self.node.get_logger().info(
                                f"[{robot_movil}] Camino móvil enviado: {msg.data}"
                )

        def update(self):
                tarea = self.memoria["tarea"]
                robot_movil = tarea["robot_movil"]

                motion_state = self.memoria["mobile_motion_state"][robot_movil]

                if motion_state == 1:
                                self.seen_moving = True
                                return py_trees.common.Status.RUNNING

                if self.seen_moving and motion_state == 0:
                                self.node.get_logger().info(
                                        f"[{robot_movil}] Camino móvil terminado."
                                )
                                return py_trees.common.Status.SUCCESS

                if time.time() - self.tiempo_inicio > self.timeout:
                                self.node.get_logger().error(
                                        f"[{robot_movil}] Timeout esperando fin del camino móvil."
                                )
                                return py_trees.common.Status.FAILURE

                return py_trees.common.Status.RUNNING


# ============================================================
# BEHAVIOR: MOVER ROBOT ARTICULAR
# ============================================================

class MoverJuntas(py_trees.behaviour.Behaviour):

        def __init__(self, name, node, memoria, target_key, timeout=10.0):
                super().__init__(name=name)

                self.node = node
                self.memoria = memoria
                self.target_key = target_key
                self.timeout = timeout

                self.tiempo_inicio = None
                self.seen_moving = False

        def initialise(self):
                self.tiempo_inicio = time.time()
                self.seen_moving = False

                tarea = self.memoria["tarea"]
                robot_articular = tarea["robot_articular"]

                q = tarea[self.target_key]

                msg = Float32MultiArray()
                msg.data = [float(valor) for valor in q]

                self.node.publishers_joint_goals[robot_articular].publish(msg)

                self.node.get_logger().info(
                                f"[{robot_articular}] {self.name} -> q={msg.data}"
                )

        def update(self):
                tarea = self.memoria["tarea"]
                robot_articular = tarea["robot_articular"]

                motion_state = self.memoria["articular_motion_state"][robot_articular]

                if motion_state == 1:
                                self.seen_moving = True
                                return py_trees.common.Status.RUNNING

                if self.seen_moving and motion_state == 0:
                                self.node.get_logger().info(
                                        f"[{robot_articular}] {self.name} terminado."
                                )
                                return py_trees.common.Status.SUCCESS

                if time.time() - self.tiempo_inicio > self.timeout:
                                self.node.get_logger().error(
                                        f"[{robot_articular}] Timeout en {self.name}."
                                )
                                return py_trees.common.Status.FAILURE

                return py_trees.common.Status.RUNNING


# ============================================================
# BEHAVIOR: EFECTOR ON/OFF
# ============================================================

class ComandoEfector(py_trees.behaviour.Behaviour):

        def __init__(self, name, node, memoria, activar=True, tiempo_espera=5):
                super().__init__(name=name)

                self.node = node
                self.memoria = memoria
                self.activar = activar
                self.tiempo_espera = tiempo_espera
                self.tiempo_inicio = None

        def initialise(self):
                self.tiempo_inicio = time.time()

                tarea = self.memoria["tarea"]
                robot_articular = tarea["robot_articular"]

                msg = UInt8()
                msg.data = 1 if self.activar else 0

                self.node.publishers_effector[robot_articular].publish(msg)

                estado = "ON" if self.activar else "OFF"

                self.node.get_logger().info(
                                f"[{robot_articular}] Efector {estado}"
                )

        def update(self):
                if time.time() - self.tiempo_inicio >= self.tiempo_espera:
                                return py_trees.common.Status.SUCCESS

                return py_trees.common.Status.RUNNING


# ============================================================
# NODO ROS 2 PRINCIPAL
# ============================================================

class ScaraBTNode(Node):

        def __init__(self):
                super().__init__("scara_bt_node")

                self.memoria = {
                                "nueva_tarea": False,
                                "texto_tarea": "",
                                "tarea": None,

                                "robot_articular": None,
                                "robot_movil": None,

                                "arbol_ocupado": False,

                                "articular_motion_state": {
                                        "ra1": None,
                                        "ra2": None,
                                        "ra3": None,
                                },

                                "mobile_motion_state": {
                                        "rm_sis_a": None,
                                        "rm_sis_b": None,
                                        "rm_sis_c": None,
                                },
                }

                self.publisher_system_start = self.create_publisher(
                                UInt8,
                                "/system/start",
                                10
                )

                self.publishers_mobile_path = {
                                "rm_sis_a": self.create_publisher(
                                        Float32MultiArray,
                                        "/rm_sis_a/path_cmd",
                                        10
                                ),
                                "rm_sis_b": self.create_publisher(
                                        Float32MultiArray,
                                        "/rm_sis_b/path_cmd",
                                        10
                                ),
                                "rm_sis_c": self.create_publisher(
                                        Float32MultiArray,
                                        "/rm_sis_c/path_cmd",
                                        10
                                ),
                }

                self.publishers_joint_goals = {
                                "ra1": self.create_publisher(
                                        Float32MultiArray,
                                        "/ra1/joint_goals_q",
                                        10
                                ),
                                "ra2": self.create_publisher(
                                        Float32MultiArray,
                                        "/ra2/joint_goals_q",
                                        10
                                ),
                                "ra3": self.create_publisher(
                                        Float32MultiArray,
                                        "/ra3/joint_goals_q",
                                        10
                                ),
                }

                self.publishers_effector = {
                                "ra1": self.create_publisher(UInt8, "/ra1/effector_cmd", 10),
                                "ra2": self.create_publisher(UInt8, "/ra2/effector_cmd", 10),
                                "ra3": self.create_publisher(UInt8, "/ra3/effector_cmd", 10),
                }

                self.sub_task = self.create_subscription(
                                String,
                                "/task_request",
                                self.callback_task_request,
                                10
                )

                self.sub_mobile_state_a = self.create_subscription(
                                UInt8,
                                "/rm_sis_a/motion_state",
                                lambda msg: self.callback_mobile_motion_state("rm_sis_a", msg),
                                10
                )

                self.sub_mobile_state_b = self.create_subscription(
                                UInt8,
                                "/rm_sis_b/motion_state",
                                lambda msg: self.callback_mobile_motion_state("rm_sis_b", msg),
                                10
                )

                self.sub_mobile_state_c = self.create_subscription(
                                UInt8,
                                "/rm_sis_c/motion_state",
                                lambda msg: self.callback_mobile_motion_state("rm_sis_c", msg),
                                10
                )

                self.sub_articular_state_ra1 = self.create_subscription(
                                UInt8,
                                "/ra1/motion_state",
                                lambda msg: self.callback_articular_motion_state("ra1", msg),
                                10
                )

                self.sub_articular_state_ra2 = self.create_subscription(
                                UInt8,
                                "/ra2/motion_state",
                                lambda msg: self.callback_articular_motion_state("ra2", msg),
                                10
                )

                self.sub_articular_state_ra3 = self.create_subscription(
                                UInt8,
                                "/ra3/motion_state",
                                lambda msg: self.callback_articular_motion_state("ra3", msg),
                                10
                )

                self.tree = self.crear_arbol()

                self.timer = self.create_timer(0.1, self.tick_tree)

                self.get_logger().info("Nodo Behavior Tree iniciado.")
                self.get_logger().info(
                                "Envía tareas con: /task_request String 'sis_a 0B 7'"
                )

        def callback_task_request(self, msg):
                if self.memoria["arbol_ocupado"]:
                                self.get_logger().warn(
                                        "Árbol ocupado. Tarea ignorada hasta terminar la actual."
                                )
                                return

                self.memoria["texto_tarea"] = msg.data
                self.memoria["nueva_tarea"] = True
                self.memoria["arbol_ocupado"] = True

                self.tree.stop(py_trees.common.Status.INVALID)

                self.get_logger().info(f"Nueva tarea recibida: {msg.data}")

        def callback_mobile_motion_state(self, robot_movil, msg):
                self.memoria["mobile_motion_state"][robot_movil] = int(msg.data)

        def callback_articular_motion_state(self, robot_articular, msg):
                self.memoria["articular_motion_state"][robot_articular] = int(msg.data)

        def crear_arbol(self):
                root = py_trees.composites.Sequence(
                                name="BT_MOVIL_ARTICULAR_STORAGE",
                                memory=True
                )

                esperar_tarea = EsperarTarea(
                                name="Esperar y validar tarea",
                                node=self,
                                memoria=self.memoria
                )

                iniciar_sistema = IniciarSistema(
                                name="Iniciar sistema",
                                node=self,
                                memoria=self.memoria
                )

                enviar_camino_movil = EnviarCaminoMovil(
                                name="Enviar camino móvil",
                                node=self,
                                memoria=self.memoria,
                                timeout=20.0
                )

                mover_recoleccion = MoverJuntas(
                                name="Mover articular a recolección",
                                node=self,
                                memoria=self.memoria,
                                target_key="recolectar_q",
                                timeout=10.0
                )

                activar_efector = ComandoEfector(
                                name="Activar efector",
                                node=self,
                                memoria=self.memoria,
                                activar=True,
                                tiempo_espera=0.2
                )

                mover_storage = MoverJuntas(
                                name="Mover articular a storage",
                                node=self,
                                memoria=self.memoria,
                                target_key="storage_q",
                                timeout=10.0
                )

                apagar_efector = ComandoEfector(
                                name="Apagar efector",
                                node=self,
                                memoria=self.memoria,
                                activar=False,
                                tiempo_espera=0.2
                )

                mover_home = MoverJuntas(
                                name="Mover articular a HOME",
                                node=self,
                                memoria=self.memoria,
                                target_key="home_q",
                                timeout=10.0
                )

                root.add_children([
                                esperar_tarea,
                                iniciar_sistema,
                                enviar_camino_movil,
                                mover_recoleccion,
                                activar_efector,
                                mover_storage,
                                apagar_efector,
                                mover_home,
                ])

                return root

        def limpiar_tarea(self):
                self.memoria["nueva_tarea"] = False
                self.memoria["texto_tarea"] = ""
                self.memoria["tarea"] = None
                self.memoria["robot_articular"] = None
                self.memoria["robot_movil"] = None
                self.memoria["arbol_ocupado"] = False

        def apagar_efector_seguro(self):
                robot_articular = self.memoria.get("robot_articular")

                if robot_articular in self.publishers_effector:
                                msg = UInt8()
                                msg.data = 0
                                self.publishers_effector[robot_articular].publish(msg)

        def tick_tree(self):
                self.tree.tick_once()

                if self.tree.status == py_trees.common.Status.SUCCESS:
                                self.get_logger().info("Tarea completada correctamente.")
                                self.limpiar_tarea()
                                self.tree.stop(py_trees.common.Status.INVALID)

                elif self.tree.status == py_trees.common.Status.FAILURE:
                                self.get_logger().error("La tarea falló.")
                                self.apagar_efector_seguro()
                                self.limpiar_tarea()
                                self.tree.stop(py_trees.common.Status.INVALID)


# ============================================================
# MAIN
# ============================================================

def main():
        rclpy.init()

        node = ScaraBTNode()

        try:
                rclpy.spin(node)
        except KeyboardInterrupt:
                pass

        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
        main()

Comandos para el control

Arranque del sistema:

ros2 topic pub /task_request std_msgs/msg/String "{data: 'sis_a 0B 7'}" -1

Control del robot móvil:

ros2 topic pub /rm_sis_a/motion_state std_msgs/msg/UInt8 "{data: 1}" -1
ros2 topic pub /rm_sis_a/motion_state std_msgs/msg/UInt8 "{data: 0}" -1

Control del robot articular:

ros2 topic pub /ra1/motion_state std_msgs/msg/UInt8 "{data: 1}" -1
ros2 topic pub /ra1/motion_state std_msgs/msg/UInt8 "{data: 0}" -1

Agregar recuperación de errores

La versión inicial retorna FAILURE si un movimiento supera el timeout. Luego puedes agregar un árbol de recuperación.

Estructura conceptual:

Selector: RootWithRecovery
├── Sequence: MainTask
│   ├── WaitStart
│   ├── MoveHome
│   ├── MovePick
│   ├── EffectorOn
│   ├── MovePlace
│   └── EffectorOff
└── Sequence: Recovery
    ├── EffectorOff
    └── MoveHome

Interpretación:

  • Primero intenta la tarea principal.

  • Si la tarea principal falla, ejecuta recuperación.

  • La recuperación apaga el efector y manda el robot a Home.

Ejemplo parcial de código:

root = py_trees.composites.Selector(
    name=f"RootWithRecovery_{self.robot_ns}",
    memory=False
)

main = py_trees.composites.Sequence(
    name=f"MainTask_{self.robot_ns}",
    memory=True
)

recovery = py_trees.composites.Sequence(
    name=f"Recovery_{self.robot_ns}",
    memory=True
)

main.add_children([
    WaitStart(...),
    MoveJointGoal(...),
    MoveJointGoal(...),
    EffectorCommand(...),
    MoveJointGoal(...),
    EffectorCommand(...),
])

recovery.add_children([
    EffectorCommand(
        name=f"RecoveryEffectorOff_{self.robot_ns}",
        node=self,
        pub_effector=self.pub_effector,
        on=False,
        settle_sec=0.2
    ),
    MoveJointGoal(
        name=f"RecoveryMoveHome_{self.robot_ns}",
        node=self,
        pub_goal=self.pub_goal,
        robot_status=self.robot_status,
        target=t["home"],
        timeout_sec=10.0
    ),
])

root.add_children([main, recovery])

Agregar reintentos

Puedes envolver un movimiento con Retry.

Ejemplo:

move_pick = MoveJointGoal(
    name=f"MovePick_{self.robot_ns}",
    node=self,
    pub_goal=self.pub_goal,
    robot_status=self.robot_status,
    target=t["pick"],
    timeout_sec=10.0
)

move_pick_retry = py_trees.decorators.Retry(
    name=f"RetryMovePick_{self.robot_ns}",
    child=move_pick,
    num_failures=2
)

Luego en la secuencia agregas move_pick_retry en vez de move_pick.

Agregar condición de pieza

Si más adelante agregas un sensor de presencia o visión artificial, puedes añadir un nodo de condición.

Tópico sugerido:

/raX/piece_detected

Tipo:

std_msgs/msg/Bool

Árbol modificado:

Sequence
├── WaitStart
├── MoveHome
├── MovePick
├── CheckPieceDetected
├── EffectorOn
├── MovePlace
└── EffectorOff

Ejemplo conceptual:

class CheckPieceDetected(py_trees.behaviour.Behaviour):
    def __init__(self, name, robot_status):
        super().__init__(name=name)
        self.robot_status = robot_status

    def update(self):
        if self.robot_status.get("piece_detected", False):
            return py_trees.common.Status.SUCCESS
        return py_trees.common.Status.FAILURE