discovery: Add timestamp to discovery.Update.
The syncbase discovery service needs to be able to order updates from
global discovery and neighborhood discovery based on timestamps in the
case of a changed advertisement.
This change exposes timestamps from discovery.Update.
MultiPart: 3/5
Change-Id: I6bd76a5d2c2e3ade28fa8abf698af807a356a193
diff --git a/lib/discovery/global/advertise.go b/lib/discovery/global/advertise.go
index 312c68e..2679e8f 100644
--- a/lib/discovery/global/advertise.go
+++ b/lib/discovery/global/advertise.go
@@ -5,6 +5,8 @@
package global
import (
+ "time"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
@@ -42,7 +44,7 @@
// TODO(jhahn): There is no atomic way to check and reserve the name under mounttable.
// For example, the name can be overwritten by other applications of the same owner.
// But this would be OK for now.
- name, err := encodeAdToSuffix(ad)
+ name, err := encodeAdToSuffix(ad, d.newAdTimestampNs())
if err != nil {
return nil, err
}
@@ -76,6 +78,18 @@
return done, nil
}
+func (d *gdiscovery) newAdTimestampNs() int64 {
+ now := time.Now()
+ timestampNs := now.UnixNano()
+ d.mu.Lock()
+ if d.adTimestampNs >= timestampNs {
+ timestampNs = d.adTimestampNs + 1
+ }
+ d.adTimestampNs = timestampNs
+ d.mu.Unlock()
+ return timestampNs
+}
+
func (d *gdiscovery) addAd(ad *discovery.Advertisement) bool {
d.mu.Lock()
if _, exist := d.ads[ad.Id]; exist {
diff --git a/lib/discovery/global/encoding.go b/lib/discovery/global/encoding.go
index 9bb0a59..5d3e324 100644
--- a/lib/discovery/global/encoding.go
+++ b/lib/discovery/global/encoding.go
@@ -5,6 +5,7 @@
package global
import (
+ "strconv"
"strings"
"v.io/v23/discovery"
@@ -14,38 +15,44 @@
// encodeAdToSuffix encodes the ad.Id and the ad.Attributes into the suffix at
// which we mount the advertisement.
+// The format of the generated suffix is id/timestamp/attributes.
//
// TODO(suharshs): Currently only the id and the attributes are encoded; we may
// want to encode the rest of the advertisement someday?
-func encodeAdToSuffix(ad *discovery.Advertisement) (string, error) {
+func encodeAdToSuffix(ad *discovery.Advertisement, timestampNs int64) (string, error) {
b, err := vom.Encode(ad.Attributes)
if err != nil {
return "", err
}
// Escape suffixDelim to use it as our delimeter between the id and the attrs.
id := ad.Id.String()
+ timestamp := strconv.FormatInt(timestampNs, 10)
attr := naming.EncodeAsNameElement(string(b))
- return naming.Join(id, attr), nil
+ return naming.Join(id, timestamp, attr), nil
}
-// decodeAdFromSuffix decodes s into an advertisement.
-func decodeAdFromSuffix(in string) (*discovery.Advertisement, error) {
- parts := strings.SplitN(in, "/", 2)
- if len(parts) != 2 {
- return nil, NewErrAdInvalidEncoding(nil, in)
+// decodeAdFromSuffix decodes in into an advertisement.
+// The format of the input suffix is id/timestamp/attributes.
+func decodeAdFromSuffix(in string) (*discovery.Advertisement, int64, error) {
+ parts := strings.SplitN(in, "/", 3)
+ if len(parts) != 3 {
+ return nil, 0, NewErrAdInvalidEncoding(nil, in)
}
- id, attrs := parts[0], parts[1]
- attrs, ok := naming.DecodeFromNameElement(attrs)
- if !ok {
- return nil, NewErrAdInvalidEncoding(nil, in)
- }
- ad := &discovery.Advertisement{}
var err error
- if ad.Id, err = discovery.ParseAdId(id); err != nil {
- return nil, err
+ ad := &discovery.Advertisement{}
+ if ad.Id, err = discovery.ParseAdId(parts[0]); err != nil {
+ return nil, 0, err
+ }
+ timestampNs, err := strconv.ParseInt(parts[1], 10, 64)
+ if err != nil {
+ return nil, 0, err
+ }
+ attrs, ok := naming.DecodeFromNameElement(parts[2])
+ if !ok {
+ return nil, 0, NewErrAdInvalidEncoding(nil, in)
}
if err = vom.Decode([]byte(attrs), &ad.Attributes); err != nil {
- return nil, err
+ return nil, 0, err
}
- return ad, nil
+ return ad, timestampNs, nil
}
diff --git a/lib/discovery/global/encoding_test.go b/lib/discovery/global/encoding_test.go
index cf4b0db..63f369c 100644
--- a/lib/discovery/global/encoding_test.go
+++ b/lib/discovery/global/encoding_test.go
@@ -7,28 +7,35 @@
import (
"reflect"
"testing"
+ "time"
"v.io/v23/discovery"
)
func TestAdSuffix(t *testing.T) {
- testCases := []discovery.Advertisement{
- {},
- {Id: discovery.AdId{1, 2, 3}},
- {Attributes: discovery.Attributes{"k": "v"}},
- {Id: discovery.AdId{1, 2, 3}, Attributes: discovery.Attributes{"k": "v"}},
+ testCases := []testCase{
+ {ad: &discovery.Advertisement{}, timestampNs: time.Now().UnixNano()},
+ {ad: &discovery.Advertisement{Id: discovery.AdId{1, 2, 3}}, timestampNs: time.Now().UnixNano()},
+ {ad: &discovery.Advertisement{Attributes: discovery.Attributes{"k": "v"}}, timestampNs: time.Now().UnixNano()},
+ {ad: &discovery.Advertisement{Id: discovery.AdId{1, 2, 3}, Attributes: discovery.Attributes{"k": "v"}}, timestampNs: time.Now().UnixNano()},
}
- for _, want := range testCases {
- encAd, err := encodeAdToSuffix(&want)
+ for i, want := range testCases {
+ encAd, err := encodeAdToSuffix(want.ad, want.timestampNs)
if err != nil {
t.Error(err)
}
- got, err := decodeAdFromSuffix(encAd)
+ var got testCase
+ got.ad, got.timestampNs, err = decodeAdFromSuffix(encAd)
if err != nil {
t.Error(err)
}
- if !reflect.DeepEqual(*got, want) {
- t.Errorf("got %v, want %v", *got, want)
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("#%d: got %#v, want %#v", i, got, want)
}
}
}
+
+type testCase struct {
+ ad *discovery.Advertisement
+ timestampNs int64
+}
diff --git a/lib/discovery/global/global.go b/lib/discovery/global/global.go
index c66ca2b..6066649 100644
--- a/lib/discovery/global/global.go
+++ b/lib/discovery/global/global.go
@@ -30,8 +30,9 @@
type gdiscovery struct {
ns namespace.T
- mu sync.Mutex
- ads map[discovery.AdId]struct{} // GUARDED_BY(mu)
+ mu sync.Mutex
+ ads map[discovery.AdId]struct{} // GUARDED_BY(mu)
+ adTimestampNs int64 // GUARDED_BY(mu)
clock timekeeper.TimeKeeper
diff --git a/lib/discovery/global/scan.go b/lib/discovery/global/scan.go
index dec7be0..489ada1 100644
--- a/lib/discovery/global/scan.go
+++ b/lib/discovery/global/scan.go
@@ -26,7 +26,7 @@
go func() {
defer close(updateCh)
- var prevFound map[discovery.AdId]*discovery.Advertisement
+ var prevFound map[discovery.AdId]*idiscovery.AdInfo
for {
found, err := d.doScan(ctx, matcher.TargetKey(), matcher)
if found == nil {
@@ -48,7 +48,7 @@
return updateCh, nil
}
-func (d *gdiscovery) doScan(ctx *context.T, target string, matcher idiscovery.Matcher) (map[discovery.AdId]*discovery.Advertisement, error) {
+func (d *gdiscovery) doScan(ctx *context.T, target string, matcher idiscovery.Matcher) (map[discovery.AdId]*idiscovery.AdInfo, error) {
// If the target is neither empty nor a valid AdId, we return without an error,
// since there will be not entries with the requested target length in the namespace.
if len(target) > 0 {
@@ -61,9 +61,9 @@
// In the case where target is a AdId we need to scan for entries prefixed with
// the AdId with any encoded attributes afterwards.
if len(target) == 0 {
- target = naming.Join("*", "*")
+ target = naming.Join("*", "*", "*")
} else {
- target = naming.Join(target, "*")
+ target = naming.Join(target, "*", "*")
}
scanCh, err := d.ns.Glob(ctx, target)
if err != nil {
@@ -74,28 +74,28 @@
}
}()
- found := make(map[discovery.AdId]*discovery.Advertisement)
+ found := make(map[discovery.AdId]*idiscovery.AdInfo)
for {
select {
case glob, ok := <-scanCh:
if !ok {
return found, nil
}
- ad, err := convToAd(glob)
+ adinfo, err := convToAdInfo(glob)
if err != nil {
ctx.Error(err)
continue
}
// Since mount operations are not atomic, we may not have addresses yet.
// Ignore it. It will be re-scanned in the next cycle.
- if len(ad.Addresses) == 0 {
+ if len(adinfo.Ad.Addresses) == 0 {
continue
}
// Filter out advertisements from the same discovery instance.
- if d.hasAd(ad) {
+ if d.hasAd(&adinfo.Ad) {
continue
}
- matched, err := matcher.Match(ad)
+ matched, err := matcher.Match(&adinfo.Ad)
if err != nil {
ctx.Error(err)
continue
@@ -103,7 +103,7 @@
if !matched {
continue
}
- found[ad.Id] = ad
+ found[adinfo.Ad.Id] = adinfo
case <-ctx.Done():
return nil, nil
}
@@ -117,10 +117,10 @@
return ok
}
-func convToAd(glob naming.GlobReply) (*discovery.Advertisement, error) {
+func convToAdInfo(glob naming.GlobReply) (*idiscovery.AdInfo, error) {
switch g := glob.(type) {
case *naming.GlobReplyEntry:
- ad, err := decodeAdFromSuffix(g.Value.Name)
+ ad, timestampNs, err := decodeAdFromSuffix(g.Value.Name)
if err != nil {
return nil, err
}
@@ -131,7 +131,7 @@
// We sort the addresses to avoid false updates.
sort.Strings(addrs)
ad.Addresses = addrs
- return ad, nil
+ return &idiscovery.AdInfo{Ad: *ad, TimestampNs: timestampNs}, nil
case *naming.GlobReplyError:
return nil, fmt.Errorf("glob error on %s: %v", g.Value.Name, g.Value.Error)
default:
@@ -139,16 +139,17 @@
}
}
-func sendUpdates(ctx *context.T, prevFound, found map[discovery.AdId]*discovery.Advertisement, updateCh chan<- discovery.Update) {
- for id, ad := range found {
+func sendUpdates(ctx *context.T, prevFound, found map[discovery.AdId]*idiscovery.AdInfo, updateCh chan<- discovery.Update) {
+ for id, adinfo := range found {
var updates []discovery.Update
if prev := prevFound[id]; prev == nil {
- updates = []discovery.Update{idiscovery.NewUpdate(&idiscovery.AdInfo{Ad: *ad})}
+ updates = []discovery.Update{idiscovery.NewUpdate(adinfo)}
} else {
- if !reflect.DeepEqual(prev, ad) {
+ if !reflect.DeepEqual(prev.Ad, adinfo.Ad) {
+ prev.Lost = true
updates = []discovery.Update{
- idiscovery.NewUpdate(&idiscovery.AdInfo{Ad: *prev, Lost: true}),
- idiscovery.NewUpdate(&idiscovery.AdInfo{Ad: *ad}),
+ idiscovery.NewUpdate(prev),
+ idiscovery.NewUpdate(adinfo),
}
}
delete(prevFound, id)
@@ -163,7 +164,8 @@
}
for _, prev := range prevFound {
- update := idiscovery.NewUpdate(&idiscovery.AdInfo{Ad: *prev, Lost: true})
+ prev.Lost = true
+ update := idiscovery.NewUpdate(prev)
select {
case updateCh <- update:
case <-ctx.Done():
diff --git a/lib/discovery/update.go b/lib/discovery/update.go
index 21fc3a9..fc761a1 100644
--- a/lib/discovery/update.go
+++ b/lib/discovery/update.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "time"
"v.io/v23/context"
"v.io/v23/discovery"
@@ -13,11 +14,12 @@
// update is an implementation of discovery.Update.
type update struct {
- ad discovery.Advertisement
- hash AdHash
- dirAddrs []string
- status AdStatus
- lost bool
+ ad discovery.Advertisement
+ hash AdHash
+ dirAddrs []string
+ status AdStatus
+ lost bool
+ timestamp time.Time
}
func (u *update) IsLost() bool { return u.lost }
@@ -78,6 +80,10 @@
return ad
}
+func (u *update) Timestamp() time.Time {
+ return u.timestamp
+}
+
func (u *update) String() string {
return fmt.Sprintf("{%v %s %s %v %v}", u.lost, u.ad.Id, u.ad.InterfaceName, u.ad.Addresses, u.ad.Attributes)
}
@@ -85,10 +91,11 @@
// NewUpdate returns a new update with the given advertisement information.
func NewUpdate(adinfo *AdInfo) discovery.Update {
return &update{
- ad: adinfo.Ad,
- hash: adinfo.Hash,
- dirAddrs: adinfo.DirAddrs,
- status: adinfo.Status,
- lost: adinfo.Lost,
+ ad: adinfo.Ad,
+ hash: adinfo.Hash,
+ dirAddrs: adinfo.DirAddrs,
+ status: adinfo.Status,
+ lost: adinfo.Lost,
+ timestamp: time.Unix(adinfo.TimestampNs/1e6, adinfo.TimestampNs%1e6),
}
}