blob: 2688d27d77d0edcac8dc593ef6966dcdc3ea405a [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"bytes"
"fmt"
"sort"
"strings"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security"
wire "v.io/v23/services/syncbase"
"v.io/x/lib/nsync"
"v.io/x/ref/services/syncbase/server/interfaces"
)
const visibilityKey = "vis"
// Discovery implements v.io/v23/discovery.T for syncbase based
// applications.
// TODO(mattr): Actually this is not syncbase specific. At some
// point we should just replace the result of v23.NewDiscovery
// with this.
type Discovery struct {
nhDiscovery discovery.T
// TODO(mattr): Add global discovery.
}
// NewDiscovery creates a new syncbase discovery object.
func NewDiscovery(ctx *context.T) (discovery.T, error) {
nhDiscovery, err := v23.NewDiscovery(ctx)
if err != nil {
return nil, err
}
return &Discovery{nhDiscovery: nhDiscovery}, nil
}
// Scan implements v.io/v23/discovery/T.Scan.
func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
nhUpdates, err := d.nhDiscovery.Scan(ctx, query)
if err != nil {
return nil, err
}
// Currently setting visibility on the neighborhood discovery
// service turns IBE encryption on. We currently don't have the
// infrastructure support for IBE, so that would make our advertisements
// unreadable by everyone.
// Instead we add the visibility list to the attributes of the advertisement
// and filter on the client side. This is a temporary measure until
// IBE is set up. See v.io/i/1345.
updates := make(chan discovery.Update)
go func() {
defer close(updates)
for {
var u discovery.Update
select {
case <-ctx.Done():
return
case u = <-nhUpdates:
}
if u == nil {
continue
}
patterns := splitPatterns(u.Attribute(visibilityKey))
if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
continue
}
updates <- update{u}
}
}()
return updates, nil
}
// Advertise implements v.io/v23/discovery/T.Advertise.
func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
// Currently setting visibility on the neighborhood discovery
// service turns IBE encryption on. We currently don't have the
// infrastructure support for IBE, so that would make our advertisements
// unreadable by everyone.
// Instead we add the visibility list to the attributes of the advertisement
// and filter on the client side. This is a temporary measure until
// IBE is set up. See v.io/i/1345.
adCopy := *ad
if len(visibility) > 0 {
adCopy.Attributes = make(discovery.Attributes, len(ad.Attributes)+1)
for k, v := range ad.Attributes {
adCopy.Attributes[k] = v
}
patterns := joinPatterns(visibility)
adCopy.Attributes[visibilityKey] = patterns
}
ch, err := d.nhDiscovery.Advertise(ctx, &adCopy, nil)
ad.Id = adCopy.Id
return ch, err
}
func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
p := v23.GetPrincipal(ctx)
blessings := p.BlessingStore().PeerBlessings()
for _, b := range blessings {
names := security.BlessingNames(p, b)
for _, pattern := range patterns {
if pattern.MatchedBy(names...) {
return true
}
}
}
return false
}
// update wraps the discovery.Update to remove the visibility attribute which we add.
type update struct {
discovery.Update
}
func (u update) Attribute(name string) string {
if name == visibilityKey {
return ""
}
return u.Update.Attribute(name)
}
func (u update) Advertisement() discovery.Advertisement {
cp := u.Update.Advertisement()
orig := cp.Attributes
cp.Attributes = make(discovery.Attributes, len(orig))
for k, v := range orig {
if k != visibilityKey {
cp.Attributes[k] = v
}
}
return cp
}
// blessingSeparator is used to join multiple blessings into a
// single string.
// Note that comma cannot appear in blessings, see:
// v.io/v23/security/certificate.go
const blessingsSeparator = ','
// joinPatterns concatenates the elements of a to create a single string.
// The string can be split again with SplitPatterns.
func joinPatterns(a []security.BlessingPattern) string {
if len(a) == 0 {
return ""
}
if len(a) == 1 {
return string(a[0])
}
n := (len(a) - 1)
for i := 0; i < len(a); i++ {
n += len(a[i])
}
b := make([]byte, n)
bp := copy(b, a[0])
for _, s := range a[1:] {
b[bp] = blessingsSeparator
bp++
bp += copy(b[bp:], s)
}
return string(b)
}
// splitPatterns splits BlessingPatterns that were joined with
// JoinBlessingPattern.
func splitPatterns(patterns string) []security.BlessingPattern {
if patterns == "" {
return nil
}
n := strings.Count(patterns, string(blessingsSeparator)) + 1
out := make([]security.BlessingPattern, n)
last, start := 0, 0
for i, r := range patterns {
if r == blessingsSeparator {
out[last] = security.BlessingPattern(patterns[start:i])
last++
start = i + 1
}
}
out[last] = security.BlessingPattern(patterns[start:])
return out
}
var state struct {
mu nsync.Mu
scans map[security.Principal]*scanState
}
type scanState struct {
dbs map[wire.Id]*inviteQueue
listeners int
cancel context.CancelFunc
}
func newScan(ctx *context.T) (*scanState, error) {
ctx, cancel := context.WithRootCancel(ctx)
scan := &scanState{
dbs: make(map[wire.Id]*inviteQueue),
cancel: cancel,
}
d, err := NewDiscovery(ctx)
if err != nil {
scan.cancel()
return nil, err
}
query := fmt.Sprintf("v.InterfaceName=\"%s/%s\"",
interfaces.SyncDesc.PkgPath, interfaces.SyncDesc.Name)
updates, err := d.Scan(ctx, query)
if err != nil {
scan.cancel()
return nil, err
}
go func() {
for u := range updates {
if invite, db, ok := makeInvite(u.Advertisement()); ok {
state.mu.Lock()
q := scan.dbs[db]
if u.IsLost() {
// TODO(mattr): Removing like this can result in resurfacing already
// retrieved invites. For example if the ACL of a syncgroup is
// changed to add a new memeber, then we might see a remove and then
// later an add. In this case we would end up moving the invite
// to the end of the queue. One way to fix this would be to
// keep removed invites for some time.
if q != nil {
q.remove(invite)
if q.empty() {
delete(scan.dbs, db)
}
}
} else {
if q == nil {
q = newInviteQueue()
scan.dbs[db] = q
}
q.add(invite)
q.cond.Broadcast()
}
state.mu.Unlock()
}
}
}()
return scan, nil
}
// Invite represents an invitation to join a syncgroup as found via Discovery.
type Invite struct {
Syncgroup wire.Id //Syncgroup is the Id of the syncgroup you've been invited to.
Addresses []string //Addresses are the list of addresses of the inviting server.
key string
}
// ListenForInvites listens via Discovery for syncgroup invitations for the given
// database and sends the invites to the provided channel. We stop listening when
// the given context is canceled. When that happens we close the given channel.
func ListenForInvites(ctx *context.T, db wire.Id, ch chan<- Invite) error {
defer state.mu.Unlock()
state.mu.Lock()
p := v23.GetPrincipal(ctx)
scan := state.scans[p]
if scan == nil {
var err error
if scan, err = newScan(ctx); err != nil {
return err
}
if state.scans == nil {
state.scans = make(map[security.Principal]*scanState)
}
state.scans[p] = scan
}
scan.listeners++
q := scan.dbs[db]
if q == nil {
q = newInviteQueue()
scan.dbs[db] = q
}
go func() {
c := q.scan()
for {
next, ok := q.next(ctx, c)
if !ok {
break
}
select {
case ch <- next.copy():
case <-ctx.Done():
break
}
}
close(ch)
defer state.mu.Unlock()
state.mu.Lock()
if q.empty() {
delete(scan.dbs, db)
}
scan.listeners--
if scan.listeners == 0 {
scan.cancel()
delete(state.scans, v23.GetPrincipal(ctx))
}
}()
return nil
}
// inviteQueue is a linked list based queue. As we get new invitations we
// add them to the queue, and when advertisements are lost we remove elements.
// Each call to scan creates a cursor on the queue that will iterate until
// the Listen is canceled. We wait when we hit the end of the queue via the
// condition variable cond.
type inviteQueue struct {
mu nsync.Mu
cond nsync.CV
elems map[string]*ielement
sentinel ielement
cursors map[int]*ielement
nextCursorId int
}
func (q *inviteQueue) debugLocked() string {
buf := &bytes.Buffer{}
fmt.Fprintf(buf, "*%p", &q.sentinel)
for c := q.sentinel.next; c != &q.sentinel; c = c.next {
fmt.Fprintf(buf, " %p", c)
}
return buf.String()
}
type ielement struct {
invite Invite
prev, next *ielement
}
func newInviteQueue() *inviteQueue {
iq := &inviteQueue{
elems: make(map[string]*ielement),
cursors: make(map[int]*ielement),
}
iq.sentinel.next, iq.sentinel.prev = &iq.sentinel, &iq.sentinel
return iq
}
func (q *inviteQueue) empty() bool {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems) == 0 && len(q.cursors) == 0
}
func (q *inviteQueue) size() int {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems)
}
func (q *inviteQueue) remove(i Invite) {
defer q.mu.Unlock()
q.mu.Lock()
el, ok := q.elems[i.key]
if !ok {
return
}
delete(q.elems, i.key)
el.next.prev, el.prev.next = el.prev, el.next
for id, c := range q.cursors {
if c == el {
q.cursors[id] = c.prev
}
}
}
func (q *inviteQueue) add(i Invite) {
defer q.mu.Unlock()
q.mu.Lock()
if _, ok := q.elems[i.key]; ok {
return
}
el := &ielement{i, q.sentinel.prev, &q.sentinel}
q.sentinel.prev, q.sentinel.prev.next = el, el
q.elems[i.key] = el
q.cond.Broadcast()
}
func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
defer q.mu.Unlock()
q.mu.Lock()
c, exists := q.cursors[cursor]
if !exists {
return Invite{}, false
}
for c.next == &q.sentinel {
if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
delete(q.cursors, cursor)
return Invite{}, false
}
c = q.cursors[cursor]
}
c = c.next
q.cursors[cursor] = c
return c.invite, true
}
func (q *inviteQueue) scan() int {
defer q.mu.Unlock()
q.mu.Lock()
id := q.nextCursorId
q.nextCursorId++
q.cursors[id] = &q.sentinel
return id
}
func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
var dbId, sgId wire.Id
var ok bool
if dbId.Name, ok = ad.Attributes[wire.DiscoveryAttrDatabaseName]; !ok {
return Invite{}, dbId, false
}
if dbId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrDatabaseBlessing]; !ok {
return Invite{}, dbId, false
}
if sgId.Name, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupName]; !ok {
return Invite{}, dbId, false
}
if sgId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupBlessing]; !ok {
return Invite{}, dbId, false
}
i := Invite{
Addresses: append([]string{}, ad.Addresses...),
Syncgroup: sgId,
}
sort.Strings(i.Addresses)
i.key = fmt.Sprintf("%v", i)
return i, dbId, true
}
func (i Invite) copy() Invite {
cp := i
cp.Addresses = append([]string(nil), i.Addresses...)
return cp
}