001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026
027import org.apache.commons.io.IOUtils;
028
029/**
030 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
031 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
032 * <p>
033 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
034 * </p>
035 * <p>
036 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
037 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
038 * be used.
039 * </p>
040 *
041 * @see MessageDigestCalculatingInputStream
042 */
043public class ObservableInputStream extends ProxyInputStream {
044
045    /**
046     * Abstracts observer callback for {@code ObservableInputStream}s.
047     */
048    public static abstract class Observer {
049
050        /**
051         * Called to indicate that the {@link ObservableInputStream} has been closed.
052         *
053         * @throws IOException if an I/O error occurs.
054         */
055        @SuppressWarnings("unused") // Possibly thrown from subclasses.
056        public void closed() throws IOException {
057            // noop
058        }
059
060        /**
061         * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
062         * been called, and are about to invoke data.
063         *
064         * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
065         * @param offset The offset within the byte array, where data has been stored.
066         * @param length The number of bytes, which have been stored in the byte array.
067         * @throws IOException if an I/O error occurs.
068         */
069        @SuppressWarnings("unused") // Possibly thrown from subclasses.
070        public void data(final byte[] buffer, final int offset, final int length) throws IOException {
071            // noop
072        }
073
074        /**
075         * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
076         * and will return a value.
077         *
078         * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
079         *        {@link #finished()} will be invoked instead.
080         * @throws IOException if an I/O error occurs.
081         */
082        @SuppressWarnings("unused") // Possibly thrown from subclasses.
083        public void data(final int value) throws IOException {
084            // noop
085        }
086
087        /**
088         * Called to indicate that an error occurred on the underlying stream.
089         *
090         * @param exception the exception to throw
091         * @throws IOException if an I/O error occurs.
092         */
093        public void error(final IOException exception) throws IOException {
094            throw exception;
095        }
096
097        /**
098         * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
099         * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
100         *
101         * @throws IOException if an I/O error occurs.
102         */
103        @SuppressWarnings("unused") // Possibly thrown from subclasses.
104        public void finished() throws IOException {
105            // noop
106        }
107    }
108
109    private final List<Observer> observers;
110
111    /**
112     * Creates a new ObservableInputStream for the given InputStream.
113     *
114     * @param inputStream the input stream to observe.
115     */
116    public ObservableInputStream(final InputStream inputStream) {
117        this(inputStream, new ArrayList<>());
118    }
119
120    /**
121     * Creates a new ObservableInputStream for the given InputStream.
122     *
123     * @param inputStream the input stream to observe.
124     * @param observers List of observer callbacks.
125     */
126    private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
127        super(inputStream);
128        this.observers = observers;
129    }
130
131    /**
132     * Creates a new ObservableInputStream for the given InputStream.
133     *
134     * @param inputStream the input stream to observe.
135     * @param observers List of observer callbacks.
136     * @since 2.9.0
137     */
138    public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
139        this(inputStream, Arrays.asList(observers));
140    }
141
142    /**
143     * Adds an Observer.
144     *
145     * @param observer the observer to add.
146     */
147    public void add(final Observer observer) {
148        observers.add(observer);
149    }
150
151    @Override
152    public void close() throws IOException {
153        IOException ioe = null;
154        try {
155            super.close();
156        } catch (final IOException e) {
157            ioe = e;
158        }
159        if (ioe == null) {
160            noteClosed();
161        } else {
162            noteError(ioe);
163        }
164    }
165
166    /**
167     * Reads all data from the underlying {@link InputStream}, while notifying the observers.
168     *
169     * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
170     */
171    public void consume() throws IOException {
172        final byte[] buffer = IOUtils.byteArray();
173        while (read(buffer) != EOF) {
174            // empty
175        }
176    }
177
178    /**
179     * Gets all currently registered observers.
180     *
181     * @return a list of the currently registered observers.
182     * @since 2.9.0
183     */
184    public List<Observer> getObservers() {
185        return observers;
186    }
187
188    /**
189     * Notifies the observers by invoking {@link Observer#finished()}.
190     *
191     * @throws IOException Some observer has thrown an exception, which is being passed down.
192     */
193    protected void noteClosed() throws IOException {
194        for (final Observer observer : getObservers()) {
195            observer.closed();
196        }
197    }
198
199    /**
200     * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
201     *
202     * @param value Passed to the observers.
203     * @throws IOException Some observer has thrown an exception, which is being passed down.
204     */
205    protected void noteDataByte(final int value) throws IOException {
206        for (final Observer observer : getObservers()) {
207            observer.data(value);
208        }
209    }
210
211    /**
212     * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
213     *
214     * @param buffer Passed to the observers.
215     * @param offset Passed to the observers.
216     * @param length Passed to the observers.
217     * @throws IOException Some observer has thrown an exception, which is being passed down.
218     */
219    protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
220        for (final Observer observer : getObservers()) {
221            observer.data(buffer, offset, length);
222        }
223    }
224
225    /**
226     * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
227     *
228     * @param exception Passed to the observers.
229     * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
230     *         exception, which has been passed as an argument.
231     */
232    protected void noteError(final IOException exception) throws IOException {
233        for (final Observer observer : getObservers()) {
234            observer.error(exception);
235        }
236    }
237
238    /**
239     * Notifies the observers by invoking {@link Observer#finished()}.
240     *
241     * @throws IOException Some observer has thrown an exception, which is being passed down.
242     */
243    protected void noteFinished() throws IOException {
244        for (final Observer observer : getObservers()) {
245            observer.finished();
246        }
247    }
248
249    private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
250        if (ioe != null) {
251            noteError(ioe);
252            throw ioe;
253        }
254        if (result == EOF) {
255            noteFinished();
256        } else if (result > 0) {
257            noteDataBytes(buffer, offset, result);
258        }
259    }
260
261    @Override
262    public int read() throws IOException {
263        int result = 0;
264        IOException ioe = null;
265        try {
266            result = super.read();
267        } catch (final IOException ex) {
268            ioe = ex;
269        }
270        if (ioe != null) {
271            noteError(ioe);
272            throw ioe;
273        }
274        if (result == EOF) {
275            noteFinished();
276        } else {
277            noteDataByte(result);
278        }
279        return result;
280    }
281
282    @Override
283    public int read(final byte[] buffer) throws IOException {
284        int result = 0;
285        IOException ioe = null;
286        try {
287            result = super.read(buffer);
288        } catch (final IOException ex) {
289            ioe = ex;
290        }
291        notify(buffer, 0, result, ioe);
292        return result;
293    }
294
295    @Override
296    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
297        int result = 0;
298        IOException ioe = null;
299        try {
300            result = super.read(buffer, offset, length);
301        } catch (final IOException ex) {
302            ioe = ex;
303        }
304        notify(buffer, offset, result, ioe);
305        return result;
306    }
307
308    /**
309     * Removes an Observer.
310     *
311     * @param observer the observer to remove
312     */
313    public void remove(final Observer observer) {
314        observers.remove(observer);
315    }
316
317    /**
318     * Removes all Observers.
319     */
320    public void removeAllObservers() {
321        observers.clear();
322    }
323
324}