blob: 77dae7ee33a876d4e0a7da4a23e1fac3674ea91a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package mdns implements mDNS plugin for discovery service.
//
// In order to support discovery of a specific vanadium service, an instance
// is advertised in two ways - one as a vanadium service and the other as a
// subtype of vanadium service.
//
// For example, a vanadium printer service is advertised as
//
// v23._tcp.local.
// _<printer_service_uuid>._sub._v23._tcp.local.
package mdns
import (
"bytes"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/pborman/uuid"
mdns "github.com/presotto/go-mdns-sd"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/x/lib/netconfig"
idiscovery "v.io/x/ref/lib/discovery"
)
const (
v23ServiceName = "v23"
serviceNameSuffix = "._sub._" + v23ServiceName
// Use short attribute names due to the txt record size limit.
attrInterface = "_i"
attrAddresses = "_a"
attrEncryption = "_e"
attrHash = "_h"
attrTimestamp = "_t"
attrDirAddrs = "_d"
attrStatus = "_s"
// The prefix for attribute names for encoded attachments.
attrAttachmentPrefix = "__"
// The prefix for attribute names for encoded large txt records.
attrLargeTxtPrefix = "_x"
// RFC 6763 limits each DNS txt record to 255 bytes and recommends to not have
// the cumulative size be larger than 1300 bytes. We limit the total size further
// since the underlying mdns package allows records smaller than that.
maxTxtRecordLen = 255
maxTotalTxtRecordsLen = 1184
)
var (
errMaxTxtRecordLenExceeded = errors.New("max txt record size exceeded")
)
type plugin struct {
ctx *context.T
closed chan struct{}
wg sync.WaitGroup
mdns *mdns.MDNS
adStopper *idiscovery.Trigger
subscriptionRefreshTime time.Duration
subscriptionWaitTime time.Duration
subscriptionMu sync.Mutex
subscription map[string]subscription // GUARDED_BY(subscriptionMu)
}
type subscription struct {
count int
lastSubscription time.Time
}
func interfaceNameToServiceName(interfaceName string) string {
serviceUuid := idiscovery.NewServiceUUID(interfaceName)
return uuid.UUID(serviceUuid).String() + serviceNameSuffix
}
func (p *plugin) Advertise(ctx *context.T, adinfo *idiscovery.AdInfo, done func()) (err error) {
serviceName := interfaceNameToServiceName(adinfo.Ad.InterfaceName)
// We use the instance uuid as the host name so that we can get the instance uuid
// from the lost service instance, which has no txt records at all.
hostName := encodeAdId(&adinfo.Ad.Id)
txt, err := newTxtRecords(adinfo)
if err != nil {
done()
return err
}
// Announce the service.
err = p.mdns.AddService(serviceName, hostName, 0, txt...)
if err != nil {
done()
return err
}
// Announce it as v23 service as well so that we can discover
// all v23 services through mDNS.
err = p.mdns.AddService(v23ServiceName, hostName, 0, txt...)
if err != nil {
done()
return err
}
stop := func() {
p.mdns.RemoveService(serviceName, hostName, 0, txt...)
p.mdns.RemoveService(v23ServiceName, hostName, 0, txt...)
done()
}
p.adStopper.Add(stop, ctx.Done())
return nil
}
func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
var serviceName string
if len(interfaceName) == 0 {
serviceName = v23ServiceName
} else {
serviceName = interfaceNameToServiceName(interfaceName)
}
go func() {
defer done()
p.subscribeToService(serviceName)
watcher, stopWatcher := p.mdns.ServiceMemberWatch(serviceName)
defer func() {
stopWatcher()
p.unsubscribeFromService(serviceName)
}()
for {
var service mdns.ServiceInstance
select {
case service = <-watcher:
case <-time.After(p.subscriptionRefreshTime):
p.refreshSubscription(serviceName)
continue
case <-ctx.Done():
return
}
adinfo, err := newAdInfo(service)
if err != nil {
ctx.Error(err)
continue
}
callback(adinfo)
}
}()
return nil
}
func (p *plugin) Close() {
close(p.closed)
p.wg.Wait()
}
func (p *plugin) watchNetConfig() {
defer p.wg.Done()
// Watch the network configuration so that we can make MDNS reattach to
// interfaces when the network changes.
for {
ch, err := netconfig.NotifyChange()
if err != nil {
p.ctx.Error(err)
return
}
select {
case <-ch:
if _, err := p.mdns.ScanInterfaces(); err != nil {
p.ctx.Error(err)
}
case <-p.closed:
return
}
}
}
func (p *plugin) subscribeToService(serviceName string) {
p.subscriptionMu.Lock()
sub := p.subscription[serviceName]
sub.count++
p.subscription[serviceName] = sub
p.subscriptionMu.Unlock()
p.refreshSubscription(serviceName)
}
func (p *plugin) unsubscribeFromService(serviceName string) {
p.subscriptionMu.Lock()
sub := p.subscription[serviceName]
sub.count--
if sub.count == 0 {
delete(p.subscription, serviceName)
p.mdns.UnsubscribeFromService(serviceName)
} else {
p.subscription[serviceName] = sub
}
p.subscriptionMu.Unlock()
}
func (p *plugin) refreshSubscription(serviceName string) {
p.subscriptionMu.Lock()
sub, ok := p.subscription[serviceName]
if !ok {
p.subscriptionMu.Unlock()
return
}
// Subscribe to the service again if we haven't refreshed in a while.
if time.Since(sub.lastSubscription) > p.subscriptionRefreshTime {
p.mdns.SubscribeToService(serviceName)
// Wait a bit to learn about neighborhood.
time.Sleep(p.subscriptionWaitTime)
sub.lastSubscription = time.Now()
}
p.subscription[serviceName] = sub
p.subscriptionMu.Unlock()
}
type txtRecords struct {
size int
txts []string
}
type byTxtSize []txtRecords
func (r byTxtSize) Len() int { return len(r) }
func (r byTxtSize) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r byTxtSize) Less(i, j int) bool { return r[i].size < r[j].size }
func sizeOfTxtRecords(txts []string) int {
n := 0
for _, txt := range txts {
n += len(txt)
}
return n
}
func appendTxtRecord(txts []string, k string, v interface{}, xseq int) ([]string, int) {
var buf bytes.Buffer
buf.WriteString(k)
buf.WriteByte('=')
if s, ok := v.(string); ok {
buf.WriteString(s)
} else {
buf.Write(v.([]byte))
}
kvTxts, xseq := maybeSplitTxtRecord(buf.String(), xseq)
return append(txts, kvTxts...), xseq
}
func packTxtRecords(status idiscovery.AdStatus, vtxts ...[]string) ([]string, error) {
var packed []string
if status != idiscovery.AdReady {
packed, _ = appendTxtRecord(packed, attrStatus, strconv.Itoa(int(status)), 0)
}
for _, txts := range vtxts {
packed = append(packed, txts...)
}
if sizeOfTxtRecords(packed) > maxTotalTxtRecordsLen {
return nil, errMaxTxtRecordLenExceeded
}
return packed, nil
}
func newTxtRecords(adinfo *idiscovery.AdInfo) ([]string, error) {
core, xseq := appendTxtRecord(nil, attrInterface, adinfo.Ad.InterfaceName, 0)
core, xseq = appendTxtRecord(core, attrHash, adinfo.Hash[:], xseq)
core, xseq = appendTxtRecord(core, attrTimestamp, idiscovery.EncodeTimestamp(adinfo.TimestampNs), xseq)
coreLen := sizeOfTxtRecords(core)
dir, xseq := appendTxtRecord(nil, attrDirAddrs, idiscovery.PackAddresses(adinfo.DirAddrs), xseq)
dirLen := sizeOfTxtRecords(dir)
required, xseq := appendTxtRecord(nil, attrAddresses, idiscovery.PackAddresses(adinfo.Ad.Addresses), xseq)
for k, v := range adinfo.Ad.Attributes {
required, xseq = appendTxtRecord(required, k, v, xseq)
}
if adinfo.EncryptionAlgorithm != idiscovery.NoEncryption {
enc := idiscovery.PackEncryptionKeys(adinfo.EncryptionAlgorithm, adinfo.EncryptionKeys)
required, xseq = appendTxtRecord(required, attrEncryption, enc, xseq)
}
requiredLen := sizeOfTxtRecords(required)
remainingLen := maxTotalTxtRecordsLen - coreLen - requiredLen
// Short-cut for a very large advertisement.
if remainingLen < 0 {
return packTxtRecords(idiscovery.AdNotReady, core, dir)
}
extra := make([]txtRecords, 0, len(adinfo.Ad.Attachments))
extraLen := 0
for k, v := range adinfo.Ad.Attachments {
if xseq >= maxNumLargeTxtRecords {
break
}
if len(v) > remainingLen {
continue
}
var txts []string
txts, xseq = appendTxtRecord(nil, attrAttachmentPrefix+k, v, xseq)
n := sizeOfTxtRecords(txts)
extra = append(extra, txtRecords{n, txts})
extraLen += n
}
if len(extra) == len(adinfo.Ad.Attachments) && extraLen <= remainingLen {
// The advertisement can fit in a packet.
full := make([]string, 0, len(extra))
for _, r := range extra {
full = append(full, r.txts...)
}
return packTxtRecords(idiscovery.AdReady, core, required, full)
}
const statusTxtRecordLen = 4 // _s=<s>, where <s> is one byte.
remainingLen -= dirLen + statusTxtRecordLen
if remainingLen < 0 {
// Not all required fields can fit in a packet.
return packTxtRecords(idiscovery.AdNotReady, core, dir)
}
sort.Sort(byTxtSize(extra))
selected := make([]string, 0, len(extra))
for _, r := range extra {
if remainingLen >= r.size {
selected = append(selected, r.txts...)
remainingLen -= r.size
}
if remainingLen <= 0 {
break
}
}
return packTxtRecords(idiscovery.AdPartiallyReady, core, dir, required, selected)
}
func newAdInfo(service mdns.ServiceInstance) (*idiscovery.AdInfo, error) {
// Note that service.Name starts with a host name, which is the instance uuid.
p := strings.SplitN(service.Name, ".", 2)
if len(p) < 1 {
return nil, fmt.Errorf("invalid service name: %s", service.Name)
}
adinfo := &idiscovery.AdInfo{}
if err := decodeAdId(p[0], &adinfo.Ad.Id); err != nil {
return nil, fmt.Errorf("invalid host name: %v", err)
}
if len(service.SrvRRs) == 0 && len(service.TxtRRs) == 0 {
adinfo.Lost = true
return adinfo, nil
}
adinfo.Ad.Attributes = make(discovery.Attributes)
adinfo.Ad.Attachments = make(discovery.Attachments)
for _, rr := range service.TxtRRs {
txts, err := maybeJoinTxtRecords(rr.Txt)
if err != nil {
return nil, err
}
for _, txt := range txts {
kv := strings.SplitN(txt, "=", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid txt record: %s", txt)
}
switch k, v := kv[0], kv[1]; k {
case attrInterface:
adinfo.Ad.InterfaceName = v
case attrAddresses:
if adinfo.Ad.Addresses, err = idiscovery.UnpackAddresses([]byte(v)); err != nil {
return nil, err
}
case attrEncryption:
if adinfo.EncryptionAlgorithm, adinfo.EncryptionKeys, err = idiscovery.UnpackEncryptionKeys([]byte(v)); err != nil {
return nil, err
}
case attrHash:
copy(adinfo.Hash[:], []byte(v))
case attrTimestamp:
if adinfo.TimestampNs, err = idiscovery.DecodeTimestamp([]byte(v)); err != nil {
return nil, err
}
case attrDirAddrs:
if adinfo.DirAddrs, err = idiscovery.UnpackAddresses([]byte(v)); err != nil {
return nil, err
}
case attrStatus:
status, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
adinfo.Status = idiscovery.AdStatus(status)
default:
if strings.HasPrefix(k, attrAttachmentPrefix) {
adinfo.Ad.Attachments[k[len(attrAttachmentPrefix):]] = []byte(v)
} else {
adinfo.Ad.Attributes[k] = v
}
}
}
}
return adinfo, nil
}
func New(ctx *context.T, host string) (idiscovery.Plugin, error) {
return newWithLoopback(ctx, host, 0, false)
}
func newWithLoopback(ctx *context.T, host string, port int, loopback bool) (idiscovery.Plugin, error) {
switch {
case len(host) == 0:
// go-mdns-sd doesn't answer when the host name is not set.
// Assign a default one if not given.
host = "v23()"
case host == "localhost":
// localhost is a default host name in many places.
// Add a suffix for uniqueness.
host += "()"
}
var v4addr, v6addr string
if port > 0 {
v4addr = fmt.Sprintf("224.0.0.251:%d", port)
v6addr = fmt.Sprintf("[FF02::FB]:%d", port)
}
m, err := mdns.NewMDNS(host, v4addr, v6addr, loopback, 0)
if err != nil {
// The name may not have been unique. Try one more time with a unique
// name. NewMDNS will replace the "()" with "(hardware mac address)".
if len(host) > 0 && !strings.HasSuffix(host, "()") {
m, err = mdns.NewMDNS(host+"()", "", "", loopback, 0)
}
if err != nil {
return nil, err
}
}
p := plugin{
ctx: ctx,
closed: make(chan struct{}),
mdns: m,
adStopper: idiscovery.NewTrigger(),
// TODO(jhahn): Figure out a good subscription refresh time.
subscriptionRefreshTime: 5 * time.Second,
subscription: make(map[string]subscription),
}
if loopback {
p.subscriptionWaitTime = 5 * time.Millisecond
} else {
p.subscriptionWaitTime = 50 * time.Millisecond
}
p.wg.Add(1)
go p.watchNetConfig()
return &p, nil
}