1. 개요
ros2 action server를 파이썬으로 작성한다. 목표값 n을 10으로 설정하면 1부터 10까지 누적합(55)을 계산한다. 중간중간 부분합을 feedback으로 보내준다.
2. workspace와 패키지 생성하기
> mkdir -p myross/src
> cd myros_ws/src
myaction과 myinterfaces 패키지를 만들어 본다.
> ros2 pkg create myaction --build-type ament_python
> ros2 pkg create myinterfaces --build-type ament_cmake
> cd ~/myros_ws
> colcon build
3. Action의 interface 정의하기
> cd ~/myros_ws/src/myinterfaces
> mkdir action
> cd action
> vi MyAction.action <== file 생성
MyAction.action 파일(대문자로 시작)을 만든다. --- 를 아래와 같이 2개 사용하여 3개의 공간을 만든다.
첫번째 상단 공간은 goal definition, 두번째 중간 공간은 result definition, 세번째 공간은 feedback공간이다.
# goal definition
int32 n_val
---
# result definition
int32 total
---
# feedback
int32 partial_sum
> cd ~/myros_ws/src/interfaces
> vi CMakeLists.txt
CMakeLists.txt 파일에 다음을 추가한다.
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} "action/MyAction.action" DEPENDENCIES builtin_interfaces)
> vi package.xml 에서 다음을 추가한다. </package> 바로 위에 추가하면 된다.
<member_of_group>rosidl_interface_packages</member_of_group>
4. action server작성
> cd ~/myros_ws/src/myaction/myaction
my_server.py를 다음과 같이 만든다. __init__.py 파일이 있는 폴더에 만든다.
from rclpy.action import ActionServer
from rclpy.node import Node
import rclpy
# 사용자 정의 action 메시지를 임포트합니다.
# 실제 구현에서는 '.action' 파일에 정의된 action 타입을 사용해야 합니다.
from myinterfaces.action import MyAction
class MyActionServer(Node):
def __init__(self):
super().__init__('my_action_server')
self._action_server = ActionServer(
self,
MyAction,
'my_action',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = MyAction.Feedback()
total = 0
for i in range(1, goal_handle.request.n_val + 1):
total += i
feedback_msg.partial_sum = total
self.get_logger().info(f'Feedback: {total}')
goal_handle.publish_feedback(feedback_msg)
rclpy.spin_once(self, timeout_sec=0.1)
goal_handle.succeed()
result = MyAction.Result()
result.total = total
return result
def main(args=None):
rclpy.init(args=args)
action_server = MyActionServer()
rclpy.spin(action_server)
rclpy.shutdown()
if __name__ == '__main__':
main()
5. package.xml에 다음을 추가한다. </licence> 테그 밑에 입력하면 된다.
> cd ~/myros_ws/src/myaction
> vi package.xml
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>rclpy</depend>
<depend>myinterfaces</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
6. setup.py 파일을 열고 파이썬 패키지 빌드 설정을 합니다
> cd ~/myros_ws/src/myaction
> vi setup.py
entry_points={
'console_scripts':[
'server = myaction.my_server:main',
'client = myaction.my_client:main',
],
},
7. 컴파일 및 실행
> cd ~/myros_ws
> colcon build
> source ./install/setup.bash
> ros2 run myaction server
새로운 터미널을 실행하고
> ros2 action list
/my_action <== 이 액션이 보이는지 확인한다.
> ros2 action info /my_action
Action: /my_action
Action clients: 0
Action servers: 1
/my_action_server
다음과 같이 컴맨드 창에서 직접 호출 할 수 있다
> ros2 action send_goal /my_action myinterfaces/action/MyAction -- "{\"n_val\": 10}"
Sending goal:
n_val: 10
Goal accepted with ID: 02e9df50a8294bda89fa3ce862b037d6
Result:
total: 55
Goal finished with status: SUCCEEDED
> cd ~/myros_ws/src/myaction/myaction
> vi my_client.py <= my_server.py와 같은 폴더에 만들고 다음과 같이 입력한다.
import rclpy
from rclpy.action import ActionClient
from myinterfaces.action import MyAction
def feedback_callback(feedback_msg):
print(f"Received feedback: {feedback_msg.feedback.partial_sum}")
def main(args=None):
rclpy.init(args=args)
node = rclpy.create_node('my_action_client_node')
action_client = ActionClient(node, MyAction, 'my_action')
goal = MyAction.Goal()
goal.n_val = 5
action_client.wait_for_server()
send_goal_future = action_client.send_goal_async(goal, feedback_callback=feedback_callback)
rclpy.spin_until_future_complete(node, send_goal_future)
goal_handle = send_goal_future.result()
if not goal_handle.accepted:
print('Goal rejected :(')
return
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(node, get_result_future)
result = get_result_future.result().result
if result is not None:
print(f"Result: {result.total}")
else:
print("Action failed to complete")
rclpy.shutdown()
if __name__ == '__main__':
main()
ros2 run fibonacci_action_py server
> cd ~/myros_ws
> colcon build
> source ./install/setup.bash
> ros2 run myaction client
Received feedback: 1
Received feedback: 3
Received feedback: 6
Received feedback: 10
Received feedback: 15
Result: 15