veyron/services/mgmt/device: monitor app lifetimes
Watch applications invoked by the device manager and update their
status if an app stops asynchronously.
Change-Id: I720f8357a542efe875a4b9215800c5a701aaf1ef
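The monitoring added below rests on the classic kill(pid, 0) liveness probe: signal 0 delivers nothing, but the kernel still performs its existence and permission checks. A minimal standalone sketch of the idiom, assuming POSIX semantics (the helper name isAlive is hypothetical, not part of this CL):

    package main

    import (
        "fmt"
        "os"
        "syscall"
    )

    // isAlive reports whether a process with the given pid exists.
    // ESRCH means no such process; EPERM means the process exists but
    // runs under another uid (as an app started via suidhelper would).
    func isAlive(pid int) bool {
        err := syscall.Kill(pid, 0)
        return err == nil || err == syscall.EPERM
    }

    func main() {
        fmt.Println(isAlive(os.Getpid())) // true: this process exists
    }

As the comments in instance_reaping.go below note, the probe cannot tell whether a pid exited and was recycled by an unrelated process within the polling interval.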
diff --git a/services/mgmt/device/deviced/server.go b/services/mgmt/device/deviced/server.go
index 0127f2e..a431ebc 100644
--- a/services/mgmt/device/deviced/server.go
+++ b/services/mgmt/device/deviced/server.go
@@ -69,7 +69,13 @@
vlog.Errorf("NewServer() failed: %v", err)
return err
}
- defer server.Stop()
+ var dispatcher ipc.Dispatcher
+ defer func() {
+ server.Stop()
+ if dispatcher != nil {
+ impl.Shutdown(dispatcher)
+ }
+ }()
// Bring up the device manager with the same address as the mounttable.
dmListenSpec := ls
@@ -99,11 +105,13 @@
// implementation detail).
var exitErr error
- dispatcher, err := impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
+ dispatcher, err = impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
if err != nil {
vlog.Errorf("Failed to create dispatcher: %v", err)
return err
}
+ // The dispatcher is shut down by the deferred function above.
+
dmPublishName := naming.Join(mtName, "devmgr")
if err := server.ServeDispatcher(dmPublishName, dispatcher); err != nil {
vlog.Errorf("Serve(%v) failed: %v", dmPublishName, err)
diff --git a/services/mgmt/device/impl/app_service.go b/services/mgmt/device/impl/app_service.go
index 8ef508c..7964778 100644
--- a/services/mgmt/device/impl/app_service.go
+++ b/services/mgmt/device/impl/app_service.go
@@ -209,6 +209,8 @@
securityAgent *securityAgentState
// mtAddress is the address of the local mounttable.
mtAddress string
+ // reap is the app process monitoring subsystem.
+ reap reaper
}
func saveEnvelope(dir string, envelope *application.Envelope) error {
@@ -766,10 +768,10 @@
return cmd, nil
}
-func (i *appService) startCmd(instanceDir string, cmd *exec.Cmd) error {
+func (i *appService) startCmd(instanceDir string, cmd *exec.Cmd) (int, error) {
info, err := loadInstanceInfo(instanceDir)
if err != nil {
- return err
+ return 0, err
}
// Set up the child process callback.
callbackState := i.callback
@@ -780,11 +782,11 @@
installationDir, err := filepath.EvalSymlinks(installationLink)
if err != nil {
vlog.Errorf("EvalSymlinks(%v) failed: %v", installationLink, err)
- return verror2.Make(ErrOperationFailed, nil)
+ return 0, verror2.Make(ErrOperationFailed, nil)
}
config, err := loadConfig(installationDir)
if err != nil {
- return err
+ return 0, err
}
for k, v := range config {
cfg.Set(k, v)
@@ -801,7 +803,7 @@
file, err := sa.keyMgrAgent.NewConnection(info.SecurityAgentHandle)
if err != nil {
vlog.Errorf("NewConnection(%v) failed: %v", info.SecurityAgentHandle, err)
- return err
+ return 0, err
}
agentCleaner = func() {
file.Close()
@@ -831,7 +833,7 @@
agentCleaner()
}
vlog.Errorf("Start() failed: %v", err)
- return verror2.Make(ErrOperationFailed, nil)
+ return 0, verror2.Make(ErrOperationFailed, nil)
}
if agentCleaner != nil {
agentCleaner()
@@ -840,12 +842,12 @@
// Wait for the child process to start.
if err := handle.WaitForReady(childReadyTimeout); err != nil {
vlog.Errorf("WaitForReady(%v) failed: %v", childReadyTimeout, err)
- return verror2.Make(ErrOperationFailed, nil)
+ return 0, verror2.Make(ErrOperationFailed, nil)
}
pid := handle.ChildPid()
childName, err := listener.waitForValue(childReadyTimeout)
if err != nil {
- return verror2.Make(ErrOperationFailed, nil)
+ return 0, verror2.Make(ErrOperationFailed, nil)
}
// Because suidhelper uses Go's in-built support for setuid forking,
@@ -853,28 +855,31 @@
// so use the pid returned in the app's ready status.
info.AppCycleMgrName, info.Pid = childName, pid
if err := saveInstanceInfo(instanceDir, info); err != nil {
- return err
+ return 0, err
}
- // TODO(caprita): Spin up a goroutine to reap child status upon exit and
- // transition it to suspended state if it exits on its own.
handle = nil
- return nil
+ return pid, nil
}
func (i *appService) run(instanceDir, systemName string) error {
if err := transitionInstance(instanceDir, suspended, starting); err != nil {
return err
}
+ var pid int
cmd, err := genCmd(instanceDir, i.config.Helper, systemName, i.mtAddress)
if err == nil {
- err = i.startCmd(instanceDir, cmd)
+ pid, err = i.startCmd(instanceDir, cmd)
}
if err != nil {
transitionInstance(instanceDir, starting, suspended)
return err
}
- return transitionInstance(instanceDir, starting, started)
+ if err := transitionInstance(instanceDir, starting, started); err != nil {
+ return err
+ }
+ i.reap.startWatching(instanceDir, pid)
+ return nil
}
func (i *appService) Start(call ipc.ServerContext) ([]string, error) {
@@ -965,12 +970,16 @@
return nil
}
-func stop(ctx *context.T, instanceDir string) error {
+func stop(ctx *context.T, instanceDir string, reap reaper) error {
info, err := loadInstanceInfo(instanceDir)
if err != nil {
return err
}
- return stopAppRemotely(ctx, info.AppCycleMgrName)
+ err = stopAppRemotely(ctx, info.AppCycleMgrName)
+ if err == nil {
+ reap.stopWatching(instanceDir)
+ }
+ return err
}
// TODO(caprita): implement deadline for Stop.
@@ -986,7 +995,7 @@
if err := transitionInstance(instanceDir, started, stopping); err != nil {
return err
}
- if err := stop(ctx.Context(), instanceDir); err != nil {
+ if err := stop(ctx.Context(), instanceDir, i.reap); err != nil {
transitionInstance(instanceDir, stopping, started)
return err
}
@@ -1001,7 +1010,7 @@
if err := transitionInstance(instanceDir, started, suspending); err != nil {
return err
}
- if err := stop(ctx.Context(), instanceDir); err != nil {
+ if err := stop(ctx.Context(), instanceDir, i.reap); err != nil {
transitionInstance(instanceDir, suspending, started)
return err
}
diff --git a/services/mgmt/device/impl/dispatcher.go b/services/mgmt/device/impl/dispatcher.go
index a26933e..5f370a8 100644
--- a/services/mgmt/device/impl/dispatcher.go
+++ b/services/mgmt/device/impl/dispatcher.go
@@ -52,6 +52,8 @@
principal security.Principal
// Namespace
mtAddress string // The address of the local mounttable.
+ // reap is the app process monitoring subsystem.
+ reap reaper
}
var _ ipc.Dispatcher = (*dispatcher)(nil)
@@ -106,6 +108,7 @@
locks: acls.NewLocks(),
principal: principal,
mtAddress: mtAddress,
+ reap: newReaper(),
}
tam, err := flag.TaggedACLMapFromFlag()
@@ -133,6 +136,18 @@
return d, nil
}
+// Shutdown shuts down the dispatcher's app process monitoring subsystem.
+func Shutdown(ipcd ipc.Dispatcher) {
+ switch d := ipcd.(type) {
+ case *dispatcher:
+ d.reap.shutdown()
+ case *testModeDispatcher:
+ Shutdown(d.realDispatcher)
+ default:
+ vlog.Panicf("%v is not a supported dispatcher type", ipcd)
+ }
+}
+
func (d *dispatcher) getACLDir() string {
return filepath.Join(d.config.Root, "device-manager", "device-data", "acls")
}
@@ -280,6 +295,7 @@
locks: d.locks,
securityAgent: d.internal.securityAgent,
mtAddress: d.mtAddress,
+ reap: d.reap,
})
appSpecificAuthorizer, err := newAppSpecificAuthorizer(auth, d.config, components[1:])
if err != nil {
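The type switch in Shutdown exists because test mode wraps the real dispatcher, and only the realDispatcher field is relied on to unwrap it. A sketch of the wrapper shape being assumed (testModeDispatcher's actual definition lives elsewhere in dispatcher.go):

    // Assumed shape, per the d.realDispatcher access in Shutdown above.
    type testModeDispatcher struct {
        realDispatcher ipc.Dispatcher
    }

Recursing through the wrapper keeps a single Shutdown entry point for both normal and test-mode servers.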
diff --git a/services/mgmt/device/impl/impl_test.go b/services/mgmt/device/impl/impl_test.go
index 1aee70a..d522066 100644
--- a/services/mgmt/device/impl/impl_test.go
+++ b/services/mgmt/device/impl/impl_test.go
@@ -149,6 +149,13 @@
veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
server, endpoint := mgmttest.NewServer(ctx)
+ var dispatcher ipc.Dispatcher
+ defer func() {
+ server.Stop()
+ if dispatcher != nil {
+ impl.Shutdown(dispatcher)
+ }
+ }()
name := naming.JoinAddressName(endpoint, "")
vlog.VI(1).Infof("Device manager name: %v", name)
@@ -183,10 +190,12 @@
blessings := fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default())
testMode := strings.HasSuffix(blessings, "/testdm")
- dispatcher, err := impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { fmt.Println("restart handler") })
+ dispatcher, err = impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { fmt.Println("restart handler") })
if err != nil {
vlog.Fatalf("Failed to create device manager dispatcher: %v", err)
}
+ // The dispatcher is shut down by the deferred function above.
+
if err := server.ServeDispatcher(publishName, dispatcher); err != nil {
vlog.Fatalf("Serve(%v) failed: %v", publishName, err)
}
diff --git a/services/mgmt/device/impl/instance_reaping.go b/services/mgmt/device/impl/instance_reaping.go
new file mode 100644
index 0000000..46594b5
--- /dev/null
+++ b/services/mgmt/device/impl/instance_reaping.go
@@ -0,0 +1,104 @@
+package impl
+
+import (
+ "syscall"
+ "time"
+
+ "v.io/core/veyron2/vlog"
+)
+
+type pidInstanceDirPair struct {
+ instanceDir string
+ pid int
+}
+
+type reaper chan pidInstanceDirPair
+
+// TODO(rjkroege): extend to permit initializing the set of watched
+// pids from an inspection of the file system.
+func newReaper() reaper {
+ c := make(reaper)
+ go processStatusPolling(c, make(map[string]int))
+ return c
+}
+
+func suspendTask(idir string) {
+ if err := transitionInstance(idir, started, suspended); err != nil {
+ // This may fail under two circumstances:
+ // 1. The app crashed between startCmd's call to startWatching and
+ // the invoker setting the state to started.
+ // 2. Remove experienced a failure (e.g. the filesystem became
+ // read-only).
+ vlog.Errorf("reaper transitionInstance(%v, %v, %v) failed: %v", idir, started, suspended, err)
+ }
+}
+
+// processStatusPolling polls for the continued existence of a set of tracked
+// pids.
+// TODO(rjkroege): There are nicer ways to provide this functionality.
+// For example, use the kevent facility in darwin or replace init.
+// See http://www.incenp.org/dvlpt/wait4.html for inspiration.
+func processStatusPolling(cq reaper, trackedPids map[string]int) {
+ poll := func() {
+ for idir, pid := range trackedPids {
+ switch err := syscall.Kill(pid, 0); err {
+ case syscall.ESRCH:
+ // No such PID.
+ vlog.VI(2).Infof("processStatusPolling discovered pid %d ended", pid)
+ suspendTask(idir)
+ delete(trackedPids, idir)
+ case nil, syscall.EPERM:
+ vlog.VI(2).Infof("processStatusPolling saw live pid: %d", pid)
+ // The task exists and is running, either under the same uid as
+ // the device manager or under a different uid (as would be the
+ // case if invoked via suidhelper). In either case, do nothing.
+
+ // This implementation cannot detect if a process exited
+ // and was replaced by an arbitrary non-Vanadium process
+ // within the polling interval.
+ // TODO(rjkroege): Consider probing the appcycle service of
+ // the pid to confirm.
+ default:
+ // The kill(2) manpage says this can only happen if the kernel
+ // claims that 0 is an invalid signal. Only a deeply confused
+ // kernel would say this, so just give up.
+ vlog.Panicf("processStatusPolling: unanticipated result from syscall.Kill: %v", err)
+ }
+ }
+ }
+
+ for {
+ select {
+ case p, ok := <-cq:
+ if !ok {
+ return
+ }
+ if p.pid < 0 {
+ delete(trackedPids, p.instanceDir)
+ } else {
+ trackedPids[p.instanceDir] = p.pid
+ poll()
+ }
+ case <-time.After(time.Second):
+ // Poll once / second.
+ poll()
+ }
+ }
+}
+
+// startWatching begins watching process pid's state. This routine
+// assumes that pid already exists. Since pid is delivered to the device
+// manager by RPC callback, this seems reasonable.
+func (r reaper) startWatching(idir string, pid int) {
+ r <- pidInstanceDirPair{instanceDir: idir, pid: pid}
+}
+
+// stopWatching stops watching process pid's state.
+func (r reaper) stopWatching(idir string) {
+ r <- pidInstanceDirPair{instanceDir: idir, pid: -1}
+}
+
+func (r reaper) shutdown() {
+ close(r)
+}
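instance_reaping.go follows the "one goroutine owns the map" pattern: trackedPids is touched only inside processStatusPolling, and callers communicate through the channel that the reaper type itself is. A sketch of the intended call sequence, assuming it runs inside package impl (the path and pid are placeholders):

    // Sketch: the reaper's lifecycle from the device manager's view.
    func exampleReaperLifecycle() {
        r := newReaper()

        // run() calls this once an instance transitions to started;
        // sending on the channel also triggers an immediate poll.
        r.startWatching("/instances/instance-1", 1234)

        // stop() and suspend() call this after a successful remote stop,
        // so a deliberate shutdown is not misreported as a crash.
        r.stopWatching("/instances/instance-1")

        // Dispatcher teardown (impl.Shutdown) closes the channel, which
        // ends the polling goroutine.
        r.shutdown()
    }

Because the channel is unbuffered, startWatching and stopWatching block until the polling goroutine receives, serializing all map updates without a mutex.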
diff --git a/services/mgmt/device/impl/instance_reaping_test.go b/services/mgmt/device/impl/instance_reaping_test.go
new file mode 100644
index 0000000..5c1016a
--- /dev/null
+++ b/services/mgmt/device/impl/instance_reaping_test.go
@@ -0,0 +1,95 @@
+package impl_test
+
+import (
+ "syscall"
+ "testing"
+
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/naming"
+ "v.io/core/veyron2/services/mgmt/stats"
+ verror "v.io/core/veyron2/verror2"
+
+ "v.io/core/veyron/lib/testutil"
+ "v.io/core/veyron/services/mgmt/device/impl"
+ mgmttest "v.io/core/veyron/services/mgmt/lib/testutil"
+)
+
+func TestReaperNoticesAppDeath(t *testing.T) {
+ ctx, shutdown := testutil.InitForTest()
+ defer shutdown()
+ veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
+
+ sh, deferFn := mgmttest.CreateShellAndMountTable(t, ctx, nil)
+ defer deferFn()
+
+ // Set up mock application and binary repositories.
+ envelope, cleanup := startMockRepos(t, ctx)
+ defer cleanup()
+
+ root, cleanup := mgmttest.SetupRootDir(t, "devicemanager")
+ defer cleanup()
+
+ // Create a script wrapping the test target that implements suidhelper.
+ helperPath := generateSuidHelperScript(t, root)
+
+ // Set up the device manager. Since we won't do device manager updates,
+ // don't worry about its application envelope and current link.
+ dmh, dms := mgmttest.RunShellCommand(t, sh, nil, deviceManagerCmd, "dm", root, helperPath, "unused_app_repo_name", "unused_curr_link")
+ mgmttest.ReadPID(t, dms)
+
+ // Create the local server that the app uses to let us know it's ready.
+ pingCh, cleanup := setupPingServer(t, ctx)
+ defer cleanup()
+
+ resolve(t, ctx, "pingserver", 1)
+
+ // Create an envelope for a first version of the app.
+ *envelope = envelopeFromShell(sh, nil, appCmd, "google naps", "appV1")
+
+ // Install the app. The config-specified flag value for testFlagName
+ // should override the value specified in the envelope above.
+ appID := installApp(t, ctx)
+
+ // Start requires the caller to grant a blessing for the app instance.
+ if _, err := startAppImpl(t, ctx, appID, ""); err == nil || !verror.Is(err, impl.ErrInvalidBlessing.ID) {
+ t.Fatalf("Start(%v) expected to fail with %v, got %v instead", appID, impl.ErrInvalidBlessing.ID, err)
+ }
+
+ // Start an instance of the app.
+ instance1ID := startApp(t, ctx, appID)
+
+ // Wait until the app pings us that it's ready.
+ verifyPingArgs(t, pingCh, userName(t), "default", "")
+
+ // Get application pid.
+ name := naming.Join("dm", "apps/"+appID+"/"+instance1ID+"/stats/system/pid")
+ c := stats.StatsClient(name)
+ v, err := c.Value(ctx)
+ if err != nil {
+ t.Fatalf("Value() failed: %v\n", err)
+ }
+ pid, ok := v.(int64)
+ if !ok {
+ t.Fatalf("pid returned from stats interface is not an int64")
+ }
+
+ verifyAppState(t, root, appID, instance1ID, "started")
+ syscall.Kill(int(pid), syscall.SIGKILL)
+
+ // Start a second instance of the app; its startWatching call forces
+ // an immediate poll.
+ instance2ID := startApp(t, ctx, appID)
+ verifyPingArgs(t, pingCh, userName(t), "default", "")
+
+ verifyAppState(t, root, appID, instance2ID, "started")
+
+ stopApp(t, ctx, appID, instance2ID)
+ verifyAppState(t, root, appID, instance1ID, "suspended")
+
+ // TODO(rjkroege): Exercise the polling loop code.
+
+ // Cleanly shut down the device manager.
+ syscall.Kill(dmh.Pid(), syscall.SIGINT)
+ dms.Expect("dm terminated")
+ dms.ExpectEOF()
+}