Деревья поведения довольно легко реализовать в Python и, хотя существует несколько различных подходов, которые можно использовать, методы, используемые в пакете pi_trees, хорошо поддаются интеграции с темами, сервисами и действиями ROS. На самом деле пакет pitrees был смоделирован по образцу SMACH, так что некоторые части кода уже могли показаться знакомыми.
Основная библиотека pi_trees содержится в файле pi_trees_lib.py в каталоге pi_trees/pi_trees_lib/src. Классы ROS можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Давайте начнем с pi_trees_lib.py.
Ссылка на источник: pi_trees_lib.py
classTaskStatus():FAILURE=0SUCCESS=1RUNNING=2
Сначала мы определяем возможные значения состояния задачи, используя класс TaskStatus как своего рода перечисление. Оно может включать дополнительные значения состояния, такие как «ОШИБКА» или «НЕИЗВЕСТНО», но эти три должны быть достаточны для большинства приложений.
classTask(object):""" The base Task class """def__init__(self,name,children=None,args,*kwargs):self.name = nameself.status =Noneif children isNone: children =[]self.children = childrendefrun(self):passdefreset(self):for c inself.children: c.reset()defadd_child(self,c):self.children.append(c)defremove_child(self,c):self.children.remove(c)defprepend_child(self,c):self.children.insert(0, c)definsert_child(self,c,i):self.children.insert(i, c)defget_status(self):returnself.statusdefset_status(self,s):self.status = sdefannounce(self):print("Executing task"+str(self.name))# Cледующие две функции позволяют нам использовать синтаксис «with»def__enter__(self):returnself.namedef__exit__(self,exc_type,exc_val,exc_tb):if exc_type isnotNone:returnFalsereturnTrue
Базовый класс задач определяет основной объект дерева поведения. Как минимум, он должен иметь имя и функцию run(), которая в целом будет не только выполнять некоторое поведение, но и возвращать его статус. Другими ключевыми функциями являются add_child() и remove_child(), которые позволяют нам удалять или добавлять подзадачи к составным задачам, таким как селекторы и последовательности (описанные ниже). Вы также можете использовать функции prepend_child() или insert_child() для добавления подзадачи с определенным приоритетом по отношению к другим задачам, уже находящимся в списке.
При создании собственных задач вы будете переопределять функцию run() с помощью кода, который выполняет действия вашей задачи. Затем он возвращает соответствующий статус задачи в зависимости от результата действия. Это станет ясно, когда мы рассмотрим пример патрульного бота позже.
Функция reset() полезна, когда мы хотим обнулить любые счетчики или другие переменные внутри для конкретной задачи и ее дочерних элементов.
Селектор выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает статус «ВЫПОЛНЕНИЕ», то селектор также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
Последовательность выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает состояние «ВЫПОЛНЕНИЕ», то последовательность также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
Итератор ведет себя как последовательность, но игнорирует сбои.
Ключевое различие между составной задачей ParallelOne и Селектором заключается в том, что задача ParallelOne выполняет все свои задачи на каждом «тике» часов, если только (или пока) одна подзадача не будет выполнена успешно. Селектор продолжает выполнять первую подзадачу до тех пор, пока эта задача либо не завершится успешно, либо не завершится неудачно, прежде чем перейти к следующей подзадаче или вообще вернуться.
Аналогично задаче ParallelOne, задача ParallelAll запускает каждую подзадачу на каждом тике часов, но продолжается до тех пор, пока все подзадачи не завершатся успешно или пока одна из них не завершится неудачей.
Цикл просто выполняет свои дочерние задачи для заданного числа итераций. Значение -1 для параметров итераций означает "цикл навсегда". Обратите внимание, что задача цикла по-прежнему является задачей сама по себе.
Задача «Игнорировать ошибку» (IgnoreFailure) просто превращает «НЕУДАЧА» в «УСПЕХ» для каждого из своих дочерних моделей поведения. Если состояние дочерней задачи – «ВЫПОЛНЕНИЕ», то IgnoreFailure также принимает статус «ВЫПОЛНЕНИЕ».
class Selector(Task):
""" Run each subtask in sequence until one succeeds, or we run out of tasks. """
def __init__(self, name, args, **kwargs):
super(Selector, self).__init__(name, args, kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.FAILURE:
return c.status
return TaskStatus.FAILURE
class Sequence(Task):
""" Run each subtask in sequence until one fails,
or we run out of tasks. """
def init(self, name, *args, kwargs):
super(Sequence, self).__init__(name, args, *kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.SUCCESS:
return c.status
return TaskStatus.SUCCESS
class Iterator(Task):
""" Iterate through all child tasks ignoring failure. """
def init(self, name, args, **kwargs):
super(Iterator, self).__init__(name, args, **kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.SUCCESS and c.status != TaskStatus.FAILURE:
return c.status
return TaskStatus.SUCCESS
class ParallelOne(Task):
""" A parallel task runs each child task at (roughly) the same time.
The ParallelOne task returns success as soon as any child succeeds. """
def init(self, name, args, **kwargs):
super(ParallelOne, self).__init__(name, args, kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
return TaskStatus.SUCCESS
return TaskStatus.FAILURE
class ParallelAll(Task):
""" A parallel task runs each child task at (roughly) the same time.
The ParallelAll task requires all subtasks to succeed for it to succeed. """
def init(self, name, *args,kwargs):
super(ParallelAll, self).__init__(name, args, **kwargs)
def run(self):
nsuccess = 0
nchildren = len(self.children)
for c in self.children:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
n_success += 1
if c.status == TaskStatus.FAILURE:
return TaskStatus.FAILURE
if n_success == n_children:
return TaskStatus.SUCCESS
else:
return TaskStatus.RUNNING
class Loop(Task):
""" Loop over one or more subtasks for the given number of iterations
use the value -1 to indicate a continual loop. """
def __init__(self, name, announce=True, *args, **kwargs):
super(Loop, self).__init__(name, *args, **kwargs)
self.iterations = kwargs['iterations']
self.announce = announce
self.loop_count = 0
self.name = name
print("Loop iterations: " + str(self.iterations))
def run(self):
while True:
if self.iterations != -1 and self.loop_count >= self.iterations:
return TaskStatus.SUCCESS
for c in self.children:
while True:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
break
return c.status
c.reset()
self.loop_count += 1
if self.announce:
print(self.name + " COMPLETED " + str(self.loop_count) + " LOOP(S)")
class IgnoreFailure(Task):
""" Always return either RUNNING or SUCCESS. """
def __init__(self, name, *args, **kwargs):
super(IgnoreFailure, self).__init__(name, *args, **kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.RUNNING:
return TaskStatus.SUCCESS
else:
return TaskStatus.RUNNING
return TaskStatus.SUCCESS