veyron/runtimes/google/rt: Add code to synchronously clean up the runtime.
Change-Id: I44f67ce8e3eeb7756236943bbf2ee70a7a91cf4b
diff --git a/profiles/genericinit.go b/profiles/genericinit.go
index baf2f94..35c955a 100644
--- a/profiles/genericinit.go
+++ b/profiles/genericinit.go
@@ -15,21 +15,19 @@
veyron2.RegisterProfileInit(Init)
}
-func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, error) {
- var err error
+func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, veyron2.Shutdown, error) {
runtime := &grt.RuntimeX{}
- ctx, err = runtime.Init(ctx, nil)
+ ctx, shutdown, err := runtime.Init(ctx, nil)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
ac := appcycle.New()
ctx = runtime.SetAppCycle(ctx, ac)
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- ac.Shutdown()
- }()
+
+ profileShutdown := func() {
+ shutdown()
+ ac.Shutdown()
}
- return runtime, ctx, nil
+ return runtime, ctx, profileShutdown, nil
}
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 19c9a37..c91777c 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -30,12 +30,13 @@
veyron2.RegisterProfileInit(Init)
}
-func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, error) {
+func Init(ctx *context.T) (veyron2.RuntimeX, *context.T, veyron2.Shutdown, error) {
var err error
+ var shutdown veyron2.Shutdown
runtime := &grt.RuntimeX{}
- ctx, err = runtime.Init(ctx, nil)
+ ctx, shutdown, err = runtime.Init(ctx, nil)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
log := runtime.GetLogger(ctx)
@@ -49,12 +50,6 @@
ac := appcycle.New()
ctx = runtime.SetAppCycle(ctx, ac)
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- ac.Shutdown()
- }()
- }
// Our address is private, so we test for running on GCE and for its 1:1 NAT
// configuration. GCEPublicAddress returns a non-nil addr if we are running on GCE.
@@ -63,10 +58,15 @@
listenSpec.AddressChooser = func(string, []ipc.Address) ([]ipc.Address, error) {
return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
}
- return runtime, ctx, nil
+ return runtime, ctx, shutdown, nil
}
}
listenSpec.AddressChooser = internal.IPAddressChooser
ctx = runtime.SetListenSpec(ctx, listenSpec)
- return runtime, ctx, nil
+
+ profileShutdown := func() {
+ shutdown()
+ ac.Shutdown()
+ }
+ return runtime, ctx, profileShutdown, nil
}
diff --git a/runtimes/google/rt/rtx_test.go b/runtimes/google/rt/rtx_test.go
index 212075d..06cdc56 100644
--- a/runtimes/google/rt/rtx_test.go
+++ b/runtimes/google/rt/rtx_test.go
@@ -30,17 +30,17 @@
}
// initForTest creates a context for use in a test.
-func initForTest() (*context.T, context.CancelFunc) {
+func initForTest() (*context.T, veyron2.Shutdown) {
+ ctx, cancel := context.WithCancel(nil)
r := &rt.RuntimeX{}
- var ctx *context.T
- var cancel context.CancelFunc
- var err error
- ctx, cancel = context.WithCancel(ctx)
- ctx, err = r.Init(ctx, nil)
+ ctx, shutdown, err := r.Init(ctx, nil)
if err != nil {
panic(err)
}
- return ctx, cancel
+ return ctx, func() {
+ cancel()
+ shutdown()
+ }
}
func TestHelperProcessX(t *testing.T) {
@@ -48,8 +48,8 @@
}
func TestInitX(t *testing.T) {
- ctx, cancel := initForTest()
- defer cancel()
+ ctx, shutdown := initForTest()
+ defer shutdown()
l := veyron2.GetLogger(ctx)
fmt.Println(l)
@@ -74,8 +74,8 @@
}
func childX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- ctx, cancel := veyron2.Init()
- defer cancel()
+ ctx, shutdown := veyron2.Init()
+ defer shutdown()
logger := veyron2.GetLogger(ctx)
vlog.Infof("%s\n", logger)
@@ -140,8 +140,8 @@
}
func principalX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- ctx, cancel := veyron2.Init()
- defer cancel()
+ ctx, shutdown := veyron2.Init()
+ defer shutdown()
p := veyron2.GetPrincipal(ctx)
if err := validatePrincipal(p); err != nil {
@@ -154,8 +154,8 @@
// Runner runs a principal as a subprocess and reports back with its
// own security info and it's childs.
func runnerX(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- ctx, cancel := veyron2.Init()
- defer cancel()
+ ctx, shutdown := veyron2.Init()
+ defer shutdown()
p := veyron2.GetPrincipal(ctx)
if err := validatePrincipal(p); err != nil {
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
index ef249cc..1dce0b1 100644
--- a/runtimes/google/rt/runtimex.go
+++ b/runtimes/google/rt/runtimex.go
@@ -57,7 +57,10 @@
var signals chan os.Signal
func init() {
- veyron2.RegisterRuntime("google", &RuntimeX{})
+ // TODO(mattr): Remove this hacky registration.
+ r := &RuntimeX{}
+ r.wait = sync.NewCond(&r.mu)
+ veyron2.RegisterRuntime("google", r)
runtimeFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime)
}
@@ -84,9 +87,61 @@
// RuntimeX implements the veyron2.RuntimeX interface. It is stateless.
// Please see the interface definition for documentation of the
// individiual methods.
-type RuntimeX struct{}
+type RuntimeX struct {
+ mu sync.Mutex
+ closed bool
+ children int
+ wait *sync.Cond
+}
-func (r *RuntimeX) Init(ctx *context.T, protocols []string) (*context.T, error) {
+func (r *RuntimeX) addChild(ctx *context.T, stop func()) error {
+ // TODO(mattr): Remove this hack once the transition is over.
+ if r == nil {
+ return nil
+ }
+ if r.wait == nil {
+ panic("no wait???")
+ }
+ r.mu.Lock()
+ if r.closed {
+ r.mu.Unlock()
+ stop()
+ return fmt.Errorf("The runtime has already been shutdown.")
+ }
+ r.children++
+ r.mu.Unlock()
+
+ if done := ctx.Done(); done != nil {
+ go func() {
+ <-done
+ stop()
+ r.mu.Lock()
+ r.children--
+ if r.children == 0 {
+ r.wait.Broadcast()
+ }
+ r.mu.Unlock()
+ }()
+ }
+ return nil
+}
+
+func (r *RuntimeX) cancel() {
+ // TODO(mattr): Remove this hack once the transition is over.
+ if r == nil {
+ return
+ }
+ r.mu.Lock()
+ r.closed = true
+ for r.children > 0 {
+ r.wait.Wait()
+ }
+ r.mu.Unlock()
+ vlog.FlushLog()
+}
+
+func (r *RuntimeX) Init(ctx *context.T, protocols []string) (*context.T, veyron2.Shutdown, error) {
+ r.wait = sync.NewCond(&r.mu)
handle, err := exec.GetChildHandle()
switch err {
case exec.ErrNoVersion:
@@ -96,7 +151,7 @@
// The process has been started through the veyron exec
// library.
default:
- return nil, err
+ return nil, nil, err
}
r.initLogging(ctx)
@@ -135,23 +190,23 @@
// Setup the initial trace.
ctx, err = ivtrace.Init(ctx, flags.Vtrace)
if err != nil {
- return nil, err
+ return nil, nil, err
}
ctx, _ = vtrace.SetNewTrace(ctx)
// Enable signal handling.
- initSignalHandling(ctx)
+ r.initSignalHandling(ctx)
// Set the initial namespace.
ctx, _, err = r.setNewNamespace(ctx, flags.NamespaceRoots...)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Set the initial stream manager.
ctx, _, err = r.setNewStreamManager(ctx)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// The clinet we attach here is incomplete (has a nil principal) and only works
@@ -159,46 +214,40 @@
// After security is initialized we will attach a real client.
ctx, _, err = r.SetNewClient(ctx)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Initialize security.
principal, err := initSecurity(ctx, handle, flags.Credentials)
if err != nil {
- return nil, err
+ return nil, nil, err
}
ctx = context.WithValue(ctx, principalKey, principal)
// Set up secure client.
ctx, _, err = r.SetNewClient(ctx)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Initialize management.
if err := initMgmt(ctx, r.GetAppCycle(ctx), handle); err != nil {
- return nil, err
+ return nil, nil, err
}
// TODO(suharshs,mattr): Go through the rt.Cleanup function and make sure everything
// gets cleaned up.
- return ctx, nil
+ return ctx, r.cancel, nil
}
// initLogging configures logging for the runtime. It needs to be called after
// flag.Parse and after signal handling has been initialized.
-func (*RuntimeX) initLogging(ctx *context.T) error {
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- vlog.FlushLog()
- }()
- }
+func (r *RuntimeX) initLogging(ctx *context.T) error {
return vlog.ConfigureLibraryLoggerFromFlags()
}
-func initSignalHandling(ctx *context.T) {
+func (r *RuntimeX) initSignalHandling(ctx *context.T) {
// TODO(caprita): Given that our device manager implementation is to
// kill all child apps when the device manager dies, we should
// enable SIGHUP on apps by default.
@@ -210,16 +259,17 @@
signal.Notify(signals, syscall.SIGHUP)
go func() {
for {
- vlog.Infof("Received signal %v", <-signals)
+ sig, ok := <-signals
+ if !ok {
+ break
+ }
+ vlog.Infof("Received signal %v", sig)
}
}()
-
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- signal.Stop(signals)
- }()
- }
+ r.addChild(ctx, func() {
+ signal.Stop(signals)
+ close(signals)
+ })
}
func (*RuntimeX) NewEndpoint(ep string) (naming.Endpoint, error) {
@@ -246,16 +296,18 @@
otherOpts = append(otherOpts, iipc.PreferredServerResolveProtocols(protocols))
}
server, err := iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
- if done := ctx.Done(); err == nil && done != nil {
- // Arrange to clean up the server when the parent context is canceled.
- // TODO(mattr): Should we actually do this? Or just have users clean
- // their own servers up manually?
- go func() {
- <-done
- server.Stop()
- }()
+ if err != nil {
+ return nil, err
}
- return server, err
+ stop := func() {
+ if err := server.Stop(); err != nil {
+ vlog.Errorf("A server could not be stopped: %v", err)
+ }
+ }
+ if err = r.addChild(ctx, stop); err != nil {
+ return nil, err
+ }
+ return server, nil
}
func (r *RuntimeX) setNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
@@ -264,17 +316,11 @@
return ctx, nil, err
}
sm := imanager.InternalNew(rid)
- ctx = context.WithValue(ctx, streamManagerKey, sm)
-
- // Arrange for the manager to shut itself down when the context is canceled.
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- sm.Shutdown()
- }()
+ newctx := context.WithValue(ctx, streamManagerKey, sm)
+ if err = r.addChild(ctx, sm.Shutdown); err != nil {
+ return ctx, nil, err
}
-
- return ctx, sm, nil
+ return newctx, sm, nil
}
func (r *RuntimeX) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
@@ -288,7 +334,6 @@
if err != nil {
return ctx, nil, err
}
-
return newctx, sm, nil
}
@@ -325,7 +370,7 @@
return p
}
-func (*RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
+func (r *RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
otherOpts := append([]ipc.ClientOpt{}, opts...)
// TODO(mattr, suharshs): Currently there are a lot of things that can come in as opts.
@@ -342,16 +387,14 @@
}
client, err := iipc.InternalNewClient(sm, ns, otherOpts...)
- if err == nil {
- if done := ctx.Done(); done != nil {
- go func() {
- <-done
- client.Close()
- }()
- }
- ctx = SetClient(ctx, client)
+ if err != nil {
+ return ctx, nil, err
}
- return ctx, client, err
+ newctx := SetClient(ctx, client)
+ if err = r.addChild(ctx, client.Close); err != nil {
+ return ctx, nil, err
+ }
+ return newctx, client, err
}
func (*RuntimeX) GetClient(ctx *context.T) ipc.Client {
@@ -436,7 +479,8 @@
return profile
}
-func (*RuntimeX) SetAppCycle(ctx *context.T, appCycle veyron2.AppCycle) *context.T {
+// SetAppCycle attaches an appCycle to the context.
+func (r *RuntimeX) SetAppCycle(ctx *context.T, appCycle veyron2.AppCycle) *context.T {
return context.WithValue(ctx, appCycleKey, appCycle)
}
diff --git a/runtimes/google/rt/runtimex_test.go b/runtimes/google/rt/runtimex_test.go
index a78ad3a..6beb75d 100644
--- a/runtimes/google/rt/runtimex_test.go
+++ b/runtimes/google/rt/runtimex_test.go
@@ -3,6 +3,7 @@
import (
"testing"
+ "v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron/runtimes/google/rt"
@@ -10,34 +11,32 @@
)
// InitForTest creates a context for use in a test.
-func InitForTest(t *testing.T) (*context.T, context.CancelFunc) {
+func InitForTest(t *testing.T) (*rt.RuntimeX, *context.T, veyron2.Shutdown) {
+ ctx, cancel := context.WithCancel(nil)
r := &rt.RuntimeX{}
- var ctx *context.T
- var cancel context.CancelFunc
- var err error
- ctx, cancel = context.WithCancel(ctx)
- ctx, err = r.Init(ctx, nil)
+ ctx, shutdown, err := r.Init(ctx, nil)
if err != nil {
t.Fatal(err)
}
- return ctx, cancel
+ return r, ctx, func() {
+ cancel()
+ shutdown()
+ }
}
func TestNewServer(t *testing.T) {
- ctx, cancel := InitForTest(t)
- defer cancel()
+ r, ctx, shutdown := InitForTest(t)
+ defer shutdown()
- r := &rt.RuntimeX{}
if s, err := r.NewServer(ctx); err != nil || s == nil {
t.Fatalf("Could not create server: %v", err)
}
}
func TestStreamManager(t *testing.T) {
- ctx, cancel := InitForTest(t)
- defer cancel()
+ r, ctx, shutdown := InitForTest(t)
+ defer shutdown()
- r := &rt.RuntimeX{}
orig := r.GetStreamManager(ctx)
c2, sm, err := r.SetNewStreamManager(ctx)
@@ -56,10 +55,8 @@
}
func TestPrincipal(t *testing.T) {
- ctx, cancel := InitForTest(t)
- defer cancel()
-
- r := &rt.RuntimeX{}
+ r, ctx, shutdown := InitForTest(t)
+ defer shutdown()
p2, err := security.NewPrincipal()
if err != nil {
@@ -78,10 +75,9 @@
}
func TestClient(t *testing.T) {
- ctx, cancel := InitForTest(t)
- defer cancel()
+ r, ctx, shutdown := InitForTest(t)
+ defer shutdown()
- r := &rt.RuntimeX{}
orig := r.GetClient(ctx)
c2, client, err := r.SetNewClient(ctx)
@@ -100,10 +96,9 @@
}
func TestNamespace(t *testing.T) {
- ctx, cancel := InitForTest(t)
- defer cancel()
+ r, ctx, shutdown := InitForTest(t)
+ defer shutdown()
- r := &rt.RuntimeX{}
orig := r.GetNamespace(ctx)
newroots := []string{"/newroot1", "/newroot2"}
diff --git a/runtimes/google/rt/signal.go b/runtimes/google/rt/signal.go
index 8ca4643..da7e1db 100644
--- a/runtimes/google/rt/signal.go
+++ b/runtimes/google/rt/signal.go
@@ -20,11 +20,16 @@
signal.Notify(r.signals, syscall.SIGHUP)
go func() {
for {
- vlog.Infof("Received signal %v", <-r.signals)
+ sig, ok := <-r.signals
+ if !ok {
+ break
+ }
+ vlog.Infof("Received signal %v", sig)
}
}()
}
func (r *vrt) shutdownSignalHandling() {
signal.Stop(r.signals)
+ close(r.signals)
}