blob: eece0f7988d04fc4489310da5127292757405a1a [file] [log] [blame]
// Package publisher provides a type to publish names to a mounttable.
2package publisher
Jiri Simsa5293dcb2014-05-10 09:56:38 -07003
// TODO(toddw): Add unit tests.
5
6import (
7 "fmt"
8 "strings"
9 "time"
10
Jiri Simsa519c5072014-09-17 21:37:57 -070011 "veyron.io/veyron/veyron2/context"
12 "veyron.io/veyron/veyron2/naming"
13 "veyron.io/veyron/veyron2/vlog"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014)
15
// TODO(cnicolaou): have the done channel return an error so
// that the publisher calls can return errors also.
18
// Publisher manages the publishing of servers in mounttable.
type Publisher interface {
	// AddServer adds a new server to be mounted. servesMountTable is true
	// if the server serves a mount table.
	AddServer(server string, servesMountTable bool)
	// RemoveServer removes a server from the list of mounts.
	RemoveServer(server string)
	// AddName adds a new name for all servers to be mounted as.
	AddName(name string)
	// RemoveName removes a name.
	RemoveName(name string)
	// Published returns the published names rooted at the mounttable.
	Published() []string
	// DebugString returns a string representation of the publisher
	// meant solely for debugging.
	DebugString() string
	// Stop causes the publishing to stop and initiates unmounting of the
	// mounted names. Stop performs the unmounting asynchronously, and
	// WaitForStop should be used to wait until it is done.
	// Once Stop is called, AddServer, RemoveServer, AddName and RemoveName
	// become no-ops.
	Stop()
	// WaitForStop waits until all unmounting initiated by Stop is finished.
	WaitForStop()
}
42
// mountTTLSlack is the amount of slack the publisher adds to each mount TTL.
const mountTTLSlack = 20 * time.Second
45
// publisher maintains the name->server associations in the mounttable. The
// real work happens in a goroutine of its own; the publisher value merely
// coordinates concurrent callers by sending commands to, and receiving
// replies from, that goroutine over channels.
type publisher struct {
	// cmdchan carries commands to the run loop; each value is one of the
	// *Cmd types (add/remove server, add/remove name, published, debug, stop).
	cmdchan chan interface{}
	// donechan is closed when the publisher is done.
	donechan chan struct{}
}
54
// Commands understood by the publisher run loop. The struct commands carry
// a done channel that the run loop closes once the command has been handled;
// the channel-typed commands double as their own reply channel.
type (
	// addServerCmd asks the run loop to add a server.
	addServerCmd struct {
		server string        // server to add
		mt     bool          // true if server serves a mount table
		done   chan struct{} // closed when the cmd is done
	}

	// removeServerCmd asks the run loop to remove a server.
	removeServerCmd struct {
		server string        // server to remove
		done   chan struct{} // closed when the cmd is done
	}

	// addNameCmd asks the run loop to add a name.
	addNameCmd struct {
		name string        // name to add
		done chan struct{} // closed when the cmd is done
	}

	// removeNameCmd asks the run loop to remove a name.
	removeNameCmd struct {
		name string        // name to remove
		done chan struct{} // closed when the cmd is done
	}

	// debugCmd receives the debug string when the cmd is done.
	debugCmd chan string

	// publishedCmd receives the published names when the cmd is done.
	publishedCmd chan []string

	// stopCmd is sent to the run loop when we want it to exit.
	stopCmd struct{}
)
81
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070082// New returns a new publisher that updates mounts on ns every period.
83func New(ctx context.T, ns naming.Namespace, period time.Duration) Publisher {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070084 p := &publisher{
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -070085 cmdchan: make(chan interface{}),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070086 donechan: make(chan struct{}),
87 }
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -070088 go runLoop(ctx, p.cmdchan, p.donechan, ns, period)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070089 return p
90}
91
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -070092func (p *publisher) sendCmd(cmd interface{}) bool {
93 select {
94 case p.cmdchan <- cmd:
95 return true
96 case <-p.donechan:
97 return false
98 }
99}
100
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700101func (p *publisher) AddServer(server string, mt bool) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700102 done := make(chan struct{})
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700103 if p.sendCmd(addServerCmd{server, mt, done}) {
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700104 <-done
105 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700106}
107
108func (p *publisher) RemoveServer(server string) {
109 done := make(chan struct{})
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700110 if p.sendCmd(removeServerCmd{server, done}) {
111 <-done
112 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700113}
114
115func (p *publisher) AddName(name string) {
116 done := make(chan struct{})
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800117 if p.sendCmd(addNameCmd{name, done}) {
118 <-done
119 }
120}
121
122func (p *publisher) RemoveName(name string) {
123 done := make(chan struct{})
124 if p.sendCmd(removeNameCmd{name, done}) {
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700125 <-done
126 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700127}
128
129// Published returns the published name(s) for this publisher, where each name
130// is rooted at the mount table(s) where the name has been mounted.
131// The names are returned grouped by published name, where all the names
132// corresponding the the mount table replicas are grouped together.
133func (p *publisher) Published() []string {
134 published := make(publishedCmd)
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700135 if p.sendCmd(published) {
136 return <-published
137 }
138 return []string{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700139}
140
141func (p *publisher) DebugString() (dbg string) {
142 debug := make(debugCmd)
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700143 if p.sendCmd(debug) {
144 dbg = <-debug
145 } else {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700146 dbg = "stopped"
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700147 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700148 return
149}
150
151// Stop stops the publisher, which in practical terms means un-mounting
152// everything and preventing any further publish operations. The caller can
153// be confident that no new names or servers will get published once Stop
154// returns. To wait for existing mounts to be cleaned up, use WaitForStop.
155//
156// Stopping the publisher is irreversible.
157//
158// Once the publisher is stopped, any further calls on its public methods
159// (including Stop) are no-ops.
160func (p *publisher) Stop() {
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700161 p.sendCmd(stopCmd{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700162}
163
164func (p *publisher) WaitForStop() {
165 <-p.donechan
166}
167
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700168func runLoop(ctx context.T, cmdchan chan interface{}, donechan chan struct{}, ns naming.Namespace, period time.Duration) {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700169 vlog.VI(2).Info("ipc pub: start runLoop")
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700170 state := newPubState(ctx, ns, period)
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700171
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700172 for {
173 select {
Matt Rosencrantzcd8ffda2014-09-23 16:54:15 -0700174 case cmd := <-cmdchan:
175 switch tcmd := cmd.(type) {
176 case stopCmd:
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700177 state.unmountAll()
Matt Rosencrantz4a93c462014-09-23 17:11:25 -0700178 close(donechan)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700179 vlog.VI(2).Info("ipc pub: exit runLoop")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700180 return
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700181 case addServerCmd:
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700182 state.addServer(tcmd.server, tcmd.mt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700183 close(tcmd.done)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700184 case removeServerCmd:
185 state.removeServer(tcmd.server)
186 close(tcmd.done)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800187 case addNameCmd:
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700188 state.addName(tcmd.name)
189 close(tcmd.done)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800190 case removeNameCmd:
191 state.removeName(tcmd.name)
192 close(tcmd.done)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700193 case publishedCmd:
194 tcmd <- state.published()
195 close(tcmd)
196 case debugCmd:
197 tcmd <- state.debugString()
198 close(tcmd)
199 }
200 case <-state.timeout():
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700201 // Sync everything once every period, to refresh the ttls.
202 state.sync()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700203 }
204 }
205}
206
207// pubState maintains the state for our periodic mounts. It is not thread-safe;
208// it's only used in the sequential publisher runLoop.
209type pubState struct {
Matt Rosencrantz29147f72014-06-06 12:46:01 -0700210 ctx context.T
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700211 ns naming.Namespace
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700212 period time.Duration
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800213 deadline time.Time // deadline for the next sync call
214 names map[string]bool // names that have been added
215 servers map[string]bool // servers that have been added, true
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800216 mounts map[mountKey]*mountStatus // map each (name,server) to its status
David Why Use Two When One Will Do Presottoc28686e2014-11-05 11:19:29 -0800217 // if server is a mount table server
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700218}
219
type (
	// mountKey identifies one mount: a (name, server) pair.
	mountKey struct {
		name   string
		server string
	}

	// mountStatus records when the last mount and unmount attempts were
	// made for one mount, and the error (if any) each of them returned.
	mountStatus struct {
		lastMount      time.Time
		lastMountErr   error
		lastUnmount    time.Time
		lastUnmountErr error
	}
)
231
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700232func newPubState(ctx context.T, ns naming.Namespace, period time.Duration) *pubState {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700233 return &pubState{
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000234 ctx: ctx,
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700235 ns: ns,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700236 period: period,
237 deadline: time.Now().Add(period),
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800238 names: make(map[string]bool),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700239 servers: make(map[string]bool),
240 mounts: make(map[mountKey]*mountStatus),
241 }
242}
243
244func (ps *pubState) timeout() <-chan time.Time {
245 return time.After(ps.deadline.Sub(time.Now()))
246}
247
248func (ps *pubState) addName(name string) {
249 // Each non-dup name that is added causes new mounts to be created for all
250 // existing servers.
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800251 if ps.names[name] {
252 return
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700253 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800254 ps.names[name] = true
David Why Use Two When One Will Do Presottoc28686e2014-11-05 11:19:29 -0800255 for server, servesMT := range ps.servers {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700256 status := new(mountStatus)
257 ps.mounts[mountKey{name, server}] = status
David Why Use Two When One Will Do Presottoc28686e2014-11-05 11:19:29 -0800258 ps.mount(name, server, status, servesMT)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700259 }
260}
261
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800262func (ps *pubState) removeName(name string) {
263 if !ps.names[name] {
264 return
265 }
266 for server, _ := range ps.servers {
267 if status, exists := ps.mounts[mountKey{name, server}]; exists {
268 ps.unmount(name, server, status)
269 }
270 }
271 delete(ps.names, name)
272}
273
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700274func (ps *pubState) addServer(server string, servesMT bool) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700275 // Each non-dup server that is added causes new mounts to be created for all
276 // existing names.
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800277 if !ps.servers[server] {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700278 ps.servers[server] = servesMT
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800279 for name, _ := range ps.names {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700280 status := new(mountStatus)
281 ps.mounts[mountKey{name, server}] = status
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700282 ps.mount(name, server, status, servesMT)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700283 }
284 }
285}
286
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700287func (ps *pubState) removeServer(server string) {
288 if _, exists := ps.servers[server]; !exists {
289 return
290 }
291 delete(ps.servers, server)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800292 for name, _ := range ps.names {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700293 if status, exists := ps.mounts[mountKey{name, server}]; exists {
294 ps.unmount(name, server, status)
295 }
296 }
297}
298
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700299func (ps *pubState) mount(name, server string, status *mountStatus, servesMT bool) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700300 // Always mount with ttl = period + slack, regardless of whether this is
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700301 // triggered by a newly added server or name, or by sync. The next call
302 // to sync will occur within the next period, and refresh all mounts.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700303 ttl := ps.period + mountTTLSlack
304 status.lastMount = time.Now()
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700305 status.lastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTableOpt(servesMT))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700306 if status.lastMountErr != nil {
307 vlog.Errorf("ipc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.lastMountErr)
308 } else {
309 vlog.VI(2).Infof("ipc pub: mount(%v, %v, %v)", name, server, ttl)
310 }
311}
312
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700313func (ps *pubState) sync() {
314 ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700315 for key, status := range ps.mounts {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700316 if status.lastUnmountErr != nil {
317 // Desired state is "unmounted", failed at previous attempt. Retry.
318 ps.unmount(key.name, key.server, status)
319 } else {
David Why Use Two When One Will Do Presottoc28686e2014-11-05 11:19:29 -0800320 ps.mount(key.name, key.server, status, ps.servers[key.server])
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700321 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700322 }
323}
324
325func (ps *pubState) unmount(name, server string, status *mountStatus) {
326 status.lastUnmount = time.Now()
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700327 status.lastUnmountErr = ps.ns.Unmount(ps.ctx, name, server)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700328 if status.lastUnmountErr != nil {
329 vlog.Errorf("ipc pub: couldn't unmount(%v, %v): %v", name, server, status.lastUnmountErr)
330 } else {
331 vlog.VI(2).Infof("ipc pub: unmount(%v, %v)", name, server)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700332 delete(ps.mounts, mountKey{name, server})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700333 }
334}
335
336func (ps *pubState) unmountAll() {
337 for key, status := range ps.mounts {
338 ps.unmount(key.name, key.server, status)
339 }
340}
341
342func (ps *pubState) published() []string {
343 var ret []string
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800344 for name, _ := range ps.names {
David Why Use Two When One Will Do Presotto59a254c2014-10-30 13:09:29 -0700345 e, err := ps.ns.ResolveToMountTableX(ps.ctx, name)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700346 if err != nil {
347 vlog.Errorf("ipc pub: couldn't resolve %v to mount table: %v", name, err)
348 continue
349 }
David Why Use Two When One Will Do Presotto59a254c2014-10-30 13:09:29 -0700350 if len(e.Servers) == 0 {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700351 vlog.Errorf("ipc pub: no mount table found for %v", name)
352 continue
353 }
David Why Use Two When One Will Do Presotto59a254c2014-10-30 13:09:29 -0700354 for _, s := range e.Servers {
355 ret = append(ret, naming.JoinAddressName(s.Server, e.Name))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700356 }
357 }
358 return ret
359}
360
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800361// TODO(toddw): sort the names/servers so that the output order is stable.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700362func (ps *pubState) debugString() string {
363 l := make([]string, 2+len(ps.mounts))
364 l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
365 l = append(l, "==============================Mounts============================================")
366 for key, status := range ps.mounts {
367 l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v) unmount(%v, %v)", key.name, key.server, status.lastMount, status.lastMountErr, status.lastUnmount, status.lastUnmountErr))
368 }
369 return strings.Join(l, "\n")
370}