ref/runtime/internal/flow: Move the conncache back into the manager package.
Change-Id: Id479ee2a219ee8b1cfb3626ad6094acf62b00c66
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index fd73c50..8e65b46 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -21,7 +21,7 @@
d.Close(ctx, fmt.Errorf("Closing randomly."))
<-d.Closed()
<-a.Closed()
- if !w.isClosed() {
+ if !w.IsClosed() {
t.Errorf("The connection should be closed")
}
}
@@ -33,7 +33,7 @@
a.Close(ctx, fmt.Errorf("Closing randomly."))
<-a.Closed()
<-d.Closed()
- if !w.isClosed() {
+ if !w.IsClosed() {
t.Errorf("The connection should be closed")
}
}
@@ -42,7 +42,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
d, a, w := setupConns(t, ctx, ctx, nil, nil)
- w.close()
+ w.Close()
<-a.Closed()
<-d.Closed()
}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 40e5c0f..7e18e71 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -21,12 +21,10 @@
UnknownControlMsg(cmd byte) {"en": "unknown control command{:cmd}."}
UnexpectedMsg(typ string) {"en": "unexpected message type{:typ}."}
ConnectionClosed() {"en": "connection closed."}
- ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
ConnClosedRemotely(msg string) {"en": "connection closed remotely{:msg}."}
FlowClosedRemotely() {"en": "flow closed remotely."}
Send(typ, dest string, err error) {"en": "failure sending {typ} message to {dest}{:err}."}
Recv(src string, err error) {"en": "error reading from {src}{:err}"}
- CacheClosed() {"en":"cache is closed"}
CounterOverflow() {"en": "A remote process has sent more data than allowed."}
BlessingsFlowClosed() {"en": "The blessings flow was closed."}
InvalidChannelBinding() {"en": "The channel binding was invalid."}
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index ed75b03..f649ba4 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -15,27 +15,25 @@
)
var (
- ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
- ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
- ErrInvalidSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidSetupOption", verror.NoRetry, "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
- ErrMissingSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.MissingSetupOption", verror.NoRetry, "{1:}{2:} missing required setup option{:3}.")
- ErrUnknownSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownSetupOption", verror.NoRetry, "{1:}{2:} unknown setup option{:3}.")
- ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
- ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
- ErrUnexpectedMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
- ErrConnectionClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
- ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
- ErrConnClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
- ErrFlowClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
- ErrSend = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
- ErrRecv = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
- ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
- ErrCounterOverflow = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
- ErrBlessingsFlowClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsFlowClosed", verror.NoRetry, "{1:}{2:} The blessings flow was closed.")
- ErrInvalidChannelBinding = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidChannelBinding", verror.NoRetry, "{1:}{2:} The channel binding was invalid.")
- ErrNoPublicKey = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
- ErrDialingNonServer = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
- ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
+ ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
+ ErrInvalidSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidSetupOption", verror.NoRetry, "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
+ ErrMissingSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.MissingSetupOption", verror.NoRetry, "{1:}{2:} missing required setup option{:3}.")
+ ErrUnknownSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownSetupOption", verror.NoRetry, "{1:}{2:} unknown setup option{:3}.")
+ ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+ ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+ ErrUnexpectedMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
+ ErrConnectionClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
+ ErrConnClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
+ ErrFlowClosedRemotely = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
+ ErrSend = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
+ ErrRecv = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
+ ErrCounterOverflow = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
+ ErrBlessingsFlowClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsFlowClosed", verror.NoRetry, "{1:}{2:} The blessings flow was closed.")
+ ErrInvalidChannelBinding = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidChannelBinding", verror.NoRetry, "{1:}{2:} The channel binding was invalid.")
+ ErrNoPublicKey = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
+ ErrDialingNonServer = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
+ ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
)
func init() {
@@ -48,12 +46,10 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMsg.ID), "{1:}{2:} unexpected message type{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnectionClosed.ID), "{1:}{2:} connection closed.")
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnClosedRemotely.ID), "{1:}{2:} connection closed remotely{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFlowClosedRemotely.ID), "{1:}{2:} flow closed remotely.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrSend.ID), "{1:}{2:} failure sending {3} message to {4}{:5}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrRecv.ID), "{1:}{2:} error reading from {3}{:4}")
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCounterOverflow.ID), "{1:}{2:} A remote process has sent more data than allowed.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlessingsFlowClosed.ID), "{1:}{2:} The blessings flow was closed.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidChannelBinding.ID), "{1:}{2:} The channel binding was invalid.")
@@ -107,11 +103,6 @@
return verror.New(ErrConnectionClosed, ctx)
}
-// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID.
-func NewErrConnKilledToFreeResources(ctx *context.T) error {
- return verror.New(ErrConnKilledToFreeResources, ctx)
-}
-
// NewErrConnClosedRemotely returns an error with the ErrConnClosedRemotely ID.
func NewErrConnClosedRemotely(ctx *context.T, msg string) error {
return verror.New(ErrConnClosedRemotely, ctx, msg)
@@ -132,11 +123,6 @@
return verror.New(ErrRecv, ctx, src, err)
}
-// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
-func NewErrCacheClosed(ctx *context.T) error {
- return verror.New(ErrCacheClosed, ctx)
-}
-
// NewErrCounterOverflow returns an error with the ErrCounterOverflow ID.
func NewErrCounterOverflow(ctx *context.T) error {
return verror.New(ErrCounterOverflow, ctx)
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index 81ceb64..2eefacf 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -12,6 +12,7 @@
"v.io/v23/context"
"v.io/v23/rpc/version"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
)
@@ -49,7 +50,7 @@
}
func testMessages(t *testing.T, ctx *context.T, cases []message) {
- w, r, _ := newMRWPair(ctx)
+ w, r, _ := flowtest.NewMRWPair(ctx)
wp, rp := newMessagePipe(w), newMessagePipe(r)
for _, want := range cases {
ch := make(chan struct{})
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 140515a..40bb0e2 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -5,8 +5,6 @@
package conn
import (
- "io"
- "sync"
"testing"
"v.io/v23"
@@ -15,89 +13,9 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
- "v.io/x/ref/internal/logger"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
)
-type wire struct {
- ctx *context.T
- mu sync.Mutex
- c *sync.Cond
- closed bool
-}
-
-func (w *wire) close() {
- w.mu.Lock()
- w.closed = true
- w.c.Broadcast()
- w.mu.Unlock()
-}
-
-func (w *wire) isClosed() bool {
- w.mu.Lock()
- c := w.closed
- w.mu.Unlock()
- return c
-}
-
-type mRW struct {
- wire *wire
- in []byte
- peer *mRW
-}
-
-func newMRWPair(ctx *context.T) (MsgReadWriteCloser, MsgReadWriteCloser, *wire) {
- w := &wire{ctx: ctx}
- w.c = sync.NewCond(&w.mu)
- a, b := &mRW{wire: w}, &mRW{wire: w}
- a.peer, b.peer = b, a
- return a, b, w
-}
-
-func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
- buf := []byte{}
- for _, d := range data {
- buf = append(buf, d...)
- }
- logbuf := buf
- if len(buf) > 128 {
- logbuf = buf[:128]
- }
- logger.Global().VI(2).Infof("Writing %d bytes to the wire: %#v", len(buf), logbuf)
- defer f.wire.mu.Unlock()
- f.wire.mu.Lock()
- for f.peer.in != nil && !f.wire.closed {
- f.wire.c.Wait()
- }
- if f.wire.closed {
- return 0, io.EOF
- }
- f.peer.in = buf
- f.wire.c.Broadcast()
- return len(buf), nil
-}
-func (f *mRW) ReadMsg() (buf []byte, err error) {
- defer f.wire.mu.Unlock()
- f.wire.mu.Lock()
- for f.in == nil && !f.wire.closed {
- f.wire.c.Wait()
- }
- if f.wire.closed {
- return nil, io.EOF
- }
- buf, f.in = f.in, nil
- f.wire.c.Broadcast()
- logbuf := buf
- if len(buf) > 128 {
- logbuf = buf[:128]
- }
- logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
- return buf, nil
-}
-func (f *mRW) Close() error {
- f.wire.close()
- return nil
-}
-
type fh chan<- flow.Flow
func (fh fh) HandleFlow(f flow.Flow) error {
@@ -108,8 +26,8 @@
return nil
}
-func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
- dmrw, amrw, w := newMRWPair(dctx)
+func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *flowtest.Wire) {
+ dmrw, amrw, w := flowtest.NewMRWPair(dctx)
versions := version.RPCVersionRange{Min: 3, Max: 5}
ep, err := v23.NewEndpoint("localhost:80")
if err != nil {
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
new file mode 100644
index 0000000..652e8b4
--- /dev/null
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowtest
+
+import (
+ "io"
+ "sync"
+
+ "v.io/v23/context"
+ "v.io/x/ref/internal/logger"
+)
+
+type Wire struct {
+ ctx *context.T
+ mu sync.Mutex
+ c *sync.Cond
+ closed bool
+}
+
+func (w *Wire) Close() {
+ w.mu.Lock()
+ w.closed = true
+ w.c.Broadcast()
+ w.mu.Unlock()
+}
+
+func (w *Wire) IsClosed() bool {
+ w.mu.Lock()
+ c := w.closed
+ w.mu.Unlock()
+ return c
+}
+
+type MRW struct {
+ wire *Wire
+ in []byte
+ peer *MRW
+}
+
+func NewMRWPair(ctx *context.T) (*MRW, *MRW, *Wire) {
+ w := &Wire{ctx: ctx}
+ w.c = sync.NewCond(&w.mu)
+ a, b := &MRW{wire: w}, &MRW{wire: w}
+ a.peer, b.peer = b, a
+ return a, b, w
+}
+
+func (f *MRW) WriteMsg(data ...[]byte) (int, error) {
+ buf := []byte{}
+ for _, d := range data {
+ buf = append(buf, d...)
+ }
+ logbuf := buf
+ if len(buf) > 128 {
+ logbuf = buf[:128]
+ }
+ logger.Global().VI(2).Infof("Writing %d bytes to the wire: %#v", len(buf), logbuf)
+ defer f.wire.mu.Unlock()
+ f.wire.mu.Lock()
+ for f.peer.in != nil && !f.wire.closed {
+ f.wire.c.Wait()
+ }
+ if f.wire.closed {
+ return 0, io.EOF
+ }
+ f.peer.in = buf
+ f.wire.c.Broadcast()
+ return len(buf), nil
+}
+func (f *MRW) ReadMsg() (buf []byte, err error) {
+ defer f.wire.mu.Unlock()
+ f.wire.mu.Lock()
+ for f.in == nil && !f.wire.closed {
+ f.wire.c.Wait()
+ }
+ if f.wire.closed {
+ return nil, io.EOF
+ }
+ buf, f.in = f.in, nil
+ f.wire.c.Broadcast()
+ logbuf := buf
+ if len(buf) > 128 {
+ logbuf = buf[:128]
+ }
+ logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
+ return buf, nil
+}
+func (f *MRW) Close() error {
+ f.wire.Close()
+ return nil
+}
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/manager/conncache.go
similarity index 94%
rename from runtime/internal/flow/conn/conncache.go
rename to runtime/internal/flow/manager/conncache.go
index ae77f97..cc5943a 100644
--- a/runtime/internal/flow/conn/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package conn
+package manager
import (
"strings"
@@ -10,6 +10,7 @@
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/x/ref/runtime/internal/flow/conn"
)
// ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
@@ -26,7 +27,7 @@
}
type connEntry struct {
- conn *Conn
+ conn *conn.Conn
rid naming.RoutingID
addrKey string
next, prev *connEntry
@@ -57,7 +58,7 @@
// be the same as the arguments provided to ReservedFind.
// All new ReservedFind calls for the (protocol, address, blessings) will Block
// until the corresponding Unreserve call is made.
-func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*Conn, error) {
+func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*conn.Conn, error) {
k := key(protocol, address, blessingNames)
defer c.mu.Unlock()
c.mu.Lock()
@@ -93,7 +94,7 @@
// Insert adds conn to the cache.
// An error will be returned iff the cache has been closed.
-func (c *ConnCache) Insert(conn *Conn) error {
+func (c *ConnCache) Insert(conn *conn.Conn) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
@@ -155,7 +156,7 @@
// FindWithRoutingID returns a Conn where the remote end of the connection
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
-func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*Conn, error) {
+func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
@@ -206,9 +207,9 @@
prev.next = c
}
-func isClosed(conn *Conn) bool {
+func isClosed(conn *conn.Conn) bool {
select {
- case <-conn.closed:
+ case <-conn.Closed():
return true
default:
return false
diff --git a/runtime/internal/flow/conn/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
similarity index 84%
rename from runtime/internal/flow/conn/conncache_test.go
rename to runtime/internal/flow/manager/conncache_test.go
index 2686b0b..80892a7 100644
--- a/runtime/internal/flow/conn/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package conn
+package manager
import (
"strconv"
@@ -12,7 +12,8 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc/version"
-
+ connpackage "v.io/x/ref/runtime/internal/flow/conn"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
inaming "v.io/x/ref/runtime/internal/naming"
)
@@ -73,8 +74,8 @@
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
// Looking it up again should block until a matching Unreserve call is made.
- ch := make(chan *Conn, 1)
- go func(ch chan *Conn) {
+ ch := make(chan *connpackage.Conn, 1)
+ go func(ch chan *connpackage.Conn) {
conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
if err != nil {
t.Fatal(err)
@@ -150,10 +151,11 @@
}
}
for _, conn := range conns[:7] {
- if got, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames()); err != nil || got != conn {
+ rep := conn.RemoteEndpoint()
+ if got, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames()); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
- c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+ c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
@@ -187,7 +189,8 @@
}
}
for _, conn := range conns[:7] {
- if got, err := c.FindWithRoutingID(conn.remote.RoutingID()); err != nil || got != conn {
+ rep := conn.RemoteEndpoint()
+ if got, err := c.FindWithRoutingID(rep.RoutingID()); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
}
@@ -215,13 +218,14 @@
}
}
-func isInCache(t *testing.T, c *ConnCache, conn *Conn) bool {
- rfconn, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+func isInCache(t *testing.T, c *ConnCache, conn *connpackage.Conn) bool {
+ rep := conn.RemoteEndpoint()
+ rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
if err != nil {
t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
}
- c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
- ridconn, err := c.FindWithRoutingID(conn.remote.RoutingID())
+ c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
+ ridconn, err := c.FindWithRoutingID(rep.RoutingID())
if err != nil {
t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
}
@@ -243,8 +247,8 @@
return size
}
-func nConns(t *testing.T, ctx *context.T, n int) []*Conn {
- conns := make([]*Conn, n)
+func nConns(t *testing.T, ctx *context.T, n int) []*connpackage.Conn {
+ conns := make([]*connpackage.Conn, n)
for i := 0; i < n; i++ {
conns[i] = makeConn(t, ctx, &inaming.Endpoint{
Protocol: strconv.Itoa(i),
@@ -254,19 +258,21 @@
return conns
}
-func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *Conn {
- dmrw, amrw, _ := newMRWPair(ctx)
- dch := make(chan *Conn)
- ach := make(chan *Conn)
+func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *connpackage.Conn {
+ dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
+ dch := make(chan *connpackage.Conn)
+ ach := make(chan *connpackage.Conn)
go func() {
- d, err := NewDialed(ctx, dmrw, ep, ep, version.RPCVersionRange{1, 5}, nil)
+ d, err := connpackage.NewDialed(ctx, dmrw, ep, ep,
+ version.RPCVersionRange{Min: 1, Max: 5}, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
dch <- d
}()
go func() {
- a, err := NewAccepted(ctx, amrw, ep, version.RPCVersionRange{1, 5}, nil)
+ a, err := connpackage.NewAccepted(ctx, amrw, ep,
+ version.RPCVersionRange{Min: 1, Max: 5}, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index 3e7b57e..463e2cb 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -14,4 +14,6 @@
UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
ManagerClosed() {"en": "manager is already closed"}
AcceptFailed(err error) {"en": "accept failed{:err}"}
+ CacheClosed() {"en":"cache is closed"}
+ ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
)
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 741c22a..bfa716e 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -15,10 +15,12 @@
)
var (
- ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
- ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
- ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
- ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
+ ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+ ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+ ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+ ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
+ ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
+ ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
)
func init() {
@@ -26,6 +28,8 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
}
// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
@@ -47,3 +51,13 @@
func NewErrAcceptFailed(ctx *context.T, err error) error {
return verror.New(ErrAcceptFailed, ctx, err)
}
+
+// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
+func NewErrCacheClosed(ctx *context.T) error {
+ return verror.New(ErrCacheClosed, ctx)
+}
+
+// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID.
+func NewErrConnKilledToFreeResources(ctx *context.T) error {
+ return verror.New(ErrConnKilledToFreeResources, ctx)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index ac26aa1..a3394d8 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,7 +26,7 @@
rid naming.RoutingID
closed <-chan struct{}
q *upcqueue.T
- cache *conn.ConnCache
+ cache *ConnCache
mu *sync.Mutex
listenEndpoints []naming.Endpoint
@@ -37,7 +37,7 @@
rid: rid,
closed: ctx.Done(),
q: upcqueue.New(),
- cache: conn.NewConnCache(),
+ cache: NewConnCache(),
mu: &sync.Mutex{},
}
return m