Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
bz.cpp
Go to the documentation of this file.
1 #include <xstream/config.h>
2 #include <fstream>
3 
4 #if HAVE_LIBBZ2
5 
6 #include <stdint.h>
7 #include <unistd.h>
8 #include <string.h>
9 #include <cstring>
10 #include <algorithm>
11 #include <cassert>
12 
13 #include <xstream/bz.h>
14 #include <xstream/except/bz.h>
15 
16 #include <bzlib.h>
17 #include <arpa/inet.h>
18 
19 #include "debug.h"
20 
// The following two macros must always occur in pairs within a single
// block of code, otherwise it will not even compile. This is done on
// purpose, to reduce the risk of blunders with deadlocks. Please take
// the lock, do the operation, and then release the lock as quickly as
// possible. If your function needs to return between the MUTEX_LOCK and
// MUTEX_UNLOCK statements, use MUTEX_ESCAPE before the return statement.
//
// Mechanics: MUTEX_LOCK opens a brace scope, locks *streambuf_mutex when a
// mutex is installed, remembers the pointer in the local `mutex_saved` and
// then sets streambuf_mutex to 0.  A nested MUTEX_LOCK executed on the same
// object while the lock is held (for example ostreambuf::sync() calling
// flush()) therefore sees a null pointer and does not lock the same mutex
// a second time.  MUTEX_UNLOCK restores the pointer, unlocks and closes the
// scope.
//
// NOTE(review): nothing here releases the lock when an exception leaves
// the region between MUTEX_LOCK and MUTEX_UNLOCK; such code must catch,
// apply MUTEX_ESCAPE and rethrow, or apply MUTEX_ESCAPE before throwing.

// Open the locked scope (see above).
#define MUTEX_LOCK \
 { \
 if (streambuf_mutex != 0) \
 pthread_mutex_lock(streambuf_mutex); \
 pthread_mutex_t *mutex_saved = streambuf_mutex; \
 streambuf_mutex = 0;

// Restore the saved mutex pointer, unlock it and close the locked scope.
#define MUTEX_UNLOCK \
 streambuf_mutex = mutex_saved; \
 if (streambuf_mutex != 0) \
 pthread_mutex_unlock(streambuf_mutex); \
 }

// Same as MUTEX_UNLOCK but without closing the scope: use it right before
// leaving the locked region early (return or throw).
#define MUTEX_ESCAPE \
 streambuf_mutex = mutex_saved; \
 if (streambuf_mutex != 0) \
 pthread_mutex_unlock(streambuf_mutex);
45 
46 namespace xstream {
47 namespace bz {
48 
// define a set of compressed stream headers that can be used
// to remove arbitrary bit-alignment offsets in an input stream
//
// Every row of bz_header starts with a bzip2 stream header for block size 9
// ("BZh9" = 0x42 0x5a 0x68 0x39) followed by the 0x314159265359 block magic
// and a small dummy compressed block.  The rows differ in the bit length of
// that dummy block; per istreambuf::read_decompress(), one of the 8 rows
// should line up with the bit alignment of any real block.
// bz_header_length[i] is the number of meaningful bytes in row i; the rest
// of each 48-byte row is zero fill.
//
// How they are used (istreambuf::read_decompress):
//  - the last 5 bytes of a row are searched for near the start of the
//    input chunk;
//  - on a match the row is spliced in front of the real data;
//  - the dummy block decompresses to 8 bytes (9 for row 3), which are
//    discarded.
// istreambuf::decompress() also compares the leading bytes of row 0 against
// leftover input.

static const int bz_header_length[8] = {35,36,34,44,40,38,40,43};
static const unsigned char bz_header[8][48] = {
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x86, 0xad, 0x3f, 0xf0, 0x00, 0x00,
     0x02, 0x48, 0x00, 0x04, 0x00, 0x20, 0x00, 0x20,
     0x00, 0x21, 0x00, 0x82, 0x0b, 0x31, 0x41, 0x59,
     0x26, 0x53, 0x59 },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x7b, 0x68, 0x3e, 0x4a, 0x00, 0x00,
     0x01, 0x88, 0x00, 0x0f, 0xc0, 0x20, 0x00, 0x21,
     0x80, 0x0c, 0x01, 0x37, 0xa4, 0xbb, 0x98, 0xa0,
     0xac, 0x93, 0x29, 0xac },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x83, 0x69, 0xfc, 0x04, 0x00, 0x00,
     0x01, 0x88, 0x00, 0x30, 0x00, 0x20, 0x00, 0x30,
     0x80, 0x2a, 0x69, 0x11, 0xcc, 0x50, 0x56, 0x49,
     0x94, 0xd6 },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0xd5, 0x0d, 0x1c, 0x28, 0x00, 0x00,
     0x01, 0x51, 0x80, 0x00, 0x10, 0x01, 0x01, 0x80,
     0x02, 0x01, 0x80, 0x20, 0x00, 0x31, 0x0c, 0x01,
     0x06, 0x9b, 0x47, 0xe7, 0x38, 0x04, 0xa6, 0x28,
     0x2b, 0x24, 0xca, 0x6b },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x44, 0x1f, 0x23, 0x2f, 0x00, 0x00,
     0x02, 0x11, 0x00, 0x00, 0x00, 0xa5, 0xa2, 0xa0,
     0x00, 0x22, 0x06, 0x9a, 0x7a, 0x10, 0xc0, 0x8e,
     0x10, 0xd8, 0x43, 0x14, 0x15, 0x92, 0x65, 0x35 },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x83, 0x90, 0x27, 0xae, 0x00, 0x00,
     0x01, 0x81, 0x80, 0x0c, 0x00, 0x14, 0x20, 0x20,
     0x00, 0x21, 0x86, 0x81, 0x9a, 0x09, 0x4d, 0xa8,
     0xb9, 0x8a, 0x0a, 0xc9, 0x32, 0x9a },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x8d, 0x4f, 0x1e, 0x72, 0x00, 0x00,
     0x04, 0x41, 0x80, 0x40, 0x00, 0x00, 0x20, 0x14,
     0x60, 0x20, 0x00, 0x30, 0xc0, 0x08, 0x63, 0x45,
     0x84, 0x0f, 0xb8, 0xc5, 0x05, 0x64, 0x99, 0x4d },
   { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
     0x53, 0x59, 0x64, 0xf2, 0x3a, 0xdc, 0x00, 0x00,
     0x00, 0x9e, 0x00, 0x04, 0x00, 0x30, 0x00, 0x02,
     0x08, 0x08, 0x80, 0x20, 0x00, 0x31, 0x0c, 0x01,
     0x06, 0x99, 0xa4, 0xe4, 0x4c, 0x0a, 0x62, 0x82,
     0xb2, 0x4c, 0xa6 }
};
97 
98  static const int eof = std::streambuf::traits_type::eof();
99 
100  struct pimpl: public bz_stream {};
101 
102  static inline int flush_macro(const flush_kind f) {
103  switch (f) {
104  case no_sync:
105  return BZ_RUN;
106  case full_sync:
107  return BZ_FLUSH;
108  case finish_sync:
109  return BZ_FINISH;
110  default:
111  //should probably throw
112  return BZ_RUN;
113  }
114  }
115 
116  common::common(std::streambuf *sb)
117  : xstream::common_buffer(sb), z_strm(0), block_start(0), block_offset(0),
118  streambuf_mutex(0)
119  {
120  LOG("bz::common");
121 
122  z_strm = new pimpl;
123 
124  //initialize bzlib structure
125  z_strm->bzalloc = NULL;
126  z_strm->bzfree = NULL;
127  z_strm->opaque = NULL;
128  //buffers
129  z_strm->avail_out = out.size;
130  z_strm->next_out = out.buf;
131 
132  z_strm->avail_in = 0;
133  z_strm->next_in = in.buf;
134  }
135 
136  unsigned long int common::input_count() const {
137  return ((uint64_t)(z_strm->total_in_hi32)<< 32) + (uint64_t)(z_strm->total_in_lo32);
138  }
139 
140  unsigned long int common::output_count() const {
141  return ((uint64_t)(z_strm->total_out_hi32)<< 32) + (uint64_t)(z_strm->total_out_lo32);
142  }
143 
144  common::~common() {
145  LOG("bz::~common");
146  delete z_strm;
147  }
148 
149 
150  //default compression 9
151  ostreambuf::ostreambuf(std::streambuf * sb)
152  : common(sb), level(9) {
153  LOG("bz::ostreambuf without compression level");
154  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
155  init ();
156  }
157 
158  ostreambuf::ostreambuf (std::streambuf * sb, int l)
159  : common(sb), level(l) {
160  LOG("bz::ostreambuf with compression level " << l);
161  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
162  init ();
163  }
164 
165  const char* error_str(int err) {
166  switch(err) {
167  case BZ_MEM_ERROR:
168  return "out of memory";
169  case BZ_CONFIG_ERROR:
170  return "bzlib badly configured (bad sizes of int32 (4), int16 (2) or char (1), check and recompile)";
171  case BZ_PARAM_ERROR:
172  return "invalid parameter, possibly invalid compression level";
173  case BZ_SEQUENCE_ERROR:
174  return "bad sequence (this means xstream is buggy)";
175  case BZ_DATA_ERROR:
176  return "invalid or incomplete data (crc failed)";
177  case BZ_DATA_ERROR_MAGIC:
178  return "magic bytes not found in stream";
179  case BZ_IO_ERROR:
180  return "io error";
181  case BZ_UNEXPECTED_EOF:
182  return "premature end of data";
183  case BZ_OUTBUFF_FULL:
184  return "output buffer full";
185  }
186 
187  return "unknown error";
188  }
189 
190  void ostreambuf::raise_error(int err) {
191  std::string what = error_str(err);
192 
193  LOG("bz::ostreambuf::raise_error (" << err << ") = " << what);
194 
195  if (what.size() > 0) {
196  throw compress_error(this,what);
197  } else {
198  throw compress_error(this);
199  }
200  }
201 
202 
203  void ostreambuf::init() {
204  LOG("bz::ostreambuf::init");
205  int cret =::BZ2_bzCompressInit(
206  z_strm,
207  level,
208  0, //verbosity
209  30 //workFactor (default value) controls when to switch to the fallback algorithm
210  );
211 
212  if (BZ_OK != cret) {
213  LOG("bz::ostreambuf::init: error creating bz2stream " << cret);
214  raise_error(cret);
215  }
216  //initialize streambuf interface functions
217  setp(in.buf, in.buf + in.size);
218  }
219 
220  ostreambuf::~ostreambuf() {
221  LOG("bz::ostreambuf::~ostreambuf");
222  //fullsync (write remaining data)
223  flush(finish_sync);
224 
225  //sync underlying streambuf
226  MUTEX_LOCK
227  _sb->pubsync();
228  MUTEX_UNLOCK
229 
230  if (0 != z_strm) {
231  //XXX should I throw an exception in case of error?
232  //remember this is a destructor
233  int cret = ::BZ2_bzCompressEnd(z_strm);
234  if (BZ_OK != cret) {
235  LOG("\tERROR: BZ2_bzCompressEnd returned " << cret);
236  }
237  }
238  }
239 
240  int ostreambuf::sync () {
241  LOG("bz::ostreambuf::sync");
242  int ret;
243  MUTEX_LOCK
244  ret = flush(finish_sync);
245  _sb->pubsync();
246  MUTEX_UNLOCK
247  return ret;
248  }
249 
250 
251  int ostreambuf::overflow (int c) {
252  LOG("bz::ostreambuf::overflow(" << c << ")\t available=" << (available ()) << "\tEOF=" << eof);
253  if (eof == c) {
254  LOG("\tEOF");
255  flush(no_sync);
256  return eof;
257  } else {
258  if (0 == available()) {
259  LOG("\t have to flush :[]");
260  flush(no_sync);
261  }
262  *pptr() = static_cast < char >(c);
263  pbump(1);
264  }
265  return c;
266  }
267 
268  std::streamsize ostreambuf::xsputn(const char *buffer, std::streamsize n) {
269  LOG("bz::ostreambuf::xsputn(" << buffer << "," << n << ")");
270 
271  return flush(no_sync, buffer, n);
272  }
273 
274  int ostreambuf::flush(flush_kind f, const char *appendbuf, int appendsize) {
275  LOG("bz::ostreambuf::flush(" << f << ")");
276  std::streamsize in_s = taken();
277  LOG("\tinput_size=" << in_s);
278 
279  //set up compression engine input feed
280  int written;
281  if (in_s > 0) {
282  z_strm->next_in = pbase();
283  z_strm->avail_in = in_s;
284  written = in_s;
285  } else if (appendsize > 0) {
286  z_strm->next_in = (char*)appendbuf;
287  z_strm->avail_in = appendsize;
288  written = appendsize;
289  appendsize = 0;
290  } else {
291  z_strm->next_in = pbase();
292  z_strm->avail_in = 0;
293  written = 0;
294  }
295  block_offset += written;
296  if (block_offset > (std::streamoff)level * 100000) {
297  f = (f == no_sync)? finish_sync : f;
298  }
299 
300  if (z_strm->avail_in +
301  z_strm->total_in_lo32 + z_strm->total_in_hi32 == 0)
302  {
303  return 0;
304  }
305 
306  bool redo = false;
307  bool reinit_compressor = false;
308 
309  do {
310  int cret;
311  redo = false;
312  bool error = false;
313 
314  cret = ::BZ2_bzCompress(z_strm, flush_macro(f));
315 
316  //error handling
317  if (finish_sync == f) {
318  if (BZ_STREAM_END == cret) {
319  redo = false;
320  reinit_compressor = true;
321  }
322  else if (BZ_FINISH_OK == cret) {
323  redo = true;
324  }
325  else {
326  //serious error, throw exception
327  LOG("\terror in finish:" << cret);
328  error = true;
329  }
330  }
331  else if (full_sync == f) {
332  if (BZ_FLUSH_OK == cret) {
333  LOG("\tanother go at sync");
334  redo = true;
335  }
336  else if (BZ_RUN_OK == cret) {
337  LOG("\tsync ok");
338  redo = false;
339  }
340  else {
341  LOG("\terror in sync: " << cret);
342  error = true;
343  }
344  }
345  else if (no_sync == f) {
346  if (BZ_RUN_OK != cret) {
347  LOG("\terror compressing " << cret);
348  error = true;
349  }
350  }
351  else {
352  LOG("\tERROR: unknown flush mode " << flush_macro(f));
353  throw general_error();
354  error = true;
355  }
356 
357  if (error) {
358  raise_error(cret);
359  }
360 
361  if (f == finish_sync) { // only complete streams can be written
362  std::streamsize count = out.size - z_strm->avail_out;
363  if (count > 0) { // ignore empty blocks
364  LOG("\twriting " << count << " bytes");
365  int size = htonl(count);
366  MUTEX_LOCK
367  const std::streamsize wrote = _sb->sputn((char*)&size, 4) +
368  _sb->sputn(out.buf, count);
369  if (wrote != count + 4) {
370  MUTEX_ESCAPE
371  LOG("\terror writting, only wrote " << wrote
372  << " but asked for " << count);
373  raise_error(BZ_IO_ERROR);
374  }
375  block_start = _sb->pubseekoff(0, std::ios_base::cur,
376  std::ios_base::out);
377  block_offset = 0;
378  MUTEX_UNLOCK
379  }
380  z_strm->next_out = out.buf;
381  z_strm->avail_out = out.size;
382  }
383 
384  if ((0 == z_strm->avail_out) && (0 != z_strm->avail_in)) {
385  LOG("\tavail_out=0 => redo");
386  redo = true;
387  }
388 
389  if (!redo && appendbuf && appendsize > 0) {
390  z_strm->next_in = (char*)appendbuf;
391  z_strm->avail_in = appendsize;
392  written += appendsize;
393  appendsize = 0;
394  redo = true;
395  }
396  } while (redo);
397  assert (0 == z_strm->avail_in);
398 
399  if (reinit_compressor) {
400  int cret;
401  cret = ::BZ2_bzCompressEnd(z_strm);
402  if (BZ_OK != cret) {
403  LOG("\tERROR: BZ2_bzCompressEnd returned " << cret);
404  raise_error(cret);
405  }
406  z_strm->bzalloc = NULL;
407  z_strm->bzfree = NULL;
408  z_strm->opaque = NULL;
409  z_strm->avail_out = out.size;
410  z_strm->next_out = out.buf;
411  z_strm->avail_in = 0;
412  z_strm->next_in = in.buf;
413  cret =::BZ2_bzCompressInit(z_strm, level, 0, 30);
414  if (BZ_OK != cret) {
415  LOG("\tERROR: BZ2_bzCompressInit returned " << cret);
416  raise_error(cret);
417  }
418  }
419 
420  //reset buffer
421  setp(in.buf, in.buf + in.size);
422  return written;
423  }
424 
425  /////////////////////
426  // istream follows //
427  /////////////////////
428 
429  istreambuf::istreambuf(std::streambuf *sb, int *left, unsigned int left_size)
430  : common(sb), end(false), block_size(0), block_next(0),
431  new_block_start(0), new_block_offset(0),
432  leftovers(0)
433  {
434  LOG("bz::istreambuf");
435  int cret =::BZ2_bzDecompressInit(z_strm,
436  0, //verbosity
437  0 //no small memory
438  );
439 
440  if (BZ_OK != cret) {
441  LOG("\terror creating bz2stream " << cret);
442  raise_error(cret);
443  }
444  //initialize streambuf interface functions
445  //first call will call uflow and this will set the buffer accordingly
446  //no buffering
447  setg(out.buf, out.buf, out.buf);
448  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::in);
449 
450  if (left_size >= sizeof(leftovers_buf)) {
451  leftovers = (leftovers_buf*)left;
452  }
453  else {
454  LOG("\terror - insufficient space for leftovers buffer");
455  raise_error(cret);
456  }
457  }
458 
459  void istreambuf::raise_error(int err){
460  std::string what = error_str(err);
461 
462  LOG("bz::istreambuf::raise_error (" << err << ") = " << what);
463 
464  if (what.size() > 0) {
465  throw decompress_error(this, what);
466  } else {
467  throw decompress_error(this);
468  }
469  }
470 
471  int istreambuf::underflow() {
472  LOG("z:istreambuf::underflow");
473 
474  if (end) {
475  LOG("\tend of stream (EOF)");
476  //signal the stream has reached it's end
477  return eof;
478  }
479 
480  if (new_block_start > 0 || new_block_offset > 0) {
481  if (block_start != new_block_start ||
482  block_offset > new_block_offset ||
483  block_size == 0)
484  {
485  z_strm->next_out = out.buf;
486  z_strm->avail_out = 0;
487  read_decompress();
488  }
489  while (block_offset < new_block_offset) {
490  z_strm->next_out = out.buf;
491  z_strm->avail_out = new_block_offset - block_offset;
492  if (z_strm->avail_out > out.size) {
493  z_strm->avail_out = out.size;
494  }
495  block_offset += z_strm->avail_out;
496  decompress();
497  }
498  new_block_start = 0;
499  new_block_offset = 0;
500  }
501 
502  z_strm->avail_out = out.size;
503  z_strm->next_out = out.buf;
504  if (0 < z_strm->avail_in) {
505  LOG("\tdata in queue, inflating");
506  decompress();
507  }
508  while (!end && z_strm->avail_out > 0) {
509  read_decompress();
510  }
511  if (end && z_strm->avail_out > 0) {
512  LOG("\tend of stream (EOF)");
513  //signal the stream has reached it's end
514  return eof;
515  }
516 
517  //set streambuf pointers
518  setg(out.buf, out.buf, z_strm->next_out);
519 
520  return int(out.buf[0]);
521  }
522 
523  //read to buffer in place (apart from data already buffered)
524  std::streamsize istreambuf::xsgetn(char *buffer, std::streamsize n) {
525  LOG("bz::istreambuf::xsgetn (" << n << ")");
526 
527  if (new_block_start > 0 || new_block_offset > 0) {
528  if (block_start != new_block_start ||
529  block_offset > new_block_offset ||
530  block_size == 0)
531  {
532  z_strm->next_out = out.buf;
533  z_strm->avail_out = 0;
534  read_decompress();
535  setg(out.buf, out.buf, out.buf);
536  }
537  else
538  {
539  std::streamsize available = egptr() - gptr();
540  int waste = new_block_offset - block_offset;
541  waste = (available < waste)? available : waste;
542  if (waste > 0) {
543  gbump(waste);
544  block_offset += waste;
545  }
546  }
547  while (block_offset < new_block_offset) {
548  z_strm->next_out = out.buf;
549  z_strm->avail_out = new_block_offset - block_offset;
550  if (z_strm->avail_out > out.size) {
551  z_strm->avail_out = out.size;
552  }
553  block_offset += z_strm->avail_out;
554  decompress();
555  }
556  new_block_start = 0;
557  new_block_offset = 0;
558  }
559 
560  //try to satisfy request from buffered input
561  std::streamsize available = egptr() - gptr();
562  int read = (available >= n)? n : available;
563  if (read) {
564  std::copy(gptr(), gptr() + read, buffer);
565  gbump(read);
566  block_offset += read;
567  }
568 
569  //inflate the rest directly into the user's buffer
570  if (read < n) {
571  if (end) {
572  LOG("\tend of stream (EOF)");
573  //signal the stream has reached it's end
574  return eof;
575  }
576 
577  z_strm->next_out = buffer + read;
578  z_strm->avail_out = n - read;
579 
580  if (0 < z_strm->avail_in) {
581  decompress();
582  }
583  while (!end && z_strm->avail_out > 0) {
584  read_decompress();
585  }
586  if (end && z_strm->avail_out > 0) {
587  LOG("\tend of stream (EOF)");
588  //signal the stream has reached it's end
589  return eof;
590  }
591  block_offset += n - read;
592  }
593  return n;
594  }
595 
596  void istreambuf::read_decompress() {
597  LOG("bz::istreambuf::read_decompress ");
598  bool reinit_decompressor = false;
599  int read;
600  if (block_size < 0) { // stream has no blocksize markers
601  MUTEX_LOCK
602  if (new_block_start > 0) {
603  _sb->pubseekoff(new_block_start, std::ios_base::beg,
604  std::ios_base::in);
605  new_block_start = 0;
606  leftovers->len = 0;
607  block_next = 0;
608  end = false;
609  }
610  read = _sb->sgetn(in.buf, in.size);
611  MUTEX_UNLOCK
612  }
613  else { // look for prefixed blocksize: leading byte = 0
614  MUTEX_LOCK
615  if (new_block_start > 0) {
616  _sb->pubseekoff(new_block_start, std::ios_base::beg,
617  std::ios_base::in);
618  block_start = new_block_start;
619  new_block_start = 0;
620  leftovers->len = 0;
621  block_next = 0;
622  end = false;
623  }
624  else {
625  block_start = _sb->pubseekoff(0, std::ios_base::cur,
626  std::ios_base::in);
627  block_start -= leftovers->len;
628  }
629  reinit_decompressor = (block_next != block_start);
630  read = leftovers->len;
631  if (read < 4) {
632  read += _sb->sgetn(leftovers->buf + read, 4 - read);
633  if (read != 4) {
634  end = true;
635  MUTEX_ESCAPE
636  return;
637  }
638  }
639  if (leftovers->buf[0] == 0) { // bz2 blocks have prefixed blocksize
640  int *size = (int*)leftovers->buf;
641  block_size = ntohl(*size);
642  read -= 4;
643  if (reinit_decompressor && read > 0) {
644  std::memcpy(in.buf, leftovers->buf + 4, read);
645  read += _sb->sgetn(in.buf + read, block_size - read);
646  }
647  else {
648  read = _sb->sgetn(in.buf, block_size - read);
649  }
650  leftovers->len = _sb->sgetn(leftovers->buf, 8);
651  if (leftovers->len > 4) {
652  std::memcpy(in.buf + read, leftovers->buf + 4,
653  leftovers->len - 4);
654  read += leftovers->len - 4;
655  }
656  }
657  else { // bz2 blocks are jammed together, no blocksize available
658  read += _sb->sgetn(in.buf + read, in.size - read);
659  leftovers->len = 0;
660  block_size = -1;
661  }
662  block_next = _sb->pubseekoff(0, std::ios_base::cur,
663  std::ios_base::in);
664  block_next -= leftovers->len;
665  MUTEX_UNLOCK
666  }
667  LOG("\tread " << read << " bytes");
668  block_offset = 0;
669 
670  if (0 == read) {
671  end = true;
672  return;
673  }
674 
675  // We want to be able to start decompression at an arbitrary position
676  // in the input stream. This is possible with bzip2 streams, but there
677  // is a problem that the compressed blocks are arbitrary numbers of
678  // bits long and they are catenated one after another in a bit stream
679  // without any reference to byte boundaries. This makes it difficult
680  // to jump into the middle of a stream and start the decompressor
681  // since it expects a stream header followed by the first block that
682  // happens to start on a byte boundary. To make this work, I splice
683  // an artificial stream header followed by a dummy compressed block
684  // onto the beginning of the input stream, where the length of the
685  // dummy block in bits is chosen so that it abutts without padding
686  // the next block in the input stream. I have prepared 8 dummy blocks
687  // so there should be one to match the alignment of any input block.
688  // To match them, I look for the first place in the input stream
689  // with a byte string that matches the last 5 bytes in one of my
690  // dummy headers, and then I inject the dummy header into the
691  // decompressor ahead of the actual data. The dummy blocks are all
692  // contrived to decompress to an 8-byte string, so throwing away the
693  // first 8 bytes out of the decompressor, it is primed to decompress
694  // the remaining stream without any need for bit-shifting the input.
695 
696  const char* head = (const char*)bz_header;
697  if (reinit_decompressor) {
698  // reinitialize bzlib structure
699  int saved_buflen = z_strm->avail_out;
700  char *saved_buffer = z_strm->next_out;
701  int cret = ::BZ2_bzDecompressEnd(z_strm);
702  if (BZ_OK != cret) {
703  LOG("\tERROR: BZ2_bzDecompressEnd returned " << cret);
704  }
705  z_strm->bzalloc = NULL;
706  z_strm->bzfree = NULL;
707  z_strm->opaque = NULL;
708  cret = ::BZ2_bzDecompressInit(z_strm, 0, 0);
709  if (BZ_OK != cret) {
710  LOG("\terror creating bz2stream " << cret);
711  raise_error(cret);
712  }
713  if (strncmp(head, in.buf, 8) != 0) {
714  int hdr;
715  int splice;
716  int match = 10;
717  for (hdr = 0; hdr < 8; ++hdr) {
718  splice = bz_header_length[hdr] - 5;
719  const char* shead = (const char*)bz_header[hdr];
720  for (match = 1; match < 10; ++match) {
721  if (strncmp(&in.buf[match], &shead[splice], 5) == 0)
722  break;
723  }
724  if (match < 10)
725  break;
726  }
727  if (hdr > 7) {
728  LOG("\tbz2 stream format error on input");
729  raise_error(BZ_DATA_ERROR_MAGIC);
730  }
731  char dummy_buffer[10];
732  z_strm->next_out = dummy_buffer;
733  z_strm->avail_out = 8;
734  if (hdr == 3)
735  z_strm->avail_out = 9;
736  while (match > 0)
737  in.buf[--match] = (const char)bz_header[hdr][--splice];
738  z_strm->avail_in = splice;
739  z_strm->next_in = (char*)bz_header[hdr];
740  decompress(); // waste the first 8 bytes
741  }
742  z_strm->avail_out = saved_buflen;
743  z_strm->next_out = saved_buffer;
744  }
745  z_strm->next_in = in.buf;
746  z_strm->avail_in = read;
747  decompress();
748  }
749 
750  void istreambuf::decompress() {
751  LOG("bz::istreambuf::decompress ");
752 
753  int cret = ::BZ2_bzDecompress(z_strm);
754 
755  const int* head = (const int*)bz_header;
756  int* buf = (int*)z_strm->next_in;
757 
758  if (BZ_STREAM_END == cret) {
759  z_strm->avail_in = 0;
760  block_next = 0;
761  }
762  else if (cret == BZ_DATA_ERROR && z_strm->avail_in == 0) {
763  // Ignore CRC errors at the end of stream because we may not have
764  // started decompressing at the beginning. We can rely on the CRC
765  // checks that are present within each compressed block anyway.
766  end = true;
767  }
768  else if (cret == BZ_DATA_ERROR && z_strm->avail_in == 4 && *buf == *head)
769  {
770  // Decompressor may complain that the first 4 bytes of the next
771  // input block were appended to the previous block, if it had
772  // a stream terminus. This is not the true end of the stream,
773  // and not an error, just ignore it and reset block_next so
774  // that the next buffer is treated as a new stream.
775  z_strm->avail_in = 0;
776  block_next = 0;
777  end = false;
778  }
779  else if (BZ_OK != cret) {
780  printf("bz2 input stream crapping out, cret is %d\n", cret);
781  LOG("\terror decompressing: " << cret);
782  raise_error(cret);
783  }
784  }
785 
786  istreambuf::~istreambuf() {
787  LOG("bz::~istreambuf");
788  if (0 != z_strm) {
789  int cret = ::BZ2_bzDecompressEnd(z_strm);
790  if (BZ_OK != cret) {
791  LOG("\tERROR: BZ2_bzDecompressEnd returned " << cret);
792  }
793  }
794  }
795 
796 }//namespace bz
797 }//namespace xstream
798 
799 #endif //bzlib
if(locHist_BCALShowerPhiVsZ!=NULL)
debugging/logging support
char string[256]
#define c
TF1 * f
Definition: FitGains.C:21
static const int eof
Definition: base64.cpp:14
C++ streambuf interface to read and write bzip2 streams.
static const size_t block_size
Definition: src/md5.cpp:54
printf("string=%s", string)
#define LOG(s)
Definition: debug.h:30
exceptions related to bzlib usage xstream::bz namespace