TBR vom: enable separate vom type stream
This change enables a separate vom type flow to support VC-level
VOM type cache.
* rpc micro benchmark shows 2X latency improvement.
* This is a part of issues/1221
(I will not submit this until I submit the idle timout CL, since
I may need to change this CL a little bit like TypeFlowID.)
MultiPart: 1/2
Change-Id: I28f707de78ebcffb321155fe907ce9aa7858088f
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index 9896aba..0dbad10 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -701,13 +701,26 @@
dc: dc,
}
var err error
- if fc.enc, err = vom.NewEncoder(flow); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
- return nil, fc.close(berr)
- }
- if fc.dec, err = vom.NewDecoder(flow); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
- return nil, fc.close(berr)
+ typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+ if typeenc == nil {
+ if fc.enc, err = vom.NewEncoder(flow); err != nil {
+ berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+ return nil, fc.close(berr)
+ }
+ if fc.dec, err = vom.NewDecoder(flow); err != nil {
+ berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+ return nil, fc.close(berr)
+ }
+ } else {
+ if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
+ berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+ return nil, fc.close(berr)
+ }
+ typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+ if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+ berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+ return nil, fc.close(berr)
+ }
}
return fc, nil
}
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index b5550a2..a4b286d 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -954,13 +954,26 @@
// as a security.Call.
fs.T = security.SetCall(fs.T, fs)
var err error
- if fs.dec, err = vom.NewDecoder(flow); err != nil {
- flow.Close()
- return nil, err
- }
- if fs.enc, err = vom.NewEncoder(flow); err != nil {
- flow.Close()
- return nil, err
+ typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+ if typedec == nil {
+ if fs.dec, err = vom.NewDecoder(flow); err != nil {
+ flow.Close()
+ return nil, err
+ }
+ if fs.enc, err = vom.NewEncoder(flow); err != nil {
+ flow.Close()
+ return nil, err
+ }
+ } else {
+ if fs.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+ flow.Close()
+ return nil, err
+ }
+ typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+ if fs.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
+ flow.Close()
+ return nil, err
+ }
}
return fs, nil
}
diff --git a/profiles/internal/rpc/stream/model.go b/profiles/internal/rpc/stream/model.go
index 409fd19..446fd86 100644
--- a/profiles/internal/rpc/stream/model.go
+++ b/profiles/internal/rpc/stream/model.go
@@ -57,6 +57,9 @@
// VCDataCache is a thread-safe store that allows data to be shared across a VC,
// with the intention of caching data that reappears over multiple flows.
type VCDataCache interface {
+ // Get returns the 'value' associated with 'key'.
+ Get(key interface{}) interface{}
+
// GetOrInsert returns the 'value' associated with 'key'. If an entry already exists in the
// cache with the 'key', the 'value' is returned, otherwise 'create' is called to create a new
// value N, the cache is updated, and N is returned. GetOrInsert may be called from
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 42f2339..3738181 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -747,8 +747,8 @@
}
}
-func (p *process) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
- return p.bq.NewWriter(packIDs(vci, fid), 0, vc.DefaultBytesBufferedPerFlow)
+func (p *process) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
+ return p.bq.NewWriter(packIDs(vci, fid), priority, vc.DefaultBytesBufferedPerFlow)
}
// Convenience functions to assist with the logging convention.
diff --git a/profiles/internal/rpc/stream/vc/data_cache.go b/profiles/internal/rpc/stream/vc/data_cache.go
index 74958a1..6c5b56c 100644
--- a/profiles/internal/rpc/stream/vc/data_cache.go
+++ b/profiles/internal/rpc/stream/vc/data_cache.go
@@ -18,6 +18,27 @@
return &dataCache{m: make(map[interface{}]interface{})}
}
+// Get returns the value stored under the key.
+func (c *dataCache) Get(key interface{}) interface{} {
+ c.RLock()
+ value, _ := c.m[key]
+ c.RUnlock()
+ return value
+}
+
+// Insert the given key and value into the cache if and only if the given key
+// did not already exist in the cache. Returns true if the key-value pair was
+// inserted; otherwise returns false.
+func (c *dataCache) Insert(key interface{}, value interface{}) bool {
+ c.Lock()
+ defer c.Unlock()
+ if _, exists := c.m[key]; exists {
+ return false
+ }
+ c.m[key] = value
+ return true
+}
+
// GetOrInsert first checks if the key exists in the cache with a reader lock.
// If it doesn't exist, it instead acquires a writer lock, creates and stores the new value
// with create and returns value.
diff --git a/profiles/internal/rpc/stream/vc/knobs.go b/profiles/internal/rpc/stream/vc/knobs.go
index 02450da..c2bdf5b 100644
--- a/profiles/internal/rpc/stream/vc/knobs.go
+++ b/profiles/internal/rpc/stream/vc/knobs.go
@@ -29,4 +29,6 @@
HandshakeFlowID = 1
// Special flow used for authenticating between VCs.
AuthFlowID = 2
+ // Special Flow ID used for interchanging of VOM types between VCs.
+ TypeFlowID = 3
)
diff --git a/profiles/internal/rpc/stream/vc/vc.go b/profiles/internal/rpc/stream/vc/vc.go
index 53f9b98..3774761 100644
--- a/profiles/internal/rpc/stream/vc/vc.go
+++ b/profiles/internal/rpc/stream/vc/vc.go
@@ -48,6 +48,10 @@
const DefaultServerDischargeExpiryBuffer = 20 * time.Second
+// DataCache Keys for TypeEncoder/Decoder.
+type TypeEncoderKey struct{}
+type TypeDecoderKey struct{}
+
// VC implements the stream.VC interface and exports additional methods to
// manage Flows.
//
@@ -139,9 +143,17 @@
// NewWriter creates a buffer queue for Write operations on the
// stream.Flow implementation.
- NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error)
+ NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error)
}
+// Priorities of flows.
+const (
+ systemFlowPriority bqueue.Priority = iota
+ normalFlowPriority
+
+ NumFlowPriorities
+)
+
// DischargeClient is an interface for obtaining discharges for a set of third-party
// caveats.
//
@@ -201,11 +213,11 @@
// Connect implements the stream.Connector.Connect method.
func (vc *VC) Connect(opts ...stream.FlowOpt) (stream.Flow, error) {
- return vc.connectFID(vc.allocFID(), opts...)
+ return vc.connectFID(vc.allocFID(), normalFlowPriority, opts...)
}
-func (vc *VC) connectFID(fid id.Flow, opts ...stream.FlowOpt) (stream.Flow, error) {
- writer, err := vc.newWriter(fid)
+func (vc *VC) connectFID(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.Flow, error) {
+ writer, err := vc.newWriter(fid, priority)
if err != nil {
return nil, fmt.Errorf("failed to create writer for Flow: %v", err)
}
@@ -296,7 +308,15 @@
if _, exists := vc.flowMap[fid]; exists {
return errDuplicateFlow
}
- writer, err := vc.newWriter(fid)
+ priority := normalFlowPriority
+ // We use the same high priority for all reserved flows including handshake and
+ // authentication flows. This is because client may open a new system flow before
+ // authentication finishes in server side and then vc.DispatchPayload() can be
+ // stuck in waiting for authentication to finish.
+ if fid < NumReservedFlows {
+ priority = systemFlowPriority
+ }
+ writer, err := vc.newWriter(fid, priority)
if err != nil {
return fmt.Errorf("failed to create writer for new flow(%d): %v", fid, err)
}
@@ -414,7 +434,7 @@
}
// Establish TLS
- handshakeConn, err := vc.connectFID(HandshakeFlowID)
+ handshakeConn, err := vc.connectFID(HandshakeFlowID, systemFlowPriority)
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
}
@@ -432,7 +452,7 @@
// This is not a problem when tls.Conn is used as intended (to wrap over a stream), but
// becomes a problem when shoehorning a block encrypter (Crypter interface) over this
// stream API.
- authConn, err := vc.connectFID(AuthFlowID)
+ authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for authentication: %v", err))
}
@@ -461,6 +481,11 @@
vc.remoteDischarges = rDischarges
vc.mu.Unlock()
+ // Open system flows.
+ if err = vc.connectSystemFlows(); err != nil {
+ return vc.err(fmt.Errorf("failed to connect system flows: %v", err))
+ }
+
vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
return nil
}
@@ -568,14 +593,20 @@
close(vc.acceptHandshakeDone)
vc.acceptHandshakeDone = nil
vc.mu.Unlock()
- vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
- result <- HandshakeResult{ln, nil}
if len(lBlessings.ThirdPartyCaveats()) > 0 {
go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats(), dischargeExpiryBuffer)
} else {
authConn.Close()
}
+
+ // Accept system flows.
+ if err = vc.acceptSystemFlows(ln); err != nil {
+ sendErr(fmt.Errorf("failed to accept system flows: %v", err))
+ }
+
+ vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
+ result <- HandshakeResult{ln, nil}
}()
return result
}
@@ -665,6 +696,52 @@
}
}
+func (vc *VC) connectSystemFlows() error {
+ if vc.version < version.RPCVersion8 {
+ return nil
+ }
+ conn, err := vc.connectFID(TypeFlowID, systemFlowPriority)
+ if err != nil {
+ return fmt.Errorf("fail to create a Flow for wire type: %v", err)
+ }
+ typeEnc, err := vom.NewTypeEncoder(conn)
+ if err != nil {
+ conn.Close()
+ return fmt.Errorf("failed to create type encoder: %v", err)
+ }
+ vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
+ typeDec, err := vom.NewTypeDecoder(conn)
+ if err != nil {
+ conn.Close()
+ return fmt.Errorf("failed to create type decoder: %v", err)
+ }
+ vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+ return nil
+}
+
+func (vc *VC) acceptSystemFlows(ln stream.Listener) error {
+ if vc.version < version.RPCVersion8 {
+ return nil
+ }
+ conn, err := ln.Accept()
+ if err != nil {
+ return fmt.Errorf("Flow for wire type not accepted: %v", err)
+ }
+ typeDec, err := vom.NewTypeDecoder(conn)
+ if err != nil {
+ conn.Close()
+ return fmt.Errorf("failed to create type decoder: %v", err)
+ }
+ vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+ typeEnc, err := vom.NewTypeEncoder(conn)
+ if err != nil {
+ conn.Close()
+ return fmt.Errorf("failed to create type encoder: %v", err)
+ }
+ vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
+ return nil
+}
+
// Encrypt uses the VC's encryption scheme to encrypt the provided data payload.
// Always takes ownership of plaintext.
func (vc *VC) Encrypt(fid id.Flow, plaintext *iobuf.Slice) (cipherslice *iobuf.Slice, err error) {
@@ -813,8 +890,8 @@
return strings.Join(l, "\n")
}
-func (vc *VC) newWriter(fid id.Flow) (*writer, error) {
- bq, err := vc.helper.NewWriter(vc.vci, fid)
+func (vc *VC) newWriter(fid id.Flow, priority bqueue.Priority) (*writer, error) {
+ bq, err := vc.helper.NewWriter(vc.vci, fid, priority)
if err != nil {
return nil, err
}
diff --git a/profiles/internal/rpc/stream/vc/vc_test.go b/profiles/internal/rpc/stream/vc/vc_test.go
index 126ad44..958b6ad 100644
--- a/profiles/internal/rpc/stream/vc/vc_test.go
+++ b/profiles/internal/rpc/stream/vc/vc_test.go
@@ -269,8 +269,8 @@
}
}
-func testConnect_Small(t *testing.T, securityLevel options.SecurityLevel) {
- h, vc, err := NewSimple(LatestVersion, securityLevel)
+func testConnect_Small(t *testing.T, version version.RPCVersion, securityLevel options.SecurityLevel) {
+ h, vc, err := NewSimple(version, securityLevel)
if err != nil {
t.Fatal(err)
}
@@ -281,8 +281,12 @@
}
testFlowEcho(t, flow, 10)
}
-func TestConnect_Small(t *testing.T) { testConnect_Small(t, SecurityNone) }
-func TestConnect_SmallTLS(t *testing.T) { testConnect_Small(t, SecurityTLS) }
+func TestConnect_Small(t *testing.T) { testConnect_Small(t, LatestVersion, SecurityNone) }
+func TestConnect_SmallTLS(t *testing.T) { testConnect_Small(t, LatestVersion, SecurityTLS) }
+func TestConnect_Small7(t *testing.T) { testConnect_Small(t, version.RPCVersion7, SecurityNone) }
+func TestConnect_Small7TLS(t *testing.T) { testConnect_Small(t, version.RPCVersion7, SecurityTLS) }
+func TestConnect_Small8(t *testing.T) { testConnect_Small(t, version.RPCVersion8, SecurityNone) }
+func TestConnect_Small8TLS(t *testing.T) { testConnect_Small(t, version.RPCVersion8, SecurityTLS) }
func testConnect(t *testing.T, securityLevel options.SecurityLevel) {
h, vc, err := NewSimple(LatestVersion, securityLevel)
@@ -585,8 +589,8 @@
}
}
-func (h *helper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
- return h.bq.NewWriter(bqueue.ID(fid), 0, DefaultBytesBufferedPerFlow)
+func (h *helper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
+ return h.bq.NewWriter(bqueue.ID(fid), priority, DefaultBytesBufferedPerFlow)
}
func (h *helper) Close() {
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 3f6701a..670721d 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -118,10 +118,13 @@
const (
// Priorities of the buffered queues used for flow control of writes.
expressPriority bqueue.Priority = iota
+ controlPriority
+ // The range of flow priorities is [flowPriority, flowPriority + NumFlowPriorities)
flowPriority
- normalPriority
- stopPriority
+ stopPriority = flowPriority + vc.NumFlowPriorities
+)
+const (
// Convenience aliases so that the package name "vc" does not
// conflict with the variables named "vc".
defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
@@ -202,7 +205,7 @@
}
expressQ.Release(-1) // Disable flow control
- flowQ, err := outgoing.NewWriter(flowID, flowPriority, flowToken.Size())
+ flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
if err != nil {
return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
}
@@ -964,9 +967,9 @@
h.vif.flowQ.TryPut(flowToken)
}
-func (h vcHelper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
+func (h vcHelper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
h.vif.idleTimerMap.InsertFlow(vci, fid)
- return h.vif.outgoing.NewWriter(packIDs(vci, fid), normalPriority, defaultBytesBufferedPerFlow)
+ return h.vif.outgoing.NewWriter(packIDs(vci, fid), flowPriority+priority, defaultBytesBufferedPerFlow)
}
// The token added to vif.flowQ.
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 4ea452e..f022d39 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -25,7 +25,7 @@
// change that's not both forward and backward compatible.
// Min should be incremented whenever we want to remove
// support for old protocol versions.
- SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion7}
+ SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion8}
// Export the methods on supportedRange.
Endpoint = SupportedRange.Endpoint