syncbase/vsync:
Bugfix to handle syncing deletions.
Added a test to cover this case.

Change-Id: Ia3600dd2ca0475650afc4c4d4dc1ed8ccd219825
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 3d8054c..0633079 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -7,6 +7,7 @@
 import (
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -16,6 +17,7 @@
 	constants "v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23"
 	"v.io/v23/naming"
+	"v.io/v23/verror"
 	"v.io/x/ref"
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/test/modules"
@@ -81,6 +83,46 @@
 	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
 }
 
+// V23TestSyncbasedGetDeltasWithDel tests the sending of deltas between two
+// Syncbase instances and their clients. The 1st client creates a SyncGroup and
+// puts some database entries in it. The 2nd client joins that SyncGroup and
+// reads the database entries. The 1st client then deletes a portion of this
+// data, and adds new entries. The 2nd client verifies that these changes are
+// correctly synced. This verifies the end-to-end synchronization of data along
+// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
+// deletes.
+func V23TestSyncbasedGetDeltasWithDel(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo,tb:bar", "root/s0", "root/s1")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+
+	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+
+	tu.RunClient(t, client0Creds, runDeleteData, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runVerifyDeletedData, "sync0", "foo")
+	tu.RunClient(t, client1Creds, runVerifyDeletedData, "sync1", "foo")
+
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "bar", "0", "10")
+}
+
 // V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
 // Syncbase instances and their clients.  The 1st client creates a SyncGroup and
 // puts some database entries in it.  The 2nd client joins that SyncGroup and
@@ -274,6 +316,9 @@
 ////////////////////////////////////
 // Helpers.
 
+// TODO(hpucha): Look into refactoring scan logic out of the helpers, and
+// avoiding gets when we can scan.
+
 var runSetupAppA = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -298,7 +343,7 @@
 	spec := wire.SyncGroupSpec{
 		Description: "test syncgroup sg",
 		Perms:       perms(args[3:]...),
-		Prefixes:    []string{args[2]},
+		Prefixes:    strings.Split(args[2], ","),
 		MountTables: []string{mtName},
 	}
 
@@ -368,6 +413,28 @@
 	return nil
 }, "runUpdateData")
 
+var runDeleteData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	// Do Puts.
+	tb := d.Table("tb")
+	start, _ := strconv.ParseUint(args[1], 10, 64)
+
+	for i := start; i < start+5; i++ {
+		key := fmt.Sprintf("foo%d", i)
+		r := tb.Row(key)
+		if err := r.Delete(ctx); err != nil {
+			return fmt.Errorf("r.Delete() failed: %v\n", err)
+		}
+	}
+
+	return nil
+}, "runDeleteData")
+
 var runVerifySyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -428,6 +495,55 @@
 	return nil
 }, "runVerifySyncGroupData")
 
+var runVerifyDeletedData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	// Wait for a bit for deletions to propagate.
+	tb := d.Table("tb")
+
+	r := tb.Row("foo4")
+	for i := 0; i < 8; i++ {
+		time.Sleep(500 * time.Millisecond)
+		var value string
+		if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
+			break
+		}
+	}
+
+	// Verify using a scan operation.
+	stream := tb.Scan(ctx, nosql.Prefix(args[1]))
+	count := 0
+	for i := 5; stream.Advance(); i++ {
+		want := fmt.Sprintf("%s%d", args[1], i)
+		got := stream.Key()
+		if got != want {
+			return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
+		}
+		want = "testkey" + want
+		if err := stream.Value(&got); err != nil {
+			return fmt.Errorf("cannot fetch value in scan: %v\n", err)
+		}
+		if got != want {
+			return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
+		}
+		count++
+	}
+
+	if err := stream.Err(); err != nil {
+		return fmt.Errorf("scan stream error: %v\n", err)
+	}
+
+	if count != 5 {
+		return fmt.Errorf("scan stream count error: %v\n", count)
+	}
+
+	return nil
+}, "runVerifyDeletedData")
+
 var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index 3c8bc00..88e046d 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -33,6 +33,10 @@
 	v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
 }
 
+func TestV23SyncbasedGetDeltasWithDel(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedGetDeltasWithDel)
+}
+
 func TestV23SyncbasedExchangeDeltas(t *testing.T) {
 	v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
 }
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index e14dac6..18df27b 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -444,6 +444,7 @@
 				}
 			}
 
+			vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
 			if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
 				return err
 			}
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index c0f2c9f..2211da4 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -440,9 +440,13 @@
 func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
 	// Get the object value at the required version.
 	key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
-	value, err := watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
-	if err != nil {
-		return nil, err
+	var value []byte
+	if !rec.Metadata.Delete {
+		var err error
+		value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}