3.10.2 Основные компоненты библиотеки pi_trees

Деревья поведения довольно легко реализовать в Python и, хотя существует несколько различных подходов, которые можно использовать, методы, используемые в пакете pi_trees, хорошо поддаются интеграции с темами, сервисами и действиями ROS. На самом деле пакет pitrees был смоделирован по образцу SMACH, так что некоторые части кода уже могли показаться знакомыми.

Основная библиотека pi_trees содержится в файле pi_trees_lib.py в каталоге pi_trees/pi_trees_lib/src. Классы ROS можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Давайте начнем с pi_trees_lib.py.

Ссылка на источник: pi_trees_lib.py

class TaskStatus():
    FAILURE = 0
    SUCCESS = 1 
    RUNNING = 2

Сначала мы определяем возможные значения состояния задачи, используя класс TaskStatus как своего рода перечисление. Оно может включать дополнительные значения состояния, такие как «ОШИБКА» или «НЕИЗВЕСТНО», но эти три должны быть достаточны для большинства приложений.

class Task(object):
    """ The base Task class """
    
    def __init__(self, name, children=None, args, *kwargs): 
        self.name = name
        self.status = None
        if children is None:
             children = [] 
             self.children = children
    
    def run(self): 
        pass
    
    def reset(self):
        for c in self.children: 
            c.reset() 
    
    def add_child(self, c): 
        self.children.append(c) 
    
    def remove_child(self, c):
        self.children.remove(c)
    
    def prepend_child(self, c):
        self.children.insert(0, c) 
    
    def insert_child(self, c, i):
        self.children.insert(i, c)
    
    def get_status(self):
        return self.status
    
    def set_status(self, s): 
        self.status = s
    
    def announce(self):
        print("Executing task" + str(self.name))
    
    # Cледующие две функции позволяют нам использовать синтаксис «with»
    def __enter__(self):     
        return self.name
    
    def __exit__(self, exc_type, exc_val, exc_tb):        
        if exc_type is not None:             
            return False         
        return True

Базовый класс задач определяет основной объект дерева поведения. Как минимум, он должен иметь имя и функцию run(), которая в целом будет не только выполнять некоторое поведение, но и возвращать его статус. Другими ключевыми функциями являются add_child() и remove_child(), которые позволяют нам удалять или добавлять подзадачи к составным задачам, таким как селекторы и последовательности (описанные ниже). Вы также можете использовать функции prepend_child() или insert_child() для добавления подзадачи с определенным приоритетом по отношению к другим задачам, уже находящимся в списке.

При создании собственных задач вы будете переопределять функцию run() с помощью кода, который выполняет действия вашей задачи. Затем он возвращает соответствующий статус задачи в зависимости от результата действия. Это станет ясно, когда мы рассмотрим пример патрульного бота позже.

Функция reset() полезна, когда мы хотим обнулить любые счетчики или другие переменные внутри для конкретной задачи и ее дочерних элементов.

class Selector(Task):
    """ Run each subtask in sequence until one succeeds, or we run out of tasks. """
    
    def __init__(self, name, args, **kwargs): 
        super(Selector, self).__init__(name, args, kwargs) 
    
    def run(self):
        for c in self.children: 
            c.status = c.run()
            if c.status != TaskStatus.FAILURE:
                return c.status
        return TaskStatus.FAILURE 

Селектор выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает статус «ВЫПОЛНЕНИЕ», то селектор также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.

class Sequence(Task):
    """ Run each subtask in sequence until one fails, 
    or we run out of tasks. """
    
    def init(self, name, *args, kwargs): 
        super(Sequence, self).__init__(name, args, *kwargs)
    
    def run(self): 
        for c in self.children: 
            c.status = c.run()
            if c.status != TaskStatus.SUCCESS: 
                return c.status 
        return TaskStatus.SUCCESS 

Последовательность выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает состояние «ВЫПОЛНЕНИЕ», то последовательность также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.

class Iterator(Task):
    """ Iterate through all child tasks ignoring failure. """
 
    def init(self, name, args, **kwargs): 
        super(Iterator, self).__init__(name, args, **kwargs) 
    
    def run(self):
        for c in self.children:
            c.status = c.run() 
            if c.status != TaskStatus.SUCCESS and c.status != TaskStatus.FAILURE: 
                return c.status 
        return TaskStatus.SUCCESS

Итератор ведет себя как последовательность, но игнорирует сбои.

class ParallelOne(Task):
    """ A parallel task runs each child task at (roughly) the same time. 
    The ParallelOne task returns success as soon as any child succeeds. """
    
    def init(self, name, args, **kwargs): 
        super(ParallelOne, self).__init__(name, args, kwargs) 
    
    def run(self):
        for c in self.children: 
            c.status = c.run() 
            if c.status == TaskStatus.SUCCESS: 
                return TaskStatus.SUCCESS 
        return TaskStatus.FAILURE

Ключевое различие между составной задачей ParallelOne и Селектором заключается в том, что задача ParallelOne выполняет все свои задачи на каждом «тике» часов, если только (или пока) одна подзадача не будет выполнена успешно. Селектор продолжает выполнять первую подзадачу до тех пор, пока эта задача либо не завершится успешно, либо не завершится неудачно, прежде чем перейти к следующей подзадаче или вообще вернуться.

class ParallelAll(Task):
    """ A parallel task runs each child task at (roughly) the same time. 
    The ParallelAll task requires all subtasks to succeed for it to succeed. """
    
    def init(self, name, *args,kwargs): 
        super(ParallelAll, self).__init__(name, args, **kwargs)
    
    def run(self): 
        nsuccess = 0 
        nchildren = len(self.children) 
        for c in self.children: 
            c.status = c.run()
            if c.status == TaskStatus.SUCCESS: 
                n_success += 1 
            if c.status == TaskStatus.FAILURE: 
                return TaskStatus.FAILURE 
        if n_success == n_children: 
            return TaskStatus.SUCCESS 
        else:
            return TaskStatus.RUNNING

Аналогично задаче ParallelOne, задача ParallelAll запускает каждую подзадачу на каждом тике часов, но продолжается до тех пор, пока все подзадачи не завершатся успешно или пока одна из них не завершится неудачей.

class Loop(Task):     
"""     Loop over one or more subtasks for the given number of iterations
use the value -1 to indicate a continual loop.   """     
    
    def __init__(self, name, announce=True, *args, **kwargs):         
        super(Loop, self).__init__(name, *args, **kwargs)   
        self.iterations = kwargs['iterations']         
        self.announce = announce         
        self.loop_count = 0         
        self.name = name        
        print("Loop iterations: " + str(self.iterations))             
    
    def run(self):                 
        while True:             
            if self.iterations != -1 and self.loop_count >= self.iterations:                 
                return TaskStatus.SUCCESS                                     
        for c in self.children:                 
            while True:                    
                c.status = c.run()                                         
                if c.status == TaskStatus.SUCCESS:                         
                    break 
                return c.status                                  
            c.reset()                              
            self.loop_count += 1                         
            if self.announce:
                print(self.name + " COMPLETED " + str(self.loop_count) + " LOOP(S)") 

Цикл просто выполняет свои дочерние задачи для заданного числа итераций. Значение -1 для параметров итераций означает "цикл навсегда". Обратите внимание, что задача цикла по-прежнему является задачей сама по себе.

class IgnoreFailure(Task):     
"""     Always return either RUNNING or SUCCESS.     """     
    
    def __init__(self, name, *args, **kwargs):         
        super(IgnoreFailure, self).__init__(name, *args, **kwargs)      
    
    def run(self):         
        for c in self.children: 
            c.status = c.run() 
            if c.status != TaskStatus.RUNNING:                 
                return TaskStatus.SUCCESS           
            else:                 
                return TaskStatus.RUNNING        
        return TaskStatus.SUCCESS

Задача «Игнорировать ошибку» (IgnoreFailure) просто превращает «НЕУДАЧА» в «УСПЕХ» для каждого из своих дочерних моделей поведения. Если состояние дочерней задачи – «ВЫПОЛНЕНИЕ», то IgnoreFailure также принимает статус «ВЫПОЛНЕНИЕ».

Last updated