14 #include <JANA/JApplication.h>
15 #include <JANA/JParameterManager.h>
// NOTE(review): these lines look like the body of the HDEVIOWriter(string sink_name)
// constructor, but the function header and a number of intermediate lines (braces,
// else-branches, the opening "try") are not visible here. The comments below describe
// only what the visible statements do.

// Initialize the mutexes that guard the output deque and the buffer pool.
23 pthread_mutex_init(&output_deque_mutex, NULL);
24 pthread_mutex_init(&buff_pool_mutex,NULL);
// Start with zeroed output counters and no debug output stream.
30 events_written_to_output = 0;
31 blocks_written_to_output = 0;
32 ofs_debug_output= NULL;
// Built-in defaults for the configuration parameters registered below.
// MAX_OUTPUT_BUFFER_SIZE == 0 is used as the "not set yet" value: it is replaced
// further down by either a user value or a fallback default.
34 MAX_OUTPUT_QUEUE_SIZE = 200;
35 MAX_OUTPUT_BUFFER_SIZE = 0;
37 NEVENTS_PER_BLOCK = 100;
// The buffer-size parameter is registered as a string so that the literal "AUTO"
// can be told apart from a numeric value.
44 string max_output_buffer =
"AUTO";
// Register the parameters with their defaults and help text.
46 gPARMS->SetDefaultParameter(
"EVIOOUT:MAX_OUTPUT_QUEUE_SIZE" , MAX_OUTPUT_QUEUE_SIZE,
"Maximum number of events output queue can have before processing threads start blocking.");
47 gPARMS->SetDefaultParameter(
"EVIOOUT:MAX_OUTPUT_BUFFER_SIZE", max_output_buffer,
"Maximum number of words in output EVIO block. This may be overwritten by ET event size if writing to ET.");
// NOTE(review): the help text below contains typos ("witholding", "lesst"). It is a
// runtime string, so it is left untouched here.
48 gPARMS->SetDefaultParameter(
"EVIOOUT:MAX_HOLD_TIME", MAX_HOLD_TIME,
"Maximum time in seconds to keep events in buffer before flushing them. This is to prevent farm from witholding events from ER when running very slow trigger rates. This should not be set lesst than 2.");
49 gPARMS->SetDefaultParameter(
"EVIOOUT:NEVENTS_PER_BLOCK", NEVENTS_PER_BLOCK,
"Suggested number of events to write in single output block.");
50 gPARMS->SetDefaultParameter(
"EVIOOUT:DEBUG_FILES" , DEBUG_FILES,
"Write input and output debug files in addition to the standard output.");
// Any value other than "AUTO" is parsed as an integer number of words.
// NOTE(review): atoi() reports no error and returns 0 for non-numeric text, which
// is indistinguishable from the "not set" value used above.
53 if( max_output_buffer !=
"AUTO" ){
54 MAX_OUTPUT_BUFFER_SIZE = atoi(max_output_buffer.c_str());
// A sink name beginning with "ET:" is handed to ConnectToET().
62 if(sink_name.substr(0,3) ==
"ET:"){
65 ConnectToET(sink_name);
// Open sink_name as an output file (presumably the else-branch of the "ET:" test;
// the "else" line itself is not visible).
70 jout <<
" Opening EVIO output file \"" << sink_name <<
"\"" << endl;
71 evioout =
new ofstream(sink_name.c_str());
// NOTE(review): a plain "new" throws std::bad_alloc instead of returning null, so
// the first check below is not expected to trigger. The is_open() check is the one
// that detects a file that could not be opened.
72 if(!evioout)
throw JException(
"Unable to create ofstream object for output EVIO file");
73 if(!evioout->is_open())
throw JException(
"Unable to open output EVIO file");
75 sink_type = kFileSink;
76 jout <<
"Opened file \"" << sink_name <<
"\" for writing EVIO events." << endl;
// Exceptions derived from "exception" are caught and their message is printed to jerr.
// What happens after the message is printed is not visible here.
79 }
catch (exception &
e) {
82 jerr << e.what() << endl;
// If no buffer size was set, fall back to 250*1024 (in words, per the parameter
// description above).
90 if( MAX_OUTPUT_BUFFER_SIZE == 0 ) MAX_OUTPUT_BUFFER_SIZE = 250*1024;
// Open the debug output file. The condition guarding this section is not visible;
// presumably it depends on DEBUG_FILES — TODO confirm.
94 ofs_debug_output =
new ofstream(
"hdevio_debug_output.evio");
// If the file could not be opened, report it, free the stream and reset the pointer
// so that later code sees no debug stream.
95 if( !ofs_debug_output->is_open() ){
96 jerr <<
"Unable to open \"hdevio_debug_output.evio\"!" << endl;
97 delete ofs_debug_output;
98 ofs_debug_output = NULL;
// Success message (presumably the else-branch of the is_open() test above).
100 jout <<
"Opened \"hdevio_debug_output.evio\" for debug output" << endl;
// NOTE(review): presumably part of the HDEVIOWriter destructor / shutdown code; the
// enclosing function header is not visible here.

// Free every buffer held in the pool while holding the pool mutex.
// NOTE(review): the deleted pointers stay in buff_pool unless an unseen line clears
// the container before the mutex is released — confirm.
122 pthread_mutex_lock(&buff_pool_mutex);
123 for(uint32_t i=0; i<buff_pool.size(); i++)
delete buff_pool[i];
125 pthread_mutex_unlock(&buff_pool_mutex);
// Call FlushOutput() with an empty deque and a length of 8 words. FlushOutput sizes
// its block header to 8 words, so this presumably emits a header-only block with no
// events — TODO confirm the condition under which this is executed.
129 deque< vector<uint32_t>* > my_output_deque;
130 FlushOutput(8, my_output_deque);
// NOTE(review): these lines look like the body of ConnectToET(string sink_name). The
// function header, several braces/else lines, status checks and the preprocessor
// guards around the ET-specific code are not visible here.

// Split sink_name on ':' into fields. Empty fields are kept as empty strings.
// (The line advancing startpos inside the loop is not visible.)
168 vector<string> fields;
169 string str = sink_name;
170 size_t startpos=0, endpos=0;
171 while((endpos = str.find(
":", startpos)) != str.npos){
172 size_t len = endpos-startpos;
173 fields.push_back(len==0 ?
"":str.substr(startpos, len));
// Whatever follows the last ':' becomes the final field.
176 if(startpos<str.length()) fields.push_back(str.substr(startpos, str.npos));
// Field layout: [1] = session, [2] = host, [3] = port. Missing fields default to
// an empty string or 0.
// NOTE(review): atoi() gives 0 for a non-numeric port without reporting an error.
178 string session = fields.size()>1 ? fields[1]:
"";
179 string host = fields.size()>2 ? fields[2]:
"";
180 int port = fields.size()>3 ? atoi(fields[3].c_str()):0;
// An empty session becomes "none". This also guarantees session.at(0) below cannot
// be called on an empty string.
182 if(session ==
"") session =
"none";
// A session starting with '/' is used directly as the ET system file name;
// otherwise the name is "/tmp/et_sys_" followed by the session.
183 string fname = session.at(0)==
'/' ? session:(
string(
"/tmp/et_sys_") + session);
// Report the connection settings. The port is printed only when non-zero; whether
// the host line is conditional is not visible.
186 jout <<
" Opening ET system:" << endl;
187 jout <<
" session: " << session << endl;
188 jout <<
" system file: " << fname << endl;
190 jout <<
" host: "<<host << endl;
191 if(port !=0) jout <<
" port: " << port << endl;
// Create the ET open-configuration object that the calls below fill in.
195 et_openconfig openconfig;
196 et_open_config_init(&openconfig);
// A host beginning with "239." is treated as a multicast address.
198 if(host.find(
"239.")==0){
199 cout<<__FILE__<<
":"<<__LINE__<<
" Configuring output ET for multicast" << endl;
200 et_open_config_setcast(openconfig, ET_MULTICAST);
201 et_open_config_addmulticast(openconfig, host.c_str());
202 et_open_config_sethost(openconfig, ET_HOST_ANYWHERE);
203 et_open_config_setport(openconfig, port);
// timespec initialised to tv_sec=5, tv_nsec=5, used as the open timeout.
204 struct timespec tspec={5,5};
205 et_open_config_settimeout(openconfig, tspec);
206 et_open_config_setwait(openconfig, ET_OPEN_WAIT);
// Direct-connection configuration (presumably the else-branch of the "239." test;
// the "else" line is not visible). The server port is set only when a non-zero
// port was given.
208 cout<<__FILE__<<
":"<<__LINE__<<
" Configuring output ET for direct connection" << endl;
209 et_open_config_setcast(openconfig, ET_DIRECT);
210 et_open_config_setmode(openconfig, ET_HOST_AS_LOCAL);
211 et_open_config_sethost(openconfig, host.c_str());
212 et_open_config_setport(openconfig, ET_BROADCAST_PORT);
213 if(port != 0) et_open_config_setserverport(openconfig, port);
// Open the ET system using the file name and configuration built above.
216 int status = et_open(&sys_id,fname.c_str(),openconfig);
// Error report for a failed open. The test on "status" and whatever follows the
// message (return/exit) are not visible.
218 cout<<__FILE__<<
":"<<__LINE__<<
" Problem opening ET system"<<endl;
219 cout<< et_perror(status);
// Attach to the ET_GRANDCENTRAL station; the attachment id is stored in att_id.
225 status=et_station_attach(sys_id, ET_GRANDCENTRAL, &att_id);
// Error report for a failed attach (the status test is not visible).
228 jerr <<
"Unable to attach to Grand Central station " << endl;
229 cout<< et_perror(status);
234 jout <<
"...now connected to ET system: " << fname
235 <<
", station: Grand Central " <<
" ( attach id=" << att_id <<
")" << endl;
// Query the ET event size and divide by 4 to get a count of 32-bit words
// (presumably the ET size is in bytes — TODO confirm). The declaration of
// "eventsize" is not visible, and the return value of the query is not checked on
// the visible line.
240 et_system_geteventsize(sys_id, &eventsize);
241 uint32_t eventsize_words = (uint32_t)eventsize/4;
// Never let an output block be larger than one ET event: shrink the limit when the
// ET event is smaller than the current setting.
242 if( eventsize_words < MAX_OUTPUT_BUFFER_SIZE ){
243 jout<<
" Events in ET system are smaller than currently set max buffer size:"<<endl;
244 jout<<
" "<<eventsize_words<<
" < "<<MAX_OUTPUT_BUFFER_SIZE<<endl;
245 jout<<
" Setting MAX_OUTPUT_BUFFER_SIZE to "<<eventsize_words<<endl;
246 MAX_OUTPUT_BUFFER_SIZE = eventsize_words;
// A limit of 0 means "not set": adopt the ET event size.
247 }
else if(MAX_OUTPUT_BUFFER_SIZE==0){
248 jout<<
" Auto-setting MAX_OUTPUT_BUFFER_SIZE to ET event size." << endl;
249 MAX_OUTPUT_BUFFER_SIZE = eventsize_words;
252 jout<<
" ET system event size in words:"<<eventsize_words<<
" MAX_OUTPUT_BUFFER_SIZE:"<<MAX_OUTPUT_BUFFER_SIZE<<endl;
// Message for binaries built without ET support (presumably the alternative branch
// of a compile-time ET guard that is not visible here — TODO confirm).
256 jerr <<
"You are attempting to connect to an ET system using a binary that" <<endl;
257 jerr <<
"was compiled without ET support. Please reconfigure and recompile" <<endl;
258 jerr <<
"To get ET support." << endl;
// NOTE(review): these lines look like the body of HDEVIOOutputThread(). The function
// header, the enclosing loop, the declarations/updates of Nwords, Nbuffs and quit,
// and several lock/brace lines are not visible here.

// Reference time for the MAX_HOLD_TIME test below (where it is refreshed is not visible).
289 time_t last_time = time(NULL);
// Current time for this pass.
293 time_t t = time(NULL);
// Inspect the shared output deque under its mutex.
296 pthread_mutex_lock(&output_deque_mutex);
// Walk the queued buffers from the front to decide how many go into the next block.
// Stop when adding the next buffer would exceed MAX_OUTPUT_BUFFER_SIZE words, or
// when NEVENTS_PER_BLOCK buffers have been counted. (The lines that accumulate
// Nwords and Nbuffs are not visible.)
304 deque< vector<uint32_t>* >::iterator it;
305 for(it=output_deque.begin(); it!=output_deque.end(); it++){
306 uint32_t N = (*it)->size();
307 if( (Nwords+N) > MAX_OUTPUT_BUFFER_SIZE)
break;
314 if( Nbuffs >= NEVENTS_PER_BLOCK )
break;
// Flush now if the loop stopped before reaching the end of the deque ...
319 bool flush_event = Nbuffs < output_deque.size();
// ... or if at least NEVENTS_PER_BLOCK buffers are queued ...
323 if(!flush_event) flush_event = output_deque.size() >= NEVENTS_PER_BLOCK;
// ... or if buffers are waiting and MAX_HOLD_TIME seconds have elapsed since last_time.
327 if(!flush_event && !output_deque.empty()) flush_event = (t-last_time) >= MAX_HOLD_TIME;
332 pthread_mutex_unlock(&output_deque_mutex);
// Record a quit request when the application reports it is quitting.
336 if(
japp &&
japp->GetQuittingStatus()) quit=
true;
// A flush was requested but no buffer fit (presumably the front buffer alone exceeds
// the size limit). The handling code for this case is not visible.
342 if(flush_event && (Nbuffs==0))
// Move the first Nbuffs buffer pointers into a local deque and remove them from the
// shared one.
// NOTE(review): the mutex is unlocked both above and below, but the lock call that
// must sit between them is not visible — confirm lock/unlock pair up on every path.
348 deque< vector<uint32_t>* > my_output_deque(output_deque.begin(), output_deque.begin()+Nbuffs);
349 output_deque.erase(output_deque.begin(), output_deque.begin()+Nbuffs);
352 pthread_mutex_unlock(&output_deque_mutex);
// Write the block after the unlock above, working on the local copy.
355 FlushOutput(Nwords, my_output_deque);
// Final drain: add up the sizes of all buffers still queued and flush them as one
// block. (The starting value of Nwords for this sum is not visible.)
// NOTE(review): on the visible lines FlushOutput() is called here while
// output_deque_mutex is held, and MAX_OUTPUT_BUFFER_SIZE is not applied to this
// final block — confirm this is intended.
362 pthread_mutex_lock(&output_deque_mutex);
363 if( !output_deque.empty() ){
365 deque< vector<uint32_t>* >::iterator it;
366 for(it=output_deque.begin(); it!=output_deque.end(); it++){
367 Nwords += (*it)->size();
369 FlushOutput(Nwords, output_deque);
371 pthread_mutex_unlock(&output_deque_mutex);
// NOTE(review): these lines look like the body of
// FlushOutput(uint32_t Nwords, deque< vector<uint32_t>* > &my_output_deque). The
// function header, the declarations of output_block, tmpbuff, pe, pdata and str, and
// a number of intermediate lines are not visible here.

// Header flag word with bits 9 and 10 set (their meaning is defined by the EVIO
// block-header format — TODO confirm against the EVIO documentation).
406 uint32_t bitinfo = (1<<9) + (1<<10);
// Reserve room for the whole block, then size it to the 8-word header.
407 output_block.reserve(Nwords);
408 output_block.resize(8);
// Header words set on the visible lines:
//   [0] block length in words (Nwords)
//   [1] block number (blocks_written_to_output is incremented here)
//   [3] number of buffers in this block
//   [5] bitinfo shifted left by 8, plus 0x4 (presumably the EVIO version — TODO confirm)
//   [7] 0xc0da0100 (presumably the EVIO magic word — TODO confirm)
// Words [2], [4] and [6] are assigned on lines not visible here, if at all.
409 output_block[0] = Nwords;
410 output_block[1] = ++blocks_written_to_output;
412 output_block[3] = my_output_deque.size();
414 output_block[5] = (bitinfo<<8) + 0x4;
416 output_block[7] = 0xc0da0100;
// Append each queued buffer to the block.
421 deque< vector<uint32_t>* >::iterator it;
422 for(it=my_output_deque.begin(); it!=my_output_deque.end(); it++){
423 vector<uint32_t> *buff = *it;
// Grow the block by the buffer length and take raw pointers to source and
// destination. (The statements that copy or byte-swap the data are not visible.)
434 uint32_t istart = output_block.size();
435 uint32_t len = buff->size();
436 output_block.resize(istart + len);
437 uint32_t *inbuff = &(*buff)[0];
438 uint32_t *outbuff = &output_block[istart];
// The source buffer is done with: hand it back to the pool for reuse.
448 ReturnBufferToPool(buff);
// Raw view of the finished block and its size in bytes, computed from Nwords.
452 uint32_t *buff = &output_block[0];
453 uint32_t buff_size_bytes = Nwords*
sizeof(uint32_t);
// Sanity check: the length stored in the header must match the byte count.
// NOTE(review): on the visible lines both sides derive from Nwords, so this only
// detects a mismatch if unseen code changes output_block[0]. It does not compare
// against output_block.size() — confirm.
456 if( buff[0]*
sizeof(uint32_t) != buff_size_bytes){
457 jerr <<
"EVIO output block header length does not match buffer size! " << endl;
458 jerr <<
" buff[0]=" << buff[0] <<
" words (=" << buff[0]*
sizeof(uint32_t) <<
" bytes) != " << buff_size_bytes << endl;
459 throw JException(
"EVIO block header size corrupted");
// Overwrite the 8 header words with the contents of tmpbuff (presumably a
// byte-swapped copy of the header; where tmpbuff is filled is not visible).
466 for(uint32_t i=0; i<8; i++) buff[i] = tmpbuff[i];
// ET sink: obtain a new ET event of buff_size_bytes, copy the block in, and put it
// back into the ET system.
469 if(sink_type == kETSink){
473 int status = et_event_new(sys_id, att_id, &pe, ET_SLEEP, NULL, buff_size_bytes);
// Diagnostics for a failed et_event_new (the status test is not visible): print the
// size and the first 15 words as 3 rows of 5.
// NOTE(review): this reads buff[0..14] even when the block may hold only 8 words
// (header-only flush); the size of "str" is also not visible — verify both.
475 jerr <<
"Unable to write new event to output (et_event_new returns "<<status<<
")!" << endl;
476 jerr <<
" buff_size_bytes = " << buff_size_bytes << endl;
477 jerr <<
"First few words in case you are trying to debug:" << endl;
478 for(
unsigned int j=0; j<3; j++){
480 for(
unsigned int i=0; i<5; i++){
481 sprintf(str,
" %08x", buff[i+j*5]);
// Get the ET event's data pointer and copy the whole block into it.
490 et_event_getdata(pe, (
void**)&pdata);
491 memcpy((
char*)pdata, (
char*)buff, buff_size_bytes);
// Put the filled event back into the ET system.
494 status = et_event_put(sys_id, att_id, pe);
// File sink: write the block to the output file when the stream exists.
498 }
else if(sink_type == kFileSink){
501 if(evioout) evioout->write((
const char*)buff, buff_size_bytes);
// Also write the block to the debug file when one is open.
507 if(ofs_debug_output) ofs_debug_output->write((
const char*)buff, buff_size_bytes);
// NOTE(review): these lines look like the body of GetBufferFromPool(); the function
// header, the "else" line and the return statement are not visible here.

// Get a buffer while holding the pool mutex: allocate a new vector when the pool is
// empty, otherwise take the last pooled one (presumably the else-branch).
520 vector<uint32_t> *buffp = NULL;
522 pthread_mutex_lock(&buff_pool_mutex);
523 if(buff_pool.empty()){
524 buffp =
new vector<uint32_t>;
526 buffp = buff_pool.back();
527 buff_pool.pop_back();
529 pthread_mutex_unlock(&buff_pool_mutex);
// Reject a null buffer pointer.
// NOTE(review): a plain "new" throws instead of returning null, so this can only
// trigger if a null pointer was stored in the pool. The message names
// "JEventProcessor_L3proc", possibly a copy/paste leftover — verify.
531 if(buffp==NULL)
throw JException(
"Unable to get buffer from pool in JEventProcessor_L3proc");
// NOTE(review): these lines look like the body of
// ReturnBufferToPool(vector<uint32_t> *buff); the function header and any lines
// before the lock are not visible here.

// Put the buffer back at the end of the pool while holding the pool mutex.
543 pthread_mutex_lock(&buff_pool_mutex);
544 buff_pool.push_back(buff);
545 pthread_mutex_unlock(&buff_pool_mutex);
// NOTE(review): these lines look like the body of
// AddBufferToOutput(vector<uint32_t> *buff); the function header and the lines
// between the unlock and re-lock inside the loop are not visible here.

// Take the output-deque mutex before inspecting the queue.
562 pthread_mutex_lock(&output_deque_mutex);
// While the queue holds MAX_OUTPUT_QUEUE_SIZE or more buffers, release the mutex,
// then re-acquire it and test again. The wait or other work done while the mutex is
// released is not visible.
564 while(output_deque.size() >= MAX_OUTPUT_QUEUE_SIZE){
566 pthread_mutex_unlock(&output_deque_mutex);
573 pthread_mutex_lock(&output_deque_mutex);
// There is room: append the buffer and release the mutex.
577 output_deque.push_back(buff);
580 pthread_mutex_unlock(&output_deque_mutex);
void swap_block_out(uint16_t *inbuff, uint16_t len, uint16_t *outbuff)
void * HDEVIOOutputThread(void *evioout)
sprintf(text,"Post KinFit Cut")
void FlushOutput(uint32_t Nwords, deque< vector< uint32_t > * > &my_output_deque)
void AddBufferToOutput(vector< uint32_t > *buff)
void ConnectToET(string sink_name)
uint32_t swap_bank_out(uint32_t *outbuff, uint32_t *inbuff, uint32_t len)
vector< uint32_t > * GetBufferFromPool(void)
void ReturnBufferToPool(vector< uint32_t > *buff)
void * HDEVIOOutputThread(void)
HDEVIOWriter(string sink_name)