3.10.3 Классы дерева поведения, специфичные для ROS
Классы дерева поведения, специфичные для ROS, можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Эта библиотека содержит три ключевые задачи ROS: MonitorTask для мониторинга темы ROS; ServiceTask для подключения к службе ROS; и SimpleActionTask для отправки целей на сервер действий ROS и получения обратной связи. Мы лишь кратко опишем здесь эти задачи, поскольку их использование наглядно продемонстрировано в последующих примерах программирования.
Ссылка на источник: pi_trees_ros.py
class MonitorTask(Task):
""" Turn a ROS subscriber into a Task """
def __init__(self, name, topic, msg_type, msg_cb, wait_for_message=True, timeout=5):
super(MonitorTask, self).__init__(name)
self.topic = topic
self.msg_type = msg_type
self.timeout = timeout
self.msg_cb = msg_cb
rospy.loginfo("Subscribing to topic " + topic)
if wait_for_message:
try:
rospy.wait_for_message(topic, msg_type, timeout=self.timeout)
rospy.loginfo("Connected.")
except:
rospy.loginfo("Timed out waiting for " + topic)
# Подпишитесь на данную тему с помощью данной функции обратного вызова, выполняемой через run()
rospy.Subscriber(self.topic, self.msg_type, self._msg_cb)
def _msg_cb(self, msg):
self.set_status(self.msg_cb(msg))
def run(self):
return self.status
def reset(self):
pass
MonitorTask подписывается на заданную тему ROS и выполняет заданную функцию обратного вызова. Функция обратного вызова определяется пользователем и отвечает за возврат одного из трех допустимых значений состояния задачи: «УСПЕХ», «НЕУДАЧА» или «ВЫПОЛНЕНИЕ».
class ServiceTask(Task):
""" Turn a ROS service into a Task. """
def __init__(self, name, service, service_type, request, result_cb=None, wait_for_service=True, timeout=5):
super(ServiceTask, self).__init__(name)
self.result = None
self.request = request
self.timeout = timeout
self.result_cb = result_cb
rospy.loginfo("Connecting to service " + service)
if wait_for_service:
rospy.loginfo("Waiting for service")
rospy.wait_for_service(service, timeout=self.timeout)
rospy.loginfo("Connected.")
# Создание служебного прокси-сервера
self.service_proxy = rospy.ServiceProxy(service, service_type)
def run(self):
try:
result = self.service_proxy(self.request)
if self.result_cb is not None:
self.result_cb(result)
return TaskStatus.SUCCESS
except:
rospy.logerr(sys.exc_info())
return TaskStatus.FAILURE
def reset(self):
pass
Задача ServiceTask упаковывает заданную службу ROS и при необходимости выполняет определяемую пользователем функцию обратного вызова. По умолчанию задача ServiceTask просто вызывает соответствующую службу ROS и возвращает «УСПЕХ», если только сам вызов службы не завершается неудачей, и в этом случае она возвращает «СБОЙ». Если пользователь передает функцию обратного вызова, эта функция может просто выполнить некоторый произвольный код или может вернуть состояние задачи.
class SimpleActionTask(Task):
""" Turn a ROS action into a Task. """
def __init__(self, name, action, action_type, goal, rate=5, connect_timeout=10, result_timeout=30, reset_after=False, active_cb=None, done_cb=None, feedback_cb=None):
super(SimpleActionTask, self).__init__(name)
self.action = action
self.goal = goal
self.tick = 1.0 / rate
self.rate = rospy.Rate(rate)
self.result = None
self.connect_timeout = connect_timeout
self.result_timeout = result_timeout
self.reset_after = reset_after
if done_cb == None:
done_cb = self.default_done_cb
self.done_cb = done_cb
if active_cb == None:
active_cb = self.default_active_cb
self.active_cb = active_cb
if feedback_cb == None:
feedback_cb = self.default_feedback_cb
self.feedback_cb = feedback_cb
self.action_started = False
self.action_finished = False
self.goal_status_reported = False
self.time_so_far = 0.0
# Возвращаемые значения состояния цели
self.goal_states = ['PENDING', 'ACTIVE', 'PREEMPTED',
'SUCCEEDED', 'ABORTED', 'REJECTED',
'PREEMPTING', 'RECALLING', 'RECALLED', 'LOST']
rospy.loginfo("Connecting to action " + action)
# Подписываемся на базовый сервер действий
self.action_client = actionlib.SimpleActionClient(action, action_type)
rospy.loginfo("Waiting for move_base action server...")
# Ждем до тайм-аута, чтобы сервер действий стал доступен
try:
self.action_client.wait_for_server(rospy.Duration(self.connect_timeout))
except:
rospy.loginfo("Timed out connecting to the action server " + action)
rospy.loginfo("Connected to action server")
def run(self):
# Отправляем цель
if not self.action_started:
rospy.loginfo("Sending " + str(self.name) + " goal to action server...")
self.action_client.send_goal(self.goal, done_cb=self.done_cb, active_cb=self.active_cb, feedback_cb=self.feedback_cb)
self.action_started = True
''' Здесь мы не можем использовать метод wait_for_result (), так как он блокирует все дерево, поэтому мы разбиваем его на временные срезы длительностью 1. '''
if not self.action_finished:
self.time_so_far += self.tick
self.rate.sleep()
if self.time_so_far > self.result_timeout:
self.action_client.cancel_goal()
rospy.loginfo("Timed out achieving goal")
return TaskStatus.FAILURE
else:
return TaskStatus.RUNNING
else:
# Проверяем статус конечной цели, возвращаемый default_done_cb
if self.goal_status == GoalStatus.SUCCEEDED:
self.action_finished = True
if self.reset_after:
self.reset()
return TaskStatus.SUCCESS
elif self.goal_status == GoalStatus.ABORTED:
self.action_started = False
self.action_finished = False
return TaskStatus.FAILURE
else:
self.action_started = False
self.action_finished = False
self.goal_status_reported = False
return TaskStatus.RUNNING
def default_done_cb(self, status, result):
# Проверяем финальный статус
self.goal_status = status
self.action_finished = True
if not self.goal_status_reported:
rospy.loginfo(str(self.name) + " ended with status " + str(self.goal_states[status]))
self.goal_status_reported = True
def default_active_cb(self):
pass
def default_feedback_cb(self, msg):
pass
def reset(self):
self.action_started = False
self.action_finished = False
self.goal_status_reported = False
self.time_so_far = 0.0
Задача SimpleActionTask имитирует задачу SimpleActionState, определенную в SMACH. Ее основная функция заключается в том, чтобы создать программу оболочку для клиента действия ROS (simple action client) и к тому же забрать имя действия, тип действия и цель в качестве аргументов. Она также может принимать аргументы, указывающие определенные пользователем функции обратного вызова для стандартных обратных вызовов: active_cb, done_cb и feedback_cb, которые передаются клиенту ROS. В частности, задача SimpleActionTask определяет функцию по умолчанию done_cb, которая сообщает конечное состояние действия, которое затем преобразуется в соответствующее состояние задачи для использования в остальной части дерева поведения.
Мы рассмотрим задачу SimpleActionTask более подробно в контексте ряда примеров программ, к которым мы обратимся далее.