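// Package appcycle implements application lifecycle handling: delivering stop
// requests to registered waiters, forcing process exit, and reporting
// shutdown-task progress to registered trackers, locally and via an invoker
// suitable for serving remotely.
package appcycle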
2
3import (
4 "fmt"
5 "os"
6 "sync"
7
8 "veyron.io/veyron/veyron2"
9 "veyron.io/veyron/veyron2/ipc"
10 stub "veyron.io/veyron/veyron2/services/mgmt/appcycle"
11)
12
13type AppCycle struct {
14 sync.RWMutex
15 waiters []chan<- string
16 taskTrackers []chan<- veyron2.Task
17 task veyron2.Task
18 shutDown bool
Cosmos Nicolaou39e3ae52014-11-14 13:30:01 -080019 disp *invoker
20}
21
22type invoker struct {
23 ac *AppCycle
24}
25
Bogdan Caprita3e8f9642014-12-05 14:29:40 -080026func New() *AppCycle {
27 ac := new(AppCycle)
Cosmos Nicolaou39e3ae52014-11-14 13:30:01 -080028 ac.disp = &invoker{ac}
29 return ac
30}
31
32func (m *AppCycle) Shutdown() {
33 m.Lock()
34 defer m.Unlock()
35 if m.shutDown {
36 return
37 }
38 m.shutDown = true
39 for _, t := range m.taskTrackers {
40 close(t)
41 }
42 m.taskTrackers = nil
43}
44
45func (m *AppCycle) stop(msg string) {
46 m.RLock()
47 defer m.RUnlock()
48 if len(m.waiters) == 0 {
49 os.Exit(veyron2.UnhandledStopExitCode)
50 }
51 for _, w := range m.waiters {
52 select {
53 case w <- msg:
54 default:
55 }
56 }
57}
58
59func (m *AppCycle) Stop() {
60 m.stop(veyron2.LocalStop)
61}
62
63func (*AppCycle) ForceStop() {
64 os.Exit(veyron2.ForceStopExitCode)
65}
66
67func (m *AppCycle) WaitForStop(ch chan<- string) {
68 m.Lock()
69 defer m.Unlock()
70 m.waiters = append(m.waiters, ch)
71}
72
73func (m *AppCycle) TrackTask(ch chan<- veyron2.Task) {
74 m.Lock()
75 defer m.Unlock()
76 if m.shutDown {
77 close(ch)
78 return
79 }
80 m.taskTrackers = append(m.taskTrackers, ch)
81}
82
83func (m *AppCycle) advanceTask(progress, goal int32) {
84 m.Lock()
85 defer m.Unlock()
86 m.task.Goal += goal
87 m.task.Progress += progress
88 for _, t := range m.taskTrackers {
89 select {
90 case t <- m.task:
91 default:
92 // TODO(caprita): Make it such that the latest task
93 // update is always added to the channel even if channel
94 // is full. One way is to pull an element from t and
95 // then re-try the push.
96 }
97 }
98}
99
100func (m *AppCycle) AdvanceGoal(delta int32) {
101 if delta <= 0 {
102 return
103 }
104 m.advanceTask(0, delta)
105}
106
107func (m *AppCycle) AdvanceProgress(delta int32) {
108 if delta <= 0 {
109 return
110 }
111 m.advanceTask(delta, 0)
112}
113
114func (m *AppCycle) Remote() interface{} {
115 return stub.AppCycleServer(m.disp)
116}
117
118func (d *invoker) Stop(ctx stub.AppCycleStopContext) error {
119 // The size of the channel should be reasonably sized to expect not to
120 // miss updates while we're waiting for the stream to unblock.
121 ch := make(chan veyron2.Task, 10)
122 d.ac.TrackTask(ch)
123 // TODO(caprita): Include identity of Stop issuer in message.
124 d.ac.stop(veyron2.RemoteStop)
125 for {
126 task, ok := <-ch
127 if !ok {
128 // Channel closed, meaning process shutdown is imminent.
129 break
130 }
131 actask := stub.Task{Progress: task.Progress, Goal: task.Goal}
132 ctx.SendStream().Send(actask)
133 }
134 return nil
135}
136
137func (d *invoker) ForceStop(ipc.ServerContext) error {
138 d.ac.ForceStop()
139 return fmt.Errorf("ForceStop should not reply as the process should be dead")
140}