profiles/internal/rpc/stream: verror conversions.

Change-Id: Iac0aac0ed95817db7678f3527ae8eb55242be8c5
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 670721d..85798a2 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -11,7 +11,6 @@
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"net"
 	"sort"
@@ -42,8 +41,31 @@
 
 const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif"
 
+// reg registers a verror.NoRetry error with ID pkgPath+id and format msg.
+func reg(id, msg string) verror.IDAction {
+	return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
 var (
-	errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}")
+	// These errors are intended to be used as arguments to higher level
+	// errors, hence {1}{2} is omitted from their format strings to avoid
+	// repeating these n-times in the final error message visible to the user.
+	errShuttingDown             = reg(".errShuttingDown", "underlying network connection({3}) shutting down")
+	errVCHandshakeFailed        = reg(".errVCHandshakeFailed", "VC handshake failed{:3}")
+	errSendOnExpressQFailed     = reg(".errSendOnExpressQFailed", "vif.sendOnExpressQ(OpenVC) failed{:3}")
+	errVIFIsBeingClosed         = reg(".errVIFIsBeingClosed", "VIF is being closed")
+	errVIFAlreadyAcceptingFlows = reg(".errVIFAlreadyAcceptingFlows", "already accepting flows on VIF {3}")
+	errVCsNotAcceptedOnVIF      = reg(".errVCsNotAcceptedOnVIF", "VCs not accepted on VIF {3}")
+	errAcceptFailed             = reg(".errAcceptFailed", "Accept failed{:3}")
+	errRemoteEndClosedVC        = reg(".errRemoteEndClosedVC", "remote end closed VC{:3}")
+	errFlowsNoLongerAccepted    = reg(".errFlowsNoLongerAccepted", "Flows no longer being accepted")
+	errVCAcceptFailed           = reg(".errVCAcceptFailed", "VC accept failed{:3}")
+	errIdleTimeout              = reg(".errIdleTimeout", "idle timeout")
+	errVIFAlreadySetup          = reg(".errVIFAlreadySetup", "VIF is already setup")
+	errBqueueWriterForXpress    = reg(".errBqueueWriterForXpress", "failed to create bqueue.Writer for express messages{:3}")
+	errBqueueWriterForControl   = reg(".errBqueueWriterForControl", "failed to create bqueue.Writer for flow control counters{:3}")
+	errBqueueWriterForStopping  = reg(".errBqueueWriterForStopping", "failed to create bqueue.Writer for stopping the write loop{:3}")
+	errWriteFailed              = reg(".errWriteFailed", "write failed: got ({3}, {4}) for {5} byte message")
 )
 
 // VIF implements a "virtual interface" over an underlying network connection
@@ -131,10 +153,6 @@
 	sharedFlowID                = vc.SharedFlowID
 )
 
-var (
-	errAlreadySetup = errors.New("VIF is already setup")
-)
-
 // InternalNewDialedVIF creates a new virtual interface over the provided
 // network connection, under the assumption that the conn object was created
 // using net.Dial. If onClose is given, it is run in its own goroutine when
@@ -201,19 +219,19 @@
 
 	expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err)
+		return nil, verror.New(errBqueueWriterForXpress, nil, err)
 	}
 	expressQ.Release(-1) // Disable flow control
 
 	flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
 	if err != nil {
-		return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
+		return nil, verror.New(errBqueueWriterForControl, nil, err)
 	}
 	flowQ.Release(-1) // Disable flow control
 
 	stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err)
+		return nil, verror.New(errBqueueWriterForStopping, nil, err)
 	}
 	stopQ.Release(-1) // Disable flow control
 
@@ -241,7 +259,7 @@
 	vif.idleTimerMap = newIdleTimerMap(func(vci id.VC) {
 		vc, _, _ := vif.vcMap.Find(vci)
 		if vc != nil {
-			vif.closeVCAndSendMsg(vc, "idle timeout")
+			vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
 		}
 	})
 	go vif.readLoop()
@@ -281,15 +299,15 @@
 		Counters:    counters})
 	if err != nil {
 		vif.deleteVC(vc.VCI())
-		err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err)
-		vc.Close(err.Error())
+		err = verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+		vc.Close(err)
 		return nil, err
 	}
 	if err := vc.HandshakeDialedVC(principal, opts...); err != nil {
 		vif.deleteVC(vc.VCI())
-		err = fmt.Errorf("VC handshake failed: %v", err)
-		vc.Close(err.Error())
-		return nil, err
+		verr := verror.New(stream.ErrSecurity, nil, verror.New(errVCHandshakeFailed, nil, err))
+		vc.Close(verr)
+		return nil, verr
 	}
 	return vc, nil
 }
@@ -343,7 +361,7 @@
 	// Stop the idle timers.
 	vif.idleTimerMap.Stop()
 	for _, vc := range vcs {
-		vc.VC.Close("VIF is being closed")
+		vc.VC.Close(verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil)))
 	}
 	// Wait for the vcWriteLoops to exit (after draining queued up messages).
 	vif.stopQ.Close()
@@ -367,7 +385,7 @@
 	vif.muListen.Lock()
 	defer vif.muListen.Unlock()
 	if vif.acceptor != nil {
-		return fmt.Errorf("already accepting Flows on VIF %v", vif)
+		return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadyAcceptingFlows, nil, vif))
 	}
 	vif.acceptor = upcqueue.New()
 	vif.listenerOpts = opts
@@ -409,11 +427,11 @@
 	acceptor := vif.acceptor
 	vif.muListen.Unlock()
 	if acceptor == nil {
-		return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif)
+		return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errVCsNotAcceptedOnVIF, nil, vif))
 	}
 	item, err := acceptor.Get(nil)
 	if err != nil {
-		return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err)
+		return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
 	}
 	return item.(ConnectorAndFlow), nil
 }
@@ -510,7 +528,7 @@
 			// to indicate a 'remote close' rather than a 'local one'. This helps
 			// with error reporting since we expect reads/writes to occur
 			// after a remote close, but not after a local close.
-			vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
+			vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
 			return nil
 		}
 		vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
@@ -532,7 +550,7 @@
 	case *message.HopSetup:
 		// Configure the VIF.  This takes over the conn during negotiation.
 		if vif.isSetup {
-			return errAlreadySetup
+			return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
 		}
 		vif.muListen.Lock()
 		dischargeClient := getDischargeClient(vif.listenerOpts)
@@ -579,10 +597,16 @@
 	vif.rpending.Wait()
 }
 
+func clientVCClosed(err error) bool {
+	// Treat any error whose ID is stream.ErrNetwork.ID as a sign that, in all
+	// likelihood, the connection to the client is closed.
+	return verror.ErrorID(err) == stream.ErrNetwork.ID
+}
+
 func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
 	hr := <-c
 	if hr.Error != nil {
-		vif.closeVCAndSendMsg(vc, hr.Error.Error())
+		vif.closeVCAndSendMsg(vc, clientVCClosed(hr.Error), hr.Error)
 		return
 	}
 
@@ -590,13 +614,13 @@
 	acceptor := vif.acceptor
 	vif.muListen.Unlock()
 	if acceptor == nil {
-		vif.closeVCAndSendMsg(vc, "Flows no longer being accepted")
+		vif.closeVCAndSendMsg(vc, false, verror.New(errFlowsNoLongerAccepted, nil))
 		return
 	}
 
 	// Notify any listeners that a new VC has been established
 	if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
-		vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err))
+		vif.closeVCAndSendMsg(vc, clientVCClosed(err), verror.New(errVCAcceptFailed, nil, err))
 		return
 	}
 
@@ -766,7 +790,7 @@
 		return err
 	}
 	if n, err := vif.conn.Write(msg); err != nil {
-		return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg))
+		return verror.New(errWriteFailed, nil, n, err, len(msg))
 	}
 	return nil
 }
@@ -858,11 +882,8 @@
 			wq.Close()
 		}
 		vif.vcMap.Delete(vci)
-		vc.Close("underlying network connection shutting down")
-		// We embed an error inside verror.ErrAborted because other layers
-		// check for the "Aborted" error as a special case.  Perhaps
-		// eventually we'll get rid of the Aborted layer.
-		return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
+		vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)))
+		return nil, verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
 	}
 	vif.idleTimerMap.Insert(vc.VCI(), idleTimeout)
 	return vc, nil
@@ -875,16 +896,18 @@
 	}
 }
 
-func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) {
-	vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg)
+func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, clientVCClosed bool, errMsg error) {
+	vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, errMsg)
 	vif.deleteVC(vc.VCI())
-	vc.Close(msg)
-	// HACK: Don't send CloseVC if it is a "failed new decoder" error because that means the
-	// client already has closed its VC.
-	// TODO(suharshs,ataly,ashankar): Find a better way to fix: https://github.com/veyron/release-issues/issues/1234.
-	if strings.Contains(msg, "failed to create new decoder") {
+	vc.Close(errMsg)
+	if clientVCClosed {
+		// No point in sending to the client if the VC is closed, or otherwise broken.
 		return
 	}
+	msg := ""
+	if errMsg != nil {
+		msg = errMsg.Error()
+	}
 	if err := vif.sendOnExpressQ(&message.CloseVC{
 		VCI:   vc.VCI(),
 		Error: msg,
@@ -910,7 +933,7 @@
 	for _, vc := range vcs {
 		if naming.Compare(vc.RemoteEndpoint().RoutingID(), remote.RoutingID()) {
 			vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
-			vif.closeVCAndSendMsg(vc, "")
+			vif.closeVCAndSendMsg(vc, false, nil)
 			n++
 		}
 	}