3.10.3 Классы дерева поведения, специфичные для ROS

Классы дерева поведения, специфичные для ROS, можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Эта библиотека содержит три ключевые задачи ROS: MonitorTask для мониторинга темы ROS; ServiceTask для подключения к службе ROS; и SimpleActionTask для отправки целей на сервер действий ROS и получения обратной связи. Мы лишь кратко опишем здесь эти задачи, поскольку их использование наглядно продемонстрировано в последующих примерах программирования.

Ссылка на источник: pi_trees_ros.py

class MonitorTask(Task):     
"""     Turn a ROS subscriber into a Task     """     
    
    def __init__(self, name, topic, msg_type, msg_cb, wait_for_message=True, timeout=5):
        super(MonitorTask, self).__init__(name) 
        self.topic = topic         
        self.msg_type = msg_type        
        self.timeout = timeout         
        self.msg_cb = msg_cb                          
        rospy.loginfo("Subscribing to topic " + topic)                 
        if wait_for_message:             
            try:                
                rospy.wait_for_message(topic, msg_type, timeout=self.timeout)                              
                rospy.loginfo("Connected.")             
            except:    
                rospy.loginfo("Timed out waiting for " + topic)                          
                # Подпишитесь на данную тему с помощью данной функции обратного вызова, выполняемой через run()         
                rospy.Subscriber(self.topic, self.msg_type, self._msg_cb)             
    
    def _msg_cb(self, msg):   
        self.set_status(self.msg_cb(msg))             
    
    def run(self):         
        return self.status         
    
    def reset(self):         
        pass

MonitorTask подписывается на заданную тему ROS и выполняет заданную функцию обратного вызова. Функция обратного вызова определяется пользователем и отвечает за возврат одного из трех допустимых значений состояния задачи: «УСПЕХ», «НЕУДАЧА» или «ВЫПОЛНЕНИЕ».

class ServiceTask(Task):     
"""     Turn a ROS service into a Task.   """     
    
    def __init__(self, name, service, service_type, request, result_cb=None, wait_for_service=True, timeout=5):         
        super(ServiceTask, self).__init__(name)    
        self.result = None         
        self.request = request         
        self.timeout = timeout         
        self.result_cb = result_cb                          
        rospy.loginfo("Connecting to service " + service)                              
        if wait_for_service:              
            rospy.loginfo("Waiting for service")             
            rospy.wait_for_service(service, timeout=self.timeout)            
            rospy.loginfo("Connected.")                  
            # Создание служебного прокси-сервера
            self.service_proxy = rospy.ServiceProxy(service, service_type)             
    
    def run(self):        
        try: 
            result = self.service_proxy(self.request)             
            if self.result_cb is not None:                
                self.result_cb(result)             
                return TaskStatus.SUCCESS         
        except:             
            rospy.logerr(sys.exc_info())             
            return TaskStatus.FAILURE             
    
    def reset(self):                       
        pass

Задача ServiceTask упаковывает заданную службу ROS и при необходимости выполняет определяемую пользователем функцию обратного вызова. По умолчанию задача ServiceTask просто вызывает соответствующую службу ROS и возвращает «УСПЕХ», если только сам вызов службы не завершается неудачей, и в этом случае она возвращает «СБОЙ». Если пользователь передает функцию обратного вызова, эта функция может просто выполнить некоторый произвольный код или может вернуть состояние задачи.

class SimpleActionTask(Task):     
"""     Turn a ROS action into a Task.    """     
    
    def __init__(self, name, action, action_type, goal, rate=5, connect_timeout=10, result_timeout=30, reset_after=False, active_cb=None, done_cb=None, feedback_cb=None):
        super(SimpleActionTask, self).__init__(name)              
        self.action = action         
        self.goal = goal         
        self.tick = 1.0 / rate            
        self.rate = rospy.Rate(rate) 
        self.result = None         
        self.connect_timeout = connect_timeout         
        self.result_timeout = result_timeout          
        self.reset_after = reset_after                 
        if done_cb == None:             
            done_cb = self.default_done_cb         
            self.done_cb = done_cb                 
        if active_cb == None:             
            active_cb = self.default_active_cb         
            self.active_cb = active_cb                 
        if feedback_cb == None:             
            feedback_cb = self.default_feedback_cb         
            self.feedback_cb = feedback_cb        
      
        self.action_started = False         
        self.action_finished = False         
        self.goal_status_reported = False         
        self.time_so_far = 0.0                  

        # Возвращаемые значения состояния цели         
        self.goal_states = ['PENDING', 'ACTIVE', 'PREEMPTED',  
                           'SUCCEEDED', 'ABORTED', 'REJECTED',                                      
                           'PREEMPTING', 'RECALLING', 'RECALLED', 'LOST']      
        
        rospy.loginfo("Connecting to action " + action) 

        # Подписываемся на базовый сервер действий         
        self.action_client = actionlib.SimpleActionClient(action, action_type) 

        rospy.loginfo("Waiting for move_base action server...")    
              
        # Ждем до тайм-аута, чтобы сервер действий стал доступен 
        try:   
            self.action_client.wait_for_server(rospy.Duration(self.connect_timeout))         
        except:             
            rospy.loginfo("Timed out connecting to the action server " + action)              
            rospy.loginfo("Connected to action server") 

    def run(self):         
        # Отправляем цель         
        if not self.action_started:                         
            rospy.loginfo("Sending " + str(self.name) + " goal to action server...")             
            self.action_client.send_goal(self.goal, done_cb=self.done_cb,  active_cb=self.active_cb, feedback_cb=self.feedback_cb)             
            self.action_started = True                   

            ''' Здесь мы не можем использовать метод wait_for_result (), так как он блокирует все дерево, поэтому мы разбиваем его на временные срезы длительностью 1.            ''' 
        if not self.action_finished:             
            self.time_so_far += self.tick             
            self.rate.sleep()             
            if self.time_so_far > self.result_timeout:                 
                self.action_client.cancel_goal()                 
                rospy.loginfo("Timed out achieving goal")                 
                return TaskStatus.FAILURE             
            else:                 
                return TaskStatus.RUNNING         
        else:             
            # Проверяем статус конечной цели, возвращаемый default_done_cb             
            if self.goal_status == GoalStatus.SUCCEEDED:                   
                self.action_finished = True    
                if self.reset_after:                       
                    self.reset()                   
                return TaskStatus.SUCCESS             
            elif self.goal_status == GoalStatus.ABORTED:                 
                self.action_started = False                 
                self.action_finished = False                 
                return TaskStatus.FAILURE             
            else: 
                self.action_started = False                 
                self.action_finished = False                 
                self.goal_status_reported = False                 
                return TaskStatus.RUNNING
    
    def default_done_cb(self, status, result):        
        # Проверяем финальный статус 
        self.goal_status = status         
        self.action_finished = True              
        if not self.goal_status_reported:         
            rospy.loginfo(str(self.name) + " ended with status " +  str(self.goal_states[status]))             
            self.goal_status_reported = True          
    
    def default_active_cb(self):         
        pass             
    
    def default_feedback_cb(self, msg):         
        pass               
    
    def reset(self):         
        self.action_started = False         
        self.action_finished = False         
        self.goal_status_reported = False         
        self.time_so_far = 0.0

Задача SimpleActionTask имитирует задачу SimpleActionState, определенную в SMACH. Ее основная функция заключается в том, чтобы создать программу оболочку для клиента действия ROS (simple action client) и к тому же забрать имя действия, тип действия и цель в качестве аргументов. Она также может принимать аргументы, указывающие определенные пользователем функции обратного вызова для стандартных обратных вызовов: active_cb, done_cb и feedback_cb, которые передаются клиенту ROS. В частности, задача SimpleActionTask определяет функцию по умолчанию done_cb, которая сообщает конечное состояние действия, которое затем преобразуется в соответствующее состояние задачи для использования в остальной части дерева поведения.

Мы рассмотрим задачу SimpleActionTask более подробно в контексте ряда примеров программ, к которым мы обратимся далее.

Last updated