autorec.py
# https://udemy.com/recommender-systems
# https://deeplearningcourses.com/recommender-systems
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from scipy.sparse import save_npz, load_npz

import keras.backend as K
from keras.models import Model
from keras.layers import Input, Dropout, Dense
from keras.regularizers import l2
from keras.optimizers import SGD

# config
batch_size = 128
epochs = 20
reg = 0.0001
# reg = 0

# load the sparse user-by-movie rating matrices (saved by an earlier preprocessing step)
A = load_npz("Atrain.npz")
A_test = load_npz("Atest.npz")

mask = (A > 0) * 1.0 # 1 where a rating exists, 0 elsewhere
mask_test = (A_test > 0) * 1.0

# make copies since we will shuffle
A_copy = A.copy()
mask_copy = mask.copy()
A_test_copy = A_test.copy()
mask_test_copy = mask_test.copy()

N, M = A.shape
print("N:", N, "M:", M)
print("N // batch_size:", N // batch_size)

# center the data
mu = A.sum() / mask.sum() # global mean of the observed ratings only
print("mu:", mu)

# build the model - just a 1 hidden layer autoencoder
i = Input(shape=(M,))
# bigger hidden layer size seems to help!
x = Dropout(0.7)(i) # heavy input dropout acts as the denoising step
x = Dense(700, activation='tanh', kernel_regularizer=l2(reg))(x)
# x = Dropout(0.5)(x)
x = Dense(M, kernel_regularizer=l2(reg))(x) # linear output = predicted centered ratings

def custom_loss(y_true, y_pred):
  # mean squared error over the observed (nonzero) entries only
  mask = K.cast(K.not_equal(y_true, 0), dtype='float32')
  diff = y_pred - y_true
  sqdiff = diff * diff * mask
  sse = K.sum(K.sum(sqdiff))
  n = K.sum(K.sum(mask))
  return sse / n
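
# Quick sanity check of the masked-loss idea in plain NumPy (an illustration
# with made-up values, not part of the original script): only the observed
# entries count, so the errors are (4-5)^2 and (3-3)^2 over 2 observed
# ratings, giving 0.5.
yt_check = np.array([[5.0, 0.0, 3.0]]) # 0 means "not rated"
yp_check = np.array([[4.0, 2.0, 3.0]])
m_check = (yt_check != 0) * 1.0
print("masked mse sanity check (expect 0.5):",
      ((yp_check - yt_check)**2 * m_check).sum() / m_check.sum())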

def generator(A, M):
  # note: the M argument here is the binary mask matrix, not the number of movies
  while True:
    A, M = shuffle(A, M)
    # ceil, not floor + 1, so no empty final batch is yielded when N divides
    # evenly by batch_size (an empty batch would make the masked loss 0/0)
    for i in range(int(np.ceil(A.shape[0] / batch_size))):
      upper = min((i+1)*batch_size, A.shape[0])
      a = A[i*batch_size:upper].toarray()
      m = M[i*batch_size:upper].toarray()
      a = a - mu * m # must keep zeros at zero!
      # m2 = (np.random.random(a.shape) > 0.5)
      # noisy = a * m2
      noisy = a # no noise
      yield noisy, a
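
# Peek at one batch to confirm what the generator yields (an optional check,
# not part of the original script): both arrays are dense with shape
# (batch_size, M), and 'noisy' equals the centered ratings since the noise
# lines above are commented out.
noisy_check, a_check = next(generator(A, mask))
print("sample batch shapes:", noisy_check.shape, a_check.shape)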

def test_generator(A, M, A_test, M_test):
  # assumes A and A_test are in corresponding order
  # both of size N x M
  while True:
    for i in range(int(np.ceil(A.shape[0] / batch_size))):
      upper = min((i+1)*batch_size, A.shape[0])
      a = A[i*batch_size:upper].toarray()
      m = M[i*batch_size:upper].toarray()
      at = A_test[i*batch_size:upper].toarray()
      mt = M_test[i*batch_size:upper].toarray()
      a = a - mu * m
      at = at - mu * mt # test ratings are centered with the training mean
      yield a, at

model = Model(i, x)
model.compile(
  loss=custom_loss,
  # note: newer Keras versions take learning_rate instead of lr
  optimizer=SGD(lr=0.08, momentum=0.9),
  # optimizer='adam',
  metrics=[custom_loss],
)

# note: older Keras versions require fit_generator() here instead of fit()
r = model.fit(
  generator(A, mask),
  validation_data=test_generator(A_copy, mask_copy, A_test_copy, mask_test_copy),
  epochs=epochs,
  steps_per_epoch=int(np.ceil(A.shape[0] / batch_size)),
  validation_steps=int(np.ceil(A_test.shape[0] / batch_size)),
)
print(r.history.keys())

# plot losses
plt.plot(r.history['loss'], label="train loss")
plt.plot(r.history['val_loss'], label="test loss")
plt.legend()
plt.show()

# plot mse
plt.plot(r.history['custom_loss'], label="train mse")
plt.plot(r.history['val_custom_loss'], label="test mse")
plt.legend()
plt.show()
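
# With training done, the reconstruction can drive recommendations. A minimal
# sketch (the user index and top-k below are arbitrary illustration values,
# not part of the original script): center one user's ratings, predict,
# un-center, then rank the items that user has not rated yet.
user_idx = 0
a_user = A[user_idx].toarray() - mu * mask[user_idx].toarray()
pred = model.predict(a_user) + mu # back to the original rating scale
pred[A[user_idx].toarray() > 0] = -np.inf # exclude already-rated items
top_k = 5
print("top-%d item indices for user %d:" % (top_k, user_idx),
      np.argsort(-pred[0])[:top_k])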