ref: Add ExperimentalWithNewFlowManager to the public API.

MultiPart: 1/2
Change-Id: I06b03e5cee1ec850ddd463dfd6332e988aa2e0c3
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 85e57a2..7a561f3 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -48,3 +48,8 @@
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	panic("unimplemented")
 }
+
+func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	panic("unimplemented")
+}
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index dac4bed..2deb20a 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -176,7 +176,7 @@
 	// Add the flow.Manager to the context.
 	// TODO(suharshs): Once the client and server use the flow.Manager we will need
 	// manage those dependencies (exactly as we are doing with stream.Manager)
-	ctx, err = r.setNewFlowManager(ctx)
+	ctx, _, err = r.setNewFlowManager(ctx)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -315,13 +315,13 @@
 	return manager.New(ctx, rid), nil
 }
 
-func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, error) {
+func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
 	fm, err := newFlowManager(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	newctx := context.WithValue(ctx, flowManagerKey, fm)
-	return newctx, err
+	return newctx, fm, nil
 }
 
 func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
@@ -378,7 +378,7 @@
 	if newctx, err = r.setNewStreamManager(newctx); err != nil {
 		return ctx, err
 	}
-	if newctx, err = r.setNewFlowManager(newctx); err != nil {
+	if newctx, _, err = r.setNewFlowManager(newctx); err != nil {
 		return ctx, err
 	}
 	if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
@@ -527,3 +527,17 @@
 	}
 	return nil
 }
+
+func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	newctx, m, err := r.setNewFlowManager(ctx)
+	if err != nil {
+		return ctx, nil, err
+	}
+	// Create a new client since it depends on the flow manager.
+	newctx, _, err = r.WithNewClient(newctx)
+	if err != nil {
+		return ctx, nil, err
+	}
+	return newctx, m, nil
+}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index a8753fe..9b7f6cd 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -148,7 +148,7 @@
 	debugDisp := r.GetReservedNameDispatcher(nctx)
 
 	if debugDisp != newDebugDisp || debugDisp == oldDebugDisp {
-		t.Error("SetNewDebugDispatcher didn't update the context properly")
+		t.Error("WithNewDebugDispatcher didn't update the context properly")
 	}
 
 }
@@ -157,8 +157,19 @@
 	r, ctx, shutdown := initForTest(t)
 	defer shutdown()
 
-	if man := r.ExperimentalGetFlowManager(ctx); man == nil {
+	oldman := r.ExperimentalGetFlowManager(ctx)
+	if oldman == nil {
 		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
 	}
-
+	newctx, newman, err := r.ExperimentalWithNewFlowManager(ctx)
+	if err != nil || newman == nil || newman == oldman {
+		t.Fatalf("Could not create flow manager: %v", err)
+	}
+	if !newctx.Initialized() {
+		t.Fatal("Got uninitialized context.")
+	}
+	man := r.ExperimentalGetFlowManager(newctx)
+	if man != newman || man == oldman {
+		t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
+	}
 }