3.10.3 Классы дерева поведения, специфичные для ROS
Классы дерева поведения, специфичные для ROS, можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Эта библиотека содержит три ключевые задачи ROS: MonitorTask для мониторинга темы ROS; ServiceTask для подключения к службе ROS; и SimpleActionTask для отправки целей на сервер действий ROS и получения обратной связи. Мы лишь кратко опишем здесь эти задачи, поскольку их использование наглядно продемонстрировано в последующих примерах программирования.
Ссылка на источник: pi_trees_ros.py
classMonitorTask(Task): """ Turn a ROS subscriber into a Task """def__init__(self,name,topic,msg_type,msg_cb,wait_for_message=True,timeout=5):super(MonitorTask, self).__init__(name) self.topic = topic self.msg_type = msg_type self.timeout = timeout self.msg_cb = msg_cb rospy.loginfo("Subscribing to topic "+ topic)if wait_for_message:try: rospy.wait_for_message(topic, msg_type, timeout=self.timeout) rospy.loginfo("Connected.")except: rospy.loginfo("Timed out waiting for "+ topic) # Подпишитесь на данную тему с помощью данной функции обратного вызова, выполняемой через run()
rospy.Subscriber(self.topic, self.msg_type, self._msg_cb)def_msg_cb(self,msg): self.set_status(self.msg_cb(msg))defrun(self): return self.status defreset(self): pass
MonitorTask подписывается на заданную тему ROS и выполняет заданную функцию обратного вызова. Функция обратного вызова определяется пользователем и отвечает за возврат одного из трех допустимых значений состояния задачи: «УСПЕХ», «НЕУДАЧА» или «ВЫПОЛНЕНИЕ».
classServiceTask(Task): """ Turn a ROS service into a Task. """ def __init__(self, name, service, service_type, request, result_cb=None, wait_for_service=True, timeout=5):
super(ServiceTask, self).__init__(name) self.result =None self.request = request self.timeout = timeout self.result_cb = result_cb rospy.loginfo("Connecting to service "+ service)if wait_for_service: rospy.loginfo("Waiting for service") rospy.wait_for_service(service, timeout=self.timeout) rospy.loginfo("Connected.")# Создание служебного прокси-сервера self.service_proxy = rospy.ServiceProxy(service, service_type)defrun(self): try: result = self.service_proxy(self.request)if self.result_cb isnotNone: self.result_cb(result)return TaskStatus.SUCCESS except: rospy.logerr(sys.exc_info())return TaskStatus.FAILURE defreset(self): pass
Задача ServiceTask упаковывает заданную службу ROS и при необходимости выполняет определяемую пользователем функцию обратного вызова. По умолчанию задача ServiceTask просто вызывает соответствующую службу ROS и возвращает «УСПЕХ», если только сам вызов службы не завершается неудачей, и в этом случае она возвращает «СБОЙ». Если пользователь передает функцию обратного вызова, эта функция может просто выполнить некоторый произвольный код или может вернуть состояние задачи.
classSimpleActionTask(Task): """ Turn a ROS action into a Task. """ def __init__(self, name, action, action_type, goal, rate=5, connect_timeout=10, result_timeout=30, reset_after=False, active_cb=None, done_cb=None, feedback_cb=None):
super(SimpleActionTask, self).__init__(name) self.action = action self.goal = goal self.tick =1.0/ rate self.rate = rospy.Rate(rate) self.result =None self.connect_timeout = connect_timeout self.result_timeout = result_timeout self.reset_after = reset_after if done_cb ==None: done_cb = self.default_done_cb self.done_cb = done_cb if active_cb ==None: active_cb = self.default_active_cb self.active_cb = active_cb if feedback_cb ==None: feedback_cb = self.default_feedback_cb self.feedback_cb = feedback_cb self.action_started =False self.action_finished =False self.goal_status_reported =False self.time_so_far =0.0# Возвращаемые значения состояния цели self.goal_states = ['PENDING','ACTIVE','PREEMPTED','SUCCEEDED','ABORTED','REJECTED','PREEMPTING','RECALLING','RECALLED','LOST'] rospy.loginfo("Connecting to action "+ action)# Подписываемся на базовый сервер действий self.action_client = actionlib.SimpleActionClient(action, action_type) rospy.loginfo("Waiting for move_base action server...")# Ждем до тайм-аута, чтобы сервер действий стал доступен try: self.action_client.wait_for_server(rospy.Duration(self.connect_timeout))except: rospy.loginfo("Timed out connecting to the action server "+ action) rospy.loginfo("Connected to action server")defrun(self): # Отправляем цель ifnot self.action_started: rospy.loginfo("Sending "+str(self.name) +" goal to action server...") self.action_client.send_goal(self.goal, done_cb=self.done_cb, active_cb=self.active_cb, feedback_cb=self.feedback_cb)
self.action_started =True ''' Здесь мы не можем использовать метод wait_for_result (), так как он блокирует все дерево, поэтому мы разбиваем его на временные срезы длительностью 1. '''
ifnot self.action_finished: self.time_so_far += self.tick self.rate.sleep()if self.time_so_far > self.result_timeout: self.action_client.cancel_goal() rospy.loginfo("Timed out achieving goal")return TaskStatus.FAILURE else:return TaskStatus.RUNNING else:# Проверяем статус конечной цели, возвращаемый default_done_cb if self.goal_status == GoalStatus.SUCCEEDED: self.action_finished =Trueif self.reset_after: self.reset()return TaskStatus.SUCCESS elif self.goal_status == GoalStatus.ABORTED: self.action_started =False self.action_finished =Falsereturn TaskStatus.FAILURE else: self.action_started =False self.action_finished =False self.goal_status_reported =Falsereturn TaskStatus.RUNNINGdefdefault_done_cb(self,status,result): # Проверяем финальный статус self.goal_status = status self.action_finished =Trueifnot self.goal_status_reported: rospy.loginfo(str(self.name) +" ended with status "+str(self.goal_states[status])) self.goal_status_reported =Truedefdefault_active_cb(self): passdefdefault_feedback_cb(self,msg): passdefreset(self): self.action_started =False self.action_finished =False self.goal_status_reported =False self.time_so_far =0.0
Задача SimpleActionTask имитирует задачу SimpleActionState, определенную в SMACH. Ее основная функция заключается в том, чтобы создать программу оболочку для клиента действия ROS (simple action client) и к тому же забрать имя действия, тип действия и цель в качестве аргументов. Она также может принимать аргументы, указывающие определенные пользователем функции обратного вызова для стандартных обратных вызовов: active_cb, done_cb и feedback_cb, которые передаются клиенту ROS. В частности, задача SimpleActionTask определяет функцию по умолчанию done_cb, которая сообщает конечное состояние действия, которое затем преобразуется в соответствующее состояние задачи для использования в остальной части дерева поведения.
Мы рассмотрим задачу SimpleActionTask более подробно в контексте ряда примеров программ, к которым мы обратимся далее.