veyron/services/mgmt/node: making the self-update logic of node
manager robust to failures.
This CL introduces changes that guarantee that if the self-update
operation fails, a working copy of the node manager remains on the
device, and when the node manager fails, it is restarted.
To this end, this CL assumes the latest node manager installation is
pointed by a symbolic link (e.g. /usr/local/bin/nm/stable). When the
node manager is to be updated, the binary of the new version is
downloaded to a new "workspace"
(e.g. /usr/local/bin/nm/2014-05-26T15:00:00Z) and the symbolic link
that points to the latest installation of node manager is updated
accordingly. Further, the host init daemon is (expected to be) used to
make sure node manager is restarted when it crashes. To avoid the need
to modify the init daemon configuration every node manager
installation contains a shell script that sets up the application
envelope (environment variable + command-line arguments) and the init
daemon configuration file points to this shell script
(e.g. /usr/local/bin/nm/stable/noned.sh) that sets up the appropriate
environment variable and command-line arguments.
Change-Id: Ie81e21ac39944655ac6c170b8527b64b7a2e148b
diff --git a/lib/exec/child.go b/lib/exec/child.go
index 2353664..45b4a4d 100644
--- a/lib/exec/child.go
+++ b/lib/exec/child.go
@@ -14,14 +14,10 @@
)
type ChildHandle struct {
- // Endpoint is a callback endpoint that can be use to notify the
+ // CallbackName is a callback name that can be use to notify the
// parent that the child has started up successfully via the
// Callback() RPC.
- Endpoint string
- // ID is a callback ID that can be used by a parent to identify this
- // child when the child invokes the Callback() RPC using the
- // callback endpoint.
- ID string
+ CallbackName string
// Secret is a secret passed to the child by its parent via a
// trusted channel.
Secret string
@@ -88,11 +84,7 @@
return nil, ErrUnsupportedVersion
}
dataPipe := os.NewFile(3, "data_rd")
- endpoint, err := decodeString(dataPipe)
- if err != nil {
- return nil, err
- }
- id, err := decodeString(dataPipe)
+ name, err := decodeString(dataPipe)
if err != nil {
return nil, err
}
@@ -101,10 +93,9 @@
return nil, err
}
childHandle = &ChildHandle{
- Endpoint: endpoint,
- ID: id,
- Secret: secret,
- statusPipe: os.NewFile(4, "status_wr"),
+ CallbackName: name,
+ Secret: secret,
+ statusPipe: os.NewFile(4, "status_wr"),
}
return childHandle, nil
}
diff --git a/lib/exec/exec_test.go b/lib/exec/exec_test.go
index 2e4c143..85edb87 100644
--- a/lib/exec/exec_test.go
+++ b/lib/exec/exec_test.go
@@ -99,37 +99,19 @@
panic("unreachable")
}
-func TestEndpointExchange(t *testing.T) {
- cmd := helperCommand("testEndpoint")
+func TestCallbackNameExchange(t *testing.T) {
+ cmd := helperCommand("testCallbackName")
stderr, _ := cmd.StderrPipe()
- ph := vexec.NewParentHandle(cmd, vexec.CallbackEndpointOpt("dummy_endpoint"))
+ ph := vexec.NewParentHandle(cmd, vexec.CallbackNameOpt("dummy_name"))
err := ph.Start()
if err != nil {
- t.Fatalf("testEndpointTest: start: %v", err)
+ t.Fatalf("testCallbackNameTest: start: %v", err)
}
- if !expectMessage(stderr, "dummy_endpoint") {
+ if !expectMessage(stderr, "dummy_name") {
t.Errorf("unexpected output from child")
} else {
if err = cmd.Wait(); err != nil {
- t.Errorf("testEndpointTest: wait: %v", err)
- }
- }
- clean(t, ph)
-}
-
-func TestIDExchange(t *testing.T) {
- cmd := helperCommand("testID")
- stderr, _ := cmd.StderrPipe()
- ph := vexec.NewParentHandle(cmd, vexec.CallbackIDOpt("dummy_id"))
- err := ph.Start()
- if err != nil {
- t.Fatalf("testIDTest: start: %v", err)
- }
- if !expectMessage(stderr, "dummy_id") {
- t.Errorf("unexpected output from child")
- } else {
- if err = cmd.Wait(); err != nil {
- t.Errorf("testIDTest: wait: %v", err)
+ t.Errorf("testCallbackNameTest: wait: %v", err)
}
}
clean(t, ph)
@@ -498,19 +480,12 @@
}()
r := <-rc
os.Exit(r)
- case "testEndpoint":
+ case "testCallbackName":
ch, err := vexec.GetChildHandle()
if err != nil {
log.Fatalf("%v", err)
} else {
- fmt.Fprintf(os.Stderr, "%s", ch.Endpoint)
- }
- case "testID":
- ch, err := vexec.GetChildHandle()
- if err != nil {
- log.Fatalf("%s", err)
- } else {
- fmt.Fprintf(os.Stderr, "%s", ch.ID)
+ fmt.Fprintf(os.Stderr, "%s", ch.CallbackName)
}
case "testSecret":
ch, err := vexec.GetChildHandle()
diff --git a/lib/exec/parent.go b/lib/exec/parent.go
index b2789fc..f3a185b 100644
--- a/lib/exec/parent.go
+++ b/lib/exec/parent.go
@@ -25,8 +25,7 @@
// A ParentHandle is the Parent process' means of managing a single child.
type ParentHandle struct {
c *exec.Cmd
- endpoint string
- id string
+ name string
secret string
statusRead *os.File
statusWrite *os.File
@@ -40,21 +39,13 @@
ExecParentHandleOpt()
}
-// CallbackEndpointOpt can be used to seed the parent handle with a
-// custom callback endpoint.
-type CallbackEndpointOpt string
+// CallbackNameOpt can be used to seed the parent handle with a
+// custom callback name.
+type CallbackNameOpt string
-// ExecParentHandleOpt makes CallbackEndpointOpt an instance of
+// ExecParentHandleOpt makes CallbackNameOpt an instance of
// ParentHandleOpt.
-func (cno CallbackEndpointOpt) ExecParentHandleOpt() {}
-
-// CallbackIDOpt can be used to seed the parent handle with a
-// custom callback ID.
-type CallbackIDOpt string
-
-// ExecParentHandleOpt makes CallbackIDOpt an instance of
-// ParentHandleOpt.
-func (cno CallbackIDOpt) ExecParentHandleOpt() {}
+func (cno CallbackNameOpt) ExecParentHandleOpt() {}
// SecretOpt can be used to seed the parent handle with a custom secret.
type SecretOpt string
@@ -74,14 +65,12 @@
// an instance of exec.Cmd.
func NewParentHandle(c *exec.Cmd, opts ...ParentHandleOpt) *ParentHandle {
c.Env = append(c.Env, versionVariable+"="+version1)
- endpoint, id, secret := emptyEndpoint, emptyID, emptySecret
+ name, secret := "", ""
tk := timekeeper.RealTime()
for _, opt := range opts {
switch v := opt.(type) {
- case CallbackEndpointOpt:
- endpoint = string(v)
- case CallbackIDOpt:
- id = string(v)
+ case CallbackNameOpt:
+ name = string(v)
case SecretOpt:
secret = string(v)
case TimeKeeperOpt:
@@ -91,11 +80,10 @@
}
}
return &ParentHandle{
- c: c,
- endpoint: endpoint,
- id: id,
- secret: secret,
- tk: tk,
+ c: c,
+ name: name,
+ secret: secret,
+ tk: tk,
}
}
@@ -137,12 +125,7 @@
return err
}
// Pass data to the child using a pipe.
- if err := encodeString(dataWrite, p.endpoint); err != nil {
- p.statusWrite.Close()
- p.statusRead.Close()
- return err
- }
- if err := encodeString(dataWrite, p.id); err != nil {
+ if err := encodeString(dataWrite, p.name); err != nil {
p.statusWrite.Close()
p.statusRead.Close()
return err
diff --git a/lib/exec/shared.go b/lib/exec/shared.go
index cf31731..936d2bf 100644
--- a/lib/exec/shared.go
+++ b/lib/exec/shared.go
@@ -5,8 +5,4 @@
readyStatus = "ready"
initStatus = "init"
versionVariable = "VEYRON_EXEC_VERSION"
-
- emptyEndpoint = "emptyEndpoint"
- emptyID = "emptyID"
- emptySecret = "emptySecret"
)
diff --git a/services/mgmt/node/impl/const.go b/services/mgmt/node/impl/const.go
new file mode 100644
index 0000000..43d30fc
--- /dev/null
+++ b/services/mgmt/node/impl/const.go
@@ -0,0 +1,8 @@
+package impl
+
+// Node manager environment variables.
+const (
+ ORIGIN_ENV = "VEYRON_NM_ORIGIN"
+ ROOT_ENV = "VEYRON_NM_ROOT"
+ TEST_UPDATE_ENV = "VEYRON_NM_TEST_UPDATE"
+)
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index 5de34ea..a787d54 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -1,31 +1,72 @@
package impl
import (
+ "sync"
+
+ "veyron/services/mgmt/node"
+
"veyron2/ipc"
"veyron2/security"
"veyron2/services/mgmt/application"
- "veyron2/services/mgmt/node"
+ "veyron2/services/mgmt/content"
)
// dispatcher holds the state of the node manager dispatcher.
type dispatcher struct {
- envelope *application.Envelope
- origin string
- auth security.Authorizer
+ auth security.Authorizer
+ state *state
}
-// NewDispatcher is the dispatcher factory.
-func NewDispatcher(envelope *application.Envelope, origin string, auth security.Authorizer) *dispatcher {
+// crDispatcher holds the state of the node manager content repository
+// dispatcher.
+type crDispatcher struct {
+ auth security.Authorizer
+}
+
+// arDispatcher holds the state of the node manager application
+// repository dispatcher.
+type arDispatcher struct {
+ auth security.Authorizer
+ state *arState
+}
+
+// NewDispatcher is the node manager dispatcher factory.
+func NewDispatchers(auth security.Authorizer, envelope *application.Envelope, name string) (*dispatcher, *crDispatcher, *arDispatcher) {
return &dispatcher{
- envelope: envelope,
- origin: origin,
- auth: auth,
- }
+ auth: auth,
+ state: &state{
+ channels: make(map[string]chan string),
+ channelsMutex: new(sync.Mutex),
+ envelope: envelope,
+ name: name,
+ updating: false,
+ updatingMutex: new(sync.Mutex),
+ },
+ }, &crDispatcher{
+ auth: auth,
+ }, &arDispatcher{
+ auth: auth,
+ state: &arState{
+ envelope: envelope,
+ name: name,
+ },
+ }
}
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(node.NewServerNode(NewInvoker(d.envelope, d.origin, suffix)))
- return invoker, d.auth, nil
+ return ipc.ReflectInvoker(node.NewServerNode(NewInvoker(d.state, suffix))), d.auth, nil
+}
+
+// DISPATCHER INTERFACE IMPLEMENTATION
+
+func (d *crDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+ return ipc.ReflectInvoker(content.NewServerContent(NewCRInvoker(suffix))), d.auth, nil
+}
+
+// DISPATCHER INTERFACE IMPLEMENTATION
+
+func (d *arDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+ return ipc.ReflectInvoker(application.NewServerRepository(NewARInvoker(d.state, suffix))), d.auth, nil
}
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index 2367074..6b473f6 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -1,96 +1,52 @@
-package impl_test
+package impl
import (
- "errors"
"fmt"
- "io"
"os"
- "strconv"
+ "syscall"
"testing"
+ "time"
"veyron/lib/exec"
"veyron/lib/signals"
"veyron/lib/testutil"
"veyron/lib/testutil/blackbox"
- "veyron/services/mgmt/node/impl"
+ "veyron/services/mgmt/node"
mtlib "veyron/services/mounttable/lib"
"veyron2"
- "veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/application"
- "veyron2/services/mgmt/content"
- "veyron2/services/mgmt/node"
"veyron2/vlog"
)
-var (
- errOperationFailed = errors.New("operation failed")
-)
-
-type arInvoker struct {
- envelope *application.Envelope
-}
-
-func (i *arInvoker) Match(_ ipc.ServerContext, _ []string) (application.Envelope, error) {
- vlog.VI(1).Infof("Match()")
- return *i.envelope, nil
-}
-
-const bufferLength = 1024
-
-type cmInvoker struct{}
-
-func (i *cmInvoker) Delete(_ ipc.ServerContext) error {
- return nil
-}
-
-func (i *cmInvoker) Download(_ ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
- vlog.VI(1).Infof("Download()")
- file, err := os.Open(os.Args[0])
- if err != nil {
- vlog.Errorf("Open() failed: %v", err)
- return errOperationFailed
- }
- defer file.Close()
- buffer := make([]byte, bufferLength)
- for {
- n, err := file.Read(buffer)
- if err != nil && err != io.EOF {
- vlog.Errorf("Read() failed: %v", err)
- return errOperationFailed
- }
- if n == 0 {
- break
- }
- if err := stream.Send(buffer[:n]); err != nil {
- vlog.Errorf("Send() failed: %v", err)
- return errOperationFailed
- }
- }
- return nil
-}
-
-func (i *cmInvoker) Upload(_ ipc.ServerContext, _ content.ContentServiceUploadStream) (string, error) {
- return "", nil
-}
-
func init() {
blackbox.CommandTable["nodeManager"] = nodeManager
}
-func getProcessID(t *testing.T, child *blackbox.Child) int {
- line, err := child.ReadLineFromChild()
- if err != nil {
- child.Cleanup()
- t.Fatalf("ReadLineFromChild() failed: %v", err)
+func invokeCallback(name string) {
+ handle, err := exec.GetChildHandle()
+ switch err {
+ case nil:
+ // Node manager was started by self-update, notify the parent
+ // process that you are ready.
+ handle.SetReady()
+ if handle.CallbackName != "" {
+ nmClient, err := node.BindNode(handle.CallbackName)
+ if err != nil {
+ vlog.Fatalf("BindNode(%v) failed: %v", handle.CallbackName, err)
+ }
+ if err := nmClient.Callback(rt.R().NewContext(), name); err != nil {
+ vlog.Fatalf("Callback(%v) failed: %v", name, err)
+ }
+ }
+ case exec.ErrNoVersion:
+ // Node manager was not started by self-update, no action is
+ // needed.
+ default:
+ vlog.Fatalf("NewChildHandle() failed: %v", err)
}
- pid, err := strconv.Atoi(line)
- if err != nil {
- t.Fatalf("Atoi(%v) failed: %v", line, err)
- }
- return pid
}
func invokeUpdate(t *testing.T, nmAddress string) {
@@ -106,75 +62,47 @@
// nodeManager is an enclosure for the node manager blackbox process.
func nodeManager(argv []string) {
- origin := argv[0]
- runtime := rt.Init()
- defer runtime.Shutdown()
-
- _, nmCleanup := startNodeManager(runtime, origin)
- defer nmCleanup()
- // Wait until shutdown.
- <-signals.ShutdownOnSignals()
- blackbox.WaitForEOFOnStdin()
+ // The node manager program logic assumes that its binary is
+ // executed through a symlink. To meet this assumption, the first
+ // time this binary is started it creates a symlink to itself and
+ // restarts itself using this symlink.
+ fi, err := os.Lstat(os.Args[0])
+ if err != nil {
+ vlog.Fatalf("Lstat(%v) failed: %v", os.Args[0], err)
+ }
+ if fi.Mode()&os.ModeSymlink != os.ModeSymlink && os.Getenv(TEST_UPDATE_ENV) == "" {
+ symlink := os.Args[0] + ".symlink"
+ if err := os.Symlink(os.Args[0], symlink); err != nil {
+ vlog.Fatalf("Symlink(%v, %v) failed: %v", os.Args[0], symlink, err)
+ }
+ argv := append([]string{symlink}, os.Args[1:]...)
+ envv := os.Environ()
+ if err := syscall.Exec(symlink, argv, envv); err != nil {
+ vlog.Fatalf("Exec(%v, %v, %v) failed: %v", symlink, argv, envv, err)
+ }
+ } else {
+ runtime := rt.Init()
+ defer runtime.Shutdown()
+ address, nmCleanup := startNodeManager(runtime, argv[0], argv[1])
+ defer nmCleanup()
+ invokeCallback(address)
+ // Wait until shutdown.
+ <-signals.ShutdownOnSignals()
+ blackbox.WaitForEOFOnStdin()
+ }
}
-func spawnNodeManager(t *testing.T, arAddress, mtAddress string, idFile string) *blackbox.Child {
- child := blackbox.HelperCommand(t, "nodeManager", arAddress)
+func spawnNodeManager(t *testing.T, mtAddress string, idFile string) *blackbox.Child {
+ child := blackbox.HelperCommand(t, "nodeManager", mtAddress, idFile)
child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "MOUNTTABLE_ROOT", mtAddress)
child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "VEYRON_IDENTITY", idFile)
+ child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "VEYRON_NM_ROOT", os.TempDir())
if err := child.Cmd.Start(); err != nil {
t.Fatalf("Start() failed: %v", err)
}
return child
}
-func startApplicationRepository(t *testing.T, runtime veyron2.Runtime, cmAddress string, envelope *application.Envelope) (string, naming.Endpoint, func()) {
- server, err := runtime.NewServer()
- if err != nil {
- t.Fatalf("NewServer() failed: %v", err)
- }
- suffix, dispatcher := "ar", ipc.SoloDispatcher(application.NewServerRepository(&arInvoker{envelope: envelope}), nil)
- if err := server.Register(suffix, dispatcher); err != nil {
- t.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
- }
- protocol, hostname := "tcp", "localhost:0"
- endpoint, err := server.Listen(protocol, hostname)
- if err != nil {
- t.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
- }
- // Method calls must be directed to suffix+"/"+suffix
- server.Publish(suffix)
- vlog.VI(1).Infof("Application repository running at endpoint: %s", endpoint)
- return suffix + "/" + suffix, endpoint, func() {
- if err := server.Stop(); err != nil {
- t.Fatalf("Stop() failed: %v", err)
- }
- }
-}
-
-func startContentManager(t *testing.T, runtime veyron2.Runtime) (string, naming.Endpoint, func()) {
- server, err := runtime.NewServer()
- if err != nil {
- t.Fatalf("NewServer() failed: %v", err)
- }
- suffix, dispatcher := "cm", ipc.SoloDispatcher(content.NewServerContent(&cmInvoker{}), nil)
- if err := server.Register(suffix, dispatcher); err != nil {
- t.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
- }
- protocol, hostname := "tcp", "localhost:0"
- endpoint, err := server.Listen(protocol, hostname)
- if err != nil {
- t.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
- }
- // Method calls must be directed to suffix+"/"+suffix
- server.Publish(suffix)
- vlog.VI(1).Infof("Content manager running at endpoint: %s", endpoint)
- return suffix + "/" + suffix, endpoint, func() {
- if err := server.Stop(); err != nil {
- t.Fatalf("Stop() failed: %v", err)
- }
- }
-}
-
func startMountTable(t *testing.T, runtime veyron2.Runtime) (string, func()) {
server, err := runtime.NewServer(veyron2.ServesMountTableOpt(true))
if err != nil {
@@ -194,7 +122,7 @@
t.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
}
name := naming.JoinAddressName(endpoint.String(), suffix)
- vlog.VI(1).Infof("Mount table running at endpoint: %s, name %q", endpoint, name)
+ vlog.VI(1).Infof("Mount table name: %v", name)
return name, func() {
if err := server.Stop(); err != nil {
t.Fatalf("Stop() failed: %v", err)
@@ -202,7 +130,7 @@
}
}
-func startNodeManager(runtime veyron2.Runtime, origin string) (string, func()) {
+func startNodeManager(runtime veyron2.Runtime, mtAddress, idFile string) (string, func()) {
server, err := runtime.NewServer()
if err != nil {
vlog.Fatalf("NewServer() failed: %v", err)
@@ -212,18 +140,31 @@
if err != nil {
vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
}
- suffix, dispatcher := "", impl.NewDispatcher(&application.Envelope{}, origin, nil)
+ envelope := &application.Envelope{}
+ envelope.Args = make([]string, 0)
+ envelope.Args = os.Args[1:]
+ envelope.Env = make([]string, 0)
+ envelope.Env = append(envelope.Env, os.Environ()...)
+ suffix, crSuffix, arSuffix := "", "cr", "ar"
+ name := naming.MakeTerminal(naming.JoinAddressName(endpoint.String(), suffix))
+ vlog.VI(1).Infof("Node manager name: %v", name)
+ dispatcher, crDispatcher, arDispatcher := NewDispatchers(nil, envelope, name)
if err := server.Register(suffix, dispatcher); err != nil {
vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
}
- address := naming.JoinAddressName(endpoint.String(), suffix)
- vlog.VI(1).Infof("Node manager running at endpoint: %q", address)
- name := "nm"
- if err := server.Publish(name); err != nil {
- vlog.Fatalf("Publish(%v) failed: %v", name, err)
+ if err := server.Register(crSuffix, crDispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", crSuffix, crDispatcher, err)
}
- fmt.Printf("%d\n", os.Getpid())
- return address, func() {
+ if err := server.Register(arSuffix, arDispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", arSuffix, arDispatcher, err)
+ }
+ publishAs := "nm"
+ if err := server.Publish(publishAs); err != nil {
+ vlog.Fatalf("Publish(%v) failed: %v", publishAs, err)
+ }
+ os.Setenv(ORIGIN_ENV, naming.JoinAddressName(name, "ar"))
+ fmt.Printf("ready\n")
+ return name, func() {
if err := server.Stop(); err != nil {
vlog.Fatalf("Stop() failed: %v", err)
}
@@ -235,48 +176,29 @@
}
func TestUpdate(t *testing.T) {
- // Set up a mount table, a content manager, and an application repository.
+ // Set up a mount table.
runtime := rt.Init()
defer runtime.Shutdown()
mtName, mtCleanup := startMountTable(t, runtime)
defer mtCleanup()
mt := runtime.MountTable()
- // The local, client side MountTable is now relative the MountTable server
- // started above.
+ // The local, client side MountTable is now relative to the
+ // MountTable server started above.
mt.SetRoots([]string{mtName})
-
- cmSuffix, cmEndpoint, cmCleanup := startContentManager(t, runtime)
- cmName := naming.Join(mtName, cmSuffix)
- defer cmCleanup()
- envelope := application.Envelope{}
- arSuffix, arEndpoint, arCleanup := startApplicationRepository(t, runtime, cmSuffix, &envelope)
- //arName := naming.Join(mtName, arSuffix)
- defer arCleanup()
-
ctx := runtime.NewContext()
- if s, err := mt.Resolve(ctx, arSuffix); err != nil || s[0] != "/"+arEndpoint.String()+"//ar" {
- t.Errorf("failed to resolve %q", arSuffix)
- t.Errorf("err: %v, got %v, want /%v//ar", err, s[0], arEndpoint)
- }
- if s, err := mt.Resolve(ctx, cmSuffix); err != nil || s[0] != "/"+cmEndpoint.String()+"//cm" {
- t.Errorf("failed to resolve %q", cmSuffix)
- t.Errorf("err: %v, got %v, want /%v//cm", err, s[0], cmEndpoint)
- }
- // Spawn a node manager with an identity blessed by the mounttable's identity.
- // under the name "test", and obtain its endpoint.
- // TODO(ataly): Eventually we want to use the same identity the node manager
- // would have if it was running in production.
-
+ // Spawn a node manager with an identity blessed by the mounttable's
+ // identity under the name "test", and obtain its address.
+ //
+ // TODO(ataly): Eventually we want to use the same identity the node
+ // manager would have if it was running in production.
idFile := testutil.SaveIdentityToFile(testutil.NewBlessedIdentity(runtime.Identity(), "test"))
defer os.Remove(idFile)
- child := spawnNodeManager(t, arSuffix, mtName, idFile)
+ child := spawnNodeManager(t, mtName, idFile)
defer child.Cleanup()
- _ = getProcessID(t, child) // sync with the child
- envelope.Args = child.Cmd.Args[1:]
- envelope.Env = child.Cmd.Env
- envelope.Binary = cmName
-
+ if err := child.WaitForLine("ready", time.Second); err != nil {
+ t.Fatal("WaitForLine() failed: %v", err)
+ }
name := naming.Join(mtName, "nm")
results, err := mt.Resolve(ctx, name)
if err != nil {
@@ -286,30 +208,12 @@
t.Fatalf("Unexpected number of results: expected %d, got %d", expected, got)
}
nmAddress := results[0]
- vlog.VI(1).Infof("Node manager running at endpoint: %q -> %s", name, nmAddress)
+ vlog.VI(1).Infof("Node manager name: %v address: %v", name, nmAddress)
// Invoke the Update method and check that another instance of the
// node manager binary has been started.
invokeUpdate(t, nmAddress)
- pid := getProcessID(t, child)
-
- if results, err := mt.Resolve(ctx, name); err != nil {
- t.Fatalf("Resolve(%v) failed: %v", name, err)
- } else {
- if expected, got := 2, len(results); expected != got {
- t.Fatalf("Unexpected number of results: expected %d, got %d", expected, got)
- }
- }
-
- // Terminate the node manager binary.
- //
- // TODO(jsimsa): When support for remote Stop() is implemented, use
- // it here instead.
- process, err := os.FindProcess(pid)
- if err != nil {
- t.Fatalf("FindProcess(%v) failed: %v", pid, err)
- }
- if err := process.Kill(); err != nil {
- t.Fatalf("Kill() failed: %v", err)
+ if err := child.WaitForLine("ready", time.Second); err != nil {
+ t.Fatal("WaitForLine() failed: %v", err)
}
}
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index 475307c..e32ff53 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -3,22 +3,26 @@
import (
"bytes"
"errors"
+ "fmt"
"io"
"io/ioutil"
+ "math/rand"
"os"
"os/exec"
+ "path/filepath"
"reflect"
"regexp"
"runtime"
"strings"
- "syscall"
+ "sync"
"time"
vexec "veyron/lib/exec"
ibuild "veyron/services/mgmt/build"
-
"veyron/services/mgmt/profile"
+
"veyron2/ipc"
+ "veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/application"
"veyron2/services/mgmt/build"
@@ -29,13 +33,30 @@
var updateSuffix = regexp.MustCompile(`^apps\/.*$`)
-// invoker holds the state of a node manager invocation.
-type invoker struct {
+// state wraps state shared between different node manager
+// invocations.
+type state struct {
+ // channels maps callback identifiers to channels that are used to
+ // communicate information from child processes.
+ channels map[string]chan string
+ // channelsMutex is a lock for coordinating concurrent access to
+ // <channels>.
+ channelsMutex *sync.Mutex
// envelope is the node manager application envelope.
envelope *application.Envelope
- // origin is a veyron name that resolves to the node manager
- // envelope.
- origin string
+ // name is the node manager name.
+ name string
+ // updating is a flag that records whether this instance of node
+ // manager is being updated.
+ updating bool
+ // updatingMutex is a lock for coordinating concurrent access to
+ // <updating>.
+ updatingMutex *sync.Mutex
+}
+
+// invoker holds the state of a node manager invocation.
+type invoker struct {
+ state *state
// suffix is the suffix of the current invocation that is assumed to
// be used as a relative veyron name to identify an application,
// installation, or instance.
@@ -43,16 +64,16 @@
}
var (
- errInvalidSuffix = errors.New("invalid suffix")
- errOperationFailed = errors.New("operation failed")
+ errInvalidSuffix = errors.New("invalid suffix")
+ errOperationFailed = errors.New("operation failed")
+ errUpdateInProgress = errors.New("update in progress")
)
// NewInvoker is the invoker factory.
-func NewInvoker(envelope *application.Envelope, origin, suffix string) *invoker {
+func NewInvoker(state *state, suffix string) *invoker {
return &invoker{
- envelope: envelope,
- origin: origin,
- suffix: suffix,
+ state: state,
+ suffix: suffix,
}
}
@@ -276,22 +297,22 @@
// APPLICATION INTERFACE IMPLEMENTATION
-func downloadBinary(binary string) (string, error) {
+func downloadBinary(workspace, binary string) error {
stub, err := content.BindContent(binary)
if err != nil {
vlog.Errorf("BindContent(%q) failed: %v", binary, err)
- return "", errOperationFailed
+ return errOperationFailed
}
stream, err := stub.Download(rt.R().NewContext())
if err != nil {
vlog.Errorf("Download() failed: %v", err)
- return "", errOperationFailed
+ return errOperationFailed
}
- tmpDir, prefix := "", ""
- file, err := ioutil.TempFile(tmpDir, prefix)
+ path := filepath.Join(workspace, "noded")
+ file, err := os.Create(path)
if err != nil {
- vlog.Errorf("TempFile(%q, %q) failed: %v", tmpDir, prefix, err)
- return "", errOperationFailed
+ vlog.Errorf("Create(%q) failed: %v", path, err)
+ return errOperationFailed
}
defer file.Close()
for {
@@ -301,27 +322,23 @@
}
if err != nil {
vlog.Errorf("Recv() failed: %v", err)
- os.Remove(file.Name())
- return "", errOperationFailed
+ return errOperationFailed
}
if _, err := file.Write(bytes); err != nil {
vlog.Errorf("Write() failed: %v", err)
- os.Remove(file.Name())
- return "", errOperationFailed
+ return errOperationFailed
}
}
if err := stream.Finish(); err != nil {
vlog.Errorf("Finish() failed: %v", err)
- os.Remove(file.Name())
- return "", errOperationFailed
+ return errOperationFailed
}
mode := os.FileMode(0755)
if err := file.Chmod(mode); err != nil {
vlog.Errorf("Chmod(%v) failed: %v", mode, err)
- os.Remove(file.Name())
- return "", errOperationFailed
+ return errOperationFailed
}
- return file.Name(), nil
+ return nil
}
func fetchEnvelope(origin string) (*application.Envelope, error) {
@@ -341,36 +358,169 @@
return &envelope, nil
}
-func replaceBinary(oldBinary, newBinary string) error {
- // Replace the old binary with the new one.
- if err := syscall.Unlink(oldBinary); err != nil {
- vlog.Errorf("Unlink(%v) failed: %v", oldBinary, err)
- return errOperationFailed
+func generateBinary(workspace string, envelope *application.Envelope, newBinary bool) error {
+ if newBinary {
+ // Download the new binary.
+ return downloadBinary(workspace, envelope.Binary)
}
- if err := os.Rename(newBinary, oldBinary); err != nil {
- vlog.Errorf("Rename(%v, %v) failed: %v", newBinary, oldBinary, err)
+ // Link the current binary.
+ path := filepath.Join(workspace, "noded")
+ if err := os.Link(os.Args[0], path); err != nil {
+ vlog.Errorf("Link(%v, %v) failed: %v", os.Args[0], path, err)
return errOperationFailed
}
return nil
}
-func spawnNodeManager(envelope *application.Envelope) error {
- cmd := exec.Command(os.Args[0], envelope.Args...)
+func generateScript(workspace string, envelope *application.Envelope) error {
+ output := "#!/bin/bash\n"
+ output += strings.Join(envelope.Env, " ") + " " + os.Args[0] + " " + strings.Join(envelope.Args, " ")
+ path := filepath.Join(workspace, "noded.sh")
+ if err := ioutil.WriteFile(path, []byte(output), 0755); err != nil {
+ vlog.Errorf("WriteFile(%v) failed: %v", path, err)
+ return errOperationFailed
+ }
+ return nil
+}
+
+func updateLink(workspace string) error {
+ link := filepath.Join(os.Getenv(ROOT_ENV), "stable")
+ newLink := link + ".new"
+ fi, err := os.Lstat(newLink)
+ if err == nil {
+ if err := os.Remove(fi.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", fi.Name(), err)
+ return errOperationFailed
+ }
+ }
+ if err := os.Symlink(workspace, newLink); err != nil {
+ vlog.Errorf("Symlink(%v, %v) failed: %v", workspace, newLink, err)
+ return errOperationFailed
+ }
+ if err := os.Rename(newLink, link); err != nil {
+ vlog.Errorf("Rename(%v, %v) failed: %v", newLink, link, err)
+ return errOperationFailed
+ }
+ return nil
+}
+
+func (i *invoker) registerCallback(id string, channel chan string) {
+ i.state.channelsMutex.Lock()
+ defer i.state.channelsMutex.Unlock()
+ i.state.channels[id] = channel
+}
+
+func (i *invoker) testNodeManager(workspace string, envelope *application.Envelope) error {
+ path := filepath.Join(workspace, "noded")
+ cmd := exec.Command(path, envelope.Args...)
cmd.Env = envelope.Env
+ cmd.Env = vexec.SetEnv(cmd.Env, ORIGIN_ENV, naming.JoinAddressName(i.state.name, "ar"))
+ cmd.Env = vexec.SetEnv(cmd.Env, TEST_UPDATE_ENV, "1")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
-
- handle := vexec.NewParentHandle(cmd)
+ // Setup up the child process callback.
+ id := fmt.Sprintf("%d", rand.Int())
+ handle := vexec.NewParentHandle(cmd, vexec.CallbackNameOpt(naming.JoinAddressName(i.state.name, id)))
+ callbackChan := make(chan string)
+ i.registerCallback(id, callbackChan)
+ defer i.unregisterCallback(id)
+ // Start the child process.
if err := handle.Start(); err != nil {
vlog.Errorf("Start() failed: %v", err)
return errOperationFailed
}
- if err := handle.WaitForReady(10 * time.Second); err != nil {
- vlog.Errorf("WaitForReady() failed: %v", err)
- cmd.Process.Kill()
+ // Wait for the child process to start.
+ testTimeout := 2 * time.Second
+ if err := handle.WaitForReady(testTimeout); err != nil {
+ vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
+ if err := cmd.Process.Kill(); err != nil {
+ vlog.Errorf("Kill() failed: %v", err)
+ }
return errOperationFailed
}
- rt.R().Stop()
+ // Wait for the child process to invoke the Callback().
+ select {
+ case address := <-callbackChan:
+ // Check that invoking Update() succeeds.
+ address = naming.JoinAddressName(address, "nm")
+ nmClient, err := node.BindNode(address)
+ if err != nil {
+ vlog.Errorf("BindNode(%v) failed: %v", address, err)
+ if err := cmd.Process.Kill(); err != nil {
+ vlog.Errorf("Kill() failed: %v", err)
+ }
+ return errOperationFailed
+ }
+ if err := nmClient.Update(rt.R().NewContext()); err != nil {
+ if err := cmd.Process.Kill(); err != nil {
+ vlog.Errorf("Kill() failed: %v", err)
+ }
+ return errOperationFailed
+ }
+ case <-time.After(testTimeout):
+ vlog.Errorf("Waiting for callback timed out")
+ if err := cmd.Process.Kill(); err != nil {
+ vlog.Errorf("Kill() failed: %v", err)
+ }
+ return errOperationFailed
+ }
+ return nil
+}
+
+func (i *invoker) unregisterCallback(id string) {
+ i.state.channelsMutex.Lock()
+ defer i.state.channelsMutex.Unlock()
+ delete(i.state.channels, id)
+}
+
+func (i *invoker) updateNodeManager() error {
+ envelope, err := fetchEnvelope(os.Getenv(ORIGIN_ENV))
+ if err != nil {
+ return err
+ }
+ if !reflect.DeepEqual(envelope, i.state.envelope) {
+ // Create new workspace.
+ workspace := filepath.Join(os.Getenv(ROOT_ENV), fmt.Sprintf("%v", time.Now().Format(time.RFC3339Nano)))
+ perm := os.FileMode(0755)
+ if err := os.MkdirAll(workspace, perm); err != nil {
+ vlog.Errorf("MkdirAll(%v, %v) failed: %v", workspace, perm, err)
+ return errOperationFailed
+ }
+ // Populate the new workspace with a node manager binary.
+ if err := generateBinary(workspace, envelope, envelope.Binary != i.state.envelope.Binary); err != nil {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ return err
+ }
+ // Populate the new workspace with a node manager script.
+ if err := generateScript(workspace, envelope); err != nil {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ return err
+ }
+ if os.Getenv(TEST_UPDATE_ENV) == "" {
+ if err := i.testNodeManager(workspace, envelope); err != nil {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ return err
+ }
+ // If the binary has changed, update the node manager symlink.
+ if err := updateLink(workspace); err != nil {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ return err
+ }
+ } else {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ }
+ rt.R().Stop()
+ }
return nil
}
@@ -396,30 +546,18 @@
vlog.VI(0).Infof("%v.Update()", i.suffix)
switch {
case i.suffix == "nm":
- // This branch attempts to update the node manager updates itself.
- envelope, err := fetchEnvelope(i.origin)
- if err != nil {
- return err
+ // This branch attempts to update the node manager itself.
+ i.state.updatingMutex.Lock()
+ if i.state.updating {
+ i.state.updatingMutex.Unlock()
+ return errUpdateInProgress
+ } else {
+ i.state.updating = true
}
- if envelope.Binary != i.envelope.Binary {
- file, err := downloadBinary(envelope.Binary)
- if err != nil {
- return err
- }
- if err := replaceBinary(os.Args[0], file); err != nil {
- os.Remove(file)
- return err
- }
- }
- if !reflect.DeepEqual(envelope, i.envelope) {
- i.envelope = envelope
- if err := spawnNodeManager(i.envelope); err != nil {
- return err
- }
- // TODO(jsimsa): When Bogdan implements the shutdown API, use it
- // to stop itself (or have the caller do that).
- }
- return nil
+ i.state.updatingMutex.Unlock()
+ err := i.updateNodeManager()
+ i.state.updating = false
+ return err
case updateSuffix.MatchString(i.suffix):
// TODO(jsimsa): Implement.
return nil
@@ -457,3 +595,103 @@
// TODO(jsimsa): Implement.
return nil
}
+
+// CALLBACK RECEIVER INTERFACE IMPLEMENTATION
+
+func (i *invoker) Callback(call ipc.ServerContext, name string) error {
+ vlog.VI(0).Infof("%v.Callback()", i.suffix)
+ i.state.channelsMutex.Lock()
+ channel, ok := i.state.channels[i.suffix]
+ i.state.channelsMutex.Unlock()
+ if !ok {
+ return errInvalidSuffix
+ }
+ channel <- name
+ return nil
+}
+
+// invoker holds the state of a node manager content repository invocation.
+type crInvoker struct {
+ suffix string
+}
+
+// NewCRInvoker is the node manager content repository invoker factory.
+func NewCRInvoker(suffix string) *crInvoker {
+ return &crInvoker{
+ suffix: suffix,
+ }
+}
+
+// CONTENT REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *crInvoker) Delete(ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Delete()", i.suffix)
+ return nil
+}
+
+func (i *crInvoker) Download(_ ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
+ vlog.VI(0).Infof("%v.Download()", i.suffix)
+ file, err := os.Open(os.Args[0])
+ if err != nil {
+ vlog.Errorf("Open() failed: %v", err)
+ return errOperationFailed
+ }
+ defer file.Close()
+ bufferLength := 4096
+ buffer := make([]byte, bufferLength)
+ for {
+ n, err := file.Read(buffer)
+ switch err {
+ case io.EOF:
+ return nil
+ case nil:
+ if err := stream.Send(buffer[:n]); err != nil {
+ vlog.Errorf("Send() failed: %v", err)
+ return errOperationFailed
+ }
+ default:
+ vlog.Errorf("Read() failed: %v", err)
+ return errOperationFailed
+ }
+ }
+}
+
+func (i *crInvoker) Upload(ipc.ServerContext, content.ContentServiceUploadStream) (string, error) {
+ vlog.VI(0).Infof("%v.Upload()", i.suffix)
+ return "", nil
+}
+
+// arState wraps state shared by different node manager application
+// repository invocations.
+type arState struct {
+ // envelope is the node manager application envelope.
+ envelope *application.Envelope
+ // name is the node manager name.
+ name string
+}
+
+// invoker holds the state of a node manager application repository invocation.
+type arInvoker struct {
+ state *arState
+ suffix string
+}
+
+// NewARInvoker is the node manager content repository invoker factory.
+
+func NewARInvoker(state *arState, suffix string) *arInvoker {
+ return &arInvoker{
+ state: state,
+ suffix: suffix,
+ }
+}
+
+// APPLICATION REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *arInvoker) Match(ipc.ServerContext, []string) (application.Envelope, error) {
+ vlog.VI(0).Infof("%v.Match()", i.suffix)
+ envelope := application.Envelope{}
+ envelope.Args = i.state.envelope.Args
+ envelope.Binary = naming.JoinAddressName(i.state.name, "cr")
+ envelope.Env = i.state.envelope.Env
+ return envelope, nil
+}
diff --git a/services/mgmt/node/node.vdl b/services/mgmt/node/node.vdl
new file mode 100644
index 0000000..1ba0284
--- /dev/null
+++ b/services/mgmt/node/node.vdl
@@ -0,0 +1,23 @@
+// Package node contains implementation of managing a node and
+// applications running on the node.
+package node
+
+import (
+ public "veyron2/services/mgmt/node"
+)
+
+// CallbackReceiver can receive callbacks from previously spawned
+// processes.
+type CallbackReceiver interface {
+ // Callback receives a callback from a process that the callee
+ // previously spawned, providing the callee with a name that can be
+ // used to communicate with the caller.
+ Callback(name string) error
+}
+
+// Node describes a node manager internally. In addition to the public
+// Node interface, it implements the callback functionality.
+type Node interface {
+ public.Node
+ CallbackReceiver
+}
diff --git a/services/mgmt/node/node.vdl.go b/services/mgmt/node/node.vdl.go
new file mode 100644
index 0000000..6c25eb6
--- /dev/null
+++ b/services/mgmt/node/node.vdl.go
@@ -0,0 +1,457 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: node.vdl
+
+// Package node contains implementation of managing a node and
+// applications running on the node.
+package node
+
+import (
+ "veyron2/services/mgmt/node"
+
+ // The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_veyron2 "veyron2"
+ _gen_context "veyron2/context"
+ _gen_ipc "veyron2/ipc"
+ _gen_naming "veyron2/naming"
+ _gen_rt "veyron2/rt"
+ _gen_vdl "veyron2/vdl"
+ _gen_wiretype "veyron2/wiretype"
+)
+
+// CallbackReceiver can receive callbacks from previously spawned
+// processes.
+// CallbackReceiver is the interface the client binds and uses.
+// CallbackReceiver_ExcludingUniversal is the interface without internal framework-added methods
+// to enable embedding without method collisions. Not to be used directly by clients.
+type CallbackReceiver_ExcludingUniversal interface {
+ // Callback receives a callback from a process that the callee
+ // previously spawned, providing the callee with a name that can be
+ // used to communicate with the caller.
+ Callback(ctx _gen_context.T, name string, opts ..._gen_ipc.CallOpt) (err error)
+}
+type CallbackReceiver interface {
+ _gen_ipc.UniversalServiceMethods
+ CallbackReceiver_ExcludingUniversal
+}
+
+// CallbackReceiverService is the interface the server implements.
+type CallbackReceiverService interface {
+
+ // Callback receives a callback from a process that the callee
+ // previously spawned, providing the callee with a name that can be
+ // used to communicate with the caller.
+ Callback(context _gen_ipc.ServerContext, name string) (err error)
+}
+
+// BindCallbackReceiver returns the client stub implementing the CallbackReceiver
+// interface.
+//
+// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
+// global Runtime is used.
+func BindCallbackReceiver(name string, opts ..._gen_ipc.BindOpt) (CallbackReceiver, error) {
+ var client _gen_ipc.Client
+ switch len(opts) {
+ case 0:
+ client = _gen_rt.R().Client()
+ case 1:
+ switch o := opts[0].(type) {
+ case _gen_veyron2.Runtime:
+ client = o.Client()
+ case _gen_ipc.Client:
+ client = o
+ default:
+ return nil, _gen_vdl.ErrUnrecognizedOption
+ }
+ default:
+ return nil, _gen_vdl.ErrTooManyOptionsToBind
+ }
+ stub := &clientStubCallbackReceiver{client: client, name: name}
+
+ return stub, nil
+}
+
+// NewServerCallbackReceiver creates a new server stub.
+//
+// It takes a regular server implementing the CallbackReceiverService
+// interface, and returns a new server stub.
+func NewServerCallbackReceiver(server CallbackReceiverService) interface{} {
+ return &ServerStubCallbackReceiver{
+ service: server,
+ }
+}
+
+// clientStubCallbackReceiver implements CallbackReceiver.
+type clientStubCallbackReceiver struct {
+ client _gen_ipc.Client
+ name string
+}
+
+func (__gen_c *clientStubCallbackReceiver) Callback(ctx _gen_context.T, name string, opts ..._gen_ipc.CallOpt) (err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Callback", []interface{}{name}, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubCallbackReceiver) UnresolveStep(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply []string, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "UnresolveStep", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubCallbackReceiver) Signature(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply _gen_ipc.ServiceSignature, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Signature", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubCallbackReceiver) GetMethodTags(ctx _gen_context.T, method string, opts ..._gen_ipc.CallOpt) (reply []interface{}, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "GetMethodTags", []interface{}{method}, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+// ServerStubCallbackReceiver wraps a server that implements
+// CallbackReceiverService and provides an object that satisfies
+// the requirements of veyron2/ipc.ReflectInvoker.
+type ServerStubCallbackReceiver struct {
+ service CallbackReceiverService
+}
+
+func (__gen_s *ServerStubCallbackReceiver) GetMethodTags(call _gen_ipc.ServerCall, method string) ([]interface{}, error) {
+ // TODO(bprosnitz) GetMethodTags() will be replaces with Signature().
+ // Note: This exhibits some weird behavior like returning a nil error if the method isn't found.
+ // This will change when it is replaced with Signature().
+ switch method {
+ case "Callback":
+ return []interface{}{}, nil
+ default:
+ return nil, nil
+ }
+}
+
+func (__gen_s *ServerStubCallbackReceiver) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
+ result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
+ result.Methods["Callback"] = _gen_ipc.MethodSignature{
+ InArgs: []_gen_ipc.MethodArgument{
+ {Name: "name", Type: 3},
+ },
+ OutArgs: []_gen_ipc.MethodArgument{
+ {Name: "", Type: 65},
+ },
+ }
+
+ result.TypeDefs = []_gen_vdl.Any{
+ _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
+
+ return result, nil
+}
+
+func (__gen_s *ServerStubCallbackReceiver) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
+ if unresolver, ok := __gen_s.service.(_gen_ipc.Unresolver); ok {
+ return unresolver.UnresolveStep(call)
+ }
+ if call.Server() == nil {
+ return
+ }
+ var published []string
+ if published, err = call.Server().Published(); err != nil || published == nil {
+ return
+ }
+ reply = make([]string, len(published))
+ for i, p := range published {
+ reply[i] = _gen_naming.Join(p, call.Name())
+ }
+ return
+}
+
+func (__gen_s *ServerStubCallbackReceiver) Callback(call _gen_ipc.ServerCall, name string) (err error) {
+ err = __gen_s.service.Callback(call, name)
+ return
+}
+
+// Node describes a node manager internally. In addition to the public
+// Node interface, it implements the callback functionality.
+// Node is the interface the client binds and uses.
+// Node_ExcludingUniversal is the interface without internal framework-added methods
+// to enable embedding without method collisions. Not to be used directly by clients.
+type Node_ExcludingUniversal interface {
+ // Node can be used to manage a node. The idea is that this interace
+ // will be invoked using a veyron name that identifies the node.
+ node.Node_ExcludingUniversal
+ // CallbackReceiver can receive callbacks from previously spawned
+ // processes.
+ CallbackReceiver_ExcludingUniversal
+}
+type Node interface {
+ _gen_ipc.UniversalServiceMethods
+ Node_ExcludingUniversal
+}
+
+// NodeService is the interface the server implements.
+type NodeService interface {
+
+ // Node can be used to manage a node. The idea is that this interace
+ // will be invoked using a veyron name that identifies the node.
+ node.NodeService
+ // CallbackReceiver can receive callbacks from previously spawned
+ // processes.
+ CallbackReceiverService
+}
+
+// BindNode returns the client stub implementing the Node
+// interface.
+//
+// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
+// global Runtime is used.
+func BindNode(name string, opts ..._gen_ipc.BindOpt) (Node, error) {
+ var client _gen_ipc.Client
+ switch len(opts) {
+ case 0:
+ client = _gen_rt.R().Client()
+ case 1:
+ switch o := opts[0].(type) {
+ case _gen_veyron2.Runtime:
+ client = o.Client()
+ case _gen_ipc.Client:
+ client = o
+ default:
+ return nil, _gen_vdl.ErrUnrecognizedOption
+ }
+ default:
+ return nil, _gen_vdl.ErrTooManyOptionsToBind
+ }
+ stub := &clientStubNode{client: client, name: name}
+ stub.Node_ExcludingUniversal, _ = node.BindNode(name, client)
+ stub.CallbackReceiver_ExcludingUniversal, _ = BindCallbackReceiver(name, client)
+
+ return stub, nil
+}
+
+// NewServerNode creates a new server stub.
+//
+// It takes a regular server implementing the NodeService
+// interface, and returns a new server stub.
+func NewServerNode(server NodeService) interface{} {
+ return &ServerStubNode{
+ ServerStubNode: *node.NewServerNode(server).(*node.ServerStubNode),
+ ServerStubCallbackReceiver: *NewServerCallbackReceiver(server).(*ServerStubCallbackReceiver),
+ service: server,
+ }
+}
+
+// clientStubNode implements Node.
+type clientStubNode struct {
+ node.Node_ExcludingUniversal
+ CallbackReceiver_ExcludingUniversal
+
+ client _gen_ipc.Client
+ name string
+}
+
+func (__gen_c *clientStubNode) UnresolveStep(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply []string, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "UnresolveStep", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubNode) Signature(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply _gen_ipc.ServiceSignature, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Signature", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubNode) GetMethodTags(ctx _gen_context.T, method string, opts ..._gen_ipc.CallOpt) (reply []interface{}, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "GetMethodTags", []interface{}{method}, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+// ServerStubNode wraps a server that implements
+// NodeService and provides an object that satisfies
+// the requirements of veyron2/ipc.ReflectInvoker.
+type ServerStubNode struct {
+ node.ServerStubNode
+ ServerStubCallbackReceiver
+
+ service NodeService
+}
+
+func (__gen_s *ServerStubNode) GetMethodTags(call _gen_ipc.ServerCall, method string) ([]interface{}, error) {
+ // TODO(bprosnitz) GetMethodTags() will be replaces with Signature().
+ // Note: This exhibits some weird behavior like returning a nil error if the method isn't found.
+ // This will change when it is replaced with Signature().
+ if resp, err := __gen_s.ServerStubNode.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ if resp, err := __gen_s.ServerStubCallbackReceiver.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ return nil, nil
+}
+
+func (__gen_s *ServerStubNode) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
+ result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
+
+ result.TypeDefs = []_gen_vdl.Any{}
+ var ss _gen_ipc.ServiceSignature
+ var firstAdded int
+ ss, _ = __gen_s.ServerStubNode.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for _, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ fld.Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+ ss, _ = __gen_s.ServerStubCallbackReceiver.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for _, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ fld.Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+
+ return result, nil
+}
+
+func (__gen_s *ServerStubNode) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
+ if unresolver, ok := __gen_s.service.(_gen_ipc.Unresolver); ok {
+ return unresolver.UnresolveStep(call)
+ }
+ if call.Server() == nil {
+ return
+ }
+ var published []string
+ if published, err = call.Server().Published(); err != nil || published == nil {
+ return
+ }
+ reply = make([]string, len(published))
+ for i, p := range published {
+ reply[i] = _gen_naming.Join(p, call.Name())
+ }
+ return
+}
diff --git a/services/mgmt/node/noded/main.go b/services/mgmt/node/noded/main.go
index 3d878a3..dad9f4e 100644
--- a/services/mgmt/node/noded/main.go
+++ b/services/mgmt/node/noded/main.go
@@ -2,11 +2,15 @@
import (
"flag"
+ "os"
+ "veyron/lib/exec"
"veyron/lib/signals"
vflag "veyron/security/flag"
+ "veyron/services/mgmt/node"
"veyron/services/mgmt/node/impl"
+ "veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/application"
"veyron2/vlog"
@@ -14,14 +18,13 @@
func main() {
// TODO(rthellend): Remove the address and protocol flags when the config manager is working.
- var address, protocol, name, origin string
+ var address, protocol, publishAs string
flag.StringVar(&address, "address", "localhost:0", "network address to listen on")
- flag.StringVar(&name, "name", "", "name to publish the node manager at")
+ flag.StringVar(&publishAs, "name", "", "name to publish the node manager at")
flag.StringVar(&protocol, "protocol", "tcp", "network type to listen on")
- flag.StringVar(&origin, "origin", "", "node manager application repository")
flag.Parse()
- if origin == "" {
- vlog.Fatalf("Specify an origin using --origin=<name>")
+ if os.Getenv(impl.ORIGIN_ENV) == "" {
+ vlog.Fatalf("Specify the node manager origin as environment variable %s=<name>", impl.ORIGIN_ENV)
}
runtime := rt.Init()
defer runtime.Shutdown()
@@ -30,20 +33,37 @@
vlog.Fatalf("NewServer() failed: %v", err)
}
defer server.Stop()
- envelope := &application.Envelope{}
- dispatcher := impl.NewDispatcher(envelope, origin, vflag.NewAuthorizerOrDie())
- suffix := ""
- if err := server.Register(suffix, dispatcher); err != nil {
- vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
- }
endpoint, err := server.Listen(protocol, address)
if err != nil {
vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, address, err)
}
- vlog.VI(0).Infof("Listening on %v", endpoint)
- if len(name) > 0 {
- if err := server.Publish(name); err != nil {
- vlog.Fatalf("Publish(%v) failed: %v", name, err)
+ envelope := &application.Envelope{}
+ suffix, crSuffix, arSuffix := "", "cr", "ar"
+ name := naming.MakeTerminal(naming.JoinAddressName(endpoint.String(), suffix))
+ vlog.VI(0).Infof("Node manager name: %v", name)
+ dispatcher, crDispatcher, arDispatcher := impl.NewDispatchers(vflag.NewAuthorizerOrDie(), envelope, name)
+ if err := server.Register(suffix, dispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
+ }
+ if err := server.Register(crSuffix, crDispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", crSuffix, crDispatcher, err)
+ }
+ if err := server.Register(arSuffix, arDispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", arSuffix, arDispatcher, err)
+ }
+ if len(publishAs) > 0 {
+ if err := server.Publish(publishAs); err != nil {
+ vlog.Fatalf("Publish(%v) failed: %v", publishAs, err)
+ }
+ }
+ handle, err := exec.GetChildHandle()
+ if handle != nil && handle.CallbackName != "" {
+ nmClient, err := node.BindCallbackReceiver(handle.CallbackName)
+ if err != nil {
+ vlog.Fatalf("BindNode(%v) failed: %v", handle.CallbackName, err)
+ }
+ if err := nmClient.Callback(rt.R().NewContext(), name); err != nil {
+ vlog.Fatalf("Callback(%v) failed: %v", name, err)
}
}
// Wait until shutdown.