watchable store: extract func that gets next log seq
Extracting the function that returns the next sequence number to be
used for a new commit. The function is optimized with a binary search.
The same function will be used to implement the GetResumeMarker().
Change-Id: I89fd5899f82e0e25e666873ee8a79a1d095e82bb
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 82fc8d4..2b0037a 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -24,13 +24,11 @@
import (
"fmt"
- "strconv"
"strings"
"sync"
pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/clock"
- "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
)
@@ -48,40 +46,16 @@
// Wrap returns a watchable.Store that wraps the given store.Store.
func Wrap(st store.Store, opts *Options) (Store, error) {
- // Determine initial value for seq.
- // TODO(sadovsky): Consider using a bigger seq.
- var seq uint64 = 0
- // TODO(sadovsky): Perform a binary search to determine seq, or persist the
- // current sequence number on every nth write so that at startup time we can
- // start our scan from there instead scanning over all log entries just to
- // find the latest one.
- it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
- advanced := false
- keybuf := []byte{}
- for it.Advance() {
- advanced = true
- keybuf = it.Key(keybuf)
- }
- if err := it.Err(); err != nil {
+ seq, err := getNextLogSeq(st)
+ if err != nil {
return nil, err
}
- if advanced {
- key := string(keybuf)
- parts := split(key)
- if len(parts) != 2 {
- panic("wrong number of parts: " + key)
- }
- var err error
- seq, err = strconv.ParseUint(parts[1], 10, 64)
- // Current value of seq points to the last transaction committed.
- // Increment this value by 1.
- seq++
- if err != nil {
- panic("failed to parse seq: " + key)
- }
- }
- vclock := clock.NewVClock()
- return &wstore{ist: st, opts: opts, seq: seq, clock: vclock}, nil
+ return &wstore{
+ ist: st,
+ opts: opts,
+ seq: seq,
+ clock: clock.NewVClock(),
+ }, nil
}
type wstore struct {
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 12f5e1f..93b6b33 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -301,10 +301,3 @@
wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
return nil
}
-
-// Exported as a helper function for testing purposes.
-func getLogEntryKey(seq uint64) string {
- // Note: MaxUint64 is 0xffffffffffffffff.
- // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
- return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
-}
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index a21057f..3d08129 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -15,6 +15,7 @@
import (
"fmt"
"math/rand"
+ "strconv"
"sync"
"time"
@@ -89,6 +90,71 @@
}
}
+func getLogEntryKey(seq uint64) string {
+ // Note: MaxUint64 is 0xffffffffffffffff.
+ // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+ return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
+
+// logEntryExists returns true iff the log contains an entry with the given
+// sequence number.
+func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
+ _, err := st.Get([]byte(getLogEntryKey(seq)), nil)
+ if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return false, err
+ }
+ return err == nil, nil
+}
+
+// getNextLogSeq returns the next sequence number to be used for a new commit.
+// NOTE: this function assumes that all sequence numbers in the log represent
+// some range [start, limit] without gaps.
+func getNextLogSeq(st store.StoreReader) (uint64, error) {
+ // Determine initial value for seq.
+ // TODO(sadovsky): Consider using a bigger seq.
+
+ // Find the beginning of the log.
+ it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
+ if !it.Advance() {
+ return 0, nil
+ }
+ if it.Err() != nil {
+ return 0, it.Err()
+ }
+ key := string(it.Key(nil))
+ parts := split(key)
+ if len(parts) != 2 {
+ panic("wrong number of parts: " + key)
+ }
+ seq, err := strconv.ParseUint(parts[1], 10, 64)
+ if err != nil {
+ panic("failed to parse seq: " + key)
+ }
+ var step uint64 = 1
+ // Suppose the actual value we are looking for is S. First, we estimate the
+ // range for S. We find seq, step: seq < S <= seq + step.
+ for {
+ if ok, err := logEntryExists(st, seq+step); err != nil {
+ return 0, err
+ } else if !ok {
+ break
+ }
+ seq += step
+ step *= 2
+ }
+ // Next we keep the seq < S <= seq + step invariant, reducing step to 1.
+ for step > 1 {
+ step /= 2
+ if ok, err := logEntryExists(st, seq+step); err != nil {
+ return 0, err
+ } else if ok {
+ seq += step
+ }
+ }
+ // Now seq < S <= seq + 1, thus S = seq + 1.
+ return seq + 1, nil
+}
+
func join(parts ...string) string {
return util.JoinKeyParts(parts...)
}
diff --git a/services/syncbase/server/watchable/util_test.go b/services/syncbase/server/watchable/util_test.go
new file mode 100644
index 0000000..f6875f6
--- /dev/null
+++ b/services/syncbase/server/watchable/util_test.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "testing"
+)
+
+// TestGetNextLogSeq tests that the getNextLogSeq helper works on range 0..10.
+func TestGetNextLogSeq(t *testing.T) {
+ st, destroy := createStore()
+ defer destroy()
+ st, err := Wrap(st, &Options{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ for i := uint64(0); i <= uint64(10); i++ {
+ seq, err := getNextLogSeq(st)
+ if err != nil {
+ t.Fatalf("failed to get log seq: %v", err)
+ }
+ if got, want := seq, i; got != want {
+ t.Fatalf("unexpected log seq: got %v, want %v", got, want)
+ }
+ st.Put([]byte(getLogEntryKey(i)), nil)
+ }
+}