blob: 932b8f5848d7dbdb66ab46fa395008b0daf9b25e [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package manager
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7 "net"
8 "reflect"
9 "runtime"
10 "strings"
11 "testing"
12
Jiri Simsa519c5072014-09-17 21:37:57 -070013 "veyron.io/veyron/veyron2/ipc/stream"
14 "veyron.io/veyron/veyron2/naming"
15 "veyron.io/veyron/veyron2/security"
16 "veyron.io/veyron/veyron2/vlog"
Cosmos Nicolaou5129fcb2014-08-22 11:52:25 -070017
Jiri Simsa519c5072014-09-17 21:37:57 -070018 _ "veyron.io/veyron/veyron/lib/testutil"
19 "veyron.io/veyron/veyron/lib/testutil/blackbox"
20 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
21 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
22 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
23 isecurity "veyron.io/veyron/veyron/runtimes/google/security"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070024)
25
Ankura3c97652014-07-17 20:01:21 -070026func newID(name string) security.PrivateID {
Srdjan Petrovic4b0dfa02014-07-25 14:13:56 -070027 id, err := isecurity.NewPrivateID(name, nil)
Ankura3c97652014-07-17 20:01:21 -070028 if err != nil {
29 panic(err)
30 }
31 return id
32}
33
Jiri Simsa5293dcb2014-05-10 09:56:38 -070034func init() {
35 // The testutil package's init sets GOMAXPROCS to NumCPU. We want to
36 // force GOMAXPROCS to remain at 1, in order to trigger a particular
37 // race condition tht occurs when closing the server; also, using 1 cpu
38 // introduces less variance in the behavior of the test.
39 runtime.GOMAXPROCS(1)
40
41 blackbox.CommandTable["runServer"] = runServer
42}
43
44func TestSimpleFlow(t *testing.T) {
45 server := InternalNew(naming.FixedRoutingID(0x55555555))
46 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
47
Adam Sadovsky5181bdb2014-08-13 10:29:11 -070048 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070049 if err != nil {
50 t.Fatal(err)
51 }
52
53 data := "the dark knight rises"
54 var clientVC stream.VC
55 var clientF1 stream.Flow
56 go func() {
57 if clientVC, err = client.Dial(ep); err != nil {
58 t.Errorf("Dial(%q) failed: %v", ep, err)
59 return
60 }
61 if clientF1, err = clientVC.Connect(); err != nil {
62 t.Errorf("Connect() failed: %v", err)
63 return
64 }
65 if err := writeLine(clientF1, data); err != nil {
66 t.Error(err)
67 }
68 }()
69 serverF, err := ln.Accept()
70 if err != nil {
71 t.Fatalf("Accept failed: %v", err)
72 }
73 if got, err := readLine(serverF); got != data || err != nil {
74 t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data)
75 }
76 // By this point, the goroutine has passed the write call (or exited
77 // early) since the read has gotten through. Check if the goroutine
78 // encountered any errors in creating the VC or flow and abort.
79 if t.Failed() {
80 return
81 }
82 defer clientF1.Close()
83
84 ln.Close()
85
86 // Writes on flows opened before the server listener was closed should
87 // still succeed.
88 data = "the dark knight goes to bed"
89 go func() {
90 if err := writeLine(clientF1, data); err != nil {
91 t.Error(err)
92 }
93 }()
94 if got, err := readLine(serverF); got != data || err != nil {
95 t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data)
96 }
97
98 // Opening a new flow on an existing VC will succeed initially, but
99 // writes on the client end will eventually fail once the server has
100 // stopped listening.
101 //
102 // It will require a round-trip to the server to notice the failure,
103 // hence the client should write enough data to ensure that the Write
104 // call will not return before a round-trip.
105 //
106 // The length of the data is taken to exceed the queue buffer size
107 // (DefaultBytesBufferedPerFlow), the shared counters (MaxSharedBytes)
108 // and the per-flow counters (DefaultBytesBufferedPerFlow) that are
109 // given when the flow gets established.
110 //
111 // TODO(caprita): separate the constants for the queue buffer size and
112 // the default number of counters to avoid confusion.
113 lotsOfData := string(make([]byte, vc.DefaultBytesBufferedPerFlow*2+vc.MaxSharedBytes+1))
114 clientF2, err := clientVC.Connect()
115 if err != nil {
116 t.Fatalf("Connect() failed: %v", err)
117 }
118 defer clientF2.Close()
119 if err := writeLine(clientF2, lotsOfData); err == nil {
120 t.Errorf("Should not be able to Dial or Write after the Listener is closed")
121 }
122 // Opening a new VC should fail fast.
123 if _, err := client.Dial(ep); err == nil {
124 t.Errorf("Should not be able to Dial after listener is closed")
125 }
126}
127
128func TestAuthenticatedByDefault(t *testing.T) {
129 server := InternalNew(naming.FixedRoutingID(0x55555555))
130 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
131
Ankura3c97652014-07-17 20:01:21 -0700132 clientID := newID("client")
133 serverID := newID("server")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700134 // VCSecurityLevel is intentionally not provided to Listen - to test
135 // default behavior.
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700136 ln, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(serverID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700137 if err != nil {
138 t.Fatal(err)
139 }
140
141 errs := make(chan error)
142
143 testIDs := func(tag string, flow stream.Flow, local, remote security.PrivateID) {
144 lID := flow.LocalID()
145 rID := flow.RemoteID()
146 if !reflect.DeepEqual(lID.Names(), local.PublicID().Names()) || !reflect.DeepEqual(rID.Names(), remote.PublicID().Names()) {
147 errs <- fmt.Errorf("%s: LocalID: Got %q want %q. RemoteID: Got %q, want %q", tag, lID, local, rID, remote)
148 return
149 }
150 errs <- nil
151 }
152
153 go func() {
154 flow, err := ln.Accept()
155 if err != nil {
156 errs <- err
157 return
158 }
159 defer flow.Close()
160 testIDs("server", flow, serverID, clientID)
161 }()
162
163 go func() {
164 // VCSecurityLevel is intentionally not provided to Dial - to
165 // test default behavior.
Ankura3c97652014-07-17 20:01:21 -0700166 vc, err := client.Dial(ep, vc.FixedLocalID(clientID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 if err != nil {
168 errs <- err
169 return
170 }
171 flow, err := vc.Connect()
172 if err != nil {
173 errs <- err
174 return
175 }
176 defer flow.Close()
177 testIDs("client", flow, clientID, serverID)
178 }()
179
180 if err := <-errs; err != nil {
181 t.Error(err)
182 }
183 if err := <-errs; err != nil {
184 t.Error(err)
185 }
186}
187
188func numListeners(m stream.Manager) int { return len(m.(*manager).listeners) }
189func debugString(m stream.Manager) string { return m.(*manager).DebugString() }
190func numVIFs(m stream.Manager) int { return len(m.(*manager).vifs.List()) }
191
192func TestListenEndpoints(t *testing.T) {
193 server := InternalNew(naming.FixedRoutingID(0xcafe))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700194 ln1, ep1, err1 := server.Listen("tcp", "127.0.0.1:0")
195 ln2, ep2, err2 := server.Listen("tcp", "127.0.0.1:0")
196 // Since "127.0.0.1:0" was used as the network address, a random port will be
197 // assigned in each case. The endpoint should include that random port.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700198 if err1 != nil {
199 t.Error(err1)
200 }
201 if err2 != nil {
202 t.Error(err2)
203 }
204 if ep1.String() == ep2.String() {
205 t.Errorf("Both listeners got the same endpoint: %q", ep1)
206 }
207 if n, expect := numListeners(server), 2; n != expect {
208 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
209 }
210 ln1.Close()
211 if n, expect := numListeners(server), 1; n != expect {
212 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
213 }
214 ln2.Close()
215 if n, expect := numListeners(server), 0; n != expect {
216 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
217 }
218}
219
220func acceptLoop(ln stream.Listener) {
221 for {
222 f, err := ln.Accept()
223 if err != nil {
224 return
225 }
226 f.Close()
227 }
228}
229
230func TestCloseListener(t *testing.T) {
231 server := InternalNew(naming.FixedRoutingID(0x5e97e9))
232
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700233 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700234 if err != nil {
235 t.Fatal(err)
236 }
237 // Server will just listen for flows and close them.
238 go acceptLoop(ln)
239 client := InternalNew(naming.FixedRoutingID(0xc1e41))
240 if _, err = client.Dial(ep); err != nil {
241 t.Fatal(err)
242 }
243 ln.Close()
244 client = InternalNew(naming.FixedRoutingID(0xc1e42))
245 if _, err := client.Dial(ep); err == nil {
246 t.Errorf("client.Dial(%q) should have failed", ep)
247 }
248}
249
250func TestShutdown(t *testing.T) {
251 server := InternalNew(naming.FixedRoutingID(0x5e97e9))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700252 ln, _, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700253 if err != nil {
254 t.Fatal(err)
255 }
256 // Server will just listen for flows and close them.
257 go acceptLoop(ln)
258 if n, expect := numListeners(server), 1; n != expect {
259 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
260 }
261 server.Shutdown()
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700262 if _, _, err := server.Listen("tcp", "127.0.0.1:0"); err == nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700263 t.Error("server should have shut down")
264 }
265 if n, expect := numListeners(server), 0; n != expect {
266 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
267 }
268}
269
270func TestShutdownEndpoint(t *testing.T) {
271 server := InternalNew(naming.FixedRoutingID(0x55555555))
272 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
273
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700274 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700275 if err != nil {
276 t.Fatal(err)
277 }
278
279 // Server will just listen for flows and close them.
280 go acceptLoop(ln)
281
282 vc, err := client.Dial(ep)
283 if err != nil {
284 t.Fatal(err)
285 }
286 if f, err := vc.Connect(); f == nil || err != nil {
287 t.Errorf("vc.Connect failed: (%v, %v)", f, err)
288 }
289 client.ShutdownEndpoint(ep)
290 if f, err := vc.Connect(); f != nil || err == nil {
291 t.Errorf("vc.Connect unexpectedly succeeded: (%v, %v)", f, err)
292 }
293}
294
Andres Erbsenffa45742014-08-13 10:13:11 -0700295/* TLS + resumption + channel bindings is broken: <https://secure-resumption.com/#channelbindings>.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700296func TestSessionTicketCache(t *testing.T) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700297 server := InternalNew(naming.FixedRoutingID(0x55555555))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700298 _, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700299 if err != nil {
300 t.Fatal(err)
301 }
302
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700303 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
Ankura3c97652014-07-17 20:01:21 -0700304 if _, err = client.Dial(ep, vc.FixedLocalID(newID("TestSessionTicketCacheClient"))); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700305 t.Fatalf("Dial(%q) failed: %v", ep, err)
306 }
307
308 if _, ok := client.(*manager).sessionCache.Get(ep.String()); !ok {
309 t.Fatalf("SessionTicket from TLS handshake not cached")
310 }
311}
Andres Erbsenffa45742014-08-13 10:13:11 -0700312*/
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700313
314func TestMultipleVCs(t *testing.T) {
315 server := InternalNew(naming.FixedRoutingID(0x55555555))
316 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
317
318 const nVCs = 2
319 const data = "bugs bunny"
320
321 // Have the server read from each flow and write to rchan.
322 rchan := make(chan string)
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700323 ln, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700324 if err != nil {
325 t.Fatal(err)
326 }
327 read := func(flow stream.Flow, c chan string) {
328 var buf bytes.Buffer
329 var tmp [1024]byte
330 for {
331 n, err := flow.Read(tmp[:])
332 buf.Write(tmp[:n])
333 if err == io.EOF {
334 c <- buf.String()
335 return
336 }
337 if err != nil {
338 t.Error(err)
339 return
340 }
341 }
342 }
343 go func() {
344 for i := 0; i < nVCs; i++ {
345 flow, err := ln.Accept()
346 if err != nil {
347 t.Error(err)
348 rchan <- ""
349 continue
350 }
351 go read(flow, rchan)
352 }
353 }()
354
355 // Have the client establish nVCs and a flow on each.
356 var vcs [nVCs]stream.VC
357 for i := 0; i < nVCs; i++ {
358 var err error
Ankura3c97652014-07-17 20:01:21 -0700359 vcs[i], err = client.Dial(ep, vc.FixedLocalID(newID("client")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700360 if err != nil {
361 t.Fatal(err)
362 }
363 }
364 write := func(vc stream.VC) {
365 if err != nil {
366 ln.Close()
367 t.Error(err)
368 return
369 }
370 flow, err := vc.Connect()
371 if err != nil {
372 ln.Close()
373 t.Error(err)
374 return
375 }
376 defer flow.Close()
377 if _, err := flow.Write([]byte(data)); err != nil {
378 ln.Close()
379 t.Error(err)
380 return
381 }
382 }
383 for _, vc := range vcs {
384 go write(vc)
385 }
386 for i := 0; i < nVCs; i++ {
387 if got := <-rchan; got != data {
388 t.Errorf("Got %q want %q", got, data)
389 }
390 }
391}
392
393func TestAddressResolution(t *testing.T) {
394 server := InternalNew(naming.FixedRoutingID(0x55555555))
395 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
396
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700397 // Using "tcp4" instead of "tcp" because the latter can end up with IPv6
398 // addresses and our Google Compute Engine integration test machines cannot
399 // resolve IPv6 addresses.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700400 // As of April 2014, https://developers.google.com/compute/docs/networking
401 // said that IPv6 is not yet supported.
402 ln, ep, err := server.Listen("tcp4", "127.0.0.1:0")
403 if err != nil {
404 t.Fatal(err)
405 }
406 go acceptLoop(ln)
407
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700408 // We'd like an endpoint that contains an address that's different than the
409 // one used for the connection. In practice this is awkward to achieve since
410 // we don't want to listen on ":0" since that will annoy firewalls. Instead we
411 // listen on 127.0.0.1 and we fabricate an endpoint that doesn't contain
412 // 127.0.0.1 by using ":0" to create it. This leads to an endpoint such that
413 // the address encoded in the endpoint (e.g. "0.0.0.0:55324") is different
414 // from the address of the connection (e.g. "127.0.0.1:55324").
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700415 _, port, _ := net.SplitHostPort(ep.Addr().String())
416 nep := version.Endpoint(ep.Addr().Network(), net.JoinHostPort("", port), ep.RoutingID())
417
418 // Dial multiple VCs
419 for i := 0; i < 2; i++ {
420 if _, err = client.Dial(nep); err != nil {
421 t.Fatalf("Dial #%d failed: %v", i, err)
422 }
423 }
424 // They should all be on the same VIF.
425 if n := numVIFs(client); n != 1 {
426 t.Errorf("Client has %d VIFs, want 1\n%v", n, debugString(client))
427 }
428 // TODO(ashankar): While a VIF can be re-used to Dial from the server
429 // to the client, currently there is no way to have the client "listen"
430 // on the same VIF. It can listen on a VC for new flows, but it cannot
431 // listen on an established VIF for new VCs. Figure this out?
432}
433
434func TestServerRestartDuringClientLifetime(t *testing.T) {
435 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
436 server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0")
437 server.Cmd.Start()
438 addr, err := server.ReadLineFromChild()
439 if err != nil {
440 t.Fatalf("Failed to read server address from process: %v", err)
441 }
442 ep, err := inaming.NewEndpoint(addr)
443 if err != nil {
444 t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
445 }
446 if _, err := client.Dial(ep); err != nil {
447 t.Fatal(err)
448 }
449 server.Cleanup()
450 // A new VC cannot be created since the server is dead
451 if _, err := client.Dial(ep); err == nil {
452 t.Fatal("Expected client.Dial to fail since server is dead")
453 }
454 // Restarting the server, listening on the same address as before
455 server = blackbox.HelperCommand(t, "runServer", addr)
456 defer server.Cleanup()
457 server.Cmd.Start()
458 if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
459 t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
460 }
461 if _, err := client.Dial(ep); err != nil {
462 t.Fatal(err)
463 }
464}
465
466// Required by blackbox framework
467func TestHelperProcess(t *testing.T) {
468 blackbox.HelperProcess(t)
469}
470
471func runServer(argv []string) {
472 server := InternalNew(naming.FixedRoutingID(0x55555555))
Ankura3c97652014-07-17 20:01:21 -0700473 _, ep, err := server.Listen("tcp", argv[0], vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700474 if err != nil {
475 fmt.Println(err)
476 return
477 }
478 fmt.Println(ep.Addr())
479 // Live forever (till the process is explicitly killed)
480 <-make(chan struct{})
481}
482
483func readLine(f stream.Flow) (string, error) {
484 var result bytes.Buffer
485 var buf [5]byte
486 for {
487 n, err := f.Read(buf[:])
488 result.Write(buf[:n])
489 if err == io.EOF || buf[n-1] == '\n' {
490 return strings.TrimRight(result.String(), "\n"), nil
491 }
492 if err != nil {
493 return "", fmt.Errorf("Read returned (%d, %v)", n, err)
494 }
495 }
496}
497
498func writeLine(f stream.Flow, data string) error {
499 data = data + "\n"
500 vlog.VI(1).Infof("write sending %d bytes", len(data))
501 if n, err := f.Write([]byte(data)); err != nil {
502 return fmt.Errorf("Write returned (%d, %v)", n, err)
503 }
504 return nil
505}
Cosmos Nicolaou5129fcb2014-08-22 11:52:25 -0700506
507func TestRegistration(t *testing.T) {
508 server := InternalNew(naming.FixedRoutingID(0x55555555))
509 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
510
511 dialer := func(addr string) (net.Conn, error) {
512 return nil, fmt.Errorf("tn.Dial")
513 }
514 listener := func(addr string) (net.Listener, error) {
515 return nil, fmt.Errorf("tn.Listen")
516 }
517 stream.RegisterProtocol("tn", dialer, listener)
518
519 _, _, err := server.Listen("tnx", "127.0.0.1:0")
520 if err == nil || !strings.Contains(err.Error(), "unknown network tnx") {
521 t.Fatal("expected error is missing (%v)", err)
522 }
523
524 _, _, err = server.Listen("tn", "127.0.0.1:0")
525 if err == nil || !strings.Contains(err.Error(), "tn.Listen") {
526 t.Fatal("expected error is missing (%v)", err)
527 }
528
529 // Need a functional listener to test Dial.
530 listener = func(addr string) (net.Listener, error) {
531 return net.Listen("tcp", addr)
532 }
533
534 if got, want := stream.RegisterProtocol("tn", dialer, listener), true; got != want {
535 t.Errorf("got %t, want %t", got, want)
536 }
537
538 _, ep, err := server.Listen("tn", "127.0.0.1:0")
539 if err != nil {
540 t.Errorf("unexpected error %s", err)
541 }
542
543 _, err = client.Dial(ep)
544 if err == nil || !strings.Contains(err.Error(), "tn.Dial") {
545 t.Fatal("expected error is missing (%v)", err)
546 }
547}