veyron/services/mgmt/node: making the self-update logic of node
manager robust to failures.

This CL introduces changes that guarantee that if the self-update
operation fails, a working copy of the node manager remains on the
device, and when the node manager fails, it is restarted.

To this end, this CL assumes the latest node manager installation is
pointed to by a symbolic link (e.g. /usr/local/bin/nm/stable). When the
node manager is to be updated, the binary of the new version is
downloaded to a new "workspace"
(e.g. /usr/local/bin/nm/2014-05-26T15:00:00Z) and the symbolic link
that points to the latest installation of node manager is updated
accordingly. Further, the host init daemon is (expected to be) used to
make sure node manager is restarted when it crashes. To avoid the need
to modify the init daemon configuration, every node manager
installation contains a shell script that sets up the application
envelope (environment variables + command-line arguments), and the
init daemon configuration file points to this shell script
(e.g. /usr/local/bin/nm/stable/noded.sh).

Change-Id: Ie81e21ac39944655ac6c170b8527b64b7a2e148b
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index 475307c..e32ff53 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -3,22 +3,26 @@
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"os/exec"
+	"path/filepath"
 	"reflect"
 	"regexp"
 	"runtime"
 	"strings"
-	"syscall"
+	"sync"
 	"time"
 
 	vexec "veyron/lib/exec"
 	ibuild "veyron/services/mgmt/build"
-
 	"veyron/services/mgmt/profile"
+
 	"veyron2/ipc"
+	"veyron2/naming"
 	"veyron2/rt"
 	"veyron2/services/mgmt/application"
 	"veyron2/services/mgmt/build"
@@ -29,13 +33,30 @@
 
 var updateSuffix = regexp.MustCompile(`^apps\/.*$`)
 
-// invoker holds the state of a node manager invocation.
-type invoker struct {
+// state wraps state shared between different node manager
+// invocations.
+type state struct {
+	// channels maps callback identifiers to channels that are used to
+	// communicate information from child processes.
+	channels map[string]chan string
+	// channelsMutex is a lock for coordinating concurrent access to
+	// <channels>.
+	channelsMutex *sync.Mutex
 	// envelope is the node manager application envelope.
 	envelope *application.Envelope
-	// origin is a veyron name that resolves to the node manager
-	// envelope.
-	origin string
+	// name is the node manager name.
+	name string
+	// updating is a flag that records whether this instance of node
+	// manager is being updated.
+	updating bool
+	// updatingMutex is a lock for coordinating concurrent access to
+	// <updating>.
+	updatingMutex *sync.Mutex
+}
+
+// invoker holds the state of a node manager invocation.
+type invoker struct {
+	state *state
 	// suffix is the suffix of the current invocation that is assumed to
 	// be used as a relative veyron name to identify an application,
 	// installation, or instance.
@@ -43,16 +64,16 @@
 }
 
 var (
-	errInvalidSuffix   = errors.New("invalid suffix")
-	errOperationFailed = errors.New("operation failed")
+	errInvalidSuffix    = errors.New("invalid suffix")
+	errOperationFailed  = errors.New("operation failed")
+	errUpdateInProgress = errors.New("update in progress")
 )
 
 // NewInvoker is the invoker factory.
-func NewInvoker(envelope *application.Envelope, origin, suffix string) *invoker {
+func NewInvoker(state *state, suffix string) *invoker {
 	return &invoker{
-		envelope: envelope,
-		origin:   origin,
-		suffix:   suffix,
+		state:  state,
+		suffix: suffix,
 	}
 }
 
@@ -276,22 +297,22 @@
 
 // APPLICATION INTERFACE IMPLEMENTATION
 
-func downloadBinary(binary string) (string, error) {
+func downloadBinary(workspace, binary string) error {
 	stub, err := content.BindContent(binary)
 	if err != nil {
 		vlog.Errorf("BindContent(%q) failed: %v", binary, err)
-		return "", errOperationFailed
+		return errOperationFailed
 	}
 	stream, err := stub.Download(rt.R().NewContext())
 	if err != nil {
 		vlog.Errorf("Download() failed: %v", err)
-		return "", errOperationFailed
+		return errOperationFailed
 	}
-	tmpDir, prefix := "", ""
-	file, err := ioutil.TempFile(tmpDir, prefix)
+	path := filepath.Join(workspace, "noded")
+	file, err := os.Create(path)
 	if err != nil {
-		vlog.Errorf("TempFile(%q, %q) failed: %v", tmpDir, prefix, err)
-		return "", errOperationFailed
+		vlog.Errorf("Create(%q) failed: %v", path, err)
+		return errOperationFailed
 	}
 	defer file.Close()
 	for {
@@ -301,27 +322,23 @@
 		}
 		if err != nil {
 			vlog.Errorf("Recv() failed: %v", err)
-			os.Remove(file.Name())
-			return "", errOperationFailed
+			return errOperationFailed
 		}
 		if _, err := file.Write(bytes); err != nil {
 			vlog.Errorf("Write() failed: %v", err)
-			os.Remove(file.Name())
-			return "", errOperationFailed
+			return errOperationFailed
 		}
 	}
 	if err := stream.Finish(); err != nil {
 		vlog.Errorf("Finish() failed: %v", err)
-		os.Remove(file.Name())
-		return "", errOperationFailed
+		return errOperationFailed
 	}
 	mode := os.FileMode(0755)
 	if err := file.Chmod(mode); err != nil {
 		vlog.Errorf("Chmod(%v) failed: %v", mode, err)
-		os.Remove(file.Name())
-		return "", errOperationFailed
+		return errOperationFailed
 	}
-	return file.Name(), nil
+	return nil
 }
 
 func fetchEnvelope(origin string) (*application.Envelope, error) {
@@ -341,36 +358,169 @@
 	return &envelope, nil
 }
 
-func replaceBinary(oldBinary, newBinary string) error {
-	// Replace the old binary with the new one.
-	if err := syscall.Unlink(oldBinary); err != nil {
-		vlog.Errorf("Unlink(%v) failed: %v", oldBinary, err)
-		return errOperationFailed
+func generateBinary(workspace string, envelope *application.Envelope, newBinary bool) error {
+	if newBinary {
+		// Download the new binary.
+		return downloadBinary(workspace, envelope.Binary)
 	}
-	if err := os.Rename(newBinary, oldBinary); err != nil {
-		vlog.Errorf("Rename(%v, %v) failed: %v", newBinary, oldBinary, err)
+	// Link the current binary.
+	path := filepath.Join(workspace, "noded")
+	if err := os.Link(os.Args[0], path); err != nil {
+		vlog.Errorf("Link(%v, %v) failed: %v", os.Args[0], path, err)
 		return errOperationFailed
 	}
 	return nil
 }
 
-func spawnNodeManager(envelope *application.Envelope) error {
-	cmd := exec.Command(os.Args[0], envelope.Args...)
+func generateScript(workspace string, envelope *application.Envelope) error {
+	output := "#!/bin/bash\n"
+	output += strings.Join(envelope.Env, " ") + " " + os.Args[0] + " " + strings.Join(envelope.Args, " ")
+	path := filepath.Join(workspace, "noded.sh")
+	if err := ioutil.WriteFile(path, []byte(output), 0755); err != nil {
+		vlog.Errorf("WriteFile(%v) failed: %v", path, err)
+		return errOperationFailed
+	}
+	return nil
+}
+
+func updateLink(workspace string) error {
+	link := filepath.Join(os.Getenv(ROOT_ENV), "stable")
+	newLink := link + ".new"
+	fi, err := os.Lstat(newLink)
+	if err == nil {
+		if err := os.Remove(fi.Name()); err != nil {
+			vlog.Errorf("Remove(%v) failed: %v", fi.Name(), err)
+			return errOperationFailed
+		}
+	}
+	if err := os.Symlink(workspace, newLink); err != nil {
+		vlog.Errorf("Symlink(%v, %v) failed: %v", workspace, newLink, err)
+		return errOperationFailed
+	}
+	if err := os.Rename(newLink, link); err != nil {
+		vlog.Errorf("Rename(%v, %v) failed: %v", newLink, link, err)
+		return errOperationFailed
+	}
+	return nil
+}
+
+func (i *invoker) registerCallback(id string, channel chan string) {
+	i.state.channelsMutex.Lock()
+	defer i.state.channelsMutex.Unlock()
+	i.state.channels[id] = channel
+}
+
+func (i *invoker) testNodeManager(workspace string, envelope *application.Envelope) error {
+	path := filepath.Join(workspace, "noded")
+	cmd := exec.Command(path, envelope.Args...)
 	cmd.Env = envelope.Env
+	cmd.Env = vexec.SetEnv(cmd.Env, ORIGIN_ENV, naming.JoinAddressName(i.state.name, "ar"))
+	cmd.Env = vexec.SetEnv(cmd.Env, TEST_UPDATE_ENV, "1")
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
-
-	handle := vexec.NewParentHandle(cmd)
+	// Set up the child process callback.
+	id := fmt.Sprintf("%d", rand.Int())
+	handle := vexec.NewParentHandle(cmd, vexec.CallbackNameOpt(naming.JoinAddressName(i.state.name, id)))
+	callbackChan := make(chan string)
+	i.registerCallback(id, callbackChan)
+	defer i.unregisterCallback(id)
+	// Start the child process.
 	if err := handle.Start(); err != nil {
 		vlog.Errorf("Start() failed: %v", err)
 		return errOperationFailed
 	}
-	if err := handle.WaitForReady(10 * time.Second); err != nil {
-		vlog.Errorf("WaitForReady() failed: %v", err)
-		cmd.Process.Kill()
+	// Wait for the child process to start.
+	testTimeout := 2 * time.Second
+	if err := handle.WaitForReady(testTimeout); err != nil {
+		vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
+		if err := cmd.Process.Kill(); err != nil {
+			vlog.Errorf("Kill() failed: %v", err)
+		}
 		return errOperationFailed
 	}
-	rt.R().Stop()
+	// Wait for the child process to invoke the Callback().
+	select {
+	case address := <-callbackChan:
+		// Check that invoking Update() succeeds.
+		address = naming.JoinAddressName(address, "nm")
+		nmClient, err := node.BindNode(address)
+		if err != nil {
+			vlog.Errorf("BindNode(%v) failed: %v", address, err)
+			if err := cmd.Process.Kill(); err != nil {
+				vlog.Errorf("Kill() failed: %v", err)
+			}
+			return errOperationFailed
+		}
+		if err := nmClient.Update(rt.R().NewContext()); err != nil {
+			if err := cmd.Process.Kill(); err != nil {
+				vlog.Errorf("Kill() failed: %v", err)
+			}
+			return errOperationFailed
+		}
+	case <-time.After(testTimeout):
+		vlog.Errorf("Waiting for callback timed out")
+		if err := cmd.Process.Kill(); err != nil {
+			vlog.Errorf("Kill() failed: %v", err)
+		}
+		return errOperationFailed
+	}
+	return nil
+}
+
+func (i *invoker) unregisterCallback(id string) {
+	i.state.channelsMutex.Lock()
+	defer i.state.channelsMutex.Unlock()
+	delete(i.state.channels, id)
+}
+
+func (i *invoker) updateNodeManager() error {
+	envelope, err := fetchEnvelope(os.Getenv(ORIGIN_ENV))
+	if err != nil {
+		return err
+	}
+	if !reflect.DeepEqual(envelope, i.state.envelope) {
+		// Create new workspace.
+		workspace := filepath.Join(os.Getenv(ROOT_ENV), fmt.Sprintf("%v", time.Now().Format(time.RFC3339Nano)))
+		perm := os.FileMode(0755)
+		if err := os.MkdirAll(workspace, perm); err != nil {
+			vlog.Errorf("MkdirAll(%v, %v) failed: %v", workspace, perm, err)
+			return errOperationFailed
+		}
+		// Populate the new workspace with a node manager binary.
+		if err := generateBinary(workspace, envelope, envelope.Binary != i.state.envelope.Binary); err != nil {
+			if err := os.RemoveAll(workspace); err != nil {
+				vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+			}
+			return err
+		}
+		// Populate the new workspace with a node manager script.
+		if err := generateScript(workspace, envelope); err != nil {
+			if err := os.RemoveAll(workspace); err != nil {
+				vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+			}
+			return err
+		}
+		if os.Getenv(TEST_UPDATE_ENV) == "" {
+			if err := i.testNodeManager(workspace, envelope); err != nil {
+				if err := os.RemoveAll(workspace); err != nil {
+					vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+				}
+				return err
+			}
+			// The test passed; switch the node manager symlink to the new workspace.
+			if err := updateLink(workspace); err != nil {
+				if err := os.RemoveAll(workspace); err != nil {
+					vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+				}
+				return err
+			}
+		} else {
+			if err := os.RemoveAll(workspace); err != nil {
+				vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
+			}
+		}
+		rt.R().Stop()
+	}
 	return nil
 }
 
@@ -396,30 +546,18 @@
 	vlog.VI(0).Infof("%v.Update()", i.suffix)
 	switch {
 	case i.suffix == "nm":
-		// This branch attempts to update the node manager updates itself.
-		envelope, err := fetchEnvelope(i.origin)
-		if err != nil {
-			return err
+		// This branch attempts to update the node manager itself.
+		i.state.updatingMutex.Lock()
+		if i.state.updating {
+			i.state.updatingMutex.Unlock()
+			return errUpdateInProgress
+		} else {
+			i.state.updating = true
 		}
-		if envelope.Binary != i.envelope.Binary {
-			file, err := downloadBinary(envelope.Binary)
-			if err != nil {
-				return err
-			}
-			if err := replaceBinary(os.Args[0], file); err != nil {
-				os.Remove(file)
-				return err
-			}
-		}
-		if !reflect.DeepEqual(envelope, i.envelope) {
-			i.envelope = envelope
-			if err := spawnNodeManager(i.envelope); err != nil {
-				return err
-			}
-			// TODO(jsimsa): When Bogdan implements the shutdown API, use it
-			// to stop itself (or have the caller do that).
-		}
-		return nil
+		i.state.updatingMutex.Unlock()
+		err := i.updateNodeManager()
+		i.state.updating = false
+		return err
 	case updateSuffix.MatchString(i.suffix):
 		// TODO(jsimsa): Implement.
 		return nil
@@ -457,3 +595,103 @@
 	// TODO(jsimsa): Implement.
 	return nil
 }
+
+// CALLBACK RECEIVER INTERFACE IMPLEMENTATION
+
+func (i *invoker) Callback(call ipc.ServerContext, name string) error {
+	vlog.VI(0).Infof("%v.Callback()", i.suffix)
+	i.state.channelsMutex.Lock()
+	channel, ok := i.state.channels[i.suffix]
+	i.state.channelsMutex.Unlock()
+	if !ok {
+		return errInvalidSuffix
+	}
+	channel <- name
+	return nil
+}
+
+// crInvoker holds the state of a node manager content repository invocation.
+type crInvoker struct {
+	suffix string
+}
+
+// NewCRInvoker is the node manager content repository invoker factory.
+func NewCRInvoker(suffix string) *crInvoker {
+	return &crInvoker{
+		suffix: suffix,
+	}
+}
+
+// CONTENT REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *crInvoker) Delete(ipc.ServerContext) error {
+	vlog.VI(0).Infof("%v.Delete()", i.suffix)
+	return nil
+}
+
+func (i *crInvoker) Download(_ ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
+	vlog.VI(0).Infof("%v.Download()", i.suffix)
+	file, err := os.Open(os.Args[0])
+	if err != nil {
+		vlog.Errorf("Open() failed: %v", err)
+		return errOperationFailed
+	}
+	defer file.Close()
+	bufferLength := 4096
+	buffer := make([]byte, bufferLength)
+	for {
+		n, err := file.Read(buffer)
+		switch err {
+		case io.EOF:
+			return nil
+		case nil:
+			if err := stream.Send(buffer[:n]); err != nil {
+				vlog.Errorf("Send() failed: %v", err)
+				return errOperationFailed
+			}
+		default:
+			vlog.Errorf("Read() failed: %v", err)
+			return errOperationFailed
+		}
+	}
+}
+
+func (i *crInvoker) Upload(ipc.ServerContext, content.ContentServiceUploadStream) (string, error) {
+	vlog.VI(0).Infof("%v.Upload()", i.suffix)
+	return "", nil
+}
+
+// arState wraps state shared by different node manager application
+// repository invocations.
+type arState struct {
+	// envelope is the node manager application envelope.
+	envelope *application.Envelope
+	// name is the node manager name.
+	name string
+}
+
+// arInvoker holds the state of a node manager application repository invocation.
+type arInvoker struct {
+	state  *arState
+	suffix string
+}
+
+// NewARInvoker is the node manager application repository invoker factory.
+
+func NewARInvoker(state *arState, suffix string) *arInvoker {
+	return &arInvoker{
+		state:  state,
+		suffix: suffix,
+	}
+}
+
+// APPLICATION REPOSITORY INTERFACE IMPLEMENTATION
+
+func (i *arInvoker) Match(ipc.ServerContext, []string) (application.Envelope, error) {
+	vlog.VI(0).Infof("%v.Match()", i.suffix)
+	envelope := application.Envelope{}
+	envelope.Args = i.state.envelope.Args
+	envelope.Binary = naming.JoinAddressName(i.state.name, "cr")
+	envelope.Env = i.state.envelope.Env
+	return envelope, nil
+}