veyron/runtimes/google/rt: Add code to synchronously clean up the runtime.

Change-Id: I44f67ce8e3eeb7756236943bbf2ee70a7a91cf4b
diff --git a/profiles/genericinit.go b/profiles/genericinit.go
index baf2f94..35c955a 100644
--- a/profiles/genericinit.go
+++ b/profiles/genericinit.go
@@ -15,21 +15,19 @@
 	veyron2.RegisterProfileInit(Init)
 }
 
-func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, error) {
-	var err error
+func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, veyron2.Shutdown, error) {
 	runtime := &grt.RuntimeX{}
-	ctx, err = runtime.Init(ctx, nil)
+	ctx, shutdown, err := runtime.Init(ctx, nil)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
 	ac := appcycle.New()
 	ctx = runtime.SetAppCycle(ctx, ac)
-	if done := ctx.Done(); done != nil {
-		go func() {
-			<-done
-			ac.Shutdown()
-		}()
+
+	profileShutdown := func() {
+		shutdown()
+		ac.Shutdown()
 	}
-	return runtime, ctx, nil
+	return runtime, ctx, profileShutdown, nil
 }
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 19c9a37..c91777c 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -30,12 +30,13 @@
 	veyron2.RegisterProfileInit(Init)
 }
 
-func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, error) {
+func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, veyron2.Shutdown, error) {
 	var err error
+	var shutdown veyron2.Shutdown
 	runtime := &grt.RuntimeX{}
-	ctx, err = runtime.Init(ctx, nil)
+	ctx, shutdown, err = runtime.Init(ctx, nil)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 	log := runtime.GetLogger(ctx)
 
@@ -49,12 +50,6 @@
 
 	ac := appcycle.New()
 	ctx = runtime.SetAppCycle(ctx, ac)
-	if done := ctx.Done(); done != nil {
-		go func() {
-			<-done
-			ac.Shutdown()
-		}()
-	}
 
 	// Our address is private, so we test for running on GCE and for its 1:1 NAT
 	// configuration. GCEPublicAddress returns a non-nil addr if we are running on GCE.
@@ -63,10 +58,15 @@
 			listenSpec.AddressChooser = func(string, []ipc.Address) ([]ipc.Address, error) {
 				return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
 			}
-			return runtime, ctx, nil
+			return runtime, ctx, shutdown, nil
 		}
 	}
 	listenSpec.AddressChooser = internal.IPAddressChooser
 	ctx = runtime.SetListenSpec(ctx, listenSpec)
-	return runtime, ctx, nil
+
+	profileShutdown := func() {
+		shutdown()
+		ac.Shutdown()
+	}
+	return runtime, ctx, profileShutdown, nil
 }
diff --git a/runtimes/google/rt/rtx_test.go b/runtimes/google/rt/rtx_test.go
index 212075d..06cdc56 100644
--- a/runtimes/google/rt/rtx_test.go
+++ b/runtimes/google/rt/rtx_test.go
@@ -30,17 +30,17 @@
 }
 
 // initForTest creates a context for use in a test.
-func initForTest() (*context.T, context.CancelFunc) {
+func initForTest() (*context.T, veyron2.Shutdown) {
+	ctx, cancel := context.WithCancel(nil)
 	r := &rt.RuntimeX{}
-	var ctx *context.T
-	var cancel context.CancelFunc
-	var err error
-	ctx, cancel = context.WithCancel(ctx)
-	ctx, err = r.Init(ctx, nil)
+	ctx, shutdown, err := r.Init(ctx, nil)
 	if err != nil {
 		panic(err)
 	}
-	return ctx, cancel
+	return ctx, func() {
+		cancel()
+		shutdown()
+	}
 }
 
 func TestHelperProcessX(t *testing.T) {
@@ -48,8 +48,8 @@
 }
 
 func TestInitX(t *testing.T) {
-	ctx, cancel := initForTest()
-	defer cancel()
+	ctx, shutdown := initForTest()
+	defer shutdown()
 
 	l := veyron2.GetLogger(ctx)
 	fmt.Println(l)
@@ -74,8 +74,8 @@
 }
 
 func childX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
-	ctx, cancel := veyron2.Init()
-	defer cancel()
+	ctx, shutdown := veyron2.Init()
+	defer shutdown()
 
 	logger := veyron2.GetLogger(ctx)
 	vlog.Infof("%s\n", logger)
@@ -140,8 +140,8 @@
 }
 
 func principalX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
-	ctx, cancel := veyron2.Init()
-	defer cancel()
+	ctx, shutdown := veyron2.Init()
+	defer shutdown()
 
 	p := veyron2.GetPrincipal(ctx)
 	if err := validatePrincipal(p); err != nil {
@@ -154,8 +154,8 @@
 // Runner runs a principal as a subprocess and reports back with its
 // own security info and it's childs.
 func runnerX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
-	ctx, cancel := veyron2.Init()
-	defer cancel()
+	ctx, shutdown := veyron2.Init()
+	defer shutdown()
 
 	p := veyron2.GetPrincipal(ctx)
 	if err := validatePrincipal(p); err != nil {
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
index ef249cc..1dce0b1 100644
--- a/runtimes/google/rt/runtimex.go
+++ b/runtimes/google/rt/runtimex.go
@@ -57,7 +57,10 @@
 var signals chan os.Signal
 
 func init() {
-	veyron2.RegisterRuntime("google", &RuntimeX{})
+	// TODO(mattr): Remove this hacky registration.
+	r := &RuntimeX{}
+	r.wait = sync.NewCond(&r.mu)
+	veyron2.RegisterRuntime("google", r)
 	runtimeFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime)
 }
 
@@ -84,9 +87,61 @@
 // RuntimeX implements the veyron2.RuntimeX interface.  It is stateless.
 // Please see the interface definition for documentation of the
 // individiual methods.
-type RuntimeX struct{}
+type RuntimeX struct {
+	mu       sync.Mutex
+	closed   bool
+	children int
+	wait     *sync.Cond
+}
 
-func (r *RuntimeX) Init(ctx *context.T, protocols []string) (*context.T, error) {
+func (r *RuntimeX) addChild(ctx *context.T, stop func()) error {
+	// TODO(mattr): Remove this hack once the transition is over.
+	if r == nil {
+		return nil
+	}
+	if r.wait == nil {
+		panic("no wait???")
+	}
+	r.mu.Lock()
+	if r.closed {
+		r.mu.Unlock()
+		stop()
+		return fmt.Errorf("The runtime has already been shutdown.")
+	}
+	r.children++
+	r.mu.Unlock()
+
+	if done := ctx.Done(); done != nil {
+		go func() {
+			<-done
+			stop()
+			r.mu.Lock()
+			r.children--
+			if r.children == 0 {
+				r.wait.Broadcast()
+			}
+			r.mu.Unlock()
+		}()
+	}
+	return nil
+}
+
+func (r *RuntimeX) cancel() {
+	// TODO(mattr): Remove this hack once the transition is over.
+	if r == nil {
+		return
+	}
+	r.mu.Lock()
+	r.closed = true
+	for r.children > 0 {
+		r.wait.Wait()
+	}
+	r.mu.Unlock()
+	vlog.FlushLog()
+}
+
+func (r *RuntimeX) Init(ctx *context.T, protocols []string) (*context.T, veyron2.Shutdown, error) {
+	r.wait = sync.NewCond(&r.mu)
 	handle, err := exec.GetChildHandle()
 	switch err {
 	case exec.ErrNoVersion:
@@ -96,7 +151,7 @@
 		// The process has been started through the veyron exec
 		// library.
 	default:
-		return nil, err
+		return nil, nil, err
 	}
 
 	r.initLogging(ctx)
@@ -135,23 +190,23 @@
 	// Setup the initial trace.
 	ctx, err = ivtrace.Init(ctx, flags.Vtrace)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	ctx, _ = vtrace.SetNewTrace(ctx)
 
 	// Enable signal handling.
-	initSignalHandling(ctx)
+	r.initSignalHandling(ctx)
 
 	// Set the initial namespace.
 	ctx, _, err = r.setNewNamespace(ctx, flags.NamespaceRoots...)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Set the initial stream manager.
 	ctx, _, err = r.setNewStreamManager(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// The clinet we attach here is incomplete (has a nil principal) and only works
@@ -159,46 +214,40 @@
 	// After security is initialized we will attach a real client.
 	ctx, _, err = r.SetNewClient(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Initialize security.
 	principal, err := initSecurity(ctx, handle, flags.Credentials)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	ctx = context.WithValue(ctx, principalKey, principal)
 
 	// Set up secure client.
 	ctx, _, err = r.SetNewClient(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Initialize management.
 	if err := initMgmt(ctx, r.GetAppCycle(ctx), handle); err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// TODO(suharshs,mattr): Go through the rt.Cleanup function and make sure everything
 	// gets cleaned up.
 
-	return ctx, nil
+	return ctx, r.cancel, nil
 }
 
 // initLogging configures logging for the runtime. It needs to be called after
 // flag.Parse and after signal handling has been initialized.
-func (*RuntimeX) initLogging(ctx *context.T) error {
-	if done := ctx.Done(); done != nil {
-		go func() {
-			<-done
-			vlog.FlushLog()
-		}()
-	}
+func (r *RuntimeX) initLogging(ctx *context.T) error {
 	return vlog.ConfigureLibraryLoggerFromFlags()
 }
 
-func initSignalHandling(ctx *context.T) {
+func (r *RuntimeX) initSignalHandling(ctx *context.T) {
 	// TODO(caprita): Given that our device manager implementation is to
 	// kill all child apps when the device manager dies, we should
 	// enable SIGHUP on apps by default.
@@ -210,16 +259,17 @@
 	signal.Notify(signals, syscall.SIGHUP)
 	go func() {
 		for {
-			vlog.Infof("Received signal %v", <-signals)
+			sig, ok := <-signals
+			if !ok {
+				break
+			}
+			vlog.Infof("Received signal %v", sig)
 		}
 	}()
-
-	if done := ctx.Done(); done != nil {
-		go func() {
-			<-done
-			signal.Stop(signals)
-		}()
-	}
+	r.addChild(ctx, func() {
+		signal.Stop(signals)
+		close(signals)
+	})
 }
 
 func (*RuntimeX) NewEndpoint(ep string) (naming.Endpoint, error) {
@@ -246,16 +296,18 @@
 		otherOpts = append(otherOpts, iipc.PreferredServerResolveProtocols(protocols))
 	}
 	server, err := iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
-	if done := ctx.Done(); err == nil && done != nil {
-		// Arrange to clean up the server when the parent context is canceled.
-		// TODO(mattr): Should we actually do this?  Or just have users clean
-		// their own servers up manually?
-		go func() {
-			<-done
-			server.Stop()
-		}()
+	if err != nil {
+		return nil, err
 	}
-	return server, err
+	stop := func() {
+		if err := server.Stop(); err != nil {
+			vlog.Errorf("A server could not be stopped: %v", err)
+		}
+	}
+	if err = r.addChild(ctx, stop); err != nil {
+		return nil, err
+	}
+	return server, nil
 }
 
 func (r *RuntimeX) setNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
@@ -264,17 +316,11 @@
 		return ctx, nil, err
 	}
 	sm := imanager.InternalNew(rid)
-	ctx = context.WithValue(ctx, streamManagerKey, sm)
-
-	// Arrange for the manager to shut itself down when the context is canceled.
-	if done := ctx.Done(); done != nil {
-		go func() {
-			<-done
-			sm.Shutdown()
-		}()
+	newctx := context.WithValue(ctx, streamManagerKey, sm)
+	if err = r.addChild(ctx, sm.Shutdown); err != nil {
+		return ctx, nil, err
 	}
-
-	return ctx, sm, nil
+	return newctx, sm, nil
 }
 
 func (r *RuntimeX) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
@@ -288,7 +334,6 @@
 	if err != nil {
 		return ctx, nil, err
 	}
-
 	return newctx, sm, nil
 }
 
@@ -325,7 +370,7 @@
 	return p
 }
 
-func (*RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
+func (r *RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
 	otherOpts := append([]ipc.ClientOpt{}, opts...)
 
 	// TODO(mattr, suharshs):  Currently there are a lot of things that can come in as opts.
@@ -342,16 +387,14 @@
 	}
 
 	client, err := iipc.InternalNewClient(sm, ns, otherOpts...)
-	if err == nil {
-		if done := ctx.Done(); done != nil {
-			go func() {
-				<-done
-				client.Close()
-			}()
-		}
-		ctx = SetClient(ctx, client)
+	if err != nil {
+		return ctx, nil, err
 	}
-	return ctx, client, err
+	newctx := SetClient(ctx, client)
+	if err = r.addChild(ctx, client.Close); err != nil {
+		return ctx, nil, err
+	}
+	return newctx, client, err
 }
 
 func (*RuntimeX) GetClient(ctx *context.T) ipc.Client {
@@ -436,7 +479,8 @@
 	return profile
 }
 
-func (*RuntimeX) SetAppCycle(ctx *context.T, appCycle veyron2.AppCycle) *context.T {
+// SetAppCycle attaches an appCycle to the context.
+func (r *RuntimeX) SetAppCycle(ctx *context.T, appCycle veyron2.AppCycle) *context.T {
 	return context.WithValue(ctx, appCycleKey, appCycle)
 }
 
diff --git a/runtimes/google/rt/runtimex_test.go b/runtimes/google/rt/runtimex_test.go
index a78ad3a..6beb75d 100644
--- a/runtimes/google/rt/runtimex_test.go
+++ b/runtimes/google/rt/runtimex_test.go
@@ -3,6 +3,7 @@
 import (
 	"testing"
 
+	"v.io/core/veyron2"
 	"v.io/core/veyron2/context"
 
 	"v.io/core/veyron/runtimes/google/rt"
@@ -10,34 +11,32 @@
 )
 
 // InitForTest creates a context for use in a test.
-func InitForTest(t *testing.T) (*context.T, context.CancelFunc) {
+func InitForTest(t *testing.T) (*rt.RuntimeX, *context.T, veyron2.Shutdown) {
+	ctx, cancel := context.WithCancel(nil)
 	r := &rt.RuntimeX{}
-	var ctx *context.T
-	var cancel context.CancelFunc
-	var err error
-	ctx, cancel = context.WithCancel(ctx)
-	ctx, err = r.Init(ctx, nil)
+	ctx, shutdown, err := r.Init(ctx, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	return ctx, cancel
+	return r, ctx, func() {
+		cancel()
+		shutdown()
+	}
 }
 
 func TestNewServer(t *testing.T) {
-	ctx, cancel := InitForTest(t)
-	defer cancel()
+	r, ctx, shutdown := InitForTest(t)
+	defer shutdown()
 
-	r := &rt.RuntimeX{}
 	if s, err := r.NewServer(ctx); err != nil || s == nil {
 		t.Fatalf("Could not create server: %v", err)
 	}
 }
 
 func TestStreamManager(t *testing.T) {
-	ctx, cancel := InitForTest(t)
-	defer cancel()
+	r, ctx, shutdown := InitForTest(t)
+	defer shutdown()
 
-	r := &rt.RuntimeX{}
 	orig := r.GetStreamManager(ctx)
 
 	c2, sm, err := r.SetNewStreamManager(ctx)
@@ -56,10 +55,8 @@
 }
 
 func TestPrincipal(t *testing.T) {
-	ctx, cancel := InitForTest(t)
-	defer cancel()
-
-	r := &rt.RuntimeX{}
+	r, ctx, shutdown := InitForTest(t)
+	defer shutdown()
 
 	p2, err := security.NewPrincipal()
 	if err != nil {
@@ -78,10 +75,9 @@
 }
 
 func TestClient(t *testing.T) {
-	ctx, cancel := InitForTest(t)
-	defer cancel()
+	r, ctx, shutdown := InitForTest(t)
+	defer shutdown()
 
-	r := &rt.RuntimeX{}
 	orig := r.GetClient(ctx)
 
 	c2, client, err := r.SetNewClient(ctx)
@@ -100,10 +96,9 @@
 }
 
 func TestNamespace(t *testing.T) {
-	ctx, cancel := InitForTest(t)
-	defer cancel()
+	r, ctx, shutdown := InitForTest(t)
+	defer shutdown()
 
-	r := &rt.RuntimeX{}
 	orig := r.GetNamespace(ctx)
 
 	newroots := []string{"/newroot1", "/newroot2"}
diff --git a/runtimes/google/rt/signal.go b/runtimes/google/rt/signal.go
index 8ca4643..da7e1db 100644
--- a/runtimes/google/rt/signal.go
+++ b/runtimes/google/rt/signal.go
@@ -20,11 +20,16 @@
 	signal.Notify(r.signals, syscall.SIGHUP)
 	go func() {
 		for {
-			vlog.Infof("Received signal %v", <-r.signals)
+			sig, ok := <-r.signals
+			if !ok {
+				break
+			}
+			vlog.Infof("Received signal %v", sig)
 		}
 	}()
 }
 
 func (r *vrt) shutdownSignalHandling() {
 	signal.Stop(r.signals)
+	close(r.signals)
 }