This project is an implementation of SLAM (Simultaneous Localization and Mapping). The robot uses a LIDAR sensor to scan its environment and search out new areas of the map to explore.
The video above covers SLAM, how a LIDAR sensor works, frontier exploration, pathfinding, pure pursuit, obstacle avoidance, and Monte Carlo localization.
This project was part, for one of my robotics courses, RBE 3002 - Unified Robotics IV.
Below are some code snippets for various ROS messages and nodes used to make this project.
ROS message for a frontier that keeps track of its size in grid cells and its centroid as a point.
uint32 size
geometry_msgs/Point centroid
ROS message for a list of frontiers.
lab4/Frontier[] frontiers
ROS node that constantly checks for new frontiers and publishes them using Expanding Wavefront Frontier Detection.
#!/usr/bin/env python3
from path_planner import PathPlanner
from nav_msgs.msg import OccupancyGrid
from lab4.msg import Frontier, FrontierList
class FrontierSearch:
def search(
mapdata: OccupancyGrid,
start: "tuple[int, int]",
include_frontier_cells: bool = False,
) -> "tuple[FrontierList, list[tuple[int, int]]]":
MIN_FRONTIER_SIZE = 8 # Number of cells
# Create queue for breadth-first search
queue = []
# Initialize dictionaries for keeping track of visited and frontier cells
visited = {}
is_frontier = {}
visited[start] = True
# Initialize list of frontiers
frontiers = []
# Initialize list of frontier cells
frontier_cells = []
while queue:
current = queue.pop(0)
for neighbor in PathPlanner.neighbors_of_4(mapdata, current):
neighbor_value = PathPlanner.get_cell_value(mapdata, neighbor)
if neighbor_value >= 0 and not neighbor in visited:
visited[neighbor] = True
elif FrontierSearch.is_new_frontier_cell(
mapdata, neighbor, is_frontier
# Mark as frontier
is_frontier[neighbor] = True
# Build new frontier
new_frontier, new_frontier_cells = (
if new_frontier.size >= MIN_FRONTIER_SIZE:
if include_frontier_cells:
return (FrontierList(frontiers=frontiers), frontier_cells)
def build_new_frontier(
mapdata: OccupancyGrid,
initial_cell: "tuple[int, int]",
is_frontier: dict,
include_frontier_cells: bool = False,
) -> "tuple[Frontier, list[tuple[int, int]]]":
# Initialize frontier fields
size = 1
centroid_x = initial_cell[0]
centroid_y = initial_cell[1]
# Create queue for breadth-first search
queue = []
# Initialize list of frontier cells
frontier_cells = []
# Breadth-first search for frontier cells
while queue:
current = queue.pop(0)
if include_frontier_cells:
for neighbor in PathPlanner.neighbors_of_8(mapdata, current):
if FrontierSearch.is_new_frontier_cell(mapdata, neighbor, is_frontier):
# Mark as frontier
is_frontier[neighbor] = True
# Update size and centroid
size += 1
centroid_x += neighbor[0]
centroid_y += neighbor[1]
# Calculate centroid by taking the average
centroid_x /= size
centroid_y /= size
# Make and return new frontier
centroid = PathPlanner.grid_to_world(
mapdata, (int(centroid_x), int(centroid_y))
return (
Frontier(size=size, centroid=centroid),
def is_new_frontier_cell(
mapdata: OccupancyGrid, cell: "tuple[int, int]", is_frontier: dict
) -> bool:
# Cell must be unknown and not already a frontier
if PathPlanner.get_cell_value(mapdata, cell) != -1 or cell in is_frontier:
return False
# Cell should have at least one connected cell that is free
for neighbor in PathPlanner.neighbors_of_4(mapdata, cell):
neighbor_value = PathPlanner.get_cell_value(mapdata, neighbor)
if neighbor_value >= 0 and neighbor_value < WALKABLE_THRESHOLD:
return True
return False
ROS node that listens for a FrontierList published by the FrontierSearch node and picks one to explore.
It uses a combination of A* cost and frontier size to decide which frontier is the best.
Once a frontier has been selected, this node publishs a path to reach the frontier using the PathPlanner class.
#!/usr/bin/env python3
import os
import rospy
import rospkg
import threading
import subprocess
import numpy as np
from typing import Union
from path_planner import PathPlanner
from frontier_search import FrontierSearch
from nav_msgs.msg import OccupancyGrid, Path, GridCells, Odometry
from geometry_msgs.msg import Pose, Point, Quaternion
from lab4.msg import FrontierList
from tf import TransformListener
from tf.transformations import euler_from_quaternion
class FrontierExploration:
def **init**(self):
Class constructor
# Set if in debug mode
self.is_in_debug_mode = (
rospy.has_param("~debug") and rospy.get_param("~debug") == "true"
# Publishers
self.pure_pursuit_pub = rospy.Publisher(
"/pure_pursuit/path", Path, queue_size=10
if self.is_in_debug_mode:
self.frontier_cells_pub = rospy.Publisher(
"/frontier_exploration/frontier_cells", GridCells, queue_size=10
self.start_pub = rospy.Publisher(
"/frontier_exploration/start", GridCells, queue_size=10
self.goal_pub = rospy.Publisher(
"/frontier_exploration/goal", GridCells, queue_size=10
self.cspace_pub = rospy.Publisher("/cspace", GridCells, queue_size=10)
self.cost_map_pub = rospy.Publisher(
"/cost_map", OccupancyGrid, queue_size=10
# Subscribers
rospy.Subscriber("/odom", Odometry, self.update_odometry)
rospy.Subscriber("/map", OccupancyGrid, self.update_map)
self.tf_listener = TransformListener()
self.lock = threading.Lock()
self.pose = None = None
self.no_path_found_counter = 0
self.no_frontiers_found_counter = 0
self.is_finished_exploring = False
def update_odometry(self, msg: "Union[Odometry, None]" = None):
Updates the current pose of the robot.
(trans, rot) = self.tf_listener.lookupTransform(
"/map", "/base_footprint", rospy.Time(0)
self.pose = Pose(
position=Point(x=trans[0], y=trans[1]),
orientation=Quaternion(x=rot[0], y=rot[1], z=rot[2], w=rot[3]),
def update_map(self, msg: OccupancyGrid):
Updates the current map.
This method is a callback bound to a Subscriber.
:param msg [OccupancyGrid] The current map information.
""" = msg
def save_map(self):
# Get the path of the current package
rospack = rospkg.RosPack()
package_path = rospack.get_path("lab4")
# Construct the path to the map
map_path = os.path.join(package_path, "map/map")
if not os.path.exists(os.path.dirname(map_path)):
# Run map_saver["rosrun", "map_server", "map_saver", "-f", map_path])
if self.pose is None:
rospy.logerr("Failed to get pose")
# Save the robot's position and orientation
position = self.pose.position
orientation = self.pose.orientation
roll, pitch, yaw = euler_from_quaternion(
[orientation.x, orientation.y, orientation.z, orientation.w]
with open(os.path.join(package_path, "map/pose.txt"), "w") as f:
f.write(f"{position.x} {position.y} {position.z} {yaw} {pitch} {roll}\n")
def get_top_frontiers(frontiers, n):
# Sort the frontiers by size in descending order
sorted_frontiers = sorted(
frontiers, key=lambda frontier: frontier.size, reverse=True
# Return the top n frontiers
return sorted_frontiers[:n]
def publish_cost_map(self, mapdata: OccupancyGrid, cost_map: np.ndarray):
# Create an OccupancyGrid message
grid = OccupancyGrid()
grid.header.stamp =
grid.header.frame_id = "map" = = cost_map.shape[1] = cost_map.shape[0] =
# Normalize the cost map to the range [0, 100] and convert it to integers
cost_map_normalized = (cost_map / np.max(cost_map) * 100).astype(np.int8)
# Flatten the cost map and convert it to a list = cost_map_normalized.flatten().tolist()
# Publish the OccupancyGrid message
def check_if_finished_exploring(self):
# Publish empty path to stop the robot
# If no frontiers or paths are found for a certain number of times, finish exploring
if (
self.no_frontiers_found_counter >= self.NUM_EXPLORE_FAILS_BEFORE_FINISH
or self.no_path_found_counter >= self.NUM_EXPLORE_FAILS_BEFORE_FINISH
rospy.loginfo("Done exploring!")
rospy.loginfo("Saved map")
self.is_finished_exploring = True
def explore_frontier(self, frontier_list: FrontierList):
# If finished exploring, no pose, no map, or no frontier list, return
if self.is_finished_exploring or self.pose is None or is None:
frontiers = frontier_list.frontiers
# If no frontiers are found, check if finished exploring
if not frontiers:
rospy.loginfo("No frontiers")
self.no_frontiers_found_counter += 1
self.no_frontiers_found_counter = 0
# Calculate the C-space
cspace, cspace_cells = PathPlanner.calc_cspace(, self.is_in_debug_mode)
# if cspace_cells is not None:
# self.cspace_pub.publish(cspace_cells)
# Calculate the cost map
cost_map = PathPlanner.calc_cost_map(
if self.is_in_debug_mode:
self.publish_cost_map(, cost_map)
# Get the start
start = PathPlanner.world_to_grid(, self.pose.position)
# Execute A* for every frontier
lowest_cost = float("inf")
best_path = None
# Check only the top frontiers in terms of size
top_frontiers = FrontierExploration.get_top_frontiers(
starts = []
goals = []
# Log how many frontiers are being explored
rospy.loginfo(f"Exploring {len(top_frontiers)} frontiers")
for frontier in top_frontiers:
# Get goal
goal = PathPlanner.world_to_grid(, frontier.centroid)
# Execute A*
path, a_star_cost, start, goal = PathPlanner.a_star(
cspace, cost_map, start, goal
# If in debug mode, append start and goal
if self.is_in_debug_mode:
if path is None or a_star_cost is None:
# Calculate cost
cost = (A_STAR_COST_WEIGHT * a_star_cost) + (
# Update best path
if cost < lowest_cost:
lowest_cost = cost
best_path = path
# If in debug mode, publish the start and goal
if self.is_in_debug_mode:
self.start_pub.publish(PathPlanner.get_grid_cells(, starts))
self.goal_pub.publish(PathPlanner.get_grid_cells(, goals))
# If a path was found, publish it
if best_path:
rospy.loginfo(f"Found best path with cost {lowest_cost}")
start = best_path[0]
path = PathPlanner.path_to_message(, best_path)
self.no_path_found_counter = 0
# If no path was found, check if finished exploring
rospy.loginfo("No paths found")
self.no_path_found_counter += 1
def run(self):
rate = rospy.Rate(20) # Hz
while not rospy.is_shutdown():
if self.pose is None or is None:
# Get the start position of the robot
start = PathPlanner.world_to_grid(, self.pose.position)
# Get frontiers
frontier_list, frontier_cells =, start, self.is_in_debug_mode
if frontier_list is None:
# Publish frontier cells if in debug mode
if self.is_in_debug_mode:
PathPlanner.get_grid_cells(, frontier_cells)
if **name** == "**main**":
Helper class used to perform an A* search on a grid cell map.
#!/usr/bin/env python3
import rospy
import math
import cv2
import numpy as np
from typing import Union
from std_msgs.msg import Header
from nav_msgs.msg import GridCells, OccupancyGrid, Path
from geometry_msgs.msg import Point, Quaternion, Pose, PoseStamped
from priority_queue import PriorityQueue
from tf.transformations import quaternion_from_euler
DIRECTIONS_OF_4 = [(-1, 0), (1, 0), (0, -1), (0, 1)]
DIRECTIONS_OF_8 = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
class PathPlanner:
def grid_to_index(mapdata: OccupancyGrid, p: "tuple[int, int]") -> int:
Returns the index corresponding to the given (x,y) coordinates in the occupancy grid.
:param p [(int, int)] The cell coordinate.
:return [int] The index.
return p[1] * + p[0]
def get_cell_value(mapdata: OccupancyGrid, p: "tuple[int, int]") -> int:
Returns the cell corresponding to the given (x,y) coordinates in the occupancy grid.
:param p [(int, int)] The cell coordinate.
:return [int] The cell.
return[PathPlanner.grid_to_index(mapdata, p)]
def euclidean_distance(
p1: "tuple[float, float]", p2: "tuple[float, float]"
) -> float:
Calculates the Euclidean distance between two points.
:param p1 [(float, float)] first point.
:param p2 [(float, float)] second point.
:return [float] distance.
return math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)
def grid_to_world(mapdata: OccupancyGrid, p: "tuple[int, int]") -> Point:
Transforms a cell coordinate in the occupancy grid into a world coordinate.
:param mapdata [OccupancyGrid] The map information.
:param p [(int, int)] The cell coordinate.
:return [Point] The position in the world.
x = (p[0] + 0.5) * +
y = (p[1] + 0.5) * +
return Point(x, y, 0)
def world_to_grid(mapdata: OccupancyGrid, wp: Point) -> "tuple[int, int]":
Transforms a world coordinate into a cell coordinate in the occupancy grid.
:param mapdata [OccupancyGrid] The map information.
:param wp [Point] The world coordinate.
:return [(int,int)] The cell position as a tuple.
x = int((wp.x - /
y = int((wp.y - /
return (x, y)
def path_to_poses(
mapdata: OccupancyGrid, path: "list[tuple[int, int]]"
) -> "list[PoseStamped]":
Converts the given path into a list of PoseStamped.
:param mapdata [OccupancyGrid] The map information.
:param path [[(int,int)]] The path as a list of tuples (cell coordinates).
:return [[PoseStamped]] The path as a list of PoseStamped (world coordinates).
poses = []
for i in range(len(path) - 1):
cell = path[i]
next_cell = path[i + 1]
if i != len(path) - 1:
angle_to_next = math.atan2(
next_cell[1] - cell[1], next_cell[0] - cell[0]
q = quaternion_from_euler(0, 0, angle_to_next)
position=PathPlanner.grid_to_world(mapdata, cell),
orientation=Quaternion(q[0], q[1], q[2], q[3]),
return poses
def is_cell_in_bounds(mapdata: OccupancyGrid, p: "tuple[int, int]") -> bool:
width =
height =
x = p[0]
y = p[1]
if x < 0 or x >= width:
return False
if y < 0 or y >= height:
return False
return True
def is_cell_walkable(mapdata: OccupancyGrid, p: "tuple[int, int]") -> bool:
A cell is walkable if all of these conditions are true:
1. It is within the boundaries of the grid;
2. It is free (not occupied by an obstacle)
:param mapdata [OccupancyGrid] The map information.
:param p [(int, int)] The coordinate in the grid.
:return [bool] True if the cell is walkable, False otherwise
if not PathPlanner.is_cell_in_bounds(mapdata, p):
return False
return PathPlanner.get_cell_value(mapdata, p) < WALKABLE_THRESHOLD
def neighbors(
mapdata: OccupancyGrid,
p: "tuple[int, int]",
directions: "list[tuple[int, int]]",
must_be_walkable: bool = True,
) -> "list[tuple[int, int]]":
Returns the neighbors cells of (x,y) in the occupancy grid given directions to check.
:param mapdata [OccupancyGrid] The map information.
:param p [(int, int)] The coordinate in the grid.
:param directions [[(int,int)]] A list of directions to check for neighbors.
:param must_be_walkable [bool] Whether or not the cells must be walkable
:return [[(int,int)]] A list of 4-neighbors.
neighbors = []
for direction in directions:
candidate = (p[0] + direction[0], p[1] + direction[1])
if (
must_be_walkable and PathPlanner.is_cell_walkable(mapdata, candidate)
) or (
not must_be_walkable
and PathPlanner.is_cell_in_bounds(mapdata, candidate)
return neighbors
def neighbors_of_4(
mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True
) -> "list[tuple[int, int]]":
return PathPlanner.neighbors(mapdata, p, DIRECTIONS_OF_4, must_be_walkable)
def neighbors_of_8(
mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True
) -> "list[tuple[int, int]]":
return PathPlanner.neighbors(mapdata, p, DIRECTIONS_OF_8, must_be_walkable)
def neighbors_and_distances(
mapdata: OccupancyGrid,
p: "tuple[int, int]",
directions: "list[tuple[int, int]]",
must_be_walkable: bool = True,
) -> "list[tuple[tuple[int, int], float]]":
Returns the neighbors cells of (x,y) in the occupancy grid given directions to check and their distances.
:param mapdata [OccupancyGrid] The map information.
:param p [(int, int)] The coordinate in the grid.
:param directions [[(int,int)]] A list of directions to check for neighbors.
:param must_be_walkable [bool] Whether or not the cells must be walkable
:return [[(int,int)]] A list of 4-neighbors.
neighbors = []
for direction in directions:
candidate = (p[0] + direction[0], p[1] + direction[1])
if not must_be_walkable or PathPlanner.is_cell_walkable(mapdata, candidate):
distance = PathPlanner.euclidean_distance(direction, (0, 0))
neighbors.append((candidate, distance))
return neighbors
def neighbors_and_distances_of_4(
mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True
) -> "list[tuple[tuple[int, int], float]]":
return PathPlanner.neighbors_and_distances(
mapdata, p, DIRECTIONS_OF_4, must_be_walkable
def neighbors_and_distances_of_8(
mapdata: OccupancyGrid, p: "tuple[int, int]", must_be_walkable: bool = True
) -> "list[tuple[tuple[int, int], float]]":
return PathPlanner.neighbors_and_distances(
mapdata, p, DIRECTIONS_OF_8, must_be_walkable
def get_grid_cells(
mapdata: OccupancyGrid, cells: "list[tuple[int, int]]"
) -> GridCells:
world_cells = []
for cell in cells:
world_cells.append(PathPlanner.grid_to_world(mapdata, cell))
resolution =
return GridCells(
def calc_cspace(
mapdata: OccupancyGrid, include_cells: bool
) -> "tuple[OccupancyGrid, Union[GridCells, None]]":
Calculates the C-Space, i.e., makes the obstacles in the map thicker.
Publishes the list of cells that were added to the original map.
:param mapdata [OccupancyGrid] The map data.
:return [OccupancyGrid] The C-Space.
PADDING = 5 # The number of cells around the obstacles
# Create numpy grid from mapdata
width =
height =
map = np.array(, height).astype(np.uint8)
# Get mask of unknown areas
unknown_area_mask = cv2.inRange(
map, 255, 255
) # -1 overflows to 255 when cast to uint8
kernel = np.ones((PADDING, PADDING), dtype=np.uint8)
unknown_area_mask = cv2.erode(unknown_area_mask, kernel, iterations=1)
# Change unknown areas to free space
map[map == 255] = 0
# Inflate the obstacles where necessary
kernel = np.ones((PADDING, PADDING), np.uint8)
obstacle_mask = cv2.dilate(map, kernel, iterations=1)
cspace_data = cv2.bitwise_or(obstacle_mask, unknown_area_mask)
cspace_data = np.array(cspace_data).reshape(width * height).tolist()
# Return the C-space
cspace = OccupancyGrid(
header=mapdata.header,, data=cspace_data
# Return the cells that were added to the original map
cspace_cells = None
if include_cells:
cells = []
# Find the indices of the obstacle cells
obstacle_indices = np.where(obstacle_mask > 0)
# Convert the indices to cell coordinates and append them to cells
for y, x in zip(*obstacle_indices):
cells.append((x, y))
cspace_cells = PathPlanner.get_grid_cells(mapdata, cells)
return (cspace, cspace_cells)
def get_cost_map_value(cost_map: np.ndarray, p: "tuple[int, int]") -> int:
return cost_map[p[1]][p[0]]
def show_map(name: str, map: np.ndarray):
normalized = cv2.normalize(
map, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U
cv2.imshow(name, normalized)
def calc_cost_map(mapdata: OccupancyGrid) -> np.ndarray:
rospy.loginfo("Calculating cost map")
# Create numpy array from mapdata
width =
height =
map = np.array(, width).astype(np.uint8)
map[map == 255] = 100
# Iteratively dilate the walls until no changes are made
cost_map = np.zeros_like(map)
dilated_map = map.copy()
iterations = 0
kernel = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], np.uint8)
while np.any(dilated_map == 0):
# Increase iterations
iterations += 1
# Dilate the map
next_dilated_map = cv2.dilate(dilated_map, kernel, iterations=1)
# Get the difference between the next dilated map and the current one to get an outline
difference = next_dilated_map - dilated_map
# Assign all non-zero cells in the outline their cost
difference[difference > 0] = iterations
# Add the outline to the cost map
cost_map = cv2.bitwise_or(cost_map, difference)
# Update dilated map
dilated_map = next_dilated_map
# PathPlanner.show_map("wall_dilation", cost_map)
# Turn the cost map into a mask of only the middle of the hallways
# All cells that are not in the middle of the hallways will have a cost of 0
# Cells that are in the middle of the hallways will have a cost of 1
cost_map = PathPlanner.create_hallway_mask(mapdata, cost_map, iterations // 4)
# PathPlanner.show_map("hallway_mask", cost_map)
# Iteratively dilate the hallway mask until no changes are made
dilated_map = cost_map.copy()
cost = 1
for i in range(iterations):
# Increase cost
cost += 1
# Dilate the map
next_dilated_map = cv2.dilate(dilated_map, kernel, iterations=1)
# Get the difference between the next dilated map and the current one to get an outline
difference = next_dilated_map - dilated_map
# Assign all non-zero cells in the outline their cost
difference[difference > 0] = cost
# Add the outline to the cost map
cost_map = cv2.bitwise_or(cost_map, difference)
# Update dilated map
dilated_map = next_dilated_map
# Subtract 1 from all non-zero values in cost map
cost_map[cost_map > 0] -= 1
# PathPlanner.show_map("cost_map", cost_map)
return cost_map
def create_hallway_mask(
mapdata: OccupancyGrid, cost_map: np.ndarray, threshold: int
) -> np.ndarray:
Create a mask of the cost_map that only contains cells that are hallway cells.
# Initialize the mask with the same shape as the cost_map and all values set to False
mask = np.zeros_like(cost_map, dtype=bool)
# Get the indices of the non-zero cells in the cost_map
non_zero_indices = np.nonzero(cost_map)
# Iterate over the non-zero cells in the cost_map
for y, x in zip(*non_zero_indices):
# If the cell is a hallway cell, set the corresponding cell in the mask to True
if PathPlanner.is_hallway_cell(mapdata, cost_map, (x, y), threshold):
mask[y][x] = 1
return mask.astype(np.uint8)
def is_hallway_cell(
mapdata: OccupancyGrid,
cost_map: np.ndarray,
p: "tuple[int, int]",
threshold: int,
) -> bool:
Determine whether a cell is a "hallway cell" meaning it has a cost
greater than or equal to all of its neighbors
cost_map_value = PathPlanner.get_cost_map_value(cost_map, p)
for neighbor in PathPlanner.neighbors_of_8(mapdata, p, False):
neighbor_cost_map_value = PathPlanner.get_cost_map_value(cost_map, neighbor)
if (
neighbor_cost_map_value < threshold
or neighbor_cost_map_value > cost_map_value
return False
return True
def get_first_walkable_neighbor(
mapdata, start: "tuple[int, int]"
) -> "tuple[int, int]":
Helper function for a_star that gets the first walkable neighbor from
the start cell in case it is not already walkable
:param mapdata [OccupancyGrid] The map data.
:param padding [start] The start cell.
:return [(int, int)] The first walkable neighbor.
# Create queue for breadth-first search
queue = []
# Initialize dictionary for keeping track of visited cells
visited = {}
while queue:
current = queue.pop(0)
if PathPlanner.is_cell_walkable(mapdata, current):
return current
for neighbor in PathPlanner.neighbors_of_4(mapdata, current, False):
visited[neighbor] = True
# If nothing found, just return original start cell
return start
def a_star(
mapdata: OccupancyGrid,
cost_map: np.ndarray,
start: "tuple[int, int]",
goal: "tuple[int, int]",
) -> "tuple[Union[list[tuple[int, int]], None], Union[float, None], tuple[int, int], tuple[int, int]]":
# If the start cell is not walkable, get the first walkable neighbor instead
if not PathPlanner.is_cell_walkable(mapdata, start):
start = PathPlanner.get_first_walkable_neighbor(mapdata, start)
# Likewise, if the goal cell is not walkable, get the first walkable neighbor instead
if not PathPlanner.is_cell_walkable(mapdata, goal):
goal = PathPlanner.get_first_walkable_neighbor(mapdata, goal)
pq = PriorityQueue()
pq.put(start, 0)
cost_so_far = {}
distance_cost_so_far = {}
cost_so_far[start] = 0
distance_cost_so_far[start] = 0
came_from = {}
came_from[start] = None
while not pq.empty():
current = pq.get()
if current == goal:
for neighbor, distance in PathPlanner.neighbors_and_distances_of_8(
mapdata, current
added_cost = (
* PathPlanner.get_cost_map_value(cost_map, neighbor)
new_cost = cost_so_far[current] + added_cost
if not neighbor in cost_so_far or new_cost < cost_so_far[neighbor]:
cost_so_far[neighbor] = new_cost
distance_cost_so_far[neighbor] = (
distance_cost_so_far[current] + distance
priority = new_cost + PathPlanner.euclidean_distance(neighbor, goal)
pq.put(neighbor, priority)
came_from[neighbor] = current
path = []
cell = goal
while cell:
path.insert(0, cell)
if cell in came_from:
cell = came_from[cell]
return (None, None, start, goal)
# Prevent paths that are too short
if len(path) < MIN_PATH_LENGTH:
return (None, None, start, goal)
# Truncate the last few poses of the path
path = path[:-POSES_TO_TRUNCATE]
return (path, distance_cost_so_far[goal], start, goal)
def path_to_message(mapdata: OccupancyGrid, path: "list[tuple[int, int]]") -> Path:
Takes a path on the grid and returns a Path message.
:param path [[(int,int)]] The path on the grid (a list of tuples)
:return [Path] A Path message (the coordinates are expressed in the world)
poses = PathPlanner.path_to_poses(mapdata, path)
return Path(header=Header(frame_id="map"), poses=poses)
ROS node that listens for a Path from the FrontierExploration node and follows it using Pure Pursuit.
#!/usr/bin/env python3
import math
import rospy
import numpy as np
from path_planner import PathPlanner
from std_msgs.msg import Header, Bool
from nav_msgs.msg import Path, Odometry, GridCells, OccupancyGrid
from geometry_msgs.msg import Point, PointStamped, Twist, Vector3, Pose, Quaternion
from tf.transformations import euler_from_quaternion
from tf import TransformListener
class PurePursuit:
def __init__(self):
Class constructor
# Set if in debug mode
self.is_in_debug_mode = (
rospy.has_param("~debug") and rospy.get_param("~debug") == "true"
# Publishers
self.cmd_vel = rospy.Publisher("/cmd_vel", Twist, queue_size=10)
self.lookahead_pub = rospy.Publisher(
"/pure_pursuit/lookahead", PointStamped, queue_size=10
if self.is_in_debug_mode:
self.fov_cells_pub = rospy.Publisher(
"/pure_pursuit/fov_cells", GridCells, queue_size=100
self.close_wall_cells_pub = rospy.Publisher(
"/pure_pursuit/close_wall_cells", GridCells, queue_size=100
# Subscribers
rospy.Subscriber("/odom", Odometry, self.update_odometry)
rospy.Subscriber("/map", OccupancyGrid, self.update_map)
rospy.Subscriber("/pure_pursuit/path", Path, self.update_path)
rospy.Subscriber("/pure_pursuit/enabled", Bool, self.update_enabled)
# Pure pursuit parameters
self.LOOKAHEAD_DISTANCE = 0.18 # m
self.WHEEL_BASE = 0.16 # m
self.MAX_DRIVE_SPEED = 0.1 # m/s
self.MAX_TURN_SPEED = 1.25 # rad/s
self.TURN_SPEED_KP = 1.25
# Obstacle avoidance parameters
self.FOV = 200 # degrees
self.FOV_DISTANCE = 25 # Number of grid cells
self.FOV_DEADZONE = 80 # degrees
self.SMALL_FOV = 300 # degrees
self.SMALL_FOV_DISTANCE = 10 # Number of grid cells
self.tf_listener = TransformListener()
self.pose = None = None
self.path = Path()
self.alpha = 0
self.enabled = True
self.reversed = False
self.closest_distance = float("inf")
def update_odometry(self, msg: Odometry):
Updates the current pose of the robot.
(trans, rot) = self.tf_listener.lookupTransform(
"/map", "/base_footprint", rospy.Time(0)
self.pose = Pose(
position=Point(x=trans[0], y=trans[1]),
orientation=Quaternion(x=rot[0], y=rot[1], z=rot[2], w=rot[3]),
def update_map(self, msg: OccupancyGrid):
Updates the current map.
This method is a callback bound to a Subscriber.
:param msg [OccupancyGrid] The current map information.
""" = msg
def update_path(self, msg: Path):
self.path = msg
def update_enabled(self, msg: Bool):
self.enabled =
def calculate_steering_adjustment(self) -> float:
if self.pose is None or is None:
return 0
orientation = self.pose.orientation
roll, pitch, yaw = euler_from_quaternion(
[orientation.x, orientation.y, orientation.z, orientation.w]
yaw = float(np.rad2deg(yaw))
# Get the grid cell of the robot
robot_cell = PathPlanner.world_to_grid(, self.pose.position)
weighted_sum_of_angles = 0
total_weight = 0
self.closest_distance = float("inf")
# Get all wall cells near the robot within the distance
fov_cells = []
wall_cells = []
wall_cell_count = 0
for dx in range(-self.FOV_DISTANCE, self.FOV_DISTANCE + 1):
for dy in range(-self.FOV_DISTANCE, self.FOV_DISTANCE + 1):
cell = (robot_cell[0] + dx, robot_cell[1] + dy)
distance = PathPlanner.euclidean_distance(robot_cell, cell)
# If the cell is out of bounds, ignore it
if not PathPlanner.is_cell_in_bounds(, cell):
is_wall = not PathPlanner.is_cell_walkable(, cell)
if is_wall and distance < self.closest_distance:
self.closest_distance = distance
# Calculate the angle of the cell relative to the robot
angle = float(np.rad2deg(np.arctan2(dy, dx))) - yaw
# If reversed, add 180 to the angle
if self.reversed:
angle += 180
# Keep angle in the range of -180 to 180
if angle < -180:
angle += 360
elif angle > 180:
angle -= 360
# Ignore scans that are outside the field of view
is_in_fov = (
distance <= self.FOV_DISTANCE
and angle >= -self.FOV / 2
and angle <= self.FOV / 2
and not abs(angle) < self.FOV_DEADZONE / 2
is_in_small_fov = (
distance <= self.SMALL_FOV_DISTANCE
and angle >= -self.SMALL_FOV / 2
and angle <= self.SMALL_FOV / 2
if not is_in_fov and not is_in_small_fov:
# If in debug mode, add the cell to the field of view
if self.is_in_debug_mode:
# If cell is not a wall, ignore it
if not is_wall:
weight = 1 / (distance**2) if distance != 0 else 0
weighted_sum_of_angles += weight * angle
total_weight += weight
wall_cell_count += 1
if self.is_in_debug_mode:
# If in debug mode, publish the wall cells
if self.is_in_debug_mode:
self.fov_cells_pub.publish(PathPlanner.get_grid_cells(, fov_cells))
PathPlanner.get_grid_cells(, wall_cells)
if total_weight == 0:
return 0
# Calculate the average angle (weighted sum of angles divided by total weight)
average_angle = weighted_sum_of_angles / total_weight
# Calculate the steering adjustment based on the average angle
steering_adjustment = (
-self.OBSTACLE_AVOIDANCE_GAIN * average_angle / wall_cell_count
return steering_adjustment
def distance(x0, y0, x1, y1) -> float:
return math.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2)
def get_distance_to_waypoint_index(self, i: int) -> float:
if self.pose is None or self.path.poses is None:
return -1
position = self.pose.position
waypoint = self.path.poses[i].pose.position
return PurePursuit.distance(position.x, position.y, waypoint.x, waypoint.y)
def find_nearest_waypoint_index(self) -> int:
nearest_waypoint_index = -1
if self.path.poses is None:
return nearest_waypoint_index
closest_distance = float("inf")
for i in range(len(self.path.poses) - 1):
distance = self.get_distance_to_waypoint_index(i)
if distance and distance < closest_distance:
closest_distance = distance
nearest_waypoint_index = i
return nearest_waypoint_index
def find_lookahead(self, nearest_waypoint_index, lookahead_distance) -> Point:
if self.path.poses is None:
return Point()
i = nearest_waypoint_index
while (
i < len(self.path.poses)
and self.get_distance_to_waypoint_index(i) < lookahead_distance
i += 1
return self.path.poses[i - 1].pose.position
def get_goal(self) -> Point:
if self.path.poses is None:
return Point()
poses = self.path.poses
return poses[len(poses) - 1].pose.position
def send_speed(self, linear_speed: float, angular_speed: float):
Sends the speeds to the motors.
:param linear_speed [float] [m/s] The forward linear speed.
:param angular_speed [float] [rad/s] The angular speed for rotating around the body center.
twist = Twist(linear=Vector3(x=linear_speed), angular=Vector3(z=angular_speed))
def stop(self):
self.send_speed(0, 0)
def run(self):
while not rospy.is_shutdown():
if self.pose is None:
# If not enabled, do nothing
if not self.enabled:
# If no path, stop
if self.path is None or not self.path.poses:
goal = self.get_goal()
nearest_waypoint_index = self.find_nearest_waypoint_index()
lookahead = self.find_lookahead(
nearest_waypoint_index, self.LOOKAHEAD_DISTANCE
PointStamped(header=Header(frame_id="map"), point=lookahead)
# Calculate alpha (angle between target and current position)
position = self.pose.position
orientation = self.pose.orientation
roll, pitch, yaw = euler_from_quaternion(
[orientation.x, orientation.y, orientation.z, orientation.w]
x = position.x
y = position.y
dx = lookahead.x - x
dy = lookahead.y - y
self.alpha = float(np.arctan2(dy, dx) - yaw)
if self.alpha > np.pi:
self.alpha -= 2 * np.pi
elif self.alpha < -np.pi:
self.alpha += 2 * np.pi
# If the lookahead is behind the robot, follow the path backwards
self.reversed = abs(self.alpha) > np.pi / 2
# Calculate the lookahead distance and center of curvature
lookahead_distance = PurePursuit.distance(x, y, lookahead.x, lookahead.y)
radius_of_curvature = float(lookahead_distance / (2 * np.sin(self.alpha)))
# Calculate drive speed
drive_speed = (-1 if self.reversed else 1) * self.MAX_DRIVE_SPEED
# Stop if at goal
distance_to_goal = PurePursuit.distance(x, y, goal.x, goal.y)
if distance_to_goal < self.DISTANCE_TOLERANCE:
# Calculate turn speed
turn_speed = self.TURN_SPEED_KP * drive_speed / radius_of_curvature
# Obstacle avoicance
turn_speed += self.calculate_steering_adjustment()
# Clamp turn speed
turn_speed = max(-self.MAX_TURN_SPEED, min(self.MAX_TURN_SPEED, turn_speed))
# Slow down if close to obstacle
if self.closest_distance < self.OBSTACLE_AVOIDANCE_MAX_SLOW_DOWN_DISTANCE:
drive_speed *= float(
# Send speed
self.send_speed(drive_speed, turn_speed)
if __name__ == "__main__":