blob: 4894e4f976cf1a05d6c9bba423f9ebb90bd507bc [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vine
import (
"sync"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/rpc"
"v.io/v23/security"
idiscovery "v.io/x/ref/lib/discovery"
"v.io/x/ref/lib/timekeeper"
)
type store struct {
mu sync.Mutex
adinfoMap map[string]map[discovery.AdId]*idiscovery.AdInfo // GUARDED_BY(mu)
expirations map[discovery.AdId]time.Time // GUARDED_BY(mu)
updated *sync.Cond
updateSeq int // GUARDED_BY(mu)
clock timekeeper.TimeKeeper
stop func()
}
func (s *store) Add(_ *context.T, _ rpc.ServerCall, adinfo idiscovery.AdInfo, ttl time.Duration) error {
s.mu.Lock()
defer s.mu.Unlock()
// Added adinfo to empty key ("") as well to simplify wildcard scan.
for _, key := range []string{adinfo.Ad.InterfaceName, ""} {
adinfos := s.adinfoMap[key]
if adinfos == nil {
adinfos = make(map[discovery.AdId]*idiscovery.AdInfo)
s.adinfoMap[key] = adinfos
}
adinfos[adinfo.Ad.Id] = &adinfo
}
s.expirations[adinfo.Ad.Id] = s.clock.Now().Add(ttl)
s.updateSeq++
s.updated.Broadcast()
return nil
}
func (s *store) Delete(_ *context.T, _ rpc.ServerCall, id discovery.AdId) error {
s.mu.Lock()
defer s.mu.Unlock()
s.deleteLocked(id)
return nil
}
func (s *store) lookup(interfaceName string) map[discovery.AdId]idiscovery.AdInfo {
s.mu.Lock()
defer s.mu.Unlock()
adinfos := make(map[discovery.AdId]idiscovery.AdInfo, len(s.adinfoMap[interfaceName]))
for id, adinfo := range s.adinfoMap[interfaceName] {
adinfos[id] = *adinfo
}
return adinfos
}
func (s *store) deleteLocked(id discovery.AdId) {
for key, adinfos := range s.adinfoMap {
delete(adinfos, id)
if len(adinfos) == 0 {
delete(s.adinfoMap, key)
}
}
delete(s.expirations, id)
s.updateSeq++
s.updated.Broadcast()
}
func (s *store) listenToUpdates(ctx *context.T) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
var updateSeqSeen int
for {
s.mu.Lock()
for updateSeqSeen == s.updateSeq {
s.updated.Wait()
}
updateSeqSeen = s.updateSeq
s.mu.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
return
}
}
}()
return ch
}
func (s *store) flushExpired(ctx *context.T) {
updated := s.listenToUpdates(ctx)
for {
s.mu.Lock()
now := s.clock.Now()
var nextExpiry time.Time
for id, expiry := range s.expirations {
if expiry.After(now) {
if nextExpiry.IsZero() || expiry.Before(nextExpiry) {
nextExpiry = expiry
}
continue
}
s.deleteLocked(id)
}
s.mu.Unlock()
var timer <-chan time.Time
if !nextExpiry.IsZero() {
timer = s.clock.After(nextExpiry.Sub(now))
}
select {
case <-timer:
case <-updated:
case <-ctx.Done():
return
}
}
}
func newStore(ctx *context.T, name string, clock timekeeper.TimeKeeper) (*store, error) {
s := &store{
adinfoMap: make(map[string]map[discovery.AdId]*idiscovery.AdInfo),
expirations: make(map[discovery.AdId]time.Time),
clock: clock,
}
s.updated = sync.NewCond(&s.mu)
ctx, cancel := context.WithCancel(ctx)
// TODO(suharshs): Make the authorizer configurable?
_, server, err := v23.WithNewServer(ctx, name, StoreServer(s), security.AllowEveryone())
if err != nil {
cancel()
return nil, err
}
go s.flushExpired(ctx)
s.stop = func() {
cancel()
<-server.Closed()
}
return s, nil
}