ref/runtime/internal/rpc: Implement typeflows for xclient/xserver.

Change-Id: I2af40e09871abd1dc38ec0cd20029518e70e9d72
diff --git a/runtime/internal/rpc/typecache.go b/runtime/internal/rpc/typecache.go
new file mode 100644
index 0000000..c734913
--- /dev/null
+++ b/runtime/internal/rpc/typecache.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"sync"
+
+	"v.io/v23/flow"
+	"v.io/v23/vom"
+)
+
+type typeCacheEntry struct {
+	enc    *vom.TypeEncoder
+	dec    *vom.TypeDecoder
+	writer bool
+	ready  chan struct{}
+}
+
+const typeCacheCollectInterval = 1000
+
+type typeCache struct {
+	mu     sync.Mutex
+	flows  map[flow.ManagedConn]*typeCacheEntry
+	writes int
+}
+
+func newTypeCache() *typeCache {
+	return &typeCache{flows: make(map[flow.ManagedConn]*typeCacheEntry)}
+}
+
+func (tc *typeCache) writer(c flow.ManagedConn) (write func(flow.Flow)) {
+	tc.mu.Lock()
+	tce := tc.flows[c]
+	if tce == nil {
+		tce = &typeCacheEntry{ready: make(chan struct{})}
+		tc.flows[c] = tce
+	}
+	if !tce.writer {
+		// TODO(mattr): This is a very course garbage collection policy.
+		// Every 1000 connections we clean out expired entries.
+		if tc.writes++; tc.writes%typeCacheCollectInterval == 0 {
+			go tc.collect()
+		}
+		tce.writer = true
+		write = func(f flow.Flow) {
+			tce.enc = vom.NewTypeEncoder(f)
+			tce.dec = vom.NewTypeDecoder(f)
+			close(tce.ready)
+		}
+	}
+	tc.mu.Unlock()
+	return
+}
+
+func (tc *typeCache) get(c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder) {
+	tc.mu.Lock()
+	tce := tc.flows[c]
+	if tce == nil {
+		tce = &typeCacheEntry{ready: make(chan struct{})}
+		tc.flows[c] = tce
+	}
+	tc.mu.Unlock()
+	<-tce.ready
+	return tce.enc, tce.dec
+}
+
+func (tc *typeCache) collect() {
+	tc.mu.Lock()
+	conns := make([]flow.ManagedConn, len(tc.flows))
+	i := 0
+	for c, _ := range tc.flows {
+		conns[i] = c
+		i++
+	}
+	tc.mu.Unlock()
+
+	last := 0
+	for _, c := range conns {
+		select {
+		case <-c.Closed():
+			conns[last] = c
+			last++
+		default:
+		}
+	}
+
+	if last > 0 {
+		tc.mu.Lock()
+		for idx := 0; idx < last; idx++ {
+			delete(tc.flows, conns[idx])
+		}
+		tc.mu.Unlock()
+	}
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index afde0e4..87d749f 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -28,6 +28,11 @@
 	inaming "v.io/x/ref/runtime/internal/naming"
 )
 
+const (
+	dataFlow = 'd'
+	typeFlow = 't'
+)
+
 type xclient struct {
 	flowMgr            flow.Manager
 	ns                 namespace.T
@@ -39,6 +44,9 @@
 	// directly.
 	ipNets []*net.IPNet
 
+	// typeCache maintains a cache of type encoders and decoders.
+	typeCache *typeCache
+
 	wg     sync.WaitGroup
 	mu     sync.Mutex
 	closed bool
@@ -48,8 +56,9 @@
 
 func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
 	c := &xclient{
-		flowMgr: fm,
-		ns:      ns,
+		flowMgr:   fm,
+		ns:        ns,
+		typeCache: newTypeCache(),
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
@@ -118,6 +127,8 @@
 	server, suffix string
 	flow           flow.Flow
 	serverErr      *verror.SubErr
+	typeEnc        *vom.TypeEncoder
+	typeDec        *vom.TypeDecoder
 }
 
 // tryCreateFlow attempts to establish a Flow to "server" (which must be a
@@ -157,13 +168,27 @@
 		status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
 		return
 	}
-	flow, err := c.flowMgr.Dial(ctx, ep, blessingsForPeer{auth, method, suffix}.run)
-	if err != nil {
+	bfp := blessingsForPeer{auth, method, suffix}.run
+	if status.flow, err = c.flowMgr.Dial(ctx, ep, bfp); err != nil {
 		ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
 		status.serverErr = suberr(err)
 		return
 	}
-	status.flow = flow
+	if write := c.typeCache.writer(status.flow.Conn()); write != nil {
+		if tflow, err := c.flowMgr.Dial(ctx, ep, bfp); err != nil {
+			ctx.VI(2).Infof("rpc: failed to create type Flow with %v: %v", server, err)
+			status.serverErr = suberr(err)
+			return
+		} else if _, err = tflow.Write([]byte{typeFlow}); err != nil {
+			ctx.VI(2).Infof("rpc: Failed to write type byte. %v: %v", server, err)
+			tflow.Close()
+			status.serverErr = suberr(err)
+			return
+		} else {
+			write(tflow)
+		}
+	}
+	status.typeEnc, status.typeDec = c.typeCache.get(status.flow.Conn())
 }
 
 type blessingsForPeer struct {
@@ -295,7 +320,7 @@
 				continue
 			}
 
-			fc, err := newFlowXClient(ctx, r.flow)
+			fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
 			if err != nil {
 				return nil, verror.NoRetry, false, err
 			}
@@ -441,14 +466,17 @@
 var _ rpc.ClientCall = (*flowXClient)(nil)
 var _ rpc.Stream = (*flowXClient)(nil)
 
-func newFlowXClient(ctx *context.T, flow flow.Flow) (*flowXClient, error) {
+func newFlowXClient(ctx *context.T, flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowXClient, error) {
+	if _, err := flow.Write([]byte{dataFlow}); err != nil {
+		flow.Close()
+		return nil, err
+	}
 	fc := &flowXClient{
 		ctx:  ctx,
 		flow: flow,
-		dec:  vom.NewDecoder(flow),
-		enc:  vom.NewEncoder(flow),
+		dec:  vom.NewDecoderWithTypeDecoder(flow, typeDec),
+		enc:  vom.NewEncoderWithTypeEncoder(flow, typeEnc),
 	}
-	// TODO(toddw): Add logic to create separate type flows!
 	return fc, nil
 }
 
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 1bdf4a4..8b3b892 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -53,6 +53,7 @@
 	blessings         security.Blessings
 	protoEndpoints    []*inaming.Endpoint
 	chosenEndpoints   []*inaming.Endpoint
+	typeCache         *typeCache
 
 	// state of proxies keyed by the name of the proxy
 	proxies map[string]proxyState
@@ -109,6 +110,7 @@
 		settingsPublisher: settingsPublisher,
 		settingsName:      settingsName,
 		disp:              dispatcher,
+		typeCache:         newTypeCache(),
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
@@ -294,19 +296,34 @@
 		calls.Add(1)
 		go func(fl flow.Flow) {
 			defer calls.Done()
-			fs, err := newXFlowServer(fl, s)
-			if err != nil {
-				s.ctx.VI(1).Infof("newFlowServer on %v failed", err)
+			var ty [1]byte
+			if _, err := io.ReadFull(fl, ty[:]); err != nil {
+				s.ctx.VI(1).Infof("failed to read flow type: %v", err)
 				return
 			}
-			if err := fs.serve(); err != nil {
-				// TODO(caprita): Logging errors here is too spammy. For example, "not
-				// authorized" errors shouldn't be logged as server errors.
-				// TODO(cnicolaou): revisit this when verror2 transition is
-				// done.
-				if err != io.EOF {
-					s.ctx.VI(2).Infof("Flow.serve failed: %v", err)
+			switch ty[0] {
+			case dataFlow:
+				fs, err := newXFlowServer(fl, s)
+				if err != nil {
+					s.ctx.VI(1).Infof("newFlowServer failed %v", err)
+					return
 				}
+				if err := fs.serve(); err != nil {
+					// TODO(caprita): Logging errors here is too spammy. For example, "not
+					// authorized" errors shouldn't be logged as server errors.
+					// TODO(cnicolaou): revisit this when verror2 transition is
+					// done.
+					if err != io.EOF {
+						s.ctx.VI(2).Infof("Flow.serve failed: %v", err)
+					}
+				}
+			case typeFlow:
+				write := s.typeCache.writer(fl.Conn())
+				if write == nil {
+					s.ctx.VI(1).Infof("ignoring duplicate type flow.")
+					return
+				}
+				write(fl)
 			}
 		}(fl)
 	}
@@ -452,14 +469,14 @@
 	server.Lock()
 	disp := server.disp
 	server.Unlock()
-
+	typeEnc, typeDec := server.typeCache.get(flow.Conn())
 	fs := &xflowServer{
 		ctx:        server.ctx,
 		server:     server,
 		disp:       disp,
 		flow:       flow,
-		enc:        vom.NewEncoder(flow),
-		dec:        vom.NewDecoder(flow),
+		enc:        vom.NewEncoderWithTypeEncoder(flow, typeEnc),
+		dec:        vom.NewDecoderWithTypeDecoder(flow, typeDec),
 		discharges: make(map[string]security.Discharge),
 	}
 	// TODO(toddw): Add logic to create separate type flows!