blob: 38d29991188890e0a2c0c9ebedbd414f2e62607b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package watchable
import (
"bytes"
"fmt"
"testing"
"time"
"v.io/v23/services/watch"
"v.io/v23/verror"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/vclock"
)
// TestWatchLogBatch tests fetching a batch of log records.
func TestWatchLogBatch(t *testing.T) {
runTest(t, []string{common.RowPrefix, common.CollectionPermsPrefix}, runWatchLogBatchTest)
}
// runWatchLogBatchTest tests fetching a batch of log records.
//
// It commits numTx transactions, each putting numPut rows, and then calls
// readBatchFromLog repeatedly starting from seq 0. Each of the first numTx
// reads must return exactly one transaction's worth of entries (numPut
// entries whose keys match the put order) and advance the sequence number by
// the batch size. The remaining reads must return no logs and leave the
// sequence number unchanged.
func runWatchLogBatchTest(t *testing.T, st store.Store) {
	// Create a set of batches to fill the log queue.
	numTx, numPut := 3, 4

	// makeKeyVal derives the key/value pair for record recNum of batch
	// batchNum, so the verification loop can recompute the expected data.
	makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
		key := common.JoinKeyParts(common.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
		val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
		return []byte(key), []byte(val)
	}

	for i := 0; i < numTx; i++ {
		tx := st.NewTransaction()
		for j := 0; j < numPut; j++ {
			key, val := makeKeyVal(i, j)
			if err := tx.Put(key, val); err != nil {
				t.Errorf("cannot put %s (%s): %v", key, val, err)
			}
		}
		// A failed commit would leave the log short and make every later
		// check fail for an unrelated-looking reason, so stop right here.
		if err := tx.Commit(); err != nil {
			t.Fatalf("cannot commit batch %d: %v", i, err)
		}
	}

	// Fetch the batches and a few more empty fetches and verify them.
	var seq, wantSeq uint64
	for i := 0; i < (numTx + 3); i++ {
		logs, newSeq, err := readBatchFromLog(st, seq)
		if err != nil {
			t.Fatalf("can't get watch log batch: %v", err)
		}
		if i < numTx {
			if len(logs) != numPut {
				t.Errorf("log fetch (i=%d) wrong log count: %d instead of %d",
					i, len(logs), numPut)
			}
			wantSeq += uint64(len(logs))
			if newSeq != wantSeq {
				t.Errorf("log fetch (i=%d) wrong seq: %d instead of %d",
					i, newSeq, wantSeq)
			}
			for j, entry := range logs {
				var op PutOp
				if err := entry.Op.ToValue(&op); err != nil {
					t.Fatalf("ToValue failed: %v", err)
				}
				expKey, expVal := makeKeyVal(i, j)
				key := op.Key
				if !bytes.Equal(key, expKey) {
					t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
						i, j, key, expKey)
				}
				// Read the value back at the version recorded in the log
				// entry, using a throwaway transaction that is aborted below.
				tx := st.NewTransaction()
				var val []byte
				val, err := GetAtVersion(nil, tx, key, val, op.Version)
				if err != nil {
					t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
				}
				if !bytes.Equal(val, expVal) {
					t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
						i, j, val, expVal)
				}
				tx.Abort()
			}
		} else {
			// All batches were consumed: further fetches must be no-ops.
			if logs != nil || newSeq != seq {
				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, seq %d",
					i, len(logs), newSeq)
			}
		}
		seq = newSeq
	}
}
// TestBroadcastUpdates tests broadcasting updates to watch clients.
//
// The checks run in this order: broadcasting with no clients registered; a
// client that never receives; a cancelled client (cancel is called twice, its
// channel must be closed and Err must be ErrCanceled); client registration
// running concurrently with broadcasts; a client holding exactly one pending
// notification after many broadcasts; delivery of a later broadcast after that
// notification was read; closing the watcher (called twice, client channels
// must be closed and Err must be ErrAborted); and calls made after the watcher
// was closed.
func TestBroadcastUpdates(t *testing.T) {
w := newWatcher(0)
// Update broadcast should never block. It should be safe to call with no
// clients registered.
w.broadcastUpdates()
w.broadcastUpdates()
// Never-receiving client should not block watcher.
_, cancel1 := w.watchUpdates(0)
defer cancel1()
// Cancelled client should not affect watcher.
c2, cancel2 := w.watchUpdates(0)
cancel2()
// Cancel should be idempotent.
cancel2()
// Channel should be closed when client is cancelled.
// The default case makes this a non-blocking check: a channel that is still
// open and empty fails the test instead of hanging it.
select {
case _, ok := <-c2.Wait():
if ok {
t.Fatalf("cancel2 was called, c2 channel should be drained and closed")
}
default:
t.Fatalf("cancel2 was called, c2 channel should be closed")
}
if verror.ErrorID(c2.Err()) != verror.ErrCanceled.ID {
t.Fatalf("expected c2.Err() to return ErrCanceled, got: %v", c2.Err())
}
// Update broadcast should not block client registration or vice versa.
timer1 := time.After(10 * time.Second)
registerLoop1 := make(chan bool)
go func() {
// The deferred cancels run only when this goroutine returns, i.e. after the
// send on registerLoop1 below has completed, so no client registered here is
// cancelled while the loop is still running.
for i := 0; i < 5000; i++ {
_, canceli := w.watchUpdates(0)
defer canceli()
}
registerLoop1 <- true
}()
c3, cancel3 := w.watchUpdates(0)
for i := 0; i < 5000; i++ {
w.broadcastUpdates()
}
select {
case <-registerLoop1:
// ok
case <-timer1:
t.Fatalf("registerLoop1 didn't finish after 10s")
}
// Wait for broadcast to fully propagate out of watcherLoop.
time.Sleep(1 * time.Second)
// c3 should have a single pending notification.
select {
case _, ok := <-c3.Wait():
if !ok {
t.Fatalf("c3 channel should not be closed")
}
default:
t.Fatalf("c3 channel should have a notification")
}
select {
case <-c3.Wait():
t.Fatalf("c3 channel should not have another notification")
default:
// ok
}
if c3.Err() != nil {
t.Fatalf("expected c3.Err() to return nil, got: %v", c3.Err())
}
// After notification was read, c3 still receives updates.
// The broadcast is issued from another goroutine after a delay, so the select
// below is already waiting on c3 when it arrives.
go func() {
time.Sleep(500 * time.Millisecond)
w.broadcastUpdates()
}()
select {
case _, ok := <-c3.Wait():
if !ok {
t.Fatalf("c3 channel should not be closed")
}
case <-time.After(5 * time.Second):
t.Fatalf("c3 channel didn't receive after 5s")
}
if c3.Err() != nil {
t.Fatalf("expected c3.Err() to return nil, got: %v", c3.Err())
}
// Closing the watcher.
w.close()
// Close should be idempotent.
w.close()
// Client channels should be closed when watcher is closed.
select {
case _, ok := <-c3.Wait():
if ok {
t.Fatalf("watcher was closed, c3 channel should be drained and closed")
}
default:
t.Fatalf("watcher was closed, c3 channel should be closed")
}
if verror.ErrorID(c3.Err()) != verror.ErrAborted.ID {
t.Fatalf("expected c3.Err() to return ErrAborted, got: %v", c3.Err())
}
// Cancel is safe to call after the watcher is closed.
cancel3()
// ErrAborted should be preserved instead of being overridden by ErrCanceled.
if verror.ErrorID(c3.Err()) != verror.ErrAborted.ID {
t.Fatalf("expected c3.Err() to return ErrAborted, got: %v", c3.Err())
}
// watchUpdates is safe to call after the watcher is closed, returning a
// closed channel.
c4, cancel4 := w.watchUpdates(0)
select {
case _, ok := <-c4.Wait():
if ok {
t.Fatalf("watcher was closed, c4 channel should be drained and closed")
}
default:
t.Fatalf("watcher was closed, c4 channel should be closed")
}
if verror.ErrorID(c4.Err()) != verror.ErrAborted.ID {
t.Fatalf("expected c4.Err() to return ErrAborted, got: %v", c4.Err())
}
cancel4()
// broadcastUpdates is safe to call after the watcher is closed, although it
// logs an error message.
w.broadcastUpdates()
}
// TestUpdateLogStart tests that UpdateLogStart moves the log start up to the
// earliest active watcher's seq, including syncSeq, and that NextBatchFromLog
// correctly iterates over the log.
//
// The test drives a single wrapped store through a fixed script of putBatch,
// updateLogStart and expectBatch calls; the "@N" annotations in the comments
// give the sequence number each participant is expected to be at. Halfway
// through, the watcher is closed and the underlying store is wrapped again to
// simulate a close and reopen.
func TestUpdateLogStart(t *testing.T) {
	ist, destroy := createStore()
	defer destroy()
	st, err := Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{common.RowPrefix, common.CollectionPermsPrefix}})
	if err != nil {
		t.Fatal(err)
	}
	// UpdateLogStart(0) should work on an empty log.
	updateLogStart(t, st, 0, 0)
	putBatch(t, st, 0, 3)
	putBatch(t, st, 3, 1)
	// UpdateLogStart(3) with no active watchers should move the log start to 3.
	updateLogStart(t, st, 3, 3)
	putBatch(t, st, 4, 5)
	// UpdateLogStart should be idempotent if called with the same value.
	updateLogStart(t, st, 3, 3)
	// Start watchers @3 and @9.
	w1, w1cancel := st.WatchUpdates(MakeResumeMarker(3))
	w2, _ := st.WatchUpdates(MakeResumeMarker(9))
	// Watching from 0 should fail because log start has moved to 3.
	// The failure is reported through the returned client's Err(), so that is
	// the value to print on mismatch.
	if wFail, _ := st.WatchUpdates(MakeResumeMarker(0)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
		t.Fatalf("WatchUpdates with old resume marker should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
	}
	putBatch(t, st, 9, 1)
	// UpdateLogStart(9): sync @9, w1 @3, w2 @9 => log start stays at 3.
	updateLogStart(t, st, 9, 3)
	// w1 still @3, next @4.
	expectBatch(t, st, w1, 3, 1)
	// UpdateLogStart(9): sync @9, w1 @3, w2 @9 => log start stays at 3.
	updateLogStart(t, st, 9, 3)
	// w1 @4, next @9.
	expectBatch(t, st, w1, 4, 5)
	// UpdateLogStart(9): sync @9, w1 @4, w2 @9 => log start moves to 4.
	updateLogStart(t, st, 9, 4)
	// UpdateLogStart requires the sync resmark to monotonically increase.
	if _, err := st.UpdateLogStart(MakeResumeMarker(3)); err == nil {
		t.Fatalf("UpdateLogStart should fail when sync resume marker is smaller than log start")
	}
	// Above UpdateLogStart with an invalid sync resmark should have had no effect.
	updateLogStart(t, st, 9, 4)
	putBatch(t, st, 10, 2)
	// w1 @9, next @10.
	expectBatch(t, st, w1, 9, 1)
	// w1 @10, next @12.
	expectBatch(t, st, w1, 10, 2)
	// UpdateLogStart(9): sync @9, w1 @10, w2 @9 => log start stays at 9.
	updateLogStart(t, st, 9, 9)
	// UpdateLogStart(10): sync @10, w1 @10, w2 @9 => log start stays at 9.
	updateLogStart(t, st, 10, 9)
	// w2 still @9, next @10.
	expectBatch(t, st, w2, 9, 1)
	// w2 @10, next @12.
	expectBatch(t, st, w2, 10, 2)
	// w2 @12, nothing to read, next @12.
	expectBatch(t, st, w2, 12, 0)
	// UpdateLogStart(12): sync @12, w1 @10, w2 @12 => log start moves to 10.
	updateLogStart(t, st, 12, 10)
	putBatch(t, st, 12, 5)
	// w2 still @12, next @17.
	expectBatch(t, st, w2, 12, 5)
	// w2 @17, nothing to read, next @17.
	expectBatch(t, st, w2, 17, 0)
	putBatch(t, st, 17, 1)
	// UpdateLogStart(12): sync @12, w1 @10, w2 @17 => log start stays at 10.
	updateLogStart(t, st, 12, 10)
	// Stop w1.
	w1cancel()
	if _, _, err := w1.NextBatchFromLog(st); verror.ErrorID(err) != verror.ErrCanceled.ID {
		t.Fatalf("NextBatchFromLog after watcher cancel should have failed with ErrCanceled, got: %v", err)
	}
	// UpdateLogStart(12): sync @12, w2 @17 => log start moves to 12.
	updateLogStart(t, st, 12, 12)
	// UpdateLogStart(18): sync @18, w2 @17 => log start moves to 17.
	updateLogStart(t, st, 18, 17)
	// Simulate store close and reopen by closing the watcher (which stops w2) and
	// rewrapping ist.
	st.watcher.close()
	if _, _, err := w2.NextBatchFromLog(st); verror.ErrorID(err) != verror.ErrAborted.ID {
		t.Fatalf("NextBatchFromLog after watcher closure should have failed with ErrAborted, got: %v", err)
	}
	if _, err := st.UpdateLogStart(MakeResumeMarker(18)); verror.ErrorID(err) != verror.ErrAborted.ID {
		t.Fatalf("UpdateLogStart on closed watcher should have failed with ErrAborted, got: %v", err)
	}
	st, err = Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{common.RowPrefix, common.CollectionPermsPrefix}})
	if err != nil {
		t.Fatal(err)
	}
	// Before starting watchers, test sync resume marker sanity check.
	if _, err := st.UpdateLogStart(MakeResumeMarker(42)); err == nil {
		t.Fatalf("UpdateLogStart should fail when sync resume marker is greater than seq")
	}
	putBatch(t, st, 18, 2)
	putBatch(t, st, 20, 5)
	putBatch(t, st, 25, 2)
	// UpdateLogStart(20) with no active watchers should move the log start to 20.
	updateLogStart(t, st, 20, 20)
	// Watching from 18 should fail because log start has moved to 20.
	if wFail, _ := st.WatchUpdates(MakeResumeMarker(18)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
		t.Fatalf("WatchUpdates with old resume marker should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
	}
	// Start watcher @20.
	w3, w3cancel := st.WatchUpdates(MakeResumeMarker(20))
	// Watching from 42 should fail because log end hasn't reached 42 yet.
	if wFail, _ := st.WatchUpdates(MakeResumeMarker(42)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
		t.Fatalf("WatchUpdates with resume marker greater than seq should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
	}
	// w3 still @20, next @25.
	expectBatch(t, st, w3, 20, 5)
	// UpdateLogStart(27): sync @27, w3 @20 => log start stays at 20.
	updateLogStart(t, st, 27, 20)
	// w3 @25, next @27.
	expectBatch(t, st, w3, 25, 2)
	// UpdateLogStart(27): sync @27, w3 @25 => log start moves to 25.
	updateLogStart(t, st, 27, 25)
	// Stop w3.
	w3cancel()
	// UpdateLogStart(27) with no active watchers should move the log start to 27.
	updateLogStart(t, st, 27, 27)
}
// putBatch writes size dummy rows to st in a single transaction. It reports an
// error if the store's seq is not wantSeq beforehand, and fails the test if
// the transaction fails or the seq afterwards has not advanced by size.
func putBatch(t *testing.T, st *Store, wantSeq, size uint64) {
	startSeq := st.getSeq()
	if startSeq != wantSeq {
		t.Errorf("putBatch %d: unexpected seq before: %d, want %d", wantSeq, startSeq, wantSeq)
	}

	// writeRows puts one row per seq in [startSeq, startSeq+size).
	writeRows := func(tx *Transaction) error {
		for n := uint64(0); n < size; n++ {
			rowKey := []byte(dummyRowKey(startSeq + n))
			if err := tx.Put(rowKey, []byte("value")); err != nil {
				return err
			}
		}
		return nil
	}
	if err := RunInTransaction(st, writeRows); err != nil {
		t.Fatalf("putBatch %d: RunInTransaction failed: %v", wantSeq, err)
	}

	endSeq := st.getSeq()
	if wantEnd := startSeq + size; endSeq != wantEnd {
		t.Fatalf("putBatch %d: unexpected seq after: %d, want %d", wantSeq, endSeq, wantEnd)
	}
}
// expectBatch pulls the watcher's next batch with NextBatchFromLog and checks
// that it holds wantSize log entries. For a non-empty batch it also checks
// that the batch starts at wantSeq, computed as the seq parsed from the
// returned resume marker minus the number of entries.
func expectBatch(t *testing.T, st *Store, watcher *Client, wantSeq, wantSize uint64) {
	logs, resMark, err := watcher.NextBatchFromLog(st)
	if err != nil {
		t.Fatalf("expectBatch %d: NextBatchFromLog failed: %v", wantSeq, err)
	}

	numLogs := uint64(len(logs))
	if numLogs != wantSize {
		t.Errorf("expectBatch %d: expected %d logs, got %d", wantSeq, wantSize, numLogs)
	}
	if numLogs == 0 {
		// The start seq is only checked for non-empty batches.
		return
	}

	endSeq, err := parseResumeMarker(string(resMark))
	if err != nil {
		t.Fatalf("expectBatch %d: got invalid resume marker %q: %v", wantSeq, resMark, err)
	}
	if startSeq := endSeq - numLogs; startSeq != wantSeq {
		t.Errorf("expectBatch %d: expected logs starting from seq %d, got %d", wantSeq, wantSeq, startSeq)
	}
}
// updateLogStart invokes UpdateLogStart with a resume marker built from
// syncSeq and checks that the new log start equals wantSeq, both in the
// returned resume marker and in the value read back via getLogStartSeq.
func updateLogStart(t *testing.T, st *Store, syncSeq, wantSeq uint64) {
	resMark, err := st.UpdateLogStart(MakeResumeMarker(syncSeq))
	if err != nil {
		t.Fatalf("UpdateLogStart %d: failed: %v", syncSeq, err)
	}

	// Check the value handed back to the caller.
	returnedSeq, err := parseResumeMarker(string(resMark))
	if err != nil {
		t.Fatalf("UpdateLogStart %d: returned invalid resume marker %q: %v", syncSeq, resMark, err)
	}
	if returnedSeq != wantSeq {
		t.Errorf("UpdateLogStart %d: expected updated seq %d, got %d", syncSeq, wantSeq, returnedSeq)
	}

	// Check the value read back from the store.
	persistedSeq, err := getLogStartSeq(st)
	if err != nil {
		t.Fatalf("UpdateLogStart %d: getLogStartSeq failed: %v", syncSeq, err)
	}
	if persistedSeq != wantSeq {
		t.Errorf("UpdateLogStart %d: expected persisted seq %d, got %d", syncSeq, wantSeq, persistedSeq)
	}
}
// dummyRowKey builds the row key used by putBatch for the given seq: the row
// prefix, the fixed part "u,c", and "r" followed by seq zero-padded to eight
// digits, joined with common.JoinKeyParts.
func dummyRowKey(seq uint64) string {
	rowName := fmt.Sprintf("r%08d", seq)
	return common.JoinKeyParts(common.RowPrefix, "u,c", rowName)
}