-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstrategy.py
98 lines (78 loc) · 3.34 KB
/
strategy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from abc import ABC, abstractmethod, abstractclassmethod
from collections import defaultdict
from utils import PriorityQueue
class Strategy(ABC):
@staticmethod
def reconstract_path(came_from, current):
total_path = [current]
while current in came_from.keys():
current = came_from[current]
total_path.append(current)
total_path.reverse()
return total_path
@abstractclassmethod
def solve(cls, start):
pass
class AStar(Strategy):
@classmethod
def solve(cls, start, computation_progress = None, analysis_graph = None,
heuristic = None, distance = lambda state1, state2: 1):
if not start.is_solvable():
return None
open_set = PriorityQueue()
open_set.add_task(start, priority=heuristic(start))
closed_set = set()
came_from = {}
g_score = defaultdict(lambda: float('inf'))
g_score[start] = 0
f_score = defaultdict(lambda: float('inf'))
f_score[start] = heuristic(start)
current = open_set.pop_task()
while current:
closed_set.add(current)
if len(closed_set) %1000 == 0:
start.update_computation_progress(computation_progress, len(closed_set))
if current.is_goal():
result = cls.reconstract_path(came_from, current)
if analysis_graph:
analysis_graph.update(len(result), len(closed_set))
return result
for neighbor in current.next_states():
#if neighbor in closed_set:
# continue
tentative_g_score = g_score[current] + distance(current, neighbor)
if tentative_g_score < g_score[neighbor]:
came_from[neighbor] = current
g_score[neighbor] = tentative_g_score
f_score[neighbor] = g_score[neighbor] + heuristic(neighbor)
open_set.add_task(neighbor, priority=f_score[neighbor])
current = open_set.pop_task()
return None
class Dijkstra(Strategy):
@classmethod
def solve(cls, start, computation_progress = None,
distance = lambda state1, state2: 1):
distances = defaultdict(lambda: float('inf'))
distances[start] = 0
open_set = PriorityQueue()
open_set.add_task(start, priority=distances[start])
closed_set = set()
came_from = {}
current = open_set.pop_task()
while current:
closed_set.add(current)
print(len(closed_set))
if len(closed_set) %1000 == 0:
start.update_computation_progress(computation_progress, len(closed_set))
if current.is_goal():
return cls.reconstract_path(came_from, current)
for neighbor in current.next_states():
if neighbor in closed_set:
continue
new_distance = distances[current] + distance(current, neighbor)
if new_distance < distances[neighbor] :
came_from[neighbor] = current
distances[neighbor] = new_distance
open_set.add_task(neighbor, priority=new_distance)
current = open_set.pop_task()
return None