Hall-D Software  alpha
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
async_filebuf.h
Go to the documentation of this file.
1 //
2 // File: async_filebuf.h
3 // Created: Wed May 1 09:22:00 EST 2019
4 // Creator: richard.t.jones at uconn.edu
5 //
6 
7 #ifndef _ASYNC_FILEBUF_
8 #define _ASYNC_FILEBUF_
9 
10 #include <fstream>
11 #include <iostream>
12 #include <cstdio>
13 #include <vector>
14 #include <thread>
15 #include <mutex>
16 #include <condition_variable>
17 
18 //#define VERBOSE_ASYNC_FILEBUF 1
19 //#define SHADOW_DEBUG 1
20 
// Stream fragment that prints the address of the current object in
// parentheses; used as the prefix of the VERBOSE_ASYNC_FILEBUF trace
// messages. Only usable inside a non-static member function (needs `this`).
21 #define THIS_ASYNCFB "(" << (void*)this << ")"
22 
23 class async_filebuf : public std::filebuf {
24 
25  public:
// Constructor and destructor are declared only; their definitions are not
// part of this listing. Defaults: segsize=1000000, segcount=3, lookback=1.
// NOTE(review): presumably segsize is the size of one read-ahead segment
// and segcount the number of segments -- confirm against the definition.
26  async_filebuf(int segsize=1000000, int segcount=3, int lookback=1);
27  virtual ~async_filebuf();
28 
// Opens `fname` with `mode` through std::filebuf::open. When SHADOW_DEBUG
// is enabled the same file is also opened on shadow_ifs.
// Returns `this` unconditionally.
// NOTE(review): the result of std::filebuf::open (null on failure) is
// discarded, so callers cannot detect a failed open from the return value
// -- confirm this is intended. `fname` is taken by value (copied per call).
29  async_filebuf* open(const std::string fname, std::ios::openmode mode) {
30 #if VERBOSE_ASYNC_FILEBUF
31  std::cout << THIS_ASYNCFB << "async_filebuf::open(" << fname << "," << mode << ")" << std::endl;
32 #endif
33  std::filebuf::open(fname, mode);
34 #if SHADOW_DEBUG
35  shadow_ifs.open(fname, mode);
36 #endif
37  return this;
38  }
39 
// Body of close(); the index at the end of this listing places its
// definition at async_filebuf.h:40. Listing lines 40, 45 and 46 are absent
// from this view, so the function header and the statement(s) guarded by
// `if (readloop_active)` cannot be seen here.
// Visible behavior: optional trace output, closes shadow_ifs when
// SHADOW_DEBUG is enabled, and returns `this`.
41 #if VERBOSE_ASYNC_FILEBUF
42  std::cout << THIS_ASYNCFB << "async_filebuf::close()" << std::endl;
43 #endif
44  if (readloop_active)
47 #if SHADOW_DEBUG
48  shadow_ifs.close();
49 #endif
50  return this;
51  }
52 
// Number of characters available for reading.
// While the read loop is active this is the distance from buffer_gptr to
// buffer_egptr; otherwise the call is forwarded to std::filebuf::in_avail().
// NOTE(review): declared without `virtual`; std::streambuf::in_avail is a
// non-virtual member, so this version is only reached through an
// async_filebuf-typed pointer or reference -- verify callers.
53  std::streamsize in_avail() {
54 #if VERBOSE_ASYNC_FILEBUF
55  std::cout << THIS_ASYNCFB << "async_filebuf::in_avail()" << std::endl;
56 #endif
57  if (readloop_active)
58  return buffer_egptr - buffer_gptr;
59  return std::filebuf::in_avail();
60  }
61 
// Advances the read position by one and returns the character now at it.
// Falls back to std::filebuf::snextc() when the read loop is not active.
// NOTE(review): listing line 67 is absent from this view; the first
// `return` inside the block is presumably guarded by a condition on that
// line -- TODO confirm against the real header.
62  int snextc() {
63 #if VERBOSE_ASYNC_FILEBUF
64  std::cout << THIS_ASYNCFB << "async_filebuf::snextc()" << std::endl;
65 #endif
66  if (readloop_active) {
68  return (unsigned char)*(++buffer_gptr);
// Otherwise call underflow(); give up with EOF if it reports EOF,
// else retry by calling snextc() again.
69  if (underflow() == EOF)
70  return EOF;
71  return snextc();
72  }
73  return std::filebuf::snextc();
74  }
75 
// Returns the character at the read position and then advances past it.
// Falls back to std::filebuf::sbumpc() when the read loop is not active.
// NOTE(review): listing line 78 is absent from this view; the first
// `return` inside the block is presumably guarded by a condition on that
// line -- TODO confirm against the real header.
76  int sbumpc() {
77  if (readloop_active) {
79  return (unsigned char)*(buffer_gptr++);
// Otherwise call underflow(); return EOF if it reports EOF,
// else retry by calling sbumpc() again.
80  if (underflow() == EOF)
81  return EOF;
82  return sbumpc();
83  }
84  return std::filebuf::sbumpc();
85  }
86 
// Returns the character at the read position without advancing.
// Falls back to std::filebuf::sgetc() when the read loop is not active.
// NOTE(review): listing line 89 is absent from this view; the `else`
// below implies an `if` on that line selecting between underflow() and a
// direct read of *buffer_gptr -- TODO confirm the exact condition.
87  int sgetc() {
88  if (readloop_active) {
90  return underflow();
91  else
92  return (unsigned char)*buffer_gptr;
93  }
94  return std::filebuf::sgetc();
95  }
96 
// Puts character `c` back by stepping the read position back one byte.
// Falls back to std::filebuf::sputbackc(c) when the read loop is not active.
// Returns the character now at the read position, or the result of
// pbackfail() when stepping back is refused.
97  int sputbackc(int c) {
98  if (readloop_active) {
// `seg` is the index of the segment holding buffer_gptr; `eback` is the
// first byte of that segment.
99  int seg = (buffer_gptr - buffer_start) / segment_size;
100  char *eback = buffer_start + seg * segment_size;
// NOTE(review): the conditions are joined with `||`, so a mismatching `c`
// is accepted whenever buffer_gptr > eback, and buffer_gptr[-1] is read
// even when buffer_gptr == eback. std::streambuf::sputbackc requires a
// putback position AND a matching character -- confirm the intent.
101  if (buffer_gptr > eback || c == (unsigned char)buffer_gptr[-1])
102  return (unsigned char)*(--buffer_gptr);
103  else
// pbackfail() is called without an argument, so `c` is not forwarded.
104  return pbackfail();
105  }
106  return std::filebuf::sputbackc(c);
107  }
108 
// Steps the read position back by one byte and returns the character there.
// Falls back to std::filebuf::sungetc() when the read loop is not active.
// Stepping back is only allowed within the segment that currently holds
// buffer_gptr; at the first byte of a segment pbackfail() is returned.
109  int sungetc() {
110  if (readloop_active) {
// `eback` is the first byte of the segment containing buffer_gptr.
111  int seg = (buffer_gptr - buffer_start) / segment_size;
112  char *eback = buffer_start + seg * segment_size;
113  if (buffer_gptr > eback)
114  return (unsigned char)*(--buffer_gptr);
115  else
116  return pbackfail();
117  }
118  return std::filebuf::sungetc();
119  }
120 
121  protected:
// Put-back failure handler: always reports EOF while the read loop is
// active, otherwise defers to std::filebuf::pbackfail(c).
// NOTE(review): the parameter type is `char`, while the standard virtual
// is pbackfail(int_type); this signature therefore looks like a separate
// overload rather than an override, and the EOF default is converted to
// char -- confirm this is intended.
122  int pbackfail(char c=EOF) {
123  if (readloop_active)
124  return EOF;
125  return std::filebuf::pbackfail(c);
126  }
127 
// Installs caller-supplied storage of `n` bytes starting at `s`.
// Has no effect on the buffer pointers while the read loop is active.
// Always returns `this`.
// NOTE(review): listing lines 135-137 are absent from this view, so any
// further assignments made inside the `if` block cannot be seen here.
128  virtual std::streambuf* setbuf(char* s, std::streamsize n) {
129 #if VERBOSE_ASYNC_FILEBUF
130  std::cout << THIS_ASYNCFB << "async_filebuf::setbuf(s," << n << ")" << std::endl;
131 #endif
132  if (! readloop_active) {
133  buffer_start = s;
134  buffer_end = s + n;
138  }
139  return this;
140  }
141 
// Reports how many characters can be read from the current position.
// While the read loop is active: if the position has reached buffer_egptr,
// underflow() is called first (its return value is ignored), then the
// distance from buffer_gptr to buffer_egptr is returned.
// Otherwise defers to std::filebuf::showmanyc().
142  virtual std::streamsize showmanyc() {
143  if (readloop_active) {
144  if (buffer_gptr == buffer_egptr)
145  underflow();
146  return buffer_egptr - buffer_gptr;
147  }
148  return std::filebuf::showmanyc();
149  }
150 
// Reads one character and advances the read position.
// While the read loop is active: returns EOF if underflow() reports EOF,
// otherwise returns the byte at buffer_gptr and moves buffer_gptr past it.
// Otherwise defers to std::filebuf::uflow().
151  virtual int uflow() {
152 #if VERBOSE_ASYNC_FILEBUF
153  std::cout << THIS_ASYNCFB << "async_filebuf::uflow()" << std::endl;
154 #endif
155  if (readloop_active) {
156  if (underflow() == EOF)
157  return EOF;
158  return (unsigned char)*buffer_gptr++;
159  }
160  return std::filebuf::uflow();
161  }
162 
// The following virtual members are only declared in this header; their
// definitions are not part of this listing. underflow() is the call the
// inline members above use when the read position reaches buffer_egptr,
// and they treat a return value of EOF as end of input.
163  virtual int underflow();
164 
165  virtual std::streamsize xsgetn(char* s, std::streamsize n);
166 
167  virtual std::streampos seekoff(std::streamoff off, std::ios::seekdir way,
168  std::ios::openmode which);
169 
170  virtual std::streampos seekpos(std::streampos pos, std::ios::openmode which);
171 
172  protected:
// Data members. NOTE(review): listing lines 174, 176, 178, 180 and 184-188
// are absent from this view; the index at the end of the listing also
// names buffer_start, buffer_eback and buffer_egptr, and the inline
// members use segment_size and readloop_active, none of which are
// declared in the lines shown here.
173  char *buffer;
// Set to s + n by setbuf().
175  char *buffer_end;
// Current read position used by the inline get/putback members.
177  char *buffer_gptr;
179 
// Per-segment bookkeeping; segment_pos and segment_len are indexed by
// segment() in getpos(). The meaning of segment_state is not visible here.
181  std::vector<segment_state> segment_cond;
182  std::vector<std::streampos> segment_pos;
183  std::vector<std::streamsize> segment_len;
// Presumably used to coordinate with the thread behind readloop_thread --
// their use is not shown in this listing; confirm in the implementation.
189  std::mutex readloop_lock;
190  std::condition_variable readloop_wake;
191  std::condition_variable readloop_woke;
192  std::thread *readloop_thread;
193 
194 #if SHADOW_DEBUG
195  std::ifstream shadow_ifs;
196 #endif
197 
198  protected:
// Read-loop control functions; declared only, definitions are not part of
// this listing. NOTE(review): presumably they start, stop and run the
// background read loop tracked by readloop_active -- confirm.
199  int readloop_initiate();
200  int readloop_terminate();
201  int readloop();
202 
// Offset of buffer_gptr within its segment: the distance from buffer_start
// taken modulo segment_size.
204  int segoff() { return (buffer_gptr - buffer_start) % segment_size; }
205 
// Returns the file position that corresponds to the current read position.
// When the read loop is not active the position comes from
// std::filebuf::seekoff(0, cur, in).
206  std::streampos getpos() {
207  if (! readloop_active)
208  return this->std::filebuf::seekoff(0, std::ios::cur, std::ios::in);
209  else if (buffer_gptr == buffer_egptr) {
// The read position has reached buffer_egptr: call underflow() if the
// current segment has a positive recorded length, and if the position is
// still at buffer_egptr afterwards fall back to the underlying filebuf.
210  if (segment_len[segment()] > 0)
211  underflow();
212  if (buffer_gptr == buffer_egptr)
213  return this->std::filebuf::seekoff(0, std::ios::cur, std::ios::in);
214  }
// Position = recorded file position of the current segment plus the
// offset of buffer_gptr inside that segment.
215  std::streampos pos = segment_pos[segment()] + std::streamsize(segoff());
216 #if VERBOSE_ASYNC_FILEBUF
217  std::cout << THIS_ASYNCFB << "async_filebuf::getpos() returns " << pos << " in segment " << segment() << std::endl;
218 #endif
219  return pos;
220  }
221 };
222 
223 #endif
async_filebuf * open(const std::string fname, std::ios::openmode mode)
Definition: async_filebuf.h:29
std::vector< segment_state > segment_cond
textOut open("gains.txt")
std::streampos getpos()
textOut close()
virtual std::streampos seekpos(std::streampos pos, std::ios::openmode which)
char string[256]
#define c
int readloop_initiate()
std::condition_variable readloop_wake
char * buffer_eback
std::mutex readloop_lock
std::condition_variable readloop_woke
virtual int underflow()
std::thread * readloop_thread
char * buffer_egptr
async_filebuf(int segsize=1000000, int segcount=3, int lookback=1)
int readloop_terminate()
std::vector< std::streampos > segment_pos
async_filebuf * close()
Definition: async_filebuf.h:40
std::streamsize in_avail()
Definition: async_filebuf.h:53
virtual int uflow()
virtual std::streamsize xsgetn(char *s, std::streamsize n)
int pbackfail(char c=EOF)
std::vector< std::streamsize > segment_len
char * buffer_gptr
int sputbackc(int c)
Definition: async_filebuf.h:97
#define THIS_ASYNCFB
Definition: async_filebuf.h:21
virtual std::streambuf * setbuf(char *s, std::streamsize n)
virtual std::streamsize showmanyc()
virtual ~async_filebuf()
virtual std::streampos seekoff(std::streamoff off, std::ios::seekdir way, std::ios::openmode which)
char * buffer_start