veyron/services/mgmt/device: monitor app lifetimes

Watch applications invoked by the device manager and update their
status if an app stops asynchronously.
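
The monitoring lives in a new reaper subsystem (instance_reaping.go):
a goroutine polls the tracked pids once per second with a signal-0
probe and transitions an instance from started to suspended when its
process disappears. Its surface, summarized from the code below:

	type reaper chan pidInstanceDirPair

	func newReaper() reaper                              // spawns the polling goroutine
	func (r reaper) startWatching(idir string, pid int)  // begin tracking pid for idir
	func (r reaper) stopWatching(idir string)            // sends pid -1 to forget idir
	func (r reaper) shutdown()                           // closes the channel, ending the poll loop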

Change-Id: I720f8357a542efe875a4b9215800c5a701aaf1ef
diff --git a/services/mgmt/device/deviced/server.go b/services/mgmt/device/deviced/server.go
index 0127f2e..a431ebc 100644
--- a/services/mgmt/device/deviced/server.go
+++ b/services/mgmt/device/deviced/server.go
@@ -69,7 +69,13 @@
 		vlog.Errorf("NewServer() failed: %v", err)
 		return err
 	}
-	defer server.Stop()
+	var dispatcher ipc.Dispatcher
+	defer func() {
+		server.Stop()
+		if dispatcher != nil {
+			impl.Shutdown(dispatcher)
+		}
+	}()
 
 	// Bring up the device manager with the same address as the mounttable.
 	dmListenSpec := ls
@@ -99,11 +105,13 @@
 	// implementation detail).
 
 	var exitErr error
-	dispatcher, err := impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
+	dispatcher, err = impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
 	if err != nil {
 		vlog.Errorf("Failed to create dispatcher: %v", err)
 		return err
 	}
+	// The dispatcher is shut down via impl.Shutdown in the defer above.
+
 	dmPublishName := naming.Join(mtName, "devmgr")
 	if err := server.ServeDispatcher(dmPublishName, dispatcher); err != nil {
 		vlog.Errorf("Serve(%v) failed: %v", dmPublishName, err)
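
Note on the server.go hunks above: they use a declare-then-defer
pattern, where the dispatcher variable is declared before the deferred
cleanup so that the defer sees the value assigned later in the
function. A minimal standalone sketch of the pattern, using a
hypothetical file resource rather than device-manager types:

	package main

	import (
		"fmt"
		"os"
	)

	func run() error {
		// Declare first so the deferred cleanup closes over the final
		// value; the nil check guards early returns that fire the
		// defer before assignment.
		var f *os.File
		defer func() {
			if f != nil {
				f.Close()
			}
		}()

		var err error
		f, err = os.Open("/etc/hosts")
		if err != nil {
			return err
		}
		fmt.Println("opened", f.Name())
		return nil
	}

	func main() {
		if err := run(); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
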
diff --git a/services/mgmt/device/impl/app_service.go b/services/mgmt/device/impl/app_service.go
index 8ef508c..7964778 100644
--- a/services/mgmt/device/impl/app_service.go
+++ b/services/mgmt/device/impl/app_service.go
@@ -209,6 +209,8 @@
 	securityAgent *securityAgentState
 	// mtAddress is the address of the local mounttable.
 	mtAddress string
+	// reap is the app process monitoring subsystem.
+	reap reaper
 }
 
 func saveEnvelope(dir string, envelope *application.Envelope) error {
@@ -766,10 +768,10 @@
 	return cmd, nil
 }
 
-func (i *appService) startCmd(instanceDir string, cmd *exec.Cmd) error {
+func (i *appService) startCmd(instanceDir string, cmd *exec.Cmd) (int, error) {
 	info, err := loadInstanceInfo(instanceDir)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	// Set up the child process callback.
 	callbackState := i.callback
@@ -780,11 +782,11 @@
 	installationDir, err := filepath.EvalSymlinks(installationLink)
 	if err != nil {
 		vlog.Errorf("EvalSymlinks(%v) failed: %v", installationLink, err)
-		return verror2.Make(ErrOperationFailed, nil)
+		return 0, verror2.Make(ErrOperationFailed, nil)
 	}
 	config, err := loadConfig(installationDir)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	for k, v := range config {
 		cfg.Set(k, v)
@@ -801,7 +803,7 @@
 		file, err := sa.keyMgrAgent.NewConnection(info.SecurityAgentHandle)
 		if err != nil {
 			vlog.Errorf("NewConnection(%v) failed: %v", info.SecurityAgentHandle, err)
-			return err
+			return 0, err
 		}
 		agentCleaner = func() {
 			file.Close()
@@ -831,7 +833,7 @@
 			agentCleaner()
 		}
 		vlog.Errorf("Start() failed: %v", err)
-		return verror2.Make(ErrOperationFailed, nil)
+		return 0, verror2.Make(ErrOperationFailed, nil)
 	}
 	if agentCleaner != nil {
 		agentCleaner()
@@ -840,12 +842,12 @@
 	// Wait for the child process to start.
 	if err := handle.WaitForReady(childReadyTimeout); err != nil {
 		vlog.Errorf("WaitForReady(%v) failed: %v", childReadyTimeout, err)
-		return verror2.Make(ErrOperationFailed, nil)
+		return 0, verror2.Make(ErrOperationFailed, nil)
 	}
 	pid := handle.ChildPid()
 	childName, err := listener.waitForValue(childReadyTimeout)
 	if err != nil {
-		return verror2.Make(ErrOperationFailed, nil)
+		return 0, verror2.Make(ErrOperationFailed, nil)
 	}
 
 	// Because suidhelper uses Go's in-built support for setuid forking,
@@ -853,28 +855,31 @@
 	// so use the pid returned in the app's ready status.
 	info.AppCycleMgrName, info.Pid = childName, pid
 	if err := saveInstanceInfo(instanceDir, info); err != nil {
-		return err
+		return 0, err
 	}
-	// TODO(caprita): Spin up a goroutine to reap child status upon exit and
-	// transition it to suspended state if it exits on its own.
 	handle = nil
-	return nil
+	return pid, nil
 }
 
 func (i *appService) run(instanceDir, systemName string) error {
 	if err := transitionInstance(instanceDir, suspended, starting); err != nil {
 		return err
 	}
+	var pid int
 
 	cmd, err := genCmd(instanceDir, i.config.Helper, systemName, i.mtAddress)
 	if err == nil {
-		err = i.startCmd(instanceDir, cmd)
+		pid, err = i.startCmd(instanceDir, cmd)
 	}
 	if err != nil {
 		transitionInstance(instanceDir, starting, suspended)
 		return err
 	}
-	return transitionInstance(instanceDir, starting, started)
+	if err := transitionInstance(instanceDir, starting, started); err != nil {
+		return err
+	}
+	i.reap.startWatching(instanceDir, pid)
+	return nil
 }
 
 func (i *appService) Start(call ipc.ServerContext) ([]string, error) {
@@ -965,12 +970,16 @@
 	return nil
 }
 
-func stop(ctx *context.T, instanceDir string) error {
+func stop(ctx *context.T, instanceDir string, reap reaper) error {
 	info, err := loadInstanceInfo(instanceDir)
 	if err != nil {
 		return err
 	}
-	return stopAppRemotely(ctx, info.AppCycleMgrName)
+	err = stopAppRemotely(ctx, info.AppCycleMgrName)
+	if err == nil {
+		reap.stopWatching(instanceDir)
+	}
+	return err
 }
 
 // TODO(caprita): implement deadline for Stop.
@@ -986,7 +995,7 @@
 	if err := transitionInstance(instanceDir, started, stopping); err != nil {
 		return err
 	}
-	if err := stop(ctx.Context(), instanceDir); err != nil {
+	if err := stop(ctx.Context(), instanceDir, i.reap); err != nil {
 		transitionInstance(instanceDir, stopping, started)
 		return err
 	}
@@ -1001,7 +1010,7 @@
 	if err := transitionInstance(instanceDir, started, suspending); err != nil {
 		return err
 	}
-	if err := stop(ctx.Context(), instanceDir); err != nil {
+	if err := stop(ctx.Context(), instanceDir, i.reap); err != nil {
 		transitionInstance(instanceDir, suspending, started)
 		return err
 	}
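
Ordering note for the app_service.go hunks: run() only hands an
instance to the reaper after the starting->started transition
succeeds, since the reaper's suspendTask only performs
started->suspended; conversely, stop() only calls stopWatching once
the remote stop succeeds, so a failed stop leaves the instance
watched. A toy sketch of that lifecycle, with hypothetical stub types
standing in for the real helpers:

	package main

	import "fmt"

	type reaper struct{}

	func (reaper) startWatching(idir string, pid int) { fmt.Println("watch", idir, pid) }

	func transitionInstance(idir, from, to string) error {
		fmt.Printf("%s: %s -> %s\n", idir, from, to)
		return nil
	}

	// run mirrors appService.run above: only a fully started instance
	// is handed to the reaper, so the reaper's started->suspended
	// transition cannot fire while the instance is still "starting".
	func run(reap reaper, idir string, pid int) error {
		if err := transitionInstance(idir, "suspended", "starting"); err != nil {
			return err
		}
		// ... the child is spawned here; its pid arrives via the
		// ready-status callback ...
		if err := transitionInstance(idir, "starting", "started"); err != nil {
			return err
		}
		reap.startWatching(idir, pid)
		return nil
	}

	func main() { run(reaper{}, "/instances/app-1", 1234) }
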
diff --git a/services/mgmt/device/impl/dispatcher.go b/services/mgmt/device/impl/dispatcher.go
index a26933e..5f370a8 100644
--- a/services/mgmt/device/impl/dispatcher.go
+++ b/services/mgmt/device/impl/dispatcher.go
@@ -52,6 +52,8 @@
 	principal security.Principal
 	// Namespace
 	mtAddress string // The address of the local mounttable.
+	// reap is the app process monitoring subsystem.
+	reap reaper
 }
 
 var _ ipc.Dispatcher = (*dispatcher)(nil)
@@ -106,6 +108,7 @@
 		locks:     acls.NewLocks(),
 		principal: principal,
 		mtAddress: mtAddress,
+		reap:      newReaper(),
 	}
 
 	tam, err := flag.TaggedACLMapFromFlag()
@@ -133,6 +136,18 @@
 	return d, nil
 }
 
+// Shutdown shuts down the given dispatcher's reaper.
+func Shutdown(ipcd ipc.Dispatcher) {
+	switch d := ipcd.(type) {
+	case *dispatcher:
+		d.reap.shutdown()
+	case *testModeDispatcher:
+		Shutdown(d.realDispatcher)
+	default:
+		vlog.Panicf("%T is not a supported dispatcher type", ipcd)
+	}
+}
+
 func (d *dispatcher) getACLDir() string {
 	return filepath.Join(d.config.Root, "device-manager", "device-data", "acls")
 }
@@ -280,6 +295,7 @@
 			locks:         d.locks,
 			securityAgent: d.internal.securityAgent,
 			mtAddress:     d.mtAddress,
+			reap:          d.reap,
 		})
 		appSpecificAuthorizer, err := newAppSpecificAuthorizer(auth, d.config, components[1:])
 		if err != nil {
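
Shutdown above unwraps testModeDispatcher to reach the real
dispatcher's reaper. A minimal sketch of that decorator-unwrapping
type switch, with hypothetical types:

	package main

	import "fmt"

	type dispatcher interface{}

	type realDispatcher struct{ name string }

	type wrappingDispatcher struct{ inner dispatcher }

	func shutdown(d dispatcher) {
		switch v := d.(type) {
		case *realDispatcher:
			fmt.Println("shutting down", v.name) // release resources here
		case *wrappingDispatcher:
			shutdown(v.inner) // recurse through the decorator
		default:
			panic(fmt.Sprintf("%T is not a supported dispatcher type", d))
		}
	}

	func main() {
		shutdown(&wrappingDispatcher{inner: &realDispatcher{name: "devmgr"}})
	}
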
diff --git a/services/mgmt/device/impl/impl_test.go b/services/mgmt/device/impl/impl_test.go
index 1aee70a..d522066 100644
--- a/services/mgmt/device/impl/impl_test.go
+++ b/services/mgmt/device/impl/impl_test.go
@@ -149,6 +149,13 @@
 	veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
 
 	server, endpoint := mgmttest.NewServer(ctx)
+	var dispatcher ipc.Dispatcher
+	defer func() {
+		server.Stop()
+		if dispatcher != nil {
+			impl.Shutdown(dispatcher)
+		}
+	}()
 	name := naming.JoinAddressName(endpoint, "")
 	vlog.VI(1).Infof("Device manager name: %v", name)
 
@@ -183,10 +190,12 @@
 
 	blessings := fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default())
 	testMode := strings.HasSuffix(blessings, "/testdm")
-	dispatcher, err := impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { fmt.Println("restart handler") })
+	dispatcher, err = impl.NewDispatcher(veyron2.GetPrincipal(ctx), configState, mtName, testMode, func() { fmt.Println("restart handler") })
 	if err != nil {
 		vlog.Fatalf("Failed to create device manager dispatcher: %v", err)
 	}
+	// The dispatcher is shut down by the deferred function above.
+
 	if err := server.ServeDispatcher(publishName, dispatcher); err != nil {
 		vlog.Fatalf("Serve(%v) failed: %v", publishName, err)
 	}
diff --git a/services/mgmt/device/impl/instance_reaping.go b/services/mgmt/device/impl/instance_reaping.go
new file mode 100644
index 0000000..46594b5
--- /dev/null
+++ b/services/mgmt/device/impl/instance_reaping.go
@@ -0,0 +1,104 @@
+package impl
+
+import (
+	"syscall"
+	"time"
+
+	"v.io/core/veyron2/vlog"
+)
+
+type pidInstanceDirPair struct {
+	instanceDir string
+	pid         int
+}
+
+type reaper chan pidInstanceDirPair
+
+// TODO(rjkroege): extend to permit initializing the set of watched
+// pids from an inspection of the file system.
+func newReaper() reaper {
+	c := make(reaper)
+	go processStatusPolling(c, make(map[string]int))
+	return c
+}
+
+func suspendTask(idir string) {
+	if err := transitionInstance(idir, started, suspended); err != nil {
+		// This may fail under two circumstances:
+		// 1. The app crashed in the window between the reaper starting
+		// to watch its pid and the invoker setting the state to started.
+		// 2. Remove fails (e.g. the filesystem becomes R/O).
+		vlog.Errorf("reaper transitionInstance(%v, %v, %v) failed: %v\n", idir, started, suspended, err)
+	}
+}
+
+// processStatusPolling polls for the continued existence of a set of tracked
+// pids.
+// TODO(rjkroege): There are nicer ways to provide this functionality.
+// For example, use the kevent facility in darwin or replace init.
+// See http://www.incenp.org/dvlpt/wait4.html for inspiration.
+func processStatusPolling(cq reaper, trackedPids map[string]int) {
+	poll := func() {
+		for idir, pid := range trackedPids {
+			switch err := syscall.Kill(pid, 0); err {
+			case syscall.ESRCH:
+				// No such PID.
+				vlog.VI(2).Infof("processStatusPolling discovered pid %d ended", pid)
+				suspendTask(idir)
+				delete(trackedPids, idir)
+			case nil, syscall.EPERM:
+				vlog.VI(2).Infof("processStatusPolling saw live pid: %d", pid)
+				// The process exists and is running, either under the
+				// same uid as the device manager or under a different
+				// uid (as when invoked via suidhelper). In either
+				// case, do nothing.
+
+				// This implementation cannot detect if a process exited
+				// and was replaced by an arbitrary non-Vanadium process
+				// within the polling interval.
+				// TODO(rjkroege): Consider probing the appcycle service of
+				// the pid to confirm.
+			default:
+				// The kill(2) manpage says that this can only happen
+				// if the kernel claims that 0 is an invalid signal.
+				// Only a deeply confused kernel would say this, so
+				// just give up.
+				vlog.Panicf("processStatusPolling: unanticipated result from syscall.Kill: %v", err)
+			}
+		}
+	}
+
+	for {
+		select {
+		case p, ok := <-cq:
+			if !ok {
+				return
+			}
+			if p.pid < 0 {
+				delete(trackedPids, p.instanceDir)
+			} else {
+				trackedPids[p.instanceDir] = p.pid
+				poll()
+			}
+		case <-time.After(time.Second):
+			// Poll once per second.
+			poll()
+		}
+	}
+}
+
+// startWatching begins watching process pid's state. This routine
+// assumes that pid already exists. Since pid is delivered to the device
+// manager by RPC callback, this seems reasonable.
+func (r reaper) startWatching(idir string, pid int) {
+	r <- pidInstanceDirPair{instanceDir: idir, pid: pid}
+}
+
+// stopWatching stops watching process pid's state.
+func (r reaper) stopWatching(idir string) {
+	r <- pidInstanceDirPair{instanceDir: idir, pid: -1}
+}
+
+func (r reaper) shutdown() {
+	close(r)
+}
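
A self-contained sketch of the polling scheme in instance_reaping.go,
runnable on a Unix system with `true` on $PATH; watchReq and poller
are hypothetical stand-ins for pidInstanceDirPair and
processStatusPolling:

	package main

	import (
		"fmt"
		"os/exec"
		"syscall"
		"time"
	)

	type watchReq struct {
		key string
		pid int // pid < 0 means "stop watching key"
	}

	func poller(c chan watchReq, onDeath func(key string)) {
		watched := make(map[string]int)
		poll := func() {
			for key, pid := range watched {
				// Signal 0 probes existence without delivering a
				// signal: ESRCH means gone; nil or EPERM means the
				// process is still running (possibly under another
				// uid, as with suidhelper).
				if err := syscall.Kill(pid, 0); err == syscall.ESRCH {
					onDeath(key)
					delete(watched, key)
				}
			}
		}
		for {
			select {
			case r, ok := <-c:
				if !ok {
					return // closed channel signals shutdown
				}
				if r.pid < 0 {
					delete(watched, r.key)
				} else {
					watched[r.key] = r.pid
					poll() // poll immediately on a new watch
				}
			case <-time.After(time.Second):
				poll()
			}
		}
	}

	func main() {
		c := make(chan watchReq)
		done := make(chan struct{})
		go poller(c, func(key string) {
			fmt.Println("instance died:", key)
			close(done)
		})
		// Run a short-lived child to completion, then watch its (now
		// dead) pid: the immediate poll detects the death at once.
		cmd := exec.Command("true")
		if err := cmd.Run(); err != nil {
			panic(err)
		}
		c <- watchReq{key: "demo", pid: cmd.Process.Pid}
		<-done
		close(c)
	}
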
diff --git a/services/mgmt/device/impl/instance_reaping_test.go b/services/mgmt/device/impl/instance_reaping_test.go
new file mode 100644
index 0000000..5c1016a
--- /dev/null
+++ b/services/mgmt/device/impl/instance_reaping_test.go
@@ -0,0 +1,94 @@
+package impl_test
+
+import (
+	"syscall"
+	"testing"
+
+	"v.io/core/veyron2"
+	"v.io/core/veyron2/naming"
+	"v.io/core/veyron2/services/mgmt/stats"
+	verror "v.io/core/veyron2/verror2"
+
+	"v.io/core/veyron/lib/testutil"
+	"v.io/core/veyron/services/mgmt/device/impl"
+	mgmttest "v.io/core/veyron/services/mgmt/lib/testutil"
+)
+
+func TestReaperNoticesAppDeath(t *testing.T) {
+	ctx, shutdown := testutil.InitForTest()
+	defer shutdown()
+	veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
+
+	sh, deferFn := mgmttest.CreateShellAndMountTable(t, ctx, nil)
+	defer deferFn()
+
+	// Set up mock application and binary repositories.
+	envelope, cleanup := startMockRepos(t, ctx)
+	defer cleanup()
+
+	root, cleanup := mgmttest.SetupRootDir(t, "devicemanager")
+	defer cleanup()
+
+	// Create a script wrapping the test target that implements suidhelper.
+	helperPath := generateSuidHelperScript(t, root)
+
+	// Set up the device manager.  Since we won't do device manager updates,
+	// don't worry about its application envelope and current link.
+	dmh, dms := mgmttest.RunShellCommand(t, sh, nil, deviceManagerCmd, "dm", root, helperPath, "unused_app_repo_name", "unused_curr_link")
+	mgmttest.ReadPID(t, dms)
+
+	// Create the local server that the app uses to let us know it's ready.
+	pingCh, cleanup := setupPingServer(t, ctx)
+	defer cleanup()
+
+	resolve(t, ctx, "pingserver", 1)
+
+	// Create an envelope for a first version of the app.
+	*envelope = envelopeFromShell(sh, nil, appCmd, "google naps", "appV1")
+
+	// Install the app.  The config-specified flag value for testFlagName
+	// should override the value specified in the envelope above.
+	appID := installApp(t, ctx)
+
+	// Start requires the caller to grant a blessing for the app instance.
+	if _, err := startAppImpl(t, ctx, appID, ""); err == nil || !verror.Is(err, impl.ErrInvalidBlessing.ID) {
+		t.Fatalf("Start(%v) expected to fail with %v, got %v instead", appID, impl.ErrInvalidBlessing.ID, err)
+	}
+
+	// Start an instance of the app.
+	instance1ID := startApp(t, ctx, appID)
+
+	// Wait until the app pings us that it's ready.
+	verifyPingArgs(t, pingCh, userName(t), "default", "")
+
+	// Get application pid.
+	name := naming.Join("dm", "apps/"+appID+"/"+instance1ID+"/stats/system/pid")
+	c := stats.StatsClient(name)
+	v, err := c.Value(ctx)
+	if err != nil {
+		t.Fatalf("Value() failed: %v\n", err)
+	}
+	pid, ok := v.(int64)
+	if !ok {
+		t.Fatalf("pid returned from stats interface is not an int64")
+	}
+
+	verifyAppState(t, root, appID, instance1ID, "started")
+	syscall.Kill(int(pid), syscall.SIGKILL)
+
+	// Start a second instance of the app, which will force polling to happen.
+	instance2ID := startApp(t, ctx, appID)
+	verifyPingArgs(t, pingCh, userName(t), "default", "")
+
+	verifyAppState(t, root, appID, instance2ID, "started")
+
+	stopApp(t, ctx, appID, instance2ID)
+	verifyAppState(t, root, appID, instance1ID, "suspended")
+
+	// TODO(rjkroege): Exercise the polling loop code.
+
+	// Cleanly shut down the device manager.
+	syscall.Kill(dmh.Pid(), syscall.SIGINT)
+	dms.Expect("dm terminated")
+	dms.ExpectEOF()
+}