Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DEventSourceEventStore.cc
Go to the documentation of this file.
1 // $Id$
2 //
3 // File: DEventSourceEventStore.cc
4 // Creator: sdobbs
5 //
6 
7 #include <iostream>
8 #include <string>
9 #include <vector>
10 #include <sstream>
11 #include <algorithm>
12 #include <iterator>
13 #include <stdlib.h>
14 #include <limits>
15 
16 using namespace std;
17 
18 #include <DANA/DApplication.h>
19 #include <DANA/DStatusBits.h>
20 
21 #include <TRandom3.h>
22 
23 #include <HDDM/DEventSourceREST.h>
24 
25 #include "DEventSourceEventStore.h"
26 #include "DEventStoreEvent.h"
27 #include "DESSkimData.h"
28 
/// Short help text appended to "Invalid ES query" errors.
///
/// @return a one-line pointer to the online HDEventStore documentation.
static string EventstoreQueryHelp()
{
	return "For more information, go to https://github.com/JeffersonLab/HDEventStore/wiki";
}
33 
34 // forward declarations
35 //class DEventSourceREST;
36 
// Various variables
// File-scope test flag. The constructor sets it when the ESDB:TEST_MODE
// parameter is non-zero. While set, GetEvent() emits synthetic events with
// randomly assigned skims instead of reading real data.
static bool TEST_MODE = false;
39 
40 
41 //---------------------------------
42 // DEventSourceEventStore (Constructor)
43 //---------------------------------
44 DEventSourceEventStore::DEventSourceEventStore(const char* source_name):JEventSource(source_name)
45 {
46  // initialize data members
47  es_data_loaded = false;
48  event_source = NULL;
49  min_run = 0;
50  max_run = numeric_limits<int>::max(); // default to something ridiculously large
51  esdb_connection = "mysql://es_user@hallddb.jlab.org/EventStoreTMP"; // default to main JLab ES server
52  BASE_SKIM_INDEX = 20;
54 
55  // eventstore parameters
56  // for more information, see ...
57  //grade = "physics"; // REST physics data
58  //grade = "recon"; // REST physics data
59  grade = "recon-unchecked"; // DEBUG!!
60  timestamp = "21000501"; // use the latest data - this should never happen, anyway...
61  load_all_skims = true; // default to processing all skims
62  run_period_set = false;
63  run_range_set = false;
64 
65  // load run period mapping
66  // hardcode for now, this info should move to RCDB...
67  run_period_map["RunPeriod-2015-03"] = pair<int,int>(2607,3385);
68  run_period_map["RunPeriod-2016-02"] = pair<int,int>(10000,20000);
69 
70  // read in configurations
71  // priority: JANA command line -> environment variable -> default
72  if(getenv("EVENTSTORE_CONNECTION") != NULL)
73  esdb_connection = getenv("EVENTSTORE_CONNECTION");
74  gPARMS->SetDefaultParameter("ESDB:DB_CONNECTION", esdb_connection,
75  "Specification of EventStore DB connection.");
76 
77  int test_mode_flag = 0;
78  gPARMS->SetDefaultParameter("ESDB:TEST_MODE", test_mode_flag,
79  "Toggle test mode features");
80  if(test_mode_flag != 0) {
81  TEST_MODE = true;
82  if(gRandom == NULL)
83  gRandom = new TRandom3(0);
84  }
85 
86  // First, parse the eventsource query
87  // For details of the query format, see: <...>
88 
89  // Tokenize the query string
90  string es_query(source_name);
91  istringstream iss(es_query);
92  vector<string> tokens;
93  copy(istream_iterator<string>(iss),
94  istream_iterator<string>(),
95  back_inserter(tokens));
96 
97  ////////////////////////////////////////////////////////////
98 
99  // initialize database connection
100  if(esdb_connection.substr(0,8) == "mysql://") {
101  cout << "MySQL connection" << endl;
102  esdb = static_cast<DESDBProvider *>(new DESDBProviderMySQL(esdb_connection));
103  } else if(esdb_connection.substr(0,8) == "sqlite://") {
104  cout << "SQLite connection" << endl;
105  }
106 
107  // Connect to database
108  esdb->Open();
109 
110  ////////////////////////////////////////////////////////////
111  // parse the ES command
112  // Check query header
113  if( (tokens[0] != "eventstore") || tokens.size() < 3)
114  throw JException("Invalid ES query = " + es_query + "\n\n" + EventstoreQueryHelp() );
115 
116  if( (tokens[1] == "in") ) { // read data from the data base
117  timestamp = tokens[2]; // require a timestamp
118 
119  if(tokens.size() > 3) {
120  // parse the rest
121  size_t token_ind = 3;
122 
123  // the next argument is the data grade to use
124  grade = tokens[token_ind++];
125 
126  while(token_ind < tokens.size()) {
127  if(tokens[token_ind] == "runs") {
128  if(run_period_set)
129  throw JException("Cannot set run range and run period in the same command!");
130 
131  // make sure there's enough args
132  if(tokens.size() - token_ind < 3)
133  throw JException("Invalid ES query = " + es_query + "\n\n" + EventstoreQueryHelp() );
134 
135  min_run = atoi(tokens[token_ind+1].c_str()); // ERROR CHECK!!
136  max_run = atoi(tokens[token_ind+2].c_str()); // ERROR CHECK!!
137 
138  // sanity check
139  if(max_run < min_run) {
140  throw JException("Maximum run must be larger than minimum run!");
141  }
142  token_ind += 3;
143  } else if(tokens[token_ind] == "run_period") {
144  if(run_range_set)
145  throw JException("Cannot set run range and run period in the same command!");
146 
147  // make sure there's enough args
148  if(tokens.size() - token_ind < 2)
149  throw JException("Invalid ES query = " + es_query + "\n\n" + EventstoreQueryHelp() );
150 
151  map< string, pair<int,int> >::iterator run_period_itr = run_period_map.find(tokens[token_ind+1]);
152  if(run_period_itr == run_period_map.end()) {
153  // a bad run period was specified...
154  PrintRunPeriods();
155  throw JException("Invalid ES query = " + es_query + "\n");
156  }
157  min_run = run_period_itr->second.first;
158  max_run = run_period_itr->second.second;
159  token_ind += 2;
160  } else if(tokens[token_ind] == "skims") {
161  load_all_skims = false;
162  // for the skims command, assume the rest of the arguments are skim names
163  while(token_ind++ < tokens.size()) {
164  skim_list.push_back(tokens[token_ind]);
165  }
166 
167  // sanity check - don't allow a million skims!
168  if(MAX_SKIM_INDEX - BASE_SKIM_INDEX < int(skim_list.size())) {
169  throw JException("Too many skims specified!!");
170  }
171  } else {
172  // require a valid command
173  throw JException("Invalid ES query = " + es_query + "\n\n" + EventstoreQueryHelp() );
174  }
175  }
176 
177  // sanity check - make sure the grade exists!
178  vector<string> grades_in_db;
179  esdb->GetGrades(grades_in_db);
180 
181  vector<string>::iterator grade_itr = find(grades_in_db.begin(), grades_in_db.end(), grade);
182  if(grade_itr == grades_in_db.end()) {
183  jerr << "Could not find grade \'" << grade << "\' in DB!" << endl;
184  PrintGrades();
185  throw JException("Invalid ES query = " + es_query + "\n");
186  }
187  }
188 
189 
190  // debugging stuff
191  if(TEST_MODE) {
192  if(skim_list.size() == 0) {
193  skim_list.push_back("pi0");
194  skim_list.push_back("eta");
195  skim_list.push_back("rho");
196  skim_list.push_back("omega");
197  }
198  }
199  } else if( (tokens[1] == "info") ) { // query information from the DB
200  // runs <datestamp> [<grade> [<skim>] ] [runs <min> [<max>]]
201  // [<run info query>] prints available runs in DB that match criteria
202  //PrintRuns();
203 
204  // grades prints available grades in DB
205  if(tokens[2] == "grades") {
206  PrintGrades();
207 
208  // run_periods prints available run periods
209  } else if(tokens[2] == "run_periods") {
210  PrintRunPeriods();
211 
212  // skims <datestamp> <grade> print skims available for the grade
213  } else if(tokens[2] == "skims") {
214  if(tokens.size() < 5)
215  throw JException("Invalid query: skims <datestamp> <grade>" );
216 
217  PrintSkims(tokens[3], tokens[4]);
218  }
219  // actualDate <datestamp> <grade> print actual internal date used
220  //} else if(tokens[2] == "actualDate") {
221  // if(tokens.size() < 5)
222  // throw JException("Invalid query = actualDate <datestamp> <grade>" );
223  //
224  // PrintActualDate();
225  //}
226  // versions <datestamp> <grade> [-verbose]
227  // print run ranges versus versions
228  // use -verbose option for more details
229 
230  // We're just querying the database, so we can quit here
231  exit(0);
232  } else {
233  throw JException("Invalid ES query = " + es_query + "\n\n" + EventstoreQueryHelp() );
234  }
235 
236  ////////////////////////////////////////////////////////////
237  if(load_all_skims) {
238  // if the user didn't ask for specific skims, then load all of them
240  }
241  /*
242  if(TEST_MODE) // if we're testing, don't make any more checks
243  return;
244 
245  // load some data here
246 
247  // make sure we've found any files
248  */
249 }
250 
251 //---------------------------------
252 // ~DEventSourceEventStore (Destructor)
253 //---------------------------------
255 {
256  if (event_source != NULL)
257  delete event_source;
258 }
259 
260 //---------------------------------
261 // GetEvent
262 //---------------------------------
263 jerror_t DEventSourceEventStore::GetEvent(JEvent &event)
264 {
265 
266  // FOR DEBUGGING - EMIT EVENTS FOREVER
267  if(TEST_MODE) {
268  // output some fake event with skim information
269  event.SetEventNumber(1);
270  event.SetRunNumber(10000);
271  event.SetJEventSource(this);
272  //event.SetRef(NULL);
273  event.SetStatusBit(kSTATUS_FROM_FILE);
274  event.SetStatusBit(kSTATUS_PHYSICS_EVENT);
275 
276  DEventStoreEvent *the_es_event = new DEventStoreEvent();
277  event.SetRef(the_es_event);
278  for(int i=0; i<4; i++)
279  if(gRandom->Uniform() < 0.5)
280  the_es_event->Add_Skim(skim_list[i]);
281 
282  return NOERROR;
283  }
284 
285  // make sure the file is open
286  while(event_source == NULL) {
287  while(OpenNextFile() != NOERROR) {} // keep trying to open files until none are left
288  if(event_source == NULL)
289  return NO_MORE_EVENTS_IN_SOURCE;
290 
291  // skip to next event
292  jerror_t retval;
293  if( (retval = MoveToNextEvent()) != NOERROR)
294  return retval; // if we can't get to another event, then we're done
295 
296  // read the next event in
297  retval = event_source->GetEvent(event);
298  if(retval == NOERROR) {
299  // To store the skim and other EventStore information for the event
300  // we wrap the actual event data and store our information in the wrapper
301  DEventStoreEvent *the_es_event = new DEventStoreEvent();
302  the_es_event->Set_EventSource(event_source);
303  the_es_event->Set_SourceRef(event.GetRef()); // save the actual event data
304  event.SetRef(the_es_event);
305  event.SetStatusBit(kSTATUS_FROM_FILE);
306  event.SetStatusBit(kSTATUS_PHYSICS_EVENT);
307 
308  // tag event with skims
309  ;
310  } else if(retval == NO_MORE_EVENTS_IN_SOURCE) {
311  // if the source is empty, close the current one, then move to the next
312  delete event_source;
313  event_source = NULL;
314  } else { // if there'a another error, then pass it on...
315  return retval;
316  }
317  }
318 
319  return NOERROR;
320 }
321 
322 //---------------------------------
323 // FreeEvent
324 //---------------------------------
326 {
327  if(event_source != NULL)
328  event_source->FreeEvent(event);
329 }
330 
331 //---------------------------------
332 // GetObjects
333 //---------------------------------
334 jerror_t DEventSourceEventStore::GetObjects(JEvent &event, JFactory_base *factory)
335 {
336  /// This gets called through the virtual method of the
337  /// JEventSource base class. It creates the objects of the type
338  /// on which factory is based.
339 
340  // We must have a factory to hold the data
341  if(!factory) throw RESOURCE_UNAVAILABLE;
342 
343  // return meta-EventStore information
344  string dataClassName = factory->GetDataClassName();
345 
346  if (dataClassName =="DESSkimData") {
347  JFactory<DESSkimData> *essd_factory = dynamic_cast<JFactory<DESSkimData>*>(factory);
348 
349  DEventStoreEvent *the_es_event = static_cast<DEventStoreEvent *>(event.GetRef());
350  DESSkimData *skim_data = new DESSkimData(the_es_event->Get_Skims(), skim_list);
351 
352  vector<DESSkimData*> skim_data_vec(1, skim_data);
353  essd_factory->CopyTo(skim_data_vec);
354 
355  return NOERROR;
356  }
357 
358  if(!event_source) throw RESOURCE_UNAVAILABLE;
359 
360  // See GetEvent() for the motivation for this
361  // Unwrap the event...
362  DEventStoreEvent *the_es_event = static_cast<DEventStoreEvent *>(event.GetRef());
363  event.SetRef(the_es_event->Get_SourceRef());
364  // ..now grab the objects...
365  jerror_t retval = event_source->GetObjects(event, factory);
366  // ...and wrap it back up
367  event.SetRef(the_es_event);
368 
369  return retval;
370 }
371 
372 //---------------------------------
373 // MoveToNextEvent
374 //---------------------------------
376 {
377  // if we're loading all of the skims, then we don't need to skip any events
378  if(load_all_skims)
379  return NOERROR;
380 
381  return NOERROR;
382 }
383 
384 //---------------------------------
385 // OpenNextFile
386 //---------------------------------
388 {
389  if(!es_data_loaded) {
390  // LoadESData();
391 
392  // sanity check
393  if(data_files.size() == 0) {
394  jerr << "Could not load any files from EventStore!" << endl;
395  return NO_MORE_EVENT_SOURCES;
396  }
397 
398  // keep a pointer to the current file
399  current_file_itr = data_files.begin();
400 
401  es_data_loaded = true;
402  }
403 
404  // if there's a current file open, close it so that we don't leak memory
405  if(event_source != NULL) {
406  delete event_source;
407  event_source = NULL;
408  }
409 
410  //Get generators
411  vector<JEventSourceGenerator*> locEventSourceGenerators = japp->GetEventSourceGenerators();
412 
413  //Get event source
414  while( (event_source == NULL) && (current_file_itr != data_files.end()) ) {
415 
416  // Loop over JEventSourceGenerator objects and find the one
417  // (if any) that has the highest chance of being able to read
418  // this source. The return value of
419  // JEventSourceGenerator::CheckOpenable(source) is a liklihood that
420  // the named source can be read by the JEventSource objects
421  // created by the generator. In most cases, the liklihood will
422  // be either 0.0 or 1.0. In the case that 2 or more generators return
423  // equal liklihoods, the first one in the list will be used.
424  JEventSourceGenerator* locEventSourceGenerator = NULL;
425  double liklihood = 0.0;
426  string locFileName = current_file_itr->c_str();
427  for(unsigned int i=0; i<locEventSourceGenerators.size(); i++)
428  {
429  double my_liklihood = locEventSourceGenerators[i]->CheckOpenable(locFileName);
430  if(my_liklihood > liklihood)
431  {
432  liklihood = my_liklihood;
433  locEventSourceGenerator = locEventSourceGenerators[i];
434  }
435  }
436 
437  if(locEventSourceGenerator != NULL)
438  {
439  jout<<"Opening source \""<<locFileName<<"\" of type: "<<locEventSourceGenerator->Description()<<endl;
440  event_source = locEventSourceGenerator->MakeJEventSource(locFileName);
441  }
442 
443  if(event_source == NULL){
444  jerr<<endl;
445  jerr<<" xxxxxxxxxxxx Unable to open event source \""<<locFileName<<"\"! xxxxxxxxxxxx"<<endl;
446  }
447 
449  }
450 
451  // error check
452  if(event_source == NULL)
453  return NO_MORE_EVENT_SOURCES;
454  else
455  return NOERROR;
456 }
457 
458 //---------------------------------
459 // PrintGrades
460 //---------------------------------
462 {
463  vector<string> grades;
464  esdb->GetGrades(grades);
465 
466  // print out information
467  cout << endl << "Available grades:" << endl;
468  for(vector<string>::iterator it = grades.begin();
469  it != grades.end(); it++)
470  cout << " " << *it << endl;
471  cout << endl;
472 }
473 
474 //---------------------------------
475 // PrintRunPeriods
476 //---------------------------------
478 {
479  vector<string> grades;
480  esdb->GetGrades(grades);
481 
482  // print out information
483  cout << endl << "Available Run Periods:" << endl;
484  for(map< string, pair<int,int> >::iterator it = run_period_map.begin();
485  it != run_period_map.end(); it++) {
486  pair<int,int> &the_run_range = it->second;
487  cout << " " << it->first << ": "
488  << the_run_range.first << " - " << the_run_range.second << endl;
489  }
490  cout << endl;
491 }
492 
493 //---------------------------------
494 // PrintSkims
495 //---------------------------------
496 void DEventSourceEventStore::PrintSkims(string datestamp, string grade)
497 {
498  vector<string> skims;
499  esdb->GetSkims(skims, datestamp, grade);
500 
501  // print out information
502  cout << endl << "Available skims for grade " << grade << ":" << endl;
503  for(vector<string>::iterator it = skims.begin();
504  it != skims.end(); it++)
505  cout << " " << *it << endl;
506  cout << endl;
507 }
508 
map< string, pair< int, int > > run_period_map
char str[256]
vector< string >::iterator current_file_itr
jerror_t GetObjects(JEvent &event, JFactory_base *factory)
void PrintSkims(string timestamp, string grade)
JApplication * japp
virtual bool GetSkims(vector< string > &grades, string timestamp, string grade)=0
virtual bool Open()=0
static string EventstoreQueryHelp()
jerror_t GetEvent(JEvent &event)
virtual bool GetGrades(vector< string > &grades)=0
void Set_EventSource(JEventSource *locEventSource)
void Set_SourceRef(void *locSourceRef)
void Add_Skim(string locSkim)
static bool TEST_MODE
DEventSourceEventStore(const char *source_name)
void * Get_SourceRef(void) const