blob: 4a0867644c19a29fc063a526213f65dea0ec00b9 [file] [log] [blame]
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package xproxyd_test
6
7import (
8 "bufio"
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -07009 "fmt"
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -070010 "math/rand"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070011 "strings"
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -070012 "sync"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070013 "testing"
Suharsh Sivakumar21373dd2015-09-15 16:31:00 -070014 "time"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070015
Suharsh Sivakumar0d917c42015-09-25 16:06:19 -070016 "v.io/x/ref"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070017 _ "v.io/x/ref/runtime/factories/generic"
18 "v.io/x/ref/services/xproxyd"
Suharsh Sivakumar21373dd2015-09-15 16:31:00 -070019 "v.io/x/ref/test/goroutines"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070020
21 "v.io/v23"
22 "v.io/v23/context"
23 "v.io/v23/flow"
24 "v.io/v23/naming"
25 "v.io/v23/rpc"
26 "v.io/v23/security"
27)
28
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -070029const (
Suharsh Sivakumarb73dd632015-09-18 13:07:37 -070030 leakWaitTime = 250 * time.Millisecond
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -070031 pollTime = 50 * time.Millisecond
32)
Suharsh Sivakumar21373dd2015-09-15 16:31:00 -070033
Suharsh Sivakumar63dab492015-09-24 18:01:31 -070034type testService struct{}
35
36func (t *testService) Echo(ctx *context.T, call rpc.ServerCall, arg string) (string, error) {
37 return "response:" + arg, nil
38}
39
40func TestProxyRPC(t *testing.T) {
Suharsh Sivakumar0d917c42015-09-25 16:06:19 -070041 if ref.RPCTransitionState() != ref.XServers {
Suharsh Sivakumar63dab492015-09-24 18:01:31 -070042 t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'")
43 }
44 defer goroutines.NoLeaks(t, leakWaitTime)()
45 ctx, shutdown := v23.Init()
46 defer shutdown()
47
48 // Start the proxy.
49 pep := startProxy(t, ctx, address{"tcp", "127.0.0.1:0"})
50
51 // Start the server listening through the proxy.
52 ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pep.Name()})
53 _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil)
54 if err != nil {
55 t.Fatal(err)
56 }
57 // Wait for the server to finish listening through the proxy.
58 eps := s.Status().Endpoints
59 for ; len(eps) < 2 || eps[1].Addr().Network() == ""; eps = s.Status().Endpoints {
60 time.Sleep(pollTime)
61 }
62
63 var got string
64 if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
65 t.Fatal(err)
66 }
67 if want := "response:hello"; got != want {
68 t.Errorf("got %v, want %v", got, want)
69 }
70}
71
72func TestMultipleProxyRPC(t *testing.T) {
Suharsh Sivakumar0d917c42015-09-25 16:06:19 -070073 if ref.RPCTransitionState() != ref.XServers {
Suharsh Sivakumar63dab492015-09-24 18:01:31 -070074 t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'")
75 }
76 defer goroutines.NoLeaks(t, leakWaitTime)()
77 kp := newKillProtocol()
78 flow.RegisterProtocol("kill", kp)
79 ctx, shutdown := v23.Init()
80 defer shutdown()
81
82 // Start the proxies.
83 pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
84 p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
85
86 // Start the server listening through the proxy.
87 ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: p2ep.Name()})
88 _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil)
89 if err != nil {
90 t.Fatal(err)
91 }
92 // Create a new flow manager for the client.
93 cctx, _, err := v23.ExperimentalWithNewFlowManager(ctx)
94 if err != nil {
95 t.Fatal(err)
96 }
97 // Wait for the server to finish listening through the proxy.
98 eps := s.Status().Endpoints
99 for ; len(eps) == 0 || eps[0].Addr().Network() == ""; eps = s.Status().Endpoints {
100 time.Sleep(pollTime)
101 }
102
103 var got string
104 if err := v23.GetClient(cctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
105 t.Fatal(err)
106 }
107 if want := "response:hello"; got != want {
108 t.Errorf("got %v, want %v", got, want)
109 }
110}
111
112// TODO(suharshs): Remove the below tests when the transition is complete.
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700113func TestSingleProxy(t *testing.T) {
Suharsh Sivakumar21373dd2015-09-15 16:31:00 -0700114 defer goroutines.NoLeaks(t, leakWaitTime)()
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700115 kp := newKillProtocol()
116 flow.RegisterProtocol("kill", kp)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700117 pctx, shutdown := v23.Init()
118 defer shutdown()
119 actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
120 if err != nil {
121 t.Fatal(err)
122 }
123 dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
124 if err != nil {
125 t.Fatal(err)
126 }
127
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700128 pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700129
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700130 done := make(chan struct{})
131 update := func(eps []naming.Endpoint) {
132 if len(eps) > 0 {
133 if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[0]); err != nil {
134 t.Error(err)
135 }
136 close(done)
137 }
138 }
139
140 if err := am.ProxyListen(actx, pep, update); err != nil {
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700141 t.Fatal(err)
142 }
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700143 <-done
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700144}
145
146func TestMultipleProxies(t *testing.T) {
Suharsh Sivakumar21373dd2015-09-15 16:31:00 -0700147 defer goroutines.NoLeaks(t, leakWaitTime)()
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700148 kp := newKillProtocol()
149 flow.RegisterProtocol("kill", kp)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700150 pctx, shutdown := v23.Init()
151 defer shutdown()
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700152 actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
153 if err != nil {
154 t.Fatal(err)
155 }
156 dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
157 if err != nil {
158 t.Fatal(err)
159 }
160
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700161 pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700162
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700163 p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700164
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700165 p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700166
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -0700167 done := make(chan struct{})
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700168 update := func(eps []naming.Endpoint) {
169 // TODO(suharshs): Fix this test once we have the proxy send update messages to the
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -0700170 // server when it reconnects to a proxy. This test only really tests the first connection
171 // currently because the connections are cached. So we need to kill connections and
172 // wait for them to reestablish but we need proxies to update communicate their new endpoints
173 // to each other and to the server. For now we at least check a random endpoint so the
174 // test will at least fail over many runs if something is wrong.
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700175 if len(eps) > 0 {
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -0700176 if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[rand.Int()%3]); err != nil {
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700177 t.Error(err)
178 }
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -0700179 close(done)
Suharsh Sivakumar63dab492015-09-24 18:01:31 -0700180 }
181 }
182
183 if err := am.ProxyListen(actx, p3ep, update); err != nil {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700184 t.Fatal(err)
185 }
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700186
Suharsh Sivakumara5ba0f82015-09-25 18:26:52 -0700187 <-done
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700188}
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700189
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700190func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700191 // The dialing flow.Manager dials a flow to the accepting flow.Manager.
192 want := "Do you read me?"
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700193 df, err := dm.Dial(dctx, aep, bfp)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700194 if err != nil {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700195 return err
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700196 }
197 // We write before accepting to ensure that the openFlow message is sent.
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700198 if err := writeLine(df, want); err != nil {
199 return err
200 }
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700201 af, err := am.Accept(actx)
202 if err != nil {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700203 return err
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700204 }
205 got, err := readLine(af)
206 if err != nil {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700207 return err
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700208 }
209 if got != want {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700210 return fmt.Errorf("got %v, want %v", got, want)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700211 }
212
213 // Writes in the opposite direction should work as well.
214 want = "I read you loud and clear."
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700215 if err := writeLine(af, want); err != nil {
216 return err
217 }
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700218 got, err = readLine(df)
219 if err != nil {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700220 return err
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700221 }
222 if got != want {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700223 return fmt.Errorf("got %v, want %v", got, want)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700224 }
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700225 return nil
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700226}
227
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700228// TODO(suharshs): Add test for bidirectional RPC.
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700229
230func readLine(f flow.Flow) (string, error) {
231 s, err := bufio.NewReader(f).ReadString('\n')
232 return strings.TrimRight(s, "\n"), err
233}
234
235func writeLine(f flow.Flow, data string) error {
236 data += "\n"
237 _, err := f.Write([]byte(data))
238 return err
239}
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700240
241func bfp(
242 ctx *context.T,
243 localEndpoint, remoteEndpoint naming.Endpoint,
244 remoteBlessings security.Blessings,
245 remoteDischarges map[string]security.Discharge,
246) (security.Blessings, map[string]security.Discharge, error) {
247 return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
248}
249
250type address struct {
251 Protocol, Address string
252}
253
254func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
255 var ls rpc.ListenSpec
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700256 hasProxies := false
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700257 for _, addr := range addrs {
258 ls.Addrs = append(ls.Addrs, addr)
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700259 if addr.Protocol == "v23" {
260 hasProxies = true
261 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700262 }
263 ctx = v23.WithListenSpec(ctx, ls)
Suharsh Sivakumare0feb272015-09-14 18:03:48 -0700264 proxy, _, err := xproxyd.New(ctx)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700265 if err != nil {
266 t.Fatal(err)
267 }
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700268 // Wait for the proxy to connect to its proxies.
269 if hasProxies {
270 for len(proxy.MultipleProxyEndpoints()) == 0 {
271 time.Sleep(pollTime)
272 }
273 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700274 peps := proxy.ListeningEndpoints()
275 for _, pep := range peps {
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700276 if pep.Addr().Network() == "tcp" || pep.Addr().Network() == "kill" {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700277 return pep
278 }
279 }
280 t.Fatal("Proxy not listening on network address.")
281 return nil
282}
Suharsh Sivakumarf82e2b42015-09-16 19:04:53 -0700283
284type killProtocol struct {
285 protocol flow.Protocol
286 mu sync.Mutex
287 conns []flow.Conn
288}
289
290func newKillProtocol() *killProtocol {
291 p, _ := flow.RegisteredProtocol("tcp")
292 return &killProtocol{protocol: p}
293}
294
295func (p *killProtocol) KillConnections() {
296 p.mu.Lock()
297 for _, c := range p.conns {
298 c.Close()
299 }
300 p.conns = nil
301 p.mu.Unlock()
302}
303
304func (p *killProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
305 c, err := p.protocol.Dial(ctx, "tcp", address, timeout)
306 if err != nil {
307 return nil, err
308 }
309 p.mu.Lock()
310 p.conns = append(p.conns, c)
311 p.mu.Unlock()
312 return c, nil
313}
314
315func (p *killProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
316 return p.protocol.Listen(ctx, "tcp", address)
317}
318
319func (p *killProtocol) Resolve(ctx *context.T, protocol, address string) (string, string, error) {
320 return p.protocol.Resolve(ctx, "tcp", address)
321}