discovery: add mdns plugin

  The initial implementation of mDNS plugin.

Change-Id: I7465b65c2bd90cc1c6400d2345aae2fee0c38363
diff --git a/runtime/internal/discovery/advertise.go b/runtime/internal/discovery/advertise.go
index f8f156b..0cf4018 100644
--- a/runtime/internal/discovery/advertise.go
+++ b/runtime/internal/discovery/advertise.go
@@ -8,12 +8,25 @@
 	"v.io/v23/context"
 	"v.io/v23/discovery"
 	"v.io/v23/security/access"
+	"v.io/v23/verror"
+)
+
+var (
+	errNoInterfaceName = verror.Register(pkgPath+".errNoInterfaceName", verror.NoRetry, "{1:}{2:} interface name not provided")
+	errNoAddresses     = verror.Register(pkgPath+".errNoAddress", verror.NoRetry, "{1:}{2:} address not provided")
 )
 
 // Advertise implements discovery.Advertiser.
 //
 // TODO(jhahn): Handle ACL.
 func (ds *ds) Advertise(ctx *context.T, service discovery.Service, perms access.Permissions) error {
+	if len(service.InterfaceName) == 0 {
+		return verror.New(errNoInterfaceName, ctx)
+	}
+	if len(service.Addrs) == 0 {
+		return verror.New(errNoAddresses, ctx)
+	}
+
 	if len(service.InstanceUuid) == 0 {
 		service.InstanceUuid = NewInstanceUUID()
 	}
diff --git a/runtime/internal/discovery/discovery.go b/runtime/internal/discovery/discovery.go
index 79329ec..b502095 100644
--- a/runtime/internal/discovery/discovery.go
+++ b/runtime/internal/discovery/discovery.go
@@ -10,6 +10,8 @@
 	"v.io/v23/discovery"
 )
 
+const pkgPath = "v.io/x/ref/runtime/internal/discovery"
+
 // ds is an implementation of discovery.T.
 type ds struct {
 	plugins []Plugin
diff --git a/runtime/internal/discovery/plugins/mdns/mdns.go b/runtime/internal/discovery/plugins/mdns/mdns.go
new file mode 100644
index 0000000..5e59790
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mdns/mdns.go
@@ -0,0 +1,241 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package mdns implements mDNS plugin for discovery service.
+//
+// In order to support discovery of a specific vanadium service, an instance
+// is advertised in two ways - one as a vanadium service and the other as a
+// subtype of vanadium service.
+//
+// For example, a vanadium printer service is advertised as
+//
+//    v23._tcp.local.
+//    _<printer_service_uuid>._sub._v23._tcp.local.
+//
+// Even though an instance is advertised as two services, both PTR records refer
+// to the same name.
+//
+//    _v23._tcp.local.  PTR <instance_uuid>.<printer_service_uuid>._v23._tcp.local.
+//    _<printer_service_uuid>._sub._v23._tcp.local.
+//                      PTR <instance_uuid>.<printer_service_uuid>._v23._tcp.local.
+package mdns
+
+import (
+	"encoding/hex"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/discovery"
+
+	idiscovery "v.io/x/ref/runtime/internal/discovery"
+
+	"github.com/pborman/uuid"
+	mdns "github.com/presotto/go-mdns-sd"
+)
+
+const (
+	v23ServiceName    = "v23"
+	serviceNameSuffix = "._sub._" + v23ServiceName
+	// The host name is in the form of '<instance uuid>.<service uuid>._v23._tcp.local.'.
+	// The double dots at the end are for bypassing the host name composition in
+	// go-mdns-sd package so that we can use the same host name both in the (subtype)
+	// service and v23 service announcements.
+	hostNameSuffix = "._v23._tcp.local.."
+
+	attrInterface = "__intf"
+	attrAddr      = "__addr"
+)
+
+type plugin struct {
+	mdns      *mdns.MDNS
+	adStopper *idiscovery.Trigger
+
+	subscriptionRefreshTime time.Duration
+	subscriptionWaitTime    time.Duration
+	subscriptionMu          sync.Mutex
+	subscription            map[string]subscription // GUARDED_BY(subscriptionMu)
+}
+
+type subscription struct {
+	count            int
+	lastSubscription time.Time
+}
+
+func (p *plugin) Advertise(ctx *context.T, ad *idiscovery.Advertisement) error {
+	serviceName := ad.ServiceUuid.String() + serviceNameSuffix
+	hostName := fmt.Sprintf("%x.%s%s", ad.InstanceUuid, ad.ServiceUuid.String(), hostNameSuffix)
+	txt, err := createTXTRecords(ad)
+	if err != nil {
+		return err
+	}
+
+	// Announce the service.
+	err = p.mdns.AddService(serviceName, hostName, 0, txt...)
+	if err != nil {
+		return err
+	}
+	// Announce it as v23 service as well so that we can discover
+	// all v23 services through mDNS.
+	err = p.mdns.AddService(v23ServiceName, hostName, 0, txt...)
+	if err != nil {
+		return err
+	}
+	stop := func() {
+		p.mdns.RemoveService(serviceName, hostName)
+		p.mdns.RemoveService(v23ServiceName, hostName)
+	}
+	p.adStopper.Add(stop, ctx.Done())
+	return nil
+}
+
+func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *idiscovery.Advertisement) error {
+	var serviceName string
+	if len(serviceUuid) == 0 {
+		serviceName = v23ServiceName
+	} else {
+		serviceName = serviceUuid.String() + serviceNameSuffix
+	}
+
+	go func() {
+		p.subscriptionMu.Lock()
+		sub := p.subscription[serviceName]
+		sub.count++
+		// If we haven't refreshed in a while, do it now.
+		if time.Since(sub.lastSubscription) > p.subscriptionRefreshTime {
+			p.mdns.SubscribeToService(serviceName)
+			// Wait a bit to learn about neighborhood.
+			time.Sleep(p.subscriptionWaitTime)
+			sub.lastSubscription = time.Now()
+		}
+		p.subscription[serviceName] = sub
+		p.subscriptionMu.Unlock()
+
+		defer func() {
+			p.subscriptionMu.Lock()
+			sub := p.subscription[serviceName]
+			sub.count--
+			if sub.count == 0 {
+				delete(p.subscription, serviceName)
+				p.mdns.UnsubscribeFromService(serviceName)
+			} else {
+				p.subscription[serviceName] = sub
+			}
+			p.subscriptionMu.Unlock()
+		}()
+
+		// TODO(jhahn): Handle "Lost" case.
+		services := p.mdns.ServiceDiscovery(serviceName)
+		for _, service := range services {
+			ad, err := decodeAdvertisement(service)
+			if err != nil {
+				ctx.Error(err)
+				continue
+			}
+			select {
+			case scanCh <- ad:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func createTXTRecords(ad *idiscovery.Advertisement) ([]string, error) {
+	// Prepare a TXT record with attributes and addresses to announce.
+	//
+	// TODO(jhahn): Currently, the record size is limited to 2000 bytes in
+	// go-mdns-sd package. Think about how to handle a large TXT record size
+	// exceeds the limit.
+	txt := make([]string, 0, len(ad.Attrs)+len(ad.Addrs)+1)
+	txt = append(txt, fmt.Sprintf("%s=%s", attrInterface, ad.InterfaceName))
+	for k, v := range ad.Attrs {
+		txt = append(txt, fmt.Sprintf("%s=%s", k, v))
+	}
+	for _, addr := range ad.Addrs {
+		txt = append(txt, fmt.Sprintf("%s=%s", attrAddr, addr))
+	}
+	return txt, nil
+}
+
+func decodeAdvertisement(service mdns.ServiceInstance) (*idiscovery.Advertisement, error) {
+	// Note that service.Name would be '<instance uuid>.<service uuid>._v23._tcp.local.' for
+	// subtype service discovery and ''<instance uuid>.<service uuid>' for v23 service discovery.
+	p := strings.SplitN(service.Name, ".", 3)
+	if len(p) < 2 {
+		return nil, fmt.Errorf("invalid host name: %s", service.Name)
+	}
+	instanceUuid, err := hex.DecodeString(p[0])
+	if err != nil {
+		return nil, fmt.Errorf("invalid instance uuid in host name: %s", p[0])
+	}
+	serviceUuid := uuid.Parse(p[1])
+	if len(serviceUuid) == 0 {
+		return nil, fmt.Errorf("invalid service uuid in host name: %s", p[1])
+	}
+
+	ad := idiscovery.Advertisement{
+		ServiceUuid: serviceUuid,
+		Service: discovery.Service{
+			InstanceUuid: instanceUuid,
+			Attrs:        make(discovery.Attributes),
+		},
+	}
+	for _, rr := range service.TxtRRs {
+		for _, txt := range rr.Txt {
+			kv := strings.SplitN(txt, "=", 2)
+			if len(kv) != 2 {
+				return nil, fmt.Errorf("invalid txt record: %s", txt)
+			}
+			switch k, v := kv[0], kv[1]; k {
+			case attrInterface:
+				ad.InterfaceName = v
+			case attrAddr:
+				ad.Addrs = append(ad.Addrs, v)
+			default:
+				ad.Attrs[k] = v
+			}
+		}
+	}
+	return &ad, nil
+}
+
+func New(host string) (idiscovery.Plugin, error) {
+	return newWithLoopback(host, false)
+}
+
+func newWithLoopback(host string, loopback bool) (idiscovery.Plugin, error) {
+	if len(host) == 0 {
+		// go-mdns-sd reannounce the services periodically only when the host name
+		// is set. Use a default one if not given.
+		host = "v23()"
+	}
+	m, err := mdns.NewMDNS(host, "", "", loopback, false)
+	if err != nil {
+		// The name may not have been unique. Try one more time with a unique
+		// name. NewMDNS will replace the "()" with "(hardware mac address)".
+		if len(host) > 0 && !strings.HasSuffix(host, "()") {
+			m, err = mdns.NewMDNS(host+"()", "", "", loopback, false)
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	p := plugin{
+		mdns:      m,
+		adStopper: idiscovery.NewTrigger(),
+		// TODO(jhahn): Figure out a good subscription refresh time.
+		subscriptionRefreshTime: 10 * time.Second,
+		subscription:            make(map[string]subscription),
+	}
+	if loopback {
+		p.subscriptionWaitTime = 5 * time.Millisecond
+	} else {
+		p.subscriptionWaitTime = 50 * time.Millisecond
+	}
+	return &p, nil
+}
diff --git a/runtime/internal/discovery/plugins/mdns/mdns_test.go b/runtime/internal/discovery/plugins/mdns/mdns_test.go
new file mode 100644
index 0000000..77a7bd4
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mdns/mdns_test.go
@@ -0,0 +1,175 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mdns
+
+import (
+	"fmt"
+	"reflect"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/pborman/uuid"
+
+	"v.io/v23/context"
+	"v.io/v23/discovery"
+
+	idiscovery "v.io/x/ref/runtime/internal/discovery"
+)
+
+func TestBasic(t *testing.T) {
+	services := []discovery.Service{
+		{
+			InstanceUuid:  idiscovery.NewInstanceUUID(),
+			InterfaceName: "v.io/x",
+			Attrs: discovery.Attributes{
+				"a": "a1234",
+				"b": "b1234",
+			},
+			Addrs: []string{
+				"/@6@wsh@foo.com:1234@@/x",
+			},
+		},
+		{
+			InstanceUuid:  idiscovery.NewInstanceUUID(),
+			InterfaceName: "v.io/x",
+			Attrs: discovery.Attributes{
+				"a": "a5678",
+				"b": "b5678",
+			},
+			Addrs: []string{
+				"/@6@wsh@bar.com:1234@@/x",
+			},
+		},
+		{
+			InstanceUuid:  idiscovery.NewInstanceUUID(),
+			InterfaceName: "v.io/y",
+			Attrs: discovery.Attributes{
+				"c": "c1234",
+				"d": "d1234",
+			},
+			Addrs: []string{
+				"/@6@wsh@foo.com:1234@@/y",
+				"/@6@wsh@bar.com:1234@@/y",
+			},
+		},
+	}
+
+	p1, err := newWithLoopback("m1", true)
+	if err != nil {
+		t.Fatalf("New() failed: %v", err)
+	}
+	p2, err := newWithLoopback("m2", true)
+	if err != nil {
+		t.Fatalf("New() failed: %v", err)
+	}
+
+	var stops []func()
+	for _, service := range services {
+		stop, err := advertise(p1, service)
+		if err != nil {
+			t.Fatal(err)
+		}
+		stops = append(stops, stop)
+	}
+
+	if err := scanAndMatch(p2, "v.io/x", services[0], services[1]); err != nil {
+		t.Error(err)
+	}
+	if err := scanAndMatch(p2, "v.io/y", services[2]); err != nil {
+		t.Error(err)
+	}
+	if err := scanAndMatch(p2, "", services...); err != nil {
+		t.Error(err)
+	}
+	if err := scanAndMatch(p2, "v.io/z"); err != nil {
+		t.Error(err)
+	}
+
+	stops[0]()
+	if err := scanAndMatch(p2, "v.io/x", services[1]); err != nil {
+		t.Error(err)
+	}
+	if err := scanAndMatch(p2, "", services[1], services[2]); err != nil {
+		t.Error(err)
+	}
+	stops[1]()
+	stops[2]()
+	if err := scanAndMatch(p2, ""); err != nil {
+		t.Error(err)
+	}
+}
+
+func advertise(p idiscovery.Plugin, service discovery.Service) (func(), error) {
+	ctx, cancel := context.RootContext()
+	ad := idiscovery.Advertisement{
+		ServiceUuid: idiscovery.NewServiceUUID(service.InterfaceName),
+		Service:     service,
+	}
+	if err := p.Advertise(ctx, &ad); err != nil {
+		return nil, fmt.Errorf("Advertise failed: %v", err)
+	}
+	return cancel, nil
+}
+
+func scan(p idiscovery.Plugin, interfaceName string) ([]idiscovery.Advertisement, error) {
+	ctx, _ := context.RootContext()
+	scanCh := make(chan *idiscovery.Advertisement)
+	var serviceUuid uuid.UUID
+	if len(interfaceName) > 0 {
+		serviceUuid = idiscovery.NewServiceUUID(interfaceName)
+	}
+	if err := p.Scan(ctx, serviceUuid, scanCh); err != nil {
+		return nil, fmt.Errorf("Scan failed: %v", err)
+	}
+	var ads []idiscovery.Advertisement
+	for {
+		select {
+		case ad := <-scanCh:
+			ads = append(ads, *ad)
+		case <-time.After(10 * time.Millisecond):
+			return ads, nil
+		}
+	}
+}
+
+func match(ads []idiscovery.Advertisement, wants ...discovery.Service) bool {
+	for _, want := range wants {
+		matched := false
+		for i, ad := range ads {
+			if !uuid.Equal(ad.ServiceUuid, idiscovery.NewServiceUUID(want.InterfaceName)) {
+				continue
+			}
+			matched = reflect.DeepEqual(ad.Service, want)
+			if matched {
+				ads = append(ads[:i], ads[i+1:]...)
+				break
+			}
+		}
+		if !matched {
+			return false
+		}
+	}
+	return len(ads) == 0
+}
+
+func scanAndMatch(p idiscovery.Plugin, interfaceName string, wants ...discovery.Service) error {
+	const timeout = 1 * time.Second
+
+	var ads []idiscovery.Advertisement
+	for now := time.Now(); time.Since(now) < timeout; {
+		runtime.Gosched()
+
+		var err error
+		ads, err = scan(p, interfaceName)
+		if err != nil {
+			return err
+		}
+		if match(ads, wants...) {
+			return nil
+		}
+	}
+	return fmt.Errorf("Match failed; got %v, but wanted %v", ads, wants)
+}
diff --git a/runtime/internal/discovery/trigger.go b/runtime/internal/discovery/trigger.go
new file mode 100644
index 0000000..00c5daf
--- /dev/null
+++ b/runtime/internal/discovery/trigger.go
@@ -0,0 +1,61 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import "reflect"
+
+// A Trigger is a simple multi-channel receiver. It triggers callbacks
+// when it receives signals from the corresponding channels.
+// Each callback will run only once and run in a separate goroutine.
+type Trigger struct {
+	addCh chan *addRequest
+}
+
+type addRequest struct {
+	callback func()
+	sc       reflect.SelectCase
+}
+
+// Add enqueues callback to be invoked on the next event on ch.
+func (tr *Trigger) Add(callback func(), ch <-chan struct{}) {
+	sc := reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+	tr.addCh <- &addRequest{callback, sc}
+}
+
+// Stop stops the trigger.
+func (tr *Trigger) Stop() {
+	close(tr.addCh)
+}
+
+func (tr *Trigger) loop() {
+	callbacks := make([]func(), 1, 4)
+	cases := make([]reflect.SelectCase, 1, 4)
+	cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tr.addCh)}
+
+	for {
+		chosen, recv, ok := reflect.Select(cases)
+		switch chosen {
+		case 0:
+			if !ok {
+				// The trigger has been stopped.
+				return
+			}
+			// Add a new callback.
+			req := recv.Interface().(*addRequest)
+			callbacks = append(callbacks, req.callback)
+			cases = append(cases, req.sc)
+		default:
+			go callbacks[chosen]()
+			callbacks = append(callbacks[:chosen], callbacks[chosen+1:]...)
+			cases = append(cases[:chosen], cases[chosen+1:]...)
+		}
+	}
+}
+
+func NewTrigger() *Trigger {
+	tr := &Trigger{addCh: make(chan *addRequest)}
+	go tr.loop()
+	return tr
+}
diff --git a/runtime/internal/discovery/trigger_test.go b/runtime/internal/discovery/trigger_test.go
new file mode 100644
index 0000000..da2db00
--- /dev/null
+++ b/runtime/internal/discovery/trigger_test.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+	"runtime"
+	"testing"
+)
+
+func TestTrigger(t *testing.T) {
+	tr := NewTrigger()
+	defer tr.Stop()
+
+	done := make(chan int)
+
+	f0 := func() { done <- 0 }
+	c0 := make(chan struct{})
+	tr.Add(f0, c0)
+
+	f1 := func() { done <- 1 }
+	c1 := make(chan struct{})
+	tr.Add(f1, c1)
+
+	// No signal; Shouldn't trigger anything.
+	runtime.Gosched()
+	select {
+	case got := <-done:
+		t.Errorf("Unexpected Trigger; got %v", got)
+	default:
+	}
+
+	// Send a signal to c1; Should trigger f1().
+	close(c1)
+	if got, want := <-done, 1; got != want {
+		t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
+	}
+
+	// Send a signal to c0; Should trigger f0().
+	c0 <- struct{}{}
+	if got, want := <-done, 0; got != want {
+		t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
+	}
+}