Merge "services/groups: storage engine update follow up (v.io/c/13650)"
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
 // TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
 error (
   LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+  UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+  ManagerClosed() {"en": "manager is already closed"}
+  AcceptFailed(err error) {"en": "accept failed{:err}"}
 )
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
 
 var (
 	ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+	ErrUnknownProtocol     = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+	ErrManagerClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+	ErrAcceptFailed        = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
 }
 
 // NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
 func NewErrLargerThan3ByteUInt(ctx *context.T) error {
 	return verror.New(ErrLargerThan3ByteUInt, ctx)
 }
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+	return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+	return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+	return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"net"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+
+	"v.io/x/ref/runtime/internal/flow/conn"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+	rid    naming.RoutingID // routing ID placed in every endpoint this manager builds
+	closed <-chan struct{}  // ctx.Done() of the context given to New
+	q      *upcqueue.T      // flows queued by flowHandler.HandleFlow, drained by Accept
+
+	mu              *sync.Mutex       // guards listenEndpoints
+	listenEndpoints []naming.Endpoint // appended to by each successful Listen
+}
+
+// New creates a flow.Manager with routing ID rid whose closed channel is ctx.Done().
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+	return &manager{
+		rid:    rid,
+		closed: ctx.Done(),
+		mu:     new(sync.Mutex),
+		q:      upcqueue.New(),
+	}
+}
+
+// Listen causes the Manager to accept flows from the provided protocol and address.
+// Listen may be called multiple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+	ln, err := listen(ctx, protocol, address)
+	if err != nil {
+		return flow.NewErrNetwork(ctx, err)
+	}
+	// Record the address reported by the listener itself rather than the
+	// requested one (the two can differ, e.g. for a requested port of 0).
+	ep := &inaming.Endpoint{Protocol: protocol, Address: ln.Addr().String(), RID: m.rid}
+	// m.mu guards listenEndpoints against concurrent ListeningEndpoints calls.
+	m.mu.Lock()
+	m.listenEndpoints = append(m.listenEndpoints, ep)
+	m.mu.Unlock()
+	// Accept connections in the background; Listen itself returns immediately.
+	go m.netLnAcceptLoop(ctx, ln, ep)
+	return nil
+}
+
+// netLnAcceptLoop serves netLn until a non-temporary Accept error, handing accepted flows to m.q.
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+	const killConnectionsRetryDelay = 5 * time.Millisecond
+	for {
+		netConn, err := netLn.Accept()
+		for isTemporaryError(err) {
+			if isTooManyOpenFiles(err) {
+				// TODO(suharshs): Find a way to kill connections here. We will need
+				// caching to be able to delete the connections.
+			}
+			time.Sleep(killConnectionsRetryDelay)
+			netConn, err = netLn.Accept()
+		}
+		if err != nil {
+			ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+			return // no usable netConn; continuing would use it and spin on a dead listener
+		}
+		// TODO(suharshs): This conn needs to be cached instead of ignored.
+		_, err = conn.NewAccepted(
+			ctx,
+			&framer{ReadWriter: netConn},
+			local,
+			v23.GetPrincipal(ctx).BlessingStore().Default(),
+			version.Supported,
+			&flowHandler{q: m.q, closed: m.closed},
+		)
+		if err != nil {
+			netConn.Close()
+			ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+		}
+	}
+}
+
+type flowHandler struct {
+	q      *upcqueue.T     // queue that HandleFlow puts incoming flows on
+	closed <-chan struct{} // once closed, HandleFlow closes q before the Put
+}
+
+// HandleFlow puts f on h.q; after h.closed fires it closes h.q first so Put returns ErrQueueIsClosed.
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+	select {
+	case <-h.closed:
+		h.q.Close()
+	default:
+	}
+	return h.q.Put(f)
+}
+
+// ListeningEndpoints returns a snapshot of the endpoints that the Manager has
+// explicitly listened on, each carrying the Manager's RoutingID. The Manager
+// will accept new flows on these endpoints.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	eps := make([]naming.Endpoint, len(m.listenEndpoints))
+	copy(eps, m.listenEndpoints)
+	return eps
+}
+
+// Accept blocks until a new Flow has been initiated by a remote process.
+// Flows are accepted from addresses that the Manager is listening on,
+// including outgoing dialed connections.
+//
+// For example:
+//   err := m.Listen(ctx, "tcp", ":0")
+//   for {
+//     flow, err := m.Accept(ctx)
+//     // process flow
+//   }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned. All failures, including the queue having
+// been closed, are wrapped with flow.NewErrNetwork.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+	// TODO(suharshs): Ensure that m is attached to ctx.
+	item, err := m.q.Get(m.closed)
+	if err == upcqueue.ErrQueueIsClosed {
+		return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+	}
+	if err != nil {
+		return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+	}
+	return item.(flow.Flow), nil
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+	// TODO(suharshs): Add caching of connections.
+	addr := remote.Addr()
+	d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+	netConn, err := dial(ctx, d, addr.Network(), addr.String())
+	if err != nil {
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+	c, err := conn.NewDialed(
+		ctx,
+		&framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+		localEndpoint(netConn, m.rid),
+		remote,
+		version.Supported,
+		&flowHandler{q: m.q, closed: m.closed},
+		fn,
+	)
+	if err != nil {
+		netConn.Close() // as in netLnAcceptLoop: don't leak the net.Conn when conn setup fails
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+	return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object; it is the Done channel of the context passed to New. Once the
+// channel is closed any operations on the Manager will necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+	return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+	if d == nil { // no dialer was supplied for protocol
+		return nil, NewErrUnknownProtocol(ctx, protocol)
+	}
+	var timeout time.Duration // stays zero when ctx carries no deadline
+	if deadline, ok := ctx.Deadline(); ok {
+		timeout = deadline.Sub(time.Now())
+	}
+	return d(protocol, address, timeout)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+	if r == nil { // no resolver was supplied for protocol
+		return "", "", NewErrUnknownProtocol(ctx, protocol)
+	}
+	network, resolved, err := r(protocol, address)
+	if err != nil {
+		return "", "", err
+	}
+	return network, resolved, nil
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+	if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil { // third result is the listen function
+		ln, err := l(protocol, address)
+		if err != nil {
+			return nil, err // never hand back a listener alongside an error
+		}
+		return ln, nil
+	}
+	return nil, NewErrUnknownProtocol(ctx, protocol) // nothing registered to listen with
+}
+
+// localEndpoint describes the local side of conn as an endpoint carrying rid.
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+	local := conn.LocalAddr()
+	return &inaming.Endpoint{
+		Protocol: local.Network(),
+		Address:  local.String(),
+		RID:      rid,
+	}
+}
+
+func isTemporaryError(err error) bool { // true only for a *net.OpError that reports Temporary()
+	opErr, isOpErr := err.(*net.OpError)
+	return isOpErr && opErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool { // true for a *net.OpError whose cause text contains EMFILE's
+	opErr, isOpErr := err.(*net.OpError)
+	return isOpErr && strings.Contains(opErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+	"bufio"
+	"strings"
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/runtime/internal/flow/manager"
+	"v.io/x/ref/test/testutil"
+)
+
+func TestDirectConnection(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	p := testutil.NewPrincipal("test")
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rid := naming.FixedRoutingID(0x5555)
+	m := manager.New(ctx, rid)
+	want := "read this please"
+	if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+	eps := m.ListeningEndpoints()
+	if len(eps) == 0 {
+		t.Fatalf("no endpoints listened on")
+	}
+	flow, err := m.Dial(ctx, eps[0], bFn)
+	if err != nil {
+		t.Fatal(err) // a failed Dial leaves no flow to write to
+	}
+	if err := writeLine(flow, want); err != nil {
+		t.Fatal(err)
+	}
+
+	flow, err = m.Accept(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := readLine(flow)
+	if err != nil {
+		t.Error(err)
+	}
+	if got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func readLine(f flow.Flow) (string, error) { // reads through the next '\n' and strips trailing newlines
+	line, err := bufio.NewReader(f).ReadString('\n')
+	return strings.TrimRight(line, "\n"), err
+}
+
+// writeLine writes data followed by a newline to f.
+func writeLine(f flow.Flow, data string) error {
+	_, err := f.Write([]byte(data + "\n"))
+	return err
+}
diff --git a/runtime/internal/mojo_util.go b/runtime/internal/mojo_util.go
new file mode 100644
index 0000000..e434d68
--- /dev/null
+++ b/runtime/internal/mojo_util.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package internal
+
+import (
+	"flag"
+	"os"
+	"strings"
+
+	"v.io/x/ref/lib/flags"
+)
+
+// parseFlagsInternal (Mojo build) parses flags from the V23_MOJO_FLAGS env var
+// instead of os.Args. TODO(sadovsky): Terrible, terrible hack.
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+	// Flags must not have been parsed yet (v23_util parses os.Args here).
+	// TODO(sadovsky): Maybe move this check to util.go, or drop it?
+	if flag.CommandLine.Parsed() {
+		panic("flag.CommandLine.Parse() has been called")
+	}
+	// strings.Fields (not Split) so an unset var or repeated spaces yield no
+	// empty arguments. TODO(sadovsky): Support argument quoting; more generally,
+	// parse this env var similar to how bash parses arguments.
+	return f.Parse(strings.Fields(os.Getenv("V23_MOJO_FLAGS")), config)
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
 // Min is incremented whenever we want to remove support for old protocol
 // versions.
 var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
 
 func init() {
 	metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))
diff --git a/runtime/internal/util.go b/runtime/internal/util.go
index 4df3db4..f5f6f6a 100644
--- a/runtime/internal/util.go
+++ b/runtime/internal/util.go
@@ -7,7 +7,6 @@
 import (
 	"fmt"
 	"net"
-	"os"
 	"strings"
 
 	"v.io/x/lib/netstate"
@@ -40,7 +39,7 @@
 	if handle != nil {
 		config = handle.Config.Dump()
 	}
-	return f.Parse(os.Args[1:], config)
+	return parseFlagsInternal(f, config)
 }
 
 // IPAddressChooser returns the preferred IP address, which is,
diff --git a/runtime/internal/v23_util.go b/runtime/internal/v23_util.go
new file mode 100644
index 0000000..5375851
--- /dev/null
+++ b/runtime/internal/v23_util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !mojo
+
+package internal
+
+import (
+	"os"
+
+	"v.io/x/ref/lib/flags"
+)
+
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error { // non-Mojo build (+build !mojo)
+	return f.Parse(os.Args[1:], config) // parse the process command line, skipping the program name
+}
diff --git a/services/device/deviced/internal/impl/globsuid/signature_match_test.go b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
index 8f7c3c5..8dbd380 100644
--- a/services/device/deviced/internal/impl/globsuid/signature_match_test.go
+++ b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
@@ -28,9 +28,9 @@
 )
 
 func TestDownloadSignatureMatch(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	sh, deferFn := servicetest.CreateShellAndMountTable(t, ctx, nil)
 	defer deferFn()
@@ -39,7 +39,7 @@
 	pkgVON := naming.Join(binaryVON, "testpkg")
 	defer utiltest.StartRealBinaryRepository(t, ctx, binaryVON)()
 
-	up := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+	up := rg.RandomBytes(rg.RandomIntn(5 << 20))
 	mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 	sig, err := binarylib.Upload(ctx, naming.Join(binaryVON, "testbinary"), up, mediaInfo)
 	if err != nil {
@@ -52,7 +52,7 @@
 		t.Fatalf("ioutil.TempDir failed: %v", err)
 	}
 	defer os.RemoveAll(tmpdir)
-	pkgContents := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+	pkgContents := rg.RandomBytes(rg.RandomIntn(5 << 20))
 	if err := ioutil.WriteFile(filepath.Join(tmpdir, "pkg.txt"), pkgContents, 0600); err != nil {
 		t.Fatalf("ioutil.WriteFile failed: %v", err)
 	}
diff --git a/services/internal/binarylib/client_test.go b/services/internal/binarylib/client_test.go
index 000e8ca..7c83ca2 100644
--- a/services/internal/binarylib/client_test.go
+++ b/services/internal/binarylib/client_test.go
@@ -73,13 +73,13 @@
 // TestBufferAPI tests the binary repository client-side library
 // interface using buffers.
 func TestBufferAPI(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	von, cleanup := setupRepository(t, ctx)
 	defer cleanup()
-	data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+	data := rg.RandomBytes(rg.RandomIntn(10 << 20))
 	mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 	sig, err := Upload(ctx, von, data, mediaInfo)
 	if err != nil {
@@ -125,14 +125,14 @@
 // TestFileAPI tests the binary repository client-side library
 // interface using files.
 func TestFileAPI(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	von, cleanup := setupRepository(t, ctx)
 	defer cleanup()
 	// Create up to 10MB of random bytes.
-	data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+	data := rg.RandomBytes(rg.RandomIntn(10 << 20))
 	dir, prefix := "", ""
 	src, err := ioutil.TempFile(dir, prefix)
 	if err != nil {
diff --git a/services/internal/binarylib/http_test.go b/services/internal/binarylib/http_test.go
index 73b253f..198cb75 100644
--- a/services/internal/binarylib/http_test.go
+++ b/services/internal/binarylib/http_test.go
@@ -21,9 +21,9 @@
 
 // TestHTTP checks that HTTP download works.
 func TestHTTP(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	// TODO(caprita): This is based on TestMultiPart (impl_test.go).  Share
 	// the code where possible.
@@ -34,8 +34,8 @@
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
 			// Random size, but at least 1 (avoid empty parts).
-			size := testutil.RandomIntn(1000*binarylib.BufferLength) + 1
-			data[i] = testutil.RandomBytes(size)
+			size := rg.RandomIntn(1000*binarylib.BufferLength) + 1
+			data[i] = rg.RandomBytes(size)
 		}
 		mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 		if err := binary.Create(ctx, int32(length), mediaInfo); err != nil {
diff --git a/services/internal/binarylib/impl_test.go b/services/internal/binarylib/impl_test.go
index a614217..1ffd9a6 100644
--- a/services/internal/binarylib/impl_test.go
+++ b/services/internal/binarylib/impl_test.go
@@ -79,11 +79,12 @@
 func TestHierarchy(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for i := 0; i < md5.Size; i++ {
 		binary, ep, _, cleanup := startServer(t, ctx, i)
 		defer cleanup()
-		data := testData()
+		data := testData(rg)
 
 		// Test the binary repository interface.
 		if err := binary.Create(ctx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -131,6 +132,7 @@
 func TestMultiPart(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for length := 2; length < 5; length++ {
 		binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -138,7 +140,7 @@
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
-			data[i] = testData()
+			data[i] = testData(rg)
 		}
 		// Test the binary repository interface.
 		if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -181,9 +183,9 @@
 // resumption ranging the number of parts the uploaded binary consists
 // of.
 func TestResumption(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for length := 2; length < 5; length++ {
 		binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -191,7 +193,7 @@
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
-			data[i] = testData()
+			data[i] = testData(rg)
 		}
 		if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 			t.Fatalf("Create() failed: %v", err)
@@ -211,7 +213,7 @@
 				break
 			}
 			for i := 0; i < length; i++ {
-				fail := testutil.RandomIntn(2)
+				fail := rg.RandomIntn(2)
 				if parts[i] == binarylib.MissingPart && fail != 0 {
 					if streamErr, err := invokeUpload(t, ctx, binary, data[i], int32(i)); streamErr != nil || err != nil {
 						t.FailNow()
@@ -227,18 +229,18 @@
 
 // TestErrors checks that the binary interface correctly reports errors.
 func TestErrors(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	binary, _, _, cleanup := startServer(t, ctx, 2)
 	defer cleanup()
 	const length = 2
 	data := make([][]byte, length)
 	for i := 0; i < length; i++ {
-		data[i] = testData()
+		data[i] = testData(rg)
 		for j := 0; j < len(data[i]); j++ {
-			data[i][j] = byte(testutil.RandomInt())
+			data[i][j] = byte(rg.RandomInt())
 		}
 	}
 	if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -295,10 +297,11 @@
 func TestGlob(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	_, ep, _, cleanup := startServer(t, ctx, 2)
 	defer cleanup()
-	data := testData()
+	data := testData(rg)
 
 	objects := []string{"foo", "bar", "hello world", "a/b/c"}
 	for _, obj := range objects {
diff --git a/services/internal/binarylib/perms_test.go b/services/internal/binarylib/perms_test.go
index b4b684a..0729b69 100644
--- a/services/internal/binarylib/perms_test.go
+++ b/services/internal/binarylib/perms_test.go
@@ -76,6 +76,7 @@
 func TestBinaryCreateAccessList(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	selfCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("self"))
 	if err != nil {
@@ -105,7 +106,7 @@
 	if err := b("bini/private").Create(childCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataPrivate := testData()
+	fakeDataPrivate := testData(rg)
 	if streamErr, err := invokeUpload(t, childCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -130,6 +131,7 @@
 func TestBinaryRootAccessList(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	selfPrincipal := testutil.NewPrincipal("self")
 	selfCtx, err := v23.WithPrincipal(ctx, selfPrincipal)
@@ -161,7 +163,7 @@
 	if err := b("bini/private").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataPrivate := testData()
+	fakeDataPrivate := testData(rg)
 	if streamErr, err := invokeUpload(t, selfCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -169,7 +171,7 @@
 	if err := b("bini/shared").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataShared := testData()
+	fakeDataShared := testData(rg)
 	if streamErr, err := invokeUpload(t, selfCtx, b("bini/shared"), fakeDataShared, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -296,7 +298,7 @@
 	if err := b("bini/otherbinary").Create(otherCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataOther := testData()
+	fakeDataOther := testData(rg)
 	if streamErr, err := invokeUpload(t, otherCtx, b("bini/otherbinary"), fakeDataOther, 0); streamErr != nil || err != nil {
 		t.FailNow()
 	}
diff --git a/services/internal/binarylib/util_test.go b/services/internal/binarylib/util_test.go
index 30b414f..cd3d785 100644
--- a/services/internal/binarylib/util_test.go
+++ b/services/internal/binarylib/util_test.go
@@ -88,8 +88,8 @@
 }
 
 // testData creates up to 4MB of random bytes.
-func testData() []byte {
-	size := testutil.RandomIntn(1000 * binarylib.BufferLength)
-	data := testutil.RandomBytes(size)
+func testData(rg *testutil.Random) []byte {
+	size := rg.RandomIntn(1000 * binarylib.BufferLength)
+	data := rg.RandomBytes(size)
 	return data
 }
diff --git a/test/testutil/rand.go b/test/testutil/rand.go
index 3cb1526..8e21640 100644
--- a/test/testutil/rand.go
+++ b/test/testutil/rand.go
@@ -28,7 +28,6 @@
 // Random is a concurrent-access friendly source of randomness.
 type Random struct {
 	mu   sync.Mutex
-	seed int64
 	rand *rand.Rand
 }
 
@@ -72,9 +71,12 @@
 	return buffer
 }
 
-// Create a new pseudo-random number generator, the seed may be supplied
-// by V23_RNG_SEED to allow for reproducing a previous sequence.
-func NewRandGenerator() *Random {
+type loggingFunc func(format string, args ...interface{})
+
+// NewRandGenerator creates a new pseudo-random number generator; the seed may
+// be supplied by V23_RNG_SEED to allow for reproducing a previous sequence, and
+// is printed using the supplied logging function.
+func NewRandGenerator(logger loggingFunc) *Random {
 	seed := time.Now().UnixNano()
 	seedString := os.Getenv(SeedEnv)
 	if seedString != "" {
@@ -85,17 +87,35 @@
 			panic(fmt.Sprintf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err))
 		}
 	}
-	return &Random{seed: seed, rand: rand.New(rand.NewSource(seed))}
+	logger("Seeded pseudo-random number generator with %v", seed)
+	return &Random{rand: rand.New(rand.NewSource(seed))}
 }
 
+// TODO(caprita): Consider deprecating InitRandGenerator in favor of using
+// NewRandGenerator directly.  There are several drawbacks to using the global
+// singleton Random object:
+//
+//   - tests that do not call InitRandGenerator themselves could depend on
+//   InitRandGenerator having been called by other tests in the same package and
+//   stop working when run standalone with test --run
+//
+//   - conversely, a test case may call InitRandGenerator without actually
+//   needing to; it's hard to figure out if some library called by a test
+//   actually uses the Random object or not
+//
+//   - when several test cases share the same Random object, there is
+//   interference in the stream of random numbers generated for each test case
+//   if run in parallel
+//
+// All these issues can be trivially addressed if the Random object is created
+// and plumbed through the call stack explicitly.
+
 // InitRandGenerator creates an instance of Random in the public variable Rand
-// and returns a function intended to be defer'ed that prints out the
-// seed use when creating the number number generator using the supplied
-// logging function.
-func InitRandGenerator(loggingFunc func(format string, args ...interface{})) {
+// and prints out the seed used when creating the random number generator using
+// the supplied logging function.
+func InitRandGenerator(logger loggingFunc) {
 	once.Do(func() {
-		Rand = NewRandGenerator()
-		loggingFunc("Seeded pseudo-random number generator with %v", Rand.seed)
+		Rand = NewRandGenerator(logger)
 	})
 }