TBR vom: enable separate vom type stream

  This change enables a separate vom type flow to support VC-level
  VOM type cache.

  * rpc micro benchmark shows 2X latency improvement.
  * This is a part of issues/1221

(I will not submit this until I submit the idle timout CL, since
 I may need to change this CL a little bit like TypeFlowID.)

MultiPart: 1/2
Change-Id: I28f707de78ebcffb321155fe907ce9aa7858088f
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index 9896aba..0dbad10 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -701,13 +701,26 @@
 		dc:     dc,
 	}
 	var err error
-	if fc.enc, err = vom.NewEncoder(flow); err != nil {
-		berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
-		return nil, fc.close(berr)
-	}
-	if fc.dec, err = vom.NewDecoder(flow); err != nil {
-		berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
-		return nil, fc.close(berr)
+	typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+	if typeenc == nil {
+		if fc.enc, err = vom.NewEncoder(flow); err != nil {
+			berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+			return nil, fc.close(berr)
+		}
+		if fc.dec, err = vom.NewDecoder(flow); err != nil {
+			berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+			return nil, fc.close(berr)
+		}
+	} else {
+		if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
+			berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+			return nil, fc.close(berr)
+		}
+		typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+		if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+			berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+			return nil, fc.close(berr)
+		}
 	}
 	return fc, nil
 }
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index b5550a2..a4b286d 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -954,13 +954,26 @@
 	// as a security.Call.
 	fs.T = security.SetCall(fs.T, fs)
 	var err error
-	if fs.dec, err = vom.NewDecoder(flow); err != nil {
-		flow.Close()
-		return nil, err
-	}
-	if fs.enc, err = vom.NewEncoder(flow); err != nil {
-		flow.Close()
-		return nil, err
+	typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+	if typedec == nil {
+		if fs.dec, err = vom.NewDecoder(flow); err != nil {
+			flow.Close()
+			return nil, err
+		}
+		if fs.enc, err = vom.NewEncoder(flow); err != nil {
+			flow.Close()
+			return nil, err
+		}
+	} else {
+		if fs.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+			flow.Close()
+			return nil, err
+		}
+		typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+		if fs.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
+			flow.Close()
+			return nil, err
+		}
 	}
 	return fs, nil
 }
diff --git a/profiles/internal/rpc/stream/model.go b/profiles/internal/rpc/stream/model.go
index 409fd19..446fd86 100644
--- a/profiles/internal/rpc/stream/model.go
+++ b/profiles/internal/rpc/stream/model.go
@@ -57,6 +57,9 @@
 // VCDataCache is a thread-safe store that allows data to be shared across a VC,
 // with the intention of caching data that reappears over multiple flows.
 type VCDataCache interface {
+	// Get returns the 'value' associated with 'key'.
+	Get(key interface{}) interface{}
+
 	// GetOrInsert returns the 'value' associated with 'key'. If an entry already exists in the
 	// cache with the 'key', the 'value' is returned, otherwise 'create' is called to create a new
 	// value N, the cache is updated, and N is returned.  GetOrInsert may be called from
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 42f2339..3738181 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -747,8 +747,8 @@
 	}
 }
 
-func (p *process) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
-	return p.bq.NewWriter(packIDs(vci, fid), 0, vc.DefaultBytesBufferedPerFlow)
+func (p *process) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
+	return p.bq.NewWriter(packIDs(vci, fid), priority, vc.DefaultBytesBufferedPerFlow)
 }
 
 // Convenience functions to assist with the logging convention.
diff --git a/profiles/internal/rpc/stream/vc/data_cache.go b/profiles/internal/rpc/stream/vc/data_cache.go
index 74958a1..6c5b56c 100644
--- a/profiles/internal/rpc/stream/vc/data_cache.go
+++ b/profiles/internal/rpc/stream/vc/data_cache.go
@@ -18,6 +18,27 @@
 	return &dataCache{m: make(map[interface{}]interface{})}
 }
 
+// Get returns the value stored under the key.
+func (c *dataCache) Get(key interface{}) interface{} {
+	c.RLock()
+	value, _ := c.m[key]
+	c.RUnlock()
+	return value
+}
+
+// Insert the given key and value into the cache if and only if the given key
+// did not already exist in the cache. Returns true if the key-value pair was
+// inserted; otherwise returns false.
+func (c *dataCache) Insert(key interface{}, value interface{}) bool {
+	c.Lock()
+	defer c.Unlock()
+	if _, exists := c.m[key]; exists {
+		return false
+	}
+	c.m[key] = value
+	return true
+}
+
 // GetOrInsert first checks if the key exists in the cache with a reader lock.
 // If it doesn't exist, it instead acquires a writer lock, creates and stores the new value
 // with create and returns value.
diff --git a/profiles/internal/rpc/stream/vc/knobs.go b/profiles/internal/rpc/stream/vc/knobs.go
index 02450da..c2bdf5b 100644
--- a/profiles/internal/rpc/stream/vc/knobs.go
+++ b/profiles/internal/rpc/stream/vc/knobs.go
@@ -29,4 +29,6 @@
 	HandshakeFlowID = 1
 	// Special flow used for authenticating between VCs.
 	AuthFlowID = 2
+	// Special Flow ID used for interchanging of VOM types between VCs.
+	TypeFlowID = 3
 )
diff --git a/profiles/internal/rpc/stream/vc/vc.go b/profiles/internal/rpc/stream/vc/vc.go
index 53f9b98..3774761 100644
--- a/profiles/internal/rpc/stream/vc/vc.go
+++ b/profiles/internal/rpc/stream/vc/vc.go
@@ -48,6 +48,10 @@
 
 const DefaultServerDischargeExpiryBuffer = 20 * time.Second
 
+// DataCache Keys for TypeEncoder/Decoder.
+type TypeEncoderKey struct{}
+type TypeDecoderKey struct{}
+
 // VC implements the stream.VC interface and exports additional methods to
 // manage Flows.
 //
@@ -139,9 +143,17 @@
 
 	// NewWriter creates a buffer queue for Write operations on the
 	// stream.Flow implementation.
-	NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error)
+	NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error)
 }
 
+// Priorities of flows.
+const (
+	systemFlowPriority bqueue.Priority = iota
+	normalFlowPriority
+
+	NumFlowPriorities
+)
+
 // DischargeClient is an interface for obtaining discharges for a set of third-party
 // caveats.
 //
@@ -201,11 +213,11 @@
 
 // Connect implements the stream.Connector.Connect method.
 func (vc *VC) Connect(opts ...stream.FlowOpt) (stream.Flow, error) {
-	return vc.connectFID(vc.allocFID(), opts...)
+	return vc.connectFID(vc.allocFID(), normalFlowPriority, opts...)
 }
 
-func (vc *VC) connectFID(fid id.Flow, opts ...stream.FlowOpt) (stream.Flow, error) {
-	writer, err := vc.newWriter(fid)
+func (vc *VC) connectFID(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.Flow, error) {
+	writer, err := vc.newWriter(fid, priority)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create writer for Flow: %v", err)
 	}
@@ -296,7 +308,15 @@
 	if _, exists := vc.flowMap[fid]; exists {
 		return errDuplicateFlow
 	}
-	writer, err := vc.newWriter(fid)
+	priority := normalFlowPriority
+	// We use the same high priority for all reserved flows including handshake and
+	// authentication flows. This is because client may open a new system flow before
+	// authentication finishes in server side and then vc.DispatchPayload() can be
+	// stuck in waiting for authentication to finish.
+	if fid < NumReservedFlows {
+		priority = systemFlowPriority
+	}
+	writer, err := vc.newWriter(fid, priority)
 	if err != nil {
 		return fmt.Errorf("failed to create writer for new flow(%d): %v", fid, err)
 	}
@@ -414,7 +434,7 @@
 	}
 
 	// Establish TLS
-	handshakeConn, err := vc.connectFID(HandshakeFlowID)
+	handshakeConn, err := vc.connectFID(HandshakeFlowID, systemFlowPriority)
 	if err != nil {
 		return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
 	}
@@ -432,7 +452,7 @@
 	// This is not a problem when tls.Conn is used as intended (to wrap over a stream), but
 	// becomes a problem when shoehorning a block encrypter (Crypter interface) over this
 	// stream API.
-	authConn, err := vc.connectFID(AuthFlowID)
+	authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
 	if err != nil {
 		return vc.err(fmt.Errorf("failed to create a Flow for authentication: %v", err))
 	}
@@ -461,6 +481,11 @@
 	vc.remoteDischarges = rDischarges
 	vc.mu.Unlock()
 
+	// Open system flows.
+	if err = vc.connectSystemFlows(); err != nil {
+		return vc.err(fmt.Errorf("failed to connect system flows: %v", err))
+	}
+
 	vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
 	return nil
 }
@@ -568,14 +593,20 @@
 		close(vc.acceptHandshakeDone)
 		vc.acceptHandshakeDone = nil
 		vc.mu.Unlock()
-		vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
-		result <- HandshakeResult{ln, nil}
 
 		if len(lBlessings.ThirdPartyCaveats()) > 0 {
 			go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats(), dischargeExpiryBuffer)
 		} else {
 			authConn.Close()
 		}
+
+		// Accept system flows.
+		if err = vc.acceptSystemFlows(ln); err != nil {
+			sendErr(fmt.Errorf("failed to accept system flows: %v", err))
+		}
+
+		vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
+		result <- HandshakeResult{ln, nil}
 	}()
 	return result
 }
@@ -665,6 +696,52 @@
 	}
 }
 
+func (vc *VC) connectSystemFlows() error {
+	if vc.version < version.RPCVersion8 {
+		return nil
+	}
+	conn, err := vc.connectFID(TypeFlowID, systemFlowPriority)
+	if err != nil {
+		return fmt.Errorf("fail to create a Flow for wire type: %v", err)
+	}
+	typeEnc, err := vom.NewTypeEncoder(conn)
+	if err != nil {
+		conn.Close()
+		return fmt.Errorf("failed to create type encoder: %v", err)
+	}
+	vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
+	typeDec, err := vom.NewTypeDecoder(conn)
+	if err != nil {
+		conn.Close()
+		return fmt.Errorf("failed to create type decoder: %v", err)
+	}
+	vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+	return nil
+}
+
+func (vc *VC) acceptSystemFlows(ln stream.Listener) error {
+	if vc.version < version.RPCVersion8 {
+		return nil
+	}
+	conn, err := ln.Accept()
+	if err != nil {
+		return fmt.Errorf("Flow for wire type not accepted: %v", err)
+	}
+	typeDec, err := vom.NewTypeDecoder(conn)
+	if err != nil {
+		conn.Close()
+		return fmt.Errorf("failed to create type decoder: %v", err)
+	}
+	vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+	typeEnc, err := vom.NewTypeEncoder(conn)
+	if err != nil {
+		conn.Close()
+		return fmt.Errorf("failed to create type encoder: %v", err)
+	}
+	vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
+	return nil
+}
+
 // Encrypt uses the VC's encryption scheme to encrypt the provided data payload.
 // Always takes ownership of plaintext.
 func (vc *VC) Encrypt(fid id.Flow, plaintext *iobuf.Slice) (cipherslice *iobuf.Slice, err error) {
@@ -813,8 +890,8 @@
 	return strings.Join(l, "\n")
 }
 
-func (vc *VC) newWriter(fid id.Flow) (*writer, error) {
-	bq, err := vc.helper.NewWriter(vc.vci, fid)
+func (vc *VC) newWriter(fid id.Flow, priority bqueue.Priority) (*writer, error) {
+	bq, err := vc.helper.NewWriter(vc.vci, fid, priority)
 	if err != nil {
 		return nil, err
 	}
diff --git a/profiles/internal/rpc/stream/vc/vc_test.go b/profiles/internal/rpc/stream/vc/vc_test.go
index 126ad44..958b6ad 100644
--- a/profiles/internal/rpc/stream/vc/vc_test.go
+++ b/profiles/internal/rpc/stream/vc/vc_test.go
@@ -269,8 +269,8 @@
 	}
 }
 
-func testConnect_Small(t *testing.T, securityLevel options.SecurityLevel) {
-	h, vc, err := NewSimple(LatestVersion, securityLevel)
+func testConnect_Small(t *testing.T, version version.RPCVersion, securityLevel options.SecurityLevel) {
+	h, vc, err := NewSimple(version, securityLevel)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -281,8 +281,12 @@
 	}
 	testFlowEcho(t, flow, 10)
 }
-func TestConnect_Small(t *testing.T)    { testConnect_Small(t, SecurityNone) }
-func TestConnect_SmallTLS(t *testing.T) { testConnect_Small(t, SecurityTLS) }
+func TestConnect_Small(t *testing.T)     { testConnect_Small(t, LatestVersion, SecurityNone) }
+func TestConnect_SmallTLS(t *testing.T)  { testConnect_Small(t, LatestVersion, SecurityTLS) }
+func TestConnect_Small7(t *testing.T)    { testConnect_Small(t, version.RPCVersion7, SecurityNone) }
+func TestConnect_Small7TLS(t *testing.T) { testConnect_Small(t, version.RPCVersion7, SecurityTLS) }
+func TestConnect_Small8(t *testing.T)    { testConnect_Small(t, version.RPCVersion8, SecurityNone) }
+func TestConnect_Small8TLS(t *testing.T) { testConnect_Small(t, version.RPCVersion8, SecurityTLS) }
 
 func testConnect(t *testing.T, securityLevel options.SecurityLevel) {
 	h, vc, err := NewSimple(LatestVersion, securityLevel)
@@ -585,8 +589,8 @@
 	}
 }
 
-func (h *helper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
-	return h.bq.NewWriter(bqueue.ID(fid), 0, DefaultBytesBufferedPerFlow)
+func (h *helper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
+	return h.bq.NewWriter(bqueue.ID(fid), priority, DefaultBytesBufferedPerFlow)
 }
 
 func (h *helper) Close() {
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 3f6701a..670721d 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -118,10 +118,13 @@
 const (
 	// Priorities of the buffered queues used for flow control of writes.
 	expressPriority bqueue.Priority = iota
+	controlPriority
+	// The range of flow priorities is [flowPriority, flowPriority + NumFlowPriorities)
 	flowPriority
-	normalPriority
-	stopPriority
+	stopPriority = flowPriority + vc.NumFlowPriorities
+)
 
+const (
 	// Convenience aliases so that the package name "vc" does not
 	// conflict with the variables named "vc".
 	defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
@@ -202,7 +205,7 @@
 	}
 	expressQ.Release(-1) // Disable flow control
 
-	flowQ, err := outgoing.NewWriter(flowID, flowPriority, flowToken.Size())
+	flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
 	}
@@ -964,9 +967,9 @@
 	h.vif.flowQ.TryPut(flowToken)
 }
 
-func (h vcHelper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
+func (h vcHelper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
 	h.vif.idleTimerMap.InsertFlow(vci, fid)
-	return h.vif.outgoing.NewWriter(packIDs(vci, fid), normalPriority, defaultBytesBufferedPerFlow)
+	return h.vif.outgoing.NewWriter(packIDs(vci, fid), flowPriority+priority, defaultBytesBufferedPerFlow)
 }
 
 // The token added to vif.flowQ.
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 4ea452e..f022d39 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -25,7 +25,7 @@
 	// change that's not both forward and backward compatible.
 	// Min should be incremented whenever we want to remove
 	// support for old protocol versions.
-	SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion7}
+	SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion8}
 
 	// Export the methods on supportedRange.
 	Endpoint           = SupportedRange.Endpoint