| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package xproxyd_test |
| |
| import ( |
| "bufio" |
| "fmt" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "v.io/x/ref" |
| _ "v.io/x/ref/runtime/factories/generic" |
| "v.io/x/ref/services/xproxyd" |
| "v.io/x/ref/test/goroutines" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/flow" |
| "v.io/v23/naming" |
| "v.io/v23/rpc" |
| "v.io/v23/security" |
| ) |
| |
const (
	// leakWaitTime is how long goroutines.NoLeaks waits for goroutines to
	// exit before reporting a leak.
	leakWaitTime = 250 * time.Millisecond
	// pollTime is the interval between polls while waiting for proxies and
	// servers to finish setting up.
	pollTime = 50 * time.Millisecond
)
| |
| type testService struct{} |
| |
| func (t *testService) Echo(ctx *context.T, call rpc.ServerCall, arg string) (string, error) { |
| return "response:" + arg, nil |
| } |
| |
| func TestProxyRPC(t *testing.T) { |
| if ref.RPCTransitionState() != ref.XServers { |
| t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'") |
| } |
| defer goroutines.NoLeaks(t, leakWaitTime)() |
| ctx, shutdown := v23.Init() |
| defer shutdown() |
| |
| // Start the proxy. |
| pep := startProxy(t, ctx, address{"tcp", "127.0.0.1:0"}) |
| |
| // Start the server listening through the proxy. |
| ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pep.Name()}) |
| _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Wait for the server to finish listening through the proxy. |
| eps := s.Status().Endpoints |
| for ; len(eps) < 2 || eps[1].Addr().Network() == ""; eps = s.Status().Endpoints { |
| time.Sleep(pollTime) |
| } |
| |
| var got string |
| if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil { |
| t.Fatal(err) |
| } |
| if want := "response:hello"; got != want { |
| t.Errorf("got %v, want %v", got, want) |
| } |
| } |
| |
| func TestMultipleProxyRPC(t *testing.T) { |
| if ref.RPCTransitionState() != ref.XServers { |
| t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'") |
| } |
| defer goroutines.NoLeaks(t, leakWaitTime)() |
| kp := newKillProtocol() |
| flow.RegisterProtocol("kill", kp) |
| ctx, shutdown := v23.Init() |
| defer shutdown() |
| |
| // Start the proxies. |
| pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"}) |
| p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"}) |
| |
| // Start the server listening through the proxy. |
| ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: p2ep.Name()}) |
| _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Create a new flow manager for the client. |
| cctx, _, err := v23.ExperimentalWithNewFlowManager(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Wait for the server to finish listening through the proxy. |
| eps := s.Status().Endpoints |
| for ; len(eps) == 0 || eps[0].Addr().Network() == ""; eps = s.Status().Endpoints { |
| time.Sleep(pollTime) |
| } |
| |
| var got string |
| if err := v23.GetClient(cctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil { |
| t.Fatal(err) |
| } |
| if want := "response:hello"; got != want { |
| t.Errorf("got %v, want %v", got, want) |
| } |
| } |
| |
| // TODO(suharshs): Remove the below tests when the transition is complete. |
| func TestSingleProxy(t *testing.T) { |
| defer goroutines.NoLeaks(t, leakWaitTime)() |
| kp := newKillProtocol() |
| flow.RegisterProtocol("kill", kp) |
| pctx, shutdown := v23.Init() |
| defer shutdown() |
| actx, am, err := v23.ExperimentalWithNewFlowManager(pctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"}) |
| |
| done := make(chan struct{}) |
| update := func(eps []naming.Endpoint) { |
| if len(eps) > 0 { |
| if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[0]); err != nil { |
| t.Error(err) |
| } |
| close(done) |
| } |
| } |
| |
| if err := am.ProxyListen(actx, pep, update); err != nil { |
| t.Fatal(err) |
| } |
| <-done |
| } |
| |
// TestMultipleProxies verifies that a flow manager can listen through a
// chain of three proxies and that end-to-end connections can be
// re-established after the underlying "kill" connections are severed.
func TestMultipleProxies(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	kp := newKillProtocol()
	flow.RegisterProtocol("kill", kp)
	pctx, shutdown := v23.Init()
	defer shutdown()
	// Accepting-side context and flow manager.
	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
	if err != nil {
		t.Fatal(err)
	}
	// Dialing-side context and flow manager.
	dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
	if err != nil {
		t.Fatal(err)
	}

	// Three chained proxies: p3 -> p2 -> p.
	pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})

	p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})

	p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})

	ch := make(chan struct{})
	var allEps []naming.Endpoint
	idx := 0
	update := func(eps []naming.Endpoint) {
		// TODO(suharshs): Fix this test once we have the proxy send update messages to the
		// server when it reconnects to a proxy.
		if len(eps) == 3 {
			allEps = eps
		}
		// NOTE(review): if this callback ever fires with 0 < len(eps) < 3
		// before a full 3-endpoint update has been seen, allEps is still
		// nil/short and allEps[idx] would panic — the test relies on the
		// first non-empty update carrying all three endpoints; confirm.
		if len(eps) > 0 {
			if err := testEndToEndConnection(t, dctx, actx, dm, am, allEps[idx]); err != nil {
				t.Error(err)
			}
			// Advance to the next endpoint and signal the main goroutine.
			idx++
			ch <- struct{}{}
		}
	}

	if err := am.ProxyListen(actx, p3ep, update); err != nil {
		t.Fatal(err)
	}

	<-ch
	// Test the other two endpoints.
	for i := 0; i < 2; i++ {
		// Kill the connections to test reconnection.
		kp.KillConnections()
		<-ch
	}
}
| |
| func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error { |
| // The dialing flow.Manager dials a flow to the accepting flow.Manager. |
| want := "Do you read me?" |
| df, err := dm.Dial(dctx, aep, bfp) |
| if err != nil { |
| return err |
| } |
| // We write before accepting to ensure that the openFlow message is sent. |
| if err := writeLine(df, want); err != nil { |
| return err |
| } |
| af, err := am.Accept(actx) |
| if err != nil { |
| return err |
| } |
| got, err := readLine(af) |
| if err != nil { |
| return err |
| } |
| if got != want { |
| return fmt.Errorf("got %v, want %v", got, want) |
| } |
| |
| // Writes in the opposite direction should work as well. |
| want = "I read you loud and clear." |
| if err := writeLine(af, want); err != nil { |
| return err |
| } |
| got, err = readLine(df) |
| if err != nil { |
| return err |
| } |
| if got != want { |
| return fmt.Errorf("got %v, want %v", got, want) |
| } |
| return nil |
| } |
| |
| // TODO(suharshs): Add test for bidirectional RPC. |
| |
| func readLine(f flow.Flow) (string, error) { |
| s, err := bufio.NewReader(f).ReadString('\n') |
| return strings.TrimRight(s, "\n"), err |
| } |
| |
| func writeLine(f flow.Flow, data string) error { |
| data += "\n" |
| _, err := f.Write([]byte(data)) |
| return err |
| } |
| |
// bfp is a blessings-for-peer callback passed to flow.Manager.Dial. It
// presents the principal's default blessings to every peer, returns no
// discharges, and ignores all information about the remote end.
func bfp(
	ctx *context.T,
	localEndpoint, remoteEndpoint naming.Endpoint,
	remoteBlessings security.Blessings,
	remoteDischarges map[string]security.Discharge,
) (security.Blessings, map[string]security.Discharge, error) {
	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
| |
// address pairs a protocol name (e.g. "tcp", "kill", "v23") with a
// protocol-specific address string; values are appended to
// rpc.ListenSpec.Addrs when starting a proxy.
type address struct {
	Protocol, Address string
}
| |
| func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint { |
| var ls rpc.ListenSpec |
| hasProxies := false |
| for _, addr := range addrs { |
| ls.Addrs = append(ls.Addrs, addr) |
| if addr.Protocol == "v23" { |
| hasProxies = true |
| } |
| } |
| ctx = v23.WithListenSpec(ctx, ls) |
| proxy, _, err := xproxyd.New(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Wait for the proxy to connect to its proxies. |
| if hasProxies { |
| for len(proxy.MultipleProxyEndpoints()) == 0 { |
| time.Sleep(pollTime) |
| } |
| } |
| peps := proxy.ListeningEndpoints() |
| for _, pep := range peps { |
| if pep.Addr().Network() == "tcp" || pep.Addr().Network() == "kill" { |
| return pep |
| } |
| } |
| t.Fatal("Proxy not listening on network address.") |
| return nil |
| } |
| |
// killProtocol wraps another flow.Protocol (the registered "tcp" protocol
// in these tests) and records every connection it dials so tests can sever
// them all at once via KillConnections, simulating network failures.
type killProtocol struct {
	protocol flow.Protocol
	// mu guards conns.
	mu sync.Mutex
	conns []flow.Conn
}
| |
// newKillProtocol returns a killProtocol layered over the registered "tcp"
// protocol.
// NOTE(review): the second return value of RegisteredProtocol is ignored;
// presumably "tcp" is always registered — if not, protocol would be its
// zero value and fail on first use. Confirm against the flow package.
func newKillProtocol() *killProtocol {
	p, _ := flow.RegisteredProtocol("tcp")
	return &killProtocol{protocol: p}
}
| |
| func (p *killProtocol) KillConnections() { |
| p.mu.Lock() |
| for _, c := range p.conns { |
| c.Close() |
| } |
| p.conns = nil |
| p.mu.Unlock() |
| } |
| |
| func (p *killProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) { |
| c, err := p.protocol.Dial(ctx, "tcp", address, timeout) |
| if err != nil { |
| return nil, err |
| } |
| p.mu.Lock() |
| p.conns = append(p.conns, c) |
| p.mu.Unlock() |
| return c, nil |
| } |
| |
| func (p *killProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) { |
| return p.protocol.Listen(ctx, "tcp", address) |
| } |
| |
| func (p *killProtocol) Resolve(ctx *context.T, protocol, address string) (string, string, error) { |
| return p.protocol.Resolve(ctx, "tcp", address) |
| } |