profiles/internal/rcp/stream: verror conversions.
Change-Id: Iac0aac0ed95817db7678f3527ae8eb55242be8c5
diff --git a/profiles/internal/rpc/stream/crypto/box.go b/profiles/internal/rpc/stream/crypto/box.go
index 95e27a1..0dfba87 100644
--- a/profiles/internal/rpc/stream/crypto/box.go
+++ b/profiles/internal/rpc/stream/crypto/box.go
@@ -14,7 +14,26 @@
"golang.org/x/crypto/nacl/box"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
+ "v.io/x/ref/profiles/internal/rpc/stream"
+)
+
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/crypto"
+
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errCipherTextTooShort = reg(".errCipherTextTooShort", "ciphertext too short")
+ errMessageAuthFailed = reg(".errMessageAuthFailed", "message authentication failed")
+ errUnrecognizedCipherText = reg(".errUnrecognizedCipherText", "CipherSuite {3} is not recognized. Must use one that uses Diffie-Hellman as the key exchange algorithm")
)
type boxcrypter struct {
@@ -80,13 +99,13 @@
c.readNonce += 2
retLen := len(src.Contents) - box.Overhead
if retLen < 0 {
- return nil, fmt.Errorf("ciphertext too short")
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errCipherTextTooShort, nil))
}
ret := c.alloc.Alloc(uint(retLen))
var ok bool
ret.Contents, ok = box.OpenAfterPrecomputation(ret.Contents[:0], src.Contents, &nonce, &c.sharedKey)
if !ok {
- return nil, fmt.Errorf("message authentication failed")
+ return nil, verror.New(stream.ErrSecurity, nil, verror.New(errMessageAuthFailed, nil))
}
return ret, nil
}
diff --git a/profiles/internal/rpc/stream/crypto/box_cipher.go b/profiles/internal/rpc/stream/crypto/box_cipher.go
index ce85fe4..5dc0cb8 100644
--- a/profiles/internal/rpc/stream/crypto/box_cipher.go
+++ b/profiles/internal/rpc/stream/crypto/box_cipher.go
@@ -6,10 +6,13 @@
import (
"encoding/binary"
- "errors"
"golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/salsa20/salsa"
+
+ "v.io/v23/verror"
+
+ "v.io/x/ref/profiles/internal/rpc/stream"
)
// cbox implements a ControlCipher using go.crypto/nacl/box.
@@ -32,7 +35,11 @@
)
var (
- errMessageTooShort = errors.New("control cipher: message is too short")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errMessageTooShort = reg(".errMessageTooShort", "control cipher: message is too short")
)
func (s *cboxStream) alloc(n int) []byte {
@@ -92,7 +99,7 @@
func (c *cbox) Seal(data []byte) error {
n := len(data)
if n < cboxMACSize {
- return errMessageTooShort
+ return verror.New(stream.ErrNetwork, nil, verror.New(errMessageTooShort, nil))
}
tmp := c.enc.alloc(n)
nonce := c.enc.currentNonce()
diff --git a/profiles/internal/rpc/stream/crypto/tls.go b/profiles/internal/rpc/stream/crypto/tls.go
index 2f68b6b..8b371ab 100644
--- a/profiles/internal/rpc/stream/crypto/tls.go
+++ b/profiles/internal/rpc/stream/crypto/tls.go
@@ -9,17 +9,26 @@
import (
"bytes"
"crypto/tls"
- "errors"
"fmt"
"io"
"net"
"sync"
"time"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
+ "v.io/x/ref/profiles/internal/rpc/stream"
)
-var errDeadlinesNotSupported = errors.New("deadlines not supported")
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errDeadlinesNotSupported = reg(".errDeadlinesNotSupported", "deadlines not supported")
+ errEndOfEncryptedSlice = reg(".errEndOfEncryptedSlice", "end of encrypted slice")
+)
// TLSClientSessionCacheOpt specifies the ClientSessionCache used to resume TLS sessions.
// It adapts tls.ClientSessionCache to the v.io/v23/x/ref/profiles/internal/rpc/stream.VCOpt interface.
@@ -63,7 +72,7 @@
return c.handshakeConn.Read(b)
}
if len(c.in) == 0 {
- return 0, tempError{}
+ return 0, stream.NewNetError(verror.New(stream.ErrNetwork, nil, verror.New(errEndOfEncryptedSlice, nil)), false, true)
}
n = copy(b, c.in)
c.in = c.in[n:]
@@ -77,26 +86,23 @@
return c.out.Write(b)
}
-func (*fakeConn) Close() error { return nil }
-func (c *fakeConn) LocalAddr() net.Addr { return c.laddr }
-func (c *fakeConn) RemoteAddr() net.Addr { return c.raddr }
-func (*fakeConn) SetDeadline(t time.Time) error { return errDeadlinesNotSupported }
-func (*fakeConn) SetReadDeadline(t time.Time) error { return errDeadlinesNotSupported }
-func (*fakeConn) SetWriteDeadline(t time.Time) error { return errDeadlinesNotSupported }
-
-// tempError implements net.Error and returns true for Temporary.
-// Providing this error in fakeConn.Read allows tls.Conn.Read to return with an
-// error without changing underlying state.
-type tempError struct{}
-
-func (tempError) Error() string { return "end of encrypted slice" }
-func (tempError) Timeout() bool { return false }
-func (tempError) Temporary() bool { return true }
+func (*fakeConn) Close() error { return nil }
+func (c *fakeConn) LocalAddr() net.Addr { return c.laddr }
+func (c *fakeConn) RemoteAddr() net.Addr { return c.raddr }
+func (*fakeConn) SetDeadline(t time.Time) error {
+ return verror.New(stream.ErrBadState, nil, verror.New(errDeadlinesNotSupported, nil))
+}
+func (*fakeConn) SetReadDeadline(t time.Time) error {
+ return verror.New(stream.ErrBadState, nil, verror.New(errDeadlinesNotSupported, nil))
+}
+func (*fakeConn) SetWriteDeadline(t time.Time) error {
+ return verror.New(stream.ErrBadState, nil, verror.New(errDeadlinesNotSupported, nil))
+}
// tlsCrypter implements the Crypter interface using crypto/tls.
//
// crypto/tls provides a net.Conn, while the Crypter interface operates on
-// iobuf.Slice objects. In order to adapt to the Crypter interface, the
+// iobuf.Slice objects. In order to adapt to the Crypter in stream.ErrNetwork, verrorterface, the
// strategy is as follows:
//
// - netTLSCrypter wraps a net.Conn with an alternative implementation
@@ -146,7 +152,7 @@
case tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
default:
t.Close()
- return nil, fmt.Errorf("CipherSuite 0x%04x is not recognized. Must use one that uses Diffie-Hellman as the key exchange algorithm", cs)
+ return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnrecognizedCipherText, nil, fmt.Sprintf("0x%04x", cs)))
}
fc.handshakeConn = nil
return &tlsCrypter{
@@ -183,7 +189,7 @@
for {
n, err := c.tls.Read(out)
if err != nil {
- if _, exit := err.(tempError); exit {
+ if _, exit := err.(*stream.NetError); exit {
break
}
plaintext.Release()
diff --git a/profiles/internal/rpc/stream/error_test.go b/profiles/internal/rpc/stream/error_test.go
new file mode 100644
index 0000000..d576d38
--- /dev/null
+++ b/profiles/internal/rpc/stream/error_test.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stream_test
+
+import (
+ "net"
+ "testing"
+
+ "v.io/v23/verror"
+
+ "v.io/x/ref/profiles/internal/rpc/stream"
+)
+
+func TestTimeoutError(t *testing.T) {
+ e := verror.Register(".test", verror.NoRetry, "hello{:3}")
+ timeoutErr := stream.NewNetError(verror.New(e, nil, "world"), true, false)
+
+ // TimeoutError implements error & net.Error. We test that it
+ // implements error by assigning timeoutErr to err which is of type error.
+ var err error
+ err = timeoutErr
+
+ neterr, ok := err.(net.Error)
+ if !ok {
+ t.Fatalf("%T not a net.Error", err)
+ }
+
+ if got, want := neterr.Timeout(), true; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := neterr.Error(), "hello: world"; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+}
diff --git a/profiles/internal/rpc/stream/errors.go b/profiles/internal/rpc/stream/errors.go
index f7d7515..0aa2e9d 100644
--- a/profiles/internal/rpc/stream/errors.go
+++ b/profiles/internal/rpc/stream/errors.go
@@ -5,32 +5,42 @@
package stream
import (
+ "net"
+
"v.io/v23/verror"
)
const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream"
-// The stream family of packages guarantee to return one of the verror codes defined here, their
-// messages are constructed so as to avoid embedding a component/method name and are thus
-// more suitable for inclusion in other verrors.
+// The stream family of packages guarantee to return one of the verror codes defined
+// here, their messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practiced of omitting {1}{2} is used throughout the stream packages since all
+// of their errors are intended to be used as arguments to higher level errors.
var (
ErrSecurity = verror.Register(pkgPath+".errSecurity", verror.NoRetry, "{:3}")
ErrNetwork = verror.Register(pkgPath+".errNetwork", verror.NoRetry, "{:3}")
ErrProxy = verror.Register(pkgPath+".errProxy", verror.NoRetry, "{:3}")
ErrBadArg = verror.Register(pkgPath+".errBadArg", verror.NoRetry, "{:3}")
ErrBadState = verror.Register(pkgPath+".errBadState", verror.NoRetry, "{:3}")
+ ErrAborted = verror.Register(pkgPath+".errAborted", verror.NoRetry, "{:3}")
// TODO(cnicolaou): remove this when the rest of the stream sub packages are converted.
ErrSecOrNet = verror.Register(pkgPath+".errSecOrNet", verror.NoRetry, "{:3}")
- // Update IsStreamError below if you add any other errors here.
)
-// IsStreamError returns true if the err is one of the verror codes defined by this package.
-func IsStreamError(err error) bool {
- id := verror.ErrorID(err)
- switch id {
- case ErrSecurity.ID, ErrNetwork.ID, ErrProxy.ID, ErrBadArg.ID, ErrBadState.ID, ErrSecOrNet.ID:
- return true
- default:
- return false
- }
+// NetError implements net.Error
+type NetError struct {
+ err error
+ timeout, temp bool
}
+
+// NewNetError returns a new net.Error which will return the
+// supplied error, timeout and temporary parameters when the corresponding
+// methods are invoked.
+func NewNetError(err error, timeout, temporary bool) net.Error {
+ return &NetError{err, timeout, temporary}
+}
+
+func (t NetError) Error() string { return t.err.Error() }
+func (t NetError) Timeout() bool { return t.timeout }
+func (t NetError) Temporary() bool { return t.temp }
diff --git a/profiles/internal/rpc/stream/manager/error_test.go b/profiles/internal/rpc/stream/manager/error_test.go
index b2c2f9d..9c33348 100644
--- a/profiles/internal/rpc/stream/manager/error_test.go
+++ b/profiles/internal/rpc/stream/manager/error_test.go
@@ -51,7 +51,7 @@
// bad address
_, _, err = server.Listen("tcp", "xx.0.0.1:0", pserver, pserver.BlessingStore().Default())
- if verror.ErrorID(err) != stream.ErrBadArg.ID {
+ if verror.ErrorID(err) != stream.ErrNetwork.ID {
t.Fatalf("wrong error: %s", err)
}
t.Log(err)
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 16ace2f..3bbdbf6 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -29,17 +29,21 @@
}
var (
- errVomEncoder = reg(".vomEncoder", "failed to create vom encoder{:3}")
- errVomDecoder = reg(".vomDecoder", "failed to create vom decoder{:3}")
- errVomEncodeRequest = reg(".vomEncodeRequest", "failed to encode request to proxy{:3}")
- errVomDecodeResponse = reg(".vomDecodeRequest", "failed to decoded response from proxy{:3}")
- errProxyError = reg(".proxyError", "proxy error {:3}")
- errProxyEndpointError = reg(".proxyEndpointError", "proxy returned an invalid endpoint {:3}{:4}")
- errAlreadyConnected = reg(".alreadyConnected", "already connected to proxy and accepting connections? VIF: {3}, StartAccepting{:_}")
- errFailedToCreateLivenessFlow = reg(".failedToCreateLivenessFlow", "unable to create liveness check flow to proxy{:3}")
- errAcceptFailed = reg(".acceptFailed", "accept failed{:3}")
- errFailedToEstablishVC = reg(".failedToEstablishVC", "VC establishment with proxy failed{:_}")
- errListenerAlreadyClosed = reg(".listenerAlreadyClosed", "listener already closed")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
+ errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
+ errVomEncodeRequest = reg(".errVomEncodeRequest", "failed to encode request to proxy{:3}")
+ errVomDecodeResponse = reg(".errVomDecodeRequest", "failed to decoded response from proxy{:3}")
+ errProxyError = reg(".errProxyError", "proxy error {:3}")
+ errProxyEndpointError = reg(".errProxyEndpointError", "proxy returned an invalid endpoint {:3}{:4}")
+ errAlreadyConnected = reg(".errAlreadyConnected", "already connected to proxy and accepting connections? VIF: {3}, StartAccepting{:_}")
+ errFailedToCreateLivenessFlow = reg(".errFailedToCreateLivenessFlow", "unable to create liveness check flow to proxy{:3}")
+ errAcceptFailed = reg(".errAcceptFailed", "accept failed{:3}")
+ errFailedToEstablishVC = reg(".errFailedToEstablishVC", "VC establishment with proxy failed{:_}")
+ errListenerAlreadyClosed = reg(".errListenerAlreadyClosed", "listener already closed")
)
// listener extends stream.Listener with a DebugString method.
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 756a479..88fa2af 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -30,11 +30,15 @@
const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/manager"
var (
- errUnknownNetwork = reg(".unknownNetwork", "unknown network{:3}")
- errEndpointParseError = reg(".endpointParseError", "failed to parse endpoint {3}{:4}")
- errAlreadyShutdown = reg(".alreadyShutdown", "already shutdown")
- errProvidedServerBlessingsWithoutPrincipal = reg(".serverBlessingsWithoutPrincipal", "blessings provided but with no principal")
- errNoBlessingNames = reg(".noBlessingNames", "no blessing names could be extracted for the provided principal")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errUnknownNetwork = reg(".errUnknownNetwork", "unknown network{:3}")
+ errEndpointParseError = reg(".errEndpointParseError", "failed to parse endpoint {3}{:4}")
+ errAlreadyShutdown = reg(".errAlreadyShutdown", "already shutdown")
+ errProvidedServerBlessingsWithoutPrincipal = reg(".errServerBlessingsWithoutPrincipal", "blessings provided but with no principal")
+ errNoBlessingNames = reg(".errNoBlessingNames", "no blessing names could be extracted for the provided principal")
)
const (
@@ -80,7 +84,11 @@
func dial(network, address string, timeout time.Duration) (net.Conn, error) {
if d, _, _ := rpc.RegisteredProtocol(network); d != nil {
- return d(network, address, timeout)
+ conn, err := d(network, address, timeout)
+ if err != nil {
+ return nil, verror.New(stream.ErrNetwork, nil, err)
+ }
+ return conn, nil
}
return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, network))
}
@@ -105,9 +113,6 @@
vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
conn, err := dial(network, address, timeout)
if err != nil {
- if !stream.IsStreamError(err) {
- err = verror.New(stream.ErrNetwork, nil, err)
- }
return nil, err
}
// (network, address) in the endpoint might not always match up
@@ -156,7 +161,7 @@
}
opts = append([]stream.VCOpt{m.sessionCache, vc.IdleTimeout{defaultIdleTimeout}}, opts...)
vc, err := vf.Dial(remote, principal, opts...)
- if !retry || verror.ErrorID(err) != verror.ErrAborted.ID {
+ if !retry || verror.ErrorID(err) != stream.ErrAborted.ID {
return vc, err
}
vf.Close()
@@ -166,7 +171,11 @@
func listen(protocol, address string) (net.Listener, error) {
if _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
- return l(protocol, address)
+ ln, err := l(protocol, address)
+ if err != nil {
+ return nil, verror.New(stream.ErrNetwork, nil, err)
+ }
+ return ln, nil
}
return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, protocol))
}
@@ -202,10 +211,6 @@
}
netln, err := listen(protocol, address)
if err != nil {
- if !stream.IsStreamError(err) {
- vlog.Infof("XXXX %v : %s\n", verror.ErrorID(err), err)
- err = verror.New(stream.ErrBadArg, nil, err)
- }
return nil, nil, err
}
diff --git a/profiles/internal/rpc/stream/message/coding.go b/profiles/internal/rpc/stream/message/coding.go
index 4186a0a..619ebd3 100644
--- a/profiles/internal/rpc/stream/message/coding.go
+++ b/profiles/internal/rpc/stream/message/coding.go
@@ -6,18 +6,31 @@
import (
"encoding/binary"
- "errors"
- "fmt"
"io"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/rpc/stream/id"
)
-var errLargerThan3ByteUint = errors.New("integer too large to represent in 3 bytes")
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/message"
+
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errLargerThan3ByteUint = reg(".errLargerThan3ByteUnit", "integer too large to represent in 3 bytes")
+ errReadWrongNumBytes = reg(".errReadWrongNumBytes", "read {3} bytes, wanted to read {4}")
+)
func write3ByteUint(dst []byte, n int) error {
if n >= (1<<24) || n < 0 {
- return errLargerThan3ByteUint
+ return verror.New(errLargerThan3ByteUint, nil)
}
dst[0] = byte((n & 0xff0000) >> 16)
dst[1] = byte((n & 0x00ff00) >> 8)
@@ -59,7 +72,7 @@
return err
}
if n != int(size) {
- return io.ErrUnexpectedEOF
+ return verror.New(errReadWrongNumBytes, nil, n, int(size))
}
*s = string(bytes)
return nil
@@ -75,12 +88,13 @@
return err
}
if n != int(size) {
- return io.ErrUnexpectedEOF
+ return verror.New(errReadWrongNumBytes, nil, n, int(size))
}
return nil
}
-// byteReader adapts an io.Reader to an io.ByteReader
+// byteReader adapts an io.Reader to an io.ByteReader so that we can
+// use it with encoding/Binary for varint etc.
type byteReader struct{ io.Reader }
func (b byteReader) ReadByte() (byte, error) {
@@ -92,7 +106,7 @@
case err != nil:
return 0, err
default:
- return 0, fmt.Errorf("read %d bytes, wanted to read 1", n)
+ return 0, verror.New(errReadWrongNumBytes, nil, n, 1)
}
}
diff --git a/profiles/internal/rpc/stream/message/control.go b/profiles/internal/rpc/stream/message/control.go
index 75afa34..93bfb31 100644
--- a/profiles/internal/rpc/stream/message/control.go
+++ b/profiles/internal/rpc/stream/message/control.go
@@ -10,11 +10,26 @@
"io"
"v.io/v23/naming"
+ "v.io/v23/verror"
+
inaming "v.io/x/ref/profiles/internal/naming"
"v.io/x/ref/profiles/internal/rpc/stream/id"
"v.io/x/ref/profiles/internal/rpc/version"
)
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errUnrecognizedVCControlMessageCommand = reg(".errUnrecognizedVCControlMessageCommand",
+ "unrecognized VC control message command({3})")
+ errUnrecognizedVCControlMessageType = reg(".errUnrecognizedVCControlMessageType",
+ "unrecognized VC control message type({3})")
+ errFailedToDeserializedVCControlMessage = reg(".errFailedToDeserializedVCControlMessage", "failed to deserialize control message {3}({4}): {5}")
+ errFailedToWriteHeader = reg(".errFailedToWriteHeader", "failed to write header. Wrote {3} bytes instead of {4}{:5}")
+)
+
// Control is the interface implemented by all control messages.
type Control interface {
readFrom(r *bytes.Buffer) error
@@ -145,12 +160,12 @@
case *SetupVC:
command = setupVCCommand
default:
- return fmt.Errorf("unrecognized VC control message: %T", m)
+ return verror.New(errUnrecognizedVCControlMessageType, nil, fmt.Sprintf("%T", m))
}
var header [1]byte
header[0] = byte(command)
if n, err := w.Write(header[:]); n != len(header) || err != nil {
- return fmt.Errorf("failed to write header. Got (%d, %v) want (%d, nil)", n, err, len(header))
+ return verror.New(errFailedToWriteHeader, nil, n, len(header), err)
}
if err := m.writeTo(w); err != nil {
return err
@@ -162,7 +177,7 @@
var header byte
var err error
if header, err = r.ReadByte(); err != nil {
- return nil, fmt.Errorf("message too small, cannot read control message command (0, %v)", err)
+ return nil, err
}
command := command(header)
var m Control
@@ -182,10 +197,10 @@
case setupVCCommand:
m = new(SetupVC)
default:
- return nil, fmt.Errorf("unrecognized VC control message command(%d)", command)
+ return nil, verror.New(errUnrecognizedVCControlMessageCommand, nil, command)
}
if err := m.readFrom(r); err != nil {
- return nil, fmt.Errorf("failed to deserialize control message %d(%T): %v", command, m, err)
+ return nil, verror.New(errFailedToDeserializedVCControlMessage, nil, command, fmt.Sprintf("%T", m), err)
}
return m, nil
}
diff --git a/profiles/internal/rpc/stream/message/message.go b/profiles/internal/rpc/stream/message/message.go
index b8ef71f..c729949 100644
--- a/profiles/internal/rpc/stream/message/message.go
+++ b/profiles/internal/rpc/stream/message/message.go
@@ -64,11 +64,13 @@
import (
"bytes"
- "errors"
"fmt"
"io"
"v.io/x/lib/vlog"
+
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
"v.io/x/ref/profiles/internal/rpc/stream/crypto"
"v.io/x/ref/profiles/internal/rpc/stream/id"
@@ -93,8 +95,16 @@
)
var (
- emptyMessageErr = errors.New("message is empty")
- corruptedMessageErr = errors.New("corrupted message")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errEmptyMessage = reg(".errEmptyMessage", "message is empty")
+ errCorruptedMessage = reg(".errCorruptedMessage", "corrupted message")
+ errInvalidMessageType = reg("errInvalidMessageType", "invalid message type {3}")
+ errUnrecognizedMessageType = reg("errUrecognizedMessageType", "unrecognized message type {3}")
+ errFailedToReadVCHeader = reg(".errFailedToReadVCHeader", "failed to read VC header{:3}")
+ errFailedToReadPayload = reg(".errFailedToReadPayload", "failed to read payload of {3} bytes for type {4}{:5}")
)
// T is the interface implemented by all messages communicated over a VIF.
@@ -117,7 +127,7 @@
func ReadFrom(r *iobuf.Reader, c crypto.ControlCipher) (T, error) {
header, err := r.Read(commonHeaderSizeBytes)
if err != nil {
- return nil, fmt.Errorf("failed to read VC header: %v", err)
+ return nil, verror.New(errFailedToReadVCHeader, nil, err)
}
c.Decrypt(header.Contents)
msgType := header.Contents[0]
@@ -125,14 +135,14 @@
header.Release()
payload, err := r.Read(msgPayloadSize)
if err != nil {
- return nil, fmt.Errorf("failed to read payload of %d bytes for type %d: %v", msgPayloadSize, msgType, err)
+ return nil, verror.New(errFailedToReadPayload, nil, msgPayloadSize, msgType, err)
}
macSize := c.MACSize()
switch msgType {
case controlType, controlTypeWS:
if !c.Open(payload.Contents) {
payload.Release()
- return nil, corruptedMessageErr
+ return nil, verror.New(errCorruptedMessage, nil)
}
m, err := readControl(bytes.NewBuffer(payload.Contents[:msgPayloadSize-macSize]))
payload.Release()
@@ -140,7 +150,7 @@
case dataType, dataTypeWS:
if !c.Open(payload.Contents[0 : dataHeaderSizeBytes+macSize]) {
payload.Release()
- return nil, corruptedMessageErr
+ return nil, verror.New(errCorruptedMessage, nil)
}
m := &Data{
VCI: id.VC(read4ByteUint(payload.Contents[0:4])),
@@ -152,7 +162,7 @@
return m, nil
default:
payload.Release()
- return nil, fmt.Errorf("unrecognized message type: %d", msgType)
+ return nil, verror.New(errUnrecognizedMessageType, nil, msgType)
}
}
@@ -207,7 +217,7 @@
_, err := w.Write(msg)
return err
default:
- return fmt.Errorf("invalid message type %T", m)
+ return verror.New(errInvalidMessageType, nil, fmt.Sprintf("%T", m))
}
return nil
}
@@ -215,7 +225,7 @@
// EncryptMessage encrypts the message's control data in place.
func EncryptMessage(msg []byte, c crypto.ControlCipher) error {
if len(msg) == 0 {
- return emptyMessageErr
+ return verror.New(errEmptyMessage, nil)
}
n := len(msg)
switch msgType := msg[0]; msgType {
@@ -224,7 +234,7 @@
case dataType:
n = HeaderSizeBytes + c.MACSize()
default:
- return fmt.Errorf("unrecognized message type: %d", msgType)
+ return verror.New(errUnrecognizedMessageType, nil, msgType)
}
c.Encrypt(msg[0:commonHeaderSizeBytes])
c.Seal(msg[commonHeaderSizeBytes:n])
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 3738181..184b209 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -5,7 +5,6 @@
package proxy
import (
- "errors"
"fmt"
"net"
"sync"
@@ -25,6 +24,7 @@
"v.io/x/ref/profiles/internal/lib/iobuf"
"v.io/x/ref/profiles/internal/lib/publisher"
"v.io/x/ref/profiles/internal/lib/upcqueue"
+ "v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/crypto"
"v.io/x/ref/profiles/internal/rpc/stream/id"
"v.io/x/ref/profiles/internal/rpc/stream/message"
@@ -37,13 +37,36 @@
const pkgPath = "v.io/x/ref/profiles/proxy"
-var (
- errNoRoutingTableEntry = errors.New("routing table has no entry for the VC")
- errProcessVanished = errors.New("remote process vanished")
- errDuplicateOpenVC = errors.New("duplicate OpenVC request")
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
- errNoDecoder = verror.Register(pkgPath+".errNoDecoder", verror.NoRetry, "{1:}{2:} proxy: failed to create Decoder{:_}")
- errNoRequest = verror.Register(pkgPath+".errNoRequest", verror.NoRetry, "{1:}{2:} proxy: unable to read Request{:_}")
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errNoRoutingTableEntry = reg(".errNoRoutingTableEntry", "routing table has no entry for the VC")
+ errProcessVanished = reg(".errProcessVanished", "remote process vanished")
+ errDuplicateOpenVC = reg(".errDuplicateOpenVC", "duplicate OpenVC request")
+ errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
+ errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
+ errVomEncodeResponse = reg(".errVomEncodeResponse", "failed to encode response from proxy{:3}")
+ errNoRequest = reg(".errNoRequest", "unable to read Request{:3}")
+ errServerClosedByProxy = reg(".errServerClosedByProxy", "server closed by proxy")
+ errRemoveServerVC = reg(".errRemoveServerVC", "failed to remove server VC {3}{:4}")
+ errNetConnClosing = reg(".errNetConnClosing", "net.Conn is closing")
+ errFailedToAcceptHealthCheck = reg(".errFailedToAcceptHealthCheck", "failed to accept health check flow")
+ errIncompatibleVersions = reg(".errIncompatibleVersions", "{:3}")
+ errAlreadyProxied = reg(".errAlreadyProxied", "server with routing id {3} is already being proxied")
+ errUnknownNetwork = reg(".errUnknownNetwork", "unknown network {3}")
+ errListenFailed = reg(".errListenFailed", "net.Listen({3}, {4}) failed{:5}")
+ errFailedToForwardRxBufs = reg(".errFailedToForwardRxBufs", "failed to forward receive buffers{:3}")
+ errFailedToFowardDataMsg = reg(".errFailedToFowardDataMsg", "failed to forward data message{:3}")
+ errFailedToFowardOpenFlow = reg(".errFailedToFowardOpenFlow", "failed to forward open flow{:3}")
+ errUnsupportedSetupVC = reg(".errUnsupportedSetupVC", "proxy support for SetupVC not implemented yet")
+ errServerNotBeingProxied = reg(".errServerNotBeingProxied", "no server with routing id {3} is being proxied")
+ errServerVanished = reg(".errServerVanished", "server with routing id {3} vanished")
)
// Proxy routes virtual circuit (VC) traffic between multiple underlying
@@ -94,9 +117,9 @@
func (s *server) Close(err error) {
if vc := s.Process.RemoveServerVC(s.VC.VCI()); vc != nil {
if err != nil {
- vc.Close(err.Error())
+ vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errRemoveServerVC, nil, s.VC.VCI(), err)))
} else {
- vc.Close("server closed by proxy")
+ vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errServerClosedByProxy, nil)))
}
s.Process.SendCloseVC(s.VC.VCI(), err)
}
@@ -118,7 +141,7 @@
m.mu.Lock()
defer m.mu.Unlock()
if m.m[key] != nil {
- return fmt.Errorf("server with routing id %v is already being proxied", key)
+ return verror.New(stream.ErrProxy, nil, verror.New(errAlreadyProxied, nil, key))
}
m.m[key] = server
proxyLog().Infof("Started proxying server: %v", server)
@@ -197,11 +220,11 @@
func internalNew(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
_, listenFn, _ := rpc.RegisteredProtocol(network)
if listenFn == nil {
- return nil, fmt.Errorf("unknown network %s", network)
+ return nil, verror.New(stream.ErrProxy, nil, verror.New(errUnknownNetwork, nil, network))
}
ln, err := listenFn(network, address)
if err != nil {
- return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
+ return nil, verror.New(stream.ErrProxy, nil, verror.New(errListenFailed, nil, network, address, err))
}
if len(pubAddress) == 0 {
pubAddress = ln.Addr().String()
@@ -276,7 +299,7 @@
// See comments in protocol.vdl for the protocol between servers and the proxy.
conn, err := hr.Listener.Accept()
if err != nil {
- server.Close(errors.New("failed to accept health check flow"))
+ server.Close(verror.New(stream.ErrProxy, nil, verror.New(errFailedToAcceptHealthCheck, nil)))
return
}
server.Process.InitVCI(server.VC.VCI())
@@ -284,9 +307,9 @@
var response Response
dec, err := vom.NewDecoder(conn)
if err != nil {
- response.Error = verror.New(errNoDecoder, nil, err)
+ response.Error = verror.New(stream.ErrProxy, nil, verror.New(errVomDecoder, nil, err))
} else if err := dec.Decode(&request); err != nil {
- response.Error = verror.New(errNoRequest, nil, err)
+ response.Error = verror.New(stream.ErrProxy, nil, verror.New(errNoRequest, nil, err))
} else if err := p.servers.Add(server); err != nil {
response.Error = verror.Convert(verror.ErrUnknown, nil, err)
} else {
@@ -302,12 +325,12 @@
enc, err := vom.NewEncoder(conn)
if err != nil {
proxyLog().Infof("Failed to create Encoder for server %v: %v", server, err)
- server.Close(err)
+ server.Close(verror.New(stream.ErrProxy, nil, verror.New(errVomEncoder, nil, err)))
return
}
if err := enc.Encode(response); err != nil {
proxyLog().Infof("Failed to encode response %#v for server %v", response, server)
- server.Close(err)
+ server.Close(verror.New(stream.ErrProxy, nil, verror.New(errVomEncodeResponse, nil, err)))
return
}
// Reject all other flows
@@ -341,7 +364,7 @@
c.Add(d.VCI, cid.Flow(), bytes)
if err := d.Process.queue.Put(&message.AddReceiveBuffers{Counters: c}); err != nil {
process.RemoveRoute(srcVCI)
- process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward receive buffers: %v", err))
+ process.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToForwardRxBufs, nil, err)))
}
}
}
@@ -470,11 +493,11 @@
if err := d.Process.queue.Put(m); err != nil {
m.Release()
p.RemoveRoute(srcVCI)
- p.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward data message: %v", err))
+ p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToFowardDataMsg, nil, err)))
}
break
}
- p.SendCloseVC(srcVCI, errNoRoutingTableEntry)
+ p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errNoRoutingTableEntry, nil)))
case *message.OpenFlow:
if vc := p.ServerVC(m.VCI); vc != nil {
if err := vc.AcceptFlow(m.Flow); err != nil {
@@ -491,14 +514,14 @@
m.VCI = d.VCI
if err := d.Process.queue.Put(m); err != nil {
p.RemoveRoute(srcVCI)
- p.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward open flow message: %v", err))
+ p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToFowardOpenFlow, nil, err)))
}
break
}
- p.SendCloseVC(srcVCI, errNoRoutingTableEntry)
+ p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errNoRoutingTableEntry, nil)))
case *message.CloseVC:
if vc := p.RemoveServerVC(m.VCI); vc != nil {
- vc.Close(m.Error)
+ vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errRemoveServerVC, nil, m.VCI, m.Error)))
break
}
srcVCI := m.VCI
@@ -515,13 +538,13 @@
if naming.Compare(dstrid, p.proxy.rid) || naming.Compare(dstrid, naming.NullRoutingID) {
// VC that terminates at the proxy.
// TODO(ashankar,mattr): Implement this!
- p.SendCloseVC(m.VCI, fmt.Errorf("proxy support for SetupVC not implemented yet"))
+ p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil, verror.New(errUnsupportedSetupVC, nil)))
p.proxy.routeCounters(p, m.Counters)
break
}
dstprocess := p.proxy.servers.Process(dstrid)
if dstprocess == nil {
- p.SendCloseVC(m.VCI, fmt.Errorf("no server with routing id %v is being proxied", dstrid))
+ p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil, verror.New(errServerNotBeingProxied, nil, dstrid)))
p.proxy.routeCounters(p, m.Counters)
break
}
@@ -538,7 +561,7 @@
dstVCI := dstprocess.AllocVCI()
startRoutingVC(srcVCI, dstVCI, p, dstprocess)
if d = p.Route(srcVCI); d == nil {
- p.SendCloseVC(srcVCI, fmt.Errorf("server with routing id %v vanished", dstrid))
+ p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errServerVanished, nil, dstrid)))
p.proxy.routeCounters(p, m.Counters)
break
}
@@ -576,7 +599,7 @@
}
dstprocess := p.proxy.servers.Process(dstrid)
if dstprocess == nil {
- p.SendCloseVC(m.VCI, fmt.Errorf("no server with routing id %v is being proxied", dstrid))
+ p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil, verror.New(errServerNotBeingProxied, nil, dstrid)))
p.proxy.routeCounters(p, m.Counters)
break
}
@@ -672,11 +695,11 @@
rt := p.routingTable
p.routingTable = nil
for _, vc := range p.servers {
- vc.Close("net.Conn is closing")
+ vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errNetConnClosing, nil)))
}
p.mu.Unlock()
for _, d := range rt {
- d.Process.SendCloseVC(d.VCI, errProcessVanished)
+ d.Process.SendCloseVC(d.VCI, verror.New(stream.ErrProxy, nil, verror.New(errProcessVanished, nil)))
}
p.bq.Close()
p.queue.Close()
@@ -695,12 +718,12 @@
p.mu.Lock()
defer p.mu.Unlock()
if vc := p.servers[m.VCI]; vc != nil {
- vc.Close("duplicate OpenVC request")
+ vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errDuplicateOpenVC, nil)))
return nil
}
version, err := version.CommonVersion(m.DstEndpoint, m.SrcEndpoint)
if err != nil {
- p.SendCloseVC(m.VCI, fmt.Errorf("incompatible RPC protocol versions: %v", err))
+ p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil, verror.New(errIncompatibleVersions, nil, err)))
return nil
}
vc := vc.InternalNew(vc.Params{
diff --git a/profiles/internal/rpc/stream/vc/auth.go b/profiles/internal/rpc/stream/vc/auth.go
index ff0dee9..ffb806f 100644
--- a/profiles/internal/rpc/stream/vc/auth.go
+++ b/profiles/internal/rpc/stream/vc/auth.go
@@ -6,16 +6,16 @@
import (
"bytes"
- "errors"
- "fmt"
"io"
- "v.io/x/ref/profiles/internal/lib/iobuf"
- "v.io/x/ref/profiles/internal/rpc/stream/crypto"
-
"v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/v23/verror"
"v.io/v23/vom"
+
+ "v.io/x/ref/profiles/internal/lib/iobuf"
+ "v.io/x/ref/profiles/internal/rpc/stream"
+ "v.io/x/ref/profiles/internal/rpc/stream/crypto"
)
var (
@@ -24,12 +24,18 @@
)
var (
- errSameChannelPublicKey = errors.New("same public keys for both ends of the channel")
- errChannelIDMismatch = errors.New("channel id does not match expectation")
- errChecksumMismatch = errors.New("checksum mismatch")
- errInvalidSignatureInMessage = errors.New("signature does not verify in authentication handshake message")
- errNoCertificatesReceived = errors.New("no certificates received")
- errSingleCertificateRequired = errors.New("exactly one X.509 certificate chain with exactly one certificate is required")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
+ errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
+ errVomEncodeBlessing = reg(".errVomEncodeRequest", "failed to encode blessing{:3}")
+ errHandshakeMessage = reg(".errHandshakeMessage", "failed to read hanshake message{:3}")
+ errInvalidSignatureInMessage = reg(".errInvalidSignatureInMessage", "signature does not verify in authentication handshake message")
+ errEncryptBlessing = reg(".errEncryptBlessing", "failed to encrypt blessing{:3}")
+ errFailedToCreateSelfBlessing = reg(".errFailedToCreateSelfBlessing", "failed to create self blessing{:3}")
+ errNoBlessingsToPresentToServer = reg(".errerrNoBlessingsToPresentToServer ", "no blessings to present as a server")
)
// AuthenticateAsServer executes the authentication protocol at the server.
@@ -37,20 +43,20 @@
// by the server.
func AuthenticateAsServer(conn io.ReadWriteCloser, principal security.Principal, server security.Blessings, dc DischargeClient, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
if server.IsZero() {
- return security.Blessings{}, nil, errors.New("no blessings to present as a server")
+ return security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, verror.New(errNoBlessingsToPresentToServer, nil))
}
var serverDischarges []security.Discharge
if tpcavs := server.ThirdPartyCaveats(); len(tpcavs) > 0 && dc != nil {
serverDischarges = dc.PrepareDischarges(nil, tpcavs, security.DischargeImpetus{})
}
- if err := writeBlessings(conn, authServerContextTag, crypter, principal, server, serverDischarges, v); err != nil {
- return security.Blessings{}, nil, err
+ if errID, err := writeBlessings(conn, authServerContextTag, crypter, principal, server, serverDischarges, v); err != nil {
+ return security.Blessings{}, nil, verror.New(errID, nil, err)
}
// Note that since the client uses a self-signed blessing to authenticate
// during VC setup, it does not share any discharges.
- client, _, err := readBlessings(conn, authClientContextTag, crypter, v)
+ client, _, errID, err := readBlessings(conn, authClientContextTag, crypter, v)
if err != nil {
- return security.Blessings{}, nil, err
+ return security.Blessings{}, nil, verror.New(errID, nil, err)
}
return client, mkDischargeMap(serverDischarges), nil
}
@@ -62,16 +68,16 @@
// The client will only share its blessings if the server (who shares its
// blessings first) is authorized as per the authorizer for this RPC.
func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, params security.CallParams, auth *ServerAuthorizer, v version.RPCVersion) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
- server, serverDischarges, err := readBlessings(conn, authServerContextTag, crypter, v)
+ server, serverDischarges, errID, err := readBlessings(conn, authServerContextTag, crypter, v)
if err != nil {
- return security.Blessings{}, security.Blessings{}, nil, err
+ return security.Blessings{}, security.Blessings{}, nil, verror.New(errID, nil, err)
}
// Authorize the server based on the provided authorizer.
if auth != nil {
params.RemoteBlessings = server
params.RemoteDischarges = serverDischarges
if err := auth.Authorize(params); err != nil {
- return security.Blessings{}, security.Blessings{}, nil, err
+ return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, err)
}
}
@@ -81,65 +87,68 @@
principal := params.LocalPrincipal
client, err := principal.BlessSelf("vcauth")
if err != nil {
- return security.Blessings{}, security.Blessings{}, nil, fmt.Errorf("failed to created self blessing: %v", err)
+ return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateSelfBlessing, nil, err))
}
- if err := writeBlessings(conn, authClientContextTag, crypter, principal, client, nil, v); err != nil {
- return security.Blessings{}, security.Blessings{}, nil, err
+ if errID, err := writeBlessings(conn, authClientContextTag, crypter, principal, client, nil, v); err != nil {
+ return security.Blessings{}, security.Blessings{}, nil, verror.New(errID, nil, err)
}
return server, client, serverDischarges, nil
}
-func writeBlessings(w io.Writer, tag []byte, crypter crypto.Crypter, p security.Principal, b security.Blessings, discharges []security.Discharge, v version.RPCVersion) error {
+func writeBlessings(w io.Writer, tag []byte, crypter crypto.Crypter, p security.Principal, b security.Blessings, discharges []security.Discharge, v version.RPCVersion) (verror.IDAction, error) {
signature, err := p.Sign(append(tag, crypter.ChannelBinding()...))
if err != nil {
- return err
+ return stream.ErrSecurity, err
}
var buf bytes.Buffer
enc, err := vom.NewEncoder(&buf)
if err != nil {
- return err
+ return stream.ErrNetwork, verror.New(errVomEncoder, nil, err)
}
if err := enc.Encode(signature); err != nil {
- return err
+ return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
}
if err := enc.Encode(b); err != nil {
- return err
+ return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
}
if v >= version.RPCVersion5 {
if err := enc.Encode(discharges); err != nil {
- return err
+ return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
}
}
msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
if err != nil {
- return err
+ return verror.IDAction{ID: verror.ErrorID(err)}, verror.New(errEncryptBlessing, nil, err)
}
defer msg.Release()
enc, err = vom.NewEncoder(w)
if err != nil {
- return err
+ return stream.ErrNetwork, verror.New(errVomEncoder, nil, err)
}
- return enc.Encode(msg.Contents)
+ if err := enc.Encode(msg.Contents); err != nil {
+ return stream.ErrNetwork, verror.New(errVomEncodeBlessing, nil, err)
+ }
+ return verror.IDAction{}, nil
}
-func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, verror.IDAction, error) {
var msg []byte
var noBlessings security.Blessings
dec, err := vom.NewDecoder(r)
if err != nil {
- return noBlessings, nil, fmt.Errorf("failed to create new decoder: %v", err)
+ return noBlessings, nil, stream.ErrNetwork, verror.New(errVomDecoder, nil, err)
}
if err := dec.Decode(&msg); err != nil {
- return noBlessings, nil, fmt.Errorf("failed to read handshake message: %v", err)
+ return noBlessings, nil, stream.ErrNetwork, verror.New(errHandshakeMessage, nil, err)
}
buf, err := crypter.Decrypt(iobuf.NewSlice(msg))
if err != nil {
- return noBlessings, nil, err
+ return noBlessings, nil, verror.IDAction{ID: verror.ErrorID(err)}, err
}
defer buf.Release()
dec, err = vom.NewDecoder(bytes.NewReader(buf.Contents))
if err != nil {
- return noBlessings, nil, fmt.Errorf("failed to create new decoder: %v", err)
+ return noBlessings, nil, stream.ErrNetwork, verror.New(errVomDecoder, nil, err)
}
var (
@@ -147,21 +156,21 @@
sig security.Signature
)
if err = dec.Decode(&sig); err != nil {
- return noBlessings, nil, err
+ return noBlessings, nil, stream.ErrNetwork, err
}
if err = dec.Decode(&blessings); err != nil {
- return noBlessings, nil, err
+ return noBlessings, nil, stream.ErrNetwork, err
}
var discharges []security.Discharge
if v >= version.RPCVersion5 {
if err := dec.Decode(&discharges); err != nil {
- return noBlessings, nil, err
+ return noBlessings, nil, stream.ErrNetwork, err
}
}
if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
- return noBlessings, nil, errInvalidSignatureInMessage
+ return noBlessings, nil, stream.ErrSecurity, verror.New(errInvalidSignatureInMessage, nil)
}
- return blessings, mkDischargeMap(discharges), nil
+ return blessings, mkDischargeMap(discharges), verror.IDAction{}, nil
}
func mkDischargeMap(discharges []security.Discharge) map[string]security.Discharge {
diff --git a/profiles/internal/rpc/stream/vc/listener.go b/profiles/internal/rpc/stream/vc/listener.go
index c4b007b..991d776 100644
--- a/profiles/internal/rpc/stream/vc/listener.go
+++ b/profiles/internal/rpc/stream/vc/listener.go
@@ -5,13 +5,20 @@
package vc
import (
- "errors"
+ "v.io/v23/verror"
"v.io/x/ref/profiles/internal/lib/upcqueue"
"v.io/x/ref/profiles/internal/rpc/stream"
)
-var errListenerClosed = errors.New("Listener has been closed")
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errListenerClosed = reg(".errListenerClosed", "Listener has been closed")
+ errGetFromQueue = reg(".errGetFromQueue", "upcqueue.Get failed{:3}")
+)
type listener struct {
q *upcqueue.T
@@ -24,7 +31,7 @@
func (l *listener) Enqueue(f stream.Flow) error {
err := l.q.Put(f)
if err == upcqueue.ErrQueueIsClosed {
- return errListenerClosed
+ return verror.New(stream.ErrBadState, nil, verror.New(errListenerClosed, nil))
}
return err
}
@@ -32,10 +39,10 @@
func (l *listener) Accept() (stream.Flow, error) {
item, err := l.q.Get(nil)
if err == upcqueue.ErrQueueIsClosed {
- return nil, errListenerClosed
+ return nil, verror.New(stream.ErrBadState, nil, verror.New(errListenerClosed, nil))
}
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errGetFromQueue, nil, err))
}
return item.(stream.Flow), nil
}
diff --git a/profiles/internal/rpc/stream/vc/listener_test.go b/profiles/internal/rpc/stream/vc/listener_test.go
index 6ddc5cf..1aa4899 100644
--- a/profiles/internal/rpc/stream/vc/listener_test.go
+++ b/profiles/internal/rpc/stream/vc/listener_test.go
@@ -5,10 +5,13 @@
package vc
import (
+ "strings"
"testing"
"v.io/v23/naming"
"v.io/v23/security"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/rpc/stream"
)
@@ -56,10 +59,10 @@
if err := ln.Close(); err != nil {
t.Error(err)
}
- if err := ln.Enqueue(f1); err != errListenerClosed {
+ if err := ln.Enqueue(f1); verror.ErrorID(err) != stream.ErrBadState.ID || !strings.Contains(err.Error(), "closed") {
t.Error(err)
}
- if f, err := ln.Accept(); f != nil || err != errListenerClosed {
+ if f, err := ln.Accept(); f != nil || verror.ErrorID(err) != stream.ErrBadState.ID || !strings.Contains(err.Error(), "closed") {
t.Errorf("Accept returned (%p, %v) wanted (nil, %v)", f, err, errListenerClosed)
}
}
diff --git a/profiles/internal/rpc/stream/vc/reader.go b/profiles/internal/rpc/stream/vc/reader.go
index be778fb..c90dbdd 100644
--- a/profiles/internal/rpc/stream/vc/reader.go
+++ b/profiles/internal/rpc/stream/vc/reader.go
@@ -5,14 +5,24 @@
package vc
import (
- "fmt"
"io"
"sync"
"sync/atomic"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
vsync "v.io/x/ref/profiles/internal/lib/sync"
"v.io/x/ref/profiles/internal/lib/upcqueue"
+ "v.io/x/ref/profiles/internal/rpc/stream"
+)
+
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errGetFailed = reg(".errGetFailed", "upcqueue.Get failed:{:3}")
)
// readHandler is the interface used by the reader to notify other components
@@ -63,9 +73,9 @@
return 0, io.EOF
case vsync.ErrCanceled:
// As per net.Conn.Read specification
- return 0, timeoutError{}
+ return 0, stream.NewNetError(verror.New(stream.ErrNetwork, nil, verror.New(errCanceled, nil)), true, false)
default:
- return 0, fmt.Errorf("upcqueue.Get failed: %v", err)
+ return 0, verror.New(stream.ErrNetwork, nil, verror.New(errGetFailed, nil, err))
}
}
r.buf = slice.(*iobuf.Slice)
@@ -103,10 +113,3 @@
func (r *reader) Put(slice *iobuf.Slice) error {
return r.src.Put(slice)
}
-
-// timeoutError implements net.Error with Timeout returning true.
-type timeoutError struct{}
-
-func (t timeoutError) Error() string { return "deadline exceeded" }
-func (t timeoutError) Timeout() bool { return true }
-func (t timeoutError) Temporary() bool { return false }
diff --git a/profiles/internal/rpc/stream/vc/vc.go b/profiles/internal/rpc/stream/vc/vc.go
index 3774761..c85342f 100644
--- a/profiles/internal/rpc/stream/vc/vc.go
+++ b/profiles/internal/rpc/stream/vc/vc.go
@@ -9,7 +9,6 @@
// Verbosity level 2 is for per-Flow messages.
import (
- "errors"
"fmt"
"io"
"sort"
@@ -21,6 +20,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
@@ -32,10 +32,39 @@
"v.io/x/ref/profiles/internal/rpc/stream/id"
)
+const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vc"
+
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
var (
- errAlreadyListening = errors.New("Listen has already been called")
- errDuplicateFlow = errors.New("duplicate OpenFlow message")
- errUnrecognizedFlow = errors.New("unrecognized flow")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errAlreadyListening = reg(".errAlreadyListening", "Listen has already been called")
+ errDuplicateFlow = reg(".errDuplicateFlow", "duplicate OpenFlow message")
+ errUnrecognizedFlow = reg(".errUnrecognizedFlow", "unrecognized flow")
+ errFailedToCreateWriterForFlow = reg(".errFailedToCreateWriterForFlow", "failed to create writer for Flow{:3}")
+ errConnectOnClosedVC = reg(".errConnectOnClosedVC", "connect on closed VC{:3}")
+ errFailedToDecryptPayload = reg(".errFailedToDecryptPayload", "failed to decrypt payload{:3}")
+ errIgnoringMessageOnClosedVC = reg(".errIgnoringMessageOnClosedVC", "ignoring message for Flow {3} on closed VC {4}")
+ errVomTypedDecoder = reg(".errVomDecoder", "failed to create typed vom decoder{:3}")
+ errVomTypedEncoder = reg(".errVomEncoder", "failed to create vom typed encoder{:3}")
+ errFailedToCreateFlowForWireType = reg(".errFailedToCreateFlowForWireType", "fail to create a Flow for wire type{:3}")
+ errFlowForWireTypeNotAccepted = reg(".errFlowForWireTypeNotAccepted", "Flow for wire type not accepted{:3}")
+ errFailedToCreateTLSFlow = reg(".errFailedToCreateTLSFlow", "failed to create a Flow for setting up TLS{3:}")
+ errFailedToSetupTLS = reg(".errFailedToSetupTLS", "failed to setup TLS{:3}")
+ errFailedToCreateFlowForAuth = reg(".errFailedToCreateFlowForAuth", "failed to create a Flow for authentication{:3}")
+ errAuthFailed = reg(".errAuthFailed", "authentication failed{:3}")
+ errFailedToConnectSystemFlows = reg(".errFailedToConnectSystemFlows", "failed to connect system flows{:3}")
+ errNoActiveListener = reg(".errNoActiveListener", "no active listener on VCI {3}")
+ errFailedToCreateWriterForNewFlow = reg(".errFailedToCreateWriterForNewFlow", "failed to create writer for new flow({3}){:4}")
+ errFailedToEnqueueFlow = reg(".errFailedToEnqueueFlow", "failed to enqueue flow at listener{:3}")
+ errTLSFlowNotAccepted = reg(".errTLSFlowNotAccepted", "TLS handshake Flow not accepted{:3}")
+ errAuthFlowNotAccepted = reg(".errAuthFlowNotAccepted", "authentication Flow not accepted{:3}")
+ errFailedToAcceptSystemFlows = reg(".errFailedToAcceptSystemFlows", "failed to accept system flows{:3}")
)
// DischargeExpiryBuffer specifies how much before discharge expiration we should
@@ -78,7 +107,7 @@
nextConnectFID id.Flow
listener *listener // non-nil iff Listen has been called and the VC has not been closed.
crypter crypto.Crypter
- closeReason string // reason why the VC was closed
+ closeReason error // reason why the VC was closed, possibly nil
closeCh chan struct{}
closed bool
@@ -219,7 +248,7 @@
func (vc *VC) connectFID(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.Flow, error) {
writer, err := vc.newWriter(fid, priority)
if err != nil {
- return nil, fmt.Errorf("failed to create writer for Flow: %v", err)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errFailedToCreateWriterForFlow, nil, err))
}
f := &flow{
backingVC: vc,
@@ -230,7 +259,7 @@
if vc.flowMap == nil {
vc.mu.Unlock()
f.Shutdown()
- return nil, fmt.Errorf("Connect on closed VC(%q)", vc.closeReason)
+ return nil, verror.New(stream.ErrNetwork, nil, verror.New(errConnectOnClosedVC, nil, vc.closeReason))
}
vc.flowMap[fid] = f
vc.mu.Unlock()
@@ -244,7 +273,7 @@
vc.mu.Lock()
defer vc.mu.Unlock()
if vc.listener != nil {
- return nil, errAlreadyListening
+ return nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyListening, nil))
}
vc.listener = newListener()
return vc.listener, nil
@@ -264,7 +293,7 @@
if vc.flowMap == nil {
vc.mu.Unlock()
payload.Release()
- return fmt.Errorf("ignoring message for Flow %d on closed VC %d", fid, vc.VCI())
+ return verror.New(stream.ErrNetwork, nil, verror.New(errIgnoringMessageOnClosedVC, nil, fid, vc.VCI()))
}
// TLS decryption is stateful, so even if the message will be discarded
// because of other checks further down in this method, go through with
@@ -274,7 +303,7 @@
var err error
if payload, err = vc.crypter.Decrypt(payload); err != nil {
vc.mu.Unlock()
- return fmt.Errorf("failed to decrypt payload: %v", err)
+ return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToDecryptPayload, nil, err))
}
}
if payload.Size() == 0 {
@@ -286,12 +315,12 @@
if f == nil {
vc.mu.Unlock()
payload.Release()
- return errUnrecognizedFlow
+ return verror.New(stream.ErrNetwork, nil, verror.New(errDuplicateFlow, nil))
}
vc.mu.Unlock()
if err := f.reader.Put(payload); err != nil {
payload.Release()
- return err
+ return verror.New(stream.ErrNetwork, nil, err)
}
return nil
}
@@ -303,10 +332,10 @@
vc.mu.Lock()
defer vc.mu.Unlock()
if vc.listener == nil {
- return fmt.Errorf("no active listener on VCI %d", vc.vci)
+ return verror.New(stream.ErrBadState, nil, vc.vci)
}
if _, exists := vc.flowMap[fid]; exists {
- return errDuplicateFlow
+ return verror.New(stream.ErrNetwork, nil, verror.New(errDuplicateFlow, nil))
}
priority := normalFlowPriority
// We use the same high priority for all reserved flows including handshake and
@@ -318,7 +347,7 @@
}
writer, err := vc.newWriter(fid, priority)
if err != nil {
- return fmt.Errorf("failed to create writer for new flow(%d): %v", fid, err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errFailedToCreateWriterForNewFlow, nil, fid, err))
}
f := &flow{
backingVC: vc,
@@ -327,7 +356,7 @@
}
if err = vc.listener.Enqueue(f); err != nil {
f.Shutdown()
- return fmt.Errorf("failed to enqueue flow at listener: %v", err)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errFailedToEnqueueFlow, nil, err))
}
vc.flowMap[fid] = f
// New flow accepted, notify remote end that it can send over data.
@@ -375,7 +404,7 @@
// Close closes the VC and all flows on it, allowing any pending writes in the
// flow to drain.
-func (vc *VC) Close(reason string) error {
+func (vc *VC) Close(reason error) error {
vlog.VI(1).Infof("Closing VC %v. Reason:%q", vc, reason)
vc.mu.Lock()
if vc.closed {
@@ -405,8 +434,8 @@
func (vc *VC) err(err error) error {
vc.mu.Lock()
defer vc.mu.Unlock()
- if vc.closeReason != "" {
- return errors.New(vc.closeReason)
+ if vc.closeReason != nil {
+ return vc.closeReason
}
return err
}
@@ -436,11 +465,11 @@
// Establish TLS
handshakeConn, err := vc.connectFID(HandshakeFlowID, systemFlowPriority)
if err != nil {
- return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
+ return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateTLSFlow, nil, err)))
}
crypter, err := crypto.NewTLSClient(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), tlsSessionCache, vc.pool)
if err != nil {
- return vc.err(fmt.Errorf("failed to setup TLS: %v", err))
+ return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupTLS, nil, err)))
}
// Authenticate (exchange identities)
@@ -454,7 +483,7 @@
// stream API.
authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
if err != nil {
- return vc.err(fmt.Errorf("failed to create a Flow for authentication: %v", err))
+ return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
}
params := security.CallParams{
LocalPrincipal: principal,
@@ -465,7 +494,7 @@
if err != nil || len(rBlessings.ThirdPartyCaveats()) == 0 {
authConn.Close()
if err != nil {
- return vc.err(fmt.Errorf("authentication failed: %v", err))
+ return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)))
}
} else {
go vc.recvDischargesLoop(authConn)
@@ -483,7 +512,7 @@
// Open system flows.
if err = vc.connectSystemFlows(); err != nil {
- return vc.err(fmt.Errorf("failed to connect system flows: %v", err))
+ return vc.err(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToConnectSystemFlows, nil, err)))
}
vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
@@ -552,7 +581,7 @@
// the identity exchange protocol.
handshakeConn, err := ln.Accept()
if err != nil {
- sendErr(fmt.Errorf("TLS handshake Flow not accepted: %v", err))
+ sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errTLSFlowNotAccepted, nil, err)))
return
}
vc.mu.Lock()
@@ -563,14 +592,14 @@
// Establish TLS
crypter, err := crypto.NewTLSServer(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), vc.pool)
if err != nil {
- sendErr(fmt.Errorf("failed to setup TLS: %v", err))
+ sendErr(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupTLS, nil, err)))
return
}
// Authenticate (exchange identities)
authConn, err := ln.Accept()
if err != nil {
- sendErr(fmt.Errorf("Authentication Flow not accepted: %v", err))
+ sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errAuthFlowNotAccepted, nil, err)))
return
}
vc.mu.Lock()
@@ -580,7 +609,7 @@
rBlessings, lDischarges, err := AuthenticateAsServer(authConn, principal, lBlessings, dischargeClient, crypter, vc.version)
if err != nil {
authConn.Close()
- sendErr(fmt.Errorf("authentication failed: %v", err))
+ sendErr(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)))
return
}
@@ -602,7 +631,7 @@
// Accept system flows.
if err = vc.acceptSystemFlows(ln); err != nil {
- sendErr(fmt.Errorf("failed to accept system flows: %v", err))
+ sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)))
}
vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
@@ -702,18 +731,18 @@
}
conn, err := vc.connectFID(TypeFlowID, systemFlowPriority)
if err != nil {
- return fmt.Errorf("fail to create a Flow for wire type: %v", err)
+ return verror.New(errFailedToCreateFlowForWireType, nil, err)
}
typeEnc, err := vom.NewTypeEncoder(conn)
if err != nil {
conn.Close()
- return fmt.Errorf("failed to create type encoder: %v", err)
+ return verror.New(errVomTypedEncoder, nil, err)
}
vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
typeDec, err := vom.NewTypeDecoder(conn)
if err != nil {
conn.Close()
- return fmt.Errorf("failed to create type decoder: %v", err)
+ return verror.New(errVomTypedDecoder, nil, err)
}
vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
return nil
@@ -725,18 +754,18 @@
}
conn, err := ln.Accept()
if err != nil {
- return fmt.Errorf("Flow for wire type not accepted: %v", err)
+ return verror.New(errFlowForWireTypeNotAccepted, nil, err)
}
typeDec, err := vom.NewTypeDecoder(conn)
if err != nil {
conn.Close()
- return fmt.Errorf("failed to create type decoder: %v", err)
+ return verror.New(errVomTypedDecoder, nil, err)
}
vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
typeEnc, err := vom.NewTypeEncoder(conn)
if err != nil {
conn.Close()
- return fmt.Errorf("failed to create type encoder: %v", err)
+ return verror.New(errVomTypedEncoder, nil, err)
}
vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
return nil
diff --git a/profiles/internal/rpc/stream/vc/vc_test.go b/profiles/internal/rpc/stream/vc/vc_test.go
index 6bc422d..649d671 100644
--- a/profiles/internal/rpc/stream/vc/vc_test.go
+++ b/profiles/internal/rpc/stream/vc/vc_test.go
@@ -409,7 +409,7 @@
t.Fatal(err)
}
defer h.Close()
- h.VC.Close("reason")
+ h.VC.Close(fmt.Errorf("reason"))
if err := h.VC.AcceptFlow(id.Flow(10)); err == nil {
t.Fatalf("New flows should not be accepted once the VC is closed")
}
@@ -423,7 +423,7 @@
t.Fatal(err)
}
defer h.Close()
- h.VC.Close("myerr")
+ h.VC.Close(fmt.Errorf("myerr"))
if f, err := vc.Connect(); f != nil || err == nil || !strings.Contains(err.Error(), "myerr") {
t.Fatalf("Got (%v, %v), want (nil, %q)", f, err, "myerr")
}
@@ -594,7 +594,7 @@
}
func (h *helper) Close() {
- h.VC.Close("helper closed")
+ h.VC.Close(fmt.Errorf("helper closed"))
h.bq.Close()
h.mu.Lock()
otherEnd := h.otherEnd
diff --git a/profiles/internal/rpc/stream/vc/writer.go b/profiles/internal/rpc/stream/vc/writer.go
index 0e04985..ab4ccfc 100644
--- a/profiles/internal/rpc/stream/vc/writer.go
+++ b/profiles/internal/rpc/stream/vc/writer.go
@@ -5,18 +5,28 @@
package vc
import (
- "errors"
- "fmt"
"io"
"sync"
"sync/atomic"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/bqueue"
"v.io/x/ref/profiles/internal/lib/iobuf"
vsync "v.io/x/ref/profiles/internal/lib/sync"
+ "v.io/x/ref/profiles/internal/rpc/stream"
)
-var errWriterClosed = errors.New("attempt to call Write on Flow that has been Closed")
+var (
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errWriterClosed = reg(".errWriterClosed", "attempt to call Write on Flow that has been Closed")
+ errBQueuePutFailed = reg(".errBqueuePutFailed", "bqueue.Writer.Put failed{:3}")
+ errFailedToGetQuota = reg(".errFailedToGetQuota", "failed to get quota from receive buffers shared by all new flows on a VC{:3}")
+ errCanceled = reg(".errCanceled", "underlying queues canceled")
+)
// writer implements the io.Writer and SetWriteDeadline interfaces for Flow.
type writer struct {
@@ -48,7 +58,7 @@
Alloc: alloc,
SharedCounters: counters,
closed: make(chan struct{}),
- closeError: errWriterClosed,
+ closeError: verror.New(errWriterClosed, nil),
}
}
@@ -114,7 +124,10 @@
w.mu.Lock()
defer w.mu.Unlock()
if w.isClosed {
- return 0, w.closeError
+ if w.closeError == io.EOF {
+ return 0, io.EOF
+ }
+ return 0, verror.New(stream.ErrBadState, nil, w.closeError)
}
for len(b) > 0 {
@@ -129,9 +142,9 @@
}
if err := w.SharedCounters.DecN(uint(n), w.deadline); err != nil {
if err == vsync.ErrCanceled {
- return 0, timeoutError{}
+ return 0, stream.NewNetError(verror.New(stream.ErrNetwork, nil, verror.New(errCanceled, nil)), true, false)
}
- return 0, fmt.Errorf("failed to get quota from receive buffers shared by all new flows on a VC: %v", err)
+ return 0, verror.New(stream.ErrNetwork, nil, verror.New(errFailedToGetQuota, nil, err))
}
w.muSharedCountersBorrowed.Lock()
w.sharedCountersBorrowed = n
@@ -144,11 +157,11 @@
atomic.AddUint32(&w.totalBytes, uint32(written))
switch err {
case bqueue.ErrCancelled, vsync.ErrCanceled:
- return written, timeoutError{}
+ return written, stream.NewNetError(verror.New(stream.ErrNetwork, nil, verror.New(errCanceled, nil)), true, false)
case bqueue.ErrWriterIsClosed:
- return written, w.closeError
+ return written, verror.New(stream.ErrBadState, nil, verror.New(errWriterClosed, nil))
default:
- return written, fmt.Errorf("bqueue.Writer.Put failed: %v", err)
+ return written, verror.New(stream.ErrNetwork, nil, verror.New(errBQueuePutFailed, nil, err))
}
}
written += n
diff --git a/profiles/internal/rpc/stream/vc/writer_test.go b/profiles/internal/rpc/stream/vc/writer_test.go
index 5b318a0..eb7018a 100644
--- a/profiles/internal/rpc/stream/vc/writer_test.go
+++ b/profiles/internal/rpc/stream/vc/writer_test.go
@@ -11,10 +11,13 @@
"reflect"
"testing"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/bqueue"
"v.io/x/ref/profiles/internal/lib/bqueue/drrqueue"
"v.io/x/ref/profiles/internal/lib/iobuf"
"v.io/x/ref/profiles/internal/lib/sync"
+ "v.io/x/ref/profiles/internal/rpc/stream"
)
// TestWrite is a very basic, easy to follow, but not very thorough test of the
@@ -95,8 +98,8 @@
w := newTestWriter(bw, shared)
w.Close()
- if n, err := w.Write([]byte{1, 2}); n != 0 || err != errWriterClosed {
- t.Errorf("Got (%v, %v) want (0, %v)", n, err, errWriterClosed)
+ if n, err := w.Write([]byte{1, 2}); n != 0 || verror.ErrorID(err) != stream.ErrBadState.ID {
+ t.Errorf("Got (%v, %v) want (0, %v)", n, err, stream.ErrBadState)
}
}
@@ -204,8 +207,8 @@
go w.Close()
<-w.Closed()
- if n, err := w.Write([]byte{1, 2}); n != 0 || err != errWriterClosed {
- t.Errorf("Got (%v, %v) want (0, %v)", n, err, errWriterClosed)
+ if n, err := w.Write([]byte{1, 2}); n != 0 || verror.ErrorID(err) != stream.ErrBadState.ID {
+ t.Errorf("Got (%v, %v) want (0, %v)", n, err, stream.ErrBadState.ID)
}
}
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index ff45990..4e273e7 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -6,14 +6,14 @@
import (
"crypto/rand"
- "errors"
- "fmt"
"io"
"golang.org/x/crypto/nacl/box"
rpcversion "v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
"v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/crypto"
@@ -23,9 +23,15 @@
)
var (
- errUnsupportedEncryptVersion = errors.New("unsupported encryption version")
- errVersionNegotiationFailed = errors.New("encryption version negotiation failed")
- nullCipher crypto.NullControlCipher
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errAuthFailed = reg(".errAuthFailed", "authentication failed{:3}")
+ errUnsupportedEncryptVersion = reg(".errUnsupportedEncryptVersion", "unsupported encryption version {4} < {5}")
+ errNaclBoxVersionNegotiationFailed = reg(".errNaclBoxVersionNegotiationFailed", "nacl box encryption version negotiation failed")
+ errVersionNegotiationFailed = reg(".errVersionNegotiationFailed", "encryption version negotiation failed")
+ nullCipher crypto.NullControlCipher
)
// privateData includes secret data we need for encryption.
@@ -94,7 +100,7 @@
}
ppub, ok := pmsg.(*message.HopSetup)
if !ok {
- return nil, errVersionNegotiationFailed
+ return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
}
// Choose the max version in the intersection.
@@ -114,18 +120,18 @@
func authenticateAsClient(writer io.Writer, reader *iobuf.Reader, params security.CallParams, auth *vc.ServerAuthorizer,
pvt *privateData, pub, ppub *message.HopSetup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
if version < rpcversion.RPCVersion6 {
- return nil, errUnsupportedEncryptVersion
+ return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
}
pbox := ppub.NaclBox()
if pbox == nil {
- return nil, errVersionNegotiationFailed
+ return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
}
c := crypto.NewControlCipherRPC6(&pbox.PublicKey, &pvt.naclBoxPrivateKey, false)
sconn := newSetupConn(writer, reader, c)
// TODO(jyh): act upon the authentication results.
_, _, _, err := vc.AuthenticateAsClient(sconn, crypto.NewNullCrypter(), params, auth, version)
if err != nil {
- return nil, fmt.Errorf("authentication failed: %v", err)
+ return nil, verror.New(errAuthFailed, nil, err)
}
return c, nil
}
@@ -174,18 +180,18 @@
func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
pvt *privateData, pub, ppub *message.HopSetup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
if version < rpcversion.RPCVersion6 {
- return nil, errUnsupportedEncryptVersion
+ return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
}
box := ppub.NaclBox()
if box == nil {
- return nil, errVersionNegotiationFailed
+ return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
}
c := crypto.NewControlCipherRPC6(&box.PublicKey, &pvt.naclBoxPrivateKey, true)
sconn := newSetupConn(writer, reader, c)
// TODO(jyh): act upon authentication results.
_, _, err := vc.AuthenticateAsServer(sconn, principal, lBlessings, dc, crypto.NewNullCrypter(), version)
if err != nil {
- return nil, fmt.Errorf("authentication failed: %v", err)
+ return nil, verror.New(errAuthFailed, nil, err)
}
return c, nil
}
diff --git a/profiles/internal/rpc/stream/vif/setup_conn.go b/profiles/internal/rpc/stream/vif/setup_conn.go
index 9287799..06d71a6 100644
--- a/profiles/internal/rpc/stream/vif/setup_conn.go
+++ b/profiles/internal/rpc/stream/vif/setup_conn.go
@@ -7,7 +7,10 @@
import (
"io"
+ "v.io/v23/verror"
+
"v.io/x/ref/profiles/internal/lib/iobuf"
+ "v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/crypto"
"v.io/x/ref/profiles/internal/rpc/stream/message"
)
@@ -37,7 +40,7 @@
}
emsg, ok := msg.(*message.HopSetupStream)
if !ok {
- return 0, errVersionNegotiationFailed
+ return 0, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
}
s.rbuffer = emsg.Data
}
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index 670721d..85798a2 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -11,7 +11,6 @@
import (
"bytes"
- "errors"
"fmt"
"net"
"sort"
@@ -42,8 +41,31 @@
const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif"
+func reg(id, msg string) verror.IDAction {
+ return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
+}
+
var (
- errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}")
+ // These errors are intended to be used as arguments to higher
+ // level errors and hence {1}{2} is omitted from their format
+ // strings to avoid repeating these n-times in the final error
+ // message visible to the user.
+ errShuttingDown = reg(".errShuttingDown", "underlying network connection({3}) shutting down")
+ errVCHandshakeFailed = reg(".errVCHandshakeFailed", "VC handshake failed{:3}")
+ errSendOnExpressQFailed = reg(".errSendOnExpressQFailed", "vif.sendOnExpressQ(OpenVC) failed{:3}")
+ errVIFIsBeingClosed = reg(".errVIFIsBeingClosed", "VIF is being closed")
+ errVIFAlreadyAcceptingFlows = reg(".errVIFAlreadyAcceptingFlows", "already accepting flows on VIF {3}")
+ errVCsNotAcceptedOnVIF = reg(".errVCsNotAcceptedOnVIF", "VCs not accepted on VIF {3}")
+ errAcceptFailed = reg(".errAcceptFailed", "Accept failed{:3}")
+ errRemoteEndClosedVC = reg(".errRemoteEndClosedVC", "remote end closed VC{:3}")
+ errFlowsNoLongerAccepted = reg(".errFlowsNowLongerAccepted", "Flows no longer being accepted")
+ errVCAcceptFailed = reg(".errVCAcceptFailed", "VC accept failed{:3}")
+ errIdleTimeout = reg(".errIdleTimeout", "idle timeout")
+ errVIFAlreadySetup = reg(".errVIFAlreadySetupt", "VIF is already setup")
+ errBqueueWriterForXpress = reg(".errBqueueWriterForXpress", "failed to create bqueue.Writer for express messages{:3}")
+ errBqueueWriterForControl = reg(".errBqueueWriterForControl", "failed to create bqueue.Writer for flow control counters{:3}")
+ errBqueueWriterForStopping = reg(".errBqueueWriterForStopping", "failed to create bqueue.Writer for stopping the write loop{:3}")
+ errWriteFailed = reg(".errWriteFailed", "write failed: got ({3}, {4}) for {5} byte message)")
)
// VIF implements a "virtual interface" over an underlying network connection
@@ -131,10 +153,6 @@
sharedFlowID = vc.SharedFlowID
)
-var (
- errAlreadySetup = errors.New("VIF is already setup")
-)
-
// InternalNewDialedVIF creates a new virtual interface over the provided
// network connection, under the assumption that the conn object was created
// using net.Dial. If onClose is given, it is run in its own goroutine when
@@ -201,19 +219,19 @@
expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err)
+ return nil, verror.New(errBqueueWriterForXpress, nil, err)
}
expressQ.Release(-1) // Disable flow control
flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
+ return nil, verror.New(errBqueueWriterForControl, nil, err)
}
flowQ.Release(-1) // Disable flow control
stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
if err != nil {
- return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err)
+ return nil, verror.New(errBqueueWriterForStopping, nil, err)
}
stopQ.Release(-1) // Disable flow control
@@ -241,7 +259,7 @@
vif.idleTimerMap = newIdleTimerMap(func(vci id.VC) {
vc, _, _ := vif.vcMap.Find(vci)
if vc != nil {
- vif.closeVCAndSendMsg(vc, "idle timeout")
+ vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
}
})
go vif.readLoop()
@@ -281,15 +299,15 @@
Counters: counters})
if err != nil {
vif.deleteVC(vc.VCI())
- err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err)
- vc.Close(err.Error())
+ err = verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ vc.Close(err)
return nil, err
}
if err := vc.HandshakeDialedVC(principal, opts...); err != nil {
vif.deleteVC(vc.VCI())
- err = fmt.Errorf("VC handshake failed: %v", err)
- vc.Close(err.Error())
- return nil, err
+ verr := verror.New(stream.ErrSecurity, nil, verror.New(errVCHandshakeFailed, nil, err))
+ vc.Close(verr)
+ return nil, verr
}
return vc, nil
}
@@ -343,7 +361,7 @@
// Stop the idle timers.
vif.idleTimerMap.Stop()
for _, vc := range vcs {
- vc.VC.Close("VIF is being closed")
+ vc.VC.Close(verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil)))
}
// Wait for the vcWriteLoops to exit (after draining queued up messages).
vif.stopQ.Close()
@@ -367,7 +385,7 @@
vif.muListen.Lock()
defer vif.muListen.Unlock()
if vif.acceptor != nil {
- return fmt.Errorf("already accepting Flows on VIF %v", vif)
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil, vif))
}
vif.acceptor = upcqueue.New()
vif.listenerOpts = opts
@@ -409,11 +427,11 @@
acceptor := vif.acceptor
vif.muListen.Unlock()
if acceptor == nil {
- return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif)
+ return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errVCsNotAcceptedOnVIF, nil, vif))
}
item, err := acceptor.Get(nil)
if err != nil {
- return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err)
+ return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
}
return item.(ConnectorAndFlow), nil
}
@@ -510,7 +528,7 @@
// to indicate a 'remote close' rather than a 'local one'. This helps
// with error reporting since we expect reads/writes to occur
// after a remote close, but not after a local close.
- vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
+ vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
return nil
}
vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
@@ -532,7 +550,7 @@
case *message.HopSetup:
// Configure the VIF. This takes over the conn during negotiation.
if vif.isSetup {
- return errAlreadySetup
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
}
vif.muListen.Lock()
dischargeClient := getDischargeClient(vif.listenerOpts)
@@ -579,10 +597,16 @@
vif.rpending.Wait()
}
+func clientVCClosed(err error) bool {
+ // If we've encountered a networking error, then all likelihood the
+ // connection to the client is closed.
+ return verror.ErrorID(err) == stream.ErrNetwork.ID
+}
+
func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
hr := <-c
if hr.Error != nil {
- vif.closeVCAndSendMsg(vc, hr.Error.Error())
+ vif.closeVCAndSendMsg(vc, clientVCClosed(hr.Error), hr.Error)
return
}
@@ -590,13 +614,13 @@
acceptor := vif.acceptor
vif.muListen.Unlock()
if acceptor == nil {
- vif.closeVCAndSendMsg(vc, "Flows no longer being accepted")
+ vif.closeVCAndSendMsg(vc, false, verror.New(errFlowsNoLongerAccepted, nil))
return
}
// Notify any listeners that a new VC has been established
if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
- vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err))
+ vif.closeVCAndSendMsg(vc, clientVCClosed(err), verror.New(errVCAcceptFailed, nil, err))
return
}
@@ -766,7 +790,7 @@
return err
}
if n, err := vif.conn.Write(msg); err != nil {
- return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg))
+ return verror.New(errWriteFailed, nil, n, err, len(msg))
}
return nil
}
@@ -858,11 +882,8 @@
wq.Close()
}
vif.vcMap.Delete(vci)
- vc.Close("underlying network connection shutting down")
- // We embed an error inside verror.ErrAborted because other layers
- // check for the "Aborted" error as a special case. Perhaps
- // eventually we'll get rid of the Aborted layer.
- return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
+ vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)))
+ return nil, verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
}
vif.idleTimerMap.Insert(vc.VCI(), idleTimeout)
return vc, nil
@@ -875,16 +896,18 @@
}
}
-func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) {
- vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg)
+func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, clientVCClosed bool, errMsg error) {
+ vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, errMsg)
vif.deleteVC(vc.VCI())
- vc.Close(msg)
- // HACK: Don't send CloseVC if it is a "failed new decoder" error because that means the
- // client already has closed its VC.
- // TODO(suharshs,ataly,ashankar): Find a better way to fix: https://github.com/veyron/release-issues/issues/1234.
- if strings.Contains(msg, "failed to create new decoder") {
+ vc.Close(errMsg)
+ if clientVCClosed {
+ // No point in sending to the client if the VC is closed, or otherwise broken.
return
}
+ msg := ""
+ if errMsg != nil {
+ msg = errMsg.Error()
+ }
if err := vif.sendOnExpressQ(&message.CloseVC{
VCI: vc.VCI(),
Error: msg,
@@ -910,7 +933,7 @@
for _, vc := range vcs {
if naming.Compare(vc.RemoteEndpoint().RoutingID(), remote.RoutingID()) {
vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
- vif.closeVCAndSendMsg(vc, "")
+ vif.closeVCAndSendMsg(vc, false, nil)
n++
}
}
diff --git a/profiles/internal/rpc/stream/vif/vif_test.go b/profiles/internal/rpc/stream/vif/vif_test.go
index a9d2bff..f10245b 100644
--- a/profiles/internal/rpc/stream/vif/vif_test.go
+++ b/profiles/internal/rpc/stream/vif/vif_test.go
@@ -22,6 +22,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/verror"
"v.io/x/ref/profiles/internal/rpc/stream"
"v.io/x/ref/profiles/internal/rpc/stream/vc"
@@ -336,6 +337,7 @@
// Close the VC. Should be closed.
vf.ShutdownVCs(makeEP(0x10))
if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ t.Logf(verror.DebugString(err))
t.Error(err)
}
@@ -346,6 +348,7 @@
t.Fatal(err)
}
if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ t.Logf(verror.DebugString(err))
t.Error(err)
}
remote.ShutdownVCs(makeEP(0x10))
diff --git a/profiles/internal/rpc/test/proxy_test.go b/profiles/internal/rpc/test/proxy_test.go
index 3fe5e72..bf6ede5 100644
--- a/profiles/internal/rpc/test/proxy_test.go
+++ b/profiles/internal/rpc/test/proxy_test.go
@@ -350,8 +350,15 @@
verifyMountMissing(t, ctx, ns, name)
status = server.Status()
- if len(status.Proxies) != 1 || status.Proxies[0].Proxy != spec.Proxy || verror.ErrorID(status.Proxies[0].Error) != verror.ErrNoServers.ID {
- t.Fatalf("proxy status is incorrect: %v", status.Proxies)
+ if got, want := len(status.Proxies), 1; got != want {
+ t.Logf("Proxies: %v", status.Proxies)
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := status.Proxies[0].Proxy, spec.Proxy; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := verror.ErrorID(status.Proxies[0].Error), verror.ErrNoServers.ID; got != want {
+ t.Fatalf("got %v, want %v", got, want)
}
// Proxy restarts, calls should eventually start succeeding.
diff --git a/test/modules/modules_test.go b/test/modules/modules_test.go
index 79ca03c..4e7f012 100644
--- a/test/modules/modules_test.go
+++ b/test/modules/modules_test.go
@@ -154,7 +154,7 @@
sh.Cleanup(&stdout, &stderr)
want := ""
if testing.Verbose() {
- want = "---- Shell Cleanup ----\n"
+ want = "---- Shell Cleanup ----\n---- Cleanup calling cancelCtx ----\n---- Shell Cleanup Complete ----\n"
}
if got := stdout.String(); got != "" && got != want {
t.Errorf("got %q, want %q", got, want)
diff --git a/test/modules/shell.go b/test/modules/shell.go
index 9d76804..a9b4f21 100644
--- a/test/modules/shell.go
+++ b/test/modules/shell.go
@@ -665,9 +665,9 @@
}
}
- if verbose {
- writeMsg("---- Shell Cleanup ----\n")
- }
+ writeMsg("---- Shell Cleanup ----\n")
+ defer writeMsg("---- Shell Cleanup Complete ----\n")
+
sh.mu.Lock()
handles := make([]Handle, 0, len(sh.lifoHandles))
for _, h := range sh.lifoHandles {
@@ -707,6 +707,7 @@
}
if sh.cancelCtx != nil {
+ writeMsg("---- Cleanup calling cancelCtx ----\n")
// Note(ribrdb, caprita): This will shutdown the agents. If there
// were errors shutting down it is possible there could be child
// processes still running, and stopping the agent may cause