| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Tests in a separate package to ensure that only the exported API is used in the tests. |
| // |
| // All tests are run with the default security level on VCs (SecurityConfidential). |
| |
| package vif_test |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "net" |
| "reflect" |
| "runtime" |
| "sort" |
| "sync" |
| "testing" |
| "time" |
| |
| "v.io/v23/naming" |
| "v.io/v23/rpc/version" |
| |
| inaming "v.io/x/ref/profiles/internal/naming" |
| "v.io/x/ref/profiles/internal/rpc/stream" |
| "v.io/x/ref/profiles/internal/rpc/stream/vc" |
| "v.io/x/ref/profiles/internal/rpc/stream/vif" |
| iversion "v.io/x/ref/profiles/internal/rpc/version" |
| "v.io/x/ref/test/testutil" |
| ) |
| |
| //go:generate v23 test generate |
| |
| func TestSingleFlowCreatedAtClient(t *testing.T) { |
| client, server := NewClientServer() |
| defer client.Close() |
| |
| clientVC, _, err := createVC(client, server, makeEP(0x5)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| writer, err := clientVC.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| // Test with an empty message to ensure that we correctly |
| // handle closing empty flows. |
| rwSingleFlow(t, writer, acceptFlowAtServer(server), "") |
| writer, err = clientVC.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| rwSingleFlow(t, writer, acceptFlowAtServer(server), "the dark knight") |
| } |
| |
| func TestSingleFlowCreatedAtServer(t *testing.T) { |
| client, server := NewClientServer() |
| defer client.Close() |
| |
| clientVC, serverConnector, err := createVC(client, server, makeEP(0x5)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| ln, err := clientVC.Listen() |
| if err != nil { |
| t.Fatal(err) |
| } |
| writer, err := serverConnector.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| reader, err := ln.Accept() |
| if err != nil { |
| t.Fatal(err) |
| } |
| rwSingleFlow(t, writer, reader, "the dark knight") |
| ln.Close() |
| } |
| |
| func testMultipleVCsAndMultipleFlows(t *testing.T, gomaxprocs int) { |
| // This test dials multiple VCs from the client to the server. |
| // On each VC, it creates multiple flows, writes to them and verifies |
| // that the other process received what was written. |
| |
| // Knobs configuring this test |
| // |
| // In case the test breaks, the knobs can be tuned down to isolate the problem. |
| // In normal circumstances, the knows should be tuned up to stress test the code. |
| const ( |
| nVCs = 6 // Number of VCs created by the client process Dialing. |
| nFlowsFromClientPerVC = 3 // Number of flows initiated by the client process, per VC |
| nFlowsFromServerPerVC = 4 // Number of flows initiated by the server process, per VC |
| |
| // Maximum number of bytes to write and read per flow. |
| // The actual size is selected randomly. |
| maxBytesPerFlow = 512 << 10 // 512KB |
| ) |
| |
| mp := runtime.GOMAXPROCS(gomaxprocs) |
| defer runtime.GOMAXPROCS(mp) |
| client, server := NewClientServer() |
| defer client.Close() |
| |
| // Create all the VCs |
| // clientVCs[i] is the VC at the client process |
| // serverConnectors[i] is the corresponding VC at the server process. |
| clientVCs, serverConnectors, err := createNVCs(client, server, 0, nVCs) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // Create listeners for flows on the client VCs. |
| // Flows are implicitly being listened to at the server (available through server.Accept()) |
| clientLNs, err := createListeners(clientVCs) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // Create flows: |
| // Over each VC, create nFlowsFromClientPerVC initiated by the client |
| // and nFlowsFromServerPerVC initiated by the server. |
| nFlows := nVCs * (nFlowsFromClientPerVC + nFlowsFromServerPerVC) |
| |
| // Fill in random strings that will be written over the Flows. |
| dataWritten := make([]string, nFlows) |
| for i := 0; i < nFlows; i++ { |
| dataWritten[i] = string(testutil.RandomBytes(maxBytesPerFlow)) |
| } |
| |
| // write writes data to flow in randomly sized chunks. |
| write := func(flow stream.Flow, data string) { |
| defer flow.Close() |
| buf := []byte(data) |
| // Split into a random number of Write calls. |
| for len(buf) > 0 { |
| size := 1 + testutil.Intn(len(buf)) // Random number in [1, len(buf)] |
| n, err := flow.Write(buf[:size]) |
| if err != nil { |
| t.Errorf("Write failed: (%d, %v)", n, err) |
| return |
| } |
| buf = buf[size:] |
| } |
| } |
| |
| dataReadChan := make(chan string, nFlows) |
| // read reads from a flow and writes out the data to dataReadChan |
| read := func(flow stream.Flow) { |
| var buf bytes.Buffer |
| var tmp [1024]byte |
| for { |
| n, err := flow.Read(tmp[:testutil.Intn(len(tmp))]) |
| buf.Write(tmp[:n]) |
| if err == io.EOF { |
| break |
| } |
| if err != nil { |
| t.Errorf("Read error: %v", err) |
| break |
| } |
| } |
| dataReadChan <- buf.String() |
| } |
| |
| index := 0 |
| for i := 0; i < len(clientVCs); i++ { |
| for j := 0; j < nFlowsFromClientPerVC; j++ { |
| // Flow initiated by client, read by server |
| writer, err := clientVCs[i].Connect() |
| if err != nil { |
| t.Errorf("clientVCs[%d], flow %d: %v", i, j, err) |
| continue |
| } |
| go write(writer, dataWritten[index]) |
| go read(acceptFlowAtServer(server)) |
| index++ |
| } |
| } |
| for i := 0; i < len(serverConnectors); i++ { |
| for j := 0; j < nFlowsFromServerPerVC; j++ { |
| // Flow initiated by server, read by client |
| writer, err := serverConnectors[i].Connect() |
| if err != nil { |
| t.Errorf("serverConnectors[%d], flow %d: %v", i, j, err) |
| continue |
| } |
| go write(writer, dataWritten[index]) |
| go read(acceptFlowAtClient(clientLNs[i])) |
| index++ |
| } |
| } |
| if index != nFlows { |
| t.Errorf("Created %d flows, wanted %d", index, nFlows) |
| } |
| |
| // Collect all data read and compare against the data written. |
| // Since flows might be accepted in arbitrary order, sort the data before comparing. |
| dataRead := make([]string, index) |
| for i := 0; i < index; i++ { |
| dataRead[i] = <-dataReadChan |
| } |
| sort.Strings(dataWritten) |
| sort.Strings(dataRead) |
| if !reflect.DeepEqual(dataRead, dataWritten) { |
| // Since the strings can be very large, only print out the first few diffs. |
| nDiffs := 0 |
| for i := 0; i < len(dataRead); i++ { |
| if dataRead[i] != dataWritten[i] { |
| nDiffs++ |
| t.Errorf("Diff %d out of %d items: Got %q want %q", nDiffs, i, atmostNbytes(dataRead[i], 20), atmostNbytes(dataWritten[i], 20)) |
| } |
| } |
| if nDiffs > 0 { |
| t.Errorf("#Mismatches:%d #ReadSamples:%d #WriteSamples:%d", nDiffs, len(dataRead), len(dataWritten)) |
| } |
| } |
| } |
| |
| func TestMultipleVCsAndMultipleFlows_1(t *testing.T) { |
| // Test with a single goroutine since that is typically easier to debug |
| // in case of problems. |
| testMultipleVCsAndMultipleFlows(t, 1) |
| } |
| |
| func TestMultipleVCsAndMultipleFlows_5(t *testing.T) { |
| // Test with multiple goroutines, particularly useful for checking |
| // races with |
| // go test -race |
| testMultipleVCsAndMultipleFlows(t, 5) |
| } |
| |
| func TestClose(t *testing.T) { |
| client, server := NewClientServer() |
| vc, _, err := createVC(client, server, makeEP(0x5)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| clientFlow, err := vc.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| serverFlow := acceptFlowAtServer(server) |
| |
| var message = []byte("bugs bunny") |
| go func() { |
| if n, err := clientFlow.Write(message); n != len(message) || err != nil { |
| t.Fatalf("Wrote (%d, %v), want (%d, nil)", n, err, len(message)) |
| } |
| client.Close() |
| }() |
| |
| buf := make([]byte, 1024) |
| // client.Close should drain all pending writes first. |
| if n, err := serverFlow.Read(buf); n != len(message) || err != nil { |
| t.Fatalf("Got (%d, %v) = %q, want (%d, nil) = %q", n, err, buf[:n], len(message), message) |
| } |
| // subsequent reads should fail, since the VIF should be closed. |
| if n, err := serverFlow.Read(buf); n != 0 || err == nil { |
| t.Fatalf("Got (%d, %v) = %q, want (0, nil)", n, err, buf[:n]) |
| } |
| server.Close() |
| } |
| |
| func TestOnClose(t *testing.T) { |
| notifyC, notifyS := make(chan *vif.VIF), make(chan *vif.VIF) |
| notifyFuncC := func(vf *vif.VIF) { notifyC <- vf } |
| notifyFuncS := func(vf *vif.VIF) { notifyS <- vf } |
| |
| // Close the client VIF. Both client and server should be notified. |
| client, server, err := New(nil, nil, notifyFuncC, notifyFuncS, nil, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| client.Close() |
| if got := <-notifyC; got != client { |
| t.Errorf("Want notification for %v; got %v", client, got) |
| } |
| if got := <-notifyS; got != server { |
| t.Errorf("Want notification for %v; got %v", server, got) |
| } |
| |
| // Same as above, but close the server VIF at this time. |
| client, server, err = New(nil, nil, notifyFuncC, notifyFuncS, nil, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| server.Close() |
| if got := <-notifyC; got != client { |
| t.Errorf("Want notification for %v; got %v", client, got) |
| } |
| if got := <-notifyS; got != server { |
| t.Errorf("Want notification for %v; got %v", server, got) |
| } |
| } |
| |
| func testCloseWhenEmpty(t *testing.T, testServer bool) { |
| const ( |
| waitTime = 5 * time.Millisecond |
| ) |
| |
| notify := make(chan interface{}) |
| notifyFunc := func(vf *vif.VIF) { notify <- vf } |
| |
| newVIF := func() (vf, remote *vif.VIF) { |
| var err error |
| vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, nil, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err = vf.StartAccepting(); err != nil { |
| t.Fatal(err) |
| } |
| if testServer { |
| vf, remote = remote, vf |
| } |
| return |
| } |
| |
| // Initially empty. Should not be closed. |
| vf, remote := newVIF() |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Open one VC. Should not be closed. |
| vf, remote = newVIF() |
| if _, _, err := createVC(vf, remote, makeEP(0x10)); err != nil { |
| t.Fatal(err) |
| } |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Close the VC. Should be closed. |
| vf.ShutdownVCs(makeEP(0x10)) |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Same as above, but open a VC from the remote side. |
| vf, remote = newVIF() |
| _, _, err := createVC(remote, vf, makeEP(0x10)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| remote.ShutdownVCs(makeEP(0x10)) |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Create two VCs. |
| vf, remote = newVIF() |
| if _, _, err := createNVCs(vf, remote, 0x10, 2); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Close the first VC twice. Should not be closed. |
| vf.ShutdownVCs(makeEP(0x10)) |
| vf.ShutdownVCs(makeEP(0x10)) |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Close the second VC. Should be closed. |
| vf.ShutdownVCs(makeEP(0x10 + 1)) |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| } |
| |
| func TestCloseWhenEmpty(t *testing.T) { testCloseWhenEmpty(t, false) } |
| func TestCloseWhenEmptyServer(t *testing.T) { testCloseWhenEmpty(t, true) } |
| |
| func testStartTimeout(t *testing.T, testServer bool) { |
| const ( |
| startTime = 5 * time.Millisecond |
| // We use a long wait time here since it takes some time for the underlying network |
| // connection of the other side to be closed especially in race testing. |
| waitTime = 150 * time.Millisecond |
| ) |
| |
| notify := make(chan interface{}) |
| notifyFunc := func(vf *vif.VIF) { notify <- vf } |
| |
| newVIF := func() (vf, remote *vif.VIF, triggerTimers func()) { |
| triggerTimers = vif.SetFakeTimers() |
| var vfStartTime, remoteStartTime time.Duration = startTime, 0 |
| if testServer { |
| vfStartTime, remoteStartTime = remoteStartTime, vfStartTime |
| } |
| var err error |
| vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, []stream.VCOpt{vc.StartTimeout{vfStartTime}}, []stream.ListenerOpt{vc.StartTimeout{remoteStartTime}}) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err = vf.StartAccepting(); err != nil { |
| t.Fatal(err) |
| } |
| if testServer { |
| vf, remote = remote, vf |
| } |
| return |
| } |
| |
| // No VC opened. Should be closed after the start timeout. |
| vf, remote, triggerTimers := newVIF() |
| triggerTimers() |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Open one VC. Should not be closed. |
| vf, remote, triggerTimers = newVIF() |
| if _, _, err := createVC(vf, remote, makeEP(0x10)); err != nil { |
| t.Fatal(err) |
| } |
| triggerTimers() |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Close the VC. Should be closed. |
| vf.ShutdownVCs(makeEP(0x10)) |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| } |
| |
| func TestStartTimeout(t *testing.T) { testStartTimeout(t, false) } |
| func TestStartTimeoutServer(t *testing.T) { testStartTimeout(t, true) } |
| |
| func testIdleTimeout(t *testing.T, testServer bool) { |
| const ( |
| idleTime = 10 * time.Millisecond |
| waitTime = idleTime * 2 |
| ) |
| |
| notify := make(chan interface{}) |
| notifyFunc := func(vf *vif.VIF) { notify <- vf } |
| |
| newVIF := func() (vf, remote *vif.VIF) { |
| var err error |
| if vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, nil, nil); err != nil { |
| t.Fatal(err) |
| } |
| if err = vf.StartAccepting(); err != nil { |
| t.Fatal(err) |
| } |
| if testServer { |
| vf, remote = remote, vf |
| } |
| return |
| } |
| newVC := func(vf, remote *vif.VIF) (VC stream.VC, ln stream.Listener, remoteVC stream.Connector) { |
| triggerTimers := vif.SetFakeTimers() |
| defer triggerTimers() |
| var err error |
| VC, remoteVC, err = createVC(vf, remote, makeEP(0x10), vc.IdleTimeout{idleTime}) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if ln, err = VC.Listen(); err != nil { |
| t.Fatal(err) |
| } |
| return |
| } |
| newFlow := func(vc stream.VC, remote *vif.VIF) stream.Flow { |
| f, err := vc.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| acceptFlowAtServer(remote) |
| return f |
| } |
| |
| // No active flow. Should be notified. |
| vf, remote := newVIF() |
| _, _, _ = newVC(vf, remote) |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Same as above, but with multiple VCs. |
| vf, remote = newVIF() |
| triggerTimers := vif.SetFakeTimers() |
| if _, _, err := createNVCs(vf, remote, 0x10, 5, vc.IdleTimeout{idleTime}); err != nil { |
| t.Fatal(err) |
| } |
| triggerTimers() |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Open one flow. Should not be notified. |
| vf, remote = newVIF() |
| vc, _, _ := newVC(vf, remote) |
| f1 := newFlow(vc, remote) |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Close the flow. Should be notified. |
| f1.Close() |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Open two flows. |
| vf, remote = newVIF() |
| vc, _, _ = newVC(vf, remote) |
| f1 = newFlow(vc, remote) |
| f2 := newFlow(vc, remote) |
| |
| // Close the first flow twice. Should not be notified. |
| f1.Close() |
| f1.Close() |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| |
| // Close the second flow. Should be notified now. |
| f2.Close() |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| |
| // Same as above, but open a flow from the remote side. |
| vf, remote = newVIF() |
| _, ln, remoteVC := newVC(vf, remote) |
| f1, err := remoteVC.Connect() |
| if err != nil { |
| t.Fatal(err) |
| } |
| acceptFlowAtClient(ln) |
| if err := vif.WaitWithTimeout(notify, waitTime); err != nil { |
| t.Error(err) |
| } |
| f1.Close() |
| if err := vif.WaitForNotifications(notify, vf, remote); err != nil { |
| t.Error(err) |
| } |
| } |
| |
| func TestIdleTimeout(t *testing.T) { testIdleTimeout(t, false) } |
| func TestIdleTimeoutServer(t *testing.T) { testIdleTimeout(t, true) } |
| |
| func TestShutdownVCs(t *testing.T) { |
| client, server := NewClientServer() |
| defer server.Close() |
| defer client.Close() |
| |
| testN := func(N int) error { |
| c := client.NumVCs() |
| if c != N { |
| return fmt.Errorf("%d VCs on client VIF, expected %d", c, N) |
| } |
| return nil |
| } |
| |
| if _, _, err := createVC(client, server, makeEP(0x5)); err != nil { |
| t.Fatal(err) |
| } |
| if err := testN(1); err != nil { |
| t.Error(err) |
| } |
| if _, _, err := createVC(client, server, makeEP(0x5)); err != nil { |
| t.Fatal(err) |
| } |
| if err := testN(2); err != nil { |
| t.Error(err) |
| } |
| if _, _, err := createVC(client, server, makeEP(0x7)); err != nil { |
| t.Fatal(err) |
| } |
| if err := testN(3); err != nil { |
| t.Error(err) |
| } |
| // Client does not have any VCs to the endpoint with routing id 0x9, |
| // so nothing should be closed |
| if n := client.ShutdownVCs(makeEP(0x9)); n != 0 { |
| t.Errorf("Expected 0 VCs to be closed, closed %d", n) |
| } |
| if err := testN(3); err != nil { |
| t.Error(err) |
| } |
| // But it does have to 0x5 |
| if n := client.ShutdownVCs(makeEP(0x5)); n != 2 { |
| t.Errorf("Expected 2 VCs to be closed, closed %d", n) |
| } |
| if err := testN(1); err != nil { |
| t.Error(err) |
| } |
| // And 0x7 |
| if n := client.ShutdownVCs(makeEP(0x7)); n != 1 { |
| t.Errorf("Expected 2 VCs to be closed, closed %d", n) |
| } |
| if err := testN(0); err != nil { |
| t.Error(err) |
| } |
| } |
| |
| type versionTestCase struct { |
| client, server, ep *iversion.Range |
| expectError bool |
| expectVIFError bool |
| } |
| |
| func (tc *versionTestCase) Run(t *testing.T) { |
| client, server, err := NewVersionedClientServer(tc.client, tc.server) |
| if (err != nil) != tc.expectVIFError { |
| t.Errorf("Error mismatch. Wanted error: %v, got %v; client: %v, server: %v", tc.expectVIFError, err, tc.client, tc.server) |
| } |
| if err != nil { |
| return |
| } |
| defer client.Close() |
| |
| ep := &inaming.Endpoint{ |
| Protocol: "test", |
| Address: "addr", |
| RID: naming.FixedRoutingID(0x5), |
| } |
| clientVC, _, err := createVC(client, server, ep) |
| if (err != nil) != tc.expectError { |
| t.Errorf("Error mismatch. Wanted error: %v, got %v (client:%v, server:%v ep:%v)", tc.expectError, err, tc.client, tc.server, tc.ep) |
| |
| } |
| if err != nil { |
| return |
| } |
| |
| writer, err := clientVC.Connect() |
| if err != nil { |
| t.Errorf("Unexpected error on case %+v: %v", tc, err) |
| return |
| } |
| |
| rwSingleFlow(t, writer, acceptFlowAtServer(server), "the dark knight") |
| } |
| |
| // TestIncompatibleVersions tests many cases where the client and server |
| // have compatible or incompatible supported version ranges. It ensures |
| // that overlapping ranges work properly, but non-overlapping ranges generate |
| // errors. |
| func TestIncompatibleVersions(t *testing.T) { |
| unknown := &iversion.Range{version.UnknownRPCVersion, version.UnknownRPCVersion} |
| tests := []versionTestCase{ |
| {&iversion.Range{2, 2}, &iversion.Range{2, 2}, &iversion.Range{2, 2}, false, false}, |
| {&iversion.Range{2, 3}, &iversion.Range{3, 5}, &iversion.Range{3, 5}, false, false}, |
| {&iversion.Range{2, 3}, &iversion.Range{3, 5}, unknown, false, false}, |
| |
| // VIF error since there are no versions in common. |
| {&iversion.Range{2, 3}, &iversion.Range{4, 5}, &iversion.Range{4, 5}, true, true}, |
| {&iversion.Range{2, 3}, &iversion.Range{4, 5}, unknown, true, true}, |
| } |
| |
| for _, tc := range tests { |
| tc.Run(t) |
| } |
| } |
| |
| func TestNetworkFailure(t *testing.T) { |
| c1, c2 := pipe() |
| result := make(chan *vif.VIF) |
| pclient := testutil.NewPrincipal("client") |
| go func() { |
| client, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), pclient, nil, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| result <- client |
| }() |
| pserver := testutil.NewPrincipal("server") |
| blessings := pserver.BlessingStore().Default() |
| server, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, blessings, nil, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| client := <-result |
| // If the network connection dies, Dial and Accept should fail. |
| c1.Close() |
| if _, err := client.Dial(makeEP(0x5), pclient); err == nil { |
| t.Errorf("Expected client.Dial to fail") |
| } |
| if _, err := server.Accept(); err == nil { |
| t.Errorf("Expected server.Accept to fail") |
| } |
| } |
| |
| func makeEP(rid uint64) naming.Endpoint { |
| return &inaming.Endpoint{ |
| Protocol: "test", |
| Address: "addr", |
| RID: naming.FixedRoutingID(rid), |
| } |
| } |
| |
// pipeAddr is a net.Addr whose String method returns a caller-chosen name,
// which is more descriptive than what net.Pipe provides.
type pipeAddr struct{ name string }

// Network always reports "pipe".
func (a pipeAddr) Network() string { return "pipe" }

// String reports the name the address was created with.
func (a pipeAddr) String() string { return a.name }

// pipeConn is a buffered, in-memory net.Conn with pipeAddr addressing. Each
// Write is delivered to the peer as one chunk over a channel.
type pipeConn struct {
	lock sync.Mutex
	// w is guarded by lock, to prevent Close from racing with Write. This is
	// a quick way to prevent the race, but it allows a blocked Write to block
	// Close as well. That is not a problem in the tests currently.
	w chan<- []byte
	// r delivers the chunks written by the peer.
	r <-chan []byte
	// rdata holds the part of the last received chunk not yet handed out.
	rdata        []byte
	laddr, raddr pipeAddr
}

// Read copies buffered data into data, first waiting for the next non-empty
// chunk from the peer if nothing is buffered. It returns io.EOF once the peer
// has closed its end and no chunks remain.
func (c *pipeConn) Read(data []byte) (int, error) {
	for len(c.rdata) == 0 {
		chunk, open := <-c.r
		if !open {
			return 0, io.EOF
		}
		c.rdata = chunk
	}
	copied := copy(data, c.rdata)
	c.rdata = c.rdata[copied:]
	return copied, nil
}

// Write sends a private copy of data to the peer. It returns io.EOF if the
// connection has already been closed.
func (c *pipeConn) Write(data []byte) (int, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.w == nil {
		return 0, io.EOF
	}
	// Copy so that the caller may reuse data after Write returns.
	c.w <- append([]byte(nil), data...)
	return len(data), nil
}

// Close closes the write side of the connection, which the peer observes as
// io.EOF from Read. Closing an already closed connection returns io.EOF.
func (c *pipeConn) Close() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.w != nil {
		close(c.w)
		c.w = nil
		return nil
	}
	return io.EOF
}

// LocalAddr returns the address of this end of the pipe.
func (c *pipeConn) LocalAddr() net.Addr { return c.laddr }

// RemoteAddr returns the address of the other end of the pipe.
func (c *pipeConn) RemoteAddr() net.Addr { return c.raddr }

// SetDeadline is a no-op; deadlines are not supported.
func (c *pipeConn) SetDeadline(t time.Time) error { return nil }

// SetReadDeadline is a no-op; deadlines are not supported.
func (c *pipeConn) SetReadDeadline(t time.Time) error { return nil }

// SetWriteDeadline is a no-op; deadlines are not supported.
func (c *pipeConn) SetWriteDeadline(t time.Time) error { return nil }

// pipe returns the two ends of an in-memory connection. The first end is
// addressed as "client" and the second as "server"; each direction buffers up
// to 10 written chunks.
func pipe() (net.Conn, net.Conn) {
	const chunksBuffered = 10
	var (
		clientAddr     = pipeAddr{"client"}
		serverAddr     = pipeAddr{"server"}
		clientToServer = make(chan []byte, chunksBuffered)
		serverToClient = make(chan []byte, chunksBuffered)
	)
	clientEnd := &pipeConn{w: clientToServer, r: serverToClient, laddr: clientAddr, raddr: serverAddr}
	serverEnd := &pipeConn{w: serverToClient, r: clientToServer, laddr: serverAddr, raddr: clientAddr}
	return clientEnd, serverEnd
}
| |
| func NewClientServer() (client, server *vif.VIF) { |
| var err error |
| client, server, err = New(nil, nil, nil, nil, nil, nil) |
| if err != nil { |
| panic(err) |
| } |
| return |
| } |
| |
| func NewVersionedClientServer(clientVersions, serverVersions *iversion.Range) (client, server *vif.VIF, verr error) { |
| return New(clientVersions, serverVersions, nil, nil, nil, nil) |
| } |
| |
| func New(clientVersions, serverVersions *iversion.Range, clientOnClose, serverOnClose func(*vif.VIF), opts []stream.VCOpt, lopts []stream.ListenerOpt) (client, server *vif.VIF, verr error) { |
| c1, c2 := pipe() |
| var cerr error |
| cl := make(chan *vif.VIF) |
| go func() { |
| c, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), testutil.NewPrincipal("client"), clientVersions, clientOnClose, opts...) |
| if err != nil { |
| cerr = err |
| close(cl) |
| } else { |
| cl <- c |
| } |
| }() |
| pserver := testutil.NewPrincipal("server") |
| bserver := pserver.BlessingStore().Default() |
| s, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, bserver, serverVersions, serverOnClose, lopts...) |
| c, ok := <-cl |
| if err != nil { |
| verr = err |
| return |
| } |
| if !ok { |
| verr = cerr |
| return |
| } |
| server = s |
| client = c |
| return |
| } |
| |
// rwSingleFlow writes data on writer from a separate goroutine, closes the
// writer, and checks that reader yields exactly the same bytes before EOF.
//
// Reading stops at io.EOF or at the first read error. Failures are reported
// on t with Errorf, so the calling test keeps running.
func rwSingleFlow(t *testing.T, writer io.WriteCloser, reader io.Reader, data string) {
	go func() {
		if n, err := writer.Write([]byte(data)); n != len(data) || err != nil {
			t.Errorf("Write failure. Got (%d, %v) want (%d, nil)", n, err, len(data))
		}
		writer.Close()
	}()

	var buf bytes.Buffer
	var tmp [4096]byte
	for {
		n, err := reader.Read(tmp[:])
		buf.Write(tmp[:n])
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Errorf("Read error: %v", err)
			// Stop here: a reader that keeps failing would otherwise make
			// this loop spin forever.
			break
		}
	}
	if buf.String() != data {
		t.Errorf("Wrote %q but read %q", data, buf.String())
	}
}
| |
| // createVC creates a VC by dialing from the client process to the server |
| // process. It returns the VC at the client and the Connector at the server |
| // (which the server can use to create flows over the VC)). |
| func createVC(client, server *vif.VIF, ep naming.Endpoint, opts ...stream.VCOpt) (clientVC stream.VC, serverConnector stream.Connector, err error) { |
| vcChan := make(chan stream.VC) |
| scChan := make(chan stream.Connector) |
| errChan := make(chan error) |
| go func() { |
| vc, err := client.Dial(ep, testutil.NewPrincipal("client"), opts...) |
| errChan <- err |
| vcChan <- vc |
| }() |
| go func() { |
| cAndf, err := server.Accept() |
| errChan <- err |
| if err == nil { |
| scChan <- cAndf.Connector |
| } |
| }() |
| if err = <-errChan; err != nil { |
| return |
| } |
| if err = <-errChan; err != nil { |
| return |
| } |
| clientVC = <-vcChan |
| serverConnector = <-scChan |
| return |
| } |
| |
| func createNVCs(client, server *vif.VIF, startRID uint64, N int, opts ...stream.VCOpt) (clientVCs []stream.VC, serverConnectors []stream.Connector, err error) { |
| var c stream.VC |
| var s stream.Connector |
| for i := 0; i < N; i++ { |
| c, s, err = createVC(client, server, makeEP(startRID+uint64(i)), opts...) |
| if err != nil { |
| return |
| } |
| clientVCs = append(clientVCs, c) |
| serverConnectors = append(serverConnectors, s) |
| } |
| return |
| } |
| |
| func createListeners(vcs []stream.VC) ([]stream.Listener, error) { |
| var ret []stream.Listener |
| for _, vc := range vcs { |
| ln, err := vc.Listen() |
| if err != nil { |
| return nil, err |
| } |
| ret = append(ret, ln) |
| } |
| return ret, nil |
| } |
| |
| func acceptFlowAtServer(vf *vif.VIF) stream.Flow { |
| for { |
| cAndf, err := vf.Accept() |
| if err != nil { |
| panic(err) |
| } |
| if cAndf.Flow != nil { |
| return cAndf.Flow |
| } |
| } |
| } |
| |
| func acceptFlowAtClient(ln stream.Listener) stream.Flow { |
| f, err := ln.Accept() |
| if err != nil { |
| panic(err) |
| } |
| return f |
| } |
| |
// atmostNbytes abbreviates s for display. A string of at most n bytes is
// returned unchanged; a longer one is reduced to its first n/2 bytes, "...",
// and its last n/2 bytes.
func atmostNbytes(s string, n int) string {
	// ">=" rather than ">": a string of exactly n bytes already fits, and
	// abbreviating it would only insert a misleading "..." and make it longer.
	if n >= len(s) {
		return s
	}
	half := n / 2
	return s[:half] + "..." + s[len(s)-half:]
}