User-defined Custom Tasks in RMF Task

注意:用户定义的自定义任务目前处于实验阶段

新的通用任务组合系统正在开发中,讨论可在此处 找到。

处理 RMF 任务时,有两个包:

rmf_task 提供 API 和基类,用于在 RMF 中定义和管理任务。任务被定义为生成阶段的对象,这些阶段是一系列有意义的步骤,可产生理想的结果。每个任务都有一个描述和一个组件,允许我们在给定初始状态的情况下模拟机器人完成任务后的状态,还有一个组件将命令实际机器人执行任务。

rmf_task_sequence 提供了 rmf_task 的开箱即用实现,其中 Task 对象由一系列阶段定义。因此,此类任务生成的阶段将与用于定义它们的阶段序列相匹配。 rmf_task_sequence 中定义的阶段反过来是事件的集合,这些事件还具有用于在事件期间模拟最终状态和命令机器人的组件。目前支持 此处 定义的事件。 然后,用户可以通过将此类阶段/事件的序列串联在一起来构建任务的任意定义。RMF 能够规划和执行此类任务。

perform_action 是一个基于序列的事件,支持执行自定义操作。 perform_action 行为的可定制性是有限的,因此实施自定义逻辑的用户无需担心如何与交通系统交互、开门或使用电梯。这也最大限度地降低了系统集成商引入错误的风险,从而扰乱交通系统或任何其他资源共享系统。当 perform_action 运行时,机器人将成为“只读”交通代理,因此其他机器人将简单地避开它

用户将在 rmf_fleet_adapter 层上工作。在此层中,API 仅限于使用 rmf_task_sequence 来执行任务。仅支持某些事件,它们的描述可以在 此处 找到。

rmf_fleet_adapter 层充当用户可以使用的 API。它仅支持对 此处 中提到的现有事件中的 perform_action 进行自定义行为。 用户只能在 perform_action 中添加自定义任务。RMF 将命令传递给舰队适配器集成的平台特定端,并正式释放对机器人的控制,直到操作完成。 要在 perform_action 中使用自定义任务,用户需要使用 API 的两个部分。

  1. FleetUpdateHandle::add_performable_action 它由两部分组成:第一部分是动作的“类别”,第二部分是“考虑”部分,将根据该部分决定是否接受该动作。

以下是一个例子:

rmf_fleet: name: "ecobot40" limits: linear: [1.2, 1.5] # velocity, acceleration angular: [0.6, 0.6] profile: # Robot profile is modelled as a circle footprint: 0.5 vicinity: 0.6 reversible: False battery_system: voltage: 24.0 capacity: 40.0 charging_current: 26.4 mechanical_system: mass: 80.0 moment_of_inertia: 20.0 friction_coefficient: 0.20 ambient_system: power: 20.0 cleaning_system: power: 760.0 recharge_threshold: 0.05 recharge_soc: 1.0 publish_fleet_state: True account_for_battery_drain: True task_capabilities: # Specify the types of RMF Tasks that robots in this fleet are capable of performing loop: True delivery: False clean: True finishing_request: "park" action_categories: ["clean", "manual_control"]
def _consider(description: dict): confirm = adpt.fleet_update_handle.Confirmation() # Currently there's no way for user to submit a robot_task_request # .json file via the rmf-web dashboard. Thus the short term solution # is to add the fleet_name info into action description. NOTE # only matching fleet_name action will get accepted if (description["category"] == "manual_control" and description["description"]["fleet_name"] != fleet_name): return confirm node.get_logger().warn( f"Accepting action: {description} ") confirm.accept() return confirm fleet_config = config_yaml['rmf_fleet'] task_capabilities_config = fleet_config['task_capabilities'] if 'action_categories' in task_capabilities_config: for cat in task_capabilities_config['action_categories']: node.get_logger().info( f"Fleet [{fleet_name}] is configured" f" to perform action of category [{cat}]") fleet_handle.add_performable_action(cat, _consider)
  1. RobotUpdateHandle::set_action_executor 在这里,您可以告诉舰队适配器如何指示您的机器人开始执行操作。此函数的回调包括:
  • category(string) 类型的操作。
  • description(JSON) 消息,其中包含有关如何执行操作的详细信息。
  • execution(object) 对象,在执行操作时,队列适配器的平台特定端必须持有该对象,理想情况下会定期更新剩余时间estimates.

在“perform_action”中使用自定义任务时,机器人不会参与交通协商。这意味着它将被允许将其轨迹报告给交通调度,从而使其他机器人能够避开它。但是,在任务完成之前,机器人将无法容纳其他机器人。

这是一个例子:

# call this when starting cleaning execution def _action_executor(self, category: str, description: dict, execution: adpt.robot_update_handle.ActionExecution): with self._lock: # only accept clean and manual_control assert(category in ["clean", "manual_control"]) self.action_category = category if (category == "clean"): attempts = 0 self.api.set_cleaning_mode(self.config['active_cleaning_config']) while True: self.node.get_logger().info(f"Requesting robot {self.name} to clean {description}") if self.api.start_clean(description["clean_task_name"], self.robot_map_name): self.check_task_completion = self.api.task_completed # will check api break if (attempts > 3): self.node.get_logger().warn( f"Failed to initiate cleaning action for robot [{self.name}]") # TODO: issue error ticket self.in_error = True # TODO: toggle error back execution.error("Failed to initiate cleaning action for robot {self.name}") execution.finished() return attempts+=1 time.sleep(1.0) elif (category == "manual_control"): self.check_task_completion = lambda:False # user can only cancel the manual_control # start Perform Action self.node.get_logger().warn(f"Robot [{self.name}] starts [{category}] action") self.start_action_time = self.adapter.now() self.on_waypoint = None self.on_lane = None self.action_execution = execution self.stubbornness = self.update_handle.unstable_be_stubborn() # robot moves slower during perform action self.vehicle_traits.linear.nominal_velocity *= 0.2
self.update_handle.set_action_executor(self._action_executor)

这些示例是以下内容的一部分 repository.