discovery: add mdns plugin
The initial implementation of mDNS plugin.
Change-Id: I7465b65c2bd90cc1c6400d2345aae2fee0c38363
diff --git a/runtime/internal/discovery/advertise.go b/runtime/internal/discovery/advertise.go
index f8f156b..0cf4018 100644
--- a/runtime/internal/discovery/advertise.go
+++ b/runtime/internal/discovery/advertise.go
@@ -8,12 +8,25 @@
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security/access"
+ "v.io/v23/verror"
+)
+
+var (
+ errNoInterfaceName = verror.Register(pkgPath+".errNoInterfaceName", verror.NoRetry, "{1:}{2:} interface name not provided")
+ errNoAddresses = verror.Register(pkgPath+".errNoAddress", verror.NoRetry, "{1:}{2:} address not provided")
)
// Advertise implements discovery.Advertiser.
//
// TODO(jhahn): Handle ACL.
func (ds *ds) Advertise(ctx *context.T, service discovery.Service, perms access.Permissions) error {
+ if len(service.InterfaceName) == 0 {
+ return verror.New(errNoInterfaceName, ctx)
+ }
+ if len(service.Addrs) == 0 {
+ return verror.New(errNoAddresses, ctx)
+ }
+
if len(service.InstanceUuid) == 0 {
service.InstanceUuid = NewInstanceUUID()
}
diff --git a/runtime/internal/discovery/discovery.go b/runtime/internal/discovery/discovery.go
index 79329ec..b502095 100644
--- a/runtime/internal/discovery/discovery.go
+++ b/runtime/internal/discovery/discovery.go
@@ -10,6 +10,8 @@
"v.io/v23/discovery"
)
+const pkgPath = "v.io/x/ref/runtime/internal/discovery"
+
// ds is an implementation of discovery.T.
type ds struct {
plugins []Plugin
diff --git a/runtime/internal/discovery/plugins/mdns/mdns.go b/runtime/internal/discovery/plugins/mdns/mdns.go
new file mode 100644
index 0000000..5e59790
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mdns/mdns.go
@@ -0,0 +1,241 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package mdns implements mDNS plugin for discovery service.
+//
+// In order to support discovery of a specific vanadium service, an instance
+// is advertised in two ways - one as a vanadium service and the other as a
+// subtype of vanadium service.
+//
+// For example, a vanadium printer service is advertised as
+//
+// v23._tcp.local.
+// _<printer_service_uuid>._sub._v23._tcp.local.
+//
+// Even though an instance is advertised as two services, both PTR records refer
+// to the same name.
+//
+// _v23._tcp.local. PTR <instance_uuid>.<printer_service_uuid>._v23._tcp.local.
+// _<printer_service_uuid>._sub._v23._tcp.local.
+// PTR <instance_uuid>.<printer_service_uuid>._v23._tcp.local.
+package mdns
+
+import (
+ "encoding/hex"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+
+ idiscovery "v.io/x/ref/runtime/internal/discovery"
+
+ "github.com/pborman/uuid"
+ mdns "github.com/presotto/go-mdns-sd"
+)
+
+const (
+ v23ServiceName = "v23"
+ serviceNameSuffix = "._sub._" + v23ServiceName
+ // The host name is in the form of '<instance uuid>.<service uuid>._v23._tcp.local.'.
+ // The double dots at the end are for bypassing the host name composition in
+ // go-mdns-sd package so that we can use the same host name both in the (subtype)
+ // service and v23 service announcements.
+ hostNameSuffix = "._v23._tcp.local.."
+
+ attrInterface = "__intf"
+ attrAddr = "__addr"
+)
+
+type plugin struct {
+ mdns *mdns.MDNS
+ adStopper *idiscovery.Trigger
+
+ subscriptionRefreshTime time.Duration
+ subscriptionWaitTime time.Duration
+ subscriptionMu sync.Mutex
+ subscription map[string]subscription // GUARDED_BY(subscriptionMu)
+}
+
+type subscription struct {
+ count int
+ lastSubscription time.Time
+}
+
+func (p *plugin) Advertise(ctx *context.T, ad *idiscovery.Advertisement) error {
+ serviceName := ad.ServiceUuid.String() + serviceNameSuffix
+ hostName := fmt.Sprintf("%x.%s%s", ad.InstanceUuid, ad.ServiceUuid.String(), hostNameSuffix)
+ txt, err := createTXTRecords(ad)
+ if err != nil {
+ return err
+ }
+
+ // Announce the service.
+ err = p.mdns.AddService(serviceName, hostName, 0, txt...)
+ if err != nil {
+ return err
+ }
+ // Announce it as v23 service as well so that we can discover
+ // all v23 services through mDNS.
+ err = p.mdns.AddService(v23ServiceName, hostName, 0, txt...)
+ if err != nil {
+ return err
+ }
+ stop := func() {
+ p.mdns.RemoveService(serviceName, hostName)
+ p.mdns.RemoveService(v23ServiceName, hostName)
+ }
+ p.adStopper.Add(stop, ctx.Done())
+ return nil
+}
+
+func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *idiscovery.Advertisement) error {
+ var serviceName string
+ if len(serviceUuid) == 0 {
+ serviceName = v23ServiceName
+ } else {
+ serviceName = serviceUuid.String() + serviceNameSuffix
+ }
+
+ go func() {
+ p.subscriptionMu.Lock()
+ sub := p.subscription[serviceName]
+ sub.count++
+ // If we haven't refreshed in a while, do it now.
+ if time.Since(sub.lastSubscription) > p.subscriptionRefreshTime {
+ p.mdns.SubscribeToService(serviceName)
+ // Wait a bit to learn about neighborhood.
+ time.Sleep(p.subscriptionWaitTime)
+ sub.lastSubscription = time.Now()
+ }
+ p.subscription[serviceName] = sub
+ p.subscriptionMu.Unlock()
+
+ defer func() {
+ p.subscriptionMu.Lock()
+ sub := p.subscription[serviceName]
+ sub.count--
+ if sub.count == 0 {
+ delete(p.subscription, serviceName)
+ p.mdns.UnsubscribeFromService(serviceName)
+ } else {
+ p.subscription[serviceName] = sub
+ }
+ p.subscriptionMu.Unlock()
+ }()
+
+ // TODO(jhahn): Handle "Lost" case.
+ services := p.mdns.ServiceDiscovery(serviceName)
+ for _, service := range services {
+ ad, err := decodeAdvertisement(service)
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+ select {
+ case scanCh <- ad:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return nil
+}
+
+func createTXTRecords(ad *idiscovery.Advertisement) ([]string, error) {
+ // Prepare a TXT record with attributes and addresses to announce.
+ //
+ // TODO(jhahn): Currently, the record size is limited to 2000 bytes in
+ // go-mdns-sd package. Think about how to handle a large TXT record size
+ // exceeds the limit.
+ txt := make([]string, 0, len(ad.Attrs)+len(ad.Addrs)+1)
+ txt = append(txt, fmt.Sprintf("%s=%s", attrInterface, ad.InterfaceName))
+ for k, v := range ad.Attrs {
+ txt = append(txt, fmt.Sprintf("%s=%s", k, v))
+ }
+ for _, addr := range ad.Addrs {
+ txt = append(txt, fmt.Sprintf("%s=%s", attrAddr, addr))
+ }
+ return txt, nil
+}
+
+func decodeAdvertisement(service mdns.ServiceInstance) (*idiscovery.Advertisement, error) {
+ // Note that service.Name would be '<instance uuid>.<service uuid>._v23._tcp.local.' for
+ // subtype service discovery and ''<instance uuid>.<service uuid>' for v23 service discovery.
+ p := strings.SplitN(service.Name, ".", 3)
+ if len(p) < 2 {
+ return nil, fmt.Errorf("invalid host name: %s", service.Name)
+ }
+ instanceUuid, err := hex.DecodeString(p[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid instance uuid in host name: %s", p[0])
+ }
+ serviceUuid := uuid.Parse(p[1])
+ if len(serviceUuid) == 0 {
+ return nil, fmt.Errorf("invalid service uuid in host name: %s", p[1])
+ }
+
+ ad := idiscovery.Advertisement{
+ ServiceUuid: serviceUuid,
+ Service: discovery.Service{
+ InstanceUuid: instanceUuid,
+ Attrs: make(discovery.Attributes),
+ },
+ }
+ for _, rr := range service.TxtRRs {
+ for _, txt := range rr.Txt {
+ kv := strings.SplitN(txt, "=", 2)
+ if len(kv) != 2 {
+ return nil, fmt.Errorf("invalid txt record: %s", txt)
+ }
+ switch k, v := kv[0], kv[1]; k {
+ case attrInterface:
+ ad.InterfaceName = v
+ case attrAddr:
+ ad.Addrs = append(ad.Addrs, v)
+ default:
+ ad.Attrs[k] = v
+ }
+ }
+ }
+ return &ad, nil
+}
+
+func New(host string) (idiscovery.Plugin, error) {
+ return newWithLoopback(host, false)
+}
+
+func newWithLoopback(host string, loopback bool) (idiscovery.Plugin, error) {
+ if len(host) == 0 {
+ // go-mdns-sd reannounce the services periodically only when the host name
+ // is set. Use a default one if not given.
+ host = "v23()"
+ }
+ m, err := mdns.NewMDNS(host, "", "", loopback, false)
+ if err != nil {
+ // The name may not have been unique. Try one more time with a unique
+ // name. NewMDNS will replace the "()" with "(hardware mac address)".
+ if len(host) > 0 && !strings.HasSuffix(host, "()") {
+ m, err = mdns.NewMDNS(host+"()", "", "", loopback, false)
+ }
+ if err != nil {
+ return nil, err
+ }
+ }
+ p := plugin{
+ mdns: m,
+ adStopper: idiscovery.NewTrigger(),
+ // TODO(jhahn): Figure out a good subscription refresh time.
+ subscriptionRefreshTime: 10 * time.Second,
+ subscription: make(map[string]subscription),
+ }
+ if loopback {
+ p.subscriptionWaitTime = 5 * time.Millisecond
+ } else {
+ p.subscriptionWaitTime = 50 * time.Millisecond
+ }
+ return &p, nil
+}
diff --git a/runtime/internal/discovery/plugins/mdns/mdns_test.go b/runtime/internal/discovery/plugins/mdns/mdns_test.go
new file mode 100644
index 0000000..77a7bd4
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mdns/mdns_test.go
@@ -0,0 +1,175 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mdns
+
+import (
+ "fmt"
+ "reflect"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/pborman/uuid"
+
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+
+ idiscovery "v.io/x/ref/runtime/internal/discovery"
+)
+
+func TestBasic(t *testing.T) {
+ services := []discovery.Service{
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/x",
+ Attrs: discovery.Attributes{
+ "a": "a1234",
+ "b": "b1234",
+ },
+ Addrs: []string{
+ "/@6@wsh@foo.com:1234@@/x",
+ },
+ },
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/x",
+ Attrs: discovery.Attributes{
+ "a": "a5678",
+ "b": "b5678",
+ },
+ Addrs: []string{
+ "/@6@wsh@bar.com:1234@@/x",
+ },
+ },
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/y",
+ Attrs: discovery.Attributes{
+ "c": "c1234",
+ "d": "d1234",
+ },
+ Addrs: []string{
+ "/@6@wsh@foo.com:1234@@/y",
+ "/@6@wsh@bar.com:1234@@/y",
+ },
+ },
+ }
+
+ p1, err := newWithLoopback("m1", true)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+ p2, err := newWithLoopback("m2", true)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+
+ var stops []func()
+ for _, service := range services {
+ stop, err := advertise(p1, service)
+ if err != nil {
+ t.Fatal(err)
+ }
+ stops = append(stops, stop)
+ }
+
+ if err := scanAndMatch(p2, "v.io/x", services[0], services[1]); err != nil {
+ t.Error(err)
+ }
+ if err := scanAndMatch(p2, "v.io/y", services[2]); err != nil {
+ t.Error(err)
+ }
+ if err := scanAndMatch(p2, "", services...); err != nil {
+ t.Error(err)
+ }
+ if err := scanAndMatch(p2, "v.io/z"); err != nil {
+ t.Error(err)
+ }
+
+ stops[0]()
+ if err := scanAndMatch(p2, "v.io/x", services[1]); err != nil {
+ t.Error(err)
+ }
+ if err := scanAndMatch(p2, "", services[1], services[2]); err != nil {
+ t.Error(err)
+ }
+ stops[1]()
+ stops[2]()
+ if err := scanAndMatch(p2, ""); err != nil {
+ t.Error(err)
+ }
+}
+
+func advertise(p idiscovery.Plugin, service discovery.Service) (func(), error) {
+ ctx, cancel := context.RootContext()
+ ad := idiscovery.Advertisement{
+ ServiceUuid: idiscovery.NewServiceUUID(service.InterfaceName),
+ Service: service,
+ }
+ if err := p.Advertise(ctx, &ad); err != nil {
+ return nil, fmt.Errorf("Advertise failed: %v", err)
+ }
+ return cancel, nil
+}
+
+func scan(p idiscovery.Plugin, interfaceName string) ([]idiscovery.Advertisement, error) {
+ ctx, _ := context.RootContext()
+ scanCh := make(chan *idiscovery.Advertisement)
+ var serviceUuid uuid.UUID
+ if len(interfaceName) > 0 {
+ serviceUuid = idiscovery.NewServiceUUID(interfaceName)
+ }
+ if err := p.Scan(ctx, serviceUuid, scanCh); err != nil {
+ return nil, fmt.Errorf("Scan failed: %v", err)
+ }
+ var ads []idiscovery.Advertisement
+ for {
+ select {
+ case ad := <-scanCh:
+ ads = append(ads, *ad)
+ case <-time.After(10 * time.Millisecond):
+ return ads, nil
+ }
+ }
+}
+
+func match(ads []idiscovery.Advertisement, wants ...discovery.Service) bool {
+ for _, want := range wants {
+ matched := false
+ for i, ad := range ads {
+ if !uuid.Equal(ad.ServiceUuid, idiscovery.NewServiceUUID(want.InterfaceName)) {
+ continue
+ }
+ matched = reflect.DeepEqual(ad.Service, want)
+ if matched {
+ ads = append(ads[:i], ads[i+1:]...)
+ break
+ }
+ }
+ if !matched {
+ return false
+ }
+ }
+ return len(ads) == 0
+}
+
+func scanAndMatch(p idiscovery.Plugin, interfaceName string, wants ...discovery.Service) error {
+ const timeout = 1 * time.Second
+
+ var ads []idiscovery.Advertisement
+ for now := time.Now(); time.Since(now) < timeout; {
+ runtime.Gosched()
+
+ var err error
+ ads, err = scan(p, interfaceName)
+ if err != nil {
+ return err
+ }
+ if match(ads, wants...) {
+ return nil
+ }
+ }
+ return fmt.Errorf("Match failed; got %v, but wanted %v", ads, wants)
+}
diff --git a/runtime/internal/discovery/trigger.go b/runtime/internal/discovery/trigger.go
new file mode 100644
index 0000000..00c5daf
--- /dev/null
+++ b/runtime/internal/discovery/trigger.go
@@ -0,0 +1,61 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import "reflect"
+
+// A Trigger is a simple multi-channel receiver. It triggers callbacks
+// when it receives signals from the corresponding channels.
+// Each callback will run only once and run in a separate goroutine.
+type Trigger struct {
+ addCh chan *addRequest
+}
+
+type addRequest struct {
+ callback func()
+ sc reflect.SelectCase
+}
+
+// Add enqueues callback to be invoked on the next event on ch.
+func (tr *Trigger) Add(callback func(), ch <-chan struct{}) {
+ sc := reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ tr.addCh <- &addRequest{callback, sc}
+}
+
+// Stop stops the trigger.
+func (tr *Trigger) Stop() {
+ close(tr.addCh)
+}
+
+func (tr *Trigger) loop() {
+ callbacks := make([]func(), 1, 4)
+ cases := make([]reflect.SelectCase, 1, 4)
+ cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tr.addCh)}
+
+ for {
+ chosen, recv, ok := reflect.Select(cases)
+ switch chosen {
+ case 0:
+ if !ok {
+ // The trigger has been stopped.
+ return
+ }
+ // Add a new callback.
+ req := recv.Interface().(*addRequest)
+ callbacks = append(callbacks, req.callback)
+ cases = append(cases, req.sc)
+ default:
+ go callbacks[chosen]()
+ callbacks = append(callbacks[:chosen], callbacks[chosen+1:]...)
+ cases = append(cases[:chosen], cases[chosen+1:]...)
+ }
+ }
+}
+
+func NewTrigger() *Trigger {
+ tr := &Trigger{addCh: make(chan *addRequest)}
+ go tr.loop()
+ return tr
+}
diff --git a/runtime/internal/discovery/trigger_test.go b/runtime/internal/discovery/trigger_test.go
new file mode 100644
index 0000000..da2db00
--- /dev/null
+++ b/runtime/internal/discovery/trigger_test.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+ "runtime"
+ "testing"
+)
+
+func TestTrigger(t *testing.T) {
+ tr := NewTrigger()
+ defer tr.Stop()
+
+ done := make(chan int)
+
+ f0 := func() { done <- 0 }
+ c0 := make(chan struct{})
+ tr.Add(f0, c0)
+
+ f1 := func() { done <- 1 }
+ c1 := make(chan struct{})
+ tr.Add(f1, c1)
+
+ // No signal; Shouldn't trigger anything.
+ runtime.Gosched()
+ select {
+ case got := <-done:
+ t.Errorf("Unexpected Trigger; got %v", got)
+ default:
+ }
+
+ // Send a signal to c1; Should trigger f1().
+ close(c1)
+ if got, want := <-done, 1; got != want {
+ t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
+ }
+
+ // Send a signal to c0; Should trigger f0().
+ c0 <- struct{}{}
+ if got, want := <-done, 0; got != want {
+ t.Errorf("Trigger failed; got %v, but wanted %v", got, want)
+ }
+}