veyron/runtimes/google/ipc: move to using modules instead of blackbox.
Change-Id: I12ef958cc04ed91c6315ce402d5b470ac323eb6c
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 0997553..4e4a5bf 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -13,10 +13,8 @@
"veyron.io/veyron/veyron/lib/netstate"
_ "veyron.io/veyron/veyron/lib/testutil"
- "veyron.io/veyron/veyron/lib/testutil/blackbox"
"veyron.io/veyron/veyron/profiles"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
@@ -978,56 +976,6 @@
}
}
-// TestReconnect verifies that the client transparently re-establishes the
-// connection to the server if the server dies and comes back (on the same
-// endpoint).
-func TestReconnect(t *testing.T) {
- b := createBundle(t, sectest.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
- defer b.cleanup(t)
- server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0")
- server.Cmd.Start()
- addr, err := server.ReadLineFromChild()
- if err != nil {
- t.Fatalf("Failed to read server address from process: %v", err)
- }
- ep, err := inaming.NewEndpoint(addr)
- if err != nil {
- t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
- }
- serverName := naming.JoinAddressName(ep.String(), "suffix")
- makeCall := func() (string, error) {
- call, err := b.client.StartCall(testContext(), serverName, "Echo", []interface{}{"bratman"})
- if err != nil {
- return "", err
- }
- var result string
- if err = call.Finish(&result); err != nil {
- return "", err
- }
- return result, nil
- }
- expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
- if result, err := makeCall(); err != nil || result != expected {
- t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
- }
- // Kill the server, verify client can't talk to it anymore.
- server.Cleanup()
- if _, err := makeCall(); err == nil {
- t.Fatal("Expected call to fail since server is dead")
- }
- // Resurrect the server with the same address, verify client
- // re-establishes the connection.
- server = blackbox.HelperCommand(t, "runServer", addr)
- defer server.Cleanup()
- server.Cmd.Start()
- if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
- t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
- }
- if result, err := makeCall(); err != nil || result != expected {
- t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
- }
-}
-
func TestPreferredAddress(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
@@ -1096,154 +1044,6 @@
}
}
-type proxyHandle struct {
- ns naming.Namespace
- process *blackbox.Child
- mount string
-}
-
-func (h *proxyHandle) Start(t *testing.T) error {
- h.process = blackbox.HelperCommand(t, "runProxy")
- h.process.Cmd.Start()
- var err error
- if h.mount, err = h.process.ReadLineFromChild(); err != nil {
- return err
- }
- if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
- return err
- }
- return nil
-}
-
-func (h *proxyHandle) Stop() error {
- if h.process == nil {
- return nil
- }
- h.process.Cleanup()
- h.process = nil
- if len(h.mount) == 0 {
- return nil
- }
- return h.ns.Unmount(testContext(), "proxy", h.mount)
-}
-
-func TestProxy(t *testing.T) {
- sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := tnaming.NewSimpleNamespace()
- client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
- if err != nil {
- t.Fatal(err)
- }
- defer server.Stop()
-
- name := "mountpoint/server/suffix"
- makeCall := func() (string, error) {
- call, err := client.StartCall(testContext(), name, "Echo", []interface{}{"batman"})
- if err != nil {
- return "", err
- }
- var result string
- if err = call.Finish(&result); err != nil {
- return "", err
- }
- return result, nil
- }
- proxy := &proxyHandle{ns: ns}
- if err := proxy.Start(t); err != nil {
- t.Fatal(err)
- }
- defer proxy.Stop()
- spec := *profiles.LocalListenSpec
- spec.Proxy = "proxy"
- if _, err := server.ListenX(&spec); err != nil {
- t.Fatal(err)
- }
- if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
- t.Fatal(err)
- }
- verifyMount(t, ns, name)
- // Proxied endpoint should be published and RPC should succeed (through proxy)
- const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
- if result, err := makeCall(); result != expected || err != nil {
- t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
- }
-
- // Proxy dies, calls should fail and the name should be unmounted.
- if err := proxy.Stop(); err != nil {
- t.Fatal(err)
- }
- if result, err := makeCall(); err == nil {
- t.Fatalf(`Got (%v, %v) want ("", <non-nil>) as proxy is down`, result, err)
- }
- for {
- if _, err := ns.Resolve(testContext(), name); err != nil {
- break
- }
- }
- verifyMountMissing(t, ns, name)
-
- // Proxy restarts, calls should eventually start succeeding.
- if err := proxy.Start(t); err != nil {
- t.Fatal(err)
- }
- for {
- if result, err := makeCall(); err == nil {
- if result != expected {
- t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
- }
- break
- }
- }
-}
-
-func runServer(argv []string) {
- mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
- if err != nil {
- vlog.Fatalf("InternalNewServer failed: %v", err)
- }
- disp := testServerDisp{new(testServer)}
- if err := server.Serve("server", disp); err != nil {
- vlog.Fatalf("server.Register failed: %v", err)
- }
- spec := *profiles.LocalListenSpec
- spec.Address = argv[0]
- ep, err := server.ListenX(&spec)
- if err != nil {
- vlog.Fatalf("server.Listen failed: %v", err)
- }
- fmt.Println(ep.Addr())
- // Live forever (parent process should explicitly kill us).
- <-make(chan struct{})
-}
-
-func runProxy([]string) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- vlog.Fatal(err)
- }
- proxy, err := proxy.New(rid, sectest.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
- if err != nil {
- vlog.Fatal(err)
- }
- fmt.Println("/" + proxy.Endpoint().String())
- <-make(chan struct{})
-}
-
-// Required by blackbox framework.
-func TestHelperProcess(t *testing.T) {
- blackbox.HelperProcess(t)
-}
-
func init() {
- blackbox.CommandTable["runServer"] = runServer
- blackbox.CommandTable["runProxy"] = runProxy
-
vom.Register(fakeTimeCaveat(0))
}
diff --git a/runtimes/google/ipc/roaming_test.go b/runtimes/google/ipc/roaming_test.go
deleted file mode 100644
index 57437bd..0000000
--- a/runtimes/google/ipc/roaming_test.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package ipc_test
-
-/*
-func startRoamingServer() {
- mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := newNamespace()
-}
-
-func TestRoamingListen(t *testing.T) {
- r, err := rt.New()
- if err != nil {
- t.Errorf("unexpected error: %s", err)
- }
- publisher := config.CreateStream("test")
-
- server, err := r.NewServer()
- defer server.Stop()
-
- if err != nil {
- t.Errorf("unexpected error: %s", err)
- }
- server.RoamingListen(r.Publisher())
- server.Serve(nil)
-}
-*/
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index ddefad2..608424f 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -384,12 +384,17 @@
// (2) Reconnect to the proxy unless the server has been stopped
backoff := min
ln = nil
+ // TODO(ashankar,cnicolaou): this code is way too confusing and should
+ // be cleaned up.
for ln == nil {
select {
case <-time.After(backoff):
resolved, err := s.resolveToAddress(proxy)
if err != nil {
vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
+ if backoff = backoff * 2; backoff > max {
+ backoff = max
+ }
break
}
var ep naming.Endpoint
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
new file mode 100644
index 0000000..5842e50
--- /dev/null
+++ b/runtimes/google/ipc/server_test.go
@@ -0,0 +1,241 @@
+package ipc
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/naming"
+
+ "veyron.io/veyron/veyron/lib/expect"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
+
+ "veyron.io/veyron/veyron/lib/modules"
+ "veyron.io/veyron/veyron/profiles"
+ imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
+ inaming "veyron.io/veyron/veyron/runtimes/google/naming"
+ tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
+)
+
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint).
+//
+// The server runs in a child process started through the modules shell
+// ("runServer"); its address is read from the first line the child writes to
+// stdout.
+func TestReconnect(t *testing.T) {
+	b := createBundle(t, sectest.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
+	defer b.cleanup(t)
+	sh := modules.NewShell()
+	defer sh.Cleanup(os.Stderr, os.Stderr)
+	server, err := sh.Start("runServer", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	session := expect.NewSession(t, server.Stdout(), time.Minute)
+	addr := session.ReadLine()
+	ep, err := inaming.NewEndpoint(addr)
+	if err != nil {
+		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+	}
+	serverName := naming.JoinAddressName(ep.String(), "suffix")
+	makeCall := func() (string, error) {
+		// Bound every call to five seconds, and cancel the context as soon
+		// as the call has finished instead of leaving it pending until the
+		// deadline expires.
+		ctx, cancel := testContext().WithDeadline(time.Now().Add(5 * time.Second))
+		defer cancel()
+		call, err := b.client.StartCall(ctx, serverName, "Echo", []interface{}{"bratman"})
+		if err != nil {
+			// Tag failures of StartCall so that the test can tell them
+			// apart from failures of Finish.
+			return "", fmt.Errorf("START: %s", err)
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+	// Kill the server, verify client can't talk to it anymore.
+	server.Shutdown(nil, nil)
+	if _, err := makeCall(); err == nil || !strings.HasPrefix(err.Error(), "START") {
+		t.Fatalf(`Got (%v) want ("START: <err>") as server is down`, err)
+	}
+
+	// Resurrect the server with the same address, verify client
+	// re-establishes the connection.
+	server, err = sh.Start("runServer", addr)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	session = expect.NewSession(t, server.Stdout(), time.Minute)
+	defer server.Shutdown(nil, nil)
+	// The restarted server must report the same address as the first one.
+	session.Expect(addr)
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+}
+
+// proxyHandle manages a proxy running in a child process together with the
+// "proxy" entry that Start mounts for it in a namespace.
+type proxyHandle struct {
+	// ns is the namespace in which Start mounts, and Stop unmounts, "proxy".
+	ns naming.Namespace
+	// mount is the line read from the child's stdout by Start; it is the
+	// value mounted under "proxy".
+	mount string
+
+	// process is the handle of the running child; Stop resets it to nil.
+	process modules.Handle
+	// session reads the child's stdout.
+	session *expect.Session
+}
+
+// Start launches the "runProxy" child process, reads one line from the
+// child's stdout into h.mount and mounts that value under the name "proxy"
+// in h.ns (time.Hour is passed as Mount's duration argument).
+//
+// Errors from starting the child, reading its output or mounting are
+// returned to the caller; t is only handed to the expect session that reads
+// the child's output.
+func (h *proxyHandle) Start(t *testing.T) error {
+	sh := modules.NewShell()
+	server, err := sh.Start("runProxy")
+	if err != nil {
+		// Report this like every other failure below and let the caller
+		// decide how to fail the test.
+		return fmt.Errorf("failed to start runProxy: %v", err)
+	}
+	h.process = server
+	h.session = expect.NewSession(t, server.Stdout(), time.Minute)
+	h.mount = h.session.ReadLine()
+	if err := h.session.Error(); err != nil {
+		return err
+	}
+	if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Stop shuts down the child process, sending its output to os.Stderr, and
+// then unmounts "proxy" from h.ns if a mount value was recorded by Start.
+// It is a no-op returning nil when no child is running, so it may be called
+// again after the proxy has already been stopped.
+func (h *proxyHandle) Stop() error {
+	child := h.process
+	if child == nil {
+		return nil
+	}
+	child.Shutdown(os.Stderr, os.Stderr)
+	h.process = nil
+	if h.mount != "" {
+		return h.ns.Unmount(testContext(), "proxy", h.mount)
+	}
+	return nil
+}
+
+// TestProxy runs a server that listens through a proxy living in a child
+// process and checks that:
+//   - calls succeed while the proxy is up,
+//   - calls fail and the server's name disappears from the namespace once
+//     the proxy is stopped,
+//   - calls start succeeding again after the proxy is restarted.
+func TestProxy(t *testing.T) {
+	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+	ns := tnaming.NewSimpleNamespace()
+	client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer server.Stop()
+
+	name := "mountpoint/server/suffix"
+	makeCall := func() (string, error) {
+		// Let's fail fast so that the tests don't take as long to run.
+		// The context is cancelled as soon as the call has finished:
+		// makeCall is invoked in a retry loop below, and each invocation
+		// would otherwise leave a context pending until its deadline.
+		ctx, cancel := testContext().WithDeadline(time.Now().Add(5 * time.Second))
+		defer cancel()
+		call, err := client.StartCall(ctx, name, "Echo", []interface{}{"batman"})
+		if err != nil {
+			// StartCall failed (e.g. because the proxy is down). Prefix
+			// the error with a well-known string so that the test can
+			// tell this failure apart from one returned by Finish.
+			return "", fmt.Errorf("RESOLVE: %s", err)
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	proxy := &proxyHandle{ns: ns}
+	if err := proxy.Start(t); err != nil {
+		t.Fatal(err)
+	}
+	defer proxy.Stop()
+	spec := *profiles.LocalListenSpec
+	spec.Proxy = "proxy"
+	if _, err := server.ListenX(&spec); err != nil {
+		t.Fatal(err)
+	}
+	if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
+		t.Fatal(err)
+	}
+	verifyMount(t, ns, name)
+	// Proxied endpoint should be published and RPC should succeed (through proxy)
+	const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
+	if result, err := makeCall(); result != expected || err != nil {
+		t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
+	}
+	// Proxy dies, calls should fail and the name should be unmounted.
+	if err := proxy.Stop(); err != nil {
+		t.Fatal(err)
+	}
+	if result, err := makeCall(); err == nil || !strings.HasPrefix(err.Error(), "RESOLVE") {
+		t.Fatalf(`Got (%v, %v) want ("", "RESOLVE: <err>") as proxy is down`, result, err)
+	}
+	// Poll until the name no longer resolves.
+	// NOTE(review): this loop has no timeout of its own; if the name is
+	// never unmounted it spins until the test run is aborted.
+	for {
+		if _, err := ns.Resolve(testContext(), name); err != nil {
+			break
+		}
+	}
+	verifyMountMissing(t, ns, name)
+
+	// Proxy restarts, calls should eventually start succeeding.
+	if err := proxy.Start(t); err != nil {
+		t.Fatal(err)
+	}
+
+	// Retry until a call goes through; failed attempts are ignored.
+	// NOTE(review): like the loop above, this one has no overall timeout.
+	for {
+		if result, err := makeCall(); err == nil {
+			if result != expected {
+				t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
+			}
+			break
+		}
+	}
+}
+
+// runServer is the "runServer" child-process entry point registered with the
+// modules framework in init. It serves a testServerDisp under the name
+// "server", listens on the address given in args[1], prints the address of
+// the resulting endpoint on stdout and then blocks until stdin reaches EOF.
+//
+// It returns an error if the address argument is missing or if creating,
+// serving or listening on the server fails.
+func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	// The listen address is read from args[1]; fail with a clear error
+	// instead of an index-out-of-range panic when it was not supplied.
+	if len(args) < 2 {
+		return fmt.Errorf("runServer: missing address argument, got args %q", args)
+	}
+	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+	ns := tnaming.NewSimpleNamespace()
+	server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+	if err != nil {
+		return fmt.Errorf("InternalNewServer failed: %v", err)
+	}
+	disp := testServerDisp{new(testServer)}
+	if err := server.Serve("server", disp); err != nil {
+		return fmt.Errorf("server.Serve failed: %v", err)
+	}
+	// Start from a copy of the local listen spec so that the shared
+	// profiles.LocalListenSpec value is not modified.
+	spec := *profiles.LocalListenSpec
+	spec.Address = args[1]
+	ep, err := server.ListenX(&spec)
+	if err != nil {
+		return fmt.Errorf("server.ListenX failed: %v", err)
+	}
+	// The parent test reads this line to learn where the server listens.
+	fmt.Fprintf(stdout, "%s\n", ep.Addr())
+	// parent process should explicitly shut us down by closing stdin.
+	modules.WaitForEOF(stdin)
+	return nil
+}
+
+// runProxy is the "runProxy" child-process entry point registered with the
+// modules framework in init. It creates a proxy with a fresh routing ID on
+// tcp 127.0.0.1:0, writes "/" followed by the proxy's endpoint as a single
+// line on stdout and then blocks until stdin reaches EOF.
+func runProxy(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	rid, err := naming.NewRoutingID()
+	if err != nil {
+		return err
+	}
+	p, err := proxy.New(rid, sectest.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
+	if err != nil {
+		return err
+	}
+	// The parent reads this line and uses it as the proxy's mount value.
+	fmt.Fprintln(stdout, "/"+p.Endpoint().String())
+	// Stay alive until the parent closes our stdin.
+	modules.WaitForEOF(stdin)
+	return nil
+}
+
+// TestHelperProcess is required by the modules framework; it does nothing
+// but call modules.DispatchInTest.
+// NOTE(review): presumably this is where child processes started via
+// sh.Start re-enter the test binary to run a registered command - confirm
+// against the modules package.
+func TestHelperProcess(t *testing.T) {
+ modules.DispatchInTest()
+}
+
+// init registers the child-process entry points used by the tests in this
+// file with the modules framework: "runServer" (registered with the string
+// "[address]", presumably its usage text) and "runProxy".
+func init() {
+ modules.RegisterChild("runServer", "[address]", runServer)
+ modules.RegisterChild("runProxy", "", runProxy)
+}