| package impl |
| |
// The node manager implementation expects node manager installations to be
// organized in the following directory structure:
| // |
| // <config.Root>/ |
//   node-manager/
//     <version 1 timestamp>/  - timestamp of when the version was downloaded
//       noded                 - the node manager binary
//       noded.sh              - a shell script to start the binary
//     <version 2 timestamp>
//     ...
| // |
| // The node manager is always expected to be started through the symbolic link |
| // passed in as config.CurrentLink, which is monitored by an init daemon. This |
| // provides for simple and robust updates. |
| // |
| // To update the node manager to a newer version, a new workspace is created and |
| // the symlink is updated to the new noded.sh script. Similarly, to revert the |
| // node manager to a previous version, all that is required is to update the |
| // symlink to point to the previous noded.sh script. |
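//
// For illustration, the symlink flip performed by updateLink (below) amounts
// to the following (sketch):
//
//   os.Symlink(newScript, currentLink+".new")  // stage the new link
//   os.Rename(currentLink+".new", currentLink) // atomically replace the old one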
| // |
| // |
| // The node manager manages the applications it installs and runs using the |
| // following directory structure: |
| // |
| // TODO(caprita): Not all is yet implemented. |
| // |
| // <config.Root>/ |
//   app-<hash 1>/                 - the application dir is named using a hash of the application title
//     installation-<id 1>/        - installations are labelled with ids
//       <version 1 timestamp>/    - timestamp of when the version was downloaded
//         bin                     - application binary
//         previous                - symbolic link to previous version directory (TODO)
//         origin                  - object name for application envelope
//         envelope                - application envelope (JSON-encoded)
//       <version 2 timestamp>
//       ...
//       current                   - symbolic link to the current version
//       instances/
//         instance-<id a>/        - instances are labelled with ids
//           root/                 - workspace that the instance is run from
//           logs/                 - stderr/stdout and log files generated by instance
//           info                  - app cycle manager name and process id for the instance (if running)
//           version               - symbolic link to the installation version for the instance
//         instance-<id b>
//         ...
//         stopped-instance-<id c> - stopped instances have 'stopped-' prefixed to their directory name
//         ...
//     installation-<id 2>
//     ...
//   app-<hash 2>
//   ...
| // |
// When the node manager starts up, it goes through all instances and resumes
// the ones that are not suspended. If an application instance is found still
// running, the node manager suspends it before resuming it. If an application
// fails to resume, it stays suspended.
//
// When the node manager shuts down, it suspends all running instances.
| // |
| // Start starts an instance. Suspend kills the process but leaves the workspace |
| // untouched. Resume restarts the process. Stop kills the process and prevents |
| // future resumes (it also eventually gc's the workspace). |
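//
// Instance lifecycle (sketch; Stop is terminal from either state):
//
//   Start --> running --Suspend--> suspended --Resume--> running
//   running | suspended --Stop--> stopped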
| // |
| // If the process dies on its own, it stays dead and is assumed suspended. |
| // TODO(caprita): Later, we'll add auto-restart option. |
| // |
| // Concurrency model: installations can be created independently of one another; |
| // installations can be removed at any time (any running instances will be |
| // stopped). The first call to Uninstall will rename the installation dir as a |
| // first step; subsequent Uninstalls will fail. Instances can be created |
| // independently of one another, as long as the installation exists (if it gets |
| // Uninstalled during an instance Start, the Start may fail). When an instance |
| // is stopped, the first call to Stop renames the instance dir; subsequent Stop |
// calls will fail. Resume will attempt to create an info file; if one already
// exists, Resume fails. Suspend will attempt to rename the info file; if none
// is present, Suspend fails.
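//
// The mutual exclusion above relies on the rename-as-lock idiom: concurrent
// callers race on a single os.Rename of the relevant directory or file, and
// the OS guarantees that exactly one of them succeeds, e.g. (sketch):
//
//   if err := os.Rename(instanceDir, stoppedInstanceDir); err != nil {
//       // Some other Stop call got here first (or the instance never
//       // existed); this call fails.
//   }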
| // |
| // TODO(caprita): There is room for synergy between how node manager organizes |
| // its own workspace and that for the applications it runs. In particular, |
| // previous, origin, and envelope could be part of a single config. We'll |
| // refine that later. |
| |
| import ( |
| "crypto/md5" |
| "encoding/base64" |
| "encoding/binary" |
| "encoding/json" |
| "fmt" |
| "hash/crc64" |
| "io/ioutil" |
| "os" |
| "os/exec" |
| "path/filepath" |
| "reflect" |
| "regexp" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "veyron/lib/config" |
| binlib "veyron/services/mgmt/lib/binary" |
| vexec "veyron/services/mgmt/lib/exec" |
| iconfig "veyron/services/mgmt/node/config" |
| "veyron/services/mgmt/profile" |
| |
| "veyron2/context" |
| "veyron2/ipc" |
| "veyron2/mgmt" |
| "veyron2/naming" |
| "veyron2/rt" |
| "veyron2/services/mgmt/appcycle" |
| "veyron2/services/mgmt/application" |
| binapi "veyron2/services/mgmt/binary" |
| "veyron2/services/mgmt/node" |
| "veyron2/services/mgmt/repository" |
| "veyron2/verror" |
| "veyron2/vlog" |
| ) |
| |
| // instanceInfo holds state about a running instance. |
| type instanceInfo struct { |
| AppCycleMgrName string |
| Pid int |
| } |
| |
| // internalState wraps state shared between different node manager |
| // invocations. |
| type internalState struct { |
| // channels maps callback identifiers and config keys to channels that |
| // are used to communicate corresponding config values from child |
| // processes. |
| channels map[string]map[string]chan string |
| // nextCallbackID provides the next callback identifier to use as key |
| // for the channels map. |
| nextCallbackID int64 |
| // channelsMutex is a lock for coordinating concurrent access to |
| // <channels>. |
| channelsMutex sync.Mutex |
| // updating is a flag that records whether this instance of node |
| // manager is being updated. |
| updating bool |
| // updatingMutex is a lock for coordinating concurrent access to |
| // <updating>. |
| updatingMutex sync.Mutex |
| } |
| |
| // invoker holds the state of a node manager invocation. |
| type invoker struct { |
| // internal holds the node manager's internal state that persists across |
| // RPC requests. |
| internal *internalState |
| // config holds the node manager's (immutable) configuration state. |
| config *iconfig.State |
	// suffix is the suffix of the current invocation, interpreted as a
	// relative object name that identifies an application, installation,
	// or instance.
	suffix string
| } |
| |
| var ( |
| appsSuffix = regexp.MustCompile(`^apps\/.*$`) |
| |
| errInvalidSuffix = verror.BadArgf("invalid suffix") |
| errOperationFailed = verror.Internalf("operation failed") |
| errInProgress = verror.Existsf("operation in progress") |
| errIncompatibleUpdate = verror.BadArgf("update failed: mismatching app title") |
| errUpdateNoOp = verror.NotFoundf("no different version available") |
| errNotExist = verror.NotFoundf("object does not exist") |
| errInvalidOperation = verror.BadArgf("invalid operation") |
| ) |
| |
| // NODE INTERFACE IMPLEMENTATION |
| |
| func (i *invoker) Describe(call ipc.ServerContext) (node.Description, error) { |
| vlog.VI(1).Infof("%v.Describe()", i.suffix) |
| empty := node.Description{} |
| nodeProfile, err := computeNodeProfile() |
| if err != nil { |
| return empty, err |
| } |
| knownProfiles, err := getKnownProfiles() |
| if err != nil { |
| return empty, err |
| } |
| result := matchProfiles(nodeProfile, knownProfiles) |
| return result, nil |
| } |
| |
| func (i *invoker) IsRunnable(call ipc.ServerContext, description binapi.Description) (bool, error) { |
| vlog.VI(1).Infof("%v.IsRunnable(%v)", i.suffix, description) |
| nodeProfile, err := computeNodeProfile() |
| if err != nil { |
| return false, err |
| } |
	binaryProfiles := make([]profile.Specification, 0, len(description.Profiles))
	for name := range description.Profiles {
		spec, err := getProfile(name)
		if err != nil {
			return false, err
		}
		binaryProfiles = append(binaryProfiles, *spec)
	}
| result := matchProfiles(nodeProfile, binaryProfiles) |
| return len(result.Profiles) > 0, nil |
| } |
| |
| func (i *invoker) Reset(call ipc.ServerContext, deadline uint64) error { |
| vlog.VI(1).Infof("%v.Reset(%v)", i.suffix, deadline) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| // APPLICATION INTERFACE IMPLEMENTATION |
| |
| func downloadBinary(workspace, fileName, name string) error { |
| data, err := binlib.Download(name) |
| if err != nil { |
| vlog.Errorf("Download(%v) failed: %v", name, err) |
| return errOperationFailed |
| } |
	path, perm := filepath.Join(workspace, fileName), os.FileMode(0755)
| if err := ioutil.WriteFile(path, data, perm); err != nil { |
| vlog.Errorf("WriteFile(%v, %v) failed: %v", path, perm, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func fetchEnvelope(ctx context.T, origin string) (*application.Envelope, error) { |
| stub, err := repository.BindApplication(origin) |
| if err != nil { |
		vlog.Errorf("BindApplication(%v) failed: %v", origin, err)
| return nil, errOperationFailed |
| } |
| // TODO(jsimsa): Include logic that computes the set of supported |
| // profiles. |
| profiles := []string{"test"} |
| envelope, err := stub.Match(ctx, profiles) |
| if err != nil { |
| vlog.Errorf("Match(%v) failed: %v", profiles, err) |
| return nil, errOperationFailed |
| } |
| return &envelope, nil |
| } |
| |
| func generateBinary(workspace, fileName string, envelope *application.Envelope, newBinary bool) error { |
| if newBinary { |
| // Download the new binary. |
| return downloadBinary(workspace, fileName, envelope.Binary) |
| } |
| // Link the current binary. |
| path := filepath.Join(workspace, fileName) |
| if err := os.Link(os.Args[0], path); err != nil { |
| vlog.Errorf("Link(%v, %v) failed: %v", os.Args[0], path, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| // TODO(jsimsa): Replace <PreviousEnv> with a command-line flag when |
| // command-line flags in tests are supported. |
| func generateScript(workspace string, configSettings []string, envelope *application.Envelope) error { |
	// Check that the current binary's path resolves (the resolved path is
	// otherwise unused: the script invokes the noded binary copied into
	// the workspace).
	if _, err := filepath.EvalSymlinks(os.Args[0]); err != nil {
		vlog.Errorf("EvalSymlinks(%v) failed: %v", os.Args[0], err)
		return errOperationFailed
	}
	output := "#!/bin/bash\n"
	output += strings.Join(iconfig.QuoteEnv(append(envelope.Env, configSettings...)), " ") + " "
	output += filepath.Join(workspace, "noded") + " "
	output += strings.Join(envelope.Args, " ")
	path := filepath.Join(workspace, "noded.sh")
| if err := ioutil.WriteFile(path, []byte(output), 0700); err != nil { |
| vlog.Errorf("WriteFile(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| // getCurrentFileInfo returns the os.FileInfo for both the symbolic link |
| // CurrentLink, and the node script in the workspace that this link points to. |
| func (i *invoker) getCurrentFileInfo() (os.FileInfo, string, error) { |
| path := i.config.CurrentLink |
| link, err := os.Lstat(path) |
| if err != nil { |
| vlog.Errorf("Lstat(%v) failed: %v", path, err) |
| return nil, "", err |
| } |
| scriptPath, err := filepath.EvalSymlinks(path) |
| if err != nil { |
| vlog.Errorf("EvalSymlinks(%v) failed: %v", path, err) |
| return nil, "", err |
| } |
| return link, scriptPath, nil |
| } |
| |
| func (i *invoker) updateLink(newScript string) error { |
| link := i.config.CurrentLink |
| newLink := link + ".new" |
	if _, err := os.Lstat(newLink); err == nil {
		// Clean up a leftover link from an earlier failed attempt,
		// removing it by its full path.
		if err := os.Remove(newLink); err != nil {
			vlog.Errorf("Remove(%v) failed: %v", newLink, err)
			return errOperationFailed
		}
	}
| if err := os.Symlink(newScript, newLink); err != nil { |
| vlog.Errorf("Symlink(%v, %v) failed: %v", newScript, newLink, err) |
| return errOperationFailed |
| } |
| if err := os.Rename(newLink, link); err != nil { |
| vlog.Errorf("Rename(%v, %v) failed: %v", newLink, link, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *invoker) generateCallbackID() string { |
| i.internal.channelsMutex.Lock() |
| defer i.internal.channelsMutex.Unlock() |
| i.internal.nextCallbackID++ |
| return strconv.FormatInt(i.internal.nextCallbackID-1, 10) |
| } |
| |
| func (i *invoker) registerCallback(id, key string, channel chan string) { |
| i.internal.channelsMutex.Lock() |
| defer i.internal.channelsMutex.Unlock() |
| if _, ok := i.internal.channels[id]; !ok { |
| i.internal.channels[id] = make(map[string]chan string) |
| } |
| i.internal.channels[id][key] = channel |
| } |
| |
| func (i *invoker) revertNodeManager() error { |
| if err := i.updateLink(i.config.Previous); err != nil { |
| return err |
| } |
| rt.R().Stop() |
| return nil |
| } |
| |
| func (i *invoker) testNodeManager(ctx context.T, workspace string, envelope *application.Envelope) error { |
| path := filepath.Join(workspace, "noded.sh") |
| cmd := exec.Command(path) |
| cmd.Stdout = os.Stdout |
| cmd.Stderr = os.Stderr |
	// Set up the child process callback.
| id := i.generateCallbackID() |
| cfg := config.New() |
| cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, id))) |
| handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg}) |
| // Make the channel buffered to avoid blocking the Set method when |
| // nothing is receiving on the channel. This happens e.g. when |
| // unregisterCallbacks executes before Set is called. |
| callbackChan := make(chan string, 1) |
| i.registerCallback(id, mgmt.ChildNodeManagerConfigKey, callbackChan) |
| defer i.unregisterCallbacks(id) |
| // Start the child process. |
| if err := handle.Start(); err != nil { |
| vlog.Errorf("Start() failed: %v", err) |
| return errOperationFailed |
| } |
| defer func() { |
| if err := handle.Clean(); err != nil { |
| vlog.Errorf("Clean() failed: %v", err) |
| } |
| }() |
| // Wait for the child process to start. |
| testTimeout := 10 * time.Second |
| if err := handle.WaitForReady(testTimeout); err != nil { |
| vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err) |
| return errOperationFailed |
| } |
| // Wait for the child process to invoke the Callback(). |
| select { |
| case childName := <-callbackChan: |
| // Check that invoking Update() succeeds. |
| childName = naming.MakeTerminal(naming.Join(childName, "nm")) |
| nmClient, err := node.BindNode(childName) |
| if err != nil { |
| vlog.Errorf("BindNode(%v) failed: %v", childName, err) |
| return errOperationFailed |
| } |
| linkOld, pathOld, err := i.getCurrentFileInfo() |
| if err != nil { |
| return errOperationFailed |
| } |
		// Since the mtime resolution on some filesystems is as coarse
		// as one second, sleep for a second to make sure that an
		// update of the current symlink is observable via ModTime.
		time.Sleep(time.Second)
| if err := nmClient.Revert(ctx); err != nil { |
| return errOperationFailed |
| } |
| linkNew, pathNew, err := i.getCurrentFileInfo() |
| if err != nil { |
| return errOperationFailed |
| } |
| // Check that the new node manager updated the current symbolic |
| // link. |
| if !linkOld.ModTime().Before(linkNew.ModTime()) { |
| vlog.Errorf("new node manager test failed") |
| return errOperationFailed |
| } |
| // Ensure that the current symbolic link points to the same |
| // script. |
| if pathNew != pathOld { |
| i.updateLink(pathOld) |
| vlog.Errorf("new node manager test failed") |
| return errOperationFailed |
| } |
| case <-time.After(testTimeout): |
| vlog.Errorf("Waiting for callback timed out") |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *invoker) unregisterCallbacks(id string) { |
| i.internal.channelsMutex.Lock() |
| defer i.internal.channelsMutex.Unlock() |
| delete(i.internal.channels, id) |
| } |
| |
| func (i *invoker) updateNodeManager(ctx context.T) error { |
| if len(i.config.Origin) == 0 { |
| return errUpdateNoOp |
| } |
| envelope, err := fetchEnvelope(ctx, i.config.Origin) |
| if err != nil { |
| return err |
| } |
| if envelope.Title != application.NodeManagerTitle { |
| return errIncompatibleUpdate |
| } |
| if i.config.Envelope != nil && reflect.DeepEqual(envelope, i.config.Envelope) { |
| return errUpdateNoOp |
| } |
| // Create new workspace. |
| workspace := filepath.Join(i.config.Root, "node-manager", generateVersionDirName()) |
| perm := os.FileMode(0700) |
| if err := os.MkdirAll(workspace, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", workspace, perm, err) |
| return errOperationFailed |
| } |
| deferrer := func() { |
| if err := os.RemoveAll(workspace); err != nil { |
| vlog.Errorf("RemoveAll(%v) failed: %v", workspace, err) |
| } |
| } |
| defer func() { |
| if deferrer != nil { |
| deferrer() |
| } |
| }() |
| // Populate the new workspace with a node manager binary. |
| // TODO(caprita): match identical binaries on binary metadata |
| // rather than binary object name. |
| sameBinary := i.config.Envelope != nil && envelope.Binary == i.config.Envelope.Binary |
| if err := generateBinary(workspace, "noded", envelope, !sameBinary); err != nil { |
| return err |
| } |
| // Populate the new workspace with a node manager script. |
| configSettings, err := i.config.Save(envelope) |
| if err != nil { |
| return errOperationFailed |
| } |
| if err := generateScript(workspace, configSettings, envelope); err != nil { |
| return err |
| } |
| if err := i.testNodeManager(ctx, workspace, envelope); err != nil { |
| return err |
| } |
| // If the binary has changed, update the node manager symlink. |
| if err := i.updateLink(filepath.Join(workspace, "noded.sh")); err != nil { |
| return err |
| } |
| rt.R().Stop() |
| deferrer = nil |
| return nil |
| } |
| |
| func saveEnvelope(dir string, envelope *application.Envelope) error { |
| jsonEnvelope, err := json.Marshal(envelope) |
| if err != nil { |
| vlog.Errorf("Marshal(%v) failed: %v", envelope, err) |
| return errOperationFailed |
| } |
| envelopePath := filepath.Join(dir, "envelope") |
| if err := ioutil.WriteFile(envelopePath, jsonEnvelope, 0600); err != nil { |
| vlog.Errorf("WriteFile(%v) failed: %v", envelopePath, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func loadEnvelope(dir string) (*application.Envelope, error) { |
| envelopePath := filepath.Join(dir, "envelope") |
| envelope := new(application.Envelope) |
| if envelopeBytes, err := ioutil.ReadFile(envelopePath); err != nil { |
| vlog.Errorf("ReadFile(%v) failed: %v", envelopePath, err) |
| return nil, errOperationFailed |
| } else if err := json.Unmarshal(envelopeBytes, envelope); err != nil { |
| vlog.Errorf("Unmarshal(%v) failed: %v", envelopeBytes, err) |
| return nil, errOperationFailed |
| } |
| return envelope, nil |
| } |
| |
| func saveOrigin(dir, originVON string) error { |
| path := filepath.Join(dir, "origin") |
| if err := ioutil.WriteFile(path, []byte(originVON), 0600); err != nil { |
| vlog.Errorf("WriteFile(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| // generateID returns a new unique id string. The uniqueness is based on the |
| // current timestamp. Not cryptographically secure. |
| func generateID() string { |
	timestamp := time.Now().Format(time.RFC3339Nano)
	h := crc64.New(crc64.MakeTable(crc64.ISO))
	h.Write([]byte(timestamp))
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, h.Sum64())
| return strings.TrimRight(base64.URLEncoding.EncodeToString(b), "=") |
| } |
| |
| // TODO(caprita): Nothing prevents different applications from sharing the same |
| // title, and thereby being installed in the same app dir. Do we want to |
| // prevent that for the same user or across users? |
| |
// applicationDirName hashes the given application title (md5, base64-encoded)
// to produce a directory name for installations of applications with that
// title.
| func applicationDirName(title string) string { |
| h := md5.New() |
| h.Write([]byte(title)) |
| hash := strings.TrimRight(base64.URLEncoding.EncodeToString(h.Sum(nil)), "=") |
| return "app-" + hash |
| } |
| |
| func installationDirName(installationID string) string { |
| return "installation-" + installationID |
| } |
| |
| func instanceDirName(instanceID string) string { |
| return "instance-" + instanceID |
| } |
| |
| func stoppedInstanceDirName(instanceID string) string { |
| return "stopped-instance-" + instanceID |
| } |
| |
| func generateVersionDirName() string { |
| return time.Now().Format(time.RFC3339Nano) |
| } |
| |
| func (i *invoker) Install(call ipc.ServerContext, applicationVON string) (string, error) { |
| vlog.VI(1).Infof("%v.Install(%q)", i.suffix, applicationVON) |
| if i.suffix != "apps" { |
| return "", errInvalidSuffix |
| } |
| ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute) |
| defer cancel() |
| envelope, err := fetchEnvelope(ctx, applicationVON) |
| if err != nil { |
| return "", err |
| } |
| if envelope.Title == application.NodeManagerTitle { |
| // Disallow node manager apps from being installed like a |
| // regular app. |
| return "", errInvalidOperation |
| } |
| installationID := generateID() |
| installationDir := filepath.Join(i.config.Root, applicationDirName(envelope.Title), installationDirName(installationID)) |
| versionDir := filepath.Join(installationDir, generateVersionDirName()) |
| perm := os.FileMode(0700) |
| if err := os.MkdirAll(versionDir, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", versionDir, perm, err) |
| return "", errOperationFailed |
| } |
| deferrer := func() { |
| if err := os.RemoveAll(versionDir); err != nil { |
| vlog.Errorf("RemoveAll(%v) failed: %v", versionDir, err) |
| } |
| } |
| defer func() { |
| if deferrer != nil { |
| deferrer() |
| } |
| }() |
| // TODO(caprita): Share binaries if already existing locally. |
| if err := generateBinary(versionDir, "bin", envelope, true); err != nil { |
| return "", err |
| } |
| if err := saveEnvelope(versionDir, envelope); err != nil { |
| return "", err |
| } |
| if err := saveOrigin(versionDir, applicationVON); err != nil { |
| return "", err |
| } |
| link := filepath.Join(installationDir, "current") |
| if err := os.Symlink(versionDir, link); err != nil { |
| vlog.Errorf("Symlink(%v, %v) failed: %v", versionDir, link, err) |
| return "", errOperationFailed |
| } |
| deferrer = nil |
| return naming.Join(envelope.Title, installationID), nil |
| } |
| |
| func (i *invoker) Refresh(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Refresh()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (i *invoker) Restart(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Restart()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (i *invoker) Resume(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Resume()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (i *invoker) Revert(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Revert()", i.suffix) |
| if i.config.Previous == "" { |
| return errUpdateNoOp |
| } |
| i.internal.updatingMutex.Lock() |
	if i.internal.updating {
		i.internal.updatingMutex.Unlock()
		return errInProgress
	}
	i.internal.updating = true
	i.internal.updatingMutex.Unlock()
| err := i.revertNodeManager() |
| if err != nil { |
| i.internal.updatingMutex.Lock() |
| i.internal.updating = false |
| i.internal.updatingMutex.Unlock() |
| } |
| return err |
| } |
| |
| func splitName(name string) (ret []string) { |
| components := strings.Split(name, "/") |
| for _, c := range components { |
| if len(c) > 0 { |
| ret = append(ret, c) |
| } |
| } |
| return |
| } |
| |
| func generateCommand(envelope *application.Envelope, binPath, instanceDir string) (*exec.Cmd, error) { |
| // TODO(caprita): For the purpose of isolating apps, we should run them |
| // as different users. We'll need to either use the root process or a |
| // suid script to be able to do it. |
| cmd := exec.Command(binPath) |
| // TODO(caprita): Also pass in configuration info like NAMESPACE_ROOT to |
| // the app (to point to the device mounttable). |
| cmd.Env = envelope.Env |
| rootDir := filepath.Join(instanceDir, "root") |
| perm := os.FileMode(0700) |
| if err := os.MkdirAll(rootDir, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", rootDir, perm, err) |
| return nil, err |
| } |
| cmd.Dir = rootDir |
| logDir := filepath.Join(instanceDir, "logs") |
| if err := os.MkdirAll(logDir, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", logDir, perm, err) |
| return nil, err |
| } |
| timestamp := time.Now().UnixNano() |
| var err error |
| perm = os.FileMode(0600) |
| cmd.Stdout, err = os.OpenFile(filepath.Join(logDir, fmt.Sprintf("STDOUT-%d", timestamp)), os.O_WRONLY|os.O_CREATE, perm) |
| if err != nil { |
| return nil, err |
| } |
| |
| cmd.Stderr, err = os.OpenFile(filepath.Join(logDir, fmt.Sprintf("STDERR-%d", timestamp)), os.O_WRONLY|os.O_CREATE, perm) |
| if err != nil { |
| return nil, err |
| } |
	// Set up the remaining command-line args (env was set above).
| cmd.Args = append(cmd.Args, "--log_dir=../logs") |
| cmd.Args = append(cmd.Args, envelope.Args...) |
| return cmd, nil |
| } |
| |
| func (i *invoker) Start(call ipc.ServerContext) ([]string, error) { |
| vlog.VI(1).Infof("%v.Start()", i.suffix) |
| if !strings.HasPrefix(i.suffix, "apps") { |
| return nil, errInvalidSuffix |
| } |
| components := splitName(strings.TrimPrefix(i.suffix, "apps")) |
| if nComponents := len(components); nComponents < 2 { |
| return nil, fmt.Errorf("Start all installations / all applications not yet implemented (%v)", i.suffix) |
| } else if nComponents > 2 { |
| return nil, errInvalidSuffix |
| } |
| app, installation := components[0], components[1] |
| installationDir := filepath.Join(i.config.Root, applicationDirName(app), installationDirName(installation)) |
| if _, err := os.Stat(installationDir); err != nil { |
| if os.IsNotExist(err) { |
| return nil, errNotExist |
| } |
| vlog.Errorf("Stat(%v) failed: %v", installationDir, err) |
| return nil, errOperationFailed |
| } |
| currLink := filepath.Join(installationDir, "current") |
| envelope, err := loadEnvelope(currLink) |
| if err != nil { |
| return nil, err |
| } |
| binPath := filepath.Join(currLink, "bin") |
| if _, err := os.Stat(binPath); err != nil { |
| vlog.Errorf("Stat(%v) failed: %v", binPath, err) |
| return nil, errOperationFailed |
| } |
| instanceID := generateID() |
| // TODO(caprita): Clean up instanceDir upon failure. |
| instanceDir := filepath.Join(installationDir, "instances", instanceDirName(instanceID)) |
| cmd, err := generateCommand(envelope, binPath, instanceDir) |
| if err != nil { |
| vlog.Errorf("generateCommand(%v, %v, %v) failed: %v", envelope, binPath, instanceDir, err) |
| return nil, errOperationFailed |
| } |
	// Set up the child process callback.
| id := i.generateCallbackID() |
| cfg := config.New() |
| cfg.Set(mgmt.ParentNodeManagerConfigKey, naming.MakeTerminal(naming.Join(i.config.Name, id))) |
| handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg}) |
| // Make the channel buffered to avoid blocking the Set method when |
| // nothing is receiving on the channel. This happens e.g. when |
| // unregisterCallbacks executes before Set is called. |
| callbackChan := make(chan string, 1) |
| i.registerCallback(id, mgmt.AppCycleManagerConfigKey, callbackChan) |
| defer i.unregisterCallbacks(id) |
| // Start the child process. |
| if err := handle.Start(); err != nil { |
| vlog.Errorf("Start() failed: %v", err) |
| return nil, errOperationFailed |
| } |
| // Wait for the child process to start. |
| testTimeout := 10 * time.Second |
| if err := handle.WaitForReady(testTimeout); err != nil { |
| vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err) |
| if err := handle.Clean(); err != nil { |
| vlog.Errorf("Clean() failed: %v", err) |
| } |
| return nil, errOperationFailed |
| } |
| select { |
| case childName := <-callbackChan: |
		info := &instanceInfo{
			AppCycleMgrName: childName,
			Pid:             handle.Pid(),
		}
		if err := saveInstanceInfo(instanceDir, info); err != nil {
| if err := handle.Clean(); err != nil { |
| vlog.Errorf("Clean() failed: %v", err) |
| } |
| return nil, err |
| } |
| // TODO(caprita): Spin up a goroutine to reap child status upon |
| // exit and transition it to suspended state if it exits on its |
| // own. |
| case <-time.After(testTimeout): |
| vlog.Errorf("Waiting for callback timed out") |
| if err := handle.Clean(); err != nil { |
| vlog.Errorf("Clean() failed: %v", err) |
| } |
| return nil, errOperationFailed |
| } |
| return []string{instanceID}, nil |
| } |
| |
| func saveInstanceInfo(dir string, info *instanceInfo) error { |
| jsonInfo, err := json.Marshal(info) |
| if err != nil { |
| vlog.Errorf("Marshal(%v) failed: %v", info, err) |
| return errOperationFailed |
| } |
| infoPath := filepath.Join(dir, "info") |
| if err := ioutil.WriteFile(infoPath, jsonInfo, 0600); err != nil { |
| vlog.Errorf("WriteFile(%v) failed: %v", infoPath, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func loadInstanceInfo(dir string) (*instanceInfo, error) { |
| infoPath := filepath.Join(dir, "info") |
| info := new(instanceInfo) |
| if infoBytes, err := ioutil.ReadFile(infoPath); err != nil { |
| vlog.Errorf("ReadFile(%v) failed: %v", infoPath, err) |
| return nil, errOperationFailed |
| } else if err := json.Unmarshal(infoBytes, info); err != nil { |
| vlog.Errorf("Unmarshal(%v) failed: %v", infoBytes, err) |
| return nil, errOperationFailed |
| } |
| return info, nil |
| } |
| |
| func (i *invoker) Stop(call ipc.ServerContext, deadline uint32) error { |
| // TODO(caprita): implement deadline. |
| ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute) |
| defer cancel() |
| vlog.VI(1).Infof("%v.Stop(%v)", i.suffix, deadline) |
| if !strings.HasPrefix(i.suffix, "apps") { |
| return errInvalidSuffix |
| } |
| components := splitName(strings.TrimPrefix(i.suffix, "apps")) |
| if nComponents := len(components); nComponents < 3 { |
| return fmt.Errorf("Stop all instances / all installations / all applications not yet implemented (%v)", i.suffix) |
| } else if nComponents > 3 { |
| return errInvalidSuffix |
| } |
| app, installation, instance := components[0], components[1], components[2] |
| instancesDir := filepath.Join(i.config.Root, applicationDirName(app), installationDirName(installation), "instances") |
| instanceDir := filepath.Join(instancesDir, instanceDirName(instance)) |
| stoppedInstanceDir := filepath.Join(instancesDir, stoppedInstanceDirName(instance)) |
	if err := os.Rename(instanceDir, stoppedInstanceDir); err != nil {
		if os.IsNotExist(err) {
			return errNotExist
		}
		vlog.Errorf("Rename(%v, %v) failed: %v", instanceDir, stoppedInstanceDir, err)
		return errOperationFailed
	}
| // TODO(caprita): restore the instance to unstopped upon failure? |
| |
| info, err := loadInstanceInfo(stoppedInstanceDir) |
| if err != nil { |
| return errOperationFailed |
| } |
| appStub, err := appcycle.BindAppCycle(info.AppCycleMgrName) |
| if err != nil { |
| vlog.Errorf("BindAppCycle(%v) failed: %v", info.AppCycleMgrName, err) |
| return errOperationFailed |
| } |
| stream, err := appStub.Stop(ctx) |
| if err != nil { |
| vlog.Errorf("Got error: %v", err) |
| return errOperationFailed |
| } |
| rstream := stream.RecvStream() |
| for rstream.Advance() { |
| vlog.VI(2).Infof("%v.Stop(%v) task update: %v", i.suffix, deadline, rstream.Value()) |
| } |
| if err := rstream.Err(); err != nil { |
| vlog.Errorf("Stream returned an error: %v", err) |
| return errOperationFailed |
| } |
| if err := stream.Finish(); err != nil { |
| vlog.Errorf("Got error: %v", err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *invoker) Suspend(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Suspend()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (i *invoker) Uninstall(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Uninstall()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| func (i *invoker) Update(call ipc.ServerContext) error { |
| vlog.VI(1).Infof("%v.Update()", i.suffix) |
| ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute) |
| defer cancel() |
| switch { |
| case i.suffix == "nm": |
| // This branch attempts to update the node manager itself. |
| i.internal.updatingMutex.Lock() |
		if i.internal.updating {
			i.internal.updatingMutex.Unlock()
			return errInProgress
		}
		i.internal.updating = true
		i.internal.updatingMutex.Unlock()
| err := i.updateNodeManager(ctx) |
| if err != nil { |
| i.internal.updatingMutex.Lock() |
| i.internal.updating = false |
| i.internal.updatingMutex.Unlock() |
| } |
| return err |
| case appsSuffix.MatchString(i.suffix): |
| // TODO(jsimsa): Implement. |
| return nil |
| default: |
| return errInvalidSuffix |
| } |
| |
| } |
| |
| func (i *invoker) UpdateTo(call ipc.ServerContext, von string) error { |
| vlog.VI(1).Infof("%v.UpdateTo(%q)", i.suffix, von) |
| // TODO(jsimsa): Implement. |
| return nil |
| } |
| |
| // CONFIG INTERFACE IMPLEMENTATION |
| |
| func (i *invoker) Set(_ ipc.ServerContext, key, value string) error { |
| vlog.VI(1).Infof("%v.Set(%v, %v)", i.suffix, key, value) |
| id := i.suffix |
| i.internal.channelsMutex.Lock() |
| if _, ok := i.internal.channels[id]; !ok { |
| i.internal.channelsMutex.Unlock() |
| return errInvalidSuffix |
| } |
| channel, ok := i.internal.channels[id][key] |
| i.internal.channelsMutex.Unlock() |
| if !ok { |
| return nil |
| } |
| channel <- value |
| return nil |
| } |