| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package proxy_test |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "reflect" |
| "strings" |
| "testing" |
| "time" |
| |
| "v.io/x/lib/vlog" |
| |
| "v.io/v23" |
| "v.io/v23/naming" |
| "v.io/v23/verror" |
| |
| _ "v.io/x/ref/profiles" |
| inaming "v.io/x/ref/profiles/internal/naming" |
| "v.io/x/ref/profiles/internal/rpc/stream" |
| "v.io/x/ref/profiles/internal/rpc/stream/manager" |
| "v.io/x/ref/profiles/internal/rpc/stream/proxy" |
| "v.io/x/ref/profiles/internal/rpc/stream/vc" |
| "v.io/x/ref/profiles/internal/rpc/stream/vif" |
| "v.io/x/ref/test" |
| "v.io/x/ref/test/testutil" |
| ) |
| |
| //go:generate v23 test generate |
| |
| func TestProxy(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| pproxy := testutil.NewPrincipal("proxy") |
| |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| principal := testutil.NewPrincipal("test") |
| blessings := principal.BlessingStore().Default() |
| |
| vlog.Infof("PROXYEP: %s", proxyEp) |
| |
| // Create the stream.Manager for the server. |
| server1 := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111)) |
| defer server1.Shutdown() |
| // Setup a stream.Listener that will accept VCs and Flows routed |
| // through the proxy. |
| ln1, ep1, err := server1.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings) |
| if err != nil { |
| t.Logf(verror.DebugString(err)) |
| t.Fatal(err) |
| } |
| defer ln1.Close() |
| |
| // Create the stream.Manager for a second server. |
| server2 := manager.InternalNew(naming.FixedRoutingID(0x2222222222222222)) |
| defer server2.Shutdown() |
| // Setup a stream.Listener that will accept VCs and Flows routed |
| // through the proxy. |
| ln2, ep2, err := server2.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer ln2.Close() |
| |
| // Create the stream.Manager for a client. |
| client := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc)) |
| defer client.Shutdown() |
| |
| cases := []struct { |
| client stream.Manager |
| ln stream.Listener |
| ep naming.Endpoint |
| }{ |
| {client, ln1, ep1}, // client writing to server1 |
| {server1, ln2, ep2}, // server1 writing to server2 |
| {server1, ln1, ep1}, // server1 writing to itself |
| } |
| |
| const written = "the dough rises" |
| for i, c := range cases { |
| name := fmt.Sprintf("case #%d(write to %v):", i, c.ep) |
| // Accept a single flow and write out what is read to readChan |
| readChan := make(chan string) |
| go readFlow(t, c.ln, readChan) |
| if err := writeFlow(c.client, c.ep, written); err != nil { |
| t.Errorf("%s: %v", name, err) |
| continue |
| } |
| // Validate that the data read is the same as the data written. |
| if read := <-readChan; read != written { |
| t.Errorf("case #%d: Read %q, wrote %q", i, read, written) |
| } |
| } |
| } |
| |
| func TestDuplicateRoutingID(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| pproxy := testutil.NewPrincipal("proxy") |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| |
| // Create the stream.Manager for server1 and server2, both with the same routing ID |
| serverRID := naming.FixedRoutingID(0x5555555555555555) |
| server1 := manager.InternalNew(serverRID) |
| server2 := manager.InternalNew(serverRID) |
| defer server1.Shutdown() |
| defer server2.Shutdown() |
| |
| principal := testutil.NewPrincipal("test") |
| blessings := principal.BlessingStore().Default() |
| |
| // First server to claim serverRID should win. |
| ln1, ep1, err := server1.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer ln1.Close() |
| |
| ln2, ep2, err := server2.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings) |
| if pattern := "routing id 00000000000000005555555555555555 is already being proxied"; err == nil || !strings.Contains(err.Error(), pattern) { |
| t.Errorf("Got (%v, %v, %v) want error \"...%v\" (ep1:%v)", ln2, ep2, err, pattern, ep1) |
| } |
| } |
| |
| func TestProxyAuthentication(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| pproxy := testutil.NewPrincipal("proxy") |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| if got, want := proxyEp.BlessingNames(), []string{"proxy"}; !reflect.DeepEqual(got, want) { |
| t.Errorf("Proxy endpoint blessing names: got %v, want %v", got, want) |
| } |
| |
| other := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc)) |
| defer other.Shutdown() |
| |
| vc, err := other.Dial(proxyEp, testutil.NewPrincipal("other")) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| flow, err := vc.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| if got, want := flow.RemoteBlessings(), pproxy.BlessingStore().Default(); !reflect.DeepEqual(got, want) { |
| t.Errorf("Proxy authenticated as [%v], want [%v]", got, want) |
| } |
| } |
| |
| func TestServerBlessings(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| var ( |
| pproxy = testutil.NewPrincipal("proxy") |
| pserver = testutil.NewPrincipal("server") |
| pclient = testutil.NewPrincipal("client") |
| ) |
| |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| if got, want := proxyEp.BlessingNames(), []string{"proxy"}; !reflect.DeepEqual(got, want) { |
| t.Errorf("Proxy endpoint blessing names: got %v, want %v", got, want) |
| } |
| |
| server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555)) |
| defer server.Shutdown() |
| |
| ln, ep, err := server.Listen(proxyEp.Network(), proxyEp.String(), pserver, pserver.BlessingStore().Default()) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if got, want := ep.BlessingNames(), []string{"server"}; !reflect.DeepEqual(got, want) { |
| t.Errorf("Server endpoint %q: Got BlessingNames %v, want %v", ep, got, want) |
| } |
| defer ln.Close() |
| go func() { |
| for { |
| if _, err := ln.Accept(); err != nil { |
| return |
| } |
| } |
| }() |
| |
| client := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc)) |
| defer client.Shutdown() |
| vc, err := client.Dial(ep, pclient) |
| if err != nil { |
| t.Fatal(err) |
| } |
| flow, err := vc.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| if got, want := flow.RemoteBlessings(), pserver.BlessingStore().Default(); !reflect.DeepEqual(got, want) { |
| t.Errorf("Got [%v] want [%v]", got, want) |
| } |
| } |
| |
| func TestHostPort(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| pproxy := testutil.NewPrincipal("proxy") |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555)) |
| defer server.Shutdown() |
| addr := proxyEp.Addr().String() |
| port := addr[strings.LastIndex(addr, ":"):] |
| principal := testutil.NewPrincipal("test") |
| blessings := principal.BlessingStore().Default() |
| ln, _, err := server.Listen(inaming.Network, "127.0.0.1"+port, principal, blessings) |
| if err != nil { |
| t.Fatal(err) |
| } |
| ln.Close() |
| } |
| |
| func TestClientBecomesServer(t *testing.T) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| pproxy := testutil.NewPrincipal("proxy") |
| _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555)) |
| client1 := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111)) |
| client2 := manager.InternalNew(naming.FixedRoutingID(0x2222222222222222)) |
| defer shutdown() |
| defer server.Shutdown() |
| defer client1.Shutdown() |
| defer client2.Shutdown() |
| |
| principal := testutil.NewPrincipal("test") |
| blessings := principal.BlessingStore().Default() |
| lnS, epS, err := server.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer lnS.Close() |
| rchan := make(chan string) |
| |
| pclient1 := testutil.NewPrincipal("client1") |
| |
| // client1 must connect to the proxy to speak to the server. |
| // Keep a VC and Flow open to the server, to ensure that the proxy |
| // maintains routing information (at some point, inactive VIFs |
| // should be garbage collected, so this ensures that the VIF |
| // is "active") |
| if vc, err := client1.Dial(epS, pclient1); err != nil { |
| t.Fatal(err) |
| } else if flow, err := vc.Connect(); err != nil { |
| t.Fatal(err) |
| } else { |
| defer flow.Close() |
| } |
| |
| // Now client1 becomes a server |
| lnC, epC, err := client1.Listen(proxyEp.Network(), proxyEp.String(), pclient1, pclient1.BlessingStore().Default()) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer lnC.Close() |
| // client2 should be able to talk to client1 through the proxy |
| rchan = make(chan string) |
| go readFlow(t, lnC, rchan) |
| if err := writeFlow(client2, epC, "daffy duck"); err != nil { |
| t.Fatal("client2 failed to chat with client1: %v", err) |
| } |
| if got, want := <-rchan, "daffy duck"; got != want { |
| t.Fatal("client2->client1 got %q want %q", got, want) |
| } |
| } |
| |
| func testProxyIdleTimeout(t *testing.T, testServer bool) { |
| ctx, shutdown := test.InitForTest() |
| defer shutdown() |
| |
| const ( |
| idleTime = 10 * time.Millisecond |
| // We use a long wait time here since it takes some time to handle VC close |
| // especially in race testing. |
| waitTime = 150 * time.Millisecond |
| ) |
| |
| var ( |
| pproxy = testutil.NewPrincipal("proxy") |
| pserver = testutil.NewPrincipal("server") |
| pclient = testutil.NewPrincipal("client") |
| |
| opts []stream.VCOpt |
| lopts []stream.ListenerOpt |
| ) |
| if testServer { |
| lopts = []stream.ListenerOpt{vc.IdleTimeout{idleTime}} |
| } else { |
| opts = []stream.VCOpt{vc.IdleTimeout{idleTime}} |
| } |
| |
| // Pause the idle timers. |
| triggerTimers := vif.SetFakeTimers() |
| |
| Proxy, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer shutdown() |
| |
| // Create the stream.Manager for the server. |
| server := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111)) |
| defer server.Shutdown() |
| // Setup a stream.Listener that will accept VCs and Flows routed |
| // through the proxy. |
| ln, ep, err := server.Listen(proxyEp.Network(), proxyEp.String(), pserver, pserver.BlessingStore().Default(), lopts...) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer ln.Close() |
| go func() { |
| for { |
| if _, err := ln.Accept(); err != nil { |
| return |
| } |
| } |
| }() |
| |
| // Create the stream.Manager for a client. |
| client := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc)) |
| defer client.Shutdown() |
| |
| // Open a VC and a Flow. |
| VC, err := client.Dial(ep, pclient, opts...) |
| if err != nil { |
| t.Fatal(err) |
| } |
| flow, err := VC.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // Trigger the idle timers. |
| triggerTimers() |
| |
| if numProcs := proxy.NumProcesses(Proxy); numProcs != 2 { |
| // There should be two processes at this point. |
| t.Fatal(fmt.Errorf("Unexpected number of processes: %d\n", numProcs)) |
| } |
| |
| // There is one active flow. The VC should be kept open. |
| time.Sleep(waitTime) |
| if numProcs := proxy.NumProcesses(Proxy); numProcs != 2 { |
| t.Errorf("Want VC is kept open; closed") |
| } |
| |
| flow.Close() |
| |
| // The flow has been closed. The VC should be closed after idle timeout. |
| timeout := time.After(waitTime) |
| for done := false; !done; { |
| select { |
| case <-time.After(idleTime * 2): |
| done = proxy.NumProcesses(Proxy) == 1 |
| case <-timeout: |
| done = true |
| } |
| } |
| if numProcs := proxy.NumProcesses(Proxy); numProcs != 1 { |
| t.Error("Want VC has been closed; still open") |
| } |
| |
| client.ShutdownEndpoint(ep) |
| |
| // Even when the idle timeout is set for VC in server, we still should be |
| // able to dial to the server through the proxy, since one VC between the |
| // server and the proxy should be kept alive as the proxy protocol. |
| // |
| // We use fake timers here again to avoid idle timeout during dialing. |
| defer vif.SetFakeTimers()() |
| if _, err := client.Dial(ep, pclient, opts...); err != nil { |
| t.Errorf("Want to dial to the server; can't dial: %v", err) |
| } |
| } |
| |
| func TestProxyIdleTimeout(t *testing.T) { testProxyIdleTimeout(t, false) } |
| func TestProxyIdleTimeoutServer(t *testing.T) { testProxyIdleTimeout(t, true) } |
| |
| func writeFlow(mgr stream.Manager, ep naming.Endpoint, data string) error { |
| vc, err := mgr.Dial(ep, testutil.NewPrincipal("test")) |
| if err != nil { |
| return fmt.Errorf("manager.Dial(%v) failed: %v", ep, err) |
| } |
| flow, err := vc.Connect() |
| if err != nil { |
| return fmt.Errorf("vc.Connect failed: %v", err) |
| } |
| defer flow.Close() |
| if _, err := flow.Write([]byte(data)); err != nil { |
| return fmt.Errorf("flow.Write failed: %v", err) |
| } |
| return nil |
| } |
| |
| func readFlow(t *testing.T, ln stream.Listener, read chan<- string) { |
| defer close(read) |
| flow, err := ln.Accept() |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| var tmp [1024]byte |
| var buf bytes.Buffer |
| for { |
| n, err := flow.Read(tmp[:]) |
| if err == io.EOF { |
| read <- buf.String() |
| return |
| } |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| buf.Write(tmp[:n]) |
| } |
| } |