Examples for Joblib

General examples

General-purpose and introductory examples for joblib.

Random state within joblib.Parallel

Randomness is affected differently by parallel execution depending on the backend used.

In particular, when using multiple processes, the random sequence can be the same in all processes. This example illustrates the problem and shows how to work around it.

import numpy as np
from joblib import Parallel, delayed

A utility function for the example

def print_vector(vector, backend):
    """Helper function to print the generated vector with a given backend."""
    print('\nThe different generated vectors using the {} backend are:\n {}'
          .format(backend, np.array(vector)))

Sequential behavior

stochastic_function will generate five random integers. When calling the function several times, we expect to obtain different vectors. For instance, calling the function five times in a sequential manner, we can check that the generated vectors are all different.

def stochastic_function(max_value):
    """Randomly generate integer up to a maximum value."""
    return np.random.randint(max_value, size=5)


n_vectors = 5
random_vector = [stochastic_function(10) for _ in range(n_vectors)]
print('\nThe different generated vectors in a sequential manner are:\n {}'
      .format(np.array(random_vector)))

Out:

The different generated vectors in a sequential manner are:
 [[5 2 6 7 5]
 [4 9 0 6 2]
 [2 9 2 3 5]
 [9 8 8 2 9]
 [0 1 5 7 4]]

Parallel behavior

Joblib provides three different backends: loky (the default), threading, and multiprocessing.
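The examples below pass the backend name directly as the backend argument of joblib.Parallel. As a side note (not part of the original example), the same choice can be made with joblib's parallel_backend context manager; a minimal sketch:

from joblib import parallel_backend

# Everything inside this context uses the threading backend with 2 workers,
# unless a backend is passed to Parallel explicitly.
with parallel_backend('threading', n_jobs=2):
    vectors = Parallel()(delayed(stochastic_function)(10)
                         for _ in range(n_vectors))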

backend = 'loky'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)

Out:

The different generated vectors using the loky backend are:
 [[8 5 8 5 2]
 [2 6 8 8 8]
 [3 9 0 8 7]
 [6 9 9 8 2]
 [1 1 4 7 8]]

backend = 'threading'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)

Out:

The different generated vectors using the threading backend are:
 [[8 9 6 2 7]
 [5 5 2 1 9]
 [7 1 6 1 9]
 [2 7 4 0 1]
 [7 1 3 6 3]]

The loky and threading backends behave exactly as in the sequential case and do not require extra care. However, this is not the case for the multiprocessing backend.

backend = 'multiprocessing'
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function)(10) for _ in range(n_vectors))
print_vector(random_vector, backend)

Out:

The different generated vectors using the multiprocessing backend are:
 [[8 9 8 1 4]
 [8 9 8 1 4]
 [5 3 6 7 8]
 [5 3 6 7 8]
 [7 3 6 2 1]]

Some of the generated vectors are exactly the same, which can be a problem for the application.

Technically, the reason is that all forked Python processes share the exact same random seed. As a result, we obtain the same randomly generated vectors twice because we are using n_jobs=2. A solution is to set the random state within the function passed to joblib.Parallel.

def stochastic_function_seeded(max_value, random_state):
    rng = np.random.RandomState(random_state)
    return rng.randint(max_value, size=5)

stochastic_function_seeded accepts a random seed as an argument. We can reset this seed by passing None at every function call; in this case, we see that the generated vectors are all different.

random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, None) for _ in range(n_vectors))
print_vector(random_vector, backend)

Out:

The different generated vectors using the multiprocessing backend are:
 [[4 5 9 3 5]
 [9 5 6 6 8]
 [0 7 1 7 2]
 [1 7 4 7 6]
 [9 3 1 3 3]]

Fixing the random state to obtain deterministic results

The pattern of stochastic_function_seeded has another advantage: it makes it possible to control the random_state by passing a known seed. For instance, we can replicate the same generation of vectors by passing a fixed state, as follows.

random_state = np.random.randint(np.iinfo(np.int32).max, size=n_vectors)

random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, rng) for rng in random_state)
print_vector(random_vector, backend)

random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_seeded)(10, rng) for rng in random_state)
print_vector(random_vector, backend)

Out:

The different generated vectors using the multiprocessing backend are:
 [[1 5 9 4 7]
 [5 6 4 2 0]
 [8 6 9 1 0]
 [3 3 5 6 7]
 [7 3 8 8 6]]

The different generated vectors using the multiprocessing backend are:
 [[1 5 9 4 7]
 [5 6 4 2 0]
 [8 6 9 1 0]
 [3 3 5 6 7]
 [7 3 8 8 6]]
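As a side note (not part of the original example), recent NumPy versions (>= 1.17) provide a Generator-based API in which SeedSequence.spawn derives independent child seeds; a minimal sketch of the same seeding pattern:

from numpy.random import SeedSequence, default_rng


def stochastic_function_rng(max_value, child_seed):
    """Randomly generate integers using an independent child seed."""
    rng = default_rng(child_seed)
    return rng.integers(max_value, size=5)


child_seeds = SeedSequence(42).spawn(n_vectors)
random_vector = Parallel(n_jobs=2, backend=backend)(delayed(
    stochastic_function_rng)(10, seed) for seed in child_seeds)
print_vector(random_vector, backend)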

Total running time of the script: ( 0 minutes 1.159 seconds)

Checkpoint using joblib.Memory and joblib.Parallel

This example illustrates how to cache intermediate computing results using joblib.Memory within joblib.Parallel.

Embed caching within parallel processing

It is possible to cache a computationally expensive function executed during parallel processing. costly_compute emulates such a time-consuming function.

import time


def costly_compute(data, column):
    """Emulate a costly function by sleeping and returning a column."""
    time.sleep(2)
    return data[column]


def data_processing_mean(data, column):
    """Compute the mean of a column."""
    return costly_compute(data, column).mean()

Create some data. The random seed is fixed to generate deterministic data across Python sessions. Note that this is not necessary for this specific example, since the memory cache is cleared at the end of the session.

import numpy as np
rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)

First, we run the processing without caching or parallel processing.

start = time.time()
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
stop = time.time()

print('\nSequential processing')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))

Out:

Sequential processing
Elapsed time for the entire processing: 8.01 s

costly_compute is expensive to run and is used as an intermediate step in data_processing_mean. Therefore, it is interesting to store its intermediate results using joblib.Memory.

from joblib import Memory

location = './cachedir'
memory = Memory(location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)

Now, we define data_processing_mean_using_cache, which benefits from the cache by calling costly_compute_cached.

def data_processing_mean_using_cache(data, column):
    """Compute the mean of a column."""
    return costly_compute_cached(data, column).mean()

Then, we execute the same processing in parallel while caching the intermediate results.

from joblib import Parallel, delayed

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))

Out:

First round - caching the data
Elapsed time for the entire processing: 4.13 s

By using 2 workers, the parallel processing gives a 2x speed-up compared to the sequential case. By executing the same process again, the intermediate results obtained by calling costly_compute_cached will be loaded from the cache instead of executing the function.

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))

Out:

Second round - reloading from the cache
Elapsed time for the entire processing: 0.14 s

Reuse intermediate checkpoints

Having cached the intermediate results of costly_compute_cached, they can be reused by calling the function again. We define a new processing step which takes the maximum of the array returned by costly_compute_cached instead of its mean.

def data_processing_max_using_cache(data, column):
    """Compute the max of a column."""
    return costly_compute_cached(data, column).max()


start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_max_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nReusing intermediate checkpoints')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))

Out:

Reusing intermediate checkpoints
Elapsed time for the entire processing: 0.13 s

The processing time now only corresponds to the execution of the max function; the internal call to costly_compute_cached reloads the results from the cache.
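As a hedged sketch (check_call_in_cache is available only on recent joblib versions), one can also verify that the intermediate results are present in the cache before reusing them:

for col in range(data.shape[1]):
    # True if a call with these exact arguments has already been cached.
    in_cache = costly_compute_cached.check_call_in_cache(data, col)
    print('Column {} already cached: {}'.format(col, in_cache))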

Clean-up the cache folder

memory.clear(warn=False)

Total running time of the script: ( 0 minutes 12.415 seconds)

How to use joblib.Memory

This example illustrates the usage of joblib.Memory with both functions and methods.

Without joblib.Memory

costly_compute emulates a computationally expensive process which will later benefit from caching using joblib.Memory.

import time
import numpy as np


def costly_compute(data, column_index=0):
    """Simulate an expensive computation"""
    time.sleep(5)
    return data[column_index]

Be sure to set the random seed to generate deterministic data. Indeed, if the data is not deterministic, the joblib.Memory instance will not be able to reuse the cache from one run to another.

rng = np.random.RandomState(42)
data = rng.randn(int(1e5), 10)
start = time.time()
data_trans = costly_compute(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))

Out:

The function took 5.01 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]

Caching the result of a function to avoid recomputing

If we need to call our function several times with the same input data, it is beneficial to avoid recomputing the same results over and over, since this is expensive. joblib.Memory makes it possible to cache the results of a function into a specific location.

from joblib import Memory
location = './cachedir'
memory = Memory(location, verbose=0)


def costly_compute_cached(data, column_index=0):
    """Simulate an expensive computation"""
    time.sleep(5)
    return data[column_index]


costly_compute_cached = memory.cache(costly_compute_cached)
start = time.time()
data_trans = costly_compute_cached(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))

Out:

The function took 5.06 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]

At the first call, the results are cached. Therefore, the computation time corresponds to the time to compute the results plus the time to dump them to disk.

start = time.time()
data_trans = costly_compute_cached(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))

Out:

The function took 0.02 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]

At the second call, the computation time is largely reduced, since the results are obtained by loading the data previously dumped to disk instead of recomputing them.
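Note that memory.cache can also be applied as a decorator at function definition time, which is equivalent to the re-assignment used above; a minimal sketch (the function name is illustrative):

@memory.cache
def costly_compute_decorated(data, column_index=0):
    """Simulate an expensive computation, cached transparently."""
    time.sleep(5)
    return data[column_index]


data_trans = costly_compute_decorated(data)  # cached after the first call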

Using joblib.Memory with a method

joblib.Memory is designed to work with functions that have no side effects. When dealing with a class, the computationally expensive part of a method has to be moved to a plain function, which is then wrapped with memory.cache inside the class method.

def _costly_compute_cached(data, column):
    time.sleep(5)
    return data[column]


class Algorithm(object):
    """A class which is using the previous function."""

    def __init__(self, column=0):
        self.column = column

    def transform(self, data):
        costly_compute = memory.cache(_costly_compute_cached)
        return costly_compute(data, self.column)


transformer = Algorithm()
start = time.time()
data_trans = transformer.transform(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))

Out:

The function took 5.06 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]

start = time.time()
data_trans = transformer.transform(data)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print('\nThe transformed data are:\n {}'.format(data_trans))

Out:

The function took 0.02 s to compute.

The transformed data are:
 [ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]

As expected, the second call to the transform method loads the results which have been cached.
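A hedged variant (the class name below is illustrative, not part of the original example): the Memory instance can also be injected through the constructor, so the caching location is configurable per object while the expensive work still lives in a plain cached function:

class ConfigurableAlgorithm(object):
    """Same pattern, with the Memory instance passed in explicitly."""

    def __init__(self, memory, column=0):
        self.column = column
        self._cached_compute = memory.cache(_costly_compute_cached)

    def transform(self, data):
        return self._cached_compute(data, self.column)


transformer = ConfigurableAlgorithm(memory)
data_trans = transformer.transform(data)  # a second call would hit the cache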

Clean up cache directory

memory.clear(warn=False)

Total running time of the script: ( 0 minutes 15.235 seconds)

NumPy memmap in joblib.Parallel

This example illustrates some features enabled by using a memory map (numpy.memmap) within joblib.Parallel. First, we show that dumping a huge data array ahead of passing it to joblib.Parallel speeds up computation. Then, we show the possibility of providing write access to the original data.

Speed up processing of a large data array

We create a large data array for which the average is computed for several slices.

import numpy as np

data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]

The slow_mean function introduces a time.sleep() call to simulate a more expensive computation for which parallel computing is beneficial. Parallel processing may not be beneficial for very fast operations, due to extra overhead (worker creation, communication, etc.).

import time


def slow_mean(data, sl):
    """Simulate a time consuming processing."""
    time.sleep(0.01)
    return data[sl].mean()

First, we evaluate sequential computing on our problem.

tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))

Out:

Elapsed time computing the average of couple of slices 1.00 s

joblib.Parallel is used to compute the average of all slices in parallel, using 2 workers.

from joblib import Parallel, delayed


tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))

Out:

Elapsed time computing the average of couple of slices 1.17 s

Parallel processing is already faster than the sequential processing. It is also possible to remove a bit of overhead by dumping the data array to a memmap and passing the memmap to joblib.Parallel.

import os
from joblib import dump, load

folder = './joblib_memmap'
try:
    os.mkdir(folder)
except FileExistsError:
    pass

data_filename_memmap = os.path.join(folder, 'data_memmap')
dump(data, data_filename_memmap)
data = load(data_filename_memmap, mmap_mode='r')

tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

Out:

Elapsed time computing the average of couple of slices 0.60 s

Therefore, dumping a large data array ahead of calling joblib.Parallel can speed up the processing by removing some overhead.
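A related note (not demonstrated above): joblib.Parallel can also perform this memmapping automatically for large array arguments; the max_nbytes parameter sets the size threshold above which an in-memory array is dumped to a temporary memmap before being sent to the workers. A minimal sketch, assuming the default behavior of recent joblib versions:

# In-memory arrays larger than max_nbytes are dumped to a temporary folder and
# passed to the workers as memmaps; exact defaults may vary across versions.
results_auto = Parallel(n_jobs=2, max_nbytes='1M')(
    delayed(slow_mean)(data, sl) for sl in slices)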

Writable memmap for shared memory joblib.Parallel

slow_mean_write_output will compute the mean for some given slices, as in the previous example. However, the resulting mean will be written directly to the output array.

def slow_mean_write_output(data, sl, output, idx):
    """Simulate a time consuming processing."""
    time.sleep(0.005)
    res_ = data[sl].mean()
    print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
    output[idx] = res_

Prepare the folder where the memmap will be dumped.

output_filename_memmap = os.path.join(folder, 'output_memmap')

Pre-allocate a writable shared memory map as a container for the results of the parallel computation.

output = np.memmap(output_filename_memmap, dtype=data.dtype,
                   shape=len(slices), mode='w+')

data is replaced by its memory-mapped version. Note that the buffer has already been dumped in the previous section.

data = load(data_filename_memmap, mmap_mode='r')

Fork the worker processes to perform computation concurrently

Parallel(n_jobs=2)(delayed(slow_mean_write_output)(data, sl, output, idx)
                   for idx, sl in enumerate(slices))

Compare the results from the output buffer with the expected results

print("\nExpected means computed in the parent process:\n {}"
      .format(np.array(results)))
print("\nActual means computed by the worker processes:\n {}"
      .format(output))

Out:

Expected means computed in the parent process:
 [0.4999114  0.49967474 0.49962624 0.49956682 0.49963294 0.49971405
 0.49971037 0.49991271 0.50004813 0.50044171 0.5004137  0.50048677
 0.50082031 0.50082378 0.50061898 0.50053032 0.50061782 0.50034759
 0.50018593 0.49985634 0.49953116 0.49948318 0.49907222 0.49919996
 0.49937287 0.50004619 0.49994813 0.50037896 0.50062525 0.50056846
 0.50023441 0.50008402 0.49994225 0.49976204 0.49957996 0.49951291
 0.49900244 0.49931429 0.49937982 0.49926414 0.49917665 0.49959002
 0.49973743 0.49929117 0.49970904 0.49957976 0.499808   0.49944909
 0.50008102 0.49983457 0.50034129 0.50046594 0.50067951 0.5007614
 0.50118307 0.50104813 0.50094674 0.5012441  0.50045661 0.50047582
 0.50031723 0.50022811 0.49982842 0.50036999 0.49978217 0.49988192
 0.49989093 0.49945017 0.49927452 0.49961305 0.49946881 0.49953169
 0.50005845 0.50006605 0.49995062 0.49992838 0.4997782  0.49982086
 0.50005846 0.5000365  0.50022253 0.50002488 0.50003528 0.49996313
 0.5001207  0.50002812 0.50026106 0.50019858 0.50001065 0.49978757
 0.49984565 0.49989753 0.49985816 0.50013693 0.50026811]

Actual means computed by the worker processes:
 [0.4999114  0.49967474 0.49962624 0.49956682 0.49963294 0.49971405
 0.49971037 0.49991271 0.50004813 0.50044171 0.5004137  0.50048677
 0.50082031 0.50082378 0.50061898 0.50053032 0.50061782 0.50034759
 0.50018593 0.49985634 0.49953116 0.49948318 0.49907222 0.49919996
 0.49937287 0.50004619 0.49994813 0.50037896 0.50062525 0.50056846
 0.50023441 0.50008402 0.49994225 0.49976204 0.49957996 0.49951291
 0.49900244 0.49931429 0.49937982 0.49926414 0.49917665 0.49959002
 0.49973743 0.49929117 0.49970904 0.49957976 0.499808   0.49944909
 0.50008102 0.49983457 0.50034129 0.50046594 0.50067951 0.5007614
 0.50118307 0.50104813 0.50094674 0.5012441  0.50045661 0.50047582
 0.50031723 0.50022811 0.49982842 0.50036999 0.49978217 0.49988192
 0.49989093 0.49945017 0.49927452 0.49961305 0.49946881 0.49953169
 0.50005845 0.50006605 0.49995062 0.49992838 0.4997782  0.49982086
 0.50005846 0.5000365  0.50022253 0.50002488 0.50003528 0.49996313
 0.5001207  0.50002812 0.50026106 0.50019858 0.50001065 0.49978757
 0.49984565 0.49989753 0.49985816 0.50013693 0.50026811]

Clean-up the memmap

Remove the different memmaps that we created. This might fail on Windows due to file permissions.

import shutil

try:
    shutil.rmtree(folder)
except:  # noqa
    print('Could not clean-up automatically.')

Total running time of the script: ( 0 minutes 3.785 seconds)

Parallel examples

Examples demoing more advanced parallel patterns.

Using dask distributed for single-machine parallel computing

This example shows the simplest usage of the dask distributed backend, on the local computer.

This is useful for prototyping a solution, to later be run on a truly distributed cluster, as the only change to be made is the address of the scheduler.

Another realistic usage scenario: combining dask code with joblib code, for instance using dask for preprocessing data and scikit-learn for machine learning. In such a setting, it may be interesting to use dask.distributed as a backend scheduler for both dask and joblib, to orchestrate the computation well.

Set up the distributed client

from dask.distributed import Client

# If you have a remote cluster running Dask
# client = Client('tcp://scheduler-address:8786')

# If you want Dask to set itself up on your personal computer
client = Client(processes=False)

Run parallel computation using dask.distributed

import time
import joblib


def long_running_function(i):
    time.sleep(.1)
    return i

The verbose messages below show that the backend is indeed the dask.distributed one.

with joblib.parallel_backend('dask'):
    joblib.Parallel(verbose=100)(
        joblib.delayed(long_running_function)(i)
        for i in range(10))

Out:

[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   3 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done   4 out of  10 | elapsed:    0.1s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   5 out of  10 | elapsed:    0.2s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    0.2s remaining:    0.2s
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    0.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done   8 out of  10 | elapsed:    0.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    0.3s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    0.4s finished

Progress of the computation can be followed on the distributed web interface; see http://dask.pydata.org/en/latest/diagnostics-distributed.html
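As mentioned above, a common scenario combines this setup with scikit-learn: wrapping the estimator's fit call in the same context manager routes scikit-learn's internal joblib calls to dask.distributed as well. A hedged sketch (scikit-learn is assumed to be installed; the data is synthetic and purely illustrative):

import numpy as np
from sklearn.ensemble import RandomForestClassifier

X = np.random.randn(1000, 20)
y = (X[:, 0] > 0).astype(int)

clf = RandomForestClassifier(n_estimators=50, n_jobs=-1)
with joblib.parallel_backend('dask'):
    # scikit-learn parallelizes over the trees through joblib, so the fit is
    # scheduled by the dask.distributed client created above.
    clf.fit(X, y)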

Total running time of the script: ( 0 minutes 0.918 seconds)
