veyron2: move the AppCycle implementation out of the runtime.

- this change moves AppCycle out of the core veyron2.Runtime
interface, this in turn both reduces the code+dependencies
in the runtime and allows the AppCycle implementation to use
stubs.
- profiles provide an appropriate AppCycle server to the runtime,
which will then create a server if appropriate to allow remote
access
- this unifies 'shutdown/cleanup' for profiles, the runtime
and the AppCycle.

Change-Id: Ie013dfa5d116b3fbe85a3544c01e383223b4b7b5
diff --git a/lib/appcycle/appcycle.go b/lib/appcycle/appcycle.go
new file mode 100644
index 0000000..3dd0004
--- /dev/null
+++ b/lib/appcycle/appcycle.go
@@ -0,0 +1,141 @@
+package appcycle
+
+import (
+	"fmt"
+	"os"
+	"sync"
+
+	"veyron.io/veyron/veyron2"
+	"veyron.io/veyron/veyron2/ipc"
+	stub "veyron.io/veyron/veyron2/services/mgmt/appcycle"
+)
+
+type AppCycle struct {
+	sync.RWMutex
+	waiters      []chan<- string
+	taskTrackers []chan<- veyron2.Task
+	task         veyron2.Task
+	shutDown     bool
+	rt           veyron2.Runtime
+	disp         *invoker
+}
+
+type invoker struct {
+	ac *AppCycle
+}
+
+func New(rt veyron2.Runtime) *AppCycle {
+	ac := &AppCycle{rt: rt}
+	ac.disp = &invoker{ac}
+	return ac
+}
+
+func (m *AppCycle) Shutdown() {
+	m.Lock()
+	defer m.Unlock()
+	if m.shutDown {
+		return
+	}
+	m.shutDown = true
+	for _, t := range m.taskTrackers {
+		close(t)
+	}
+	m.taskTrackers = nil
+}
+
+func (m *AppCycle) stop(msg string) {
+	m.RLock()
+	defer m.RUnlock()
+	if len(m.waiters) == 0 {
+		os.Exit(veyron2.UnhandledStopExitCode)
+	}
+	for _, w := range m.waiters {
+		select {
+		case w <- msg:
+		default:
+		}
+	}
+}
+
+func (m *AppCycle) Stop() {
+	m.stop(veyron2.LocalStop)
+}
+
+func (*AppCycle) ForceStop() {
+	os.Exit(veyron2.ForceStopExitCode)
+}
+
+func (m *AppCycle) WaitForStop(ch chan<- string) {
+	m.Lock()
+	defer m.Unlock()
+	m.waiters = append(m.waiters, ch)
+}
+
+func (m *AppCycle) TrackTask(ch chan<- veyron2.Task) {
+	m.Lock()
+	defer m.Unlock()
+	if m.shutDown {
+		close(ch)
+		return
+	}
+	m.taskTrackers = append(m.taskTrackers, ch)
+}
+
+func (m *AppCycle) advanceTask(progress, goal int32) {
+	m.Lock()
+	defer m.Unlock()
+	m.task.Goal += goal
+	m.task.Progress += progress
+	for _, t := range m.taskTrackers {
+		select {
+		case t <- m.task:
+		default:
+			// TODO(caprita): Make it such that the latest task
+			// update is always added to the channel even if channel
+			// is full.  One way is to pull an element from t and
+			// then re-try the push.
+		}
+	}
+}
+
+func (m *AppCycle) AdvanceGoal(delta int32) {
+	if delta <= 0 {
+		return
+	}
+	m.advanceTask(0, delta)
+}
+
+func (m *AppCycle) AdvanceProgress(delta int32) {
+	if delta <= 0 {
+		return
+	}
+	m.advanceTask(delta, 0)
+}
+
+func (m *AppCycle) Remote() interface{} {
+	return stub.AppCycleServer(m.disp)
+}
+
+func (d *invoker) Stop(ctx stub.AppCycleStopContext) error {
+	// The size of the channel should be reasonably sized to expect not to
+	// miss updates while we're waiting for the stream to unblock.
+	ch := make(chan veyron2.Task, 10)
+	d.ac.TrackTask(ch)
+	// TODO(caprita): Include identity of Stop issuer in message.
+	d.ac.stop(veyron2.RemoteStop)
+	for {
+		task, ok := <-ch
+		if !ok {
+			// Channel closed, meaning process shutdown is imminent.
+			break
+		}
+		actask := stub.Task{Progress: task.Progress, Goal: task.Goal}
+		ctx.SendStream().Send(actask)
+	}
+	return nil
+}
+
+func (d *invoker) ForceStop(ipc.ServerContext) error {
+	d.ac.ForceStop()
+	return fmt.Errorf("ForceStop should not reply as the process should be dead")
+}
diff --git a/lib/signals/signals.go b/lib/signals/signals.go
index 36bf90b..e24e1c3 100644
--- a/lib/signals/signals.go
+++ b/lib/signals/signals.go
@@ -47,7 +47,7 @@
 				sawStop = true
 				if r := rt.R(); r != nil {
 					stopWaiter := make(chan string, 1)
-					r.WaitForStop(stopWaiter)
+					r.AppCycle().WaitForStop(stopWaiter)
 					go func() {
 						for {
 							ch <- stopSignal(<-stopWaiter)
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 4aff7fe..e101277 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -49,7 +49,7 @@
 			close(ch)
 			return
 		case "stop":
-			rt.R().Stop()
+			rt.R().AppCycle().Stop()
 		}
 	}
 }
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
index 22b5beb..cfead4e 100644
--- a/profiles/chrome/chrome.go
+++ b/profiles/chrome/chrome.go
@@ -39,11 +39,13 @@
 	return p
 }
 
-func (g *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) error {
-	rt.Logger().VI(1).Infof("%s", g)
-	return nil
+func (c *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+	rt.Logger().VI(1).Infof("%s", c)
+	return nil, nil
 }
 
-func (g *chrome) String() string {
-	return "chrome profile on " + g.Platform().String()
+func (*chrome) Cleanup() {}
+
+func (c *chrome) String() string {
+	return "chrome profile on " + c.Platform().String()
 }
diff --git a/profiles/doc.go b/profiles/doc.go
index bee7e06..635df9d 100644
--- a/profiles/doc.go
+++ b/profiles/doc.go
@@ -17,17 +17,12 @@
 // registration.
 //
 // This top level directory contains a 'generic' Profile and utility routines
-// used by other Profiles. It does not follow the convention of registering
-// itself via its Init function, since the expected use is that it will
-// used automatically as a default by the Runtime. Instead it provides a New
-// function. This avoids the need for every main package to import
-// "veyron.io/veyron/veyron/profiles", instead, only more specific Profiles must be so imported.
+// used by other Profiles. It should be imported whenever possible and
+// particularly by tests.
 //
-// The 'net' Profile adds operating system support for varied network
+// The 'roaming' Profile adds operating system support for varied network
 // configurations and in particular dhcp. It should be used by any application
-// that may 'roam' or any may be behind a 1-1 NAT.
+// that may 'roam' or any may be behind a 1-1 NAT. The 'static' profile
+// does not provide dhcp support, but is otherwise like the roaming profile.
 //
-// The 'net/bluetooth' Profile adds operating system support for bluetooth
-// networking.
-// TODO(cnicolaou,ashankar): add this
 package profiles
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index b5afa26..b0143a7 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -14,6 +14,7 @@
 	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/rt"
 
+	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netstate"
 	"veyron.io/veyron/veyron/profiles"
@@ -35,7 +36,9 @@
 	rt.RegisterProfile(&profile{})
 }
 
-type profile struct{}
+type profile struct {
+	ac *appcycle.AppCycle
+}
 
 func (p *profile) Name() string {
 	return "GCE"
@@ -54,17 +57,22 @@
 	return "net " + p.Platform().String()
 }
 
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
 	if !gce.RunningOnGCE() {
-		return fmt.Errorf("GCE profile used on a non-GCE system")
+		return nil, fmt.Errorf("GCE profile used on a non-GCE system")
 	}
+	p.ac = appcycle.New(rt)
 	ListenSpec.Address = listenAddressFlag.String()
 	if ip, err := gce.ExternalIPAddress(); err != nil {
-		return err
+		return p.ac, err
 	} else {
 		ListenSpec.AddressChooser = func(network string, addrs []ipc.Address) ([]ipc.Address, error) {
 			return []ipc.Address{&netstate.AddrIfc{&net.IPAddr{IP: ip}, "gce-nat", nil}}, nil
 		}
 	}
-	return nil
+	return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+	p.ac.Shutdown()
 }
diff --git a/profiles/generic.go b/profiles/generic.go
index 9aadc25..96ddcb7 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -6,6 +6,7 @@
 	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/rt"
 
+	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/profiles/internal"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 )
@@ -17,7 +18,7 @@
 	AddressChooser: internal.IPAddressChooser,
 }
 
-type generic struct{}
+type generic struct{ ac *appcycle.AppCycle }
 
 var _ veyron2.Profile = (*generic)(nil)
 
@@ -44,9 +45,14 @@
 	return p
 }
 
-func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
 	rt.Logger().VI(1).Infof("%s", g)
-	return nil
+	g.ac = appcycle.New(rt)
+	return g.ac, nil
+}
+
+func (g *generic) Cleanup() {
+	g.ac.Shutdown()
 }
 
 func (g *generic) String() string {
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index d4c0262..837a5b8 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -18,6 +18,7 @@
 	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/rt"
 
+	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netconfig"
 	"veyron.io/veyron/veyron/lib/netstate"
@@ -45,7 +46,9 @@
 }
 
 type profile struct {
-	gce string
+	gce                  string
+	ac                   *appcycle.AppCycle
+	cleanupCh, watcherCh chan struct{}
 }
 
 func New() veyron2.Profile {
@@ -69,7 +72,7 @@
 	return p.Name() + " " + p.Platform().String()
 }
 
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
 	log := rt.Logger()
 
 	rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -81,6 +84,8 @@
 		Proxy:    lf.ListenProxy,
 	}
 
+	p.ac = appcycle.New(rt)
+
 	// Our address is private, so we test for running on GCE and for its
 	// 1:1 NAT configuration.
 	if !internal.HasPublicIP(log) {
@@ -89,7 +94,7 @@
 				return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
 			}
 			p.gce = "+gce"
-			return nil
+			return p.ac, nil
 		}
 	}
 
@@ -99,40 +104,57 @@
 	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
 	if err != nil {
 		log.Errorf("failed to create publisher: %s", err)
-		return err
+		p.ac.Shutdown()
+		return nil, err
 	}
 
-	ListenSpec.StreamPublisher = publisher
-	ListenSpec.StreamName = SettingsStreamName
-	ListenSpec.AddressChooser = internal.IPAddressChooser
-	go monitorNetworkSettings(rt, stop, ch, ListenSpec)
-	return nil
-}
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettings(rt veyron2.Runtime, stop <-chan struct{},
-	ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
-	defer close(ch)
-
-	log := rt.Logger()
 	prev, err := netstate.GetAccessibleIPs()
 	if err != nil {
-		// TODO(cnicolaou): add support for shutting down profiles
-		//<-stop
 		log.VI(2).Infof("failed to determine network state")
-		return
+		p.ac.Shutdown()
+		return nil, err
 	}
 
 	// Start the dhcp watcher.
 	watcher, err := netconfig.NewNetConfigWatcher()
 	if err != nil {
 		log.VI(2).Infof("Failed to get new config watcher: %s", err)
-		// TODO(cnicolaou): add support for shutting down profiles
-		//<-stop
-		return
+		p.ac.Shutdown()
+		return nil, err
 	}
 
+	p.cleanupCh = make(chan struct{})
+	p.watcherCh = make(chan struct{})
+
+	ListenSpec.StreamPublisher = publisher
+	ListenSpec.StreamName = SettingsStreamName
+	ListenSpec.AddressChooser = internal.IPAddressChooser
+
+	go monitorNetworkSettings(rt, watcher, prev, stop, p.cleanupCh, p.watcherCh, ch, ListenSpec)
+	return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+	if p.cleanupCh != nil {
+		close(p.cleanupCh)
+	}
+	if p.ac != nil {
+		p.ac.Shutdown()
+	}
+	if p.watcherCh != nil {
+		<-p.watcherCh
+	}
+}
+
+// monitorNetworkSettings will monitor network configuration changes and
+// publish subsequent Settings to reflect any changes detected.
+func monitorNetworkSettings(rt veyron2.Runtime, watcher netconfig.NetConfigWatcher, prev netstate.AddrList, pubStop, cleanup <-chan struct{},
+	watcherLoop chan<- struct{}, ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
+	defer close(ch)
+
+	log := rt.Logger()
+
+done:
 	for {
 		select {
 		case <-watcher.Channel():
@@ -160,9 +182,12 @@
 				ch <- ipc.NewAddAddrsSetting(chosen)
 			}
 			prev = cur
-			// TODO(cnicolaou): add support for shutting down profiles.
-			//case <-stop:
-			//	return
+		case <-cleanup:
+			break done
+		case <-pubStop:
+			goto done
 		}
 	}
+	watcher.Stop()
+	close(watcherLoop)
 }
diff --git a/profiles/static/static.go b/profiles/static/static.go
index 377874a..2720f1d 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -11,6 +11,7 @@
 	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/rt"
 
+	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netstate"
 	"veyron.io/veyron/veyron/profiles"
@@ -34,6 +35,7 @@
 
 type static struct {
 	gce string
+	ac  *appcycle.AppCycle
 }
 
 // New returns a new instance of a very static Profile. It can be used
@@ -55,7 +57,7 @@
 	return p
 }
 
-func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
 	log := rt.Logger()
 
 	rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -67,6 +69,8 @@
 		Proxy:    lf.ListenProxy,
 	}
 
+	p.ac = appcycle.New(rt)
+
 	// Our address is private, so we test for running on GCE and for its
 	// 1:1 NAT configuration. GCEPublicAddress returns a non-nil addr
 	// if we are indeed running on GCE.
@@ -76,11 +80,17 @@
 				return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
 			}
 			p.gce = "+gce"
-			return nil
+			return p.ac, nil
 		}
 	}
 	ListenSpec.AddressChooser = internal.IPAddressChooser
-	return nil
+	return p.ac, nil
+}
+
+func (p *static) Cleanup() {
+	if p.ac != nil {
+		p.ac.Shutdown()
+	}
 }
 
 func (p *static) String() string {
diff --git a/runtimes/google/appcycle/appcycle.go b/runtimes/google/appcycle/appcycle.go
deleted file mode 100644
index 5c3e4de..0000000
--- a/runtimes/google/appcycle/appcycle.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// Package appcycle is a stripped-down server stub implementation for the
-// AppCycle service.  We can't use the generated stub under
-// veyron2/services/mgmt/appcycle because that would introduce a recursive
-// dependency on veyron/runtimes/google (via veyron2/rt).
-//
-// TODO(caprita): It would be nice to still use the stub if possible.  Look
-// into the feasibility of a generated stub that does not depend on veyron2/rt.
-package appcycle
-
-import (
-	"veyron.io/veyron/veyron2"
-	"veyron.io/veyron/veyron2/ipc"
-)
-
-// AppCycleServerMethods is the interface a server writer
-// implements for AppCycle.
-//
-// AppCycle interfaces with the process running a veyron runtime.
-type AppCycleServerMethods interface {
-	// Stop initiates shutdown of the server.  It streams back periodic
-	// updates to give the client an idea of how the shutdown is
-	// progressing.
-	Stop(AppCycleStopContext) error
-	// ForceStop tells the server to shut down right away.  It can be issued
-	// while a Stop is outstanding if for example the client does not want
-	// to wait any longer.
-	ForceStop(ipc.ServerContext) error
-}
-
-// AppCycleServer returns a server stub for AppCycle.
-// It converts an implementation of AppCycleServerMethods into
-// an object that may be used by ipc.Server.
-func AppCycleServer(impl AppCycleServerMethods) AppCycleServerStub {
-	return AppCycleServerStub{impl}
-}
-
-type AppCycleServerStub struct {
-	impl AppCycleServerMethods
-}
-
-func (s AppCycleServerStub) Stop(ctx *AppCycleStopContextStub) error {
-	return s.impl.Stop(ctx)
-}
-
-func (s AppCycleServerStub) ForceStop(call ipc.ServerCall) error {
-	return s.impl.ForceStop(call)
-}
-
-// AppCycleStopContext represents the context passed to AppCycle.Stop.
-type AppCycleStopContext interface {
-	ipc.ServerContext
-	// SendStream returns the send side of the server stream.
-	SendStream() interface {
-		// Send places the item onto the output stream.  Returns errors encountered
-		// while sending.  Blocks if there is no buffer space; will unblock when
-		// buffer space is available.
-		Send(item veyron2.Task) error
-	}
-}
-
-type AppCycleStopContextStub struct {
-	ipc.ServerCall
-}
-
-func (s *AppCycleStopContextStub) Init(call ipc.ServerCall) {
-	s.ServerCall = call
-}
-
-func (s *AppCycleStopContextStub) SendStream() interface {
-	Send(item veyron2.Task) error
-} {
-	return implAppCycleStopContextSend{s}
-}
-
-type implAppCycleStopContextSend struct {
-	s *AppCycleStopContextStub
-}
-
-func (s implAppCycleStopContextSend) Send(item veyron2.Task) error {
-	return s.s.Send(item)
-}
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 1d61739..44e52e4 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -2,8 +2,6 @@
 
 import (
 	"fmt"
-	"os"
-	"sync"
 	"time"
 
 	"veyron.io/veyron/veyron2"
@@ -13,17 +11,51 @@
 	"veyron.io/veyron/veyron2/options"
 
 	"veyron.io/veyron/veyron/lib/exec"
-	"veyron.io/veyron/veyron/runtimes/google/appcycle"
 )
 
-type mgmtImpl struct {
-	sync.RWMutex
-	waiters      []chan<- string
-	taskTrackers []chan<- veyron2.Task
-	task         veyron2.Task
-	shutDown     bool
-	rt           *vrt
-	server       ipc.Server // Serves AppCycle service.
+// TODO(cnicolaou,caprita): move this all out of the runtime when we
+// refactor the profiles/runtime interface.
+func (rt *vrt) initMgmt(appCycle veyron2.AppCycle, handle *exec.ChildHandle) (ipc.Server, error) {
+	// Do not initialize the mgmt runtime if the process has not
+	// been started through the veyron exec library by a node
+	// manager.
+	if handle == nil {
+		return nil, nil
+	}
+	parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
+	if err != nil {
+		return nil, nil
+	}
+	listenSpec, err := getListenSpec(handle)
+	if err != nil {
+		return nil, err
+	}
+	var serverOpts []ipc.ServerOpt
+	parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
+	if err == nil && parentPeerPattern != "" {
+		// Grab the blessing from our blessing store that the parent
+		// told us to use so they can talk to us.
+		serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
+		serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
+	}
+	server, err := rt.NewServer(serverOpts...)
+	if err != nil {
+		return nil, err
+	}
+	ep, err := server.Listen(*listenSpec)
+	if err != nil {
+		return nil, err
+	}
+	if err := server.Serve("", appCycle.Remote(), nil); err != nil {
+		server.Stop()
+		return nil, err
+	}
+	err = rt.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
+	if err != nil {
+		server.Stop()
+		return nil, err
+	}
+	return server, nil
 }
 
 func getListenSpec(handle *exec.ChildHandle) (*ipc.ListenSpec, error) {
@@ -45,47 +77,9 @@
 	return &ipc.ListenSpec{Protocol: protocol, Address: address}, nil
 }
 
-func (m *mgmtImpl) initMgmt(rt *vrt, handle *exec.ChildHandle) error {
-	// Do not initialize the mgmt runtime if the process has not
-	// been started through the veyron exec library by a node
-	// manager.
-	if handle == nil {
-		return nil
-	}
-	parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
-	if err != nil {
-		return nil
-	}
-	listenSpec, err := getListenSpec(handle)
-	if err != nil {
-		return err
-	}
-	var serverOpts []ipc.ServerOpt
-	parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
-	if err == nil && parentPeerPattern != "" {
-		// Grab the blessing from our blessing store that the parent
-		// told us to use so they can talk to us.
-		serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
-		serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
-	}
-	m.rt = rt
-	m.server, err = rt.NewServer(serverOpts...)
-	if err != nil {
-		return err
-	}
-	ep, err := m.server.Listen(*listenSpec)
-	if err != nil {
-		return err
-	}
-	if err := m.server.Serve("", appcycle.AppCycleServer(m), nil); err != nil {
-		return err
-	}
-	return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
-}
-
-func (m *mgmtImpl) callbackToParent(parentName, myName string) error {
-	ctx, _ := m.rt.NewContext().WithTimeout(10 * time.Second)
-	call, err := m.rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
+func (rt *vrt) callbackToParent(parentName, myName string) error {
+	ctx, _ := rt.NewContext().WithTimeout(10 * time.Second)
+	call, err := rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
 	if err != nil {
 		return err
 	}
@@ -94,113 +88,3 @@
 	}
 	return err
 }
-
-func (m *mgmtImpl) shutdown() {
-	m.Lock()
-	if m.shutDown {
-		m.Unlock()
-		return
-	}
-	m.shutDown = true
-	for _, t := range m.taskTrackers {
-		close(t)
-	}
-	m.taskTrackers = nil
-	server := m.server
-	m.Unlock()
-	if server != nil {
-		server.Stop()
-	}
-}
-
-func (rt *vrt) stop(msg string) {
-	rt.mgmt.RLock()
-	defer rt.mgmt.RUnlock()
-	if len(rt.mgmt.waiters) == 0 {
-		os.Exit(veyron2.UnhandledStopExitCode)
-	}
-	for _, w := range rt.mgmt.waiters {
-		select {
-		case w <- msg:
-		default:
-		}
-	}
-}
-
-func (rt *vrt) Stop() {
-	rt.stop(veyron2.LocalStop)
-}
-
-func (*vrt) ForceStop() {
-	os.Exit(veyron2.ForceStopExitCode)
-}
-
-func (rt *vrt) WaitForStop(ch chan<- string) {
-	rt.mgmt.Lock()
-	defer rt.mgmt.Unlock()
-	rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
-}
-
-func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
-	rt.mgmt.Lock()
-	defer rt.mgmt.Unlock()
-	if rt.mgmt.shutDown {
-		close(ch)
-		return
-	}
-	rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
-}
-
-func (rt *vrt) advanceTask(progress, goal int) {
-	rt.mgmt.Lock()
-	defer rt.mgmt.Unlock()
-	rt.mgmt.task.Goal += goal
-	rt.mgmt.task.Progress += progress
-	for _, t := range rt.mgmt.taskTrackers {
-		select {
-		case t <- rt.mgmt.task:
-		default:
-			// TODO(caprita): Make it such that the latest task
-			// update is always added to the channel even if channel
-			// is full.  One way is to pull an element from t and
-			// then re-try the push.
-		}
-	}
-}
-
-func (rt *vrt) AdvanceGoal(delta int) {
-	if delta <= 0 {
-		return
-	}
-	rt.advanceTask(0, delta)
-}
-
-func (rt *vrt) AdvanceProgress(delta int) {
-	if delta <= 0 {
-		return
-	}
-	rt.advanceTask(delta, 0)
-}
-
-func (m *mgmtImpl) Stop(ctx appcycle.AppCycleStopContext) error {
-	// The size of the channel should be reasonably sized to expect not to
-	// miss updates while we're waiting for the stream to unblock.
-	ch := make(chan veyron2.Task, 10)
-	m.rt.TrackTask(ch)
-	// TODO(caprita): Include identity of Stop issuer in message.
-	m.rt.stop(veyron2.RemoteStop)
-	for {
-		task, ok := <-ch
-		if !ok {
-			// Channel closed, meaning process shutdown is imminent.
-			break
-		}
-		ctx.SendStream().Send(task)
-	}
-	return nil
-}
-
-func (m *mgmtImpl) ForceStop(ipc.ServerContext) error {
-	m.rt.ForceStop()
-	return fmt.Errorf("ForceStop should not reply as the process should be dead")
-}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index fc39919..cd3db2a 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -45,7 +45,8 @@
 // TestBasic verifies that the basic plumbing works: LocalStop calls result in
 // stop messages being sent on the channel passed to WaitForStop.
 func TestBasic(t *testing.T) {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	ch := make(chan string, 1)
 	m.WaitForStop(ch)
 	for i := 0; i < 10; i++ {
@@ -64,7 +65,8 @@
 // TestMultipleWaiters verifies that the plumbing works with more than one
 // registered wait channel.
 func TestMultipleWaiters(t *testing.T) {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	ch1 := make(chan string, 1)
 	m.WaitForStop(ch1)
 	ch2 := make(chan string, 1)
@@ -84,7 +86,8 @@
 // channel is not being drained: once the channel's buffer fills up, future
 // Stops become no-ops.
 func TestMultipleStops(t *testing.T) {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	ch := make(chan string, 1)
 	m.WaitForStop(ch)
 	for i := 0; i < 10; i++ {
@@ -101,7 +104,8 @@
 }
 
 func noWaiters(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	fmt.Fprintf(stdout, "ready\n")
 	modules.WaitForEOF(stdin)
 	m.Stop()
@@ -126,7 +130,8 @@
 }
 
 func forceStop(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	fmt.Fprintf(stdout, "ready\n")
 	modules.WaitForEOF(stdin)
 	m.WaitForStop(make(chan string, 1))
@@ -153,7 +158,7 @@
 	}
 }
 
-func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int) {
+func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int32) {
 	if want, got := (veyron2.Task{progress, goal}), <-ch; !reflect.DeepEqual(want, got) {
 		t.Errorf("Unexpected progress: want %+v, got %+v", want, got)
 	}
@@ -170,7 +175,8 @@
 // TestProgress verifies that the ticker update/track logic works for a single
 // tracker.
 func TestProgress(t *testing.T) {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	m.AdvanceGoal(50)
 	ch := make(chan veyron2.Task, 1)
 	m.TrackTask(ch)
@@ -190,7 +196,7 @@
 	checkNoProgress(t, ch)
 	m.AdvanceGoal(0)
 	checkNoProgress(t, ch)
-	m.Cleanup()
+	r.Cleanup()
 	if _, ok := <-ch; ok {
 		t.Errorf("Expected channel to be closed")
 	}
@@ -200,7 +206,8 @@
 // works for more than one tracker.  It also ensures that the runtime doesn't
 // block when the tracker channels are full.
 func TestProgressMultipleTrackers(t *testing.T) {
-	m, _ := rt.New(profileOpt)
+	r, _ := rt.New(profileOpt)
+	m := r.AppCycle()
 	// ch1 is 1-buffered, ch2 is 2-buffered.
 	ch1, ch2 := make(chan veyron2.Task, 1), make(chan veyron2.Task, 2)
 	m.TrackTask(ch1)
@@ -223,7 +230,7 @@
 	m.AdvanceGoal(4)
 	checkProgress(t, ch1, 11, 4)
 	checkProgress(t, ch2, 11, 4)
-	m.Cleanup()
+	r.Cleanup()
 	if _, ok := <-ch1; ok {
 		t.Errorf("Expected channel to be closed")
 	}
@@ -237,15 +244,16 @@
 	if err != nil {
 		return err
 	}
+	m := r.AppCycle()
 	defer r.Cleanup()
 	ch := make(chan string, 1)
-	r.WaitForStop(ch)
+	m.WaitForStop(ch)
 	fmt.Fprintf(stdout, "Got %s\n", <-ch)
-	r.AdvanceGoal(10)
+	m.AdvanceGoal(10)
 	fmt.Fprintf(stdout, "Doing some work\n")
-	r.AdvanceProgress(2)
+	m.AdvanceProgress(2)
 	fmt.Fprintf(stdout, "Doing some more work\n")
-	r.AdvanceProgress(5)
+	m.AdvanceProgress(5)
 	return nil
 }
 
@@ -268,7 +276,6 @@
 		t.Fatalf("Got error: %v", err)
 	}
 	ch := make(chan string)
-
 	var ep naming.Endpoint
 	if ep, err = server.Listen(profiles.LocalListenSpec); err != nil {
 		t.Fatalf("Got error: %v", err)
@@ -277,7 +284,6 @@
 		t.Fatalf("Got error: %v", err)
 	}
 	return server, naming.JoinAddressName(ep.String(), ""), ch
-
 }
 
 func setupRemoteAppCycleMgr(t *testing.T) (veyron2.Runtime, modules.Handle, appcycle.AppCycleClientMethods, func()) {
@@ -298,8 +304,12 @@
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
-
-	appCycleName := <-ch
+	appCycleName := ""
+	select {
+	case appCycleName = <-ch:
+	case <-time.After(time.Minute):
+		t.Errorf("timeout")
+	}
 	appCycle := appcycle.AppCycleClient(appCycleName)
 	return r, h, appCycle, func() {
 		configServer.Stop()
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 8ff2225..ab635b2 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -30,8 +30,7 @@
 var errCleaningUp = fmt.Errorf("operation rejected: runtime is being cleaned up")
 
 type vrt struct {
-	mu sync.Mutex
-
+	mu                 sync.Mutex
 	profile            veyron2.Profile
 	publisher          *config.Publisher
 	sm                 []stream.Manager // GUARDED_BY(mu)
@@ -39,7 +38,8 @@
 	signals            chan os.Signal
 	principal          security.Principal
 	client             ipc.Client
-	mgmt               *mgmtImpl
+	ac                 veyron2.AppCycle
+	acServer           ipc.Server
 	flags              flags.RuntimeFlags
 	preferredProtocols options.PreferredProtocols
 	reservedDisp       ipc.Dispatcher
@@ -69,7 +69,6 @@
 	})
 	flags := runtimeFlags.RuntimeFlags()
 	rt := &vrt{
-		mgmt:       new(mgmtImpl),
 		lang:       i18n.LangIDFromEnv(),
 		program:    filepath.Base(os.Args[0]),
 		flags:      flags,
@@ -136,14 +135,14 @@
 	}
 
 	rt.publisher = config.NewPublisher()
-	if err := rt.profile.Init(rt, rt.publisher); err != nil {
+	if rt.ac, err = rt.profile.Init(rt, rt.publisher); err != nil {
 		return nil, err
 	}
-
-	if err := rt.mgmt.initMgmt(rt, handle); err != nil {
+	server, err := rt.initMgmt(rt.ac, handle)
+	if err != nil {
 		return nil, err
 	}
-
+	rt.acServer = server
 	vlog.VI(2).Infof("rt.Init done")
 	return rt, nil
 }
@@ -156,6 +155,10 @@
 	return rt.profile
 }
 
+func (rt *vrt) AppCycle() veyron2.AppCycle {
+	return rt.ac
+}
+
 func (rt *vrt) ConfigureReservedName(server ipc.Dispatcher, opts ...ipc.ServerOpt) {
 	rt.mu.Lock()
 	defer rt.mu.Unlock()
@@ -188,7 +191,11 @@
 	// TODO(caprita): Consider shutting down mgmt later in the runtime's
 	// shutdown sequence, to capture some of the runtime internal shutdown
 	// tasks in the task tracker.
-	rt.mgmt.shutdown()
+	rt.profile.Cleanup()
+	if rt.acServer != nil {
+		rt.acServer.Stop()
+	}
+
 	// It's ok to access rt.sm out of lock below, since a Mutex acts as a
 	// barrier in Go and hence we're guaranteed that cleaningUp is true at
 	// this point.  The only code that mutates rt.sm is NewStreamManager in
diff --git a/runtimes/google/rt/signal_test.go b/runtimes/google/rt/signal_test.go
index f478d6d..de2d98c 100644
--- a/runtimes/google/rt/signal_test.go
+++ b/runtimes/google/rt/signal_test.go
@@ -14,6 +14,7 @@
 	"veyron.io/veyron/veyron2/options"
 	"veyron.io/veyron/veyron2/rt"
 
+	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/expect"
 	"veyron.io/veyron/veyron/lib/modules"
 )
@@ -42,10 +43,12 @@
 	return "myprofile on " + mp.Platform().String()
 }
 
-func (mp *myprofile) Init(veyron2.Runtime, *config.Publisher) error {
-	return nil
+func (mp *myprofile) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+	return appcycle.New(rt), nil
 }
 
+func (mp *myprofile) Cleanup() {}
+
 func simpleEchoProgram(stdin io.Reader, stdout io.Writer) {
 	fmt.Fprintf(stdout, "ready\n")
 	scanner := bufio.NewScanner(stdin)
diff --git a/runtimes/google/testing/mocks/runtime/panic_runtime.go b/runtimes/google/testing/mocks/runtime/panic_runtime.go
index 1c28e1d..d70852b 100644
--- a/runtimes/google/testing/mocks/runtime/panic_runtime.go
+++ b/runtimes/google/testing/mocks/runtime/panic_runtime.go
@@ -22,6 +22,7 @@
 const badRuntime = "The runtime implmentation should not call methods on runtime intances."
 
 func (*PanicRuntime) Profile() veyron2.Profile                               { panic(badRuntime) }
+func (*PanicRuntime) AppCycle() veyron2.AppCycle                             { panic(badRuntime) }
 func (*PanicRuntime) Publisher() *config.Publisher                           { panic(badRuntime) }
 func (*PanicRuntime) Principal() security.Principal                          { panic(badRuntime) }
 func (*PanicRuntime) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error)    { panic(badRuntime) }
@@ -39,12 +40,6 @@
 func (*PanicRuntime) NewLogger(name string, opts ...vlog.LoggingOpts) (vlog.Logger, error) {
 	panic(badRuntime)
 }
-func (*PanicRuntime) Stop()                         { panic(badRuntime) }
-func (*PanicRuntime) ForceStop()                    { panic(badRuntime) }
-func (*PanicRuntime) WaitForStop(chan<- string)     { panic(badRuntime) }
-func (*PanicRuntime) AdvanceGoal(delta int)         { panic(badRuntime) }
-func (*PanicRuntime) AdvanceProgress(delta int)     { panic(badRuntime) }
-func (*PanicRuntime) TrackTask(chan<- veyron2.Task) { panic(badRuntime) }
 func (*PanicRuntime) ConfigureReservedName(ipc.Dispatcher, ...ipc.ServerOpt) {
 	panic(badRuntime)
 }
diff --git a/services/mgmt/node/impl/node_invoker.go b/services/mgmt/node/impl/node_invoker.go
index 51a35cc..eeed7a4 100644
--- a/services/mgmt/node/impl/node_invoker.go
+++ b/services/mgmt/node/impl/node_invoker.go
@@ -159,7 +159,7 @@
 	if err := updateLink(i.config.Previous, i.config.CurrentLink); err != nil {
 		return err
 	}
-	rt.R().Stop()
+	rt.R().AppCycle().Stop()
 	return nil
 }
 
@@ -376,7 +376,7 @@
 		return err
 	}
 
-	rt.R().Stop()
+	rt.R().AppCycle().Stop()
 	deferrer = nil
 	return nil
 }