blob: eb69bb7531b5191aebec80f4ac265034c41f52cc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package publisher provides a type to publish names to a mounttable.
package publisher
import (
"fmt"
"strings"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/namespace"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/verror"
)
// The publisher adds this much slack to each TTL.
const mountTTLSlack = 20 * time.Second
// T manages the publishing of names and servers in the mounttable.
// It spawns an internal goroutine the periodically performs mount and unmount
// rpcs. T is safe to use concurrently.
type T struct {
ctx *context.T // context used to make rpcs
cancel context.CancelFunc // cancel function for the above ctx
ns namespace.T
period time.Duration
closed chan struct{} // closed when the Publisher is closed
mu sync.Mutex
changed chan struct{}
dirty chan struct{}
names map[string]nameAttr // names that have been added
servers map[string]bool // servers that have been added
entries map[publishKey]rpc.PublisherEntry // map each (name,server) to its entry
}
type nameAttr struct {
servesMT bool
isLeaf bool
}
type publishKey struct {
name, server string
}
// New returns a new publisher that updates mounts on ns every period, and when
// changes are made to the state.
func New(ctx *context.T, ns namespace.T, period time.Duration) *T {
p := &T{
ns: ns,
period: period,
closed: make(chan struct{}),
changed: make(chan struct{}, 1),
dirty: make(chan struct{}),
names: make(map[string]nameAttr),
servers: make(map[string]bool),
entries: make(map[publishKey]rpc.PublisherEntry),
}
timer := time.NewTimer(period) // timer for the next refresh publish call
// We create a new root context so that unmount RPCs can work even after the ctx
// passed in is closed.
p.ctx, p.cancel = context.WithRootCancel(ctx)
go func() {
for {
select {
case <-ctx.Done():
timer.Stop()
p.stop()
close(p.closed)
return
case <-timer.C:
timer.Reset(period)
p.publish(true)
case <-p.changed:
p.publish(false)
}
}
}()
return p
}
// AddName adds a new name for all servers to be mounted as.
func (p *T) AddName(name string, mt bool, leaf bool) {
defer p.mu.Unlock()
p.mu.Lock()
if p.names == nil {
return
}
if attr, exists := p.names[name]; exists && (attr.servesMT == mt && attr.isLeaf == leaf) {
return
}
p.names[name] = nameAttr{mt, leaf}
for server := range p.servers {
key := publishKey{name, server}
if pe, ok := p.entries[key]; ok {
pe.DesiredState = rpc.PublisherMounted
p.entries[key] = pe
} else {
p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
}
}
p.notifyChanged()
}
// RemoveName removes a name.
func (p *T) RemoveName(name string) {
defer p.mu.Unlock()
p.mu.Lock()
if p.names == nil {
return
}
if _, exists := p.names[name]; !exists {
return
}
delete(p.names, name)
for server := range p.servers {
key := publishKey{name, server}
if pe, ok := p.entries[key]; ok {
pe.DesiredState = rpc.PublisherUnmounted
p.entries[key] = pe
}
}
p.notifyChanged()
}
// AddServer adds a new server to be mounted under all names.
func (p *T) AddServer(server string) {
defer p.mu.Unlock()
p.mu.Lock()
if p.names == nil {
return
}
if _, exists := p.servers[server]; exists {
return
}
p.servers[server] = true
for name := range p.names {
key := publishKey{name, server}
if pe, ok := p.entries[key]; ok {
pe.DesiredState = rpc.PublisherMounted
p.entries[key] = pe
} else {
p.entries[key] = rpc.PublisherEntry{Name: name, Server: server, DesiredState: rpc.PublisherMounted}
}
}
p.notifyChanged()
}
// RemoveServer removes a server from the list of mounts.
func (p *T) RemoveServer(server string) {
defer p.mu.Unlock()
p.mu.Lock()
if p.names == nil {
return
}
if _, exists := p.servers[server]; !exists {
return
}
delete(p.servers, server)
for name := range p.names {
key := publishKey{name, server}
if pe, ok := p.entries[key]; ok {
pe.DesiredState = rpc.PublisherUnmounted
p.entries[key] = pe
}
}
p.notifyChanged()
}
// Status returns a snapshot of the publisher's current state.
// The returned channel is closed when the state has become stale and the caller
// should repoll Status.
func (p *T) Status() ([]rpc.PublisherEntry, <-chan struct{}) {
defer p.mu.Unlock()
p.mu.Lock()
st := make([]rpc.PublisherEntry, 0, len(p.entries))
now := time.Now()
for _, e := range p.entries {
mountDelta := now.Sub(e.LastMount)
switch {
case e.LastMount.IsZero() && e.LastUnmount.IsZero():
e.LastState = rpc.PublisherUnmounted
case e.LastUnmount.After(e.LastMount) && e.LastUnmountErr == nil:
e.LastState = rpc.PublisherUnmounted
case mountDelta > p.period+2*mountTTLSlack:
e.LastState = rpc.PublisherUnmounted
case mountDelta < p.period:
e.LastState = rpc.PublisherMounted
case e.LastUnmount.After(e.LastMount):
e.LastState = rpc.PublisherUnmounting
default:
e.LastState = rpc.PublisherMounting
}
st = append(st, e)
}
return st, p.dirty
}
// String returns a string representation of the publisher.
func (p *T) String() string {
defer p.mu.Unlock()
p.mu.Lock()
l := make([]string, 0, 2+len(p.entries))
l = append(l, fmt.Sprintf("Publisher period:%v", p.period))
l = append(l, "==============================Mounts============================================")
for key, entry := range p.entries {
l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %v) unmount(%v, %v) Last: %s, Desired: %s ", key.name, key.server,
entry.LastMount, entry.LastMountErr, entry.TTL, entry.LastUnmount, entry.LastUnmountErr, entry.LastState, entry.DesiredState))
}
return strings.Join(l, "\n")
}
// Closed returns a channel that is closed when the publisher context is cancelled,
// and all unmount operations terminate.
func (p *T) Closed() <-chan struct{} {
return p.closed
}
// publish makes RPCs to the mounttable to mount and unmount entries.
// If refreshAll is true, then all entries will be refreshed.
// Otherwise fresh changes in entries will be updated (i.e. AddName, RemoveName, etc.)
func (p *T) publish(refreshAll bool) {
mounts, unmounts := p.entriesToPublish(refreshAll)
// TODO(suharshs): We could potentially do these mount and unmount rpcs in parallel.
mountEntries := make([]rpc.PublisherEntry, 0, len(mounts))
unmountEntries := make([]rpc.PublisherEntry, 0, len(unmounts))
for _, params := range mounts {
mountEntries = append(mountEntries, p.mount(params))
}
for _, params := range unmounts {
unmountEntries = append(unmountEntries, p.unmount(params))
}
// Update p.entries with the new entries.
p.updateEntries(mountEntries, unmountEntries)
}
func (p *T) entriesToPublish(refreshAll bool) ([]mountParams, []unmountParams) {
defer p.mu.Unlock()
p.mu.Lock()
var mounts []mountParams
var unmounts []unmountParams
for key, entry := range p.entries {
if entry.DesiredState == rpc.PublisherUnmounted {
if entry.LastState == rpc.PublisherUnmounted {
delete(p.entries, key)
} else if refreshAll || entry.LastUnmount.IsZero() {
unmounts = append(unmounts, unmountParams{entry: entry, retry: true})
}
} else {
if refreshAll || entry.LastMount.IsZero() {
mounts = append(mounts, mountParams{entry: entry, attr: p.names[key.name]})
}
}
}
return mounts, unmounts
}
func (p *T) updateEntries(mountEntries, unmountEntries []rpc.PublisherEntry) {
defer p.mu.Unlock()
p.mu.Lock()
for _, entry := range mountEntries {
key := publishKey{entry.Name, entry.Server}
// Ensure that the DesiredState, that may have been changed while the
// lock was released, is not overwritten by the DesiredState of entry.
if current, ok := p.entries[key]; ok {
entry.DesiredState = current.DesiredState
}
p.entries[key] = entry
}
for _, entry := range unmountEntries {
key := publishKey{entry.Name, entry.Server}
// Ensure that we don't delete the entry if the DesiredState was
// changed while the lock was released.
if current, ok := p.entries[key]; ok {
entry.DesiredState = current.DesiredState
}
if entry.DesiredState == rpc.PublisherUnmounted && entry.LastUnmountErr == nil {
delete(p.entries, key)
} else {
p.entries[key] = entry
}
}
close(p.dirty)
p.dirty = make(chan struct{})
}
func (p *T) stop() {
defer p.mu.Unlock()
p.mu.Lock()
p.names = nil
p.servers = nil
// We make one final attempt to unmount everything; we ignore failures here,
// and don't retry, since the mounts will eventually timeout anyways.
for _, pe := range p.entries {
p.unmount(unmountParams{entry: pe, retry: false})
}
p.cancel()
close(p.dirty)
}
func (p *T) notifyChanged() {
// We ensure that callers of this function (i.e AddName, RemoveName, AddServer
// RemoveServer) do not block. We do this by giving p.changed a buffer of size 1,
// and adding a default clause below. This allows multiple calls to notifyChanged
// to complete before the internal goroutine processes the change.
select {
case p.changed <- struct{}{}:
default:
}
}
type mountParams struct {
entry rpc.PublisherEntry
attr nameAttr
}
// mount makes an mount RPC to the entry described in params. It returns a new
// rpc.PublisherEntry, updated with the results of the RPC.
func (p *T) mount(params mountParams) rpc.PublisherEntry {
last, entry, attr := params.entry, params.entry, params.attr
// Always mount with ttl = period + slack.
// The next call to publish call will occur within the next period.
ttl := p.period + mountTTLSlack
entry.LastMount = time.Now()
// Ensure that LastMount > LastUnmount to make it easier to check for the last
// tried operation.
if entry.LastMount.Before(entry.LastUnmount) {
entry.LastMount = entry.LastUnmount.Add(1)
}
entry.LastMountErr = p.ns.Mount(p.ctx, entry.Name, entry.Server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
entry.TTL = ttl
// If the mount entry changed, log it.
if entry.LastMountErr != nil {
if verror.ErrorID(last.LastMountErr) != verror.ErrorID(entry.LastMountErr) || p.ctx.V(2) {
p.ctx.Errorf("rpc pub: couldn't mount(%v, %v, %v): %v", entry.Name, entry.Server, ttl, entry.LastMountErr)
}
} else {
entry.LastState = rpc.PublisherMounted
if last.LastMount.IsZero() || last.LastMountErr != nil || p.ctx.V(2) {
p.ctx.Infof("rpc pub: mount(%v, %v, %v)", entry.Name, entry.Server, ttl)
}
}
return entry
}
type unmountParams struct {
entry rpc.PublisherEntry
retry bool
}
// unmount makes an unmount RPC to the entry described in params. It returns a
// new rpc.PublisherEntry, updated with the results of the RPC.
func (p *T) unmount(params unmountParams) rpc.PublisherEntry {
entry := params.entry
var opts []naming.NamespaceOpt
if !params.retry {
opts = []naming.NamespaceOpt{options.NoRetry{}}
}
entry.LastUnmount = time.Now()
// Ensure that LastUnmount > LastMount to make it easier to check for the last
// tried operation.
if entry.LastUnmount.Before(entry.LastMount) {
entry.LastUnmount = entry.LastMount.Add(1)
}
entry.LastUnmountErr = p.ns.Unmount(p.ctx, entry.Name, entry.Server, opts...)
if entry.LastUnmountErr != nil {
p.ctx.Errorf("rpc pub: couldn't unmount(%v, %v): %v", entry.Name, entry.Server, entry.LastUnmountErr)
} else {
entry.LastState = rpc.PublisherUnmounted
p.ctx.VI(1).Infof("rpc pub: unmount(%v, %v)", entry.Name, entry.Server)
}
return entry
}