lib: moving pubsub from lib to ref/lib
The pubsub package has a bad dependency on v.io/v23, which prevents
the lib repository to be release publicly as part of the plan to
externalizing the v23 tool. This CL moves it to ref/lib as it is only
imported by packages under v.io/x/ref/...
MultiPart: 2/2
Change-Id: Ib798ea12214cc84e32f71321916fea48ebb136c9
diff --git a/lib/pubsub/config_test.go b/lib/pubsub/config_test.go
new file mode 100644
index 0000000..f827c79
--- /dev/null
+++ b/lib/pubsub/config_test.go
@@ -0,0 +1,290 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub_test
+
+import (
+ "fmt"
+ "math/rand"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "v.io/x/ref/lib/pubsub"
+
+ "v.io/v23/verror"
+)
+
+func ExamplePublisher() {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ pub.CreateStream("net", "network settings", in)
+
+ // A simple producer of IP address settings.
+ producer := func() {
+ in <- pubsub.NewString("ip", "address", "1.2.3.5")
+ }
+
+ var waiter sync.WaitGroup
+ waiter.Add(2)
+
+ // A simple consumer of IP address Settings.
+ consumer := func(ch chan pubsub.Setting) {
+ fmt.Println(<-ch)
+ waiter.Done()
+ }
+
+ // Publish an initial Setting to the Stream.
+ in <- pubsub.NewString("ip", "address", "1.2.3.4")
+
+ // Fork the stream twice, and read the latest value.
+ ch1 := make(chan pubsub.Setting)
+ st, _ := pub.ForkStream("net", ch1)
+ fmt.Println(st.Latest["ip"])
+ ch2 := make(chan pubsub.Setting)
+ st, _ = pub.ForkStream("net", ch2)
+ fmt.Println(st.Latest["ip"])
+
+ // Now we can read new Settings as they are generated.
+ go producer()
+ go consumer(ch1)
+ go consumer(ch2)
+
+ waiter.Wait()
+
+ // Output:
+ // ip: address: (string: 1.2.3.4)
+ // ip: address: (string: 1.2.3.4)
+ // ip: address: (string: 1.2.3.5)
+ // ip: address: (string: 1.2.3.5)
+}
+
+func ExampleShutdown() {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ stop, _ := pub.CreateStream("net", "network settings", in)
+
+ var ready sync.WaitGroup
+ ready.Add(1)
+
+ // A producer to write 100 Settings before signalling that it's
+ // ready to be shutdown. This is purely to demonstrate how to use
+ // Shutdown.
+ producer := func() {
+ for i := 0; ; i++ {
+ select {
+ case <-stop:
+ close(in)
+ return
+ default:
+ in <- pubsub.NewString("ip", "address", "1.2.3.4")
+ if i == 100 {
+ ready.Done()
+ }
+ }
+ }
+ }
+
+ var waiter sync.WaitGroup
+ waiter.Add(2)
+
+ consumer := func() {
+ ch := make(chan pubsub.Setting, 10)
+ pub.ForkStream("net", ch)
+ i := 0
+ for {
+ if _, ok := <-ch; !ok {
+ // The channel has been closed when the publisher
+ // is asked to shut down.
+ break
+ }
+ i++
+ }
+ if i >= 100 {
+ // We've received at least 100 Settings as per the producer above.
+ fmt.Println("done")
+ }
+ waiter.Done()
+ }
+
+ go producer()
+ go consumer()
+ go consumer()
+ ready.Wait()
+ pub.Shutdown()
+ waiter.Wait()
+ // Output:
+ // done
+ // done
+}
+
+func TestSimple(t *testing.T) {
+ ch := make(chan pubsub.Setting, 2)
+ pub := pubsub.NewPublisher()
+ if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamDoesntExist" {
+ t.Errorf("missing or wrong error: %v", err)
+ }
+ if _, err := pub.CreateStream("stream", "example", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errNeedNonNilChannel" {
+ t.Fatalf("missing or wrong error: %v", err)
+ }
+ if _, err := pub.CreateStream("stream", "example", ch); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if _, err := pub.CreateStream("stream", "example", ch); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamExists" {
+ t.Fatalf("missing or wrong error: %v", err)
+ }
+ if got, want := pub.String(), "(stream: example)"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ stop, err := pub.CreateStream("s2", "eg2", ch)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if got, want := pub.String(), "(stream: example) (s2: eg2)"; got != want {
+ wantAlternate := "(s2: eg2) (stream: example)"
+ if got != wantAlternate {
+ t.Errorf("got %q, want %q or %q", got, want, wantAlternate)
+ }
+ }
+
+ got, want := pub.Latest("s2"), &pubsub.Stream{"s2", "eg2", nil}
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.Shutdown()
+ if _, running := <-stop; running {
+ t.Errorf("expected to be shutting down")
+ }
+ if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamShutDown" {
+ t.Errorf("missing or wrong error: %v", err)
+ }
+ if got, want := pub.String(), "shutdown"; got != want {
+ t.Errorf("got %s, want %s", got, want)
+ }
+
+}
+
+func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
+ for i := 0; ; i++ {
+ select {
+ case <-stop:
+ ch <- i
+ waiter.Done()
+ // must close this channel, otherwise the Publisher will leak a goroutine for this stream.
+ close(in)
+ return
+ default:
+ // signal progress on ch, at limit/2, limit and when we're done (above)
+ switch {
+ case i == limit/2:
+ ch <- i
+ case i == limit:
+ ch <- i
+ }
+ if i%2 == 0 {
+ in <- pubsub.NewInt("i", "int", i)
+ } else {
+ in <- pubsub.NewFloat64("f", "float", float64(i))
+ }
+ }
+ }
+}
+
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+ ch := make(chan pubsub.Setting, bufsize)
+ st, _ := pub.ForkStream("net", ch)
+ i, i2 := 0, 0
+ if st.Latest["i"] != nil {
+ i = int(st.Latest["i"].Value().(int))
+ }
+ if st.Latest["f"] != nil {
+ i2 = int(st.Latest["f"].Value().(float64))
+ }
+ if i2 > i {
+ i = i2
+ }
+ i++
+ for s := range ch {
+ switch v := s.Value().(type) {
+ case int:
+ if i%2 != 0 {
+ t.Errorf("expected a float, got an int")
+ break
+ }
+ if v != i {
+ t.Errorf("got %d, want %d", v, i)
+ }
+ case float64:
+ if i%2 != 1 {
+ t.Errorf("expected an int, got a float")
+ break
+ }
+ if v != float64(i) {
+ t.Errorf("got %f, want %f", v, float64(i))
+ }
+ }
+ i++
+ }
+ if i < limit {
+ t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+ }
+ waiter.Done()
+}
+
+func testStream(t *testing.T, consumerBufSize int) {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ stop, _ := pub.CreateStream("net", "network settings", in)
+
+ rand.Seed(time.Now().UnixNano())
+ limit := rand.Intn(5000)
+ if limit < 100 {
+ limit = 100
+ }
+ t.Logf("limit: %d", limit)
+
+ var waiter sync.WaitGroup
+ waiter.Add(3)
+
+ progress := make(chan int)
+ go producer(pub, in, stop, limit, progress, &waiter)
+
+ i := <-progress
+ t.Logf("limit/2 = %d", i)
+
+ // We use a lot of buffering in this unittest to ensure that
+ // we never miss any settings.
+ go consumer(t, pub, limit, consumerBufSize, &waiter)
+ go consumer(t, pub, limit, consumerBufSize, &waiter)
+
+ reached := <-progress
+ pub.Shutdown()
+ shutdown := <-progress
+ t.Logf("reached %d, shut down at %d", reached, shutdown)
+
+ // Wait for all goroutines to finish.
+ waiter.Wait()
+}
+
+func TestStream(t *testing.T) {
+ testStream(t, 500)
+}
+
+func TestStreamSmallBuffers(t *testing.T) {
+ testStream(t, 1)
+}
+
+func TestDurationFlag(t *testing.T) {
+ d := &pubsub.DurationFlag{}
+ if err := d.Set("1s"); err != nil {
+ t.Errorf("unexpected error %s", err)
+ }
+ if got, want := d.Duration, time.Duration(time.Second); got != want {
+ t.Errorf("got %s, expected %s", got, want)
+ }
+ if err := d.Set("1t"); err == nil || err.Error() != "time: unknown unit t in duration 1t" {
+ t.Errorf("expected error %v", err)
+ }
+}
diff --git a/lib/pubsub/model.go b/lib/pubsub/model.go
new file mode 100644
index 0000000..f2bc2cd
--- /dev/null
+++ b/lib/pubsub/model.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pubsub defines interfaces for accessing dynamically changing
+// process configuration information.
+//
+// Settings represent configuration parameters and their value. Settings
+// are published to named Streams. Streams are forked to add additional
+// consumers, i.e. readers of the Settings published to the Stream.
+//
+// Settings are represented by an interface type that wraps the data and
+// provides a name and description for each Settings. Streams are similarly
+// named and also have a description. When streams are 'forked' the latest
+// value of all Settings that have been sent over the Stream are made
+// available to the caller. This allows for a rendezvous between the single
+// producer of Settings and multiple consumers of those Settings that
+// may be added at arbitrary points in time.
+//
+// Streams are hosted by a Publisher type, which in addition to the methods
+// required for managing Streams provides a means to shut down all of the
+// Streams it hosts.
+package pubsub
+
+// Setting must be implemented by all data types to sent over Publisher
+// streams.
+type Setting interface {
+ String() string
+ // Name returns the name of the Setting
+ Name() string
+ // Description returns the description of the Setting
+ Description() string
+ // Value returns the value of the Setting.
+ Value() interface{}
+}
diff --git a/lib/pubsub/publisher.go b/lib/pubsub/publisher.go
new file mode 100644
index 0000000..ec10fb4
--- /dev/null
+++ b/lib/pubsub/publisher.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/lib/pubsub"
+
+var (
+ errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
+ errStreamExists = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
+ errStreamShutDown = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
+ errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
+)
+
+// A Publisher provides a mechanism for communicating Settings from a set
+// of producers to multiple consumers. Each such producer and associated
+// consumers are called a Stream. Operations are provided for creating
+// streams (CreateStream) and adding new consumers (ForkStream). Communication
+// is implemented using channels, with the producer and consumers providing
+// channels to send and receive Settings over. A Stream remembers the last value
+// of all Settings that were sent over it; these can be retrieved via ForkStream
+// or the Latest method.
+//
+// The Publisher may be shut down by calling its Shutdown method and
+// the producers will be notified via the channel returned by CreateStream,
+// at which point they should close the channel they use for publishing Settings.
+// If producers fail to close this channel then the Publisher will leak
+// goroutines (one per stream) when it is shutdown.
+type Publisher struct {
+ mu sync.RWMutex
+ stop chan struct{}
+ shutdown bool
+ streams map[string]*fork
+}
+
+// Stream is returned by Latest and includes the name and description
+// for the stream and the most recent values of the Setting that flowed
+// through it.
+type Stream struct {
+ Name, Description string
+ // Latest is a map of Setting names to the Setting itself.
+ Latest map[string]Setting
+}
+
+// NewPublisher creates a Publisher.
+func NewPublisher() *Publisher {
+ return &Publisher{
+ streams: make(map[string]*fork),
+ stop: make(chan struct{}),
+ }
+}
+
+type fork struct {
+ sync.RWMutex
+ desc string
+ vals map[string]Setting
+ in <-chan Setting
+ outs []chan<- Setting
+}
+
+// CreateStream creates a Stream with the provided name and description
+// (note, Settings have their own names and description, these are for the
+// stream). In general, no buffering is required for this channel since
+// the Publisher itself will read from it, however, if the consumers are slow
+// then the publisher may be slow in draining the channel. The publisher
+// should provide additional buffering if this is a concern.
+// Consequently this mechanism should be used for rarely changing Settings,
+// such as network address changes forced by DHCP and hence no buffering
+// will be required. The channel returned by CreateStream is closed when the
+// publisher is shut down and hence the caller should wait for this to occur
+// and then close the channel it has passed to CreateStream.
+func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
+ if ch == nil {
+ return nil, verror.New(errNeedNonNilChannel, nil)
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.streams[name] != nil {
+ return nil, verror.New(errStreamExists, nil, name)
+ }
+ f := &fork{desc: description, in: ch, vals: make(map[string]Setting)}
+ p.streams[name] = f
+ go f.flow(p.stop)
+ return p.stop, nil
+}
+
+// String returns a string representation of the publisher, including
+// the names and descriptions of all the streams it currently supports.
+func (p *Publisher) String() string {
+ r := ""
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+ if p.shutdown {
+ return "shutdown"
+ }
+ for k, s := range p.streams {
+ r += fmt.Sprintf("(%s: %s) ", k, s.desc)
+ }
+ return strings.TrimRight(r, " ")
+}
+
+// Latest returns information on the requested stream, including the
+// last instance of all Settings, if any, that flowed over it.
+func (p *Publisher) Latest(name string) *Stream {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ f := p.streams[name]
+ if f == nil {
+ return nil
+ }
+ var r map[string]Setting
+ f.RLock()
+ defer f.RUnlock()
+ for k, v := range f.vals {
+ r[k] = v
+ }
+ return &Stream{Name: name, Description: f.desc, Latest: r}
+}
+
+// ForkStream 'forks' the named stream to add a new consumer. The channel
+// provided is to be used to read Settings sent down the stream. This
+// channel will be closed by the Publisher when it is asked to shut down.
+// The reader on this channel must be able to keep up with the flow of Settings
+// through the Stream in order to avoid blocking all other readers and hence
+// should set an appropriate amount of buffering for the channel it passes in.
+// ForkStream returns the most recent values of all Settings previously
+// sent over the stream, thus allowing its caller to synchronise with the
+// stream.
+func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.shutdown {
+ return nil, verror.New(errStreamShutDown, nil, name)
+ }
+ f := p.streams[name]
+ if f == nil {
+ return nil, verror.New(errStreamDoesntExist, nil, name)
+ }
+ f.Lock()
+ defer f.Unlock()
+ r := make(map[string]Setting)
+ for k, v := range f.vals {
+ r[k] = v
+ }
+ f.outs = append(f.outs, ch)
+ return &Stream{Name: name, Description: f.desc, Latest: r}, nil
+}
+
+// CloseFork removes the specified channel from the named stream.
+// The caller must drain the channel before closing it.
+// TODO(cnicolaou): add tests for this.
+func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.shutdown {
+ return verror.New(errStreamShutDown, nil, name)
+ }
+ f := p.streams[name]
+ if f == nil {
+ return verror.New(errStreamDoesntExist, nil, name)
+ }
+ f.Lock()
+ defer f.Unlock()
+ for i, v := range f.outs {
+ if v == ch {
+ f.outs = append(f.outs[0:i], f.outs[i+1:]...)
+ break
+ }
+ }
+ return nil
+}
+
+// Shutdown initiates the process of stopping the operation of the Publisher.
+// All of the channels passed to CreateStream must be closed by their owner
+// to ensure that all goroutines are garbage collected.
+func (p *Publisher) Shutdown() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.shutdown {
+ return
+ }
+ p.shutdown = true
+ close(p.stop)
+}
+
+func (f *fork) closeChans() {
+ f.Lock()
+ for _, o := range f.outs {
+ close(o)
+ }
+ f.outs = nil
+ f.Unlock()
+}
+
+func (f *fork) flow(stop chan struct{}) {
+ closed := false
+ for {
+ select {
+ case <-stop:
+ if !closed {
+ f.closeChans()
+ closed = true
+ }
+ case val, ok := <-f.in:
+ if !ok {
+ f.closeChans()
+ return
+ }
+ f.Lock()
+ f.vals[val.Name()] = val
+ cpy := make([]chan<- Setting, len(f.outs))
+ copy(cpy, f.outs)
+ f.Unlock()
+ for _, o := range cpy {
+ // We may well block here.
+ o <- val
+ }
+ }
+ }
+}
diff --git a/lib/pubsub/types.go b/lib/pubsub/types.go
new file mode 100644
index 0000000..1ee8d66
--- /dev/null
+++ b/lib/pubsub/types.go
@@ -0,0 +1,91 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+ "fmt"
+
+ "time"
+)
+
+// Format formats a Setting in a consistent manner, it is intended to be
+// used when implementing the Setting interface.
+func Format(s Setting) string {
+ return fmt.Sprintf("%s: %s: (%T: %s)", s.Name(), s.Description(), s.Value(), s.Value())
+}
+
+// Type Any can be used to represent or implement a Setting of any type.
+type Any struct {
+ name, description string
+ value interface{}
+}
+
+func (s *Any) String() string {
+ return Format(s)
+}
+
+func (s *Any) Name() string {
+ return s.name
+}
+
+func (s *Any) Description() string {
+ return s.description
+}
+
+func (s *Any) Value() interface{} {
+ return s.value
+}
+
+func NewAny(name, description string, value interface{}) Setting {
+ return &Any{name, description, value}
+}
+
+func NewInt(name, description string, value int) Setting {
+ return &Any{name, description, value}
+}
+
+func NewInt64(name, description string, value int64) Setting {
+ return &Any{name, description, value}
+}
+
+func NewBool(name, description string, value bool) Setting {
+ return &Any{name, description, value}
+}
+
+func NewFloat64(name, description string, value float64) Setting {
+ return &Any{name, description, value}
+}
+
+func NewString(name, description string, value string) Setting {
+ return &Any{name, description, value}
+}
+
+func NewDuration(name, description string, value time.Duration) Setting {
+ return &Any{name, description, value}
+}
+
+// DurationFlag implements flag.Value in order to provide validation of
+// duration values in the flag package.
+type DurationFlag struct{ time.Duration }
+
+// Implements flag.Value.Get
+func (d DurationFlag) Get() interface{} {
+ return d.Duration
+}
+
+// Implements flag.Value.Set
+func (d *DurationFlag) Set(s string) error {
+ duration, err := time.ParseDuration(s)
+ if err != nil {
+ return err
+ }
+ d.Duration = duration
+ return nil
+}
+
+// Implements flag.Value.String
+func (d DurationFlag) String() string {
+ return d.Duration.String()
+}
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index f42d786..e5d0bb4 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -19,7 +19,6 @@
"v.io/x/lib/netconfig"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -27,6 +26,7 @@
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 56fcbdf..9c26e2b 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -20,7 +20,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -36,6 +35,7 @@
"v.io/v23/verror"
"v.io/v23/vtrace"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/lib/publisher"
"v.io/x/ref/runtime/internal/lib/websocket"
diff --git a/runtime/internal/rpc/pubsub.go b/runtime/internal/rpc/pubsub.go
index 7bfe46c..6d2ad87 100644
--- a/runtime/internal/rpc/pubsub.go
+++ b/runtime/internal/rpc/pubsub.go
@@ -7,7 +7,7 @@
import (
"net"
- "v.io/x/lib/pubsub"
+ "v.io/x/ref/lib/pubsub"
)
// NewAddAddrsSetting creates the Setting to be sent to Listen to inform
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index c34a728..1e30282 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -15,7 +15,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -32,6 +31,7 @@
"v.io/v23/vtrace"
"v.io/x/ref/lib/apilog"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/lib/publisher"
inaming "v.io/x/ref/runtime/internal/naming"
diff --git a/runtime/internal/rpc/server_test.go b/runtime/internal/rpc/server_test.go
index b8b5bb2..7263415 100644
--- a/runtime/internal/rpc/server_test.go
+++ b/runtime/internal/rpc/server_test.go
@@ -12,7 +12,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/x/lib/set"
"v.io/v23"
@@ -23,6 +22,7 @@
"v.io/v23/security"
"v.io/v23/verror"
+ "v.io/x/ref/lib/pubsub"
inaming "v.io/x/ref/runtime/internal/naming"
imanager "v.io/x/ref/runtime/internal/rpc/stream/manager"
tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 9cc9825..dac4bed 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -14,7 +14,6 @@
"time"
"v.io/x/lib/metadata"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -31,6 +30,7 @@
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/apilog"
"v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
_ "v.io/x/ref/lib/stats/sysstats"
"v.io/x/ref/runtime/internal/flow/manager"