Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
async_filebuf.cc
Go to the documentation of this file.
1 //
2 // File: async_filebuf.cc
3 // Created: Wed May 1 09:22:00 EST 2019
4 // Creator: richard.t.jones at uconn.edu
5 //
6 // Require: --std=c++11 -pthread
7 //
8 
9 // This code does not compile on Mac OS X using
10 // Xcode 10.1 = Apple LLVM version 10.0.0 (clang-1000.10.44.4)
11 // 6/3/2019 DL
12 #ifndef __APPLE__
13 
14 #include <string>
15 #include <string.h>
16 #include <stdexcept>
17 
18 #include <async_filebuf.h>
19 
20 async_filebuf::async_filebuf(int segsize, int segcount, int lookback)
21  : segment_size(segsize),
22  segment_count(segcount),
23  segment_lookback(lookback),
24  readloop_active(0)
25 {
26 #if VERBOSE_ASYNC_FILEBUF
27  std::cout << THIS_ASYNCFB << "async_filebuf::async_filebuf(" << segsize << "," << segcount << "," << lookback << ")" << std::endl;
28 #endif
29  if (segment_count < segment_lookback + 2) {
30  std::string errmsg("async_filebuf error - insufficient"
31  " segment count for look-back.");
32  std::cerr << errmsg << std::endl;
33  throw std::range_error(errmsg);
34  }
35  int bufsize = segment_size * segment_count;
36  buffer = new char[bufsize];
37  setbuf(buffer, bufsize);
38 }
39 
41 {
42 #if VERBOSE_ASYNC_FILEBUF
43  std::cout << THIS_ASYNCFB << "async_filebuf::~async_filebuf()" << std::endl;
44 #endif
45  if (is_open())
46  close();
47  delete [] buffer;
48 }
49 
51 {
52 #if VERBOSE_ASYNC_FILEBUF
53  std::cout << THIS_ASYNCFB << "async_filebuf::readloop_initiate()" << std::endl;
54 #endif
55  if (readloop_active)
56  return -1;
57  segment_count = 0;
58  for (char *p = buffer_start; p < buffer_end; p += segment_size) {
59  segment_cond.push_back(sEmpty);
60  segment_pos.push_back(0);
61  segment_len.push_back(0);
62  ++segment_count;
63  }
64  segment_backstop = 0;
65  readloop_active = 1;
66  readloop_thread = new std::thread(&async_filebuf::readloop, this);
70  return 0;
71 }
72 
74 {
75 #if VERBOSE_ASYNC_FILEBUF
76  std::cout << THIS_ASYNCFB << "async_filebuf::readloop_terminate()" << std::endl;
77 #endif
78  //std::cout << "bang!!!" << std::endl;
79  if (readloop_active) {
80  std::streampos pos = getpos();
81  if (readloop_thread) {
82  std::unique_lock<std::mutex> lk(readloop_lock);
83  readloop_active = 0;
84  readloop_wake.notify_one();
85  }
86  readloop_thread->join();
87  delete readloop_thread;
88  readloop_thread = 0;
89  std::filebuf::seekpos(pos, std::ios::in);
93  segment_cond.clear();
94  segment_pos.clear();
95  segment_len.clear();
96  }
97  return 0;
98 }
99 
101 {
102 #if VERBOSE_ASYNC_FILEBUF
103  std::cout << THIS_ASYNCFB << "async_filebuf::readloop()" << std::endl;
104 #endif
105  int seg = 0;
106  while (readloop_active) {
107  std::unique_lock<std::mutex> lk(readloop_lock);
108  while (readloop_active && segment_cond[seg] != sEmpty) {
109  readloop_wake.wait(lk);
110  }
111  if (! readloop_active)
112  break;
113  segment_cond[seg] = sFilling;
114  lk.unlock();
115  char *sbase = buffer_start + seg * segment_size;
116  segment_pos[seg] = this->std::filebuf::seekoff(0, std::ios::cur, std::ios::in);
117  std::streamsize nreq = buffer_end - sbase;
118  nreq = (nreq > segment_size)? segment_size : nreq;
119  segment_len[seg] = std::filebuf::xsgetn(sbase, nreq);
120  lk.lock();
121  segment_cond[seg] = sFull;
122  readloop_woke.notify_one();
123  seg = (seg + 1) % segment_count;
124  }
125  return 0;
126 }
127 
129 {
130 #if VERBOSE_ASYNC_FILEBUF
131  std::cout << THIS_ASYNCFB << "async_filebuf::underflow()" << std::endl;
132 #endif
133  if (!readloop_active) {
134  if (segment_lookback > 0)
136  else
137  return std::filebuf::underflow();
138  }
139 
140  int seg = segment() % segment_count;
141  if (segoff() > 0)
142  seg = (seg + 1) % segment_count;
143  std::unique_lock<std::mutex> lk(readloop_lock);
144  while (segment_cond[seg] != sFull) {
145  readloop_woke.wait(lk);
146  }
147  segment_cond[seg] = sEmptying;
148  if ((segment_backstop + segment_lookback + 1) % segment_count == seg) {
150  readloop_wake.notify_one();
152  }
156  if (segment_len[seg] == 0)
157  return EOF;
158  return (unsigned char)*buffer_gptr;
159 }
160 
161 std::streampos async_filebuf::seekoff(std::streamoff off, std::ios::seekdir way,
162  std::ios::openmode which)
163 {
164 #if VERBOSE_ASYNC_FILEBUF
165  std::cout << THIS_ASYNCFB << "async_filebuf::seekoff(" << off << "," << way << "," << which << ")" << std::endl;
166 #endif
167  if (readloop_active && segment_lookback > 0) {
168  if (way == std::ios::beg)
169  return seekpos(off, which);
170  else if (way == std::ios::cur)
171  return seekpos(getpos() + off, which);
172  else
174  }
175  return this->std::filebuf::seekoff(off, way, which);
176 }
177 
178 std::streampos async_filebuf::seekpos(std::streampos pos, std::ios::openmode which)
179 {
180 #if VERBOSE_ASYNC_FILEBUF
181  std::cout << THIS_ASYNCFB << "async_filebuf::seekpos(" << pos << "," << which << ")" << std::endl;
182 #endif
183  if (! readloop_active || segment_lookback == 0) {
184  return this->std::filebuf::seekpos(pos, which);
185  }
186  if (pos < std::streampos(0))
187  pos = std::streampos(0);
188  std::streampos curpos = getpos();
189  if (abs(pos - curpos) > segment_lookback * segment_size) {
190  if (readloop_active)
192  return this->std::filebuf::seekpos(pos, which);
193  }
194 
195  int seg = segment();
196  while (pos < segment_pos[seg]) {
197  segment_cond[seg] = sFull;
198  int prevseg = (seg == 0)? segment_count-1 : seg-1;
199  if (segment_cond[prevseg] != sEmptying) {
200  if (readloop_active)
202  return this->std::filebuf::seekpos(pos, which);
203  }
204  seg = prevseg;
205  }
206  while (pos >= segment_pos[seg] + segment_len[seg]) {
208  if (underflow() == EOF)
209  return std::streampos(std::streamoff(-1));
210  seg = (seg + 1) % segment_count;
211  }
212  int off = pos - segment_pos[seg];
214  buffer_gptr = buffer_eback + off;
216  return pos;
217 }
218 
219 std::streamsize async_filebuf::xsgetn(char* s, std::streamsize n)
220 {
221 #if VERBOSE_ASYNC_FILEBUF
222  std::cout << THIS_ASYNCFB << "async_filebuf::xsgetn(s," << n << ")" << std::endl;
223 #endif
224  if (segment_lookback == 0) {
225  return std::filebuf::xsgetn(s,n);
226  }
227 
228  std::streamsize nleft=n;
229  while (nleft > 0) {
230  int nbuf = buffer_egptr - buffer_gptr;
231  if (nbuf > 0) {
232  nbuf = (nbuf < nleft)? nbuf : nleft;
233  memcpy(s, buffer_gptr, nbuf);
234 #if VERBOSE_ASYNC_FILEBUF
235  std::cout << THIS_ASYNCFB << "memcpy(d, s, " << nbuf << ")" << std::endl;
236 #endif
237 #if SHADOW_DEBUG
238  shadow_ifs.seekg(getpos());
239  char *shadowbuf = new char[nbuf];
240  if (shadow_ifs.read(shadowbuf, nbuf) && shadow_ifs.gcount() == nbuf) {
241  for (int i=0; i<nbuf; ++i) {
242  if (shadowbuf[i] != buffer_gptr[i]) {
243  std::cerr << "Error in async_filebuf::xsgetn - "
244  "data read from buffer does not match "
245  "what reading directly from the file "
246  "gives at the same offset! Cannot continue."
247  << std::endl;
248  exit(6);
249  }
250  }
251  }
252  else {
253  std::cerr << "Error in async_filebuf::xsgetn - "
254  "error reading from the shadow ifstream input. "
255  "Cannot continue."
256  << std::endl;
257  exit(6);
258  }
259  delete [] shadowbuf;
260 #endif
261  s += nbuf;
262  buffer_gptr += nbuf;
263  nleft -= nbuf;
264  }
265  else if (underflow() == EOF) {
266  break;
267  }
268  }
269  return n - nleft;
270 }
271 
272 #else // __APPLE__
273 int async_filebuff_disable_for_mac_osx = 0; // symbol so compiled object isn't empty
274 #endif // __APPLE__
275 
std::vector< segment_state > segment_cond
std::streampos getpos()
virtual std::streampos seekpos(std::streampos pos, std::ios::openmode which)
char string[256]
int readloop_initiate()
std::condition_variable readloop_wake
char * buffer_eback
std::mutex readloop_lock
std::condition_variable readloop_woke
virtual int underflow()
std::thread * readloop_thread
char * buffer_egptr
async_filebuf(int segsize=1000000, int segcount=3, int lookback=1)
int readloop_terminate()
std::vector< std::streampos > segment_pos
async_filebuf * close()
Definition: async_filebuf.h:40
virtual std::streamsize xsgetn(char *s, std::streamsize n)
std::vector< std::streamsize > segment_len
char * buffer_gptr
#define THIS_ASYNCFB
Definition: async_filebuf.h:21
virtual std::streambuf * setbuf(char *s, std::streamsize n)
virtual ~async_filebuf()
virtual std::streampos seekoff(std::streamoff off, std::ios::seekdir way, std::ios::openmode which)
char * buffer_start