// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package watchable

import (
"bytes"
"fmt"
"testing"
"time"

"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/store"
)

// TestWatchLogBatch tests fetching a batch of log records.
func TestWatchLogBatch(t *testing.T) {
runTest(t, []string{common.RowPrefix, common.CollectionPermsPrefix}, runWatchLogBatchTest)
}

// runWatchLogBatchTest commits several transactions and verifies that
// ReadBatchFromLog returns the corresponding log records one batch at a
// time, with the expected resume markers.
func runWatchLogBatchTest(t *testing.T, st store.Store) {
// Create a set of batches to fill the log queue.
numTx, numPut := 3, 4
makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
key := common.JoinKeyParts(common.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
return []byte(key), []byte(val)
}
for i := 0; i < numTx; i++ {
tx := st.NewTransaction()
for j := 0; j < numPut; j++ {
key, val := makeKeyVal(i, j)
if err := tx.Put(key, val); err != nil {
t.Errorf("cannot put %s (%s): %v", key, val, err)
}
}
if err := tx.Commit(); err != nil {
t.Fatalf("cannot commit transaction %d: %v", i, err)
}
}
// Fetch the batches, plus a few extra fetches that should come back empty,
// and verify the records and resume markers.
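// The loop below mirrors how a watch client typically drains the log: call
// ReadBatchFromLog repeatedly, feeding each call the resume marker returned
// by the previous one, until no new records come back. A minimal sketch
// (handleLogRecords is a hypothetical placeholder; error handling elided):
//
//	resmark := MakeResumeMarker(0)
//	for {
//		logs, newResmark, _ := ReadBatchFromLog(st, resmark)
//		if len(logs) == 0 {
//			break
//		}
//		handleLogRecords(logs)
//		resmark = newResmark
//	}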
resmark := MakeResumeMarker(0)
var seq uint64
for i := 0; i < (numTx + 3); i++ {
logs, newResmark, err := ReadBatchFromLog(st, resmark)
if err != nil {
t.Fatalf("can't get watch log batch: %v", err)
}
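// The first numTx fetches should each return one full batch of numPut
// records; the remaining fetches should return nothing new.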
if i < numTx {
if len(logs) != numPut {
t.Errorf("log fetch (i=%d) wrong log seq: %d instead of %d",
i, len(logs), numPut)
}
seq += uint64(len(logs))
expResmark := MakeResumeMarker(seq)
if !bytes.Equal(newResmark, expResmark) {
t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
i, newResmark, expResmark)
}
for j, log := range logs {
var op PutOp
if err := log.Op.ToValue(&op); err != nil {
t.Fatalf("ToValue failed: %v", err)
}
expKey, expVal := makeKeyVal(i, j)
key := op.Key
if !bytes.Equal(key, expKey) {
t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
i, j, key, expKey)
}
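// Read the value back at the version recorded in the log op, using a
// scratch transaction that is aborted once the check is done.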
tx := st.NewTransaction()
var val []byte
val, err := GetAtVersion(nil, tx, key, val, op.Version)
if err != nil {
t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
}
if !bytes.Equal(val, expVal) {
t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
i, j, val, expVal)
}
tx.Abort()
}
} else {
if logs != nil || !bytes.Equal(newResmark, resmark) {
t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
i, len(logs), newResmark)
}
}
resmark = newResmark
}
}

// TestWatcher tests broadcasting updates to watch clients.
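// A watch client would typically pair the two mechanisms exercised in this
// file: block on the channel returned by watchUpdates and, on each
// notification, drain new records via ReadBatchFromLog (see
// runWatchLogBatchTest above). A rough sketch, where st, resmark, and
// handleLogRecords are assumed from context:
//
//	updates, cancel := w.watchUpdates()
//	defer cancel()
//	for range updates {
//		logs, newResmark, err := ReadBatchFromLog(st, resmark)
//		if err == nil && len(logs) > 0 {
//			handleLogRecords(logs)
//			resmark = newResmark
//		}
//	}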
func TestWatcher(t *testing.T) {
w := newWatcher()
// Update broadcast should never block. It should be safe to call with no
// clients registered.
w.broadcastUpdates()
w.broadcastUpdates()
// Never-receiving client should not block watcher.
_, cancel1 := w.watchUpdates()
defer cancel1()
// Cancelled client should not affect watcher.
chan2, cancel2 := w.watchUpdates()
cancel2()
// Cancel should be idempotent.
cancel2()
// Channel should be closed when client is cancelled.
select {
case _, ok := <-chan2:
if ok {
t.Fatalf("cancel2 was called, chan2 should be drained and closed")
}
default:
t.Fatalf("cancel2 was called, chan2 should be closed")
}
// Update broadcast should not block client registration or vice versa.
timer1 := time.After(10 * time.Second)
registerLoop1 := make(chan bool)
go func() {
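// Register many clients concurrently with the broadcasts below; the
// deferred cancels all run when this goroutine returns.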
for i := 0; i < 5000; i++ {
_, canceli := w.watchUpdates()
defer canceli()
}
registerLoop1 <- true
}()
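// chan3 is registered before the broadcasts and stays registered throughout;
// notifications are coalesced, so it should end up with exactly one pending
// update (checked below).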
chan3, cancel3 := w.watchUpdates()
for i := 0; i < 5000; i++ {
w.broadcastUpdates()
}
select {
case <-registerLoop1:
// ok
case <-timer1:
t.Fatalf("registerLoop1 didn't finish after 10s")
}
// Wait for broadcast to fully propagate out of watcherLoop.
time.Sleep(1 * time.Second)
// chan3 should have a single pending notification.
select {
case _, ok := <-chan3:
if !ok {
t.Fatalf("chan3 should not be closed")
}
default:
t.Fatalf("chan3 should have a notification")
}
select {
case <-chan3:
t.Fatalf("chan3 should not have another notification")
default:
// ok
}
// After notification was read, chan3 still receives updates.
go func() {
time.Sleep(500 * time.Millisecond)
w.broadcastUpdates()
}()
select {
case _, ok := <-chan3:
if !ok {
t.Fatalf("chan3 should not be closed")
}
case <-time.After(5 * time.Second):
t.Fatalf("chan3 didn't receive after 5s")
}
// Closing the watcher.
w.close()
// Close should be idempotent.
w.close()
// Client channels should be closed when watcher is closed.
select {
case _, ok := <-chan3:
if ok {
t.Fatalf("watcher was closed, chan3 should be drained and closed")
}
default:
t.Fatalf("watcher was closed, chan3 should be closed")
}
// Cancel is safe to call after the watcher is closed.
cancel3()
// watchUpdates is safe to call after the watcher is closed, returning a
// closed channel.
chan4, cancel4 := w.watchUpdates()
select {
case _, ok := <-chan4:
if ok {
t.Fatalf("watcher was closed, chan4 should be drained and closed")
}
default:
t.Fatalf("watcher was closed, chan4 should be closed")
}
cancel4()
// broadcastUpdates is safe to call after the watcher is closed, although it
// logs an error message.
w.broadcastUpdates()
}