libROM  v1.0
Data-driven physical simulation library
ParallelBuffer.cpp
1 
11 // Description: A simple I/O stream class that intercepts output from an
12 // ostream and redirects the output as necessary for parallel
13 // I/O.
14 
15 #include "ParallelBuffer.h"
16 #include "Utilities.h"
17 
18 #include "mpi.h"
19 
20 #include <string>
21 #include <cstring>
22 #include <cstdio>
23 
24 namespace CAROM {
25 
26 const int ParallelBuffer::DEFAULT_BUFFER_SIZE = 128; // Size, in chars, of the internal buffer allocated on the first write.
27 
28 /*
29  *************************************************************************
30  *
31  * Construct a parallel buffer object. The object will require further
32  * initialization to set up I/O streams and the prefix string.
33  *
34  *************************************************************************
35  */
37 {
38  int mpi_init;
39  MPI_Initialized(&mpi_init);
40  int rank;
41  if (mpi_init) {
42  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
43  }
44  else {
45  rank = 0;
46  }
47  d_prefix = "P=" + Utilities::processorToString(rank) + ":";
48  d_ostream = &std::cerr;
49  d_buffer = 0;
50  d_buffer_size = 0;
51  d_buffer_ptr = 0;
52 }
53 
54 /*
55  *************************************************************************
56  *
57  * The destructor deallocates internal data buffer. It does not modify
58  * the output streams.
59  *
60  *************************************************************************
61  */
63 {
64  if (d_buffer) {
65  delete[] d_buffer;
66  }
67 }
68 
69 /*
70  *************************************************************************
71  *
72  * Write a text string of the specified length to the output stream.
73  * Note that the string data is accumulated into the internal output
74  * buffer until an end-of-line is detected.
75  *
76  *************************************************************************
77  */
78 void
80  const std::string& text,
81  int length)
82 {
83  if (length > 0) {
84 
85  /*
86  * If we need to allocate the internal buffer, then do so.
87  */
88  if (!d_buffer) {
89  d_buffer = new char[DEFAULT_BUFFER_SIZE];
90  d_buffer_size = DEFAULT_BUFFER_SIZE;
91  d_buffer_ptr = 0;
92  }
93 
94  /*
95  * If the buffer pointer is zero, then prepend the prefix.
96  */
97  if (d_buffer_ptr == 0) {
98  copyToBuffer(d_prefix, static_cast<int>(d_prefix.length()));
99  }
100 
101  /*
102  * Search for an end-of-line in the string.
103  */
104 // const int eol_ptr = static_cast<int>(text.find('\n'));
105  int eol_ptr = 0;
106  for ( ; (eol_ptr < length) && (text[eol_ptr] != '\n'); eol_ptr++) {}
107 
108  /*
109  * If no end-of-line is found, copy the entire text string but do not
110  * output. Otherwise copy the text string through the end-of-line,
111  * output, and recurse with the remainder of the text string if there are
112  * more characters in it.
113  */
114  if (eol_ptr == length) {
115  copyToBuffer(text, length);
116  } else {
117  const int ncopy = eol_ptr + 1;
118  copyToBuffer(text, ncopy);
119  outputBuffer();
120  if (ncopy < length) {
121  outputString(text.substr(ncopy), length - ncopy);
122  }
123  }
124  }
125 }
126 
127 /*
128  *************************************************************************
129  *
130  * Synchronize the parallel buffer and write string data. This routine
131  * is called from streambuf.
132  *
133  *************************************************************************
134  */
135 int
137 {
138  const int n = static_cast<int>(pptr() - pbase());
139  if (n > 0) {
140  outputString(pbase(), n);
141  }
142  return 0;
143 }
144 
145 /*
146  *************************************************************************
147  *
148  * Write the specified number of characters into the output stream.
149  * This routine is called from streambuf. If this routine is not
150  * provided, then overflow() is called instead for each character.
151  *
152  * Note that this routine is not required; it only
153  * offers some efficiency over overflow().
154  *
155  *************************************************************************
156  */
157 #if !defined(__INTEL_COMPILER) && (defined(__GNUG__))
158 std::streamsize
159 ParallelBuffer::xsputn(
160  const std::string& text,
161  std::streamsize n)
162 {
163  sync();
164  if (n > 0) outputString(text, static_cast<int>(n));
165  return n;
166 }
167 #endif
168 
169 /*
170  *************************************************************************
171  *
172  * Write a single character into the parallel buffer. This routine is
173  * called from streambuf.
174  *
175  *************************************************************************
176  */
177 int
179  int ch)
180 {
181  const int n = static_cast<int>(pptr() - pbase());
182  if (n && sync()) {
183  return EOF;
184  }
185  if (ch != EOF) {
186  char character[2];
187  character[0] = (char)ch;
188  character[1] = 0;
189  outputString(character, 1);
190  }
191  pbump(-n);
192  return 0;
193 }
194 
195 /*
196  *************************************************************************
197  *
198  * Copy data from the text string into the internal output buffer.
199  * If the internal buffer is not large enough to hold all of the string
200  * data, then allocate a new internal buffer.
201  *
202  *************************************************************************
203  */
204 void
205 ParallelBuffer::copyToBuffer(
206  const std::string& text,
207  int length)
208 {
209  /*
210  * First check whether we need to increase the size of the buffer
211  */
212  if (d_buffer_ptr + length > d_buffer_size) {
213  int new_size;
214  if (d_buffer_ptr + length > 2 * d_buffer_size) {
215  new_size = d_buffer_ptr + length;
216  }
217  else {
218  new_size = 2 * d_buffer_size;
219  }
220  char* new_buffer = new char[new_size];
221 
222  if (d_buffer_ptr > 0) {
223  (void)strncpy(new_buffer, d_buffer, d_buffer_ptr);
224  }
225  delete[] d_buffer;
226 
227  d_buffer = new_buffer;
228  d_buffer_size = new_size;
229  }
230  CAROM_ASSERT(d_buffer_ptr + length <= d_buffer_size);
231 
232  /*
233  * Copy data from the input into the internal buffer and increment pointer
234  */
235  strncpy(d_buffer + d_buffer_ptr, text.c_str(), length);
236  d_buffer_ptr += length;
237 }
238 
239 /*
240  *************************************************************************
241  *
242  * Output buffered stream data to the active output streams and reset
243  * the buffer pointer to its empty state.
244  *
245  *************************************************************************
246  */
247 void
248 ParallelBuffer::outputBuffer()
249 {
250  if (d_buffer_ptr > 0) {
251  if (d_ostream) {
252  d_ostream->write(d_buffer, d_buffer_ptr);
253  d_ostream->flush();
254  }
255  d_buffer_ptr = 0;
256  }
257 }
258 
259 }
int overflow(int ch)
Write an overflow character into the parallel buffer (called from streambuf).
int sync()
Synchronize the parallel buffer (called from streambuf).
~ParallelBuffer()
Destructor.
ParallelBuffer()
Default constructor.
void outputString(const std::string &text)
Write a text string to the output stream.
static std::string processorToString(int processorID)
Converts a processor ID to a string. Use this to ensure the same width is used when converting a processor ID to a string.
Definition: Utilities.cpp:55