| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package global |
| |
| import ( |
| "fmt" |
| "reflect" |
| "sort" |
| |
| "v.io/v23/context" |
| "v.io/v23/discovery" |
| "v.io/v23/naming" |
| |
| idiscovery "v.io/x/ref/lib/discovery" |
| ) |
| |
| func (d *gdiscovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) { |
| matcher, err := idiscovery.NewMatcher(ctx, query) |
| if err != nil { |
| return nil, err |
| } |
| |
| updateCh := make(chan discovery.Update, 10) |
| go func() { |
| defer close(updateCh) |
| |
| var prevFound map[discovery.AdId]*idiscovery.AdInfo |
| for { |
| found, err := d.doScan(ctx, matcher) |
| if found == nil { |
| if err != nil { |
| ctx.Error(err) |
| } |
| } else { |
| sendUpdates(ctx, prevFound, found, updateCh) |
| prevFound = found |
| } |
| |
| select { |
| case <-d.clock.After(d.scanInterval): |
| case <-ctx.Done(): |
| return |
| } |
| } |
| }() |
| return updateCh, nil |
| } |
| |
| func (d *gdiscovery) doScan(ctx *context.T, matcher idiscovery.Matcher) (map[discovery.AdId]*idiscovery.AdInfo, error) { |
| scanCh, err := d.ns.Glob(ctx, generateGlobQuery(matcher)) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { |
| for range scanCh { |
| } |
| }() |
| |
| found := make(map[discovery.AdId]*idiscovery.AdInfo) |
| for { |
| select { |
| case glob, ok := <-scanCh: |
| if !ok { |
| return found, nil |
| } |
| adinfo, err := convToAdInfo(glob) |
| if err != nil { |
| ctx.Error(err) |
| continue |
| } |
| // Since mount operations are not atomic, we may not have addresses yet. |
| // Ignore it. It will be re-scanned in the next cycle. |
| if len(adinfo.Ad.Addresses) == 0 { |
| continue |
| } |
| // Filter out advertisements from the same discovery instance. |
| if d.hasAd(&adinfo.Ad) { |
| continue |
| } |
| matched, err := matcher.Match(&adinfo.Ad) |
| if err != nil { |
| ctx.Error(err) |
| continue |
| } |
| if !matched { |
| continue |
| } |
| found[adinfo.Ad.Id] = adinfo |
| case <-ctx.Done(): |
| return nil, nil |
| } |
| } |
| } |
| |
| func generateGlobQuery(matcher idiscovery.Matcher) string { |
| // The suffixes are of the form "id/interfaceName/timestamp/attrs" so we need |
| // to replace wildcards in our query with values based on the query matcher. |
| id, interfaceName, timestamp, attrs := "*", "*", "*", "*" |
| // Currently we support query by id or interfaceName. |
| if targetKey := matcher.TargetKey(); targetKey != "" { |
| id = targetKey |
| } |
| if targetInterface := matcher.TargetInterfaceName(); targetInterface != "" { |
| interfaceName = targetInterface |
| } |
| return naming.Join(id, interfaceName, timestamp, attrs) |
| } |
| |
| func (d *gdiscovery) hasAd(ad *discovery.Advertisement) bool { |
| d.mu.Lock() |
| _, ok := d.ads[ad.Id] |
| d.mu.Unlock() |
| return ok |
| } |
| |
| func convToAdInfo(glob naming.GlobReply) (*idiscovery.AdInfo, error) { |
| switch g := glob.(type) { |
| case *naming.GlobReplyEntry: |
| ad, timestampNs, err := decodeAdFromSuffix(g.Value.Name) |
| if err != nil { |
| return nil, err |
| } |
| addrs := make([]string, 0, len(g.Value.Servers)) |
| for _, server := range g.Value.Servers { |
| addrs = append(addrs, server.Server) |
| } |
| // We sort the addresses to avoid false updates. |
| sort.Strings(addrs) |
| ad.Addresses = addrs |
| return &idiscovery.AdInfo{Ad: *ad, TimestampNs: timestampNs}, nil |
| case *naming.GlobReplyError: |
| return nil, fmt.Errorf("glob error on %s: %v", g.Value.Name, g.Value.Error) |
| default: |
| return nil, fmt.Errorf("unexpected glob reply %v", g) |
| } |
| } |
| |
| func sendUpdates(ctx *context.T, prevFound, found map[discovery.AdId]*idiscovery.AdInfo, updateCh chan<- discovery.Update) { |
| for id, adinfo := range found { |
| var updates []discovery.Update |
| if prev := prevFound[id]; prev == nil { |
| updates = []discovery.Update{idiscovery.NewUpdate(adinfo)} |
| } else { |
| if !reflect.DeepEqual(prev.Ad, adinfo.Ad) { |
| prev.Lost = true |
| updates = []discovery.Update{ |
| idiscovery.NewUpdate(prev), |
| idiscovery.NewUpdate(adinfo), |
| } |
| } |
| delete(prevFound, id) |
| } |
| for _, update := range updates { |
| select { |
| case updateCh <- update: |
| case <-ctx.Done(): |
| return |
| } |
| } |
| } |
| |
| for _, prev := range prevFound { |
| prev.Lost = true |
| update := idiscovery.NewUpdate(prev) |
| select { |
| case updateCh <- update: |
| case <-ctx.Done(): |
| return |
| } |
| } |
| } |