17 #include <arpa/inet.h>
30 if (streambuf_mutex != 0) \
31 pthread_mutex_lock(streambuf_mutex); \
32 pthread_mutex_t *mutex_saved = streambuf_mutex; \
35 #define MUTEX_UNLOCK \
36 streambuf_mutex = mutex_saved; \
37 if (streambuf_mutex != 0) \
38 pthread_mutex_unlock(streambuf_mutex); \
41 #define MUTEX_ESCAPE \
42 streambuf_mutex = mutex_saved; \
43 if (streambuf_mutex != 0) \
44 pthread_mutex_unlock(streambuf_mutex);
52 static const int bz_header_length[8] = {35,36,34,44,40,38,40,43};
53 static const unsigned char bz_header[8][48] = {
54 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
55 0x53, 0x59, 0x86, 0xad, 0x3f, 0xf0, 0x00, 0x00,
56 0x02, 0x48, 0x00, 0x04, 0x00, 0x20, 0x00, 0x20,
57 0x00, 0x21, 0x00, 0x82, 0x0b, 0x31, 0x41, 0x59,
59 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
60 0x53, 0x59, 0x7b, 0x68, 0x3e, 0x4a, 0x00, 0x00,
61 0x01, 0x88, 0x00, 0x0f, 0xc0, 0x20, 0x00, 0x21,
62 0x80, 0x0c, 0x01, 0x37, 0xa4, 0xbb, 0x98, 0xa0,
63 0xac, 0x93, 0x29, 0xac },
64 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
65 0x53, 0x59, 0x83, 0x69, 0xfc, 0x04, 0x00, 0x00,
66 0x01, 0x88, 0x00, 0x30, 0x00, 0x20, 0x00, 0x30,
67 0x80, 0x2a, 0x69, 0x11, 0xcc, 0x50, 0x56, 0x49,
69 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
70 0x53, 0x59, 0xd5, 0x0d, 0x1c, 0x28, 0x00, 0x00,
71 0x01, 0x51, 0x80, 0x00, 0x10, 0x01, 0x01, 0x80,
72 0x02, 0x01, 0x80, 0x20, 0x00, 0x31, 0x0c, 0x01,
73 0x06, 0x9b, 0x47, 0xe7, 0x38, 0x04, 0xa6, 0x28,
74 0x2b, 0x24, 0xca, 0x6b },
75 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
76 0x53, 0x59, 0x44, 0x1f, 0x23, 0x2f, 0x00, 0x00,
77 0x02, 0x11, 0x00, 0x00, 0x00, 0xa5, 0xa2, 0xa0,
78 0x00, 0x22, 0x06, 0x9a, 0x7a, 0x10, 0xc0, 0x8e,
79 0x10, 0xd8, 0x43, 0x14, 0x15, 0x92, 0x65, 0x35 },
80 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
81 0x53, 0x59, 0x83, 0x90, 0x27, 0xae, 0x00, 0x00,
82 0x01, 0x81, 0x80, 0x0c, 0x00, 0x14, 0x20, 0x20,
83 0x00, 0x21, 0x86, 0x81, 0x9a, 0x09, 0x4d, 0xa8,
84 0xb9, 0x8a, 0x0a, 0xc9, 0x32, 0x9a },
85 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
86 0x53, 0x59, 0x8d, 0x4f, 0x1e, 0x72, 0x00, 0x00,
87 0x04, 0x41, 0x80, 0x40, 0x00, 0x00, 0x20, 0x14,
88 0x60, 0x20, 0x00, 0x30, 0xc0, 0x08, 0x63, 0x45,
89 0x84, 0x0f, 0xb8, 0xc5, 0x05, 0x64, 0x99, 0x4d },
90 { 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
91 0x53, 0x59, 0x64, 0xf2, 0x3a, 0xdc, 0x00, 0x00,
92 0x00, 0x9e, 0x00, 0x04, 0x00, 0x30, 0x00, 0x02,
93 0x08, 0x08, 0x80, 0x20, 0x00, 0x31, 0x0c, 0x01,
94 0x06, 0x99, 0xa4, 0xe4, 0x4c, 0x0a, 0x62, 0x82,
100 struct pimpl:
public bz_stream {};
102 static inline int flush_macro(
const flush_kind
f) {
116 common::common(std::streambuf *sb)
117 : xstream::common_buffer(sb), z_strm(0), block_start(0), block_offset(0),
125 z_strm->bzalloc = NULL;
126 z_strm->bzfree = NULL;
127 z_strm->opaque = NULL;
129 z_strm->avail_out = out.size;
130 z_strm->next_out = out.buf;
132 z_strm->avail_in = 0;
133 z_strm->next_in = in.buf;
136 unsigned long int common::input_count()
const {
137 return ((uint64_t)(z_strm->total_in_hi32)<< 32) + (uint64_t)(z_strm->total_in_lo32);
140 unsigned long int common::output_count()
const {
141 return ((uint64_t)(z_strm->total_out_hi32)<< 32) + (uint64_t)(z_strm->total_out_lo32);
151 ostreambuf::ostreambuf(std::streambuf * sb)
152 : common(sb), level(9) {
153 LOG(
"bz::ostreambuf without compression level");
154 block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
158 ostreambuf::ostreambuf (std::streambuf * sb,
int l)
159 : common(sb), level(l) {
160 LOG(
"bz::ostreambuf with compression level " << l);
161 block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
165 const char* error_str(
int err) {
168 return "out of memory";
169 case BZ_CONFIG_ERROR:
170 return "bzlib badly configured (bad sizes of int32 (4), int16 (2) or char (1), check and recompile)";
172 return "invalid parameter, possibly invalid compression level";
173 case BZ_SEQUENCE_ERROR:
174 return "bad sequence (this means xstream is buggy)";
176 return "invalid or incomplete data (crc failed)";
177 case BZ_DATA_ERROR_MAGIC:
178 return "magic bytes not found in stream";
181 case BZ_UNEXPECTED_EOF:
182 return "premature end of data";
183 case BZ_OUTBUFF_FULL:
184 return "output buffer full";
187 return "unknown error";
190 void ostreambuf::raise_error(
int err) {
193 LOG(
"bz::ostreambuf::raise_error (" << err <<
") = " << what);
195 if (what.size() > 0) {
196 throw compress_error(
this,what);
198 throw compress_error(
this);
203 void ostreambuf::init() {
204 LOG(
"bz::ostreambuf::init");
205 int cret =::BZ2_bzCompressInit(
213 LOG(
"bz::ostreambuf::init: error creating bz2stream " << cret);
217 setp(in.buf, in.buf + in.size);
220 ostreambuf::~ostreambuf() {
221 LOG(
"bz::ostreambuf::~ostreambuf");
233 int cret = ::BZ2_bzCompressEnd(z_strm);
235 LOG(
"\tERROR: BZ2_bzCompressEnd returned " << cret);
240 int ostreambuf::sync () {
241 LOG(
"bz::ostreambuf::sync");
244 ret = flush(finish_sync);
251 int ostreambuf::overflow (
int c) {
252 LOG(
"bz::ostreambuf::overflow(" << c <<
")\t available=" << (available ()) <<
"\tEOF=" <<
eof);
258 if (0 == available()) {
259 LOG(
"\t have to flush :[]");
262 *pptr() = static_cast <
char >(
c);
268 std::streamsize ostreambuf::xsputn(
const char *buffer, std::streamsize n) {
269 LOG(
"bz::ostreambuf::xsputn(" << buffer <<
"," << n <<
")");
271 return flush(no_sync, buffer, n);
274 int ostreambuf::flush(flush_kind f,
const char *appendbuf,
int appendsize) {
275 LOG(
"bz::ostreambuf::flush(" << f <<
")");
276 std::streamsize in_s = taken();
277 LOG(
"\tinput_size=" << in_s);
282 z_strm->next_in = pbase();
283 z_strm->avail_in = in_s;
285 }
else if (appendsize > 0) {
286 z_strm->next_in = (
char*)appendbuf;
287 z_strm->avail_in = appendsize;
288 written = appendsize;
291 z_strm->next_in = pbase();
292 z_strm->avail_in = 0;
295 block_offset += written;
296 if (block_offset > (std::streamoff)level * 100000) {
297 f = (f == no_sync)? finish_sync : f;
300 if (z_strm->avail_in +
301 z_strm->total_in_lo32 + z_strm->total_in_hi32 == 0)
307 bool reinit_compressor =
false;
314 cret = ::BZ2_bzCompress(z_strm, flush_macro(f));
317 if (finish_sync == f) {
318 if (BZ_STREAM_END == cret) {
320 reinit_compressor =
true;
322 else if (BZ_FINISH_OK == cret) {
327 LOG(
"\terror in finish:" << cret);
331 else if (full_sync == f) {
332 if (BZ_FLUSH_OK == cret) {
333 LOG(
"\tanother go at sync");
336 else if (BZ_RUN_OK == cret) {
341 LOG(
"\terror in sync: " << cret);
345 else if (no_sync == f) {
346 if (BZ_RUN_OK != cret) {
347 LOG(
"\terror compressing " << cret);
352 LOG(
"\tERROR: unknown flush mode " << flush_macro(f));
353 throw general_error();
361 if (f == finish_sync) {
362 std::streamsize count = out.size - z_strm->avail_out;
364 LOG(
"\twriting " << count <<
" bytes");
365 int size = htonl(count);
367 const std::streamsize wrote = _sb->sputn((
char*)&size, 4) +
368 _sb->sputn(out.buf, count);
369 if (wrote != count + 4) {
371 LOG(
"\terror writting, only wrote " << wrote
372 <<
" but asked for " << count);
373 raise_error(BZ_IO_ERROR);
375 block_start = _sb->pubseekoff(0, std::ios_base::cur,
380 z_strm->next_out = out.buf;
381 z_strm->avail_out = out.size;
384 if ((0 == z_strm->avail_out) && (0 != z_strm->avail_in)) {
385 LOG(
"\tavail_out=0 => redo");
389 if (!redo && appendbuf && appendsize > 0) {
390 z_strm->next_in = (
char*)appendbuf;
391 z_strm->avail_in = appendsize;
392 written += appendsize;
397 assert (0 == z_strm->avail_in);
399 if (reinit_compressor) {
401 cret = ::BZ2_bzCompressEnd(z_strm);
403 LOG(
"\tERROR: BZ2_bzCompressEnd returned " << cret);
406 z_strm->bzalloc = NULL;
407 z_strm->bzfree = NULL;
408 z_strm->opaque = NULL;
409 z_strm->avail_out = out.size;
410 z_strm->next_out = out.buf;
411 z_strm->avail_in = 0;
412 z_strm->next_in = in.buf;
413 cret =::BZ2_bzCompressInit(z_strm, level, 0, 30);
415 LOG(
"\tERROR: BZ2_bzCompressInit returned " << cret);
421 setp(in.buf, in.buf + in.size);
429 istreambuf::istreambuf(std::streambuf *sb,
int *left,
unsigned int left_size)
430 : common(sb), end(false),
block_size(0), block_next(0),
431 new_block_start(0), new_block_offset(0),
434 LOG(
"bz::istreambuf");
435 int cret =::BZ2_bzDecompressInit(z_strm,
441 LOG(
"\terror creating bz2stream " << cret);
447 setg(out.buf, out.buf, out.buf);
448 block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::in);
450 if (left_size >=
sizeof(leftovers_buf)) {
451 leftovers = (leftovers_buf*)left;
454 LOG(
"\terror - insufficient space for leftovers buffer");
459 void istreambuf::raise_error(
int err){
462 LOG(
"bz::istreambuf::raise_error (" << err <<
") = " << what);
464 if (what.size() > 0) {
465 throw decompress_error(
this, what);
467 throw decompress_error(
this);
471 int istreambuf::underflow() {
472 LOG(
"z:istreambuf::underflow");
475 LOG(
"\tend of stream (EOF)");
480 if (new_block_start > 0 || new_block_offset > 0) {
481 if (block_start != new_block_start ||
482 block_offset > new_block_offset ||
485 z_strm->next_out = out.buf;
486 z_strm->avail_out = 0;
489 while (block_offset < new_block_offset) {
490 z_strm->next_out = out.buf;
491 z_strm->avail_out = new_block_offset - block_offset;
492 if (z_strm->avail_out > out.size) {
493 z_strm->avail_out = out.size;
495 block_offset += z_strm->avail_out;
499 new_block_offset = 0;
502 z_strm->avail_out = out.size;
503 z_strm->next_out = out.buf;
504 if (0 < z_strm->avail_in) {
505 LOG(
"\tdata in queue, inflating");
508 while (!end && z_strm->avail_out > 0) {
511 if (end && z_strm->avail_out > 0) {
512 LOG(
"\tend of stream (EOF)");
518 setg(out.buf, out.buf, z_strm->next_out);
520 return int(out.buf[0]);
524 std::streamsize istreambuf::xsgetn(
char *buffer, std::streamsize n) {
525 LOG(
"bz::istreambuf::xsgetn (" << n <<
")");
527 if (new_block_start > 0 || new_block_offset > 0) {
528 if (block_start != new_block_start ||
529 block_offset > new_block_offset ||
532 z_strm->next_out = out.buf;
533 z_strm->avail_out = 0;
535 setg(out.buf, out.buf, out.buf);
539 std::streamsize available = egptr() - gptr();
540 int waste = new_block_offset - block_offset;
541 waste = (available < waste)? available : waste;
544 block_offset += waste;
547 while (block_offset < new_block_offset) {
548 z_strm->next_out = out.buf;
549 z_strm->avail_out = new_block_offset - block_offset;
550 if (z_strm->avail_out > out.size) {
551 z_strm->avail_out = out.size;
553 block_offset += z_strm->avail_out;
557 new_block_offset = 0;
561 std::streamsize available = egptr() - gptr();
562 int read = (available >= n)? n : available;
564 std::copy(gptr(), gptr() + read, buffer);
566 block_offset += read;
572 LOG(
"\tend of stream (EOF)");
577 z_strm->next_out = buffer + read;
578 z_strm->avail_out = n - read;
580 if (0 < z_strm->avail_in) {
583 while (!end && z_strm->avail_out > 0) {
586 if (end && z_strm->avail_out > 0) {
587 LOG(
"\tend of stream (EOF)");
591 block_offset += n - read;
596 void istreambuf::read_decompress() {
597 LOG(
"bz::istreambuf::read_decompress ");
598 bool reinit_decompressor =
false;
602 if (new_block_start > 0) {
603 _sb->pubseekoff(new_block_start, std::ios_base::beg,
610 read = _sb->sgetn(in.buf, in.size);
615 if (new_block_start > 0) {
616 _sb->pubseekoff(new_block_start, std::ios_base::beg,
618 block_start = new_block_start;
625 block_start = _sb->pubseekoff(0, std::ios_base::cur,
627 block_start -= leftovers->len;
629 reinit_decompressor = (block_next != block_start);
630 read = leftovers->len;
632 read += _sb->sgetn(leftovers->buf + read, 4 - read);
639 if (leftovers->buf[0] == 0) {
640 int *size = (
int*)leftovers->buf;
643 if (reinit_decompressor && read > 0) {
644 std::memcpy(in.buf, leftovers->buf + 4, read);
645 read += _sb->sgetn(in.buf + read,
block_size - read);
650 leftovers->len = _sb->sgetn(leftovers->buf, 8);
651 if (leftovers->len > 4) {
652 std::memcpy(in.buf + read, leftovers->buf + 4,
654 read += leftovers->len - 4;
658 read += _sb->sgetn(in.buf + read, in.size - read);
662 block_next = _sb->pubseekoff(0, std::ios_base::cur,
664 block_next -= leftovers->len;
667 LOG(
"\tread " << read <<
" bytes");
696 const char* head = (
const char*)bz_header;
697 if (reinit_decompressor) {
699 int saved_buflen = z_strm->avail_out;
700 char *saved_buffer = z_strm->next_out;
701 int cret = ::BZ2_bzDecompressEnd(z_strm);
703 LOG(
"\tERROR: BZ2_bzDecompressEnd returned " << cret);
705 z_strm->bzalloc = NULL;
706 z_strm->bzfree = NULL;
707 z_strm->opaque = NULL;
708 cret = ::BZ2_bzDecompressInit(z_strm, 0, 0);
710 LOG(
"\terror creating bz2stream " << cret);
713 if (strncmp(head, in.buf, 8) != 0) {
717 for (hdr = 0; hdr < 8; ++hdr) {
718 splice = bz_header_length[hdr] - 5;
719 const char* shead = (
const char*)bz_header[hdr];
720 for (match = 1; match < 10; ++match) {
721 if (strncmp(&in.buf[match], &shead[splice], 5) == 0)
728 LOG(
"\tbz2 stream format error on input");
729 raise_error(BZ_DATA_ERROR_MAGIC);
731 char dummy_buffer[10];
732 z_strm->next_out = dummy_buffer;
733 z_strm->avail_out = 8;
735 z_strm->avail_out = 9;
737 in.buf[--match] = (
const char)bz_header[hdr][--splice];
738 z_strm->avail_in = splice;
739 z_strm->next_in = (
char*)bz_header[hdr];
742 z_strm->avail_out = saved_buflen;
743 z_strm->next_out = saved_buffer;
745 z_strm->next_in = in.buf;
746 z_strm->avail_in = read;
750 void istreambuf::decompress() {
751 LOG(
"bz::istreambuf::decompress ");
753 int cret = ::BZ2_bzDecompress(z_strm);
755 const int* head = (
const int*)bz_header;
756 int* buf = (
int*)z_strm->next_in;
758 if (BZ_STREAM_END == cret) {
759 z_strm->avail_in = 0;
762 else if (cret == BZ_DATA_ERROR && z_strm->avail_in == 0) {
768 else if (cret == BZ_DATA_ERROR && z_strm->avail_in == 4 && *buf == *head)
775 z_strm->avail_in = 0;
779 else if (BZ_OK != cret) {
780 printf(
"bz2 input stream crapping out, cret is %d\n", cret);
781 LOG(
"\terror decompressing: " << cret);
786 istreambuf::~istreambuf() {
787 LOG(
"bz::~istreambuf");
789 int cret = ::BZ2_bzDecompressEnd(z_strm);
791 LOG(
"\tERROR: BZ2_bzDecompressEnd returned " << cret);
if(locHist_BCALShowerPhiVsZ!=NULL)
debugging/logging support
C++ streambuf interface to read and write bzip2 streams.
static const size_t block_size
printf("string=%s", string)
exceptions related to bzlib usage xstream::bz namespace