View Javadoc
1   /*
2    * Copyright (C) 2006 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.base.Preconditions.checkArgument;
20  import static com.google.common.base.Preconditions.checkNotNull;
21  import static com.google.common.base.Preconditions.checkState;
22  import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
23  import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24  import static java.lang.Thread.currentThread;
25  import static java.util.Arrays.asList;
26  
27  import com.google.common.annotations.Beta;
28  import com.google.common.base.Function;
29  import com.google.common.base.Optional;
30  import com.google.common.base.Preconditions;
31  import com.google.common.collect.ImmutableCollection;
32  import com.google.common.collect.ImmutableList;
33  import com.google.common.collect.Lists;
34  import com.google.common.collect.Ordering;
35  import com.google.common.collect.Queues;
36  import com.google.common.collect.Sets;
37  
38  import java.lang.reflect.Constructor;
39  import java.lang.reflect.InvocationTargetException;
40  import java.lang.reflect.UndeclaredThrowableException;
41  import java.util.Arrays;
42  import java.util.Collections;
43  import java.util.List;
44  import java.util.Set;
45  import java.util.concurrent.CancellationException;
46  import java.util.concurrent.ConcurrentLinkedQueue;
47  import java.util.concurrent.CountDownLatch;
48  import java.util.concurrent.ExecutionException;
49  import java.util.concurrent.Executor;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.TimeoutException;
53  import java.util.concurrent.atomic.AtomicInteger;
54  import java.util.logging.Level;
55  import java.util.logging.Logger;
56  
57  import javax.annotation.Nullable;
58  
59  /**
60   * Static utility methods pertaining to the {@link Future} interface.
61   *
62   * <p>Many of these methods use the {@link ListenableFuture} API; consult the
63   * Guava User Guide article on <a href=
64   * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
65   * {@code ListenableFuture}</a>.
66   *
67   * @author Kevin Bourrillion
68   * @author Nishant Thakkar
69   * @author Sven Mawson
70   * @since 1.0
71   */
72  @Beta
73  public final class Futures {
74    private Futures() {}
75  
76    /**
77     * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
78     * and a {@link Function} that maps from {@link Exception} instances into the
79     * appropriate checked type.
80     *
81     * <p>The given mapping function will be applied to an
82     * {@link InterruptedException}, a {@link CancellationException}, or an
83     * {@link ExecutionException}.
84     * See {@link Future#get()} for details on the exceptions thrown.
85     *
86     * @since 9.0 (source-compatible since 1.0)
87     */
88    public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
89        ListenableFuture<V> future, Function<Exception, X> mapper) {
90      return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
91    }
92  
93    private abstract static class ImmediateFuture<V>
94        implements ListenableFuture<V> {
95  
96      private static final Logger log =
97          Logger.getLogger(ImmediateFuture.class.getName());
98  
99      @Override
100     public void addListener(Runnable listener, Executor executor) {
101       checkNotNull(listener, "Runnable was null.");
102       checkNotNull(executor, "Executor was null.");
103       try {
104         executor.execute(listener);
105       } catch (RuntimeException e) {
106         // ListenableFuture's contract is that it will not throw unchecked
107         // exceptions, so log the bad runnable and/or executor and swallow it.
108         log.log(Level.SEVERE, "RuntimeException while executing runnable "
109             + listener + " with executor " + executor, e);
110       }
111     }
112 
113     @Override
114     public boolean cancel(boolean mayInterruptIfRunning) {
115       return false;
116     }
117 
118     @Override
119     public abstract V get() throws ExecutionException;
120 
121     @Override
122     public V get(long timeout, TimeUnit unit) throws ExecutionException {
123       checkNotNull(unit);
124       return get();
125     }
126 
127     @Override
128     public boolean isCancelled() {
129       return false;
130     }
131 
132     @Override
133     public boolean isDone() {
134       return true;
135     }
136   }
137 
138   private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
139 
140     @Nullable private final V value;
141 
142     ImmediateSuccessfulFuture(@Nullable V value) {
143       this.value = value;
144     }
145 
146     @Override
147     public V get() {
148       return value;
149     }
150   }
151 
152   private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
153       extends ImmediateFuture<V> implements CheckedFuture<V, X> {
154 
155     @Nullable private final V value;
156 
157     ImmediateSuccessfulCheckedFuture(@Nullable V value) {
158       this.value = value;
159     }
160 
161     @Override
162     public V get() {
163       return value;
164     }
165 
166     @Override
167     public V checkedGet() {
168       return value;
169     }
170 
171     @Override
172     public V checkedGet(long timeout, TimeUnit unit) {
173       checkNotNull(unit);
174       return value;
175     }
176   }
177 
178   private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
179 
180     private final Throwable thrown;
181 
182     ImmediateFailedFuture(Throwable thrown) {
183       this.thrown = thrown;
184     }
185 
186     @Override
187     public V get() throws ExecutionException {
188       throw new ExecutionException(thrown);
189     }
190   }
191 
192   private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
193 
194     private final CancellationException thrown;
195 
196     ImmediateCancelledFuture() {
197       this.thrown = new CancellationException("Immediate cancelled future.");
198     }
199 
200     @Override
201     public boolean isCancelled() {
202       return true;
203     }
204 
205     @Override
206     public V get() {
207       throw AbstractFuture.cancellationExceptionWithCause(
208           "Task was cancelled.", thrown);
209     }
210   }
211 
212   private static class ImmediateFailedCheckedFuture<V, X extends Exception>
213       extends ImmediateFuture<V> implements CheckedFuture<V, X> {
214 
215     private final X thrown;
216 
217     ImmediateFailedCheckedFuture(X thrown) {
218       this.thrown = thrown;
219     }
220 
221     @Override
222     public V get() throws ExecutionException {
223       throw new ExecutionException(thrown);
224     }
225 
226     @Override
227     public V checkedGet() throws X {
228       throw thrown;
229     }
230 
231     @Override
232     public V checkedGet(long timeout, TimeUnit unit) throws X {
233       checkNotNull(unit);
234       throw thrown;
235     }
236   }
237 
238   /**
239    * Creates a {@code ListenableFuture} which has its value set immediately upon
240    * construction. The getters just return the value. This {@code Future} can't
241    * be canceled or timed out and its {@code isDone()} method always returns
242    * {@code true}.
243    */
244   public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
245     return new ImmediateSuccessfulFuture<V>(value);
246   }
247 
248   /**
249    * Returns a {@code CheckedFuture} which has its value set immediately upon
250    * construction.
251    *
252    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
253    * method always returns {@code true}. Calling {@code get()} or {@code
254    * checkedGet()} will immediately return the provided value.
255    */
256   public static <V, X extends Exception> CheckedFuture<V, X>
257       immediateCheckedFuture(@Nullable V value) {
258     return new ImmediateSuccessfulCheckedFuture<V, X>(value);
259   }
260 
261   /**
262    * Returns a {@code ListenableFuture} which has an exception set immediately
263    * upon construction.
264    *
265    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
266    * method always returns {@code true}. Calling {@code get()} will immediately
267    * throw the provided {@code Throwable} wrapped in an {@code
268    * ExecutionException}.
269    */
270   public static <V> ListenableFuture<V> immediateFailedFuture(
271       Throwable throwable) {
272     checkNotNull(throwable);
273     return new ImmediateFailedFuture<V>(throwable);
274   }
275 
276   /**
277    * Creates a {@code ListenableFuture} which is cancelled immediately upon
278    * construction, so that {@code isCancelled()} always returns {@code true}.
279    *
280    * @since 14.0
281    */
282   public static <V> ListenableFuture<V> immediateCancelledFuture() {
283     return new ImmediateCancelledFuture<V>();
284   }
285 
286   /**
287    * Returns a {@code CheckedFuture} which has an exception set immediately upon
288    * construction.
289    *
290    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
291    * method always returns {@code true}. Calling {@code get()} will immediately
292    * throw the provided {@code Exception} wrapped in an {@code
293    * ExecutionException}, and calling {@code checkedGet()} will throw the
294    * provided exception itself.
295    */
296   public static <V, X extends Exception> CheckedFuture<V, X>
297       immediateFailedCheckedFuture(X exception) {
298     checkNotNull(exception);
299     return new ImmediateFailedCheckedFuture<V, X>(exception);
300   }
301 
302   /**
303    * Returns a {@code Future} whose result is taken from the given primary
304    * {@code input} or, if the primary input fails, from the {@code Future}
305    * provided by the {@code fallback}. {@link FutureFallback#create} is not
306    * invoked until the primary input has failed, so if the primary input
307    * succeeds, it is never invoked. If, during the invocation of {@code
308    * fallback}, an exception is thrown, this exception is used as the result of
309    * the output {@code Future}.
310    *
311    * <p>Below is an example of a fallback that returns a default value if an
312    * exception occurs:
313    *
314    * <pre>   {@code
315    *   ListenableFuture<Integer> fetchCounterFuture = ...;
316    *
317    *   // Falling back to a zero counter in case an exception happens when
318    *   // processing the RPC to fetch counters.
319    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
320    *       fetchCounterFuture, new FutureFallback<Integer>() {
321    *         public ListenableFuture<Integer> create(Throwable t) {
322    *           // Returning "0" as the default for the counter when the
323    *           // exception happens.
324    *           return immediateFuture(0);
325    *         }
326    *       });}</pre>
327    *
328    * <p>The fallback can also choose to propagate the original exception when
329    * desired:
330    *
331    * <pre>   {@code
332    *   ListenableFuture<Integer> fetchCounterFuture = ...;
333    *
334    *   // Falling back to a zero counter only in case the exception was a
335    *   // TimeoutException.
336    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
337    *       fetchCounterFuture, new FutureFallback<Integer>() {
338    *         public ListenableFuture<Integer> create(Throwable t) {
339    *           if (t instanceof TimeoutException) {
340    *             return immediateFuture(0);
341    *           }
342    *           return immediateFailedFuture(t);
343    *         }
344    *       });}</pre>
345    *
346    * <p>Note: If the derived {@code Future} is slow or heavyweight to create
347    * (whether the {@code Future} itself is slow or heavyweight to complete is
348    * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
349    * FutureFallback, Executor) supplying an executor}. If you do not supply an
350    * executor, {@code withFallback} will use {@link
351    * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
352    * caveats for heavier operations. For example, the call to {@code
353    * fallback.create} may run on an unpredictable or undesirable thread:
354    *
355    * <ul>
356    * <li>If the input {@code Future} is done at the time {@code withFallback}
357    * is called, {@code withFallback} will call {@code fallback.create} inline.
358    * <li>If the input {@code Future} is not yet done, {@code withFallback} will
359    * schedule {@code fallback.create} to be run by the thread that completes
360    * the input {@code Future}, which may be an internal system thread such as
361    * an RPC network thread.
362    * </ul>
363    *
364    * <p>Also note that, regardless of which thread executes the {@code
365    * sameThreadExecutor} {@code fallback.create}, all other registered but
366    * unexecuted listeners are prevented from running during its execution, even
367    * if those listeners are to run in other executors.
368    *
369    * @param input the primary input {@code Future}
370    * @param fallback the {@link FutureFallback} implementation to be called if
371    *     {@code input} fails
372    * @since 14.0
373    */
374   public static <V> ListenableFuture<V> withFallback(
375       ListenableFuture<? extends V> input,
376       FutureFallback<? extends V> fallback) {
377     return withFallback(input, fallback, sameThreadExecutor());
378   }
379 
380   /**
381    * Returns a {@code Future} whose result is taken from the given primary
382    * {@code input} or, if the primary input fails, from the {@code Future}
383    * provided by the {@code fallback}. {@link FutureFallback#create} is not
384    * invoked until the primary input has failed, so if the primary input
385    * succeeds, it is never invoked. If, during the invocation of {@code
386    * fallback}, an exception is thrown, this exception is used as the result of
387    * the output {@code Future}.
388    *
389    * <p>Below is an example of a fallback that returns a default value if an
390    * exception occurs:
391    *
392    * <pre>   {@code
393    *   ListenableFuture<Integer> fetchCounterFuture = ...;
394    *
395    *   // Falling back to a zero counter in case an exception happens when
396    *   // processing the RPC to fetch counters.
397    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
398    *       fetchCounterFuture, new FutureFallback<Integer>() {
399    *         public ListenableFuture<Integer> create(Throwable t) {
400    *           // Returning "0" as the default for the counter when the
401    *           // exception happens.
402    *           return immediateFuture(0);
403    *         }
404    *       }, sameThreadExecutor());}</pre>
405    *
406    * <p>The fallback can also choose to propagate the original exception when
407    * desired:
408    *
409    * <pre>   {@code
410    *   ListenableFuture<Integer> fetchCounterFuture = ...;
411    *
412    *   // Falling back to a zero counter only in case the exception was a
413    *   // TimeoutException.
414    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
415    *       fetchCounterFuture, new FutureFallback<Integer>() {
416    *         public ListenableFuture<Integer> create(Throwable t) {
417    *           if (t instanceof TimeoutException) {
418    *             return immediateFuture(0);
419    *           }
420    *           return immediateFailedFuture(t);
421    *         }
422    *       }, sameThreadExecutor());}</pre>
423    *
424    * <p>When the execution of {@code fallback.create} is fast and lightweight
425    * (though the {@code Future} it returns need not meet these criteria),
426    * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
427    * omitting the executor} or explicitly specifying {@code
428    * sameThreadExecutor}. However, be aware of the caveats documented in the
429    * link above.
430    *
431    * @param input the primary input {@code Future}
432    * @param fallback the {@link FutureFallback} implementation to be called if
433    *     {@code input} fails
434    * @param executor the executor that runs {@code fallback} if {@code input}
435    *     fails
436    * @since 14.0
437    */
438   public static <V> ListenableFuture<V> withFallback(
439       ListenableFuture<? extends V> input,
440       FutureFallback<? extends V> fallback, Executor executor) {
441     checkNotNull(fallback);
442     return new FallbackFuture<V>(input, fallback, executor);
443   }
444 
445   /**
446    * A future that falls back on a second, generated future, in case its
447    * original future fails.
448    */
449   private static class FallbackFuture<V> extends AbstractFuture<V> {
450 
451     private volatile ListenableFuture<? extends V> running;
452 
453     FallbackFuture(ListenableFuture<? extends V> input,
454         final FutureFallback<? extends V> fallback,
455         final Executor executor) {
456       running = input;
457       addCallback(running, new FutureCallback<V>() {
458         @Override
459         public void onSuccess(V value) {
460           set(value);
461         }
462 
463         @Override
464         public void onFailure(Throwable t) {
465           if (isCancelled()) {
466             return;
467           }
468           try {
469             running = fallback.create(t);
470             if (isCancelled()) { // in case cancel called in the meantime
471               running.cancel(wasInterrupted());
472               return;
473             }
474             addCallback(running, new FutureCallback<V>() {
475               @Override
476               public void onSuccess(V value) {
477                 set(value);
478               }
479 
480               @Override
481               public void onFailure(Throwable t) {
482                 if (running.isCancelled()) {
483                   cancel(false);
484                 } else {
485                   setException(t);
486                 }
487               }
488             }, sameThreadExecutor());
489           } catch (Throwable e) {
490             setException(e);
491           }
492         }
493       }, executor);
494     }
495 
496     @Override
497     public boolean cancel(boolean mayInterruptIfRunning) {
498       if (super.cancel(mayInterruptIfRunning)) {
499         running.cancel(mayInterruptIfRunning);
500         return true;
501       }
502       return false;
503     }
504   }
505 
506   /**
507    * Returns a new {@code ListenableFuture} whose result is asynchronously
508    * derived from the result of the given {@code Future}. More precisely, the
509    * returned {@code Future} takes its result from a {@code Future} produced by
510    * applying the given {@code AsyncFunction} to the result of the original
511    * {@code Future}. Example:
512    *
513    * <pre>   {@code
514    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
515    *   AsyncFunction<RowKey, QueryResult> queryFunction =
516    *       new AsyncFunction<RowKey, QueryResult>() {
517    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
518    *           return dataService.read(rowKey);
519    *         }
520    *       };
521    *   ListenableFuture<QueryResult> queryFuture =
522    *       transform(rowKeyFuture, queryFunction);}</pre>
523    *
524    * <p>Note: If the derived {@code Future} is slow or heavyweight to create
525    * (whether the {@code Future} itself is slow or heavyweight to complete is
526    * irrelevant), consider {@linkplain #transform(ListenableFuture,
527    * AsyncFunction, Executor) supplying an executor}. If you do not supply an
528    * executor, {@code transform} will use {@link
529    * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
530    * caveats for heavier operations. For example, the call to {@code
531    * function.apply} may run on an unpredictable or undesirable thread:
532    *
533    * <ul>
534    * <li>If the input {@code Future} is done at the time {@code transform} is
535    * called, {@code transform} will call {@code function.apply} inline.
536    * <li>If the input {@code Future} is not yet done, {@code transform} will
537    * schedule {@code function.apply} to be run by the thread that completes the
538    * input {@code Future}, which may be an internal system thread such as an
539    * RPC network thread.
540    * </ul>
541    *
542    * <p>Also note that, regardless of which thread executes the {@code
543    * sameThreadExecutor} {@code function.apply}, all other registered but
544    * unexecuted listeners are prevented from running during its execution, even
545    * if those listeners are to run in other executors.
546    *
547    * <p>The returned {@code Future} attempts to keep its cancellation state in
548    * sync with that of the input future and that of the future returned by the
549    * function. That is, if the returned {@code Future} is cancelled, it will
550    * attempt to cancel the other two, and if either of the other two is
551    * cancelled, the returned {@code Future} will receive a callback in which it
552    * will attempt to cancel itself.
553    *
554    * @param input The future to transform
555    * @param function A function to transform the result of the input future
556    *     to the result of the output future
557    * @return A future that holds result of the function (if the input succeeded)
558    *     or the original input's failure (if not)
559    * @since 11.0
560    */
561   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
562       AsyncFunction<? super I, ? extends O> function) {
563     return transform(input, function, MoreExecutors.sameThreadExecutor());
564   }
565 
566   /**
567    * Returns a new {@code ListenableFuture} whose result is asynchronously
568    * derived from the result of the given {@code Future}. More precisely, the
569    * returned {@code Future} takes its result from a {@code Future} produced by
570    * applying the given {@code AsyncFunction} to the result of the original
571    * {@code Future}. Example:
572    *
573    * <pre>   {@code
574    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
575    *   AsyncFunction<RowKey, QueryResult> queryFunction =
576    *       new AsyncFunction<RowKey, QueryResult>() {
577    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
578    *           return dataService.read(rowKey);
579    *         }
580    *       };
581    *   ListenableFuture<QueryResult> queryFuture =
582    *       transform(rowKeyFuture, queryFunction, executor);}</pre>
583    *
584    * <p>The returned {@code Future} attempts to keep its cancellation state in
585    * sync with that of the input future and that of the future returned by the
586    * chain function. That is, if the returned {@code Future} is cancelled, it
587    * will attempt to cancel the other two, and if either of the other two is
588    * cancelled, the returned {@code Future} will receive a callback in which it
589    * will attempt to cancel itself.
590    *
591    * <p>When the execution of {@code function.apply} is fast and lightweight
592    * (though the {@code Future} it returns need not meet these criteria),
593    * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
594    * the executor} or explicitly specifying {@code sameThreadExecutor}.
595    * However, be aware of the caveats documented in the link above.
596    *
597    * @param input The future to transform
598    * @param function A function to transform the result of the input future
599    *     to the result of the output future
600    * @param executor Executor to run the function in.
601    * @return A future that holds result of the function (if the input succeeded)
602    *     or the original input's failure (if not)
603    * @since 11.0
604    */
605   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
606       AsyncFunction<? super I, ? extends O> function,
607       Executor executor) {
608     ChainingListenableFuture<I, O> output =
609         new ChainingListenableFuture<I, O>(function, input);
610     input.addListener(output, executor);
611     return output;
612   }
613 
614   /**
615    * Returns a new {@code ListenableFuture} whose result is the product of
616    * applying the given {@code Function} to the result of the given {@code
617    * Future}. Example:
618    *
619    * <pre>   {@code
620    *   ListenableFuture<QueryResult> queryFuture = ...;
621    *   Function<QueryResult, List<Row>> rowsFunction =
622    *       new Function<QueryResult, List<Row>>() {
623    *         public List<Row> apply(QueryResult queryResult) {
624    *           return queryResult.getRows();
625    *         }
626    *       };
627    *   ListenableFuture<List<Row>> rowsFuture =
628    *       transform(queryFuture, rowsFunction);}</pre>
629    *
630    * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
631    * #transform(ListenableFuture, Function, Executor) supplying an executor}.
632    * If you do not supply an executor, {@code transform} will use {@link
633    * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
634    * caveats for heavier operations.  For example, the call to {@code
635    * function.apply} may run on an unpredictable or undesirable thread:
636    *
637    * <ul>
638    * <li>If the input {@code Future} is done at the time {@code transform} is
639    * called, {@code transform} will call {@code function.apply} inline.
640    * <li>If the input {@code Future} is not yet done, {@code transform} will
641    * schedule {@code function.apply} to be run by the thread that completes the
642    * input {@code Future}, which may be an internal system thread such as an
643    * RPC network thread.
644    * </ul>
645    *
646    * <p>Also note that, regardless of which thread executes the {@code
647    * sameThreadExecutor} {@code function.apply}, all other registered but
648    * unexecuted listeners are prevented from running during its execution, even
649    * if those listeners are to run in other executors.
650    *
651    * <p>The returned {@code Future} attempts to keep its cancellation state in
652    * sync with that of the input future. That is, if the returned {@code Future}
653    * is cancelled, it will attempt to cancel the input, and if the input is
654    * cancelled, the returned {@code Future} will receive a callback in which it
655    * will attempt to cancel itself.
656    *
657    * <p>An example use of this method is to convert a serializable object
658    * returned from an RPC into a POJO.
659    *
660    * @param input The future to transform
661    * @param function A Function to transform the results of the provided future
662    *     to the results of the returned future.  This will be run in the thread
663    *     that notifies input it is complete.
664    * @return A future that holds result of the transformation.
665    * @since 9.0 (in 1.0 as {@code compose})
666    */
667   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
668       final Function<? super I, ? extends O> function) {
669     return transform(input, function, MoreExecutors.sameThreadExecutor());
670   }
671 
672   /**
673    * Returns a new {@code ListenableFuture} whose result is the product of
674    * applying the given {@code Function} to the result of the given {@code
675    * Future}. Example:
676    *
677    * <pre>   {@code
678    *   ListenableFuture<QueryResult> queryFuture = ...;
679    *   Function<QueryResult, List<Row>> rowsFunction =
680    *       new Function<QueryResult, List<Row>>() {
681    *         public List<Row> apply(QueryResult queryResult) {
682    *           return queryResult.getRows();
683    *         }
684    *       };
685    *   ListenableFuture<List<Row>> rowsFuture =
686    *       transform(queryFuture, rowsFunction, executor);}</pre>
687    *
688    * <p>The returned {@code Future} attempts to keep its cancellation state in
689    * sync with that of the input future. That is, if the returned {@code Future}
690    * is cancelled, it will attempt to cancel the input, and if the input is
691    * cancelled, the returned {@code Future} will receive a callback in which it
692    * will attempt to cancel itself.
693    *
694    * <p>An example use of this method is to convert a serializable object
695    * returned from an RPC into a POJO.
696    *
697    * <p>When the transformation is fast and lightweight, consider {@linkplain
698    * #transform(ListenableFuture, Function) omitting the executor} or
699    * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
700    * caveats documented in the link above.
701    *
702    * @param input The future to transform
703    * @param function A Function to transform the results of the provided future
704    *     to the results of the returned future.
705    * @param executor Executor to run the function in.
706    * @return A future that holds result of the transformation.
707    * @since 9.0 (in 2.0 as {@code compose})
708    */
709   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
710       final Function<? super I, ? extends O> function, Executor executor) {
711     checkNotNull(function);
712     AsyncFunction<I, O> wrapperFunction
713         = new AsyncFunction<I, O>() {
714             @Override public ListenableFuture<O> apply(I input) {
715               O output = function.apply(input);
716               return immediateFuture(output);
717             }
718         };
719     return transform(input, wrapperFunction, executor);
720   }
721 
722   /**
723    * Like {@link #transform(ListenableFuture, Function)} except that the
724    * transformation {@code function} is invoked on each call to
725    * {@link Future#get() get()} on the returned future.
726    *
727    * <p>The returned {@code Future} reflects the input's cancellation
728    * state directly, and any attempt to cancel the returned Future is likewise
729    * passed through to the input Future.
730    *
731    * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
732    * only apply the timeout to the execution of the underlying {@code Future},
733    * <em>not</em> to the execution of the transformation function.
734    *
735    * <p>The primary audience of this method is callers of {@code transform}
736    * who don't have a {@code ListenableFuture} available and
737    * do not mind repeated, lazy function evaluation.
738    *
739    * @param input The future to transform
740    * @param function A Function to transform the results of the provided future
741    *     to the results of the returned future.
742    * @return A future that returns the result of the transformation.
743    * @since 10.0
744    */
745   public static <I, O> Future<O> lazyTransform(final Future<I> input,
746       final Function<? super I, ? extends O> function) {
747     checkNotNull(input);
748     checkNotNull(function);
749     return new Future<O>() {
750 
751       @Override
752       public boolean cancel(boolean mayInterruptIfRunning) {
753         return input.cancel(mayInterruptIfRunning);
754       }
755 
756       @Override
757       public boolean isCancelled() {
758         return input.isCancelled();
759       }
760 
761       @Override
762       public boolean isDone() {
763         return input.isDone();
764       }
765 
766       @Override
767       public O get() throws InterruptedException, ExecutionException {
768         return applyTransformation(input.get());
769       }
770 
771       @Override
772       public O get(long timeout, TimeUnit unit)
773           throws InterruptedException, ExecutionException, TimeoutException {
774         return applyTransformation(input.get(timeout, unit));
775       }
776 
777       private O applyTransformation(I input) throws ExecutionException {
778         try {
779           return function.apply(input);
780         } catch (Throwable t) {
781           throw new ExecutionException(t);
782         }
783       }
784     };
785   }
786 
  /**
   * An implementation of {@code ListenableFuture} that also implements
   * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code AsyncFunction} to generate the output future, and then
   * mirrors that output future's outcome (value, failure or cancellation) as
   * its own.
   *
   * <p>{@link #run()} is the method that drives the chain: it reads the input
   * future's result, applies the function and attaches a listener to the
   * resulting output future.
   *
   * <p>For historical reasons, this class has a special case in its exception
   * handling: If the given {@code AsyncFunction} throws an {@code
   * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
   * and uses its <i>cause</i> as the output future's exception, rather than
   * using the {@code UndeclaredThrowableException} itself as it would for other
   * exception types. The reason for this is that {@code Futures.transform} used
   * to require a {@code Function}, whose {@code apply} method is not allowed to
   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
   * <i>is</i> allowed to throw checked exceptions. Users who wish to throw
   * checked exceptions should use that overload instead, and <a
   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
   * should remove the {@code UndeclaredThrowableException} special case</a>.
   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // The transformation to apply; cleared at the end of run().
    private AsyncFunction<? super I, ? extends O> function;
    // The future whose result feeds the function; cleared at the end of run().
    private ListenableFuture<? extends I> inputFuture;
    // The future returned by the function. Null until run() has applied the
    // function, and reset to null once its result has been consumed or it has
    // been cancelled. Volatile because cancel() also reads it.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Counted down in the finally block of run().
    // NOTE(review): nothing in the visible part of this class awaits this
    // latch; it may be vestigial -- confirm before relying on it.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    /**
     * Stores the function and the input future, rejecting {@code null} for
     * either.
     */
    private ChainingListenableFuture(
        AsyncFunction<? super I, ? extends O> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Cancels this future and, if that succeeds, forwards the cancellation to
     * the input future and to the output future (whichever of them is
     * currently referenced).
     *
     * @return {@code true} if {@code super.cancel} reported success
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /**
     * Null-tolerant helper: cancels {@code future} if it is non-null and does
     * nothing otherwise.
     */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input future's result, applies the function to it and arranges
     * for this future to complete with the outcome of the function's output
     * future. A cancelled or failed input is reflected onto this future
     * without calling the function.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Apply the function and publish its result through the volatile
        // field (so cancel() can reach it) while keeping a final local copy
        // for use by the listener below.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            Preconditions.checkNotNull(function.apply(sourceResult),
                "AsyncFunction may not return null.");
        // If this future was cancelled in the meantime, cancel the freshly
        // created output future as well and drop the reference to it.
        if (isCancelled()) {
          outputFuture.cancel(wasInterrupted());
          this.outputFuture = null;
          return;
        }
        // Copy the output future's outcome into this future once it is done.
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Historical special case (see the class comment): report the wrapped
        // cause rather than the UndeclaredThrowableException itself.
        setException(e.getCause());
      } catch (Throwable t) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(t);
      } finally {
        // Runs on every exit path, including the early returns above.
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }
907 
908   /**
909    * Returns a new {@code ListenableFuture} whose result is the product of
910    * calling {@code get()} on the {@code Future} nested within the given {@code
911    * Future}, effectively chaining the futures one after the other.  Example:
912    *
913    * <pre>   {@code
914    *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
915    *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
916    *
917    * <p>This call has the same cancellation and execution semantics as {@link
918    * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
919    * Future} attempts to keep its cancellation state in sync with both the
920    * input {@code Future} and the nested {@code Future}.  The transformation
921    * is very lightweight and therefore takes place in the same thread (either
922    * the thread that called {@code dereference}, or the thread in which the
923    * dereferenced future completes).
924    *
925    * @param nested The nested future to transform.
926    * @return A future that holds result of the inner future.
927    * @since 13.0
928    */
929   @SuppressWarnings({"rawtypes", "unchecked"})
930   public static <V> ListenableFuture<V> dereference(
931       ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
932     return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
933   }
934 
935   /**
936    * Helper {@code Function} for {@link #dereference}.
937    */
938   private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
939       new AsyncFunction<ListenableFuture<Object>, Object>() {
940         @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
941           return input;
942         }
943       };
944 
945   /**
946    * Creates a new {@code ListenableFuture} whose value is a list containing the
947    * values of all its input futures, if all succeed. If any input fails, the
948    * returned future fails.
949    *
950    * <p>The list of results is in the same order as the input list.
951    *
952    * <p>Canceling this future will attempt to cancel all the component futures,
953    * and if any of the provided futures fails or is canceled, this one is,
954    * too.
955    *
956    * @param futures futures to combine
957    * @return a future that provides a list of the results of the component
958    *         futures
959    * @since 10.0
960    */
961   @Beta
962   public static <V> ListenableFuture<List<V>> allAsList(
963       ListenableFuture<? extends V>... futures) {
964     return listFuture(ImmutableList.copyOf(futures), true,
965         MoreExecutors.sameThreadExecutor());
966   }
967 
968   /**
969    * Creates a new {@code ListenableFuture} whose value is a list containing the
970    * values of all its input futures, if all succeed. If any input fails, the
971    * returned future fails.
972    *
973    * <p>The list of results is in the same order as the input list.
974    *
975    * <p>Canceling this future will attempt to cancel all the component futures,
976    * and if any of the provided futures fails or is canceled, this one is,
977    * too.
978    *
979    * @param futures futures to combine
980    * @return a future that provides a list of the results of the component
981    *         futures
982    * @since 10.0
983    */
984   @Beta
985   public static <V> ListenableFuture<List<V>> allAsList(
986       Iterable<? extends ListenableFuture<? extends V>> futures) {
987     return listFuture(ImmutableList.copyOf(futures), true,
988         MoreExecutors.sameThreadExecutor());
989   }
990 
991   /**
992    * Creates a new {@code ListenableFuture} whose result is set from the
993    * supplied future when it completes.  Cancelling the supplied future
994    * will also cancel the returned future, but cancelling the returned
995    * future will have no effect on the supplied future.
996    *
997    * @since 15.0
998    */
999   public static <V> ListenableFuture<V> nonCancellationPropagating(
1000       ListenableFuture<V> future) {
1001     return new NonCancellationPropagatingFuture<V>(future);
1002   }
1003 
1004   /**
1005    * A wrapped future that does not propagate cancellation to its delegate.
1006    */
1007   private static class NonCancellationPropagatingFuture<V>
1008       extends AbstractFuture<V> {
1009     NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1010       checkNotNull(delegate);
1011       addCallback(delegate, new FutureCallback<V>() {
1012         @Override
1013         public void onSuccess(V result) {
1014           set(result);
1015         }
1016 
1017         @Override
1018         public void onFailure(Throwable t) {
1019           if (delegate.isCancelled()) {
1020             cancel(false);
1021           } else {
1022             setException(t);
1023           }
1024         }
1025       }, sameThreadExecutor());
1026     }
1027   }
1028 
1029   /**
1030    * Creates a new {@code ListenableFuture} whose value is a list containing the
1031    * values of all its successful input futures. The list of results is in the
1032    * same order as the input list, and if any of the provided futures fails or
1033    * is canceled, its corresponding position will contain {@code null} (which is
1034    * indistinguishable from the future having a successful value of
1035    * {@code null}).
1036    *
1037    * <p>Canceling this future will attempt to cancel all the component futures.
1038    *
1039    * @param futures futures to combine
1040    * @return a future that provides a list of the results of the component
1041    *         futures
1042    * @since 10.0
1043    */
1044   @Beta
1045   public static <V> ListenableFuture<List<V>> successfulAsList(
1046       ListenableFuture<? extends V>... futures) {
1047     return listFuture(ImmutableList.copyOf(futures), false,
1048         MoreExecutors.sameThreadExecutor());
1049   }
1050 
1051   /**
1052    * Creates a new {@code ListenableFuture} whose value is a list containing the
1053    * values of all its successful input futures. The list of results is in the
1054    * same order as the input list, and if any of the provided futures fails or
1055    * is canceled, its corresponding position will contain {@code null} (which is
1056    * indistinguishable from the future having a successful value of
1057    * {@code null}).
1058    *
1059    * <p>Canceling this future will attempt to cancel all the component futures.
1060    *
1061    * @param futures futures to combine
1062    * @return a future that provides a list of the results of the component
1063    *         futures
1064    * @since 10.0
1065    */
1066   @Beta
1067   public static <V> ListenableFuture<List<V>> successfulAsList(
1068       Iterable<? extends ListenableFuture<? extends V>> futures) {
1069     return listFuture(ImmutableList.copyOf(futures), false,
1070         MoreExecutors.sameThreadExecutor());
1071   }
1072 
1073   /**
1074    * Returns a list of delegate futures that correspond to the futures received in the order
1075    * that they complete. Delegate futures return the same value or throw the same exception
1076    * as the corresponding input future returns/throws.
1077    *
1078    * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1079    * does not correspond to a specific input future until the appropriate number of input
1080    * futures have completed. At that point, it is too late to cancel the input future.
1081    * The input future's result, which cannot be stored into the cancelled delegate future,
1082    * is ignored.
1083    *
1084    * @since 17.0
1085    */
1086   @Beta
1087   public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1088       Iterable<? extends ListenableFuture<? extends T>> futures) {
1089     // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1090     // ArrayDeque
1091     final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates =
1092         Queues.newConcurrentLinkedQueue();
1093     ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1094     // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1095     // atomically and therefore that each returned future is guaranteed to be in completion order.
1096     // N.B. there are some cases where the use of this executor could have possibly surprising
1097     // effects when input futures finish at approximately the same time _and_ the output futures
1098     // have sameThreadExecutor listeners. In this situation, the listeners may end up running on a
1099     // different thread than if they were attached to the corresponding input future.  We believe
1100     // this to be a negligible cost since:
1101     // 1. Using the sameThreadExecutor implies that your callback is safe to run on any thread.
1102     // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1103     //    a sameThreadExecutor listener on one of the output futures which is an antipattern anyway.
1104     SerializingExecutor executor = new SerializingExecutor(MoreExecutors.sameThreadExecutor());
1105     for (final ListenableFuture<? extends T> future : futures) {
1106       AsyncSettableFuture<T> delegate = AsyncSettableFuture.create();
1107       // Must make sure to add the delegate to the queue first in case the future is already done
1108       delegates.add(delegate);
1109       future.addListener(new Runnable() {
1110         @Override public void run() {
1111           delegates.remove().setFuture(future);
1112         }
1113       }, executor);
1114       listBuilder.add(delegate);
1115     }
1116     return listBuilder.build();
1117   }
1118 
1119   /**
1120    * Registers separate success and failure callbacks to be run when the {@code
1121    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1122    * complete} or, if the computation is already complete, immediately.
1123    *
1124    * <p>There is no guaranteed ordering of execution of callbacks, but any
1125    * callback added through this method is guaranteed to be called once the
1126    * computation is complete.
1127    *
1128    * Example: <pre> {@code
1129    * ListenableFuture<QueryResult> future = ...;
1130    * addCallback(future,
1131    *     new FutureCallback<QueryResult> {
1132    *       public void onSuccess(QueryResult result) {
1133    *         storeInCache(result);
1134    *       }
1135    *       public void onFailure(Throwable t) {
1136    *         reportError(t);
1137    *       }
1138    *     });}</pre>
1139    *
1140    * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1141    * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1142    * executor}. If you do not supply an executor, {@code addCallback} will use
1143    * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1144    * some caveats for heavier operations. For example, the callback may run on
1145    * an unpredictable or undesirable thread:
1146    *
1147    * <ul>
1148    * <li>If the input {@code Future} is done at the time {@code addCallback} is
1149    * called, {@code addCallback} will execute the callback inline.
1150    * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1151    * schedule the callback to be run by the thread that completes the input
1152    * {@code Future}, which may be an internal system thread such as an RPC
1153    * network thread.
1154    * </ul>
1155    *
1156    * <p>Also note that, regardless of which thread executes the {@code
1157    * sameThreadExecutor} callback, all other registered but unexecuted listeners
1158    * are prevented from running during its execution, even if those listeners
1159    * are to run in other executors.
1160    *
1161    * <p>For a more general interface to attach a completion listener to a
1162    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1163    *
1164    * @param future The future attach the callback to.
1165    * @param callback The callback to invoke when {@code future} is completed.
1166    * @since 10.0
1167    */
1168   public static <V> void addCallback(ListenableFuture<V> future,
1169       FutureCallback<? super V> callback) {
1170     addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1171   }
1172 
1173   /**
1174    * Registers separate success and failure callbacks to be run when the {@code
1175    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1176    * complete} or, if the computation is already complete, immediately.
1177    *
1178    * <p>The callback is run in {@code executor}.
1179    * There is no guaranteed ordering of execution of callbacks, but any
1180    * callback added through this method is guaranteed to be called once the
1181    * computation is complete.
1182    *
1183    * Example: <pre> {@code
1184    * ListenableFuture<QueryResult> future = ...;
1185    * Executor e = ...
1186    * addCallback(future,
1187    *     new FutureCallback<QueryResult> {
1188    *       public void onSuccess(QueryResult result) {
1189    *         storeInCache(result);
1190    *       }
1191    *       public void onFailure(Throwable t) {
1192    *         reportError(t);
1193    *       }
1194    *     }, e);}</pre>
1195    *
1196    * <p>When the callback is fast and lightweight, consider {@linkplain
1197    * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1198    * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1199    * caveats documented in the link above.
1200    *
1201    * <p>For a more general interface to attach a completion listener to a
1202    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1203    *
1204    * @param future The future attach the callback to.
1205    * @param callback The callback to invoke when {@code future} is completed.
1206    * @param executor The executor to run {@code callback} when the future
1207    *    completes.
1208    * @since 10.0
1209    */
1210   public static <V> void addCallback(final ListenableFuture<V> future,
1211       final FutureCallback<? super V> callback, Executor executor) {
1212     Preconditions.checkNotNull(callback);
1213     Runnable callbackListener = new Runnable() {
1214       @Override
1215       public void run() {
1216         final V value;
1217         try {
1218           // TODO(user): (Before Guava release), validate that this
1219           // is the thing for IE.
1220           value = getUninterruptibly(future);
1221         } catch (ExecutionException e) {
1222           callback.onFailure(e.getCause());
1223           return;
1224         } catch (RuntimeException e) {
1225           callback.onFailure(e);
1226           return;
1227         } catch (Error e) {
1228           callback.onFailure(e);
1229           return;
1230         }
1231         callback.onSuccess(value);
1232       }
1233     };
1234     future.addListener(callbackListener, executor);
1235   }
1236 
1237   /**
1238    * Returns the result of {@link Future#get()}, converting most exceptions to a
1239    * new instance of the given checked exception type. This reduces boilerplate
1240    * for a common use of {@code Future} in which it is unnecessary to
1241    * programmatically distinguish between exception types or to extract other
1242    * information from the exception instance.
1243    *
1244    * <p>Exceptions from {@code Future.get} are treated as follows:
1245    * <ul>
1246    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1247    *     {@code X} if the cause is a checked exception, an {@link
1248    *     UncheckedExecutionException} if the cause is a {@code
1249    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1250    *     {@code Error}.
1251    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1252    *     restoring the interrupt).
1253    * <li>Any {@link CancellationException} is propagated untouched, as is any
1254    *     other {@link RuntimeException} (though {@code get} implementations are
1255    *     discouraged from throwing such exceptions).
1256    * </ul>
1257    *
1258    * <p>The overall principle is to continue to treat every checked exception as a
1259    * checked exception, every unchecked exception as an unchecked exception, and
1260    * every error as an error. In addition, the cause of any {@code
1261    * ExecutionException} is wrapped in order to ensure that the new stack trace
1262    * matches that of the current thread.
1263    *
1264    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1265    * public constructor that accepts zero or more arguments, all of type {@code
1266    * String} or {@code Throwable} (preferring constructors with at least one
1267    * {@code String}) and calling the constructor via reflection. If the
1268    * exception did not already have a cause, one is set by calling {@link
1269    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1270    * {@code IllegalArgumentException} is thrown.
1271    *
1272    * @throws X if {@code get} throws any checked exception except for an {@code
1273    *         ExecutionException} whose cause is not itself a checked exception
1274    * @throws UncheckedExecutionException if {@code get} throws an {@code
1275    *         ExecutionException} with a {@code RuntimeException} as its cause
1276    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1277    *         with an {@code Error} as its cause
1278    * @throws CancellationException if {@code get} throws a {@code
1279    *         CancellationException}
1280    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1281    *         RuntimeException} or does not have a suitable constructor
1282    * @since 10.0
1283    */
1284   public static <V, X extends Exception> V get(
1285       Future<V> future, Class<X> exceptionClass) throws X {
1286     checkNotNull(future);
1287     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1288         "Futures.get exception type (%s) must not be a RuntimeException",
1289         exceptionClass);
1290     try {
1291       return future.get();
1292     } catch (InterruptedException e) {
1293       currentThread().interrupt();
1294       throw newWithCause(exceptionClass, e);
1295     } catch (ExecutionException e) {
1296       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1297       throw new AssertionError();
1298     }
1299   }
1300 
1301   /**
1302    * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1303    * exceptions to a new instance of the given checked exception type. This
1304    * reduces boilerplate for a common use of {@code Future} in which it is
1305    * unnecessary to programmatically distinguish between exception types or to
1306    * extract other information from the exception instance.
1307    *
1308    * <p>Exceptions from {@code Future.get} are treated as follows:
1309    * <ul>
1310    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1311    *     {@code X} if the cause is a checked exception, an {@link
1312    *     UncheckedExecutionException} if the cause is a {@code
1313    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1314    *     {@code Error}.
1315    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1316    *     restoring the interrupt).
1317    * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1318    * <li>Any {@link CancellationException} is propagated untouched, as is any
1319    *     other {@link RuntimeException} (though {@code get} implementations are
1320    *     discouraged from throwing such exceptions).
1321    * </ul>
1322    *
1323    * <p>The overall principle is to continue to treat every checked exception as a
1324    * checked exception, every unchecked exception as an unchecked exception, and
1325    * every error as an error. In addition, the cause of any {@code
1326    * ExecutionException} is wrapped in order to ensure that the new stack trace
1327    * matches that of the current thread.
1328    *
1329    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1330    * public constructor that accepts zero or more arguments, all of type {@code
1331    * String} or {@code Throwable} (preferring constructors with at least one
1332    * {@code String}) and calling the constructor via reflection. If the
1333    * exception did not already have a cause, one is set by calling {@link
1334    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1335    * {@code IllegalArgumentException} is thrown.
1336    *
1337    * @throws X if {@code get} throws any checked exception except for an {@code
1338    *         ExecutionException} whose cause is not itself a checked exception
1339    * @throws UncheckedExecutionException if {@code get} throws an {@code
1340    *         ExecutionException} with a {@code RuntimeException} as its cause
1341    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1342    *         with an {@code Error} as its cause
1343    * @throws CancellationException if {@code get} throws a {@code
1344    *         CancellationException}
1345    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1346    *         RuntimeException} or does not have a suitable constructor
1347    * @since 10.0
1348    */
1349   public static <V, X extends Exception> V get(
1350       Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1351       throws X {
1352     checkNotNull(future);
1353     checkNotNull(unit);
1354     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1355         "Futures.get exception type (%s) must not be a RuntimeException",
1356         exceptionClass);
1357     try {
1358       return future.get(timeout, unit);
1359     } catch (InterruptedException e) {
1360       currentThread().interrupt();
1361       throw newWithCause(exceptionClass, e);
1362     } catch (TimeoutException e) {
1363       throw newWithCause(exceptionClass, e);
1364     } catch (ExecutionException e) {
1365       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1366       throw new AssertionError();
1367     }
1368   }
1369 
1370   private static <X extends Exception> void wrapAndThrowExceptionOrError(
1371       Throwable cause, Class<X> exceptionClass) throws X {
1372     if (cause instanceof Error) {
1373       throw new ExecutionError((Error) cause);
1374     }
1375     if (cause instanceof RuntimeException) {
1376       throw new UncheckedExecutionException(cause);
1377     }
1378     throw newWithCause(exceptionClass, cause);
1379   }
1380 
1381   /**
1382    * Returns the result of calling {@link Future#get()} uninterruptibly on a
1383    * task known not to throw a checked exception. This makes {@code Future} more
1384    * suitable for lightweight, fast-running tasks that, barring bugs in the
1385    * code, will not fail. This gives it exception-handling behavior similar to
1386    * that of {@code ForkJoinTask.join}.
1387    *
1388    * <p>Exceptions from {@code Future.get} are treated as follows:
1389    * <ul>
1390    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1391    *     {@link UncheckedExecutionException} (if the cause is an {@code
1392    *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1393    *     Error}).
1394    * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1395    *     call. The interrupt is restored before {@code getUnchecked} returns.
1396    * <li>Any {@link CancellationException} is propagated untouched. So is any
1397    *     other {@link RuntimeException} ({@code get} implementations are
1398    *     discouraged from throwing such exceptions).
1399    * </ul>
1400    *
1401    * <p>The overall principle is to eliminate all checked exceptions: to loop to
1402    * avoid {@code InterruptedException}, to pass through {@code
1403    * CancellationException}, and to wrap any exception from the underlying
1404    * computation in an {@code UncheckedExecutionException} or {@code
1405    * ExecutionError}.
1406    *
1407    * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1408    * {@link Uninterruptibles#getUninterruptibly(Future)}.
1409    *
1410    * @throws UncheckedExecutionException if {@code get} throws an {@code
1411    *         ExecutionException} with an {@code Exception} as its cause
1412    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1413    *         with an {@code Error} as its cause
1414    * @throws CancellationException if {@code get} throws a {@code
1415    *         CancellationException}
1416    * @since 10.0
1417    */
1418   public static <V> V getUnchecked(Future<V> future) {
1419     checkNotNull(future);
1420     try {
1421       return getUninterruptibly(future);
1422     } catch (ExecutionException e) {
1423       wrapAndThrowUnchecked(e.getCause());
1424       throw new AssertionError();
1425     }
1426   }
1427 
1428   private static void wrapAndThrowUnchecked(Throwable cause) {
1429     if (cause instanceof Error) {
1430       throw new ExecutionError((Error) cause);
1431     }
1432     /*
1433      * It's a non-Error, non-Exception Throwable. From my survey of such
1434      * classes, I believe that most users intended to extend Exception, so we'll
1435      * treat it like an Exception.
1436      */
1437     throw new UncheckedExecutionException(cause);
1438   }
1439 
1440   /*
1441    * TODO(user): FutureChecker interface for these to be static methods on? If
1442    * so, refer to it in the (static-method) Futures.get documentation
1443    */
1444 
1445   /*
1446    * Arguably we don't need a timed getUnchecked because any operation slow
1447    * enough to require a timeout is heavyweight enough to throw a checked
1448    * exception and therefore be inappropriate to use with getUnchecked. Further,
1449    * it's not clear that converting the checked TimeoutException to a
1450    * RuntimeException -- especially to an UncheckedExecutionException, since it
1451    * wasn't thrown by the computation -- makes sense, and if we don't convert
1452    * it, the user still has to write a try-catch block.
1453    *
1454    * If you think you would use this method, let us know.
1455    */
1456 
1457   private static <X extends Exception> X newWithCause(
1458       Class<X> exceptionClass, Throwable cause) {
1459     // getConstructors() guarantees this as long as we don't modify the array.
1460     @SuppressWarnings("unchecked")
1461     List<Constructor<X>> constructors =
1462         (List) Arrays.asList(exceptionClass.getConstructors());
1463     for (Constructor<X> constructor : preferringStrings(constructors)) {
1464       @Nullable X instance = newFromConstructor(constructor, cause);
1465       if (instance != null) {
1466         if (instance.getCause() == null) {
1467           instance.initCause(cause);
1468         }
1469         return instance;
1470       }
1471     }
1472     throw new IllegalArgumentException(
1473         "No appropriate constructor for exception of type " + exceptionClass
1474             + " in response to chained exception", cause);
1475   }
1476 
1477   private static <X extends Exception> List<Constructor<X>>
1478       preferringStrings(List<Constructor<X>> constructors) {
1479     return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1480   }
1481 
1482   private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1483       Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1484         @Override public Boolean apply(Constructor<?> input) {
1485           return asList(input.getParameterTypes()).contains(String.class);
1486         }
1487       }).reverse();
1488 
1489   @Nullable private static <X> X newFromConstructor(
1490       Constructor<X> constructor, Throwable cause) {
1491     Class<?>[] paramTypes = constructor.getParameterTypes();
1492     Object[] params = new Object[paramTypes.length];
1493     for (int i = 0; i < paramTypes.length; i++) {
1494       Class<?> paramType = paramTypes[i];
1495       if (paramType.equals(String.class)) {
1496         params[i] = cause.toString();
1497       } else if (paramType.equals(Throwable.class)) {
1498         params[i] = cause;
1499       } else {
1500         return null;
1501       }
1502     }
1503     try {
1504       return constructor.newInstance(params);
1505     } catch (IllegalArgumentException e) {
1506       return null;
1507     } catch (InstantiationException e) {
1508       return null;
1509     } catch (IllegalAccessException e) {
1510       return null;
1511     } catch (InvocationTargetException e) {
1512       return null;
1513     }
1514   }
1515 
1516   private interface FutureCombiner<V, C> {
1517     C combine(List<Optional<V>> values);
1518   }
1519 
1520   private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1521     private static final Logger logger =
1522         Logger.getLogger(CombinedFuture.class.getName());
1523 
1524     ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1525     final boolean allMustSucceed;
1526     final AtomicInteger remaining;
1527     FutureCombiner<V, C> combiner;
1528     List<Optional<V>> values;
1529     final Object seenExceptionsLock = new Object();
1530     Set<Throwable> seenExceptions;
1531 
1532     CombinedFuture(
1533         ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1534         boolean allMustSucceed, Executor listenerExecutor,
1535         FutureCombiner<V, C> combiner) {
1536       this.futures = futures;
1537       this.allMustSucceed = allMustSucceed;
1538       this.remaining = new AtomicInteger(futures.size());
1539       this.combiner = combiner;
1540       this.values = Lists.newArrayListWithCapacity(futures.size());
1541       init(listenerExecutor);
1542     }
1543 
1544     /**
1545      * Must be called at the end of the constructor.
1546      */
1547     protected void init(final Executor listenerExecutor) {
1548       // First, schedule cleanup to execute when the Future is done.
1549       addListener(new Runnable() {
1550         @Override
1551         public void run() {
1552           // Cancel all the component futures.
1553           if (CombinedFuture.this.isCancelled()) {
1554             for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1555               future.cancel(CombinedFuture.this.wasInterrupted());
1556             }
1557           }
1558 
1559           // Let go of the memory held by other futures
1560           CombinedFuture.this.futures = null;
1561 
1562           // By now the values array has either been set as the Future's value,
1563           // or (in case of failure) is no longer useful.
1564           CombinedFuture.this.values = null;
1565 
1566           // The combiner may also hold state, so free that as well
1567           CombinedFuture.this.combiner = null;
1568         }
1569       }, MoreExecutors.sameThreadExecutor());
1570 
1571       // Now begin the "real" initialization.
1572 
1573       // Corner case: List is empty.
1574       if (futures.isEmpty()) {
1575         set(combiner.combine(ImmutableList.<Optional<V>>of()));
1576         return;
1577       }
1578 
1579       // Populate the results list with null initially.
1580       for (int i = 0; i < futures.size(); ++i) {
1581         values.add(null);
1582       }
1583 
1584       // Register a listener on each Future in the list to update
1585       // the state of this future.
1586       // Note that if all the futures on the list are done prior to completing
1587       // this loop, the last call to addListener() will callback to
1588       // setOneValue(), transitively call our cleanup listener, and set
1589       // this.futures to null.
1590       // This is not actually a problem, since the foreach only needs
1591       // this.futures to be non-null at the beginning of the loop.
1592       int i = 0;
1593       for (final ListenableFuture<? extends V> listenable : futures) {
1594         final int index = i++;
1595         listenable.addListener(new Runnable() {
1596           @Override
1597           public void run() {
1598             setOneValue(index, listenable);
1599           }
1600         }, listenerExecutor);
1601       }
1602     }
1603 
1604     /**
1605      * Fails this future with the given Throwable if {@link #allMustSucceed} is
1606      * true. Also, logs the throwable if it is an {@link Error} or if
1607      * {@link #allMustSucceed} is {@code true}, the throwable did not cause
1608      * this future to fail, and it is the first time we've seen that particular Throwable.
1609      */
1610     private void setExceptionAndMaybeLog(Throwable throwable) {
1611       boolean visibleFromOutputFuture = false;
1612       boolean firstTimeSeeingThisException = true;
1613       if (allMustSucceed) {
1614         // As soon as the first one fails, throw the exception up.
1615         // The result of all other inputs is then ignored.
1616         visibleFromOutputFuture = super.setException(throwable);
1617 
1618         synchronized (seenExceptionsLock) {
1619           if (seenExceptions == null) {
1620             seenExceptions = Sets.newHashSet();
1621           }
1622           firstTimeSeeingThisException = seenExceptions.add(throwable);
1623         }
1624       }
1625 
1626       if (throwable instanceof Error
1627           || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
1628         logger.log(Level.SEVERE, "input future failed.", throwable);
1629       }
1630     }
1631 
1632     /**
1633      * Sets the value at the given index to that of the given future.
1634      */
1635     private void setOneValue(int index, Future<? extends V> future) {
1636       List<Optional<V>> localValues = values;
1637       // TODO(user): This check appears to be redundant since values is
1638       // assigned null only after the future completes.  However, values
1639       // is not volatile so it may be possible for us to observe the changes
1640       // to these two values in a different order... which I think is why
1641       // we need to check both.  Clear up this craziness either by making
1642       // values volatile or proving that it doesn't need to be for some other
1643       // reason.
1644       if (isDone() || localValues == null) {
1645         // Some other future failed or has been cancelled, causing this one to
1646         // also be cancelled or have an exception set. This should only happen
1647         // if allMustSucceed is true or if the output itself has been
1648         // cancelled.
1649         checkState(allMustSucceed || isCancelled(),
1650             "Future was done before all dependencies completed");
1651       }
1652 
1653       try {
1654         checkState(future.isDone(),
1655             "Tried to set value from future which is not done");
1656         V returnValue = getUninterruptibly(future);
1657         if (localValues != null) {
1658           localValues.set(index, Optional.fromNullable(returnValue));
1659         }
1660       } catch (CancellationException e) {
1661         if (allMustSucceed) {
1662           // Set ourselves as cancelled. Let the input futures keep running
1663           // as some of them may be used elsewhere.
1664           cancel(false);
1665         }
1666       } catch (ExecutionException e) {
1667         setExceptionAndMaybeLog(e.getCause());
1668       } catch (Throwable t) {
1669         setExceptionAndMaybeLog(t);
1670       } finally {
1671         int newRemaining = remaining.decrementAndGet();
1672         checkState(newRemaining >= 0, "Less than 0 remaining futures");
1673         if (newRemaining == 0) {
1674           FutureCombiner<V, C> localCombiner = combiner;
1675           if (localCombiner != null && localValues != null) {
1676             set(localCombiner.combine(localValues));
1677           } else {
1678             checkState(isDone());
1679           }
1680         }
1681       }
1682     }
1683 
1684   }
1685 
1686   /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1687   private static <V> ListenableFuture<List<V>> listFuture(
1688       ImmutableList<ListenableFuture<? extends V>> futures,
1689       boolean allMustSucceed, Executor listenerExecutor) {
1690     return new CombinedFuture<V, List<V>>(
1691         futures, allMustSucceed, listenerExecutor,
1692         new FutureCombiner<V, List<V>>() {
1693           @Override
1694           public List<V> combine(List<Optional<V>> values) {
1695             List<V> result = Lists.newArrayList();
1696             for (Optional<V> element : values) {
1697               result.add(element != null ? element.orNull() : null);
1698             }
1699             return Collections.unmodifiableList(result);
1700           }
1701         });
1702   }
1703 
1704   /**
1705    * A checked future that uses a function to map from exceptions to the
1706    * appropriate checked type.
1707    */
1708   private static class MappingCheckedFuture<V, X extends Exception> extends
1709       AbstractCheckedFuture<V, X> {
1710 
1711     final Function<Exception, X> mapper;
1712 
1713     MappingCheckedFuture(ListenableFuture<V> delegate,
1714         Function<Exception, X> mapper) {
1715       super(delegate);
1716 
1717       this.mapper = checkNotNull(mapper);
1718     }
1719 
1720     @Override
1721     protected X mapException(Exception e) {
1722       return mapper.apply(e);
1723     }
1724   }
1725 }