Jupyter Snippet CB2nd 11_dask
5.11. Performing out-of-core computations on large arrays with Dask
import numpy as np
import dask.array as da
import memory_profiler
%load_ext memory_profiler
# A 10,000 x 10,000 array of standard normal values, split into 1,000 x 1,000 chunks.
Y = da.random.normal(size=(10000, 10000),
                     chunks=(1000, 1000))
Y
dask.array<da.random.normal, shape=(10000, 10000), dtype=float64, chunksize=(1000, 1000)>
Y.shape, Y.size, Y.chunks
((10000, 10000),
 100000000,
 ((1000, ..., 1000),
  (1000, ..., 1000)))
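As a small aside added here (not in the original recipe), the numblocks attribute gives the block grid directly:
Y.numblocks  # (10, 10): a 10 x 10 grid of 1,000 x 1,000 blocks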
mu = Y.mean(axis=0)
mu
dask.array<mean_agg-aggregate, shape=(10000,), dtype=float64, chunksize=(1000,)>
mu[0].compute()
0.011
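A short illustration added here (not part of the recipe): operations stay lazy until compute() is called, so several steps can be chained and only the chunks actually needed are generated.
z = (Y ** 2).mean(axis=0)  # still lazy: no random values have been generated yet
z[:3].compute()            # evaluates only the chunks required for these 3 values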
def f_numpy():
    # Allocate the full 10,000 x 10,000 array in memory (~800 MB), then reduce it.
    X = np.random.normal(size=(10000, 10000))
    x = X.mean(axis=0)[0:100]
%%memit
f_numpy()
peak memory: 916.32 MiB, increment: 763.00 MiB
%%time
f_numpy()
CPU times: user 3.86 s, sys: 664 ms, total: 4.52 s
Wall time: 4.52 s
def f_dask():
    # Same computation with Dask: chunks are generated and reduced on the fly,
    # so the full array is never held in memory at once.
    Y = da.random.normal(size=(10000, 10000),
                         chunks=(1000, 1000))
    y = Y.mean(axis=0)[0:100].compute()
%%memit
f_dask()
peak memory: 221.42 MiB, increment: 67.64 MiB
%%time
f_dask()
CPU times: user 492 ms, sys: 12 ms, total: 504 ms
Wall time: 105 ms
def f_dask2():
    # Column-oriented chunks: each chunk spans all rows, so the mean of the
    # first 100 columns only requires generating a single chunk.
    Y = da.random.normal(size=(10000, 10000),
                         chunks=(10000, 100))
    y = Y.mean(axis=0)[0:100].compute()
%%memit
f_dask2()
peak memory: 145.60 MiB, increment: 6.93 MiB
%%time
f_dask2()
CPU times: user 48 ms, sys: 8 ms, total: 56 ms
Wall time: 57.4 ms
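A related option, not shown in the recipe, is to rechunk an existing Dask array into column-oriented blocks with the rechunk() method; a minimal sketch:
Yc = Y.rechunk((10000, 100))  # 100 column blocks, each of shape (10000, 100)
Yc.chunks                     # ((10000,), (100, ..., 100))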
from dask.distributed import Client
# Client() with no argument starts a local cluster of worker processes.
client = Client()
client
Y.sum().compute()
4090.221
future = client.compute(Y.sum())
future
future.result()
4090.221
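As an extra illustration (not from the recipe), a Future also exposes its status, and client.gather() is an equivalent way to fetch the result:
future.status          # 'finished' once the task has completed
client.gather(future)  # same value as future.result()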
# Roughly 1.1 TiB of float64 values, far larger than the available RAM.
huge = da.random.uniform(
    size=(1500000, 100000), chunks=(10000, 10000))
"Size in memory: %.1f GB" % (huge.nbytes / 1024 ** 3)
'Size in memory: 1117.6 GB'
from dask.diagnostics import ProgressBar
# WARNING: this will take a very long time computing
# useless values. This is for pedagogical purposes
# only.
with ProgressBar():
    m = huge.mean().compute()
[## ] | 11% Completed | 1min 44.8s
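If you only want to exercise the pipeline without that cost, a cheap alternative (added here for illustration) is to reduce a single chunk-sized slice, which generates just one of the ~1,500 chunks:
m_small = huge[:10000, :10000].mean().compute()  # touches a single 10,000 x 10,000 chunk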