veyron/runtimes/google/rt: track the StreamManagers created by the runtime, so
that we can call Shutdown on them at Cleanup() time.
Not calling Shutdown risks losing queued writes when the process shuts down.
See also issue 4 in https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
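
To make the intended lifecycle concrete, here is a minimal, self-contained
sketch of the tracking pattern described above. The runtime and manager types
below are simplified stand-ins, not the real veyron API: the runtime records
every stream manager it creates, and cleanup shuts all of them down.

// Sketch only: simplified stand-ins for the runtime and ipc/stream.Manager,
// not the real veyron API.
package main

import (
	"fmt"
	"sync"
)

type manager struct{ id int }

// Shutdown flushes and closes the manager; in the real runtime, skipping
// this step risks dropping queued writes at process exit.
func (m *manager) Shutdown() { fmt.Printf("manager %d shut down\n", m.id) }

type runtime struct {
	mu sync.Mutex
	sm []*manager // GUARDED_BY(mu): every manager this runtime created
}

func (rt *runtime) newStreamManager() *manager {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	m := &manager{id: len(rt.sm)}
	rt.sm = append(rt.sm, m) // track it so cleanup can find it later
	return m
}

func (rt *runtime) cleanup() {
	// Snapshot under the lock, then shut everything down.
	rt.mu.Lock()
	managers := rt.sm
	rt.mu.Unlock()
	for _, m := range managers {
		m.Shutdown()
	}
}

func main() {
	rt := &runtime{}
	rt.newStreamManager()
	rt.newStreamManager()
	rt.cleanup() // shuts down both managers
}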
Change-Id: I843edf1f6f3911ddc200028ebc4415659b27f1fe
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index a2150f6..093ab53 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -81,7 +81,9 @@
}
func (rt *vrt) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error) {
- sm := rt.sm
+ rt.mu.Lock()
+ sm := rt.sm[0]
+ rt.mu.Unlock()
ns := rt.ns
var id security.PublicID
var otherOpts []ipc.ClientOpt
@@ -121,21 +123,26 @@
}
func (rt *vrt) NewServer(opts ...ipc.ServerOpt) (ipc.Server, error) {
- var err error
-
- // Create a new RoutingID (and StreamManager) for each server.
- // Except, in the common case of a process having a single Server,
- // use the same RoutingID (and StreamManager) that is used for Clients.
rt.mu.Lock()
- sm := rt.sm
+ // Create a new RoutingID (and StreamManager) for each server except
+ // the first one: the first server shares the RoutingID (and
+ // StreamManager) with Clients.
+ //
+ // TODO(ashankar/caprita): special-casing the first server is ugly, and
+ // has diminished practical benefit, since in practice the first server
+ // is the app cycle manager server. If sharing connections between
+ // Clients and Servers in the same Runtime is still important, we need
+ // to find another way to achieve it.
+ sm := rt.sm[0]
rt.nServers++
- if rt.nServers > 1 {
- sm, err = rt.NewStreamManager()
- }
+ nServers := rt.nServers
rt.mu.Unlock()
-
- if err != nil {
- return nil, fmt.Errorf("failed to create ipc/stream/Manager: %v", err)
+ if nServers > 1 {
+ var err error
+ sm, err = rt.NewStreamManager()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create ipc/stream/Manager: %v", err)
+ }
}
// Start the http debug server exactly once for this runtime.
rt.startHTTPDebugServerOnce()
@@ -166,5 +173,13 @@
}
sm := imanager.InternalNew(rid)
rt.debug.RegisterStreamManager(rid, sm)
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ if rt.cleaningUp {
+ sm.Shutdown() // Cleanup won't see this manager, so shut it down here.
+ // TODO(caprita): Should we also unregister sm from debug?
+ return nil, errCleaningUp
+ }
+ rt.sm = append(rt.sm, sm)
return sm, nil
}
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 1911cc3..0c59d42 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -20,20 +20,24 @@
"veyron.io/veyron/veyron/services/mgmt/lib/exec"
)
-type vrt struct {
- profile veyron2.Profile
- publisher *config.Publisher
- sm stream.Manager
- ns naming.Namespace
- signals chan os.Signal
- id security.PrivateID
- store security.PublicIDStore
- client ipc.Client
- mgmt *mgmtImpl
- debug debugServer
+// TODO(caprita): Verrorize this, and maybe move it into the API.
+var errCleaningUp = fmt.Errorf("operation rejected: runtime is being cleaned up")
- mu sync.Mutex
- nServers int // GUARDED_BY(mu)
+type vrt struct {
+ mu sync.Mutex
+
+ profile veyron2.Profile
+ publisher *config.Publisher
+ sm []stream.Manager // GUARDED_BY(mu)
+ ns naming.Namespace
+ signals chan os.Signal
+ id security.PrivateID
+ store security.PublicIDStore
+ client ipc.Client
+ mgmt *mgmtImpl
+ debug debugServer
+ nServers int // GUARDED_BY(mu)
+ cleaningUp bool // GUARDED_BY(mu)
}
// Implements veyron2/rt.New
@@ -99,15 +103,16 @@
}
vlog.VI(2).Infof("Namespace Roots: %s", nsRoots)
+ // Create the default stream manager.
+ if _, err := rt.NewStreamManager(); err != nil {
+ return nil, err
+ }
+
+ if err := rt.initSecurity(); err != nil {
+ return nil, err
+ }
+
var err error
- if rt.sm, err = rt.NewStreamManager(); err != nil {
- return nil, err
- }
-
- if err = rt.initSecurity(); err != nil {
- return nil, err
- }
-
if rt.client, err = rt.NewClient(); err != nil {
return nil, err
}
@@ -147,11 +152,36 @@
}
func (rt *vrt) Cleanup() {
+ rt.mu.Lock()
+ if rt.cleaningUp {
+ rt.mu.Unlock()
+ // TODO(caprita): Should we actually treat extra Cleanups as
+ // silent no-ops? For now, it's better not to, in order to
+ // expose programming errors.
+ vlog.Errorf("rt.Cleanup done")
+ return
+ }
+ rt.cleaningUp = true
+ rt.mu.Unlock()
+
// TODO(caprita): Consider shutting down mgmt later in the runtime's
// shutdown sequence, to capture some of the runtime internal shutdown
// tasks in the task tracker.
rt.mgmt.shutdown()
- rt.sm.Shutdown()
+ // It's ok to access rt.sm without the lock below: the Lock/Unlock above
+ // acts as a memory barrier, so any goroutine that subsequently acquires
+ // the lock observes cleaningUp == true. The only code that mutates
+ // rt.sm is NewStreamManager in ipc.go, which checks cleaningUp under
+ // the lock and refuses to append once it is set.
+ //
+ // TODO(caprita): It would be cleaner to do something like this inside
+ // the lock (and then iterate over smgrs below):
+ //   smgrs := rt.sm
+ //   rt.sm = nil
+ // However, to make that work we need to prevent NewClient and NewServer
+ // from using rt.sm after cleaningUp has been set. Hence the TODO.
+ for _, sm := range rt.sm {
+ sm.Shutdown()
+ }
rt.shutdownSignalHandling()
rt.shutdownLogging()
}
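
As a complement to the comments in Cleanup and NewStreamManager above, the
sketch below illustrates the cleaningUp handshake, again with simplified
stand-in types rather than the real veyron API: once cleanup sets the flag
under the lock, any later newStreamManager call observes it and bails out
with errCleaningUp, so the slice of tracked managers can no longer grow while
cleanup walks it.

// Sketch of the cleaningUp handshake; simplified types, not the real API.
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCleaningUp = errors.New("operation rejected: runtime is being cleaned up")

type manager struct{}

func (m *manager) Shutdown() {}

type runtime struct {
	mu         sync.Mutex
	sm         []*manager // GUARDED_BY(mu)
	cleaningUp bool       // GUARDED_BY(mu)
}

func (rt *runtime) newStreamManager() (*manager, error) {
	sm := &manager{}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if rt.cleaningUp {
		sm.Shutdown() // cleanup won't see this manager, so shut it down here
		return nil, errCleaningUp
	}
	rt.sm = append(rt.sm, sm)
	return sm, nil
}

func (rt *runtime) cleanup() {
	rt.mu.Lock()
	if rt.cleaningUp {
		rt.mu.Unlock()
		fmt.Println("cleanup called more than once")
		return
	}
	rt.cleaningUp = true
	rt.mu.Unlock()
	// rt.sm cannot grow past this point: any later newStreamManager call
	// acquires the lock, sees cleaningUp == true, and returns errCleaningUp.
	for _, sm := range rt.sm {
		sm.Shutdown()
	}
}

func main() {
	rt := &runtime{}
	rt.newStreamManager()
	rt.cleanup()
	if _, err := rt.newStreamManager(); err != nil {
		fmt.Println(err) // operation rejected: runtime is being cleaned up
	}
}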