| package impl |
| |
| // The node invoker is responsible for managing the state of the node manager |
| // itself. The implementation expects that the node manager installations are |
| // all organized in the following directory structure: |
| // |
| // <config.Root>/ |
| // node-manager/ |
| // <version 1 timestamp>/ - timestamp of when the version was downloaded |
| // noded - the node manager binary |
| // noded.sh - a shell script to start the binary |
| // <version 2 timestamp> |
| // ... |
| // |
| // The node manager is always expected to be started through the symbolic link |
| // passed in as config.CurrentLink, which is monitored by an init daemon. This |
| // provides for simple and robust updates. |
| // |
| // To update the node manager to a newer version, a new workspace is created and |
| // the symlink is updated to the new noded.sh script. Similarly, to revert the |
| // node manager to a previous version, all that is required is to update the |
| // symlink to point to the previous noded.sh script. |
| |
| import ( |
| "io/ioutil" |
| "os" |
| "os/exec" |
| "path/filepath" |
| "reflect" |
| "strings" |
| "sync" |
| "time" |
| |
| "veyron/lib/config" |
| vexec "veyron/services/mgmt/lib/exec" |
| iconfig "veyron/services/mgmt/node/config" |
| "veyron/services/mgmt/profile" |
| |
| "veyron2/context" |
| "veyron2/ipc" |
| "veyron2/mgmt" |
| "veyron2/naming" |
| "veyron2/rt" |
| "veyron2/services/mgmt/application" |
| "veyron2/services/mgmt/binary" |
| "veyron2/services/mgmt/node" |
| "veyron2/vlog" |
| ) |
| |
| type updatingState struct { |
| // updating is a flag that records whether this instance of node |
| // manager is being updated. |
| updating bool |
| // updatingMutex is a lock for coordinating concurrent access to |
| // <updating>. |
| updatingMutex sync.Mutex |
| } |
| |
| func newUpdatingState() *updatingState { |
| return new(updatingState) |
| } |
| |
| func (u *updatingState) testAndSetUpdating() bool { |
| u.updatingMutex.Lock() |
| defer u.updatingMutex.Unlock() |
| if u.updating { |
| return true |
| } |
| u.updating = true |
| return false |
| } |
| |
| func (u *updatingState) unsetUpdating() { |
| u.updatingMutex.Lock() |
| u.updating = false |
| u.updatingMutex.Unlock() |
| } |
| |
| // nodeInvoker holds the state of a node manager method invocation. |
| type nodeInvoker struct { |
| updating *updatingState |
| callback *callbackState |
| config *iconfig.State |
| } |
| |
| func (*nodeInvoker) Describe(ipc.ServerContext) (node.Description, error) { |
| empty := node.Description{} |
| nodeProfile, err := computeNodeProfile() |
| if err != nil { |
| return empty, err |
| } |
| knownProfiles, err := getKnownProfiles() |
| if err != nil { |
| return empty, err |
| } |
| result := matchProfiles(nodeProfile, knownProfiles) |
| return result, nil |
| } |
| |
| func (*nodeInvoker) IsRunnable(_ ipc.ServerContext, description binary.Description) (bool, error) { |
| nodeProfile, err := computeNodeProfile() |
| if err != nil { |
| return false, err |
| } |
| binaryProfiles := make([]profile.Specification, 0) |
| for name, _ := range description.Profiles { |
| profile, err := getProfile(name) |
| if err != nil { |
| return false, err |
| } |
| binaryProfiles = append(binaryProfiles, *profile) |
| } |
| result := matchProfiles(nodeProfile, binaryProfiles) |
| return len(result.Profiles) > 0, nil |
| } |
| |
| func (*nodeInvoker) Reset(call ipc.ServerContext, deadline uint64) error { |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| // getCurrentFileInfo returns the os.FileInfo for both the symbolic link |
| // CurrentLink, and the node script in the workspace that this link points to. |
| func (i *nodeInvoker) getCurrentFileInfo() (os.FileInfo, string, error) { |
| path := i.config.CurrentLink |
| link, err := os.Lstat(path) |
| if err != nil { |
| vlog.Errorf("Lstat(%v) failed: %v", path, err) |
| return nil, "", err |
| } |
| scriptPath, err := filepath.EvalSymlinks(path) |
| if err != nil { |
| vlog.Errorf("EvalSymlinks(%v) failed: %v", path, err) |
| return nil, "", err |
| } |
| return link, scriptPath, nil |
| } |
| |
| func (i *nodeInvoker) updateLink(newScript string) error { |
| link := i.config.CurrentLink |
| newLink := link + ".new" |
| fi, err := os.Lstat(newLink) |
| if err == nil { |
| if err := os.Remove(fi.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", fi.Name(), err) |
| return errOperationFailed |
| } |
| } |
| if err := os.Symlink(newScript, newLink); err != nil { |
| vlog.Errorf("Symlink(%v, %v) failed: %v", newScript, newLink, err) |
| return errOperationFailed |
| } |
| if err := os.Rename(newLink, link); err != nil { |
| vlog.Errorf("Rename(%v, %v) failed: %v", newLink, link, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *nodeInvoker) revertNodeManager() error { |
| if err := i.updateLink(i.config.Previous); err != nil { |
| return err |
| } |
| rt.R().Stop() |
| return nil |
| } |
| |
| func (i *nodeInvoker) testNodeManager(ctx context.T, workspace string, envelope *application.Envelope) error { |
| path := filepath.Join(workspace, "noded.sh") |
| cmd := exec.Command(path) |
| cmd.Stdout = os.Stdout |
| cmd.Stderr = os.Stderr |
| // Setup up the child process callback. |
| callbackState := i.callback |
| id := callbackState.generateID() |
| cfg := config.New() |
| cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, configSuffix, id))) |
| handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg}) |
| // Make the channel buffered to avoid blocking the Set method when |
| // nothing is receiving on the channel. This happens e.g. when |
| // unregisterCallbacks executes before Set is called. |
| callbackChan := make(chan string, 1) |
| callbackState.register(id, mgmt.ChildNodeManagerConfigKey, callbackChan) |
| defer callbackState.unregister(id) |
| // Start the child process. |
| if err := handle.Start(); err != nil { |
| vlog.Errorf("Start() failed: %v", err) |
| return errOperationFailed |
| } |
| defer func() { |
| if err := handle.Clean(); err != nil { |
| vlog.Errorf("Clean() failed: %v", err) |
| } |
| }() |
| // Wait for the child process to start. |
| testTimeout := 10 * time.Second |
| if err := handle.WaitForReady(testTimeout); err != nil { |
| vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err) |
| return errOperationFailed |
| } |
| // Wait for the child process to invoke the Callback(). |
| select { |
| case childName := <-callbackChan: |
| // Check that invoking Update() succeeds. |
| childName = naming.MakeTerminal(naming.Join(childName, "nm")) |
| nmClient, err := node.BindNode(childName) |
| if err != nil { |
| vlog.Errorf("BindNode(%v) failed: %v", childName, err) |
| return errOperationFailed |
| } |
| linkOld, pathOld, err := i.getCurrentFileInfo() |
| if err != nil { |
| return errOperationFailed |
| } |
| // Since the resolution of mtime for files is seconds, |
| // the test sleeps for a second to make sure it can |
| // check whether the current symlink is updated. |
| time.Sleep(time.Second) |
| if err := nmClient.Revert(ctx); err != nil { |
| return errOperationFailed |
| } |
| linkNew, pathNew, err := i.getCurrentFileInfo() |
| if err != nil { |
| return errOperationFailed |
| } |
| // Check that the new node manager updated the current symbolic |
| // link. |
| if !linkOld.ModTime().Before(linkNew.ModTime()) { |
| vlog.Errorf("new node manager test failed") |
| return errOperationFailed |
| } |
| // Ensure that the current symbolic link points to the same |
| // script. |
| if pathNew != pathOld { |
| i.updateLink(pathOld) |
| vlog.Errorf("new node manager test failed") |
| return errOperationFailed |
| } |
| case <-time.After(testTimeout): |
| vlog.Errorf("Waiting for callback timed out") |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func generateScript(workspace string, configSettings []string, envelope *application.Envelope) error { |
| path, err := filepath.EvalSymlinks(os.Args[0]) |
| if err != nil { |
| vlog.Errorf("EvalSymlinks(%v) failed: %v", os.Args[0], err) |
| return errOperationFailed |
| } |
| output := "#!/bin/bash\n" |
| output += strings.Join(iconfig.QuoteEnv(append(envelope.Env, configSettings...)), " ") + " " |
| output += filepath.Join(workspace, "noded") + " " |
| output += strings.Join(envelope.Args, " ") |
| path = filepath.Join(workspace, "noded.sh") |
| if err := ioutil.WriteFile(path, []byte(output), 0700); err != nil { |
| vlog.Errorf("WriteFile(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *nodeInvoker) updateNodeManager(ctx context.T) error { |
| if len(i.config.Origin) == 0 { |
| return errUpdateNoOp |
| } |
| envelope, err := fetchEnvelope(ctx, i.config.Origin) |
| if err != nil { |
| return err |
| } |
| if envelope.Title != application.NodeManagerTitle { |
| return errIncompatibleUpdate |
| } |
| if i.config.Envelope != nil && reflect.DeepEqual(envelope, i.config.Envelope) { |
| return errUpdateNoOp |
| } |
| // Create new workspace. |
| workspace := filepath.Join(i.config.Root, "node-manager", generateVersionDirName()) |
| perm := os.FileMode(0700) |
| if err := os.MkdirAll(workspace, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", workspace, perm, err) |
| return errOperationFailed |
| } |
| deferrer := func() { |
| if err := os.RemoveAll(workspace); err != nil { |
| vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err) |
| } |
| } |
| defer func() { |
| if deferrer != nil { |
| deferrer() |
| } |
| }() |
| // Populate the new workspace with a node manager binary. |
| // TODO(caprita): match identical binaries on binary metadata |
| // rather than binary object name. |
| sameBinary := i.config.Envelope != nil && envelope.Binary == i.config.Envelope.Binary |
| if err := generateBinary(workspace, "noded", envelope, !sameBinary); err != nil { |
| return err |
| } |
| // Populate the new workspace with a node manager script. |
| configSettings, err := i.config.Save(envelope) |
| if err != nil { |
| return errOperationFailed |
| } |
| if err := generateScript(workspace, configSettings, envelope); err != nil { |
| return err |
| } |
| if err := i.testNodeManager(ctx, workspace, envelope); err != nil { |
| return err |
| } |
| // If the binary has changed, update the node manager symlink. |
| if err := i.updateLink(filepath.Join(workspace, "noded.sh")); err != nil { |
| return err |
| } |
| rt.R().Stop() |
| deferrer = nil |
| return nil |
| } |
| |
| func (*nodeInvoker) Install(ipc.ServerContext, string) (string, error) { |
| return "", errInvalidSuffix |
| } |
| |
| func (*nodeInvoker) Refresh(ipc.ServerContext) error { |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (*nodeInvoker) Restart(ipc.ServerContext) error { |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (*nodeInvoker) Resume(ipc.ServerContext) error { |
| return errInvalidSuffix |
| } |
| |
| func (i *nodeInvoker) Revert(call ipc.ServerContext) error { |
| if i.config.Previous == "" { |
| return errUpdateNoOp |
| } |
| updatingState := i.updating |
| if updatingState.testAndSetUpdating() { |
| return errInProgress |
| } |
| err := i.revertNodeManager() |
| if err != nil { |
| updatingState.unsetUpdating() |
| } |
| return err |
| } |
| |
| func (*nodeInvoker) Start(ipc.ServerContext) ([]string, error) { |
| return nil, errInvalidSuffix |
| } |
| |
| func (*nodeInvoker) Stop(ipc.ServerContext, uint32) error { |
| return errInvalidSuffix |
| } |
| |
| func (*nodeInvoker) Suspend(ipc.ServerContext) error { |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (*nodeInvoker) Uninstall(ipc.ServerContext) error { |
| return errInvalidSuffix |
| } |
| |
| func (i *nodeInvoker) Update(ipc.ServerContext) error { |
| ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute) |
| defer cancel() |
| updatingState := i.updating |
| if updatingState.testAndSetUpdating() { |
| return errInProgress |
| } |
| err := i.updateNodeManager(ctx) |
| if err != nil { |
| updatingState.unsetUpdating() |
| } |
| return err |
| } |
| |
| func (*nodeInvoker) UpdateTo(ipc.ServerContext, string) error { |
| // TODO(jsimsa): Implement. |
| return nil |
| } |