Sparse+dense mixed arrays #43

Open · wants to merge 4 commits into main
69 changes: 51 additions & 18 deletions dask_glm/algorithms.py
@@ -15,12 +15,16 @@

from __future__ import absolute_import, division, print_function

from dask import delayed, persist, compute
import functools
import numpy as np
import operator

import dask
from dask import delayed, persist, compute, sharedict
from dask.utils import ensure_dict
import dask.array as da
import numpy as np
from scipy.optimize import fmin_l_bfgs_b

import toolz

from dask_glm.utils import dot, exp, log1p
from dask_glm.families import Logistic
@@ -152,7 +156,6 @@ def newton(X, y, max_iter=50, tol=1e-8, family=Logistic):

def admm(X, y, regularizer=L1, lamduh=0.1, rho=1, over_relax=1,
max_iter=250, abstol=1e-4, reltol=1e-2, family=Logistic):

pointwise_loss = family.pointwise_loss
pointwise_gradient = family.pointwise_gradient
regularizer = _regularizers.get(regularizer, regularizer) # string
@@ -173,15 +176,18 @@ def wrapped(beta, X, y, z, u, rho):
f = create_local_f(pointwise_loss)
fprime = create_local_gradient(pointwise_gradient)

nchunks = getattr(X, 'npartitions', 1)
# nchunks = X.npartitions
try:
nchunks = len(X.chunks[0])
except AttributeError: # NumPy array input
nchunks = 1

(n, p) = X.shape
# XD = X.to_delayed().flatten().tolist()
# yD = y.to_delayed().flatten().tolist()

if isinstance(X, da.Array):
XD = X.rechunk((None, X.shape[-1])).to_delayed().flatten().tolist()
XD = X.to_delayed().tolist()
else:
XD = [X]

if isinstance(y, da.Array):
yD = y.rechunk((None, y.shape[-1])).to_delayed().flatten().tolist()
else:
@@ -192,7 +198,6 @@ def wrapped(beta, X, y, z, u, rho):
betas = np.array([np.ones(p) for i in range(nchunks)])

for k in range(max_iter):

# x-update step
new_betas = [delayed(local_update)(xx, yy, bb, z, uu, rho, f=f,
fprime=fprime) for
@@ -219,21 +224,47 @@ def wrapped(beta, X, y, z, u, rho):
reltol * np.linalg.norm(rho * u)

if primal_res < eps_pri and dual_res < eps_dual:
print("Converged!", k)
break

return z


def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):

beta = beta.ravel()
u = u.ravel()
z = z.ravel()
solver_args = (X, y, z, u, rho)
beta, f, d = solver(f, beta, fprime=fprime, args=solver_args,
maxiter=200,
maxfun=250)

if len(X) > 1:
# Construct dask graph for computation
X = [da.from_array(x, chunks=x.shape, name=False,
getitem=operator.getitem) for x in X]
X = da.concatenate(X, axis=1).persist(get=dask.get)
beta = da.from_array(beta, chunks=beta.shape, name=False,
getitem=operator.getitem).persist(get=dask.get)
ff = f(beta, X, y, z, u, rho)
ffprime = fprime(beta, X, y, z, u, rho)
ff, ffprime = dask.delayed(ff), dask.delayed(ffprime)
dsk = ensure_dict(sharedict.merge(ff.dask, ffprime.dask))

beta_key = beta._keys()[0]
def f2(beta):
""" Reuse existing graph, just swap in new beta and compute """
dsk[beta_key] = beta
result, gradient = dask.get(dsk, [ff.key, ffprime.key])
print(result, gradient)
return result, gradient

solver_args = ()
Collaborator: Why no solver_args in this case?

Member Author: They are all, I think, in the task graph. My assumption is that these will not change during the call to local_update. Is this correct?

Collaborator: Yea that's correct.
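
The exchange above can be illustrated with a toy, standalone sketch of the graph-reuse idea (not the PR's actual graph or keys): the fixed arguments already live in the task graph, and only the beta entry is swapped before each re-execution, so nothing needs to be passed through solver_args.

import dask

# Toy graph: 'beta' is a placeholder value; 'loss' is a task that uses it.
dsk = {
    'beta': 0.0,
    'loss': (lambda b: (b - 3.0) ** 2, 'beta'),
}

def loss_for(beta):
    # Swap in the new coefficient and re-run the same graph synchronously,
    # mirroring how f2 above overwrites beta_key before calling dask.get.
    dsk['beta'] = beta
    return dask.get(dsk, 'loss')

print(loss_for(1.0))  # 4.0
print(loss_for(3.0))  # 0.0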

else:
X = X[0]
solver_args = (X, y, z, u, rho)

def f2(beta, *args):
result, gradient = f(beta, *args), fprime(beta, *args)
print(result, gradient)
return result, gradient

beta, _, _ = solver(f2, beta, args=solver_args, maxiter=200, maxfun=250)
Collaborator: ^^^ I think this line is incorrect; solver (in this case fmin_l_bfgs_b) has this call signature; the first argument is the function f, and a keyword argument needs to be fprime.

Member Author: I thought that I didn't have to specify fprime if f returned two results.

Collaborator: Oh, it looks like you're right; sorry, I've never called it that way. Hmm back to the drawing board.


return beta
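
On the fprime question in the thread above: scipy documents that when fprime is None and approx_grad is False (the defaults), the objective passed to fmin_l_bfgs_b must return both the function value and the gradient, which is the convention f2 relies on. A minimal sketch with a toy quadratic (not the PR's objective):

import numpy as np
from scipy.optimize import fmin_l_bfgs_b

def f_and_grad(x):
    # Return (value, gradient); no separate fprime argument is needed.
    value = np.sum((x - 2.0) ** 2)
    grad = 2.0 * (x - 2.0)
    return value, grad

x0 = np.zeros(3)
xmin, fval, info = fmin_l_bfgs_b(f_and_grad, x0, maxiter=200, maxfun=250)
print(xmin)  # approximately [2. 2. 2.]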

@@ -371,14 +402,16 @@ def proximal_grad(X, y, regularizer=L1, lamduh=0.1, family=Logistic,
break
stepSize *= backtrackMult
if stepSize == 0:
print('No more progress')
if verbose:
print('No more progress')
break
df /= max(func, lf)
db = 0
if verbose:
print('%2d %.6e %9.2e %.2e %.1e' % (k + 1, func, df, db, stepSize))
if df < tol:
print('Converged')
if verbose:
print('Converged')
break
stepSize *= stepGrowth
backtrackMult = nextBacktrackMult
8 changes: 5 additions & 3 deletions dask_glm/tests/test_algos_families.py
@@ -87,18 +87,20 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
])
@pytest.mark.parametrize('N', [1000])
@pytest.mark.parametrize('nchunks', [1, 10])
@pytest.mark.parametrize('mchunks', [1, 2])
@pytest.mark.parametrize('family', [Logistic, Normal])
@pytest.mark.parametrize('lam', [0.01, 1.2, 4.05])
@pytest.mark.parametrize('reg', [L1, L2])
def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
def test_basic_reg_descent(func, kwargs, N, nchunks, mchunks, family, lam, reg):
beta = np.random.normal(size=2)
M = len(beta)
X = da.random.random((N, M), chunks=(N // nchunks, M))
X = da.random.random((N, M), chunks=(N // nchunks, M // mchunks))
y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

X, y = persist(X, y)

result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
with dask.set_options(get=dask.get): # for easy debugging when errors occur
result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
test_vec = np.random.normal(size=2)

f = reg.add_reg_f(family.pointwise_loss, lam)
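The set_options context manager added above pins the synchronous scheduler, so exceptions raise plain tracebacks in the calling process and print/pdb behave as in ordinary Python. A small sketch of the same pattern on a toy array, using the old-style dask API that this PR targets:

import dask
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))

# Run on the single-threaded scheduler for easier debugging.
with dask.set_options(get=dask.get):
    print((x + 1).sum().compute())  # 32.0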
8 changes: 5 additions & 3 deletions dask_glm/utils.py
@@ -6,6 +6,7 @@
import dask.array as da
import numpy as np
from multipledispatch import dispatch
import toolz


def sigmoid(x):
@@ -81,7 +82,7 @@ def log1p(A):
@dispatch(object, object)
def dot(A, B):
x = max([A, B], key=lambda x: getattr(x, '__array_priority__', 0))
module = package_of(x)
module = package_of(type(x))
return module.dot(A, B)


@@ -155,13 +156,14 @@ def exp(x):
return np.exp(x.todense())


def package_of(obj):
@toolz.memoize
def package_of(typ):
""" Return package containing object's definition

Or return None if not found
"""
# http://stackoverflow.com/questions/43462701/get-package-of-python-object/43462865#43462865
mod = inspect.getmodule(obj)
mod = inspect.getmodule(typ)
if not mod:
return
base, _sep, _stem = mod.__name__.partition('.')
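
Passing type(x) rather than x into package_of, together with the toolz.memoize added above, is presumably what keeps the lookup cheap and safe: NumPy arrays are unhashable (so instances could not be memoized), while types are hashable and few. A small usage sketch, assuming package_of returns the top-level module as its docstring says:

import numpy as np
import dask.array as da
from dask_glm.utils import package_of

# Dispatch on the type, not the instance: types are hashable, so the
# memoized lookup caches one entry per array class.
print(package_of(type(np.ones(3))))            # expected: <module 'numpy' ...>
print(package_of(type(da.ones(3, chunks=3))))  # expected: <module 'dask' ...>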