23. [ROS RC์นด] ์์ฉ ์ค์ต, Object Following
๋๋ฒ์งธ ์์ฉ ์ฌ๋ก์ธ Object Following ์์ ๋ฅผ ์์ํด ๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค. ๐
๋จผ์ ์คํ ๋ชจ์ต๋ถํฐ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
jetson์ ๊ตฌ๋งคํ์ จ๋ค๋ฉด ๋ชจ๋๋ค ๊ฐ์ง๊ณ ์๋ ์ด ๋ น์ ๋ฐ์ค๋ฅผ ๋ฐ๋ผ์ ์กฐํฅํ๋ ์์ ์ ๋๋ค.
- ๋จผ์ ์คํ์์ผ๋ณด๊ธฐ
$ roscore
# camera image publisher
$ rosrun csi_camera csi_pub.py
# CV Magic, Bind Ball with Pixel Value
$ rosrun donkey_cv find_ball.py
# Blob Point to Twist
$ rosrun donkey_control chase_the_ball.py
# PWM Control node
$ rosrun donkey_control blob_chase.py
# image_view ์คํ
$ rosrun image_view image_view image:=/blob/image_blob
๋ฌธ์ ์์ด ์ ์คํ ๋์ จ๋์?? ์ ์ฒด์ ์ธ ๊ตฌ์กฐ๋ฅผ ์ดํด๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค.
rqt_graph
- csi_pub : ์นด๋ฉ๋ผ ์ด๋ฏธ์ง๋ฅผ ๊ฒ์ํด์ ์ก์ถํด์ฃผ๋ ๋ ธ๋์ด์ง์?
- blob_detector : ์ปดํจํฐ ๋น์ ์ด ์ ์ฉ๋ ๋ ธ๋๋ก, ๋ น์ ๋ฌผ์ฒด๋ฅผ ์ฐพ์๋ด๊ณ , ์ง์์ ์ผ๋ก ์ถ์ ํ๋ ๋ ธ๋
- chase_ball : ์ด์ ๋ ธ๋์์ ๋ น์ ๋ฌผ์ฒด์ ์์น๊ฐ ํฝ์ ๋จ์๋ก ์ค๊ฒ ๋ฉ๋๋ค. ์ด ํฝ์ ์ ์์น์ ๋ฐ๋ผ์ ์ ์ด ์ ํธ๋ฅผ Twist ํ์์ผ๋ก ๋ณํํฉ๋๋ค.
- image_view : ์ด์ ์๋ ์ดํด๋ณด์๋, ์ด๋ฏธ์ง ํ ํฝ์ ๋ณผ ์ ์๊ฒ ํด์ฃผ๋ ๋ ธ๋์ ๋๋ค.
- blob_chase_node : ์ค์ง์ ์ธ PWM ์ ์ด๋ฅผ ๋ด๋นํ๋ ๋ ธ๋์ ๋๋ค.
์ง๋ ํ๋ก์ ํธ ๋ถ์๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก, ์ด๋ฒ์๋ ๋ท๋ถ๋ถ์ node์์๋ถํฐ ์ฐจ๊ทผ์ฐจ๊ทผ ๋ถ์ํด ๋ณด๊ฒ ์ต๋๋ค.
์ฐธ๊ณ ๋ก ์ด๋ฒ ํ๋ก์ ํธ๋ ๋ค์์ ์คํ์์ค๋ฅผ ์ฐธ๊ณ ์ผ์ ์ ์ํ์์ต๋๋ค.
blob_chase.py
#!/usr/bin/env python
"""
referenced from those projects
DkLowLevelCtrl, ServoConvert part from tizianofiorenzani/ros_tutorials
url: https://github.com/tizianofiorenzani/ros_tutorials
PCA9685 part from donkeycar
url: https://github.com/autorope/donkeycar/blob/99c853b1737f12019ae598d3c7f00699d2166472/donkeycar/parts/actuator.py#L12
Listens to /dkcar/control/cmd_vel for corrective actions to the /cmd_vel coming from keyboard or joystick
"""
import rospy
import time
from geometry_msgs.msg import Twist
class PCA9685:
"""
PWM motor controler using PCA9685 boards.
This is used for most RC Cars
"""
def __init__(
self, channel, address=0x40, frequency=60, busnum=None, init_delay=0.1
):
self.default_freq = 60
self.pwm_scale = frequency / self.default_freq
import Adafruit_PCA9685
# Initialise the PCA9685 using the default address (0x40).
if busnum is not None:
from Adafruit_GPIO import I2C
# replace the get_bus function with our own
def get_bus():
return busnum
I2C.get_default_bus = get_bus
self.pwm = Adafruit_PCA9685.PCA9685(address=address)
self.pwm.set_pwm_freq(frequency)
self.channel = channel
time.sleep(init_delay) # "Tamiya TBLE-02" makes a little leap otherwise
self.pulse = 340
self.prev_pulse = 340
self.running = True
def set_pwm(self, pulse):
try:
self.pwm.set_pwm(self.channel, 0, int(pulse * self.pwm_scale))
except:
self.pwm.set_pwm(self.channel, 0, int(pulse * self.pwm_scale))
def run(self, pulse):
pulse_diff = pulse - self.prev_pulse
if abs(pulse_diff) > 40:
if pulse_diff > 0:
pulse += 0.7 * pulse_diff
else:
pulse -= 0.7 * pulse_diff
self.set_pwm(pulse)
self.prev_pulse = pulse
def set_pulse(self, pulse):
self.pulse = pulse
def update(self):
while self.running:
self.set_pulse(self.pulse)
class ServoConvert:
def __init__(self, id=1, center_value=370, range=90, direction=1):
self.value = 0.0
self.value_out = center_value
self._center = center_value
self._range = range
self._half_range = 0.5 * range # 45
self._dir = direction # 1 or -1
self.id = id
# --- Convert its range in [-1, 1]
self._sf = 1.0 / self._half_range # 1 / 45
def get_value_out(self, value_in):
# --- twist type value is in [-1, 1]
self.value = value_in
self.value_out = int(self._dir * value_in * self._half_range + self._center)
return self.value_out
def saturate(value, min, max):
if value <= min:
return min
elif value >= max:
return max
else:
return value
class DkLowLevelCtrl:
def __init__(self):
rospy.loginfo("Setting Up the Node...")
# --- Initialize the node
rospy.init_node("blob_chase_node")
self._throttle = PCA9685(channel=0, busnum=1)
rospy.loginfo("Throttle Controler Awaked!!")
self._steering_servo = PCA9685(channel=1, busnum=1)
rospy.loginfo("Steering Controler Awaked!!")
self.actuators = {}
self.actuators["throttle"] = ServoConvert(
id=1, center_value=350, range=80
)
self.actuators["steering"] = ServoConvert(
id=2, center_value=375, range=80, direction=1
) # -- positive left
rospy.loginfo("> Actuators corrrectly initialized")
# --- Create a debug publisher for resulting cmd_vel
self.ros_pub_debug_command = rospy.Publisher(
"/dkcar/debug/cmd_vel", Twist, queue_size=1
)
rospy.loginfo("> Publisher corrrectly initialized")
# --- Create the Subscriber to Twist commands
self.ros_sub_twist = rospy.Subscriber(
"/cmd_vel", Twist, self.update_message_from_command
)
rospy.loginfo("> Subscriber corrrectly initialized")
# --- Create the Subscriber to obstacle_avoidance commands
self.ros_sub_twist = rospy.Subscriber(
"/dkcar/control/cmd_vel", Twist, self.update_message_from_chase
)
rospy.loginfo("> Subscriber corrrectly initialized")
self.throttle_cmd = 0.0
self.throttle_chase = 1.0
self.steer_cmd = 0.0
self.steer_chase = 0.0
self._debud_command_msg = Twist()
# --- Get the last time e got a commands
self._last_time_cmd_rcv = time.time()
self._last_time_chase_rcv = time.time()
self._timeout_ctrl = 100
self._timeout_blob = 1
rospy.loginfo("Initialization complete")
def update_message_from_command(self, message):
self._last_time_cmd_rcv = time.time()
self.throttle_cmd = message.linear.x
self.steer_cmd = message.angular.z
def update_message_from_chase(self, message):
self._last_time_chase_rcv = time.time()
self.throttle_chase = message.linear.x
self.steer_chase = message.angular.z
# print(self.throttle_chase, self.steer_chase)
def compose_command_velocity(self):
self.throttle = saturate(self.throttle_cmd * self.throttle_chase, -1, 1)
# -- Add steering
self.steer = saturate(self.steer_cmd + self.steer_chase, -1, 1)
self._debud_command_msg.linear.x = self.throttle
self._debud_command_msg.angular.z = self.steer
self.ros_pub_debug_command.publish(self._debud_command_msg)
self.set_actuators_from_cmdvel(self.throttle, self.steer)
def set_actuators_from_cmdvel(self, throttle, steering):
"""
Get a message from cmd_vel, assuming a maximum input of 1
"""
# -- Convert vel into servo values
self.actuators["throttle"].get_value_out(throttle)
self.actuators["steering"].get_value_out(steering)
# rospy.loginfo("Got a command v = %2.1f s = %2.1f"%(throttle, steering))
self.set_pwm_pulse(self.actuators["throttle"].value_out, self.actuators["steering"].value_out)
print( "throttle value : " + str(self.actuators["throttle"].value_out))
print( "steering value : " + str(self.actuators["steering"].value_out))
def set_pwm_pulse(self, speed_pulse, steering_pulse):
self._throttle.run(speed_pulse)
self._steering_servo.run(steering_pulse)
def set_actuators_idle(self):
# -- Convert vel into servo values
self.throttle_cmd = 0.0
self.steer_cmd = 0.0
def reset_avoid(self):
self.throttle_chase = 1.0
self.steer_avoid = 0.0
@property
def is_controller_connected(self):
# print time.time() - self._last_time_cmd_rcv
return time.time() - self._last_time_cmd_rcv < self._timeout_ctrl
@property
def is_chase_connected(self):
return time.time() - self._last_time_chase_rcv < self._timeout_blob
def run(self):
# --- Set the control rate
rate = rospy.Rate(10)
while not rospy.is_shutdown():
self.compose_command_velocity()
if not self.is_controller_connected:
self.set_actuators_idle()
if not self.is_chase_connected:
self.reset_avoid()
rate.sleep()
if __name__ == "__main__":
dk_llc = DkLowLevelCtrl()
dk_llc.run()
๐ต ์ง๊ธ๊ป ๋ด์๋ ์์ ์ ๋นํด ์๋นํ ๊ธด ์ฝ๋์ธ๋ฐ์, ํ์ง๋ง ์ค๋ณต๋๋ ๋ถ๋ถ๋ ๋ง์ ํฌ๊ฒ ๊ฑฑ์ ํ์ง ์์ผ์ ๋ ์ข์ต๋๋ค. ์ฐจ๊ทผ์ฐจ๊ทผ ์ดํด๋ณด๊ฒ ์ต๋๋ค!!
์ด ํ๋ก์ ํธ๋ donkeycar์ youtube ํ๋ก์ ํธ์์ ํ์ ๋ถ๋ถ๋ค์ ์ทจํฉํ์ฌ ๋ง๋ค์๋ค๊ณ ๋ณด์๋ฉด ๋ฉ๋๋ค.
- import
import rospy
import time
from geometry_msgs.msg import Twist
- PCA9685 Class
class PCA9685:
"""
PWM motor controler using PCA9685 boards.
This is used for most RC Cars
"""
def __init__(
self, channel, address=0x40, frequency=60, busnum=None, init_delay=0.1
):
self.default_freq = 60
self.pwm_scale = frequency / self.default_freq
...
์ด์ ๊ฒ์ ์ปจํธ๋กค๋ฌ ์์ ์ ์์ ๋์ผํ ํด๋์ค์ ๋๋ค. ์์ธํ ์ค๋ช ์ ์ด์ ๊ฐ์๋ฅผ ์ฐธ์กฐํด์ฃผ์ธ์ ๐
- ServoConvert Class
class ServoConvert:
def __init__(self, id=1, center_value=370, range=90, direction=1):
self.value = 0.0
self.value_out = center_value
self._center = center_value
self._range = range
self._half_range = 0.5 * range # 45
self._dir = direction # 1 or -1
self.id = id
# --- Convert its range in [-1, 1]
self._sf = 1.0 / self._half_range # 1 / 45
def get_value_out(self, value_in):
# --- twist type value is in [-1, 1]
self.value = value_in
self.value_out = int(self._dir * value_in * self._half_range + self._center)
# print self.id, self.value_out
return self.value_out
Twist ํ์์ ์๋ ๊ฐ์ ๋ฐ์ PWM Pulse๋ก ๋งคํ์์ผ์ฃผ๋ ํด๋์ค์ ๋๋ค.
- ํด๋์ค ๋ฉค๋ฒ ๋ณ์๋ค์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
self.value = 0.0
self.value_out = center_value
self._center = center_value
self._range = range
self._half_range = 0.5 * range # 45
self._dir = direction # 1 or -1
self.id = id
# --- Convert its range in [-1, 1]
self._sf = 1.0 / self._half_range # 1 / 45
self.value_out
: output value
self._center
: ํด๋น ๋ชจํฐ์ ์์
self._range
: ํด๋น ๋ชจํฐ์ ์ต์ ~ ์ต๋ ๊ฐ๋๋ฒ์
self._half_range
: ๊ฐ๋๋ฒ์์ ์ ๋ฐ์ ๋๋ค.
self._dir
: ์์ ์ ๊ธฐ์ค์ผ๋ก์ ๋ฐฉํฅ, 1 ํน์ -1 ์ด ๋ฉ๋๋ค.
๋ค์์ ์์์ด ๋ฉ์ธ์ด๋ผ๊ณ ๋ณด์๋ฉด ๋ฉ๋๋ค.
-1 ~ 1 ์ฌ์ด์ value_in ๊ฐ์ด ๋ค์ด์ค๊ฒ ๋๋ฉฐ, ์๋์ ๊ฐ์ ์์์ ํตํด ์ต์ข PWM Pulse๋ฅผ ๊ณ์ฐํฉ๋๋ค.
self.value_out = int(self._dir * value_in * self._half_range + self._center)
์๋ฅผ ๋ค์ด ๋ด ์๋ค.
- dir = 1
- half_range = 45 (range = 90)
- center = 370
์์ ๊ฐ์ ์ํฉ์์ value_in์ด -1 ~ 1 ๊น์ง ๋ณํํ๋ค๊ณ ์๊ฐํ๋ฉด
- value_in = -1 โ {1 * (-1) * 45} + 370 = 325
- value_in = 1 โ {1 * (1) * 45} + 370 = 415
value_in์ 325 ~ 415์ ๊ฐ์ ๊ฐ๊ฒ ๋ฉ๋๋ค.
์ด ๊ณ์ฐ์ ์๋ก์ด Twist ๊ฐ์ด subscribe ๋ ๋๋ง๋ค ์คํ๋ฉ๋๋ค.
def set_actuators_from_cmdvel(self, throttle, steering):
"""
Get a message from cmd_vel, assuming a maximum input of 1
"""
# -- Convert vel into servo values
self.actuators["throttle"].get_value_out(throttle)
self.actuators["steering"].get_value_out(steering)
- DkLowLevelCtrl Class
def run(self):
# --- Set the control rate
rate = rospy.Rate(10)
while not rospy.is_shutdown():
self.compose_command_velocity()
# print self._last_time_cmd_rcv, self.is_controller_connected
if not self.is_controller_connected:
self.set_actuators_idle()
if not self.is_chase_connected:
self.reset_avoid()
rate.sleep()
if __name__ == "__main__":
dk_llc = DkLowLevelCtrl()
dk_llc.run()
๊ฐ์ฅ ๊ธธ๊ณ , ๋ณต์กํ class ์ธ๋ฐ์, ๋ณด์๋ค์ํผ main์ ์ดํด๋ณด๋ฉด, ์ค์ ๋ก๋ run ํจ์๊ฐ ๋์๋๋ ๊ฒ์ด ๋ณด์ด๊ธฐ์ ์ฌ๊ธฐ์๋ถํฐ ๋ถ์ํด ๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค.
์์ธ์ฒ๋ฆฌ์ ํด๋นํ๋ ๋ถ๋ถ์ ์ดํผ๊ฒ ์ต๋๋ค.
self._timeout_ctrl = 100
def set_actuators_idle(self):
# -- Convert vel into servo values
self.throttle_cmd = 0.0
self.steer_cmd = 0.0
def reset_avoid(self):
self.throttle_chase = 1.0
self.steer_avoid = 0.0
@property
def is_controller_connected(self):
# print time.time() - self._last_time_cmd_rcv
return time.time() - self._last_time_cmd_rcv < self._timeout_ctrl
@property
def is_chase_connected(self):
return time.time() - self._last_time_chase_rcv < self._timeout_blob
1. ํน์ ์๊ฐ๋์ ์ธ๋ถ์์ cmd_vel ์ ์ด ์ ํธ๊ฐ ์๋ค๋ฉด, ์ ์ด์ ๊ด๋ จ๋ ๋ฉค๋ฒ ๋ณ์๋ฅผ ์ด๊ธฐ ์ํ๋ก ๋๋๋ฆฝ๋๋ค.
2. ํน์ ์๊ฐ๋์ ์ถ์ ํด์ผ ํ ์ด๋ฏธ์ง๊ฐ ๋ณด์ด์ง ์์๋ค๋ฉด, ์งํ ๋ฐฉํฅ์ ์ ๋ฉด์ผ๋ก ์ด๊ธฐํ์ํต๋๋ค.
- compose_command_velocity
def compose_command_velocity(self):
self.throttle = saturate(self.throttle_cmd * self.throttle_chase, -1, 1)
# -- Add steering
self.steer = saturate(self.steer_cmd + self.steer_chase, -1, 1)
self._debud_command_msg.linear.x = self.throttle
self._debud_command_msg.angular.z = self.steer
self.ros_pub_debug_command.publish(self._debud_command_msg)
self.set_actuators_from_cmdvel(self.throttle, self.steer)
๐ ํ๋ํ๋์ฉ ์ดํด๋ด ์๋ค.
- saturate
self.throttle = saturate(self.throttle_cmd * self.throttle_chase, -1, 1)
self.steer = saturate(self.steer_cmd + self.steer_chase, -1, 1)
def saturate(value, min, max):
if value <= min:
return min
elif value >= max:
return max
else:
return value
์ต๋, ์ต์๊ฐ์ ๋์ง ์๋๋ก ํ๊ณ๋ฅผ ์ ํด์ฃผ๋ ํจ์์ ๋๋ค.
์๋ฅผ ๋ค๋ฉด, -1.2 โ -1 / 1.3 โ 1 ์ด๋ ๊ฒ ๋ณํ๋๊ฒ ์ง์
# --- Create a debug publisher for resulting cmd_vel
self.ros_pub_debug_command = rospy.Publisher(
"/dkcar/debug/cmd_vel", Twist, queue_size=1
)
...
self._debud_command_msg.linear.x = self.throttle
self._debud_command_msg.angular.z = self.steer
self.ros_pub_debug_command.publish(self._debud_command_msg)
๋๋ฒ๊น ํ ํฝ์ผ๋ก publish ํ๋ ๋ชจ์ต์ ๋๋ค.
- set_actuators_from_cmdvel
self.set_actuators_from_cmdvel(self.throttle, self.steer)
def set_actuators_from_cmdvel(self, throttle, steering):
"""
Get a message from cmd_vel, assuming a maximum input of 1
"""
# -- Convert vel into servo values
self.actuators["throttle"].get_value_out(throttle)
self.actuators["steering"].get_value_out(steering)
self.set_pwm_pulse(self.actuators["throttle"].value_out, self.actuators["steering"].value_out)
print( "throttle value : " + str(self.actuators["throttle"].value_out))
print( "steering value : " + str(self.actuators["steering"].value_out))
์๊น ์ดํด๋ณด์๋ ์์์ธ get_value_out
๊ฐ ์ฌ์ฉ๋๊ณ ์์ต๋๋ค!!
PWM pulse๋ฅผ ๊ณ์ฐํ์ฌ ์ต์ข
์ ์ผ๋ก set_pwm_pulse
๋ฅผ ํตํด
PCA9685 ํด๋์ค์ pulse modulation์ ํ๊ณ ์์ต๋๋ค.
- set_pwm_pulse
self._throttle = PCA9685(channel=0, busnum=1)
rospy.loginfo("Throttle Controler Awaked!!")
self._steering_servo = PCA9685(channel=1, busnum=1)
rospy.loginfo("Steering Controler Awaked!!")
def set_pwm_pulse(self, speed_pulse, steering_pulse):
self._throttle.run(speed_pulse)
self._steering_servo.run(steering_pulse)
๊ฑฐ์ ๋ค ์ดํด๋ณด์์ต๋๋ค.
๋ง์ง๋ง์ผ๋ก subscriber ์์ฑ ๋ถ๋ถ์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
# --- Create the Subscriber to Twist commands
self.ros_sub_twist = rospy.Subscriber(
"/cmd_vel", Twist, self.update_message_from_command
)
rospy.loginfo("> Subscriber corrrectly initialized")
# --- Create the Subscriber to obstacle_avoidance commands
self.ros_sub_twist = rospy.Subscriber(
"/dkcar/control/cmd_vel", Twist, self.update_message_from_chase
)
rospy.loginfo("> Subscriber corrrectly initialized")
def update_message_from_command(self, message):
self._last_time_cmd_rcv = time.time()
self.throttle_cmd = message.linear.x
self.steer_cmd = message.angular.z
def update_message_from_chase(self, message):
self._last_time_chase_rcv = time.time()
self.throttle_chase = message.linear.x
self.steer_chase = message.angular.z
์์ ๋ง์๋๋ ธ์ง๋ง, ํ ํ๋ก์ ํธ๋ ์ ์ง, ํ์ง ์ ์ด๋ ์ฌ์ฉ์๋ก ๋ถํฐ ๋ฐ๋ก ๋ฐ๊ณ ์์ง์?
์ด ๋ถ๋ถ์ ๋ํด ์ ๋ก์ฌํญ์ด ์์ผ์ค ๊ฒ์ ๋๋ค. ๊ทธ๋์, ์ด์ํ ์ผ์๋ฅผ ํตํด ์ด๋ฅผ ๋ณด์ํ๋ ๋๋ค๋ฅธ ๊ฐ์๋ฅผ ์ฒจ๋ถํ๋, ๋ค์ ๊ฐ์๊น์ง ์๋ฃํ์ ์ดํ ๋์ ํด ๋ณด์๊ธฐ๋ฅผ ์ถ์ฒ๋๋ฆฝ๋๋ค.
๋ค์ ๊ฐ์์ ์ด์ด์ ๋๋จธ์ง ๋ ธ๋๋ค๋ ๋ถ์ํด ๋ณด๊ฒ ์ต๋๋ค!!