veyron/services/mgmt/node/impl: consolidate the callback logic in one place,
exposing a narrower API to the rest of the node manager so invokers can listen
for values from children without being exposed to the internals.
The API is in the form of a listener interface, that hides the details of
callback IDs, channels, etc.
Tangentially related to the above, I added logic to the unit test to check that
callback channels get cleaned up properly. Did this using a generic 'Leaking'
method that's only compiled for tests and does not expose the implementation of
the rest of the node manager (we can add more verification items to Leaking
later on). Also changed the tests to stop the node manager before the conclusion
of each test (Cleanup kills the child rather than shutting it down cleanly). To
achieve clean shutdown, I went the route of exposing the child's PID via stdout
and then sending it a catchable signal.
Change-Id: I39b782261807306965ea65bed9b438eaea0de484
diff --git a/services/mgmt/node/impl/app_invoker.go b/services/mgmt/node/impl/app_invoker.go
index 1f6a616..db89956 100644
--- a/services/mgmt/node/impl/app_invoker.go
+++ b/services/mgmt/node/impl/app_invoker.go
@@ -347,52 +347,44 @@
}
// Setup up the child process callback.
callbackState := i.callback
- id := callbackState.generateID()
+ listener := callbackState.listenFor(mgmt.AppCycleManagerConfigKey)
+ defer listener.cleanup()
cfg := config.New()
- cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, configSuffix, id)))
+ cfg.Set(mgmt.ParentNodeManagerConfigKey, listener.name())
handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg})
- // Make the channel buffered to avoid blocking the Set method when
- // nothing is receiving on the channel. This happens e.g. when
- // unregisterCallbacks executes before Set is called.
- callbackChan := make(chan string, 1)
- callbackState.register(id, mgmt.AppCycleManagerConfigKey, callbackChan)
- defer callbackState.unregister(id)
// Start the child process.
if err := handle.Start(); err != nil {
vlog.Errorf("Start() failed: %v", err)
return nil, errOperationFailed
}
// Wait for the child process to start.
- testTimeout := 10 * time.Second
- if err := handle.WaitForReady(testTimeout); err != nil {
- vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
+ timeout := 10 * time.Second
+ if err := handle.WaitForReady(timeout); err != nil {
+ vlog.Errorf("WaitForReady(%v) failed: %v", timeout, err)
if err := handle.Clean(); err != nil {
vlog.Errorf("Clean() failed: %v", err)
}
return nil, errOperationFailed
}
- select {
- case childName := <-callbackChan:
- instanceInfo := &instanceInfo{
- AppCycleMgrName: childName,
- Pid: handle.Pid(),
- }
- if err := saveInstanceInfo(instanceDir, instanceInfo); err != nil {
- if err := handle.Clean(); err != nil {
- vlog.Errorf("Clean() failed: %v", err)
- }
- return nil, err
- }
- // TODO(caprita): Spin up a goroutine to reap child status upon
- // exit and transition it to suspended state if it exits on its
- // own.
- case <-time.After(testTimeout):
- vlog.Errorf("Waiting for callback timed out")
+ childName, err := listener.waitForValue(timeout)
+ if err != nil {
if err := handle.Clean(); err != nil {
vlog.Errorf("Clean() failed: %v", err)
}
return nil, errOperationFailed
}
+ instanceInfo := &instanceInfo{
+ AppCycleMgrName: childName,
+ Pid: handle.Pid(),
+ }
+ if err := saveInstanceInfo(instanceDir, instanceInfo); err != nil {
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
+ }
+ return nil, err
+ }
+ // TODO(caprita): Spin up a goroutine to reap child status upon exit and
+ // transition it to suspended state if it exits on its own.
return []string{instanceID}, nil
}
diff --git a/services/mgmt/node/impl/config_invoker.go b/services/mgmt/node/impl/config_invoker.go
index 1245fa2..5c4e4bf 100644
--- a/services/mgmt/node/impl/config_invoker.go
+++ b/services/mgmt/node/impl/config_invoker.go
@@ -8,8 +8,11 @@
import (
"strconv"
"sync"
+ "time"
"veyron2/ipc"
+ "veyron2/naming"
+ "veyron2/vlog"
)
type callbackState struct {
@@ -17,15 +20,75 @@
// channels maps callback identifiers and config keys to channels that
// are used to communicate corresponding config values from child
// processes.
- channels map[string]map[string]chan string
+ channels map[string]map[string]chan<- string
// nextCallbackID provides the next callback identifier to use as a key
// for the channels map.
nextCallbackID int64
+ // name is the object name for making calls against the node manager's
+ // config service.
+ name string
}
-func newCallbackState() *callbackState {
+func newCallbackState(name string) *callbackState {
return &callbackState{
- channels: make(map[string]map[string]chan string),
+ channels: make(map[string]map[string]chan<- string),
+ name: name,
+ }
+}
+
+// callbackListener abstracts out listening for values provided via the
+// callback mechanism for a given key.
+type callbackListener interface {
+ // waitForValue blocks until the value that this listener is expecting
+ // arrives; or until the timeout expires.
+ waitForValue(timeout time.Duration) (string, error)
+ // cleanup cleans up any state used by the listener. Should be called
+ // when the listener is no longer needed.
+ cleanup()
+ // name returns the object name for the config service object that
+ // handles the key that the listener is listening for.
+ name() string
+}
+
+// listener implements callbackListener
+type listener struct {
+ id string
+ cs *callbackState
+ ch <-chan string
+ n string
+}
+
+func (l *listener) waitForValue(timeout time.Duration) (string, error) {
+ select {
+ case value := <-l.ch:
+ return value, nil
+ case <-time.After(timeout):
+ vlog.Errorf("Waiting for callback timed out")
+ return "", errOperationFailed
+ }
+}
+
+func (l *listener) cleanup() {
+ l.cs.unregister(l.id)
+}
+
+func (l *listener) name() string {
+ return l.n
+}
+
+func (c *callbackState) listenFor(key string) callbackListener {
+ id := c.generateID()
+ callbackName := naming.MakeTerminal(naming.Join(c.name, configSuffix, id))
+ // Make the channel buffered to avoid blocking the Set method when
+ // nothing is receiving on the channel. This happens e.g. when
+ // unregisterCallbacks executes before Set is called.
+ callbackChan := make(chan string, 1)
+ c.register(id, key, callbackChan)
+ return &listener{
+ id: id,
+ cs: c,
+ ch: callbackChan,
+ n: callbackName,
}
}
@@ -36,11 +99,11 @@
return strconv.FormatInt(c.nextCallbackID-1, 10)
}
-func (c *callbackState) register(id, key string, channel chan string) {
+func (c *callbackState) register(id, key string, channel chan<- string) {
c.Lock()
defer c.Unlock()
if _, ok := c.channels[id]; !ok {
- c.channels[id] = make(map[string]chan string)
+ c.channels[id] = make(map[string]chan<- string)
}
c.channels[id][key] = channel
}
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index ad3d92b..9d00df3 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -53,7 +53,7 @@
return &dispatcher{
auth: auth,
internal: &internalState{
- callback: newCallbackState(),
+ callback: newCallbackState(config.Name),
updating: newUpdatingState(),
},
config: config,
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index fee491d..640b2a1 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -8,7 +8,9 @@
"os"
goexec "os/exec"
"path/filepath"
+ "strconv"
"strings"
+ "syscall"
"testing"
"veyron/lib/signals"
@@ -71,6 +73,7 @@
vlog.Fatalf("nodeManager expected at least an argument")
}
publishName := args[0]
+ args = args[1:]
defer fmt.Printf("%v terminating\n", publishName)
defer rt.R().Cleanup()
@@ -90,11 +93,11 @@
// This exemplifies how to override or set specific config fields, if,
// for example, the node manager is invoked 'by hand' instead of via a
// script prepared by a previous version of the node manager.
- if len(args) > 1 {
- if want, got := 3, len(args)-1; want != got {
+ if len(args) > 0 {
+ if want, got := 3, len(args); want != got {
vlog.Fatalf("expected %d additional arguments, got %d instead", want, got)
}
- configState.Root, configState.Origin, configState.CurrentLink = args[1], args[2], args[3]
+ configState.Root, configState.Origin, configState.CurrentLink = args[0], args[1], args[2]
}
dispatcher, err := impl.NewDispatcher(nil, configState)
@@ -106,12 +109,15 @@
}
impl.InvokeCallback(name)
+ fmt.Printf("ready:%d\n", os.Getpid())
- fmt.Println("ready")
<-signals.ShutdownOnSignals()
if os.Getenv("PAUSE_BEFORE_STOP") == "1" {
blackbox.WaitForEOFOnStdin()
}
+ if dispatcher.Leaking() {
+ vlog.Fatalf("node manager leaking resources")
+ }
}
// appService defines a test service that the test app should be running.
@@ -201,6 +207,26 @@
}
}
+// readPID waits for the "ready:<PID>" line from the child and parses out the
+// PID of the child.
+func readPID(t *testing.T, c *blackbox.Child) int {
+ line, err := c.ReadLineFromChild()
+ if err != nil {
+ t.Fatalf("ReadLineFromChild() failed: %v", err)
+ return 0
+ }
+ colon := strings.LastIndex(line, ":")
+ if colon == -1 {
+ t.Fatalf("LastIndex(%q, %q) returned -1", line, ":")
+ return 0
+ }
+ pid, err := strconv.Atoi(line[colon+1:])
+ if err != nil {
+ t.Fatalf("Atoi(%q) failed: %v", line[colon+1:], err)
+ }
+ return pid
+}
+
// TestNodeManagerUpdateAndRevert makes the node manager go through the motions of updating
// itself to newer versions (twice), and reverting itself back (twice). It also
// checks that update and revert fail when they're supposed to. The initial
@@ -253,7 +279,7 @@
deferrer()
}
}()
- nm.Expect("ready")
+ readPID(t, nm)
resolve(t, "factoryNM") // Verify the node manager has published itself.
// Simulate an invalid envelope in the application repository.
@@ -288,7 +314,7 @@
// This is from the child node manager started by the node manager
// as an update test.
- nm.Expect("ready")
+ readPID(t, nm)
nm.Expect("v2NM terminating")
updateExpectError(t, "factoryNM", verror.Exists) // Update already in progress.
@@ -306,7 +332,7 @@
t.Fatalf("Start() failed: %v", err)
}
deferrer = runNM.Cleanup
- runNM.Expect("ready")
+ readPID(t, runNM)
resolve(t, "v2NM") // Current link should have been launching v2.
// Try issuing an update without changing the envelope in the application
@@ -329,7 +355,7 @@
// This is from the child node manager started by the node manager
// as an update test.
- runNM.Expect("ready")
+ readPID(t, runNM)
// Both the parent and child node manager should terminate upon successful
// update.
runNM.ExpectSet([]string{"v3NM terminating", "v2NM terminating"})
@@ -348,7 +374,7 @@
t.Fatalf("Start() failed: %v", err)
}
deferrer = runNM.Cleanup
- runNM.Expect("ready")
+ readPID(t, runNM)
resolve(t, "v3NM") // Current link should have been launching v3.
// Revert the node manager to its previous version (v2).
@@ -369,7 +395,7 @@
t.Fatalf("Start() failed: %v", err)
}
deferrer = runNM.Cleanup
- runNM.Expect("ready")
+ readPID(t, runNM)
resolve(t, "v2NM") // Current link should have been launching v2.
// Revert the node manager to its previous version (factory).
@@ -388,8 +414,11 @@
t.Fatalf("Start() failed: %v", err)
}
deferrer = runNM.Cleanup
- runNM.Expect("ready")
+ pid := readPID(t, runNM)
resolve(t, "factoryNM") // Current link should have been launching factory version.
+ syscall.Kill(pid, syscall.SIGINT)
+ runNM.Expect("factoryNM terminating")
+ runNM.ExpectEOFAndWait()
}
type pingServerDisp chan struct{}
@@ -415,7 +444,7 @@
t.Fatalf("Start() failed: %v", err)
}
defer nm.Cleanup()
- nm.Expect("ready")
+ readPID(t, nm)
// Create the local server that the app uses to let us know it's ready.
server, _ := newServer()
@@ -489,4 +518,8 @@
t.Fatalf("Expected to read %v, got %v instead", want, got)
}
// END HACK
+
+ syscall.Kill(nm.Cmd.Process.Pid, syscall.SIGINT)
+ nm.Expect("nm terminating")
+ nm.ExpectEOFAndWait()
}
diff --git a/services/mgmt/node/impl/node_invoker.go b/services/mgmt/node/impl/node_invoker.go
index dbdb8c3..02d87a6 100644
--- a/services/mgmt/node/impl/node_invoker.go
+++ b/services/mgmt/node/impl/node_invoker.go
@@ -172,16 +172,11 @@
cmd.Stderr = os.Stderr
// Setup up the child process callback.
callbackState := i.callback
- id := callbackState.generateID()
+ listener := callbackState.listenFor(mgmt.ChildNodeManagerConfigKey)
+ defer listener.cleanup()
cfg := config.New()
- cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, configSuffix, id)))
+ cfg.Set(mgmt.ParentNodeManagerConfigKey, listener.name())
handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg})
- // Make the channel buffered to avoid blocking the Set method when
- // nothing is receiving on the channel. This happens e.g. when
- // unregisterCallbacks executes before Set is called.
- callbackChan := make(chan string, 1)
- callbackState.register(id, mgmt.ChildNodeManagerConfigKey, callbackChan)
- defer callbackState.unregister(id)
// Start the child process.
if err := handle.Start(); err != nil {
vlog.Errorf("Start() failed: %v", err)
@@ -198,46 +193,41 @@
vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
return errOperationFailed
}
- // Wait for the child process to invoke the Callback().
- select {
- case childName := <-callbackChan:
- // Check that invoking Update() succeeds.
- childName = naming.MakeTerminal(naming.Join(childName, "nm"))
- nmClient, err := node.BindNode(childName)
- if err != nil {
- vlog.Errorf("BindNode(%v) failed: %v", childName, err)
- return errOperationFailed
- }
- linkOld, pathOld, err := i.getCurrentFileInfo()
- if err != nil {
- return errOperationFailed
- }
- // Since the resolution of mtime for files is seconds,
- // the test sleeps for a second to make sure it can
- // check whether the current symlink is updated.
- time.Sleep(time.Second)
- if err := nmClient.Revert(ctx); err != nil {
- return errOperationFailed
- }
- linkNew, pathNew, err := i.getCurrentFileInfo()
- if err != nil {
- return errOperationFailed
- }
- // Check that the new node manager updated the current symbolic
- // link.
- if !linkOld.ModTime().Before(linkNew.ModTime()) {
- vlog.Errorf("new node manager test failed")
- return errOperationFailed
- }
- // Ensure that the current symbolic link points to the same
- // script.
- if pathNew != pathOld {
- i.updateLink(pathOld)
- vlog.Errorf("new node manager test failed")
- return errOperationFailed
- }
- case <-time.After(testTimeout):
- vlog.Errorf("Waiting for callback timed out")
+ childName, err := listener.waitForValue(testTimeout)
+ if err != nil {
+ return errOperationFailed
+ }
+ // Check that invoking Update() succeeds.
+ childName = naming.MakeTerminal(naming.Join(childName, "nm"))
+ nmClient, err := node.BindNode(childName)
+ if err != nil {
+ vlog.Errorf("BindNode(%v) failed: %v", childName, err)
+ return errOperationFailed
+ }
+ linkOld, pathOld, err := i.getCurrentFileInfo()
+ if err != nil {
+ return errOperationFailed
+ }
+ // Since the resolution of mtime for files is seconds, the test sleeps
+ // for a second to make sure it can check whether the current symlink is
+ // updated.
+ time.Sleep(time.Second)
+ if err := nmClient.Revert(ctx); err != nil {
+ return errOperationFailed
+ }
+ linkNew, pathNew, err := i.getCurrentFileInfo()
+ if err != nil {
+ return errOperationFailed
+ }
+ // Check that the new node manager updated the current symbolic link.
+ if !linkOld.ModTime().Before(linkNew.ModTime()) {
+ vlog.Errorf("new node manager test failed")
+ return errOperationFailed
+ }
+ // Ensure that the current symbolic link points to the same script.
+ if pathNew != pathOld {
+ i.updateLink(pathOld)
+ vlog.Errorf("new node manager test failed")
return errOperationFailed
}
return nil
diff --git a/services/mgmt/node/impl/only_for_test.go b/services/mgmt/node/impl/only_for_test.go
new file mode 100644
index 0000000..2c92aaa
--- /dev/null
+++ b/services/mgmt/node/impl/only_for_test.go
@@ -0,0 +1,14 @@
+package impl
+
+// This file contains code in the impl package that we only want built for tests
+// (it exposes public API methods that we don't want to normally expose).
+
+func (c *callbackState) leaking() bool {
+ c.Lock()
+ defer c.Unlock()
+ return len(c.channels) > 0
+}
+
+func (d *dispatcher) Leaking() bool {
+ return d.internal.callback.leaking()
+}