Skip to content

WIP: ENH: Basic implementation of SGD #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion dask_glm/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from dask import delayed, persist, compute, set_options
import functools
import numpy as np
import numpy.linalg as LA
import dask.array as da
from scipy.optimize import fmin_l_bfgs_b
import sklearn.utils.random


from dask_glm.utils import dot, normalize
Expand Down Expand Up @@ -138,6 +140,119 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
return beta


def _get_n(n, kwargs):
if np.isnan(n):
if 'n' in kwargs:
return kwargs.get('n')
raise ValueError('Could not get the number of examples `n`. Pass the '
'number of examples in as a keyword argument: '
'`sgd(..., n=n)`. If using a distributed dataframe, '
'`sgd(..., n=len(ddf))` works')
return n


def _shuffle_blocks(x, random_state=None):
rng = sklearn.utils.random.check_random_state(random_state)
i = rng.permutation(x.shape[0]).astype(int)
y = x[i]
return y


def _index_full_to_chunk(index, chunks):
index.sort()
boundaries = np.cumsum(chunks)
out = []
for lower, upper in zip(boundaries, boundaries[1:]):
i = (lower <= index) & (index < upper)
out += [index[i]]
return out


@normalize
def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=1000,
        initial_step=1e-4, callback=None, average=True, maxiter=np.inf,
        **kwargs):
    r"""Stochastic Gradient Descent.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
    y : array-like, shape (n_samples,)
    epochs : int, float
        Maximum number of passes through the dataset.
    tol : float
        Maximum allowed relative change from the prior iteration required
        to declare convergence.
    batch_size : int
        The batch size used to approximate the gradient. Larger batch sizes
        will approximate the gradient better.
    initial_step : float
        The initial step size. The step size decays like 1 / sqrt(k).
    callback : callable
        A callback to call every iteration that accepts keyword arguments
        `X`, `y`, `beta`, `grad`, `nit` (number of iterations) and `family`.
        NOTE(review): accepted but never invoked in the current body --
        wire it up or drop the parameter.
    average : bool
        Whether to return the average of all iterates instead of the final
        iterate. See [1]_.
    family : Family

    Returns
    -------
    beta : array-like, shape (n_features,)
    """
    gradient = family.gradient
    n, p = X.shape
    # Dask arrays of unknown length report n == nan; fall back to kwargs.
    n = _get_n(n, kwargs)

    beta = da.from_array(np.zeros(p), chunks=p)
    beta_sum = da.from_array(np.zeros(p), chunks=p)
    iters = 0

    # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of
    # stochastic approximation algorithms for machine learning" by
    # Moulines, Eric and Bach, Francis,
    # but this may require many iterations. In practice
    # step_size = lambda init, nit, decay: init * decay**(nit//n)
    # is often used instead, but is not tested here.
    def step_size(init, nit):
        return init / np.sqrt(nit + 1)

    rng = np.random.RandomState(42)
    finished = False
    while not finished:
        for k in range(n // batch_size):
            beta_old = beta.copy()
            iters += 1

            # Sample one batch of row indices (with replacement), sorted
            # so the dask fancy-index touches blocks in order.
            i = rng.choice(n, size=batch_size).astype(int)
            i.sort()
            i = da.from_array(i, chunks=len(i))
            X_batch = X[i]
            y_batch = y[i]

            Xbeta = dot(X_batch, beta)
            grad = gradient(Xbeta, X_batch, y_batch)
            # (n / batch_size) rescales the mini-batch gradient to an
            # estimate of the full-data gradient.
            beta -= step_size(initial_step, iters) * (n / batch_size) * grad
            if average:
                beta_sum += beta

            too_long = (iters / n > epochs) or (iters > maxiter)
            if too_long:
                finished = True
                break
            # Check convergence only occasionally: the check forces a
            # (potentially expensive) compute of the dask graph.
            if iters % 1000 == 0:
                rel_error = da.linalg.norm(beta_old - beta)
                rel_error /= da.linalg.norm(beta)
                converged = rel_error < tol
                if converged.compute():
                    finished = True
                    break
        # NOTE(review): this exits after a single pass over the data, so
        # `epochs` > 1 currently has no effect -- confirm this is intended
        # before merging.
        finished = True
    if average:
        # Polyak-Ruppert averaging of the iterates.
        return beta_sum.compute() / iters
    return beta.compute()


@normalize
def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, **kwargs):
"""Newtons Method for Logistic Regression.
Expand Down Expand Up @@ -430,5 +545,6 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,
'gradient_descent': gradient_descent,
'newton': newton,
'lbfgs': lbfgs,
'proximal_grad': proximal_grad
'proximal_grad': proximal_grad,
'sgd': sgd
}
7 changes: 4 additions & 3 deletions dask_glm/estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def family(self):

def __init__(self, fit_intercept=True, solver='admm', regularizer='l2',
max_iter=100, tol=1e-4, lamduh=1.0, rho=1,
over_relax=1, abstol=1e-4, reltol=1e-2):
over_relax=1, abstol=1e-4, reltol=1e-2, **kwargs):
self.fit_intercept = fit_intercept
self.solver = solver
self.regularizer = regularizer
Expand Down Expand Up @@ -61,9 +61,10 @@ def __init__(self, fit_intercept=True, solver='admm', regularizer='l2',

self._fit_kwargs = {k: getattr(self, k) for k in fit_kwargs}

def fit(self, X, y=None):
def fit(self, X, y=None, **kwargs):
X_ = self._maybe_add_intercept(X)
self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs)
self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs,
**kwargs)

if self.fit_intercept:
self.coef_ = self._coef[:-1]
Expand Down