# neuralnetwork.py
import random
import math
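
# The logistic sigmoid. The weight updates in Node.updateWeights rely on its
# derivative, sigma'(x) = sigma(x) * (1 - sigma(x)).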
def activationFunction(x):
    return 1.0 / (1.0 + math.exp(-x))
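
# The network is a directed acyclic graph of Node objects connected by Edge
# objects. Both the forward pass and backpropagation are implemented
# recursively: a node asks the sources of its incoming edges for values and
# the targets of its outgoing edges for errors.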

class Node:
    def __init__(self):
        self.lastOutput = None
        self.lastInput = None
        self.error = None
        self.outgoingEdges = []
        self.incomingEdges = []
        self.addBias()

    def addBias(self):
        self.incomingEdges.append(Edge(BiasNode(), self))
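
    # evaluate computes this node's output for the given input vector, caching
    # the result in lastOutput so that nodes shared by multiple paths are only
    # evaluated once per forward pass.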
    def evaluate(self, inputVector):
        if self.lastOutput is not None:
            return self.lastOutput

        self.lastInput = []
        weightedSum = 0

        for e in self.incomingEdges:
            theInput = e.source.evaluate(inputVector)
            self.lastInput.append(theInput)
            weightedSum += e.weight * theInput

        self.lastOutput = activationFunction(weightedSum)
        self.evaluateCache = self.lastOutput
        return self.lastOutput

    def getError(self, label):
        ''' Get the error for a given node in the network. If the node is an
        output node, label will be used to compute the error. For an input
        node, we simply ignore the error. '''
        if self.error is not None:
            return self.error

        assert self.lastOutput is not None
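
        # An output node's error is the difference between the label and its
        # output; a hidden node's error is the weighted sum of the errors of
        # the nodes it feeds into.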
        if self.outgoingEdges == []:  # this is an output node
            self.error = label - self.lastOutput
        else:
            self.error = sum([edge.weight * edge.target.getError(label)
                              for edge in self.outgoingEdges])

        return self.error

    def updateWeights(self, learningRate):
        ''' Update the weights of a node, and all of its successor nodes.
        Assume self is not an InputNode. If the error, lastOutput, and
        lastInput are None, then this node has already been updated. '''
        if (self.error is not None and self.lastOutput is not None
                and self.lastInput is not None):
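            # Gradient-descent step: lastOutput * (1 - lastOutput) is the
            # derivative of the sigmoid at this node's weighted input, and
            # lastInput[i] is the value that arrived along incomingEdges[i].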
            for i, edge in enumerate(self.incomingEdges):
                edge.weight += (learningRate * self.lastOutput * (1 - self.lastOutput) *
                                self.error * self.lastInput[i])

            for edge in self.outgoingEdges:
                edge.target.updateWeights(learningRate)

            self.error = None
            self.lastInput = None
            self.lastOutput = None

    def clearEvaluateCache(self):
        if self.lastOutput is not None:
            self.lastOutput = None
            for edge in self.incomingEdges:
                edge.source.clearEvaluateCache()

class InputNode(Node):
    ''' Input nodes simply evaluate to the value of the input for that index.
    As such, each input node must specify an index. We allow multiple copies
    of an input node with the same index (why not?). '''
    def __init__(self, index):
        Node.__init__(self)
        self.index = index

    def evaluate(self, inputVector):
        self.lastOutput = inputVector[self.index]
        return self.lastOutput

    def updateWeights(self, learningRate):
        for edge in self.outgoingEdges:
            edge.target.updateWeights(learningRate)

    def getError(self, label):
        for edge in self.outgoingEdges:
            edge.target.getError(label)

    def addBias(self):
        pass

    def clearEvaluateCache(self):
        self.lastOutput = None
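
# A BiasNode is an input node that always outputs 1.0, so each node's bias is
# learned as the weight of its incoming bias edge (added in Node.addBias).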
class BiasNode(InputNode):
    def __init__(self):
        Node.__init__(self)

    def evaluate(self, inputVector):
        return 1.0

class Edge:
    def __init__(self, source, target):
        self.weight = random.uniform(0, 1)
        self.source = source
        self.target = target

        # attach the edge to its nodes
        source.outgoingEdges.append(self)
        target.incomingEdges.append(self)
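
# A Network is a container for the input nodes and the single output node of
# the graph; propagateError and updateWeights are driven from the input nodes,
# which forward the calls through the rest of the graph.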
class Network:
    def __init__(self):
        self.inputNodes = []
        self.outputNode = None

    def evaluate(self, inputVector):
        assert max([v.index for v in self.inputNodes]) < len(inputVector)
        self.outputNode.clearEvaluateCache()
        output = self.outputNode.evaluate(inputVector)
        return output

    def propagateError(self, label):
        for node in self.inputNodes:
            node.getError(label)

    def updateWeights(self, learningRate):
        for node in self.inputNodes:
            node.updateWeights(learningRate)
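
    # Online training: each labeled example gets a forward pass, an error
    # backpropagation, and a weight update; maxIterations counts individual
    # example presentations.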
    def train(self, labeledExamples, learningRate=0.9, maxIterations=10000):
        while maxIterations > 0:
            for example, label in labeledExamples:
                output = self.evaluate(example)
                self.propagateError(label)
                self.updateWeights(learningRate)

                maxIterations -= 1
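

if __name__ == '__main__':
    # A minimal usage sketch, not part of the original module. The topology
    # (2 inputs, 2 hidden nodes, 1 output) and the toy AND dataset below are
    # illustrative assumptions; only the classes above are taken as given.
    network = Network()
    inputNodes = [InputNode(0), InputNode(1)]
    hiddenNodes = [Node() for _ in range(2)]
    outputNode = Node()

    # Fully connect the layers; an Edge registers itself with both endpoints.
    for inputNode in inputNodes:
        for hiddenNode in hiddenNodes:
            Edge(inputNode, hiddenNode)
    for hiddenNode in hiddenNodes:
        Edge(hiddenNode, outputNode)

    network.inputNodes = inputNodes
    network.outputNode = outputNode

    # Labeled examples for the binary AND function.
    examples = [((0, 0), 0), ((0, 1), 0), ((1, 0), 0), ((1, 1), 1)]
    network.train(examples, learningRate=0.5, maxIterations=5000)

    # The outputs should move toward the labels as training progresses.
    for example, label in examples:
        print(example, label, network.evaluate(example))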