blob: ec887b2716d8ddc3bf13a1b6ddc392e9b1f94b23 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.x.jni.test.fortune;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.v.v23.InputChannelCallback;
import io.v.v23.InputChannels;
import io.v.v23.context.VContext;
import io.v.v23.naming.GlobError;
import io.v.v23.naming.GlobReply;
import io.v.v23.naming.MountEntry;
import io.v.v23.naming.MountedServer;
import io.v.v23.rpc.Globber;
import io.v.v23.rpc.ServerCall;
import io.v.v23.vdl.ServerSendStream;
import io.v.v23.vdl.ServerStream;
import io.v.v23.vdl.VdlUint32;
import io.v.v23.verror.VException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class FortuneServerImpl implements FortuneServer, Globber {
private static final ComplexErrorParam COMPLEX_PARAM = new ComplexErrorParam(
"StrVal",
11,
ImmutableList.<VdlUint32>of(new VdlUint32(22), new VdlUint32(33)));
public static final ComplexException COMPLEX_ERROR = new ComplexException(
"en", "test", "test", COMPLEX_PARAM, "secondParam", 3);
private final CountDownLatch clientLatch;
private final CountDownLatch serverLatch;
private String lastAddedFortune;
public FortuneServerImpl() {
this(null, null);
}
/**
* Creates a new instance of this server with the provided latches.
*
* @param clientLatch if not {@code null}, {@link FortuneServerImpl#get} method will block
* until the latch is counted down
* @param serverLatch if not {@code null}, {@link FortuneServerImpl#get} method will count
* down this latch upon its invocation
*/
public FortuneServerImpl(CountDownLatch clientLatch, CountDownLatch serverLatch) {
this.clientLatch = clientLatch;
this.serverLatch = serverLatch;
}
@Override
public ListenableFuture<String> get(final VContext context, ServerCall call) {
if (serverLatch != null) {
serverLatch.countDown();
}
if (clientLatch != null) {
try {
// Caution: this is not idiomatic for server methods: they must be non-blocking.
// However, it helps us with LameDuck tests.
clientLatch.await();
} catch (InterruptedException e) {
return Futures.immediateFailedFuture(new VException(e.getMessage()));
}
}
if (lastAddedFortune == null) {
return Futures.immediateFailedFuture(new NoFortunesException(context));
}
return Futures.immediateFuture(lastAddedFortune);
}
@Override
public ListenableFuture<Map<String, String>> parameterizedGet(
VContext context, ServerCall call) {
if (lastAddedFortune == null) {
return Futures.immediateFailedFuture(new NoFortunesException(context));
}
return Futures.immediateFuture(
(Map<String, String>) ImmutableMap.of(lastAddedFortune, lastAddedFortune));
}
@Override
public ListenableFuture<Void> add(VContext context, ServerCall call, String fortune) {
lastAddedFortune = fortune;
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Integer> streamingGet(
final VContext context, ServerCall call, final ServerStream<String, Boolean> stream) {
final SettableFuture<Integer> future = SettableFuture.create();
final AtomicInteger numSent = new AtomicInteger(0);
Futures.addCallback(InputChannels.withCallback(stream, new InputChannelCallback<Boolean>() {
@Override
public ListenableFuture<Void> onNext(Boolean result) {
if (lastAddedFortune == null) {
return Futures.immediateFailedFuture(new NoFortunesException(context));
}
return Futures.transform(stream.send(lastAddedFortune),
new Function<Void, Void>() {
@Override
public Void apply(Void input) {
numSent.incrementAndGet();
return null;
}
});
}
}), new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
future.set(numSent.get());
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
return future;
}
@Override
public ListenableFuture<MultipleGetOut> multipleGet(VContext context, ServerCall call) {
if (lastAddedFortune == null) {
return Futures.immediateFailedFuture(new NoFortunesException(context));
}
MultipleGetOut ret = new MultipleGetOut();
ret.fortune = lastAddedFortune;
ret.another = lastAddedFortune;
return Futures.immediateFuture(ret);
}
@Override
public ListenableFuture<MultipleStreamingGetOut> multipleStreamingGet(
final VContext context, ServerCall call, final ServerStream<String, Boolean> stream) {
final SettableFuture<MultipleStreamingGetOut> future = SettableFuture.create();
final AtomicInteger numSent = new AtomicInteger(0);
Futures.addCallback(InputChannels.withCallback(stream, new InputChannelCallback<Boolean>() {
@Override
public ListenableFuture<Void> onNext(Boolean result) {
if (lastAddedFortune == null) {
return Futures.immediateFailedFuture(new NoFortunesException(context));
}
return Futures.transform(stream.send(lastAddedFortune),
new Function<Void, Void>() {
@Override
public Void apply(Void input) {
numSent.incrementAndGet();
return null;
}
});
}
}), new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
MultipleStreamingGetOut ret = new MultipleStreamingGetOut();
ret.total = numSent.get();
ret.another = numSent.get();
future.set(ret);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
return future;
}
@Override
public ListenableFuture<Void> getComplexError(VContext context, ServerCall call) {
return Futures.immediateFailedFuture(COMPLEX_ERROR);
}
@Override
public ListenableFuture<Void> testServerCall(VContext context, ServerCall call) {
if (call == null) {
return Futures.immediateFailedFuture(new VException("ServerCall is null"));
}
if (call.suffix() == null) {
return Futures.immediateFailedFuture(new VException("Suffix is null"));
}
if (call.localEndpoint() == null) {
return Futures.immediateFailedFuture(new VException("Local endpoint is null"));
}
if (call.remoteEndpoint() == null) {
return Futures.immediateFailedFuture(new VException("Remote endpoint is null"));
}
if (context == null) {
return Futures.immediateFailedFuture(new VException("Vanadium context is null"));
}
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<String> getServerThread(VContext context, ServerCall call) {
return Futures.immediateFuture(Thread.currentThread().toString());
}
@Override
public ListenableFuture<Void> noTags(VContext context, ServerCall call) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> glob(VContext context, ServerCall call,
String pattern, final ServerSendStream<GlobReply> stream) {
final GlobReply.Entry entry = new GlobReply.Entry(
new MountEntry("helloworld", ImmutableList.<MountedServer>of(), false, false));
final GlobReply.Error error = new GlobReply.Error(
new GlobError("Hello, world!", new VException("Some error")));
return Futures.transform(stream.send(entry), new AsyncFunction<Void, Void>() {
@Override
public ListenableFuture<Void> apply(Void input) throws Exception {
return stream.send(error);
}
});
}
}