// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"reflect"
"strconv"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
connpackage "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
_ "v.io/x/ref/runtime/protocols/local"
"v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
)
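// init registers a "resolve" protocol that resolves to a fixed local address,
// and a "wrong" protocol that is simply an alias for "local".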
func init() {
flow.RegisterProtocol("resolve", &resolveProtocol{
protocol: "local",
addresses: []string{"address"},
})
p, _ := flow.RegisteredProtocol("local")
flow.RegisterProtocol("wrong", p)
}
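// nextrid supplies a distinct routing ID for each endpoint created by makeEPs.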
var nextrid uint64 = 100
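// makeEPs returns an endpoint for addr on the "local" protocol, blessed with
// the union of "A", "B" and "C", along with variants of it: nullep drops the
// routing ID, wprotoep and waddrep have a wrong protocol or address, resolvep
// uses the "resolve" protocol with a wrong address, and waddrprotoep keeps the
// routing ID but has both a wrong protocol and a wrong address.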
func makeEPs(ctx *context.T, addr string) (ep, nullep, wprotoep, waddrep, waddrprotoep, resolvep naming.Endpoint) {
ep = naming.Endpoint{
Protocol: "local",
Address: addr,
RoutingID: naming.FixedRoutingID(nextrid),
}.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
nextrid++
nullep = ep
nullep.RoutingID = naming.NullRoutingID
wprotoep = nullep
wprotoep.Protocol = "wrong"
waddrep = nullep
waddrep.Address = "wrong"
resolvep = nullep
resolvep.Protocol = "resolve"
resolvep.Address = "wrong"
waddrprotoep = ep
waddrprotoep.Protocol = "wrong"
waddrprotoep.Address = "wrong"
return
}
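// modep returns a copy of ep with the named field set to value via reflection.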
func modep(ep naming.Endpoint, field string, value interface{}) naming.Endpoint {
	reflect.ValueOf(&ep).Elem().FieldByName(field).Set(reflect.ValueOf(value))
return ep
}
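// makeEP builds an endpoint with the given protocol, address, routing ID
// (0 yields the null routing ID) and blessing names.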
func makeEP(ctx *context.T, protocol, address string, rid uint64, blessings ...string) naming.Endpoint {
routingID := naming.NullRoutingID
if rid != 0 {
routingID = naming.FixedRoutingID(rid)
}
return naming.Endpoint{
Protocol: protocol,
Address: address,
RoutingID: routingID,
}.WithBlessingNames(blessings)
}
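// TestCacheReserve verifies that Reserve hands out at most one reservation per
// address and routing ID, that a proxy reservation exposes the proxy conn once
// it has been inserted, and that no further reservations are handed out after
// Unreserve inserts the conns.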
func TestCacheReserve(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
c := NewConnCache(0)
defer c.Close(ctx)
nullep := makeEP(ctx, "local", "a1", 0, "b1")
oneep := makeEP(ctx, "local", "a1", 1, "b1")
twoep := makeEP(ctx, "local", "a1", 2, "b1")
proxyep := makeEP(ctx, "local", "a1", 3, "b1")
r1 := c.Reserve(ctx, oneep)
if r1 == nil {
t.Error("expected non-nil reservation.")
}
// That will have reserved both the address and the path.
// A second reservation should return nil.
if nr := c.Reserve(ctx, oneep); nr != nil {
t.Errorf("got %v, want nil", nr)
}
if nr := c.Reserve(ctx, nullep); nr != nil {
t.Errorf("got %v, want nil", nr)
}
	// Reserving the proxy now will return a reservation for only the
	// proxy's RID, since r1 already holds the reservation for the address.
pr := c.Reserve(ctx, proxyep)
if pr == nil {
t.Errorf("epected non-nil reservation")
}
	// Reserving a different RID, but the same address, will result in a path
	// reservation.
r2 := c.Reserve(ctx, twoep)
if r2 == nil {
t.Errorf("expected non-nil reservation.")
}
	// Since r1 has reserved the proxy address, its ProxyConn should return nil.
if pc := r1.ProxyConn(); pc != nil {
t.Errorf("got %v, expected nil.", pc)
}
proxycaf := makeConnAndFlow(t, ctx, proxyep)
defer proxycaf.stop(ctx)
onecaf := makeConnAndFlow(t, ctx, oneep)
defer onecaf.stop(ctx)
r1.Unreserve(onecaf.c, proxycaf.c, nil)
// Now, asking for the ProxyConn on the proxy reservation should find
// the proxy, since we just inserted it.
if pc := pr.ProxyConn().(*connpackage.Conn); pc != proxycaf.c {
t.Errorf("got %v, expected %v", pc, proxycaf.c)
}
// Now that the conn exists, we should not get another reservation.
if nr := c.Reserve(ctx, oneep); nr != nil {
t.Errorf("got %v, want nil", nr)
}
if nr := c.Reserve(ctx, nullep); nr != nil {
t.Errorf("got %v, want nil", nr)
}
// Note that the context should not have been canceled.
if err := r1.Context().Err(); err != nil {
t.Errorf("got %v want nil", err)
}
	pc := r2.ProxyConn()
	if pc == nil {
		t.Fatalf("got nil, want a proxy conn for %v", proxyep)
	}
	if pc.RemoteEndpoint().RoutingID != proxyep.RoutingID {
		t.Fatalf("got %v, want %v", pc.RemoteEndpoint(), proxyep)
	}
twocaf := makeConnAndFlow(t, ctx, twoep)
defer twocaf.stop(ctx)
r2.Unreserve(twocaf.c, pc, nil)
}
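// TestCacheFind verifies that cached conns can be found by address, routing ID,
// resolved protocol and blessing names, that proxied and RoutingID-only inserts
// are matched appropriately, and that Close closes every cached conn.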
func TestCacheFind(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
c := NewConnCache(0)
ep, nullep, wprotoep, waddrep, waddrprotoep, resolvep := makeEPs(ctx, "address")
auth := flowtest.NewPeerAuthorizer(ep.BlessingNames())
caf := makeConnAndFlow(t, ctx, ep)
defer caf.stop(ctx)
conn := caf.c
if err := c.Insert(conn, false); err != nil {
t.Fatal(err)
}
// We should be able to find the conn in the cache.
if got, _, _, err := c.Find(ctx, nullep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
// Changing the protocol should fail.
if got, _, _, err := c.Find(ctx, wprotoep, auth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
// Changing the address should fail.
if got, _, _, err := c.Find(ctx, waddrep, auth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
// Changing the blessingNames should fail.
if got, _, _, err := c.Find(ctx, ep, flowtest.NewPeerAuthorizer([]string{"wrong"})); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
	// But finding with a set of blessings that has at least one blessing in
	// common with the remote blessings should succeed.
if got, _, _, err := c.Find(ctx, ep, flowtest.NewPeerAuthorizer([]string{"foo", ep.BlessingNames()[0]})); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
// Finding by routing ID should work.
if got, _, _, err := c.Find(ctx, waddrprotoep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
// Finding by a valid resolve protocol and address should work.
if got, _, _, err := c.Find(ctx, resolvep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
// Caching a proxied connection should not care about endpoint blessings, since the
// blessings only correspond to the end server.
proxyep, nullProxyep, _, _, _, _ := makeEPs(ctx, "proxy")
caf = makeConnAndFlow(t, ctx, proxyep)
defer caf.stop(ctx)
proxyConn := caf.c
if err := c.Insert(proxyConn, true); err != nil {
t.Fatal(err)
}
	// Wrong blessingNames should still find the proxied conn.
if got, _, _, err := c.Find(ctx, nullProxyep, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != proxyConn {
t.Errorf("got %v, want %v, err: %v", got, proxyConn, err)
}
ridep, nullridep, _, _, waddrprotoridep, _ := makeEPs(ctx, "rid")
// Caching with InsertWithRoutingID should only cache by RoutingID, not with network/address.
ridauth := flowtest.NewPeerAuthorizer(ridep.BlessingNames())
caf = makeConnAndFlow(t, ctx, ridep)
defer caf.stop(ctx)
ridConn := caf.c
if err := c.InsertWithRoutingID(ridConn, false); err != nil {
t.Fatal(err)
}
if got, _, _, err := c.Find(ctx, nullridep, ridauth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
// Finding by routing ID should work.
if got, _, _, err := c.Find(ctx, waddrprotoridep, ridauth); err != nil || got != ridConn {
t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
}
// Insert a duplicate conn to ensure that replaced conns still get closed.
caf = makeConnAndFlow(t, ctx, ep)
defer caf.stop(ctx)
dupConn := caf.c
if err := c.Insert(dupConn, false); err != nil {
t.Fatal(err)
}
// Closing the cache should close all the connections in the cache.
// Ensure that the conns are not closed yet.
if status := conn.Status(); status == connpackage.Closed {
t.Fatal("wanted conn to not be closed")
}
if status := dupConn.Status(); status == connpackage.Closed {
t.Fatal("wanted dupConn to not be closed")
}
c.Close(ctx)
<-conn.Closed()
<-ridConn.Closed()
<-dupConn.Closed()
<-proxyConn.Closed()
}
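// TestLRU verifies that KillConnections closes the least recently used conns
// and that reading from or writing to a conn marks it as recently used.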
func TestLRU(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
// Ensure that the least recently created conns are killed by KillConnections.
c := NewConnCache(0)
defer c.Close(ctx)
conns, stop := nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
// conns[3:] should not be closed and still in the cache.
// conns[:3] should be closed and removed from the cache.
for _, conn := range conns[3:] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[:3] {
<-conn.c.Closed()
if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
// Ensure that writing to conns marks conns as more recently used.
c = NewConnCache(0)
defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
for _, conn := range conns[:7] {
conn.write()
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
// Ensure that reading from conns marks conns as more recently used.
c = NewConnCache(0)
defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
for _, conn := range conns[:7] {
conn.read()
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
}
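// TestIdleConns verifies that KillConnections lameducks and then closes idle
// conns, while conns with a long idle timeout or with live flows stay cached.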
func TestIdleConns(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
// Ensure that idle conns are killed by the cache.
// Set the idle timeout very low to ensure all the connections are closed.
c := NewConnCache(1)
defer c.Close(ctx)
conns, stop := nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
// close the flows so the conns aren't kept alive due to ongoing flows.
conn.f.Close()
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
time.Sleep(2 * time.Millisecond)
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
// All connections should be lameducked.
for _, conn := range conns {
if status := conn.c.Status(); status < connpackage.EnteringLameDuck {
t.Errorf("conn %v should have been lameducked", conn)
}
}
	// Once the lameduck has been acknowledged, KillConnections should actually
	// close the connections.
for _, conn := range conns {
<-conn.c.EnterLameDuck(ctx)
}
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
// All connections should be removed from the cache.
for _, conn := range conns {
<-conn.c.Closed()
if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
// Set the idle timeout very high to ensure none of the connections are closed.
c = NewConnCache(time.Hour)
defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
// close the flows so the conns aren't kept alive due to ongoing flows.
conn.f.Close()
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
for _, conn := range conns {
if status := conn.c.Status(); status >= connpackage.EnteringLameDuck {
t.Errorf("conn %v should not have been closed or lameducked", conn)
}
if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
	// Ensure that live flows on the conns keep them alive even with a low idle
	// timeout.
c = NewConnCache(1)
defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
// don't close the flows so that the conns are kept alive.
if err := c.Insert(conn.c, false); err != nil {
t.Fatal(err)
}
}
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
for _, conn := range conns {
if status := conn.c.Status(); status >= connpackage.LameDuckAcknowledged {
t.Errorf("conn %v should not have been closed or lameducked", conn)
}
if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
}
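// TestMultiRTTConns verifies that when several conns to the same endpoint are
// cached, Find returns the one with the lowest RTT.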
func TestMultiRTTConns(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
c := NewConnCache(0)
defer c.Close(ctx)
remote, _, _, _, _, _ := makeEPs(ctx, "normal")
auth := flowtest.NewPeerAuthorizer(remote.BlessingNames())
slow, med, fast := 3*time.Millisecond, 2*time.Millisecond, 1*time.Millisecond
// Add a slow connection into the cache and ensure it is found.
slowConn := newRTTConn(ctx, remote, slow)
if err := c.Insert(slowConn, false); err != nil {
t.Fatal(err)
}
if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != slowConn {
t.Errorf("got %v, want %v, err: %v", got, slowConn, err)
}
// Add a fast connection into the cache and ensure it is found over the slow one.
fastConn := newRTTConn(ctx, remote, fast)
if err := c.Insert(fastConn, false); err != nil {
t.Fatal(err)
}
if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != fastConn {
t.Errorf("got %v, want %v, err: %v", got, fastConn, err)
}
// Add a med connection into the cache and ensure that the fast one is still found.
medConn := newRTTConn(ctx, remote, med)
if err := c.Insert(medConn, false); err != nil {
t.Fatal(err)
}
if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != fastConn {
t.Errorf("got %v, want %v, err: %v", got, fastConn, err)
}
// Kill the fast connection and ensure the med connection is found.
fastConn.Close(ctx, nil)
if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != medConn {
t.Errorf("got %v, want %v, err: %v", got, medConn, err)
}
}
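// newRTTConn returns a fake conn that reports a fixed RTT.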
func newRTTConn(ctx *context.T, ep naming.Endpoint, rtt time.Duration) *rttConn {
return &rttConn{
ctx: ctx,
ep: ep,
rtt: rtt,
closed: make(chan struct{}),
}
}
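// rttConn is a stub conn used to test RTT-based selection; it reports a fixed
// RTT and only tracks whether it has been closed.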
type rttConn struct {
ctx *context.T
ep naming.Endpoint
rtt time.Duration
closed chan struct{}
}
func (c *rttConn) Status() connpackage.Status {
select {
case <-c.closed:
return connpackage.Closed
default:
return connpackage.Active
}
}
func (c *rttConn) IsEncapsulated() bool { return false }
func (c *rttConn) IsIdle(*context.T, time.Duration) bool { return false }
func (c *rttConn) EnterLameDuck(*context.T) chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}
func (c *rttConn) RemoteLameDuck() bool { return false }
func (c *rttConn) CloseIfIdle(*context.T, time.Duration) bool { return false }
func (c *rttConn) Close(*context.T, error) {
select {
case <-c.closed:
default:
close(c.closed)
}
}
func (c *rttConn) RemoteEndpoint() naming.Endpoint { return c.ep }
func (c *rttConn) LocalEndpoint() naming.Endpoint { return c.ep }
func (c *rttConn) RemoteBlessings() security.Blessings {
b, _ := v23.GetPrincipal(c.ctx).BlessingStore().Default()
return b
}
func (c *rttConn) RemoteDischarges() map[string]security.Discharge { return nil }
func (c *rttConn) RTT() time.Duration { return c.rtt }
func (c *rttConn) LastUsed() time.Time { return time.Now() }
func (c *rttConn) DebugString() string { return "" }
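// isInCache reports whether conn can still be found in the cache via its
// remote endpoint.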
func isInCache(ctx *context.T, c *ConnCache, conn *connpackage.Conn) bool {
rep := conn.RemoteEndpoint()
_, _, _, err := c.Find(ctx, rep, flowtest.NewPeerAuthorizer(rep.BlessingNames()))
return err == nil
}
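// connAndFlow bundles a dialed conn, the corresponding accepted conn, and a
// flow opened between them.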
type connAndFlow struct {
c *connpackage.Conn
a *connpackage.Conn
f flow.Flow
}
func (c connAndFlow) write() {
_, err := c.f.WriteMsg([]byte{0})
if err != nil {
panic(err)
}
}
func (c connAndFlow) read() {
_, err := c.f.ReadMsg()
if err != nil {
panic(err)
}
}
func (c connAndFlow) stop(ctx *context.T) {
c.c.Close(ctx, nil)
c.a.Close(ctx, nil)
}
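// nConnAndFlows creates n conn/flow pairs with distinct routing IDs and returns
// them along with a function that closes them all.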
func nConnAndFlows(t *testing.T, ctx *context.T, n int) ([]connAndFlow, func()) {
cfs := make([]connAndFlow, n)
for i := 0; i < n; i++ {
cfs[i] = makeConnAndFlow(t, ctx, naming.Endpoint{
Protocol: "local",
Address: strconv.Itoa(i),
RoutingID: naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
})
}
return cfs, func() {
for _, conn := range cfs {
conn.stop(ctx)
}
}
}
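// makeConnAndFlow dials and accepts a conn pair over an in-memory pipe, opens
// a flow from the dialed side, and waits for the accept side to handle it.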
func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
dmrw, amrw := flowtest.Pipe(t, ctx, "local", "")
dch := make(chan *connpackage.Conn)
ach := make(chan *connpackage.Conn)
go func() {
d, _, _, err := connpackage.NewDialed(ctx, dmrw, ep, ep,
version.RPCVersionRange{Min: 1, Max: 5},
flowtest.AllowAllPeersAuthorizer{},
time.Minute, 0, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
dch <- d
}()
fh := fh{t, make(chan struct{})}
go func() {
a, err := connpackage.NewAccepted(ctx, nil, amrw, ep,
version.RPCVersionRange{Min: 1, Max: 5}, time.Minute, 0, fh)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ach <- a
}()
conn := <-dch
aconn := <-ach
f, err := conn.Dial(ctx, conn.LocalBlessings(), nil, conn.RemoteEndpoint(), 0, false)
if err != nil {
t.Fatal(err)
}
// Write a byte to send the openFlow message.
if _, err := f.Write([]byte{0}); err != nil {
t.Fatal(err)
}
<-fh.ch
return connAndFlow{conn, aconn, f}
}
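// fh is a flow handler that writes a byte back on the accepted flow and closes
// ch to signal that the flow arrived.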
type fh struct {
t *testing.T
ch chan struct{}
}
func (h fh) HandleFlow(f flow.Flow) error {
go func() {
if _, err := f.WriteMsg([]byte{0}); err != nil {
h.t.Errorf("failed to write: %v", err)
}
close(h.ch)
}()
return nil
}
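// unionBlessing self-blesses the principal with each name, unions the
// blessings, adds the union to the principal's roots, sets it as the default,
// and returns the resulting blessing names.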
func unionBlessing(ctx *context.T, names ...string) []string {
principal := v23.GetPrincipal(ctx)
blessings := make([]security.Blessings, len(names))
for i, name := range names {
var err error
if blessings[i], err = principal.BlessSelf(name); err != nil {
panic(err)
}
}
union, err := security.UnionOfBlessings(blessings...)
if err != nil {
panic(err)
}
if err := security.AddToRoots(principal, union); err != nil {
panic(err)
}
if err := principal.BlessingStore().SetDefault(union); err != nil {
panic(err)
}
return security.BlessingNames(principal, union)
}
// resolveProtocol returns a fixed protocol and addresses for its Resolve function.
type resolveProtocol struct {
protocol string
addresses []string
}
func (p *resolveProtocol) Resolve(_ *context.T, _, _ string) (string, []string, error) {
return p.protocol, p.addresses, nil
}
func (*resolveProtocol) Dial(_ *context.T, _, _ string, _ time.Duration) (flow.Conn, error) {
return nil, nil
}
func (*resolveProtocol) Listen(_ *context.T, _, _ string) (flow.Listener, error) {
return nil, nil
}