profiles/internal/rpc: more verror conversions and tests.

MultiPart: 2/2

Change-Id: Id0e77e55764b6df0116566ff5052b2056c418b40
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index db689b8..76260fa 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -33,9 +33,20 @@
 	inaming "v.io/x/ref/profiles/internal/naming"
 	"v.io/x/ref/profiles/internal/rpc/stream"
 	"v.io/x/ref/profiles/internal/rpc/stream/vc"
+)
 
-	// TODO(cnicolaou): finish verror2 -> verror transition, in particular
-	// for communicating from server to client.
+var (
+	// These errors are intended to be used as arguments to higher-level
+	// errors; hence {1}{2} are omitted from their format strings to
+	// avoid repeating them n times in the final error message visible
+	// to the user.
+	errResponseEncoding          = reg(".errResponseEncoding", "failed to encode RPC response {3} <-> {4}{:5}")
+	errResultEncoding            = reg(".errResultEncoding", "failed to encode result #{3} [{4}]{:5}")
+	errFailedToResolveToEndpoint = reg(".errFailedToResolveToEndpoint", "failed to resolve {3} to an endpoint")
+	errFailedToResolveProxy      = reg(".errFailedToResolveProxy", "failed to resolve proxy {3}{:4}")
+	errFailedToListenForProxy    = reg(".errFailedToListenForProxy", "failed to listen on {3}{:4}")
+	errInternalTypeConversion    = reg(".errInternalTypeConversion", "failed to convert {3} to v.io/x/ref/profiles/internal/naming.Endpoint")
+	errFailedToParseIP           = reg(".errFailedToParseIP", "failed to parse {3} as an IP host")
 )
 
 // state for each requested listen address
@@ -302,7 +313,7 @@
 			return ep.String(), nil
 		}
 	}
-	return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
+	return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
 }
 
 // getPossbileAddrs returns an appropriate set of addresses that could be used
@@ -314,7 +325,7 @@
 
 	ip := net.ParseIP(host)
 	if ip == nil {
-		return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
+		return nil, false, verror.New(errFailedToParseIP, nil, host)
 	}
 
 	addrFromIP := func(ip net.IP) rpc.Address {
@@ -347,7 +358,7 @@
 func (s *server) createEndpoints(lep naming.Endpoint, chooser rpc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
 	iep, ok := lep.(*inaming.Endpoint)
 	if !ok {
-		return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
+		return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
 	}
 	if !strings.HasPrefix(iep.Protocol, "tcp") &&
 		!strings.HasPrefix(iep.Protocol, "ws") {
@@ -479,16 +490,16 @@
 func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
 	resolved, err := s.resolveToEndpoint(proxy)
 	if err != nil {
-		return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
+		return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
 	}
 	ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, s.listenerOpts...)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
+		return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
 	}
 	iep, ok := ep.(*inaming.Endpoint)
 	if !ok {
 		ln.Close()
-		return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+		return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
 	}
 	s.Lock()
 	s.proxies[proxy] = proxyState{iep, nil}
@@ -617,7 +628,7 @@
 			defer calls.Done()
 			fs, err := newFlowServer(flow, s)
 			if err != nil {
-				vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
+				vlog.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
 				return
 			}
 			if err := fs.serve(); err != nil {
@@ -626,7 +637,7 @@
 				// TODO(cnicolaou): revisit this when verror2 transition is
 				// done.
 				if err != io.EOF {
-					vlog.VI(2).Infof("Flow serve on %v failed: %v", ep, err)
+					vlog.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
 				}
 			}
 		}(flow)
@@ -754,7 +765,7 @@
 
 func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
 	if suffix != "" {
-		return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
+		return nil, nil, verror.New(verror.ErrUnknownSuffix, nil, suffix)
 	}
 	return d.invoker, d.auth, nil
 }
@@ -1044,7 +1055,7 @@
 		if err == io.EOF {
 			return err
 		}
-		return fmt.Errorf("rpc: response encoding failed: %v", err)
+		return verror.New(errResponseEncoding, fs.Context(), fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
 	}
 	if response.Error != nil {
 		return response.Error
@@ -1054,7 +1065,7 @@
 			if err == io.EOF {
 				return err
 			}
-			return fmt.Errorf("rpc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
+			return verror.New(errResultEncoding, fs.Context(), ix, fmt.Sprintf("%T=%v", res, res), err)
 		}
 	}
 	// TODO(ashankar): Should unread data from the flow be drained?
@@ -1139,11 +1150,11 @@
 		return nil, err
 	}
 	if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
-		return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want))
+		return nil, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want)
 	}
 	for ix, argptr := range argptrs {
 		if err := fs.dec.Decode(argptr); err != nil {
-			return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err))
+			return nil, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err)
 		}
 	}
 
@@ -1199,7 +1210,7 @@
 			return invoker, auth, nil
 		}
 	}
-	return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
+	return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.T, suffix)
 }
 
 func objectToInvoker(obj interface{}) (rpc.Invoker, error) {