syncbase/vsync:
Bugfix to handle syncing deletions.
Added a test to cover this case.
Change-Id: Ia3600dd2ca0475650afc4c4d4dc1ed8ccd219825
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 3d8054c..0633079 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -7,6 +7,7 @@
import (
"fmt"
"strconv"
+ "strings"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -16,6 +17,7 @@
constants "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23"
"v.io/v23/naming"
+ "v.io/v23/verror"
"v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test/modules"
@@ -81,6 +83,46 @@
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
}
+// V23TestSyncbasedGetDeltasWithDel tests the sending of deltas between two
+// Syncbase instances and their clients. The 1st client creates a SyncGroup and
+// puts some database entries in it. The 2nd client joins that SyncGroup and
+// reads the database entries. The 1st client then deletes a portion of this
+// data, and adds new entries. The 2nd client verifies that these changes are
+// correctly synced. This verifies the end-to-end synchronization of data along
+// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
+// deletes.
+func V23TestSyncbasedGetDeltasWithDel(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo,tb:bar", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+
+ tu.RunClient(t, client0Creds, runDeleteData, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runVerifyDeletedData, "sync0", "foo")
+ tu.RunClient(t, client1Creds, runVerifyDeletedData, "sync1", "foo")
+
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "bar", "0", "10")
+}
+
// V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
// Syncbase instances and their clients. The 1st client creates a SyncGroup and
// puts some database entries in it. The 2nd client joins that SyncGroup and
@@ -274,6 +316,9 @@
////////////////////////////////////
// Helpers.
+// TODO(hpucha): Look into refactoring scan logic out of the helpers, and
+// avoiding gets when we can scan.
+
var runSetupAppA = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -298,7 +343,7 @@
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
Perms: perms(args[3:]...),
- Prefixes: []string{args[2]},
+ Prefixes: strings.Split(args[2], ","),
MountTables: []string{mtName},
}
@@ -368,6 +413,28 @@
return nil
}, "runUpdateData")
+var runDeleteData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d", nil)
+
+ // Do Puts.
+ tb := d.Table("tb")
+ start, _ := strconv.ParseUint(args[1], 10, 64)
+
+ for i := start; i < start+5; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ if err := r.Delete(ctx); err != nil {
+ return fmt.Errorf("r.Delete() failed: %v\n", err)
+ }
+ }
+
+ return nil
+}, "runDeleteData")
+
var runVerifySyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -428,6 +495,55 @@
return nil
}, "runVerifySyncGroupData")
+var runVerifyDeletedData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d", nil)
+
+ // Wait for a bit for deletions to propagate.
+ tb := d.Table("tb")
+
+ r := tb.Row("foo4")
+ for i := 0; i < 8; i++ {
+ time.Sleep(500 * time.Millisecond)
+ var value string
+ if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
+ break
+ }
+ }
+
+ // Verify using a scan operation.
+ stream := tb.Scan(ctx, nosql.Prefix(args[1]))
+ count := 0
+ for i := 5; stream.Advance(); i++ {
+ want := fmt.Sprintf("%s%d", args[1], i)
+ got := stream.Key()
+ if got != want {
+ return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
+ }
+ want = "testkey" + want
+ if err := stream.Value(&got); err != nil {
+ return fmt.Errorf("cannot fetch value in scan: %v\n", err)
+ }
+ if got != want {
+ return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
+ }
+ count++
+ }
+
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("scan stream error: %v\n", err)
+ }
+
+ if count != 5 {
+ return fmt.Errorf("scan stream count error: %v\n", count)
+ }
+
+ return nil
+}, "runVerifyDeletedData")
+
var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index 3c8bc00..88e046d 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -33,6 +33,10 @@
v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
}
+func TestV23SyncbasedGetDeltasWithDel(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedGetDeltasWithDel)
+}
+
func TestV23SyncbasedExchangeDeltas(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
}
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index e14dac6..18df27b 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -444,6 +444,7 @@
}
}
+ vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
return err
}
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index c0f2c9f..2211da4 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -440,9 +440,13 @@
func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
// Get the object value at the required version.
key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
- value, err := watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
- if err != nil {
- return nil, err
+ var value []byte
+ if !rec.Metadata.Delete {
+ var err error
+ value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
+ if err != nil {
+ return nil, err
+ }
}
wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}