Merge "veyron/runtimes/google/ipc: Fix a race in the test."
diff --git a/lib/testutil/init.go b/lib/testutil/init.go
index c8aaf5d..794ec5a 100644
--- a/lib/testutil/init.go
+++ b/lib/testutil/init.go
@@ -14,6 +14,7 @@
"strconv"
"sync"
"time"
+
"veyron.io/veyron/veyron2/vlog"
)
@@ -47,6 +48,7 @@
}
var Rand *Random
+var once sync.Once
// Init sets up state for running tests: Adjusting GOMAXPROCS,
// configuring the vlog logging library, setting up the random number generator
@@ -56,28 +58,32 @@
// flags. Thus, it is NOT a good idea to call this from the init() function
// of any module except "main" or _test.go files.
func Init() {
- if os.Getenv("GOMAXPROCS") == "" {
- // Set the number of logical processors to the number of CPUs,
- // if GOMAXPROCS is not set in the environment.
- runtime.GOMAXPROCS(runtime.NumCPU())
- }
- // At this point all of the flags that we're going to use for
- // tests must be defined.
- // This will be the case if this is called from the init()
- // function of a _test.go file.
- flag.Parse()
- vlog.ConfigureLibraryLoggerFromFlags()
- // Initialize pseudo-random number generator.
- seed := time.Now().UnixNano()
- seedString := os.Getenv(SeedEnv)
- if seedString != "" {
- var err error
- base, bitSize := 0, 64
- seed, err = strconv.ParseInt(seedString, base, bitSize)
- if err != nil {
- vlog.Fatalf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err)
+ init := func() {
+ if os.Getenv("GOMAXPROCS") == "" {
+ // Set the number of logical processors to the number of CPUs,
+ // if GOMAXPROCS is not set in the environment.
+ runtime.GOMAXPROCS(runtime.NumCPU())
}
+ // At this point all of the flags that we're going to use for
+ // tests must be defined.
+ // This will be the case if this is called from the init()
+ // function of a _test.go file.
+ flag.Parse()
+ vlog.ConfigureLibraryLoggerFromFlags()
+
+ // Initialize pseudo-random number generator.
+ seed := time.Now().UnixNano()
+ seedString := os.Getenv(SeedEnv)
+ if seedString != "" {
+ var err error
+ base, bitSize := 0, 64
+ seed, err = strconv.ParseInt(seedString, base, bitSize)
+ if err != nil {
+ vlog.Fatalf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err)
+ }
+ }
+ vlog.Infof("Seeding pseudo-random number generator with %v", seed)
+ Rand = &Random{rand: rand.New(rand.NewSource(seed))}
}
- vlog.Infof("Seeding pseudo-random number generator with %v", seed)
- Rand = &Random{rand: rand.New(rand.NewSource(seed))}
+ once.Do(init)
}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e1cc0ae..a246e1f 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -13,7 +13,7 @@
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
- "veyron.io/veyron/veyron/runtimes/google/vtrace"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/i18n"
@@ -27,6 +27,7 @@
verror "veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
+ "veyron.io/veyron/veyron2/vtrace"
)
const pkgPath = "veyron.io/veyron/veyron/runtimes/google/ipc"
@@ -154,7 +155,7 @@
if noDischarges {
vcOpts = append(vcOpts, vc.NoDischarges{})
}
- vc, err := sm.Dial(ep, vcOpts...)
+ vc, err := sm.Dial(ep, append(vcOpts, vc.DialContext{ctx})...)
c.vcMapMu.Lock()
if err != nil {
if strings.Contains(err.Error(), "authentication failed") {
@@ -282,7 +283,7 @@
if ctx == nil {
return nil, verror.ExplicitMake(verror.BadArg, i18n.NoLangID, "ipc.Client", "StartCall")
}
- ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
+ ctx, span := ivtrace.WithNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
ctx = verror.ContextWithComponentName(ctx, "ipc.Client")
// Context specified deadline.
@@ -327,6 +328,10 @@
func (c *client) tryServer(ctx context.T, index int, server string, ch chan<- *serverStatus, noDischarges bool) {
status := &serverStatus{index: index}
var err verror.E
+ var span vtrace.Span
+ ctx, span = ivtrace.WithNewSpan(ctx, "<client>connectFlow")
+ span.Annotatef("address:%v", server)
+ defer span.Finish()
if status.flow, status.suffix, err = c.connectFlow(ctx, server, noDischarges); err != nil {
vlog.VI(2).Infof("ipc: err: %s", err)
status.err = err
@@ -631,7 +636,7 @@
// Fetch any discharges for third-party caveats on the client's blessings
// if this client owns a discharge-client.
if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
- fc.discharges = fc.dc.PrepareDischarges(self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
+ fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
}
req := ipc.Request{
Suffix: suffix,
@@ -640,7 +645,7 @@
Timeout: int64(timeout),
GrantedBlessings: security.MarshalBlessings(blessings),
NumDischarges: uint64(len(fc.discharges)),
- TraceRequest: vtrace.Request(fc.ctx),
+ TraceRequest: ivtrace.Request(fc.ctx),
}
if err := fc.enc.Encode(req); err != nil {
return fc.close(badProtocol(fc.ctx, verror.Make(errRequestEncoding, fc.ctx, req, err)))
@@ -738,7 +743,7 @@
func (fc *flowClient) Finish(resultptrs ...interface{}) error {
defer vlog.LogCall()()
err := fc.finish(resultptrs...)
- vtrace.FromContext(fc.ctx).Finish()
+ ivtrace.FromContext(fc.ctx).Finish()
return err
}
@@ -781,7 +786,7 @@
}
// Incorporate any VTrace info that was returned.
- vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
+ ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
if fc.response.Error != nil {
// TODO(cnicolaou): remove verror.NoAccess with verror version
@@ -811,7 +816,7 @@
func (fc *flowClient) Cancel() {
defer vlog.LogCall()()
- vtrace.FromContext(fc.ctx).Annotate("Cancelled")
+ ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
fc.flow.Cancel()
}
diff --git a/runtimes/google/ipc/discharges.go b/runtimes/google/ipc/discharges.go
index 53ce00a..37bb62d 100644
--- a/runtimes/google/ipc/discharges.go
+++ b/runtimes/google/ipc/discharges.go
@@ -1,9 +1,11 @@
package ipc
import (
+ "fmt"
"sync"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
@@ -12,24 +14,35 @@
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vdl/vdlutil"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vtrace"
)
// discharger implements vc.DischargeClient.
type dischargeClient struct {
- c ipc.Client
- ctx context.T
- cache dischargeCache
+ c ipc.Client
+ defaultCtx context.T
+ cache dischargeCache
}
-func InternalNewDischargeClient(streamMgr stream.Manager, ns naming.Namespace, ctx context.T, opts ...ipc.ClientOpt) (vc.DischargeClient, error) {
+// InternalNewDischargeClient creates a vc.DischargeClient that will be used to
+// fetch discharges to support blessings presented to a remote process.
+//
+// defaultCtx is the context used when none (nil) is explicitly provided to the
+// PrepareDischarges call. This typically happens when fetching discharges on
+// behalf of a server accepting connections, i.e., before any notion of the
+// "context" of an API call has been established.
+func InternalNewDischargeClient(streamMgr stream.Manager, ns naming.Namespace, defaultCtx context.T, opts ...ipc.ClientOpt) (vc.DischargeClient, error) {
+ if defaultCtx == nil {
+ return nil, fmt.Errorf("must provide a non-nil context to InternalNewDischargeClient")
+ }
c, err := InternalNewClient(streamMgr, ns, opts...)
if err != nil {
return nil, err
}
return &dischargeClient{
- c: c,
- ctx: ctx,
- cache: dischargeCache{cache: make(map[string]security.Discharge)},
+ c: c,
+ defaultCtx: defaultCtx,
+ cache: dischargeCache{cache: make(map[string]security.Discharge)},
}, nil
}
@@ -43,7 +56,7 @@
// options, or requested from the discharge issuer indicated on the caveat.
// Note that requesting a discharge is an ipc call, so one copy of this
// function must be able to successfully terminate while another is blocked.
-func (d *dischargeClient) PrepareDischarges(forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) (ret []security.Discharge) {
+func (d *dischargeClient) PrepareDischarges(ctx context.T, forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) (ret []security.Discharge) {
if len(forcaveats) == 0 {
return
}
@@ -59,7 +72,16 @@
// Fetch discharges for caveats for which no discharges were found
// in the cache.
- d.fetchDischarges(d.ctx, caveats, impetus, discharges)
+ if ctx == nil {
+ ctx = d.defaultCtx
+ }
+ if ctx != nil {
+ var span vtrace.Span
+ ctx, span = ivtrace.WithNewSpan(ctx, "Fetching Discharges")
+ defer span.Finish()
+ }
+
+ d.fetchDischarges(ctx, caveats, impetus, discharges)
for _, d := range discharges {
if d != nil {
ret = append(ret, d)
@@ -89,7 +111,7 @@
continue
}
wg.Add(1)
- go func(i int, cav security.ThirdPartyCaveat) {
+ go func(i int, ctx context.T, cav security.ThirdPartyCaveat) {
defer wg.Done()
vlog.VI(3).Infof("Fetching discharge for %v", cav)
call, err := d.c.StartCall(ctx, cav.Location(), "Discharge", []interface{}{cav, filteredImpetus(cav.Requirements(), impetus)}, vc.NoDischarges{})
@@ -107,7 +129,7 @@
vlog.Errorf("fetchDischarges: server at %s sent a %T (%v) instead of a Discharge", cav.Location(), dAny, dAny)
}
discharges <- fetched{i, d}
- }(i, caveats[i])
+ }(i, ctx, caveats[i])
}
wg.Wait()
close(discharges)
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 6a80944..266f3c1 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -1,6 +1,7 @@
package ipc
import (
+ "encoding/hex"
"errors"
"fmt"
"io"
@@ -19,6 +20,7 @@
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/uniqueid"
"veyron.io/veyron/veyron2/vdl/vdlutil"
verror "veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
@@ -34,6 +36,7 @@
"veyron.io/veyron/veyron/runtimes/google/lib/publisher"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
+ "veyron.io/veyron/veyron/runtimes/google/vtrace"
vsecurity "veyron.io/veyron/veyron/security"
)
@@ -671,53 +674,37 @@
return newCaveat(tpc)
}
-// dischargeImpetusServer implements the discharge service. Always fails to
-// issue a discharge, but records the impetus.
-type dischargeImpetusServer struct {
- mu sync.Mutex
- impetus []security.DischargeImpetus // GUARDED_BY(mu)
+// dischargeTestServer implements the discharge service. Always fails to
+// issue a discharge, but records the impetus and traceid of the RPC call.
+type dischargeTestServer struct {
+ p security.Principal
+ impetus []security.DischargeImpetus
+ traceid []uniqueid.ID
}
-func (s *dischargeImpetusServer) Discharge(ctx ipc.ServerContext, cav vdlutil.Any, impetus security.DischargeImpetus) (vdlutil.Any, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+func (s *dischargeTestServer) Discharge(ctx ipc.ServerContext, cav vdlutil.Any, impetus security.DischargeImpetus) (vdlutil.Any, error) {
s.impetus = append(s.impetus, impetus)
+ s.traceid = append(s.traceid, vtrace.FromContext(ctx).Trace().ID())
return nil, fmt.Errorf("discharges not issued")
}
-// TestAndClearImpetus checks if all the recorded impetuses match want.
-// Returns an error if they do not.
-// Error or no error, it clears the set of recorded impetuses.
-func (s *dischargeImpetusServer) TestAndClearImpetus(want security.DischargeImpetus) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- defer func() { s.impetus = nil }()
- for idx, imp := range s.impetus {
- if !reflect.DeepEqual(imp, want) {
- return fmt.Errorf("impetus %d of %d: Got [%v] want [%v]", idx, len(s.impetus), imp, want)
- }
- }
- return nil
+func (s *dischargeTestServer) Release() ([]security.DischargeImpetus, []uniqueid.ID) {
+ impetus, traceid := s.impetus, s.traceid
+ s.impetus, s.traceid = nil, nil
+ return impetus, traceid
}
-func names2patterns(names []string) []security.BlessingPattern {
- ret := make([]security.BlessingPattern, len(names))
- for idx, n := range names {
- ret[idx] = security.BlessingPattern(n)
- }
- return ret
-}
-
-func TestDischargeImpetus(t *testing.T) {
+func TestDischargeImpetusAndContextPropagation(t *testing.T) {
var (
pserver = tsecurity.NewPrincipal("server")
- pdischarger = pserver // In general, the discharger can be a separate principal. In this test, it happens to be the server.
+ pdischarger = tsecurity.NewPrincipal("discharger")
+ pclient = tsecurity.NewPrincipal("client")
sm = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns = tnaming.NewSimpleNamespace()
mkClient = func(req security.ThirdPartyRequirements) vc.LocalPrincipal {
- pclient := tsecurity.NewPrincipal()
- tpc, err := security.NewPublicKeyCaveat(pdischarger.PublicKey(), "mountpoint/server/discharger", req, security.UnconstrainedUse())
+ // Setup the client so that it shares a blessing with a third-party caveat with the server.
+ tpc, err := security.NewPublicKeyCaveat(pdischarger.PublicKey(), "mountpoint/discharger", req, security.UnconstrainedUse())
if err != nil {
t.Fatalf("Failed to create ThirdPartyCaveat(%+v): %v", req, err)
}
@@ -725,29 +712,77 @@
if err != nil {
t.Fatal(err)
}
- b, err := pclient.BlessSelf("client", cav)
+ b, err := pclient.BlessSelf("client_for_server", cav)
if err != nil {
t.Fatalf("BlessSelf failed: %v", err)
}
- pclient.AddToRoots(pserver.BlessingStore().Default()) // make the client recognize the server.
pclient.BlessingStore().Set(b, "server")
return vc.LocalPrincipal{pclient}
}
)
- server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{pserver})
+ // Initialize the client principal.
+ // It trusts both the application server and the discharger.
+ pclient.AddToRoots(pserver.BlessingStore().Default())
+ pclient.AddToRoots(pdischarger.BlessingStore().Default())
+ // Share a blessing without any third-party caveats with the discharger.
+ // It could share the same blessing as generated by setupClientBlessing, but
+ // that will lead to possibly debugging confusion (since it will try to fetch
+ // a discharge to talk to the discharge service).
+ if b, err := pclient.BlessSelf("client_for_discharger"); err != nil {
+ t.Fatalf("BlessSelf failed: %v", err)
+ } else {
+ pclient.BlessingStore().Set(b, "discharger")
+ }
+
+ // Setup the discharge server.
+ var tester dischargeTestServer
+ dischargeServer, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{pdischarger})
if err != nil {
t.Fatal(err)
}
- defer server.Stop()
- if _, err := server.Listen(listenSpec); err != nil {
+ defer dischargeServer.Stop()
+ if _, err := dischargeServer.Listen(listenSpec); err != nil {
+ t.Fatal(err)
+ }
+ if err := dischargeServer.Serve("mountpoint/discharger", &tester, &testServerAuthorizer{}); err != nil {
t.Fatal(err)
}
- var tester dischargeImpetusServer
- if err := server.Serve("mountpoint", &tester, testServerAuthorizer{}); err != nil {
- t.Fatal(err)
+ // DischargeClient used to fetch discharges.
+ dc, err := InternalNewDischargeClient(sm, ns, testContext(), vc.LocalPrincipal{pclient})
+ if err != nil {
+ t.Fatalf("InternalDischargeNewClient failed: %v", err)
}
+ // Setup the application server.
+ appServer, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{pserver})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer appServer.Stop()
+ ep, err := appServer.Listen(listenSpec)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // TODO(bjornick,cnicolaou,ashankar): This is a hack to workaround the
+ // fact that a single Listen on the "tcp" protocol followed by a call
+ // to Serve(<name>, ...) transparently creates two endpoints (one for
+ // tcp, one for websockets) and maps both to <name> via a mount.
+ // Because all endpoints to a name are tried in a parallel, this
+ // transparency makes this test hard to follow (many discharge fetch
+ // attempts are made - one for VIF authentication, one for VC
+ // authentication and one for the actual RPC - and having them be made
+ // to two different endpoints in parallel leads to a lot of
+ // non-determinism). The last plan of record known by the author of
+ // this comment was to stop this sly creation of two endpoints and
+ // require that they be done explicitly. When that happens, this hack
+ // can go away, but till then, this workaround allows the test to be
+ // more predictable by ensuring there is only one VIF/VC/Flow to the
+ // server.
+ object := naming.JoinAddressName(ep.String(), "object") // instead of "mountpoint/object"
+ if err := appServer.Serve("mountpoint/object", &testServer{}, &testServerAuthorizer{}); err != nil {
+ t.Fatal(err)
+ }
tests := []struct {
Requirements security.ThirdPartyRequirements
Impetus security.DischargeImpetus
@@ -766,24 +801,52 @@
},
}
- for _, test := range tests {
+ for testidx, test := range tests {
pclient := mkClient(test.Requirements)
- dc, err := InternalNewDischargeClient(sm, ns, testContext(), pclient)
- if err != nil {
- t.Fatalf("InternalDischargeNewClient failed: %v", err)
- }
client, err := InternalNewClient(sm, ns, pclient, dc)
if err != nil {
t.Fatalf("InternalNewClient(%+v) failed: %v", test.Requirements, err)
}
defer client.Close()
+ ctx := testContext()
+ tid := vtrace.FromContext(ctx).Trace().ID()
// StartCall should fetch the discharge, do not worry about finishing the RPC - do not care about that for this test.
- if _, err := client.StartCall(testContext(), "mountpoint/object", "Method", []interface{}{"argument"}); err != nil {
+ if _, err := client.StartCall(ctx, object, "Method", []interface{}{"argument"}); err != nil {
t.Errorf("StartCall(%+v) failed: %v", test.Requirements, err)
continue
}
- if err := tester.TestAndClearImpetus(test.Impetus); err != nil {
- t.Errorf("Test %+v: %v", test.Requirements, err)
+ impetus, traceid := tester.Release()
+ // There should have been 2 or 3 attempts to fetch a discharge
+ // (since the discharge service doesn't actually issue a valid
+ // discharge, there is no re-usable discharge between these attempts):
+ // (1) When creating a VIF with the server hosting the remote object.
+ // (This will happen only for the first test, where the stream.Manager
+ // authenticates at the VIF level for the very first time).
+ // (2) When creating a VC with the server hosting the remote object.
+ // (3) When making the RPC to the remote object.
+ num := 3
+ if testidx > 0 {
+ num = 2
+ }
+ if want := num; len(impetus) != want || len(traceid) != want {
+ t.Errorf("Test %+v: Got (%d, %d) (#impetus, #traceid), wanted %d each", test.Requirements, len(impetus), len(traceid), want)
+ continue
+ }
+ // VC creation does not have any "impetus", it is established without
+ // knowledge of the context of the RPC. So ignore that.
+ //
+ // TODO(ashankar): Should the impetus of the RPC that initiated the
+ // VIF/VC creation be propagated?
+ if got, want := impetus[len(impetus)-1], test.Impetus; !reflect.DeepEqual(got, want) {
+ t.Errorf("Test %+v: Got impetus %v, want %v", test.Requirements, got, want)
+ }
+ // But the context used for all of this should be the same
+ // (thereby allowing debug traces to link VIF/VC creation with
+ // the RPC that initiated them).
+ for idx, got := range traceid {
+ if !reflect.DeepEqual(got, tid) {
+ t.Errorf("Test %+v: %d - Got trace id %q, want %q", test.Requirements, idx, hex.EncodeToString(got[:]), hex.EncodeToString(tid[:]))
+ }
}
}
}
@@ -1530,7 +1593,7 @@
if err != nil {
t.Error(err)
}
- _ = dc.PrepareDischarges([]security.ThirdPartyCaveat{tpcav2}, security.DischargeImpetus{})
+ dc.PrepareDischarges(testContext(), []security.ThirdPartyCaveat{tpcav2}, security.DischargeImpetus{})
// Ensure that discharger1 was not called and discharger2 was called.
if discharger1.called {
diff --git a/runtimes/google/ipc/stream/vc/auth.go b/runtimes/google/ipc/stream/vc/auth.go
index cf83fff..5ecc6ca 100644
--- a/runtimes/google/ipc/stream/vc/auth.go
+++ b/runtimes/google/ipc/stream/vc/auth.go
@@ -9,6 +9,7 @@
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc/version"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vom"
@@ -37,7 +38,7 @@
}
var discharges []security.Discharge
if tpcavs := server.ThirdPartyCaveats(); len(tpcavs) > 0 && dc != nil {
- discharges = dc.PrepareDischarges(tpcavs, security.DischargeImpetus{})
+ discharges = dc.PrepareDischarges(nil, tpcavs, security.DischargeImpetus{})
}
if err = writeBlessings(conn, authServerContextTag, crypter, principal, server, discharges, v); err != nil {
return
@@ -56,7 +57,7 @@
//
// TODO(ashankar): Seems like there is no way the blessing store
// can say that it does NOT want to share the default blessing with the server?
-func AuthenticateAsClient(conn io.ReadWriteCloser, principal security.Principal, dc DischargeClient, crypter crypto.Crypter, v version.IPCVersion) (server, client security.Blessings, serverDischarges map[string]security.Discharge, err error) {
+func AuthenticateAsClient(ctx context.T, conn io.ReadWriteCloser, principal security.Principal, dc DischargeClient, crypter crypto.Crypter, v version.IPCVersion) (server, client security.Blessings, serverDischarges map[string]security.Discharge, err error) {
defer conn.Close()
if server, serverDischarges, err = readBlessings(conn, authServerContextTag, crypter, v); err != nil {
return
@@ -79,7 +80,7 @@
}
var discharges []security.Discharge
if dc != nil {
- discharges = dc.PrepareDischarges(client.ThirdPartyCaveats(), security.DischargeImpetus{})
+ discharges = dc.PrepareDischarges(ctx, client.ThirdPartyCaveats(), security.DischargeImpetus{})
}
if err = writeBlessings(conn, authClientContextTag, crypter, principal, client, discharges, v); err != nil {
return
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index 3a91f8b..4460e6e 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -16,13 +16,16 @@
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
vsync "veyron.io/veyron/veyron/runtimes/google/lib/sync"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/ipc/version"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vtrace"
)
var (
@@ -117,7 +120,7 @@
//
// TODO(ataly, ashankar): What should be the impetus for obtaining the discharges?
type DischargeClient interface {
- PrepareDischarges(forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) []security.Discharge
+ PrepareDischarges(ctx context.T, forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) []security.Discharge
// Invalidate marks the provided discharges as invalid, and therefore unfit
// for being returned by a subsequent PrepareDischarges call.
Invalidate(discharges ...security.Discharge)
@@ -127,6 +130,11 @@
IPCStreamVCOpt()
}
+// DialContext establishes the context under which a VC Dial was initiated.
+type DialContext struct{ context.T }
+
+func (DialContext) IPCStreamVCOpt() {}
+
// InternalNew creates a new VC, which implements the stream.VC interface.
//
// As the name suggests, this method is intended for use only within packages
@@ -367,10 +375,13 @@
tlsSessionCache crypto.TLSClientSessionCache
securityLevel options.VCSecurityLevel
dischargeClient DischargeClient
+ ctx context.T
noDischarges bool
)
for _, o := range opts {
switch v := o.(type) {
+ case DialContext:
+ ctx = v.T
case DischargeClient:
dischargeClient = v
case LocalPrincipal:
@@ -383,6 +394,11 @@
noDischarges = true
}
}
+ if ctx != nil {
+ var span vtrace.Span
+ ctx, span = ivtrace.WithNewSpan(ctx, "vc.HandshakeDialedVC")
+ defer span.Finish()
+ }
// If noDischarge is provided, disable the dischargeClient.
if noDischarges {
dischargeClient = nil
@@ -423,7 +439,7 @@
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for authentication: %v", err))
}
- rBlessings, lBlessings, rDischarges, err := AuthenticateAsClient(authConn, principal, dischargeClient, crypter, vc.version)
+ rBlessings, lBlessings, rDischarges, err := AuthenticateAsClient(ctx, authConn, principal, dischargeClient, crypter, vc.version)
if err != nil {
return vc.err(fmt.Errorf("authentication failed: %v", err))
}
diff --git a/runtimes/google/ipc/stream/vc/vc_test.go b/runtimes/google/ipc/stream/vc/vc_test.go
index 4e3b376..4ac541e 100644
--- a/runtimes/google/ipc/stream/vc/vc_test.go
+++ b/runtimes/google/ipc/stream/vc/vc_test.go
@@ -21,6 +21,7 @@
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue/drrqueue"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/ipc/version"
"veyron.io/veyron/veyron2/naming"
@@ -164,7 +165,7 @@
type mockDischargeClient []security.Discharge
-func (m mockDischargeClient) PrepareDischarges(forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) []security.Discharge {
+func (m mockDischargeClient) PrepareDischarges(_ context.T, forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) []security.Discharge {
return m
}
func (mockDischargeClient) Invalidate(...security.Discharge) {}
diff --git a/runtimes/google/ipc/stream/vif/auth.go b/runtimes/google/ipc/stream/vif/auth.go
index 5d3adc6..53a4b71 100644
--- a/runtimes/google/ipc/stream/vif/auth.go
+++ b/runtimes/google/ipc/stream/vif/auth.go
@@ -14,6 +14,7 @@
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc/stream"
ipcversion "veyron.io/veyron/veyron2/ipc/version"
"veyron.io/veyron/veyron2/options"
@@ -60,7 +61,7 @@
// including a hash of the HopSetup message in the encrypted stream. It is
// likely that this will be addressed in subsequent protocol versions (or it may
// not be addressed at all if IPCVersion6 becomes the only supported version).
-func AuthenticateAsClient(conn net.Conn, versions *version.Range, principal security.Principal, dc vc.DischargeClient) (crypto.ControlCipher, error) {
+func AuthenticateAsClient(ctx context.T, conn net.Conn, versions *version.Range, principal security.Principal, dc vc.DischargeClient) (crypto.ControlCipher, error) {
if versions == nil {
versions = version.SupportedRange
}
@@ -110,13 +111,13 @@
// Perform the authentication.
switch v {
case ipcversion.IPCVersion6:
- return authenticateAsClientIPC6(conn, reader, principal, dc, &pvt, &pub, ppub)
+ return authenticateAsClientIPC6(ctx, conn, reader, principal, dc, &pvt, &pub, ppub)
default:
return nil, errUnsupportedEncryptVersion
}
}
-func authenticateAsClientIPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, dc vc.DischargeClient,
+func authenticateAsClientIPC6(ctx context.T, writer io.Writer, reader *iobuf.Reader, principal security.Principal, dc vc.DischargeClient,
pvt *privateData, pub, ppub *message.HopSetup) (crypto.ControlCipher, error) {
pbox := ppub.NaclBox()
if pbox == nil {
@@ -125,7 +126,7 @@
c := crypto.NewControlCipherIPC6(&pbox.PublicKey, &pvt.naclBoxPrivateKey, false)
sconn := newSetupConn(writer, reader, c)
// TODO(jyh): act upon the authentication results.
- _, _, _, err := vc.AuthenticateAsClient(sconn, principal, dc, crypto.NewNullCrypter(), ipcversion.IPCVersion6)
+ _, _, _, err := vc.AuthenticateAsClient(ctx, sconn, principal, dc, crypto.NewNullCrypter(), ipcversion.IPCVersion6)
if err != nil {
return nil, err
}
@@ -229,11 +230,13 @@
// clientAuthOptions extracts the client authentication options from the options
// list.
-func clientAuthOptions(lopts []stream.VCOpt) (principal security.Principal, dischargeClient vc.DischargeClient, err error) {
+func clientAuthOptions(lopts []stream.VCOpt) (ctx context.T, principal security.Principal, dischargeClient vc.DischargeClient, err error) {
var securityLevel options.VCSecurityLevel
var noDischarges bool
for _, o := range lopts {
switch v := o.(type) {
+ case vc.DialContext:
+ ctx = v.T
case vc.DischargeClient:
dischargeClient = v
case vc.LocalPrincipal:
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 355ea0b..6447009 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -25,10 +25,12 @@
"veyron.io/veyron/veyron/runtimes/google/lib/pcqueue"
vsync "veyron.io/veyron/veyron/runtimes/google/lib/sync"
"veyron.io/veyron/veyron/runtimes/google/lib/upcqueue"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vtrace"
)
// VIF implements a "virtual interface" over an underlying network connection
@@ -107,11 +109,17 @@
// placed inside veyron/runtimes/google. Code outside the
// veyron2/runtimes/google/* packages should never call this method.
func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, opts ...stream.VCOpt) (*VIF, error) {
- principal, dc, err := clientAuthOptions(opts)
+ ctx, principal, dc, err := clientAuthOptions(opts)
if err != nil {
return nil, err
}
- c, err := AuthenticateAsClient(conn, versions, principal, dc)
+ if ctx != nil {
+ var span vtrace.Span
+ ctx, span = ivtrace.WithNewSpan(ctx, "InternalNewDialedVIF")
+ span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr())
+ defer span.Finish()
+ }
+ c, err := AuthenticateAsClient(ctx, conn, versions, principal, dc)
if err != nil {
return nil, err
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 9214839..9476dd9 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -112,11 +112,12 @@
// Add the option that provides a discharge client to the server.
// TODO(cnicolaou): extend the timeout when parallel connections are
// going.
- dc, err := iipc.InternalNewDischargeClient(sm, ns, rt.NewContext(), vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
+ ctx := rt.NewContext()
+ dc, err := iipc.InternalNewDischargeClient(sm, ns, ctx, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
if err != nil {
return nil, fmt.Errorf("failed to create discharge-client: %v", err)
}
- return iipc.InternalNewServer(rt.NewContext(), sm, ns, rt.traceStore, append(otherOpts, dc)...)
+ return iipc.InternalNewServer(ctx, sm, ns, rt.traceStore, append(otherOpts, dc)...)
}
func (rt *vrt) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
diff --git a/runtimes/google/rt/rt_test.go b/runtimes/google/rt/rt_test.go
index 4872dca..15f9de7 100644
--- a/runtimes/google/rt/rt_test.go
+++ b/runtimes/google/rt/rt_test.go
@@ -177,7 +177,7 @@
t.Fatalf("unexpected error: %s", err)
}
- s := expect.NewSession(t, h.Stdout(), 2*time.Second) //time.Minute)
+ s := expect.NewSession(t, h.Stdout(), time.Minute)
runnerBlessing := s.ExpectVar("RUNNER_DEFAULT_BLESSING")
principalBlessing := s.ExpectVar("DEFAULT_BLESSING")
if err := s.Error(); err != nil {
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 07ebd2e..bc9cb87 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -134,7 +134,9 @@
}
func expectSequence(t *testing.T, trace vtrace.TraceRecord, expectedSpans []string) {
- if got, want := len(trace.Spans), len(expectedSpans); got != want {
+ // It's okay to have additional spans - someone may have inserted
+ // additional spans for more debugging.
+ if got, want := len(trace.Spans), len(expectedSpans); got < want {
t.Errorf("Found %d spans, want %d", got, want)
}
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index 51576fd..170d8cc 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -235,6 +235,9 @@
if err := server.Serve(publishName, new(appService), nil); err != nil {
vlog.Fatalf("Serve(%v) failed: %v", publishName, err)
}
+ // Some of our tests look for log files, so make sure they are flushed
+ // to ensure that at least the files exist.
+ vlog.FlushLog()
ping()
<-signals.ShutdownOnSignals()