Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
JEventProcessor_danaevio.cc
Go to the documentation of this file.
1 // JEventProcessor_danaevio.cc
2 //
3 //
4 // JANA event processor plugin writes out evio DOM tree to file OR sends
5 // events to receiver (e.g. event display) via TCP socket
6 //
7 //
8 // Implements JANA command-line parameters:
9 //
10 // EVIO:FILENAME output file name, default "dana_events.evio"
11 // use "socket" (lower case) to communicate with event display
12 // EVIO:HOST specify host for socket communications, default is "localhost"
13 // EVIO:PORT specify port for socket communications, default is 3309 (0xCED)
14 // EVIO:BUFSIZE serialized event internal buffer size, default 200000 words
15 // EVIO:SOCKETTRY number of times to try connecting to the socket
16 // EVIO:SOCKETWAIT number of seconds between tries
17 //
18 //
19 // dana_evio_dict.xml is corresponding evio2xml dictionary
20 //
21 // E.g. to run:
22 // $HALLD_RECON_HOME/bin/Linux_CentOS5-x86_64-gcc4.1.2/hd_ana --plugin=danaevio -PEVIO:DANAEVIO="all" ../Event.hddm
23 //
24 //
25 // Elliott Wolin, 19-Jul-2010
26 
27 
31 #include <danaevio/dana_evio_dict.h>
32 
33 
34 
35 // evio output file name, use EVIO:FILENAME command-line parameter to override
36 static string evioFileName = "dana_events.evio";
37 
38 
39 // for evio file output
40 static evioFileChannel *chan;
41 
42 
43 // evio TCP host and port for socket-based output
44 static bool evioIOAbort = false; // true if unrecoverable IO error
45 static string evioHost = "localhost";
46 static int evioPort = 0xCED; // 3309
47 static FILE* evioFILE = NULL;
48 static int evioSocket = 0;
49 static uint32_t socketHeader[3] = {0xCEBAF,1,0};
50 static uint32_t *socketBuffer;
51 static int evioSocketTry = 6;
52 static int evioSocketWait = 5;
53 
54 
55 // internal evio buffer size, use EVIO:BUFSIZE command-line parameter to override
56 static int evioBufSize=750000;
57 
58 
59 // mutex for serializing writing to file
60 static pthread_mutex_t evioMutex = PTHREAD_MUTEX_INITIALIZER;
61 
62 
63 // from Dave Heddle
64 static FILE *initSocket(const char *ipAddress, int port, int *sock);
65 
66 
67 // from JApplication
68 extern jana::JApplication *japp;
69 
70 
71 
72 //----------------------------------------------------------------------------
73 //----------------------------------------------------------------------------
74 
75 
77 
78 
79  jout << endl << " Default JEventProcessor_danaevio invoked" << endl << endl;
80 
81 
82  // check for EVIO:FILENAME output file name parameter
83  // if "socket" then output using TCP socket
84  gPARMS->SetDefaultParameter("EVIO:FILENAME",evioFileName);
85  jout << endl << " EVIO output file name is " << evioFileName << endl << endl;
86 
87 
88  // check for socket parameters
89  gPARMS->SetDefaultParameter("EVIO:HOST",evioHost);
90  gPARMS->SetDefaultParameter("EVIO:PORT",evioPort);
91  gPARMS->SetDefaultParameter("EVIO:SOCKETTRY",evioSocketTry);
92  gPARMS->SetDefaultParameter("EVIO:SOCKETWAIT",evioSocketWait);
93 
94 
95  // check for EVIO:BUFSIZE internal buffer size parameter
96  gPARMS->SetDefaultParameter("EVIO:BUFSIZE",evioBufSize);
97  jout << endl << " EVIO internal buf size is " << evioBufSize << endl << endl;
98  if(evioFileName=="socket") {
99  jout << endl << " EVIO TCP socket host is " << evioHost<< endl << endl;
100  jout << endl << " EVIO TCP socket port is " << evioPort<< endl << endl;
101  jout << endl << " EVIO TCP socket try is " << evioSocketTry << endl << endl;
102  jout << endl << " EVIO TCP socket wait is " << evioSocketWait << endl << endl;
103  }
104 
105 
106  // open file channel or TCP socket
107  if(evioFileName!="socket") {
108 
109  // file I/O
110  try {
111  chan = new evioFileChannel(evioFileName,"w",evioBufSize);
112  chan->open();
113 
114  } catch (evioException e) {
115  jerr << endl << " ?evioException in JEventProcessor_danaevio" << endl << endl
116  << e.toString() << endl;
117  japp->Quit();
118  evioIOAbort=true;
119 
120  } catch (...) {
121  jerr << endl << " ?unknown exception in JEventProcessor_danaevio, unable to open output file" << endl << endl;
122  japp->Quit();
123  evioIOAbort=true;
124  }
125 
126 
127  } else {
128 
129  // TCP socket I/O
130  // allocate buffer to hold serialized event
131  socketBuffer = new uint32_t[evioBufSize];
132 
133  // open socket
135  if(evioFILE==NULL) {
136  jerr << endl << " ?JEventProcessor_danaevio...unable to open socket" << endl << endl;
137  japp->Quit();
138  evioIOAbort=true;
139  return;
140  }
141  }
142 
143 }
144 
145 
146 //----------------------------------------------------------------------------
147 
148 
150 
151  if(evioIOAbort)return;
152 
153 
154  // close file or socket
155  if(evioFileName!="socket") {
156 
157  // file I/O
158  try {
159  chan->close();
160  delete(chan);
161 
162  } catch (evioException e) {
163  jerr << endl << " ?evioException in ~JEventProcessor_danaevio" << endl << endl
164  << e.toString() << endl;
165  } catch (...) {
166  jerr << endl << " ?unknown exception in ~JEventProcessor_danaevio, unable to close output file" << endl << endl;
167  }
168 
169  } else {
170 
171  // TCP socket I/O
172  if(evioFILE!=NULL) {
173  fflush(evioFILE);
174  fclose(evioFILE);
175  delete(socketBuffer);
176  }
177  }
178 
179 }
180 
181 
182 //----------------------------------------------------------------------------
183 
184 
185 jerror_t JEventProcessor_danaevio::brun(JEventLoop *eventLoop, int32_t runnumber) {
186 
187  static bool first_time = true;
188  unsigned int n;
189 
190 
191  // has file or socket open failed?
192  if(evioIOAbort)return(UNRECOVERABLE_ERROR);
193 
194 
195  // get write lock
196  pthread_mutex_lock(&evioMutex);
197 
198 
199  // create dictionary banks from DDANAEVIO factory tagMap<string, pair<uint16_t,uint8_t> >
200  // and write out as first event in file
201  if(first_time) {
202  first_time=false;
203 
204  try {
205  evioDOMTree tree(1,0);
206  evioDOMNodeP name = evioDOMNode::createEvioDOMNode<string> (1,1);
207  evioDOMNodeP tag = evioDOMNode::createEvioDOMNode<uint16_t> (1,2);
208  evioDOMNodeP num = evioDOMNode::createEvioDOMNode<uint8_t> (1,3);
209  tree << name << tag << num;
210 
211  const map< string, pair<uint16_t,uint8_t> > *theMap = DDANAEVIO_factory::getTagMapPointer();
212  map< string, pair<uint16_t,uint8_t> >::const_iterator iter;
213  for(iter=theMap->begin(); iter!=theMap->end(); iter++) {
214  *name << iter->first;
215  *tag << iter->second.first;
216  *num << iter->second.second;
217  }
218 
219  // file or socket I/O
220  if(evioFileName!="socket") {
221  chan->write(tree);
222 
223  } else {
224  tree.toEVIOBuffer(socketBuffer,evioBufSize);
225  socketHeader[2]=4*(socketBuffer[0]+1);
226  n = fwrite(socketHeader,sizeof(uint32_t),3,evioFILE);
227  n += fwrite(socketBuffer,sizeof(uint32_t),socketBuffer[0]+1,evioFILE);
228  if(n!=(3+socketBuffer[0]+1)) {
229  jerr << " ?JEventProcessor_danaevio::brun...unable to write to socket" << endl;
230  return(UNRECOVERABLE_ERROR);
231  }
232  fflush(evioFILE);
233  }
234 
235 
236 
237  } catch (evioException e) {
238  jerr << endl << " ?evioException in ~JEventProcessor_danaevio::brun, unable to write to file" << endl << endl
239  << e.toString() << endl;
240 
241  } catch (...) {
242  jerr << endl << " ?unknown exception in ~JEventProcessor_danaevio::brun" << endl << endl;
243  }
244  }
245 
246  // unlock
247  pthread_mutex_unlock(&evioMutex);
248 
249 
250  return(NOERROR);
251 }
252 
253 
254 //----------------------------------------------------------------------------
255 
256 
257 jerror_t JEventProcessor_danaevio::evnt(JEventLoop *eventLoop, uint64_t eventnumber) {
258 
259  unsigned int n;
260 
261 
262  // has file or socket open failed?
263  if(evioIOAbort)return(UNRECOVERABLE_ERROR);
264 
265 
266  // get evio trees
267  vector<const DDANAEVIODOMTree*> evioTrees;
268  eventLoop->Get(evioTrees);
269  if(evioTrees.size()<=0)return(NOERROR);
270 
271 
272  // get write lock
273  pthread_mutex_lock(&evioMutex);
274 
275 
276  // write out all evio trees
277  if(evioFileName!="socket") {
278 
279  // file I/O
280  for(unsigned int i=0; i<evioTrees.size(); i++) {
281  try {
282  chan->write(evioTrees[i]->tree);
283 
284  } catch (evioException e) {
285  jerr << endl << " ?evioException in JEventProcessor_danaevio::evnt" << endl << endl
286  << e.toString() << endl;
287  } catch (...) {
288  jerr << endl << " ?unknown exception in JEventProcessor_danaevio::evnt, unable to write to file" << endl << endl;
289  }
290  }
291 
292  } else {
293 
294  // socket I/o
295  for(unsigned int i=0; i<evioTrees.size(); i++) {
296  try {
297 
298  evioTrees[i]->tree.toEVIOBuffer(socketBuffer,evioBufSize);
299  socketHeader[2]=4*(socketBuffer[0]+1);
300  n = fwrite(socketHeader,sizeof(uint32_t),3,evioFILE);
301  n += fwrite(socketBuffer,sizeof(uint32_t),socketBuffer[0]+1,evioFILE);
302  if(n!=(3+socketBuffer[0]+1)) {
303  jerr << " ?JEventProcessor_danaevio::evnt...unable to write to socket" << endl;
304  return(UNRECOVERABLE_ERROR);
305  }
306  fflush(evioFILE);
307 
308  } catch (...) {
309  jerr << endl << " ?unknown exception in JEventProcessor_danaevio::evnt, unable to write to socket " << endl << endl;
310  }
311  }
312  }
313 
314 
315  // unlock
316  pthread_mutex_unlock(&evioMutex);
317 
318 
319  // done
320  return NOERROR;
321 }
322 
323 
324 //----------------------------------------------------------------------------
325 //----------------------------------------------------------------------------
326 
327 
328 // from Dave Heddle's note on CEDSocket
329 // ejw, 23-Jul-2010
330 
331 
332 //return a stream that wraps the socket for writing, or
333 //NULL if it fails for any reason. Upon return the reference
334 //for the socket will be in the variable sock
335 FILE *initSocket(const char *ipAddress, int port, int *sock) {
336 
337  // Create a stream socket using TCP and IPv4
338  *sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
339  if (*sock < 0) {
340  jerr << endl;
341  jerr << " ?initSocket...socket() failed";
342  jerr << endl;
343  return NULL;
344  }
345 
346 
347  // Construct the server address structure
348  struct sockaddr_in servAddr; // Server address
349  memset(&servAddr, 0, sizeof(servAddr)); // Zero out structure
350  servAddr.sin_family = AF_INET; // IPv4 address family
351 
352 
353  // get host entry using ascii host name
354  struct hostent *myHostEnt = gethostbyname(ipAddress);
355  if(myHostEnt==NULL) {
356  jerr << endl;
357  jerr << " ?initSocket...unable to gethostbyname()";
358  jerr << endl;
359  return NULL;
360  }
361 
362 
363  // Convert address to 4-byte form using ascii dotted-decimal form
364  struct in_addr **myList = (in_addr **)myHostEnt->h_addr_list;
365  int rtnVal = inet_pton(AF_INET, inet_ntoa(*myList[0]), &servAddr.sin_addr.s_addr);
366  if (rtnVal == 0) {
367  jerr << endl;
368  jerr << " ?initSocket...inet_pton() failed. Invalid address string";
369  jerr << endl;
370  return NULL;
371  }
372  else if (rtnVal < 0) {
373  jerr << endl;
374  jerr << " ?initSocket...inet_pton() failed";
375  jerr << endl;
376  return NULL;
377  }
378  servAddr.sin_port = htons(port); // Server port
379 
380 
381  // try a number of times to establish the connection to the server
382  int i=0;
383  while (true) {
384  i++;
385  if(connect(*sock, (struct sockaddr *) &servAddr, sizeof(servAddr))>=0) {
386  jout << "initSocket...connection successful on attempt " << i << endl;
387  break;
388 
389  } else if (i<evioSocketTry) {
390  jerr << " ...initSocket connection attempt " << i << " failed, trying again..." << endl;
391  sleep(evioSocketWait);
392  continue;
393 
394  } else {
395  jerr << endl;
396  jerr << " ?initSocket...connect() failed after " << evioSocketTry << " attempts" << endl;
397  jerr << endl;
398  return NULL;
399  }
400  }
401 
402 
403  //wrap the socket in an output stream
404  return fdopen(*sock, "w");
405 }
406 
407 
408 //----------------------------------------------------------------------------
409 //----------------------------------------------------------------------------
static bool evioIOAbort
static int evioSocketTry
static const map< string, pair< uint16_t, uint8_t > > * getTagMapPointer()
static pthread_mutex_t evioMutex
static FILE * initSocket(const char *ipAddress, int port, int *sock)
static uint32_t * socketBuffer
static int evioPort
JApplication * japp
TEllipse * e
static string evioFileName
static evioFileChannel * chan
static string evioHost
jerror_t brun(JEventLoop *eventLoop, int32_t runnumber)
jerror_t evnt(JEventLoop *eventLoop, uint64_t eventnumber)
static int evioSocket
static uint32_t socketHeader[3]
static int evioBufSize
static FILE * evioFILE
static int evioSocketWait