stream/manager: use verror.
Change-Id: I4f508f85c51862ed04dde2480b460db1a4a368a5
diff --git a/profiles/internal/rpc/stream/errors.go b/profiles/internal/rpc/stream/errors.go
new file mode 100644
index 0000000..f7d7515
--- /dev/null
+++ b/profiles/internal/rpc/stream/errors.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stream
+
+import (
+ "v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream"
+
+// The stream family of packages guarantee to return one of the verror codes defined here, their
+// messages are constructed so as to avoid embedding a component/method name and are thus
+// more suitable for inclusion in other verrors.
+var (
+ ErrSecurity = verror.Register(pkgPath+".errSecurity", verror.NoRetry, "{:3}")
+ ErrNetwork = verror.Register(pkgPath+".errNetwork", verror.NoRetry, "{:3}")
+ ErrProxy = verror.Register(pkgPath+".errProxy", verror.NoRetry, "{:3}")
+ ErrBadArg = verror.Register(pkgPath+".errBadArg", verror.NoRetry, "{:3}")
+ ErrBadState = verror.Register(pkgPath+".errBadState", verror.NoRetry, "{:3}")
+ // TODO(cnicolaou): remove this when the rest of the stream sub packages are converted.
+ ErrSecOrNet = verror.Register(pkgPath+".errSecOrNet", verror.NoRetry, "{:3}")
+ // Update IsStreamError below if you add any other errors here.
+)
+
+// IsStreamError returns true if the err is one of the verror codes defined by this package.
+func IsStreamError(err error) bool {
+ id := verror.ErrorID(err)
+ switch id {
+ case ErrSecurity.ID, ErrNetwork.ID, ErrProxy.ID, ErrBadArg.ID, ErrBadState.ID, ErrSecOrNet.ID:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/profiles/internal/rpc/stream/manager/error_test.go b/profiles/internal/rpc/stream/manager/error_test.go
new file mode 100644
index 0000000..b2c2f9d
--- /dev/null
+++ b/profiles/internal/rpc/stream/manager/error_test.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+ "net"
+ "testing"
+ "time"
+
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+
+ _ "v.io/x/ref/profiles"
+ inaming "v.io/x/ref/profiles/internal/naming"
+ "v.io/x/ref/profiles/internal/rpc/stream"
+ "v.io/x/ref/profiles/internal/rpc/stream/manager"
+ "v.io/x/ref/profiles/internal/rpc/stream/message"
+ "v.io/x/ref/profiles/internal/testing/mocks/mocknet"
+ "v.io/x/ref/test"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestListenErrors(t *testing.T) {
+ server := manager.InternalNew(naming.FixedRoutingID(0x1))
+ pserver := testutil.NewPrincipal("server")
+
+ // principal, no blessings
+ _, _, err := server.Listen("tcp", "127.0.0.1:0", pserver, security.Blessings{}, nil)
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ // blessings, no principal
+ _, _, err = server.Listen("tcp", "127.0.0.1:0", nil, pserver.BlessingStore().Default(), nil)
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ // bad protocol
+ _, _, err = server.Listen("foo", "127.0.0.1:0", pserver, pserver.BlessingStore().Default())
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ // bad address
+ _, _, err = server.Listen("tcp", "xx.0.0.1:0", pserver, pserver.BlessingStore().Default())
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ // bad address for proxy
+ _, _, err = server.Listen("v23", "127x.0.0.1", pserver, pserver.BlessingStore().Default())
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+}
+
+func acceptLoop(ln stream.Listener) {
+ for {
+ f, err := ln.Accept()
+ if err != nil {
+ return
+ }
+ f.Close()
+ }
+
+}
+func dropDataDialer(network, address string, timeout time.Duration) (net.Conn, error) {
+ matcher := func(read bool, msg message.T) bool {
+ switch msg.(type) {
+ case *message.HopSetup:
+ return true
+ }
+ return false
+ }
+ opts := mocknet.Opts{
+ Mode: mocknet.V23CloseAtMessage,
+ V23MessageMatcher: matcher,
+ }
+ return mocknet.DialerWithOpts(opts, network, address, timeout)
+}
+
+func TestDialErrors(t *testing.T) {
+ _, shutdown := test.InitForTest()
+ defer shutdown()
+ server := manager.InternalNew(naming.FixedRoutingID(0x55555555))
+ client := manager.InternalNew(naming.FixedRoutingID(0xcccccccc))
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+
+ // bad protocol
+ ep, _ := inaming.NewEndpoint(naming.FormatEndpoint("x", "127.0.0.1:2"))
+ _, err := client.Dial(ep, pclient)
+ if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ // no server
+ ep, _ = inaming.NewEndpoint(naming.FormatEndpoint("tcp", "127.0.0.1:2"))
+ _, err = client.Dial(ep, pclient)
+ if verror.ErrorID(err) != stream.ErrNetwork.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+
+ rpc.RegisterProtocol("dropData", dropDataDialer, net.Listen)
+
+ ln, sep, err := server.Listen("tcp", "127.0.0.1:0", pserver, pserver.BlessingStore().Default())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Server will just listen for flows and close them.
+ go acceptLoop(ln)
+
+ cep, err := mocknet.RewriteEndpointProtocol(sep.String(), "dropData")
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = client.Dial(cep, pclient)
+ if verror.ErrorID(err) != stream.ErrNetwork.ID {
+ t.Fatalf("wrong error: %s", err)
+ }
+ t.Log(err)
+}
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 4d705ba..16ace2f 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -5,7 +5,6 @@
package manager
import (
- "errors"
"fmt"
"net"
"strings"
@@ -25,7 +24,23 @@
"v.io/x/ref/profiles/internal/rpc/stream"
)
-var errListenerIsClosed = errors.New("Listener has been Closed")
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
+var (
+ errVomEncoder = reg(".vomEncoder", "failed to create vom encoder{:3}")
+ errVomDecoder = reg(".vomDecoder", "failed to create vom decoder{:3}")
+ errVomEncodeRequest = reg(".vomEncodeRequest", "failed to encode request to proxy{:3}")
+ errVomDecodeResponse = reg(".vomDecodeRequest", "failed to decoded response from proxy{:3}")
+ errProxyError = reg(".proxyError", "proxy error {:3}")
+ errProxyEndpointError = reg(".proxyEndpointError", "proxy returned an invalid endpoint {:3}{:4}")
+ errAlreadyConnected = reg(".alreadyConnected", "already connected to proxy and accepting connections? VIF: {3}, StartAccepting{:_}")
+ errFailedToCreateLivenessFlow = reg(".failedToCreateLivenessFlow", "unable to create liveness check flow to proxy{:3}")
+ errAcceptFailed = reg(".acceptFailed", "accept failed{:3}")
+ errFailedToEstablishVC = reg(".failedToEstablishVC", "VC establishment with proxy failed{:_}")
+ errListenerAlreadyClosed = reg(".listenerAlreadyClosed", "listener already closed")
+)
// listener extends stream.Listener with a DebugString method.
type listener interface {
@@ -106,9 +121,9 @@
item, err := ln.q.Get(nil)
switch {
case err == upcqueue.ErrQueueIsClosed:
- return nil, errListenerIsClosed
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
case err != nil:
- return nil, fmt.Errorf("Accept failed: %v", err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
default:
return item.(vif.ConnectorAndFlow).Flow, nil
}
@@ -187,7 +202,7 @@
// Prepend the default idle timeout for VC.
opts = append([]stream.ListenerOpt{vc.IdleTimeout{defaultIdleTimeout}}, opts...)
if err := vf.StartAccepting(opts...); err != nil {
- return nil, nil, fmt.Errorf("already connected to proxy and accepting connections? VIF: %v, StartAccepting error: %v", vf, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errAlreadyConnected, nil, vf, err))
}
// Proxy protocol: See v.io/x/ref/profiles/internal/rpc/stream/proxy/protocol.vdl
//
@@ -198,12 +213,14 @@
if verror.ErrorID(err) == verror.ErrAborted.ID {
ln.manager.vifs.Delete(vf)
}
- return nil, nil, fmt.Errorf("VC establishment with proxy failed: %v", err)
+ // TODO(cnicolaou): use one of ErrSecurity or ErrProtocol when the vif package
+ // is converted.
+ return nil, nil, verror.New(stream.ErrSecOrNet, nil, verror.New(errFailedToEstablishVC, nil, err))
}
flow, err := vc.Connect()
if err != nil {
vf.StopAccepting()
- return nil, nil, fmt.Errorf("unable to create liveness check flow to proxy: %v", err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errFailedToCreateLivenessFlow, nil, err))
}
var request proxy.Request
var response proxy.Response
@@ -211,34 +228,34 @@
if err != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("failed to create new Encoder: %v", err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
}
if err := enc.Encode(request); err != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("failed to encode request to proxy: %v", err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeRequest, nil, err))
}
dec, err := vom.NewDecoder(flow)
if err != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("failed to create new Decoder: %v", err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
}
if err := dec.Decode(&response); err != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("failed to decode response from proxy: %v", err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecodeResponse, nil, err))
}
if response.Error != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("proxy error: %v", response.Error)
+ return nil, nil, verror.New(stream.ErrProxy, nil, response.Error)
}
ep, err := inaming.NewEndpoint(response.Endpoint)
if err != nil {
flow.Close()
vf.StopAccepting()
- return nil, nil, fmt.Errorf("proxy returned invalid endpoint(%v): %v", response.Endpoint, err)
+ return nil, nil, verror.New(stream.ErrProxy, nil, verror.New(errProxyEndpointError, nil, response.Endpoint, err))
}
go func(vf *vif.VIF, flow stream.Flow, q *upcqueue.T) {
<-flow.Closed()
@@ -252,9 +269,9 @@
item, err := ln.q.Get(nil)
switch {
case err == upcqueue.ErrQueueIsClosed:
- return nil, errListenerIsClosed
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
case err != nil:
- return nil, fmt.Errorf("Accept failed: %v", err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
default:
return item.(vif.ConnectorAndFlow).Flow, nil
}
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 8d6e2f8..c15c6b2 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -6,7 +6,6 @@
package manager
import (
- "errors"
"fmt"
"net"
"strings"
@@ -28,10 +27,14 @@
"v.io/x/ref/profiles/internal/rpc/version"
)
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/manager"
+
var (
- errShutDown = errors.New("manager has been shut down")
- errProvidedServerBlessingsWithoutPrincipal = errors.New("blessings provided but no known principal")
- errNoBlessingNames = errors.New("stream.ListenerOpts includes a principal but no blessing names could be extracted")
+ errUnknownNetwork = reg(".unknownNetwork", "unknown network{:3}")
+ errEndpointParseError = reg(".endpointParseError", "failed to parse endpoint {3}{:4}")
+ errAlreadyShutdown = reg(".alreadyShutdown", "already shutdown")
+ errProvidedServerBlessingsWithoutPrincipal = reg(".serverBlessingsWithoutPrincipal", "blessings provided but with no principal")
+ errNoBlessingNames = reg(".noBlessingNames", "no blessing names could be extracted for the provided principal")
)
const (
@@ -79,7 +82,7 @@
if d, _, _ := rpc.RegisteredProtocol(network); d != nil {
return d(network, address, timeout)
}
- return nil, fmt.Errorf("unknown network %s", network)
+ return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, network))
}
// FindOrDialVIF returns the network connection (VIF) to the provided address
@@ -102,7 +105,10 @@
vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
conn, err := dial(network, address, timeout)
if err != nil {
- return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
+ if !stream.IsStreamError(err) {
+ err = verror.New(stream.ErrNetwork, nil, err)
+ }
+ return nil, err
}
// (network, address) in the endpoint might not always match up
// with the key used in the vifs. For example:
@@ -126,7 +132,7 @@
vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, m.deleteVIF, opts...)
if err != nil {
conn.Close()
- return nil, fmt.Errorf("failed to create VIF: %v", err)
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
// TODO(ashankar): If two goroutines are simultaneously invoking
// manager.Dial, it is possible that two VIFs are inserted into m.vifs
@@ -162,7 +168,7 @@
if _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
return l(protocol, address)
}
- return nil, fmt.Errorf("unknown network %s", protocol)
+ return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, protocol))
}
func (m *manager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
@@ -182,7 +188,7 @@
m.muListeners.Lock()
if m.shutdown {
m.muListeners.Unlock()
- return nil, nil, errShutDown
+ return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
}
m.muListeners.Unlock()
@@ -190,20 +196,24 @@
// Act as if listening on the address of a remote proxy.
ep, err := inaming.NewEndpoint(address)
if err != nil {
- return nil, nil, fmt.Errorf("failed to parse endpoint %q: %v", address, err)
+ return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err))
}
return m.remoteListen(ep, principal, opts)
}
netln, err := listen(protocol, address)
if err != nil {
- return nil, nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", protocol, address, err)
+ if !stream.IsStreamError(err) {
+ vlog.Infof("XXXX %v : %s\n", verror.ErrorID(err), err)
+ err = verror.New(stream.ErrBadArg, nil, err)
+ }
+ return nil, nil, err
}
m.muListeners.Lock()
if m.shutdown {
m.muListeners.Unlock()
closeNetListener(netln)
- return nil, nil, errShutDown
+ return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
}
ln := newNetListener(m, netln, principal, blessings, opts)
@@ -221,7 +231,7 @@
defer m.muListeners.Unlock()
if m.shutdown {
ln.Close()
- return nil, nil, errShutDown
+ return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
}
m.listeners[ln] = true
return ln, ep, nil
@@ -310,7 +320,7 @@
func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
if !b.IsZero() && p == nil {
- return nil, errProvidedServerBlessingsWithoutPrincipal
+ return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
}
if p == nil {
return nil, nil
@@ -320,7 +330,7 @@
ret = append(ret, b)
}
if len(ret) == 0 {
- return nil, errNoBlessingNames
+ return nil, verror.New(stream.ErrBadArg, nil, verror.New(errNoBlessingNames, nil))
}
return ret, nil
}
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index 3dc9604..e8e6826 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -719,13 +719,13 @@
rpc.RegisterProtocol("tn", dialer, listener)
_, _, err := server.Listen("tnx", "127.0.0.1:0", principal, blessings)
- if err == nil || !strings.Contains(err.Error(), "unknown network tnx") {
- t.Fatal("expected error is missing (%v)", err)
+ if err == nil || !strings.Contains(err.Error(), "unknown network: tnx") {
+ t.Fatalf("expected error is missing (%v)", err)
}
_, _, err = server.Listen("tn", "127.0.0.1:0", principal, blessings)
if err == nil || !strings.Contains(err.Error(), "tn.Listen") {
- t.Fatal("expected error is missing (%v)", err)
+ t.Fatalf("expected error is missing (%v)", err)
}
// Need a functional listener to test Dial.