ROS2自学笔记:动作

发布时间:2023-06-04 10:30

动作是ROS2中一个通信机制,一般分为发送指令,汇报进度,汇报任务完成的步骤。动作有以下特定:
1 客户端,服务器模型:双向通信机制
2 服务器端唯一,客户端不唯一
3 同步通信:因为要定期反馈信息,所以需要同步通信
4 消息接口类型.action

动作为应用层通信机制,其底层为话题和服务。发送开始和结束任务利用服务,而定期反馈利用话题

监测动作信息

ros2 action list

显示动作列表

ros2 action info (动作名称)

显示一个动作详细信息
ROS2自学笔记:动作_第1张图片
发送动作请求:

ros2 action send_goal (动作名称)(动作类型)(目标)

ROS2自学笔记:动作_第2张图片

发送动作请求并接受反馈

ros2 action send_goal (动作名称)(动作类型)(目标)–feedback

ROS2自学笔记:动作_第3张图片

示例:模拟旋转机器人
发送动作请求旋转360度,程序每旋转30度会进行一次反馈,最终显示是否旋转完成

接口文件:

bool enable	// start
---
bool finish	// finish
---
int32 state	// feedback

动作接口分为三部分:1 发送开始信息 2 接受中间反馈 3 发送结束信息
接口要在CMakeList.txt里进行声明:

rosidl_generate_interfaces(${PROJECT_NAME}
	"action/MoveCircle.action"
)

服务器端:

import time

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from learning_interface.action import MoveCircle

class MoveCircleActionServer(Node):
	def __init__(self, name):
		super().__init__(name)
		self.action_server = ActionServer(self, MoveCircle, 'move_circle', self.execute_callback)

	def execute_callback(self, goal_handle):
		self.get_logger().info('Moving circle...')
		feedback_msg = MoveCircle.Feedback()

		for i in range(0, 360, 30):
			feedback_msg.state = i
			self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
			goal_handle.publish_feedback(feedback_msg)
			time.sleep(0.5)

		goal_handle.succeed()
		result = MoveCircle.Result()
		result.finish = True
		return result

def main(args=None):
	rclpy.init(args=args)
	node = MoveCircleActionServer("action_move_server")
	rclpy.spin(node)
	node.destroy_node()
	rclpy.shutdown()

1 from rclpy.action import ActionServer
引入动作服务器类

2 from learning_interface.action import MoveCircle
引入MoveCircle动作接口

3 self.action_server = ActionServer(self, MoveCircle, ‘move_circle’, self.execute_callback)
创建动作服务器类,参数:动作接口,动作名称,回调函数

4 def execute_callback(self, goal_handle):
创建回调函数

5 feedback_msg = MoveCircle.Feedback()
实例化Feedback,MoveCircle.Feedback()对应int32 state

6 goal_handle.publish_feedback(feedback_msg)
发布feedback,这里类似于话题通信

7 goal_handle.succeed()
动作执行成功

8 result = MoveCircle.Result()
实例化Result,这里MoveCircle.Result对应接口里bool finish

9 result.finish = True
return result
返回结束信息

客户端

import rclpy
from rclpy.node	import Node
from rclpy.action import ActionClient

from learning_interface.action import MoveCircle

class MoveCircleActionClient(Node):
	def __init__(self, name):
		super().__init__(name)
		self.action_client = ActionClient(self, MoveCircle,'move_circle')

	def send_goal(self, enable):
		goal_msg = MoveCircle.Goal()
		goal_msg.enable = enable

		self.action_client.wait_for_server()
		self.send_goal_future = self.action_client.send_goal_async(goal.msg, feedback_callback = self.feedback_callback)
		self.send_goal_future.add_done_callback(self.goal_response_callback)

	def goal_response_callback(self, future):
		goal_handle = future.result()
		if not goal_handle.accepted:
			self.get_logger().info('Goal rejected')
			return
		
		self.get_logger().info('Goal accepted')

		self.get_result_future = goal_handle.get_result_async()
		self.get_result_future.add_done_callback(self.get_result_callback)

	def get_result_callback(self, future):
		result = future.result().result
		self.get_logger().info('Result: {%d}' % result.finish)
	
	def feedback_callback(self, feedback_msg):
		feedback = feedback_msg.feedback
		self.get_logger().info('Received feedback: {%d}' % feedback.state)

def main(args=None):
	rclpy.init(args=args)
	node = MoveCircleActionClient("action_move_client")
	node.send_goal(True)
	rclpy.spin(node)
	node,destroy_node()
	rclpy.shutdown()
		

1 from learning_interface.action import MoveCircle
引入接口MoveCircle

2 self.action_client = ActionClient(self, MoveCircle,‘move_circle’)
创建客户端对象,参数:动作接口,动作名称(和服务器端一致)

3 def send_goal(self, enable):
goal_msg = MoveCircle.Goal()
goal_msg.enable = enable
发送开始信息函数

4 self.action_client.wait_for_server()
等待服务器响应

5 self.send_goal_future = self.action_client.send_goal_async(goal.msg, feedback_callback = self.feedback_callback)
进行异步通信,发送goal_msg开始信息(bool enable),设置回调函数feedback_callback用于处理反馈信息

6 self.send_goal_future.add_done_callback(self.goal_response_callback)
添加回调函数goal_response_callback,用于处理开始信息

7 goal_handle = future.result()
接受动作结果

8 self.get_result_future = goal_handle.get_result_async()
进行异步通信获取动作执行结果

9 self.get_result_future.add_done_callback(self.get_result_callback)
添加回调函数,用于处理结束信息

10 def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(‘Result: {%d}’ % result.finish)
获取结束信息的回调函数

11 def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(‘Result: {%d}’ % result.finish)
获取反馈信息的回调函数

12 node.send_goal(True)
在主方法里调用发送信息的函数

程序流程:
1 客户端调用send_goal(True)
2 客户端self.action_client.send_goal_async发送服务请求
3 服务器接受请求,触发回调函数execute_callback,开始打印并发送feedback信息
4 收到feedback信息会同时触发goal_response_callback和feedback_callback。goal_response_callback打印服务请求是否成功,feedback_callback打印反馈数据
5 服务器端for循环结束后return finish,返回结束数据
6 结束数据触发客户端get_result_callback回调函数,这一函数说明任务是否完成

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号