blob: baaebfded8c3101c2cda19a4627571bb33d6bbae [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"bytes"
"io"
"net"
"reflect"
"testing"
"v.io/v23/verror"
"v.io/x/ref/runtime/internal/lib/bqueue"
"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
"v.io/x/ref/runtime/internal/lib/iobuf"
"v.io/x/ref/runtime/internal/lib/sync"
"v.io/x/ref/runtime/internal/rpc/stream"
)
// TestWrite is a very basic, easy to follow, but not very thorough test of the
// writer. More thorough testing of flows (and implicitly the writer) is in
// vc_test.go.
func TestWrite(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(4)
w := newTestWriter(bw, shared)
if n, err := w.Write([]byte("abcd")); n != 4 || err != nil {
t.Errorf("Got (%d, %v) want (4, nil)", n, err)
}
// Should have used up 4 shared counters
if err := shared.TryDecN(1); err != sync.ErrTryAgain {
t.Errorf("Got %v want %v", err, sync.ErrTryAgain)
}
// Further Writes will block until some space has been released.
w.Release(10)
if n, err := w.Write([]byte("efghij")); n != 6 || err != nil {
t.Errorf("Got (%d, %v) want (5, nil)", n, err)
}
// And the release should have returned to the shared counters set
if err := shared.TryDecN(4); err != nil {
t.Errorf("Got %v want %v", err, nil)
}
// Further writes will block since all 10 bytes (provided to NewWriter)
// have been exhausted and Get hasn't been called on bq yet.
deadline := make(chan struct{}, 0)
w.SetDeadline(deadline)
close(deadline)
w.SetDeadline(deadline)
if n, err := w.Write([]byte("k")); n != 0 || !isTimeoutError(err) {
t.Errorf("Got (%d, %v) want (0, timeout error)", n, err)
}
w.Close()
if w.BytesWritten() != 10 {
t.Errorf("Got %d want %d", w.BytesWritten(), 10)
}
_, bufs, err := bq.Get(nil)
var read bytes.Buffer
for _, b := range bufs {
read.Write(b.Contents)
b.Release()
}
if g, w := read.String(), "abcdefghij"; g != w {
t.Errorf("Got %q want %q", g, w)
}
}
func TestCloseBeforeWrite(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(4)
w := newTestWriter(bw, shared)
w.Close()
if n, err := w.Write([]byte{1, 2}); n != 0 || verror.ErrorID(err) != stream.ErrBadState.ID {
t.Errorf("Got (%v, %v) want (0, %v)", n, err, stream.ErrBadState)
}
}
func TestShutdownBeforeWrite(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(4)
w := newTestWriter(bw, shared)
w.shutdown(true)
if n, err := w.Write([]byte{1, 2}); n != 0 || err != io.EOF {
t.Errorf("Got (%v, %v) want (0, %v)", n, err, io.EOF)
}
}
func TestCloseDoesNotDiscardPendingWrites(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(2)
w := newTestWriter(bw, shared)
data := []byte{1, 2}
if n, err := w.Write(data); n != len(data) || err != nil {
t.Fatalf("Got (%d, %v) want (%d, nil)", n, err, len(data))
}
w.Close()
gbw, bufs, err := bq.Get(nil)
if err != nil {
t.Fatal(err)
}
if gbw != bw {
t.Fatalf("Got %v want %v", gbw, bw)
}
if len(bufs) != 1 {
t.Fatalf("Got %d bufs, want 1", len(bufs))
}
if !reflect.DeepEqual(bufs[0].Contents, data) {
t.Fatalf("Got %v want %v", bufs[0].Contents, data)
}
if !gbw.IsDrained() {
t.Fatal("Expected bqueue.Writer to be drained")
}
}
func TestWriterCloseIsIdempotent(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(1)
w := newTestWriter(bw, shared)
if n, err := w.Write([]byte{1}); n != 1 || err != nil {
t.Fatalf("Got (%d, %v) want (1, nil)", n, err)
}
// Should have used up the shared counter.
if err := shared.TryDec(); err != sync.ErrTryAgain {
t.Fatalf("Got %v want %v", err, sync.ErrTryAgain)
}
w.Close()
// The shared counter should have been returned
if err := shared.TryDec(); err != nil {
t.Fatalf("Got %v want nil", err)
}
// Closing again shouldn't affect the shared counters
w.Close()
if err := shared.TryDec(); err != sync.ErrTryAgain {
t.Fatalf("Got %v want %v", err, sync.ErrTryAgain)
}
}
func TestClosedChannel(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
bw, err := bq.NewWriter(0, 0, 10)
if err != nil {
t.Fatal(err)
}
shared := sync.NewSemaphore()
shared.IncN(4)
w := newTestWriter(bw, shared)
go w.Close()
<-w.Closed()
if n, err := w.Write([]byte{1, 2}); n != 0 || verror.ErrorID(err) != stream.ErrBadState.ID {
t.Errorf("Got (%v, %v) want (0, %v)", n, err, stream.ErrBadState.ID)
}
}
func newTestWriter(bqw bqueue.Writer, shared *sync.Semaphore) *writer {
alloc := iobuf.NewAllocator(iobuf.NewPool(0), 0)
return newWriter(16, bqw, alloc, shared)
}
func isTimeoutError(err error) bool {
neterr, ok := err.(net.Error)
return ok && neterr.Timeout()
}