< >
Home » ROS2入门教程 » ROS2入门教程-Intra-Process通讯方式

ROS2入门教程-Intra-Process通讯方式

ROS2入门教程-Intra-Process通讯方式

说明:

  • 介绍Intra-Process的通讯方式

Background

  • ROS应用程序通常有多个单独的节点组成,这些节点的工作范围一般很小,与系统的其他部分联系不大。

  • 这促进了故障隔离、更快的开发、模块化和代码重用, 但它往往以性能为代价。

  • 在ROS1开发之后, 节点有效组合的需求变得明显, Nodelets 也得到了发展。 在ROS2中, 我们旨在通过解决一些需要重构节点的基本问题来改进 Nodelets 的设计。

  • 在这个演示中,其将重点介绍如何通过单独定义节点,将其组合在不同的进程布局中,而不改变节点的代码或限制其功能作用,从而手动组合节点。

Build the demos

  • 这些demo适用于三个主要的操作系统 (Windows、 Mac 或 Linux)。 其中在一些操作系统中需要安装 OpenCV。

  • 使用已编译的二进制包安装ROS2

  • ROS2二进制包的相关下载链接:https://github.com/ros2/ros2/releases

  • 源码安装ROS2

  • 确保安装了OpenCV同时根据源码安装ROS2的教程安装ROS2

  • 源码安装ROS2的教程链接:https://github.com/ros2/ros2/wiki/Installation

运行和理解demo

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// Node that produces messages.
struct Producer : public rclcpp::Node
{
  Producer(const std::string & name, const std::string & output)
  : Node(name, "", true)
  {
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<std_msgs::msg::Int32>(output, rmw_qos_profile_default);
    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
    // Create a timer which publishes on the output topic at ~1Hz.
    auto callback = [captured_pub]() -> void {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        static int32_t count = 0;
        std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
        msg->data = count++;
        printf(
          "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(msg);
      };
    timer_ = this->create_wall_timer(1s, callback);
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
  Consumer(const std::string & name, const std::string & input)
  : Node(name, "", true)
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::Int32>(
      input, [](std_msgs::msg::Int32::UniquePtr msg) {
      printf(
        " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
        reinterpret_cast<std::uintptr_t>(msg.get()));
    }, rmw_qos_profile_default);
  }

  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  auto producer = std::make_shared<Producer>("producer", "number");
  auto consumer = std::make_shared<Consumer>("consumer", "number");

  executor.add_node(producer);
  executor.add_node(consumer);
  executor.spin();
  return 0;
}
  • 正如主函数所示,其中有一个生产者和一个消费者节点,它们被添加到一个单线程程序中,然后调用spin

  • 如果在Producer结构中的“producer”节点的实现,可以看到其中已经创建了一个发布“number”话题的发布者和一个定期创建一个新消息且在内存中输出出它的地址和值并发布这个消息的定时器

  • 至于“Consumer”节点,可以在Consumer结构中看到它的实现,因为它只订阅“number”话题并打印它接收的消息的地址和值。

  • 期望效果是:生产者将输出地址和值,并且消费者将输出匹配的地址和价值。这表明进程内通信确实正在工作,同时避免使用不必要的副本。

  • 执行 ros2 run intra_process_demo two_node_pipeline 命令来运行demo(需要事先装好ROS2)

$ ros2 run intra_process_demo two_node_pipeline
Published message with value: 0, and address: 0x7fb02303faf0
Published message with value: 1, and address: 0x7fb020cf0520
 Received message with value: 1, and address: 0x7fb020cf0520
Published message with value: 2, and address: 0x7fb020e12900
 Received message with value: 2, and address: 0x7fb020e12900
Published message with value: 3, and address: 0x7fb020cf0520
 Received message with value: 3, and address: 0x7fb020cf0520
Published message with value: 4, and address: 0x7fb020e12900
 Received message with value: 4, and address: 0x7fb020e12900
Published message with value: 5, and address: 0x7fb02303cea0
 Received message with value: 5, and address: 0x7fb02303cea0
[...]
  • 可以看到消息每秒输出一次,因为计时器被设定为每秒发布一次消息

  • 同时可以看到第一条消息(值为0)没有相应的“Received message ...”行。This is because publish/subscribe is "best effort" and we do not have any "latching" like behavior enabled.

  • 这意味着,如果发布者在订阅建立之前发布消息,订阅将不会收到该消息。这种竞争条件可能导致前几个消息丢失。在这种情况下,由于它们每秒只能进行一次,通常只有第一条消息丢失。

  • 最后, 可以看到"Published message..."和"Received message ..."的行是相同的地址和值。 这表明所收到信息的地址与已发布的邮件地址相同, 而且不是副本。

  • 这是因为其正在发布和订阅std::unique_ptr, 它允许消息的所有权安全s地在系统中移动。 你也可以发布和订阅 const & std: : shared ptr, 但是在这种情况下不会出现零拷贝。

The cyclic pipeline demo

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
  IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
  : Node(name, "", true)
  {
    // Create a publisher on the output topic.
    pub = this->create_publisher<std_msgs::msg::Int32>(out, rmw_qos_profile_default);
    std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
    // Create a subscription on the input topic.
    sub = this->create_subscription<std_msgs::msg::Int32>(
      in, [captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
      auto pub_ptr = captured_pub.lock();
      if (!pub_ptr) {
        return;
      }
      printf(
        "Received message with value:         %d, and address: 0x%" PRIXPTR "\n", msg->data,
        reinterpret_cast<std::uintptr_t>(msg.get()));
      printf("  sleeping for 1 second...\n");
      if (!rclcpp::sleep_for(1s)) {
        return;    // Return if the sleep failed (e.g. on ctrl-c).
      }
      printf("  done.\n");
      msg->data++;    // Increment the message's data.
      printf(
        "Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
        reinterpret_cast<std::uintptr_t>(msg.get()));
      pub_ptr->publish(msg);    // Send the message along to the output topic.
    }, rmw_qos_profile_default);
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  // Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
  // The expectation is that the address of the message being passed between them never changes.
  auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
  auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
  rclcpp::sleep_for(1s);  // Wait for subscriptions to be established to avoid race conditions.
  // Publish the first message (kicking off the cycle).
  std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
  msg->data = 42;
  printf(
    "Published first message with value:  %d, and address: 0x%" PRIXPTR "\n", msg->data,
    reinterpret_cast<std::uintptr_t>(msg.get()));
  pipe1->pub->publish(msg);

  executor.add_node(pipe1);
  executor.add_node(pipe2);
  executor.spin();
  return 0;
}
  • 与之前的demo不同, 这个demo只使用一个节点, 用不同的名称和配置实例化了两次。 这个图表最终成为循环中的 pipe1 -> pipe2 -> pipe1 ...。

  • pipe1->pub->publish(msg); 行关闭进程, 从这时起, 消息在节点之间来回传递,每个节点都在自己的订阅回调中调用发布。

  • 期望的效果是节点每秒一次来回传递消息, 每次递增消息的值。 因为消息正在被发布并且订阅为一个unique_ptr, 在开始时创建的相同的消息是持续使用的。

  • 进行测试

% ros2 run intra_process_demo cyclic_pipeline
Published first message with value:  42, and address: 0x7fd2ce0a2bc0
Received message with value:         42, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value:         43, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value:         44, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value:         45, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value:         46, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value:         47, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
[...]
  • 在每次迭代中,可以看到不断增加的数字,从42开始。在整个过程重复使用相同的消息,如不改变指针地址所示,这样可以避免不必要的副本。

The image pipeline demo

  • demo需要用到OpenCV来捕获,捕捉, 注释, 和查看图片

  • 至于使用OS X 用户: 如果这些例子不能正常,或者接收到一个类似 ddsi_conn_write failed -1错误提示, 则需要增加系统 UDP 数据包的大小:

$ sudo sysctl -w net.inet.udp.recvspace=209715
$ sudo sysctl -w net.inet.udp.maxdgram=65500
  • Simple pipeline

  • 首先将有一个由三个节点组成的管道, 按照这样的顺序排列: camera_node -> watermark_node -> image_view_node

  • camera_node从您的计算机上的相机设备0读取,在图像上写入一些信息并发布它。watermark_node订阅camera_node的输出,并在发布之前添加更多信息。 最后,image_view_node订阅watermark_node的输出,将更多信息写入图像,然后用cv::imshow将其可视化。

  • 在每个节点中,正在发送的或者已经接收到的消息的地址或者两者都被写入到图像中。 watermark和 image view节点被设计成在不复制图像的情况下修改图像。

  • so the addresses imprinted on the image should all be the same as long as the nodes are in the same process and the graph remains organized in a pipeline as sketched above.

  • 执行ros2 run intra_process_demo image_pipeline_all_in_one命令来进行测试

请输入图片描述

  • 按下空格键可以暂停渲染图像, 再次按下空格键会恢复渲染。 按 qESC 退出。

  • Pipeline with two image viewers

  • 执行ros2 run intra_process_demo image_pipeline_with_two_image_view命令来进行测试

请输入图片描述

  • As you can see in the example image above, we have one image with all of the pointers the same and then another image with the same pointers as the first image for the first two entries, but the last pointer on the second image is different. To understand why this is happening consider the graph's topology:
camera_node -> watermark_node -> image_view_node -> image_view_node2
  • The link between the camera_node and the watermark_node can use the same pointer without copying because there is only one intra process subscription to which the message should be delivered. But for the link between the watermark_node and the two image view nodes the relationship is one to many, so if the image view nodes were using unique_ptr callbacks then it would be impossible to deliver the ownership of the same pointer to both. It can be, however, delivered to one of them. Which one would get the original pointer is not defined, but instead is simply the last to be delivered.

  • Note that the image view nodes are not subscribed with unique_ptr callbacks. Instead they are subscribed with const shared_ptrs. This means the system could have delivered the same shared_ptr to both callbacks. Currently the intra process system is not that intelligent and so it stores the message internally as a unique_ptr and copies it into a shared_ptr for each callback until the last one. On the last callback, regardless of the type, the ownership is transferred out of intra process storage and, in the case of the image view, the ownership is moved into a new shared_ptr and delivered. Thus, one of the image view nodes gets a copy and the other gets the original.

  • Pipeline with interprocess viewer

  • One other important thing to get right is to avoid interruption of the intra process zero-copy behavior when interprocess subscriptions are made. To test this we can run the first image pipeline demo, image_pipeline_all_in_one, and then run an instance of the stand alone image_view_node (don't forget to prefix them with ros2 run intra_process_demo in the terminal). This will look something like this:

请输入图片描述

  • It's hard to pause both images at the same time so the images may not line up, but the important thing to notice is that the image_pipeline_all_in_one image view shows the same address for each step. This means that the intra process zero-copy is preserved even when an external view is subscribed as well. You can also see that the interprocess image view has different process IDs for the first two lines of text and the process ID of the standalone image viewer in the third line of text.

参考资料

纠错,疑问,交流: 请进入讨论区点击加入Q群

获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号


标签: ros2入门教程