-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
338 lines (299 loc) · 11.7 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#Copyright 2020 ModEngineer
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import warnings
from copy import deepcopy
class GraphError(Exception):
    """Exception type used by this module to report graph-related failures."""
class Auto:
    """Sentinel, similar in spirit to None, meaning "choose the value automatically"."""

    def __repr__(self):
        return "Auto"

    # str() and repr() render the sentinel identically.
    __str__ = __repr__
class ExitContainer():
    """Error container class that can be used to raise the error or to get an exit code.

    0 is a generic, non-error code, negative codes should represent errors,
    and positive codes should represent non-error program states.

    Attributes:
        exception: the stored exception instance, or None when there is none.
        code: the exit code. When the ``code`` argument is left as Auto it
            resolves to 0 without an exception and to -1 with one.
    """

    class ExitContainerWarning(Warning):
        """Warning issued when an integer code's sign disagrees with the stored exception."""
        pass

    def __init__(self, exception=None, code=Auto()):
        """Store the exception and resolve the exit code.

        Args:
            exception: exception instance to carry, or None.
            code: explicit exit code, or Auto (instance or class) to derive
                it from whether an exception was given.
        """
        self.exception = exception
        # Identity checks against None: an exception type with a custom
        # __eq__ must never be mistaken for "no exception".
        if type(code) is Auto or code is Auto:
            self.code = 0 if exception is None else -0x1
        else:
            self.code = code
        # Only plain ints are sanity-checked; other code types are stored as-is.
        if type(code) is int:
            if code >= 0:
                if exception is not None:
                    warnings.warn(
                        "ExitContainer with an exception should not have a positive code.",
                        category=ExitContainer.ExitContainerWarning)
            elif exception is None:
                warnings.warn(
                    "ExitContainer without an exception should not have a negative code.",
                    category=ExitContainer.ExitContainerWarning)

    def raise_exception(self):
        """A function that raises the ExitContainer's exception if it has an exception stored"""
        if self.exception is not None:
            raise self.exception
#Graph validation function. Checks graph connections, endpoint count, and nonexistent nodes listed as connections.
def validate_graph(graphData):
    """Validate an adjacency mapping and report the outcome as an ExitContainer.

    Checks, in order: every adjacency value is a list or tuple (-0x2), every
    connection is listed by both of its nodes (-0x3), every listed neighbour
    exists as a node (-0x4), at most two nodes have odd degree (-0x5), and
    the graph is connected (-0x6). Any other exception raised during the
    checks is returned in a container with an automatic code. A graph that
    passes everything yields a default ExitContainer.
    """
    try:
        oddDegreeNodes = 0
        for node in graphData:
            connections = graphData[node]
            if type(connections) not in [list, tuple]:
                message = (
                    "Type " + repr(type(connections)) +
                    " is not a list or a tuple. The graphData argument requires that connections are stored in lists or tuples."
                )
                return ExitContainer(TypeError(message), -0x2)
            try:
                for neighbour in connections:
                    if node not in graphData[neighbour]:
                        message = (
                            "Nodes " + repr(node) + " and " +
                            repr(neighbour) +
                            " do not list each other in their connections."
                        )
                        return ExitContainer(GraphError(message), -0x3)
            except KeyError as exc:
                # Raise and immediately catch so the GraphError carries the
                # KeyError as its cause and has a traceback attached.
                try:
                    raise GraphError("Node " + repr(exc.args[0]) +
                                     " does not exist.") from exc
                except GraphError as chained:
                    return ExitContainer(chained, -0x4)
            if len(connections) % 2 == 1:
                oddDegreeNodes += 1
        if oddDegreeNodes > 2:
            return ExitContainer(
                GraphError(
                    "Graph cannot contain more than 2 nodes of odd degree."
                ), -0x5)
        if not is_connected(graphData):
            return ExitContainer(GraphError("Graph is not connected."), -0x6)
        return ExitContainer()
    except Exception as unexpected:
        return ExitContainer(unexpected)
#Node remover
def remove_node(node, graphData):
    """Remove ``node`` and every edge touching it from ``graphData`` in place.

    Args:
        node: key of the node to delete.
        graphData: adjacency mapping of node -> list of neighbours.

    Raises:
        KeyError: if ``node`` or one of its listed neighbours is not a key.
        ValueError: if a neighbour does not list ``node`` back.
    """
    # Iterate over a snapshot: with a self-loop, graphData[node] is itself
    # one of the lists being mutated, and removing from a list while
    # iterating it skips the following neighbour.
    for connection in list(graphData[node]):
        if connection == node:
            # Self-loop entries vanish together with the node's own list.
            continue
        graphData[connection].remove(node)
    del graphData[node]
#Edge remover
def remove_edge(node1, node2, graphData):
    """Delete the edge between node1 and node2 from graphData in place.

    One occurrence of each node is removed from the other's adjacency list,
    node1's list first.
    """
    for owner, neighbour in ((node1, node2), (node2, node1)):
        graphData[owner].remove(neighbour)
#Node creator
def add_node(node, graphData):
    """Insert node into graphData with an empty connection list.

    Raises GraphError when the node is already present.
    """
    if node not in graphData.keys():
        graphData[node] = []
        return
    raise GraphError("Node already in graph")
#Edge creator
def add_edge(node1, node2, graphData):
    """Connect node1 and node2 in graphData, creating missing nodes first.

    Each node is appended to the other's connection list only if it is not
    already listed there, so repeating the call leaves the graph unchanged.
    """
    for endpoint in (node1, node2):
        if endpoint not in graphData.keys():
            add_node(endpoint, graphData)
    # node2's list is updated before node1's.
    for owner, neighbour in ((node2, node1), (node1, node2)):
        if neighbour not in graphData[owner]:
            graphData[owner].append(neighbour)
def dfs(graphData, vertex=Auto()):
    """Iterative depth-first traversal of ``graphData``.

    Args:
        graphData: adjacency mapping of node -> iterable of neighbours.
        vertex: node to start from. When left as Auto, the first key of
            ``graphData`` is used.

    Returns:
        List of the nodes reached from the start, in the order they were
        first visited.

    Raises:
        IndexError: if ``vertex`` is Auto and ``graphData`` is empty.
        KeyError: if the start node or a reached neighbour is not a key.
    """
    if type(vertex) is Auto or vertex is Auto:
        vertex = list(graphData.keys())[0]
    stack = [vertex]
    discovered = []
    # Set kept alongside the ordered list so each membership test is O(1)
    # instead of a scan of everything discovered so far.
    seen = set()
    while stack:
        vertex = stack.pop()
        if vertex in seen:
            continue
        seen.add(vertex)
        discovered.append(vertex)
        stack.extend(graphData[vertex])
    return discovered
def is_connected(graphData):
    """Return True when a DFS from an arbitrary node reaches every node of graphData."""
    # dfs picks its own start node and returns the list of nodes it reached.
    reached = dfs(graphData)
    return all(key in reached for key in graphData.keys())
#Bridge-finding.
def find_bridges(graphData):
    """Return the bridges of graphData as a list of (start, end) tuples.

    Each edge is removed from a deep copy of the graph in turn; the edge is a
    bridge when that copy is no longer connected. An edge is tested once,
    regardless of the direction in which it is first encountered.

    Raises GraphError when graphData is not connected to begin with.
    """
    if not is_connected(graphData):
        raise GraphError("Unconnected graph cannot be tested for bridges")
    checkedEdges = []
    foundBridges = []
    for start, neighbours in graphData.items():
        for end in neighbours:
            alreadyChecked = ((start, end) in checkedEdges
                              or (end, start) in checkedEdges)
            if not alreadyChecked:
                checkedEdges.append((start, end))
                # Work on a throwaway copy so the caller's graph is untouched.
                trialGraph = deepcopy(graphData)
                remove_edge(start, end, trialGraph)
                if not is_connected(trialGraph):
                    foundBridges.append((start, end))
    return foundBridges
#Function that collapses a list of lists into a single list
def __collapse_list_list(listList, recursive=False):
    """Flatten one level of nested lists from listList into a new list.

    Items that are exactly of type list are spliced in; everything else is
    kept as a single element. With recursive=True, nested lists are first
    deep-copied and flattened all the way down.
    """
    flattened = []
    for entry in listList:
        if type(entry) != list:
            flattened.append(entry)
            continue
        if recursive:
            entry = __collapse_list_list(deepcopy(entry), True)
        flattened.extend(entry)
    return flattened
#Function that checks if all items in subList are in containerList
def __list_contains_list(subList, containerList):
    """Return True when every item of subList also occurs in containerList."""
    return all(item in containerList for item in subList)
#Graph splitter
def split_unconnected(graphData):
    """Split graphData into one adjacency dict per connected component.

    A connected graph is returned unchanged as the single element of a list.
    Otherwise DFS is run repeatedly from the first node not yet covered until
    every node belongs to a component. The returned dicts reference the same
    connection lists as graphData.
    """
    if is_connected(graphData):
        return [graphData]
    components = []
    while True:
        covered = __collapse_list_list(components)
        if __list_contains_list(graphData.keys(), covered):
            break
        uncovered = [vert for vert in graphData.keys() if vert not in covered]
        components.append(dfs(graphData, vertex=uncovered[0]))
    return [{node: graphData[node] for node in component}
            for component in components]
def split_at_bridge(splitNode, node2, graphData):
    """Split ``graphData`` into two graphs at the bridge splitNode-node2.

    Args:
        splitNode: bridge endpoint that is kept in both resulting graphs.
        node2: the other endpoint of the bridge.
        graphData: adjacency mapping of node -> list of neighbours; it is
            not modified.

    Returns:
        A two-element list. The first graph holds ``splitNode`` plus
        everything reachable from it through the bridge; the second holds
        ``splitNode`` plus the rest of the graph.

    Raises:
        GraphError: if the edge is not a bridge (or, via find_bridges, if
            the graph is not connected).
    """
    bridgeList = find_bridges(graphData)
    if ((splitNode, node2) not in bridgeList
            and (node2, splitNode) not in bridgeList):
        raise GraphError("Cannot split graph on non-bridge")
    bridgeSide = deepcopy(graphData)
    otherSide = deepcopy(graphData)
    # Detach splitNode from every neighbour except node2. Iterate over a
    # snapshot: remove_edge mutates this very list, and looping over the
    # live list would skip neighbours and leave non-bridge edges in place.
    for connection in list(bridgeSide[splitNode]):
        if connection != node2:
            remove_edge(splitNode, connection, bridgeSide)
    # With only the bridge left on splitNode, DFS reaches exactly the nodes
    # on node2's side (plus splitNode itself).
    bridgeGraphDFS = dfs(bridgeSide, splitNode)
    for node in list(bridgeSide):
        if node not in bridgeGraphDFS:
            remove_node(node, bridgeSide)
    for node in list(otherSide):
        if node in bridgeGraphDFS and node != splitNode:
            remove_node(node, otherSide)
    return [bridgeSide, otherSide]
#Fleury implementation based on Wikipedia's pseudocode
def fleury(graphData, checkPath=True):
    """Fleury's algorithm for finding a Eulerian path in an undirected graph

    The walk runs on a deep copy of graphData, starting at the first node of
    odd degree (or the first node when there is none) and preferring edges
    that are not bridges of the remaining graph. With checkPath enabled the
    result is verified: every connection of graphData must appear as a pair
    of consecutive path nodes, otherwise None is returned.
    """
    working = deepcopy(graphData)
    # Pick the start: an odd-degree node when one exists.
    oddNodes = [node for node in working if len(working[node]) % 2 == 1]
    currentNode = oddNodes[0] if oddNodes else list(working.keys())[0]
    path = [currentNode]
    while working[currentNode]:
        # Collect the far ends of the bridges that touch the current node.
        bridgeNeighbours = []
        for first, second in find_bridges(working):
            if first == currentNode:
                bridgeNeighbours.append(second)
            elif second == currentNode:
                bridgeNeighbours.append(first)
        # Take the first non-bridge connection; fall back to the first
        # connection when only bridges remain.
        for candidate in working[currentNode]:
            if candidate not in bridgeNeighbours:
                nextNode = candidate
                break
        else:
            nextNode = working[currentNode][0]
        remove_edge(currentNode, nextNode, working)
        if not working[currentNode]:
            # A node with no edges left is dropped from the working graph.
            remove_node(currentNode, working)
        path.append(nextNode)
        currentNode = nextNode
    if checkPath:
        steps = list(zip(path, path[1:]))
        for node in graphData:
            for connection in graphData[node]:
                covered = any(node in step and connection in step
                              for step in steps)
                if not covered:
                    return None
    return path
# Automatic splitting of graph into multiple graphs with Eulerian paths (NOT FUNCTIONAL)
# def auto_split(graphData, returnGraphs=True, returnPaths=False):
# assert returnGraphs or returnPaths, "auto_split must have a selected return value."
# output = {}
# if returnGraphs:
# output["graphs"] = []
# if returnPaths:
# output["paths"] = []
# fleuryTest = fleury(graphData)
# if fleuryTest:
# if returnGraphs:
# output["graphs"] = [graphData]
# if returnPaths:
# output["paths"] = [fleuryTest]
# return output
# eulerianParts = []
# tempGData = deepcopy(graphData)
# while tempGData:
# currentFleury = fleury(tempGData, False)
# eulerianParts.append(currentFleury)
# if returnGraphs:
# output["graphs"].append({})
# for index, node1 in enumerate(currentFleury[:-1]):
# node2 = currentFleury[index + 1]
# remove_edge(node1, node2, tempGData)
# if returnGraphs:
# add_edge(node1, node2, output["graphs"][-1])
# tempTempGData = deepcopy(tempGData)
# for node in tempTempGData:
# if not tempGData[node]:
# if returnGraphs and not node in __collapse_list_list(
# eulerianParts):
# output["graphs"].append([node])
# remove_node(node, tempGData)
# del tempTempGData
# if returnPaths:
# output["paths"] = eulerianParts
# return output