package rt

import (
	"fmt"
	"os"
	"sync"
	"time"

	"veyron/runtimes/google/appcycle"
	"veyron/services/mgmt/lib/exec"
	"veyron2"
	"veyron2/ipc"
	"veyron2/mgmt"
	"veyron2/naming"
)
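
// mgmtImpl tracks the runtime's application-cycle state: registered stop
// waiters, task trackers, the current task counters, and the server that
// exposes the AppCycle service to the parent process.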
type mgmtImpl struct {
	sync.RWMutex
	waiters      []chan<- string
	taskTrackers []chan<- veyron2.Task
	task         veyron2.Task
	shutDown     bool
	rt           *vrt
	server       ipc.Server // Serves AppCycle service.
}

// parentName returns the object name of the parent's Config service, to
// which we should report the object name of the app cycle service.
// Currently, this can be configured either via an environment variable or
// via the exec config passed from the parent.
func parentName() (name string) {
	name = os.Getenv(mgmt.ParentNodeManagerConfigKey)
	if len(name) > 0 {
		return
	}
	handle, _ := exec.GetChildHandle()
	if handle == nil {
		return
	}
	name, _ = handle.Config.Get(mgmt.ParentNodeManagerConfigKey)
	return
}
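
// init sets up the AppCycle service if a parent name is configured: it
// starts a server on the loopback interface and reports the resulting
// object name back to the parent.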
func (m *mgmtImpl) init(rt *vrt) error {
	m.rt = rt
	parentName := parentName()
	if len(parentName) == 0 {
		return nil
	}
	var err error
	if m.server, err = rt.NewServer(); err != nil {
		return err
	}
	// TODO(caprita): We should pick the address to listen on from config.
	var ep naming.Endpoint
	if ep, err = m.server.Listen("tcp", "127.0.0.1:0"); err != nil {
		return err
	}
	if err := m.server.Serve("", ipc.SoloDispatcher(appcycle.NewServerAppCycle(m), nil)); err != nil {
		return err
	}
	return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
}
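
// callbackToParent invokes Set on the parent's Config service to communicate
// the object name under which our AppCycle service is reachable.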
func (m *mgmtImpl) callbackToParent(parentName, myName string) error {
	ctx, _ := m.rt.NewContext().WithTimeout(10 * time.Second)
	call, err := m.rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
	if err != nil {
		return err
	}
	if ierr := call.Finish(&err); ierr != nil {
		return ierr
	}
	return err
}
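
// shutdown marks the runtime as shut down, closes all task tracker channels,
// and stops the AppCycle server. Calling it more than once is a no-op.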
func (m *mgmtImpl) shutdown() {
	m.Lock()
	if m.shutDown {
		m.Unlock()
		return
	}
	m.shutDown = true
	for _, t := range m.taskTrackers {
		close(t)
	}
	m.taskTrackers = nil
	server := m.server
	m.Unlock()
	if server != nil {
		server.Stop()
	}
}
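
// stop delivers msg to every registered stop waiter without blocking; if no
// waiters are registered, the process exits with UnhandledStopExitCode.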
func (rt *vrt) stop(msg string) {
	rt.mgmt.RLock()
	defer rt.mgmt.RUnlock()
	if len(rt.mgmt.waiters) == 0 {
		os.Exit(veyron2.UnhandledStopExitCode)
	}
	for _, w := range rt.mgmt.waiters {
		select {
		case w <- msg:
		default:
		}
	}
}
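
// Stop initiates a graceful local stop.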
func (rt *vrt) Stop() {
	rt.stop(veyron2.LocalStop)
}
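
// ForceStop exits the process immediately.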
func (*vrt) ForceStop() {
	os.Exit(veyron2.ForceStopExitCode)
}
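
// WaitForStop registers ch to receive the messages of future stop requests.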
func (rt *vrt) WaitForStop(ch chan<- string) {
	rt.mgmt.Lock()
	defer rt.mgmt.Unlock()
	rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
}
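
// TrackTask registers ch to receive task updates; if shutdown has already
// started, ch is closed immediately instead.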
func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
	rt.mgmt.Lock()
	defer rt.mgmt.Unlock()
	if rt.mgmt.shutDown {
		close(ch)
		return
	}
	rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
}
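
// advanceTask applies the given deltas to the current task and sends the
// updated task to every tracker, dropping the update for trackers whose
// channels are full.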
func (rt *vrt) advanceTask(progress, goal int) {
	rt.mgmt.Lock()
	defer rt.mgmt.Unlock()
	rt.mgmt.task.Goal += goal
	rt.mgmt.task.Progress += progress
	for _, t := range rt.mgmt.taskTrackers {
		select {
		case t <- rt.mgmt.task:
		default:
			// TODO(caprita): Make it such that the latest task
			// update is always added to the channel even if channel
			// is full. One way is to pull an element from t and
			// then re-try the push.
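			//
			// A rough sketch of that idea, assuming the trackers
			// were stored as bidirectional buffered channels (they
			// are send-only here, so this does not compile as-is):
			//
			//	select {
			//	case t <- rt.mgmt.task:
			//	default:
			//		select {
			//		case <-t: // Drop the stale update.
			//		default: // Receiver drained it already.
			//		}
			//		t <- rt.mgmt.task
			//	}
			//
			// Since the lock makes us the only sender, draining one
			// element (or finding the buffer already drained)
			// guarantees the retry has room on a buffered channel.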
		}
	}
}
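
// AdvanceGoal increases the goal of the current task by delta; non-positive
// deltas are ignored.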
func (rt *vrt) AdvanceGoal(delta int) {
	if delta <= 0 {
		return
	}
	rt.advanceTask(0, delta)
}
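
// AdvanceProgress increases the progress of the current task by delta;
// non-positive deltas are ignored.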
func (rt *vrt) AdvanceProgress(delta int) {
	if delta <= 0 {
		return
	}
	rt.advanceTask(delta, 0)
}
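
// Stop implements the remote side of the AppCycle service: it triggers a
// local stop and then streams task updates back to the caller until
// shutdown closes the tracker channel.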
func (m *mgmtImpl) Stop(_ ipc.ServerContext, stream appcycle.AppCycleServiceStopStream) error {
	// TODO(caprita): Include the identity of the Stop issuer in the message.
	m.rt.stop(veyron2.RemoteStop)
	// Buffer the channel generously so that we are unlikely to miss task
	// updates while we are blocked sending on the stream.
	ch := make(chan veyron2.Task, 10)
	m.rt.TrackTask(ch)
	for {
		task, ok := <-ch
		if !ok {
			// The channel was closed, meaning process shutdown is
			// imminent.
			break
		}
		stream.Send(task)
	}
	return nil
}
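
// ForceStop implements the remote side of the AppCycle service's ForceStop:
// it exits the process outright, so the call should never reply.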
func (m *mgmtImpl) ForceStop(ipc.ServerContext) error {
	m.rt.ForceStop()
	return fmt.Errorf("ForceStop should not reply as the process should be dead")
}