This will be a relatively short post, following on specifically from Part 2 and Part 3, where we moved away from a static grid and instead had coordinates and available neighbours.
Through playing around with the code and algorithm, I found various things that I wanted to change, mainly for the purpose of optimisation. With a small 'map' it appears to find paths very fast, but as you increase the search space the search time can grow dramatically, so we need ways to make it as fast as possible.
1. Use a dictionary rather than looking up in the pandas DataFrame.
Instead of using the available neighbours function we can pre-process our neighbours into a dictionary with the origin coordinate as the key and a list of neighbours as the value. We can then call these neighbours directly in the astar function using:
neighbours = coord_dict.get(current)
For a large map, this will make an immense difference to the search speed. I found a speed-up somewhere in the range of 100x for a use case I was investigating, but it will obviously depend on map size.
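As a rough sketch of the pre-processing step, the snippet below builds such a dictionary from a list of (origin, neighbour) pairs. The `edges` list and the `build_coord_dict` helper are hypothetical names used for illustration only; in practice the pairs would come from the rows of the DataFrame used in the earlier parts.

```python
from collections import defaultdict


def build_coord_dict(edges):
    """Group neighbour coordinates under their origin coordinate."""
    grouped = defaultdict(list)
    for origin, neighbour in edges:
        grouped[origin].append(neighbour)
    # Convert to a plain dict so a later lookup with [] cannot silently add keys.
    return dict(grouped)


# Hypothetical sample data: each pair is (origin, reachable neighbour).
edges = [
    ((0, 0, 0), (1, 0, 0)),
    ((0, 0, 0), (0, 1, 0)),
    ((1, 0, 0), (1, 1, 0)),
]

coord_dict = build_coord_dict(edges)
neighbours = coord_dict.get((0, 0, 0))  # one dictionary lookup per node
```

The dictionary is built once up front, so each lookup inside the search loop is a single hash lookup rather than a filter over the whole table.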
2. This one is a minor change - but in the heuristic function...
def heuristic(a, b):
    return np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2 + (b[2] - a[2]) ** 2)
...we're taking the square root. That is correct if you want to know the precise distance, but all we want is a way to compare neighbours by how far they are from the destination, and squared distances rank them in the same order. As this calculation happens for every neighbour, even small time costs add up, so we can drop the square root.
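To illustrate the point, here is a small sketch comparing the two versions. The function names and sample coordinates are made up for the example; it only shows that ranking candidates by squared distance gives the same order as ranking them by true distance.

```python
import math


def heuristic_sqrt(a, b):
    # True straight-line distance between two 3D points.
    return math.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2 + (b[2] - a[2]) ** 2)


def heuristic_squared(a, b):
    # Same calculation without the square root.
    return (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2 + (b[2] - a[2]) ** 2


# Hypothetical goal and candidate neighbours.
goal = (10, 10, 0)
candidates = [(3, 4, 0), (9, 9, 0), (0, 0, 0), (10, 7, 0)]

by_distance = sorted(candidates, key=lambda c: heuristic_sqrt(c, goal))
by_squared = sorted(candidates, key=lambda c: heuristic_squared(c, goal))
```

One caveat worth keeping in mind: the ordering is only guaranteed to match when comparing distances to the same point. Once values are summed along a path, as the g-scores are, squared and true distances can rank routes differently, so this is another place where speed is traded against a guaranteed shortest path.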
3. If the neighbour is in the closed list, skip
In the logic used in previous parts, we only skipped a neighbour if it was in the closed list and its new (tentative) g-score was no better than the g-score already recorded for it. I've found this means we end up checking nodes multiple times, which can be unnecessary and very inefficient. This is a trade-off, however: the original logic is slower but allows the algorithm to reconsider a possible path, and in general it will always find the shortest path. The new logic is much faster and will find a very similar path, but not always the absolute shortest.
If you want to implement the change, this code:
tentative_g_score = gscore[current] + heuristic(current, neighbour)
if neighbour in close_set and tentative_g_score >= gscore.get(neighbour, 0):
    continue
if tentative_g_score < gscore.get(neighbour, 0) or neighbour not in [i[1] for i in oheap]:
    came_from[neighbour] = current
    gscore[neighbour] = tentative_g_score
    fscore[neighbour] = tentative_g_score + heuristic(neighbour, goal)
    heapq.heappush(oheap, (fscore[neighbour], neighbour))
Becomes:
if neighbour in close_set:  # already fully explored, so skip it outright
    continue
if (gscore[current] + heuristic(current, neighbour)) < gscore.get(neighbour, 0) or neighbour not in [i[1] for i in open_heap]:
    came_from[neighbour] = current
    gscore[neighbour] = gscore[current] + heuristic(current, neighbour)
    fscore[neighbour] = gscore[neighbour] + heuristic(neighbour, goal)
    heapq.heappush(open_heap, (fscore[neighbour], neighbour))
This should have a large impact on run time.
4. Another minor one, but I was using a line of code that didn't need to be there:
for x, y, z in neighbours:
    neighbour = x, y, z
Can just be:
for neighbour in neighbours:
FULL CODE:
import heapq

def heuristic(current, destination):
    # squared straight-line distance (no square root, see point 2)
    return (destination[0] - current[0]) ** 2 + (destination[1] - current[1]) ** 2 + (destination[2] - current[2]) ** 2

def astar(start, goal):
    # coord_dict is the pre-processed neighbour dictionary from point 1
    close_set = set()  # closed list
    came_from = {}  # parent node dict
    gscore = {start: 0}  # g-score dict
    fscore = {start: heuristic(start, goal)}  # f-score dict
    open_heap = []  # open list
    heapq.heappush(open_heap, (fscore[start], start))  # put start node into open list
    while open_heap:
        current = heapq.heappop(open_heap)[1]  # extract current node
        if current == goal:  # if the current is the goal, compile the path back to the origin (from the parent dict)
            data = []
            while current in came_from:
                data.append(current)
                current = came_from[current]
            return data  # runs from the goal back towards the start (start itself is not included)
        neighbours = coord_dict.get(current)  # find available neighbours
        if neighbours is None:
            neighbours = []
        close_set.add(current)  # add current node to closed list
        for neighbour in neighbours:  # iterate through neighbours
            if neighbour in close_set:  # if neighbour is in closed set, don't consider
                continue
            if (gscore[current] + heuristic(current, neighbour)) < gscore.get(neighbour, 0) or neighbour not in [i[1] for i in open_heap]:  # if neighbour has lower g score OR isn't on the open list already
                came_from[neighbour] = current  # add to parent dict
                gscore[neighbour] = gscore[current] + heuristic(current, neighbour)  # calculate g score
                fscore[neighbour] = gscore[neighbour] + heuristic(neighbour, goal)  # calculate f score (g+h)
                heapq.heappush(open_heap, (fscore[neighbour], neighbour))  # add to open list
    return []  # no path found
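Note that, as written, the function returns the path from the goal back towards the start and leaves the start node out. If you need it in travel order, a small wrapper along these lines may help. `to_travel_order` is a hypothetical helper, not something from the earlier parts, and the sample path is made up.

```python
def to_travel_order(path, start):
    """Turn astar's goal-to-start list into a start-to-goal list.

    Assumes `path` has the shape astar returns: goal first, start omitted.
    An empty list (no path found, or start equal to goal) stays empty.
    """
    if not path:
        return []
    return [start] + path[::-1]


# Hypothetical astar result for a route (0,0,0) -> (1,0,0) -> (1,1,0).
raw_path = [(1, 1, 0), (1, 0, 0)]
route = to_travel_order(raw_path, start=(0, 0, 0))
```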