# Examples for Joblib

## General examples

General-purpose and introductory examples for joblib.

### Random state within joblib.Parallel

Randomness is affected differently by parallel execution depending on the backend.

In particular, when using multiple processes, the random sequence can be the same in all processes. This example illustrates the problem and shows how to work around it.

```python
import numpy as np
from joblib import Parallel, delayed
```

A utility function for the example

```python
def print_vector(vector, backend):
    """Helper function to print the generated vector with a given backend."""
    print('\nThe different generated vectors using the {} backend are:\n {}'
          .format(backend, np.array(vector)))
```

#### Sequential behavior

`stochastic_function` will generate five random integers. When calling the function several times, we expect to obtain different vectors. For instance, if we call the function five times in a sequential manner, we can check that the generated vectors are all different.

```python
def stochastic_function(max_value):
    """Randomly generate integers up to a maximum value."""
    return np.random.randint(max_value, size=5)


n_vectors = 5
random_vector = [stochastic_function(10) for _ in range(n_vectors)]
print('\nThe different generated vectors in a sequential manner are:\n {}'
      .format(np.array(random_vector)))
```

Out:

```
The different generated vectors in a sequential manner are:
 [[5 2 6 7 5]
 [4 9 0 6 2]
 [2 9 2 3 5]
 [9 8 8 2 9]
 [0 1 5 7 4]]
```

#### Parallel behavior

Joblib provides three different backends: loky (the default), threading, and multiprocessing.

```python
backend = 'loky'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)
```

Out:

```
The different generated vectors using the loky backend are:
 [[8 5 8 5 2]
 [2 6 8 8 8]
 [3 9 0 8 7]
 [6 9 9 8 2]
 [1 1 4 7 8]]
```

```python
backend = 'threading'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)
```

Out:

```
The different generated vectors using the threading backend are:
 [[8 9 6 2 7]
 [5 5 2 1 9]
 [7 1 6 1 9]
 [2 7 4 0 1]
 [7 1 3 6 3]]
```

The loky and threading backends behave exactly as in the sequential case and do not require extra care. However, this is not the case for the multiprocessing backend.

```python
backend = 'multiprocessing'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)
```

Out:

```
The different generated vectors using the multiprocessing backend are:
 [[8 9 8 1 4]
 [8 9 8 1 4]
 [5 3 6 7 8]
 [5 3 6 7 8]
 [7 3 6 2 1]]
```

Some of the generated vectors are exactly the same, which can be a problem for the application.

Technically, the reason is that all forked Python processes share the same exact random seed. As a result, we obtain the same randomly generated vectors twice because we are using `n_jobs=2`. A solution is to set the random state within the function which is passed to `joblib.Parallel`.

```python
def stochastic_function_seeded(max_value, random_state):
    rng = np.random.RandomState(random_state)
    return rng.randint(max_value, size=5)
```

`stochastic_function_seeded` accepts a random seed as an argument. We can reset this seed by passing `None` at every function call. In this case, we see that the generated vectors are all different.

```python
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, None) for _ in range(n_vectors))
print_vector(random_vector, backend)
```

Out:

```
The different generated vectors using the multiprocessing backend are:
 [[4 5 9 3 5]
 [9 5 6 6 8]
 [0 7 1 7 2]
 [1 7 4 7 6]
 [9 3 1 3 3]]
```

#### Fixing the random state to obtain deterministic results

The pattern used in `stochastic_function_seeded` has another advantage: it allows controlling the `random_state` by passing a known seed. For instance, we can replicate the same generation of vectors by passing a fixed state as follows.

```python
random_state = np.random.randint(np.iinfo(np.int32).max, size=n_vectors)

random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, rng) for rng in random_state)
print_vector(random_vector, backend)

random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, rng) for rng in random_state)
print_vector(random_vector, backend)
```

Out:

```
The different generated vectors using the multiprocessing backend are:
 [[1 5 9 4 7]
 [5 6 4 2 0]
 [8 6 9 1 0]
 [3 3 5 6 7]
 [7 3 8 8 6]]

The different generated vectors using the multiprocessing backend are:
 [[1 5 9 4 7]
 [5 6 4 2 0]
 [8 6 9 1 0]
 [3 3 5 6 7]
 [7 3 8 8 6]]
```
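With NumPy's newer `Generator` API, the same idea can be expressed with `SeedSequence.spawn`, which derives independent child seeds from a single root seed. The following is a minimal sketch, not part of the original example; `stochastic_function_rng` is a hypothetical helper written for illustration:

```python
import numpy as np
from joblib import Parallel, delayed


def stochastic_function_rng(max_value, seed):
    # Build a fresh Generator from the child seed inside each task.
    rng = np.random.default_rng(seed)
    return rng.integers(max_value, size=5)


# spawn() derives statistically independent child seeds from one root seed,
# so the result is reproducible and free of duplicated random streams,
# whatever the backend.
child_seeds = np.random.SeedSequence(42).spawn(5)
random_vector = Parallel(n_jobs=2)(
    delayed(stochastic_function_rng)(10, seed) for seed in child_seeds)
print(np.array(random_vector))
```

Rerunning this snippet with the same root seed reproduces the exact same vectors.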

Total running time of the script: ( 0 minutes 1.159 seconds)

### Checkpoint using joblib.Memory and joblib.Parallel

This example illustrates how to cache intermediate computing results using `joblib.Memory` within `joblib.Parallel`.

#### Embed caching within parallel processing

It is possible to cache a computationally expensive function executed during a parallel process. `costly_compute` emulates such a time-consuming function.

```python
import time


def costly_compute(data, column):
    """Emulate a costly function by sleeping and returning a column."""
    time.sleep(2)
    return data[column]


def data_processing_mean(data, column):
    """Compute the mean of a column."""
    return costly_compute(data, column).mean()
```

Create some data. The random seed is fixed to generate deterministic data across Python sessions. Note that this is not necessary for this specific example since the memory cache is cleared at the end of the session.

```python
import numpy as np

rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)
```

It is first possible to run the processing without caching or parallelism.

```python
start = time.time()
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
stop = time.time()

print('\nSequential processing')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

Out:

```
Sequential processing
Elapsed time for the entire processing: 8.01 s
```

`costly_compute` is expensive to compute and is used as an intermediate step in `data_processing_mean`. Therefore, it is interesting to store the intermediate results from `costly_compute` using `joblib.Memory`.

```python
from joblib import Memory

location = './cachedir'
memory = Memory(location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)
```

Now, we define `data_processing_mean_using_cache`, which benefits from the cache by calling `costly_compute_cached`.

```python
def data_processing_mean_using_cache(data, column):
    """Compute the mean of a column."""
    return costly_compute_cached(data, column).mean()
```

Then, we execute the same processing in parallel while caching the intermediate results.

```python
from joblib import Parallel, delayed

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

Out:

```
First round - caching the data
Elapsed time for the entire processing: 4.13 s
```

By using 2 workers, the parallel processing gives a 2x speed-up compared to the sequential case. By executing the same process again, the intermediate results obtained by calling `costly_compute_cached` will be loaded from the cache instead of being recomputed.

```python
start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

Out:

```
Second round - reloading from the cache
Elapsed time for the entire processing: 0.14 s
```

#### Reuse intermediate checkpoints

Having cached the intermediate results of `costly_compute_cached`, we can reuse them by calling the function again. We define a new processing step which takes the maximum of the array returned by `costly_compute_cached` instead of the mean as before.

```python
def data_processing_max_using_cache(data, column):
    """Compute the max of a column."""
    return costly_compute_cached(data, column).max()


start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_max_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nReusing intermediate checkpoints')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

Out:

```
Reusing intermediate checkpoints
Elapsed time for the entire processing: 0.13 s
```

The processing time corresponds only to the execution of the `max` function; the internal call to `costly_compute_cached` reloads the results from the cache.
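To check whether a given call would indeed be served from the cache, recent joblib versions expose `check_call_in_cache` on the memoized function. The following is a self-contained sketch with its own throw-away cache folder; `fetch_column` and the folder name are hypothetical, introduced only for this illustration:

```python
import numpy as np
from joblib import Memory

# Throw-away cache folder for this sketch only (name is an assumption).
check_memory = Memory('./cachedir_check', verbose=0)


def fetch_column(data, column):
    # Hypothetical stand-in for an expensive intermediate function.
    return data[column]


fetch_column_cached = check_memory.cache(fetch_column)
data_demo = np.random.randn(100, 4)

# Nothing cached yet for these arguments.
assert not fetch_column_cached.check_call_in_cache(data_demo, 0)
fetch_column_cached(data_demo, 0)
# The same arguments now hit the cache without recomputation.
assert fetch_column_cached.check_call_in_cache(data_demo, 0)
check_memory.clear(warn=False)
```

This can be handy when deciding whether a batch of parallel tasks will be cheap cache hits or full recomputations.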

#### Clean-up the cache folder

``memory.clear(warn=False)``

Total running time of the script: ( 0 minutes 12.415 seconds)

### How to use joblib.Memory

This example illustrates the usage of `joblib.Memory` with both functions and methods.

#### Without `joblib.Memory`

`costly_compute` emulates a computationally expensive process which later will benefit from caching using `joblib.Memory`.

```python
import time
import numpy as np


def costly_compute(data, column_index=0):
    """Simulate an expensive computation."""
    time.sleep(5)
    return data[column_index]
```

Be sure to set the random seed to generate deterministic data. Indeed, if the data is not deterministic, the `joblib.Memory` instance will not be able to reuse the cache from one run to another.

```python
rng = np.random.RandomState(42)
data = rng.randn(int(1e5), 10)

start = time.time()
data_trans = costly_compute(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))
```

Out:

```
The function took 5.01 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
```

#### Caching the result of a function to avoid recomputing

If we need to call our function several times with the same input data, it is beneficial to avoid recomputing the same results over and over since this is expensive. `joblib.Memory` enables caching results from a function into a specific location.

```python
from joblib import Memory

location = './cachedir'
memory = Memory(location, verbose=0)


def costly_compute_cached(data, column_index=0):
    """Simulate an expensive computation."""
    time.sleep(5)
    return data[column_index]


costly_compute_cached = memory.cache(costly_compute_cached)

start = time.time()
data_trans = costly_compute_cached(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))
```

Out:

```
The function took 5.06 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
```

At the first call, the results are cached. Therefore, the computation time corresponds to the time to compute the results plus the time to dump them to disk.

```python
start = time.time()
data_trans = costly_compute_cached(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))
```

Out:

```
The function took 0.02 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
```

At the second call, the computation time is largely reduced since the results are obtained by loading the previously dumped data from disk instead of recomputing them.
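The same caching behavior can equivalently be declared with `Memory.cache` used as a decorator. A minimal sketch, using its own cache folder so as not to interfere with the example above (the folder name and function are assumptions made for this illustration):

```python
import time

from joblib import Memory

# Separate throw-away cache folder for this sketch.
demo_memory = Memory('./cachedir_decorator', verbose=0)


@demo_memory.cache
def costly_compute_decorated(x):
    """Same behavior as memory.cache(f), written in decorator form."""
    time.sleep(0.1)
    return x * 2


start = time.time()
first = costly_compute_decorated(21)
first_duration = time.time() - start

start = time.time()
second = costly_compute_decorated(21)  # served from the on-disk cache
second_duration = time.time() - start

demo_memory.clear(warn=False)
```

The decorator form is convenient when the function is defined once and always meant to be cached.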

#### Using `joblib.Memory` with a method

`joblib.Memory` is designed to work with functions with no side effects. When dealing with classes, the computationally expensive part of a method has to be moved to a standalone function, which is then cached inside the class method.

```python
def _costly_compute_cached(data, column):
    time.sleep(5)
    return data[column]


class Algorithm(object):
    """A class which is using the previous function."""

    def __init__(self, column=0):
        self.column = column

    def transform(self, data):
        costly_compute = memory.cache(_costly_compute_cached)
        return costly_compute(data, self.column)


transformer = Algorithm()
start = time.time()
data_trans = transformer.transform(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))
```

Out:

```
The function took 5.06 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
```

```python
start = time.time()
data_trans = transformer.transform(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))
```

Out:

```
The function took 0.02 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
```

As expected, the second call to the `transform` method loads the results that have been cached.

#### Clean up cache directory

``memory.clear(warn=False)``

Total running time of the script: ( 0 minutes 15.235 seconds)

### NumPy memmap in joblib.Parallel

This example illustrates some features enabled by using a memory map (`numpy.memmap`) within `joblib.Parallel`. First, we show that dumping a huge data array ahead of passing it to `joblib.Parallel` speeds up computation. Then, we show how to provide write access to the original data.

#### Speed up processing of a large data array

We create a large data array and compute the average over several of its slices.

```python
import numpy as np

data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]
```

The `slow_mean` function introduces a `time.sleep()` call to simulate a more expensive computation cost, for which parallel computing is beneficial. Parallel execution may not be beneficial for very fast operations, due to extra overhead (worker creation, communication, etc.).

```python
import time


def slow_mean(data, sl):
    """Simulate a time consuming processing."""
    time.sleep(0.01)
    return data[sl].mean()
```

First, we evaluate the sequential computation on our problem.

```python
tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
```

Out:

``Elapsed time computing the average of couple of slices 1.00 s``

`joblib.Parallel` is used to compute in parallel the average of all slices using 2 workers.

```python
from joblib import Parallel, delayed

tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
```

Out:

``Elapsed time computing the average of couple of slices 1.17 s``

With this setup, parallel processing is not faster than sequential processing because of the overhead of transferring `data` to the worker processes. It is possible to remove a large part of that overhead by dumping the `data` array to a memmap and passing the memmap to `joblib.Parallel`.

```python
import os

from joblib import dump, load

folder = './joblib_memmap'
try:
    os.mkdir(folder)
except FileExistsError:
    pass

data_filename_memmap = os.path.join(folder, 'data_memmap')
dump(data, data_filename_memmap)
data = load(data_filename_memmap, mmap_mode='r')

tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))
```

Out:

``Elapsed time computing the average of couple of slices 0.60 s``

Therefore, dumping a large `data` array ahead of calling `joblib.Parallel` can speed up the processing by removing some communication overhead.
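Note that `joblib.Parallel` can also perform this dumping automatically: with the process-based backends, input arrays larger than the `max_nbytes` threshold (1M by default) are memmapped to a temporary folder before being sent to the workers, and `mmap_mode` controls how workers open them. A minimal sketch (the array, helper function, and parameter values here are illustrative, not part of the original example):

```python
import numpy as np
from joblib import Parallel, delayed

big_array = np.random.random(int(1e6))  # 8 MB of float64, above the threshold


def head_mean(arr):
    # Workers receive `arr` as a read-only memmap, not a pickled copy.
    return arr[:1000].mean()


# Arrays bigger than max_nbytes are automatically dumped to a temporary
# memmap before dispatch; mmap_mode='r' opens it read-only in the workers.
results = Parallel(n_jobs=2, max_nbytes='1M', mmap_mode='r')(
    delayed(head_mean)(big_array) for _ in range(4))
```

Explicit dumping, as in the example above, remains useful when the same array is reused across several `Parallel` calls.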

#### Writable memmap for shared memory `joblib.Parallel`

`slow_mean_write_output` will compute the mean for some given slices as in the previous example. However, the resulting mean will be written directly into the output array.

```python
def slow_mean_write_output(data, sl, output, idx):
    """Simulate a time consuming processing."""
    time.sleep(0.005)
    res_ = data[sl].mean()
    print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
    output[idx] = res_
```

Prepare the folder where the memmap will be dumped.

``output_filename_memmap = os.path.join(folder, 'output_memmap')``

Pre-allocate a writable shared memory map as a container for the results of the parallel computation.

```python
output = np.memmap(output_filename_memmap, dtype=data.dtype,
                   shape=len(slices), mode='w+')
```

`data` is replaced by its memory-mapped version. Note that the buffer has already been dumped in the previous section.

``data = load(data_filename_memmap, mmap_mode='r')``

Fork the worker processes to perform computation concurrently

```python
Parallel(n_jobs=2)(delayed(slow_mean_write_output)(data, sl, output, idx)
                   for idx, sl in enumerate(slices))
```

Compare the results from the output buffer with the expected results

```python
print("\nExpected means computed in the parent process:\n {}"
      .format(np.array(results)))
print("\nActual means computed by the worker processes:\n {}"
      .format(output))
```

Out:

```
Expected means computed in the parent process:
 [0.4999114  0.49967474 0.49962624 0.49956682 0.49963294 0.49971405
 0.49971037 0.49991271 0.50004813 0.50044171 0.5004137  0.50048677
 0.50082031 0.50082378 0.50061898 0.50053032 0.50061782 0.50034759
 0.50018593 0.49985634 0.49953116 0.49948318 0.49907222 0.49919996
 0.49937287 0.50004619 0.49994813 0.50037896 0.50062525 0.50056846
 0.50023441 0.50008402 0.49994225 0.49976204 0.49957996 0.49951291
 0.49900244 0.49931429 0.49937982 0.49926414 0.49917665 0.49959002
 0.49973743 0.49929117 0.49970904 0.49957976 0.499808   0.49944909
 0.50008102 0.49983457 0.50034129 0.50046594 0.50067951 0.5007614
 0.50118307 0.50104813 0.50094674 0.5012441  0.50045661 0.50047582
 0.50031723 0.50022811 0.49982842 0.50036999 0.49978217 0.49988192
 0.49989093 0.49945017 0.49927452 0.49961305 0.49946881 0.49953169
 0.50005845 0.50006605 0.49995062 0.49992838 0.4997782  0.49982086
 0.50005846 0.5000365  0.50022253 0.50002488 0.50003528 0.49996313
 0.5001207  0.50002812 0.50026106 0.50019858 0.50001065 0.49978757
 0.49984565 0.49989753 0.49985816 0.50013693 0.50026811]

Actual means computed by the worker processes:
 [0.4999114  0.49967474 0.49962624 0.49956682 0.49963294 0.49971405
 0.49971037 0.49991271 0.50004813 0.50044171 0.5004137  0.50048677
 0.50082031 0.50082378 0.50061898 0.50053032 0.50061782 0.50034759
 0.50018593 0.49985634 0.49953116 0.49948318 0.49907222 0.49919996
 0.49937287 0.50004619 0.49994813 0.50037896 0.50062525 0.50056846
 0.50023441 0.50008402 0.49994225 0.49976204 0.49957996 0.49951291
 0.49900244 0.49931429 0.49937982 0.49926414 0.49917665 0.49959002
 0.49973743 0.49929117 0.49970904 0.49957976 0.499808   0.49944909
 0.50008102 0.49983457 0.50034129 0.50046594 0.50067951 0.5007614
 0.50118307 0.50104813 0.50094674 0.5012441  0.50045661 0.50047582
 0.50031723 0.50022811 0.49982842 0.50036999 0.49978217 0.49988192
 0.49989093 0.49945017 0.49927452 0.49961305 0.49946881 0.49953169
 0.50005845 0.50006605 0.49995062 0.49992838 0.4997782  0.49982086
 0.50005846 0.5000365  0.50022253 0.50002488 0.50003528 0.49996313
 0.5001207  0.50002812 0.50026106 0.50019858 0.50001065 0.49978757
 0.49984565 0.49989753 0.49985816 0.50013693 0.50026811]
```

#### Clean-up the memmap

Remove the different memmaps that we created. This might fail on Windows due to file permissions.

```python
import shutil

try:
    shutil.rmtree(folder)
except:  # noqa
    print('Could not clean-up automatically.')
```

Total running time of the script: ( 0 minutes 3.785 seconds)

## Parallel examples

Examples demoing more advanced parallel patterns.

### Using dask distributed for single-machine parallel computing

This example shows the simplest usage of the dask distributed backend, on the local computer.

This is useful for prototyping a solution, to later be run on a truly distributed cluster, as the only change to be made is the address of the scheduler.

Another realistic usage scenario: combining dask code with joblib code, for instance using dask for preprocessing data, and scikit-learn for machine learning. In such a setting, it may be interesting to use distributed as a backend scheduler for both dask and joblib, to orchestrate well the computation.

#### Setup the distributed client

```python
from dask.distributed import Client

# If you have a remote cluster running Dask:
# client = Client('tcp://scheduler-address:8786')

# If you want Dask to set itself up on your personal computer:
client = Client(processes=False)
```

#### Run parallel computation using dask.distributed

```python
import time

import joblib


def long_running_function(i):
    time.sleep(.1)
    return i
```

The verbose messages below show that the backend is indeed dask.distributed.

```python
with joblib.parallel_backend('dask'):
    joblib.Parallel(verbose=100)(
        joblib.delayed(long_running_function)(i)
        for i in range(10))
```

Out:

```
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   3 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   4 out of  10 | elapsed:    0.1s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   5 out of  10 | elapsed:    0.2s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    0.2s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    0.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done   8 out of  10 | elapsed:    0.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    0.3s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    0.4s finished
```

Progress in computation can be followed on the distributed web interface, see http://dask.pydata.org/en/latest/diagnostics-distributed.html

Total running time of the script: ( 0 minutes 0.918 seconds)

`Download all examples in Python source code: auto_examples_python.zip`

`Download all examples in Jupyter notebooks: auto_examples_jupyter.zip`
