Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
z.cpp
Go to the documentation of this file.
1 #include <xstream/config.h>
2 #include <fstream>
3 
4 #if HAVE_LIBZ
5 
6 #include <algorithm>
7 #include <string.h>
8 #include <string>
9 #include <cstring>
10 
11 #include <xstream/z.h>
12 #include <xstream/except/z.h>
13 #include <stdexcept>
14 
15 #include <stdio.h>
16 #include <zlib.h>
17 #include <arpa/inet.h>
18 
19 #include <cassert>
20 
21 #include "debug.h"
22 
23 #define COMPRESSION_BLOCK_SIZE 32000
24 
// Critical-section helpers for access to the underlying streambuf.
//
// MUTEX_LOCK opens a brace scope and MUTEX_UNLOCK closes it, so the two
// must appear as a pair within a single block or the code does not
// compile; this is intentional, to make it hard to leave the lock held by
// mistake. Take the lock, do the operation, and release it as quickly as
// possible.
//
// When streambuf_mutex is null no locking happens at all. Otherwise the
// mutex it points to is locked and streambuf_mutex is set to null for the
// duration of the section, so a nested MUTEX_LOCK on the same object (for
// example sync() calling flush()) skips the lock instead of re-locking it.
//
// To return from inside a section, use MUTEX_ESCAPE before the return: it
// restores streambuf_mutex and unlocks without closing the scope.
// Nothing here is exception safe: an exception thrown between MUTEX_LOCK
// and MUTEX_UNLOCK leaves the mutex locked and streambuf_mutex null.

#define MUTEX_LOCK \
  { \
    pthread_mutex_t *saved_mutex = streambuf_mutex; \
    if (saved_mutex != 0) \
      pthread_mutex_lock(saved_mutex); \
    streambuf_mutex = 0;

#define MUTEX_ESCAPE \
    streambuf_mutex = saved_mutex; \
    if (saved_mutex != 0) \
      pthread_mutex_unlock(saved_mutex);

#define MUTEX_UNLOCK \
    MUTEX_ESCAPE \
  }
49 
50 namespace xstream {
51 namespace z {
52 
53 // define the standard header for a zlib stream that can be used
54 // to prime the inflate engine to start at an arbitrary block in
55 // an input stream
56 
57  static int z_header_length = 2;
58  static unsigned char z_header[2] = {0x78, 0x9c};
59 
60  struct pimpl: public z_stream {};
61 
62  static const int eof = std::streambuf::traits_type::eof();
63 
64  static inline int flush_macro(const flush_kind f) {
65  switch (f) {
66  case no_sync:
67  return Z_NO_FLUSH;
68  case normal_sync:
69  return Z_SYNC_FLUSH;
70  case full_sync:
71  return Z_FULL_FLUSH;
72  case finish_sync:
73  return Z_FINISH;
74  case block_sync:
75 #ifndef Z_BLOCK
76 # define Z_BLOCK 5
77 #endif
78  return Z_BLOCK;
79  default:
80  //should probably throw
81  return Z_NO_FLUSH;
82  }
83  }
84 
85  const char* error_str(int err) {
86  switch(err) {
87  case Z_MEM_ERROR:
88  return "out of memory";
89  case Z_VERSION_ERROR:
90  return "zlib version mismatch";
91  case Z_DATA_ERROR:
92  return "invalid or incomplete data";
93  case Z_STREAM_ERROR:
94  return "stream error";
95  case Z_NEED_DICT:
96  return "need dictionary";
97  case Z_STREAM_END:
98  return "stream end";
99  case Z_BUF_ERROR:
100  return "buffer error";
101  }
102 
103  return "unknown error";
104  }
105 
106 
107  common::common(std::streambuf * sb)
108  : xstream::common_buffer(sb), z_strm(0), block_start(0), block_offset(0),
109  streambuf_mutex(0)
110  {
111  LOG("z::common");
112 
113  z_strm = new pimpl;
114 
115  //initialize zlib structure
116  z_strm->zalloc = Z_NULL;
117  z_strm->zfree = Z_NULL;
118  z_strm->opaque = Z_NULL;
119  //buffers
120  z_strm->avail_out = out.size;
121  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
122 
123  z_strm->avail_in = 0;
124  z_strm->next_in = reinterpret_cast < Bytef* >(in.buf);
125 
126  }
127 
128  void common::grow_out (unsigned int factor) {
129 
130  const size_t taken = out.size - z_strm->avail_out;
131 
132  out.grow(factor);
133 
134  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf + taken);
135  z_strm->avail_out = out.size - taken;
136  }
137 
138  unsigned long int common::input_count() const {
139  return z_strm->total_in;
140  }
141 
142  unsigned long int common::output_count() const {
143  return z_strm->total_out;
144  }
145 
146  unsigned long int common::checksum() const {
147  return z_strm->adler;
148  }
149 
150  common::~common() {
151  LOG("z::~common");
152  delete z_strm;
153  }
154 
155  ostreambuf::ostreambuf (std::streambuf * sb)
156  : common(sb), level(Z_DEFAULT_COMPRESSION) {
157  LOG("z::ostreambuf without compression level");
158  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
159  init();
160  }
161 
162  ostreambuf::ostreambuf(std::streambuf *sb, int l)
163  : common(sb), level (l) {
164  LOG ("z::ostreambuf with compression level " << l);
165  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
166  init();
167  }
168 
169  void ostreambuf::raise_error(int err) {
170  std::string what = error_str(err);
171 
172  LOG("z::ostreambuf::raise_error (" << err << ") = " << what);
173 
174  if (what.size() > 0) {
175  throw compress_error(this, what);
176  } else {
177  throw compress_error(this);
178  }
179  }
180 
181  void ostreambuf::init() {
182  LOG ("z::ostreambuf::init");
183 
184  if (Z_DEFAULT_COMPRESSION == level || (level <= 9 && level >= 1)) {
185  int cret =::deflateInit(z_strm, level);
186  if (Z_OK != cret) {
187  LOG ("z::ostreambuf::init: error creating zstream " << cret);
188  //XXX exception ins constructor
189  raise_error(cret);
190  }
191  //initialize streambuf interface functions
192  setp(in.buf, in.buf + in.size);
193  } else {
194  char str[256];
195  sprintf(str, "invalid compression level %d", level);
196  throw std::domain_error(str);
197  }
198  }
199 
200  ostreambuf::~ostreambuf() {
201  LOG ("z::ostreambuf::~ostreambuf");
202  //sync (write remaining data)
203  flush(finish_sync);
204 
205  //sync underlying streambuf
206  MUTEX_LOCK
207  _sb->pubsync();
208  MUTEX_UNLOCK
209 
210  if (0 != z_strm) {
211  //XXX should I throw an exception in case of error?
212  //remember this is a destructor
213  //I should definitely LOG something
214  int cret = ::deflateEnd(z_strm);
215  if (Z_OK != cret){
216  LOG("z::~ostreambuf error dealocating zstream");
217  }
218  }
219  }
220 
221  int ostreambuf::sync () {
222  LOG ("z::ostreambuf::sync");
223  int ret;
224  MUTEX_LOCK
225  ret = flush(finish_sync);
226  _sb->pubsync();
227  MUTEX_UNLOCK
228  return ret;
229  }
230 
231 
232  int ostreambuf::overflow(int c) {
233  LOG ("z::ostreambuf::overflow(" << c << ")\t available=" << (available ()) << "\tEOF=" << eof);
234  if (eof == c) {
235  LOG ("\tEOF");
236  flush(no_sync);
237  return eof;
238  } else {
239  if (0 == available ()) {
240  LOG ("\t have to flush :[]");
241  flush(no_sync);
242  }
243  *pptr () = static_cast < char >(c);
244  pbump (1);
245  }
246  return c;
247  }
248 
249  std::streamsize ostreambuf::xsputn (const char *buffer, std::streamsize n) {
250  LOG ("z::ostreambuf::xsputn(" << buffer << "," << n << ")");
251 
252  return flush(no_sync, buffer, n);
253  }
254 
255  int ostreambuf::flush(flush_kind f, const char *appendbuf, int appendsize) {
256  LOG ("z::ostreambuf::flush(" << f << ")");
257  std::streamsize in_s = taken ();
258  LOG ("\tinput_size=" << in_s);
259 
260  //set up compression engine input feed
261  int written;
262  if (in_s > 0) {
263  z_strm->next_in = reinterpret_cast < Bytef* >(pbase());
264  z_strm->avail_in = in_s;
265  written = in_s;
266  } else if (appendsize > 0) {
267  z_strm->next_in = (Bytef*)appendbuf;
268  z_strm->avail_in = appendsize;
269  written = appendsize;
270  appendsize = 0;
271  } else {
272  z_strm->next_in = reinterpret_cast < Bytef* >(pbase());
273  z_strm->avail_in = 0;
274  written = 0;
275  }
276  block_offset += written;
277  if (block_offset > (std::streamoff)COMPRESSION_BLOCK_SIZE) {
278  f = (f == no_sync)? finish_sync : f;
279  }
280 
281  if (z_strm->avail_in + z_strm->total_in == 0)
282  return 0;
283 
284  bool redo = false;
285  bool reinit_deflator = false;
286 
287  do {
288  int cret;
289  redo = false;
290 
291  do {
292  cret = ::deflate(z_strm, flush_macro(f));
293 
294  if (finish_sync == f && Z_OK == cret) {
295  grow_out();
296  continue;
297  } else {
298  break;
299  }
300  } while (1);
301 
302  //error handling
303  if (f == finish_sync) {
304  if (Z_STREAM_END == cret) {
305  redo = false;
306  reinit_deflator = true;
307  }
308  else {
309  //serious error, throw exception
310  LOG ("\terror :" << cret);
311  }
312  } else if (Z_OK != cret) {
313  LOG ("\terror deflating " << cret);
314  //XXX throw exception here
315  raise_error(cret);
316  }
317 
318  if (f == finish_sync) { // only completed streams can be written
319  std::streamsize count = out.size - z_strm->avail_out;
320  if (count > 0) { // ignore empty blocks
321  LOG ("\twriting " << count << " bytes");
322  int size = htonl(count);
323  MUTEX_LOCK
324  const std::streamsize wrote = _sb->sputn((char*)&size, 4) +
325  _sb->sputn(out.buf, count);
326  if (wrote != count + 4) {
327  MUTEX_ESCAPE
328  LOG("\terror writting, only wrote " << wrote
329  << " but asked for " << count);
330  raise_error(Z_STREAM_ERROR);
331  }
332  block_start = _sb->pubseekoff(0, std::ios_base::cur,
333  std::ios_base::out);
334  block_offset = 0;
335  MUTEX_UNLOCK
336  }
337  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
338  z_strm->avail_out = out.size;
339  }
340 
341  if (0 == z_strm->avail_out) { // && 0 != z_strm->avail_in)
342  LOG("\tavail_out=0 => redo");
343  redo = true;
344  }
345 
346  if (!redo && appendbuf && appendsize > 0) {
347  z_strm->next_in = (Bytef*)appendbuf;
348  z_strm->avail_in = appendsize;
349  written += appendsize;
350  appendsize = 0;
351  redo = true;
352  }
353  }
354  while (redo);
355  assert (0 == z_strm->avail_in);
356 
357  if (reinit_deflator) {
358  int cret;
359  cret = ::deflateEnd(z_strm);
360  if (Z_OK != cret) {
361  LOG("\tERROR: deflateEnd returned " << cret);
362  raise_error(cret);
363  }
364  z_strm->zalloc = Z_NULL;
365  z_strm->zfree = Z_NULL;
366  z_strm->opaque = Z_NULL;
367  z_strm->avail_out = out.size;
368  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
369  z_strm->avail_in = 0;
370  z_strm->next_in = reinterpret_cast < Bytef* >(in.buf);
371  cret =::deflateInit(z_strm, level);
372  if (Z_OK != cret) {
373  LOG("\tERROR: deflateInit returned " << cret);
374  raise_error(cret);
375  }
376  }
377 
378  //reset buffer
379  setp(in.buf, in.buf + in.size);
380  return written;
381  }
382 
383  /////////////////////
384  // istream follows //
385  /////////////////////
386 
387  istreambuf::istreambuf (std::streambuf *sb, int *left, unsigned int left_size)
388  : common(sb), end(false), block_size(0), block_next(0),
389  new_block_start(0), new_block_offset(0),
390  leftovers(0)
391  {
392  LOG ("z::istreambuf");
393 
394  memset(z_strm, 0, sizeof(*z_strm));
395  int cret = ::inflateInit(z_strm);
396 
397  if (Z_OK != cret) {
398  LOG ("\terror creating zstream " << cret);
399  //XXX throw exception here
400  raise_error(cret);
401  }
402  //initialize streambuf interface functions
403  //first call will call uflow and this will set the buffer accordingly
404  //no buffering
405  setg(out.buf, out.buf, out.buf);
406  block_start = _sb->pubseekoff(0, std::ios_base::cur, std::ios_base::in);
407 
408  if (left_size >= sizeof(leftovers_buf)) {
409  leftovers = (leftovers_buf*)left;
410  }
411  else {
412  LOG("\terror - insufficient space for leftovers buffer");
413  raise_error(cret);
414  }
415  }
416 
417  void istreambuf::raise_error(int err) {
418  std::string what = error_str(err);
419 
420  LOG("z::istreambuf::raise_error (" << err << ") = " << what);
421 
422  if (what.size() > 0) {
423  throw decompress_error(this, what);
424  } else {
425  throw decompress_error(this);
426  }
427  }
428 
429  int istreambuf::underflow() {
430  LOG("z:istreambuf::underflow");
431 
432  if (end) {
433  LOG("\tend of stream (EOF)");
434  //signal the stream has reached it's end
435  return eof;
436  }
437 
438  if (new_block_start > 0 || new_block_offset > 0) {
439  if (block_start != new_block_start ||
440  block_offset > new_block_offset ||
441  block_size == 0)
442  {
443  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
444  z_strm->avail_out = 0;
445  read_inflate();
446  }
447  while (block_offset < new_block_offset) {
448  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
449  z_strm->avail_out = new_block_offset - block_offset;
450  if (z_strm->avail_out > out.size) {
451  z_strm->avail_out = out.size;
452  }
453  block_offset += z_strm->avail_out;
454  inflate();
455  }
456  new_block_start = 0;
457  new_block_offset = 0;
458  }
459 
460  z_strm->avail_out = out.size;
461  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
462 
463  if (0 < z_strm->avail_in) {
464  LOG("\tdata in queue, inflating");
465  inflate();
466  }
467  while (!end && z_strm->avail_out > 0) {
468  read_inflate();
469  }
470  if (end && z_strm->avail_out > 0) {
471  LOG("\tend of stream (EOF)");
472  //signal the stream has reached it's end
473  return eof;
474  }
475 
476  //set streambuf pointers
477  setg(out.buf, out.buf, reinterpret_cast <char*> (z_strm->next_out) );
478 
479  return int(out.buf[0]);
480  }
481 
482  //read to buffer in place (apart from data already buffered)
483  std::streamsize istreambuf::xsgetn(char *buffer, std::streamsize n) {
484  LOG("z::istreambuf::xsgetn (" << n << ")");
485 
486  if (new_block_start > 0 || new_block_offset > 0) {
487  if (block_start != new_block_start ||
488  block_offset > new_block_offset ||
489  block_size == 0)
490  {
491  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
492  z_strm->avail_out = 0;
493  read_inflate();
494  setg(out.buf, out.buf, out.buf);
495  }
496  else
497  {
498  std::streamsize available = egptr() - gptr();
499  int waste = new_block_offset - block_offset;
500  waste = (available < waste)? available : waste;
501  if (waste > 0) {
502  gbump(waste);
503  block_offset += waste;
504  }
505  }
506  while (block_offset < new_block_offset) {
507  z_strm->next_out = reinterpret_cast < Bytef* >(out.buf);
508  z_strm->avail_out = new_block_offset - block_offset;
509  if (z_strm->avail_out > out.size) {
510  z_strm->avail_out = out.size;
511  }
512  block_offset += z_strm->avail_out;
513  inflate();
514  }
515  new_block_start = 0;
516  new_block_offset = 0;
517  }
518 
519  //try to satisfy request from buffered input
520  std::streamsize available = egptr() - gptr();
521  int read = (available >= n)? n : available;
522  if (read) {
523  std::copy(gptr(), gptr() + read, buffer);
524  gbump(read);
525  block_offset += read;
526  }
527 
528  //inflate the rest directly into the user's buffer
529  if (read < n) {
530  if (end) {
531  LOG("\tend of stream (EOF)");
532  //signal the stream has reached it's end
533  return eof;
534  }
535 
536  z_strm->next_out = reinterpret_cast < Bytef* >(buffer) + read;
537  z_strm->avail_out = n - read;
538 
539  if (0 < z_strm->avail_in) {
540  inflate();
541  }
542  while (!end && z_strm->avail_out > 0) {
543  read_inflate();
544  }
545  if (end && z_strm->avail_out > 0) {
546  LOG("\tend of stream (EOF)");
547  //signal the stream has reached it's end
548  return eof;
549  }
550  block_offset += n - read;
551  }
552  return n;
553  }
554 
555  void istreambuf::read_inflate( const flush_kind f) {
556  LOG("z::istreambuf::read_inflate " << f);
557  bool reinit_inflator = false;
558  int read;
559  if (block_size < 0) { // stream has no blocksize markers
560  MUTEX_LOCK
561  if (new_block_start > 0) {
562  _sb->pubseekoff(new_block_start, std::ios_base::beg,
563  std::ios_base::in);
564  new_block_start = 0;
565  leftovers->len = 0;
566  block_next = 0;
567  end = false;
568  }
569  read = _sb->sgetn(in.buf, in.size);
570  MUTEX_UNLOCK
571  }
572  else { // look for prefixed blocksize: leading byte = 0
573  MUTEX_LOCK
574  if (new_block_start > 0) {
575  _sb->pubseekoff(new_block_start, std::ios_base::beg,
576  std::ios_base::in);
577  block_start = new_block_start;
578  new_block_start = 0;
579  leftovers->len = 0;
580  block_next = 0;
581  end = false;
582  }
583  else {
584  block_start = _sb->pubseekoff(0, std::ios_base::cur,
585  std::ios_base::in);
586  block_start -= leftovers->len;
587  }
588  reinit_inflator = (block_next != block_start);
589  read = leftovers->len;
590  if (read < 4) {
591  read += _sb->sgetn(leftovers->buf + read, 4 - read);
592  if (read != 4) {
593  end = true;
594  MUTEX_ESCAPE
595  return;
596  }
597  }
598  if (leftovers->buf[0] == 0) { // z blocks have prefixed blocksize
599  int *size = (int*)leftovers->buf;
600  block_size = ntohl(*size);
601  read -= 4;
602  if (reinit_inflator && read > 0) {
603  std::memcpy(in.buf, leftovers->buf + 4, read);
604  read += _sb->sgetn(in.buf + read, block_size - read);
605  }
606  else {
607  read = _sb->sgetn(in.buf, block_size - read);
608  }
609  leftovers->len = _sb->sgetn(leftovers->buf, 8);
610  if (leftovers->len > 4) {
611  std::memcpy(in.buf + read, leftovers->buf + 4,
612  leftovers->len - 4);
613  read += leftovers->len - 4;
614  }
615  }
616  else { // z blocks are jammed together, no blocksize available
617  read += _sb->sgetn(in.buf + read, in.size - read);
618  leftovers->len = 0;
619  block_size = -1;
620  }
621  block_next = _sb->pubseekoff(0, std::ios_base::cur,
622  std::ios_base::in);
623  block_next -= leftovers->len;
624  MUTEX_UNLOCK
625  }
626  LOG("\tread " << read << " bytes");
627  block_offset = 0;
628 
629  if (0 == read) {
630  end = true;
631  return;
632  }
633 
634  const char* head = (const char*)z_header;
635  if (reinit_inflator) {
636  int cret = ::inflateEnd(z_strm);
637  if (Z_OK != cret) {
638  LOG ("\terror terminating zstream " << cret);
639  //XXX throw exception here
640  raise_error(cret);
641  }
642  z_strm->zalloc = Z_NULL;
643  z_strm->zfree = Z_NULL;
644  z_strm->opaque = Z_NULL;
645  cret = ::inflateInit(z_strm);
646  if (Z_OK != cret) {
647  LOG ("\terror initializing zstream " << cret);
648  //XXX throw exception here
649  raise_error(cret);
650  }
651  if (strncmp(head, in.buf, z_header_length) != 0) {
652  z_strm->avail_in = z_header_length;
653  z_strm->next_in = reinterpret_cast < Bytef* >(z_header);
654  inflate(f); // inject the z stream header
655  }
656  }
657  z_strm->next_in = reinterpret_cast < Bytef* >(in.buf);
658  z_strm->avail_in = read;
659  inflate(f);
660  }
661 
662  void istreambuf::inflate(const flush_kind f) {
663  LOG("z::istreambuf::inflate " << f);
664 
665  int cret = ::inflate(z_strm, flush_macro(f));
666 
667  if (Z_STREAM_END == cret) {
668  z_strm->avail_in = 0;
669  block_next = 0;
670  }
671  else if (cret == Z_DATA_ERROR && z_strm->avail_in == 0) {
672  // Ignore CRC errors at the end of stream because we may not have
673  // started inflating at the beginning. We can rely on the CRC
674  // checks that are present within each compressed block anyway.
675  end = true;
676  }
677  else if (Z_OK != cret) {
678  printf("z input stream crapping out, cret is %d\n", cret);
679  LOG("\terror inflating: " << cret);
680  //XXX throw exception
681  raise_error(cret);
682  //can try to salvage some more data with inflateSync (on some cases)
683  }
684  }
685 
686  istreambuf::~istreambuf() {
687  LOG("z::~istreambuf");
688  if (0 != z_strm) {
689  //XXX should I throw an exception in case of error?
690  //remember this is a destructor
691  ::inflateEnd(z_strm);
692  }
693  }
694 
695 }//namespace z
696 }//namespace xstream
697 
698 #endif //zlib
char str[256]
debugging/logging support
char string[256]
sprintf(text,"Post KinFit Cut")
#define c
TF1 * f
Definition: FitGains.C:21
static const int eof
Definition: base64.cpp:14
C++ streambuf interface to read and write file formats supported by Zlib.
static const size_t block_size
Definition: src/md5.cpp:54
printf("string=%s", string)
#define LOG(s)
Definition: debug.h:30
exceptions related to zlib usage xstream::z namespace