Ejercicio práctico 1
Actividad
Diseñar y simular un sistema de clasificación de objetos basados en ROS2.
Para los siguientes programas es importante crear un directorio proyecto dentro del paquete mi_pkg_python.
Calibración de la cámara
Instalación de paquetes
Librerías necesarias:
OpenCV
pip install opencv-python
PyYAML
pip install pyyaml
pupil-apriltags para detección de tags
pip install pupil-apriltags
Codigos de ROS2 desarrollados en este repositorio.
Algoritmo de calibración
Para calibrar una cámara se utiliza el algoritmo de ajuste de matriz intrínseca basado en un patrón de tablero de ajedrez. Este tipo de calibración permite corregir las distorsiones ópticas generadas por el lente, como la distorsión radial y tangencial, mejorando así la precisión en tareas de visión por computadora.
Los requisitos para realizar la calibración son:
Una estructura fija donde se monte la cámara.
Un tablero de ajedrez de calibración impreso y plano.
Para más detalles sobre el algoritmo utilizado, puedes consultar la documentación oficial de OpenCV:
En este ejemplo se utiliza un tablero generado por el proyecto de Mark Hedley Jones, disponible en:
Además, se emplean AprilTags de 100 mm para pruebas adicionales de localización.
Captura de Imágenes del Tablero - Archivo:
0_captura.py- Abre la cámara, muestra una vista previa y guarda imágenes al presionarc. - Las imágenes se guardan en la carpetacalib_imgs/.Calibración con Tablero de Ajedrez - Archivo:
1_calib.py- Utiliza las imágenes para detectar esquinas de un tablero de 10x7. - Genera un archivocamera_calibration.yamlcon la matriz intrínseca y coeficientes de distorsión.Visualización de Corrección - Archivo:
3_cam.py- Muestra la imagen original y la corregida en tiempo real utilizando la calibración.Detección de AprilTags - Archivo:
4_apriltag.py- Usapupil_apriltagspara detectar tags y calcular sus poses. - Imprime la distancia entre el tag de referencia (ID 0) y los demás.
Parámetros
Tablero de calibración: 10 x 7 esquinas internas
Tamaño de cuadrado: 25 mm (0.025 m)
Distancia y pose se muestran en milímetros
Salida: camera_calibration.yaml (compatible con ROS)
Publicación de objetos simulados
Los objetos que se visualicen a través de la cámara serán digitalizados y simulados en RVIZ2.
Dependiendo del ID que tenga el tag, se realiza un cambio de color para distinguir la familia de tags relacionada con el tag_id 1 y el tag_id 2.
Código del proyecto: p1_pub_obj.py
import rclpy
from rclpy.node import Node
from avig_msg.msg import AprilTagPixelArray
from visualization_msgs.msg import Marker
from geometry_msgs.msg import Point
from std_msgs.msg import ColorRGBA
# Publicador para el seteo del modelo URDF
from sensor_msgs.msg import JointState
class CuboPublisher(Node):
def __init__(self):
super().__init__('cubo_publisher')
self.publisher = self.create_publisher(Marker, 'visualization_marker', 10)
# Suscripcion
self.subscription = self.create_subscription(
AprilTagPixelArray,
'/apriltag_pixels',
self.listener_callback,
1)
self.subscription
self.msg_cubos = None
# Timer del publicador
self.timer = self.create_timer(1.0, self.publicar_cubos)
def listener_callback(self,msg):
# Llegada del mensaje desde el nodo de la camara
self.msg_cubos = msg.tags
def publicar_cubos(self):
if self.msg_cubos != None:
# reviso el arreglo del tipo de mensajes apriltags
for i, tag in enumerate(self.msg_cubos):
# area de clasificacion TAG 1
if tag.id == 1:
cubo = Marker()
cubo.header.frame_id = 'world'
cubo.header.stamp = self.get_clock().now().to_msg()
cubo.ns = 'cubos'
cubo.id = i
cubo.type = Marker.CUBE
cubo.action = Marker.ADD
cubo.pose.position.x = tag.posx
cubo.pose.position.y = tag.posy
cubo.pose.position.z = 0.005 # para que se vea encima del suelo
cubo.pose.orientation.x = 0.0
cubo.pose.orientation.y = 0.0
cubo.pose.orientation.z = 0.0
cubo.pose.orientation.w = 1.0
cubo.scale.x = 0.10
cubo.scale.y = 0.10
cubo.scale.z = 0.01
cubo.color = ColorRGBA(r=1.0, g=0.0, b=0.0, a=0.8)
cubo.lifetime.sec = 0 # 0 = permanente
self.publisher.publish(cubo)
# area de clasificacion TAG 2
if tag.id == 2:
cubo = Marker()
cubo.header.frame_id = 'world'
cubo.header.stamp = self.get_clock().now().to_msg()
cubo.ns = 'cubos'
cubo.id = i
cubo.type = Marker.CUBE
cubo.action = Marker.ADD
cubo.pose.position.x = tag.posx
cubo.pose.position.y = tag.posy
cubo.pose.position.z = 0.005 # para que se vea encima del suelo
cubo.pose.orientation.x = 0.0
cubo.pose.orientation.y = 0.0
cubo.pose.orientation.z = 0.0
cubo.pose.orientation.w = 1.0
cubo.scale.x = 0.10
cubo.scale.y = 0.10
cubo.scale.z = 0.01
cubo.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=0.8)
cubo.lifetime.sec = 0 # 0 = permanente
self.publisher.publish(cubo)
# TAGs compatibles con el TAG 1
if 10 <= tag.id < 20:
cubo = Marker()
cubo.header.frame_id = 'world'
cubo.header.stamp = self.get_clock().now().to_msg()
cubo.ns = 'cubos'
cubo.id = i
cubo.type = Marker.CUBE
cubo.action = Marker.ADD
cubo.pose.position.x = tag.posx
cubo.pose.position.y = tag.posy
cubo.pose.position.z = 0.015 # para que se vea encima del suelo
cubo.pose.orientation.x = 0.0
cubo.pose.orientation.y = 0.0
cubo.pose.orientation.z = 0.0
cubo.pose.orientation.w = 1.0
cubo.scale.x = 0.03
cubo.scale.y = 0.03
cubo.scale.z = 0.03
cubo.color = ColorRGBA(r=1.0, g=0.0, b=0.0, a=1.0)
cubo.lifetime.sec = 0 # 0 = permanente
self.publisher.publish(cubo)
# TAGs compatibles con el TAG 2
if 20 <= tag.id < 30:
cubo = Marker()
cubo.header.frame_id = 'world'
cubo.header.stamp = self.get_clock().now().to_msg()
cubo.ns = 'cubos'
cubo.id = i
cubo.type = Marker.CUBE
cubo.action = Marker.ADD
cubo.pose.position.x = tag.posx
cubo.pose.position.y = tag.posy
cubo.pose.position.z = 0.015 # para que se vea encima del suelo
cubo.pose.orientation.x = 0.0
cubo.pose.orientation.y = 0.0
cubo.pose.orientation.z = 0.0
cubo.pose.orientation.w = 1.0
cubo.scale.x = 0.03
cubo.scale.y = 0.03
cubo.scale.z = 0.03
cubo.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=1.0)
cubo.lifetime.sec = 0 # 0 = permanente
self.publisher.publish(cubo)
def main(args=None):
rclpy.init(args=args)
node = CuboPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Modificación de servicios
Para solucionar el problema de generar una heurística que permita la toma decisiones con respecto a que tag debe clasificar primero el sistema. Se crea un nuevo
archivo srv HeuristicaP dentro del paquete avig_msg, teniendo de requermiento y respuesta un AprilTagPixelArray.
Dado que en la sección de tutoriales se creó una solución para este problema, unicamente es necesario cambiar la respuesta del servidor en base al nuevo tipo de archivo srv
Código del proyecto p1_heuristica_server.py
import rclpy
from rclpy.node import Node
from avig_msg.srv import HeuristicaP
from avig_msg.msg import AprilTagPixel
import math
class EuristicaServer(Node):
def __init__(self):
# Declaracion del nodo
super().__init__('euristica_server')
# Creacion del servicio
self.srv = self.create_service(HeuristicaP, '/heuristica', self.heuristica_callback)
self.get_logger().info('Servicio Euristica listo.')
def heuristica_callback(self, request, response):
tags = request.tags_in.tags
self.get_logger().info(f"Recibidos {tags} tags.")
if not tags:
self.get_logger().warn("No se recibió ningún tag.")
return response
tag1 = next((tag for tag in tags if tag.id == 1), None)
tag2 = next((tag for tag in tags if tag.id == 2), None)
if tag1 is None or tag2 is None:
self.get_logger().warn("Faltan tag1 o tag2, no se puede continuar.")
return response
tags_ordenados = [tag for tag in tags if tag.id != 0 and tag.id != 1 and tag.id != 2]
for i, tag in enumerate(tags_ordenados):
self.get_logger().info(f"Revisando el tag: {tag.id}")
if 10 <= tag.id < 20:
tag.dist = math.sqrt((tag.posx - tag1.posx)**2 + (tag.posy - tag1.posy)**2)
else:
tag.dist = math.sqrt((tag.posx - tag2.posx)**2 + (tag.posy - tag2.posy)**2)
self.get_logger().info(f"Distancia del Tag: {tag.id} es {tag.dist}")
# Heurística: devolver el tag con menor coordenada posx
tag_ordenado = sorted(tags_ordenados, key=lambda t: t.dist)
self.get_logger().info(f"Revisando el tag: {tag_ordenado}")
response.tags_out.tags = tag_ordenado
return response
def main(args=None):
rclpy.init(args=args)
node = EuristicaServer()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
Cinemática Inversa
Modelo Geométrico
Se considera un manipulador plano con dos eslabones de longitud:
\(l_1\): longitud del brazo
\(l_2\): longitud del antebrazo
Y dos articulaciones:
\(q_1\): ángulo del primer eslabón respecto al eje X de
base_link\(q_2\): ángulo del segundo eslabón respecto al primero
El extremo del robot (end-effector) está ubicado en el plano XY, y el origen del robot está desplazado desde el origen global (world) por:
\(dx\): desplazamiento en X
\(dy\): desplazamiento en Y
Formulación
Dado un punto deseado en coordenadas globales:
El primer paso es trasladar ese punto al marco del robot (base_link):
A partir de este punto deseado relativo al marco base, se aplica la cinemática inversa para resolver los ángulos de las articulaciones.
Se define:
El ángulo \(q_2\) se obtiene mediante:
Y el ángulo \(q_1\) se obtiene por:
Restricciones
El valor absoluto de \(D\) debe ser menor o igual a 1 para garantizar solución real.
Si \(|D| > 1\), el punto está fuera del alcance del robot.
Ejemplo en Código
Con el fin de automatizar el proceso de carga del desplazamiento del origen del URDF del robot SCARA creado, con respecto al april-tag 0, y de los eslabones correspondientes a q1 y q2, se implementa un algoritmo que a través del uso de TF2 carga automaticamente estos valores al lanzarse el launcher con el URDF y visualizador RVIZ.
Código del proyecto p1_ci.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped, Pose
from tf2_ros import Buffer, TransformListener, LookupException, TimeoutException
from math import acos, atan2, sqrt
import numpy as np
class CinematicaInversaTF(Node):
def __init__(self):
super().__init__('cinematica_inversa_tf')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
self.pub = self.create_publisher(Pose, '/angulos_mov', 1)
self.sub = self.create_subscription(Pose, '/puntos_ci', self.callback_ci, 1)
self.timer = self.create_timer(1.0, self.obtener_parametros_robot)
self.dx = None
self.dy = None
self.l1 = None
self.l2 = None
def obtener_parametros_robot(self):
try:
t0 = self.tf_buffer.lookup_transform('world', 'base_link', rclpy.time.Time())
t1 = self.tf_buffer.lookup_transform('brazo_link', 'antebrazo_link', rclpy.time.Time())
t2 = self.tf_buffer.lookup_transform('antebrazo_link', 'efector_link', rclpy.time.Time())
self.dx = t0.transform.translation.x
self.dy = t0.transform.translation.y
self.l1 = sqrt(t1.transform.translation.x**2 + t1.transform.translation.y**2)
self.l2 = sqrt(t2.transform.translation.x**2 + t2.transform.translation.y**2)
self.get_logger().info(f"Obtenido: l1={self.l1:.3f}, l2={self.l2:.3f}, dx={self.dx:.3f}, dy={self.dy:.3f}")
self.timer.cancel() # Ya no es necesario repetir
except (LookupException, TimeoutException) as e:
self.get_logger().warn("Esperando transformaciones...")
def callback_ci(self, msg):
if None in (self.dx, self.dy, self.l1, self.l2):
self.get_logger().warn("Parámetros del robot aún no disponibles.")
return
try:
xa = msg.position.x
ya = msg.position.y
# Convertir a sistema base_link
xd = xa - self.dx
yd = ya - self.dy
D = (xd**2 + yd**2 - self.l1**2 - self.l2**2) / (2 * self.l1 * self.l2)
if abs(D) > 1:
self.get_logger().error("Punto fuera del alcance geométrico.")
return
q2 = acos(D)
q1 = atan2(yd, xd) - atan2(self.l2 * np.sin(q2), self.l1 + self.l2 * np.cos(q2))
self.get_logger().info(f"q1 = {q1:.3f} rad, q2 = {q2:.3f} rad")
msg_enviar = Pose()
msg_enviar.orientation.x = q1
msg_enviar.orientation.y = q2
msg_enviar.position.z = msg.position.z
self.get_logger().info(f'Enviando angulos {msg_enviar}')
self.pub.publish(msg_enviar)
except Exception as e:
self.get_logger().error(f"Error en cinemática inversa: {str(e)}")
def main(args=None):
rclpy.init(args=args)
node = CinematicaInversaTF()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
Conexión ROS2 - ESP32
Utilizando el archivo base de urdf-mqtt.py, se realizan las modificaciones necesarias para suscribirse a un topic, que publique los angulos que debe moverse cada juntas para alcanzar los diferentes objetos en el espacio de trabajo.
Código del proyecto p1_puente.py
import rclpy
from rclpy.node import Node
from rclpy.duration import Duration
# librerias soporte URDF
from std_msgs.msg import Float32
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Pose
# librerias MQTT
import paho.mqtt.client as mqtt
import json
import math
class MQTTBridge(Node):
def __init__(self):
super().__init__('mqtt_bridge')
# Angulos del robot
self.real_q1 = 0.0
self.real_q2 = 0.0
self.real_q3 = 0.0
self.pub = self.create_publisher( Float32, 'sensor_bateria', 1)
self.subscription = self.create_subscription(
Pose,
'/angulos_mov',
self.listener_ros, 1
)
self.pub_joint = self.create_publisher(
JointState,
'/joint_states',
1
)
self.last_data = None
self.active = True # control de publicación activa
self.last_mqtt_time = self.get_clock().now()
self.timer = self.create_timer(0.1, self.publish_sensor_data) # Publicador (10 Hz)
self.timer_watchdog = self.create_timer(0.5, self.check_timeout) # Verificador de tiempo
self.topic_sub = "ra/sensores"
self.topic_pub = "ra/juntas"
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect("192.168.100.180", 1883, 60)
self.mqtt_client.loop_start()
def listener_ros(self, msg):
self.get_logger().info('Arrancando Puente')
q1_mov = 0.0
q2_mov = 0.0
q3_mov = 0.0
# Resolucion del stepper 0.9
meta = round(math.degrees(msg.orientation.x),4)
self.real_q1, q1_mov = self.mover_a_angulo_discreto(meta,self.real_q1)
meta = round(math.degrees(msg.orientation.y),4)
self.real_q2, q2_mov = self.mover_a_angulo_discreto(meta,self.real_q2)
payload = {
"q1": q1_mov,
"q2": q2_mov
}
msg_j = JointState()
msg_j.header.stamp = self.get_clock().now().to_msg()
msg_j.name = ['brazo_joint', 'antebrazo_joint', 'efector_joint']
msg_j.position = [math.radians(self.real_q1), math.radians(self.real_q2), msg.position.z]
self.pub_joint.publish(msg_j)
self.get_logger().info('Postura inicial publicada.')
msg_mqtt = json.dumps(payload)
self.mqtt_client.publish(self.topic_pub, msg_mqtt)
print("Mensaje Enviado")
def mover_a_angulo_discreto(self, angulo_objetivo, angulo_actual, paso=0.9):
"""
Calcula el desplazamiento al múltiplo de 'paso' más cercano al ángulo objetivo.
Retorna:
- el nuevo ángulo corregido (múltiplo de paso)
- el desplazamiento angular necesario desde el ángulo actual
"""
# Calcula desplazamiento
desplazamiento = angulo_objetivo - angulo_actual
# Redondea el ángulo objetivo al múltiplo más cercano
desplazamiento_valido = round(desplazamiento / paso) * paso
meta_ajustada = angulo_actual + desplazamiento_valido
print(f"[INFO] Objetivo :{angulo_objetivo}°")
print(f"[INFO] Objetivo ajustado: {meta_ajustada}° (múltiplo de {paso}°)")
print(f"[INFO] Desplazamiento desde actual: {desplazamiento_valido:.2f}°")
return round(meta_ajustada,2), round(desplazamiento_valido,2)
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Conectado al broker MQTT")
client.subscribe(self.topic_sub)
else:
print(f"Error de conexión: código {rc}")
def on_message(self, client, userdata, msg):
try:
mensaje = msg.payload.decode("utf-8")
data = json.loads(mensaje)
if msg.topic == self.topic_sub:
self.last_data = float(data["bateria"])
self.last_mqtt_time = self.get_clock().now() # Actualiza tiempo del último dato
self.active = True
except Exception as e:
print("Error procesando mensaje:", e)
def publish_sensor_data(self):
if self.last_data is not None and self.active:
ros_msg = Float32()
ros_msg.data = self.last_data
self.pub.publish(ros_msg)
self.get_logger().info(f"ROS2 publicó: {ros_msg.data}")
def check_timeout(self):
now = self.get_clock().now()
if now - self.last_mqtt_time > Duration(seconds=2.0):
if self.active:
self.get_logger().warn("No se reciben datos desde MQTT. Se detiene la publicación.")
self.active = False
def main(args=None):
rclpy.init(args=args)
node = MQTTBridge()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
rclpy.shutdown()
Modificacion de archivos launch
En el archivo launcher es necesario eliminar el slider creado para la manipulación del URDF, dado a que este ahora se moverá en base al algoritmo de control del sistema creado.
Código del proyecto p1.launch.py
# Importa la clase principal para definir lanzamientos en ROS 2
from launch import LaunchDescription
# Importa la acción Node para lanzar nodos ROS 2
from launch_ros.actions import Node
# Permite obtener la ruta del directorio share de un paquete instalado
from ament_index_python.packages import get_package_share_directory
# Controlador de lanzamiento
from launch.actions import TimerAction
# Módulo estándar para trabajar con rutas de archivos
import os
# Función principal requerida por ROS 2 para ejecutar este archivo de lanzamiento
def generate_launch_description():
# Construye la ruta completa del archivo URDF dentro del paquete
urdf_file = os.path.join(
get_package_share_directory('mi_pkg_python'), # Paquete que contiene el URDF
'urdf',
'ensamblaje.urdf'
)
# Devuelve la lista de nodos a lanzar
return LaunchDescription([
# Nodo que publica el URDF en el topic /robot_description
Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
parameters=[{'robot_description': open(urdf_file).read()}]
),
# Nodo que lanza RViz2 para visualizar el robot
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
output='screen'
),
# Nodo que agrega una transformación estática: world → base_link
Node(
package='tf2_ros',
executable='static_transform_publisher',
name='static_tf_pub',
arguments=['0.10', '0.10', '0.0', # x y z (en metros)
'0', '0', '0', # roll pitch yaw (en radianes)
'world', 'base_link'], # parent frame, child frame
output='screen'
),
# Nodo personalizados
Node(
package='mi_pkg_python',
executable='p1_pub_obj',
name='nodo_publicador_objetos',
output='screen'
),
Node(
package='mi_pkg_python',
executable='p1_heur',
name='nodo_heuristica_server',
output='screen'
),
Node(
package='mi_pkg_python',
executable='p1_ci',
name='nodo_ci',
output='screen'
),
Node(
package='mi_pkg_python',
executable='p1_puente',
name='nodo_puente_mqtt',
output='screen'
),
TimerAction(
period=2.0, # esperar 2 segundos
actions=[
Node(
package='mi_pkg_python',
executable='p1_set',
name='init_joints',
output='screen'
)
]
)
])
Dado que, se han eliminado los slider es necesario crear un archivo que publique los valores iniciales de las juntas q1,q2 y q3.
Código del proyecto p1_set.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
class JointInitializer(Node):
def __init__(self):
super().__init__('init_joints')
self.publisher = self.create_publisher(JointState, '/joint_states', 1)
# Publicar al iniciar
self.timer = self.create_timer(0.5, self.publicar_posicion)
def publicar_posicion(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['brazo_joint', 'antebrazo_joint', 'efector_joint']
msg.position = [0.0, 0.0, 0.0]
self.publisher.publish(msg)
self.get_logger().info('Postura inicial publicada.')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
node = JointInitializer()
rclpy.spin(node)
if __name__ == '__main__':
main()
Algoritmo de control de proceso
Para realizar el ejercicio de clasificación de objetos es necesario aplicar un algoritmo que permita serilizar el proceso que tendra que hacer el robot.
Código del proyecto p1_coordinador.py
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from avig_msg.msg import AprilTagPixelArray
from avig_msg.srv import HeuristicaP
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Pose
import time
class NodoCoordinador(Node):
def __init__(self):
super().__init__('nodo_coordinador')
# Tags puntos de almacenes
self.tag_id_1 = None
self.tag_id_2 = None
# Publicador para setear la posición inicial
self.pub_joint = self.create_publisher(JointState, '/joint_states', 1)
# CI / Puente
self.pub_tra = self.create_publisher(Pose, '/puntos_ci', 1)
# Cliente de servicio y acción
self.cli = self.create_client(HeuristicaP, '/heuristica')
self.sub = self.create_subscription(
AprilTagPixelArray,
'/apriltag_pixels',
self.callback_tags,
1
)
self.ultima_data = None
self.proceso_activo = False
self.get_logger().info('P "y" para comenzar el proceso...')
self.esperar_input_usuario()
def esperar_input_usuario(self):
import threading
def esperar_tecla():
while True:
tecla = input()
if tecla.lower() == 'y':
self.get_logger().info('Iniciando el proceso...')
self.proceso_activo = True
break
threading.Thread(target=esperar_tecla, daemon=True).start()
def setear_posicion_inicial(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['brazo_joint', 'antebrazo_joint', 'efector_joint']
msg.position = [0.0, 0.0, 0.0]
self.pub_joint.publish(msg)
## Sleep de 2 segundos
self.get_logger().info('Posición inicial seteada.')
def callback_tags(self, msg):
if not self.proceso_activo:
return
tags = msg.tags
self.tag_id_1 = next((tag for tag in tags if tag.id == 1), None)
self.tag_id_2 = next((tag for tag in tags if tag.id == 2), None)
if self.tag_id_1 is None or self.tag_id_2 is None:
self.get_logger().warn("Faltan tag1 o tag2, no se puede continuar.")
self.esperar_input_usuario()
self.proceso_activo = False
return
if not self.cli.service_is_ready():
self.get_logger().warn('Servicio no está disponible.')
return
if not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().error('Timeout esperando servicio.')
return
request = HeuristicaP.Request()
request.tags_in.tags = msg.tags
self.proceso_activo = False
future = self.cli.call_async(request)
future.add_done_callback(self.llamar_accion)
def llamar_accion(self, future):
try:
self.get_logger().info(f'Solocitando CI')
response = future.result()
# Envio de resultados el movimiento del primer valor
for i, tag in enumerate(response.tags_out.tags):
goal_msg = Pose()
goal_msg.position.x = tag.posx
goal_msg.position.y = tag.posy
goal_msg.position.z = 0.0
self.pub_tra.publish(goal_msg)
time.sleep(3)
self.get_logger().info(f'Enviando objetivos al nodo CI 1 {goal_msg}')
if 10 <= tag.id < 20:
goal_msg = Pose()
goal_msg.position.x = self.tag_id_1.posx
goal_msg.position.y = self.tag_id_1.posy
goal_msg.position.z = 0.03
self.pub_tra.publish(goal_msg)
time.sleep(3)
self.get_logger().info(f'Enviando objetivos al nodo CI 1 {goal_msg}')
self.get_logger().info(f'Ejercicio completo{goal_msg}')
rclpy.shutdown()
except Exception as e:
self.get_logger().error(f'Error en servicio Herustica: {str(e)}')
def main(args=None):
rclpy.init(args=args)
node = NodoCoordinador()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
rclpy.shutdown()
if __name__ == '__main__':
main()