Merge branch 'master' into vtrace
diff --git a/cmd/mounttable/impl.go b/cmd/mounttable/impl.go
index f3b4974..25a0c80 100644
--- a/cmd/mounttable/impl.go
+++ b/cmd/mounttable/impl.go
@@ -20,7 +20,6 @@
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
- "v.io/v23/security"
"v.io/x/ref/lib/v23cmd"
_ "v.io/x/ref/runtime/factories/generic"
@@ -201,25 +200,3 @@
`,
Children: []*cmdline.Command{cmdGlob, cmdMount, cmdUnmount, cmdResolveStep},
}
-
-func blessingPatternsFromServer(ctx *context.T, server string) ([]security.BlessingPattern, error) {
- ctx.Infof("Contacting %q to determine the blessings presented by it", server)
- ctx, cancel := context.WithTimeout(ctx, time.Minute)
- defer cancel()
- call, err := v23.GetClient(ctx).StartCall(ctx, server, rpc.ReservedSignature, nil)
- if err != nil {
- return nil, fmt.Errorf("Unable to extract blessings presented by %q: %v", server, err)
- }
- blessings, _ := call.RemoteBlessings()
- if len(blessings) == 0 {
- return nil, fmt.Errorf("No recognizable blessings presented by %q, it cannot be securely mounted", server)
- }
- // This translation between BlessingPattern and string is silly!
- // Kill the BlessingPatterns type and make methods on that type
- // functions instead!
- patterns := make([]security.BlessingPattern, len(blessings))
- for i := range blessings {
- patterns[i] = security.BlessingPattern(blessings[i])
- }
- return patterns, nil
-}
diff --git a/lib/security/bcrypter/crypter_test.go b/lib/security/bcrypter/crypter_test.go
index c686349..37bee26 100644
--- a/lib/security/bcrypter/crypter_test.go
+++ b/lib/security/bcrypter/crypter_test.go
@@ -29,7 +29,7 @@
return []byte("AThirtyTwoBytePieceOfTextThisIs!")
}
-func TextExtract(t *testing.T) {
+func TestExtract(t *testing.T) {
ctx, shutdown := context.RootContext()
defer shutdown()
@@ -50,7 +50,7 @@
if got := key.Blessing(); got != b {
t.Fatalf("extracted key is for blessing %v, want key for blessing %v", got, b)
}
- if got, want := key.Params, googleYoutube.Params(); !reflect.DeepEqual(got, want) {
+ if got, want := key.Params(), googleYoutube.Params(); !reflect.DeepEqual(got, want) {
t.Fatalf("extract key is for params %v, want key for params %v", got, want)
}
}
diff --git a/lib/security/testutil_test.go b/lib/security/testutil_test.go
index 4118e85..fea678f 100644
--- a/lib/security/testutil_test.go
+++ b/lib/security/testutil_test.go
@@ -54,14 +54,6 @@
return p, def
}
-func bless(blesser, blessed security.Principal, with security.Blessings, extension string) security.Blessings {
- b, err := blesser.Bless(blessed.PublicKey(), with, extension, security.UnconstrainedUse())
- if err != nil {
- panic(err)
- }
- return b
-}
-
func blessSelf(p security.Principal, name string) security.Blessings {
b, err := p.BlessSelf(name)
if err != nil {
diff --git a/lib/vdl/opconst/big_complex.go b/lib/vdl/opconst/big_complex.go
index 3d204d1..da2a8e0 100644
--- a/lib/vdl/opconst/big_complex.go
+++ b/lib/vdl/opconst/big_complex.go
@@ -25,11 +25,6 @@
return &bigComplex{re: *re}
}
-// imagComplex returns a bigComplex with real part zero, and imaginary part im.
-func imagComplex(im *big.Rat) *bigComplex {
- return &bigComplex{im: *im}
-}
-
func (z *bigComplex) SetComplex128(c complex128) *bigComplex {
z.re.SetFloat64(real(c))
z.im.SetFloat64(imag(c))
diff --git a/runtime/factories/android/android.go b/runtime/factories/android/android.go
index e09c89d..deabcd4 100644
--- a/runtime/factories/android/android.go
+++ b/runtime/factories/android/android.go
@@ -28,25 +28,18 @@
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/roaming"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
"v.io/x/ref/runtime/internal/rt"
_ "v.io/x/ref/runtime/protocols/tcp"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh"
"v.io/x/ref/services/debug/debuglib"
-
- // TODO(suharshs): Remove these once we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
var commonFlags *flags.Flags
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/runtime/factories/chrome/chrome.go b/runtime/factories/chrome/chrome.go
index 9408d82..f40d965 100644
--- a/runtime/factories/chrome/chrome.go
+++ b/runtime/factories/chrome/chrome.go
@@ -16,22 +16,16 @@
"v.io/x/ref/lib/flags"
"v.io/x/ref/runtime/internal"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
grt "v.io/x/ref/runtime/internal/rt"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh_nacl"
-
- // TODO(suharshs): Remove this after we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh_nacl"
)
var commonFlags *flags.Flags
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.Dial, websocket.Resolve, websocket.Listener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WS{})
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime)
}
diff --git a/runtime/factories/fake/fake.go b/runtime/factories/fake/fake.go
index 4fd788e..89ec1d7 100644
--- a/runtime/factories/fake/fake.go
+++ b/runtime/factories/fake/fake.go
@@ -14,20 +14,13 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
- "v.io/v23/rpc"
"v.io/x/ref/runtime/internal"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
+ _ "v.io/x/ref/runtime/protocols/local"
_ "v.io/x/ref/runtime/protocols/tcp"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh"
-
- // TODO(suharshs): Remove these once we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
- _ "v.io/x/ref/runtime/protocols/local"
)
var (
@@ -41,7 +34,6 @@
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
}
diff --git a/runtime/factories/gce/gce.go b/runtime/factories/gce/gce.go
index eb892c3..aee5a41 100644
--- a/runtime/factories/gce/gce.go
+++ b/runtime/factories/gce/gce.go
@@ -23,24 +23,17 @@
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/gce"
"v.io/x/ref/runtime/internal/lib/appcycle"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
grt "v.io/x/ref/runtime/internal/rt"
_ "v.io/x/ref/runtime/protocols/tcp"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh"
-
- // TODO(suharshs): Remove these once we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
var commonFlags *flags.Flags
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/runtime/factories/generic/generic.go b/runtime/factories/generic/generic.go
index d6513dc..cf4b404 100644
--- a/runtime/factories/generic/generic.go
+++ b/runtime/factories/generic/generic.go
@@ -18,24 +18,17 @@
"v.io/x/ref/lib/flags"
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
grt "v.io/x/ref/runtime/internal/rt"
_ "v.io/x/ref/runtime/protocols/tcp"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh"
-
- // TODO(suharshs): Remove these once we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
var commonFlags *flags.Flags
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
flags.SetDefaultHostPort(":0")
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index c6c0fe3..5d0ce6b 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -29,25 +29,18 @@
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/roaming"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
"v.io/x/ref/runtime/internal/rt"
_ "v.io/x/ref/runtime/protocols/tcp"
_ "v.io/x/ref/runtime/protocols/ws"
_ "v.io/x/ref/runtime/protocols/wsh"
"v.io/x/ref/services/debug/debuglib"
-
- // TODO(suharshs): Remove these once we switch to the flow protocols.
- _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
- _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
var commonFlags *flags.Flags
func init() {
v23.RegisterRuntimeFactory(Init)
- rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 8c9b5ba..4e8363e 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -177,14 +177,14 @@
if binding, err = message.Append(ctx, lSetup, nil); err != nil {
return nil, nil, err
}
- if binding, err = message.Append(ctx, rSetup, nil); err != nil {
+ if binding, err = message.Append(ctx, rSetup, binding); err != nil {
return nil, nil, err
}
} else {
if binding, err = message.Append(ctx, rSetup, nil); err != nil {
return nil, nil, err
}
- if binding, err = message.Append(ctx, lSetup, nil); err != nil {
+ if binding, err = message.Append(ctx, lSetup, binding); err != nil {
return nil, nil, err
}
}
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 285789c..c6478b6 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -132,16 +132,6 @@
func (fc *fakeDischargeClient) Close() {}
func (fc *fakeDischargeClient) Closed() <-chan struct{} { return nil }
-func patterns(ctx *context.T) []security.BlessingPattern {
- p := v23.GetPrincipal(ctx)
- b := p.BlessingStore().Default()
- var patterns []security.BlessingPattern
- for _, n := range security.BlessingNames(p, b) {
- patterns = append(patterns, security.BlessingPattern(n))
- }
- return patterns
-}
-
func TestUnidirectional(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 55a8197..bd2d272 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -158,6 +158,12 @@
if channelTimeout == 0 {
channelTimeout = defaultChannelTimeout
}
+ // If the conn is being built on an encapsulated flow, we must update the
+ // cancellation of the flow, to ensure that the conn doesn't get killed
+ // when the context passed in is cancelled.
+ if f, ok := conn.(*flw); ok {
+ ctx = f.SetDeadlineContext(ctx, time.Time{})
+ }
c := &Conn{
mp: newMessagePipe(conn),
handler: handler,
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 60236a6..b827759 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -513,7 +513,7 @@
delete(f.conn.flows, f.id)
f.conn.mu.Unlock()
if serr != nil {
- ctx.Errorf("Could not send close flow message: %v", err)
+ ctx.VI(2).Infof("Could not send close flow message: %v", err)
}
}
}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 0e4467d..63316e0 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -267,20 +267,6 @@
}
}
-// matchBlessings return true if the intersection of a and b is not empty.
-func matchBlessings(a, b []string) bool {
- m := make(map[string]bool, len(a))
- for _, i := range a {
- m[i] = true
- }
- for _, j := range b {
- if m[j] {
- return true
- }
- }
- return false
-}
-
// TODO(suharshs): If sorting the connections becomes too slow, switch to
// container/heap instead of sorting all the connections.
type connEntries []*connEntry
diff --git a/runtime/internal/lib/tcputil/tcputil.go b/runtime/internal/lib/tcputil/tcputil.go
index c6891f1..c0c8f78 100644
--- a/runtime/internal/lib/tcputil/tcputil.go
+++ b/runtime/internal/lib/tcputil/tcputil.go
@@ -13,7 +13,7 @@
"v.io/v23/context"
"v.io/v23/flow"
- "v.io/x/ref/runtime/internal/lib/framer"
+ "v.io/x/ref/runtime/protocols/lib/framer"
)
const keepAlivePeriod = 30 * time.Second
diff --git a/runtime/internal/lib/websocket/conn.go b/runtime/internal/lib/websocket/conn.go
deleted file mode 100644
index 77c7e99..0000000
--- a/runtime/internal/lib/websocket/conn.go
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "fmt"
- "io"
- "net"
- "sync"
- "time"
-
- "github.com/gorilla/websocket"
-)
-
-// WebsocketConn provides a net.Conn interface for a websocket connection.
-func WebsocketConn(ws *websocket.Conn) net.Conn {
- return &wrappedConn{ws: ws}
-}
-
-// wrappedConn provides a net.Conn interface to a websocket.
-// The underlying websocket connection needs regular calls to Read to make sure
-// websocket control messages (such as pings) are processed by the websocket
-// library.
-type wrappedConn struct {
- ws *websocket.Conn
- currReader io.Reader
-
- // The gorilla docs aren't explicit about reading and writing from
- // different goroutines. It is explicit that only one goroutine can
- // do a write at any given time and only one goroutine can do a read
- // at any given time. Based on inspection it seems that using a reader
- // and writer simultaneously is safe, but this might change with
- // future changes. We can't actually share the lock, because this means
- // that we can't write while we are waiting for a message, causing some
- // deadlocks where a write is need to unblock a read.
- writeLock sync.Mutex
- readLock sync.Mutex
-}
-
-func (c *wrappedConn) readFromCurrReader(b []byte) (int, error) {
- n, err := c.currReader.Read(b)
- if err == io.EOF {
- err = nil
- c.currReader = nil
- }
- return n, err
-}
-
-func (c *wrappedConn) Read(b []byte) (int, error) {
- c.readLock.Lock()
- defer c.readLock.Unlock()
- var n int
- var err error
-
- // TODO(bjornick): It would be nice to be able to read multiple messages at
- // a time in case the first message is not big enough to fill b and another
- // message is ready.
- // Loop until we either get data or an error. This exists
- // mostly to avoid return 0, nil.
- for n == 0 && err == nil {
- if c.currReader == nil {
- t, r, err := c.ws.NextReader()
- if err != nil {
- return 0, err
- }
- if t != websocket.BinaryMessage {
- return 0, fmt.Errorf("Unexpected message type %d", t)
- }
- c.currReader = r
- }
- n, err = c.readFromCurrReader(b)
- }
- return n, err
-}
-
-func (c *wrappedConn) Write(b []byte) (int, error) {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if err := c.ws.WriteMessage(websocket.BinaryMessage, b); err != nil {
- return 0, err
- }
- return len(b), nil
-}
-
-func (c *wrappedConn) Close() error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- // Send an EOF control message to the remote end so that it can
- // handle the close gracefully.
- msg := websocket.FormatCloseMessage(websocket.CloseGoingAway, "EOF")
- c.ws.WriteControl(websocket.CloseMessage, msg, time.Now().Add(time.Second))
- return c.ws.Close()
-}
-
-func (c *wrappedConn) LocalAddr() net.Addr {
- return &addr{"ws", c.ws.LocalAddr().String()}
-}
-
-func (c *wrappedConn) RemoteAddr() net.Addr {
- return &addr{"ws", c.ws.RemoteAddr().String()}
-}
-
-func (c *wrappedConn) SetDeadline(t time.Time) error {
- if err := c.SetReadDeadline(t); err != nil {
- return err
- }
- return c.SetWriteDeadline(t)
-}
-
-func (c *wrappedConn) SetReadDeadline(t time.Time) error {
- return c.ws.SetReadDeadline(t)
-}
-
-func (c *wrappedConn) SetWriteDeadline(t time.Time) error {
- return c.ws.SetWriteDeadline(t)
-}
-
-// hybridConn is used by the 'hybrid' protocol that can accept
-// either 'tcp' or 'websocket' connections. In particular, it allows
-// for the reader to peek and buffer the first n bytes of a stream
-// in order to determine what the connection type is.
-type hybridConn struct {
- conn net.Conn
- buffered []byte
-}
-
-func (wc *hybridConn) Read(b []byte) (int, error) {
- lbuf := len(wc.buffered)
- if lbuf == 0 {
- return wc.conn.Read(b)
- }
- copyn := copy(b, wc.buffered)
- wc.buffered = wc.buffered[copyn:]
- if len(b) > copyn {
- n, err := wc.conn.Read(b[copyn:])
- return copyn + n, err
- }
- return copyn, nil
-}
-
-func (wc *hybridConn) Write(b []byte) (n int, err error) {
- return wc.conn.Write(b)
-}
-
-func (wc *hybridConn) Close() error {
- return wc.conn.Close()
-}
-
-func (wc *hybridConn) LocalAddr() net.Addr {
- return &addr{"wsh", wc.conn.LocalAddr().String()}
-}
-
-func (wc *hybridConn) RemoteAddr() net.Addr {
- return &addr{"wsh", wc.conn.RemoteAddr().String()}
-}
-
-func (wc *hybridConn) SetDeadline(t time.Time) error {
- return wc.conn.SetDeadline(t)
-}
-
-func (wc *hybridConn) SetReadDeadline(t time.Time) error {
- return wc.conn.SetReadDeadline(t)
-}
-
-func (wc *hybridConn) SetWriteDeadline(t time.Time) error {
- return wc.conn.SetWriteDeadline(t)
-}
diff --git a/runtime/internal/lib/websocket/conn_nacl.go b/runtime/internal/lib/websocket/conn_nacl.go
deleted file mode 100644
index ca7e16d..0000000
--- a/runtime/internal/lib/websocket/conn_nacl.go
+++ /dev/null
@@ -1,115 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build nacl
-
-package websocket
-
-import (
- "net"
- "net/url"
- "runtime/ppapi"
- "sync"
- "time"
-
- "v.io/v23/context"
-)
-
-// Ppapi instance which must be set before the Dial is called.
-var PpapiInstance ppapi.Instance
-
-func WebsocketConn(address string, ws *ppapi.WebsocketConn) net.Conn {
- return &wrappedConn{
- address: address,
- ws: ws,
- }
-}
-
-type wrappedConn struct {
- address string
- ws *ppapi.WebsocketConn
- readLock sync.Mutex
- writeLock sync.Mutex
- currBuffer []byte
-}
-
-func Dial(ctx *context.T, protocol, address string, timeout time.Duration) (net.Conn, error) {
- inst := PpapiInstance
- u, err := url.Parse("ws://" + address)
- if err != nil {
- return nil, err
- }
-
- ws, err := inst.DialWebsocket(u.String())
- if err != nil {
- return nil, err
- }
- return WebsocketConn(address, ws), nil
-}
-
-func Resolve(ctx *context.T, protocol, address string) (string, string, error) {
- return "ws", address, nil
-}
-
-func (c *wrappedConn) Read(b []byte) (int, error) {
- c.readLock.Lock()
- defer c.readLock.Unlock()
-
- var err error
- if len(c.currBuffer) == 0 {
- c.currBuffer, err = c.ws.ReceiveMessage()
- if err != nil {
- return 0, err
- }
- }
-
- n := copy(b, c.currBuffer)
- c.currBuffer = c.currBuffer[n:]
- return n, nil
-}
-
-func (c *wrappedConn) Write(b []byte) (int, error) {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if err := c.ws.SendMessage(b); err != nil {
- return 0, err
- }
- return len(b), nil
-}
-
-func (c *wrappedConn) Close() error {
- return c.ws.Close()
-}
-
-func (c *wrappedConn) LocalAddr() net.Addr {
- return websocketAddr{s: c.address}
-}
-
-func (c *wrappedConn) RemoteAddr() net.Addr {
- return websocketAddr{s: c.address}
-}
-
-func (c *wrappedConn) SetDeadline(t time.Time) error {
- panic("SetDeadline not implemented.")
-}
-
-func (c *wrappedConn) SetReadDeadline(t time.Time) error {
- panic("SetReadDeadline not implemented.")
-}
-
-func (c *wrappedConn) SetWriteDeadline(t time.Time) error {
- panic("SetWriteDeadline not implemented.")
-}
-
-type websocketAddr struct {
- s string
-}
-
-func (websocketAddr) Network() string {
- return "ws"
-}
-
-func (w websocketAddr) String() string {
- return w.s
-}
diff --git a/runtime/internal/lib/websocket/conn_test.go b/runtime/internal/lib/websocket/conn_test.go
deleted file mode 100644
index 292f616..0000000
--- a/runtime/internal/lib/websocket/conn_test.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "bytes"
- "net"
- "net/http"
- "sync"
- "testing"
- "time"
-
- "github.com/gorilla/websocket"
-
- "v.io/v23/context"
-)
-
-func writer(c net.Conn, data []byte, times int, wg *sync.WaitGroup) {
- defer wg.Done()
- b := []byte{byte(len(data))}
- b = append(b, data...)
- for i := 0; i < times; i++ {
- c.Write(b)
- }
-}
-
-func readMessage(c net.Conn) ([]byte, error) {
- var length [1]byte
- // Read the size
- for {
- n, err := c.Read(length[:])
- if err != nil {
- return nil, err
- }
- if n == 1 {
- break
- }
- }
- size := int(length[0])
- buf := make([]byte, size)
- n := 0
- for n < size {
- nn, err := c.Read(buf[n:])
- if err != nil {
- return buf, err
- }
- n += nn
- }
-
- return buf, nil
-}
-
-func reader(t *testing.T, c net.Conn, expected []byte, totalWrites int) {
- totalReads := 0
- for buf, err := readMessage(c); err == nil; buf, err = readMessage(c) {
- totalReads++
- if !bytes.Equal(buf, expected) {
- t.Errorf("Unexpected message %v, expected %v", buf, expected)
- }
- }
- if totalReads != totalWrites {
- t.Errorf("wrong number of messages expected %v, got %v", totalWrites, totalReads)
- }
-}
-
-func TestMultipleGoRoutines(t *testing.T) {
- l, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatalf("Failed to listen: %v", err)
- }
- addr := l.Addr()
- input := []byte("no races here")
- const numWriters int = 12
- const numWritesPerWriter int = 1000
- const totalWrites int = numWriters * numWritesPerWriter
- s := &http.Server{
- Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
- return
- }
- ws, err := websocket.Upgrade(w, r, nil, 1024, 1024)
- if _, ok := err.(websocket.HandshakeError); ok {
- http.Error(w, "Not a websocket handshake", 400)
- return
- } else if err != nil {
- http.Error(w, "Internal Error", 500)
- return
- }
- reader(t, WebsocketConn(ws), input, totalWrites)
- }),
- }
- // Dial out in another go routine
- go func() {
- ctx, _ := context.RootContext()
- conn, err := Dial(ctx, "tcp", addr.String(), time.Second)
- numTries := 0
- for err != nil && numTries < 5 {
- numTries++
- time.Sleep(time.Second)
- }
-
- if err != nil {
- t.Fatalf("failed to connect to server: %v", err)
- }
- var writers sync.WaitGroup
- writers.Add(numWriters)
- for i := 0; i < numWriters; i++ {
- go writer(conn, input, numWritesPerWriter, &writers)
- }
- writers.Wait()
- conn.Close()
- l.Close()
- }()
- s.Serve(l)
-}
diff --git a/runtime/internal/lib/websocket/dialer.go b/runtime/internal/lib/websocket/dialer.go
deleted file mode 100644
index f7a3b21..0000000
--- a/runtime/internal/lib/websocket/dialer.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "net"
- "net/http"
- "net/url"
- "time"
-
- "github.com/gorilla/websocket"
-
- "v.io/x/ref/runtime/internal/lib/tcputil"
-
- "v.io/v23/context"
-)
-
-func Dial(ctx *context.T, protocol, address string, timeout time.Duration) (net.Conn, error) {
- var then time.Time
- if timeout > 0 {
- then = time.Now().Add(timeout)
- }
- tcp := mapWebSocketToTCP[protocol]
- conn, err := net.DialTimeout(tcp, address, timeout)
- if err != nil {
- return nil, err
- }
- conn.SetReadDeadline(then)
- if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
- return nil, err
- }
- u, err := url.Parse("ws://" + address)
- if err != nil {
- return nil, err
- }
- ws, _, err := websocket.NewClient(conn, u, http.Header{}, 4096, 4096)
- if err != nil {
- return nil, err
- }
- var zero time.Time
- conn.SetDeadline(zero)
- return WebsocketConn(ws), nil
-}
diff --git a/runtime/internal/lib/websocket/hybrid.go b/runtime/internal/lib/websocket/hybrid.go
deleted file mode 100644
index fb92d0a..0000000
--- a/runtime/internal/lib/websocket/hybrid.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package websocket
-
-import (
- "net"
- "time"
-
- "v.io/x/ref/runtime/internal/lib/tcputil"
-
- "v.io/v23/context"
-)
-
-// TODO(jhahn): Figure out a way for this mapping to be shared.
-var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6", "wsh": "tcp", "wsh4": "tcp4", "wsh6": "tcp6", "tcp": "tcp", "tcp4": "tcp4", "tcp6": "tcp6"}
-
-// HybridDial returns net.Conn that can be used with a HybridListener but
-// always uses tcp. A client must specifically elect to use websockets by
-// calling websocket.Dialer. The returned net.Conn will report 'tcp' as its
-// Network.
-func HybridDial(ctx *context.T, network, address string, timeout time.Duration) (net.Conn, error) {
- tcp := mapWebSocketToTCP[network]
- conn, err := net.DialTimeout(tcp, address, timeout)
- if err != nil {
- return nil, err
- }
- if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
- return nil, err
- }
- return conn, nil
-}
-
-// HybridResolve performs a DNS resolution on the network, address and always
-// returns tcp as its Network.
-func HybridResolve(ctx *context.T, network, address string) (string, string, error) {
- tcp := mapWebSocketToTCP[network]
- tcpAddr, err := net.ResolveTCPAddr(tcp, address)
- if err != nil {
- return "", "", err
- }
- return tcp, tcpAddr.String(), nil
-}
-
-// HybridListener returns a net.Listener that supports both tcp and
-// websockets over the same, single, port. A listen address of
-// --v23.tcp.protocol=wsh --v23.tcp.address=127.0.0.1:8101 means
-// that port 8101 can accept connections that use either tcp or websocket.
-// The listener looks at the first 4 bytes of the incoming data stream
-// to decide if it's a websocket protocol or not. These must be 'GET ' for
-// websockets, all other protocols must guarantee to not send 'GET ' as the
-// first four bytes of the payload.
-func HybridListener(ctx *context.T, protocol, address string) (net.Listener, error) {
- return listener(protocol, address, true)
-}
diff --git a/runtime/internal/lib/websocket/listener.go b/runtime/internal/lib/websocket/listener.go
deleted file mode 100644
index f31f096..0000000
--- a/runtime/internal/lib/websocket/listener.go
+++ /dev/null
@@ -1,233 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "errors"
- "io"
- "net"
- "net/http"
- "sync"
- "time"
-
- "github.com/gorilla/websocket"
-
- "v.io/x/ref/internal/logger"
- "v.io/x/ref/runtime/internal/lib/tcputil"
-
- "v.io/v23/context"
-)
-
-var errListenerIsClosed = errors.New("Listener has been Closed")
-
-const (
- bufferSize = 4096
- classificationTime = 10 * time.Second
-)
-
-// A listener that is able to handle either raw tcp or websocket requests.
-type wsTCPListener struct {
- closed bool // GUARDED_BY(mu)
- mu sync.Mutex
-
- acceptQ chan interface{} // net.Conn or error returned by netLn.Accept
- httpQ chan net.Conn // Candidates for websocket upgrades before being added to acceptQ
- netLn net.Listener // The underlying listener
- httpReq sync.WaitGroup // Number of active HTTP requests
- hybrid bool // true if running in 'hybrid' mode
-}
-
-// chanListener implements net.Listener, with Accept reading from c.
-type chanListener struct {
- net.Listener // Embedded for all other net.Listener functionality.
- c <-chan net.Conn
-}
-
-func (ln *chanListener) Accept() (net.Conn, error) {
- conn, ok := <-ln.c
- if !ok {
- return nil, errListenerIsClosed
- }
- return conn, nil
-}
-
-func Listener(ctx *context.T, protocol, address string) (net.Listener, error) {
- return listener(protocol, address, false)
-}
-
-func listener(protocol, address string, hybrid bool) (net.Listener, error) {
- netLn, err := net.Listen(mapWebSocketToTCP[protocol], address)
- if err != nil {
- return nil, err
- }
- ln := &wsTCPListener{
- acceptQ: make(chan interface{}),
- httpQ: make(chan net.Conn),
- netLn: netLn,
- hybrid: hybrid,
- }
- go ln.netAcceptLoop()
- httpsrv := http.Server{Handler: ln}
- go httpsrv.Serve(&chanListener{Listener: ln, c: ln.httpQ})
- return ln, nil
-}
-
-func (ln *wsTCPListener) Accept() (net.Conn, error) {
- for {
- item, ok := <-ln.acceptQ
- if !ok {
- return nil, errListenerIsClosed
- }
- switch v := item.(type) {
- case net.Conn:
- return v, nil
- case error:
- return nil, v
- default:
- logger.Global().Errorf("Unexpected type %T in channel (%v)", v, v)
- }
- }
-}
-
-func (ln *wsTCPListener) Close() error {
- ln.mu.Lock()
- if ln.closed {
- ln.mu.Unlock()
- return errListenerIsClosed
- }
- ln.closed = true
- ln.mu.Unlock()
- addr := ln.netLn.Addr()
- err := ln.netLn.Close()
- logger.Global().VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
- // netAcceptLoop might be trying to push new TCP connections that
- // arrived while the listener was being closed. Drop those.
- drainChan(ln.acceptQ)
- return nil
-}
-
-func (ln *wsTCPListener) netAcceptLoop() {
- var classifications sync.WaitGroup
- defer func() {
- // This sequence of closures is carefully curated based on the
- // following invariants:
- // (1) All calls to ln.classify have been added to classifications.
- // (2) Only ln.classify sends on ln.httpQ
- // (3) All calls to ln.ServeHTTP have been added to ln.httpReq
- // (4) Sends on ln.acceptQ are done by either ln.netAcceptLoop ro ln.ServeHTTP
- classifications.Wait()
- close(ln.httpQ)
- ln.httpReq.Wait()
- close(ln.acceptQ)
- }()
- for {
- conn, err := ln.netLn.Accept()
- if err != nil {
- // If the listener has been closed, quit - otherwise
- // propagate the error.
- ln.mu.Lock()
- closed := ln.closed
- ln.mu.Unlock()
- if closed {
- return
- }
- ln.acceptQ <- err
- continue
- }
- logger.Global().VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
- if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
- logger.Global().Errorf("Failed to enable TCP keep alive: %v", err)
- }
- classifications.Add(1)
- go ln.classify(conn, &classifications)
- }
-}
-
-// classify classifies conn as either an HTTP connection or a non-HTTP one.
-//
-// If the latter, then the connection is added to ln.acceptQ.
-// If the former, then the connection is queued up for a websocket upgrade.
-func (ln *wsTCPListener) classify(conn net.Conn, done *sync.WaitGroup) {
- defer done.Done()
- isHTTP := true
- if ln.hybrid {
- conn.SetReadDeadline(time.Now().Add(classificationTime))
- defer conn.SetReadDeadline(time.Time{})
- var magic [1]byte
- n, err := io.ReadFull(conn, magic[:])
- if err != nil {
- // Unable to classify, ignore this connection.
- logger.Global().VI(1).Infof("Shutting down connection from %v since the magic bytes could not be read: %v", conn.RemoteAddr(), err)
- conn.Close()
- return
- }
- conn = &hybridConn{conn: conn, buffered: magic[:n]}
- isHTTP = magic[0] == 'G'
- }
- if isHTTP {
- ln.httpReq.Add(1)
- ln.httpQ <- conn
- return
- }
- ln.acceptQ <- conn
-}
-
-func (ln *wsTCPListener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- defer ln.httpReq.Done()
- if r.Method != "GET" {
- http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
- return
- }
- ws, err := websocket.Upgrade(w, r, nil, bufferSize, bufferSize)
- if _, ok := err.(websocket.HandshakeError); ok {
- // Close the connection to not serve HTTP requests from this connection
- // any more. Otherwise panic from negative httpReq counter can occur.
- // Although go http.Server gracefully shutdowns the server from a panic,
- // it would be nice to avoid it.
- w.Header().Set("Connection", "close")
- http.Error(w, "Not a websocket handshake", http.StatusBadRequest)
- logger.Global().Errorf("Rejected a non-websocket request: %v", err)
- return
- }
- if err != nil {
- w.Header().Set("Connection", "close")
- http.Error(w, "Internal Error", http.StatusInternalServerError)
- logger.Global().Errorf("Rejected a non-websocket request: %v", err)
- return
- }
- ln.acceptQ <- WebsocketConn(ws)
-}
-
-type addr struct{ n, a string }
-
-func (a addr) Network() string {
- return a.n
-}
-
-func (a addr) String() string {
- return a.a
-}
-
-func (ln *wsTCPListener) Addr() net.Addr {
- protocol := "ws"
- if ln.hybrid {
- protocol = "wsh"
- }
- return addr{protocol, ln.netLn.Addr().String()}
-}
-
-func drainChan(c <-chan interface{}) {
- for {
- item, ok := <-c
- if !ok {
- return
- }
- if conn, ok := item.(net.Conn); ok {
- conn.Close()
- }
- }
-}
diff --git a/runtime/internal/lib/websocket/listener_nacl.go b/runtime/internal/lib/websocket/listener_nacl.go
deleted file mode 100644
index ebf6255..0000000
--- a/runtime/internal/lib/websocket/listener_nacl.go
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build nacl
-
-package websocket
-
-import (
- "fmt"
- "net"
-
- "v.io/v23/context"
-)
-
-// Websocket listeners are not supported in NaCl.
-// This file is needed for compilation only.
-func listener(protocol, address string, hybrid bool) (net.Listener, error) {
- return nil, fmt.Errorf("Websocket Listener called in nacl code!")
-}
-
-func Listener(ctx *context.T, protocol, address string) (net.Listener, error) {
- return nil, fmt.Errorf("Websocket Listener called in nacl code!")
-}
diff --git a/runtime/internal/lib/websocket/listener_test.go b/runtime/internal/lib/websocket/listener_test.go
deleted file mode 100644
index f19ddd3..0000000
--- a/runtime/internal/lib/websocket/listener_test.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "bytes"
- "log"
- "net"
- "strings"
- "testing"
- "time"
-
- "v.io/v23/context"
-)
-
-func TestAcceptsAreNotSerialized(t *testing.T) {
- ctx, _ := context.RootContext()
- ln, err := HybridListener(ctx, "wsh", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- defer func() { go ln.Close() }()
- portscan := make(chan struct{})
-
- // Goroutine that continuously accepts connections.
- go func() {
- for {
- conn, err := ln.Accept()
- if err != nil {
- return
- }
- defer conn.Close()
- }
- }()
-
- // Imagine some client was port scanning and thus opened a TCP
- // connection (but never sent the bytes)
- go func() {
- conn, err := net.Dial("tcp", ln.Addr().String())
- if err != nil {
- t.Error(err)
- }
- close(portscan)
- // Keep the connection alive by blocking on a read. (The read
- // should return once the test exits).
- conn.Read(make([]byte, 1024))
- }()
- // Another client that dials a legitimate connection should not be
- // blocked on the portscanner.
- // (Wait for the portscanner to establish the TCP connection first).
- <-portscan
- conn, err := Dial(ctx, ln.Addr().Network(), ln.Addr().String(), time.Second)
- if err != nil {
- t.Fatal(err)
- }
- conn.Close()
-}
-
-func TestNonWebsocketRequest(t *testing.T) {
- ctx, _ := context.RootContext()
- ln, err := HybridListener(ctx, "wsh", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- defer func() { go ln.Close() }()
-
- // Goroutine that continuously accepts connections.
- go func() {
- for {
- _, err := ln.Accept()
- if err != nil {
- return
- }
- }
- }()
-
- var out bytes.Buffer
- log.SetOutput(&out)
-
- // Imagine some client keeps sending non-websocket requests.
- conn, err := net.Dial("tcp", ln.Addr().String())
- if err != nil {
- t.Error(err)
- }
- for i := 0; i < 2; i++ {
- conn.Write([]byte("GET / HTTP/1.1\r\n\r\n"))
- conn.Read(make([]byte, 1024))
- }
-
- logs := out.String()
- if strings.Contains(logs, "panic") {
- t.Errorf("Unexpected panic:\n%s", logs)
- }
-}
diff --git a/runtime/internal/lib/websocket/resolver.go b/runtime/internal/lib/websocket/resolver.go
deleted file mode 100644
index 5a99c23..0000000
--- a/runtime/internal/lib/websocket/resolver.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build !nacl
-
-package websocket
-
-import (
- "net"
-
- "v.io/v23/context"
-)
-
-// Resolve performs a DNS resolution on the provided protocol and address.
-func Resolve(ctx *context.T, protocol, address string) (string, string, error) {
- tcp := mapWebSocketToTCP[protocol]
- tcpAddr, err := net.ResolveTCPAddr(tcp, address)
- if err != nil {
- return "", "", err
- }
- return "ws", tcpAddr.String(), nil
-}
diff --git a/runtime/internal/lib/websocket/util_test.go b/runtime/internal/lib/websocket/util_test.go
deleted file mode 100644
index f7a7c4e..0000000
--- a/runtime/internal/lib/websocket/util_test.go
+++ /dev/null
@@ -1,290 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package websocket_test
-
-import (
- "encoding/gob"
- "fmt"
- "hash/crc64"
- "io"
- "math/rand"
- "net"
- "sync"
- "testing"
- "time"
-
- "v.io/v23/context"
- "v.io/v23/rpc"
-)
-
-var crcTable *crc64.Table
-
-func init() {
- crcTable = crc64.MakeTable(crc64.ISO)
-}
-
-func newSender(t *testing.T, dialer rpc.DialerFunc, protocol, address string) net.Conn {
- ctx, _ := context.RootContext()
- conn, err := dialer(ctx, protocol, address, time.Minute)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- return nil
- }
- return conn
-}
-
-func checkProtocols(conn net.Conn, tx string) error {
- expectedProtocol := map[string]string{
- "ws": "ws", "wsh": "tcp", "tcp": "tcp",
- }
- if got, want := conn.LocalAddr().Network(), expectedProtocol[tx]; got != want {
- return fmt.Errorf("wrong local protocol: got %q, want %q", got, want)
- }
- // Can't tell that the remote protocol is really 'wsh'
- if got, want := conn.RemoteAddr().Network(), expectedProtocol[tx]; got != want {
- return fmt.Errorf("wrong remote protocol: got %q, want %q", got, want)
- }
- return nil
-}
-
-type packet struct {
- Data []byte
- Size int
- CRC64 uint64
-}
-
-func createPacket() *packet {
- p := &packet{}
- p.Size = rand.Intn(4 * 1024)
- p.Data = make([]byte, p.Size)
- for i := 0; i < p.Size; i++ {
- p.Data[i] = byte(rand.Int() & 0xff)
- }
- p.CRC64 = crc64.Checksum([]byte(p.Data), crcTable)
- return p
-}
-
-func checkPacket(p *packet) error {
- if got, want := len(p.Data), p.Size; got != want {
- return fmt.Errorf("wrong sizes: got %d, want %d", got, want)
- }
- crc := crc64.Checksum(p.Data, crcTable)
- if got, want := crc, p.CRC64; got != want {
- return fmt.Errorf("wrong crc: got %d, want %d", got, want)
- }
- return nil
-}
-
-type backChannel struct {
- crcChan chan uint64
- byteChan chan []byte
- errChan chan error
-}
-
-type bcTable struct {
- ready *sync.Cond
- sync.Mutex
- bc map[string]*backChannel
-}
-
-var globalBCTable bcTable
-
-func init() {
- globalBCTable.ready = sync.NewCond(&globalBCTable)
- globalBCTable.bc = make(map[string]*backChannel)
-}
-
-func (bt *bcTable) waitfor(key string) *backChannel {
- bt.Lock()
- defer bt.Unlock()
- for {
- bc := bt.bc[key]
- if bc != nil {
- delete(bt.bc, key)
- return bc
- }
- bt.ready.Wait()
- }
-}
-
-func (bt *bcTable) add(key string, bc *backChannel) {
- bt.Lock()
- bt.bc[key] = bc
- bt.Unlock()
- bt.ready.Broadcast()
-}
-
-func packetReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
- conn, err := ln.Accept()
- if err != nil {
- close(bc.crcChan)
- close(bc.errChan)
- return
- }
-
- globalBCTable.add(conn.RemoteAddr().String(), bc)
-
- defer conn.Close()
- dec := gob.NewDecoder(conn)
- rxed := 0
- for {
- var p packet
- err := dec.Decode(&p)
- if err != nil {
- if err != io.EOF {
- bc.errChan <- fmt.Errorf("unexpected error: %s", err)
- }
- close(bc.crcChan)
- close(bc.errChan)
- return
- }
- if err := checkPacket(&p); err != nil {
- bc.errChan <- fmt.Errorf("unexpected error: %s", err)
- }
- bc.crcChan <- p.CRC64
- rxed++
- }
-}
-
-func packetSender(t *testing.T, nPackets int, conn net.Conn) {
- txCRCs := make([]uint64, nPackets)
- enc := gob.NewEncoder(conn)
- for i := 0; i < nPackets; i++ {
- p := createPacket()
- txCRCs[i] = p.CRC64
- if err := enc.Encode(p); err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- }
- conn.Close() // Close the connection so that the receiver quits.
-
- bc := globalBCTable.waitfor(conn.LocalAddr().String())
- for err := range bc.errChan {
- if err != nil {
- t.Fatalf(err.Error())
- }
- }
-
- rxed := 0
- for rxCRC := range bc.crcChan {
- if got, want := rxCRC, txCRCs[rxed]; got != want {
- t.Errorf("%s -> %s: packet %d: mismatched CRCs: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), rxed, got, want)
- }
- rxed++
- }
- if got, want := rxed, nPackets; got != want {
- t.Fatalf("%s -> %s: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
- }
-}
-
-func packetRunner(t *testing.T, ln net.Listener, dialer rpc.DialerFunc, protocol, address string) {
- nPackets := 100
- go packetReceiver(t, ln, &backChannel{
- crcChan: make(chan uint64, nPackets),
- errChan: make(chan error, nPackets),
- })
-
- conn := newSender(t, dialer, protocol, address)
- if err := checkProtocols(conn, protocol); err != nil {
- t.Fatalf(err.Error())
- }
- packetSender(t, nPackets, conn)
-}
-
-func byteReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
- conn, err := ln.Accept()
- if err != nil {
- close(bc.byteChan)
- close(bc.errChan)
- return
- }
- globalBCTable.add(conn.RemoteAddr().String(), bc)
-
- defer conn.Close()
- rxed := 0
- for {
- buf := make([]byte, rxed+1)
- n, err := conn.Read(buf)
- if err != nil {
- if err != io.EOF {
- bc.errChan <- fmt.Errorf("unexpected error: %s", err)
- }
- close(bc.byteChan)
- close(bc.errChan)
- return
- }
- if got, want := n, len(buf[:n]); got != want {
- bc.errChan <- fmt.Errorf("%s -> %s: got %d bytes, expected %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
- }
- if got, want := buf[0], byte(0xff); got != want {
- bc.errChan <- fmt.Errorf("%s -> %s: got %x, want %x", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
- }
- bc.byteChan <- buf[:n]
- rxed++
- }
-}
-
-func byteSender(t *testing.T, nIterations int, conn net.Conn) {
- txBytes := make([][]byte, nIterations+1)
- for i := 0; i < nIterations; i++ {
- p := make([]byte, i+1)
- p[0] = 0xff
- for j := 1; j <= i; j++ {
- p[j] = byte(64 + i) // start at ASCII A
- }
- txBytes[i] = p
- n, err := conn.Write(p)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- if got, want := n, i+1; got != want {
- t.Fatalf("wrote %d, not %d bytes", got, want)
- }
- }
- conn.Close()
-
- bc := globalBCTable.waitfor(conn.LocalAddr().String())
-
- for err := range bc.errChan {
- if err != nil {
- t.Fatalf(err.Error())
- }
- }
-
- addr := fmt.Sprintf("%s -> %s", conn.LocalAddr().String(), conn.RemoteAddr().String())
- rxed := 0
- for rxBytes := range bc.byteChan {
- if got, want := len(rxBytes), rxed+1; got != want {
- t.Fatalf("%s: got %d, want %d bytes", addr, got, want)
- }
- if got, want := rxBytes[0], byte(0xff); got != want {
- t.Fatalf("%s: got %x, want %x", addr, got, want)
- }
- for i := 0; i < len(rxBytes); i++ {
- if got, want := rxBytes[i], txBytes[rxed][i]; got != want {
- t.Fatalf("%s: got %c, want %c", addr, got, want)
- }
- }
- rxed++
- }
- if got, want := rxed, nIterations; got != want {
- t.Fatalf("%s: got %d, want %d", addr, got, want)
- }
-}
-
-func byteRunner(t *testing.T, ln net.Listener, dialer rpc.DialerFunc, protocol, address string) {
- nIterations := 10
- go byteReceiver(t, ln, &backChannel{
- byteChan: make(chan []byte, nIterations),
- errChan: make(chan error, nIterations),
- })
-
- conn := newSender(t, dialer, protocol, address)
- defer conn.Close()
- if err := checkProtocols(conn, protocol); err != nil {
- t.Fatalf(err.Error())
- }
- byteSender(t, nIterations, conn)
-}
diff --git a/runtime/internal/lib/websocket/ws_test.go b/runtime/internal/lib/websocket/ws_test.go
deleted file mode 100644
index a193885..0000000
--- a/runtime/internal/lib/websocket/ws_test.go
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package websocket_test
-
-import (
- "net"
- "sync"
- "testing"
- "time"
-
- "v.io/v23/context"
- "v.io/v23/rpc"
- "v.io/x/ref/runtime/internal/lib/websocket"
-)
-
-func packetTester(t *testing.T, dialer rpc.DialerFunc, listener rpc.ListenerFunc, txProtocol, rxProtocol string) {
- ctx, _ := context.RootContext()
- ln, err := listener(ctx, rxProtocol, "127.0.0.1:0")
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- defer ln.Close()
- if got, want := ln.Addr().Network(), rxProtocol; got != want {
- t.Fatalf("got %q, want %q", got, want)
- }
-
- packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
- packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
-}
-
-func byteTester(t *testing.T, dialer rpc.DialerFunc, listener rpc.ListenerFunc, txProtocol, rxProtocol string) {
- ctx, _ := context.RootContext()
- ln, err := listener(ctx, rxProtocol, "127.0.0.1:0")
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- defer ln.Close()
- if got, want := ln.Addr().Network(), rxProtocol; got != want {
- t.Fatalf("got %q, want %q", got, want)
- }
-
- byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
- byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
-
-}
-
-func simpleDial(ctx *context.T, p, a string, timeout time.Duration) (net.Conn, error) {
- return net.DialTimeout(p, a, timeout)
-}
-
-func TestWSToWS(t *testing.T) {
- byteTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
- packetTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
-}
-
-func TestWSToWSH(t *testing.T) {
- byteTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
- //packetTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
-}
-
-func TestWSHToWSH(t *testing.T) {
- byteTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
- packetTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
-}
-
-func TestTCPToWSH(t *testing.T) {
- byteTester(t, simpleDial, websocket.HybridListener, "tcp", "wsh")
- packetTester(t, simpleDial, websocket.HybridListener, "tcp", "wsh")
-}
-
-func TestMixed(t *testing.T) {
- ctx, _ := context.RootContext()
- ln, err := websocket.HybridListener(ctx, "wsh", "127.0.0.1:0")
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- defer ln.Close()
-
- var pwg sync.WaitGroup
- packetTest := func(dialer rpc.DialerFunc, protocol string) {
- packetRunner(t, ln, dialer, protocol, ln.Addr().String())
- pwg.Done()
- }
-
- pwg.Add(4)
- go packetTest(websocket.Dial, "ws")
- go packetTest(simpleDial, "tcp")
- go packetTest(websocket.Dial, "ws")
- go packetTest(websocket.HybridDial, "wsh")
- pwg.Wait()
-
- var bwg sync.WaitGroup
- byteTest := func(dialer rpc.DialerFunc, protocol string) {
- byteRunner(t, ln, dialer, protocol, ln.Addr().String())
- bwg.Done()
- }
- bwg.Add(4)
- go byteTest(websocket.Dial, "ws")
- go byteTest(simpleDial, "tcp")
- go byteTest(websocket.Dial, "ws")
- go byteTest(websocket.HybridDial, "wsh")
-
- bwg.Wait()
-}
diff --git a/runtime/internal/naming/namespace/all_test.go b/runtime/internal/naming/namespace/all_test.go
index 20e550e..277c65c 100644
--- a/runtime/internal/naming/namespace/all_test.go
+++ b/runtime/internal/naming/namespace/all_test.go
@@ -186,18 +186,10 @@
doResolveTest(t, "ResolveToMountTable", ns.ResolveToMountTable, ctx, name, want)
}
-func testResolveToMountTableWithPattern(t *testing.T, ctx *context.T, ns namespace.T, name string, pattern naming.NamespaceOpt, want ...string) {
- doResolveTest(t, "ResolveToMountTable", ns.ResolveToMountTable, ctx, name, want, pattern)
-}
-
func testResolve(t *testing.T, ctx *context.T, ns namespace.T, name string, want ...string) {
doResolveTest(t, "Resolve", ns.Resolve, ctx, name, want)
}
-func testResolveWithPattern(t *testing.T, ctx *context.T, ns namespace.T, name string, pattern naming.NamespaceOpt, want ...string) {
- doResolveTest(t, "Resolve", ns.Resolve, ctx, name, want, pattern)
-}
-
type serverEntry struct {
mountPoint string
stop func() error
@@ -631,14 +623,6 @@
}
}
-func bless(blesser, delegate security.Principal, extension string) {
- b, err := blesser.Bless(delegate.PublicKey(), blesser.BlessingStore().Default(), extension, security.UnconstrainedUse())
- if err != nil {
- panic(err)
- }
- delegate.BlessingStore().SetDefault(b)
-}
-
func TestAuthorizationDuringResolve(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
diff --git a/runtime/internal/naming/namespace/mount.go b/runtime/internal/naming/namespace/mount.go
index f181b08..9c0390c 100644
--- a/runtime/internal/naming/namespace/mount.go
+++ b/runtime/internal/naming/namespace/mount.go
@@ -11,7 +11,6 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
- "v.io/v23/security"
"v.io/x/ref/lib/apilog"
)
@@ -80,11 +79,3 @@
ctx.VI(1).Infof("Remove(%s, %v) -> %v", name, deleteSubtree, err)
return err
}
-
-func str2pattern(strs []string) (ret []security.BlessingPattern) {
- ret = make([]security.BlessingPattern, len(strs))
- for i, s := range strs {
- ret[i] = security.BlessingPattern(s)
- }
- return
-}
diff --git a/runtime/internal/naming/namespace/namespace.go b/runtime/internal/naming/namespace/namespace.go
index 38dd7e7..d02494f 100644
--- a/runtime/internal/naming/namespace/namespace.go
+++ b/runtime/internal/naming/namespace/namespace.go
@@ -182,16 +182,6 @@
return ctx
}
-// withTimeoutAndCancel returns a new context with a deadline and a cancellation function.
-func withTimeoutAndCancel(ctx *context.T) (nctx *context.T, cancel context.CancelFunc) {
- if _, ok := ctx.Deadline(); !ok {
- nctx, cancel = context.WithTimeout(ctx, callTimeout)
- } else {
- nctx, cancel = context.WithCancel(ctx)
- }
- return
-}
-
// CacheCtl implements naming.Namespace.CacheCtl
func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
defer apilog.LogCallf(nil, "ctls...=%v", ctls)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
diff --git a/runtime/internal/naming/namespace/stub.go b/runtime/internal/naming/namespace/stub.go
index 9c0ca59..5bc30d3 100644
--- a/runtime/internal/naming/namespace/stub.go
+++ b/runtime/internal/naming/namespace/stub.go
@@ -12,10 +12,3 @@
}
return
}
-
-func convertStringsToServers(servers []string) (ret []naming.MountedServer) {
- for _, s := range servers {
- ret = append(ret, naming.MountedServer{Server: s})
- }
- return
-}
diff --git a/runtime/internal/rpc/benchmark/simple/AndroidManifest.xml b/runtime/internal/rpc/benchmark/simple/AndroidManifest.xml
new file mode 100644
index 0000000..41917b0
--- /dev/null
+++ b/runtime/internal/rpc/benchmark/simple/AndroidManifest.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Copyright 2015 The Vanadium Authors. All rights reserved.
+Use of this source code is governed by a BSD-style
+license that can be found in the LICENSE file.
+-->
+<manifest
+ xmlns:android="http://schemas.android.com/apk/res/android"
+ package="io.v.x.ref.runtime.internal.rpc.benchmark.simple"
+ android:versionCode="1"
+ android:versionName="1.0">
+
+ <!-- http://developer.android.com/guide/topics/manifest/manifest-intro.html#perms -->
+ <uses-permission android:name="android.permission.INTERNET" />
+
+ <application android:label="v23rpc" android:debuggable="true">
+
+ <activity android:name="org.golang.app.GoNativeActivity"
+ android:label="v23rpc"
+ android:configChanges="orientation|keyboardHidden">
+ <meta-data android:name="android.app.lib_name" android:value="v23rpc" />
+ <intent-filter>
+ <action android:name="android.intent.action.MAIN" />
+ <category android:name="android.intent.category.LAUNCHER" />
+ </intent-filter>
+ </activity>
+ </application>
+</manifest>
diff --git a/runtime/internal/rpc/benchmark/simple/benchmark.go b/runtime/internal/rpc/benchmark/simple/benchmark.go
new file mode 100644
index 0000000..482b0fb
--- /dev/null
+++ b/runtime/internal/rpc/benchmark/simple/benchmark.go
@@ -0,0 +1,208 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "runtime"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/x/lib/ibe"
+ "v.io/x/ref/lib/security/bcrypter"
+ "v.io/x/ref/lib/security/securityflag"
+ _ "v.io/x/ref/runtime/factories/roaming"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
+ fmanager "v.io/x/ref/runtime/internal/flow/manager"
+ "v.io/x/ref/runtime/internal/rpc/benchmark/internal"
+ "v.io/x/ref/test"
+ "v.io/x/ref/test/benchmark"
+ "v.io/x/ref/test/testutil"
+)
+
+const (
+ payloadSize = 1000
+ chunkCnt = 10000
+
+ bulkPayloadSize = 1000000
+
+ numCPUs = 2
+ defaultBenchTime = 5 * time.Second
+)
+
+var ctx *context.T
+
+type benchFun func(b *testing.B)
+
+func newServer(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, naming.Endpoint) {
+ ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie(), opts...)
+ if err != nil {
+ ctx.Fatalf("NewServer failed: %v", err)
+ }
+ return ctx, server.Status().Endpoints[0]
+}
+
+// Benchmark for measuring RPC connection time including authentication.
+//
+// rpc.Client doesn't export an interface for closing a connection, so we
+// use the flow manager directly here.
+func benchmarkRPCConnection(b *testing.B) {
+ mp := runtime.GOMAXPROCS(numCPUs)
+ defer runtime.GOMAXPROCS(mp)
+
+ ctx, serverEP := newServer(ctx)
+
+ principal := testutil.NewPrincipal("test")
+ nctx, _ := v23.WithPrincipal(ctx, principal)
+
+ b.StopTimer()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ mctx, cancel := context.WithCancel(nctx)
+ m := fmanager.New(mctx, naming.FixedRoutingID(0xc), nil, 0)
+ b.StartTimer()
+ _, err := m.Dial(mctx, serverEP, flowtest.AllowAllPeersAuthorizer{}, 0)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ cancel()
+ <-m.Closed()
+ }
+}
+
+// Benchmark for measuring RPC connection time when using private mutual
+// authentication. 'serverAuth' is the authorization policy used by the
+// server while revealing its blessings, and 'clientBlessing' is the blessing
+// used by the client.
+//
+// The specific protocol being benchmarked is Protocol 3 from the doc:
+// https://docs.google.com/document/d/1FpLJSiKy4sXxRUSZh1BQrhUEn7io-dGW7y-DMszI21Q/edit
+func benchmarkPrivateRPCConnection(ctx *context.T, serverAuth []security.BlessingPattern, clientBlessing string) benchFun {
+ return func(b *testing.B) {
+ mp := runtime.GOMAXPROCS(numCPUs)
+ defer runtime.GOMAXPROCS(mp)
+
+ ctx, privateServerEP := newServer(ctx, options.ServerPeers(serverAuth))
+
+ principal := testutil.NewPrincipal(clientBlessing)
+ nctx, _ := v23.WithPrincipal(ctx, principal)
+
+ b.StopTimer()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ mctx, cancel := context.WithCancel(nctx)
+ m := fmanager.New(mctx, naming.FixedRoutingID(0xc), nil, 0)
+ b.StartTimer()
+ _, err := m.Dial(nctx, privateServerEP, flowtest.AllowAllPeersAuthorizer{}, 0)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ cancel()
+ <-m.Closed()
+ }
+ }
+}
+
+// Benchmark for non-streaming RPC.
+func benchmarkRPC(b *testing.B) {
+ ctx, serverEP := newServer(ctx)
+ mp := runtime.GOMAXPROCS(numCPUs)
+ defer runtime.GOMAXPROCS(mp)
+ internal.CallEcho(b, ctx, serverEP.Name(), b.N, payloadSize, benchmark.NewStats(1))
+}
+
+// Benchmark for streaming RPC.
+func benchmarkStreamingRPC(b *testing.B) {
+ ctx, serverEP := newServer(ctx)
+ mp := runtime.GOMAXPROCS(numCPUs)
+ defer runtime.GOMAXPROCS(mp)
+ internal.CallEchoStream(b, ctx, serverEP.Name(), b.N, chunkCnt, payloadSize, benchmark.NewStats(1))
+}
+
+// Benchmark for measuring throughput in streaming RPC.
+func benchmarkStreamingRPCThroughput(b *testing.B) {
+ ctx, serverEP := newServer(ctx)
+ mp := runtime.GOMAXPROCS(numCPUs)
+ defer runtime.GOMAXPROCS(mp)
+ internal.CallEchoStream(b, ctx, serverEP.Name(), 1, b.N, bulkPayloadSize, benchmark.NewStats(1))
+}
+
+func msPerRPC(r testing.BenchmarkResult) float64 {
+ return r.T.Seconds() / float64(r.N) * 1000
+}
+
+func rpcPerSec(r testing.BenchmarkResult) float64 {
+ return float64(r.N) / r.T.Seconds()
+}
+func mbPerSec(r testing.BenchmarkResult) float64 {
+ return (float64(r.Bytes) * float64(r.N) / 1e6) / r.T.Seconds()
+}
+
+func runBenchmarks() {
+ r := testing.Benchmark(benchmarkRPCConnection)
+ fmt.Printf("RPC Connection\t%.2f ms/rpc\n", msPerRPC(r))
+
+ master, err := ibe.SetupBB1()
+ if err != nil {
+ ctx.Fatalf("ibe.SetupBB1 failed: %v", err)
+ }
+ root := bcrypter.NewRoot("root", master)
+ clientBlessing := "root:alice:client"
+
+ // Attach a crypter to the context, and add a blessing private
+ // key to it for 'clientBlessing'.
+ crypter := bcrypter.NewCrypter()
+ cctx := bcrypter.WithCrypter(ctx, crypter)
+ key, err := root.Extract(ctx, clientBlessing)
+ if err != nil {
+ ctx.Fatalf("could not extract private key: %v", err)
+ }
+ if err := crypter.AddKey(cctx, key); err != nil {
+ ctx.Fatalf("could not add key to crypter: %v", err)
+ }
+
+ serverAuthPatterns := [][]security.BlessingPattern{
+ []security.BlessingPattern{"root:alice"},
+ []security.BlessingPattern{"root:bob:friend", "root:carol:friend", "root:alice:client"},
+ []security.BlessingPattern{"root:bob:spouse", "root:bob:enemy", "root:carol:spouse", "root:carol:enemy", "root:alice:client:$"},
+ }
+ for _, serverAuth := range serverAuthPatterns {
+ r = testing.Benchmark(benchmarkPrivateRPCConnection(cctx, serverAuth, clientBlessing))
+ fmt.Printf("Private RPC Connection with server authorization policy %v and client blessing %v \t%.2f ms/rpc\n", serverAuth, clientBlessing, msPerRPC(r))
+ }
+
+ // Create a connection to exclude the setup time from the following benchmarks.
+ ctx, serverEP := newServer(ctx)
+ internal.CallEcho(&testing.B{}, ctx, serverEP.Name(), 1, 0, benchmark.NewStats(1))
+
+ r = testing.Benchmark(benchmarkRPC)
+ fmt.Printf("RPC (echo %vB)\t%.2f ms/rpc (%.2f qps)\n", payloadSize, msPerRPC(r), rpcPerSec(r))
+
+ r = testing.Benchmark(benchmarkStreamingRPC)
+ fmt.Printf("RPC Streaming (echo %vB)\t%.2f ms/rpc\n", payloadSize, msPerRPC(r)/chunkCnt)
+
+ r = testing.Benchmark(benchmarkStreamingRPCThroughput)
+ fmt.Printf("RPC Streaming Throughput (echo %vMB)\t%.2f MB/s\n", bulkPayloadSize/1e6, mbPerSec(r))
+}
+
+func realMain() {
+ // Set the default benchmark time.
+ flag.Set("test.benchtime", defaultBenchTime.String())
+
+ var shutdown v23.Shutdown
+ ctx, shutdown = test.V23Init()
+ defer shutdown()
+
+ runBenchmarks()
+}
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index 9da3d9e..19e4ac1 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -2,207 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+// +build !android
+
package main
-import (
- "flag"
- "fmt"
- "runtime"
- "testing"
- "time"
-
- "v.io/v23"
- "v.io/v23/context"
- "v.io/v23/naming"
- "v.io/v23/options"
- "v.io/v23/rpc"
- "v.io/v23/security"
- "v.io/x/lib/ibe"
- "v.io/x/ref/lib/security/bcrypter"
- "v.io/x/ref/lib/security/securityflag"
- _ "v.io/x/ref/runtime/factories/roaming"
- "v.io/x/ref/runtime/internal/flow/flowtest"
- fmanager "v.io/x/ref/runtime/internal/flow/manager"
- "v.io/x/ref/runtime/internal/rpc/benchmark/internal"
- "v.io/x/ref/test"
- "v.io/x/ref/test/benchmark"
- "v.io/x/ref/test/testutil"
-)
-
-const (
- payloadSize = 1000
- chunkCnt = 10000
-
- bulkPayloadSize = 1000000
-
- numCPUs = 2
- defaultBenchTime = 5 * time.Second
-)
-
-var ctx *context.T
-
-type benchFun func(b *testing.B)
-
-func newServer(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, naming.Endpoint) {
- ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie(), opts...)
- if err != nil {
- ctx.Fatalf("NewServer failed: %v", err)
- }
- return ctx, server.Status().Endpoints[0]
-}
-
-// Benchmark for measuring RPC connection time including authentication.
-//
-// rpc.Client doesn't export an interface for closing connection. So we
-// use the stream manager directly here.
-func benchmarkRPCConnection(b *testing.B) {
- mp := runtime.GOMAXPROCS(numCPUs)
- defer runtime.GOMAXPROCS(mp)
-
- ctx, serverEP := newServer(ctx)
-
- principal := testutil.NewPrincipal("test")
- nctx, _ := v23.WithPrincipal(ctx, principal)
-
- b.StopTimer()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mctx, cancel := context.WithCancel(nctx)
- m := fmanager.New(mctx, naming.FixedRoutingID(0xc), nil, 0)
- b.StartTimer()
- _, err := m.Dial(mctx, serverEP, flowtest.AllowAllPeersAuthorizer{}, 0)
- if err != nil {
- ctx.Fatalf("Dial failed: %v", err)
- }
- b.StopTimer()
- cancel()
- <-m.Closed()
- }
-}
-
-// Benchmark for measuring RPC connection time when using private mutual
-// authentication. 'serverAuth' is the authorization policy used by the
-// server while revealing its blessings, and 'clientBlessing' is the blessing
-// used by the client.
-//
-// The specific protocol being benchmarked is Protocol 3 from the doc:
-// https://docs.google.com/document/d/1FpLJSiKy4sXxRUSZh1BQrhUEn7io-dGW7y-DMszI21Q/edit
-func benchmarkPrivateRPCConnection(ctx *context.T, serverAuth []security.BlessingPattern, clientBlessing string) benchFun {
- return func(b *testing.B) {
- mp := runtime.GOMAXPROCS(numCPUs)
- defer runtime.GOMAXPROCS(mp)
-
- ctx, privateServerEP := newServer(ctx, options.ServerPeers(serverAuth))
-
- principal := testutil.NewPrincipal(clientBlessing)
- nctx, _ := v23.WithPrincipal(ctx, principal)
-
- b.StopTimer()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mctx, cancel := context.WithCancel(nctx)
- m := fmanager.New(mctx, naming.FixedRoutingID(0xc), nil, 0)
- b.StartTimer()
- _, err := m.Dial(nctx, privateServerEP, flowtest.AllowAllPeersAuthorizer{}, 0)
- if err != nil {
- ctx.Fatalf("Dial failed: %v", err)
- }
- b.StopTimer()
- cancel()
- <-m.Closed()
- }
- }
-}
-
-// Benchmark for non-streaming RPC.
-func benchmarkRPC(b *testing.B) {
- ctx, serverEP := newServer(ctx)
- mp := runtime.GOMAXPROCS(numCPUs)
- defer runtime.GOMAXPROCS(mp)
- internal.CallEcho(b, ctx, serverEP.Name(), b.N, payloadSize, benchmark.NewStats(1))
-}
-
-// Benchmark for streaming RPC.
-func benchmarkStreamingRPC(b *testing.B) {
- ctx, serverEP := newServer(ctx)
- mp := runtime.GOMAXPROCS(numCPUs)
- defer runtime.GOMAXPROCS(mp)
- internal.CallEchoStream(b, ctx, serverEP.Name(), b.N, chunkCnt, payloadSize, benchmark.NewStats(1))
-}
-
-// Benchmark for measuring throughput in streaming RPC.
-func benchmarkStreamingRPCThroughput(b *testing.B) {
- ctx, serverEP := newServer(ctx)
- mp := runtime.GOMAXPROCS(numCPUs)
- defer runtime.GOMAXPROCS(mp)
- internal.CallEchoStream(b, ctx, serverEP.Name(), 1, b.N, bulkPayloadSize, benchmark.NewStats(1))
-}
-
-func msPerRPC(r testing.BenchmarkResult) float64 {
- return r.T.Seconds() / float64(r.N) * 1000
-}
-
-func rpcPerSec(r testing.BenchmarkResult) float64 {
- return float64(r.N) / r.T.Seconds()
-}
-func mbPerSec(r testing.BenchmarkResult) float64 {
- return (float64(r.Bytes) * float64(r.N) / 1e6) / r.T.Seconds()
-}
-
-func runBenchmarks() {
- r := testing.Benchmark(benchmarkRPCConnection)
- fmt.Printf("RPC Connection\t%.2f ms/rpc\n", msPerRPC(r))
-
- master, err := ibe.SetupBB1()
- if err != nil {
- ctx.Fatalf("ibe.SetupBB1 failed: %v", err)
- }
- root := bcrypter.NewRoot("root", master)
- clientBlessing := "root:alice:client"
-
- // Attach a crypter to the context, and add a blessing private
- // key to the for 'clientBlesing'.
- crypter := bcrypter.NewCrypter()
- cctx := bcrypter.WithCrypter(ctx, crypter)
- key, err := root.Extract(ctx, clientBlessing)
- if err != nil {
- ctx.Fatalf("could not extract private key: %v", err)
- }
- if err := crypter.AddKey(cctx, key); err != nil {
- ctx.Fatalf("could not add key to crypter: %v", err)
- }
-
- serverAuthPatterns := [][]security.BlessingPattern{
- []security.BlessingPattern{"root:alice"},
- []security.BlessingPattern{"root:bob:friend", "root:carol:friend", "root:alice:client"},
- []security.BlessingPattern{"root:bob:spouse", "root:bob:enemy", "root:carol:spouse", "root:carol:enemy", "root:alice:client:$"},
- }
- for _, serverAuth := range serverAuthPatterns {
- r = testing.Benchmark(benchmarkPrivateRPCConnection(cctx, serverAuth, clientBlessing))
- fmt.Printf("Private RPC Connection with server authorization policy %v and client blessing %v \t%.2f ms/rpc\n", serverAuth, clientBlessing, msPerRPC(r))
- }
-
- // Create a connection to exclude the setup time from the following benchmarks.
- ctx, serverEP := newServer(ctx)
- internal.CallEcho(&testing.B{}, ctx, serverEP.Name(), 1, 0, benchmark.NewStats(1))
-
- r = testing.Benchmark(benchmarkRPC)
- fmt.Printf("RPC (echo %vB)\t%.2f ms/rpc (%.2f qps)\n", payloadSize, msPerRPC(r), rpcPerSec(r))
-
- r = testing.Benchmark(benchmarkStreamingRPC)
- fmt.Printf("RPC Streaming (echo %vB)\t%.2f ms/rpc\n", payloadSize, msPerRPC(r)/chunkCnt)
-
- r = testing.Benchmark(benchmarkStreamingRPCThroughput)
- fmt.Printf("RPC Streaming Throughput (echo %vMB)\t%.2f MB/s\n", bulkPayloadSize/1e6, mbPerSec(r))
-}
-
func main() {
- // Set the default benchmark time.
- flag.Set("test.benchtime", defaultBenchTime.String())
-
- var shutdown v23.Shutdown
- ctx, shutdown = test.V23Init()
- defer shutdown()
-
- runBenchmarks()
+ realMain()
}
diff --git a/runtime/internal/rpc/benchmark/simple/main_android.go b/runtime/internal/rpc/benchmark/simple/main_android.go
new file mode 100644
index 0000000..9d8fe52
--- /dev/null
+++ b/runtime/internal/rpc/benchmark/simple/main_android.go
@@ -0,0 +1,62 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build android
+
+// Android "app" to run the RPC benchmarks.
+//
+// Usage: See run-android.sh
+package main
+
+import (
+ "time"
+
+ "golang.org/x/mobile/app"
+ "golang.org/x/mobile/event/lifecycle"
+ "golang.org/x/mobile/event/paint"
+ "golang.org/x/mobile/gl"
+)
+
+func main() {
+ done := make(chan struct{})
+ go func() {
+ realMain()
+ close(done)
+ }()
+ app.Main(func(a app.App) {
+ var glctx gl.Context
+ ticks := time.Tick(time.Second / 2)
+ black := false
+ for {
+ select {
+ case <-done:
+ done = nil
+ a.Send(paint.Event{})
+ case <-ticks:
+ black = !black
+ a.Send(paint.Event{})
+ case e := <-a.Events():
+ switch e := a.Filter(e).(type) {
+ case lifecycle.Event:
+ glctx, _ = e.DrawContext.(gl.Context)
+ case paint.Event:
+ if glctx == nil {
+ continue
+ }
+ // solid green: success
+ // flashing black/blue: working
+ if done == nil {
+ glctx.ClearColor(0, 1, 0, 1)
+ } else if black {
+ glctx.ClearColor(0, 0, 0, 1)
+ } else {
+ glctx.ClearColor(0, 0, 1, 1)
+ }
+ glctx.Clear(gl.COLOR_BUFFER_BIT)
+ a.Publish()
+ }
+ }
+ }
+ })
+}
diff --git a/runtime/internal/rpc/benchmark/simple/run-android.sh b/runtime/internal/rpc/benchmark/simple/run-android.sh
new file mode 100755
index 0000000..694d152
--- /dev/null
+++ b/runtime/internal/rpc/benchmark/simple/run-android.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+set -e
+set -x
+
+[ -z "${GOPATH}" ] && echo "Must set GOPATH, for example: export GOPATH=\$HOME/go so that gomobile can be installed there" && exit 1
+go get golang.org/x/mobile/cmd/gomobile
+GOMOBILE=${GOPATH}/bin/gomobile
+jiri run ${GOMOBILE} build v.io/x/ref/runtime/internal/rpc/benchmark/simple
+adb install -r ./simple.apk
+echo "Start the v23rpc app on the phone connected to adb"
+adb logcat *:S GoLog:*
+
diff --git a/runtime/internal/rpc/protocols/tcp/init.go b/runtime/internal/rpc/protocols/tcp/init.go
deleted file mode 100644
index a78117f..0000000
--- a/runtime/internal/rpc/protocols/tcp/init.go
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package tcp
-
-import (
- "fmt"
- "net"
- "time"
-
- "v.io/v23/context"
- "v.io/v23/rpc"
-
- "v.io/x/ref/runtime/internal/lib/tcputil"
-)
-
-func init() {
- rpc.RegisterProtocol("tcp", tcpDial, tcpResolve, tcpListen, "tcp4", "tcp6")
- rpc.RegisterProtocol("tcp4", tcpDial, tcpResolve, tcpListen)
- rpc.RegisterProtocol("tcp6", tcpDial, tcpResolve, tcpListen)
-}
-
-func tcpDial(ctx *context.T, network, address string, timeout time.Duration) (net.Conn, error) {
- conn, err := net.DialTimeout(network, address, timeout)
- if err != nil {
- return nil, err
- }
- if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
- return nil, err
- }
- return conn, nil
-}
-
-// tcpResolve performs a DNS resolution on the provided network and address.
-func tcpResolve(ctx *context.T, network, address string) (string, string, error) {
- tcpAddr, err := net.ResolveTCPAddr(network, address)
- if err != nil {
- return "", "", err
- }
- return tcpAddr.Network(), tcpAddr.String(), nil
-}
-
-// tcpListen returns a listener that sets KeepAlive on all accepted connections.
-func tcpListen(ctx *context.T, network, laddr string) (net.Listener, error) {
- ln, err := net.Listen(network, laddr)
- if err != nil {
- return nil, err
- }
- return &tcpListener{ln}, nil
-}
-
-// tcpListener is a wrapper around net.Listener that sets KeepAlive on all
-// accepted connections.
-type tcpListener struct {
- netLn net.Listener
-}
-
-func (ln *tcpListener) Accept() (net.Conn, error) {
- conn, err := ln.netLn.Accept()
- if err != nil {
- return nil, err
- }
- if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
- return nil, fmt.Errorf("Failed to enable TCP keep alive: %v", err)
- }
- return conn, nil
-}
-
-func (ln *tcpListener) Close() error {
- return ln.netLn.Close()
-}
-
-func (ln *tcpListener) Addr() net.Addr {
- return ln.netLn.Addr()
-}
diff --git a/runtime/internal/rpc/protocols/ws/init.go b/runtime/internal/rpc/protocols/ws/init.go
deleted file mode 100644
index 5aac575..0000000
--- a/runtime/internal/rpc/protocols/ws/init.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package websocket
-
-import (
- "v.io/v23/rpc"
-
- "v.io/x/ref/runtime/internal/lib/websocket"
-)
-
-func init() {
- // ws, ws4, ws6 represent websocket protocol instances.
- rpc.RegisterProtocol("ws", websocket.Dial, websocket.Resolve, websocket.Listener, "ws4", "ws6")
- rpc.RegisterProtocol("ws4", websocket.Dial, websocket.Resolve, websocket.Listener)
- rpc.RegisterProtocol("ws6", websocket.Dial, websocket.Resolve, websocket.Listener)
-}
diff --git a/runtime/internal/rpc/protocols/wsh/init.go b/runtime/internal/rpc/protocols/wsh/init.go
deleted file mode 100644
index 26cc680..0000000
--- a/runtime/internal/rpc/protocols/wsh/init.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package wsh registers the websocket 'hybrid' protocol.
-// We prefer to use tcp whenever we can to avoid the overhead of websockets.
-package wsh
-
-import (
- "v.io/v23/rpc"
-
- "v.io/x/ref/runtime/internal/lib/websocket"
-)
-
-func init() {
- rpc.RegisterProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener, "tcp4", "tcp6", "ws4", "ws6")
- rpc.RegisterProtocol("wsh4", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener, "tcp4", "ws4")
- rpc.RegisterProtocol("wsh6", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener, "tcp6", "ws6")
-}
diff --git a/runtime/internal/rpc/protocols/wsh_nacl/init.go b/runtime/internal/rpc/protocols/wsh_nacl/init.go
deleted file mode 100644
index 276a567..0000000
--- a/runtime/internal/rpc/protocols/wsh_nacl/init.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package wsh_nacl registers the websocket 'hybrid' protocol for nacl
-// architectures.
-package wsh_nacl
-
-import (
- "v.io/v23/rpc"
-
- "v.io/x/ref/runtime/internal/lib/websocket"
-)
-
-func init() {
- // We limit wsh to ws since in general nacl does not allow direct access
- // to TCP/UDP networking.
- rpc.RegisterProtocol("wsh", websocket.Dial, websocket.Resolve, websocket.Listener, "ws4", "ws6")
- rpc.RegisterProtocol("wsh4", websocket.Dial, websocket.Resolve, websocket.Listener, "ws4")
- rpc.RegisterProtocol("wsh6", websocket.Dial, websocket.Resolve, websocket.Listener, "ws6")
-}
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index 9857d95..c064de0 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -6,7 +6,6 @@
import (
"net"
- "sort"
"strings"
"testing"
@@ -156,12 +155,3 @@
}
return addrs
}
-
-func endpointToStrings(eps []naming.Endpoint) []string {
- r := []string{}
- for _, ep := range eps {
- r = append(r, ep.String())
- }
- sort.Strings(r)
- return r
-}
diff --git a/runtime/internal/rpc/server_authorizer.go b/runtime/internal/rpc/server_authorizer.go
index 9dedfce..b84a10d 100644
--- a/runtime/internal/rpc/server_authorizer.go
+++ b/runtime/internal/rpc/server_authorizer.go
@@ -71,17 +71,3 @@
}
return nil
}
-
-func canCreateServerAuthorizer(ctx *context.T, opts []rpc.CallOpt) error {
- policy := false
- for _, o := range opts {
- switch o.(type) {
- case options.ServerAuthorizer:
- if policy {
- return verror.New(errMultipleAuthorizationPolicies, ctx)
- }
- policy = true
- }
- }
- return nil
-}
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index 69edbcb..9a97291 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -299,21 +299,6 @@
}
func registerDisProtocol(wrap string, conns chan disconnect) {
- dial, resolve, listen, protonames := rpc.RegisteredProtocol(wrap)
- rpc.RegisterProtocol("dis", func(ctx *context.T, p, a string, t time.Duration) (net.Conn, error) {
- conn, err := dial(ctx, protonames[0], a, t)
- if err == nil {
- dc := &disConn{Conn: conn}
- conns <- dc
- conn = dc
- }
- return conn, err
- }, func(ctx *context.T, protocol, address string) (string, string, error) {
- _, a, err := resolve(ctx, protonames[0], address)
- return "dis", a, err
- }, func(ctx *context.T, protocol, address string) (net.Listener, error) {
- return listen(ctx, protonames[0], address)
- })
// We only register this flow protocol to make the test work in xclients mode.
protocol, _ := flow.RegisteredProtocol("tcp")
flow.RegisterProtocol("dis", &flowdis{base: protocol})
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index a6932ca..eefb38e 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -396,10 +396,6 @@
// logErr("timeout to server", err)
}
-func simpleResolver(ctx *context.T, network, address string) (string, string, error) {
- return network, address, nil
-}
-
type closeConn struct {
ctx *context.T
flow.Conn
diff --git a/runtime/internal/rpc/testutil_test.go b/runtime/internal/rpc/testutil_test.go
index e08d80a..6acff51 100644
--- a/runtime/internal/rpc/testutil_test.go
+++ b/runtime/internal/rpc/testutil_test.go
@@ -5,8 +5,6 @@
package rpc
import (
- "reflect"
- "testing"
"time"
"v.io/v23"
@@ -14,69 +12,12 @@
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/vdl"
- "v.io/v23/verror"
"v.io/v23/vtrace"
"v.io/x/ref/lib/flags"
ivtrace "v.io/x/ref/runtime/internal/vtrace"
"v.io/x/ref/test"
)
-func makeResultPtrs(ins []interface{}) []interface{} {
- outs := make([]interface{}, len(ins))
- for ix, in := range ins {
- typ := reflect.TypeOf(in)
- if typ == nil {
- // Nil indicates interface{}.
- var empty interface{}
- typ = reflect.ValueOf(&empty).Elem().Type()
- }
- outs[ix] = reflect.New(typ).Interface()
- }
- return outs
-}
-
-func checkResultPtrs(t *testing.T, name string, gotptrs, want []interface{}) {
- for ix, res := range gotptrs {
- got := reflect.ValueOf(res).Elem().Interface()
- want := want[ix]
- switch g := got.(type) {
- case verror.E:
- w, ok := want.(verror.E)
- // don't use reflect deep equal on verror's since they contain
- // a list of stack PCs which will be different.
- if !ok {
- t.Errorf("%s result %d got type %T, want %T", name, ix, g, w)
- }
- if verror.ErrorID(g) != w.ID {
- t.Errorf("%s result %d got %v, want %v", name, ix, g, w)
- }
- default:
- if !reflect.DeepEqual(got, want) {
- t.Errorf("%s result %d got %v, want %v", name, ix, got, want)
- }
- }
-
- }
-}
-
-func mkCaveat(cav security.Caveat, err error) security.Caveat {
- if err != nil {
- panic(err)
- }
- return cav
-}
-
-func bless(blesser, blessed security.Principal, extension string, caveats ...security.Caveat) security.Blessings {
- if len(caveats) == 0 {
- caveats = append(caveats, security.UnconstrainedUse())
- }
- b, err := blesser.Bless(blessed.PublicKey(), blesser.BlessingStore().Default(), extension, caveats[0], caveats[1:]...)
- if err != nil {
- panic(err)
- }
- return b
-}
-
func initForTest() (*context.T, v23.Shutdown) {
ctx, shutdown := test.V23Init()
ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
@@ -87,14 +28,6 @@
return ctx, shutdown
}
-func mkThirdPartyCaveat(discharger security.PublicKey, location string, c security.Caveat) security.Caveat {
- tpc, err := security.NewPublicKeyCaveat(discharger, location, security.ThirdPartyRequirements{}, c)
- if err != nil {
- panic(err)
- }
- return tpc
-}
-
// mockCall implements security.Call
type mockCall struct {
p security.Principal
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 54b41af..a2a14b3 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -78,6 +78,7 @@
preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
servesMountTable bool
isLeaf bool
+ lameDuckTimeout time.Duration // the time to wait for inflight operations to finish on shutdown
stats *rpcStats // stats for this server.
}
@@ -126,6 +127,7 @@
state: rpc.ServerActive,
endpoints: make(map[string]*inaming.Endpoint),
lnErrors: make(map[struct{ Protocol, Address string }]error),
+ lameDuckTimeout: 5 * time.Second,
}
channelTimeout := time.Duration(0)
var authorizedPeers []security.BlessingPattern
@@ -141,6 +143,8 @@
s.preferredProtocols = []string(opt)
case options.ChannelTimeout:
channelTimeout = time.Duration(opt)
+ case options.LameDuckTimeout:
+ s.lameDuckTimeout = time.Duration(opt)
case options.ServerPeers:
authorizedPeers = []security.BlessingPattern(opt)
if len(authorizedPeers) == 0 {
@@ -205,7 +209,7 @@
select {
case <-done:
- case <-time.After(5 * time.Second): // TODO(mattr): This should be configurable.
+ case <-time.After(s.lameDuckTimeout):
s.ctx.Errorf("%s: Timed out waiting for active requests to complete", serverDebug)
}
// Now we cancel the root context which closes all the connections
diff --git a/runtime/internal/lib/framer/errors.vdl b/runtime/protocols/lib/framer/errors.vdl
similarity index 100%
rename from runtime/internal/lib/framer/errors.vdl
rename to runtime/protocols/lib/framer/errors.vdl
diff --git a/runtime/internal/lib/framer/errors.vdl.go b/runtime/protocols/lib/framer/errors.vdl.go
similarity index 79%
rename from runtime/internal/lib/framer/errors.vdl.go
rename to runtime/protocols/lib/framer/errors.vdl.go
index 2da6252..1c077bf 100644
--- a/runtime/internal/lib/framer/errors.vdl.go
+++ b/runtime/protocols/lib/framer/errors.vdl.go
@@ -15,7 +15,7 @@
)
var (
- ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/lib/framer.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+ ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/protocols/lib/framer.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
)
func init() {
diff --git a/runtime/internal/lib/framer/framer.go b/runtime/protocols/lib/framer/framer.go
similarity index 100%
rename from runtime/internal/lib/framer/framer.go
rename to runtime/protocols/lib/framer/framer.go
diff --git a/runtime/internal/lib/framer/framer_test.go b/runtime/protocols/lib/framer/framer_test.go
similarity index 100%
rename from runtime/internal/lib/framer/framer_test.go
rename to runtime/protocols/lib/framer/framer_test.go
diff --git a/services/cluster/vkube/vkube_v23_test.go b/services/cluster/vkube/vkube_v23_test.go
index 6db39e6..4da6460 100644
--- a/services/cluster/vkube/vkube_v23_test.go
+++ b/services/cluster/vkube/vkube_v23_test.go
@@ -65,16 +65,17 @@
// Note, creds do not affect non-Vanadium commands.
c := sh.Cmd(name, args...).WithCredentials(creds)
c.ExitErrorIsOk = true
- prefix := textutil.PrefixLineWriter(os.Stdout, filepath.Base(name)+"> ")
- c.AddStdoutWriter(gosh.NopWriteCloser(prefix))
- stdout := c.Stdout()
- prefix.Flush()
+ plw := textutil.PrefixLineWriter(os.Stdout, filepath.Base(name)+"> ")
+ c.AddStdoutWriter(gosh.NopWriteCloser(plw))
+ c.AddStderrWriter(gosh.NopWriteCloser(plw))
+ output := c.CombinedOutput()
+ plw.Flush()
if expectSuccess && c.Err != nil {
t.Error(testutil.FormatLogLine(2, "Unexpected failure: %s %s :%v", name, strings.Join(args, " "), c.Err))
} else if !expectSuccess && c.Err == nil {
t.Error(testutil.FormatLogLine(2, "Unexpected success %d: %s %s", name, strings.Join(args, " ")))
}
- return stdout
+ return output
}
}
gsutil = cmd("gsutil", true)
@@ -145,7 +146,13 @@
// Find the pod running tunneld, get the server's addr from its stdout.
podName := kubectlOK("get", "pod", "-l", "application=tunneld", "--template={{range .items}}{{.metadata.name}}{{end}}")
- addr := strings.TrimPrefix(strings.TrimSpace(kubectlOK("logs", podName, "-c", "tunneld")), "NAME=")
+ var addr string
+ for _, log := range strings.Split(kubectlOK("logs", podName, "-c", "tunneld"), "\n") {
+ if strings.HasPrefix(log, "NAME=") {
+ addr = strings.TrimPrefix(log, "NAME=")
+ break
+ }
+ }
if got, expected := vshOK(addr, "echo", "hello", "world"), "hello world\n"; got != expected {
t.Errorf("Unexpected output. Got %q, expected %q", got, expected)
}
diff --git a/services/mounttable/mounttablelib/mounttable_test.go b/services/mounttable/mounttablelib/mounttable_test.go
index 8cd9e99..024e98a 100644
--- a/services/mounttable/mounttablelib/mounttable_test.go
+++ b/services/mounttable/mounttablelib/mounttable_test.go
@@ -5,7 +5,6 @@
package mounttablelib_test
import (
- "encoding/json"
"errors"
"fmt"
"io"
@@ -119,10 +118,6 @@
return names
}
-func strslice(strs ...string) []string {
- return strs
-}
-
func resolve(ctx *context.T, name string) (*naming.MountEntry, error) {
// Resolve the name one level.
var entry naming.MountEntry
@@ -558,24 +553,6 @@
}
}
-func getUserNodeCounts(t *testing.T) (counts map[string]int32) {
- s, err := libstats.Value("mounttable/num-nodes-per-user")
- if err != nil {
- boom(t, "Can't get mounttable statistics")
- }
- // This string is a json encoded map. Decode.
- switch v := s.(type) {
- default:
- boom(t, "Wrong type for mounttable statistics")
- case string:
- err = json.Unmarshal([]byte(v), &counts)
- if err != nil {
- boom(t, "Can't unmarshal mounttable statistics")
- }
- }
- return
-}
-
func TestGlobAccessLists(t *testing.T) {
rootCtx, aliceCtx, bobCtx, shutdown := initTest()
defer shutdown()
diff --git a/services/mounttable/mounttablelib/neighborhood_test.go b/services/mounttable/mounttablelib/neighborhood_test.go
index fbf343d..645c9e7 100644
--- a/services/mounttable/mounttablelib/neighborhood_test.go
+++ b/services/mounttable/mounttablelib/neighborhood_test.go
@@ -19,14 +19,6 @@
"v.io/x/ref/test"
)
-func protocolAndAddress(e naming.Endpoint) (string, string, error) {
- addr := e.Addr()
- if addr == nil {
- return "", "", fmt.Errorf("failed to get address")
- }
- return addr.Network(), addr.String(), nil
-}
-
type stopper interface {
Stop()
}
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 46d1a76..627289d 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -441,25 +441,3 @@
}
return svc, iSt, cleanup
}
-
-func testIfSgPfxsEqual(t *testing.T, m map[string]sgSet, a []string) {
- aMap := arrToMap(a)
-
- if len(aMap) != len(m) {
- t.Fatalf("testIfSgPfxsEqual diff lengths: got %v, want %v", aMap, m)
- }
-
- for p := range aMap {
- if _, ok := m[p]; !ok {
- t.Fatalf("testIfSgPfxsEqual: want %v", p)
- }
- }
-}
-
-func arrToMap(a []string) map[string]struct{} {
- m := make(map[string]struct{})
- for _, s := range a {
- m[s] = struct{}{}
- }
- return m
-}
diff --git a/services/wspr/browsprd/main_nacl.go b/services/wspr/browsprd/main_nacl.go
index 81c7865..b4b705b 100644
--- a/services/wspr/browsprd/main_nacl.go
+++ b/services/wspr/browsprd/main_nacl.go
@@ -22,7 +22,6 @@
"v.io/x/ref/internal/logger"
vsecurity "v.io/x/ref/lib/security"
_ "v.io/x/ref/runtime/factories/chrome"
- "v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
"v.io/x/ref/services/wspr/internal/app"
"v.io/x/ref/services/wspr/internal/browspr"
@@ -63,7 +62,6 @@
browsprInst.initFileSystem()
// Give the websocket interface the ppapi instance.
- websocket.PpapiInstance = inst
xwebsocket.PpapiInstance = inst
// Set up the channel and register start rpc handler.
diff --git a/services/wspr/wsprlib/wspr.go b/services/wspr/wsprlib/wspr.go
index 4622136..b5d57d4 100644
--- a/services/wspr/wsprlib/wspr.go
+++ b/services/wspr/wsprlib/wspr.go
@@ -8,10 +8,8 @@
package wsprlib
import (
- "bytes"
"crypto/tls"
"fmt"
- "io"
"net"
"net/http"
"sync"
@@ -44,16 +42,6 @@
pipes map[*http.Request]*pipe
}
-func readFromRequest(r *http.Request) (*bytes.Buffer, error) {
- var buf bytes.Buffer
- if readBytes, err := io.Copy(&buf, r.Body); err != nil {
- return nil, fmt.Errorf("error copying message out of request: %v", err)
- } else if wantBytes := r.ContentLength; readBytes != wantBytes {
- return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes)
- }
- return &buf, nil
-}
-
// Starts listening for requests and returns the network endpoint address.
func (wspr *WSPR) Listen() net.Addr {
addr := fmt.Sprintf("127.0.0.1:%d", wspr.httpPort)
diff --git a/test/modules/shell.go b/test/modules/shell.go
index c34d3b1..ededae2 100644
--- a/test/modules/shell.go
+++ b/test/modules/shell.go
@@ -133,7 +133,6 @@
"os"
"path/filepath"
"sync"
- "syscall"
"time"
"v.io/x/lib/envvar"
@@ -259,18 +258,6 @@
return c.path
}
-func dup(conn *os.File) (int, error) {
- syscall.ForkLock.RLock()
- fd, err := syscall.Dup(int(conn.Fd()))
- if err != nil {
- syscall.ForkLock.RUnlock()
- return -1, err
- }
- syscall.CloseOnExec(fd)
- syscall.ForkLock.RUnlock()
- return fd, nil
-}
-
// NewCustomCredentials creates a new Principal for StartWithOpts.
// Returns nil if the shell is not managing principals.
func (sh *Shell) NewCustomCredentials() (cred *CustomCredentials, err error) {
diff --git a/test/modules/util.go b/test/modules/util.go
index 3469fbd..97436eb 100644
--- a/test/modules/util.go
+++ b/test/modules/util.go
@@ -11,10 +11,7 @@
"io/ioutil"
"os"
- "v.io/v23/security"
-
"v.io/x/ref/internal/logger"
- vsecurity "v.io/x/ref/lib/security"
)
func newLogfile(prefix, name string) (*os.File, error) {
@@ -41,21 +38,3 @@
io.Copy(out, f)
f.Close()
}
-
-func principalFromDir(dir string) (security.Principal, error) {
- p, err := vsecurity.LoadPersistentPrincipal(dir, nil)
- if err == nil {
- return p, nil
- }
- if !os.IsNotExist(err) {
- return nil, err
- }
- p, err = vsecurity.CreatePersistentPrincipal(dir, nil)
- if err != nil {
- return nil, err
- }
- if err := vsecurity.InitDefaultBlessings(p, shellBlessingExtension); err != nil {
- return nil, err
- }
- return p, nil
-}
diff --git a/test/timekeeper/manual_time_test.go b/test/timekeeper/manual_time_test.go
index abb3546..307d69d 100644
--- a/test/timekeeper/manual_time_test.go
+++ b/test/timekeeper/manual_time_test.go
@@ -96,26 +96,24 @@
<-c
}
-func testBlocking(t *testing.T) {
+func TestBlocking(t *testing.T) {
mt := NewManualTime()
sync := make(chan bool)
go func() {
// Simulate blocking on a timer.
<-mt.After(10 * time.Second)
- <-mt.After(11 * time.Second)
- <-mt.After(3 * time.Second)
+ <-mt.After(2 * time.Second)
sync <- true
<-mt.After(4 * time.Second)
sync <- true
}()
- <-mt.Requests()
- <-mt.Requests()
- mt.AdvanceTime(12 * time.Second)
- <-mt.Requests()
- mt.AdvanceTime(2 * time.Second)
+ <-mt.Requests() // 10
+ mt.AdvanceTime(11 * time.Second)
+ <-mt.Requests() // 2
+ mt.AdvanceTime(3 * time.Second)
mt.AdvanceTime(time.Second)
<-sync
- <-mt.Requests()
+ <-mt.Requests() // 4
mt.AdvanceTime(5 * time.Second)
<-sync
}