lib: moving pubsub from lib to ref/lib
The pubsub package has a bad dependency on v.io/v23, which prevents
the lib repository to be release publicly as part of the plan to
externalizing the v23 tool. This CL moves it to ref/lib as it is only
imported by packages under v.io/x/ref/...
MultiPart: 1/2
Change-Id: I3a41e5b48f58d602972978aa70cbfdee7bdf68b9
diff --git a/pubsub/.api b/pubsub/.api
deleted file mode 100644
index 2c1b816..0000000
--- a/pubsub/.api
+++ /dev/null
@@ -1,35 +0,0 @@
-pkg pubsub, func Format(Setting) string
-pkg pubsub, func NewAny(string, string, interface{}) Setting
-pkg pubsub, func NewBool(string, string, bool) Setting
-pkg pubsub, func NewDuration(string, string, time.Duration) Setting
-pkg pubsub, func NewFloat64(string, string, float64) Setting
-pkg pubsub, func NewInt(string, string, int) Setting
-pkg pubsub, func NewInt64(string, string, int64) Setting
-pkg pubsub, func NewPublisher() *Publisher
-pkg pubsub, func NewString(string, string, string) Setting
-pkg pubsub, method (*Any) Description() string
-pkg pubsub, method (*Any) Name() string
-pkg pubsub, method (*Any) String() string
-pkg pubsub, method (*Any) Value() interface{}
-pkg pubsub, method (*DurationFlag) Set(string) error
-pkg pubsub, method (*Publisher) CloseFork(string, chan<- Setting) error
-pkg pubsub, method (*Publisher) CreateStream(string, string, <-chan Setting) (<-chan struct{}, error)
-pkg pubsub, method (*Publisher) ForkStream(string, chan<- Setting) (*Stream, error)
-pkg pubsub, method (*Publisher) Latest(string) *Stream
-pkg pubsub, method (*Publisher) Shutdown()
-pkg pubsub, method (*Publisher) String() string
-pkg pubsub, method (DurationFlag) Get() interface{}
-pkg pubsub, method (DurationFlag) String() string
-pkg pubsub, type Any struct
-pkg pubsub, type DurationFlag struct
-pkg pubsub, type DurationFlag struct, embedded time.Duration
-pkg pubsub, type Publisher struct
-pkg pubsub, type Setting interface { Description, Name, String, Value }
-pkg pubsub, type Setting interface, Description() string
-pkg pubsub, type Setting interface, Name() string
-pkg pubsub, type Setting interface, String() string
-pkg pubsub, type Setting interface, Value() interface{}
-pkg pubsub, type Stream struct
-pkg pubsub, type Stream struct, Description string
-pkg pubsub, type Stream struct, Latest map[string]Setting
-pkg pubsub, type Stream struct, Name string
diff --git a/pubsub/.godepcop b/pubsub/.godepcop
deleted file mode 100644
index 3a1e5c5..0000000
--- a/pubsub/.godepcop
+++ /dev/null
@@ -1,5 +0,0 @@
-<godepcop>
- <!-- The pubsub package pulls in verror, which pulls in a bunch of v23 deps.
- TODO(cnicolaou,toddw): Remove the verror dependency. -->
- <pkg allow="v.io/v23/..."/>
-</godepcop>
diff --git a/pubsub/config_test.go b/pubsub/config_test.go
deleted file mode 100644
index 09859d5..0000000
--- a/pubsub/config_test.go
+++ /dev/null
@@ -1,291 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub_test
-
-import (
- "fmt"
- "math/rand"
- "reflect"
- "sync"
- "testing"
- "time"
-
- "v.io/x/lib/pubsub"
-
- "v.io/v23/verror"
-)
-
-func ExamplePublisher() {
- in := make(chan pubsub.Setting)
- pub := pubsub.NewPublisher()
- pub.CreateStream("net", "network settings", in)
-
- // A simple producer of IP address settings.
- producer := func() {
- in <- pubsub.NewString("ip", "address", "1.2.3.5")
- }
-
- var waiter sync.WaitGroup
- waiter.Add(2)
-
- // A simple consumer of IP address Settings.
- consumer := func(ch chan pubsub.Setting) {
- fmt.Println(<-ch)
- waiter.Done()
- }
-
- // Publish an initial Setting to the Stream.
- in <- pubsub.NewString("ip", "address", "1.2.3.4")
-
- // Fork the stream twice, and read the latest value.
- ch1 := make(chan pubsub.Setting)
- st, _ := pub.ForkStream("net", ch1)
- fmt.Println(st.Latest["ip"])
- ch2 := make(chan pubsub.Setting)
- st, _ = pub.ForkStream("net", ch2)
- fmt.Println(st.Latest["ip"])
-
- // Now we can read new Settings as they are generated.
- go producer()
- go consumer(ch1)
- go consumer(ch2)
-
- waiter.Wait()
-
- // Output:
- // ip: address: (string: 1.2.3.4)
- // ip: address: (string: 1.2.3.4)
- // ip: address: (string: 1.2.3.5)
- // ip: address: (string: 1.2.3.5)
-}
-
-func ExampleShutdown() {
- in := make(chan pubsub.Setting)
- pub := pubsub.NewPublisher()
- stop, _ := pub.CreateStream("net", "network settings", in)
-
- var ready sync.WaitGroup
- ready.Add(1)
-
- // A producer to write 100 Settings before signalling that it's
- // ready to be shutdown. This is purely to demonstrate how to use
- // Shutdown.
- producer := func() {
- for i := 0; ; i++ {
- select {
- case <-stop:
- close(in)
- return
- default:
- in <- pubsub.NewString("ip", "address", "1.2.3.4")
- if i == 100 {
- ready.Done()
- }
- }
- }
- }
-
- var waiter sync.WaitGroup
- waiter.Add(2)
-
- consumer := func() {
- ch := make(chan pubsub.Setting, 10)
- pub.ForkStream("net", ch)
- i := 0
- for {
- if _, ok := <-ch; !ok {
- // The channel has been closed when the publisher
- // is asked to shut down.
- break
- }
- i++
- }
- if i >= 100 {
- // We've received at least 100 Settings as per the producer above.
- fmt.Println("done")
- }
- waiter.Done()
- }
-
- go producer()
- go consumer()
- go consumer()
- ready.Wait()
- pub.Shutdown()
- waiter.Wait()
- // Output:
- // done
- // done
-}
-
-func TestSimple(t *testing.T) {
- ch := make(chan pubsub.Setting, 2)
- pub := pubsub.NewPublisher()
- if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamDoesntExist" {
- t.Errorf("missing or wrong error: %v", err)
- }
- if _, err := pub.CreateStream("stream", "example", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errNeedNonNilChannel" {
- t.Fatalf("missing or wrong error: %v", err)
- }
- if _, err := pub.CreateStream("stream", "example", ch); err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- if _, err := pub.CreateStream("stream", "example", ch); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamExists" {
- t.Fatalf("missing or wrong error: %v", err)
- }
- if got, want := pub.String(), "(stream: example)"; got != want {
- t.Errorf("got %q, want %q", got, want)
- }
- stop, err := pub.CreateStream("s2", "eg2", ch)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- if got, want := pub.String(), "(stream: example) (s2: eg2)"; got != want {
- wantAlternate := "(s2: eg2) (stream: example)"
- if got != wantAlternate {
- t.Errorf("got %q, want %q or %q", got, want, wantAlternate)
- }
- }
-
- got, want := pub.Latest("s2"), &pubsub.Stream{"s2", "eg2", nil}
- if !reflect.DeepEqual(got, want) {
- t.Errorf("got %q, want %q", got, want)
- }
- pub.Shutdown()
- if _, running := <-stop; running {
- t.Errorf("expected to be shutting down")
- }
- if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/lib/pubsub.errStreamShutDown" {
- t.Errorf("missing or wrong error: %v", err)
- }
- if got, want := pub.String(), "shutdown"; got != want {
- t.Errorf("got %s, want %s", got, want)
- }
-
-}
-
-func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
- for i := 0; ; i++ {
- select {
- case <-stop:
- ch <- i
- waiter.Done()
- // must close this channel, otherwise the Publisher will leak a goroutine for this stream.
- close(in)
- return
- default:
- // signal progress on ch, at limit/2, limit and when we're done (above)
- switch {
- case i == limit/2:
- ch <- i
- case i == limit:
- ch <- i
- }
- if i%2 == 0 {
- in <- pubsub.NewInt("i", "int", i)
- } else {
- in <- pubsub.NewFloat64("f", "float", float64(i))
- }
- }
- }
- panic("should never get here")
-}
-
-func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
- ch := make(chan pubsub.Setting, bufsize)
- st, _ := pub.ForkStream("net", ch)
- i, i2 := 0, 0
- if st.Latest["i"] != nil {
- i = int(st.Latest["i"].Value().(int))
- }
- if st.Latest["f"] != nil {
- i2 = int(st.Latest["f"].Value().(float64))
- }
- if i2 > i {
- i = i2
- }
- i++
- for s := range ch {
- switch v := s.Value().(type) {
- case int:
- if i%2 != 0 {
- t.Errorf("expected a float, got an int")
- break
- }
- if v != i {
- t.Errorf("got %d, want %d", v, i)
- }
- case float64:
- if i%2 != 1 {
- t.Errorf("expected an int, got a float")
- break
- }
- if v != float64(i) {
- t.Errorf("got %f, want %f", v, i)
- }
- }
- i++
- }
- if i < limit {
- t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
- }
- waiter.Done()
-}
-
-func testStream(t *testing.T, consumerBufSize int) {
- in := make(chan pubsub.Setting)
- pub := pubsub.NewPublisher()
- stop, _ := pub.CreateStream("net", "network settings", in)
-
- rand.Seed(time.Now().UnixNano())
- limit := rand.Intn(5000)
- if limit < 100 {
- limit = 100
- }
- t.Logf("limit: %d", limit)
-
- var waiter sync.WaitGroup
- waiter.Add(3)
-
- progress := make(chan int)
- go producer(pub, in, stop, limit, progress, &waiter)
-
- i := <-progress
- t.Logf("limit/2 = %d", i)
-
- // We use a lot of buffering in this unittest to ensure that
- // we never miss any settings.
- go consumer(t, pub, limit, consumerBufSize, &waiter)
- go consumer(t, pub, limit, consumerBufSize, &waiter)
-
- reached := <-progress
- pub.Shutdown()
- shutdown := <-progress
- t.Logf("reached %d, shut down at %d", reached, shutdown)
-
- // Wait for all goroutines to finish.
- waiter.Wait()
-}
-
-func TestStream(t *testing.T) {
- testStream(t, 500)
-}
-
-func TestStreamSmallBuffers(t *testing.T) {
- testStream(t, 1)
-}
-
-func TestDurationFlag(t *testing.T) {
- d := &pubsub.DurationFlag{}
- if err := d.Set("1s"); err != nil {
- t.Errorf("unexpected error %s", err)
- }
- if got, want := d.Duration, time.Duration(time.Second); got != want {
- t.Errorf("got %s, expected %s", got, want)
- }
- if err := d.Set("1t"); err == nil || err.Error() != "time: unknown unit t in duration 1t" {
- t.Errorf("expected error %v", err)
- }
-}
diff --git a/pubsub/model.go b/pubsub/model.go
deleted file mode 100644
index f2bc2cd..0000000
--- a/pubsub/model.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package pubsub defines interfaces for accessing dynamically changing
-// process configuration information.
-//
-// Settings represent configuration parameters and their value. Settings
-// are published to named Streams. Streams are forked to add additional
-// consumers, i.e. readers of the Settings published to the Stream.
-//
-// Settings are represented by an interface type that wraps the data and
-// provides a name and description for each Settings. Streams are similarly
-// named and also have a description. When streams are 'forked' the latest
-// value of all Settings that have been sent over the Stream are made
-// available to the caller. This allows for a rendezvous between the single
-// producer of Settings and multiple consumers of those Settings that
-// may be added at arbitrary points in time.
-//
-// Streams are hosted by a Publisher type, which in addition to the methods
-// required for managing Streams provides a means to shut down all of the
-// Streams it hosts.
-package pubsub
-
-// Setting must be implemented by all data types to sent over Publisher
-// streams.
-type Setting interface {
- String() string
- // Name returns the name of the Setting
- Name() string
- // Description returns the description of the Setting
- Description() string
- // Value returns the value of the Setting.
- Value() interface{}
-}
diff --git a/pubsub/publisher.go b/pubsub/publisher.go
deleted file mode 100644
index e5e60fe..0000000
--- a/pubsub/publisher.go
+++ /dev/null
@@ -1,229 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub
-
-import (
- "fmt"
- "strings"
- "sync"
-
- "v.io/v23/verror"
-)
-
-const pkgPath = "v.io/x/lib/pubsub"
-
-var (
- errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
- errStreamExists = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
- errStreamShutDown = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
- errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
-)
-
-// A Publisher provides a mechanism for communicating Settings from a set
-// of producers to multiple consumers. Each such producer and associated
-// consumers are called a Stream. Operations are provided for creating
-// streams (CreateStream) and adding new consumers (ForkStream). Communication
-// is implemented using channels, with the producer and consumers providing
-// channels to send and receive Settings over. A Stream remembers the last value
-// of all Settings that were sent over it; these can be retrieved via ForkStream
-// or the Latest method.
-//
-// The Publisher may be shut down by calling its Shutdown method and
-// the producers will be notified via the channel returned by CreateStream,
-// at which point they should close the channel they use for publishing Settings.
-// If producers fail to close this channel then the Publisher will leak
-// goroutines (one per stream) when it is shutdown.
-type Publisher struct {
- mu sync.RWMutex
- stop chan struct{}
- shutdown bool
- streams map[string]*fork
-}
-
-// Stream is returned by Latest and includes the name and description
-// for the stream and the most recent values of the Setting that flowed
-// through it.
-type Stream struct {
- Name, Description string
- // Latest is a map of Setting names to the Setting itself.
- Latest map[string]Setting
-}
-
-// NewPublisher creates a Publisher.
-func NewPublisher() *Publisher {
- return &Publisher{
- streams: make(map[string]*fork),
- stop: make(chan struct{}),
- }
-}
-
-type fork struct {
- sync.RWMutex
- desc string
- vals map[string]Setting
- in <-chan Setting
- outs []chan<- Setting
-}
-
-// CreateStream creates a Stream with the provided name and description
-// (note, Settings have their own names and description, these are for the
-// stream). In general, no buffering is required for this channel since
-// the Publisher itself will read from it, however, if the consumers are slow
-// then the publisher may be slow in draining the channel. The publisher
-// should provide additional buffering if this is a concern.
-// Consequently this mechanism should be used for rarely changing Settings,
-// such as network address changes forced by DHCP and hence no buffering
-// will be required. The channel returned by CreateStream is closed when the
-// publisher is shut down and hence the caller should wait for this to occur
-// and then close the channel it has passed to CreateStream.
-func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
- if ch == nil {
- return nil, verror.New(errNeedNonNilChannel, nil)
- }
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.streams[name] != nil {
- return nil, verror.New(errStreamExists, nil, name)
- }
- f := &fork{desc: description, in: ch, vals: make(map[string]Setting)}
- p.streams[name] = f
- go f.flow(p.stop)
- return p.stop, nil
-}
-
-// String returns a string representation of the publisher, including
-// the names and descriptions of all the streams it currently supports.
-func (p *Publisher) String() string {
- r := ""
- p.mu.RLock()
- defer p.mu.RUnlock()
- if p.shutdown {
- return "shutdown"
- }
- for k, s := range p.streams {
- r += fmt.Sprintf("(%s: %s) ", k, s.desc)
- }
- return strings.TrimRight(r, " ")
-}
-
-// Latest returns information on the requested stream, including the
-// last instance of all Settings, if any, that flowed over it.
-func (p *Publisher) Latest(name string) *Stream {
- p.mu.Lock()
- defer p.mu.Unlock()
- f := p.streams[name]
- if f == nil {
- return nil
- }
- var r map[string]Setting
- f.RLock()
- defer f.RUnlock()
- for k, v := range f.vals {
- r[k] = v
- }
- return &Stream{Name: name, Description: f.desc, Latest: r}
-}
-
-// ForkStream 'forks' the named stream to add a new consumer. The channel
-// provided is to be used to read Settings sent down the stream. This
-// channel will be closed by the Publisher when it is asked to shut down.
-// The reader on this channel must be able to keep up with the flow of Settings
-// through the Stream in order to avoid blocking all other readers and hence
-// should set an appropriate amount of buffering for the channel it passes in.
-// ForkStream returns the most recent values of all Settings previously
-// sent over the stream, thus allowing its caller to synchronise with the
-// stream.
-func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.shutdown {
- return nil, verror.New(errStreamShutDown, nil, name)
- }
- f := p.streams[name]
- if f == nil {
- return nil, verror.New(errStreamDoesntExist, nil, name)
- }
- f.Lock()
- defer f.Unlock()
- r := make(map[string]Setting)
- for k, v := range f.vals {
- r[k] = v
- }
- f.outs = append(f.outs, ch)
- return &Stream{Name: name, Description: f.desc, Latest: r}, nil
-}
-
-// CloseFork removes the specified channel from the named stream.
-// The caller must drain the channel before closing it.
-// TODO(cnicolaou): add tests for this.
-func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.shutdown {
- return verror.New(errStreamShutDown, nil, name)
- }
- f := p.streams[name]
- if f == nil {
- return verror.New(errStreamDoesntExist, nil, name)
- }
- f.Lock()
- defer f.Unlock()
- for i, v := range f.outs {
- if v == ch {
- f.outs = append(f.outs[0:i], f.outs[i+1:]...)
- break
- }
- }
- return nil
-}
-
-// Shutdown initiates the process of stopping the operation of the Publisher.
-// All of the channels passed to CreateStream must be closed by their owner
-// to ensure that all goroutines are garbage collected.
-func (p *Publisher) Shutdown() {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.shutdown {
- return
- }
- p.shutdown = true
- close(p.stop)
-}
-
-func (f *fork) closeChans() {
- f.Lock()
- for _, o := range f.outs {
- close(o)
- }
- f.outs = nil
- f.Unlock()
-}
-
-func (f *fork) flow(stop chan struct{}) {
- closed := false
- for {
- select {
- case <-stop:
- if !closed {
- f.closeChans()
- closed = true
- }
- case val, ok := <-f.in:
- if !ok {
- f.closeChans()
- return
- }
- f.Lock()
- f.vals[val.Name()] = val
- cpy := make([]chan<- Setting, len(f.outs))
- copy(cpy, f.outs)
- f.Unlock()
- for _, o := range cpy {
- // We may well block here.
- o <- val
- }
- }
- }
-}
diff --git a/pubsub/types.go b/pubsub/types.go
deleted file mode 100644
index 1ee8d66..0000000
--- a/pubsub/types.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package pubsub
-
-import (
- "fmt"
-
- "time"
-)
-
-// Format formats a Setting in a consistent manner, it is intended to be
-// used when implementing the Setting interface.
-func Format(s Setting) string {
- return fmt.Sprintf("%s: %s: (%T: %s)", s.Name(), s.Description(), s.Value(), s.Value())
-}
-
-// Type Any can be used to represent or implement a Setting of any type.
-type Any struct {
- name, description string
- value interface{}
-}
-
-func (s *Any) String() string {
- return Format(s)
-}
-
-func (s *Any) Name() string {
- return s.name
-}
-
-func (s *Any) Description() string {
- return s.description
-}
-
-func (s *Any) Value() interface{} {
- return s.value
-}
-
-func NewAny(name, description string, value interface{}) Setting {
- return &Any{name, description, value}
-}
-
-func NewInt(name, description string, value int) Setting {
- return &Any{name, description, value}
-}
-
-func NewInt64(name, description string, value int64) Setting {
- return &Any{name, description, value}
-}
-
-func NewBool(name, description string, value bool) Setting {
- return &Any{name, description, value}
-}
-
-func NewFloat64(name, description string, value float64) Setting {
- return &Any{name, description, value}
-}
-
-func NewString(name, description string, value string) Setting {
- return &Any{name, description, value}
-}
-
-func NewDuration(name, description string, value time.Duration) Setting {
- return &Any{name, description, value}
-}
-
-// DurationFlag implements flag.Value in order to provide validation of
-// duration values in the flag package.
-type DurationFlag struct{ time.Duration }
-
-// Implements flag.Value.Get
-func (d DurationFlag) Get() interface{} {
- return d.Duration
-}
-
-// Implements flag.Value.Set
-func (d *DurationFlag) Set(s string) error {
- duration, err := time.ParseDuration(s)
- if err != nil {
- return err
- }
- d.Duration = duration
- return nil
-}
-
-// Implements flag.Value.String
-func (d DurationFlag) String() string {
- return d.Duration.String()
-}