blob: cd5d9e97acb49cb2933646821a87cfb652676d25 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test
import (
"io"
"testing"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/test"
)
type canceld struct {
name string
child string
started chan struct{}
canceled chan struct{}
}
func (c *canceld) Run(ctx *context.T, _ rpc.ServerCall) error {
close(c.started)
client := v23.GetClient(ctx)
var done chan struct{}
if c.child != "" {
done = make(chan struct{})
go func() {
client.Call(ctx, c.child, "Run", nil, nil)
close(done)
}()
}
<-ctx.Done()
if done != nil {
<-done
}
close(c.canceled)
return nil
}
func makeCanceld(ctx *context.T, name, child string) (*canceld, error) {
c := &canceld{
name: name,
child: child,
started: make(chan struct{}, 0),
canceled: make(chan struct{}, 0),
}
_, _, err := v23.WithNewServer(ctx, name, c, security.AllowEveryone())
if err != nil {
return nil, err
}
return c, nil
}
// TestCancellationPropagation tests that cancellation propogates along an
// RPC call chain without user intervention.
func TestCancellationPropagation(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
c1, err := makeCanceld(ctx, "c1", "c2")
if err != nil {
t.Fatalf("Can't start server:", err, verror.DebugString(err))
}
c2, err := makeCanceld(ctx, "c2", "")
if err != nil {
t.Fatalf("Can't start server:", err)
}
ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
v23.GetClient(ctx).Call(ctx, "c1", "Run", nil, nil)
close(done)
}()
<-c1.started
<-c2.started
cancel()
<-c1.canceled
<-c2.canceled
<-done
}
type cancelTestServer struct {
started chan struct{}
cancelled chan struct{}
t *testing.T
}
func newCancelTestServer(t *testing.T) *cancelTestServer {
return &cancelTestServer{
started: make(chan struct{}),
cancelled: make(chan struct{}),
t: t,
}
}
func (s *cancelTestServer) CancelStreamReader(ctx *context.T, call rpc.StreamServerCall) error {
close(s.started)
var b []byte
if err := call.Recv(&b); err != io.EOF {
s.t.Errorf("Got error %v, want io.EOF", err)
}
<-ctx.Done()
close(s.cancelled)
return nil
}
// CancelStreamIgnorer doesn't read from it's input stream so all it's
// buffers fill. The intention is to show that call.Done() is closed
// even when the stream is stalled.
func (s *cancelTestServer) CancelStreamIgnorer(ctx *context.T, _ rpc.StreamServerCall) error {
close(s.started)
<-ctx.Done()
close(s.cancelled)
return nil
}
func waitForCancel(t *testing.T, ts *cancelTestServer, cancel context.CancelFunc) {
<-ts.started
cancel()
<-ts.cancelled
}
// TestCancel tests cancellation while the server is reading from a stream.
func TestCancel(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
var (
sctx = withPrincipal(t, ctx, "server")
cctx = withPrincipal(t, ctx, "client")
ts = newCancelTestServer(t)
)
_, _, err := v23.WithNewServer(sctx, "cancel", ts, security.AllowEveryone())
if err != nil {
t.Fatal(err)
}
cctx, cancel := context.WithCancel(cctx)
done := make(chan struct{})
go func() {
v23.GetClient(cctx).Call(cctx, "cancel", "CancelStreamReader", nil, nil)
close(done)
}()
waitForCancel(t, ts, cancel)
<-done
}
// TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
// the server is not reading that the cancel message gets through.
func TestCancelWithFullBuffers(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
var (
sctx = withPrincipal(t, ctx, "server")
cctx = withPrincipal(t, ctx, "client")
ts = newCancelTestServer(t)
)
_, _, err := v23.WithNewServer(sctx, "cancel", ts, security.AllowEveryone())
if err != nil {
t.Fatal(err)
}
cctx, cancel := context.WithCancel(cctx)
call, err := v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamIgnorer", nil)
if err != nil {
t.Fatalf("Start call failed: %v", err)
}
// Fill up all the write buffers to ensure that cancelling works even when the stream
// is blocked.
if ref.RPCTransitionState() >= ref.XServers {
call.Send(make([]byte, conn.DefaultBytesBufferedPerFlow-2048))
} else {
call.Send(make([]byte, vc.MaxSharedBytes))
call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
}
done := make(chan struct{})
go func() {
call.Finish()
close(done)
}()
waitForCancel(t, ts, cancel)
<-done
}