001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 */ 018 019package org.apache.commons.exec; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.io.PipedOutputStream; 025 026import org.apache.commons.exec.util.DebugUtils; 027 028/** 029 * Copies standard output and error of sub-processes to standard output and error 030 * of the parent process. If output or error stream are set to null, any feedback 031 * from that stream will be lost. 032 * 033 * @version $Id: PumpStreamHandler.java 1557263 2014-01-10 21:18:09Z ggregory $ 034 */ 035public class PumpStreamHandler implements ExecuteStreamHandler { 036 037 private static final long STOP_TIMEOUT_ADDITION = 2000L; 038 039 private Thread outputThread; 040 041 private Thread errorThread; 042 043 private Thread inputThread; 044 045 private final OutputStream out; 046 047 private final OutputStream err; 048 049 private final InputStream input; 050 051 private InputStreamPumper inputStreamPumper; 052 053 /** the timeout in ms the implementation waits when stopping the pumper threads */ 054 private long stopTimeout; 055 056 /** the last exception being caught */ 057 private IOException caught = null; 058 059 /** 060 * Construct a new <CODE>PumpStreamHandler</CODE>. 061 */ 062 public PumpStreamHandler() { 063 this(System.out, System.err); 064 } 065 066 /** 067 * Construct a new <CODE>PumpStreamHandler</CODE>. 068 * 069 * @param outAndErr the output/error <CODE>OutputStream</CODE>. 070 */ 071 public PumpStreamHandler(final OutputStream outAndErr) { 072 this(outAndErr, outAndErr); 073 } 074 075 /** 076 * Construct a new <CODE>PumpStreamHandler</CODE>. 077 * 078 * @param out the output <CODE>OutputStream</CODE>. 079 * @param err the error <CODE>OutputStream</CODE>. 080 */ 081 public PumpStreamHandler(final OutputStream out, final OutputStream err) { 082 this(out, err, null); 083 } 084 085 /** 086 * Construct a new <CODE>PumpStreamHandler</CODE>. 087 * 088 * @param out the output <CODE>OutputStream</CODE>. 089 * @param err the error <CODE>OutputStream</CODE>. 090 * @param input the input <CODE>InputStream</CODE>. 091 */ 092 public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input) { 093 this.out = out; 094 this.err = err; 095 this.input = input; 096 } 097 098 /** 099 * Set maximum time to wait until output streams are exchausted 100 * when {@link #stop()} was called. 101 * 102 * @param timeout timeout in milliseconds or zero to wait forever (default) 103 */ 104 public void setStopTimeout(final long timeout) { 105 this.stopTimeout = timeout; 106 } 107 108 /** 109 * Set the <CODE>InputStream</CODE> from which to read the standard output 110 * of the process. 111 * 112 * @param is the <CODE>InputStream</CODE>. 113 */ 114 public void setProcessOutputStream(final InputStream is) { 115 if (out != null) { 116 createProcessOutputPump(is, out); 117 } 118 } 119 120 /** 121 * Set the <CODE>InputStream</CODE> from which to read the standard error 122 * of the process. 123 * 124 * @param is the <CODE>InputStream</CODE>. 125 */ 126 public void setProcessErrorStream(final InputStream is) { 127 if (err != null) { 128 createProcessErrorPump(is, err); 129 } 130 } 131 132 /** 133 * Set the <CODE>OutputStream</CODE> by means of which input can be sent 134 * to the process. 135 * 136 * @param os the <CODE>OutputStream</CODE>. 137 */ 138 public void setProcessInputStream(final OutputStream os) { 139 if (input != null) { 140 if (input == System.in) { 141 inputThread = createSystemInPump(input, os); 142 } else { 143 inputThread = createPump(input, os, true); 144 } 145 } else { 146 try { 147 os.close(); 148 } catch (final IOException e) { 149 final String msg = "Got exception while closing output stream"; 150 DebugUtils.handleException(msg, e); 151 } 152 } 153 } 154 155 /** 156 * Start the <CODE>Thread</CODE>s. 157 */ 158 public void start() { 159 if (outputThread != null) { 160 outputThread.start(); 161 } 162 if (errorThread != null) { 163 errorThread.start(); 164 } 165 if (inputThread != null) { 166 inputThread.start(); 167 } 168 } 169 170 /** 171 * Stop pumping the streams. When a timeout is specified it it is not guaranteed that the 172 * pumper threads are cleanly terminated. 173 */ 174 public void stop() throws IOException { 175 176 if (inputStreamPumper != null) { 177 inputStreamPumper.stopProcessing(); 178 } 179 180 stopThread(outputThread, stopTimeout); 181 stopThread(errorThread, stopTimeout); 182 stopThread(inputThread, stopTimeout); 183 184 if (err != null && err != out) { 185 try { 186 err.flush(); 187 } catch (final IOException e) { 188 final String msg = "Got exception while flushing the error stream : " + e.getMessage(); 189 DebugUtils.handleException(msg, e); 190 } 191 } 192 193 if (out != null) { 194 try { 195 out.flush(); 196 } catch (final IOException e) { 197 final String msg = "Got exception while flushing the output stream"; 198 DebugUtils.handleException(msg, e); 199 } 200 } 201 202 if (caught != null) { 203 throw caught; 204 } 205 } 206 207 /** 208 * Get the error stream. 209 * 210 * @return <CODE>OutputStream</CODE>. 211 */ 212 protected OutputStream getErr() { 213 return err; 214 } 215 216 /** 217 * Get the output stream. 218 * 219 * @return <CODE>OutputStream</CODE>. 220 */ 221 protected OutputStream getOut() { 222 return out; 223 } 224 225 /** 226 * Create the pump to handle process output. 227 * 228 * @param is the <CODE>InputStream</CODE>. 229 * @param os the <CODE>OutputStream</CODE>. 230 */ 231 protected void createProcessOutputPump(final InputStream is, final OutputStream os) { 232 outputThread = createPump(is, os); 233 } 234 235 /** 236 * Create the pump to handle error output. 237 * 238 * @param is the <CODE>InputStream</CODE>. 239 * @param os the <CODE>OutputStream</CODE>. 240 */ 241 protected void createProcessErrorPump(final InputStream is, final OutputStream os) { 242 errorThread = createPump(is, os); 243 } 244 245 /** 246 * Creates a stream pumper to copy the given input stream to the given 247 * output stream. When the 'os' is an PipedOutputStream we are closing 248 * 'os' afterwards to avoid an IOException ("Write end dead"). 249 * 250 * @param is the input stream to copy from 251 * @param os the output stream to copy into 252 * @return the stream pumper thread 253 */ 254 protected Thread createPump(final InputStream is, final OutputStream os) { 255 final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false; 256 return createPump(is, os, closeWhenExhausted); 257 } 258 259 /** 260 * Creates a stream pumper to copy the given input stream to the given 261 * output stream. 262 * 263 * @param is the input stream to copy from 264 * @param os the output stream to copy into 265 * @param closeWhenExhausted close the output stream when the input stream is exhausted 266 * @return the stream pumper thread 267 */ 268 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) { 269 final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper"); 270 result.setDaemon(true); 271 return result; 272 } 273 274 /** 275 * Stopping a pumper thread. The implementation actually waits 276 * longer than specified in 'timeout' to detect if the timeout 277 * was indeed exceeded. If the timeout was exceeded an IOException 278 * is created to be thrown to the caller. 279 * 280 * @param thread the thread to be stopped 281 * @param timeout the time in ms to wait to join 282 */ 283 protected void stopThread(final Thread thread, final long timeout) { 284 285 if (thread != null) { 286 try { 287 if (timeout == 0) { 288 thread.join(); 289 } else { 290 final long timeToWait = timeout + STOP_TIMEOUT_ADDITION; 291 final long startTime = System.currentTimeMillis(); 292 thread.join(timeToWait); 293 if (!(System.currentTimeMillis() < startTime + timeToWait)) { 294 final String msg = "The stop timeout of " + timeout + " ms was exceeded"; 295 caught = new ExecuteException(msg, Executor.INVALID_EXITVALUE); 296 } 297 } 298 } catch (final InterruptedException e) { 299 thread.interrupt(); 300 } 301 } 302 } 303 304 /** 305 * Creates a stream pumper to copy the given input stream to the given 306 * output stream. 307 * 308 * @param is the System.in input stream to copy from 309 * @param os the output stream to copy into 310 * @return the stream pumper thread 311 */ 312 private Thread createSystemInPump(final InputStream is, final OutputStream os) { 313 inputStreamPumper = new InputStreamPumper(is, os); 314 final Thread result = new Thread(inputStreamPumper, "Exec Input Stream Pumper"); 315 result.setDaemon(true); 316 return result; 317 } 318}