watchable store: extract func that gets next log seq

Extracting the function that returns the next sequence number to be
used for a new commit. The function is optimized with a binary search.
The same function will be used to implement the GetResumeMarker().

Change-Id: I89fd5899f82e0e25e666873ee8a79a1d095e82bb
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 82fc8d4..2b0037a 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -24,13 +24,11 @@
 
 import (
 	"fmt"
-	"strconv"
 	"strings"
 	"sync"
 
 	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/clock"
-	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
@@ -48,40 +46,16 @@
 
 // Wrap returns a watchable.Store that wraps the given store.Store.
 func Wrap(st store.Store, opts *Options) (Store, error) {
-	// Determine initial value for seq.
-	// TODO(sadovsky): Consider using a bigger seq.
-	var seq uint64 = 0
-	// TODO(sadovsky): Perform a binary search to determine seq, or persist the
-	// current sequence number on every nth write so that at startup time we can
-	// start our scan from there instead scanning over all log entries just to
-	// find the latest one.
-	it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
-	advanced := false
-	keybuf := []byte{}
-	for it.Advance() {
-		advanced = true
-		keybuf = it.Key(keybuf)
-	}
-	if err := it.Err(); err != nil {
+	seq, err := getNextLogSeq(st)
+	if err != nil {
 		return nil, err
 	}
-	if advanced {
-		key := string(keybuf)
-		parts := split(key)
-		if len(parts) != 2 {
-			panic("wrong number of parts: " + key)
-		}
-		var err error
-		seq, err = strconv.ParseUint(parts[1], 10, 64)
-		// Current value of seq points to the last transaction committed.
-		// Increment this value by 1.
-		seq++
-		if err != nil {
-			panic("failed to parse seq: " + key)
-		}
-	}
-	vclock := clock.NewVClock()
-	return &wstore{ist: st, opts: opts, seq: seq, clock: vclock}, nil
+	return &wstore{
+		ist:   st,
+		opts:  opts,
+		seq:   seq,
+		clock: clock.NewVClock(),
+	}, nil
 }
 
 type wstore struct {
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 12f5e1f..93b6b33 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -301,10 +301,3 @@
 	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
 	return nil
 }
-
-// Exported as a helper function for testing purposes.
-func getLogEntryKey(seq uint64) string {
-	// Note: MaxUint64 is 0xffffffffffffffff.
-	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
-	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
-}
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index a21057f..3d08129 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -15,6 +15,7 @@
 import (
 	"fmt"
 	"math/rand"
+	"strconv"
 	"sync"
 	"time"
 
@@ -89,6 +90,71 @@
 	}
 }
 
+func getLogEntryKey(seq uint64) string {
+	// Note: MaxUint64 is 0xffffffffffffffff.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
+
+// logEntryExists returns true iff the log contains an entry with the given
+// sequence number.
+func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
+	_, err := st.Get([]byte(getLogEntryKey(seq)), nil)
+	if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
+		return false, err
+	}
+	return err == nil, nil
+}
+
+// getNextLogSeq returns the next sequence number to be used for a new commit.
+// NOTE: this function assumes that all sequence numbers in the log represent
+// some range [start, limit] without gaps.
+func getNextLogSeq(st store.StoreReader) (uint64, error) {
+	// Determine initial value for seq.
+	// TODO(sadovsky): Consider using a bigger seq.
+
+	// Find the beginning of the log.
+	it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
+	if !it.Advance() {
+		return 0, nil
+	}
+	if it.Err() != nil {
+		return 0, it.Err()
+	}
+	key := string(it.Key(nil))
+	parts := split(key)
+	if len(parts) != 2 {
+		panic("wrong number of parts: " + key)
+	}
+	seq, err := strconv.ParseUint(parts[1], 10, 64)
+	if err != nil {
+		panic("failed to parse seq: " + key)
+	}
+	var step uint64 = 1
+	// Suppose the actual value we are looking for is S. First, we estimate the
+	// range for S. We find seq, step: seq < S <= seq + step.
+	for {
+		if ok, err := logEntryExists(st, seq+step); err != nil {
+			return 0, err
+		} else if !ok {
+			break
+		}
+		seq += step
+		step *= 2
+	}
+	// Next we keep the seq < S <= seq + step invariant, reducing step to 1.
+	for step > 1 {
+		step /= 2
+		if ok, err := logEntryExists(st, seq+step); err != nil {
+			return 0, err
+		} else if ok {
+			seq += step
+		}
+	}
+	// Now seq < S <= seq + 1, thus S = seq + 1.
+	return seq + 1, nil
+}
+
 func join(parts ...string) string {
 	return util.JoinKeyParts(parts...)
 }
diff --git a/x/ref/services/syncbase/server/watchable/util_test.go b/x/ref/services/syncbase/server/watchable/util_test.go
new file mode 100644
index 0000000..f6875f6
--- /dev/null
+++ b/x/ref/services/syncbase/server/watchable/util_test.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"testing"
+)
+
+// TestGetNextLogSeq tests that the getNextLogSeq helper works on range 0..10.
+func TestGetNextLogSeq(t *testing.T) {
+	st, destroy := createStore()
+	defer destroy()
+	st, err := Wrap(st, &Options{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	for i := uint64(0); i <= uint64(10); i++ {
+		seq, err := getNextLogSeq(st)
+		if err != nil {
+			t.Fatalf("failed to get log seq: %v", err)
+		}
+		if got, want := seq, i; got != want {
+			t.Fatalf("unexpected log seq: got %v, want %v", got, want)
+		}
+		st.Put([]byte(getLogEntryKey(i)), nil)
+	}
+}