13 #include <forward_list>
21 #include <condition_variable>
26 using namespace std::chrono;
60 evioout.SetTag(
"--- EVIO ---: ");
104 gPARMS->SetDefaultParameter(
"EVIO:VERBOSE",
VERBOSE,
"Set verbosity level for processing and debugging statements while parsing. 0=no debugging messages. 10=all messages");
105 gPARMS->SetDefaultParameter(
"ET:VERBOSE",
VERBOSE_ET,
"Set verbosity level for processing and debugging statements while reading from ET. 0=no debugging messages. 10=all messages");
106 gPARMS->SetDefaultParameter(
"EVIO:NTHREADS",
NTHREADS,
"Set the number of worker threads to use for parsing the EVIO data");
107 gPARMS->SetDefaultParameter(
"EVIO:MAX_PARSED_EVENTS",
MAX_PARSED_EVENTS,
"Set maximum number of events to allow in EVIO parsed events queue");
108 gPARMS->SetDefaultParameter(
"EVIO:MAX_EVENT_RECYCLES",
MAX_EVENT_RECYCLES,
"Set maximum number of EVIO (i.e. block of) events a worker thread should process before pruning excess DParsedEvent objects from its pool");
109 gPARMS->SetDefaultParameter(
"EVIO:MAX_OBJECT_RECYCLES",
MAX_OBJECT_RECYCLES,
"Set maximum number of events a DParsedEvent should be used for before pruning excess hit objects from its pools");
110 gPARMS->SetDefaultParameter(
"EVIO:LOOP_FOREVER",
LOOP_FOREVER,
"If reading from EVIO file, keep re-opening file and re-reading events forever (only useful for debugging) If reading from ET, this is ignored.");
111 gPARMS->SetDefaultParameter(
"EVIO:RUN_NUMBER",
USER_RUN_NUMBER,
"User-supplied run number. Override run number from other sources with this.(will be ignored if set to zero)");
112 gPARMS->SetDefaultParameter(
"EVIO:ET_STATION_NEVENTS",
ET_STATION_NEVENTS,
"Number of events to use if we have to create the ET station. Ignored if station already exists.");
113 gPARMS->SetDefaultParameter(
"EVIO:ET_STATION_CREATE_BLOCKING",
ET_STATION_CREATE_BLOCKING,
"Set this to 0 to create station in non-blocking mode (default is to create it in blocking mode). Ignored if station already exists.");
114 gPARMS->SetDefaultParameter(
"EVIO:PRINT_STATS",
PRINT_STATS,
"Print some additional stats from event source when it's finished processing events");
116 gPARMS->SetDefaultParameter(
"EVIO:SWAP",
SWAP,
"Allow swapping automatic swapping. Turning this off should only be used for debugging.");
117 gPARMS->SetDefaultParameter(
"EVIO:LINK",
LINK,
"Link associated objects. Turning this off should only be used for debugging.");
118 gPARMS->SetDefaultParameter(
"EVIO:LINK_TRIGGERTIME",
LINK_TRIGGERTIME,
"Link D*TriggerTime associated objects. This is on by default and may be OK to turn off (but please check output if you do!)");
119 gPARMS->SetDefaultParameter(
"EVIO:LINK_BORCONFIG",
LINK_BORCONFIG,
"Link BORConfig associated objects. This is on by default. If turned off, it will break emulation (and possibly other things in the future).");
120 gPARMS->SetDefaultParameter(
"EVIO:LINK_CONFIG",
LINK_CONFIG,
"Link Config associated objects. This is on by default.");
122 gPARMS->SetDefaultParameter(
"EVIO:PARSE",
PARSE,
"Set this to 0 to disable parsing of event buffers and generation of any objects (for benchmarking/debugging)");
123 gPARMS->SetDefaultParameter(
"EVIO:PARSE_F250",
PARSE_F250,
"Set this to 0 to disable parsing of data from F250 ADC modules (for benchmarking/debugging)");
124 gPARMS->SetDefaultParameter(
"EVIO:PARSE_F125",
PARSE_F125,
"Set this to 0 to disable parsing of data from F125 ADC modules (for benchmarking/debugging)");
125 gPARMS->SetDefaultParameter(
"EVIO:PARSE_F1TDC",
PARSE_F1TDC,
"Set this to 0 to disable parsing of data from F1TDC modules (for benchmarking/debugging)");
126 gPARMS->SetDefaultParameter(
"EVIO:PARSE_CAEN1290TDC",
PARSE_CAEN1290TDC,
"Set this to 0 to disable parsing of data from CAEN 1290 TDC modules (for benchmarking/debugging)");
127 gPARMS->SetDefaultParameter(
"EVIO:PARSE_CONFIG",
PARSE_CONFIG,
"Set this to 0 to disable parsing of ROC configuration data in the data stream (for benchmarking/debugging)");
128 gPARMS->SetDefaultParameter(
"EVIO:PARSE_BOR",
PARSE_BOR,
"Set this to 0 to disable parsing of BOR events from the data stream (for benchmarking/debugging)");
129 gPARMS->SetDefaultParameter(
"EVIO:PARSE_EPICS",
PARSE_EPICS,
"Set this to 0 to disable parsing of EPICS events from the data stream (for benchmarking/debugging)");
130 gPARMS->SetDefaultParameter(
"EVIO:PARSE_EVENTTAG",
PARSE_EVENTTAG,
"Set this to 0 to disable parsing of event tag data in the data stream (for benchmarking/debugging)");
131 gPARMS->SetDefaultParameter(
"EVIO:PARSE_TRIGGER",
PARSE_TRIGGER,
"Set this to 0 to disable parsing of the built trigger bank from CODA (for benchmarking/debugging)");
132 gPARMS->SetDefaultParameter(
"EVIO:PARSE_SSP",
PARSE_SSP,
"Set this to 0 to disable parsing of the SSP (DIRC data) bank from CODA (for benchmarking/debugging)");
133 gPARMS->SetDefaultParameter(
"EVIO:APPLY_TRANSLATION_TABLE",
APPLY_TRANSLATION_TABLE,
"Apply the translation table to create DigiHits (you almost always want this on)");
134 gPARMS->SetDefaultParameter(
"EVIO:IGNORE_EMPTY_BOR",
IGNORE_EMPTY_BOR,
"Set to non-zero to continue processing data even if an empty BOR event is encountered.");
135 gPARMS->SetDefaultParameter(
"EVIO:TREAT_TRUNCATED_AS_ERROR",
TREAT_TRUNCATED_AS_ERROR,
"Set to non-zero to have a truncated EVIO file the JANA return code to non-zero indicating the program errored.");
137 gPARMS->SetDefaultParameter(
"EVIO:F250_EMULATION_MODE",
F250_EMULATION_MODE,
"Set f250 emulation mode. 0=no emulation, 1=always, 2=auto. Default is 2 (auto).");
138 gPARMS->SetDefaultParameter(
"EVIO:F125_EMULATION_MODE",
F125_EMULATION_MODE,
"Set f125 emulation mode. 0=no emulation, 1=always, 2=auto. Default is 2 (auto).");
141 "Comma separated list of systems to parse EVIO data for. "
142 "Default is empty string which means to parse all. System "
143 "names should be what is returned by DTranslationTable::DetectorName() .");
146 if(gPARMS->Exists(
"RECORD_CALL_STACK")) gPARMS->GetParameter(
"RECORD_CALL_STACK",
RECORD_CALL_STACK);
161 uint64_t run_number_seed = 0;
164 if(this->source_name.find(
"ET:") == 0){
167 if(
VERBOSE>0)
evioout <<
"Attempting to open \""<<this->source_name<<
"\" as ET (network) source..." <<endl;
172 throw JException(
"Failed to open ET system: " + this->source_name, __FILE__, __LINE__);
180 if(
VERBOSE>0)
evioout <<
"Attempting to open \""<<this->source_name<<
"\" as EVIO file..." <<endl;
185 throw JException(
"Failed to open EVIO file: " + this->source_name, __FILE__, __LINE__);
193 if(
VERBOSE>0)
evioout <<
"Success opening event source \"" << this->source_name <<
"\"!" <<endl;
227 cerr <<
"loading VERSION 3" << endl;
230 <<
" , Using v3 firmware as default ..." << endl;
237 tstart = high_resolution_clock::now();
268 if(
VERBOSE>0)
evioout <<
"Closing hdevio event source \"" << this->source_name <<
"\"" <<endl;
270 auto tdiff = duration_cast<duration<double>>(
tend -
tstart);
273 uint32_t NTHREADS_PROC = 1;
274 if(gPARMS->Exists(
"NTHREADS")) gPARMS->GetParameter(
"NTHREADS", NTHREADS_PROC);
282 char sdispatcher[256];
284 char sprocessor[256];
290 cout <<
" EVIO Processing rate = " << rate <<
" Hz" << endl;
291 cout << sdispatcher << endl;
292 cout << sparser << endl;
293 cout << sprocessor << endl;
327 bool allow_swap =
false;
328 uint64_t istreamorder = 0;
331 if(
japp->GetQuittingStatus())
break;
337 if(t->in_use)
continue;
343 this_thread::sleep_for(milliseconds(1));
349 uint32_t* &buff = thr->
buff;
352 bool swap_needed =
false;
363 buff =
new uint32_t[buff_len];
381 jout <<
"Missing EVIO file trailer in CDAQ file (ignoring..)" << endl;
385 bool ignore_error =
false;
398 hdet->
read(buff, buff_len, allow_swap);
400 static uint64_t ntimeouts=0;
403 if( ++ntimeouts>=2 ){
404 int ic = ntimeouts%4;
405 const char *
c[4] = {
"|",
"/",
"-",
"\\"};
406 if(ntimeouts==2) cout << endl;
407 cout <<
" ET stalled ... " << c[ic] <<
" \r";
417 if(ntimeouts>=2) cout << endl;
430 thr->
cv.notify_all();
440 this_thread::sleep_for(milliseconds(10));
456 for(
auto w : worker_threads ){
460 for(
auto pe : w->parsed_event_pool ) in_use |= pe->in_use;
463 this_thread::sleep_for(milliseconds(10));
469 worker_threads.clear();
471 tend = std::chrono::high_resolution_clock::now();
494 pthread_mutex_lock(&in_progress_mutex);
495 auto it = in_progess_events.find(Ncalls_to_GetEvent);
496 if( it != in_progess_events.end() )in_progess_events.erase(it);
497 pthread_mutex_unlock(&in_progress_mutex);
498 return NO_MORE_EVENTS_IN_SOURCE;
517 event.SetJEventSource(
this);
575 if(!factory)
throw RESOURCE_UNAVAILABLE;
578 string dataClassName = factory->GetDataClassName();
579 string tag = factory->Tag();
580 if(tag.length()!=0)
return OBJECT_NOT_AVAILABLE;
583 JEventLoop *loop =
event.GetJEventLoop();
584 vector<const DTranslationTable*> translationTables;
587 if(ttfac) ttfac->Get(translationTables);
606 for(
auto tt : translationTables){
607 tt->ApplyTranslationTable(loop);
623 for(
auto tt : translationTables){
624 if(isSuppliedType)
break;
625 isSuppliedType = tt->IsSuppliedType(dataClassName);
629 if( isSuppliedType ){
634 return OBJECT_NOT_AVAILABLE;
664 LinkModule(borptrs->vDf250BORConfig, pe->vDf250WindowRawData);
665 LinkModule(borptrs->vDf250BORConfig, pe->vDf250PulseIntegral);
668 LinkModule(borptrs->vDf125BORConfig, pe->vDf125WindowRawData);
669 LinkModule(borptrs->vDf125BORConfig, pe->vDf125PulseIntegral);
670 LinkModule(borptrs->vDf125BORConfig, pe->vDf125CDCPulse);
671 LinkModule(borptrs->vDf125BORConfig, pe->vDf125FDCPulse);
673 LinkModule(borptrs->vDF1TDCBORConfig, pe->vDF1TDCHit);
675 LinkModule(borptrs->vDCAEN1290TDCBORConfig, pe->vDCAEN1290TDCHit);
727 if(
VERBOSE>2)
evioout <<
" In JEventSource_EVIOpp::SearchFileForRunNumber() source_name=" << source_name <<
" ..." << endl;
729 uint32_t buff_len = 4000000;
730 uint32_t *buff =
new uint32_t[buff_len];
735 uint32_t *iptr = buff;
736 uint32_t *iend = &iptr[*iptr - 1];
737 if(
VERBOSE>2)
evioout <<
"Checking event with header= 0x" << hex << iptr[1] << dec << endl;
738 if(*iptr > 2048) iend = &iptr[2048];
739 bool has_timestamps =
false;
744 if( (*iptr & 0xff000f) == 0x600001){
745 if(
VERBOSE>2)
evioout <<
" Found EPICS header. Looking for HD:coda:daq:run_number ..." << endl;
746 const char *cptr = (
const char*)&iptr[1];
747 const char *cend = (
const char*)iend;
748 const char *needle =
"HD:coda:daq:run_number=";
751 if(!strncmp(cptr, needle, strlen(needle))){
753 uint64_t run_number_seed = atoi(&cptr[strlen(needle)]);
755 if(buff)
delete[] buff;
756 return run_number_seed;
763 if( (*iptr & 0xffffffff) == 0x00700E01)
continue;
766 if( (*iptr & 0xffffffff) == 0x00700e34){
768 uint32_t crate_len = iptr[0];
769 uint32_t crate_header = iptr[1];
770 uint32_t *iend_crate = &iptr[crate_len];
773 if( (crate_header>>16) == 0x71 ){
777 while(iptr<iend_crate){
778 uint32_t module_header = *iptr++;
779 uint32_t module_len = module_header&0xFFFF;
780 uint32_t modType = (module_header>>20)&0x1f;
783 uint64_t run_number_seed = iptr[0];
785 if(buff)
delete[] buff;
786 return run_number_seed;
788 iptr = &iptr[module_len];
797 bool not_in_this_buffer =
false;
806 not_in_this_buffer =
true;
810 has_timestamps =
true;
818 if(not_in_this_buffer)
break;
821 if( ((*iptr)&0x00FF0000) != 0x000A0000) { iptr--;
continue; }
822 uint32_t M = iptr[-3] & 0x000000FF;
823 if(
VERBOSE>2)
evioout <<
" ...(epic quest) Trigger bank " << (has_timestamps ?
"does":
"doesn't") <<
" have timestamps. Nevents in block M=" << M <<endl;
825 uint64_t *iptr64 = (uint64_t*)iptr;
828 if(
VERBOSE>3)
evioout <<
" ....(epic quest) Event num: " << event_num <<endl;
830 if(has_timestamps) iptr64 = &iptr64[M];
832 uint64_t run_number_seed = (*iptr64)>>32;
833 if(
VERBOSE>1)
evioout <<
" .. (epic quest) Found run number: " << run_number_seed <<endl;
836 if(buff)
delete[] buff;
837 return run_number_seed;
842 if(
VERBOSE>2)
evioout <<
" more than 500 events checked and no run number seen! abondoning search" << endl;
851 if(buff)
delete[] buff;
853 if(
VERBOSE>2)
evioout <<
" failed to find run number. Returning 0" << endl;
902 for(
auto wrd : pe->vDf250WindowRawData){
906 try{ wrd->GetSingle(cf250PulseTime); }
catch(...){}
907 try{ wrd->GetSingle(cf250PulsePedestal); }
catch(...){}
913 vector<Df250PulseTime*> em_pts;
914 vector<Df250PulsePedestal*> em_pps;
915 vector<Df250PulseIntegral*> em_pis;
924 if(!em_pts.empty() && f250PulseTime){
934 em_pts.erase(em_pts.begin());
937 if(!em_pps.empty() && f250PulsePedestal){
947 em_pps.erase(em_pps.begin());
950 pe->vDf250PulseTime.insert(pe->vDf250PulseTime.end(), em_pts.begin(), em_pts.end());
951 pe->vDf250PulsePedestal.insert(pe->vDf250PulsePedestal.end(), em_pps.begin(), em_pps.end());
952 pe->vDf250PulseIntegral.insert(pe->vDf250PulseIntegral.end(), em_pis.begin(), em_pis.end());
957 for(
auto wrd : pe->vDf250WindowRawData){
959 vector<const Df250PulseData*> cpdats;
960 try{ wrd->Get(cpdats); }
catch(...){}
962 vector<Df250PulseData*> pdats;
963 for(
auto cpdat : cpdats)
971 for(
auto pdat : pdats)
981 for(uint32_t i=cpdats.size(); i<pdats.size(); i++){
982 pe->vDf250PulseData.push_back(pdats[i]);
988 for(
auto wrd : pe->vDf250WindowRawData){
990 vector<const Df250PulseData*> cpdats;
991 try{ wrd->Get(cpdats); }
catch(...){}
993 vector<Df250PulseData*> pdats;
994 for(
auto cpdat : cpdats)
1002 for(
auto pdat : pdats)
1012 for(uint32_t i=cpdats.size(); i<pdats.size(); i++){
1013 pe->vDf250PulseData.push_back(pdats[i]);
1020 throw JException(ss.str());
1028 for(
auto p : pe->vDf250PulseTime_pool )
delete p;
1029 for(
auto p : pe->vDf250PulsePedestal_pool)
delete p;
1030 for(
auto p : pe->vDf250PulseIntegral_pool)
delete p;
1031 pe->vDf250PulseTime_pool.clear();
1032 pe->vDf250PulsePedestal_pool.clear();
1033 pe->vDf250PulseIntegral_pool.clear();
1048 for(
auto wrd : pe->vDf125WindowRawData){
1052 try{ wrd->GetSingle(cf125CDCPulse); }
catch(...){}
1053 try{ wrd->GetSingle(cf125FDCPulse); }
catch(...){}
1067 if(f125CDCPulse == NULL && ( wrd->rocid < 30 ) ){
1068 f125CDCPulse = pe->NEW_Df125CDCPulse();
1069 f125CDCPulse->
rocid = wrd->rocid;
1070 f125CDCPulse->
slot = wrd->slot;
1071 f125CDCPulse->
channel = wrd->channel;
1073 f125CDCPulse->AddAssociatedObject(wrd);
1075 else if(f125FDCPulse == NULL && ( wrd->rocid > 30 ) ){
1076 f125FDCPulse = pe->NEW_Df125FDCPulse();
1077 f125FDCPulse->
rocid = wrd->rocid;
1078 f125FDCPulse->
slot = wrd->slot;
1079 f125FDCPulse->
channel = wrd->channel;
1081 f125FDCPulse->AddAssociatedObject(wrd);
1086 if(f125CDCPulse!=NULL) f125CDCPulse->
emulated = 1;
1087 if(f125FDCPulse!=NULL) f125FDCPulse->
emulated = 1;
1107 vector<string> sourcetypes;
1123 JEventLoop::call_stack_t cs;
1124 cs.caller_name =
"<ignore>";
1128 cs.start_time = 0.0;
1130 cs.data_source = JEventLoop::DATA_FROM_SOURCE;
1131 loop->AddToCallStack(cs);
1145 JEventLoop::call_stack_t cs;
1146 cs.caller_name = caller;
1147 cs.callee_name = callee;
1148 cs.data_source = JEventLoop::DATA_FROM_SOURCE;
1149 loop->AddToCallStack(cs);
1150 cs.callee_name = cs.caller_name;
1151 cs.caller_name =
"<ignore>";
1152 cs.data_source = JEventLoop::DATA_FROM_FACTORY;
1153 loop->AddToCallStack(cs);
uint32_t pedestal_emulated
Calculated from raw data (when available)
bool TREAT_TRUNCATED_AS_ERROR
jerror_t GetObjects(jana::JEvent &event, jana::JFactory_base *factory)
bool sortf250pulsenumbers(const Df250PulseData *a, const Df250PulseData *b)
bool IsParsedDataType(string &classname) const
Df125EmulatorAlgorithm * f125Emulator
virtual void EmulateFirmware(const Df250WindowRawData *wrd, std::vector< Df250PulseTime * > &pt_objs, std::vector< Df250PulsePedestal * > &pp_objs, std::vector< Df250PulseIntegral * > &pi_objs)=0
void AddToCallStack(DParsedEvent *pe, JEventLoop *loop)
virtual const char * className(void)
void GetParsedDataTypes(vector< string > &classnames, bool include_all=false) const
void LinkBORassociations(DParsedEvent *pe)
sprintf(text,"Post KinFit Cut")
uint64_t event_status_bits
set< uint32_t > ROCIDS_TO_PARSE
void LinkModuleBORSamplesCopy(vector< T * > &a, vector< U * > &b)
bool ET_STATION_CREATE_BLOCKING
std::atomic< uint_fast64_t > NEVENTBUFF_STALLED
virtual void EmulateFirmware(const Df125WindowRawData *, Df125CDCPulse *, Df125FDCPulse *)=0
uint64_t SearchFileForRunNumber(void)
uint32_t pulse_peak
from Pulse Pedestal Data word
uint64_t MAX_EVENT_RECYCLES
bool emulated
true if made from Window Raw Data
vector< DEVIOWorkerThread * > worker_threads
uint32_t F250_EMULATION_VERSION
uint32_t MAX_PARSED_EVENTS
uint32_t quality_factor_emulated
Calculated from raw data if available.
bool APPLY_TRANSLATION_TABLE
JEventSource_EVIOpp(const char *source_name)
list< DParsedEvent * > parsed_events
list< DBORptrs * > borptrs_list
bool readNoFileBuff(uint32_t *user_buff, uint32_t user_buff_len, bool allow_swap=true)
uint32_t quality_factor
from Pulse Time Data word
bool IsNonEmptyDerivedDataType(string &classname) const
uint64_t MAX_EVENT_RECYCLES
uint64_t GetNWordsLeftInFile(void)
bool emulated
true if emulated values are copied to the main input
virtual ~JEventSource_EVIOpp()
void FreeEvent(jana::JEvent &event)
uint32_t time
from Pulse Time Data word
void EmulateDf125Firmware(DParsedEvent *pe)
uint32_t time_emulated
Calculated from raw data if available.
Df250EmulatorAlgorithm * f250Emulator
bool emulated
true if made from Window Raw Data
EVIOSourceType source_type
uint32_t F250_EMULATION_MODE
uint64_t MAX_OBJECT_RECYCLES
mutex PARSED_EVENTS_MUTEX
static void SetStatusBitDescriptions(jana::JApplication *japp)
uint32_t pulse_peak_emulated
Calculated from raw data (when available)
uint32_t F125_EMULATION_MODE
uint32_t pedestal
from Pulse Pedestal Data word
void CopyToFactories(JEventLoop *loop)
std::chrono::high_resolution_clock::time_point tend
void EmulateDf250Firmware(DParsedEvent *pe)
bool read(uint32_t *&buff, uint32_t &buff_len, bool allow_swap)
jerror_t GetEvent(jana::JEvent &event)
thread * dispatcher_thread
static void SetSystemsToParse(string systems, JEventSource *eventsource)
bool emulated
true if emulated values are copied to the main input
std::chrono::high_resolution_clock::time_point tstart
std::atomic< uint_fast64_t > NPARSER_STALLED
void AddSourceObjectsToCallStack(JEventLoop *loop, string className)
void AddEmulatedObjectsToCallStack(JEventLoop *loop, string caller, string callee)
uint32_t pulse_number
pulse number for this channel, this event starting from 0
condition_variable PARSED_EVENTS_CV
bool et_quit_next_timeout
void LinkModule(vector< T * > &a, vector< U * > &b)
uint64_t MAX_OBJECT_RECYCLES
std::atomic< uint_fast64_t > NDISPATCHER_STALLED
std::atomic< uint_fast64_t > NEVENTS_PROCESSED