ref/runtime/internal/rpc: Implement typeflows for xclient/xserver.
Change-Id: I2af40e09871abd1dc38ec0cd20029518e70e9d72
diff --git a/runtime/internal/rpc/typecache.go b/runtime/internal/rpc/typecache.go
new file mode 100644
index 0000000..c734913
--- /dev/null
+++ b/runtime/internal/rpc/typecache.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "sync"
+
+ "v.io/v23/flow"
+ "v.io/v23/vom"
+)
+
+type typeCacheEntry struct {
+ enc *vom.TypeEncoder
+ dec *vom.TypeDecoder
+ writer bool
+ ready chan struct{}
+}
+
+const typeCacheCollectInterval = 1000
+
+type typeCache struct {
+ mu sync.Mutex
+ flows map[flow.ManagedConn]*typeCacheEntry
+ writes int
+}
+
+func newTypeCache() *typeCache {
+ return &typeCache{flows: make(map[flow.ManagedConn]*typeCacheEntry)}
+}
+
+func (tc *typeCache) writer(c flow.ManagedConn) (write func(flow.Flow)) {
+ tc.mu.Lock()
+ tce := tc.flows[c]
+ if tce == nil {
+ tce = &typeCacheEntry{ready: make(chan struct{})}
+ tc.flows[c] = tce
+ }
+ if !tce.writer {
+ // TODO(mattr): This is a very course garbage collection policy.
+ // Every 1000 connections we clean out expired entries.
+ if tc.writes++; tc.writes%typeCacheCollectInterval == 0 {
+ go tc.collect()
+ }
+ tce.writer = true
+ write = func(f flow.Flow) {
+ tce.enc = vom.NewTypeEncoder(f)
+ tce.dec = vom.NewTypeDecoder(f)
+ close(tce.ready)
+ }
+ }
+ tc.mu.Unlock()
+ return
+}
+
+func (tc *typeCache) get(c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder) {
+ tc.mu.Lock()
+ tce := tc.flows[c]
+ if tce == nil {
+ tce = &typeCacheEntry{ready: make(chan struct{})}
+ tc.flows[c] = tce
+ }
+ tc.mu.Unlock()
+ <-tce.ready
+ return tce.enc, tce.dec
+}
+
+func (tc *typeCache) collect() {
+ tc.mu.Lock()
+ conns := make([]flow.ManagedConn, len(tc.flows))
+ i := 0
+ for c, _ := range tc.flows {
+ conns[i] = c
+ i++
+ }
+ tc.mu.Unlock()
+
+ last := 0
+ for _, c := range conns {
+ select {
+ case <-c.Closed():
+ conns[last] = c
+ last++
+ default:
+ }
+ }
+
+ if last > 0 {
+ tc.mu.Lock()
+ for idx := 0; idx < last; idx++ {
+ delete(tc.flows, conns[idx])
+ }
+ tc.mu.Unlock()
+ }
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index afde0e4..87d749f 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -28,6 +28,11 @@
inaming "v.io/x/ref/runtime/internal/naming"
)
+const (
+ dataFlow = 'd'
+ typeFlow = 't'
+)
+
type xclient struct {
flowMgr flow.Manager
ns namespace.T
@@ -39,6 +44,9 @@
// directly.
ipNets []*net.IPNet
+ // typeCache maintains a cache of type encoders and decoders.
+ typeCache *typeCache
+
wg sync.WaitGroup
mu sync.Mutex
closed bool
@@ -48,8 +56,9 @@
func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &xclient{
- flowMgr: fm,
- ns: ns,
+ flowMgr: fm,
+ ns: ns,
+ typeCache: newTypeCache(),
}
ipNets, err := ipNetworks()
if err != nil {
@@ -118,6 +127,8 @@
server, suffix string
flow flow.Flow
serverErr *verror.SubErr
+ typeEnc *vom.TypeEncoder
+ typeDec *vom.TypeDecoder
}
// tryCreateFlow attempts to establish a Flow to "server" (which must be a
@@ -157,13 +168,27 @@
status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
- flow, err := c.flowMgr.Dial(ctx, ep, blessingsForPeer{auth, method, suffix}.run)
- if err != nil {
+ bfp := blessingsForPeer{auth, method, suffix}.run
+ if status.flow, err = c.flowMgr.Dial(ctx, ep, bfp); err != nil {
ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
status.serverErr = suberr(err)
return
}
- status.flow = flow
+ if write := c.typeCache.writer(status.flow.Conn()); write != nil {
+ if tflow, err := c.flowMgr.Dial(ctx, ep, bfp); err != nil {
+ ctx.VI(2).Infof("rpc: failed to create type Flow with %v: %v", server, err)
+ status.serverErr = suberr(err)
+ return
+ } else if _, err = tflow.Write([]byte{typeFlow}); err != nil {
+ ctx.VI(2).Infof("rpc: Failed to write type byte. %v: %v", server, err)
+ tflow.Close()
+ status.serverErr = suberr(err)
+ return
+ } else {
+ write(tflow)
+ }
+ }
+ status.typeEnc, status.typeDec = c.typeCache.get(status.flow.Conn())
}
type blessingsForPeer struct {
@@ -295,7 +320,7 @@
continue
}
- fc, err := newFlowXClient(ctx, r.flow)
+ fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
if err != nil {
return nil, verror.NoRetry, false, err
}
@@ -441,14 +466,17 @@
var _ rpc.ClientCall = (*flowXClient)(nil)
var _ rpc.Stream = (*flowXClient)(nil)
-func newFlowXClient(ctx *context.T, flow flow.Flow) (*flowXClient, error) {
+func newFlowXClient(ctx *context.T, flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowXClient, error) {
+ if _, err := flow.Write([]byte{dataFlow}); err != nil {
+ flow.Close()
+ return nil, err
+ }
fc := &flowXClient{
ctx: ctx,
flow: flow,
- dec: vom.NewDecoder(flow),
- enc: vom.NewEncoder(flow),
+ dec: vom.NewDecoderWithTypeDecoder(flow, typeDec),
+ enc: vom.NewEncoderWithTypeEncoder(flow, typeEnc),
}
- // TODO(toddw): Add logic to create separate type flows!
return fc, nil
}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 1bdf4a4..8b3b892 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -53,6 +53,7 @@
blessings security.Blessings
protoEndpoints []*inaming.Endpoint
chosenEndpoints []*inaming.Endpoint
+ typeCache *typeCache
// state of proxies keyed by the name of the proxy
proxies map[string]proxyState
@@ -109,6 +110,7 @@
settingsPublisher: settingsPublisher,
settingsName: settingsName,
disp: dispatcher,
+ typeCache: newTypeCache(),
}
ipNets, err := ipNetworks()
if err != nil {
@@ -294,19 +296,34 @@
calls.Add(1)
go func(fl flow.Flow) {
defer calls.Done()
- fs, err := newXFlowServer(fl, s)
- if err != nil {
- s.ctx.VI(1).Infof("newFlowServer on %v failed", err)
+ var ty [1]byte
+ if _, err := io.ReadFull(fl, ty[:]); err != nil {
+ s.ctx.VI(1).Infof("failed to read flow type: %v", err)
return
}
- if err := fs.serve(); err != nil {
- // TODO(caprita): Logging errors here is too spammy. For example, "not
- // authorized" errors shouldn't be logged as server errors.
- // TODO(cnicolaou): revisit this when verror2 transition is
- // done.
- if err != io.EOF {
- s.ctx.VI(2).Infof("Flow.serve failed: %v", err)
+ switch ty[0] {
+ case dataFlow:
+ fs, err := newXFlowServer(fl, s)
+ if err != nil {
+ s.ctx.VI(1).Infof("newFlowServer failed %v", err)
+ return
}
+ if err := fs.serve(); err != nil {
+ // TODO(caprita): Logging errors here is too spammy. For example, "not
+ // authorized" errors shouldn't be logged as server errors.
+ // TODO(cnicolaou): revisit this when verror2 transition is
+ // done.
+ if err != io.EOF {
+ s.ctx.VI(2).Infof("Flow.serve failed: %v", err)
+ }
+ }
+ case typeFlow:
+ write := s.typeCache.writer(fl.Conn())
+ if write == nil {
+ s.ctx.VI(1).Infof("ignoring duplicate type flow.")
+ return
+ }
+ write(fl)
}
}(fl)
}
@@ -452,14 +469,14 @@
server.Lock()
disp := server.disp
server.Unlock()
-
+ typeEnc, typeDec := server.typeCache.get(flow.Conn())
fs := &xflowServer{
ctx: server.ctx,
server: server,
disp: disp,
flow: flow,
- enc: vom.NewEncoder(flow),
- dec: vom.NewDecoder(flow),
+ enc: vom.NewEncoderWithTypeEncoder(flow, typeEnc),
+ dec: vom.NewDecoderWithTypeDecoder(flow, typeDec),
discharges: make(map[string]security.Discharge),
}
// TODO(toddw): Add logic to create separate type flows!