Asim Shankar | ae71344 | 2014-06-05 15:31:23 -0700 | [diff] [blame] | 1 | // Package publisher provides a type to publish names to a mounttable. |
| 2 | package publisher |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 3 | |
| 4 | // TODO(toddw): Add unittests. |
| 5 | |
| 6 | import ( |
| 7 | "fmt" |
| 8 | "strings" |
| 9 | "time" |
| 10 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 11 | "veyron.io/veyron/veyron2/context" |
| 12 | "veyron.io/veyron/veyron2/naming" |
| 13 | "veyron.io/veyron/veyron2/vlog" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 14 | ) |
| 15 | |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 16 | // TODO(cnicolaou): have the done channel return an error so |
| 17 | // that the publisher calls can return errors also. |
| 18 | |
// Publisher manages the publishing of servers in mounttable.
type Publisher interface {
	// AddServer adds a new server to be mounted. ServesMountTable should be
	// true if the server serves a mount table.
	AddServer(server string, ServesMountTable bool)
	// RemoveServer removes a server from the list of mounts.
	RemoveServer(server string)
	// AddName adds a new name for all servers to be mounted as.
	AddName(name string)
	// RemoveName removes a name.
	RemoveName(name string)
	// Published returns the published names rooted at the mounttable.
	Published() []string
	// DebugString returns a string representation of the publisher
	// meant solely for debugging.
	DebugString() string
	// Stop causes the publishing to stop and initiates unmounting of the
	// mounted names.  Stop performs the unmounting asynchronously, and
	// WaitForStop should be used to wait until it is done.
	// Once Stop is called Add/RemoveServer and Add/RemoveName become noops.
	Stop()
	// WaitForStop waits until all unmounting initiated by Stop is finished.
	WaitForStop()
}
| 42 | |
// mountTTLSlack is the slack the publisher adds to each TTL: every mount is
// issued with a TTL of the publishing period plus this amount.
const mountTTLSlack = 20 * time.Second
| 45 | |
// publisher maintains the name->server associations in the mounttable.  It
// spawns its own goroutine that does the actual work; the publisher itself
// simply coordinates concurrent access by sending and receiving on the
// appropriate channels.
type publisher struct {
	cmdchan  chan interface{} // value is one of {addServer,removeServer,addName,removeName,published,debug,stop}Cmd
	donechan chan struct{}    // closed when the publisher is done
}
| 54 | |
// addServerCmd is sent to the run loop to add a server.
type addServerCmd struct {
	server string        // server to add
	mt     bool          // true if server serves a mount table
	done   chan struct{} // closed when the cmd is done
}
| 60 | |
// removeServerCmd is sent to the run loop to remove a server.
type removeServerCmd struct {
	server string        // server to remove
	done   chan struct{} // closed when the cmd is done
}
| 65 | |
// addNameCmd is sent to the run loop to add a name.
type addNameCmd struct {
	name string        // name to add
	done chan struct{} // closed when the cmd is done
}
| 70 | |
// removeNameCmd is sent to the run loop to remove a name.
type removeNameCmd struct {
	name string        // name to remove
	done chan struct{} // closed when the cmd is done
}
| 75 | |
// debugCmd is sent to the run loop to request the debug string; the channel
// itself carries the reply.
type debugCmd chan string // debug string is sent when the cmd is done
| 77 | |
// publishedCmd is sent to the run loop to request the published names; the
// channel itself carries the reply.
type publishedCmd chan []string // published names are sent when cmd is done
| 79 | |
// stopCmd carries no data; receiving it is the signal to shut down.
type stopCmd struct{} // sent to the runloop when we want it to exit.
| 81 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 82 | // New returns a new publisher that updates mounts on ns every period. |
| 83 | func New(ctx context.T, ns naming.Namespace, period time.Duration) Publisher { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 84 | p := &publisher{ |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 85 | cmdchan: make(chan interface{}), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 86 | donechan: make(chan struct{}), |
| 87 | } |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 88 | go runLoop(ctx, p.cmdchan, p.donechan, ns, period) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 89 | return p |
| 90 | } |
| 91 | |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 92 | func (p *publisher) sendCmd(cmd interface{}) bool { |
| 93 | select { |
| 94 | case p.cmdchan <- cmd: |
| 95 | return true |
| 96 | case <-p.donechan: |
| 97 | return false |
| 98 | } |
| 99 | } |
| 100 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 101 | func (p *publisher) AddServer(server string, mt bool) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 102 | done := make(chan struct{}) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 103 | if p.sendCmd(addServerCmd{server, mt, done}) { |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 104 | <-done |
| 105 | } |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 106 | } |
| 107 | |
| 108 | func (p *publisher) RemoveServer(server string) { |
| 109 | done := make(chan struct{}) |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 110 | if p.sendCmd(removeServerCmd{server, done}) { |
| 111 | <-done |
| 112 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 113 | } |
| 114 | |
| 115 | func (p *publisher) AddName(name string) { |
| 116 | done := make(chan struct{}) |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 117 | if p.sendCmd(addNameCmd{name, done}) { |
| 118 | <-done |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | func (p *publisher) RemoveName(name string) { |
| 123 | done := make(chan struct{}) |
| 124 | if p.sendCmd(removeNameCmd{name, done}) { |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 125 | <-done |
| 126 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 127 | } |
| 128 | |
| 129 | // Published returns the published name(s) for this publisher, where each name |
| 130 | // is rooted at the mount table(s) where the name has been mounted. |
| 131 | // The names are returned grouped by published name, where all the names |
| 132 | // corresponding the the mount table replicas are grouped together. |
| 133 | func (p *publisher) Published() []string { |
| 134 | published := make(publishedCmd) |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 135 | if p.sendCmd(published) { |
| 136 | return <-published |
| 137 | } |
| 138 | return []string{} |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 139 | } |
| 140 | |
| 141 | func (p *publisher) DebugString() (dbg string) { |
| 142 | debug := make(debugCmd) |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 143 | if p.sendCmd(debug) { |
| 144 | dbg = <-debug |
| 145 | } else { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 146 | dbg = "stopped" |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 147 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 148 | return |
| 149 | } |
| 150 | |
| 151 | // Stop stops the publisher, which in practical terms means un-mounting |
| 152 | // everything and preventing any further publish operations. The caller can |
| 153 | // be confident that no new names or servers will get published once Stop |
| 154 | // returns. To wait for existing mounts to be cleaned up, use WaitForStop. |
| 155 | // |
| 156 | // Stopping the publisher is irreversible. |
| 157 | // |
| 158 | // Once the publisher is stopped, any further calls on its public methods |
| 159 | // (including Stop) are no-ops. |
| 160 | func (p *publisher) Stop() { |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 161 | p.sendCmd(stopCmd{}) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 162 | } |
| 163 | |
// WaitForStop blocks until donechan is closed, which the run loop does after
// it has handled a stop command and attempted to unmount everything.
func (p *publisher) WaitForStop() {
	<-p.donechan
}
| 167 | |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 168 | func runLoop(ctx context.T, cmdchan chan interface{}, donechan chan struct{}, ns naming.Namespace, period time.Duration) { |
Cosmos Nicolaou | ef323db | 2014-09-07 22:13:28 -0700 | [diff] [blame] | 169 | vlog.VI(2).Info("ipc pub: start runLoop") |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 170 | state := newPubState(ctx, ns, period) |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 171 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 172 | for { |
| 173 | select { |
Matt Rosencrantz | cd8ffda | 2014-09-23 16:54:15 -0700 | [diff] [blame] | 174 | case cmd := <-cmdchan: |
| 175 | switch tcmd := cmd.(type) { |
| 176 | case stopCmd: |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 177 | state.unmountAll() |
Matt Rosencrantz | 4a93c46 | 2014-09-23 17:11:25 -0700 | [diff] [blame] | 178 | close(donechan) |
Cosmos Nicolaou | ef323db | 2014-09-07 22:13:28 -0700 | [diff] [blame] | 179 | vlog.VI(2).Info("ipc pub: exit runLoop") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 180 | return |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 181 | case addServerCmd: |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 182 | state.addServer(tcmd.server, tcmd.mt) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 183 | close(tcmd.done) |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 184 | case removeServerCmd: |
| 185 | state.removeServer(tcmd.server) |
| 186 | close(tcmd.done) |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 187 | case addNameCmd: |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 188 | state.addName(tcmd.name) |
| 189 | close(tcmd.done) |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 190 | case removeNameCmd: |
| 191 | state.removeName(tcmd.name) |
| 192 | close(tcmd.done) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 193 | case publishedCmd: |
| 194 | tcmd <- state.published() |
| 195 | close(tcmd) |
| 196 | case debugCmd: |
| 197 | tcmd <- state.debugString() |
| 198 | close(tcmd) |
| 199 | } |
| 200 | case <-state.timeout(): |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 201 | // Sync everything once every period, to refresh the ttls. |
| 202 | state.sync() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 203 | } |
| 204 | } |
| 205 | } |
| 206 | |
// pubState maintains the state for our periodic mounts.  It is not thread-safe;
// it's only used in the sequential publisher runLoop.
type pubState struct {
	ctx      context.T
	ns       naming.Namespace
	period   time.Duration
	deadline time.Time                 // deadline for the next sync call
	names    map[string]bool           // names that have been added
	servers  map[string]bool           // servers that have been added, true if server is a mount table server
	mounts   map[mountKey]*mountStatus // map each (name,server) to its status
}
| 219 | |
// mountKey identifies a single mount: one name paired with one server.
type mountKey struct {
	name   string
	server string
}
| 224 | |
// mountStatus records the outcome of the most recent mount and unmount
// attempts for one (name, server) pair.
type mountStatus struct {
	lastMount      time.Time // when the last mount was attempted
	lastMountErr   error     // result of the last mount attempt
	lastUnmount    time.Time // when the last unmount was attempted
	lastUnmountErr error     // result of the last unmount attempt; non-nil makes sync retry the unmount
}
| 231 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 232 | func newPubState(ctx context.T, ns naming.Namespace, period time.Duration) *pubState { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 233 | return &pubState{ |
Matt Rosencrantz | 7e68d5a | 2014-06-11 15:28:51 +0000 | [diff] [blame] | 234 | ctx: ctx, |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 235 | ns: ns, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 236 | period: period, |
| 237 | deadline: time.Now().Add(period), |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 238 | names: make(map[string]bool), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 239 | servers: make(map[string]bool), |
| 240 | mounts: make(map[mountKey]*mountStatus), |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | func (ps *pubState) timeout() <-chan time.Time { |
| 245 | return time.After(ps.deadline.Sub(time.Now())) |
| 246 | } |
| 247 | |
| 248 | func (ps *pubState) addName(name string) { |
| 249 | // Each non-dup name that is added causes new mounts to be created for all |
| 250 | // existing servers. |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 251 | if ps.names[name] { |
| 252 | return |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 253 | } |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 254 | ps.names[name] = true |
David Why Use Two When One Will Do Presotto | c28686e | 2014-11-05 11:19:29 -0800 | [diff] [blame] | 255 | for server, servesMT := range ps.servers { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 256 | status := new(mountStatus) |
| 257 | ps.mounts[mountKey{name, server}] = status |
David Why Use Two When One Will Do Presotto | c28686e | 2014-11-05 11:19:29 -0800 | [diff] [blame] | 258 | ps.mount(name, server, status, servesMT) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 259 | } |
| 260 | } |
| 261 | |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 262 | func (ps *pubState) removeName(name string) { |
| 263 | if !ps.names[name] { |
| 264 | return |
| 265 | } |
| 266 | for server, _ := range ps.servers { |
| 267 | if status, exists := ps.mounts[mountKey{name, server}]; exists { |
| 268 | ps.unmount(name, server, status) |
| 269 | } |
| 270 | } |
| 271 | delete(ps.names, name) |
| 272 | } |
| 273 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 274 | func (ps *pubState) addServer(server string, servesMT bool) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 275 | // Each non-dup server that is added causes new mounts to be created for all |
| 276 | // existing names. |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 277 | if !ps.servers[server] { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 278 | ps.servers[server] = servesMT |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 279 | for name, _ := range ps.names { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 280 | status := new(mountStatus) |
| 281 | ps.mounts[mountKey{name, server}] = status |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 282 | ps.mount(name, server, status, servesMT) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 283 | } |
| 284 | } |
| 285 | } |
| 286 | |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 287 | func (ps *pubState) removeServer(server string) { |
| 288 | if _, exists := ps.servers[server]; !exists { |
| 289 | return |
| 290 | } |
| 291 | delete(ps.servers, server) |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 292 | for name, _ := range ps.names { |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 293 | if status, exists := ps.mounts[mountKey{name, server}]; exists { |
| 294 | ps.unmount(name, server, status) |
| 295 | } |
| 296 | } |
| 297 | } |
| 298 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 299 | func (ps *pubState) mount(name, server string, status *mountStatus, servesMT bool) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 300 | // Always mount with ttl = period + slack, regardless of whether this is |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 301 | // triggered by a newly added server or name, or by sync. The next call |
| 302 | // to sync will occur within the next period, and refresh all mounts. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 303 | ttl := ps.period + mountTTLSlack |
| 304 | status.lastMount = time.Now() |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 305 | status.lastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTableOpt(servesMT)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 306 | if status.lastMountErr != nil { |
| 307 | vlog.Errorf("ipc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.lastMountErr) |
| 308 | } else { |
| 309 | vlog.VI(2).Infof("ipc pub: mount(%v, %v, %v)", name, server, ttl) |
| 310 | } |
| 311 | } |
| 312 | |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 313 | func (ps *pubState) sync() { |
| 314 | ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 315 | for key, status := range ps.mounts { |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 316 | if status.lastUnmountErr != nil { |
| 317 | // Desired state is "unmounted", failed at previous attempt. Retry. |
| 318 | ps.unmount(key.name, key.server, status) |
| 319 | } else { |
David Why Use Two When One Will Do Presotto | c28686e | 2014-11-05 11:19:29 -0800 | [diff] [blame] | 320 | ps.mount(key.name, key.server, status, ps.servers[key.server]) |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 321 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 322 | } |
| 323 | } |
| 324 | |
| 325 | func (ps *pubState) unmount(name, server string, status *mountStatus) { |
| 326 | status.lastUnmount = time.Now() |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 327 | status.lastUnmountErr = ps.ns.Unmount(ps.ctx, name, server) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 328 | if status.lastUnmountErr != nil { |
| 329 | vlog.Errorf("ipc pub: couldn't unmount(%v, %v): %v", name, server, status.lastUnmountErr) |
| 330 | } else { |
| 331 | vlog.VI(2).Infof("ipc pub: unmount(%v, %v)", name, server) |
Asim Shankar | 0ea02ab | 2014-06-09 11:39:24 -0700 | [diff] [blame] | 332 | delete(ps.mounts, mountKey{name, server}) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 333 | } |
| 334 | } |
| 335 | |
| 336 | func (ps *pubState) unmountAll() { |
| 337 | for key, status := range ps.mounts { |
| 338 | ps.unmount(key.name, key.server, status) |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | func (ps *pubState) published() []string { |
| 343 | var ret []string |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 344 | for name, _ := range ps.names { |
David Why Use Two When One Will Do Presotto | 59a254c | 2014-10-30 13:09:29 -0700 | [diff] [blame] | 345 | e, err := ps.ns.ResolveToMountTableX(ps.ctx, name) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 346 | if err != nil { |
| 347 | vlog.Errorf("ipc pub: couldn't resolve %v to mount table: %v", name, err) |
| 348 | continue |
| 349 | } |
David Why Use Two When One Will Do Presotto | 59a254c | 2014-10-30 13:09:29 -0700 | [diff] [blame] | 350 | if len(e.Servers) == 0 { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 351 | vlog.Errorf("ipc pub: no mount table found for %v", name) |
| 352 | continue |
| 353 | } |
David Why Use Two When One Will Do Presotto | 59a254c | 2014-10-30 13:09:29 -0700 | [diff] [blame] | 354 | for _, s := range e.Servers { |
| 355 | ret = append(ret, naming.JoinAddressName(s.Server, e.Name)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 356 | } |
| 357 | } |
| 358 | return ret |
| 359 | } |
| 360 | |
Cosmos Nicolaou | 92dba58 | 2014-11-05 17:24:10 -0800 | [diff] [blame] | 361 | // TODO(toddw): sort the names/servers so that the output order is stable. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 362 | func (ps *pubState) debugString() string { |
| 363 | l := make([]string, 2+len(ps.mounts)) |
| 364 | l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline)) |
| 365 | l = append(l, "==============================Mounts============================================") |
| 366 | for key, status := range ps.mounts { |
| 367 | l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v) unmount(%v, %v)", key.name, key.server, status.lastMount, status.lastMountErr, status.lastUnmount, status.lastUnmountErr)) |
| 368 | } |
| 369 | return strings.Join(l, "\n") |
| 370 | } |