ref/runtime/internal/flow/conn: Propery pass discharges.
I'm waiting to write tests until my prevoius change is in.
MultiPart: 1/2
Change-Id: I5adea06ea7c2ca41845b5a227f037b7b12d1de6c
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index afccf37..b9552ce 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -8,6 +8,7 @@
"crypto/rand"
"reflect"
"sync"
+ "time"
"golang.org/x/crypto/nacl/box"
"v.io/v23"
@@ -16,6 +17,7 @@
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/vom"
+ slib "v.io/x/ref/lib/security"
)
func (c *Conn) dialHandshake(ctx *context.T, versions version.RPCVersionRange) error {
@@ -41,18 +43,13 @@
// We only send our blessings if we are a server in addition to being a client.
// If we are a pure client, we only send our public key.
if c.handler != nil {
- bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
- if err != nil {
+ if lAuth.bkey, lAuth.dkey, err = c.refreshDischarges(ctx); err != nil {
return err
}
- lAuth.bkey, lAuth.dkey = bkey, dkey
} else {
lAuth.publicKey = c.lBlessings.PublicKey()
}
- if err = c.mp.writeMsg(ctx, lAuth); err != nil {
- return err
- }
- return err
+ return c.mp.writeMsg(ctx, lAuth)
}
func (c *Conn) acceptHandshake(ctx *context.T, versions version.RPCVersionRange) error {
@@ -66,16 +63,13 @@
if err != nil {
return err
}
- bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
- if err != nil {
+ lAuth := &auth{
+ channelBinding: signedBinding,
+ }
+ if lAuth.bkey, lAuth.dkey, err = c.refreshDischarges(ctx); err != nil {
return err
}
- err = c.mp.writeMsg(ctx, &auth{
- bkey: bkey,
- dkey: dkey,
- channelBinding: signedBinding,
- })
- if err != nil {
+ if err = c.mp.writeMsg(ctx, lAuth); err != nil {
return err
}
return c.readRemoteAuth(ctx, binding)
@@ -144,7 +138,7 @@
if rauth.bkey != 0 {
var err error
// TODO(mattr): Make sure we cancel out of this at some point.
- c.rBlessings, c.rDischarges, err = c.blessingsFlow.get(ctx, rauth.bkey, rauth.dkey)
+ c.rBlessings, _, err = c.blessingsFlow.get(ctx, rauth.bkey, rauth.dkey)
if err != nil {
return err
}
@@ -161,6 +155,45 @@
return nil
}
+func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
+ dis := slib.PrepareDischarges(ctx, c.lBlessings,
+ security.DischargeImpetus{}, time.Minute)
+ // Schedule the next update.
+ var timer *time.Timer
+ if dur, expires := minExpiryTime(c.lBlessings, dis); expires {
+ timer = time.AfterFunc(dur, func() {
+ c.refreshDischarges(ctx)
+ })
+ }
+ bkey, dkey, err = c.blessingsFlow.put(ctx, c.lBlessings, dis)
+ c.mu.Lock()
+ c.dischargeTimer = timer
+ c.mu.Unlock()
+ return
+}
+
+func minExpiryTime(blessings security.Blessings, discharges map[string]security.Discharge) (time.Duration, bool) {
+ var min time.Time
+ cavCount := len(blessings.ThirdPartyCaveats())
+ if cavCount == 0 {
+ return 0, false
+ }
+ for _, d := range discharges {
+ if exp := d.Expiry(); min.IsZero() || (!exp.IsZero() && exp.Before(min)) {
+ min = exp
+ }
+ }
+ if min.IsZero() && cavCount == len(discharges) {
+ return 0, false
+ }
+ now := time.Now()
+ d := min.Sub(now)
+ if d > time.Minute && cavCount > len(discharges) {
+ d = time.Minute
+ }
+ return d, true
+}
+
type blessingsFlow struct {
enc *vom.Encoder
dec *vom.Decoder
@@ -236,6 +269,20 @@
return security.Blessings{}, nil, NewErrBlessingsFlowClosed(ctx)
}
+func (b *blessingsFlow) getLatestDischarges(ctx *context.T, blessings security.Blessings) (map[string]security.Discharge, error) {
+ defer b.mu.Unlock()
+ b.mu.Lock()
+ buid := string(blessings.UniqueID())
+ for !b.closed {
+ element, has := b.byUID[buid]
+ if has {
+ return dischargeMap(element.Discharges), nil
+ }
+ b.cond.Wait()
+ }
+ return nil, NewErrBlessingsFlowClosed(ctx)
+}
+
func (b *blessingsFlow) readLoop(ctx *context.T, loopWG *sync.WaitGroup) {
defer loopWG.Done()
for {
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 8ebd51f..422eb73 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -6,38 +6,40 @@
import (
"testing"
+ "time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
+ "v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
- _ "v.io/x/ref/runtime/factories/fake"
+ vsecurity "v.io/x/ref/lib/security"
+ "v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/test/goroutines"
"v.io/x/ref/test/testutil"
)
-func checkBlessings(t *testing.T, df, af flow.Flow, db, ab security.Blessings) {
- msg, err := af.ReadMsg()
- if err != nil {
- t.Fatal(err)
+func checkBlessings(t *testing.T, got, want security.Blessings, gotd map[string]security.Discharge) {
+ if !got.Equivalent(want) {
+ t.Errorf("got: %#v wanted %#v", got, want)
}
- if string(msg) != "hello" {
- t.Fatalf("Got %s, wanted hello", string(msg))
- }
- if !af.LocalBlessings().Equivalent(ab) {
- t.Errorf("got: %#v wanted %#v", af.LocalBlessings(), ab)
- }
- if !af.RemoteBlessings().Equivalent(db) {
- t.Errorf("got: %#v wanted %#v", af.RemoteBlessings(), db)
- }
- if !df.LocalBlessings().Equivalent(db) {
- t.Errorf("got: %#v wanted %#v", df.LocalBlessings(), db)
- }
- if !df.RemoteBlessings().Equivalent(ab) {
- t.Errorf("got: %#v wanted %#v", df.RemoteBlessings(), ab)
+ tpid := got.ThirdPartyCaveats()[0].ThirdPartyDetails().ID()
+ if _, has := gotd[tpid]; !has {
+ t.Errorf("got: %#v wanted %s", gotd, tpid)
}
}
+
+func checkFlowBlessings(t *testing.T, df, af flow.Flow, db, ab security.Blessings) {
+ if msg, err := af.ReadMsg(); err != nil || string(msg) != "hello" {
+ t.Errorf("Got %s, %v wanted hello, nil", string(msg), err)
+ }
+ checkBlessings(t, af.LocalBlessings(), ab, af.LocalDischarges())
+ checkBlessings(t, af.RemoteBlessings(), db, af.RemoteDischarges())
+ checkBlessings(t, df.LocalBlessings(), db, df.LocalDischarges())
+ checkBlessings(t, df.RemoteBlessings(), ab, df.RemoteDischarges())
+}
+
func dialFlow(t *testing.T, ctx *context.T, dc *Conn, b security.Blessings) flow.Flow {
df, err := dc.Dial(ctx, makeBFP(b))
if err != nil {
@@ -49,15 +51,61 @@
return df
}
-func TestUnidirectional(t *testing.T) {
- defer goroutines.NoLeaks(t, leakWaitTime)()
-
- dctx, shutdown := v23.Init()
- defer shutdown()
- actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
+func BlessWithTPCaveat(t *testing.T, ctx *context.T, p security.Principal, s string) security.Blessings {
+ dp := v23.GetPrincipal(ctx)
+ expcav, err := security.NewExpiryCaveat(time.Now().Add(time.Hour))
if err != nil {
t.Fatal(err)
}
+ tpcav, err := security.NewPublicKeyCaveat(dp.PublicKey(), "discharge",
+ security.ThirdPartyRequirements{}, expcav)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b, err := testutil.IDProviderFromPrincipal(dp).NewBlessings(p, s, tpcav)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return b
+}
+
+func NewPrincipalWithTPCaveat(t *testing.T, ctx *context.T, s string) *context.T {
+ p := testutil.NewPrincipal()
+ vsecurity.SetDefaultBlessings(p, BlessWithTPCaveat(t, ctx, p, s))
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return ctx
+}
+
+type fakeDischargeClient struct {
+ p security.Principal
+}
+
+func (fc *fakeDischargeClient) Call(_ *context.T, _, _ string, inArgs, outArgs []interface{}, _ ...rpc.CallOpt) error {
+ expiry, err := security.NewExpiryCaveat(time.Now().Add(time.Minute))
+ if err != nil {
+ panic(err)
+ }
+ *(outArgs[0].(*security.Discharge)), err = fc.p.MintDischarge(
+ inArgs[0].(security.Caveat), expiry)
+ return err
+}
+func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
+ return nil, nil
+}
+func (fc *fakeDischargeClient) Close() {}
+
+func TestUnidirectional(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+
+ dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
+ actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
aflows := make(chan flow.Flow, 2)
dc, ac, _ := setupConns(t, dctx, actx, nil, aflows)
defer dc.Close(dctx, nil)
@@ -65,22 +113,19 @@
df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
af1 := <-aflows
- checkBlessings(t, df1, af1,
+ checkFlowBlessings(t, df1, af1,
v23.GetPrincipal(dctx).BlessingStore().Default(),
v23.GetPrincipal(actx).BlessingStore().Default())
- db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
- if err != nil {
- t.Fatal(err)
- }
+ db2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(dctx), "other")
df2 := dialFlow(t, dctx, dc, db2)
af2 := <-aflows
- checkBlessings(t, df2, af2, db2,
+ checkFlowBlessings(t, df2, af2, db2,
v23.GetPrincipal(actx).BlessingStore().Default())
// We should not be able to dial in the other direction, because that flow
// manager is not willing to accept flows.
- _, err = ac.Dial(actx, testBFP)
+ _, err := ac.Dial(actx, testBFP)
if verror.ErrorID(err) != ErrDialingNonServer.ID {
t.Errorf("got %v, wanted ErrDialingNonServer", err)
}
@@ -89,12 +134,12 @@
func TestBidirectional(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
- dctx, shutdown := v23.Init()
+ ctx, shutdown := v23.Init()
defer shutdown()
- actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
- if err != nil {
- t.Fatal(err)
- }
+ ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+
+ dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
+ actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
dflows := make(chan flow.Flow, 2)
aflows := make(chan flow.Flow, 2)
dc, ac, _ := setupConns(t, dctx, actx, dflows, aflows)
@@ -103,31 +148,25 @@
df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
af1 := <-aflows
- checkBlessings(t, df1, af1,
+ checkFlowBlessings(t, df1, af1,
v23.GetPrincipal(dctx).BlessingStore().Default(),
v23.GetPrincipal(actx).BlessingStore().Default())
- db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
- if err != nil {
- t.Fatal(err)
- }
+ db2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(dctx), "other")
df2 := dialFlow(t, dctx, dc, db2)
af2 := <-aflows
- checkBlessings(t, df2, af2, db2,
+ checkFlowBlessings(t, df2, af2, db2,
v23.GetPrincipal(actx).BlessingStore().Default())
af3 := dialFlow(t, actx, ac, v23.GetPrincipal(actx).BlessingStore().Default())
df3 := <-dflows
- checkBlessings(t, af3, df3,
+ checkFlowBlessings(t, af3, df3,
v23.GetPrincipal(actx).BlessingStore().Default(),
v23.GetPrincipal(dctx).BlessingStore().Default())
- ab2, err := v23.GetPrincipal(actx).BlessSelf("aother")
- if err != nil {
- t.Fatal(err)
- }
+ ab2 := BlessWithTPCaveat(t, ctx, v23.GetPrincipal(actx), "aother")
af4 := dialFlow(t, actx, ac, ab2)
df4 := <-dflows
- checkBlessings(t, af4, df4, ab2,
+ checkFlowBlessings(t, af4, df4, ab2,
v23.GetPrincipal(dctx).BlessingStore().Default())
}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 28f584d..156cfdc 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -7,6 +7,7 @@
import (
"reflect"
"sync"
+ "time"
"v.io/v23"
"v.io/v23/context"
@@ -50,20 +51,20 @@
// Conns are a multiplexing encrypted channels that can host Flows.
type Conn struct {
- fc *flowcontrol.FlowController
- mp *messagePipe
- handler FlowHandler
- version version.RPCVersion
- lBlessings, rBlessings security.Blessings
- rDischarges, lDischarges map[string]security.Discharge
- local, remote naming.Endpoint
- closed chan struct{}
- blessingsFlow *blessingsFlow
- loopWG sync.WaitGroup
+ fc *flowcontrol.FlowController
+ mp *messagePipe
+ handler FlowHandler
+ version version.RPCVersion
+ lBlessings, rBlessings security.Blessings
+ local, remote naming.Endpoint
+ closed chan struct{}
+ blessingsFlow *blessingsFlow
+ loopWG sync.WaitGroup
- mu sync.Mutex
- nextFid flowID
- flows map[flowID]*flw
+ mu sync.Mutex
+ nextFid flowID
+ flows map[flowID]*flw
+ dischargeTimer *time.Timer
}
// Ensure that *Conn implements flow.Conn.
@@ -127,11 +128,15 @@
if c.rBlessings.IsZero() {
return nil, NewErrDialingNonServer(ctx)
}
- blessings, err := fn(ctx, c.local, c.remote, c.rBlessings, c.rDischarges)
+ rDischarges, err := c.blessingsFlow.getLatestDischarges(ctx, c.rBlessings)
if err != nil {
return nil, err
}
- bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, nil)
+ blessings, discharges, err := fn(ctx, c.local, c.remote, c.rBlessings, rDischarges)
+ if err != nil {
+ return nil, err
+ }
+ bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, discharges)
if err != nil {
return nil, err
}
@@ -161,6 +166,10 @@
c.mu.Lock()
var flows map[flowID]*flw
flows, c.flows = c.flows, nil
+ if c.dischargeTimer != nil {
+ c.dischargeTimer.Stop()
+ c.dischargeTimer = nil
+ }
c.mu.Unlock()
if flows == nil {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index fd301bc..8b46b53 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -206,14 +206,17 @@
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) LocalDischarges() map[string]security.Discharge {
+ var discharges map[string]security.Discharge
+ var err error
if f.dialed {
- _, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
- if err != nil {
- f.conn.Close(f.ctx, err)
- }
- return discharges
+ _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ } else {
+ discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.lBlessings)
}
- return f.conn.lDischarges
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return discharges
}
// RemoteDischarges returns the discharges presented by the remote end of the
@@ -221,14 +224,17 @@
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) RemoteDischarges() map[string]security.Discharge {
+ var discharges map[string]security.Discharge
+ var err error
if !f.dialed {
- _, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
- if err != nil {
- f.conn.Close(f.ctx, err)
- }
- return discharges
+ _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ } else {
+ discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.rBlessings)
}
- return f.conn.rDischarges
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return discharges
}
// Conn returns the connection the flow is multiplexed on.
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 62799e9..dbb3537 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -6,6 +6,7 @@
import (
"testing"
+ "time"
"v.io/v23"
"v.io/v23/context"
@@ -13,6 +14,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+ securitylib "v.io/x/ref/lib/security"
"v.io/x/ref/runtime/internal/flow/flowtest"
)
@@ -80,8 +82,8 @@
localEndpoint, remoteEndpoint naming.Endpoint,
remoteBlessings security.Blessings,
remoteDischarges map[string]security.Discharge,
-) (security.Blessings, error) {
- return v23.GetPrincipal(ctx).BlessingStore().Default(), nil
+) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
func makeBFP(in security.Blessings) flow.BlessingsForPeer {
@@ -90,7 +92,9 @@
localEndpoint, remoteEndpoint naming.Endpoint,
remoteBlessings security.Blessings,
remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, error) {
- return in, nil
+ ) (security.Blessings, map[string]security.Discharge, error) {
+ dis := securitylib.PrepareDischarges(
+ ctx, in, security.DischargeImpetus{}, time.Minute)
+ return in, dis, nil
}
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 77d8c57..0402f15 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -41,8 +41,8 @@
localEndpoint, remoteEndpoint naming.Endpoint,
remoteBlessings security.Blessings,
remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, error) {
- return p.BlessingStore().Default(), nil
+ ) (security.Blessings, map[string]security.Discharge, error) {
+ return p.BlessingStore().Default(), nil, nil
}
eps := m.ListeningEndpoints()
if len(eps) == 0 {
@@ -82,8 +82,8 @@
localEndpoint, remoteEndpoint naming.Endpoint,
remoteBlessings security.Blessings,
remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, error) {
- return p.BlessingStore().Default(), nil
+ ) (security.Blessings, map[string]security.Discharge, error) {
+ return p.BlessingStore().Default(), nil, nil
}
eps := am.ListeningEndpoints()
if len(eps) == 0 {