blob: f5210819532702f6168c9f1b98636bef1751e5f3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.v23;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import io.v.v23.context.VContext;
import io.v.v23.verror.EndOfFileException;
import io.v.v23.verror.VException;
import static io.v.v23.VFutures.sync;
/**
* Contains static utility methods that operate on or return objects of type {@link InputChannel}.
*/
public class InputChannels {
/**
* Function used for transforming an input value into an output value.
*/
public interface TransformFunction<F, T> {
/**
* Returns the result of transforming {@code from}, or {@code null} if the value should
* be skipped.
*
* @throws VException if there was a transform error
*/
T apply(F from) throws VException;
}
/**
* Returns an {@link InputChannel} that applies the provided {@code function} to each element
* of {@code fromChannel}.
*/
public static <F,T> InputChannel<T> transform(
VContext ctx, InputChannel<F> fromChannel,
TransformFunction<? super F, ? extends T> function) {
return new TransformedChannel<>(ctx, fromChannel, function);
}
/**
* Returns a new {@link ListenableFuture} whose result is the list of all elements received
* from the provided {@link InputChannel}.
* <p>
* The returned future will be executed on a
* {@link MoreExecutors#directExecutor() direct executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<List<T>> asList(final InputChannel<T> channel) {
return asList(channel, MoreExecutors.directExecutor());
}
/**
* Returns a new {@link ListenableFuture} whose result is the list of all elements received
* from the provided {@link InputChannel}.
* <p>
* The returned future will be executed on the provided {@code executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<List<T>> asList(final InputChannel<T> channel,
Executor executor) {
final SettableFuture<List<T>> future = SettableFuture.create();
Futures.addCallback(channel.recv(), newCallbackForList(
channel, new ArrayList<T>(), future, executor), executor);
return future;
}
private static <T> FutureCallback<T> newCallbackForList(
final InputChannel<T> channel, final List<T> list,
final SettableFuture<List<T>> future,
final Executor executor) {
return new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
list.add(result);
Futures.addCallback(channel.recv(),
newCallbackForList(channel, list, future, executor), executor);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof EndOfFileException) {
future.set(list);
} else {
future.setException(t);
}
}
};
}
/**
* Returns a new {@link ListenableFuture} whose result is available when the provided
* {@link InputChannel} has exhausted all of its elements.
* <p>
* The returned future will be executed on a
* {@link MoreExecutors#directExecutor() direct executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<Void> asDone(final InputChannel<T> channel) {
return asDone(channel, MoreExecutors.directExecutor());
}
/**
* Returns a new {@link ListenableFuture} whose result is available when the provided
* {@link InputChannel} has exhausted all of its elements.
* <p>
* The returned future will be executed on the provided {@code executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<Void> asDone(final InputChannel<T> channel,
Executor executor) {
final SettableFuture<Void> future = SettableFuture.create();
Futures.addCallback(channel.recv(),
newCallbackForDone(channel, future, executor), executor);
return future;
}
private static <T> FutureCallback<T> newCallbackForDone(final InputChannel<T> channel,
final SettableFuture<Void> future,
final Executor executor) {
return new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
Futures.addCallback(channel.recv(),
newCallbackForDone(channel, future, executor), executor);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof EndOfFileException) {
future.set(null);
} else {
future.setException(t);
}
}
};
}
/**
* Returns a {@link VIterable} over all the elements in {@code channel}, blocking in every
* iteration.
* <p>
* The returned iterator will terminate gracefully iff {@code channel}'s
* {@link InputChannel#recv} call fails with a {@link io.v.v23.verror.EndOfFileException}.
*/
public static <T> VIterable<T> asIterable(InputChannel<? extends T> channel) {
return new ChannelIterable<>(channel);
}
/**
* Iterates over all elements in {@code channel}, invoking {@link InputChannelCallback#onNext}
* method on the provided callback for each element.
* <p>
* Returns a new {@link ListenableFuture} that completes when the provided {@link InputChannel}
* has exhausted all of its elements or has encountered an error.
* <p>
* The returned future and all the callbacks will be executed on a
* {@link MoreExecutors#directExecutor() direct executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<Void> withCallback(
InputChannel<T> channel, InputChannelCallback<? super T> callback) {
return withCallback(channel, callback, MoreExecutors.directExecutor());
}
/**
* Iterates over all elements in {@code channel}, invoking {@link InputChannelCallback#onNext}
* method on the provided callback for each element.
* <p>
* Returns a new {@link ListenableFuture} that completes when the provided {@link InputChannel}
* has exhausted all of its elements or has encountered an error.
* <p>
* The returned future and all the callbacks will be executed on the provided {@code executor}.
*/
@CheckReturnValue
public static <T> ListenableFuture<Void> withCallback(
InputChannel<T> channel,
InputChannelCallback<? super T> callback,
Executor executor) {
final SettableFuture<Void> future = SettableFuture.create();
Futures.addCallback(channel.recv(),
newCallbackForCallback(channel, future, callback, executor), executor);
return future;
}
private static <T> FutureCallback<T> newCallbackForCallback(
final InputChannel<T> channel, final SettableFuture<Void> future,
final InputChannelCallback<? super T> callback, final Executor executor) {
return new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
ListenableFuture<Void> done = callback.onNext(result);
if (done == null) {
done = Futures.immediateFuture(null);
}
Futures.addCallback(Futures.transform(done, new AsyncFunction<Void, T>() {
@Override
public ListenableFuture<T> apply(Void input) throws Exception {
return channel.recv();
}
}),
newCallbackForCallback(channel, future, callback, executor), executor);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof EndOfFileException) {
future.set(null);
return;
}
future.setException(t);
}
};
}
private static class TransformedChannel<F, T> implements InputChannel<T> {
private final VContext ctx;
private final InputChannel<F> fromChannel;
private final TransformFunction<? super F, ? extends T> function;
private TransformedChannel(VContext ctx,
InputChannel<F> fromChannel,
TransformFunction<? super F, ? extends T> function) {
this.ctx = ctx;
this.fromChannel = fromChannel;
this.function = function;
}
@Override
public ListenableFuture<T> recv() {
return VFutures.withUserLandChecks(ctx,
Futures.transform(fromChannel.recv(), new AsyncFunction<F, T>() {
@Override
public ListenableFuture<T> apply(F input) throws Exception {
T output = function.apply(input);
if (output == null) {
return recv();
}
return Futures.immediateFuture(output);
}
}));
}
}
private static class ChannelIterable<T> implements VIterable<T> {
private final InputChannel<? extends T> fromChannel;
private boolean isCreated;
private volatile VException error;
private ChannelIterable(InputChannel<? extends T> fromChannel) {
this.fromChannel = fromChannel;
}
public synchronized Iterator<T> iterator() {
Preconditions.checkState(!isCreated, "Can only create one iterator.");
isCreated = true;
return new AbstractIterator<T>() {
protected T computeNext() {
try {
T result = sync(fromChannel.recv());
return result;
} catch (EndOfFileException e) {
return endOfData();
} catch (VException e) {
error = e;
return endOfData();
}
}
};
}
public VException error() {
return error != null ? error : null;
}
}
}