【ROS】Actionlibで時間のかかる処理を実行
- 最初にこれまでのおさらい
- TopocのPublisherとSubscriberは、非同期のデータフロー
- Serviseは同期通信
- Actionlibは、どの司令に対する結果なのかや、実行中のフィードバックを返すことができる
- 例えば、自立移動の命令を送り、移動が終わった時に結果が失敗か成功かを即座に知ることができる
- Serviceは処理が終わるまで待つことができるが、その間は呼び出し側のプログラムはブロックとして停止する欠点
- 例えば、自立移動の命令を送り、移動が終わった時に結果が失敗か成功かを即座に知ることができる
$ roscd ros_start/ $ mkdir action $ vi action/GoUntilBumper.action
geometry_msgs/Twist target_vel int32 timeout_sec --- bool bumper_hit --- geometry_msgs/Twist current_vel
- 1つめのブロック
- 「Goal」と呼ばれる。引数のようなもので、目的地を表す。
- 2つめのブロック
- 「Result」と呼ばれる。成否を主に返す
- 3つめのブロック
- 「Feedback」と呼ばれる。途中経過として返したい情報を発行
CMakeLists.txtの編集
find_packageを下のように編集。前回からactionlib_msgsとgeometry_msgsを加えただけ。
find_package(catkin REQUIRED COMPONENTS roscpp rospy std_msgs message_generation actionlib_msgs geometry_msgs )
add_action_filesを以下のように書き換える
## Generate actions in the 'action' folder add_action_files( FILES GoUntilBumper.action )
generate_messagesにgeometry_smgsとactionlib_msgsを書き加える
## Generate added messages and services with any dependencies listed here generate_messages( DEPENDENCIES std_msgs geometry_msgs actionlib_msgs ):
$ cd ~/catkin_ws/ $ catkin_make
以下のようにmsgsファイルが表示されていれば正常
$ ls devel/share/ros_start/msg/ GoUntilBumperAction.msg GoUntilBumperFeedback.msg GoUntilBumperActionFeedback.msg GoUntilBumperGoal.msg GoUntilBumperActionGoal.msg GoUntilBumperResult.msg GoUntilBumperActionResult.msgb
Action Serverの作成
~/catkin_ws/src/ros_start/scripts$ vi bumper_action.py
#!/usr/bin/env python import rospy import actionlib from geometry_msgs.msg import Twist from kobuki_msgs.msg import BumperEvent from ros_start.msg import GoUntilBumperAction from ros_start.msg import GoUntilBumperResult from ros_start.msg import GoUntilBumperFeedback class BumperAction( object ): def __init__( self ): # make ***Pulisher*** and ***Subscriber*** self._pub = rospy.Publisher( '/mobile_base/commands/velocity', Twist, queue_size = 10 ) self._sub = rospy.Subscriber( '/mobile_base/events/bumper', BumperEvent, self.bumper_callback, queue_size = 1 ) # Get maximum speed from parameter( defalut: 0.5) self._max_vel = rospy.get_param( '~max_vel', 0.5 ) # make Action's server. ## action's name: bumper_action ## type: GoUntilBumperAction ## exuecute_cb: Server to actually run ## auto_start: auto start ( false ) self._action_server = actionlib.SimpleActionServer( 'bumper_action', GoUntilBumperAction, execute_cb = self.go_until_bumper, auto_start = False ) # Initialize variables to determine if the bumper is hit self._hit_bumper = False # start self._action_server.start() def bumper_callback( self, bumper ): # when the bumper hits self._hit_bumper = True def go_until_bumper( self, goal ): # Action's server print( goal.target_vel ) r = rospy.Rate( 10.0 ) # 10kHz zero_vel = Twist() # 1 second in 10 loops for i in range( 10 * goal.timeout_sec ): # timeout_sec is input by commands if self._action_server.is_preempt_requested(): # stop intruction self._action_server.set_preempted() # Send that action was blocked by user request break if self._hit_bumper: self._pub.publish( zero_vel ) break else: if goal.target_vel.linear.x > self._max_vel: goal.target_vel.linear.x = self._max_vel self._pub.publish( goal.target_vel ) # Publish feedback = GoUntilBumperFeedback( current_vel = goal.target_vel ) self._action_server.publish_feedback( feedback ) # Pass an instance of GoUntilBumperFeedback class to publish_feedback() r.sleep() # Call set_succeeded() by inserting the result of bumper_hit into an instance of class GoUntilBumperResult result = GoUntilBumperResult( bumper_hit = self._hit_bumper ) self._action_server.set_succeeded( result ) if __name__ == '__main__': rospy.init_node( 'bumper_action' ) bumper_action = BumperAction() rospy.spin()
$ roscore
$ chmod 755 bumper_action.py $ rosrun ros_start bumper_action.py
$ roslaunch kobuki_gazebo kobuki_playground.launch
$ rosrun actionlib axclient.py /bumper_action
ここで、linearのxを0.8、timeout_secを10に書き換え、SEND GOALをクリックする。
ブロックに衝突して止まる
このとき、GUIのClientの画面はこのようになっているはず。bumper_hitがTrueになっているこを確認。
Action Clientの作成
~/catkin_ws/src/ros_start/scripts$ vi bumper_client.py
#!/usr/bin/env python import rospy import actionlib from ros_start.msg import GoUntilBumperAction from ros_start.msg import GoUntilBumperGoal def go_until_bumper(): # make Action's client # name: bumper_action # type: GoUntilBumperAction action_client = actionlib.SimpleActionClient( 'bumper_action', GoUntilBumperAction ) action_client.wait_for_server() # Wait until the server is ready # Set GoUntilBumperGaol's instance goal = GoUntilBumperGoal() goal.target_vel.linear.x = 0.8 goal.timeout_sec = 10 action_client.send_goal( goal ) # Send data ( Publish to topic bumper_action/goal ) action_client.wait_for_result() # wait for result result = action_client.get_result() if result.bumper_hit: rospy.loginfo( 'bumper hit!' ) else: rospu.loginfo( 'faild!' ) if __name__ == '__main__': try: rospy.init_node( 'bumper_client' ) go_until_bumper() except rospy.ROSInterruptException: pass
$ roslaunch kobuki_gazebo kobuki_playground.launch
$ rosrun ros_start bumper_action.py
$ chmod 755 bumper_client.py $ rosrun ros_start bumper_client.py [INFO] [1557812920.030237, 79.450000]: bumper hit!
やっていることはGUIを使った時と変わらないので、動作は同じ。