Autonomous Mapping Robot

ROSPython

This project is an implementation of SLAM (Simultaneous Localization and Mapping). The robot uses a LIDAR sensor to scan its environment and search out new areas of the map to explore.

The video above covers SLAM, how a LIDAR sensor works, frontier exploration, pathfinding, pure pursuit, obstacle avoidance, and Monte Carlo localization.

This project was part, for one of my robotics courses, RBE 3002 - Unified Robotics IV.

Below are some code snippets for various ROS messages and nodes used to make this project.

Frontier.msg

ROS message for a frontier that keeps track of its size in grid cells and its centroid as a point.

uint32 size geometry_msgs/Point centroid

FrontierList.msg

ROS message for a list of frontiers.

lab4/Frontier[] frontiers

frontier_search.py

ROS node that constantly checks for new frontiers and publishes them using Expanding Wavefront Frontier Detection.

#!/usr/bin/env python3 from path_planner import PathPlanner from nav_msgs.msg import OccupancyGrid from lab4.msg import Frontier, FrontierList class FrontierSearch: @staticmethod def search( mapdata: OccupancyGrid, start: "tuple[int, int]", include_frontier_cells: bool = False, ) -> "tuple[FrontierList, list[tuple[int, int]]]": MIN_FRONTIER_SIZE = 8 # Number of cells # Create queue for breadth-first search queue = [] queue.append(start) # Initialize dictionaries for keeping track of visited and frontier cells visited = {} is_frontier = {} visited[start] = True # Initialize list of frontiers frontiers = [] # Initialize list of frontier cells frontier_cells = [] while queue: current = queue.pop(0) for neighbor in PathPlanner.neighbors_of_4(mapdata, current): neighbor_value = PathPlanner.get_cell_value(mapdata, neighbor) if neighbor_value >= 0 and not neighbor in visited: visited[neighbor] = True queue.append(neighbor) elif FrontierSearch.is_new_frontier_cell( mapdata, neighbor, is_frontier ): # Mark as frontier is_frontier[neighbor] = True # Build new frontier new_frontier, new_frontier_cells = ( FrontierSearch.build_new_frontier( mapdata, neighbor, is_frontier, include_frontier_cells, ) ) if new_frontier.size >= MIN_FRONTIER_SIZE: frontiers.append(new_frontier) if include_frontier_cells: frontier_cells.extend(new_frontier_cells) return (FrontierList(frontiers=frontiers), frontier_cells) @staticmethod def build_new_frontier( mapdata: OccupancyGrid, initial_cell: "tuple[int, int]", is_frontier: dict, include_frontier_cells: bool = False, ) -> "tuple[Frontier, list[tuple[int, int]]]": # Initialize frontier fields size = 1 centroid_x = initial_cell[0] centroid_y = initial_cell[1] # Create queue for breadth-first search queue = [] queue.append(initial_cell) # Initialize list of frontier cells frontier_cells = [] # Breadth-first search for frontier cells while queue: current = queue.pop(0) if include_frontier_cells: frontier_cells.append(current) for neighbor in PathPlanner.neighbors_of_8(mapdata, current): if FrontierSearch.is_new_frontier_cell(mapdata, neighbor, is_frontier): # Mark as frontier is_frontier[neighbor] = True # Update size and centroid size += 1 centroid_x += neighbor[0] centroid_y += neighbor[1] queue.append(neighbor) # Calculate centroid by taking the average centroid_x /= size centroid_y /= size # Make and return new frontier centroid = PathPlanner.grid_to_world( mapdata, (int(centroid_x), int(centroid_y)) ) return ( Frontier(size=size, centroid=centroid), frontier_cells, ) @staticmethod def is_new_frontier_cell( mapdata: OccupancyGrid, cell: "tuple[int, int]", is_frontier: dict ) -> bool: # Cell must be unknown and not already a frontier if PathPlanner.get_cell_value(mapdata, cell) != -1 or cell in is_frontier: return False # Cell should have at least one connected cell that is free WALKABLE_THRESHOLD = 50 for neighbor in PathPlanner.neighbors_of_4(mapdata, cell): neighbor_value = PathPlanner.get_cell_value(mapdata, neighbor) if neighbor_value >= 0 and neighbor_value < WALKABLE_THRESHOLD: return True return False

frontier_exploration.py

ROS node that listens for a FrontierList published by the FrontierSearch node and picks one to explore.

It uses a combination of A* cost and frontier size to decide which frontier is the best.

Once a frontier has been selected, this node publishs a path to reach the frontier using the PathPlanner class.

#!/usr/bin/env python3 import os import rospy import rospkg import threading import subprocess import numpy as np from typing import Union from path_planner import PathPlanner from frontier_search import FrontierSearch from nav_msgs.msg import OccupancyGrid, Path, GridCells, Odometry from geometry_msgs.msg import Pose, Point, Quaternion from lab4.msg import FrontierList from tf import TransformListener from tf.transformations import euler_from_quaternion class FrontierExploration: def **init**(self): """ Class constructor """ rospy.init_node("frontier_exploration") # Set if in debug mode self.is_in_debug_mode = ( rospy.has_param("~debug") and rospy.get_param("~debug") == "true" ) # Publishers self.pure_pursuit_pub = rospy.Publisher( "/pure_pursuit/path", Path, queue_size=10 ) if self.is_in_debug_mode: self.frontier_cells_pub = rospy.Publisher( "/frontier_exploration/frontier_cells", GridCells, queue_size=10 ) self.start_pub = rospy.Publisher( "/frontier_exploration/start", GridCells, queue_size=10 ) self.goal_pub = rospy.Publisher( "/frontier_exploration/goal", GridCells, queue_size=10 ) self.cspace_pub = rospy.Publisher("/cspace", GridCells, queue_size=10) self.cost_map_pub = rospy.Publisher( "/cost_map", OccupancyGrid, queue_size=10 ) # Subscribers rospy.Subscriber("/odom", Odometry, self.update_odometry) rospy.Subscriber("/map", OccupancyGrid, self.update_map) self.tf_listener = TransformListener() self.lock = threading.Lock() self.pose = None self.map = None self.NUM_EXPLORE_FAILS_BEFORE_FINISH = 30 self.no_path_found_counter = 0 self.no_frontiers_found_counter = 0 self.is_finished_exploring = False def update_odometry(self, msg: "Union[Odometry, None]" = None): """ Updates the current pose of the robot. """ try: (trans, rot) = self.tf_listener.lookupTransform( "/map", "/base_footprint", rospy.Time(0) ) except: return self.pose = Pose( position=Point(x=trans[0], y=trans[1]), orientation=Quaternion(x=rot[0], y=rot[1], z=rot[2], w=rot[3]), ) def update_map(self, msg: OccupancyGrid): """ Updates the current map. This method is a callback bound to a Subscriber. :param msg [OccupancyGrid] The current map information. """ self.map = msg def save_map(self): # Get the path of the current package rospack = rospkg.RosPack() package_path = rospack.get_path("lab4") # Construct the path to the map map_path = os.path.join(package_path, "map/map") if not os.path.exists(os.path.dirname(map_path)): os.makedirs(os.path.dirname(map_path)) # Run map_saver subprocess.call(["rosrun", "map_server", "map_saver", "-f", map_path]) self.update_odometry() if self.pose is None: rospy.logerr("Failed to get pose") return # Save the robot's position and orientation position = self.pose.position orientation = self.pose.orientation roll, pitch, yaw = euler_from_quaternion( [orientation.x, orientation.y, orientation.z, orientation.w] ) with open(os.path.join(package_path, "map/pose.txt"), "w") as f: f.write(f"{position.x} {position.y} {position.z} {yaw} {pitch} {roll}\n") @staticmethod def get_top_frontiers(frontiers, n): # Sort the frontiers by size in descending order sorted_frontiers = sorted( frontiers, key=lambda frontier: frontier.size, reverse=True ) # Return the top n frontiers return sorted_frontiers[:n] def publish_cost_map(self, mapdata: OccupancyGrid, cost_map: np.ndarray): # Create an OccupancyGrid message grid = OccupancyGrid() grid.header.stamp = rospy.Time.now() grid.header.frame_id = "map" grid.info.resolution = mapdata.info.resolution grid.info.width = cost_map.shape[1] grid.info.height = cost_map.shape[0] grid.info.origin = mapdata.info.origin # Normalize the cost map to the range [0, 100] and convert it to integers cost_map_normalized = (cost_map / np.max(cost_map) * 100).astype(np.int8) # Flatten the cost map and convert it to a list grid.data = cost_map_normalized.flatten().tolist() # Publish the OccupancyGrid message self.cost_map_pub.publish(grid) def check_if_finished_exploring(self): # Publish empty path to stop the robot self.pure_pursuit_pub.publish(Path()) # If no frontiers or paths are found for a certain number of times, finish exploring if ( self.no_frontiers_found_counter >= self.NUM_EXPLORE_FAILS_BEFORE_FINISH or self.no_path_found_counter >= self.NUM_EXPLORE_FAILS_BEFORE_FINISH ): rospy.loginfo("Done exploring!") self.save_map() rospy.loginfo("Saved map") self.is_finished_exploring = True def explore_frontier(self, frontier_list: FrontierList): # If finished exploring, no pose, no map, or no frontier list, return if self.is_finished_exploring or self.pose is None or self.map is None: return frontiers = frontier_list.frontiers # If no frontiers are found, check if finished exploring if not frontiers: rospy.loginfo("No frontiers") self.no_frontiers_found_counter += 1 self.check_if_finished_exploring() return else: self.no_frontiers_found_counter = 0 A_STAR_COST_WEIGHT = 10.0 FRONTIER_SIZE_COST_WEIGHT = 1.0 # Calculate the C-space cspace, cspace_cells = PathPlanner.calc_cspace(self.map, self.is_in_debug_mode) # if cspace_cells is not None: # self.cspace_pub.publish(cspace_cells) # Calculate the cost map cost_map = PathPlanner.calc_cost_map(self.map) if self.is_in_debug_mode: self.publish_cost_map(self.map, cost_map) # Get the start start = PathPlanner.world_to_grid(self.map, self.pose.position) # Execute A* for every frontier lowest_cost = float("inf") best_path = None # Check only the top frontiers in terms of size MAX_NUM_FRONTIERS_TO_CHECK = 8 top_frontiers = FrontierExploration.get_top_frontiers( frontiers, MAX_NUM_FRONTIERS_TO_CHECK ) starts = [] goals = [] # Log how many frontiers are being explored rospy.loginfo(f"Exploring {len(top_frontiers)} frontiers") for frontier in top_frontiers: # Get goal goal = PathPlanner.world_to_grid(self.map, frontier.centroid) # Execute A* path, a_star_cost, start, goal = PathPlanner.a_star( cspace, cost_map, start, goal ) # If in debug mode, append start and goal if self.is_in_debug_mode: starts.append(start) goals.append(goal) if path is None or a_star_cost is None: continue # Calculate cost cost = (A_STAR_COST_WEIGHT * a_star_cost) + ( FRONTIER_SIZE_COST_WEIGHT / frontier.size ) # Update best path if cost < lowest_cost: lowest_cost = cost best_path = path # If in debug mode, publish the start and goal if self.is_in_debug_mode: self.start_pub.publish(PathPlanner.get_grid_cells(self.map, starts)) self.goal_pub.publish(PathPlanner.get_grid_cells(self.map, goals)) # If a path was found, publish it if best_path: rospy.loginfo(f"Found best path with cost {lowest_cost}") start = best_path[0] path = PathPlanner.path_to_message(self.map, best_path) self.pure_pursuit_pub.publish(path) self.no_path_found_counter = 0 # If no path was found, check if finished exploring else: rospy.loginfo("No paths found") self.no_path_found_counter += 1 self.check_if_finished_exploring() def run(self): rate = rospy.Rate(20) # Hz while not rospy.is_shutdown(): if self.pose is None or self.map is None: continue # Get the start position of the robot start = PathPlanner.world_to_grid(self.map, self.pose.position) # Get frontiers frontier_list, frontier_cells = FrontierSearch.search( self.map, start, self.is_in_debug_mode ) if frontier_list is None: continue # Publish frontier cells if in debug mode if self.is_in_debug_mode: self.frontier_cells_pub.publish( PathPlanner.get_grid_cells(self.map, frontier_cells) ) self.explore_frontier(frontier_list) rate.sleep() if **name** == "**main**": FrontierExploration().run()

path_planner.py

Helper class used to perform an A* search on a grid cell map.

#!/usr/bin/env python3 import rospy import math import cv2 import numpy as np from typing import Union from std_msgs.msg import Header from nav_msgs.msg import GridCells, OccupancyGrid, Path from geometry_msgs.msg import Point, Quaternion, Pose, PoseStamped from priority_queue import PriorityQueue from tf.transformations import quaternion_from_euler DIRECTIONS_OF_4 = [(-1, 0), (1, 0), (0, -1), (0, 1)] DIRECTIONS_OF_8 = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)] class PathPlanner: @staticmethod def grid_to_index(mapdata: OccupancyGrid, p: "tuple[int, int]") -> int: """ Returns the index corresponding to the given (x,y) coordinates in the occupancy grid. :param p [(int, int)] The cell coordinate. :return [int] The index. """ return p[1] * mapdata.info.width + p[0] @staticmethod def get_cell_value(mapdata: OccupancyGrid, p: "tuple[int, int]") -> int: """ Returns the cell corresponding to the given (x,y) coordinates in the occupancy grid. :param p [(int, int)] The cell coordinate. :return [int] The cell. """ return mapdata.data[PathPlanner.grid_to_index(mapdata, p)] @staticmethod def euclidean_distance( p1: "tuple[float, float]", p2: "tuple[float, float]" ) -> float: """ Calculates the Euclidean distance between two points. :param p1 [(float, float)] first point. :param p2 [(float, float)] second point. :return [float] distance. """ return math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2) @staticmethod def grid_to_world(mapdata: OccupancyGrid, p: "tuple[int, int]") -> Point: """ Transforms a cell coordinate in the occupancy grid into a world coordinate. :param mapdata [OccupancyGrid] The map information. :param p [(int, int)] The cell coordinate. :return [Point] The position in the world. """ x = (p[0] + 0.5) * mapdata.info.resolution + mapdata.info.origin.position.x y = (p[1] + 0.5) * mapdata.info.resolution + mapdata.info.origin.position.y return Point(x, y, 0) @staticmethod def world_to_grid(mapdata: OccupancyGrid, wp: Point) -> "tuple[int, int]": """ Transforms a world coordinate into a cell coordinate in the occupancy grid. :param mapdata [OccupancyGrid] The map information. :param wp [Point] The world coordinate. :return [(int,int)] The cell position as a tuple. """ x = int((wp.x - mapdata.info.origin.position.x) / mapdata.info.resolution) y = int((wp.y - mapdata.info.origin.position.y) / mapdata.info.resolution) return (x, y) @staticmethod def path_to_poses( mapdata: OccupancyGrid, path: "list[tuple[int, int]]" ) -> "list[PoseStamped]": """ Converts the given path into a list of PoseStamped. :param mapdata [OccupancyGrid] The map information. :param path [[(int,int)]] The path as a list of tuples (cell coordinates). :return [[PoseStamped]] The path as a list of PoseStamped (world coordinates). """ poses = [] for i in range(len(path) - 1): cell = path[i] next_cell = path[i + 1] if i != len(path) - 1: angle_to_next = math.atan2( next_cell[1] - cell[1], next_cell[0] - cell[0] ) q = quaternion_from_euler(0, 0, angle_to_next) poses.append( PoseStamped( header=Header(frame_id="map"), pose=Pose( position=PathPlanner.grid_to_world(mapdata, cell), orientation=Quaternion(q[0], q[1], q[2], q[3]), ), ) ) return poses @staticmethod def is_cell_in_bounds(mapdata: OccupancyGrid, p: "tuple[int, int]") -> bool: width = mapdata.info.width height = mapdata.info.height x = p[0] y = p[1] if x < 0 or x >= width: return False if y < 0 or y >= height: return False return True @staticmethod def is_cell_walkable(mapdata: OccupancyGrid, p: "tuple[int, int]") -> bool: """ A cell is walkable if all of these conditions are true: 1. It is within the boundaries of the grid; 2. It is free (not occupied by an obstacle) :param mapdata [OccupancyGrid] The map information. :param p [(int, int)] The coordinate in the grid. :return [bool] True if the cell is walkable, False otherwise """ if not PathPlanner.is_cell_in_bounds(mapdata, p): return False WALKABLE_THRESHOLD = 50 return PathPlanner.get_cell_value(mapdata, p) < WALKABLE_THRESHOLD @staticmethod def neighbors( mapdata: OccupancyGrid, p: "tuple[int, int]", directions: "list[tuple[int, int]]", must_be_walkable: bool = True, ) -> "list[tuple[int, int]]": """ Returns the neighbors cells of (x,y) in the occupancy grid given directions to check. :param mapdata [OccupancyGrid] The map information. :param p [(int, int)] The coordinate in the grid. :param directions [[(int,int)]] A list of directions to check for neighbors. :param must_be_walkable [bool] Whether or not the cells must be walkable :return [[(int,int)]] A list of 4-neighbors. """ neighbors = [] for direction in directions: candidate = (p[0] + direction[0], p[1] + direction[1]) if ( must_be_walkable and PathPlanner.is_cell_walkable(mapdata, candidate) ) or ( not must_be_walkable and PathPlanner.is_cell_in_bounds(mapdata, candidate) ): neighbors.append(candidate) return neighbors @staticmethod def neighbors_of_4( mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True ) -> "list[tuple[int, int]]": return PathPlanner.neighbors(mapdata, p, DIRECTIONS_OF_4, must_be_walkable) @staticmethod def neighbors_of_8( mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True ) -> "list[tuple[int, int]]": return PathPlanner.neighbors(mapdata, p, DIRECTIONS_OF_8, must_be_walkable) @staticmethod def neighbors_and_distances( mapdata: OccupancyGrid, p: "tuple[int, int]", directions: "list[tuple[int, int]]", must_be_walkable: bool = True, ) -> "list[tuple[tuple[int, int], float]]": """ Returns the neighbors cells of (x,y) in the occupancy grid given directions to check and their distances. :param mapdata [OccupancyGrid] The map information. :param p [(int, int)] The coordinate in the grid. :param directions [[(int,int)]] A list of directions to check for neighbors. :param must_be_walkable [bool] Whether or not the cells must be walkable :return [[(int,int)]] A list of 4-neighbors. """ neighbors = [] for direction in directions: candidate = (p[0] + direction[0], p[1] + direction[1]) if not must_be_walkable or PathPlanner.is_cell_walkable(mapdata, candidate): distance = PathPlanner.euclidean_distance(direction, (0, 0)) neighbors.append((candidate, distance)) return neighbors @staticmethod def neighbors_and_distances_of_4( mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True ) -> "list[tuple[tuple[int, int], float]]": return PathPlanner.neighbors_and_distances( mapdata, p, DIRECTIONS_OF_4, must_be_walkable ) @staticmethod def neighbors_and_distances_of_8( mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True ) -> "list[tuple[tuple[int, int], float]]": return PathPlanner.neighbors_and_distances( mapdata, p, DIRECTIONS_OF_8, must_be_walkable ) @staticmethod def get_grid_cells( mapdata: OccupancyGrid, cells: "list[tuple[int, int]]" ) -> GridCells: world_cells = [] for cell in cells: world_cells.append(PathPlanner.grid_to_world(mapdata, cell)) resolution = mapdata.info.resolution return GridCells( header=Header(frame_id="map"), cell_width=resolution, cell_height=resolution, cells=world_cells, ) @staticmethod def calc_cspace( mapdata: OccupancyGrid, include_cells: bool ) -> "tuple[OccupancyGrid, Union[GridCells, None]]": """ Calculates the C-Space, i.e., makes the obstacles in the map thicker. Publishes the list of cells that were added to the original map. :param mapdata [OccupancyGrid] The map data. :return [OccupancyGrid] The C-Space. """ PADDING = 5 # The number of cells around the obstacles # Create numpy grid from mapdata width = mapdata.info.width height = mapdata.info.height map = np.array(mapdata.data).reshape(width, height).astype(np.uint8) # Get mask of unknown areas unknown_area_mask = cv2.inRange( map, 255, 255 ) # -1 overflows to 255 when cast to uint8 kernel = np.ones((PADDING, PADDING), dtype=np.uint8) unknown_area_mask = cv2.erode(unknown_area_mask, kernel, iterations=1) # Change unknown areas to free space map[map == 255] = 0 # Inflate the obstacles where necessary kernel = np.ones((PADDING, PADDING), np.uint8) obstacle_mask = cv2.dilate(map, kernel, iterations=1) cspace_data = cv2.bitwise_or(obstacle_mask, unknown_area_mask) cspace_data = np.array(cspace_data).reshape(width * height).tolist() # Return the C-space cspace = OccupancyGrid( header=mapdata.header, info=mapdata.info, data=cspace_data ) # Return the cells that were added to the original map cspace_cells = None if include_cells: cells = [] # Find the indices of the obstacle cells obstacle_indices = np.where(obstacle_mask > 0) # Convert the indices to cell coordinates and append them to cells for y, x in zip(*obstacle_indices): cells.append((x, y)) cspace_cells = PathPlanner.get_grid_cells(mapdata, cells) return (cspace, cspace_cells) @staticmethod def get_cost_map_value(cost_map: np.ndarray, p: "tuple[int, int]") -> int: return cost_map[p[1]][p[0]] @staticmethod def show_map(name: str, map: np.ndarray): normalized = cv2.normalize( map, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U ) cv2.imshow(name, normalized) cv2.waitKey(0) @staticmethod def calc_cost_map(mapdata: OccupancyGrid) -> np.ndarray: rospy.loginfo("Calculating cost map") # Create numpy array from mapdata width = mapdata.info.width height = mapdata.info.height map = np.array(mapdata.data).reshape(height, width).astype(np.uint8) map[map == 255] = 100 # Iteratively dilate the walls until no changes are made cost_map = np.zeros_like(map) dilated_map = map.copy() iterations = 0 kernel = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], np.uint8) while np.any(dilated_map == 0): # Increase iterations iterations += 1 # Dilate the map next_dilated_map = cv2.dilate(dilated_map, kernel, iterations=1) # Get the difference between the next dilated map and the current one to get an outline difference = next_dilated_map - dilated_map # Assign all non-zero cells in the outline their cost difference[difference > 0] = iterations # Add the outline to the cost map cost_map = cv2.bitwise_or(cost_map, difference) # Update dilated map dilated_map = next_dilated_map # PathPlanner.show_map("wall_dilation", cost_map) # Turn the cost map into a mask of only the middle of the hallways # All cells that are not in the middle of the hallways will have a cost of 0 # Cells that are in the middle of the hallways will have a cost of 1 cost_map = PathPlanner.create_hallway_mask(mapdata, cost_map, iterations // 4) # PathPlanner.show_map("hallway_mask", cost_map) # Iteratively dilate the hallway mask until no changes are made dilated_map = cost_map.copy() cost = 1 for i in range(iterations): # Increase cost cost += 1 # Dilate the map next_dilated_map = cv2.dilate(dilated_map, kernel, iterations=1) # Get the difference between the next dilated map and the current one to get an outline difference = next_dilated_map - dilated_map # Assign all non-zero cells in the outline their cost difference[difference > 0] = cost # Add the outline to the cost map cost_map = cv2.bitwise_or(cost_map, difference) # Update dilated map dilated_map = next_dilated_map # Subtract 1 from all non-zero values in cost map cost_map[cost_map > 0] -= 1 # PathPlanner.show_map("cost_map", cost_map) return cost_map @staticmethod def create_hallway_mask( mapdata: OccupancyGrid, cost_map: np.ndarray, threshold: int ) -> np.ndarray: """ Create a mask of the cost_map that only contains cells that are hallway cells. """ # Initialize the mask with the same shape as the cost_map and all values set to False mask = np.zeros_like(cost_map, dtype=bool) # Get the indices of the non-zero cells in the cost_map non_zero_indices = np.nonzero(cost_map) # Iterate over the non-zero cells in the cost_map for y, x in zip(*non_zero_indices): # If the cell is a hallway cell, set the corresponding cell in the mask to True if PathPlanner.is_hallway_cell(mapdata, cost_map, (x, y), threshold): mask[y][x] = 1 return mask.astype(np.uint8) @staticmethod def is_hallway_cell( mapdata: OccupancyGrid, cost_map: np.ndarray, p: "tuple[int, int]", threshold: int, ) -> bool: """ Determine whether a cell is a "hallway cell" meaning it has a cost greater than or equal to all of its neighbors """ cost_map_value = PathPlanner.get_cost_map_value(cost_map, p) for neighbor in PathPlanner.neighbors_of_8(mapdata, p, False): neighbor_cost_map_value = PathPlanner.get_cost_map_value(cost_map, neighbor) if ( neighbor_cost_map_value < threshold or neighbor_cost_map_value > cost_map_value ): return False return True @staticmethod def get_first_walkable_neighbor( mapdata, start: "tuple[int, int]" ) -> "tuple[int, int]": """ Helper function for a_star that gets the first walkable neighbor from the start cell in case it is not already walkable :param mapdata [OccupancyGrid] The map data. :param padding [start] The start cell. :return [(int, int)] The first walkable neighbor. """ # Create queue for breadth-first search queue = [] queue.append(start) # Initialize dictionary for keeping track of visited cells visited = {} while queue: current = queue.pop(0) if PathPlanner.is_cell_walkable(mapdata, current): return current for neighbor in PathPlanner.neighbors_of_4(mapdata, current, False): visited[neighbor] = True queue.append(neighbor) # If nothing found, just return original start cell return start @staticmethod def a_star( mapdata: OccupancyGrid, cost_map: np.ndarray, start: "tuple[int, int]", goal: "tuple[int, int]", ) -> "tuple[Union[list[tuple[int, int]], None], Union[float, None], tuple[int, int], tuple[int, int]]": COST_MAP_WEIGHT = 1000 # If the start cell is not walkable, get the first walkable neighbor instead if not PathPlanner.is_cell_walkable(mapdata, start): start = PathPlanner.get_first_walkable_neighbor(mapdata, start) # Likewise, if the goal cell is not walkable, get the first walkable neighbor instead if not PathPlanner.is_cell_walkable(mapdata, goal): goal = PathPlanner.get_first_walkable_neighbor(mapdata, goal) pq = PriorityQueue() pq.put(start, 0) cost_so_far = {} distance_cost_so_far = {} cost_so_far[start] = 0 distance_cost_so_far[start] = 0 came_from = {} came_from[start] = None while not pq.empty(): current = pq.get() if current == goal: break for neighbor, distance in PathPlanner.neighbors_and_distances_of_8( mapdata, current ): added_cost = ( distance + COST_MAP_WEIGHT * PathPlanner.get_cost_map_value(cost_map, neighbor) ) new_cost = cost_so_far[current] + added_cost if not neighbor in cost_so_far or new_cost < cost_so_far[neighbor]: cost_so_far[neighbor] = new_cost distance_cost_so_far[neighbor] = ( distance_cost_so_far[current] + distance ) priority = new_cost + PathPlanner.euclidean_distance(neighbor, goal) pq.put(neighbor, priority) came_from[neighbor] = current path = [] cell = goal while cell: path.insert(0, cell) if cell in came_from: cell = came_from[cell] else: return (None, None, start, goal) # Prevent paths that are too short MIN_PATH_LENGTH = 12 if len(path) < MIN_PATH_LENGTH: return (None, None, start, goal) # Truncate the last few poses of the path POSES_TO_TRUNCATE = 8 path = path[:-POSES_TO_TRUNCATE] return (path, distance_cost_so_far[goal], start, goal) @staticmethod def path_to_message(mapdata: OccupancyGrid, path: "list[tuple[int, int]]") -> Path: """ Takes a path on the grid and returns a Path message. :param path [[(int,int)]] The path on the grid (a list of tuples) :return [Path] A Path message (the coordinates are expressed in the world) """ poses = PathPlanner.path_to_poses(mapdata, path) return Path(header=Header(frame_id="map"), poses=poses)

pure_pursuit.py

ROS node that listens for a Path from the FrontierExploration node and follows it using Pure Pursuit.

#!/usr/bin/env python3 import math import rospy import numpy as np from path_planner import PathPlanner from std_msgs.msg import Header, Bool from nav_msgs.msg import Path, Odometry, GridCells, OccupancyGrid from geometry_msgs.msg import Point, PointStamped, Twist, Vector3, Pose, Quaternion from tf.transformations import euler_from_quaternion from tf import TransformListener class PurePursuit: def __init__(self): """ Class constructor """ rospy.init_node("pure_pursuit") # Set if in debug mode self.is_in_debug_mode = ( rospy.has_param("~debug") and rospy.get_param("~debug") == "true" ) # Publishers self.cmd_vel = rospy.Publisher("/cmd_vel", Twist, queue_size=10) self.lookahead_pub = rospy.Publisher( "/pure_pursuit/lookahead", PointStamped, queue_size=10 ) if self.is_in_debug_mode: self.fov_cells_pub = rospy.Publisher( "/pure_pursuit/fov_cells", GridCells, queue_size=100 ) self.close_wall_cells_pub = rospy.Publisher( "/pure_pursuit/close_wall_cells", GridCells, queue_size=100 ) # Subscribers rospy.Subscriber("/odom", Odometry, self.update_odometry) rospy.Subscriber("/map", OccupancyGrid, self.update_map) rospy.Subscriber("/pure_pursuit/path", Path, self.update_path) rospy.Subscriber("/pure_pursuit/enabled", Bool, self.update_enabled) # Pure pursuit parameters self.LOOKAHEAD_DISTANCE = 0.18 # m self.WHEEL_BASE = 0.16 # m self.MAX_DRIVE_SPEED = 0.1 # m/s self.MAX_TURN_SPEED = 1.25 # rad/s self.TURN_SPEED_KP = 1.25 self.DISTANCE_TOLERANCE = 0.1 # m # Obstacle avoidance parameters self.OBSTACLE_AVOIDANCE_GAIN = 0.3 self.OBSTACLE_AVOIDANCE_MAX_SLOW_DOWN_DISTANCE = 0.16 # m self.OBSTACLE_AVOIDANCE_MIN_SLOW_DOWN_DISTANCE = 0.12 # m self.OBSTACLE_AVOIDANCE_MIN_SLOW_DOWN_FACTOR = 0.25 self.FOV = 200 # degrees self.FOV_DISTANCE = 25 # Number of grid cells self.FOV_DEADZONE = 80 # degrees self.SMALL_FOV = 300 # degrees self.SMALL_FOV_DISTANCE = 10 # Number of grid cells self.tf_listener = TransformListener() self.pose = None self.map = None self.path = Path() self.alpha = 0 self.enabled = True self.reversed = False self.closest_distance = float("inf") def update_odometry(self, msg: Odometry): """ Updates the current pose of the robot. """ try: (trans, rot) = self.tf_listener.lookupTransform( "/map", "/base_footprint", rospy.Time(0) ) except: return self.pose = Pose( position=Point(x=trans[0], y=trans[1]), orientation=Quaternion(x=rot[0], y=rot[1], z=rot[2], w=rot[3]), ) def update_map(self, msg: OccupancyGrid): """ Updates the current map. This method is a callback bound to a Subscriber. :param msg [OccupancyGrid] The current map information. """ self.map = msg def update_path(self, msg: Path): self.path = msg def update_enabled(self, msg: Bool): self.enabled = msg.data def calculate_steering_adjustment(self) -> float: if self.pose is None or self.map is None: return 0 orientation = self.pose.orientation roll, pitch, yaw = euler_from_quaternion( [orientation.x, orientation.y, orientation.z, orientation.w] ) yaw = float(np.rad2deg(yaw)) # Get the grid cell of the robot robot_cell = PathPlanner.world_to_grid(self.map, self.pose.position) weighted_sum_of_angles = 0 total_weight = 0 self.closest_distance = float("inf") # Get all wall cells near the robot within the distance fov_cells = [] wall_cells = [] wall_cell_count = 0 for dx in range(-self.FOV_DISTANCE, self.FOV_DISTANCE + 1): for dy in range(-self.FOV_DISTANCE, self.FOV_DISTANCE + 1): cell = (robot_cell[0] + dx, robot_cell[1] + dy) distance = PathPlanner.euclidean_distance(robot_cell, cell) # If the cell is out of bounds, ignore it if not PathPlanner.is_cell_in_bounds(self.map, cell): continue is_wall = not PathPlanner.is_cell_walkable(self.map, cell) if is_wall and distance < self.closest_distance: self.closest_distance = distance # Calculate the angle of the cell relative to the robot angle = float(np.rad2deg(np.arctan2(dy, dx))) - yaw # If reversed, add 180 to the angle if self.reversed: angle += 180 # Keep angle in the range of -180 to 180 if angle < -180: angle += 360 elif angle > 180: angle -= 360 # Ignore scans that are outside the field of view is_in_fov = ( distance <= self.FOV_DISTANCE and angle >= -self.FOV / 2 and angle <= self.FOV / 2 and not abs(angle) < self.FOV_DEADZONE / 2 ) is_in_small_fov = ( distance <= self.SMALL_FOV_DISTANCE and angle >= -self.SMALL_FOV / 2 and angle <= self.SMALL_FOV / 2 ) if not is_in_fov and not is_in_small_fov: continue # If in debug mode, add the cell to the field of view if self.is_in_debug_mode: fov_cells.append(cell) # If cell is not a wall, ignore it if not is_wall: continue weight = 1 / (distance**2) if distance != 0 else 0 weighted_sum_of_angles += weight * angle total_weight += weight wall_cell_count += 1 if self.is_in_debug_mode: wall_cells.append(cell) # If in debug mode, publish the wall cells if self.is_in_debug_mode: self.fov_cells_pub.publish(PathPlanner.get_grid_cells(self.map, fov_cells)) self.close_wall_cells_pub.publish( PathPlanner.get_grid_cells(self.map, wall_cells) ) if total_weight == 0: return 0 # Calculate the average angle (weighted sum of angles divided by total weight) average_angle = weighted_sum_of_angles / total_weight # Calculate the steering adjustment based on the average angle steering_adjustment = ( -self.OBSTACLE_AVOIDANCE_GAIN * average_angle / wall_cell_count ) return steering_adjustment @staticmethod def distance(x0, y0, x1, y1) -> float: return math.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2) def get_distance_to_waypoint_index(self, i: int) -> float: if self.pose is None or self.path.poses is None: return -1 position = self.pose.position waypoint = self.path.poses[i].pose.position return PurePursuit.distance(position.x, position.y, waypoint.x, waypoint.y) def find_nearest_waypoint_index(self) -> int: nearest_waypoint_index = -1 if self.path.poses is None: return nearest_waypoint_index closest_distance = float("inf") for i in range(len(self.path.poses) - 1): distance = self.get_distance_to_waypoint_index(i) if distance and distance < closest_distance: closest_distance = distance nearest_waypoint_index = i return nearest_waypoint_index def find_lookahead(self, nearest_waypoint_index, lookahead_distance) -> Point: if self.path.poses is None: return Point() i = nearest_waypoint_index while ( i < len(self.path.poses) and self.get_distance_to_waypoint_index(i) < lookahead_distance ): i += 1 return self.path.poses[i - 1].pose.position def get_goal(self) -> Point: if self.path.poses is None: return Point() poses = self.path.poses return poses[len(poses) - 1].pose.position def send_speed(self, linear_speed: float, angular_speed: float): """ Sends the speeds to the motors. :param linear_speed [float] [m/s] The forward linear speed. :param angular_speed [float] [rad/s] The angular speed for rotating around the body center. """ twist = Twist(linear=Vector3(x=linear_speed), angular=Vector3(z=angular_speed)) self.cmd_vel.publish(twist) def stop(self): self.send_speed(0, 0) def run(self): rospy.sleep(5) while not rospy.is_shutdown(): if self.pose is None: continue # If not enabled, do nothing if not self.enabled: continue # If no path, stop if self.path is None or not self.path.poses: self.stop() continue goal = self.get_goal() nearest_waypoint_index = self.find_nearest_waypoint_index() lookahead = self.find_lookahead( nearest_waypoint_index, self.LOOKAHEAD_DISTANCE ) self.lookahead_pub.publish( PointStamped(header=Header(frame_id="map"), point=lookahead) ) # Calculate alpha (angle between target and current position) position = self.pose.position orientation = self.pose.orientation roll, pitch, yaw = euler_from_quaternion( [orientation.x, orientation.y, orientation.z, orientation.w] ) x = position.x y = position.y dx = lookahead.x - x dy = lookahead.y - y self.alpha = float(np.arctan2(dy, dx) - yaw) if self.alpha > np.pi: self.alpha -= 2 * np.pi elif self.alpha < -np.pi: self.alpha += 2 * np.pi # If the lookahead is behind the robot, follow the path backwards self.reversed = abs(self.alpha) > np.pi / 2 # Calculate the lookahead distance and center of curvature lookahead_distance = PurePursuit.distance(x, y, lookahead.x, lookahead.y) radius_of_curvature = float(lookahead_distance / (2 * np.sin(self.alpha))) # Calculate drive speed drive_speed = (-1 if self.reversed else 1) * self.MAX_DRIVE_SPEED # Stop if at goal distance_to_goal = PurePursuit.distance(x, y, goal.x, goal.y) if distance_to_goal < self.DISTANCE_TOLERANCE: self.stop() continue # Calculate turn speed turn_speed = self.TURN_SPEED_KP * drive_speed / radius_of_curvature # Obstacle avoicance turn_speed += self.calculate_steering_adjustment() # Clamp turn speed turn_speed = max(-self.MAX_TURN_SPEED, min(self.MAX_TURN_SPEED, turn_speed)) # Slow down if close to obstacle if self.closest_distance < self.OBSTACLE_AVOIDANCE_MAX_SLOW_DOWN_DISTANCE: drive_speed *= float( np.interp( self.closest_distance, [ self.OBSTACLE_AVOIDANCE_MIN_SLOW_DOWN_DISTANCE, self.OBSTACLE_AVOIDANCE_MAX_SLOW_DOWN_DISTANCE, ], [self.OBSTACLE_AVOIDANCE_MIN_SLOW_DOWN_FACTOR, 1], ) ) # Send speed self.send_speed(drive_speed, turn_speed) if __name__ == "__main__": PurePursuit().run()