PerformAction 教程(Python)

本教程是 Fleet Adapter 教程的扩展,将指导您在 Fleet Adapter 中编写自定义操作。虽然 RMF 提供了一些标准任务,但我们知道不同的机器人可能配备并编程为执行不同类型的操作,例如清洁、拾取物体、远程操作等。通过支持自定义任务,用户可以预先触发 Fleet Adapter 的 config.yaml 中指定的自定义操作,并且 RMF 将放弃对机器人的控制,直到收到机器人已完成自定义操作的信号。您可以探索 在 RMF 中支持新任务 部分,以了解有关支持自定义任务的更多信息以及如何创建自己的任务 JSON 以发送到 RMF。

在本教程中,我们将参考 rmf_demos_fleet_adapter 的简化版本,以在我们的 Fleet Adapter 中实现 Clean PerformAction 功能。

1. 在舰队config.yaml中定义 PerformAction

我们需要在舰队配置中定义操作的名称,以便 RMF 在提交任务时将此操作识别为可执行,并能够将其分派给可以完成该任务的舰队。在 rmf_fleet 部分下的 config.yaml 中,我们可以为舰队提供可执行操作的列表。例如,让我们将 clean 定义为此舰队支持的操作:

rmf_fleet: actions: ["clean"]

2. 在我们的舰队适配器内应用操作执行逻辑

RMF 收到包含此操作的任务并将其分派到正确的舰队后,舰队适配器的 execute_action(~) 回调将被触发。解析到此回调的 category 对应于我们之前定义的操作名称,而 description 包含我们可能感兴趣的有关该操作的任何详细信息。

假设这是提交给 RMF 的任务 JSON:

{ "type": "dispatch_task_request", "request": { "unix_millis_earliest_start_time": start_time, "category": "clean", "description": { "zone": "clean_lobby" } } }

在我们的示例中,提供的“category”将是“clean”,而“description”将包含此任务将我们的机器人引导到哪个清洁区域,即“clean_lobby”。因此,我们需要在“execute_action(~)”中实现逻辑:

def execute_action(self, category: str, description: dict, execution): self.execution = execution if category == 'clean': self.perform_clean(description['zone']) def perform_clean(self, zone): if self.api.start_activity(self.name, 'clean', zone): self.node.get_logger().info( f'Commanding [{self.name}] to clean zone [{zone}]' ) else: self.node.get_logger().error( f'Fleet manager for [{self.name}] does not know how to ' f'clean zone [{zone}]. We will terminate the activity.' ) self.execution.finished() self.execution = None

由于我们的机器人队列可能能够执行多个自定义操作,因此我们需要进行检查以确保收到的“类别”与我们的目标机器人 API 相匹配。收到“清洁”操作后,我们可以相应地触发机器人的 API。

3. 为自定义操作实现机器人 API

这就是“RobotClientAPI.py”中的“start_activity(~)”方法发挥作用的地方。我们需要它实现对机器人的 API 调用以启动清洁活动。例如,如果机器人 API 使用 REST 调用机器人,则实现的方法可能如下所示:

def start_activity( self, robot_name: str, activity: str, label: str ): ''' Request the robot to begin a process. This is specific to the robot and the use case. For example, load/unload a cart for Deliverybot or begin cleaning a zone for a cleaning robot.''' url = ( self.prefix + f"/open-rmf/rmf_demos_fm/start_activity?robot_name={robot_name}" ) # data fields: task, map_name, destination{}, data{} data = {'activity': activity, 'label': label} try: response = requests.post(url, timeout=self.timeout, json=data) response.raise_for_status() if self.debug: print(f'Response: {response.json()}') if response.json()['success']: return True # If we get a response with success=False, then return False except HTTPError as http_err: print(f'HTTP error for {robot_name} in start_activity: {http_err}') except Exception as err: print(f'Other error {robot_name} in start_activity: {err}') return False

4. 完成操作

由于我们在“RobotAdapter”中存储了一个“self.execution”对象,因此当任何执行(导航、停止或操作)完成时,我们都会收到通知,因为更新循环会不断调用“is_command_completed”来检查其状态。

def update(self, state): activity_identifier = None if self.execution: if self.api.is_command_completed(): self.execution.finished() self.execution = None else: activity_identifier = self.execution.identifier

如果您的实现需要单独的回调来标记执行已完成,您可以创建一个新函数来进行此检查,并在操作完成时调用“self.execution.finished()”。