watch: refactoring required for the client watch
This change precedes the client watch implementation and contains
refactoring with some minor fixes. The intention is to reuse more
code.
The main parts are:
- v23/syncbase/nosql: rename Stream to ScanStream and extract
common Stream interface from ScanStream and ResultStream
(also WatchStream is coming)
- move Watch methods from server/nosql/database.go to
server/nosql/database_watch.go
- move watch log helpers from watchable/util.go to
watchable/watcher.go
- move the WatchLogBatch() func from vsync/ to watchable/
together with the test
Change-Id: Iaab18a1ec6dd218e85284bd994ed71e766feab26
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 7b9a470..1fe3066 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -25,7 +25,6 @@
"v.io/v23/glob"
"v.io/v23/rpc"
"v.io/v23/security/access"
- "v.io/v23/services/watch"
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vom"
@@ -273,7 +272,10 @@
sender.Send(resultHeaders)
for rs.Advance() {
result := rs.Result()
- sender.Send(result)
+ if err := sender.Send(result); err != nil {
+ rs.Cancel()
+ return err
+ }
}
return rs.Err()
}
@@ -321,25 +323,6 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
- // TODO(rogulenko): Implement.
- if !d.exists {
- return verror.New(verror.ErrNoExist, ctx, d.name)
- }
- if d.batchId != nil {
- return wire.NewErrBoundToBatch(ctx)
- }
- return verror.NewErrNotImplemented(ctx)
-}
-
-func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
- // TODO(rogulenko): Implement.
- if !d.exists {
- return nil, verror.New(verror.ErrNoExist, ctx, d.name)
- }
- return nil, verror.NewErrNotImplemented(ctx)
-}
-
func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
if !d.exists {
return verror.New(verror.ErrNoExist, ctx, d.name)
diff --git a/services/syncbase/server/nosql/database_watch.go b/services/syncbase/server/nosql/database_watch.go
new file mode 100644
index 0000000..951444d
--- /dev/null
+++ b/services/syncbase/server/nosql/database_watch.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/services/watch"
+ "v.io/v23/verror"
+)
+
+// WatchGlob implements the nosqlwire.DatabaseWatcher interface.
+func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
+ // TODO(rogulenko): Implement.
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ if d.batchId != nil {
+ return nosqlwire.NewErrBoundToBatch(ctx)
+ }
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// GetResumeMarker implements the nosqlwire.DatabaseWatcher interface.
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
+ // TODO(rogulenko): Implement.
+ if !d.exists {
+ return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ return nil, verror.NewErrNotImplemented(ctx)
+}
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index a3ed4b7..8cf744e 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -157,7 +157,10 @@
it.Cancel()
return err
}
- sender.Send(wire.KeyValue{Key: externalKey, Value: value})
+ if err := sender.Send(wire.KeyValue{Key: externalKey, Value: value}); err != nil {
+ it.Cancel()
+ return err
+ }
}
if err := it.Err(); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
diff --git a/services/syncbase/server/watchable/test_util.go b/services/syncbase/server/watchable/test_util.go
index 7db1755..77aab33 100644
--- a/services/syncbase/server/watchable/test_util.go
+++ b/services/syncbase/server/watchable/test_util.go
@@ -91,7 +91,7 @@
}
func newLogEntryReader(st store.Store, seq uint64) *logEntryReader {
- stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
+ stream := st.Scan([]byte(logEntryKey(seq)), []byte(logEntryKey(math.MaxUint64)))
return &logEntryReader{stream: stream}
}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index fb82f72..f557ca7 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -142,7 +142,7 @@
timestamp := tx.st.clock.Now().UnixNano()
seq := tx.st.seq
for i, op := range tx.ops {
- key := getLogEntryKey(seq)
+ key := logEntryKey(seq)
value := &LogEntry{
Op: op,
CommitTimestamp: timestamp,
@@ -248,6 +248,8 @@
// StoreReader interface is required since this is a Get operation.
func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
switch w := st.(type) {
+ case *snapshot:
+ return getAtVersion(w.isn, key, valbuf, version)
case *transaction:
w.mu.Lock()
defer w.mu.Unlock()
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 64f8eeb..8eb606e 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -15,7 +15,6 @@
import (
"fmt"
"math/rand"
- "strconv"
"sync"
"time"
@@ -81,71 +80,6 @@
return tx.Delete(makeVersionKey(key))
}
-func getLogEntryKey(seq uint64) string {
- // Note: MaxUint64 is 0xffffffffffffffff.
- // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
- return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
-}
-
-// logEntryExists returns true iff the log contains an entry with the given
-// sequence number.
-func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
- _, err := st.Get([]byte(getLogEntryKey(seq)), nil)
- if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
- return false, err
- }
- return err == nil, nil
-}
-
-// getNextLogSeq returns the next sequence number to be used for a new commit.
-// NOTE: this function assumes that all sequence numbers in the log represent
-// some range [start, limit] without gaps.
-func getNextLogSeq(st store.StoreReader) (uint64, error) {
- // Determine initial value for seq.
- // TODO(sadovsky): Consider using a bigger seq.
-
- // Find the beginning of the log.
- it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
- if !it.Advance() {
- return 0, nil
- }
- if it.Err() != nil {
- return 0, it.Err()
- }
- key := string(it.Key(nil))
- parts := split(key)
- if len(parts) != 2 {
- panic("wrong number of parts: " + key)
- }
- seq, err := strconv.ParseUint(parts[1], 10, 64)
- if err != nil {
- panic("failed to parse seq: " + key)
- }
- var step uint64 = 1
- // Suppose the actual value we are looking for is S. First, we estimate the
- // range for S. We find seq, step: seq < S <= seq + step.
- for {
- if ok, err := logEntryExists(st, seq+step); err != nil {
- return 0, err
- } else if !ok {
- break
- }
- seq += step
- step *= 2
- }
- // Next we keep the seq < S <= seq + step invariant, reducing step to 1.
- for step > 1 {
- step /= 2
- if ok, err := logEntryExists(st, seq+step); err != nil {
- return 0, err
- } else if ok {
- seq += step
- }
- }
- // Now seq < S <= seq + 1, thus S = seq + 1.
- return seq + 1, nil
-}
-
func join(parts ...string) string {
return util.JoinKeyParts(parts...)
}
diff --git a/services/syncbase/server/watchable/util_test.go b/services/syncbase/server/watchable/util_test.go
index f6875f6..99c440a 100644
--- a/services/syncbase/server/watchable/util_test.go
+++ b/services/syncbase/server/watchable/util_test.go
@@ -24,6 +24,6 @@
if got, want := seq, i; got != want {
t.Fatalf("unexpected log seq: got %v, want %v", got, want)
}
- st.Put([]byte(getLogEntryKey(i)), nil)
+ st.Put([]byte(logEntryKey(i)), nil)
}
}
diff --git a/services/syncbase/server/watchable/watcher.go b/services/syncbase/server/watchable/watcher.go
new file mode 100644
index 0000000..1209a1e
--- /dev/null
+++ b/services/syncbase/server/watchable/watcher.go
@@ -0,0 +1,138 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "strconv"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/services/watch"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/x/lib/vlog"
+)
+
+// MakeResumeMarker converts a sequence number to the resume marker.
+func MakeResumeMarker(seq uint64) watch.ResumeMarker {
+ return watch.ResumeMarker(logEntryKey(seq))
+}
+
+func logEntryKey(seq uint64) string {
+ // Note: MaxUint64 is 0xffffffffffffffff.
+ // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+ return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
+
+// WatchLogBatch returns a batch of watch log records (a transaction) from
+// the given database and the new resume marker at the end of the batch.
+func WatchLogBatch(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
+ seq, err := parseResumeMarker(string(resumeMarker))
+ if err != nil {
+ return nil, resumeMarker, err
+ }
+ _, scanLimit := util.ScanPrefixArgs(util.LogPrefix, "")
+ scanStart := resumeMarker
+ endOfBatch := false
+
+ // Use the store directly to scan these read-only log entries, no need
+ // to create a snapshot since they are never overwritten. Read and
+ // buffer a batch before processing it.
+ var logs []*LogEntry
+ stream := st.Scan(scanStart, scanLimit)
+ for stream.Advance() {
+ seq++
+ var logEnt LogEntry
+ if err := vom.Decode(stream.Value(nil), &logEnt); err != nil {
+ return nil, resumeMarker, err
+ }
+
+ logs = append(logs, &logEnt)
+
+ // Stop if this is the end of the batch.
+ if logEnt.Continued == false {
+ endOfBatch = true
+ break
+ }
+ }
+
+ if err = stream.Err(); err != nil {
+ return nil, resumeMarker, err
+ }
+ if !endOfBatch {
+ if len(logs) > 0 {
+ vlog.Fatalf("end of batch not found after %d entries", len(logs))
+ }
+ return nil, resumeMarker, nil
+ }
+ return logs, watch.ResumeMarker(logEntryKey(seq)), nil
+}
+
+func parseResumeMarker(resumeMarker string) (uint64, error) {
+ parts := split(resumeMarker)
+ if len(parts) != 2 {
+ return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
+ }
+ seq, err := strconv.ParseUint(parts[1], 16, 64)
+ if err != nil {
+ return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
+ }
+ return seq, nil
+}
+
+// logEntryExists returns true iff the log contains an entry with the given
+// sequence number.
+func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
+ _, err := st.Get([]byte(logEntryKey(seq)), nil)
+ if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return false, err
+ }
+ return err == nil, nil
+}
+
+// getNextLogSeq returns the next sequence number to be used for a new commit.
+// NOTE: this function assumes that all sequence numbers in the log represent
+// some range [start, limit] without gaps.
+func getNextLogSeq(st store.StoreReader) (uint64, error) {
+ // Determine initial value for seq.
+ // TODO(sadovsky): Consider using a bigger seq.
+
+ // Find the beginning of the log.
+ it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
+ if !it.Advance() {
+ return 0, nil
+ }
+ if it.Err() != nil {
+ return 0, it.Err()
+ }
+ seq, err := parseResumeMarker(string(it.Key(nil)))
+ if err != nil {
+ return 0, err
+ }
+ var step uint64 = 1
+ // Suppose the actual value we are looking for is S. First, we estimate the
+ // range for S. We find seq, step: seq < S <= seq + step.
+ for {
+ if ok, err := logEntryExists(st, seq+step); err != nil {
+ return 0, err
+ } else if !ok {
+ break
+ }
+ seq += step
+ step *= 2
+ }
+ // Next we keep the seq < S <= seq + step invariant, reducing step to 1.
+ for step > 1 {
+ step /= 2
+ if ok, err := logEntryExists(st, seq+step); err != nil {
+ return 0, err
+ } else if ok {
+ seq += step
+ }
+ }
+ // Now seq < S <= seq + 1, thus S = seq + 1.
+ return seq + 1, nil
+}
diff --git a/services/syncbase/server/watchable/watcher_test.go b/services/syncbase/server/watchable/watcher_test.go
new file mode 100644
index 0000000..246bc65
--- /dev/null
+++ b/services/syncbase/server/watchable/watcher_test.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// TestWatchLogBatch tests fetching a batch of log records.
+func TestWatchLogBatch(t *testing.T) {
+ runTest(t, []string{util.RowPrefix, util.PermsPrefix}, runWatchLogBatchTest)
+}
+
+// runWatchLogBatchTest tests fetching a batch of log records.
+func runWatchLogBatchTest(t *testing.T, st store.Store) {
+ // Create a set of batches to fill the log queue.
+ numTx, numPut := 3, 4
+
+ makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
+ key := util.JoinKeyParts(util.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
+ val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
+ return []byte(key), []byte(val)
+ }
+
+ for i := 0; i < numTx; i++ {
+ tx := st.NewTransaction()
+ for j := 0; j < numPut; j++ {
+ key, val := makeKeyVal(i, j)
+ if err := tx.Put(key, val); err != nil {
+ t.Errorf("cannot put %s (%s): %v", key, val, err)
+ }
+ }
+ tx.Commit()
+ }
+
+ // Fetch the batches and a few more empty fetches and verify them.
+ resmark := MakeResumeMarker(0)
+ var seq uint64
+
+ for i := 0; i < (numTx + 3); i++ {
+ logs, newResmark, err := WatchLogBatch(st, resmark)
+ if err != nil {
+ t.Fatalf("can't get watch log batch: %v", err)
+ }
+ if i < numTx {
+ if len(logs) != numPut {
+ t.Errorf("log fetch (i=%d) wrong log seq: %d instead of %d",
+ i, len(logs), numPut)
+ }
+
+ seq += uint64(len(logs))
+ expResmark := MakeResumeMarker(seq)
+ if !bytes.Equal(newResmark, expResmark) {
+ t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
+ i, newResmark, expResmark)
+ }
+
+ for j, log := range logs {
+ op := log.Op.(OpPut)
+ expKey, expVal := makeKeyVal(i, j)
+ key := op.Value.Key
+ if !bytes.Equal(key, expKey) {
+ t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
+ i, j, key, expKey)
+ }
+ tx := st.NewTransaction()
+ var val []byte
+ val, err := GetAtVersion(nil, tx, key, val, op.Value.Version)
+ if err != nil {
+ t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
+ }
+ if !bytes.Equal(val, expVal) {
+ t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
+ i, j, val, expVal)
+ }
+ tx.Abort()
+ }
+ } else {
+ if logs != nil || !bytes.Equal(newResmark, resmark) {
+ t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
+ i, len(logs), newResmark)
+ }
+ }
+ resmark = newResmark
+ }
+}
diff --git a/services/syncbase/vsync/test_util.go b/services/syncbase/vsync/test_util.go
index e454574..b06b785 100644
--- a/services/syncbase/vsync/test_util.go
+++ b/services/syncbase/vsync/test_util.go
@@ -162,11 +162,6 @@
}
}
-// makeResMark returns the resume marker for a given log entry position.
-func makeResMark(pos int) string {
- return util.JoinKeyParts(util.LogPrefix, fmt.Sprintf("%016x", pos))
-}
-
// makeRowKey returns the database row key for a given application key.
func makeRowKey(key string) string {
return util.JoinKeyParts(util.RowPrefix, key)
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 6d80317..04c3bed 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -22,8 +22,8 @@
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
+ "v.io/v23/services/watch"
"v.io/v23/verror"
- "v.io/v23/vom"
"v.io/x/lib/vlog"
)
@@ -116,14 +116,18 @@
vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
return false
}
- resMark = ""
+ resMark = watchable.MakeResumeMarker(0)
}
// Initialize Database sync state if needed.
s.initDbSyncStateInMem(ctx, appName, dbName)
// Get a batch of watch log entries, if any, after this resume marker.
- if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
+ logs, nextResmark, err := watchable.WatchLogBatch(st, resMark)
+ if err != nil {
+ vlog.Fatalf("sync: processDatabase: %s, %s: cannot get watch log batch: %v", appName, dbName, verror.DebugString(err))
+ }
+ if logs != nil {
s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
return true
}
@@ -134,7 +138,7 @@
// watchable SyncGroup prefixes, uses the prefixes to filter the batch to the
// subset of syncable records, and transactionally applies these updates to the
// sync metadata (DAG & log records) and updates the watch resume marker.
-func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) {
+func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark watch.ResumeMarker) {
if len(logs) == 0 {
return
}
@@ -289,68 +293,6 @@
}
}
-// dbLogScanArgs determines the arguments needed to start a new scan from a
-// given resume marker (last log entry read). An empty resume marker is used
-// to begin the scan from the start of the log.
-func dbLogScanArgs(resMark string) ([]byte, []byte) {
- start, limit := util.ScanPrefixArgs(util.LogPrefix, "")
- if resMark != "" {
- // To start just after the current resume marker, augment it by
- // appending an extra byte at the end. Use byte value zero to
- // use the lowest value possible. This works because resume
- // markers have a fixed length and are sorted lexicographically.
- // By creationg a fake longer resume marker that falls between
- // real resume markers, the next scan will start right after
- // where the previous one stopped without missing data.
- start = append([]byte(resMark), 0)
- }
- return start, limit
-}
-
-// getWatchLogBatch returns a batch of watch log records (a transaction) from
-// the given database and the new resume marker at the end of the batch.
-func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) {
- scanStart, scanLimit := dbLogScanArgs(resMark)
- endOfBatch := false
- var newResmark string
-
- // Use the store directly to scan these read-only log entries, no need
- // to create a snapshot since they are never overwritten. Read and
- // buffer a batch before processing it.
- var logs []*watchable.LogEntry
- stream := st.Scan(scanStart, scanLimit)
- for stream.Advance() {
- logKey := string(stream.Key(nil))
- var logEnt watchable.LogEntry
- if vom.Decode(stream.Value(nil), &logEnt) != nil {
- vlog.Fatalf("sync: getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
- appName, dbName, logKey, stream.Value(nil))
- }
-
- logs = append(logs, &logEnt)
-
- // Stop if this is the end of the batch.
- if logEnt.Continued == false {
- newResmark = logKey
- endOfBatch = true
- break
- }
- }
-
- if err := stream.Err(); err != nil {
- vlog.Errorf("sync: getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
- return nil, resMark
- }
- if !endOfBatch {
- if len(logs) > 0 {
- vlog.Fatalf("sync: getWatchLogBatch: %s, %s: end of batch not found after %d entries",
- appName, dbName, len(logs))
- }
- return nil, resMark
- }
- return logs, newResmark
-}
-
// convertLogRecord converts a store log entry to a sync log record. It fills
// the previous object version (parent) by fetching its current DAG head if it
// has one. For a delete, it generates a new object version because the store
@@ -479,16 +421,16 @@
}
// setResMark stores the watcher resume marker for a database.
-func setResMark(ctx *context.T, tx store.Transaction, resMark string) error {
+func setResMark(ctx *context.T, tx store.Transaction, resMark watch.ResumeMarker) error {
return util.Put(ctx, tx, resMarkKey(), resMark)
}
// getResMark retrieves the watcher resume marker for a database.
-func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
- var resMark string
+func getResMark(ctx *context.T, st store.StoreReader) (watch.ResumeMarker, error) {
+ var resMark watch.ResumeMarker
key := resMarkKey()
if err := util.Get(ctx, st, key, &resMark); err != nil {
- return NoVersion, err
+ return nil, err
}
return resMark, nil
}
diff --git a/services/syncbase/vsync/watcher_test.go b/services/syncbase/vsync/watcher_test.go
index 66aae8f..c730e77 100644
--- a/services/syncbase/vsync/watcher_test.go
+++ b/services/syncbase/vsync/watcher_test.go
@@ -8,7 +8,6 @@
import (
"bytes"
- "fmt"
"reflect"
"testing"
"time"
@@ -26,11 +25,11 @@
st := svc.St()
resmark, err := getResMark(nil, st)
- if err == nil || resmark != "" {
+ if err == nil || resmark != nil {
t.Errorf("found non-existent resume marker: %s, %v", resmark, err)
}
- wantResmark := "1234567890"
+ wantResmark := watchable.MakeResumeMarker(1234567890)
tx := st.NewTransaction()
if err := setResMark(nil, tx, wantResmark); err != nil {
t.Errorf("cannot set resume marker: %v", err)
@@ -41,7 +40,7 @@
if err != nil {
t.Errorf("cannot get new resume marker: %v", err)
}
- if resmark != wantResmark {
+ if !bytes.Equal(resmark, wantResmark) {
t.Errorf("invalid new resume: got %s instead of %s", resmark, wantResmark)
}
}
@@ -163,7 +162,7 @@
fooxyzKey := makeRowKey("fooxyz")
// Empty logs does not fail.
- s.processWatchLogBatch(nil, app, db, st, nil, "")
+ s.processWatchLogBatch(nil, app, db, st, nil, nil)
// Non-syncable logs.
batch := []*watchable.LogEntry{
@@ -171,10 +170,10 @@
newLog(barKey, "555", false),
}
- resmark := "abcd"
+ resmark := watchable.MakeResumeMarker(1234)
s.processWatchLogBatch(nil, app, db, st, batch, resmark)
- if res, err := getResMark(nil, st); err != nil && res != resmark {
+ if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
}
if ok, err := hasNode(nil, st, fooKey, "123"); err != nil || ok {
@@ -192,10 +191,10 @@
newLog(barKey, "222", false),
}
- resmark = "cdef"
+ resmark = watchable.MakeResumeMarker(3456)
s.processWatchLogBatch(nil, app, db, st, batch, resmark)
- if res, err := getResMark(nil, st); err != nil && res != resmark {
+ if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
}
if head, err := getHead(nil, st, fooKey); err != nil && head != "333" {
@@ -226,10 +225,10 @@
newLog(barKey, "7", false),
}
- resmark = "ghij"
+ resmark = watchable.MakeResumeMarker(7890)
s.processWatchLogBatch(nil, app, db, st, batch, resmark)
- if res, err := getResMark(nil, st); err != nil && res != resmark {
+ if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
}
if head, err := getHead(nil, st, fooKey); err != nil && head != "1" {
@@ -269,10 +268,10 @@
newLog(barKey, "007", false),
}
- resmark = "tuvw"
+ resmark = watchable.MakeResumeMarker(20212223)
s.processWatchLogBatch(nil, app, db, st, batch, resmark)
- if res, err := getResMark(nil, st); err != nil && res != resmark {
+ if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
}
// No changes to "foo".
@@ -316,79 +315,3 @@
t.Errorf("wrong count of batches: got %d instead of 2", count)
}
}
-
-// TestGetWatchLogBatch tests fetching a batch of log records.
-func TestGetWatchLogBatch(t *testing.T) {
- svc := createService(t)
- defer destroyService(t, svc)
- st := svc.St()
-
- // Create a set of batches to fill the log queue.
- numTx, numPut := 3, 4
-
- makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
- key := util.JoinKeyParts(util.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
- val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
- return []byte(key), []byte(val)
- }
-
- for i := 0; i < numTx; i++ {
- tx := st.NewTransaction()
- for j := 0; j < numPut; j++ {
- key, val := makeKeyVal(i, j)
- if err := tx.Put(key, val); err != nil {
- t.Errorf("cannot put %s (%s): %v", key, val, err)
- }
- }
- tx.Commit()
- }
-
- // Fetch the batches and a few more empty fetches and verify them.
- app, db := "mockapp", "mockdb"
- resmark := ""
- count := 0
-
- for i := 0; i < (numTx + 3); i++ {
- logs, newResmark := getWatchLogBatch(nil, app, db, st, resmark)
- if i < numTx {
- if len(logs) != numPut {
- t.Errorf("log fetch (i=%d) wrong log count: %d instead of %d",
- i, len(logs), numPut)
- }
-
- count += len(logs)
- expResmark := makeResMark(count - 1)
- if newResmark != expResmark {
- t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
- i, newResmark, expResmark)
- }
-
- for j, log := range logs {
- op := log.Op.(watchable.OpPut)
- expKey, expVal := makeKeyVal(i, j)
- key := op.Value.Key
- if !bytes.Equal(key, expKey) {
- t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
- i, j, key, expKey)
- }
- tx := st.NewTransaction()
- var val []byte
- val, err := watchable.GetAtVersion(nil, tx, key, val, op.Value.Version)
- if err != nil {
- t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
- }
- if !bytes.Equal(val, expVal) {
- t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
- i, j, val, expVal)
- }
- tx.Abort()
- }
- } else {
- if logs != nil || newResmark != resmark {
- t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
- i, len(logs), newResmark)
- }
- }
- resmark = newResmark
- }
-}