123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028 |
- /*********************************************************************
- Blosc - Blocked Shuffling and Compression Library
- Author: Francesc Alted <francesc@blosc.org>
- Creation date: 2009-05-20
- See LICENSES/BLOSC.txt for details about copyright and rights to use.
- **********************************************************************/
- #include <stdio.h>
- #include <stdlib.h>
- #include <errno.h>
- #include <string.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <assert.h>
- #if defined(USING_CMAKE)
- #include "config.h"
- #endif /* USING_CMAKE */
- #include "blosc.h"
- #include "shuffle.h"
- #include "blosclz.h"
- #if defined(HAVE_LZ4)
- #include "lz4.h"
- #include "lz4hc.h"
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- #include "snappy-c.h"
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- #include "zlib.h"
- #endif /* HAVE_ZLIB */
- #if defined(_WIN32) && !defined(__MINGW32__)
- #include <windows.h>
- #include <malloc.h>
- /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
- #if defined(_MSC_VER) && _MSC_VER < 1600
- #include "win32/stdint-windows.h"
- #else
- #include <stdint.h>
- #endif
- #include <process.h>
- #define getpid _getpid
- #else
- #include <stdint.h>
- #include <unistd.h>
- #include <inttypes.h>
- #endif /* _WIN32 */
- #if defined(_WIN32) && !defined(__GNUC__)
- #include "win32/pthread.h"
- #include "win32/pthread.c"
- #else
- #include <pthread.h>
- #endif
- /* If C11 is supported, use it's built-in aligned allocation. */
- #if __STDC_VERSION__ >= 201112L
- #include <stdalign.h>
- #endif
- /* Some useful units */
- #define KB 1024
- #define MB (1024*KB)
- /* Minimum buffer size to be compressed */
- #define MIN_BUFFERSIZE 128 /* Cannot be smaller than 66 */
- /* The maximum number of splits in a block for compression */
- #define MAX_SPLITS 16 /* Cannot be larger than 128 */
- /* The size of L1 cache. 32 KB is quite common nowadays. */
- #define L1 (32*KB)
- /* Have problems using posix barriers when symbol value is 200112L */
- /* This requires more investigation, but will work for the moment */
- #if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
- #define _POSIX_BARRIERS_MINE
- #endif
- /* Synchronization variables */
- struct blosc_context {
- int32_t compress; /* 1 if we are doing compression 0 if decompress */
- const uint8_t* src;
- uint8_t* dest; /* The current pos in the destination buffer */
- uint8_t* header_flags; /* Flags for header. Currently booked:
- - 0: byte-shuffled?
- - 1: memcpy'ed?
- - 2: bit-shuffled? */
- int32_t sourcesize; /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
- int32_t nblocks; /* Number of total blocks in buffer */
- int32_t leftover; /* Extra bytes at end of buffer */
- int32_t blocksize; /* Length of the block in bytes */
- int32_t typesize; /* Type size */
- int32_t num_output_bytes; /* Counter for the number of output bytes */
- int32_t destsize; /* Maximum size for destination buffer */
- uint8_t* bstarts; /* Start of the buffer past header info */
- int32_t compcode; /* Compressor code to use */
- int clevel; /* Compression level (1-9) */
- /* Threading */
- int32_t numthreads;
- int32_t threads_started;
- int32_t end_threads;
- pthread_t threads[BLOSC_MAX_THREADS];
- int32_t tids[BLOSC_MAX_THREADS];
- pthread_mutex_t count_mutex;
- #ifdef _POSIX_BARRIERS_MINE
- pthread_barrier_t barr_init;
- pthread_barrier_t barr_finish;
- #else
- int32_t count_threads;
- pthread_mutex_t count_threads_mutex;
- pthread_cond_t count_threads_cv;
- #endif
- #if !defined(_WIN32)
- pthread_attr_t ct_attr; /* creation time attrs for threads */
- #endif
- int32_t thread_giveup_code; /* error code when give up */
- int32_t thread_nblock; /* block counter */
- };
- struct thread_context {
- struct blosc_context* parent_context;
- int32_t tid;
- uint8_t* tmp;
- uint8_t* tmp2;
- uint8_t* tmp3;
- int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
- };
- /* Global context for non-contextual API */
- static struct blosc_context* g_global_context;
- static pthread_mutex_t global_comp_mutex;
- static int32_t g_compressor = BLOSC_BLOSCLZ; /* the compressor to use by default */
- static int32_t g_threads = 1;
- static int32_t g_force_blocksize = 0;
- static int32_t g_initlib = 0;
- /* Wrapped function to adjust the number of threads used by blosc */
- int blosc_set_nthreads_(struct blosc_context*);
- /* Releases the global threadpool */
- int blosc_release_threadpool(struct blosc_context* context);
- /* Macros for synchronization */
- /* Wait until all threads are initialized */
- #ifdef _POSIX_BARRIERS_MINE
- #define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
- rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
- if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
- printf("Could not wait on barrier (init): %d\n", rc); \
- return((RET_VAL)); \
- }
- #else
- #define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
- pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
- if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
- CONTEXT_PTR->count_threads++; \
- pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
- } \
- else { \
- pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
- } \
- pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
- #endif
- /* Wait for all threads to finish */
- #ifdef _POSIX_BARRIERS_MINE
- #define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
- rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
- if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
- printf("Could not wait on barrier (finish)\n"); \
- return((RET_VAL)); \
- }
- #else
- #define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
- pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
- if (CONTEXT_PTR->count_threads > 0) { \
- CONTEXT_PTR->count_threads--; \
- pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
- } \
- else { \
- pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
- } \
- pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
- #endif
- /* A function for aligned malloc that is portable */
- static uint8_t *my_malloc(size_t size)
- {
- void *block = NULL;
- int res = 0;
- /* Do an alignment to 32 bytes because AVX2 is supported */
- #if _ISOC11_SOURCE
- /* C11 aligned allocation. 'size' must be a multiple of the alignment. */
- block = aligned_alloc(32, size);
- #elif defined(_WIN32)
- /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
- block = (void *)_aligned_malloc(size, 32);
- #elif defined __APPLE__
- /* Mac OS X guarantees 16-byte alignment in small allocs */
- block = malloc(size);
- #elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
- /* Platform does have an implementation of posix_memalign */
- res = posix_memalign(&block, 32, size);
- #else
- block = malloc(size);
- #endif /* _WIN32 */
- if (block == NULL || res != 0) {
- printf("Error allocating memory!");
- return NULL;
- }
- return (uint8_t *)block;
- }
- /* Release memory booked by my_malloc */
- static void my_free(void *block)
- {
- #if defined(_WIN32)
- _aligned_free(block);
- #else
- free(block);
- #endif /* _WIN32 */
- }
- /* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
- static int32_t sw32_(const uint8_t *pa)
- {
- int32_t idest;
- uint8_t *dest = (uint8_t *)&idest;
- int i = 1; /* for big/little endian detection */
- char *p = (char *)&i;
- if (p[0] != 1) {
- /* big endian */
- dest[0] = pa[3];
- dest[1] = pa[2];
- dest[2] = pa[1];
- dest[3] = pa[0];
- }
- else {
- /* little endian */
- dest[0] = pa[0];
- dest[1] = pa[1];
- dest[2] = pa[2];
- dest[3] = pa[3];
- }
- return idest;
- }
- /* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
- static void _sw32(uint8_t* dest, int32_t a)
- {
- uint8_t *pa = (uint8_t *)&a;
- int i = 1; /* for big/little endian detection */
- char *p = (char *)&i;
- if (p[0] != 1) {
- /* big endian */
- dest[0] = pa[3];
- dest[1] = pa[2];
- dest[2] = pa[1];
- dest[3] = pa[0];
- }
- else {
- /* little endian */
- dest[0] = pa[0];
- dest[1] = pa[1];
- dest[2] = pa[2];
- dest[3] = pa[3];
- }
- }
- /*
- * Conversion routines between compressor and compression libraries
- */
- /* Return the library code associated with the compressor name */
- static int compname_to_clibcode(const char *compname)
- {
- if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
- return BLOSC_BLOSCLZ_LIB;
- if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
- return BLOSC_LZ4_LIB;
- if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
- return BLOSC_LZ4_LIB;
- if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
- return BLOSC_SNAPPY_LIB;
- if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
- return BLOSC_ZLIB_LIB;
- return -1;
- }
- /* Return the library name associated with the compressor code */
- static char *clibcode_to_clibname(int clibcode)
- {
- if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
- if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
- if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
- if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
- return NULL; /* should never happen */
- }
- /*
- * Conversion routines between compressor names and compressor codes
- */
- /* Get the compressor name associated with the compressor code */
- int blosc_compcode_to_compname(int compcode, char **compname)
- {
- int code = -1; /* -1 means non-existent compressor code */
- char *name = NULL;
- /* Map the compressor code */
- if (compcode == BLOSC_BLOSCLZ)
- name = BLOSC_BLOSCLZ_COMPNAME;
- else if (compcode == BLOSC_LZ4)
- name = BLOSC_LZ4_COMPNAME;
- else if (compcode == BLOSC_LZ4HC)
- name = BLOSC_LZ4HC_COMPNAME;
- else if (compcode == BLOSC_SNAPPY)
- name = BLOSC_SNAPPY_COMPNAME;
- else if (compcode == BLOSC_ZLIB)
- name = BLOSC_ZLIB_COMPNAME;
- *compname = name;
- /* Guess if there is support for this code */
- if (compcode == BLOSC_BLOSCLZ)
- code = BLOSC_BLOSCLZ;
- #if defined(HAVE_LZ4)
- else if (compcode == BLOSC_LZ4)
- code = BLOSC_LZ4;
- else if (compcode == BLOSC_LZ4HC)
- code = BLOSC_LZ4HC;
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- else if (compcode == BLOSC_SNAPPY)
- code = BLOSC_SNAPPY;
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- else if (compcode == BLOSC_ZLIB)
- code = BLOSC_ZLIB;
- #endif /* HAVE_ZLIB */
- return code;
- }
- /* Get the compressor code for the compressor name. -1 if it is not available */
- int blosc_compname_to_compcode(const char *compname)
- {
- int code = -1; /* -1 means non-existent compressor code */
- if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
- code = BLOSC_BLOSCLZ;
- }
- #if defined(HAVE_LZ4)
- else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
- code = BLOSC_LZ4;
- }
- else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
- code = BLOSC_LZ4HC;
- }
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
- code = BLOSC_SNAPPY;
- }
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
- code = BLOSC_ZLIB;
- }
- #endif /* HAVE_ZLIB */
- return code;
- }
- #if defined(HAVE_LZ4)
- static int lz4_wrap_compress(const char* input, size_t input_length,
- char* output, size_t maxout, int accel)
- {
- int cbytes;
- cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
- accel);
- return cbytes;
- }
- static int lz4hc_wrap_compress(const char* input, size_t input_length,
- char* output, size_t maxout, int clevel)
- {
- int cbytes;
- if (input_length > (size_t)(2<<30))
- return -1; /* input larger than 1 GB is not supported */
- /* clevel for lz4hc goes up to 16, at least in LZ4 1.1.3 */
- cbytes = LZ4_compressHC2_limitedOutput(input, output, (int)input_length,
- (int)maxout, clevel*2-1);
- return cbytes;
- }
- static int lz4_wrap_decompress(const char* input, size_t compressed_length,
- char* output, size_t maxout)
- {
- size_t cbytes;
- cbytes = LZ4_decompress_fast(input, output, (int)maxout);
- if (cbytes != compressed_length) {
- return 0;
- }
- return (int)maxout;
- }
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- static int snappy_wrap_compress(const char* input, size_t input_length,
- char* output, size_t maxout)
- {
- snappy_status status;
- size_t cl = maxout;
- status = snappy_compress(input, input_length, output, &cl);
- if (status != SNAPPY_OK){
- return 0;
- }
- return (int)cl;
- }
- static int snappy_wrap_decompress(const char* input, size_t compressed_length,
- char* output, size_t maxout)
- {
- snappy_status status;
- size_t ul = maxout;
- status = snappy_uncompress(input, compressed_length, output, &ul);
- if (status != SNAPPY_OK){
- return 0;
- }
- return (int)ul;
- }
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- /* zlib is not very respectful with sharing name space with others.
- Fortunately, its names do not collide with those already in blosc. */
- static int zlib_wrap_compress(const char* input, size_t input_length,
- char* output, size_t maxout, int clevel)
- {
- int status;
- uLongf cl = maxout;
- status = compress2(
- (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
- if (status != Z_OK){
- return 0;
- }
- return (int)cl;
- }
- static int zlib_wrap_decompress(const char* input, size_t compressed_length,
- char* output, size_t maxout)
- {
- int status;
- uLongf ul = maxout;
- status = uncompress(
- (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
- if (status != Z_OK){
- return 0;
- }
- return (int)ul;
- }
- #endif /* HAVE_ZLIB */
- /* Compute acceleration for blosclz */
- static int get_accel(const struct blosc_context* context) {
- int32_t clevel = context->clevel;
- int32_t typesize = context->typesize;
- if (clevel == 9) {
- return 1;
- }
- if (context->compcode == BLOSC_BLOSCLZ) {
- /* Compute the power of 2. See:
- * http://www.exploringbinary.com/ten-ways-to-check-if-an-integer-is-a-power-of-two-in-c/
- */
- int32_t tspow2 = ((typesize != 0) && !(typesize & (typesize - 1)));
- if (tspow2 && typesize < 32) {
- return 32;
- }
- }
- else if (context->compcode == BLOSC_LZ4) {
- /* This acceleration setting based on discussions held in:
- * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
- */
- return (10 - clevel);
- }
- return 1;
- }
- /* Shuffle & compress a single block */
- static int blosc_c(const struct blosc_context* context, int32_t blocksize,
- int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
- const uint8_t *src, uint8_t *dest, uint8_t *tmp,
- uint8_t *tmp2)
- {
- int32_t j, neblock, nsplits;
- int32_t cbytes; /* number of compressed bytes in split */
- int32_t ctbytes = 0; /* number of compressed bytes in block */
- int32_t maxout;
- int32_t typesize = context->typesize;
- const uint8_t *_tmp = src;
- char *compname;
- int accel;
- int bscount;
- if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) {
- /* Byte shuffling only makes sense if typesize > 1 */
- shuffle(typesize, blocksize, src, tmp);
- _tmp = tmp;
- }
- /* We don't allow more than 1 filter at the same time (yet) */
- else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) {
- bscount = bitshuffle(typesize, blocksize, src, tmp, tmp2);
- if (bscount < 0)
- return bscount;
- _tmp = tmp;
- }
- /* Calculate acceleration for different compressors */
- accel = get_accel(context);
- /* Compress for each shuffled slice split for this block. */
- /* If typesize is too large, neblock is too small or we are in a
- leftover block, do not split at all. */
- if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
- (!leftoverblock)) {
- nsplits = typesize;
- }
- else {
- nsplits = 1;
- }
- neblock = blocksize / nsplits;
- for (j = 0; j < nsplits; j++) {
- dest += sizeof(int32_t);
- ntbytes += (int32_t)sizeof(int32_t);
- ctbytes += (int32_t)sizeof(int32_t);
- maxout = neblock;
- #if defined(HAVE_SNAPPY)
- if (context->compcode == BLOSC_SNAPPY) {
- /* TODO perhaps refactor this to keep the value stashed somewhere */
- maxout = snappy_max_compressed_length(neblock);
- }
- #endif /* HAVE_SNAPPY */
- if (ntbytes+maxout > maxbytes) {
- maxout = maxbytes - ntbytes; /* avoid buffer overrun */
- if (maxout <= 0) {
- return 0; /* non-compressible block */
- }
- }
- if (context->compcode == BLOSC_BLOSCLZ) {
- cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
- dest, maxout, accel);
- }
- #if defined(HAVE_LZ4)
- else if (context->compcode == BLOSC_LZ4) {
- cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
- (char *)dest, (size_t)maxout, accel);
- }
- else if (context->compcode == BLOSC_LZ4HC) {
- cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
- (char *)dest, (size_t)maxout,
- context->clevel);
- }
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- else if (context->compcode == BLOSC_SNAPPY) {
- cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
- (char *)dest, (size_t)maxout);
- }
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- else if (context->compcode == BLOSC_ZLIB) {
- cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
- (char *)dest, (size_t)maxout,
- context->clevel);
- }
- #endif /* HAVE_ZLIB */
- else {
- blosc_compcode_to_compname(context->compcode, &compname);
- fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
- fprintf(stderr, "compression support. Please use one having it.");
- return -5; /* signals no compression support */
- }
- if (cbytes > maxout) {
- /* Buffer overrun caused by compression (should never happen) */
- return -1;
- }
- else if (cbytes < 0) {
- /* cbytes should never be negative */
- return -2;
- }
- else if (cbytes == 0 || cbytes == neblock) {
- /* The compressor has been unable to compress data at all. */
- /* Before doing the copy, check that we are not running into a
- buffer overflow. */
- if ((ntbytes+neblock) > maxbytes) {
- return 0; /* Non-compressible data */
- }
- memcpy(dest, _tmp+j*neblock, neblock);
- cbytes = neblock;
- }
- _sw32(dest - 4, cbytes);
- dest += cbytes;
- ntbytes += cbytes;
- ctbytes += cbytes;
- } /* Closes j < nsplits */
- return ctbytes;
- }
- /* Decompress & unshuffle a single block */
- static int blosc_d(struct blosc_context* context, int32_t blocksize, int32_t leftoverblock,
- const uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
- {
- int32_t j, neblock, nsplits;
- int32_t nbytes; /* number of decompressed bytes in split */
- int32_t cbytes; /* number of compressed bytes in split */
- int32_t ctbytes = 0; /* number of compressed bytes in block */
- int32_t ntbytes = 0; /* number of uncompressed bytes in block */
- uint8_t *_tmp = dest;
- int32_t typesize = context->typesize;
- int32_t compformat;
- char *compname;
- int bscount;
- if ((*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) || \
- (*(context->header_flags) & BLOSC_DOBITSHUFFLE)) {
- _tmp = tmp;
- }
- compformat = (*(context->header_flags) & 0xe0) >> 5;
- /* Compress for each shuffled slice split for this block. */
- if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
- (!leftoverblock)) {
- nsplits = typesize;
- }
- else {
- nsplits = 1;
- }
- neblock = blocksize / nsplits;
- for (j = 0; j < nsplits; j++) {
- cbytes = sw32_(src); /* amount of compressed bytes */
- src += sizeof(int32_t);
- ctbytes += (int32_t)sizeof(int32_t);
- /* Uncompress */
- if (cbytes == neblock) {
- memcpy(_tmp, src, neblock);
- nbytes = neblock;
- }
- else {
- if (compformat == BLOSC_BLOSCLZ_FORMAT) {
- nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
- }
- #if defined(HAVE_LZ4)
- else if (compformat == BLOSC_LZ4_FORMAT) {
- nbytes = lz4_wrap_decompress((char *)src, (size_t)cbytes,
- (char*)_tmp, (size_t)neblock);
- }
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- else if (compformat == BLOSC_SNAPPY_FORMAT) {
- nbytes = snappy_wrap_decompress((char *)src, (size_t)cbytes,
- (char*)_tmp, (size_t)neblock);
- }
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- else if (compformat == BLOSC_ZLIB_FORMAT) {
- nbytes = zlib_wrap_decompress((char *)src, (size_t)cbytes,
- (char*)_tmp, (size_t)neblock);
- }
- #endif /* HAVE_ZLIB */
- else {
- compname = clibcode_to_clibname(compformat);
- fprintf(stderr,
- "Blosc has not been compiled with decompression "
- "support for '%s' format. ", compname);
- fprintf(stderr, "Please recompile for adding this support.\n");
- return -5; /* signals no decompression support */
- }
- /* Check that decompressed bytes number is correct */
- if (nbytes != neblock) {
- return -2;
- }
- }
- src += cbytes;
- ctbytes += cbytes;
- _tmp += nbytes;
- ntbytes += nbytes;
- } /* Closes j < nsplits */
- if (*(context->header_flags) & BLOSC_DOSHUFFLE & (typesize > 1)) {
- unshuffle(typesize, blocksize, tmp, dest);
- }
- else if (*(context->header_flags) & BLOSC_DOBITSHUFFLE) {
- bscount = bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
- if (bscount < 0)
- return bscount;
- }
- /* Return the number of uncompressed bytes */
- return ntbytes;
- }
- /* Serial version for compression/decompression */
- static int serial_blosc(struct blosc_context* context)
- {
- int32_t j, bsize, leftoverblock;
- int32_t cbytes;
- int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
- int32_t ntbytes = context->num_output_bytes;
- uint8_t *tmp = my_malloc(context->blocksize + ebsize);
- uint8_t *tmp2 = tmp + context->blocksize;
- for (j = 0; j < context->nblocks; j++) {
- if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
- _sw32(context->bstarts + j * 4, ntbytes);
- }
- bsize = context->blocksize;
- leftoverblock = 0;
- if ((j == context->nblocks - 1) && (context->leftover > 0)) {
- bsize = context->leftover;
- leftoverblock = 1;
- }
- if (context->compress) {
- if (*(context->header_flags) & BLOSC_MEMCPYED) {
- /* We want to memcpy only */
- memcpy(context->dest+BLOSC_MAX_OVERHEAD+j*context->blocksize,
- context->src+j*context->blocksize,
- bsize);
- cbytes = bsize;
- }
- else {
- /* Regular compression */
- cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
- context->destsize, context->src+j*context->blocksize,
- context->dest+ntbytes, tmp, tmp2);
- if (cbytes == 0) {
- ntbytes = 0; /* uncompressible data */
- break;
- }
- }
- }
- else {
- if (*(context->header_flags) & BLOSC_MEMCPYED) {
- /* We want to memcpy only */
- memcpy(context->dest+j*context->blocksize,
- context->src+BLOSC_MAX_OVERHEAD+j*context->blocksize,
- bsize);
- cbytes = bsize;
- }
- else {
- /* Regular decompression */
- cbytes = blosc_d(context, bsize, leftoverblock,
- context->src + sw32_(context->bstarts + j * 4),
- context->dest+j*context->blocksize, tmp, tmp2);
- }
- }
- if (cbytes < 0) {
- ntbytes = cbytes; /* error in blosc_c or blosc_d */
- break;
- }
- ntbytes += cbytes;
- }
- // Free temporaries
- my_free(tmp);
- return ntbytes;
- }
- /* Threaded version for compression/decompression */
- static int parallel_blosc(struct blosc_context* context)
- {
- int rc;
- /* Check whether we need to restart threads */
- blosc_set_nthreads_(context);
- /* Set sentinels */
- context->thread_giveup_code = 1;
- context->thread_nblock = -1;
- /* Synchronization point for all threads (wait for initialization) */
- WAIT_INIT(-1, context);
- /* Synchronization point for all threads (wait for finalization) */
- WAIT_FINISH(-1, context);
- if (context->thread_giveup_code > 0) {
- /* Return the total bytes (de-)compressed in threads */
- return context->num_output_bytes;
- }
- else {
- /* Compression/decompression gave up. Return error code. */
- return context->thread_giveup_code;
- }
- }
- /* Do the compression or decompression of the buffer depending on the
- global params. */
- static int do_job(struct blosc_context* context)
- {
- int32_t ntbytes;
- /* Run the serial version when nthreads is 1 or when the buffers are
- not much larger than blocksize */
- if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
- ntbytes = serial_blosc(context);
- }
- else {
- ntbytes = parallel_blosc(context);
- }
- return ntbytes;
- }
- static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
- int32_t typesize, int32_t nbytes,
- int32_t forced_blocksize)
- {
- int32_t blocksize;
- /* Protection against very small buffers */
- if (nbytes < (int32_t)typesize) {
- return 1;
- }
- blocksize = nbytes; /* Start by a whole buffer as blocksize */
- if (forced_blocksize) {
- blocksize = forced_blocksize;
- /* Check that forced blocksize is not too small */
- if (blocksize < MIN_BUFFERSIZE) {
- blocksize = MIN_BUFFERSIZE;
- }
- }
- else if (nbytes >= L1) {
- blocksize = L1;
- /* For Zlib, increase the block sizes in a factor of 8 because it
- is meant for compression large blocks (it shows a big overhead
- in compressing small ones). */
- if (context->compcode == BLOSC_ZLIB) {
- blocksize *= 8;
- }
- /* For LZ4HC, increase the block sizes in a factor of 8 because it
- is meant for compression large blocks (it shows a big overhead
- in compressing small ones). */
- if (context->compcode == BLOSC_LZ4HC) {
- blocksize *= 8;
- }
- if (clevel == 0) {
- blocksize /= 4;
- }
- else if (clevel <= 3) {
- blocksize /= 2;
- }
- else if (clevel <= 5) {
- blocksize *= 1;
- }
- else if (clevel <= 6) {
- blocksize *= 2;
- }
- else if (clevel < 9) {
- blocksize *= 4;
- }
- else {
- blocksize *= 16;
- }
- }
- /* Check that blocksize is not too large */
- if (blocksize > (int32_t)nbytes) {
- blocksize = nbytes;
- }
- /* blocksize *must absolutely* be a multiple of the typesize */
- if (blocksize > typesize) {
- blocksize = blocksize / typesize * typesize;
- }
- return blocksize;
- }
- static int initialize_context_compression(struct blosc_context* context,
- int clevel,
- int doshuffle,
- size_t typesize,
- size_t sourcesize,
- const void* src,
- void* dest,
- size_t destsize,
- int32_t compressor,
- int32_t blocksize,
- int32_t numthreads)
- {
- /* Set parameters */
- context->compress = 1;
- context->src = (const uint8_t*)src;
- context->dest = (uint8_t *)(dest);
- context->num_output_bytes = 0;
- context->destsize = (int32_t)destsize;
- context->sourcesize = sourcesize;
- context->typesize = typesize;
- context->compcode = compressor;
- context->numthreads = numthreads;
- context->end_threads = 0;
- context->clevel = clevel;
- /* Check buffer size limits */
- if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
- /* If buffer is too large, give up. */
- fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
- BLOSC_MAX_BUFFERSIZE);
- return -1;
- }
- /* Compression level */
- if (clevel < 0 || clevel > 9) {
- /* If clevel not in 0..9, print an error */
- fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
- return -10;
- }
- /* Shuffle */
- if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
- fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
- return -10;
- }
- /* Check typesize limits */
- if (context->typesize > BLOSC_MAX_TYPESIZE) {
- /* If typesize is too large, treat buffer as an 1-byte stream. */
- context->typesize = 1;
- }
- /* Get the blocksize */
- context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
- /* Compute number of blocks in buffer */
- context->nblocks = context->sourcesize / context->blocksize;
- context->leftover = context->sourcesize % context->blocksize;
- context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
- return 1;
- }
- static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
- {
- int32_t compformat;
- /* Write version header for this block */
- context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */
- /* Write compressor format */
- compformat = -1;
- switch (context->compcode)
- {
- case BLOSC_BLOSCLZ:
- compformat = BLOSC_BLOSCLZ_FORMAT;
- context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
- break;
- #if defined(HAVE_LZ4)
- case BLOSC_LZ4:
- compformat = BLOSC_LZ4_FORMAT;
- context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */
- break;
- case BLOSC_LZ4HC:
- compformat = BLOSC_LZ4HC_FORMAT;
- context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
- break;
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- case BLOSC_SNAPPY:
- compformat = BLOSC_SNAPPY_FORMAT;
- context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */
- break;
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- case BLOSC_ZLIB:
- compformat = BLOSC_ZLIB_FORMAT;
- context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */
- break;
- #endif /* HAVE_ZLIB */
- default:
- {
- char *compname;
- compname = clibcode_to_clibname(compformat);
- fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
- fprintf(stderr, "compression support. Please use one having it.");
- return -5; /* signals no compression support */
- break;
- }
- }
- context->header_flags = context->dest+2; /* flags */
- context->dest[2] = 0; /* zeroes flags */
- context->dest[3] = (uint8_t)context->typesize; /* type size */
- _sw32(context->dest + 4, context->sourcesize); /* size of the buffer */
- _sw32(context->dest + 8, context->blocksize); /* block size */
- context->bstarts = context->dest + 16; /* starts for every block */
- context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */
- if (context->clevel == 0) {
- /* Compression level 0 means buffer to be memcpy'ed */
- *(context->header_flags) |= BLOSC_MEMCPYED;
- }
- if (context->sourcesize < MIN_BUFFERSIZE) {
- /* Buffer is too small. Try memcpy'ing. */
- *(context->header_flags) |= BLOSC_MEMCPYED;
- }
- if (doshuffle == BLOSC_SHUFFLE) {
- /* Byte-shuffle is active */
- *(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */
- }
- if (doshuffle == BLOSC_BITSHUFFLE) {
- /* Bit-shuffle is active */
- *(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */
- }
- *(context->header_flags) |= compformat << 5; /* compressor format start at bit 5 */
- return 1;
- }
- int blosc_compress_context(struct blosc_context* context)
- {
- int32_t ntbytes = 0;
- if (!(*(context->header_flags) & BLOSC_MEMCPYED)) {
- /* Do the actual compression */
- ntbytes = do_job(context);
- if (ntbytes < 0) {
- return -1;
- }
- if ((ntbytes == 0) && (context->sourcesize+BLOSC_MAX_OVERHEAD <= context->destsize)) {
- /* Last chance for fitting `src` buffer in `dest`. Update flags
- and do a memcpy later on. */
- *(context->header_flags) |= BLOSC_MEMCPYED;
- }
- }
- if (*(context->header_flags) & BLOSC_MEMCPYED) {
- if (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize) {
- /* We are exceeding maximum output size */
- ntbytes = 0;
- }
- else {
- memcpy(context->dest+BLOSC_MAX_OVERHEAD, context->src,
- context->sourcesize);
- ntbytes = context->sourcesize + BLOSC_MAX_OVERHEAD;
- }
- }
- /* Set the number of compressed bytes in header */
- _sw32(context->dest + 12, ntbytes);
- assert(ntbytes <= context->destsize);
- return ntbytes;
- }
- /* The public routine for compression with context. */
- int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
- size_t nbytes, const void* src, void* dest,
- size_t destsize, const char* compressor,
- size_t blocksize, int numinternalthreads)
- {
- int error, result;
- struct blosc_context context;
- context.threads_started = 0;
- error = initialize_context_compression(&context, clevel, doshuffle, typesize,
- nbytes, src, dest, destsize,
- blosc_compname_to_compcode(compressor),
- blocksize, numinternalthreads);
- if (error < 0) { return error; }
- error = write_compression_header(&context, clevel, doshuffle);
- if (error < 0) { return error; }
- result = blosc_compress_context(&context);
- if (numinternalthreads > 1)
- {
- blosc_release_threadpool(&context);
- }
- return result;
- }
- /* The public routine for compression. See blosc.h for docstrings. */
- int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
- const void *src, void *dest, size_t destsize)
- {
- int error;
- int result;
- char* envvar;
- /* Check if should initialize */
- if (!g_initlib) blosc_init();
- /* Check for a BLOSC_CLEVEL environment variable */
- envvar = getenv("BLOSC_CLEVEL");
- if (envvar != NULL) {
- long value;
- value = strtol(envvar, NULL, 10);
- if ((value != EINVAL) && (value >= 0)) {
- clevel = (int)value;
- }
- }
- /* Check for a BLOSC_SHUFFLE environment variable */
- envvar = getenv("BLOSC_SHUFFLE");
- if (envvar != NULL) {
- if (strcmp(envvar, "NOSHUFFLE") == 0) {
- doshuffle = BLOSC_NOSHUFFLE;
- }
- if (strcmp(envvar, "SHUFFLE") == 0) {
- doshuffle = BLOSC_SHUFFLE;
- }
- if (strcmp(envvar, "BITSHUFFLE") == 0) {
- doshuffle = BLOSC_BITSHUFFLE;
- }
- }
- /* Check for a BLOSC_TYPESIZE environment variable */
- envvar = getenv("BLOSC_TYPESIZE");
- if (envvar != NULL) {
- long value;
- value = strtol(envvar, NULL, 10);
- if ((value != EINVAL) && (value > 0)) {
- typesize = (int)value;
- }
- }
- /* Check for a BLOSC_COMPRESSOR environment variable */
- envvar = getenv("BLOSC_COMPRESSOR");
- if (envvar != NULL) {
- result = blosc_set_compressor(envvar);
- if (result < 0) { return result; }
- }
- /* Check for a BLOSC_COMPRESSOR environment variable */
- envvar = getenv("BLOSC_BLOCKSIZE");
- if (envvar != NULL) {
- long blocksize;
- blocksize = strtol(envvar, NULL, 10);
- if ((blocksize != EINVAL) && (blocksize > 0)) {
- blosc_set_blocksize((size_t)blocksize);
- }
- }
- /* Check for a BLOSC_NTHREADS environment variable */
- envvar = getenv("BLOSC_NTHREADS");
- if (envvar != NULL) {
- long nthreads;
- nthreads = strtol(envvar, NULL, 10);
- if ((nthreads != EINVAL) && (nthreads > 0)) {
- result = blosc_set_nthreads((int)nthreads);
- if (result < 0) { return result; }
- }
- }
- /* Check for a BLOSC_NOLOCK environment variable. It is important
- that this should be the last env var so that it can take the
- previous ones into account */
- envvar = getenv("BLOSC_NOLOCK");
- if (envvar != NULL) {
- char *compname;
- blosc_compcode_to_compname(g_compressor, &compname);
- result = blosc_compress_ctx(clevel, doshuffle, typesize,
- nbytes, src, dest, destsize,
- compname, g_force_blocksize, g_threads);
- return result;
- }
- pthread_mutex_lock(&global_comp_mutex);
- error = initialize_context_compression(g_global_context, clevel, doshuffle,
- typesize, nbytes, src, dest, destsize,
- g_compressor, g_force_blocksize,
- g_threads);
- if (error < 0) { return error; }
- error = write_compression_header(g_global_context, clevel, doshuffle);
- if (error < 0) { return error; }
- result = blosc_compress_context(g_global_context);
- pthread_mutex_unlock(&global_comp_mutex);
- return result;
- }
- int blosc_run_decompression_with_context(struct blosc_context* context,
- const void* src,
- void* dest,
- size_t destsize,
- int numinternalthreads)
- {
- uint8_t version;
- uint8_t versionlz;
- uint32_t ctbytes;
- int32_t ntbytes;
- context->compress = 0;
- context->src = (const uint8_t*)src;
- context->dest = (uint8_t*)dest;
- context->destsize = destsize;
- context->num_output_bytes = 0;
- context->numthreads = numinternalthreads;
- context->end_threads = 0;
- /* Read the header block */
- version = context->src[0]; /* blosc format version */
- versionlz = context->src[1]; /* blosclz format version */
- context->header_flags = (uint8_t*)(context->src + 2); /* flags */
- context->typesize = (int32_t)context->src[3]; /* typesize */
- context->sourcesize = sw32_(context->src + 4); /* buffer size */
- context->blocksize = sw32_(context->src + 8); /* block size */
- ctbytes = sw32_(context->src + 12); /* compressed buffer size */
- /* Unused values */
- version += 0; /* shut up compiler warning */
- versionlz += 0; /* shut up compiler warning */
- ctbytes += 0; /* shut up compiler warning */
- context->bstarts = (uint8_t*)(context->src + 16);
- /* Compute some params */
- /* Total blocks */
- context->nblocks = context->sourcesize / context->blocksize;
- context->leftover = context->sourcesize % context->blocksize;
- context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
- /* Check that we have enough space to decompress */
- if (context->sourcesize > (int32_t)destsize) {
- return -1;
- }
- /* Check whether this buffer is memcpy'ed */
- if (*(context->header_flags) & BLOSC_MEMCPYED) {
- memcpy(dest, (uint8_t *)src+BLOSC_MAX_OVERHEAD, context->sourcesize);
- ntbytes = context->sourcesize;
- }
- else {
- /* Do the actual decompression */
- ntbytes = do_job(context);
- if (ntbytes < 0) {
- return -1;
- }
- }
- assert(ntbytes <= (int32_t)destsize);
- return ntbytes;
- }
- /* The public routine for decompression with context. */
- int blosc_decompress_ctx(const void *src, void *dest, size_t destsize,
- int numinternalthreads)
- {
- int result;
- struct blosc_context context;
- context.threads_started = 0;
- result = blosc_run_decompression_with_context(&context, src, dest, destsize, numinternalthreads);
- if (numinternalthreads > 1)
- {
- blosc_release_threadpool(&context);
- }
- return result;
- }
- /* The public routine for decompression. See blosc.h for docstrings. */
- int blosc_decompress(const void *src, void *dest, size_t destsize)
- {
- int result;
- char* envvar;
- long nthreads;
- /* Check if should initialize */
- if (!g_initlib) blosc_init();
- /* Check for a BLOSC_NTHREADS environment variable */
- envvar = getenv("BLOSC_NTHREADS");
- if (envvar != NULL) {
- nthreads = strtol(envvar, NULL, 10);
- if ((nthreads != EINVAL) && (nthreads > 0)) {
- result = blosc_set_nthreads((int)nthreads);
- if (result < 0) { return result; }
- }
- }
- /* Check for a BLOSC_NOLOCK environment variable. It is important
- that this should be the last env var so that it can take the
- previous ones into account */
- envvar = getenv("BLOSC_NOLOCK");
- if (envvar != NULL) {
- result = blosc_decompress_ctx(src, dest, destsize, g_threads);
- return result;
- }
- pthread_mutex_lock(&global_comp_mutex);
- result = blosc_run_decompression_with_context(g_global_context, src, dest,
- destsize, g_threads);
- pthread_mutex_unlock(&global_comp_mutex);
- return result;
- }
- /* Specific routine optimized for decompression a small number of
- items out of a compressed chunk. This does not use threads because
- it would affect negatively to performance. */
- int blosc_getitem(const void *src, int start, int nitems, void *dest)
- {
- uint8_t *_src=NULL; /* current pos for source buffer */
- uint8_t version, versionlz; /* versions for compressed header */
- uint8_t flags; /* flags for header */
- int32_t ntbytes = 0; /* the number of uncompressed bytes */
- int32_t nblocks; /* number of total blocks in buffer */
- int32_t leftover; /* extra bytes at end of buffer */
- uint8_t *bstarts; /* start pointers for each block */
- int tmp_init = 0;
- int32_t typesize, blocksize, nbytes, ctbytes;
- int32_t j, bsize, bsize2, leftoverblock;
- int32_t cbytes, startb, stopb;
- int stop = start + nitems;
- uint8_t *tmp;
- uint8_t *tmp2;
- uint8_t *tmp3;
- int32_t ebsize;
- _src = (uint8_t *)(src);
- /* Read the header block */
- version = _src[0]; /* blosc format version */
- versionlz = _src[1]; /* blosclz format version */
- flags = _src[2]; /* flags */
- typesize = (int32_t)_src[3]; /* typesize */
- nbytes = sw32_(_src + 4); /* buffer size */
- blocksize = sw32_(_src + 8); /* block size */
- ctbytes = sw32_(_src + 12); /* compressed buffer size */
- ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
- tmp = my_malloc(blocksize + ebsize + blocksize);
- tmp2 = tmp + blocksize;
- tmp3 = tmp + blocksize + ebsize;
- version += 0; /* shut up compiler warning */
- versionlz += 0; /* shut up compiler warning */
- ctbytes += 0; /* shut up compiler warning */
- _src += 16;
- bstarts = _src;
- /* Compute some params */
- /* Total blocks */
- nblocks = nbytes / blocksize;
- leftover = nbytes % blocksize;
- nblocks = (leftover>0)? nblocks+1: nblocks;
- _src += sizeof(int32_t)*nblocks;
- /* Check region boundaries */
- if ((start < 0) || (start*typesize > nbytes)) {
- fprintf(stderr, "`start` out of bounds");
- return -1;
- }
- if ((stop < 0) || (stop*typesize > nbytes)) {
- fprintf(stderr, "`start`+`nitems` out of bounds");
- return -1;
- }
- for (j = 0; j < nblocks; j++) {
- bsize = blocksize;
- leftoverblock = 0;
- if ((j == nblocks - 1) && (leftover > 0)) {
- bsize = leftover;
- leftoverblock = 1;
- }
- /* Compute start & stop for each block */
- startb = start * typesize - j * blocksize;
- stopb = stop * typesize - j * blocksize;
- if ((startb >= (int)blocksize) || (stopb <= 0)) {
- continue;
- }
- if (startb < 0) {
- startb = 0;
- }
- if (stopb > (int)blocksize) {
- stopb = blocksize;
- }
- bsize2 = stopb - startb;
- /* Do the actual data copy */
- if (flags & BLOSC_MEMCPYED) {
- /* We want to memcpy only */
- memcpy((uint8_t *)dest + ntbytes,
- (uint8_t *)src + BLOSC_MAX_OVERHEAD + j*blocksize + startb,
- bsize2);
- cbytes = bsize2;
- }
- else {
- struct blosc_context context;
- /* blosc_d only uses typesize and flags */
- context.typesize = typesize;
- context.header_flags = &flags;
- /* Regular decompression. Put results in tmp2. */
- cbytes = blosc_d(&context, bsize, leftoverblock,
- (uint8_t *)src + sw32_(bstarts + j * 4),
- tmp2, tmp, tmp3);
- if (cbytes < 0) {
- ntbytes = cbytes;
- break;
- }
- /* Copy to destination */
- memcpy((uint8_t *)dest + ntbytes, tmp2 + startb, bsize2);
- cbytes = bsize2;
- }
- ntbytes += cbytes;
- }
- my_free(tmp);
- return ntbytes;
- }
- /* Decompress & unshuffle several blocks in a single thread */
- static void *t_blosc(void *ctxt)
- {
- struct thread_context* context = (struct thread_context*)ctxt;
- int32_t cbytes, ntdest;
- int32_t tblocks; /* number of blocks per thread */
- int32_t leftover2;
- int32_t tblock; /* limit block on a thread */
- int32_t nblock_; /* private copy of nblock */
- int32_t bsize, leftoverblock;
- /* Parameters for threads */
- int32_t blocksize;
- int32_t ebsize;
- int32_t compress;
- int32_t maxbytes;
- int32_t ntbytes;
- int32_t flags;
- int32_t nblocks;
- int32_t leftover;
- uint8_t *bstarts;
- const uint8_t *src;
- uint8_t *dest;
- uint8_t *tmp;
- uint8_t *tmp2;
- uint8_t *tmp3;
- int rc;
- while(1)
- {
- /* Synchronization point for all threads (wait for initialization) */
- WAIT_INIT(NULL, context->parent_context);
- if(context->parent_context->end_threads)
- {
- break;
- }
- /* Get parameters for this thread before entering the main loop */
- blocksize = context->parent_context->blocksize;
- ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
- compress = context->parent_context->compress;
- flags = *(context->parent_context->header_flags);
- maxbytes = context->parent_context->destsize;
- nblocks = context->parent_context->nblocks;
- leftover = context->parent_context->leftover;
- bstarts = context->parent_context->bstarts;
- src = context->parent_context->src;
- dest = context->parent_context->dest;
- if (blocksize > context->tmpblocksize)
- {
- my_free(context->tmp);
- context->tmp = my_malloc(blocksize + ebsize + blocksize);
- context->tmp2 = context->tmp + blocksize;
- context->tmp3 = context->tmp + blocksize + ebsize;
- }
- tmp = context->tmp;
- tmp2 = context->tmp2;
- tmp3 = context->tmp3;
- ntbytes = 0; /* only useful for decompression */
- if (compress && !(flags & BLOSC_MEMCPYED)) {
- /* Compression always has to follow the block order */
- pthread_mutex_lock(&context->parent_context->count_mutex);
- context->parent_context->thread_nblock++;
- nblock_ = context->parent_context->thread_nblock;
- pthread_mutex_unlock(&context->parent_context->count_mutex);
- tblock = nblocks;
- }
- else {
- /* Decompression can happen using any order. We choose
- sequential block order on each thread */
- /* Blocks per thread */
- tblocks = nblocks / context->parent_context->numthreads;
- leftover2 = nblocks % context->parent_context->numthreads;
- tblocks = (leftover2>0)? tblocks+1: tblocks;
- nblock_ = context->tid*tblocks;
- tblock = nblock_ + tblocks;
- if (tblock > nblocks) {
- tblock = nblocks;
- }
- }
- /* Loop over blocks */
- leftoverblock = 0;
- while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
- bsize = blocksize;
- if (nblock_ == (nblocks - 1) && (leftover > 0)) {
- bsize = leftover;
- leftoverblock = 1;
- }
- if (compress) {
- if (flags & BLOSC_MEMCPYED) {
- /* We want to memcpy only */
- memcpy(dest+BLOSC_MAX_OVERHEAD+nblock_*blocksize,
- src+nblock_*blocksize, bsize);
- cbytes = bsize;
- }
- else {
- /* Regular compression */
- cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
- src+nblock_*blocksize, tmp2, tmp, tmp3);
- }
- }
- else {
- if (flags & BLOSC_MEMCPYED) {
- /* We want to memcpy only */
- memcpy(dest+nblock_*blocksize,
- src+BLOSC_MAX_OVERHEAD+nblock_*blocksize, bsize);
- cbytes = bsize;
- }
- else {
- cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
- src + sw32_(bstarts + nblock_ * 4),
- dest+nblock_*blocksize,
- tmp, tmp2);
- }
- }
- /* Check whether current thread has to giveup */
- if (context->parent_context->thread_giveup_code <= 0) {
- break;
- }
- /* Check results for the compressed/decompressed block */
- if (cbytes < 0) { /* compr/decompr failure */
- /* Set giveup_code error */
- pthread_mutex_lock(&context->parent_context->count_mutex);
- context->parent_context->thread_giveup_code = cbytes;
- pthread_mutex_unlock(&context->parent_context->count_mutex);
- break;
- }
- if (compress && !(flags & BLOSC_MEMCPYED)) {
- /* Start critical section */
- pthread_mutex_lock(&context->parent_context->count_mutex);
- ntdest = context->parent_context->num_output_bytes;
- _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
- if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
- context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */
- pthread_mutex_unlock(&context->parent_context->count_mutex);
- break;
- }
- context->parent_context->thread_nblock++;
- nblock_ = context->parent_context->thread_nblock;
- context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */
- pthread_mutex_unlock(&context->parent_context->count_mutex);
- /* End of critical section */
- /* Copy the compressed buffer to destination */
- memcpy(dest+ntdest, tmp2, cbytes);
- }
- else {
- nblock_++;
- /* Update counter for this thread */
- ntbytes += cbytes;
- }
- } /* closes while (nblock_) */
- /* Sum up all the bytes decompressed */
- if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
- /* Update global counter for all threads (decompression only) */
- pthread_mutex_lock(&context->parent_context->count_mutex);
- context->parent_context->num_output_bytes += ntbytes;
- pthread_mutex_unlock(&context->parent_context->count_mutex);
- }
- /* Meeting point for all threads (wait for finalization) */
- WAIT_FINISH(NULL, context->parent_context);
- }
- /* Cleanup our working space and context */
- my_free(context->tmp);
- my_free(context);
- return(NULL);
- }
- static int init_threads(struct blosc_context* context)
- {
- int32_t tid;
- int rc2;
- int32_t ebsize;
- struct thread_context* thread_context;
- /* Initialize mutex and condition variable objects */
- pthread_mutex_init(&context->count_mutex, NULL);
- /* Set context thread sentinels */
- context->thread_giveup_code = 1;
- context->thread_nblock = -1;
- /* Barrier initialization */
- #ifdef _POSIX_BARRIERS_MINE
- pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
- pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
- #else
- pthread_mutex_init(&context->count_threads_mutex, NULL);
- pthread_cond_init(&context->count_threads_cv, NULL);
- context->count_threads = 0; /* Reset threads counter */
- #endif
- #if !defined(_WIN32)
- /* Initialize and set thread detached attribute */
- pthread_attr_init(&context->ct_attr);
- pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
- #endif
- /* Finally, create the threads in detached state */
- for (tid = 0; tid < context->numthreads; tid++) {
- context->tids[tid] = tid;
- /* Create a thread context thread owns context (will destroy when finished) */
- thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
- thread_context->parent_context = context;
- thread_context->tid = tid;
- ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
- thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
- thread_context->tmp2 = thread_context->tmp + context->blocksize;
- thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
- thread_context->tmpblocksize = context->blocksize;
- #if !defined(_WIN32)
- rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
- #else
- rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
- #endif
- if (rc2) {
- fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
- fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
- return(-1);
- }
- }
- return(0);
- }
- int blosc_get_nthreads(void)
- {
- int ret = g_threads;
- return ret;
- }
- int blosc_set_nthreads(int nthreads_new)
- {
- int ret = g_threads;
- /* Check if should initialize */
- if (!g_initlib) blosc_init();
- if (nthreads_new != ret){
- /* Re-initialize Blosc */
- blosc_destroy();
- blosc_init();
- g_threads = nthreads_new;
- }
- return ret;
- }
- int blosc_set_nthreads_(struct blosc_context* context)
- {
- if (context->numthreads > BLOSC_MAX_THREADS) {
- fprintf(stderr,
- "Error. nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
- BLOSC_MAX_THREADS);
- return -1;
- }
- else if (context->numthreads <= 0) {
- fprintf(stderr, "Error. nthreads must be a positive integer");
- return -1;
- }
- /* Launch a new pool of threads */
- if (context->numthreads > 1 && context->numthreads != context->threads_started) {
- blosc_release_threadpool(context);
- init_threads(context);
- }
- /* We have now started the threads */
- context->threads_started = context->numthreads;
- return context->numthreads;
- }
- char* blosc_get_compressor(void)
- {
- char* compname;
- blosc_compcode_to_compname(g_compressor, &compname);
- return compname;
- }
- int blosc_set_compressor(const char *compname)
- {
- int code = blosc_compname_to_compcode(compname);
- g_compressor = code;
- /* Check if should initialize */
- if (!g_initlib) blosc_init();
- return code;
- }
- char* blosc_list_compressors(void)
- {
- static int compressors_list_done = 0;
- static char ret[256];
- if (compressors_list_done) return ret;
- ret[0] = '\0';
- strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
- #if defined(HAVE_LZ4)
- strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
- strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
- #endif /* HAVE_ZLIB */
- compressors_list_done = 1;
- return ret;
- }
- char* blosc_get_version_string(void)
- {
- static char ret[256];
- strcpy(ret, BLOSC_VERSION_STRING);
- return ret;
- }
- int blosc_get_complib_info(char *compname, char **complib, char **version)
- {
- int clibcode;
- char *clibname;
- char *clibversion = "unknown";
- #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION))
- char sbuffer[256];
- #endif
- clibcode = compname_to_clibcode(compname);
- clibname = clibcode_to_clibname(clibcode);
- /* complib version */
- if (clibcode == BLOSC_BLOSCLZ_LIB) {
- clibversion = BLOSCLZ_VERSION_STRING;
- }
- #if defined(HAVE_LZ4)
- else if (clibcode == BLOSC_LZ4_LIB) {
- #if defined(LZ4_VERSION_MAJOR)
- sprintf(sbuffer, "%d.%d.%d",
- LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
- clibversion = sbuffer;
- #endif /* LZ4_VERSION_MAJOR */
- }
- #endif /* HAVE_LZ4 */
- #if defined(HAVE_SNAPPY)
- else if (clibcode == BLOSC_SNAPPY_LIB) {
- #if defined(SNAPPY_VERSION)
- sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
- clibversion = sbuffer;
- #endif /* SNAPPY_VERSION */
- }
- #endif /* HAVE_SNAPPY */
- #if defined(HAVE_ZLIB)
- else if (clibcode == BLOSC_ZLIB_LIB) {
- clibversion = ZLIB_VERSION;
- }
- #endif /* HAVE_ZLIB */
- *complib = strdup(clibname);
- *version = strdup(clibversion);
- return clibcode;
- }
- /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
- void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
- size_t *cbytes, size_t *blocksize)
- {
- uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
- uint8_t version, versionlz; /* versions for compressed header */
- /* Read the version info (could be useful in the future) */
- version = _src[0]; /* blosc format version */
- versionlz = _src[1]; /* blosclz format version */
- version += 0; /* shut up compiler warning */
- versionlz += 0; /* shut up compiler warning */
- /* Read the interesting values */
- *nbytes = (size_t)sw32_(_src + 4); /* uncompressed buffer size */
- *blocksize = (size_t)sw32_(_src + 8); /* block size */
- *cbytes = (size_t)sw32_(_src + 12); /* compressed buffer size */
- }
- /* Return `typesize` and `flags` from a compressed buffer. */
- void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
- int *flags)
- {
- uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
- uint8_t version, versionlz; /* versions for compressed header */
- /* Read the version info (could be useful in the future) */
- version = _src[0]; /* blosc format version */
- versionlz = _src[1]; /* blosclz format version */
- version += 0; /* shut up compiler warning */
- versionlz += 0; /* shut up compiler warning */
- /* Read the interesting values */
- *flags = (int)_src[2]; /* flags */
- *typesize = (size_t)_src[3]; /* typesize */
- }
- /* Return version information from a compressed buffer. */
- void blosc_cbuffer_versions(const void *cbuffer, int *version,
- int *versionlz)
- {
- uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
- /* Read the version info */
- *version = (int)_src[0]; /* blosc format version */
- *versionlz = (int)_src[1]; /* Lempel-Ziv compressor format version */
- }
- /* Return the compressor library/format used in a compressed buffer. */
- char *blosc_cbuffer_complib(const void *cbuffer)
- {
- uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
- int clibcode;
- char *complib;
- /* Read the compressor format/library info */
- clibcode = (_src[2] & 0xe0) >> 5;
- complib = clibcode_to_clibname(clibcode);
- return complib;
- }
- /* Get the internal blocksize to be used during compression. 0 means
- that an automatic blocksize is computed internally. */
- int blosc_get_blocksize(void)
- {
- return (int)g_force_blocksize;
- }
- /* Force the use of a specific blocksize. If 0, an automatic
- blocksize will be used (the default). */
- void blosc_set_blocksize(size_t size)
- {
- g_force_blocksize = (int32_t)size;
- }
- void blosc_init(void)
- {
- /* Return if we are already initialized */
- if (g_initlib) return;
- pthread_mutex_init(&global_comp_mutex, NULL);
- g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
- g_global_context->threads_started = 0;
- g_initlib = 1;
- }
- void blosc_destroy(void)
- {
- /* Return if Blosc is not initialized */
- if (!g_initlib) return;
- g_initlib = 0;
- blosc_release_threadpool(g_global_context);
- my_free(g_global_context);
- pthread_mutex_destroy(&global_comp_mutex);
- }
- int blosc_release_threadpool(struct blosc_context* context)
- {
- int32_t t;
- void* status;
- int rc;
- int rc2;
- if (context->threads_started > 0)
- {
- /* Tell all existing threads to finish */
- context->end_threads = 1;
- /* Sync threads */
- WAIT_INIT(-1, context);
- /* Join exiting threads */
- for (t=0; t<context->threads_started; t++) {
- rc2 = pthread_join(context->threads[t], &status);
- if (rc2) {
- fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
- fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
- }
- }
- /* Release mutex and condition variable objects */
- pthread_mutex_destroy(&context->count_mutex);
- /* Barriers */
- #ifdef _POSIX_BARRIERS_MINE
- pthread_barrier_destroy(&context->barr_init);
- pthread_barrier_destroy(&context->barr_finish);
- #else
- pthread_mutex_destroy(&context->count_threads_mutex);
- pthread_cond_destroy(&context->count_threads_cv);
- #endif
- /* Thread attributes */
- #if !defined(_WIN32)
- pthread_attr_destroy(&context->ct_attr);
- #endif
- }
- context->threads_started = 0;
- return 0;
- }
- int blosc_free_resources(void)
- {
- /* Return if Blosc is not initialized */
- if (!g_initlib) return -1;
- return blosc_release_threadpool(g_global_context);
- }
|