Деревья поведения довольно легко реализовать в Python и, хотя существует несколько различных подходов, которые можно использовать, методы, используемые в пакете pi_trees, хорошо поддаются интеграции с темами, сервисами и действиями ROS. На самом деле пакет pitrees был смоделирован по образцу SMACH, так что некоторые части кода уже могли показаться знакомыми.
Основная библиотека pi_trees содержится в файле pi_trees_lib.py в каталоге pi_trees/pi_trees_lib/src. Классы ROS можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Давайте начнем с pi_trees_lib.py.
Сначала мы определяем возможные значения состояния задачи, используя класс TaskStatus как своего рода перечисление. Оно может включать дополнительные значения состояния, такие как «ОШИБКА» или «НЕИЗВЕСТНО», но эти три должны быть достаточны для большинства приложений.
class Task(object):
""" The base Task class """
def __init__(self, name, children=None, args, *kwargs):
self.name = name
self.status = None
if children is None:
children = []
self.children = children
def run(self):
pass
def reset(self):
for c in self.children:
c.reset()
def add_child(self, c):
self.children.append(c)
def remove_child(self, c):
self.children.remove(c)
def prepend_child(self, c):
self.children.insert(0, c)
def insert_child(self, c, i):
self.children.insert(i, c)
def get_status(self):
return self.status
def set_status(self, s):
self.status = s
def announce(self):
print("Executing task" + str(self.name))
# Cледующие две функции позволяют нам использовать синтаксис «with»
def __enter__(self):
return self.name
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return False
return True
Базовый класс задач определяет основной объект дерева поведения. Как минимум, он должен иметь имя и функцию run(), которая в целом будет не только выполнять некоторое поведение, но и возвращать его статус. Другими ключевыми функциями являются add_child() и remove_child(), которые позволяют нам удалять или добавлять подзадачи к составным задачам, таким как селекторы и последовательности (описанные ниже). Вы также можете использовать функции prepend_child() или insert_child() для добавления подзадачи с определенным приоритетом по отношению к другим задачам, уже находящимся в списке.
При создании собственных задач вы будете переопределять функцию run() с помощью кода, который выполняет действия вашей задачи. Затем он возвращает соответствующий статус задачи в зависимости от результата действия. Это станет ясно, когда мы рассмотрим пример патрульного бота позже.
Функция reset() полезна, когда мы хотим обнулить любые счетчики или другие переменные внутри для конкретной задачи и ее дочерних элементов.
class Selector(Task):
""" Run each subtask in sequence until one succeeds, or we run out of tasks. """
def __init__(self, name, args, **kwargs):
super(Selector, self).__init__(name, args, kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.FAILURE:
return c.status
return TaskStatus.FAILURE
Селектор выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает статус «ВЫПОЛНЕНИЕ», то селектор также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
class Sequence(Task):
""" Run each subtask in sequence until one fails,
or we run out of tasks. """
def init(self, name, *args, kwargs):
super(Sequence, self).__init__(name, args, *kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.SUCCESS:
return c.status
return TaskStatus.SUCCESS
Последовательность выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает состояние «ВЫПОЛНЕНИЕ», то последовательность также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
class Iterator(Task):
""" Iterate through all child tasks ignoring failure. """
def init(self, name, args, **kwargs):
super(Iterator, self).__init__(name, args, **kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.SUCCESS and c.status != TaskStatus.FAILURE:
return c.status
return TaskStatus.SUCCESS
Итератор ведет себя как последовательность, но игнорирует сбои.
class ParallelOne(Task):
""" A parallel task runs each child task at (roughly) the same time.
The ParallelOne task returns success as soon as any child succeeds. """
def init(self, name, args, **kwargs):
super(ParallelOne, self).__init__(name, args, kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
return TaskStatus.SUCCESS
return TaskStatus.FAILURE
Ключевое различие между составной задачей ParallelOne и Селектором заключается в том, что задача ParallelOne выполняет все свои задачи на каждом «тике» часов, если только (или пока) одна подзадача не будет выполнена успешно. Селектор продолжает выполнять первую подзадачу до тех пор, пока эта задача либо не завершится успешно, либо не завершится неудачно, прежде чем перейти к следующей подзадаче или вообще вернуться.
class ParallelAll(Task):
""" A parallel task runs each child task at (roughly) the same time.
The ParallelAll task requires all subtasks to succeed for it to succeed. """
def init(self, name, *args,kwargs):
super(ParallelAll, self).__init__(name, args, **kwargs)
def run(self):
nsuccess = 0
nchildren = len(self.children)
for c in self.children:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
n_success += 1
if c.status == TaskStatus.FAILURE:
return TaskStatus.FAILURE
if n_success == n_children:
return TaskStatus.SUCCESS
else:
return TaskStatus.RUNNING
Аналогично задаче ParallelOne, задача ParallelAll запускает каждую подзадачу на каждом тике часов, но продолжается до тех пор, пока все подзадачи не завершатся успешно или пока одна из них не завершится неудачей.
class Loop(Task):
""" Loop over one or more subtasks for the given number of iterations
use the value -1 to indicate a continual loop. """
def __init__(self, name, announce=True, *args, **kwargs):
super(Loop, self).__init__(name, *args, **kwargs)
self.iterations = kwargs['iterations']
self.announce = announce
self.loop_count = 0
self.name = name
print("Loop iterations: " + str(self.iterations))
def run(self):
while True:
if self.iterations != -1 and self.loop_count >= self.iterations:
return TaskStatus.SUCCESS
for c in self.children:
while True:
c.status = c.run()
if c.status == TaskStatus.SUCCESS:
break
return c.status
c.reset()
self.loop_count += 1
if self.announce:
print(self.name + " COMPLETED " + str(self.loop_count) + " LOOP(S)")
Цикл просто выполняет свои дочерние задачи для заданного числа итераций. Значение -1 для параметров итераций означает "цикл навсегда". Обратите внимание, что задача цикла по-прежнему является задачей сама по себе.
class IgnoreFailure(Task):
""" Always return either RUNNING or SUCCESS. """
def __init__(self, name, *args, **kwargs):
super(IgnoreFailure, self).__init__(name, *args, **kwargs)
def run(self):
for c in self.children:
c.status = c.run()
if c.status != TaskStatus.RUNNING:
return TaskStatus.SUCCESS
else:
return TaskStatus.RUNNING
return TaskStatus.SUCCESS
Задача «Игнорировать ошибку» (IgnoreFailure) просто превращает «НЕУДАЧА» в «УСПЕХ» для каждого из своих дочерних моделей поведения. Если состояние дочерней задачи – «ВЫПОЛНЕНИЕ», то IgnoreFailure также принимает статус «ВЫПОЛНЕНИЕ».