Деревья поведения довольно легко реализовать в Python и, хотя существует несколько различных подходов, которые можно использовать, методы, используемые в пакете pi_trees, хорошо поддаются интеграции с темами, сервисами и действиями ROS. На самом деле пакет pitrees был смоделирован по образцу SMACH, так что некоторые части кода уже могли показаться знакомыми.
Основная библиотека pi_trees содержится в файле pi_trees_lib.py в каталоге pi_trees/pi_trees_lib/src. Классы ROS можно найти в файле pi_trees_ros.py в каталоге pi_trees/pi_trees_ros/src. Давайте начнем с pi_trees_lib.py.
Сначала мы определяем возможные значения состояния задачи, используя класс TaskStatus как своего рода перечисление. Оно может включать дополнительные значения состояния, такие как «ОШИБКА» или «НЕИЗВЕСТНО», но эти три должны быть достаточны для большинства приложений.
classTask(object):""" The base Task class """def__init__(self,name,children=None,args,*kwargs): self.name = name self.status =Noneif children isNone: children = [] self.children = childrendefrun(self): passdefreset(self):for c in self.children: c.reset()defadd_child(self,c): self.children.append(c)defremove_child(self,c): self.children.remove(c)defprepend_child(self,c): self.children.insert(0, c)definsert_child(self,c,i): self.children.insert(i, c)defget_status(self):return self.statusdefset_status(self,s): self.status = sdefannounce(self):print("Executing task"+str(self.name))# Cледующие две функции позволяют нам использовать синтаксис «with»def__enter__(self): return self.namedef__exit__(self,exc_type,exc_val,exc_tb): if exc_type isnotNone:returnFalsereturnTrue
Базовый класс задач определяет основной объект дерева поведения. Как минимум, он должен иметь имя и функцию run(), которая в целом будет не только выполнять некоторое поведение, но и возвращать его статус. Другими ключевыми функциями являются add_child() и remove_child(), которые позволяют нам удалять или добавлять подзадачи к составным задачам, таким как селекторы и последовательности (описанные ниже). Вы также можете использовать функции prepend_child() или insert_child() для добавления подзадачи с определенным приоритетом по отношению к другим задачам, уже находящимся в списке.
При создании собственных задач вы будете переопределять функцию run() с помощью кода, который выполняет действия вашей задачи. Затем он возвращает соответствующий статус задачи в зависимости от результата действия. Это станет ясно, когда мы рассмотрим пример патрульного бота позже.
Функция reset() полезна, когда мы хотим обнулить любые счетчики или другие переменные внутри для конкретной задачи и ее дочерних элементов.
classSelector(Task):""" Run each subtask in sequence until one succeeds, or we run out of tasks. """def__init__(self,name,args,**kwargs): super(Selector, self).__init__(name, args, kwargs)defrun(self):for c in self.children: c.status = c.run()if c.status != TaskStatus.FAILURE:return c.statusreturn TaskStatus.FAILURE
Селектор выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает статус «ВЫПОЛНЕНИЕ», то селектор также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
classSequence(Task):""" Run each subtask in sequence until one fails, or we run out of tasks. """definit(self,name,*args,kwargs): super(Sequence, self).__init__(name, args, *kwargs)defrun(self): for c in self.children: c.status = c.run()if c.status != TaskStatus.SUCCESS:return c.status return TaskStatus.SUCCESS
Последовательность выполняет каждую дочернюю задачу в порядке списка до тех пор, пока одна из них не завершится успешно или пока не закончатся подзадачи. Обратите внимание, что если дочерняя задача возвращает состояние «ВЫПОЛНЕНИЕ», то последовательность также возвращает «ВЫПОЛНЕНИЕ» до тех пор, пока дочерняя задача не будет выполнена успешно или неудачно.
classIterator(Task):""" Iterate through all child tasks ignoring failure. """definit(self,name,args,**kwargs): super(Iterator, self).__init__(name, args, **kwargs)defrun(self):for c in self.children: c.status = c.run()if c.status != TaskStatus.SUCCESS and c.status != TaskStatus.FAILURE:return c.status return TaskStatus.SUCCESS
Итератор ведет себя как последовательность, но игнорирует сбои.
classParallelOne(Task):""" A parallel task runs each child task at (roughly) the same time. The ParallelOne task returns success as soon as any child succeeds. """definit(self,name,args,**kwargs): super(ParallelOne, self).__init__(name, args, kwargs)defrun(self):for c in self.children: c.status = c.run()if c.status == TaskStatus.SUCCESS:return TaskStatus.SUCCESS return TaskStatus.FAILURE
Ключевое различие между составной задачей ParallelOne и Селектором заключается в том, что задача ParallelOne выполняет все свои задачи на каждом «тике» часов, если только (или пока) одна подзадача не будет выполнена успешно. Селектор продолжает выполнять первую подзадачу до тех пор, пока эта задача либо не завершится успешно, либо не завершится неудачно, прежде чем перейти к следующей подзадаче или вообще вернуться.
classParallelAll(Task):""" A parallel task runs each child task at (roughly) the same time. The ParallelAll task requires all subtasks to succeed for it to succeed. """definit(self,name,*args,kwargs): super(ParallelAll, self).__init__(name, args, **kwargs)defrun(self): nsuccess =0 nchildren =len(self.children)for c in self.children: c.status = c.run()if c.status == TaskStatus.SUCCESS: n_success +=1if c.status == TaskStatus.FAILURE:return TaskStatus.FAILURE if n_success == n_children:return TaskStatus.SUCCESS else:return TaskStatus.RUNNING
Аналогично задаче ParallelOne, задача ParallelAll запускает каждую подзадачу на каждом тике часов, но продолжается до тех пор, пока все подзадачи не завершатся успешно или пока одна из них не завершится неудачей.
classLoop(Task): """ Loop over one or more subtasks for the given number of iterationsuse the value -1 to indicate a continual loop. """def__init__(self,name,announce=True,*args,**kwargs): super(Loop, self).__init__(name, *args, **kwargs) self.iterations = kwargs['iterations'] self.announce = announce self.loop_count =0 self.name = name print("Loop iterations: "+str(self.iterations))defrun(self): whileTrue:if self.iterations !=-1and self.loop_count >= self.iterations:return TaskStatus.SUCCESS for c in self.children:whileTrue: c.status = c.run()if c.status == TaskStatus.SUCCESS:breakreturn c.status c.reset() self.loop_count +=1if self.announce:print(self.name +" COMPLETED "+str(self.loop_count) +" LOOP(S)")
Цикл просто выполняет свои дочерние задачи для заданного числа итераций. Значение -1 для параметров итераций означает "цикл навсегда". Обратите внимание, что задача цикла по-прежнему является задачей сама по себе.
classIgnoreFailure(Task): """ Always return either RUNNING or SUCCESS. """def__init__(self,name,*args,**kwargs): super(IgnoreFailure, self).__init__(name, *args, **kwargs)defrun(self): for c in self.children: c.status = c.run()if c.status != TaskStatus.RUNNING:return TaskStatus.SUCCESS else:return TaskStatus.RUNNING return TaskStatus.SUCCESS
Задача «Игнорировать ошибку» (IgnoreFailure) просто превращает «НЕУДАЧА» в «УСПЕХ» для каждого из своих дочерних моделей поведения. Если состояние дочерней задачи – «ВЫПОЛНЕНИЕ», то IgnoreFailure также принимает статус «ВЫПОЛНЕНИЕ».