| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package io.v.x.jni.test.fortune; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.reflect.TypeToken; |
| import io.v.v23.OutputChannel; |
| import io.v.v23.V; |
| import io.v.v23.context.CancelableVContext; |
| import io.v.v23.context.VContext; |
| import io.v.v23.naming.GlobReply; |
| import io.v.v23.rpc.Callback; |
| import io.v.v23.rpc.Client; |
| import io.v.v23.rpc.ClientCall; |
| import io.v.v23.rpc.Dispatcher; |
| import io.v.v23.rpc.Invoker; |
| import io.v.v23.rpc.ListenSpec; |
| import io.v.v23.rpc.Server; |
| import io.v.v23.rpc.ServerCall; |
| import io.v.v23.rpc.ServiceObjectWithAuthorizer; |
| import io.v.v23.rpc.StreamServerCall; |
| import io.v.v23.vdl.TypedClientStream; |
| import io.v.v23.vdl.VdlValue; |
| import io.v.v23.vdlroot.signature.Interface; |
| import io.v.v23.vdlroot.signature.Method; |
| import io.v.v23.verror.VException; |
| import junit.framework.TestCase; |
| import org.joda.time.Duration; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.lang.reflect.Type; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| |
| public class FortuneTest extends TestCase { |
| private static final String TEST_INVOKER_FORTUNE = "Test invoker fortune"; |
| |
| private VContext ctx; |
| |
| @Override |
| protected void setUp() throws Exception { |
| ctx = V.init(); |
| ListenSpec.Address addr = new ListenSpec.Address("tcp", "127.0.0.1:0"); |
| ctx = V.withListenSpec(ctx, V.getListenSpec(ctx).withAddress(addr)); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| Server s = V.getServer(ctx); |
| if (s != null) { |
| s.stop(); |
| } |
| V.shutdown(); |
| } |
| |
| private String name() { |
| Server s = V.getServer(ctx); |
| if (s == null) { |
| return ""; |
| } |
| return "/" + s.getStatus().getEndpoints()[0]; |
| } |
| |
| public void testFortune() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| try { |
| client.get(ctxT); |
| fail("Expected exception during call to get() before call to add()"); |
| } catch (VException e) { |
| if (!e.is(Errors.ERR_NO_FORTUNES)) { |
| fail(String.format("Expected error %s, got %s", Errors.ERR_NO_FORTUNES, e)); |
| } |
| } |
| String firstMessage = "First fortune"; |
| client.add(ctxT, firstMessage); |
| assertEquals(firstMessage, client.get(ctxT)); |
| } |
| |
| public void testAsyncFortune() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| final AtomicReference<String> result = new AtomicReference<>(); |
| final CountDownLatch latch = new CountDownLatch(1); |
| VContext ctxT = ctx.withTimeout(new Duration(2000000)); // 20s |
| client.add(ctxT, "Hello world"); |
| client.get(ctxT, new Callback<String>() { |
| @Override |
| public void onSuccess(String fortune) { |
| result.set(fortune); |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onFailure(VException error) { |
| throw new RuntimeException(error); |
| } |
| }); |
| assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); |
| assertThat(result.get()).isNotEmpty(); |
| } |
| |
| public void testAsyncFortuneWithCancel() throws Exception { |
| CountDownLatch callLatch = new CountDownLatch(1); |
| FortuneServer server = new FortuneServerImpl(callLatch); |
| ctx = V.withNewServer(ctx, "", server, null); |
| CancelableVContext cancelCtx = ctx.withCancel(); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| final AtomicReference<String> result = new AtomicReference<>(); |
| final AtomicReference<VException> errorResult = new AtomicReference<>(); |
| final CountDownLatch latch = new CountDownLatch(1); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| client.add(ctxT, "Hello world"); |
| client.get(cancelCtx, new Callback<String>() { |
| @Override |
| public void onSuccess(String fortune) { |
| result.set(fortune); |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onFailure(VException error) { |
| errorResult.set(error); |
| latch.countDown(); |
| } |
| }); |
| // Cancel the RPC. |
| cancelCtx.cancel(); |
| // Allow the server RPC impl to finish. |
| callLatch.countDown(); |
| // The call should have failed, it was canceled before it completed. |
| assertThat(result.get()).isNull(); |
| assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); |
| assertThat(errorResult.get()).isNotNull(); |
| assertThat(errorResult.get().getAction()).isEqualTo(io.v.v23.verror.Errors.CANCELED |
| .getAction()); |
| } |
| |
| public void testStreaming() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| TypedClientStream<Boolean, String, Integer> stream = client.streamingGet(ctxT); |
| String msg = "The only fortune"; |
| client.add(ctxT, msg); |
| try { |
| for (int i = 0; i < 5; ++i) { |
| stream.send(true); |
| assertEquals(msg, stream.recv()); |
| } |
| } catch (EOFException e) { |
| fail("Reached unexpected stream EOF: " + e.getMessage()); |
| } |
| int total = stream.finish(); |
| assertEquals(5, total); |
| } |
| |
| public void testAsyncStreaming() throws Throwable { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| final String msg = "The only fortune"; |
| client.add(ctxT, msg); |
| |
| final AtomicReference<Throwable> errorResult = new AtomicReference<>(); |
| final CountDownLatch latch = new CountDownLatch(1); |
| client.streamingGet(ctxT, new Callback<TypedClientStream<Boolean, |
| String, Integer>>() { |
| @Override |
| public void onSuccess(TypedClientStream<Boolean, String, Integer> stream) { |
| try { |
| for (int i = 0; i < 5; i++) { |
| stream.send(true); |
| assertEquals(msg, stream.recv()); |
| } |
| int total = stream.finish(); |
| assertEquals(5, total); |
| } catch (VException | IOException | AssertionError error) { |
| errorResult.set(error); |
| } finally { |
| latch.countDown(); |
| } |
| } |
| |
| @Override |
| public void onFailure(VException error) { |
| errorResult.set(error); |
| latch.countDown(); |
| } |
| }); |
| assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); |
| if (errorResult.get() != null) { |
| throw errorResult.get(); |
| } |
| } |
| |
| public void testMultiple() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| String firstMessage = "First fortune"; |
| client.add(ctxT, firstMessage); |
| |
| FortuneClient.MultipleGetOut ret = client.multipleGet(ctxT); |
| assertEquals(firstMessage, ret.fortune); |
| assertEquals(firstMessage, ret.another); |
| } |
| |
| public void testComplexError() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| try { |
| client.getComplexError(ctxT); |
| fail("Expected exception during call to getComplexError()"); |
| } catch (VException e) { |
| if (!FortuneServerImpl.COMPLEX_ERROR.deepEquals(e)) { |
| fail(String.format("Expected error %s, got %s", FortuneServerImpl.COMPLEX_ERROR, e)); |
| } |
| } |
| } |
| |
| public void testContext() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| try { |
| client.testServerCall(ctxT); |
| } catch (VException e) { |
| fail("Context check failed: " + e.getMessage()); |
| } |
| } |
| |
| public void testGetServer() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| Server s = V.getServer(ctx); |
| assertThat(s).isNotNull(); |
| } |
| |
| public void testGetSignature() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| Client c = V.getClient(ctx); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| ClientCall call = c.startCall(ctxT, name(), "__Signature", new Object[0], new Type[0]); |
| Object[] results = call.finish(new Type[] { new TypeToken<Interface[]>() {}.getType() }); |
| assertThat(results.length == 1).isTrue(); |
| Interface[] signature = (Interface[]) results[0]; |
| assertThat(signature.length >= 1).isTrue(); |
| assertThat(signature[0].getMethods()).isNotEmpty(); |
| } |
| |
| public void testGlob() throws Exception { |
| FortuneServer server = new FortuneServerImpl(); |
| ctx = V.withNewServer(ctx, "", server, null); |
| |
| List<GlobReply> globResult |
| = ImmutableList.copyOf(V.getNamespace(ctx).glob(ctx, name() + "/*")); |
| assertThat(globResult).hasSize(2); |
| assertThat(globResult.get(0)).isInstanceOf(GlobReply.Entry.class); |
| assertThat(((GlobReply.Entry) globResult.get(0)).getElem().getName()) |
| .isEqualTo(name() + "/helloworld"); |
| assertThat(globResult.get(1)).isInstanceOf(GlobReply.Error.class); |
| } |
| |
| public void testCustomInvoker() throws Exception { |
| ctx = V.withNewServer(ctx, "", new TestInvoker(), null); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| assertThat(client.get(ctxT)).isEqualTo(TEST_INVOKER_FORTUNE); |
| } |
| |
| public void testCustomDispatcherReturningAServer() throws Exception { |
| final FortuneServer server = new FortuneServerImpl(); |
| Dispatcher dispatcher = new Dispatcher() { |
| @Override |
| public ServiceObjectWithAuthorizer lookup(String suffix) throws VException { |
| return new ServiceObjectWithAuthorizer(server, null); |
| } |
| }; |
| ctx = V.withNewServer(ctx, "", dispatcher); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| String firstMessage = "First fortune"; |
| client.add(ctxT, firstMessage); |
| assertEquals(firstMessage, client.get(ctxT)); |
| } |
| |
| public void testCustomDispatcherReturningAnInvoker() throws Exception { |
| Dispatcher dispatcher = new Dispatcher() { |
| @Override |
| public ServiceObjectWithAuthorizer lookup(String suffix) throws VException { |
| return new ServiceObjectWithAuthorizer(new TestInvoker(), null); |
| } |
| }; |
| ctx = V.withNewServer(ctx, "", dispatcher); |
| |
| FortuneClient client = FortuneClientFactory.getFortuneClient(name()); |
| VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s |
| assertThat(client.get(ctxT)).isEqualTo(TEST_INVOKER_FORTUNE); |
| } |
| |
| private static class TestInvoker implements Invoker { |
| @Override |
| public Object[] invoke(VContext ctx, StreamServerCall call, String method, Object[] args) |
| throws VException { |
| if (call.security() == null) { |
| throw new VException("Expected call.security() to return non-null"); |
| } |
| if (call.remoteEndpoint() == null) { |
| throw new VException("Expected remoteEndpoint() to return non-null"); |
| } |
| if (method.equals("get")) { |
| return new Object[] { TEST_INVOKER_FORTUNE }; |
| } |
| throw new VException("Unsupported method: " + method); |
| } |
| @Override |
| public Interface[] getSignature(VContext ctx, ServerCall call) throws VException { |
| throw new VException("getSignature() unimplemented"); |
| } |
| @Override |
| public Method getMethodSignature(VContext ctx, ServerCall call, String method) |
| throws VException { |
| throw new VException("getMethodSignature() unimplemented"); |
| } |
| @Override |
| public Type[] getArgumentTypes(String method) throws VException { |
| if (method.equals("get")) { |
| return new Type[] {}; |
| } |
| throw new VException("Unsupported method: " + method); |
| } |
| @Override |
| public Type[] getResultTypes(String method) throws VException { |
| if (method.equals("get")) { |
| return new Type[] { String.class }; |
| } |
| throw new VException("Unsupported method: " + method); |
| } |
| @Override |
| public VdlValue[] getMethodTags(String method) throws VException { |
| return new VdlValue[] {}; |
| } |
| @Override |
| public void glob(ServerCall call, String pattern, OutputChannel<GlobReply> responseChannel) |
| throws VException { |
| responseChannel.close(); |
| } |
| } |
| } |