blob: 41669e1c0af39c91ac7da50890534e48af6d73b4 [file] [log] [blame]
// A mdns based config service. We make an mdns group for each config.
// Mdns is probably an over kill for this but why use two when one will do?
// The service provides an eventually consistent map to all nodes on the
// network. The winning map is the one with the highest version number.
// If a server stores the map in a file, it can also be a source of the
// map
package config
import (
"fmt"
"io/ioutil"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
verror "veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
"github.com/presotto/go-mdns-sd"
"github.com/presotto/go-mdns-sd/go_dns"
)
const maxDNSStringLength = 254
type config struct {
version uint64
pairs map[string]string
}
type configService struct {
rwlock sync.RWMutex // protects elements of this structure.
mdns *mdns.MDNS
service string
file string // file containing config
current *config // the highest numbered config
done chan bool // closed to tell children to go away
change *sync.Cond // condition variable broadcast to on every config change
gen int // incremented every config change
}
const pkgPath = "veyron.io/veyron/veyron/services/config/lib"
// Errors
var (
errCantParse = verror.Register(pkgPath+".errCantParse", verror.NoRetry, "{1:}{2:} can't parse{:_}")
errEntryTooLong = verror.Register(pkgPath+".errEntryTooLong", verror.NoRetry, "{1:}{2:} entry {3}:{4} is too long{:_}")
errNoFileToRead = verror.Register(pkgPath+".errNoFileToRead", verror.NoRetry, "{1:}{2:} no file to read{:_}")
errFileError = verror.Register(pkgPath+".errFileError", verror.NoRetry, "{1:}{2:} file {3}{:_}")
errMissingLegalVersion = verror.Register(pkgPath+".errMissingLegalVersion", verror.NoRetry, "{1:}{2:} missing legal version for file{:_}")
errMissingConfigVersion = verror.Register(pkgPath+".errMissingConfigVersion", verror.NoRetry, "{1:}{2:} missing config version{:_}")
errNoConfig = verror.Register(pkgPath+".errNoConfig", verror.NoRetry, "{1:}{2:} no config{:_}")
errConfigHasNoKey = verror.Register(pkgPath+".errConfigHasNoKey", verror.NoRetry, "{1:}{2:} config has no key {3}{:_}")
errOfferingConfigError = verror.Register(pkgPath+".errOfferingConfigError", verror.NoRetry, "{1:}{2:} offering config {3}{:_}")
)
// MDNSConfigService creates a new instance of the config service with the given name.
// If file is non blank, the initial config is read from file and any learned configs are
// stored in it. Only instances with a file to backup will offer configs to the net.
// All other instances are passive.
func MDNSConfigService(name, file string, loopback bool, port uint16) (ConfigService, error) {
x := filepath.Base(file)
if x == "." {
x = ""
}
var ipv4hp, ipv6hp string
if port != 0 {
ipv4hp = fmt.Sprintf("224.0.0.251:%d", port)
ipv6hp = fmt.Sprintf("[FF02::FB]:%d", port)
}
mdns, err := mdns.NewMDNS(x, ipv4hp, ipv6hp, loopback, false)
if err != nil {
vlog.Errorf("mdns startup failed: %s", err)
return nil, err
}
cs := &configService{mdns: mdns, service: name + "-config", file: file, done: make(chan bool)}
cs.change = sync.NewCond(&cs.rwlock)
// Read the config file if we have one and offer it to everyone else.
if cs.current, err = readFile(file); err != nil {
vlog.Errorf("reading initial config: %s", err)
}
cs.Offer()
// Watch config changes and remember them.
go cs.watcher()
return cs, nil
}
// Stop the service.
func (cs *configService) Stop() {
cs.rwlock.Lock()
mdns := cs.mdns
cs.mdns = nil
cs.rwlock.Unlock()
if mdns != nil {
mdns.Stop()
close(cs.done)
}
}
func newConfig() *config {
return &config{pairs: make(map[string]string)}
}
// parseVersion parses a config version. The version is a pair of uint32s separated by a '.'.
// If the second is missing, we assume 0. The first number is for humans. The
// second is to break ties if machines generate configs.
func parseVersion(s string) (uint64, error) {
f := strings.SplitN(s, ".", 2)
v, err := strconv.ParseUint(f[0], 10, 32)
if err != nil {
return 0, err
}
var r uint64
if len(f) == 2 {
r, err = strconv.ParseUint(f[1], 10, 32)
if err != nil {
return 0, err
}
}
return (v << 32) | r, nil
}
func serializeVersion(version uint64) string {
return fmt.Sprintf("%d.%d", version>>32, version&0xffffffff)
}
// parseEntry parse an entry of the form "key : value" and
// add it to the map. White space before and after "key" and
// value is discarded.
func (c *config) parseEntry(l string) error {
// Ignore lines with nothing but white space or starting with #
l = strings.TrimSpace(l)
if len(l) == 0 || strings.HasPrefix(l, "#") {
return nil
}
// The reset have to be key<white>*:<white>*value
f := strings.SplitN(l, ":", 2)
if len(f) != 2 {
return verror.Make(errCantParse, nil, l)
}
k := strings.TrimSpace(f[0])
v := strings.TrimSpace(f[1])
if len(k)+len(v) > maxDNSStringLength {
return verror.Make(errEntryTooLong, nil, k, v)
}
c.pairs[k] = v
if k != "version" {
return nil
}
var err error
c.version, err = parseVersion(v)
return err
}
func serializeEntry(k, v string) (string, error) {
if len(k)+len(v) > maxDNSStringLength {
return "", verror.Make(errEntryTooLong, nil, k, v)
}
return k + ":" + v, nil
}
func readFile(file string) (*config, error) {
if len(file) == 0 {
return nil, verror.Make(errNoFileToRead, nil)
}
// The config has to be small so just read it all in one go.
b, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
c := newConfig()
for _, l := range strings.Split(string(b), "\n") {
if err := c.parseEntry(l); err != nil {
return nil, verror.Make(errFileError, nil, file, err)
}
}
if _, ok := c.pairs["version"]; !ok {
return nil, verror.Make(errMissingLegalVersion, nil, file)
}
return c, nil
}
// writeFile is called with the write lock held.
func writeFile(file string, c *config) {
if len(file) == 0 || c == nil || len(c.pairs) == 0 {
return
}
var s string
for k, v := range c.pairs {
e, err := serializeEntry(k, v)
if err != nil {
vlog.Errorf("writing %s: %s", file, err)
return
}
s += e + "\n"
}
if err := ioutil.WriteFile(file, []byte(s), 0644); err != nil {
vlog.Errorf("writing %s: %q", file, err)
}
}
// rrToConfig converts a set of TXT rrs to a config.
func rrToConfig(rr *dns.RR_TXT) (*config, error) {
c := newConfig()
for _, s := range rr.Txt {
if err := c.parseEntry(s); err != nil {
return nil, err
}
}
// Ignore any config with no version.
if _, ok := c.pairs["version"]; !ok {
return nil, verror.Make(errMissingConfigVersion, nil)
}
return c, nil
}
func (cs *configService) watchSingle(key string, c chan Pair) {
ov, err := cs.Get(key)
oexists := err == nil
c <- Pair{Key: key, Value: ov, Nonexistant: !oexists}
gen := 0
for {
v, err := cs.Get(key)
exists := err == nil
if exists != oexists || v != ov {
c <- Pair{Key: key, Value: v, Nonexistant: !exists}
ov, oexists = v, exists
}
// Block waiting for a change.
cs.rwlock.Lock()
for gen == cs.gen {
cs.change.Wait()
}
gen = cs.gen
cs.rwlock.Unlock()
}
}
func (cs *configService) watchAll(c chan Pair) {
omap := make(map[string]string, 0)
gen := 0
for {
nmap, err := cs.GetAll()
if err != nil {
nmap = make(map[string]string, 0)
}
// See if any key disappeared.
for k := range omap {
if _, ok := nmap[k]; !ok {
c <- Pair{Key: k, Value: "", Nonexistant: true}
}
}
// See if any value changed or new key appeared.
for k, v := range nmap {
ov, ok := omap[k]
if !ok || v != ov {
c <- Pair{Key: k, Value: v}
}
}
omap = nmap
// Block waiting for a change.
cs.rwlock.Lock()
for gen == cs.gen {
cs.change.Wait()
}
gen = cs.gen
cs.rwlock.Unlock()
}
}
// watcher waits for config changes and remembers them.
// TODO(p): Should we also watch the file for changes?
func (cs *configService) watcher() {
cs.rwlock.RLock()
if cs.mdns == nil {
cs.rwlock.RUnlock()
return
}
c := cs.mdns.ServiceMemberWatch(cs.service)
cs.mdns.SubscribeToService(cs.service)
cs.rwlock.RUnlock()
defer close(c)
for {
select {
case si := <-c:
var config *config
for _, rr := range si.TxtRRs {
c, err := rrToConfig(rr)
if err != nil {
continue
}
if config == nil || c.version > config.version {
config = c
}
}
if config == nil {
continue
}
// This lock is to synchronize writing and reading of the data structure.
cs.rwlock.Lock()
if cs.current != nil && config.version <= cs.current.version {
cs.rwlock.Unlock()
continue
}
cs.current = config
cs.gen++
cs.change.Broadcast()
writeFile(cs.file, cs.current)
cs.rwlock.Unlock()
cs.Offer()
case <-cs.done:
return
}
}
}
// Get returns the value associated with key.
func (cs *configService) Get(key string) (string, error) {
cs.rwlock.RLock()
defer cs.rwlock.RUnlock()
if cs.current == nil {
return "", verror.Make(errNoConfig, nil)
}
if v, ok := cs.current.pairs[key]; !ok {
return "", verror.Make(errConfigHasNoKey, nil, key)
} else {
return v, nil
}
}
// Watch for changes to a particular key.
func (cs *configService) Watch(key string) chan Pair {
c := make(chan Pair)
go cs.watchSingle(key, c)
return c
}
// Get returns the complete configuration.
func (cs *configService) GetAll() (map[string]string, error) {
cs.rwlock.RLock()
defer cs.rwlock.RUnlock()
if cs.current == nil {
return nil, verror.Make(errNoConfig, nil)
}
// Copy so caller can't change the map under our feet.
reply := make(map[string]string)
for k, v := range cs.current.pairs {
reply[k] = v
}
return reply, nil
}
// WatchAll watches for changes to any key.
func (cs *configService) WatchAll() chan Pair {
c := make(chan Pair)
go cs.watchAll(c)
return c
}
// Offer offers the pairs for the config to other servers.
func (cs *configService) Offer() {
cs.rwlock.RLock()
defer cs.rwlock.RUnlock()
if cs.mdns == nil || cs.current == nil || len(cs.file) == 0 {
return
}
// First sort the keys to get a canonical list for the txt entries.
keys := make([]string, 0)
for k := range cs.current.pairs {
keys = append(keys, k)
}
sort.Strings(keys)
// Convert config to a slice of strings.
var txt []string
for _, k := range keys {
e, err := serializeEntry(k, cs.current.pairs[k])
if err != nil {
verror.Make(errOfferingConfigError, nil, cs.file, err)
return
}
txt = append(txt, e)
}
// Send (and keep sending).
cs.mdns.AddService(cs.service, "", 0, txt...)
}
// Reread the config file and remember it if the version is newer than current.
func (cs *configService) Reread() error {
cs.rwlock.Lock()
file := cs.file
if len(file) == 0 {
cs.rwlock.Unlock()
return nil
}
c, err := readFile(file)
if err != nil {
cs.rwlock.Unlock()
return err
}
if cs.current != nil && c.version <= cs.current.version {
cs.rwlock.Unlock()
return nil
}
cs.current = c
cs.gen++
cs.change.Broadcast()
cs.rwlock.Unlock()
cs.Offer()
return nil
}