Jupyter Snippet CB2nd 11_dask

5.11. Performing out-of-core computations on large arrays with Dask

import numpy as np
import dask.array as da
import memory_profiler
%load_ext memory_profiler
Y = da.random.normal(size=(10000, 10000),
                     chunks=(1000, 1000))
Y
dask.array<da.random.normal, shape=(10000, 10000),
    dtype=float64, chunksize=(1000, 1000)>
Y.shape, Y.size, Y.chunks
((10000, 10000),
 100000000,
 ((1000, ..., 1000),
  (1000, ..., 1000)))
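# Aside (not in the original recipe): rough chunk arithmetic. Y is
# split into 10x10 blocks, and each (1000, 1000) float64 block only
# takes a few megabytes, so few blocks need to live in memory at once.
n_blocks = len(Y.chunks[0]) * len(Y.chunks[1])
n_blocks, "%.1f MiB per block" % (1000 * 1000 * 8 / 1024 ** 2)
(100, '7.6 MiB per block')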
mu = Y.mean(axis=0)
mu
dask.array<mean_agg-aggregate, shape=(10000,),
    dtype=float64, chunksize=(1000,)>
mu[0].compute()
0.011
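# Aside (not in the original recipe): compute() evaluates the lazy
# task graph and returns a plain NumPy array; slicing first keeps the
# evaluated part small.
type(mu[:5].compute())
numpy.ndarray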
def f_numpy():
    # NumPy baseline: the full 10000x10000 array is allocated in memory.
    X = np.random.normal(size=(10000, 10000))
    x = X.mean(axis=0)[0:100]
%%memit
f_numpy()
peak memory: 916.32 MiB, increment: 763.00 MiB
%%time
f_numpy()
CPU times: user 3.86 s, sys: 664 ms, total: 4.52 s
Wall time: 4.52 s
def f_dask():
    # Same computation, but the array is lazy and split into chunks.
    Y = da.random.normal(size=(10000, 10000),
                         chunks=(1000, 1000))
    y = Y.mean(axis=0)[0:100].compute()
%%memit
f_dask()
peak memory: 221.42 MiB, increment: 67.64 MiB
%%time
f_dask()
CPU times: user 492 ms, sys: 12 ms, total: 504 ms
Wall time: 105 ms
def f_dask2():
    # Chunks now span whole columns, matching the mean over axis=0.
    Y = da.random.normal(size=(10000, 10000),
                         chunks=(10000, 100))
    y = Y.mean(axis=0)[0:100].compute()
%%memit
f_dask2()
peak memory: 145.60 MiB, increment: 6.93 MiB
%%time
f_dask2()
CPU times: user 48 ms, sys: 8 ms, total: 56 ms
Wall time: 57.4 ms
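# Aside (not in the original recipe): an existing dask array can also
# be rechunked lazily, which helps when its chunk layout does not match
# the access pattern (here, column chunks suit a mean over axis=0).
Y2 = Y.rechunk((10000, 100))
Y2.chunksize
(10000, 100)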
from dask.distributed import Client
# Client() starts a local scheduler with worker processes.
client = Client()
client
Y.sum().compute()
4090.221
future = client.compute(Y.sum())
future

future.result()
4090.221
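# Aside (not in the original recipe): with the distributed client in
# place, persist() computes an array's chunks once and keeps them in
# worker memory, so later reductions reuse them instead of regenerating
# the random data.
Yp = Y.persist()
Yp.mean().compute()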
huge = da.random.uniform(
    size=(1500000, 100000), chunks=(10000, 10000))
"Size in memory: %.1f GB" % (huge.nbytes / 1024 ** 3)
'Size in memory: 1117.6 GB'
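# Aside (not in the original recipe): the 1.1 TB array is split into
# 1500 blocks of less than a gigabyte each; dask streams through them
# without ever materializing the whole array.
huge.numblocks, "%.2f GiB per block" % (10000 * 10000 * 8 / 1024 ** 3)
((150, 10), '0.75 GiB per block')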
from dask.diagnostics import ProgressBar
# WARNING: this will take a very long time computing
# useless values. This is for pedagogical purposes
# only.
with ProgressBar():
    m = huge.mean().compute()
[##                   ] | 11% Completed |  1min 44.8s