discovery: the first naive implementation of discovery.

   * This is a very simple and naive implementation of discovery and
   * there're tons of TODOs. Hope this works as a starting point.

MultiPart: 2/2

Change-Id: I21fe484f77a652f847dd309f065981b3985d60bb
diff --git a/runtime/internal/discovery/advertise.go b/runtime/internal/discovery/advertise.go
index 1422760..f8f156b 100644
--- a/runtime/internal/discovery/advertise.go
+++ b/runtime/internal/discovery/advertise.go
@@ -11,6 +11,23 @@
 )
 
 // Advertise implements discovery.Advertiser.
+//
+// TODO(jhahn): Handle ACL.
 func (ds *ds) Advertise(ctx *context.T, service discovery.Service, perms access.Permissions) error {
+	if len(service.InstanceUuid) == 0 {
+		service.InstanceUuid = NewInstanceUUID()
+	}
+	ad := &Advertisement{
+		ServiceUuid: NewServiceUUID(service.InterfaceName),
+		Service:     service,
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	for _, plugin := range ds.plugins {
+		err := plugin.Advertise(ctx, ad)
+		if err != nil {
+			cancel()
+			return err
+		}
+	}
 	return nil
 }
diff --git a/runtime/internal/discovery/discovery.go b/runtime/internal/discovery/discovery.go
index 53501d7..79329ec 100644
--- a/runtime/internal/discovery/discovery.go
+++ b/runtime/internal/discovery/discovery.go
@@ -4,6 +4,30 @@
 
 package discovery
 
+import (
+	"github.com/pborman/uuid"
+
+	"v.io/v23/discovery"
+)
+
 // ds is an implementation of discovery.T.
 type ds struct {
+	plugins []Plugin
+}
+
+// Advertisement holds a set of service properties to advertise.
+type Advertisement struct {
+	discovery.Service
+
+	// The service UUID to advertise.
+	ServiceUuid uuid.UUID
+
+	// TODO(jhahn): Add proximity.
+}
+
+// TODO(jhahn): Need a better API.
+func New(plugins []Plugin) discovery.T {
+	ds := &ds{plugins: make([]Plugin, len(plugins))}
+	copy(ds.plugins, plugins)
+	return ds
 }
diff --git a/runtime/internal/discovery/discovery_test.go b/runtime/internal/discovery/discovery_test.go
new file mode 100644
index 0000000..6a4b248
--- /dev/null
+++ b/runtime/internal/discovery/discovery_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery_test
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/discovery"
+
+	idiscovery "v.io/x/ref/runtime/internal/discovery"
+	"v.io/x/ref/runtime/internal/discovery/plugins/mock"
+)
+
+func TestBasic(t *testing.T) {
+	ds := idiscovery.New([]idiscovery.Plugin{mock.New()})
+	services := []discovery.Service{
+		{
+			InstanceUuid:  idiscovery.NewInstanceUUID(),
+			InterfaceName: "v.io/v23/a",
+			Addrs:         []string{"/h1:123/x", "/h2:123/y"},
+		},
+		{
+			InstanceUuid:  idiscovery.NewInstanceUUID(),
+			InterfaceName: "v.io/v23/b",
+			Addrs:         []string{"/h1:123/x", "/h2:123/z"},
+		},
+	}
+	var stops []func()
+	for _, service := range services {
+		stop, err := advertise(ds, service)
+		if err != nil {
+			t.Fatalf("Advertise failed: %v\n", err)
+		}
+		stops = append(stops, stop)
+	}
+
+	updates, err := scan(ds, "v.io/v23/a")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates, services[0]) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[0])
+	}
+	updates, err = scan(ds, "v.io/v23/b")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates, services[1]) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+	}
+	updates, err = scan(ds, "")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates, services...) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services)
+	}
+	updates, err = scan(ds, "v.io/v23/c")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+	}
+
+	// Stop advertising the first service. Shouldn't affect the other.
+	stops[0]()
+	updates, err = scan(ds, "v.io/v23/a")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+	}
+	updates, err = scan(ds, "v.io/v23/b")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates, services[1]) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+	}
+	// Stop advertising the other. Now shouldn't discover any service.
+	stops[1]()
+	updates, err = scan(ds, "")
+	if err != nil {
+		t.Fatalf("Scan failed: %v\n", err)
+	}
+	if !match(updates) {
+		t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+	}
+}
+
+func advertise(ds discovery.Advertiser, services ...discovery.Service) (func(), error) {
+	ctx, cancel := context.RootContext()
+	for _, service := range services {
+		if err := ds.Advertise(ctx, service, nil); err != nil {
+			return nil, err
+		}
+	}
+	return cancel, nil
+}
+
+func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	updateCh, err := ds.Scan(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	var updates []discovery.Update
+	for {
+		select {
+		case update := <-updateCh:
+			updates = append(updates, update)
+		case <-time.After(10 * time.Millisecond):
+			return updates, nil
+		}
+	}
+}
+
+func match(updates []discovery.Update, wants ...discovery.Service) bool {
+	for _, want := range wants {
+		matched := false
+		for i, update := range updates {
+			found, ok := update.(discovery.UpdateFound)
+			if !ok {
+				continue
+			}
+			matched = reflect.DeepEqual(found.Value.Service, want)
+			if matched {
+				updates = append(updates[:i], updates[i+1:]...)
+				break
+			}
+		}
+		if !matched {
+			return false
+		}
+	}
+	return len(updates) == 0
+}
diff --git a/runtime/internal/discovery/plugin.go b/runtime/internal/discovery/plugin.go
index c275d89..57a74c1 100644
--- a/runtime/internal/discovery/plugin.go
+++ b/runtime/internal/discovery/plugin.go
@@ -4,6 +4,24 @@
 
 package discovery
 
+import (
+	"github.com/pborman/uuid"
+
+	"v.io/v23/context"
+)
+
 // Plugin is the basic interface for a plugin to discovery service.
+// All implementation should be goroutine-safe.
 type Plugin interface {
+	// Advertise advertises the advertisement. Advertising will continue until
+	// the context is canceled or exceeds its deadline.
+	Advertise(ctx *context.T, ad *Advertisement) error
+
+	// Scan scans services that match the service uuid and returns scanned
+	// advertisements to the channel. A zero-value service uuid means any service.
+	// Scanning will continue until the context is canceled or exceeds its
+	// deadline.
+	//
+	// TODO(jhahn): Pass a filter on service attributes.
+	Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *Advertisement) error
 }
diff --git a/runtime/internal/discovery/plugins/mock/mock.go b/runtime/internal/discovery/plugins/mock/mock.go
new file mode 100644
index 0000000..30423da
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mock/mock.go
@@ -0,0 +1,70 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mock
+
+import (
+	"sync"
+
+	"github.com/pborman/uuid"
+
+	"v.io/v23/context"
+
+	"v.io/x/ref/runtime/internal/discovery"
+)
+
+type plugin struct {
+	mu       sync.Mutex
+	services map[string][]*discovery.Advertisement // GUARDED_BY(mu)
+}
+
+func (p *plugin) Advertise(ctx *context.T, ad *discovery.Advertisement) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	key := string(ad.ServiceUuid)
+	ads := p.services[key]
+	p.services[key] = append(ads, ad)
+	go func() {
+		<-ctx.Done()
+		p.mu.Lock()
+		defer p.mu.Unlock()
+		ads := p.services[key]
+		for i, a := range ads {
+			if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
+				ads = append(ads[:i], ads[i+1:]...)
+				break
+			}
+		}
+		if len(ads) > 0 {
+			p.services[key] = ads
+		} else {
+			delete(p.services, key)
+		}
+	}()
+	return nil
+}
+
+func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *discovery.Advertisement) error {
+	go func() {
+		p.mu.Lock()
+		defer p.mu.Unlock()
+		for key, service := range p.services {
+			if len(serviceUuid) > 0 && key != string(serviceUuid) {
+				continue
+			}
+			for _, ad := range service {
+				select {
+				case scanCh <- ad:
+				case <-ctx.Done():
+					return
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+func New() discovery.Plugin {
+	return &plugin{services: make(map[string][]*discovery.Advertisement)}
+}
diff --git a/runtime/internal/discovery/scan.go b/runtime/internal/discovery/scan.go
index f955326..9425201 100644
--- a/runtime/internal/discovery/scan.go
+++ b/runtime/internal/discovery/scan.go
@@ -5,11 +5,48 @@
 package discovery
 
 import (
+	"github.com/pborman/uuid"
+
 	"v.io/v23/context"
 	"v.io/v23/discovery"
 )
 
 // Scan implements discovery.Scanner.
 func (ds *ds) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
-	return nil, nil
+	// TODO(jhann): Implement a simple query processor.
+	var serviceUuid uuid.UUID
+	if len(query) > 0 {
+		serviceUuid = NewServiceUUID(query)
+	}
+	// TODO(jhahn): Revisit the buffer size.
+	scanCh := make(chan *Advertisement, 10)
+	ctx, cancel := context.WithCancel(ctx)
+	for _, plugin := range ds.plugins {
+		err := plugin.Scan(ctx, serviceUuid, scanCh)
+		if err != nil {
+			cancel()
+			return nil, err
+		}
+	}
+	// TODO(jhahn): Revisit the buffer size.
+	updateCh := make(chan discovery.Update, 10)
+	go doScan(ctx, scanCh, updateCh)
+	return updateCh, nil
+}
+
+func doScan(ctx *context.T, scanCh <-chan *Advertisement, updateCh chan<- discovery.Update) {
+	defer close(updateCh)
+	for {
+		select {
+		case ad := <-scanCh:
+			// TODO(jhahn): Merge scanData based on InstanceUuid.
+			// TODO(jhahn): Handle "Lost" case.
+			update := discovery.UpdateFound{
+				Value: discovery.Found{Service: ad.Service},
+			}
+			updateCh <- update
+		case <-ctx.Done():
+			return
+		}
+	}
 }