blob: 5c8511cfce1a77b8b0e03dfdc9f7499f6aaa1ce9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"sort"
"sync"
"v.io/v23/context"
"v.io/v23/discovery"
)
func (d *idiscovery) scan(ctx *context.T, session sessionId, query string) (<-chan discovery.Update, error) {
// TODO(jhahn): Consider to use multiple target services so that the plugins
// can filter advertisements more efficiently if possible.
matcher, err := NewMatcher(ctx, query)
if err != nil {
return nil, err
}
ctx, cancel, err := d.addTask(ctx)
if err != nil {
return nil, err
}
// TODO(jhahn): Revisit the buffer size.
scanCh := make(chan *AdInfo, 10)
updateCh := make(chan discovery.Update, 10)
barrier := NewBarrier(func() {
close(scanCh)
close(updateCh)
d.removeTask(ctx)
})
for _, plugin := range d.plugins {
if err := plugin.Scan(ctx, matcher.TargetInterfaceName(), scanCh, barrier.Add()); err != nil {
cancel()
return nil, err
}
}
go d.doScan(ctx, session, matcher, scanCh, updateCh, barrier.Add())
return updateCh, nil
}
func (d *idiscovery) doScan(ctx *context.T, session sessionId, matcher Matcher, scanCh chan *AdInfo, updateCh chan<- discovery.Update, done func()) {
// Some plugins may not return a full advertisement information when it is lost.
// So we keep the advertisements that we've seen so that we can provide the
// full advertisement information when it is lost. Note that plugins will not
// include attachments unless they're tiny enough.
seen := make(map[discovery.AdId]*AdInfo)
var wg sync.WaitGroup
defer func() {
wg.Wait()
done()
}()
for {
select {
case adinfo := <-scanCh:
if !adinfo.Lost {
// Filter out advertisements from the same session.
if d.getAdSession(adinfo.Ad.Id) == session {
continue
}
// Filter out already seen advertisements.
if prev := seen[adinfo.Ad.Id]; prev != nil && prev.Status == AdReady && prev.Hash == adinfo.Hash {
continue
}
if adinfo.Status == AdReady {
// Clear the unnecessary directory addresses.
adinfo.DirAddrs = nil
} else {
if len(adinfo.DirAddrs) == 0 {
ctx.Errorf("no directory address available for partial advertisement %v - ignored", adinfo.Ad.Id)
continue
}
// Fetch not-ready-to-serve advertisements from the directory server.
if adinfo.Status == AdNotReady {
wg.Add(1)
go fetchAd(ctx, adinfo.DirAddrs, adinfo.Ad.Id, scanCh, wg.Done)
continue
}
// Sort the directory addresses to make it easy to compare.
sort.Strings(adinfo.DirAddrs)
}
}
if err := decrypt(ctx, adinfo); err != nil {
// Couldn't decrypt it. Ignore it.
if err != errNoPermission {
ctx.Error(err)
}
continue
}
// Note that 'Lost' advertisement may not have full information. Thus we do not match
// the query against it. newUpdates() will ignore it if it has not been scanned.
if !adinfo.Lost {
matched, err := matcher.Match(&adinfo.Ad)
if err != nil {
ctx.Error(err)
continue
}
if !matched {
continue
}
}
for _, update := range newUpdates(seen, adinfo) {
select {
case updateCh <- update:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}
func fetchAd(ctx *context.T, dirAddrs []string, id discovery.AdId, scanCh chan<- *AdInfo, done func()) {
defer done()
dir := newDirClient(dirAddrs)
adinfo, err := dir.Lookup(ctx, id)
if err != nil {
select {
case <-ctx.Done():
default:
ctx.Error(err)
}
return
}
select {
case scanCh <- adinfo:
case <-ctx.Done():
}
}
func newUpdates(seen map[discovery.AdId]*AdInfo, adinfo *AdInfo) []discovery.Update {
var updates []discovery.Update
// The multiple plugins may return the same advertisements. We ignores the update
// if it has been already sent through the update channel.
prev := seen[adinfo.Ad.Id]
if adinfo.Lost {
// TODO(jhahn): If some plugins return 'Lost' events for an advertisement update, we may
// generates multiple 'Lost' and 'Found' events for the same update. In order to minimize
// this flakiness, we may need to delay the handling of 'Lost'.
if prev != nil {
delete(seen, prev.Ad.Id)
prev.Lost = true
updates = []discovery.Update{NewUpdate(prev)}
}
} else {
// TODO(jhahn): Need to compare the proximity as well.
switch {
case prev == nil:
updates = []discovery.Update{NewUpdate(adinfo)}
seen[adinfo.Ad.Id] = adinfo
case prev.TimestampNs > adinfo.TimestampNs:
// Ignore the old advertisement.
case prev.Hash != adinfo.Hash || (prev.Status != AdReady && !sortedStringsEqual(prev.DirAddrs, adinfo.DirAddrs)):
prev.Lost = true
updates = []discovery.Update{NewUpdate(prev), NewUpdate(adinfo)}
seen[adinfo.Ad.Id] = adinfo
}
}
return updates
}