syncbase/vsync: 1st end-to-end integration test.

First end-to-end sync integration test; most of the work was done by
hpucha@ and is wrapped up in this CL.  The test runs two Syncbases, two
clients, and one MountTable.  The first client creates a SyncGroup and
stores some syncable objects.  The second client joins the SyncGroup
and verifies that it receives the objects via sync.

The integration test exposed a few bugs across components (watcher,
initiator, responder); they are fixed in this CL and the unit tests are
updated accordingly.

Change-Id: I6cd7d7feefd957e968b784795967e02ff66cb0d0
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index ff4b5cc..849f352 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,18 +6,24 @@
 
 import (
 	"fmt"
+	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase"
+	"v.io/syncbase/v23/syncbase/nosql"
 	tu "v.io/syncbase/v23/syncbase/testutil"
 	constants "v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23"
 	"v.io/v23/naming"
+	"v.io/x/ref"
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/test/modules"
 	"v.io/x/ref/test/v23tests"
 )
 
+// TODO(rdaoud): change the credentials of servers s0 and s1 to independent
+// principals derived from the same root.
+
 //go:generate v23 test generate
 
 func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
@@ -47,10 +53,13 @@
 	d := a.NoSQLDatabase("d")
 	d.Create(ctx, nil)
 
+	mtName := env.Vars[ref.EnvNamespacePrefix]
+
 	spec := wire.SyncGroupSpec{
 		Description: "test syncgroup sg",
 		Perms:       perms("root/s0", "root/s0/s1"),
-		Prefixes:    []string{"t1/foo"},
+		Prefixes:    []string{"t1:foo"},
+		MountTables: []string{mtName},
 	}
 	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
 	sg := d.SyncGroup(sgName)
@@ -74,7 +83,133 @@
 	sg := d.SyncGroup(sgName)
 	info := wire.SyncGroupMemberInfo{10}
 	if _, err := sg.Join(ctx, info); err != nil {
-		return fmt.Errorf("Join SG %v failed: %v", sgName, err)
+		return fmt.Errorf("Join SG %q failed: %v", sgName, err)
 	}
 	return nil
 }, "runJoinSyncGroup")
+
+// V23TestSyncbasedGetDeltas tests the exchange of deltas between two Syncbase
+// instances and their clients.  The 1st client creates a SyncGroup and puts
+// some database entries in it.  The 2nd client joins that SyncGroup and reads
+// the database entries.  This verifies the end-to-end synchronization of data
+// along the path: client1--Syncbase1--Syncbase2--client2.
+func V23TestSyncbasedGetDeltas(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
+	defer cleanSync1()
+
+	tu.RunClient(t, client0Creds, runCreateAndPopulateSyncGroup)
+	tu.RunClient(t, client1Creds, runJoinSyncGroupAndFetchData)
+}
+
+var runCreateAndPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService("sync0").App("a")
+	a.Create(ctx, nil)
+	d := a.NoSQLDatabase("d")
+	d.Create(ctx, nil)
+	d.CreateTable(ctx, "tb", nil)
+
+	mtName := env.Vars[ref.EnvNamespacePrefix]
+
+	spec := wire.SyncGroupSpec{
+		Description: "test syncgroup sg",
+		Perms:       perms("root/s0", "root/s0/s1"),
+		Prefixes:    []string{"tb:foo"},
+		MountTables: []string{mtName},
+	}
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+	sg := d.SyncGroup(sgName)
+	info := wire.SyncGroupMemberInfo{8}
+	if err := sg.Create(ctx, spec, info); err != nil {
+		return fmt.Errorf("Create SG %q failed: %v", sgName, err)
+	}
+
+	// Do Puts.
+	tb := d.Table("tb")
+	for i := 0; i < 10; i++ {
+		key := fmt.Sprintf("foo%d", i)
+		r := tb.Row(key)
+		if err := r.Put(ctx, "testkey"+key); err != nil {
+			return fmt.Errorf("r.Put() failed: %v", err)
+		}
+	}
+
+	return nil
+}, "runCreateAndPopulateSyncGroup")
+
+var runJoinSyncGroupAndFetchData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService("sync1").App("a")
+	a.Create(ctx, nil)
+	d := a.NoSQLDatabase("d")
+	d.Create(ctx, nil)
+	d.CreateTable(ctx, "tb", nil)
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+	sg := d.SyncGroup(sgName)
+	info := wire.SyncGroupMemberInfo{10}
+	if _, err := sg.Join(ctx, info); err != nil {
+		return fmt.Errorf("Join SG %q failed: %v", sgName, err)
+	}
+
+	// Wait for a bit (up to 4 sec) until the last key appears.
+	tb := d.Table("tb")
+	r := tb.Row("foo9")
+	for i := 0; i < 8; i++ {
+		time.Sleep(500 * time.Millisecond)
+		var value string
+		if err := r.Get(ctx, &value); err == nil {
+			break
+		}
+	}
+
+	// Verify that all keys and values made it correctly.
+	for i := 0; i < 10; i++ {
+		key := fmt.Sprintf("foo%d", i)
+		r := tb.Row(key)
+		var got string
+		if err := r.Get(ctx, &got); err != nil {
+			return fmt.Errorf("r.Get() failed: %v", err)
+		}
+		want := "testkey" + key
+		if got != want {
+			return fmt.Errorf("unexpected value: got %q, want %q", got, want)
+		}
+	}
+
+	// Re-verify using a scan operation.
+	stream := tb.Scan(ctx, nosql.Prefix("foo"))
+	for i := 0; stream.Advance(); i++ {
+		want := fmt.Sprintf("foo%d", i)
+		got := stream.Key()
+		if got != want {
+			return fmt.Errorf("unexpected key in scan: got %q, want %q", got, want)
+		}
+		want = "testkey" + want
+		if err := stream.Value(&got); err != nil {
+			return fmt.Errorf("cannot fetch value in scan: %v", err)
+		}
+		if got != want {
+			return fmt.Errorf("unexpected value in scan: got %q, want %q", got, want)
+		}
+	}
+	if err := stream.Err(); err != nil {
+		return fmt.Errorf("scan stream error: %v", err)
+	}
+
+	return nil
+}, "runJoinSyncGroupAndFetchData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index efbdd39..a14dbe0 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -28,3 +28,7 @@
 func TestV23SyncbasedJoinSyncGroup(t *testing.T) {
 	v23tests.RunTest(t, V23TestSyncbasedJoinSyncGroup)
 }
+
+func TestV23SyncbasedGetDeltas(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
+}
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 1624301..1d447c5 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -10,6 +10,7 @@
 	"time"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
@@ -124,7 +125,13 @@
 		}
 
 		iSt.stream = stream
-		req := interfaces.DeltaReq{SgIds: iSt.sgIds, InitVec: iSt.local}
+		req := interfaces.DeltaReq{
+			AppName: iSt.appName,
+			DbName:  iSt.dbName,
+			SgIds:   iSt.sgIds,
+			InitVec: iSt.local,
+		}
+
 		sender := iSt.stream.SendStream()
 		sender.Send(req)
 
@@ -264,7 +271,8 @@
 		return nil, false
 	}
 	for mt := range iSt.mtTables {
-		c := interfaces.SyncClient(naming.Join(mt, iSt.peer))
+		absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
+		c := interfaces.SyncClient(absName)
 		stream, err := c.GetDeltas(ctx)
 		if err == nil {
 			return stream, true
@@ -382,7 +390,6 @@
 				return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
 			}
 			finish = true
-			break
 
 		case interfaces.DeltaRespRespVec:
 			iSt.remote = v.Value
@@ -411,6 +418,11 @@
 			// Mark object dirty.
 			iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
 		}
+
+		// Break out of the stream.
+		if finish {
+			break
+		}
 	}
 
 	if !(start && finish) {
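
Aside on the `break` removed in the hunk above: in Go, a `break` inside a
`case` clause exits only the switch, not the enclosing `for` loop, so the old
code kept consuming the delta stream after the finish marker arrived.  Below
is a minimal, self-contained sketch of the gotcha and of the flag-based exit
this CL adopts (finishMarker and record are illustrative stand-ins for the
DeltaResp variants, not the real types):

	package main

	import "fmt"

	type finishMarker struct{}
	type record struct{ key string }

	func main() {
		stream := []interface{}{record{"foo0"}, finishMarker{}, record{"after-finish"}}

		// Buggy shape: 'break' exits only the switch, so "after-finish"
		// is still processed.
		for _, it := range stream {
			switch v := it.(type) {
			case finishMarker:
				break // no effect on the for loop
			case record:
				fmt.Println("buggy:", v.key)
			}
		}

		// Fixed shape: record the terminal state, test it after the switch.
		var finish bool
		for _, it := range stream {
			switch v := it.(type) {
			case finishMarker:
				finish = true
			case record:
				fmt.Println("fixed:", v.key)
			}
			if finish {
				break // exits the for loop; "after-finish" is skipped
			}
		}
	}

A labeled break on the for statement would work too; the flag fits here
because `finish` is checked again afterward in the `start && finish` sanity
test.
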
@@ -514,6 +526,7 @@
 
 	for {
 		iSt.tx = iSt.st.NewTransaction()
+		watchable.SetTransactionFromSync(iSt.tx) // mark as sync-driven so the watcher does not echo these updates back into sync
 
 		if err := iSt.detectConflicts(ctx); err != nil {
 			return err
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index bfcb250..d3c55d4 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -256,7 +256,7 @@
 	testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
 	testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
 
-	s.syncState[gdb] = &dbSyncStateInMem{}
+	s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
 
 	// Create local genvec so that it contains knowledge only about common prefixes.
 	if err := iSt.createLocalGenVec(nil); err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 19b1366..d096aa8 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -10,15 +10,17 @@
 	"strings"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 // GetDeltas implements the responder side of the GetDeltas RPC.
 func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
 	recvr := call.RecvStream()
-
 	for recvr.Advance() {
 		req := recvr.Value()
 		// Ignoring errors since if one Database fails for any reason,
@@ -281,10 +283,11 @@
 
 		if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
 			// Send on the wire.
-			wireRec := interfaces.LogRec{Metadata: rec.Metadata}
-			// TODO(hpucha): Hash out this fake stream stuff when
-			// defining the RPC and the rest of the responder.
-			sender.Send(interfaces.DeltaRespRec{wireRec})
+			wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+			if err != nil {
+				return err
+			}
+			sender.Send(interfaces.DeltaRespRec{*wireRec})
 		}
 
 		// Add a new record from the same device if not done.
@@ -387,11 +390,19 @@
 
 // Note: initPfxs is sorted.
 func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
-	filter := true
+	// The key starts with one of the store's reserved prefixes for managed
+	// namespaces (e.g. $row, $perms).  Remove that prefix before comparing
+	// it with the SyncGroup prefixes, which are defined by the application.
+	parts := util.SplitKeyParts(rec.Metadata.ObjId)
+	if len(parts) < 2 {
+		vlog.Fatalf("filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
+	}
+	key := util.JoinKeyParts(parts[1:]...)
 
+	filter := true
 	var maxGen uint64
 	for _, p := range initPfxs {
-		if strings.HasPrefix(rec.Metadata.ObjId, p) {
+		if strings.HasPrefix(key, p) {
 			// Do not filter. Initiator is interested in this
 			// prefix.
 			filter = false
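
To make the stripping concrete (a sketch: it assumes the ':' key-part
separator implied by this CL's "tb:foo" prefixes, and a row key of the
hypothetical form "$row:tb:foo9"):

	parts := util.SplitKeyParts("$row:tb:foo9") // ["$row", "tb", "foo9"]
	key := util.JoinKeyParts(parts[1:]...)      // "tb:foo9"
	strings.HasPrefix(key, "tb:foo")            // true: initiator wants this record

Without dropping the reserved first part, no application-defined SyncGroup
prefix could ever match the stored key, and every record would be filtered
out.
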
@@ -406,12 +417,31 @@
 
 	// Filter this record if the initiator already has it.
 	if maxGen >= rec.Metadata.Gen {
-		return true
+		filter = true
 	}
 
 	return filter
 }
 
+// makeWireLogRec creates a sync log record to send on the wire from a given
+// local sync record.
+func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+	// Get the object value at the required version.  Note: GetAtVersion()
+	// requires a transaction to read the data, so create and abort one.
+	// TODO(hpucha): remove the fake Tx after the change in GetAtVersion().
+	tx := st.NewTransaction()
+	defer tx.Abort()
+
+	key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
+	value, err := watchable.GetAtVersion(ctx, tx, []byte(key), nil, []byte(version))
+	if err != nil {
+		return nil, err
+	}
+
+	wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}
+	return wireRec, nil
+}
+
 // A minHeap implements heap.Interface and holds local log records.
 type minHeap []*localLogRec
 
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index 82c0d94..b452c07 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -12,6 +12,7 @@
 	"time"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
@@ -378,7 +379,7 @@
 				if opfx == "" {
 					continue
 				}
-				okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+				okey := makeRowKey(fmt.Sprintf("%s~%x", opfx, tRng.Int()))
 				vers := fmt.Sprintf("%x", tRng.Int())
 				rec := &localLogRec{
 					Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
@@ -387,6 +388,10 @@
 				if err := putLogRec(nil, tx, rec); err != nil {
 					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
 				}
+				value := fmt.Sprintf("value_%s", okey)
+				if err := watchable.PutAtVersion(nil, tx, []byte(okey), []byte(value), []byte(vers)); err != nil {
+					t.Fatalf("PutAtVersion(%d:%d) failed rec %v value %s: err %v", id, k, rec, value, err)
+				}
 
 				initPfxs := extractAndSortPrefixes(test.initVec)
 				if !filterLogRec(rec, test.initVec, initPfxs) {
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 49d9ff5..8db0bc6 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -155,10 +155,23 @@
 		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
 
-	ds.ckPtGen = ds.gen
+	// The frozen generation is the last generation number actually used,
+	// i.e., one below the next generation number to be assigned (ds.gen).
+	ds.ckPtGen = ds.gen - 1
 	return nil
 }
 
+// initDbSyncStateInMem initializes the in-memory sync state of the Database if needed.
+func (s *syncService) initDbSyncStateInMem(ctx *context.T, appName, dbName string) {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	if s.syncState[name] == nil {
+		s.syncState[name] = &dbSyncStateInMem{gen: 1}
+	}
+}
+
 // getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
 func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
 	s.syncStateLock.Lock()
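
A worked example of the off-by-one fixed above: initDbSyncStateInMem seeds
gen to 1, i.e. generation 1 is the next one to assign.  After a Database has
assigned generations 1 through N, gen is N+1; freezing ckPtGen at gen-1
therefore captures N, the last generation that actually holds data, whereas
the old code froze the still-empty generation N+1.
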
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 31eaff0..4045416 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -21,6 +21,7 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/naming"
@@ -420,9 +421,8 @@
 			return err
 		}
 
-		// TODO(hpucha): Add watch notification to signal SG creation.
-
-		return nil
+		tx1 := tx.(store.Transaction)
+		return watchable.AddSyncGroupOp(ctx, tx1, spec.Prefixes, false)
 	})
 
 	if err != nil {
@@ -516,9 +516,8 @@
 			return err
 		}
 
-		// TODO(hpucha): Add a watch notification to signal new SG.
-
-		return nil
+		tx1 := tx.(store.Transaction)
+		return watchable.AddSyncGroupOp(ctx, tx1, sg.Spec.Prefixes, false)
 	})
 
 	if err != nil {
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 804907c..4865d82 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -93,6 +93,9 @@
 		resMark = ""
 	}
 
+	// Initialize Database sync state if needed.
+	s.initDbSyncStateInMem(ctx, appName, dbName)
+
 	// Get a batch of watch log entries, if any, after this resume marker.
 	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
 		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
@@ -327,22 +330,22 @@
 	timestamp := logEnt.CommitTimestamp
 
 	switch op := logEnt.Op.(type) {
-	case *watchable.OpGet:
+	case watchable.OpGet:
 		// TODO(rdaoud): save read-set in sync.
 
-	case *watchable.OpScan:
+	case watchable.OpScan:
 		// TODO(rdaoud): save scan-set in sync.
 
-	case *watchable.OpPut:
+	case watchable.OpPut:
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
 
-	case *watchable.OpSyncSnapshot:
+	case watchable.OpSyncSnapshot:
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
 
-	case *watchable.OpDelete:
+	case watchable.OpDelete:
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
 
-	case *watchable.OpSyncGroup:
+	case watchable.OpSyncGroup:
 		vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
 
 	default:
@@ -377,7 +380,7 @@
 // Otherwise it returns false with no other changes.
 func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
 	switch op := logEnt.Op.(type) {
-	case *watchable.OpSyncGroup:
+	case watchable.OpSyncGroup:
 		remove := op.Value.Remove
 		for _, prefix := range op.Value.Prefixes {
 			if remove {
@@ -399,11 +402,11 @@
 func syncable(appdb string, logEnt *watchable.LogEntry) bool {
 	var key string
 	switch op := logEnt.Op.(type) {
-	case *watchable.OpPut:
+	case watchable.OpPut:
 		key = string(op.Value.Key)
-	case *watchable.OpDelete:
+	case watchable.OpDelete:
 		key = string(op.Value.Key)
-	case *watchable.OpSyncSnapshot:
+	case watchable.OpSyncSnapshot:
 		key = string(op.Value.Key)
 	default:
 		return false
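
Aside on the pointer-to-value change running through this file: a Go type
switch on an interface holding a value of type T never matches `case *T`, so
the old `case *watchable.OpPut` (etc.) clauses always fell through to
`default` and the entries were never converted to sync log records.  The test
fixtures below construct the ops by value (`Op: watchable.OpPut{...}`),
matching the new cases.  A standalone sketch with an illustrative type:

	package main

	import "fmt"

	type OpPut struct{ Key string }

	func main() {
		var op interface{} = OpPut{Key: "tb:foo1"} // stored by value

		switch op.(type) {
		case *OpPut:
			fmt.Println("pointer case") // never taken for a value
		case OpPut:
			fmt.Println("value case") // taken
		}
	}
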
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index aa7a72f..bf4fb95 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -116,7 +116,7 @@
 
 	for _, test := range checkSyncableTests {
 		log := &watchable.LogEntry{
-			Op: &watchable.OpPut{
+			Op: watchable.OpPut{
 				watchable.PutOp{Key: []byte(makeRowKey(test.key))},
 			},
 		}
@@ -133,9 +133,9 @@
 	k, v := []byte(key), []byte(version)
 	log := &watchable.LogEntry{}
 	if delete {
-		log.Op = &watchable.OpDelete{watchable.DeleteOp{Key: k}}
+		log.Op = watchable.OpDelete{watchable.DeleteOp{Key: k}}
 	} else {
-		log.Op = &watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
+		log.Op = watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
 	}
 	return log
 }
@@ -143,7 +143,7 @@
 // newSGLog creates a SyncGroup watch log entry.
 func newSGLog(prefixes []string, remove bool) *watchable.LogEntry {
 	return &watchable.LogEntry{
-		Op: &watchable.OpSyncGroup{
+		Op: watchable.OpSyncGroup{
 			watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove},
 		},
 	}