Merge "v23/config: move it to x/lib/pubsub."
diff --git a/pubsub/.api b/pubsub/.api
new file mode 100644
index 0000000..2c1b816
--- /dev/null
+++ b/pubsub/.api
@@ -0,0 +1,35 @@
+pkg pubsub, func Format(Setting) string
+pkg pubsub, func NewAny(string, string, interface{}) Setting
+pkg pubsub, func NewBool(string, string, bool) Setting
+pkg pubsub, func NewDuration(string, string, time.Duration) Setting
+pkg pubsub, func NewFloat64(string, string, float64) Setting
+pkg pubsub, func NewInt(string, string, int) Setting
+pkg pubsub, func NewInt64(string, string, int64) Setting
+pkg pubsub, func NewPublisher() *Publisher
+pkg pubsub, func NewString(string, string, string) Setting
+pkg pubsub, method (*Any) Description() string
+pkg pubsub, method (*Any) Name() string
+pkg pubsub, method (*Any) String() string
+pkg pubsub, method (*Any) Value() interface{}
+pkg pubsub, method (*DurationFlag) Set(string) error
+pkg pubsub, method (*Publisher) CloseFork(string, chan<- Setting) error
+pkg pubsub, method (*Publisher) CreateStream(string, string, <-chan Setting) (<-chan struct{}, error)
+pkg pubsub, method (*Publisher) ForkStream(string, chan<- Setting) (*Stream, error)
+pkg pubsub, method (*Publisher) Latest(string) *Stream
+pkg pubsub, method (*Publisher) Shutdown()
+pkg pubsub, method (*Publisher) String() string
+pkg pubsub, method (DurationFlag) Get() interface{}
+pkg pubsub, method (DurationFlag) String() string
+pkg pubsub, type Any struct
+pkg pubsub, type DurationFlag struct
+pkg pubsub, type DurationFlag struct, embedded time.Duration
+pkg pubsub, type Publisher struct
+pkg pubsub, type Setting interface { Description, Name, String, Value }
+pkg pubsub, type Setting interface, Description() string
+pkg pubsub, type Setting interface, Name() string
+pkg pubsub, type Setting interface, String() string
+pkg pubsub, type Setting interface, Value() interface{}
+pkg pubsub, type Stream struct
+pkg pubsub, type Stream struct, Description string
+pkg pubsub, type Stream struct, Latest map[string]Setting
+pkg pubsub, type Stream struct, Name string
diff --git a/pubsub/config_test.go b/pubsub/config_test.go
new file mode 100644
index 0000000..09859d5
--- /dev/null
+++ b/pubsub/config_test.go
@@ -0,0 +1,291 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub_test
+
+import (
+	"fmt"
+	"math/rand"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/x/lib/pubsub"
+
+	"v.io/v23/verror"
+)
+
+func ExamplePublisher() {
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	pub.CreateStream("net", "network settings", in)
+
+	// A simple producer that publishes one new IP address Setting.
+	producer := func() {
+		in <- pubsub.NewString("ip", "address", "1.2.3.5")
+	}
+
+	var waiter sync.WaitGroup
+	waiter.Add(2) // one per consumer
+
+	// A simple consumer that prints the first Setting it receives.
+	consumer := func(ch chan pubsub.Setting) {
+		fmt.Println(<-ch)
+		waiter.Done()
+	}
+
+	// Publish an initial Setting to the Stream; in is unbuffered.
+	in <- pubsub.NewString("ip", "address", "1.2.3.4")
+
+	// Fork the stream twice, printing the latest value seen by each fork.
+	ch1 := make(chan pubsub.Setting)
+	st, _ := pub.ForkStream("net", ch1)
+	fmt.Println(st.Latest["ip"])
+	ch2 := make(chan pubsub.Setting)
+	st, _ = pub.ForkStream("net", ch2)
+	fmt.Println(st.Latest["ip"])
+
+	// Now both forks can read the new Setting sent by the producer.
+	go producer()
+	go consumer(ch1)
+	go consumer(ch2)
+
+	waiter.Wait()
+
+	// Output:
+	// ip: address: (string: 1.2.3.4)
+	// ip: address: (string: 1.2.3.4)
+	// ip: address: (string: 1.2.3.5)
+	// ip: address: (string: 1.2.3.5)
+}
+
+func ExamplePublisher_Shutdown() {
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	stop, _ := pub.CreateStream("net", "network settings", in)
+
+	var ready sync.WaitGroup
+	ready.Add(1)
+
+	// A producer to write 100 Settings before signalling that it's
+	// ready to be shut down. This is purely to demonstrate how to use
+	// Shutdown.
+	producer := func() {
+		for i := 0; ; i++ {
+			select {
+			case <-stop:
+				close(in)
+				return
+			default:
+				in <- pubsub.NewString("ip", "address", "1.2.3.4")
+				if i == 100 {
+					ready.Done()
+				}
+			}
+		}
+	}
+
+	var waiter sync.WaitGroup
+	waiter.Add(2)
+
+	consumer := func(ch chan pubsub.Setting) {
+		i := 0
+		// The channel is closed when the publisher is asked to shut
+		// down, which ends this loop.
+		for range ch {
+			i++
+		}
+		if i >= 100 {
+			// We've received at least 100 Settings as per the producer above.
+			fmt.Println("done")
+		}
+		waiter.Done()
+	}
+
+	// Fork the stream before starting the producer so that neither
+	// consumer can miss any of the Settings it publishes.
+	for n := 0; n < 2; n++ {
+		ch := make(chan pubsub.Setting, 10)
+		pub.ForkStream("net", ch)
+		go consumer(ch)
+	}
+	go producer()
+	ready.Wait()
+	pub.Shutdown()
+	waiter.Wait()
+	// Output:
+	// done
+	// done
+}
+
+// TestSimple checks stream creation, the errors for missing, duplicate and shut down streams, and String.
+func TestSimple(t *testing.T) {
+	ch := make(chan pubsub.Setting, 2)
+	pub := pubsub.NewPublisher()
+	if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamDoesntExist" {
+		t.Errorf("missing or wrong error: %v", err)
+	}
+	if _, err := pub.CreateStream("stream", "example", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errNeedNonNilChannel" {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+	if _, err := pub.CreateStream("stream", "example", ch); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	if _, err := pub.CreateStream("stream", "example", ch); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamExists" {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+	if got, want := pub.String(), "(stream: example)"; got != want {
+		t.Errorf("got %q, want %q", got, want)
+	}
+	stop, err := pub.CreateStream("s2", "eg2", ch)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	if got, want := pub.String(), "(stream: example) (s2: eg2)"; got != want {
+		wantAlternate := "(s2: eg2) (stream: example)"
+		if got != wantAlternate {
+			t.Errorf("got %q, want %q or %q", got, want, wantAlternate)
+		}
+	}
+
+	got, want := pub.Latest("s2"), &pubsub.Stream{Name: "s2", Description: "eg2"}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	pub.Shutdown()
+	if _, running := <-stop; running {
+		t.Errorf("expected to be shutting down")
+	}
+	if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamShutDown" {
+		t.Errorf("missing or wrong error: %v", err)
+	}
+	if got, want := pub.String(), "shutdown"; got != want {
+		t.Errorf("got %s, want %s", got, want)
+	}
+}
+
+// producer publishes alternating int and float64 Settings on in until stop is closed, reporting progress on ch.
+func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
+	for i := 0; ; i++ {
+		select {
+		case <-stop:
+			ch <- i
+			waiter.Done()
+			// must close this channel, otherwise the Publisher will leak a goroutine for this stream.
+			close(in)
+			return
+		default:
+			// signal progress on ch, at limit/2, limit and when we're done (above)
+			switch {
+			case i == limit/2:
+				ch <- i
+			case i == limit:
+				ch <- i
+			}
+			if i%2 == 0 {
+				in <- pubsub.NewInt("i", "int", i)
+			} else {
+				in <- pubsub.NewFloat64("f", "float", float64(i))
+			}
+		}
+	}
+}
+
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+	ch := make(chan pubsub.Setting, bufsize)
+	st, _ := pub.ForkStream("net", ch)
+	i, i2 := 0, 0
+	if st.Latest["i"] != nil {
+		i = st.Latest["i"].Value().(int)
+	}
+	if st.Latest["f"] != nil {
+		i2 = int(st.Latest["f"].Value().(float64))
+	}
+	if i2 > i {
+		i = i2
+	}
+	i++ // the first Setting read must be the successor of the latest one seen at fork time
+	for s := range ch {
+		switch v := s.Value().(type) {
+		case int:
+			if i%2 != 0 {
+				t.Errorf("expected a float, got an int")
+				break
+			}
+			if v != i {
+				t.Errorf("got %d, want %d", v, i)
+			}
+		case float64:
+			if i%2 != 1 {
+				t.Errorf("expected an int, got a float")
+				break
+			}
+			if v != float64(i) {
+				t.Errorf("got %f, want %f", v, float64(i)) // %f needs a float operand
+			}
+		}
+		i++
+	}
+	if i < limit {
+		t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+	}
+	waiter.Done()
+}
+
+func testStream(t *testing.T, consumerBufSize int) {
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	stop, _ := pub.CreateStream("net", "network settings", in)
+
+	rand.Seed(time.Now().UnixNano())
+	limit := rand.Intn(5000)
+	if limit < 100 {
+		limit = 100
+	}
+	t.Logf("limit: %d", limit)
+
+	var waiter sync.WaitGroup
+	waiter.Add(3) // one producer and two consumers
+
+	progress := make(chan int)
+	go producer(pub, in, stop, limit, progress, &waiter)
+
+	i := <-progress // the producer reports when it is half way to limit
+	t.Logf("limit/2 = %d", i)
+
+	// The consumers are started once the producer is half way to limit;
+	// consumerBufSize is the buffering of the channel that each one forks.
+	go consumer(t, pub, limit, consumerBufSize, &waiter)
+	go consumer(t, pub, limit, consumerBufSize, &waiter)
+
+	reached := <-progress // the producer reports when it reaches limit
+	pub.Shutdown()
+	shutdown := <-progress // the producer reports the count at which it saw stop closed
+	t.Logf("reached %d, shut down at %d", reached, shutdown)
+
+	// Wait for the producer and both consumers to finish.
+	waiter.Wait()
+}
+
+func TestStream(t *testing.T) {
+	testStream(t, 500)
+}
+
+func TestStreamSmallBuffers(t *testing.T) {
+	testStream(t, 1)
+}
+
+func TestDurationFlag(t *testing.T) {
+	d := &pubsub.DurationFlag{}
+	if err := d.Set("1s"); err != nil {
+		t.Errorf("unexpected error %s", err)
+	}
+	if got, want := d.Duration, time.Second; got != want {
+		t.Errorf("got %s, expected %s", got, want)
+	}
+	if err := d.Set("1t"); err == nil { // the exact text of the error differs between Go releases
+		t.Errorf("expected an error for an unknown unit")
+	}
+}
diff --git a/pubsub/model.go b/pubsub/model.go
new file mode 100644
index 0000000..f2bc2cd
--- /dev/null
+++ b/pubsub/model.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pubsub defines interfaces for accessing dynamically changing
+// process configuration information.
+//
+// Settings represent configuration parameters and their value. Settings
+// are published to named Streams. Streams are forked to add additional
+// consumers, i.e. readers of the Settings published to the Stream.
+//
+// Settings are represented by an interface type that wraps the data and
+// provides a name and description for each Setting. Streams are similarly
+// named and also have a description. When streams are 'forked' the latest
+// value of all Settings that have been sent over the Stream are made
+// available to the caller. This allows for a rendezvous between the single
+// producer of Settings and multiple consumers of those Settings that
+// may be added at arbitrary points in time.
+//
+// Streams are hosted by a Publisher type, which in addition to the methods
+// required for managing Streams provides a means to shut down all of the
+// Streams it hosts.
+package pubsub
+
+// Setting must be implemented by all data types to be sent over Publisher
+// streams.
+type Setting interface {
+	String() string
+	// Name returns the name of the Setting
+	Name() string
+	// Description returns the description of the Setting
+	Description() string
+	// Value returns the value of the Setting.
+	Value() interface{}
+}
diff --git a/pubsub/publisher.go b/pubsub/publisher.go
new file mode 100644
index 0000000..e5e60fe
--- /dev/null
+++ b/pubsub/publisher.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/lib/pubsub" // prefix of the verror IDs registered below
+
+var (
+	errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
+	errStreamExists      = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
+	errStreamShutDown    = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
+	errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
+)
+
+// A Publisher provides a mechanism for communicating Settings from a set
+// of producers to multiple consumers. Each such producer and associated
+// consumers are called a Stream. Operations are provided for creating
+// streams (CreateStream) and adding new consumers (ForkStream). Communication
+// is implemented using channels, with the producer and consumers providing
+// channels to send and receive Settings over. A Stream remembers the last value
+// of all Settings that were sent over it; these can be retrieved via ForkStream
+// or the Latest method.
+//
+// The Publisher may be shut down by calling its Shutdown method and
+// the producers will be notified via the channel returned by CreateStream,
+// at which point they should close the channel they use for publishing Settings.
+// If producers fail to close this channel then the Publisher will leak
+// goroutines (one per stream) when it is shut down.
+type Publisher struct {
+	mu       sync.RWMutex     // guards shutdown and streams
+	stop     chan struct{}    // closed by Shutdown to notify producers
+	shutdown bool             // set once Shutdown has been called
+	streams  map[string]*fork // the hosted streams, keyed by stream name
+}
+
+// Stream is returned by Latest and ForkStream and includes the name and
+// description of the stream and the most recent value of each Setting
+// that flowed through it.
+type Stream struct {
+	Name, Description string
+	// Latest is a map of Setting names to the Setting itself.
+	Latest map[string]Setting
+}
+
+// NewPublisher returns a Publisher that hosts no streams yet.
+func NewPublisher() *Publisher {
+	p := new(Publisher)
+	p.stop = make(chan struct{})
+	p.streams = make(map[string]*fork)
+	return p
+}
+
+type fork struct {
+	sync.RWMutex
+	desc string             // description given to CreateStream
+	vals map[string]Setting // latest Setting seen for each Setting name
+	in   <-chan Setting     // the producer's channel, read by flow
+	outs []chan<- Setting   // the consumers' channels, added by ForkStream
+}
+
+// CreateStream creates a Stream with the provided name and description
+// (note, Settings have their own names and descriptions, these are for the
+// stream). In general, no buffering is required for ch since the Publisher
+// itself will read from it, however, if the consumers are slow then the
+// Publisher may be slow in draining ch. The caller should provide
+// additional buffering if this is a concern. Consequently this mechanism
+// should be used for rarely changing Settings, such as network address
+// changes forced by DHCP, and hence no buffering will be required. The
+// channel returned by CreateStream is closed when the Publisher is shut
+// down and hence the caller should wait for this to occur and then close
+// the channel it has passed to CreateStream.
+func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
+	if ch == nil {
+		return nil, verror.New(errNeedNonNilChannel, nil)
+	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if existing := p.streams[name]; existing != nil {
+		return nil, verror.New(errStreamExists, nil, name)
+	}
+	stream := &fork{desc: description, in: ch, vals: map[string]Setting{}}
+	go stream.flow(p.stop)
+	p.streams[name] = stream
+	return p.stop, nil
+}
+
+// String describes the publisher as a space separated list of
+// "(name: description)" entries, one per stream, or "shutdown" once shut down.
+func (p *Publisher) String() string {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	if p.shutdown {
+		return "shutdown"
+	}
+	entries := make([]string, 0, len(p.streams))
+	for name, f := range p.streams {
+		entries = append(entries, fmt.Sprintf("(%s: %s)", name, f.desc))
+	}
+	return strings.Join(entries, " ")
+}
+
+// Latest returns information on the requested stream, including the last
+// instance of each Setting that flowed over it, or nil if there is no such stream.
+func (p *Publisher) Latest(name string) *Stream {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	f := p.streams[name]
+	if f == nil {
+		return nil
+	}
+	f.RLock()
+	defer f.RUnlock()
+	var r map[string]Setting
+	if len(f.vals) > 0 {
+		// Allocate before copying: assigning into a nil map panics.
+		r = make(map[string]Setting, len(f.vals))
+	}
+	for k, v := range f.vals {
+		r[k] = v
+	}
+	return &Stream{Name: name, Description: f.desc, Latest: r}
+}
+
+// ForkStream 'forks' the named stream to add a new consumer that reads
+// Settings from ch; the Publisher closes ch when it is shut down. ch needs
+// enough buffering for its reader to keep up, since a slow reader blocks
+// all other readers. The most recent values of all Settings previously
+// sent over the stream are returned so that the caller can synchronise.
+func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.shutdown {
+		return nil, verror.New(errStreamShutDown, nil, name)
+	}
+	f := p.streams[name]
+	if f == nil {
+		return nil, verror.New(errStreamDoesntExist, nil, name)
+	}
+	f.Lock()
+	defer f.Unlock()
+	r := make(map[string]Setting)
+	for k, v := range f.vals {
+		r[k] = v
+	}
+	f.outs = append(f.outs, ch)
+	return &Stream{Name: name, Description: f.desc, Latest: r}, nil
+}
+
+// CloseFork removes ch from the channels that the named stream sends to.
+// The caller must drain the channel before closing it.
+// TODO(cnicolaou): add tests for this.
+func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.shutdown {
+		return verror.New(errStreamShutDown, nil, name)
+	}
+	stream := p.streams[name]
+	if stream == nil {
+		return verror.New(errStreamDoesntExist, nil, name)
+	}
+	stream.Lock()
+	defer stream.Unlock()
+	for i := range stream.outs {
+		if stream.outs[i] == ch {
+			stream.outs = append(stream.outs[:i], stream.outs[i+1:]...)
+			break
+		}
+	}
+	return nil
+}
+
+// Shutdown initiates the process of stopping the operation of the Publisher.
+// All of the channels passed to CreateStream must be closed by their owner
+// to ensure that all goroutines are garbage collected.
+func (p *Publisher) Shutdown() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	// Only the first call closes stop; later calls do nothing.
+	if !p.shutdown {
+		p.shutdown = true
+		close(p.stop)
+	}
+}
+
+func (f *fork) closeChans() {
+	f.Lock()
+	defer f.Unlock()
+	for i := range f.outs {
+		close(f.outs[i])
+	}
+	f.outs = nil // forget the closed channels so they are never used again
+}
+
+// flow forwards every Setting read from f.in to all forked channels until f.in is closed.
+func (f *fork) flow(stop chan struct{}) {
+	for {
+		select {
+		case <-stop:
+			// A closed channel is always ready to receive from, so stop
+			// selecting on it; otherwise this loop spins until f.in closes.
+			f.closeChans()
+			stop = nil
+		case val, ok := <-f.in:
+			if !ok {
+				f.closeChans()
+				return
+			}
+			f.Lock()
+			f.vals[val.Name()] = val
+			cpy := make([]chan<- Setting, len(f.outs))
+			copy(cpy, f.outs)
+			f.Unlock()
+			for _, o := range cpy {
+				// We may well block here.
+				o <- val
+			}
+		}
+	}
+}
diff --git a/pubsub/types.go b/pubsub/types.go
new file mode 100644
index 0000000..1ee8d66
--- /dev/null
+++ b/pubsub/types.go
@@ -0,0 +1,91 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+	"fmt"
+
+	"time"
+)
+
+// Format formats a Setting as "name: description: (type: value)" for use by
+// implementations of Setting.String; %v is used so any value type prints cleanly.
+func Format(s Setting) string {
+	return fmt.Sprintf("%s: %s: (%T: %v)", s.Name(), s.Description(), s.Value(), s.Value())
+}
+
+// Any is a Setting that can represent a value of any type.
+type Any struct {
+	name, description string
+	value             interface{}
+}
+
+func (a *Any) String() string {
+	return Format(a)
+}
+
+func (a *Any) Name() string {
+	return a.name
+}
+
+func (a *Any) Description() string {
+	return a.description
+}
+
+func (a *Any) Value() interface{} {
+	return a.value
+}
+
+func NewAny(name, description string, value interface{}) Setting {
+	return &Any{name: name, description: description, value: value}
+}
+
+func NewInt(name, description string, value int) Setting {
+	return NewAny(name, description, value)
+}
+
+func NewInt64(name, description string, value int64) Setting {
+	return NewAny(name, description, value)
+}
+
+func NewBool(name, description string, value bool) Setting {
+	return NewAny(name, description, value)
+}
+
+func NewFloat64(name, description string, value float64) Setting {
+	return NewAny(name, description, value)
+}
+
+func NewString(name, description string, value string) Setting {
+	return NewAny(name, description, value)
+}
+
+func NewDuration(name, description string, value time.Duration) Setting {
+	return NewAny(name, description, value)
+}
+
+// DurationFlag implements flag.Value in order to provide validation of
+// duration values in the flag package.
+type DurationFlag struct{ time.Duration }
+
+// Get implements flag.Getter; it returns the current value as a time.Duration.
+func (d DurationFlag) Get() interface{} {
+	return d.Duration
+}
+
+// Set implements flag.Value; it parses s using time.ParseDuration.
+func (d *DurationFlag) Set(s string) error {
+	parsed, err := time.ParseDuration(s)
+	if err == nil {
+		// Only a successfully parsed duration replaces the current value.
+		d.Duration = parsed
+	}
+	return err
+}
+
+// String implements flag.Value; it formats the duration as time.Duration does.
+func (d DurationFlag) String() string {
+	return d.Duration.String()
+}