-
Notifications
You must be signed in to change notification settings - Fork 1
/
gng.py
375 lines (335 loc) · 12 KB
/
gng.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
from random import random
from math import sqrt
"""
Author: Lisa Meeden
Date: 2/14/2008
Implementation of GNG as described in the paper 'A Growing Neural Gas
Network Learns Topologies' by Bernd Fritzke published in 'Advances in
Neural Information Processing 7', MIT Press, 1995.
GNG is an incremental network model able to learn the topological
relationships in a given set of input vectors using a simple Hebb-like
learning rule.
"""
def randomCirclePoint(radius):
    """
    Returns a random point [x, y] lying inside (or on) the circle of the
    given radius centered at (0,0).

    Candidates are drawn from the square that bounds the circle and are
    thrown away until one falls within the circle.
    """
    side = 2*radius
    radiusSquared = radius**2
    point = None
    while point is None:
        # x is drawn before y, one pair of draws per attempt.
        px = (random()*side) - radius
        py = (random()*side) - radius
        if px**2 + py**2 <= radiusSquared:
            point = [px, py]
    return point
def randomSpherePoint(radius):
    """
    Returns a random point [x, y, z] lying inside (or on) the sphere of
    the given radius centered at (0,0,0).

    Candidates are drawn from the cube that bounds the sphere and are
    thrown away until one falls within the sphere.
    """
    side = 2*radius
    radiusSquared = radius**2
    while True:
        # Coordinates are drawn in x, y, z order.
        coords = [(side*random()) - radius for _ in range(3)]
        if sum(c**2 for c in coords) <= radiusSquared:
            return coords
class Unit:
    """
    Each unit in the GNG maintains a reference vector, an error
    measure, and a list of edges.

    Attributes:
    userId      Label given at construction (any printable value)
    vector      Reference vector (a list of numbers)
    dimension   Number of components in the reference vector
    minVal      Lower bound used when generating a random vector
    maxVal      Upper bound used when generating a random vector
    error       Accumulated error, starts at 0
    edges       List of Edge objects leaving this unit
    """

    def __init__(self, user_id = "", vector = None, dimension=1338, minVal=0, maxVal=5):
        """
        If a non-empty vector is given it is used as the reference vector
        (stored by reference, not copied, so moveVector will modify the
        caller's list) and dimension is taken from its length. Otherwise
        a random vector of the requested dimension is generated.
        """
        self.minVal = minVal
        self.maxVal = maxVal
        self.userId = user_id
        if vector:
            self.vector = vector
            # Keep dimension consistent with the vector actually in use
            # instead of leaving the unrelated default in place.
            self.dimension = len(vector)
        else:
            self.dimension = dimension
            self.vector = self.randomVector()
        self.error = 0
        self.edges = []

    def __str__(self):
        """
        Returns a multi-line description of the unit followed by one
        line per edge.
        """
        parts = ["Unit:\n",
                 "Vector: " + self.vectorStr(),
                 " Error: " + str(self.error) + "\n",
                 # str() so that non-string ids (e.g. ints) do not raise;
                 # the newline keeps the edge lines from running into it.
                 " User_id: " + str(self.userId) + "\n"]
        parts.extend(str(e) for e in self.edges)
        return "".join(parts)

    def getUser(self):
        """
        Returns the label this unit was created with.
        """
        return self.userId

    def vectorStr(self):
        """
        Returns the reference vector formatted with three decimals per
        component, e.g. "[ 1.000 2.000 ] ".
        """
        return "[ " + "".join("%.3f " % v for v in self.vector) + "] "

    def getEdgeTo(self, unit):
        """
        Returns the edge to the given unit or None.
        """
        for edge in self.edges:
            if edge.toUnit == unit:
                return edge
        return None

    def getNeighbors(self):
        """
        Returns a list of its immediate neighboring units.
        """
        return [edge.toUnit for edge in self.edges]

    def randomVector(self):
        """
        Generates a random reference vector within the appropriate bounds:
        self.dimension components, each in [minVal, maxVal).
        """
        span = self.maxVal - self.minVal
        return [(span * random()) + self.minVal
                for _ in range(self.dimension)]

    def moveVector(self, towardPoint, lrate):
        """
        Moves the reference vector toward the given point based on the
        given learning rate. The vector is modified in place; one
        component is updated for every component of towardPoint.
        """
        for i, target in enumerate(towardPoint):
            self.vector[i] += lrate*(target-self.vector[i])
class Edge:
    """
    One direction of a connection between two units.

    GNG edges are undirected, but each is stored as a pair of one-way
    edges for ease of implementation: when unitA and unitB are
    connected, unitA holds an Edge pointing at unitB and unitB holds an
    Edge pointing at unitA. Every edge carries an age; an edge that
    grows too old is removed.
    """

    def __init__(self, toUnit):
        # A freshly created edge always starts at age zero.
        self.age = 0
        self.toUnit = toUnit

    def __str__(self):
        # One line per edge: the target's vector followed by the age.
        pieces = ["Edge to: ",
                  self.toUnit.vectorStr(),
                  " Age: ",
                  str(self.age),
                  "\n"]
        return "".join(pieces)
class GrowingNeuralGas:
    """
    Parameters:
    winnerLearnRate   Used to adjust closest unit towards input point
    neighborLearnRate Used to adjust other neighbors towards input point
    maxAge            Edges older than maxAge are removed
    reduceError       All errors are multiplied by this factor each GNG step
    stepsToInsert     A new unit is inserted every stepsToInsert steps
    insertError       On insertion, the errors of the two units the new
                      unit is placed between are multiplied by this factor

    NOTE: The original author states that the default values are taken
    from the paper (not verified here).

    The GNG begins with five randomly placed, unconnected units. Every
    step connects the two units closest to the input and then drops
    units that have no edges, so unconnected initial units disappear
    after the first step.

    It takes as input a function that will generate the next input: it
    is called with no arguments and must return a (point, name) pair.
    The name is used as the user id of any unit inserted on that step.
    """

    def __init__(self, generateNext, length, verbose=0):
        """
        generateNext  zero-argument callable returning (point, name)
        length        dimension of the reference vectors
        verbose       0 is silent, >= 1 reports structural changes,
                      > 1 also reports per-input details
        """
        self.winnerLearnRate = 0.2
        self.neighborLearnRate = 0.006
        self.maxAge = 50
        self.reduceError = 0.995
        self.stepsToInsert = 10
        self.insertError = 0.5
        self.verbose = verbose
        self.stepCount = 1
        self.units = [Unit(dimension=length) for _ in range(5)]
        self.generateNext = generateNext

    def __str__(self):
        """
        Returns a summary of the GNG; with verbose > 1 every unit is
        listed as well.
        """
        result = "GNG step " + str(self.stepCount) + "\n"
        result += "Number of units: " + str(len(self.units)) + "\n"
        result += "Average error: " + str(self.averageError()) + "\n"
        if self.verbose > 1:
            result += "".join(str(unit) for unit in self.units)
        return result

    def distance(self, v1, v2):
        """
        Returns the Euclidean distance between two vectors. One term is
        summed for every component of v1.
        """
        total = 0
        for i, component in enumerate(v1):
            total += (component - v2[i])**2
        return sqrt(total)

    def plot(self):
        """
        Creates a file readable by xgraph of the first two dimensions
        of every unit vector and its edges. The file is named
        "plot<stepCount>" and is written to the current directory.
        """
        filename = "plot%d" % self.stepCount
        # The with-block closes the file even if a write fails.
        with open(filename, "w") as data:
            for unit in self.units:
                for edge in unit.edges:
                    target = edge.toUnit
                    data.write("move %f %f\n" %
                               (unit.vector[0], unit.vector[1]))
                    data.write("%f %f\n" %
                               (target.vector[0], target.vector[1]))

    def unitOfInterest(self, unit, cutoff):
        """
        Used to focus on particular units when debugging. Returns True
        when any component of the unit's vector exceeds the cutoff in
        absolute value, otherwise False.
        """
        # Every component is examined, not only the first one.
        return any(abs(value) > cutoff for value in unit.vector)

    def computeDistances(self, point):
        """
        Computes the distances between the given point and every unit
        in the GNG. Returns the closest and next closest units. Ties
        go to the unit that appears earlier in self.units. Raises
        IndexError when fewer than two units exist.
        """
        ranked = sorted(
            (self.distance(unit.vector, point), index)
            for index, unit in enumerate(self.units))
        best = self.units[ranked[0][1]]
        second = self.units[ranked[1][1]]
        if self.verbose > 1:
            print("Processing: %s" % (point,))
            print("Closest: %s" % best.vectorStr())
            print("Second: %s" % second.vectorStr())
            print("")
        return best, second

    def incrementEdgeAges(self, unit):
        """
        Increments the ages of every edge attached to the given unit,
        in both directions.
        """
        for outgoing in unit.edges:
            outgoing.age += 1
            # The reverse half of the same undirected edge.
            incoming = outgoing.toUnit.getEdgeTo(unit)
            incoming.age += 1

    def connectUnits(self, a, b):
        """
        Adds the appropriate edges to connect units a and b.
        """
        if self.verbose >= 1:
            print("Add edge: %s %s" % (a.vectorStr(), b.vectorStr()))
        a.edges.append(Edge(b))
        b.edges.append(Edge(a))

    def disconnectUnits(self, a, b):
        """
        Removes the appropriate edges to disconnect units a and b.
        Raises ValueError if the two units are not connected.
        """
        if self.verbose >= 1:
            print("Remove edge: %s %s" % (a.vectorStr(), b.vectorStr()))
        a.edges.remove(a.getEdgeTo(b))
        b.edges.remove(b.getEdgeTo(a))

    def removeStaleEdges(self):
        """
        Checks all edges in the GNG and removes any with an age exceeding
        the maxAge parameter. Also removes any unit that is completely
        disconnected. Both lists are edited in place.
        """
        for unit in self.units:
            # Walk backwards so deleting does not shift unvisited entries.
            for i in range(len(unit.edges)-1, -1, -1):
                if unit.edges[i].age > self.maxAge:
                    if self.verbose >= 1:
                        adjacent = unit.edges[i].toUnit
                        print("Removing stale edge: %s %s" %
                              (unit.vectorStr(), adjacent.vectorStr()))
                    del unit.edges[i]
        for i in range(len(self.units)-1, -1, -1):
            candidate = self.units[i]
            if len(candidate.edges) == 0:
                if self.verbose >= 1:
                    # Report the unit actually being removed.
                    print("Removing disconnected unit: %s" %
                          candidate.vectorStr())
                del self.units[i]

    def maxErrorUnit(self, unitList):
        """
        Given a list of units, returns the unit with the highest error.
        The earliest unit wins a tie. Raises IndexError on an empty list.
        """
        highest = unitList[0]
        for candidate in unitList[1:]:
            if candidate.error > highest.error:
                highest = candidate
        return highest

    def averageError(self):
        """
        Returns the average error across all units in the GNG.
        """
        # Float start value keeps the division a true division on Python 2.
        total = sum((unit.error for unit in self.units), 0.0)
        return total/len(self.units)

    def insertUnit(self, user_id=""):
        """
        Inserts a new unit into the GNG. Finds the unit with the highest
        error and then finds its topological neighbor with the highest
        error and inserts the new unit between the two. The direct edge
        between the two is replaced by edges through the new unit. The
        new unit is labelled with user_id.

        Raises IndexError if the highest-error unit has no neighbors.
        """
        worst = self.maxErrorUnit(self.units)
        if self.verbose > 1:
            print("Max error %s" % str(worst))
        worstNeighbor = self.maxErrorUnit(worst.getNeighbors())
        # The new reference vector is the midpoint of the two units.
        newVector = [0.5 * (worst.vector[i] + worstNeighbor.vector[i])
                     for i in range(len(worst.vector))]
        # Pass the real dimension so the unit does not report the default.
        newUnit = Unit(vector=newVector, user_id=user_id,
                       dimension=len(newVector))
        if self.verbose > 0:
            print("Insert unit: %s" % newUnit.vectorStr())
        self.units.append(newUnit)
        self.connectUnits(newUnit, worst)
        self.connectUnits(newUnit, worstNeighbor)
        self.disconnectUnits(worst, worstNeighbor)
        worst.error *= self.insertError
        worstNeighbor.error *= self.insertError
        # The new unit starts with the already reduced error of the worst.
        newUnit.error = worst.error

    def reduceAllErrors(self):
        """
        Decays the error at all units.
        """
        for unit in self.units:
            unit.error *= self.reduceError

    def step(self):
        """
        Processes one input at a time through the GNG.

        Fetches the next (point, name) pair from generateNext, adapts
        the closest unit and its neighbors towards the point, connects
        or refreshes the edge between the two closest units, prunes
        stale edges and disconnected units, inserts a new unit every
        stepsToInsert steps, and decays all errors.

        As a side effect a plot file is written every 1000 steps.
        """
        nextPoint, nextName = self.generateNext()
        best, second = self.computeDistances(nextPoint)
        self.incrementEdgeAges(best)
        best.error += self.distance(best.vector, nextPoint)**2
        best.moveVector(nextPoint, self.winnerLearnRate)
        for unit in best.getNeighbors():
            unit.moveVector(nextPoint, self.neighborLearnRate)
        existing = best.getEdgeTo(second)
        if existing is not None:
            # Refresh both halves of the edge between the two winners.
            existing.age = 0
            second.getEdgeTo(best).age = 0
        else:
            self.connectUnits(best, second)
        self.removeStaleEdges()
        if self.stepCount % self.stepsToInsert == 0:
            self.insertUnit(nextName)
        self.reduceAllErrors()
        ### To view progress of learning
        if self.stepCount % 1000 == 0:
            self.plot()
        self.stepCount += 1
def main():
    """
    Demo: trains a two-dimensional GNG for 15000 steps on points drawn
    from the unit circle and prints a summary whenever the step count
    reaches a multiple of 1000. GrowingNeuralGas.step also writes a
    plot file every 1000 steps.
    """
    def nextInput():
        # step() calls the generator with no arguments and unpacks a
        # (point, name) pair, so randomCirclePoint cannot be passed
        # directly: it needs a radius and returns only the point.
        return randomCirclePoint(1.0), ""

    gng = GrowingNeuralGas(nextInput, 2, verbose=0)
    for _ in range(15000):
        gng.step()
        if gng.stepCount % 1000 == 0:
            print(gng)


# Run the demo only when executed as a script.
if __name__ == '__main__':
    main()