lib: moving pubsub from lib to ref/lib

The pubsub package has a bad dependency on v.io/v23, which prevents
the lib repository to be release publicly as part of the plan to
externalizing the v23 tool. This CL moves it to ref/lib as it is only
imported by packages under v.io/x/ref/...

MultiPart: 1/2

Change-Id: I3a41e5b48f58d602972978aa70cbfdee7bdf68b9
diff --git a/pubsub/.api b/pubsub/.api
deleted file mode 100644
index 2c1b816..0000000
--- a/pubsub/.api
+++ /dev/null
@@ -1,35 +0,0 @@
-pkg pubsub, func Format(Setting) string
-pkg pubsub, func NewAny(string, string, interface{}) Setting
-pkg pubsub, func NewBool(string, string, bool) Setting
-pkg pubsub, func NewDuration(string, string, time.Duration) Setting
-pkg pubsub, func NewFloat64(string, string, float64) Setting
-pkg pubsub, func NewInt(string, string, int) Setting
-pkg pubsub, func NewInt64(string, string, int64) Setting
-pkg pubsub, func NewPublisher() *Publisher
-pkg pubsub, func NewString(string, string, string) Setting
-pkg pubsub, method (*Any) Description() string
-pkg pubsub, method (*Any) Name() string
-pkg pubsub, method (*Any) String() string
-pkg pubsub, method (*Any) Value() interface{}
-pkg pubsub, method (*DurationFlag) Set(string) error
-pkg pubsub, method (*Publisher) CloseFork(string, chan<- Setting) error
-pkg pubsub, method (*Publisher) CreateStream(string, string, <-chan Setting) (<-chan struct{}, error)
-pkg pubsub, method (*Publisher) ForkStream(string, chan<- Setting) (*Stream, error)
-pkg pubsub, method (*Publisher) Latest(string) *Stream
-pkg pubsub, method (*Publisher) Shutdown()
-pkg pubsub, method (*Publisher) String() string
-pkg pubsub, method (DurationFlag) Get() interface{}
-pkg pubsub, method (DurationFlag) String() string
-pkg pubsub, type Any struct
-pkg pubsub, type DurationFlag struct
-pkg pubsub, type DurationFlag struct, embedded time.Duration
-pkg pubsub, type Publisher struct
-pkg pubsub, type Setting interface { Description, Name, String, Value }
-pkg pubsub, type Setting interface, Description() string
-pkg pubsub, type Setting interface, Name() string
-pkg pubsub, type Setting interface, String() string
-pkg pubsub, type Setting interface, Value() interface{}
-pkg pubsub, type Stream struct
-pkg pubsub, type Stream struct, Description string
-pkg pubsub, type Stream struct, Latest map[string]Setting
-pkg pubsub, type Stream struct, Name string
diff --git a/pubsub/.godepcop b/pubsub/.godepcop
deleted file mode 100644
index 3a1e5c5..0000000
--- a/pubsub/.godepcop
+++ /dev/null
@@ -1,5 +0,0 @@
-<godepcop>
-  <!-- The pubsub package pulls in verror, which pulls in a bunch of v23 deps.
-       TODO(cnicolaou,toddw): Remove the verror dependency. -->
-  <pkg allow="v.io/v23/..."/>
-</godepcop>
diff --git a/pubsub/config_test.go b/pubsub/config_test.go
deleted file mode 100644
index 09859d5..0000000
--- a/pubsub/config_test.go
+++ /dev/null
@@ -1,291 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub_test
-
-import (
-	"fmt"
-	"math/rand"
-	"reflect"
-	"sync"
-	"testing"
-	"time"
-
-	"v.io/x/lib/pubsub"
-
-	"v.io/v23/verror"
-)
-
-func ExamplePublisher() {
-	in := make(chan pubsub.Setting)
-	pub := pubsub.NewPublisher()
-	pub.CreateStream("net", "network settings", in)
-
-	// A simple producer of IP address settings.
-	producer := func() {
-		in <- pubsub.NewString("ip", "address", "1.2.3.5")
-	}
-
-	var waiter sync.WaitGroup
-	waiter.Add(2)
-
-	// A simple consumer of IP address Settings.
-	consumer := func(ch chan pubsub.Setting) {
-		fmt.Println(<-ch)
-		waiter.Done()
-	}
-
-	// Publish an initial Setting to the Stream.
-	in <- pubsub.NewString("ip", "address", "1.2.3.4")
-
-	// Fork the stream twice, and read the latest value.
-	ch1 := make(chan pubsub.Setting)
-	st, _ := pub.ForkStream("net", ch1)
-	fmt.Println(st.Latest["ip"])
-	ch2 := make(chan pubsub.Setting)
-	st, _ = pub.ForkStream("net", ch2)
-	fmt.Println(st.Latest["ip"])
-
-	// Now we can read new Settings as they are generated.
-	go producer()
-	go consumer(ch1)
-	go consumer(ch2)
-
-	waiter.Wait()
-
-	// Output:
-	// ip: address: (string: 1.2.3.4)
-	// ip: address: (string: 1.2.3.4)
-	// ip: address: (string: 1.2.3.5)
-	// ip: address: (string: 1.2.3.5)
-}
-
-func ExampleShutdown() {
-	in := make(chan pubsub.Setting)
-	pub := pubsub.NewPublisher()
-	stop, _ := pub.CreateStream("net", "network settings", in)
-
-	var ready sync.WaitGroup
-	ready.Add(1)
-
-	// A producer to write 100 Settings before signalling that it's
-	// ready to be shutdown. This is purely to demonstrate how to use
-	// Shutdown.
-	producer := func() {
-		for i := 0; ; i++ {
-			select {
-			case <-stop:
-				close(in)
-				return
-			default:
-				in <- pubsub.NewString("ip", "address", "1.2.3.4")
-				if i == 100 {
-					ready.Done()
-				}
-			}
-		}
-	}
-
-	var waiter sync.WaitGroup
-	waiter.Add(2)
-
-	consumer := func() {
-		ch := make(chan pubsub.Setting, 10)
-		pub.ForkStream("net", ch)
-		i := 0
-		for {
-			if _, ok := <-ch; !ok {
-				// The channel has been closed when the publisher
-				// is asked to shut down.
-				break
-			}
-			i++
-		}
-		if i >= 100 {
-			// We've received at least 100 Settings as per the producer above.
-			fmt.Println("done")
-		}
-		waiter.Done()
-	}
-
-	go producer()
-	go consumer()
-	go consumer()
-	ready.Wait()
-	pub.Shutdown()
-	waiter.Wait()
-	// Output:
-	// done
-	// done
-}
-
-func TestSimple(t *testing.T) {
-	ch := make(chan pubsub.Setting, 2)
-	pub := pubsub.NewPublisher()
-	if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamDoesntExist" {
-		t.Errorf("missing or wrong error: %v", err)
-	}
-	if _, err := pub.CreateStream("stream", "example", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errNeedNonNilChannel" {
-		t.Fatalf("missing or wrong error: %v", err)
-	}
-	if _, err := pub.CreateStream("stream", "example", ch); err != nil {
-		t.Fatalf("unexpected error: %s", err)
-	}
-	if _, err := pub.CreateStream("stream", "example", ch); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamExists" {
-		t.Fatalf("missing or wrong error: %v", err)
-	}
-	if got, want := pub.String(), "(stream: example)"; got != want {
-		t.Errorf("got %q, want %q", got, want)
-	}
-	stop, err := pub.CreateStream("s2", "eg2", ch)
-	if err != nil {
-		t.Fatalf("unexpected error: %s", err)
-	}
-	if got, want := pub.String(), "(stream: example) (s2: eg2)"; got != want {
-		wantAlternate := "(s2: eg2) (stream: example)"
-		if got != wantAlternate {
-			t.Errorf("got %q, want %q or %q", got, want, wantAlternate)
-		}
-	}
-
-	got, want := pub.Latest("s2"), &pubsub.Stream{"s2", "eg2", nil}
-	if !reflect.DeepEqual(got, want) {
-		t.Errorf("got %q, want %q", got, want)
-	}
-	pub.Shutdown()
-	if _, running := <-stop; running {
-		t.Errorf("expected to be shutting down")
-	}
-	if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamShutDown" {
-		t.Errorf("missing or wrong error: %v", err)
-	}
-	if got, want := pub.String(), "shutdown"; got != want {
-		t.Errorf("got %s, want %s", got, want)
-	}
-
-}
-
-func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
-	for i := 0; ; i++ {
-		select {
-		case <-stop:
-			ch <- i
-			waiter.Done()
-			// must close this channel, otherwise the Publisher will leak a goroutine for this stream.
-			close(in)
-			return
-		default:
-			// signal progress on ch, at limit/2, limit and when we're done (above)
-			switch {
-			case i == limit/2:
-				ch <- i
-			case i == limit:
-				ch <- i
-			}
-			if i%2 == 0 {
-				in <- pubsub.NewInt("i", "int", i)
-			} else {
-				in <- pubsub.NewFloat64("f", "float", float64(i))
-			}
-		}
-	}
-	panic("should never get here")
-}
-
-func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
-	ch := make(chan pubsub.Setting, bufsize)
-	st, _ := pub.ForkStream("net", ch)
-	i, i2 := 0, 0
-	if st.Latest["i"] != nil {
-		i = int(st.Latest["i"].Value().(int))
-	}
-	if st.Latest["f"] != nil {
-		i2 = int(st.Latest["f"].Value().(float64))
-	}
-	if i2 > i {
-		i = i2
-	}
-	i++
-	for s := range ch {
-		switch v := s.Value().(type) {
-		case int:
-			if i%2 != 0 {
-				t.Errorf("expected a float, got an int")
-				break
-			}
-			if v != i {
-				t.Errorf("got %d, want %d", v, i)
-			}
-		case float64:
-			if i%2 != 1 {
-				t.Errorf("expected an int, got a float")
-				break
-			}
-			if v != float64(i) {
-				t.Errorf("got %f, want %f", v, i)
-			}
-		}
-		i++
-	}
-	if i < limit {
-		t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
-	}
-	waiter.Done()
-}
-
-func testStream(t *testing.T, consumerBufSize int) {
-	in := make(chan pubsub.Setting)
-	pub := pubsub.NewPublisher()
-	stop, _ := pub.CreateStream("net", "network settings", in)
-
-	rand.Seed(time.Now().UnixNano())
-	limit := rand.Intn(5000)
-	if limit < 100 {
-		limit = 100
-	}
-	t.Logf("limit: %d", limit)
-
-	var waiter sync.WaitGroup
-	waiter.Add(3)
-
-	progress := make(chan int)
-	go producer(pub, in, stop, limit, progress, &waiter)
-
-	i := <-progress
-	t.Logf("limit/2 = %d", i)
-
-	// We use a lot of buffering in this unittest to ensure that
-	// we never miss any settings.
-	go consumer(t, pub, limit, consumerBufSize, &waiter)
-	go consumer(t, pub, limit, consumerBufSize, &waiter)
-
-	reached := <-progress
-	pub.Shutdown()
-	shutdown := <-progress
-	t.Logf("reached %d, shut down at %d", reached, shutdown)
-
-	// Wait for all goroutines to finish.
-	waiter.Wait()
-}
-
-func TestStream(t *testing.T) {
-	testStream(t, 500)
-}
-
-func TestStreamSmallBuffers(t *testing.T) {
-	testStream(t, 1)
-}
-
-func TestDurationFlag(t *testing.T) {
-	d := &pubsub.DurationFlag{}
-	if err := d.Set("1s"); err != nil {
-		t.Errorf("unexpected error %s", err)
-	}
-	if got, want := d.Duration, time.Duration(time.Second); got != want {
-		t.Errorf("got %s, expected %s", got, want)
-	}
-	if err := d.Set("1t"); err == nil || err.Error() != "time: unknown unit t in duration 1t" {
-		t.Errorf("expected error %v", err)
-	}
-}
diff --git a/pubsub/model.go b/pubsub/model.go
deleted file mode 100644
index f2bc2cd..0000000
--- a/pubsub/model.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package pubsub defines interfaces for accessing dynamically changing
-// process configuration information.
-//
-// Settings represent configuration parameters and their value. Settings
-// are published to named Streams. Streams are forked to add additional
-// consumers, i.e. readers of the Settings published to the Stream.
-//
-// Settings are represented by an interface type that wraps the data and
-// provides a name and description for each Settings. Streams are similarly
-// named and also have a description. When streams are 'forked' the latest
-// value of all Settings that have been sent over the Stream are made
-// available to the caller. This allows for a rendezvous between the single
-// producer of Settings and multiple consumers of those Settings that
-// may be added at arbitrary points in time.
-//
-// Streams are hosted by a Publisher type, which in addition to the methods
-// required for managing Streams provides a means to shut down all of the
-// Streams it hosts.
-package pubsub
-
-// Setting must be implemented by all data types to sent over Publisher
-// streams.
-type Setting interface {
-	String() string
-	// Name returns the name of the Setting
-	Name() string
-	// Description returns the description of the Setting
-	Description() string
-	// Value returns the value of the Setting.
-	Value() interface{}
-}
diff --git a/pubsub/publisher.go b/pubsub/publisher.go
deleted file mode 100644
index e5e60fe..0000000
--- a/pubsub/publisher.go
+++ /dev/null
@@ -1,229 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub
-
-import (
-	"fmt"
-	"strings"
-	"sync"
-
-	"v.io/v23/verror"
-)
-
-const pkgPath = "v.io/x/lib/pubsub"
-
-var (
-	errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
-	errStreamExists      = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
-	errStreamShutDown    = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
-	errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
-)
-
-// A Publisher provides a mechanism for communicating Settings from a set
-// of producers to multiple consumers. Each such producer and associated
-// consumers are called a Stream. Operations are provided for creating
-// streams (CreateStream) and adding new consumers (ForkStream). Communication
-// is implemented using channels, with the producer and consumers providing
-// channels to send and receive Settings over. A Stream remembers the last value
-// of all Settings that were sent over it; these can be retrieved via ForkStream
-// or the Latest method.
-//
-// The Publisher may be shut down by calling its Shutdown method and
-// the producers will be notified via the channel returned by CreateStream,
-// at which point they should close the channel they use for publishing Settings.
-// If producers fail to close this channel then the Publisher will leak
-// goroutines (one per stream) when it is shutdown.
-type Publisher struct {
-	mu       sync.RWMutex
-	stop     chan struct{}
-	shutdown bool
-	streams  map[string]*fork
-}
-
-// Stream is returned by Latest and includes the name and description
-// for the stream and the most recent values of the Setting that flowed
-// through it.
-type Stream struct {
-	Name, Description string
-	// Latest is a map of Setting names to the Setting itself.
-	Latest map[string]Setting
-}
-
-// NewPublisher creates a Publisher.
-func NewPublisher() *Publisher {
-	return &Publisher{
-		streams: make(map[string]*fork),
-		stop:    make(chan struct{}),
-	}
-}
-
-type fork struct {
-	sync.RWMutex
-	desc string
-	vals map[string]Setting
-	in   <-chan Setting
-	outs []chan<- Setting
-}
-
-// CreateStream creates a Stream with the provided name and description
-// (note, Settings have their own names and description, these are for the
-// stream). In general, no buffering is required for this channel since
-// the Publisher itself will read from it, however, if the consumers are slow
-// then the publisher may be slow in draining the channel. The publisher
-// should provide additional buffering if this is a concern.
-// Consequently this mechanism should be used for rarely changing Settings,
-// such as network address changes forced by DHCP and hence no buffering
-// will be required. The channel returned by CreateStream is closed when the
-// publisher is shut down and hence the caller should wait for this to occur
-// and then close the channel it has passed to CreateStream.
-func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
-	if ch == nil {
-		return nil, verror.New(errNeedNonNilChannel, nil)
-	}
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.streams[name] != nil {
-		return nil, verror.New(errStreamExists, nil, name)
-	}
-	f := &fork{desc: description, in: ch, vals: make(map[string]Setting)}
-	p.streams[name] = f
-	go f.flow(p.stop)
-	return p.stop, nil
-}
-
-// String returns a string representation of the publisher, including
-// the names and descriptions of all the streams it currently supports.
-func (p *Publisher) String() string {
-	r := ""
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-	if p.shutdown {
-		return "shutdown"
-	}
-	for k, s := range p.streams {
-		r += fmt.Sprintf("(%s: %s) ", k, s.desc)
-	}
-	return strings.TrimRight(r, " ")
-}
-
-// Latest returns information on the requested stream, including the
-// last instance of all Settings, if any, that flowed over it.
-func (p *Publisher) Latest(name string) *Stream {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	f := p.streams[name]
-	if f == nil {
-		return nil
-	}
-	var r map[string]Setting
-	f.RLock()
-	defer f.RUnlock()
-	for k, v := range f.vals {
-		r[k] = v
-	}
-	return &Stream{Name: name, Description: f.desc, Latest: r}
-}
-
-// ForkStream 'forks' the named stream to add a new consumer. The channel
-// provided is to be used to read Settings sent down the stream. This
-// channel will be closed by the Publisher when it is asked to shut down.
-// The reader on this channel must be able to keep up with the flow of Settings
-// through the Stream in order to avoid blocking all other readers and hence
-// should set an appropriate amount of buffering for the channel it passes in.
-// ForkStream returns the most recent values of all Settings previously
-// sent over the stream, thus allowing its caller to synchronise with the
-// stream.
-func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.shutdown {
-		return nil, verror.New(errStreamShutDown, nil, name)
-	}
-	f := p.streams[name]
-	if f == nil {
-		return nil, verror.New(errStreamDoesntExist, nil, name)
-	}
-	f.Lock()
-	defer f.Unlock()
-	r := make(map[string]Setting)
-	for k, v := range f.vals {
-		r[k] = v
-	}
-	f.outs = append(f.outs, ch)
-	return &Stream{Name: name, Description: f.desc, Latest: r}, nil
-}
-
-// CloseFork removes the specified channel from the named stream.
-// The caller must drain the channel before closing it.
-// TODO(cnicolaou): add tests for this.
-func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.shutdown {
-		return verror.New(errStreamShutDown, nil, name)
-	}
-	f := p.streams[name]
-	if f == nil {
-		return verror.New(errStreamDoesntExist, nil, name)
-	}
-	f.Lock()
-	defer f.Unlock()
-	for i, v := range f.outs {
-		if v == ch {
-			f.outs = append(f.outs[0:i], f.outs[i+1:]...)
-			break
-		}
-	}
-	return nil
-}
-
-// Shutdown initiates the process of stopping the operation of the Publisher.
-// All of the channels passed to CreateStream must be closed by their owner
-// to ensure that all goroutines are garbage collected.
-func (p *Publisher) Shutdown() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.shutdown {
-		return
-	}
-	p.shutdown = true
-	close(p.stop)
-}
-
-func (f *fork) closeChans() {
-	f.Lock()
-	for _, o := range f.outs {
-		close(o)
-	}
-	f.outs = nil
-	f.Unlock()
-}
-
-func (f *fork) flow(stop chan struct{}) {
-	closed := false
-	for {
-		select {
-		case <-stop:
-			if !closed {
-				f.closeChans()
-				closed = true
-			}
-		case val, ok := <-f.in:
-			if !ok {
-				f.closeChans()
-				return
-			}
-			f.Lock()
-			f.vals[val.Name()] = val
-			cpy := make([]chan<- Setting, len(f.outs))
-			copy(cpy, f.outs)
-			f.Unlock()
-			for _, o := range cpy {
-				// We may well block here.
-				o <- val
-			}
-		}
-	}
-}
diff --git a/pubsub/types.go b/pubsub/types.go
deleted file mode 100644
index 1ee8d66..0000000
--- a/pubsub/types.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub
-
-import (
-	"fmt"
-
-	"time"
-)
-
-// Format formats a Setting in a consistent manner, it is intended to be
-// used when implementing the Setting interface.
-func Format(s Setting) string {
-	return fmt.Sprintf("%s: %s: (%T: %s)", s.Name(), s.Description(), s.Value(), s.Value())
-}
-
-// Type Any can be used to represent or implement a Setting of any type.
-type Any struct {
-	name, description string
-	value             interface{}
-}
-
-func (s *Any) String() string {
-	return Format(s)
-}
-
-func (s *Any) Name() string {
-	return s.name
-}
-
-func (s *Any) Description() string {
-	return s.description
-}
-
-func (s *Any) Value() interface{} {
-	return s.value
-}
-
-func NewAny(name, description string, value interface{}) Setting {
-	return &Any{name, description, value}
-}
-
-func NewInt(name, description string, value int) Setting {
-	return &Any{name, description, value}
-}
-
-func NewInt64(name, description string, value int64) Setting {
-	return &Any{name, description, value}
-}
-
-func NewBool(name, description string, value bool) Setting {
-	return &Any{name, description, value}
-}
-
-func NewFloat64(name, description string, value float64) Setting {
-	return &Any{name, description, value}
-}
-
-func NewString(name, description string, value string) Setting {
-	return &Any{name, description, value}
-}
-
-func NewDuration(name, description string, value time.Duration) Setting {
-	return &Any{name, description, value}
-}
-
-// DurationFlag implements flag.Value in order to provide validation of
-// duration values in the flag package.
-type DurationFlag struct{ time.Duration }
-
-// Implements flag.Value.Get
-func (d DurationFlag) Get() interface{} {
-	return d.Duration
-}
-
-// Implements flag.Value.Set
-func (d *DurationFlag) Set(s string) error {
-	duration, err := time.ParseDuration(s)
-	if err != nil {
-		return err
-	}
-	d.Duration = duration
-	return nil
-}
-
-// Implements flag.Value.String
-func (d DurationFlag) String() string {
-	return d.Duration.String()
-}