blosc.c 65 KB


  1. /*********************************************************************
  2. Blosc - Blocked Shuffling and Compression Library
  3. Author: Francesc Alted <francesc@blosc.org>
  4. Creation date: 2009-05-20
  5. See LICENSES/BLOSC.txt for details about copyright and rights to use.
  6. **********************************************************************/
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <errno.h>
  10. #include <string.h>
  11. #include <sys/types.h>
  12. #include <assert.h>
  13. #include "fastcopy.h"
  14. #if defined(USING_CMAKE)
  15. #include "config.h"
  16. #endif /* USING_CMAKE */
  17. #include "blosc.h"
  18. #include "shuffle.h"
  19. #include "blosclz.h"
  20. #if defined(HAVE_LZ4)
  21. #include "lz4.h"
  22. #include "lz4hc.h"
  23. #endif /* HAVE_LZ4 */
  24. #if defined(HAVE_SNAPPY)
  25. #include "snappy-c.h"
  26. #endif /* HAVE_SNAPPY */
  27. #if defined(HAVE_ZLIB)
  28. #include "zlib.h"
  29. #endif /* HAVE_ZLIB */
  30. #if defined(HAVE_ZSTD)
  31. #include "zstd.h"
  32. #endif /* HAVE_ZSTD */
  33. #if defined(_WIN32) && !defined(__MINGW32__)
  34. #include <windows.h>
  35. #include <malloc.h>
  36. /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
  37. #if defined(_MSC_VER) && _MSC_VER < 1600
  38. #include "win32/stdint-windows.h"
  39. #else
  40. #include <stdint.h>
  41. #endif
  42. #include <process.h>
  43. #define getpid _getpid
  44. #else
  45. #include <stdint.h>
  46. #include <unistd.h>
  47. #include <inttypes.h>
  48. #endif /* _WIN32 */
  49. /* Include the win32/pthread.h library for all the Windows builds. See #224. */
  50. #if defined(_WIN32)
  51. #include "win32/pthread.h"
  52. #include "win32/pthread.c"
  53. #else
  54. #include <pthread.h>
  55. #endif
/* Some useful units */
#define KB 1024
#define MB (1024 * (KB))
/* Minimum buffer size to be compressed.  Also used in this file as the
   smallest accepted forced blocksize and as the minimum per-split length
   (blocksize / typesize) required before a block is split. */
#define MIN_BUFFERSIZE 128 /* Cannot be smaller than 66 */
/* The maximum number of splits in a block for compression.  A block is only
   split when typesize <= MAX_SPLITS. */
#define MAX_SPLITS 16 /* Cannot be larger than 128 */
/* The size of L1 cache. 32 KB is quite common nowadays.  Used as the base
   value when computing an automatic blocksize. */
#define L1 (32 * (KB))
/* Have problems using posix barriers when symbol value is 200112L */
/* This requires more investigation, but will work for the moment */
/* NOTE(review): the constant 20012L below looks like a typo for 200112L.
   As written, barriers are selected for any _POSIX_BARRIERS value >= 20012
   other than exactly 200112L -- confirm the intent before touching it. */
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
#define _POSIX_BARRIERS_MINE
#endif
/* State for one compression/decompression job: buffer pointers, block
   geometry, codec selection and the objects used to synchronize the worker
   threads. */
struct blosc_context {
  int32_t compress; /* 1 if we are doing compression 0 if decompress */
  const uint8_t* src; /* Input buffer of the job */
  uint8_t* dest; /* The current pos in the destination buffer */
  uint8_t* header_flags; /* Flags for header */
  int compversion; /* Compressor version byte, only used during decompression */
  int32_t sourcesize; /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
  int32_t nblocks; /* Number of total blocks in buffer */
  int32_t leftover; /* Extra bytes at end of buffer */
  int32_t blocksize; /* Length of the block in bytes */
  int32_t typesize; /* Type size */
  int32_t num_output_bytes; /* Counter for the number of output bytes */
  int32_t destsize; /* Maximum size for destination buffer */
  uint8_t* bstarts; /* Start of the buffer past header info */
  int32_t compcode; /* Compressor code to use */
  int clevel; /* Compression level (1-9) */
  /* Threading */
  int32_t numthreads; /* do_job() takes the serial path when this is 1 */
  int32_t threads_started; /* presumably the number of live workers -- TODO confirm */
  int32_t end_threads; /* presumably a shutdown flag for workers -- TODO confirm */
  pthread_t threads[BLOSC_MAX_THREADS];
  int32_t tids[BLOSC_MAX_THREADS];
  pthread_mutex_t count_mutex;
#ifdef _POSIX_BARRIERS_MINE
  /* Barriers waited on by WAIT_INIT / WAIT_FINISH */
  pthread_barrier_t barr_init;
  pthread_barrier_t barr_finish;
#else
  /* Counter, mutex and condition variable used by the fallback
     WAIT_INIT / WAIT_FINISH implementations */
  int32_t count_threads;
  pthread_mutex_t count_threads_mutex;
  pthread_cond_t count_threads_cv;
#endif
#if !defined(_WIN32)
  pthread_attr_t ct_attr; /* creation time attrs for threads */
#endif
  /* parallel_blosc() sets this to 1 before a run; a value <= 0 afterwards
     is returned to the caller as the error code */
  int32_t thread_giveup_code; /* error code when give up */
  /* parallel_blosc() resets this to -1 before a run */
  int32_t thread_nblock; /* block counter */
};
/* Per-thread state: a link back to the shared job context plus scratch
   buffers.  NOTE(review): the code that fills and uses these fields is not
   visible from this part of the file -- the field notes below are limited
   to what the declarations themselves say. */
struct thread_context {
  struct blosc_context* parent_context; /* Shared job context */
  int32_t tid; /* Thread identifier */
  uint8_t* tmp; /* Scratch buffers; presumably used like the tmp/tmp2 */
  uint8_t* tmp2; /* arguments of blosc_c()/blosc_d() -- TODO confirm */
  uint8_t* tmp3;
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
};
/* Global context for non-contextual API */
static struct blosc_context* g_global_context;
/* Mutex for the global state; presumably serializes calls through the
   non-contextual API -- TODO confirm against its users */
static pthread_mutex_t global_comp_mutex;
static int32_t g_compressor = BLOSC_BLOSCLZ; /* the compressor to use by default */
static int32_t g_threads = 1; /* default: a single thread */
static int32_t g_force_blocksize = 0; /* 0 presumably means "choose automatically" -- TODO confirm */
static int32_t g_initlib = 0; /* presumably non-zero once the library is initialized -- TODO confirm */
/* Split policy consulted by split_block() */
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
/* Wrapped function to adjust the number of threads used by blosc */
int blosc_set_nthreads_(struct blosc_context*);
/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context* context);
/* Macros for synchronization */
/* Wait until all threads are initialized.
 *
 * RET_VAL     - value returned from the *enclosing function* when waiting on
 *               the barrier fails (barrier variant only).
 * CONTEXT_PTR - pointer expression to the struct blosc_context holding the
 *               synchronization objects.  It is expanded without surrounding
 *               parentheses, so pass a simple postfix expression.
 *
 * These macros expand to several statements (not wrapped in do/while), so
 * do not use them as the unbraced body of an if/else.
 */
#ifdef _POSIX_BARRIERS_MINE
/* Barrier variant: requires an `int rc` variable in the caller's scope. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    printf("Could not wait on barrier (init): %d\n", rc); \
    return((RET_VAL)); \
  }
#else
/* Fallback variant: each arrival increments count_threads and sleeps on the
   condition variable until count_threads has reached numthreads; the arrival
   that finds the counter already at numthreads wakes everybody up instead.
   RET_VAL is unused here.
   NOTE(review): pthread_cond_wait() is not re-checked in a loop, so a
   spurious wakeup would release a waiter early -- confirm this is
   acceptable. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
    CONTEXT_PTR->count_threads++; \
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
  } \
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif
/* Wait for all threads to finish.
 *
 * RET_VAL     - value returned from the *enclosing function* when waiting on
 *               the barrier fails (barrier variant only).
 * CONTEXT_PTR - pointer expression to the struct blosc_context holding the
 *               synchronization objects (expanded without parentheses).
 *
 * Expands to several statements (not wrapped in do/while), so do not use it
 * as the unbraced body of an if/else.
 */
#ifdef _POSIX_BARRIERS_MINE
/* Barrier variant: requires an `int rc` variable in the caller's scope. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    printf("Could not wait on barrier (finish)\n"); \
    return((RET_VAL)); \
  }
#else
/* Fallback variant: each arrival decrements count_threads and sleeps on the
   condition variable while the counter is still positive; the arrival that
   finds it at zero wakes everybody up instead.  RET_VAL is unused here.
   NOTE(review): pthread_cond_wait() is not re-checked in a loop, so a
   spurious wakeup would release a waiter early -- confirm this is
   acceptable. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
  if (CONTEXT_PTR->count_threads > 0) { \
    CONTEXT_PTR->count_threads--; \
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
  } \
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif
  169. /* A function for aligned malloc that is portable */
  170. static uint8_t *my_malloc(size_t size)
  171. {
  172. void *block = NULL;
  173. int res = 0;
  174. /* Do an alignment to 32 bytes because AVX2 is supported */
  175. #if defined(_WIN32)
  176. /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  177. block = (void *)_aligned_malloc(size, 32);
  178. #elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  179. /* Platform does have an implementation of posix_memalign */
  180. res = posix_memalign(&block, 32, size);
  181. #else
  182. block = malloc(size);
  183. #endif /* _WIN32 */
  184. if (block == NULL || res != 0) {
  185. printf("Error allocating memory!");
  186. return NULL;
  187. }
  188. return (uint8_t *)block;
  189. }
  190. /* Release memory booked by my_malloc */
  191. static void my_free(void *block)
  192. {
  193. #if defined(_WIN32)
  194. _aligned_free(block);
  195. #else
  196. free(block);
  197. #endif /* _WIN32 */
  198. }
  199. /* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
  200. static int32_t sw32_(const uint8_t *pa)
  201. {
  202. int32_t idest;
  203. uint8_t *dest = (uint8_t *)&idest;
  204. int i = 1; /* for big/little endian detection */
  205. char *p = (char *)&i;
  206. if (p[0] != 1) {
  207. /* big endian */
  208. dest[0] = pa[3];
  209. dest[1] = pa[2];
  210. dest[2] = pa[1];
  211. dest[3] = pa[0];
  212. }
  213. else {
  214. /* little endian */
  215. dest[0] = pa[0];
  216. dest[1] = pa[1];
  217. dest[2] = pa[2];
  218. dest[3] = pa[3];
  219. }
  220. return idest;
  221. }
  222. /* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
  223. static void _sw32(uint8_t* dest, int32_t a)
  224. {
  225. uint8_t *pa = (uint8_t *)&a;
  226. int i = 1; /* for big/little endian detection */
  227. char *p = (char *)&i;
  228. if (p[0] != 1) {
  229. /* big endian */
  230. dest[0] = pa[3];
  231. dest[1] = pa[2];
  232. dest[2] = pa[1];
  233. dest[3] = pa[0];
  234. }
  235. else {
  236. /* little endian */
  237. dest[0] = pa[0];
  238. dest[1] = pa[1];
  239. dest[2] = pa[2];
  240. dest[3] = pa[3];
  241. }
  242. }
  243. /*
  244. * Conversion routines between compressor and compression libraries
  245. */
  246. /* Return the library code associated with the compressor name */
  247. static int compname_to_clibcode(const char *compname)
  248. {
  249. if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
  250. return BLOSC_BLOSCLZ_LIB;
  251. if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
  252. return BLOSC_LZ4_LIB;
  253. if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
  254. return BLOSC_LZ4_LIB;
  255. if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
  256. return BLOSC_SNAPPY_LIB;
  257. if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
  258. return BLOSC_ZLIB_LIB;
  259. if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
  260. return BLOSC_ZSTD_LIB;
  261. return -1;
  262. }
  263. /* Return the library name associated with the compressor code */
  264. static const char *clibcode_to_clibname(int clibcode)
  265. {
  266. if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
  267. if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
  268. if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
  269. if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
  270. if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
  271. return NULL; /* should never happen */
  272. }
  273. /*
  274. * Conversion routines between compressor names and compressor codes
  275. */
  276. /* Get the compressor name associated with the compressor code */
  277. int blosc_compcode_to_compname(int compcode, const char **compname)
  278. {
  279. int code = -1; /* -1 means non-existent compressor code */
  280. const char *name = NULL;
  281. /* Map the compressor code */
  282. if (compcode == BLOSC_BLOSCLZ)
  283. name = BLOSC_BLOSCLZ_COMPNAME;
  284. else if (compcode == BLOSC_LZ4)
  285. name = BLOSC_LZ4_COMPNAME;
  286. else if (compcode == BLOSC_LZ4HC)
  287. name = BLOSC_LZ4HC_COMPNAME;
  288. else if (compcode == BLOSC_SNAPPY)
  289. name = BLOSC_SNAPPY_COMPNAME;
  290. else if (compcode == BLOSC_ZLIB)
  291. name = BLOSC_ZLIB_COMPNAME;
  292. else if (compcode == BLOSC_ZSTD)
  293. name = BLOSC_ZSTD_COMPNAME;
  294. *compname = name;
  295. /* Guess if there is support for this code */
  296. if (compcode == BLOSC_BLOSCLZ)
  297. code = BLOSC_BLOSCLZ;
  298. #if defined(HAVE_LZ4)
  299. else if (compcode == BLOSC_LZ4)
  300. code = BLOSC_LZ4;
  301. else if (compcode == BLOSC_LZ4HC)
  302. code = BLOSC_LZ4HC;
  303. #endif /* HAVE_LZ4 */
  304. #if defined(HAVE_SNAPPY)
  305. else if (compcode == BLOSC_SNAPPY)
  306. code = BLOSC_SNAPPY;
  307. #endif /* HAVE_SNAPPY */
  308. #if defined(HAVE_ZLIB)
  309. else if (compcode == BLOSC_ZLIB)
  310. code = BLOSC_ZLIB;
  311. #endif /* HAVE_ZLIB */
  312. #if defined(HAVE_ZSTD)
  313. else if (compcode == BLOSC_ZSTD)
  314. code = BLOSC_ZSTD;
  315. #endif /* HAVE_ZSTD */
  316. return code;
  317. }
  318. /* Get the compressor code for the compressor name. -1 if it is not available */
  319. int blosc_compname_to_compcode(const char *compname)
  320. {
  321. int code = -1; /* -1 means non-existent compressor code */
  322. if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
  323. code = BLOSC_BLOSCLZ;
  324. }
  325. #if defined(HAVE_LZ4)
  326. else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
  327. code = BLOSC_LZ4;
  328. }
  329. else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
  330. code = BLOSC_LZ4HC;
  331. }
  332. #endif /* HAVE_LZ4 */
  333. #if defined(HAVE_SNAPPY)
  334. else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
  335. code = BLOSC_SNAPPY;
  336. }
  337. #endif /* HAVE_SNAPPY */
  338. #if defined(HAVE_ZLIB)
  339. else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
  340. code = BLOSC_ZLIB;
  341. }
  342. #endif /* HAVE_ZLIB */
  343. #if defined(HAVE_ZSTD)
  344. else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
  345. code = BLOSC_ZSTD;
  346. }
  347. #endif /* HAVE_ZSTD */
  348. return code;
  349. }
  350. #if defined(HAVE_LZ4)
  351. static int lz4_wrap_compress(const char* input, size_t input_length,
  352. char* output, size_t maxout, int accel)
  353. {
  354. int cbytes;
  355. cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
  356. accel);
  357. return cbytes;
  358. }
  359. static int lz4hc_wrap_compress(const char* input, size_t input_length,
  360. char* output, size_t maxout, int clevel)
  361. {
  362. int cbytes;
  363. if (input_length > (size_t)(2<<30))
  364. return -1; /* input larger than 1 GB is not supported */
  365. /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
  366. * but levels larger than 9 does not buy much compression. */
  367. cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
  368. clevel);
  369. return cbytes;
  370. }
  371. static int lz4_wrap_decompress(const char* input, size_t compressed_length,
  372. char* output, size_t maxout)
  373. {
  374. size_t cbytes;
  375. cbytes = LZ4_decompress_fast(input, output, (int)maxout);
  376. if (cbytes != compressed_length) {
  377. return 0;
  378. }
  379. return (int)maxout;
  380. }
  381. #endif /* HAVE_LZ4 */
  382. #if defined(HAVE_SNAPPY)
  383. static int snappy_wrap_compress(const char* input, size_t input_length,
  384. char* output, size_t maxout)
  385. {
  386. snappy_status status;
  387. size_t cl = maxout;
  388. status = snappy_compress(input, input_length, output, &cl);
  389. if (status != SNAPPY_OK){
  390. return 0;
  391. }
  392. return (int)cl;
  393. }
  394. static int snappy_wrap_decompress(const char* input, size_t compressed_length,
  395. char* output, size_t maxout)
  396. {
  397. snappy_status status;
  398. size_t ul = maxout;
  399. status = snappy_uncompress(input, compressed_length, output, &ul);
  400. if (status != SNAPPY_OK){
  401. return 0;
  402. }
  403. return (int)ul;
  404. }
  405. #endif /* HAVE_SNAPPY */
  406. #if defined(HAVE_ZLIB)
  407. /* zlib is not very respectful with sharing name space with others.
  408. Fortunately, its names do not collide with those already in blosc. */
  409. static int zlib_wrap_compress(const char* input, size_t input_length,
  410. char* output, size_t maxout, int clevel)
  411. {
  412. int status;
  413. uLongf cl = maxout;
  414. status = compress2(
  415. (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
  416. if (status != Z_OK){
  417. return 0;
  418. }
  419. return (int)cl;
  420. }
  421. static int zlib_wrap_decompress(const char* input, size_t compressed_length,
  422. char* output, size_t maxout)
  423. {
  424. int status;
  425. uLongf ul = maxout;
  426. status = uncompress(
  427. (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
  428. if (status != Z_OK){
  429. return 0;
  430. }
  431. return (int)ul;
  432. }
  433. #endif /* HAVE_ZLIB */
  434. #if defined(HAVE_ZSTD)
  435. static int zstd_wrap_compress(const char* input, size_t input_length,
  436. char* output, size_t maxout, int clevel) {
  437. size_t code;
  438. clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
  439. /* Make the level 8 close enough to maxCLevel */
  440. if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
  441. code = ZSTD_compress(
  442. (void*)output, maxout, (void*)input, input_length, clevel);
  443. if (ZSTD_isError(code)) {
  444. return 0;
  445. }
  446. return (int)code;
  447. }
  448. static int zstd_wrap_decompress(const char* input, size_t compressed_length,
  449. char* output, size_t maxout) {
  450. size_t code;
  451. code = ZSTD_decompress(
  452. (void*)output, maxout, (void*)input, compressed_length);
  453. if (ZSTD_isError(code)) {
  454. fprintf(stderr, "error decompressing with Zstd: %s \n", ZSTD_getErrorName(code));
  455. return 0;
  456. }
  457. return (int)code;
  458. }
  459. #endif /* HAVE_ZSTD */
  460. /* Compute acceleration for blosclz */
  461. static int get_accel(const struct blosc_context* context) {
  462. int32_t clevel = context->clevel;
  463. if (context->compcode == BLOSC_LZ4) {
  464. /* This acceleration setting based on discussions held in:
  465. * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
  466. */
  467. return (10 - clevel);
  468. }
  469. return 1;
  470. }
  471. /* Shuffle & compress a single block */
  472. static int blosc_c(const struct blosc_context* context, int32_t blocksize,
  473. int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
  474. const uint8_t *src, uint8_t *dest, uint8_t *tmp,
  475. uint8_t *tmp2)
  476. {
  477. int8_t header_flags = *(context->header_flags);
  478. int dont_split = (header_flags & 0x10) >> 4;
  479. int32_t j, neblock, nsplits;
  480. int32_t cbytes; /* number of compressed bytes in split */
  481. int32_t ctbytes = 0; /* number of compressed bytes in block */
  482. int32_t maxout;
  483. int32_t typesize = context->typesize;
  484. const uint8_t *_tmp = src;
  485. const char *compname;
  486. int accel;
  487. int bscount;
  488. int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  489. int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
  490. (blocksize >= typesize));
  491. if (doshuffle) {
  492. /* Byte shuffling only makes sense if typesize > 1 */
  493. shuffle(typesize, blocksize, src, tmp);
  494. _tmp = tmp;
  495. }
  496. /* We don't allow more than 1 filter at the same time (yet) */
  497. else if (dobitshuffle) {
  498. bscount = bitshuffle(typesize, blocksize, src, tmp, tmp2);
  499. if (bscount < 0)
  500. return bscount;
  501. _tmp = tmp;
  502. }
  503. /* Calculate acceleration for different compressors */
  504. accel = get_accel(context);
  505. /* The number of splits for this block */
  506. if (!dont_split && !leftoverblock) {
  507. nsplits = typesize;
  508. }
  509. else {
  510. nsplits = 1;
  511. }
  512. neblock = blocksize / nsplits;
  513. for (j = 0; j < nsplits; j++) {
  514. dest += sizeof(int32_t);
  515. ntbytes += (int32_t)sizeof(int32_t);
  516. ctbytes += (int32_t)sizeof(int32_t);
  517. maxout = neblock;
  518. #if defined(HAVE_SNAPPY)
  519. if (context->compcode == BLOSC_SNAPPY) {
  520. /* TODO perhaps refactor this to keep the value stashed somewhere */
  521. maxout = snappy_max_compressed_length(neblock);
  522. }
  523. #endif /* HAVE_SNAPPY */
  524. if (ntbytes+maxout > maxbytes) {
  525. maxout = maxbytes - ntbytes; /* avoid buffer overrun */
  526. if (maxout <= 0) {
  527. return 0; /* non-compressible block */
  528. }
  529. }
  530. if (context->compcode == BLOSC_BLOSCLZ) {
  531. cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
  532. dest, maxout);
  533. }
  534. #if defined(HAVE_LZ4)
  535. else if (context->compcode == BLOSC_LZ4) {
  536. cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
  537. (char *)dest, (size_t)maxout, accel);
  538. }
  539. else if (context->compcode == BLOSC_LZ4HC) {
  540. cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
  541. (char *)dest, (size_t)maxout,
  542. context->clevel);
  543. }
  544. #endif /* HAVE_LZ4 */
  545. #if defined(HAVE_SNAPPY)
  546. else if (context->compcode == BLOSC_SNAPPY) {
  547. cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
  548. (char *)dest, (size_t)maxout);
  549. }
  550. #endif /* HAVE_SNAPPY */
  551. #if defined(HAVE_ZLIB)
  552. else if (context->compcode == BLOSC_ZLIB) {
  553. cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
  554. (char *)dest, (size_t)maxout,
  555. context->clevel);
  556. }
  557. #endif /* HAVE_ZLIB */
  558. #if defined(HAVE_ZSTD)
  559. else if (context->compcode == BLOSC_ZSTD) {
  560. cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
  561. (char*)dest, (size_t)maxout, context->clevel);
  562. }
  563. #endif /* HAVE_ZSTD */
  564. else {
  565. blosc_compcode_to_compname(context->compcode, &compname);
  566. fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
  567. fprintf(stderr, "compression support. Please use one having it.");
  568. return -5; /* signals no compression support */
  569. }
  570. if (cbytes > maxout) {
  571. /* Buffer overrun caused by compression (should never happen) */
  572. return -1;
  573. }
  574. else if (cbytes < 0) {
  575. /* cbytes should never be negative */
  576. return -2;
  577. }
  578. else if (cbytes == 0 || cbytes == neblock) {
  579. /* The compressor has been unable to compress data at all. */
  580. /* Before doing the copy, check that we are not running into a
  581. buffer overflow. */
  582. if ((ntbytes+neblock) > maxbytes) {
  583. return 0; /* Non-compressible data */
  584. }
  585. fastcopy(dest, _tmp + j * neblock, neblock);
  586. cbytes = neblock;
  587. }
  588. _sw32(dest - 4, cbytes);
  589. dest += cbytes;
  590. ntbytes += cbytes;
  591. ctbytes += cbytes;
  592. } /* Closes j < nsplits */
  593. return ctbytes;
  594. }
  595. /* Decompress & unshuffle a single block */
  596. static int blosc_d(struct blosc_context* context, int32_t blocksize,
  597. int32_t leftoverblock, const uint8_t *src, uint8_t *dest,
  598. uint8_t *tmp, uint8_t *tmp2)
  599. {
  600. int8_t header_flags = *(context->header_flags);
  601. int32_t compformat = (header_flags & 0xe0) >> 5;
  602. int dont_split = (header_flags & 0x10) >> 4;
  603. int32_t j, neblock, nsplits;
  604. int32_t nbytes; /* number of decompressed bytes in split */
  605. int32_t cbytes; /* number of compressed bytes in split */
  606. int32_t ctbytes = 0; /* number of compressed bytes in block */
  607. int32_t ntbytes = 0; /* number of uncompressed bytes in block */
  608. uint8_t *_tmp = dest;
  609. int32_t typesize = context->typesize;
  610. const char *compname;
  611. int bscount;
  612. int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  613. int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
  614. (blocksize >= typesize));
  615. int compversion = context->compversion;
  616. if (doshuffle || dobitshuffle) {
  617. _tmp = tmp;
  618. }
  619. /* The number of splits for this block */
  620. if (!dont_split &&
  621. /* For compatibility with before the introduction of the split flag */
  622. ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
  623. !leftoverblock) {
  624. nsplits = typesize;
  625. }
  626. else {
  627. nsplits = 1;
  628. }
  629. neblock = blocksize / nsplits;
  630. for (j = 0; j < nsplits; j++) {
  631. cbytes = sw32_(src); /* amount of compressed bytes */
  632. src += sizeof(int32_t);
  633. ctbytes += (int32_t)sizeof(int32_t);
  634. /* Uncompress */
  635. if (cbytes == neblock) {
  636. fastcopy(_tmp, src, neblock);
  637. nbytes = neblock;
  638. }
  639. else {
  640. if (compformat == BLOSC_BLOSCLZ_FORMAT) {
  641. if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
  642. fprintf(stderr, "Unrecognized BloscLZ version %d\n", compversion);
  643. return -9;
  644. }
  645. nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
  646. }
  647. #if defined(HAVE_LZ4)
  648. else if (compformat == BLOSC_LZ4_FORMAT) {
  649. if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
  650. fprintf(stderr, "Unrecognized LZ4 version %d\n", compversion);
  651. return -9;
  652. }
  653. nbytes = lz4_wrap_decompress((char *)src, (size_t)cbytes,
  654. (char*)_tmp, (size_t)neblock);
  655. }
  656. #endif /* HAVE_LZ4 */
  657. #if defined(HAVE_SNAPPY)
  658. else if (compformat == BLOSC_SNAPPY_FORMAT) {
  659. if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
  660. fprintf(stderr, "Unrecognized Snappy version %d\n", compversion);
  661. return -9;
  662. }
  663. nbytes = snappy_wrap_decompress((char *)src, (size_t)cbytes,
  664. (char*)_tmp, (size_t)neblock);
  665. }
  666. #endif /* HAVE_SNAPPY */
  667. #if defined(HAVE_ZLIB)
  668. else if (compformat == BLOSC_ZLIB_FORMAT) {
  669. if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
  670. fprintf(stderr, "Unrecognized Zlib version %d\n", compversion);
  671. return -9;
  672. }
  673. nbytes = zlib_wrap_decompress((char *)src, (size_t)cbytes,
  674. (char*)_tmp, (size_t)neblock);
  675. }
  676. #endif /* HAVE_ZLIB */
  677. #if defined(HAVE_ZSTD)
  678. else if (compformat == BLOSC_ZSTD_FORMAT) {
  679. if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
  680. fprintf(stderr, "Unrecognized Zstd version %d\n", compversion);
  681. return -9;
  682. }
  683. nbytes = zstd_wrap_decompress((char*)src, (size_t)cbytes,
  684. (char*)_tmp, (size_t)neblock);
  685. }
  686. #endif /* HAVE_ZSTD */
  687. else {
  688. compname = clibcode_to_clibname(compformat);
  689. fprintf(stderr,
  690. "Blosc has not been compiled with decompression "
  691. "support for '%s' format. ", compname);
  692. fprintf(stderr, "Please recompile for adding this support.\n");
  693. return -5; /* signals no decompression support */
  694. }
  695. /* Check that decompressed bytes number is correct */
  696. if (nbytes != neblock) {
  697. return -2;
  698. }
  699. }
  700. src += cbytes;
  701. ctbytes += cbytes;
  702. _tmp += nbytes;
  703. ntbytes += nbytes;
  704. } /* Closes j < nsplits */
  705. if (doshuffle) {
  706. unshuffle(typesize, blocksize, tmp, dest);
  707. }
  708. else if (dobitshuffle) {
  709. bscount = bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
  710. if (bscount < 0)
  711. return bscount;
  712. }
  713. /* Return the number of uncompressed bytes */
  714. return ntbytes;
  715. }
  716. /* Serial version for compression/decompression */
  717. static int serial_blosc(struct blosc_context* context)
  718. {
  719. int32_t j, bsize, leftoverblock;
  720. int32_t cbytes;
  721. int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
  722. int32_t ntbytes = context->num_output_bytes;
  723. uint8_t *tmp = my_malloc(context->blocksize + ebsize);
  724. uint8_t *tmp2 = tmp + context->blocksize;
  725. for (j = 0; j < context->nblocks; j++) {
  726. if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
  727. _sw32(context->bstarts + j * 4, ntbytes);
  728. }
  729. bsize = context->blocksize;
  730. leftoverblock = 0;
  731. if ((j == context->nblocks - 1) && (context->leftover > 0)) {
  732. bsize = context->leftover;
  733. leftoverblock = 1;
  734. }
  735. if (context->compress) {
  736. if (*(context->header_flags) & BLOSC_MEMCPYED) {
  737. /* We want to memcpy only */
  738. fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
  739. context->src + j * context->blocksize, bsize);
  740. cbytes = bsize;
  741. }
  742. else {
  743. /* Regular compression */
  744. cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
  745. context->destsize, context->src+j*context->blocksize,
  746. context->dest+ntbytes, tmp, tmp2);
  747. if (cbytes == 0) {
  748. ntbytes = 0; /* uncompressible data */
  749. break;
  750. }
  751. }
  752. }
  753. else {
  754. if (*(context->header_flags) & BLOSC_MEMCPYED) {
  755. /* We want to memcpy only */
  756. fastcopy(context->dest + j * context->blocksize,
  757. context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
  758. cbytes = bsize;
  759. }
  760. else {
  761. /* Regular decompression */
  762. cbytes = blosc_d(context, bsize, leftoverblock,
  763. context->src + sw32_(context->bstarts + j * 4),
  764. context->dest+j*context->blocksize, tmp, tmp2);
  765. }
  766. }
  767. if (cbytes < 0) {
  768. ntbytes = cbytes; /* error in blosc_c or blosc_d */
  769. break;
  770. }
  771. ntbytes += cbytes;
  772. }
  773. // Free temporaries
  774. my_free(tmp);
  775. return ntbytes;
  776. }
  777. /* Threaded version for compression/decompression */
  778. static int parallel_blosc(struct blosc_context* context)
  779. {
  780. int rc;
  781. /* Check whether we need to restart threads */
  782. blosc_set_nthreads_(context);
  783. /* Set sentinels */
  784. context->thread_giveup_code = 1;
  785. context->thread_nblock = -1;
  786. /* Synchronization point for all threads (wait for initialization) */
  787. WAIT_INIT(-1, context);
  788. /* Synchronization point for all threads (wait for finalization) */
  789. WAIT_FINISH(-1, context);
  790. if (context->thread_giveup_code > 0) {
  791. /* Return the total bytes (de-)compressed in threads */
  792. return context->num_output_bytes;
  793. }
  794. else {
  795. /* Compression/decompression gave up. Return error code. */
  796. return context->thread_giveup_code;
  797. }
  798. }
  799. /* Do the compression or decompression of the buffer depending on the
  800. global params. */
  801. static int do_job(struct blosc_context* context)
  802. {
  803. int32_t ntbytes;
  804. /* Run the serial version when nthreads is 1 or when the buffers are
  805. not much larger than blocksize */
  806. if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
  807. ntbytes = serial_blosc(context);
  808. }
  809. else {
  810. ntbytes = parallel_blosc(context);
  811. }
  812. return ntbytes;
  813. }
  814. /* Whether a codec is meant for High Compression Ratios */
  815. #define HCR(codec) ( \
  816. ((codec) == BLOSC_LZ4HC) || \
  817. ((codec) == BLOSC_ZLIB) || \
  818. ((codec) == BLOSC_ZSTD) ? 1 : 0 )
  819. /* Conditions for splitting a block before compressing with a codec. */
  820. static int split_block(int compcode, int typesize, int blocksize) {
  821. int splitblock = -1;
  822. switch (g_splitmode) {
  823. case BLOSC_ALWAYS_SPLIT:
  824. splitblock = 1;
  825. break;
  826. case BLOSC_NEVER_SPLIT:
  827. splitblock = 0;
  828. break;
  829. case BLOSC_AUTO_SPLIT:
  830. /* Normally all the compressors designed for speed benefit from a
  831. split. However, in conducted benchmarks LZ4 seems that it runs
  832. faster if we don't split, which is quite surprising. */
  833. splitblock= (((compcode == BLOSC_BLOSCLZ) ||
  834. (compcode == BLOSC_SNAPPY)) &&
  835. (typesize <= MAX_SPLITS) &&
  836. (blocksize / typesize) >= MIN_BUFFERSIZE);
  837. break;
  838. case BLOSC_FORWARD_COMPAT_SPLIT:
  839. /* The zstd support was introduced at the same time than the split flag, so
  840. * there should be not a problem with not splitting bloscks with it */
  841. splitblock = ((compcode != BLOSC_ZSTD) &&
  842. (typesize <= MAX_SPLITS) &&
  843. (blocksize / typesize) >= MIN_BUFFERSIZE);
  844. break;
  845. default:
  846. fprintf(stderr, "Split mode %d not supported", g_splitmode);
  847. }
  848. return splitblock;
  849. }
  850. static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
  851. int32_t typesize, int32_t nbytes,
  852. int32_t forced_blocksize)
  853. {
  854. int32_t blocksize;
  855. /* Protection against very small buffers */
  856. if (nbytes < (int32_t)typesize) {
  857. return 1;
  858. }
  859. blocksize = nbytes; /* Start by a whole buffer as blocksize */
  860. if (forced_blocksize) {
  861. blocksize = forced_blocksize;
  862. /* Check that forced blocksize is not too small */
  863. if (blocksize < MIN_BUFFERSIZE) {
  864. blocksize = MIN_BUFFERSIZE;
  865. }
  866. }
  867. else if (nbytes >= L1) {
  868. blocksize = L1;
  869. /* For HCR codecs, increase the block sizes by a factor of 2 because they
  870. are meant for compressing large blocks (i.e. they show a big overhead
  871. when compressing small ones). */
  872. if (HCR(context->compcode)) {
  873. blocksize *= 2;
  874. }
  875. switch (clevel) {
  876. case 0:
  877. /* Case of plain copy */
  878. blocksize /= 4;
  879. break;
  880. case 1:
  881. blocksize /= 2;
  882. break;
  883. case 2:
  884. blocksize *= 1;
  885. break;
  886. case 3:
  887. blocksize *= 2;
  888. break;
  889. case 4:
  890. case 5:
  891. blocksize *= 4;
  892. break;
  893. case 6:
  894. case 7:
  895. case 8:
  896. blocksize *= 8;
  897. break;
  898. case 9:
  899. blocksize *= 8;
  900. if (HCR(context->compcode)) {
  901. blocksize *= 2;
  902. }
  903. break;
  904. default:
  905. assert(0);
  906. break;
  907. }
  908. }
  909. /* Enlarge the blocksize for splittable codecs */
  910. if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
  911. if (blocksize > (1 << 16)) {
  912. /* Do not use a too large split buffer (> 64 KB) for splitting codecs */
  913. blocksize = (1 << 16);
  914. }
  915. blocksize *= typesize;
  916. if (blocksize < (1 << 16)) {
  917. /* Do not use a too small blocksize (< 64 KB) when typesize is small */
  918. blocksize = (1 << 16);
  919. }
  920. }
  921. /* Check that blocksize is not too large */
  922. if (blocksize > (int32_t)nbytes) {
  923. blocksize = nbytes;
  924. }
  925. /* blocksize *must absolutely* be a multiple of the typesize */
  926. if (blocksize > typesize) {
  927. blocksize = blocksize / typesize * typesize;
  928. }
  929. return blocksize;
  930. }
  931. static int initialize_context_compression(struct blosc_context* context,
  932. int clevel,
  933. int doshuffle,
  934. size_t typesize,
  935. size_t sourcesize,
  936. const void* src,
  937. void* dest,
  938. size_t destsize,
  939. int32_t compressor,
  940. int32_t blocksize,
  941. int32_t numthreads)
  942. {
  943. /* Set parameters */
  944. context->compress = 1;
  945. context->src = (const uint8_t*)src;
  946. context->dest = (uint8_t *)(dest);
  947. context->num_output_bytes = 0;
  948. context->destsize = (int32_t)destsize;
  949. context->sourcesize = sourcesize;
  950. context->typesize = typesize;
  951. context->compcode = compressor;
  952. context->numthreads = numthreads;
  953. context->end_threads = 0;
  954. context->clevel = clevel;
  955. /* Check buffer size limits */
  956. if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
  957. /* If buffer is too large, give up. */
  958. fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
  959. BLOSC_MAX_BUFFERSIZE);
  960. return -1;
  961. }
  962. /* Compression level */
  963. if (clevel < 0 || clevel > 9) {
  964. /* If clevel not in 0..9, print an error */
  965. fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
  966. return -10;
  967. }
  968. /* Shuffle */
  969. if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
  970. fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
  971. return -10;
  972. }
  973. /* Check typesize limits */
  974. if (context->typesize > BLOSC_MAX_TYPESIZE) {
  975. /* If typesize is too large, treat buffer as an 1-byte stream. */
  976. context->typesize = 1;
  977. }
  978. /* Get the blocksize */
  979. context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
  980. /* Compute number of blocks in buffer */
  981. context->nblocks = context->sourcesize / context->blocksize;
  982. context->leftover = context->sourcesize % context->blocksize;
  983. context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
  984. return 1;
  985. }
  986. static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
  987. {
  988. int32_t compformat;
  989. int dont_split;
  990. /* Write version header for this block */
  991. context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */
  992. /* Write compressor format */
  993. compformat = -1;
  994. switch (context->compcode)
  995. {
  996. case BLOSC_BLOSCLZ:
  997. compformat = BLOSC_BLOSCLZ_FORMAT;
  998. context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
  999. break;
  1000. #if defined(HAVE_LZ4)
  1001. case BLOSC_LZ4:
  1002. compformat = BLOSC_LZ4_FORMAT;
  1003. context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */
  1004. break;
  1005. case BLOSC_LZ4HC:
  1006. compformat = BLOSC_LZ4HC_FORMAT;
  1007. context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
  1008. break;
  1009. #endif /* HAVE_LZ4 */
  1010. #if defined(HAVE_SNAPPY)
  1011. case BLOSC_SNAPPY:
  1012. compformat = BLOSC_SNAPPY_FORMAT;
  1013. context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */
  1014. break;
  1015. #endif /* HAVE_SNAPPY */
  1016. #if defined(HAVE_ZLIB)
  1017. case BLOSC_ZLIB:
  1018. compformat = BLOSC_ZLIB_FORMAT;
  1019. context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */
  1020. break;
  1021. #endif /* HAVE_ZLIB */
  1022. #if defined(HAVE_ZSTD)
  1023. case BLOSC_ZSTD:
  1024. compformat = BLOSC_ZSTD_FORMAT;
  1025. context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT; /* zstd format version */
  1026. break;
  1027. #endif /* HAVE_ZSTD */
  1028. default:
  1029. {
  1030. const char *compname;
  1031. compname = clibcode_to_clibname(compformat);
  1032. fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
  1033. fprintf(stderr, "compression support. Please use one having it.");
  1034. return -5; /* signals no compression support */
  1035. break;
  1036. }
  1037. }
  1038. context->header_flags = context->dest+2; /* flags */
  1039. context->dest[2] = 0; /* zeroes flags */
  1040. context->dest[3] = (uint8_t)context->typesize; /* type size */
  1041. _sw32(context->dest + 4, context->sourcesize); /* size of the buffer */
  1042. _sw32(context->dest + 8, context->blocksize); /* block size */
  1043. context->bstarts = context->dest + 16; /* starts for every block */
  1044. context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */
  1045. if (context->clevel == 0) {
  1046. /* Compression level 0 means buffer to be memcpy'ed */
  1047. *(context->header_flags) |= BLOSC_MEMCPYED;
  1048. context->num_output_bytes = 16; /* space just for header */
  1049. }
  1050. if (context->sourcesize < MIN_BUFFERSIZE) {
  1051. /* Buffer is too small. Try memcpy'ing. */
  1052. *(context->header_flags) |= BLOSC_MEMCPYED;
  1053. context->num_output_bytes = 16; /* space just for header */
  1054. }
  1055. if (doshuffle == BLOSC_SHUFFLE) {
  1056. /* Byte-shuffle is active */
  1057. *(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */
  1058. }
  1059. if (doshuffle == BLOSC_BITSHUFFLE) {
  1060. /* Bit-shuffle is active */
  1061. *(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */
  1062. }
  1063. dont_split = !split_block(context->compcode, context->typesize,
  1064. context->blocksize);
  1065. *(context->header_flags) |= dont_split << 4; /* dont_split is in bit 4 */
  1066. *(context->header_flags) |= compformat << 5; /* compressor format starts at bit 5 */
  1067. return 1;
  1068. }
  1069. int blosc_compress_context(struct blosc_context* context)
  1070. {
  1071. int32_t ntbytes = 0;
  1072. if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
  1073. (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
  1074. return 0; /* data cannot be copied without overrun destination */
  1075. }
  1076. /* Do the actual compression */
  1077. ntbytes = do_job(context);
  1078. if (ntbytes < 0) {
  1079. return -1;
  1080. }
  1081. if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
  1082. /* Last chance for fitting `src` buffer in `dest`. Update flags and force a copy. */
  1083. *(context->header_flags) |= BLOSC_MEMCPYED;
  1084. context->num_output_bytes = BLOSC_MAX_OVERHEAD; /* reset the output bytes in previous step */
  1085. ntbytes = do_job(context);
  1086. if (ntbytes < 0) {
  1087. return -1;
  1088. }
  1089. }
  1090. /* Set the number of compressed bytes in header */
  1091. _sw32(context->dest + 12, ntbytes);
  1092. assert(ntbytes <= context->destsize);
  1093. return ntbytes;
  1094. }
  1095. /* The public routine for compression with context. */
  1096. int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
  1097. size_t nbytes, const void* src, void* dest,
  1098. size_t destsize, const char* compressor,
  1099. size_t blocksize, int numinternalthreads)
  1100. {
  1101. int error, result;
  1102. struct blosc_context context;
  1103. context.threads_started = 0;
  1104. error = initialize_context_compression(&context, clevel, doshuffle, typesize,
  1105. nbytes, src, dest, destsize,
  1106. blosc_compname_to_compcode(compressor),
  1107. blocksize, numinternalthreads);
  1108. if (error < 0) { return error; }
  1109. error = write_compression_header(&context, clevel, doshuffle);
  1110. if (error < 0) { return error; }
  1111. result = blosc_compress_context(&context);
  1112. if (numinternalthreads > 1)
  1113. {
  1114. blosc_release_threadpool(&context);
  1115. }
  1116. return result;
  1117. }
  1118. /* The public routine for compression. See blosc.h for docstrings. */
  1119. int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
  1120. const void *src, void *dest, size_t destsize)
  1121. {
  1122. int error;
  1123. int result;
  1124. char* envvar;
  1125. /* Check if should initialize */
  1126. if (!g_initlib) blosc_init();
  1127. /* Check for a BLOSC_CLEVEL environment variable */
  1128. envvar = getenv("BLOSC_CLEVEL");
  1129. if (envvar != NULL) {
  1130. long value;
  1131. value = strtol(envvar, NULL, 10);
  1132. if ((value != EINVAL) && (value >= 0)) {
  1133. clevel = (int)value;
  1134. }
  1135. }
  1136. /* Check for a BLOSC_SHUFFLE environment variable */
  1137. envvar = getenv("BLOSC_SHUFFLE");
  1138. if (envvar != NULL) {
  1139. if (strcmp(envvar, "NOSHUFFLE") == 0) {
  1140. doshuffle = BLOSC_NOSHUFFLE;
  1141. }
  1142. if (strcmp(envvar, "SHUFFLE") == 0) {
  1143. doshuffle = BLOSC_SHUFFLE;
  1144. }
  1145. if (strcmp(envvar, "BITSHUFFLE") == 0) {
  1146. doshuffle = BLOSC_BITSHUFFLE;
  1147. }
  1148. }
  1149. /* Check for a BLOSC_TYPESIZE environment variable */
  1150. envvar = getenv("BLOSC_TYPESIZE");
  1151. if (envvar != NULL) {
  1152. long value;
  1153. value = strtol(envvar, NULL, 10);
  1154. if ((value != EINVAL) && (value > 0)) {
  1155. typesize = (int)value;
  1156. }
  1157. }
  1158. /* Check for a BLOSC_COMPRESSOR environment variable */
  1159. envvar = getenv("BLOSC_COMPRESSOR");
  1160. if (envvar != NULL) {
  1161. result = blosc_set_compressor(envvar);
  1162. if (result < 0) { return result; }
  1163. }
  1164. /* Check for a BLOSC_COMPRESSOR environment variable */
  1165. envvar = getenv("BLOSC_BLOCKSIZE");
  1166. if (envvar != NULL) {
  1167. long blocksize;
  1168. blocksize = strtol(envvar, NULL, 10);
  1169. if ((blocksize != EINVAL) && (blocksize > 0)) {
  1170. blosc_set_blocksize((size_t)blocksize);
  1171. }
  1172. }
  1173. /* Check for a BLOSC_NTHREADS environment variable */
  1174. envvar = getenv("BLOSC_NTHREADS");
  1175. if (envvar != NULL) {
  1176. long nthreads;
  1177. nthreads = strtol(envvar, NULL, 10);
  1178. if ((nthreads != EINVAL) && (nthreads > 0)) {
  1179. result = blosc_set_nthreads((int)nthreads);
  1180. if (result < 0) { return result; }
  1181. }
  1182. }
  1183. /* Check for a BLOSC_SPLITMODE environment variable */
  1184. envvar = getenv("BLOSC_SPLITMODE");
  1185. if (envvar != NULL) {
  1186. if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
  1187. blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
  1188. }
  1189. else if (strcmp(envvar, "AUTO") == 0) {
  1190. blosc_set_splitmode(BLOSC_AUTO_SPLIT);
  1191. }
  1192. else if (strcmp(envvar, "ALWAYS") == 0) {
  1193. blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
  1194. }
  1195. else if (strcmp(envvar, "NEVER") == 0) {
  1196. blosc_set_splitmode(BLOSC_NEVER_SPLIT);
  1197. }
  1198. else {
  1199. fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
  1200. return -1;
  1201. }
  1202. }
  1203. /* Check for a BLOSC_NOLOCK environment variable. It is important
  1204. that this should be the last env var so that it can take the
  1205. previous ones into account */
  1206. envvar = getenv("BLOSC_NOLOCK");
  1207. if (envvar != NULL) {
  1208. const char *compname;
  1209. blosc_compcode_to_compname(g_compressor, &compname);
  1210. result = blosc_compress_ctx(clevel, doshuffle, typesize,
  1211. nbytes, src, dest, destsize,
  1212. compname, g_force_blocksize, g_threads);
  1213. return result;
  1214. }
  1215. pthread_mutex_lock(&global_comp_mutex);
  1216. error = initialize_context_compression(g_global_context, clevel, doshuffle,
  1217. typesize, nbytes, src, dest, destsize,
  1218. g_compressor, g_force_blocksize,
  1219. g_threads);
  1220. if (error < 0) { return error; }
  1221. error = write_compression_header(g_global_context, clevel, doshuffle);
  1222. if (error < 0) { return error; }
  1223. result = blosc_compress_context(g_global_context);
  1224. pthread_mutex_unlock(&global_comp_mutex);
  1225. return result;
  1226. }
  1227. int blosc_run_decompression_with_context(struct blosc_context* context,
  1228. const void* src,
  1229. void* dest,
  1230. size_t destsize,
  1231. int numinternalthreads)
  1232. {
  1233. uint8_t version;
  1234. int32_t ntbytes;
  1235. context->compress = 0;
  1236. context->src = (const uint8_t*)src;
  1237. context->dest = (uint8_t*)dest;
  1238. context->destsize = destsize;
  1239. context->num_output_bytes = 0;
  1240. context->numthreads = numinternalthreads;
  1241. context->end_threads = 0;
  1242. /* Read the header block */
  1243. version = context->src[0]; /* blosc format version */
  1244. context->compversion = context->src[1];
  1245. context->header_flags = (uint8_t*)(context->src + 2); /* flags */
  1246. context->typesize = (int32_t)context->src[3]; /* typesize */
  1247. context->sourcesize = sw32_(context->src + 4); /* buffer size */
  1248. context->blocksize = sw32_(context->src + 8); /* block size */
  1249. if (version != BLOSC_VERSION_FORMAT) {
  1250. /* Version from future */
  1251. return -1;
  1252. }
  1253. if (*context->header_flags & 0x08) {
  1254. /* compressor flags from the future */
  1255. return -1;
  1256. }
  1257. context->bstarts = (uint8_t*)(context->src + 16);
  1258. /* Compute some params */
  1259. /* Total blocks */
  1260. context->nblocks = context->sourcesize / context->blocksize;
  1261. context->leftover = context->sourcesize % context->blocksize;
  1262. context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
  1263. /* Check that we have enough space to decompress */
  1264. if (context->sourcesize > (int32_t)destsize) {
  1265. return -1;
  1266. }
  1267. /* Do the actual decompression */
  1268. ntbytes = do_job(context);
  1269. if (ntbytes < 0) {
  1270. return -1;
  1271. }
  1272. assert(ntbytes <= (int32_t)destsize);
  1273. return ntbytes;
  1274. }
  1275. /* The public routine for decompression with context. */
  1276. int blosc_decompress_ctx(const void *src, void *dest, size_t destsize,
  1277. int numinternalthreads)
  1278. {
  1279. int result;
  1280. struct blosc_context context;
  1281. context.threads_started = 0;
  1282. result = blosc_run_decompression_with_context(&context, src, dest, destsize, numinternalthreads);
  1283. if (numinternalthreads > 1)
  1284. {
  1285. blosc_release_threadpool(&context);
  1286. }
  1287. return result;
  1288. }
  1289. /* The public routine for decompression. See blosc.h for docstrings. */
  1290. int blosc_decompress(const void *src, void *dest, size_t destsize)
  1291. {
  1292. int result;
  1293. char* envvar;
  1294. long nthreads;
  1295. /* Check if should initialize */
  1296. if (!g_initlib) blosc_init();
  1297. /* Check for a BLOSC_NTHREADS environment variable */
  1298. envvar = getenv("BLOSC_NTHREADS");
  1299. if (envvar != NULL) {
  1300. nthreads = strtol(envvar, NULL, 10);
  1301. if ((nthreads != EINVAL) && (nthreads > 0)) {
  1302. result = blosc_set_nthreads((int)nthreads);
  1303. if (result < 0) { return result; }
  1304. }
  1305. }
  1306. /* Check for a BLOSC_NOLOCK environment variable. It is important
  1307. that this should be the last env var so that it can take the
  1308. previous ones into account */
  1309. envvar = getenv("BLOSC_NOLOCK");
  1310. if (envvar != NULL) {
  1311. result = blosc_decompress_ctx(src, dest, destsize, g_threads);
  1312. return result;
  1313. }
  1314. pthread_mutex_lock(&global_comp_mutex);
  1315. result = blosc_run_decompression_with_context(g_global_context, src, dest,
  1316. destsize, g_threads);
  1317. pthread_mutex_unlock(&global_comp_mutex);
  1318. return result;
  1319. }
  1320. /* Specific routine optimized for decompression a small number of
  1321. items out of a compressed chunk. This does not use threads because
  1322. it would affect negatively to performance. */
  1323. int blosc_getitem(const void *src, int start, int nitems, void *dest)
  1324. {
  1325. uint8_t *_src=NULL; /* current pos for source buffer */
  1326. uint8_t version, compversion; /* versions for compressed header */
  1327. uint8_t flags; /* flags for header */
  1328. int32_t ntbytes = 0; /* the number of uncompressed bytes */
  1329. int32_t nblocks; /* number of total blocks in buffer */
  1330. int32_t leftover; /* extra bytes at end of buffer */
  1331. uint8_t *bstarts; /* start pointers for each block */
  1332. int32_t typesize, blocksize, nbytes;
  1333. int32_t j, bsize, bsize2, leftoverblock;
  1334. int32_t cbytes, startb, stopb;
  1335. int stop = start + nitems;
  1336. uint8_t *tmp;
  1337. uint8_t *tmp2;
  1338. uint8_t *tmp3;
  1339. int32_t ebsize;
  1340. _src = (uint8_t *)(src);
  1341. /* Read the header block */
  1342. version = _src[0]; /* blosc format version */
  1343. compversion = _src[1];
  1344. flags = _src[2]; /* flags */
  1345. typesize = (int32_t)_src[3]; /* typesize */
  1346. nbytes = sw32_(_src + 4); /* buffer size */
  1347. blocksize = sw32_(_src + 8); /* block size */
  1348. if (version != BLOSC_VERSION_FORMAT)
  1349. return -9;
  1350. ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
  1351. tmp = my_malloc(blocksize + ebsize + blocksize);
  1352. tmp2 = tmp + blocksize;
  1353. tmp3 = tmp + blocksize + ebsize;
  1354. _src += 16;
  1355. bstarts = _src;
  1356. /* Compute some params */
  1357. /* Total blocks */
  1358. nblocks = nbytes / blocksize;
  1359. leftover = nbytes % blocksize;
  1360. nblocks = (leftover>0)? nblocks+1: nblocks;
  1361. _src += sizeof(int32_t)*nblocks;
  1362. /* Check region boundaries */
  1363. if ((start < 0) || (start*typesize > nbytes)) {
  1364. fprintf(stderr, "`start` out of bounds");
  1365. return -1;
  1366. }
  1367. if ((stop < 0) || (stop*typesize > nbytes)) {
  1368. fprintf(stderr, "`start`+`nitems` out of bounds");
  1369. return -1;
  1370. }
  1371. for (j = 0; j < nblocks; j++) {
  1372. bsize = blocksize;
  1373. leftoverblock = 0;
  1374. if ((j == nblocks - 1) && (leftover > 0)) {
  1375. bsize = leftover;
  1376. leftoverblock = 1;
  1377. }
  1378. /* Compute start & stop for each block */
  1379. startb = start * typesize - j * blocksize;
  1380. stopb = stop * typesize - j * blocksize;
  1381. if ((startb >= (int)blocksize) || (stopb <= 0)) {
  1382. continue;
  1383. }
  1384. if (startb < 0) {
  1385. startb = 0;
  1386. }
  1387. if (stopb > (int)blocksize) {
  1388. stopb = blocksize;
  1389. }
  1390. bsize2 = stopb - startb;
  1391. /* Do the actual data copy */
  1392. if (flags & BLOSC_MEMCPYED) {
  1393. /* We want to memcpy only */
  1394. fastcopy((uint8_t *) dest + ntbytes,
  1395. (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
  1396. cbytes = bsize2;
  1397. }
  1398. else {
  1399. struct blosc_context context = {0};
  1400. /* Only initialize the fields blosc_d uses */
  1401. context.typesize = typesize;
  1402. context.header_flags = &flags;
  1403. context.compversion = compversion;
  1404. /* Regular decompression. Put results in tmp2. */
  1405. cbytes = blosc_d(&context, bsize, leftoverblock,
  1406. (uint8_t *)src + sw32_(bstarts + j * 4),
  1407. tmp2, tmp, tmp3);
  1408. if (cbytes < 0) {
  1409. ntbytes = cbytes;
  1410. break;
  1411. }
  1412. /* Copy to destination */
  1413. fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
  1414. cbytes = bsize2;
  1415. }
  1416. ntbytes += cbytes;
  1417. }
  1418. my_free(tmp);
  1419. return ntbytes;
  1420. }
  1421. /* Worker-thread entry point: compress or decompress several blocks in a
     single thread.

     `ctxt` is the `struct thread_context*` handed over at thread creation.
     The thread loops over jobs: it waits at WAIT_INIT, leaves the loop when
     the parent context has `end_threads` set, otherwise processes blocks of
     the current job and finally waits at WAIT_FINISH.

     - Regular compression: blocks are claimed one at a time through the
       parent's `thread_nblock` counter (under `count_mutex`), each block is
       compressed into the private `tmp2` buffer and then copied to its slot
       in `dest`.
     - Decompression and memcpy'ed buffers: every thread handles a contiguous
       range of blocks derived from its `tid`.

     A failing block stores its negative code in the parent's
     `thread_giveup_code`; a buffer that does not fit in the destination sets
     it to 0.  Either makes the block loops stop.

     Before returning NULL the thread frees its scratch buffer and its own
     thread context.
     NOTE(review): WAIT_INIT / WAIT_FINISH are macros defined outside this
     function; their first argument (NULL) is presumably the value returned
     if waiting fails -- confirm against the macro definitions. */
  1422. static void *t_blosc(void *ctxt)
  1423. {
  1424. struct thread_context* context = (struct thread_context*)ctxt;
  1425. int32_t cbytes, ntdest;
  1426. int32_t tblocks; /* number of blocks per thread */
  1427. int32_t leftover2;
  1428. int32_t tblock; /* limit block on a thread */
  1429. int32_t nblock_; /* private copy of nblock */
  1430. int32_t bsize, leftoverblock;
  1431. /* Parameters for threads */
  1432. int32_t blocksize;
  1433. int32_t ebsize;
  1434. int32_t compress;
  1435. int32_t maxbytes;
  1436. int32_t ntbytes;
  1437. int32_t flags;
  1438. int32_t nblocks;
  1439. int32_t leftover;
  1440. uint8_t *bstarts;
  1441. const uint8_t *src;
  1442. uint8_t *dest;
  1443. uint8_t *tmp;
  1444. uint8_t *tmp2;
  1445. uint8_t *tmp3;
  /* NOTE(review): `rc` is not referenced directly in this function; it is
     presumably used by the WAIT_INIT / WAIT_FINISH macros -- confirm before
     removing it. */
  1446. int rc;
  /* One iteration per job handed to the pool */
  1447. while(1)
  1448. {
  1449. /* Synchronization point for all threads (wait for initialization) */
  1450. WAIT_INIT(NULL, context->parent_context);
  /* The parent asks the threads to terminate through this flag */
  1451. if(context->parent_context->end_threads)
  1452. {
  1453. break;
  1454. }
  1455. /* Get parameters for this thread before entering the main loop */
  1456. blocksize = context->parent_context->blocksize;
  1457. ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
  1458. compress = context->parent_context->compress;
  1459. flags = *(context->parent_context->header_flags);
  1460. maxbytes = context->parent_context->destsize;
  1461. nblocks = context->parent_context->nblocks;
  1462. leftover = context->parent_context->leftover;
  1463. bstarts = context->parent_context->bstarts;
  1464. src = context->parent_context->src;
  1465. dest = context->parent_context->dest;
  /* Grow the scratch space when this job uses a larger block than the one
     the buffer was created for.  Note that `tmpblocksize` itself is not
     updated here. */
  1466. if (blocksize > context->tmpblocksize)
  1467. {
  1468. my_free(context->tmp);
  1469. context->tmp = my_malloc(blocksize + ebsize + blocksize);
  1470. context->tmp2 = context->tmp + blocksize;
  1471. context->tmp3 = context->tmp + blocksize + ebsize;
  1472. }
  1473. tmp = context->tmp;
  1474. tmp2 = context->tmp2;
  1475. tmp3 = context->tmp3;
  1476. ntbytes = 0; /* per-thread byte count; used for decompression and for memcpy'ed buffers */
  1477. if (compress && !(flags & BLOSC_MEMCPYED)) {
  1478. /* Compression always has to follow the block order */
  /* Claim the next block number from the shared counter */
  1479. pthread_mutex_lock(&context->parent_context->count_mutex);
  1480. context->parent_context->thread_nblock++;
  1481. nblock_ = context->parent_context->thread_nblock;
  1482. pthread_mutex_unlock(&context->parent_context->count_mutex);
  1483. tblock = nblocks;
  1484. }
  1485. else {
  1486. /* Decompression can happen using any order. We choose
  1487. sequential block order on each thread */
  1488. /* Blocks per thread */
  /* tblocks is nblocks / numthreads rounded up; this thread handles the
     blocks in [tid * tblocks, tblock), clipped to nblocks */
  1489. tblocks = nblocks / context->parent_context->numthreads;
  1490. leftover2 = nblocks % context->parent_context->numthreads;
  1491. tblocks = (leftover2>0)? tblocks+1: tblocks;
  1492. nblock_ = context->tid*tblocks;
  1493. tblock = nblock_ + tblocks;
  1494. if (tblock > nblocks) {
  1495. tblock = nblocks;
  1496. }
  1497. }
  1498. /* Loop over blocks */
  1499. leftoverblock = 0;
  1500. while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
  1501. bsize = blocksize;
  /* The last block may be a partial one */
  1502. if (nblock_ == (nblocks - 1) && (leftover > 0)) {
  1503. bsize = leftover;
  1504. leftoverblock = 1;
  1505. }
  1506. if (compress) {
  1507. if (flags & BLOSC_MEMCPYED) {
  1508. /* We want to memcpy only */
  1509. fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, src + nblock_ * blocksize,
  1510. bsize);
  1511. cbytes = bsize;
  1512. }
  1513. else {
  1514. /* Regular compression */
  /* The compressed block goes to the private tmp2 buffer first; it is
     copied to `dest` further below, once its offset is known */
  1515. cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
  1516. src+nblock_*blocksize, tmp2, tmp, tmp3);
  1517. }
  1518. }
  1519. else {
  1520. if (flags & BLOSC_MEMCPYED) {
  1521. /* We want to memcpy only */
  1522. fastcopy(dest + nblock_ * blocksize, src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
  1523. bsize);
  1524. cbytes = bsize;
  1525. }
  1526. else {
  /* Regular decompression, straight into this block's slot in `dest`;
     the block's offset inside `src` is read from the bstarts table */
  1527. cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
  1528. src + sw32_(bstarts + nblock_ * 4),
  1529. dest+nblock_*blocksize,
  1530. tmp, tmp2);
  1531. }
  1532. }
  1533. /* Check whether current thread has to giveup */
  1534. if (context->parent_context->thread_giveup_code <= 0) {
  1535. break;
  1536. }
  1537. /* Check results for the compressed/decompressed block */
  1538. if (cbytes < 0) { /* compr/decompr failure */
  1539. /* Set giveup_code error */
  1540. pthread_mutex_lock(&context->parent_context->count_mutex);
  1541. context->parent_context->thread_giveup_code = cbytes;
  1542. pthread_mutex_unlock(&context->parent_context->count_mutex);
  1543. break;
  1544. }
  1545. if (compress && !(flags & BLOSC_MEMCPYED)) {
  1546. /* Start critical section */
  /* Under the mutex: record where this block starts in the output, check
     that it fits, claim the next block number and reserve the output bytes */
  1547. pthread_mutex_lock(&context->parent_context->count_mutex);
  1548. ntdest = context->parent_context->num_output_bytes;
  1549. _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
  1550. if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
  1551. context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */
  1552. pthread_mutex_unlock(&context->parent_context->count_mutex);
  1553. break;
  1554. }
  1555. context->parent_context->thread_nblock++;
  1556. nblock_ = context->parent_context->thread_nblock;
  1557. context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */
  1558. pthread_mutex_unlock(&context->parent_context->count_mutex);
  1559. /* End of critical section */
  1560. /* Copy the compressed buffer to destination */
  1561. fastcopy(dest + ntdest, tmp2, cbytes);
  1562. }
  1563. else {
  1564. nblock_++;
  1565. /* Update counter for this thread */
  1566. ntbytes += cbytes;
  1567. }
  1568. } /* closes while (nblock_) */
  1569. /* Sum up all the bytes processed by this thread (decompression or memcpy'ed buffers) */
  1570. if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
  1571. /* Update global counter for all threads (not done for regular compression, which updates it per block above) */
  1572. pthread_mutex_lock(&context->parent_context->count_mutex);
  1573. context->parent_context->num_output_bytes += ntbytes;
  1574. pthread_mutex_unlock(&context->parent_context->count_mutex);
  1575. }
  1576. /* Meeting point for all threads (wait for finalization) */
  1577. WAIT_FINISH(NULL, context->parent_context);
  1578. }
  1579. /* Cleanup our working space and context */
  1580. my_free(context->tmp);
  1581. my_free(context);
  1582. return(NULL);
  1583. }
  1584. static int init_threads(struct blosc_context* context)
  1585. {
  1586. int32_t tid;
  1587. int rc2;
  1588. int32_t ebsize;
  1589. struct thread_context* thread_context;
  1590. /* Initialize mutex and condition variable objects */
  1591. pthread_mutex_init(&context->count_mutex, NULL);
  1592. /* Set context thread sentinels */
  1593. context->thread_giveup_code = 1;
  1594. context->thread_nblock = -1;
  1595. /* Barrier initialization */
  1596. #ifdef _POSIX_BARRIERS_MINE
  1597. pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
  1598. pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
  1599. #else
  1600. pthread_mutex_init(&context->count_threads_mutex, NULL);
  1601. pthread_cond_init(&context->count_threads_cv, NULL);
  1602. context->count_threads = 0; /* Reset threads counter */
  1603. #endif
  1604. #if !defined(_WIN32)
  1605. /* Initialize and set thread detached attribute */
  1606. pthread_attr_init(&context->ct_attr);
  1607. pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
  1608. #endif
  1609. /* Finally, create the threads in detached state */
  1610. for (tid = 0; tid < context->numthreads; tid++) {
  1611. context->tids[tid] = tid;
  1612. /* Create a thread context thread owns context (will destroy when finished) */
  1613. thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
  1614. thread_context->parent_context = context;
  1615. thread_context->tid = tid;
  1616. ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
  1617. thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
  1618. thread_context->tmp2 = thread_context->tmp + context->blocksize;
  1619. thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
  1620. thread_context->tmpblocksize = context->blocksize;
  1621. #if !defined(_WIN32)
  1622. rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
  1623. #else
  1624. rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
  1625. #endif
  1626. if (rc2) {
  1627. fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
  1628. fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
  1629. return(-1);
  1630. }
  1631. }
  1632. return(0);
  1633. }
  1634. int blosc_get_nthreads(void)
  1635. {
  1636. int ret = g_threads;
  1637. return ret;
  1638. }
  1639. int blosc_set_nthreads(int nthreads_new)
  1640. {
  1641. int ret = g_threads;
  1642. /* Check if should initialize */
  1643. if (!g_initlib) blosc_init();
  1644. if (nthreads_new != ret){
  1645. /* Re-initialize Blosc */
  1646. blosc_destroy();
  1647. blosc_init();
  1648. g_threads = nthreads_new;
  1649. }
  1650. return ret;
  1651. }
  1652. int blosc_set_nthreads_(struct blosc_context* context)
  1653. {
  1654. if (context->numthreads > BLOSC_MAX_THREADS) {
  1655. fprintf(stderr,
  1656. "Error. nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
  1657. BLOSC_MAX_THREADS);
  1658. return -1;
  1659. }
  1660. else if (context->numthreads <= 0) {
  1661. fprintf(stderr, "Error. nthreads must be a positive integer");
  1662. return -1;
  1663. }
  1664. /* Launch a new pool of threads */
  1665. if (context->numthreads > 1 && context->numthreads != context->threads_started) {
  1666. blosc_release_threadpool(context);
  1667. init_threads(context);
  1668. }
  1669. /* We have now started the threads */
  1670. context->threads_started = context->numthreads;
  1671. return context->numthreads;
  1672. }
  1673. const char* blosc_get_compressor(void)
  1674. {
  1675. const char* compname;
  1676. blosc_compcode_to_compname(g_compressor, &compname);
  1677. return compname;
  1678. }
  1679. int blosc_set_compressor(const char *compname)
  1680. {
  1681. int code = blosc_compname_to_compcode(compname);
  1682. g_compressor = code;
  1683. /* Check if should initialize */
  1684. if (!g_initlib) blosc_init();
  1685. return code;
  1686. }
  1687. const char* blosc_list_compressors(void)
  1688. {
  1689. static int compressors_list_done = 0;
  1690. static char ret[256];
  1691. if (compressors_list_done) return ret;
  1692. ret[0] = '\0';
  1693. strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
  1694. #if defined(HAVE_LZ4)
  1695. strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
  1696. strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
  1697. #endif /* HAVE_LZ4 */
  1698. #if defined(HAVE_SNAPPY)
  1699. strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
  1700. #endif /* HAVE_SNAPPY */
  1701. #if defined(HAVE_ZLIB)
  1702. strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
  1703. #endif /* HAVE_ZLIB */
  1704. #if defined(HAVE_ZSTD)
  1705. strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
  1706. #endif /* HAVE_ZSTD */
  1707. compressors_list_done = 1;
  1708. return ret;
  1709. }
  1710. const char* blosc_get_version_string(void)
  1711. {
  1712. return BLOSC_VERSION_STRING;
  1713. }
  1714. int blosc_get_complib_info(const char *compname, char **complib, char **version)
  1715. {
  1716. int clibcode;
  1717. const char *clibname;
  1718. const char *clibversion = "unknown";
  1719. #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
  1720. char sbuffer[256];
  1721. #endif
  1722. clibcode = compname_to_clibcode(compname);
  1723. clibname = clibcode_to_clibname(clibcode);
  1724. /* complib version */
  1725. if (clibcode == BLOSC_BLOSCLZ_LIB) {
  1726. clibversion = BLOSCLZ_VERSION_STRING;
  1727. }
  1728. #if defined(HAVE_LZ4)
  1729. else if (clibcode == BLOSC_LZ4_LIB) {
  1730. #if defined(LZ4_VERSION_MAJOR)
  1731. sprintf(sbuffer, "%d.%d.%d",
  1732. LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
  1733. clibversion = sbuffer;
  1734. #endif /* LZ4_VERSION_MAJOR */
  1735. }
  1736. #endif /* HAVE_LZ4 */
  1737. #if defined(HAVE_SNAPPY)
  1738. else if (clibcode == BLOSC_SNAPPY_LIB) {
  1739. #if defined(SNAPPY_VERSION)
  1740. sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
  1741. clibversion = sbuffer;
  1742. #endif /* SNAPPY_VERSION */
  1743. }
  1744. #endif /* HAVE_SNAPPY */
  1745. #if defined(HAVE_ZLIB)
  1746. else if (clibcode == BLOSC_ZLIB_LIB) {
  1747. clibversion = ZLIB_VERSION;
  1748. }
  1749. #endif /* HAVE_ZLIB */
  1750. #if defined(HAVE_ZSTD)
  1751. else if (clibcode == BLOSC_ZSTD_LIB) {
  1752. sprintf(sbuffer, "%d.%d.%d",
  1753. ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
  1754. clibversion = sbuffer;
  1755. }
  1756. #endif /* HAVE_ZSTD */
  1757. *complib = strdup(clibname);
  1758. *version = strdup(clibversion);
  1759. return clibcode;
  1760. }
  1761. /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
  1762. void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
  1763. size_t *cbytes, size_t *blocksize)
  1764. {
  1765. uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
  1766. uint8_t version = _src[0]; /* version of header */
  1767. if (version != BLOSC_VERSION_FORMAT) {
  1768. *nbytes = *blocksize = *cbytes = 0;
  1769. return;
  1770. }
  1771. /* Read the interesting values */
  1772. *nbytes = (size_t)sw32_(_src + 4); /* uncompressed buffer size */
  1773. *blocksize = (size_t)sw32_(_src + 8); /* block size */
  1774. *cbytes = (size_t)sw32_(_src + 12); /* compressed buffer size */
  1775. }
  1776. /* Return `typesize` and `flags` from a compressed buffer. */
  1777. void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
  1778. int *flags)
  1779. {
  1780. uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
  1781. uint8_t version = _src[0]; /* version of header */
  1782. if (version != BLOSC_VERSION_FORMAT) {
  1783. *flags = *typesize = 0;
  1784. return;
  1785. }
  1786. /* Read the interesting values */
  1787. *flags = (int)_src[2] & 7; /* first three flags */
  1788. *typesize = (size_t)_src[3]; /* typesize */
  1789. }
  1790. /* Return version information from a compressed buffer. */
  1791. void blosc_cbuffer_versions(const void *cbuffer, int *version,
  1792. int *versionlz)
  1793. {
  1794. uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
  1795. /* Read the version info */
  1796. *version = (int)_src[0]; /* blosc format version */
  1797. *versionlz = (int)_src[1]; /* Lempel-Ziv compressor format version */
  1798. }
  1799. /* Return the compressor library/format used in a compressed buffer. */
  1800. const char *blosc_cbuffer_complib(const void *cbuffer)
  1801. {
  1802. uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
  1803. int clibcode;
  1804. const char *complib;
  1805. /* Read the compressor format/library info */
  1806. clibcode = (_src[2] & 0xe0) >> 5;
  1807. complib = clibcode_to_clibname(clibcode);
  1808. return complib;
  1809. }
  1810. /* Get the internal blocksize to be used during compression. 0 means
  1811. that an automatic blocksize is computed internally. */
  1812. int blosc_get_blocksize(void)
  1813. {
  1814. return (int)g_force_blocksize;
  1815. }
  1816. /* Force the use of a specific blocksize. If 0, an automatic
  1817. blocksize will be used (the default). */
  1818. void blosc_set_blocksize(size_t size)
  1819. {
  1820. g_force_blocksize = (int32_t)size;
  1821. }
  1822. /* Force the use of a specific split mode. */
  1823. void blosc_set_splitmode(int mode)
  1824. {
  1825. g_splitmode = mode;
  1826. }
  1827. void blosc_init(void)
  1828. {
  1829. /* Return if we are already initialized */
  1830. if (g_initlib) return;
  1831. pthread_mutex_init(&global_comp_mutex, NULL);
  1832. g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
  1833. g_global_context->threads_started = 0;
  1834. g_initlib = 1;
  1835. }
  1836. void blosc_destroy(void)
  1837. {
  1838. /* Return if Blosc is not initialized */
  1839. if (!g_initlib) return;
  1840. g_initlib = 0;
  1841. blosc_release_threadpool(g_global_context);
  1842. my_free(g_global_context);
  1843. pthread_mutex_destroy(&global_comp_mutex);
  1844. }
  1845. int blosc_release_threadpool(struct blosc_context* context)
  1846. {
  1847. int32_t t;
  1848. void* status;
  1849. int rc;
  1850. int rc2;
  1851. if (context->threads_started > 0)
  1852. {
  1853. /* Tell all existing threads to finish */
  1854. context->end_threads = 1;
  1855. /* Sync threads */
  1856. WAIT_INIT(-1, context);
  1857. /* Join exiting threads */
  1858. for (t=0; t<context->threads_started; t++) {
  1859. rc2 = pthread_join(context->threads[t], &status);
  1860. if (rc2) {
  1861. fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
  1862. fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
  1863. }
  1864. }
  1865. /* Release mutex and condition variable objects */
  1866. pthread_mutex_destroy(&context->count_mutex);
  1867. /* Barriers */
  1868. #ifdef _POSIX_BARRIERS_MINE
  1869. pthread_barrier_destroy(&context->barr_init);
  1870. pthread_barrier_destroy(&context->barr_finish);
  1871. #else
  1872. pthread_mutex_destroy(&context->count_threads_mutex);
  1873. pthread_cond_destroy(&context->count_threads_cv);
  1874. #endif
  1875. /* Thread attributes */
  1876. #if !defined(_WIN32)
  1877. pthread_attr_destroy(&context->ct_attr);
  1878. #endif
  1879. }
  1880. context->threads_started = 0;
  1881. return 0;
  1882. }
  1883. int blosc_free_resources(void)
  1884. {
  1885. /* Return if Blosc is not initialized */
  1886. if (!g_initlib) return -1;
  1887. return blosc_release_threadpool(g_global_context);
  1888. }