blob: 67d43cbc22e91d70740539555a769766c9ccbce3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"fmt"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/x/ref/lib/stats"
)
const (
msPerSec = int64(time.Second / time.Millisecond)
nsPerMs = int64(time.Millisecond / time.Nanosecond)
)
func (d *idiscovery) advertise(ctx *context.T, session sessionId, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
if !ad.Id.IsValid() {
var err error
if ad.Id, err = discovery.NewAdId(); err != nil {
return nil, err
}
}
if err := validateAd(ad); err != nil {
return nil, NewErrBadAdvertisement(ctx, err)
}
adinfo := &AdInfo{Ad: *ad}
if err := encrypt(ctx, adinfo, visibility); err != nil {
return nil, err
}
hashAd(adinfo)
adinfo.TimestampNs = d.newAdTimestampNs()
ctx, cancel, err := d.addTask(ctx)
if err != nil {
return nil, err
}
id := adinfo.Ad.Id
if !d.addAd(id, session) {
cancel()
d.removeTask(ctx)
return nil, NewErrAlreadyBeingAdvertised(ctx, id)
}
subtask := &adSubtask{parent: ctx}
d.adMu.Lock()
d.adSubtasks[id] = subtask
d.adMu.Unlock()
done := make(chan struct{})
stop := func() {
d.stopAdvertising(id)
d.dirServer.unpublish(id)
d.removeAd(id)
d.removeTask(ctx)
close(done)
}
// Lock the subtask to prevent any update from directory server endpoint changes while
// the advertising is being started to not lose any endpoint change during starting.
subtask.mu.Lock()
d.dirServer.publish(adinfo)
subtask.stop, err = d.startAdvertising(ctx, adinfo)
subtask.mu.Unlock()
if err != nil {
cancel()
stop()
return nil, err
}
d.adStopTrigger.Add(stop, ctx.Done())
return done, nil
}
func (d *idiscovery) newAdTimestampNs() int64 {
now := time.Now()
timestampNs := now.UnixNano()
d.adMu.Lock()
if d.adTimestampNs >= timestampNs {
timestampNs = d.adTimestampNs + 1
}
d.adTimestampNs = timestampNs
d.adMu.Unlock()
return timestampNs
}
func (d *idiscovery) addAd(id discovery.AdId, session sessionId) bool {
d.adMu.Lock()
if _, exist := d.adSessions[id]; exist {
d.adMu.Unlock()
return false
}
d.adSessions[id] = session
d.adMu.Unlock()
return true
}
func (d *idiscovery) removeAd(id discovery.AdId) {
d.adMu.Lock()
delete(d.adSessions, id)
d.adMu.Unlock()
}
func (d *idiscovery) getAdSession(id discovery.AdId) sessionId {
d.adMu.Lock()
session := d.adSessions[id]
d.adMu.Unlock()
return session
}
func (d *idiscovery) startAdvertising(ctx *context.T, adinfo *AdInfo) (func(), error) {
statName := naming.Join(d.statsPrefix, "ad", adinfo.Ad.Id.String())
stats.NewStringFunc(statName, func() string { return fmt.Sprint(*adinfo) })
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for _, plugin := range d.plugins {
wg.Add(1)
if err := plugin.Advertise(ctx, adinfo, wg.Done); err != nil {
cancel()
return nil, err
}
}
stop := func() {
stats.Delete(statName)
cancel()
wg.Wait()
}
return stop, nil
}
func (d *idiscovery) stopAdvertising(id discovery.AdId) {
d.adMu.Lock()
subtask := d.adSubtasks[id]
delete(d.adSubtasks, id)
d.adMu.Unlock()
if subtask == nil {
return
}
subtask.mu.Lock()
if subtask.stop != nil {
subtask.stop()
subtask.stop = nil
}
subtask.mu.Unlock()
}
func (d *idiscovery) updateAdvertising(adinfo *AdInfo) {
d.adMu.Lock()
subtask := d.adSubtasks[adinfo.Ad.Id]
if subtask == nil {
d.adMu.Unlock()
return
}
d.adMu.Unlock()
subtask.mu.Lock()
defer subtask.mu.Unlock()
if subtask.stop == nil {
return
}
subtask.stop()
ctx := subtask.parent
var err error
subtask.stop, err = d.startAdvertising(ctx, adinfo)
if err != nil {
ctx.Error(err)
d.cancelTask(ctx)
}
}