rpc/stream/vif: reuse vif auth in vc
This change allows to reuse VIF authentication in VCs when the peer
is the same as VIF - i.e., non-through-proxy connection.
Details:
o During a VIF setup, peers exchange their endpoint and save VIF
authentication, which can be used in the following VCs.
o During vif.Dial(), re-use the VIF authentication when
- a principal given to vif.InternalNewDialedVIF() and one given to
vif.Dial() are same, and
- two remote endpoints are same or the given endpoint is null
which means a direct connection with hostname and port
- a server verifies a signature for reusing the VIF authentication
that are sent from a client.
o When reusing VIF auth
- a dialed VC creates a new public and private key, but uses
a server's public key which were exchanged during VIF setup.
The VC sends the new public key to the server, but skips
all authentication except running server authorizer.
- a accepted VC creates a new crypter using the VIF's public and
private key pair with a new public key from the client.
The server doesn't send a SetupVC response and skips the
authentication.
RPC Setup Benchmark: (in GCE)
o OLD: RPC Connection 30.27 ms/rpc
o NEW: RPC Connection 14.02 ms/rpc
Agent Benchmark: (in GCE)
benchmark old ns/op new ns/op delta
BenchmarkSignNoAgent-4 936403 932508 -0.42%
BenchmarkSignCachedAgent-4 5309176 1764895 -66.76%
BenchmarkSignUncachedAgent-4 5381638 1758834 -67.32%
BenchmarkDefaultNoAgent-4 137 140 +2.19%
BenchmarkDefaultCachedAgent-4 44.7 45.2 +1.12%
BenchmarkDefaultUncachedAgent-4 7828080 3846960 -50.86%
BenchmarkRecognizedNegativeNoAgent-4 34227 36651 +7.08%
BenchmarkRecognizedNegativeCachedAgent-4 33353 35574 +6.66%
BenchmarkRecognizedNegativeUncachedAgent-4 4291997 944295 -78.00%
BenchmarkRecognizedNoAgent-4 13485 14271 +5.83%
BenchmarkRecognizedCachedAgent-4 13031 13456 +3.26%
BenchmarkRecognizedUncachedAgent-4 4016690 785538 -80.44%
MultiPart: 2/2
Change-Id: Ida0c8f65a7b0083d8b75e59af45202c6276964d6
diff --git a/runtime/internal/rpc/stream/crypto/box.go b/runtime/internal/rpc/stream/crypto/box.go
index eec1bc9..6760026 100644
--- a/runtime/internal/rpc/stream/crypto/box.go
+++ b/runtime/internal/rpc/stream/crypto/box.go
@@ -43,10 +43,15 @@
type BoxKey [32]byte
-// BoxKeyExchanger is used to communicate public keys between the two ends of
-// communication.
+// BoxKeyExchanger is used to communicate public keys between peers.
type BoxKeyExchanger func(myPublicKey *BoxKey) (theirPublicKey *BoxKey, err error)
+// GenerateBoxKey generates a new public/private key pair for BoxCrypter.
+func GenerateBoxKey() (publicKey, privateKey *BoxKey, err error) {
+ pk, sk, err := box.GenerateKey(rand.Reader)
+ return (*BoxKey)(pk), (*BoxKey)(sk), err
+}
+
// NewBoxCrypter uses Curve25519, XSalsa20 and Poly1305 to encrypt and
// authenticate messages (as defined in http://nacl.cr.yp.to/box.html).
// An ephemeral Diffie-Hellman key exchange is performed per invocation
@@ -54,34 +59,36 @@
// granularity. One round-trip is required before any data can be sent.
// BoxCrypter does NOT do anything to verify the identity of the peer.
func NewBoxCrypter(exchange BoxKeyExchanger, pool *iobuf.Pool) (Crypter, error) {
- pk, sk, err := box.GenerateKey(rand.Reader)
+ pk, sk, err := GenerateBoxKey()
if err != nil {
return nil, err
}
-
- theirPK, err := exchange((*BoxKey)(pk))
+ theirPK, err := exchange(pk)
if err != nil {
return nil, err
}
if theirPK == nil {
return nil, verror.New(errRemotePublicKey, nil)
}
+ return NewBoxCrypterWithKey(pk, sk, theirPK, pool), nil
+}
- ret := &boxcrypter{alloc: iobuf.NewAllocator(pool, 0)}
-
- box.Precompute(&ret.sharedKey, (*[32]byte)(theirPK), sk)
+// NewBoxCrypterWithKey is used when public keys have been already exchanged between peers.
+func NewBoxCrypterWithKey(myPublicKey, myPrivateKey, theirPublicKey *BoxKey, pool *iobuf.Pool) Crypter {
+ c := boxcrypter{alloc: iobuf.NewAllocator(pool, 0)}
+ box.Precompute(&c.sharedKey, (*[32]byte)(theirPublicKey), (*[32]byte)(myPrivateKey))
// Distinct messages between the same {sender, receiver} set are required
// to have distinct nonces. The server with the lexicographically smaller
// public key will be sending messages with 0, 2, 4... and the other will
// be using 1, 3, 5...
- if bytes.Compare(pk[:], theirPK[:]) < 0 {
- ret.writeNonce, ret.readNonce = 0, 1
- ret.sortedPubkeys = append(pk[:], theirPK[:]...)
+ if bytes.Compare(myPublicKey[:], theirPublicKey[:]) < 0 {
+ c.writeNonce, c.readNonce = 0, 1
+ c.sortedPubkeys = append(myPublicKey[:], theirPublicKey[:]...)
} else {
- ret.writeNonce, ret.readNonce = 1, 0
- ret.sortedPubkeys = append(theirPK[:], pk[:]...)
+ c.writeNonce, c.readNonce = 1, 0
+ c.sortedPubkeys = append(theirPublicKey[:], myPublicKey[:]...)
}
- return ret, nil
+ return &c
}
func (c *boxcrypter) Encrypt(src *iobuf.Slice) (*iobuf.Slice, error) {
diff --git a/runtime/internal/rpc/stream/crypto/box_cipher.go b/runtime/internal/rpc/stream/crypto/box_cipher.go
index f28757d..45ab98a 100644
--- a/runtime/internal/rpc/stream/crypto/box_cipher.go
+++ b/runtime/internal/rpc/stream/crypto/box_cipher.go
@@ -5,6 +5,7 @@
package crypto
import (
+ "bytes"
"encoding/binary"
"golang.org/x/crypto/nacl/box"
@@ -17,9 +18,10 @@
// cbox implements a ControlCipher using go.crypto/nacl/box.
type cbox struct {
- sharedKey [32]byte
- enc cboxStream
- dec cboxStream
+ sharedKey [32]byte
+ channelBinding []byte
+ enc cboxStream
+ dec cboxStream
}
// cboxStream implements one stream of encryption or decryption.
@@ -73,10 +75,10 @@
copy(counter[:], nonce[16:])
}
-// NewControlCipher returns a ControlCipher for RPC versions greater than 6.
-func NewControlCipherRPC6(peersPublicKey, privateKey *BoxKey, isServer bool) ControlCipher {
+// NewControlCipher returns a ControlCipher for RPC versions from 6 to 10.
+func NewControlCipherRPC6(myPrivateKey, theirPublicKey *BoxKey, isServer bool) ControlCipher {
var c cbox
- box.Precompute(&c.sharedKey, (*[32]byte)(peersPublicKey), (*[32]byte)(privateKey))
+ box.Precompute(&c.sharedKey, (*[32]byte)(theirPublicKey), (*[32]byte)(myPrivateKey))
// The stream is full-duplex, and we want the directions to use different
// nonces, so we set bit (1 << 64) in the server-to-client stream, and leave
// it cleared in the client-to-server stream. advanceNone touches only the
@@ -90,6 +92,24 @@
return &c
}
+// NewControlCipher returns a ControlCipher for RPC versions greater than or equal to 11.
+func NewControlCipherRPC11(myPublicKey, myPrivateKey, theirPublicKey *BoxKey) ControlCipher {
+ var c cbox
+ box.Precompute(&c.sharedKey, (*[32]byte)(theirPublicKey), (*[32]byte)(myPrivateKey))
+ // The stream is full-duplex, and we want the directions to use different
+ // nonces, so we set bit (1 << 64) in one stream, and leave it cleared in
+ // the other server stream. advanceNone touches only the first 8 bytes,
+ // so this change is permanent for the duration of the stream.
+ if bytes.Compare(myPublicKey[:], theirPublicKey[:]) < 0 {
+ c.enc.nonce[8] = 1
+ c.channelBinding = append(myPublicKey[:], theirPublicKey[:]...)
+ } else {
+ c.dec.nonce[8] = 1
+ c.channelBinding = append(theirPublicKey[:], myPublicKey[:]...)
+ }
+ return &c
+}
+
// MACSize implements the ControlCipher method.
func (c *cbox) MACSize() int {
return cboxMACSize
@@ -145,3 +165,7 @@
c.dec.advanceNonce()
salsa.XORKeyStream(data, data, &counter, &subKey)
}
+
+func (c *cbox) ChannelBinding() []byte {
+ return c.channelBinding
+}
diff --git a/runtime/internal/rpc/stream/crypto/box_cipher_test.go b/runtime/internal/rpc/stream/crypto/box_cipher_test.go
index 6727f7d..a7c46ec 100644
--- a/runtime/internal/rpc/stream/crypto/box_cipher_test.go
+++ b/runtime/internal/rpc/stream/crypto/box_cipher_test.go
@@ -7,6 +7,7 @@
import (
"bytes"
"crypto/rand"
+ "errors"
"testing"
"golang.org/x/crypto/nacl/box"
@@ -21,17 +22,38 @@
return b
}
-func TestOpenSeal(t *testing.T) {
- pub1, pvt1, err := box.GenerateKey(rand.Reader)
+type testCipherVersion int
+
+const (
+ cipherRPC6 testCipherVersion = iota
+ cipherRPC11
+)
+
+func newCipher(ver testCipherVersion) (c1, c2 crypto.ControlCipher, err error) {
+ pk1, sk1, err := box.GenerateKey(rand.Reader)
if err != nil {
- t.Fatalf("can't generate key")
+ return nil, nil, errors.New("can't generate key")
}
- pub2, pvt2, err := box.GenerateKey(rand.Reader)
+ pk2, sk2, err := box.GenerateKey(rand.Reader)
if err != nil {
- t.Fatalf("can't generate key")
+ return nil, nil, errors.New("can't generate key")
}
- c1 := crypto.NewControlCipherRPC6((*crypto.BoxKey)(pub2), (*crypto.BoxKey)(pvt1), true)
- c2 := crypto.NewControlCipherRPC6((*crypto.BoxKey)(pub1), (*crypto.BoxKey)(pvt2), false)
+ switch ver {
+ case cipherRPC6:
+ c1 = crypto.NewControlCipherRPC6((*crypto.BoxKey)(sk1), (*crypto.BoxKey)(pk2), true)
+ c2 = crypto.NewControlCipherRPC6((*crypto.BoxKey)(sk2), (*crypto.BoxKey)(pk1), false)
+ case cipherRPC11:
+ c1 = crypto.NewControlCipherRPC11((*crypto.BoxKey)(pk1), (*crypto.BoxKey)(sk1), (*crypto.BoxKey)(pk2))
+ c2 = crypto.NewControlCipherRPC11((*crypto.BoxKey)(pk2), (*crypto.BoxKey)(sk2), (*crypto.BoxKey)(pk1))
+ }
+ return
+}
+
+func testCipherOpenSeal(t *testing.T, ver testCipherVersion) {
+ c1, c2, err := newCipher(ver)
+ if err != nil {
+ t.Fatalf("can't create cipher: %v", err)
+ }
msg1 := newMessage("hello")
if err := c1.Seal(msg1); err != nil {
@@ -92,18 +114,14 @@
t.Errorf("got %q, expected %q", msg3[:5], "hello")
}
}
+func TestCipherOpenSealRPC6(t *testing.T) { testCipherOpenSeal(t, cipherRPC6) }
+func TestCipherOpenSealRPC11(t *testing.T) { testCipherOpenSeal(t, cipherRPC11) }
-func TestXORKeyStream(t *testing.T) {
- pub1, pvt1, err := box.GenerateKey(rand.Reader)
+func testCipherXORKeyStream(t *testing.T, ver testCipherVersion) {
+ c1, c2, err := newCipher(ver)
if err != nil {
- t.Fatalf("can't generate key")
+ t.Fatalf("can't create cipher: %v", err)
}
- pub2, pvt2, err := box.GenerateKey(rand.Reader)
- if err != nil {
- t.Fatalf("can't generate key")
- }
- c1 := crypto.NewControlCipherRPC6((*crypto.BoxKey)(pub2), (*crypto.BoxKey)(pvt1), true)
- c2 := crypto.NewControlCipherRPC6((*crypto.BoxKey)(pub1), (*crypto.BoxKey)(pvt2), false)
msg1 := []byte("hello")
msg2 := []byte("world")
@@ -131,3 +149,26 @@
t.Errorf("got %q, expected 'hello'", s3)
}
}
+func TestCipherXORKeyStreamRPC6(t *testing.T) { testCipherXORKeyStream(t, cipherRPC6) }
+func TestCipherXORKeyStreamRPC11(t *testing.T) { testCipherXORKeyStream(t, cipherRPC11) }
+
+func TestCipherChannelBinding(t *testing.T) {
+ values := make([][]byte, 100)
+ for i := 0; i < len(values); i++ {
+ c1, c2, err := newCipher(cipherRPC11)
+ if err != nil {
+ t.Fatalf("can't create cipher: %v", err)
+ }
+ if !bytes.Equal(c1.ChannelBinding(), c2.ChannelBinding()) {
+ t.Fatalf("Two ends of the crypter ended up with different channel bindings (iteration #%d)", i)
+ }
+ values[i] = c1.ChannelBinding()
+ }
+ for i := 0; i < len(values); i++ {
+ for j := i + 1; j < len(values); j++ {
+ if bytes.Equal(values[i], values[j]) {
+ t.Fatalf("Same ChannelBinding seen on multiple channels (%d and %d)", i, j)
+ }
+ }
+ }
+}
diff --git a/runtime/internal/rpc/stream/crypto/control_cipher.go b/runtime/internal/rpc/stream/crypto/control_cipher.go
index ae25daf..33c97df 100644
--- a/runtime/internal/rpc/stream/crypto/control_cipher.go
+++ b/runtime/internal/rpc/stream/crypto/control_cipher.go
@@ -24,4 +24,11 @@
// Decrypt decrypts the data in place. No MAC is verified.
Decrypt(data []byte)
+
+ // ChannelBinding Returns a byte slice that is unique for the the
+ // particular cipher (and the parties between which it is operating).
+ // Having both parties assert out of the band that they are indeed
+ // participating in a connection with that channel binding value is
+ // sufficient to authenticate the data received through the cipher.
+ ChannelBinding() []byte
}
diff --git a/runtime/internal/rpc/stream/crypto/crypto.go b/runtime/internal/rpc/stream/crypto/crypto.go
index bee1726..aba2628 100644
--- a/runtime/internal/rpc/stream/crypto/crypto.go
+++ b/runtime/internal/rpc/stream/crypto/crypto.go
@@ -29,7 +29,7 @@
type Crypter interface {
Encrypter
Decrypter
- // ChannelBinding Returns a byte slice that is unique for the the
+ // ChannelBinding returns a byte slice that is unique for the the
// particular crypter (and the parties between which it is operating).
// Having both parties assert out of the band that they are indeed
// participating in a connection with that channel binding value is
diff --git a/runtime/internal/rpc/stream/crypto/crypto_test.go b/runtime/internal/rpc/stream/crypto/crypto_test.go
index fe83db7..4f67cb6 100644
--- a/runtime/internal/rpc/stream/crypto/crypto_test.go
+++ b/runtime/internal/rpc/stream/crypto/crypto_test.go
@@ -41,6 +41,16 @@
crypter.String() // Only to test that String does not crash.
}
+func TestNullWithChannelBinding(t *testing.T) {
+ cb := []byte{1, 2, 3, 5}
+ crypter := NewNullCrypterWithChannelBinding(cb)
+ quickTest(t, crypter, crypter)
+ if got := crypter.ChannelBinding(); !bytes.Equal(cb, got) {
+ t.Errorf("Unexpected channel binding; got %q, want %q", got, cb)
+ }
+ crypter.String() // Only to test that String does not crash.
+}
+
func testSimple(t *testing.T, c1, c2 Crypter) {
// Execute String just to check that it does not crash.
c1.String()
diff --git a/runtime/internal/rpc/stream/crypto/null.go b/runtime/internal/rpc/stream/crypto/null.go
index 036b541..73110a6 100644
--- a/runtime/internal/rpc/stream/crypto/null.go
+++ b/runtime/internal/rpc/stream/crypto/null.go
@@ -9,9 +9,21 @@
// NewNullCrypter returns a Crypter that does no encryption/decryption.
func NewNullCrypter() Crypter { return null{} }
-type null struct{}
+// NewNullCrypterWithChannelBinding returns a null Crypter with a channel binding.
+func NewNullCrypterWithChannelBinding(channelBinding []byte) Crypter {
+ return null{channelBinding}
+}
+
+type null struct {
+ channelBinding []byte
+}
func (null) Encrypt(src *iobuf.Slice) (*iobuf.Slice, error) { return src, nil }
func (null) Decrypt(src *iobuf.Slice) (*iobuf.Slice, error) { return src, nil }
-func (null) String() string { return "Null" }
-func (null) ChannelBinding() []byte { return nil }
+func (n null) ChannelBinding() []byte { return n.channelBinding }
+func (n null) String() string {
+ if n.channelBinding == nil {
+ return "Null"
+ }
+ return "Null(ChannelBinding)"
+}
diff --git a/runtime/internal/rpc/stream/crypto/null_cipher.go b/runtime/internal/rpc/stream/crypto/null_cipher.go
index cdfadc5..96a642c 100644
--- a/runtime/internal/rpc/stream/crypto/null_cipher.go
+++ b/runtime/internal/rpc/stream/crypto/null_cipher.go
@@ -12,6 +12,7 @@
func (NullControlCipher) Open(data []byte) bool { return true }
func (NullControlCipher) Encrypt(data []byte) {}
func (NullControlCipher) Decrypt(data []byte) {}
+func (NullControlCipher) ChannelBinding() []byte { return nil }
type disabledControlCipher struct {
NullControlCipher
diff --git a/runtime/internal/rpc/stream/crypto/tls.go b/runtime/internal/rpc/stream/crypto/tls.go
index db8833c..b210a11 100644
--- a/runtime/internal/rpc/stream/crypto/tls.go
+++ b/runtime/internal/rpc/stream/crypto/tls.go
@@ -194,6 +194,10 @@
return plaintext, nil
}
+func (c *tlsCrypter) ChannelBinding() []byte {
+ return c.tls.ConnectionState().TLSUnique
+}
+
func (c *tlsCrypter) String() string {
state := c.tls.ConnectionState()
return fmt.Sprintf("TLS CipherSuite:0x%04x Resumed:%v", state.CipherSuite, state.DidResume)
@@ -220,10 +224,6 @@
}
}
-func (c *tlsCrypter) ChannelBinding() []byte {
- return c.tls.ConnectionState().TLSUnique
-}
-
// PEM-encoded certificates and keys used in the tests.
// One way to generate them is:
// go run $GOROOT/src/pkg/crypto/tls/generate_cert.go --host=localhost --duration=87600h --ecdsa-curve=P256
diff --git a/runtime/internal/rpc/stream/manager/manager_test.go b/runtime/internal/rpc/stream/manager/manager_test.go
index bce2bab..2c20fcb 100644
--- a/runtime/internal/rpc/stream/manager/manager_test.go
+++ b/runtime/internal/rpc/stream/manager/manager_test.go
@@ -65,6 +65,7 @@
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
pclient := testutil.NewPrincipal("client")
+ pclient2 := testutil.NewPrincipal("client2")
pserver := testutil.NewPrincipal("server")
ln, ep, err := server.Listen(protocol, "127.0.0.1:0", pserver, pserver.BlessingStore().Default())
@@ -141,8 +142,10 @@
if err := writeLine(clientF2, lotsOfData); err == nil {
t.Errorf("Should not be able to Dial or Write after the Listener is closed")
}
- // Opening a new VC should fail fast.
- if _, err := client.Dial(ep, pclient); err == nil {
+ // Opening a new VC should fail fast. Note that we need to use a different
+ // principal since the client doesn't expect a response from a server
+ // when re-using VIF authentication.
+ if _, err := client.Dial(ep, pclient2); err == nil {
t.Errorf("Should not be able to Dial after listener is closed")
}
}
@@ -458,12 +461,11 @@
if err != nil {
t.Fatal(err)
}
+ errch := make(chan error)
go func() {
for {
_, err := ln.Accept()
- if err != nil {
- return
- }
+ errch <- err
}
}()
@@ -475,6 +477,10 @@
if f == nil || err != nil {
t.Fatalf("vc.Connect failed: (%v, %v)", f, err)
}
+ // Wait until the server accepts the flow or fails.
+ if err = <-errch; err != nil {
+ t.Fatalf("ln.Accept failed: %v", err)
+ }
// Trigger the idle timers.
triggerTimers()
@@ -664,6 +670,7 @@
func testServerRestartDuringClientLifetime(t *testing.T, protocol string) {
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
pclient := testutil.NewPrincipal("client")
+ pclient2 := testutil.NewPrincipal("client2")
sh, err := modules.NewShell(nil, nil, testing.Verbose(), t)
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -683,8 +690,10 @@
}
h.Shutdown(nil, os.Stderr)
- // A new VC cannot be created since the server is dead
- if _, err := client.Dial(ep, pclient); err == nil {
+ // A new VC cannot be created since the server is dead. Note that we need to
+ // use a different principal since the client doesn't expect a response from
+ // a server when re-using VIF authentication.
+ if _, err := client.Dial(ep, pclient2); err == nil {
t.Fatal("Expected client.Dial to fail since server is dead")
}
@@ -953,12 +962,12 @@
errCh := make(chan error, 10)
for i := 0; i < 10; i++ {
go func() {
- _, err = client.Dial(nep, testutil.NewPrincipal("client"))
+ _, err := client.Dial(nep, testutil.NewPrincipal("client"))
errCh <- err
}()
}
for i := 0; i < 10; i++ {
- if err = <-errCh; err != nil {
+ if err := <-errCh; err != nil {
t.Fatal(err)
}
}
diff --git a/runtime/internal/rpc/stream/message/coding.go b/runtime/internal/rpc/stream/message/coding.go
index d8e7c68..ad028ba 100644
--- a/runtime/internal/rpc/stream/message/coding.go
+++ b/runtime/internal/rpc/stream/message/coding.go
@@ -26,8 +26,16 @@
// message visible to the user.
errLargerThan3ByteUint = reg(".errLargerThan3ByteUnit", "integer too large to represent in 3 bytes")
errReadWrongNumBytes = reg(".errReadWrongNumBytes", "read {3} bytes, wanted to read {4}")
+
+ sizeOfSizeT int // How many bytes are used to encode the type sizeT.
)
+type sizeT uint32
+
+func init() {
+ sizeOfSizeT = binary.Size(sizeT(0))
+}
+
func write3ByteUint(dst []byte, n int) error {
if n >= (1<<24) || n < 0 {
return verror.New(errLargerThan3ByteUint, nil)
@@ -53,37 +61,45 @@
return uint32(src[0])<<24 | uint32(src[1])<<16 | uint32(src[2])<<8 | uint32(src[3])
}
-func readInt(r io.Reader, ptr interface{}) error {
- return binary.Read(r, binary.BigEndian, ptr)
+func readInt(r io.Reader, i interface{}) error {
+ return binary.Read(r, binary.BigEndian, i)
}
-func writeInt(w io.Writer, ptr interface{}) error {
- return binary.Write(w, binary.BigEndian, ptr)
+func writeInt(w io.Writer, i interface{}) error {
+ return binary.Write(w, binary.BigEndian, i)
}
-func readString(r io.Reader, s *string) error {
- var size uint32
- if err := readInt(r, &size); err != nil {
- return err
- }
- bytes := make([]byte, size)
- n, err := r.Read(bytes)
- if err != nil {
- return err
- }
- if n != int(size) {
- return verror.New(errReadWrongNumBytes, nil, n, int(size))
- }
- *s = string(bytes)
- return nil
+func readString(r io.Reader) (string, error) {
+ b, err := readBytes(r)
+ return string(b), err
}
func writeString(w io.Writer, s string) error {
- size := uint32(len(s))
+ return writeBytes(w, []byte(s))
+}
+
+func readBytes(r io.Reader) ([]byte, error) {
+ var size sizeT
+ if err := readInt(r, &size); err != nil {
+ return nil, err
+ }
+ b := make([]byte, size)
+ n, err := r.Read(b)
+ if err != nil {
+ return nil, err
+ }
+ if n != int(size) {
+ return nil, verror.New(errReadWrongNumBytes, nil, n, int(size))
+ }
+ return b, nil
+}
+
+func writeBytes(w io.Writer, b []byte) error {
+ size := sizeT(len(b))
if err := writeInt(w, size); err != nil {
return err
}
- n, err := w.Write([]byte(s))
+ n, err := w.Write(b)
if err != nil {
return err
}
@@ -176,12 +192,24 @@
}
l := &io.LimitedReader{R: r, N: int64(size)}
switch code {
- case naclBoxPublicKey:
+ case naclBoxOptionCode:
var opt NaclBox
if err := opt.read(l); err != nil {
return nil, err
}
opts = append(opts, &opt)
+ case peerEndpointOptionCode:
+ var opt PeerEndpoint
+ if err := opt.read(l); err != nil {
+ return nil, err
+ }
+ opts = append(opts, &opt)
+ case useVIFAuthenticationOptionCode:
+ var opt UseVIFAuthentication
+ if err := opt.read(l); err != nil {
+ return nil, err
+ }
+ opts = append(opts, &opt)
}
// Consume any data remaining.
readAndDiscardToError(l)
diff --git a/runtime/internal/rpc/stream/message/control.go b/runtime/internal/rpc/stream/message/control.go
index 0c33b91..8ccc494 100644
--- a/runtime/internal/rpc/stream/message/control.go
+++ b/runtime/internal/rpc/stream/message/control.go
@@ -79,6 +79,29 @@
Options []SetupOption
}
+// SetupStream is a byte stream used to negotiate VIF setup. During VIF setup,
+// each party sends a Setup message to the other party containing their version
+// and options. If the version requires further negotiation (such as for
+// authentication), the SetupStream is used for the negotiation.
+//
+// The protocol used on the stream is version-specific, it is not specified here.
+// See vif/auth.go for an example.
+type SetupStream struct {
+ Data []byte
+}
+
+// Command enum.
+type command uint8
+
+const (
+ closeVCCommand command = 1
+ addReceiveBuffersCommand command = 2
+ openFlowCommand command = 3
+ setupCommand command = 4
+ setupStreamCommand command = 5
+ setupVCCommand command = 6
+)
+
// SetupOption is the base interface for optional Setup options.
type SetupOption interface {
// code is the identifier for the option.
@@ -94,42 +117,33 @@
read(r io.Reader) error
}
+// Setup option codes.
+type setupOptionCode uint16
+
+const (
+ naclBoxOptionCode setupOptionCode = 0
+ peerEndpointOptionCode setupOptionCode = 1
+ useVIFAuthenticationOptionCode setupOptionCode = 2
+)
+
// NaclBox is a SetupOption that specifies the public key for the NaclBox
// encryption protocol.
type NaclBox struct {
PublicKey crypto.BoxKey
}
-// SetupStream is a byte stream used to negotiate VIF setup. During VIF setup,
-// each party sends a Setup message to the other party containing their version
-// and options. If the version requires further negotiation (such as for authentication),
-// the SetupStream is used for the negotiation.
-//
-// The protocol used on the stream is version-specific, it is not specified here. See
-// vif/auth.go for an example.
-type SetupStream struct {
- Data []byte
+// PeerEndpoint is a SetupOption that exchanges the endpoints between peers.
+type PeerEndpoint struct {
+ LocalEndpoint naming.Endpoint // Endpoint of the sender (as seen by the sender).
}
-// Setup option codes.
-type setupOptionCode uint16
-
-const (
- naclBoxPublicKey setupOptionCode = 0
-)
-
-// Command enum.
-type command uint8
-
-const (
- deprecatedOpenVCCommand command = 0
- closeVCCommand command = 1
- addReceiveBuffersCommand command = 2
- openFlowCommand command = 3
- hopSetupCommand command = 4
- hopSetupStreamCommand command = 5
- setupVCCommand command = 6
-)
+// UseVIFAuthentication is a SetupOption that notifies the server to use
+// the VIF authentication for the new virtual circuit.
+type UseVIFAuthentication struct {
+ // Signature for binding a principal to a channel to make sure that the peer
+ // who requests to use VIF authentication is the same peer of the VIF.
+ Signature []byte
+}
func writeControl(w io.Writer, m Control) error {
var command command
@@ -141,9 +155,9 @@
case *OpenFlow:
command = openFlowCommand
case *Setup:
- command = hopSetupCommand
+ command = setupCommand
case *SetupStream:
- command = hopSetupStreamCommand
+ command = setupStreamCommand
case *SetupVC:
command = setupVCCommand
default:
@@ -175,9 +189,9 @@
m = new(AddReceiveBuffers)
case openFlowCommand:
m = new(OpenFlow)
- case hopSetupCommand:
+ case setupCommand:
m = new(Setup)
- case hopSetupStreamCommand:
+ case setupStreamCommand:
m = new(SetupStream)
case setupVCCommand:
m = new(SetupVC)
@@ -194,9 +208,7 @@
if err = writeInt(w, m.VCI); err != nil {
return
}
- if err = writeString(w, m.Error); err != nil {
- return
- }
+ err = writeString(w, m.Error)
return
}
@@ -204,9 +216,7 @@
if err = readInt(r, &m.VCI); err != nil {
return
}
- if err = readString(r, &m.Error); err != nil {
- return
- }
+ m.Error, err = readString(r)
return
}
@@ -214,26 +224,23 @@
if err = writeInt(w, m.VCI); err != nil {
return
}
- var localep string
+ var ep string
if m.LocalEndpoint != nil {
- localep = m.LocalEndpoint.String()
+ ep = m.LocalEndpoint.String()
}
- if err = writeString(w, localep); err != nil {
+ if err = writeString(w, ep); err != nil {
return
}
- var remoteep string
if m.RemoteEndpoint != nil {
- remoteep = m.RemoteEndpoint.String()
+ ep = m.RemoteEndpoint.String()
}
- if err = writeString(w, remoteep); err != nil {
+ if err = writeString(w, ep); err != nil {
return
}
if err = writeCounters(w, m.Counters); err != nil {
return
}
- if err = m.Setup.writeTo(w); err != nil {
- return
- }
+ err = m.Setup.writeTo(w)
return
}
@@ -242,18 +249,18 @@
return
}
var ep string
- if err = readString(r, &ep); err != nil {
+ if ep, err = readString(r); err != nil {
return
}
- if ep != "" {
+ if len(ep) > 0 {
if m.LocalEndpoint, err = inaming.NewEndpoint(ep); err != nil {
return
}
}
- if err = readString(r, &ep); err != nil {
+ if ep, err = readString(r); err != nil {
return
}
- if ep != "" {
+ if len(ep) > 0 {
if m.RemoteEndpoint, err = inaming.NewEndpoint(ep); err != nil {
return
}
@@ -261,9 +268,7 @@
if m.Counters, err = readCounters(r); err != nil {
return
}
- if err = m.Setup.readFrom(r); err != nil {
- return
- }
+ err = m.Setup.readFrom(r)
return
}
@@ -283,9 +288,7 @@
if err = writeInt(w, m.Flow); err != nil {
return
}
- if err = writeInt(w, m.InitialCounters); err != nil {
- return
- }
+ err = writeInt(w, m.InitialCounters)
return
}
@@ -296,9 +299,7 @@
if err = readInt(r, &m.Flow); err != nil {
return
}
- if err = readInt(r, &m.InitialCounters); err != nil {
- return
- }
+ err = readInt(r, &m.InitialCounters)
return
}
@@ -309,9 +310,7 @@
if err = writeInt(w, m.Versions.Max); err != nil {
return
}
- if err = writeSetupOptions(w, m.Options); err != nil {
- return
- }
+ err = writeSetupOptions(w, m.Options)
return
}
@@ -322,24 +321,32 @@
if err = readInt(r, &m.Versions.Max); err != nil {
return
}
- if m.Options, err = readSetupOptions(r); err != nil {
- return
- }
+ m.Options, err = readSetupOptions(r)
return
}
+func (m *SetupStream) writeTo(w io.Writer) error {
+ _, err := w.Write(m.Data)
+ return err
+}
+
+func (m *SetupStream) readFrom(r *bytes.Buffer) error {
+ m.Data = r.Bytes()
+ return nil
+}
+
// NaclBox returns the first NaclBox option, or nil if there is none.
func (m *Setup) NaclBox() *NaclBox {
for _, opt := range m.Options {
- if b, ok := opt.(*NaclBox); ok {
- return b
+ if o, ok := opt.(*NaclBox); ok {
+ return o
}
}
return nil
}
func (*NaclBox) code() setupOptionCode {
- return naclBoxPublicKey
+ return naclBoxOptionCode
}
func (m *NaclBox) size() uint16 {
@@ -356,12 +363,72 @@
return err
}
-func (m *SetupStream) writeTo(w io.Writer) error {
- _, err := w.Write(m.Data)
- return err
+// PeerEndpoint returns the naming.Endpoint in the first PeerEndpoint
+// option, or nil if there is none.
+func (m *Setup) PeerEndpoint() naming.Endpoint {
+ for _, opt := range m.Options {
+ if o, ok := opt.(*PeerEndpoint); ok {
+ return o.LocalEndpoint
+ }
+ }
+ return nil
}
-func (m *SetupStream) readFrom(r *bytes.Buffer) error {
- m.Data = r.Bytes()
+func (*PeerEndpoint) code() setupOptionCode {
+ return peerEndpointOptionCode
+}
+
+func (m *PeerEndpoint) size() uint16 {
+ var ep string
+ if m.LocalEndpoint != nil {
+ ep = m.LocalEndpoint.String()
+ }
+ return uint16(sizeOfSizeT + len(ep))
+}
+
+func (m *PeerEndpoint) write(w io.Writer) error {
+ var ep string
+ if m.LocalEndpoint != nil {
+ ep = m.LocalEndpoint.String()
+ }
+ return writeString(w, ep)
+}
+
+func (m *PeerEndpoint) read(r io.Reader) (err error) {
+ var ep string
+ if ep, err = readString(r); err != nil {
+ return
+ }
+ if len(ep) > 0 {
+ m.LocalEndpoint, err = inaming.NewEndpoint(ep)
+ }
+ return
+}
+
+// UseVIFAuthentication returns the signature of the first UseVIFAuthentication
+// option, or nil if there is none.
+func (m *Setup) UseVIFAuthentication() []byte {
+ for _, opt := range m.Options {
+ if o, ok := opt.(*UseVIFAuthentication); ok {
+ return o.Signature
+ }
+ }
return nil
}
+
+func (*UseVIFAuthentication) code() setupOptionCode {
+ return useVIFAuthenticationOptionCode
+}
+
+func (m *UseVIFAuthentication) size() uint16 {
+ return uint16(sizeOfSizeT + len(m.Signature))
+}
+
+func (m *UseVIFAuthentication) write(w io.Writer) error {
+ return writeBytes(w, m.Signature)
+}
+
+func (m *UseVIFAuthentication) read(r io.Reader) (err error) {
+ m.Signature, err = readBytes(r)
+ return
+}
diff --git a/runtime/internal/rpc/stream/message/message_test.go b/runtime/internal/rpc/stream/message/message_test.go
index aa79604..72c108e 100644
--- a/runtime/internal/rpc/stream/message/message_test.go
+++ b/runtime/internal/rpc/stream/message/message_test.go
@@ -65,6 +65,10 @@
return nil
}
+func (c *testControlCipher) ChannelBinding() []byte {
+ return nil
+}
+
func TestControl(t *testing.T) {
counters := NewCounters()
counters.Add(12, 13, 10240)
@@ -101,7 +105,28 @@
Versions: iversion.Range{Min: 34, Max: 56},
Options: []SetupOption{
&NaclBox{PublicKey: crypto.BoxKey{'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'}},
- &NaclBox{PublicKey: crypto.BoxKey{7, 67, 31}},
+ },
+ },
+ },
+ // SetupVC with use-vif-authentication.
+ &SetupVC{
+ VCI: 1,
+ LocalEndpoint: &inaming.Endpoint{
+ Protocol: "tcp",
+ Address: "batman.com:1990",
+ RID: naming.FixedRoutingID(0xba7),
+ },
+ RemoteEndpoint: &inaming.Endpoint{
+ Protocol: "tcp",
+ Address: "bugsbunny.com:1940",
+ RID: naming.FixedRoutingID(0xbb),
+ },
+ Counters: counters,
+ Setup: Setup{
+ Versions: iversion.Range{Min: 34, Max: 56},
+ Options: []SetupOption{
+ &NaclBox{PublicKey: crypto.BoxKey{'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'}},
+ &UseVIFAuthentication{Signature: []byte{'s', 'i', 'g', 'n', 'a', 't', 'u', 'r', 'e'}},
},
},
},
@@ -115,7 +140,27 @@
Versions: iversion.Range{Min: 21, Max: 71},
Options: []SetupOption{
&NaclBox{PublicKey: crypto.BoxKey{'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'}},
- &NaclBox{PublicKey: crypto.BoxKey{7, 67, 31}},
+ },
+ },
+ // Setup with peer endpoint.
+ &Setup{
+ Versions: iversion.Range{Min: 21, Max: 71},
+ Options: []SetupOption{
+ &NaclBox{PublicKey: crypto.BoxKey{'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'}},
+ &PeerEndpoint{
+ LocalEndpoint: &inaming.Endpoint{
+ Protocol: "tcp",
+ Address: "batman.com:1990",
+ RID: naming.FixedRoutingID(0xba7),
+ },
+ },
+ &PeerEndpoint{
+ LocalEndpoint: &inaming.Endpoint{
+ Protocol: "tcp",
+ Address: "bugsbunny.com:1940",
+ RID: naming.FixedRoutingID(0xbb),
+ },
+ },
},
},
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index 40fec9b..b623365 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -291,7 +291,7 @@
blessings = p.principal.BlessingStore().Default()
}
- c, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
+ cipher, _, err := vif.AuthenticateAsServer(conn, reader, nil, nil, p.principal, blessings, nil)
if err != nil {
processLog().Infof("Process %v failed to authenticate: %s", p, err)
return
@@ -302,7 +302,7 @@
conn: conn,
pool: pool,
reader: reader,
- ctrlCipher: c,
+ ctrlCipher: cipher,
queue: upcqueue.New(),
routingTable: make(map[id.VC]*destination),
servers: make(map[id.VC]*vc.VC),
@@ -310,6 +310,11 @@
}
p.mu.Lock()
+ if p.processes == nil {
+ // The proxy has been shutdowned.
+ p.mu.Unlock()
+ return
+ }
p.processes[process] = struct{}{}
p.mu.Unlock()
@@ -635,7 +640,7 @@
}
return theirPK, nil
}
- go p.proxy.runServer(server, vcObj.HandshakeAcceptedVC(intersection.Max, p.proxy.principal, p.proxy.blessings, keyExchanger))
+ go p.proxy.runServer(server, vcObj.HandshakeAcceptedVCWithAuthentication(intersection.Max, p.proxy.principal, p.proxy.blessings, keyExchanger))
}
break
}
diff --git a/runtime/internal/rpc/stream/vc/auth.go b/runtime/internal/rpc/stream/vc/auth.go
index ff9ec89..b85a4f1 100644
--- a/runtime/internal/rpc/stream/vc/auth.go
+++ b/runtime/internal/rpc/stream/vc/auth.go
@@ -35,10 +35,14 @@
errNoBlessingsToPresentToServer = reg(".errerrNoBlessingsToPresentToServer ", "no blessings to present as a server")
)
+// TODO(jhahn): Add len(ChannelBinding) > 0 check in writeBlessing/readBlessings
+// to make sure that the auth protocol only works when len(ChannelBinding) > 0
+// once we deprecate RPCv10.
+
// AuthenticateAsServer executes the authentication protocol at the server.
// It returns the blessings shared by the client, and the discharges shared
// by the server.
-func AuthenticateAsServer(conn io.ReadWriteCloser, principal security.Principal, server security.Blessings, dc DischargeClient, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+func AuthenticateAsServer(conn io.ReadWriteCloser, crypter crypto.Crypter, v version.RPCVersion, principal security.Principal, server security.Blessings, dc DischargeClient) (security.Blessings, map[string]security.Discharge, error) {
if server.IsZero() {
return security.Blessings{}, nil, verror.New(stream.ErrSecurity, nil, verror.New(errNoBlessingsToPresentToServer, nil))
}
@@ -59,12 +63,12 @@
}
// AuthenticateAsClient executes the authentication protocol at the client.
-// It returns the blessing shared by the server, the blessings shared by the
-// client, and any discharges shared by the server.
+// It returns the blessing shared by the client, and the blessings and any
+// discharges shared by the server.
//
// The client will only share its blessings if the server (who shares its
// blessings first) is authorized as per the authorizer for this RPC.
-func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, params security.CallParams, auth *ServerAuthorizer, v version.RPCVersion) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
+func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, v version.RPCVersion, params security.CallParams, auth *ServerAuthorizer) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
server, serverDischarges, err := readBlessings(conn, authServerContextTag, crypter, v)
if err != nil {
return security.Blessings{}, security.Blessings{}, nil, err
@@ -74,6 +78,7 @@
params.RemoteBlessings = server
params.RemoteDischarges = serverDischarges
if err := auth.Authorize(params); err != nil {
+ // Note this error type should match with the one in HandshakeDialedVCPreAuthenticated().
return security.Blessings{}, security.Blessings{}, nil, verror.New(stream.ErrNotTrusted, nil, err)
}
}
@@ -89,7 +94,7 @@
if err := writeBlessings(conn, authClientContextTag, crypter, principal, client, nil, v); err != nil {
return security.Blessings{}, security.Blessings{}, nil, err
}
- return server, client, serverDischarges, nil
+ return client, server, serverDischarges, nil
}
func writeBlessings(w io.Writer, tag []byte, crypter crypto.Crypter, p security.Principal, b security.Blessings, discharges []security.Discharge, v version.RPCVersion) error {
@@ -163,3 +168,40 @@
}
return m
}
+
+func bindClientPrincipalToChannel(crypter crypto.Crypter, p security.Principal) ([]byte, error) {
+ sig, err := p.Sign(append(authClientContextTag, crypter.ChannelBinding()...))
+ if err != nil {
+ return nil, err
+ }
+ var buf bytes.Buffer
+ enc := vom.NewEncoder(&buf)
+ if err := enc.Encode(sig); err != nil {
+ return nil, verror.New(errVomEncodeBlessing, nil, err)
+ }
+ msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
+ if err != nil {
+ return nil, err
+ }
+ defer msg.Release()
+ signature := make([]byte, len(msg.Contents))
+ copy(signature, msg.Contents)
+ return signature, nil
+}
+
+func verifyClientPrincipalBoundToChannel(signature []byte, crypter crypto.Crypter, publicKey security.PublicKey) error {
+ msg, err := crypter.Decrypt(iobuf.NewSlice(signature))
+ if err != nil {
+ return err
+ }
+ defer msg.Release()
+ dec := vom.NewDecoder(bytes.NewReader(msg.Contents))
+ var sig security.Signature
+ if err = dec.Decode(&sig); err != nil {
+ return verror.New(errHandshakeMessage, nil, err)
+ }
+ if !sig.Verify(publicKey, append(authClientContextTag, crypter.ChannelBinding()...)) {
+ return verror.New(errInvalidSignatureInMessage, nil)
+ }
+ return nil
+}
diff --git a/runtime/internal/rpc/stream/vc/vc.go b/runtime/internal/rpc/stream/vc/vc.go
index 06f2c58..75585d5 100644
--- a/runtime/internal/rpc/stream/vc/vc.go
+++ b/runtime/internal/rpc/stream/vc/vc.go
@@ -313,6 +313,7 @@
var err error
if payload, err = vc.crypter.Decrypt(payload); err != nil {
vc.mu.Unlock()
+ vlog.Errorf("failed to decrypt payload on VC %v failed: %v", vc, err)
return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToDecryptPayload, nil, err))
}
}
@@ -452,9 +453,9 @@
return err
}
-// FinishHandshakeDialedVC should be called from another goroutine
-// after HandshakeDialedVC is called, when version/encryption
-// negotiation is complete.
+// FinishHandshakeDialedVC should be called from another goroutine after
+// HandshakeDialedVCWithAuthentication or HandshakeDialedVCNoAuthentication
+// is called, when version/encryption negotiation is complete.
func (vc *VC) FinishHandshakeDialedVC(vers version.RPCVersion, remotePubKey *crypto.BoxKey) error {
vc.mu.Lock()
defer vc.mu.Unlock()
@@ -467,87 +468,50 @@
return nil
}
-// HandshakeDialedVC completes initialization of the VC (setting up encryption,
-// authentication etc.) under the assumption that the VC was initiated by the
-// local process (i.e., the local process "Dial"ed to create the VC).
-// HandshakeDialedVC will not return until FinishHandshakeDialedVC is called
-// from another goroutine.
-// TODO(mattr): vers can be removed as a parameter when we get rid of TLS and
-// the version is always obtained via the Setup call.
-func (vc *VC) HandshakeDialedVC(principal security.Principal, sendKey func(*crypto.BoxKey) error, opts ...stream.VCOpt) error {
+// HandshakeDialedVCWithAuthentication completes initialization of the VC (setting
+// up encryption, authentication etc.) under the assumption that the VC was initiated
+// by the local process (i.e., the local process "Dial"ed to create the VC).
+// HandshakeDialedVCWithAuthentication will not return until FinishHandshakeDialedVC
+// is called from another goroutine.
+func (vc *VC) HandshakeDialedVCWithAuthentication(principal security.Principal, sendSetupVC func(*crypto.BoxKey) error, opts ...stream.VCOpt) error {
remotePubKeyChan := make(chan *crypto.BoxKey, 1)
vc.mu.Lock()
vc.remotePubKeyChan = remotePubKeyChan
vc.mu.Unlock()
-
- // principal = nil means that we are running in SecurityNone and we don't need
- // to authenticate the VC. We still need to negotiate versioning information,
- // so we still set remotePubKeyChan and call sendKey, though we don't care about
- // the resulting key.
- if principal == nil {
- if err := sendKey(nil); err != nil {
- return err
- }
- // TODO(mattr): Error on some timeout so a non-responding server doesn't
- // cause this to hang forever.
- select {
- case <-remotePubKeyChan:
- case <-vc.closeCh:
- return verror.New(stream.ErrNetwork, nil, verror.New(errClosedDuringHandshake, nil, vc.VCI))
- }
- return nil
- }
-
- var auth *ServerAuthorizer
- for _, o := range opts {
- switch v := o.(type) {
- case *ServerAuthorizer:
- auth = v
- }
- }
-
exchange := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
- if err := sendKey(pubKey); err != nil {
+ if err := sendSetupVC(pubKey); err != nil {
return nil, err
}
// TODO(mattr): Error on some timeout so a non-responding server doesn't
// cause this to hang forever.
select {
- case theirKey := <-remotePubKeyChan:
- return theirKey, nil
+ case remotePublicKey := <-remotePubKeyChan:
+ return remotePublicKey, nil
case <-vc.closeCh:
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errClosedDuringHandshake, nil, vc.VCI))
}
}
crypter, err := crypto.NewBoxCrypter(exchange, vc.pool)
if err != nil {
- return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil,
- verror.New(errFailedToSetupEncryption, nil, err)))
+ return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupEncryption, nil, err)))
}
+
// The version is set by FinishHandshakeDialedVC and exchange (called by
// NewBoxCrypter) will block until FinishHandshakeDialedVC is called, so we
// should look up the version now.
- vers := vc.Version()
+ ver := vc.Version()
- // Authenticate (exchange identities)
+ // Authenticate (exchange blessings).
authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
if err != nil {
return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
}
- params := security.CallParams{
- LocalPrincipal: principal,
- LocalEndpoint: vc.localEP,
- RemoteEndpoint: vc.remoteEP,
- }
-
- rBlessings, lBlessings, rDischarges, err := AuthenticateAsClient(authConn, crypter, params, auth, vers)
- if err != nil || len(rBlessings.ThirdPartyCaveats()) == 0 {
- authConn.Close()
- if err != nil {
- return vc.appendCloseReason(err)
- }
- } else if vers < version.RPCVersion10 {
- go vc.recvDischargesLoop(authConn)
+ auth := serverAuthorizer(opts)
+ params := security.CallParams{LocalPrincipal: principal, LocalEndpoint: vc.localEP, RemoteEndpoint: vc.remoteEP}
+ lBlessings, rBlessings, rDischarges, err := AuthenticateAsClient(authConn, crypter, ver, params, auth)
+ authConn.Close()
+ if err != nil {
+ return vc.appendCloseReason(err)
}
vc.mu.Lock()
@@ -562,9 +526,87 @@
if err = vc.connectSystemFlows(); err != nil {
return vc.appendCloseReason(err)
}
+ vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
+ return nil
+}
- vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v",
- vc, rBlessings, lBlessings)
+// HandshakeDialedVCPreAuthenticated completes initialization of the VC
+// under the assumption that authentication happened out of band.
+// Authentication results are provided via params. Unlike
+// HandshakeDialedVCWithAuthentication or HandshakeDialedVCNoAuthentication,
+// this doesn't wait for FinishHandshakeDialedVC.
+func (vc *VC) HandshakeDialedVCPreAuthenticated(ver version.RPCVersion, params security.CallParams, remotePublicKeyPreauth *crypto.BoxKey, sendSetupVC func(*crypto.BoxKey, []byte) error, opts ...stream.VCOpt) error {
+ pk, sk, err := crypto.GenerateBoxKey()
+ if err != nil {
+ return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupEncryption, nil, err)))
+ }
+ crypter := crypto.NewBoxCrypterWithKey(pk, sk, remotePublicKeyPreauth, vc.pool)
+ sigPreauth, err := bindClientPrincipalToChannel(crypter, params.LocalPrincipal)
+ if err != nil {
+ return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, err))
+ }
+
+ if err := sendSetupVC(pk, sigPreauth); err != nil {
+ return vc.appendCloseReason(err)
+ }
+
+ // Authorize the server.
+ if auth := serverAuthorizer(opts); auth != nil {
+ if err := auth.Authorize(params); err != nil {
+ // Note this error type should match with the one in AuthenticateAsClient().
+ return vc.appendCloseReason(verror.New(stream.ErrNotTrusted, nil, err))
+ }
+ }
+
+ vc.mu.Lock()
+ vc.version = ver
+ vc.crypter = crypter
+ vc.localPrincipal = params.LocalPrincipal
+ vc.localBlessings = params.LocalBlessings
+ vc.remoteBlessings = params.RemoteBlessings
+ vc.remoteDischarges = params.RemoteDischarges
+ vc.mu.Unlock()
+
+ // Open system flows.
+ if err := vc.connectSystemFlows(); err != nil {
+ return vc.appendCloseReason(err)
+ }
+ vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, params.RemoteBlessings, params.LocalBlessings)
+ return nil
+}
+
+// HandshakeDialedVCNoAuthentication completes initialization of the VC
+// without authentication. Blocks until FinishHandshakeVC is called.
+func (vc *VC) HandshakeDialedVCNoAuthentication(sendSetupVC func() error, opts ...stream.VCOpt) error {
+ remotePubKeyChan := make(chan *crypto.BoxKey, 1)
+ vc.mu.Lock()
+ vc.remotePubKeyChan = remotePubKeyChan
+ vc.mu.Unlock()
+ // We are running in SecurityNone and we don't need to authenticate the VC. But
+ // we still need to negotiate versioning information, so we call sendSetupVC.
+ if err := sendSetupVC(); err != nil {
+ return err
+ }
+ // TODO(mattr): Error on some timeout so a non-responding server doesn't
+ // cause this to hang forever.
+ select {
+ case <-remotePubKeyChan:
+ case <-vc.closeCh:
+ return verror.New(stream.ErrNetwork, nil, verror.New(errClosedDuringHandshake, nil, vc.VCI))
+ }
+
+ ver := vc.Version()
+
+ // Open system flows.
+ //
+ // For backward compatibility, we do not establish any system flow in old RPC versions.
+ // TODO(jhahn): Clean up this once we deprecate RPCVersion10.
+ if ver >= version.RPCVersion11 {
+ if err := vc.connectSystemFlows(); err != nil {
+ return vc.appendCloseReason(err)
+ }
+ }
+ vlog.VI(1).Infof("Client VC %v handshaked with no authentication.", vc)
return nil
}
@@ -574,103 +616,55 @@
Error error // Error, if any, during the handshake.
}
-// HandshakeAcceptedVC completes initialization of the VC (setting up
-// encryption, authentication etc.) under the assumption that the VC was
-// initiated by a remote process (and the local process wishes to "accept" it).
+// HandshakeAcceptedVCWithAuthentication completes initialization of the VC
+// (setting up encryption, authentication etc.) under the assumption that the VC
+// was initiated by a remote process (and the local process wishes to "accept" it).
//
// Since the handshaking process might involve several round trips, a bulk of the work
// is done asynchronously and the result of the handshake is written to the
// channel returned by this method.
//
// 'principal' is the principal used by the server used during authentication.
-// If principal is nil, then the VC expects to be used for unauthenticated, unencrypted communication.
-// 'lBlessings' is presented to the client during authentication.
-func (vc *VC) HandshakeAcceptedVC(
- vers version.RPCVersion,
- principal security.Principal,
- lBlessings security.Blessings,
- exchange crypto.BoxKeyExchanger,
- opts ...stream.ListenerOpt) <-chan HandshakeResult {
+// If principal is nil, then the VC expects to be used for unauthenticated,
+// unencrypted communication. 'lBlessings' is presented to the client during
+// authentication.
+func (vc *VC) HandshakeAcceptedVCWithAuthentication(ver version.RPCVersion, principal security.Principal, lBlessings security.Blessings, exchange crypto.BoxKeyExchanger, opts ...stream.ListenerOpt) <-chan HandshakeResult {
result := make(chan HandshakeResult, 1)
- finish := func(ln stream.Listener, err error) chan HandshakeResult {
- result <- HandshakeResult{ln, err}
- return result
- }
- var (
- dischargeClient DischargeClient
- dischargeExpiryBuffer = DefaultServerDischargeExpiryBuffer
- )
- for _, o := range opts {
- switch v := o.(type) {
- case DischargeClient:
- dischargeClient = v
- case DischargeExpiryBuffer:
- dischargeExpiryBuffer = time.Duration(v)
- }
- }
- // If the listener was setup asynchronously, there is a race between
- // the listener being setup and the caller of this method trying to
- // dispatch messages, thus it is setup synchronously.
- ln, err := vc.Listen()
+ ln, err := vc.initHandshakeAcceptedVC()
if err != nil {
- return finish(nil, err)
- }
- // TODO(mattr): We could instead send counters in the return SetupVC message
- // and avoid this extra message. It probably doesn't make much difference
- // so for now I'll leave it. May be a nice cleanup after we are always
- // using SetupVC.
- vc.helper.AddReceiveBuffers(vc.VCI(), SharedFlowID, DefaultBytesBufferedPerFlow)
-
- // principal == nil means that we are running in SecurityNone and we don't need
- // to authenticate the VC.
- if principal == nil {
- go func() {
- _, err = exchange(nil)
- result <- HandshakeResult{ln, err}
- }()
+ result <- HandshakeResult{nil, err}
return result
}
- var crypter crypto.Crypter
-
go func() {
- sendErr := func(err error) {
- ln.Close()
- result <- HandshakeResult{nil, vc.appendCloseReason(err)}
- }
-
- vc.mu.Lock()
- vc.acceptHandshakeDone = make(chan struct{})
- vc.mu.Unlock()
-
- crypter, err = crypto.NewBoxCrypter(exchange, vc.pool)
+ crypter, err := crypto.NewBoxCrypter(exchange, vc.pool)
if err != nil {
- sendErr(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupEncryption, nil, err)))
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToSetupEncryption, nil, err)), ln, result)
return
}
- // Authenticate (exchange identities)
+ // Authenticate (exchange blessings).
authConn, err := ln.Accept()
if err != nil {
- sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errAuthFlowNotAccepted, nil, err)))
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrNetwork, nil, verror.New(errAuthFlowNotAccepted, nil, err)), ln, result)
return
}
if vc.findFlow(authConn) != AuthFlowID {
- // This should have been an auth flow. We can't establish security so send
- // an error.
- sendErr(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
+ // This should have been an auth flow. We can't establish security so send an error.
+ authConn.Close()
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)), ln, result)
return
}
-
- rBlessings, lDischarges, err := AuthenticateAsServer(authConn, principal, lBlessings, dischargeClient, crypter, vers)
+ dischargeClient, dischargeExpiryBuffer := dischargeOptions(opts)
+ rBlessings, lDischarges, err := AuthenticateAsServer(authConn, crypter, ver, principal, lBlessings, dischargeClient)
+ authConn.Close()
if err != nil {
- authConn.Close()
- sendErr(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)))
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)), ln, result)
return
}
vc.mu.Lock()
- vc.version = vers
+ vc.version = ver
vc.crypter = crypter
vc.localPrincipal = principal
vc.localBlessings = lBlessings
@@ -680,23 +674,135 @@
vc.acceptHandshakeDone = nil
vc.mu.Unlock()
- if len(lBlessings.ThirdPartyCaveats()) > 0 && vers < version.RPCVersion10 {
- go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats(), dischargeExpiryBuffer)
- } else {
- authConn.Close()
- }
-
// Accept system flows.
if err = vc.acceptSystemFlows(ln, dischargeClient, dischargeExpiryBuffer); err != nil {
- sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)))
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)), ln, result)
+ return
}
-
vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
result <- HandshakeResult{ln, nil}
}()
return result
}
+// HandshakeAcceptedVCPreAuthenticated completes initialization of the VC
+// under the assumption that authentication happened out of band. Unlike
+// HandshakeAcceptedVCWithAuthentication or HandshakeAcceptedVCNoAuthentication,
+// this doesn't send a SetupVC response.
+func (vc *VC) HandshakeAcceptedVCPreAuthenticated(ver version.RPCVersion, params security.CallParams, sigPreauth []byte, lPublicKeyPreauth, lPrivateKeyPreauth, rPublicKey *crypto.BoxKey, opts ...stream.ListenerOpt) <-chan HandshakeResult {
+ result := make(chan HandshakeResult, 1)
+ ln, err := vc.initHandshakeAcceptedVC()
+ if err != nil {
+ result <- HandshakeResult{nil, err}
+ return result
+ }
+
+ go func() {
+ crypter := crypto.NewBoxCrypterWithKey(lPublicKeyPreauth, lPrivateKeyPreauth, rPublicKey, vc.pool)
+ if err := verifyClientPrincipalBoundToChannel(sigPreauth, crypter, params.RemoteBlessings.PublicKey()); err != nil {
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrSecurity, nil, verror.New(errAuthFailed, nil, err)), ln, result)
+ return
+ }
+
+ vc.mu.Lock()
+ vc.version = ver
+ vc.crypter = crypter
+ vc.localPrincipal = params.LocalPrincipal
+ vc.localBlessings = params.LocalBlessings
+ vc.remoteBlessings = params.RemoteBlessings
+ vc.localDischarges = params.LocalDischarges
+ close(vc.acceptHandshakeDone)
+ vc.acceptHandshakeDone = nil
+ vc.mu.Unlock()
+
+ // Accept system flows.
+ dischargeClient, dischargeExpiryBuffer := dischargeOptions(opts)
+ if err = vc.acceptSystemFlows(ln, dischargeClient, dischargeExpiryBuffer); err != nil {
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)), ln, result)
+ return
+ }
+ vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, params.RemoteBlessings, params.LocalBlessings)
+ result <- HandshakeResult{ln, nil}
+ }()
+ return result
+}
+
+// HandshakeAcceptedVCNoAuthentication completes initialization of the VC
+// without authentication.
+func (vc *VC) HandshakeAcceptedVCNoAuthentication(ver version.RPCVersion, sendSetupVC func() error, opts ...stream.ListenerOpt) <-chan HandshakeResult {
+ result := make(chan HandshakeResult, 1)
+ ln, err := vc.initHandshakeAcceptedVC()
+ if err != nil {
+ result <- HandshakeResult{nil, err}
+ return result
+ }
+
+ go func() {
+ // We don't need to authenticate the VC, but we still need to negotiate
+ // versioning information. So we call sendSetupVC.
+ if err := sendSetupVC(); err != nil {
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrNetwork, nil, err), ln, result)
+ return
+ }
+
+ vc.mu.Lock()
+ vc.version = ver
+ close(vc.acceptHandshakeDone)
+ vc.acceptHandshakeDone = nil
+ vc.mu.Unlock()
+
+ // Accept system flows.
+ //
+ // For backward compatibility, we do not establish any system flow in old RPC versions.
+ // TODO(jhahn): Clean up this once we deprecate RPCVersion10.
+ if ver >= version.RPCVersion11 {
+ if err = vc.acceptSystemFlows(ln, nil, 0); err != nil {
+ vc.abortHandshakeAcceptedVC(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)), ln, result)
+ return
+ }
+ }
+ vlog.VI(1).Infof("Server VC %v handshaked with no authentication.", vc)
+ result <- HandshakeResult{ln, err}
+ }()
+ return result
+}
+
+// initHandshakeAcceptedVC setups a listener and a notification channel
+// synchronously to avoid races.
+//
+// If the listener was setup asynchronously, there is a race between
+// the listener being setup and the caller of this method trying to
+// dispatch messages.
+//
+// If the channel was setup after starting the handshake, there is a race
+// where DispatchPayload() may handle messages before the handshake has been
+// completed.
+func (vc *VC) initHandshakeAcceptedVC() (stream.Listener, error) {
+ ln, err := vc.Listen()
+ if err != nil {
+ return nil, err
+ }
+ // TODO(mattr): We could instead send counters in the return SetupVC message
+ // and avoid this extra message. It probably doesn't make much difference
+ // so for now I'll leave it. May be a nice cleanup after we are always
+ // using SetupVC.
+ vc.helper.AddReceiveBuffers(vc.VCI(), SharedFlowID, DefaultBytesBufferedPerFlow)
+
+ vc.mu.Lock()
+ vc.acceptHandshakeDone = make(chan struct{})
+ vc.mu.Unlock()
+ return ln, nil
+}
+
+func (vc *VC) abortHandshakeAcceptedVC(reason error, ln stream.Listener, result chan<- HandshakeResult) {
+ ln.Close()
+ if vc.acceptHandshakeDone != nil {
+ close(vc.acceptHandshakeDone)
+ vc.acceptHandshakeDone = nil
+ }
+ result <- HandshakeResult{nil, vc.appendCloseReason(reason)}
+}
+
func (vc *VC) sendDischargesLoop(conn io.WriteCloser, dc DischargeClient, tpCavs []security.Caveat, dischargeExpiryBuffer time.Duration) {
defer conn.Close()
if dc == nil {
@@ -781,10 +887,6 @@
vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn))
vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn))
- if vc.Version() < version.RPCVersion10 {
- return nil
- }
-
vc.mu.Lock()
rBlessings := vc.remoteBlessings
vc.mu.Unlock()
@@ -804,13 +906,13 @@
if err != nil {
return verror.New(errFlowForWireTypeNotAccepted, nil, err)
}
+ if vc.findFlow(conn) != TypeFlowID {
+ // This should have been a type flow.
+ return verror.New(errFailedToCreateFlowForWireType, nil, err)
+ }
vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn))
vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn))
- if vc.Version() < version.RPCVersion10 {
- return nil
- }
-
vc.mu.Lock()
lBlessings := vc.localBlessings
vc.mu.Unlock()
@@ -822,7 +924,6 @@
}
go vc.sendDischargesLoop(conn, dischargeClient, tpCaveats, dischargeExpiryBuffer)
}
-
return nil
}
@@ -1002,5 +1103,30 @@
ret[id] = d
}
return ret
+}
+func serverAuthorizer(opts []stream.VCOpt) *ServerAuthorizer {
+ for _, o := range opts {
+ switch auth := o.(type) {
+ case *ServerAuthorizer:
+ return auth
+ }
+ }
+ return nil
+}
+
+func dischargeOptions(opts []stream.ListenerOpt) (DischargeClient, time.Duration) {
+ var (
+ dischargeClient DischargeClient
+ dischargeExpiryBuffer = DefaultServerDischargeExpiryBuffer
+ )
+ for _, o := range opts {
+ switch v := o.(type) {
+ case DischargeClient:
+ dischargeClient = v
+ case DischargeExpiryBuffer:
+ dischargeExpiryBuffer = time.Duration(v)
+ }
+ }
+ return dischargeClient, dischargeExpiryBuffer
}
diff --git a/runtime/internal/rpc/stream/vc/vc_test.go b/runtime/internal/rpc/stream/vc/vc_test.go
index 6f01d63..1c08719 100644
--- a/runtime/internal/rpc/stream/vc/vc_test.go
+++ b/runtime/internal/rpc/stream/vc/vc_test.go
@@ -20,7 +20,6 @@
"v.io/v23/context"
"v.io/v23/naming"
- "v.io/v23/options"
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
@@ -48,13 +47,18 @@
const (
// Convenience alias to avoid conflicts between the package name "vc" and variables called "vc".
DefaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
- // Shorthands
- SecurityNone = options.SecurityNone
- SecurityDefault = options.SecurityConfidential
)
var LatestVersion = iversion.SupportedRange.Max
+type testSecurityLevel int
+
+const (
+ SecurityDefault testSecurityLevel = iota
+ SecurityPreAuthenticated
+ SecurityNone
+)
+
// testFlowEcho writes a random string of 'size' bytes on the flow and then
// ensures that the same string is read back.
func testFlowEcho(t *testing.T, flow stream.Flow, size int) {
@@ -91,12 +95,12 @@
func TestHandshakeNoSecurity(t *testing.T) {
// When the principals are nil, no blessings should be sent over the wire.
- h, vc, err := New(LatestVersion, nil, nil, nil, nil)
- if err != nil {
+ clientH, serverH := newVC()
+ if err := handshakeVCNoAuthentication(LatestVersion, clientH.VC, serverH.VC); err != nil {
t.Fatal(err)
}
- defer h.Close()
- flow, err := vc.Connect()
+ defer clientH.Close()
+ flow, err := clientH.VC.Connect()
if err != nil {
t.Fatal(err)
}
@@ -172,7 +176,7 @@
// Test that mockDischargeClient implements vc.DischargeClient.
var _ vc.DischargeClient = (mockDischargeClient)(nil)
-func TestHandshake(t *testing.T) {
+func testHandshake(t *testing.T, securityLevel testSecurityLevel) {
matchesError := func(got error, want string) error {
if (got == nil) && len(want) == 0 {
return nil
@@ -185,8 +189,8 @@
var (
root = testutil.NewIDProvider("root")
discharger = testutil.NewPrincipal("discharger")
- client = testutil.NewPrincipal()
- server = testutil.NewPrincipal()
+ pclient = testutil.NewPrincipal()
+ pserver = testutil.NewPrincipal()
)
tpcav, err := security.NewPublicKeyCaveat(discharger.PublicKey(), "irrelevant", security.ThirdPartyRequirements{}, security.UnconstrainedUse())
if err != nil {
@@ -197,11 +201,11 @@
t.Fatal(err)
}
// Root blesses the client
- if err := root.Bless(client, "client"); err != nil {
+ if err := root.Bless(pclient, "client"); err != nil {
t.Fatal(err)
}
// Root blesses the server with a third-party caveat
- if err := root.Bless(server, "server", tpcav); err != nil {
+ if err := root.Bless(pserver, "server", tpcav); err != nil {
t.Fatal(err)
}
@@ -213,11 +217,11 @@
flowRemoteDischarges map[string]security.Discharge
}{
{
- flowRemoteBlessings: server.BlessingStore().Default(),
+ flowRemoteBlessings: pserver.BlessingStore().Default(),
},
{
dischargeClient: mockDischargeClient([]security.Discharge{dis}),
- flowRemoteBlessings: server.BlessingStore().Default(),
+ flowRemoteBlessings: pserver.BlessingStore().Default(),
flowRemoteDischarges: map[string]security.Discharge{dis.ID(): dis},
},
{
@@ -226,14 +230,14 @@
Suffix: "suffix",
Method: "method",
Policy: &auth{
- localPrincipal: client,
- remoteBlessings: server.BlessingStore().Default(),
+ localPrincipal: pclient,
+ remoteBlessings: pserver.BlessingStore().Default(),
remoteDischarges: map[string]security.Discharge{dis.ID(): dis},
suffix: "suffix",
method: "method",
},
},
- flowRemoteBlessings: server.BlessingStore().Default(),
+ flowRemoteBlessings: pserver.BlessingStore().Default(),
flowRemoteDischarges: map[string]security.Discharge{dis.ID(): dis},
},
{
@@ -249,29 +253,42 @@
},
}
for i, d := range testdata {
- h, vc, err := New(LatestVersion, client, server, d.dischargeClient, d.auth)
+ clientH, serverH := newVC()
+ var err error
+ switch securityLevel {
+ case SecurityPreAuthenticated:
+ var serverPK, serverSK *crypto.BoxKey
+ if serverPK, serverSK, err = crypto.GenerateBoxKey(); err != nil {
+ t.Fatal(err)
+ }
+ err = handshakeVCPreAuthenticated(LatestVersion, clientH.VC, serverH.VC, pclient, pserver, serverPK, serverSK, d.flowRemoteDischarges, d.dischargeClient, d.auth)
+ case SecurityDefault:
+ err = handshakeVCWithAuthentication(LatestVersion, clientH.VC, serverH.VC, pclient, pserver, d.flowRemoteDischarges, d.dischargeClient, d.auth)
+ }
if merr := matchesError(err, d.dialErr); merr != nil {
t.Errorf("Test #%d: HandshakeDialedVC with server authorizer %#v:: %v", i, d.auth.Policy, merr)
}
if err != nil {
continue
}
- flow, err := vc.Connect()
+ flow, err := clientH.VC.Connect()
if err != nil {
- h.Close()
+ clientH.Close()
t.Errorf("Unable to create flow: %v", err)
continue
}
- if err := testFlowAuthN(flow, d.flowRemoteBlessings, d.flowRemoteDischarges, client.PublicKey()); err != nil {
- h.Close()
+ if err := testFlowAuthN(flow, d.flowRemoteBlessings, d.flowRemoteDischarges, pclient.PublicKey()); err != nil {
+ clientH.Close()
t.Error(err)
continue
}
- h.Close()
+ clientH.Close()
}
}
+func TestHandshakePreAuthenticated(t *testing.T) { testHandshake(t, SecurityPreAuthenticated) }
+func TestHandshake(t *testing.T) { testHandshake(t, SecurityDefault) }
-func testConnect_Small(t *testing.T, version version.RPCVersion, securityLevel options.SecurityLevel) {
+func testConnect_Small(t *testing.T, version version.RPCVersion, securityLevel testSecurityLevel) {
h, vc, err := NewSimple(version, securityLevel)
if err != nil {
t.Fatal(err)
@@ -284,9 +301,12 @@
testFlowEcho(t, flow, 10)
}
func TestConnect_SmallNoSecurity(t *testing.T) { testConnect_Small(t, LatestVersion, SecurityNone) }
-func TestConnect_Small(t *testing.T) { testConnect_Small(t, LatestVersion, SecurityDefault) }
+func TestConnect_SmallPreAuthenticated(t *testing.T) {
+ testConnect_Small(t, LatestVersion, SecurityPreAuthenticated)
+}
+func TestConnect_Small(t *testing.T) { testConnect_Small(t, LatestVersion, SecurityDefault) }
-func testConnect(t *testing.T, securityLevel options.SecurityLevel) {
+func testConnect(t *testing.T, securityLevel testSecurityLevel) {
h, vc, err := NewSimple(LatestVersion, securityLevel)
if err != nil {
t.Fatal(err)
@@ -298,28 +318,14 @@
}
testFlowEcho(t, flow, 10*DefaultBytesBufferedPerFlow)
}
-func TestConnectNoSecurity(t *testing.T) { testConnect(t, SecurityNone) }
-func TestConnect(t *testing.T) { testConnect(t, SecurityDefault) }
-
-func testConnect_Version7(t *testing.T, securityLevel options.SecurityLevel) {
- h, vc, err := NewSimple(LatestVersion, securityLevel)
- if err != nil {
- t.Fatal(err)
- }
- defer h.Close()
- flow, err := vc.Connect()
- if err != nil {
- t.Fatal(err)
- }
- testFlowEcho(t, flow, 10)
-}
-func TestConnect_Version7NoSecurity(t *testing.T) { testConnect_Version7(t, SecurityNone) }
-func TestConnect_Version7(t *testing.T) { testConnect_Version7(t, SecurityDefault) }
+func TestConnectNoSecurity(t *testing.T) { testConnect(t, SecurityNone) }
+func TestConnectPreAuthenticated(t *testing.T) { testConnect(t, SecurityPreAuthenticated) }
+func TestConnect(t *testing.T) { testConnect(t, SecurityDefault) }
// helper function for testing concurrent operations on multiple flows over the
// same VC. Such tests are most useful when running the race detector.
// (go test -race ...)
-func testConcurrentFlows(t *testing.T, securityLevel options.SecurityLevel, flows, gomaxprocs int) {
+func testConcurrentFlows(t *testing.T, securityLevel testSecurityLevel, flows, gomaxprocs int) {
mp := runtime.GOMAXPROCS(gomaxprocs)
defer runtime.GOMAXPROCS(mp)
h, vc, err := NewSimple(LatestVersion, securityLevel)
@@ -345,12 +351,18 @@
}
func TestConcurrentFlows_1NOSecurity(t *testing.T) { testConcurrentFlows(t, SecurityNone, 10, 1) }
-func TestConcurrentFlows_1(t *testing.T) { testConcurrentFlows(t, SecurityDefault, 10, 1) }
+func TestConcurrentFlows_1PreAuthenticated(t *testing.T) {
+ testConcurrentFlows(t, SecurityPreAuthenticated, 10, 1)
+}
+func TestConcurrentFlows_1(t *testing.T) { testConcurrentFlows(t, SecurityDefault, 10, 1) }
func TestConcurrentFlows_10NoSecurity(t *testing.T) { testConcurrentFlows(t, SecurityNone, 10, 10) }
-func TestConcurrentFlows_10(t *testing.T) { testConcurrentFlows(t, SecurityDefault, 10, 10) }
+func TestConcurrentFlows_10PreAuthenticated(t *testing.T) {
+ testConcurrentFlows(t, SecurityPreAuthenticated, 10, 10)
+}
+func TestConcurrentFlows_10(t *testing.T) { testConcurrentFlows(t, SecurityDefault, 10, 10) }
-func testListen(t *testing.T, securityLevel options.SecurityLevel) {
+func testListen(t *testing.T, securityLevel testSecurityLevel) {
h, vc, err := NewSimple(LatestVersion, securityLevel)
if err != nil {
t.Fatal(err)
@@ -398,10 +410,11 @@
t.Errorf("Got (%d, %v) want (0, %v)", n, err, io.EOF)
}
}
-func TestListenNoSecurity(t *testing.T) { testListen(t, SecurityNone) }
-func TestListen(t *testing.T) { testListen(t, SecurityDefault) }
+func TestListenNoSecurity(t *testing.T) { testListen(t, SecurityNone) }
+func TestListenPreAuthenticated(t *testing.T) { testListen(t, SecurityPreAuthenticated) }
+func TestListen(t *testing.T) { testListen(t, SecurityDefault) }
-func testNewFlowAfterClose(t *testing.T, securityLevel options.SecurityLevel) {
+func testNewFlowAfterClose(t *testing.T, securityLevel testSecurityLevel) {
h, _, err := NewSimple(LatestVersion, securityLevel)
if err != nil {
t.Fatal(err)
@@ -413,9 +426,12 @@
}
}
func TestNewFlowAfterCloseNoSecurity(t *testing.T) { testNewFlowAfterClose(t, SecurityNone) }
-func TestNewFlowAfterClose(t *testing.T) { testNewFlowAfterClose(t, SecurityDefault) }
+func TestNewFlowAfterClosePreAuthenticated(t *testing.T) {
+ testNewFlowAfterClose(t, SecurityPreAuthenticated)
+}
+func TestNewFlowAfterClose(t *testing.T) { testNewFlowAfterClose(t, SecurityDefault) }
-func testConnectAfterClose(t *testing.T, securityLevel options.SecurityLevel) {
+func testConnectAfterClose(t *testing.T, securityLevel testSecurityLevel) {
h, vc, err := NewSimple(LatestVersion, securityLevel)
if err != nil {
t.Fatal(err)
@@ -427,7 +443,10 @@
}
}
func TestConnectAfterCloseNoSecurity(t *testing.T) { testConnectAfterClose(t, SecurityNone) }
-func TestConnectAfterClose(t *testing.T) { testConnectAfterClose(t, SecurityDefault) }
+func TestConnectAfterClosePreAuthenticated(t *testing.T) {
+ testConnectAfterClose(t, SecurityPreAuthenticated)
+}
+func TestConnectAfterClose(t *testing.T) { testConnectAfterClose(t, SecurityDefault) }
// helper implements vc.Helper and also sets up a single VC.
type helper struct {
@@ -438,26 +457,33 @@
otherEnd *helper // GUARDED_BY(mu)
}
-func createPrincipals(securityLevel options.SecurityLevel) (client, server security.Principal) {
- if securityLevel == SecurityDefault {
- client = testutil.NewPrincipal("client")
- server = testutil.NewPrincipal("server")
+// NewSimple creates both ends of a VC but returns only the "client" end (i.e.,
+// the one that initiated the VC). The "server" end (the one that "accepted" the
+// VC) listens for flows and simply echoes data read.
+func NewSimple(v version.RPCVersion, securityLevel testSecurityLevel) (*helper, stream.VC, error) {
+ clientH, serverH := newVC()
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ var err error
+ switch securityLevel {
+ case SecurityNone:
+ err = handshakeVCNoAuthentication(v, clientH.VC, serverH.VC)
+ case SecurityPreAuthenticated:
+ serverPK, serverSK, _ := crypto.GenerateBoxKey()
+ err = handshakeVCPreAuthenticated(v, clientH.VC, serverH.VC, pclient, pserver, serverPK, serverSK, nil, nil, nil)
+ case SecurityDefault:
+ err = handshakeVCWithAuthentication(v, clientH.VC, serverH.VC, pclient, pserver, nil, nil, nil)
}
- return
+ if err != nil {
+ clientH.Close()
+ return nil, nil, err
+ }
+ return clientH, clientH.VC, err
}
-// A convenient version of New() with default parameters.
-func NewSimple(v version.RPCVersion, securityLevel options.SecurityLevel) (*helper, stream.VC, error) {
- pclient, pserver := createPrincipals(securityLevel)
- return New(v, pclient, pserver, nil, nil)
-}
-
-// New creates both ends of a VC but returns only the "client" end (i.e., the
-// one that initiated the VC). The "server" end (the one that "accepted" the VC)
-// listens for flows and simply echoes data read.
-func New(v version.RPCVersion, client, server security.Principal, dischargeClient vc.DischargeClient, auth *vc.ServerAuthorizer) (*helper, stream.VC, error) {
- clientH := &helper{bq: drrqueue.New(vc.MaxPayloadSizeBytes)}
- serverH := &helper{bq: drrqueue.New(vc.MaxPayloadSizeBytes)}
+func newVC() (clientH, serverH *helper) {
+ clientH = &helper{bq: drrqueue.New(vc.MaxPayloadSizeBytes)}
+ serverH = &helper{bq: drrqueue.New(vc.MaxPayloadSizeBytes)}
clientH.otherEnd = serverH
serverH.otherEnd = clientH
@@ -485,48 +511,106 @@
go clientH.pipeLoop(serverH.VC)
go serverH.pipeLoop(clientH.VC)
+ return
+}
- var (
- lopts []stream.ListenerOpt
- vcopts []stream.VCOpt
- )
-
+func handshakeVCWithAuthentication(v version.RPCVersion, client, server *vc.VC, pclient, pserver security.Principal, discharges map[string]security.Discharge, dischargeClient vc.DischargeClient, auth *vc.ServerAuthorizer) error {
+ var lopts []stream.ListenerOpt
if dischargeClient != nil {
lopts = append(lopts, dischargeClient)
}
+ var vcopts []stream.VCOpt
if auth != nil {
vcopts = append(vcopts, auth)
}
- var bserver security.Blessings
- if server != nil {
- bserver = server.BlessingStore().Default()
+ clientPK, serverPK := make(chan *crypto.BoxKey, 1), make(chan *crypto.BoxKey, 1)
+ clientSendSetupVC := func(pubKey *crypto.BoxKey) error {
+ clientPK <- pubKey
+ return client.FinishHandshakeDialedVC(v, <-serverPK)
+ }
+ serverExchange := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
+ serverPK <- pubKey
+ return <-clientPK, nil
}
- var clientExchanger func(*crypto.BoxKey) error
- var serverExchanger func(*crypto.BoxKey) (*crypto.BoxKey, error)
-
- serverch, clientch := make(chan *crypto.BoxKey, 1), make(chan *crypto.BoxKey, 1)
- clientExchanger = func(pubKey *crypto.BoxKey) error {
- clientch <- pubKey
- return clientH.VC.FinishHandshakeDialedVC(v, <-serverch)
+ hrCH := server.HandshakeAcceptedVCWithAuthentication(v, pserver, pserver.BlessingStore().Default(), serverExchange, lopts...)
+ if err := client.HandshakeDialedVCWithAuthentication(pclient, clientSendSetupVC, vcopts...); err != nil {
+ go func() { <-hrCH }()
+ return err
}
- serverExchanger = func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
- serverch <- pubKey
- return <-clientch, nil
- }
-
- c := serverH.VC.HandshakeAcceptedVC(v, server, bserver, serverExchanger, lopts...)
- if err := clientH.VC.HandshakeDialedVC(client, clientExchanger, vcopts...); err != nil {
- go func() { <-c }()
- return nil, nil, err
- }
- hr := <-c
+ hr := <-hrCH
if hr.Error != nil {
- return nil, nil, hr.Error
+ return hr.Error
}
go acceptLoop(hr.Listener)
- return clientH, clientH.VC, nil
+ return nil
+}
+
+func handshakeVCPreAuthenticated(v version.RPCVersion, client, server *vc.VC, pclient, pserver security.Principal, serverPK, serverSK *crypto.BoxKey, discharges map[string]security.Discharge, dischargeClient vc.DischargeClient, auth *vc.ServerAuthorizer) error {
+ var lopts []stream.ListenerOpt
+ if dischargeClient != nil {
+ lopts = append(lopts, dischargeClient)
+ }
+ var vcopts []stream.VCOpt
+ if auth != nil {
+ vcopts = append(vcopts, auth)
+ }
+ bserver := pserver.BlessingStore().Default()
+ bclient, _ := pclient.BlessSelf("vcauth")
+
+ clientPK, clientSig := make(chan *crypto.BoxKey, 1), make(chan []byte, 1)
+ serverAccepted := make(chan struct{})
+ sendSetupVC := func(pubKey *crypto.BoxKey, signature []byte) error {
+ clientPK <- pubKey
+ clientSig <- signature
+ // Unlike the real world (in VIF), a message can be delivered to a server before
+ // it handles SetupVC message. So we explictly sync in this test.
+ <-serverAccepted
+ return nil
+ }
+
+ var hrCH <-chan vc.HandshakeResult
+ go func() {
+ params := security.CallParams{LocalPrincipal: pserver, LocalBlessings: bserver, RemoteBlessings: bclient, LocalDischarges: discharges}
+ hrCH = server.HandshakeAcceptedVCPreAuthenticated(v, params, <-clientSig, serverPK, serverSK, <-clientPK, lopts...)
+ close(serverAccepted)
+ }()
+ params := security.CallParams{LocalPrincipal: pclient, LocalBlessings: bclient, RemoteBlessings: bserver, RemoteDischarges: discharges}
+ if err := client.HandshakeDialedVCPreAuthenticated(v, params, serverPK, sendSetupVC, vcopts...); err != nil {
+ go func() { <-hrCH }()
+ return err
+ }
+ hr := <-hrCH
+ if hr.Error != nil {
+ return hr.Error
+ }
+ go acceptLoop(hr.Listener)
+ return nil
+}
+
+func handshakeVCNoAuthentication(v version.RPCVersion, client, server *vc.VC) error {
+ clientCH, serverCH := make(chan struct{}), make(chan struct{})
+ clientSendSetupVC := func() error {
+ close(clientCH)
+ return client.FinishHandshakeDialedVC(v, nil)
+ }
+ serverSendSetupVC := func() error {
+ close(serverCH)
+ return nil
+ }
+
+ hrCH := server.HandshakeAcceptedVCNoAuthentication(v, serverSendSetupVC)
+ if err := client.HandshakeDialedVCNoAuthentication(clientSendSetupVC); err != nil {
+ go func() { <-hrCH }()
+ return err
+ }
+ hr := <-hrCH
+ if hr.Error != nil {
+ return hr.Error
+ }
+ go acceptLoop(hr.Listener)
+ return nil
}
// pipeLoop forwards slices written to h.bq to dst.
diff --git a/runtime/internal/rpc/stream/vif/auth.go b/runtime/internal/rpc/stream/vif/auth.go
index c0590f0..3a1c318 100644
--- a/runtime/internal/rpc/stream/vif/auth.go
+++ b/runtime/internal/rpc/stream/vif/auth.go
@@ -5,12 +5,10 @@
package vif
import (
- "crypto/rand"
"io"
- "golang.org/x/crypto/nacl/box"
-
- rpcversion "v.io/v23/rpc/version"
+ "v.io/v23/naming"
+ "v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
@@ -19,7 +17,7 @@
"v.io/x/ref/runtime/internal/rpc/stream/crypto"
"v.io/x/ref/runtime/internal/rpc/stream/message"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
- "v.io/x/ref/runtime/internal/rpc/version"
+ iversion "v.io/x/ref/runtime/internal/rpc/version"
)
var (
@@ -34,18 +32,31 @@
nullCipher crypto.NullControlCipher
)
-// privateData includes secret data we need for encryption.
-type privateData struct {
- naclBoxPrivateKey crypto.BoxKey
+// AuthenticationResult includes the result of the VIF authentication.
+type AuthenticationResult struct {
+ Dialed bool
+ Version version.RPCVersion
+ RemoteEndpoint naming.Endpoint
+ SessionKeys SessionKeys
+ LocalBlessings security.Blessings
+ RemoteBlessings security.Blessings
+ LocalDischarges map[string]security.Discharge
+ RemoteDischarges map[string]security.Discharge
}
-// AuthenticateAsClient sends a Setup message if possible. If so, it chooses
+// Public/private keys used to establish an encrypted communication channel
+// (and not the keys corresponding to the principal used in authentication).
+type SessionKeys struct {
+ LocalPublic, LocalPrivate, RemotePublic crypto.BoxKey
+}
+
+// AuthenticateAsClient sends a Setup message if possible. If so, it chooses
// encryption based on the max supported version.
//
// The sequence is initiated by the client.
//
// - The client sends a Setup message to the server, containing the client's
-// supported versions, and the client's crypto options. The Setup message
+// supported versions, and the client's crypto options. The Setup message
// is sent in the clear.
//
// - When the server receives the Setup message, it calls
@@ -55,7 +66,7 @@
// - The client and server use the public/private key pairs
// generated for the Setup messages to create an encrypted stream
// of SetupStream messages for the remainder of the authentication
-// setup. The encyrption uses NewControlCipherRPC6, which is based
+// setup. The encryption uses NewControlCipherRPC11, which is based
// on code.google.com/p/go.crypto/nacl/box.
//
// - Once the encrypted SetupStream channel is setup, the client and
@@ -66,82 +77,101 @@
// modifying the acceptable version ranges downward. This can be addressed by
// including a hash of the Setup message in the encrypted stream. It is
// likely that this will be addressed in subsequent protocol versions.
-func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, versions *version.Range, params security.CallParams, auth *vc.ServerAuthorizer) (crypto.ControlCipher, error) {
+func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, localEP naming.Endpoint, versions *iversion.Range, principal security.Principal, auth *vc.ServerAuthorizer) (crypto.ControlCipher, *AuthenticationResult, error) {
if versions == nil {
- versions = version.SupportedRange
+ versions = iversion.SupportedRange
}
// Send the client's public data.
- pvt, pub, err := makeSetup(versions, params.LocalPrincipal != nil)
+ setup, pk, sk, err := makeSetup(versions, localEP)
if err != nil {
- return nil, verror.New(stream.ErrSecurity, nil, err)
+ return nil, nil, verror.New(stream.ErrSecurity, nil, err)
}
errch := make(chan error, 1)
go func() {
- errch <- message.WriteTo(writer, pub, nullCipher)
+ errch <- message.WriteTo(writer, setup, nullCipher)
}()
- pmsg, err := message.ReadFrom(reader, nullCipher)
+ remoteMsg, err := message.ReadFrom(reader, nullCipher)
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
- ppub, ok := pmsg.(*message.Setup)
+ remoteSetup, ok := remoteMsg.(*message.Setup)
if !ok {
- return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+ return nil, nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
}
// Wait for the write to succeed.
if err := <-errch; err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
// Choose the max version in the intersection.
- vrange, err := pub.Versions.Intersect(&ppub.Versions)
+ vrange, err := setup.Versions.Intersect(&remoteSetup.Versions)
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
- v := vrange.Max
- if params.LocalPrincipal == nil {
- return nullCipher, nil
+ if principal == nil {
+ return nullCipher, nil, nil
}
// Perform the authentication.
- return authenticateAsClient(writer, reader, params, auth, pvt, pub, ppub, v)
-}
+ ver := vrange.Max
+ remoteBox := remoteSetup.NaclBox()
+ if remoteBox == nil {
+ return nil, nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
+ }
+ remoteEP := remoteSetup.PeerEndpoint()
-func authenticateAsClient(writer io.Writer, reader *iobuf.Reader, params security.CallParams, auth *vc.ServerAuthorizer,
- pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
- pbox := ppub.NaclBox()
- if pbox == nil {
- return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
+ var cipher crypto.ControlCipher
+ switch {
+ case ver < version.RPCVersion11:
+ cipher = crypto.NewControlCipherRPC6(sk, &remoteBox.PublicKey, false)
+ default:
+ cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteBox.PublicKey)
}
- c := crypto.NewControlCipherRPC6(&pbox.PublicKey, &pvt.naclBoxPrivateKey, false)
- sconn := newSetupConn(writer, reader, c)
+ sconn := newSetupConn(writer, reader, cipher)
+ crypter := crypto.NewNullCrypterWithChannelBinding(cipher.ChannelBinding())
+ params := security.CallParams{LocalPrincipal: principal, LocalEndpoint: localEP}
// TODO(jyh): act upon the authentication results.
- _, _, _, err := vc.AuthenticateAsClient(sconn, crypto.NewNullCrypter(), params, auth, version)
+ lBlessings, rBlessings, rDischarges, err := vc.AuthenticateAsClient(sconn, crypter, ver, params, auth)
if err != nil {
- return nil, err
+ return nil, nil, err
}
- return c, nil
+ if ver < version.RPCVersion11 || remoteEP == nil {
+ // We do not return AuthenticationResult for old versions due to a channel binding bug.
+ return cipher, nil, nil
+ }
+
+ authr := AuthenticationResult{
+ Dialed: true,
+ Version: ver,
+ RemoteEndpoint: remoteEP,
+ SessionKeys: SessionKeys{
+ RemotePublic: remoteBox.PublicKey,
+ },
+ LocalBlessings: lBlessings,
+ RemoteBlessings: rBlessings,
+ RemoteDischarges: rDischarges,
+ }
+ return cipher, &authr, nil
}
// AuthenticateAsServer handles a Setup message, choosing authentication
// based on the max common version.
//
// See AuthenticateAsClient for a description of the negotiation.
-func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, versions *version.Range, principal security.Principal, lBlessings security.Blessings,
- dc vc.DischargeClient) (crypto.ControlCipher, error) {
- var err error
+func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, localEP naming.Endpoint, versions *iversion.Range, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient) (crypto.ControlCipher, *AuthenticationResult, error) {
if versions == nil {
- versions = version.SupportedRange
+ versions = iversion.SupportedRange
}
// Send server's public data.
- pvt, pub, err := makeSetup(versions, principal != nil)
+ setup, pk, sk, err := makeSetup(versions, localEP)
if err != nil {
- return nil, err
+ return nil, nil, err
}
errch := make(chan error, 1)
@@ -158,55 +188,88 @@
if principal == nil {
<-readch
}
- err := message.WriteTo(writer, pub, nullCipher)
- errch <- err
+ errch <- message.WriteTo(writer, setup, nullCipher)
}()
// Read client's public data.
- pmsg, err := message.ReadFrom(reader, nullCipher)
+ remoteMsg, err := message.ReadFrom(reader, nullCipher)
close(readch) // Note: we need to close this whether we get an error or not.
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
- ppub, ok := pmsg.(*message.Setup)
+ remoteSetup, ok := remoteMsg.(*message.Setup)
if !ok {
- return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+ return nil, nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
}
// Wait for the write to succeed.
if err := <-errch; err != nil {
- return nil, err
+ return nil, nil, err
}
// Choose the max version in the intersection.
- vrange, err := versions.Intersect(&ppub.Versions)
+ vrange, err := versions.Intersect(&remoteSetup.Versions)
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
- v := vrange.Max
if principal == nil {
- return nullCipher, nil
+ return nullCipher, nil, nil
}
// Perform authentication.
- return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
+ ver := vrange.Max
+ remoteBox := remoteSetup.NaclBox()
+ if remoteBox == nil {
+ return nil, nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
+ }
+ remoteEP := remoteSetup.PeerEndpoint()
+
+ var cipher crypto.ControlCipher
+ switch {
+ case ver < version.RPCVersion11:
+ cipher = crypto.NewControlCipherRPC6(sk, &remoteBox.PublicKey, true)
+ default:
+ cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteBox.PublicKey)
+ }
+ sconn := newSetupConn(writer, reader, cipher)
+ crypter := crypto.NewNullCrypterWithChannelBinding(cipher.ChannelBinding())
+ // TODO(jyh): act upon authentication results.
+ rBlessings, lDischarges, err := vc.AuthenticateAsServer(sconn, crypter, ver, principal, lBlessings, dc)
+ if err != nil {
+ return nil, nil, verror.New(errAuthFailed, nil, err)
+ }
+ if ver < version.RPCVersion11 || remoteEP == nil {
+ // We do not return AuthenticationResult for old versions due to a channel binding bug.
+ return cipher, nil, nil
+ }
+
+ authr := AuthenticationResult{
+ Version: ver,
+ RemoteEndpoint: remoteEP,
+ SessionKeys: SessionKeys{
+ LocalPublic: *pk,
+ LocalPrivate: *sk,
+ },
+ LocalBlessings: lBlessings,
+ RemoteBlessings: rBlessings,
+ LocalDischarges: lDischarges,
+ }
+ return cipher, &authr, nil
}
-func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
- pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
- box := ppub.NaclBox()
- if box == nil {
- return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
- }
- c := crypto.NewControlCipherRPC6(&box.PublicKey, &pvt.naclBoxPrivateKey, true)
- sconn := newSetupConn(writer, reader, c)
- // TODO(jyh): act upon authentication results.
- _, _, err := vc.AuthenticateAsServer(sconn, principal, lBlessings, dc, crypto.NewNullCrypter(), version)
+// makeSetup constructs the options that this process can support.
+func makeSetup(versions *iversion.Range, localEP naming.Endpoint) (setup *message.Setup, publicKey, privateKey *crypto.BoxKey, err error) {
+ publicKey, privateKey, err = crypto.GenerateBoxKey()
if err != nil {
- return nil, verror.New(errAuthFailed, nil, err)
+ return nil, nil, nil, err
}
- return c, nil
+ options := []message.SetupOption{&message.NaclBox{PublicKey: *publicKey}}
+ if localEP != nil {
+ options = append(options, &message.PeerEndpoint{LocalEndpoint: localEP})
+ }
+ setup = &message.Setup{Versions: *versions, Options: options}
+ return
}
// getDischargeClient returns the dischargeClient needed to fetch server discharges for this call.
@@ -220,26 +283,3 @@
}
return nil
}
-
-// makeSetup constructs the options that this process can support.
-func makeSetup(versions *version.Range, secure bool) (*privateData, *message.Setup, error) {
- var options []message.SetupOption
- var pvt *privateData
- if secure {
- pubKey, pvtKey, err := box.GenerateKey(rand.Reader)
- if err != nil {
- return nil, nil, err
- }
- options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
- pvt = &privateData{
- naclBoxPrivateKey: *pvtKey,
- }
- }
-
- pub := &message.Setup{
- Versions: *versions,
- Options: options,
- }
-
- return pvt, pub, nil
-}
diff --git a/runtime/internal/rpc/stream/vif/setup_conn_test.go b/runtime/internal/rpc/stream/vif/setup_conn_test.go
index 17622a7..7f96b82 100644
--- a/runtime/internal/rpc/stream/vif/setup_conn_test.go
+++ b/runtime/internal/rpc/stream/vif/setup_conn_test.go
@@ -74,6 +74,8 @@
return nil
}
+func (c *testControlCipher) ChannelBinding() []byte { return nil }
+
// shortConn performs at most 3 bytes of IO at a time.
type shortConn struct {
io.ReadWriteCloser
diff --git a/runtime/internal/rpc/stream/vif/vif.go b/runtime/internal/rpc/stream/vif/vif.go
index 124ce98..1dfd9c6 100644
--- a/runtime/internal/rpc/stream/vif/vif.go
+++ b/runtime/internal/rpc/stream/vif/vif.go
@@ -11,8 +11,10 @@
import (
"bytes"
+ "errors"
"fmt"
"net"
+ "reflect"
"sort"
"strings"
"sync"
@@ -20,6 +22,7 @@
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vtrace"
@@ -96,7 +99,10 @@
acceptor *upcqueue.T // GUARDED_BY(muListen)
listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen)
principal security.Principal
- blessings security.Blessings
+ // TODO(jhahn): Merge this blessing with the one in authResult once
+ // we fixed to pass blessings to StartAccepting().
+ blessings security.Blessings
+ authResult *AuthenticationResult
muNextVCI sync.Mutex
nextVCI id.VC
@@ -120,10 +126,8 @@
isClosed bool // GUARDED_BY(isClosedMu)
onClose func(*VIF)
- // These counters track the number of messages sent and received by
- // this VIF.
- muMsgCounters sync.Mutex
- msgCounters map[string]int64
+ muStats sync.Mutex
+ stats Stats
}
// ConnectorAndFlow represents a Flow and the Connector that can be used to
@@ -133,6 +137,16 @@
Flow stream.Flow
}
+// Stats holds stats for a VIF.
+type Stats struct {
+ SendMsgCounter map[reflect.Type]uint64
+ RecvMsgCounter map[reflect.Type]uint64
+
+ NumDialedVCs uint
+ NumAcceptedVCs uint
+ NumPreAuthenticated uint
+}
+
// Separate out constants that are not exported so that godoc looks nicer for
// the exported ones.
const (
@@ -151,6 +165,13 @@
sharedFlowID = vc.SharedFlowID
)
+type vifSide bool
+
+const (
+ dialedVIF vifSide = true
+ acceptedVIF vifSide = false
+)
+
// InternalNewDialedVIF creates a new virtual interface over the provided
// network connection, under the assumption that the conn object was created
// using net.Dial. If onClose is given, it is run in its own goroutine when
@@ -169,18 +190,17 @@
}
pool := iobuf.NewPool(0)
reader := iobuf.NewReader(pool, conn)
- params := security.CallParams{LocalPrincipal: principal, LocalEndpoint: localEP(conn, rid, versions)}
+ localEP := localEndpoint(conn, rid, versions)
// TODO(ataly, ashankar, suharshs): Figure out what authorization policy to use
// for authenticating the server during VIF establishment. Note that we cannot
// use the VC.ServerAuthorizer available in 'opts' as that applies to the end
// server and not the remote endpoint of the VIF.
- c, err := AuthenticateAsClient(conn, reader, versions, params, nil)
+ c, authr, err := AuthenticateAsClient(conn, reader, localEP, versions, principal, nil)
if err != nil {
return nil, verror.New(stream.ErrNetwork, ctx, err)
}
var blessings security.Blessings
-
if principal != nil {
blessings = principal.BlessingStore().Default()
}
@@ -191,7 +211,7 @@
startTimeout = v.Duration
}
}
- return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c)
+ return internalNew(conn, pool, reader, localEP, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c, authr)
}
// InternalNewAcceptedVIF creates a new virtual interface over the provided
@@ -208,10 +228,10 @@
func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
pool := iobuf.NewPool(0)
reader := iobuf.NewReader(pool, conn)
-
+ localEP := localEndpoint(conn, rid, versions)
dischargeClient := getDischargeClient(lopts)
- c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
+ c, authr, err := AuthenticateAsServer(conn, reader, localEP, versions, principal, blessings, dischargeClient)
if err != nil {
return nil, err
}
@@ -223,10 +243,10 @@
startTimeout = v.Duration
}
}
- return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
+ return internalNew(conn, pool, reader, localEP, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c, authr)
}
-func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
+func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, localEP naming.Endpoint, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher, authr *AuthenticationResult) (*VIF, error) {
var (
// Choose IDs that will not conflict with any other (VC, Flow)
// pairs. VCI 0 is never used by the application (it is
@@ -264,12 +284,14 @@
conn: conn,
pool: pool,
reader: reader,
+ localEP: localEP,
ctrlCipher: c,
vcMap: newVCMap(),
acceptor: acceptor,
listenerOpts: listenerOpts,
principal: principal,
- localEP: localEP(conn, rid, versions),
+ blessings: blessings,
+ authResult: authr,
nextVCI: initialVCI,
outgoing: outgoing,
expressQ: expressQ,
@@ -278,8 +300,7 @@
stopQ: stopQ,
versions: versions,
onClose: onClose,
- msgCounters: make(map[string]int64),
- blessings: blessings,
+ stats: Stats{SendMsgCounter: make(map[reflect.Type]uint64), RecvMsgCounter: make(map[reflect.Type]uint64)},
}
if startTimeout > 0 {
vif.startTimer = newTimer(startTimeout, vif.Close)
@@ -312,35 +333,196 @@
counters := message.NewCounters()
counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow)
- sendPublicKey := func(pubKey *crypto.BoxKey) error {
- var options []message.SetupOption
- if pubKey != nil {
- options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
+ usePreauth := vif.useVIFAuthForVC(vif.versions.Max, vif.localEP, remoteEP, dialedVIF) &&
+ reflect.DeepEqual(principal.PublicKey(), vif.principal.PublicKey())
+ switch {
+ case usePreauth:
+ preauth := vif.authResult
+ params := security.CallParams{
+ LocalPrincipal: principal,
+ LocalEndpoint: vif.localEP,
+ RemoteEndpoint: preauth.RemoteEndpoint,
+ LocalBlessings: preauth.LocalBlessings,
+ RemoteBlessings: preauth.RemoteBlessings,
+ RemoteDischarges: preauth.RemoteDischarges,
}
- err := vif.sendOnExpressQ(&message.SetupVC{
- VCI: vc.VCI(),
- RemoteEndpoint: remoteEP,
- LocalEndpoint: vif.localEP,
- Counters: counters,
- Setup: message.Setup{
- Versions: *vif.versions,
- Options: options,
- },
- })
- if err != nil {
- err = verror.New(stream.ErrNetwork, nil,
- verror.New(errSendOnExpressQFailed, nil, err))
+ sendSetupVC := func(pubKey *crypto.BoxKey, sigPreauth []byte) error {
+ err := vif.sendOnExpressQ(&message.SetupVC{
+ VCI: vc.VCI(),
+ RemoteEndpoint: remoteEP,
+ LocalEndpoint: vif.localEP,
+ Counters: counters,
+ Setup: message.Setup{
+ Versions: iversion.Range{Min: preauth.Version, Max: preauth.Version},
+ Options: []message.SetupOption{
+ &message.NaclBox{PublicKey: *pubKey},
+ &message.UseVIFAuthentication{sigPreauth},
+ },
+ },
+ })
+ if err != nil {
+ return verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ }
+ return nil
}
- return err
+ err = vc.HandshakeDialedVCPreAuthenticated(preauth.Version, params, &preauth.SessionKeys.RemotePublic, sendSetupVC, opts...)
+ case principal == nil:
+ sendSetupVC := func() error {
+ err := vif.sendOnExpressQ(&message.SetupVC{
+ VCI: vc.VCI(),
+ RemoteEndpoint: remoteEP,
+ LocalEndpoint: vif.localEP,
+ Counters: counters,
+ Setup: message.Setup{Versions: *vif.versions},
+ })
+ if err != nil {
+ return verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ }
+ return nil
+ }
+ err = vc.HandshakeDialedVCNoAuthentication(sendSetupVC, opts...)
+ default:
+ sendSetupVC := func(pubKey *crypto.BoxKey) error {
+ err := vif.sendOnExpressQ(&message.SetupVC{
+ VCI: vc.VCI(),
+ RemoteEndpoint: remoteEP,
+ LocalEndpoint: vif.localEP,
+ Counters: counters,
+ Setup: message.Setup{
+ Versions: *vif.versions,
+ Options: []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}},
+ },
+ })
+ if err != nil {
+ return verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ }
+ return nil
+ }
+ err = vc.HandshakeDialedVCWithAuthentication(principal, sendSetupVC, opts...)
}
- if err = vc.HandshakeDialedVC(principal, sendPublicKey, opts...); err != nil {
+ if err != nil {
vif.deleteVC(vc.VCI())
vc.Close(err)
return nil, err
}
+
+ vif.muStats.Lock()
+ vif.stats.NumDialedVCs++
+ if usePreauth {
+ vif.stats.NumPreAuthenticated++
+ }
+ vif.muStats.Unlock()
return vc, nil
}
+func (vif *VIF) acceptVC(m *message.SetupVC) error {
+ vrange, err := vif.versions.Intersect(&m.Setup.Versions)
+ if err != nil {
+ vlog.VI(2).Infof("SetupVC message %+v to VIF %s did not present compatible versions: %v", m, vif, err)
+ return err
+ }
+ vif.muListen.Lock()
+ closed := vif.acceptor == nil || vif.acceptor.IsClosed()
+ lopts := vif.listenerOpts
+ vif.muListen.Unlock()
+ if closed {
+ vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif)
+ return errors.New("VCs not accepted")
+ }
+ var idleTimeout time.Duration
+ for _, o := range lopts {
+ switch v := o.(type) {
+ case vc.IdleTimeout:
+ idleTimeout = v.Duration
+ }
+ }
+ vcobj, err := vif.newVC(m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false)
+ if err != nil {
+ return err
+ }
+ vif.distributeCounters(m.Counters)
+
+ var remotePK *crypto.BoxKey
+ if box := m.Setup.NaclBox(); box != nil {
+ remotePK = &box.PublicKey
+ }
+ sigPreauth := m.Setup.UseVIFAuthentication()
+ var hrCH <-chan vc.HandshakeResult
+ switch {
+ case len(sigPreauth) > 0:
+ if !vif.useVIFAuthForVC(vrange.Max, m.RemoteEndpoint, m.LocalEndpoint, acceptedVIF) {
+ vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not allow re-using VIF authentication for this VC", m, vif)
+ return errors.New("VCs not accepted: cannot re-use VIF authentication for this VC")
+ }
+ preauth := vif.authResult
+ params := security.CallParams{
+ LocalPrincipal: vif.principal,
+ LocalBlessings: vif.blessings,
+ RemoteBlessings: preauth.RemoteBlessings,
+ LocalDischarges: preauth.LocalDischarges,
+ }
+ hrCH = vcobj.HandshakeAcceptedVCPreAuthenticated(preauth.Version, params, sigPreauth, &preauth.SessionKeys.LocalPublic, &preauth.SessionKeys.LocalPrivate, remotePK, lopts...)
+ case vif.principal == nil:
+ sendSetupVC := func() error {
+ err = vif.sendOnExpressQ(&message.SetupVC{
+ VCI: m.VCI,
+ Setup: message.Setup{Versions: *vrange},
+ RemoteEndpoint: m.LocalEndpoint,
+ LocalEndpoint: vif.localEP,
+ })
+ return err
+ }
+ hrCH = vcobj.HandshakeAcceptedVCNoAuthentication(vrange.Max, sendSetupVC, lopts...)
+ default:
+ exchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
+ err = vif.sendOnExpressQ(&message.SetupVC{
+ VCI: m.VCI,
+ Setup: message.Setup{
+ // Note that servers send clients not their actual supported versions,
+ // but the intersected range of the server and client ranges. This
+ // is important because proxies may have adjusted the version ranges
+ // along the way, and we should negotiate a version that is compatible
+ // with all intermediate hops.
+ Versions: *vrange,
+ Options: []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}},
+ },
+ RemoteEndpoint: m.LocalEndpoint,
+ LocalEndpoint: vif.localEP,
+ // TODO(mattr): Consider adding counters. See associated comment in
+ // vc.initHandshakeAcceptedVC for more details. Note that we need to send
+ // AddReceiveBuffers message when reusing VIF authentication since servers
+ // doesn't send a reply for SetupVC.
+ })
+ return remotePK, err
+ }
+ hrCH = vcobj.HandshakeAcceptedVCWithAuthentication(vrange.Max, vif.principal, vif.blessings, exchanger, lopts...)
+ }
+ go vif.acceptFlowsLoop(vcobj, hrCH)
+
+ vif.muStats.Lock()
+ vif.stats.NumAcceptedVCs++
+ if len(sigPreauth) > 0 {
+ vif.stats.NumPreAuthenticated++
+ }
+ vif.muStats.Unlock()
+ return nil
+}
+
+func (vif *VIF) useVIFAuthForVC(ver version.RPCVersion, localEP, remoteEP naming.Endpoint, side vifSide) bool {
+ dialed := side == dialedVIF
+ if vif.authResult == nil || vif.authResult.Dialed != dialed || vif.authResult.Version != ver {
+ return false
+ }
+ // We allow to use the VIF authentication when the routing ID is null, since it
+ // means that this VIF is connected to the peer directly with a hostname and port.
+ if dialed {
+ return naming.Compare(vif.authResult.RemoteEndpoint.RoutingID(), remoteEP.RoutingID()) ||
+ naming.Compare(remoteEP.RoutingID(), naming.NullRoutingID)
+ }
+ return naming.Compare(vif.authResult.RemoteEndpoint.RoutingID(), remoteEP.RoutingID()) &&
+ (naming.Compare(vif.localEP.RoutingID(), localEP.RoutingID()) || naming.Compare(localEP.RoutingID(), naming.NullRoutingID))
+}
+
// Close closes all VCs (and thereby Flows) over the VIF and then closes the
// underlying network connection after draining all pending writes on those
// VCs.
@@ -465,9 +647,10 @@
// handleMessage handles a single incoming message. Any error returned is
// fatal, causing the VIF to close.
func (vif *VIF) handleMessage(msg message.T) error {
- vif.muMsgCounters.Lock()
- vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++
- vif.muMsgCounters.Unlock()
+ mtype := reflect.TypeOf(msg)
+ vif.muStats.Lock()
+ vif.stats.RecvMsgCounter[mtype]++
+ vif.muStats.Unlock()
switch m := msg.(type) {
@@ -483,107 +666,6 @@
m.Release()
}
- case *message.SetupVC:
- // First, find the public key we need out of the message.
- var theirPK *crypto.BoxKey
- box := m.Setup.NaclBox()
- if box != nil {
- theirPK = &box.PublicKey
- }
-
- // If we dialed this VC, then this is a response and we should finish
- // the vc handshake. Otherwise, this message is opening a new VC.
- if vif.dialedVCI(m.VCI) {
- vif.distributeCounters(m.Counters)
- if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
- intersection, err := vif.versions.Intersect(&m.Setup.Versions)
- if err != nil {
- vif.closeVCAndSendMsg(vc, false, err)
- } else if err := vc.FinishHandshakeDialedVC(intersection.Max, theirPK); err != nil {
- vif.closeVCAndSendMsg(vc, false, err)
- }
- return nil
- }
- vlog.VI(2).Infof("Ignoring SetupVC message %+v for unknown dialed VC", m)
- return nil
- }
-
- // This is an accepted VC.
- intersection, err := vif.versions.Intersect(&m.Setup.Versions)
- if err != nil {
- vlog.VI(2).Infof("SetupVC message %+v to VIF %s did not present compatible versions: %v", m, vif, err)
- vif.sendOnExpressQ(&message.CloseVC{
- VCI: m.VCI,
- Error: err.Error(),
- })
- return nil
- }
- vif.muListen.Lock()
- closed := vif.acceptor == nil || vif.acceptor.IsClosed()
- lopts := vif.listenerOpts
- vif.muListen.Unlock()
- if closed {
- vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif)
- vif.sendOnExpressQ(&message.CloseVC{
- VCI: m.VCI,
- Error: "VCs not accepted",
- })
- return nil
- }
- var idleTimeout time.Duration
- for _, o := range lopts {
- switch v := o.(type) {
- case vc.IdleTimeout:
- idleTimeout = v.Duration
- }
- }
- vc, err := vif.newVC(m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false)
- if err != nil {
- vif.sendOnExpressQ(&message.CloseVC{
- VCI: m.VCI,
- Error: err.Error(),
- })
- return nil
- }
- vif.distributeCounters(m.Counters)
- keyExchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
- var options []message.SetupOption
- if pubKey != nil {
- options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
- }
- err = vif.sendOnExpressQ(&message.SetupVC{
- VCI: m.VCI,
- Setup: message.Setup{
- // Note that servers send clients not their actual supported versions,
- // but the intersected range of the server and client ranges. This
- // is important because proxies may have adjusted the version ranges
- // along the way, and we should negotiate a version that is compatible
- // with all intermediate hops.
- Versions: *intersection,
- Options: options,
- },
- RemoteEndpoint: m.LocalEndpoint,
- LocalEndpoint: vif.localEP,
- // TODO(mattr): Consider adding counters. See associated comment
- // in vc.go:VC.HandshakeAcceptedVC for more details.
- })
- return theirPK, err
- }
- go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(intersection.Max, vif.principal, vif.blessings, keyExchanger, lopts...))
-
- case *message.CloseVC:
- if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
- vif.deleteVC(vc.VCI())
- vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
- // TODO(cnicolaou): it would be nice to have a method on VC
- // to indicate a 'remote close' rather than a 'local one'. This helps
- // with error reporting since we expect reads/writes to occur
- // after a remote close, but not after a local close.
- vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
- return nil
- }
- vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
-
case *message.AddReceiveBuffers:
vif.distributeCounters(m.Counters)
@@ -601,6 +683,49 @@
}
vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
+ case *message.SetupVC:
+ // If we dialed this VC, then this is a response and we should finish
+ // the vc handshake. Otherwise, this message is opening a new VC.
+ if vif.dialedVCI(m.VCI) {
+ vif.distributeCounters(m.Counters)
+ vc, _, _ := vif.vcMap.Find(m.VCI)
+ if vc == nil {
+ vlog.VI(2).Infof("Ignoring SetupVC message %+v for unknown dialed VC", m)
+ return nil
+ }
+ vrange, err := vif.versions.Intersect(&m.Setup.Versions)
+ if err != nil {
+ vif.closeVCAndSendMsg(vc, false, err)
+ return nil
+ }
+ var remotePK *crypto.BoxKey
+ if box := m.Setup.NaclBox(); box != nil {
+ remotePK = &box.PublicKey
+ }
+ if err := vc.FinishHandshakeDialedVC(vrange.Max, remotePK); err != nil {
+ vif.closeVCAndSendMsg(vc, false, err)
+ }
+ return nil
+ }
+ // This is an accepted VC.
+ if err := vif.acceptVC(m); err != nil {
+ vif.sendOnExpressQ(&message.CloseVC{VCI: m.VCI, Error: err.Error()})
+ }
+ return nil
+
+ case *message.CloseVC:
+ if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
+ vif.deleteVC(vc.VCI())
+ vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
+ // TODO(cnicolaou): it would be nice to have a method on VC
+ // to indicate a 'remote close' rather than a 'local one'. This helps
+ // with error reporting since we expect reads/writes to occur
+ // after a remote close, but not after a local close.
+ vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
+ return nil
+ }
+ vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
+
case *message.Setup:
vlog.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif)
@@ -698,9 +823,10 @@
vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err)
return
}
- vif.muMsgCounters.Lock()
- vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++
- vif.muMsgCounters.Unlock()
+ wtype := reflect.TypeOf(writer)
+ vif.muStats.Lock()
+ vif.stats.SendMsgCounter[wtype]++
+ vif.muStats.Unlock()
switch writer {
case vif.expressQ:
for _, b := range bufs {
@@ -872,7 +998,10 @@
}
func (vif *VIF) dialedVCI(VCI id.VC) bool {
- return vif.nextVCI%2 == VCI%2
+ vif.muNextVCI.Lock()
+ dialed := vif.nextVCI%2 == VCI%2
+ vif.muNextVCI.Unlock()
+ return dialed
}
func (vif *VIF) allocVCI() id.VC {
@@ -883,21 +1012,20 @@
return ret
}
-func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, dialed bool) (*vc.VC, error) {
+func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, side vifSide) (*vc.VC, error) {
vif.muStartTimer.Lock()
if vif.startTimer != nil {
vif.startTimer.Stop()
vif.startTimer = nil
}
vif.muStartTimer.Unlock()
- macSize := vif.ctrlCipher.MACSize()
vc := vc.InternalNew(vc.Params{
VCI: vci,
- Dialed: dialed,
+ Dialed: side == dialedVIF,
LocalEP: localEP,
RemoteEP: remoteEP,
Pool: vif.pool,
- ReserveBytes: uint(message.HeaderSizeBytes + macSize),
+ ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()),
Helper: vcHelper{vif},
})
added, rq, wq := vif.vcMap.Insert(vc)
@@ -980,30 +1108,55 @@
// NumVCs returns the number of VCs established over this VIF.
func (vif *VIF) NumVCs() int { return vif.vcMap.Size() }
+// Stats returns the current stats of this VIF.
+func (vif *VIF) Stats() Stats {
+ stats := Stats{SendMsgCounter: make(map[reflect.Type]uint64), RecvMsgCounter: make(map[reflect.Type]uint64)}
+ vif.muStats.Lock()
+ for k, v := range vif.stats.SendMsgCounter {
+ stats.SendMsgCounter[k] = v
+ }
+ for k, v := range vif.stats.RecvMsgCounter {
+ stats.RecvMsgCounter[k] = v
+ }
+ stats.NumDialedVCs = vif.stats.NumDialedVCs
+ stats.NumAcceptedVCs = vif.stats.NumAcceptedVCs
+ stats.NumPreAuthenticated = vif.stats.NumPreAuthenticated
+ vif.muStats.Unlock()
+ return stats
+}
+
// DebugString returns a descriptive state of the VIF.
//
// The returned string is meant for consumptions by humans. The specific format
// should not be relied upon by any automated processing.
func (vif *VIF) DebugString() string {
- vcs := vif.vcMap.List()
- l := make([]string, 0, len(vcs)+1)
-
- vif.muNextVCI.Lock() // Needed for vif.nextVCI
- l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.ctrlCipher != nullCipher, vif.isClosed))
+ vif.muNextVCI.Lock()
+ nextVCI := vif.nextVCI
vif.muNextVCI.Unlock()
+ vif.isClosedMu.Lock()
+ isClosed := vif.isClosed
+ vif.isClosedMu.Unlock()
+ vcs := vif.vcMap.List()
+ stats := vif.Stats()
+
+ l := make([]string, 0, 2+len(vcs)+len(stats.SendMsgCounter)+len(stats.RecvMsgCounter))
+ l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%t IsClosed:%t #Dialed:%d #Accepted:%d #PreAuthenticated:%d",
+ vif, len(vcs), nextVCI, vif.ctrlCipher != nullCipher, isClosed, stats.NumDialedVCs, stats.NumAcceptedVCs, stats.NumPreAuthenticated))
+
+ l = append(l, "Message Counters:")
+ msgStats := make([]string, 0, len(stats.SendMsgCounter)+len(stats.RecvMsgCounter))
+ for k, v := range stats.SendMsgCounter {
+ msgStats = append(msgStats, fmt.Sprintf(" %-32s %10d", "Send("+k.String()+")", v))
+ }
+ for k, v := range stats.RecvMsgCounter {
+ msgStats = append(msgStats, fmt.Sprintf(" %-32s %10d", "Recv("+k.String()+")", v))
+ }
+ sort.Strings(msgStats)
+ l = append(l, msgStats...)
for _, vc := range vcs {
l = append(l, vc.DebugString())
}
-
- l = append(l, "Message Counters:")
- ctrs := len(l)
- vif.muMsgCounters.Lock()
- for k, v := range vif.msgCounters {
- l = append(l, fmt.Sprintf(" %-32s %10d", k, v))
- }
- vif.muMsgCounters.Unlock()
- sort.Strings(l[ctrs:])
return strings.Join(l, "\n")
}
@@ -1055,7 +1208,7 @@
}
}
-// localEP creates a naming.Endpoint from the provided parameters.
+// localEndpoint creates a naming.Endpoint from the provided parameters.
//
// It intentionally does not include any blessings (present in endpoints in the
// v5 format). At this point it is not clear whether the endpoint is being
@@ -1070,7 +1223,7 @@
// The addition of the endpoints is left as an excercise to higher layers of
// the stack, where the desire to share or hide blessings from the endpoint is
// clearer.
-func localEP(conn net.Conn, rid naming.RoutingID, versions *iversion.Range) naming.Endpoint {
+func localEndpoint(conn net.Conn, rid naming.RoutingID, versions *iversion.Range) naming.Endpoint {
localAddr := conn.LocalAddr()
ep := &inaming.Endpoint{
Protocol: localAddr.Network(),
diff --git a/runtime/internal/rpc/stream/vif/vif_test.go b/runtime/internal/rpc/stream/vif/vif_test.go
index fd820ff..1a6aa24 100644
--- a/runtime/internal/rpc/stream/vif/vif_test.go
+++ b/runtime/internal/rpc/stream/vif/vif_test.go
@@ -22,6 +22,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/security"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
@@ -34,10 +35,12 @@
//go:generate v23 test generate
func TestSingleFlowCreatedAtClient(t *testing.T) {
- client, server := NewClientServer()
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
defer client.Close()
- clientVC, _, err := createVC(client, server, makeEP(0x5))
+ clientVC, _, err := createVC(client, server, pclient, makeEP(0x5))
if err != nil {
t.Fatal(err)
}
@@ -56,10 +59,12 @@
}
func TestSingleFlowCreatedAtServer(t *testing.T) {
- client, server := NewClientServer()
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
defer client.Close()
- clientVC, serverConnector, err := createVC(client, server, makeEP(0x5))
+ clientVC, serverConnector, err := createVC(client, server, pclient, makeEP(0x5))
if err != nil {
t.Fatal(err)
}
@@ -100,13 +105,16 @@
mp := runtime.GOMAXPROCS(gomaxprocs)
defer runtime.GOMAXPROCS(mp)
- client, server := NewClientServer()
+
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
defer client.Close()
// Create all the VCs
// clientVCs[i] is the VC at the client process
// serverConnectors[i] is the corresponding VC at the server process.
- clientVCs, serverConnectors, err := createNVCs(client, server, 0, nVCs)
+ clientVCs, serverConnectors, err := createNVCs(client, server, pclient, 0, nVCs)
if err != nil {
t.Fatal(err)
}
@@ -232,8 +240,10 @@
}
func TestClose(t *testing.T) {
- client, server := NewClientServer()
- vc, _, err := createVC(client, server, makeEP(0x5))
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
+ vc, _, err := createVC(client, server, pclient, makeEP(0x5))
if err != nil {
t.Fatal(err)
}
@@ -265,12 +275,14 @@
}
func TestOnClose(t *testing.T) {
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
notifyC, notifyS := make(chan *vif.VIF), make(chan *vif.VIF)
notifyFuncC := func(vf *vif.VIF) { notifyC <- vf }
notifyFuncS := func(vf *vif.VIF) { notifyS <- vf }
// Close the client VIF. Both client and server should be notified.
- client, server, err := New(nil, nil, notifyFuncC, notifyFuncS, nil, nil)
+ client, server, err := New(nil, nil, pclient, pserver, notifyFuncC, notifyFuncS, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -283,7 +295,7 @@
}
// Same as above, but close the server VIF at this time.
- client, server, err = New(nil, nil, notifyFuncC, notifyFuncS, nil, nil)
+ client, server, err = New(nil, nil, pclient, pserver, notifyFuncC, notifyFuncS, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -301,12 +313,14 @@
waitTime = 5 * time.Millisecond
)
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
notify := make(chan interface{})
notifyFunc := func(vf *vif.VIF) { notify <- vf }
newVIF := func() (vf, remote *vif.VIF) {
var err error
- vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, nil, nil)
+ vf, remote, err = New(nil, nil, pclient, pserver, notifyFunc, notifyFunc, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -327,7 +341,7 @@
// Open one VC. Should not be closed.
vf, remote = newVIF()
- if _, _, err := createVC(vf, remote, makeEP(0x10)); err != nil {
+ if _, _, err := createVC(vf, remote, pclient, makeEP(0x10)); err != nil {
t.Fatal(err)
}
if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
@@ -342,7 +356,7 @@
// Same as above, but open a VC from the remote side.
vf, remote = newVIF()
- _, _, err := createVC(remote, vf, makeEP(0x10))
+ _, _, err := createVC(remote, vf, pclient, makeEP(0x10))
if err != nil {
t.Fatal(err)
}
@@ -356,7 +370,7 @@
// Create two VCs.
vf, remote = newVIF()
- if _, _, err := createNVCs(vf, remote, 0x10, 2); err != nil {
+ if _, _, err := createNVCs(vf, remote, pclient, 0x10, 2); err != nil {
t.Fatal(err)
}
@@ -385,6 +399,8 @@
waitTime = 150 * time.Millisecond
)
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
notify := make(chan interface{})
notifyFunc := func(vf *vif.VIF) { notify <- vf }
@@ -395,7 +411,7 @@
vfStartTime, remoteStartTime = remoteStartTime, vfStartTime
}
var err error
- vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, []stream.VCOpt{vc.StartTimeout{vfStartTime}}, []stream.ListenerOpt{vc.StartTimeout{remoteStartTime}})
+ vf, remote, err = New(nil, nil, pclient, pserver, notifyFunc, notifyFunc, []stream.VCOpt{vc.StartTimeout{vfStartTime}}, []stream.ListenerOpt{vc.StartTimeout{remoteStartTime}})
if err != nil {
t.Fatal(err)
}
@@ -417,7 +433,7 @@
// Open one VC. Should not be closed.
vf, remote, triggerTimers = newVIF()
- if _, _, err := createVC(vf, remote, makeEP(0x10)); err != nil {
+ if _, _, err := createVC(vf, remote, pclient, makeEP(0x10)); err != nil {
t.Fatal(err)
}
triggerTimers()
@@ -441,12 +457,14 @@
waitTime = idleTime * 2
)
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
notify := make(chan interface{})
notifyFunc := func(vf *vif.VIF) { notify <- vf }
newVIF := func() (vf, remote *vif.VIF) {
var err error
- if vf, remote, err = New(nil, nil, notifyFunc, notifyFunc, nil, nil); err != nil {
+ if vf, remote, err = New(nil, nil, pclient, pserver, notifyFunc, notifyFunc, nil, nil); err != nil {
t.Fatal(err)
}
if err = vf.StartAccepting(); err != nil {
@@ -461,7 +479,7 @@
triggerTimers := vif.SetFakeTimers()
defer triggerTimers()
var err error
- VC, remoteVC, err = createVC(vf, remote, makeEP(0x10), vc.IdleTimeout{idleTime})
+ VC, remoteVC, err = createVC(vf, remote, pclient, makeEP(0x10), vc.IdleTimeout{idleTime})
if err != nil {
t.Fatal(err)
}
@@ -489,7 +507,7 @@
// Same as above, but with multiple VCs.
vf, remote = newVIF()
triggerTimers := vif.SetFakeTimers()
- if _, _, err := createNVCs(vf, remote, 0x10, 5, vc.IdleTimeout{idleTime}); err != nil {
+ if _, _, err := createNVCs(vf, remote, pclient, 0x10, 5, vc.IdleTimeout{idleTime}); err != nil {
t.Fatal(err)
}
triggerTimers()
@@ -551,7 +569,9 @@
func TestIdleTimeoutServer(t *testing.T) { testIdleTimeout(t, true) }
func TestShutdownVCs(t *testing.T) {
- client, server := NewClientServer()
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
defer server.Close()
defer client.Close()
@@ -563,19 +583,19 @@
return nil
}
- if _, _, err := createVC(client, server, makeEP(0x5)); err != nil {
+ if _, _, err := createVC(client, server, pclient, makeEP(0x5)); err != nil {
t.Fatal(err)
}
if err := testN(1); err != nil {
t.Error(err)
}
- if _, _, err := createVC(client, server, makeEP(0x5)); err != nil {
+ if _, _, err := createVC(client, server, pclient, makeEP(0x5)); err != nil {
t.Fatal(err)
}
if err := testN(2); err != nil {
t.Error(err)
}
- if _, _, err := createVC(client, server, makeEP(0x7)); err != nil {
+ if _, _, err := createVC(client, server, pclient, makeEP(0x7)); err != nil {
t.Fatal(err)
}
if err := testN(3); err != nil {
@@ -612,7 +632,9 @@
}
func (tc *versionTestCase) Run(t *testing.T) {
- client, server, err := NewVersionedClientServer(tc.client, tc.server)
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server, err := NewVersionedClientServer(tc.client, tc.server, pclient, pserver)
if (err != nil) != tc.expectVIFError {
t.Errorf("Error mismatch. Wanted error: %v, got %v; client: %v, server: %v", tc.expectVIFError, err, tc.client, tc.server)
}
@@ -626,7 +648,7 @@
Address: "addr",
RID: naming.FixedRoutingID(0x5),
}
- clientVC, _, err := createVC(client, server, ep)
+ clientVC, _, err := createVC(client, server, pclient, ep)
if (err != nil) != tc.expectError {
t.Errorf("Error mismatch. Wanted error: %v, got %v (client:%v, server:%v ep:%v)", tc.expectError, err, tc.client, tc.server, tc.ep)
@@ -666,25 +688,27 @@
}
func TestNetworkFailure(t *testing.T) {
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
c1, c2 := pipe()
result := make(chan *vif.VIF)
- pclient := testutil.NewPrincipal("client")
+ closed := make(chan struct{})
go func() {
- client, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), pclient, nil, nil)
+ client, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), pclient, nil, func(vf *vif.VIF) { close(closed) })
if err != nil {
t.Fatal(err)
}
result <- client
}()
- pserver := testutil.NewPrincipal("server")
- blessings := pserver.BlessingStore().Default()
- server, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, blessings, nil, nil)
+ server, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, pserver.BlessingStore().Default(), nil, nil)
if err != nil {
t.Fatal(err)
}
client := <-result
// If the network connection dies, Dial and Accept should fail.
c1.Close()
+ // Wait until the VIF is closed, since Dial() may run before the underlying VC is closed.
+ <-closed
if _, err := client.Dial(makeEP(0x5), pclient); err == nil {
t.Errorf("Expected client.Dial to fail")
}
@@ -693,6 +717,67 @@
}
}
+func TestPreAuthentication(t *testing.T) {
+ pclient := testutil.NewPrincipal("client")
+ pserver := testutil.NewPrincipal("server")
+ client, server := NewClientServer(pclient, pserver)
+ defer client.Close()
+
+ check := func(numVCs, numPreAuth uint) error {
+ stats := client.Stats()
+ if stats.NumDialedVCs != numVCs {
+ return fmt.Errorf("Unexpected NumDialedVCs in client; got %d want %d", stats.NumDialedVCs, numVCs)
+ }
+ if stats.NumPreAuthenticated != numPreAuth {
+ return fmt.Errorf("Unexpected NumPreAuthenticated in client; got %d want %d", stats.NumPreAuthenticated, numPreAuth)
+ }
+ stats = server.Stats()
+ if stats.NumAcceptedVCs != numVCs {
+ return fmt.Errorf("Unexpected NumAcceptedVCs in server; got %d want %d", stats.NumAcceptedVCs, numVCs)
+ }
+ if stats.NumPreAuthenticated != numPreAuth {
+ return fmt.Errorf("Unexpected PreAuthUsed in server; got %d want %d", stats.NumPreAuthenticated, numPreAuth)
+ }
+ return nil
+ }
+
+ // Use a different routing ID. Should not use pre-auth.
+ _, _, err := createVC(client, server, pclient, makeEP(0x55))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := check(1, 0); err != nil {
+ t.Error(err)
+ }
+
+ // Use the same routing ID. Should use pre-auth.
+ _, _, err = createVC(client, server, pclient, makeEP(0x5))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := check(2, 1); err != nil {
+ t.Error(err)
+ }
+
+ // Use the null routing ID. Should use VIF pre-auth.
+ _, _, err = createVC(client, server, pclient, makeEP(0x0))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := check(3, 2); err != nil {
+ t.Error(err)
+ }
+
+ // Use a different principal. Should not use pre-auth.
+ _, _, err = createVC(client, server, testutil.NewPrincipal("client2"), makeEP(0x5))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := check(4, 2); err != nil {
+ t.Error(err)
+ }
+}
+
func makeEP(rid uint64) naming.Endpoint {
return &inaming.Endpoint{
Protocol: "test",
@@ -771,25 +856,25 @@
return p1, p2
}
-func NewClientServer() (client, server *vif.VIF) {
+func NewClientServer(pclient, pserver security.Principal) (client, server *vif.VIF) {
var err error
- client, server, err = New(nil, nil, nil, nil, nil, nil)
+ client, server, err = New(nil, nil, pclient, pserver, nil, nil, nil, nil)
if err != nil {
panic(err)
}
return
}
-func NewVersionedClientServer(clientVersions, serverVersions *iversion.Range) (client, server *vif.VIF, verr error) {
- return New(clientVersions, serverVersions, nil, nil, nil, nil)
+func NewVersionedClientServer(clientVersions, serverVersions *iversion.Range, pclient, pserver security.Principal) (client, server *vif.VIF, verr error) {
+ return New(clientVersions, serverVersions, pclient, pserver, nil, nil, nil, nil)
}
-func New(clientVersions, serverVersions *iversion.Range, clientOnClose, serverOnClose func(*vif.VIF), opts []stream.VCOpt, lopts []stream.ListenerOpt) (client, server *vif.VIF, verr error) {
+func New(clientVersions, serverVersions *iversion.Range, pclient, pserver security.Principal, clientOnClose, serverOnClose func(*vif.VIF), opts []stream.VCOpt, lopts []stream.ListenerOpt) (client, server *vif.VIF, verr error) {
c1, c2 := pipe()
var cerr error
cl := make(chan *vif.VIF)
go func() {
- c, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), testutil.NewPrincipal("client"), clientVersions, clientOnClose, opts...)
+ c, err := vif.InternalNewDialedVIF(c1, naming.FixedRoutingID(0xc), pclient, clientVersions, clientOnClose, opts...)
if err != nil {
cerr = err
close(cl)
@@ -797,9 +882,7 @@
cl <- c
}
}()
- pserver := testutil.NewPrincipal("server")
- bserver := pserver.BlessingStore().Default()
- s, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, bserver, serverVersions, serverOnClose, lopts...)
+ s, err := vif.InternalNewAcceptedVIF(c2, naming.FixedRoutingID(0x5), pserver, pserver.BlessingStore().Default(), serverVersions, serverOnClose, lopts...)
c, ok := <-cl
if err != nil {
verr = err
@@ -843,12 +926,12 @@
// createVC creates a VC by dialing from the client process to the server
// process. It returns the VC at the client and the Connector at the server
// (which the server can use to create flows over the VC)).
-func createVC(client, server *vif.VIF, ep naming.Endpoint, opts ...stream.VCOpt) (clientVC stream.VC, serverConnector stream.Connector, err error) {
+func createVC(client, server *vif.VIF, pclient security.Principal, ep naming.Endpoint, opts ...stream.VCOpt) (clientVC stream.VC, serverConnector stream.Connector, err error) {
vcChan := make(chan stream.VC)
scChan := make(chan stream.Connector)
errChan := make(chan error)
go func() {
- vc, err := client.Dial(ep, testutil.NewPrincipal("client"), opts...)
+ vc, err := client.Dial(ep, pclient, opts...)
errChan <- err
vcChan <- vc
}()
@@ -870,11 +953,11 @@
return
}
-func createNVCs(client, server *vif.VIF, startRID uint64, N int, opts ...stream.VCOpt) (clientVCs []stream.VC, serverConnectors []stream.Connector, err error) {
+func createNVCs(client, server *vif.VIF, pclient security.Principal, startRID uint64, N int, opts ...stream.VCOpt) (clientVCs []stream.VC, serverConnectors []stream.Connector, err error) {
var c stream.VC
var s stream.Connector
for i := 0; i < N; i++ {
- c, s, err = createVC(client, server, makeEP(startRID+uint64(i)), opts...)
+ c, s, err = createVC(client, server, pclient, makeEP(startRID+uint64(i)), opts...)
if err != nil {
return
}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 76c8ce9..63f3133 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -25,7 +25,7 @@
//
// Min is incremented whenever we want to remove support for old protocol
// versions.
-var SupportedRange = &Range{Min: version.RPCVersion9, Max: version.RPCVersion10}
+var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
func init() {
metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))
diff --git a/runtime/internal/vtrace/vtrace_test.go b/runtime/internal/vtrace/vtrace_test.go
index f407098..38e3df5 100644
--- a/runtime/internal/vtrace/vtrace_test.go
+++ b/runtime/internal/vtrace/vtrace_test.go
@@ -309,6 +309,7 @@
vtrace.ForceCollect(ctx)
ctx, client, err := v23.WithNewClient(ctx)
+ defer client.Close()
if err != nil {
t.Fatalf("Couldn't create client %v", err)
}