Sierra Toolkit  Version of the Day
mpi_filebuf.cpp
1 
10 #include <cstdlib>
11 #include <stk_util/parallel/mpi_filebuf.hpp>
12 #include <assert.h>
13 
// File-local sizing constants:
//   buffer_default_length - initial buffer length used until
//                           set_buffer_length() overrides it.
//   buffer_putback_length - bytes reserved at the front of the get buffer
//                           for put-backs; also the minimum buffer length.
enum {
  buffer_default_length = 4096 ,
  buffer_putback_length = 16
};
16 
17 /*--------------------------------------------------------------------*/
18 
19 mpi_filebuf::mpi_filebuf()
20  : std::streambuf(),
21  comm( MPI_COMM_NULL ),
22  comm_root( -1 ),
23  comm_root_fp( NULL ),
24  comm_output( 0 ),
25  comm_buffer( NULL ),
26  comm_buffer_len( buffer_default_length ),
27  comm_time(0.0)
28 {}
29 
30 mpi_filebuf::~mpi_filebuf()
31 {
32  close();
33 }
34 
35 /*--------------------------------------------------------------------*/
36 
37 mpi_filebuf * mpi_filebuf::set_buffer_length( const size_t len )
38 {
39  // If already open then abort
40  if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ;
41 
42  // Wait and verify upon the attempt to open
43  comm_buffer_len = buffer_putback_length < len ? len : buffer_putback_length ;
44 
45  return this ;
46 }
47 
48 /*--------------------------------------------------------------------*/
49 
50 mpi_filebuf * mpi_filebuf::open(
51  MPI_Comm communicator ,
52  const int root_processor ,
53  const std::ios_base::openmode file_mode ,
54  const char * const file_name )
55 {
56  const double start_time = MPI_Wtime();
57 
58  // If already open then abort
59  if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ;
60 
61  const int mode =
62  ( std::ios::in == file_mode ) ? 'r' : (
63  ( std::ios::out == file_mode ) ? 'w' : (
64  ( std::ios::app == file_mode ) ? 'a' : -1 ) );
65 
66  int err ;
67  int rank ;
68  int local, global ;
69  int data[3] ;
70 
71  // Broadcast the selected root processor and 'C' file mode
72 
73  data[0] = root_processor ;
74  data[1] = mode ;
75  data[2] = comm_buffer_len ;
76 
77  if ( MPI_SUCCESS != ( err = MPI_Bcast(data,3,MPI_INT,0,communicator) ) )
78  MPI_Abort( communicator , err );
79 
80  // Verify that all processors have the same root, mode, and buffer length:
81 
82  local = data[0] != root_processor || data[1] != mode || data[2] != (signed) comm_buffer_len ;
83 
84  if ( MPI_SUCCESS != ( err =
85  MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
86  MPI_Abort( communicator , err );
87 
88  if ( global ) {
89  comm_time += MPI_Wtime() - start_time ;
90  return (mpi_filebuf *) NULL ;
91  }
92 
93  //--------------------------------------------------------------------
94  // Root processor and mode are consistent.
95  // All processors try to allocate buffers and the
96  // root processor tries to open the file.
97 
98  if ( MPI_SUCCESS != ( err = MPI_Comm_rank( communicator , &rank ) ) )
99  MPI_Abort( communicator , err );
100 
101  char * const tmp_buf = (char *) std::malloc( comm_buffer_len );
102  std::FILE * tmp_fp = NULL ;
103 
104  local = tmp_buf == NULL ; // Failed allocation ?
105 
106  if ( root_processor == rank && ! local ) {
107  tmp_fp = std::fopen( file_name , ( ( ( mode == 'r' ) ? "r" :
108  ( mode == 'w' ) ? "w" : "a" ) ) );
109 #ifdef REDSTORM_SETVBUF
110  if (tmp_fp) {
111  if (std::setvbuf(tmp_fp, NULL, _IOFBF, 32768) != 0) {
112  std::fclose(tmp_fp);
113  tmp_fp = 0;
114  }
115  }
116 #endif
117  local = NULL == tmp_fp ;
118  }
119 
120  if ( MPI_SUCCESS != ( err =
121  MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
122  MPI_Abort( communicator , err );
123 
124  if ( global ) {
125  if ( NULL != tmp_buf ) std::free( tmp_buf ); // Deallocate
126  if ( NULL != tmp_fp ) std::fclose( tmp_fp ); // Close the file
127  comm_time += MPI_Wtime() - start_time ;
128  return (mpi_filebuf *) NULL ;
129  }
130 
131  //--------------------------------------------------------------------
132  // All memory allocated and root processor openned the file
133  // Update the internal members accordingly.
134 
135  comm = communicator ;
136  comm_root = root_processor ;
137  comm_root_fp = tmp_fp ;
138  comm_buffer = tmp_buf ;
139  comm_output = mode != 'r' ;
140 
141  // If output then set up put-buffer
142 
143  if ( comm_output ) setp( comm_buffer, comm_buffer + comm_buffer_len );
144 
145  comm_time += MPI_Wtime() - start_time ;
146 
147  return this ;
148 }
149 
150 /*--------------------------------------------------------------------*/
151 
152 mpi_filebuf * mpi_filebuf::close()
153 {
154  mpi_filebuf * tmp = NULL ;
155 
156  if ( NULL != comm_buffer ) {
157 
158  flush(); // Flush the buffers
159 
160  if ( NULL != comm_root_fp ) std::fclose( comm_root_fp ); // Close the file
161 
162  std::free( comm_buffer ); // Free the buffer
163 
164  if ( comm_output ) setp(NULL,NULL);
165  else setg(NULL,NULL,NULL);
166 
167  // Reset the members:
168 
169  comm = MPI_COMM_NULL ;
170  comm_root = -1 ;
171  comm_root_fp = NULL ;
172  comm_output = 0 ;
173  comm_buffer = NULL ;
174 
175  tmp = this ;
176  }
177 
178  return tmp ;
179 }
180 
181 /*--------------------------------------------------------------------*/
182 /* Underflow, a global call.
183  Read more data from the root processor's file and
184  broadcast it to all processors.
185 */
186 
187 int mpi_filebuf::underflow()
188 {
189  const double start_time = MPI_Wtime();
190 
191  if ( NULL != comm_buffer && ! comm_output && // Open for read
192  ( gptr() == NULL || gptr() >= egptr() ) ) { // valid get buffer
193 
194 
195  // Length of the buffer, consistent on all processors
196  // Entire buffer is offset to accomodate putbacks
197 
198  const size_t size = comm_buffer_len - buffer_putback_length ;
199  char * const buf = comm_buffer + buffer_putback_length ;
200 
201  int nread ;
202  int err ;
203 
204  // Root processor reads from the file and broadcasts the result
205 
206  if ( NULL != comm_root_fp ) nread = std::fread(buf,1,size,comm_root_fp);
207 
208  if ( MPI_SUCCESS != ( err =
209  MPI_Bcast( &nread, 1, MPI_INT, comm_root, comm ) ) )
210  MPI_Abort(comm,err);
211 
212  // If the read is successfull then update the get buffer pointers:
213 
214  if ( 0 < nread ) {
215 
216  // Broadcast the read buffer to all processors:
217 
218  if ( MPI_SUCCESS != ( err =
219  MPI_Bcast( buf, nread, MPI_BYTE, comm_root, comm ) ) )
220  MPI_Abort(comm,err);
221 
222  // Set the get buffer:
223 
224  setg( comm_buffer, buf, buf + nread );
225 
226  // Return the next character from the file:
227 
228  comm_time += MPI_Wtime() - start_time ;
229 
230  return *buf ;
231  }
232  }
233 
234  // Failed: set the get buffer to NULL and return EOF
235  setg(NULL, NULL, NULL);
236 
237  comm_time += MPI_Wtime() - start_time ;
238 
239  return EOF;
240 }
241 
242 /*--------------------------------------------------------------------*/
243 /* Overflow, a local call.
244  Output complete lines of data on the root processor.
245  Increase the buffer size on all other processors.
246 */
247 
248 int mpi_filebuf::overflow( int c )
249 {
250  if ( NULL != comm_buffer && comm_output ) { // open for write
251 
252  // Determine current offset and length:
253  char * cur_buffer = comm_buffer ;
254  size_t cur_offset = pptr() - cur_buffer ;
255  size_t cur_length = epptr() - cur_buffer ;
256 
257  assert( cur_offset <= cur_length /* detecting abuse by 'ostream' */ );
258 
259  if ( NULL != comm_root_fp ) {
260  if ( std::fwrite(cur_buffer,1,cur_offset,comm_root_fp) != cur_offset ) {
261  return EOF ; // Write failed
262  }
263  cur_offset = 0 ;
264  }
265  else if ( cur_length <= cur_offset ) {
266  // Not root processor, ran out of buffer space and
267  // cannot write so increase the buffer size:
268  cur_buffer = (char *) std::realloc( cur_buffer , cur_length *= 2 );
269  }
270 
271  // If buffer is still good then reset the put-buffer
272 
273  if ( NULL != cur_buffer ) {
274 
275  comm_buffer = cur_buffer ;
276 
277  setp( cur_buffer + cur_offset, cur_buffer + cur_length );
278 
279  if ( c != EOF ) {
280 
281  sputc(c);
282  return c;
283  }
284  else {
285  return 0;
286  }
287  }
288  }
289  return EOF ;
290 }
291 
292 /*--------------------------------------------------------------------*/
293 /* Send output buffers to root processor and
294  write them to the output file.
295 */
296 
297 mpi_filebuf * mpi_filebuf::flush()
298 {
299  const double start_time = MPI_Wtime();
300 
301  int result = -1 ; // Failure return value
302 
303  if ( NULL != comm_buffer && comm_output ) { // Open for write
304 
305  int err ;
306 
307  result = 0 ;
308 
309  // Determine the local length:
310 
311  char * cur_buf = comm_buffer ;
312  unsigned int cur_len = pptr() - cur_buf ;
313 
314  // Determine the global lengths
315 
316  char * recv_buf = NULL ;
317  int * recv_len = NULL ;
318  int * recv_disp = NULL ;
319 
320  int nproc = 0 ;
321 
322 
323 // if ( NULL != comm_root_fp ) {
324 
325 // It should no be neccessary to allocate recv_len on non-root
326 // nodes, but the MPI_Gatherv on Janus always accesses recv_len
327 // even on non-root processors which causes a segmentaion
328 // violation if recv_len is set to NULL.
329 
330  if ( MPI_SUCCESS != ( err = MPI_Comm_size(comm,&nproc) ) )
331  MPI_Abort( comm , err );
332 
333  recv_len = (int*) std::malloc( sizeof(int) * nproc );
334 
335  if ( NULL == recv_len ) MPI_Abort( comm , MPI_ERR_UNKNOWN );
336 
337  for (int j = 0 ; j < nproc ; ++j )
338  recv_len[j] = 0;
339 // }
340 
341  // Gather buffer lengths on the root processor
342 
343  if ( MPI_SUCCESS != ( err =
344  MPI_Gather(&cur_len,1,MPI_INT,recv_len,1,MPI_INT,comm_root,comm)))
345  MPI_Abort( comm , err );
346 
347  // Root processor must allocate enough buffer space:
348 
349  if ( NULL != comm_root_fp ) {
350 
351  recv_len[ comm_root ] = 0 ; // Don't send to self
352 
353  int i ;
354 
355  if ( NULL == ( recv_disp = (int*) std::malloc( sizeof(int) * (nproc + 1) ) ) )
356  result = -1 ;
357 
358  if ( 0 == result ) { // Allocation succeeded
359 
360  recv_disp[0] = 0 ;
361 
362  for ( i = 0 ; i < nproc ; ++i )
363  recv_disp[i+1] = recv_disp[i] + recv_len[i] ;
364 
365  if ( 0 < recv_disp[nproc] ) {
366  if ( NULL == ( recv_buf = (char*) std::malloc( recv_disp[nproc] ) ) )
367  result = -1 ;
368  }
369  else {
370  result = 1 ; // No need to gather!
371  }
372 
373  if ( -1 != result ) {
374 
375  // Write the root processor's buffer
376 
377  if ( 0 < cur_len ) {
378  if ( std::fwrite(cur_buf,1,cur_len,comm_root_fp) != cur_len )
379  result = -1 ; // Write failed
380 
381  cur_len = 0 ; // Wrote this buffer
382  }
383  }
384  }
385  std::fflush( comm_root_fp );
386  }
387 
388  // Root process broadcasts that all is well with the allocation
389 
390  if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
391  MPI_Abort( comm , err );
392 
393  if ( 0 == result ) { // All-is-well, need to gather and write
394 
395  // Gather the buffers to the root processor
396 
397  if ( MPI_SUCCESS != ( err =
398  MPI_Gatherv(cur_buf, cur_len, MPI_BYTE,
399  recv_buf, recv_len, recv_disp, MPI_BYTE,
400  comm_root, comm ) ) )
401  MPI_Abort( comm , err );
402 
403  // Output the buffers, beginning with 'comm_root'
404 
405  if ( NULL != comm_root_fp ) {
406 
407  int i ;
408 
409  for ( i = 1 ; i < nproc && 0 == result ; ++i ) {
410  const int j = ( i + comm_root ) % nproc ;
411  const unsigned int len = recv_len[j] ;
412 
413  if ( 0 < len )
414  if ( std::fwrite(recv_buf+recv_disp[j],1,len,comm_root_fp) != len )
415  result = -1 ; // Write failed
416  }
417 
418  std::fflush( comm_root_fp );
419  }
420 
421  // Broadcast that the write succeeded
422 
423  if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
424  MPI_Abort( comm , err );
425  }
426  else if ( 1 == result ) {
427  // Did not need to gather
428 
429  result = 0 ;
430  }
431 
432  // Reset the output buffer
433 
434  setp( comm_buffer , epptr() );
435 
436  // Clean up allocated memory
437 
438  if ( NULL != recv_buf ) std::free( recv_buf );
439  if ( NULL != recv_len ) std::free( recv_len );
440  if ( NULL != recv_disp ) std::free( recv_disp );
441  }
442 
443  comm_time += MPI_Wtime() - start_time ;
444 
445  return -1 == result ? (mpi_filebuf *) NULL : this ;
446 }
447 
448 /*--------------------------------------------------------------------*/
449 
450 int mpi_filebuf::sync()
451 {
452  // The root processor will push to file, all others ignore
453 
454  if ( NULL != comm_root_fp ) {
455 
456  // Determine the local length:
457 
458  char * cur_buf = comm_buffer ;
459  int cur_len = pptr() - cur_buf ;
460 
461  if ( 0 < cur_len ) std::fwrite(cur_buf,1,cur_len,comm_root_fp);
462 
463  std::fflush( comm_root_fp );
464 
465  setp( comm_buffer , epptr() );
466  }
467 
468  return 0 ;
469 }
470 
471 
472 std::streambuf * mpi_filebuf::setbuf( char * s , std::streamsize n )
473 {
474  return this ;
475 }
double start_time()
Function start_time returns the start time of this application execution.
Definition: Env.cpp:212