How (Not) To Scale Deep Learning in 6 Easy Steps

However, there’s an important flaw.

The final evaluation on the held-out 10% validation data shows that true accuracy is more like 76%.

Actually, the model has overfitted.

That's not good, but worse, it means that much of the training time was spent making the model a little worse.

It should have ended when accuracy on the validation data stopped improving.

Not only would that have left a better model, it would have completed faster.

Step #2: Use Early Stopping

Keras (like other frameworks) has built-in support for stopping when further training appears to be making the model worse.

In Keras, it’s the EarlyStopping callback.

Using it means passing the validation data to the training process for evaluation on every epoch.

Training will stop after several epochs have passed with no improvement.

restore_best_weights=True ensures that the final model’s weights are from its best epoch, not just the last one.

This should be your default.


early_stopping = EarlyStopping(patience=3, monitor='val_acc', min_delta=0.001,
                               restore_best_weights=True, verbose=1)
model.fit(X_train, y_train, batch_size=2, epochs=60, verbose=2,
          validation_data=(X_test, y_test), callbacks=[early_stopping])
model.evaluate(X_test, y_test)

...
Epoch 12/60
 - 74s - loss: 0.9468 - acc: 0.7689 - val_loss: 1.2728 - val_acc: 0.7597
Epoch 13/60
 - 75s - loss: 0.8886 - acc: 0.7795 - val_loss: 1.4035 - val_acc: 0.7456
Epoch 14/60
Restoring model weights from the end of the best epoch.
 - 80s - loss: 0.8391 - acc: 0.7870 - val_loss: 1.4467 - val_acc: 0.7420
Epoch 00014: early stopping

[1.3035458562230895, 0.7597173]

Now, training stops after 14 epochs, not 60, and in about 18 minutes.

Each epoch took a little longer (75s vs 65s) because of the evaluation of the validation data.

Accuracy is better too, at 76.7%.

With early stopping, note that the number of epochs passed to fit() only matters as a limit on the maximum number of epochs that will run.

It can be set to a large value.
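For instance (an illustrative variation on the snippet above, not a listing from the original notebook), the limit can simply be set far higher than training is ever expected to need:

# With EarlyStopping attached, epochs= is only an upper bound, so a large value
# just means "train until validation accuracy stops improving".
model.fit(X_train, y_train, batch_size=2, epochs=1000, verbose=2,
          validation_data=(X_test, y_test), callbacks=[early_stopping])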

This is the first of a couple of observations here that suggest the same thing: epochs don't really matter as a unit of training.

They’re just a number of batches of data that constitute the whole input to training.

But training means passing over the data in batches repeatedly until the model is trained enough.

How many epochs that represents isn’t directly important.

An epoch is still useful as a point of comparison for time taken to train per amount of data though.

Step #3: Max Out GPU with Larger Batch Sizes

In Databricks, cluster metrics are exposed through a Ganglia-based UI.

This shows GPU utilization during training.

Monitoring utilization is important to tuning as it can suggest bottlenecks.

Here, the GPU is pretty well used, at about 90%, but 100% is cooler than 90%.

The batch size of 2 is small, and isn’t keeping the GPU busy enough during processing.

Increasing the batch size would increase that utilization.

The goal isn’t only to make the GPU busier, but to benefit from the extra work.

Bigger batches produce more accurate gradient estimates, which improve how well each batch updates the model (up to a point).

That in turn can allow training to use a higher learning rate, and more quickly reach the point where the model stops improving.

Or, with extra capacity, it’s possible to add complexity to the network architecture itself to take advantage of that.

This example doesn’t intend to explore tuning the architecture, but will try adding some dropout to decrease this network’s tendency to overfit.
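build_model isn't defined in this excerpt, so as a rough sketch only: assuming transfer learning on a pretrained Xception base (consistent with the 299x299 inputs and preprocess_input used later) and 257 Caltech 256 classes, both assumptions here, a dropout layer might be slotted in like this:

from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Model

def build_model(dropout=None):
    # Pretrained convolutional base; 299x299x3 matches the image size used in this post
    base = Xception(include_top=False, weights='imagenet',
                    input_shape=(299, 299, 3), pooling='avg')
    x = base.output
    if dropout:
        # Randomly zero a fraction of activations to reduce overfitting
        x = Dropout(dropout)(x)
    # 257 output classes (Caltech 256 categories plus clutter) is an assumption here
    predictions = Dense(257, activation='softmax')(x)
    return Model(inputs=base.input, outputs=predictions)

The training run below then passes dropout=0.5 along with the larger batch size and learning rate.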

model = build_model(dropout=0.5)
model.compile(optimizer=Nadam(lr=0.004),
  loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, batch_size=16, epochs=30, verbose=2,
  validation_data=(X_test, y_test), callbacks=[early_stopping])

...
Epoch 6/30
 - 56s - loss: 0.1487 - acc: 0.9583 - val_loss: 1.1105 - val_acc: 0.7633
Epoch 7/30
 - 56s - loss: 0.1022 - acc: 0.9717 - val_loss: 1.2128 - val_acc: 0.7456
Epoch 8/30
 - 56s - loss: 0.0853 - acc: 0.9744 - val_loss: 1.2004 - val_acc: 0.7597
Epoch 9/30
Restoring model weights from the end of the best epoch.
 - 62s - loss: 0.0811 - acc: 0.9815 - val_loss: 1.2424 - val_acc: 0.7350
Epoch 00009: early stopping

With a larger batch size of 16 instead of 2, and a learning rate of 0.004 instead of 0.001, the GPU crunches through epochs in under 60s instead of 75s. The model reaches about the same accuracy (76.3%) in only 9 epochs.

Total train time was just 9 minutes, much better than 65.

It’s all too easy to increase the learning rate too far, in which case training accuracy will be poor and stay poor.

When increasing the batch size by 8x, it’s typically advisable to increase learning rate by at most 8x.

Some research suggests that when the batch size increases by N, the learning rate can scale by about sqrt(N).
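As a rough illustration of these two rules of thumb (the helper and numbers below are illustrative, not part of the original code):

import math

def scaled_lr(base_lr, base_batch_size, new_batch_size, rule="sqrt"):
    # 'linear' scales the learning rate by the batch-size multiple N (an upper bound);
    # 'sqrt' scales it by sqrt(N), a more conservative choice.
    n = new_batch_size / base_batch_size
    return base_lr * (n if rule == "linear" else math.sqrt(n))

print(scaled_lr(0.001, 2, 16, rule="linear"))  # 0.008
print(scaled_lr(0.001, 2, 16, rule="sqrt"))    # ~0.0028

The 0.004 used above falls between these two values.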

Note that there is some randomness inherent in the training process, as inputs are shuffled by Keras.

Accuracy fluctuates mostly up but sometimes down over time, and coupled with early stopping, training might terminate earlier or later depending on the order the data is encountered.

To even this out, the ‘patience’ of EarlyStopping can be increased at the cost of extra training at the end.

Step #4: Use Petastorm and /dbfs/ml to Access Large Data

Training above used just a 10% sample of the data, and the tips above helped bring training time down by adopting a few best practices.

The next step, of course, is to train on all of the data.

This should help achieve higher accuracy, but means more data will have to be processed too.

The full data set is many gigabytes, which could still fit in memory, but for purposes here, let’s pretend it wouldn’t.

A different approach is needed, one that loads data into memory efficiently in chunks during training.

Fortunately, the Petastorm library from Uber is designed to feed Parquet-based data into Tensorflow (or Keras) training in this way.

It can be applied by adapting the preprocessing and training code to create Tensorflow Datasets, rather than pandas DataFrames, for training.

Datasets here act like infinite iterators over the data, which means steps_per_epoch is now defined to specify how many batches make an epoch.

This underscores how an ‘epoch’ is somewhat arbitrary.

It’s also common to checkpoint model training progress in long-running training jobs, to recover from failures during training.

This is also added as a callback.

(Note: To run this example, attach the petastorm library to your cluster.)

path_base = "/dbfs/.../"
checkpoint_path = path_base + "checkpoint"
table_path_base = path_base + "caltech_256_image/"
table_path_base_file = "file:" + table_path_base

train_size = spark.read.parquet(table_path_base_file + "train").count()
test_size = spark.read.parquet(table_path_base_file + "test").count()

# Workaround for Arrow issue:
underscore_files = [f for f in (os.listdir(table_path_base + "train") +
   os.listdir(table_path_base + "test")) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)

img_size = 299

def transform_reader(reader, batch_size):
  def transform_input(x):
    img_bytes = tf.reshape(decode_raw(x.image, tf.uint8), (-1, img_size, img_size, 3))
    inputs = preprocess_input(tf.cast(img_bytes, tf.float32))
    outputs = x.label - 1
    return (inputs, outputs)
  return make_petastorm_dataset(reader).map(transform_input). \
    apply(unbatch()).shuffle(400, seed=42). \
    batch(batch_size, drop_remainder=True)

The method above reimplements some of the preprocessing from the earlier code in terms of Tensorflow's transformation APIs.

Note that Petastorm produces Datasets that deliver data in batches whose size depends entirely on the Parquet files' row group size.

To control the batch size for training, it’s necessary to use Tensorflow’s unbatch() and batch() operations to re-batch the data into the right size.

Also, note the small workaround that’s currently necessary to avoid a problem in reading Parquet files via Arrow in Petastorm.

batch_size = 16

with make_batch_reader(table_path_base_file + "train", num_epochs=None) as train_reader:
  with make_batch_reader(table_path_base_file + "test", num_epochs=None) as test_reader:
    train_dataset = transform_reader(train_reader, batch_size)
    test_dataset = transform_reader(test_reader, batch_size)
    model = build_model(dropout=0.5)
    model.compile(optimizer=Nadam(lr=0.004),
      loss='sparse_categorical_crossentropy', metrics=['acc'])
    early_stopping = EarlyStopping(patience=3, monitor='val_acc',
      min_delta=0.001, restore_best_weights=True, verbose=1)
    # Note: you must set save_weights_only=True to avoid problems with hdf5 files and /dbfs/ml
    checkpoint = ModelCheckpoint(checkpoint_path + "/checkpoint-{epoch}.ckpt",
      save_weights_only=True, verbose=1)
    model.fit(train_dataset, epochs=30, steps_per_epoch=(train_size // batch_size),
              validation_data=test_dataset, validation_steps=(test_size // batch_size),
              verbose=2, callbacks=[early_stopping, checkpoint])

More asides: for technical reasons, ModelCheckpoint currently must set save_weights_only=True when using /dbfs.

It also appears necessary to use different checkpoint paths per epoch; use a path pattern that includes {epoch}.

Now run:

Epoch 8/30
Epoch 00008: saving model to /dbfs/tmp/sean.owen/binary/checkpoint/checkpoint-8.ckpt
 - 682s - loss: 1.0154 - acc: 0.8336 - val_loss: 1.2391 - val_acc: 0.8301
Epoch 9/30
Epoch 00009: saving model to /dbfs/tmp/sean.owen/binary/checkpoint/checkpoint-9.ckpt
 - 684s - loss: 1.0048 - acc: 0.8397 - val_loss: 1.2900 - val_acc: 0.8275
Epoch 10/30
Epoch 00010: saving model to /dbfs/tmp/sean.owen/binary/checkpoint/checkpoint-10.ckpt
 - 689s - loss: 1.0033 - acc: 0.8422 - val_loss: 1.3706 - val_acc: 0.8225
Epoch 11/30
Restoring model weights from the end of the best epoch.
Epoch 00011: saving model to /dbfs/tmp/sean.owen/binary/checkpoint/checkpoint-11.ckpt
 - 687s - loss: 0.9800 - acc: 0.8503 - val_loss: 1.3837 - val_acc: 0.8225
Epoch 00011: early stopping

Epoch times are almost 11x longer, but recall that an epoch here is now a full pass over the training data, not a 10% sample.

The extra overhead comes from the I/O in reading data from Parquet in cloud storage, and writing checkpoint files.

The GPU utilization graph shows this as "spiky" utilization of the GPU. The upside? Accuracy is significantly better, at 83%.

The cost was much longer training time: 126 minutes instead of 9.

For many applications, this could be well worth it.

Databricks provides an optimized implementation of the file system mount that makes the Parquet files appear as local files to training.

Accessing them via /dbfs/ml/… instead of /dbfs/… can improve I/O performance.

Also, Petastorm itself can cache data on local disks to avoid re-reading data from cloud storage.

path_base = "/dbfs/ml/.../"
checkpoint_path = path_base + "checkpoint"
table_path_base = path_base + "caltech_256_image/"
table_path_base_file = "file:" + table_path_base

def make_caching_reader(suffix, cur_shard=None, shard_count=None):
  return make_batch_reader(table_path_base_file + suffix, num_epochs=None,
    cur_shard=cur_shard, shard_count=shard_count,
    cache_type='local-disk', cache_location="/tmp/" + suffix,
    cache_size_limit=20000000000, cache_row_size_estimate=img_size * img_size * 3)

The rest of the code is as above, just using make_caching_reader in place of make_batch_reader.

Epoch 6/30
Epoch 00006: saving model to /dbfs/ml/tmp/sean.owen/binary/checkpoint/checkpoint-6.ckpt
 - 638s - loss: 1.0221 - acc: 0.8252 - val_loss: 1.1612 - val_acc: 0.8285
...
Epoch 00009: early stopping

The training time decreased from about 126 minutes to 96 minutes for roughly the same result.

That’s still more than 10x the runtime for 10x the data, but not bad for a 7% increase in accuracy.

Step #5: Use Multiple GPUs

Still want to go faster, and have some budget? It's easy to try a bigger GPU like a V100 and retune appropriately.

However, at some point, scaling up means multiple GPUs.

Instances with, for example, eight K80 GPUs are readily available in the cloud.

Keras provides a simple utility function called multi_gpu_model that can parallelize training across multiple GPUs.

It's just a one-line code change:

num_gpus = 8
...
model = multi_gpu_model(model, gpus=num_gpus)

(Note: to run this example, choose a driver instance type with 8 GPUs.)

The modification was easy but, to cut to the chase without repeating the training output, per-epoch time becomes 270s instead of 630s.

That’s not 8x faster, not even 3x faster.

Each of the 8 GPUs is only processing 1/8th of each batch of 16 inputs, so each is again effectively processing just 2 per batch.

As above, it's possible to compensate by increasing the batch size, here to 256, and further increasing the learning rate, to 0.016. (See the accompanying notebook for full code listings.) Training is indeed faster, at 135s per epoch.

The speedup is better, but still not 8x.

Accuracy is steady at around 83%, so this is still progress toward faster training.
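For reference, a sketch of that adjusted setup (illustrative only; the full listing is in the accompanying notebook, and build_model, Nadam, and the Petastorm data pipeline are reused from earlier):

from tensorflow.keras.utils import multi_gpu_model

num_gpus = 8
batch_size = 256   # larger global batch, split across the 8 GPUs by Keras

model = build_model(dropout=0.5)
model = multi_gpu_model(model, gpus=num_gpus)
model.compile(optimizer=Nadam(lr=0.016),   # learning rate raised along with the batch size
              loss='sparse_categorical_crossentropy', metrics=['acc'])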

The Keras implementation is simple, but not optimal.

GPU utilization remains spiky because the GPUs idle while Keras combines partial gradients in a straightforward but slow way.

Horovod is another project from Uber that helps scale deep learning training across not just multiple GPUs on one machine, but GPUs across many machines, and with great efficiency.

While it’s often associated with training across multiple machines, that’s not actually the next step in scaling up.

It can help this current multi-GPU setup.

All else equal, it’ll be more efficient to utilize 8 GPUs connected to the same VM than spread across the network.

It requires a different modification to the code, which uses the HorovodRunner utility from Databricks to integrate Horovod with Spark:

batch_size = 32
num_gpus = 8

def train_hvd():
  hvd.init()
  config = tf.ConfigProto()
  config.gpu_options.allow_growth = True
  config.gpu_options.visible_device_list = str(hvd.local_rank())
  K.set_session(tf.Session(config=config))

  pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
  with make_caching_reader("train", cur_shard=hvd.rank(), shard_count=hvd.size()) as train_reader:
    with make_caching_reader("test", cur_shard=hvd.rank(), shard_count=hvd.size()) as test_reader:
      train_dataset = transform_reader(train_reader, batch_size)
      test_dataset = transform_reader(test_reader, batch_size)
      model = build_model(dropout=0.5)
      optimizer = Nadam(lr=0.016)
      optimizer = hvd.DistributedOptimizer(optimizer)
      model.compile(optimizer=optimizer,
        loss='sparse_categorical_crossentropy', metrics=['acc'])
      callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0),
                   hvd.callbacks.MetricAverageCallback(),
                   EarlyStopping(patience=3, monitor='val_acc', min_delta=0.001,
                     restore_best_weights=True, verbose=(1 if hvd.rank() == 0 else 0))]
      if hvd.rank() == 0:
        callbacks.append(ModelCheckpoint(
          checkpoint_path + "/checkpoint-{epoch}.ckpt",
          save_weights_only=True, verbose=1))
      model.fit(train_dataset, epochs=30,
                steps_per_epoch=(train_size // (batch_size * num_gpus)),
                validation_data=test_dataset,
                validation_steps=(test_size // (batch_size * num_gpus)),
                verbose=(2 if hvd.rank() == 0 else 0), callbacks=callbacks)

hr = HorovodRunner(np=-num_gpus)
hr.run(train_hvd)

Again, a few notes:

- The Arrow workaround must be repeated in the Horovod training function
- Use hvd.callbacks.MetricAverageCallback to correctly average validation metrics
- Make sure to run checkpoint callbacks on only one worker (rank 0)
- Set HorovodRunner's np= argument to minus the number of GPUs to use, when running locally
- Batch size here is now per GPU, not overall; note the different computation in steps_per_epoch

The output from the training is, well, noisy and so won't be copied here in full.

Total training time has come down to about 12.6 minutes, from 96, or almost 7.6x, which is satisfyingly close to the maximum possible 8x speedup! Accuracy is up to 83.5%. Compare that to 9 minutes and 76% accuracy on one GPU.

Step #6: Use Horovod Across Multiple Machines

Sometimes, 8 or even 16 GPUs just isn't enough, and that's the most you can get on one machine today.

Or, sometimes it can be cheaper to provision GPUs across many smaller machines to take advantage of varying prices per machine type in the cloud.

The same Horovod example above can run on a cluster of 8 1-GPU machines instead of 1 8-GPU machine with just a single line of change.

HorovodRunner manages the distributed work of Horovod on the Spark cluster by using Spark 2.4's barrier mode support.

num_gpus = 8
...
hr = HorovodRunner(np=num_gpus)

(Note: to run this example, provision a cluster with 8 workers, each with 1 GPU.)

The only change is to specify 8, rather than -8, to select 8 GPUs on the cluster rather than on the driver.

GPU utilization is pleasingly full across the 8 machines' GPUs (the idle one is the driver, which does not participate in the training). Accuracy is again about the same as expected, at 83.6%. Total run time is almost 17 minutes rather than 12.6, which reflects the overhead of coordinating GPUs across machines.

This overhead could be worthwhile in some cases for cost purposes, and is simply a necessary evil if a training job has to scale past 16 GPUs.

Where possible, allocating all the GPUs on one machine is faster though.

For a problem of this moderate size, it probably won’t be possible to usefully exploit more GPU resources.

Keeping them busy would mean larger learning rates and the learning rate is already about as high as it can go.

For this network, a few K80 GPUs may be the right maximum amount of resource to deploy.

Of course, there are much larger networks and datasets out there!

Conclusion

Deep learning is powerful magic, but we always want it to go faster.

It scales in different ways though.

There are new best practices and pitfalls to know when setting out to train a model.

A few of these helped the small image classification problem here improve accuracy slightly while reducing runtime 7x.

The first steps in scaling aren’t more resources, but looking for easy optimizations.

Scaling to train on an entire large data set in the cloud requires some new tools, but not necessarily more GPUs at first.

With careful use of Petastorm and /dbfs/ml, 10x the data helped achieve 82.7% accuracy in not much more than 10x the time on the same hardware.

The next step of scaling up means utilizing multiple GPUs with tools like Horovod, but doesn’t mean a cluster of machines necessarily, unlike in ETL jobs where a cluster of machines is the norm.

A single 8 GPU instance allowed training to finish almost 8x faster and achieve over 83% accuracy.

Only for the largest problems are multiple GPU instances necessary, but Horovod can help scale even there without much overhead.
