stream/manager: use verror.

Change-Id: I4f508f85c51862ed04dde2480b460db1a4a368a5
diff --git a/profiles/internal/rpc/stream/errors.go b/profiles/internal/rpc/stream/errors.go
new file mode 100644
index 0000000..f7d7515
--- /dev/null
+++ b/profiles/internal/rpc/stream/errors.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stream
+
+import (
+	"v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream"
+
+// The stream family of packages guarantee to return one of the verror codes defined here, their
+// messages are constructed so as to avoid embedding a component/method name and are thus
+// more suitable for inclusion in other verrors.
+var (
+	ErrSecurity = verror.Register(pkgPath+".errSecurity", verror.NoRetry, "{:3}")
+	ErrNetwork  = verror.Register(pkgPath+".errNetwork", verror.NoRetry, "{:3}")
+	ErrProxy    = verror.Register(pkgPath+".errProxy", verror.NoRetry, "{:3}")
+	ErrBadArg   = verror.Register(pkgPath+".errBadArg", verror.NoRetry, "{:3}")
+	ErrBadState = verror.Register(pkgPath+".errBadState", verror.NoRetry, "{:3}")
+	// TODO(cnicolaou): remove this when the rest of the stream sub packages are converted.
+	ErrSecOrNet = verror.Register(pkgPath+".errSecOrNet", verror.NoRetry, "{:3}")
+	// Update IsStreamError below if you add any other errors here.
+)
+
+// IsStreamError returns true if the err is one of the verror codes defined by this package.
+func IsStreamError(err error) bool {
+	id := verror.ErrorID(err)
+	switch id {
+	case ErrSecurity.ID, ErrNetwork.ID, ErrProxy.ID, ErrBadArg.ID, ErrBadState.ID, ErrSecOrNet.ID:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/profiles/internal/rpc/stream/manager/error_test.go b/profiles/internal/rpc/stream/manager/error_test.go
new file mode 100644
index 0000000..b2c2f9d
--- /dev/null
+++ b/profiles/internal/rpc/stream/manager/error_test.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+
+	_ "v.io/x/ref/profiles"
+	inaming "v.io/x/ref/profiles/internal/naming"
+	"v.io/x/ref/profiles/internal/rpc/stream"
+	"v.io/x/ref/profiles/internal/rpc/stream/manager"
+	"v.io/x/ref/profiles/internal/rpc/stream/message"
+	"v.io/x/ref/profiles/internal/testing/mocks/mocknet"
+	"v.io/x/ref/test"
+	"v.io/x/ref/test/testutil"
+)
+
+func TestListenErrors(t *testing.T) {
+	server := manager.InternalNew(naming.FixedRoutingID(0x1))
+	pserver := testutil.NewPrincipal("server")
+
+	// principal, no blessings
+	_, _, err := server.Listen("tcp", "127.0.0.1:0", pserver, security.Blessings{}, nil)
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	// blessings, no principal
+	_, _, err = server.Listen("tcp", "127.0.0.1:0", nil, pserver.BlessingStore().Default(), nil)
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	// bad protocol
+	_, _, err = server.Listen("foo", "127.0.0.1:0", pserver, pserver.BlessingStore().Default())
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	// bad address
+	_, _, err = server.Listen("tcp", "xx.0.0.1:0", pserver, pserver.BlessingStore().Default())
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	// bad address for proxy
+	_, _, err = server.Listen("v23", "127x.0.0.1", pserver, pserver.BlessingStore().Default())
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+}
+
+func acceptLoop(ln stream.Listener) {
+	for {
+		f, err := ln.Accept()
+		if err != nil {
+			return
+		}
+		f.Close()
+	}
+
+}
+func dropDataDialer(network, address string, timeout time.Duration) (net.Conn, error) {
+	matcher := func(read bool, msg message.T) bool {
+		switch msg.(type) {
+		case *message.HopSetup:
+			return true
+		}
+		return false
+	}
+	opts := mocknet.Opts{
+		Mode:              mocknet.V23CloseAtMessage,
+		V23MessageMatcher: matcher,
+	}
+	return mocknet.DialerWithOpts(opts, network, address, timeout)
+}
+
+func TestDialErrors(t *testing.T) {
+	_, shutdown := test.InitForTest()
+	defer shutdown()
+	server := manager.InternalNew(naming.FixedRoutingID(0x55555555))
+	client := manager.InternalNew(naming.FixedRoutingID(0xcccccccc))
+	pclient := testutil.NewPrincipal("client")
+	pserver := testutil.NewPrincipal("server")
+
+	// bad protocol
+	ep, _ := inaming.NewEndpoint(naming.FormatEndpoint("x", "127.0.0.1:2"))
+	_, err := client.Dial(ep, pclient)
+	if verror.ErrorID(err) != stream.ErrBadArg.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	// no server
+	ep, _ = inaming.NewEndpoint(naming.FormatEndpoint("tcp", "127.0.0.1:2"))
+	_, err = client.Dial(ep, pclient)
+	if verror.ErrorID(err) != stream.ErrNetwork.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+
+	rpc.RegisterProtocol("dropData", dropDataDialer, net.Listen)
+
+	ln, sep, err := server.Listen("tcp", "127.0.0.1:0", pserver, pserver.BlessingStore().Default())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Server will just listen for flows and close them.
+	go acceptLoop(ln)
+
+	cep, err := mocknet.RewriteEndpointProtocol(sep.String(), "dropData")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = client.Dial(cep, pclient)
+	if verror.ErrorID(err) != stream.ErrNetwork.ID {
+		t.Fatalf("wrong error: %s", err)
+	}
+	t.Log(err)
+}
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 4d705ba..16ace2f 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -5,7 +5,6 @@
 package manager
 
 import (
-	"errors"
 	"fmt"
 	"net"
 	"strings"
@@ -25,7 +24,23 @@
 	"v.io/x/ref/profiles/internal/rpc/stream"
 )
 
-var errListenerIsClosed = errors.New("Listener has been Closed")
+func reg(id, msg string) verror.IDAction {
+	return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
+var (
+	errVomEncoder                 = reg(".vomEncoder", "failed to create vom encoder{:3}")
+	errVomDecoder                 = reg(".vomDecoder", "failed to create vom decoder{:3}")
+	errVomEncodeRequest           = reg(".vomEncodeRequest", "failed to encode request to proxy{:3}")
+	errVomDecodeResponse          = reg(".vomDecodeRequest", "failed to decoded response from proxy{:3}")
+	errProxyError                 = reg(".proxyError", "proxy error {:3}")
+	errProxyEndpointError         = reg(".proxyEndpointError", "proxy returned an invalid endpoint {:3}{:4}")
+	errAlreadyConnected           = reg(".alreadyConnected", "already connected to proxy and accepting connections? VIF: {3}, StartAccepting{:_}")
+	errFailedToCreateLivenessFlow = reg(".failedToCreateLivenessFlow", "unable to create liveness check flow to proxy{:3}")
+	errAcceptFailed               = reg(".acceptFailed", "accept failed{:3}")
+	errFailedToEstablishVC        = reg(".failedToEstablishVC", "VC establishment with proxy failed{:_}")
+	errListenerAlreadyClosed      = reg(".listenerAlreadyClosed", "listener already closed")
+)
 
 // listener extends stream.Listener with a DebugString method.
 type listener interface {
@@ -106,9 +121,9 @@
 	item, err := ln.q.Get(nil)
 	switch {
 	case err == upcqueue.ErrQueueIsClosed:
-		return nil, errListenerIsClosed
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
 	case err != nil:
-		return nil, fmt.Errorf("Accept failed: %v", err)
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
 	default:
 		return item.(vif.ConnectorAndFlow).Flow, nil
 	}
@@ -187,7 +202,7 @@
 	// Prepend the default idle timeout for VC.
 	opts = append([]stream.ListenerOpt{vc.IdleTimeout{defaultIdleTimeout}}, opts...)
 	if err := vf.StartAccepting(opts...); err != nil {
-		return nil, nil, fmt.Errorf("already connected to proxy and accepting connections? VIF: %v, StartAccepting error: %v", vf, err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errAlreadyConnected, nil, vf, err))
 	}
 	// Proxy protocol: See v.io/x/ref/profiles/internal/rpc/stream/proxy/protocol.vdl
 	//
@@ -198,12 +213,14 @@
 		if verror.ErrorID(err) == verror.ErrAborted.ID {
 			ln.manager.vifs.Delete(vf)
 		}
-		return nil, nil, fmt.Errorf("VC establishment with proxy failed: %v", err)
+		// TODO(cnicolaou): use one of ErrSecurity or ErrProtocol when the vif package
+		// is converted.
+		return nil, nil, verror.New(stream.ErrSecOrNet, nil, verror.New(errFailedToEstablishVC, nil, err))
 	}
 	flow, err := vc.Connect()
 	if err != nil {
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("unable to create liveness check flow to proxy: %v", err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errFailedToCreateLivenessFlow, nil, err))
 	}
 	var request proxy.Request
 	var response proxy.Response
@@ -211,34 +228,34 @@
 	if err != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("failed to create new Encoder: %v", err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
 	}
 	if err := enc.Encode(request); err != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("failed to encode request to proxy: %v", err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeRequest, nil, err))
 	}
 	dec, err := vom.NewDecoder(flow)
 	if err != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("failed to create new Decoder: %v", err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecoder, nil, err))
 	}
 	if err := dec.Decode(&response); err != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("failed to decode response from proxy: %v", err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecodeResponse, nil, err))
 	}
 	if response.Error != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("proxy error: %v", response.Error)
+		return nil, nil, verror.New(stream.ErrProxy, nil, response.Error)
 	}
 	ep, err := inaming.NewEndpoint(response.Endpoint)
 	if err != nil {
 		flow.Close()
 		vf.StopAccepting()
-		return nil, nil, fmt.Errorf("proxy returned invalid endpoint(%v): %v", response.Endpoint, err)
+		return nil, nil, verror.New(stream.ErrProxy, nil, verror.New(errProxyEndpointError, nil, response.Endpoint, err))
 	}
 	go func(vf *vif.VIF, flow stream.Flow, q *upcqueue.T) {
 		<-flow.Closed()
@@ -252,9 +269,9 @@
 	item, err := ln.q.Get(nil)
 	switch {
 	case err == upcqueue.ErrQueueIsClosed:
-		return nil, errListenerIsClosed
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
 	case err != nil:
-		return nil, fmt.Errorf("Accept failed: %v", err)
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
 	default:
 		return item.(vif.ConnectorAndFlow).Flow, nil
 	}
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 8d6e2f8..c15c6b2 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -6,7 +6,6 @@
 package manager
 
 import (
-	"errors"
 	"fmt"
 	"net"
 	"strings"
@@ -28,10 +27,14 @@
 	"v.io/x/ref/profiles/internal/rpc/version"
 )
 
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/manager"
+
 var (
-	errShutDown                                = errors.New("manager has been shut down")
-	errProvidedServerBlessingsWithoutPrincipal = errors.New("blessings provided but no known principal")
-	errNoBlessingNames                         = errors.New("stream.ListenerOpts includes a principal but no blessing names could be extracted")
+	errUnknownNetwork                          = reg(".unknownNetwork", "unknown network{:3}")
+	errEndpointParseError                      = reg(".endpointParseError", "failed to parse endpoint {3}{:4}")
+	errAlreadyShutdown                         = reg(".alreadyShutdown", "already shutdown")
+	errProvidedServerBlessingsWithoutPrincipal = reg(".serverBlessingsWithoutPrincipal", "blessings provided but with no principal")
+	errNoBlessingNames                         = reg(".noBlessingNames", "no blessing names could be extracted for the provided principal")
 )
 
 const (
@@ -79,7 +82,7 @@
 	if d, _, _ := rpc.RegisteredProtocol(network); d != nil {
 		return d(network, address, timeout)
 	}
-	return nil, fmt.Errorf("unknown network %s", network)
+	return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, network))
 }
 
 // FindOrDialVIF returns the network connection (VIF) to the provided address
@@ -102,7 +105,10 @@
 	vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
 	conn, err := dial(network, address, timeout)
 	if err != nil {
-		return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
+		if !stream.IsStreamError(err) {
+			err = verror.New(stream.ErrNetwork, nil, err)
+		}
+		return nil, err
 	}
 	// (network, address) in the endpoint might not always match up
 	// with the key used in the vifs. For example:
@@ -126,7 +132,7 @@
 	vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, m.deleteVIF, opts...)
 	if err != nil {
 		conn.Close()
-		return nil, fmt.Errorf("failed to create VIF: %v", err)
+		return nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	// TODO(ashankar): If two goroutines are simultaneously invoking
 	// manager.Dial, it is possible that two VIFs are inserted into m.vifs
@@ -162,7 +168,7 @@
 	if _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
 		return l(protocol, address)
 	}
-	return nil, fmt.Errorf("unknown network %s", protocol)
+	return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, protocol))
 }
 
 func (m *manager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
@@ -182,7 +188,7 @@
 	m.muListeners.Lock()
 	if m.shutdown {
 		m.muListeners.Unlock()
-		return nil, nil, errShutDown
+		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
 	}
 	m.muListeners.Unlock()
 
@@ -190,20 +196,24 @@
 		// Act as if listening on the address of a remote proxy.
 		ep, err := inaming.NewEndpoint(address)
 		if err != nil {
-			return nil, nil, fmt.Errorf("failed to parse endpoint %q: %v", address, err)
+			return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err))
 		}
 		return m.remoteListen(ep, principal, opts)
 	}
 	netln, err := listen(protocol, address)
 	if err != nil {
-		return nil, nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", protocol, address, err)
+		if !stream.IsStreamError(err) {
+			vlog.Infof("XXXX %v : %s\n", verror.ErrorID(err), err)
+			err = verror.New(stream.ErrBadArg, nil, err)
+		}
+		return nil, nil, err
 	}
 
 	m.muListeners.Lock()
 	if m.shutdown {
 		m.muListeners.Unlock()
 		closeNetListener(netln)
-		return nil, nil, errShutDown
+		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
 	}
 
 	ln := newNetListener(m, netln, principal, blessings, opts)
@@ -221,7 +231,7 @@
 	defer m.muListeners.Unlock()
 	if m.shutdown {
 		ln.Close()
-		return nil, nil, errShutDown
+		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
 	}
 	m.listeners[ln] = true
 	return ln, ep, nil
@@ -310,7 +320,7 @@
 
 func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
 	if !b.IsZero() && p == nil {
-		return nil, errProvidedServerBlessingsWithoutPrincipal
+		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
 	}
 	if p == nil {
 		return nil, nil
@@ -320,7 +330,7 @@
 		ret = append(ret, b)
 	}
 	if len(ret) == 0 {
-		return nil, errNoBlessingNames
+		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errNoBlessingNames, nil))
 	}
 	return ret, nil
 }
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index 3dc9604..e8e6826 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -719,13 +719,13 @@
 	rpc.RegisterProtocol("tn", dialer, listener)
 
 	_, _, err := server.Listen("tnx", "127.0.0.1:0", principal, blessings)
-	if err == nil || !strings.Contains(err.Error(), "unknown network tnx") {
-		t.Fatal("expected error is missing (%v)", err)
+	if err == nil || !strings.Contains(err.Error(), "unknown network: tnx") {
+		t.Fatalf("expected error is missing (%v)", err)
 	}
 
 	_, _, err = server.Listen("tn", "127.0.0.1:0", principal, blessings)
 	if err == nil || !strings.Contains(err.Error(), "tn.Listen") {
-		t.Fatal("expected error is missing (%v)", err)
+		t.Fatalf("expected error is missing (%v)", err)
 	}
 
 	// Need a functional listener to test Dial.