用ros2,python编写一个发布者和订阅者节点 在终端打印出来 还有他们的连接关系 用(rqt_graph看)内容不限
时间: 2025-06-09 15:55:28 浏览: 21
<think>嗯,用户让我用ROS2和Python写一个发布者和订阅者节点,还要在终端打印信息,并且展示他们的连接关系,用rqt_graph查看。我需要先回忆一下ROS2的基本概念和结构。
首先,ROS2中的节点是基本的执行单元,发布者和订阅者是通过话题进行通信的。发布者节点发布消息到某个话题,订阅者节点订阅该话题来接收消息。用户的需求是两个节点,一个发布,一个订阅,并且用Python来编写。
接下来,我需要考虑如何用Python在ROS2中创建这两个节点。记得要导入rclpy库,以及相关的消息类型。比如,这里可能用标准消息类型String,所以需要从std_msgs.msg中导入String。
然后,发布者节点的结构大概是这样的:继承Node类,在初始化时创建发布者,定时器定期发布消息。消息内容需要包含一个计数器,这样每次发布的时候计数器递增,方便在终端看到变化。发布的消息会被打印出来,所以要用get_logger().info来输出。
订阅者节点则是创建订阅,回调函数在接收到消息时触发,同样用get_logger().info打印接收到的消息数据。
然后,用户提到连接关系用rqt_graph查看,需要确保话题名称一致,发布者和订阅者使用相同的话题,比如例子中的“my_topic”。这样在运行后,启动rqt_graph就能看到两个节点通过话题连接的结构。
另外,需要编写setup.py文件,用于注册节点,这样ros2 run的时候可以找到可执行文件。用户可能容易忘记这一步,所以需要详细说明entry_points的配置。
测试步骤方面,要指导用户如何编译、运行,以及如何查看rqt_graph。可能还需要提醒用户在不同终端中运行各个部分,比如一个终端运行发布者,一个运行订阅者,再一个运行rqt_graph。
还有可能遇到的问题,比如依赖没有安装,或者工作空间没有正确source,或者消息类型不匹配。需要提醒用户注意检查这些点。
最后,确保代码的正确性,比如定时器的周期设置是否正确,回调函数的参数是否正确处理。比如发布者的回调函数是timer_callback,而订阅者的是listener_callback,参数是msg,然后访问msg.data。
总结一下,步骤应该是:创建功能包,编写发布者代码,编写订阅者代码,配置setup.py,编译,运行节点,然后用rqt_graph查看连接。整个过程需要详细但清晰,避免用户遗漏步骤。</think>下面我将为您详细介绍如何用ROS2和Python编写发布者/订阅者节点,并展示连接关系:
---
### 1. 创建功能包
```bash
ros2 pkg create my_pubsub --build-type ament_python
cd my_pubsub/my_pubsub
```
---
### 2. 编写发布者节点 (`publisher_node.py`)
```python
# publisher_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MyPublisher(Node):
def __init__(self):
super().__init__('my_publisher')
self.publisher_ = self.create_publisher(String, 'my_topic', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.count}'
self.publisher_.publish(msg)
self.get_logger().info(f'发布: "{msg.data}"')
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = MyPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
---
### 3. 编写订阅者节点 (`subscriber_node.py`)
```python
# subscriber_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MySubscriber(Node):
def __init__(self):
super().__init__('my_subscriber')
self.subscription = self.create_subscription(
String,
'my_topic',
self.listener_callback,
10)
def listener_callback(self, msg):
self.get_logger().info(f'收到: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = MySubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
---
### 4. 修改`setup.py`
```python
entry_points={
'console_scripts': [
'publisher = my_pubsub.publisher_node:main',
'subscriber = my_pubsub.subscriber_node:main'
],
},
```
---
### 5. 编译运行
```bash
cd ../..
colcon build --packages-select my_pubsub
source install/setup.bash
# 新终端运行发布者
ros2 run my_pubsub publisher
# 新终端运行订阅者
ros2 run my_pubsub subscriber
# 新终端查看连接
rqt_graph
```
---
### 6. 连接关系示意图(rqt_graph输出)
```
[my_publisher] ---> (my_topic) ---> [my_subscriber]
```
---
### 关键点说明:
1. **话题通信**:通过`my_topic`建立连接
2. **消息类型**:使用标准`String`消息类型
3. **调试输出**:
- 发布者每秒发送递增的计数器
- 订阅者实时显示接收内容
4. **可视化**:`rqt_graph`会显示两个节点通过话题连接的拓扑结构
---
### 测试效果:
- 发布者终端输出:
```
[INFO] [my_publisher]: 发布: "Hello World: 0"
[INFO] [my_publisher]: 发布: "Hello World: 1"
```
- 订阅者终端输出:
```
[INFO] [my_subscriber]: 收到: "Hello World: 0"
[INFO] [my_subscriber]: 收到: "Hello World: 1"
```
阅读全文
相关推荐


















