veyron/services/mgmt: implementing Revert() for the node manager
This CL:
- implements Revert() for the node manager
- simplifies the logic used for testing a new version of a node manager
- renames content.Content to content.Repository
- renames application.Shutdown to application.Stop
- renames exec.SetEnv and exec.GetEnv to exec.Setenv and exec.Getenv
respectively
Change-Id: If58521eec37bc12c8f2de9ad89d1d571e9c51b33
diff --git a/lib/exec/util.go b/lib/exec/util.go
index f19342c..8ecbeb2 100644
--- a/lib/exec/util.go
+++ b/lib/exec/util.go
@@ -5,9 +5,20 @@
"strings"
)
-// SetEnv updates / adds the value assignment for the given variable
+// Getenv retrieves the value of the given variable from the given
+// slice of environment variable assignments.
+func Getenv(env []string, name string) (string, error) {
+ for _, v := range env {
+ if strings.HasPrefix(v, name+"=") {
+ return strings.TrimPrefix(v, name+"="), nil
+ }
+ }
+ return "", errors.New("not found")
+}
+
+// Setenv updates / adds the value assignment for the given variable
// in the given slice of environment variable assigments.
-func SetEnv(env []string, name, value string) []string {
+func Setenv(env []string, name, value string) []string {
newValue := name + "=" + value
for i, v := range env {
if strings.HasPrefix(v, name+"=") {
@@ -17,14 +28,3 @@
}
return append(env, newValue)
}
-
-// GetEnv retrieves the value of the given variable from the given
-// slice of environment variable assignments.
-func GetEnv(env []string, name string) (string, error) {
- for _, v := range env {
- if strings.HasPrefix(v, name+"=") {
- return strings.TrimPrefix(v, name+"="), nil
- }
- }
- return "", errors.New("not found")
-}
diff --git a/lib/exec/util_test.go b/lib/exec/util_test.go
index 0f6e14b..a247524 100644
--- a/lib/exec/util_test.go
+++ b/lib/exec/util_test.go
@@ -6,28 +6,28 @@
func TestEnv(t *testing.T) {
env := make([]string, 0)
- env = SetEnv(env, "NAME", "VALUE1")
+ env = Setenv(env, "NAME", "VALUE1")
if expected, got := 1, len(env); expected != got {
t.Fatalf("Unexpected length of environment variable slice: expected %d, got %d", expected, got)
}
if expected, got := "NAME=VALUE1", env[0]; expected != got {
t.Fatalf("Unexpected element in the environment variable slice: expected %d, got %d", expected, got)
}
- env = SetEnv(env, "NAME", "VALUE2")
+ env = Setenv(env, "NAME", "VALUE2")
if expected, got := 1, len(env); expected != got {
t.Fatalf("Unexpected length of environment variable slice: expected %d, got %d", expected, got)
}
if expected, got := "NAME=VALUE2", env[0]; expected != got {
t.Fatalf("Unexpected element in the environment variable slice: expected %d, got %d", expected, got)
}
- value, err := GetEnv(env, "NAME")
+ value, err := Getenv(env, "NAME")
if err != nil {
t.Fatalf("Unexpected error when looking up environment variable value: %v", err)
}
if expected, got := "VALUE2", value; expected != got {
t.Fatalf("Unexpected value of an environment variable: expected %d, got %d", expected, got)
}
- value, err = GetEnv(env, "NONAME")
+ value, err = Getenv(env, "NONAME")
if err == nil {
t.Fatalf("Expected error when looking up environment variable value, got none", value)
}
diff --git a/services/mgmt/content/impl/dispatcher.go b/services/mgmt/content/impl/dispatcher.go
index 35ad7ff..8ba4074 100644
--- a/services/mgmt/content/impl/dispatcher.go
+++ b/services/mgmt/content/impl/dispatcher.go
@@ -24,6 +24,6 @@
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(content.NewServerContent(newInvoker(d.root, d.depth, &d.fs, suffix)))
+ invoker := ipc.ReflectInvoker(content.NewServerRepository(newInvoker(d.root, d.depth, &d.fs, suffix)))
return invoker, d.auth, nil
}
diff --git a/services/mgmt/content/impl/impl_test.go b/services/mgmt/content/impl/impl_test.go
index ee4d7a2..997b373 100644
--- a/services/mgmt/content/impl/impl_test.go
+++ b/services/mgmt/content/impl/impl_test.go
@@ -25,7 +25,7 @@
// invokeUpload invokes the Upload RPC using the given client stub
// <stub> and streams the given content <content> to it.
-func invokeUpload(t *testing.T, stub content.Content, content []byte) (string, error) {
+func invokeUpload(t *testing.T, stub content.Repository, content []byte) (string, error) {
stream, err := stub.Upload(rt.R().NewContext())
if err != nil {
return "", err
@@ -47,7 +47,7 @@
// invokeDownload invokes the Download RPC using the given client stub
// <stub> and streams content from to it.
-func invokeDownload(t *testing.T, stub content.Content) ([]byte, error) {
+func invokeDownload(t *testing.T, stub content.Repository) ([]byte, error) {
stream, err := stub.Download(rt.R().NewContext())
if err != nil {
return nil, err
@@ -62,7 +62,7 @@
// invokeDelete invokes the Delete RPC using the given client stub
// <stub>.
-func invokeDelete(t *testing.T, stub content.Content) error {
+func invokeDelete(t *testing.T, stub content.Repository) error {
return stub.Delete(rt.R().NewContext())
}
@@ -92,9 +92,9 @@
// Create client stubs for talking to the server.
name := naming.JoinAddressName(endpoint.String(), "//")
- stub, err := content.BindContent(name)
+ stub, err := content.BindRepository(name)
if err != nil {
- t.Fatalf("BindContent() failed: %v", err)
+ t.Fatalf("BindRepository() failed: %v", err)
}
// Upload
@@ -110,9 +110,9 @@
}
// Download
- stub, err = content.BindContent(naming.Join(name, checksum))
+ stub, err = content.BindRepository(naming.Join(name, checksum))
if err != nil {
- t.Fatalf("BindContent() failed: %v", err)
+ t.Fatalf("BindRepository() failed: %v", err)
}
output, err := invokeDownload(t, stub)
if err != nil {
diff --git a/services/mgmt/content/impl/invoker.go b/services/mgmt/content/impl/invoker.go
index f39f644..56775e3 100644
--- a/services/mgmt/content/impl/invoker.go
+++ b/services/mgmt/content/impl/invoker.go
@@ -116,7 +116,7 @@
return nil
}
-func (i *invoker) Download(context ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
+func (i *invoker) Download(context ipc.ServerContext, stream content.RepositoryServiceDownloadStream) error {
vlog.VI(0).Infof("%v.Download()", i.suffix)
if !isValid(i.suffix) {
return errInvalidSuffix
@@ -147,7 +147,7 @@
return nil
}
-func (i *invoker) Upload(context ipc.ServerContext, stream content.ContentServiceUploadStream) (string, error) {
+func (i *invoker) Upload(context ipc.ServerContext, stream content.RepositoryServiceUploadStream) (string, error) {
vlog.VI(0).Infof("%v.Upload()", i.suffix)
if i.suffix != "" {
return "", errInvalidSuffix
diff --git a/services/mgmt/node/impl/const.go b/services/mgmt/node/impl/const.go
index 43d30fc..081a7ed 100644
--- a/services/mgmt/node/impl/const.go
+++ b/services/mgmt/node/impl/const.go
@@ -2,7 +2,7 @@
// Node manager environment variables.
const (
- ORIGIN_ENV = "VEYRON_NM_ORIGIN"
- ROOT_ENV = "VEYRON_NM_ROOT"
- TEST_UPDATE_ENV = "VEYRON_NM_TEST_UPDATE"
+ PREVIOUS_ENV = "VEYRON_NM_PREVIOUS"
+ ORIGIN_ENV = "VEYRON_NM_ORIGIN"
+ ROOT_ENV = "VEYRON_NM_ROOT"
)
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index a787d54..19147f6 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -8,7 +8,6 @@
"veyron2/ipc"
"veyron2/security"
"veyron2/services/mgmt/application"
- "veyron2/services/mgmt/content"
)
// dispatcher holds the state of the node manager dispatcher.
@@ -17,40 +16,20 @@
state *state
}
-// crDispatcher holds the state of the node manager content repository
-// dispatcher.
-type crDispatcher struct {
- auth security.Authorizer
-}
-
-// arDispatcher holds the state of the node manager application
-// repository dispatcher.
-type arDispatcher struct {
- auth security.Authorizer
- state *arState
-}
-
// NewDispatcher is the node manager dispatcher factory.
-func NewDispatchers(auth security.Authorizer, envelope *application.Envelope, name string) (*dispatcher, *crDispatcher, *arDispatcher) {
+func NewDispatcher(auth security.Authorizer, envelope *application.Envelope, name, previous string) *dispatcher {
return &dispatcher{
- auth: auth,
- state: &state{
- channels: make(map[string]chan string),
- channelsMutex: new(sync.Mutex),
- envelope: envelope,
- name: name,
- updating: false,
- updatingMutex: new(sync.Mutex),
- },
- }, &crDispatcher{
- auth: auth,
- }, &arDispatcher{
- auth: auth,
- state: &arState{
- envelope: envelope,
- name: name,
- },
- }
+ auth: auth,
+ state: &state{
+ channels: make(map[string]chan string),
+ channelsMutex: new(sync.Mutex),
+ envelope: envelope,
+ name: name,
+ previous: previous,
+ updating: false,
+ updatingMutex: new(sync.Mutex),
+ },
+ }
}
// DISPATCHER INTERFACE IMPLEMENTATION
@@ -58,15 +37,3 @@
func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
return ipc.ReflectInvoker(node.NewServerNode(NewInvoker(d.state, suffix))), d.auth, nil
}
-
-// DISPATCHER INTERFACE IMPLEMENTATION
-
-func (d *crDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- return ipc.ReflectInvoker(content.NewServerContent(NewCRInvoker(suffix))), d.auth, nil
-}
-
-// DISPATCHER INTERFACE IMPLEMENTATION
-
-func (d *arDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- return ipc.ReflectInvoker(application.NewServerRepository(NewARInvoker(d.state, suffix))), d.auth, nil
-}
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index 7ca6f3f..4651016 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -1,29 +1,151 @@
-package impl
+package impl_test
import (
+ "errors"
"fmt"
+ "io"
+ "io/ioutil"
"os"
+ "path/filepath"
+ "regexp"
+ "strings"
"syscall"
"testing"
+ "time"
"veyron/lib/exec"
"veyron/lib/signals"
"veyron/lib/testutil"
"veyron/lib/testutil/blackbox"
"veyron/services/mgmt/node"
+ "veyron/services/mgmt/node/impl"
mtlib "veyron/services/mounttable/lib"
"veyron2"
+ "veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/application"
+ "veyron2/services/mgmt/content"
"veyron2/vlog"
)
+const (
+ TEST_ENV = "VEYRON_NM_TEST"
+)
+
+var (
+ errOperationFailed = errors.New("operation failed")
+)
+
func init() {
blackbox.CommandTable["nodeManager"] = nodeManager
}
+// arInvoker holds the state of an application repository invocation
+// mock.
+type arInvoker struct{}
+
+// APPLICATION REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *arInvoker) Match(ipc.ServerContext, []string) (application.Envelope, error) {
+ vlog.VI(0).Infof("Match()")
+ envelope := generateEnvelope()
+ envelope.Env = exec.Setenv(envelope.Env, TEST_ENV, "child")
+ envelope.Binary = "cr"
+ return *envelope, nil
+}
+
+// crInvoker holds the state of a content repository invocation mock.
+type crInvoker struct{}
+
+// CONTENT REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *crInvoker) Delete(ipc.ServerContext) error {
+ vlog.VI(0).Infof("Delete()")
+ return nil
+}
+
+func (i *crInvoker) Download(_ ipc.ServerContext, stream content.RepositoryServiceDownloadStream) error {
+ vlog.VI(0).Infof("Download()")
+ file, err := os.Open(os.Args[0])
+ if err != nil {
+ vlog.Errorf("Open() failed: %v", err)
+ return errOperationFailed
+ }
+ defer file.Close()
+ bufferLength := 4096
+ buffer := make([]byte, bufferLength)
+ for {
+ n, err := file.Read(buffer)
+ switch err {
+ case io.EOF:
+ return nil
+ case nil:
+ if err := stream.Send(buffer[:n]); err != nil {
+ vlog.Errorf("Send() failed: %v", err)
+ return errOperationFailed
+ }
+ default:
+ vlog.Errorf("Read() failed: %v", err)
+ return errOperationFailed
+ }
+ }
+}
+
+func (i *crInvoker) Upload(ipc.ServerContext, content.RepositoryServiceUploadStream) (string, error) {
+ vlog.VI(0).Infof("Upload()")
+ return "", nil
+}
+
+func generateBinary(workspace string) string {
+ path := filepath.Join(workspace, "noded")
+ if err := os.Link(os.Args[0], path); err != nil {
+ vlog.Fatalf("Link(%v, %v) failed: %v", os.Args[0], path, err)
+ }
+ return path
+}
+
+func generateEnvelope() *application.Envelope {
+ envelope := &application.Envelope{}
+ envelope.Args = os.Args[1:]
+ re := regexp.MustCompile("^([^=]*)=(.*)$")
+ for _, env := range os.Environ() {
+ envelope.Env = append(envelope.Env, re.ReplaceAllString(env, "$1=\"$2\""))
+ }
+ return envelope
+}
+
+func generateLink(root, workspace string) {
+ link := filepath.Join(root, impl.CURRENT)
+ newLink := link + ".new"
+ fi, err := os.Lstat(newLink)
+ if err == nil {
+ if err := os.Remove(fi.Name()); err != nil {
+ vlog.Fatalf("Remove(%v) failed: %v", fi.Name(), err)
+ }
+ }
+ if err := os.Symlink(workspace, newLink); err != nil {
+ vlog.Fatalf("Symlink(%v, %v) failed: %v", workspace, newLink, err)
+ }
+ if err := os.Rename(newLink, link); err != nil {
+ vlog.Fatalf("Rename(%v, %v) failed: %v", newLink, link, err)
+ }
+}
+
+func generateScript(workspace, binary string) string {
+ envelope := generateEnvelope()
+ envelope.Env = exec.Setenv(envelope.Env, TEST_ENV, "parent")
+ output := "#!/bin/bash\n"
+ output += strings.Join(envelope.Env, " ") + " "
+ output += binary + " " + strings.Join(envelope.Args, " ")
+ path := filepath.Join(workspace, "noded.sh")
+ if err := ioutil.WriteFile(path, []byte(output), 0755); err != nil {
+ vlog.Fatalf("WriteFile(%v) failed: %v", path, err)
+ }
+ return path
+}
+
func invokeCallback(name string) {
handle, err := exec.GetChildHandle()
switch err {
@@ -48,62 +170,129 @@
}
}
-func invokeUpdate(t *testing.T, nmAddress string) {
- address := naming.JoinAddressName(nmAddress, "nm")
- nmClient, err := node.BindNode(address)
+func invokeUpdate(t *testing.T, name string) {
+ address := naming.JoinAddressName(name, "nm")
+ stub, err := node.BindNode(address)
if err != nil {
t.Fatalf("BindNode(%v) failed: %v", address, err)
}
- if err := nmClient.Update(rt.R().NewContext()); err != nil {
- t.Fatalf("%v.Update() failed: %v", address, err)
+ if err := stub.Update(rt.R().NewContext()); err != nil {
+ t.Fatalf("Update() failed: %v", err)
}
}
-// nodeManager is an enclosure for the node manager blackbox process.
+// nodeManager is an enclosure for setting up and starting the parent
+// and child node manager used by the TestUpdate() method.
func nodeManager(argv []string) {
- // The node manager program logic assumes that its binary is
- // executed through a symlink. To meet this assumption, the first
- // time this binary is started it creates a symlink to itself and
- // restarts itself using this symlink.
- fi, err := os.Lstat(os.Args[0])
- if err != nil {
- vlog.Fatalf("Lstat(%v) failed: %v", os.Args[0], err)
- }
- if fi.Mode()&os.ModeSymlink != os.ModeSymlink && os.Getenv(TEST_UPDATE_ENV) == "" {
- symlink := os.Args[0] + ".symlink"
- if err := os.Symlink(os.Args[0], symlink); err != nil {
- vlog.Fatalf("Symlink(%v, %v) failed: %v", os.Args[0], symlink, err)
+ root := os.Getenv(impl.ROOT_ENV)
+ switch os.Getenv(TEST_ENV) {
+ case "setup":
+ workspace := filepath.Join(root, fmt.Sprintf("%v", time.Now().Format(time.RFC3339Nano)))
+ perm := os.FileMode(0755)
+ if err := os.MkdirAll(workspace, perm); err != nil {
+ vlog.Fatalf("MkdirAll(%v, %v) failed: %v", workspace, perm, err)
}
- argv := append([]string{symlink}, os.Args[1:]...)
- envv := os.Environ()
- if err := syscall.Exec(symlink, argv, envv); err != nil {
- vlog.Fatalf("Exec(%v, %v, %v) failed: %v", symlink, argv, envv, err)
+ binary := generateBinary(workspace)
+ script := generateScript(workspace, binary)
+ generateLink(root, workspace)
+ argv, envv := []string{}, []string{}
+ if err := syscall.Exec(script, argv, envv); err != nil {
+ vlog.Fatalf("Exec(%v, %v, %v) failed: %v", script, argv, envv, err)
}
- } else {
+ case "parent":
runtime := rt.Init()
defer runtime.Shutdown()
- address, nmCleanup := startNodeManager(runtime, argv[0], argv[1])
+ // Set up a mock content repository, a mock application repository, and a node manager.
+ _, crCleanup := startContentRepository()
+ defer crCleanup()
+ _, arCleanup := startApplicationRepository()
+ defer arCleanup()
+ _, nmCleanup := startNodeManager()
defer nmCleanup()
- invokeCallback(address)
+ // Wait until shutdown.
+ <-signals.ShutdownOnSignals()
+ blackbox.WaitForEOFOnStdin()
+ case "child":
+ runtime := rt.Init()
+ defer runtime.Shutdown()
+ // Set up a node manager.
+ name, nmCleanup := startNodeManager()
+ defer nmCleanup()
+ invokeCallback(name)
// Wait until shutdown.
<-signals.ShutdownOnSignals()
blackbox.WaitForEOFOnStdin()
}
}
-func spawnNodeManager(t *testing.T, mtAddress string, idFile string) *blackbox.Child {
- child := blackbox.HelperCommand(t, "nodeManager", mtAddress, idFile)
- child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "MOUNTTABLE_ROOT", mtAddress)
- child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "VEYRON_IDENTITY", idFile)
- child.Cmd.Env = exec.SetEnv(child.Cmd.Env, "VEYRON_NM_ROOT", os.TempDir())
+func spawnNodeManager(t *testing.T, mtName string, idFile string) *blackbox.Child {
+ root := filepath.Join(os.TempDir(), "noded")
+ child := blackbox.HelperCommand(t, "nodeManager")
+ child.Cmd.Env = exec.Setenv(child.Cmd.Env, "MOUNTTABLE_ROOT", mtName)
+ child.Cmd.Env = exec.Setenv(child.Cmd.Env, "VEYRON_IDENTITY", idFile)
+ child.Cmd.Env = exec.Setenv(child.Cmd.Env, impl.ORIGIN_ENV, "ar")
+ child.Cmd.Env = exec.Setenv(child.Cmd.Env, impl.ROOT_ENV, root)
+ child.Cmd.Env = exec.Setenv(child.Cmd.Env, TEST_ENV, "setup")
if err := child.Cmd.Start(); err != nil {
t.Fatalf("Start() failed: %v", err)
}
return child
}
-func startMountTable(t *testing.T, runtime veyron2.Runtime) (string, func()) {
- server, err := runtime.NewServer(veyron2.ServesMountTableOpt(true))
+func startApplicationRepository() (string, func()) {
+ server, err := rt.R().NewServer()
+ if err != nil {
+ vlog.Fatalf("NewServer() failed: %v", err)
+ }
+ suffix, dispatcher := "", ipc.SoloDispatcher(application.NewServerRepository(&arInvoker{}), nil)
+ if err := server.Register(suffix, dispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
+ }
+ protocol, hostname := "tcp", "localhost:0"
+ endpoint, err := server.Listen(protocol, hostname)
+ if err != nil {
+ vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
+ }
+ vlog.VI(1).Infof("Application repository running at endpoint: %s", endpoint)
+ name := "ar"
+ if err := server.Publish(name); err != nil {
+ vlog.Fatalf("Publish(%v) failed: %v", name, err)
+ }
+ return name, func() {
+ if err := server.Stop(); err != nil {
+ vlog.Fatalf("Stop() failed: %v", err)
+ }
+ }
+}
+
+func startContentRepository() (string, func()) {
+ server, err := rt.R().NewServer()
+ if err != nil {
+ vlog.Fatalf("NewServer() failed: %v", err)
+ }
+ suffix, dispatcher := "", ipc.SoloDispatcher(content.NewServerRepository(&crInvoker{}), nil)
+ if err := server.Register(suffix, dispatcher); err != nil {
+ vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
+ }
+ protocol, hostname := "tcp", "localhost:0"
+ endpoint, err := server.Listen(protocol, hostname)
+ if err != nil {
+ vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
+ }
+ vlog.VI(1).Infof("Content repository running at endpoint: %s", endpoint)
+ name := "cr"
+ if err := server.Publish(name); err != nil {
+ vlog.Fatalf("Publish(%v) failed: %v", name, err)
+ }
+ return name, func() {
+ if err := server.Stop(); err != nil {
+ vlog.Fatalf("Stop() failed: %v", err)
+ }
+ }
+}
+
+func startMountTable(t *testing.T) (string, func()) {
+ server, err := rt.R().NewServer(veyron2.ServesMountTableOpt(true))
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
@@ -129,8 +318,8 @@
}
}
-func startNodeManager(runtime veyron2.Runtime, mtAddress, idFile string) (string, func()) {
- server, err := runtime.NewServer()
+func startNodeManager() (string, func()) {
+ server, err := rt.R().NewServer()
if err != nil {
vlog.Fatalf("NewServer() failed: %v", err)
}
@@ -139,29 +328,19 @@
if err != nil {
vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
}
- envelope := &application.Envelope{}
- envelope.Args = make([]string, 0)
- envelope.Args = os.Args[1:]
- envelope.Env = make([]string, 0)
- envelope.Env = append(envelope.Env, os.Environ()...)
- suffix, crSuffix, arSuffix := "", "cr", "ar"
+ suffix, envelope := "", &application.Envelope{}
name := naming.MakeTerminal(naming.JoinAddressName(endpoint.String(), suffix))
- vlog.VI(1).Infof("Node manager name: %v", name)
- dispatcher, crDispatcher, arDispatcher := NewDispatchers(nil, envelope, name)
+ vlog.VI(0).Infof("Node manager name: %v", name)
+ // TODO(jsimsa): Replace PREVIOUS_ENV with a command-line flag when
+ // command-line flags in tests are supported.
+ dispatcher := impl.NewDispatcher(nil, envelope, name, os.Getenv(impl.PREVIOUS_ENV))
if err := server.Register(suffix, dispatcher); err != nil {
vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
}
- if err := server.Register(crSuffix, crDispatcher); err != nil {
- vlog.Fatalf("Register(%v, %v) failed: %v", crSuffix, crDispatcher, err)
- }
- if err := server.Register(arSuffix, arDispatcher); err != nil {
- vlog.Fatalf("Register(%v, %v) failed: %v", arSuffix, arDispatcher, err)
- }
publishAs := "nm"
if err := server.Publish(publishAs); err != nil {
vlog.Fatalf("Publish(%v) failed: %v", publishAs, err)
}
- os.Setenv(ORIGIN_ENV, naming.JoinAddressName(name, "ar"))
fmt.Printf("ready\n")
return name, func() {
if err := server.Stop(); err != nil {
@@ -174,18 +353,44 @@
blackbox.HelperProcess(t)
}
+// TestUpdate checks that the node manager Update() method works as
+// expected. To that end, this test spawns a new process that invokes
+// the nodeManager() method. The behavior of this method depends on
+// the value of the VEYRON_NM_TEST environment variable:
+//
+// 1) Initially, the value of VEYRON_NM_TEST is set to be "setup",
+// which prompts the nodeManager() method to setup a new workspace
+// that mimics the structure used for storing the node manager
+// binary. The method then sets VEYRON_NM_TEST to "parent" and
+// restarts itself using syscall.Exec(), effectively becoming the
+// process described next.
+//
+// 2) The "parent" branch sets up a mock application and content
+// repository and a node manager that is pointed to the mock
+// application repository for updates. When all three services start,
+// the TestUpdate() method is notified and it proceeds to invoke
+// Update() on the node manager. This in turn results in the node
+// manager downloading an application envelope from the mock
+// application repository and a binary from the mock content
+// repository. These are identical to the application envelope of the
+// "parent" node manager, except for the VEYRON_NM_TEST variable,
+// which is set to "child". The Update() method then spawns the child
+// node manager, checks that it is a valid node manager, and
+// terminates.
+//
+// 3) The "child" branch sets up a node manager and then calls back to
+// the "parent" node manager. This prompts the parent node manager to
+// invoke the Revert() method on the child node manager, which
+// terminates the child.
func TestUpdate(t *testing.T) {
// Set up a mount table.
runtime := rt.Init()
- defer runtime.Shutdown()
- mtName, mtCleanup := startMountTable(t, runtime)
+ mtName, mtCleanup := startMountTable(t)
defer mtCleanup()
ns := runtime.Namespace()
- // The local, client side Namespace is now relative to the
+ // The local, client-side Namespace is now relative to the
// MountTable server started above.
ns.SetRoots([]string{mtName})
- ctx := runtime.NewContext()
-
// Spawn a node manager with an identity blessed by the MountTable's
// identity under the name "test", and obtain its address.
//
@@ -196,7 +401,7 @@
child := spawnNodeManager(t, mtName, idFile)
defer child.Cleanup()
child.Expect("ready")
- name := naming.Join(mtName, "nm")
+ ctx, name := runtime.NewContext(), naming.Join(mtName, "nm")
results, err := ns.Resolve(ctx, name)
if err != nil {
t.Fatalf("Resolve(%v) failed: %v", name, err)
@@ -204,11 +409,6 @@
if expected, got := 1, len(results); expected != got {
t.Fatalf("Unexpected number of results: expected %d, got %d", expected, got)
}
- nmAddress := results[0]
- vlog.VI(1).Infof("Node manager name: %v address: %v", name, nmAddress)
-
- // Invoke the Update method and check that another instance of the
- // node manager binary has been started.
- invokeUpdate(t, nmAddress)
+ invokeUpdate(t, name)
child.Expect("ready")
}
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index 3a3df2e..46fd993 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -1,5 +1,29 @@
package impl
+// The implementation of the node manager expects that the node
+// manager installations are all organized in the following directory
+// structure:
+//
+// VEYRON_NM_ROOT/
+// current # symlink to one of the workspaces
+// workspace-1/
+// noded - the node manager binary
+// noded.sh - a shell script to start the binary
+// ...
+// workspace-n/
+// noded - the node manager binary
+// noded.sh - a shell script to start the binary
+//
+// The node manager is always expected to be started through
+// VEYRON_NM_ROOT/current/noded.sh, which is monitored by an init
+// daemon. This provides for simple and robust updates.
+//
+// To update the node manager to a newer version, a new workspace is
+// created and the "current" symlink is updated
+// accordingly. Similarly, to revert the node manager to a previous
+// version, all that is required is to update the symlink to point to
+// the previous workspace.
+
import (
"bytes"
"errors"
@@ -31,7 +55,9 @@
"veyron2/vlog"
)
-var updateSuffix = regexp.MustCompile(`^apps\/.*$`)
+const CURRENT = "current"
+
+var appsSuffix = regexp.MustCompile(`^apps\/.*$`)
// state wraps state shared between different node manager
// invocations.
@@ -46,6 +72,9 @@
envelope *application.Envelope
// name is the node manager name.
name string
+ // previous holds the local path to the previous version of the node
+ // manager.
+ previous string
// updating is a flag that records whether this instance of node
// manager is being updated.
updating bool
@@ -298,7 +327,7 @@
// APPLICATION INTERFACE IMPLEMENTATION
func downloadBinary(workspace, binary string) error {
- stub, err := content.BindContent(binary)
+ stub, err := content.BindRepository(binary)
if err != nil {
vlog.Errorf("BindContent(%q) failed: %v", binary, err)
return errOperationFailed
@@ -372,10 +401,19 @@
return nil
}
+// TODO(jsimsa): Replace PREVIOUS_ENV with a command-line flag when
+// command-line flags in tests are supported.
func generateScript(workspace string, envelope *application.Envelope) error {
+ path, err := filepath.EvalSymlinks(os.Args[0])
+ if err != nil {
+ vlog.Errorf("EvalSymlinks(%v) failed: %v", os.Args[0], err)
+ return errOperationFailed
+ }
output := "#!/bin/bash\n"
- output += strings.Join(envelope.Env, " ") + " " + os.Args[0] + " " + strings.Join(envelope.Args, " ")
- path := filepath.Join(workspace, "noded.sh")
+ output += PREVIOUS_ENV + "=" + filepath.Dir(path) + " "
+ output += strings.Join(envelope.Env, " ") + " "
+ output += os.Args[0] + " " + strings.Join(envelope.Args, " ")
+ path = filepath.Join(workspace, "noded.sh")
if err := ioutil.WriteFile(path, []byte(output), 0755); err != nil {
vlog.Errorf("WriteFile(%v) failed: %v", path, err)
return errOperationFailed
@@ -383,8 +421,25 @@
return nil
}
+// getCurrentFileInfo returns the os.FileInfo for both the symbolic
+// link $VEYRON_NM_ROOT/current and the workspace this link points to.
+func getCurrentFileInfo() (os.FileInfo, os.FileInfo, error) {
+ path := filepath.Join(os.Getenv(ROOT_ENV), CURRENT)
+ link, err := os.Lstat(path)
+ if err != nil {
+ vlog.Errorf("Lstat(%v) failed: %v", path, err)
+ return nil, nil, err
+ }
+ workspace, err := os.Stat(path)
+ if err != nil {
+ vlog.Errorf("Stat(%v) failed: %v", path, err)
+ return nil, nil, err
+ }
+ return link, workspace, nil
+}
+
func updateLink(workspace string) error {
- link := filepath.Join(os.Getenv(ROOT_ENV), "stable")
+ link := filepath.Join(os.Getenv(ROOT_ENV), CURRENT)
newLink := link + ".new"
fi, err := os.Lstat(newLink)
if err == nil {
@@ -410,12 +465,17 @@
i.state.channels[id] = channel
}
+func (i *invoker) revertNodeManager() error {
+ if err := updateLink(i.state.previous); err != nil {
+ return err
+ }
+ rt.R().Stop()
+ return nil
+}
+
func (i *invoker) testNodeManager(workspace string, envelope *application.Envelope) error {
- path := filepath.Join(workspace, "noded")
- cmd := exec.Command(path, envelope.Args...)
- cmd.Env = envelope.Env
- cmd.Env = vexec.SetEnv(cmd.Env, ORIGIN_ENV, naming.JoinAddressName(i.state.name, "ar"))
- cmd.Env = vexec.SetEnv(cmd.Env, TEST_UPDATE_ENV, "1")
+ path := filepath.Join(workspace, "noded.sh")
+ cmd := exec.Command(path)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Setup up the child process callback.
@@ -446,21 +506,48 @@
nmClient, err := node.BindNode(address)
if err != nil {
vlog.Errorf("BindNode(%v) failed: %v", address, err)
- if err := cmd.Process.Kill(); err != nil {
- vlog.Errorf("Kill() failed: %v", err)
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
}
return errOperationFailed
}
- if err := nmClient.Update(rt.R().NewContext()); err != nil {
- if err := cmd.Process.Kill(); err != nil {
- vlog.Errorf("Kill() failed: %v", err)
+ linkOld, workspaceOld, err := getCurrentFileInfo()
+ if err != nil {
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
}
return errOperationFailed
}
+ if err := nmClient.Revert(rt.R().NewContext()); err != nil {
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
+ }
+ return errOperationFailed
+ }
+ linkNew, workspaceNew, err := getCurrentFileInfo()
+ if err != nil {
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
+ }
+ return errOperationFailed
+ }
+ // Check that the new node manager updated the symbolic link
+ // $VEYRON_NM_ROOT/current.
+ if !linkOld.ModTime().Before(linkNew.ModTime()) {
+ vlog.Errorf("new node manager test failed")
+ return errOperationFailed
+ }
+ // Check that the symbolic link $VEYRON_NM_ROOT/current points to
+ // the same workspace.
+ if workspaceOld.Name() != workspaceNew.Name() {
+ updateLink(workspaceOld.Name())
+ vlog.Errorf("new node manager test failed")
+ return errOperationFailed
+ }
case <-time.After(testTimeout):
vlog.Errorf("Waiting for callback timed out")
- if err := cmd.Process.Kill(); err != nil {
- vlog.Errorf("Kill() failed: %v", err)
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
}
return errOperationFailed
}
@@ -500,24 +587,18 @@
}
return err
}
- if os.Getenv(TEST_UPDATE_ENV) == "" {
- if err := i.testNodeManager(workspace, envelope); err != nil {
- if err := os.RemoveAll(workspace); err != nil {
- vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
- }
- return err
- }
- // If the binary has changed, update the node manager symlink.
- if err := updateLink(workspace); err != nil {
- if err := os.RemoveAll(workspace); err != nil {
- vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
- }
- return err
- }
- } else {
+ if err := i.testNodeManager(workspace, envelope); err != nil {
if err := os.RemoveAll(workspace); err != nil {
vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
}
+ return err
+ }
+ // If the binary has changed, update the node manager symlink.
+ if err := updateLink(workspace); err != nil {
+ if err := os.RemoveAll(workspace); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+ }
+ return err
}
rt.R().Stop()
}
@@ -530,12 +611,62 @@
return "", nil
}
+func (i *invoker) Refresh(call ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Refresh()", i.suffix)
+ // TODO(jsimsa): Implement.
+ return nil
+}
+
+func (i *invoker) Restart(call ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Restart()", i.suffix)
+ // TODO(jsimsa): Implement.
+ return nil
+}
+
+func (i *invoker) Resume(call ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Resume()", i.suffix)
+ // TODO(jsimsa): Implement.
+ return nil
+}
+
+func (i *invoker) Revert(call ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Revert()", i.suffix)
+ if i.state.previous == "" {
+ return errOperationFailed
+ }
+ i.state.updatingMutex.Lock()
+ if i.state.updating {
+ i.state.updatingMutex.Unlock()
+ return errUpdateInProgress
+ } else {
+ i.state.updating = true
+ }
+ i.state.updatingMutex.Unlock()
+ err := i.revertNodeManager()
+ i.state.updatingMutex.Lock()
+ i.state.updating = false
+ i.state.updatingMutex.Unlock()
+ return err
+}
+
func (i *invoker) Start(call ipc.ServerContext) ([]string, error) {
vlog.VI(0).Infof("%v.Start()", i.suffix)
// TODO(jsimsa): Implement.
return make([]string, 0), nil
}
+func (i *invoker) Stop(call ipc.ServerContext, deadline uint64) error {
+ vlog.VI(0).Infof("%v.Stop(%v)", i.suffix, deadline)
+ // TODO(jsimsa): Implement.
+ return nil
+}
+
+func (i *invoker) Suspend(call ipc.ServerContext) error {
+ vlog.VI(0).Infof("%v.Suspend()", i.suffix)
+ // TODO(jsimsa): Implement.
+ return nil
+}
+
func (i *invoker) Uninstall(call ipc.ServerContext) error {
vlog.VI(0).Infof("%v.Uninstall()", i.suffix)
// TODO(jsimsa): Implement.
@@ -556,9 +687,11 @@
}
i.state.updatingMutex.Unlock()
err := i.updateNodeManager()
+ i.state.updatingMutex.Lock()
i.state.updating = false
+ i.state.updatingMutex.Unlock()
return err
- case updateSuffix.MatchString(i.suffix):
+ case appsSuffix.MatchString(i.suffix):
// TODO(jsimsa): Implement.
return nil
default:
@@ -566,36 +699,6 @@
}
}
-func (i *invoker) Refresh(call ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Refresh()", i.suffix)
- // TODO(jsimsa): Implement.
- return nil
-}
-
-func (i *invoker) Restart(call ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Restart()", i.suffix)
- // TODO(jsimsa): Implement.
- return nil
-}
-
-func (i *invoker) Resume(call ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Resume()", i.suffix)
- // TODO(jsimsa): Implement.
- return nil
-}
-
-func (i *invoker) Shutdown(call ipc.ServerContext, deadline uint64) error {
- vlog.VI(0).Infof("%v.Shutdown(%v)", i.suffix, deadline)
- // TODO(jsimsa): Implement.
- return nil
-}
-
-func (i *invoker) Suspend(call ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Suspend()", i.suffix)
- // TODO(jsimsa): Implement.
- return nil
-}
-
// CALLBACK RECEIVER INTERFACE IMPLEMENTATION
func (i *invoker) Callback(call ipc.ServerContext, name string) error {
@@ -609,89 +712,3 @@
channel <- name
return nil
}
-
-// invoker holds the state of a node manager content repository invocation.
-type crInvoker struct {
- suffix string
-}
-
-// NewCRInvoker is the node manager content repository invoker factory.
-func NewCRInvoker(suffix string) *crInvoker {
- return &crInvoker{
- suffix: suffix,
- }
-}
-
-// CONTENT REPOSITORY INTERFACE IMPLEMENTATION
-
-func (i *crInvoker) Delete(ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Delete()", i.suffix)
- return nil
-}
-
-func (i *crInvoker) Download(_ ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
- vlog.VI(0).Infof("%v.Download()", i.suffix)
- file, err := os.Open(os.Args[0])
- if err != nil {
- vlog.Errorf("Open() failed: %v", err)
- return errOperationFailed
- }
- defer file.Close()
- bufferLength := 4096
- buffer := make([]byte, bufferLength)
- for {
- n, err := file.Read(buffer)
- switch err {
- case io.EOF:
- return nil
- case nil:
- if err := stream.Send(buffer[:n]); err != nil {
- vlog.Errorf("Send() failed: %v", err)
- return errOperationFailed
- }
- default:
- vlog.Errorf("Read() failed: %v", err)
- return errOperationFailed
- }
- }
-}
-
-func (i *crInvoker) Upload(ipc.ServerContext, content.ContentServiceUploadStream) (string, error) {
- vlog.VI(0).Infof("%v.Upload()", i.suffix)
- return "", nil
-}
-
-// arState wraps state shared by different node manager application
-// repository invocations.
-type arState struct {
- // envelope is the node manager application envelope.
- envelope *application.Envelope
- // name is the node manager name.
- name string
-}
-
-// invoker holds the state of a node manager application repository invocation.
-type arInvoker struct {
- state *arState
- suffix string
-}
-
-// NewARInvoker is the node manager content repository invoker factory.
-
-func NewARInvoker(state *arState, suffix string) *arInvoker {
- return &arInvoker{
- state: state,
- suffix: suffix,
- }
-}
-
-// APPLICATION REPOSITORY INTERFACE IMPLEMENTATION
-
-func (i *arInvoker) Match(ipc.ServerContext, []string) (application.Envelope, error) {
- vlog.VI(0).Infof("%v.Match()", i.suffix)
- envelope := application.Envelope{}
- envelope.Args = i.state.envelope.Args
- envelope.Binary = naming.JoinAddressName(i.state.name, "cr")
- envelope.Env = i.state.envelope.Env
- return envelope, nil
-}
diff --git a/services/mgmt/node/noded/main.go b/services/mgmt/node/noded/main.go
index dad9f4e..00b6365 100644
--- a/services/mgmt/node/noded/main.go
+++ b/services/mgmt/node/noded/main.go
@@ -20,8 +20,8 @@
// TODO(rthellend): Remove the address and protocol flags when the config manager is working.
var address, protocol, publishAs string
flag.StringVar(&address, "address", "localhost:0", "network address to listen on")
- flag.StringVar(&publishAs, "name", "", "name to publish the node manager at")
flag.StringVar(&protocol, "protocol", "tcp", "network type to listen on")
+ flag.StringVar(&publishAs, "name", "", "name to publish the node manager at")
flag.Parse()
if os.Getenv(impl.ORIGIN_ENV) == "" {
vlog.Fatalf("Specify the node manager origin as environment variable %s=<name>", impl.ORIGIN_ENV)
@@ -37,20 +37,15 @@
if err != nil {
vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, address, err)
}
- envelope := &application.Envelope{}
- suffix, crSuffix, arSuffix := "", "cr", "ar"
+ suffix, envelope := "", &application.Envelope{}
name := naming.MakeTerminal(naming.JoinAddressName(endpoint.String(), suffix))
vlog.VI(0).Infof("Node manager name: %v", name)
- dispatcher, crDispatcher, arDispatcher := impl.NewDispatchers(vflag.NewAuthorizerOrDie(), envelope, name)
+ // TODO(jsimsa): Replace PREVIOUS_ENV with a command-line flag when
+ // command-line flags are supported in tests.
+ dispatcher := impl.NewDispatcher(vflag.NewAuthorizerOrDie(), envelope, name, os.Getenv(impl.PREVIOUS_ENV))
if err := server.Register(suffix, dispatcher); err != nil {
vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
}
- if err := server.Register(crSuffix, crDispatcher); err != nil {
- vlog.Fatalf("Register(%v, %v) failed: %v", crSuffix, crDispatcher, err)
- }
- if err := server.Register(arSuffix, arDispatcher); err != nil {
- vlog.Fatalf("Register(%v, %v) failed: %v", arSuffix, arDispatcher, err)
- }
if len(publishAs) > 0 {
if err := server.Publish(publishAs); err != nil {
vlog.Fatalf("Publish(%v) failed: %v", publishAs, err)
diff --git a/tools/content/impl/impl.go b/tools/content/impl/impl.go
index ef412d5..f9fd170 100644
--- a/tools/content/impl/impl.go
+++ b/tools/content/impl/impl.go
@@ -24,7 +24,7 @@
if expected, got := 1, len(args); expected != got {
return cmd.Errorf("delete: incorrect number of arguments, expected %d, got %d", expected, got)
}
- c, err := content.BindContent(args[0])
+ c, err := content.BindRepository(args[0])
if err != nil {
return fmt.Errorf("bind error: %v", err)
}
@@ -61,7 +61,7 @@
}
defer f.Close()
- c, err := content.BindContent(args[0])
+ c, err := content.BindRepository(args[0])
if err != nil {
return fmt.Errorf("bind error: %v", err)
}
@@ -119,7 +119,7 @@
}
defer f.Close()
- c, err := content.BindContent(args[0])
+ c, err := content.BindRepository(args[0])
if err != nil {
return fmt.Errorf("bind error: %v", err)
}
diff --git a/tools/content/impl/impl_test.go b/tools/content/impl/impl_test.go
index 3bc79a8..efd7f0d 100644
--- a/tools/content/impl/impl_test.go
+++ b/tools/content/impl/impl_test.go
@@ -32,14 +32,14 @@
return nil
}
-func (s *server) Download(_ ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
+func (s *server) Download(_ ipc.ServerContext, stream content.RepositoryServiceDownloadStream) error {
vlog.VI(2).Infof("Download() was called. suffix=%v", s.suffix)
stream.Send([]byte("Hello"))
stream.Send([]byte("World"))
return nil
}
-func (s *server) Upload(_ ipc.ServerContext, stream content.ContentServiceUploadStream) (string, error) {
+func (s *server) Upload(_ ipc.ServerContext, stream content.RepositoryServiceUploadStream) (string, error) {
vlog.VI(2).Infof("Upload() was called. suffix=%v", s.suffix)
for {
if _, err := stream.Recv(); err != nil {
@@ -57,7 +57,7 @@
}
func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(content.NewServerContent(&server{suffix: suffix}))
+ invoker := ipc.ReflectInvoker(content.NewServerRepository(&server{suffix: suffix}))
return invoker, nil, nil
}