blob: 96a35bbb56eb1c9a1cd77e0d279f2ca7ad555b7f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"reflect"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security"
)
// Scan implements discovery.Scanner.
func (ds *ds) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
// TODO(jhahn): Consider to use multiple target services so that the plugins
// can filter advertisements more efficiently if possible.
matcher, targetInterfaceName, err := newMatcher(ctx, query)
if err != nil {
return nil, err
}
var targetServiceUuid Uuid
if len(targetInterfaceName) > 0 {
targetServiceUuid = NewServiceUUID(targetInterfaceName)
}
ctx, cancel, err := ds.addTask(ctx)
if err != nil {
return nil, err
}
// TODO(jhahn): Revisit the buffer size.
scanCh := make(chan Advertisement, 10)
barrier := NewBarrier(func() {
close(scanCh)
ds.removeTask(ctx)
})
for _, plugin := range ds.plugins {
if err := plugin.Scan(ctx, targetServiceUuid, scanCh, barrier.Add()); err != nil {
cancel()
return nil, err
}
}
// TODO(jhahn): Revisit the buffer size.
updateCh := make(chan discovery.Update, 10)
go doScan(ctx, matcher, scanCh, updateCh)
return updateCh, nil
}
func doScan(ctx *context.T, matcher matcher, scanCh <-chan Advertisement, updateCh chan<- discovery.Update) {
defer close(updateCh)
// Get the blessing names belong to the principal.
//
// TODO(jhahn): It isn't clear that we will always have the blessing required to decrypt
// the advertisement as their "default" blessing - indeed it may not even be in the store.
// Revisit this issue.
principal := v23.GetPrincipal(ctx)
var names []string
if principal != nil {
names = security.BlessingNames(principal, principal.BlessingStore().Default())
}
found := make(map[string]*Advertisement)
for {
select {
case ad := <-scanCh:
if err := decrypt(&ad, names); err != nil {
// Couldn't decrypt it. Ignore it.
if err != errNoPermission {
ctx.Error(err)
}
continue
}
// Note that 'Lost' advertisement may not have full service information.
// Thus we do not match the query against it. mergeAdvertisement() will
// ignore it if it has not been scanned.
if !ad.Lost {
if !matcher.match(&ad) {
continue
}
}
for _, update := range mergeAdvertisement(found, &ad) {
select {
case updateCh <- update:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}
func mergeAdvertisement(found map[string]*Advertisement, ad *Advertisement) (updates []discovery.Update) {
// The multiple plugins may return the same advertisements. We ignores the update
// if it has been already sent through the update channel.
prev := found[ad.Service.InstanceId]
if ad.Lost {
// TODO(jhahn): If some plugins return 'Lost' events for an advertisement update, we may
// generates multiple 'Lost' and 'Found' events for the same update. In order to minimize
// this flakiness, we may need to delay the handling of 'Lost'.
if prev != nil {
delete(found, ad.Service.InstanceId)
updates = []discovery.Update{discovery.UpdateLost{discovery.Lost{InstanceId: ad.Service.InstanceId}}}
}
} else {
// TODO(jhahn): Need to compare the proximity as well.
switch {
case prev == nil:
updates = []discovery.Update{discovery.UpdateFound{discovery.Found{Service: ad.Service}}}
case !reflect.DeepEqual(prev.Service, ad.Service):
updates = []discovery.Update{
discovery.UpdateLost{discovery.Lost{InstanceId: ad.Service.InstanceId}},
discovery.UpdateFound{discovery.Found{Service: copyService(&ad.Service)}},
}
}
found[ad.Service.InstanceId] = ad
}
return
}