blob: 718670f5a61de220ce65b716872f0f1a873e5b50 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package xproxy_test
import (
"crypto/rand"
"strings"
"sync"
"testing"
"time"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/runtime/protocols/debug"
"v.io/x/ref/services/xproxy/xproxy"
"v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
"v.io/x/ref/test/testutil"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
)
// leakWaitTime is the wait duration handed to goroutines.NoLeaks by the tests
// in this file.
const leakWaitTime = 250 * time.Millisecond

// randData is a 128 KiB (1<<17 bytes) buffer of random bytes, filled once by
// init and used as a large RPC payload by TestBigProxyRPC.
var randData []byte

// init fills randData from crypto/rand and panics if the read fails.
func init() {
	randData = make([]byte, 1<<17)
	if _, err := rand.Read(randData); err != nil {
		// Keep the underlying cause in the message so the failure is diagnosable.
		panic("Could not read random data: " + err.Error())
	}
}
// testService is the stateless RPC service that the tests export through
// the proxy.
type testService struct{}

// Echo returns arg prefixed with "response:" and a nil error. The context
// and server call arguments are not used.
func (*testService) Echo(_ *context.T, _ rpc.ServerCall, arg string) (string, error) {
	reply := "response:" + arg
	return reply, nil
}
// TestProxyRPC starts one proxy, exports a testService whose listen spec
// names only that proxy, and checks that an Echo call to "server" returns
// the expected response.
func TestProxyRPC(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Bring the proxy up first so the server has something to listen through.
	proxyName, stopProxy := startProxy(t, ctx, "proxy", security.AllowEveryone(), "",
		address{Protocol: "tcp", Address: "127.0.0.1:0"})
	defer stopProxy()

	// Export the server through the proxy.
	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: proxyName})
	serverCtx, cancelServer := context.WithCancel(ctx)
	_, srv, err := v23.WithNewServer(serverCtx, "server", &testService{}, nil)
	if err != nil {
		t.Fatal(err)
	}
	// Cancel the server and wait until it has fully closed before returning.
	defer func() {
		cancelServer()
		<-srv.Closed()
	}()

	const want = "response:hello"
	var got string
	args, results := []interface{}{"hello"}, []interface{}{&got}
	err = v23.GetClient(ctx).Call(ctx, "server", "Echo", args, results)
	if err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestMultipleProxyRPC checks that an RPC succeeds when the server listens
// through a proxy (p2) that itself listens through another proxy (p1), and
// that a call succeeds again after every connection tracked by the "kill"
// protocol has been closed.
func TestMultipleProxyRPC(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	// The kill protocol records the connections it dials and accepts so that
	// KillConnections can close them all on demand.
	kp := newKillProtocol()
	flow.RegisterProtocol("kill", kp)
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Start the proxies; p2 listens through p1.
	p1name, stop := startProxy(t, ctx, "p1", security.AllowEveryone(), "", address{"kill", "127.0.0.1:0"})
	defer stop()
	p2name, stop := startProxy(t, ctx, "p2", security.AllowEveryone(), p1name, address{"kill", "127.0.0.1:0"})
	defer stop()

	// Start the server listening through the proxy.
	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: p2name})
	sctx, cancel := context.WithCancel(ctx)
	_, server, err := v23.WithNewServer(sctx, "server", &testService{}, nil)
	if err != nil {
		// Fatal, not Error: server is unusable here and the deferred cleanup
		// below would dereference it.
		cancel()
		t.Fatal(err)
	}
	defer func() {
		cancel()
		<-server.Closed()
	}()

	const want = "response:hello"
	client := v23.GetClient(ctx)

	var got string
	if err := client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}

	kp.KillConnections()

	// Killing the connections and trying again should work. Retry until a
	// call succeeds, but give up after a generous deadline so that a broken
	// proxy fails the test with the last error instead of hanging it.
	const retryTimeout = time.Minute
	got = "" // make sure the check below sees the value from the retried call
	deadline := time.Now().Add(retryTimeout)
	for {
		err := client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&got})
		if err == nil {
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("call still failing %v after killing connections: %v", retryTimeout, err)
		}
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestProxyNotAuthorized checks that a client calling a proxied server does
// not run its server authorizer against the proxy: the call is made with
// rejectProxyAuthorizer, which panics if it sees a blessing name containing
// "proxy".
func TestProxyNotAuthorized(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// withPrincipal derives a context carrying a freshly created principal.
	withPrincipal := func(name string) *context.T {
		derived, err := v23.WithPrincipal(ctx, testutil.NewPrincipal(name))
		if err != nil {
			t.Fatal(err)
		}
		return derived
	}
	proxyCtx := withPrincipal("proxy")
	serverCtx := withPrincipal("server")
	clientCtx := withPrincipal("client")

	// Have the root bless the proxy, client, and server, in that order.
	root := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
	blessees := []struct {
		ctx       *context.T
		extension string
	}{
		{proxyCtx, "proxy"},
		{clientCtx, "client"},
		{serverCtx, "server"},
	}
	for _, b := range blessees {
		if err := root.Bless(v23.GetPrincipal(b.ctx), b.extension); err != nil {
			t.Fatal(err)
		}
	}

	// Now the proxy's blessings would fail authorization from the client
	// using the default authorizer.
	proxyName, stopProxy := startProxy(t, proxyCtx, "proxy", security.AllowEveryone(), "",
		address{Protocol: "tcp", Address: "127.0.0.1:0"})
	defer stopProxy()

	// Start the server listening through the proxy.
	serverCtx = v23.WithListenSpec(serverCtx, rpc.ListenSpec{Proxy: proxyName})
	serverCtx, cancelServer := context.WithCancel(serverCtx)
	_, srv, err := v23.WithNewServer(serverCtx, "server", &testService{}, security.AllowEveryone())
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		cancelServer()
		<-srv.Closed()
	}()

	// The call should succeed, which means that the client did not try to
	// authorize the proxy's blessings.
	const want = "response:hello"
	var got string
	args, results := []interface{}{"hello"}, []interface{}{&got}
	authOpt := options.ServerAuthorizer{rejectProxyAuthorizer{}}
	if err := v23.GetClient(clientCtx).Call(clientCtx, "server", "Echo", args, results, authOpt); err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestProxyAuthorizesServer checks that a proxy applies its authorizer to
// servers that try to listen through it. A proxy started with AllowEveryone
// yields proxy endpoints for the server; a proxy started with a nil
// authorizer must report a proxy error for the server, and a hand-built
// endpoint that combines the first proxy's address with the second server's
// routing ID must not be callable.
func TestProxyAuthorizesServer(t *testing.T) {
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Make principals for the proxy and server.
	pctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("proxy"))
	if err != nil {
		t.Fatal(err)
	}
	sctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("server"))
	if err != nil {
		t.Fatal(err)
	}

	// Have the root bless the proxy and server.
	root := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
	if err := root.Bless(v23.GetPrincipal(pctx), "proxy"); err != nil {
		t.Fatal(err)
	}
	if err := root.Bless(v23.GetPrincipal(sctx), "server"); err != nil {
		t.Fatal(err)
	}

	// Start a proxy that accepts connections from everyone and ensure that it does.
	pname, stop := startProxy(t, pctx, "acceptproxy", security.AllowEveryone(), "", address{"tcp", "127.0.0.1:0"})
	defer stop()
	sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Proxy: pname})
	sctx, cancel := context.WithCancel(sctx)
	defer cancel()
	_, server, err := v23.WithNewServer(sctx, "", &testService{}, nil)
	if err != nil {
		t.Fatal(err)
	}
	status := testutil.WaitForProxyEndpoints(server, pname)
	proxyEP := status.Endpoints[0]

	// A proxy using the default authorizer should not authorize the server.
	// The earlier defer already captured the first stop function, so
	// reassigning stop here does not affect it.
	pname, stop = startProxy(t, pctx, "denyproxy", nil, "", address{"tcp", "127.0.0.1:0"})
	defer stop()
	sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Proxy: pname})
	_, server, err = v23.WithNewServer(sctx, "", &testService{}, nil)
	if err != nil {
		t.Fatal(err)
	}
	// Wait until the server reports an outcome for "denyproxy"; a nil error
	// would mean the proxy accepted the server.
	for {
		st := server.Status()
		if perr, ok := st.ProxyErrors["denyproxy"]; ok && perr == nil {
			t.Errorf("proxy should not have authorized server")
		} else if ok {
			break
		}
		<-st.Dirty
	}

	// Artificially constructing the proxied endpoint to the server should
	// not work. (i.e. a client cannot "trick" a proxy into connecting to a server
	// that the proxy doesn't want to talk to).
	ep, err := setEndpointRoutingID(proxyEP, server.Status().Endpoints[0].RoutingID)
	if err != nil {
		// Without a valid endpoint the call below would fail for the wrong
		// reason, so stop here.
		t.Fatal(err)
	}
	var got string
	if err := v23.GetClient(ctx).Call(ctx, ep.Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}, options.NoRetry{}); err == nil {
		t.Error("proxy should not have authorized server.")
	}
}
// TestBigProxyRPC sends the 128 KiB randData payload through a proxied
// server and checks that the echoed response matches.
func TestBigProxyRPC(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23Init()
	defer shutdown()

	// Start the proxy. It is given an empty name, so startProxy returns the
	// name of one of its listening endpoints instead.
	pname, stop := startProxy(t, ctx, "", security.AllowEveryone(), "", address{"tcp", "127.0.0.1:0"})
	defer stop()

	// Start the server listening through the proxy.
	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pname})
	sctx, cancel := context.WithCancel(ctx)
	defer cancel()
	_, server, err := v23.WithNewServer(sctx, "", &testService{}, nil)
	if err != nil {
		t.Fatal(err)
	}
	status := testutil.WaitForProxyEndpoints(server, pname)
	name := status.Endpoints[0].Name()

	var got string
	if err := v23.GetClient(ctx).Call(ctx, name, "Echo", []interface{}{string(randData)}, []interface{}{&got}); err != nil {
		t.Fatal(err)
	}
	if want := "response:" + string(randData); got != want {
		// Report sizes only: printing both values would dump roughly 256 KiB
		// of random bytes into the test log.
		t.Errorf("response mismatch: got %d bytes, want %d bytes", len(got), len(want))
	}
}
// TestProxiedServerCachedConnection has a proxied server call itself and
// counts, through a debug-protocol filter, how many connections were
// created. It expects exactly two: one dialed and one accepted.
func TestProxiedServerCachedConnection(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Count every connection that passes through the debug filter.
	var (
		connMu    sync.Mutex
		connCount int
	)
	ctx = debug.WithFilter(ctx, func(c flow.Conn) flow.Conn {
		connMu.Lock()
		defer connMu.Unlock()
		connCount++
		return c
	})

	// Make principals for the proxy and server.
	proxyCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("proxy"))
	if err != nil {
		t.Fatal(err)
	}
	serverCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("server"))
	if err != nil {
		t.Fatal(err)
	}

	// Have the root bless the proxy and server.
	root := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
	blessees := []struct {
		ctx       *context.T
		extension string
	}{
		{proxyCtx, "proxy"},
		{serverCtx, "server"},
	}
	for _, b := range blessees {
		if err := root.Bless(v23.GetPrincipal(b.ctx), b.extension); err != nil {
			t.Fatal(err)
		}
	}

	// Start the proxy on the debug protocol so the filter above sees its
	// connections.
	proxyName, stopProxy := startProxy(t, proxyCtx, "proxy", security.AllowEveryone(), "",
		address{Protocol: "debug", Address: "tcp/127.0.0.1:0"})
	defer stopProxy()

	// Start the server listening through the proxy. ctx is replaced by the
	// context returned from WithNewServer so the call below is issued by the
	// server itself.
	serverCtx = v23.WithListenSpec(serverCtx, rpc.ListenSpec{Proxy: proxyName})
	serverCtx, cancelServer := context.WithCancel(serverCtx)
	ctx, srv, err := v23.WithNewServer(serverCtx, "server", &testService{}, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		cancelServer()
		<-srv.Closed()
	}()

	// Make the server call itself.
	const want = "response:hello"
	var got string
	args, results := []interface{}{"hello"}, []interface{}{&got}
	if err := v23.GetClient(ctx).Call(ctx, "server", "Echo", args, results); err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}

	// Ensure that only 1 connection was made. So we check for 2, 1 accepted, and 1 dialed.
	// One connection for the server connecting to the proxy, the proxy should reuse
	// the connection on the second hop back to the server.
	connMu.Lock()
	seen := connCount
	connMu.Unlock()
	const wantConns = 2
	if seen != wantConns {
		t.Errorf("got %v, want %v", seen, wantConns)
	}
}
// blockingServer is an RPC service whose only method blocks until the test
// releases it.
type blockingServer struct {
	start chan struct{} // closed by BlockingCall as soon as it is invoked
	wait  chan struct{} // BlockingCall blocks receiving from this channel
}

// newBlockingServer returns a blockingServer with two fresh unbuffered
// channels.
func newBlockingServer() *blockingServer {
	return &blockingServer{
		start: make(chan struct{}),
		wait:  make(chan struct{}),
	}
}

// BlockingCall closes s.start to signal that the call has arrived, then
// blocks until it can receive from s.wait, and returns nil. Because it closes
// s.start, a second invocation on the same blockingServer would panic.
func (s *blockingServer) BlockingCall(*context.T, rpc.ServerCall) error {
	close(s.start)
	<-s.wait
	return nil
}
// TestConcurrentProxyConnections tests that when a client makes a connection
// to two different proxied servers, a call to one server doesn't cause a call
// to the other to fail.
func TestConcurrentProxyConnections(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Start the proxy.
	pname, stop := startProxy(t, ctx, "proxy", security.AllowEveryone(), "", address{"tcp", "127.0.0.1:0"})
	defer stop()

	// Start the server listening through the proxy.
	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pname})
	sctx, cancel := context.WithCancel(ctx)
	defer cancel()
	service := newBlockingServer()
	_, server, err := v23.WithNewServer(sctx, "", service, nil)
	if err != nil {
		// Every setup failure below is fatal: continuing would dereference a
		// nil server or call, or block forever waiting on service.start.
		t.Fatal(err)
	}
	status := testutil.WaitForProxyEndpoints(server, pname)
	ep := status.Endpoints[0]

	// Create an endpoint for a nonexistent server: same proxy address, but a
	// routing ID that no server in this test was started with.
	badep, err := setEndpointRoutingID(ep, naming.FixedRoutingID(0x666))
	if err != nil {
		t.Fatal(err)
	}

	// Start a call to the good server.
	call, err := v23.GetClient(ctx).StartCall(ctx, ep.Name(), "BlockingCall", nil)
	if err != nil {
		t.Fatal(err)
	}
	// Wait for the server to get the rpc.
	<-service.start

	// Make a call to the nonexistent server.
	if _, err := v23.GetClient(ctx).PinConnection(ctx, badep.Name(), options.NoRetry{}); err == nil {
		t.Errorf("Call should not succeed.")
	}

	// Unblock the first rpc and ensure that it succeeds.
	close(service.wait)
	if err := call.Finish(); err != nil {
		t.Error(err)
	}
}
// TestProxyBlessings tests that the proxy presents the blessings tagged for
// the server, rather than its default blessings.
func TestProxyBlessings(t *testing.T) {
	defer goroutines.NoLeaks(t, leakWaitTime)()
	ctx, shutdown := test.V23InitWithMounttable()
	defer shutdown()

	// Make principals for the proxy and server.
	pctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("proxy"))
	if err != nil {
		t.Fatal(err)
	}
	sctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("server"))
	if err != nil {
		t.Fatal(err)
	}

	// Have the root bless the proxy and server.
	root := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
	if err := root.Bless(v23.GetPrincipal(pctx), "proxy"); err != nil {
		t.Fatal(err)
	}
	if err := root.Bless(v23.GetPrincipal(sctx), "server"); err != nil {
		t.Fatal(err)
	}

	// Set the for-peer blessings that the server will send to the proxy to be
	// ones that the proxy will reject. Additionally ensure that the default
	// blessing is one that the proxy will accept. This ensures that the server
	// is sending the blessings tagged for the proxy.
	serverP := v23.GetPrincipal(sctx)
	// Remember the root-issued default before the proxy's blessing is added.
	def, _ := serverP.BlessingStore().Default()
	pblesser := testutil.IDProviderFromPrincipal(v23.GetPrincipal(pctx))
	if err := pblesser.Bless(serverP, "server"); err != nil {
		t.Fatal(err)
	}
	// Present the saved blessing to every peer (pattern "...").
	if _, err := serverP.BlessingStore().Set(def, "..."); err != nil {
		t.Fatal(err)
	}

	// Start the proxy with a nil authorizer.
	pname, stop := startProxy(t, pctx, "proxy", nil, "", address{"tcp", "127.0.0.1:0"})
	defer stop()

	// Start the server listening through the proxy.
	sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Proxy: pname})
	sctx, cancel := context.WithCancel(sctx)
	defer cancel()
	_, server, err := v23.WithNewServer(sctx, "", &testService{}, nil)
	if err != nil {
		// Fatal, not Error: the loop below dereferences server.
		t.Fatal(err)
	}
	// Wait until the server reports an outcome for "proxy"; a nil error would
	// mean the proxy accepted the server.
	for {
		st := server.Status()
		if perr, ok := st.ProxyErrors["proxy"]; ok && perr == nil {
			t.Errorf("proxy should not have authorized server")
		} else if ok {
			break
		}
		<-st.Dirty
	}
}
// rejectProxyAuthorizer is an authorizer that accepts every caller but
// panics if it is ever asked to authorize a peer whose blessing names
// contain "proxy".
type rejectProxyAuthorizer struct{}

// Authorize panics when any name in the first result of
// security.RemoteBlessingNames contains the substring "proxy"; otherwise it
// returns nil. The second result of RemoteBlessingNames is ignored.
func (rejectProxyAuthorizer) Authorize(ctx *context.T, call security.Call) error {
	remoteNames, _ := security.RemoteBlessingNames(ctx, call)
	for i := range remoteNames {
		if strings.Contains(remoteNames[i], "proxy") {
			panic("should not call authorizer on proxy")
		}
	}
	return nil
}
// address is a protocol/address pair for a proxy to listen on. startProxy
// appends values of this type to rpc.ListenSpec.Addrs.
type address struct {
	Protocol string
	Address  string
}
// startProxy starts an xproxy named name with authorizer auth, listening on
// addrs and, when listenOnProxy is non-empty, through that other proxy.
//
// It returns the name to reach the proxy by and a stop function that cancels
// the proxy's context and waits for the proxy to close. When name is
// non-empty it is returned as is; otherwise the name of the first listening
// endpoint whose network is "tcp" or "kill" is returned. Any failure ends the
// test via t.Fatal after releasing whatever was already started.
func startProxy(t *testing.T, ctx *context.T, name string, auth security.Authorizer, listenOnProxy string, addrs ...address) (string, func()) {
	var ls rpc.ListenSpec
	for _, addr := range addrs {
		ls.Addrs = append(ls.Addrs, addr)
	}
	ls.Proxy = listenOnProxy
	ctx = v23.WithListenSpec(ctx, ls)
	ctx, cancel := context.WithCancel(ctx)
	proxy, err := xproxy.New(ctx, name, auth)
	if err != nil {
		// Release the cancelable context; t.Fatal does not return.
		cancel()
		t.Fatal(err)
	}
	stop := func() {
		cancel()
		<-proxy.Closed()
	}
	if len(name) > 0 {
		return name, stop
	}
	for _, pep := range proxy.ListeningEndpoints() {
		switch pep.Addr().Network() {
		case "tcp", "kill":
			return pep.Name(), stop
		}
	}
	// No usable endpoint: shut the proxy down before failing so it is not
	// left running for the remainder of the test binary.
	stop()
	t.Fatal("Proxy not listening on network address.")
	return "", nil
}
// killProtocol is a flow protocol that delegates to another protocol
// (newKillProtocol uses the registered "tcp" one) and remembers each
// connection it dials or accepts so that KillConnections can close them.
type killProtocol struct {
	// protocol is the underlying protocol every operation is forwarded to.
	protocol flow.Protocol

	// mu guards conns.
	mu sync.Mutex
	// conns holds the connections dialed or accepted since the last call to
	// KillConnections.
	conns []flow.Conn
}
// kpListener wraps a flow.Listener and records every accepted connection
// with its owning killProtocol.
type kpListener struct {
	kp *killProtocol
	flow.Listener
}

// Accept accepts a connection from the embedded listener. On success the
// connection is appended to the killProtocol's connection list before being
// returned; on failure the error is returned with a nil connection.
func (l *kpListener) Accept(ctx *context.T) (flow.Conn, error) {
	conn, err := l.Listener.Accept(ctx)
	if err != nil {
		return nil, err
	}
	owner := l.kp
	owner.mu.Lock()
	defer owner.mu.Unlock()
	owner.conns = append(owner.conns, conn)
	return conn, nil
}
// newKillProtocol returns a killProtocol that forwards to whatever protocol
// is registered under "tcp". The second result of flow.RegisteredProtocol is
// ignored.
func newKillProtocol() *killProtocol {
	tcp, _ := flow.RegisteredProtocol("tcp")
	kp := new(killProtocol)
	kp.protocol = tcp
	return kp
}
// KillConnections closes every connection recorded so far and then empties
// the list. The lock is held for the whole operation; errors from Close are
// ignored.
func (p *killProtocol) KillConnections() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := range p.conns {
		p.conns[i].Close()
	}
	p.conns = nil
}
// Dial dials address through the underlying protocol, always passing "tcp"
// as the protocol name (the protocol argument is ignored). A successfully
// dialed connection is recorded so KillConnections can close it later.
func (p *killProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
	conn, err := p.protocol.Dial(ctx, "tcp", address, timeout)
	if err != nil {
		return nil, err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.conns = append(p.conns, conn)
	return conn, nil
}
// Listen listens on address through the underlying protocol, always passing
// "tcp" as the protocol name (the protocol argument is ignored), and wraps
// the resulting listener in a kpListener so accepted connections are
// recorded.
func (p *killProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
	inner, err := p.protocol.Listen(ctx, "tcp", address)
	if err != nil {
		return nil, err
	}
	wrapped := &kpListener{
		kp:       p,
		Listener: inner,
	}
	return wrapped, nil
}
// Resolve forwards to the underlying protocol's Resolve, always passing
// "tcp" as the protocol name (the protocol argument is ignored), and returns
// its results unchanged.
func (p *killProtocol) Resolve(ctx *context.T, protocol, address string) (string, []string, error) {
	network, resolved, err := p.protocol.Resolve(ctx, "tcp", address)
	return network, resolved, err
}
// setEndpointRoutingID returns a copy of ep with its routing ID replaced by
// rid. The result is built by formatting ep's network, address and
// mount-table option together with rid into an endpoint string and parsing
// that string; any parse error is returned.
func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
	// ep's own routing ID is deliberately discarded in favour of rid.
	network, address, _, mountable := getEndpointParts(ep)
	formatted := naming.FormatEndpoint(network, address, rid, mountable)
	return naming.ParseEndpoint(formatted)
}
// getEndpointParts splits ep into its network, address, routing ID, and a
// naming.ServesMountTable endpoint option built from ep.ServesMountTable.
func getEndpointParts(ep naming.Endpoint) (network string, address string,
	rid naming.RoutingID, mountable naming.EndpointOpt) {
	addr := ep.Addr()
	return addr.Network(), addr.String(), ep.RoutingID, naming.ServesMountTable(ep.ServesMountTable)
}