Expectation-Maximization-test.py
import numpy as np
import matplotlib.pyplot as plt
from math import sqrt, log, exp, pi
from matplotlib.colors import LogNorm
from random import uniform
import random as rd
from sklearn import mixture
n_samples = 300
# generate random sample, two components
np.random.seed(0)
# generate spherical data centered on (20, 20)
shifted_gaussian = np.random.randn(n_samples, 2) + np.array([20, 20])
# generate zero centered stretched Gaussian data
C = np.array([[0., -0.7], [3.5, .7]])
stretched_gaussian = np.dot(np.random.randn(n_samples, 2), C)
# concatenate the two datasets into the final training set
X_train = np.vstack([shifted_gaussian, stretched_gaussian])
# Initialisation
n_components = 2
n_iterations = 5
# the mixing weight phi and the per-component means/covariances are
# initialised inside the GaussianMixture class below (phi defaults to 1/2)
class Gaussian:
    "Model a single bivariate Gaussian component."
    def __init__(self, mu, sigma):
        # mean (2-vector) and covariance (2x2 matrix)
        self.mu = np.asarray(mu, dtype=float)
        self.sigma = np.asarray(sigma, dtype=float)
    # probability density function
    def pdf(self, datapoint):
        "Probability density of a data point given the current parameters"
        u = datapoint - self.mu
        det = np.linalg.det(self.sigma)
        inv = np.linalg.inv(self.sigma)
        return exp(-0.5 * u @ inv @ u) / (2 * pi * sqrt(det))
    def __repr__(self):
        return 'Gaussian(mu={0}, sigma={1})'.format(self.mu.round(2), self.sigma.round(2))
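# For reference, the standard EM updates for a two-component Gaussian mixture
# with mixing weight phi, which the GaussianMixture class below implements
# (written here as plain equations, not code):
#   E-step:  w_i1 = phi * N(x_i | mu1, Sigma1)
#                   / (phi * N(x_i | mu1, Sigma1) + (1 - phi) * N(x_i | mu2, Sigma2))
#            w_i2 = 1 - w_i1
#   M-step:  mu_k    = sum_i w_ik * x_i / sum_i w_ik
#            Sigma_k = sum_i w_ik * (x_i - mu_k)(x_i - mu_k)^T / sum_i w_ik
#            phi     = (1/n) * sum_i w_i1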
class GaussianMixture:
    "Model a mixture of two bivariate Gaussians and their EM estimation"
    def __init__(self, X_train, sigma_min=.1, sigma_max=1, phi=.5):
        self.X_train = X_train
        # init the two Gaussians with random means inside the range of the
        # data and small isotropic covariances
        mu_min = X_train.min(axis=0)
        mu_max = X_train.max(axis=0)
        self.one = Gaussian([uniform(lo, hi) for lo, hi in zip(mu_min, mu_max)],
                            uniform(sigma_min, sigma_max) * np.eye(2))
        self.two = Gaussian([uniform(lo, hi) for lo, hi in zip(mu_min, mu_max)],
                            uniform(sigma_min, sigma_max) * np.eye(2))
        # as well as how much to mix them
        self.mix = phi
    def Estep(self):
        "Perform an E(stimation)-step, refreshing self.loglike in the process"
        # compute responsibilities
        self.loglike = 0.  # = log(p = 1)
        for datapoint in self.X_train:
            # unnormalized responsibilities
            wp1 = self.one.pdf(datapoint) * self.mix
            wp2 = self.two.pdf(datapoint) * (1. - self.mix)
            # the normalizing constant is the likelihood of the data point
            den = wp1 + wp2
            # normalize
            wp1 /= den
            wp2 /= den
            # accumulate the log-likelihood of the data point
            self.loglike += log(den)
            # yield the responsibility tuple
            yield (wp1, wp2)
    def Mstep(self, weights):
        "Perform an M(aximization)-step"
        # per-component responsibilities and their totals
        (left, rigt) = zip(*weights)
        left = np.array(left)
        rigt = np.array(rigt)
        one_den = left.sum()
        two_den = rigt.sum()
        # compute new means as responsibility-weighted averages of the data
        self.one.mu = (left[:, None] * self.X_train).sum(axis=0) / one_den
        self.two.mu = (rigt[:, None] * self.X_train).sum(axis=0) / two_den
        # compute new covariances as responsibility-weighted scatter matrices
        d1 = self.X_train - self.one.mu
        d2 = self.X_train - self.two.mu
        self.one.sigma = (left[:, None, None] * d1[:, :, None] * d1[:, None, :]).sum(axis=0) / one_den
        self.two.sigma = (rigt[:, None, None] * d2[:, :, None] * d2[:, None, :]).sum(axis=0) / two_den
        # compute new mixing weight
        self.mix = one_den / len(self.X_train)
    def iterate(self, N=1, verbose=False):
        "Perform N EM iterations, then recompute the log-likelihood"
        for _ in range(N):
            self.Mstep(list(self.Estep()))
            if verbose:
                print(self)
        # consume one more E-step so self.loglike matches the final parameters
        list(self.Estep())
    def pdf(self, x):
        return self.mix * self.one.pdf(x) + (1 - self.mix) * self.two.pdf(x)
    def __repr__(self):
        return 'GaussianMixture({0}, {1}, mix={2:.03})'.format(self.one,
                                                               self.two,
                                                               self.mix)
    def __str__(self):
        return 'Mixture: {0}, {1}, mix={2:.03}'.format(self.one,
                                                       self.two,
                                                       self.mix)
best_mix = None
best_loglike = float('-inf')
mix = GaussianMixture(X_train)
for _ in range(n_iterations):
    try:
        # train: one EM iteration, keeping the best log-likelihood seen so far
        mix.iterate(verbose=True)
        if mix.loglike > best_loglike:
            best_loglike = mix.loglike
            best_mix = mix
    except (ZeroDivisionError, ValueError, OverflowError, np.linalg.LinAlgError):
        # catch numerical errors from bad starts and simply skip that iteration
        pass
# fit a scikit-learn GMM on the same data; its score_samples provides the
# reference density for the contour plot below
clf = mixture.GaussianMixture(n_components=n_components, covariance_type='full')
clf.fit(X_train)
# display predicted scores by the model as a contour plot
x = np.linspace(-20., 30.)
y = np.linspace(-20., 40.)
X, Y = np.meshgrid(x, y)
XX = np.array([X.ravel(), Y.ravel()]).T
Z = -clf.score_samples(XX)
Z = Z.reshape(X.shape)
CS = plt.contour(X, Y, Z, norm=LogNorm(vmin=1.0, vmax=1000.0), levels=np.logspace(0, 3, 10))
CB = plt.colorbar(CS, shrink=0.8, extend='both')
plt.scatter(X_train[:, 0], X_train[:, 1], .8)
plt.title('Negative log-likelihood predicted by a GMM')
plt.axis('tight')
plt.show()
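# Optional check, a minimal sketch: assuming the EM loop above produced a
# fitted model in best_mix, evaluate its mixture density on the same grid and
# plot its contours so the hand-rolled fit can be compared with the
# scikit-learn contour above.
if best_mix is not None:
    Z_em = np.array([best_mix.pdf(p) for p in XX]).reshape(X.shape)
    plt.contour(X, Y, Z_em, levels=10)
    plt.scatter(X_train[:, 0], X_train[:, 1], .8)
    plt.title('Density of the hand-rolled EM mixture')
    plt.show()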