blob: 229a11aebd13fe5ac3b62b13c64f7d9dc74fadff [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"bytes"
"reflect"
"sync"
"v.io/v23/context"
"v.io/x/ref/lib/discovery"
)
// plugin is an in-memory discovery plugin: it stores advertisements in a map
// and wakes waiting scanners through a condition variable when that map is
// modified.
type plugin struct {
	mu sync.Mutex
	// updated is broadcast after registrations or unregistrations; New binds
	// it to mu.
	updated *sync.Cond

	// services holds the registered advertisements, keyed by the string form
	// of their service uuid.
	services map[string][]discovery.Advertisement // GUARDED_BY(mu)
	// updateSeq is bumped each time services is modified, so that scanners can
	// tell whether they have already seen the latest state.
	updateSeq int // GUARDED_BY(mu)
}
// Advertise registers ad with the plugin and starts a goroutine that
// unregisters it again once ctx is done. done is invoked when that goroutine
// finishes. Advertise itself always returns nil.
func (p *plugin) Advertise(ctx *context.T, ad discovery.Advertisement, done func()) error {
	p.RegisterAdvertisement(ad)

	withdrawOnCancel := func() {
		defer done()
		<-ctx.Done()
		p.UnregisterAdvertisement(ad)
	}
	go withdrawOnCancel()

	return nil
}
// Scan streams the advertisements registered with the plugin to ch until ctx
// is done.
//
// If serviceUuid is non-empty, only advertisements registered under that
// service uuid are considered; otherwise all of them are. The first pass sends
// every matching advertisement. Each later pass, triggered by a change in the
// plugin's registrations, sends the advertisements that are new or differ from
// the version seen in the previous pass, plus a copy with Lost set to true for
// every advertisement that is no longer registered.
//
// done is called when the scanning goroutine exits. Scan itself always
// returns nil.
func (p *plugin) Scan(ctx *context.T, serviceUuid discovery.Uuid, ch chan<- discovery.Advertisement, done func()) error {
	rescan := make(chan struct{})

	// Watcher: turns changes of p.updateSeq into signals on rescan.
	go func() {
		var updateSeqSeen int
		for {
			p.mu.Lock()
			for updateSeqSeen == p.updateSeq {
				// Check for cancellation before every wait; otherwise this
				// goroutine would stay parked on the condition variable
				// forever if nothing is registered or unregistered after the
				// scan has been cancelled.
				select {
				case <-ctx.Done():
					p.mu.Unlock()
					return
				default:
				}
				p.updated.Wait()
			}
			updateSeqSeen = p.updateSeq
			p.mu.Unlock()
			select {
			case rescan <- struct{}{}:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Scanner: diffs the current registrations against the previous pass and
	// pushes the differences to ch.
	go func() {
		defer done()
		// This goroutine only returns after ctx is done, so wake the watcher
		// on the way out to let it notice the cancellation. The broadcast is
		// issued with mu held so that it cannot slip in between the watcher's
		// cancellation check and its call to Wait.
		defer func() {
			p.mu.Lock()
			p.updated.Broadcast()
			p.mu.Unlock()
		}()

		scanned := make(map[string]discovery.Advertisement)
		for {
			// Snapshot the matching advertisements, keyed by instance uuid.
			current := make(map[string]discovery.Advertisement)
			p.mu.Lock()
			for key, ads := range p.services {
				if len(serviceUuid) > 0 && key != string(serviceUuid) {
					continue
				}
				for _, ad := range ads {
					current[string(ad.Service.InstanceUuid)] = ad
				}
			}
			p.mu.Unlock()

			// New or modified advertisements.
			changed := make([]discovery.Advertisement, 0, len(current))
			for key, ad := range current {
				old, ok := scanned[key]
				if !ok || !reflect.DeepEqual(old, ad) {
					changed = append(changed, ad)
				}
			}
			// Advertisements that disappeared since the previous pass; ad is
			// a copy, so marking it lost does not touch the stored value.
			for key, ad := range scanned {
				if _, ok := current[key]; !ok {
					ad.Lost = true
					changed = append(changed, ad)
				}
			}

			// Push new changes.
			for _, ad := range changed {
				select {
				case ch <- ad:
				case <-ctx.Done():
					return
				}
			}
			scanned = current

			// Wait for the next update.
			select {
			case <-rescan:
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}
// RegisterAdvertisement adds ad to the plugin. If an advertisement with the
// same instance uuid is already registered under the same service uuid, it is
// replaced by ad. Goroutines waiting for updates are woken afterwards.
func (p *plugin) RegisterAdvertisement(ad discovery.Advertisement) {
	serviceKey := string(ad.ServiceUuid)

	p.mu.Lock()
	registered := p.services[serviceKey]
	if idx := findAd(registered, ad.Service.InstanceUuid); idx < 0 {
		registered = append(registered, ad)
	} else {
		registered[idx] = ad
	}
	p.services[serviceKey] = registered
	p.updateSeq++
	p.mu.Unlock()

	p.updated.Broadcast()
}
// UnregisterAdvertisement removes the advertisement that has ad's instance
// uuid from the list stored under ad's service uuid. When that list becomes
// empty, the service entry itself is dropped. If no such advertisement is
// registered, the plugin's state is left unchanged.
func (p *plugin) UnregisterAdvertisement(ad discovery.Advertisement) {
	serviceKey := string(ad.ServiceUuid)

	p.mu.Lock()
	registered := p.services[serviceKey]
	idx := findAd(registered, ad.Service.InstanceUuid)
	if idx >= 0 {
		// Shift the tail one slot to the left over the removed entry.
		copy(registered[idx:], registered[idx+1:])
		remaining := registered[:len(registered)-1]
		if len(remaining) == 0 {
			delete(p.services, serviceKey)
		} else {
			p.services[serviceKey] = remaining
		}
		p.updateSeq++
	}
	p.mu.Unlock()

	// The broadcast is issued whether or not anything was removed.
	p.updated.Broadcast()
}
// findAd returns the index of the first advertisement in ads whose service
// instance uuid equals instanceUuid, or -1 if there is none.
func findAd(ads []discovery.Advertisement, instanceUuid []byte) int {
	for idx := 0; idx < len(ads); idx++ {
		if !bytes.Equal(ads[idx].Service.InstanceUuid, instanceUuid) {
			continue
		}
		return idx
	}
	return -1
}
// New returns a plugin with no registered advertisements. Its condition
// variable uses the plugin's own mutex as its locker.
func New() *plugin {
	p := new(plugin)
	p.services = make(map[string][]discovery.Advertisement)
	p.updated = sync.NewCond(&p.mu)
	return p
}