{
 "cells": [
  { "cell_type": "markdown", "metadata": {}, "source": [ "# Measure copy times when using multi-threading" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook profiles a very simple copy operation on large (~1 GB) arrays of synthetic data (a random walk), comparing NumPy, bcolz and zarr containers, with and without dask-driven multi-threading." ] },
  { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import numpy as np\n", "import bcolz\n", "from bcolz.utils import human_readable_size" ] },
  { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import multiprocessing\n", "import dask\n", "import dask.array as da\n", "from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler\n", "from dask.diagnostics.profile_visualize import visualize\n", "import bokeh\n", "from bokeh.io import output_notebook\n", "output_notebook()" ] },
  { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def profile_dask_copy(src, dst, chunks, num_workers=multiprocessing.cpu_count(), dt=0.1, lock=True):\n", "    # Wrap the source in a dask array of `chunks`-sized blocks, store it into\n", "    # `dst` with `num_workers` threads, and plot CPU/memory usage over time.\n", "    dsrc = da.from_array(src, chunks=chunks)\n", "    with Profiler() as prof, ResourceProfiler(dt=dt) as rprof:\n", "        da.store(dsrc, dst, num_workers=num_workers, lock=lock)\n", "    visualize([prof, rprof])" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "## NumPy arrays" ] },
  { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "'1.12 GB'" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "a = np.cumsum(np.random.random_sample(150*1000*1000)-0.5)  # random walk\n", "human_readable_size(a.nbytes)" ] },
  { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 176 ms, sys: 280 ms, total: 456 ms\n", "Wall time: 458 ms\n" ] } ], "source": [ "%time a2 = a.copy()" ] },
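  { "cell_type": "markdown", "metadata": {}, "source": [ "To compare runs across containers, a throughput figure is handy. The helper below is an addition to the original notebook; the 0.458 s figure is the wall time reported by `%time` above and will vary between runs and machines." ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def copy_bandwidth(nbytes, wall_seconds):\n", "    # effective copy throughput in GB/s, counting the payload once\n", "    return nbytes / float(wall_seconds) / 2**30\n", "\n", "print('%.2f GB/s' % copy_bandwidth(a.nbytes, 0.458))  # wall time from the %time cell above" ] },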
  { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false, "scrolled": false }, "outputs": [], "source": [ "profile_dask_copy(a, a2, chunks=1000*1000, lock=False, dt=0.05)" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "## Bcolz carrays (in-memory)" ] },
  { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import bcolz\n", "bcolz.cparams.setdefaults(cname='lz4', clevel=5, shuffle=bcolz.BITSHUFFLE)  # use LZ4 and bitshuffle" ] },
  { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "carray((150000000,), float64)\n", "  nbytes: 1.12 GB; cbytes: 1.12 GB; ratio: 1.00\n", "  cparams := cparams(clevel=0, shuffle=2, cname='lz4', quantize=0)\n", "[ -3.28066043e-01  -5.13630983e-01  -8.95430781e-01 ...,  -1.57147748e+03\n", "  -1.57151374e+03  -1.57150456e+03]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Check with no compression first\n", "ca1 = bcolz.carray(a, cparams=bcolz.cparams(clevel=0))\n", "ca1" ] },
  { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 780 ms, sys: 388 ms, total: 1.17 s\n", "Wall time: 586 ms\n", "1.12 GB\n" ] } ], "source": [ "%time ca2 = ca1.copy(cparams=bcolz.cparams(clevel=0))\n", "print(human_readable_size(ca2.cbytes))" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "With compression disabled we see the true cost of storing data uncompressed inside bcolz: the copy takes significantly longer than the NumPy copy above, and, as shown next, longer than the compressed copy as well:" ] },
  { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "carray((150000000,), float64)\n", "  nbytes: 1.12 GB; cbytes: 246.81 MB; ratio: 4.64\n", "  cparams := cparams(clevel=5, shuffle=2, cname='lz4', quantize=3)\n", "[ -3.28125000e-01  -5.13671875e-01  -8.95507812e-01 ...,  -1.57147748e+03\n", "  -1.57151374e+03  -1.57150456e+03]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Now, for the compressed case\n", "ca = bcolz.carray(a, cparams=bcolz.cparams(quantize=3))  # keep 3 significant digits\n", "ca" ] },
  { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.44 s, sys: 232 ms, total: 2.67 s\n", "Wall time: 509 ms\n", "246.81 MB\n" ] } ], "source": [ "%time ca2 = ca.copy(cparams=bcolz.cparams(clevel=5))\n", "print(human_readable_size(ca2.cbytes))" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "Copy times with compression enabled are better than without it (not that unusual for compressible datasets) and pretty close to `numpy.copy()` (between 1x and 4x slower, depending on the run and the machine)." ] },
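  { "cell_type": "markdown", "metadata": {}, "source": [ "`copy()` already parallelizes internally via Blosc threads. As a quick check of their contribution, this added sketch pins Blosc to a single thread and repeats the compressed copy (it assumes the `bcolz.set_nthreads()` API, which returns the previous setting):" ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "prev = bcolz.set_nthreads(1)  # pin Blosc to a single thread\n", "%time ca_st = ca.copy(cparams=bcolz.cparams(clevel=5))\n", "bcolz.set_nthreads(prev)  # restore the previous setting" ] },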
  { "cell_type": "markdown", "metadata": {}, "source": [ "## bcolz + dask + compression" ] },
  { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false, "scrolled": false }, "outputs": [], "source": [ "profile_dask_copy(ca, ca2, chunks=ca.chunklen, lock=True, dt=0.05)" ] },
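  { "cell_type": "markdown", "metadata": {}, "source": [ "As an added baseline (not in the original run), pinning dask to a single worker helps separate threading gains from chunking overhead; compare its profile with the multi-worker runs around it:" ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "profile_dask_copy(ca, ca2, chunks=ca.chunklen, num_workers=1, lock=True, dt=0.05)" ] },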
  { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [], "source": [ "profile_dask_copy(ca, ca2, chunks=ca.chunklen, lock=False, dt=0.05)" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, the bcolz + dask copy time is pretty close to that of `bcolz.copy()`, which uses Blosc's internal threads." ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "## Zarr arrays (in-memory)" ] },
  { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('zarr', '1.0.0b6.dev18')\n" ] } ], "source": [ "import zarr\n", "print('zarr', zarr.__version__)" ] },
  { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: none; compression_opts: None\n", "  nbytes: 1.1G; nbytes_stored: 1.1G; ratio: 1.0; initialized: 573/573\n", "  store: __builtin__.dict" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Check with no compression first\n", "za1 = zarr.array(ca, chunks=ca.chunklen, compression=None)\n", "za1" ] },
  { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: none; compression_opts: None\n", "  nbytes: 1.1G; nbytes_stored: 230; ratio: 5217391.3; initialized: 0/573\n", "  store: __builtin__.dict" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "za2 = zarr.empty_like(za1)\n", "za2" ] },
  { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 196 ms, sys: 244 ms, total: 440 ms\n", "Wall time: 437 ms\n" ] }, { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: none; compression_opts: None\n", "  nbytes: 1.1G; nbytes_stored: 1.1G; ratio: 1.0; initialized: 573/573\n", "  store: __builtin__.dict" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time za2[:] = za1\n", "za2" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "In the non-compressed case, zarr takes significantly less time than with compression (see below). This suggests that zarr, unlike bcolz, does not store uncompressed data in Blosc containers, thus avoiding the penalty of handling many small data blocks (which bcolz does pay, as seen above). A small probing sketch for this per-block overhead appears at the end of this section." ] },
  { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}\n", "  nbytes: 1.1G; nbytes_stored: 245.4M; ratio: 4.7; initialized: 573/573\n", "  store: __builtin__.dict" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Now, for the compressed case\n", "za = zarr.array(ca, chunks=ca.chunklen, compression='blosc',\n", "                compression_opts=dict(cname='lz4', clevel=5, shuffle=2))\n", "za" ] },
  { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}\n", "  nbytes: 1.1G; nbytes_stored: 302; ratio: 3973509.9; initialized: 0/573\n", "  store: __builtin__.dict" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "za2 = zarr.empty_like(za)\n", "za2" ] },
  { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.8 s, sys: 96 ms, total: 1.9 s\n", "Wall time: 555 ms\n" ] }, { "data": { "text/plain": [ "zarr.core.Array((150000000,), float64, chunks=(262144,), order=C)\n", "  compression: blosc; compression_opts: {u'cname': u'lz4', u'shuffle': 2, u'clevel': 5}\n", "  nbytes: 1.1G; nbytes_stored: 245.4M; ratio: 4.7; initialized: 573/573\n", "  store: __builtin__.dict" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time za2[:] = za\n", "za2" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "For the compressed case, zarr takes a similar time to bcolz when copying (more or less, depending on the run), as expected." ] },
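  { "cell_type": "markdown", "metadata": {}, "source": [ "As an added sanity check, the stored sizes of the two compressed containers should roughly agree, since both use Blosc/LZ4 with bitshuffle (note that `ca` was additionally quantized to 3 significant digits):" ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print('bcolz:', human_readable_size(ca.cbytes))\n", "print('zarr:', human_readable_size(za.nbytes_stored))" ] },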
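  { "cell_type": "markdown", "metadata": {}, "source": [ "And the probing sketch promised above: this added loop (it assumes `zarr.empty()` accepts the same `shape`/`chunks`/`dtype`/`compression` arguments used elsewhere in this notebook) fills uncompressed arrays through progressively smaller chunks; if per-block overhead matters, times should grow as the chunks shrink:" ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import time\n", "for chunks in (2**22, 2**18, 2**14):\n", "    z = zarr.empty(za1.shape, chunks=chunks, dtype=za1.dtype, compression=None)\n", "    t0 = time.time()\n", "    z[:] = a  # fill from the original NumPy array\n", "    print(chunks, round(time.time() - t0, 3))" ] },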
  { "cell_type": "markdown", "metadata": {}, "source": [ "## zarr + dask + compression" ] },
  { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false }, "outputs": [], "source": [ "profile_dask_copy(za, za2, chunks=za.chunks, lock=True, dt=0.05)" ] },
  { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": false }, "outputs": [], "source": [ "profile_dask_copy(za, za2, chunks=za.chunks, lock=False, dt=0.05)" ] },
  { "cell_type": "markdown", "metadata": {}, "source": [ "Here it seems that dask + zarr does improve on zarr with Blosc's internal threads alone, and the copy times are better than for dask + bcolz. The latter might be because bcolz is not (yet) optimized for `ca2[:] = ca` assignments; `ca.copy()` is usually faster." ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }
 ],
 "metadata": {
  "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" },
  "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}