Merge "runtime/internal/rpc: Make ref package tests pass with XServers state."
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ecbd8a9..bbe5862 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -47,7 +47,6 @@
type Conn struct {
fc *flowcontrol.FlowController
mp *messagePipe
- handler FlowHandler
version version.RPCVersion
lBlessings, rBlessings security.Blessings
local, remote naming.Endpoint
@@ -56,6 +55,7 @@
loopWG sync.WaitGroup
mu sync.Mutex
+ handler FlowHandler
nextFid uint64
flows map[uint64]*flw
dischargeTimer *time.Timer
@@ -263,16 +263,17 @@
return NewErrConnClosedRemotely(ctx, msg.Message)
case *message.OpenFlow:
+ c.mu.Lock()
if c.handler == nil {
+ // The lock is now taken before this check, so it must be released on
+ // the early return; otherwise c.mu stays held and every later user of
+ // the mutex (e.g. UpdateFlowHandler) blocks forever.
+ c.mu.Unlock()
return NewErrUnexpectedMsg(ctx, "openFlow")
}
- c.mu.Lock()
+ handler := c.handler
f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
f.worker.Release(ctx, int(msg.InitialCounters))
c.toRelease[msg.ID] = defaultBufferSize
c.borrowing[msg.ID] = true
c.mu.Unlock()
- c.handler.HandleFlow(f)
+ handler.HandleFlow(f)
if err := f.q.put(ctx, msg.Payload); err != nil {
return err
}
@@ -351,3 +352,13 @@
_, ok := c.mp.rw.(*flw)
return ok
}
+
+// UpdateFlowHandler replaces the FlowHandler that receives flows opened by
+// the remote end of c.
+//
+// A Conn whose current handler is nil cannot be given a non-nil handler: in
+// that case the handler is left unchanged and an ErrUpdatingNilFlowHandler
+// error is returned. Every other combination (including setting the handler
+// to nil) is accepted and returns nil.
+func (c *Conn) UpdateFlowHandler(ctx *context.T, handler FlowHandler) error {
+ // c.handler is read under c.mu when an OpenFlow message is handled, so
+ // the swap has to happen under the same lock.
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.handler == nil && handler != nil {
+ return NewErrUpdatingNilFlowHandler(ctx)
+ }
+ c.handler = handler
+ return nil
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index c9ba7c0..9c5ce48 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -14,7 +14,9 @@
"v.io/v23"
"v.io/v23/flow"
+ "v.io/v23/rpc/version"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
)
@@ -82,3 +84,57 @@
go doWrite(t, af, randData)
wg.Wait()
}
+
+// TestUpdateFlowHandler checks that, after UpdateFlowHandler is called on the
+// accepting Conn, newly opened flows are delivered to the new handler instead
+// of the one the Conn was created with.
+func TestUpdateFlowHandler(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
+ ctx, shutdown := v23.Init()
+
+ dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
+ versions := version.RPCVersionRange{Min: 3, Max: 5}
+ ep, err := v23.NewEndpoint("localhost:80")
+ if err != nil {
+ t.Fatal(err)
+ }
+ dch, ach := make(chan *Conn), make(chan *Conn)
+ // NOTE(review): fh is defined elsewhere in this package; presumably it
+ // forwards every handled flow to its channel -- confirm against its
+ // HandleFlow implementation.
+ q1, q2 := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
+ fh1, fh2 := fh(q1), fh(q2)
+ // Build the dialed and accepted ends in separate goroutines. Failures
+ // panic because t.Fatal must not be called outside the test goroutine.
+ go func() {
+ d, err := NewDialed(ctx, dmrw, ep, ep, versions, nil)
+ if err != nil {
+ panic(err)
+ }
+ dch <- d
+ }()
+ go func() {
+ a, err := NewAccepted(ctx, amrw, ep, versions, fh1)
+ if err != nil {
+ panic(err)
+ }
+ ach <- a
+ }()
+ d, a := <-dch, <-ach
+ var f flow.Flow
+ if f, err = d.Dial(ctx, flowtest.BlessingsForPeer); err != nil {
+ t.Fatal(err)
+ }
+ // Write a byte to send the openFlow message.
+ if _, err := f.Write([]byte{'a'}); err != nil {
+ t.Fatal(err)
+ }
+ // The flow should be accepted in fh1.
+ <-q1
+ // After updating to fh2 the flow should be accepted in fh2. The update
+ // itself must succeed; a silently failed update would otherwise surface
+ // only as a hang on <-q2 below.
+ if err := a.UpdateFlowHandler(ctx, fh2); err != nil {
+ t.Fatal(err)
+ }
+ if f, err = d.Dial(ctx, flowtest.BlessingsForPeer); err != nil {
+ t.Fatal(err)
+ }
+ // Write a byte to send the openFlow message.
+ if _, err := f.Write([]byte{'a'}); err != nil {
+ t.Fatal(err)
+ }
+ <-q2
+ shutdown()
+ d.Close(ctx, nil)
+ a.Close(ctx, nil)
+}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 7cb60ae..42d092f 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -24,4 +24,5 @@
NoPublicKey() {"en": "No public key was received by the remote end."}
DialingNonServer() {"en": "You are attempting to dial on a connection with no remote server."}
AcceptorBlessingsMissing() {"en": "The acceptor did not send blessings."}
+ UpdatingNilFlowHandler() {"en": "nil flowHandler cannot be updated to non-nil value."}
)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index 16e1438..ea7a30e 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -28,6 +28,7 @@
ErrNoPublicKey = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
ErrDialingNonServer = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
+ ErrUpdatingNilFlowHandler = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UpdatingNilFlowHandler", verror.NoRetry, "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
)
func init() {
@@ -44,6 +45,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNoPublicKey.ID), "{1:}{2:} No public key was received by the remote end.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDialingNonServer.ID), "{1:}{2:} You are attempting to dial on a connection with no remote server.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptorBlessingsMissing.ID), "{1:}{2:} The acceptor did not send blessings.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUpdatingNilFlowHandler.ID), "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
}
// NewErrMissingSetupOption returns an error with the ErrMissingSetupOption ID.
@@ -110,3 +112,8 @@
func NewErrAcceptorBlessingsMissing(ctx *context.T) error {
return verror.New(ErrAcceptorBlessingsMissing, ctx)
}
+
+// NewErrUpdatingNilFlowHandler returns an error with the ErrUpdatingNilFlowHandler ID.
+// The error is created by verror.New from ErrUpdatingNilFlowHandler and ctx; it
+// takes no parameters other than ctx.
+func NewErrUpdatingNilFlowHandler(ctx *context.T) error {
+ return verror.New(ErrUpdatingNilFlowHandler, ctx)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 5849a5c..bc15eb4 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -380,9 +380,6 @@
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
- // TODO(mattr): We should only pass a flowHandler to NewDialed if there
- // is a server attached to this flow manager. Perhaps we can signal
- // "serving flow manager" by passing a 0 RID to non-serving flow managers?
c, err = conn.NewDialed(
ctx,
flowConn,
diff --git a/runtime/internal/rpc/benchmark/benchmark_test.go b/runtime/internal/rpc/benchmark/benchmark_test.go
index 3107ce6..aefc28c 100644
--- a/runtime/internal/rpc/benchmark/benchmark_test.go
+++ b/runtime/internal/rpc/benchmark/benchmark_test.go
@@ -112,7 +112,7 @@
var shutdown v23.Shutdown
ctx, shutdown = test.V23Init()
- ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
+ _, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 0d026b9..d8b8702 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -551,7 +551,6 @@
dec: vom.NewDecoderWithTypeDecoder(flow, typeDec),
discharges: make(map[string]security.Discharge),
}
- // TODO(toddw): Add logic to create separate type flows!
return fs, nil
}
@@ -831,7 +830,7 @@
}
+// Server returns the rpc.Server held in fs.server.
func (fs *xflowServer) Server() rpc.Server {
//nologcall
- return nil // TODO(toddw): Change return to rpc.Server
+ return fs.server
}
func (fs *xflowServer) Timestamp() time.Time {
//nologcall