Merge branch 'master' into proxyflow
diff --git a/.gerrit_commit_message b/.gerrit_commit_message
new file mode 100644
index 0000000..beb8086
--- /dev/null
+++ b/.gerrit_commit_message
@@ -0,0 +1,22 @@
+DO NOT SUBMIT: RPC2 Experiment.
+
+This change implements a proof-of-concept prototype for RPC2.
+
+All new implementation is prefixed with X; changes to existing code are
+limited to small supporting hooks (iobuf reservations, the writer
+encryption toggle, and vtrace spans in auth.go).
+
+Doc: https://docs.google.com/a/google.com/document/d/11NlwHsNyGg-bZU0-d43s-z0HS9pz9x4QSwpe0M8bukQ/edit?usp=sharing
+
+Main points:
+(1) VIF/VC are unified into one layer. This automatically saves us a round
+ of auth when making a direct connection (not through a proxy).
+(2) When talking through a proxy, a VC is built on top of flows from the client
+ to the proxy and from the proxy to the server. This allows us to reuse the
+ stream manager for the proxy, allowing it to be built with the stream API.
+
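+A rough sketch of the new API, adapted from the tests in this change
+(error handling elided):
+
+  // Start a proxy and get the endpoint at which it can be reached.
+  _, shutdown, pep, _ := proxy.New("tcp", "127.0.0.1:0", prid, ctx, principal)
+  // A server listens "through" the proxy using the v23 protocol.
+  ln, ep, _ := sm.Listen("v23", pep.String(), principal, blessings)
+  // A client dials the server's proxied endpoint and gets a flow directly.
+  flow, _ := cm.Dial(ep, principal)
+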
+Code:
+xclient.go and xserver.go are mostly identical to client.go and server.go,
+just with different types.
+The real implementation is in xvc.go, xmanager.go, xlistener.go, and
+rpc/proxy/proxy.go.
+
+Change-Id: I650570570d597a27274430e83a5efa118e6923c6
\ No newline at end of file
diff --git a/runtime/internal/lib/bqueue/drrqueue/drrqueue.go b/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
index a42731b..d9820ae 100644
--- a/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
+++ b/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
@@ -230,6 +230,11 @@
}
w.updateStateLocked(true, consumed)
+ ss := "bufs: "
+ for _, b := range bufs {
+ ss += fmt.Sprintf(" %d", b.Size())
+ }
+
return bufs, bufs != nil
}
diff --git a/runtime/internal/lib/iobuf/allocator.go b/runtime/internal/lib/iobuf/allocator.go
index d059564..21d370c 100644
--- a/runtime/internal/lib/iobuf/allocator.go
+++ b/runtime/internal/lib/iobuf/allocator.go
@@ -66,6 +66,14 @@
return a.iobuf.slice(free, base, a.index)
}
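+
+// CopyExtra allocates a Slice of len(buf)+extra bytes, copies buf into it,
+// and keeps the trailing extra bytes reserved; Slice.Extend can later claim
+// them (used to leave room for a message MAC without reallocating).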
+func (a *Allocator) CopyExtra(buf []byte, extra int) *Slice {
+ slice := a.Alloc(uint(len(buf) + extra))
+ copy(slice.Contents, buf)
+ slice.Contents = slice.Contents[:len(buf)]
+ slice.extra = uint(extra)
+ return slice
+}
+
// Copy allocates a Slice and copies the buf into it.
func (a *Allocator) Copy(buf []byte) *Slice {
slice := a.Alloc(uint(len(buf)))
diff --git a/runtime/internal/lib/iobuf/slice.go b/runtime/internal/lib/iobuf/slice.go
index d48a463..1da3940 100644
--- a/runtime/internal/lib/iobuf/slice.go
+++ b/runtime/internal/lib/iobuf/slice.go
@@ -7,8 +7,9 @@
// Slice refers to an iobuf and the byte slice for the actual data.
type Slice struct {
iobuf *buf
- free uint // Free area before base, if any.
- base uint // Index into the underlying iobuf.
+ free uint // Free area before base, if any.
+ base uint // Index into the underlying iobuf.
+ extra uint
Contents []byte // iobuf.Contents[base:bound]
}
@@ -17,6 +18,14 @@
return len(slice.Contents)
}
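+
+// Extend grows Contents by n bytes into the reserved region created by
+// Allocator.CopyExtra. It panics if fewer than n reserved bytes remain.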
+func (slice *Slice) Extend(n int) {
+ if int(slice.extra) < n {
+ panic("not enough reserved.")
+ }
+ slice.Contents = slice.Contents[0 : len(slice.Contents)+n]
+ slice.extra -= uint(n)
+}
+
// FreeEntirePrefix sets the free index to zero. Be careful when using this,
// you should ensure that no Slices are using the free region.
func (slice *Slice) FreeEntirePrefix() {
diff --git a/runtime/internal/rpc/proxy/proxy.go b/runtime/internal/rpc/proxy/proxy.go
new file mode 100644
index 0000000..8328311
--- /dev/null
+++ b/runtime/internal/rpc/proxy/proxy.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package proxy
+
+import (
+ "fmt"
+ "io"
+
+ "v.io/x/ref/runtime/internal/lib/iobuf"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/crypto"
+ "v.io/x/ref/runtime/internal/rpc/stream/manager"
+ "v.io/x/ref/runtime/internal/rpc/stream/message"
+
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+ "v.io/x/lib/vlog"
+)
+
+var nullCipher = crypto.NullControlCipher{}
+
+type proxy struct {
+ mgr stream.XManager
+ principal security.Principal
+}
+
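+// New creates a proxy listening on the given protocol and address. It
+// returns the proxy, a shutdown function (currently a no-op; see the TODO
+// below), and the endpoint at which the proxy can be reached.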
+func New(protocol, address string, rid naming.RoutingID, ctx *context.T, principal security.Principal) (*proxy, func(), naming.Endpoint, error) {
+ proxy := &proxy{
+ mgr: manager.XInternalNew(ctx, rid),
+ principal: principal,
+ }
+ ln, ep, err := proxy.mgr.Listen(protocol, address, proxy.principal, proxy.principal.BlessingStore().Default())
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ go proxy.listenLoop(ln)
+ // TODO(suharshs): implement proxy close function.
+ return proxy, func() {}, ep, nil
+}
+
+func (p *proxy) listenLoop(ln stream.XListener) {
+ for {
+ f, err := ln.Accept()
+ if err != nil {
+ vlog.Errorf("ln.Accept failed: %v", err)
+ return
+ }
+ go func() {
+ if err := p.processFlow(f); err != nil {
+ vlog.Errorf("processFlow failed: %v", err)
+ }
+ }()
+ }
+}
+
+func (p *proxy) processFlow(f stream.XFlow) error {
+ r := iobuf.NewReader(iobuf.NewPool(0), f)
+ m, err := message.ReadFrom(r, nullCipher)
+ if err != nil {
+ return fmt.Errorf("failed to read message: %v", err)
+ }
+ switch msg := m.(type) {
+ case *message.SetupVC:
+ fout, err := p.mgr.Dial(msg.RemoteEndpoint, p.principal)
+ if err != nil {
+ return fmt.Errorf("p.mgr.Dial: %v", err)
+ }
+ // Forward the setupVC message.
+ if err := message.WriteTo(fout, msg, nullCipher); err != nil {
+ return fmt.Errorf("message.WriteTo(fout): %v", err)
+ }
+ // Disable encryption on the two flows, since all data sent after the
+ // setupVC message is already encrypted end-to-end by the endpoints.
+ f.DisableEncryption()
+ fout.DisableEncryption()
+ go p.forwardLoop(f, fout)
+ go p.forwardLoop(fout, f)
+ return nil
+ default:
+ return fmt.Errorf("unexpected message: %v", m)
+ }
+}
+
+func (p *proxy) forwardLoop(dst, src stream.XFlow) {
+ // io.Copy returns a nil error when src reaches EOF, so a single call is
+ // sufficient; looping on it would spin forever once the flow closes.
+ if _, err := io.Copy(dst, src); err != nil {
+ vlog.Errorf("io.Copy failed: %v", err)
+ }
+}
diff --git a/runtime/internal/rpc/stream/manager/xlistener.go b/runtime/internal/rpc/stream/manager/xlistener.go
new file mode 100644
index 0000000..a899891
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xlistener.go
@@ -0,0 +1,130 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "net"
+
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/vc"
+ iversion "v.io/x/ref/runtime/internal/rpc/version"
+
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+)
+
+type adapter interface {
+ Accept() (stream.XConn, naming.Endpoint, error)
+ Close() error
+}
+
+type xListener struct {
+ m *xmanager
+ q *upcqueue.T
+ principal security.Principal
+ lBlessings security.Blessings
+ ln adapter
+}
+
+func (ln *xListener) acceptLoop(ctx *context.T) {
+ for {
+ conn, lep, err := ln.ln.Accept()
+ if err != nil {
+ vlog.Errorf("netAcceptLoop: %v", err)
+ return
+ }
+ // TODO(suharshs): Get the versions here correctly.
+ vc, err := vc.XNewAccepted(ctx, conn, ln.principal, lep, ln.lBlessings, *iversion.SupportedRange, ln.q)
+ if err != nil {
+ vlog.Errorf("netAcceptLoop: %v", err)
+ return
+ }
+ if err := ln.m.vcCache.Insert(vc); err != nil {
+ vlog.Errorf("m.vcCache.Insert: %v", err)
+ }
+ }
+}
+
+func (ln *xListener) Accept() (stream.XFlow, error) {
+ item, err := ln.q.Get(nil)
+ switch {
+ case err == upcqueue.ErrQueueIsClosed:
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
+ case err != nil:
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
+ default:
+ return item.(stream.XFlow), nil
+ }
+}
+
+func (ln *xListener) Close() error {
+ // TODO(suharshs): implement this for real.
+ ln.q.Close()
+ return nil
+}
+
+type netAdapter struct {
+ net.Listener
+ rid naming.RoutingID
+}
+
+func (n *netAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
+ if conn, err := n.Listener.Accept(); err != nil {
+ return nil, nil, err
+ } else {
+ return xNetConn{conn}, localEndpoint(conn, n.rid), nil
+ }
+}
+
+func newXNetListener(ctx *context.T, m *xmanager, netLn net.Listener, principal security.Principal, blessings security.Blessings) stream.XListener {
+ ln := &xListener{
+ m: m,
+ q: upcqueue.New(),
+ ln: &netAdapter{netLn, m.rid},
+ principal: principal,
+ lBlessings: blessings,
+ }
+ go ln.acceptLoop(ctx)
+ return ln
+}
+
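+// proxyAdapter presents the flows that a proxy-dialed VC delivers to its
+// queue as accepted connections, so a fresh VC can be layered on top of each
+// flow (the through-a-proxy case from the commit message).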
+type proxyAdapter struct {
+ q *upcqueue.T
+}
+
+func (p *proxyAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
+ item, err := p.q.Get(nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ f := item.(stream.XFlow)
+ return f, f.LocalEndpoint(), nil
+}
+
+func (p *proxyAdapter) Close() error {
+ p.q.Close()
+ return nil
+}
+
+func newXProxyListener(proxyEP naming.Endpoint, m *xmanager, ctx *context.T, principal security.Principal) (stream.XListener, error) {
+ pa := &proxyAdapter{upcqueue.New()}
+ ln := &xListener{
+ m: m,
+ q: upcqueue.New(),
+ ln: pa,
+ principal: principal,
+ lBlessings: principal.BlessingStore().Default(),
+ }
+ _, vc, err := m.internalDial(proxyEP, ln.principal)
+ if err != nil {
+ return nil, err
+ }
+ if err := vc.ListenTo(pa.q); err != nil {
+ return nil, err
+ }
+ go ln.acceptLoop(ctx)
+ return ln, nil
+}
diff --git a/runtime/internal/rpc/stream/manager/xmanager.go b/runtime/internal/rpc/stream/manager/xmanager.go
new file mode 100644
index 0000000..34b797e
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xmanager.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "net"
+ "time"
+
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/vc"
+ iversion "v.io/x/ref/runtime/internal/rpc/version"
+
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+)
+
+type xmanager struct {
+ ctx *context.T // TODO(mattr): This should be pushed into the managers interfaces.
+ rid naming.RoutingID
+ vcCache *vc.XVCCache
+}
+
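+// XInternalNew creates a new stream.XManager for the given routing ID. Like
+// InternalNew, it is intended for use within the runtime only.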
+func XInternalNew(ctx *context.T, rid naming.RoutingID) stream.XManager {
+ return &xmanager{
+ ctx: ctx,
+ rid: rid,
+ vcCache: vc.NewXVCCache(),
+ }
+}
+
+func xdial(network, address string, timeout time.Duration) (net.Conn, error) {
+ d, _, _, _ := rpc.RegisteredProtocol(network)
+ if d != nil {
+ conn, err := d(network, address, timeout)
+ if err != nil {
+ return nil, verror.New(stream.ErrDialFailed, nil, err)
+ }
+ return conn, nil
+ }
+ return nil, verror.New(stream.ErrDialFailed, nil, verror.New(errUnknownNetwork, nil, network))
+}
+
+func (m *xmanager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, naming.Endpoint, error) {
+ bNames, err := extractBlessingNames(principal, blessings)
+ if err != nil {
+ return nil, nil, err
+ }
+ ln, ep, err := m.internalListen(protocol, address, principal, blessings, opts...)
+ if err != nil {
+ return nil, nil, err
+ }
+ ep.Blessings = bNames
+ return ln, ep, nil
+}
+
+func (m *xmanager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, *inaming.Endpoint, error) {
+ if protocol == inaming.Network {
+ // Act as if listening on the address of a remote proxy.
+ ep, err := inaming.NewEndpoint(address)
+ if err != nil {
+ return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err))
+ }
+ return m.remoteListen(ep, principal)
+ }
+
+ netln, err := listen(protocol, address)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ln := newXNetListener(m.ctx, m, netln, principal, blessings)
+ ep := &inaming.Endpoint{
+ Protocol: protocol,
+ Address: netln.Addr().String(),
+ RID: m.rid,
+ }
+ return ln, ep, nil
+}
+
+func (m *xmanager) remoteListen(proxy naming.Endpoint, principal security.Principal) (stream.XListener, *inaming.Endpoint, error) {
+ ln, err := newXProxyListener(proxy, m, m.ctx, principal)
+ if err != nil {
+ return nil, nil, err
+ }
+ ep := &inaming.Endpoint{
+ Protocol: proxy.Addr().Network(),
+ Address: proxy.Addr().String(),
+ RID: m.rid,
+ }
+ return ln, ep, nil
+}
+
+func (m *xmanager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, error) {
+ f, _, err := m.internalDial(remote, principal, opts...)
+ return f, err
+}
+
+func (m *xmanager) internalDial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, stream.XVC, error) {
+ var timeout time.Duration
+ for _, o := range opts {
+ switch v := o.(type) {
+ case DialTimeout:
+ timeout = time.Duration(v)
+ }
+ }
+ var v stream.XVC
+ var err error
+ if v, err = m.vcCache.ReservedFind(remote); err != nil {
+ return nil, nil, err
+ }
+ defer m.vcCache.Unreserve(remote)
+ if v == nil {
+ conn, err := xdial(remote.Addr().Network(), remote.Addr().String(), timeout)
+ if err != nil {
+ return nil, nil, err
+ }
+ local := localEndpoint(conn, m.rid)
+ if v, err = vc.XNewDialed(m.ctx, xNetConn{conn}, principal, local, remote, *iversion.SupportedRange); err != nil {
+ return nil, nil, err
+ }
+ if err := m.vcCache.Insert(v); err != nil {
+ return nil, nil, err
+ }
+ }
+
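+ // If the remote routing ID differs from that of the VC we hold, the
+ // underlying connection terminates at a proxy rather than at the target.
+ // Open a flow to the proxy and layer a second VC over that flow to reach
+ // the real remote end.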
+ if remote.RoutingID() != v.(backingVC).RemoteEndpoint().RoutingID() {
+ flow, err := v.Connect()
+ if err != nil {
+ return nil, nil, err
+ }
+ if v, err = vc.XNewDialed(m.ctx, flow, principal, flow.LocalEndpoint(), remote, *iversion.SupportedRange); err != nil {
+ return nil, nil, err
+ }
+ }
+ f, err := v.Connect()
+ return f, v, err
+}
+
+func (m *xmanager) Shutdown() {
+ // TODO(suharshs): Implement this.
+}
+
+func (m *xmanager) RoutingID() naming.RoutingID {
+ return m.rid
+}
+
+type backingVC interface {
+ LocalEndpoint() naming.Endpoint
+ RemoteEndpoint() naming.Endpoint
+ LocalPrincipal() security.Principal
+ LocalBlessings() security.Blessings
+ RemoteBlessings() security.Blessings
+ LocalDischarges() map[string]security.Discharge
+ RemoteDischarges() map[string]security.Discharge
+ VCDataCache() stream.VCDataCache
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+ localAddr := conn.LocalAddr()
+ ep := &inaming.Endpoint{
+ Protocol: localAddr.Network(),
+ Address: localAddr.String(),
+ RID: rid,
+ }
+ return ep
+}
+
+type xNetConn struct {
+ net.Conn
+}
+
+func (xNetConn) DisableEncryption() {}
diff --git a/runtime/internal/rpc/stream/manager/xmanager_test.go b/runtime/internal/rpc/stream/manager/xmanager_test.go
new file mode 100644
index 0000000..6ff4dc4
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xmanager_test.go
@@ -0,0 +1,148 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/vtrace"
+ "v.io/x/ref/lib/flags"
+ "v.io/x/ref/runtime/internal/rpc/proxy"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/manager"
+ ivtrace "v.io/x/ref/runtime/internal/vtrace"
+ "v.io/x/ref/test/testutil"
+)
+
+func Init() (*context.T, context.CancelFunc) {
+ ctx, cancel := context.RootContext()
+ ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{
+ CollectRegexp: ".*",
+ DumpOnShutdown: true,
+ CacheSize: 100,
+ })
+ if err != nil {
+ panic(err)
+ }
+ return ctx, func() {
+ vtrace.FormatTraces(os.Stderr, vtrace.GetStore(ctx).TraceRecords(), nil)
+ cancel()
+ }
+}
+
+func TestDirectConnection(t *testing.T) {
+ ctx, cancel := Init()
+ defer cancel()
+
+ p := testutil.NewPrincipal("test")
+ rid := naming.FixedRoutingID(0x5555)
+ m := manager.XInternalNew(ctx, rid)
+ want := "read this please"
+
+ ln, ep, err := m.Listen("tcp", "127.0.0.1:0", p, p.BlessingStore().Default())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go func(ln stream.XListener) {
+ flow, err := ln.Accept()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ got, err := readLine(flow)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+ ln.Close()
+ }(ln)
+
+ flow, err := m.Dial(ep, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := writeLine(flow, want); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestProxyConnection(t *testing.T) {
+ ctx, cancel := Init()
+ defer cancel()
+
+ p := testutil.NewPrincipal("test")
+ srid := naming.FixedRoutingID(0x5555)
+ crid := naming.FixedRoutingID(0x4444)
+ prid := naming.FixedRoutingID(0x6666)
+ sm := manager.XInternalNew(ctx, srid)
+ cm := manager.XInternalNew(ctx, crid)
+ want := "read this please"
+
+ _, sd, pep, err := proxy.New("tcp", "127.0.0.1:0", prid, ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer sd()
+ fmt.Printf("proxy ep: %v\n", pep)
+
+ // listen on the proxy
+ ln, ep, err := sm.Listen("v23", pep.String(), p, p.BlessingStore().Default())
+ fmt.Printf("server ep: %v\n", ep)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go func(ln stream.XListener) {
+ flow, err := ln.Accept()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ got, err := readLine(flow)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+ ln.Close()
+ }(ln)
+
+ flow, err := cm.Dial(ep, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := writeLine(flow, want); err != nil {
+ t.Error(err)
+ }
+}
+
+func readLine(f stream.XFlow) (string, error) {
+ var result bytes.Buffer
+ var buf [5]byte
+ for {
+ n, err := f.Read(buf[:])
+ result.Write(buf[:n])
+ if err == io.EOF || (n > 0 && buf[n-1] == '\n') {
+ return strings.TrimRight(result.String(), "\n"), nil
+ }
+ if err != nil {
+ return "", fmt.Errorf("Read returned (%d, %v)", n, err)
+ }
+ }
+}
+
+func writeLine(f stream.XFlow, data string) error {
+ data = data + "\n"
+ if n, err := f.Write([]byte(data)); err != nil {
+ return fmt.Errorf("Write returned (%d, %v)", n, err)
+ }
+ return nil
+}
diff --git a/runtime/internal/rpc/stream/message/data.go b/runtime/internal/rpc/stream/message/data.go
index 784b603..bb6196f 100644
--- a/runtime/internal/rpc/stream/message/data.go
+++ b/runtime/internal/rpc/stream/message/data.go
@@ -25,6 +25,10 @@
// SetClose sets the Close flag of the message.
func (d *Data) SetClose() { d.flags |= 0x1 }
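+
+// Encrypted reports whether the Encrypted flag of the message is set.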
+func (d *Data) Encrypted() bool { return d.flags&0x2 == 2 }
+
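+// SetEncrypted sets the Encrypted flag of the message.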
+func (d *Data) SetEncrypted() { d.flags |= 0x2 }
+
// Release releases the Payload
func (d *Data) Release() {
if d.Payload != nil {
diff --git a/runtime/internal/rpc/stream/message/message.go b/runtime/internal/rpc/stream/message/message.go
index 5e4b790..1ef3d10 100644
--- a/runtime/internal/rpc/stream/message/message.go
+++ b/runtime/internal/rpc/stream/message/message.go
@@ -159,6 +159,15 @@
Payload: payload,
}
m.Payload.TruncateFront(uint(dataHeaderSizeBytes + macSize))
+ if m.Encrypted() {
+ if !c.Open(m.Payload.Contents) {
+ m.Payload.Release()
+ return nil, verror.New(errCorruptedMessage, nil)
+ }
+ // TODO(mattr): This is probably not exactly right, but the iobuf code
+ // is strange and complex.
+ m.Payload.Contents = m.Payload.Contents[:m.Payload.Size()-macSize]
+ }
return m, nil
default:
payload.Release()
@@ -190,6 +199,12 @@
write4ByteUint(header[8:12], uint32(m.Flow))
header[12] = m.flags
EncryptMessage(msg.Contents, c)
+ if m.Encrypted() {
+ // TODO(mattr): Ensure that when we create an encrypted data message
+ // we leave macSize bytes free at the end.
+ msg.Extend(macSize)
+ c.Seal(msg.Contents[HeaderSizeBytes+macSize:])
+ }
_, err := w.Write(msg.Contents)
msg.Release()
return err
diff --git a/runtime/internal/rpc/stream/message/message_test.go b/runtime/internal/rpc/stream/message/message_test.go
index 72c108e..c5b4155 100644
--- a/runtime/internal/rpc/stream/message/message_test.go
+++ b/runtime/internal/rpc/stream/message/message_test.go
@@ -233,7 +233,7 @@
func TestDataNoPayload(t *testing.T) {
tests := []Data{
{VCI: 10, Flow: 3},
- {VCI: 11, Flow: 4, flags: 10},
+ {VCI: 11, Flow: 4},
}
var c testControlCipher
pool := iobuf.NewPool(0)
diff --git a/runtime/internal/rpc/stream/vc/auth.go b/runtime/internal/rpc/stream/vc/auth.go
index b85a4f1..7bca61d 100644
--- a/runtime/internal/rpc/stream/vc/auth.go
+++ b/runtime/internal/rpc/stream/vc/auth.go
@@ -8,10 +8,12 @@
"bytes"
"io"
+ "v.io/v23/context"
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vom"
+ "v.io/v23/vtrace"
"v.io/x/ref/runtime/internal/lib/iobuf"
"v.io/x/ref/runtime/internal/rpc/stream"
@@ -55,7 +57,7 @@
}
// Note that since the client uses a self-signed blessing to authenticate
// during VC setup, it does not share any discharges.
- client, _, err := readBlessings(conn, authClientContextTag, crypter, v)
+ client, _, err := readBlessings(nil, conn, authClientContextTag, crypter, v)
if err != nil {
return security.Blessings{}, nil, err
}
@@ -69,7 +71,7 @@
// The client will only share its blessings if the server (who shares its
// blessings first) is authorized as per the authorizer for this RPC.
func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, v version.RPCVersion, params security.CallParams, auth *ServerAuthorizer) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
- server, serverDischarges, err := readBlessings(conn, authServerContextTag, crypter, v)
+ server, serverDischarges, err := readBlessings(nil, conn, authServerContextTag, crypter, v)
if err != nil {
return security.Blessings{}, security.Blessings{}, nil, err
}
@@ -113,7 +115,9 @@
if err := enc.Encode(discharges); err != nil {
return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
- msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
+ bts := buf.Bytes()
+
+ msg, err := crypter.Encrypt(iobuf.NewSlice(bts))
if err != nil {
return err
}
@@ -125,36 +129,56 @@
return nil
}
-func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+func readBlessings(ctx *context.T, r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+ // Tracing is optional: some callers have no context and pass nil. newSpan
+ // returns a nil span in that case and finish tolerates nil, so every
+ // vtrace call below is safe whether or not a context was supplied.
+ newSpan := func(name string) vtrace.Span {
+ if ctx == nil {
+ return nil
+ }
+ var span vtrace.Span
+ ctx, span = vtrace.WithNewSpan(ctx, name)
+ return span
+ }
+ finish := func(span vtrace.Span) {
+ if span != nil {
+ span.Finish()
+ }
+ }
+ defer finish(newSpan("Read blessings"))
+
 var msg []byte
 var noBlessings security.Blessings
+ span := newSpan("First vom Decoding")
 dec := vom.NewDecoder(r)
 if err := dec.Decode(&msg); err != nil {
 return noBlessings, nil, verror.New(stream.ErrNetwork, nil, verror.New(errHandshakeMessage, nil, err))
 }
+ finish(span)
+
+ span = newSpan("Checking channel binding")
 buf, err := crypter.Decrypt(iobuf.NewSlice(msg))
 if err != nil {
 return noBlessings, nil, err
 }
 defer buf.Release()
+ finish(span)
+
 dec = vom.NewDecoder(bytes.NewReader(buf.Contents))
 var (
 blessings security.Blessings
 sig security.Signature
 )
+ span = newSpan("Second vom Decoding")
 if err = dec.Decode(&sig); err != nil {
 return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 }
+ finish(span)
+ span = newSpan("Third vom Decoding")
 if err = dec.Decode(&blessings); err != nil {
 return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 }
+ finish(span)
+ span = newSpan("Fourth vom Decoding")
 var discharges []security.Discharge
 if err := dec.Decode(&discharges); err != nil {
 return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 }
+ finish(span)
+ span = newSpan("Signature verify")
 if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
 return noBlessings, nil, verror.New(stream.ErrSecurity, nil, verror.New(errInvalidSignatureInMessage, nil))
 }
+ finish(span)
 return blessings, mkDischargeMap(discharges), nil
 }
diff --git a/runtime/internal/rpc/stream/vc/knobs.go b/runtime/internal/rpc/stream/vc/knobs.go
index 7271f7a..70539fb 100644
--- a/runtime/internal/rpc/stream/vc/knobs.go
+++ b/runtime/internal/rpc/stream/vc/knobs.go
@@ -32,4 +32,8 @@
// Special flow over which discharges for third-party caveats
// on the server's blessings are sent.
DischargeFlowID = 4
+
+ // These virtual flows are used as queue IDs in XVC.
+ ExpressFlowID = 5
+ StopFlowID = 6
)
diff --git a/runtime/internal/rpc/stream/vc/writer.go b/runtime/internal/rpc/stream/vc/writer.go
index 684a357..ff4fba1 100644
--- a/runtime/internal/rpc/stream/vc/writer.go
+++ b/runtime/internal/rpc/stream/vc/writer.go
@@ -111,6 +111,12 @@
w.muSharedCountersBorrowed.Unlock()
}
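+
+// DisableEncryption arranges for all bytes written after this call to be
+// sent in the clear: it enqueues the disableEncryption sentinel, which tells
+// the VC write loop to stop marking subsequent data messages as encrypted.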
+func (w *writer) DisableEncryption() {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ w.Sink.Put(disableEncryption, w.deadline)
+}
+
// Write implements the Write call for a Flow.
//
// Flow control is achieved using receive buffers (aka counters), wherein the
@@ -151,7 +157,9 @@
w.muSharedCountersBorrowed.Unlock()
w.Sink.Release(n)
}
- slice := w.Alloc.Copy(b[:n])
+ // TODO(mattr): We're leaving space here for the MAC. This is a gross
+ // way to do it. The whole allocator thing is done in an unpleasant way here.
+ slice := w.Alloc.CopyExtra(b[:n], 16)
if err := w.Sink.Put(slice, w.deadline); err != nil {
slice.Release()
atomic.AddUint32(&w.totalBytes, uint32(written))
diff --git a/runtime/internal/rpc/stream/vc/xvc.go b/runtime/internal/rpc/stream/vc/xvc.go
new file mode 100644
index 0000000..14e3a93
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc.go
@@ -0,0 +1,824 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc/version"
+ "v.io/v23/security"
+ "v.io/v23/vom"
+ "v.io/v23/vtrace"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/runtime/internal/lib/bqueue"
+ "v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
+ "v.io/x/ref/runtime/internal/lib/iobuf"
+ vsync "v.io/x/ref/runtime/internal/lib/sync"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/crypto"
+ "v.io/x/ref/runtime/internal/rpc/stream/id"
+ "v.io/x/ref/runtime/internal/rpc/stream/message"
+ iversion "v.io/x/ref/runtime/internal/rpc/version"
+)
+
+var pool = iobuf.NewPool(0)
+var nullCipher = crypto.NullControlCipher{}
+var stopToken = iobuf.NewSlice(make([]byte, 1))
+
+// disableEncryption is kind of a hack: if you send this sentinel down a
+// flow, all bytes sent after it will be sent unencrypted.
+var disableEncryption = iobuf.NewSlice(make([]byte, 1))
+
+const (
+ // Priorities of the buffered queues used for flow control of writes.
+ authPriority bqueue.Priority = iota
+ expressPriority
+ systemXFlowPriority
+ normalXFlowPriority
+ stopPriority
+)
+
+type flowState struct {
+ flow *flow
+ encryptionDisabled bool
+}
+
+type xvc struct {
+ mu sync.Mutex
+ flowMap map[id.Flow]*flowState
+ lDischarges, rDischarges map[string]security.Discharge
+ nextConnectFID id.Flow
+
+ incomingMu sync.Mutex
+ incoming *upcqueue.T
+
+ cipher crypto.ControlCipher
+ version version.RPCVersion
+ principal security.Principal
+ lBlessings, rBlessings security.Blessings
+ local, remote naming.Endpoint
+ conn stream.XConn
+ outgoing bqueue.T
+ expressQ, stopQ bqueue.Writer
+ sharedCounters *vsync.Semaphore
+ reader *iobuf.Reader
+ closed chan struct{}
+ dataCache *dataCache
+ dClient DischargeClient
+ dBuffer time.Duration
+}
+
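+// commonInit builds the xvc state shared by the dialing and accepting sides
+// and pre-creates the reserved auth flow.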
+func commonInit(
+ ctx *context.T,
+ conn stream.XConn,
+ principal security.Principal,
+ versions iversion.Range) (*xvc, stream.XFlow, error) {
+ vc := &xvc{
+ principal: principal,
+ cipher: nullCipher,
+ conn: conn,
+ outgoing: drrqueue.New(MaxPayloadSizeBytes),
+ flowMap: make(map[id.Flow]*flowState),
+ sharedCounters: vsync.NewSemaphore(),
+ closed: make(chan struct{}),
+ dataCache: newDataCache(),
+ }
+ // Both sides create the reserved auth flow ahead of time.
+ authFlow, err := vc.newFlow(AuthFlowID, authPriority)
+ if err != nil {
+ return nil, nil, err
+ }
+ authFlow.Release(DefaultBytesBufferedPerFlow)
+ return vc, authFlow, nil
+}
+
+func XNewDialed(
+ ctx *context.T,
+ conn stream.XConn,
+ principal security.Principal,
+ local, remote naming.Endpoint,
+ versions iversion.Range,
+ opts ...stream.VCOpt) (stream.XVC, error) {
+ ctx, span := vtrace.WithNewSpan(ctx, "Dialing VC")
+ defer span.Finish()
+
+ vc, authFlow, err := commonInit(ctx, conn, principal, versions)
+ if err != nil {
+ return nil, err
+ }
+
+ vc.local = local
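+ // The dialer and the acceptor hand out flow IDs of opposite parity (see
+ // XNewAccepted; Connect advances by 2), so both sides can open new flows
+ // without coordinating.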
+ vc.nextConnectFID = NumReservedFlows
+
+ errch := make(chan error)
+ go func() {
+ errch <- vc.setup(ctx, local, remote, versions, principal != nil)
+ }()
+ if err := <-errch; err != nil {
+ vc.Close(err)
+ return nil, err
+ }
+ if principal == nil {
+ // Exit early for unencrypted VCs.
+ return vc, nil
+ }
+ if err := vc.authenticateClient(ctx, authFlow, opts...); err != nil {
+ vc.Close(err)
+ return nil, err
+ }
+ return vc, nil
+}
+
+func XNewAccepted(
+ ctx *context.T,
+ conn stream.XConn,
+ principal security.Principal,
+ local naming.Endpoint,
+ lBlessings security.Blessings,
+ versions iversion.Range,
+ incoming *upcqueue.T,
+ opts ...stream.ListenerOpt) (stream.XVC, error) {
+ ctx, span := vtrace.WithNewSpan(ctx, "Accepting VC")
+ defer span.Finish()
+
+ vc, authFlow, err := commonInit(ctx, conn, principal, versions)
+ if err != nil {
+ return nil, err
+ }
+ vc.incomingMu.Lock()
+ // We unlock this only AFTER authentication is finished to ensure
+ // that no flows are announced until we're fully initialized.
+ // TODO(mattr): Find a better way.
+ defer vc.incomingMu.Unlock()
+ vc.incoming = incoming
+
+ vc.local = local
+ vc.lBlessings = lBlessings
+ vc.nextConnectFID = NumReservedFlows + 1
+ vc.dClient, vc.dBuffer = dischargeOptions(opts)
+
+ // TODO(mattr): Sending local twice, but... what should we send?
+ if err := vc.setup(ctx, local, local, versions, principal != nil); err != nil {
+ vc.Close(err)
+ return nil, err
+ }
+ if principal == nil {
+ // Exit early for unencrypted VCs.
+ return vc, nil
+ }
+ if err := vc.authenticateServer(ctx, authFlow, opts...); err != nil {
+ vc.Close(err)
+ return nil, err
+ }
+ return vc, nil
+}
+
+func (vc *xvc) setup(
+ ctx *context.T,
+ local, remote naming.Endpoint,
+ versions iversion.Range,
+ encrypted bool) error {
+ ctx, span := vtrace.WithNewSpan(ctx, "Setup")
+ defer span.Finish()
+
+ _, keySpan := vtrace.WithNewSpan(ctx, "Generate Key")
+ pk, sk, err := crypto.GenerateBoxKey()
+ if err != nil {
+ return err
+ }
+ keySpan.Finish()
+
+ setup := &message.SetupVC{
+ LocalEndpoint: local,
+ RemoteEndpoint: remote,
+ Counters: message.NewCounters(),
+ Setup: message.Setup{
+ Versions: versions,
+ Options: []message.SetupOption{&message.NaclBox{PublicKey: *pk}},
+ },
+ }
+ setup.Counters.Add(0, SharedFlowID, DefaultBytesBufferedPerFlow)
+
+ // Send and receive a SetupVC message.
+ // Note we use the nullCipher here. The XConn is handling encryption.
+ errch := make(chan error, 1)
+ go func() {
+ _, span := vtrace.WithNewSpan(ctx, "Writing setup")
+ errch <- message.WriteTo(vc.conn, setup, nullCipher)
+ span.Finish()
+ }()
+ _, readSpan := vtrace.WithNewSpan(ctx, "Reading setup")
+ vc.reader = iobuf.NewReader(pool, vc.conn)
+ msg, err := message.ReadFrom(vc.reader, nullCipher)
+ if err != nil {
+ return err
+ }
+ remoteSetup, ok := msg.(*message.SetupVC)
+ if !ok {
+ return fmt.Errorf("wrong message.")
+ }
+ readSpan.Finish()
+
+ if err := <-errch; err != nil {
+ return err
+ }
+ vc.remote = remoteSetup.LocalEndpoint
+
+ // Release shared counters
+ vc.distributeCounters(remoteSetup.Counters)
+
+ // Choose a version.
+ vrange, err := versions.Intersect(&remoteSetup.Setup.Versions)
+ if err != nil {
+ return err
+ }
+ vc.version = vrange.Max
+
+ // Set up the cipher.
+ if encrypted {
+ _, naclSpan := vtrace.WithNewSpan(ctx, "Creating Cipher")
+ remoteKey := remoteSetup.Setup.NaclBox()
+ if remoteKey == nil {
+ return fmt.Errorf("missing key.")
+ }
+ // Disable the underlying encryption. We'll do our own from now on.
+ vc.conn.DisableEncryption()
+
+ vc.cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteKey.PublicKey)
+ naclSpan.Finish()
+ }
+
+ vc.expressQ, err = vc.outgoing.NewWriter(ExpressFlowID, expressPriority, DefaultBytesBufferedPerFlow)
+ if err != nil {
+ return err
+ }
+ vc.expressQ.Release(-1) // Disable flow control
+ vc.stopQ, err = vc.outgoing.NewWriter(StopFlowID, stopPriority, DefaultBytesBufferedPerFlow)
+ if err != nil {
+ return err
+ }
+ vc.stopQ.Release(-1) // Disable flow control
+
+ // Start read and write loops
+ go vc.readLoop()
+ go vc.writeLoop()
+
+ return nil
+}
+
+func (vc *xvc) connectSystemFlows() error {
+ conn, err := vc.connect(TypeFlowID, systemXFlowPriority)
+ if err != nil {
+ return err
+ }
+ vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn))
+ vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn))
+
+ if len(vc.rBlessings.ThirdPartyCaveats()) > 0 {
+ conn, err = vc.connect(DischargeFlowID, systemXFlowPriority)
+ if err != nil {
+ return err
+ }
+ go vc.recvDischargesLoop(conn)
+ }
+ return nil
+}
+
+func (vc *xvc) sendDischargesLoop(conn io.WriteCloser, tpCavs []security.Caveat) {
+ defer conn.Close()
+ if vc.dClient == nil {
+ return
+ }
+ enc := vom.NewEncoder(conn)
+ discharges := vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
+ for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) {
+ select {
+ case <-time.After(fetchDuration(expiry, vc.dBuffer)):
+ discharges = vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
+ if err := enc.Encode(discharges); err != nil {
+ vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err)
+ return
+ }
+ if len(discharges) == 0 {
+ continue
+ }
+ vc.mu.Lock()
+ if vc.lDischarges == nil {
+ vc.lDischarges = make(map[string]security.Discharge)
+ }
+ for _, d := range discharges {
+ vc.lDischarges[d.ID()] = d
+ }
+ vc.mu.Unlock()
+ case <-vc.closed:
+ vlog.VI(3).Infof("closing sendDischargesLoop on VC %v", vc)
+ return
+ }
+ }
+}
+
+func (vc *xvc) recvDischargesLoop(conn io.ReadCloser) {
+ defer conn.Close()
+ dec := vom.NewDecoder(conn)
+ for {
+ var discharges []security.Discharge
+ if err := dec.Decode(&discharges); err != nil {
+ vlog.VI(3).Infof("decoding discharges on %v failed: %v", vc, err)
+ return
+ }
+ if len(discharges) == 0 {
+ continue
+ }
+ vc.mu.Lock()
+ if vc.rDischarges == nil {
+ vc.rDischarges = make(map[string]security.Discharge)
+ }
+ for _, d := range discharges {
+ vc.rDischarges[d.ID()] = d
+ }
+ vc.mu.Unlock()
+ }
+}
+
+func (vc *xvc) handleMessage(msg message.T) error {
+ switch m := msg.(type) {
+
+ case *message.Data:
+ payload := m.Payload
+ defer func() {
+ if payload != nil {
+ payload.Release()
+ }
+ }()
+ vc.mu.Lock()
+ fs := vc.flowMap[m.Flow]
+ if m.Close() && fs != nil {
+ defer fs.flow.Shutdown()
+ delete(vc.flowMap, m.Flow)
+ }
+ vc.mu.Unlock()
+ if payload.Size() == 0 {
+ return nil
+ }
+ if fs == nil {
+ vlog.Infof("Ignoring data packet for unkown flow: %#v", msg)
+ return nil
+ }
+ if err := fs.flow.reader.Put(payload); err != nil {
+ vlog.Errorf("Could not queue data packet: %#v", msg)
+ return err
+ }
+ payload = nil
+
+ case *message.AddReceiveBuffers:
+ vc.distributeCounters(m.Counters)
+
+ case *message.OpenFlow:
+ if err := vc.accept(m.Flow, int(m.InitialCounters)); err != nil {
+ // The flow could not be accepted; tell the remote end it is closed
+ // and skip granting it counters.
+ cm := &message.Data{Flow: m.Flow}
+ cm.SetClose()
+ return vc.sendOnExpressQ(cm)
+ }
+ // TODO(mattr): The first write uses shared counters, but the first write is
+ // always just 1 byte :(
+ counters := message.NewCounters()
+ counters.Add(0, m.Flow, uint32(DefaultBytesBufferedPerFlow))
+ return vc.sendOnExpressQ(&message.AddReceiveBuffers{
+ Counters: counters,
+ })
+
+ case *message.CloseVC:
+ vc.stopQ.Put(stopToken, nil)
+ if m.Error != "" {
+ vlog.Errorf("VC closed by remote end due to error: %s", m.Error)
+ return fmt.Errorf("%s", m.Error)
+ }
+ return io.EOF
+
+ default:
+ vlog.Infof("Ignoring irrelevant or unrecognized message %T: ", m)
+ }
+ return nil
+}
+
+func (vc *xvc) distributeCounters(counters message.Counters) {
+ for cid, bytes := range counters {
+ if cid.Flow() == SharedFlowID {
+ vc.sharedCounters.IncN(uint(bytes))
+ continue
+ }
+ vc.mu.Lock()
+ f := vc.flowMap[cid.Flow()]
+ vc.mu.Unlock()
+ if f == nil {
+ vlog.Infof("ignoring counters for unknown flow: %d", cid.Flow())
+ continue
+ }
+ f.flow.Release(int(bytes))
+ }
+}
+
+func (vc *xvc) readLoop() {
+ for {
+ msg, err := message.ReadFrom(vc.reader, vc.cipher)
+ if err != nil {
+ vlog.Errorf("Exiting VC read loop: %v", err)
+ return
+ }
+ if err := vc.handleMessage(msg); err != nil {
+ if err != io.EOF {
+ vlog.Errorf("Could not handle message: %v", err)
+ }
+ return
+ }
+ }
+}
+
+func releaseBufs(bufs []*iobuf.Slice) {
+ for _, b := range bufs {
+ if b != disableEncryption && b != stopToken {
+ b.Release()
+ }
+ }
+}
+
+func (vc *xvc) writeData(writer bqueue.Writer, bufs []*iobuf.Slice) error {
+ defer releaseBufs(bufs)
+ fid := id.Flow(writer.ID())
+ var encryptionDisabled bool
+ vc.mu.Lock()
+ fs := vc.flowMap[fid]
+ if fs != nil {
+ encryptionDisabled = fs.encryptionDisabled
+ }
+ vc.mu.Unlock()
+
+ // TODO(mattr): coalesce, but only after finding the encryption boundary.
+ last := len(bufs) - 1
+ // TODO(mattr): This is broken if disableEncryption is last.
+ drained := writer.IsDrained()
+
+ if drained {
+ defer func() {
+ vc.mu.Lock()
+ delete(vc.flowMap, fid)
+ vc.mu.Unlock()
+ }()
+ }
+
+ for i, b := range bufs {
+ if b == disableEncryption {
+ encryptionDisabled = true
+ vc.mu.Lock()
+ if fs != nil {
+ fs.encryptionDisabled = true
+ }
+ vc.mu.Unlock()
+ continue
+ }
+
+ d := &message.Data{Flow: fid, Payload: b}
+ if !encryptionDisabled {
+ d.SetEncrypted()
+ }
+ if drained && i == last {
+ d.SetClose()
+ }
+ if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
+ return err
+ }
+ }
+ if len(bufs) == 0 && drained {
+ d := &message.Data{Flow: fid}
+ d.SetClose()
+ if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (vc *xvc) writeSerializedMessage(msg *iobuf.Slice) error {
+ // TODO(mattr): All of this would be easier if the queues contained messages
+ // instead of byte arrays.
+ if err := message.EncryptMessage(msg.Contents, vc.cipher); err != nil {
+ return err
+ }
+ if _, err := vc.conn.Write(msg.Contents); err != nil {
+ return err
+ }
+ msg.Release()
+ return nil
+}
+
+func (vc *xvc) writeLoop() {
+ defer vc.outgoing.Close()
+ defer close(vc.closed)
+ for {
+ writer, bufs, err := vc.outgoing.Get(nil)
+ if err != nil {
+ return
+ }
+ switch writer {
+ case vc.expressQ:
+ for _, b := range bufs {
+ if err = vc.writeSerializedMessage(b); err != nil {
+ vlog.Errorf("Could not write control message: %v", err)
+ return
+ }
+ }
+ case vc.stopQ:
+ if len(bufs) > 0 {
+ // If we get stopToken here, then the vc was closed by the remote end.
+ if bufs[0] != stopToken {
+ if err := vc.writeSerializedMessage(bufs[0]); err != nil {
+ vlog.Errorf("Could not write CloseVC: %v", err)
+ }
+ }
+ }
+ return
+ default:
+ if err = vc.writeData(writer, bufs); err != nil {
+ vlog.Errorf("Error writing: %v", err)
+ return
+ }
+ }
+ }
+}
+
+func (vc *xvc) authenticateServer(ctx *context.T, authFlow stream.XFlow, opts ...stream.ListenerOpt) error {
+ ctx, span := vtrace.WithNewSpan(ctx, "Server Authentication")
+ defer span.Finish()
+ defer authFlow.Close()
+
+ if vc.lBlessings.IsZero() {
+ return fmt.Errorf("no server blessings")
+ }
+
+ // TODO(mattr): Check that authFlow has the right fid.
+
+ // This is a wart. We always pass a null crypter (with channel binding) even
+ // though this is all encrypted by the flow write. This is just because we
+ // want to re-use the existing Authenticate methods.
+ crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())
+
+ var serverDischarges []security.Discharge
+ if tpcavs := vc.lBlessings.ThirdPartyCaveats(); len(tpcavs) > 0 && vc.dClient != nil {
+ _, span := vtrace.WithNewSpan(ctx, "Prepare Discharages")
+ serverDischarges = vc.dClient.PrepareDischarges(ctx, tpcavs, security.DischargeImpetus{})
+ span.Finish()
+ }
+ _, wspan := vtrace.WithNewSpan(ctx, "writeBlessings")
+ if err := writeBlessings(authFlow, authServerContextTag, crypter, vc.principal, vc.lBlessings, serverDischarges, vc.version); err != nil {
+ return err
+ }
+ wspan.Finish()
+
+ // Note that since the client uses a self-signed blessing to authenticate
+ // during VC setup, it does not share any discharges.
+ rBlessings, _, err := readBlessings(ctx, authFlow, authClientContextTag, crypter, vc.version)
+ if err != nil {
+ return err
+ }
+ vc.rBlessings = rBlessings
+ vc.lDischarges = mkDischargeMap(serverDischarges)
+ return nil
+}
+
+func (vc *xvc) authenticateClient(ctx *context.T, authFlow stream.XFlow, opts ...stream.VCOpt) error {
+ ctx, span := vtrace.WithNewSpan(ctx, "Client Authentication")
+ defer span.Finish()
+ defer authFlow.Close()
+
+ // This is a wart. We always pass a null crypter (with channel binding) even
+ // though this is all encrypted by the flow write. This is just because we
+ // want to re-use the existing Authenticate methods.
+ crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())
+
+ server, serverDischarges, err := readBlessings(ctx, authFlow, authServerContextTag, crypter, vc.version)
+ if err != nil {
+ return err
+ }
+
+ // Authorize the server based on the provided authorizer.
+ auth := serverAuthorizer(opts)
+ if auth != nil {
+ _, authSpan := vtrace.WithNewSpan(ctx, "Authorize Server")
+ params := security.CallParams{
+ LocalPrincipal: vc.principal,
+ LocalEndpoint: vc.local,
+ RemoteEndpoint: vc.remote,
+ RemoteBlessings: server,
+ RemoteDischarges: serverDischarges,
+ }
+ if err := auth.Authorize(params); err != nil {
+ return fmt.Errorf("server not trusted: %v", err)
+ }
+ authSpan.Finish()
+ }
+
+ // The client shares its blessings at RPC time (as the blessings may vary
+ // across RPCs). During VC handshake, the client simply sends a self-signed
+ // blessing in order to reveal its public key to the server.
+ // TODO(suharshs): Actually we should just send the public key.
+ _, blessSpan := vtrace.WithNewSpan(ctx, "Bless self")
+ client, err := vc.principal.BlessSelf("vcauth")
+ if err != nil {
+ return fmt.Errorf("Could not create blessing")
+ }
+ blessSpan.Finish()
+ _, writeSpan := vtrace.WithNewSpan(ctx, "Write blessings")
+ if err := writeBlessings(authFlow, authClientContextTag, crypter, vc.principal, client, nil, vc.version); err != nil {
+ return err
+ }
+ writeSpan.Finish()
+
+ vc.lBlessings = client
+ vc.rBlessings = server
+ vc.rDischarges = serverDischarges
+
+ return vc.connectSystemFlows()
+}
+
+func (vc *xvc) newFlow(fid id.Flow, priority bqueue.Priority) (*flow, error) {
+ qw, err := vc.outgoing.NewWriter(bqueue.ID(fid), priority, MaxPayloadSizeBytes)
+ if err != nil {
+ return nil, err
+ }
+ f := &flow{
+ backingVC: vc,
+ reader: newReader(&xReadHandler{fid, vc}),
+ writer: newWriter(MaxPayloadSizeBytes, qw, iobuf.NewAllocator(pool, 0), vc.sharedCounters),
+ }
+
+ vc.mu.Lock()
+ defer vc.mu.Unlock()
+ if vc.flowMap == nil {
+ qw.Close()
+ return nil, fmt.Errorf("Could not create flow on closed VC.")
+ }
+ // TODO(mattr): it's an error if fid already exists.
+ vc.flowMap[fid] = &flowState{f, true}
+
+ return f, nil
+}
+
+func (vc *xvc) sendOnExpressQ(msg message.T) error {
+ return vc.sendOn(vc.expressQ, msg)
+}
+
+func (vc *xvc) sendOn(w bqueue.Writer, msg message.T) error {
+ var buf bytes.Buffer
+ if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(vc.cipher)); err != nil {
+ return err
+ }
+ err := w.Put(iobuf.NewSlice(buf.Bytes()), nil)
+ return err
+}
+
+func (vc *xvc) connect(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.XFlow, error) {
+ f, err := vc.newFlow(fid, priority)
+ if err != nil {
+ return nil, err
+ }
+
+ vc.sendOnExpressQ(&message.OpenFlow{
+ Flow: fid,
+ InitialCounters: uint32(DefaultBytesBufferedPerFlow),
+ })
+ return f, nil
+}
+
+func (vc *xvc) accept(fid id.Flow, initialCounters int) error {
+ priority := normalXFlowPriority
+ if fid < NumReservedFlows {
+ priority = systemXFlowPriority
+ }
+ f, err := vc.newFlow(fid, priority)
+ if err != nil {
+ return err
+ }
+ f.Release(initialCounters)
+ switch fid {
+ case TypeFlowID:
+ vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(f))
+ vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(f))
+ return nil
+ case DischargeFlowID:
+ tpCaveats := vc.lBlessings.ThirdPartyCaveats()
+ if len(tpCaveats) > 0 {
+ go vc.sendDischargesLoop(f, tpCaveats)
+ }
+ return nil
+ default:
+ vc.incomingMu.Lock()
+ q := vc.incoming
+ vc.incomingMu.Unlock()
+
+ if q == nil {
+ err = fmt.Errorf("VC not listening")
+ } else {
+ err = q.Put(f)
+ }
+ if err != nil {
+ vc.mu.Lock()
+ f.Shutdown()
+ delete(vc.flowMap, fid)
+ vc.mu.Unlock()
+ }
+ return err
+ }
+}
+
+func (vc *xvc) Connect(opts ...stream.FlowOpt) (stream.XFlow, error) {
+ vc.mu.Lock()
+ fid := vc.nextConnectFID
+ vc.nextConnectFID += 2
+ vc.mu.Unlock()
+ return vc.connect(fid, normalXFlowPriority, opts...)
+}
+
+func (vc *xvc) ListenTo(q *upcqueue.T) error {
+ vc.incomingMu.Lock()
+ defer vc.incomingMu.Unlock()
+ if vc.incoming != nil {
+ return fmt.Errorf("this VC already has a listener")
+ }
+ vc.incoming = q
+ return nil
+}
+
+func (vc *xvc) Closed() chan struct{} {
+ return vc.closed
+}
+
+func (vc *xvc) Close(reason error) error {
+ vc.mu.Lock()
+ flows := vc.flowMap
+ vc.flowMap = nil
+ vc.mu.Unlock()
+
+ vc.incomingMu.Lock()
+ q := vc.incoming
+ vc.incoming = nil
+ vc.incomingMu.Unlock()
+
+ if q != nil {
+ q.Close()
+ }
+ vc.sharedCounters.Close()
+
+ for _, fs := range flows {
+ fs.flow.Close()
+ }
+
+ msg := ""
+ if reason != nil {
+ msg = reason.Error()
+ }
+ err := vc.sendOn(vc.stopQ, &message.CloseVC{
+ Error: msg,
+ })
+ return err
+}
+
+// Implement the backingVC interface for flows.
+// Note I don't bother with locking because these are all set by the time the
+// constructor is done.
+func (vc *xvc) LocalEndpoint() naming.Endpoint { return vc.local }
+func (vc *xvc) RemoteEndpoint() naming.Endpoint { return vc.remote }
+func (vc *xvc) LocalPrincipal() security.Principal { return vc.principal }
+func (vc *xvc) LocalBlessings() security.Blessings { return vc.lBlessings }
+func (vc *xvc) RemoteBlessings() security.Blessings { return vc.rBlessings }
+func (vc *xvc) LocalDischarges() map[string]security.Discharge { return vc.lDischarges }
+func (vc *xvc) RemoteDischarges() map[string]security.Discharge { return vc.rDischarges }
+func (vc *xvc) VCDataCache() stream.VCDataCache { return vc.dataCache }
+
+type xReadHandler struct {
+ flow id.Flow
+ vc *xvc
+}
+
+func (h *xReadHandler) HandleRead(bytes uint) {
+ counters := message.NewCounters()
+ counters.Add(0, h.flow, uint32(bytes))
+ err := h.vc.sendOnExpressQ(&message.AddReceiveBuffers{
+ Counters: counters,
+ })
+ if err != nil {
+ vlog.Errorf("Unable to send counters: %v", err)
+ }
+}
diff --git a/runtime/internal/rpc/stream/vc/xvc_cache.go b/runtime/internal/rpc/stream/vc/xvc_cache.go
new file mode 100644
index 0000000..73818d8
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc_cache.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+ "fmt"
+ "sync"
+
+ "v.io/x/ref/runtime/internal/rpc/stream"
+
+ "v.io/v23/naming"
+ "v.io/v23/verror"
+)
+
+var errXVCCacheClosed = reg(".errXVCCacheClosed", "vc cache has been closed")
+
+// XVCCache is a set of XVCs keyed by the endpoint and the routing ID of the
+// remote end. Multiple goroutines can invoke methods on the XVCCache
+// simultaneously.
+type XVCCache struct {
+ mu sync.Mutex
+ epCache map[string]*xvc // GUARDED_BY(mu)
+ ridCache map[naming.RoutingID]*xvc // GUARDED_BY(mu)
+ started map[string]bool // GUARDED_BY(mu)
+ cond *sync.Cond
+}
+
+// NewXVCCache returns a new cache for XVCs.
+func NewXVCCache() *XVCCache {
+ c := &XVCCache{
+ epCache: make(map[string]*xvc),
+ ridCache: make(map[naming.RoutingID]*xvc),
+ started: make(map[string]bool),
+ }
+ c.cond = sync.NewCond(&c.mu)
+ return c
+}
+
+// ReservedFind returns an XVC whose remote end is identified by the provided
+// ep, or nil if there is no such XVC.
+//
+// If the cache is closed, ReservedFind returns an error. Otherwise the caller
+// is required to call Unreserve with the same ep once done, to avoid
+// deadlock; until then, all new ReservedFind calls for this ep block until
+// the corresponding Unreserve call is made.
+func (c *XVCCache) ReservedFind(ep naming.Endpoint) (stream.XVC, error) {
+ k := ep.String()
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ // A hit by routing ID needs no reservation (the caller will not dial a
+ // new connection). The lookup must happen under the lock to avoid racing
+ // with Insert and Close.
+ if ret, ok := c.ridCache[ep.RoutingID()]; ok {
+ return ret, nil
+ }
+ for c.started[k] {
+ c.cond.Wait()
+ }
+ if c.epCache == nil {
+ return nil, verror.New(errXVCCacheClosed, nil)
+ }
+ c.started[k] = true
+ ret := c.epCache[k]
+ if ret == nil {
+ return nil, nil
+ }
+ return ret, nil
+}
+
+// Unreserve marks the status of the ep as no longer started, and
+// broadcasts waiting threads.
+func (c *XVCCache) Unreserve(ep naming.Endpoint) {
+ c.mu.Lock()
+ delete(c.started, ep.String())
+ c.cond.Broadcast()
+ c.mu.Unlock()
+}
+
+// Insert adds vc to the cache and returns an error iff the cache has been closed.
+func (c *XVCCache) Insert(v stream.XVC) error {
+ vc, ok := v.(*xvc)
+ if !ok {
+ return fmt.Errorf("vc %#v is not of type *vc.xvc", vc)
+ }
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.epCache == nil {
+ return verror.New(errXVCCacheClosed, nil)
+ }
+ c.epCache[vc.RemoteEndpoint().String()] = vc
+ c.ridCache[vc.RemoteEndpoint().RoutingID()] = vc
+ return nil
+}
+
+// Close marks the XVCCache as closed and returns the XVCs remaining in the cache.
+func (c *XVCCache) Close() []*xvc {
+ c.mu.Lock()
+ vcs := make([]*xvc, 0, len(c.epCache))
+ for _, vc := range c.epCache {
+ vcs = append(vcs, vc)
+ }
+ c.epCache = nil
+ c.ridCache = nil
+ c.started = nil
+ c.mu.Unlock()
+ return vcs
+}
+
+// Delete removes vc from the cache, returning an error iff the cache has been closed.
+func (c *XVCCache) Delete(vc *xvc) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.epCache == nil {
+ return verror.New(errXVCCacheClosed, nil)
+ }
+ delete(c.epCache, vc.RemoteEndpoint().String())
+ delete(c.ridCache, vc.RemoteEndpoint().RoutingID())
+ return nil
+}
diff --git a/runtime/internal/rpc/stream/vc/xvc_test.go b/runtime/internal/rpc/stream/vc/xvc_test.go
new file mode 100644
index 0000000..da6eab0
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc_test.go
@@ -0,0 +1,142 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+ "bytes"
+ "crypto/rand"
+ "io"
+ "net"
+ "os"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/vtrace"
+ "v.io/x/ref/lib/flags"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/version"
+ ivtrace "v.io/x/ref/runtime/internal/vtrace"
+ "v.io/x/ref/test/testutil"
+)
+
+var (
+ provider = testutil.NewIDProvider("root")
+ pClient = testutil.NewPrincipal()
+ pServer = testutil.NewPrincipal()
+)
+
+func init() {
+ provider.Bless(pClient, "client")
+ provider.Bless(pServer, "server")
+}
+
+func Init() (*context.T, context.CancelFunc) {
+ ctx, cancel := context.RootContext()
+ ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{
+ CollectRegexp: ".*",
+ DumpOnShutdown: true,
+ CacheSize: 100,
+ })
+ if err != nil {
+ panic(err)
+ }
+ return ctx, func() {
+ vtrace.FormatTraces(os.Stderr, vtrace.GetStore(ctx).TraceRecords(), nil)
+ cancel()
+ }
+}
+
+type xConn struct {
+ net.Conn
+}
+
+func (xConn) DisableEncryption() {}
+
+func pipe() (xConn, xConn) {
+ a, b := net.Pipe()
+ return xConn{a}, xConn{b}
+}
+
+func createVCs(clientCtx, serverCtx *context.T, q *upcqueue.T) (client, server stream.XVC) {
+ xc1, xc2 := pipe()
+ clientEP := &naming.Endpoint{Address: "client"}
+ serverEP := &naming.Endpoint{Address: "server"}
+ vrange := version.Range{Min: 10, Max: 10}
+ blessings := pServer.BlessingStore().Default()
+ schan := make(chan stream.XVC, 1)
+ go func() {
+ server, err := XNewAccepted(serverCtx, xc2, pServer, serverEP, blessings, vrange, q)
+ if err != nil {
+ panic(err)
+ }
+ schan <- server
+ }()
+ client, err := XNewDialed(clientCtx, xc1, pClient, clientEP, serverEP, vrange)
+ if err != nil {
+ panic(err)
+ }
+ return client, <-schan
+}
+
+func TestXVC(t *testing.T) {
+ ctx, cancel := Init()
+ defer cancel()
+
+ clientCtx, clientspan := vtrace.WithNewTrace(ctx)
+ defer clientspan.Finish()
+ serverCtx, serverspan := vtrace.WithNewTrace(ctx)
+ defer serverspan.Finish()
+
+ q := upcqueue.New()
+
+ client, server := createVCs(clientCtx, serverCtx, q)
+
+ ch := make(chan []byte)
+ go func() {
+ _, span := vtrace.WithNewSpan(serverCtx, "server read")
+ defer span.Finish()
+ item, err := q.Get(nil)
+ if err != nil {
+ panic(err)
+ }
+ sflow := item.(stream.XFlow)
+ recvbuf := make([]byte, 4<<20)
+ n, err := io.ReadFull(sflow, recvbuf)
+ if err != nil {
+ panic(err)
+ }
+ if n != 4<<20 {
+ t.Errorf("Expected %d bytes got %d", 4<<20, n)
+ }
+ ch <- recvbuf
+ }()
+
+ _, span := vtrace.WithNewSpan(clientCtx, "client write")
+ cflow, err := client.Connect()
+ if err != nil {
+ panic(err)
+ }
+ sendbuf := make([]byte, 4<<20)
+ _, err = io.ReadFull(rand.Reader, sendbuf)
+ if err != nil {
+ panic(err)
+ }
+ _, err = cflow.Write(sendbuf)
+ if err != nil {
+ panic(err)
+ }
+ cflow.Close()
+ span.Finish()
+
+ if bytes.Compare(sendbuf, <-ch) != 0 {
+ t.Errorf("Didn't get the sent data.")
+ }
+
+ // Close the client and wait for the server to close.
+ client.Close(nil)
+ <-server.Closed()
+}
diff --git a/runtime/internal/rpc/stream/xmodel.go b/runtime/internal/rpc/stream/xmodel.go
new file mode 100644
index 0000000..3981a2c
--- /dev/null
+++ b/runtime/internal/rpc/stream/xmodel.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stream
+
+import (
+ "io"
+
+ "v.io/v23/naming"
+ "v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
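+// XFlow is a Flow whose encryption can be turned off, e.g. by a proxy once
+// it knows the payload it forwards is already encrypted end-to-end.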
+type XFlow interface {
+ Flow
+
+ // DisableEncryption disables encryption for the flow. All bytes
+ // written afterward will be sent in the clear. All calls after the
+ // first have no effect.
+ DisableEncryption()
+}
+
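+// XManager is the stream manager for the unified VIF/VC (XVC) prototype.
+// Unlike Manager, Dial returns a flow directly; VC creation and caching
+// happen inside the manager.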
+type XManager interface {
+ Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...ListenerOpt) (XListener, naming.Endpoint, error)
+ Dial(remote naming.Endpoint, principal security.Principal, opts ...VCOpt) (XFlow, error)
+ Shutdown()
+ RoutingID() naming.RoutingID
+}
+
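+// XConn is a byte-stream connection (e.g. a wrapped net.Conn) whose link
+// encryption can be disabled once a higher layer takes over encrypting.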
+type XConn interface {
+ io.ReadWriteCloser
+
+ // DisableEncryption disables encryption for the connection. All bytes
+ // written afterward will be sent in the clear. All calls after the
+ // first have no effect.
+ DisableEncryption()
+}
+
+type XListener interface {
+ Accept() (XFlow, error)
+ Close() error
+}
+
+// TODO(mattr): XVC doesn't need to be an interface, and shouldn't be in the
+// model file.
+type XVC interface {
+ Connect(opts ...FlowOpt) (XFlow, error)
+ ListenTo(q *upcqueue.T) error
+ // Close closes the VC and all flows on it, allowing any pending writes in
+ // flows to drain.
+ Close(reason error) error
+ Closed() chan struct{}
+ VCDataCache() VCDataCache
+}
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
new file mode 100644
index 0000000..c7886ec
--- /dev/null
+++ b/runtime/internal/rpc/x_test.go
@@ -0,0 +1,81 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "v.io/x/ref/runtime/internal/rpc/proxy"
+ "v.io/x/ref/runtime/internal/rpc/stream/manager"
+ tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
+ "v.io/x/ref/test/testutil"
+
+ "v.io/v23"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+)
+
+func TestProxyRPCX(t *testing.T) {
+ ctx, shutdown := initForTest()
+ defer shutdown()
+
+ p := testutil.NewPrincipal("test")
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ srid := naming.FixedRoutingID(0x5555)
+ crid := naming.FixedRoutingID(0x4444)
+ prid := naming.FixedRoutingID(0x6666)
+ sm := manager.XInternalNew(ctx, srid)
+ cm := manager.XInternalNew(ctx, crid)
+ ns := tnaming.NewSimpleNamespace()
+
+ client, err := XInternalNewClient(cm, ns)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ server, err := XInternalNewServer(ctx, sm, ns, nil, "", client, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, sd, pep, err := proxy.New("tcp", "127.0.0.1:0", prid, ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer sd()
+	t.Logf("proxy ep: %v", pep)
+
+ spec := rpc.ListenSpec{Proxy: pep.Name()}
+	// The endpoint to the proxy won't be returned here because connecting to the
+ // proxy happens asynchronously.
+ _, err = server.Listen(spec)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := server.Serve("", &testServer{}, nil); err != nil {
+ t.Fatal(err)
+ }
+
+ var proxies []rpc.ProxyStatus
+ for len(proxies) == 0 {
+ time.Sleep(100 * time.Millisecond)
+ t.Log("checking for proxies")
+ proxies = server.Status().Proxies
+ }
+ ep := proxies[0].Endpoint
+
+ var got string
+ if err := client.Call(ctx, ep.Name(), "Echo", []interface{}{"arg"}, []interface{}{&got}); err != nil {
+ t.Fatal(err)
+ }
+ if want := fmt.Sprintf("method:%q,suffix:%q,arg:%q", "Echo", "", "arg"); got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
new file mode 100644
index 0000000..6d32f6b
--- /dev/null
+++ b/runtime/internal/rpc/xclient.go
@@ -0,0 +1,436 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/namespace"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/v23/vtrace"
+
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/vc"
+)
+
+type xclient struct {
+ streamMgr stream.XManager
+ ns namespace.T
+ vcOpts []stream.VCOpt // vc opts passed to dial
+ preferredProtocols []string
+
+ // We cache the IP networks on the device since it is not that cheap to read
+	// network interfaces through OS syscalls.
+ // TODO(jhahn): Add monitoring the network interface changes.
+ ipNets []*net.IPNet
+
+ dc vc.DischargeClient
+}
+
+var _ rpc.Client = (*xclient)(nil)
+
+func XInternalNewClient(streamMgr stream.XManager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+ c := &xclient{
+ streamMgr: streamMgr,
+ ns: ns,
+ ipNets: ipNetworks(),
+ }
+ c.dc = InternalNewDischargeClient(nil, c, 0)
+ for _, opt := range opts {
+ // Collect all client opts that are also vc opts.
+ switch v := opt.(type) {
+ case stream.VCOpt:
+ c.vcOpts = append(c.vcOpts, v)
+ case PreferredProtocols:
+ c.preferredProtocols = v
+ }
+ }
+
+ return c, nil
+}
+
+func (c *xclient) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
+ suberr := func(err error) *verror.SubErr {
+ return &verror.SubErr{Err: err, Options: verror.Print}
+ }
+ sm := c.streamMgr
+ flow, err := sm.Dial(ep, principal, vcOpts...)
+ if err != nil {
+ return nil, suberr(err)
+ }
+ return flow, nil
+}
+
+func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
+ return c.startCall(ctx, name, method, args, opts)
+}
+
+func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
+ deadline := getDeadline(ctx, opts)
+
+ var lastErr error
+ for retries := uint(0); ; retries++ {
+ call, err := c.startCall(ctx, name, method, inArgs, opts)
+ if err != nil {
+ return err
+ }
+ err = call.Finish(outArgs...)
+ if err == nil {
+ return nil
+ }
+ lastErr = err
+		// We only retry if RetryBackoff is returned by the application; the other
+		// actions, RetryConnection and RetryRefetch, require work by the client
+		// before a retry is possible.
+ if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
+ vlog.VI(4).Infof("Cannot retry after error: %s", lastErr)
+ break
+ }
+ if !backoff(retries, deadline) {
+ break
+ }
+ vlog.VI(4).Infof("Retrying due to error: %s", lastErr)
+ }
+ return lastErr
+}
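+
+// backoff (shared with the original client implementation in this package)
+// is assumed to implement capped exponential backoff, roughly along these
+// lines; this is a sketch for orientation, not the authoritative code:
+//
+//	func backoff(retries uint, deadline time.Time) bool {
+//		wait := time.Duration(1<<retries) * 100 * time.Millisecond
+//		if wait > 30*time.Second {
+//			wait = 30 * time.Second // cap the exponential growth
+//		}
+//		if deadline.Sub(time.Now()) < wait {
+//			return false // not enough time left to wait and retry
+//		}
+//		time.Sleep(wait)
+//		return true
+//	}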
+
+// startCall ensures StartCall always returns verror.E.
+func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
+ if !ctx.Initialized() {
+ return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
+ }
+ ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
+ if err := canCreateServerAuthorizer(ctx, opts); err != nil {
+ return nil, verror.New(verror.ErrBadArg, ctx, err)
+ }
+
+ deadline := getDeadline(ctx, opts)
+
+ var lastErr error
+ for retries := uint(0); ; retries++ {
+ call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
+ if err == nil {
+ return call, nil
+ }
+ lastErr = err
+ if !shouldRetry(action, requireResolve, deadline, opts) {
+ span.Annotatef("Cannot retry after error: %s", err)
+ break
+ }
+ if !backoff(retries, deadline) {
+ break
+ }
+ span.Annotatef("Retrying due to error: %s", err)
+ }
+ return nil, lastErr
+}
+
+// tryCreateFlow attempts to establish a Flow to "server" (which must be a
+// rooted name), over which a method invocation request could be sent.
+//
+// The server at the remote end of the flow is authorized using the provided
+// authorizer, both during creation of the VC underlying the flow and the
+// flow itself.
+// TODO(cnicolaou): implement real, configurable load balancing.
+func (c *xclient) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+ status := &serverStatus{index: index, server: server}
+ var span vtrace.Span
+ ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
+ span.Annotatef("address:%v", server)
+ defer func() {
+ ch <- status
+ span.Finish()
+ }()
+
+ suberr := func(err error) *verror.SubErr {
+ return &verror.SubErr{
+ Name: suberrName(server, name, method),
+ Err: err,
+ Options: verror.Print,
+ }
+ }
+
+ address, suffix := naming.SplitAddressName(server)
+ if len(address) == 0 {
+ status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
+ return
+ }
+ status.suffix = suffix
+
+ ep, err := inaming.NewEndpoint(address)
+ if err != nil {
+ status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
+ return
+ }
+ if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
+ status.serverErr.Name = suberrName(server, name, method)
+ vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
+ return
+ }
+
+ // Authorize the remote end of the flow using the provided authorizer.
+ if status.flow.LocalPrincipal() == nil {
+ // LocalPrincipal is nil which means we are operating under
+ // SecurityNone.
+ return
+ }
+
+ seccall := security.NewCall(&security.CallParams{
+ LocalPrincipal: status.flow.LocalPrincipal(),
+ LocalBlessings: status.flow.LocalBlessings(),
+ RemoteBlessings: status.flow.RemoteBlessings(),
+ LocalEndpoint: status.flow.LocalEndpoint(),
+ RemoteEndpoint: status.flow.RemoteEndpoint(),
+ RemoteDischarges: status.flow.RemoteDischarges(),
+ Method: method,
+ Suffix: status.suffix,
+ })
+ if err := auth.Authorize(ctx, seccall); err != nil {
+ // We will test for errServerAuthorizeFailed in failedTryCall and report
+ // verror.ErrNotTrusted
+ status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
+ vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
+ status.flow.Close()
+ status.flow = nil
+ return
+ }
+ status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
+ return
+}
+
+// tryCall makes a single attempt at a call. It may connect to multiple servers
+// (all that serve "name"), but will invoke the method on at most one of them
+// (the server running on the most preferred protocol and network amongst all
+// the servers that were successfully connected to and authorized).
+// If requireResolve is true on return, the caller shouldn't bother retrying
+// unless it can re-resolve the name.
+func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
+ var resolved *naming.MountEntry
+ var blessingPattern security.BlessingPattern
+ blessingPattern, name = security.SplitPatternName(name)
+ if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
+		// We always return NoServers as the error so that the caller knows
+		// it's ok to retry the operation, since the name may be registered
+		// in the near future.
+ switch {
+ case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
+ return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
+ case verror.ErrorID(err) == verror.ErrNoServers.ID:
+ // Avoid wrapping errors unnecessarily.
+ return nil, verror.NoRetry, false, err
+ default:
+ return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
+ }
+ } else {
+ if len(resolved.Servers) == 0 {
+ // This should never happen.
+ return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
+ }
+ // An empty set of protocols means all protocols...
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+ return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
+ }
+ }
+
+ // We need to ensure calls to v23 factory methods do not occur during runtime
+ // initialization. Currently, the agent, which uses SecurityNone, is the only caller
+ // during runtime initialization. We would like to set the principal in the context
+ // to nil if we are running in SecurityNone, but this always results in a panic since
+	// the agent client would trigger a call to v23.WithPrincipal during runtime
+ // initialization. So, we gate the call to v23.GetPrincipal instead since the agent
+ // client will have callEncrypted == false.
+ // Potential solutions to this are:
+ // (1) Create a separate client for the agent so that this code doesn't have to
+ // account for its use during runtime initialization.
+ // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
+ // on here.
+ var principal security.Principal
+ if callEncrypted(opts) {
+ if principal = v23.GetPrincipal(ctx); principal == nil {
+ return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
+ }
+ }
+
+	// servers is now ordered by the priority heuristic implemented in
+ // filterAndOrderServers.
+ //
+ // Try to connect to all servers in parallel. Provide sufficient
+ // buffering for all of the connections to finish instantaneously. This
+ // is important because we want to process the responses in priority
+ // order; that order is indicated by the order of entries in servers.
+	// So, if two responses come in at the same 'instant', we prefer the
+	// first entry in resolved.Servers.
+ attempts := len(resolved.Servers)
+
+ responses := make([]*serverStatus, attempts)
+ ch := make(chan *serverStatus, attempts)
+ vcOpts := append(getVCOpts(opts), c.vcOpts...)
+ authorizer := newServerAuthorizer(blessingPattern, opts...)
+ for i, server := range resolved.Names() {
+ // Create a copy of vcOpts for each call to tryCreateFlow
+ // to avoid concurrent tryCreateFlows from stepping on each
+ // other while manipulating their copy of the options.
+ vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
+ copy(vcOptsCopy, vcOpts)
+ go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
+ }
+
+ var timeoutChan <-chan time.Time
+ if deadline, ok := ctx.Deadline(); ok {
+ timeoutChan = time.After(deadline.Sub(time.Now()))
+ }
+
+ for {
+ // Block for at least one new response from the server, or the timeout.
+ select {
+ case r := <-ch:
+ responses[r.index] = r
+ // Read as many more responses as we can without blocking.
+ LoopNonBlocking:
+ for {
+ select {
+ default:
+ break LoopNonBlocking
+ case r := <-ch:
+ responses[r.index] = r
+ }
+ }
+ case <-timeoutChan:
+			vlog.VI(2).Infof("rpc: timeout on connection to server %v", name)
+ _, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
+ if verror.ErrorID(err) != verror.ErrTimeout.ID {
+ return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
+ }
+ return nil, verror.NoRetry, false, err
+ }
+
+ dc := c.dc
+ if shouldNotFetchDischarges(opts) {
+ dc = nil
+ }
+ // Process new responses, in priority order.
+ numResponses := 0
+ for _, r := range responses {
+ if r != nil {
+ numResponses++
+ }
+ if r == nil || r.flow == nil {
+ continue
+ }
+
+ doneChan := ctx.Done()
+ r.flow.SetDeadline(doneChan)
+ fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
+ if err != nil {
+ return nil, verror.NoRetry, false, err
+ }
+
+ if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
+ r.serverErr = &verror.SubErr{
+ Name: suberrName(r.server, name, method),
+ Options: verror.Print,
+ Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
+ }
+ vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
+ r.flow.Close()
+ r.flow = nil
+ continue
+ }
+
+ // This is the 'point of no return'; once the RPC is started (fc.start
+			// below) we can't be sure if it makes it to the server or not, so this
+			// code will never call fc.start more than once to ensure that we provide
+ // 'at-most-once' rpc semantics at this level. Retrying the network
+ // connections (i.e. creating flows) is fine since we can cleanup that
+ // state if we abort a call (i.e. close the flow).
+ //
+ // We must ensure that all flows other than r.flow are closed.
+ //
+ // TODO(cnicolaou): all errors below are marked as NoRetry
+ // because we want to provide at-most-once rpc semantics so
+ // we only ever attempt an RPC once. In the future, we'll cache
+ // responses on the server and then we can retry in-flight
+ // RPCs.
+ go cleanupTryCall(r, responses, ch)
+
+ if doneChan != nil {
+ go func() {
+ select {
+ case <-doneChan:
+ vtrace.GetSpan(fc.ctx).Annotate("Canceled")
+ fc.flow.Cancel()
+ case <-fc.flow.Closed():
+ }
+ }()
+ }
+
+ deadline, _ := ctx.Deadline()
+ if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
+ return nil, verror.NoRetry, false, verr
+ }
+ return fc, verror.NoRetry, false, nil
+ }
+ if numResponses == len(responses) {
+ return c.failedTryCall(ctx, name, method, responses, ch)
+ }
+ }
+}
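+
+// The fan-out in tryCall reduces to the following pattern (sketch; tryOne is
+// a hypothetical stand-in for tryCreateFlow): one goroutine per server, a
+// channel buffered so that no sender can ever block, and results scanned by
+// index (priority order) rather than by arrival order:
+//
+//	ch := make(chan *serverStatus, len(servers)) // senders never block
+//	responses := make([]*serverStatus, len(servers))
+//	for i, s := range servers {
+//		go tryOne(i, s, ch) // sends exactly one status to ch
+//	}
+//	for i := 0; i < len(servers); i++ {
+//		r := <-ch
+//		responses[r.index] = r // index, not arrival time, decides priority
+//	}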
+
+// failedTryCall performs asynchronous cleanup for tryCall and returns an
+// appropriate error from the responses we've already received. If we get
+// here, all parallel calls in tryCall failed or we timed out.
+func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
+ go cleanupTryCall(nil, responses, ch)
+ c.ns.FlushCacheEntry(name)
+ suberrs := []verror.SubErr{}
+ topLevelError := verror.ErrNoServers
+ topLevelAction := verror.RetryRefetch
+ onlyErrNetwork := true
+ for _, r := range responses {
+ if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
+ switch verror.ErrorID(r.serverErr.Err) {
+ case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
+ topLevelError = verror.ErrNotTrusted
+ topLevelAction = verror.NoRetry
+ onlyErrNetwork = false
+ case stream.ErrAborted.ID, stream.ErrNetwork.ID:
+ // do nothing
+ default:
+ onlyErrNetwork = false
+ }
+ suberrs = append(suberrs, *r.serverErr)
+ }
+ }
+
+ if onlyErrNetwork {
+ // If we only encountered network errors, then report ErrBadProtocol.
+ topLevelError = verror.ErrBadProtocol
+ }
+
+ // TODO(cnicolaou): we get system errors for things like dialing using
+ // the 'ws' protocol which can never succeed even if we retry the connection,
+	// hence we return RetryRefetch below except for the case where the servers
+	// are not trusted, in which case there's no point in retrying at all.
+ // TODO(cnicolaou): implementing at-most-once rpc semantics in the future
+ // will require thinking through all of the cases where the RPC can
+ // be retried by the client whilst it's actually being executed on the
+ // server.
+ return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
+}
+
+func (c *xclient) Close() {
+ // TODO(suharshs): Implement this.
+}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
new file mode 100644
index 0000000..840c83f
--- /dev/null
+++ b/runtime/internal/rpc/xserver.go
@@ -0,0 +1,1277 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "v.io/x/lib/netstate"
+ "v.io/x/lib/pubsub"
+ "v.io/x/lib/vlog"
+
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/namespace"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ "v.io/v23/vdl"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/v23/vtrace"
+
+ "v.io/x/ref/lib/apilog"
+ "v.io/x/ref/lib/stats"
+ "v.io/x/ref/runtime/internal/lib/publisher"
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/manager"
+ "v.io/x/ref/runtime/internal/rpc/stream/vc"
+)
+
+// state for each requested listen address
+type xlistenState struct {
+ protocol, address string
+ ln stream.XListener
+ lep naming.Endpoint
+ lnerr, eperr error
+ roaming bool
+ // We keep track of all of the endpoints, the port and a copy of
+ // the original listen endpoint for use with roaming network changes.
+ ieps []*inaming.Endpoint // list of currently active eps
+ port string // port to use for creating new eps
+ protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
+}
+
+type xserver struct {
+ sync.Mutex
+ // context used by the server to make internal RPCs, error messages etc.
+ ctx *context.T
+ cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
+ streamMgr stream.XManager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ dc vc.DischargeClient // fetches discharges of blessings
+ listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
+	settingsName      string              // pubsub stream name for dhcp
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
+ principal security.Principal
+ blessings security.Blessings
+
+ // maps that contain state on listeners.
+ listenState map[*xlistenState]struct{}
+ listeners map[stream.XListener]struct{}
+
+ // state of proxies keyed by the name of the proxy
+ proxies map[string]proxyState
+
+ disp rpc.Dispatcher // dispatcher to serve RPCs
+ dispReserved rpc.Dispatcher // dispatcher for reserved methods
+ active sync.WaitGroup // active goroutines we've spawned.
+ stoppedChan chan struct{} // closed when the server has been stopped.
+ preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
+ // We cache the IP networks on the device since it is not that cheap to read
+	// network interfaces through OS syscalls.
+ // TODO(jhahn): Add monitoring the network interface changes.
+ ipNets []*net.IPNet
+ ns namespace.T
+ servesMountTable bool
+ isLeaf bool
+
+ // TODO(cnicolaou): add roaming stats to rpcStats
+ stats *rpcStats // stats for this server.
+}
+
+func (s *xserver) allowed(next serverState, method string) error {
+ if states[s.state][next] {
+ s.state = next
+ return nil
+ }
+ return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
+}
+
+func (s *xserver) isStopState() bool {
+ return s.state == stopping || s.state == stopped
+}
+
+func XInternalNewServer(
+ ctx *context.T,
+ streamMgr stream.XManager,
+ ns namespace.T,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
+ client rpc.Client,
+ principal security.Principal,
+ opts ...rpc.ServerOpt) (rpc.Server, error) {
+ ctx, cancel := context.WithRootCancel(ctx)
+ ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
+ statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
+ s := &xserver{
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ principal: principal,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*xlistenState]struct{}),
+ listeners: make(map[stream.XListener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newRPCStats(statsPrefix),
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
+ }
+ var (
+ dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
+ securityLevel options.SecurityLevel
+ )
+ for _, opt := range opts {
+ switch opt := opt.(type) {
+ case stream.ListenerOpt:
+ // Collect all ServerOpts that are also ListenerOpts.
+ s.listenerOpts = append(s.listenerOpts, opt)
+ switch opt := opt.(type) {
+ case vc.DischargeExpiryBuffer:
+ dischargeExpiryBuffer = time.Duration(opt)
+ }
+ case options.ServerBlessings:
+ s.blessings = opt.Blessings
+ case options.ServesMountTable:
+ s.servesMountTable = bool(opt)
+ case options.IsLeaf:
+ s.isLeaf = bool(opt)
+ case ReservedNameDispatcher:
+ s.dispReserved = opt.Dispatcher
+ case PreferredServerResolveProtocols:
+ s.preferredProtocols = []string(opt)
+ case options.SecurityLevel:
+ securityLevel = opt
+ }
+ }
+ if s.blessings.IsZero() && principal != nil {
+ s.blessings = principal.BlessingStore().Default()
+ }
+ if securityLevel == options.SecurityNone {
+ s.principal = nil
+ s.blessings = security.Blessings{}
+ s.dispReserved = nil
+ }
+ // Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
+	// the discharges by the time the VC asks for them.
+ s.dc = InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
+ s.listenerOpts = append(s.listenerOpts, s.dc)
+ s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
+ blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
+	// TODO(caprita): revisit printing the blessings with %s, and
+ // instead expose them as a list.
+ stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
+ if principal != nil {
+ stats.NewStringFunc(blessingsStatsName, func() string {
+ return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
+ })
+ }
+ return s, nil
+}
+
+func (s *xserver) Status() rpc.ServerStatus {
+ defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ status := rpc.ServerStatus{}
+ s.Lock()
+ defer s.Unlock()
+ status.State = externalStates[s.state]
+ status.ServesMountTable = s.servesMountTable
+ status.Mounts = s.publisher.Status()
+ status.Endpoints = []naming.Endpoint{}
+ for ls, _ := range s.listenState {
+ if ls.eperr != nil {
+ status.Errors = append(status.Errors, ls.eperr)
+ }
+ if ls.lnerr != nil {
+ status.Errors = append(status.Errors, ls.lnerr)
+ }
+ for _, iep := range ls.ieps {
+ status.Endpoints = append(status.Endpoints, iep)
+ }
+ }
+ status.Proxies = make([]rpc.ProxyStatus, 0, len(s.proxies))
+ for k, v := range s.proxies {
+ status.Proxies = append(status.Proxies, rpc.ProxyStatus{k, v.endpoint, v.err})
+ }
+ return status
+}
+
+func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
+ defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ s.dhcpState.watchers[ch] = struct{}{}
+ }
+}
+
+func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
+ defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ delete(s.dhcpState.watchers, ch)
+ }
+}
+
+// resolveToEndpoint resolves an object name or address to an endpoint.
+func (s *xserver) resolveToEndpoint(address string) (string, error) {
+ var resolved *naming.MountEntry
+ var err error
+ if s.ns != nil {
+ if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
+ return "", err
+ }
+ } else {
+ // Fake a namespace resolution
+ resolved = &naming.MountEntry{Servers: []naming.MountedServer{
+ {Server: address},
+ }}
+ }
+ // An empty set of protocols means all protocols...
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+ return "", err
+ }
+ for _, n := range resolved.Names() {
+ address, suffix := naming.SplitAddressName(n)
+ if suffix != "" {
+ continue
+ }
+ if ep, err := inaming.NewEndpoint(address); err == nil {
+ return ep.String(), nil
+ }
+ }
+ return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
+}
+
+// createEndpoints creates appropriate inaming.Endpoint instances for
+// all of the externally accessible network addresses that can be used
+// to reach this server.
+func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
+ iep, ok := lep.(*inaming.Endpoint)
+ if !ok {
+ return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
+ }
+ if !strings.HasPrefix(iep.Protocol, "tcp") &&
+ !strings.HasPrefix(iep.Protocol, "ws") {
+ // If not tcp, ws, or wsh, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, "", false, nil
+ }
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, "", false, err
+ }
+ addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
+ if err != nil {
+ return nil, port, false, err
+ }
+
+ ieps := make([]*inaming.Endpoint, 0, len(addrs))
+ for _, addr := range addrs {
+ n, err := inaming.NewEndpoint(lep.String())
+ if err != nil {
+ return nil, port, false, err
+ }
+ n.IsMountTable = s.servesMountTable
+ n.Address = net.JoinHostPort(addr.String(), port)
+ ieps = append(ieps, n)
+ }
+ return ieps, port, unspecified, nil
+}
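+
+// For example (illustrative values only): listening on "tcp" at ":8888" on a
+// host with addresses 192.168.1.2 and 10.0.0.2 yields two endpoints, one per
+// address, both with port 8888, rather than one unspecified-address endpoint.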
+
+func (s *xserver) Listen(listenSpec rpc.ListenSpec) ([]naming.Endpoint, error) {
+ defer apilog.LogCallf(nil, "listenSpec=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ useProxy := len(listenSpec.Proxy) > 0
+ if !useProxy && len(listenSpec.Addrs) == 0 {
+ return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
+ }
+
+ s.Lock()
+ defer s.Unlock()
+
+ if err := s.allowed(listening, "Listen"); err != nil {
+ return nil, err
+ }
+
+ // Start the proxy as early as possible, ignore duplicate requests
+ // for the same proxy.
+ if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
+ // Pre-emptively fetch discharges on the blessings (they will be cached
+ // within s.dc for future calls).
+ // This shouldn't be required, but is a hack to reduce flakiness in
+ // JavaScript browser integration tests.
+ // See https://v.io/i/392
+ s.dc.PrepareDischarges(s.ctx, s.blessings.ThirdPartyCaveats(), security.DischargeImpetus{})
+ // We have a goroutine for listening on proxy connections.
+ s.active.Add(1)
+ go func() {
+ s.proxyListenLoop(listenSpec.Proxy)
+ s.active.Done()
+ }()
+ }
+
+ roaming := false
+ lnState := make([]*xlistenState, 0, len(listenSpec.Addrs))
+ for _, addr := range listenSpec.Addrs {
+ if len(addr.Address) > 0 {
+ // Listen if we have a local address to listen on.
+ ls := &xlistenState{
+ protocol: addr.Protocol,
+ address: addr.Address,
+ }
+ ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.principal, s.blessings, s.listenerOpts...)
+ lnState = append(lnState, ls)
+ if ls.lnerr != nil {
+ vlog.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
+ continue
+ }
+ ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
+ if ls.roaming && ls.eperr == nil {
+ ls.protoIEP = *ls.lep.(*inaming.Endpoint)
+ roaming = true
+ }
+ }
+ }
+
+ found := false
+ for _, ls := range lnState {
+ if ls.ln != nil {
+ found = true
+ break
+ }
+ }
+ if !found && !useProxy {
+ return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
+ }
+
+ if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
+ // Create a dhcp listener if we haven't already done so.
+ dhcp := &dhcpState{
+ name: s.settingsName,
+ publisher: s.settingsPublisher,
+ watchers: make(map[chan<- rpc.NetworkChange]struct{}),
+ }
+ s.dhcpState = dhcp
+ dhcp.ch = make(chan pubsub.Setting, 10)
+ dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
+ if dhcp.err == nil {
+ // We have a goroutine to listen for dhcp changes.
+ s.active.Add(1)
+ go func() {
+ s.dhcpLoop(dhcp.ch)
+ s.active.Done()
+ }()
+ }
+ }
+
+ eps := make([]naming.Endpoint, 0, 10)
+ for _, ls := range lnState {
+ s.listenState[ls] = struct{}{}
+ if ls.ln != nil {
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ go func(ln stream.XListener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ls.ln, ls.lep)
+ }
+
+ for _, iep := range ls.ieps {
+ eps = append(eps, iep)
+ }
+ }
+
+ return eps, nil
+}
+
+func (s *xserver) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.XListener, error) {
+ resolved, err := s.resolveToEndpoint(proxy)
+ if err != nil {
+ return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
+ }
+ opts := append([]stream.ListenerOpt{xproxyAuth{s}}, s.listenerOpts...)
+ ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, opts...)
+ if err != nil {
+ return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
+ }
+ iep, ok := ep.(*inaming.Endpoint)
+ if !ok {
+ ln.Close()
+ return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
+ }
+ s.Lock()
+ s.proxies[proxy] = proxyState{iep, nil}
+ s.Unlock()
+ iep.IsMountTable = s.servesMountTable
+ iep.IsLeaf = s.isLeaf
+ s.publisher.AddServer(iep.String())
+ return iep, ln, nil
+}
+
+func (s *xserver) proxyListenLoop(proxy string) {
+ const (
+ min = 5 * time.Millisecond
+ max = 5 * time.Minute
+ )
+
+ iep, ln, err := s.reconnectAndPublishProxy(proxy)
+ if err != nil {
+ vlog.Errorf("Failed to connect to proxy: %s", err)
+ }
+	// The initial connection may have failed, but we enter the retry
+	// loop anyway so that we will continue to try to connect to the
+	// proxy.
+ s.Lock()
+ if s.isStopState() {
+ s.Unlock()
+ return
+ }
+ s.Unlock()
+
+ for {
+ if ln != nil && iep != nil {
+ err := s.listenLoop(ln, iep)
+ // The listener is done, so:
+ // (1) Unpublish its name
+ s.publisher.RemoveServer(iep.String())
+ s.Lock()
+ if err != nil {
+ s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
+ } else {
+ // err will be nil if we're stopping.
+ s.proxies[proxy] = proxyState{iep, nil}
+ s.Unlock()
+ return
+ }
+ s.Unlock()
+ }
+
+ s.Lock()
+ if s.isStopState() {
+ s.Unlock()
+ return
+ }
+ s.Unlock()
+
+ // (2) Reconnect to the proxy unless the server has been stopped
+ backoff := min
+ ln = nil
+ for {
+ select {
+ case <-time.After(backoff):
+ if backoff = backoff * 2; backoff > max {
+ backoff = max
+ }
+ case <-s.stoppedChan:
+ return
+ }
+ // (3) reconnect, publish new address
+ if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
+ vlog.Errorf("Failed to reconnect to proxy %q: %s", proxy, err)
+ } else {
+ vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
+ break
+ }
+ }
+ }
+}
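+
+// With min = 5ms and max = 5min, the reconnect delays above progress as
+// 5ms, 10ms, 20ms, ..., doubling on each attempt until saturating at 5min.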
+
+// addListener adds the supplied listener, taking care to
+// check whether we're already stopping. It returns true
+// if the listener was added.
+func (s *xserver) addListener(ln stream.XListener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ s.listeners[ln] = struct{}{}
+ return true
+}
+
+// rmListener removes the supplied listener, taking care to
+// check whether we're already stopping. It returns true if the
+// listener was removed.
+func (s *xserver) rmListener(ln stream.XListener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ delete(s.listeners, ln)
+ return true
+}
+
+func (s *xserver) listenLoop(ln stream.XListener, ep naming.Endpoint) error {
+ defer vlog.VI(1).Infof("rpc: Stopped listening on %s", ep)
+ var calls sync.WaitGroup
+
+ if !s.addListener(ln) {
+ // We're stopping.
+ return nil
+ }
+
+ defer func() {
+ calls.Wait()
+ s.rmListener(ln)
+ }()
+ for {
+ flow, err := ln.Accept()
+ if err != nil {
+ vlog.VI(10).Infof("rpc: Accept on %v failed: %v", ep, err)
+ return err
+ }
+ calls.Add(1)
+ go func(flow stream.Flow) {
+ defer calls.Done()
+ fs, err := newXFlowServer(flow, s)
+ if err != nil {
+ vlog.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
+ return
+ }
+ if err := fs.serve(); err != nil {
+ // TODO(caprita): Logging errors here is too spammy. For example, "not
+ // authorized" errors shouldn't be logged as server errors.
+ // TODO(cnicolaou): revisit this when verror2 transition is
+ // done.
+ if err != io.EOF {
+ vlog.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
+ }
+ }
+ }(flow)
+ }
+}
+
+func (s *xserver) dhcpLoop(ch chan pubsub.Setting) {
+	defer vlog.VI(1).Infof("rpc: Stopped listening for dhcp changes")
+ vlog.VI(2).Infof("rpc: dhcp loop")
+ for setting := range ch {
+ if setting == nil {
+ return
+ }
+ switch v := setting.Value().(type) {
+ case []net.Addr:
+ s.Lock()
+ if s.isStopState() {
+ s.Unlock()
+ return
+ }
+ change := rpc.NetworkChange{
+ Time: time.Now(),
+ State: externalStates[s.state],
+ }
+ switch setting.Name() {
+ case NewAddrsSetting:
+ change.Changed = s.addAddresses(v)
+ change.AddedAddrs = v
+ case RmAddrsSetting:
+ change.Changed, change.Error = s.removeAddresses(v)
+ change.RemovedAddrs = v
+ }
+ vlog.VI(2).Infof("rpc: dhcp: change %v", change)
+ for ch, _ := range s.dhcpState.watchers {
+ select {
+ case ch <- change:
+ default:
+ }
+ }
+ s.Unlock()
+ default:
+ vlog.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
+ }
+ }
+}
+
+// Remove all endpoints that have the same host address as the supplied
+// address parameter.
+func (s *xserver) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
+ var removed []naming.Endpoint
+ for _, address := range addrs {
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming && len(ls.ieps) > 0 {
+ remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
+ for _, iep := range ls.ieps {
+ lnHost, _, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ lnHost = iep.Address
+ }
+ if lnHost == host {
+ vlog.VI(2).Infof("rpc: dhcp removing: %s", iep)
+ removed = append(removed, iep)
+ s.publisher.RemoveServer(iep.String())
+ continue
+ }
+ remaining = append(remaining, iep)
+ }
+ ls.ieps = remaining
+ }
+ }
+ }
+ return removed, nil
+}
+
+// Add new endpoints for the new address. There is no way to know with
+// 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch.
+// Instead, we find all roaming listeners with at least one endpoint
+// and create a new endpoint with the same port as the existing ones
+// but with the new address supplied to us by the dhcp code. As
+// an additional safeguard we reject the new address if it is not
+// externally accessible.
+// This places the onus on the dhcp/roaming code that sends us addresses
+// to ensure that those addresses are externally reachable.
+func (s *xserver) addAddresses(addrs []net.Addr) []naming.Endpoint {
+ var added []naming.Endpoint
+ for _, address := range netstate.ConvertToAddresses(addrs) {
+ if !netstate.IsAccessibleIP(address) {
+ return added
+ }
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming {
+ niep := ls.protoIEP
+ niep.Address = net.JoinHostPort(host, ls.port)
+ niep.IsMountTable = s.servesMountTable
+ niep.IsLeaf = s.isLeaf
+ ls.ieps = append(ls.ieps, &niep)
+ vlog.VI(2).Infof("rpc: dhcp adding: %s", niep)
+ s.publisher.AddServer(niep.String())
+ added = append(added, &niep)
+ }
+ }
+ }
+ return added
+}
+
+func (s *xserver) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
+ defer apilog.LogCallf(nil, "name=%.10s...,obj=,authorizer=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ if obj == nil {
+ return verror.New(verror.ErrBadArg, s.ctx, "nil object")
+ }
+ invoker, err := objectToInvoker(obj)
+ if err != nil {
+ return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
+ }
+ s.setLeaf(true)
+ return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
+}
+
+func (s *xserver) setLeaf(value bool) {
+ s.Lock()
+ defer s.Unlock()
+ s.isLeaf = value
+ for ls, _ := range s.listenState {
+ for i := range ls.ieps {
+ ls.ieps[i].IsLeaf = s.isLeaf
+ }
+ }
+}
+
+func (s *xserver) ServeDispatcher(name string, disp rpc.Dispatcher) error {
+ defer apilog.LogCallf(nil, "name=%.10s...,disp=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ if disp == nil {
+ return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
+ }
+ s.Lock()
+ defer s.Unlock()
+ if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
+ return err
+ }
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+ s.disp = disp
+ if len(name) > 0 {
+ for ls, _ := range s.listenState {
+ for _, iep := range ls.ieps {
+ s.publisher.AddServer(iep.String())
+ }
+ }
+ s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+ }
+ return nil
+}
+
+func (s *xserver) AddName(name string) error {
+ defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ if len(name) == 0 {
+ return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
+ }
+ s.Lock()
+ defer s.Unlock()
+ if err := s.allowed(publishing, "AddName"); err != nil {
+ return err
+ }
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+ s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+ return nil
+}
+
+func (s *xserver) RemoveName(name string) {
+ defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ s.Lock()
+ defer s.Unlock()
+ if err := s.allowed(publishing, "RemoveName"); err != nil {
+ return
+ }
+ vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
+ s.publisher.RemoveName(name)
+}
+
+func (s *xserver) Stop() error {
+ defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
+ vlog.VI(1).Infof("Stop: %s", serverDebug)
+ defer vlog.VI(1).Infof("Stop done: %s", serverDebug)
+ s.Lock()
+ if s.isStopState() {
+ s.Unlock()
+ return nil
+ }
+ s.state = stopping
+ close(s.stoppedChan)
+ s.Unlock()
+
+ // Delete the stats object.
+ s.stats.stop()
+
+	// Note: it's safe to Stop/WaitForStop on the publisher outside of the
+	// server lock, since the publisher is safe for concurrent access.
+
+ // Stop the publisher, which triggers unmounting of published names.
+ s.publisher.Stop()
+ // Wait for the publisher to be done unmounting before we can proceed to
+ // close the listeners (to minimize the number of mounted names pointing
+	// to endpoints that are no longer serving).
+ //
+	// TODO(caprita): See if it makes sense to fail fast on rejecting
+ // connections once listeners are closed, and parallelize the publisher
+ // and listener shutdown.
+ s.publisher.WaitForStop()
+
+ s.Lock()
+
+ // Close all listeners. No new flows will be accepted, while in-flight
+ // flows will continue until they terminate naturally.
+ nListeners := len(s.listeners)
+ errCh := make(chan error, nListeners)
+
+ for ln, _ := range s.listeners {
+ go func(ln stream.XListener) {
+ errCh <- ln.Close()
+ }(ln)
+ }
+
+ drain := func(ch chan pubsub.Setting) {
+ for {
+ select {
+ case v := <-ch:
+ if v == nil {
+ return
+ }
+ default:
+ close(ch)
+ return
+ }
+ }
+ }
+
+ if dhcp := s.dhcpState; dhcp != nil {
+ // TODO(cnicolaou,caprita): investigate not having to close and drain
+ // the channel here. It's a little awkward right now since we have to
+		// be careful to not close the channel in two places, i.e. here and
+		// from the publisher's Shutdown method.
+ if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
+ drain(dhcp.ch)
+ }
+ }
+
+ s.Unlock()
+
+ var firstErr error
+ for i := 0; i < nListeners; i++ {
+ if err := <-errCh; err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ // At this point, we are guaranteed that no new requests are going to be
+ // accepted.
+
+ // Wait for the publisher and active listener + flows to finish.
+ done := make(chan struct{}, 1)
+ go func() { s.active.Wait(); done <- struct{}{} }()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Second):
+ vlog.Errorf("%s: Listener Close Error: %v", serverDebug, firstErr)
+ vlog.Errorf("%s: Timedout waiting for goroutines to stop: listeners: %d (currently: %d)", serverDebug, nListeners, len(s.listeners))
+ for ln, _ := range s.listeners {
+ vlog.Errorf("%s: Listener: %v", serverDebug, ln)
+ }
+ for ls, _ := range s.listenState {
+ vlog.Errorf("%s: ListenState: %v", serverDebug, ls)
+ }
+ <-done
+ vlog.Infof("%s: Done waiting.", serverDebug)
+ }
+
+ s.Lock()
+ defer s.Unlock()
+ s.disp = nil
+ if firstErr != nil {
+ return verror.New(verror.ErrInternal, s.ctx, firstErr)
+ }
+ s.state = stopped
+ s.cancel()
+ return nil
+}
+
+// flowServer implements the RPC server-side protocol for a single RPC, over a
+// flow that's already connected to the client.
+type xflowServer struct {
+ ctx *context.T // context associated with the RPC
+ server *xserver // rpc.Server that this flow server belongs to
+ disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
+ dec *vom.Decoder // to decode requests and args from the client
+ enc *vom.Encoder // to encode responses and results to the client
+ flow stream.Flow // underlying flow
+
+ // Fields filled in during the server invocation.
+ clientBlessings security.Blessings
+ ackBlessings bool
+ grantedBlessings security.Blessings
+ method, suffix string
+ tags []*vdl.Value
+ discharges map[string]security.Discharge
+ starttime time.Time
+ endStreamArgs bool // are the stream args at EOF?
+}
+
+var (
+ _ rpc.StreamServerCall = (*xflowServer)(nil)
+ _ security.Call = (*xflowServer)(nil)
+)
+
+func newXFlowServer(flow stream.Flow, server *xserver) (*xflowServer, error) {
+ server.Lock()
+ disp := server.disp
+ server.Unlock()
+
+ fs := &xflowServer{
+ ctx: server.ctx,
+ server: server,
+ disp: disp,
+ flow: flow,
+ discharges: make(map[string]security.Discharge),
+ }
+ typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+ if typeenc == nil {
+ fs.enc = vom.NewEncoder(flow)
+ fs.dec = vom.NewDecoder(flow)
+ } else {
+ fs.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder))
+ typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+ fs.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder))
+ }
+ return fs, nil
+}
+
+// authorizeVtrace works by simulating a call to __debug/vtrace.Trace. That
+// RPC is essentially equivalent in power to the data we are attempting to
+// attach here.
+func (fs *xflowServer) authorizeVtrace() error {
+ // Set up a context as though we were calling __debug/vtrace.
+ params := &security.CallParams{}
+ params.Copy(fs)
+ params.Method = "Trace"
+ params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)}
+ params.Suffix = "__debug/vtrace"
+
+ var auth security.Authorizer
+ if fs.server.dispReserved != nil {
+ _, auth, _ = fs.server.dispReserved.Lookup(params.Suffix)
+ }
+ return authorize(fs.ctx, security.NewCall(params), auth)
+}
+
+func (fs *xflowServer) serve() error {
+ defer fs.flow.Close()
+
+ results, err := fs.processRequest()
+
+ vtrace.GetSpan(fs.ctx).Finish()
+
+ var traceResponse vtrace.Response
+ // Check if the caller is permitted to view vtrace data.
+ if fs.authorizeVtrace() == nil {
+ traceResponse = vtrace.GetResponse(fs.ctx)
+ }
+
+ // Respond to the client with the response header and positional results.
+ response := rpc.Response{
+ Error: err,
+ EndStreamResults: true,
+ NumPosResults: uint64(len(results)),
+ TraceResponse: traceResponse,
+ AckBlessings: fs.ackBlessings,
+ }
+ if err := fs.enc.Encode(response); err != nil {
+ if err == io.EOF {
+ return err
+ }
+ return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
+ }
+ if response.Error != nil {
+ return response.Error
+ }
+ for ix, res := range results {
+ if err := fs.enc.Encode(res); err != nil {
+ if err == io.EOF {
+ return err
+ }
+ return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
+ }
+ }
+ // TODO(ashankar): Should unread data from the flow be drained?
+ //
+ // Reason to do so:
+ // The common stream.Flow implementation (v.io/x/ref/runtime/internal/rpc/stream/vc/reader.go)
+ // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
+ // slices will not be returned to the pool leading to possibly increased memory usage.
+ //
+ // Reason to not do so:
+ // Draining here will conflict with any Reads on the flow in a separate goroutine
+ // (for example, see TestStreamReadTerminatedByServer in full_test.go).
+ //
+ // For now, go with the reason to not do so as having unread data in the stream
+ // should be a rare case.
+ return nil
+}
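+
+// For illustration, the draining alternative rejected above would look
+// roughly like the following (hypothetical; deliberately not adopted, per
+// the trade-off described in the comment):
+//
+//	var junk [1024]byte
+//	for {
+//		if _, err := fs.flow.Read(junk[:]); err != nil {
+//			break // EOF or a closed flow terminates the drain
+//		}
+//	}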
+
+func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) {
+ // Set a default timeout before reading from the flow. Without this timeout,
+ // a client that sends no request or a partial request will retain the flow
+ // indefinitely (and lock up server resources).
+ initTimer := newTimer(defaultCallTimeout)
+ defer initTimer.Stop()
+ fs.flow.SetDeadline(initTimer.C)
+
+ // Decode the initial request.
+ var req rpc.Request
+ if err := fs.dec.Decode(&req); err != nil {
+ return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
+ }
+ return &req, nil
+}
+
+func (fs *xflowServer) processRequest() ([]interface{}, error) {
+ fs.starttime = time.Now()
+ req, err := fs.readRPCRequest()
+ if err != nil {
+ // We don't know what the rpc call was supposed to be, but we'll create
+ // a placeholder span so we can capture annotations.
+ fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
+ return nil, err
+ }
+ fs.method = req.Method
+ fs.suffix = strings.TrimLeft(req.Suffix, "/")
+
+ if req.Language != "" {
+ fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
+ }
+
+ // TODO(mattr): Currently this allows users to trigger trace collection
+ // on the server even if they will not be allowed to collect the
+	// results later. This might be considered a DoS vector.
+ spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
+ fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
+
+ var cancel context.CancelFunc
+ if !req.Deadline.IsZero() {
+ fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
+ } else {
+ fs.ctx, cancel = context.WithCancel(fs.ctx)
+ }
+ fs.flow.SetDeadline(fs.ctx.Done())
+ go fs.cancelContextOnClose(cancel)
+
+ // Initialize security: blessings, discharges, etc.
+ if err := fs.initSecurity(req); err != nil {
+ return nil, err
+ }
+ // Lookup the invoker.
+ invoker, auth, err := fs.lookup(fs.suffix, fs.method)
+ if err != nil {
+ return nil, err
+ }
+
+ // Note that we strip the reserved prefix when calling the invoker so
+ // that __Glob will call Glob. Note that we've already assigned a
+ // special invoker so that we never call the wrong method by mistake.
+ strippedMethod := naming.StripReserved(fs.method)
+
+ // Prepare invoker and decode args.
+ numArgs := int(req.NumPosArgs)
+ argptrs, tags, err := invoker.Prepare(strippedMethod, numArgs)
+ fs.tags = tags
+ if err != nil {
+ return nil, err
+ }
+ if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
+ err := newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
+ // If the client is sending the wrong number of arguments, try to drain the
+ // arguments sent by the client before returning an error to ensure the client
+ // receives the correct error in call.Finish(). Otherwise, the client may get
+ // an EOF error while encoding args since the server closes the flow upon returning.
+ var any interface{}
+ for i := 0; i < int(req.NumPosArgs); i++ {
+ if decerr := fs.dec.Decode(&any); decerr != nil {
+ return nil, err
+ }
+ }
+ return nil, err
+ }
+ for ix, argptr := range argptrs {
+ if err := fs.dec.Decode(argptr); err != nil {
+ return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
+ }
+ }
+
+ // Check application's authorization policy.
+ if err := authorize(fs.ctx, fs, auth); err != nil {
+ return nil, err
+ }
+
+ // Invoke the method.
+ results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
+ fs.server.stats.record(fs.method, time.Since(fs.starttime))
+ return results, err
+}
+
+func (fs *xflowServer) cancelContextOnClose(cancel context.CancelFunc) {
+ // Ensure that the context gets cancelled if the flow is closed
+ // due to a network error, or client cancellation.
+ select {
+ case <-fs.flow.Closed():
+		// Here we remove the context's channel as a deadline to the flow.
+ // We do this to ensure clients get a consistent error when they read/write
+ // after the flow is closed. Since the flow is already closed, it doesn't
+ // matter that the context is also cancelled.
+ fs.flow.SetDeadline(nil)
+ cancel()
+ case <-fs.ctx.Done():
+ }
+}
+
+// lookup returns the invoker and authorizer responsible for serving the given
+// name and method. The suffix is stripped of any leading slashes. If it begins
+// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
+// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
+// value may be modified to match the actual suffix and method to use.
+func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
+ if naming.IsReserved(method) {
+ return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
+ }
+ disp := fs.disp
+ if naming.IsReserved(suffix) {
+ disp = fs.server.dispReserved
+ } else if fs.server.isLeaf && suffix != "" {
+ innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
+ }
+ if disp != nil {
+ obj, auth, err := disp.Lookup(suffix)
+ switch {
+ case err != nil:
+ return nil, nil, err
+ case obj != nil:
+ invoker, err := objectToInvoker(obj)
+ if err != nil {
+ return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
+ }
+ return invoker, auth, nil
+ }
+ }
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
+}
+
+func (fs *xflowServer) initSecurity(req *rpc.Request) error {
+ // LocalPrincipal is nil which means we are operating under
+ // SecurityNone.
+ if fs.LocalPrincipal() == nil {
+ return nil
+ }
+
+	// If additional credentials are provided, make them available in the context.
+	// Detect unusable blessings now, rather than discovering they are unusable on
+	// first use.
+ //
+ // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
+ // the server's identity as the blessing. Figure out what we want to do about
+ // this - should servers be able to assume that a blessing is something that
+ // does not have the authorizations that the server's own identity has?
+ if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+ return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
+ }
+ fs.grantedBlessings = req.GrantedBlessings
+
+ var err error
+ if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
+ // When the server can't access the blessings cache, the client is not following
+ // protocol, so the server closes the VCs corresponding to the client endpoint.
+ // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+ // of all VCs connected to the RemoteEndpoint.
+ // TODO(mattr): commented out for experiment.
+ //fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+ return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
+ }
+ // Verify that the blessings sent by the client in the request have the same public
+ // key as those sent by the client during VC establishment.
+ if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+ return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
+ }
+ fs.ackBlessings = true
+
+ for _, d := range req.Discharges {
+ fs.discharges[d.ID()] = d
+ }
+ return nil
+}
+
+// Send implements the rpc.Stream method.
+func (fs *xflowServer) Send(item interface{}) error {
+ defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ // The empty response header indicates what follows is a streaming result.
+ if err := fs.enc.Encode(rpc.Response{}); err != nil {
+ return err
+ }
+ return fs.enc.Encode(item)
+}
+
+// Recv implements the rpc.Stream method.
+func (fs *xflowServer) Recv(itemptr interface{}) error {
+ defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ var req rpc.Request
+ if err := fs.dec.Decode(&req); err != nil {
+ return err
+ }
+ if req.EndStreamArgs {
+ fs.endStreamArgs = true
+ return io.EOF
+ }
+ return fs.dec.Decode(itemptr)
+}
+
+// Implementations of rpc.ServerCall and security.Call methods.
+
+func (fs *xflowServer) Security() security.Call {
+ //nologcall
+ return fs
+}
+func (fs *xflowServer) LocalDischarges() map[string]security.Discharge {
+ //nologcall
+ return fs.flow.LocalDischarges()
+}
+func (fs *xflowServer) RemoteDischarges() map[string]security.Discharge {
+ //nologcall
+ return fs.discharges
+}
+func (fs *xflowServer) Server() rpc.Server {
+ //nologcall
+ return fs.server
+}
+func (fs *xflowServer) Timestamp() time.Time {
+ //nologcall
+ return fs.starttime
+}
+func (fs *xflowServer) Method() string {
+ //nologcall
+ return fs.method
+}
+func (fs *xflowServer) MethodTags() []*vdl.Value {
+ //nologcall
+ return fs.tags
+}
+func (fs *xflowServer) Suffix() string {
+ //nologcall
+ return fs.suffix
+}
+func (fs *xflowServer) LocalPrincipal() security.Principal {
+ //nologcall
+ return fs.flow.LocalPrincipal()
+}
+func (fs *xflowServer) LocalBlessings() security.Blessings {
+ //nologcall
+ return fs.flow.LocalBlessings()
+}
+func (fs *xflowServer) RemoteBlessings() security.Blessings {
+ //nologcall
+ if !fs.clientBlessings.IsZero() {
+ return fs.clientBlessings
+ }
+ return fs.flow.RemoteBlessings()
+}
+func (fs *xflowServer) GrantedBlessings() security.Blessings {
+ //nologcall
+ return fs.grantedBlessings
+}
+func (fs *xflowServer) LocalEndpoint() naming.Endpoint {
+ //nologcall
+ return fs.flow.LocalEndpoint()
+}
+func (fs *xflowServer) RemoteEndpoint() naming.Endpoint {
+ //nologcall
+ return fs.flow.RemoteEndpoint()
+}
+
+type xproxyAuth struct {
+ s *xserver
+}
+
+func (a xproxyAuth) RPCStreamListenerOpt() {}
+
+func (a xproxyAuth) Login(proxy stream.Flow) (security.Blessings, []security.Discharge, error) {
+ var (
+ principal = a.s.principal
+ dc = a.s.dc
+ ctx = a.s.ctx
+ )
+ if principal == nil {
+ return security.Blessings{}, nil, nil
+ }
+ proxyNames, _ := security.RemoteBlessingNames(ctx, security.NewCall(&security.CallParams{
+ LocalPrincipal: principal,
+ RemoteBlessings: proxy.RemoteBlessings(),
+ RemoteDischarges: proxy.RemoteDischarges(),
+ RemoteEndpoint: proxy.RemoteEndpoint(),
+ LocalEndpoint: proxy.LocalEndpoint(),
+ }))
+ blessings := principal.BlessingStore().ForPeer(proxyNames...)
+ tpc := blessings.ThirdPartyCaveats()
+ if len(tpc) == 0 {
+ return blessings, nil, nil
+ }
+ // Set DischargeImpetus.Server = proxyNames.
+ // See https://v.io/i/392
+ discharges := dc.PrepareDischarges(ctx, tpc, security.DischargeImpetus{})
+ return blessings, discharges, nil
+}
+
+var _ manager.ProxyAuthenticator = xproxyAuth{}