| package manager |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "net" |
| "reflect" |
| "runtime" |
| "strings" |
| "testing" |
| |
| _ "veyron/lib/testutil" |
| "veyron/lib/testutil/blackbox" |
| "veyron/runtimes/google/ipc/stream/vc" |
| "veyron/runtimes/google/ipc/version" |
| inaming "veyron/runtimes/google/naming" |
| |
| "veyron2" |
| "veyron2/ipc/stream" |
| "veyron2/naming" |
| "veyron2/security" |
| "veyron2/vlog" |
| ) |
| |
| func init() { |
| // The testutil package's init sets GOMAXPROCS to NumCPU. We want to |
| // force GOMAXPROCS to remain at 1, in order to trigger a particular |
| // race condition tht occurs when closing the server; also, using 1 cpu |
| // introduces less variance in the behavior of the test. |
| runtime.GOMAXPROCS(1) |
| |
| blackbox.CommandTable["runServer"] = runServer |
| } |
| |
| func TestSimpleFlow(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| |
| ln, ep, err := server.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| data := "the dark knight rises" |
| var clientVC stream.VC |
| var clientF1 stream.Flow |
| go func() { |
| if clientVC, err = client.Dial(ep); err != nil { |
| t.Errorf("Dial(%q) failed: %v", ep, err) |
| return |
| } |
| if clientF1, err = clientVC.Connect(); err != nil { |
| t.Errorf("Connect() failed: %v", err) |
| return |
| } |
| if err := writeLine(clientF1, data); err != nil { |
| t.Error(err) |
| } |
| }() |
| serverF, err := ln.Accept() |
| if err != nil { |
| t.Fatalf("Accept failed: %v", err) |
| } |
| if got, err := readLine(serverF); got != data || err != nil { |
| t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data) |
| } |
| // By this point, the goroutine has passed the write call (or exited |
| // early) since the read has gotten through. Check if the goroutine |
| // encountered any errors in creating the VC or flow and abort. |
| if t.Failed() { |
| return |
| } |
| defer clientF1.Close() |
| |
| ln.Close() |
| |
| // Writes on flows opened before the server listener was closed should |
| // still succeed. |
| data = "the dark knight goes to bed" |
| go func() { |
| if err := writeLine(clientF1, data); err != nil { |
| t.Error(err) |
| } |
| }() |
| if got, err := readLine(serverF); got != data || err != nil { |
| t.Errorf("Got (%q, %v), want (%q, nil)", got, err, data) |
| } |
| |
| // Opening a new flow on an existing VC will succeed initially, but |
| // writes on the client end will eventually fail once the server has |
| // stopped listening. |
| // |
| // It will require a round-trip to the server to notice the failure, |
| // hence the client should write enough data to ensure that the Write |
| // call will not return before a round-trip. |
| // |
| // The length of the data is taken to exceed the queue buffer size |
| // (DefaultBytesBufferedPerFlow), the shared counters (MaxSharedBytes) |
| // and the per-flow counters (DefaultBytesBufferedPerFlow) that are |
| // given when the flow gets established. |
| // |
| // TODO(caprita): separate the constants for the queue buffer size and |
| // the default number of counters to avoid confusion. |
| lotsOfData := string(make([]byte, vc.DefaultBytesBufferedPerFlow*2+vc.MaxSharedBytes+1)) |
| clientF2, err := clientVC.Connect() |
| if err != nil { |
| t.Fatalf("Connect() failed: %v", err) |
| } |
| defer clientF2.Close() |
| if err := writeLine(clientF2, lotsOfData); err == nil { |
| t.Errorf("Should not be able to Dial or Write after the Listener is closed") |
| } |
| // Opening a new VC should fail fast. |
| if _, err := client.Dial(ep); err == nil { |
| t.Errorf("Should not be able to Dial after listener is closed") |
| } |
| } |
| |
| func TestAuthenticatedByDefault(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| |
| serverID := security.FakePrivateID("server") |
| clientID := security.FakePrivateID("client") |
| |
| // VCSecurityLevel is intentionally not provided to Listen - to test |
| // default behavior. |
| ln, ep, err := server.Listen("tcp", "localhost:0", veyron2.LocalID(serverID)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| errs := make(chan error) |
| |
| testIDs := func(tag string, flow stream.Flow, local, remote security.PrivateID) { |
| lID := flow.LocalID() |
| rID := flow.RemoteID() |
| if !reflect.DeepEqual(lID.Names(), local.PublicID().Names()) || !reflect.DeepEqual(rID.Names(), remote.PublicID().Names()) { |
| errs <- fmt.Errorf("%s: LocalID: Got %q want %q. RemoteID: Got %q, want %q", tag, lID, local, rID, remote) |
| return |
| } |
| errs <- nil |
| } |
| |
| go func() { |
| flow, err := ln.Accept() |
| if err != nil { |
| errs <- err |
| return |
| } |
| defer flow.Close() |
| testIDs("server", flow, serverID, clientID) |
| }() |
| |
| go func() { |
| // VCSecurityLevel is intentionally not provided to Dial - to |
| // test default behavior. |
| vc, err := client.Dial(ep, veyron2.LocalID(clientID)) |
| if err != nil { |
| errs <- err |
| return |
| } |
| flow, err := vc.Connect() |
| if err != nil { |
| errs <- err |
| return |
| } |
| defer flow.Close() |
| testIDs("client", flow, clientID, serverID) |
| }() |
| |
| if err := <-errs; err != nil { |
| t.Error(err) |
| } |
| if err := <-errs; err != nil { |
| t.Error(err) |
| } |
| } |
| |
| func numListeners(m stream.Manager) int { return len(m.(*manager).listeners) } |
| func debugString(m stream.Manager) string { return m.(*manager).DebugString() } |
| func numVIFs(m stream.Manager) int { return len(m.(*manager).vifs.List()) } |
| |
| func TestListenEndpoints(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0xcafe)) |
| ln1, ep1, err1 := server.Listen("tcp", "localhost:0") |
| ln2, ep2, err2 := server.Listen("tcp", "localhost:0") |
| // Since "localhost:0" was used as the network address, a random port |
| // will be assigned in each case. The endpoint should include that |
| // random port. |
| if err1 != nil { |
| t.Error(err1) |
| } |
| if err2 != nil { |
| t.Error(err2) |
| } |
| if ep1.String() == ep2.String() { |
| t.Errorf("Both listeners got the same endpoint: %q", ep1) |
| } |
| if n, expect := numListeners(server), 2; n != expect { |
| t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server)) |
| } |
| ln1.Close() |
| if n, expect := numListeners(server), 1; n != expect { |
| t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server)) |
| } |
| ln2.Close() |
| if n, expect := numListeners(server), 0; n != expect { |
| t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server)) |
| } |
| } |
| |
| func acceptLoop(ln stream.Listener) { |
| for { |
| f, err := ln.Accept() |
| if err != nil { |
| return |
| } |
| f.Close() |
| } |
| } |
| |
| func TestCloseListener(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x5e97e9)) |
| |
| ln, ep, err := server.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Server will just listen for flows and close them. |
| go acceptLoop(ln) |
| client := InternalNew(naming.FixedRoutingID(0xc1e41)) |
| if _, err = client.Dial(ep); err != nil { |
| t.Fatal(err) |
| } |
| ln.Close() |
| client = InternalNew(naming.FixedRoutingID(0xc1e42)) |
| if _, err := client.Dial(ep); err == nil { |
| t.Errorf("client.Dial(%q) should have failed", ep) |
| } |
| } |
| |
| func TestShutdown(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x5e97e9)) |
| ln, _, err := server.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Server will just listen for flows and close them. |
| go acceptLoop(ln) |
| if n, expect := numListeners(server), 1; n != expect { |
| t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server)) |
| } |
| server.Shutdown() |
| if _, _, err := server.Listen("tcp", "localhost:0"); err == nil { |
| t.Error("server should have shut down") |
| } |
| if n, expect := numListeners(server), 0; n != expect { |
| t.Errorf("expecting %d listeners, got %d for %s", n, expect, debugString(server)) |
| } |
| } |
| |
| func TestShutdownEndpoint(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| |
| ln, ep, err := server.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // Server will just listen for flows and close them. |
| go acceptLoop(ln) |
| |
| vc, err := client.Dial(ep) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if f, err := vc.Connect(); f == nil || err != nil { |
| t.Errorf("vc.Connect failed: (%v, %v)", f, err) |
| } |
| client.ShutdownEndpoint(ep) |
| if f, err := vc.Connect(); f != nil || err == nil { |
| t.Errorf("vc.Connect unexpectedly succeeded: (%v, %v)", f, err) |
| } |
| } |
| |
| func TestSessionTicketCache(t *testing.T) { |
| serverID := veyron2.LocalID(security.FakePrivateID("TestSessionTicketCacheServer")) |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| _, ep, err := server.Listen("tcp", "localhost:0", serverID) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| clientID := veyron2.LocalID(security.FakePrivateID("TestSessionTicketCacheClient")) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| if _, err = client.Dial(ep, clientID); err != nil { |
| t.Fatalf("Dial(%q) failed: %v", ep, err) |
| } |
| |
| if _, ok := client.(*manager).sessionCache.Get(ep.String()); !ok { |
| t.Fatalf("SessionTicket from TLS handshake not cached") |
| } |
| } |
| |
| func TestMultipleVCs(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| |
| const nVCs = 2 |
| const data = "bugs bunny" |
| |
| // Have the server read from each flow and write to rchan. |
| rchan := make(chan string) |
| ln, ep, err := server.Listen("tcp", "localhost:0", veyron2.LocalID(security.FakePrivateID("server"))) |
| if err != nil { |
| t.Fatal(err) |
| } |
| read := func(flow stream.Flow, c chan string) { |
| var buf bytes.Buffer |
| var tmp [1024]byte |
| for { |
| n, err := flow.Read(tmp[:]) |
| buf.Write(tmp[:n]) |
| if err == io.EOF { |
| c <- buf.String() |
| return |
| } |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| } |
| } |
| go func() { |
| for i := 0; i < nVCs; i++ { |
| flow, err := ln.Accept() |
| if err != nil { |
| t.Error(err) |
| rchan <- "" |
| continue |
| } |
| go read(flow, rchan) |
| } |
| }() |
| |
| // Have the client establish nVCs and a flow on each. |
| var vcs [nVCs]stream.VC |
| for i := 0; i < nVCs; i++ { |
| var err error |
| vcs[i], err = client.Dial(ep, veyron2.LocalID(security.FakePrivateID("client"))) |
| if err != nil { |
| t.Fatal(err) |
| } |
| } |
| write := func(vc stream.VC) { |
| if err != nil { |
| ln.Close() |
| t.Error(err) |
| return |
| } |
| flow, err := vc.Connect() |
| if err != nil { |
| ln.Close() |
| t.Error(err) |
| return |
| } |
| defer flow.Close() |
| if _, err := flow.Write([]byte(data)); err != nil { |
| ln.Close() |
| t.Error(err) |
| return |
| } |
| } |
| for _, vc := range vcs { |
| go write(vc) |
| } |
| for i := 0; i < nVCs; i++ { |
| if got := <-rchan; got != data { |
| t.Errorf("Got %q want %q", got, data) |
| } |
| } |
| } |
| |
| func TestAddressResolution(t *testing.T) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| |
| // Using "tcp4" instead of "tcp" because the latter can end up with |
| // IPv6 addresses and our Google Compute Engine integration test |
| // machines cannot resolve IPv6 addresses. |
| // As of April 2014, https://developers.google.com/compute/docs/networking |
| // said that IPv6 is not yet supported. |
| ln, ep, err := server.Listen("tcp4", "127.0.0.1:0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| go acceptLoop(ln) |
| |
| // We'd like an enpoint that contains an address that's different |
| // to the one used for the connection. In practice this is awkward |
| // to achieve since we don't want to listen on ":0" since that will |
| // annoy firewalls. Instead we listen on 127.0.0.1 and we fabricate an |
| // endpoint that doesn't contain 127.0.0.1 by using ":0" to create it. |
| // This leads to an endpoint such that the address |
| // encoded in the endpoint (e.g. "0.0.0.0:55324") is different from |
| // address of the connection (e.g. "127.0.0.1:55324"). |
| _, port, _ := net.SplitHostPort(ep.Addr().String()) |
| nep := version.Endpoint(ep.Addr().Network(), net.JoinHostPort("", port), ep.RoutingID()) |
| |
| // Dial multiple VCs |
| for i := 0; i < 2; i++ { |
| if _, err = client.Dial(nep); err != nil { |
| t.Fatalf("Dial #%d failed: %v", i, err) |
| } |
| } |
| // They should all be on the same VIF. |
| if n := numVIFs(client); n != 1 { |
| t.Errorf("Client has %d VIFs, want 1\n%v", n, debugString(client)) |
| } |
| // TODO(ashankar): While a VIF can be re-used to Dial from the server |
| // to the client, currently there is no way to have the client "listen" |
| // on the same VIF. It can listen on a VC for new flows, but it cannot |
| // listen on an established VIF for new VCs. Figure this out? |
| } |
| |
| func TestServerRestartDuringClientLifetime(t *testing.T) { |
| client := InternalNew(naming.FixedRoutingID(0xcccccccc)) |
| server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0") |
| server.Cmd.Start() |
| addr, err := server.ReadLineFromChild() |
| if err != nil { |
| t.Fatalf("Failed to read server address from process: %v", err) |
| } |
| ep, err := inaming.NewEndpoint(addr) |
| if err != nil { |
| t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err) |
| } |
| if _, err := client.Dial(ep); err != nil { |
| t.Fatal(err) |
| } |
| server.Cleanup() |
| // A new VC cannot be created since the server is dead |
| if _, err := client.Dial(ep); err == nil { |
| t.Fatal("Expected client.Dial to fail since server is dead") |
| } |
| // Restarting the server, listening on the same address as before |
| server = blackbox.HelperCommand(t, "runServer", addr) |
| defer server.Cleanup() |
| server.Cmd.Start() |
| if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil { |
| t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr) |
| } |
| if _, err := client.Dial(ep); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| // Required by blackbox framework |
| func TestHelperProcess(t *testing.T) { |
| blackbox.HelperProcess(t) |
| } |
| |
| func runServer(argv []string) { |
| server := InternalNew(naming.FixedRoutingID(0x55555555)) |
| _, ep, err := server.Listen("tcp", argv[0], veyron2.LocalID(security.FakePrivateID("server"))) |
| if err != nil { |
| fmt.Println(err) |
| return |
| } |
| fmt.Println(ep.Addr()) |
| // Live forever (till the process is explicitly killed) |
| <-make(chan struct{}) |
| } |
| |
| func readLine(f stream.Flow) (string, error) { |
| var result bytes.Buffer |
| var buf [5]byte |
| for { |
| n, err := f.Read(buf[:]) |
| result.Write(buf[:n]) |
| if err == io.EOF || buf[n-1] == '\n' { |
| return strings.TrimRight(result.String(), "\n"), nil |
| } |
| if err != nil { |
| return "", fmt.Errorf("Read returned (%d, %v)", n, err) |
| } |
| } |
| } |
| |
| func writeLine(f stream.Flow, data string) error { |
| data = data + "\n" |
| vlog.VI(1).Infof("write sending %d bytes", len(data)) |
| if n, err := f.Write([]byte(data)); err != nil { |
| return fmt.Errorf("Write returned (%d, %v)", n, err) |
| } |
| return nil |
| } |