blob: 696e77317890a20f099c03c0ff5134cfd1e05a3b [file] [log] [blame]
package watch
import (
"bytes"
"io"
"sync"
"testing"
"time"
"veyron/services/store/memstore"
watchtesting "veyron/services/store/memstore/testing"
"veyron/services/store/raw"
"veyron2/services/watch"
"veyron2/storage"
"veyron2/verror"
)
func TestWatchRaw(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
post1 := st.Snapshot().Find(id1).Version
// Start a watch request.
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the first transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.EmptyDir)
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
pre1 := post1
post1 = st.Snapshot().Find(id1).Version
post2 := st.Snapshot().Find(id2).Version
// Check that watch detects the changes in the second transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val1", watchtesting.DirOf("a", id2))
watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.EmptyDir)
}
func TestWatchGlob(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
// Start a watch request.
path := storage.ParsePath("/")
req := watch.GlobRequest{Pattern: "..."}
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
// Check that watch detects the changes in the first transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
// Check that watch detects the changes in the second transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
}
func TestWatchCancellation(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Start a watch request.
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Commit a transaction.
tr := memstore.NewTransaction()
put(t, st, tr, "/", "val1")
commit(t, tr)
// Check that watch processed the first transaction.
if !ws.Advance() {
t.Error("Expected a change.")
}
// Cancel the watch request.
ws.Cancel()
// Give watch some time to process the cancellation.
time.Sleep(time.Second)
// Commit a second transaction.
tr = memstore.NewTransaction()
put(t, st, tr, "/a", "val2")
commit(t, tr)
// Check that watch did not processed the second transaction.
if ws.Advance() || ws.Err() != nil {
t.Errorf("Unexpected error: %v", ws.Err())
}
// Check that io.EOF was returned.
if err := ws.Finish(); err != io.EOF {
t.Errorf("Unexpected error: %v", err)
}
}
func TestWatchClosed(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
var once sync.Once
defer once.Do(cleanup)
// Start a watch request.
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Commit a transaction.
tr := memstore.NewTransaction()
put(t, st, tr, "/", "val1")
commit(t, tr)
// Check that watch processed the first transaction.
if !ws.Advance() {
t.Error("Expected a change.")
}
// Close the watcher, check that io.EOF was returned.
once.Do(cleanup)
if err := ws.Finish(); err != io.EOF {
t.Errorf("Unexpected error: %v", err)
}
}
func TestStateResumeMarker(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
post11 := st.Snapshot().Find(id1).Version
if err := st.Close(); err != nil {
t.Fatalf("Close() failed: %v", err)
}
// Re-create a new store. This should compress the log, creating an initial
// state containing / and /a.
st, cleanup = openStore(t, dbName)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
pre21 := post11
post21 := st.Snapshot().Find(id1).Version
post22 := st.Snapshot().Find(id2).Version
// Start a watch request.
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Retrieve the resume marker for the initial state.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
resumeMarker1 := change.ResumeMarker
// Cancel the watch request.
ws.Cancel()
ws.Finish()
// Start a watch request after the initial state.
req = raw.Request{ResumeMarker: resumeMarker1}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the state and the transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
// Check that watch detects the changes in the state and the transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
}
func TestTransactionResumeMarker(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
post11 := st.Snapshot().Find(id1).Version
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
pre21 := post11
post21 := st.Snapshot().Find(id1).Version
post22 := st.Snapshot().Find(id2).Version
// Start a watch request.
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Retrieve the resume marker for the first transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
resumeMarker1 := change.ResumeMarker
// Cancel the watch request.
ws.Cancel()
ws.Finish()
// Start a watch request after the first transaction.
req = raw.Request{ResumeMarker: resumeMarker1}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the first and second transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
resumeMarker2 := change.ResumeMarker
watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
// Cancel the watch request.
ws.Cancel()
ws.Finish()
// Start a watch request at the second transaction.
req = raw.Request{ResumeMarker: resumeMarker2}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the second transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
}
func TestNowResumeMarker(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
put(t, st, tr, "/", "val1")
commit(t, tr)
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
post22 := st.Snapshot().Find(id2).Version
// Start a watch request with the "now" resume marker.
req := raw.Request{ResumeMarker: nowResumeMarker}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Give watch some time to pick "now".
time.Sleep(time.Second)
// Put /a/b
tr = memstore.NewTransaction()
id3 := put(t, st, tr, "/a/b", "val3")
commit(t, tr)
pre32 := post22
post32 := st.Snapshot().Find(id2).Version
post33 := st.Snapshot().Find(id3).Version
// Check that watch announces that the initial state was skipped.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
watchtesting.ExpectInitialStateSkipped(t, change)
// Check that watch detects the changes in the third transaction.
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
change = changes[1]
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id2, pre32, post32, false, "val2", watchtesting.DirOf("b", id3))
watchtesting.ExpectMutationExists(t, changes, id3, storage.NoVersion, post33, false, "val3", watchtesting.EmptyDir)
}
func TestUnknownResumeMarkers(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
put(t, st, tr, "/", "val1")
commit(t, tr)
// Start a watch request with a resume marker that's too early.
resumeMarker := timestampToResumeMarker(1)
req := raw.Request{ResumeMarker: resumeMarker}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// The resume marker should be unknown.
if err := ws.Finish(); !verror.Is(err, verror.BadArg) {
t.Errorf("Error should be %v: got %v", verror.BadArg, err)
}
// Start a watch request with a resume marker that's too late.
resumeMarker = timestampToResumeMarker(2 ^ 63 - 1)
req = raw.Request{ResumeMarker: resumeMarker}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// The resume marker should be unknown.
if err := ws.Finish(); !verror.Is(err, verror.BadArg) {
t.Errorf("Error should be %v: got %v", verror.BadArg, err)
}
}
func TestConsistentResumeMarkers(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
// Put /
tr := memstore.NewTransaction()
put(t, st, tr, "/", "val1")
commit(t, tr)
// Start a watch request.
path := storage.ParsePath("/")
req := watch.GlobRequest{Pattern: "..."}
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb := ws.Value()
changes := cb.Changes
change := changes[0]
// Save the ResumeMarker of the change.
r := change.ResumeMarker
// Start another watch request.
ws = watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
if !ws.Advance() {
t.Error("Advance() failed: %v", ws.Err())
}
cb = ws.Value()
changes = cb.Changes
change = changes[0]
// Expect the same ResumeMarker.
if !bytes.Equal(r, change.ResumeMarker) {
t.Error("Inconsistent ResumeMarker.")
}
}