Merge "ref: changes related to v.io/c/14371"
diff --git a/lib/pubsub/config_test.go b/lib/pubsub/config_test.go
new file mode 100644
index 0000000..f827c79
--- /dev/null
+++ b/lib/pubsub/config_test.go
@@ -0,0 +1,290 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub_test
+
+import (
+	"fmt"
+	"math/rand"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/x/ref/lib/pubsub"
+
+	"v.io/v23/verror"
+)
+
+// ExamplePublisher shows a stream being created, primed with an initial
+// Setting, forked for two consumers and then used to deliver a further
+// Setting to both of them.
+func ExamplePublisher() {
+	// in is unbuffered, so each send on it completes only once the
+	// Publisher has received the Setting.
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	pub.CreateStream("net", "network settings", in)
+
+	// A simple producer of IP address settings.
+	producer := func() {
+		in <- pubsub.NewString("ip", "address", "1.2.3.5")
+	}
+
+	// waiter tracks the two consumers started below.
+	var waiter sync.WaitGroup
+	waiter.Add(2)
+
+	// A simple consumer of IP address Settings.
+	consumer := func(ch chan pubsub.Setting) {
+		fmt.Println(<-ch)
+		waiter.Done()
+	}
+
+	// Publish an initial Setting to the Stream.
+	in <- pubsub.NewString("ip", "address", "1.2.3.4")
+
+	// Fork the stream twice, and read the latest value.
+	// NOTE(review): the Publisher records a Setting only after receiving
+	// it, so this presumably relies on that having happened by now - confirm.
+	ch1 := make(chan pubsub.Setting)
+	st, _ := pub.ForkStream("net", ch1)
+	fmt.Println(st.Latest["ip"])
+	ch2 := make(chan pubsub.Setting)
+	st, _ = pub.ForkStream("net", ch2)
+	fmt.Println(st.Latest["ip"])
+
+	// Now we can read new Settings as they are generated.
+	go producer()
+	go consumer(ch1)
+	go consumer(ch2)
+
+	waiter.Wait()
+
+	// Output:
+	// ip: address: (string: 1.2.3.4)
+	// ip: address: (string: 1.2.3.4)
+	// ip: address: (string: 1.2.3.5)
+	// ip: address: (string: 1.2.3.5)
+}
+
+// ExampleShutdown shows how a producer and its consumers wind down when the
+// Publisher is shut down: the producer closes its channel once the channel
+// returned by CreateStream is closed, and the consumers stop when their
+// forked channels are closed.
+func ExampleShutdown() {
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	stop, _ := pub.CreateStream("net", "network settings", in)
+
+	// ready is released by the producer once it has sent enough Settings.
+	var ready sync.WaitGroup
+	ready.Add(1)
+
+	// A producer to write 100 Settings before signalling that it's
+	// ready to be shutdown. This is purely to demonstrate how to use
+	// Shutdown.
+	producer := func() {
+		for i := 0; ; i++ {
+			select {
+			case <-stop:
+				// The Publisher is shutting down: close our end.
+				close(in)
+				return
+			default:
+				in <- pubsub.NewString("ip", "address", "1.2.3.4")
+				// i counts from 0, so this fires after the 101st send.
+				if i == 100 {
+					ready.Done()
+				}
+			}
+		}
+	}
+
+	// waiter tracks the two consumers started below.
+	var waiter sync.WaitGroup
+	waiter.Add(2)
+
+	consumer := func() {
+		ch := make(chan pubsub.Setting, 10)
+		pub.ForkStream("net", ch)
+		// i counts the Settings received before ch is closed.
+		i := 0
+		for {
+			if _, ok := <-ch; !ok {
+				// The channel has been closed when the publisher
+				// is asked to shut down.
+				break
+			}
+			i++
+		}
+		// NOTE(review): each consumer forks from its own goroutine, so
+		// this presumably relies on forking before many Settings have
+		// gone by - confirm.
+		if i >= 100 {
+			// We've received at least 100 Settings as per the producer above.
+			fmt.Println("done")
+		}
+		waiter.Done()
+	}
+
+	go producer()
+	go consumer()
+	go consumer()
+	ready.Wait()
+	pub.Shutdown()
+	waiter.Wait()
+	// Output:
+	// done
+	// done
+}
+
+// TestSimple walks a Publisher through its lifecycle: the errors reported
+// for missing, invalid and duplicate streams, the String and Latest views of
+// the streams it hosts, and its behaviour once it has been shut down.
+func TestSimple(t *testing.T) {
+	// wrongErr reports whether err is nil or carries an ID other than id.
+	wrongErr := func(err error, id string) bool {
+		return err == nil || string(verror.ErrorID(err)) != id
+	}
+
+	pub := pubsub.NewPublisher()
+	ch := make(chan pubsub.Setting, 2)
+
+	_, err := pub.ForkStream("stream", nil)
+	if wrongErr(err, "v.io/x/ref/lib/pubsub.errStreamDoesntExist") {
+		t.Errorf("missing or wrong error: %v", err)
+	}
+	_, err = pub.CreateStream("stream", "example", nil)
+	if wrongErr(err, "v.io/x/ref/lib/pubsub.errNeedNonNilChannel") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+	_, err = pub.CreateStream("stream", "example", ch)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	_, err = pub.CreateStream("stream", "example", ch)
+	if wrongErr(err, "v.io/x/ref/lib/pubsub.errStreamExists") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+	if got, want := pub.String(), "(stream: example)"; got != want {
+		t.Errorf("got %q, want %q", got, want)
+	}
+
+	stop, err := pub.CreateStream("s2", "eg2", ch)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	// The streams are kept in a map, so either ordering is acceptable.
+	firstOrder, secondOrder := "(stream: example) (s2: eg2)", "(s2: eg2) (stream: example)"
+	if got := pub.String(); got != firstOrder && got != secondOrder {
+		t.Errorf("got %q, want %q or %q", got, firstOrder, secondOrder)
+	}
+
+	// Nothing has been sent over s2, so it has no latest Settings.
+	latest, wantLatest := pub.Latest("s2"), &pubsub.Stream{"s2", "eg2", nil}
+	if !reflect.DeepEqual(latest, wantLatest) {
+		t.Errorf("got %q, want %q", latest, wantLatest)
+	}
+
+	pub.Shutdown()
+	// A receive on stop only succeeds with ok == false once it is closed.
+	if _, running := <-stop; running {
+		t.Errorf("expected to be shutting down")
+	}
+	_, err = pub.ForkStream("stream", nil)
+	if wrongErr(err, "v.io/x/ref/lib/pubsub.errStreamShutDown") {
+		t.Errorf("missing or wrong error: %v", err)
+	}
+	if got, want := pub.String(), "shutdown"; got != want {
+		t.Errorf("got %s, want %s", got, want)
+	}
+}
+
+// producer sends Settings into in, one per iteration, alternating between an
+// int named "i" (even iterations) and a float64 named "f" (odd iterations),
+// until stop is closed. It reports the iteration count on ch when it reaches
+// limit/2 and limit, and once more when asked to stop, at which point it also
+// signals waiter and closes in.
+func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
+	for n := 0; ; n++ {
+		select {
+		case <-stop:
+			ch <- n
+			waiter.Done()
+			// in must be closed, otherwise the Publisher will leak the
+			// goroutine that serves this stream.
+			close(in)
+			return
+		default:
+		}
+		// Signal progress on ch at limit/2 and at limit.
+		if n == limit/2 || n == limit {
+			ch <- n
+		}
+		var next pubsub.Setting
+		if n%2 == 0 {
+			next = pubsub.NewInt("i", "int", n)
+		} else {
+			next = pubsub.NewFloat64("f", "float", float64(n))
+		}
+		in <- next
+	}
+}
+
+// consumer forks the "net" stream with a channel buffered to bufsize and
+// checks that the Settings it receives carry consecutive values, ints for
+// even values and floats for odd ones, starting just after the newest value
+// the stream had already seen when it was forked. It fails the test if the
+// stream cannot be forked or if the count is still below limit when the
+// channel is closed. waiter is signalled on every return path.
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+	defer waiter.Done()
+	ch := make(chan pubsub.Setting, bufsize)
+	st, err := pub.ForkStream("net", ch)
+	if err != nil {
+		// st is nil on error; report it rather than dereference it below.
+		t.Errorf("ForkStream failed: %v", err)
+		return
+	}
+	// Resume counting from the newest value already seen on the stream.
+	i := 0
+	if s := st.Latest["i"]; s != nil {
+		i = s.Value().(int)
+	}
+	if s := st.Latest["f"]; s != nil {
+		if f := int(s.Value().(float64)); f > i {
+			i = f
+		}
+	}
+	i++
+	for s := range ch {
+		switch v := s.Value().(type) {
+		case int:
+			if i%2 != 0 {
+				t.Errorf("expected a float, got an int")
+				break // leaves the switch only; i still advances below
+			}
+			if v != i {
+				t.Errorf("got %d, want %d", v, i)
+			}
+		case float64:
+			if i%2 != 1 {
+				t.Errorf("expected an int, got a float")
+				break // leaves the switch only; i still advances below
+			}
+			if v != float64(i) {
+				t.Errorf("got %f, want %f", v, float64(i))
+			}
+		}
+		i++
+	}
+	if i < limit {
+		t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+	}
+}
+
+// testStream runs one producer and two consumers over a single stream,
+// giving each consumer a channel buffered to consumerBufSize, and shuts the
+// Publisher down once the producer has reached a randomly chosen limit.
+func testStream(t *testing.T, consumerBufSize int) {
+	in := make(chan pubsub.Setting)
+	pub := pubsub.NewPublisher()
+	stop, _ := pub.CreateStream("net", "network settings", in)
+
+	// Pick a limit in the range [100, 5000).
+	rand.Seed(time.Now().UnixNano())
+	limit := rand.Intn(5000)
+	if limit < 100 {
+		limit = 100
+	}
+	t.Logf("limit: %d", limit)
+
+	// One producer plus two consumers.
+	var waiter sync.WaitGroup
+	waiter.Add(3)
+
+	progress := make(chan int)
+	go producer(pub, in, stop, limit, progress, &waiter)
+
+	// First progress report: the producer has reached limit/2.
+	i := <-progress
+	t.Logf("limit/2 = %d", i)
+
+	// We use a lot of buffering in this unittest to ensure that
+	// we never miss any settings.
+	go consumer(t, pub, limit, consumerBufSize, &waiter)
+	go consumer(t, pub, limit, consumerBufSize, &waiter)
+
+	// Second report: limit was reached. Third: the producer saw the
+	// shutdown and is about to close its channel.
+	reached := <-progress
+	pub.Shutdown()
+	shutdown := <-progress
+	t.Logf("reached %d, shut down at %d", reached, shutdown)
+
+	// Wait for all goroutines to finish.
+	waiter.Wait()
+}
+
+// TestStream runs testStream with generously buffered (500) consumer
+// channels.
+func TestStream(t *testing.T) {
+	testStream(t, 500)
+}
+
+// TestStreamSmallBuffers runs testStream with consumer channels that hold a
+// single Setting.
+func TestStreamSmallBuffers(t *testing.T) {
+	testStream(t, 1)
+}
+
+// TestDurationFlag checks that DurationFlag.Set stores a well-formed
+// duration, and that it rejects a malformed one without disturbing the value
+// already held.
+func TestDurationFlag(t *testing.T) {
+	d := &pubsub.DurationFlag{}
+	if err := d.Set("1s"); err != nil {
+		t.Errorf("unexpected error %s", err)
+	}
+	if got, want := d.Duration, time.Second; got != want {
+		t.Errorf("got %s, expected %s", got, want)
+	}
+	// Only require that an error is reported: the text comes from
+	// time.ParseDuration and its wording differs between Go releases.
+	if err := d.Set("1t"); err == nil {
+		t.Errorf("expected an error for an unknown unit, got none")
+	}
+	// A rejected value must leave the previous one in place.
+	if got, want := d.Duration, time.Second; got != want {
+		t.Errorf("got %s, expected %s", got, want)
+	}
+}
diff --git a/lib/pubsub/model.go b/lib/pubsub/model.go
new file mode 100644
index 0000000..f2bc2cd
--- /dev/null
+++ b/lib/pubsub/model.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pubsub defines interfaces for accessing dynamically changing
+// process configuration information.
+//
+// Settings represent configuration parameters and their value. Settings
+// are published to named Streams. Streams are forked to add additional
+// consumers, i.e. readers of the Settings published to the Stream.
+//
+// Settings are represented by an interface type that wraps the data and
+// provides a name and description for each Setting. Streams are similarly
+// named and also have a description. When streams are 'forked' the latest
+// value of all Settings that have been sent over the Stream are made
+// available to the caller. This allows for a rendezvous between the single
+// producer of Settings and multiple consumers of those Settings that
+// may be added at arbitrary points in time.
+//
+// Streams are hosted by a Publisher type, which in addition to the methods
+// required for managing Streams provides a means to shut down all of the
+// Streams it hosts.
+package pubsub
+
+// Setting must be implemented by all data types to be sent over Publisher
+// streams.
+type Setting interface {
+	// String returns the Setting as a string; Format provides a consistent
+	// layout for implementations to use.
+	String() string
+	// Name returns the name of the Setting
+	Name() string
+	// Description returns the description of the Setting
+	Description() string
+	// Value returns the value of the Setting.
+	Value() interface{}
+}
diff --git a/lib/pubsub/publisher.go b/lib/pubsub/publisher.go
new file mode 100644
index 0000000..ec10fb4
--- /dev/null
+++ b/lib/pubsub/publisher.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"v.io/v23/verror"
+)
+
+// pkgPath prefixes the IDs of the errors registered by this package.
+const pkgPath = "v.io/x/ref/lib/pubsub"
+
+// Errors returned by the Publisher's methods.
+// NOTE(review): {3} is presumably replaced by the stream name passed as the
+// first parameter to verror.New - confirm against the verror documentation.
+var (
+	errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
+	errStreamExists      = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
+	errStreamShutDown    = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
+	errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
+)
+
+// A Publisher provides a mechanism for communicating Settings from a set
+// of producers to multiple consumers. Each such producer and associated
+// consumers are called a Stream. Operations are provided for creating
+// streams (CreateStream) and adding new consumers (ForkStream). Communication
+// is implemented using channels, with the producer and consumers providing
+// channels to send and receive Settings over. A Stream remembers the last value
+// of all Settings that were sent over it; these can be retrieved via ForkStream
+// or the Latest method.
+//
+// The Publisher may be shut down by calling its Shutdown method and
+// the producers will be notified via the channel returned by CreateStream,
+// at which point they should close the channel they use for publishing Settings.
+// If producers fail to close this channel then the Publisher will leak
+// goroutines (one per stream) when it is shutdown.
+type Publisher struct {
+	// mu guards shutdown and streams.
+	mu sync.RWMutex
+	// stop is handed out by CreateStream and closed by Shutdown.
+	stop chan struct{}
+	// shutdown is set once Shutdown has been called.
+	shutdown bool
+	// streams holds the state of each stream, keyed by stream name.
+	streams map[string]*fork
+}
+
+// Stream is returned by Latest and ForkStream and includes the name and
+// description for the stream and the most recent values of the Settings
+// that flowed through it.
+type Stream struct {
+	Name, Description string
+	// Latest is a map of Setting names to the Setting itself.
+	Latest map[string]Setting
+}
+
+// NewPublisher returns a Publisher that hosts no streams yet and has not
+// been shut down.
+func NewPublisher() *Publisher {
+	p := new(Publisher)
+	p.stop = make(chan struct{})
+	p.streams = make(map[string]*fork)
+	return p
+}
+
+// fork holds the state of a single stream. The embedded lock guards vals
+// and outs.
+type fork struct {
+	sync.RWMutex
+	// desc is the description given to CreateStream.
+	desc string
+	// vals maps Setting names to the last Setting received with that name.
+	vals map[string]Setting
+	// in is the producer's channel, as passed to CreateStream.
+	in <-chan Setting
+	// outs are the consumers' channels, as passed to ForkStream.
+	outs []chan<- Setting
+}
+
+// CreateStream creates a Stream with the provided name and description
+// (these belong to the stream; each Setting carries a name and description
+// of its own) that is fed by the Settings sent on ch. It returns an error if
+// ch is nil or if a stream with that name already exists.
+//
+// The Publisher itself reads from ch, so in general ch needs no buffering.
+// Slow consumers will however slow the rate at which ch is drained, and a
+// producer for which that is a concern should add buffering of its own. The
+// mechanism is consequently best suited to rarely changing Settings, such as
+// network address changes forced by DHCP.
+//
+// The channel returned by CreateStream is closed when the Publisher is shut
+// down; the caller should wait for that to occur and then close ch.
+func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
+	if ch == nil {
+		return nil, verror.New(errNeedNonNilChannel, nil)
+	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if existing := p.streams[name]; existing != nil {
+		return nil, verror.New(errStreamExists, nil, name)
+	}
+	stream := &fork{
+		desc: description,
+		vals: map[string]Setting{},
+		in:   ch,
+	}
+	p.streams[name] = stream
+	// One goroutine per stream moves Settings from ch to the consumers.
+	go stream.flow(p.stop)
+	return p.stop, nil
+}
+
+// String returns a string representation of the publisher: "shutdown" once
+// it has been shut down, otherwise the name and description of every stream
+// it currently supports, in no particular order.
+func (p *Publisher) String() string {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	if p.shutdown {
+		return "shutdown"
+	}
+	parts := make([]string, 0, len(p.streams))
+	for name, stream := range p.streams {
+		parts = append(parts, fmt.Sprintf("(%s: %s) ", name, stream.desc))
+	}
+	// Every part ends in a separating space; drop the final one.
+	return strings.TrimRight(strings.Join(parts, ""), " ")
+}
+
+// Latest returns information on the requested stream, including a copy of
+// the last instance of each Setting that flowed over it. It returns nil if
+// there is no stream with that name. The returned Stream's Latest map is nil
+// when no Setting has been sent over the stream yet.
+func (p *Publisher) Latest(name string) *Stream {
+	// Nothing here modifies the Publisher, so a read lock suffices.
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	f := p.streams[name]
+	if f == nil {
+		return nil
+	}
+	f.RLock()
+	defer f.RUnlock()
+	var r map[string]Setting
+	if len(f.vals) > 0 {
+		// Assigning to a nil map panics, so allocate before copying; an
+		// empty stream keeps the nil map that callers already expect.
+		r = make(map[string]Setting, len(f.vals))
+	}
+	for k, v := range f.vals {
+		r[k] = v
+	}
+	return &Stream{Name: name, Description: f.desc, Latest: r}
+}
+
+// ForkStream 'forks' the named stream to add a new consumer, which reads the
+// Settings sent down the stream from ch. The Publisher closes ch when it is
+// asked to shut down. Settings are delivered to the consumers one after the
+// other, so a reader that cannot keep up with the flow of Settings blocks
+// all other readers; ch should be given enough buffering to avoid that.
+//
+// ForkStream returns the most recent values of all Settings previously sent
+// over the stream, thus allowing its caller to synchronise with the stream.
+// It returns an error if the Publisher has been shut down or the stream
+// doesn't exist.
+func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	stream := p.streams[name]
+	switch {
+	case p.shutdown:
+		return nil, verror.New(errStreamShutDown, nil, name)
+	case stream == nil:
+		return nil, verror.New(errStreamDoesntExist, nil, name)
+	}
+	stream.Lock()
+	defer stream.Unlock()
+	st := &Stream{
+		Name:        name,
+		Description: stream.desc,
+		Latest:      make(map[string]Setting, len(stream.vals)),
+	}
+	for settingName, setting := range stream.vals {
+		st.Latest[settingName] = setting
+	}
+	stream.outs = append(stream.outs, ch)
+	return st, nil
+}
+
+// CloseFork removes the specified channel from the named stream, so that no
+// further Settings are sent to it. Removing a channel that was never forked
+// is not an error. The caller must drain the channel before closing it.
+// It returns an error if the Publisher has been shut down or the stream
+// doesn't exist.
+// TODO(cnicolaou): add tests for this.
+func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	stream := p.streams[name]
+	switch {
+	case p.shutdown:
+		return verror.New(errStreamShutDown, nil, name)
+	case stream == nil:
+		return verror.New(errStreamDoesntExist, nil, name)
+	}
+	stream.Lock()
+	defer stream.Unlock()
+	for i := range stream.outs {
+		if stream.outs[i] != ch {
+			continue
+		}
+		// Only the first match is removed.
+		stream.outs = append(stream.outs[:i], stream.outs[i+1:]...)
+		break
+	}
+	return nil
+}
+
+// Shutdown initiates the process of stopping the operation of the Publisher
+// by closing the channel returned by CreateStream; calling it again has no
+// further effect. All of the channels passed to CreateStream must be closed
+// by their owner to ensure that all goroutines are garbage collected.
+func (p *Publisher) Shutdown() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if !p.shutdown {
+		p.shutdown = true
+		close(p.stop)
+	}
+}
+
+// closeChans closes every consumer channel and forgets them all, so that a
+// second call has nothing left to close.
+func (f *fork) closeChans() {
+	f.Lock()
+	defer f.Unlock()
+	for i := range f.outs {
+		close(f.outs[i])
+	}
+	f.outs = nil
+}
+
+// flow is the goroutine behind a stream. It records every Setting read from
+// f.in as the latest value for its name and forwards it to each consumer
+// channel, returning once f.in has been closed. When stop is closed it
+// closes the consumer channels and then keeps draining f.in until the
+// producer closes it.
+func (f *fork) flow(stop chan struct{}) {
+	for {
+		select {
+		case <-stop:
+			f.closeChans()
+			// A closed channel is always ready to receive, so leaving stop
+			// in the select would make this loop spin until f.in is
+			// closed. A nil channel is never ready, which disables this
+			// case; stop is a local copy, so the Publisher is unaffected.
+			stop = nil
+		case val, ok := <-f.in:
+			if !ok {
+				f.closeChans()
+				return
+			}
+			f.Lock()
+			f.vals[val.Name()] = val
+			// Snapshot the consumers so that the sends below happen
+			// without holding the lock.
+			cpy := make([]chan<- Setting, len(f.outs))
+			copy(cpy, f.outs)
+			f.Unlock()
+			for _, o := range cpy {
+				// We may well block here.
+				o <- val
+			}
+		}
+	}
+}
diff --git a/lib/pubsub/types.go b/lib/pubsub/types.go
new file mode 100644
index 0000000..1ee8d66
--- /dev/null
+++ b/lib/pubsub/types.go
@@ -0,0 +1,91 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+	"fmt"
+
+	"time"
+)
+
+// Format formats a Setting in a consistent manner, it is intended to be
+// used when implementing the Setting interface. The result has the form
+// "name: description: (type: value)". The value is rendered with %v so that
+// non-string values (ints, floats, bools) print as themselves rather than
+// as a fmt "%!s(...)" error marker.
+func Format(s Setting) string {
+	v := s.Value()
+	return fmt.Sprintf("%s: %s: (%T: %v)", s.Name(), s.Description(), v, v)
+}
+
+// Any can be used to represent or implement a Setting of any type.
+type Any struct {
+	name, description string
+	value             interface{}
+}
+
+// String returns the Setting as formatted by Format.
+func (s *Any) String() string {
+	return Format(s)
+}
+
+// Name returns the name the Setting was created with.
+func (s *Any) Name() string {
+	return s.name
+}
+
+// Description returns the description the Setting was created with.
+func (s *Any) Description() string {
+	return s.description
+}
+
+// Value returns the value the Setting was created with.
+func (s *Any) Value() interface{} {
+	return s.value
+}
+
+// NewAny returns a Setting with the given name and description that holds
+// a value of any type.
+func NewAny(name, description string, value interface{}) Setting {
+	return &Any{name, description, value}
+}
+
+// NewInt returns a Setting holding an int value.
+func NewInt(name, description string, value int) Setting {
+	return &Any{name, description, value}
+}
+
+// NewInt64 returns a Setting holding an int64 value.
+func NewInt64(name, description string, value int64) Setting {
+	return &Any{name, description, value}
+}
+
+// NewBool returns a Setting holding a bool value.
+func NewBool(name, description string, value bool) Setting {
+	return &Any{name, description, value}
+}
+
+// NewFloat64 returns a Setting holding a float64 value.
+func NewFloat64(name, description string, value float64) Setting {
+	return &Any{name, description, value}
+}
+
+// NewString returns a Setting holding a string value.
+func NewString(name, description string, value string) Setting {
+	return &Any{name, description, value}
+}
+
+// NewDuration returns a Setting holding a time.Duration value.
+func NewDuration(name, description string, value time.Duration) Setting {
+	return &Any{name, description, value}
+}
+
+// DurationFlag implements flag.Value in order to provide validation of
+// duration values in the flag package.
+type DurationFlag struct{ time.Duration }
+
+// Get returns the flag's current value as a time.Duration. Get is the
+// method that flag.Getter adds to flag.Value.
+func (d DurationFlag) Get() interface{} {
+	return d.Duration
+}
+
+// Set implements flag.Value.Set. It parses s with time.ParseDuration and
+// stores the result; on a parse error the error is returned and the current
+// value is left unchanged.
+func (d *DurationFlag) Set(s string) error {
+	duration, err := time.ParseDuration(s)
+	if err != nil {
+		return err
+	}
+	d.Duration = duration
+	return nil
+}
+
+// String implements flag.Value.String, returning the duration as formatted
+// by time.Duration.String.
+func (d DurationFlag) String() string {
+	return d.Duration.String()
+}
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index c38271c..85e57a2 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -6,6 +6,7 @@
 
 import (
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/rpc"
 	"v.io/x/ref/lib/apilog"
 )
@@ -42,3 +43,8 @@
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	return ctx
 }
+
+// ExperimentalGetFlowManager is not implemented by this runtime; calling it
+// always panics.
+func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
+	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	panic("unimplemented")
+}
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index f42d786..e5d0bb4 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -19,7 +19,6 @@
 
 	"v.io/x/lib/netconfig"
 	"v.io/x/lib/netstate"
-	"v.io/x/lib/pubsub"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -27,6 +26,7 @@
 
 	"v.io/x/ref/internal/logger"
 	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/runtime/internal"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index 7bc4af0..6dc0987 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -10,13 +10,14 @@
 	"testing"
 
 	"v.io/v23"
+	"v.io/v23/context"
 	_ "v.io/x/ref/runtime/factories/fake"
 )
 
 func TestRemoteDialerClose(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	d, a, w := setupConns(t, ctx, nil, nil)
+	d, a, w := setupConns(t, ctx, ctx, nil, nil)
 	d.Close(ctx, fmt.Errorf("Closing randomly."))
 	<-d.Closed()
 	<-a.Closed()
@@ -28,7 +29,7 @@
 func TestRemoteAcceptorClose(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	d, a, w := setupConns(t, ctx, nil, nil)
+	d, a, w := setupConns(t, ctx, ctx, nil, nil)
 	a.Close(ctx, fmt.Errorf("Closing randomly."))
 	<-a.Closed()
 	<-d.Closed()
@@ -40,7 +41,7 @@
 func TestUnderlyingConnectionClosed(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	d, a, w := setupConns(t, ctx, nil, nil)
+	d, a, w := setupConns(t, ctx, ctx, nil, nil)
 	w.close()
 	<-a.Closed()
 	<-d.Closed()
@@ -49,7 +50,7 @@
 func TestDialAfterConnClose(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	d, a, _ := setupConns(t, ctx, nil, nil)
+	d, a, _ := setupConns(t, ctx, ctx, nil, nil)
 
 	d.Close(ctx, fmt.Errorf("Closing randomly."))
 	<-d.Closed()
@@ -66,7 +67,7 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	for _, dialerDials := range []bool{true, false} {
-		df, flows := setupFlow(t, ctx, dialerDials)
+		df, flows := setupFlow(t, ctx, ctx, dialerDials)
 		if _, err := df.WriteMsg([]byte("hello")); err != nil {
 			t.Fatalf("write failed: %v", err)
 		}
@@ -79,7 +80,7 @@
 		if _, err := df.WriteMsg([]byte("there")); err != nil {
 			t.Fatalf("second write failed: %v", err)
 		}
-		df.(*flw).conn.Close(ctx, nil)
+		df.(*flw).conn.Close(ctx, fmt.Errorf("Closing randomly."))
 		<-af.Conn().Closed()
 		if got, err := af.ReadMsg(); err != nil {
 			t.Fatalf("read failed: %v", err)
@@ -94,3 +95,49 @@
 		}
 	}
 }
+
+// TestFlowCancelOnWrite checks that writes on a flow start failing with
+// context.Canceled once the context the flow was set up with (dctx) has been
+// cancelled, and that the accepted flow is then closed.
+func TestFlowCancelOnWrite(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dctx, cancel := context.WithCancel(ctx)
+	df, accept := setupFlow(t, dctx, ctx, true)
+	done := make(chan struct{})
+	go func() {
+		// Always release the test goroutine, even after a failure.
+		defer close(done)
+		// t.Fatalf may only be called from the goroutine running the test,
+		// so failures here are reported with t.Errorf followed by a return.
+		if _, err := df.WriteMsg([]byte("hello")); err != nil {
+			t.Errorf("could not write flow: %v", err)
+			return
+		}
+		// Keep writing until the cancellation is observed.
+		for {
+			_, err := df.WriteMsg([]byte("hello"))
+			if err == context.Canceled {
+				return
+			}
+			if err != nil {
+				t.Errorf("unexpected error waiting for cancel: %v", err)
+				return
+			}
+		}
+	}()
+	af := <-accept
+	cancel()
+	<-done
+	<-af.Closed()
+}
+
+// TestFlowCancelOnRead checks that a read on a flow fails with
+// context.Canceled once the context the flow was set up with (dctx) has been
+// cancelled, and that the accepted flow is then closed.
+func TestFlowCancelOnRead(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dctx, cancel := context.WithCancel(ctx)
+	df, accept := setupFlow(t, dctx, ctx, true)
+	done := make(chan struct{})
+	go func() {
+		// Always release the test goroutine, even after a failure.
+		defer close(done)
+		// t.Fatalf may only be called from the goroutine running the test,
+		// so failures here are reported with t.Errorf followed by a return.
+		if _, err := df.WriteMsg([]byte("hello")); err != nil {
+			t.Errorf("could not write flow: %v", err)
+			return
+		}
+		if _, err := df.ReadMsg(); err != context.Canceled {
+			t.Errorf("unexpected error waiting for cancel: %v", err)
+		}
+	}()
+	af := <-accept
+	cancel()
+	<-done
+	<-af.Closed()
+}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ebe4535..8cd4b6a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -14,6 +14,7 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
+	"v.io/v23/verror"
 
 	"v.io/x/ref/runtime/internal/flow/flowcontrol"
 )
@@ -155,18 +156,29 @@
 		// We've already torn this conn down.
 		return
 	}
+	ferr := err
+	if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
+		ferr = NewErrFlowClosedRemotely(ctx)
+	} else {
+		message := ""
+		if err != nil {
+			message = err.Error()
+		}
+		cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+			return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
+		})
+		if cerr != nil {
+			ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
+		}
+	}
 	for _, f := range flows {
-		f.close(err)
+		f.close(ctx, ferr)
 	}
-	err = c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
-		return 0, true, c.mp.writeMsg(ctx, &tearDown{Err: err})
-	})
-	if err != nil {
-		ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, err)
+	if cerr := c.mp.close(); cerr != nil {
+		ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
 	}
-	if err = c.mp.close(); err != nil {
-		ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, err)
-	}
+
+	// TODO(mattr): ensure the readLoop is finished before closing this.
 	close(c.closed)
 }
 
@@ -207,7 +219,7 @@
 
 		switch msg := x.(type) {
 		case *tearDown:
-			terr = msg.Err
+			terr = NewErrConnClosedRemotely(ctx, msg.Message)
 			return
 
 		case *openFlow:
@@ -244,7 +256,7 @@
 				return
 			}
 			if msg.flags&closeFlag != 0 {
-				f.close(nil)
+				f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
 			}
 
 		case *unencryptedData:
@@ -259,7 +271,7 @@
 				return
 			}
 			if msg.flags&closeFlag != 0 {
-				f.close(nil)
+				f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
 			}
 
 		default:
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 2c3ca40..7fefba6 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -43,15 +43,22 @@
 	}(want)
 
 	af := <-flows
+	read := 0
 	for len(want) > 0 {
 		got, err := af.ReadMsg()
 		if err != nil && err != io.EOF {
 			t.Fatalf("Unexpected error: %v", err)
 		}
 		if !bytes.Equal(got, want[:len(got)]) {
-			t.Fatalf("Got: %s want %s", got, want)
+			pl := len(got)
+			if pl > 100 {
+				pl = 100
+			}
+			pg, pw := got[:pl], want[:pl]
+			t.Fatalf("On read %d got: %v want %v", read, pg, pw)
 		}
 		want = want[len(got):]
+		read++
 	}
 	if len(want) != 0 {
 		t.Errorf("got %d leftover bytes, expected 0.", len(want))
@@ -65,7 +72,7 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	for _, dialerDials := range []bool{true, false} {
-		df, flows := setupFlow(t, ctx, dialerDials)
+		df, flows := setupFlow(t, ctx, ctx, dialerDials)
 		testWrite(t, ctx, []byte("hello world"), df, flows)
 	}
 }
@@ -73,6 +80,6 @@
 func TestLargeWrite(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	df, flows := setupFlow(t, ctx, true)
+	df, flows := setupFlow(t, ctx, ctx, true)
 	testWrite(t, ctx, randData, df, flows)
 }
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/conn/conncache.go
index b435d9c..ae77f97 100644
--- a/runtime/internal/flow/conn/conncache.go
+++ b/runtime/internal/flow/conn/conncache.go
@@ -119,7 +119,7 @@
 	c.addrCache, c.ridCache, c.started = nil, nil, nil
 	d := c.head.next
 	for d != c.head {
-		d.conn.Close(ctx, nil)
+		d.conn.Close(ctx, NewErrCacheClosed(ctx))
 		d = d.next
 	}
 	c.head = nil
@@ -134,14 +134,15 @@
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
-		return NewErrCacheClosed(nil)
+		return NewErrCacheClosed(ctx)
 	}
 	d := c.head.prev
+	err := NewErrConnKilledToFreeResources(ctx)
 	for i := 0; i < num; i++ {
 		if d == c.head {
 			break
 		}
-		d.conn.Close(ctx, nil)
+		d.conn.Close(ctx, err)
 		delete(c.addrCache, d.addrKey)
 		delete(c.ridCache, d.rid)
 		prev := d.prev
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 98938d4..0e0e821 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -19,6 +19,9 @@
 
   UnexpectedMsg(typ string) {"en": "unexpected message type{:typ}."}
   ConnectionClosed() {"en": "connection closed."}
+  ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
+  ConnClosedRemotely(msg string) {"en": "connection closed remotely{:msg}."}
+  FlowClosedRemotely() {"en": "flow closed remotely."}
   Send(typ, dest string, err error) {"en": "failure sending {typ} message to {dest}{:err}."}
   Recv(src string, err error) {"en": "error reading from {src}{:err}"}
   CacheClosed() {"en":"cache is closed"}
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index e26dc9c..2db4372 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -15,16 +15,19 @@
 )
 
 var (
-	ErrInvalidMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
-	ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
-	ErrUnknownMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
-	ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
-	ErrUnexpectedMsg     = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
-	ErrConnectionClosed  = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
-	ErrSend              = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
-	ErrRecv              = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
-	ErrCacheClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
-	ErrCounterOverflow   = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
+	ErrInvalidMsg                = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	ErrInvalidControlMsg         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+	ErrUnknownMsg                = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+	ErrUnknownControlMsg         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+	ErrUnexpectedMsg             = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
+	ErrConnectionClosed          = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
+	ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
+	ErrConnClosedRemotely        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
+	ErrFlowClosedRemotely        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
+	ErrSend                      = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
+	ErrRecv                      = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
+	ErrCacheClosed               = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
+	ErrCounterOverflow           = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
 )
 
 func init() {
@@ -34,6 +37,9 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMsg.ID), "{1:}{2:} unexpected message type{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnectionClosed.ID), "{1:}{2:} connection closed.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnClosedRemotely.ID), "{1:}{2:} connection closed remotely{:3}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFlowClosedRemotely.ID), "{1:}{2:} flow closed remotely.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrSend.ID), "{1:}{2:} failure sending {3} message to {4}{:5}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrRecv.ID), "{1:}{2:} error reading from {3}{:4}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
@@ -70,6 +76,21 @@
 	return verror.New(ErrConnectionClosed, ctx)
 }
 
+// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID.
+func NewErrConnKilledToFreeResources(ctx *context.T) error {
+	return verror.New(ErrConnKilledToFreeResources, ctx)
+}
+
+// NewErrConnClosedRemotely returns an error with the ErrConnClosedRemotely ID.
+func NewErrConnClosedRemotely(ctx *context.T, msg string) error {
+	return verror.New(ErrConnClosedRemotely, ctx, msg)
+}
+
+// NewErrFlowClosedRemotely returns an error with the ErrFlowClosedRemotely ID.
+func NewErrFlowClosedRemotely(ctx *context.T) error {
+	return verror.New(ErrFlowClosedRemotely, ctx)
+}
+
 // NewErrSend returns an error with the ErrSend ID.
 func NewErrSend(ctx *context.T, typ string, dest string, err error) error {
 	return verror.New(ErrSend, ctx, typ, dest, err)
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 804f472..f75e10f 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -8,7 +8,7 @@
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/security"
-
+	"v.io/v23/verror"
 	"v.io/x/ref/runtime/internal/flow/flowcontrol"
 )
 
@@ -50,6 +50,9 @@
 	if n, release, err = f.q.read(f.ctx, p); release {
 		f.conn.release(f.ctx)
 	}
+	if err != nil {
+		f.close(f.ctx, err)
+	}
 	return
 }
 
@@ -65,6 +68,9 @@
 	if buf, release, err = f.q.get(f.ctx); release {
 		f.conn.release(f.ctx)
 	}
+	if err != nil {
+		f.close(f.ctx, err)
+	}
 	return
 }
 
@@ -123,7 +129,7 @@
 		return size, done, f.conn.mp.writeMsg(f.ctx, d)
 	})
 	if alsoClose || err != nil {
-		f.close(err)
+		f.close(f.ctx, err)
 	}
 	return sent, err
 }
@@ -220,10 +226,18 @@
 	return f.ctx.Done()
 }
 
-func (f *flw) close(err error) {
-	f.q.close(f.ctx)
+func (f *flw) close(ctx *context.T, err error) {
+	f.q.close(ctx)
 	f.cancel()
-
-	// TODO(mattr): maybe send a final close data message.
-	// TODO(mattr): save the error to hand out later.
+	if verror.ErrorID(err) != ErrFlowClosedRemotely.ID {
+		// We want to try to send this message even if ctx is already canceled.
+		ctx, cancel := context.WithRootCancel(ctx)
+		err := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
+			return 0, true, f.conn.mp.writeMsg(ctx, &data{id: f.id, flags: closeFlag})
+		})
+		if err != nil {
+			ctx.Errorf("Could not send close flow message: %v", err)
+		}
+		cancel()
+	}
 }
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index e1bc1f3..6cea426 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -5,8 +5,6 @@
 package conn
 
 import (
-	"errors"
-
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
@@ -88,19 +86,15 @@
 
 // tearDown is sent over the wire before a connection is closed.
 type tearDown struct {
-	Err error
+	Message string
 }
 
 func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
-	var errBytes []byte
-	if m.Err != nil {
-		errBytes = []byte(m.Err.Error())
-	}
-	return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, errBytes})
+	return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Message)})
 }
 func (m *tearDown) read(ctx *context.T, data []byte) error {
 	if len(data) > 0 {
-		m.Err = errors.New(string(data))
+		m.Message = string(data)
 	}
 	return nil
 }
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index ab715ae..2414c1c 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -5,7 +5,6 @@
 package conn
 
 import (
-	"errors"
 	"reflect"
 	"testing"
 
@@ -82,7 +81,7 @@
 
 func TestTearDown(t *testing.T) {
 	testMessages(t, []message{
-		&tearDown{Err: errors.New("foobar")},
+		&tearDown{Message: "foobar"},
 		&tearDown{},
 	})
 }
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index a2c09e6..cd61bc7 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -17,6 +17,7 @@
 	b, e int
 
 	size      int
+	nbufs     int
 	toRelease int
 	notify    chan struct{}
 }
@@ -32,9 +33,6 @@
 }
 
 func (r *readq) put(ctx *context.T, bufs [][]byte) error {
-	if len(bufs) == 0 {
-		return nil
-	}
 	l := 0
 	for _, b := range bufs {
 		l += len(b)
@@ -53,13 +51,13 @@
 	if newSize > defaultBufferSize {
 		return NewErrCounterOverflow(ctx)
 	}
-	if r.e == r.b {
-		r.reserveLocked(len(bufs))
-	}
+	newBufs := r.nbufs + len(bufs)
+	r.reserveLocked(newBufs)
 	for _, b := range bufs {
 		r.bufs[r.e] = b
 		r.e = (r.e + 1) % len(r.bufs)
 	}
+	r.nbufs = newBufs
 	if r.size == 0 {
 		select {
 		case r.notify <- struct{}{}:
@@ -82,6 +80,7 @@
 	if len(buf) > 0 {
 		r.bufs[r.b] = buf
 	} else {
+		r.nbufs -= 1
 		r.b = (r.b + 1) % len(r.bufs)
 	}
 	r.size -= n
@@ -98,6 +97,7 @@
 	out = r.bufs[r.b]
 	r.b = (r.b + 1) % len(r.bufs)
 	r.size -= len(out)
+	r.nbufs -= 1
 	r.toRelease += len(out)
 	return out, r.toRelease > defaultBufferSize/2, nil
 }
@@ -131,16 +131,17 @@
 }
 
 func (r *readq) reserveLocked(n int) {
-	needed := n + r.e - r.b
-	if r.e < r.b {
-		needed += len(r.bufs)
-	}
-	if needed < len(r.bufs) {
+	if n < len(r.bufs) {
 		return
 	}
-	nb := make([][]byte, 2*needed)
-	copied := copy(nb, r.bufs[r.b:])
-	copied += copy(nb[n:], r.bufs[:r.e])
+	nb := make([][]byte, 2*n)
+	copied := 0
+	if r.e >= r.b {
+		copied = copy(nb, r.bufs[r.b:r.e])
+	} else {
+		copied = copy(nb, r.bufs[r.b:])
+		copied += copy(nb[copied:], r.bufs[:r.e])
+	}
 	r.bufs, r.b, r.e = nb, 0, copied
 }
 
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 18cc267..5694830 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -96,34 +96,42 @@
 	return nil
 }
 
-func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
-	dmrw, amrw, w := newMRWPair(ctx)
+func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
+	dmrw, amrw, w := newMRWPair(dctx)
 	versions := version.RPCVersionRange{Min: 3, Max: 5}
 	ep, err := v23.NewEndpoint("localhost:80")
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
-	d, err := NewDialed(ctx, dmrw, ep, ep, versions, fh(dflows), nil)
+	d, err := NewDialed(dctx, dmrw, ep, ep, versions, fh(dflows), nil)
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
-	a, err := NewAccepted(ctx, amrw, ep, security.Blessings{}, versions, fh(aflows))
+	a, err := NewAccepted(actx, amrw, ep, security.Blessings{}, versions, fh(aflows))
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
 	return d, a, w
 }
 
-func setupFlow(t *testing.T, ctx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
-	dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
-	d, a, _ := setupConns(t, ctx, dflows, aflows)
+func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
+	d, accepted := setupFlows(t, dctx, actx, dialFromDialer, 1)
+	return d[0], accepted
+}
+
+func setupFlows(t *testing.T, dctx, actx *context.T, dialFromDialer bool, n int) (dialed []flow.Flow, accepted <-chan flow.Flow) {
+	dflows, aflows := make(chan flow.Flow, n), make(chan flow.Flow, n)
+	d, a, _ := setupConns(t, dctx, actx, dflows, aflows)
 	if !dialFromDialer {
 		d, a = a, d
 		aflows, dflows = dflows, aflows
 	}
-	df, err := d.Dial(ctx)
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
+	dialed = make([]flow.Flow, n)
+	for i := 0; i < n; i++ {
+		var err error
+		if dialed[i], err = d.Dial(dctx); err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
 	}
-	return df, aflows
+	return dialed, aflows
 }
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index 2b6e64b..8221895 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -70,6 +70,15 @@
 
 	for {
 		next := w.fc.nextWorkerLocked()
+		if w.fc.writing == w {
+			// We're already scheduled to write, but we should bail
+			// out if we're canceled.
+			select {
+			case <-ctx.Done():
+				err = ctx.Err()
+			default:
+			}
+		}
 		for w.fc.writing != w && err == nil {
 			w.fc.mu.Unlock()
 			if next != nil {
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 3db1610..26b7e91 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,10 @@
 
 	vcCache *vc.VCCache
 
+	wg     sync.WaitGroup
+	mu     sync.Mutex
+	closed bool
+
 	dc vc.DischargeClient
 }
 
@@ -93,8 +97,7 @@
 	c := &client{
 		streamMgr: streamMgr,
 		ns:        ns,
-
-		vcCache: vc.NewVCCache(),
+		vcCache:   vc.NewVCCache(),
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
@@ -346,6 +349,7 @@
 // flow itself.
 // TODO(cnicolaou): implement real, configurable load balancing.
 func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+	defer c.wg.Done()
 	status := &serverStatus{index: index, server: server}
 	var span vtrace.Span
 	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
@@ -354,7 +358,6 @@
 		ch <- status
 		span.Finish()
 	}()
-
 	suberr := func(err error) *verror.SubErr {
 		return &verror.SubErr{
 			Name:    suberrName(server, name, method),
@@ -485,6 +488,14 @@
 		// other while manipulating their copy of the options.
 		vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
 		copy(vcOptsCopy, vcOpts)
+		c.mu.Lock()
+		if c.closed {
+			c.mu.Unlock()
+			return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+		}
+		c.wg.Add(1)
+		c.mu.Unlock()
+
 		go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
 	}
 
@@ -734,9 +745,13 @@
 
 func (c *client) Close() {
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	c.mu.Lock()
+	c.closed = true
+	c.mu.Unlock()
 	for _, v := range c.vcCache.Close() {
 		c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
 	}
+	c.wg.Wait()
 }
 
 // flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 56fcbdf..9c26e2b 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -20,7 +20,6 @@
 	"time"
 
 	"v.io/x/lib/netstate"
-	"v.io/x/lib/pubsub"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -36,6 +35,7 @@
 	"v.io/v23/verror"
 	"v.io/v23/vtrace"
 
+	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/runtime/internal/lib/publisher"
 	"v.io/x/ref/runtime/internal/lib/websocket"
diff --git a/runtime/internal/rpc/pubsub.go b/runtime/internal/rpc/pubsub.go
index 7bfe46c..6d2ad87 100644
--- a/runtime/internal/rpc/pubsub.go
+++ b/runtime/internal/rpc/pubsub.go
@@ -7,7 +7,7 @@
 import (
 	"net"
 
-	"v.io/x/lib/pubsub"
+	"v.io/x/ref/lib/pubsub"
 )
 
 // NewAddAddrsSetting creates the Setting to be sent to Listen to inform
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index c34a728..1e30282 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -15,7 +15,6 @@
 	"time"
 
 	"v.io/x/lib/netstate"
-	"v.io/x/lib/pubsub"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -32,6 +31,7 @@
 	"v.io/v23/vtrace"
 
 	"v.io/x/ref/lib/apilog"
+	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/runtime/internal/lib/publisher"
 	inaming "v.io/x/ref/runtime/internal/naming"
diff --git a/runtime/internal/rpc/server_test.go b/runtime/internal/rpc/server_test.go
index b8b5bb2..7263415 100644
--- a/runtime/internal/rpc/server_test.go
+++ b/runtime/internal/rpc/server_test.go
@@ -12,7 +12,6 @@
 	"time"
 
 	"v.io/x/lib/netstate"
-	"v.io/x/lib/pubsub"
 	"v.io/x/lib/set"
 
 	"v.io/v23"
@@ -23,6 +22,7 @@
 	"v.io/v23/security"
 	"v.io/v23/verror"
 
+	"v.io/x/ref/lib/pubsub"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	imanager "v.io/x/ref/runtime/internal/rpc/stream/manager"
 	tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
diff --git a/runtime/internal/rpc/stream/manager/manager.go b/runtime/internal/rpc/stream/manager/manager.go
index e5aa37d..1b63a9f 100644
--- a/runtime/internal/rpc/stream/manager/manager.go
+++ b/runtime/internal/rpc/stream/manager/manager.go
@@ -113,6 +113,11 @@
 	return "", "", verror.New(stream.ErrResolveFailed, nil, verror.New(errUnknownNetwork, nil, network))
 }
 
+type dialResult struct {
+	conn net.Conn
+	err  error
+}
+
 // FindOrDialVIF returns the network connection (VIF) to the provided address
 // from the cache in the manager. If not already present in the cache, a new
 // connection will be created using net.Dial.
@@ -146,7 +151,20 @@
 	defer unblock()
 
 	ctx.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
-	conn, err := dial(d, network, address, timeout)
+
+	ch := make(chan *dialResult)
+	go func() {
+		conn, err := dial(d, network, address, timeout)
+		ch <- &dialResult{conn, err}
+	}()
+
+	var conn net.Conn
+	select {
+	case result := <-ch:
+		conn, err = result.conn, result.err
+	case <-ctx.Done():
+		return nil, verror.New(stream.ErrDialFailed, ctx, err)
+	}
 	if err != nil {
 		return nil, err
 	}
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 2e5d5ab..dac4bed 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -14,10 +14,10 @@
 	"time"
 
 	"v.io/x/lib/metadata"
-	"v.io/x/lib/pubsub"
 
 	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/i18n"
 	"v.io/v23/namespace"
 	"v.io/v23/naming"
@@ -30,8 +30,10 @@
 	"v.io/x/ref/internal/logger"
 	"v.io/x/ref/lib/apilog"
 	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/stats"
 	_ "v.io/x/ref/lib/stats/sysstats"
+	"v.io/x/ref/runtime/internal/flow/manager"
 	"v.io/x/ref/runtime/internal/lib/dependency"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	inamespace "v.io/x/ref/runtime/internal/naming/namespace"
@@ -51,6 +53,7 @@
 	backgroundKey
 	reservedNameKey
 	listenKey
+	flowManagerKey
 
 	// initKey is used to store values that are only set at init time.
 	initKey
@@ -169,6 +172,15 @@
 	if err != nil {
 		return nil, nil, nil, err
 	}
+
+	// Add the flow.Manager to the context.
+	// TODO(suharshs): Once the client and server use the flow.Manager we will need
+	// to manage those dependencies (exactly as we are doing with stream.Manager).
+	ctx, err = r.setNewFlowManager(ctx)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
 	r.ctx = ctx
 	return r, r.WithBackgroundContext(ctx), r.shutdown, nil
 }
@@ -295,6 +307,23 @@
 	return sm, nil
 }
 
+func newFlowManager(ctx *context.T) (flow.Manager, error) {
+	rid, err := naming.NewRoutingID()
+	if err != nil {
+		return nil, err
+	}
+	return manager.New(ctx, rid), nil
+}
+
+func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, error) {
+	fm, err := newFlowManager(ctx)
+	if err != nil {
+		return nil, err
+	}
+	newctx := context.WithValue(ctx, flowManagerKey, fm)
+	return newctx, err
+}
+
 func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
 	sm, err := newStreamManager(ctx)
 	if err != nil {
@@ -349,6 +378,9 @@
 	if newctx, err = r.setNewStreamManager(newctx); err != nil {
 		return ctx, err
 	}
+	if newctx, err = r.setNewFlowManager(newctx); err != nil {
+		return ctx, err
+	}
 	if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
 		return ctx, err
 	}
@@ -487,3 +519,11 @@
 	}
 	return nil
 }
+
+func (*Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
+	// nologcall
+	if d, ok := ctx.Value(flowManagerKey).(flow.Manager); ok {
+		return d
+	}
+	return nil
+}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 44cb603..a8753fe 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -152,3 +152,13 @@
 	}
 
 }
+
+func TestFlowManager(t *testing.T) {
+	r, ctx, shutdown := initForTest(t)
+	defer shutdown()
+
+	if man := r.ExperimentalGetFlowManager(ctx); man == nil {
+		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
+	}
+
+}
diff --git a/services/internal/packages/packages.go b/services/internal/packages/packages.go
index 3f9af83..1656401 100644
--- a/services/internal/packages/packages.go
+++ b/services/internal/packages/packages.go
@@ -143,6 +143,7 @@
 		if err != nil {
 			return err
 		}
+		fh.Method = zip.Deflate
 		fh.Name, _ = filepath.Rel(sourceDir, path)
 		hdr, err := w.CreateHeader(fh)
 		if err != nil {
diff --git a/services/mounttable/mounttablelib/neighborhood.go b/services/mounttable/mounttablelib/neighborhood.go
index 75488c2..20997cf 100644
--- a/services/mounttable/mounttablelib/neighborhood.go
+++ b/services/mounttable/mounttablelib/neighborhood.go
@@ -39,10 +39,6 @@
 
 const addressPrefix = "address:"
 
-// AnonymousNeighbor is the neighborhood host name to use if you want to be a client of the neighborhood namespace
-// without appearing in it.
-const AnonymousNeighbor = "_"
-
 // neighborhood defines a set of machines on the same multicast media.
 type neighborhood struct {
 	mdns             *mdns.MDNS
@@ -108,20 +104,30 @@
 	}
 
 	// Start up MDNS, subscribe to the vanadium service, and add us as a vanadium service provider.
-	mdns, err := mdns.NewMDNS(host, "", "", loopback, false)
+	m, err := mdns.NewMDNS(host, "", "", loopback, false)
 	if err != nil {
-		logger.Global().Errorf("mdns startup failed: %s", err)
-		return nil, err
+		// The name may not have been unique.  Try one more time with a unique
+		// name.  NewMDNS will replace the "()" with "(hardware mac address)".
+		if len(host) > 0 {
+			m, err = mdns.NewMDNS(host+"()", "", "", loopback, false)
+		}
+		if err != nil {
+			logger.Global().Errorf("mdns startup failed: %s", err)
+			return nil, err
+		}
 	}
 	logger.Global().VI(2).Infof("listening for service vanadium on port %d", port)
-	mdns.SubscribeToService("vanadium")
-	if host != AnonymousNeighbor {
-		mdns.AddService("vanadium", "", port, txt...)
+	m.SubscribeToService("vanadium")
+	if len(host) > 0 {
+		m.AddService("vanadium", "", port, txt...)
 	}
+
+	// A small sleep to allow the world to learn about us and vice versa.  Not
+	// necessary but helpful.
 	time.Sleep(50 * time.Millisecond)
 
 	nh := &neighborhood{
-		mdns: mdns,
+		mdns: m,
 	}
 
 	// Watch the network configuration so that we can make MDNS reattach to
diff --git a/services/mounttable/mounttablelib/versionedpermissions.go b/services/mounttable/mounttablelib/versionedpermissions.go
index 50643d9..1c6759a 100644
--- a/services/mounttable/mounttablelib/versionedpermissions.go
+++ b/services/mounttable/mounttablelib/versionedpermissions.go
@@ -143,7 +143,13 @@
 
 			// Create name and add the Permissions map to it.
 			n, err := mt.findNode(ctx, nil, elems, true, nil, nil)
-			if n != nil || err == nil {
+			if err != nil {
+				ctx.Errorf("skipping node for %v; error: %v", elems, err)
+			}
+			if n == nil {
+				continue
+			}
+			if err == nil {
 				ctx.VI(2).Infof("added perms %v to %s", perms, name)
 				if isPattern {
 					n.permsTemplate = perms
@@ -151,9 +157,6 @@
 					n.vPerms, _ = n.vPerms.Set(nil, "", perms)
 					n.explicitPermissions = true
 				}
-			} else {
-				ctx.Errorf("skipping node for %v; error: %v", elems, err)
-				continue
 			}
 			n.parent.Unlock()
 			n.Unlock()