# Measure copy times when using multi-threading

This notebook profiles a very simple copy operation on large arrays (~1 GB), using synthetic data (random walk).

In [5]:
import numpy as np
import bcolz
from bcolz.utils import human_readable_size

In [6]:
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
import bokeh
from bokeh.io import output_notebook
output_notebook()

In [7]:
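def profile_dask_copy(src, dst, chunks, num_workers=multiprocessing.cpu_count(), dt=0.1, lock=True):
    """Copy `src` into `dst` through dask and plot the task and resource profiles."""
    dsrc = da.from_array(src, chunks=chunks)
    # Profiler records per-task timings; ResourceProfiler samples CPU and memory every `dt` seconds
    with Profiler() as prof, ResourceProfiler(dt=dt) as rprof:
        da.store(dsrc, dst, num_workers=num_workers, lock=lock)
    visualize([prof, rprof])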
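
As a rough mental model of what the helper above measures, the following standard-library sketch copies a buffer chunk by chunk from a thread pool, optionally serialising the writes with a lock. It is not how dask implements `da.store`; the name `chunked_copy` and the use of `bytes`/`bytearray` buffers are assumptions made only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def chunked_copy(src, dst, chunklen, num_workers=4, lock=True):
    """Copy `src` into `dst` one chunk at a time using a pool of threads.

    Hypothetical stand-in for a chunked, multi-threaded store; with
    lock=True only one thread writes to `dst` at any moment.
    """
    if len(src) != len(dst):
        raise ValueError("src and dst must have the same length")
    write_lock = threading.Lock() if lock else None
    src_view, dst_view = memoryview(src), memoryview(dst)

    def copy_chunk(start):
        stop = min(start + chunklen, len(src_view))
        if write_lock is not None:
            with write_lock:  # serialise writes, as lock=True is meant to
                dst_view[start:stop] = src_view[start:stop]
        else:
            dst_view[start:stop] = src_view[start:stop]

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # list() forces evaluation so that any worker exception propagates
        list(pool.map(copy_chunk, range(0, len(src_view), chunklen)))
    return dst


src = bytes(range(256)) * 4096  # 1 MiB of sample data
dst_unlocked = bytearray(len(src))
dst_locked = bytearray(len(src))
chunked_copy(src, dst_unlocked, chunklen=64 * 1024, lock=False)
chunked_copy(src, dst_locked, chunklen=1000, lock=True)
```

Because every chunk writes to a disjoint slice of the destination, the unlocked variant is safe here; whether that also holds for a given array container is exactly what the `lock` argument lets the notebook explore.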

## NumPy arrays

In [8]:
a = np.cumsum(np.random.random_sample(150*1000*1000)-0.5)  # random walk
human_readable_size(a.nbytes)

'1.12 GB'

In [9]:
%time a2 = a.copy()

CPU times: user 176 ms, sys: 280 ms, total: 456 ms
Wall time: 458 ms


In [10]:
profile_dask_copy(a, a2, chunks=1000*1000, lock=False, dt=0.05)

## Bcolz carrays (in-memory)

In [11]:
import bcolz
bcolz.cparams.setdefaults(cname='lz4', clevel=5, shuffle=bcolz.BITSHUFFLE)  # use LZ4 and bitshuffle

In [12]:
# Check with no compression first
ca1 = bcolz.carray(a, cparams=bcolz.cparams(clevel=0))
ca1

carray((150000000,), float64)
  nbytes: 1.12 GB; cbytes: 1.12 GB; ratio: 1.00
  cparams := cparams(clevel=0, shuffle=2, cname='lz4', quantize=0)
[ -3.28066043e-01  -5.13630983e-01  -8.95430781e-01 ...,  -1.57147748e+03
  -1.57151374e+03  -1.57150456e+03]

In [13]:
%time ca2 = ca1.copy(cparams=bcolz.cparams(clevel=0))
print(human_readable_size(ca2.cbytes))

CPU times: user 780 ms, sys: 388 ms, total: 1.17 s
Wall time: 586 ms
1.12 GB


With compression disabled, we see the true cost of storing data uncompressed inside bcolz.  In this run the copy takes 586 ms, about 28% longer than NumPy (458 ms), and it is also slower than the compressed copy:

In [14]:
# Now, for the compressed case
ca = bcolz.carray(a, cparams=bcolz.cparams(quantize=3))   # keep 3 significant digits
ca

carray((150000000,), float64)
  nbytes: 1.12 GB; cbytes: 246.81 MB; ratio: 4.64
  cparams := cparams(clevel=5, shuffle=2, cname='lz4', quantize=3)
[ -3.28125000e-01  -5.13671875e-01  -8.95507812e-01 ...,  -1.57147748e+03
  -1.57151374e+03  -1.57150456e+03]
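
The first values in the output above differ slightly from the uncompressed ones because `quantize=3` is lossy. A minimal sketch of the rounding, assuming bcolz quantizes as `around(scale * data) / scale` with `scale = 2**bits`, and assuming `bits` is the smallest integer with `2**bits >= 10**digits` (the helper name `quantize` is hypothetical, and plain Python floats are used instead of NumPy arrays):

```python
import math


def quantize(value, significant_digits):
    """Round `value` onto a power-of-two grid fine enough for the requested digits."""
    if significant_digits <= 0:
        return value  # 0 means quantization is disabled
    bits = math.ceil(math.log2(10 ** significant_digits))  # 3 digits -> 10 bits
    scale = 2 ** bits  # 1024 for 3 digits
    return round(value * scale) / scale


# First three values of the uncompressed carray shown earlier
original = [-3.28066043e-01, -5.13630983e-01, -8.95430781e-01]
quantized = [quantize(v, 3) for v in original]
print(quantized)  # [-0.328125, -0.513671875, -0.8955078125]
```

Under these assumptions the results agree with the leading values printed for `ca`. Rounding onto a power-of-two grid zeroes the low mantissa bits, which is what gives the shuffle filter and LZ4 more to work with.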

In [15]:
%time ca2 = ca.copy(cparams=bcolz.cparams(clevel=5))
print(human_readable_size(ca2.cbytes))

CPU times: user 2.44 s, sys: 232 ms, total: 2.67 s
Wall time: 509 ms
246.81 MB


Copying with compression is faster than copying without it (not unusual for compressible datasets) and comes close to `numpy.copy()`: between 1x and 4x slower, depending on the run and the machine.
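
To put the three wall times above on a common scale, this small sketch converts them to throughput and to a slowdown factor relative to the NumPy copy. The timings are copied by hand from the `%time` outputs of this particular run, so the figures are only illustrative.

```python
NBYTES = 150 * 1000 * 1000 * 8  # 150 million float64 values, as in `a`

# Wall times in seconds, transcribed from the %time outputs above
wall_times = {
    "numpy a.copy()": 0.458,
    "bcolz copy, clevel=0": 0.586,
    "bcolz copy, clevel=5": 0.509,
}

baseline = wall_times["numpy a.copy()"]
report = {}
for name, seconds in wall_times.items():
    throughput_gb_s = NBYTES / seconds / 1e9  # decimal GB per second
    slowdown = seconds / baseline             # 1.0 means "as fast as NumPy"
    report[name] = (throughput_gb_s, slowdown)
    print("{:22s} {:5.2f} GB/s  {:4.2f}x numpy".format(name, throughput_gb_s, slowdown))
```

For this run both bcolz copies land near the lower end of the 1x to 4x range quoted above.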

## bcolz + dask + compression

In [16]:
profile_dask_copy(ca, ca2, chunks=ca.chunklen, lock=True, dt=0.05)

In [17]:
profile_dask_copy(ca, ca2, chunks=ca.chunklen, lock=False, dt=0.05)

In this case, the bcolz + dask copy time is close to that of `ca.copy()`, which uses Blosc's internal threads.

## Zarr arrays (in-memory)

In [18]:
import zarr
print('zarr', zarr.__version__)

('zarr', '1.0.0b6.dev18')


In [19]:
# Check with no compression first
za1 = zarr.array(ca, chunks=ca.chunklen, compression=None)
za1

zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: none; compression_opts: None
  nbytes: 1.1G; nbytes_stored: 1.1G; ratio: 1.0; initialized: 573/573
  store: __builtin__.dict
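
The `initialized: 573/573` figure above follows from the array length and the chunk length inherited from `ca.chunklen`. A quick arithmetic check, using only the numbers printed in the output:

```python
n_items = 150 * 1000 * 1000   # array length from the repr above
chunklen = 262144             # chunk length from the repr above
itemsize = 8                  # bytes per float64

n_chunks = (n_items + chunklen - 1) // chunklen          # ceiling division
chunk_bytes = chunklen * itemsize                        # size of one full chunk
last_chunk_items = n_items - (n_chunks - 1) * chunklen   # items in the final, partial chunk

print(n_chunks, chunk_bytes, last_chunk_items)  # 573 2097152 53632
```

Each full chunk is therefore 2 MiB of raw data, and the last chunk is only partially filled.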

In [20]:
za2 = zarr.empty_like(za1)
za2

zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: none; compression_opts: None
  nbytes: 1.1G; nbytes_stored: 230; ratio: 5217391.3; initialized: 0/573
  store: __builtin__.dict

In [21]:
%time za2[:] = za1
za2

CPU times: user 196 ms, sys: 244 ms, total: 440 ms
Wall time: 437 ms


zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: none; compression_opts: None
  nbytes: 1.1G; nbytes_stored: 1.1G; ratio: 1.0; initialized: 573/573
  store: __builtin__.dict

In the uncompressed case, zarr takes noticeably less time than with compression (see below).  This probably means that zarr, unlike bcolz, does not store uncompressed data in Blosc containers and so avoids the penalty of handling small data blocks (which bcolz incurs, see above).

In [22]:
# Now, for the compressed case
za = zarr.array(ca, chunks=ca.chunklen, compression='blosc', 
                compression_opts=dict(cname='lz4', clevel=5, shuffle=2))
za

zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}
  nbytes: 1.1G; nbytes_stored: 245.4M; ratio: 4.7; initialized: 573/573
  store: __builtin__.dict

In [23]:
za2 = zarr.empty_like(za)
za2

zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}
  nbytes: 1.1G; nbytes_stored: 302; ratio: 3973509.9; initialized: 0/573
  store: __builtin__.dict

In [24]:
%time za2[:] = za
za2

CPU times: user 1.8 s, sys: 96 ms, total: 1.9 s
Wall time: 555 ms


zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)
  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}
  nbytes: 1.1G; nbytes_stored: 245.4M; ratio: 4.7; initialized: 573/573
  store: __builtin__.dict

For the compressed case, zarr takes about the same time as bcolz to copy (a little more or less, depending on the run), which is as expected.
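
One rough way to see how much the internal Blosc threads contribute in the `%time` cells above is the ratio of total CPU time to wall time, which approximates the average number of busy cores. The pairs below are transcribed by hand from this run's outputs; the interpretation as "busy cores" is a heuristic, not a precise measurement.

```python
# (total CPU seconds, wall seconds) from the %time outputs above
timings = {
    "numpy copy": (0.456, 0.458),
    "bcolz, clevel=0": (1.17, 0.586),
    "bcolz, clevel=5": (2.67, 0.509),
    "zarr, no compression": (0.440, 0.437),
    "zarr, blosc lz4": (1.9, 0.555),
}

# CPU time divided by wall time: about 1 for single-threaded work, higher when threads overlap
busy_cores = {name: cpu / wall for name, (cpu, wall) in timings.items()}

for name, ratio in busy_cores.items():
    print("{:22s} ~{:.1f} cores busy".format(name, ratio))
```

The uncompressed NumPy and zarr copies stay near one core, while the compressed copies keep several cores busy, consistent with compression being done by Blosc's thread pool.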

## zarr + dask + compression

In [25]:
profile_dask_copy(za, za2, chunks=za.chunks, lock=True, dt=0.05)

In [26]:
profile_dask_copy(za, za2, chunks=za.chunks, lock=False, dt=0.05)

Here dask + zarr does seem to improve on zarr with Blosc's internal threads, and the copy times are better than for dask + bcolz.  The latter may be because bcolz is not (yet) optimized for `ca2[:] = ca` operations; `ca.copy()` is usually faster.