Merge "runtime/internal/rpc: Make ref package tests pass with XServers state."
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ecbd8a9..bbe5862 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -47,7 +47,6 @@
 type Conn struct {
 	fc                     *flowcontrol.FlowController
 	mp                     *messagePipe
-	handler                FlowHandler
 	version                version.RPCVersion
 	lBlessings, rBlessings security.Blessings
 	local, remote          naming.Endpoint
@@ -56,6 +55,7 @@
 	loopWG                 sync.WaitGroup
 
 	mu             sync.Mutex
+	handler        FlowHandler
 	nextFid        uint64
 	flows          map[uint64]*flw
 	dischargeTimer *time.Timer
@@ -263,16 +263,17 @@
 		return NewErrConnClosedRemotely(ctx, msg.Message)
 
 	case *message.OpenFlow:
+		c.mu.Lock()
 		if c.handler == nil {
 			return NewErrUnexpectedMsg(ctx, "openFlow")
 		}
-		c.mu.Lock()
+		handler := c.handler
 		f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
 		f.worker.Release(ctx, int(msg.InitialCounters))
 		c.toRelease[msg.ID] = defaultBufferSize
 		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
-		c.handler.HandleFlow(f)
+		handler.HandleFlow(f)
 		if err := f.q.put(ctx, msg.Payload); err != nil {
 			return err
 		}
@@ -351,3 +352,13 @@
 	_, ok := c.mp.rw.(*flw)
 	return ok
 }
+
+func (c *Conn) UpdateFlowHandler(ctx *context.T, handler FlowHandler) error {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	if c.handler == nil && handler != nil {
+		return NewErrUpdatingNilFlowHandler(ctx)
+	}
+	c.handler = handler
+	return nil
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index c9ba7c0..9c5ce48 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -14,7 +14,9 @@
 
 	"v.io/v23"
 	"v.io/v23/flow"
+	"v.io/v23/rpc/version"
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
 	"v.io/x/ref/test/goroutines"
 )
@@ -82,3 +84,57 @@
 	go doWrite(t, af, randData)
 	wg.Wait()
 }
+
+func TestUpdateFlowHandler(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
+	ctx, shutdown := v23.Init()
+
+	dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
+	versions := version.RPCVersionRange{Min: 3, Max: 5}
+	ep, err := v23.NewEndpoint("localhost:80")
+	if err != nil {
+		t.Fatal(err)
+	}
+	dch, ach := make(chan *Conn), make(chan *Conn)
+	q1, q2 := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
+	fh1, fh2 := fh(q1), fh(q2)
+	go func() {
+		d, err := NewDialed(ctx, dmrw, ep, ep, versions, nil)
+		if err != nil {
+			panic(err)
+		}
+		dch <- d
+	}()
+	go func() {
+		a, err := NewAccepted(ctx, amrw, ep, versions, fh1)
+		if err != nil {
+			panic(err)
+		}
+		ach <- a
+	}()
+	d, a := <-dch, <-ach
+	var f flow.Flow
+	if f, err = d.Dial(ctx, flowtest.BlessingsForPeer); err != nil {
+		t.Fatal(err)
+	}
+	// Write a byte to send the openFlow message.
+	if _, err := f.Write([]byte{'a'}); err != nil {
+		t.Fatal(err)
+	}
+	// The flow should be accepted in fh1.
+	<-q1
+	// After updating to fh2 the flow should be accepted in fh2.
+	a.UpdateFlowHandler(ctx, fh2)
+	if f, err = d.Dial(ctx, flowtest.BlessingsForPeer); err != nil {
+		t.Fatal(err)
+	}
+	// Write a byte to send the openFlow message.
+	if _, err := f.Write([]byte{'a'}); err != nil {
+		t.Fatal(err)
+	}
+	<-q2
+	shutdown()
+	d.Close(ctx, nil)
+	a.Close(ctx, nil)
+}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 7cb60ae..42d092f 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -24,4 +24,5 @@
   NoPublicKey() {"en": "No public key was received by the remote end."}
   DialingNonServer() {"en": "You are attempting to dial on a connection with no remote server."}
   AcceptorBlessingsMissing() {"en": "The acceptor did not send blessings."}
+  UpdatingNilFlowHandler() {"en": "nil flowHandler cannot be updated to non-nil value."}
 )
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index 16e1438..ea7a30e 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -28,6 +28,7 @@
 	ErrNoPublicKey              = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
 	ErrDialingNonServer         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
 	ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
+	ErrUpdatingNilFlowHandler   = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UpdatingNilFlowHandler", verror.NoRetry, "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
 )
 
 func init() {
@@ -44,6 +45,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNoPublicKey.ID), "{1:}{2:} No public key was received by the remote end.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDialingNonServer.ID), "{1:}{2:} You are attempting to dial on a connection with no remote server.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptorBlessingsMissing.ID), "{1:}{2:} The acceptor did not send blessings.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUpdatingNilFlowHandler.ID), "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
 }
 
 // NewErrMissingSetupOption returns an error with the ErrMissingSetupOption ID.
@@ -110,3 +112,8 @@
 func NewErrAcceptorBlessingsMissing(ctx *context.T) error {
 	return verror.New(ErrAcceptorBlessingsMissing, ctx)
 }
+
+// NewErrUpdatingNilFlowHandler returns an error with the ErrUpdatingNilFlowHandler ID.
+func NewErrUpdatingNilFlowHandler(ctx *context.T) error {
+	return verror.New(ErrUpdatingNilFlowHandler, ctx)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 5849a5c..bc15eb4 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -380,9 +380,6 @@
 		if err != nil {
 			return nil, flow.NewErrDialFailed(ctx, err)
 		}
-		// TODO(mattr): We should only pass a flowHandler to NewDialed if there
-		// is a server attached to this flow manager.  Perhaps we can signal
-		// "serving flow manager" by passing a 0 RID to non-serving flow managers?
 		c, err = conn.NewDialed(
 			ctx,
 			flowConn,
diff --git a/runtime/internal/rpc/benchmark/benchmark_test.go b/runtime/internal/rpc/benchmark/benchmark_test.go
index 3107ce6..aefc28c 100644
--- a/runtime/internal/rpc/benchmark/benchmark_test.go
+++ b/runtime/internal/rpc/benchmark/benchmark_test.go
@@ -112,7 +112,7 @@
 	var shutdown v23.Shutdown
 	ctx, shutdown = test.V23Init()
 
-	ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
+	_, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
 	if err != nil {
 		ctx.Fatalf("NewServer failed: %v", err)
 	}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 0d026b9..d8b8702 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -551,7 +551,6 @@
 		dec:        vom.NewDecoderWithTypeDecoder(flow, typeDec),
 		discharges: make(map[string]security.Discharge),
 	}
-	// TODO(toddw): Add logic to create separate type flows!
 	return fs, nil
 }
 
@@ -831,7 +830,7 @@
 }
 func (fs *xflowServer) Server() rpc.Server {
 	//nologcall
-	return nil // TODO(toddw): Change return to rpc.Server
+	return fs.server
 }
 func (fs *xflowServer) Timestamp() time.Time {
 	//nologcall