blob: d9cd129b14e7c1e520014bbfea21c65ae2468c02 [file] [log] [blame]
package impl
// The implementation of the node manager expects that the node manager
// installations are all organized in the following directory structure:
//
// <config.Root>/
// node-manager/
// <version 1 timestamp>/ - timestamp of when the version was downloaded
// noded - the node manager binary
// noded.sh - a shell script to start the binary
// <version 2 timestamp>
// ...
//
// The node manager is always expected to be started through the symbolic link
// passed in as config.CurrentLink, which is monitored by an init daemon. This
// provides for simple and robust updates.
//
// To update the node manager to a newer version, a new workspace is created and
// the symlink is updated to the new noded.sh script. Similarly, to revert the
// node manager to a previous version, all that is required is to update the
// symlink to point to the previous noded.sh script.
//
//
// The node manager manages the applications it installs and runs using the
// following directory structure:
//
// TODO(caprita): Not all is yet implemented.
//
// <config.Root>/
// app-<hash 1>/ - the application dir is named using a hash of the application title
// installation-<id 1>/ - installations are labelled with ids
// <version 1 timestamp>/ - timestamp of when the version was downloaded
// bin - application binary
// previous - symbolic link to previous version directory (TODO)
// origin - object name for application envelope
// envelope - application envelope (JSON-encoded)
// <version 2 timestamp>
// ...
// current - symbolic link to the current version
// instances/
// instance-<id a>/ - instances are labelled with ids
// root/ - workspace that the instance is run from
// logs/ - stderr/stdout and log files generated by instance
// info - app manager name and process id for the instance (if running)
// version - symbolic link to installation version for the instance
// instance-<id b>
// ...
// stopped-instance-<id c> - stopped instances have their directory name prepended by 'stopped-'
// ...
// installation-<id 2>
// ...
// app-<hash 2>
// ...
//
// When node manager starts up, it goes through all instances and resumes the
// ones that are not suspended. If the application was still running, it
// suspends it first. If an application fails to resume, it stays suspended.
//
// When node manager shuts down, it suspends all running instances.
//
// Start starts an instance. Suspend kills the process but leaves the workspace
// untouched. Resume restarts the process. Stop kills the process and prevents
// future resumes (it also eventually gc's the workspace).
//
// If the process dies on its own, it stays dead and is assumed suspended.
// TODO(caprita): Later, we'll add auto-restart option.
//
// Concurrency model: installations can be created independently of one another;
// installations can be removed at any time (any running instances will be
// stopped). The first call to Uninstall will rename the installation dir as a
// first step; subsequent Uninstalls will fail. Instances can be created
// independently of one another, as long as the installation exists (if it gets
// Uninstalled during an instance Start, the Start may fail). When an instance
// is stopped, the first call to Stop renames the instance dir; subsequent Stop
// calls will fail. Resume will attempt to create an info file; if one exists
// already, Resume fails. Suspend will attempt to rename the info file; if none
// present, Suspend will fail.
//
// TODO(caprita): There is room for synergy between how node manager organizes
// its own workspace and that for the applications it runs. In particular,
// previous, origin, and envelope could be part of a single config. We'll
// refine that later.
import (
"crypto/md5"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"hash/crc64"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"
"veyron/lib/config"
binlib "veyron/services/mgmt/lib/binary"
vexec "veyron/services/mgmt/lib/exec"
iconfig "veyron/services/mgmt/node/config"
"veyron/services/mgmt/profile"
"veyron2/context"
"veyron2/ipc"
"veyron2/mgmt"
"veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/appcycle"
"veyron2/services/mgmt/application"
binapi "veyron2/services/mgmt/binary"
"veyron2/services/mgmt/node"
"veyron2/services/mgmt/repository"
"veyron2/verror"
"veyron2/vlog"
)
// instanceInfo holds state about a running instance.  It is persisted as
// JSON in the instance directory's "info" file (see saveInstanceInfo /
// loadInstanceInfo), so field names are part of the on-disk format.
type instanceInfo struct {
	// AppCycleMgrName is the object name of the instance's app cycle
	// manager, used by Stop to ask the instance to shut down.
	AppCycleMgrName string
	// Pid is the process id of the instance (if running).
	Pid int
}
// internalState wraps state shared between different node manager
// invocations (it outlives any single RPC).
type internalState struct {
	// channels maps callback identifiers and config keys to channels that
	// are used to communicate corresponding config values from child
	// processes.
	channels map[string]map[string]chan string
	// nextCallbackID provides the next callback identifier to use as key
	// for the channels map.
	nextCallbackID int64
	// channelsMutex is a lock for coordinating concurrent access to
	// <channels> and <nextCallbackID>.
	channelsMutex sync.Mutex
	// updating is a flag that records whether this instance of node
	// manager is being updated (or reverted).
	updating bool
	// updatingMutex is a lock for coordinating concurrent access to
	// <updating>.
	updatingMutex sync.Mutex
}
// invoker holds the state of a single node manager invocation.  A fresh
// invoker is created per request; only <internal> is shared across
// invocations.
type invoker struct {
	// internal holds the node manager's internal state that persists across
	// RPC requests.
	internal *internalState
	// config holds the node manager's (immutable) configuration state.
	config *iconfig.State
	// suffix is the suffix of the current invocation that is assumed to
	// be used as a relative object name to identify an application,
	// installation, or instance.
	suffix string
}
var (
	// appsSuffix matches object name suffixes that address applications
	// (anything under "apps/").
	appsSuffix = regexp.MustCompile(`^apps\/.*$`)

	// Errors returned by the node manager implementation.
	errInvalidSuffix      = verror.BadArgf("invalid suffix")
	errOperationFailed    = verror.Internalf("operation failed")
	errInProgress         = verror.Existsf("operation in progress")
	errIncompatibleUpdate = verror.BadArgf("update failed: mismatching app title")
	errUpdateNoOp         = verror.NotFoundf("no different version available")
	errNotExist           = verror.NotFoundf("object does not exist")
	errInvalidOperation   = verror.BadArgf("invalid operation")
)
// NODE INTERFACE IMPLEMENTATION
// Describe returns the node description: the intersection of the node's
// own computed profile with the set of known profiles.
func (i *invoker) Describe(call ipc.ServerContext) (node.Description, error) {
	vlog.VI(1).Infof("%v.Describe()", i.suffix)
	nodeProfile, err := computeNodeProfile()
	if err != nil {
		return node.Description{}, err
	}
	knownProfiles, err := getKnownProfiles()
	if err != nil {
		return node.Description{}, err
	}
	return matchProfiles(nodeProfile, knownProfiles), nil
}
// IsRunnable reports whether a binary with the given description can run
// on this node, i.e. whether at least one of the binary's profiles
// matches the node's own profile.
func (i *invoker) IsRunnable(call ipc.ServerContext, description binapi.Description) (bool, error) {
	vlog.VI(1).Infof("%v.IsRunnable(%v)", i.suffix, description)
	nodeProfile, err := computeNodeProfile()
	if err != nil {
		return false, err
	}
	binaryProfiles := make([]profile.Specification, 0, len(description.Profiles))
	// Only the profile names are needed.  The local is named p (not
	// "profile") to avoid shadowing the imported profile package.
	for name := range description.Profiles {
		p, err := getProfile(name)
		if err != nil {
			return false, err
		}
		binaryProfiles = append(binaryProfiles, *p)
	}
	result := matchProfiles(nodeProfile, binaryProfiles)
	return len(result.Profiles) > 0, nil
}
// Reset resets the node.  Currently a no-op stub.
func (i *invoker) Reset(call ipc.ServerContext, deadline uint64) error {
	vlog.VI(1).Infof("%v.Reset(%v)", i.suffix, deadline)
	// TODO(jsimsa): Implement.
	return nil
}
// APPLICATION INTERFACE IMPLEMENTATION
// downloadBinary fetches the binary with object name <name> from the
// binary repository and writes it to <workspace>/<fileName>, marked
// executable for the owner.
func downloadBinary(workspace, fileName, name string) error {
	data, err := binlib.Download(name)
	if err != nil {
		vlog.Errorf("Download(%v) failed: %v", name, err)
		return errOperationFailed
	}
	// Permission bits must be octal: the previous decimal literal 755
	// produced mode 01363 rather than the intended rwxr-xr-x (0755).
	path, perm := filepath.Join(workspace, fileName), os.FileMode(0755)
	if err := ioutil.WriteFile(path, data, perm); err != nil {
		vlog.Errorf("WriteFile(%v, %v) failed: %v", path, perm, err)
		return errOperationFailed
	}
	return nil
}
// fetchEnvelope retrieves from <origin> the application envelope matching
// this node's supported profiles.
func fetchEnvelope(ctx context.T, origin string) (*application.Envelope, error) {
	appStub, err := repository.BindApplication(origin)
	if err != nil {
		vlog.Errorf("BindRepository(%v) failed: %v", origin, err)
		return nil, errOperationFailed
	}
	// TODO(jsimsa): Include logic that computes the set of supported
	// profiles.
	supported := []string{"test"}
	envelope, merr := appStub.Match(ctx, supported)
	if merr != nil {
		vlog.Errorf("Match(%v) failed: %v", supported, merr)
		return nil, errOperationFailed
	}
	return &envelope, nil
}
// generateBinary places a node manager binary named <fileName> in
// <workspace>: either downloaded fresh from the envelope's binary object
// name (newBinary == true), or hard-linked from the currently running
// binary (newBinary == false).
func generateBinary(workspace, fileName string, envelope *application.Envelope, newBinary bool) error {
	if !newBinary {
		// Reuse the running binary via a hard link.
		target := filepath.Join(workspace, fileName)
		if err := os.Link(os.Args[0], target); err != nil {
			vlog.Errorf("Link(%v, %v) failed: %v", os.Args[0], target, err)
			return errOperationFailed
		}
		return nil
	}
	// Download the new binary.
	return downloadBinary(workspace, fileName, envelope.Binary)
}
// TODO(jsimsa): Replace <PreviousEnv> with a command-line flag when
// command-line flags in tests are supported.
// generateScript writes a "noded.sh" wrapper script into <workspace>.
// The script sets the envelope's environment (plus the given config
// settings) and runs the noded binary from the same workspace with the
// envelope's arguments.
func generateScript(workspace string, configSettings []string, envelope *application.Envelope) error {
	// NOTE(review): the resolved path is discarded; the call appears to
	// serve only as a sanity check that the current executable path
	// resolves.  Preserved as-is.
	if _, err := filepath.EvalSymlinks(os.Args[0]); err != nil {
		vlog.Errorf("EvalSymlinks(%v) failed: %v", os.Args[0], err)
		return errOperationFailed
	}
	var script strings.Builder
	script.WriteString("#!/bin/bash\n")
	script.WriteString(strings.Join(iconfig.QuoteEnv(append(envelope.Env, configSettings...)), " ") + " ")
	script.WriteString(filepath.Join(workspace, "noded") + " ")
	script.WriteString(strings.Join(envelope.Args, " "))
	scriptPath := filepath.Join(workspace, "noded.sh")
	if err := ioutil.WriteFile(scriptPath, []byte(script.String()), 0700); err != nil {
		vlog.Errorf("WriteFile(%v) failed: %v", scriptPath, err)
		return errOperationFailed
	}
	return nil
}
// getCurrentFileInfo returns the os.FileInfo of the CurrentLink symbolic
// link itself (via Lstat), along with the path of the node script the
// link ultimately resolves to.
func (i *invoker) getCurrentFileInfo() (os.FileInfo, string, error) {
	linkPath := i.config.CurrentLink
	info, err := os.Lstat(linkPath)
	if err != nil {
		vlog.Errorf("Lstat(%v) failed: %v", linkPath, err)
		return nil, "", err
	}
	target, err := filepath.EvalSymlinks(linkPath)
	if err != nil {
		vlog.Errorf("EvalSymlinks(%v) failed: %v", linkPath, err)
		return nil, "", err
	}
	return info, target, nil
}
// updateLink atomically points the CurrentLink symlink at newScript by
// first creating an intermediate <link>.new symlink and then renaming it
// over the current one.
func (i *invoker) updateLink(newScript string) error {
	link := i.config.CurrentLink
	newLink := link + ".new"
	// Remove any stale intermediate link left over from a previously
	// failed update.  BUG FIX: the original removed fi.Name(), which is
	// just the base name and resolves relative to the working directory;
	// the full newLink path must be removed instead.
	if _, err := os.Lstat(newLink); err == nil {
		if err := os.Remove(newLink); err != nil {
			vlog.Errorf("Remove(%v) failed: %v", newLink, err)
			return errOperationFailed
		}
	}
	if err := os.Symlink(newScript, newLink); err != nil {
		vlog.Errorf("Symlink(%v, %v) failed: %v", newScript, newLink, err)
		return errOperationFailed
	}
	// Rename is atomic, so the init daemon never observes a missing link.
	if err := os.Rename(newLink, link); err != nil {
		vlog.Errorf("Rename(%v, %v) failed: %v", newLink, link, err)
		return errOperationFailed
	}
	return nil
}
// generateCallbackID returns a fresh callback identifier (a decimal
// counter value), used to key the channels map.
func (i *invoker) generateCallbackID() string {
	i.internal.channelsMutex.Lock()
	defer i.internal.channelsMutex.Unlock()
	id := i.internal.nextCallbackID
	i.internal.nextCallbackID++
	return strconv.FormatInt(id, 10)
}
// registerCallback records <channel> under callback id <id> and config
// key <key>, so that a child process's Set(key, value) can be routed to
// the waiting goroutine.
func (i *invoker) registerCallback(id, key string, channel chan string) {
	i.internal.channelsMutex.Lock()
	defer i.internal.channelsMutex.Unlock()
	keyMap, ok := i.internal.channels[id]
	if !ok {
		keyMap = make(map[string]chan string)
		i.internal.channels[id] = keyMap
	}
	keyMap[key] = channel
}
// revertNodeManager flips the current symlink back to the previous node
// manager script, then stops the runtime so the init daemon restarts the
// node manager from the reverted link.
func (i *invoker) revertNodeManager() error {
	err := i.updateLink(i.config.Previous)
	if err == nil {
		rt.R().Stop()
	}
	return err
}
// testNodeManager runs the staged node manager script in <workspace> as a
// child process and verifies that the new version works: the child must
// report readiness, call back with its object name, and successfully
// serve a Revert() request that updates the current symlink.  On success
// the current link ends up pointing at the same script it did on entry.
func (i *invoker) testNodeManager(ctx context.T, workspace string, envelope *application.Envelope) error {
	path := filepath.Join(workspace, "noded.sh")
	cmd := exec.Command(path)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	// Set up the child process callback: the child learns its parent's
	// object name (with the callback id joined on) via the config.
	id := i.generateCallbackID()
	cfg := config.New()
	cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, id)))
	handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg})
	// Make the channel buffered to avoid blocking the Set method when
	// nothing is receiving on the channel. This happens e.g. when
	// unregisterCallbacks executes before Set is called.
	callbackChan := make(chan string, 1)
	i.registerCallback(id, mgmt.ChildNodeManagerConfigKey, callbackChan)
	defer i.unregisterCallbacks(id)
	// Start the child process.
	if err := handle.Start(); err != nil {
		vlog.Errorf("Start() failed: %v", err)
		return errOperationFailed
	}
	// Always reap the child when the test is done, pass or fail.
	defer func() {
		if err := handle.Clean(); err != nil {
			vlog.Errorf("Clean() failed: %v", err)
		}
	}()
	// Wait for the child process to start.
	testTimeout := 10 * time.Second
	if err := handle.WaitForReady(testTimeout); err != nil {
		vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
		return errOperationFailed
	}
	// Wait for the child process to invoke the Callback().
	select {
	case childName := <-callbackChan:
		// Check that invoking Update() succeeds.
		childName = naming.MakeTerminal(naming.Join(childName, "nm"))
		nmClient, err := node.BindNode(childName)
		if err != nil {
			vlog.Errorf("BindNode(%v) failed: %v", childName, err)
			return errOperationFailed
		}
		linkOld, pathOld, err := i.getCurrentFileInfo()
		if err != nil {
			return errOperationFailed
		}
		// Since the resolution of mtime for files is seconds,
		// the test sleeps for a second to make sure it can
		// check whether the current symlink is updated.
		time.Sleep(time.Second)
		if err := nmClient.Revert(ctx); err != nil {
			return errOperationFailed
		}
		linkNew, pathNew, err := i.getCurrentFileInfo()
		if err != nil {
			return errOperationFailed
		}
		// Check that the new node manager updated the current symbolic
		// link.
		if !linkOld.ModTime().Before(linkNew.ModTime()) {
			vlog.Errorf("new node manager test failed")
			return errOperationFailed
		}
		// Ensure that the current symbolic link points to the same
		// script.
		if pathNew != pathOld {
			// Best-effort restore; the test fails either way.
			i.updateLink(pathOld)
			vlog.Errorf("new node manager test failed")
			return errOperationFailed
		}
	case <-time.After(testTimeout):
		vlog.Errorf("Waiting for callback timed out")
		return errOperationFailed
	}
	return nil
}
// unregisterCallbacks drops every channel registered under callback id
// <id>.
func (i *invoker) unregisterCallbacks(id string) {
	i.internal.channelsMutex.Lock()
	delete(i.internal.channels, id)
	i.internal.channelsMutex.Unlock()
}
// updateNodeManager fetches the latest node manager envelope from the
// configured origin and, if it differs from the running one, stages the
// new version in a fresh workspace, tests it as a child process, flips
// the current symlink to it, and stops the runtime so the init daemon
// restarts the node manager from the new link.
func (i *invoker) updateNodeManager(ctx context.T) error {
	if len(i.config.Origin) == 0 {
		// No origin configured; nothing to update from.
		return errUpdateNoOp
	}
	envelope, err := fetchEnvelope(ctx, i.config.Origin)
	if err != nil {
		return err
	}
	if envelope.Title != application.NodeManagerTitle {
		// The origin must serve a node manager envelope.
		return errIncompatibleUpdate
	}
	if i.config.Envelope != nil && reflect.DeepEqual(envelope, i.config.Envelope) {
		// Already running this exact version.
		return errUpdateNoOp
	}
	// Create new workspace.
	workspace := filepath.Join(i.config.Root, "node-manager", generateVersionDirName())
	perm := os.FileMode(0700)
	if err := os.MkdirAll(workspace, perm); err != nil {
		vlog.Errorf("MkdirAll(%v, %v) failed: %v", workspace, perm, err)
		return errOperationFailed
	}
	// The deferrer removes the workspace on any failure below; it is
	// disarmed (set to nil) once the update has been committed.
	deferrer := func() {
		if err := os.RemoveAll(workspace); err != nil {
			vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err)
		}
	}
	defer func() {
		if deferrer != nil {
			deferrer()
		}
	}()
	// Populate the new workspace with a node manager binary.
	// TODO(caprita): match identical binaries on binary metadata
	// rather than binary object name.
	sameBinary := i.config.Envelope != nil && envelope.Binary == i.config.Envelope.Binary
	if err := generateBinary(workspace, "noded", envelope, !sameBinary); err != nil {
		return err
	}
	// Populate the new workspace with a node manager script.
	configSettings, err := i.config.Save(envelope)
	if err != nil {
		return errOperationFailed
	}
	if err := generateScript(workspace, configSettings, envelope); err != nil {
		return err
	}
	// Verify the staged version actually works before committing to it.
	if err := i.testNodeManager(ctx, workspace, envelope); err != nil {
		return err
	}
	// If the binary has changed, update the node manager symlink.
	if err := i.updateLink(filepath.Join(workspace, "noded.sh")); err != nil {
		return err
	}
	// Stop the runtime; the init daemon restarts from the updated link.
	rt.R().Stop()
	deferrer = nil
	return nil
}
// saveEnvelope writes the JSON-encoded envelope to <dir>/envelope.
func saveEnvelope(dir string, envelope *application.Envelope) error {
	data, err := json.Marshal(envelope)
	if err != nil {
		vlog.Errorf("Marshal(%v) failed: %v", envelope, err)
		return errOperationFailed
	}
	target := filepath.Join(dir, "envelope")
	if err := ioutil.WriteFile(target, data, 0600); err != nil {
		vlog.Errorf("WriteFile(%v) failed: %v", target, err)
		return errOperationFailed
	}
	return nil
}
// loadEnvelope reads and decodes the JSON envelope from <dir>/envelope.
func loadEnvelope(dir string) (*application.Envelope, error) {
	source := filepath.Join(dir, "envelope")
	data, err := ioutil.ReadFile(source)
	if err != nil {
		vlog.Errorf("ReadFile(%v) failed: %v", source, err)
		return nil, errOperationFailed
	}
	envelope := new(application.Envelope)
	if err := json.Unmarshal(data, envelope); err != nil {
		vlog.Errorf("Unmarshal(%v) failed: %v", data, err)
		return nil, errOperationFailed
	}
	return envelope, nil
}
// saveOrigin records the application envelope's object name in
// <dir>/origin.
func saveOrigin(dir, originVON string) error {
	originPath := filepath.Join(dir, "origin")
	err := ioutil.WriteFile(originPath, []byte(originVON), 0600)
	if err != nil {
		vlog.Errorf("WriteFile(%v) failed: %v", originPath, err)
		return errOperationFailed
	}
	return nil
}
// generateID returns a new unique id string. The uniqueness is based on the
// current timestamp. Not cryptographically secure.
func generateID() string {
timestamp := fmt.Sprintf("%v", time.Now().Format(time.RFC3339Nano))
h := crc64.New(crc64.MakeTable(crc64.ISO))
h.Write([]byte(timestamp))
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(h.Sum64()))
return strings.TrimRight(base64.URLEncoding.EncodeToString(b), "=")
}
// TODO(caprita): Nothing prevents different applications from sharing the same
// title, and thereby being installed in the same app dir. Do we want to
// prevent that for the same user or across users?

// applicationDirName derives a directory name for installations of the
// application with the given title: "app-" followed by the unpadded
// base64url encoding of the MD5 hash of the title.
func applicationDirName(title string) string {
	hasher := md5.New()
	hasher.Write([]byte(title))
	digest := base64.URLEncoding.EncodeToString(hasher.Sum(nil))
	return "app-" + strings.TrimRight(digest, "=")
}
// installationDirName returns the directory name used for the
// installation with the given id.
func installationDirName(installationID string) string {
	const prefix = "installation-"
	return prefix + installationID
}
// instanceDirName returns the directory name used for the instance with
// the given id.
func instanceDirName(instanceID string) string {
	const prefix = "instance-"
	return prefix + instanceID
}
// stoppedInstanceDirName returns the directory name used for a stopped
// instance with the given id.
func stoppedInstanceDirName(instanceID string) string {
	const prefix = "stopped-instance-"
	return prefix + instanceID
}
func generateVersionDirName() string {
return time.Now().Format(time.RFC3339Nano)
}
// Install fetches the application envelope named by applicationVON,
// downloads its binary, and records the new installation under
// <config.Root>/app-<hash>/installation-<id>/<version>/.  It returns
// "<app title>/<installation id>", the suffix under which the
// installation can subsequently be addressed.
func (i *invoker) Install(call ipc.ServerContext, applicationVON string) (string, error) {
	vlog.VI(1).Infof("%v.Install(%q)", i.suffix, applicationVON)
	// Install is only valid on the "apps" suffix itself.
	if i.suffix != "apps" {
		return "", errInvalidSuffix
	}
	ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
	defer cancel()
	envelope, err := fetchEnvelope(ctx, applicationVON)
	if err != nil {
		return "", err
	}
	if envelope.Title == application.NodeManagerTitle {
		// Disallow node manager apps from being installed like a
		// regular app.
		return "", errInvalidOperation
	}
	installationID := generateID()
	installationDir := filepath.Join(i.config.Root, applicationDirName(envelope.Title), installationDirName(installationID))
	versionDir := filepath.Join(installationDir, generateVersionDirName())
	perm := os.FileMode(0700)
	if err := os.MkdirAll(versionDir, perm); err != nil {
		vlog.Errorf("MkdirAll(%v, %v) failed: %v", versionDir, perm, err)
		return "", errOperationFailed
	}
	// The deferrer removes the partially populated version dir on any
	// failure below; it is disarmed once the installation is complete.
	deferrer := func() {
		if err := os.RemoveAll(versionDir); err != nil {
			vlog.Errorf("RemoveAll(%v) failed: %v", versionDir, err)
		}
	}
	defer func() {
		if deferrer != nil {
			deferrer()
		}
	}()
	// TODO(caprita): Share binaries if already existing locally.
	if err := generateBinary(versionDir, "bin", envelope, true); err != nil {
		return "", err
	}
	if err := saveEnvelope(versionDir, envelope); err != nil {
		return "", err
	}
	if err := saveOrigin(versionDir, applicationVON); err != nil {
		return "", err
	}
	// Point the installation's "current" symlink at the new version.
	link := filepath.Join(installationDir, "current")
	if err := os.Symlink(versionDir, link); err != nil {
		vlog.Errorf("Symlink(%v, %v) failed: %v", versionDir, link, err)
		return "", errOperationFailed
	}
	deferrer = nil
	return naming.Join(envelope.Title, installationID), nil
}
// Refresh refreshes the node's state.  Currently a no-op stub.
func (i *invoker) Refresh(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Refresh()", i.suffix)
	// TODO(jsimsa): Implement.
	return nil
}
// Restart restarts execution of an application.  Currently a no-op stub.
func (i *invoker) Restart(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Restart()", i.suffix)
	// TODO(jsimsa): Implement.
	return nil
}
// Resume resumes execution of a suspended application instance.
// Currently a no-op stub.
func (i *invoker) Resume(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Resume()", i.suffix)
	// TODO(jsimsa): Implement.
	return nil
}
// Revert reverts the node manager to the previous version recorded in
// its configuration.  Only one update/revert may be in flight at a time.
func (i *invoker) Revert(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Revert()", i.suffix)
	if i.config.Previous == "" {
		return errUpdateNoOp
	}
	// Claim the updating flag; bail out if another update or revert is
	// already running.
	i.internal.updatingMutex.Lock()
	if i.internal.updating {
		i.internal.updatingMutex.Unlock()
		return errInProgress
	}
	i.internal.updating = true
	i.internal.updatingMutex.Unlock()
	err := i.revertNodeManager()
	if err != nil {
		// Release the flag so a later attempt can proceed.
		i.internal.updatingMutex.Lock()
		i.internal.updating = false
		i.internal.updatingMutex.Unlock()
	}
	return err
}
// splitName splits a slash-separated object name into its non-empty
// components.
func splitName(name string) []string {
	var parts []string
	for _, component := range strings.Split(name, "/") {
		if component != "" {
			parts = append(parts, component)
		}
	}
	return parts
}
// generateCommand constructs an exec.Cmd that runs the application binary
// at binPath with the envelope's environment and arguments, working
// directory <instanceDir>/root, and stdout/stderr redirected to
// timestamped files in <instanceDir>/logs.
func generateCommand(envelope *application.Envelope, binPath, instanceDir string) (*exec.Cmd, error) {
	// TODO(caprita): For the purpose of isolating apps, we should run them
	// as different users. We'll need to either use the root process or a
	// suid script to be able to do it.
	cmd := exec.Command(binPath)
	// TODO(caprita): Also pass in configuration info like NAMESPACE_ROOT to
	// the app (to point to the device mounttable).
	cmd.Env = envelope.Env
	rootDir := filepath.Join(instanceDir, "root")
	perm := os.FileMode(0700)
	if err := os.MkdirAll(rootDir, perm); err != nil {
		vlog.Errorf("MkdirAll(%v, %v) failed: %v", rootDir, perm, err)
		return nil, err
	}
	cmd.Dir = rootDir
	logDir := filepath.Join(instanceDir, "logs")
	if err := os.MkdirAll(logDir, perm); err != nil {
		vlog.Errorf("MkdirAll(%v, %v) failed: %v", logDir, perm, err)
		return nil, err
	}
	// One timestamp so the STDOUT/STDERR file pair can be correlated.
	timestamp := time.Now().UnixNano()
	perm = os.FileMode(0600)
	stdout, err := os.OpenFile(filepath.Join(logDir, fmt.Sprintf("STDOUT-%d", timestamp)), os.O_WRONLY|os.O_CREATE, perm)
	if err != nil {
		return nil, err
	}
	cmd.Stdout = stdout
	stderr, err := os.OpenFile(filepath.Join(logDir, fmt.Sprintf("STDERR-%d", timestamp)), os.O_WRONLY|os.O_CREATE, perm)
	if err != nil {
		// Don't leak the stdout file descriptor on partial failure.
		stdout.Close()
		return nil, err
	}
	cmd.Stderr = stderr
	// Set up args and env.
	cmd.Args = append(cmd.Args, "--log_dir=../logs")
	cmd.Args = append(cmd.Args, envelope.Args...)
	return cmd, nil
}
// Start starts a new instance of an installed application.  The suffix
// must name an installation ("apps/<app title>/<installation id>").  The
// child process is launched from the installation's current version; the
// call succeeds once the child reports readiness and calls back with its
// app cycle manager name, which is persisted in the instance's info file.
// Returns the id of the new instance.
func (i *invoker) Start(call ipc.ServerContext) ([]string, error) {
	vlog.VI(1).Infof("%v.Start()", i.suffix)
	if !strings.HasPrefix(i.suffix, "apps") {
		return nil, errInvalidSuffix
	}
	components := splitName(strings.TrimPrefix(i.suffix, "apps"))
	if nComponents := len(components); nComponents < 2 {
		return nil, fmt.Errorf("Start all installations / all applications not yet implemented (%v)", i.suffix)
	} else if nComponents > 2 {
		return nil, errInvalidSuffix
	}
	app, installation := components[0], components[1]
	// Verify the installation still exists (it may have been Uninstalled
	// concurrently).
	installationDir := filepath.Join(i.config.Root, applicationDirName(app), installationDirName(installation))
	if _, err := os.Stat(installationDir); err != nil {
		if os.IsNotExist(err) {
			return nil, errNotExist
		}
		vlog.Errorf("Stat(%v) failed: %v", installationDir, err)
		return nil, errOperationFailed
	}
	// Load the envelope and binary of the active version through the
	// installation's "current" symlink.
	currLink := filepath.Join(installationDir, "current")
	envelope, err := loadEnvelope(currLink)
	if err != nil {
		return nil, err
	}
	binPath := filepath.Join(currLink, "bin")
	if _, err := os.Stat(binPath); err != nil {
		vlog.Errorf("Stat(%v) failed: %v", binPath, err)
		return nil, errOperationFailed
	}
	instanceID := generateID()
	// TODO(caprita): Clean up instanceDir upon failure.
	instanceDir := filepath.Join(installationDir, "instances", instanceDirName(instanceID))
	cmd, err := generateCommand(envelope, binPath, instanceDir)
	if err != nil {
		vlog.Errorf("generateCommand(%v, %v, %v) failed: %v", envelope, binPath, instanceDir, err)
		return nil, errOperationFailed
	}
	// Set up the child process callback: the child learns its parent's
	// object name (with the callback id joined on) via the config.
	id := i.generateCallbackID()
	cfg := config.New()
	cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, id)))
	handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg})
	// Make the channel buffered to avoid blocking the Set method when
	// nothing is receiving on the channel. This happens e.g. when
	// unregisterCallbacks executes before Set is called.
	callbackChan := make(chan string, 1)
	i.registerCallback(id, mgmt.AppCycleManagerConfigKey, callbackChan)
	defer i.unregisterCallbacks(id)
	// Start the child process.
	if err := handle.Start(); err != nil {
		vlog.Errorf("Start() failed: %v", err)
		return nil, errOperationFailed
	}
	// Wait for the child process to start.
	testTimeout := 10 * time.Second
	if err := handle.WaitForReady(testTimeout); err != nil {
		vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
		if err := handle.Clean(); err != nil {
			vlog.Errorf("Clean() failed: %v", err)
		}
		return nil, errOperationFailed
	}
	// Wait for the child to call back with its app cycle manager name,
	// then record the instance's info file.
	select {
	case childName := <-callbackChan:
		instanceInfo := &instanceInfo{
			AppCycleMgrName: childName,
			Pid:             handle.Pid(),
		}
		if err := saveInstanceInfo(instanceDir, instanceInfo); err != nil {
			if err := handle.Clean(); err != nil {
				vlog.Errorf("Clean() failed: %v", err)
			}
			return nil, err
		}
		// TODO(caprita): Spin up a goroutine to reap child status upon
		// exit and transition it to suspended state if it exits on its
		// own.
	case <-time.After(testTimeout):
		vlog.Errorf("Waiting for callback timed out")
		if err := handle.Clean(); err != nil {
			vlog.Errorf("Clean() failed: %v", err)
		}
		return nil, errOperationFailed
	}
	return []string{instanceID}, nil
}
// saveInstanceInfo writes the JSON-encoded instance info to <dir>/info.
func saveInstanceInfo(dir string, info *instanceInfo) error {
	data, err := json.Marshal(info)
	if err != nil {
		vlog.Errorf("Marshal(%v) failed: %v", info, err)
		return errOperationFailed
	}
	target := filepath.Join(dir, "info")
	if err := ioutil.WriteFile(target, data, 0600); err != nil {
		vlog.Errorf("WriteFile(%v) failed: %v", target, err)
		return errOperationFailed
	}
	return nil
}
// loadInstanceInfo reads and decodes the JSON instance info from
// <dir>/info.
func loadInstanceInfo(dir string) (*instanceInfo, error) {
	source := filepath.Join(dir, "info")
	data, err := ioutil.ReadFile(source)
	if err != nil {
		vlog.Errorf("ReadFile(%v) failed: %v", source, err)
		return nil, errOperationFailed
	}
	info := new(instanceInfo)
	if err := json.Unmarshal(data, info); err != nil {
		vlog.Errorf("Unmarshal(%v) failed: %v", data, err)
		return nil, errOperationFailed
	}
	return info, nil
}
// Stop stops a running application instance.  The instance directory is
// renamed to its stopped- form first (so the first Stop wins and
// subsequent calls fail with errNotExist), then the instance is asked to
// shut down through its app cycle manager.
func (i *invoker) Stop(call ipc.ServerContext, deadline uint32) error {
	// TODO(caprita): implement deadline.
	ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
	defer cancel()
	vlog.VI(1).Infof("%v.Stop(%v)", i.suffix, deadline)
	if !strings.HasPrefix(i.suffix, "apps") {
		return errInvalidSuffix
	}
	components := splitName(strings.TrimPrefix(i.suffix, "apps"))
	if nComponents := len(components); nComponents < 3 {
		return fmt.Errorf("Stop all instances / all installations / all applications not yet implemented (%v)", i.suffix)
	} else if nComponents > 3 {
		return errInvalidSuffix
	}
	app, installation, instance := components[0], components[1], components[2]
	instancesDir := filepath.Join(i.config.Root, applicationDirName(app), installationDirName(installation), "instances")
	instanceDir := filepath.Join(instancesDir, instanceDirName(instance))
	stoppedInstanceDir := filepath.Join(instancesDir, stoppedInstanceDirName(instance))
	if err := os.Rename(instanceDir, stoppedInstanceDir); err != nil {
		// A missing instance dir means the instance doesn't exist or was
		// already stopped; that's an expected outcome, not worth an error
		// log.  (The original logged the failure twice, including in this
		// case.)
		if os.IsNotExist(err) {
			return errNotExist
		}
		vlog.Errorf("Rename(%v, %v) failed: %v", instanceDir, stoppedInstanceDir, err)
		return errOperationFailed
	}
	// TODO(caprita): restore the instance to unstopped upon failure?
	info, err := loadInstanceInfo(stoppedInstanceDir)
	if err != nil {
		return errOperationFailed
	}
	appStub, err := appcycle.BindAppCycle(info.AppCycleMgrName)
	if err != nil {
		vlog.Errorf("BindAppCycle(%v) failed: %v", info.AppCycleMgrName, err)
		return errOperationFailed
	}
	// Ask the instance to stop and drain its shutdown-progress stream.
	stream, err := appStub.Stop(ctx)
	if err != nil {
		vlog.Errorf("Got error: %v", err)
		return errOperationFailed
	}
	rstream := stream.RecvStream()
	for rstream.Advance() {
		vlog.VI(2).Infof("%v.Stop(%v) task update: %v", i.suffix, deadline, rstream.Value())
	}
	if err := rstream.Err(); err != nil {
		vlog.Errorf("Stream returned an error: %v", err)
		return errOperationFailed
	}
	if err := stream.Finish(); err != nil {
		vlog.Errorf("Got error: %v", err)
		return errOperationFailed
	}
	return nil
}
// Suspend suspends execution of an application instance.  Currently a
// no-op stub.
func (i *invoker) Suspend(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Suspend()", i.suffix)
	// TODO(jsimsa): Implement.
	return nil
}
// Uninstall removes an application installation.  Currently a no-op stub.
func (i *invoker) Uninstall(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Uninstall()", i.suffix)
	// TODO(jsimsa): Implement.
	return nil
}
// Update updates the object addressed by the suffix: "nm" triggers a
// self-update of the node manager (at most one at a time); application
// suffixes are not yet implemented.
func (i *invoker) Update(call ipc.ServerContext) error {
	vlog.VI(1).Infof("%v.Update()", i.suffix)
	ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
	defer cancel()
	switch {
	case i.suffix == "nm":
		// This branch attempts to update the node manager itself.
		// Claim the updating flag; bail if an update/revert is running.
		i.internal.updatingMutex.Lock()
		if i.internal.updating {
			i.internal.updatingMutex.Unlock()
			return errInProgress
		}
		i.internal.updating = true
		i.internal.updatingMutex.Unlock()
		err := i.updateNodeManager(ctx)
		if err != nil {
			// Release the flag so a later attempt can proceed.
			i.internal.updatingMutex.Lock()
			i.internal.updating = false
			i.internal.updatingMutex.Unlock()
		}
		return err
	case appsSuffix.MatchString(i.suffix):
		// TODO(jsimsa): Implement.
		return nil
	default:
		return errInvalidSuffix
	}
}
// UpdateTo updates the object addressed by the suffix to the version
// named by <von>.  Currently a no-op stub.
func (i *invoker) UpdateTo(call ipc.ServerContext, von string) error {
	vlog.VI(1).Infof("%v.UpdateTo(%q)", i.suffix, von)
	// TODO(jsimsa): Implement.
	return nil
}
// CONFIG INTERFACE IMPLEMENTATION
// Set delivers a config value from a child process: the suffix is the
// callback id, and the value is forwarded on the channel registered for
// that id and key.  An unknown key is silently ignored; an unknown id is
// an error.
func (i *invoker) Set(_ ipc.ServerContext, key, value string) error {
	vlog.VI(1).Infof("%v.Set(%v, %v)", i.suffix, key, value)
	id := i.suffix
	i.internal.channelsMutex.Lock()
	keyMap, ok := i.internal.channels[id]
	if !ok {
		i.internal.channelsMutex.Unlock()
		return errInvalidSuffix
	}
	channel, ok := keyMap[key]
	i.internal.channelsMutex.Unlock()
	if !ok {
		return nil
	}
	channel <- value
	return nil
}