Merge "ref: changes related to v.io/c/14371"
diff --git a/lib/pubsub/config_test.go b/lib/pubsub/config_test.go
new file mode 100644
index 0000000..f827c79
--- /dev/null
+++ b/lib/pubsub/config_test.go
@@ -0,0 +1,290 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub_test
+
+import (
+ "fmt"
+ "math/rand"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "v.io/x/ref/lib/pubsub"
+
+ "v.io/v23/verror"
+)
+
+// ExamplePublisher shows how to create a stream, publish Settings to it and
+// fork it so that several consumers can read both the latest value and any
+// Settings published afterwards.
+func ExamplePublisher() {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ pub.CreateStream("net", "network settings", in)
+
+ // A simple producer of IP address settings.
+ producer := func() {
+ in <- pubsub.NewString("ip", "address", "1.2.3.5")
+ }
+
+ // waiter is used to wait for both consumers to print the Setting they read.
+ var waiter sync.WaitGroup
+ waiter.Add(2)
+
+ // A simple consumer of IP address Settings.
+ consumer := func(ch chan pubsub.Setting) {
+ fmt.Println(<-ch)
+ waiter.Done()
+ }
+
+ // Publish an initial Setting to the Stream.
+ in <- pubsub.NewString("ip", "address", "1.2.3.4")
+
+ // Fork the stream twice, and read the latest value.
+ ch1 := make(chan pubsub.Setting)
+ st, _ := pub.ForkStream("net", ch1)
+ fmt.Println(st.Latest["ip"])
+ ch2 := make(chan pubsub.Setting)
+ st, _ = pub.ForkStream("net", ch2)
+ fmt.Println(st.Latest["ip"])
+
+ // Now we can read new Settings as they are generated.
+ go producer()
+ go consumer(ch1)
+ go consumer(ch2)
+
+ waiter.Wait()
+
+ // Output:
+ // ip: address: (string: 1.2.3.4)
+ // ip: address: (string: 1.2.3.4)
+ // ip: address: (string: 1.2.3.5)
+ // ip: address: (string: 1.2.3.5)
+}
+
+// ExampleShutdown shows how a producer and its consumers cooperate with
+// Publisher.Shutdown: the producer closes its input channel once the channel
+// returned by CreateStream is closed, and the consumers stop reading once
+// their forked channels are closed.
+func ExampleShutdown() {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ stop, _ := pub.CreateStream("net", "network settings", in)
+
+ // ready is signalled by the producer when Shutdown may be called.
+ var ready sync.WaitGroup
+ ready.Add(1)
+
+ // A producer to write 100 Settings before signalling that it's
+ // ready to be shut down. This is purely to demonstrate how to use
+ // Shutdown.
+ producer := func() {
+ for i := 0; ; i++ {
+ select {
+ case <-stop:
+ close(in)
+ return
+ default:
+ in <- pubsub.NewString("ip", "address", "1.2.3.4")
+ if i == 100 {
+ ready.Done()
+ }
+ }
+ }
+ }
+
+ // waiter is used to wait for both consumers to finish.
+ var waiter sync.WaitGroup
+ waiter.Add(2)
+
+ // A consumer that counts the Settings it reads until its channel is closed.
+ consumer := func() {
+ ch := make(chan pubsub.Setting, 10)
+ pub.ForkStream("net", ch)
+ i := 0
+ for {
+ if _, ok := <-ch; !ok {
+ // The channel has been closed when the publisher
+ // is asked to shut down.
+ break
+ }
+ i++
+ }
+ if i >= 100 {
+ // We've received at least 100 Settings as per the producer above.
+ fmt.Println("done")
+ }
+ waiter.Done()
+ }
+
+ go producer()
+ go consumer()
+ go consumer()
+ ready.Wait()
+ pub.Shutdown()
+ waiter.Wait()
+ // Output:
+ // done
+ // done
+}
+
+// TestSimple checks the error cases of ForkStream and CreateStream, the
+// output of String and Latest, and the state of the Publisher after Shutdown.
+func TestSimple(t *testing.T) {
+ ch := make(chan pubsub.Setting, 2)
+ pub := pubsub.NewPublisher()
+ // Forking a stream that has not been created must fail.
+ if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamDoesntExist" {
+ t.Errorf("missing or wrong error: %v", err)
+ }
+ // A nil input channel is rejected.
+ if _, err := pub.CreateStream("stream", "example", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errNeedNonNilChannel" {
+ t.Fatalf("missing or wrong error: %v", err)
+ }
+ if _, err := pub.CreateStream("stream", "example", ch); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ // Creating a stream with a name that is already in use must fail.
+ if _, err := pub.CreateStream("stream", "example", ch); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamExists" {
+ t.Fatalf("missing or wrong error: %v", err)
+ }
+ if got, want := pub.String(), "(stream: example)"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ stop, err := pub.CreateStream("s2", "eg2", ch)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ // String ranges over a map, so either ordering of the two streams is valid.
+ if got, want := pub.String(), "(stream: example) (s2: eg2)"; got != want {
+ wantAlternate := "(s2: eg2) (stream: example)"
+ if got != wantAlternate {
+ t.Errorf("got %q, want %q or %q", got, want, wantAlternate)
+ }
+ }
+
+ // No Settings have been sent over s2, so Latest is expected to be nil.
+ got, want := pub.Latest("s2"), &pubsub.Stream{"s2", "eg2", nil}
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.Shutdown()
+ // The channel returned by CreateStream is closed by Shutdown.
+ if _, running := <-stop; running {
+ t.Errorf("expected to be shutting down")
+ }
+ if _, err := pub.ForkStream("stream", nil); err == nil || verror.ErrorID(err) != "v.io/x/ref/lib/pubsub.errStreamShutDown" {
+ t.Errorf("missing or wrong error: %v", err)
+ }
+ if got, want := pub.String(), "shutdown"; got != want {
+ t.Errorf("got %s, want %s", got, want)
+ }
+
+}
+
+// producer sends an endless sequence of Settings on in until stop is closed:
+// an int Setting named "i" for even values of the counter and a float64
+// Setting named "f" for odd values. It reports the counter on ch when it
+// reaches limit/2 and limit, and once more when stop is observed, at which
+// point it calls waiter.Done, closes in and returns.
+func producer(pub *pubsub.Publisher, in chan<- pubsub.Setting, stop <-chan struct{}, limit int, ch chan int, waiter *sync.WaitGroup) {
+ for i := 0; ; i++ {
+ select {
+ case <-stop:
+ ch <- i
+ waiter.Done()
+ // must close this channel, otherwise the Publisher will leak a goroutine for this stream.
+ close(in)
+ return
+ default:
+ // signal progress on ch, at limit/2, limit and when we're done (above)
+ switch {
+ case i == limit/2:
+ ch <- i
+ case i == limit:
+ ch <- i
+ }
+ if i%2 == 0 {
+ in <- pubsub.NewInt("i", "int", i)
+ } else {
+ in <- pubsub.NewFloat64("f", "float", float64(i))
+ }
+ }
+ }
+}
+
+// consumer forks the "net" stream using a channel with bufsize slots of
+// buffering and verifies the Settings it reads. The next expected counter
+// value is one more than the larger of the latest "i" and "f" Settings
+// returned by ForkStream; ints are expected for even counter values and
+// floats for odd ones. When the channel is closed it checks that the counter
+// reached at least limit and then calls waiter.Done.
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+ ch := make(chan pubsub.Setting, bufsize)
+ st, _ := pub.ForkStream("net", ch)
+ // Work out where in the sequence this consumer joined the stream.
+ i, i2 := 0, 0
+ if st.Latest["i"] != nil {
+ i = int(st.Latest["i"].Value().(int))
+ }
+ if st.Latest["f"] != nil {
+ i2 = int(st.Latest["f"].Value().(float64))
+ }
+ if i2 > i {
+ i = i2
+ }
+ i++
+ for s := range ch {
+ switch v := s.Value().(type) {
+ case int:
+ if i%2 != 0 {
+ t.Errorf("expected a float, got an int")
+ break
+ }
+ if v != i {
+ t.Errorf("got %d, want %d", v, i)
+ }
+ case float64:
+ if i%2 != 1 {
+ t.Errorf("expected an int, got a float")
+ break
+ }
+ if v != float64(i) {
+ t.Errorf("got %f, want %f", v, float64(i))
+ }
+ }
+ i++
+ }
+ if i < limit {
+ t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+ }
+ waiter.Done()
+}
+
+// testStream runs one producer and two consumers over a "net" stream. The
+// consumers are started once the producer reports that it has reached
+// limit/2, the Publisher is shut down once the producer reports that it has
+// reached limit, and the test then waits for all three goroutines to finish.
+// consumerBufSize is the buffering used for each consumer's channel.
+func testStream(t *testing.T, consumerBufSize int) {
+ in := make(chan pubsub.Setting)
+ pub := pubsub.NewPublisher()
+ stop, _ := pub.CreateStream("net", "network settings", in)
+
+ // Pick a random limit in the range [100, 5000).
+ rand.Seed(time.Now().UnixNano())
+ limit := rand.Intn(5000)
+ if limit < 100 {
+ limit = 100
+ }
+ t.Logf("limit: %d", limit)
+
+ // One producer and two consumers call waiter.Done.
+ var waiter sync.WaitGroup
+ waiter.Add(3)
+
+ progress := make(chan int)
+ go producer(pub, in, stop, limit, progress, &waiter)
+
+ i := <-progress
+ t.Logf("limit/2 = %d", i)
+
+ // We use a lot of buffering in this unittest to ensure that
+ // we never miss any settings.
+ go consumer(t, pub, limit, consumerBufSize, &waiter)
+ go consumer(t, pub, limit, consumerBufSize, &waiter)
+
+ reached := <-progress
+ pub.Shutdown()
+ shutdown := <-progress
+ t.Logf("reached %d, shut down at %d", reached, shutdown)
+
+ // Wait for all goroutines to finish.
+ waiter.Wait()
+}
+
+// TestStream runs the stream test with generously buffered consumer channels.
+func TestStream(t *testing.T) {
+	const consumerBufSize = 500
+	testStream(t, consumerBufSize)
+}
+
+// TestStreamSmallBuffers runs the stream test with consumer channels that
+// buffer only a single Setting.
+func TestStreamSmallBuffers(t *testing.T) {
+	const consumerBufSize = 1
+	testStream(t, consumerBufSize)
+}
+
+// TestDurationFlag verifies that DurationFlag.Set accepts a well-formed
+// duration string and rejects one with an unknown unit.
+func TestDurationFlag(t *testing.T) {
+	d := &pubsub.DurationFlag{}
+	if err := d.Set("1s"); err != nil {
+		t.Errorf("unexpected error %s", err)
+	}
+	if got, want := d.Duration, time.Second; got != want {
+		t.Errorf("got %s, expected %s", got, want)
+	}
+	// Only require that an error is returned: the message is produced by
+	// time.ParseDuration and its exact wording is not a stable part of the
+	// standard library's API.
+	if err := d.Set("1t"); err == nil {
+		t.Errorf("expected an error for invalid duration %q", "1t")
+	}
+}
diff --git a/lib/pubsub/model.go b/lib/pubsub/model.go
new file mode 100644
index 0000000..f2bc2cd
--- /dev/null
+++ b/lib/pubsub/model.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pubsub defines interfaces for accessing dynamically changing
+// process configuration information.
+//
+// Settings represent configuration parameters and their value. Settings
+// are published to named Streams. Streams are forked to add additional
+// consumers, i.e. readers of the Settings published to the Stream.
+//
+// Settings are represented by an interface type that wraps the data and
+// provides a name and description for each Setting. Streams are similarly
+// named and also have a description. When streams are 'forked' the latest
+// value of all Settings that have been sent over the Stream are made
+// available to the caller. This allows for a rendezvous between the single
+// producer of Settings and multiple consumers of those Settings that
+// may be added at arbitrary points in time.
+//
+// Streams are hosted by a Publisher type, which in addition to the methods
+// required for managing Streams provides a means to shut down all of the
+// Streams it hosts.
+package pubsub
+
+// Setting must be implemented by all data types to be sent over Publisher
+// streams.
+type Setting interface {
+ // String returns a string representation of the Setting.
+ String() string
+ // Name returns the name of the Setting.
+ Name() string
+ // Description returns the description of the Setting.
+ Description() string
+ // Value returns the value of the Setting.
+ Value() interface{}
+}
diff --git a/lib/pubsub/publisher.go b/lib/pubsub/publisher.go
new file mode 100644
index 0000000..ec10fb4
--- /dev/null
+++ b/lib/pubsub/publisher.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "v.io/v23/verror"
+)
+
+// pkgPath is the prefix used for the IDs of the errors registered below.
+const pkgPath = "v.io/x/ref/lib/pubsub"
+
+// Errors returned by the Publisher's methods.
+var (
+ errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
+ errStreamExists = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
+ errStreamShutDown = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
+ errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
+)
+
+// A Publisher provides a mechanism for communicating Settings from a set
+// of producers to multiple consumers. Each such producer and associated
+// consumers are called a Stream. Operations are provided for creating
+// streams (CreateStream) and adding new consumers (ForkStream). Communication
+// is implemented using channels, with the producer and consumers providing
+// channels to send and receive Settings over. A Stream remembers the last value
+// of all Settings that were sent over it; these can be retrieved via ForkStream
+// or the Latest method.
+//
+// The Publisher may be shut down by calling its Shutdown method and
+// the producers will be notified via the channel returned by CreateStream,
+// at which point they should close the channel they use for publishing Settings.
+// If producers fail to close this channel then the Publisher will leak
+// goroutines (one per stream) when it is shut down.
+type Publisher struct {
+ // mu guards shutdown and streams.
+ mu sync.RWMutex
+ // stop is closed by Shutdown; it is the channel returned by CreateStream.
+ stop chan struct{}
+ // shutdown is set to true by Shutdown.
+ shutdown bool
+ // streams maps stream names to the state kept for each stream.
+ streams map[string]*fork
+}
+
+// Stream is returned by Latest and ForkStream and includes the name and
+// description for the stream and the most recent values of the Settings
+// that flowed through it.
+type Stream struct {
+ Name, Description string
+ // Latest is a map of Setting names to the Setting itself.
+ Latest map[string]Setting
+}
+
+// NewPublisher returns a Publisher that hosts no streams and has not been
+// shut down.
+func NewPublisher() *Publisher {
+	p := new(Publisher)
+	p.stop = make(chan struct{})
+	p.streams = make(map[string]*fork)
+	return p
+}
+
+// fork holds the state of a single stream: its description, the latest
+// value of every Setting sent over it, the producer's channel and the
+// channels of all of its consumers.
+type fork struct {
+ // The embedded lock guards vals and outs.
+ sync.RWMutex
+ // desc is the description of the stream.
+ desc string
+ // vals maps Setting names to the most recent Setting with that name.
+ vals map[string]Setting
+ // in is the channel that the producer sends Settings on.
+ in <-chan Setting
+ // outs are the consumer channels added by ForkStream.
+ outs []chan<- Setting
+}
+
+// CreateStream creates a Stream with the supplied name and description;
+// these describe the stream itself, Settings carry their own name and
+// description. Settings are read from ch by a goroutine started for the
+// stream. ch generally needs no buffering since the Publisher reads from it
+// directly, but slow consumers will slow down the rate at which it is
+// drained, so the caller should add buffering if that is a concern. This
+// mechanism is therefore best suited to rarely changing Settings, such as
+// network address changes forced by DHCP.
+//
+// The returned channel is closed when the Publisher is shut down; the caller
+// should wait for that to happen and then close ch. An error is returned if
+// ch is nil or if a stream with the same name already exists.
+func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
+	if ch == nil {
+		return nil, verror.New(errNeedNonNilChannel, nil)
+	}
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if existing := p.streams[name]; existing != nil {
+		return nil, verror.New(errStreamExists, nil, name)
+	}
+	stream := &fork{
+		desc: description,
+		in:   ch,
+		vals: make(map[string]Setting),
+	}
+	p.streams[name] = stream
+	go stream.flow(p.stop)
+	return p.stop, nil
+}
+
+// String returns a string representation of the publisher: "shutdown" once
+// Shutdown has been called, otherwise a space separated list of
+// "(name: description)" entries, one for each stream, in map iteration order.
+func (p *Publisher) String() string {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	if p.shutdown {
+		return "shutdown"
+	}
+	entries := make([]string, 0, len(p.streams))
+	for name, stream := range p.streams {
+		entries = append(entries, fmt.Sprintf("(%s: %s)", name, stream.desc))
+	}
+	return strings.Join(entries, " ")
+}
+
+// Latest returns information on the requested stream, including the
+// last instance of all Settings, if any, that flowed over it. It returns nil
+// if there is no stream with the given name. The Latest field of the result
+// is a copy of the stream's state and is nil if no Settings have been sent
+// over the stream yet.
+func (p *Publisher) Latest(name string) *Stream {
+	// Nothing is modified here, so read locks are sufficient.
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	f := p.streams[name]
+	if f == nil {
+		return nil
+	}
+	f.RLock()
+	defer f.RUnlock()
+	// Writing to a nil map panics, so the copy must be allocated before it is
+	// filled in. It is only allocated when there is something to copy so that
+	// a stream without any Settings still reports a nil map.
+	var r map[string]Setting
+	if len(f.vals) > 0 {
+		r = make(map[string]Setting, len(f.vals))
+		for k, v := range f.vals {
+			r[k] = v
+		}
+	}
+	return &Stream{Name: name, Description: f.desc, Latest: r}
+}
+
+// ForkStream 'forks' the named stream by adding ch as a new consumer of it.
+// Every Setting subsequently sent down the stream is written to ch, and ch
+// is closed by the Publisher when it is asked to shut down. The reader of ch
+// must keep up with the flow of Settings through the stream, since a slow
+// reader blocks all other readers; ch should therefore be given an
+// appropriate amount of buffering.
+//
+// ForkStream returns a copy of the most recent value of every Setting
+// previously sent over the stream, which allows the caller to synchronise
+// with the stream. An error is returned if the Publisher has been shut down
+// or if the named stream does not exist.
+func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.shutdown {
+		return nil, verror.New(errStreamShutDown, nil, name)
+	}
+	stream := p.streams[name]
+	if stream == nil {
+		return nil, verror.New(errStreamDoesntExist, nil, name)
+	}
+
+	stream.Lock()
+	defer stream.Unlock()
+
+	snapshot := make(map[string]Setting)
+	for key, setting := range stream.vals {
+		snapshot[key] = setting
+	}
+	stream.outs = append(stream.outs, ch)
+	return &Stream{Name: name, Description: stream.desc, Latest: snapshot}, nil
+}
+
+// CloseFork removes ch from the consumers of the named stream; it is not an
+// error if ch is not one of them. The caller must drain the channel before
+// closing it. An error is returned if the Publisher has been shut down or if
+// the named stream does not exist.
+// TODO(cnicolaou): add tests for this.
+func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.shutdown {
+		return verror.New(errStreamShutDown, nil, name)
+	}
+	stream := p.streams[name]
+	if stream == nil {
+		return verror.New(errStreamDoesntExist, nil, name)
+	}
+
+	stream.Lock()
+	defer stream.Unlock()
+
+	// Only the first occurrence of ch is removed.
+	for idx := range stream.outs {
+		if stream.outs[idx] == ch {
+			stream.outs = append(stream.outs[:idx], stream.outs[idx+1:]...)
+			break
+		}
+	}
+	return nil
+}
+
+// Shutdown initiates the process of stopping the operation of the Publisher
+// by closing the channel returned by CreateStream. Calling it more than once
+// has no further effect. All of the channels passed to CreateStream must be
+// closed by their owner to ensure that all goroutines are garbage collected.
+func (p *Publisher) Shutdown() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if !p.shutdown {
+		p.shutdown = true
+		close(p.stop)
+	}
+}
+
+// closeChans closes every consumer channel of the stream and forgets them,
+// so that calling it again is a no-op.
+func (f *fork) closeChans() {
+	f.Lock()
+	defer f.Unlock()
+	for i := range f.outs {
+		close(f.outs[i])
+	}
+	f.outs = nil
+}
+
+// flow is the per-stream goroutine started by CreateStream. It records each
+// Setting read from f.in as the latest value for its name and forwards it to
+// every consumer channel. When stop is closed it closes the consumer
+// channels, and it returns once f.in has been closed by the producer.
+func (f *fork) flow(stop chan struct{}) {
+	for {
+		select {
+		case <-stop:
+			f.closeChans()
+			// A closed channel is always ready to receive from, so leaving
+			// stop in place would make this loop spin until f.in is closed.
+			// Receiving from a nil channel blocks forever, which disables
+			// this case and leaves the select waiting on f.in alone.
+			stop = nil
+		case val, ok := <-f.in:
+			if !ok {
+				f.closeChans()
+				return
+			}
+			f.Lock()
+			f.vals[val.Name()] = val
+			// Send to a copy of the consumer list so that the lock is not
+			// held while blocked on a slow consumer.
+			cpy := make([]chan<- Setting, len(f.outs))
+			copy(cpy, f.outs)
+			f.Unlock()
+			for _, o := range cpy {
+				// We may well block here.
+				o <- val
+			}
+		}
+	}
+}
diff --git a/lib/pubsub/types.go b/lib/pubsub/types.go
new file mode 100644
index 0000000..1ee8d66
--- /dev/null
+++ b/lib/pubsub/types.go
@@ -0,0 +1,91 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pubsub
+
+import (
+ "fmt"
+
+ "time"
+)
+
+// Format formats a Setting in a consistent manner; it is intended to be
+// used when implementing the Setting interface. The result has the form
+// "name: description: (type: value)".
+func Format(s Setting) string {
+	v := s.Value()
+	// %v is used for the value because it may be of any type: %s would render
+	// a non-string value such as an int as "%!s(int=1)". Strings and types
+	// with a String method are formatted identically by both verbs.
+	return fmt.Sprintf("%s: %s: (%T: %v)", s.Name(), s.Description(), v, v)
+}
+
+// Any can be used to represent or implement a Setting of any type.
+type Any struct {
+	name, description string
+	value             interface{}
+}
+
+// String returns the Setting as formatted by Format.
+func (a *Any) String() string {
+	return Format(a)
+}
+
+// Name returns the name of the Setting.
+func (a *Any) Name() string {
+	return a.name
+}
+
+// Description returns the description of the Setting.
+func (a *Any) Description() string {
+	return a.description
+}
+
+// Value returns the value of the Setting.
+func (a *Any) Value() interface{} {
+	return a.value
+}
+
+// NewAny returns a Setting with the given name and description that wraps
+// a value of any type.
+func NewAny(name, description string, value interface{}) Setting {
+	return &Any{name: name, description: description, value: value}
+}
+
+// NewInt returns a Setting that wraps an int.
+func NewInt(name, description string, value int) Setting {
+	return NewAny(name, description, value)
+}
+
+// NewInt64 returns a Setting that wraps an int64.
+func NewInt64(name, description string, value int64) Setting {
+	return NewAny(name, description, value)
+}
+
+// NewBool returns a Setting that wraps a bool.
+func NewBool(name, description string, value bool) Setting {
+	return NewAny(name, description, value)
+}
+
+// NewFloat64 returns a Setting that wraps a float64.
+func NewFloat64(name, description string, value float64) Setting {
+	return NewAny(name, description, value)
+}
+
+// NewString returns a Setting that wraps a string.
+func NewString(name, description string, value string) Setting {
+	return NewAny(name, description, value)
+}
+
+// NewDuration returns a Setting that wraps a time.Duration.
+func NewDuration(name, description string, value time.Duration) Setting {
+	return NewAny(name, description, value)
+}
+
+// DurationFlag implements flag.Value in order to provide validation of
+// duration values in the flag package.
+type DurationFlag struct{ time.Duration }
+
+// Get implements flag.Getter; it returns the current value as a
+// time.Duration.
+func (d DurationFlag) Get() interface{} {
+	return d.Duration
+}
+
+// Set implements flag.Value; it parses s using time.ParseDuration and
+// leaves the current value untouched if s cannot be parsed.
+func (d *DurationFlag) Set(s string) error {
+	parsed, err := time.ParseDuration(s)
+	if err == nil {
+		d.Duration = parsed
+	}
+	return err
+}
+
+// String implements flag.Value; it returns the current value formatted by
+// time.Duration's String method.
+func (d DurationFlag) String() string {
+	return d.Duration.String()
+}
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index c38271c..85e57a2 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -6,6 +6,7 @@
import (
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/rpc"
"v.io/x/ref/lib/apilog"
)
@@ -42,3 +43,8 @@
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return ctx
}
+
+// ExperimentalGetFlowManager is not implemented by the fake runtime: after
+// logging the call it panics with "unimplemented".
+func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
+ defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ panic("unimplemented")
+}
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index f42d786..e5d0bb4 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -19,7 +19,6 @@
"v.io/x/lib/netconfig"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -27,6 +26,7 @@
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index 7bc4af0..6dc0987 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -10,13 +10,14 @@
"testing"
"v.io/v23"
+ "v.io/v23/context"
_ "v.io/x/ref/runtime/factories/fake"
)
func TestRemoteDialerClose(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- d, a, w := setupConns(t, ctx, nil, nil)
+ d, a, w := setupConns(t, ctx, ctx, nil, nil)
d.Close(ctx, fmt.Errorf("Closing randomly."))
<-d.Closed()
<-a.Closed()
@@ -28,7 +29,7 @@
func TestRemoteAcceptorClose(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- d, a, w := setupConns(t, ctx, nil, nil)
+ d, a, w := setupConns(t, ctx, ctx, nil, nil)
a.Close(ctx, fmt.Errorf("Closing randomly."))
<-a.Closed()
<-d.Closed()
@@ -40,7 +41,7 @@
func TestUnderlyingConnectionClosed(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- d, a, w := setupConns(t, ctx, nil, nil)
+ d, a, w := setupConns(t, ctx, ctx, nil, nil)
w.close()
<-a.Closed()
<-d.Closed()
@@ -49,7 +50,7 @@
func TestDialAfterConnClose(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- d, a, _ := setupConns(t, ctx, nil, nil)
+ d, a, _ := setupConns(t, ctx, ctx, nil, nil)
d.Close(ctx, fmt.Errorf("Closing randomly."))
<-d.Closed()
@@ -66,7 +67,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
for _, dialerDials := range []bool{true, false} {
- df, flows := setupFlow(t, ctx, dialerDials)
+ df, flows := setupFlow(t, ctx, ctx, dialerDials)
if _, err := df.WriteMsg([]byte("hello")); err != nil {
t.Fatalf("write failed: %v", err)
}
@@ -79,7 +80,7 @@
if _, err := df.WriteMsg([]byte("there")); err != nil {
t.Fatalf("second write failed: %v", err)
}
- df.(*flw).conn.Close(ctx, nil)
+ df.(*flw).conn.Close(ctx, fmt.Errorf("Closing randomly."))
<-af.Conn().Closed()
if got, err := af.ReadMsg(); err != nil {
t.Fatalf("read failed: %v", err)
@@ -94,3 +95,49 @@
}
}
}
+
+// TestFlowCancelOnWrite checks that canceling the dialer's context causes
+// writes on the dialed flow to fail with context.Canceled and the accepted
+// flow to be closed.
+func TestFlowCancelOnWrite(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dctx, cancel := context.WithCancel(ctx)
+	df, accept := setupFlow(t, dctx, ctx, true)
+	done := make(chan struct{})
+	go func() {
+		// Always signal completion, even on failure, so that the test
+		// goroutine is never left blocked on done.
+		defer close(done)
+		// Failures are reported with Errorf because Fatalf may only be
+		// called from the goroutine running the test function.
+		if _, err := df.WriteMsg([]byte("hello")); err != nil {
+			t.Errorf("could not write flow: %v", err)
+			return
+		}
+		// Keep writing until the cancellation is observed.
+		for {
+			_, err := df.WriteMsg([]byte("hello"))
+			if err == context.Canceled {
+				return
+			}
+			if err != nil {
+				t.Errorf("unexpected error waiting for cancel: %v", err)
+				return
+			}
+		}
+	}()
+	af := <-accept
+	cancel()
+	<-done
+	<-af.Closed()
+}
+
+// TestFlowCancelOnRead checks that canceling the dialer's context causes a
+// pending read on the dialed flow to fail with context.Canceled and the
+// accepted flow to be closed.
+func TestFlowCancelOnRead(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dctx, cancel := context.WithCancel(ctx)
+	df, accept := setupFlow(t, dctx, ctx, true)
+	done := make(chan struct{})
+	go func() {
+		// Always signal completion, even on failure, so that the test
+		// goroutine is never left blocked on done.
+		defer close(done)
+		// Failures are reported with Errorf because Fatalf may only be
+		// called from the goroutine running the test function.
+		if _, err := df.WriteMsg([]byte("hello")); err != nil {
+			t.Errorf("could not write flow: %v", err)
+			return
+		}
+		if _, err := df.ReadMsg(); err != context.Canceled {
+			t.Errorf("unexpected error waiting for cancel: %v", err)
+		}
+	}()
+	af := <-accept
+	cancel()
+	<-done
+	<-af.Closed()
+}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ebe4535..8cd4b6a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -14,6 +14,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/v23/verror"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
@@ -155,18 +156,29 @@
// We've already torn this conn down.
return
}
+ ferr := err
+ if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
+ ferr = NewErrFlowClosedRemotely(ctx)
+ } else {
+ message := ""
+ if err != nil {
+ message = err.Error()
+ }
+ cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
+ })
+ if cerr != nil {
+ ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
+ }
+ }
for _, f := range flows {
- f.close(err)
+ f.close(ctx, ferr)
}
- err = c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
- return 0, true, c.mp.writeMsg(ctx, &tearDown{Err: err})
- })
- if err != nil {
- ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, err)
+ if cerr := c.mp.close(); cerr != nil {
+ ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
}
- if err = c.mp.close(); err != nil {
- ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, err)
- }
+
+ // TODO(mattr): ensure the readLoop is finished before closing this.
close(c.closed)
}
@@ -207,7 +219,7 @@
switch msg := x.(type) {
case *tearDown:
- terr = msg.Err
+ terr = NewErrConnClosedRemotely(ctx, msg.Message)
return
case *openFlow:
@@ -244,7 +256,7 @@
return
}
if msg.flags&closeFlag != 0 {
- f.close(nil)
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
case *unencryptedData:
@@ -259,7 +271,7 @@
return
}
if msg.flags&closeFlag != 0 {
- f.close(nil)
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
default:
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 2c3ca40..7fefba6 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -43,15 +43,22 @@
}(want)
af := <-flows
+ read := 0
for len(want) > 0 {
got, err := af.ReadMsg()
if err != nil && err != io.EOF {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.Equal(got, want[:len(got)]) {
- t.Fatalf("Got: %s want %s", got, want)
+ pl := len(got)
+ if pl > 100 {
+ pl = 100
+ }
+ pg, pw := got[:pl], want[:pl]
+ t.Fatalf("On read %d got: %v want %v", read, pg, pw)
}
want = want[len(got):]
+ read++
}
if len(want) != 0 {
t.Errorf("got %d leftover bytes, expected 0.", len(want))
@@ -65,7 +72,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
for _, dialerDials := range []bool{true, false} {
- df, flows := setupFlow(t, ctx, dialerDials)
+ df, flows := setupFlow(t, ctx, ctx, dialerDials)
testWrite(t, ctx, []byte("hello world"), df, flows)
}
}
@@ -73,6 +80,6 @@
func TestLargeWrite(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- df, flows := setupFlow(t, ctx, true)
+ df, flows := setupFlow(t, ctx, ctx, true)
testWrite(t, ctx, randData, df, flows)
}
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/conn/conncache.go
index b435d9c..ae77f97 100644
--- a/runtime/internal/flow/conn/conncache.go
+++ b/runtime/internal/flow/conn/conncache.go
@@ -119,7 +119,7 @@
c.addrCache, c.ridCache, c.started = nil, nil, nil
d := c.head.next
for d != c.head {
- d.conn.Close(ctx, nil)
+ d.conn.Close(ctx, NewErrCacheClosed(ctx))
d = d.next
}
c.head = nil
@@ -134,14 +134,15 @@
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
- return NewErrCacheClosed(nil)
+ return NewErrCacheClosed(ctx)
}
d := c.head.prev
+ err := NewErrConnKilledToFreeResources(ctx)
for i := 0; i < num; i++ {
if d == c.head {
break
}
- d.conn.Close(ctx, nil)
+ d.conn.Close(ctx, err)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
prev := d.prev
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 98938d4..0e0e821 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -19,6 +19,9 @@
UnexpectedMsg(typ string) {"en": "unexpected message type{:typ}."}
ConnectionClosed() {"en": "connection closed."}
+ ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
+ ConnClosedRemotely(msg string) {"en": "connection closed remotely{:msg}."}
+ FlowClosedRemotely() {"en": "flow closed remotely."}
Send(typ, dest string, err error) {"en": "failure sending {typ} message to {dest}{:err}."}
Recv(src string, err error) {"en": "error reading from {src}{:err}"}
CacheClosed() {"en":"cache is closed"}
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index e26dc9c..2db4372 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -15,16 +15,19 @@
)
var (
- ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
- ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
- ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
- ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
- ErrUnexpectedMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
- ErrConnectionClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
- ErrSend = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
- ErrRecv = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
- ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
- ErrCounterOverflow = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
+ ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+ ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+ ErrUnexpectedMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
+ ErrConnectionClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
+ ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
+ ErrConnClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
+ ErrFlowClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
+ ErrSend = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
+ ErrRecv = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
+ ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
+ ErrCounterOverflow = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
)
func init() {
@@ -34,6 +37,9 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMsg.ID), "{1:}{2:} unexpected message type{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnectionClosed.ID), "{1:}{2:} connection closed.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnClosedRemotely.ID), "{1:}{2:} connection closed remotely{:3}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFlowClosedRemotely.ID), "{1:}{2:} flow closed remotely.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrSend.ID), "{1:}{2:} failure sending {3} message to {4}{:5}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrRecv.ID), "{1:}{2:} error reading from {3}{:4}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
@@ -70,6 +76,21 @@
return verror.New(ErrConnectionClosed, ctx)
}
+// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID; only ctx is handed to verror.New, with no extra parameters.
+func NewErrConnKilledToFreeResources(ctx *context.T) error {
+ return verror.New(ErrConnKilledToFreeResources, ctx)
+}
+
+// NewErrConnClosedRemotely returns an error with the ErrConnClosedRemotely ID; msg is the only extra parameter handed to verror.New after ctx (the format registered for this ID contains a {:3} placeholder).
+func NewErrConnClosedRemotely(ctx *context.T, msg string) error {
+ return verror.New(ErrConnClosedRemotely, ctx, msg)
+}
+
+// NewErrFlowClosedRemotely returns an error with the ErrFlowClosedRemotely ID; only ctx is handed to verror.New, with no extra parameters.
+func NewErrFlowClosedRemotely(ctx *context.T) error {
+ return verror.New(ErrFlowClosedRemotely, ctx)
+}
+
// NewErrSend returns an error with the ErrSend ID.
func NewErrSend(ctx *context.T, typ string, dest string, err error) error {
return verror.New(ErrSend, ctx, typ, dest, err)
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 804f472..f75e10f 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -8,7 +8,7 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/security"
-
+ "v.io/v23/verror"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
@@ -50,6 +50,9 @@
if n, release, err = f.q.read(f.ctx, p); release {
f.conn.release(f.ctx)
}
+ if err != nil {
+ f.close(f.ctx, err)
+ }
return
}
@@ -65,6 +68,9 @@
if buf, release, err = f.q.get(f.ctx); release {
f.conn.release(f.ctx)
}
+ if err != nil {
+ f.close(f.ctx, err)
+ }
return
}
@@ -123,7 +129,7 @@
return size, done, f.conn.mp.writeMsg(f.ctx, d)
})
if alsoClose || err != nil {
- f.close(err)
+ f.close(f.ctx, err)
}
return sent, err
}
@@ -220,10 +226,18 @@
return f.ctx.Done()
}
-func (f *flw) close(err error) {
- f.q.close(f.ctx)
+func (f *flw) close(ctx *context.T, err error) {
+ f.q.close(ctx)
f.cancel()
-
- // TODO(mattr): maybe send a final close data message.
- // TODO(mattr): save the error to hand out later.
+ if verror.ErrorID(err) != ErrFlowClosedRemotely.ID {
+ // We want to try to send this message even if ctx is already canceled.
+ ctx, cancel := context.WithRootCancel(ctx)
+ err := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
+ return 0, true, f.conn.mp.writeMsg(ctx, &data{id: f.id, flags: closeFlag})
+ })
+ if err != nil {
+ ctx.Errorf("Could not send close flow message: %v", err)
+ }
+ cancel()
+ }
}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index e1bc1f3..6cea426 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -5,8 +5,6 @@
package conn
import (
- "errors"
-
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc/version"
@@ -88,19 +86,15 @@
// tearDown is sent over the wire before a connection is closed.
type tearDown struct {
- Err error
+ Message string
}
func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
- var errBytes []byte
- if m.Err != nil {
- errBytes = []byte(m.Err.Error())
- }
- return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, errBytes})
+ return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Message)})
}
func (m *tearDown) read(ctx *context.T, data []byte) error {
if len(data) > 0 {
- m.Err = errors.New(string(data))
+ m.Message = string(data)
}
return nil
}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index ab715ae..2414c1c 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -5,7 +5,6 @@
package conn
import (
- "errors"
"reflect"
"testing"
@@ -82,7 +81,7 @@
func TestTearDown(t *testing.T) {
testMessages(t, []message{
- &tearDown{Err: errors.New("foobar")},
+ &tearDown{Message: "foobar"},
&tearDown{},
})
}
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index a2c09e6..cd61bc7 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -17,6 +17,7 @@
b, e int
size int
+ nbufs int
toRelease int
notify chan struct{}
}
@@ -32,9 +33,6 @@
}
func (r *readq) put(ctx *context.T, bufs [][]byte) error {
- if len(bufs) == 0 {
- return nil
- }
l := 0
for _, b := range bufs {
l += len(b)
@@ -53,13 +51,13 @@
if newSize > defaultBufferSize {
return NewErrCounterOverflow(ctx)
}
- if r.e == r.b {
- r.reserveLocked(len(bufs))
- }
+ newBufs := r.nbufs + len(bufs)
+ r.reserveLocked(newBufs)
for _, b := range bufs {
r.bufs[r.e] = b
r.e = (r.e + 1) % len(r.bufs)
}
+ r.nbufs = newBufs
if r.size == 0 {
select {
case r.notify <- struct{}{}:
@@ -82,6 +80,7 @@
if len(buf) > 0 {
r.bufs[r.b] = buf
} else {
+ r.nbufs -= 1
r.b = (r.b + 1) % len(r.bufs)
}
r.size -= n
@@ -98,6 +97,7 @@
out = r.bufs[r.b]
r.b = (r.b + 1) % len(r.bufs)
r.size -= len(out)
+ r.nbufs -= 1
r.toRelease += len(out)
return out, r.toRelease > defaultBufferSize/2, nil
}
@@ -131,16 +131,17 @@
}
func (r *readq) reserveLocked(n int) {
- needed := n + r.e - r.b
- if r.e < r.b {
- needed += len(r.bufs)
- }
- if needed < len(r.bufs) {
+ if n < len(r.bufs) {
return
}
- nb := make([][]byte, 2*needed)
- copied := copy(nb, r.bufs[r.b:])
- copied += copy(nb[n:], r.bufs[:r.e])
+ nb := make([][]byte, 2*n)
+ copied := 0
+ if r.e >= r.b {
+ copied = copy(nb, r.bufs[r.b:r.e])
+ } else {
+ copied = copy(nb, r.bufs[r.b:])
+ copied += copy(nb[copied:], r.bufs[:r.e])
+ }
r.bufs, r.b, r.e = nb, 0, copied
}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 18cc267..5694830 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -96,34 +96,42 @@
return nil
}
-func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
- dmrw, amrw, w := newMRWPair(ctx)
+func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
+ dmrw, amrw, w := newMRWPair(dctx)
versions := version.RPCVersionRange{Min: 3, Max: 5}
ep, err := v23.NewEndpoint("localhost:80")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
- d, err := NewDialed(ctx, dmrw, ep, ep, versions, fh(dflows), nil)
+ d, err := NewDialed(dctx, dmrw, ep, ep, versions, fh(dflows), nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
- a, err := NewAccepted(ctx, amrw, ep, security.Blessings{}, versions, fh(aflows))
+ a, err := NewAccepted(actx, amrw, ep, security.Blessings{}, versions, fh(aflows))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return d, a, w
}
-func setupFlow(t *testing.T, ctx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
- dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
- d, a, _ := setupConns(t, ctx, dflows, aflows)
+func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
+ d, accepted := setupFlows(t, dctx, actx, dialFromDialer, 1)
+ return d[0], accepted
+}
+
+func setupFlows(t *testing.T, dctx, actx *context.T, dialFromDialer bool, n int) (dialed []flow.Flow, accepted <-chan flow.Flow) {
+ dflows, aflows := make(chan flow.Flow, n), make(chan flow.Flow, n)
+ d, a, _ := setupConns(t, dctx, actx, dflows, aflows)
if !dialFromDialer {
d, a = a, d
aflows, dflows = dflows, aflows
}
- df, err := d.Dial(ctx)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
+ dialed = make([]flow.Flow, n)
+ for i := 0; i < n; i++ {
+ var err error
+ if dialed[i], err = d.Dial(dctx); err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
}
- return df, aflows
+ return dialed, aflows
}
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index 2b6e64b..8221895 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -70,6 +70,15 @@
for {
next := w.fc.nextWorkerLocked()
+ if w.fc.writing == w {
+ // We're already scheduled to write, but we should bail
+ // out if we're canceled.
+ select {
+ case <-ctx.Done():
+ err = ctx.Err()
+ default:
+ }
+ }
for w.fc.writing != w && err == nil {
w.fc.mu.Unlock()
if next != nil {
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 3db1610..26b7e91 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,10 @@
vcCache *vc.VCCache
+ wg sync.WaitGroup
+ mu sync.Mutex
+ closed bool
+
dc vc.DischargeClient
}
@@ -93,8 +97,7 @@
c := &client{
streamMgr: streamMgr,
ns: ns,
-
- vcCache: vc.NewVCCache(),
+ vcCache: vc.NewVCCache(),
}
ipNets, err := ipNetworks()
if err != nil {
@@ -346,6 +349,7 @@
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+ defer c.wg.Done()
status := &serverStatus{index: index, server: server}
var span vtrace.Span
ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
@@ -354,7 +358,6 @@
ch <- status
span.Finish()
}()
-
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{
Name: suberrName(server, name, method),
@@ -485,6 +488,14 @@
// other while manipulating their copy of the options.
vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
copy(vcOptsCopy, vcOpts)
+ c.mu.Lock()
+ if c.closed {
+ c.mu.Unlock()
+ return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+ }
+ c.wg.Add(1)
+ c.mu.Unlock()
+
go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
}
@@ -734,9 +745,13 @@
func (c *client) Close() {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ c.mu.Lock()
+ c.closed = true
+ c.mu.Unlock()
for _, v := range c.vcCache.Close() {
c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
}
+ c.wg.Wait()
}
// flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 56fcbdf..9c26e2b 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -20,7 +20,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -36,6 +35,7 @@
"v.io/v23/verror"
"v.io/v23/vtrace"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/lib/publisher"
"v.io/x/ref/runtime/internal/lib/websocket"
diff --git a/runtime/internal/rpc/pubsub.go b/runtime/internal/rpc/pubsub.go
index 7bfe46c..6d2ad87 100644
--- a/runtime/internal/rpc/pubsub.go
+++ b/runtime/internal/rpc/pubsub.go
@@ -7,7 +7,7 @@
import (
"net"
- "v.io/x/lib/pubsub"
+ "v.io/x/ref/lib/pubsub"
)
// NewAddAddrsSetting creates the Setting to be sent to Listen to inform
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index c34a728..1e30282 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -15,7 +15,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
@@ -32,6 +31,7 @@
"v.io/v23/vtrace"
"v.io/x/ref/lib/apilog"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/lib/publisher"
inaming "v.io/x/ref/runtime/internal/naming"
diff --git a/runtime/internal/rpc/server_test.go b/runtime/internal/rpc/server_test.go
index b8b5bb2..7263415 100644
--- a/runtime/internal/rpc/server_test.go
+++ b/runtime/internal/rpc/server_test.go
@@ -12,7 +12,6 @@
"time"
"v.io/x/lib/netstate"
- "v.io/x/lib/pubsub"
"v.io/x/lib/set"
"v.io/v23"
@@ -23,6 +22,7 @@
"v.io/v23/security"
"v.io/v23/verror"
+ "v.io/x/ref/lib/pubsub"
inaming "v.io/x/ref/runtime/internal/naming"
imanager "v.io/x/ref/runtime/internal/rpc/stream/manager"
tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
diff --git a/runtime/internal/rpc/stream/manager/manager.go b/runtime/internal/rpc/stream/manager/manager.go
index e5aa37d..1b63a9f 100644
--- a/runtime/internal/rpc/stream/manager/manager.go
+++ b/runtime/internal/rpc/stream/manager/manager.go
@@ -113,6 +113,11 @@
return "", "", verror.New(stream.ErrResolveFailed, nil, verror.New(errUnknownNetwork, nil, network))
}
+type dialResult struct { // dialResult bundles the two values returned by dial so both can travel over one channel; it is built positionally, so field order matters.
+ conn net.Conn // connection returned by dial
+ err error // error returned by dial
+}
+
// FindOrDialVIF returns the network connection (VIF) to the provided address
// from the cache in the manager. If not already present in the cache, a new
// connection will be created using net.Dial.
@@ -146,7 +151,20 @@
defer unblock()
ctx.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
- conn, err := dial(d, network, address, timeout)
+
+ ch := make(chan *dialResult)
+ go func() {
+ conn, err := dial(d, network, address, timeout)
+ ch <- &dialResult{conn, err}
+ }()
+
+ var conn net.Conn
+ select {
+ case result := <-ch:
+ conn, err = result.conn, result.err
+ case <-ctx.Done():
+ return nil, verror.New(stream.ErrDialFailed, ctx, err)
+ }
if err != nil {
return nil, err
}
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 2e5d5ab..dac4bed 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -14,10 +14,10 @@
"time"
"v.io/x/lib/metadata"
- "v.io/x/lib/pubsub"
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/i18n"
"v.io/v23/namespace"
"v.io/v23/naming"
@@ -30,8 +30,10 @@
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/apilog"
"v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
_ "v.io/x/ref/lib/stats/sysstats"
+ "v.io/x/ref/runtime/internal/flow/manager"
"v.io/x/ref/runtime/internal/lib/dependency"
inaming "v.io/x/ref/runtime/internal/naming"
inamespace "v.io/x/ref/runtime/internal/naming/namespace"
@@ -51,6 +53,7 @@
backgroundKey
reservedNameKey
listenKey
+ flowManagerKey
// initKey is used to store values that are only set at init time.
initKey
@@ -169,6 +172,15 @@
if err != nil {
return nil, nil, nil, err
}
+
+ // Add the flow.Manager to the context.
+ // TODO(suharshs): Once the client and server use the flow.Manager we will need
+ // to manage those dependencies (exactly as we are doing with stream.Manager).
+ ctx, err = r.setNewFlowManager(ctx)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
r.ctx = ctx
return r, r.WithBackgroundContext(ctx), r.shutdown, nil
}
@@ -295,6 +307,23 @@
return sm, nil
}
+// newFlowManager builds a flow.Manager from ctx and a freshly generated
+// routing ID. If the routing ID cannot be generated, that error is returned
+// and no manager is created.
+func newFlowManager(ctx *context.T) (flow.Manager, error) {
+	routingID, err := naming.NewRoutingID()
+	if err != nil {
+		return nil, err
+	}
+	fm := manager.New(ctx, routingID)
+	return fm, nil
+}
+
+// setNewFlowManager returns a context derived from ctx that carries a newly
+// created flow.Manager under flowManagerKey. When the manager cannot be
+// created it returns a nil context together with the error.
+func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, error) {
+	fm, err := newFlowManager(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return context.WithValue(ctx, flowManagerKey, fm), nil
+}
+
func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
sm, err := newStreamManager(ctx)
if err != nil {
@@ -349,6 +378,9 @@
if newctx, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
+ if newctx, err = r.setNewFlowManager(newctx); err != nil {
+ return ctx, err
+ }
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
return ctx, err
}
@@ -487,3 +519,11 @@
}
return nil
}
+
+// ExperimentalGetFlowManager returns the flow.Manager stored in ctx under
+// flowManagerKey, or nil when ctx holds no value of that type for the key.
+func (*Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
+	// nologcall
+	fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
+	return fm
+}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 44cb603..a8753fe 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -152,3 +152,13 @@
}
}
+
+// TestFlowManager checks that the context produced by initForTest yields a
+// non-nil flow.Manager through ExperimentalGetFlowManager.
+func TestFlowManager(t *testing.T) {
+	r, ctx, shutdown := initForTest(t)
+	defer shutdown()
+
+	fm := r.ExperimentalGetFlowManager(ctx)
+	if fm == nil {
+		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
+	}
+}
diff --git a/services/internal/packages/packages.go b/services/internal/packages/packages.go
index 3f9af83..1656401 100644
--- a/services/internal/packages/packages.go
+++ b/services/internal/packages/packages.go
@@ -143,6 +143,7 @@
if err != nil {
return err
}
+ fh.Method = zip.Deflate
fh.Name, _ = filepath.Rel(sourceDir, path)
hdr, err := w.CreateHeader(fh)
if err != nil {
diff --git a/services/mounttable/mounttablelib/neighborhood.go b/services/mounttable/mounttablelib/neighborhood.go
index 75488c2..20997cf 100644
--- a/services/mounttable/mounttablelib/neighborhood.go
+++ b/services/mounttable/mounttablelib/neighborhood.go
@@ -39,10 +39,6 @@
const addressPrefix = "address:"
-// AnonymousNeighbor is the neighborhood host name to use if you want to be a client of the neighborhood namespace
-// without appearing in it.
-const AnonymousNeighbor = "_"
-
// neighborhood defines a set of machines on the same multicast media.
type neighborhood struct {
mdns *mdns.MDNS
@@ -108,20 +104,30 @@
}
// Start up MDNS, subscribe to the vanadium service, and add us as a vanadium service provider.
- mdns, err := mdns.NewMDNS(host, "", "", loopback, false)
+ m, err := mdns.NewMDNS(host, "", "", loopback, false)
if err != nil {
- logger.Global().Errorf("mdns startup failed: %s", err)
- return nil, err
+ // The name may not have been unique. Try one more time with a unique
+ // name. NewMDNS will replace the "()" with "(hardware mac address)".
+ if len(host) > 0 {
+ m, err = mdns.NewMDNS(host+"()", "", "", loopback, false)
+ }
+ if err != nil {
+ logger.Global().Errorf("mdns startup failed: %s", err)
+ return nil, err
+ }
}
logger.Global().VI(2).Infof("listening for service vanadium on port %d", port)
- mdns.SubscribeToService("vanadium")
- if host != AnonymousNeighbor {
- mdns.AddService("vanadium", "", port, txt...)
+ m.SubscribeToService("vanadium")
+ if len(host) > 0 {
+ m.AddService("vanadium", "", port, txt...)
}
+
+ // A small sleep to allow the world to learn about us and vice versa. Not
+ // necessary but helpful.
time.Sleep(50 * time.Millisecond)
nh := &neighborhood{
- mdns: mdns,
+ mdns: m,
}
// Watch the network configuration so that we can make MDNS reattach to
diff --git a/services/mounttable/mounttablelib/versionedpermissions.go b/services/mounttable/mounttablelib/versionedpermissions.go
index 50643d9..1c6759a 100644
--- a/services/mounttable/mounttablelib/versionedpermissions.go
+++ b/services/mounttable/mounttablelib/versionedpermissions.go
@@ -143,7 +143,13 @@
// Create name and add the Permissions map to it.
n, err := mt.findNode(ctx, nil, elems, true, nil, nil)
- if n != nil || err == nil {
+ if err != nil {
+ ctx.Errorf("skipping node for %v; error: %v", elems, err)
+ }
+ if n == nil {
+ continue
+ }
+ if err == nil {
ctx.VI(2).Infof("added perms %v to %s", perms, name)
if isPattern {
n.permsTemplate = perms
@@ -151,9 +157,6 @@
n.vPerms, _ = n.vPerms.Set(nil, "", perms)
n.explicitPermissions = true
}
- } else {
- ctx.Errorf("skipping node for %v; error: %v", elems, err)
- continue
}
n.parent.Unlock()
n.Unlock()