ipc/stream: add idle timer to vc/vifs
https://github.com/veyron/release-issues/issues/847
As described in this issue, this CL
* Closes VC if there is no active user flow for X duration, and
* Closes VIF as soon as all VCs created on it have been closed.
The default timeout is 30 minutes for now.
Change-Id: Ibe5f4a22e566cfdd1c13b1d9eea28f352976c052
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 64f9d58..8d6e2f8 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -23,6 +23,7 @@
inaming "v.io/x/ref/profiles/internal/naming"
"v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/crypto"
+ "v.io/x/ref/profiles/internal/rpc/stream/vc"
"v.io/x/ref/profiles/internal/rpc/stream/vif"
"v.io/x/ref/profiles/internal/rpc/version"
)
@@ -33,6 +34,10 @@
errNoBlessingNames = errors.New("stream.ListenerOpts includes a principal but no blessing names could be extracted")
)
+const (
+ defaultIdleTimeout = 30 * time.Minute
+)
+
// InternalNew creates a new stream.Manager for managing streams where the local
// process is identified by the provided RoutingID.
//
@@ -85,11 +90,10 @@
var timeout time.Duration
for _, o := range opts {
switch v := o.(type) {
- case *DialTimeout:
+ case DialTimeout:
timeout = v.Duration
}
}
-
addr := remote.Addr()
network, address := addr.Network(), addr.String()
if vf := m.vifs.Find(network, address); vf != nil {
@@ -119,7 +123,7 @@
vRange = r
}
}
- vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, opts...)
+ vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, m.deleteVIF, opts...)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to create VIF: %v", err)
@@ -144,13 +148,12 @@
if err != nil {
return nil, err
}
- vc, err := vf.Dial(remote, principal, append(opts, m.sessionCache)...)
+ opts = append([]stream.VCOpt{m.sessionCache, vc.IdleTimeout{defaultIdleTimeout}}, opts...)
+ vc, err := vf.Dial(remote, principal, opts...)
if !retry || verror.ErrorID(err) != verror.ErrAborted.ID {
return vc, err
}
vf.Close()
- m.vifs.Delete(vf)
- vlog.VI(2).Infof("VIF %v is closed, removing from cache", vf)
}
return nil, verror.NewErrInternal(nil) // Not reached
}
@@ -224,6 +227,11 @@
return ln, ep, nil
}
+func (m *manager) deleteVIF(vf *vif.VIF) {
+ vlog.VI(2).Infof("%p: VIF %v is closed, removing from cache", m, vf)
+ m.vifs.Delete(vf)
+}
+
func (m *manager) ShutdownEndpoint(remote naming.Endpoint) {
vifs := m.vifs.List()
total := 0