11 #include <stk_util/parallel/mpi_filebuf.hpp> 14 enum { buffer_default_length = 4096 };
15 enum { buffer_putback_length = 16 };
19 mpi_filebuf::mpi_filebuf()
21 comm( MPI_COMM_NULL ),
26 comm_buffer_len( buffer_default_length ),
30 mpi_filebuf::~mpi_filebuf()
37 mpi_filebuf * mpi_filebuf::set_buffer_length(
const size_t len )
40 if ( NULL != comm_buffer )
return (mpi_filebuf *) NULL ;
43 comm_buffer_len = buffer_putback_length < len ? len : buffer_putback_length ;
50 mpi_filebuf * mpi_filebuf::open(
51 MPI_Comm communicator ,
52 const int root_processor ,
53 const std::ios_base::openmode file_mode ,
54 const char *
const file_name )
59 if ( NULL != comm_buffer )
return (mpi_filebuf *) NULL ;
62 ( std::ios::in == file_mode ) ?
'r' : (
63 ( std::ios::out == file_mode ) ?
'w' : (
64 ( std::ios::app == file_mode ) ?
'a' : -1 ) );
73 data[0] = root_processor ;
75 data[2] = comm_buffer_len ;
77 if ( MPI_SUCCESS != ( err = MPI_Bcast(data,3,MPI_INT,0,communicator) ) )
78 MPI_Abort( communicator , err );
82 local = data[0] != root_processor || data[1] != mode || data[2] != (signed) comm_buffer_len ;
84 if ( MPI_SUCCESS != ( err =
85 MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
86 MPI_Abort( communicator , err );
90 return (mpi_filebuf *) NULL ;
98 if ( MPI_SUCCESS != ( err = MPI_Comm_rank( communicator , &rank ) ) )
99 MPI_Abort( communicator , err );
101 char *
const tmp_buf = (
char *) std::malloc( comm_buffer_len );
102 std::FILE * tmp_fp = NULL ;
104 local = tmp_buf == NULL ;
106 if ( root_processor == rank && ! local ) {
107 tmp_fp = std::fopen( file_name , ( ( ( mode ==
'r' ) ?
"r" :
108 ( mode ==
'w' ) ?
"w" :
"a" ) ) );
109 #ifdef REDSTORM_SETVBUF 111 if (std::setvbuf(tmp_fp, NULL, _IOFBF, 32768) != 0) {
117 local = NULL == tmp_fp ;
120 if ( MPI_SUCCESS != ( err =
121 MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
122 MPI_Abort( communicator , err );
125 if ( NULL != tmp_buf ) std::free( tmp_buf );
126 if ( NULL != tmp_fp ) std::fclose( tmp_fp );
128 return (mpi_filebuf *) NULL ;
135 comm = communicator ;
136 comm_root = root_processor ;
137 comm_root_fp = tmp_fp ;
138 comm_buffer = tmp_buf ;
139 comm_output = mode !=
'r' ;
143 if ( comm_output ) setp( comm_buffer, comm_buffer + comm_buffer_len );
152 mpi_filebuf * mpi_filebuf::close()
154 mpi_filebuf * tmp = NULL ;
156 if ( NULL != comm_buffer ) {
160 if ( NULL != comm_root_fp ) std::fclose( comm_root_fp );
162 std::free( comm_buffer );
164 if ( comm_output ) setp(NULL,NULL);
165 else setg(NULL,NULL,NULL);
169 comm = MPI_COMM_NULL ;
171 comm_root_fp = NULL ;
187 int mpi_filebuf::underflow()
191 if ( NULL != comm_buffer && ! comm_output &&
192 ( gptr() == NULL || gptr() >= egptr() ) ) {
198 const size_t size = comm_buffer_len - buffer_putback_length ;
199 char *
const buf = comm_buffer + buffer_putback_length ;
206 if ( NULL != comm_root_fp ) nread = std::fread(buf,1,size,comm_root_fp);
208 if ( MPI_SUCCESS != ( err =
209 MPI_Bcast( &nread, 1, MPI_INT, comm_root, comm ) ) )
218 if ( MPI_SUCCESS != ( err =
219 MPI_Bcast( buf, nread, MPI_BYTE, comm_root, comm ) ) )
224 setg( comm_buffer, buf, buf + nread );
235 setg(NULL, NULL, NULL);
248 int mpi_filebuf::overflow(
int c )
250 if ( NULL != comm_buffer && comm_output ) {
253 char * cur_buffer = comm_buffer ;
254 size_t cur_offset = pptr() - cur_buffer ;
255 size_t cur_length = epptr() - cur_buffer ;
257 assert( cur_offset <= cur_length );
259 if ( NULL != comm_root_fp ) {
260 if ( std::fwrite(cur_buffer,1,cur_offset,comm_root_fp) != cur_offset ) {
265 else if ( cur_length <= cur_offset ) {
268 cur_buffer = (
char *) std::realloc( cur_buffer , cur_length *= 2 );
273 if ( NULL != cur_buffer ) {
275 comm_buffer = cur_buffer ;
277 setp( cur_buffer + cur_offset, cur_buffer + cur_length );
297 mpi_filebuf * mpi_filebuf::flush()
303 if ( NULL != comm_buffer && comm_output ) {
311 char * cur_buf = comm_buffer ;
312 unsigned int cur_len = pptr() - cur_buf ;
316 char * recv_buf = NULL ;
317 int * recv_len = NULL ;
318 int * recv_disp = NULL ;
330 if ( MPI_SUCCESS != ( err = MPI_Comm_size(comm,&nproc) ) )
331 MPI_Abort( comm , err );
333 recv_len = (
int*) std::malloc(
sizeof(
int) * nproc );
335 if ( NULL == recv_len ) MPI_Abort( comm , MPI_ERR_UNKNOWN );
337 for (
int j = 0 ; j < nproc ; ++j )
343 if ( MPI_SUCCESS != ( err =
344 MPI_Gather(&cur_len,1,MPI_INT,recv_len,1,MPI_INT,comm_root,comm)))
345 MPI_Abort( comm , err );
349 if ( NULL != comm_root_fp ) {
351 recv_len[ comm_root ] = 0 ;
355 if ( NULL == ( recv_disp = (
int*) std::malloc(
sizeof(
int) * (nproc + 1) ) ) )
362 for ( i = 0 ; i < nproc ; ++i )
363 recv_disp[i+1] = recv_disp[i] + recv_len[i] ;
365 if ( 0 < recv_disp[nproc] ) {
366 if ( NULL == ( recv_buf = (
char*) std::malloc( recv_disp[nproc] ) ) )
373 if ( -1 != result ) {
378 if ( std::fwrite(cur_buf,1,cur_len,comm_root_fp) != cur_len )
385 std::fflush( comm_root_fp );
390 if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
391 MPI_Abort( comm , err );
397 if ( MPI_SUCCESS != ( err =
398 MPI_Gatherv(cur_buf, cur_len, MPI_BYTE,
399 recv_buf, recv_len, recv_disp, MPI_BYTE,
400 comm_root, comm ) ) )
401 MPI_Abort( comm , err );
405 if ( NULL != comm_root_fp ) {
409 for ( i = 1 ; i < nproc && 0 == result ; ++i ) {
410 const int j = ( i + comm_root ) % nproc ;
411 const unsigned int len = recv_len[j] ;
414 if ( std::fwrite(recv_buf+recv_disp[j],1,len,comm_root_fp) != len )
418 std::fflush( comm_root_fp );
423 if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
424 MPI_Abort( comm , err );
426 else if ( 1 == result ) {
434 setp( comm_buffer , epptr() );
438 if ( NULL != recv_buf ) std::free( recv_buf );
439 if ( NULL != recv_len ) std::free( recv_len );
440 if ( NULL != recv_disp ) std::free( recv_disp );
445 return -1 == result ? (mpi_filebuf *) NULL :
this ;
450 int mpi_filebuf::sync()
454 if ( NULL != comm_root_fp ) {
458 char * cur_buf = comm_buffer ;
459 int cur_len = pptr() - cur_buf ;
461 if ( 0 < cur_len ) std::fwrite(cur_buf,1,cur_len,comm_root_fp);
463 std::fflush( comm_root_fp );
465 setp( comm_buffer , epptr() );
472 std::streambuf * mpi_filebuf::setbuf(
char * s , std::streamsize n )
double start_time()
Function start_time returns the start time of this application execution.