profiles/internal/rpc/stream: verror conversions.
Change-Id: Iac0aac0ed95817db7678f3527ae8eb55242be8c5
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 670721d..85798a2 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -11,7 +11,6 @@
import (
"bytes"
- "errors"
"fmt"
"net"
"sort"
@@ -42,8 +41,31 @@
const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif"
+// reg registers a non-retryable error for this package. The id suffix is
+// appended to pkgPath to form the full verror ID and msg is used as the
+// error's format string.
+func reg(id, msg string) verror.IDAction {
+ fullID := verror.ID(pkgPath + id)
+ return verror.Register(fullID, verror.NoRetry, msg)
+}
+
var (
- errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user. Each ID suffix matches the name
+ // of the variable it is assigned to.
+ errShuttingDown = reg(".errShuttingDown", "underlying network connection({3}) shutting down")
+ errVCHandshakeFailed = reg(".errVCHandshakeFailed", "VC handshake failed{:3}")
+ errSendOnExpressQFailed = reg(".errSendOnExpressQFailed", "vif.sendOnExpressQ(OpenVC) failed{:3}")
+ errVIFIsBeingClosed = reg(".errVIFIsBeingClosed", "VIF is being closed")
+ errVIFAlreadyAcceptingFlows = reg(".errVIFAlreadyAcceptingFlows", "already accepting flows on VIF {3}")
+ errVCsNotAcceptedOnVIF = reg(".errVCsNotAcceptedOnVIF", "VCs not accepted on VIF {3}")
+ errAcceptFailed = reg(".errAcceptFailed", "Accept failed{:3}")
+ errRemoteEndClosedVC = reg(".errRemoteEndClosedVC", "remote end closed VC{:3}")
+ errFlowsNoLongerAccepted = reg(".errFlowsNoLongerAccepted", "Flows no longer being accepted")
+ errVCAcceptFailed = reg(".errVCAcceptFailed", "VC accept failed{:3}")
+ errIdleTimeout = reg(".errIdleTimeout", "idle timeout")
+ errVIFAlreadySetup = reg(".errVIFAlreadySetup", "VIF is already setup")
+ errBqueueWriterForXpress = reg(".errBqueueWriterForXpress", "failed to create bqueue.Writer for express messages{:3}")
+ errBqueueWriterForControl = reg(".errBqueueWriterForControl", "failed to create bqueue.Writer for flow control counters{:3}")
+ errBqueueWriterForStopping = reg(".errBqueueWriterForStopping", "failed to create bqueue.Writer for stopping the write loop{:3}")
+ errWriteFailed = reg(".errWriteFailed", "write failed: got ({3}, {4}) for {5} byte message")
)
// VIF implements a "virtual interface" over an underlying network connection
@@ -131,10 +153,6 @@
sharedFlowID = vc.SharedFlowID
)
-var (
- errAlreadySetup = errors.New("VIF is already setup")
-)
-
// InternalNewDialedVIF creates a new virtual interface over the provided
// network connection, under the assumption that the conn object was created
// using net.Dial. If onClose is given, it is run in its own goroutine when
@@ -201,19 +219,19 @@
expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err)
+ return nil, verror.New(errBqueueWriterForXpress, nil, err)
}
expressQ.Release(-1) // Disable flow control
flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
+ return nil, verror.New(errBqueueWriterForControl, nil, err)
}
flowQ.Release(-1) // Disable flow control
stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err)
+ return nil, verror.New(errBqueueWriterForStopping, nil, err)
}
stopQ.Release(-1) // Disable flow control
@@ -241,7 +259,7 @@
vif.idleTimerMap = newIdleTimerMap(func(vci id.VC) {
vc, _, _ := vif.vcMap.Find(vci)
if vc != nil {
- vif.closeVCAndSendMsg(vc, "idle timeout")
+ vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
}
})
go vif.readLoop()
@@ -281,15 +299,15 @@
Counters: counters})
if err != nil {
vif.deleteVC(vc.VCI())
- err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err)
- vc.Close(err.Error())
+ err = verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ vc.Close(err)
return nil, err
}
if err := vc.HandshakeDialedVC(principal, opts...); err != nil {
vif.deleteVC(vc.VCI())
- err = fmt.Errorf("VC handshake failed: %v", err)
- vc.Close(err.Error())
- return nil, err
+ verr := verror.New(stream.ErrSecurity, nil, verror.New(errVCHandshakeFailed, nil, err))
+ vc.Close(verr)
+ return nil, verr
}
return vc, nil
}
@@ -343,7 +361,7 @@
// Stop the idle timers.
vif.idleTimerMap.Stop()
for _, vc := range vcs {
- vc.VC.Close("VIF is being closed")
+ vc.VC.Close(verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil)))
}
// Wait for the vcWriteLoops to exit (after draining queued up messages).
vif.stopQ.Close()
@@ -367,7 +385,7 @@
vif.muListen.Lock()
defer vif.muListen.Unlock()
if vif.acceptor != nil {
- return fmt.Errorf("already accepting Flows on VIF %v", vif)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadyAcceptingFlows, nil, vif))
}
vif.acceptor = upcqueue.New()
vif.listenerOpts = opts
@@ -409,11 +427,11 @@
acceptor := vif.acceptor
vif.muListen.Unlock()
if acceptor == nil {
- return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif)
+ return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errVCsNotAcceptedOnVIF, nil, vif))
}
item, err := acceptor.Get(nil)
if err != nil {
- return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err)
+ return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
}
return item.(ConnectorAndFlow), nil
}
@@ -510,7 +528,7 @@
// to indicate a 'remote close' rather than a 'local one'. This helps
// with error reporting since we expect reads/writes to occur
// after a remote close, but not after a local close.
- vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
+ vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
return nil
}
vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
@@ -532,7 +550,7 @@
case *message.HopSetup:
// Configure the VIF. This takes over the conn during negotiation.
if vif.isSetup {
- return errAlreadySetup
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
}
vif.muListen.Lock()
dischargeClient := getDischargeClient(vif.listenerOpts)
@@ -579,10 +597,16 @@
vif.rpending.Wait()
}
+// clientVCClosed reports whether err carries the stream.ErrNetwork ID. If
+// we've encountered a networking error, then in all likelihood the
+// connection to the client is closed.
+func clientVCClosed(err error) bool {
+ return stream.ErrNetwork.ID == verror.ErrorID(err)
+}
+
func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
hr := <-c
if hr.Error != nil {
- vif.closeVCAndSendMsg(vc, hr.Error.Error())
+ vif.closeVCAndSendMsg(vc, clientVCClosed(hr.Error), hr.Error)
return
}
@@ -590,13 +614,13 @@
acceptor := vif.acceptor
vif.muListen.Unlock()
if acceptor == nil {
- vif.closeVCAndSendMsg(vc, "Flows no longer being accepted")
+ vif.closeVCAndSendMsg(vc, false, verror.New(errFlowsNoLongerAccepted, nil))
return
}
// Notify any listeners that a new VC has been established
if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
- vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err))
+ vif.closeVCAndSendMsg(vc, clientVCClosed(err), verror.New(errVCAcceptFailed, nil, err))
return
}
@@ -766,7 +790,7 @@
return err
}
if n, err := vif.conn.Write(msg); err != nil {
- return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg))
+ return verror.New(errWriteFailed, nil, n, err, len(msg))
}
return nil
}
@@ -858,11 +882,8 @@
wq.Close()
}
vif.vcMap.Delete(vci)
- vc.Close("underlying network connection shutting down")
- // We embed an error inside verror.ErrAborted because other layers
- // check for the "Aborted" error as a special case. Perhaps
- // eventually we'll get rid of the Aborted layer.
- return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
+ vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)))
+ return nil, verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
}
vif.idleTimerMap.Insert(vc.VCI(), idleTimeout)
return vc, nil
@@ -875,16 +896,18 @@
}
}
-func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) {
- vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg)
+func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, clientVCClosed bool, errMsg error) {
+ vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, errMsg)
vif.deleteVC(vc.VCI())
- vc.Close(msg)
- // HACK: Don't send CloseVC if it is a "failed new decoder" error because that means the
- // client already has closed its VC.
- // TODO(suharshs,ataly,ashankar): Find a better way to fix: https://github.com/veyron/release-issues/issues/1234.
- if strings.Contains(msg, "failed to create new decoder") {
+ vc.Close(errMsg)
+ if clientVCClosed {
+ // No point in sending to the client if the VC is closed, or otherwise broken.
return
}
+ msg := ""
+ if errMsg != nil {
+ msg = errMsg.Error()
+ }
if err := vif.sendOnExpressQ(&message.CloseVC{
VCI: vc.VCI(),
Error: msg,
@@ -910,7 +933,7 @@
for _, vc := range vcs {
if naming.Compare(vc.RemoteEndpoint().RoutingID(), remote.RoutingID()) {
vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
- vif.closeVCAndSendMsg(vc, "")
+ vif.closeVCAndSendMsg(vc, false, nil)
n++
}
}