blob: 02a96d3d2c5f3a54359c911132d7f15cf847172f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"bufio"
"strings"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
)
// init runs test.Init once when the test binary's package is initialized,
// i.e. before any of the Test functions in this file execute.
func init() {
test.Init()
}
// leakWaitTime is the duration handed to goroutines.NoLeaks by every test in
// this file.
// NOTE(review): presumably the grace period the leak checker allows for
// goroutines to exit before reporting them — confirm against the goroutines
// package.
const leakWaitTime = 250 * time.Millisecond
// TestDirectConnection starts one flow manager listening on a local TCP
// address and checks, via testFlows, that a second manager can dial it and
// exchange data in both directions on the resulting flow.
func TestDirectConnection(t *testing.T) {
	checkLeaks := goroutines.NoLeaks(t, leakWaitTime)
	defer checkLeaks()

	ctx, shutdown := v23.Init()

	// Accepting side: listens on an OS-assigned loopback port.
	acceptor := New(ctx, naming.FixedRoutingID(0x5555))
	acceptCtx := fake.SetFlowManager(ctx, acceptor)
	err := acceptor.Listen(acceptCtx, "tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}

	// Dialing side: never listens itself.
	dialer := New(ctx, naming.FixedRoutingID(0x1111))
	dialCtx := fake.SetFlowManager(ctx, dialer)

	testFlows(t, dialCtx, acceptCtx, flowtest.BlessingsForPeer)

	// Shut the runtime down and wait until both managers report closed.
	shutdown()
	<-acceptor.Closed()
	<-dialer.Closed()
}
// TestDialCachedConn checks that the dialing manager's connection cache starts
// empty, holds exactly one entry after the first dial, and still holds that
// same single entry (same cached value for the acceptor's routing ID) after a
// second dial to the same endpoint.
func TestDialCachedConn(t *testing.T) {
	checkLeaks := goroutines.NoLeaks(t, leakWaitTime)
	defer checkLeaks()

	ctx, shutdown := v23.Init()

	acceptor := New(ctx, naming.FixedRoutingID(0x5555))
	acceptCtx := fake.SetFlowManager(ctx, acceptor)
	err := acceptor.Listen(acceptCtx, "tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}

	dialer := New(ctx, naming.FixedRoutingID(0x1111))
	dialCtx := fake.SetFlowManager(ctx, dialer)

	// The cache is only reachable through the concrete manager type.
	impl := dialer.(*manager)

	// Nothing has been dialed yet, so no connection may be cached.
	if size := len(impl.cache.addrCache); size != 0 {
		t.Fatalf("got cache size %v, want %v", size, 0)
	}

	// The first dial must populate the cache with exactly one connection.
	testFlows(t, dialCtx, acceptCtx, flowtest.BlessingsForPeer)
	if size := len(impl.cache.addrCache); size != 1 {
		t.Fatalf("got cache size %v, want %v", size, 1)
	}
	first := impl.cache.ridCache[acceptor.RoutingID()]

	// A second dial must reuse the cached connection rather than add a new
	// one: the size stays at one and the cached entry is unchanged.
	testFlows(t, dialCtx, acceptCtx, flowtest.BlessingsForPeer)
	if size := len(impl.cache.addrCache); size != 1 {
		t.Errorf("got cache size %v, want %v", size, 1)
	}
	current := impl.cache.ridCache[acceptor.RoutingID()]
	if current != first {
		t.Errorf("got %v want %v", current, first)
	}

	shutdown()
	<-acceptor.Closed()
	<-dialer.Closed()
}
// TestBidirectionalListeningEndpoint first has a non-listening manager dial a
// listening one, then runs testFlows in the opposite direction to check that
// the listening manager can open a flow back to the manager that never called
// Listen.
func TestBidirectionalListeningEndpoint(t *testing.T) {
	checkLeaks := goroutines.NoLeaks(t, leakWaitTime)
	defer checkLeaks()

	ctx, shutdown := v23.Init()

	listener := New(ctx, naming.FixedRoutingID(0x5555))
	listenerCtx := fake.SetFlowManager(ctx, listener)
	err := listener.Listen(listenerCtx, "tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}

	peer := New(ctx, naming.FixedRoutingID(0x1111))
	peerCtx := fake.SetFlowManager(ctx, peer)

	// Forward direction: peer dials listener.
	testFlows(t, peerCtx, listenerCtx, flowtest.BlessingsForPeer)

	// Reverse direction: listener dials peer, although peer is not listening.
	testFlows(t, listenerCtx, peerCtx, flowtest.BlessingsForPeer)

	shutdown()
	<-listener.Closed()
	<-peer.Closed()
}
// TestNullClientBlessings checks the remote blessings seen by the accepting
// side: they must be zero when the dialing manager was created with
// naming.NullRoutingID, and non-zero when it was created with a fixed routing
// ID.
func TestNullClientBlessings(t *testing.T) {
	checkLeaks := goroutines.NoLeaks(t, leakWaitTime)
	defer checkLeaks()

	ctx, shutdown := v23.Init()

	acceptor := New(ctx, naming.FixedRoutingID(0x5555))
	acceptCtx := fake.SetFlowManager(ctx, acceptor)
	err := acceptor.Listen(acceptCtx, "tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}

	// Case 1: dialer with the null routing ID; the accepted flow's underlying
	// conn must report zero-value remote blessings.
	nullDialer := New(ctx, naming.NullRoutingID)
	nullCtx := fake.SetFlowManager(ctx, nullDialer)
	_, accepted := testFlows(t, nullCtx, acceptCtx, flowtest.BlessingsForPeer)
	fromNull := accepted.Conn().(*conn.Conn).RemoteBlessings()
	if !fromNull.IsZero() {
		t.Errorf("got %v, want zero-value blessings", fromNull)
	}

	// Case 2: dialer with a fixed routing ID; the accepted flow's underlying
	// conn must report non-zero remote blessings.
	dialer := New(ctx, naming.FixedRoutingID(0x1111))
	dialCtx := fake.SetFlowManager(ctx, dialer)
	_, accepted = testFlows(t, dialCtx, acceptCtx, flowtest.BlessingsForPeer)
	fromFixed := accepted.Conn().(*conn.Conn).RemoteBlessings()
	if fromFixed.IsZero() {
		t.Errorf("got %v, want non-zero blessings", fromFixed)
	}

	shutdown()
	<-acceptor.Closed()
	<-dialer.Closed()
	<-nullDialer.Closed()
}
// testFlows opens a flow from the manager attached to dctx to the first
// listening endpoint of the manager attached to actx, then verifies a
// round trip: a line written on the dialed flow is read on the accepted flow,
// and a reply written on the accepted flow is read on the dialed flow.
//
// bFn is passed through to Dial unchanged. The dialed flow (df) and the
// accepted flow (af) are returned so callers can inspect them further.
//
// Failures that make the rest of the exchange meaningless (no endpoint, Dial,
// the initial write, Accept) abort the test with t.Fatal; read/compare
// failures and the reply write are reported with t.Error so the remaining
// checks still run.
func testFlows(t *testing.T, dctx, actx *context.T, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
	am := v23.ExperimentalGetFlowManager(actx)

	// Guard against an empty endpoint list so the test fails with a message
	// instead of panicking on an out-of-range index.
	eps := am.ListeningEndpoints()
	if len(eps) == 0 {
		t.Fatal("accepting flow manager has no listening endpoints")
	}

	var err error
	df, err = v23.ExperimentalGetFlowManager(dctx).Dial(dctx, eps[0], bFn)
	if err != nil {
		t.Fatal(err)
	}

	// Dialer -> acceptor.
	want := "do you read me?"
	// This write error used to be silently dropped. It is fatal because
	// continuing would call Accept without the request having been sent.
	// NOTE(review): presumably Accept could then block indefinitely — confirm.
	if err := writeLine(df, want); err != nil {
		t.Fatal(err)
	}
	af, err = am.Accept(actx)
	if err != nil {
		t.Fatal(err)
	}
	got, err := readLine(af)
	if err != nil {
		t.Error(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}

	// Acceptor -> dialer.
	want = "i read you"
	if err := writeLine(af, want); err != nil {
		t.Error(err)
	}
	got, err = readLine(df)
	if err != nil {
		t.Error(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
	return df, af
}
// readLine reads from f up to and including the first '\n' and returns the
// text with trailing newline characters removed, together with any error
// reported by the read. On error the partial text read so far is still
// returned.
//
// A fresh bufio.Reader is created per call and then dropped, so any bytes it
// may have buffered past the newline are not available to later reads of f.
func readLine(f flow.Flow) (string, error) {
	reader := bufio.NewReader(f)
	raw, err := reader.ReadString('\n')
	line := strings.TrimRight(raw, "\n")
	return line, err
}
// writeLine sends data followed by a single '\n' to f in one Write call and
// returns that call's error, if any. The number of bytes written is ignored.
func writeLine(f flow.Flow, data string) error {
	line := []byte(data + "\n")
	_, err := f.Write(line)
	return err
}