veyron/runtimes/google/ipc: move to using modules instead of blackbox.

Change-Id: I12ef958cc04ed91c6315ce402d5b470ac323eb6c
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 0997553..4e4a5bf 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -13,10 +13,8 @@
 
 	"veyron.io/veyron/veyron/lib/netstate"
 	_ "veyron.io/veyron/veyron/lib/testutil"
-	"veyron.io/veyron/veyron/lib/testutil/blackbox"
 	"veyron.io/veyron/veyron/profiles"
 	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
-	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/version"
@@ -978,56 +976,6 @@
 	}
 }
 
-// TestReconnect verifies that the client transparently re-establishes the
-// connection to the server if the server dies and comes back (on the same
-// endpoint).
-func TestReconnect(t *testing.T) {
-	b := createBundle(t, sectest.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
-	defer b.cleanup(t)
-	server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0")
-	server.Cmd.Start()
-	addr, err := server.ReadLineFromChild()
-	if err != nil {
-		t.Fatalf("Failed to read server address from process: %v", err)
-	}
-	ep, err := inaming.NewEndpoint(addr)
-	if err != nil {
-		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
-	}
-	serverName := naming.JoinAddressName(ep.String(), "suffix")
-	makeCall := func() (string, error) {
-		call, err := b.client.StartCall(testContext(), serverName, "Echo", []interface{}{"bratman"})
-		if err != nil {
-			return "", err
-		}
-		var result string
-		if err = call.Finish(&result); err != nil {
-			return "", err
-		}
-		return result, nil
-	}
-	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
-	if result, err := makeCall(); err != nil || result != expected {
-		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
-	}
-	// Kill the server, verify client can't talk to it anymore.
-	server.Cleanup()
-	if _, err := makeCall(); err == nil {
-		t.Fatal("Expected call to fail since server is dead")
-	}
-	// Resurrect the server with the same address, verify client
-	// re-establishes the connection.
-	server = blackbox.HelperCommand(t, "runServer", addr)
-	defer server.Cleanup()
-	server.Cmd.Start()
-	if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
-		t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
-	}
-	if result, err := makeCall(); err != nil || result != expected {
-		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
-	}
-}
-
 func TestPreferredAddress(t *testing.T) {
 	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
 	defer sm.Shutdown()
@@ -1096,154 +1044,6 @@
 	}
 }
 
-type proxyHandle struct {
-	ns      naming.Namespace
-	process *blackbox.Child
-	mount   string
-}
-
-func (h *proxyHandle) Start(t *testing.T) error {
-	h.process = blackbox.HelperCommand(t, "runProxy")
-	h.process.Cmd.Start()
-	var err error
-	if h.mount, err = h.process.ReadLineFromChild(); err != nil {
-		return err
-	}
-	if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (h *proxyHandle) Stop() error {
-	if h.process == nil {
-		return nil
-	}
-	h.process.Cleanup()
-	h.process = nil
-	if len(h.mount) == 0 {
-		return nil
-	}
-	return h.ns.Unmount(testContext(), "proxy", h.mount)
-}
-
-func TestProxy(t *testing.T) {
-	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
-	ns := tnaming.NewSimpleNamespace()
-	client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-	server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer server.Stop()
-
-	name := "mountpoint/server/suffix"
-	makeCall := func() (string, error) {
-		call, err := client.StartCall(testContext(), name, "Echo", []interface{}{"batman"})
-		if err != nil {
-			return "", err
-		}
-		var result string
-		if err = call.Finish(&result); err != nil {
-			return "", err
-		}
-		return result, nil
-	}
-	proxy := &proxyHandle{ns: ns}
-	if err := proxy.Start(t); err != nil {
-		t.Fatal(err)
-	}
-	defer proxy.Stop()
-	spec := *profiles.LocalListenSpec
-	spec.Proxy = "proxy"
-	if _, err := server.ListenX(&spec); err != nil {
-		t.Fatal(err)
-	}
-	if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
-		t.Fatal(err)
-	}
-	verifyMount(t, ns, name)
-	// Proxied endpoint should be published and RPC should succeed (through proxy)
-	const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
-	if result, err := makeCall(); result != expected || err != nil {
-		t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
-	}
-
-	// Proxy dies, calls should fail and the name should be unmounted.
-	if err := proxy.Stop(); err != nil {
-		t.Fatal(err)
-	}
-	if result, err := makeCall(); err == nil {
-		t.Fatalf(`Got (%v, %v) want ("", <non-nil>) as proxy is down`, result, err)
-	}
-	for {
-		if _, err := ns.Resolve(testContext(), name); err != nil {
-			break
-		}
-	}
-	verifyMountMissing(t, ns, name)
-
-	// Proxy restarts, calls should eventually start succeeding.
-	if err := proxy.Start(t); err != nil {
-		t.Fatal(err)
-	}
-	for {
-		if result, err := makeCall(); err == nil {
-			if result != expected {
-				t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
-			}
-			break
-		}
-	}
-}
-
-func runServer(argv []string) {
-	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
-	ns := tnaming.NewSimpleNamespace()
-	server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
-	if err != nil {
-		vlog.Fatalf("InternalNewServer failed: %v", err)
-	}
-	disp := testServerDisp{new(testServer)}
-	if err := server.Serve("server", disp); err != nil {
-		vlog.Fatalf("server.Register failed: %v", err)
-	}
-	spec := *profiles.LocalListenSpec
-	spec.Address = argv[0]
-	ep, err := server.ListenX(&spec)
-	if err != nil {
-		vlog.Fatalf("server.Listen failed: %v", err)
-	}
-	fmt.Println(ep.Addr())
-	// Live forever (parent process should explicitly kill us).
-	<-make(chan struct{})
-}
-
-func runProxy([]string) {
-	rid, err := naming.NewRoutingID()
-	if err != nil {
-		vlog.Fatal(err)
-	}
-	proxy, err := proxy.New(rid, sectest.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
-	if err != nil {
-		vlog.Fatal(err)
-	}
-	fmt.Println("/" + proxy.Endpoint().String())
-	<-make(chan struct{})
-}
-
-// Required by blackbox framework.
-func TestHelperProcess(t *testing.T) {
-	blackbox.HelperProcess(t)
-}
-
 func init() {
-	blackbox.CommandTable["runServer"] = runServer
-	blackbox.CommandTable["runProxy"] = runProxy
-
 	vom.Register(fakeTimeCaveat(0))
 }
diff --git a/runtimes/google/ipc/roaming_test.go b/runtimes/google/ipc/roaming_test.go
deleted file mode 100644
index 57437bd..0000000
--- a/runtimes/google/ipc/roaming_test.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package ipc_test
-
-/*
-func startRoamingServer() {
-	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
-	ns := newNamespace()
-}
-
-func TestRoamingListen(t *testing.T) {
-	r, err := rt.New()
-	if err != nil {
-		t.Errorf("unexpected error: %s", err)
-	}
-	publisher := config.CreateStream("test")
-
-	server, err := r.NewServer()
-	defer server.Stop()
-
-	if err != nil {
-		t.Errorf("unexpected error: %s", err)
-	}
-	server.RoamingListen(r.Publisher())
-	server.Serve(nil)
-}
-*/
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index ddefad2..608424f 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -384,12 +384,17 @@
 		// (2) Reconnect to the proxy unless the server has been stopped
 		backoff := min
 		ln = nil
+		// TODO(ashankar,cnicolaou): this code is way too confusing and should
+		// be cleaned up.
 		for ln == nil {
 			select {
 			case <-time.After(backoff):
 				resolved, err := s.resolveToAddress(proxy)
 				if err != nil {
 					vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
+					if backoff = backoff * 2; backoff > max {
+						backoff = max
+					}
 					break
 				}
 				var ep naming.Endpoint
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
new file mode 100644
index 0000000..5842e50
--- /dev/null
+++ b/runtimes/google/ipc/server_test.go
@@ -0,0 +1,241 @@
+package ipc
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"veyron.io/veyron/veyron2/naming"
+
+	"veyron.io/veyron/veyron/lib/expect"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
+
+	"veyron.io/veyron/veyron/lib/modules"
+	"veyron.io/veyron/veyron/profiles"
+	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
+	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
+	tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
+)
+
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint).
+func TestReconnect(t *testing.T) {
+	b := createBundle(t, sectest.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
+	defer b.cleanup(t)
+	sh := modules.NewShell()
+	defer sh.Cleanup(os.Stderr, os.Stderr)
+	server, err := sh.Start("runServer", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	session := expect.NewSession(t, server.Stdout(), time.Minute)
+	addr := session.ReadLine()
+	ep, err := inaming.NewEndpoint(addr)
+	if err != nil {
+		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+	}
+	serverName := naming.JoinAddressName(ep.String(), "suffix")
+	makeCall := func() (string, error) {
+		ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
+		call, err := b.client.StartCall(ctx, serverName, "Echo", []interface{}{"bratman"})
+		if err != nil {
+			return "", fmt.Errorf("START: %s", err)
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+	// Kill the server, verify client can't talk to it anymore.
+	server.Shutdown(nil, nil)
+	if _, err := makeCall(); err == nil || !strings.HasPrefix(err.Error(), "START") {
+		t.Fatalf(`Got (%v) want ("START: <err>") as server is down`, err)
+	}
+
+	// Resurrect the server with the same address, verify client
+	// re-establishes the connection.
+	server, err = sh.Start("runServer", addr)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	session = expect.NewSession(t, server.Stdout(), time.Minute)
+	defer server.Shutdown(nil, nil)
+	session.Expect(addr)
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+}
+
+type proxyHandle struct {
+	ns      naming.Namespace
+	process modules.Handle
+	session *expect.Session
+	mount   string
+}
+
+func (h *proxyHandle) Start(t *testing.T) error {
+	sh := modules.NewShell()
+	server, err := sh.Start("runProxy")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	h.process = server
+	h.session = expect.NewSession(t, server.Stdout(), time.Minute)
+	h.mount = h.session.ReadLine()
+	if err := h.session.Error(); err != nil {
+		return err
+	}
+	if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (h *proxyHandle) Stop() error {
+	if h.process == nil {
+		return nil
+	}
+	h.process.Shutdown(os.Stderr, os.Stderr)
+	h.process = nil
+	if len(h.mount) == 0 {
+		return nil
+	}
+	return h.ns.Unmount(testContext(), "proxy", h.mount)
+}
+
+func TestProxy(t *testing.T) {
+	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+	ns := tnaming.NewSimpleNamespace()
+	client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer server.Stop()
+
+	name := "mountpoint/server/suffix"
+	makeCall := func() (string, error) {
+		ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
+		// Let's fail fast so that the tests don't take as long to run.
+		call, err := client.StartCall(ctx, name, "Echo", []interface{}{"batman"})
+		if err != nil {
+			// proxy is down, we should return here/.... prepend
+			// the error with a well known string so that we can test for that.
+			return "", fmt.Errorf("RESOLVE: %s", err)
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	proxy := &proxyHandle{ns: ns}
+	if err := proxy.Start(t); err != nil {
+		t.Fatal(err)
+	}
+	defer proxy.Stop()
+	spec := *profiles.LocalListenSpec
+	spec.Proxy = "proxy"
+	if _, err := server.ListenX(&spec); err != nil {
+		t.Fatal(err)
+	}
+	if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
+		t.Fatal(err)
+	}
+	verifyMount(t, ns, name)
+	// Proxied endpoint should be published and RPC should succeed (through proxy)
+	const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
+	if result, err := makeCall(); result != expected || err != nil {
+		t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
+	}
+	// Proxy dies, calls should fail and the name should be unmounted.
+	if err := proxy.Stop(); err != nil {
+		t.Fatal(err)
+	}
+	if result, err := makeCall(); err == nil || !strings.HasPrefix(err.Error(), "RESOLVE") {
+		t.Fatalf(`Got (%v, %v) want ("", "RESOLVE: <err>") as proxy is down`, result, err)
+	}
+	for {
+		if _, err := ns.Resolve(testContext(), name); err != nil {
+			break
+		}
+	}
+	verifyMountMissing(t, ns, name)
+
+	// Proxy restarts, calls should eventually start succeeding.
+	if err := proxy.Start(t); err != nil {
+		t.Fatal(err)
+	}
+
+	for {
+		if result, err := makeCall(); err == nil {
+			if result != expected {
+				t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
+			}
+			break
+		}
+	}
+}
+
+func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+	ns := tnaming.NewSimpleNamespace()
+	server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+	if err != nil {
+		return fmt.Errorf("InternalNewServer failed: %v", err)
+	}
+	disp := testServerDisp{new(testServer)}
+	if err := server.Serve("server", disp); err != nil {
+		return fmt.Errorf("server.Register failed: %v", err)
+	}
+	spec := *profiles.LocalListenSpec
+	spec.Address = args[1]
+	ep, err := server.ListenX(&spec)
+	if err != nil {
+		return fmt.Errorf("server.Listen failed: %v", err)
+	}
+	fmt.Fprintf(stdout, "%s\n", ep.Addr())
+	// parent process should explicitly shut us down by closing stdin.
+	modules.WaitForEOF(stdin)
+	return nil
+}
+
+func runProxy(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	rid, err := naming.NewRoutingID()
+	if err != nil {
+		return err
+	}
+	proxy, err := proxy.New(rid, sectest.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
+	if err != nil {
+		return err
+	}
+	fmt.Fprintf(stdout, "/%s\n", proxy.Endpoint().String())
+	// parent process should explicitly shut us down by closing stdin.
+	modules.WaitForEOF(stdin)
+	return nil
+}
+
+// Required by modules framework.
+func TestHelperProcess(t *testing.T) {
+	modules.DispatchInTest()
+}
+
+func init() {
+	modules.RegisterChild("runServer", "[address]", runServer)
+	modules.RegisterChild("runProxy", "", runProxy)
+}