Merge "device/dmrun: add a clean up step that kills the app instance"
diff --git a/cmd/gclogs/doc.go b/cmd/gclogs/doc.go
index f9ede8b..95242e8 100644
--- a/cmd/gclogs/doc.go
+++ b/cmd/gclogs/doc.go
@@ -35,5 +35,7 @@
The global flags are:
-metadata=<just specify -metadata to activate>
Displays metadata for the program and exits.
+ -time=false
+ Dump timing information to stderr before exiting the program.
*/
package main
diff --git a/cmd/mounttable/doc.go b/cmd/mounttable/doc.go
index 1ccdb36..7543861 100644
--- a/cmd/mounttable/doc.go
+++ b/cmd/mounttable/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/cmd/namespace/doc.go b/cmd/namespace/doc.go
index fb8f24d..623f185 100644
--- a/cmd/namespace/doc.go
+++ b/cmd/namespace/doc.go
@@ -44,6 +44,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/cmd/principal/doc.go b/cmd/principal/doc.go
index fe730cb..3115fd7 100644
--- a/cmd/principal/doc.go
+++ b/cmd/principal/doc.go
@@ -46,6 +46,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/cmd/uniqueid/doc.go b/cmd/uniqueid/doc.go
index 03b5c4e..57f1c17 100644
--- a/cmd/uniqueid/doc.go
+++ b/cmd/uniqueid/doc.go
@@ -20,6 +20,8 @@
The global flags are:
-metadata=<just specify -metadata to activate>
Displays metadata for the program and exits.
+ -time=false
+ Dump timing information to stderr before exiting the program.
Uniqueid generate - Generates UniqueIds
diff --git a/cmd/vdl/doc.go b/cmd/vdl/doc.go
index afc068a..b17b57c 100644
--- a/cmd/vdl/doc.go
+++ b/cmd/vdl/doc.go
@@ -43,6 +43,8 @@
The global flags are:
-metadata=<just specify -metadata to activate>
Displays metadata for the program and exits.
+ -time=false
+ Dump timing information to stderr before exiting the program.
Vdl generate
diff --git a/cmd/vom/doc.go b/cmd/vom/doc.go
index 82a6a30..dc1a03f 100644
--- a/cmd/vom/doc.go
+++ b/cmd/vom/doc.go
@@ -19,6 +19,8 @@
The global flags are:
-metadata=<just specify -metadata to activate>
Displays metadata for the program and exits.
+ -time=false
+ Dump timing information to stderr before exiting the program.
Vom decode - Decode data encoded in the vom format
diff --git a/cmd/vomtestgen/doc.go b/cmd/vomtestgen/doc.go
index 7dd15f7..fdfb8d9 100644
--- a/cmd/vomtestgen/doc.go
+++ b/cmd/vomtestgen/doc.go
@@ -48,6 +48,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/cmd/vrpc/doc.go b/cmd/vrpc/doc.go
index ff84dc9..25baf70 100644
--- a/cmd/vrpc/doc.go
+++ b/cmd/vrpc/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/cmd/vrun/doc.go b/cmd/vrun/doc.go
index b0ef6a5..6e5a996 100644
--- a/cmd/vrun/doc.go
+++ b/cmd/vrun/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/examples/rps/rpsbot/doc.go b/examples/rps/rpsbot/doc.go
index 1f94e3a..cec0fcd 100644
--- a/examples/rps/rpsbot/doc.go
+++ b/examples/rps/rpsbot/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/examples/rps/rpsplayer/doc.go b/examples/rps/rpsplayer/doc.go
index bfa7fea..a6713f9 100644
--- a/examples/rps/rpsplayer/doc.go
+++ b/examples/rps/rpsplayer/doc.go
@@ -33,6 +33,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/examples/rps/rpsscorekeeper/doc.go b/examples/rps/rpsscorekeeper/doc.go
index 93e07f7..e295e78 100644
--- a/examples/rps/rpsscorekeeper/doc.go
+++ b/examples/rps/rpsscorekeeper/doc.go
@@ -32,6 +32,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/examples/tunnel/tunneld/doc.go b/examples/tunnel/tunneld/doc.go
index de09561..71a8d8c 100644
--- a/examples/tunnel/tunneld/doc.go
+++ b/examples/tunnel/tunneld/doc.go
@@ -34,6 +34,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/examples/tunnel/vsh/doc.go b/examples/tunnel/vsh/doc.go
index 3094470..b5eaac2 100644
--- a/examples/tunnel/vsh/doc.go
+++ b/examples/tunnel/vsh/doc.go
@@ -65,6 +65,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/lib/discovery/advertise.go b/lib/discovery/advertise.go
index 521fcf0..c53d3c4 100644
--- a/lib/discovery/advertise.go
+++ b/lib/discovery/advertise.go
@@ -12,22 +12,23 @@
)
var (
- errNoInterfaceName = verror.Register(pkgPath+".errNoInterfaceName", verror.NoRetry, "{1:}{2:} interface name not provided")
- errNotPackableAttributes = verror.Register(pkgPath+".errNotPackableAttributes", verror.NoRetry, "{1:}{2:} attribute not packable")
- errNoAddresses = verror.Register(pkgPath+".errNoAddress", verror.NoRetry, "{1:}{2:} address not provided")
- errNotPackableAddresses = verror.Register(pkgPath+".errNotPackableAddresses", verror.NoRetry, "{1:}{2:} address not packable")
+ errAlreadyBeingAdvertised = verror.Register(pkgPath+".errAlreadyBeingAdvertised", verror.NoRetry, "{1:}{2:} already being advertised")
+ errNoInterfaceName = verror.Register(pkgPath+".errNoInterfaceName", verror.NoRetry, "{1:}{2:} interface name not provided")
+ errNotPackableAttributes = verror.Register(pkgPath+".errNotPackableAttributes", verror.NoRetry, "{1:}{2:} attribute not packable")
+ errNoAddresses = verror.Register(pkgPath+".errNoAddress", verror.NoRetry, "{1:}{2:} address not provided")
+ errNotPackableAddresses = verror.Register(pkgPath+".errNotPackableAddresses", verror.NoRetry, "{1:}{2:} address not packable")
)
// Advertise implements discovery.Advertiser.
-func (ds *ds) Advertise(ctx *context.T, service discovery.Service, visibility []security.BlessingPattern) error {
+func (ds *ds) Advertise(ctx *context.T, service discovery.Service, visibility []security.BlessingPattern) (<-chan struct{}, error) {
if len(service.InterfaceName) == 0 {
- return verror.New(errNoInterfaceName, ctx)
+ return nil, verror.New(errNoInterfaceName, ctx)
}
if len(service.Addrs) == 0 {
- return verror.New(errNoAddresses, ctx)
+ return nil, verror.New(errNoAddresses, ctx)
}
if err := validateAttributes(service.Attrs); err != nil {
- return err
+ return nil, err
}
if len(service.InstanceUuid) == 0 {
@@ -39,21 +40,49 @@
Service: service,
}
if err := encrypt(&ad, visibility); err != nil {
- return err
+ return nil, err
}
- adId := string(ad.InstanceUuid)
- ctx, cancel, err := ds.addTask(ctx, adId)
+ ctx, cancel, err := ds.addTask(ctx)
if err != nil {
- return err
+ return nil, err
}
- barrier := NewBarrier(func() { ds.removeTask(ctx, adId) })
+ id := string(ad.Service.InstanceUuid)
+ if !ds.addAd(id) {
+ cancel()
+ ds.removeTask(ctx)
+ return nil, verror.New(errAlreadyBeingAdvertised, ctx)
+ }
+
+ done := make(chan struct{})
+ barrier := NewBarrier(func() {
+ ds.removeAd(id)
+ ds.removeTask(ctx)
+ close(done)
+ })
for _, plugin := range ds.plugins {
if err := plugin.Advertise(ctx, ad, barrier.Add()); err != nil {
cancel()
- return err
+ return nil, err
}
}
- return nil
+ return done, nil
+}
+
+func (ds *ds) addAd(id string) bool {
+ ds.mu.Lock()
+ if _, exist := ds.ads[id]; exist {
+ ds.mu.Unlock()
+ return false
+ }
+ ds.ads[id] = struct{}{}
+ ds.mu.Unlock()
+ return true
+}
+
+func (ds *ds) removeAd(id string) {
+ ds.mu.Lock()
+ delete(ds.ads, id)
+ ds.mu.Unlock()
}
diff --git a/lib/discovery/attributes.go b/lib/discovery/attributes.go
new file mode 100644
index 0000000..8b4ab69
--- /dev/null
+++ b/lib/discovery/attributes.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+ "errors"
+ "strings"
+
+ "v.io/v23/discovery"
+)
+
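+// validateAttributes returns an error if any attribute key is not suitable for
+// advertising: a key must be non-empty, must not start with '_', and must consist
+// only of printable US-ASCII characters (0x20-0x7e) other than '='.
+// Attribute values are not checked.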
+func validateAttributes(attrs discovery.Attributes) error {
+ for k, _ := range attrs {
+ if len(k) == 0 {
+ return errors.New("empty key")
+ }
+ if strings.HasPrefix(k, "_") {
+ return errors.New("key starts with '_'")
+ }
+ for _, c := range k {
+ if c < 0x20 || c > 0x7e {
+ return errors.New("key is not printable US-ASCII")
+ }
+ if c == '=' {
+ return errors.New("key includes '='")
+ }
+ }
+ }
+ return nil
+}
+
diff --git a/lib/discovery/attributes_test.go b/lib/discovery/attributes_test.go
new file mode 100644
index 0000000..7387565
--- /dev/null
+++ b/lib/discovery/attributes_test.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+ "testing"
+
+ "v.io/v23/discovery"
+)
+
+func TestValidateAttributes(t *testing.T) {
+ valids := []discovery.Attributes{
+ discovery.Attributes{"key": "v"},
+ discovery.Attributes{"k_e.y": "v"},
+ discovery.Attributes{"k!": "v"},
+ }
+ for i, attrs := range valids {
+ if err := validateAttributes(attrs); err != nil {
+ t.Errorf("[%d]: valid attributes got error: %v", i, err)
+ }
+ }
+
+ invalids := []discovery.Attributes{
+ discovery.Attributes{"_key": "v"},
+ discovery.Attributes{"k=ey": "v"},
+ discovery.Attributes{"key\n": "v"},
+ }
+ for i, attrs := range invalids {
+ if err := validateAttributes(attrs); err == nil {
+ t.Errorf("[%d]: invalid attributes didn't get error", i)
+ }
+ }
+}
+
diff --git a/lib/discovery/cipher.go b/lib/discovery/cipher.go
index 5225d44..597891a 100644
--- a/lib/discovery/cipher.go
+++ b/lib/discovery/cipher.go
@@ -40,13 +40,13 @@
// We only encrypt addresses for now.
//
// TODO(jhahn): Revisit the scope of encryption.
- encrypted := make([]string, len(ad.Addrs))
- for i, addr := range ad.Addrs {
+ encrypted := make([]string, len(ad.Service.Addrs))
+ for i, addr := range ad.Service.Addrs {
var n [24]byte
binary.LittleEndian.PutUint64(n[:], uint64(i))
encrypted[i] = string(secretbox.Seal(nil, []byte(addr), &n, sharedKey))
}
- ad.Addrs = encrypted
+ ad.Service.Addrs = encrypted
return nil
}
@@ -77,8 +77,8 @@
// Note that we should not modify the slice element directly here since the
// underlying plugins may cache services and the next plugin.Scan() may return
// the already decrypted addresses.
- decrypted := make([]string, len(ad.Addrs))
- for i, encrypted := range ad.Addrs {
+ decrypted := make([]string, len(ad.Service.Addrs))
+ for i, encrypted := range ad.Service.Addrs {
var n [24]byte
binary.LittleEndian.PutUint64(n[:], uint64(i))
addr, ok := secretbox.Open(nil, []byte(encrypted), &n, sharedKey)
@@ -87,7 +87,7 @@
}
decrypted[i] = string(addr)
}
- ad.Addrs = decrypted
+ ad.Service.Addrs = decrypted
return nil
}
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index b48bc58..78597b7 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -7,8 +7,6 @@
import (
"sync"
- "github.com/pborman/uuid"
-
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/verror"
@@ -16,49 +14,21 @@
const pkgPath = "v.io/x/ref/runtime/internal/discovery"
-// Advertisement holds a set of service properties to advertise.
-type Advertisement struct {
- discovery.Service
-
- // The service UUID to advertise.
- ServiceUuid uuid.UUID
-
- // Type of encryption applied to the advertisement so that it can
- // only be decoded by authorized principals.
- EncryptionAlgorithm EncryptionAlgorithm
- // If the advertisement is encrypted, then the data required to
- // decrypt it. The format of this data is a function of the algorithm.
- EncryptionKeys []EncryptionKey
-
- // TODO(jhahn): Add proximity.
- // TODO(jhahn): Use proximity for Lost.
- Lost bool
-}
-
-type EncryptionAlgorithm int
-type EncryptionKey []byte
-
-const (
- NoEncryption EncryptionAlgorithm = 0
- TestEncryption EncryptionAlgorithm = 1
- IbeEncryption EncryptionAlgorithm = 2
-)
var (
- errClosed = verror.Register(pkgPath+".errClosed", verror.NoRetry, "{1:}{2:} closed")
- errAlreadyBeingAdvertised = verror.Register(pkgPath+".errAlreadyBeingAdvertised", verror.NoRetry, "{1:}{2:} already being advertised")
+ errDiscoveryClosed = verror.Register(pkgPath+".errDiscoveryClosed", verror.NoRetry, "{1:}{2:} discovery closed")
)
// ds is an implementation of discovery.T.
type ds struct {
plugins []Plugin
- mu sync.Mutex
- closed bool // GUARDED_BY(mu)
- tasks map[*context.T]func() // GUARDED_BY(mu)
- advertising map[string]struct{} // GUARDED_BY(mu)
+ mu sync.Mutex
+ closed bool // GUARDED_BY(mu)
+ tasks map[*context.T]func() // GUARDED_BY(mu)
+ wg sync.WaitGroup
- wg sync.WaitGroup
+ ads map[string]struct{} // GUARDED_BY(mu)
}
func (ds *ds) Close() {
@@ -75,18 +45,11 @@
ds.wg.Wait()
}
-func (ds *ds) addTask(ctx *context.T, adId string) (*context.T, func(), error) {
+func (ds *ds) addTask(ctx *context.T) (*context.T, func(), error) {
ds.mu.Lock()
if ds.closed {
ds.mu.Unlock()
- return nil, nil, verror.New(errClosed, ctx)
- }
- if len(adId) > 0 {
- if _, exist := ds.advertising[adId]; exist {
- ds.mu.Unlock()
- return nil, nil, verror.New(errAlreadyBeingAdvertised, ctx)
- }
- ds.advertising[adId] = struct{}{}
+ return nil, nil, verror.New(errDiscoveryClosed, ctx)
}
ctx, cancel := context.WithCancel(ctx)
ds.tasks[ctx] = cancel
@@ -95,27 +58,26 @@
return ctx, cancel, nil
}
-func (ds *ds) removeTask(ctx *context.T, adId string) {
+func (ds *ds) removeTask(ctx *context.T) {
ds.mu.Lock()
- if len(adId) > 0 {
- delete(ds.advertising, adId)
- }
- _, exist := ds.tasks[ctx]
- delete(ds.tasks, ctx)
- ds.mu.Unlock()
- if exist {
+ if _, exist := ds.tasks[ctx]; exist {
+ delete(ds.tasks, ctx)
ds.wg.Done()
}
+ ds.mu.Unlock()
}
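-// New returns a new Discovery instance initialized with the given plugins.
+// NewWithPlugins returns a new Discovery instance initialized with the given plugins.
+// It panics if plugins is empty.
//
-// Mostly for internal use. Consider to use factory.New.
+// Mostly for internal use. Consider using factory.New instead.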
func NewWithPlugins(plugins []Plugin) discovery.T {
+ if len(plugins) == 0 {
+ panic("no plugins")
+ }
ds := &ds{
- plugins: make([]Plugin, len(plugins)),
- tasks: make(map[*context.T]func()),
- advertising: make(map[string]struct{}),
+ plugins: make([]Plugin, len(plugins)),
+ tasks: make(map[*context.T]func()),
+ ads: make(map[string]struct{}),
}
copy(ds.plugins, plugins)
return ds
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index 34b079f..28984cd 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -9,6 +9,7 @@
"fmt"
"reflect"
"runtime"
+ "sync"
"testing"
"time"
@@ -25,11 +26,21 @@
)
func advertise(ctx *context.T, ds discovery.Advertiser, perms []security.BlessingPattern, services ...discovery.Service) (func(), error) {
- ctx, stop := context.WithCancel(ctx)
+ var wg sync.WaitGroup
+ tr := idiscovery.NewTrigger()
+ ctx, cancel := context.WithCancel(ctx)
for _, service := range services {
- if err := ds.Advertise(ctx, service, perms); err != nil {
+ wg.Add(1)
+ done, err := ds.Advertise(ctx, service, perms)
+ if err != nil {
+ cancel()
return nil, fmt.Errorf("Advertise failed: %v", err)
}
+ tr.Add(wg.Done, done)
+ }
+ stop := func() {
+ cancel()
+ wg.Wait()
}
return stop, nil
}
diff --git a/lib/discovery/encoding.go b/lib/discovery/encoding.go
index d58b315..f6edd07 100644
--- a/lib/discovery/encoding.go
+++ b/lib/discovery/encoding.go
@@ -9,32 +9,8 @@
"encoding/binary"
"errors"
"io"
- "strings"
-
- "v.io/v23/discovery"
)
-// validateAttributes returns an error if the attributes are not suitable for advertising.
-func validateAttributes(attrs discovery.Attributes) error {
- for k, _ := range attrs {
- if len(k) == 0 {
- return errors.New("empty key")
- }
- if strings.HasPrefix(k, "_") {
- return errors.New("key starts with '_'")
- }
- for _, c := range k {
- if c < 0x20 || c > 0x7e {
- return errors.New("key is not printable US-ASCII")
- }
- if c == '=' {
- return errors.New("key includes '='")
- }
- }
- }
- return nil
-}
-
// PackAddresses packs addresses into a byte slice.
func PackAddresses(addrs []string) []byte {
var buf bytes.Buffer
diff --git a/lib/discovery/encoding_test.go b/lib/discovery/encoding_test.go
index 488bc77..ebaa103 100644
--- a/lib/discovery/encoding_test.go
+++ b/lib/discovery/encoding_test.go
@@ -2,78 +2,48 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package discovery
+package discovery_test
import (
"reflect"
"testing"
- "v.io/v23/discovery"
+ "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/discovery/testdata"
)
-func TestValidateAttributes(t *testing.T) {
- valids := []discovery.Attributes{
- discovery.Attributes{"key": "v"},
- discovery.Attributes{"k_e.y": "v"},
- discovery.Attributes{"k!": "v"},
- }
- for i, attrs := range valids {
- if err := validateAttributes(attrs); err != nil {
- t.Errorf("[%d]: valid attributes got error: %v", i, err)
- }
- }
-
- invalids := []discovery.Attributes{
- discovery.Attributes{"_key": "v"},
- discovery.Attributes{"k=ey": "v"},
- discovery.Attributes{"key\n": "v"},
- }
- for i, attrs := range invalids {
- if err := validateAttributes(attrs); err == nil {
- t.Errorf("[%d]: invalid attributes didn't get error", i)
- }
- }
-}
-
func TestPackAddresses(t *testing.T) {
- tests := [][]string{
- []string{"a12345"},
- []string{"a1234", "b5678", "c9012"},
- nil,
- }
-
- for _, test := range tests {
- pack := PackAddresses(test)
- unpack, err := UnpackAddresses(pack)
+ for _, test := range testdata.PackAddressTestData {
+ pack := discovery.PackAddresses(test.In)
+ if !reflect.DeepEqual(pack, test.Packed) {
+ t.Errorf("packed to: %v, but wanted: %v", pack, test.Packed)
+ }
+ unpack, err := discovery.UnpackAddresses(test.Packed)
if err != nil {
t.Errorf("unpacked error: %v", err)
continue
}
- if !reflect.DeepEqual(test, unpack) {
- t.Errorf("unpacked to %v, but want %v", unpack, test)
+ if !reflect.DeepEqual(test.In, unpack) {
+ t.Errorf("unpacked to %v, but want %v", unpack, test.In)
}
}
}
func TestPackEncryptionKeys(t *testing.T) {
- tests := []struct {
- algo EncryptionAlgorithm
- keys []EncryptionKey
- }{
- {TestEncryption, []EncryptionKey{EncryptionKey("0123456789")}},
- {IbeEncryption, []EncryptionKey{EncryptionKey("012345"), EncryptionKey("123456"), EncryptionKey("234567")}},
- {NoEncryption, nil},
- }
+ for _, test := range testdata.PackEncryptionKeysTestData {
+ pack := discovery.PackEncryptionKeys(test.Algo, test.Keys)
- for _, test := range tests {
- pack := PackEncryptionKeys(test.algo, test.keys)
- algo, keys, err := UnpackEncryptionKeys(pack)
+ if !reflect.DeepEqual(pack, test.Packed) {
+ t.Errorf("packed to: %v, but wanted: %v", pack, test.Packed)
+ }
+
+ algo, keys, err := discovery.UnpackEncryptionKeys(test.Packed)
if err != nil {
t.Errorf("unpacked error: %v", err)
continue
}
- if algo != test.algo || !reflect.DeepEqual(keys, test.keys) {
- t.Errorf("unpacked to (%d, %v), but want (%d, %v)", algo, keys, test.algo, test.keys)
+ if algo != test.Algo || !reflect.DeepEqual(keys, test.Keys) {
+ t.Errorf("unpacked to (%v, %v), but want (%v, %v)", algo, keys, test.Algo, test.Keys)
}
}
}
diff --git a/lib/discovery/factory/lazy.go b/lib/discovery/factory/lazy.go
index 05af440..5622562 100644
--- a/lib/discovery/factory/lazy.go
+++ b/lib/discovery/factory/lazy.go
@@ -27,10 +27,10 @@
derr error
}
-func (l *lazyFactory) Advertise(ctx *context.T, service discovery.Service, visibility []security.BlessingPattern) error {
+func (l *lazyFactory) Advertise(ctx *context.T, service discovery.Service, visibility []security.BlessingPattern) (<-chan struct{}, error) {
l.once.Do(l.init)
if l.derr != nil {
- return l.derr
+ return nil, l.derr
}
return l.d.Advertise(ctx, service, visibility)
}
diff --git a/lib/discovery/factory/lazy_test.go b/lib/discovery/factory/lazy_test.go
index f97cb9f..003ec45 100644
--- a/lib/discovery/factory/lazy_test.go
+++ b/lib/discovery/factory/lazy_test.go
@@ -27,9 +27,9 @@
return m, nil
}
-func (m *mock) Advertise(_ *context.T, _ discovery.Service, _ []security.BlessingPattern) error {
+func (m *mock) Advertise(_ *context.T, _ discovery.Service, _ []security.BlessingPattern) (<-chan struct{}, error) {
m.numAdvertises++
- return nil
+ return nil, nil
}
func (m *mock) Scan(_ *context.T, _ string) (<-chan discovery.Update, error) {
@@ -91,7 +91,7 @@
}
// Closed already; Shouldn't initialize it again.
- if err := d.Advertise(nil, discovery.Service{}, nil); err != errClosed {
+ if _, err := d.Advertise(nil, discovery.Service{}, nil); err != errClosed {
t.Errorf("expected an error %v, but got %v", errClosed, err)
}
if err := m.check(0, 0, 0, 0); err != nil {
@@ -111,7 +111,7 @@
m := mock{initErr: errInit}
d := newLazyFactory(m.init)
- if err := d.Advertise(nil, discovery.Service{}, nil); err != errInit {
+ if _, err := d.Advertise(nil, discovery.Service{}, nil); err != errInit {
t.Errorf("expected an error %v, but got %v", errInit, err)
}
if err := m.check(1, 0, 0, 0); err != nil {
diff --git a/lib/discovery/plugin.go b/lib/discovery/plugin.go
index daca158..092f510 100644
--- a/lib/discovery/plugin.go
+++ b/lib/discovery/plugin.go
@@ -5,8 +5,6 @@
package discovery
import (
- "github.com/pborman/uuid"
-
"v.io/v23/context"
)
@@ -24,5 +22,5 @@
// deadline. done should be called once when scanning is done or canceled.
//
// TODO(jhahn): Pass a filter on service attributes.
- Scan(ctx *context.T, serviceUuid uuid.UUID, ch chan<- Advertisement, done func()) error
+ Scan(ctx *context.T, serviceUuid Uuid, ch chan<- Advertisement, done func()) error
}
diff --git a/lib/discovery/plugins/ble/advertisement.go b/lib/discovery/plugins/ble/advertisement.go
index 7a8a3f9..50de7f9 100644
--- a/lib/discovery/plugins/ble/advertisement.go
+++ b/lib/discovery/plugins/ble/advertisement.go
@@ -21,38 +21,28 @@
attrs map[string][]byte
}
-const (
- // This uuids are v5 uuid generated out of band. These constants need
- // to be accessible in all the languages that have a ble implementation
- instanceUUID = "12db9a9c-1c7c-5560-bc6b-73a115c93413" // NewAttributeUUID("_instanceuuid")
- instanceNameUUID = "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d" // NewAttributeUUID("_instancename")
- interfaceNameUUID = "b2cadfd4-d003-576c-acad-58b8e3a9cbc8" // NewAttributeUUID("_interfacename")
- addrsUUID = "ad2566b7-59d8-50ae-8885-222f43f65fdc" // NewAttributeUUID("_addrs")
- encryptionUUID = "6286d80a-adaa-519a-8a06-281a4645a607" // NewAttributeUUID("_encryption")
-)
-
func newAdvertisment(adv discovery.Advertisement) bleAdv {
attrs := map[string][]byte{
- instanceUUID: adv.InstanceUuid,
- interfaceNameUUID: []byte(adv.InterfaceName),
+ InstanceUUID: adv.Service.InstanceUuid,
+ InterfaceNameUUID: []byte(adv.Service.InterfaceName),
}
- if len(adv.InstanceName) > 0 {
- attrs[instanceNameUUID] = []byte(adv.InstanceName)
+ if len(adv.Service.InstanceName) > 0 {
+ attrs[InstanceNameUUID] = []byte(adv.Service.InstanceName)
}
- if len(adv.Addrs) > 0 {
- attrs[addrsUUID] = discovery.PackAddresses(adv.Addrs)
+ if len(adv.Service.Addrs) > 0 {
+ attrs[AddrsUUID] = discovery.PackAddresses(adv.Service.Addrs)
}
if adv.EncryptionAlgorithm != discovery.NoEncryption {
- attrs[encryptionUUID] = discovery.PackEncryptionKeys(adv.EncryptionAlgorithm, adv.EncryptionKeys)
+ attrs[EncryptionUUID] = discovery.PackEncryptionKeys(adv.EncryptionAlgorithm, adv.EncryptionKeys)
}
- for k, v := range adv.Attrs {
+ for k, v := range adv.Service.Attrs {
hexUUID := discovery.NewAttributeUUID(k).String()
attrs[hexUUID] = []byte(k + "=" + v)
}
return bleAdv{
- instanceID: adv.InstanceUuid,
- serviceUUID: adv.ServiceUuid,
+ instanceID: adv.Service.InstanceUuid,
+ serviceUUID: uuid.UUID(adv.ServiceUuid),
attrs: attrs,
}
}
@@ -63,23 +53,23 @@
InstanceUuid: a.instanceID,
Attrs: make(vdiscovery.Attributes),
},
- ServiceUuid: a.serviceUUID,
+ ServiceUuid: discovery.Uuid(a.serviceUUID),
}
var err error
for k, v := range a.attrs {
switch k {
- case instanceUUID:
- adv.InstanceUuid = v
- case instanceNameUUID:
- adv.InstanceName = string(v)
- case interfaceNameUUID:
- adv.InterfaceName = string(v)
- case addrsUUID:
- if adv.Addrs, err = discovery.UnpackAddresses(v); err != nil {
+ case InstanceUUID:
+ adv.Service.InstanceUuid = v
+ case InstanceNameUUID:
+ adv.Service.InstanceName = string(v)
+ case InterfaceNameUUID:
+ adv.Service.InterfaceName = string(v)
+ case AddrsUUID:
+ if adv.Service.Addrs, err = discovery.UnpackAddresses(v); err != nil {
return nil, err
}
- case encryptionUUID:
+ case EncryptionUUID:
if adv.EncryptionAlgorithm, adv.EncryptionKeys, err = discovery.UnpackEncryptionKeys(v); err != nil {
return nil, err
}
@@ -88,7 +78,7 @@
if len(parts) != 2 {
return nil, fmt.Errorf("incorrectly formatted value, %s", v)
}
- adv.Attrs[parts[0]] = parts[1]
+ adv.Service.Attrs[parts[0]] = parts[1]
}
}
return adv, nil
diff --git a/lib/discovery/plugins/ble/advertisement_test.go b/lib/discovery/plugins/ble/advertisement_test.go
index 7525be1..c24216d 100644
--- a/lib/discovery/plugins/ble/advertisement_test.go
+++ b/lib/discovery/plugins/ble/advertisement_test.go
@@ -8,36 +8,23 @@
"reflect"
"testing"
- "github.com/pborman/uuid"
-
- vdiscovery "v.io/v23/discovery"
-
- "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/discovery/plugins/ble/testdata"
)
func TestConvertingBackAndForth(t *testing.T) {
- v23Adv := discovery.Advertisement{
- Service: vdiscovery.Service{
- InstanceUuid: []byte(discovery.NewInstanceUUID()),
- InstanceName: "service",
- Attrs: vdiscovery.Attributes{
- "key1": "value1",
- "key2": "value2",
- },
- Addrs: []string{"localhost:1000", "example.com:540"},
- },
- ServiceUuid: uuid.NewUUID(),
- EncryptionAlgorithm: discovery.TestEncryption,
- EncryptionKeys: []discovery.EncryptionKey{discovery.EncryptionKey("k")},
- }
+ for _, test := range testdata.ConversionTestData {
+ v23Adv := test.VAdvertisement
+ adv := newAdvertisment(v23Adv)
+ if !reflect.DeepEqual(adv.attrs, test.BleAdvertisement) {
+ t.Errorf("wanted: %v, got %v", test.BleAdvertisement, adv.attrs)
+ }
+ out, err := adv.toDiscoveryAdvertisement()
+ if err != nil {
+ t.Errorf("unexpected error: %v", err)
+ }
- adv := newAdvertisment(v23Adv)
- out, err := adv.toDiscoveryAdvertisement()
- if err != nil {
- t.Errorf("unexpected error: %v", err)
- }
-
- if !reflect.DeepEqual(&v23Adv, out) {
- t.Errorf("input does not equal output: %v, %v", v23Adv, out)
+ if !reflect.DeepEqual(&v23Adv, out) {
+ t.Errorf("input does not equal output: %v, %v", v23Adv, out)
+ }
}
}
diff --git a/lib/discovery/plugins/ble/const.vdl b/lib/discovery/plugins/ble/const.vdl
new file mode 100644
index 0000000..e408727
--- /dev/null
+++ b/lib/discovery/plugins/ble/const.vdl
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package ble
+
+const (
+ // These UUIDs are v5 UUIDs generated out of band. These constants need
+ // to be accessible in all the languages that have a BLE implementation.
+ InstanceUUID = "12db9a9c-1c7c-5560-bc6b-73a115c93413" // NewAttributeUUID("_instanceuuid")
+ InstanceNameUUID = "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d" // NewAttributeUUID("_instancename")
+ InterfaceNameUUID = "b2cadfd4-d003-576c-acad-58b8e3a9cbc8" // NewAttributeUUID("_interfacename")
+ AddrsUUID = "ad2566b7-59d8-50ae-8885-222f43f65fdc" // NewAttributeUUID("_addrs")
+ EncryptionUUID = "6286d80a-adaa-519a-8a06-281a4645a607" // NewAttributeUUID("_encryption")
+)
diff --git a/lib/discovery/plugins/ble/const.vdl.go b/lib/discovery/plugins/ble/const.vdl.go
new file mode 100644
index 0000000..91f48be
--- /dev/null
+++ b/lib/discovery/plugins/ble/const.vdl.go
@@ -0,0 +1,20 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: const.vdl
+
+package ble
+
+// This uuids are v5 uuid generated out of band. These constants need
+// to be accessible in all the languages that have a ble implementation
+const InstanceUUID = "12db9a9c-1c7c-5560-bc6b-73a115c93413" // NewAttributeUUID("_instanceuuid")
+
+const InstanceNameUUID = "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d" // NewAttributeUUID("_instancename")
+
+const InterfaceNameUUID = "b2cadfd4-d003-576c-acad-58b8e3a9cbc8" // NewAttributeUUID("_interfacename")
+
+const AddrsUUID = "ad2566b7-59d8-50ae-8885-222f43f65fdc" // NewAttributeUUID("_addrs")
+
+const EncryptionUUID = "6286d80a-adaa-519a-8a06-281a4645a607" // NewAttributeUUID("_encryption")
diff --git a/lib/discovery/plugins/ble/neighborhood.go b/lib/discovery/plugins/ble/neighborhood.go
index c909855..e6a1d37 100644
--- a/lib/discovery/plugins/ble/neighborhood.go
+++ b/lib/discovery/plugins/ble/neighborhood.go
@@ -160,17 +160,17 @@
b.device.SetServices(v)
}
-func (b *bleNeighborhood) addScanner(uuid uuid.UUID) (chan *discovery.Advertisement, int64) {
+func (b *bleNeighborhood) addScanner(uid discovery.Uuid) (chan *discovery.Advertisement, int64) {
ch := make(chan *discovery.Advertisement)
s := &scanner{
- uuid: uuid,
+ uuid: uuid.UUID(uid),
ch: ch,
}
b.mu.Lock()
id := b.nextScanId
b.nextScanId++
b.scannersById[id] = s
- key := uuid.String()
+ key := uuid.UUID(uid).String()
m, found := b.scannersByService[key]
if !found {
m = map[int64]*scanner{}
@@ -206,9 +206,15 @@
return
default:
}
+
b.device.Advertise(b.computeAdvertisement())
+ b.mu.Lock()
+ hasScanner := len(b.scannersById) > 0
+ b.mu.Unlock()
// TODO(bjornick): Don't scan unless there is a scanner running.
- b.device.Scan([]gatt.UUID{}, true)
+ if hasScanner {
+ b.device.Scan([]gatt.UUID{}, true)
+ }
}
// seenHash returns whether or not we have seen the hash <h> before.
@@ -333,7 +339,7 @@
services[uid.String()] = &bleAdv{
serviceUUID: uid,
attrs: charMap,
- instanceID: charMap[strings.Replace(instanceUUID, "-", "", -1)],
+ instanceID: charMap[strings.Replace(InstanceUUID, "-", "", -1)],
}
}
b.saveDevice(h, p.ID(), services)
@@ -466,6 +472,7 @@
w(k)
}
adv := &gatt.AdvPacket{}
+ adv.AppendFlags(0x06)
adv.AppendManufacturerData(manufacturerId, hasher.Sum(nil))
adv.AppendName(b.name)
return adv
diff --git a/lib/discovery/plugins/ble/plugin.go b/lib/discovery/plugins/ble/plugin.go
index fde8592..886e739 100644
--- a/lib/discovery/plugins/ble/plugin.go
+++ b/lib/discovery/plugins/ble/plugin.go
@@ -10,8 +10,6 @@
import (
"v.io/v23/context"
"v.io/x/ref/lib/discovery"
-
- "github.com/pborman/uuid"
)
type blePlugin struct {
@@ -22,14 +20,14 @@
func (b *blePlugin) Advertise(ctx *context.T, ad discovery.Advertisement, done func()) error {
b.b.addAdvertisement(newAdvertisment(ad))
stop := func() {
- b.b.removeService(ad.InstanceUuid)
+ b.b.removeService(ad.Service.InstanceUuid)
done()
}
b.trigger.Add(stop, ctx.Done())
return nil
}
-func (b *blePlugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scan chan<- discovery.Advertisement, done func()) error {
+func (b *blePlugin) Scan(ctx *context.T, serviceUuid discovery.Uuid, scan chan<- discovery.Advertisement, done func()) error {
ch, id := b.b.addScanner(serviceUuid)
drain := func() {
for range ch {
diff --git a/lib/discovery/plugins/ble/testdata/advertisement.vdl b/lib/discovery/plugins/ble/testdata/advertisement.vdl
new file mode 100644
index 0000000..097d056
--- /dev/null
+++ b/lib/discovery/plugins/ble/testdata/advertisement.vdl
@@ -0,0 +1,49 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testdata
+
+import (
+ "v.io/v23/discovery"
+ idiscovery "v.io/x/ref/lib/discovery"
+)
+
+// AdvertisementConversionTestCase represents a test case for converting between the Vanadium Advertisement format
+// and the Ble Advertisement format.
+type AdvertisementConversionTestCase struct {
+ VAdvertisement idiscovery.Advertisement
+
+ // BleAdvertisement is a map from human readable uuid strings to the byte data.
+ BleAdvertisement map[string][]byte
+}
+
+// ConversionTestData contains test cases for conversions between the ble format and the v23 advertising format.
+const ConversionTestData = []AdvertisementConversionTestCase{
+ AdvertisementConversionTestCase{
+ VAdvertisement: idiscovery.Advertisement{
+ Service: discovery.Service{
+ InstanceUuid: []byte{5, 146, 235, 25, 108, 124, 65, 162, 165, 230, 1, 162, 179, 150, 87, 30},
+ InstanceName: "service",
+ InterfaceName: "v.io/x/ref",
+ Attrs: discovery.Attributes{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ Addrs: []string{"localhost:1000", "example.com:540"},
+ },
+ ServiceUuid: idiscovery.Uuid("\xde\xed\xe9d\xa2\xe9T\x17\x83\x84\xdd\x0c\x86\xd2D\x0e"),
+ EncryptionAlgorithm: idiscovery.TestEncryption,
+ EncryptionKeys: []idiscovery.EncryptionKey{idiscovery.EncryptionKey("k")},
+ },
+ BleAdvertisement: map[string][]byte{
+ "6286d80a-adaa-519a-8a06-281a4645a607": []byte{1, 1, 107},
+ "4ce68e8b-173b-597e-9f93-ca453e7bb790": []byte{107, 101, 121, 49, 61, 118, 97, 108, 117, 101, 49},
+ "777f349c-d01f-5543-aa31-528e48bb53bd": []byte{107, 101, 121, 50, 61, 118, 97, 108, 117, 101, 50},
+ "12db9a9c-1c7c-5560-bc6b-73a115c93413": []byte{5, 146, 235, 25, 108, 124, 65, 162, 165, 230, 1, 162, 179, 150, 87, 30},
+ "b2cadfd4-d003-576c-acad-58b8e3a9cbc8": []byte{118, 46, 105, 111, 47, 120, 47, 114, 101, 102},
+ "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d": []byte{115, 101, 114, 118, 105, 99, 101},
+ "ad2566b7-59d8-50ae-8885-222f43f65fdc": []byte{14, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 49, 48, 48, 48, 15, 101, 120, 97, 109, 112, 108, 101, 46, 99, 111, 109, 58, 53, 52, 48},
+ },
+ },
+}
diff --git a/lib/discovery/plugins/ble/testdata/advertisement.vdl.go b/lib/discovery/plugins/ble/testdata/advertisement.vdl.go
new file mode 100644
index 0000000..224288b
--- /dev/null
+++ b/lib/discovery/plugins/ble/testdata/advertisement.vdl.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: advertisement.vdl
+
+package testdata
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/discovery"
+ discovery_2 "v.io/x/ref/lib/discovery"
+)
+
+// AdvertisementConversionTestCase represents a test case for converting between the Vanadium Advertisement format
+// and the Ble Advertisement format.
+type AdvertisementConversionTestCase struct {
+ VAdvertisement discovery_2.Advertisement
+ // BleAdvertisement is a map from human readable uuid strings to the byte data.
+ BleAdvertisement map[string][]byte
+}
+
+func (AdvertisementConversionTestCase) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery/plugins/ble/testdata.AdvertisementConversionTestCase"`
+}) {
+}
+
+func init() {
+ vdl.Register((*AdvertisementConversionTestCase)(nil))
+}
+
+// ConversionTestData contains test cases for conversions between the ble format and the v23 advertising format.
+var ConversionTestData = []AdvertisementConversionTestCase{
+ {
+ VAdvertisement: discovery_2.Advertisement{
+ Service: discovery.Service{
+ InstanceUuid: []byte("\x05\x92\xeb\x19l|A\xa2\xa5\xe6\x01\xa2\xb3\x96W\x1e"),
+ InstanceName: "service",
+ InterfaceName: "v.io/x/ref",
+ Attrs: discovery.Attributes{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ Addrs: []string{
+ "localhost:1000",
+ "example.com:540",
+ },
+ },
+ ServiceUuid: discovery_2.Uuid("\xde\xed\xe9d\xa2\xe9T\x17\x83\x84\xdd\f\x86\xd2D\x0e"),
+ EncryptionAlgorithm: 1,
+ EncryptionKeys: []discovery_2.EncryptionKey{
+ discovery_2.EncryptionKey("k"),
+ },
+ },
+ BleAdvertisement: map[string][]byte{
+ "12db9a9c-1c7c-5560-bc6b-73a115c93413": []byte("\x05\x92\xeb\x19l|A\xa2\xa5\xe6\x01\xa2\xb3\x96W\x1e"),
+ "4ce68e8b-173b-597e-9f93-ca453e7bb790": []byte("key1=value1"),
+ "6286d80a-adaa-519a-8a06-281a4645a607": []byte("\x01\x01k"),
+ "777f349c-d01f-5543-aa31-528e48bb53bd": []byte("key2=value2"),
+ "ad2566b7-59d8-50ae-8885-222f43f65fdc": []byte("\x0elocalhost:1000\x0fexample.com:540"),
+ "b2cadfd4-d003-576c-acad-58b8e3a9cbc8": []byte("v.io/x/ref"),
+ "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d": []byte("service"),
+ },
+ },
+}
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index 4cc5e6a..a006562 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -72,10 +72,10 @@
}
func (p *plugin) Advertise(ctx *context.T, ad idiscovery.Advertisement, done func()) (err error) {
- serviceName := ad.ServiceUuid.String() + serviceNameSuffix
+ serviceName := uuid.UUID(ad.ServiceUuid).String() + serviceNameSuffix
// We use the instance uuid as the host name so that we can get the instance uuid
// from the lost service instance, which has no txt records at all.
- hostName := encodeInstanceUuid(ad.InstanceUuid)
+ hostName := encodeInstanceUuid(ad.Service.InstanceUuid)
txt, err := createTxtRecords(&ad)
if err != nil {
done()
@@ -104,12 +104,12 @@
return nil
}
-func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, ch chan<- idiscovery.Advertisement, done func()) error {
+func (p *plugin) Scan(ctx *context.T, serviceUuid idiscovery.Uuid, ch chan<- idiscovery.Advertisement, done func()) error {
var serviceName string
if len(serviceUuid) == 0 {
serviceName = v23ServiceName
} else {
- serviceName = serviceUuid.String() + serviceNameSuffix
+ serviceName = uuid.UUID(serviceUuid).String() + serviceNameSuffix
}
go func() {
@@ -189,19 +189,19 @@
func createTxtRecords(ad *idiscovery.Advertisement) ([]string, error) {
// Prepare a txt record with attributes and addresses to announce.
- txt := appendTxtRecord(nil, attrInterface, ad.InterfaceName)
- if len(ad.InstanceName) > 0 {
- txt = appendTxtRecord(txt, attrName, ad.InstanceName)
+ txt := appendTxtRecord(nil, attrInterface, ad.Service.InterfaceName)
+ if len(ad.Service.InstanceName) > 0 {
+ txt = appendTxtRecord(txt, attrName, ad.Service.InstanceName)
}
- if len(ad.Addrs) > 0 {
- addrs := idiscovery.PackAddresses(ad.Addrs)
+ if len(ad.Service.Addrs) > 0 {
+ addrs := idiscovery.PackAddresses(ad.Service.Addrs)
txt = appendTxtRecord(txt, attrAddrs, string(addrs))
}
if ad.EncryptionAlgorithm != idiscovery.NoEncryption {
enc := idiscovery.PackEncryptionKeys(ad.EncryptionAlgorithm, ad.EncryptionKeys)
txt = appendTxtRecord(txt, attrEncryption, string(enc))
}
- for k, v := range ad.Attrs {
+ for k, v := range ad.Service.Attrs {
txt = appendTxtRecord(txt, k, v)
}
txt, err := maybeSplitLargeTXT(txt)
@@ -245,7 +245,7 @@
return ad, nil
}
- ad.Attrs = make(discovery.Attributes)
+ ad.Service.Attrs = make(discovery.Attributes)
for _, rr := range service.TxtRRs {
txt, err := maybeJoinLargeTXT(rr.Txt)
if err != nil {
@@ -259,11 +259,11 @@
}
switch k, v := p[0], p[1]; k {
case attrName:
- ad.InstanceName = v
+ ad.Service.InstanceName = v
case attrInterface:
- ad.InterfaceName = v
+ ad.Service.InterfaceName = v
case attrAddrs:
- if ad.Addrs, err = idiscovery.UnpackAddresses([]byte(v)); err != nil {
+ if ad.Service.Addrs, err = idiscovery.UnpackAddresses([]byte(v)); err != nil {
return idiscovery.Advertisement{}, err
}
case attrEncryption:
@@ -271,7 +271,7 @@
return idiscovery.Advertisement{}, err
}
default:
- ad.Attrs[k] = v
+ ad.Service.Attrs[k] = v
}
}
}
diff --git a/lib/discovery/plugins/mdns/mdns_test.go b/lib/discovery/plugins/mdns/mdns_test.go
index 7617a5c..e6c2b35 100644
--- a/lib/discovery/plugins/mdns/mdns_test.go
+++ b/lib/discovery/plugins/mdns/mdns_test.go
@@ -70,7 +70,7 @@
func startScan(ctx *context.T, p idiscovery.Plugin, interfaceName string) (<-chan idiscovery.Advertisement, func(), error) {
ctx, cancel := context.WithCancel(ctx)
scan := make(chan idiscovery.Advertisement)
- var serviceUuid uuid.UUID
+ var serviceUuid idiscovery.Uuid
if len(interfaceName) > 0 {
serviceUuid = idiscovery.NewServiceUUID(interfaceName)
}
@@ -108,7 +108,7 @@
for _, want := range wants {
matched := false
for i, ad := range ads {
- if !uuid.Equal(ad.InstanceUuid, want.InstanceUuid) {
+ if !uuid.Equal(ad.Service.InstanceUuid, want.InstanceUuid) {
continue
}
if lost {
diff --git a/lib/discovery/plugins/mock/mock.go b/lib/discovery/plugins/mock/mock.go
index 9c61600..d12caef 100644
--- a/lib/discovery/plugins/mock/mock.go
+++ b/lib/discovery/plugins/mock/mock.go
@@ -39,7 +39,7 @@
p.mu.Lock()
ads := p.services[key]
for i, a := range ads {
- if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
+ if uuid.Equal(uuid.UUID(a.Service.InstanceUuid), uuid.UUID(ad.Service.InstanceUuid)) {
ads = append(ads[:i], ads[i+1:]...)
break
}
@@ -56,7 +56,7 @@
return nil
}
-func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, ch chan<- discovery.Advertisement, done func()) error {
+func (p *plugin) Scan(ctx *context.T, serviceUuid discovery.Uuid, ch chan<- discovery.Advertisement, done func()) error {
rescan := make(chan struct{})
go func() {
var updateSeqSeen int
@@ -87,7 +87,7 @@
continue
}
for _, ad := range ads {
- current[string(ad.InstanceUuid)] = ad
+ current[string(ad.Service.InstanceUuid)] = ad
}
}
p.mu.Unlock()
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 0ad6d0c..1f5953f 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -5,8 +5,6 @@
package discovery
import (
- "github.com/pborman/uuid"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
@@ -16,12 +14,12 @@
// Scan implements discovery.Scanner.
func (ds *ds) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
// TODO(jhann): Implement a simple query processor.
- var serviceUuid uuid.UUID
+ var serviceUuid Uuid
if len(query) > 0 {
serviceUuid = NewServiceUUID(query)
}
- ctx, cancel, err := ds.addTask(ctx, "")
+ ctx, cancel, err := ds.addTask(ctx)
if err != nil {
return nil, err
}
@@ -30,7 +28,7 @@
scanCh := make(chan Advertisement, 10)
barrier := NewBarrier(func() {
close(scanCh)
- ds.removeTask(ctx, "")
+ ds.removeTask(ctx)
})
for _, plugin := range ds.plugins {
if err := plugin.Scan(ctx, serviceUuid, scanCh, barrier.Add()); err != nil {
@@ -74,11 +72,11 @@
}
// TODO(jhahn): Merge scanData based on InstanceUuid.
var update discovery.Update
- id := string(ad.InstanceUuid)
+ id := string(ad.Service.InstanceUuid)
if ad.Lost {
if _, ok := found[id]; ok {
delete(found, id)
- update = discovery.UpdateLost{discovery.Lost{InstanceUuid: ad.InstanceUuid}}
+ update = discovery.UpdateLost{discovery.Lost{InstanceUuid: ad.Service.InstanceUuid}}
}
} else {
found[id] = struct{}{}
diff --git a/lib/discovery/testdata/encoding.vdl b/lib/discovery/testdata/encoding.vdl
new file mode 100644
index 0000000..7df4f39
--- /dev/null
+++ b/lib/discovery/testdata/encoding.vdl
@@ -0,0 +1,66 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file contains testdata for v.io/x/ref/lib/discovery/encoding_test.go. The
+// testdata is in a vdl file so that we can make sure the encoding implementations in
+// all the languages produce the same byte output.
+
+package testdata
+
+import (
+ "v.io/x/ref/lib/discovery"
+)
+
+// PackAddressTest represents a test case for PackAddress.
+type PackAddressTest struct {
+ // In is the addresses to pack.
+ In []string
+ // Packed is the expected packed output.
+ Packed []byte
+}
+
+const PackAddressTestData = []PackAddressTest{
+ PackAddressTest{
+ In: []string{"a12345"},
+ Packed: []byte{6, 97, 49, 50, 51, 52, 53},
+ },
+ PackAddressTest{
+ In: []string{"a1234", "b5678", "c9012"},
+ Packed: []byte{5, 97, 49, 50, 51, 52, 5, 98, 53, 54, 55, 56, 5, 99, 57, 48, 49, 50},
+ },
+ // An empty input should create an empty output.
+ PackAddressTest{},
+}
+
+// PackEncryptionKeysTest represents a test case for PackEncryptionKeys
+type PackEncryptionKeysTest struct {
+ // Algo is the algorithm that's in use.
+ // but that isn't defined in vdl yet.
+ Algo discovery.EncryptionAlgorithm
+ // Keys are the encryption keys.
+ // but that isn't defined in vdl yet.
+ Keys []discovery.EncryptionKey
+ // Packed is the expected output bytes.
+ Packed []byte
+}
+
+const PackEncryptionKeysTestData = []PackEncryptionKeysTest{
+ PackEncryptionKeysTest{
+ Algo: 1,
+ Keys: []discovery.EncryptionKey{discovery.EncryptionKey("0123456789")},
+ Packed: []byte{1, 10, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57},
+ },
+ PackEncryptionKeysTest{
+ Algo: 2,
+ Keys: []discovery.EncryptionKey{
+ discovery.EncryptionKey("012345"),
+ discovery.EncryptionKey("123456"),
+ discovery.EncryptionKey("234567"),
+ },
+ Packed: []byte{2, 6, 48, 49, 50, 51, 52, 53, 6, 49, 50, 51, 52, 53, 54, 6, 50, 51, 52, 53, 54, 55},
+ },
+ PackEncryptionKeysTest{
+ Packed: []byte{0},
+ },
+}
diff --git a/lib/discovery/testdata/encoding.vdl.go b/lib/discovery/testdata/encoding.vdl.go
new file mode 100644
index 0000000..5bf5454
--- /dev/null
+++ b/lib/discovery/testdata/encoding.vdl.go
@@ -0,0 +1,91 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: encoding.vdl
+
+package testdata
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/x/ref/lib/discovery"
+)
+
+// PackAddressTest represents a test case for PackAddress.
+type PackAddressTest struct {
+ // In is the addresses to pack.
+ In []string
+ // Packed is the expected packed output.
+ Packed []byte
+}
+
+func (PackAddressTest) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery/testdata.PackAddressTest"`
+}) {
+}
+
+// PackEncryptionKeysTest represents a test case for PackEncryptionKeys
+type PackEncryptionKeysTest struct {
+ // Algo is the algorithm that's in use.
+ // but that isn't defined in vdl yet.
+ Algo discovery.EncryptionAlgorithm
+ // Keys are the encryption keys.
+ // but that isn't defined in vdl yet.
+ Keys []discovery.EncryptionKey
+ // Packed is the expected output bytes.
+ Packed []byte
+}
+
+func (PackEncryptionKeysTest) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery/testdata.PackEncryptionKeysTest"`
+}) {
+}
+
+func init() {
+ vdl.Register((*PackAddressTest)(nil))
+ vdl.Register((*PackEncryptionKeysTest)(nil))
+}
+
+var PackAddressTestData = []PackAddressTest{
+ {
+ In: []string{
+ "a12345",
+ },
+ Packed: []byte("\x06a12345"),
+ },
+ {
+ In: []string{
+ "a1234",
+ "b5678",
+ "c9012",
+ },
+ Packed: []byte("\x05a1234\x05b5678\x05c9012"),
+ },
+ {},
+}
+
+var PackEncryptionKeysTestData = []PackEncryptionKeysTest{
+ {
+ Algo: 1,
+ Keys: []discovery.EncryptionKey{
+ discovery.EncryptionKey("0123456789"),
+ },
+ Packed: []byte("\x01\n0123456789"),
+ },
+ {
+ Algo: 2,
+ Keys: []discovery.EncryptionKey{
+ discovery.EncryptionKey("012345"),
+ discovery.EncryptionKey("123456"),
+ discovery.EncryptionKey("234567"),
+ },
+ Packed: []byte("\x02\x06012345\x06123456\x06234567"),
+ },
+ {
+ Packed: []byte("\x00"),
+ },
+}
diff --git a/lib/discovery/testdata/uuid.vdl b/lib/discovery/testdata/uuid.vdl
new file mode 100644
index 0000000..192aab7
--- /dev/null
+++ b/lib/discovery/testdata/uuid.vdl
@@ -0,0 +1,32 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file contains testdata for v.io/x/ref/lib/discovery/uuid_test.go. The
+// testdata is in a vdl file so that we can make sure the uuid implementations in
+// all the languages produce the same output.
+
+package testdata
+
+// UuidTestData represents the inputs and outputs for a uuid test.
+type UuidTestData struct {
+ // In is the input string.
+ In string
+ // Want is the expected uuid's human-readable string form.
+ Want string
+}
+
+const InterfaceNameTest = []UuidTestData{
+ UuidTestData{
+ In: "v.io",
+ Want: "2101363c-688d-548a-a600-34d506e1aad0",
+ },
+ UuidTestData{
+ In: "v.io/v23/abc",
+ Want: "6726c4e5-b6eb-5547-9228-b2913f4fad52",
+ },
+ UuidTestData{
+ In: "v.io/v23/abc/xyz",
+ Want: "be8a57d7-931d-5ee4-9243-0bebde0029a5",
+ },
+}
diff --git a/lib/discovery/testdata/uuid.vdl.go b/lib/discovery/testdata/uuid.vdl.go
new file mode 100644
index 0000000..ad67a8a
--- /dev/null
+++ b/lib/discovery/testdata/uuid.vdl.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: uuid.vdl
+
+package testdata
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// UuidTestData represents the inputs and outputs for a uuid test.
+type UuidTestData struct {
+ // In is the input string.
+ In string
+ // Want is the expected uuid's human-readable string form.
+ Want string
+}
+
+func (UuidTestData) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery/testdata.UuidTestData"`
+}) {
+}
+
+func init() {
+ vdl.Register((*UuidTestData)(nil))
+}
+
+var InterfaceNameTest = []UuidTestData{
+ {
+ In: "v.io",
+ Want: "2101363c-688d-548a-a600-34d506e1aad0",
+ },
+ {
+ In: "v.io/v23/abc",
+ Want: "6726c4e5-b6eb-5547-9228-b2913f4fad52",
+ },
+ {
+ In: "v.io/v23/abc/xyz",
+ Want: "be8a57d7-931d-5ee4-9243-0bebde0029a5",
+ },
+}
diff --git a/lib/discovery/trigger_test.go b/lib/discovery/trigger_test.go
index da2db00..dc5a2b0 100644
--- a/lib/discovery/trigger_test.go
+++ b/lib/discovery/trigger_test.go
@@ -42,4 +42,11 @@
if got, want := <-done, 0; got != want {
t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
}
+
+ // Make sure the callback is triggered even when it is added with a closed channel.
+ close(c0)
+ tr.Add(f0, c0)
+ if got, want := <-done, 0; got != want {
+ t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
+ }
}
diff --git a/lib/discovery/types.vdl b/lib/discovery/types.vdl
new file mode 100644
index 0000000..066409d
--- /dev/null
+++ b/lib/discovery/types.vdl
@@ -0,0 +1,40 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+ "v.io/v23/discovery"
+)
+
+type EncryptionAlgorithm int32
+type EncryptionKey []byte
+
+const (
+ NoEncryption = EncryptionAlgorithm(0)
+ TestEncryption = EncryptionAlgorithm(1)
+ IbeEncryption = EncryptionAlgorithm(2)
+)
+
+type Uuid []byte
+
+// Advertisement holds a set of service properties to advertise.
+type Advertisement struct {
+ Service discovery.Service
+
+ // The service UUID to advertise.
+ ServiceUuid Uuid
+
+ // Type of encryption applied to the advertisement so that it can
+ // only be decoded by authorized principals.
+ EncryptionAlgorithm EncryptionAlgorithm
+ // If the advertisement is encrypted, then the data required to
+ // decrypt it. The format of this data is a function of the algorithm.
+ EncryptionKeys []EncryptionKey
+
+ // TODO(jhahn): Add proximity.
+ // TODO(jhahn): Use proximity for Lost.
+ Lost bool
+}
+
diff --git a/lib/discovery/types.vdl.go b/lib/discovery/types.vdl.go
new file mode 100644
index 0000000..fb26f63
--- /dev/null
+++ b/lib/discovery/types.vdl.go
@@ -0,0 +1,71 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package discovery
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/discovery"
+)
+
+type EncryptionAlgorithm int32
+
+func (EncryptionAlgorithm) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.EncryptionAlgorithm"`
+}) {
+}
+
+type EncryptionKey []byte
+
+func (EncryptionKey) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.EncryptionKey"`
+}) {
+}
+
+type Uuid []byte
+
+func (Uuid) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.Uuid"`
+}) {
+}
+
+// Advertisement holds a set of service properties to advertise.
+type Advertisement struct {
+ Service discovery.Service
+ // The service UUID to advertise.
+ ServiceUuid Uuid
+ // Type of encryption applied to the advertisement so that it can
+ // only be decoded by authorized principals.
+ EncryptionAlgorithm EncryptionAlgorithm
+ // If the advertisement is encrypted, then the data required to
+ // decrypt it. The format of this data is a function of the algorithm.
+ EncryptionKeys []EncryptionKey
+ // TODO(jhahn): Add proximity.
+ // TODO(jhahn): Use proximity for Lost.
+ Lost bool
+}
+
+func (Advertisement) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.Advertisement"`
+}) {
+}
+
+func init() {
+ vdl.Register((*EncryptionAlgorithm)(nil))
+ vdl.Register((*EncryptionKey)(nil))
+ vdl.Register((*Uuid)(nil))
+ vdl.Register((*Advertisement)(nil))
+}
+
+const NoEncryption = EncryptionAlgorithm(0)
+
+const TestEncryption = EncryptionAlgorithm(1)
+
+const IbeEncryption = EncryptionAlgorithm(2)
diff --git a/lib/discovery/uuid.go b/lib/discovery/uuid.go
index e18457b..2958663 100644
--- a/lib/discovery/uuid.go
+++ b/lib/discovery/uuid.go
@@ -18,14 +18,14 @@
)
// NewServiceUUID returns a version 5 UUID for the given interface name.
-func NewServiceUUID(interfaceName string) uuid.UUID {
- return uuid.NewSHA1(v23UUID, []byte(interfaceName))
+func NewServiceUUID(interfaceName string) Uuid {
+ return Uuid(uuid.NewSHA1(v23UUID, []byte(interfaceName)))
}
// NewInstanceUUID returns a version 4 (random) UUID. Mostly used for
// uniquely identifying the discovery service instance.
-func NewInstanceUUID() uuid.UUID {
- return uuid.NewRandom()
+func NewInstanceUUID() Uuid {
+ return Uuid(uuid.NewRandom())
}
// NewAttributeUUID returns a version 5 UUID for the given key.
diff --git a/lib/discovery/uuid_test.go b/lib/discovery/uuid_test.go
index 447de1d..bdac2bb 100644
--- a/lib/discovery/uuid_test.go
+++ b/lib/discovery/uuid_test.go
@@ -8,20 +8,14 @@
"testing"
"v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/discovery/testdata"
+ "github.com/pborman/uuid"
)
func TestServiceUUID(t *testing.T) {
- tests := []struct {
- in, want string
- }{
- {"v.io", "2101363c-688d-548a-a600-34d506e1aad0"},
- {"v.io/v23/abc", "6726c4e5-b6eb-5547-9228-b2913f4fad52"},
- {"v.io/v23/abc/xyz", "be8a57d7-931d-5ee4-9243-0bebde0029a5"},
- }
-
- for _, test := range tests {
- if got := discovery.NewServiceUUID(test.in).String(); got != test.want {
- t.Errorf("ServiceUUID for %q mismatch; got %q, want %q", test.in, got, test.want)
+ for _, test := range testdata.InterfaceNameTest {
+ if got := uuid.UUID(discovery.NewServiceUUID(test.In)).String(); got != test.Want {
+ t.Errorf("ServiceUUID for %q mismatch; got %q, want %q", test.In, got, test.Want)
}
}
}
@@ -29,7 +23,7 @@
func TestInstanceUUID(t *testing.T) {
uuids := make(map[string]struct{})
for x := 0; x < 100; x++ {
- uuid := discovery.NewInstanceUUID().String()
+ uuid := uuid.UUID(discovery.NewInstanceUUID()).String()
if _, ok := uuids[uuid]; ok {
t.Errorf("InstanceUUID returned duplicated UUID %q", uuid)
}
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index ee9453a..a015e18 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -15,7 +15,6 @@
import (
"flag"
- "net"
"v.io/x/lib/netconfig"
"v.io/x/lib/netstate"
@@ -75,8 +74,9 @@
lf := commonFlags.ListenFlags()
listenSpec := rpc.ListenSpec{
- Addrs: rpc.ListenAddrs(lf.Addrs),
- Proxy: lf.Proxy,
+ Addrs: rpc.ListenAddrs(lf.Addrs),
+ Proxy: lf.Proxy,
+ AddressChooser: internal.NewAddressChooser(logger.Global()),
}
reservedDispatcher := debuglib.NewDispatcher(securityflag.NewAuthorizerOrDie())
@@ -85,29 +85,6 @@
discovery.Close()
}
- // Our address is private, so we test for running on GCE and for its
- // 1:1 NAT configuration.
- if !internal.HasPublicIP(logger.Global()) {
- if addr := internal.GCEPublicAddress(logger.Global()); addr != nil {
- listenSpec.AddressChooser = netstate.AddressChooserFunc(func(string, []net.Addr) ([]net.Addr, error) {
- // TODO(cnicolaou): the protocol at least should
- // be configurable, or maybe there's a RuntimeFactory specific
- // flag to configure both the protocol and address.
- return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
- })
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
- runtimeFactoryShutdown := func() {
- ishutdown()
- shutdown()
- }
- return runtime, ctx, runtimeFactoryShutdown, nil
- }
- }
-
publisher := pubsub.NewPublisher()
// Create stream in Init function to avoid a race between any
@@ -136,8 +113,6 @@
cleanupCh := make(chan struct{})
watcherCh := make(chan struct{})
- listenSpec.AddressChooser = internal.IPAddressChooser{}
-
runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
ishutdown()
diff --git a/runtime/factories/static/static.go b/runtime/factories/static/static.go
index 939e0e0..01fa2de 100644
--- a/runtime/factories/static/static.go
+++ b/runtime/factories/static/static.go
@@ -7,7 +7,6 @@
import (
"flag"
- "net"
"v.io/v23"
"v.io/v23/context"
@@ -58,44 +57,20 @@
lf := commonFlags.ListenFlags()
listenSpec := rpc.ListenSpec{
- Addrs: rpc.ListenAddrs(lf.Addrs),
- Proxy: lf.Proxy,
+ Addrs: rpc.ListenAddrs(lf.Addrs),
+ Proxy: lf.Proxy,
+ AddressChooser: internal.NewAddressChooser(logger.Global()),
}
reservedDispatcher := debuglib.NewDispatcher(securityflag.NewAuthorizerOrDie())
-
ishutdown := func() {
ac.Shutdown()
discovery.Close()
}
-
- // Our address is private, so we test for running on GCE and for its 1:1 NAT
- // configuration. GCEPublicAddress returns a non-nil addr if we are
- // running on GCE.
- if !internal.HasPublicIP(logger.Global()) {
- if addr := internal.GCEPublicAddress(logger.Global()); addr != nil {
- listenSpec.AddressChooser = rpc.AddressChooserFunc(func(string, []net.Addr) ([]net.Addr, error) {
- return []net.Addr{addr}, nil
- })
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
- runtimeFactoryShutdown := func() {
- ishutdown()
- shutdown()
- }
- return runtime, ctx, runtimeFactoryShutdown, nil
- }
- }
- listenSpec.AddressChooser = internal.IPAddressChooser{}
-
runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
ishutdown()
return nil, nil, nil, err
}
-
runtimeFactoryShutdown := func() {
ishutdown()
shutdown()
diff --git a/runtime/internal/address_chooser.go b/runtime/internal/address_chooser.go
new file mode 100644
index 0000000..500465c
--- /dev/null
+++ b/runtime/internal/address_chooser.go
@@ -0,0 +1,60 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "net"
+ "sync"
+
+ "v.io/v23/logging"
+ "v.io/v23/rpc"
+)
+
+type addressChooser struct {
+ logger logging.Logger
+ gcePublicAddressOnce sync.Once
+ gcePublicAddress net.Addr
+ ipChooser IPAddressChooser
+}
+
+func (c *addressChooser) setGCEPublicAddress() {
+ c.gcePublicAddressOnce.Do(func() {
+ c.gcePublicAddress = GCEPublicAddress(c.logger)
+ })
+}
+
+func (c *addressChooser) ChooseAddresses(protocol string, candidates []net.Addr) ([]net.Addr, error) {
+ c.setGCEPublicAddress() // Blocks till the address is set
+ if c.gcePublicAddress == nil {
+ return c.ipChooser.ChooseAddresses(protocol, candidates)
+ }
+ return []net.Addr{c.gcePublicAddress}, nil
+}
+
+// NewAddressChooser returns an rpc.AddressChooser that yields the public IP of
+// the process if it is hosted by a cloud service provider (e.g. Google Compute
+// Engine, Amazon EC2), and otherwise behaves the same as IPAddressChooser.
+func NewAddressChooser(logger logging.Logger) rpc.AddressChooser {
+ if HasPublicIP(logger) {
+ return IPAddressChooser{}
+ }
+ // Our address is private, so we test for running on GCE and for its 1:1 NAT
+ // configuration. GCEPublicAddress returns a non-nil addr if we are
+ // running on GCE/AWS.
+ //
+ // GCEPublicAddress can unfortunately take up to 1 second to determine the
+ // external address (see https://github.com/vanadium/issues/issues/776).
+ //
+ // So NewAddressChooser fires it up in a goroutine and returns immediately,
+ // thus avoiding any blockage till the AddressChooser is actually invoked.
+ //
+ // I apologize for the existence of this code! It is ugly, so if you have any
+ // suggestions please do share. Ideally, the operation to "detect whether the
+ // process is running under an Amazon EC2 instance" wouldn't block for a
+ // timeout of 1 second and we can do away with this mess.
+ ret := &addressChooser{logger: logger}
+ go ret.setGCEPublicAddress()
+ return ret
+}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 02ff59e..80d0034 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -234,24 +234,26 @@
return nil, err
}
var bkey, dkey uint64
+ var blessings security.Blessings
+ var discharges map[string]security.Discharge
if !c.isProxy {
// TODO(suharshs): On the first flow dial, find a way to not call this twice.
rbnames, rejected, err := auth.AuthorizePeer(ctx, c.local, remote, rBlessings, rDischarges)
if err != nil {
return nil, iflow.MaybeWrapError(verror.ErrNotTrusted, ctx, err)
}
- blessings, discharges, err := auth.BlessingsForPeer(ctx, rbnames)
+ blessings, discharges, err = auth.BlessingsForPeer(ctx, rbnames)
if err != nil {
return nil, NewErrNoBlessingsForPeer(ctx, rbnames, rejected, err)
}
- if blessings.IsZero() {
- // its safe to ignore this error since c.lBlessings must be valid, so the
- // encoding of the publicKey can never error out.
- blessings, _ = security.NamelessBlessing(c.lBlessings.PublicKey())
- }
- if bkey, dkey, err = c.blessingsFlow.send(ctx, blessings, discharges); err != nil {
- return nil, err
- }
+ }
+ if blessings.IsZero() {
+ // it's safe to ignore this error since c.lBlessings must be valid, so the
+ // encoding of the publicKey can never error out.
+ blessings, _ = security.NamelessBlessing(c.lBlessings.PublicKey())
+ }
+ if bkey, dkey, err = c.blessingsFlow.send(ctx, blessings, discharges); err != nil {
+ return nil, err
}
defer c.mu.Unlock()
c.mu.Lock()
diff --git a/runtime/internal/rpc/benchmark/benchmark/doc.go b/runtime/internal/rpc/benchmark/benchmark/doc.go
index 8016f5e..20ccde2 100644
--- a/runtime/internal/rpc/benchmark/benchmark/doc.go
+++ b/runtime/internal/rpc/benchmark/benchmark/doc.go
@@ -76,6 +76,8 @@
write an execution trace to the named file after execution
-test.v=false
verbose: print additional output
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v23.credentials=
directory to use for storing security credentials
-v23.i18n-catalogue=
diff --git a/runtime/internal/rpc/benchmark/benchmarkd/doc.go b/runtime/internal/rpc/benchmark/benchmarkd/doc.go
index a2bf3f8..11b555c 100644
--- a/runtime/internal/rpc/benchmark/benchmarkd/doc.go
+++ b/runtime/internal/rpc/benchmark/benchmarkd/doc.go
@@ -62,6 +62,8 @@
write an execution trace to the named file after execution
-test.v=false
verbose: print additional output
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 0768987..11a59f7 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -105,7 +105,7 @@
return c
}
-func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
+func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt, flowOpts []stream.FlowOpt) (stream.Flow, *verror.SubErr) {
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{Err: err, Options: verror.Print}
}
@@ -119,7 +119,7 @@
// We are serializing the creation of all flows per VC. This is okay
// because if one flow creation is to block, it is likely that all others
// for that VC would block as well.
- if flow, err := found.Connect(); err == nil {
+ if flow, err := found.Connect(flowOpts...); err == nil {
return flow, nil
}
// If the vc fails to establish a new flow, we assume it's
@@ -144,7 +144,7 @@
return nil, suberr(err)
}
- flow, err := v.Connect()
+ flow, err := v.Connect(flowOpts...)
if err != nil {
return nil, suberr(err)
}
@@ -335,7 +335,7 @@
// authorizer, both during creation of the VC underlying the flow and the
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
-func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt, flowOpts []stream.FlowOpt) {
defer c.wg.Done()
status := &serverStatus{index: index, server: server}
var span vtrace.Span
@@ -365,7 +365,7 @@
status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
- if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
+ if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth}), flowOpts); status.serverErr != nil {
status.serverErr.Name = suberrName(server, name, method)
ctx.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
return
@@ -470,7 +470,9 @@
responses := make([]*serverStatus, attempts)
ch := make(chan *serverStatus, attempts)
- vcOpts := append(translateVCOpts(opts), c.vcOpts...)
+ vcOpts, flowOpts := translateStreamOpts(opts)
+ vcOpts = append(vcOpts, c.vcOpts...)
+
authorizer := newServerAuthorizer(blessingPattern, opts...)
for i, server := range resolved.Names() {
// Create a copy of vcOpts for each call to tryCreateFlow
@@ -486,7 +488,7 @@
c.wg.Add(1)
c.mu.Unlock()
- go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
+ go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy, flowOpts)
}
var timeoutChan <-chan time.Time
diff --git a/runtime/internal/rpc/options.go b/runtime/internal/rpc/options.go
index 09da261..7ecf8e3 100644
--- a/runtime/internal/rpc/options.go
+++ b/runtime/internal/rpc/options.go
@@ -13,6 +13,7 @@
"v.io/x/ref/lib/apilog"
"v.io/x/ref/runtime/internal/rpc/stream"
+ "v.io/x/ref/runtime/internal/rpc/stream/vc"
)
// PreferredProtocols instructs the Runtime implementation to select
@@ -79,11 +80,13 @@
return false
}
-func translateVCOpts(opts []rpc.CallOpt) (vcOpts []stream.VCOpt) {
+func translateStreamOpts(opts []rpc.CallOpt) (vcOpts []stream.VCOpt, flowOpts []stream.FlowOpt) {
for _, o := range opts {
switch v := o.(type) {
case stream.VCOpt:
vcOpts = append(vcOpts, v)
+ case options.ChannelTimeout:
+ flowOpts = append(flowOpts, vc.ChannelTimeout(time.Duration(v)))
case options.SecurityLevel:
switch v {
case options.SecurityNone:
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index b58a3cd..80fa2bf 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -294,7 +294,9 @@
s.preferredProtocols = []string(opt)
case options.SecurityLevel:
securityLevel = opt
-
+ case options.ChannelTimeout:
+ s.listenerOpts = append(s.listenerOpts,
+ vc.ChannelTimeout(time.Duration(opt)))
}
}
diff --git a/runtime/internal/rpc/stream/message/control.go b/runtime/internal/rpc/stream/message/control.go
index 8ccc494..dbb474d 100644
--- a/runtime/internal/rpc/stream/message/control.go
+++ b/runtime/internal/rpc/stream/message/control.go
@@ -90,6 +90,17 @@
Data []byte
}
+// HealthCheckRequest is a control message sent periodically to check whether
+// the remote end of a VC is still available.
+type HealthCheckRequest struct {
+ VCI id.VC // Identifier of the VC being health checked.
+}
+
+// HealthCheckResponse is the control message sent in reply to a HealthCheckRequest.
+type HealthCheckResponse struct {
+ VCI id.VC // Identifier of the VC the reply is for.
+}
+
// Command enum.
type command uint8
@@ -100,6 +111,8 @@
setupCommand command = 4
setupStreamCommand command = 5
setupVCCommand command = 6
+ healthCheckReqCommand command = 7
+ healthCheckRespCommand command = 8
)
// SetupOption is the base interface for optional Setup options.
@@ -160,6 +173,10 @@
command = setupStreamCommand
case *SetupVC:
command = setupVCCommand
+ case *HealthCheckRequest:
+ command = healthCheckReqCommand
+ case *HealthCheckResponse:
+ command = healthCheckRespCommand
default:
return verror.New(errUnrecognizedVCControlMessageType, nil, fmt.Sprintf("%T", m))
}
@@ -195,6 +212,10 @@
m = new(SetupStream)
case setupVCCommand:
m = new(SetupVC)
+ case healthCheckReqCommand:
+ m = new(HealthCheckRequest)
+ case healthCheckRespCommand:
+ m = new(HealthCheckResponse)
default:
return nil, verror.New(errUnrecognizedVCControlMessageCommand, nil, command)
}
@@ -220,6 +241,22 @@
return
}
+func (m *HealthCheckRequest) writeTo(w io.Writer) (err error) { // Writes the message body to w: only the VCI, via writeInt.
+ return writeInt(w, m.VCI)
+}
+
+func (m *HealthCheckRequest) readFrom(r *bytes.Buffer) (err error) { // Reads the message body from r into m.VCI via readInt.
+ return readInt(r, &m.VCI)
+}
+
+func (m *HealthCheckResponse) writeTo(w io.Writer) (err error) { // Writes the message body to w: only the VCI, via writeInt.
+ return writeInt(w, m.VCI)
+}
+
+func (m *HealthCheckResponse) readFrom(r *bytes.Buffer) (err error) { // Reads the message body from r into m.VCI via readInt.
+ return readInt(r, &m.VCI)
+}
+
func (m *SetupVC) writeTo(w io.Writer) (err error) {
if err = writeInt(w, m.VCI); err != nil {
return
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index 7500150..a010f18 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -598,6 +598,21 @@
p.RemoveRoute(srcVCI)
case *message.AddReceiveBuffers:
p.proxy.routeCounters(p, m.Counters)
+ case *message.HealthCheckRequest:
+ if svc := p.ServerVC(m.VCI); svc != nil {
+ // If the request is for the proxy, simply respond to it.
+ p.queue.Put(&message.HealthCheckResponse{VCI: m.VCI})
+ } else if dst := p.Route(m.VCI); dst != nil {
+ m.VCI = dst.VCI
+ dst.Process.queue.Put(m)
+ }
+ case *message.HealthCheckResponse:
+ // Note that the proxy never sends health check requests, so responses
+ // should always be forwarded.
+ if dst := p.Route(m.VCI); dst != nil {
+ m.VCI = dst.VCI
+ dst.Process.queue.Put(m)
+ }
case *message.SetupVC:
// First let's ensure that we can speak a common protocol verison.
intersection, err := iversion.SupportedRange.Intersect(&m.Setup.Versions)
@@ -803,6 +818,10 @@
}
}
+func (p *process) SendHealthCheck(vci id.VC) { // SendHealthCheck enqueues a HealthCheckRequest for vci on this process's queue.
+ p.queue.Put(&message.HealthCheckRequest{VCI: vci})
+}
+
func (p *process) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
if bytes == 0 {
return
diff --git a/runtime/internal/rpc/stream/vc/flow.go b/runtime/internal/rpc/stream/vc/flow.go
index d5f2d2f..ae9cffa 100644
--- a/runtime/internal/rpc/stream/vc/flow.go
+++ b/runtime/internal/rpc/stream/vc/flow.go
@@ -5,6 +5,8 @@
package vc
import (
+ "time"
+
"v.io/v23/naming"
"v.io/v23/security"
@@ -15,6 +17,7 @@
backingVC
*reader
*writer
+ channelTimeout time.Duration
}
type backingVC interface {
diff --git a/runtime/internal/rpc/stream/vc/vc.go b/runtime/internal/rpc/stream/vc/vc.go
index 65514ca..6687a5d 100644
--- a/runtime/internal/rpc/stream/vc/vc.go
+++ b/runtime/internal/rpc/stream/vc/vc.go
@@ -38,6 +38,8 @@
return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
}
+const defaultChannelTimeout = 30 * time.Minute // Used by InternalNew when Params.ChannelTimeout is zero.
+
var (
// These errors are intended to be used as arguments to higher
// level errors and hence {1}{2} is omitted from their format
@@ -64,6 +66,7 @@
errFailedToCreateWriterForNewFlow = reg(".errFailedToCreateWriterForNewFlow", "failed to create writer for new flow({3}){:4}")
errFailedToEnqueueFlow = reg(".errFailedToEnqueueFlow", "failed to enqueue flow at listener{:3}")
errFailedToAcceptSystemFlows = reg(".errFailedToAcceptSystemFlows", "failed to accept system flows{:3}")
+ errHealthCheckFailed = reg(".errHealthCheckFailed", "the healthcheck deadline expired.")
)
// DischargeExpiryBuffer specifies how much before discharge expiration we should
@@ -76,6 +79,11 @@
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
}
+type ChannelTimeout time.Duration // ChannelTimeout is a duration passed as an option to set the health-check timeout of a flow (see Connect) or of accepted VCs.
+
+func (ChannelTimeout) RPCStreamFlowOpt() {} // Empty method; presumably marks ChannelTimeout as a stream.FlowOpt (verify against package stream).
+func (ChannelTimeout) RPCStreamListenerOpt() {} // Empty method; presumably marks ChannelTimeout as a stream.ListenerOpt (verify against package stream).
+
const DefaultServerDischargeExpiryBuffer = 20 * time.Second
// DataCache Keys for TypeEncoder/Decoder.
@@ -113,6 +121,10 @@
version version.RPCVersion
remotePubKeyChan chan *crypto.BoxKey // channel which will receive the remote public key during setup.
+ healthCheckNewFlow chan time.Duration
+ healthCheckResponse chan struct{}
+ defaultChannelTimeout time.Duration
+
helper Helper
dataCache *dataCache // dataCache contains information that can shared between Flows from this VC.
loopWG sync.WaitGroup
@@ -176,6 +188,8 @@
// NewWriter creates a buffer queue for Write operations on the
// stream.Flow implementation.
NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error)
+
+ SendHealthCheck(vci id.VC)
}
// Priorities of flows.
@@ -200,13 +214,14 @@
// Params encapsulates the set of parameters needed to create a new VC.
type Params struct {
- VCI id.VC // Identifier of the VC
- Dialed bool // True if the VC was initiated by the local process.
- LocalEP naming.Endpoint // Endpoint of the local end of the VC.
- RemoteEP naming.Endpoint // Endpoint of the remote end of the VC.
- Pool *iobuf.Pool // Byte pool used for read and write buffer allocations.
- ReserveBytes uint // Number of padding bytes to reserve for headers.
- Helper Helper
+ VCI id.VC // Identifier of the VC
+ Dialed bool // True if the VC was initiated by the local process.
+ LocalEP naming.Endpoint // Endpoint of the local end of the VC.
+ RemoteEP naming.Endpoint // Endpoint of the remote end of the VC.
+ Pool *iobuf.Pool // Byte pool used for read and write buffer allocations.
+ ReserveBytes uint // Number of padding bytes to reserve for headers.
+ ChannelTimeout time.Duration // How long to wait before closing an unresponsive channel.
+ Helper Helper
}
// InternalNew creates a new VC, which implements the stream.VC interface.
@@ -219,6 +234,10 @@
if p.Dialed {
fidOffset = 0
}
+ channelTimeout := defaultChannelTimeout
+ if p.ChannelTimeout != 0 {
+ channelTimeout = p.ChannelTimeout
+ }
return &VC{
ctx: ctx,
vci: p.VCI,
@@ -234,11 +253,14 @@
// id if the VC was initiated by the local process,
// and have an odd id if the VC was initiated by the
// remote process.
- nextConnectFID: id.Flow(NumReservedFlows + fidOffset),
- crypter: crypto.NewNullCrypter(),
- closeCh: make(chan struct{}),
- helper: p.Helper,
- dataCache: newDataCache(),
+ nextConnectFID: id.Flow(NumReservedFlows + fidOffset),
+ crypter: crypto.NewNullCrypter(),
+ closeCh: make(chan struct{}),
+ helper: p.Helper,
+ dataCache: newDataCache(),
+ healthCheckNewFlow: make(chan time.Duration, 1),
+ healthCheckResponse: make(chan struct{}, 1),
+ defaultChannelTimeout: channelTimeout,
}
}
@@ -263,6 +285,12 @@
reader: newReader(readHandlerImpl{vc, fid}),
writer: writer,
}
+ for _, opt := range opts {
+ switch o := opt.(type) {
+ case ChannelTimeout:
+ f.channelTimeout = time.Duration(o)
+ }
+ }
vc.mu.Lock()
if vc.flowMap == nil {
vc.mu.Unlock()
@@ -270,6 +298,9 @@
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errConnectOnClosedVC, nil, vc.closeReason))
}
vc.flowMap[fid] = f
+ if f.channelTimeout != 0 && vc.version >= version.RPCVersion12 {
+ vc.healthCheckNewFlow <- f.channelTimeout
+ }
vc.mu.Unlock()
// New flow created, inform remote end that data can be received on it.
vc.helper.NotifyOfNewFlow(vc.vci, fid, DefaultBytesBufferedPerFlow)
@@ -532,6 +563,14 @@
if err = vc.connectSystemFlows(); err != nil {
return vc.appendCloseReason(err)
}
+
+ vc.mu.Lock()
+ if !vc.closed {
+ vc.loopWG.Add(1)
+ go vc.healthCheckLoop()
+ }
+ vc.mu.Unlock()
+
vc.ctx.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
return nil
}
@@ -577,6 +616,14 @@
if err := vc.connectSystemFlows(); err != nil {
return vc.appendCloseReason(err)
}
+
+ vc.mu.Lock()
+ if !vc.closed {
+ vc.loopWG.Add(1)
+ go vc.healthCheckLoop()
+ }
+ vc.mu.Unlock()
+
vc.ctx.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, params.RemoteBlessings, params.LocalBlessings)
return nil
}
@@ -612,6 +659,12 @@
return vc.appendCloseReason(err)
}
}
+ vc.mu.Lock()
+ if !vc.closed {
+ vc.loopWG.Add(1)
+ go vc.healthCheckLoop()
+ }
+ vc.mu.Unlock()
vc.ctx.VI(1).Infof("Client VC %v handshaked with no authentication.", vc)
return nil
}
@@ -686,6 +739,14 @@
return
}
vc.ctx.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
+
+ vc.mu.Lock()
+ if !vc.closed {
+ vc.loopWG.Add(1)
+ go vc.healthCheckLoop()
+ }
+ vc.mu.Unlock()
+
result <- HandshakeResult{ln, nil}
}()
return result
@@ -728,6 +789,14 @@
return
}
vc.ctx.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, params.RemoteBlessings, params.LocalBlessings)
+
+ vc.mu.Lock()
+ if !vc.closed {
+ vc.loopWG.Add(1)
+ go vc.healthCheckLoop()
+ }
+ vc.mu.Unlock()
+
result <- HandshakeResult{ln, nil}
}()
return result
@@ -965,6 +1034,23 @@
return ret
}
+// channelTimeout returns the smallest non-zero channelTimeout among the flows in vc.flowMap, capped at vc.defaultChannelTimeout.
+// Flows whose channelTimeout is zero are ignored, so the result is zero only if vc.defaultChannelTimeout is zero.
+func (vc *VC) channelTimeout() time.Duration {
+ // This is a linear scan over the flow map under vc.mu. It is simple, and in current programs
+ // the number of active flows on a VC is almost always very small.
+ // In the new RPC system we should consider a more efficient implementation.
+ vc.mu.Lock()
+ min := vc.defaultChannelTimeout // Start from the VC-wide default; flows can only tighten it.
+ for _, f := range vc.flowMap {
+ if f.channelTimeout != 0 && f.channelTimeout < min {
+ min = f.channelTimeout
+ }
+ }
+ vc.mu.Unlock()
+ return min
+}
+
// findFlow finds the flow id for the provided flow.
// Returns 0 if there is none.
func (vc *VC) findFlow(flow interface{}) id.Flow {
@@ -1100,6 +1186,12 @@
return newWriter(MaxPayloadSizeBytes, bq, alloc, vc.sharedCounters), nil
}
+// HandleHealthCheckResponse records that the remote end answered a health
+// check by signalling healthCheckLoop. It is a no-op for VCs negotiated below
+// RPCVersion12, whose healthCheckLoop exits immediately.
+//
+// The send is non-blocking: healthCheckResponse has a single slot, and a full
+// slot means a signal is already pending, so an extra response can be dropped.
+// A blocking send here would stall the caller whenever healthCheckLoop is not
+// receiving (for example after it has returned because the VC closed).
+func (vc *VC) HandleHealthCheckResponse() {
+ if vc.Version() < version.RPCVersion12 {
+ return
+ }
+ select {
+ case vc.healthCheckResponse <- struct{}{}:
+ default:
+ }
+}
+
// readHandlerImpl is an adapter for the readHandler interface required by
// the reader type.
type readHandlerImpl struct {
@@ -1144,3 +1236,52 @@
}
return dischargeClient, dischargeExpiryBuffer
}
+
+// healthCheckLoop runs a state machine that manages health checks for the VC until the VC closes or a check times out.
+func (vc *VC) healthCheckLoop() {
+ defer vc.loopWG.Done()
+ if vc.Version() < version.RPCVersion12 { // No health checks are run below RPCVersion12.
+ return
+ }
+
+ // A request is sent after half the channel timeout and the VC is closed after the full timeout (30 minutes by default).
+ channelTimeout, now := vc.channelTimeout(), time.Now()
+ sendTimer, closeTimer := time.NewTimer(channelTimeout/2), time.NewTimer(channelTimeout)
+ sendTime, closeTime := now.Add(channelTimeout/2), now.Add(channelTimeout) // Deadlines the two timers are currently armed for.
+ outstandingRequest := false // True between sending a request and receiving a response.
+ defer sendTimer.Stop()
+ defer closeTimer.Stop()
+ for {
+ select {
+ case <-vc.closeCh:
+ // The VC is closing, so there is no need for health checks.
+ return
+ case <-vc.healthCheckResponse: // A response arrived: restart both timers from the current minimum timeout.
+ outstandingRequest = false
+ channelTimeout, now = vc.channelTimeout(), time.Now()
+ sendTimer.Reset(channelTimeout / 2)
+ closeTimer.Reset(channelTimeout)
+ sendTime, closeTime = now.Add(channelTimeout/2), now.Add(channelTimeout)
+ case <-closeTimer.C: // No response within the timeout: close the VC with errHealthCheckFailed.
+ vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errHealthCheckFailed, nil)))
+ return
+ case <-sendTimer.C: // Time to probe the remote end, unless a request is already in flight.
+ if !outstandingRequest {
+ vc.helper.SendHealthCheck(vc.vci)
+ outstandingRequest = true
+ }
+ case newChannelTimeout := <-vc.healthCheckNewFlow:
+ // New flows might have tighter requirements; timers are only ever moved earlier here.
+ now = time.Now()
+ newSendTime, newCloseTime := now.Add(newChannelTimeout/2), now.Add(newChannelTimeout)
+ if newSendTime.Before(sendTime) {
+ sendTime = newSendTime
+ sendTimer.Reset(newChannelTimeout / 2)
+ }
+ if newCloseTime.Before(closeTime) {
+ closeTime = newCloseTime
+ closeTimer.Reset(newChannelTimeout)
+ }
+ }
+ }
+}
diff --git a/runtime/internal/rpc/stream/vc/vc_test.go b/runtime/internal/rpc/stream/vc/vc_test.go
index 2ec9d82..0f8a20c 100644
--- a/runtime/internal/rpc/stream/vc/vc_test.go
+++ b/runtime/internal/rpc/stream/vc/vc_test.go
@@ -682,6 +682,8 @@
}
}
+func (h *helper) SendHealthCheck(vci id.VC) {} // No-op: this test helper does not send health check requests.
+
func (h *helper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
h.mu.Lock()
defer h.mu.Unlock()
diff --git a/runtime/internal/rpc/stream/vif/vif.go b/runtime/internal/rpc/stream/vif/vif.go
index d8fd6d7..7cc1b0b 100644
--- a/runtime/internal/rpc/stream/vif/vif.go
+++ b/runtime/internal/rpc/stream/vif/vif.go
@@ -332,7 +332,7 @@
}
}
principal := stream.GetPrincipalVCOpts(ctx, opts...)
- vc, err := vif.newVC(ctx, vif.allocVCI(), vif.localEP, remoteEP, idleTimeout, true)
+ vc, err := vif.newVC(ctx, vif.allocVCI(), vif.localEP, remoteEP, idleTimeout, 0, true)
if err != nil {
return nil, err
}
@@ -436,14 +436,16 @@
ctx.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif)
return errors.New("VCs not accepted")
}
- var idleTimeout time.Duration
+ var channelTimeout, idleTimeout time.Duration
for _, o := range lopts {
switch v := o.(type) {
case vc.IdleTimeout:
idleTimeout = v.Duration
+ case vc.ChannelTimeout:
+ channelTimeout = time.Duration(v)
}
}
- vcobj, err := vif.newVC(ctx, m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false)
+ vcobj, err := vif.newVC(ctx, m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, channelTimeout, false)
if err != nil {
return err
}
@@ -737,6 +739,14 @@
}
vif.ctx.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
+ case *message.HealthCheckRequest:
+ vif.sendOnExpressQ(&message.HealthCheckResponse{VCI: m.VCI})
+
+ case *message.HealthCheckResponse:
+ if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
+ vc.HandleHealthCheckResponse()
+ }
+
case *message.Setup:
vif.ctx.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif)
@@ -1024,7 +1034,7 @@
return ret
}
-func (vif *VIF) newVC(ctx *context.T, vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, side vifSide) (*vc.VC, error) {
+func (vif *VIF) newVC(ctx *context.T, vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout, channelTimeout time.Duration, side vifSide) (*vc.VC, error) {
vif.muStartTimer.Lock()
if vif.startTimer != nil {
vif.startTimer.Stop()
@@ -1032,13 +1042,14 @@
}
vif.muStartTimer.Unlock()
vc := vc.InternalNew(ctx, vc.Params{
- VCI: vci,
- Dialed: side == dialedVIF,
- LocalEP: localEP,
- RemoteEP: remoteEP,
- Pool: vif.pool,
- ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()),
- Helper: vcHelper{vif},
+ VCI: vci,
+ Dialed: side == dialedVIF,
+ LocalEP: localEP,
+ RemoteEP: remoteEP,
+ Pool: vif.pool,
+ ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()),
+ ChannelTimeout: channelTimeout,
+ Helper: vcHelper{vif},
})
added, rq, wq := vif.vcMap.Insert(vc)
if added {
@@ -1182,6 +1193,10 @@
h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)})
}
+func (h vcHelper) SendHealthCheck(vci id.VC) { // SendHealthCheck sends a HealthCheckRequest for vci via the VIF's sendOnExpressQ.
+ h.vif.sendOnExpressQ(&message.HealthCheckRequest{VCI: vci})
+}
+
func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
if bytes == 0 {
return
diff --git a/runtime/internal/rpc/stress/mtstress/doc.go b/runtime/internal/rpc/stress/mtstress/doc.go
index 75a4ac4..e4fd432 100644
--- a/runtime/internal/rpc/stress/mtstress/doc.go
+++ b/runtime/internal/rpc/stress/mtstress/doc.go
@@ -37,6 +37,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/runtime/internal/rpc/stress/stress/doc.go b/runtime/internal/rpc/stress/stress/doc.go
index 04e1957..2fc6914 100644
--- a/runtime/internal/rpc/stress/stress/doc.go
+++ b/runtime/internal/rpc/stress/stress/doc.go
@@ -34,6 +34,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/runtime/internal/rpc/stress/stressd/doc.go b/runtime/internal/rpc/stress/stressd/doc.go
index ece1767..0ca5ed9 100644
--- a/runtime/internal/rpc/stress/stressd/doc.go
+++ b/runtime/internal/rpc/stress/stressd/doc.go
@@ -30,6 +30,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index cd5d9e9..031090c 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -6,15 +6,22 @@
import (
"io"
+ "net"
+ "sync"
"testing"
+ "time"
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref"
+ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/runtime/internal/flow/conn"
+ inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/test"
)
@@ -190,3 +197,204 @@
waitForCancel(t, ts, cancel)
<-done
}
+
+type channelTestServer struct { // channelTestServer is the RPC server object used by the channel timeout tests.
+ waiting chan struct{} // Closed by WaitForCancel as soon as the call starts.
+ canceled chan struct{} // Closed by WaitForCancel once its context is done.
+}
+
+func (s *channelTestServer) Run(ctx *context.T, call rpc.ServerCall, wait time.Duration) error { // Run sleeps for wait and then returns nil.
+ time.Sleep(wait)
+ return nil
+}
+
+func (s *channelTestServer) WaitForCancel(ctx *context.T, call rpc.ServerCall) error { // WaitForCancel blocks until ctx is done, signalling entry and cancellation by closing channels.
+ close(s.waiting) // Tell the test the call has reached the server; a second call would panic on the double close.
+ <-ctx.Done()
+ close(s.canceled) // Tell the test the server-side context was cancelled.
+ return nil
+}
+
+type disConn struct { // disConn wraps a net.Conn whose reads and writes can be silently discarded on demand.
+ net.Conn
+ mu sync.Mutex // Guards stopread and stopwrite.
+ stopread, stopwrite bool // When set, Read discards incoming data and Write drops outgoing data.
+}
+
+func (p *disConn) stop(read, write bool) { // stop sets whether reads and/or writes are discarded from now on.
+ p.mu.Lock()
+ p.stopread = read
+ p.stopwrite = write
+ p.mu.Unlock()
+}
+func (p *disConn) Write(b []byte) (int, error) { // Write forwards b to the wrapped Conn unless writes are stopped.
+ p.mu.Lock()
+ stopwrite := p.stopwrite
+ p.mu.Unlock()
+ if stopwrite {
+ return len(b), nil // Report success but drop the data, so the writer cannot tell the link is dead.
+ }
+ return p.Conn.Write(b)
+}
+func (p *disConn) Read(b []byte) (int, error) { // Read returns data from the wrapped Conn unless reads are stopped.
+ for {
+ n, err := p.Conn.Read(b)
+ p.mu.Lock()
+ stopread := p.stopread
+ p.mu.Unlock()
+ if err != nil || !stopread { // Errors are always returned; otherwise data read while stopped is discarded and we read again.
+ return n, err
+ }
+ }
+}
+
+func registerDisProtocol(wrap string, conns chan *disConn) { // registerDisProtocol registers protocol "dis", which delegates to the registered protocol wrap and wraps dialed connections in disConn.
+ dial, resolve, listen, protonames := rpc.RegisteredProtocol(wrap)
+ rpc.RegisterProtocol("dis", func(ctx *context.T, p, a string, t time.Duration) (net.Conn, error) {
+ conn, err := dial(ctx, protonames[0], a, t)
+ if err == nil {
+ dc := &disConn{Conn: conn}
+ conns <- dc // Hand the wrapper to the test so it can later call stop; blocks if conns is full.
+ conn = dc
+ }
+ return conn, err
+ }, func(ctx *context.T, protocol, address string) (string, string, error) {
+ _, a, err := resolve(ctx, protonames[0], address)
+ return "dis", a, err // Keep the resolved address but report the protocol as "dis".
+ }, func(ctx *context.T, protocol, address string) (net.Listener, error) {
+ return listen(ctx, protonames[0], address) // Listening is passed straight through; accepted connections are not wrapped.
+ })
+}
+
+func findEndpoint(ctx *context.T, s rpc.Server) naming.Endpoint { // findEndpoint returns the server's first endpoint, or else polls until a proxy endpoint appears. ctx is unused.
+ if status := s.Status(); len(status.Endpoints) > 0 {
+ return status.Endpoints[0]
+ } else {
+ timer := time.NewTicker(10 * time.Millisecond) // Despite the name this is a Ticker: poll the status every 10ms.
+ defer timer.Stop()
+ for _ = range timer.C { // Loops until a proxy shows up; there is no timeout or cancellation.
+ if status = s.Status(); len(status.Proxies) > 0 {
+ return status.Proxies[0].Endpoint
+ }
+ }
+ }
+ return nil // Unreachable
+}
+
+func testChannelTimeout(t *testing.T, ctx *context.T) { // testChannelTimeout checks that a client-side ChannelTimeout fails calls once the connection goes silent.
+ _, s, err := v23.WithNewServer(ctx, "", &channelTestServer{}, security.AllowEveryone())
+ if err != nil {
+ t.Fatal(err)
+ }
+ ep := findEndpoint(ctx, s)
+ conns := make(chan *disConn, 1)
+ registerDisProtocol(ep.Addr().Network(), conns)
+
+ iep := ep.(*inaming.Endpoint)
+ iep.Protocol = "dis" // Dial through the "dis" wrapper so the test can cut the connection.
+
+ // Long calls don't cause the timeout: the control stream is still operating.
+ err = v23.GetClient(ctx).Call(ctx, iep.Name(), "Run", []interface{}{2 * time.Second},
+ nil, options.ChannelTimeout(500*time.Millisecond))
+ if err != nil {
+ t.Errorf("got %v want nil", err)
+ }
+ (<-conns).stop(true, true) // Silently drop all traffic in both directions on the dialed connection.
+ err = v23.GetClient(ctx).Call(ctx, iep.Name(), "Run", []interface{}{time.Duration(0)},
+ nil, options.ChannelTimeout(100*time.Millisecond))
+ if err == nil {
+ t.Errorf("got %v, wanted non-nil error", err)
+ }
+}
+
+func TestChannelTimeout(t *testing.T) { // Runs testChannelTimeout with the context from test.V23Init.
+ if ref.RPCTransitionState() >= ref.XServers {
+ t.Skip("The new RPC system does not yet support channel timeouts")
+ }
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+ testChannelTimeout(t, ctx)
+}
+
+func TestChannelTimeout_Proxy(t *testing.T) { // Runs testChannelTimeout with a listen spec that points at a freshly started proxy.
+ if ref.RPCTransitionState() >= ref.XServers {
+ t.Skip("The new RPC system does not yet support channel timeouts")
+ }
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ ls := v23.GetListenSpec(ctx)
+ pshutdown, pendpoint, err := generic.NewProxy(ctx, ls, security.AllowEveryone(), "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer pshutdown()
+ ls.Addrs = nil // Clear the direct listen addresses so only the proxy is configured.
+ ls.Proxy = pendpoint.Name()
+ testChannelTimeout(t, v23.WithListenSpec(ctx, ls))
+}
+
+func testChannelTimeOut_Server(t *testing.T, ctx *context.T) { // testChannelTimeOut_Server checks that a server-side ChannelTimeout cancels in-flight calls once the connection goes silent.
+ cts := &channelTestServer{
+ canceled: make(chan struct{}),
+ waiting: make(chan struct{}),
+ }
+ _, s, err := v23.WithNewServer(ctx, "", cts, security.AllowEveryone(),
+ options.ChannelTimeout(500*time.Millisecond))
+ if err != nil {
+ t.Fatal(err)
+ }
+ ep := findEndpoint(ctx, s)
+ conns := make(chan *disConn, 1)
+ registerDisProtocol(ep.Addr().Network(), conns)
+
+ iep := ep.(*inaming.Endpoint)
+ iep.Protocol = "dis" // Dial through the "dis" wrapper so the test can cut the connection.
+
+ // Long calls don't cause the timeout: the control stream is still operating.
+ err = v23.GetClient(ctx).Call(ctx, iep.Name(), "Run", []interface{}{2 * time.Second},
+ nil)
+ if err != nil {
+ t.Errorf("got %v want nil", err)
+ }
+ // When the server closes the VC in response to the channel timeout, the server
+ // call will see a cancellation. We do a call and wait for that server-side
+ // cancellation. Then we cancel the client call just to clean up.
+ cctx, cancel := context.WithCancel(ctx)
+ done := make(chan struct{})
+ go func() {
+ v23.GetClient(cctx).Call(cctx, iep.Name(), "WaitForCancel", nil, nil) // The call's error is deliberately ignored; only completion matters.
+ close(done)
+ }()
+ <-cts.waiting // Wait until the call is running on the server.
+ (<-conns).stop(true, true) // Silently drop all traffic in both directions on the dialed connection.
+ <-cts.canceled // Blocks until the server-side context is cancelled.
+ cancel()
+ <-done
+}
+
+func TestChannelTimeout_Server(t *testing.T) { // Runs testChannelTimeOut_Server with the context from test.V23Init.
+ if ref.RPCTransitionState() >= ref.XServers {
+ t.Skip("The new RPC system does not yet support channel timeouts")
+ }
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+ testChannelTimeOut_Server(t, ctx)
+}
+
+func TestChannelTimeout_ServerProxy(t *testing.T) { // Runs testChannelTimeOut_Server with a listen spec that points at a freshly started proxy.
+ if ref.RPCTransitionState() >= ref.XServers {
+ t.Skip("The new RPC system does not yet support channel timeouts")
+ }
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+ ls := v23.GetListenSpec(ctx)
+ pshutdown, pendpoint, err := generic.NewProxy(ctx, ls, security.AllowEveryone(), "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer pshutdown()
+ ls.Addrs = nil // Clear the direct listen addresses so only the proxy is configured.
+ ls.Proxy = pendpoint.Name()
+ testChannelTimeOut_Server(t, v23.WithListenSpec(ctx, ls))
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 06cc3cd..67af478 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -25,7 +25,7 @@
//
// Min is incremented whenever we want to remove support for old protocol
// versions.
-var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion12}
var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
func init() {
diff --git a/runtime/internal/rt/security.go b/runtime/internal/rt/security.go
index 1b8d6cf..35d9427 100644
--- a/runtime/internal/rt/security.go
+++ b/runtime/internal/rt/security.go
@@ -8,18 +8,14 @@
"fmt"
"os"
"os/user"
- "strconv"
- "syscall"
"v.io/v23/context"
- "v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref"
"v.io/x/ref/lib/exec"
"v.io/x/ref/lib/mgmt"
vsecurity "v.io/x/ref/lib/security"
- inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/services/agent"
"v.io/x/ref/services/agent/agentlib"
)
@@ -30,9 +26,7 @@
}
if len(credentials) > 0 {
// Explicitly specified credentials, ignore the agent.
- if _, fd, _ := agentEP(); fd >= 0 {
- syscall.Close(fd)
- }
+
// TODO(ataly, ashankar): If multiple runtimes are getting
// initialized at the same time from the same
// ref.EnvCredentials we will need some kind of locking for the
@@ -54,40 +48,6 @@
} else if principal != nil {
return principal, nil, func() { principal.Close() }, nil
}
- if ep, _, err := agentEP(); err != nil {
- return nil, nil, nil, err
- } else if ep != nil {
- // Use a new stream manager and an "incomplete" client (the
- // principal is nil) to talk to the agent.
- //
- // The lack of a principal works out for the rpc.Client
- // only because the agent uses anonymous unix sockets and
- // the SecurityNone option.
- //
- // Using a distinct stream manager to manage agent-related
- // connections helps isolate these connections to the agent
- // from management of any other connections created in the
- // process (such as future RPCs to other services).
- if ctx, err = r.WithNewStreamManager(ctx); err != nil {
- return nil, nil, nil, err
- }
- client := r.GetClient(ctx)
-
- // We reparent the context we use to construct the agent.
- // We do this because the agent needs to be able to make RPCs
- // during runtime shutdown.
- ctx, shutdown = context.WithRootCancel(ctx)
-
- // TODO(cnicolaou): the agentlib can call back into runtime to get the principal,
- // which will be a problem if the runtime is not initialized, hence this code
- // path is fragile. We should ideally provide an option to work around this case.
- if principal, err = agentlib.NewAgentPrincipal(ctx, ep, client); err != nil {
- shutdown()
- client.Close()
- return nil, nil, nil, err
- }
- return principal, []interface{}{client}, shutdown, nil
- }
// No agent, no explicit credentials specified: - create a new principal and blessing in memory.
if principal, err = vsecurity.NewPrincipal(); err != nil {
return principal, nil, nil, err
@@ -95,15 +55,6 @@
return principal, nil, func() {}, vsecurity.InitDefaultBlessings(principal, defaultBlessingName())
}
-func parseAgentFD(ep naming.Endpoint) (int, error) {
- fd := ep.Addr().String()
- ifd, err := strconv.Atoi(fd)
- if err != nil {
- ifd = -1
- }
- return ifd, nil
-}
-
func ipcAgent() (agent.Principal, error) {
handle, err := exec.GetChildHandle()
if err != nil && verror.ErrorID(err) != exec.ErrNoVersion.ID {
@@ -122,40 +73,6 @@
return agentlib.NewAgentPrincipalX(path)
}
-// agentEP returns an Endpoint to be used to communicate with
-// the security agent if the current process has been configured to use the
-// agent.
-func agentEP() (naming.Endpoint, int, error) {
- handle, err := exec.GetChildHandle()
- if err != nil && verror.ErrorID(err) != exec.ErrNoVersion.ID {
- return nil, -1, err
- }
- var endpoint string
- if handle != nil {
- // We were started by a parent (presumably, device manager).
- endpoint, _ = handle.Config.Get(mgmt.SecurityAgentEndpointConfigKey)
- } else {
- endpoint = os.Getenv(ref.EnvAgentEndpoint)
- }
- if endpoint == "" {
- return nil, -1, nil
- }
- ep, err := inaming.NewEndpoint(endpoint)
- if err != nil {
- return nil, -1, err
- }
-
- // Don't let children accidentally inherit the agent connection.
- fd, err := parseAgentFD(ep)
- if err != nil {
- return nil, -1, err
- }
- if fd >= 0 {
- syscall.CloseOnExec(fd)
- }
- return ep, fd, nil
-}
-
func defaultBlessingName() string {
var name string
if user, _ := user.Current(); user != nil && len(user.Username) > 0 {
diff --git a/services/agent/agentd/doc.go b/services/agent/agentd/doc.go
index 22f3860..d040b7c 100644
--- a/services/agent/agentd/doc.go
+++ b/services/agent/agentd/doc.go
@@ -51,6 +51,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-vmodule=
diff --git a/services/agent/agentlib/client.go b/services/agent/agentlib/client.go
index 8cc257f..6f4438a 100644
--- a/services/agent/agentlib/client.go
+++ b/services/agent/agentlib/client.go
@@ -9,24 +9,14 @@
import (
"fmt"
"io"
- "net"
- "os"
- "strconv"
"sync"
- "syscall"
- "v.io/v23/context"
- "v.io/v23/naming"
- "v.io/v23/options"
- "v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
- "v.io/v23/vtrace"
"v.io/x/ref/internal/logger"
"v.io/x/ref/services/agent"
"v.io/x/ref/services/agent/internal/cache"
"v.io/x/ref/services/agent/internal/ipc"
- "v.io/x/ref/services/agent/internal/unixfd"
)
const pkgPath = "v.io/x/ref/services/agent/agentlib"
@@ -73,35 +63,6 @@
return nil
}
-type vrpcCaller struct {
- ctx *context.T
- client rpc.Client
- name string
- cancel func()
-}
-
-func (c *vrpcCaller) Close() error {
- c.cancel()
- return nil
-}
-
-func (c *vrpcCaller) call(name string, results []interface{}, args ...interface{}) error {
- call, err := c.startCall(name, args...)
- if err != nil {
- return err
- }
- if err := call.Finish(results...); err != nil {
- return err
- }
- return nil
-}
-
-func (c *vrpcCaller) startCall(name string, args ...interface{}) (rpc.ClientCall, error) {
- ctx, _ := vtrace.WithNewTrace(c.ctx)
- // SecurityNone is safe here since we're using anonymous unix sockets.
- return c.client.StartCall(ctx, c.name, name, args, options.SecurityNone, options.Preresolved{})
-}
-
func results(inputs ...interface{}) []interface{} {
return inputs
}
@@ -141,68 +102,6 @@
return cached, nil
}
-// NewAgentPrincipal returns a security.Pricipal using the PrivateKey held in a remote agent process.
-// 'endpoint' is the endpoint for connecting to the agent, typically obtained from
-// os.GetEnv(envvar.AgentEndpoint).
-// 'ctx' should not have a deadline, and should never be cancelled while the
-// principal is in use.
-func NewAgentPrincipal(ctx *context.T, endpoint naming.Endpoint, insecureClient rpc.Client) (security.Principal, error) {
- p, err := newUncachedPrincipal(ctx, endpoint, insecureClient)
- if err != nil {
- return p, err
- }
- caller := p.caller.(*vrpcCaller)
- call, callErr := caller.startCall("NotifyWhenChanged")
- if callErr != nil {
- return nil, callErr
- }
- return cache.NewCachedPrincipal(caller.ctx, p, call)
-}
-func newUncachedPrincipal(ctx *context.T, ep naming.Endpoint, insecureClient rpc.Client) (*client, error) {
- // This isn't a real vanadium endpoint. It contains the vanadium version
- // info, but the address is serving the agent protocol.
- if ep.Addr().Network() != "" {
- return nil, verror.New(errInvalidProtocol, ctx, ep.Addr().Network())
- }
- fd, err := strconv.Atoi(ep.Addr().String())
- if err != nil {
- return nil, err
- }
- syscall.ForkLock.Lock()
- fd, err = syscall.Dup(fd)
- if err == nil {
- syscall.CloseOnExec(fd)
- }
- syscall.ForkLock.Unlock()
- if err != nil {
- return nil, err
- }
- f := os.NewFile(uintptr(fd), "agent_client")
- defer f.Close()
- conn, err := net.FileConn(f)
- if err != nil {
- return nil, err
- }
- // This is just an arbitrary 1 byte string. The value is ignored.
- data := make([]byte, 1)
- addr, err := unixfd.SendConnection(conn.(*net.UnixConn), data)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(ctx)
- caller := &vrpcCaller{
- client: insecureClient,
- name: naming.JoinAddressName(agentEndpoint("unixfd", addr.String()), ""),
- ctx: ctx,
- cancel: cancel,
- }
- agent := &client{caller: caller}
- if err := agent.fetchPublicKey(); err != nil {
- return nil, err
- }
- return agent, nil
-}
-
func (c *client) Close() error {
return c.caller.Close()
}
@@ -376,13 +275,3 @@
return
}
-func agentEndpoint(proto, addr string) string {
- // TODO: use naming.FormatEndpoint when it supports version 6.
- return fmt.Sprintf("@6@%s@%s@@@s@@@", proto, addr)
-}
-
-func AgentEndpoint(fd int) string {
- // We use an empty protocol here because this isn't really speaking
- // veyron rpc.
- return agentEndpoint("", fmt.Sprintf("%d", fd))
-}
diff --git a/services/agent/agentlib/peer_test.go b/services/agent/agentlib/peer_test.go
index 3d5ceeb..cb9ed7c 100644
--- a/services/agent/agentlib/peer_test.go
+++ b/services/agent/agentlib/peer_test.go
@@ -5,16 +5,9 @@
package agentlib
import (
- "v.io/v23/context"
- "v.io/v23/naming"
- "v.io/v23/rpc"
"v.io/v23/security"
)
-func NewUncachedPrincipal(ctx *context.T, endpoint naming.Endpoint, insecureClient rpc.Client) (security.Principal, error) {
- return newUncachedPrincipal(ctx, endpoint, insecureClient)
-}
-
func NewUncachedPrincipalX(path string) (security.Principal, error) {
return newUncachedPrincipalX(path)
}
diff --git a/services/agent/internal/pingpong/doc.go b/services/agent/internal/pingpong/doc.go
index d223a6a..41245c8 100644
--- a/services/agent/internal/pingpong/doc.go
+++ b/services/agent/internal/pingpong/doc.go
@@ -30,6 +30,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/agent/internal/test_principal/doc.go b/services/agent/internal/test_principal/doc.go
index 05b9476..689da03 100644
--- a/services/agent/internal/test_principal/doc.go
+++ b/services/agent/internal/test_principal/doc.go
@@ -26,6 +26,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/agent/internal/unixfd/unixfd.go b/services/agent/internal/unixfd/unixfd.go
deleted file mode 100644
index 025ccaf..0000000
--- a/services/agent/internal/unixfd/unixfd.go
+++ /dev/null
@@ -1,352 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package unixfd provides provides support for Dialing and Listening
-// on already connected file descriptors (like those returned by socketpair).
-package unixfd
-
-import (
- "fmt"
- "io"
- "net"
- "os"
- "strconv"
- "sync"
- "syscall"
- "time"
- "unsafe"
-
- "v.io/v23/context"
- "v.io/v23/rpc"
- "v.io/v23/verror"
-)
-
-const pkgPath = "v.io/x/ref/services/agent/internal/unixfd"
-
-var (
- errListenerClosed = verror.Register(pkgPath+".errListenerClosed", verror.NoRetry, "{1:}{2:} listener closed{:_}")
- errListenerAlreadyClosed = verror.Register(pkgPath+".errListenerAlreadyClosed", verror.NoRetry, "{1:}{2:} listener already closed{:_}")
- errCantSendSocketWithoutData = verror.Register(pkgPath+".errCantSendSocketWithoutData", verror.NoRetry, "{1:}{2:} cannot send a socket without data.{:_}")
- errWrongSentLength = verror.Register(pkgPath+".errWrongSentLength", verror.NoRetry, "{1:}{2:} expected to send {3}, {4} bytes, sent {5}, {6}{:_}")
- errTooBigOOB = verror.Register(pkgPath+".errTooBigOOB", verror.NoRetry, "{1:}{2:} received too large oob data ({3}, max {4}){:_}")
- errBadNetwork = verror.Register(pkgPath+".errBadNetwork", verror.NoRetry, "{1:}{2:} invalid network{:_}")
-)
-
-const Network string = "unixfd"
-
-func init() {
- rpc.RegisterProtocol(Network, unixFDConn, unixFDResolve, unixFDListen)
-}
-
-// singleConnListener implements net.Listener for an already-connected socket.
-// This is different from net.FileListener, which calls syscall.Listen
-// on an unconnected socket.
-type singleConnListener struct {
- c chan net.Conn
- addr net.Addr
- sync.Mutex
-}
-
-func (l *singleConnListener) getChan() chan net.Conn {
- l.Lock()
- defer l.Unlock()
- return l.c
-}
-
-func (l *singleConnListener) Accept() (net.Conn, error) {
- c := l.getChan()
- if c == nil {
- return nil, verror.New(errListenerClosed, nil)
- }
- if conn, ok := <-c; ok {
- return conn, nil
- }
- return nil, io.EOF
-}
-
-func (l *singleConnListener) Close() error {
- l.Lock()
- defer l.Unlock()
- lc := l.c
- if lc == nil {
- return verror.New(errListenerAlreadyClosed, nil)
- }
- close(l.c)
- l.c = nil
- // If the socket was never Accept'ed we need to close it.
- if c, ok := <-lc; ok {
- return c.Close()
- }
- return nil
-}
-
-func (l *singleConnListener) Addr() net.Addr {
- return l.addr
-}
-
-func unixFDConn(ctx *context.T, protocol, address string, timeout time.Duration) (net.Conn, error) {
- // TODO(cnicolaou): have this respect the timeout. Possibly have a helper
- // function that can be generally used for this, but in practice, I think
- // it'll be cleaner to use the underlying protocol's deadline support of it
- // has it.
- fd, err := strconv.ParseInt(address, 10, 32)
- if err != nil {
- return nil, err
- }
- file := os.NewFile(uintptr(fd), "tmp")
- conn, err := net.FileConn(file)
- // 'file' is not used after this point, but we keep it open
- // so that 'address' remains valid.
- if err != nil {
- file.Close()
- return nil, err
- }
- // We wrap 'conn' so we can customize the address, and also
- // to close 'file'.
- return &fdConn{addr: addr(address), sock: file, Conn: conn}, nil
-}
-
-type fdConn struct {
- addr net.Addr
- sock *os.File
- net.Conn
-
- mu sync.Mutex
- closed bool
-}
-
-func (c *fdConn) Close() (err error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.closed {
- return nil
- }
-
- c.closed = true
- defer c.sock.Close()
- return c.Conn.Close()
-}
-
-func (c *fdConn) LocalAddr() net.Addr {
- return c.addr
-}
-
-func (c *fdConn) RemoteAddr() net.Addr {
- return c.addr
-}
-
-func unixFDResolve(ctx *context.T, _, address string) (string, string, error) {
- return Network, address, nil
-}
-
-func unixFDListen(ctx *context.T, protocol, address string) (net.Listener, error) {
- conn, err := unixFDConn(ctx, protocol, address, 0)
- if err != nil {
- return nil, err
- }
- c := make(chan net.Conn, 1)
- c <- conn
- return &singleConnListener{c, conn.LocalAddr(), sync.Mutex{}}, nil
-}
-
-type addr string
-
-func (a addr) Network() string { return Network }
-func (a addr) String() string { return string(a) }
-
-// Addr returns a net.Addr for the unixfd network for the given file descriptor.
-func Addr(fd uintptr) net.Addr {
- return addr(fmt.Sprintf("%d", fd))
-}
-
-type fileDescriptor struct {
- fd chan int
- name string
-}
-
-func newFd(fd int, name string) *fileDescriptor {
- ch := make(chan int, 1)
- ch <- fd
- close(ch)
- d := &fileDescriptor{ch, name}
- return d
-}
-
-func (f *fileDescriptor) releaseAddr() net.Addr {
- if fd, ok := <-f.fd; ok {
- return Addr(uintptr(fd))
- }
- return nil
-}
-
-func (f *fileDescriptor) releaseFile() *os.File {
- if fd, ok := <-f.fd; ok {
- return os.NewFile(uintptr(fd), f.name)
- }
- return nil
-}
-
-// maybeClose closes the file descriptor, if it hasn't been released.
-func (f *fileDescriptor) maybeClose() {
- if file := f.releaseFile(); file != nil {
- file.Close()
- }
-}
-
-// Socketpair returns a pair of connected sockets for communicating with a child process.
-func Socketpair() (*net.UnixConn, *os.File, error) {
- lfd, rfd, err := socketpair()
- if err != nil {
- return nil, nil, err
- }
- defer rfd.maybeClose()
- file := lfd.releaseFile()
- // FileConn dups the fd, so we still want to close the original one.
- defer file.Close()
- conn, err := net.FileConn(file)
- if err != nil {
- return nil, nil, err
- }
- return conn.(*net.UnixConn), rfd.releaseFile(), nil
-}
-
-func socketpair() (local, remote *fileDescriptor, err error) {
- syscall.ForkLock.RLock()
- fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
- if err == nil {
- syscall.CloseOnExec(fds[0])
- syscall.CloseOnExec(fds[1])
- }
- syscall.ForkLock.RUnlock()
- if err != nil {
- return nil, nil, err
- }
- return newFd(fds[0], "local"), newFd(fds[1], "remote"), nil
-}
-
-// SendConnection creates a new connected socket and sends
-// one end over 'conn', along with 'data'. It returns the address for
-// the local end of the socketpair.
-// Note that the returned address is an open file descriptor,
-// which you must close if you do not Dial or Listen to the address.
-func SendConnection(conn *net.UnixConn, data []byte) (addr net.Addr, err error) {
- if len(data) < 1 {
- return nil, verror.New(errCantSendSocketWithoutData, nil)
- }
- remote, local, err := socketpair()
- if err != nil {
- return nil, err
- }
- defer local.maybeClose()
- rfile := remote.releaseFile()
-
- rights := syscall.UnixRights(int(rfile.Fd()))
- n, oobn, err := conn.WriteMsgUnix(data, rights, nil)
- if err != nil {
- rfile.Close()
- return nil, err
- } else if n != len(data) || oobn != len(rights) {
- rfile.Close()
- return nil, verror.New(errWrongSentLength, nil, len(data), len(rights), n, oobn)
- }
- // Wait for the other side to acknowledge.
- // This is to work around a race on OS X where it appears we can close
- // the file descriptor before it gets transfered over the socket.
- f := local.releaseFile()
- syscall.ForkLock.Lock()
- fd, err := syscall.Dup(int(f.Fd()))
- if err != nil {
- syscall.ForkLock.Unlock()
- f.Close()
- rfile.Close()
- return nil, err
- }
- syscall.CloseOnExec(fd)
- syscall.ForkLock.Unlock()
- newConn, err := net.FileConn(f)
- f.Close()
- if err != nil {
- rfile.Close()
- return nil, err
- }
- newConn.Read(make([]byte, 1))
- newConn.Close()
- rfile.Close()
-
- return Addr(uintptr(fd)), nil
-}
-
-const cmsgDataLength = int(unsafe.Sizeof(int(1)))
-
-// ReadConnection reads a connection and additional data sent on 'conn' via a call to SendConnection.
-// 'buf' must be large enough to hold the data.
-// The returned function must be called when you are ready for the other side
-// to start sending data, but before writing anything to the connection.
-// If there is an error you must still call the function before closing the connection.
-func ReadConnection(conn *net.UnixConn, buf []byte) (net.Addr, int, func(), error) {
- oob := make([]byte, syscall.CmsgLen(cmsgDataLength))
- n, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
- if err != nil {
- return nil, n, nil, err
- }
- if oobn > len(oob) {
- return nil, n, nil, verror.New(errTooBigOOB, nil, oobn, len(oob))
- }
- scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
- if err != nil {
- return nil, n, nil, err
- }
- fd := -1
- // Loop through any file descriptors we are sent, and close
- // all extras.
- for _, scm := range scms {
- fds, err := syscall.ParseUnixRights(&scm)
- if err != nil {
- return nil, n, nil, err
- }
- for _, f := range fds {
- if fd == -1 {
- fd = f
- } else if f != -1 {
- syscall.Close(f)
- }
- }
- }
- if fd == -1 {
- return nil, n, nil, nil
- }
- result := Addr(uintptr(fd))
- syscall.ForkLock.Lock()
- fd, err = syscall.Dup(fd)
- if err != nil {
- syscall.ForkLock.Unlock()
- CloseUnixAddr(result)
- return nil, n, nil, err
- }
- syscall.CloseOnExec(fd)
- syscall.ForkLock.Unlock()
- file := os.NewFile(uintptr(fd), "newconn")
- newconn, err := net.FileConn(file)
- file.Close()
- if err != nil {
- CloseUnixAddr(result)
- return nil, n, nil, err
- }
- return result, n, func() {
- newconn.Write(make([]byte, 1))
- newconn.Close()
- }, nil
-}
-
-func CloseUnixAddr(addr net.Addr) error {
- if addr.Network() != Network {
- return verror.New(errBadNetwork, nil)
- }
- fd, err := strconv.ParseInt(addr.String(), 10, 32)
- if err != nil {
- return err
- }
- return syscall.Close(int(fd))
-}
diff --git a/services/agent/internal/unixfd/unixfd_test.go b/services/agent/internal/unixfd/unixfd_test.go
deleted file mode 100644
index 7f1bc6c..0000000
--- a/services/agent/internal/unixfd/unixfd_test.go
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package unixfd
-
-import (
- "bytes"
- "io"
- "net"
- "reflect"
- "testing"
-
- "v.io/v23/context"
-)
-
-type nothing struct{}
-
-func dial(fd *fileDescriptor) (net.Conn, net.Addr, error) {
- addr := fd.releaseAddr()
- ctx, _ := context.RootContext()
- conn, err := unixFDConn(ctx, Network, addr.String(), 0)
- return conn, addr, err
-}
-
-func listen(fd *fileDescriptor) (net.Listener, net.Addr, error) {
- addr := fd.releaseAddr()
- ctx, _ := context.RootContext()
- l, err := unixFDListen(ctx, Network, addr.String())
- return l, addr, err
-}
-
-func testWrite(t *testing.T, c net.Conn, data string) {
- n, err := c.Write([]byte(data))
- if err != nil {
- t.Errorf("Write: %v", err)
- return
- }
- if n != len(data) {
- t.Errorf("Wrote %d bytes, expected %d", n, len(data))
- }
-}
-
-func testRead(t *testing.T, c net.Conn, expected string) {
- buf := make([]byte, len(expected)+2)
- n, err := c.Read(buf)
- if err != nil {
- t.Errorf("Read: %v", err)
- return
- }
- if n != len(expected) || !bytes.Equal(buf[0:n], []byte(expected)) {
- t.Errorf("got %q, expected %q", buf[0:n], expected)
- }
-}
-
-func TestDial(t *testing.T) {
- // TODO(ribrdb): Delete the unixfd code if it's no longer needed. These
- // tests are flakey.
- t.Skip()
-
- local, remote, err := socketpair()
- if err != nil {
- t.Fatalf("socketpair: %v", err)
- }
- a, a_addr, err := dial(local)
- if err != nil {
- t.Fatalf("dial: %v", err)
- }
- b, b_addr, err := dial(remote)
- if err != nil {
- t.Fatalf("dial: %v", err)
- }
-
- testWrite(t, a, "TEST1")
- testRead(t, b, "TEST1")
- testWrite(t, b, "TEST2")
- testRead(t, a, "TEST2")
-
- if !reflect.DeepEqual(a.LocalAddr(), a_addr) {
- t.Errorf("Invalid address %v, expected %v", a.LocalAddr(), a_addr)
- }
- if !reflect.DeepEqual(a.RemoteAddr(), a_addr) {
- t.Errorf("Invalid address %v, expected %v", a.RemoteAddr(), a_addr)
- }
- if !reflect.DeepEqual(b.LocalAddr(), b_addr) {
- t.Errorf("Invalid address %v, expected %v", a.LocalAddr(), b_addr)
- }
- if !reflect.DeepEqual(b.RemoteAddr(), b_addr) {
- t.Errorf("Invalid address %v, expected %v", a.RemoteAddr(), b_addr)
- }
-}
-
-func TestListen(t *testing.T) {
- // TODO(ribrdb): Delete the unixfd code if it's no longer needed. These
- // tests are flakey.
- t.Skip()
-
- local, remote, err := socketpair()
- if err != nil {
- t.Fatalf("socketpair: %v", err)
- }
- a, _, err := dial(local)
- if err != nil {
- t.Fatalf("dial: %v", err)
- }
- l, _, err := listen(remote)
- if err != nil {
- t.Fatalf("listen: %v", err)
- }
- b, err := l.Accept()
- if err != nil {
- t.Fatalf("accept: %v", err)
- }
- start := make(chan nothing, 0)
- done := make(chan nothing)
- go func() {
- defer close(done)
- <-start
- if _, err := l.Accept(); err != io.EOF {
- t.Fatalf("accept: expected EOF, got %v", err)
- }
- }()
-
- // block until the goroutine starts running
- start <- nothing{}
- testWrite(t, a, "LISTEN")
- testRead(t, b, "LISTEN")
-
- err = l.Close()
- if err != nil {
- t.Fatalf("close: %v", err)
- }
- <-done
-
- // After closed, accept should fail immediately
- _, err = l.Accept()
- if err == nil {
- t.Fatalf("Accept succeeded after close")
- }
- err = l.Close()
- if err == nil {
- t.Fatalf("Close succeeded twice")
- }
-}
-
-func TestSendConnection(t *testing.T) {
- // TODO(ribrdb): Delete the unixfd code if it's no longer needed. These
- // tests are flakey.
- t.Skip()
-
- server, client, err := Socketpair()
- if err != nil {
- t.Fatalf("Socketpair: %v", err)
- }
- uclient, err := net.FileConn(client)
- if err != nil {
- t.Fatalf("FileConn: %v", err)
- }
- var readErr error
- var n int
- var saddr net.Addr
- done := make(chan struct{})
- buf := make([]byte, 10)
- go func() {
- var ack func()
- saddr, n, ack, readErr = ReadConnection(server, buf)
- if ack != nil {
- ack()
- }
- close(done)
- }()
- caddr, err := SendConnection(uclient.(*net.UnixConn), []byte("hello"))
- if err != nil {
- t.Fatalf("SendConnection: %v", err)
- }
- <-done
- if readErr != nil {
- t.Fatalf("ReadConnection: %v", readErr)
- }
- if saddr == nil {
- t.Fatalf("ReadConnection returned nil, %d", n)
- }
- data := buf[0:n]
- if !bytes.Equal([]byte("hello"), data) {
- t.Fatalf("unexpected data %q", data)
- }
-
- ctx, _ := context.RootContext()
- a, err := unixFDConn(ctx, Network, caddr.String(), 0)
- if err != nil {
- t.Fatalf("dial %v: %v", caddr, err)
- }
- b, err := unixFDConn(ctx, Network, saddr.String(), 0)
- if err != nil {
- t.Fatalf("dial %v: %v", saddr, err)
- }
-
- testWrite(t, a, "TEST1")
- testRead(t, b, "TEST1")
- testWrite(t, b, "TEST2")
- testRead(t, a, "TEST2")
-}
diff --git a/services/agent/keymgr/client.go b/services/agent/keymgr/client.go
index 7515f58..e4e8213 100644
--- a/services/agent/keymgr/client.go
+++ b/services/agent/keymgr/client.go
@@ -7,17 +7,10 @@
package keymgr
import (
- "net"
- "os"
- "strconv"
- "sync"
-
- "v.io/v23/context"
"v.io/v23/verror"
"v.io/x/ref/services/agent"
"v.io/x/ref/services/agent/internal/ipc"
"v.io/x/ref/services/agent/internal/server"
- "v.io/x/ref/services/agent/internal/unixfd"
)
const pkgPath = "v.io/x/ref/services/agent/keymgr"
@@ -30,22 +23,10 @@
verror.NoRetry, "{1:}{2:} Invalid key handle")
)
-const defaultManagerSocket = 4
-
type keyManager struct {
conn *ipc.IPCConn
}
-type Agent struct {
- conn *net.UnixConn // Guarded by mu
- mu sync.Mutex
-}
-
-// NewAgent returns a client connected to the agent on the default file descriptors.
-func NewAgent() (*Agent, error) {
- return newAgent(defaultManagerSocket)
-}
-
// NewKeyManager returns a client connected to the specified KeyManager.
func NewKeyManager(path string) (agent.KeyManager, error) {
i := ipc.NewIPC()
@@ -61,46 +42,6 @@
return server.NewLocalKeyManager(path, passphrase)
}
-func newAgent(fd int) (a *Agent, err error) {
- file := os.NewFile(uintptr(fd), "fd")
- defer file.Close()
- conn, err := net.FileConn(file)
- if err != nil {
- return nil, err
- }
-
- return &Agent{conn: conn.(*net.UnixConn)}, nil
-}
-
-// TODO(caprita): Get rid of *context.T arg. Doesn't seem to be used.
-
-// NewPrincipal creates a new principal and returns the handle and a socket serving
-// the principal.
-// Typically the socket will be passed to a child process using cmd.ExtraFiles.
-func (a *Agent) NewPrincipal(ctx *context.T, inMemory bool) (handle []byte, conn *os.File, err error) {
- req := make([]byte, 1)
- if inMemory {
- req[0] = 1
- }
- a.mu.Lock()
- defer a.mu.Unlock()
- conn, err = a.connect(req)
- if err != nil {
- return nil, nil, err
- }
- buf := make([]byte, agent.PrincipalHandleByteSize)
- n, err := a.conn.Read(buf)
- if err != nil {
- conn.Close()
- return nil, nil, err
- }
- if n != agent.PrincipalHandleByteSize {
- conn.Close()
- return nil, nil, verror.New(errInvalidResponse, ctx, agent.PrincipalHandleByteSize, n)
- }
- return buf, conn, nil
-}
-
// NewPrincipal creates a new principal and returns a handle.
// The handle may be passed to ServePrincipal to start an agent serving the principal.
func (m *keyManager) NewPrincipal(inMemory bool) (handle [agent.PrincipalHandleByteSize]byte, err error) {
@@ -109,30 +50,6 @@
return
}
-func (a *Agent) connect(req []byte) (*os.File, error) {
- addr, err := unixfd.SendConnection(a.conn, req)
- if err != nil {
- return nil, err
- }
- fd, err := strconv.ParseInt(addr.String(), 10, 32)
- if err != nil {
- return nil, err
- }
- return os.NewFile(uintptr(fd), "client"), nil
-}
-
-// NewConnection creates a connection to an agent which exports a principal
-// previously created with NewPrincipal.
-// Typically this will be passed to a child process using cmd.ExtraFiles.
-func (a *Agent) NewConnection(handle []byte) (*os.File, error) {
- if len(handle) != agent.PrincipalHandleByteSize {
- return nil, verror.New(errInvalidKeyHandle, nil)
- }
- a.mu.Lock()
- defer a.mu.Unlock()
- return a.connect(handle)
-}
-
// ServePrincipal creates a socket at socketPath and serves a principal
// previously created with NewPrincipal.
func (m *keyManager) ServePrincipal(handle [agent.PrincipalHandleByteSize]byte, socketPath string) error {
diff --git a/services/agent/pod_agentd/doc.go b/services/agent/pod_agentd/doc.go
index 987873b..f49220b 100644
--- a/services/agent/pod_agentd/doc.go
+++ b/services/agent/pod_agentd/doc.go
@@ -38,6 +38,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/agent/vbecome/doc.go b/services/agent/vbecome/doc.go
index 7dcdc36..c0ce02b 100644
--- a/services/agent/vbecome/doc.go
+++ b/services/agent/vbecome/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/application/application/doc.go b/services/application/application/doc.go
index e90bb42..4f17c66 100644
--- a/services/application/application/doc.go
+++ b/services/application/application/doc.go
@@ -35,6 +35,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/application/applicationd/doc.go b/services/application/applicationd/doc.go
index 740e023..6fa63e3 100644
--- a/services/application/applicationd/doc.go
+++ b/services/application/applicationd/doc.go
@@ -33,6 +33,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/binary/binary/doc.go b/services/binary/binary/doc.go
index 3ad2675..763135b 100644
--- a/services/binary/binary/doc.go
+++ b/services/binary/binary/doc.go
@@ -33,6 +33,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/binary/binaryd/doc.go b/services/binary/binaryd/doc.go
index e3c8fa3..be2892c 100644
--- a/services/binary/binaryd/doc.go
+++ b/services/binary/binaryd/doc.go
@@ -35,6 +35,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/binary/tidy/doc.go b/services/binary/tidy/doc.go
index 7f67a37..01d07be 100644
--- a/services/binary/tidy/doc.go
+++ b/services/binary/tidy/doc.go
@@ -31,6 +31,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/build/build/doc.go b/services/build/build/doc.go
index 2699b49..6e621c8 100644
--- a/services/build/build/doc.go
+++ b/services/build/build/doc.go
@@ -30,6 +30,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/build/buildd/doc.go b/services/build/buildd/doc.go
index 2a5df3a..8cae0c7 100644
--- a/services/build/buildd/doc.go
+++ b/services/build/buildd/doc.go
@@ -36,6 +36,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/cluster/cluster_agent/doc.go b/services/cluster/cluster_agent/doc.go
index 0ab4629..5b20720 100644
--- a/services/cluster/cluster_agent/doc.go
+++ b/services/cluster/cluster_agent/doc.go
@@ -37,6 +37,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/cluster/cluster_agentd/doc.go b/services/cluster/cluster_agentd/doc.go
index bb9b0ba..fbc2877 100644
--- a/services/cluster/cluster_agentd/doc.go
+++ b/services/cluster/cluster_agentd/doc.go
@@ -34,6 +34,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/cluster/vkube/cluster-agent.go b/services/cluster/vkube/cluster-agent.go
new file mode 100644
index 0000000..20f267d
--- /dev/null
+++ b/services/cluster/vkube/cluster-agent.go
@@ -0,0 +1,220 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "fmt"
+ "strings"
+
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/services/device"
+)
+
+const (
+ clusterAgentServiceName = "cluster-agent"
+ clusterAgentServicePort = 8193
+ clusterAgentApplicationName = "cluster-agentd"
+)
+
+// createClusterAgent creates a ReplicationController and a LoadBalancer Service
+// that run the cluster agent in the namespace config.ClusterAgent.Namespace.
+func createClusterAgent(ctx *context.T, config *vkubeConfig) error {
+	if err := createNamespaceIfNotExist(config.ClusterAgent.Namespace); err != nil {
+		return err
+	}
+	version, image := "latest", config.ClusterAgent.Image // "latest" applies when the image name has no tag
+	if p := strings.Split(image[strings.LastIndex(image, "/")+1:], ":"); len(p) == 2 { // skip any registry host:port prefix
+		version = p[1]
+	}
+	ca := object{
+		"apiVersion": "v1",
+		"kind":       "ReplicationController",
+		"metadata": object{
+			"name": clusterAgentApplicationName + "-" + version,
+			"labels": object{
+				"application": clusterAgentApplicationName,
+			},
+			"namespace": config.ClusterAgent.Namespace,
+		},
+		"spec": object{
+			"replicas": 1,
+			"template": object{
+				"metadata": object{
+					"labels": object{
+						"application": clusterAgentApplicationName,
+						"deployment":  version,
+					},
+				},
+				"spec": object{
+					"containers": []object{
+						object{
+							"name":  "cluster-agentd",
+							"image": config.ClusterAgent.Image,
+							"ports": []object{
+								object{
+									"containerPort": clusterAgentServicePort,
+								},
+							},
+							"resources": object{
+								"limits": object{
+									"cpu":    config.ClusterAgent.CPU,
+									"memory": config.ClusterAgent.Memory,
+								},
+							},
+							"volumeMounts": []object{
+								object{
+									"name":      "data",
+									"mountPath": "/data",
+								},
+								object{
+									"name":      "logs",
+									"mountPath": "/logs",
+								},
+							},
+							"env": []object{
+								object{
+									"name":  "ROOT_BLESSINGS",
+									"value": rootBlessings(ctx),
+								},
+								object{
+									"name":  "CLAIMER",
+									"value": clusterAgentClaimer(config),
+								},
+								object{
+									"name":  "ADMIN",
+									"value": config.ClusterAgent.Admin,
+								},
+								object{
+									"name":  "DATADIR",
+									"value": "/data",
+								},
+								object{
+									"name":  "LOGDIR",
+									"value": "/logs",
+								},
+							},
+						},
+					},
+					"volumes": []interface{}{ // []interface{} so that object.append can add the data volume below
+						object{
+							"name":     "logs",
+							"emptyDir": object{},
+						},
+					},
+				},
+			},
+		},
+	}
+	if config.ClusterAgent.PersistentDisk == "" {
+		ca.append("spec.template.spec.volumes", object{ // no persistent disk configured: use an emptyDir
+			"name":     "data",
+			"emptyDir": object{},
+		})
+	} else {
+		ca.append("spec.template.spec.volumes", object{
+			"name": "data",
+			"gcePersistentDisk": object{
+				"pdName": config.ClusterAgent.PersistentDisk,
+				"fsType": "ext4",
+			},
+		})
+	}
+
+	if out, err := kubectlCreate(ca); err != nil {
+		return fmt.Errorf("failed to create replication controller: %v\n%s\n", err, string(out))
+	}
+
+	svc := object{
+		"apiVersion": "v1",
+		"kind":       "Service",
+		"metadata": object{
+			"name":      clusterAgentServiceName,
+			"namespace": config.ClusterAgent.Namespace,
+		},
+		"spec": object{
+			"ports": []object{
+				object{
+					"port":       clusterAgentServicePort,
+					"targetPort": clusterAgentServicePort,
+				},
+			},
+			"selector": object{
+				"application": clusterAgentApplicationName,
+			},
+			"type": "LoadBalancer",
+		},
+	}
+	if config.ClusterAgent.ExternalIP != "" {
+		svc.set("spec.loadBalancerIP", config.ClusterAgent.ExternalIP)
+	}
+	if out, err := kubectlCreate(svc); err != nil {
+		return fmt.Errorf("failed to create service: %v\n%s\n", err, string(out))
+	}
+	return nil
+}
+
+// stopClusterAgent stops the cluster agent ReplicationController and deletes
+// its Service.
+func stopClusterAgent(config *vkubeConfig) error {
+ if out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "stop", "rc", "-l", "application="+clusterAgentApplicationName); err != nil {
+ return fmt.Errorf("failed to stop %s: %v: %s", clusterAgentApplicationName, err, out)
+ }
+ if out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "delete", "service", clusterAgentServiceName); err != nil {
+ return fmt.Errorf("failed to delete %s: %v: %s", clusterAgentServiceName, err, out)
+ }
+ return nil
+}
+
+// clusterAgentClaimer returns the blessing name of the claimer of the cluster
+// agent: config.ClusterAgent.Blessing with its last chain component removed.
+func clusterAgentClaimer(config *vkubeConfig) string {
+ p := strings.Split(config.ClusterAgent.Blessing, security.ChainSeparator)
+ return strings.Join(p[:len(p)-1], security.ChainSeparator)
+}
+
+// findClusterAgent returns the external address of the cluster agent.
+func findClusterAgent(config *vkubeConfig, includeBlessings bool) (string, error) {
+ out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "get", "service", clusterAgentServiceName, "-o", "json")
+ if err != nil {
+ return "", fmt.Errorf("failed to get info of %s: %v: %s", clusterAgentServiceName, err, out)
+ }
+ var svc object
+ if err := svc.importJSON(out); err != nil {
+ return "", fmt.Errorf("failed to parse kubectl output: %v", err)
+ }
+ ports := svc.getObjectArray("spec.ports")
+ if len(ports) == 0 {
+ return "", fmt.Errorf("service %q has no ports", clusterAgentServiceName)
+ }
+ port := ports[0].getInt("port")
+ if port < 0 {
+ return "", fmt.Errorf("service %q has no valid port: %v", clusterAgentServiceName, port)
+ }
+ ingress := svc.getObjectArray("status.loadBalancer.ingress")
+ if len(ingress) == 0 {
+ return "", fmt.Errorf("service %q has no loadbalancer ingress", clusterAgentServiceName)
+ }
+ ip := ingress[0].getString("ip")
+ if ip == "" {
+ return "", fmt.Errorf("service %q loadbalancer has no valid ip", clusterAgentServiceName)
+ }
+ if includeBlessings {
+ return fmt.Sprintf("/(%s)@%s:%d", config.ClusterAgent.Blessing, ip, port), nil
+ }
+ return fmt.Sprintf("/%s:%d", ip, port), nil
+}
+
+// claimClusterAgent claims the cluster agent with the given blessing extension.
+func claimClusterAgent(ctx *context.T, config *vkubeConfig, extension string) error {
+ addr, err := findClusterAgent(config, false)
+ if err != nil {
+ return err
+ }
+ if err := device.ClaimableClient(addr).Claim(ctx, "", &granter{extension: extension}); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/services/cluster/vkube/config.go b/services/cluster/vkube/config.go
new file mode 100644
index 0000000..a7c2c55
--- /dev/null
+++ b/services/cluster/vkube/config.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+)
+
+// The config file used by the vkube command.
+type vkubeConfig struct {
+ // The GCE project name.
+ Project string `json:"project"`
+ // The GCE zone.
+ Zone string `json:"zone"`
+ // The name of the Kubernetes cluster.
+ Cluster string `json:"cluster"`
+
+ ClusterAgent clusterAgentConfig `json:"clusterAgent"`
+ PodAgent podAgentConfig `json:"podAgent"`
+}
+
+type clusterAgentConfig struct {
+ // The Kubernetes namespace of the cluster agent. An empty
+ // value is equivalent to "default".
+ Namespace string `json:"namespace"`
+ // The name of the docker image for the cluster agent.
+ Image string `json:"image"`
+ // The amount of CPU to reserve for the cluster agent.
+ CPU string `json:"cpu"`
+ // The amount of memory to reserve for the cluster agent.
+ Memory string `json:"memory"`
+ // The blessing name of the cluster agent.
+ Blessing string `json:"blessing"`
+ // The blessing pattern of the cluster agent admin, i.e. who's
+ // allowed to create and delete secrets.
+ Admin string `json:"admin"`
+ // The external IP address of the cluster agent. An empty value
+ // means that an ephemeral address will be used.
+ // TODO(rthellend): This doesn't currently work.
+ // https://github.com/kubernetes/kubernetes/issues/10323
+ // https://github.com/kubernetes/kubernetes/pull/13005
+ ExternalIP string `json:"externalIP"`
+ // The name of the Persistent Disk of the cluster agent. An
+ // empty value means that the cluster agent won't use a
+ // persistent disk.
+ PersistentDisk string `json:"persistentDisk"`
+}
+
+type podAgentConfig struct {
+ // The name of the docker image for the pod agent.
+ Image string `json:"image"`
+}
+
+// readConfig reads a config file.
+func readConfig(fileName string) (*vkubeConfig, error) {
+ data, err := ioutil.ReadFile(fileName)
+ if err != nil {
+ return nil, err
+ }
+ var config vkubeConfig
+ if err := json.Unmarshal(data, &config); err != nil {
+ return nil, fmt.Errorf("json.Unmarshal: %v", err)
+ }
+ if config.ClusterAgent.Namespace == "" {
+ config.ClusterAgent.Namespace = "default"
+ }
+ return &config, nil
+}
diff --git a/services/cluster/vkube/doc.go b/services/cluster/vkube/doc.go
new file mode 100644
index 0000000..24bbfaf
--- /dev/null
+++ b/services/cluster/vkube/doc.go
@@ -0,0 +1,179 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+
+/*
+Manages Vanadium applications on kubernetes
+
+Usage:
+ vkube [flags] <command>
+
+The vkube commands are:
+ get-credentials Gets the kubernetes credentials from Google Cloud.
+ start Starts an application.
+ update Updates an application.
+ stop Stops an application.
+ start-cluster-agent Starts the cluster agent.
+ stop-cluster-agent Stops the cluster agent.
+ claim-cluster-agent Claims the cluster agent.
+ build-docker-images Builds the docker images for the cluster and pod agents.
+ help Display help for commands or topics
+
+The vkube flags are:
+ -config=vkube.cfg
+ The 'vkube.cfg' file to use.
+ -gcloud=gcloud
+ The 'gcloud' binary to use.
+ -kubectl=kubectl
+ The 'kubectl' binary to use.
+
+The global flags are:
+ -alsologtostderr=true
+ log to standard error as well as files
+ -log_backtrace_at=:0
+ when logging hits line file:N, emit a stack trace
+ -log_dir=
+ if non-empty, write log files to this directory
+ -logtostderr=false
+ log to standard error instead of files
+ -max_stack_buf_size=4292608
+ max size in bytes of the buffer to use for logging stack traces
+ -metadata=<just specify -metadata to activate>
+ Displays metadata for the program and exits.
+ -stderrthreshold=2
+ logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
+ -v=0
+ log level for V logs
+ -v23.credentials=
+ directory to use for storing security credentials
+ -v23.i18n-catalogue=
+ 18n catalogue files to load, comma separated
+ -v23.namespace.root=[/(dev.v.io/role/vprod/service/mounttabled)@ns.dev.v.io:8101]
+ local namespace root; can be repeated to provided multiple roots
+ -v23.proxy=
+ object name of proxy service to use to export services across network
+ boundaries
+ -v23.tcp.address=
+ address to listen on
+ -v23.tcp.protocol=wsh
+ protocol to listen with
+ -v23.vtrace.cache-size=1024
+ The number of vtrace traces to store in memory.
+ -v23.vtrace.collect-regexp=
+ Spans and annotations that match this regular expression will trigger trace
+ collection.
+ -v23.vtrace.dump-on-shutdown=true
+ If true, dump all stored traces on runtime shutdown.
+ -v23.vtrace.sample-rate=0
+ Rate (from 0.0 to 1.0) to sample vtrace traces.
+ -vmodule=
+ comma-separated list of pattern=N settings for filename-filtered logging
+ -vpath=
+ comma-separated list of pattern=N settings for file pathname-filtered logging
+
+Vkube get-credentials
+
+Gets the kubernetes credentials from Google Cloud.
+
+Usage:
+ vkube get-credentials
+
+Vkube start
+
+Starts an application.
+
+Usage:
+ vkube start [flags] <extension>
+
+<extension> The blessing name extension to give to the application.
+
+The vkube start flags are:
+ -f=
+ Filename to use to create the kubernetes resource.
+
+Vkube update
+
+Updates an application to a new version with a rolling update, preserving the
+existing blessings.
+
+Usage:
+ vkube update [flags]
+
+The vkube update flags are:
+ -f=
+ Filename to use to update the kubernetes resource.
+
+Vkube stop
+
+Stops an application.
+
+Usage:
+ vkube stop [flags]
+
+The vkube stop flags are:
+ -f=
+ Filename to use to stop the kubernetes resource.
+
+Vkube start-cluster-agent
+
+Starts the cluster agent.
+
+Usage:
+ vkube start-cluster-agent
+
+Vkube stop-cluster-agent
+
+Stops the cluster agent.
+
+Usage:
+ vkube stop-cluster-agent
+
+Vkube claim-cluster-agent
+
+Claims the cluster agent.
+
+Usage:
+ vkube claim-cluster-agent
+
+Vkube build-docker-images
+
+Builds the docker images for the cluster and pod agents.
+
+Usage:
+ vkube build-docker-images [flags]
+
+The vkube build-docker-images flags are:
+ -v=false
+ When true, the output is more verbose.
+
+Vkube help - Display help for commands or topics
+
+Help with no args displays the usage of the parent command.
+
+Help with args displays the usage of the specified sub-command or help topic.
+
+"help ..." recursively displays help for all commands and topics.
+
+Usage:
+ vkube help [flags] [command/topic ...]
+
+[command/topic ...] optionally identifies a specific sub-command or help topic.
+
+The vkube help flags are:
+ -style=compact
+ The formatting style for help output:
+ compact - Good for compact cmdline output.
+ full - Good for cmdline output, shows all global flags.
+ godoc - Good for godoc processing.
+ Override the default by setting the CMDLINE_STYLE environment variable.
+ -width=<terminal width>
+ Format output to this target width in runes, or unlimited if width < 0.
+ Defaults to the terminal width if available. Override the default by setting
+ the CMDLINE_WIDTH environment variable.
+*/
+package main
diff --git a/services/cluster/vkube/docker.go b/services/cluster/vkube/docker.go
new file mode 100644
index 0000000..4eb223b
--- /dev/null
+++ b/services/cluster/vkube/docker.go
@@ -0,0 +1,161 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+const (
+ clusterAgentDockerfile = `
+FROM debian:stable
+
+# gcloud
+RUN apt-get update && apt-get install -y -qq --no-install-recommends wget unzip python php5-mysql php5-cli php5-cgi openjdk-7-jre-headless openssh-client python-openssl && apt-get clean
+RUN wget https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && unzip google-cloud-sdk.zip && rm google-cloud-sdk.zip
+ENV CLOUDSDK_PYTHON_SITEPACKAGES 1
+ENV HOME /root
+RUN google-cloud-sdk/install.sh --usage-reporting=false --path-update=true --bash-completion=true --rc-path=/root/.bashrc --disable-installation-options && \
+ google-cloud-sdk/bin/gcloud --quiet components update preview alpha beta app kubectl && \
+ google-cloud-sdk/bin/gcloud --quiet config set component_manager/disable_update_check true
+ENV PATH /google-cloud-sdk/bin:$PATH
+
+# vanadium
+#RUN apt-get install --no-install-recommends -y -q libssl1.0.0
+ADD claimable cluster_agent cluster_agentd init.sh /usr/local/bin/
+RUN chmod 755 /usr/local/bin/*
+
+EXPOSE 8193
+CMD ["/usr/local/bin/init.sh"]
+`
+ clusterAgentInitSh = `#!/bin/sh
+if [ ! -e "${DATADIR}/perms" ]; then
+ # Not claimed
+ /usr/local/bin/claimable \
+ --v23.credentials="${DATADIR}/creds" \
+ --v23.tcp.address=:8193 \
+ --root-blessings="${ROOT_BLESSINGS}" \
+ --perms-dir="${DATADIR}/perms" \
+ --v23.permissions.literal="{\"Admin\":{\"In\":[\"${CLAIMER}\"]}}" \
+ --log_dir="${LOGDIR}" \
+ --alsologtostderr=false
+fi
+
+mkdir -p "${DATADIR}/blessings"
+
+exec /usr/local/bin/cluster_agentd \
+ --v23.credentials="${DATADIR}/creds" \
+ --v23.tcp.address=:8193 \
+ --v23.permissions.literal="{\"Admin\":{\"In\":[\"${ADMIN}\"]}}" \
+ --log_dir="${LOGDIR}" \
+ --root-dir="${DATADIR}/blessings" \
+ --alsologtostderr=false
+`
+
+ podAgentDockerfile = `
+FROM debian:stable
+RUN apt-get update && apt-get install --no-install-recommends -y -q libssl1.0.0
+ADD pod_agentd /usr/local/bin/
+RUN chmod 755 /usr/local/bin/pod_agentd
+`
+)
+
+type dockerFile struct {
+ name string
+ content []byte
+}
+
+type dockerCmd struct {
+ name string
+ args []string
+}
+
+func buildDockerImages(config *vkubeConfig, verbose bool, stdout io.Writer) error {
+ ts := time.Now().Format("20060102150405")
+ // Cluster agent image.
+ imageName := removeTag(config.ClusterAgent.Image)
+ imageNameTag := fmt.Sprintf("%s:%s", imageName, ts)
+
+ var out io.Writer
+ if verbose {
+ out = stdout
+ }
+
+ if err := buildDockerImage([]dockerFile{
+ {"Dockerfile", []byte(clusterAgentDockerfile)},
+ {"init.sh", []byte(clusterAgentInitSh)},
+ }, []dockerCmd{
+ {"jiri", []string{"go", "build", "-o", "claimable", "v.io/x/ref/services/device/claimable"}},
+ {"jiri", []string{"go", "build", "-o", "cluster_agent", "v.io/x/ref/services/cluster/cluster_agent"}},
+ {"jiri", []string{"go", "build", "-o", "cluster_agentd", "v.io/x/ref/services/cluster/cluster_agentd"}},
+ {"docker", []string{"build", "-t", imageName, "."}},
+ {"docker", []string{"tag", imageName, imageNameTag}},
+ {flagGcloudBin, []string{"--project=" + config.Project, "docker", "push", imageName}},
+ }, out); err != nil {
+ return err
+ }
+ fmt.Fprintf(stdout, "Pushed %s successfully.\n", imageNameTag)
+
+ // Pod agent image.
+ imageName = removeTag(config.PodAgent.Image)
+ imageNameTag = fmt.Sprintf("%s:%s", imageName, ts)
+
+ if err := buildDockerImage([]dockerFile{
+ {"Dockerfile", []byte(podAgentDockerfile)},
+ }, []dockerCmd{
+ {"jiri", []string{"go", "build", "-o", "pod_agentd", "v.io/x/ref/services/agent/pod_agentd"}},
+ {"docker", []string{"build", "-t", imageName, "."}},
+ {"docker", []string{"tag", imageName, imageNameTag}},
+ {flagGcloudBin, []string{"--project=" + config.Project, "docker", "push", imageName}},
+ }, out); err != nil {
+ return err
+ }
+ fmt.Fprintf(stdout, "Pushed %s successfully.\n", imageNameTag)
+ return nil
+}
+
+// removeTag strips the ":tag" suffix from a docker image name. Only the last path
+// component is examined, so a registry "host:port/" prefix is left untouched.
+func removeTag(name string) string {
+	slash := strings.LastIndex(name, "/") + 1 // start of the last path component (0 if none)
+	return name[:slash] + strings.SplitN(name[slash:], ":", 2)[0]
+}
+
+func buildDockerImage(files []dockerFile, cmds []dockerCmd, stdout io.Writer) error {
+ workDir, err := ioutil.TempDir("", "docker-build-")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(workDir)
+
+ for _, f := range files {
+ if stdout != nil {
+ fmt.Fprintf(stdout, "#### Writing %q\n", f.name)
+ }
+ if err := ioutil.WriteFile(filepath.Join(workDir, f.name), f.content, 0600); err != nil {
+ return fmt.Errorf("failed to write %q: %v", f.name, err)
+ }
+ }
+ for _, c := range cmds {
+ if stdout != nil {
+ fmt.Fprintf(stdout, "#### Running %s %s\n", c.name, strings.Join(c.args, " "))
+ }
+ cmd := exec.Command(c.name, c.args...)
+ cmd.Dir = workDir
+ cmd.Stdout = stdout
+ cmd.Stderr = stdout
+ if err := cmd.Run(); err != nil {
+ return fmt.Errorf("%v failed: %v", c, err)
+ }
+ }
+ return nil
+}
diff --git a/services/cluster/vkube/main.go b/services/cluster/vkube/main.go
new file mode 100644
index 0000000..4360be1
--- /dev/null
+++ b/services/cluster/vkube/main.go
@@ -0,0 +1,291 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The following enables go generate to generate the doc.go file.
+//go:generate go run $JIRI_ROOT/release/go/src/v.io/x/lib/cmdline/testdata/gendoc.go .
+
+package main
+
+import (
+ "fmt"
+ "strings"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/v23cmd"
+ _ "v.io/x/ref/runtime/factories/generic"
+)
+
+var (
+ flagConfigFile string
+ flagKubectlBin string
+ flagGcloudBin string
+ flagResourceFile string
+ flagVerbose bool
+)
+
+func main() {
+ cmdline.HideGlobalFlagsExcept()
+
+ cmd := &cmdline.Command{
+ Name: "vkube",
+ Short: "Manages Vanadium applications on kubernetes",
+ Long: "Manages Vanadium applications on kubernetes",
+ Children: []*cmdline.Command{
+ cmdGetCredentials,
+ cmdStart,
+ cmdUpdate,
+ cmdStop,
+ cmdStartClusterAgent,
+ cmdStopClusterAgent,
+ cmdClaimClusterAgent,
+ cmdBuildDockerImages,
+ },
+ }
+ cmd.Flags.StringVar(&flagConfigFile, "config", "vkube.cfg", "The 'vkube.cfg' file to use.")
+ cmd.Flags.StringVar(&flagKubectlBin, "kubectl", "kubectl", "The 'kubectl' binary to use.")
+ cmd.Flags.StringVar(&flagGcloudBin, "gcloud", "gcloud", "The 'gcloud' binary to use.")
+
+ cmdStart.Flags.StringVar(&flagResourceFile, "f", "", "Filename to use to create the kubernetes resource.")
+
+ cmdUpdate.Flags.StringVar(&flagResourceFile, "f", "", "Filename to use to update the kubernetes resource.")
+
+ cmdStop.Flags.StringVar(&flagResourceFile, "f", "", "Filename to use to stop the kubernetes resource.")
+
+ cmdBuildDockerImages.Flags.BoolVar(&flagVerbose, "v", false, "When true, the output is more verbose.")
+
+ cmdline.Main(cmd)
+}
+
+var cmdGetCredentials = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdGetCredentials),
+ Name: "get-credentials",
+ Short: "Gets the kubernetes credentials from Google Cloud.",
+ Long: "Gets the kubernetes credentials from Google Cloud.",
+}
+
+func runCmdGetCredentials(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if config.Cluster == "" {
+ return fmt.Errorf("Cluster must be set.")
+ }
+ if config.Project == "" {
+ return fmt.Errorf("Project must be set.")
+ }
+ if config.Zone == "" {
+ return fmt.Errorf("Zone must be set.")
+ }
+ return getCredentials(config.Cluster, config.Project, config.Zone)
+}
+
+var cmdStart = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdStart),
+ Name: "start",
+ Short: "Starts an application.",
+ Long: "Starts an application.",
+ ArgsName: "<extension>",
+ ArgsLong: "<extension> The blessing name extension to give to the application.",
+}
+
+func runCmdStart(ctx *context.T, env *cmdline.Env, args []string) error {
+ if expected, got := 1, len(args); expected != got {
+ return env.UsageErrorf("start: incorrect number of arguments, expected %d, got %d", expected, got)
+ }
+ extension := args[0]
+
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if flagResourceFile == "" {
+ return fmt.Errorf("-f must be specified.")
+ }
+ rc, err := readReplicationControllerConfig(flagResourceFile)
+ if err != nil {
+ return err
+ }
+ for _, v := range []string{"spec.template.metadata.labels.application", "spec.template.metadata.labels.deployment"} {
+ if rc.getString(v) == "" {
+ fmt.Fprintf(env.Stderr, "WARNING: %q is not set. Rolling updates will not work.\n", v)
+ }
+ }
+ agentAddr, err := findClusterAgent(config, true)
+ if err != nil {
+ return err
+ }
+ secretName, err := makeSecretName()
+ if err != nil {
+ return err
+ }
+ namespace := rc.getString("metadata.namespace")
+ appName := rc.getString("spec.template.metadata.labels.application")
+ if n, err := findReplicationControllerNameForApp(appName, namespace); err == nil {
+ return fmt.Errorf("replication controller for application=%q already running: %s", appName, n)
+ }
+ if err := createSecret(ctx, secretName, namespace, agentAddr, extension); err != nil {
+ return err
+ }
+ fmt.Fprintln(env.Stdout, "Created Secret successfully.")
+
+ if err := createReplicationController(ctx, config, rc, secretName); err != nil {
+ if err := deleteSecret(ctx, config, secretName, namespace); err != nil {
+ ctx.Error(err)
+ }
+ return err
+ }
+ fmt.Fprintln(env.Stdout, "Created replication controller successfully.")
+ return nil
+}
+
+var cmdUpdate = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdUpdate),
+ Name: "update",
+ Short: "Updates an application.",
+ Long: "Updates an application to a new version with a rolling update, preserving the existing blessings.",
+}
+
+func runCmdUpdate(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if flagResourceFile == "" {
+ return fmt.Errorf("-f must be specified.")
+ }
+ rc, err := readReplicationControllerConfig(flagResourceFile)
+ if err != nil {
+ return err
+ }
+ if err := updateReplicationController(ctx, config, rc); err != nil {
+ return err
+ }
+ fmt.Fprintln(env.Stdout, "Updated replication controller successfully.")
+ return nil
+}
+
+var cmdStop = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdStop),
+ Name: "stop",
+ Short: "Stops an application.",
+ Long: "Stops an application.",
+}
+
+func runCmdStop(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if flagResourceFile == "" {
+ return fmt.Errorf("-f must be specified.")
+ }
+ rc, err := readReplicationControllerConfig(flagResourceFile)
+ if err != nil {
+ return err
+ }
+ name := rc.getString("metadata.name")
+ if name == "" {
+ return fmt.Errorf("metadata.name must be set")
+ }
+ namespace := rc.getString("metadata.namespace")
+ secretName, err := findSecretName(name, namespace)
+ if err != nil {
+ return err
+ }
+ if out, err := kubectl("--namespace="+namespace, "stop", "rc", name); err != nil {
+ return fmt.Errorf("failed to stop replication controller: %v: %s", err, out)
+ }
+ fmt.Fprintf(env.Stdout, "Stopping replication controller.\n")
+ if err := deleteSecret(ctx, config, secretName, namespace); err != nil {
+ return fmt.Errorf("failed to delete Secret: %v", err)
+ }
+ fmt.Fprintf(env.Stdout, "Deleting Secret.\n")
+ return nil
+}
+
+var cmdStartClusterAgent = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdStartClusterAgent),
+ Name: "start-cluster-agent",
+ Short: "Starts the cluster agent.",
+ Long: "Starts the cluster agent.",
+}
+
+func runCmdStartClusterAgent(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if err := createClusterAgent(ctx, config); err != nil {
+ return err
+ }
+ fmt.Fprintf(env.Stdout, "Starting Cluster Agent.\n")
+ return nil
+}
+
+var cmdStopClusterAgent = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdStopClusterAgent),
+ Name: "stop-cluster-agent",
+ Short: "Stops the cluster agent.",
+ Long: "Stops the cluster agent.",
+}
+
+func runCmdStopClusterAgent(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ if err := stopClusterAgent(config); err != nil {
+ return err
+ }
+ fmt.Fprintf(env.Stdout, "Stopping Cluster Agent.\n")
+ return nil
+}
+
+var cmdClaimClusterAgent = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdClaimClusterAgent),
+ Name: "claim-cluster-agent",
+ Short: "Claims the cluster agent.",
+ Long: "Claims the cluster agent.",
+}
+
+func runCmdClaimClusterAgent(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ myBlessings := v23.GetPrincipal(ctx).BlessingStore().Default()
+ claimer := clusterAgentClaimer(config)
+ if !myBlessings.CouldHaveNames([]string{claimer}) {
+ return fmt.Errorf("principal isn't the expected claimer: got %q, expected %q", myBlessings, claimer)
+ }
+ extension := strings.TrimPrefix(config.ClusterAgent.Blessing, claimer+security.ChainSeparator)
+ if err := claimClusterAgent(ctx, config, extension); err != nil {
+ if verror.ErrorID(err) == verror.ErrUnknownMethod.ID {
+ return fmt.Errorf("already claimed")
+ }
+ return err
+ }
+ fmt.Fprintf(env.Stdout, "Claimed Cluster Agent successfully.\n")
+ return nil
+}
+
+var cmdBuildDockerImages = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runCmdBuildDockerImages),
+ Name: "build-docker-images",
+ Short: "Builds the docker images for the cluster and pod agents.",
+ Long: "Builds the docker images for the cluster and pod agents.",
+}
+
+func runCmdBuildDockerImages(ctx *context.T, env *cmdline.Env, args []string) error {
+ config, err := readConfig(flagConfigFile)
+ if err != nil {
+ return err
+ }
+ return buildDockerImages(config, flagVerbose, env.Stdout)
+}
diff --git a/services/cluster/vkube/object.go b/services/cluster/vkube/object.go
new file mode 100644
index 0000000..4d517e0
--- /dev/null
+++ b/services/cluster/vkube/object.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+)
+
+// object simplifies the parsing and handling of json objects that are
+// unmarshaled into an empty interface.
+type object map[string]interface{}
+
+func (o *object) importJSON(data []byte) error {
+ var decode interface{}
+ if err := json.Unmarshal(data, &decode); err != nil {
+ return err
+ }
+ c := convertToObject(decode)
+ var ok bool
+ if *o, ok = c.(object); !ok {
+ return fmt.Errorf("object is %T", c)
+ }
+ return nil
+}
+
+// convertToObject converts all occurrences of map[string]interface{} to object.
+func convertToObject(i interface{}) interface{} {
+ switch obj := i.(type) {
+ case map[string]interface{}:
+ for k, v := range obj {
+ obj[k] = convertToObject(v)
+ }
+ return object(obj)
+ case []interface{}:
+ for x, y := range obj {
+ obj[x] = convertToObject(y)
+ }
+ return obj
+ default:
+ return obj
+ }
+}
+
+func (o object) json() ([]byte, error) {
+ return json.MarshalIndent(o, "", " ")
+}
+
+// get retrieves the value of an object inside this object, e.g.:
+// if o = { "a": { "b": "c" } }, o.get("a.b") == "c".
+func (o object) get(name string) interface{} {
+ parts := strings.Split(name, ".")
+ var obj interface{} = o
+ for _, p := range parts {
+ m, ok := obj.(object)
+ if !ok {
+ return nil
+ }
+ var exists bool
+ if obj, exists = m[p]; !exists {
+ return nil
+ }
+ }
+ return obj
+}
+
+// set sets the value of an object inside this object, e.g.:
+// if o = { "a": { "b": "c" } }, o.set("a.b", "X") changes "c" to "X".
+func (o object) set(name string, value interface{}) error {
+ parts := strings.Split(name, ".")
+ var obj interface{} = o
+ for {
+ m, ok := obj.(object)
+ if !ok { // an intermediate path element exists but is not an object
+ return fmt.Errorf("%q not an object", name)
+ }
+
+ p := parts[0]
+ parts = parts[1:]
+
+ if len(parts) == 0 { // last path element: store the value here
+ m[p] = value
+ break
+ }
+ if obj, ok = m[p]; !ok { // missing intermediate element: create it
+ obj = make(object)
+ m[p] = obj
+ }
+ }
+ return nil
+}
+
+// getString retrieves a value as a string: "" when it is absent, its %v form when it is not a string.
+func (c object) getString(name string) string {
+ switch s := c.get(name).(type) {
+ case string:
+ return s
+ case nil:
+ return ""
+ default:
+ return fmt.Sprintf("%v", s)
+ }
+}
+
+// getInt retrieves an integer object; it returns -1 when the value is absent or is neither an int nor a float64.
+func (c object) getInt(name string) int {
+ switch v := c.get(name).(type) {
+ case int:
+ return v
+ case float64: // encoding/json decodes JSON numbers into float64
+ return int(v)
+ default:
+ return -1
+ }
+}
+
+// getObjectArray retrieves an array of objects.
+func (c object) getObjectArray(name string) []object {
+ s, ok := c.get(name).([]interface{})
+ if !ok {
+ return nil
+ }
+ n := make([]object, len(s))
+ for i, o := range s {
+ if x, ok := o.(object); ok {
+ n[i] = x
+ continue
+ }
+ return nil
+ }
+ return n
+}
+
+// append adds values to the []interface{} array at name, starting a new array when get(name) returns nil.
+func (c object) append(name string, values ...interface{}) error {
+ obj := c.get(name)
+ if obj == nil {
+ obj = []interface{}{}
+ }
+ switch array := obj.(type) {
+ case []interface{}:
+ return c.set(name, append(array, values...))
+ default: // anything else, including typed slices such as []object, is rejected
+ return fmt.Errorf("%q is not an array", name)
+ }
+}
diff --git a/services/cluster/vkube/object_test.go b/services/cluster/vkube/object_test.go
new file mode 100644
index 0000000..62be6ee
--- /dev/null
+++ b/services/cluster/vkube/object_test.go
@@ -0,0 +1,112 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "testing"
+)
+
+func TestObject(t *testing.T) { // builds an object with set/append and compares its JSON rendering
+ o := make(object)
+ o.set("foo", "bar")
+ o.set("slice", []interface{}{"a", "b", "c"})
+ o.set("obj", object{"name": "Bob"})
+ o.set("x.y.z", 5) // set creates the intermediate "x" and "y" objects
+ o.append("slice", "d") // extends the array stored above
+
+ out, err := o.json() // the expected text below lists keys alphabetically, not in insertion order
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ expected := `{
+ "foo": "bar",
+ "obj": {
+ "name": "Bob"
+ },
+ "slice": [
+ "a",
+ "b",
+ "c",
+ "d"
+ ],
+ "x": {
+ "y": {
+ "z": 5
+ }
+ }
+}`
+ if got := string(out); got != expected {
+ t.Errorf("Unexpected output. Got %q, expected %q", got, expected)
+ }
+}
+
+func TestObjectJSON(t *testing.T) { // imports JSON, reads it back through the typed getters, mutates it, and re-serializes
+ json := `{
+ "foo": "bar",
+ "bar": 10,
+ "list": [ { "x":0 }, { "x":1 }, { "x":2 } ],
+ "x": { "y": [ 1, 2, 3 ] }
+ }`
+
+ o := make(object)
+ if err := o.importJSON([]byte(json)); err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ if got, expected := o.getString("foo"), "bar"; got != expected {
+ t.Errorf("Unexpected value. Got %#v, expected %#v", got, expected)
+ }
+ if got, expected := o.getInt("bar"), 10; got != expected {
+ t.Errorf("Unexpected value. Got %#v, expected %#v", got, expected)
+ }
+ if got, expected := o.getString("notthere"), ""; got != expected { // a missing key reads as ""
+ t.Errorf("Unexpected value. Got %#v, expected %#v", got, expected)
+ }
+ if got, expected := o.getString("x.y"), "[1 2 3]"; got != expected { // non-string values are rendered with %v
+ t.Errorf("Unexpected value. Got %#v, expected %#v", got, expected)
+ }
+ o.append("x.y", 4) // appends to the nested array x.y
+ list := o.getObjectArray("list")
+ for i, item := range list {
+ if got, expected := item.get("x"), float64(i); got != expected { // numbers imported from JSON are expected back as float64
+ t.Errorf("Unexpected value for x. Got %#v, expected %#v", got, expected)
+ }
+ }
+ list = append(list, object{"x": "y"})
+ o.set("list", list) // stores a []object rather than a []interface{}
+
+ out, err := o.json()
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ expected := `{
+ "bar": 10,
+ "foo": "bar",
+ "list": [
+ {
+ "x": 0
+ },
+ {
+ "x": 1
+ },
+ {
+ "x": 2
+ },
+ {
+ "x": "y"
+ }
+ ],
+ "x": {
+ "y": [
+ 1,
+ 2,
+ 3,
+ 4
+ ]
+ }
+}`
+ if got := string(out); got != expected {
+ t.Errorf("Unexpected output. Got %q, expected %q", got, expected)
+ }
+}
diff --git a/services/cluster/vkube/util.go b/services/cluster/vkube/util.go
new file mode 100644
index 0000000..cd69360
--- /dev/null
+++ b/services/cluster/vkube/util.go
@@ -0,0 +1,334 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/hex"
+ "fmt"
+ "io/ioutil"
+ "os/exec"
+ "strings"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vom"
+ "v.io/x/ref/services/cluster"
+)
+
+// getCredentials points gcloud's container/cluster setting at cluster and then runs
+// 'gcloud container clusters get-credentials' for it; on failure the command output is part of the error.
+func getCredentials(cluster, project, zone string) error {
+ if out, err := exec.Command(flagGcloudBin, "config", "set", "container/cluster", cluster).CombinedOutput(); err != nil {
+ return fmt.Errorf("failed to set container/cluster: %v: %s", err, out)
+ }
+ if out, err := exec.Command(flagGcloudBin, "container", "clusters", "get-credentials", cluster, "--project", project, "--zone", zone).CombinedOutput(); err != nil {
+ return fmt.Errorf("failed to get credentials for %q: %v: %s", cluster, err, out)
+ }
+ return nil
+}
+
+// localAgentAddress returns the address of the cluster agent to use from within
+// the cluster.
+func localAgentAddress(config *vkubeConfig) string {
+ return fmt.Sprintf("/(%s)@%s.%s:%d",
+ config.ClusterAgent.Blessing,
+ clusterAgentServiceName,
+ config.ClusterAgent.Namespace,
+ clusterAgentServicePort,
+ )
+}
+
+// readReplicationControllerConfig imports the JSON in fileName and requires its kind to be "ReplicationController".
+func readReplicationControllerConfig(fileName string) (object, error) {
+ var rc object
+ contents, readErr := ioutil.ReadFile(fileName)
+ if readErr != nil {
+ return nil, readErr
+ }
+ if parseErr := rc.importJSON(contents); parseErr != nil {
+ return nil, parseErr
+ }
+ kind := rc.getString("kind")
+ if kind == "ReplicationController" {
+ return rc, nil
+ }
+ return nil, fmt.Errorf("expected kind=\"ReplicationController\", got %q", kind)
+}
+
+// addPodAgent takes either a ReplicationController or Pod object and adds a
+// pod-agent container plus its three volumes to the pod spec, in place. The
+// existing containers get V23_AGENT_PATH and a read-only agent-socket mount.
+func addPodAgent(ctx *context.T, config *vkubeConfig, obj object, secretName string) error {
+ var base string
+ switch kind := obj.getString("kind"); kind {
+ case "ReplicationController":
+ base = "spec.template." // the pod spec is nested under the template
+ case "Pod":
+ base = ""
+ default:
+ return fmt.Errorf("expected kind=\"ReplicationController\" or \"Pod\", got %q", kind)
+ }
+
+ // Add the logs, secret and socket volumes used by the pod agent container.
+ if err := obj.append(base+"spec.volumes",
+ object{"name": "agent-logs", "emptyDir": object{}},
+ object{"name": "agent-secret", "secret": object{"secretName": secretName}},
+ object{"name": "agent-socket", "emptyDir": object{}},
+ ); err != nil {
+ return err
+ }
+
+ // Point every existing container at the agent socket and mount it read-only.
+ containers := obj.getObjectArray(base + "spec.containers") // nil if missing or not an array of objects
+ for _, c := range containers {
+ if err := c.append("env", object{"name": "V23_AGENT_PATH", "value": "/agent/socket/agent.sock"}); err != nil {
+ return err
+ }
+ if err := c.append("volumeMounts", object{"name": "agent-socket", "mountPath": "/agent/socket", "readOnly": true}); err != nil {
+ return err
+ }
+ }
+
+ // Add the pod agent container, which mounts all three agent volumes.
+ containers = append(containers, object{
+ "name": "pod-agent",
+ "image": config.PodAgent.Image,
+ "args": []string{
+ "pod_agentd",
+ "--agent=" + localAgentAddress(config),
+ "--root-blessings=" + rootBlessings(ctx),
+ "--secret-key-file=/agent/secret/secret",
+ "--socket-path=/agent/socket/agent.sock",
+ "--log_dir=/logs",
+ },
+ "volumeMounts": []object{
+ object{"name": "agent-logs", "mountPath": "/logs"},
+ object{"name": "agent-secret", "mountPath": "/agent/secret", "readOnly": true},
+ object{"name": "agent-socket", "mountPath": "/agent/socket"},
+ },
+ })
+ return obj.set(base+"spec.containers", containers) // replaces the stored array with a []object
+}
+
+// createSecret asks the cluster agent at agentAddr for a new secret key and
+// stores it, base64-encoded, in an Opaque kubernetes Secret named secretName.
+func createSecret(ctx *context.T, secretName, namespace, agentAddr, extension string) error {
+ admin := cluster.ClusterAgentAdminClient(agentAddr)
+ key, err := admin.NewSecret(ctx, &granter{extension: extension})
+ if err != nil {
+ return err
+ }
+ encoded := base64.StdEncoding.EncodeToString([]byte(key))
+ // The key is stored under "secret" in the Secret's data.
+ manifest := object{
+ "apiVersion": "v1",
+ "kind": "Secret",
+ "metadata": object{"name": secretName, "namespace": namespace},
+ "type": "Opaque",
+ "data": object{"secret": encoded},
+ }
+ out, createErr := kubectlCreate(manifest)
+ if createErr == nil {
+ return nil
+ }
+ return fmt.Errorf("failed to create secret %q: %v\n%s\n", secretName, createErr, string(out))
+}
+
+type granter struct { // call option handed to NewSecret by createSecret
+ rpc.CallOpt
+ extension string // extension used for the blessing produced by Grant
+}
+
+func (g *granter) Grant(ctx *context.T, call security.Call) (security.Blessings, error) { // blesses the remote end's public key
+ p := call.LocalPrincipal()
+ return p.Bless(call.RemoteBlessings().PublicKey(), p.BlessingStore().Default(), g.extension, security.UnconstrainedUse())
+}
+
+// deleteSecret deletes a Secret object and its associated secret key and
+// blessings.
+// We know the name of the Secret object, but we don't know the secret key. The
+// only way to get it back from Kubernetes is to mount the Secret Object to a
+// Pod, which passes the key to 'cluster_agent forget' and then deletes the Secret and itself.
+func deleteSecret(ctx *context.T, config *vkubeConfig, name, namespace string) error {
+ podName := fmt.Sprintf("delete-secret-%s", name)
+ del := object{
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": object{
+ "name": podName,
+ "namespace": namespace,
+ },
+ "spec": object{
+ "containers": []interface{}{
+ object{
+ "name": "delete-secret",
+ "image": config.ClusterAgent.Image,
+ "args": []string{
+ "/bin/bash",
+ "-c",
+ "cluster_agent --agent='" + localAgentAddress(config) + "' forget $(cat /agent/secret/secret) && /google-cloud-sdk/bin/kubectl --namespace=" + namespace + " delete secret " + name + " && /google-cloud-sdk/bin/kubectl --namespace=" + namespace + " delete pod " + podName, // NOTE(review): namespace, name and podName are spliced into the shell command unquoted
+ },
+ "volumeMounts": []interface{}{
+ object{"name": "agent-secret", "mountPath": "/agent/secret", "readOnly": true},
+ },
+ },
+ },
+ "restartPolicy": "OnFailure",
+ "activeDeadlineSeconds": 300,
+ },
+ }
+ if err := addPodAgent(ctx, config, del, name); err != nil { // the Secret being deleted is mounted as the pod's agent-secret volume
+ return err
+ }
+ out, err := kubectlCreate(del)
+ if err != nil {
+ return fmt.Errorf("failed to create delete Pod: %v: %s", err, out)
+ }
+ return nil
+}
+
+// createReplicationController adds the pod agent to rc (modifying it in place) and submits it with kubectlCreate.
+func createReplicationController(ctx *context.T, config *vkubeConfig, rc object, secretName string) error {
+ if err := addPodAgent(ctx, config, rc, secretName); err != nil {
+ return err
+ }
+ out, err := kubectlCreate(rc)
+ if err == nil {
+ return nil
+ }
+ return fmt.Errorf("failed to create replication controller: %v\n%s\n", err, string(out))
+}
+
+// updateReplicationController adds the pod agent to rc, reusing the secret of the controller that currently runs the application, then pipes rc to 'kubectl rolling-update'.
+func updateReplicationController(ctx *context.T, config *vkubeConfig, rc object) error {
+ namespace := rc.getString("metadata.namespace")
+ oldName, err := findReplicationControllerNameForApp(rc.getString("spec.template.metadata.labels.application"), namespace)
+ if err != nil {
+ return err
+ }
+ secretName, err := findSecretName(oldName, namespace)
+ if err != nil {
+ return err
+ }
+ if err = addPodAgent(ctx, config, rc, secretName); err != nil {
+ return err
+ }
+ spec, err := rc.json()
+ if err != nil {
+ return err
+ }
+ update := exec.Command(flagKubectlBin, "rolling-update", oldName, "-f", "-")
+ update.Stdin = bytes.NewBuffer(spec)
+ if out, runErr := update.CombinedOutput(); runErr != nil {
+ return fmt.Errorf("failed to update replication controller %q: %v\n%s\n", oldName, runErr, string(out))
+ }
+ return nil
+}
+
+// createNamespaceIfNotExist creates the Namespace name unless 'kubectl get namespace' already succeeds for it.
+func createNamespaceIfNotExist(name string) error {
+ if _, getErr := kubectl("get", "namespace", name); getErr == nil {
+ return nil
+ }
+ manifest := object{
+ "apiVersion": "v1",
+ "kind": "Namespace",
+ "metadata": object{"name": name},
+ }
+ out, err := kubectlCreate(manifest)
+ if err != nil {
+ return fmt.Errorf("failed to create Namespace %q: %v: %s", name, err, out)
+ }
+ return nil
+}
+
+// makeSecretName creates a random name for a Secret Object.
+func makeSecretName() (string, error) {
+ b := make([]byte, 16)
+ if _, err := rand.Read(b); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("secret-%s", hex.EncodeToString(b)), nil
+}
+
+// findReplicationControllerNameForApp returns the metadata.name of the single
+// ReplicationController in namespace that is labeled application=<app>.
+func findReplicationControllerNameForApp(app, namespace string) (string, error) {
+ out, err := kubectl("--namespace="+namespace, "get", "rc", "-l", "application="+app, "-o", "json")
+ if err != nil {
+ return "", fmt.Errorf("failed to get replication controller for application %q: %v\n%s\n", app, err, string(out))
+ }
+ var listing object
+ if err = listing.importJSON(out); err != nil {
+ return "", fmt.Errorf("failed to parse kubectl output: %v", err)
+ }
+ controllers := listing.getObjectArray("items")
+ // Exactly one controller must match the label selector.
+ if len(controllers) != 1 {
+ return "", fmt.Errorf("found %d replication controllers for application %q", len(controllers), app)
+ }
+ if name := controllers[0].getString("metadata.name"); name != "" {
+ return name, nil
+ }
+ return "", fmt.Errorf("missing metadata.name")
+}
+
+// findSecretName returns the non-empty secretName of the "agent-secret" volume of the given
+// Replication Controller; a missing volume or an empty secretName is reported as an error.
+func findSecretName(rcName, namespace string) (string, error) {
+ data, err := kubectl("--namespace="+namespace, "get", "rc", rcName, "-o", "json")
+ if err != nil {
+ return "", fmt.Errorf("failed to get replication controller %q: %v\n%s\n", rcName, err, string(data))
+ }
+ var rc object
+ if err := rc.importJSON(data); err != nil {
+ return "", fmt.Errorf("failed to parse kubectl output: %v", err)
+ }
+ for _, v := range rc.getObjectArray("spec.template.spec.volumes") {
+ if name := v.getString("secret.secretName"); v.getString("name") == "agent-secret" && name != "" {
+ return name, nil
+ }
+ }
+ return "", fmt.Errorf("failed to find secretName in replication controller %q", rcName)
+}
+
+// kubectlCreate serializes o with o.json(), feeds it to 'kubectl create -f -'
+// on stdin, and returns the command's combined stdout and stderr.
+func kubectlCreate(o object) ([]byte, error) {
+ manifest, err := o.json()
+ if err != nil {
+ return nil, err
+ }
+ create := exec.Command(flagKubectlBin, "create", "-f", "-")
+ create.Stdin = bytes.NewBuffer(manifest)
+ return create.CombinedOutput()
+}
+
+// kubectl runs the binary named by flagKubectlBin with the given arguments and
+// returns its combined stdout and stderr.
+func kubectl(args ...string) ([]byte, error) {
+ return exec.Command(flagKubectlBin, args...).CombinedOutput()
+}
+
+// rootBlessings VOM-encodes each root of the current principal's default
+// blessing, base64url-encodes the results, and joins them with commas.
+func rootBlessings(ctx *context.T) string {
+ roots := security.RootBlessings(v23.GetPrincipal(ctx).BlessingStore().Default())
+ encoded := make([]string, 0, len(roots))
+ for _, root := range roots {
+ raw, err := vom.Encode(root)
+ if err != nil {
+ ctx.Fatalf("vom.Encode failed: %v", err)
+ }
+ // URLEncoding keeps the output compatible with the principal command.
+ encoded = append(encoded, base64.URLEncoding.EncodeToString(raw))
+ }
+ return strings.Join(encoded, ",")
+}
diff --git a/services/cluster/vkube/util_test.go b/services/cluster/vkube/util_test.go
new file mode 100644
index 0000000..f43cb54
--- /dev/null
+++ b/services/cluster/vkube/util_test.go
@@ -0,0 +1,203 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "v.io/x/ref/test"
+)
+
+func TestAddPodAgent(t *testing.T) { // runs addPodAgent on a ReplicationController and compares the resulting JSON
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ const (
+ myAppJSON = `{
+ "apiVersion": "v1",
+ "kind": "ReplicationController",
+ "metadata": {
+ "name": "my-app",
+ "labels": {
+ "run": "my-app"
+ }
+ },
+ "spec": {
+ "replicas": 5,
+ "template": {
+ "metadata": {
+ "labels": {
+ "run": "my-app"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "name": "my-app",
+ "image": "registry/me/my-app:latest",
+ "ports": [
+ { "containerPort": 8193, "hostPort": 8193 }
+ ],
+ "volumeMounts": [
+ { "name": "app-logs", "mountPath": "/logs" }
+ ]
+ }
+ ],
+ "volumes": [
+ { "name": "app-logs", "emptyDir": {} }
+ ]
+ }
+ }
+ }
+}`
+
+ expected = `{
+ "apiVersion": "v1",
+ "kind": "ReplicationController",
+ "metadata": {
+ "labels": {
+ "run": "my-app"
+ },
+ "name": "my-app"
+ },
+ "spec": {
+ "replicas": 5,
+ "template": {
+ "metadata": {
+ "labels": {
+ "run": "my-app"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "env": [
+ {
+ "name": "V23_AGENT_PATH",
+ "value": "/agent/socket/agent.sock"
+ }
+ ],
+ "image": "registry/me/my-app:latest",
+ "name": "my-app",
+ "ports": [
+ {
+ "containerPort": 8193,
+ "hostPort": 8193
+ }
+ ],
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "app-logs"
+ },
+ {
+ "mountPath": "/agent/socket",
+ "name": "agent-socket",
+ "readOnly": true
+ }
+ ]
+ },
+ {
+ "args": [
+ "pod_agentd",
+ "--agent=/(root/cluster-agent)@cluster-agent.test:8193",
+ "--root-blessings=ROOT-BLESSINGS",
+ "--secret-key-file=/agent/secret/secret",
+ "--socket-path=/agent/socket/agent.sock",
+ "--log_dir=/logs"
+ ],
+ "image": "",
+ "name": "pod-agent",
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "agent-logs"
+ },
+ {
+ "mountPath": "/agent/secret",
+ "name": "agent-secret",
+ "readOnly": true
+ },
+ {
+ "mountPath": "/agent/socket",
+ "name": "agent-socket"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "emptyDir": {},
+ "name": "app-logs"
+ },
+ {
+ "emptyDir": {},
+ "name": "agent-logs"
+ },
+ {
+ "name": "agent-secret",
+ "secret": {
+ "secretName": "myapp-secret"
+ }
+ },
+ {
+ "emptyDir": {},
+ "name": "agent-socket"
+ }
+ ]
+ }
+ }
+ }
+}`
+ )
+
+ var myAppObj object
+ if err := myAppObj.importJSON([]byte(myAppJSON)); err != nil {
+ t.Fatalf("importJSON failed: %v", err)
+ }
+
+ config := &vkubeConfig{ // PodAgent.Image is left unset, hence "image": "" in the expected output
+ ClusterAgent: clusterAgentConfig{
+ Blessing: "root/cluster-agent",
+ Namespace: "test",
+ },
+ }
+ if err := addPodAgent(ctx, config, myAppObj, "myapp-secret"); err != nil {
+ t.Fatalf("addPodAgent failed: %v", err)
+ }
+ outBytes, err := myAppObj.json()
+ if err != nil {
+ t.Fatalf("json failed: %v", err)
+ }
+ got := strings.Replace(string(outBytes), rootBlessings(ctx), "ROOT-BLESSINGS", 1) // mask the principal's root blessings so a fixed string can be compared
+
+ if got != expected {
+ t.Errorf("unexpected output. Got %s, expected %s", got, expected)
+ diff(t, expected, got) // also log a unified diff of the two documents
+ }
+}
+
+func diff(t *testing.T, expected, got string) { // logs the output of 'diff -u' for the two strings
+ dir, err := ioutil.TempDir("", "diff-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir) // the scratch files only need to live for the duration of the diff command
+ expectedFile := filepath.Join(dir, "expected")
+ if err := ioutil.WriteFile(expectedFile, []byte(expected), 0644); err != nil {
+ t.Fatal(err)
+ }
+ gotFile := filepath.Join(dir, "got")
+ if err := ioutil.WriteFile(gotFile, []byte(got), 0644); err != nil {
+ t.Fatal(err)
+ }
+ out, _ := exec.Command("diff", "-u", expectedFile, gotFile).CombinedOutput() // error ignored: diff exits non-zero when the files differ
+ t.Log(string(out))
+}
diff --git a/services/debug/debug/doc.go b/services/debug/debug/doc.go
index aa0c762..79b1167 100644
--- a/services/debug/debug/doc.go
+++ b/services/debug/debug/doc.go
@@ -34,6 +34,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/device/claimable/doc.go b/services/device/claimable/doc.go
index f3e1183..f0d7a37 100644
--- a/services/device/claimable/doc.go
+++ b/services/device/claimable/doc.go
@@ -37,6 +37,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/device/device/doc.go b/services/device/device/doc.go
index f5e85af..a14531b 100644
--- a/services/device/device/doc.go
+++ b/services/device/device/doc.go
@@ -53,6 +53,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/device/deviced/doc.go b/services/device/deviced/doc.go
index efea027..b3b717f 100644
--- a/services/device/deviced/doc.go
+++ b/services/device/deviced/doc.go
@@ -74,6 +74,8 @@
Path to the application to exec.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-username=
The UNIX user name used for the other functions of this tool.
-v=0
diff --git a/services/device/deviced/internal/impl/app_service.go b/services/device/deviced/internal/impl/app_service.go
index b623caf..ac637e4 100644
--- a/services/device/deviced/internal/impl/app_service.go
+++ b/services/device/deviced/internal/impl/app_service.go
@@ -154,7 +154,6 @@
vsecurity "v.io/x/ref/lib/security"
"v.io/x/ref/services/agent"
"v.io/x/ref/services/agent/agentlib"
- "v.io/x/ref/services/agent/keymgr"
"v.io/x/ref/services/device/internal/config"
"v.io/x/ref/services/device/internal/errors"
"v.io/x/ref/services/internal/packages"
@@ -211,9 +210,6 @@
type securityAgentState struct {
// Security agent key manager client.
keyMgr agent.KeyManager
- // Deprecated: security agent key manager client based on pipe
- // connections.
- keyMgrAgent *keymgr.Agent
}
// appRunner is the subset of the appService object needed to
@@ -540,35 +536,6 @@
return installationDir, nil
}
-// agentPrincipal creates a Principal backed by the given agent connection,
-// taking ownership of the connection. The returned cancel function is to be
-// called when the Principal is no longer in use.
-func agentPrincipal(ctx *context.T, conn *os.File) (security.Principal, func(), error) {
- agentctx, cancel := context.WithCancel(ctx)
- var err error
- if agentctx, err = v23.WithNewStreamManager(agentctx); err != nil {
- cancel()
- conn.Close()
- return nil, nil, err
- }
- // TODO: This should use the same network as the agent we're using,
- // not whatever this process was compiled with.
- ep, err := v23.NewEndpoint(agentlib.AgentEndpoint(int(conn.Fd())))
- if err != nil {
- cancel()
- conn.Close()
- return nil, nil, err
- }
- p, err := agentlib.NewAgentPrincipal(agentctx, ep, v23.GetClient(agentctx))
- if err != nil {
- cancel()
- conn.Close()
- return nil, nil, err
- }
- conn.Close()
- return p, cancel, nil
-}
-
// setupPrincipal sets up the instance's principal, with the right blessings.
func setupPrincipal(ctx *context.T, instanceDir string, call device.ApplicationInstantiateServerCall, securityAgent *securityAgentState, info *instanceInfo, rootDir string) error {
var p security.Principal
@@ -602,25 +569,6 @@
if p, err = agentlib.NewAgentPrincipalX(sockPath); err != nil {
return verror.New(errors.ErrOperationFailed, ctx, "NewAgentPrincipalX failed", err)
}
- case securityAgent != nil && securityAgent.keyMgrAgent != nil:
- // This code path is deprecated in favor of the socket agent
- // connection.
-
- // TODO(caprita): Part of the cleanup upon destroying an
- // instance, we should tell the agent to drop the principal.
- handle, conn, err := securityAgent.keyMgrAgent.NewPrincipal(ctx, false)
- if err != nil {
- return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("NewPrincipal() failed %v", err))
- }
- var cancel func()
- if p, cancel, err = agentPrincipal(ctx, conn); err != nil {
- return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("agentPrincipal failed: %v", err))
- }
- defer cancel()
- info.SecurityAgentHandle = handle
- // conn will be closed when the connection to the agent is shut
- // down, as a result of cancel() shutting down the stream
- // manager. No need to call conn.Close().
default:
credentialsDir := filepath.Join(instanceDir, "credentials")
// TODO(caprita): The app's system user id needs access to this dir.
@@ -993,26 +941,6 @@
ctx.Errorf("StopServing failed: %v", err)
}
}()
- case sa != nil && sa.keyMgrAgent != nil:
- // This code path is deprecated in favor of the socket agent
- // connection.
- file, err := sa.keyMgrAgent.NewConnection(info.SecurityAgentHandle)
- if err != nil {
- ctx.Errorf("NewConnection(%v) failed: %v", info.SecurityAgentHandle, err)
- return 0, err
- }
- agentCleaner = func() {
- file.Close()
- }
- // We need to account for the file descriptors corresponding to
- // std{err|out|in} as well as the implementation-specific pipes
- // that the vexec library adds to ExtraFiles during
- // handle.Start. vexec.FileOffset properly offsets fd
- // accordingly.
- fd := len(cmd.ExtraFiles) + vexec.FileOffset
- cmd.ExtraFiles = append(cmd.ExtraFiles, file)
- ep := agentlib.AgentEndpoint(fd)
- cfg.Set(mgmt.SecurityAgentEndpointConfigKey, ep)
default:
cmd.Env = append(cmd.Env, ref.EnvCredentials+"="+filepath.Join(instanceDir, "credentials"))
}
@@ -1785,18 +1713,6 @@
}()
}
debugInfo.PrincipalType = "Agent-based"
- case sa != nil && sa.keyMgrAgent != nil:
- file, err := sa.keyMgrAgent.NewConnection(debugInfo.Info.SecurityAgentHandle)
- if err != nil {
- ctx.Errorf("NewConnection(%v) failed: %v", debugInfo.Info.SecurityAgentHandle, err)
- return "", err
- }
- var cancel func()
- if debugInfo.Principal, cancel, err = agentPrincipal(ctx, file); err != nil {
- return "", err
- }
- defer cancel()
- debugInfo.PrincipalType = "Agent-based-deprecated"
default:
credentialsDir := filepath.Join(instanceDir, "credentials")
var err error
diff --git a/services/device/deviced/internal/impl/device_service.go b/services/device/deviced/internal/impl/device_service.go
index be3db00..3445163 100644
--- a/services/device/deviced/internal/impl/device_service.go
+++ b/services/device/deviced/internal/impl/device_service.go
@@ -277,7 +277,6 @@
cfg.Set(mgmt.AddressConfigKey, "127.0.0.1:0")
var p security.Principal
- var agentHandle []byte
switch sa := s.securityAgent; {
case sa != nil && sa.keyMgr != nil:
@@ -306,19 +305,6 @@
if p, err = agentlib.NewAgentPrincipalX(sockPath); err != nil {
return verror.New(errors.ErrOperationFailed, ctx, "NewAgentPrincipalX failed", err)
}
- case sa != nil && sa.keyMgrAgent != nil:
- // This code path is deprecated in favor of the socket agent
- // connection.
- handle, conn, err := sa.keyMgrAgent.NewPrincipal(ctx, true)
- if err != nil {
- return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("NewPrincipal() failed %v", err))
- }
- agentHandle = handle
- var cancel func()
- if p, cancel, err = agentPrincipal(ctx, conn); err != nil {
- return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("agentPrincipal failed: %v", err))
- }
- defer cancel()
default:
credentialsDir := filepath.Join(workspace, "credentials")
var err error
@@ -339,23 +325,6 @@
return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("AddToRoots() failed: %v", err))
}
- if s.securityAgent != nil && s.securityAgent.keyMgrAgent != nil {
- // This code path is deprecated in favor of the socket agent
- // connection.
- file, err := s.securityAgent.keyMgrAgent.NewConnection(agentHandle)
- if err != nil {
- return verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("NewConnection(%v) failed: %v", agentHandle, err))
- }
- defer file.Close()
-
- fd := len(cmd.ExtraFiles) + vexec.FileOffset
- cmd.ExtraFiles = append(cmd.ExtraFiles, file)
- // TODO: This should use the same network as the agent we're using,
- // not whatever this process was compiled with.
- ep := agentlib.AgentEndpoint(fd)
- cfg.Set(mgmt.SecurityAgentEndpointConfigKey, ep)
- }
-
handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{Config: cfg})
// Start the child process.
if err := handle.Start(); err != nil {
diff --git a/services/device/deviced/internal/impl/dispatcher.go b/services/device/deviced/internal/impl/dispatcher.go
index b3dee78..5d12e66 100644
--- a/services/device/deviced/internal/impl/dispatcher.go
+++ b/services/device/deviced/internal/impl/dispatcher.go
@@ -136,16 +136,6 @@
keyMgr: km,
}
}
- } else if len(os.Getenv(ref.EnvAgentEndpoint)) > 0 {
- // This code path is deprecated in favor of socket agent
- // connection.
- if keyMgrAgent, err := keymgr.NewAgent(); err != nil {
- return nil, nil, verror.New(errNewAgentFailed, ctx, err)
- } else {
- d.internal.securityAgent = &securityAgentState{
- keyMgrAgent: keyMgrAgent,
- }
- }
}
runner := &appRunner{
callback: d.internal.callback,
diff --git a/services/discovery/service.go b/services/discovery/service.go
index 5e75c94..079e4e6 100644
--- a/services/discovery/service.go
+++ b/services/discovery/service.go
@@ -37,7 +37,8 @@
func (s *impl) RegisterService(ctx *context.T, call rpc.ServerCall, service discovery.Service, visibility []security.BlessingPattern) (sdiscovery.ServiceHandle, error) {
ctx, cancel := context.WithCancel(s.ctx)
- if err := s.d.Advertise(ctx, service, visibility); err != nil {
+ done, err := s.d.Advertise(ctx, service, visibility)
+ if err != nil {
cancel()
return 0, err
}
@@ -57,7 +58,10 @@
break
}
}
- s.handles[handle] = cancel
+ s.handles[handle] = func() {
+ cancel()
+ <-done
+ }
s.lastHandle = handle
s.mu.Unlock()
return handle, nil
@@ -65,11 +69,11 @@
func (s *impl) UnregisterService(ctx *context.T, call rpc.ServerCall, handle sdiscovery.ServiceHandle) error {
s.mu.Lock()
- cancel := s.handles[handle]
+ stop := s.handles[handle]
delete(s.handles, handle)
s.mu.Unlock()
- if cancel != nil {
- cancel()
+ if stop != nil {
+ stop()
}
return nil
}
diff --git a/services/groups/groups/doc.go b/services/groups/groups/doc.go
index a925394..afa74ed 100644
--- a/services/groups/groups/doc.go
+++ b/services/groups/groups/doc.go
@@ -35,6 +35,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/identity/identityd/doc.go b/services/identity/identityd/doc.go
index e2bfac9..be8a14e 100644
--- a/services/identity/identityd/doc.go
+++ b/services/identity/identityd/doc.go
@@ -79,6 +79,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/identity/internal/identityd_test/doc.go b/services/identity/internal/identityd_test/doc.go
index 1ab2911..d7c698e 100644
--- a/services/identity/internal/identityd_test/doc.go
+++ b/services/identity/internal/identityd_test/doc.go
@@ -51,6 +51,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/mounttable/mounttabled/doc.go b/services/mounttable/mounttabled/doc.go
index 4deba0d..e7ee1fc 100644
--- a/services/mounttable/mounttabled/doc.go
+++ b/services/mounttable/mounttabled/doc.go
@@ -42,6 +42,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/profile/profile/doc.go b/services/profile/profile/doc.go
index ec5189b..c93d1d1 100644
--- a/services/profile/profile/doc.go
+++ b/services/profile/profile/doc.go
@@ -34,6 +34,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/profile/profiled/doc.go b/services/profile/profiled/doc.go
index 85b8107..a188316 100644
--- a/services/profile/profiled/doc.go
+++ b/services/profile/profiled/doc.go
@@ -33,6 +33,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/proxy/proxyd/doc.go b/services/proxy/proxyd/doc.go
index cd0d3ce..5c5622f 100644
--- a/services/proxy/proxyd/doc.go
+++ b/services/proxy/proxyd/doc.go
@@ -42,6 +42,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/role/roled/doc.go b/services/role/roled/doc.go
index 0da6feb..7366356 100644
--- a/services/role/roled/doc.go
+++ b/services/role/roled/doc.go
@@ -32,6 +32,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index f6a9bc0..9b607a2 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -16,6 +16,7 @@
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/services/syncbase/nosql"
"v.io/v23/vdl"
"v.io/v23/verror"
@@ -463,7 +464,8 @@
vlog.VI(4).Infof("sync: connectToPeer: trying %v", absName)
var err error
- iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
+ iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name,
+ options.ChannelTimeout(connectionTimeOut))
t.Stop()
if err == nil {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index c2ccf69..e203fdd 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -352,7 +352,7 @@
}
// Duplicate calls to advertise will return an error.
- err := advertiser.Advertise(ctx, sbService, nil)
+ _, err := advertiser.Advertise(ctx, sbService, nil)
if err == nil {
s.advCancel = stop
}
diff --git a/test/goroutines/goroutines.go b/test/goroutines/goroutines.go
index 1169906..555d666 100644
--- a/test/goroutines/goroutines.go
+++ b/test/goroutines/goroutines.go
@@ -45,11 +45,11 @@
bufsize *= 2
buf = make([]byte, bufsize)
}
- return Parse(buf)
+ return Parse(buf, true)
}
// Parse parses a stack trace into a structure representation.
-func Parse(buf []byte) ([]*Goroutine, error) {
+func Parse(buf []byte, ignore bool) ([]*Goroutine, error) {
scanner := bufio.NewScanner(bytes.NewReader(buf))
var out []*Goroutine
for scanner.Scan() {
@@ -60,7 +60,7 @@
if err != nil {
return out, fmt.Errorf("Error %v parsing trace:\n%s", err, string(buf))
}
- if !shouldIgnore(g) {
+ if !ignore || !shouldIgnore(g) {
out = append(out, g)
}
}
diff --git a/test/goroutines/goroutines_test.go b/test/goroutines/goroutines_test.go
index c8c236c..3b3f725 100644
--- a/test/goroutines/goroutines_test.go
+++ b/test/goroutines/goroutines_test.go
@@ -107,7 +107,7 @@
buf = buf[:runtime.Stack(buf, true)]
close(wait)
- gs, err := Parse(buf)
+ gs, err := Parse(buf, false)
if err != nil {
t.Fatal(err)
}
diff --git a/test/hello/helloclient/doc.go b/test/hello/helloclient/doc.go
index 6f7c049..35062a9 100644
--- a/test/hello/helloclient/doc.go
+++ b/test/hello/helloclient/doc.go
@@ -30,6 +30,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=
diff --git a/test/hello/helloserver/doc.go b/test/hello/helloserver/doc.go
index 4174c3c..91cba82 100644
--- a/test/hello/helloserver/doc.go
+++ b/test/hello/helloserver/doc.go
@@ -30,6 +30,8 @@
Displays metadata for the program and exits.
-stderrthreshold=2
logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
-v=0
log level for V logs
-v23.credentials=