Merge "services/groups: storage engine update follow up (v.io/c/13650)"
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
error (
LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+ UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+ ManagerClosed() {"en": "manager is already closed"}
+ AcceptFailed(err error) {"en": "accept failed{:err}"}
)
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
var (
ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+ ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+ ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+ ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
}
// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
func NewErrLargerThan3ByteUInt(ctx *context.T) error {
return verror.New(ErrLargerThan3ByteUInt, ctx)
}
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+ return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+ return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+ return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "net"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+
+ "v.io/x/ref/runtime/internal/flow/conn"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+ rid naming.RoutingID
+ closed <-chan struct{}
+ q *upcqueue.T
+
+ mu *sync.Mutex
+ listenEndpoints []naming.Endpoint
+}
+
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+ m := &manager{
+ rid: rid,
+ closed: ctx.Done(),
+ mu: &sync.Mutex{},
+ q: upcqueue.New(),
+ }
+ return m
+}
+
+// Listen causes the Manager to accept flows from the provided protocol and address.
+// Listen may be called muliple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+ netLn, err := listen(ctx, protocol, address)
+ if err != nil {
+ return flow.NewErrNetwork(ctx, err)
+ }
+ local := &inaming.Endpoint{
+ Protocol: protocol,
+ Address: netLn.Addr().String(),
+ RID: m.rid,
+ }
+ m.mu.Lock()
+ m.listenEndpoints = append(m.listenEndpoints, local)
+ m.mu.Unlock()
+ go m.netLnAcceptLoop(ctx, netLn, local)
+ return nil
+}
+
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+ const killConnectionsRetryDelay = 5 * time.Millisecond
+ for {
+ netConn, err := netLn.Accept()
+ for tokill := 1; isTemporaryError(err); tokill *= 2 {
+ if isTooManyOpenFiles(err) {
+ // TODO(suharshs): Find a way to kill connections here. We will need
+ // caching to be able to delete the connections.
+ } else {
+ tokill = 1
+ }
+ time.Sleep(killConnectionsRetryDelay)
+ netConn, err = netLn.Accept()
+ }
+ if err != nil {
+ ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+ }
+ // TODO(suharshs): This conn needs to be cached instead of ignored.
+ _, err = conn.NewAccepted(
+ ctx,
+ &framer{ReadWriter: netConn},
+ local,
+ v23.GetPrincipal(ctx).BlessingStore().Default(),
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ )
+ if err != nil {
+ netConn.Close()
+ ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+ }
+ }
+}
+
+type flowHandler struct {
+ q *upcqueue.T
+ closed <-chan struct{}
+}
+
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+ select {
+ case <-h.closed:
+ // This will make the Put call below return a upcqueue.ErrQueueIsClosed.
+ h.q.Close()
+ default:
+ }
+ return h.q.Put(f)
+}
+
+// ListeningEndpoints returns the endpoints that the Manager has explicitly
+// listened on. The Manager will accept new flows on these endpoints.
+// Returned endpoints all have a RoutingID unique to the Acceptor.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+ m.mu.Lock()
+ ret := make([]naming.Endpoint, len(m.listenEndpoints))
+ copy(ret, m.listenEndpoints)
+ m.mu.Unlock()
+ return ret
+}
+
+// Accept blocks until a new Flow has been initiated by a remote process.
+// Flows are accepted from addresses that the Manager is listening on,
+// including outgoing dialed connections.
+//
+// For example:
+// err := m.Listen(ctx, "tcp", ":0")
+// for {
+// flow, err := m.Accept(ctx)
+// // process flow
+// }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+ // TODO(suharshs): Ensure that m is attached to ctx.
+ item, err := m.q.Get(m.closed)
+ switch {
+ case err == upcqueue.ErrQueueIsClosed:
+ return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+ case err != nil:
+ return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+ default:
+ return item.(flow.Flow), nil
+ }
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+ // TODO(suharshs): Add caching of connections.
+ addr := remote.Addr()
+ d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+ netConn, err := dial(ctx, d, addr.Network(), addr.String())
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+
+ c, err := conn.NewDialed(
+ ctx,
+ &framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+ localEndpoint(netConn, m.rid),
+ remote,
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ fn,
+ )
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object. Once the channel is closed any operations on the Manager will
+// necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+ return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+ if d != nil {
+ var timeout time.Duration
+ if dl, ok := ctx.Deadline(); ok {
+ timeout = dl.Sub(time.Now())
+ }
+ return d(protocol, address, timeout)
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+ if r != nil {
+ net, addr, err := r(protocol, address)
+ if err != nil {
+ return "", "", err
+ }
+ return net, addr, nil
+ }
+ return "", "", NewErrUnknownProtocol(ctx, protocol)
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+ if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
+ ln, err := l(protocol, address)
+ if err != nil {
+ return nil, err
+ }
+ return ln, nil
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+ localAddr := conn.LocalAddr()
+ ep := &inaming.Endpoint{
+ Protocol: localAddr.Network(),
+ Address: localAddr.String(),
+ RID: rid,
+ }
+ return ep
+}
+
+func isTemporaryError(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && oErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/runtime/internal/flow/manager"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestDirectConnection(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ p := testutil.NewPrincipal("test")
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ rid := naming.FixedRoutingID(0x5555)
+ m := manager.New(ctx, rid)
+ want := "read this please"
+
+ if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+
+ bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+ eps := m.ListeningEndpoints()
+ if len(eps) == 0 {
+ t.Fatalf("no endpoints listened on")
+ }
+ flow, err := m.Dial(ctx, eps[0], bFn)
+ if err != nil {
+ t.Error(err)
+ }
+ writeLine(flow, want)
+
+ flow, err = m.Accept(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := readLine(flow)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+func readLine(f flow.Flow) (string, error) {
+ s, err := bufio.NewReader(f).ReadString('\n')
+ return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+ data += "\n"
+ _, err := f.Write([]byte(data))
+ return err
+}
diff --git a/runtime/internal/mojo_util.go b/runtime/internal/mojo_util.go
new file mode 100644
index 0000000..e434d68
--- /dev/null
+++ b/runtime/internal/mojo_util.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package internal
+
+import (
+ "flag"
+ "os"
+ "strings"
+
+ "v.io/x/ref/lib/flags"
+)
+
+// TODO(sadovsky): Terrible, terrible hack.
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+ // We expect that command-line flags have not been parsed. v23_util performs
+ // command-line parsing at this point. For Mojo, we instead parse command-line
+ // flags from the V23_MOJO_FLAGS env var.
+ // TODO(sadovsky): Maybe move this check to util.go, or drop it?
+ if flag.CommandLine.Parsed() {
+ panic("flag.CommandLine.Parse() has been called")
+ }
+ // TODO(sadovsky): Support argument quoting. More generally, parse this env
+ // var similar to how bash parses arguments.
+ return f.Parse(strings.Split(os.Getenv("V23_MOJO_FLAGS"), " "), config)
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
// Min is incremented whenever we want to remove support for old protocol
// versions.
var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
func init() {
metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))
diff --git a/runtime/internal/util.go b/runtime/internal/util.go
index 4df3db4..f5f6f6a 100644
--- a/runtime/internal/util.go
+++ b/runtime/internal/util.go
@@ -7,7 +7,6 @@
import (
"fmt"
"net"
- "os"
"strings"
"v.io/x/lib/netstate"
@@ -40,7 +39,7 @@
if handle != nil {
config = handle.Config.Dump()
}
- return f.Parse(os.Args[1:], config)
+ return parseFlagsInternal(f, config)
}
// IPAddressChooser returns the preferred IP address, which is,
diff --git a/runtime/internal/v23_util.go b/runtime/internal/v23_util.go
new file mode 100644
index 0000000..5375851
--- /dev/null
+++ b/runtime/internal/v23_util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !mojo
+
+package internal
+
+import (
+ "os"
+
+ "v.io/x/ref/lib/flags"
+)
+
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+ return f.Parse(os.Args[1:], config)
+}
diff --git a/services/device/deviced/internal/impl/globsuid/signature_match_test.go b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
index 8f7c3c5..8dbd380 100644
--- a/services/device/deviced/internal/impl/globsuid/signature_match_test.go
+++ b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
@@ -28,9 +28,9 @@
)
func TestDownloadSignatureMatch(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
sh, deferFn := servicetest.CreateShellAndMountTable(t, ctx, nil)
defer deferFn()
@@ -39,7 +39,7 @@
pkgVON := naming.Join(binaryVON, "testpkg")
defer utiltest.StartRealBinaryRepository(t, ctx, binaryVON)()
- up := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+ up := rg.RandomBytes(rg.RandomIntn(5 << 20))
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
sig, err := binarylib.Upload(ctx, naming.Join(binaryVON, "testbinary"), up, mediaInfo)
if err != nil {
@@ -52,7 +52,7 @@
t.Fatalf("ioutil.TempDir failed: %v", err)
}
defer os.RemoveAll(tmpdir)
- pkgContents := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+ pkgContents := rg.RandomBytes(rg.RandomIntn(5 << 20))
if err := ioutil.WriteFile(filepath.Join(tmpdir, "pkg.txt"), pkgContents, 0600); err != nil {
t.Fatalf("ioutil.WriteFile failed: %v", err)
}
diff --git a/services/internal/binarylib/client_test.go b/services/internal/binarylib/client_test.go
index 000e8ca..7c83ca2 100644
--- a/services/internal/binarylib/client_test.go
+++ b/services/internal/binarylib/client_test.go
@@ -73,13 +73,13 @@
// TestBufferAPI tests the binary repository client-side library
// interface using buffers.
func TestBufferAPI(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
von, cleanup := setupRepository(t, ctx)
defer cleanup()
- data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+ data := rg.RandomBytes(rg.RandomIntn(10 << 20))
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
sig, err := Upload(ctx, von, data, mediaInfo)
if err != nil {
@@ -125,14 +125,14 @@
// TestFileAPI tests the binary repository client-side library
// interface using files.
func TestFileAPI(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
von, cleanup := setupRepository(t, ctx)
defer cleanup()
// Create up to 10MB of random bytes.
- data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+ data := rg.RandomBytes(rg.RandomIntn(10 << 20))
dir, prefix := "", ""
src, err := ioutil.TempFile(dir, prefix)
if err != nil {
diff --git a/services/internal/binarylib/http_test.go b/services/internal/binarylib/http_test.go
index 73b253f..198cb75 100644
--- a/services/internal/binarylib/http_test.go
+++ b/services/internal/binarylib/http_test.go
@@ -21,9 +21,9 @@
// TestHTTP checks that HTTP download works.
func TestHTTP(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
// TODO(caprita): This is based on TestMultiPart (impl_test.go). Share
// the code where possible.
@@ -34,8 +34,8 @@
data := make([][]byte, length)
for i := 0; i < length; i++ {
// Random size, but at least 1 (avoid empty parts).
- size := testutil.RandomIntn(1000*binarylib.BufferLength) + 1
- data[i] = testutil.RandomBytes(size)
+ size := rg.RandomIntn(1000*binarylib.BufferLength) + 1
+ data[i] = rg.RandomBytes(size)
}
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
if err := binary.Create(ctx, int32(length), mediaInfo); err != nil {
diff --git a/services/internal/binarylib/impl_test.go b/services/internal/binarylib/impl_test.go
index a614217..1ffd9a6 100644
--- a/services/internal/binarylib/impl_test.go
+++ b/services/internal/binarylib/impl_test.go
@@ -79,11 +79,12 @@
func TestHierarchy(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for i := 0; i < md5.Size; i++ {
binary, ep, _, cleanup := startServer(t, ctx, i)
defer cleanup()
- data := testData()
+ data := testData(rg)
// Test the binary repository interface.
if err := binary.Create(ctx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -131,6 +132,7 @@
func TestMultiPart(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for length := 2; length < 5; length++ {
binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -138,7 +140,7 @@
// Create <length> chunks of up to 4MB of random bytes.
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
}
// Test the binary repository interface.
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -181,9 +183,9 @@
// resumption ranging the number of parts the uploaded binary consists
// of.
func TestResumption(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for length := 2; length < 5; length++ {
binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -191,7 +193,7 @@
// Create <length> chunks of up to 4MB of random bytes.
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
}
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed: %v", err)
@@ -211,7 +213,7 @@
break
}
for i := 0; i < length; i++ {
- fail := testutil.RandomIntn(2)
+ fail := rg.RandomIntn(2)
if parts[i] == binarylib.MissingPart && fail != 0 {
if streamErr, err := invokeUpload(t, ctx, binary, data[i], int32(i)); streamErr != nil || err != nil {
t.FailNow()
@@ -227,18 +229,18 @@
// TestErrors checks that the binary interface correctly reports errors.
func TestErrors(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
binary, _, _, cleanup := startServer(t, ctx, 2)
defer cleanup()
const length = 2
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
for j := 0; j < len(data[i]); j++ {
- data[i][j] = byte(testutil.RandomInt())
+ data[i][j] = byte(rg.RandomInt())
}
}
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -295,10 +297,11 @@
func TestGlob(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
_, ep, _, cleanup := startServer(t, ctx, 2)
defer cleanup()
- data := testData()
+ data := testData(rg)
objects := []string{"foo", "bar", "hello world", "a/b/c"}
for _, obj := range objects {
diff --git a/services/internal/binarylib/perms_test.go b/services/internal/binarylib/perms_test.go
index b4b684a..0729b69 100644
--- a/services/internal/binarylib/perms_test.go
+++ b/services/internal/binarylib/perms_test.go
@@ -76,6 +76,7 @@
func TestBinaryCreateAccessList(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
selfCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("self"))
if err != nil {
@@ -105,7 +106,7 @@
if err := b("bini/private").Create(childCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataPrivate := testData()
+ fakeDataPrivate := testData(rg)
if streamErr, err := invokeUpload(t, childCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -130,6 +131,7 @@
func TestBinaryRootAccessList(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
selfPrincipal := testutil.NewPrincipal("self")
selfCtx, err := v23.WithPrincipal(ctx, selfPrincipal)
@@ -161,7 +163,7 @@
if err := b("bini/private").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataPrivate := testData()
+ fakeDataPrivate := testData(rg)
if streamErr, err := invokeUpload(t, selfCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -169,7 +171,7 @@
if err := b("bini/shared").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataShared := testData()
+ fakeDataShared := testData(rg)
if streamErr, err := invokeUpload(t, selfCtx, b("bini/shared"), fakeDataShared, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -296,7 +298,7 @@
if err := b("bini/otherbinary").Create(otherCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataOther := testData()
+ fakeDataOther := testData(rg)
if streamErr, err := invokeUpload(t, otherCtx, b("bini/otherbinary"), fakeDataOther, 0); streamErr != nil || err != nil {
t.FailNow()
}
diff --git a/services/internal/binarylib/util_test.go b/services/internal/binarylib/util_test.go
index 30b414f..cd3d785 100644
--- a/services/internal/binarylib/util_test.go
+++ b/services/internal/binarylib/util_test.go
@@ -88,8 +88,8 @@
}
// testData creates up to 4MB of random bytes.
-func testData() []byte {
- size := testutil.RandomIntn(1000 * binarylib.BufferLength)
- data := testutil.RandomBytes(size)
+func testData(rg *testutil.Random) []byte {
+ size := rg.RandomIntn(1000 * binarylib.BufferLength)
+ data := rg.RandomBytes(size)
return data
}
diff --git a/test/testutil/rand.go b/test/testutil/rand.go
index 3cb1526..8e21640 100644
--- a/test/testutil/rand.go
+++ b/test/testutil/rand.go
@@ -28,7 +28,6 @@
// Random is a concurrent-access friendly source of randomness.
type Random struct {
mu sync.Mutex
- seed int64
rand *rand.Rand
}
@@ -72,9 +71,12 @@
return buffer
}
-// Create a new pseudo-random number generator, the seed may be supplied
-// by V23_RNG_SEED to allow for reproducing a previous sequence.
-func NewRandGenerator() *Random {
+type loggingFunc func(format string, args ...interface{})
+
+// NewRandGenerator creates a new pseudo-random number generator; the seed may
+// be supplied by V23_RNG_SEED to allow for reproducing a previous sequence, and
+// is printed using the supplied logging function.
+func NewRandGenerator(logger loggingFunc) *Random {
seed := time.Now().UnixNano()
seedString := os.Getenv(SeedEnv)
if seedString != "" {
@@ -85,17 +87,35 @@
panic(fmt.Sprintf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err))
}
}
- return &Random{seed: seed, rand: rand.New(rand.NewSource(seed))}
+ logger("Seeded pseudo-random number generator with %v", seed)
+ return &Random{rand: rand.New(rand.NewSource(seed))}
}
+// TODO(caprita): Consider deprecating InitRandGenerator in favor of using
+// NewRandGenerator directly. There are several drawbacks to using the global
+// singleton Random object:
+//
+// - tests that do not call InitRandGenerator themselves could depend on
+// InitRandGenerator having been called by other tests in the same package and
+// stop working when run standalone with test --run
+//
+// - conversely, a test case may call InitRandGenerator without actually
+// needing to; it's hard to figure out if some library called by a test
+// actually uses the Random object or not
+//
+// - when several test cases share the same Random object, there is
+// interference in the stream of random numbers generated for each test case
+// if run in parallel
+//
+// All these issues can be trivially addressed if the Random object is created
+// and plumbed through the call stack explicitly.
+
// InitRandGenerator creates an instance of Random in the public variable Rand
-// and returns a function intended to be defer'ed that prints out the
-// seed use when creating the number number generator using the supplied
-// logging function.
-func InitRandGenerator(loggingFunc func(format string, args ...interface{})) {
+// and prints out the seed use when creating the number number generator using
+// the supplied logging function.
+func InitRandGenerator(logger loggingFunc) {
once.Do(func() {
- Rand = NewRandGenerator()
- loggingFunc("Seeded pseudo-random number generator with %v", Rand.seed)
+ Rand = NewRandGenerator(logger)
})
}