๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

์นดํ…Œ๊ณ ๋ฆฌ ์—†์Œ

24. [ROS RC์นด] ์‘์šฉ ์‹ค์Šต, Object Following - 2

๐Ÿ“—

24. [ROS RC์นด] ์‘์šฉ ์‹ค์Šต, Object Following - 2

๊ณ„์†ํ•ด์„œ ๋‹ค๋ฅธ node๋“ค๋„ ๋ถ„์„ํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

๊ทธ ์ „์—! ๋ณต์Šต์„ ํ•˜๊ณ  ๋„˜์–ด๊ฐ€๊ฒ ์Šต๋‹ˆ๋‹ค ๐Ÿ˜Ž

rqt_graph

  • csi_pub : ์นด๋ฉ”๋ผ ์ด๋ฏธ์ง€๋ฅผ ๊ณ„์†ํ•ด์„œ ์†ก์ถœํ•ด์ฃผ๋Š” ๋…ธ๋“œ์ด์ง€์š”?
  • blob_detector : ์ปดํ“จํ„ฐ ๋น„์ „์ด ์ ์šฉ๋œ ๋…ธ๋“œ๋กœ, ๋…น์ƒ‰ ๋ฌผ์ฒด๋ฅผ ์ฐพ์•„๋‚ด๊ณ , ์ง€์†์ ์œผ๋กœ ์ถ”์ ํ•˜๋Š” ๋…ธ๋“œ
  • chase_ball : ์ด์ „ ๋…ธ๋“œ์—์„œ ๋…น์ƒ‰ ๋ฌผ์ฒด์˜ ์œ„์น˜๊ฐ€ ํ”ฝ์…€ ๋น„์œจ ๋‹จ์œ„๋กœ ์˜ค๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ์ด ํ”ฝ์…€์˜ ์œ„์น˜์— ๋”ฐ๋ผ์„œ ์ œ์–ด ์‹ ํ˜ธ๋ฅผ Twist ํ˜•์‹์œผ๋กœ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
  • image_view : ์ด์ „์—๋„ ์‚ดํŽด๋ณด์•˜๋˜, ์ด๋ฏธ์ง€ ํ† ํ”ฝ์„ ๋ณผ ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ฃผ๋Š” ๋…ธ๋“œ์ž…๋‹ˆ๋‹ค.
  • blob_chase_node : ์‹ค์งˆ์ ์ธ PWM ์ œ์–ด๋ฅผ ๋‹ด๋‹นํ•˜๋Š” ๋…ธ๋“œ์ž…๋‹ˆ๋‹ค.

chase_the_ball.py


#!/usr/bin/python
"""
Gets the position of the blob and it commands to steer the wheels

referenced from tizianofiorenzani/ros_tutorials
url: https://github.com/tizianofiorenzani/ros_tutorials

Subscribes to 
    /blob/point_blob
    
Publishes commands to 
    /dkcar/control/cmd_vel    

"""
import math, time
import rospy
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Point

K_LAT_DIST_TO_STEER = 2.0


def saturate(value, min, max):
    if value <= min:
        return min
    elif value >= max:
        return max
    else:
        return value


class ChaseBall:
    def __init__(self):

        self.blob_x = 0.0
        self.blob_y = 0.0
        self._time_detected = 0.0

        self.sub_center = rospy.Subscriber("/blob/point_blob", Point, self.update_ball)
        rospy.loginfo("Subscribers set")

        self.pub_twist = rospy.Publisher("/dkcar/control/cmd_vel", Twist, queue_size=5)
        rospy.loginfo("Publisher set")

        self._message = Twist()

        self._time_steer = 0
        self._steer_sign_prev = 0

    @property
    def is_detected(self):
        return time.time() - self._time_detected < 1.0

    def update_ball(self, message):
        self.blob_x = message.x
        self.blob_y = message.y
        self._time_detected = time.time()
        # rospy.loginfo("Ball detected: %.1f  %.1f "%(self.blob_x, self.blob_y))

    def get_control_action(self):
        """
        Based on the current ranges, calculate the command

        Steer will be added to the commanded throttle
        throttle will be multiplied by the commanded throttle
        """
        steer_action = 0.0
        throttle_action = 0.0

        if self.is_detected:
            # --- Apply steering, proportional to how close is the object
            steer_action = -K_LAT_DIST_TO_STEER * self.blob_x
            steer_action = saturate(steer_action, -1.5, 1.5)
            rospy.loginfo("Steering command %.2f" % steer_action)
            throttle_action = 1.0

        return (steer_action, throttle_action)

    def run(self):

        # --- Set the control rate
        rate = rospy.Rate(5)

        while not rospy.is_shutdown():
            # -- Get the control action
            steer_action, throttle_action = self.get_control_action()

            rospy.loginfo("Steering = %3.1f" % (steer_action))

            # -- update the message
            self._message.linear.x = throttle_action
            self._message.angular.z = steer_action

            # -- publish it
            self.pub_twist.publish(self._message)

            rate.sleep()


if __name__ == "__main__":

    rospy.init_node("chase_ball")

    chase_ball = ChaseBall()
    chase_ball.run()

์ฃผ์„์—๋„ ๋‚˜์™€ ์žˆ๋“ฏ์ด ์ด node๋Š” โ‡’ /blob/point_blob๋ฅผ Subscribes ๋ฐ›์•„์„œ

โ‡’ Twist ํ˜•์‹์œผ๋กœ ๋ณ€ํ™˜ํ•œ ํ›„,

โ‡’ /dkcar/control/cmd_vel์—๊ฒŒ Publish ํ•ฉ๋‹ˆ๋‹ค.

  • import
import math, time
import rospy
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Point

  • /blob/point_blob ๋Š” geometry_msgs/Pointํ˜•์‹์„ ๊ฐ–๋Š”๋ฐ์š”, ์–ด๋–ป๊ฒŒ ์ƒ๊ฒผ์„๊นŒ์š”?
$ rosmsg show geometry_msgs/Point
float64 x
float64 y
float64 z

๋“ค์–ด์˜ค๊ฒŒ ๋˜๋Š” x,y,z ๊ฐ€ ์–ด๋– ํ•œ ์˜๋ฏธ๋ฅผ ๊ฐ–๋Š”์ง€๋Š” ๊ทธ ์•ž๋‹จ์ธ find_ball์—์„œ ์˜๋ฏธ๋ฅผ ๊ฐ–์Šต๋‹ˆ๋‹ค.

์ง€๊ธˆ์€ ์šฐ์„  ์ „์ฒด์ ์ธ ํ๋ฆ„์„ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

  • main & run
    def run(self):

        # --- Set the control rate
        rate = rospy.Rate(5)

        while not rospy.is_shutdown():
            # -- Get the control action
            steer_action, throttle_action = self.get_control_action()

            rospy.loginfo("Steering = %3.1f" % (steer_action))

            # -- update the message
            self._message.linear.x = throttle_action
            self._message.angular.z = steer_action

            # -- publish it
            self.pub_twist.publish(self._message)

            rate.sleep()


if __name__ == "__main__":

    rospy.init_node("chase_ball")

    chase_ball = ChaseBall()
    chase_ball.run()

run์˜ ํ•˜๋‹จ๋ถ€๋Š” msg๋ฅผ ์ฑ„์šฐ๊ณ , publish๋ฅผ ํ•˜๋Š” ๋ถ€๋ถ„์ด๊ธฐ์— topic ๊ฐ•์˜๋ฅผ ์ˆ˜๊ฐ•ํ•˜์…จ๋‹ค๋ฉด ๋ชจ๋‘ ์ดํ•ดํ•˜์‹ค ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์šฐ๋ฆฌ๋Š” ์ƒ๋‹จ์˜ get_control_action ํ•จ์ˆ˜๋ฅผ ์ค‘์ ์ ์œผ๋กœ ์‚ดํ”ผ๊ฒ ์Šต๋‹ˆ๋‹ค.

  • get_control_action
    def get_control_action(self):
        """
        Based on the current ranges, calculate the command

        Steer will be added to the commanded throttle
        throttle will be multiplied by the commanded throttle
        """
        steer_action = 0.0
        throttle_action = 0.0

        if self.is_detected:
            # --- Apply steering, proportional to how close is the object
            steer_action = -K_LAT_DIST_TO_STEER * self.blob_x
            steer_action = saturate(steer_action, -1.5, 1.5)
            rospy.loginfo("Steering command %.2f" % steer_action)
            throttle_action = 1.0

        return (steer_action, throttle_action)

K_LAT_DIST_TO_STEER ๊ฐ€ ๋ฌด์—‡์ผ๊นŒ์š”? ์˜ˆ๋ฅผ ๋“ค์–ด ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

๊ฒฐ๋ก ๋ถ€ํ„ฐ ๋ง์”€๋“œ๋ฆฌ์ž๋ฉด, blob_x๋Š” -1 ~ 1์„ ๋„˜์„ ์ˆ˜ ์—†๋Š” ๊ฐ’์ž…๋‹ˆ๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด, K_LAT_DIST_TO_STEER = 2.0์ธ ํ˜„ ์ƒํ™ฉ์—์„œ

โ‡’ steer_action๋Š” -2.0 ~ 2.0์˜ ๊ฐ’์„ ๊ฐ€์ง„๋‹ค๊ณ  ํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ์š”, ์ด๊ฒƒ์„ saturate์‹œํ‚ค๋ฏ€๋กœ, ๋‹ค์‹œ

โ‡’ -1.5 ~ 1.5์˜ ๊ฐ’์„ ๊ฐ–๋Š” steer_action์„ ์–ป๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

            # -- update the message
            self._message.linear.x = throttle_action
            self._message.angular.z = steer_action

            # -- publish it
            self.pub_twist.publish(self._message)

์ด steer_action ๊ฐ’์„ Twist์— ๋‹ด์•„ ์ตœ์ข…์ ์œผ๋กœ publishํ•˜๊ฒŒ ๋˜๋ฉฐ,

๊ทธ๋ ‡๋‹ค๋ฉด K_LAT_DIST_TO_STEER๋ž€,

์–ผ๋งˆ๋‚˜ ๋ฏผ๊ฐํ•˜๊ฒŒ ์ ฏ์Šจ ๋ฐ•์Šค์— ๋ฐ˜์‘ํ•  ๊ฒƒ์ธ์ง€๋ฅผ ๊ฒฐ์ •์ง“๋Š” ์ƒ์ˆ˜๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค!
๐Ÿ’ก
K_LAT_DIST_TO_STEER ๊ฐ€ ํฌ๋‹ค๋ฉด, ์ ฏ์Šจ ๋ฐ•์Šค๊ฐ€ ์กฐ๊ธˆ๋งŒ ์ค‘์‹ฌ์—์„œ ์˜ค๋ฅธ์ชฝ์œผ๋กœ ์ด๋™ํ•ด๋„ ๋ฐ”๋กœ ๋ฐ”ํ€ด๊ฐ€ ์˜ค๋ฅธ์ชฝ์œผ๋กœ ํ™• ๊บพ์ด๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

์•„์ง๊นŒ์ง„ ํฌ๊ฒŒ ์™€๋‹ฟ์ง€ ์•Š์œผ์‹ค ๊ฒƒ ๊ฐ™์€๋ฐ์š”, ์„œ๋‘˜๋Ÿฌ ๋งˆ์ง€๋ง‰ find_ball๋กœ ๋„˜์–ด๊ฐ€ ๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

find_ball


#!/usr/bin/env python

"""
ON THE RASPI: roslaunch raspicam_node camerav2_320x240.launch enable_raw:=true

   0------------------> x (cols) Image Frame
   |
   |        c    Camera frame
   |         o---> x
   |         |
   |         V y
   |
   V y (rows)


referenced from tizianofiorenzani/ros_tutorials
url: https://github.com/tizianofiorenzani/ros_tutorials

SUBSCRIBES TO:
    /raspicam_node/image: Source image topic
    
PUBLISHES TO:
    /blob/image_blob : image with detected blob and search window
    /blob/image_mask : masking    
    /blob/point_blob : blob position in adimensional values wrt. camera frame

"""


# --- Allow relative importing
if __name__ == "__main__" and __package__ is None:
    from os import sys, path

    sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))

import sys
import rospy
import cv2
import time

from std_msgs.msg import String
from sensor_msgs.msg import Image
from geometry_msgs.msg import Point
from cv_bridge import CvBridge, CvBridgeError
from include.blob_detector import *


class BlobDetector:
    def __init__(
        self, thr_min, thr_max, blur=15, blob_params=None, detection_window=None
    ):

        self.set_threshold(thr_min, thr_max)
        self.set_blur(blur)
        self.set_blob_params(blob_params)
        self.detection_window = detection_window

        self._t0 = time.time()

        self.blob_point = Point()

        print(">> Publishing image to topic image_blob")
        self.image_pub = rospy.Publisher("/blob/image_blob", Image, queue_size=1)
        self.mask_pub = rospy.Publisher("/blob/image_mask", Image, queue_size=1)
        print(">> Publishing position to topic point_blob")
        self.blob_pub = rospy.Publisher("/blob/point_blob", Point, queue_size=1)

        self.bridge = CvBridge()
        self.image_sub = rospy.Subscriber("/csi_image", Image, self.callback)
        print("<< Subscribed to topic /csi_image")

(...)

def main(args):

    pink_min = (135, 41, 95)
    pink_max = (255, 196, 255)

    green_min = (39, 81, 71)
    green_max = (75, 255, 255)

    blur = 5
    min_size = 10
    max_size = 40

    # --- detection window respect to camera frame in [x_min, y_min, x_max, y_max] adimensional (0 to 1)
    x_min = 0.1
    x_max = 0.9
    y_min = 0.1
    y_max = 0.9

    detection_window = [x_min, y_min, x_max, y_max]

    params = cv2.SimpleBlobDetector_Params()

    # Change thresholds
    params.minThreshold = 0
    params.maxThreshold = 200

    # Filter by Area.
    params.filterByArea = True
    params.minArea = 2000
    params.maxArea = 70000  # 640 * 480 = 307,200

    # Filter by Circularity
    params.filterByCircularity = True
    params.minCircularity = 0.1

    # Filter by Convexity
    params.filterByConvexity = True
    params.minConvexity = 0.2

    # Filter by Inertia
    params.filterByInertia = True
    params.minInertiaRatio = 0.7

    rospy.init_node("blob_detector", anonymous=True)
    ic = BlobDetector(green_min, green_max, blur, params, detection_window)
    try:
        rospy.spin()
    except KeyboardInterrupt:
        print("Shutting down")

    cv2.destroyAllWindows()


if __name__ == "__main__":
    main(sys.argv)

์ด๋ฒˆ ์ฝ”๋“œ๋„ ๊ธธ์ด๊ฐ€ ๊ต‰์žฅํžˆ ๊ธธ์ง€์š”? ๐Ÿ˜‚๐Ÿ˜‚

์ฝ”๋“œ ๋ถ„์„ ์ „์—, ์šฐ๋ฆฌ ์žฌ๋ฏธ์žˆ๋Š” ์‹ค์Šต์„ ํ•˜๋‚˜ ํ•ด๋ณด์•„์š”

Blob Thresholding


$ cd ~/catkin_ws/src/donkey_ros/donkey_cv/include/
$ python range_detector.py --image frame0000.jpg --filter HSV --preview
ํ•˜๋‹จ์— ์ œ๊ฐ€ ํ•ด๋‘” ๋‹ต์„ ๋ณด๊ธฐ ์ „์—, H, S, V Min/Max ๊ฐ’๋“ค์„ ๊ณ„์†ํ•ด์„œ ๋ฐ”๊ฟ”๊ฐ€๋ฉด์„œ, ์–ด๋– ํ•œ ๋ณ€ํ™”๊ฐ€ ์ƒ๊ธฐ๋Š”์ง€ ์‚ดํŽด๋ณด์„ธ์š”!!

  • my_value

HSV๊ฐ€ ๋ญ”๊ฐ€์š” โ“โ“โ“


์ƒ‰์ƒ์„ ํ‘œํ˜„ํ•˜๋Š” ๋Œ€ํ‘œ์ ์ธ ๋ฐฉ์‹์œผ๋กœ, ๋ชจ๋‘๊ฐ€ ์•Œ๊ณ  ์žˆ๋Š” RGB๋ฐฉ์‹์ด ์žˆ์ง€์š”? ์ด์™€ ๋”๋ถˆ์–ด,

์ƒ‰์ƒ(Hue), ์ฑ„๋„(Saturation), ๋ช…๋„(Value) ์˜ ๊ฐ’์œผ๋กœ ์ƒ‰์ƒ์„ ํ‘œํ˜„ํ•˜๋Š” ์ฒด๊ณ„๊ฐ€ ์žˆ๋‹ต๋‹ˆ๋‹ค.

image from : wikipedia

image from : wikipedia

ppt๋‚˜ ํฌํ† ์ƒต์„ ์‚ฌ์šฉํ•˜๋‹ค๋ณด๋ฉด, ์ด๋ ‡๊ฒŒ ์ƒ‰์ƒ์„ ๊ณ ๋ฅด๋Š” gradation์„ ๋งŽ์ด ๋ณด์…จ์ง€์š”?

๋ฐ”๋กœ ์ด ๊ฒƒ์ด Hue/Saturation์— ํ•ด๋‹นํ•ฉ๋‹ˆ๋‹ค.

image from : wikipedia

์ด ์„ธ๊ฐ€์ง€ ๊ฐ’์„ ์ˆ˜์ •ํ•˜๋ฉด์„œ, ์ ฏ์Šจ ๋ฐ•์Šค๋งŒ์„ ๊ฑธ๋Ÿฌ์ฃผ๋Š” ๋ฒ”์œ„๋ฅผ ์„ ๋ณ„ํ•˜๊ณ , ์ด๊ฒƒ์„ ํ†ตํ•ด ๋ฌผ์ฒด๋ฅผ ๋”ฐ๋ผ๊ฐ€๋„๋ก ํ•  ์ˆ˜ ์žˆ์—ˆ๋˜ ๊ฒƒ์ž…๋‹ˆ๋‹ค. ๐Ÿ˜‰๐Ÿ˜‰ ์ฝ”๋“œ์ƒ์—์„œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ํ‘œํ˜„๋˜์—ˆ์ง€์š”.

    green_min = (39, 81, 71)
    green_max = (75, 255, 255)

์—ฌ๋Ÿฌ๋ถ„๋“ค๋„ ์ง์ ‘ ์‚ฌ์ง„์„ ์ดฌ์˜ํ•˜์—ฌ ๋‹ค๋ฅธ ํ•„ํ„ฐ๋ฅผ ๋งŒ๋“ค์–ด๋ณด์„ธ์š”
$ roscore
$ rosrun csi_camera csi_pub.py
$ rosrun image_view image_view image:=/csi_image