ROS与Python入门教程-CompressedImage类型的订阅器和发布器
ROS与Python入门教程-CompressedImage类型的订阅器和发布器
说明
- 这一节介绍订阅包含sensor_msgs::CompressedImage的主题,转换CompressedImage为numpy.ndarray,从而做检测和标记,再发布为CompressedImage类型的主题。
实现
- 在beginner_tutorials/scripts目录,新建subscriber_publisher_CompressedImage.py
$ roscd beginner_tutorials/scripts/
$ touch subscriber_publisher_CompressedImage.py
$ chmod +x subscriber_publisher_CompressedImage.py
$ rosed beginner_tutorials subscriber_publisher_CompressedImage.py
- 手工输入如下完整示例代码:(忽略注释部分)
#!/usr/bin/env python
"""OpenCV feature detectors with ros CompressedImage Topics in python.
This example subscribes to a ros topic containing sensor_msgs
CompressedImage. It converts the CompressedImage into a numpy.ndarray,
then detects and marks features in that image. It finally displays
and publishes the new image - again as CompressedImage topic.
"""
__author__ = 'Simon Haller <simon.haller at uibk.ac.at>'
__version__= '0.1'
__license__ = 'BSD'
# Python libs
import sys, time
# numpy and scipy
import numpy as np
from scipy.ndimage import filters
# OpenCV
import cv2
# Ros libraries
import roslib
import rospy
# Ros Messages
from sensor_msgs.msg import CompressedImage
# We do not use cv_bridge it does not support CompressedImage in python
# from cv_bridge import CvBridge, CvBridgeError
VERBOSE=False
class image_feature:
def __init__(self):
'''Initialize ros publisher, ros subscriber'''
# topic where we publish
self.image_pub = rospy.Publisher("/output/image_raw/compressed",
CompressedImage, queue_size = 10)
# self.bridge = CvBridge()
# subscribed Topic
self.subscriber = rospy.Subscriber("/camera/image/compressed",
CompressedImage, self.callback)
if VERBOSE :
print "subscribed to /camera/image/compressed"
def callback(self, ros_data):
'''Callback function of subscribed topic.
Here images get converted and features detected'''
if VERBOSE :
print 'received image of type: "%s"' % ros_data.format
#### direct conversion to CV2 ####
np_arr = np.fromstring(ros_data.data, np.uint8)
image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
#### Feature detectors using CV2 ####
# "","Grid","Pyramid" +
# "FAST","GFTT","HARRIS","MSER","ORB","SIFT","STAR","SURF"
method = "GridFAST"
feat_det = cv2.FeatureDetector_create(method)
time1 = time.time()
# convert np image to grayscale
featPoints = feat_det.detect(
cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY))
time2 = time.time()
if VERBOSE :
print '%s detector found: %s points in: %s sec.'%(method,
len(featPoints),time2-time1)
for featpoint in featPoints:
x,y = featpoint.pt
cv2.circle(image_np,(int(x),int(y)), 3, (0,0,255), -1)
cv2.imshow('cv_img', image_np)
cv2.waitKey(2)
#### Create CompressedIamge ####
msg = CompressedImage()
msg.header.stamp = rospy.Time.now()
msg.format = "jpeg"
msg.data = np.array(cv2.imencode('.jpg', image_np)[1]).tostring()
# Publish new image
self.image_pub.publish(msg)
#self.subscriber.unregister()
def main(args):
'''Initializes and cleanup ros node'''
ic = image_feature()
rospy.init_node('image_feature', anonymous=True)
try:
rospy.spin()
except KeyboardInterrupt:
print "Shutting down ROS Image feature detector module"
cv2.destroyAllWindows()
if __name__ == '__main__':
main(sys.argv)
代码分析:
- 代码:
# Python libs
import sys, time
# numpy and scipy
import numpy as np
from scipy.ndimage import filters
# OpenCV
import cv2
# Ros libraries
import roslib
import rospy
# Ros Messages
from sensor_msgs.msg import CompressedImage
- 分析:
- 导入所需的库,Python相关库,OpenCV相关库,ROS相关库,ROS相关消息
- Time用于测量特征检测的时间
- NumPy、SciPy和CV2用来实现处理转换,显示和特征检测
- ROS消息需要来自sensor_msgs的CompressedImage
代码:
VERBOSE = False
分析:如果设置为true,你会得到一些额外的信息打印到命令行(特征检测方法,检测点的数量,时间)
代码:
class image_feature:
def __init__(self):
...
def callback(self, ros_data):
分析:定义类,一个构造函数用于实例化,一个callback函数用于处理压缩的图片数据。
代码:
The __init__ method
分析:发布主题/output/image_raw/compressed,订阅主题并用回调函数处理/camera/image/compressed,都传递CompressedImage的图片数据。
- 代码:
np_arr = np.fromstring(ros_data.data, np.uint8)
image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
分析:转换压缩图片数据为cv2的图片数据。这里先转换成numpy数组,再转换成CV2的图片(numpy.ndarray)
代码:
#### Feature detectors using CV2 ####
# "","Grid","Pyramid" +
# "FAST","GFTT","HARRIS","MSER","ORB","SIFT","STAR","SURF"
method = "GridFAST"
feat_det = cv2.FeatureDetector_create(method)
time1 = time.time()
分析:
- 第一行选择特征检测方法
- 第二行创建特征检测
- 第三行获取时间
代码:
# convert np image to grayscale
featPoints = feat_det.detect(cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY))
time2 = time.time()
if VERBOSE :
print '%s detector found: %s featpoints in: %s sec.' %(method,
len(featPoints),time2-time1)
- 分析:
cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
转换图像成灰度图feat_det.detect(
获取特征点- 第二行,记录时间点
- 第三行,VERBOSE 为真,输出相关信息。
- 代码:
for featpoint in featPoints:
x,y = featpoint.pt
cv2.circle(image_np,(int(x),int(y)), 3, (0,0,255), -1)
cv2.imshow('cv_img', image_np)
cv2.waitKey(2)
分析:在图片上画圆标注出特征点
代码:
#### Create CompressedIamge ####
msg = CompressedImage()
msg.header.stamp = rospy.Time.now()
msg.format = "jpeg"
msg.data = np.array(cv2.imencode('.jpg', image_np)[1]).tostring()
分析:
- 创建要发布的图片信息。
- 三个变量内容:header,format,data. data是cv2的图片转换成np.array,再输出成字符串。
代码:
# Publish new image
self.image_pub.publish(msg)
- 分析:最后是发布主题
测试节点
- 启动roscore
$ roscore
- 启动usbcam摄像头,使用ros by example的程序例子启动摄像头。
$ rosrun rbx1_vision usb_cam.launch
- 启动节点
$ rosrun beginner_tutorials subscriber_publisher_CompressedImage.py
- 效果:
获取最新文章: 扫一扫右上角的二维码加入“创客智造”公众号