blob: fa0d09c2e11f353409a4e88cd3bce83455f30c72 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"bytes"
"fmt"
"sort"
"strings"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security"
wire "v.io/v23/services/syncbase"
"v.io/x/lib/nsync"
"v.io/x/ref/lib/discovery/global"
"v.io/x/ref/services/syncbase/server/interfaces"
)
const (
visibilityKey = "vis"
nhDiscoveryKey = iota
globalDiscoveryKey
)
// Discovery implements v.io/v23/discovery.T for syncbase based
// applications.
// TODO(mattr): Actually this is not syncbase specific. At some
// point we should just replace the result of v23.NewDiscovery
// with this.
type Discovery struct {
nhDiscovery discovery.T
globalDiscovery discovery.T
}
// NewDiscovery creates a new syncbase discovery object.
// globalDiscoveryPath is the path in the namespace where global disovery
// advertisements will be mounted.
// If globalDiscoveryPath is empty, no global discovery service will be created.
// globalScanInterval is the interval at which global discovery will be refreshed.
// If globalScanInterval is 0, the defaultScanInterval of global discovery will
// be used.
func NewDiscovery(ctx *context.T, globalDiscoveryPath string, globalScanInterval time.Duration) (discovery.T, error) {
d := &Discovery{}
var err error
if d.nhDiscovery, err = v23.NewDiscovery(ctx); err != nil {
return nil, err
}
if globalDiscoveryPath != "" {
if d.globalDiscovery, err = global.NewWithTTL(ctx, globalDiscoveryPath, 0, globalScanInterval); err != nil {
return nil, err
}
}
return d, nil
}
// Scan implements v.io/v23/discovery/T.Scan.
func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
nhCtx, nhCancel := context.WithCancel(ctx)
nhUpdates, err := d.nhDiscovery.Scan(nhCtx, query)
if err != nil {
nhCancel()
return nil, err
}
var globalUpdates <-chan discovery.Update
if d.globalDiscovery != nil {
if globalUpdates, err = d.globalDiscovery.Scan(ctx, query); err != nil {
nhCancel()
return nil, err
}
}
// Currently setting visibility on the neighborhood discovery
// service turns IBE encryption on. We currently don't have the
// infrastructure support for IBE, so that would make our advertisements
// unreadable by everyone.
// Instead we add the visibility list to the attributes of the advertisement
// and filter on the client side. This is a temporary measure until
// IBE is set up. See v.io/i/1345.
updates := make(chan discovery.Update)
go func() {
defer close(updates)
seen := make(map[discovery.AdId]*updateRef)
for {
var u discovery.Update
var src uint // key of the source discovery service where the update came from
select {
case <-ctx.Done():
return
case u = <-nhUpdates:
src = nhDiscoveryKey
case u = <-globalUpdates:
src = globalDiscoveryKey
}
d.handleUpdate(ctx, u, src, seen, updates)
}
}()
return updates, nil
}
func (d *Discovery) handleUpdate(ctx *context.T, u discovery.Update, src uint, seen map[discovery.AdId]*updateRef, updates chan discovery.Update) {
if u == nil {
return
}
patterns := splitPatterns(u.Attribute(visibilityKey))
if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
return
}
id := u.Id()
prev := seen[id]
if u.IsLost() {
// Only send the lost noitification if a found event was previously seen,
// and all discovery services that found it have lost it.
if prev == nil || !prev.unset(src) {
return
}
delete(seen, id)
updates <- update{Update: u, lost: true}
return
}
if prev == nil {
// Always send updates for updates that we have never seen before.
ref := &updateRef{update: u}
ref.set(src)
seen[id] = ref
updates <- update{Update: u}
return
}
if differ := updatesDiffer(prev.update, u); (differ && u.Timestamp().After(prev.update.Timestamp())) ||
(!differ && src == nhDiscoveryKey && len(u.Advertisement().Attachments) > 0) {
// If the updates differ and the newly found update has a later time than
// previously found one, lose prev and find new.
// Or, if the update doesn't differ, but is from neighborhood discovery, it
// could have more information since we don't yet encode attachements in
// global discovery.
updates <- update{Update: prev.update, lost: true}
ref := &updateRef{update: u}
ref.set(src)
seen[id] = ref
updates <- update{Update: u}
return
}
}
// Advertise implements v.io/v23/discovery/T.Advertise.
func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
// Currently setting visibility on the neighborhood discovery
// service turns IBE encryption on. We currently don't have the
// infrastructure support for IBE, so that would make our advertisements
// unreadable by everyone.
// Instead we add the visibility list to the attributes of the advertisement
// and filter on the client side. This is a temporary measure until
// IBE is set up. See v.io/i/1345.
adCopy := *ad
if len(visibility) > 0 {
adCopy.Attributes = make(discovery.Attributes, len(ad.Attributes)+1)
for k, v := range ad.Attributes {
adCopy.Attributes[k] = v
}
patterns := joinPatterns(visibility)
adCopy.Attributes[visibilityKey] = patterns
}
stopped := make(chan struct{})
nhCtx, nhCancel := context.WithCancel(ctx)
nhStopped, err := d.nhDiscovery.Advertise(nhCtx, &adCopy, nil)
if err != nil {
nhCancel()
return nil, err
}
var globalStopped <-chan struct{}
if d.globalDiscovery != nil {
if globalStopped, err = d.globalDiscovery.Advertise(ctx, &adCopy, nil); err != nil {
nhCancel()
<-nhStopped
return nil, err
}
}
go func() {
<-nhStopped
if d.globalDiscovery != nil {
<-globalStopped
}
close(stopped)
}()
ad.Id = adCopy.Id
return stopped, nil
}
func updatesDiffer(a, b discovery.Update) bool {
if !sortedStringsEqual(a.Addresses(), b.Addresses()) {
return true
}
if !mapsEqual(a.Advertisement().Attributes, b.Advertisement().Attributes) {
return true
}
return false
}
func mapsEqual(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for ka, va := range a {
if vb, ok := b[ka]; !ok || va != vb {
return false
}
}
return true
}
func sortedStringsEqual(a, b []string) bool {
// We want to make a nil and an empty slices equal to avoid unnecessary inequality by that.
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
p := v23.GetPrincipal(ctx)
blessings := p.BlessingStore().PeerBlessings()
for _, b := range blessings {
names := security.BlessingNames(p, b)
for _, pattern := range patterns {
if pattern.MatchedBy(names...) {
return true
}
}
}
return false
}
type updateRef struct {
update discovery.Update
nhRef bool
globalRef bool
}
func (r *updateRef) set(d uint) {
switch d {
case nhDiscoveryKey:
r.nhRef = true
case globalDiscoveryKey:
r.globalRef = true
}
}
func (r *updateRef) unset(d uint) bool {
switch d {
case nhDiscoveryKey:
r.nhRef = false
case globalDiscoveryKey:
r.globalRef = false
}
return !r.nhRef && !r.globalRef
}
// update wraps the discovery.Update to remove the visibility attribute which we add
// and allows us to mark the update as lost.
type update struct {
discovery.Update
lost bool
}
func (u update) IsLost() bool { return u.lost }
func (u update) Attribute(name string) string {
if name == visibilityKey {
return ""
}
return u.Update.Attribute(name)
}
func (u update) Advertisement() discovery.Advertisement {
cp := u.Update.Advertisement()
orig := cp.Attributes
cp.Attributes = make(discovery.Attributes, len(orig))
for k, v := range orig {
if k != visibilityKey {
cp.Attributes[k] = v
}
}
return cp
}
// blessingSeparator is used to join multiple blessings into a
// single string.
// Note that comma cannot appear in blessings, see:
// v.io/v23/security/certificate.go
const blessingsSeparator = ','
// joinPatterns concatenates the elements of a to create a single string.
// The string can be split again with SplitPatterns.
func joinPatterns(a []security.BlessingPattern) string {
if len(a) == 0 {
return ""
}
if len(a) == 1 {
return string(a[0])
}
n := (len(a) - 1)
for i := 0; i < len(a); i++ {
n += len(a[i])
}
b := make([]byte, n)
bp := copy(b, a[0])
for _, s := range a[1:] {
b[bp] = blessingsSeparator
bp++
bp += copy(b[bp:], s)
}
return string(b)
}
// splitPatterns splits BlessingPatterns that were joined with
// JoinBlessingPattern.
func splitPatterns(patterns string) []security.BlessingPattern {
if patterns == "" {
return nil
}
n := strings.Count(patterns, string(blessingsSeparator)) + 1
out := make([]security.BlessingPattern, n)
last, start := 0, 0
for i, r := range patterns {
if r == blessingsSeparator {
out[last] = security.BlessingPattern(patterns[start:i])
last++
start = i + 1
}
}
out[last] = security.BlessingPattern(patterns[start:])
return out
}
var state struct {
mu nsync.Mu
scans map[security.Principal]*scanState
}
type scanState struct {
dbs map[wire.Id]*inviteQueue
listeners int
cancel context.CancelFunc
}
func newScan(ctx *context.T) (*scanState, error) {
ctx, cancel := context.WithRootCancel(ctx)
scan := &scanState{
dbs: make(map[wire.Id]*inviteQueue),
cancel: cancel,
}
// TODO(suharshs): Add globalDiscoveryPath.
d, err := NewDiscovery(ctx, "", 0)
if err != nil {
scan.cancel()
return nil, err
}
query := fmt.Sprintf("v.InterfaceName=\"%s/%s\"",
interfaces.SyncDesc.PkgPath, interfaces.SyncDesc.Name)
updates, err := d.Scan(ctx, query)
if err != nil {
scan.cancel()
return nil, err
}
go func() {
for u := range updates {
if invite, db, ok := makeInvite(u.Advertisement()); ok {
state.mu.Lock()
q := scan.dbs[db]
if u.IsLost() {
// TODO(mattr): Removing like this can result in resurfacing already
// retrieved invites. For example if the ACL of a syncgroup is
// changed to add a new memeber, then we might see a remove and then
// later an add. In this case we would end up moving the invite
// to the end of the queue. One way to fix this would be to
// keep removed invites for some time.
if q != nil {
q.remove(invite)
if q.empty() {
delete(scan.dbs, db)
}
}
} else {
if q == nil {
q = newInviteQueue()
scan.dbs[db] = q
}
q.add(invite)
q.cond.Broadcast()
}
state.mu.Unlock()
}
}
}()
return scan, nil
}
// Invite represents an invitation to join a syncgroup as found via Discovery.
type Invite struct {
Syncgroup wire.Id //Syncgroup is the Id of the syncgroup you've been invited to.
Addresses []string //Addresses are the list of addresses of the inviting server.
key string
}
// ListenForInvites listens via Discovery for syncgroup invitations for the given
// database and sends the invites to the provided channel. We stop listening when
// the given context is canceled. When that happens we close the given channel.
func ListenForInvites(ctx *context.T, db wire.Id, ch chan<- Invite) error {
defer state.mu.Unlock()
state.mu.Lock()
p := v23.GetPrincipal(ctx)
scan := state.scans[p]
if scan == nil {
var err error
if scan, err = newScan(ctx); err != nil {
return err
}
if state.scans == nil {
state.scans = make(map[security.Principal]*scanState)
}
state.scans[p] = scan
}
scan.listeners++
q := scan.dbs[db]
if q == nil {
q = newInviteQueue()
scan.dbs[db] = q
}
go func() {
c := q.scan()
for {
next, ok := q.next(ctx, c)
if !ok {
break
}
select {
case ch <- next.copy():
case <-ctx.Done():
break
}
}
close(ch)
defer state.mu.Unlock()
state.mu.Lock()
if q.empty() {
delete(scan.dbs, db)
}
scan.listeners--
if scan.listeners == 0 {
scan.cancel()
delete(state.scans, v23.GetPrincipal(ctx))
}
}()
return nil
}
// inviteQueue is a linked list based queue. As we get new invitations we
// add them to the queue, and when advertisements are lost we remove elements.
// Each call to scan creates a cursor on the queue that will iterate until
// the Listen is canceled. We wait when we hit the end of the queue via the
// condition variable cond.
type inviteQueue struct {
mu nsync.Mu
cond nsync.CV
elems map[string]*ielement
sentinel ielement
cursors map[int]*ielement
nextCursorId int
}
func (q *inviteQueue) debugLocked() string {
buf := &bytes.Buffer{}
fmt.Fprintf(buf, "*%p", &q.sentinel)
for c := q.sentinel.next; c != &q.sentinel; c = c.next {
fmt.Fprintf(buf, " %p", c)
}
return buf.String()
}
type ielement struct {
invite Invite
prev, next *ielement
}
func newInviteQueue() *inviteQueue {
iq := &inviteQueue{
elems: make(map[string]*ielement),
cursors: make(map[int]*ielement),
}
iq.sentinel.next, iq.sentinel.prev = &iq.sentinel, &iq.sentinel
return iq
}
func (q *inviteQueue) empty() bool {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems) == 0 && len(q.cursors) == 0
}
func (q *inviteQueue) size() int {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems)
}
func (q *inviteQueue) remove(i Invite) {
defer q.mu.Unlock()
q.mu.Lock()
el, ok := q.elems[i.key]
if !ok {
return
}
delete(q.elems, i.key)
el.next.prev, el.prev.next = el.prev, el.next
for id, c := range q.cursors {
if c == el {
q.cursors[id] = c.prev
}
}
}
func (q *inviteQueue) add(i Invite) {
defer q.mu.Unlock()
q.mu.Lock()
if _, ok := q.elems[i.key]; ok {
return
}
el := &ielement{i, q.sentinel.prev, &q.sentinel}
q.sentinel.prev, q.sentinel.prev.next = el, el
q.elems[i.key] = el
q.cond.Broadcast()
}
func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
defer q.mu.Unlock()
q.mu.Lock()
c, exists := q.cursors[cursor]
if !exists {
return Invite{}, false
}
for c.next == &q.sentinel {
if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
delete(q.cursors, cursor)
return Invite{}, false
}
c = q.cursors[cursor]
}
c = c.next
q.cursors[cursor] = c
return c.invite, true
}
func (q *inviteQueue) scan() int {
defer q.mu.Unlock()
q.mu.Lock()
id := q.nextCursorId
q.nextCursorId++
q.cursors[id] = &q.sentinel
return id
}
func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
var dbId, sgId wire.Id
var ok bool
if dbId.Name, ok = ad.Attributes[wire.DiscoveryAttrDatabaseName]; !ok {
return Invite{}, dbId, false
}
if dbId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrDatabaseBlessing]; !ok {
return Invite{}, dbId, false
}
if sgId.Name, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupName]; !ok {
return Invite{}, dbId, false
}
if sgId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupBlessing]; !ok {
return Invite{}, dbId, false
}
i := Invite{
Addresses: append([]string{}, ad.Addresses...),
Syncgroup: sgId,
}
sort.Strings(i.Addresses)
i.key = fmt.Sprintf("%v", i)
return i, dbId, true
}
func (i Invite) copy() Invite {
cp := i
cp.Addresses = append([]string(nil), i.Addresses...)
return cp
}