Merge "vbecome: Unset V23_CREDENTIALS"
diff --git a/runtime/internal/flow/errors.vdl b/runtime/internal/flow/errors.vdl
new file mode 100644
index 0000000..8019811
--- /dev/null
+++ b/runtime/internal/flow/errors.vdl
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flow
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+  WrongObjectInContext(typ string) {"en":
+  "context passed to method of {typ} object, but that object is not attached to the context."}
+)
diff --git a/runtime/internal/flow/errors.vdl.go b/runtime/internal/flow/errors.vdl.go
new file mode 100644
index 0000000..3f4d3b2
--- /dev/null
+++ b/runtime/internal/flow/errors.vdl.go
@@ -0,0 +1,28 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package flow
+
+import (
+	// VDL system imports
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/verror"
+)
+
+var (
+	ErrWrongObjectInContext = verror.Register("v.io/x/ref/runtime/internal/flow.WrongObjectInContext", verror.NoRetry, "{1:}{2:} context passed to method of {3} object, but that object is not attached to the context.")
+)
+
+func init() {
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrWrongObjectInContext.ID), "{1:}{2:} context passed to method of {3} object, but that object is not attached to the context.")
+}
+
+// NewErrWrongObjectInContext returns an error with the ErrWrongObjectInContext ID.
+func NewErrWrongObjectInContext(ctx *context.T, typ string) error {
+	return verror.New(ErrWrongObjectInContext, ctx, typ)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 6320fc3..60fa43e 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -20,6 +20,7 @@
 	"v.io/v23/security"
 	"v.io/v23/verror"
 
+	iflow "v.io/x/ref/runtime/internal/flow"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
 	inaming "v.io/x/ref/runtime/internal/naming"
@@ -87,6 +88,9 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+	if err := m.validateContext(ctx); err != nil {
+		return err
+	}
 	if protocol == inaming.Network {
 		return m.proxyListen(ctx, address)
 	}
@@ -309,7 +313,9 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
-	// TODO(suharshs): Ensure that m is attached to ctx.
+	if err := m.validateContext(ctx); err != nil {
+		return nil, err
+	}
 	item, err := m.q.Get(ctx.Done())
 	switch {
 	case err == upcqueue.ErrQueueIsClosed:
@@ -330,6 +336,9 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+	if err := m.validateContext(ctx); err != nil {
+		return nil, err
+	}
 	var fh conn.FlowHandler
 	if m.rid != naming.NullRoutingID {
 		fh = &flowHandler{q: m.q}
@@ -447,6 +456,13 @@
 	return m.closed
 }
 
+func (m *manager) validateContext(ctx *context.T) error {
+	if v23.ExperimentalGetFlowManager(ctx) != m {
+		return flow.NewErrBadArg(ctx, iflow.NewErrWrongObjectInContext(ctx, "manager"))
+	}
+	return nil
+}
+
 func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
 	if p != nil {
 		var timeout time.Duration
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index cce80f9..02a96d3 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -15,7 +15,7 @@
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 
-	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
@@ -33,12 +33,14 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+	actx := fake.SetFlowManager(ctx, am)
+	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dctx := fake.SetFlowManager(ctx, dm)
 
-	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
 
 	shutdown()
 	<-am.Closed()
@@ -50,24 +52,26 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+	actx := fake.SetFlowManager(ctx, am)
+	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dctx := fake.SetFlowManager(ctx, dm)
 	// At first the cache should be empty.
 	if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing a connection the cache should hold one connection.
-	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	old := dm.(*manager).cache.ridCache[am.RoutingID()]
 	// After dialing another connection the cache should still hold one connection
 	// because the connections should be reused.
-	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Errorf("got cache size %v, want %v", got, want)
 	}
@@ -85,14 +89,16 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+	actx := fake.SetFlowManager(ctx, am)
+	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	dctx := fake.SetFlowManager(ctx, dm)
+	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
 	// Now am should be able to make a flow to dm even though dm is not listening.
-	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
+	testFlows(t, actx, dctx, flowtest.BlessingsForPeer)
 
 	shutdown()
 	<-am.Closed()
@@ -104,17 +110,20 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+	actx := fake.SetFlowManager(ctx, am)
+	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 	nulldm := New(ctx, naming.NullRoutingID)
-	_, af := testFlows(t, ctx, nulldm, am, flowtest.BlessingsForPeer)
+	nctx := fake.SetFlowManager(ctx, nulldm)
+	_, af := testFlows(t, nctx, actx, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
 		t.Errorf("got %v, want zero-value blessings", rBlessings)
 	}
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	dctx := fake.SetFlowManager(ctx, dm)
+	_, af = testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are
 	// non-zero if we did specify a RoutingID.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
@@ -127,20 +136,17 @@
 	<-nulldm.Closed()
 }
 
-func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
-	eps := am.ListeningEndpoints()
-	if len(eps) == 0 {
-		t.Fatalf("no endpoints listened on")
-	}
-	ep := eps[0]
+func testFlows(t *testing.T, dctx, actx *context.T, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
+	am := v23.ExperimentalGetFlowManager(actx)
+	ep := am.ListeningEndpoints()[0]
 	var err error
-	df, err = dm.Dial(ctx, ep, bFn)
+	df, err = v23.ExperimentalGetFlowManager(dctx).Dial(dctx, ep, bFn)
 	if err != nil {
 		t.Fatal(err)
 	}
 	want := "do you read me?"
 	writeLine(df, want)
-	af, err = am.Accept(ctx)
+	af, err = am.Accept(actx)
 	if err != nil {
 		t.Fatal(err)
 	}