veyron2: move the AppCycle implementation out of the runtime.
- this change moves AppCycle out of the core veyron2.Runtime
interface, this in turn both reduces the code+dependencies
in the runtime and allows the AppCycle implementation to use
stubs.
- profiles provide an appropriate AppCycle server to the runtime,
which will then create a server if appropriate to allow remote
access
- this unifies 'shutdown/cleanup' for profiles, the runtime
and the AppCycle.
Change-Id: Ie013dfa5d116b3fbe85a3544c01e383223b4b7b5
diff --git a/lib/appcycle/appcycle.go b/lib/appcycle/appcycle.go
new file mode 100644
index 0000000..3dd0004
--- /dev/null
+++ b/lib/appcycle/appcycle.go
@@ -0,0 +1,141 @@
+package appcycle
+
+import (
+ "fmt"
+ "os"
+ "sync"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ stub "veyron.io/veyron/veyron2/services/mgmt/appcycle"
+)
+
+type AppCycle struct {
+ sync.RWMutex
+ waiters []chan<- string
+ taskTrackers []chan<- veyron2.Task
+ task veyron2.Task
+ shutDown bool
+ rt veyron2.Runtime
+ disp *invoker
+}
+
+type invoker struct {
+ ac *AppCycle
+}
+
+func New(rt veyron2.Runtime) *AppCycle {
+ ac := &AppCycle{rt: rt}
+ ac.disp = &invoker{ac}
+ return ac
+}
+
+func (m *AppCycle) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+ if m.shutDown {
+ return
+ }
+ m.shutDown = true
+ for _, t := range m.taskTrackers {
+ close(t)
+ }
+ m.taskTrackers = nil
+}
+
+func (m *AppCycle) stop(msg string) {
+ m.RLock()
+ defer m.RUnlock()
+ if len(m.waiters) == 0 {
+ os.Exit(veyron2.UnhandledStopExitCode)
+ }
+ for _, w := range m.waiters {
+ select {
+ case w <- msg:
+ default:
+ }
+ }
+}
+
+func (m *AppCycle) Stop() {
+ m.stop(veyron2.LocalStop)
+}
+
+func (*AppCycle) ForceStop() {
+ os.Exit(veyron2.ForceStopExitCode)
+}
+
+func (m *AppCycle) WaitForStop(ch chan<- string) {
+ m.Lock()
+ defer m.Unlock()
+ m.waiters = append(m.waiters, ch)
+}
+
+func (m *AppCycle) TrackTask(ch chan<- veyron2.Task) {
+ m.Lock()
+ defer m.Unlock()
+ if m.shutDown {
+ close(ch)
+ return
+ }
+ m.taskTrackers = append(m.taskTrackers, ch)
+}
+
+func (m *AppCycle) advanceTask(progress, goal int32) {
+ m.Lock()
+ defer m.Unlock()
+ m.task.Goal += goal
+ m.task.Progress += progress
+ for _, t := range m.taskTrackers {
+ select {
+ case t <- m.task:
+ default:
+ // TODO(caprita): Make it such that the latest task
+ // update is always added to the channel even if channel
+ // is full. One way is to pull an element from t and
+ // then re-try the push.
+ }
+ }
+}
+
+func (m *AppCycle) AdvanceGoal(delta int32) {
+ if delta <= 0 {
+ return
+ }
+ m.advanceTask(0, delta)
+}
+
+func (m *AppCycle) AdvanceProgress(delta int32) {
+ if delta <= 0 {
+ return
+ }
+ m.advanceTask(delta, 0)
+}
+
+func (m *AppCycle) Remote() interface{} {
+ return stub.AppCycleServer(m.disp)
+}
+
+func (d *invoker) Stop(ctx stub.AppCycleStopContext) error {
+ // The size of the channel should be reasonably sized to expect not to
+ // miss updates while we're waiting for the stream to unblock.
+ ch := make(chan veyron2.Task, 10)
+ d.ac.TrackTask(ch)
+ // TODO(caprita): Include identity of Stop issuer in message.
+ d.ac.stop(veyron2.RemoteStop)
+ for {
+ task, ok := <-ch
+ if !ok {
+ // Channel closed, meaning process shutdown is imminent.
+ break
+ }
+ actask := stub.Task{Progress: task.Progress, Goal: task.Goal}
+ ctx.SendStream().Send(actask)
+ }
+ return nil
+}
+
+func (d *invoker) ForceStop(ipc.ServerContext) error {
+ d.ac.ForceStop()
+ return fmt.Errorf("ForceStop should not reply as the process should be dead")
+}
diff --git a/lib/signals/signals.go b/lib/signals/signals.go
index 36bf90b..e24e1c3 100644
--- a/lib/signals/signals.go
+++ b/lib/signals/signals.go
@@ -47,7 +47,7 @@
sawStop = true
if r := rt.R(); r != nil {
stopWaiter := make(chan string, 1)
- r.WaitForStop(stopWaiter)
+ r.AppCycle().WaitForStop(stopWaiter)
go func() {
for {
ch <- stopSignal(<-stopWaiter)
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 4aff7fe..e101277 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -49,7 +49,7 @@
close(ch)
return
case "stop":
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
}
}
}
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
index 22b5beb..cfead4e 100644
--- a/profiles/chrome/chrome.go
+++ b/profiles/chrome/chrome.go
@@ -39,11 +39,13 @@
return p
}
-func (g *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) error {
- rt.Logger().VI(1).Infof("%s", g)
- return nil
+func (c *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+ rt.Logger().VI(1).Infof("%s", c)
+ return nil, nil
}
-func (g *chrome) String() string {
- return "chrome profile on " + g.Platform().String()
+func (*chrome) Cleanup() {}
+
+func (c *chrome) String() string {
+ return "chrome profile on " + c.Platform().String()
}
diff --git a/profiles/doc.go b/profiles/doc.go
index bee7e06..635df9d 100644
--- a/profiles/doc.go
+++ b/profiles/doc.go
@@ -17,17 +17,12 @@
// registration.
//
// This top level directory contains a 'generic' Profile and utility routines
-// used by other Profiles. It does not follow the convention of registering
-// itself via its Init function, since the expected use is that it will
-// used automatically as a default by the Runtime. Instead it provides a New
-// function. This avoids the need for every main package to import
-// "veyron.io/veyron/veyron/profiles", instead, only more specific Profiles must be so imported.
+// used by other Profiles. It should be imported whenever possible and
+// particularly by tests.
//
-// The 'net' Profile adds operating system support for varied network
+// The 'roaming' Profile adds operating system support for varied network
// configurations and in particular dhcp. It should be used by any application
-// that may 'roam' or any may be behind a 1-1 NAT.
+// that may 'roam' or any may be behind a 1-1 NAT. The 'static' profile
+// does not provide dhcp support, but is otherwise like the roaming profile.
//
-// The 'net/bluetooth' Profile adds operating system support for bluetooth
-// networking.
-// TODO(cnicolaou,ashankar): add this
package profiles
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index b5afa26..b0143a7 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -14,6 +14,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
"veyron.io/veyron/veyron/profiles"
@@ -35,7 +36,9 @@
rt.RegisterProfile(&profile{})
}
-type profile struct{}
+type profile struct {
+ ac *appcycle.AppCycle
+}
func (p *profile) Name() string {
return "GCE"
@@ -54,17 +57,22 @@
return "net " + p.Platform().String()
}
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
if !gce.RunningOnGCE() {
- return fmt.Errorf("GCE profile used on a non-GCE system")
+ return nil, fmt.Errorf("GCE profile used on a non-GCE system")
}
+ p.ac = appcycle.New(rt)
ListenSpec.Address = listenAddressFlag.String()
if ip, err := gce.ExternalIPAddress(); err != nil {
- return err
+ return p.ac, err
} else {
ListenSpec.AddressChooser = func(network string, addrs []ipc.Address) ([]ipc.Address, error) {
return []ipc.Address{&netstate.AddrIfc{&net.IPAddr{IP: ip}, "gce-nat", nil}}, nil
}
}
- return nil
+ return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+ p.ac.Shutdown()
}
diff --git a/profiles/generic.go b/profiles/generic.go
index 9aadc25..96ddcb7 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -6,6 +6,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/profiles/internal"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
)
@@ -17,7 +18,7 @@
AddressChooser: internal.IPAddressChooser,
}
-type generic struct{}
+type generic struct{ ac *appcycle.AppCycle }
var _ veyron2.Profile = (*generic)(nil)
@@ -44,9 +45,14 @@
return p
}
-func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
rt.Logger().VI(1).Infof("%s", g)
- return nil
+ g.ac = appcycle.New(rt)
+ return g.ac, nil
+}
+
+func (g *generic) Cleanup() {
+ g.ac.Shutdown()
}
func (g *generic) String() string {
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index d4c0262..837a5b8 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -18,6 +18,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netconfig"
"veyron.io/veyron/veyron/lib/netstate"
@@ -45,7 +46,9 @@
}
type profile struct {
- gce string
+ gce string
+ ac *appcycle.AppCycle
+ cleanupCh, watcherCh chan struct{}
}
func New() veyron2.Profile {
@@ -69,7 +72,7 @@
return p.Name() + " " + p.Platform().String()
}
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -81,6 +84,8 @@
Proxy: lf.ListenProxy,
}
+ p.ac = appcycle.New(rt)
+
// Our address is private, so we test for running on GCE and for its
// 1:1 NAT configuration.
if !internal.HasPublicIP(log) {
@@ -89,7 +94,7 @@
return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
}
p.gce = "+gce"
- return nil
+ return p.ac, nil
}
}
@@ -99,40 +104,57 @@
stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
if err != nil {
log.Errorf("failed to create publisher: %s", err)
- return err
+ p.ac.Shutdown()
+ return nil, err
}
- ListenSpec.StreamPublisher = publisher
- ListenSpec.StreamName = SettingsStreamName
- ListenSpec.AddressChooser = internal.IPAddressChooser
- go monitorNetworkSettings(rt, stop, ch, ListenSpec)
- return nil
-}
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettings(rt veyron2.Runtime, stop <-chan struct{},
- ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
- defer close(ch)
-
- log := rt.Logger()
prev, err := netstate.GetAccessibleIPs()
if err != nil {
- // TODO(cnicolaou): add support for shutting down profiles
- //<-stop
log.VI(2).Infof("failed to determine network state")
- return
+ p.ac.Shutdown()
+ return nil, err
}
// Start the dhcp watcher.
watcher, err := netconfig.NewNetConfigWatcher()
if err != nil {
log.VI(2).Infof("Failed to get new config watcher: %s", err)
- // TODO(cnicolaou): add support for shutting down profiles
- //<-stop
- return
+ p.ac.Shutdown()
+ return nil, err
}
+ p.cleanupCh = make(chan struct{})
+ p.watcherCh = make(chan struct{})
+
+ ListenSpec.StreamPublisher = publisher
+ ListenSpec.StreamName = SettingsStreamName
+ ListenSpec.AddressChooser = internal.IPAddressChooser
+
+ go monitorNetworkSettings(rt, watcher, prev, stop, p.cleanupCh, p.watcherCh, ch, ListenSpec)
+ return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+ if p.cleanupCh != nil {
+ close(p.cleanupCh)
+ }
+ if p.ac != nil {
+ p.ac.Shutdown()
+ }
+ if p.watcherCh != nil {
+ <-p.watcherCh
+ }
+}
+
+// monitorNetworkSettings will monitor network configuration changes and
+// publish subsequent Settings to reflect any changes detected.
+func monitorNetworkSettings(rt veyron2.Runtime, watcher netconfig.NetConfigWatcher, prev netstate.AddrList, pubStop, cleanup <-chan struct{},
+ watcherLoop chan<- struct{}, ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
+ defer close(ch)
+
+ log := rt.Logger()
+
+done:
for {
select {
case <-watcher.Channel():
@@ -160,9 +182,12 @@
ch <- ipc.NewAddAddrsSetting(chosen)
}
prev = cur
- // TODO(cnicolaou): add support for shutting down profiles.
- //case <-stop:
- // return
+ case <-cleanup:
+ break done
+ case <-pubStop:
+ goto done
}
}
+ watcher.Stop()
+ close(watcherLoop)
}
diff --git a/profiles/static/static.go b/profiles/static/static.go
index 377874a..2720f1d 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -11,6 +11,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
"veyron.io/veyron/veyron/profiles"
@@ -34,6 +35,7 @@
type static struct {
gce string
+ ac *appcycle.AppCycle
}
// New returns a new instance of a very static Profile. It can be used
@@ -55,7 +57,7 @@
return p
}
-func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -67,6 +69,8 @@
Proxy: lf.ListenProxy,
}
+ p.ac = appcycle.New(rt)
+
// Our address is private, so we test for running on GCE and for its
// 1:1 NAT configuration. GCEPublicAddress returns a non-nil addr
// if we are indeed running on GCE.
@@ -76,11 +80,17 @@
return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
}
p.gce = "+gce"
- return nil
+ return p.ac, nil
}
}
ListenSpec.AddressChooser = internal.IPAddressChooser
- return nil
+ return p.ac, nil
+}
+
+func (p *static) Cleanup() {
+ if p.ac != nil {
+ p.ac.Shutdown()
+ }
}
func (p *static) String() string {
diff --git a/runtimes/google/appcycle/appcycle.go b/runtimes/google/appcycle/appcycle.go
deleted file mode 100644
index 5c3e4de..0000000
--- a/runtimes/google/appcycle/appcycle.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// Package appcycle is a stripped-down server stub implementation for the
-// AppCycle service. We can't use the generated stub under
-// veyron2/services/mgmt/appcycle because that would introduce a recursive
-// dependency on veyron/runtimes/google (via veyron2/rt).
-//
-// TODO(caprita): It would be nice to still use the stub if possible. Look
-// into the feasibility of a generated stub that does not depend on veyron2/rt.
-package appcycle
-
-import (
- "veyron.io/veyron/veyron2"
- "veyron.io/veyron/veyron2/ipc"
-)
-
-// AppCycleServerMethods is the interface a server writer
-// implements for AppCycle.
-//
-// AppCycle interfaces with the process running a veyron runtime.
-type AppCycleServerMethods interface {
- // Stop initiates shutdown of the server. It streams back periodic
- // updates to give the client an idea of how the shutdown is
- // progressing.
- Stop(AppCycleStopContext) error
- // ForceStop tells the server to shut down right away. It can be issued
- // while a Stop is outstanding if for example the client does not want
- // to wait any longer.
- ForceStop(ipc.ServerContext) error
-}
-
-// AppCycleServer returns a server stub for AppCycle.
-// It converts an implementation of AppCycleServerMethods into
-// an object that may be used by ipc.Server.
-func AppCycleServer(impl AppCycleServerMethods) AppCycleServerStub {
- return AppCycleServerStub{impl}
-}
-
-type AppCycleServerStub struct {
- impl AppCycleServerMethods
-}
-
-func (s AppCycleServerStub) Stop(ctx *AppCycleStopContextStub) error {
- return s.impl.Stop(ctx)
-}
-
-func (s AppCycleServerStub) ForceStop(call ipc.ServerCall) error {
- return s.impl.ForceStop(call)
-}
-
-// AppCycleStopContext represents the context passed to AppCycle.Stop.
-type AppCycleStopContext interface {
- ipc.ServerContext
- // SendStream returns the send side of the server stream.
- SendStream() interface {
- // Send places the item onto the output stream. Returns errors encountered
- // while sending. Blocks if there is no buffer space; will unblock when
- // buffer space is available.
- Send(item veyron2.Task) error
- }
-}
-
-type AppCycleStopContextStub struct {
- ipc.ServerCall
-}
-
-func (s *AppCycleStopContextStub) Init(call ipc.ServerCall) {
- s.ServerCall = call
-}
-
-func (s *AppCycleStopContextStub) SendStream() interface {
- Send(item veyron2.Task) error
-} {
- return implAppCycleStopContextSend{s}
-}
-
-type implAppCycleStopContextSend struct {
- s *AppCycleStopContextStub
-}
-
-func (s implAppCycleStopContextSend) Send(item veyron2.Task) error {
- return s.s.Send(item)
-}
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 1d61739..44e52e4 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -2,8 +2,6 @@
import (
"fmt"
- "os"
- "sync"
"time"
"veyron.io/veyron/veyron2"
@@ -13,17 +11,51 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron/lib/exec"
- "veyron.io/veyron/veyron/runtimes/google/appcycle"
)
-type mgmtImpl struct {
- sync.RWMutex
- waiters []chan<- string
- taskTrackers []chan<- veyron2.Task
- task veyron2.Task
- shutDown bool
- rt *vrt
- server ipc.Server // Serves AppCycle service.
+// TODO(cnicolaou,caprita): move this all out of the runtime when we
+// refactor the profiles/runtime interface.
+func (rt *vrt) initMgmt(appCycle veyron2.AppCycle, handle *exec.ChildHandle) (ipc.Server, error) {
+ // Do not initialize the mgmt runtime if the process has not
+ // been started through the veyron exec library by a node
+ // manager.
+ if handle == nil {
+ return nil, nil
+ }
+ parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
+ if err != nil {
+ return nil, nil
+ }
+ listenSpec, err := getListenSpec(handle)
+ if err != nil {
+ return nil, err
+ }
+ var serverOpts []ipc.ServerOpt
+ parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
+ if err == nil && parentPeerPattern != "" {
+ // Grab the blessing from our blessing store that the parent
+ // told us to use so they can talk to us.
+ serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
+ serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
+ }
+ server, err := rt.NewServer(serverOpts...)
+ if err != nil {
+ return nil, err
+ }
+ ep, err := server.Listen(*listenSpec)
+ if err != nil {
+ return nil, err
+ }
+ if err := server.Serve("", appCycle.Remote(), nil); err != nil {
+ server.Stop()
+ return nil, err
+ }
+ err = rt.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
+ if err != nil {
+ server.Stop()
+ return nil, err
+ }
+ return server, nil
}
func getListenSpec(handle *exec.ChildHandle) (*ipc.ListenSpec, error) {
@@ -45,47 +77,9 @@
return &ipc.ListenSpec{Protocol: protocol, Address: address}, nil
}
-func (m *mgmtImpl) initMgmt(rt *vrt, handle *exec.ChildHandle) error {
- // Do not initialize the mgmt runtime if the process has not
- // been started through the veyron exec library by a node
- // manager.
- if handle == nil {
- return nil
- }
- parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
- if err != nil {
- return nil
- }
- listenSpec, err := getListenSpec(handle)
- if err != nil {
- return err
- }
- var serverOpts []ipc.ServerOpt
- parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
- if err == nil && parentPeerPattern != "" {
- // Grab the blessing from our blessing store that the parent
- // told us to use so they can talk to us.
- serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
- serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
- }
- m.rt = rt
- m.server, err = rt.NewServer(serverOpts...)
- if err != nil {
- return err
- }
- ep, err := m.server.Listen(*listenSpec)
- if err != nil {
- return err
- }
- if err := m.server.Serve("", appcycle.AppCycleServer(m), nil); err != nil {
- return err
- }
- return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
-}
-
-func (m *mgmtImpl) callbackToParent(parentName, myName string) error {
- ctx, _ := m.rt.NewContext().WithTimeout(10 * time.Second)
- call, err := m.rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
+func (rt *vrt) callbackToParent(parentName, myName string) error {
+ ctx, _ := rt.NewContext().WithTimeout(10 * time.Second)
+ call, err := rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
if err != nil {
return err
}
@@ -94,113 +88,3 @@
}
return err
}
-
-func (m *mgmtImpl) shutdown() {
- m.Lock()
- if m.shutDown {
- m.Unlock()
- return
- }
- m.shutDown = true
- for _, t := range m.taskTrackers {
- close(t)
- }
- m.taskTrackers = nil
- server := m.server
- m.Unlock()
- if server != nil {
- server.Stop()
- }
-}
-
-func (rt *vrt) stop(msg string) {
- rt.mgmt.RLock()
- defer rt.mgmt.RUnlock()
- if len(rt.mgmt.waiters) == 0 {
- os.Exit(veyron2.UnhandledStopExitCode)
- }
- for _, w := range rt.mgmt.waiters {
- select {
- case w <- msg:
- default:
- }
- }
-}
-
-func (rt *vrt) Stop() {
- rt.stop(veyron2.LocalStop)
-}
-
-func (*vrt) ForceStop() {
- os.Exit(veyron2.ForceStopExitCode)
-}
-
-func (rt *vrt) WaitForStop(ch chan<- string) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
-}
-
-func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- if rt.mgmt.shutDown {
- close(ch)
- return
- }
- rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
-}
-
-func (rt *vrt) advanceTask(progress, goal int) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- rt.mgmt.task.Goal += goal
- rt.mgmt.task.Progress += progress
- for _, t := range rt.mgmt.taskTrackers {
- select {
- case t <- rt.mgmt.task:
- default:
- // TODO(caprita): Make it such that the latest task
- // update is always added to the channel even if channel
- // is full. One way is to pull an element from t and
- // then re-try the push.
- }
- }
-}
-
-func (rt *vrt) AdvanceGoal(delta int) {
- if delta <= 0 {
- return
- }
- rt.advanceTask(0, delta)
-}
-
-func (rt *vrt) AdvanceProgress(delta int) {
- if delta <= 0 {
- return
- }
- rt.advanceTask(delta, 0)
-}
-
-func (m *mgmtImpl) Stop(ctx appcycle.AppCycleStopContext) error {
- // The size of the channel should be reasonably sized to expect not to
- // miss updates while we're waiting for the stream to unblock.
- ch := make(chan veyron2.Task, 10)
- m.rt.TrackTask(ch)
- // TODO(caprita): Include identity of Stop issuer in message.
- m.rt.stop(veyron2.RemoteStop)
- for {
- task, ok := <-ch
- if !ok {
- // Channel closed, meaning process shutdown is imminent.
- break
- }
- ctx.SendStream().Send(task)
- }
- return nil
-}
-
-func (m *mgmtImpl) ForceStop(ipc.ServerContext) error {
- m.rt.ForceStop()
- return fmt.Errorf("ForceStop should not reply as the process should be dead")
-}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index fc39919..cd3db2a 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -45,7 +45,8 @@
// TestBasic verifies that the basic plumbing works: LocalStop calls result in
// stop messages being sent on the channel passed to WaitForStop.
func TestBasic(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch := make(chan string, 1)
m.WaitForStop(ch)
for i := 0; i < 10; i++ {
@@ -64,7 +65,8 @@
// TestMultipleWaiters verifies that the plumbing works with more than one
// registered wait channel.
func TestMultipleWaiters(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch1 := make(chan string, 1)
m.WaitForStop(ch1)
ch2 := make(chan string, 1)
@@ -84,7 +86,8 @@
// channel is not being drained: once the channel's buffer fills up, future
// Stops become no-ops.
func TestMultipleStops(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch := make(chan string, 1)
m.WaitForStop(ch)
for i := 0; i < 10; i++ {
@@ -101,7 +104,8 @@
}
func noWaiters(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
fmt.Fprintf(stdout, "ready\n")
modules.WaitForEOF(stdin)
m.Stop()
@@ -126,7 +130,8 @@
}
func forceStop(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
fmt.Fprintf(stdout, "ready\n")
modules.WaitForEOF(stdin)
m.WaitForStop(make(chan string, 1))
@@ -153,7 +158,7 @@
}
}
-func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int) {
+func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int32) {
if want, got := (veyron2.Task{progress, goal}), <-ch; !reflect.DeepEqual(want, got) {
t.Errorf("Unexpected progress: want %+v, got %+v", want, got)
}
@@ -170,7 +175,8 @@
// TestProgress verifies that the ticker update/track logic works for a single
// tracker.
func TestProgress(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
m.AdvanceGoal(50)
ch := make(chan veyron2.Task, 1)
m.TrackTask(ch)
@@ -190,7 +196,7 @@
checkNoProgress(t, ch)
m.AdvanceGoal(0)
checkNoProgress(t, ch)
- m.Cleanup()
+ r.Cleanup()
if _, ok := <-ch; ok {
t.Errorf("Expected channel to be closed")
}
@@ -200,7 +206,8 @@
// works for more than one tracker. It also ensures that the runtime doesn't
// block when the tracker channels are full.
func TestProgressMultipleTrackers(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
// ch1 is 1-buffered, ch2 is 2-buffered.
ch1, ch2 := make(chan veyron2.Task, 1), make(chan veyron2.Task, 2)
m.TrackTask(ch1)
@@ -223,7 +230,7 @@
m.AdvanceGoal(4)
checkProgress(t, ch1, 11, 4)
checkProgress(t, ch2, 11, 4)
- m.Cleanup()
+ r.Cleanup()
if _, ok := <-ch1; ok {
t.Errorf("Expected channel to be closed")
}
@@ -237,15 +244,16 @@
if err != nil {
return err
}
+ m := r.AppCycle()
defer r.Cleanup()
ch := make(chan string, 1)
- r.WaitForStop(ch)
+ m.WaitForStop(ch)
fmt.Fprintf(stdout, "Got %s\n", <-ch)
- r.AdvanceGoal(10)
+ m.AdvanceGoal(10)
fmt.Fprintf(stdout, "Doing some work\n")
- r.AdvanceProgress(2)
+ m.AdvanceProgress(2)
fmt.Fprintf(stdout, "Doing some more work\n")
- r.AdvanceProgress(5)
+ m.AdvanceProgress(5)
return nil
}
@@ -268,7 +276,6 @@
t.Fatalf("Got error: %v", err)
}
ch := make(chan string)
-
var ep naming.Endpoint
if ep, err = server.Listen(profiles.LocalListenSpec); err != nil {
t.Fatalf("Got error: %v", err)
@@ -277,7 +284,6 @@
t.Fatalf("Got error: %v", err)
}
return server, naming.JoinAddressName(ep.String(), ""), ch
-
}
func setupRemoteAppCycleMgr(t *testing.T) (veyron2.Runtime, modules.Handle, appcycle.AppCycleClientMethods, func()) {
@@ -298,8 +304,12 @@
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
-
- appCycleName := <-ch
+ appCycleName := ""
+ select {
+ case appCycleName = <-ch:
+ case <-time.After(time.Minute):
+ t.Errorf("timeout")
+ }
appCycle := appcycle.AppCycleClient(appCycleName)
return r, h, appCycle, func() {
configServer.Stop()
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 8ff2225..ab635b2 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -30,8 +30,7 @@
var errCleaningUp = fmt.Errorf("operation rejected: runtime is being cleaned up")
type vrt struct {
- mu sync.Mutex
-
+ mu sync.Mutex
profile veyron2.Profile
publisher *config.Publisher
sm []stream.Manager // GUARDED_BY(mu)
@@ -39,7 +38,8 @@
signals chan os.Signal
principal security.Principal
client ipc.Client
- mgmt *mgmtImpl
+ ac veyron2.AppCycle
+ acServer ipc.Server
flags flags.RuntimeFlags
preferredProtocols options.PreferredProtocols
reservedDisp ipc.Dispatcher
@@ -69,7 +69,6 @@
})
flags := runtimeFlags.RuntimeFlags()
rt := &vrt{
- mgmt: new(mgmtImpl),
lang: i18n.LangIDFromEnv(),
program: filepath.Base(os.Args[0]),
flags: flags,
@@ -136,14 +135,14 @@
}
rt.publisher = config.NewPublisher()
- if err := rt.profile.Init(rt, rt.publisher); err != nil {
+ if rt.ac, err = rt.profile.Init(rt, rt.publisher); err != nil {
return nil, err
}
-
- if err := rt.mgmt.initMgmt(rt, handle); err != nil {
+ server, err := rt.initMgmt(rt.ac, handle)
+ if err != nil {
return nil, err
}
-
+ rt.acServer = server
vlog.VI(2).Infof("rt.Init done")
return rt, nil
}
@@ -156,6 +155,10 @@
return rt.profile
}
+func (rt *vrt) AppCycle() veyron2.AppCycle {
+ return rt.ac
+}
+
func (rt *vrt) ConfigureReservedName(server ipc.Dispatcher, opts ...ipc.ServerOpt) {
rt.mu.Lock()
defer rt.mu.Unlock()
@@ -188,7 +191,11 @@
// TODO(caprita): Consider shutting down mgmt later in the runtime's
// shutdown sequence, to capture some of the runtime internal shutdown
// tasks in the task tracker.
- rt.mgmt.shutdown()
+ rt.profile.Cleanup()
+ if rt.acServer != nil {
+ rt.acServer.Stop()
+ }
+
// It's ok to access rt.sm out of lock below, since a Mutex acts as a
// barrier in Go and hence we're guaranteed that cleaningUp is true at
// this point. The only code that mutates rt.sm is NewStreamManager in
diff --git a/runtimes/google/rt/signal_test.go b/runtimes/google/rt/signal_test.go
index f478d6d..de2d98c 100644
--- a/runtimes/google/rt/signal_test.go
+++ b/runtimes/google/rt/signal_test.go
@@ -14,6 +14,7 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
)
@@ -42,10 +43,12 @@
return "myprofile on " + mp.Platform().String()
}
-func (mp *myprofile) Init(veyron2.Runtime, *config.Publisher) error {
- return nil
+func (mp *myprofile) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+ return appcycle.New(rt), nil
}
+func (mp *myprofile) Cleanup() {}
+
func simpleEchoProgram(stdin io.Reader, stdout io.Writer) {
fmt.Fprintf(stdout, "ready\n")
scanner := bufio.NewScanner(stdin)
diff --git a/runtimes/google/testing/mocks/runtime/panic_runtime.go b/runtimes/google/testing/mocks/runtime/panic_runtime.go
index 1c28e1d..d70852b 100644
--- a/runtimes/google/testing/mocks/runtime/panic_runtime.go
+++ b/runtimes/google/testing/mocks/runtime/panic_runtime.go
@@ -22,6 +22,7 @@
const badRuntime = "The runtime implmentation should not call methods on runtime intances."
func (*PanicRuntime) Profile() veyron2.Profile { panic(badRuntime) }
+func (*PanicRuntime) AppCycle() veyron2.AppCycle { panic(badRuntime) }
func (*PanicRuntime) Publisher() *config.Publisher { panic(badRuntime) }
func (*PanicRuntime) Principal() security.Principal { panic(badRuntime) }
func (*PanicRuntime) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error) { panic(badRuntime) }
@@ -39,12 +40,6 @@
func (*PanicRuntime) NewLogger(name string, opts ...vlog.LoggingOpts) (vlog.Logger, error) {
panic(badRuntime)
}
-func (*PanicRuntime) Stop() { panic(badRuntime) }
-func (*PanicRuntime) ForceStop() { panic(badRuntime) }
-func (*PanicRuntime) WaitForStop(chan<- string) { panic(badRuntime) }
-func (*PanicRuntime) AdvanceGoal(delta int) { panic(badRuntime) }
-func (*PanicRuntime) AdvanceProgress(delta int) { panic(badRuntime) }
-func (*PanicRuntime) TrackTask(chan<- veyron2.Task) { panic(badRuntime) }
func (*PanicRuntime) ConfigureReservedName(ipc.Dispatcher, ...ipc.ServerOpt) {
panic(badRuntime)
}
diff --git a/services/mgmt/node/impl/node_invoker.go b/services/mgmt/node/impl/node_invoker.go
index 51a35cc..eeed7a4 100644
--- a/services/mgmt/node/impl/node_invoker.go
+++ b/services/mgmt/node/impl/node_invoker.go
@@ -159,7 +159,7 @@
if err := updateLink(i.config.Previous, i.config.CurrentLink); err != nil {
return err
}
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
return nil
}
@@ -376,7 +376,7 @@
return err
}
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
deferrer = nil
return nil
}