Use an already trained Keras model to predict on lots of data #35

Open · mrocklin opened this issue Aug 31, 2018 · 17 comments
Labels: help wanted (Extra attention is needed)

Comments

@mrocklin (Member)

A common approach is to train on a small amount of data and then use the trained model to predict on lots of data. We could do this with ParallelPostFit in dask-ml, or with X.map_blocks or df.map_partitions. In either case we might want to be careful to avoid repeated serialization costs. For example, in the following case I suspect that we include the serialized model in every task:

# maybe bad?
model = load_model()
predictions = X.map_blocks(model.predict)  

It's probably better to encourage the user to keep the model delayed:

# maybe better
model = dask.delayed(load_model)()
predictions = X.map_blocks(model.predict)  

We should also ensure that dask-ml does this correctly and includes the model as a single task in the graph, so that it gets sent around appropriately (cc @TomAugspurger).

I'm also generally curious whether a Keras model that lives on the GPU will eventually make its way back onto the GPU when deserialized.
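For reference, a minimal end-to-end sketch of the delayed-model pattern (illustrative, not from this issue: the "model.h5" path, the array shapes, and the assumption that the model maps (n, 20) inputs to (n, 1) outputs are all placeholders):

import dask
import dask.array as da
import numpy as np
from tensorflow.keras.models import load_model

def predict_chunk(model, chunk):
    # Runs on a worker with a concrete, already-deserialized model.
    return model.predict(chunk)

# One task for the model: the serialized weights appear once in the
# graph and are shipped to workers once, not once per chunk.
model = dask.delayed(load_model)("model.h5")  # hypothetical path

X = da.random.random((100_000, 20), chunks=(10_000, 20))

# One delayed predict call per chunk, reassembled into a dask array.
parts = [
    da.from_delayed(dask.delayed(predict_chunk)(model, chunk),
                    shape=(n, 1), dtype=np.float32)
    for chunk, n in zip(X.to_delayed().ravel(), X.chunks[0])
]
predictions = da.concatenate(parts, axis=0)
result = predictions.compute()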

@TomAugspurger (Member)

FYI, I started on this at https://gist.github.com/TomAugspurger/2889a052b5fec4d691f83ba2062d2d92

As you predicted, X.map_blocks(model.predict) was slow.

I stopped as soon as I hit an error and haven't done any profiling yet. I'll pick it up again soon, but I don't want anyone else to duplicate effort.

@mrocklin (Member, Author)

mrocklin commented Oct 25, 2018 via email

@TomAugspurger (Member)

Oh, and /profile-server is going to be extremely useful here. On a whim, I tried X.map_blocks(delayed(model.predict)) and the scheduler has been at 100% CPU for a minute while the workers are idle.

@TomAugspurger (Member)

Right, I think I'm stalled on deserializing the TensorFlow graph in a new process: https://gist.github.com/33efb49efe611701ef122f577d0e0430

@TomAugspurger (Member)

Probably putting this on the back burner for now, in case others want to take a look.

@AakashKumarNain

@TomAugspurger @mrocklin Once we have our stacked delayed dask array, can't we just generate batches of data from it on the fly? Something like this:

data = [da.from_delayed(x, shape=(224, 224, 3), dtype=np.float32) for x in images]

nb_batches = 100
for i in range(nb_batches):
    batch_images, batch_labels = next(data)  # just an example to show the idea
    model.train_on_batch(batch_images, batch_labels)

Is there any way to do this?

@mrocklin (Member, Author)

It depends on what you mean by "batch", I guess. You can slice into x in a variety of ways:

index = np.random.randint(0, x.shape[0], size=10)
batch = x[index]

Some ways of slicing will be cheap (like the one above) and some won't, depending on the chunk structure.
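An illustrative sketch of the difference (shapes and chunking are placeholders): slicing along chunk boundaries touches a single chunk, while fancy indexing with random positions may gather rows from many chunks:

import dask.array as da
import numpy as np

x = da.random.random((50_000, 224, 224, 3), chunks=(32, 224, 224, 3))

cheap = x[0:32]    # aligned with one chunk: a single task
index = np.random.randint(0, x.shape[0], size=32)
costly = x[index]  # may pull rows from many different chunks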

@AakashKumarNain

AakashKumarNain commented Jun 14, 2019

Thanks @mrocklin. I'll elaborate a bit. Say I have 50,000 images on disk; I cannot load all the data into memory at once. Normally we would use a generator that yields batches of data: for example, with a batch size of 32, each batch contains 32 images, which is then fed to the model for training.

Now, a plain Python generator uses only one core. So instead of a Python generator, let us say we build delayed dask arrays as

data = [da.from_delayed(x, shape=(224, 224, 3), dtype=np.float32) for x in images]

The shape of the final stacked array would be (50000, 224, 224, 3). What is the best way to iterate over this delayed array so that on each iteration I get a chunk of 32 images?

@mrocklin (Member, Author)

The same way you would with NumPy:

for i in range(0, x.shape[0], 32):
    chunk = x[i:i+32, ...]

chunk is a dask array here. I'm not sure if that's what you want. You might want to call compute, or delay the fit call (although Keras sometimes has issues with moving to other threads).
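A concrete sketch of that loop, assuming x and y are dask arrays and model is a compiled Keras model (all three names are placeholders carried over from the discussion):

batch_size = 32
for i in range(0, x.shape[0], batch_size):
    # Materialize just this slice in memory, then train on it.
    batch_x = x[i:i + batch_size].compute()
    batch_y = y[i:i + batch_size].compute()
    model.train_on_batch(batch_x, batch_y)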

@AakashKumarNain

AakashKumarNain commented Jun 14, 2019

Cool, thanks a lot for your time. Yeah, I am aware of those issues; that is why I want to use dask only for batch generation, with no delayed calls to fit.

@bw4sz

bw4sz commented Sep 9, 2019

@AakashKumarNain I have a similar use case. Did you find performance improvements when transitioning from NumPy to dask for reading image slices from file?

@skeller88

@AakashKumarNain same question here. What code did you end up using? How was the performance? I want to use a keras.utils.Sequence subclass so I can leverage Keras's fit_generator, so I'm thinking of something that keeps the images in a dask array and loads each batch into memory:

from typing import Tuple

import dask.array as da
import numpy as np
from tensorflow import keras

class DaskImageSequence(keras.utils.Sequence):
    def __init__(self, x: da.Array, y: da.Array, batch_size: int):
        self.x = x
        self.y = y
        self.batch_size = batch_size

    def __len__(self):
        # shape[0] of a dask array is a plain int, so no compute() is needed
        return int(np.ceil(self.x.shape[0] / self.batch_size))

    def __getitem__(self, batch_num) -> Tuple[np.ndarray, np.ndarray]:
        # Slice one batch lazily, then materialize it in memory
        start = batch_num * self.batch_size
        stop = start + self.batch_size
        batch_x = self.x[start:stop].compute()
        batch_y = self.y[start:stop].compute()
        return batch_x, batch_y
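A hedged usage sketch (train_x, train_y, and model are placeholders; fit_generator was the Keras API at the time, and newer versions of Keras accept a Sequence directly in fit):

seq = DaskImageSequence(train_x, train_y, batch_size=32)
model.fit_generator(seq, epochs=5)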

@AakashKumarNain

@skeller88 I didn't try it; I was trying to benchmark it against tf.data instead. But this certainly looks good so far.

@bw4sz

bw4sz commented Jan 15, 2020

@mrocklin I think I'm stumbling on exactly the "# maybe bad?" issue mentioned at the top.

Pseudo-code (working on a reproducible example):

We have a large number of numpy arrays (geospatial tiles) and an object detection model.

This works in serial:

    model = create_model()
    results = []
    for tile in tilelist:
        boxes = model.predict_tile(tile)
        results.append(boxes)

Following your suggestion from above, this

    results = []
    for tile in tilelist:
        model = dask.delayed(create_model)()
        boxes = dask.delayed(model.predict_tile)(tile)
        results.append(boxes)
        
    all_boxes = dask.compute(*results)

fails with what looks like a multiprocessing-related TensorFlow error:

builtins.ValueError: Tensor Tensor("filtered_detections/map/TensorArrayStack/TensorArrayGatherV3:0", shape=(?, 300, 4), dtype=float32) is not an element of this graph.

One level up in the traceback is tensorflow/python/framework/ops.py, line 3796, in as_graph_element:

    with self._lock:
      return self._as_graph_element_locked(obj, allow_tensor, allow_operation)

I'm testing on a LocalCluster on CPU, but will eventually move to SLURM with GPUs.

Perhaps related to dask/distributed#878 and dask/dask-ml#281.

For anyone who comes looking: I'm giving up here because I think it's a bit of a red herring. I was just using LocalCluster to run tests, and my sense is that this is part of the problem. I can see that Keras serialization is an ongoing challenge, and my ultimate goal is to get this running on a SLURM cluster, which in this case might be quite a bit simpler. Leaving this note here for others; I will open a reproducible example tomorrow.

The problem persists on dask-jobqueue, and I've looked through all the pertinent issues. I feel like the answer is known, but not obviously documented.
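For anyone hitting the same "is not an element of this graph" error, one commonly suggested workaround (a sketch, not a verified fix for this exact setup) is to build or load the model inside the task, so each worker process constructs its own TensorFlow graph and nothing graph-shaped is ever serialized; create_model, predict_tile, and tilelist are carried over from the comment above:

    import dask

    def predict_one(tile):
        # Building the model inside the task keeps the TensorFlow graph
        # local to the worker process; it is never pickled or shipped.
        model = create_model()
        return model.predict_tile(tile)

    results = dask.compute(*[dask.delayed(predict_one)(tile) for tile in tilelist])

Reloading the model once per task is wasteful; a per-worker cache (see the caching sketch further below) is the usual refinement.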

@bw4sz

bw4sz commented Jan 16, 2020

I created a working example here for those who find this link: dask/distributed#2333

@mrocklin (Member, Author)

This keeps coming up. I'm adding it to the core maintenance project board.
https://stackoverflow.com/questions/61924824/how-to-do-model-predict-using-distributed-dask-with-a-pre-trained-keras-model
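A per-worker caching sketch in the spirit of the discussions linked above (the attribute name and "model.h5" path are illustrative; distributed.get_worker is a real helper, but stashing the model on the worker object is a convention, not an official API; the dtype assumes float32 predictions):

from distributed import get_worker
from tensorflow.keras.models import load_model

def predict_with_cache(chunk):
    worker = get_worker()
    # Load the model once per worker process and reuse it across tasks.
    model = getattr(worker, "_cached_keras_model", None)
    if model is None:
        model = load_model("model.h5")  # hypothetical path
        worker._cached_keras_model = model
    return model.predict(chunk)

predictions = X.map_blocks(predict_with_cache, dtype="float32")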

@bw4sz

bw4sz commented May 23, 2020 via email

@jacobtomlinson added the "help wanted (Extra attention is needed)" label on Oct 14, 2021