| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // Tests for the sync watcher in Syncbase. |
| |
| import ( |
| "bytes" |
| "reflect" |
| "testing" |
| "time" |
| |
| "v.io/v23/vom" |
| _ "v.io/x/ref/runtime/factories/generic" |
| "v.io/x/ref/services/syncbase/server/util" |
| "v.io/x/ref/services/syncbase/server/watchable" |
| ) |
| |
| // TestSetResmark tests setting and getting a resume marker. |
| func TestSetResmark(t *testing.T) { |
| svc := createService(t) |
| defer destroyService(t, svc) |
| st := svc.St() |
| |
| resmark, err := getResMark(nil, st) |
| if err == nil || resmark != nil { |
| t.Errorf("found non-existent resume marker: %s, %v", resmark, err) |
| } |
| |
| wantResmark := watchable.MakeResumeMarker(1234567890) |
| tx := st.NewTransaction() |
| if err := setResMark(nil, tx, wantResmark); err != nil { |
| t.Errorf("cannot set resume marker: %v", err) |
| } |
| tx.Commit() |
| |
| resmark, err = getResMark(nil, st) |
| if err != nil { |
| t.Errorf("cannot get new resume marker: %v", err) |
| } |
| if !bytes.Equal(resmark, wantResmark) { |
| t.Errorf("invalid new resume: got %s instead of %s", resmark, wantResmark) |
| } |
| } |
| |
| // TestWatchPrefixes tests setting and updating the watch prefixes map. |
| func TestWatchPrefixes(t *testing.T) { |
| watchPollInterval = time.Millisecond |
| svc := createService(t) |
| defer destroyService(t, svc) |
| |
| if len(watchPrefixes) != 0 { |
| t.Errorf("watch prefixes not empty: %v", watchPrefixes) |
| } |
| |
| watchPrefixOps := []struct { |
| appName, dbName, key string |
| incr bool |
| }{ |
| {"app1", "db1", "foo", true}, |
| {"app1", "db1", "bar", true}, |
| {"app2", "db1", "xyz", true}, |
| {"app3", "db1", "haha", true}, |
| {"app1", "db1", "foo", true}, |
| {"app1", "db1", "foo", true}, |
| {"app1", "db1", "foo", false}, |
| {"app2", "db1", "ttt", true}, |
| {"app2", "db1", "ttt", true}, |
| {"app2", "db1", "ttt", false}, |
| {"app2", "db1", "ttt", false}, |
| {"app2", "db2", "qwerty", true}, |
| {"app3", "db1", "haha", true}, |
| {"app2", "db2", "qwerty", false}, |
| {"app3", "db1", "haha", false}, |
| } |
| |
| for _, op := range watchPrefixOps { |
| if op.incr { |
| incrWatchPrefix(op.appName, op.dbName, op.key) |
| } else { |
| decrWatchPrefix(op.appName, op.dbName, op.key) |
| } |
| } |
| |
| expPrefixes := map[string]sgPrefixes{ |
| "app1:db1": sgPrefixes{"foo": 2, "bar": 1}, |
| "app2:db1": sgPrefixes{"xyz": 1}, |
| "app3:db1": sgPrefixes{"haha": 1}, |
| } |
| if !reflect.DeepEqual(watchPrefixes, expPrefixes) { |
| t.Errorf("invalid watch prefixes: got %v instead of %v", watchPrefixes, expPrefixes) |
| } |
| |
| checkSyncableTests := []struct { |
| appName, dbName, key string |
| result bool |
| }{ |
| {"app1", "db1", "foo", true}, |
| {"app1", "db1", "foobar", true}, |
| {"app1", "db1", "bar", true}, |
| {"app1", "db1", "bar123", true}, |
| {"app1", "db1", "f", false}, |
| {"app1", "db1", "ba", false}, |
| {"app1", "db1", "xyz", false}, |
| {"app1", "db555", "foo", false}, |
| {"app555", "db1", "foo", false}, |
| {"app2", "db1", "xyz123", true}, |
| {"app2", "db1", "ttt123", false}, |
| {"app2", "db2", "qwerty", false}, |
| {"app3", "db1", "hahahoho", true}, |
| {"app3", "db1", "hoho", false}, |
| {"app3", "db1", "h", false}, |
| } |
| |
| for _, test := range checkSyncableTests { |
| log := &watchable.LogEntry{ |
| Op: watchable.OpPut{ |
| Value: watchable.PutOp{Key: []byte(makeRowKey(test.key))}, |
| }, |
| } |
| res := syncable(appDbName(test.appName, test.dbName), log) |
| if res != test.result { |
| t.Errorf("checkSyncable: invalid output: %s, %s, %s: got %t instead of %t", |
| test.appName, test.dbName, test.key, res, test.result) |
| } |
| } |
| } |
| |
| // newLog creates a Put or Delete watch log entry. |
| func newLog(key, version string, delete bool) *watchable.LogEntry { |
| k, v := []byte(key), []byte(version) |
| log := &watchable.LogEntry{} |
| if delete { |
| log.Op = watchable.OpDelete{watchable.DeleteOp{Key: k}} |
| } else { |
| log.Op = watchable.OpPut{watchable.PutOp{Key: k, Version: v}} |
| } |
| return log |
| } |
| |
| // newSGLog creates a SyncGroup watch log entry. |
| func newSGLog(prefixes []string, remove bool) *watchable.LogEntry { |
| return &watchable.LogEntry{ |
| Op: watchable.OpSyncGroup{ |
| Value: watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove}, |
| }, |
| } |
| } |
| |
| // TestProcessWatchLogBatch tests the processing of a batch of log records. |
| func TestProcessWatchLogBatch(t *testing.T) { |
| svc := createService(t) |
| defer destroyService(t, svc) |
| st := svc.St() |
| s := svc.sync |
| |
| app, db := "mockapp", "mockdb" |
| fooKey := makeRowKey("foo") |
| barKey := makeRowKey("bar") |
| fooxyzKey := makeRowKey("fooxyz") |
| |
| // Empty logs does not fail. |
| s.processWatchLogBatch(nil, app, db, st, nil, nil) |
| |
| // Non-syncable logs. |
| batch := []*watchable.LogEntry{ |
| newLog(fooKey, "123", false), |
| newLog(barKey, "555", false), |
| } |
| |
| resmark := watchable.MakeResumeMarker(1234) |
| s.processWatchLogBatch(nil, app, db, st, batch, resmark) |
| |
| if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) { |
| t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark) |
| } |
| if ok, err := hasNode(nil, st, fooKey, "123"); err != nil || ok { |
| t.Error("hasNode() found DAG entry for non-syncable log on foo") |
| } |
| if ok, err := hasNode(nil, st, barKey, "555"); err != nil || ok { |
| t.Error("hasNode() found DAG entry for non-syncable log on bar") |
| } |
| |
| // Partially syncable logs. |
| batch = []*watchable.LogEntry{ |
| newSGLog([]string{"f", "x"}, false), |
| newLog(fooKey, "333", false), |
| newLog(fooxyzKey, "444", false), |
| newLog(barKey, "222", false), |
| } |
| |
| resmark = watchable.MakeResumeMarker(3456) |
| s.processWatchLogBatch(nil, app, db, st, batch, resmark) |
| |
| if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) { |
| t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark) |
| } |
| if head, err := getHead(nil, st, fooKey); err != nil && head != "333" { |
| t.Errorf("getHead() did not find foo: %s, %v", head, err) |
| } |
| node, err := getNode(nil, st, fooKey, "333") |
| if err != nil { |
| t.Errorf("getNode() did not find foo: %v", err) |
| } |
| if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId != NoBatchId { |
| t.Errorf("invalid DAG node for foo: %v", node) |
| } |
| node2, err := getNode(nil, st, fooxyzKey, "444") |
| if err != nil { |
| t.Errorf("getNode() did not find fooxyz: %v", err) |
| } |
| if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != NoBatchId { |
| t.Errorf("invalid DAG node for fooxyz: %v", node2) |
| } |
| if ok, err := hasNode(nil, st, barKey, "222"); err != nil || ok { |
| t.Error("hasNode() found DAG entry for non-syncable log on bar") |
| } |
| |
| // More partially syncable logs updating existing ones. |
| batch = []*watchable.LogEntry{ |
| newLog(fooKey, "1", false), |
| newLog(fooxyzKey, "", true), |
| newLog(barKey, "7", false), |
| } |
| |
| resmark = watchable.MakeResumeMarker(7890) |
| s.processWatchLogBatch(nil, app, db, st, batch, resmark) |
| |
| if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) { |
| t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark) |
| } |
| if head, err := getHead(nil, st, fooKey); err != nil && head != "1" { |
| t.Errorf("getHead() did not find foo: %s, %v", head, err) |
| } |
| node, err = getNode(nil, st, fooKey, "1") |
| if err != nil { |
| t.Errorf("getNode() did not find foo: %v", err) |
| } |
| expParents := []string{"333"} |
| if node.Level != 1 || !reflect.DeepEqual(node.Parents, expParents) || |
| node.Logrec == "" || node.BatchId == NoBatchId { |
| t.Errorf("invalid DAG node for foo: %v", node) |
| } |
| head2, err := getHead(nil, st, fooxyzKey) |
| if err != nil { |
| t.Errorf("getHead() did not find fooxyz: %v", err) |
| } |
| node2, err = getNode(nil, st, fooxyzKey, head2) |
| if err != nil { |
| t.Errorf("getNode() did not find fooxyz: %v", err) |
| } |
| expParents = []string{"444"} |
| if node2.Level != 1 || !reflect.DeepEqual(node2.Parents, expParents) || |
| node2.Logrec == "" || node2.BatchId == NoBatchId { |
| t.Errorf("invalid DAG node for fooxyz: %v", node2) |
| } |
| if ok, err := hasNode(nil, st, barKey, "7"); err != nil || ok { |
| t.Error("hasNode() found DAG entry for non-syncable log on bar") |
| } |
| |
| // Back to non-syncable logs (remove "f" prefix). |
| batch = []*watchable.LogEntry{ |
| newSGLog([]string{"f"}, true), |
| newLog(fooKey, "99", false), |
| newLog(fooxyzKey, "888", true), |
| newLog(barKey, "007", false), |
| } |
| |
| resmark = watchable.MakeResumeMarker(20212223) |
| s.processWatchLogBatch(nil, app, db, st, batch, resmark) |
| |
| if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) { |
| t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark) |
| } |
| // No changes to "foo". |
| if head, err := getHead(nil, st, fooKey); err != nil && head != "333" { |
| t.Errorf("getHead() did not find foo: %s, %v", head, err) |
| } |
| if node, err := getNode(nil, st, fooKey, "99"); err == nil { |
| t.Errorf("getNode() should not have found foo @ 99: %v", node) |
| } |
| if node, err := getNode(nil, st, fooxyzKey, "888"); err == nil { |
| t.Errorf("getNode() should not have found fooxyz @ 888: %v", node) |
| } |
| if ok, err := hasNode(nil, st, barKey, "007"); err != nil || ok { |
| t.Error("hasNode() found DAG entry for non-syncable log on bar") |
| } |
| |
| // Scan the batch records and verify that there is only 1 DAG batch |
| // stored, with a total count of 3 and a map of 2 syncable entries. |
| // This is because the 1st batch, while containing syncable keys, is a |
| // SyncGroup snapshot that does not get assigned a batch ID. The 2nd |
| // batch is an application batch with 3 keys of which 2 are syncable. |
| // The 3rd batch is also a SyncGroup snapshot. |
| count := 0 |
| start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.SyncPrefix, "dag", "b"), "") |
| stream := st.Scan(start, limit) |
| for stream.Advance() { |
| count++ |
| key := string(stream.Key(nil)) |
| var info batchInfo |
| if err := vom.Decode(stream.Value(nil), &info); err != nil { |
| t.Errorf("cannot decode batch %s: %v", key, err) |
| } |
| if info.Count != 3 { |
| t.Errorf("wrong total count in batch %s: got %d instead of 3", key, info.Count) |
| } |
| if n := len(info.Objects); n != 2 { |
| t.Errorf("wrong object count in batch %s: got %d instead of 2", key, n) |
| } |
| } |
| if count != 1 { |
| t.Errorf("wrong count of batches: got %d instead of 2", count) |
| } |
| } |