728x90
반응형
Message, Service interface programming
- Test the created msg interface
[pub_node]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
import rclpy
# Node class import. In Node class, there are create_publisher(), create_timer(), get_logger() and so on.
from rclpy.node import Node
from rclpy.node import QoSProfile
# msg type import
from std_msgs.msg import String
# Add custum msg interface
from my_interface_example.msg import Num
# Create class and inherit Node class
class HelloworldPublihser(Node):
# Initializer(Constructor)
def __init__(self):
# calls the Node class's constructor and make node name
super().__init__('helloworld_publisher')
qos_profile = QoSProfile(depth=10)
# make publisher
# self.helloworld_publisher = self.create_publisher(String, 'helloworld', qos_profile)
self.helloworld_publisher = self.create_publisher(Num, 'num_topic', qos_profile)
# call callback func. every 1 sec
self.timer = self.create_timer(1, self.publish_helloworld_msg)
self.count = 0
# Define callback func.
def publish_helloworld_msg(self):
# call String class and asign to msg variable
# msg = String()
msg = Num()
# Define msg data using data property which is from String class
# msg.data = 'Helloworld: {0}'.format(self.count)
msg.num = self.count
# Actually publish : push on Topic
self.helloworld_publisher.publish(msg)
# Print on terminal
# self.get_logger().info('Published message: {0}'.format(msg.data))
self.get_logger().info('Published message: {0}'.format(msg.num))
self.count += 1
def main(args=None):
# rclpy initialize
rclpy.init(args=args)
# node : instance of HelloworldPublihser class
node = HelloworldPublihser()
# play the node. class. and its mean playing callback func.
rclpy.spin(node)
# close the node
node.destroy_node()
rclpy.shutdown()
'''
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
'''
if __name__ == '__main__':
main()
|
cs |
[sub_node]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import rclpy
# import Node class
from rclpy.node import Node
from rclpy.node import QoSProfile
# import String msg tyme class
# from std_msgs.msg import String
from my_interface_example.msg import Num
# Create HelloworldSubscriber Class, and inherit Node class
class HelloworldSubscriber(Node):
def __init__(self):
# Initialize by constructor and give subscribe node name
super().__init__('Helloworld_subscriber')
qos_profile = QoSProfile(depth=10)
# Define subscribe node
self.Helloworld_subscriber = self.create_subscription(
Num,
'num_topic',
self.subscribe_topic_message,
qos_profile)
# Define callback func.
# msg is defined in helloworld_publisher.py
def subscribe_topic_message(self, msg):
# print on terminal
self.get_logger().info('Received message: {0}'.format(msg.num))
def main(args=None):
# Initailize rclpy
rclpy.init(args=args)
# Instance of HelloworldSubscriber class
node = HelloworldSubscriber()
# play the node(HelloworldSubscriber class), its mean playing callbackfunc.
rclpy.spin(node)
# close the node
node.destroy_node()
rclpy.shutdown()
'''
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
'''
if __name__ == '__main__':
main()
|
cs |
[revise package.xml]
[ros run]
- Test the created srv interface
[server_node]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
import rclpy
# import Node class
from rclpy.node import Node
# import AddTwoInts srv type class
# from example_interfaces.srv import AddTwoInts
from my_interface_example.srv import AddThreeInts
# Define class and inherit Node class to use Node class's func.s
class SumServiceAsync(Node):
# Initializer(Constructor)
def __init__(self):
# calls the Node class's constructor and make node name
super().__init__('sum_service')
# Define srv.
self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)
# Define callback func.
def add_three_ints_callback(self, request, response):
# Define variable
response.sum = request.a + request.b + request.c
self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))
return response
def main(args=None):
rclpy.init(args=args)
sum_service = SumServiceAsync()
try:
rclpy.spin(sum_service)
except KeyboardInterrupt:
sum_service.get_logger().info('This node is finished by keboard interrupt')
finally:
sum_service.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
|
cs |
[client_node]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
import sys
import rclpy
# import Node class
from rclpy.node import Node
# import srv type class
# from example_interfaces.srv import AddTwoInts
from my_interface_example.srv import AddThreeInts
# Define SumClientAsync class
class SumClientAsync(Node):
def __init__(self):
super().__init__('sum_client_async')
# Define client
self.cli = self.create_client(AddThreeInts, 'add_three_ints')
# waiting for service getting ready
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info(
'service not available, waiting again...')
# req variable declare : Request func. of AddTwoInts class
self.req = AddThreeInts.Request()
def send_request(self):
# get command line input arg for the request
self.req.a = int(sys.argv[1])
self.req.b = int(sys.argv[2])
self.req.c = int(sys.argv[3])
# make a srv request and asynchronously get the result
self.future = self.cli.call_async(self.req)
def main(args=None):
rclpy.init(args=args)
# make instance of class
sum_client = SumClientAsync()
sum_client.send_request()
while rclpy.ok():
# excute the instance
rclpy.spin_once(sum_client)
# if srv request done
if sum_client.future.done():
try:
# get result and assign to variable
response = sum_client.future.result()
except Exception as e:
sum_client.get_logger().info(
'Service call failed %r' %(e,))
else:
sum_client.get_logger().info(
'Result of add_three_ints: for %d + %d + %d = %d' % (sum_client.req.a, sum_client.req.b, sum_client.req.c, response.sum))
break
sum_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
|
cs |
[revise package.xml]
[ros run]
728x90
반응형
'기계공학부 시절의 기록 > ROS2 Study' 카테고리의 다른 글
10 week HW (3) action programming : action server node (0) | 2021.11.09 |
---|---|
10 week HM (2) : create action interface (0) | 2021.11.09 |
8week ROS homework : Simple node code explain (0) | 2021.10.25 |
ROS homework week7 (0) | 2021.10.18 |
6 week HW Parameter control using turtlesim node (0) | 2021.10.12 |
댓글0