Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
t_rest.cxx
Go to the documentation of this file.
1 //
2 // t_rest - standard suite of tests for hddm i/o performance,
3 // with the aim of validating the correct behavior
4 // and benchmarking the performance of the library
5 // in single/multi-threaded mode, with and without
6 // compression, in sequential and random access.
7 //
8 // author: richard.t.jones at uconn.edu
9 // version: august 12, 2016
10 //
11 // usage: t_rest [-n <events>] [-p <threads>] <input_restfile.hddm>
12 //
13 // notes:
14 // 1) The input file is read only once, and copies are made in the local
15 // directory in different compression modes. There should be enough
16 // space in the local directory to hold six copies of the entire
17 // input file, or its first N events if option -n is given: one
18 // single-threaded output and one multi-threaded output file for
19 // each compression mode: bz2, zlib, and none.
20 //
21 
22 #include <iostream>
23 #include <HDDM/hddm_r.hpp>
24 #include <fstream>
25 #include <map>
26 
27 #include <pthread.h>
28 #include <unistd.h>
29 #include <time.h>
30 
31 int max_test = 99;
32 int maxevents = -1;
33 int multithreads = 4;
34 float tread = 0;
35 
36 pthread_mutex_t lock;
37 
38 std::map<int, hddm_r::streamposition> postable;
39 
40 void *event_reader(void *arg) {
41  int tid = *((int**)arg)[0];
42  hddm_r::istream &fin = *((hddm_r::istream**)arg)[1];
43  hddm_r::HDDM rec;
44  while (fin >> rec) {
45  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
46  if (evno > 0 && evno % 10000 == 0)
47  printf(" event %d %d\n", evno, tid);
48  pthread_mutex_lock(&lock);
49  postable[evno] = fin.getPosition();
50  pthread_mutex_unlock(&lock);
51  }
52  return 0;
53 }
54 
55 void *event_writer(void *arg) {
56  int tid = *((int**)arg)[0];
57  hddm_r::istream &fin = *((hddm_r::istream**)arg)[1];
58  hddm_r::ostream &fout = *((hddm_r::ostream**)arg)[2];
59  hddm_r::HDDM rec;
60  while (fin >> rec) {
61  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
62  if (evno > 0 && evno % 10000 == 0)
63  printf(" event %d %d\n", evno, tid);
64  fout << rec;
65  }
66  return 0;
67 }
68 
69 void *random_reader(void *arg) {
70  int tid = *((int**)arg)[0];
71  hddm_r::istream &fin = *((hddm_r::istream**)arg)[1];
72  std::map<int, hddm_r::streamposition> &post =
73  *((std::map<int, hddm_r::streamposition>**)arg)[2];
74  std::map<int, hddm_r::streamposition>::iterator iter;
75  for (iter = post.begin(); iter != post.end(); ++iter) {
76  fin.setPosition(iter->second);
77  hddm_r::HDDM rec;
78  fin >> rec;
79  if (iter->first != rec.getReconstructedPhysicsEvent().getEventNo()) {
80  pthread_mutex_lock(&lock);
81  printf("bad event lookup for event %d in thread %d: ", iter->first, tid);
82  printf("requested %d but got %ld\n", iter->first,
83  rec.getReconstructedPhysicsEvent().getEventNo());
84  printf("requested (%ld,%d,%d) and got (%ld,%d,%d)\n",
85  post[iter->first].block_start,
86  post[iter->first].block_offset,
87  post[iter->first].block_status,
88  fin.getPosition().block_start,
89  fin.getPosition().block_offset,
90  fin.getPosition().block_status);
91  pthread_mutex_unlock(&lock);
92  break;
93  }
94  else if (fin.getPosition() != post[iter->first]) {
95  pthread_mutex_lock(&lock);
96  printf("bad record position for event %d: ", iter->first);
97  printf("requested (%ld,%d,%d) but got (%ld,%d,%d)\n",
98  post[iter->first].block_start,
99  post[iter->first].block_offset,
100  post[iter->first].block_status,
101  fin.getPosition().block_start,
102  fin.getPosition().block_offset,
103  fin.getPosition().block_status);
104  pthread_mutex_unlock(&lock);
105  break;
106  }
107  else if (iter->first > 0 && iter->first % 10000 == 0) {
108  printf(" event %d\n", iter->first);
109  }
110  }
111  return 0;
112 }
113 
114 void usage()
115 {
116  printf("usage: t_rest [options] <input_hddm_file>\n"
117  " where options may include any of the following:\n"
118  " -n <count> : stop after reading <count> events from the\n"
119  " input hddm file, default is the entire file\n"
120  " -p <count> : use <count> i/o threads in multi-thread tests,\n"
121  " default is 4\n"
122  " -h, --help : display this help message\n"
123  "\n");
124  exit(0);
125 }
126 
127 int main(int argc, const char *argv[])
128 {
129  int narg;
130  for (narg=1; narg < argc; ++narg) {
131  if (strncmp(argv[narg], "-n", 2) == 0) {
132  maxevents = atoi(argv[++narg]);
133  }
134  else if (strncmp(argv[narg], "--Nevents", 9) == 0) {
135  maxevents = atoi(argv[++narg]);
136  }
137  else if (strncmp(argv[narg], "-p", 2) == 0) {
138  multithreads = atoi(argv[++narg]);
139  }
140  else if (strncmp(argv[narg], "-Nthreads", 9) == 0) {
141  multithreads = atoi(argv[++narg]);
142  }
143  else if (strncmp(argv[narg], "-h", 2) == 0) {
144  usage();
145  }
146  else if (strncmp(argv[narg], "-?", 2) == 0) {
147  usage();
148  }
149  else if (strncmp(argv[narg], "--help", 6) == 0) {
150  usage();
151  }
152  else {
153  break;
154  }
155  }
156  if (narg == argc) {
157  usage();
158  }
159 
160  std::string rest_z = "t_rest_z.hddm";
161  std::string rest_bz2 = "t_rest_bz2.hddm";
162  std::string rest_none = "t_rest_none.hddm";
163 
164  std::map<int, hddm_r::streamposition> postable_bz2;
165  std::map<int, hddm_r::streamposition> postable_z;
166  std::map<int, hddm_r::streamposition> postable_none;
167 
168  pthread_mutex_init(&lock, 0);
169 
170  printf("test 0: copying records from input file\n");
171 
172  if (max_test > 0) {
173  std::ifstream ifs(argv[narg]);
174  hddm_r::istream fin(ifs);
175  std::ofstream ofs0(rest_none.c_str());
176  std::ofstream ofs1(rest_z.c_str());
177  std::ofstream ofs2(rest_bz2.c_str());
178  hddm_r::ostream fout0(ofs0);
179  hddm_r::ostream fout1(ofs1);
180  fout1.setCompression(hddm_r::k_z_compression);
181  hddm_r::ostream fout2(ofs2);
182  fout2.setCompression(hddm_r::k_bz2_compression);
183  hddm_r::HDDM rec;
184  while (fin >> rec) {
185  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
186  if (evno > 0 && evno % 10000 == 0)
187  printf(" event %d\n", evno);
188  fout0 << rec;
189  fout1 << rec;
190  fout2 << rec;
191  if (fin.getRecordsRead() == maxevents)
192  break;
193  }
194  }
195 
196  printf("test 1: reading single-threaded from input file, bz2 compression\n");
197 
198  if (max_test > 1) {
199  std::ifstream ifs(rest_bz2);
200  hddm_r::istream fin(ifs);
201  struct timespec t0;
202  clock_gettime(CLOCK_REALTIME, &t0);
203  hddm_r::HDDM rec;
204  while (fin >> rec) {
205  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
206  if (evno > 0 && evno % 10000 == 0)
207  printf(" event %d\n", evno);
208  postable_bz2[evno] = fin.getPosition();
209  }
210  struct timespec t1;
211  clock_gettime(CLOCK_REALTIME, &t1);
212  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
213  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
214  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
215  fin.getBytesRead() / tlapse / 1e6);
216  }
217 
218  printf("test 2: reading single-threaded from input file, zlib compression\n");
219 
220  if (max_test > 2) {
221  std::ifstream ifs(rest_z);
222  hddm_r::istream fin(ifs);
223  struct timespec t0;
224  clock_gettime(CLOCK_REALTIME, &t0);
225  hddm_r::HDDM rec;
226  while (fin >> rec) {
227  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
228  if (evno > 0 && evno % 10000 == 0)
229  printf(" event %d\n", evno);
230  postable_z[evno] = fin.getPosition();
231  }
232  struct timespec t1;
233  clock_gettime(CLOCK_REALTIME, &t1);
234  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
235  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
236  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
237  fin.getBytesRead() / tlapse / 1e6);
238  }
239 
240  printf("test 3: reading single-threaded from input file, no compression\n");
241 
242  if (max_test > 3) {
243  std::ifstream ifs(rest_none);
244  hddm_r::istream fin(ifs);
245  struct timespec t0;
246  clock_gettime(CLOCK_REALTIME, &t0);
247  hddm_r::HDDM rec;
248  while (fin >> rec) {
249  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
250  if (evno > 0 && evno % 10000 == 0)
251  printf(" event %d\n", evno);
252  postable_none[evno] = fin.getPosition();
253  }
254  struct timespec t1;
255  clock_gettime(CLOCK_REALTIME, &t1);
256  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
257  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
258  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
259  fin.getBytesRead() / tlapse / 1e6);
260  tread = tlapse;
261  }
262 
263  printf("test 4: writing single-threaded to output file, bz2 compression\n");
264 
265  if (max_test > 4) {
266  std::ifstream ifs(rest_none);
267  hddm_r::istream fin(ifs);
268  std::ofstream ofs("." + rest_bz2);
269  hddm_r::ostream fout(ofs);
270  fout.setCompression(hddm_r::k_bz2_compression);
271  struct timespec t0;
272  clock_gettime(CLOCK_REALTIME, &t0);
273  hddm_r::HDDM rec;
274  while (fin >> rec) {
275  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
276  if (evno > 0 && evno % 10000 == 0)
277  printf(" event %d\n", evno);
278  fout << rec;
279  }
280  struct timespec t1;
281  clock_gettime(CLOCK_REALTIME, &t1);
282  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
283  tlapse -= tread;
284  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
285  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
286  fout.getBytesWritten() / tlapse / 1e6);
287  }
288 
289  printf("test 5: writing single-threaded to output file, zlib compression\n");
290 
291  if (max_test > 5) {
292  std::ifstream ifs(rest_none);
293  hddm_r::istream fin(ifs);
294  std::ofstream ofs("." + rest_z);
295  hddm_r::ostream fout(ofs);
296  fout.setCompression(hddm_r::k_z_compression);
297  struct timespec t0;
298  clock_gettime(CLOCK_REALTIME, &t0);
299  hddm_r::HDDM rec;
300  while (fin >> rec) {
301  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
302  if (evno > 0 && evno % 10000 == 0)
303  printf(" event %d\n", evno);
304  fout << rec;
305  }
306  struct timespec t1;
307  clock_gettime(CLOCK_REALTIME, &t1);
308  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
309  tlapse -= tread;
310  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
311  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
312  fout.getBytesWritten() / tlapse / 1e6);
313  }
314 
315  printf("test 6: writing single-threaded to output file, no compression\n");
316 
317  if (max_test > 6) {
318  std::ifstream ifs(rest_none);
319  hddm_r::istream fin(ifs);
320  std::ofstream ofs("." + rest_none);
321  hddm_r::ostream fout(ofs);
322  struct timespec t0;
323  clock_gettime(CLOCK_REALTIME, &t0);
324  hddm_r::HDDM rec;
325  while (fin >> rec) {
326  int evno = rec.getReconstructedPhysicsEvent().getEventNo();
327  if (evno > 0 && evno % 10000 == 0)
328  printf(" event %d\n", evno);
329  fout << rec;
330  }
331  struct timespec t1;
332  clock_gettime(CLOCK_REALTIME, &t1);
333  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
334  tlapse -= tread;
335  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
336  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
337  fout.getBytesWritten() / tlapse / 1e6);
338  }
339 
340  printf("test 7: reading multi-threaded from input file, bz2 compression\n");
341 
342  if (max_test > 7) {
343  std::ifstream ifs("." + rest_bz2);
344  hddm_r::istream fin(ifs);
345  postable.clear();
346  std::list<pthread_t*> threads;
347  struct timespec t0;
348  clock_gettime(CLOCK_REALTIME, &t0);
349  for (int tid=0; tid < multithreads; ++tid) {
350  pthread_t *thread = new pthread_t;
351  void *args[2] = {&tid, &fin};
352  pthread_create(thread, 0, event_reader, args);
353  threads.push_back(thread);
354  }
355  std::list<pthread_t*>::iterator iter;
356  for (iter = threads.begin(); iter != threads.end(); ++iter) {
357  void *retval;
358  pthread_join(**iter, &retval);
359  delete *iter;
360  }
361  struct timespec t1;
362  clock_gettime(CLOCK_REALTIME, &t1);
363  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
364  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
365  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
366  fin.getBytesRead() / tlapse / 1e6);
367  }
368  std::map<int, hddm_r::streamposition> postable_dot_bz2(postable);
369 
370  printf("test 8: reading multi-threaded from input file, zlib compression\n");
371 
372  if (max_test > 8) {
373  std::ifstream ifs("." + rest_z);
374  hddm_r::istream fin(ifs);
375  postable.clear();
376  std::list<pthread_t*> threads;
377  struct timespec t0;
378  clock_gettime(CLOCK_REALTIME, &t0);
379  for (int tid=0; tid < multithreads; ++tid) {
380  pthread_t *thread = new pthread_t;
381  void *args[2] = {&tid, &fin};
382  pthread_create(thread, 0, event_reader, args);
383  threads.push_back(thread);
384  }
385  std::list<pthread_t*>::iterator iter;
386  for (iter = threads.begin(); iter != threads.end(); ++iter) {
387  void *retval;
388  pthread_join(**iter, &retval);
389  delete *iter;
390  }
391  struct timespec t1;
392  clock_gettime(CLOCK_REALTIME, &t1);
393  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
394  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
395  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
396  fin.getBytesRead() / tlapse / 1e6);
397  }
398  std::map<int, hddm_r::streamposition> postable_dot_z(postable);
399 
400  printf("test 9: reading multi-threaded from input file, no compression\n");
401 
402  if (max_test > 9) {
403  std::ifstream ifs("." + rest_none);
404  hddm_r::istream fin(ifs);
405  postable.clear();
406  std::list<pthread_t*> threads;
407  struct timespec t0;
408  clock_gettime(CLOCK_REALTIME, &t0);
409  for (int tid=0; tid < multithreads; ++tid) {
410  pthread_t *thread = new pthread_t;
411  void *args[2] = {&tid, &fin};
412  pthread_create(thread, 0, event_reader, args);
413  threads.push_back(thread);
414  }
415  std::list<pthread_t*>::iterator iter;
416  for (iter = threads.begin(); iter != threads.end(); ++iter) {
417  void *retval;
418  pthread_join(**iter, &retval);
419  delete *iter;
420  }
421  struct timespec t1;
422  clock_gettime(CLOCK_REALTIME, &t1);
423  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
424  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
425  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
426  fin.getBytesRead() / tlapse / 1e6);
427  }
428  std::map<int, hddm_r::streamposition> postable_dot_none(postable);
429 
430  printf("test 10: writing multi-threaded to output file, bz2 compression\n");
431 
432  if (max_test > 10) {
433  std::ifstream ifs(rest_none);
434  hddm_r::istream fin(ifs);
435  std::ofstream ofs(".." + rest_bz2);
436  hddm_r::ostream fout(ofs);
437  fout.setCompression(hddm_r::k_bz2_compression);
438  std::list<pthread_t*> threads;
439  struct timespec t0;
440  clock_gettime(CLOCK_REALTIME, &t0);
441  for (int tid=0; tid < multithreads; ++tid) {
442  pthread_t *thread = new pthread_t;
443  void *args[3] = {&tid, &fin, &fout};
444  pthread_create(thread, 0, event_writer, args);
445  threads.push_back(thread);
446  }
447  std::list<pthread_t*>::iterator iter;
448  for (iter = threads.begin(); iter != threads.end(); ++iter) {
449  void *retval;
450  pthread_join(**iter, &retval);
451  delete *iter;
452  }
453  struct timespec t1;
454  clock_gettime(CLOCK_REALTIME, &t1);
455  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
456  tlapse -= tread;
457  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
458  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
459  fout.getBytesWritten() / tlapse / 1e6);
460  }
461 
462  printf("test 11: writing multi-threaded to output file, zlib compression\n");
463 
464  if (max_test > 11) {
465  std::ifstream ifs(rest_none);
466  hddm_r::istream fin(ifs);
467  std::ofstream ofs(".." + rest_z);
468  hddm_r::ostream fout(ofs);
469  fout.setCompression(hddm_r::k_z_compression);
470  std::list<pthread_t*> threads;
471  struct timespec t0;
472  clock_gettime(CLOCK_REALTIME, &t0);
473  for (int tid=0; tid < multithreads; ++tid) {
474  pthread_t *thread = new pthread_t;
475  void *args[3] = {&tid, &fin, &fout};
476  pthread_create(thread, 0, event_writer, args);
477  threads.push_back(thread);
478  }
479  std::list<pthread_t*>::iterator iter;
480  for (iter = threads.begin(); iter != threads.end(); ++iter) {
481  void *retval;
482  pthread_join(**iter, &retval);
483  delete *iter;
484  }
485  struct timespec t1;
486  clock_gettime(CLOCK_REALTIME, &t1);
487  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
488  tlapse -= tread;
489  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
490  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
491  fout.getBytesWritten() / tlapse / 1e6);
492  }
493 
494  printf("test 12: writing multi-threaded to output file, no compression\n");
495 
496  if (max_test > 12) {
497  std::ifstream ifs(rest_none);
498  hddm_r::istream fin(ifs);
499  std::ofstream ofs(".." + rest_none);
500  hddm_r::ostream fout(ofs);
501  std::list<pthread_t*> threads;
502  struct timespec t0;
503  clock_gettime(CLOCK_REALTIME, &t0);
504  for (int tid=0; tid < multithreads; ++tid) {
505  pthread_t *thread = new pthread_t;
506  void *args[3] = {&tid, &fin, &fout};
507  pthread_create(thread, 0, event_writer, args);
508  threads.push_back(thread);
509  }
510  std::list<pthread_t*>::iterator iter;
511  for (iter = threads.begin(); iter != threads.end(); ++iter) {
512  void *retval;
513  pthread_join(**iter, &retval);
514  delete *iter;
515  }
516  struct timespec t1;
517  clock_gettime(CLOCK_REALTIME, &t1);
518  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
519  tlapse -= tread;
520  printf(" %d events written in %f seconds,", fout.getRecordsWritten(), tlapse);
521  printf(" %f ev/s, %f MB/s\n", fout.getRecordsWritten() / tlapse,
522  fout.getBytesWritten() / tlapse / 1e6);
523  }
524 
525  printf("test 13: reading from multi-threaded output file, bz2 compression\n");
526 
527  if (max_test > 13) {
528  std::ifstream ifs(".." + rest_bz2);
529  hddm_r::istream fin(ifs);
530  postable.clear();
531  std::list<pthread_t*> threads;
532  struct timespec t0;
533  clock_gettime(CLOCK_REALTIME, &t0);
534  for (int tid=0; tid < multithreads; ++tid) {
535  pthread_t *thread = new pthread_t;
536  void *args[2] = {&tid, &fin};
537  pthread_create(thread, 0, event_reader, args);
538  threads.push_back(thread);
539  }
540  std::list<pthread_t*>::iterator iter;
541  for (iter = threads.begin(); iter != threads.end(); ++iter) {
542  void *retval;
543  pthread_join(**iter, &retval);
544  delete *iter;
545  }
546  struct timespec t1;
547  clock_gettime(CLOCK_REALTIME, &t1);
548  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
549  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
550  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
551  fin.getBytesRead() / tlapse / 1e6);
552  }
553  std::map<int, hddm_r::streamposition> postable_dotdot_bz2(postable);
554 
555  printf("test 14: reading from multi-threaded output file, zlib compression\n");
556 
557  if (max_test > 14) {
558  std::ifstream ifs(".." + rest_z);
559  hddm_r::istream fin(ifs);
560  postable.clear();
561  std::list<pthread_t*> threads;
562  struct timespec t0;
563  clock_gettime(CLOCK_REALTIME, &t0);
564  for (int tid=0; tid < multithreads; ++tid) {
565  pthread_t *thread = new pthread_t;
566  void *args[2] = {&tid, &fin};
567  pthread_create(thread, 0, event_reader, args);
568  threads.push_back(thread);
569  }
570  std::list<pthread_t*>::iterator iter;
571  for (iter = threads.begin(); iter != threads.end(); ++iter) {
572  void *retval;
573  pthread_join(**iter, &retval);
574  delete *iter;
575  }
576  struct timespec t1;
577  clock_gettime(CLOCK_REALTIME, &t1);
578  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
579  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
580  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
581  fin.getBytesRead() / tlapse / 1e6);
582  }
583  std::map<int, hddm_r::streamposition> postable_dotdot_z(postable);
584 
585  printf("test 15: reading from multi-threaded output file, no compression\n");
586 
587  if (max_test > 15) {
588  std::ifstream ifs(".." + rest_none);
589  hddm_r::istream fin(ifs);
590  postable.clear();
591  std::list<pthread_t*> threads;
592  struct timespec t0;
593  clock_gettime(CLOCK_REALTIME, &t0);
594  for (int tid=0; tid < multithreads; ++tid) {
595  pthread_t *thread = new pthread_t;
596  void *args[2] = {&tid, &fin};
597  pthread_create(thread, 0, event_reader, args);
598  threads.push_back(thread);
599  }
600  std::list<pthread_t*>::iterator iter;
601  for (iter = threads.begin(); iter != threads.end(); ++iter) {
602  void *retval;
603  pthread_join(**iter, &retval);
604  delete *iter;
605  }
606  struct timespec t1;
607  clock_gettime(CLOCK_REALTIME, &t1);
608  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
609  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
610  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
611  fin.getBytesRead() / tlapse / 1e6);
612  }
613  std::map<int, hddm_r::streamposition> postable_dotdot_none(postable);
614 
615  printf("test 16: single-threaded random access reads, bz2 compression\n");
616 
617  if (max_test > 16) {
618  std::ifstream ifs(rest_bz2);
619  hddm_r::istream fin(ifs);
620  struct timespec t0;
621  clock_gettime(CLOCK_REALTIME, &t0);
622  std::map<int, hddm_r::streamposition>::iterator iter;
623  for (iter = postable_bz2.begin(); iter != postable_bz2.end(); ++iter) {
624  fin.setPosition(iter->second);
625  hddm_r::HDDM rec;
626  fin >> rec;
627  if (iter->first != rec.getReconstructedPhysicsEvent().getEventNo()) {
628  printf("bad event lookup for event %d in file %s: ",
629  iter->first, rest_bz2.c_str());
630  printf("requested %d but got %ld\n", iter->first,
631  rec.getReconstructedPhysicsEvent().getEventNo());
632  printf("requested (%ld,%d,%d) and got (%ld,%d,%d)\n",
633  iter->second.block_start,
634  iter->second.block_offset,
635  iter->second.block_status,
636  fin.getPosition().block_start,
637  fin.getPosition().block_offset,
638  fin.getPosition().block_status);
639  exit(1);
640  }
641  else if (fin.getPosition() != iter->second) {
642  printf("bad record position for event %d in file %s: ",
643  iter->first, rest_bz2.c_str());
644  printf("requested (%ld,%d,%d) but got (%ld,%d,%d)\n",
645  iter->second.block_start,
646  iter->second.block_offset,
647  iter->second.block_status,
648  fin.getPosition().block_start,
649  fin.getPosition().block_offset,
650  fin.getPosition().block_status);
651  exit(1);
652  }
653  else if (iter->first > 0 && iter->first % 1000 == 0 ) {
654  printf(" event %d\n", iter->first);
655  }
656  }
657  struct timespec t1;
658  clock_gettime(CLOCK_REALTIME, &t1);
659  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
660  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
661  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
662  fin.getBytesRead() / tlapse / 1e6);
663  }
664 
665  printf("test 17: single-threaded random access reads, z compression\n");
666 
667  if (max_test > 17) {
668  std::ifstream ifs(rest_z);
669  hddm_r::istream fin(ifs);
670  struct timespec t0;
671  clock_gettime(CLOCK_REALTIME, &t0);
672  std::map<int, hddm_r::streamposition>::iterator iter;
673  for (iter = postable_z.begin(); iter != postable_z.end(); ++iter) {
674  fin.setPosition(iter->second);
675  hddm_r::HDDM rec;
676  fin >> rec;
677  if (iter->first != rec.getReconstructedPhysicsEvent().getEventNo()) {
678  printf("bad event lookup for event %d in file %s: ",
679  iter->first, rest_z.c_str());
680  printf("requested %d but got %ld\n", iter->first,
681  rec.getReconstructedPhysicsEvent().getEventNo());
682  printf("requested (%ld,%d,%d) and got (%ld,%d,%d)\n",
683  iter->second.block_start,
684  iter->second.block_offset,
685  iter->second.block_status,
686  fin.getPosition().block_start,
687  fin.getPosition().block_offset,
688  fin.getPosition().block_status);
689  exit(1);
690  }
691  else if (fin.getPosition() != iter->second) {
692  printf("bad record position for event %d in file %s: ",
693  iter->first, rest_z.c_str());
694  printf("requested (%ld,%d,%d) but got (%ld,%d,%d)\n",
695  iter->second.block_start,
696  iter->second.block_offset,
697  iter->second.block_status,
698  fin.getPosition().block_start,
699  fin.getPosition().block_offset,
700  fin.getPosition().block_status);
701  exit(1);
702  }
703  else if (iter->first > 0 && iter->first % 1000 == 0 ) {
704  printf(" event %d\n", iter->first);
705  }
706  }
707  struct timespec t1;
708  clock_gettime(CLOCK_REALTIME, &t1);
709  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
710  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
711  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
712  fin.getBytesRead() / tlapse / 1e6);
713  }
714 
715  printf("test 18: single-threaded random access reads, no compression\n");
716 
717  if (max_test > 18) {
718  std::ifstream ifs(rest_none);
719  hddm_r::istream fin(ifs);
720  struct timespec t0;
721  clock_gettime(CLOCK_REALTIME, &t0);
722  std::map<int, hddm_r::streamposition>::iterator iter;
723  for (iter = postable_none.begin(); iter != postable_none.end(); ++iter) {
724  fin.setPosition(iter->second);
725  hddm_r::HDDM rec;
726  fin >> rec;
727  if (iter->first != rec.getReconstructedPhysicsEvent().getEventNo()) {
728  printf("bad event lookup for event %d in file %s: ",
729  iter->first, rest_none.c_str());
730  printf("requested %d but got %ld\n", iter->first,
731  rec.getReconstructedPhysicsEvent().getEventNo());
732  printf("requested (%ld,%d,%d) and got (%ld,%d,%d)\n",
733  iter->second.block_start,
734  iter->second.block_offset,
735  iter->second.block_status,
736  fin.getPosition().block_start,
737  fin.getPosition().block_offset,
738  fin.getPosition().block_status);
739  exit(1);
740  }
741  else if (fin.getPosition() != iter->second) {
742  printf("bad record position for event %d in file %s: ",
743  iter->first, rest_none.c_str());
744  printf("requested (%ld,%d,%d) but got (%ld,%d,%d)\n",
745  iter->second.block_start,
746  iter->second.block_offset,
747  iter->second.block_status,
748  fin.getPosition().block_start,
749  fin.getPosition().block_offset,
750  fin.getPosition().block_status);
751  exit(1);
752  }
753  else if (iter->first > 0 && iter->first % 1000 == 0 ) {
754  printf(" event %d\n", iter->first);
755  }
756  }
757  struct timespec t1;
758  clock_gettime(CLOCK_REALTIME, &t1);
759  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
760  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
761  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
762  fin.getBytesRead() / tlapse / 1e6);
763  }
764 
765  printf("test 19: multi-threaded random access reads, bz2 compression\n");
766 
767  if (max_test > 19) {
768  std::ifstream ifs(rest_bz2);
769  hddm_r::istream fin(ifs);
770  std::list<pthread_t*> threads;
771  struct timespec t0;
772  clock_gettime(CLOCK_REALTIME, &t0);
773  for (int tid=0; tid < multithreads; ++tid) {
774  pthread_t *thread = new pthread_t;
775  void *args[3] = {&tid, &fin, &postable_bz2};
776  pthread_create(thread, 0, random_reader, args);
777  threads.push_back(thread);
778  }
779  std::list<pthread_t*>::iterator iter;
780  for (iter = threads.begin(); iter != threads.end(); ++iter) {
781  void *retval;
782  pthread_join(**iter, &retval);
783  delete *iter;
784  }
785  struct timespec t1;
786  clock_gettime(CLOCK_REALTIME, &t1);
787  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
788  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
789  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
790  fin.getBytesRead() / tlapse / 1e6);
791  }
792 
793  printf("test 20: multi-threaded random access reads, z compression\n");
794 
795  if (max_test > 20) {
796  std::ifstream ifs(rest_z);
797  hddm_r::istream fin(ifs);
798  std::list<pthread_t*> threads;
799  struct timespec t0;
800  clock_gettime(CLOCK_REALTIME, &t0);
801  for (int tid=0; tid < multithreads; ++tid) {
802  pthread_t *thread = new pthread_t;
803  void *args[3] = {&tid, &fin, &postable_z};
804  pthread_create(thread, 0, random_reader, args);
805  threads.push_back(thread);
806  }
807  std::list<pthread_t*>::iterator iter;
808  for (iter = threads.begin(); iter != threads.end(); ++iter) {
809  void *retval;
810  pthread_join(**iter, &retval);
811  delete *iter;
812  }
813  struct timespec t1;
814  clock_gettime(CLOCK_REALTIME, &t1);
815  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
816  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
817  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
818  fin.getBytesRead() / tlapse / 1e6);
819  }
820 
821  printf("test 21: multi-threaded random access reads, no compression\n");
822 
823  if (max_test > 21) {
824  std::ifstream ifs(rest_none);
825  hddm_r::istream fin(ifs);
826  std::list<pthread_t*> threads;
827  struct timespec t0;
828  clock_gettime(CLOCK_REALTIME, &t0);
829  for (int tid=0; tid < multithreads; ++tid) {
830  pthread_t *thread = new pthread_t;
831  void *args[3] = {&tid, &fin, &postable_none};
832  pthread_create(thread, 0, random_reader, args);
833  threads.push_back(thread);
834  }
835  std::list<pthread_t*>::iterator iter;
836  for (iter = threads.begin(); iter != threads.end(); ++iter) {
837  void *retval;
838  pthread_join(**iter, &retval);
839  delete *iter;
840  }
841  struct timespec t1;
842  clock_gettime(CLOCK_REALTIME, &t1);
843  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
844  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
845  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
846  fin.getBytesRead() / tlapse / 1e6);
847  }
848 
849  printf("test 22: multi-threaded random access reads, mt-bz2 compression\n");
850 
851  if (max_test > 22) {
852  std::ifstream ifs(".." + rest_bz2);
853  hddm_r::istream fin(ifs);
854  std::list<pthread_t*> threads;
855  struct timespec t0;
856  clock_gettime(CLOCK_REALTIME, &t0);
857  for (int tid=0; tid < multithreads; ++tid) {
858  pthread_t *thread = new pthread_t;
859  void *args[3] = {&tid, &fin, &postable_dotdot_bz2};
860  pthread_create(thread, 0, random_reader, args);
861  threads.push_back(thread);
862  }
863  std::list<pthread_t*>::iterator iter;
864  for (iter = threads.begin(); iter != threads.end(); ++iter) {
865  void *retval;
866  pthread_join(**iter, &retval);
867  delete *iter;
868  }
869  struct timespec t1;
870  clock_gettime(CLOCK_REALTIME, &t1);
871  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
872  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
873  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
874  fin.getBytesRead() / tlapse / 1e6);
875  }
876 
877  printf("test 23: multi-threaded random access reads, mt-z compression\n");
878 
879  if (max_test > 23) {
880  std::ifstream ifs(".." + rest_z);
881  hddm_r::istream fin(ifs);
882  std::list<pthread_t*> threads;
883  struct timespec t0;
884  clock_gettime(CLOCK_REALTIME, &t0);
885  for (int tid=0; tid < multithreads; ++tid) {
886  pthread_t *thread = new pthread_t;
887  void *args[3] = {&tid, &fin, &postable_dotdot_z};
888  pthread_create(thread, 0, random_reader, args);
889  threads.push_back(thread);
890  }
891  std::list<pthread_t*>::iterator iter;
892  for (iter = threads.begin(); iter != threads.end(); ++iter) {
893  void *retval;
894  pthread_join(**iter, &retval);
895  delete *iter;
896  }
897  struct timespec t1;
898  clock_gettime(CLOCK_REALTIME, &t1);
899  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
900  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
901  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
902  fin.getBytesRead() / tlapse / 1e6);
903  }
904 
905  printf("test 24: multi-threaded random access reads, mt-no compression\n");
906 
907  if (max_test > 24) {
908  std::ifstream ifs(".." + rest_none);
909  hddm_r::istream fin(ifs);
910  std::list<pthread_t*> threads;
911  struct timespec t0;
912  clock_gettime(CLOCK_REALTIME, &t0);
913  for (int tid=0; tid < multithreads; ++tid) {
914  pthread_t *thread = new pthread_t;
915  void *args[3] = {&tid, &fin, &postable_dotdot_none};
916  pthread_create(thread, 0, random_reader, args);
917  threads.push_back(thread);
918  }
919  std::list<pthread_t*>::iterator iter;
920  for (iter = threads.begin(); iter != threads.end(); ++iter) {
921  void *retval;
922  pthread_join(**iter, &retval);
923  delete *iter;
924  }
925  struct timespec t1;
926  clock_gettime(CLOCK_REALTIME, &t1);
927  float tlapse = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec)*1e-9;
928  printf(" %d events read in %f seconds,", fin.getRecordsRead(), tlapse);
929  printf(" %f ev/s, %f MB/s\n", fin.getRecordsRead() / tlapse,
930  fin.getBytesRead() / tlapse / 1e6);
931  }
932 }
int maxevents
Definition: t_rest.cxx:32
void * event_reader(void *arg)
Definition: t_rest.cxx:40
void * random_reader(void *arg)
Definition: t_rest.cxx:69
float tread
Definition: t_rest.cxx:34
pthread_mutex_t lock
Definition: t_rest.cxx:36
char string[256]
int multithreads
Definition: t_rest.cxx:33
void usage()
Definition: t_rest.cxx:114
TEllipse * e
void * event_writer(void *arg)
Definition: t_rest.cxx:55
std::map< int, hddm_r::streamposition > postable
Definition: t_rest.cxx:38
int max_test
Definition: t_rest.cxx:31
printf("string=%s", string)
int main(int argc, char *argv[])
Definition: gendoc.cc:6