ref: Introduce valid channel to publisher and reimplement publisher
so that calls are truly async.

This uncovered quite a few tests that were passing only because the old
publisher was semi-synchronous, since it used an unbuffered chan.

Also introduce PublisherState to the PublisherStatus (formerly Mounts).
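
For illustration, a minimal sketch of how a caller waits on the publisher's
new Status/dirty-channel API (the same pattern used by the updated tests and
the new testutil.WaitForServerPublished helper):

    // Block until every publisher entry has reached its desired state.
    for {
        entries, dirty := pub.Status()
        settled := true
        for _, e := range entries {
            if e.LastState != e.DesiredState {
                settled = false
            }
        }
        if settled {
            break
        }
        <-dirty // closed once this snapshot becomes stale; re-poll Status
    }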

Fixes vanadium/issues#1148

MultiPart: 3/4

Change-Id: Ifa906b5ee0fb1676d1ba1fb6977979a79060bbb4
diff --git a/examples/rps/rpsbot/impl_test.go b/examples/rps/rpsbot/impl_test.go
index fdf6bd0..1a1411a 100644
--- a/examples/rps/rpsbot/impl_test.go
+++ b/examples/rps/rpsbot/impl_test.go
@@ -19,6 +19,7 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/x/ref/examples/rps"
+	"v.io/x/ref/test/testutil"
 	"v.io/x/ref/test/v23test"
 )
 
@@ -32,6 +33,8 @@
 	for _, n := range names[1:] {
 		server.AddName(n)
 	}
+	testutil.WaitForServerPublished(server)
+
 	return rpsService, func() {
 		if err := server.Stop(); err != nil {
 			t.Fatalf("Stop() failed: %v", err)
diff --git a/lib/publisher/publisher.go b/lib/publisher/publisher.go
index 7e45d5d..eb69bb7 100644
--- a/lib/publisher/publisher.go
+++ b/lib/publisher/publisher.go
@@ -5,12 +5,10 @@
 // Package publisher provides a type to publish names to a mounttable.
 package publisher
 
-// TODO(toddw): Add unittests.
-
 import (
 	"fmt"
-	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"v.io/v23/context"
@@ -21,192 +19,25 @@
 	"v.io/v23/verror"
 )
 
-// Publisher manages the publishing of servers in mounttable.
-type Publisher interface {
-	// AddServer adds a new server to be mounted.
-	AddServer(server string)
-	// RemoveServer removes a server from the list of mounts.
-	RemoveServer(server string)
-	// AddName adds a new name for all servers to be mounted as.
-	AddName(name string, ServesMountTable bool, IsLeaf bool)
-	// RemoveName removes a name.
-	RemoveName(name string)
-	// Status returns a snapshot of the publisher's current state.
-	Status() rpc.MountState
-	// DebugString returns a string representation of the publisher
-	// meant solely for debugging.
-	DebugString() string
-	// Stop causes the publishing to stop and initiates unmounting of the
-	// mounted names.  Stop performs the unmounting asynchronously, and
-	// WaitForStop should be used to wait until it is done.
-	// Once Stop is called Add/RemoveServer and AddName become noops.
-	Stop()
-	// WaitForStop waits until all unmounting initiated by Stop is finished.
-	WaitForStop()
-}
-
 // The publisher adds this much slack to each TTL.
 const mountTTLSlack = 20 * time.Second
 
-// publisher maintains the name->server associations in the mounttable.  It
-// spawns its own goroutine that does the actual work; the publisher itself
-// simply coordinates concurrent access by sending and receiving on the
-// appropriate channels.
-type publisher struct {
-	cmdchan  chan interface{} // value is one of {server,name,debug}Cmd
-	stopchan chan struct{}    // closed when no longer accepting commands.
-	donechan chan struct{}    // closed when the publisher is done
-	ctx      *context.T
-}
+// T manages the publishing of names and servers in the mounttable.
+// It spawns an internal goroutine that periodically performs mount and unmount
+// RPCs. T is safe to use concurrently.
+type T struct {
+	ctx    *context.T         // context used to make rpcs
+	cancel context.CancelFunc // cancel function for the above ctx
+	ns     namespace.T
+	period time.Duration
+	closed chan struct{} // closed when the Publisher is closed
 
-type addServerCmd struct {
-	server string // server to add
-}
-
-type removeServerCmd struct {
-	server string // server to remove
-}
-
-type addNameCmd struct {
-	name string // name to add
-	mt   bool   // true if server serves a mount table
-	leaf bool   // true if server is a leaf
-}
-
-type removeNameCmd struct {
-	name string // name to remove
-}
-
-type debugCmd chan string // debug string is sent when the cmd is done
-
-type statusCmd chan rpc.MountState // status info is sent when cmd is done
-
-type stopCmd struct{} // sent to the runloop when we want it to exit.
-
-// New returns a new publisher that updates mounts on ns every period.
-func New(ctx *context.T, ns namespace.T, period time.Duration) Publisher {
-	p := &publisher{
-		cmdchan:  make(chan interface{}),
-		stopchan: make(chan struct{}),
-		donechan: make(chan struct{}),
-		ctx:      ctx,
-	}
-	go runLoop(ctx, p.cmdchan, p.donechan, ns, period)
-	return p
-}
-
-func (p *publisher) sendCmd(cmd interface{}) bool {
-	select {
-	case p.cmdchan <- cmd:
-		return true
-	case <-p.stopchan:
-		return false
-	case <-p.donechan:
-		return false
-	}
-}
-
-func (p *publisher) AddServer(server string) {
-	p.sendCmd(addServerCmd{server})
-}
-
-func (p *publisher) RemoveServer(server string) {
-	p.sendCmd(removeServerCmd{server})
-}
-
-func (p *publisher) AddName(name string, mt bool, leaf bool) {
-	p.sendCmd(addNameCmd{name, mt, leaf})
-}
-
-func (p *publisher) RemoveName(name string) {
-	p.sendCmd(removeNameCmd{name})
-}
-
-func (p *publisher) Status() rpc.MountState {
-	status := make(statusCmd)
-	if p.sendCmd(status) {
-		return <-status
-	}
-	return rpc.MountState{}
-}
-
-func (p *publisher) DebugString() (dbg string) {
-	debug := make(debugCmd)
-	if p.sendCmd(debug) {
-		dbg = <-debug
-	} else {
-		dbg = "stopped"
-	}
-	return
-}
-
-// Stop stops the publisher, which in practical terms means un-mounting
-// everything and preventing any further publish operations.  The caller can
-// be confident that no new names or servers will get published once Stop
-// returns.  To wait for existing mounts to be cleaned up, use WaitForStop.
-//
-// Stopping the publisher is irreversible.
-//
-// Once the publisher is stopped, any further calls on its public methods
-// (including Stop) are no-ops.
-func (p *publisher) Stop() {
-	p.sendCmd(stopCmd{})
-	close(p.stopchan) // stop accepting new commands now.
-}
-
-func (p *publisher) WaitForStop() {
-	<-p.donechan
-}
-
-func runLoop(ctx *context.T, cmdchan chan interface{}, donechan chan struct{}, ns namespace.T, period time.Duration) {
-	ctx.VI(2).Info("rpc pub: start runLoop")
-	state := newPubState(ctx, ns, period)
-	for {
-		select {
-		case cmd := <-cmdchan:
-			switch tcmd := cmd.(type) {
-			case stopCmd:
-				state.unmountAll()
-				close(donechan)
-				ctx.VI(2).Info("rpc pub: exit runLoop")
-				return
-			case addServerCmd:
-				state.addServer(tcmd.server)
-			case removeServerCmd:
-				state.removeServer(tcmd.server)
-			case addNameCmd:
-				state.addName(tcmd.name, tcmd.mt, tcmd.leaf)
-			case removeNameCmd:
-				state.removeName(tcmd.name)
-			case statusCmd:
-				tcmd <- state.getStatus()
-				close(tcmd)
-			case debugCmd:
-				tcmd <- state.debugString()
-				close(tcmd)
-			}
-		case <-state.timeout():
-			// Sync everything once every period, to refresh the ttls.
-			state.sync()
-		}
-	}
-}
-
-type mountKey struct {
-	name, server string
-}
-
-// pubState maintains the state for our periodic mounts.  It is not thread-safe;
-// it's only used in the sequential publisher runLoop.
-type pubState struct {
-	ctx      *context.T
-	ns       namespace.T
-	period   time.Duration
-	deadline time.Time           // deadline for the next sync call
-	names    map[string]nameAttr // names that have been added
-	servers  map[string]bool     // servers that have been added, true
-	// map each (name,server) to its status.
-	mounts map[mountKey]*rpc.MountStatus
+	mu      sync.Mutex
+	changed chan struct{}
+	dirty   chan struct{}
+	names   map[string]nameAttr               // names that have been added
+	servers map[string]bool                   // servers that have been added
+	entries map[publishKey]rpc.PublisherEntry // map each (name,server) to its entry
 }
 
 type nameAttr struct {
@@ -214,176 +45,337 @@
 	isLeaf   bool
 }
 
-func newPubState(ctx *context.T, ns namespace.T, period time.Duration) *pubState {
-	return &pubState{
-		ctx:      ctx,
-		ns:       ns,
-		period:   period,
-		deadline: time.Now().Add(period),
-		names:    make(map[string]nameAttr),
-		servers:  make(map[string]bool),
-		mounts:   make(map[mountKey]*rpc.MountStatus),
+type publishKey struct {
+	name, server string
+}
+
+// New returns a new publisher that updates mounts on ns every period, and when
+// changes are made to the state.
+func New(ctx *context.T, ns namespace.T, period time.Duration) *T {
+	p := &T{
+		ns:      ns,
+		period:  period,
+		closed:  make(chan struct{}),
+		changed: make(chan struct{}, 1),
+		dirty:   make(chan struct{}),
+		names:   make(map[string]nameAttr),
+		servers: make(map[string]bool),
+		entries: make(map[publishKey]rpc.PublisherEntry),
 	}
+	timer := time.NewTimer(period) // timer for the next refresh publish call
+	// We create a new root context so that unmount RPCs can work even after the ctx
+	// passed in is closed.
+	p.ctx, p.cancel = context.WithRootCancel(ctx)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				p.stop()
+				close(p.closed)
+				return
+			case <-timer.C:
+				timer.Reset(period)
+				p.publish(true)
+			case <-p.changed:
+				p.publish(false)
+			}
+		}
+	}()
+	return p
 }
 
-func (ps *pubState) timeout() <-chan time.Time {
-	return time.After(ps.deadline.Sub(time.Now()))
-}
-
-func (ps *pubState) addName(name string, mt bool, leaf bool) {
-	// Each non-dup name that is added causes new mounts to be created for all
-	// existing servers.
-	if _, exists := ps.names[name]; exists {
+// AddName adds a new name for all servers to be mounted as.
+func (p *T) AddName(name string, mt bool, leaf bool) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	if p.names == nil {
 		return
 	}
-	attr := nameAttr{mt, leaf}
-	ps.names[name] = attr
-	for server, _ := range ps.servers {
-		status := new(rpc.MountStatus)
-		ps.mounts[mountKey{name, server}] = status
-		ps.mount(name, server, status, attr)
-	}
-}
-
-func (ps *pubState) removeName(name string) {
-	if _, exists := ps.names[name]; !exists {
+	if attr, exists := p.names[name]; exists && (attr.servesMT == mt && attr.isLeaf == leaf) {
 		return
 	}
-	for server, _ := range ps.servers {
-		if status, exists := ps.mounts[mountKey{name, server}]; exists {
-			ps.unmount(name, server, status, true)
-		}
-	}
-	delete(ps.names, name)
-}
-
-func (ps *pubState) addServer(server string) {
-	// Each non-dup server that is added causes new mounts to be created for all
-	// existing names.
-	if _, exists := ps.servers[server]; !exists {
-		ps.servers[server] = true
-		for name, attr := range ps.names {
-			status := new(rpc.MountStatus)
-			ps.mounts[mountKey{name, server}] = status
-			ps.mount(name, server, status, attr)
-		}
-	}
-}
-
-func (ps *pubState) removeServer(server string) {
-	if _, exists := ps.servers[server]; !exists {
-		return
-	}
-	delete(ps.servers, server)
-	for name, _ := range ps.names {
-		if status, exists := ps.mounts[mountKey{name, server}]; exists {
-			ps.unmount(name, server, status, true)
-		}
-	}
-}
-
-func (ps *pubState) mount(name, server string, status *rpc.MountStatus, attr nameAttr) {
-	// Always mount with ttl = period + slack, regardless of whether this is
-	// triggered by a newly added server or name, or by sync.  The next call
-	// to sync will occur within the next period, and refresh all mounts.
-	ttl := ps.period + mountTTLSlack
-	last := *status
-	status.LastMount = time.Now()
-	status.LastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
-	status.TTL = ttl
-	// If the mount status changed, log it.
-	if status.LastMountErr != nil {
-		if verror.ErrorID(last.LastMountErr) != verror.ErrorID(status.LastMountErr) || ps.ctx.V(2) {
-			ps.ctx.Errorf("rpc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.LastMountErr)
-		}
-	} else {
-		if last.LastMount.IsZero() || last.LastMountErr != nil || ps.ctx.V(2) {
-			ps.ctx.Infof("rpc pub: mount(%v, %v, %v)", name, server, ttl)
-		}
-	}
-}
-
-func (ps *pubState) sync() {
-	ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync
-	for key, status := range ps.mounts {
-		if status.LastUnmountErr != nil {
-			// Desired state is "unmounted", failed at previous attempt. Retry.
-			ps.unmount(key.name, key.server, status, true)
+	p.names[name] = nameAttr{mt, leaf}
+	for server := range p.servers {
+		key := publishKey{name, server}
+		if pe, ok := p.entries[key]; ok {
+			pe.DesiredState = rpc.PublisherMounted
+			p.entries[key] = pe
 		} else {
-			ps.mount(key.name, key.server, status, ps.names[key.name])
+			p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
 		}
 	}
+	p.notifyChanged()
 }
 
-func (ps *pubState) unmount(name, server string, status *rpc.MountStatus, retry bool) {
-	status.LastUnmount = time.Now()
-	var opts []naming.NamespaceOpt
-	if !retry {
-		opts = []naming.NamespaceOpt{options.NoRetry{}}
+// RemoveName removes a name.
+func (p *T) RemoveName(name string) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	if p.names == nil {
+		return
 	}
-	status.LastUnmountErr = ps.ns.Unmount(ps.ctx, name, server, opts...)
-	if status.LastUnmountErr != nil {
-		ps.ctx.Errorf("rpc pub: couldn't unmount(%v, %v): %v", name, server, status.LastUnmountErr)
-	} else {
-		ps.ctx.VI(1).Infof("rpc pub: unmount(%v, %v)", name, server)
-		delete(ps.mounts, mountKey{name, server})
+	if _, exists := p.names[name]; !exists {
+		return
 	}
-}
-
-func (ps *pubState) unmountAll() {
-	for key, status := range ps.mounts {
-		ps.unmount(key.name, key.server, status, false)
-	}
-}
-
-func copyNamesToSlice(sl map[string]nameAttr) []string {
-	var ret []string
-	for s, _ := range sl {
-		if len(s) == 0 {
-			continue
+	delete(p.names, name)
+	for server := range p.servers {
+		key := publishKey{name, server}
+		if pe, ok := p.entries[key]; ok {
+			pe.DesiredState = rpc.PublisherUnmounted
+			p.entries[key] = pe
 		}
-		ret = append(ret, s)
 	}
-	return ret
+	p.notifyChanged()
 }
 
-func copyServersToSlice(sl map[string]bool) []string {
-	var ret []string
-	for s, _ := range sl {
-		if len(s) == 0 {
-			continue
+// AddServer adds a new server to be mounted under all names.
+func (p *T) AddServer(server string) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	if p.names == nil {
+		return
+	}
+	if _, exists := p.servers[server]; exists {
+		return
+	}
+	p.servers[server] = true
+	for name := range p.names {
+		key := publishKey{name, server}
+		if pe, ok := p.entries[key]; ok {
+			pe.DesiredState = rpc.PublisherMounted
+			p.entries[key] = pe
+		} else {
+			p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
 		}
-		ret = append(ret, s)
 	}
-	return ret
+	p.notifyChanged()
 }
 
-func (ps *pubState) getStatus() rpc.MountState {
-	st := make([]rpc.MountStatus, 0, len(ps.mounts))
-	names := copyNamesToSlice(ps.names)
-	servers := copyServersToSlice(ps.servers)
-	sort.Strings(names)
-	sort.Strings(servers)
-	for _, name := range names {
-		for _, server := range servers {
-			if v := ps.mounts[mountKey{name, server}]; v != nil {
-				mst := *v
-				mst.Name = name
-				mst.Server = server
-				st = append(st, mst)
+// RemoveServer removes a server from the list of mounts.
+func (p *T) RemoveServer(server string) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	if p.names == nil {
+		return
+	}
+	if _, exists := p.servers[server]; !exists {
+		return
+	}
+	delete(p.servers, server)
+	for name := range p.names {
+		key := publishKey{name, server}
+		if pe, ok := p.entries[key]; ok {
+			pe.DesiredState = rpc.PublisherUnmounted
+			p.entries[key] = pe
+		}
+	}
+	p.notifyChanged()
+}
+
+// Status returns a snapshot of the publisher's current state.
+// The returned channel is closed when the state has become stale; the caller
+// should then poll Status again for a fresh snapshot.
+func (p *T) Status() ([]rpc.PublisherEntry, <-chan struct{}) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	st := make([]rpc.PublisherEntry, 0, len(p.entries))
+	now := time.Now()
+	for _, e := range p.entries {
+		mountDelta := now.Sub(e.LastMount)
+		switch {
+		case e.LastMount.IsZero() && e.LastUnmount.IsZero():
+			e.LastState = rpc.PublisherUnmounted
+		case e.LastUnmount.After(e.LastMount) && e.LastUnmountErr == nil:
+			e.LastState = rpc.PublisherUnmounted
+		case mountDelta > p.period+2*mountTTLSlack:
+			e.LastState = rpc.PublisherUnmounted
+		case mountDelta < p.period:
+			e.LastState = rpc.PublisherMounted
+		case e.LastUnmount.After(e.LastMount):
+			e.LastState = rpc.PublisherUnmounting
+		default:
+			e.LastState = rpc.PublisherMounting
+		}
+		st = append(st, e)
+	}
+	return st, p.dirty
+}
+
+// String returns a string representation of the publisher.
+func (p *T) String() string {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	l := make([]string, 0, 2+len(p.entries))
+	l = append(l, fmt.Sprintf("Publisher period:%v", p.period))
+	l = append(l, "==============================Mounts============================================")
+	for key, entry := range p.entries {
+		l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %v) unmount(%v, %v) Last: %s, Desired: %s ", key.name, key.server,
+			entry.LastMount, entry.LastMountErr, entry.TTL, entry.LastUnmount, entry.LastUnmountErr, entry.LastState, entry.DesiredState))
+	}
+	return strings.Join(l, "\n")
+}
+
+// Closed returns a channel that is closed once the publisher's context is
+// cancelled and all unmount operations have terminated.
+func (p *T) Closed() <-chan struct{} {
+	return p.closed
+}
+
+// publish makes RPCs to the mounttable to mount and unmount entries.
+// If refreshAll is true, then all entries will be refreshed. Otherwise only
+// entries with fresh changes (i.e. from AddName, RemoveName, etc.) will be updated.
+func (p *T) publish(refreshAll bool) {
+	mounts, unmounts := p.entriesToPublish(refreshAll)
+
+	// TODO(suharshs): We could potentially do these mount and unmount rpcs in parallel.
+	mountEntries := make([]rpc.PublisherEntry, 0, len(mounts))
+	unmountEntries := make([]rpc.PublisherEntry, 0, len(unmounts))
+	for _, params := range mounts {
+		mountEntries = append(mountEntries, p.mount(params))
+	}
+	for _, params := range unmounts {
+		unmountEntries = append(unmountEntries, p.unmount(params))
+	}
+
+	// Update p.entries with the new entries.
+	p.updateEntries(mountEntries, unmountEntries)
+}
+
+func (p *T) entriesToPublish(refreshAll bool) ([]mountParams, []unmountParams) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	var mounts []mountParams
+	var unmounts []unmountParams
+	for key, entry := range p.entries {
+		if entry.DesiredState == rpc.PublisherUnmounted {
+			if entry.LastState == rpc.PublisherUnmounted {
+				delete(p.entries, key)
+			} else if refreshAll || entry.LastUnmount.IsZero() {
+				unmounts = append(unmounts, unmountParams{entry: entry, retry: true})
+			}
+		} else {
+			if refreshAll || entry.LastMount.IsZero() {
+				mounts = append(mounts, mountParams{entry: entry, attr: p.names[key.name]})
 			}
 		}
 	}
-	return st
+	return mounts, unmounts
 }
 
-// TODO(toddw): sort the names/servers so that the output order is stable.
-func (ps *pubState) debugString() string {
-	l := make([]string, 2+len(ps.mounts))
-	l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
-	l = append(l, "==============================Mounts============================================")
-	for key, status := range ps.mounts {
-		l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %s) unmount(%v, %v)", key.name, key.server, status.LastMount, status.LastMountErr, status.TTL, status.LastUnmount, status.LastUnmountErr))
+func (p *T) updateEntries(mountEntries, unmountEntries []rpc.PublisherEntry) {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	for _, entry := range mountEntries {
+		key := publishKey{entry.Name, entry.Server}
+		// Ensure that the DesiredState, which may have been changed while the
+		// lock was released, is not overwritten by the DesiredState of entry.
+		if current, ok := p.entries[key]; ok {
+			entry.DesiredState = current.DesiredState
+		}
+		p.entries[key] = entry
 	}
-	return strings.Join(l, "\n")
+	for _, entry := range unmountEntries {
+		key := publishKey{entry.Name, entry.Server}
+		// Ensure that we don't delete the entry if the DesiredState was
+		// changed while the lock was released.
+		if current, ok := p.entries[key]; ok {
+			entry.DesiredState = current.DesiredState
+		}
+		if entry.DesiredState == rpc.PublisherUnmounted && entry.LastUnmountErr == nil {
+			delete(p.entries, key)
+		} else {
+			p.entries[key] = entry
+		}
+	}
+	close(p.dirty)
+	p.dirty = make(chan struct{})
+}
+
+func (p *T) stop() {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	p.names = nil
+	p.servers = nil
+	// We make one final attempt to unmount everything; we ignore failures here,
+	// and don't retry, since the mounts will eventually time out anyway.
+	for _, pe := range p.entries {
+		p.unmount(unmountParams{entry: pe, retry: false})
+	}
+	p.cancel()
+	close(p.dirty)
+}
+
+func (p *T) notifyChanged() {
+	// We ensure that callers of this function (i.e. AddName, RemoveName, AddServer,
+	// and RemoveServer) do not block. We do this by giving p.changed a buffer of size 1
+	// and adding a default clause below. This allows multiple calls to notifyChanged
+	// to complete before the internal goroutine processes the change.
+	select {
+	case p.changed <- struct{}{}:
+	default:
+	}
+}
+
+type mountParams struct {
+	entry rpc.PublisherEntry
+	attr  nameAttr
+}
+
+// mount makes a mount RPC for the entry described in params. It returns a new
+// rpc.PublisherEntry, updated with the results of the RPC.
+func (p *T) mount(params mountParams) rpc.PublisherEntry {
+	last, entry, attr := params.entry, params.entry, params.attr
+	// Always mount with ttl = period + slack.
+	// The next publish call will occur within the next period, refreshing the mount.
+	ttl := p.period + mountTTLSlack
+	entry.LastMount = time.Now()
+	// Ensure that LastMount > LastUnmount to make it easier to check for the last
+	// tried operation.
+	if entry.LastMount.Before(entry.LastUnmount) {
+		entry.LastMount = entry.LastUnmount.Add(1)
+	}
+	entry.LastMountErr = p.ns.Mount(p.ctx, entry.Name, entry.Server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
+	entry.TTL = ttl
+	// If the mount entry changed, log it.
+	if entry.LastMountErr != nil {
+		if verror.ErrorID(last.LastMountErr) != verror.ErrorID(entry.LastMountErr) || p.ctx.V(2) {
+			p.ctx.Errorf("rpc pub: couldn't mount(%v, %v, %v): %v", entry.Name, entry.Server, ttl, entry.LastMountErr)
+		}
+	} else {
+		entry.LastState = rpc.PublisherMounted
+		if last.LastMount.IsZero() || last.LastMountErr != nil || p.ctx.V(2) {
+			p.ctx.Infof("rpc pub: mount(%v, %v, %v)", entry.Name, entry.Server, ttl)
+		}
+	}
+	return entry
+}
+
+type unmountParams struct {
+	entry rpc.PublisherEntry
+	retry bool
+}
+
+// unmount makes an unmount RPC for the entry described in params. It returns a
+// new rpc.PublisherEntry, updated with the results of the RPC.
+func (p *T) unmount(params unmountParams) rpc.PublisherEntry {
+	entry := params.entry
+	var opts []naming.NamespaceOpt
+	if !params.retry {
+		opts = []naming.NamespaceOpt{options.NoRetry{}}
+	}
+	entry.LastUnmount = time.Now()
+	// Ensure that LastUnmount > LastMount to make it easier to check for the last
+	// tried operation.
+	if entry.LastUnmount.Before(entry.LastMount) {
+		entry.LastUnmount = entry.LastMount.Add(1)
+	}
+	entry.LastUnmountErr = p.ns.Unmount(p.ctx, entry.Name, entry.Server, opts...)
+	if entry.LastUnmountErr != nil {
+		p.ctx.Errorf("rpc pub: couldn't unmount(%v, %v): %v", entry.Name, entry.Server, entry.LastUnmountErr)
+	} else {
+		entry.LastState = rpc.PublisherUnmounted
+		p.ctx.VI(1).Infof("rpc pub: unmount(%v, %v)", entry.Name, entry.Server)
+	}
+	return entry
 }
diff --git a/lib/publisher/publisher_test.go b/lib/publisher/publisher_test.go
index 2c468cb..6386bb4 100644
--- a/lib/publisher/publisher_test.go
+++ b/lib/publisher/publisher_test.go
@@ -5,7 +5,6 @@
 package publisher_test
 
 import (
-	"fmt"
 	"reflect"
 	"sort"
 	"testing"
@@ -14,6 +13,7 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/namespace"
+	"v.io/v23/rpc"
 
 	"v.io/x/ref/lib/publisher"
 	_ "v.io/x/ref/runtime/factories/generic"
@@ -51,7 +51,8 @@
 	ctx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 	ns := v23.GetNamespace(ctx)
-	pub := publisher.New(ctx, ns, time.Second)
+	pubctx, cancel := context.WithCancel(ctx)
+	pub := publisher.New(pubctx, ns, time.Second)
 	pub.AddName("foo", false, false)
 	pub.AddServer("foo:8000")
 	if got, want := resolveWithRetry(t, ns, ctx, "foo", 1), []string{"/foo:8000"}; !reflect.DeepEqual(got, want) {
@@ -71,17 +72,19 @@
 	}
 	pub.RemoveName("foo")
 	verifyMissing(t, ns, ctx, "foo")
-	pub.Stop()
-	pub.WaitForStop()
+
+	cancel()
+	<-pub.Closed()
 }
 
 func TestStatus(t *testing.T) {
 	ctx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 	ns := v23.GetNamespace(ctx)
-	pub := publisher.New(ctx, ns, time.Second)
+	pubctx, cancel := context.WithCancel(ctx)
+	pub := publisher.New(pubctx, ns, time.Second)
 	pub.AddName("foo", false, false)
-	status := pub.Status()
+	status, _ := pub.Status()
 	if got, want := len(status), 0; got != want {
 		t.Errorf("got %d, want %d", got, want)
 	}
@@ -89,54 +92,94 @@
 
 	// Wait for the publisher to asynchronously publish the
 	// requisite number of servers.
-	ch := make(chan error, 1)
 	waitFor := func(n int) {
-		deadline := time.Now().Add(time.Minute)
 		for {
-			status = pub.Status()
+			status, dirty := pub.Status()
 			if got, want := len(status), n; got != want {
-				if time.Now().After(deadline) {
-					ch <- fmt.Errorf("got %d, want %d", got, want)
-					return
-				}
-				time.Sleep(100 * time.Millisecond)
+				<-dirty
 			} else {
-				ch <- nil
 				return
 			}
 		}
 	}
 
-	go waitFor(1)
-	if err := <-ch; err != nil {
-		t.Fatalf("%s", err)
-	}
+	waitFor(1)
 
 	pub.AddServer("bar:8000")
 	pub.AddName("baz", false, false)
 
-	go waitFor(4)
-	if err := <-ch; err != nil {
-		t.Fatalf("%s", err)
-	}
+	waitFor(4)
 
-	status = pub.Status()
-	names := status.Names()
-	if got, want := names, []string{"baz", "foo"}; !reflect.DeepEqual(got, want) {
+	status, _ = pub.Status()
+	names, servers := publisherNamesAndServers(status)
+	// There will be two of each name and two of each server in the mount entries.
+	if got, want := names, []string{"baz", "baz", "foo", "foo"}; !reflect.DeepEqual(got, want) {
 		t.Errorf("got %q, want %q", got, want)
 	}
-	servers := status.Servers()
-	if got, want := servers, []string{"bar:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
+	if got, want := servers, []string{"bar:8000", "bar:8000", "foo:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
 		t.Errorf("got %q, want %q", got, want)
 	}
 	pub.RemoveName("foo")
+	waitFor(2)
 	verifyMissing(t, ns, ctx, "foo")
 
-	status = pub.Status()
-	go waitFor(2)
-	if err := <-ch; err != nil {
-		t.Fatalf("%s", err)
+	status, _ = pub.Status()
+	names, servers = publisherNamesAndServers(status)
+	if got, want := names, []string{"baz", "baz"}; !reflect.DeepEqual(got, want) {
+		t.Errorf("got %q, want %q", got, want)
 	}
-	pub.Stop()
-	pub.WaitForStop()
+	if got, want := servers, []string{"bar:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
+		t.Errorf("got %q, want %q", got, want)
+	}
+
+	cancel()
+	<-pub.Closed()
+}
+
+func TestRemoveFailedAdd(t *testing.T) {
+	// Test that removing an already unmounted name (due to an error while mounting),
+	// results in a Status that has no entries.
+	// We use v23.Init instead of test.V23InitWithMounttable so that all calls to the mounttable fail.
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	ns := v23.GetNamespace(ctx)
+	pubctx, cancel := context.WithCancel(ctx)
+	pub := publisher.New(pubctx, ns, time.Second)
+	pub.AddServer("foo:8000")
+	// Adding a name should result in one entry in the publisher; since the
+	// mounttable is unreachable, the entry can never successfully mount.
+	pub.AddName("foo", false, false)
+	status, _ := pub.Status()
+	if got, want := len(status), 1; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	// We want to ensure that the LastState is either:
+	// PublisherUnmounted if the publisher hasn't tried to mount yet, or
+	// PublisherMounting if the publisher tried to mount but failed.
+	if got, want := status[0].LastState, rpc.PublisherMounting; got > want {
+		t.Fatalf("got %s, want %s", got, want)
+	}
+	// Removing "foo" should result in an empty Status.
+	pub.RemoveName("foo")
+	for {
+		status, dirty := pub.Status()
+		if got, want := len(status), 0; got != want {
+			<-dirty
+		} else {
+			break
+		}
+	}
+
+	cancel()
+	<-pub.Closed()
+}
+
+func publisherNamesAndServers(entries []rpc.PublisherEntry) (names []string, servers []string) {
+	for _, e := range entries {
+		names = append(names, e.Name)
+		servers = append(servers, e.Server)
+	}
+	sort.Strings(names)
+	sort.Strings(servers)
+	return names, servers
 }
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index 55eac9e..b77ac37 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -29,6 +29,17 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
+	waitForEndpoints := func(server rpc.Server, n int) rpc.ServerStatus {
+		for {
+			status := server.Status()
+			if got, want := len(status.Endpoints), n; got != want {
+				<-status.Valid
+			} else {
+				return status
+			}
+		}
+	}
+
 	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
 		return NewClient(ctx, opts...)
 	})
@@ -67,18 +78,10 @@
 	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
 	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
 
-	change := status.Valid
-
 	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
-	// We should be notified of a network change.
-	<-change
-	status = server.Status()
-	eps := status.Endpoints
-	change = status.Valid
 	// We expect 4 new endpoints, 2 for each valid listen call.
-	if got, want := len(eps), len(prevEps)+4; got != want {
-		t.Errorf("got %v, want %v", got, want)
-	}
+	status = waitForEndpoints(server, len(prevEps)+4)
+	eps := status.Endpoints
 	// We expect the added networks to be in the new endpoints.
 	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
 		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
@@ -90,14 +93,8 @@
 
 	// Now remove a network.
 	ch <- roaming.NewRmAddrsSetting([]net.Addr{n1})
-	<-change
-	status = server.Status()
+	status = waitForEndpoints(server, len(prevEps)-2)
 	eps = status.Endpoints
-	change = status.Valid
-	// We expect 2 endpoints to be missing.
-	if got, want := len(eps), len(prevEps)-2; got != want {
-		t.Errorf("got %v, want %v", got, want)
-	}
 	// We expect the removed network to not be in the new endpoints.
 	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 0; got != want {
 		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
@@ -106,21 +103,18 @@
 
 	// Now remove everything, essentially "disconnected from the network"
 	ch <- roaming.NewRmAddrsSetting(getIPAddrs(prevEps))
-	<-change
-	status = server.Status()
-	eps = status.Endpoints
-	change = status.Valid
 	// We expect there to be only the bidi endpoint.
+	status = waitForEndpoints(server, 1)
+	eps = status.Endpoints
 	if got, want := len(eps), 1; got != want && eps[0].Addr().Network() != "bidi" {
 		t.Errorf("got %v, want %v", got, want)
 	}
 
 	// Now if we reconnect to a network it should show up.
 	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
-	<-change
-	status = server.Status()
-	eps = status.Endpoints
 	// We expect 2 endpoints to be added
+	status = waitForEndpoints(server, 2)
+	eps = status.Endpoints
 	if got, want := len(eps), 2; got != want {
 		t.Errorf("got %v, want %v", got, want)
 	}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 19ed9dc..e812c67 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -62,13 +62,13 @@
 	ctx               *context.T
 	cancel            context.CancelFunc // function to cancel the above context.
 	flowMgr           flow.Manager
-	publisher         publisher.Publisher // publisher to publish mounttable mounts.
-	settingsPublisher *pubsub.Publisher   // pubsub publisher for dhcp
+	settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
 	valid             chan struct{}
 	blessings         security.Blessings
 	typeCache         *typeCache
 	state             rpc.ServerState // the current state of the server.
 	stopProxy         context.CancelFunc
+	publisher         *publisher.T // publisher to publish mounttable mounts.
 
 	endpoints map[string]*inaming.Endpoint                 // endpoints that the server is listening on.
 	lnErrors  map[struct{ Protocol, Address string }]error // errors from listening
@@ -186,7 +186,10 @@
 		s.cancel()
 		return ctx, nil, err
 	}
-	s.publisher = publisher.New(s.ctx, v23.GetNamespace(s.ctx), publishPeriod)
+	pubctx, pubcancel := context.WithCancel(s.ctx)
+	s.publisher = publisher.New(pubctx, v23.GetNamespace(s.ctx), publishPeriod)
+	s.active.Add(1)
+	go s.monitorPubStatus(ctx)
 
 	// TODO(caprita): revisit printing the blessings with string, and
 	// instead expose them as a list.
@@ -209,13 +212,13 @@
 		defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
 
 		s.stats.stop()
-		s.publisher.Stop()
+		pubcancel()
 		s.stopProxy()
 
 		done := make(chan struct{})
 		go func() {
 			s.flowMgr.StopListening(ctx)
-			s.publisher.WaitForStop()
+			<-s.publisher.Closed()
 			// At this point no new flows should arrive.  Wait for existing calls
 			// to complete.
 			s.active.Wait()
@@ -233,9 +236,9 @@
 		// ongoing requests.  Hopefully this will bring all outstanding
 		// operations to a close.
 		s.cancel()
-		// Note that since the context has been canceled, publisher.WaitForStop and <-flowMgr.Closed()
+		// Note that since the context has been canceled, <-publisher.Closed() and <-flowMgr.Closed()
 		// should return right away.
-		s.publisher.WaitForStop()
+		<-s.publisher.Closed()
 		<-s.flowMgr.Closed()
 		s.Lock()
 		close(s.valid)
@@ -251,11 +254,39 @@
 	return s.ctx, s, nil
 }
 
+// monitorPubStatus guarantees that the ServerStatus.Valid channel is closed
+// when the publisher state becomes dirty. Since we also call publisher.Status()
+// in the Status method, it's possible that the Valid channel in the returned
+// ServerStatus will be closed spuriously by this goroutine.
+func (s *server) monitorPubStatus(ctx *context.T) {
+	defer s.active.Done()
+	var pubDirty <-chan struct{}
+	s.Lock()
+	_, pubDirty = s.publisher.Status()
+	s.Unlock()
+	for {
+		select {
+		case <-pubDirty:
+			s.Lock()
+			_, pubDirty = s.publisher.Status()
+			s.updateValidLocked()
+			s.Unlock()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 func (s *server) Status() rpc.ServerStatus {
 	status := rpc.ServerStatus{}
 	status.ServesMountTable = s.servesMountTable
-	status.Mounts = s.publisher.Status()
 	s.Lock()
+	// We call s.publisher.Status here instead of using a publisher status cached
+	// by s.monitorPubStatus, because we want to guarantee that s.AddName/AddServer
+	// calls have their added publisher entries reflected in s.Status() immediately.
+	// i.e. s.AddName("foo")
+	//      s.Status().PublisherStatus // Should have an entry for "foo".
+	status.PublisherStatus, _ = s.publisher.Status()
 	status.Valid = s.valid
 	status.State = s.state
 	for _, e := range s.endpoints {
diff --git a/runtime/internal/rpc/test/server_test.go b/runtime/internal/rpc/test/server_test.go
index a2e201f..a2ca391 100644
--- a/runtime/internal/rpc/test/server_test.go
+++ b/runtime/internal/rpc/test/server_test.go
@@ -18,6 +18,7 @@
 	"v.io/v23/security"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	"v.io/x/ref/test"
+	"v.io/x/ref/test/testutil"
 )
 
 type noMethodsType struct{ Field string }
@@ -137,7 +138,7 @@
 	waitForStatus(rpc.ServerStopped)
 }
 
-func TestMountStatus(t *testing.T) {
+func TestPublisherStatus(t *testing.T) {
 	ctx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
@@ -152,44 +153,26 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-	status := server.Status()
+	status := testutil.WaitForServerPublished(server)
+	if got, want := len(status.PublisherStatus), 2; got != want {
+		t.Errorf("got %d, want %d", got, want)
+	}
 	eps := server.Status().Endpoints
 	if got, want := len(eps), 2; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
 	setLeafEndpoints(eps)
-	if got, want := len(status.Mounts), 2; got != want {
-		t.Fatalf("got %d, want %d", got, want)
-	}
-	servers := status.Mounts.Servers()
-	if got, want := len(servers), 2; got != want {
-		t.Fatalf("got %d, want %d", got, want)
-	}
-	if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
-		t.Fatalf("got %v, want %v", got, want)
-	}
 
 	// Add a second name and we should now see 4 mounts, 2 for each name.
 	if err := server.AddName("bar"); err != nil {
 		t.Fatal(err)
 	}
-	status = server.Status()
-	if got, want := len(status.Mounts), 4; got != want {
-		t.Fatalf("got %d, want %d", got, want)
-	}
-	servers = status.Mounts.Servers()
-	if got, want := len(servers), 2; got != want {
-		t.Fatalf("got %d, want %d", got, want)
-	}
-	if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
-		t.Fatalf("got %v, want %v", got, want)
-	}
-	names := status.Mounts.Names()
-	if got, want := len(names), 2; got != want {
-		t.Fatalf("got %d, want %d", got, want)
+	status = testutil.WaitForServerPublished(server)
+	if got, want := len(status.PublisherStatus), 4; got != want {
+		t.Errorf("got %d, want %d", got, want)
 	}
 	serversPerName := map[string][]string{}
-	for _, ms := range status.Mounts {
+	for _, ms := range status.PublisherStatus {
 		serversPerName[ms.Name] = append(serversPerName[ms.Name], ms.Server)
 	}
 	if got, want := len(serversPerName), 2; got != want {
@@ -197,7 +180,11 @@
 	}
 	for _, name := range []string{"foo", "bar"} {
 		if got, want := len(serversPerName[name]), 2; got != want {
-			t.Fatalf("got %d, want %d", got, want)
+			t.Errorf("got %d, want %d", got, want)
+		}
+		sort.Strings(serversPerName[name])
+		if got, want := serversPerName[name], endpointToStrings(eps); !reflect.DeepEqual(got, want) {
+			t.Errorf("got %v, want %v", got, want)
 		}
 	}
 }
diff --git a/services/mounttable/mounttablelib/servers.go b/services/mounttable/mounttablelib/servers.go
index b0f6eb0..5e289d9 100644
--- a/services/mounttable/mounttablelib/servers.go
+++ b/services/mounttable/mounttablelib/servers.go
@@ -35,8 +35,17 @@
 		return "", nil, err
 	}
 	stopFuncs = append(stopFuncs, mtServer.Stop)
-	mtEndpoints := mtServer.Status().Endpoints
-	mtName := mtEndpoints[0].Name()
+	var mtName string
+	var mtEndpoints []naming.Endpoint
+	for {
+		status := mtServer.Status()
+		mtEndpoints = status.Endpoints
+		mtName = mtEndpoints[0].Name()
+		if mtEndpoints[0].Addr().Network() != "bidi" {
+			break
+		}
+		<-status.Valid
+	}
 	ctx.Infof("Mount table service at: %q endpoint: %s", mountName, mtName)
 
 	if len(nhName) > 0 {
diff --git a/services/wspr/internal/browspr/browspr_test.go b/services/wspr/internal/browspr/browspr_test.go
index 25594f9..86d933f 100644
--- a/services/wspr/internal/browspr/browspr_test.go
+++ b/services/wspr/internal/browspr/browspr_test.go
@@ -94,7 +94,7 @@
 found:
 	for {
 		status := mockServer.Status()
-		for _, v := range status.Mounts {
+		for _, v := range status.PublisherStatus {
 			if v.Name == mockServerName && v.Server == mockServerEndpoint.String() && !v.LastMount.IsZero() {
 				if v.LastMountErr != nil {
 					t.Fatalf("Failed to mount %s: %v", v.Name, v.LastMountErr)
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index 47544d6..1472915 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -577,22 +577,22 @@
 	lastErrors := map[string]string{}
 	for {
 		status := s.server.Status()
-		for _, mountStatus := range status.Mounts {
+		for _, e := range status.PublisherStatus {
 			var errMsg string
-			if mountStatus.LastMountErr != nil {
-				errMsg = mountStatus.LastMountErr.Error()
+			if e.LastMountErr != nil {
+				errMsg = e.LastMountErr.Error()
 			}
-			mountName := mountStatus.Name
-			if lastMessage, ok := lastErrors[mountName]; !ok || errMsg != lastMessage {
+			name := e.Name
+			if lastMessage, ok := lastErrors[name]; !ok || errMsg != lastMessage {
 				if errMsg == "" {
 					s.helper.SendLogMessage(
-						lib.LogLevelInfo, "serve: "+mountName+" successfully mounted ")
+						lib.LogLevelInfo, "serve: "+name+" successfully mounted ")
 				} else {
 					s.helper.SendLogMessage(
-						lib.LogLevelError, "serve: "+mountName+" failed with: "+errMsg)
+						lib.LogLevelError, "serve: "+name+" failed with: "+errMsg)
 				}
 			}
-			lastErrors[mountName] = errMsg
+			lastErrors[name] = errMsg
 		}
 		select {
 		case <-time.After(10 * time.Second):
diff --git a/services/xproxy/xproxy/proxy.go b/services/xproxy/xproxy/proxy.go
index 4ce8d36..171f8c05 100644
--- a/services/xproxy/xproxy/proxy.go
+++ b/services/xproxy/xproxy/proxy.go
@@ -31,7 +31,7 @@
 
 type proxy struct {
 	m      flow.Manager
-	pub    publisher.Publisher
+	pub    *publisher.T
 	closed chan struct{}
 	auth   security.Authorizer
 	wg     sync.WaitGroup
@@ -84,8 +84,7 @@
 		p.mu.Lock()
 		p.closing = true
 		p.mu.Unlock()
-		p.pub.Stop()
-		p.pub.WaitForStop()
+		<-p.pub.Closed()
 		p.wg.Wait()
 		<-p.m.Closed()
 		close(p.closed)
diff --git a/test/testutil/rpc.go b/test/testutil/rpc.go
new file mode 100644
index 0000000..ac599d5
--- /dev/null
+++ b/test/testutil/rpc.go
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testutil
+
+import (
+	"v.io/v23/rpc"
+)
+
+// WaitForServerPublished blocks until all published mounts/unmounts have reached
+// their desired state, and returns the resulting server status.
+func WaitForServerPublished(s rpc.Server) rpc.ServerStatus {
+	for {
+		status := s.Status()
+		if checkAllPublished(status) {
+			return status
+		}
+		<-status.Valid
+	}
+}
+
+func checkAllPublished(status rpc.ServerStatus) bool {
+	for _, e := range status.PublisherStatus {
+		if e.LastState != e.DesiredState {
+			return false
+		}
+	}
+	return true
+}