ref/runtime/internal/flow/conn: Propery pass discharges.

I'm waiting to write tests until my prevoius change is in.

MultiPart: 1/2
Change-Id: I5adea06ea7c2ca41845b5a227f037b7b12d1de6c
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index afccf37..b9552ce 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -8,6 +8,7 @@
 	"crypto/rand"
 	"reflect"
 	"sync"
+	"time"
 
 	"golang.org/x/crypto/nacl/box"
 	"v.io/v23"
@@ -16,6 +17,7 @@
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
 	"v.io/v23/vom"
+	slib "v.io/x/ref/lib/security"
 )
 
 func (c *Conn) dialHandshake(ctx *context.T, versions version.RPCVersionRange) error {
@@ -41,18 +43,13 @@
 	// We only send our blessings if we are a server in addition to being a client.
 	// If we are a pure client, we only send our public key.
 	if c.handler != nil {
-		bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
-		if err != nil {
+		if lAuth.bkey, lAuth.dkey, err = c.refreshDischarges(ctx); err != nil {
 			return err
 		}
-		lAuth.bkey, lAuth.dkey = bkey, dkey
 	} else {
 		lAuth.publicKey = c.lBlessings.PublicKey()
 	}
-	if err = c.mp.writeMsg(ctx, lAuth); err != nil {
-		return err
-	}
-	return err
+	return c.mp.writeMsg(ctx, lAuth)
 }
 
 func (c *Conn) acceptHandshake(ctx *context.T, versions version.RPCVersionRange) error {
@@ -66,16 +63,13 @@
 	if err != nil {
 		return err
 	}
-	bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
-	if err != nil {
+	lAuth := &auth{
+		channelBinding: signedBinding,
+	}
+	if lAuth.bkey, lAuth.dkey, err = c.refreshDischarges(ctx); err != nil {
 		return err
 	}
-	err = c.mp.writeMsg(ctx, &auth{
-		bkey:           bkey,
-		dkey:           dkey,
-		channelBinding: signedBinding,
-	})
-	if err != nil {
+	if err = c.mp.writeMsg(ctx, lAuth); err != nil {
 		return err
 	}
 	return c.readRemoteAuth(ctx, binding)
@@ -144,7 +138,7 @@
 	if rauth.bkey != 0 {
 		var err error
 		// TODO(mattr): Make sure we cancel out of this at some point.
-		c.rBlessings, c.rDischarges, err = c.blessingsFlow.get(ctx, rauth.bkey, rauth.dkey)
+		c.rBlessings, _, err = c.blessingsFlow.get(ctx, rauth.bkey, rauth.dkey)
 		if err != nil {
 			return err
 		}
@@ -161,6 +155,45 @@
 	return nil
 }
 
+func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
+	dis := slib.PrepareDischarges(ctx, c.lBlessings,
+		security.DischargeImpetus{}, time.Minute)
+	// Schedule the next update.
+	var timer *time.Timer
+	if dur, expires := minExpiryTime(c.lBlessings, dis); expires {
+		timer = time.AfterFunc(dur, func() {
+			c.refreshDischarges(ctx)
+		})
+	}
+	bkey, dkey, err = c.blessingsFlow.put(ctx, c.lBlessings, dis)
+	c.mu.Lock()
+	c.dischargeTimer = timer
+	c.mu.Unlock()
+	return
+}
+
+func minExpiryTime(blessings security.Blessings, discharges map[string]security.Discharge) (time.Duration, bool) {
+	var min time.Time
+	cavCount := len(blessings.ThirdPartyCaveats())
+	if cavCount == 0 {
+		return 0, false
+	}
+	for _, d := range discharges {
+		if exp := d.Expiry(); min.IsZero() || (!exp.IsZero() && exp.Before(min)) {
+			min = exp
+		}
+	}
+	if min.IsZero() && cavCount == len(discharges) {
+		return 0, false
+	}
+	now := time.Now()
+	d := min.Sub(now)
+	if d > time.Minute && cavCount > len(discharges) {
+		d = time.Minute
+	}
+	return d, true
+}
+
 type blessingsFlow struct {
 	enc *vom.Encoder
 	dec *vom.Decoder
@@ -236,6 +269,20 @@
 	return security.Blessings{}, nil, NewErrBlessingsFlowClosed(ctx)
 }
 
+func (b *blessingsFlow) getLatestDischarges(ctx *context.T, blessings security.Blessings) (map[string]security.Discharge, error) {
+	defer b.mu.Unlock()
+	b.mu.Lock()
+	buid := string(blessings.UniqueID())
+	for !b.closed {
+		element, has := b.byUID[buid]
+		if has {
+			return dischargeMap(element.Discharges), nil
+		}
+		b.cond.Wait()
+	}
+	return nil, NewErrBlessingsFlowClosed(ctx)
+}
+
 func (b *blessingsFlow) readLoop(ctx *context.T, loopWG *sync.WaitGroup) {
 	defer loopWG.Done()
 	for {
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 8ebd51f..422eb73 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -6,38 +6,40 @@
 
 import (
 	"testing"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
+	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
-	_ "v.io/x/ref/runtime/factories/fake"
+	vsecurity "v.io/x/ref/lib/security"
+	"v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/test/goroutines"
 	"v.io/x/ref/test/testutil"
 )
 
-func checkBlessings(t *testing.T, df, af flow.Flow, db, ab security.Blessings) {
-	msg, err := af.ReadMsg()
-	if err != nil {
-		t.Fatal(err)
+func checkBlessings(t *testing.T, got, want security.Blessings, gotd map[string]security.Discharge) {
+	if !got.Equivalent(want) {
+		t.Errorf("got: %#v wanted %#v", got, want)
 	}
-	if string(msg) != "hello" {
-		t.Fatalf("Got %s, wanted hello", string(msg))
-	}
-	if !af.LocalBlessings().Equivalent(ab) {
-		t.Errorf("got: %#v wanted %#v", af.LocalBlessings(), ab)
-	}
-	if !af.RemoteBlessings().Equivalent(db) {
-		t.Errorf("got: %#v wanted %#v", af.RemoteBlessings(), db)
-	}
-	if !df.LocalBlessings().Equivalent(db) {
-		t.Errorf("got: %#v wanted %#v", df.LocalBlessings(), db)
-	}
-	if !df.RemoteBlessings().Equivalent(ab) {
-		t.Errorf("got: %#v wanted %#v", df.RemoteBlessings(), ab)
+	tpid := got.ThirdPartyCaveats()[0].ThirdPartyDetails().ID()
+	if _, has := gotd[tpid]; !has {
+		t.Errorf("got: %#v wanted %s", gotd, tpid)
 	}
 }
+
+func checkFlowBlessings(t *testing.T, df, af flow.Flow, db, ab security.Blessings) {
+	if msg, err := af.ReadMsg(); err != nil || string(msg) != "hello" {
+		t.Errorf("Got %s, %v wanted hello, nil", string(msg), err)
+	}
+	checkBlessings(t, af.LocalBlessings(), ab, af.LocalDischarges())
+	checkBlessings(t, af.RemoteBlessings(), db, af.RemoteDischarges())
+	checkBlessings(t, df.LocalBlessings(), db, df.LocalDischarges())
+	checkBlessings(t, df.RemoteBlessings(), ab, df.RemoteDischarges())
+}
+
 func dialFlow(t *testing.T, ctx *context.T, dc *Conn, b security.Blessings) flow.Flow {
 	df, err := dc.Dial(ctx, makeBFP(b))
 	if err != nil {
@@ -49,15 +51,61 @@
 	return df
 }
 
-func TestUnidirectional(t *testing.T) {
-	defer goroutines.NoLeaks(t, leakWaitTime)()
-
-	dctx, shutdown := v23.Init()
-	defer shutdown()
-	actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
+func BlessWithTPCaveat(t *testing.T, ctx *context.T, p security.Principal, s string) security.Blessings {
+	dp := v23.GetPrincipal(ctx)
+	expcav, err := security.NewExpiryCaveat(time.Now().Add(time.Hour))
 	if err != nil {
 		t.Fatal(err)
 	}
+	tpcav, err := security.NewPublicKeyCaveat(dp.PublicKey(), "discharge",
+		security.ThirdPartyRequirements{}, expcav)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b, err := testutil.IDProviderFromPrincipal(dp).NewBlessings(p, s, tpcav)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return b
+}
+
+func NewPrincipalWithTPCaveat(t *testing.T, ctx *context.T, s string) *context.T {
+	p := testutil.NewPrincipal()
+	vsecurity.SetDefaultBlessings(p, BlessWithTPCaveat(t, ctx, p, s))
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return ctx
+}
+
+type fakeDischargeClient struct {
+	p security.Principal
+}
+
+func (fc *fakeDischargeClient) Call(_ *context.T, _, _ string, inArgs, outArgs []interface{}, _ ...rpc.CallOpt) error {
+	expiry, err := security.NewExpiryCaveat(time.Now().Add(time.Minute))
+	if err != nil {
+		panic(err)
+	}
+	*(outArgs[0].(*security.Discharge)), err = fc.p.MintDischarge(
+		inArgs[0].(security.Caveat), expiry)
+	return err
+}
+func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
+	return nil, nil
+}
+func (fc *fakeDischargeClient) Close() {}
+
+func TestUnidirectional(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+
+	dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
+	actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
 	aflows := make(chan flow.Flow, 2)
 	dc, ac, _ := setupConns(t, dctx, actx, nil, aflows)
 	defer dc.Close(dctx, nil)
@@ -65,22 +113,19 @@
 
 	df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
 	af1 := <-aflows
-	checkBlessings(t, df1, af1,
+	checkFlowBlessings(t, df1, af1,
 		v23.GetPrincipal(dctx).BlessingStore().Default(),
 		v23.GetPrincipal(actx).BlessingStore().Default())
 
-	db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
-	if err != nil {
-		t.Fatal(err)
-	}
+	db2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(dctx), "other")
 	df2 := dialFlow(t, dctx, dc, db2)
 	af2 := <-aflows
-	checkBlessings(t, df2, af2, db2,
+	checkFlowBlessings(t, df2, af2, db2,
 		v23.GetPrincipal(actx).BlessingStore().Default())
 
 	// We should not be able to dial in the other direction, because that flow
 	// manager is not willing to accept flows.
-	_, err = ac.Dial(actx, testBFP)
+	_, err := ac.Dial(actx, testBFP)
 	if verror.ErrorID(err) != ErrDialingNonServer.ID {
 		t.Errorf("got %v, wanted ErrDialingNonServer", err)
 	}
@@ -89,12 +134,12 @@
 func TestBidirectional(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 
-	dctx, shutdown := v23.Init()
+	ctx, shutdown := v23.Init()
 	defer shutdown()
-	actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
-	if err != nil {
-		t.Fatal(err)
-	}
+	ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+
+	dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
+	actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
 	dflows := make(chan flow.Flow, 2)
 	aflows := make(chan flow.Flow, 2)
 	dc, ac, _ := setupConns(t, dctx, actx, dflows, aflows)
@@ -103,31 +148,25 @@
 
 	df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
 	af1 := <-aflows
-	checkBlessings(t, df1, af1,
+	checkFlowBlessings(t, df1, af1,
 		v23.GetPrincipal(dctx).BlessingStore().Default(),
 		v23.GetPrincipal(actx).BlessingStore().Default())
 
-	db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
-	if err != nil {
-		t.Fatal(err)
-	}
+	db2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(dctx), "other")
 	df2 := dialFlow(t, dctx, dc, db2)
 	af2 := <-aflows
-	checkBlessings(t, df2, af2, db2,
+	checkFlowBlessings(t, df2, af2, db2,
 		v23.GetPrincipal(actx).BlessingStore().Default())
 
 	af3 := dialFlow(t, actx, ac, v23.GetPrincipal(actx).BlessingStore().Default())
 	df3 := <-dflows
-	checkBlessings(t, af3, df3,
+	checkFlowBlessings(t, af3, df3,
 		v23.GetPrincipal(actx).BlessingStore().Default(),
 		v23.GetPrincipal(dctx).BlessingStore().Default())
 
-	ab2, err := v23.GetPrincipal(actx).BlessSelf("aother")
-	if err != nil {
-		t.Fatal(err)
-	}
+	ab2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(actx), "aother")
 	af4 := dialFlow(t, actx, ac, ab2)
 	df4 := <-dflows
-	checkBlessings(t, af4, df4, ab2,
+	checkFlowBlessings(t, af4, df4, ab2,
 		v23.GetPrincipal(dctx).BlessingStore().Default())
 }
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 28f584d..156cfdc 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -7,6 +7,7 @@
 import (
 	"reflect"
 	"sync"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -50,20 +51,20 @@
 
 // Conns are a multiplexing encrypted channels that can host Flows.
 type Conn struct {
-	fc                       *flowcontrol.FlowController
-	mp                       *messagePipe
-	handler                  FlowHandler
-	version                  version.RPCVersion
-	lBlessings, rBlessings   security.Blessings
-	rDischarges, lDischarges map[string]security.Discharge
-	local, remote            naming.Endpoint
-	closed                   chan struct{}
-	blessingsFlow            *blessingsFlow
-	loopWG                   sync.WaitGroup
+	fc                     *flowcontrol.FlowController
+	mp                     *messagePipe
+	handler                FlowHandler
+	version                version.RPCVersion
+	lBlessings, rBlessings security.Blessings
+	local, remote          naming.Endpoint
+	closed                 chan struct{}
+	blessingsFlow          *blessingsFlow
+	loopWG                 sync.WaitGroup
 
-	mu      sync.Mutex
-	nextFid flowID
-	flows   map[flowID]*flw
+	mu             sync.Mutex
+	nextFid        flowID
+	flows          map[flowID]*flw
+	dischargeTimer *time.Timer
 }
 
 // Ensure that *Conn implements flow.Conn.
@@ -127,11 +128,15 @@
 	if c.rBlessings.IsZero() {
 		return nil, NewErrDialingNonServer(ctx)
 	}
-	blessings, err := fn(ctx, c.local, c.remote, c.rBlessings, c.rDischarges)
+	rDischarges, err := c.blessingsFlow.getLatestDischarges(ctx, c.rBlessings)
 	if err != nil {
 		return nil, err
 	}
-	bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, nil)
+	blessings, discharges, err := fn(ctx, c.local, c.remote, c.rBlessings, rDischarges)
+	if err != nil {
+		return nil, err
+	}
+	bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, discharges)
 	if err != nil {
 		return nil, err
 	}
@@ -161,6 +166,10 @@
 	c.mu.Lock()
 	var flows map[flowID]*flw
 	flows, c.flows = c.flows, nil
+	if c.dischargeTimer != nil {
+		c.dischargeTimer.Stop()
+		c.dischargeTimer = nil
+	}
 	c.mu.Unlock()
 
 	if flows == nil {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index fd301bc..8b46b53 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -206,14 +206,17 @@
 //
 // Discharges are organized in a map keyed by the discharge-identifier.
 func (f *flw) LocalDischarges() map[string]security.Discharge {
+	var discharges map[string]security.Discharge
+	var err error
 	if f.dialed {
-		_, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
-		if err != nil {
-			f.conn.Close(f.ctx, err)
-		}
-		return discharges
+		_, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+	} else {
+		discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.lBlessings)
 	}
-	return f.conn.lDischarges
+	if err != nil {
+		f.conn.Close(f.ctx, err)
+	}
+	return discharges
 }
 
 // RemoteDischarges returns the discharges presented by the remote end of the
@@ -221,14 +224,17 @@
 //
 // Discharges are organized in a map keyed by the discharge-identifier.
 func (f *flw) RemoteDischarges() map[string]security.Discharge {
+	var discharges map[string]security.Discharge
+	var err error
 	if !f.dialed {
-		_, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
-		if err != nil {
-			f.conn.Close(f.ctx, err)
-		}
-		return discharges
+		_, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+	} else {
+		discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.rBlessings)
 	}
-	return f.conn.rDischarges
+	if err != nil {
+		f.conn.Close(f.ctx, err)
+	}
+	return discharges
 }
 
 // Conn returns the connection the flow is multiplexed on.
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 62799e9..dbb3537 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"testing"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -13,6 +14,7 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
+	securitylib "v.io/x/ref/lib/security"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 )
 
@@ -80,8 +82,8 @@
 	localEndpoint, remoteEndpoint naming.Endpoint,
 	remoteBlessings security.Blessings,
 	remoteDischarges map[string]security.Discharge,
-) (security.Blessings, error) {
-	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil
+) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
 }
 
 func makeBFP(in security.Blessings) flow.BlessingsForPeer {
@@ -90,7 +92,9 @@
 		localEndpoint, remoteEndpoint naming.Endpoint,
 		remoteBlessings security.Blessings,
 		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, error) {
-		return in, nil
+	) (security.Blessings, map[string]security.Discharge, error) {
+		dis := securitylib.PrepareDischarges(
+			ctx, in, security.DischargeImpetus{}, time.Minute)
+		return in, dis, nil
 	}
 }
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 77d8c57..0402f15 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -41,8 +41,8 @@
 		localEndpoint, remoteEndpoint naming.Endpoint,
 		remoteBlessings security.Blessings,
 		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, error) {
-		return p.BlessingStore().Default(), nil
+	) (security.Blessings, map[string]security.Discharge, error) {
+		return p.BlessingStore().Default(), nil, nil
 	}
 	eps := m.ListeningEndpoints()
 	if len(eps) == 0 {
@@ -82,8 +82,8 @@
 		localEndpoint, remoteEndpoint naming.Endpoint,
 		remoteBlessings security.Blessings,
 		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, error) {
-		return p.BlessingStore().Default(), nil
+	) (security.Blessings, map[string]security.Discharge, error) {
+		return p.BlessingStore().Default(), nil, nil
 	}
 	eps := am.ListeningEndpoints()
 	if len(eps) == 0 {