discovery: the first naive implementation of discovery.
* This is a very simple and naive implementation of discovery and
* there're tons of TODOs. Hope this works as a starting point.
MultiPart: 2/2
Change-Id: I21fe484f77a652f847dd309f065981b3985d60bb
diff --git a/runtime/internal/discovery/advertise.go b/runtime/internal/discovery/advertise.go
index 1422760..f8f156b 100644
--- a/runtime/internal/discovery/advertise.go
+++ b/runtime/internal/discovery/advertise.go
@@ -11,6 +11,23 @@
)
// Advertise implements discovery.Advertiser.
+//
+// TODO(jhahn): Handle ACL.
func (ds *ds) Advertise(ctx *context.T, service discovery.Service, perms access.Permissions) error {
+ if len(service.InstanceUuid) == 0 {
+ service.InstanceUuid = NewInstanceUUID()
+ }
+ ad := &Advertisement{
+ ServiceUuid: NewServiceUUID(service.InterfaceName),
+ Service: service,
+ }
+ ctx, cancel := context.WithCancel(ctx)
+ for _, plugin := range ds.plugins {
+ err := plugin.Advertise(ctx, ad)
+ if err != nil {
+ cancel()
+ return err
+ }
+ }
return nil
}
diff --git a/runtime/internal/discovery/discovery.go b/runtime/internal/discovery/discovery.go
index 53501d7..79329ec 100644
--- a/runtime/internal/discovery/discovery.go
+++ b/runtime/internal/discovery/discovery.go
@@ -4,6 +4,30 @@
package discovery
+import (
+ "github.com/pborman/uuid"
+
+ "v.io/v23/discovery"
+)
+
// ds is an implementation of discovery.T.
type ds struct {
+ plugins []Plugin
+}
+
+// Advertisement holds a set of service properties to advertise.
+type Advertisement struct {
+ discovery.Service
+
+ // The service UUID to advertise.
+ ServiceUuid uuid.UUID
+
+ // TODO(jhahn): Add proximity.
+}
+
+// TODO(jhahn): Need a better API.
+func New(plugins []Plugin) discovery.T {
+ ds := &ds{plugins: make([]Plugin, len(plugins))}
+ copy(ds.plugins, plugins)
+ return ds
}
diff --git a/runtime/internal/discovery/discovery_test.go b/runtime/internal/discovery/discovery_test.go
new file mode 100644
index 0000000..6a4b248
--- /dev/null
+++ b/runtime/internal/discovery/discovery_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery_test
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+
+ idiscovery "v.io/x/ref/runtime/internal/discovery"
+ "v.io/x/ref/runtime/internal/discovery/plugins/mock"
+)
+
+func TestBasic(t *testing.T) {
+ ds := idiscovery.New([]idiscovery.Plugin{mock.New()})
+ services := []discovery.Service{
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/v23/a",
+ Addrs: []string{"/h1:123/x", "/h2:123/y"},
+ },
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/v23/b",
+ Addrs: []string{"/h1:123/x", "/h2:123/z"},
+ },
+ }
+ var stops []func()
+ for _, service := range services {
+ stop, err := advertise(ds, service)
+ if err != nil {
+ t.Fatalf("Advertise failed: %v\n", err)
+ }
+ stops = append(stops, stop)
+ }
+
+ updates, err := scan(ds, "v.io/v23/a")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[0]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[0])
+ }
+ updates, err = scan(ds, "v.io/v23/b")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[1]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+ }
+ updates, err = scan(ds, "")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services...) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services)
+ }
+ updates, err = scan(ds, "v.io/v23/c")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+
+ // Stop advertising the first service. Shouldn't affect the other.
+ stops[0]()
+ updates, err = scan(ds, "v.io/v23/a")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+ updates, err = scan(ds, "v.io/v23/b")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[1]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+ }
+ // Stop advertising the other. Now shouldn't discover any service.
+ stops[1]()
+ updates, err = scan(ds, "")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+}
+
+func advertise(ds discovery.Advertiser, services ...discovery.Service) (func(), error) {
+ ctx, cancel := context.RootContext()
+ for _, service := range services {
+ if err := ds.Advertise(ctx, service, nil); err != nil {
+ return nil, err
+ }
+ }
+ return cancel, nil
+}
+
+func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ updateCh, err := ds.Scan(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ var updates []discovery.Update
+ for {
+ select {
+ case update := <-updateCh:
+ updates = append(updates, update)
+ case <-time.After(10 * time.Millisecond):
+ return updates, nil
+ }
+ }
+}
+
+func match(updates []discovery.Update, wants ...discovery.Service) bool {
+ for _, want := range wants {
+ matched := false
+ for i, update := range updates {
+ found, ok := update.(discovery.UpdateFound)
+ if !ok {
+ continue
+ }
+ matched = reflect.DeepEqual(found.Value.Service, want)
+ if matched {
+ updates = append(updates[:i], updates[i+1:]...)
+ break
+ }
+ }
+ if !matched {
+ return false
+ }
+ }
+ return len(updates) == 0
+}
diff --git a/runtime/internal/discovery/plugin.go b/runtime/internal/discovery/plugin.go
index c275d89..57a74c1 100644
--- a/runtime/internal/discovery/plugin.go
+++ b/runtime/internal/discovery/plugin.go
@@ -4,6 +4,24 @@
package discovery
+import (
+ "github.com/pborman/uuid"
+
+ "v.io/v23/context"
+)
+
// Plugin is the basic interface for a plugin to discovery service.
+// All implementation should be goroutine-safe.
type Plugin interface {
+ // Advertise advertises the advertisement. Advertising will continue until
+ // the context is canceled or exceeds its deadline.
+ Advertise(ctx *context.T, ad *Advertisement) error
+
+ // Scan scans services that match the service uuid and returns scanned
+ // advertisements to the channel. A zero-value service uuid means any service.
+ // Scanning will continue until the context is canceled or exceeds its
+ // deadline.
+ //
+ // TODO(jhahn): Pass a filter on service attributes.
+ Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *Advertisement) error
}
diff --git a/runtime/internal/discovery/plugins/mock/mock.go b/runtime/internal/discovery/plugins/mock/mock.go
new file mode 100644
index 0000000..30423da
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mock/mock.go
@@ -0,0 +1,70 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mock
+
+import (
+ "sync"
+
+ "github.com/pborman/uuid"
+
+ "v.io/v23/context"
+
+ "v.io/x/ref/runtime/internal/discovery"
+)
+
+type plugin struct {
+ mu sync.Mutex
+ services map[string][]*discovery.Advertisement // GUARDED_BY(mu)
+}
+
+func (p *plugin) Advertise(ctx *context.T, ad *discovery.Advertisement) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ key := string(ad.ServiceUuid)
+ ads := p.services[key]
+ p.services[key] = append(ads, ad)
+ go func() {
+ <-ctx.Done()
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ ads := p.services[key]
+ for i, a := range ads {
+ if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
+ ads = append(ads[:i], ads[i+1:]...)
+ break
+ }
+ }
+ if len(ads) > 0 {
+ p.services[key] = ads
+ } else {
+ delete(p.services, key)
+ }
+ }()
+ return nil
+}
+
+func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *discovery.Advertisement) error {
+ go func() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ for key, service := range p.services {
+ if len(serviceUuid) > 0 && key != string(serviceUuid) {
+ continue
+ }
+ for _, ad := range service {
+ select {
+ case scanCh <- ad:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }()
+ return nil
+}
+
+func New() discovery.Plugin {
+ return &plugin{services: make(map[string][]*discovery.Advertisement)}
+}
diff --git a/runtime/internal/discovery/scan.go b/runtime/internal/discovery/scan.go
index f955326..9425201 100644
--- a/runtime/internal/discovery/scan.go
+++ b/runtime/internal/discovery/scan.go
@@ -5,11 +5,48 @@
package discovery
import (
+ "github.com/pborman/uuid"
+
"v.io/v23/context"
"v.io/v23/discovery"
)
// Scan implements discovery.Scanner.
func (ds *ds) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
- return nil, nil
+ // TODO(jhann): Implement a simple query processor.
+ var serviceUuid uuid.UUID
+ if len(query) > 0 {
+ serviceUuid = NewServiceUUID(query)
+ }
+ // TODO(jhahn): Revisit the buffer size.
+ scanCh := make(chan *Advertisement, 10)
+ ctx, cancel := context.WithCancel(ctx)
+ for _, plugin := range ds.plugins {
+ err := plugin.Scan(ctx, serviceUuid, scanCh)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ }
+ // TODO(jhahn): Revisit the buffer size.
+ updateCh := make(chan discovery.Update, 10)
+ go doScan(ctx, scanCh, updateCh)
+ return updateCh, nil
+}
+
+func doScan(ctx *context.T, scanCh <-chan *Advertisement, updateCh chan<- discovery.Update) {
+ defer close(updateCh)
+ for {
+ select {
+ case ad := <-scanCh:
+ // TODO(jhahn): Merge scanData based on InstanceUuid.
+ // TODO(jhahn): Handle "Lost" case.
+ update := discovery.UpdateFound{
+ Value: discovery.Found{Service: ad.Service},
+ }
+ updateCh <- update
+ case <-ctx.Done():
+ return
+ }
+ }
}