blob: da6eab0ac8da0a0cacd02e6d818dde9af5f7fbee [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"bytes"
"crypto/rand"
"io"
"net"
"os"
"testing"
"v.io/v23/context"
"v.io/v23/vtrace"
"v.io/x/ref/lib/flags"
"v.io/x/ref/runtime/internal/lib/upcqueue"
"v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/version"
ivtrace "v.io/x/ref/runtime/internal/vtrace"
"v.io/x/ref/test/testutil"
)
// Shared fixtures for the tests in this file.
var (
// provider is the identity provider (created with the name "root") that
// init uses to bless both principals below.
provider = testutil.NewIDProvider("root")
// pClient is the principal passed to XNewDialed, i.e. the dialing side.
pClient = testutil.NewPrincipal()
// pServer is the principal passed to XNewAccepted, i.e. the accepting side.
pServer = testutil.NewPrincipal()
)
// init blesses the client and server test principals with the shared
// identity provider, using the extensions "client" and "server".
//
// A blessing failure would leave the principals unusable for the tests, so
// it aborts immediately instead of being silently ignored.
func init() {
	if err := provider.Bless(pClient, "client"); err != nil {
		panic(err)
	}
	if err := provider.Bless(pServer, "server"); err != nil {
		panic(err)
	}
}
// Init returns a root context with vtrace initialized (CollectRegexp ".*",
// DumpOnShutdown enabled, cache size 100) together with a shutdown function.
//
// The shutdown function writes the trace records held by the context's
// vtrace store to os.Stderr and then cancels the root context. Init panics
// if vtrace cannot be initialized.
func Init() (*context.T, context.CancelFunc) {
	rootCtx, cancelRoot := context.RootContext()

	traceFlags := flags.VtraceFlags{
		CollectRegexp:  ".*",
		DumpOnShutdown: true,
		CacheSize:      100,
	}
	tracedCtx, err := ivtrace.Init(rootCtx, traceFlags)
	if err != nil {
		panic(err)
	}

	// Dump the collected traces before tearing the context down.
	shutdown := func() {
		records := vtrace.GetStore(tracedCtx).TraceRecords()
		vtrace.FormatTraces(os.Stderr, records, nil)
		cancelRoot()
	}
	return tracedCtx, shutdown
}
// xConn is a net.Conn that additionally exposes a DisableEncryption method.
type xConn struct{ net.Conn }

// DisableEncryption does nothing; it only adds the method to xConn's method
// set.
// NOTE(review): presumably the vc code inspects connections for this method
// to decide whether to skip encryption — confirm against XNewDialed/XNewAccepted.
func (xConn) DisableEncryption() {}
// pipe returns the two ends of an in-memory net.Pipe, each wrapped in an
// xConn.
func pipe() (xConn, xConn) {
	left, right := net.Pipe()
	return xConn{Conn: left}, xConn{Conn: right}
}
// createVCs establishes a connected pair of VCs over an in-memory pipe.
//
// The accepting side is created with XNewAccepted in its own goroutine,
// using pServer, its default blessings and the queue q. The dialing side is
// created with XNewDialed on the calling goroutine using pClient. Both sides
// use the fixed version range [10, 10]. The function panics if either side
// fails to come up, and otherwise returns once both VCs exist.
func createVCs(clientCtx, serverCtx *context.T, q *upcqueue.T) (client, server stream.XVC) {
	dialConn, acceptConn := pipe()

	var (
		dialerEP   = &naming.Endpoint{Address: "client"}
		acceptorEP = &naming.Endpoint{Address: "server"}
		versions   = version.Range{Min: 10, Max: 10}
	)
	serverBlessings := pServer.BlessingStore().Default()

	// Buffered so the accepting goroutine can hand over its VC and exit
	// without waiting for the receive below.
	accepted := make(chan stream.XVC, 1)
	go func() {
		vc, err := XNewAccepted(serverCtx, acceptConn, pServer, acceptorEP, serverBlessings, versions, q)
		if err != nil {
			panic(err)
		}
		accepted <- vc
	}()

	var err error
	if client, err = XNewDialed(clientCtx, dialConn, pClient, dialerEP, acceptorEP, versions); err != nil {
		panic(err)
	}
	server = <-accepted
	return client, server
}
// TestXVC pushes a 4 MiB random payload from the client VC to the server VC
// over a single flow and checks that the server reads back exactly the bytes
// that were written. It then closes the client and waits for the server VC
// to report that it has closed.
func TestXVC(t *testing.T) {
	// payloadSize is the number of bytes sent through the flow.
	const payloadSize = 4 << 20

	ctx, cancel := Init()
	defer cancel()
	clientCtx, clientspan := vtrace.WithNewTrace(ctx)
	defer clientspan.Finish()
	serverCtx, serverspan := vtrace.WithNewTrace(ctx)
	defer serverspan.Finish()

	q := upcqueue.New()
	client, server := createVCs(clientCtx, serverCtx, q)

	// Buffered so the reader goroutine can always deliver its result and
	// exit, even if the main goroutine has already failed the test.
	ch := make(chan []byte, 1)
	go func() {
		_, span := vtrace.WithNewSpan(serverCtx, "server read")
		defer span.Finish()

		// Always send something (nil on failure) so the main goroutine never
		// blocks forever on the receive. t.Fatal must not be called from this
		// goroutine, so failures are reported with t.Errorf.
		var received []byte
		defer func() { ch <- received }()

		item, err := q.Get(nil)
		if err != nil {
			t.Errorf("q.Get failed: %v", err)
			return
		}
		sflow, ok := item.(stream.XFlow)
		if !ok {
			t.Errorf("queue item has type %T, want stream.XFlow", item)
			return
		}
		recvbuf := make([]byte, payloadSize)
		// io.ReadFull returns a nil error only when the whole buffer was
		// filled, so no separate byte-count check is needed.
		if n, err := io.ReadFull(sflow, recvbuf); err != nil {
			t.Errorf("Expected %d bytes got %d: %v", payloadSize, n, err)
			return
		}
		received = recvbuf
	}()

	_, span := vtrace.WithNewSpan(clientCtx, "client write")
	cflow, err := client.Connect()
	if err != nil {
		t.Fatalf("client.Connect failed: %v", err)
	}
	sendbuf := make([]byte, payloadSize)
	if _, err := io.ReadFull(rand.Reader, sendbuf); err != nil {
		t.Fatalf("generating random payload failed: %v", err)
	}
	if _, err := cflow.Write(sendbuf); err != nil {
		t.Fatalf("cflow.Write failed: %v", err)
	}
	cflow.Close()
	span.Finish()

	if !bytes.Equal(sendbuf, <-ch) {
		t.Errorf("Didn't get the sent data.")
	}

	// Close the client and wait for the server to close.
	client.Close(nil)
	<-server.Closed()
}