blob: 6c78e89875b1ddbacddf232c865e0f590d2f1b4f [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package manager
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7 "net"
8 "reflect"
9 "runtime"
10 "strings"
11 "testing"
12
13 _ "veyron/lib/testutil"
14 "veyron/lib/testutil/blackbox"
15 "veyron/runtimes/google/ipc/stream/vc"
16 "veyron/runtimes/google/ipc/version"
17 inaming "veyron/runtimes/google/naming"
Ankura3c97652014-07-17 20:01:21 -070018 isecurity "veyron/runtimes/google/security"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070019
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020 "veyron2/ipc/stream"
21 "veyron2/naming"
22 "veyron2/security"
23 "veyron2/vlog"
24)
25
Ankura3c97652014-07-17 20:01:21 -070026func newID(name string) security.PrivateID {
Srdjan Petrovic4b0dfa02014-07-25 14:13:56 -070027 id, err := isecurity.NewPrivateID(name, nil)
Ankura3c97652014-07-17 20:01:21 -070028 if err != nil {
29 panic(err)
30 }
31 return id
32}
33
Jiri Simsa5293dcb2014-05-10 09:56:38 -070034func init() {
35 // The testutil package's init sets GOMAXPROCS to NumCPU. We want to
36 // force GOMAXPROCS to remain at 1, in order to trigger a particular
37 // race condition tht occurs when closing the server; also, using 1 cpu
38 // introduces less variance in the behavior of the test.
39 runtime.GOMAXPROCS(1)
40
41 blackbox.CommandTable["runServer"] = runServer
42}
43
44func TestSimpleFlow(t *testing.T) {
45 server := InternalNew(naming.FixedRoutingID(0x55555555))
46 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
47
Adam Sadovsky5181bdb2014-08-13 10:29:11 -070048 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070049 if err != nil {
50 t.Fatal(err)
51 }
52
53 data := "the dark knight rises"
54 var clientVC stream.VC
55 var clientF1 stream.Flow
56 go func() {
57 if clientVC, err = client.Dial(ep); err != nil {
58 t.Errorf("Dial(%q) failed: %v", ep, err)
59 return
60 }
61 if clientF1, err = clientVC.Connect(); err != nil {
62 t.Errorf("Connect() failed: %v", err)
63 return
64 }
65 if err := writeLine(clientF1, data); err != nil {
66 t.Error(err)
67 }
68 }()
69 serverF, err := ln.Accept()
70 if err != nil {
71 t.Fatalf("Accept failed: %v", err)
72 }
73 if got, err := readLine(serverF); got != data || err != nil {
74 t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data)
75 }
76 // By this point, the goroutine has passed the write call (or exited
77 // early) since the read has gotten through. Check if the goroutine
78 // encountered any errors in creating the VC or flow and abort.
79 if t.Failed() {
80 return
81 }
82 defer clientF1.Close()
83
84 ln.Close()
85
86 // Writes on flows opened before the server listener was closed should
87 // still succeed.
88 data = "the dark knight goes to bed"
89 go func() {
90 if err := writeLine(clientF1, data); err != nil {
91 t.Error(err)
92 }
93 }()
94 if got, err := readLine(serverF); got != data || err != nil {
95 t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data)
96 }
97
98 // Opening a new flow on an existing VC will succeed initially, but
99 // writes on the client end will eventually fail once the server has
100 // stopped listening.
101 //
102 // It will require a round-trip to the server to notice the failure,
103 // hence the client should write enough data to ensure that the Write
104 // call will not return before a round-trip.
105 //
106 // The length of the data is taken to exceed the queue buffer size
107 // (DefaultBytesBufferedPerFlow), the shared counters (MaxSharedBytes)
108 // and the per-flow counters (DefaultBytesBufferedPerFlow) that are
109 // given when the flow gets established.
110 //
111 // TODO(caprita): separate the constants for the queue buffer size and
112 // the default number of counters to avoid confusion.
113 lotsOfData := string(make([]byte, vc.DefaultBytesBufferedPerFlow*2+vc.MaxSharedBytes+1))
114 clientF2, err := clientVC.Connect()
115 if err != nil {
116 t.Fatalf("Connect() failed: %v", err)
117 }
118 defer clientF2.Close()
119 if err := writeLine(clientF2, lotsOfData); err == nil {
120 t.Errorf("Should not be able to Dial or Write after the Listener is closed")
121 }
122 // Opening a new VC should fail fast.
123 if _, err := client.Dial(ep); err == nil {
124 t.Errorf("Should not be able to Dial after listener is closed")
125 }
126}
127
128func TestAuthenticatedByDefault(t *testing.T) {
129 server := InternalNew(naming.FixedRoutingID(0x55555555))
130 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
131
Ankura3c97652014-07-17 20:01:21 -0700132 clientID := newID("client")
133 serverID := newID("server")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700134 // VCSecurityLevel is intentionally not provided to Listen - to test
135 // default behavior.
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700136 ln, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(serverID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700137 if err != nil {
138 t.Fatal(err)
139 }
140
141 errs := make(chan error)
142
143 testIDs := func(tag string, flow stream.Flow, local, remote security.PrivateID) {
144 lID := flow.LocalID()
145 rID := flow.RemoteID()
146 if !reflect.DeepEqual(lID.Names(), local.PublicID().Names()) || !reflect.DeepEqual(rID.Names(), remote.PublicID().Names()) {
147 errs <- fmt.Errorf("%s: LocalID: Got %q want %q. RemoteID: Got %q, want %q", tag, lID, local, rID, remote)
148 return
149 }
150 errs <- nil
151 }
152
153 go func() {
154 flow, err := ln.Accept()
155 if err != nil {
156 errs <- err
157 return
158 }
159 defer flow.Close()
160 testIDs("server", flow, serverID, clientID)
161 }()
162
163 go func() {
164 // VCSecurityLevel is intentionally not provided to Dial - to
165 // test default behavior.
Ankura3c97652014-07-17 20:01:21 -0700166 vc, err := client.Dial(ep, vc.FixedLocalID(clientID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 if err != nil {
168 errs <- err
169 return
170 }
171 flow, err := vc.Connect()
172 if err != nil {
173 errs <- err
174 return
175 }
176 defer flow.Close()
177 testIDs("client", flow, clientID, serverID)
178 }()
179
180 if err := <-errs; err != nil {
181 t.Error(err)
182 }
183 if err := <-errs; err != nil {
184 t.Error(err)
185 }
186}
187
188func numListeners(m stream.Manager) int { return len(m.(*manager).listeners) }
189func debugString(m stream.Manager) string { return m.(*manager).DebugString() }
190func numVIFs(m stream.Manager) int { return len(m.(*manager).vifs.List()) }
191
192func TestListenEndpoints(t *testing.T) {
193 server := InternalNew(naming.FixedRoutingID(0xcafe))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700194 ln1, ep1, err1 := server.Listen("tcp", "127.0.0.1:0")
195 ln2, ep2, err2 := server.Listen("tcp", "127.0.0.1:0")
196 // Since "127.0.0.1:0" was used as the network address, a random port will be
197 // assigned in each case. The endpoint should include that random port.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700198 if err1 != nil {
199 t.Error(err1)
200 }
201 if err2 != nil {
202 t.Error(err2)
203 }
204 if ep1.String() == ep2.String() {
205 t.Errorf("Both listeners got the same endpoint: %q", ep1)
206 }
207 if n, expect := numListeners(server), 2; n != expect {
208 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
209 }
210 ln1.Close()
211 if n, expect := numListeners(server), 1; n != expect {
212 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
213 }
214 ln2.Close()
215 if n, expect := numListeners(server), 0; n != expect {
216 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
217 }
218}
219
220func acceptLoop(ln stream.Listener) {
221 for {
222 f, err := ln.Accept()
223 if err != nil {
224 return
225 }
226 f.Close()
227 }
228}
229
230func TestCloseListener(t *testing.T) {
231 server := InternalNew(naming.FixedRoutingID(0x5e97e9))
232
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700233 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700234 if err != nil {
235 t.Fatal(err)
236 }
237 // Server will just listen for flows and close them.
238 go acceptLoop(ln)
239 client := InternalNew(naming.FixedRoutingID(0xc1e41))
240 if _, err = client.Dial(ep); err != nil {
241 t.Fatal(err)
242 }
243 ln.Close()
244 client = InternalNew(naming.FixedRoutingID(0xc1e42))
245 if _, err := client.Dial(ep); err == nil {
246 t.Errorf("client.Dial(%q) should have failed", ep)
247 }
248}
249
250func TestShutdown(t *testing.T) {
251 server := InternalNew(naming.FixedRoutingID(0x5e97e9))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700252 ln, _, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700253 if err != nil {
254 t.Fatal(err)
255 }
256 // Server will just listen for flows and close them.
257 go acceptLoop(ln)
258 if n, expect := numListeners(server), 1; n != expect {
259 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
260 }
261 server.Shutdown()
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700262 if _, _, err := server.Listen("tcp", "127.0.0.1:0"); err == nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700263 t.Error("server should have shut down")
264 }
265 if n, expect := numListeners(server), 0; n != expect {
266 t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server))
267 }
268}
269
270func TestShutdownEndpoint(t *testing.T) {
271 server := InternalNew(naming.FixedRoutingID(0x55555555))
272 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
273
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700274 ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700275 if err != nil {
276 t.Fatal(err)
277 }
278
279 // Server will just listen for flows and close them.
280 go acceptLoop(ln)
281
282 vc, err := client.Dial(ep)
283 if err != nil {
284 t.Fatal(err)
285 }
286 if f, err := vc.Connect(); f == nil || err != nil {
287 t.Errorf("vc.Connect failed: (%v, %v)", f, err)
288 }
289 client.ShutdownEndpoint(ep)
290 if f, err := vc.Connect(); f != nil || err == nil {
291 t.Errorf("vc.Connect unexpectedly succeeded: (%v, %v)", f, err)
292 }
293}
294
295func TestSessionTicketCache(t *testing.T) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700296 server := InternalNew(naming.FixedRoutingID(0x55555555))
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700297 _, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700298 if err != nil {
299 t.Fatal(err)
300 }
301
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700302 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
Ankura3c97652014-07-17 20:01:21 -0700303 if _, err = client.Dial(ep, vc.FixedLocalID(newID("TestSessionTicketCacheClient"))); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700304 t.Fatalf("Dial(%q) failed: %v", ep, err)
305 }
306
307 if _, ok := client.(*manager).sessionCache.Get(ep.String()); !ok {
308 t.Fatalf("SessionTicket from TLS handshake not cached")
309 }
310}
311
312func TestMultipleVCs(t *testing.T) {
313 server := InternalNew(naming.FixedRoutingID(0x55555555))
314 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
315
316 const nVCs = 2
317 const data = "bugs bunny"
318
319 // Have the server read from each flow and write to rchan.
320 rchan := make(chan string)
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700321 ln, ep, err := server.Listen("tcp", "127.0.0.1:0", vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700322 if err != nil {
323 t.Fatal(err)
324 }
325 read := func(flow stream.Flow, c chan string) {
326 var buf bytes.Buffer
327 var tmp [1024]byte
328 for {
329 n, err := flow.Read(tmp[:])
330 buf.Write(tmp[:n])
331 if err == io.EOF {
332 c <- buf.String()
333 return
334 }
335 if err != nil {
336 t.Error(err)
337 return
338 }
339 }
340 }
341 go func() {
342 for i := 0; i < nVCs; i++ {
343 flow, err := ln.Accept()
344 if err != nil {
345 t.Error(err)
346 rchan <- ""
347 continue
348 }
349 go read(flow, rchan)
350 }
351 }()
352
353 // Have the client establish nVCs and a flow on each.
354 var vcs [nVCs]stream.VC
355 for i := 0; i < nVCs; i++ {
356 var err error
Ankura3c97652014-07-17 20:01:21 -0700357 vcs[i], err = client.Dial(ep, vc.FixedLocalID(newID("client")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700358 if err != nil {
359 t.Fatal(err)
360 }
361 }
362 write := func(vc stream.VC) {
363 if err != nil {
364 ln.Close()
365 t.Error(err)
366 return
367 }
368 flow, err := vc.Connect()
369 if err != nil {
370 ln.Close()
371 t.Error(err)
372 return
373 }
374 defer flow.Close()
375 if _, err := flow.Write([]byte(data)); err != nil {
376 ln.Close()
377 t.Error(err)
378 return
379 }
380 }
381 for _, vc := range vcs {
382 go write(vc)
383 }
384 for i := 0; i < nVCs; i++ {
385 if got := <-rchan; got != data {
386 t.Errorf("Got %q want %q", got, data)
387 }
388 }
389}
390
391func TestAddressResolution(t *testing.T) {
392 server := InternalNew(naming.FixedRoutingID(0x55555555))
393 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
394
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700395 // Using "tcp4" instead of "tcp" because the latter can end up with IPv6
396 // addresses and our Google Compute Engine integration test machines cannot
397 // resolve IPv6 addresses.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700398 // As of April 2014, https://developers.google.com/compute/docs/networking
399 // said that IPv6 is not yet supported.
400 ln, ep, err := server.Listen("tcp4", "127.0.0.1:0")
401 if err != nil {
402 t.Fatal(err)
403 }
404 go acceptLoop(ln)
405
Adam Sadovsky5181bdb2014-08-13 10:29:11 -0700406 // We'd like an endpoint that contains an address that's different than the
407 // one used for the connection. In practice this is awkward to achieve since
408 // we don't want to listen on ":0" since that will annoy firewalls. Instead we
409 // listen on 127.0.0.1 and we fabricate an endpoint that doesn't contain
410 // 127.0.0.1 by using ":0" to create it. This leads to an endpoint such that
411 // the address encoded in the endpoint (e.g. "0.0.0.0:55324") is different
412 // from the address of the connection (e.g. "127.0.0.1:55324").
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700413 _, port, _ := net.SplitHostPort(ep.Addr().String())
414 nep := version.Endpoint(ep.Addr().Network(), net.JoinHostPort("", port), ep.RoutingID())
415
416 // Dial multiple VCs
417 for i := 0; i < 2; i++ {
418 if _, err = client.Dial(nep); err != nil {
419 t.Fatalf("Dial #%d failed: %v", i, err)
420 }
421 }
422 // They should all be on the same VIF.
423 if n := numVIFs(client); n != 1 {
424 t.Errorf("Client has %d VIFs, want 1\n%v", n, debugString(client))
425 }
426 // TODO(ashankar): While a VIF can be re-used to Dial from the server
427 // to the client, currently there is no way to have the client "listen"
428 // on the same VIF. It can listen on a VC for new flows, but it cannot
429 // listen on an established VIF for new VCs. Figure this out?
430}
431
432func TestServerRestartDuringClientLifetime(t *testing.T) {
433 client := InternalNew(naming.FixedRoutingID(0xcccccccc))
434 server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0")
435 server.Cmd.Start()
436 addr, err := server.ReadLineFromChild()
437 if err != nil {
438 t.Fatalf("Failed to read server address from process: %v", err)
439 }
440 ep, err := inaming.NewEndpoint(addr)
441 if err != nil {
442 t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
443 }
444 if _, err := client.Dial(ep); err != nil {
445 t.Fatal(err)
446 }
447 server.Cleanup()
448 // A new VC cannot be created since the server is dead
449 if _, err := client.Dial(ep); err == nil {
450 t.Fatal("Expected client.Dial to fail since server is dead")
451 }
452 // Restarting the server, listening on the same address as before
453 server = blackbox.HelperCommand(t, "runServer", addr)
454 defer server.Cleanup()
455 server.Cmd.Start()
456 if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
457 t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
458 }
459 if _, err := client.Dial(ep); err != nil {
460 t.Fatal(err)
461 }
462}
463
464// Required by blackbox framework
465func TestHelperProcess(t *testing.T) {
466 blackbox.HelperProcess(t)
467}
468
469func runServer(argv []string) {
470 server := InternalNew(naming.FixedRoutingID(0x55555555))
Ankura3c97652014-07-17 20:01:21 -0700471 _, ep, err := server.Listen("tcp", argv[0], vc.FixedLocalID(newID("server")))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700472 if err != nil {
473 fmt.Println(err)
474 return
475 }
476 fmt.Println(ep.Addr())
477 // Live forever (till the process is explicitly killed)
478 <-make(chan struct{})
479}
480
481func readLine(f stream.Flow) (string, error) {
482 var result bytes.Buffer
483 var buf [5]byte
484 for {
485 n, err := f.Read(buf[:])
486 result.Write(buf[:n])
487 if err == io.EOF || buf[n-1] == '\n' {
488 return strings.TrimRight(result.String(), "\n"), nil
489 }
490 if err != nil {
491 return "", fmt.Errorf("Read returned (%d, %v)", n, err)
492 }
493 }
494}
495
496func writeLine(f stream.Flow, data string) error {
497 data = data + "\n"
498 vlog.VI(1).Infof("write sending %d bytes", len(data))
499 if n, err := f.Write([]byte(data)); err != nil {
500 return fmt.Errorf("Write returned (%d, %v)", n, err)
501 }
502 return nil
503}