syncbase: store log entries chronologically
This change makes it easier for clients to iterate over log entries
chronologically, which is the common case (both for client watch and for sync
watch). Originally I used reverse chronological order because it was more
efficient for determining the most recent sequence number (done once, at
startup time) - but that's not what we should optimize for.
Change-Id: I84879054ed5617f23f8d717e0e363aee12ea2873
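
For illustration only, a minimal standalone sketch of how the new chronological
key encoding sorts. The format strings mirror the ones used in transaction.go
below; the logKey helper and the literal "$log" prefix (standing in for
util.LogPrefix) are assumptions:

    package main

    import "fmt"

    // logKey mirrors the "$log:<seq>:<txSeq>" construction in transaction.go:
    // both numbers are zero-padded so lexicographic order matches numeric order.
    // Using the literal "$log" in place of util.LogPrefix is an assumption.
    func logKey(seq uint64, txSeq int) string {
        return fmt.Sprintf("$log:%016x:%04x", seq, txSeq)
    }

    func main() {
        // Later writes now sort after earlier ones, so a forward Scan over the
        // "$log:" prefix yields entries chronologically.
        fmt.Println(logKey(9, 0))  // $log:0000000000000009:0000
        fmt.Println(logKey(10, 0)) // $log:000000000000000a:0000
        fmt.Println(logKey(10, 1)) // $log:000000000000000a:0001
    }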
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 5a44625..c042cbe 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -40,7 +40,7 @@
}
// TODO(sadovsky): Make storage engine pluggable.
st, err := watchable.Wrap(memstore.New(), &watchable.Options{
- ManagedPrefixes: []string{},
+ ManagedPrefixes: []string{util.RowPrefix},
})
if err != nil {
return nil, err
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index ed8e15f..3e0e7d3 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -8,15 +8,11 @@
// for the implementation of client-facing watch as well as the sync module's
// internal watching of store updates.
//
-// Log entries are keyed in reverse chronological order. More specifically, the
-// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
-// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
-// the lexicographic order matches the numeric order. Thus, clients implementing
-// ResumeMarkers (i.e. implementing the watch API) should use
-// fmt.Sprintf("%016x", MaxUint64-marker) to convert external markers to
-// internal LogEntry key prefixes.
+// LogEntry records are stored chronologically, using keys of the form
+// "$log:<seq>:<txSeq>". Sequence numbers are zero-padded to ensure that the
+// lexicographic order matches the numeric order.
//
-// Version number records are stored with keys of the form "$version:<key>",
+// Version number records are stored using keys of the form "$version:<key>",
// where <key> is the client-specified key.
package watchable
@@ -35,12 +31,6 @@
pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
-)
-
-const (
- MaxUint16 uint64 = 1<<16 - 1 // 0xffff
- MaxUint64 uint64 = 1<<64 - 1 // 0xffffffffffffffff
)
// Store is a store.Store that provides versioned storage and a watchable oplog.
@@ -59,22 +49,31 @@
func Wrap(st store.Store, opts *Options) (Store, error) {
// Determine initial value for seq.
var seq uint64 = 0
- it := st.Scan([]byte(util.LogPrefix), []byte(""))
+ // TODO(sadovsky): Perform a binary search to determine seq, or persist the
+ // current sequence number on every nth write so that at startup time we can
+ // start our scan from there instead of scanning over all log entries just to
+ // find the latest one.
+ it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
+ advanced := false
+ keybuf := []byte{}
for it.Advance() {
- key := string(it.Key(nil))
+ advanced = true
+ keybuf = it.Key(keybuf)
+ }
+ if err := it.Err(); err != nil {
+ return nil, err
+ }
+ if advanced {
+ key := string(keybuf)
parts := split(key)
if len(parts) != 3 {
panic("wrong number of parts: " + key)
}
- invSeq, err := strconv.ParseUint(parts[1], 10, 64)
+ var err error
+ seq, err = strconv.ParseUint(parts[1], 10, 64)
if err != nil {
- panic("failed to parse invSeq: " + key)
+ panic("failed to parse seq: " + key)
}
- seq = MaxUint64 - invSeq
- it.Cancel()
- }
- if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
- return nil, err
}
return &wstore{ist: st, opts: opts, seq: seq}, nil
}
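
Note for watch clients: the removed doc comment told ResumeMarker
implementations to invert the marker; with chronological keys no inversion is
needed. A hedged standalone sketch, where only the format string and the old
MaxUint64-marker formula come from the removed doc comment and the helper names
are hypothetical:

    package main

    import (
        "fmt"
        "math"
    )

    // Old scheme (reverse chronological): convert an external resume marker
    // with MaxUint64-marker, as the removed doc comment described.
    func oldMarkerPrefix(marker uint64) string {
        return fmt.Sprintf("%016x", math.MaxUint64-marker)
    }

    // New scheme (chronological): the marker maps to a key prefix directly.
    // This follows from the new key format but is stated here as an assumption.
    func newMarkerPrefix(marker uint64) string {
        return fmt.Sprintf("%016x", marker)
    }

    func main() {
        fmt.Println(oldMarkerPrefix(5)) // fffffffffffffffa
        fmt.Println(newMarkerPrefix(5)) // 0000000000000005
    }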
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 17c899f..1be1b85 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "math"
"sync"
"v.io/syncbase/x/ref/services/syncbase/server/util"
@@ -109,17 +110,18 @@
tx.st.mu.Lock()
defer tx.st.mu.Unlock()
// Check sequence numbers.
- if uint64(len(tx.ops)) > MaxUint16 {
+ if uint64(len(tx.ops)) > math.MaxUint16 {
return verror.New(verror.ErrInternal, nil, "too many ops")
}
- if tx.st.seq == MaxUint64 {
+ if tx.st.seq == math.MaxUint64 {
return verror.New(verror.ErrInternal, nil, "seq maxed out")
}
// Write LogEntry records.
+ // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
- keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+ keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", tx.st.seq))
for txSeq, op := range tx.ops {
- key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+ key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
value := &LogEntry{
Op: op,
// TODO(sadovsky): This information is also captured in LogEntry keys.
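
For reference, a standalone sketch of the commit-time bounds checks above using
the standard library constants; numOps and seq are hypothetical stand-ins for
len(tx.ops) and tx.st.seq:

    package main

    import (
        "errors"
        "fmt"
        "math"
    )

    // checkSeqBounds reproduces the two guards above in isolation.
    func checkSeqBounds(numOps int, seq uint64) error {
        if uint64(numOps) > math.MaxUint16 { // more ops than <txSeq> (0xffff) can encode
            return errors.New("too many ops")
        }
        if seq == math.MaxUint64 { // the next <seq> (0xffffffffffffffff) would wrap
            return errors.New("seq maxed out")
        }
        return nil
    }

    func main() {
        fmt.Println(checkSeqBounds(3, 42))             // <nil>
        fmt.Println(checkSeqBounds(1<<16, 42))         // too many ops
        fmt.Println(checkSeqBounds(1, math.MaxUint64)) // seq maxed out
    }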
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 4e95c12..847fcb5 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -32,19 +32,20 @@
////////////////////////////////////////////////////////////
// SyncGroup management internal to Syncbase.
-// memberView holds an aggregated view of all SyncGroup members across databases.
-// The view is not coherent, it gets refreshed according to a configured TTL and
-// not (coherently) when SyncGroup membership is updated in the various databases.
-// It is needed by the sync Initiator, which must select a peer to contact from a
-// global view of all SyncGroup members gathered from all databases. This is why
-// a slightly stale view is acceptable.
+// memberView holds an aggregated view of all SyncGroup members across
+ databases. The view is not coherent; it gets refreshed according to a
+// configured TTL and not (coherently) when SyncGroup membership is updated in
+// the various databases. It is needed by the sync Initiator, which must select
+// a peer to contact from a global view of all SyncGroup members gathered from
+// all databases. This is why a slightly stale view is acceptable.
// The members are identified by their Vanadium names (map keys).
type memberView struct {
expiration time.Time
members map[string]*memberInfo
}
-// memberInfo holds the member metadata for each SyncGroup this member belongs to.
+// memberInfo holds the member metadata for each SyncGroup this member belongs
+// to.
type memberInfo struct {
gid2info map[GroupId]nosql.SyncGroupMemberInfo
}
@@ -148,14 +149,15 @@
return delSyncGroupById(ctx, tx, gid)
}
-// refreshMembersIfExpired updates the aggregate view of SyncGroup members across
-// databases if the view has expired.
+// refreshMembersIfExpired updates the aggregate view of SyncGroup members
+// across databases if the view has expired.
// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
// update the membership view for them instead of always scanning all of them.
func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
view := s.allMembers
if view == nil {
- // The empty expiration time in Go is before "now" and treated as expired below.
+ // The empty expiration time in Go is before "now" and treated as expired
+ // below.
view = &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
s.allMembers = view
}
@@ -200,7 +202,8 @@
view.expiration = time.Now().Add(memberViewTTL)
}
-// getMembers returns all SyncGroup members and the count of SyncGroups each one joined.
+// getMembers returns all SyncGroup members and the count of SyncGroups each one
+// joined.
func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
s.refreshMembersIfExpired(ctx)
@@ -212,7 +215,8 @@
return members
}
-// Low-level utility functions to access DB entries without tracking their relationships.
+// Low-level utility functions to access DB entries without tracking their
+// relationships.
// Use the functions above to manipulate SyncGroups.
// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
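
For context, the refresh behavior described in the memberView comments above
amounts to a TTL check. A minimal standalone sketch, where the one-minute TTL
and the rebuild callback are assumptions for illustration:

    package main

    import (
        "fmt"
        "time"
    )

    type memberInfo struct{} // per-member metadata elided

    type memberView struct {
        expiration time.Time
        members    map[string]*memberInfo
    }

    const memberViewTTL = time.Minute // assumed value, for illustration only

    // refreshIfExpired rebuilds the view only when its TTL has lapsed; membership
    // updates in the databases do not invalidate it, so it can be slightly stale.
    func refreshIfExpired(view *memberView, rebuild func() map[string]*memberInfo) {
        if time.Now().Before(view.expiration) {
            return // still fresh
        }
        view.members = rebuild()
        view.expiration = time.Now().Add(memberViewTTL)
    }

    func main() {
        // The zero time.Time is before "now", so the first call always rebuilds.
        view := &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
        refreshIfExpired(view, func() map[string]*memberInfo {
            return map[string]*memberInfo{"/peer1": {}, "/peer2": {}}
        })
        fmt.Println(len(view.members)) // 2
    }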
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 4f6d285..4c6f852 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -12,12 +12,14 @@
)
// forEachDatabaseStore iterates over all Databases in all Apps within the
-// service and invokes the callback function provided on each database.
-// The callback returns a "done" flag to make storeIter() stop the iteration
-// earlier, otherwise the function loops across all databases of all apps.
+// service and invokes the provided callback function on each database. The
+// callback returns a "done" flag to make forEachDatabaseStore() stop the
+// iteration earlier; otherwise the function loops across all databases of all
+// apps.
func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
// Get the apps and iterate over them.
- // TODO(rdaoud): use a "privileged call" parameter instead of nil.
+ // TODO(rdaoud): use a "privileged call" parameter instead of nil (here and
+ // elsewhere).
appNames, err := s.sv.AppNames(ctx, nil)
if err != nil {
return
@@ -25,12 +27,10 @@
for _, a := range appNames {
// For each app, get its databases and iterate over them.
- // TODO(rdaoud): use a "privileged call" parameter instead of nil.
app, err := s.sv.App(ctx, nil, a)
if err != nil {
continue
}
- // TODO(rdaoud): use a "privileged call" parameter instead of nil.
dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
if err != nil {
continue
@@ -38,7 +38,6 @@
for _, d := range dbNames {
// For each database, get its Store and invoke the callback.
- // TODO(rdaoud): use a "privileged call" parameter instead of nil.
db, err := app.NoSQLDatabase(ctx, nil, d)
if err != nil {
continue
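
Finally, a standalone sketch of the callback contract documented on
forEachDatabaseStore above (returning true stops the iteration early); the
Store stand-in and the in-memory list of stores are assumptions:

    package main

    import "fmt"

    // Store is a stand-in for store.Store; only the iteration contract matters here.
    type Store interface {
        Close() error
    }

    type fakeStore struct{ name string }

    func (f *fakeStore) Close() error { return nil }

    // forEachStore mirrors the contract of forEachDatabaseStore: the callback
    // returns a "done" flag, and returning true stops the iteration early.
    func forEachStore(stores []Store, callback func(Store) bool) {
        for _, st := range stores {
            if callback(st) {
                return
            }
        }
    }

    func main() {
        stores := []Store{&fakeStore{"db1"}, &fakeStore{"db2"}, &fakeStore{"db3"}}
        visited := 0
        forEachStore(stores, func(Store) bool {
            visited++
            return visited == 2 // stop early; returning false would visit all three
        })
        fmt.Println(visited) // 2
    }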