ipc[/stream/vc]: Improvements to automatic discharge refreshing.

(1) DischargeExpiryBuffer is now configurable with an opt (not public API).
(2) Discharges will be used if they are not yet expired but their refreshing failed.

Change-Id: I42bcf119554c6b6524f17880c89a107a1a0b724b
diff --git a/profiles/internal/ipc/client.go b/profiles/internal/ipc/client.go
index 2a32fd4..8363bd4 100644
--- a/profiles/internal/ipc/client.go
+++ b/profiles/internal/ipc/client.go
@@ -124,7 +124,7 @@
 		ipNets:    ipNetworks(),
 		vcMap:     make(map[vcMapKey]*vcInfo),
 	}
-	c.dc = InternalNewDischargeClient(nil, c)
+	c.dc = InternalNewDischargeClient(nil, c, 0)
 	for _, opt := range opts {
 		// Collect all client opts that are also vc opts.
 		switch v := opt.(type) {
diff --git a/profiles/internal/ipc/discharges.go b/profiles/internal/ipc/discharges.go
index 3919a02..b45912a 100644
--- a/profiles/internal/ipc/discharges.go
+++ b/profiles/internal/ipc/discharges.go
@@ -22,16 +22,12 @@
 
 // discharger implements vc.DischargeClient.
 type dischargeClient struct {
-	c          ipc.Client
-	defaultCtx *context.T
-	cache      dischargeCache
+	c                     ipc.Client
+	defaultCtx            *context.T
+	cache                 dischargeCache
+	dischargeExpiryBuffer time.Duration
 }
 
-// TODO(suharshs): Should we make this configurable?
-// We make this shorter than the vc DischargeExpiryBuffer to ensure the discharges
-// are fetched when the VC needs them.
-const dischargeExpiryBuffer = vc.DischargeExpiryBuffer - (5 * time.Second)
-
 // InternalNewDischargeClient creates a vc.DischargeClient that will be used to
 // fetch discharges to support blessings presented to a remote process.
 //
@@ -39,11 +35,15 @@
 // PrepareDischarges call. This typically happens when fetching discharges on
 // behalf of a server accepting connections, i.e., before any notion of the
 // "context" of an API call has been established.
-func InternalNewDischargeClient(defaultCtx *context.T, client ipc.Client) vc.DischargeClient {
+// dischargeExpiryBuffer specifies how much before discharge expiration we should
+// refresh discharges.
+// Attempts will be made to refresh a discharge DischargeExpiryBuffer before they expire.
+func InternalNewDischargeClient(defaultCtx *context.T, client ipc.Client, dischargeExpiryBuffer time.Duration) vc.DischargeClient {
 	return &dischargeClient{
 		c:          client,
 		defaultCtx: defaultCtx,
 		cache:      dischargeCache{cache: make(map[string]security.Discharge)},
+		dischargeExpiryBuffer: dischargeExpiryBuffer,
 	}
 }
 
@@ -112,8 +112,7 @@
 		discharges := make(chan fetched, len(caveats))
 		want := 0
 		for i := range caveats {
-			if out[i] != nil {
-				// Already fetched
+			if !d.shouldFetchDischarge(out[i]) {
 				continue
 			}
 			want++
@@ -152,6 +151,17 @@
 	}
 }
 
+func (d *dischargeClient) shouldFetchDischarge(dis *security.Discharge) bool {
+	if dis == nil {
+		return true
+	}
+	expiry := dis.Expiry()
+	if expiry.IsZero() {
+		return false
+	}
+	return expiry.Before(time.Now().Add(d.dischargeExpiryBuffer))
+}
+
 // dischargeCache is a concurrency-safe cache for third party caveat discharges.
 // TODO(suharshs,ataly,ashankar): This should be keyed by filtered impetus as well.
 type dischargeCache struct {
@@ -183,8 +193,8 @@
 		}
 		if cached, exists := dcc.cache[caveats[i].ThirdPartyDetails().ID()]; exists {
 			out[i] = &cached
-			// If the discharge has expired purge it from the cache.
-			if !isDischargeUsable(out[i]) {
+			// If the discharge has expired, purge it from the cache.
+			if hasDischargeExpired(out[i]) {
 				out[i] = nil
 				delete(dcc.cache, cached.ID())
 				remaining++
@@ -197,14 +207,12 @@
 	return
 }
 
-// TODO(suharshs): Have PrepareDischarges try to fetch fresh discharges for the
-// discharges that are about to expire, but if they fail then return what is in the cache.
-func isDischargeUsable(dis *security.Discharge) bool {
+func hasDischargeExpired(dis *security.Discharge) bool {
 	expiry := dis.Expiry()
 	if expiry.IsZero() {
-		return true
+		return false
 	}
-	return expiry.After(time.Now().Add(dischargeExpiryBuffer))
+	return expiry.Before(time.Now())
 }
 
 func (dcc *dischargeCache) invalidate(discharges ...security.Discharge) {
diff --git a/profiles/internal/ipc/full_test.go b/profiles/internal/ipc/full_test.go
index 80a8be4..a6573ff 100644
--- a/profiles/internal/ipc/full_test.go
+++ b/profiles/internal/ipc/full_test.go
@@ -1803,7 +1803,7 @@
 		t.Fatalf("failed to create client: %v", err)
 	}
 	defer client.Close()
-	dc := InternalNewDischargeClient(ctx, client)
+	dc := InternalNewDischargeClient(ctx, client, 0)
 
 	// Fetch discharges for tpcav.
 	dis := dc.PrepareDischarges(nil, []security.Caveat{tpcav}, security.DischargeImpetus{})[0]
diff --git a/profiles/internal/ipc/server.go b/profiles/internal/ipc/server.go
index 48a318a..b4e52ee 100644
--- a/profiles/internal/ipc/server.go
+++ b/profiles/internal/ipc/server.go
@@ -170,8 +170,9 @@
 		stats:       newIPCStats(statsPrefix),
 	}
 	var (
-		principal security.Principal
-		blessings security.Blessings
+		principal             security.Principal
+		blessings             security.Blessings
+		dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
 	)
 	for _, opt := range opts {
 		switch opt := opt.(type) {
@@ -183,6 +184,8 @@
 				principal = opt.Principal
 			case options.ServerBlessings:
 				blessings = opt.Blessings
+			case vc.DischargeExpiryBuffer:
+				dischargeExpiryBuffer = time.Duration(opt)
 			}
 		case options.ServesMountTable:
 			s.servesMountTable = bool(opt)
@@ -192,7 +195,9 @@
 			s.preferredProtocols = []string(opt)
 		}
 	}
-	dc := InternalNewDischargeClient(ctx, client)
+	// Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
+	// the discharges by the time the VC asks for them.`
+	dc := InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
 	s.listenerOpts = append(s.listenerOpts, dc)
 	s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
 	blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
diff --git a/profiles/internal/ipc/stream/vc/vc.go b/profiles/internal/ipc/stream/vc/vc.go
index 9d6ce00..15cf5c2 100644
--- a/profiles/internal/ipc/stream/vc/vc.go
+++ b/profiles/internal/ipc/stream/vc/vc.go
@@ -35,8 +35,15 @@
 	errUnrecognizedFlow = errors.New("unrecognized flow")
 )
 
-// TODO(suharshs): Should we make this configurable?
-const DischargeExpiryBuffer = 20 * time.Second
+// DischargeExpiryBuffer specifies how much before discharge expiration we should
+// refresh discharges.
+// Discharges will be refreshed DischargeExpiryBuffer before they expire.
+type DischargeExpiryBuffer time.Duration
+
+func (DischargeExpiryBuffer) IPCStreamListenerOpt() {}
+func (DischargeExpiryBuffer) IPCServerOpt()         {}
+
+const DefaultServerDischargeExpiryBuffer = 20 * time.Second
 
 // VC implements the stream.VC interface and exports additional methods to
 // manage Flows.
@@ -507,6 +514,8 @@
 		securityLevel   options.VCSecurityLevel
 		dischargeClient DischargeClient
 		lBlessings      security.Blessings
+
+		dischargeExpiryBuffer = DefaultServerDischargeExpiryBuffer
 	)
 	for _, o := range opts {
 		switch v := o.(type) {
@@ -518,6 +527,8 @@
 			securityLevel = v
 		case options.ServerBlessings:
 			lBlessings = v.Blessings
+		case DischargeExpiryBuffer:
+			dischargeExpiryBuffer = time.Duration(v)
 		}
 	}
 	// If the listener was setup asynchronously, there is a race between
@@ -597,7 +608,7 @@
 		result <- HandshakeResult{ln, nil}
 
 		if len(lBlessings.ThirdPartyCaveats()) > 0 {
-			go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats())
+			go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats(), dischargeExpiryBuffer)
 		} else {
 			authConn.Close()
 		}
@@ -605,7 +616,7 @@
 	return result
 }
 
-func (vc *VC) sendDischargesLoop(conn io.WriteCloser, dc DischargeClient, tpCavs []security.Caveat) {
+func (vc *VC) sendDischargesLoop(conn io.WriteCloser, dc DischargeClient, tpCavs []security.Caveat, dischargeExpiryBuffer time.Duration) {
 	defer conn.Close()
 	if dc == nil {
 		return
@@ -618,7 +629,7 @@
 	discharges := dc.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
 	for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) {
 		select {
-		case <-time.After(fetchDuration(expiry)):
+		case <-time.After(fetchDuration(expiry, dischargeExpiryBuffer)):
 			discharges = dc.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
 			if err := enc.Encode(discharges); err != nil {
 				vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err)
@@ -631,10 +642,10 @@
 	}
 }
 
-func fetchDuration(expiry time.Time) time.Duration {
+func fetchDuration(expiry time.Time, buffer time.Duration) time.Duration {
 	// Fetch the discharge earlier than the actual expiry to factor in for clock
 	// skew and RPC time.
-	return expiry.Sub(time.Now().Add(DischargeExpiryBuffer))
+	return expiry.Sub(time.Now().Add(buffer))
 }
 
 func minExpiryTime(discharges []security.Discharge, tpCavs []security.Caveat) time.Time {
diff --git a/profiles/internal/rt/ipc_test.go b/profiles/internal/rt/ipc_test.go
index 559e7ca..cf36848 100644
--- a/profiles/internal/rt/ipc_test.go
+++ b/profiles/internal/rt/ipc_test.go
@@ -18,6 +18,7 @@
 	"v.io/x/ref/lib/testutil"
 	tsecurity "v.io/x/ref/lib/testutil/security"
 	_ "v.io/x/ref/profiles"
+	"v.io/x/ref/profiles/internal/ipc/stream/vc"
 )
 
 //go:generate v23 test generate
@@ -33,35 +34,6 @@
 	return nil
 }
 
-type dischargeService struct {
-	called int
-	mu     sync.Mutex
-}
-
-func (ds *dischargeService) Discharge(call ipc.StreamServerCall, cav security.Caveat, _ security.DischargeImpetus) (security.Discharge, error) {
-	tp := cav.ThirdPartyDetails()
-	if tp == nil {
-		return security.Discharge{}, fmt.Errorf("discharger: not a third party caveat (%v)", cav)
-	}
-	if err := tp.Dischargeable(call); err != nil {
-		return security.Discharge{}, fmt.Errorf("third-party caveat %v cannot be discharged for this context: %v", tp, err)
-	}
-	// If its the first time being called, add an expiry caveat and a MethodCaveat for "EchoBlessings".
-	// Otherwise, just add a MethodCaveat for "Foo".
-	ds.mu.Lock()
-	called := ds.called
-	ds.mu.Unlock()
-	caveats := []security.Caveat{mkCaveat(security.MethodCaveat("Foo"))}
-	if called == 0 {
-		caveats = []security.Caveat{
-			mkCaveat(security.MethodCaveat("EchoBlessings")),
-			mkCaveat(security.ExpiryCaveat(time.Now().Add(time.Second))),
-		}
-	}
-
-	return call.LocalPrincipal().MintDischarge(cav, caveats[0], caveats[1:]...)
-}
-
 func newCtx(rootCtx *context.T) *context.T {
 	ctx, err := v23.SetPrincipal(rootCtx, tsecurity.NewPrincipal("defaultBlessings"))
 	if err != nil {
@@ -106,8 +78,8 @@
 	return tpc
 }
 
-func startServer(ctx *context.T, s interface{}) (ipc.Server, string, error) {
-	server, err := v23.NewServer(ctx)
+func startServer(ctx *context.T, s interface{}, opts ...ipc.ServerOpt) (ipc.Server, string, error) {
+	server, err := v23.NewServer(ctx, opts...)
 	if err != nil {
 		return nil, "", err
 	}
@@ -210,6 +182,32 @@
 	}
 }
 
+type dischargeService struct {
+	called int
+	mu     sync.Mutex
+}
+
+func (ds *dischargeService) Discharge(call ipc.StreamServerCall, cav security.Caveat, _ security.DischargeImpetus) (security.Discharge, error) {
+	tp := cav.ThirdPartyDetails()
+	if tp == nil {
+		return security.Discharge{}, fmt.Errorf("discharger: not a third party caveat (%v)", cav)
+	}
+	if err := tp.Dischargeable(call); err != nil {
+		return security.Discharge{}, fmt.Errorf("third-party caveat %v cannot be discharged for this context: %v", tp, err)
+	}
+	// If its the first time being called, add an expiry caveat and a MethodCaveat for "EchoBlessings".
+	// Otherwise, just add a MethodCaveat for "Foo".
+	ds.mu.Lock()
+	called := ds.called
+	ds.mu.Unlock()
+	caveat := security.UnconstrainedUse()
+	if called == 0 {
+		caveat = mkCaveat(security.ExpiryCaveat(time.Now().Add(-1 * time.Second)))
+	}
+
+	return call.LocalPrincipal().MintDischarge(cav, caveat)
+}
+
 func TestServerDischarges(t *testing.T) {
 	ctx, shutdown := testutil.InitForTest()
 	defer shutdown()
@@ -236,7 +234,7 @@
 	if err := root.Bless(pserver, "server", mkThirdPartyCaveat(pdischarger.PublicKey(), dischargeServerName)); err != nil {
 		t.Fatal(err)
 	}
-	server, serverName, err := startServer(serverCtx, &testService{})
+	server, serverName, err := startServer(serverCtx, &testService{}, vc.DischargeExpiryBuffer(10*time.Millisecond))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -280,31 +278,19 @@
 		return nil
 	}
 
-	if err := makeCall(); err != nil {
-		t.Error(err)
+	if err := makeCall(); !verror.Is(err, verror.ErrNotTrusted.ID) {
+		t.Fatalf("got error %v, expected %v", err, verror.ErrNotTrusted.ID)
 	}
 	ds.mu.Lock()
 	ds.called++
 	ds.mu.Unlock()
-	// makeCall should eventually fail because the discharge will expire, and when it does
-	// it no longer allows calls to "EchoBlessings".
+	// makeCall should eventually succeed because a valid discharge should be refreshed.
 	start := time.Now()
-	for {
-		if time.Since(start) > time.Second {
-			t.Fatalf("Discharge no refreshed in 1 second")
+	for err := makeCall(); err != nil; time.Sleep(10 * time.Millisecond) {
+		if time.Since(start) > 10*time.Second {
+			t.Fatalf("Discharge not refreshed in 10 seconds")
 		}
-		if err := makeCall(); err == nil {
-			time.Sleep(10 * time.Millisecond)
-			continue
-		} else if !verror.Is(err, verror.ErrNotTrusted.ID) {
-			t.Fatalf("got error %v, expected %v", err, verror.ErrNotTrusted.ID)
-		}
-		break
-	}
-
-	// Discharge should now be refreshed and calls to "Foo" should succeed.
-	if _, err := client.StartCall(clientCtx, serverName, "Foo", nil, allowedServers); err != nil {
-		t.Errorf("client.StartCall should have succeeded: %v", err)
+		err = makeCall()
 	}
 
 	// Test that the client fails to talk to server that does not present appropriate discharges.