Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HDET.cc
Go to the documentation of this file.
1 // $Id$
2 //
3 // File: HDET.cc
4 // Created: Fri Apr 22 07:05:40 EDT 2016
5 // Creator: davidl (on Darwin harriet 13.4.0 i386)
6 //
7 
8 #include <cmath>
9 #include <vector>
10 #include <iomanip>
11 using namespace std;
12 
13 #include <JANA/JException.h>
14 using namespace jana;
15 
16 #include "HDET.h"
17 #include "swap_bank.h"
18 
19 //---------------------------------
20 // HDET (Constructor)
21 //---------------------------------
22 HDET::HDET(string source_name, int ET_STATION_NEVENTS, bool ET_STATION_CREATE_BLOCKING):
23  source_name(source_name)
24  ,ET_STATION_NEVENTS(ET_STATION_NEVENTS)
25  ,ET_STATION_CREATE_BLOCKING(ET_STATION_CREATE_BLOCKING)
26 {
27 
28  VERBOSE = 0;
29  err_code = HDET_OK;
30  is_connected = false;
31  swap_needed = false;
32 
33  Net_events = 0;
34  Nevio_blocks = 0;
35  Nevio_events = 0;
36  Net_timeouts = 0;
37 
38 #ifndef HAVE_ET
39 
40  // Not compile with ET support
41  cerr << endl;
42  cerr << "=== ERROR: ET source specified and this was compiled without ===" << endl;
43  cerr << "=== ET support. You need to install ET and set your ===" << endl;
44  cerr << "=== ETROOT environment variable appropriately before ===" << endl;
45  cerr << "=== recompiling. ===" << endl;
46  cerr << endl;
47  throw JException("Compiled without ET support " + this->source_name, __FILE__, __LINE__);
48 
49 #else // HAVE_ET
50 
51 
52  /// Format for ET source strings is:
53  ///
54  /// ET:session:station:host:port
55  ///
56  /// The session is used to form the filename of the ET
57  /// system. For example, if an session of "eb" is specified,
58  /// then a file named "/tmp/et_sys_eb" is assumed to be
59  /// what should be opened. If no session is specified (or
60  /// an empty session name) then "none" is used as the session.
61  /// If the session string starts with a slash (/) then it
62  /// is used as the full filename path.
63  ///
64  /// If the station name specified does not exist, it will
65  /// be created. If it does exist, the existing station will
66  /// be used. If no station is specified, then the station
67  /// name "DANA" will be used. Any station created will be
68  /// set to "blocking" *unless* the configuration paramter
69  /// EVIO:ET_STATION_CREATE_BLOCKING is set to "0"
70  /// in which case it will be set to non-blocking.
71  ///
72  /// If the host is specified, then an attempt will be made
73  /// to open that system. If it is not specified, then
74  /// it will attempt to open an ET system on the local machine.
75  ///
76  /// If port is specified, it is used as the TCP port number
77  /// on the remote host to attach to. If the host is not
78  /// specified (i.e. by having two colons and therefore
79  /// an empty string) then the port is ignored. If the
80  /// port is omitted or specified as "0", then the default
81  /// port is used.
82  ///
83 
84  // Split source name into session, station, etc...
85  vector<string> fields;
86  string str = source_name;
87  size_t startpos=0, endpos=0;
88  while((endpos = str.find(":", startpos)) != str.npos){
89  size_t len = endpos-startpos;
90  fields.push_back(len==0 ? "":str.substr(startpos, len));
91  startpos = endpos+1;
92  }
93  if(startpos<str.length()) fields.push_back(str.substr(startpos, str.npos));
94 
95  string session = fields.size()>1 ? fields[1]:"";
96  string station = fields.size()>2 ? fields[2]:"";
97  string host = fields.size()>3 ? fields[3]:"localhost";
98  int port = fields.size()>4 ? atoi(fields[4].c_str()):ET_SERVER_PORT;
99 
100  if(session == "") session = "none";
101  if(station == "") station = "DANA";
102  if(host == "") host = "localhost";
103  string fname = session.at(0)=='/' ? session:(string("/tmp/et_sys_") + session);
104 
105  // Report to user what we're doing
106  cout << " Opening ET system:" << endl;
107  if(session!=fname) cout << " session: " << session << endl;
108  cout << " station: " << station << endl;
109  cout << " system file: " << fname << endl;
110  cout << " host: " << host << endl;
111  if(port !=0) cout << " port: " << port << endl;
112 
113  // connect to the ET system
114  et_openconfig openconfig;
115  et_open_config_init(&openconfig);
116  if(host != ""){
117  if(host.find("239.")==0){
118  cout<<__FILE__<<":"<<__LINE__<<" Configuring input ET for multicast" << endl;
119  et_open_config_setcast(openconfig, ET_MULTICAST);
120  et_open_config_addmulticast(openconfig, host.c_str());
121  et_open_config_sethost(openconfig, ET_HOST_ANYWHERE);
122  et_open_config_setport(openconfig, port);
123  struct timespec tspec={5,5};
124  et_open_config_settimeout(openconfig, tspec);
125  et_open_config_setwait(openconfig, ET_OPEN_WAIT);
126  }else{
127  cout<<__FILE__<<":"<<__LINE__<<" Configuring input ET for direct connection" << endl;
128  et_open_config_setcast(openconfig, ET_DIRECT);
129  et_open_config_setmode(openconfig, ET_HOST_AS_LOCAL); // ET_HOST_AS_LOCAL or ET_HOST_AS_REMOTE
130  et_open_config_sethost(openconfig, host.c_str());
131  et_open_config_setport(openconfig, ET_BROADCAST_PORT);
132  if(port != 0)et_open_config_setserverport(openconfig, port);
133  }
134  }
135  int err = et_open(&sys_id,fname.c_str(),openconfig);
136  if(err != ET_OK){
137  cerr << __FILE__<<":"<<__LINE__<<" Problem opening ET system"<<endl;
138  cerr << et_perror(err);
139  return;
140  }
141 
142  // create station config in case no station exists
143  et_statconfig et_station_config;
144  et_station_config_init(&et_station_config);
145  et_station_config_setblock(et_station_config, ET_STATION_CREATE_BLOCKING ? ET_STATION_BLOCKING:ET_STATION_NONBLOCKING);
146  et_station_config_setselect(et_station_config,ET_STATION_SELECT_ALL);
147  et_station_config_setuser(et_station_config,ET_STATION_USER_MULTI);
148  et_station_config_setrestore(et_station_config,ET_STATION_RESTORE_OUT);
149  et_station_config_setcue(et_station_config,ET_STATION_NEVENTS);
150  et_station_config_setprescale(et_station_config,1);
151  cout<<"ET station configured\n";
152 
153  // create station if not already created
154  int status=et_station_create(sys_id,&sta_id,station.c_str(),et_station_config);
155  if((status!=ET_OK)&&(status!=ET_ERROR_EXISTS)) {
156  et_close(sys_id);
157  cerr << "Unable to create station " << station << endl;
158  cerr << et_perror(status);
159 
160  // Check that the number of events in the ET system is not
161  // less than the number of events we specified for the station CUE.
162  int Nevents = 0;
163  et_system_getnumevents(sys_id, &Nevents);
164  if(Nevents <= ET_STATION_NEVENTS){
165  cerr << "NOTE: The number of events specified for the station cue is equal to" << endl;
166  cerr << "or greater than the number of events in the entire ET system:" << endl;
167  cerr << endl;
168  cerr << " " << ET_STATION_NEVENTS << " >= " << Nevents << endl;
169  cerr << endl;
170  cerr << "Try re-running with: " << endl;
171  cerr << endl;
172  cerr << " -PEVIO:ET_STATION_NEVENTS=" << (Nevents+1)/2 << endl;
173  cerr << endl;
174  }
175  return;
176  }
177  if(status==ET_ERROR_EXISTS){
178  cout << " Using existing ET station " << station << endl;
179  }else{
180  cout << " ET station " << station << " created\n";
181  }
182 
183  // Attach to the ET station
184  status=et_station_attach(sys_id,sta_id,&att_id);
185  if(status!=ET_OK) {
186  et_close(sys_id);
187  cerr << "Unable to attach to station " << station << endl;
188  return;
189  }
190 
191  cout << "...now connected to ET system: " << fname
192  << ", station: " << station << " (station id=" << sta_id << ", attach id=" << att_id <<")" << endl;
193 
194  is_connected = true;
195 
196  // Make sure the size of event buffers we will allocate are at least as big
197  // as the event size used in the ET system
198  size_t eventsize;
199  et_system_geteventsize(sys_id, &eventsize);
200  cout<<" ET system event size:"<<eventsize<<endl;
201 
202 
203 #endif // HAVE_ET
204 }
205 
206 //---------------------------------
207 // ~HDET (Destructor)
208 //---------------------------------
210 {
211 #ifdef HAVE_ET
212  if(is_connected) et_close(sys_id);
213  is_connected = false;
214 #endif
215 
216  for(auto p : et_buff_pool) delete[] p.first;
217  et_buff_pool.clear();
218 }
219 
220 //----------------
221 // read
222 //----------------
223 bool HDET::read(uint32_t* &buff, uint32_t &buff_len, bool allow_swap)
224 {
225  /// Read an event from the connected ET system.
226  /// One ET event may contain multiple EVIO events. This method needs
227  /// to return a single event in the given buffer (or a non-zero error
228  /// code). To accomodate this, a single ET event is placed into multiple
229  /// buffers so subsequent calls to this method can return one of those
230  /// until another ET read is needed.
231  ///
232  /// Note that the buff and bufflen parameters passed in here are references
233  /// to the buffer in the worker thread that will be assigned to this
234  /// event. In order to save time, we swap that buffer with one from our
235  /// pool. Pool buffers are grown in size as needed to hold events as they
236  /// are copied from the ET event.
237 #ifndef HAVE_ET
238  return HDET_NO_ET_SUPPORT;
239 #else
240 
241  // Note that this should only be called from the dispatcher thread
242  // so no lock is needed for manipulating pool objects.
243 
244  if( et_buffs.empty() ){
245 
246  if(VERBOSE>3) cout << "HDET: et_buffs empty. Will read new ET event ..." << endl;
247 
248  // No event buffers ready, read in another ET event
249  int TIMEOUT = 2;
250  struct timespec timeout;
251  timeout.tv_sec = (unsigned int)floor(TIMEOUT); // set ET timeout
252  timeout.tv_nsec = (unsigned int)floor(1.0E9*(TIMEOUT-(float)timeout.tv_sec));
253  et_event *pe=NULL;
254  int err = et_event_get(sys_id, att_id, &pe, ET_TIMED , &timeout);
255  if(err == ET_ERROR_TIMEOUT) {Net_timeouts++; return (err_code=HDET_TIMEOUT);}
256  Net_events++;
257 
258  // Read ET event
259  uint32_t *et_buff = NULL;
260  size_t et_len=0;
261  et_event_getdata(pe, (void**)&et_buff);
262  et_event_getlength(pe, &et_len);
263 
264  if(VERBOSE>3) cout << "HDET: read ET event with total length of " << et_len << " bytes (" << et_len/4 << " words)" << endl;
265 
266  // A single ET event may have multiple EVIO blocks in it
267  // Each block may have several EVIO events in it.
268  //
269  // (note that "block" here is not the same as the CODA
270  // "block number". That one determines the number of
271  // entangled events within the EVIO event and is dealt
272  // with later while parsing the EVIO event itself.)
273  //
274  // We need to loop over EVIO blocks in this ET event
275  // and then loop over EVIO events within the block.
276 
277  // Loop over EVIO blocks in ET event
278  size_t et_idx=0;
279  while(et_idx < et_len/4){
280 
281  // Pointer to start of EVIO block header
282  if(VERBOSE>3)cout << "HDET: Looking for EVIO block header at et_idx=" << et_idx << endl;
283  uint32_t *evio_block = &et_buff[et_idx];
284 
285  if(VERBOSE>5) PrintEVIOBlockHeader(evio_block);
286 
287  // Check byte order of event by looking at magic #
288  swap_needed = false;
289  uint32_t magic = evio_block[7];
290  switch(magic){
291  case 0xc0da0100: swap_needed = false; break;
292  case 0x0001dac0: swap_needed = true; break;
293  default:
294  cout << "HDET: EVIO magic word not present!" << endl;
295  return (err_code=HDET_ERROR);
296  }
297  Nevio_blocks++;
298  uint32_t len = evio_block[0];
299  if(swap_needed) len = swap32(len);
300  if(VERBOSE>3){
301  cout << "HDET: Swapping is " << (swap_needed ? "":"not ") << "needed" << endl;
302  cout << "HDET: Num. words in EVIO buffer: "<<len<<endl;
303  }
304 
305  bool is_last_evio_block = (evio_block[5]>>(9+8))&0x1;
306  if(VERBOSE>3)cout << "HDET: Is last EVIO block?: " << is_last_evio_block << endl;
307 
308  // Loop over all evio events in ET event
309  uint32_t idx = 8; // point to first EVIO event in block
310  while(idx<len){
311 
312  // Size of events in bytes
313  uint32_t mylen = swap_needed ? swap32(evio_block[idx]):evio_block[idx];
314 
315  if(VERBOSE>3)cout << "HDET: Looking for EVIO event at idx="<<idx<<" (mylen="<<mylen<<" words)" << endl;
316 
317  // Check that EVIO event length doesn't claim to
318  // extend past ET buffer.
319  if( (idx+mylen) > len ){
320  err_mess << "Bad word count while for event in ET event stack!" << endl;
321  err_mess << "idx="<<idx<<" mylen="<<mylen<<" len="<<len<<endl;
322  err_mess << "This indicates a problem either with the DAQ system"<<endl;
323  err_mess << "or this parser code! Contact davidl@jlab.org x5567 " <<endl;
324  return (err_code=HDET_BAD_FORMAT);
325  }
326  Nevio_events++;
327 
328  // Get buffer from pool. If it is not large enough for this
329  // event, then delete it and allocated a new one. Eventually,
330  // the pool will be filled with buffers that are the right size.
331  uint32_t *mybuff = NULL;
332  uint32_t mybuff_len = 0;
333  if(!et_buff_pool.empty()){
334  if(VERBOSE>3)cout << "HDET: Getting buffer from pool" << endl;
335  auto b = et_buff_pool.front();
336  et_buff_pool.pop_front();
337  mybuff = b.first;
338  mybuff_len = b.second;
339  if(mybuff_len < (mylen+1)){ // +1 for length word
340  if(VERBOSE>3)cout << "HDET: buffer too small ("<<mybuff_len<<" < "<<(mylen+1)<<") discarding so new one will be allocated ..." << endl;
341  delete[] mybuff;
342  mybuff = NULL;
343  }
344  }
345 
346  // if mybuff is NULL then either the pool was empty, or the buffer
347  // we got from it was too small. Allocate a buffer of the correct size.
348  if(mybuff==NULL){
349  mybuff_len = mylen+1;
350  if(VERBOSE>3)cout << "HDET: Allocating buffer of length: " << mybuff_len << " words" <<endl;
351  mybuff = new uint32_t[mybuff_len];
352  if(mybuff==NULL){
353  err_mess << "HDET: Failed to allocate buffer of length " << mybuff_len << " words";
354  return (err_code=HDET_ALLOC_FAILED);
355  }
356  }
357 
358  // Copy event into "buff", byte swapping if needed.
359  // If no swapping is needed, we just copy it all over
360  // in one go.
361  if(VERBOSE>3 && swap_needed && !allow_swap) cout << "HDET: Swapping is needed, but user does not allow." << endl;
362  if( swap_needed && allow_swap ){
363  if(VERBOSE>3)cout << "HDET: swapping EVIO buffer ... " <<endl;
364  swap_bank(mybuff, &evio_block[idx], mylen+1);
365  }else{
366  if(VERBOSE>3)cout << "HDET: copying EVIO buffer without swapping ... " <<endl;
367  memcpy(mybuff, &evio_block[idx], (mylen+1)*4);
368  }
369 
370  // Add event to list
371  et_buffs.push_back( pair<uint32_t*,uint32_t>(mybuff, mybuff_len) );
372 
373  // Update pointer to next EVIO event in stack (if any)
374  idx += mylen+1;
375  }
376 
377  // bump index to next EVIO block
378  et_idx += idx;
379  if(VERBOSE>3)cout << "HDET: EVIO events found so far: " << et_buffs.size() << endl;
380  if(is_last_evio_block){
381  if(VERBOSE>3) cout << "HDET: Block flagged as last in ET event. Ignoring last " << (et_len/4 - et_idx) << " words" <<endl;
382  break;
383  }
384  }
385 
386  // Put ET event back since we're done with it
387  if(VERBOSE>5)cout << "HDET: returning ET event to system" << endl;
388  et_event_put(sys_id, att_id, pe);
389 
390  if(VERBOSE>3) cout << "HDET: Found " << et_buffs.size() << " events in the ET event stack." << endl;
391 
392  } // if( et_buffs.empty() )
393 
394  if(VERBOSE>3) cout << "HDET: number of et event buffers: " << et_buffs.size() << endl;
395 
396  // If we still have no events then something has gone wrong!
397  if( et_buffs.empty() ) return (err_code=HDET_ERROR);
398 
399  // recycle worker thread's old buffer to our pool
400  et_buff_pool.push_back(pair<uint32_t*, uint32_t>( buff, buff_len));
401 
402  // give our event buffer to worker thread
403  auto p = et_buffs.front();
404  et_buffs.pop_front();
405  buff = p.first;
406  buff_len = p.second;
407 
408  if(VERBOSE>9) DumpBinary(buff, &buff[buff_len], 256);
409 
410  return (err_code=HDET_OK);
411 #endif // HAVE_ET
412 }
413 
414 //------------------------
415 // PrintEVIOBlockHeader
416 //------------------------
417 void HDET::PrintEVIOBlockHeader(uint32_t *inbuff)
418 {
419  string swap_str = "(unknown, swapping bypassed)";
420  uint32_t magic = inbuff[7];
421  bool swap_needed = false;
422  switch(magic){
423  case 0xc0da0100: swap_str = "(without swapping)";
424  break;
425  case 0x0001dac0: swap_str = "(after swapping)";
426  swap_needed = true;
427  break;
428  }
429 
430  uint32_t buff[8];
431  if(swap_needed){
432  for(int i=0; i<8; i++) buff[i] = swap32(inbuff[i]);
433  }else{
434  for(int i=0; i<8; i++) buff[i] =inbuff[i];
435  }
436 
437  cout << endl;
438  cout << "EVIO Block Header: " << swap_str << endl;
439  cout << "------------------------" << endl;
440  cout << " Block Length: " << HexStr(buff[0]) << " (" << buff[0] << " words = " << (buff[0]>>(10-2)) << " kB)" << endl;
441  cout << " Block Number: " << HexStr(buff[1]) << endl;
442  cout << "Header Length: " << HexStr(buff[2]) << " (should be 0x00000008)" << endl;
443  cout << " Event Count: " << HexStr(buff[3]) << endl;
444  cout << " Reserved 1: " << HexStr(buff[4]) << endl;
445  cout << " Bit Info: " << HexStr(buff[5]>>8) << endl;
446  cout << " Version: " << HexStr(buff[5]&0xFF) << endl;
447  cout << " Reserved 3: " << HexStr(buff[6]) << endl;
448  cout << " Magic word: " << HexStr(buff[7]) << endl;
449 }
450 
451 //----------------
452 // PrintStats
453 //----------------
455 {
456  cout << endl;
457  cout << "ET Statistics for " << source_name << endl;
458  cout << "------------------------" << endl;
459  cout << " Net_events: " << Net_events << endl;
460  cout << " Nevio_blocks: " << Nevio_blocks << endl;
461  cout << " Nevio_events: " << Nevio_events << endl;
462  cout << " Net_timeouts: " << Net_timeouts << endl;
463  cout << endl;
464 }
465 
466 //----------------
467 // DumpBinary
468 //----------------
469 void HDET::DumpBinary(const uint32_t *iptr, const uint32_t *iend, uint32_t MaxWords, const uint32_t *imark)
470 {
471  /// This is used for debugging. It will print to the screen the words
472  /// starting at the address given by iptr and ending just before iend
473  /// or for MaxWords words, whichever comes first. If iend is NULL,
474  /// then MaxWords will be printed. If MaxWords is zero then it is ignored
475  /// and only iend is checked. If both iend==NULL and MaxWords==0, then
476  /// only the word at iptr is printed.
477 
478  cout << "HDET: Dumping binary: istart=" << hex << iptr << " iend=" << iend << " MaxWords=" << dec << MaxWords << endl;
479 
480  if(iend==NULL && MaxWords==0) MaxWords=1;
481  if(MaxWords==0) MaxWords = (uint32_t)0xffffffff;
482 
483  uint32_t Nwords=0;
484  while(iptr!=iend && Nwords<MaxWords){
485 
486  // line1 is hex and line2 is decimal
487  stringstream line1, line2;
488 
489  // print words in columns 8 words wide. First part is
490  // reserved for word number
491  uint32_t Ncols = 8;
492  line1 << setw(5) << Nwords;
493  line2 << string(5, ' ');
494 
495  // Loop over columns
496  for(uint32_t i=0; i<Ncols; i++, iptr++, Nwords++){
497 
498  if(iptr == iend) break;
499  if(Nwords>=MaxWords) break;
500 
501  stringstream iptr_hex;
502  iptr_hex << hex << "0x" << *iptr;
503 
504  string mark = (iptr==imark ? "*":" ");
505 
506  line1 << setw(12) << iptr_hex.str() << mark;
507  line2 << setw(12) << *iptr << mark;
508  }
509 
510  cout << line1.str() << endl;
511  cout << line2.str() << endl;
512  cout << endl;
513  }
514 }
515 
virtual ~HDET()
Definition: HDET.cc:209
uint32_t swap_bank(uint32_t *outbuff, uint32_t *inbuff, uint32_t len)
Definition: swap_bank.cc:15
uint64_t Nevio_blocks
Definition: HDET.h:56
const int Ncols
void PrintEVIOBlockHeader(uint32_t *buff)
Definition: HDET.cc:417
char str[256]
char string[256]
uint64_t Nevio_events
Definition: HDET.h:57
HDET(string source_name, int ET_STATION_NEVENTS=10, bool ET_STATION_CREATE_BLOCKING=false)
Definition: HDET.cc:22
uint32_t err_code
Definition: HDET.h:51
list< pair< uint32_t *, uint32_t > > et_buffs
Definition: HDET.h:70
void PrintStats(void)
Definition: HDET.cc:454
#define swap32(x)
Definition: HDEVIO.h:36
int VERBOSE
Definition: HDET.h:67
bool swap_needed
Definition: HDET.h:68
bool is_connected
Definition: HDET.h:49
stringstream err_mess
Definition: HDET.h:50
string source_name
Definition: HDET.h:48
list< pair< uint32_t *, uint32_t > > et_buff_pool
Definition: HDET.h:71
double Nevents
string HexStr(uint32_t v)
Definition: HDET.h:83
bool read(uint32_t *&buff, uint32_t &buff_len, bool allow_swap)
Definition: HDET.cc:223
void DumpBinary(const uint32_t *iptr, const uint32_t *iend, uint32_t MaxWords=0, const uint32_t *imark=NULL)
Definition: HDET.cc:469
uint64_t Net_timeouts
Definition: HDET.h:58
uint64_t Net_events
Definition: HDET.h:55