profiles/internal/rpc: more verror conversions and tests.
MultiPart: 2/2
Change-Id: Id0e77e55764b6df0116566ff5052b2056c418b40
diff --git a/profiles/internal/naming/namespace/namespace.go b/profiles/internal/naming/namespace/namespace.go
index 13311bd..bda109e 100644
--- a/profiles/internal/naming/namespace/namespace.go
+++ b/profiles/internal/naming/namespace/namespace.go
@@ -158,7 +158,7 @@
case verror.ErrBadArg.ID:
// This should cover "rpc: wrong number of in-args".
return true
- case verror.ErrNoExist.ID:
+ case verror.ErrNoExist.ID, verror.ErrUnknownMethod.ID, verror.ErrUnknownSuffix.ID:
// This should cover "rpc: unknown method", "rpc: dispatcher not
// found", and dispatcher Lookup not found errors.
return true
diff --git a/profiles/internal/naming/namespace/resolve.go b/profiles/internal/naming/namespace/resolve.go
index 807060d..96b08f8 100644
--- a/profiles/internal/naming/namespace/resolve.go
+++ b/profiles/internal/naming/namespace/resolve.go
@@ -43,7 +43,7 @@
}
// Keep track of the final error and continue with next server.
finalErr = err
- vlog.VI(2).Infof("ResolveStep %s failed: %s", name, err)
+ vlog.VI(2).Infof("resolveAMT: Finish %s failed: %s", name, err)
continue
}
// Add result to cache.
@@ -51,6 +51,7 @@
vlog.VI(2).Infof("resolveAMT %s -> %v", name, entry)
return entry, nil
}
+ vlog.VI(2).Infof("resolveAMT %v -> %v", e.Servers, finalErr)
return nil, finalErr
}
diff --git a/profiles/internal/rpc/blessings_cache.go b/profiles/internal/rpc/blessings_cache.go
index 2eca000..116f638 100644
--- a/profiles/internal/rpc/blessings_cache.go
+++ b/profiles/internal/rpc/blessings_cache.go
@@ -6,14 +6,24 @@
import (
"crypto/sha256"
- "fmt"
"sync"
"v.io/v23/rpc"
"v.io/v23/security"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/rpc/stream"
)
+var (
+ // Detail errors passed as parameters to higher-level errors; {1}{2}
+ // is left out of their format strings so it is not repeated at every
+ // level of the user-visible message. reg concatenates the suffix onto
+ // pkgPath as-is, so every suffix must begin with ".".
+ errMissingBlessingsKey = reg(".blessingsKey", "key {3} was not in blessings cache")
+ errInvalidClientBlessings = reg(".invalidClientBlessings", "client sent invalid Blessings")
+)
+
// clientEncodeBlessings gets or inserts the blessings into the cache.
func clientEncodeBlessings(cache stream.VCDataCache, blessings security.Blessings) rpc.BlessingsRequest {
blessingsCacheAny := cache.GetOrInsert(clientBlessingsCacheKey{}, newClientBlessingsCache)
@@ -146,7 +156,7 @@
cached, exists := c.m[req.Key]
c.RUnlock()
if !exists {
- return security.Blessings{}, fmt.Errorf("rpc: key was not in the cache")
+ return security.Blessings{}, verror.New(errMissingBlessingsKey, nil, req.Key)
}
stats.recordBlessingCache(true)
return cached, nil
@@ -160,7 +170,7 @@
defer c.Unlock()
if cached, exists := c.m[req.Key]; exists {
if !cached.Equivalent(recv) {
- return security.Blessings{}, fmt.Errorf("client sent invalid Blessings")
+ return security.Blessings{}, verror.New(errInvalidClientBlessings, nil)
}
return cached, nil
}
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index 345b014..25ee7fb 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -10,7 +10,6 @@
"math/rand"
"net"
"reflect"
- "strings"
"sync"
"time"
@@ -37,51 +36,42 @@
const pkgPath = "v.io/x/ref/profiles/internal/rpc"
+func reg(id, msg string) verror.IDAction {
+ // The action registered here is a placeholder; the real one is
+ // computed at a higher level, so these errors are informational only.
+ fullID := verror.ID(pkgPath + id)
+ return verror.Register(fullID, verror.NoRetry, msg)
+}
+
var (
- // Local errs that are used to provide details to the public ones.
- errClientCloseAlreadyCalled = verror.Register(pkgPath+".closeAlreadyCalled", verror.NoRetry,
- "rpc.Client.Close has already been called")
+ // Detail errors for the client. They are passed as parameters to
+ // higher-level errors, so their format strings use only {3} and up
+ // and omit {1}{2}; including them would repeat the same prefix at
+ // every level of the final user-visible message.
+ errClientCloseAlreadyCalled = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called")
+ errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called")
+ errNonRootedName = reg(".errNonRootedName", "{3} does not appear to contain an address")
+ errInvalidEndpoint = reg(".errInvalidEndpoint", "failed to parse endpoint")
+ errIncompatibleEndpoint = reg(".errIncompatibleEndpoint", "incompatible endpoint")
+ errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
+ errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
+ errRequestEncoding = reg(".errRequestEncoding", "failed to encode request {3}{:4}")
+ errDischargeEncoding = reg(".errDischargeEncoding", "failed to encode discharges{:3}")
+ errBlessingEncoding = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}")
+ errArgEncoding = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}")
+ errMismatchedResults = reg(".errMismatchedResults", "got {3} results, but want {4}")
+ errResultDecoding = reg(".errResultDecoding", "failed to decode result #{3}{:4}")
+ errResponseDecoding = reg(".errResponseDecoding", "failed to decode response{:3}")
+ errRemainingStreamResults = reg(".errRemainingStreamResults", "stream closed with remaining stream results")
+ errNoBlessingsForPeer = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}")
+ errBlessingGrant = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}")
+ errBlessingAdd = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}")
+ errServerAuthorizeFailed = reg(".errServerAuthorizeFailed", "failed to authorize flow with remote blessings{:3} {:4}")
- errClientFinishAlreadyCalled = verror.Register(pkgPath+".finishAlreadyCalled", verror.NoRetry, "rpc.ClientCall.Finish has already been called")
+ errPrepareBlessingsAndDischarges = reg(".prepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}")
- errNonRootedName = verror.Register(pkgPath+".nonRootedName", verror.NoRetry, "{3} does not appear to contain an address")
-
- errInvalidEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an invalid endpoint")
-
- errIncompatibleEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an incompatible endpoint")
-
- errNotTrusted = verror.Register(pkgPath+".notTrusted", verror.NoRetry, "name {3} not trusted using blessings {4}{:5}")
-
- errAuthError = verror.Register(pkgPath+".authError", verror.RetryRefetch, "{3}")
-
- errSystemRetry = verror.Register(pkgPath+".sysErrorRetryConnection", verror.RetryConnection, "{:3:}")
-
- errVomEncoder = verror.Register(pkgPath+".vomEncoder", verror.NoRetry, "failed to create vom encoder {:3}")
- errVomDecoder = verror.Register(pkgPath+".vomDecoder", verror.NoRetry, "failed to create vom decoder {:3}")
-
- errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
-
- errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
-
- errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
-
- errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
-
- errMismatchedResults = verror.Register(pkgPath+".mismatchedResults", verror.NoRetry, "got {3} results, but want {4}")
-
- errResultDecoding = verror.Register(pkgPath+".resultDecoding", verror.NoRetry, "failed to decode result #{3}{:4}")
-
- errResponseDecoding = verror.Register(pkgPath+".responseDecoding", verror.NoRetry, "failed to decode response{:3}")
-
- errRemainingStreamResults = verror.Register(pkgPath+".remaingStreamResults", verror.NoRetry, "stream closed with remaining stream results")
-
- errNoBlessingsForPeer = verror.Register(pkgPath+".noBlessingsForPeer", verror.NoRetry, "no blessings tagged for peer {3}{:4}")
-
- errBlessingGrant = verror.Register(pkgPath+".blessingGrantFailed", verror.NoRetry, "failed to grant blessing to server with blessings {3}{:4}")
-
- errBlessingAdd = verror.Register(pkgPath+".blessingAddFailed", verror.NoRetry, "failed to add blessing granted to server {3}{:4}")
-
- errNoPrincipal = verror.Register(pkgPath+".noPrincipal", verror.NoRetry, "principal required for secure connections")
+ errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}")
+ errNoPrincipal = reg(".errNoPrincipal", "principal required for secure connections")
)
type client struct {
@@ -137,12 +127,18 @@
return c, nil
}
-func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, error) {
+func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
c.vcMapMu.Lock()
defer c.vcMapMu.Unlock()
- if c.vcMap == nil {
- return nil, verror.New(errClientCloseAlreadyCalled, ctx)
+
+ suberr := func(err error) *verror.SubErr {
+ return &verror.SubErr{Err: err, Options: verror.Print}
}
+
+ if c.vcMap == nil {
+ return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+ }
+
vcKey := vcMapKey{endpoint: ep.String()}
if principal != nil {
vcKey.clientPublicKey = principal.PublicKey().String()
@@ -161,19 +157,14 @@
}
sm := c.streamMgr
c.vcMapMu.Unlock()
-
vc, err := sm.Dial(ep, principal, vcOpts...)
c.vcMapMu.Lock()
if err != nil {
- if strings.Contains(err.Error(), "authentication failed") {
- return nil, verror.New(errAuthError, ctx, err)
- } else {
- return nil, verror.New(errSystemRetry, ctx, err)
- }
+ return nil, suberr(err)
}
if c.vcMap == nil {
sm.ShutdownEndpoint(ep)
- return nil, verror.New(errClientCloseAlreadyCalled, ctx)
+ return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
}
if othervc, exists := c.vcMap[vcKey]; exists {
vc = othervc.vc
@@ -182,7 +173,11 @@
} else {
c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
}
- return vc.Connect()
+ flow, err := vc.Connect()
+ if err != nil {
+ return nil, suberr(err)
+ }
+ return flow, nil
}
// A randomized exponential backoff. The randomness deters error convoys
@@ -341,11 +336,11 @@
type serverStatus struct {
index int
- suffix string
+ server, suffix string // server is the name given to tryCreateFlow; suffix is what SplitAddressName leaves after its address
flow stream.Flow
blessings []string // authorized server blessings
rejectedBlessings []security.RejectedBlessing // rejected server blessings
- err error
+ serverErr *verror.SubErr // failure recorded for this server, if any; summarized by failedTryCall
}
// tryCreateFlow attempts to establish a Flow to "server" (which must be a
@@ -356,7 +351,7 @@
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
- status := &serverStatus{index: index}
+ status := &serverStatus{index: index, server: server}
var span vtrace.Span
ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow")
span.Annotatef("address:%v", server)
@@ -365,24 +360,33 @@
span.Finish()
}()
+ suberr := func(err error) *verror.SubErr {
+ return &verror.SubErr{
+ Name: fmt.Sprintf("%s:%s.%s", server, name, method),
+ Err: err,
+ Options: verror.Print,
+ }
+ }
+
address, suffix := naming.SplitAddressName(server)
if len(address) == 0 {
- status.err = verror.New(errNonRootedName, ctx, server)
+ status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
return
}
status.suffix = suffix
ep, err := inaming.NewEndpoint(address)
if err != nil {
- status.err = verror.New(errInvalidEndpoint, ctx, address)
+ status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
if err = version.CheckCompatibility(ep); err != nil {
- status.err = verror.New(errIncompatibleEndpoint, ctx, ep)
+ status.serverErr = suberr(verror.New(errIncompatibleEndpoint, ctx))
return
}
- if status.flow, status.err = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.err != nil {
- vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.err)
+ if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
+ status.serverErr.Name = fmt.Sprintf("%s:%s.%s", server, name, method)
+ vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
return
}
@@ -405,8 +409,10 @@
})
ctx = security.SetCall(ctx, seccall)
if err := auth.Authorize(ctx); err != nil {
- status.err = verror.New(verror.ErrNotTrusted, ctx, name, status.flow.RemoteBlessings(), err)
- vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.err)
+ // We will test for errServerAuthorizeFailed in failedTryCall and report
+ // verror.ErrNotTrusted
+ status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
+ vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
status.flow.Close()
status.flow = nil
return
@@ -537,15 +543,19 @@
doneChan := ctx.Done()
r.flow.SetDeadline(doneChan)
- // TODO(cnicolaou): continue verror testing from here.
fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
if err != nil {
return nil, verror.NoRetry, err
}
if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
- r.err = verror.New(verror.ErrNotTrusted, ctx, name, r.flow.RemoteBlessings(), err)
- vlog.VI(2).Infof("rpc: err: %s", r.err)
+ n := fmt.Sprintf("%s:%s.%s", r.server, name, method)
+ r.serverErr = &verror.SubErr{
+ Name: n,
+ Options: verror.Print,
+ Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
+ }
+ vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
r.flow.Close()
r.flow = nil
continue
@@ -622,17 +632,20 @@
func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, error) {
go cleanupTryCall(nil, responses, ch)
c.ns.FlushCacheEntry(name)
- noconn, untrusted := []string{}, []string{}
+ suberrs := []verror.SubErr{}
+ topLevelError := verror.ErrNoServers
+ topLevelAction := verror.RetryRefetch
for _, r := range responses {
- if r != nil && r.err != nil {
- switch {
- case verror.ErrorID(r.err) == verror.ErrNotTrusted.ID || verror.ErrorID(r.err) == errAuthError.ID:
- untrusted = append(untrusted, "("+r.err.Error()+") ")
- default:
- noconn = append(noconn, "("+r.err.Error()+") ")
+ if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
+ switch verror.ErrorID(r.serverErr.Err) {
+ case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
+ topLevelError = verror.ErrNotTrusted
+ topLevelAction = verror.NoRetry
}
+ suberrs = append(suberrs, *r.serverErr)
}
}
+
// TODO(cnicolaou): we get system errors for things like dialing using
// the 'ws' protocol which can never succeed even if we retry the connection,
// hence we return RetryRefetch below except for the case where the servers
@@ -640,17 +653,8 @@
// TODO(cnicolaou): implementing at-most-once rpc semantics in the future
// will require thinking through all of the cases where the RPC can
// be retried by the client whilst it's actually being executed on the
- // client.
- switch {
- case len(untrusted) > 0 && len(noconn) > 0:
- return nil, verror.RetryRefetch, verror.New(verror.ErrNoServersAndAuth, ctx, append(noconn, untrusted...))
- case len(noconn) > 0:
- return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, noconn)
- case len(untrusted) > 0:
- return nil, verror.NoRetry, verror.New(verror.ErrNotTrusted, ctx, untrusted)
- default:
- return nil, verror.RetryRefetch, verror.New(verror.ErrTimeout, ctx)
- }
+ // server.
+ return nil, topLevelAction, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
}
// prepareBlessingsAndDischarges prepares blessings and discharges for
@@ -679,8 +683,7 @@
if !fc.blessings.IsZero() && fc.dc != nil {
impetus, err := mkDischargeImpetus(fc.server, method, args)
if err != nil {
- // TODO(toddw): Fix up the internal error.
- return verror.New(verror.ErrBadProtocol, fc.ctx, fmt.Errorf("couldn't make discharge impetus: %v", err))
+ return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err))
}
fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
}
@@ -722,9 +725,9 @@
switch v := o.(type) {
case rpc.Granter:
if b, err := v.Grant(ctx); err != nil {
- return verror.New(errBlessingGrant, fc.ctx, fc.server, err)
+ return verror.New(errBlessingGrant, fc.ctx, err)
} else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
- return verror.New(errBlessingAdd, fc.ctx, fc.server, err)
+ return verror.New(errBlessingAdd, fc.ctx, err)
}
}
}
@@ -776,21 +779,24 @@
typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
if typeenc == nil {
if fc.enc, err = vom.NewEncoder(flow); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+ // In practice, this will never fail because of a networking
+ // problem since the encoder writes the 'magic byte' which
+ // will be buffered and not written to the network immediately.
+ berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
if fc.dec, err = vom.NewDecoder(flow); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+ berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
} else {
if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
+ berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
+ berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
}
@@ -798,18 +804,13 @@
}
func (fc *flowClient) close(err error) error {
- if _, ok := err.(verror.E); err != nil && !ok {
- // TODO(cnicolaou): remove this once the second CL in this
- // series of CLs to use verror consistently is complete.
- vlog.Infof("WARNING: expected %v to be a verror", err)
- }
subErr := verror.SubErr{Err: err, Options: verror.Print}
subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
if cerr := fc.flow.Close(); cerr != nil && err == nil {
return verror.New(verror.ErrInternal, fc.ctx, subErr)
}
- switch {
- case verror.ErrorID(err) == verror.ErrBadProtocol.ID:
+ switch verror.ErrorID(err) {
+ case verror.ErrBadProtocol.ID, errRequestEncoding.ID, errArgEncoding.ID:
switch fc.ctx.Err() {
case context.DeadlineExceeded:
timeout := verror.New(verror.ErrTimeout, fc.ctx)
@@ -820,11 +821,13 @@
err := verror.AddSubErrs(canceled, fc.ctx, subErr)
return err
}
- case verror.ErrorID(err) == verror.ErrTimeout.ID:
+ case errVomEncoder.ID, errVomDecoder.ID:
+ badProtocol := verror.New(verror.ErrBadProtocol, fc.ctx)
+ err = verror.AddSubErrs(badProtocol, fc.ctx, subErr)
+ case verror.ErrTimeout.ID:
// Canceled trumps timeout.
if fc.ctx.Err() == context.Canceled {
- // TODO(cnicolaou,m3b): reintroduce 'append' when the new verror API is done.
- return verror.New(verror.ErrCanceled, fc.ctx, err.Error())
+ return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
}
}
return err
@@ -847,12 +850,12 @@
TraceRequest: vtrace.GetRequest(fc.ctx),
}
if err := fc.enc.Encode(req); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
+ berr := verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)
return fc.close(berr)
}
for ix, arg := range args {
if err := fc.enc.Encode(arg); err != nil {
- berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, ix, err))
+ berr := verror.New(errArgEncoding, fc.ctx, ix, err)
return fc.close(berr)
}
}
@@ -1008,9 +1011,8 @@
// Incorporate any VTrace info that was returned.
vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
if fc.response.Error != nil {
- // TODO(cnicolaou): remove verror.ErrNoAccess with verror version
- // when rpc.Server is converted.
- if verror.ErrorID(fc.response.Error) == verror.ErrNoAccess.ID && fc.dc != nil {
+ id := verror.ErrorID(fc.response.Error)
+ if id == verror.ErrNoAccess.ID && fc.dc != nil {
// In case the error was caused by a bad discharge, we do not want to get stuck
// with retrying again and again with this discharge. As there is no direct way
// to detect it, we conservatively flush all discharges we used from the cache.
@@ -1018,6 +1020,9 @@
vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
fc.dc.Invalidate(fc.discharges...)
}
+ if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
+ return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
+ }
return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
}
if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
diff --git a/profiles/internal/rpc/options.go b/profiles/internal/rpc/options.go
index d710939..5a61261 100644
--- a/profiles/internal/rpc/options.go
+++ b/profiles/internal/rpc/options.go
@@ -7,11 +7,11 @@
import (
"time"
- "v.io/x/ref/profiles/internal/rpc/stream"
-
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
+
+ "v.io/x/ref/profiles/internal/rpc/stream"
)
// PreferredProtocols instructs the Runtime implementation to select
diff --git a/profiles/internal/rpc/reserved.go b/profiles/internal/rpc/reserved.go
index 7152349..c095378 100644
--- a/profiles/internal/rpc/reserved.go
+++ b/profiles/internal/rpc/reserved.go
@@ -83,14 +83,14 @@
disp = r.dispReserved
}
if disp == nil {
- return nil, rpc.NewErrUnknownSuffix(ctx, suffix)
+ return nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix)
}
obj, _, err := disp.Lookup(suffix)
switch {
case err != nil:
return nil, err
case obj == nil:
- return nil, rpc.NewErrUnknownSuffix(ctx, suffix)
+ return nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix)
}
invoker, err := objectToInvoker(obj)
if err != nil {
@@ -126,14 +126,14 @@
disp = r.dispReserved
}
if disp == nil {
- return signature.Method{}, rpc.NewErrUnknownMethod(ctx, "__MethodSignature")
+ return signature.Method{}, verror.New(verror.ErrUnknownMethod, ctx, rpc.ReservedMethodSignature)
}
obj, _, err := disp.Lookup(suffix)
switch {
case err != nil:
return signature.Method{}, err
case obj == nil:
- return signature.Method{}, rpc.NewErrUnknownMethod(ctx, "__MethodSignature")
+ return signature.Method{}, verror.New(verror.ErrUnknownMethod, ctx, rpc.ReservedMethodSignature)
}
invoker, err := objectToInvoker(obj)
if err != nil {
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index db689b8..76260fa 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -33,9 +33,20 @@
inaming "v.io/x/ref/profiles/internal/naming"
"v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/vc"
+)
- // TODO(cnicolaou): finish verror2 -> verror transition, in particular
- // for communicating from server to client.
+var (
+ // Detail errors registered through reg. They are passed as
+ // parameters to higher-level errors, so their format strings use
+ // only {3} and up and omit {1}{2}; including them would repeat the
+ // same prefix at every level of the final user-visible message.
+ errResponseEncoding = reg(".errResponseEncoding", "failed to encode RPC response {3} <-> {4}{:5}")
+ errResultEncoding = reg(".errResultEncoding", "failed to encode result #{3} [{4}]{:5}")
+ errFailedToResolveToEndpoint = reg(".errFailedToResolveToEndpoint", "failed to resolve {3} to an endpoint")
+ errFailedToResolveProxy = reg(".errFailedToResolveProxy", "failed to resolve proxy {3}{:4}")
+ errFailedToListenForProxy = reg(".errFailedToListenForProxy", "failed to listen on {3}{:4}")
+ errInternalTypeConversion = reg(".errInternalTypeConversion", "failed to convert {3} to v.io/x/ref/profiles/internal/naming.Endpoint")
+ errFailedToParseIP = reg(".errFailedToParseIP", "failed to parse {3} as an IP host")
)
// state for each requested listen address
@@ -302,7 +313,7 @@
return ep.String(), nil
}
}
- return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
+ return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
}
// getPossbileAddrs returns an appropriate set of addresses that could be used
@@ -314,7 +325,7 @@
ip := net.ParseIP(host)
if ip == nil {
- return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
+ return nil, false, verror.New(errFailedToParseIP, nil, host)
}
addrFromIP := func(ip net.IP) rpc.Address {
@@ -347,7 +358,7 @@
func (s *server) createEndpoints(lep naming.Endpoint, chooser rpc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
iep, ok := lep.(*inaming.Endpoint)
if !ok {
- return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
+ return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
}
if !strings.HasPrefix(iep.Protocol, "tcp") &&
!strings.HasPrefix(iep.Protocol, "ws") {
@@ -479,16 +490,16 @@
func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
resolved, err := s.resolveToEndpoint(proxy)
if err != nil {
- return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
+ return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
}
ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, s.listenerOpts...)
if err != nil {
- return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
+ return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
}
iep, ok := ep.(*inaming.Endpoint)
if !ok {
ln.Close()
- return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+ return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
}
s.Lock()
s.proxies[proxy] = proxyState{iep, nil}
@@ -617,7 +628,7 @@
defer calls.Done()
fs, err := newFlowServer(flow, s)
if err != nil {
- vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
+ vlog.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
return
}
if err := fs.serve(); err != nil {
@@ -626,7 +637,7 @@
// TODO(cnicolaou): revisit this when verror2 transition is
// done.
if err != io.EOF {
- vlog.VI(2).Infof("Flow serve on %v failed: %v", ep, err)
+ vlog.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
}
}
}(flow)
@@ -754,7 +765,7 @@
func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
if suffix != "" {
- return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, nil, suffix) // only the empty suffix is served
}
return d.invoker, d.auth, nil
}
@@ -1044,7 +1055,7 @@
if err == io.EOF {
return err
}
- return fmt.Errorf("rpc: response encoding failed: %v", err)
+ return verror.New(errResponseEncoding, fs.Context(), fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
}
if response.Error != nil {
return response.Error
@@ -1054,7 +1065,7 @@
if err == io.EOF {
return err
}
- return fmt.Errorf("rpc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
+ return verror.New(errResultEncoding, fs.Context(), ix, fmt.Sprintf("%T=%v", res, res), err)
}
}
// TODO(ashankar): Should unread data from the flow be drained?
@@ -1139,11 +1150,11 @@
return nil, err
}
if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
- return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want))
+ return nil, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want)
}
for ix, argptr := range argptrs {
if err := fs.dec.Decode(argptr); err != nil {
- return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err))
+ return nil, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err)
}
}
@@ -1199,7 +1210,7 @@
return invoker, auth, nil
}
}
- return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.T, suffix)
}
func objectToInvoker(obj interface{}) (rpc.Invoker, error) {
diff --git a/profiles/internal/rpc/server_authorizer.go b/profiles/internal/rpc/server_authorizer.go
index 3166d1b..2922824 100644
--- a/profiles/internal/rpc/server_authorizer.go
+++ b/profiles/internal/rpc/server_authorizer.go
@@ -19,18 +19,18 @@
const enableSecureServerAuth = false
var (
- errNoBlessings = verror.Register(pkgPath+".noBlessings", verror.NoRetry, "server has not presented any blessings")
-
- errAuthPossibleManInTheMiddle = verror.Register(pkgPath+".authPossibleManInTheMiddle",
- verror.NoRetry, "server blessings {3} do not match expectations set by endpoint {4}, possible man-in-the-middle or the server blessings are not accepted by the client? (endpoint: {5}, rejected blessings: {6})")
-
- errAuthServerNotAllowed = verror.Register(pkgPath+".authServerNotAllowed",
- verror.NoRetry, "server blessings {3} do not match any allowed server patterns {4}{:5}")
-
- errAuthServerKeyNotAllowed = verror.Register(pkgPath+".authServerKeyNotAllowed",
- verror.NoRetry, "remote public key {3} not matched by server key {4}")
-
- errMultiplePublicKeys = verror.Register(pkgPath+".multiplePublicKeyOptions", verror.NoRetry, "multiple ServerPublicKey options supplied to call, at most one is allowed")
+ // Detail errors registered through reg. They are passed as
+ // parameters to higher-level errors, so their format strings use
+ // only {3} and up and omit {1}{2}; including them would repeat the
+ // same prefix at every level of the final user-visible message.
+ errNoBlessingsFromServer = reg(".errNoBlessingsFromServer", "server has not presented any blessings")
+ errAuthPossibleManInTheMiddle = reg(".errAuthPossibleManInTheMiddle",
+ "server blessings {3} do not match expectations set by endpoint {4}, possible man-in-the-middle or the server blessings are not accepted by the client? (endpoint: {5}, rejected blessings: {6})")
+ errAuthServerNotAllowed = reg(".errAuthServerNotAllowed",
+ "server blessings {3} do not match any allowed server patterns {4}{:5}")
+ errAuthServerKeyNotAllowed = reg(".errAuthServerKeyNotAllowed",
+ "remote public key {3} not matched by server key {4}")
+ errMultiplePublicKeys = reg(".errMultiplePublicKeyOptions", "multiple ServerPublicKey options supplied to call, at most one is allowed")
)
// serverAuthorizer implements security.Authorizer.
@@ -70,7 +70,7 @@
func (a *serverAuthorizer) Authorize(ctx *context.T) error {
call := security.GetCall(ctx)
if call.RemoteBlessings().IsZero() {
- return verror.New(errNoBlessings, ctx)
+ return verror.New(errNoBlessingsFromServer, ctx)
}
serverBlessings, rejectedBlessings := security.RemoteBlessingNames(ctx)
diff --git a/profiles/internal/rpc/sort_endpoints.go b/profiles/internal/rpc/sort_endpoints.go
index ef1549d..8ea0bd9 100644
--- a/profiles/internal/rpc/sort_endpoints.go
+++ b/profiles/internal/rpc/sort_endpoints.go
@@ -9,37 +9,26 @@
"net"
"sort"
- "v.io/v23/naming"
"v.io/x/lib/vlog"
+ "v.io/v23/naming"
+ "v.io/v23/verror"
+
"v.io/x/lib/netstate"
inaming "v.io/x/ref/profiles/internal/naming"
"v.io/x/ref/profiles/internal/rpc/version"
)
-type errorAccumulator struct {
- errs []error
-}
-
-func (e *errorAccumulator) add(err error) {
- e.errs = append(e.errs, err)
-}
-
-func (e *errorAccumulator) failed() bool {
- return len(e.errs) > 0
-}
-
-func (e *errorAccumulator) String() string {
- r := ""
- for _, err := range e.errs {
- r += fmt.Sprintf("(%s)", err)
- }
- return r
-}
-
-func newErrorAccumulator() *errorAccumulator {
- return &errorAccumulator{errs: make([]error, 0, 4)}
-}
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errMalformedEndpoint = reg(".errMalformedEndpoint", "malformed endpoint{:3}")
+ errUndesiredProtocol = reg(".errUndesiredProtocol", "undesired protocol{:3}")
+ errIncompatibleEndpointVersions = reg(".errIncompatibleEndpointVersions", "incompatible endpoint versions{:3}")
+ errNoCompatibleServers = reg(".errNoComaptibleServers", "failed to find any compatible servers{:3}")
+)
type serverLocality int
@@ -99,27 +88,31 @@
func filterAndOrderServers(servers []naming.MountedServer, protocols []string, ipnets []*net.IPNet) ([]naming.MountedServer, error) {
vlog.VI(3).Infof("filterAndOrderServers%v: %v", protocols, servers)
var (
- errs = newErrorAccumulator()
+ errs = verror.SubErrs{}
list = make(sortableServerList, 0, len(servers))
protoRanks = mkProtocolRankMap(protocols)
)
if len(protoRanks) == 0 {
protoRanks = defaultPreferredProtocolOrder
}
+ adderr := func(name string, err error) {
+ errs = append(errs, verror.SubErr{Name: "server=" + name, Err: err, Options: verror.Print})
+ }
for _, server := range servers {
name := server.Server
ep, err := name2endpoint(name)
if err != nil {
- errs.add(fmt.Errorf("malformed endpoint %q: %v", name, err))
+ adderr(name, verror.New(errMalformedEndpoint, nil, err))
continue
}
if err = version.CheckCompatibility(ep); err != nil {
- errs.add(fmt.Errorf("%q: %v", name, err))
+ // TODO(cnicolaou): convert rpc/version to verror.
+ adderr(name, verror.New(errIncompatibleEndpointVersions, nil, err))
continue
}
rank, err := protocol2rank(ep.Addr().Network(), protoRanks)
if err != nil {
- errs.add(fmt.Errorf("%q: %v", name, err))
+ adderr(name, err)
continue
}
list = append(list, sortableServer{
@@ -129,7 +122,7 @@
})
}
if len(list) == 0 {
- return nil, fmt.Errorf("failed to find any compatible servers: %v", errs)
+ return nil, verror.AddSubErrs(verror.New(errNoCompatibleServers, nil), nil, errs...)
}
// TODO(ashankar): Don't have to use stable sorting, could
// just use sort.Sort. The only problem with that is the
@@ -178,7 +171,7 @@
if protocol == naming.UnknownProtocol {
return -1, nil
}
- return 0, fmt.Errorf("undesired protocol %q", protocol)
+ return 0, verror.New(errUndesiredProtocol, nil, protocol)
}
// locality returns the serverLocality to use given an endpoint and the
diff --git a/profiles/internal/rpc/sort_internal_test.go b/profiles/internal/rpc/sort_internal_test.go
index 67ad39d..a4c7b4d 100644
--- a/profiles/internal/rpc/sort_internal_test.go
+++ b/profiles/internal/rpc/sort_internal_test.go
@@ -23,7 +23,7 @@
servers := []naming.MountedServer{}
_, err := filterAndOrderServers(servers, []string{"tcp"}, nil)
- if err == nil || err.Error() != "failed to find any compatible servers: " {
+ if err == nil || err.Error() != "failed to find any compatible servers" {
t.Errorf("expected a different error: %v", err)
}
@@ -44,10 +44,9 @@
}
_, err = filterAndOrderServers(servers, []string{"foobar"}, nil)
- if err == nil || !strings.HasSuffix(err.Error(), "undesired protocol \"tcp\")") {
+ if err == nil || !strings.HasSuffix(err.Error(), "undesired protocol: tcp]") {
t.Errorf("expected a different error to: %v", err)
}
-
}
func TestOrderingByProtocol(t *testing.T) {
diff --git a/profiles/internal/rpc/stream/errors.go b/profiles/internal/rpc/stream/errors.go
index 0aa2e9d..82079e6 100644
--- a/profiles/internal/rpc/stream/errors.go
+++ b/profiles/internal/rpc/stream/errors.go
@@ -18,14 +18,14 @@
// This practiced of omitting {1}{2} is used throughout the stream packages since all
// of their errors are intended to be used as arguments to higher level errors.
var (
- ErrSecurity = verror.Register(pkgPath+".errSecurity", verror.NoRetry, "{:3}")
- ErrNetwork = verror.Register(pkgPath+".errNetwork", verror.NoRetry, "{:3}")
- ErrProxy = verror.Register(pkgPath+".errProxy", verror.NoRetry, "{:3}")
- ErrBadArg = verror.Register(pkgPath+".errBadArg", verror.NoRetry, "{:3}")
- ErrBadState = verror.Register(pkgPath+".errBadState", verror.NoRetry, "{:3}")
- ErrAborted = verror.Register(pkgPath+".errAborted", verror.NoRetry, "{:3}")
- // TODO(cnicolaou): remove this when the rest of the stream sub packages are converted.
- ErrSecOrNet = verror.Register(pkgPath+".errSecOrNet", verror.NoRetry, "{:3}")
+ // TODO(cnicolaou): rename ErrSecurity to ErrAuth
+ ErrSecurity = verror.Register(pkgPath+".errSecurity", verror.NoRetry, "{:3}")
+ ErrNotTrusted = verror.Register(pkgPath+".errNotTrusted", verror.NoRetry, "{:3}")
+ ErrNetwork = verror.Register(pkgPath+".errNetwork", verror.NoRetry, "{:3}")
+ ErrProxy = verror.Register(pkgPath+".errProxy", verror.NoRetry, "{:3}")
+ ErrBadArg = verror.Register(pkgPath+".errBadArg", verror.NoRetry, "{:3}")
+ ErrBadState = verror.Register(pkgPath+".errBadState", verror.NoRetry, "{:3}")
+ ErrAborted = verror.Register(pkgPath+".errAborted", verror.NoRetry, "{:3}")
)
// NetError implements net.Error
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index bc4805e..02c77c5 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -119,6 +119,13 @@
}
}
if err != nil {
+ // TODO(cnicolaou): closeListener in manager.go writes to ln (by calling
+ // ln.Close()) and we read it here in the Infof output, so there is
+ // an unguarded read here that will fail under --race. This will only show
+ // itself if the Infof below is changed to always be printed (which is
+ // how I noticed). The right solution is to lock these data structures, but
+ // that can wait until a bigger overhaul occurs. For now, we leave this at
+ // VI(1) knowing that it's basically harmless.
vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
return
}
@@ -232,10 +239,9 @@
vf.StopAccepting()
if verror.ErrorID(err) == verror.ErrAborted.ID {
ln.manager.vifs.Delete(vf)
+ return nil, nil, verror.New(stream.ErrAborted, nil, err)
}
- // TODO(cnicolaou): use one of ErrSecurity or ErrProtocol when the vif package
- // is converted.
- return nil, nil, verror.New(stream.ErrSecOrNet, nil, verror.New(errFailedToEstablishVC, nil, err))
+ return nil, nil, err
}
flow, err := vc.Connect()
if err != nil {
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 88fa2af..e76b1ac 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -137,7 +137,7 @@
vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, m.deleteVIF, opts...)
if err != nil {
conn.Close()
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, err
}
// TODO(ashankar): If two goroutines are simultaneously invoking
// manager.Dial, it is possible that two VIFs are inserted into m.vifs
diff --git a/profiles/internal/rpc/stream/vc/auth.go b/profiles/internal/rpc/stream/vc/auth.go
index ffb806f..b74a351 100644
--- a/profiles/internal/rpc/stream/vc/auth.go
+++ b/profiles/internal/rpc/stream/vc/auth.go
@@ -33,7 +33,6 @@
errVomEncodeBlessing = reg(".errVomEncodeRequest", "failed to encode blessing{:3}")
errHandshakeMessage = reg(".errHandshakeMessage", "failed to read hanshake message{:3}")
errInvalidSignatureInMessage = reg(".errInvalidSignatureInMessage", "signature does not verify in authentication handshake message")
- errEncryptBlessing = reg(".errEncryptBlessing", "failed to encrypt blessing{:3}")
errFailedToCreateSelfBlessing = reg(".errFailedToCreateSelfBlessing", "failed to create self blessing{:3}")
errNoBlessingsToPresentToServer = reg(".errerrNoBlessingsToPresentToServer ", "no blessings to present as a server")
)
@@ -49,14 +48,14 @@
if tpcavs := server.ThirdPartyCaveats(); len(tpcavs) > 0 && dc != nil {
serverDischarges = dc.PrepareDischarges(nil, tpcavs, security.DischargeImpetus{})
}
- if errID, err := writeBlessings(conn, authServerContextTag, crypter, principal, server, serverDischarges, v); err != nil {
- return security.Blessings{}, nil, verror.New(errID, nil, err)
+ if err := writeBlessings(conn, authServerContextTag, crypter, principal, server, serverDischarges, v); err != nil {
+ return security.Blessings{}, nil, err
}
// Note that since the client uses a self-signed blessing to authenticate
// during VC setup, it does not share any discharges.
- client, _, errID, err := readBlessings(conn, authClientContextTag, crypter, v)
+ client, _, err := readBlessings(conn, authClientContextTag, crypter, v)
if err != nil {
- return security.Blessings{}, nil, verror.New(errID, nil, err)
+ return security.Blessings{}, nil, err
}
return client, mkDischargeMap(serverDischarges), nil
}
@@ -68,16 +67,16 @@
// The client will only share its blessings if the server (who shares its
// blessings first) is authorized as per the authorizer for this RPC.
func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, params security.CallParams, auth *ServerAuthorizer, v version.RPCVersion) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
- server, serverDischarges, errID, err := readBlessings(conn, authServerContextTag, crypter, v)
+ server, serverDischarges, err := readBlessings(conn, authServerContextTag, crypter, v)
if err != nil {
- return security.Blessings{}, security.Blessings{}, nil, verror.New(errID, nil, err)
+ return security.Blessings{}, security.Blessings{}, nil, err
}
// Authorize the server based on the provided authorizer.
if auth != nil {
params.RemoteBlessings = server
params.RemoteDischarges = serverDischarges
if err := auth.Authorize(params); err != nil {
- return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, err)
+ return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrNotTrusted, nil, err)
}
}
@@ -89,66 +88,66 @@
if err != nil {
return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateSelfBlessing, nil, err))
}
- if errID, err := writeBlessings(conn, authClientContextTag, crypter, principal, client, nil, v); err != nil {
- return security.Blessings{}, security.Blessings{}, nil, verror.New(errID, nil, err)
+ if err := writeBlessings(conn, authClientContextTag, crypter, principal, client, nil, v); err != nil {
+ return security.Blessings{}, security.Blessings{}, nil, err
}
return server, client, serverDischarges, nil
}
-func writeBlessings(w io.Writer, tag []byte, crypter crypto.Crypter, p security.Principal, b security.Blessings, discharges []security.Discharge, v version.RPCVersion) (verror.IDAction, error) {
+func writeBlessings(w io.Writer, tag []byte, crypter crypto.Crypter, p security.Principal, b security.Blessings, discharges []security.Discharge, v version.RPCVersion) error {
signature, err := p.Sign(append(tag, crypter.ChannelBinding()...))
if err != nil {
- return stream.ErrSecurity, err
+ return err
}
var buf bytes.Buffer
enc, err := vom.NewEncoder(&buf)
if err != nil {
- return stream.ErrNetwork, verror.New(errVomEncoder, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncoder, nil, err))
}
if err := enc.Encode(signature); err != nil {
- return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
if err := enc.Encode(b); err != nil {
- return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
if v >= version.RPCVersion5 {
if err := enc.Encode(discharges); err != nil {
- return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
}
msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
if err != nil {
- return verror.IDAction{ID: verror.ErrorID(err)}, verror.New(errEncryptBlessing, nil, err)
+ return err
}
defer msg.Release()
enc, err = vom.NewEncoder(w)
if err != nil {
- return stream.ErrNetwork, verror.New(errVomEncoder, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncoder, nil, err))
}
if err := enc.Encode(msg.Contents); err != nil {
- return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
- return verror.IDAction{}, nil
+ return nil
}
-func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, verror.IDAction, error) {
+func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
var msg []byte
var noBlessings security.Blessings
dec, err := vom.NewDecoder(r)
if err != nil {
- return noBlessings, nil, stream.ErrNetwork, verror.New(errVomDecoder, nil, err)
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
}
if err := dec.Decode(&msg); err != nil {
- return noBlessings, nil, stream.ErrNetwork, verror.New(errHandshakeMessage, nil, err)
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, verror.New(errHandshakeMessage, nil, err))
}
buf, err := crypter.Decrypt(iobuf.NewSlice(msg))
if err != nil {
- return noBlessings, nil, verror.IDAction{ID: verror.ErrorID(err)}, err
+ return noBlessings, nil, err
}
defer buf.Release()
dec, err = vom.NewDecoder(bytes.NewReader(buf.Contents))
if err != nil {
- return noBlessings, nil, stream.ErrNetwork, verror.New(errVomDecoder, nil, err)
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
}
var (
@@ -156,21 +155,21 @@
sig security.Signature
)
if err = dec.Decode(&sig); err != nil {
- return noBlessings, nil, stream.ErrNetwork, err
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
}
if err = dec.Decode(&blessings); err != nil {
- return noBlessings, nil, stream.ErrNetwork, err
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
}
var discharges []security.Discharge
if v >= version.RPCVersion5 {
if err := dec.Decode(&discharges); err != nil {
- return noBlessings, nil, stream.ErrNetwork, err
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
}
}
if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
- return noBlessings, nil, stream.ErrSecurity, verror.New(errInvalidSignatureInMessage, nil)
+ return noBlessings, nil, verror.New(stream.ErrSecurity, nil, verror.New(errInvalidSignatureInMessage, nil))
}
- return blessings, mkDischargeMap(discharges), verror.IDAction{}, nil
+ return blessings, mkDischargeMap(discharges), nil
}
func mkDischargeMap(discharges []security.Discharge) map[string]security.Discharge {
diff --git a/profiles/internal/rpc/stream/vc/flow.go b/profiles/internal/rpc/stream/vc/flow.go
index 4a5af99..88a6b42 100644
--- a/profiles/internal/rpc/stream/vc/flow.go
+++ b/profiles/internal/rpc/stream/vc/flow.go
@@ -7,6 +7,7 @@
import (
"v.io/v23/naming"
"v.io/v23/security"
+
"v.io/x/ref/profiles/internal/rpc/stream"
)
diff --git a/profiles/internal/rpc/stream/vc/vc.go b/profiles/internal/rpc/stream/vc/vc.go
index c85342f..80d6084 100644
--- a/profiles/internal/rpc/stream/vc/vc.go
+++ b/profiles/internal/rpc/stream/vc/vc.go
@@ -58,7 +58,6 @@
errFailedToSetupTLS = reg(".errFailedToSetupTLS", "failed to setup TLS{:3}")
errFailedToCreateFlowForAuth = reg(".errFailedToCreateFlowForAuth", "failed to create a Flow for authentication{:3}")
errAuthFailed = reg(".errAuthFailed", "authentication failed{:3}")
- errFailedToConnectSystemFlows = reg(".errFailedToConnectSystemFlows", "failed to connect system flows{:3}")
errNoActiveListener = reg(".errNoActiveListener", "no active listener on VCI {3}")
errFailedToCreateWriterForNewFlow = reg(".errFailedToCreateWriterForNewFlow", "failed to create writer for new flow({3}){:4}")
errFailedToEnqueueFlow = reg(".errFailedToEnqueueFlow", "failed to enqueue flow at listener{:3}")
@@ -430,12 +429,16 @@
return nil
}
-// err prefers vc.closeReason over err.
-func (vc *VC) err(err error) error {
+// appendCloseReason adds a closeReason, if any, as a sub error to err.
+func (vc *VC) appendCloseReason(err error) error {
vc.mu.Lock()
defer vc.mu.Unlock()
if vc.closeReason != nil {
- return vc.closeReason
+ return verror.AddSubErrs(err, nil, verror.SubErr{
+ Name: "remote=" + vc.RemoteEndpoint().String(),
+ Err: vc.closeReason,
+ Options: verror.Print,
+ })
}
return err
}
@@ -465,11 +468,16 @@
// Establish TLS
handshakeConn, err := vc.connectFID(HandshakeFlowID, systemFlowPriority)
if err != nil {
- return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateTLSFlow, nil, err)))
+ return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateTLSFlow, nil, err)))
}
crypter, err := crypto.NewTLSClient(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), tlsSessionCache, vc.pool)
if err != nil {
- return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupTLS, nil, err)))
+ // Assume that we don't trust the server if the TLS handshake fails for any
+ // reason other than EOF.
+ if err == io.EOF {
+ return vc.appendCloseReason(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToSetupTLS, nil, err)))
+ }
+ return vc.appendCloseReason(verror.New(stream.ErrNotTrusted, nil, verror.New(errFailedToSetupTLS, nil, err)))
}
// Authenticate (exchange identities)
@@ -483,7 +491,7 @@
// stream API.
authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
if err != nil {
- return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
+ return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
}
params := security.CallParams{
LocalPrincipal: principal,
@@ -494,7 +502,7 @@
if err != nil || len(rBlessings.ThirdPartyCaveats()) == 0 {
authConn.Close()
if err != nil {
- return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)))
+ return vc.appendCloseReason(err)
}
} else {
go vc.recvDischargesLoop(authConn)
@@ -512,7 +520,7 @@
// Open system flows.
if err = vc.connectSystemFlows(); err != nil {
- return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToConnectSystemFlows, nil, err)))
+ return vc.appendCloseReason(err)
}
vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
@@ -572,7 +580,7 @@
go func() {
sendErr := func(err error) {
ln.Close()
- result <- HandshakeResult{nil, vc.err(err)}
+ result <- HandshakeResult{nil, vc.appendCloseReason(err)}
}
// TODO(ashankar): There should be a timeout on this Accept
// call. Otherwise, a malicious (or incompetent) client can
@@ -731,18 +739,18 @@
}
conn, err := vc.connectFID(TypeFlowID, systemFlowPriority)
if err != nil {
- return verror.New(errFailedToCreateFlowForWireType, nil, err)
+ return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForWireType, nil, err))
}
typeEnc, err := vom.NewTypeEncoder(conn)
if err != nil {
conn.Close()
- return verror.New(errVomTypedEncoder, nil, err)
+ return verror.New(stream.ErrSecurity, nil, verror.New(errVomTypedEncoder, nil, err))
}
vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
typeDec, err := vom.NewTypeDecoder(conn)
if err != nil {
conn.Close()
- return verror.New(errVomTypedDecoder, nil, err)
+ return verror.New(stream.ErrSecurity, nil, verror.New(errVomTypedDecoder, nil, err))
}
vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
return nil
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index 4e273e7..d1cb403 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -77,7 +77,7 @@
var err error
versions, err = versions.Intersect(&version.Range{Min: 0, Max: rpcversion.RPCVersion5})
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
}
if versions.Max < rpcversion.RPCVersion6 {
@@ -87,16 +87,16 @@
// The client has not yet sent its public data. Construct it and send it.
pvt, pub, err := makeHopSetup(versions)
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrSecurity, nil, err)
}
if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
// Read the server's public data.
pmsg, err := message.ReadFrom(reader, nullCipher)
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
ppub, ok := pmsg.(*message.HopSetup)
if !ok {
@@ -106,7 +106,7 @@
// Choose the max version in the intersection.
vrange, err := pub.Versions.Intersect(&ppub.Versions)
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
v := vrange.Max
if v < rpcversion.RPCVersion6 {
@@ -131,7 +131,7 @@
// TODO(jyh): act upon the authentication results.
_, _, _, err := vc.AuthenticateAsClient(sconn, crypto.NewNullCrypter(), params, auth, version)
if err != nil {
- return nil, verror.New(errAuthFailed, nil, err)
+ return nil, err
}
return c, nil
}
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 85798a2..176cba0 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -182,6 +182,7 @@
return nil, err
}
var blessings security.Blessings
+
if principal != nil {
blessings = principal.BlessingStore().Default()
}
@@ -219,19 +220,19 @@
expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
if err != nil {
- return nil, verror.New(errBqueueWriterForXpress, nil, err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForXpress, nil, err))
}
expressQ.Release(-1) // Disable flow control
flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
if err != nil {
- return nil, verror.New(errBqueueWriterForControl, nil, err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForControl, nil, err))
}
flowQ.Release(-1) // Disable flow control
stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
if err != nil {
- return nil, verror.New(errBqueueWriterForStopping, nil, err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForStopping, nil, err))
}
stopQ.Release(-1) // Disable flow control
@@ -305,9 +306,8 @@
}
if err := vc.HandshakeDialedVC(principal, opts...); err != nil {
vif.deleteVC(vc.VCI())
- verr := verror.New(stream.ErrSecurity, nil, verror.New(errVCHandshakeFailed, nil, err))
- vc.Close(verr)
- return nil, verr
+ vc.Close(err)
+ return nil, err
}
return vc, nil
}
@@ -439,7 +439,7 @@
func (vif *VIF) String() string {
l := vif.conn.LocalAddr()
r := vif.conn.RemoteAddr()
- return fmt.Sprintf("(%s, %s) <-> (%s, %s)", r.Network(), r, l.Network(), l)
+ return fmt.Sprintf("(%s, %s) <-> (%s, %s)", l.Network(), l, r.Network(), r)
}
func (vif *VIF) readLoop() {
@@ -790,7 +790,7 @@
return err
}
if n, err := vif.conn.Write(msg); err != nil {
- return verror.New(errWriteFailed, nil, n, err, len(msg))
+ return verror.New(stream.ErrNetwork, nil, verror.New(errWriteFailed, nil, n, err, len(msg)))
}
return nil
}
diff --git a/profiles/internal/rpc/test/client_test.go b/profiles/internal/rpc/test/client_test.go
index 7374a2d..8b49b31 100644
--- a/profiles/internal/rpc/test/client_test.go
+++ b/profiles/internal/rpc/test/client_test.go
@@ -7,6 +7,7 @@
import (
"fmt"
"io"
+ "net"
"os"
"path/filepath"
"runtime"
@@ -20,12 +21,15 @@
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
+ "v.io/v23/vdlroot/signature"
"v.io/v23/verror"
"v.io/x/ref/envvar"
_ "v.io/x/ref/profiles"
inaming "v.io/x/ref/profiles/internal/naming"
irpc "v.io/x/ref/profiles/internal/rpc"
+ "v.io/x/ref/profiles/internal/rpc/stream/message"
+ "v.io/x/ref/profiles/internal/testing/mocks/mocknet"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test"
"v.io/x/ref/test/expect"
@@ -36,11 +40,19 @@
//go:generate v23 test generate .
func rootMT(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ seclevel := options.SecurityConfidential
+ if len(args) == 1 && args[0] == "nosec" {
+ seclevel = options.SecurityNone
+ }
+ return runRootMT(stdin, stdout, stderr, seclevel, env, args...)
+}
+
+func runRootMT(stdin io.Reader, stdout, stderr io.Writer, seclevel options.SecurityLevel, env map[string]string, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
lspec := v23.GetListenSpec(ctx)
- server, err := v23.NewServer(ctx, options.ServesMountTable(true))
+ server, err := v23.NewServer(ctx, options.ServesMountTable(true), seclevel)
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
@@ -138,12 +150,12 @@
return ctx, shutdown
}
-func runMountTable(t *testing.T, ctx *context.T) (*modules.Shell, func()) {
+func runMountTable(t *testing.T, ctx *context.T, args ...string) (*modules.Shell, func()) {
sh, err := modules.NewShell(ctx, nil, testing.Verbose(), t)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- root, err := sh.Start("rootMT", nil)
+ root, err := sh.Start("rootMT", nil, args...)
if err != nil {
t.Fatalf("unexpected error for root mt: %s", err)
}
@@ -250,14 +262,17 @@
}
}
-func logErrors(t *testing.T, logerr, logstack bool, err error) {
+func logErrors(t *testing.T, msg string, logerr, logstack, debugString bool, err error) {
_, file, line, _ := runtime.Caller(2)
loc := fmt.Sprintf("%s:%d", filepath.Base(file), line)
if logerr {
- t.Logf("%s: %v", loc, err)
+ t.Logf("%s: %s: %v", loc, msg, err)
}
if logstack {
- t.Logf("%s: %v", loc, verror.Stack(err).String())
+ t.Logf("%s: %s: %v", loc, msg, verror.Stack(err).String())
+ }
+ if debugString {
+ t.Logf("%s: %s: %v", loc, msg, verror.DebugString(err))
}
}
@@ -269,52 +284,51 @@
ns := v23.GetNamespace(ctx)
v23.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
- logErr := func(err error) {
- logErrors(t, true, false, err)
+ logErr := func(msg string, err error) {
+ logErrors(t, msg, true, false, false, err)
}
emptyCtx := &context.T{}
_, err := client.StartCall(emptyCtx, "noname", "nomethod", nil)
- logErr(err)
if verror.ErrorID(err) != verror.ErrBadArg.ID {
t.Fatalf("wrong error: %s", err)
}
+ logErr("no context", err)
p1 := options.ServerPublicKey{testutil.NewPrincipal().PublicKey()}
p2 := options.ServerPublicKey{testutil.NewPrincipal().PublicKey()}
_, err = client.StartCall(ctx, "noname", "nomethod", nil, p1, p2)
- logErr(err)
if verror.ErrorID(err) != verror.ErrBadArg.ID {
t.Fatalf("wrong error: %s", err)
}
+ logErr("too many public keys", err)
// This will fail with NoServers, but because there is no mount table
// to communicate with. The error message should include a
// 'connection refused' string.
ns.SetRoots("/127.0.0.1:8101")
_, err = client.StartCall(ctx, "noname", "nomethod", nil, options.NoRetry{})
- logErr(err)
if verror.ErrorID(err) != verror.ErrNoServers.ID {
t.Fatalf("wrong error: %s", err)
}
- if want := "connection refused"; !strings.Contains(err.Error(), want) {
+ if want := "connection refused"; !strings.Contains(verror.DebugString(err), want) {
t.Fatalf("wrong error: %s - doesn't contain %q", err, want)
}
+ logErr("no mount table", err)
// This will fail with NoServers, but because there really is no
// name registered with the mount table.
_, shutdown = runMountTable(t, ctx)
defer shutdown()
_, err = client.StartCall(ctx, "noname", "nomethod", nil, options.NoRetry{})
- logErr(err)
if verror.ErrorID(err) != verror.ErrNoServers.ID {
t.Fatalf("wrong error: %s", err)
}
roots := ns.Roots()
-
if unwanted := "connection refused"; strings.Contains(err.Error(), unwanted) {
t.Fatalf("wrong error: %s - does contain %q", err, unwanted)
}
+ logErr("no name registered", err)
// The following tests will fail with NoServers, but because there are
// no protocols that the client and servers (mount table, and "name") share.
@@ -328,20 +342,18 @@
// This will fail in its attempt to call ResolveStep to the mount table
// because we are using both the new context and the new client.
_, err = nclient.StartCall(nctx, "name", "nomethod", nil, options.NoRetry{})
- logErr(err)
if verror.ErrorID(err) != verror.ErrNoServers.ID {
t.Fatalf("wrong error: %s", err)
}
if want := "ResolveStep"; !strings.Contains(err.Error(), want) {
t.Fatalf("wrong error: %s - doesn't contain %q", err, want)
}
+ logErr("mismatched protocols", err)
// This will fail in its attempt to invoke the actual RPC because
// we are using the old context (which supplies the context for the calls
// to ResolveStep) and the new client.
_, err = nclient.StartCall(ctx, "name", "nomethod", nil, options.NoRetry{})
- logErr(err)
-
if verror.ErrorID(err) != verror.ErrNoServers.ID {
t.Fatalf("wrong error: %s", err)
}
@@ -352,6 +364,7 @@
t.Fatalf("wrong error: %s - does contain %q", err, unwanted)
}
+ logErr("mismatched protocols", err)
// The following two tests will fail due to a timeout.
ns.SetRoots("/203.0.113.10:8101")
@@ -368,7 +381,7 @@
if call != nil {
t.Fatalf("expected call to be nil")
}
- logErr(err)
+ logErr("timeout to mount table", err)
// This, second test, will fail due a timeout contacting the server itself.
ns.SetRoots(roots...)
@@ -385,7 +398,115 @@
if call != nil {
t.Fatalf("expected call to be nil")
}
- logErr(err)
+ logErr("timeout to server", err)
+}
+
+func dropDataDialer(network, address string, timeout time.Duration) (net.Conn, error) {
+ matcher := func(read bool, msg message.T) bool {
+ switch msg.(type) {
+ case *message.Data:
+ return true
+ }
+ return false
+ }
+ opts := mocknet.Opts{
+ Mode: mocknet.V23CloseAtMessage,
+ V23MessageMatcher: matcher,
+ }
+ return mocknet.DialerWithOpts(opts, network, address, timeout)
+}
+
+func TestStartCallBadProtocol(t *testing.T) {
+ ctx, shutdown := newCtx()
+ defer shutdown()
+ client := v23.GetClient(ctx)
+
+ ns := v23.GetNamespace(ctx)
+ ns.CacheCtl(naming.DisableCache(true))
+
+ logErr := func(msg string, err error) {
+ logErrors(t, msg, true, false, false, err)
+ }
+
+ rpc.RegisterProtocol("dropData", dropDataDialer, net.Listen)
+
+ // The following test will fail due to a broken connection.
+ // We need to run mount table and servers with no security to use
+ // the V23CloseAtMessage net.Conn mock.
+ _, shutdown = runMountTable(t, ctx, "nosec")
+ defer shutdown()
+
+ roots := ns.Roots()
+ brkRoot, err := mocknet.RewriteEndpointProtocol(roots[0], "dropData")
+ if err != nil {
+ t.Fatal(err)
+ }
+ ns.SetRoots(brkRoot.Name())
+
+ nctx, _ := context.WithTimeout(ctx, 100*time.Millisecond)
+ call, err := client.StartCall(nctx, "name", "noname", nil, options.NoRetry{}, options.SecurityNone)
+ if verror.ErrorID(err) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ if call != nil {
+ t.Fatalf("expected call to be nil")
+ }
+ logErr("broken connection", err)
+
+ // The following test will fail because the client will set up
+ // a secure connection to a server that isn't expecting one.
+ name, fn := initServer(t, ctx, options.SecurityNone)
+ defer fn()
+
+ call, err = client.StartCall(nctx, name, "noname", nil, options.NoRetry{})
+ if verror.ErrorID(err) != verror.ErrNoServers.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ if call != nil {
+ t.Fatalf("expected call to be nil")
+ }
+ logErr("insecure server", err)
+
+ // This is the inverse: a secure server and an insecure client.
+ name, fn = initServer(t, ctx)
+ defer fn()
+
+ call, err = client.StartCall(nctx, name, "noname", nil, options.NoRetry{}, options.SecurityNone)
+ if verror.ErrorID(err) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ if call != nil {
+ t.Fatalf("expected call to be nil")
+ }
+ logErr("insecure client", err)
+}
+
+func TestStartCallSecurity(t *testing.T) {
+ ctx, shutdown := newCtx()
+ defer shutdown()
+ client := v23.GetClient(ctx)
+
+ logErr := func(msg string, err error) {
+ logErrors(t, msg, true, false, false, err)
+ }
+
+ name, fn := initServer(t, ctx)
+ defer fn()
+
+ // Create a context with a new principal that doesn't match the server,
+ // so that the client will not trust the server.
+ ctx1, err := v23.SetPrincipal(ctx, testutil.NewPrincipal("test-blessing"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ call, err := client.StartCall(ctx1, name, "noname", nil, options.NoRetry{})
+ if verror.ErrorID(err) != verror.ErrNotTrusted.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ if call != nil {
+ t.Fatalf("expected call to be nil")
+ }
+ logErr("client does not trust server", err)
}
func childPing(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
@@ -402,8 +523,8 @@
return nil
}
-func initServer(t *testing.T, ctx *context.T) (string, func()) {
- server, err := v23.NewServer(ctx)
+func initServer(t *testing.T, ctx *context.T, opts ...rpc.ServerOpt) (string, func()) {
+ server, err := v23.NewServer(ctx, opts...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -652,8 +773,7 @@
_, fn := runMountTable(t, ctx)
defer fn()
name := "noservers"
- ctx, _ = context.WithTimeout(ctx, 1000*time.Millisecond)
- call, err := v23.GetClient(ctx).StartCall(ctx, name, "Sleep", nil)
+ call, err := v23.GetClient(ctx).StartCall(ctx, name, "Sleep", nil, options.NoRetry{})
if err != nil {
testForVerror(t, err, verror.ErrNoServers)
return
@@ -736,5 +856,138 @@
}
}
-// TODO(cnicolaou:) tests for:
-// -- Test for bad discharges error and correct invalidation, client.go:870..880
+func TestMethodErrors(t *testing.T) {
+ ctx, shutdown := newCtx()
+ defer shutdown()
+ clt := v23.GetClient(ctx)
+
+ name, fn := initServer(t, ctx)
+ defer fn()
+
+ logErr := func(msg string, err error) {
+ logErrors(t, msg, true, false, false, err)
+ }
+
+ // Unknown method
+ call, err := clt.StartCall(ctx, name, "NoMethod", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ verr := call.Finish()
+ if verror.ErrorID(verr) != verror.ErrUnknownMethod.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ logErr("unknown method", verr)
+
+ // Unknown suffix
+ call, err = clt.StartCall(ctx, name+"/NoSuffix", "Ping", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ verr = call.Finish()
+ if verror.ErrorID(verr) != verror.ErrUnknownSuffix.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ logErr("unknown suffix", verr)
+
+ // Too many args.
+ call, err = clt.StartCall(ctx, name, "Ping", []interface{}{1, 2})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ r1 := ""
+ verr = call.Finish(&r1)
+ if verror.ErrorID(verr) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ if got, want := verr.Error(), "wrong number of input arguments"; !strings.Contains(got, want) {
+ t.Fatalf("want %q to contain %q", got, want)
+ }
+ logErr("wrong # args", verr)
+
+ // Too many results.
+ call, err = clt.StartCall(ctx, name, "Ping", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ r2 := ""
+ verr = call.Finish(&r1, &r2)
+ if verror.ErrorID(verr) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ if got, want := verr.Error(), "results, but want"; !strings.Contains(got, want) {
+ t.Fatalf("want %q to contain %q", got, want)
+ }
+ logErr("wrong # results", verr)
+
+ // Mismatched arg types
+ call, err = clt.StartCall(ctx, name, "Echo", []interface{}{1})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ verr = call.Finish(&r2)
+ if verror.ErrorID(verr) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ if got, want := verr.Error(), "aren't compatible"; !strings.Contains(got, want) {
+ t.Fatalf("want %q to contain %q", got, want)
+ }
+ logErr("wrong arg types", verr)
+
+ // Mismatched result types
+ call, err = clt.StartCall(ctx, name, "Ping", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ r3 := 2
+ verr = call.Finish(&r3)
+ if verror.ErrorID(verr) != verror.ErrBadProtocol.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ if got, want := verr.Error(), "aren't compatible"; !strings.Contains(got, want) {
+ t.Fatalf("want %q to contain %q", got, want)
+ }
+ logErr("wrong result types", verr)
+}
+
+func TestReservedMethodErrors(t *testing.T) {
+ ctx, shutdown := newCtx()
+ defer shutdown()
+ clt := v23.GetClient(ctx)
+
+ name, fn := initServer(t, ctx)
+ defer fn()
+
+ logErr := func(msg string, err error) {
+ logErrors(t, msg, true, false, false, err)
+ }
+
+ // This call will fail because the __xx suffix is not supported by
+ // the dispatcher implementing Signature.
+ call, err := clt.StartCall(ctx, name+"/__xx", "__Signature", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ sig := []signature.Interface{}
+ verr := call.Finish(&sig)
+ if verror.ErrorID(verr) != verror.ErrUnknownSuffix.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ logErr("unknown suffix", verr)
+
+ // This call will fail for the same reason, but with a different error,
+ // saying that MethodSignature is an unknown method.
+ call, err = clt.StartCall(ctx, name+"/__xx", "__MethodSignature", []interface{}{"dummy"})
+ if err != nil {
+ t.Fatal(err)
+ }
+ verr = call.Finish(&sig)
+ if verror.ErrorID(verr) != verror.ErrUnknownMethod.ID {
+ t.Fatalf("wrong error: %s", verr)
+ }
+ logErr("unknown method", verr)
+}
diff --git a/profiles/internal/rpc/test/proxy_test.go b/profiles/internal/rpc/test/proxy_test.go
index bf6ede5..ed47050 100644
--- a/profiles/internal/rpc/test/proxy_test.go
+++ b/profiles/internal/rpc/test/proxy_test.go
@@ -80,9 +80,7 @@
if expected == len(pubState) {
break
}
- fmt.Fprintf(stderr, "%s\n", pub.DebugString())
delay := time.Second
- fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
time.Sleep(delay)
}
}
diff --git a/profiles/internal/rpc/test/simple_test.go b/profiles/internal/rpc/test/simple_test.go
index bc653f4..c83ec25 100644
--- a/profiles/internal/rpc/test/simple_test.go
+++ b/profiles/internal/rpc/test/simple_test.go
@@ -29,6 +29,10 @@
return "pong", nil
}
+func (s *simple) Echo(call rpc.ServerCall, arg string) (string, error) {
+ return arg, nil
+}
+
func (s *simple) Source(call rpc.StreamServerCall, start int) error {
i := start
backoff := 25 * time.Millisecond
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 4ea452e..668784e 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -11,6 +11,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/verror"
)
// Range represents a range of RPC versions.
@@ -34,14 +35,26 @@
CheckCompatibility = SupportedRange.CheckCompatibility
)
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/version"
+
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
var (
- NoCompatibleVersionErr = fmt.Errorf("No compatible RPC version available")
- UnknownVersionErr = fmt.Errorf("There was not enough information to determine a version.")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ ErrNoCompatibleVersion = reg(".errNoCompatibleVersionErr", "No compatible RPC version available{:3} not in range {4}..{5}")
+ ErrUnknownVersion = reg(".errUnknownVersionErr", "There was not enough information to determine a version")
+ errInternalTypeConversionError = reg(".errInternalTypeConversionError", "failed to convert {3} to v.io/x/ref/profiles/internal/naming.Endpoint")
)
// IsVersionError returns true if err is a versioning related error.
func IsVersionError(err error) bool {
- return err == NoCompatibleVersionErr || err == UnknownVersionErr
+ id := verror.ErrorID(err)
+ return id == ErrNoCompatibleVersion.ID || id == ErrUnknownVersion.ID
}
// Endpoint returns an endpoint with the Min/MaxRPCVersion properly filled in
@@ -77,9 +90,9 @@
}
if min == u || max == u {
- err = UnknownVersionErr
+ err = verror.New(ErrUnknownVersion, nil)
} else if min > max {
- err = NoCompatibleVersionErr
+ err = verror.New(ErrNoCompatibleVersion, nil, u, min, max)
}
return
}
@@ -102,7 +115,7 @@
func (r *Range) ProxiedEndpoint(rid naming.RoutingID, proxy naming.Endpoint) (*inaming.Endpoint, error) {
proxyEP, ok := proxy.(*inaming.Endpoint)
if !ok {
- return nil, fmt.Errorf("unrecognized naming.Endpoint type %T", proxy)
+ return nil, verror.New(errInternalTypeConversionError, nil, fmt.Sprintf("%T", proxy))
}
ep := &inaming.Endpoint{
@@ -129,11 +142,11 @@
func (r *Range) CommonVersion(a, b naming.Endpoint) (version.RPCVersion, error) {
aEP, ok := a.(*inaming.Endpoint)
if !ok {
- return 0, fmt.Errorf("Unrecognized naming.Endpoint type: %T", a)
+ return 0, verror.New(errInternalTypeConversionError, nil, fmt.Sprintf("%T", a))
}
bEP, ok := b.(*inaming.Endpoint)
if !ok {
- return 0, fmt.Errorf("Unrecognized naming.Endpoint type: %T", b)
+ return 0, verror.New(errInternalTypeConversionError, nil, fmt.Sprintf("%T", b))
}
_, max, err := intersectEndpoints(aEP, bEP)
@@ -144,7 +157,7 @@
// We want to use the maximum common version of the protocol. We just
// need to make sure that it is supported by this RPC implementation.
if max < r.Min || max > r.Max {
- return version.UnknownRPCVersion, NoCompatibleVersionErr
+ return version.UnknownRPCVersion, verror.New(ErrNoCompatibleVersion, nil, max, r.Min, r.Max)
}
return max, nil
}
@@ -154,7 +167,7 @@
func (r *Range) CheckCompatibility(remote naming.Endpoint) error {
remoteEP, ok := remote.(*inaming.Endpoint)
if !ok {
- return fmt.Errorf("Unrecognized naming.Endpoint type: %T", remote)
+ return verror.New(errInternalTypeConversionError, nil, fmt.Sprintf("%T", remote))
}
_, _, err := intersectRanges(r.Min, r.Max,
diff --git a/profiles/internal/rpc/version/version_test.go b/profiles/internal/rpc/version/version_test.go
index 7515a35..bc54048 100644
--- a/profiles/internal/rpc/version/version_test.go
+++ b/profiles/internal/rpc/version/version_test.go
@@ -11,6 +11,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/verror"
)
func TestCommonVersion(t *testing.T) {
@@ -20,17 +21,17 @@
localMin, localMax version.RPCVersion
remoteMin, remoteMax version.RPCVersion
expectedVer version.RPCVersion
- expectedErr error
+ expectedErr verror.IDAction
}
tests := []testCase{
- {0, 0, 0, 0, 0, UnknownVersionErr},
- {0, 1, 2, 3, 0, NoCompatibleVersionErr},
- {2, 3, 0, 1, 0, NoCompatibleVersionErr},
- {0, 5, 5, 6, 0, NoCompatibleVersionErr},
- {0, 2, 2, 4, 2, nil},
- {0, 2, 1, 3, 2, nil},
- {1, 3, 1, 3, 3, nil},
- {3, 3, 3, 3, 3, nil},
+ {0, 0, 0, 0, 0, ErrUnknownVersion},
+ {0, 1, 2, 3, 0, ErrNoCompatibleVersion},
+ {2, 3, 0, 1, 0, ErrNoCompatibleVersion},
+ {0, 5, 5, 6, 0, ErrNoCompatibleVersion},
+ {0, 2, 2, 4, 2, verror.ErrUnknown},
+ {0, 2, 1, 3, 2, verror.ErrUnknown},
+ {1, 3, 1, 3, 3, verror.ErrUnknown},
+ {3, 3, 3, 3, 3, verror.ErrUnknown},
}
for _, tc := range tests {
local := &inaming.Endpoint{
@@ -41,10 +42,14 @@
MinRPCVersion: tc.remoteMin,
MaxRPCVersion: tc.remoteMax,
}
- if ver, err := r.CommonVersion(local, remote); ver != tc.expectedVer || err != tc.expectedErr {
+ ver, err := r.CommonVersion(local, remote)
+ if ver != tc.expectedVer || (err != nil && verror.ErrorID(err) != tc.expectedErr.ID) {
t.Errorf("Unexpected result for local: %v, remote: %v. Got (%d, %v) wanted (%d, %v)",
local, remote, ver, err, tc.expectedVer, tc.expectedErr)
}
+ if err != nil {
+ t.Logf("%s", err)
+ }
}
}
@@ -93,15 +98,15 @@
type testCase struct {
supportMin, supportMax version.RPCVersion
remoteMin, remoteMax version.RPCVersion
- expectedError error
+ expectedError verror.IDAction
}
tests := []testCase{
- {0, 0, 0, 0, UnknownVersionErr},
- {5, 10, 1, 4, NoCompatibleVersionErr},
- {1, 4, 5, 10, NoCompatibleVersionErr},
- {1, 10, 2, 9, nil},
- {3, 8, 1, 4, nil},
- {3, 8, 7, 9, nil},
+ {0, 0, 0, 0, ErrUnknownVersion},
+ {5, 10, 1, 4, ErrNoCompatibleVersion},
+ {1, 4, 5, 10, ErrNoCompatibleVersion},
+ {1, 10, 2, 9, verror.ErrUnknown},
+ {3, 8, 1, 4, verror.ErrUnknown},
+ {3, 8, 7, 9, verror.ErrUnknown},
}
for _, tc := range tests {
@@ -110,9 +115,13 @@
MinRPCVersion: tc.remoteMin,
MaxRPCVersion: tc.remoteMax,
}
- if err := r.CheckCompatibility(remote); err != tc.expectedError {
+ err := r.CheckCompatibility(remote)
+ if err != nil && verror.ErrorID(err) != tc.expectedError.ID {
t.Errorf("Unexpected error for case %+v: got %v, wanted %v",
tc, err, tc.expectedError)
}
+ if err != nil {
+ t.Logf("%s", err)
+ }
}
}
diff --git a/profiles/internal/testing/mocks/mocknet/mocknet.go b/profiles/internal/testing/mocks/mocknet/mocknet.go
index 192def4..0b73e4b 100644
--- a/profiles/internal/testing/mocks/mocknet/mocknet.go
+++ b/profiles/internal/testing/mocks/mocknet/mocknet.go
@@ -331,7 +331,11 @@
}
func RewriteEndpointProtocol(ep string, protocol string) (naming.Endpoint, error) {
- n, err := v23.NewEndpoint(ep)
+ addr := ep
+ if naming.Rooted(ep) {
+ addr, _ = naming.SplitAddressName(ep)
+ }
+ n, err := v23.NewEndpoint(addr)
if err != nil {
return nil, err
}