blob: 7898ee9f4683560d2be7031a6f60237ea3249a26 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test
import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
type noMethodsType struct{ Field string }
type fieldType struct {
unexported string
}
type noExportedFieldsType struct{}
func (noExportedFieldsType) F(_ *context.T, _ rpc.ServerCall, f fieldType) error { return nil }
type badObjectDispatcher struct{}
func (badObjectDispatcher) Lookup(_ *context.T, suffix string) (interface{}, security.Authorizer, error) {
return noMethodsType{}, nil, nil
}
// TestBadObject ensures that Serve handles bad receiver objects gracefully (in
// particular, it doesn't panic).
func TestBadObject(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
sctx := withPrincipal(t, ctx, "server")
cctx := withPrincipal(t, ctx, "client")
if _, _, err := v23.WithNewServer(sctx, "", nil, nil); err == nil {
t.Fatal("should have failed")
}
if _, _, err := v23.WithNewServer(sctx, "", new(noMethodsType), nil); err == nil {
t.Fatal("should have failed")
}
if _, _, err := v23.WithNewServer(sctx, "", new(noExportedFieldsType), nil); err == nil {
t.Fatal("should have failed")
}
if _, _, err := v23.WithNewDispatchingServer(sctx, "", badObjectDispatcher{}); err != nil {
t.Fatalf("ServeDispatcher failed: %v", err)
}
// TODO(mattr): It doesn't necessarily make sense to me that a bad object from
// the dispatcher results in a retry.
var cancel context.CancelFunc
cctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()
var result string
if err := v23.GetClient(cctx).Call(cctx, "servername", "SomeMethod", nil, []interface{}{&result}); err == nil {
// TODO(caprita): Check the error type rather than
// merely ensuring the test doesn't panic.
t.Fatalf("Call should have failed")
}
}
type statusServer struct{ ch chan struct{} }
func (s *statusServer) Hang(ctx *context.T, _ rpc.ServerCall) error {
s.ch <- struct{}{} // Notify the server has received a call.
<-s.ch // Wait for the server to be ready to go.
return nil
}
func TestServerStatus(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
serverChan := make(chan struct{})
sctx, cancel := context.WithCancel(ctx)
_, server, err := v23.WithNewServer(sctx, "test", &statusServer{serverChan}, nil)
if err != nil {
t.Fatal(err)
}
status := server.Status()
if got, want := status.State, rpc.ServerActive; got != want {
t.Fatalf("got %s, want %s", got, want)
}
progress := make(chan error)
makeCall := func(ctx *context.T) {
call, err := v23.GetClient(ctx).StartCall(ctx, "test", "Hang", nil)
progress <- err
progress <- call.Finish()
}
go makeCall(ctx)
// Wait for RPC to start and the server has received the call.
if err := <-progress; err != nil {
t.Fatal(err)
}
<-serverChan
// Stop server asynchronously
go func() {
cancel()
<-server.Closed()
}()
waitForStatus := func(want rpc.ServerState) {
then := time.Now()
for {
status = server.Status()
if got := status.State; got != want {
if time.Now().Sub(then) > time.Minute {
t.Fatalf("got %s, want %s", got, want)
}
} else {
break
}
time.Sleep(100 * time.Millisecond)
}
}
// Server should enter 'ServerStopping' state.
waitForStatus(rpc.ServerStopping)
// Server won't stop until the statusServer's hung method completes.
close(serverChan)
// Wait for RPC to finish
if err := <-progress; err != nil {
t.Fatal(err)
}
// Now that the RPC is done, the server should be able to stop.
waitForStatus(rpc.ServerStopped)
}
func TestPublisherStatus(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
sctx := withPrincipal(t, ctx, "server")
sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{
{"tcp", "127.0.0.1:0"},
{"tcp", "127.0.0.1:0"},
},
})
_, server, err := v23.WithNewServer(sctx, "foo", &testServer{}, nil)
if err != nil {
t.Fatal(err)
}
status := testutil.WaitForServerPublished(server)
if got, want := len(status.PublisherStatus), 2; got != want {
t.Errorf("got %d, want %d", got, want)
}
eps := server.Status().Endpoints
if got, want := len(eps), 2; got != want {
t.Fatalf("got %d, want %d", got, want)
}
// Add a second name and we should now see 4 mounts, 2 for each name.
if err := server.AddName("bar"); err != nil {
t.Fatal(err)
}
status = testutil.WaitForServerPublished(server)
if got, want := len(status.PublisherStatus), 4; got != want {
t.Errorf("got %d, want %d", got, want)
}
serversPerName := map[string][]string{}
for _, ms := range status.PublisherStatus {
serversPerName[ms.Name] = append(serversPerName[ms.Name], ms.Server)
}
if got, want := len(serversPerName), 2; got != want {
t.Fatalf("got %d, want %d", got, want)
}
for _, name := range []string{"foo", "bar"} {
if got, want := len(serversPerName[name]), 2; got != want {
t.Errorf("got %d, want %d", got, want)
}
sort.Strings(serversPerName[name])
if got, want := serversPerName[name], endpointToStrings(eps); !reflect.DeepEqual(got, want) {
t.Errorf("got %v, want %v", got, want)
}
}
}
func TestIsLeafServerOption(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
_, _, err := v23.WithNewDispatchingServer(ctx, "leafserver",
&testServerDisp{&testServer{}}, options.IsLeaf(true))
if err != nil {
t.Fatal(err)
}
// we have set IsLeaf to true, sending any suffix to leafserver should result
// in an suffix was not expected error.
var result string
callErr := v23.GetClient(ctx).Call(ctx, "leafserver/unwantedSuffix", "Echo", []interface{}{"Mirror on the wall"}, []interface{}{&result})
if callErr == nil {
t.Fatalf("Call should have failed with suffix was not expected error")
}
}
func endpointToStrings(eps []naming.Endpoint) []string {
r := []string{}
for _, ep := range eps {
r = append(r, ep.String())
}
sort.Strings(r)
return r
}
type ldServer struct {
started chan struct{}
wait chan struct{}
}
func (s *ldServer) Do(ctx *context.T, call rpc.ServerCall) (bool, error) {
<-s.wait
return ctx.Err() != nil, nil
}
func TestLameDuck(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
cases := []struct {
timeout time.Duration
finishError bool
wasCanceled bool
}{
{timeout: time.Minute, wasCanceled: false},
{timeout: 0, finishError: true},
}
for _, c := range cases {
s := &ldServer{wait: make(chan struct{})}
sctx, cancel := context.WithCancel(ctx)
_, server, err := v23.WithNewServer(sctx, "ld", s, nil, options.LameDuckTimeout(c.timeout))
if err != nil {
t.Fatal(err)
}
call, err := v23.GetClient(ctx).StartCall(ctx, "ld", "Do", nil)
if err != nil {
t.Fatal(err)
}
// Now cancel the context putting the server into lameduck mode.
cancel()
// Now allow the call to complete and see if the context was canceled.
close(s.wait)
var wasCanceled bool
err = call.Finish(&wasCanceled)
if c.finishError {
if err == nil {
t.Errorf("case: %v: Expected error for call but didn't get one", c)
}
} else {
if wasCanceled != c.wasCanceled {
t.Errorf("case %v: got %v.", c, wasCanceled)
}
}
<-server.Closed()
}
}
type dummyService struct{}
func (dummyService) Do(ctx *context.T, call rpc.ServerCall) error {
return nil
}
func mountedBlessings(ctx *context.T, name string) ([]string, error) {
me, err := v23.GetNamespace(ctx).Resolve(ctx, "server")
if err != nil {
return nil, err
}
for _, ms := range me.Servers {
ep, err := naming.ParseEndpoint(ms.Server)
if err != nil {
return nil, err
}
return ep.BlessingNames(), nil
}
return nil, nil
}
func TestUpdateServerBlessings(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
v23.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
idp := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
serverp := testutil.NewPrincipal()
idp.Bless(serverp, "b1")
sctx, err := v23.WithPrincipal(ctx, serverp)
if err != nil {
t.Error(err)
}
if _, _, err = v23.WithNewServer(sctx, "server", dummyService{}, security.AllowEveryone()); err != nil {
t.Error(err)
}
want := "test-blessing:b1"
for {
bs, err := mountedBlessings(ctx, "server")
if err == nil && len(bs) == 1 && bs[0] == want {
break
}
time.Sleep(10 * time.Millisecond)
}
pcon, err := v23.GetClient(ctx).PinConnection(ctx, "server")
if err != nil {
t.Error(err)
}
defer pcon.Unpin()
conn := pcon.Conn()
names, _ := security.RemoteBlessingNames(ctx, security.NewCall(&security.CallParams{
LocalPrincipal: v23.GetPrincipal(ctx),
RemoteBlessings: conn.RemoteBlessings(),
}))
if len(names) != 1 || names[0] != want {
t.Errorf("got %v, wanted %q", names, want)
}
// Now we bless the server with a new blessing (which will change its default
// blessing). Then we wait for the new value to propogate to the remote end.
idp.Bless(serverp, "b2")
want = "test-blessing:b2"
for {
bs, err := mountedBlessings(ctx, "server")
if err == nil && len(bs) == 1 && bs[0] == want {
break
}
time.Sleep(10 * time.Millisecond)
}
for {
names, _ := security.RemoteBlessingNames(ctx, security.NewCall(&security.CallParams{
LocalPrincipal: v23.GetPrincipal(ctx),
RemoteBlessings: conn.RemoteBlessings(),
}))
if len(names) == 1 || names[0] == want {
break
}
time.Sleep(10 * time.Millisecond)
}
}
func TestListenNetworkInterrupted(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
bp := newBrokenNetProtocol()
sctx, cancel := context.WithCancel(ctx)
sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "broken", Address: "127.0.0.1:0"}}})
// If the network is broken and prevents listening before the server is created,
// the server should listen and accept rpcs when the network is fixed.
bp.setBroken(true)
_, server, err := v23.WithNewServer(sctx, "server", &testServer{}, security.AllowEveryone())
if err != nil {
t.Error(err)
}
// rpc should fail until the network works.
if err := v23.GetClient(ctx).Call(ctx, "server", "Closure", nil, nil, options.NoRetry{}); err == nil {
t.Errorf("call should have failed")
}
bp.setBroken(false)
// rpc should succeed now that network is working again.
if err := v23.GetClient(ctx).Call(ctx, "server", "Closure", nil, nil); err != nil {
t.Error(err)
}
// kill connections to clear the cache.
bp.killConnections()
// If the network is broken after the server is created and successfully listened,
// the server should listen and accept rpcs when the network is fixed.
bp.setBroken(true)
// rpc should fail until the network works.
if err := v23.GetClient(ctx).Call(ctx, "server", "Closure", nil, nil, options.NoRetry{}); err == nil {
t.Errorf("call should have failed")
}
bp.setBroken(false)
// rpc should succeed now that network is working again.
if err := v23.GetClient(ctx).Call(ctx, "server", "Closure", nil, nil); err != nil {
t.Error(err)
}
if eps := server.Status().Endpoints; len(eps) != 1 {
t.Errorf("server should only be listening on one endpoint, got %v", eps)
}
cancel()
<-server.Closed()
}
type brokenNetProtocol struct {
protocol flow.Protocol
mu sync.Mutex
// Unfortunately broken needs to be a channel rather than a bool since we need
// to interrupt listener.Accept() when the network breaks.
broken chan struct{}
conns []flow.Conn
}
func newBrokenNetProtocol() *brokenNetProtocol {
p, _ := flow.RegisteredProtocol("tcp")
bp := &brokenNetProtocol{protocol: p, broken: make(chan struct{})}
flow.RegisterProtocol("broken", bp)
return bp
}
// setBroken sets the protocol to disallow all dials and listens if broken is true.
func (p *brokenNetProtocol) setBroken(broken bool) {
defer p.mu.Unlock()
p.mu.Lock()
if broken {
select {
case <-p.broken:
default:
close(p.broken)
}
} else {
select {
case <-p.broken:
p.broken = make(chan struct{})
default:
}
}
}
func (p *brokenNetProtocol) isBroken() bool {
p.mu.Lock()
broken := p.broken
p.mu.Unlock()
select {
case <-broken:
return true
default:
}
return false
}
// killConnections kills all connections to avoid using cached connections.
func (p *brokenNetProtocol) killConnections() {
defer p.mu.Unlock()
p.mu.Lock()
for _, c := range p.conns {
c.Close()
}
p.conns = nil
}
func (p *brokenNetProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
// Return an error if the network is broken.
if p.isBroken() {
return nil, fmt.Errorf("network is broken")
}
c, err := p.protocol.Dial(ctx, "tcp", address, timeout)
if err != nil {
return nil, err
}
p.mu.Lock()
p.conns = append(p.conns, c)
p.mu.Unlock()
return c, nil
}
func (p *brokenNetProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
// Return an error if the network is broken.
if p.isBroken() {
return nil, fmt.Errorf("network is broken")
}
l, err := p.protocol.Listen(ctx, "tcp", address)
if err != nil {
return nil, err
}
return &brokenNetListener{p: p, Listener: l}, nil
}
func (p *brokenNetProtocol) Resolve(ctx *context.T, protocol, address string) (string, []string, error) {
return p.protocol.Resolve(ctx, "tcp", address)
}
type brokenNetListener struct {
p *brokenNetProtocol
flow.Listener
}
type connAndErr struct {
c flow.Conn
err error
}
func (l *brokenNetListener) Accept(ctx *context.T) (flow.Conn, error) {
ch := make(chan connAndErr)
go func() {
c, err := l.Listener.Accept(ctx)
ch <- connAndErr{c, err}
}()
l.p.mu.Lock()
broken := l.p.broken
l.p.mu.Unlock()
select {
case <-broken:
cae := <-ch
c, err := cae.c, cae.err
if err != nil {
return nil, err
}
c.Close()
return nil, fmt.Errorf("network is broken")
case cae := <-ch:
c, err := cae.c, cae.err
if err != nil {
return nil, err
}
l.p.mu.Lock()
l.p.conns = append(l.p.conns, c)
l.p.mu.Unlock()
return c, nil
}
}