ref: Introduce valid channel to publisher and reimplement publisher
so that calls are truly async.
This uncovered quite a few tests that were passing only because the
old publisher was semi-synchronous when it used an unbuffered channel.
Also introduce PublisherState to the PublisherStatus (formerly Mounts).
Fixes vanadium/issues#1148
MultiPart: 3/4
Change-Id: Ifa906b5ee0fb1676d1ba1fb6977979a79060bbb4
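For illustration, a minimal sketch (not applied by this patch) of how the
reworked publisher is intended to be used. The *context.T and namespace.T are
assumed to come from the runtime, and the function name, mount name, and
server address below are hypothetical:

    // examplePublish is a hypothetical caller of the new publisher API.
    // Imports assumed: time, v.io/v23/context, v.io/v23/namespace,
    // v.io/x/ref/lib/publisher.
    func examplePublish(ctx *context.T, ns namespace.T) {
        // The publisher is shut down by cancelling the context passed to New.
        pubctx, cancel := context.WithCancel(ctx)
        pub := publisher.New(pubctx, ns, time.Second)
        pub.AddName("foo", false, false)
        pub.AddServer("foo:8000")
        // Calls are now truly asynchronous: Status returns a snapshot plus a
        // channel that is closed when the snapshot becomes stale, so callers
        // poll instead of relying on the old semi-synchronous behavior.
        for {
            entries, dirty := pub.Status()
            done := len(entries) > 0
            for _, e := range entries {
                if e.LastState != e.DesiredState {
                    done = false
                }
            }
            if done {
                break
            }
            <-dirty
        }
        cancel()       // initiates unmounting of all published names
        <-pub.Closed() // closed once all unmounts have been attempted
    }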
diff --git a/examples/rps/rpsbot/impl_test.go b/examples/rps/rpsbot/impl_test.go
index fdf6bd0..1a1411a 100644
--- a/examples/rps/rpsbot/impl_test.go
+++ b/examples/rps/rpsbot/impl_test.go
@@ -19,6 +19,7 @@
"v.io/v23"
"v.io/v23/context"
"v.io/x/ref/examples/rps"
+ "v.io/x/ref/test/testutil"
"v.io/x/ref/test/v23test"
)
@@ -32,6 +33,8 @@
for _, n := range names[1:] {
server.AddName(n)
}
+ testutil.WaitForServerPublished(server)
+
return rpsService, func() {
if err := server.Stop(); err != nil {
t.Fatalf("Stop() failed: %v", err)
diff --git a/lib/publisher/publisher.go b/lib/publisher/publisher.go
index 7e45d5d..eb69bb7 100644
--- a/lib/publisher/publisher.go
+++ b/lib/publisher/publisher.go
@@ -5,12 +5,10 @@
// Package publisher provides a type to publish names to a mounttable.
package publisher
-// TODO(toddw): Add unittests.
-
import (
"fmt"
- "sort"
"strings"
+ "sync"
"time"
"v.io/v23/context"
@@ -21,192 +19,25 @@
"v.io/v23/verror"
)
-// Publisher manages the publishing of servers in mounttable.
-type Publisher interface {
- // AddServer adds a new server to be mounted.
- AddServer(server string)
- // RemoveServer removes a server from the list of mounts.
- RemoveServer(server string)
- // AddName adds a new name for all servers to be mounted as.
- AddName(name string, ServesMountTable bool, IsLeaf bool)
- // RemoveName removes a name.
- RemoveName(name string)
- // Status returns a snapshot of the publisher's current state.
- Status() rpc.MountState
- // DebugString returns a string representation of the publisher
- // meant solely for debugging.
- DebugString() string
- // Stop causes the publishing to stop and initiates unmounting of the
- // mounted names. Stop performs the unmounting asynchronously, and
- // WaitForStop should be used to wait until it is done.
- // Once Stop is called Add/RemoveServer and AddName become noops.
- Stop()
- // WaitForStop waits until all unmounting initiated by Stop is finished.
- WaitForStop()
-}
-
// The publisher adds this much slack to each TTL.
const mountTTLSlack = 20 * time.Second
-// publisher maintains the name->server associations in the mounttable. It
-// spawns its own goroutine that does the actual work; the publisher itself
-// simply coordinates concurrent access by sending and receiving on the
-// appropriate channels.
-type publisher struct {
- cmdchan chan interface{} // value is one of {server,name,debug}Cmd
- stopchan chan struct{} // closed when no longer accepting commands.
- donechan chan struct{} // closed when the publisher is done
- ctx *context.T
-}
+// T manages the publishing of names and servers in the mounttable.
+// It spawns an internal goroutine that periodically performs mount and unmount
+// RPCs. T is safe to use concurrently.
+type T struct {
+ ctx *context.T // context used to make rpcs
+ cancel context.CancelFunc // cancel function for the above ctx
+ ns namespace.T
+ period time.Duration
+ closed chan struct{} // closed when the Publisher is closed
-type addServerCmd struct {
- server string // server to add
-}
-
-type removeServerCmd struct {
- server string // server to remove
-}
-
-type addNameCmd struct {
- name string // name to add
- mt bool // true if server serves a mount table
- leaf bool // true if server is a leaf
-}
-
-type removeNameCmd struct {
- name string // name to remove
-}
-
-type debugCmd chan string // debug string is sent when the cmd is done
-
-type statusCmd chan rpc.MountState // status info is sent when cmd is done
-
-type stopCmd struct{} // sent to the runloop when we want it to exit.
-
-// New returns a new publisher that updates mounts on ns every period.
-func New(ctx *context.T, ns namespace.T, period time.Duration) Publisher {
- p := &publisher{
- cmdchan: make(chan interface{}),
- stopchan: make(chan struct{}),
- donechan: make(chan struct{}),
- ctx: ctx,
- }
- go runLoop(ctx, p.cmdchan, p.donechan, ns, period)
- return p
-}
-
-func (p *publisher) sendCmd(cmd interface{}) bool {
- select {
- case p.cmdchan <- cmd:
- return true
- case <-p.stopchan:
- return false
- case <-p.donechan:
- return false
- }
-}
-
-func (p *publisher) AddServer(server string) {
- p.sendCmd(addServerCmd{server})
-}
-
-func (p *publisher) RemoveServer(server string) {
- p.sendCmd(removeServerCmd{server})
-}
-
-func (p *publisher) AddName(name string, mt bool, leaf bool) {
- p.sendCmd(addNameCmd{name, mt, leaf})
-}
-
-func (p *publisher) RemoveName(name string) {
- p.sendCmd(removeNameCmd{name})
-}
-
-func (p *publisher) Status() rpc.MountState {
- status := make(statusCmd)
- if p.sendCmd(status) {
- return <-status
- }
- return rpc.MountState{}
-}
-
-func (p *publisher) DebugString() (dbg string) {
- debug := make(debugCmd)
- if p.sendCmd(debug) {
- dbg = <-debug
- } else {
- dbg = "stopped"
- }
- return
-}
-
-// Stop stops the publisher, which in practical terms means un-mounting
-// everything and preventing any further publish operations. The caller can
-// be confident that no new names or servers will get published once Stop
-// returns. To wait for existing mounts to be cleaned up, use WaitForStop.
-//
-// Stopping the publisher is irreversible.
-//
-// Once the publisher is stopped, any further calls on its public methods
-// (including Stop) are no-ops.
-func (p *publisher) Stop() {
- p.sendCmd(stopCmd{})
- close(p.stopchan) // stop accepting new commands now.
-}
-
-func (p *publisher) WaitForStop() {
- <-p.donechan
-}
-
-func runLoop(ctx *context.T, cmdchan chan interface{}, donechan chan struct{}, ns namespace.T, period time.Duration) {
- ctx.VI(2).Info("rpc pub: start runLoop")
- state := newPubState(ctx, ns, period)
- for {
- select {
- case cmd := <-cmdchan:
- switch tcmd := cmd.(type) {
- case stopCmd:
- state.unmountAll()
- close(donechan)
- ctx.VI(2).Info("rpc pub: exit runLoop")
- return
- case addServerCmd:
- state.addServer(tcmd.server)
- case removeServerCmd:
- state.removeServer(tcmd.server)
- case addNameCmd:
- state.addName(tcmd.name, tcmd.mt, tcmd.leaf)
- case removeNameCmd:
- state.removeName(tcmd.name)
- case statusCmd:
- tcmd <- state.getStatus()
- close(tcmd)
- case debugCmd:
- tcmd <- state.debugString()
- close(tcmd)
- }
- case <-state.timeout():
- // Sync everything once every period, to refresh the ttls.
- state.sync()
- }
- }
-}
-
-type mountKey struct {
- name, server string
-}
-
-// pubState maintains the state for our periodic mounts. It is not thread-safe;
-// it's only used in the sequential publisher runLoop.
-type pubState struct {
- ctx *context.T
- ns namespace.T
- period time.Duration
- deadline time.Time // deadline for the next sync call
- names map[string]nameAttr // names that have been added
- servers map[string]bool // servers that have been added, true
- // map each (name,server) to its status.
- mounts map[mountKey]*rpc.MountStatus
+ mu sync.Mutex
+ changed chan struct{}
+ dirty chan struct{}
+ names map[string]nameAttr // names that have been added
+ servers map[string]bool // servers that have been added
+ entries map[publishKey]rpc.PublisherEntry // map each (name,server) to its entry
}
type nameAttr struct {
@@ -214,176 +45,337 @@
isLeaf bool
}
-func newPubState(ctx *context.T, ns namespace.T, period time.Duration) *pubState {
- return &pubState{
- ctx: ctx,
- ns: ns,
- period: period,
- deadline: time.Now().Add(period),
- names: make(map[string]nameAttr),
- servers: make(map[string]bool),
- mounts: make(map[mountKey]*rpc.MountStatus),
+type publishKey struct {
+ name, server string
+}
+
+// New returns a new publisher that updates mounts on ns every period, and
+// whenever changes are made to its state.
+func New(ctx *context.T, ns namespace.T, period time.Duration) *T {
+ p := &T{
+ ns: ns,
+ period: period,
+ closed: make(chan struct{}),
+ changed: make(chan struct{}, 1),
+ dirty: make(chan struct{}),
+ names: make(map[string]nameAttr),
+ servers: make(map[string]bool),
+ entries: make(map[publishKey]rpc.PublisherEntry),
}
+ timer := time.NewTimer(period) // timer for the next refresh publish call
+ // We create a new root context so that unmount RPCs can work even after the ctx
+ // passed in is closed.
+ p.ctx, p.cancel = context.WithRootCancel(ctx)
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ p.stop()
+ close(p.closed)
+ return
+ case <-timer.C:
+ timer.Reset(period)
+ p.publish(true)
+ case <-p.changed:
+ p.publish(false)
+ }
+ }
+ }()
+ return p
}
-func (ps *pubState) timeout() <-chan time.Time {
- return time.After(ps.deadline.Sub(time.Now()))
-}
-
-func (ps *pubState) addName(name string, mt bool, leaf bool) {
- // Each non-dup name that is added causes new mounts to be created for all
- // existing servers.
- if _, exists := ps.names[name]; exists {
+// AddName adds a new name for all servers to be mounted as.
+func (p *T) AddName(name string, mt bool, leaf bool) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ if p.names == nil {
return
}
- attr := nameAttr{mt, leaf}
- ps.names[name] = attr
- for server, _ := range ps.servers {
- status := new(rpc.MountStatus)
- ps.mounts[mountKey{name, server}] = status
- ps.mount(name, server, status, attr)
- }
-}
-
-func (ps *pubState) removeName(name string) {
- if _, exists := ps.names[name]; !exists {
+ if attr, exists := p.names[name]; exists && (attr.servesMT == mt && attr.isLeaf == leaf) {
return
}
- for server, _ := range ps.servers {
- if status, exists := ps.mounts[mountKey{name, server}]; exists {
- ps.unmount(name, server, status, true)
- }
- }
- delete(ps.names, name)
-}
-
-func (ps *pubState) addServer(server string) {
- // Each non-dup server that is added causes new mounts to be created for all
- // existing names.
- if _, exists := ps.servers[server]; !exists {
- ps.servers[server] = true
- for name, attr := range ps.names {
- status := new(rpc.MountStatus)
- ps.mounts[mountKey{name, server}] = status
- ps.mount(name, server, status, attr)
- }
- }
-}
-
-func (ps *pubState) removeServer(server string) {
- if _, exists := ps.servers[server]; !exists {
- return
- }
- delete(ps.servers, server)
- for name, _ := range ps.names {
- if status, exists := ps.mounts[mountKey{name, server}]; exists {
- ps.unmount(name, server, status, true)
- }
- }
-}
-
-func (ps *pubState) mount(name, server string, status *rpc.MountStatus, attr nameAttr) {
- // Always mount with ttl = period + slack, regardless of whether this is
- // triggered by a newly added server or name, or by sync. The next call
- // to sync will occur within the next period, and refresh all mounts.
- ttl := ps.period + mountTTLSlack
- last := *status
- status.LastMount = time.Now()
- status.LastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
- status.TTL = ttl
- // If the mount status changed, log it.
- if status.LastMountErr != nil {
- if verror.ErrorID(last.LastMountErr) != verror.ErrorID(status.LastMountErr) || ps.ctx.V(2) {
- ps.ctx.Errorf("rpc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.LastMountErr)
- }
- } else {
- if last.LastMount.IsZero() || last.LastMountErr != nil || ps.ctx.V(2) {
- ps.ctx.Infof("rpc pub: mount(%v, %v, %v)", name, server, ttl)
- }
- }
-}
-
-func (ps *pubState) sync() {
- ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync
- for key, status := range ps.mounts {
- if status.LastUnmountErr != nil {
- // Desired state is "unmounted", failed at previous attempt. Retry.
- ps.unmount(key.name, key.server, status, true)
+ p.names[name] = nameAttr{mt, leaf}
+ for server := range p.servers {
+ key := publishKey{name, server}
+ if pe, ok := p.entries[key]; ok {
+ pe.DesiredState = rpc.PublisherMounted
+ p.entries[key] = pe
} else {
- ps.mount(key.name, key.server, status, ps.names[key.name])
+ p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
}
}
+ p.notifyChanged()
}
-func (ps *pubState) unmount(name, server string, status *rpc.MountStatus, retry bool) {
- status.LastUnmount = time.Now()
- var opts []naming.NamespaceOpt
- if !retry {
- opts = []naming.NamespaceOpt{options.NoRetry{}}
+// RemoveName removes a name.
+func (p *T) RemoveName(name string) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ if p.names == nil {
+ return
}
- status.LastUnmountErr = ps.ns.Unmount(ps.ctx, name, server, opts...)
- if status.LastUnmountErr != nil {
- ps.ctx.Errorf("rpc pub: couldn't unmount(%v, %v): %v", name, server, status.LastUnmountErr)
- } else {
- ps.ctx.VI(1).Infof("rpc pub: unmount(%v, %v)", name, server)
- delete(ps.mounts, mountKey{name, server})
+ if _, exists := p.names[name]; !exists {
+ return
}
-}
-
-func (ps *pubState) unmountAll() {
- for key, status := range ps.mounts {
- ps.unmount(key.name, key.server, status, false)
- }
-}
-
-func copyNamesToSlice(sl map[string]nameAttr) []string {
- var ret []string
- for s, _ := range sl {
- if len(s) == 0 {
- continue
+ delete(p.names, name)
+ for server := range p.servers {
+ key := publishKey{name, server}
+ if pe, ok := p.entries[key]; ok {
+ pe.DesiredState = rpc.PublisherUnmounted
+ p.entries[key] = pe
}
- ret = append(ret, s)
}
- return ret
+ p.notifyChanged()
}
-func copyServersToSlice(sl map[string]bool) []string {
- var ret []string
- for s, _ := range sl {
- if len(s) == 0 {
- continue
+// AddServer adds a new server to be mounted under all names.
+func (p *T) AddServer(server string) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ if p.names == nil {
+ return
+ }
+ if _, exists := p.servers[server]; exists {
+ return
+ }
+ p.servers[server] = true
+ for name := range p.names {
+ key := publishKey{name, server}
+ if pe, ok := p.entries[key]; ok {
+ pe.DesiredState = rpc.PublisherMounted
+ p.entries[key] = pe
+ } else {
+ p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
}
- ret = append(ret, s)
}
- return ret
+ p.notifyChanged()
}
-func (ps *pubState) getStatus() rpc.MountState {
- st := make([]rpc.MountStatus, 0, len(ps.mounts))
- names := copyNamesToSlice(ps.names)
- servers := copyServersToSlice(ps.servers)
- sort.Strings(names)
- sort.Strings(servers)
- for _, name := range names {
- for _, server := range servers {
- if v := ps.mounts[mountKey{name, server}]; v != nil {
- mst := *v
- mst.Name = name
- mst.Server = server
- st = append(st, mst)
+// RemoveServer removes a server from the list of mounts.
+func (p *T) RemoveServer(server string) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ if p.names == nil {
+ return
+ }
+ if _, exists := p.servers[server]; !exists {
+ return
+ }
+ delete(p.servers, server)
+ for name := range p.names {
+ key := publishKey{name, server}
+ if pe, ok := p.entries[key]; ok {
+ pe.DesiredState = rpc.PublisherUnmounted
+ p.entries[key] = pe
+ }
+ }
+ p.notifyChanged()
+}
+
+// Status returns a snapshot of the publisher's current state.
+// The returned channel is closed when the returned snapshot has become stale;
+// the caller should then call Status again for an up-to-date snapshot.
+func (p *T) Status() ([]rpc.PublisherEntry, <-chan struct{}) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ st := make([]rpc.PublisherEntry, 0, len(p.entries))
+ now := time.Now()
+ for _, e := range p.entries {
+ mountDelta := now.Sub(e.LastMount)
+ switch {
+ case e.LastMount.IsZero() && e.LastUnmount.IsZero():
+ e.LastState = rpc.PublisherUnmounted
+ case e.LastUnmount.After(e.LastMount) && e.LastUnmountErr == nil:
+ e.LastState = rpc.PublisherUnmounted
+ case mountDelta > p.period+2*mountTTLSlack:
+ e.LastState = rpc.PublisherUnmounted
+ case mountDelta < p.period:
+ e.LastState = rpc.PublisherMounted
+ case e.LastUnmount.After(e.LastMount):
+ e.LastState = rpc.PublisherUnmounting
+ default:
+ e.LastState = rpc.PublisherMounting
+ }
+ st = append(st, e)
+ }
+ return st, p.dirty
+}
+
+// String returns a string representation of the publisher.
+func (p *T) String() string {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ l := make([]string, 0, 2+len(p.entries))
+ l = append(l, fmt.Sprintf("Publisher period:%v", p.period))
+ l = append(l, "==============================Mounts============================================")
+ for key, entry := range p.entries {
+ l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %v) unmount(%v, %v) Last: %s, Desired: %s ", key.name, key.server,
+ entry.LastMount, entry.LastMountErr, entry.TTL, entry.LastUnmount, entry.LastUnmountErr, entry.LastState, entry.DesiredState))
+ }
+ return strings.Join(l, "\n")
+}
+
+// Closed returns a channel that is closed once the publisher's context has been
+// cancelled and all unmount operations have completed.
+func (p *T) Closed() <-chan struct{} {
+ return p.closed
+}
+
+// publish makes RPCs to the mounttable to mount and unmount entries.
+// If refreshAll is true, all entries will be refreshed; otherwise only entries
+// with pending changes (i.e. from AddName, RemoveName, etc.) will be updated.
+func (p *T) publish(refreshAll bool) {
+ mounts, unmounts := p.entriesToPublish(refreshAll)
+
+ // TODO(suharshs): We could potentially do these mount and unmount rpcs in parallel.
+ mountEntries := make([]rpc.PublisherEntry, 0, len(mounts))
+ unmountEntries := make([]rpc.PublisherEntry, 0, len(unmounts))
+ for _, params := range mounts {
+ mountEntries = append(mountEntries, p.mount(params))
+ }
+ for _, params := range unmounts {
+ unmountEntries = append(unmountEntries, p.unmount(params))
+ }
+
+ // Update p.entries with the new entries.
+ p.updateEntries(mountEntries, unmountEntries)
+}
+
+func (p *T) entriesToPublish(refreshAll bool) ([]mountParams, []unmountParams) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ var mounts []mountParams
+ var unmounts []unmountParams
+ for key, entry := range p.entries {
+ if entry.DesiredState == rpc.PublisherUnmounted {
+ if entry.LastState == rpc.PublisherUnmounted {
+ delete(p.entries, key)
+ } else if refreshAll || entry.LastUnmount.IsZero() {
+ unmounts = append(unmounts, unmountParams{entry: entry, retry: true})
+ }
+ } else {
+ if refreshAll || entry.LastMount.IsZero() {
+ mounts = append(mounts, mountParams{entry: entry, attr: p.names[key.name]})
}
}
}
- return st
+ return mounts, unmounts
}
-// TODO(toddw): sort the names/servers so that the output order is stable.
-func (ps *pubState) debugString() string {
- l := make([]string, 2+len(ps.mounts))
- l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
- l = append(l, "==============================Mounts============================================")
- for key, status := range ps.mounts {
- l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %s) unmount(%v, %v)", key.name, key.server, status.LastMount, status.LastMountErr, status.TTL, status.LastUnmount, status.LastUnmountErr))
+func (p *T) updateEntries(mountEntries, unmountEntries []rpc.PublisherEntry) {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ for _, entry := range mountEntries {
+ key := publishKey{entry.Name, entry.Server}
+ // Ensure that the DesiredState, which may have been changed while the
+ // lock was released, is not overwritten by the DesiredState of entry.
+ if current, ok := p.entries[key]; ok {
+ entry.DesiredState = current.DesiredState
+ }
+ p.entries[key] = entry
}
- return strings.Join(l, "\n")
+ for _, entry := range unmountEntries {
+ key := publishKey{entry.Name, entry.Server}
+ // Ensure that we don't delete the entry if the DesiredState was
+ // changed while the lock was released.
+ if current, ok := p.entries[key]; ok {
+ entry.DesiredState = current.DesiredState
+ }
+ if entry.DesiredState == rpc.PublisherUnmounted && entry.LastUnmountErr == nil {
+ delete(p.entries, key)
+ } else {
+ p.entries[key] = entry
+ }
+ }
+ close(p.dirty)
+ p.dirty = make(chan struct{})
+}
+
+func (p *T) stop() {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ p.names = nil
+ p.servers = nil
+ // We make one final attempt to unmount everything; we ignore failures here,
+ // and don't retry, since the mounts will eventually time out anyway.
+ for _, pe := range p.entries {
+ p.unmount(unmountParams{entry: pe, retry: false})
+ }
+ p.cancel()
+ close(p.dirty)
+}
+
+func (p *T) notifyChanged() {
+ // We ensure that callers of this function (i.e. AddName, RemoveName, AddServer,
+ // RemoveServer) do not block. We do this by giving p.changed a buffer of size 1,
+ // and adding a default clause below. This allows multiple calls to notifyChanged
+ // to complete before the internal goroutine processes the change.
+ select {
+ case p.changed <- struct{}{}:
+ default:
+ }
+}
+
+type mountParams struct {
+ entry rpc.PublisherEntry
+ attr nameAttr
+}
+
+// mount makes a mount RPC to the entry described in params. It returns a new
+// rpc.PublisherEntry, updated with the results of the RPC.
+func (p *T) mount(params mountParams) rpc.PublisherEntry {
+ last, entry, attr := params.entry, params.entry, params.attr
+ // Always mount with ttl = period + slack.
+ // The next publish call will occur within the next period.
+ ttl := p.period + mountTTLSlack
+ entry.LastMount = time.Now()
+ // Ensure that LastMount > LastUnmount to make it easier to check for the last
+ // tried operation.
+ if entry.LastMount.Before(entry.LastUnmount) {
+ entry.LastMount = entry.LastUnmount.Add(1)
+ }
+ entry.LastMountErr = p.ns.Mount(p.ctx, entry.Name, entry.Server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
+ entry.TTL = ttl
+ // If the mount entry changed, log it.
+ if entry.LastMountErr != nil {
+ if verror.ErrorID(last.LastMountErr) != verror.ErrorID(entry.LastMountErr) || p.ctx.V(2) {
+ p.ctx.Errorf("rpc pub: couldn't mount(%v, %v, %v): %v", entry.Name, entry.Server, ttl, entry.LastMountErr)
+ }
+ } else {
+ entry.LastState = rpc.PublisherMounted
+ if last.LastMount.IsZero() || last.LastMountErr != nil || p.ctx.V(2) {
+ p.ctx.Infof("rpc pub: mount(%v, %v, %v)", entry.Name, entry.Server, ttl)
+ }
+ }
+ return entry
+}
+
+type unmountParams struct {
+ entry rpc.PublisherEntry
+ retry bool
+}
+
+// unmount makes an unmount RPC to the entry described in params. It returns a
+// new rpc.PublisherEntry, updated with the results of the RPC.
+func (p *T) unmount(params unmountParams) rpc.PublisherEntry {
+ entry := params.entry
+ var opts []naming.NamespaceOpt
+ if !params.retry {
+ opts = []naming.NamespaceOpt{options.NoRetry{}}
+ }
+ entry.LastUnmount = time.Now()
+ // Ensure that LastUnmount > LastMount to make it easier to check for the last
+ // tried operation.
+ if entry.LastUnmount.Before(entry.LastMount) {
+ entry.LastUnmount = entry.LastMount.Add(1)
+ }
+ entry.LastUnmountErr = p.ns.Unmount(p.ctx, entry.Name, entry.Server, opts...)
+ if entry.LastUnmountErr != nil {
+ p.ctx.Errorf("rpc pub: couldn't unmount(%v, %v): %v", entry.Name, entry.Server, entry.LastUnmountErr)
+ } else {
+ entry.LastState = rpc.PublisherUnmounted
+ p.ctx.VI(1).Infof("rpc pub: unmount(%v, %v)", entry.Name, entry.Server)
+ }
+ return entry
}
diff --git a/lib/publisher/publisher_test.go b/lib/publisher/publisher_test.go
index 2c468cb..6386bb4 100644
--- a/lib/publisher/publisher_test.go
+++ b/lib/publisher/publisher_test.go
@@ -5,7 +5,6 @@
package publisher_test
import (
- "fmt"
"reflect"
"sort"
"testing"
@@ -14,6 +13,7 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/namespace"
+ "v.io/v23/rpc"
"v.io/x/ref/lib/publisher"
_ "v.io/x/ref/runtime/factories/generic"
@@ -51,7 +51,8 @@
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
ns := v23.GetNamespace(ctx)
- pub := publisher.New(ctx, ns, time.Second)
+ pubctx, cancel := context.WithCancel(ctx)
+ pub := publisher.New(pubctx, ns, time.Second)
pub.AddName("foo", false, false)
pub.AddServer("foo:8000")
if got, want := resolveWithRetry(t, ns, ctx, "foo", 1), []string{"/foo:8000"}; !reflect.DeepEqual(got, want) {
@@ -71,17 +72,19 @@
}
pub.RemoveName("foo")
verifyMissing(t, ns, ctx, "foo")
- pub.Stop()
- pub.WaitForStop()
+
+ cancel()
+ <-pub.Closed()
}
func TestStatus(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
ns := v23.GetNamespace(ctx)
- pub := publisher.New(ctx, ns, time.Second)
+ pubctx, cancel := context.WithCancel(ctx)
+ pub := publisher.New(pubctx, ns, time.Second)
pub.AddName("foo", false, false)
- status := pub.Status()
+ status, _ := pub.Status()
if got, want := len(status), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
@@ -89,54 +92,94 @@
// Wait for the publisher to asynchronously publish the
// requisite number of servers.
- ch := make(chan error, 1)
waitFor := func(n int) {
- deadline := time.Now().Add(time.Minute)
for {
- status = pub.Status()
+ status, dirty := pub.Status()
if got, want := len(status), n; got != want {
- if time.Now().After(deadline) {
- ch <- fmt.Errorf("got %d, want %d", got, want)
- return
- }
- time.Sleep(100 * time.Millisecond)
+ <-dirty
} else {
- ch <- nil
return
}
}
}
- go waitFor(1)
- if err := <-ch; err != nil {
- t.Fatalf("%s", err)
- }
+ waitFor(1)
pub.AddServer("bar:8000")
pub.AddName("baz", false, false)
- go waitFor(4)
- if err := <-ch; err != nil {
- t.Fatalf("%s", err)
- }
+ waitFor(4)
- status = pub.Status()
- names := status.Names()
- if got, want := names, []string{"baz", "foo"}; !reflect.DeepEqual(got, want) {
+ status, _ = pub.Status()
+ names, servers := publisherNamesAndServers(status)
+ // There will be two of each name and two of each server in the mount entries.
+ if got, want := names, []string{"baz", "baz", "foo", "foo"}; !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
- servers := status.Servers()
- if got, want := servers, []string{"bar:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
+ if got, want := servers, []string{"bar:8000", "bar:8000", "foo:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
pub.RemoveName("foo")
+ waitFor(2)
verifyMissing(t, ns, ctx, "foo")
- status = pub.Status()
- go waitFor(2)
- if err := <-ch; err != nil {
- t.Fatalf("%s", err)
+ status, _ = pub.Status()
+ names, servers = publisherNamesAndServers(status)
+ if got, want := names, []string{"baz", "baz"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
}
- pub.Stop()
- pub.WaitForStop()
+ if got, want := servers, []string{"bar:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+
+ cancel()
+ <-pub.Closed()
+}
+
+func TestRemoveFailedAdd(t *testing.T) {
+ // Test that removing an already unmounted name (due to an error while mounting),
+ // results in a Status that has no entries.
+ // We use v23.Init instead of test.V23InitWithMounttable so that all calls to the mounttable fail.
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ ns := v23.GetNamespace(ctx)
+ pubctx, cancel := context.WithCancel(ctx)
+ pub := publisher.New(pubctx, ns, time.Second)
+ pub.AddServer("foo:8000")
+ // Adding a name should result in one entry in the publisher that can never
+ // successfully mount.
+ pub.AddName("foo", false, false)
+ status, _ := pub.Status()
+ if got, want := len(status), 1; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ // We want to ensure that the LastState is either:
+ // PublisherUnmounted if the publisher hasn't tried to mount yet, or
+ // PublisherMounting if the publisher tried to mount but failed.
+ if got, want := status[0].LastState, rpc.PublisherMounting; got > want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+ // Removing "foo" should result in an empty Status.
+ pub.RemoveName("foo")
+ for {
+ status, dirty := pub.Status()
+ if got, want := len(status), 0; got != want {
+ <-dirty
+ } else {
+ break
+ }
+ }
+
+ cancel()
+ <-pub.Closed()
+}
+
+func publisherNamesAndServers(entries []rpc.PublisherEntry) (names []string, servers []string) {
+ for _, e := range entries {
+ names = append(names, e.Name)
+ servers = append(servers, e.Server)
+ }
+ sort.Strings(names)
+ sort.Strings(servers)
+ return names, servers
}
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index 55eac9e..b77ac37 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -29,6 +29,17 @@
ctx, shutdown := v23.Init()
defer shutdown()
+ waitForEndpoints := func(server rpc.Server, n int) rpc.ServerStatus {
+ for {
+ status := server.Status()
+ if got, want := len(status.Endpoints), n; got != want {
+ <-status.Valid
+ } else {
+ return status
+ }
+ }
+ }
+
ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
return NewClient(ctx, opts...)
})
@@ -67,18 +78,10 @@
n1 := netstate.NewNetAddr("ip", "1.1.1.1")
n2 := netstate.NewNetAddr("ip", "2.2.2.2")
- change := status.Valid
-
ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
- // We should be notified of a network change.
- <-change
- status = server.Status()
- eps := status.Endpoints
- change = status.Valid
// We expect 4 new endpoints, 2 for each valid listen call.
- if got, want := len(eps), len(prevEps)+4; got != want {
- t.Errorf("got %v, want %v", got, want)
- }
+ status = waitForEndpoints(server, len(prevEps)+4)
+ eps := status.Endpoints
// We expect the added networks to be in the new endpoints.
if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
@@ -90,14 +93,8 @@
// Now remove a network.
ch <- roaming.NewRmAddrsSetting([]net.Addr{n1})
- <-change
- status = server.Status()
+ status = waitForEndpoints(server, len(prevEps)-2)
eps = status.Endpoints
- change = status.Valid
- // We expect 2 endpoints to be missing.
- if got, want := len(eps), len(prevEps)-2; got != want {
- t.Errorf("got %v, want %v", got, want)
- }
// We expect the removed network to not be in the new endpoints.
if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 0; got != want {
t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
@@ -106,21 +103,18 @@
// Now remove everything, essentially "disconnected from the network"
ch <- roaming.NewRmAddrsSetting(getIPAddrs(prevEps))
- <-change
- status = server.Status()
- eps = status.Endpoints
- change = status.Valid
// We expect there to be only the bidi endpoint.
+ status = waitForEndpoints(server, 1)
+ eps = status.Endpoints
if got, want := len(eps), 1; got != want && eps[0].Addr().Network() != "bidi" {
t.Errorf("got %v, want %v", got, want)
}
// Now if we reconnect to a network it should show up.
ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
- <-change
- status = server.Status()
- eps = status.Endpoints
// We expect 2 endpoints to be added
+ status = waitForEndpoints(server, 2)
+ eps = status.Endpoints
if got, want := len(eps), 2; got != want {
t.Errorf("got %v, want %v", got, want)
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 19ed9dc..e812c67 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -62,13 +62,13 @@
ctx *context.T
cancel context.CancelFunc // function to cancel the above context.
flowMgr flow.Manager
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
+ settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
valid chan struct{}
blessings security.Blessings
typeCache *typeCache
state rpc.ServerState // the current state of the server.
stopProxy context.CancelFunc
+ publisher *publisher.T // publisher to publish mounttable mounts.
endpoints map[string]*inaming.Endpoint // endpoints that the server is listening on.
lnErrors map[struct{ Protocol, Address string }]error // errors from listening
@@ -186,7 +186,10 @@
s.cancel()
return ctx, nil, err
}
- s.publisher = publisher.New(s.ctx, v23.GetNamespace(s.ctx), publishPeriod)
+ pubctx, pubcancel := context.WithCancel(s.ctx)
+ s.publisher = publisher.New(pubctx, v23.GetNamespace(s.ctx), publishPeriod)
+ s.active.Add(1)
+ go s.monitorPubStatus(ctx)
// TODO(caprita): revist printing the blessings with string, and
// instead expose them as a list.
@@ -209,13 +212,13 @@
defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
s.stats.stop()
- s.publisher.Stop()
+ pubcancel()
s.stopProxy()
done := make(chan struct{})
go func() {
s.flowMgr.StopListening(ctx)
- s.publisher.WaitForStop()
+ <-s.publisher.Closed()
// At this point no new flows should arrive. Wait for existing calls
// to complete.
s.active.Wait()
@@ -233,9 +236,9 @@
// ongoing requests. Hopefully this will bring all outstanding
// operations to a close.
s.cancel()
- // Note that since the context has been canceled, publisher.WaitForStop and <-flowMgr.Closed()
+ // Note that since the context has been canceled, <-publisher.Closed() and <-flowMgr.Closed()
// should return right away.
- s.publisher.WaitForStop()
+ <-s.publisher.Closed()
<-s.flowMgr.Closed()
s.Lock()
close(s.valid)
@@ -251,11 +254,39 @@
return s.ctx, s, nil
}
+// monitorPubStatus guarantees that the ServerStatus.Valid channel is closed
+// when the publisher state becomes dirty. Since we also get the publisher.Status()
+// in the Status method, it's possible that the Valid channel in the returned
+// ServerStatus will be closed spuriously by this goroutine.
+func (s *server) monitorPubStatus(ctx *context.T) {
+ defer s.active.Done()
+ var pubDirty <-chan struct{}
+ s.Lock()
+ _, pubDirty = s.publisher.Status()
+ s.Unlock()
+ for {
+ select {
+ case <-pubDirty:
+ s.Lock()
+ _, pubDirty = s.publisher.Status()
+ s.updateValidLocked()
+ s.Unlock()
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
func (s *server) Status() rpc.ServerStatus {
status := rpc.ServerStatus{}
status.ServesMountTable = s.servesMountTable
- status.Mounts = s.publisher.Status()
s.Lock()
+ // We call s.publisher.Status here instead of using a publisher status cached
+ // by s.monitorPubStatus, because we want to guarantee that entries added by
+ // s.AddName/s.AddServer appear in the returned s.Status() immediately.
+ // i.e. s.AddName("foo")
+ // s.Status().PublisherStatus // Should have an entry for "foo".
+ status.PublisherStatus, _ = s.publisher.Status()
status.Valid = s.valid
status.State = s.state
for _, e := range s.endpoints {
diff --git a/runtime/internal/rpc/test/server_test.go b/runtime/internal/rpc/test/server_test.go
index a2e201f..a2ca391 100644
--- a/runtime/internal/rpc/test/server_test.go
+++ b/runtime/internal/rpc/test/server_test.go
@@ -18,6 +18,7 @@
"v.io/v23/security"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/test"
+ "v.io/x/ref/test/testutil"
)
type noMethodsType struct{ Field string }
@@ -137,7 +138,7 @@
waitForStatus(rpc.ServerStopped)
}
-func TestMountStatus(t *testing.T) {
+func TestPublisherStatus(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
@@ -152,44 +153,26 @@
if err != nil {
t.Fatal(err)
}
- status := server.Status()
+ status := testutil.WaitForServerPublished(server)
+ if got, want := len(status.PublisherStatus), 2; got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
eps := server.Status().Endpoints
if got, want := len(eps), 2; got != want {
t.Fatalf("got %d, want %d", got, want)
}
setLeafEndpoints(eps)
- if got, want := len(status.Mounts), 2; got != want {
- t.Fatalf("got %d, want %d", got, want)
- }
- servers := status.Mounts.Servers()
- if got, want := len(servers), 2; got != want {
- t.Fatalf("got %d, want %d", got, want)
- }
- if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
- t.Fatalf("got %v, want %v", got, want)
- }
// Add a second name and we should now see 4 mounts, 2 for each name.
if err := server.AddName("bar"); err != nil {
t.Fatal(err)
}
- status = server.Status()
- if got, want := len(status.Mounts), 4; got != want {
- t.Fatalf("got %d, want %d", got, want)
- }
- servers = status.Mounts.Servers()
- if got, want := len(servers), 2; got != want {
- t.Fatalf("got %d, want %d", got, want)
- }
- if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
- t.Fatalf("got %v, want %v", got, want)
- }
- names := status.Mounts.Names()
- if got, want := len(names), 2; got != want {
- t.Fatalf("got %d, want %d", got, want)
+ status = testutil.WaitForServerPublished(server)
+ if got, want := len(status.PublisherStatus), 4; got != want {
+ t.Errorf("got %d, want %d", got, want)
}
serversPerName := map[string][]string{}
- for _, ms := range status.Mounts {
+ for _, ms := range status.PublisherStatus {
serversPerName[ms.Name] = append(serversPerName[ms.Name], ms.Server)
}
if got, want := len(serversPerName), 2; got != want {
@@ -197,7 +180,11 @@
}
for _, name := range []string{"foo", "bar"} {
if got, want := len(serversPerName[name]), 2; got != want {
- t.Fatalf("got %d, want %d", got, want)
+ t.Errorf("got %d, want %d", got, want)
+ }
+ sort.Strings(serversPerName[name])
+ if got, want := serversPerName[name], endpointToStrings(eps); !reflect.DeepEqual(got, want) {
+ t.Errorf("got %v, want %v", got, want)
}
}
}
diff --git a/services/mounttable/mounttablelib/servers.go b/services/mounttable/mounttablelib/servers.go
index b0f6eb0..5e289d9 100644
--- a/services/mounttable/mounttablelib/servers.go
+++ b/services/mounttable/mounttablelib/servers.go
@@ -35,8 +35,17 @@
return "", nil, err
}
stopFuncs = append(stopFuncs, mtServer.Stop)
- mtEndpoints := mtServer.Status().Endpoints
- mtName := mtEndpoints[0].Name()
+ var mtName string
+ var mtEndpoints []naming.Endpoint
+ for {
+ status := mtServer.Status()
+ mtEndpoints = status.Endpoints
+ mtName = mtEndpoints[0].Name()
+ if mtEndpoints[0].Addr().Network() != "bidi" {
+ break
+ }
+ <-status.Valid
+ }
ctx.Infof("Mount table service at: %q endpoint: %s", mountName, mtName)
if len(nhName) > 0 {
diff --git a/services/wspr/internal/browspr/browspr_test.go b/services/wspr/internal/browspr/browspr_test.go
index 25594f9..86d933f 100644
--- a/services/wspr/internal/browspr/browspr_test.go
+++ b/services/wspr/internal/browspr/browspr_test.go
@@ -94,7 +94,7 @@
found:
for {
status := mockServer.Status()
- for _, v := range status.Mounts {
+ for _, v := range status.PublisherStatus {
if v.Name == mockServerName && v.Server == mockServerEndpoint.String() && !v.LastMount.IsZero() {
if v.LastMountErr != nil {
t.Fatalf("Failed to mount %s: %v", v.Name, v.LastMountErr)
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index 47544d6..1472915 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -577,22 +577,22 @@
lastErrors := map[string]string{}
for {
status := s.server.Status()
- for _, mountStatus := range status.Mounts {
+ for _, e := range status.PublisherStatus {
var errMsg string
- if mountStatus.LastMountErr != nil {
- errMsg = mountStatus.LastMountErr.Error()
+ if e.LastMountErr != nil {
+ errMsg = e.LastMountErr.Error()
}
- mountName := mountStatus.Name
- if lastMessage, ok := lastErrors[mountName]; !ok || errMsg != lastMessage {
+ name := e.Name
+ if lastMessage, ok := lastErrors[name]; !ok || errMsg != lastMessage {
if errMsg == "" {
s.helper.SendLogMessage(
- lib.LogLevelInfo, "serve: "+mountName+" successfully mounted ")
+ lib.LogLevelInfo, "serve: "+name+" successfully mounted ")
} else {
s.helper.SendLogMessage(
- lib.LogLevelError, "serve: "+mountName+" failed with: "+errMsg)
+ lib.LogLevelError, "serve: "+name+" failed with: "+errMsg)
}
}
- lastErrors[mountName] = errMsg
+ lastErrors[name] = errMsg
}
select {
case <-time.After(10 * time.Second):
diff --git a/services/xproxy/xproxy/proxy.go b/services/xproxy/xproxy/proxy.go
index 4ce8d36..171f8c05 100644
--- a/services/xproxy/xproxy/proxy.go
+++ b/services/xproxy/xproxy/proxy.go
@@ -31,7 +31,7 @@
type proxy struct {
m flow.Manager
- pub publisher.Publisher
+ pub *publisher.T
closed chan struct{}
auth security.Authorizer
wg sync.WaitGroup
@@ -84,8 +84,7 @@
p.mu.Lock()
p.closing = true
p.mu.Unlock()
- p.pub.Stop()
- p.pub.WaitForStop()
+ <-p.pub.Closed()
p.wg.Wait()
<-p.m.Closed()
close(p.closed)
diff --git a/test/testutil/rpc.go b/test/testutil/rpc.go
new file mode 100644
index 0000000..ac599d5
--- /dev/null
+++ b/test/testutil/rpc.go
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testutil
+
+import (
+ "v.io/v23/rpc"
+)
+
+// WaitForServerPublished blocks until all published mounts/unmounts have reached
+// their desired state, and returns the resulting server status.
+func WaitForServerPublished(s rpc.Server) rpc.ServerStatus {
+ for {
+ status := s.Status()
+ if checkAllPublished(status) {
+ return status
+ }
+ <-status.Valid
+ }
+}
+
+func checkAllPublished(status rpc.ServerStatus) bool {
+ for _, e := range status.PublisherStatus {
+ if e.LastState != e.DesiredState {
+ return false
+ }
+ }
+ return true
+}