# needed to add this, otherwise Keras 2.1.2 multi_gpu_model doesn't load a working model
from keras.layers.merge import concatenate
from keras import backend as K
from keras.layers.core import Lambda
from keras.engine.training import Model
import tensorflow as tf
def _get_available_devices():
    from tensorflow.python.client import device_lib
    local_device_protos = device_lib.list_local_devices()
    return [x.name for x in local_device_protos]


def _normalize_device_name(name):
    name = name.lower().replace('device:', '')
    return name

def multi_gpu_model(model, gpus):
    """Replicates a model on different GPUs.

    Specifically, this function implements single-machine
    multi-GPU data parallelism. It works in the following way:

    - Divide the model's input(s) into multiple sub-batches.
    - Apply a model copy on each sub-batch. Every model copy
      is executed on a dedicated GPU.
    - Concatenate the results (on CPU) into one big batch.

    E.g. if your `batch_size` is 64 and you use `gpus=2`,
    then we will divide the input into 2 sub-batches of 32 samples,
    process each sub-batch on one GPU, then return the full
    batch of 64 processed samples.

    This induces quasi-linear speedup on up to 8 GPUs.

    This function is only available with the TensorFlow backend
    for the time being.

    # Arguments
        model: A Keras model instance. To avoid OOM errors,
            this model could have been built on CPU, for instance
            (see usage example below).
        gpus: Integer >= 2, number of GPUs on which to create
            model replicas.

    # Returns
        A Keras `Model` instance which can be used just like the initial
        `model` argument, but which distributes its workload on multiple GPUs.

    # Example

    ```python
        import tensorflow as tf
        from keras.applications import Xception
        from keras.utils import multi_gpu_model
        import numpy as np

        num_samples = 1000
        height = 224
        width = 224
        num_classes = 1000

        # Instantiate the base model
        # (here, we do it on CPU, which is optional).
        with tf.device('/cpu:0'):
            model = Xception(weights=None,
                             input_shape=(height, width, 3),
                             classes=num_classes)

        # Replicates the model on 8 GPUs.
        # This assumes that your machine has 8 available GPUs.
        parallel_model = multi_gpu_model(model, gpus=8)
        parallel_model.compile(loss='categorical_crossentropy',
                               optimizer='rmsprop')

        # Generate dummy data.
        x = np.random.random((num_samples, height, width, 3))
        y = np.random.random((num_samples, num_classes))

        # This `fit` call will be distributed on 8 GPUs.
        # Since the batch size is 256, each GPU will process 32 samples.
        parallel_model.fit(x, y, epochs=20, batch_size=256)
    ```
    """
    if K.backend() != 'tensorflow':
        raise ValueError('`multi_gpu_model` is only available '
                         'with the TensorFlow backend.')
    if gpus <= 1:
        return model
        # raise ValueError('For multi-gpu usage to be effective, '
        #                  'call `multi_gpu_model` with `gpus >= 2`. '
        #                  'Received: `gpus=%d`' % gpus)

    target_devices = ['/cpu:0'] + ['/gpu:%d' % i for i in range(gpus)]
    available_devices = _get_available_devices()
    available_devices = [_normalize_device_name(name) for name in available_devices]
    for device in target_devices:
        if device not in available_devices:
            raise ValueError(
                'To call `multi_gpu_model` with `gpus=%d`, '
                'we expect the following devices to be available: %s. '
                'However this machine only has: %s. '
                'Try reducing `gpus`.' % (gpus,
                                          target_devices,
                                          available_devices))
    def get_slice(data, i, parts):
        # Slice the batch dimension into `parts` contiguous chunks and
        # return the i-th chunk; all remaining dimensions are kept intact.
        shape = tf.shape(data)
        batch_size = shape[:1]
        input_shape = shape[1:]
        step = batch_size // parts
        if i == gpus - 1:
            # The last replica takes whatever is left, so batch sizes that
            # are not divisible by `gpus` are still handled.
            size = batch_size - step * i
        else:
            size = step
        size = tf.concat([size, input_shape], axis=0)
        stride = tf.concat([step, input_shape * 0], axis=0)
        start = stride * i
        return tf.slice(data, start, size)
    all_outputs = []
    for i in range(len(model.outputs)):
        all_outputs.append([])

    # Place a copy of the model on each GPU,
    # each getting a slice of the inputs.
    for i in range(gpus):
        with tf.device('/gpu:%d' % i):
            with tf.name_scope('replica_%d' % i):
                inputs = []
                # Retrieve a slice of the input.
                for x in model.inputs:
                    input_shape = tuple(x.get_shape().as_list())[1:]
                    slice_i = Lambda(get_slice,
                                     output_shape=input_shape,
                                     arguments={'i': i,
                                                'parts': gpus})(x)
                    inputs.append(slice_i)

                # Apply model on slice
                # (creating a model replica on the target device).
                outputs = model(inputs)
                if not isinstance(outputs, list):
                    outputs = [outputs]

                # Save the outputs for merging back together later.
                for o in range(len(outputs)):
                    all_outputs[o].append(outputs[o])
    # Merge outputs on CPU.
    with tf.device('/cpu:0'):
        merged = []
        for outputs in all_outputs:
            merged.append(concatenate(outputs,
                                      axis=0))

    new_model = Model(model.inputs, outputs=merged)

    # Monkeypatch `save` on the parallel model so that saving it writes the
    # underlying single-GPU template model (whose weights are shared with
    # the replicas), which then reloads cleanly on any device.
    funcType = type(model.save)

    def new_save(self_, filepath, overwrite=True):
        model.save(filepath, overwrite)

    new_model.save = funcType(new_save, new_model)
    return new_model
    # return Model(model.inputs, merged)
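

# Usage sketch (not part of the original module logic): a minimal example of
# wrapping a toy model with `multi_gpu_model` above and saving it via the
# patched `save`. The 2-GPU machine, the toy architecture, the dummy data and
# the file name are illustrative assumptions only.
if __name__ == '__main__':
    import numpy as np
    from keras.layers import Input, Dense

    # Build the template model on CPU so its weights are not pinned to one GPU.
    with tf.device('/cpu:0'):
        inp = Input(shape=(100,))
        hidden = Dense(64, activation='relu')(inp)
        out = Dense(10, activation='softmax')(hidden)
        template = Model(inp, out)

    # Replicate on 2 GPUs; each replica processes half of every batch.
    parallel = multi_gpu_model(template, gpus=2)
    parallel.compile(loss='categorical_crossentropy', optimizer='rmsprop')

    # Dummy data, just to exercise the parallel graph.
    x = np.random.random((256, 100))
    y = np.random.random((256, 10))
    parallel.fit(x, y, epochs=1, batch_size=64)

    # Because `save` is monkeypatched above, this writes the single-GPU
    # template model, which can later be reloaded with `keras.models.load_model`.
    parallel.save('model.h5')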