Home » ROS2探索总结 » ROS2探索总结-12.ROS2 Python编程基础

ROS2探索总结-12.ROS2 Python编程基础

ROS2探索总结-12.ROS2 Python编程基础

说明:

前提准备:

  • 在进行操作前,完成ROS 2的安装和例程编译

一、话题通信

  • 代码编译后,在终端设置环境变量,并使用如下命令运行listener和talker:
$ ros2 run demo_nodes_py listener
$ ros2 run demo_nodes_py talker
  • 运行成功后,可以看到如下效果:

请输入图片描述

  • 以上talker和listener的具体实现,我们可以来分析下代码,分析内容请见代码中的注释:

  • talker.py

# 引用python接口
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
 
class Talker(Node):
 
   def __init__(self):
       super().__init__('talker')
       self.i = 0
 
       # 创建一个Publisher,发布名为chatter的topic,消息类型为String
       self.pub = self.create_publisher(String, 'chatter')
 
       # 设置定时器,周期调用定时器回调函数timer_callback
       timer_period = 1.0
       self.tmr = self.create_timer(timer_period, self.timer_callback)
 
   def timer_callback(self):
       # 创建String类型的消息
       msg = String()
       msg.data = 'Hello World: {0}'.format(self.i)
       self.i += 1
       self.get_logger().info('Publishing: "{0}"'.format(msg.data))
 
       # 发布消息
       self.pub.publish(msg)
 
 
def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)
 
   # 创建节点及发布器
   node = Talker()
 
   # 循环
   rclpy.spin(node)
 
   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()
 
 
if __name__ == '__main__':
   main()
  • listener.py
# 引用python接口
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
 
class Listener(Node):
 
   def __init__(self):
       super().__init__('listener')
 
       # 创建一个subscriber,订阅名为chatter的topic
       # 消息类型是String,回调函数为chatter_callback
       self.sub = self.create_subscription(String, 'chatter', self.chatter_callback)
 
   def chatter_callback(self, msg):
       # 将收到的消息数据打印出来
       self.get_logger().info('I heard: [%s]' % msg.data)
 
def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)
 
   # 创建节点及订阅器
   node = Listener()
 
   # 循环查询消息队列,收到消息后进入回调函数
   rclpy.spin(node)
 
   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()
 
if __name__ == '__main__':
   main()

二、服务通信

  • 接下来我们看下ROS另外一种核心通信机制——服务的实现,运行以下命令实现ROS 1教程当中经典的加法求和服务例程:
$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run demo_nodes_py add_two_ints_client
  • 可以看到如下效果:

请输入图片描述

  • add_two_ints_server和add_two_ints_client的具体实现,继续来分析代码:

  • add_two_ints_server.py

# 引用python接口
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
 
class AddTwoIntsServer(Node):
 
   def __init__(self):
       super().__init__('add_two_ints_server')
 
       # 创建一个service服务端,提供名为AddTwoInts的service,
       # 消息类型为AddTwoInts,回调函数为add_two_ints_callback
       self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
 
   def add_two_ints_callback(self, request, response):
       # 处理服务功能,将求和结果放入应答数据中
       response.sum = request.a + request.b
       self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
 
       return response
 
def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)
 
   # 创建服务端
   node = AddTwoIntsServer()
 
   # 循环
   rclpy.spin(node)
 
   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()
 
if __name__ == '__main__':
   main()
add_two_ints_client.py
# 引用python接口
from example_interfaces.srv import AddTwoInts
import rclpy
 
def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)
 
   # 创建节点
   node = rclpy.create_node('add_two_ints_client')
 
   # 创建客户端
   cli = node.create_client(AddTwoInts, 'add_two_ints')
 
   # 等待服务端
   while not cli.wait_for_service(timeout_sec=1.0):
       print('service not available, waiting again...')
 
   # 创建请求数据
   req = AddTwoInts.Request()
   req.a = 2
   req.b = 3
 
   # 异步请求(非阻塞)
   future = cli.call_async(req)
 
   # 等待服务端应答
   rclpy.spin_until_future_complete(node, future)
   if future.result() is not None:
       node.get_logger().info('Result of add_two_ints: %d' % future.result().sum)
   else:
       node.get_logger().error('Exception while calling service: %r' % future.exception())
 
   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()
 
if __name__ == '__main__':
   main()

三、QoS配置

  • ROS 2中的通信中间件改用DDS,而QoS是DDS中非常重要的一环,控制了各方面与底层的通讯机制,主要从时间限制、可靠性、持续性、历史记录几个方面,满足用户针对不同场景的数据应用需求。

  • 以话题通信为例,我们看下Python程序中该如何配置QoS:

  • talker_qos.py

import argparse
import sys
 
# 引用python接口
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
from rclpy.qos import QoSReliabilityPolicy
 
from std_msgs.msg import String
 
class TalkerQos(Node):
 
   def __init__(self, qos_profile):
       super().__init__('talker_qos')
       self.i = 0
 
       # 传输模式:Reliable、Best effort
       if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
           self.get_logger().info('Reliable talker')
       else:
           self.get_logger().info('Best effort talker')
 
       # 创建一个Publisher,发布名为chatter的topic,消息类型为String,设置qos参数
       self.pub = self.create_publisher(String, 'chatter', qos_profile=qos_profile)
 
       # 设置定时器,周期调用定时器回调函数timer_callback
       timer_period = 1.0
       self.tmr = self.create_timer(timer_period, self.timer_callback)
 
   def timer_callback(self):
       # 创建String类型的消息
       msg = String()
       msg.data = 'Hello World: {0}'.format(self.i)
       self.i += 1
       self.get_logger().info('Publishing: "{0}"'.format(msg.data))
 
       # 发布消息
       self.pub.publish(msg)
 
def main(argv=sys.argv[1:]):
   # 解析输入参数
   parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
   # 是否使用relibale可靠传输模式
   parser.add_argument(
       '--reliable', dest='reliable', action='store_true',
       help='set qos profile to reliable')
   parser.set_defaults(reliable=False)
 
   # 循环发布次数
   parser.add_argument(
       '-n', '--number_of_cycles', type=int, default=20,
       help='number of sending attempts')
 
   # 其他输入参数  
   parser.add_argument(
       'argv', nargs=argparse.REMAINDER,
       help='Pass arbitrary arguments to the executable')
   args = parser.parse_args(argv)
 
   # ROS节点初始化
   rclpy.init(args=args.argv)
 
   # 设置qos参数
   if args.reliable:
       custom_qos_profile = qos_profile_default
   else:
       custom_qos_profile = qos_profile_sensor_data
 
   # 创建节点及发布器
   node = TalkerQos(custom_qos_profile)
 
   # 循环
   cycle_count = 0
   while rclpy.ok() and cycle_count < args.number_of_cycles:
       rclpy.spin_once(node)
       cycle_count += 1
 
   # 销毁节点并退出
   node.destroy_node()
   rclpy.shutdown()
 
if __name__ == '__main__':
   main()
listener_qos.py
import argparse
import sys
 
# 引用python接口
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
from rclpy.qos import QoSReliabilityPolicy
 
from std_msgs.msg import String
 
class ListenerQos(Node):
 
   def __init__(self, qos_profile):
       super().__init__('listener_qos')
 
       # 传输模式:Reliable、Best effort
       if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
           self.get_logger().info('Reliable listener')
       else:
           self.get_logger().info('Best effort listener')
 
       # 创建一个subscriber,订阅名为chatter的topic
       # 消息类型是String,回调函数为chatter_callback,设置qos参数
       self.sub = self.create_subscription(
           String, 'chatter', self.chatter_callback, qos_profile=qos_profile)
 
   def chatter_callback(self, msg):
       # 将收到的消息数据打印出来
       self.get_logger().info('I heard: [%s]' % msg.data)
 
 
def main(argv=sys.argv[1:]):
   # 解析输入参数
   parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
   # 是否使用relibale可靠传输模式
   parser.add_argument(
       '--reliable', dest='reliable', action='store_true',
       help='set qos profile to reliable')
   parser.set_defaults(reliable=False)
 
   # 循环订阅次数
   parser.add_argument(
       '-n', '--number_of_cycles', type=int, default=20,
       help='max number of spin_once iterations')
 
 # 其他输入参数
   parser.add_argument(
       'argv', nargs=argparse.REMAINDER,
       help='Pass arbitrary arguments to the executable')
   args = parser.parse_args(argv)
 
   # ROS节点初始化
   rclpy.init(args=args.argv)
 
   # 设置qos参数
   if args.reliable:
       custom_qos_profile = qos_profile_default
   else:
       custom_qos_profile = qos_profile_sensor_data
 
   # 创建节点及订阅器
   node = ListenerQos(custom_qos_profile)
 
   # 循环
   cycle_count = 0
   while rclpy.ok() and cycle_count < args.number_of_cycles:
       rclpy.spin_once(node)
       cycle_count += 1
 
   # 销毁节点并退出
   node.destroy_node()
   rclpy.shutdown()
 
if __name__ == '__main__':
   main()
  • ROS 2针对四种类型的通信,提供了QoS的默认配置:

    • Default QoS settings for publishers and subscriptions
      In order to make the transition from ROS1 to ROS2, exercising a similar network behavior is desirable. By default, publishers and subscriptions are reliable in ROS2, have volatile durability, and “keep last” history.
    • Services
      In the same vein as publishers and subscriptions, services are reliable. It is especially important for services to use volatile durability, as otherwise service servers that re-start may receive outdated requests.
      Sensor data
    • For sensor data, in most cases it’s more important to receive readings in a timely fashion, rather than ensuring that all of them arrive. That is, developers want the latest samples as soon as they are captured, at the expense of maybe losing some. For that reason the sensor data profile uses best effort reliability and a smaller queue depth.
    • Parameters
      Parameters are based on services, and as such have a similar profile. The difference is that parameters use a much larger queue depth so that requests do not get lost when, for example, the parameter client is unable to reach the parameter service server.

参考:http://design.ros2.org/articles/qos.html

  • 以上例程运行时,默认使用的是Best effort模式:

请输入图片描述

  • 可以通过命令输入切换成Reliable模式:

请输入图片描述

  • 还可以设置发布或订阅的次数:

请输入图片描述

纠错,疑问,交流: 请进入讨论区点击加入Q群

获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号


标签: ros2探索总结