veyron/runtimes/google/vsync:
* Track delete mutations from the store while maintaining unique version
numbers in the DAG for delete records.
* Update test files and tests so they no longer use NoVersion (i.e. 0) as
a valid version number.
Change-Id: I668f933679feb52e464c61f8c61ac0e2096d8674
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index dee40cf..0725b84 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -252,6 +252,42 @@
ResumeMarker: nil,
Continued: false,
},
+
+ // 3rd transaction: remove "/a/b" which updates "a" (its "Dir" field) and deletes "b".
+ watch.Change{
+ Name: "",
+ State: 0,
+ Value: &raw.Mutation{
+ ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
+ 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
+ PriorVersion: 0x365a858149c6e2d1,
+ Version: 0xa858149c6e2d1000,
+ Value: "value-a",
+ Dir: []storage.DEntry{
+ storage.DEntry{
+ Name: "c",
+ ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
+ 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
+ },
+ },
+ },
+ ResumeMarker: nil,
+ Continued: true,
+ },
+ watch.Change{
+ Name: "",
+ State: watch.DoesNotExist,
+ Value: &raw.Mutation{
+ ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
+ 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
+ PriorVersion: 0x55104dc76695721d,
+ Version: 0x0,
+ Value: "",
+ Dir: nil,
+ },
+ ResumeMarker: nil,
+ Continued: false,
+ },
}
return batch
@@ -321,6 +357,7 @@
// TestWatcherRecvError tests the watcher reacting to an error from the stream receive.
// It verifies that the watcher retries the Watch() RPC after a delay.
func TestWatcherRecvError(t *testing.T) {
+ rt.Init()
dir := initTestDir(t)
defer os.RemoveAll(dir)
@@ -339,6 +376,7 @@
// TestWatcherChanges tests the watcher applying changes received from store.
func TestWatcherChanges(t *testing.T) {
+ rt.Init()
dir := initTestDir(t)
defer os.RemoveAll(dir)
@@ -353,8 +391,8 @@
oidB := storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}
oidC := storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71}
- oids := []storage.ID{oidRoot, oidA, oidB, oidC}
- heads := []storage.Version{0x4d65822107fcfd52, 0x365a858149c6e2d1, 0x55104dc76695721d, 0x380704bb7b4d7c03}
+ oids := []storage.ID{oidRoot, oidA, oidC}
+ heads := []storage.Version{0x4d65822107fcfd52, 0xa858149c6e2d1000, 0x380704bb7b4d7c03}
for i, oid := range oids {
expHead := heads[i]
@@ -366,6 +404,15 @@
}
}
+ // Verify oidB.
+ headB, err := s.dag.getHead(oidB)
+ if err != nil {
+ t.Errorf("cannot find head node for object %d: %s", oidB, err)
+ }
+ if headB == storage.NoVersion || headB == storage.Version(0x55104dc76695721d) {
+ t.Errorf("wrong head for object B %d: %d ", oidB, headB)
+ }
+
// Verify transaction state for the first transaction.
node, err := s.dag.getNode(oidRoot, heads[0])
if err != nil {
@@ -381,7 +428,7 @@
expTxMap := dagTxMap{
oidRoot: heads[0],
oidA: storage.Version(0x57e9d1860d1d68d8),
- oidB: heads[2],
+ oidB: storage.Version(0x55104dc76695721d),
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
@@ -389,6 +436,27 @@
}
// Verify transaction state for the second transaction.
+ node, err = s.dag.getNode(oidC, heads[2])
+ if err != nil {
+ t.Errorf("cannot find dag node for object %d %v: %s", oidC, heads[2], err)
+ }
+ if node.TxID == NoTxID {
+ t.Errorf("expecting non nil txid for object %d:%v", oidC, heads[2])
+ }
+ txMap, err = s.dag.getTransaction(node.TxID)
+ if err != nil {
+ t.Errorf("cannot find transaction for id %v: %s", node.TxID, err)
+ }
+ expTxMap = dagTxMap{
+ oidA: storage.Version(0x365a858149c6e2d1),
+ oidC: heads[2],
+ }
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
+ node.TxID, txMap, expTxMap)
+ }
+
+ // Verify transaction state for the third transaction.
node, err = s.dag.getNode(oidA, heads[1])
if err != nil {
t.Errorf("cannot find dag node for object %d %v: %s", oidA, heads[1], err)
@@ -402,14 +470,26 @@
}
expTxMap = dagTxMap{
oidA: heads[1],
- oidC: heads[3],
+ oidB: headB,
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
node.TxID, txMap, expTxMap)
}
- expResmark := []byte{2, 0, 0, 0, 0, 0, 0, 0}
+ // Verify deletion tracking.
+ node, err = s.dag.getNode(oidB, headB)
+ if err != nil {
+ t.Errorf("cannot find dag node for object %d %v: %s", oidB, headB, err)
+ }
+ if !node.Deleted {
+ t.Errorf("deleted node not found for object %d %v: %s", oidB, headB, err)
+ }
+ if !s.dag.hasDeletedDescendant(oidB, storage.Version(0x55104dc76695721d)) {
+ t.Errorf("link to deleted node not found for object %d %v: %s", oidB, headB, err)
+ }
+
+ expResmark := []byte{3, 0, 0, 0, 0, 0, 0, 0}
if bytes.Compare(s.devtab.head.Resmark, expResmark) != 0 {
t.Errorf("error in watch device table resume marker: %v instead of %v", s.devtab.head.Resmark, expResmark)