blob: e682b107cdadec9952e581f572acc60f64574d30 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.v23;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.v.v23.context.VContext;
import io.v.v23.verror.CanceledException;
import io.v.v23.verror.VException;
/**
* Contains static utility methods that operate on or return objects of type {@link Future}.
*/
public class VFutures {
/**
* Waits for the computation embodied by the given {@code future} to complete and then
* retrieves its result.
* <p>
* This method requires that {@code future}'s computation may only throw a {@link VException} -
* any other exception type will result in a {@link RuntimeException}.
* <p>
* If the {@code future}'s execution raises an {@link CancellationException}, this method
* will throw a {@link CanceledException} (which is a {@link VException}).
* <p>
* If the {@code future}'s execution raises an {@link InterruptedException}, this
* method will raise a {@link RuntimeException}.
*
* @param future the future being executed
* @return the computed result of the future
* @throws VException if the computation embodied by the future threw a {@link VException}
* @throws CanceledException if the future's execution raised an {@link CancellationException}
* @throws RuntimeException if the computation embodied by the future threw an exception
* other than {@link VException}, or if the future's executions raised
* an {@link InterruptedException}
*/
public static <T> T sync(Future<T> future) throws VException {
try {
return future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof VException) {
throw (VException) e.getCause();
} else if (e.getCause() instanceof CancellationException) {
throw new CanceledException(null);
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(
"Vanadium futures may only raise a VException or a RuntimeException.",
e.getCause());
} catch (InterruptedException e) {
throw new RuntimeException("Vanadium future may not raise an InterruptedException.", e);
} catch (CancellationException e) {
throw new CanceledException(null);
}
}
/**
* Waits if necessary for at most the given time for the computation embodied by the given
* {@code future} to complete and then retrieves its result, if available.
* <p>
* This method requires that {@code future}'s computation may only throw a {@link VException} -
* any other exception type will result in a {@link RuntimeException}.
* <p>
* If the {@code future}'s execution raises an {@link CancellationException}, this method
* will throw a {@link CanceledException} (which is a {@link VException}).
* <p>
* If the {@code future}'s execution results in an {@link InterruptedException}, this
* method will raise a {@link RuntimeException}.
*
* @param future the future being executed
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result of the future
* @throws VException if the computation embodied by the future threw a {@link VException}
* @throws CanceledException if the future's execution raised an {@link CancellationException}
* @throws RuntimeException if the computation embodied by the future threw an exception
* other than {@link VException}, or if the future's executions raised
* an {@link InterruptedException}
* @throws TimeoutException if the future didn't complete in the allotted time
*/
public static <T> T sync(Future<T> future, long timeout, TimeUnit unit)
throws VException, TimeoutException {
try {
return future.get(timeout, unit);
} catch (ExecutionException e) {
if (e.getCause() instanceof VException) {
throw (VException) e.getCause();
} else if (e.getCause() instanceof CancellationException) {
throw new CanceledException(null);
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(
"Vanadium futures may only raise a VException or a RuntimeException.",
e.getCause());
} catch (InterruptedException e) {
throw new RuntimeException("Vanadium future may not raise an InterruptedException.", e);
} catch (CancellationException e) {
throw new CanceledException(null);
}
}
/**
* Returns a new {@link ListenableFuture} whose result is the same as the result of the
* given {@code future} and is executed on an {@link Executor} specified in the given
* {@code context}.
* <p>
* If no executor is specified in the context, the future may be executed on an arbitrary
* thread.
*/
public static <T> ListenableFuture<T> onExecutor(
final VContext context, final ListenableFuture<T> future) {
Executor executor = V.getExecutor(context);
if (executor == null) {
return future;
}
return Futures.transform(future, new AsyncFunction<T, T>() {
@Override
public ListenableFuture<T> apply(T input) throws Exception {
return Futures.immediateFuture(input);
}
}, executor);
}
/**
* Returns a new {@link ListenableFuture} that runs on an {@link Executor} specified in the
* given {@code context} and is canceled iff the given {@code context} has been canceled.
*/
public static <T> ListenableFuture<T> withUserLandChecks(
final VContext context, final ListenableFuture<T> future) {
Executor executor = V.getExecutor(context);
if (executor == null) {
throw new RuntimeException("NULL executor in context: did you derive this context " +
"from the context returned by V.init()?");
}
return Futures.withFallback(Futures.transform(future, new AsyncFunction<T, T>() {
@Override
public ListenableFuture<T> apply(T input) throws Exception {
if (context.isCanceled()) {
return Futures.immediateCancelledFuture();
}
return Futures.immediateFuture(input);
}
}, executor), new FutureFallback<T>() {
@Override
public ListenableFuture<T> create(Throwable t) throws Exception {
if (context.isCanceled()) {
return Futures.immediateCancelledFuture();
}
return Futures.immediateFailedFuture(t);
}
}, executor);
}
}