Merge "syncbase/vsync: Integrate log and gen vector functionality (Part 2/2)."
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index a050e45..bb6f4d4 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -19,7 +19,11 @@
 const (
         // NodeRec type log record adds a new node in the dag.
         NodeRec = byte(0)
-        // LinkRec type log record adds a new link in the dag.
+
+        // LinkRec type log record adds a new link in the dag. Link records are
+        // added when a conflict is resolved by picking either the local or the
+        // remote version as the resolution, instead of creating a new
+        // version.
         LinkRec = byte(1)
 )
 
@@ -34,32 +38,24 @@
 
 // LogRecMetadata represents the metadata of a single log record that is
 // exchanged between two peers. Each log record represents a change made to an
-// object in the store. The log record metadata consists of Id, the device id
-// that created the log record, Gen, the generation number for that log record,
-// and RecType, the type of log record. It also contains information relevant to
-// the update to the object in the store: ObjId is the id of the object that was
-// updated. CurVers is the current version number of the object. Parents can
-// contain 0, 1 or 2 parent versions that the current version is derived from.
-// SyncTime is the timestamp when the update is committed to the Store.
-// Delete indicates whether the update resulted in the object being
-// deleted from the store. BatchId is the unique id of the Batch this update
-// belongs to. BatchCount is the number of objects in the Batch.
+// object in the store.
 //
-// TODO(hpucha): Add readset/scanset.
+// TODO(hpucha): Add readset/scanset. Look into sending tx metadata only once
+// per transaction.
 type LogRecMetadata struct {
         // Log related information.
-        Id        uint64
-	Gen       uint64
-	RecType   byte
+        Id        uint64 // device id that created the log record.
+	Gen       uint64 // generation number for the log record.
+	RecType   byte   // type of log record.
 
         // Object related information.
-        ObjId       string
-        CurVers     string
-        Parents     []string
-	SyncTime    time.Time
-	Delete      bool
-	BatchId     uint64
-	BatchCount  uint64
+        ObjId       string      // id of the object that was updated.
+        CurVers     string      // current version number of the object.
+        Parents     []string    // 0, 1 or 2 parent versions that the current version is derived from.
+	UpdTime     time.Time   // timestamp when the update is generated.
+	Delete      bool        // indicates whether the update resulted in the object being deleted from the store.
+	BatchId     uint64      // unique id of the Batch this update belongs to.
+	BatchCount  uint64      // number of objects in the Batch.
 }
 
 // LogRec represents the on-wire representation of an entire log record: its
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index e188a35..a22b37d 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -39,31 +39,23 @@
 
 // LogRecMetadata represents the metadata of a single log record that is
 // exchanged between two peers. Each log record represents a change made to an
-// object in the store. The log record metadata consists of Id, the device id
-// that created the log record, Gen, the generation number for that log record,
-// and RecType, the type of log record. It also contains information relevant to
-// the update to the object in the store: ObjId is the id of the object that was
-// updated. CurVers is the current version number of the object. Parents can
-// contain 0, 1 or 2 parent versions that the current version is derived from.
-// SyncTime is the timestamp when the update is committed to the Store.
-// Delete indicates whether the update resulted in the object being
-// deleted from the store. BatchId is the unique id of the Batch this update
-// belongs to. BatchCount is the number of objects in the Batch.
+// object in the store.
 //
-// TODO(hpucha): Add readset/scanset.
+// TODO(hpucha): Add readset/scanset. Look into sending tx metadata only once
+// per transaction.
 type LogRecMetadata struct {
 	// Log related information.
-	Id      uint64
-	Gen     uint64
-	RecType byte
+	Id      uint64 // device id that created the log record.
+	Gen     uint64 // generation number for the log record.
+	RecType byte   // type of log record.
 	// Object related information.
-	ObjId      string
-	CurVers    string
-	Parents    []string
-	SyncTime   time.Time
-	Delete     bool
-	BatchId    uint64
-	BatchCount uint64
+	ObjId      string    // id of the object that was updated.
+	CurVers    string    // current version number of the object.
+	Parents    []string  // 0, 1 or 2 parent versions that the current version is derived from.
+	UpdTime    time.Time // timestamp when the update is generated.
+	Delete     bool      // indicates whether the update resulted in the object being deleted from the store.
+	BatchId    uint64    // unique id of the Batch this update belongs to.
+	BatchCount uint64    // number of objects in the Batch.
 }
 
 func (LogRecMetadata) __VDLReflect(struct {
@@ -178,5 +170,8 @@
 // NodeRec type log record adds a new node in the dag.
 const NodeRec = byte(0)
 
-// LinkRec type log record adds a new link in the dag.
+// LinkRec type log record adds a new link in the dag. Link records are
+// added when a conflict is resolved by picking either the local or the
+// remote version as the resolution, instead of creating a new
+// version.
 const LinkRec = byte(1)
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 3d3563c..775d29f 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,7 +29,7 @@
 // syncService contains the metadata for the sync module.
 type syncService struct {
 	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
-	id   uint64 // globally unique id for this instance of Syncbase
+	id   uint64 // globally unique id for this instance of Syncbase.
 	name string // name derived from the global id.
 	sv   interfaces.Service
 
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index dce51f2..fed4c9a 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -31,13 +31,17 @@
 // device.
 
 import (
+	"container/heap"
 	"fmt"
+	"sort"
+	"strings"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 
 	"v.io/v23/context"
+	"v.io/v23/rpc"
 	"v.io/v23/verror"
 )
 
@@ -45,7 +49,7 @@
 type dbSyncStateInMem struct {
 	gen    uint64
 	pos    uint64
-	genvec interfaces.GenVector
+	genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
 }
 
 // initSync initializes the sync module during startup. It scans all the
@@ -132,6 +136,28 @@
 	return gen, pos
 }
 
+// getDbGenInfo returns a copy of the current generation information of the Database.
+func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok {
+		return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+	// Make a copy of the genvec.
+	genvec := make(interfaces.GenVector)
+	for p, dspgv := range ds.genvec {
+		pgv := make(interfaces.PrefixGenVector)
+		for id, gen := range dspgv {
+			pgv[id] = gen
+		}
+		genvec[p] = pgv
+	}
+	return genvec, ds.gen, nil
+}
+
 // appDbName combines the app and db names to return a globally unique name for
 // a Database.  This relies on the fact that the app name is globally unique and
 // the db name is unique within the scope of the app.
@@ -160,7 +186,7 @@
 func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
 	var ds dbSyncState
 	if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
-		return nil, verror.New(verror.ErrInternal, ctx, err)
+		return nil, translateError(ctx, err, dbSyncStateKey())
 	}
 	return &ds, nil
 }
@@ -201,7 +227,7 @@
 func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
 	var rec localLogRec
 	if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
-		return nil, verror.New(verror.ErrInternal, ctx, err)
+		return nil, translateError(ctx, err, logRecKey(id, gen))
 	}
 	return &rec, nil
 }
@@ -215,3 +241,322 @@
 	}
 	return nil
 }
+
+////////////////////////////////////////////////////////////
+// Genvector-related utilities.
+
+// sendDeltasPerDatabase sends to an initiator all the missing generations
+// corresponding to the prefixes requested for this Database, and a genvector
+// summarizing the knowledge transferred from the responder to the
+// initiator. This happens in two phases:
+//
+// In the first phase, for a given set of nested prefixes from the initiator,
+// the shortest prefix in that set is extracted. The initiator's prefix
+// genvector for this shortest prefix represents the lower bound on its
+// knowledge for the entire set of nested prefixes. This prefix genvector
+// (representing the lower bound) is diffed with all the responder prefix
+// genvectors for the same or deeper prefixes relative to the initiator
+// prefix. This diff produces a bound on the missing knowledge. For example, say
+// the initiator is interested in prefixes {foo, foobar}, where each prefix is
+// associated with a prefix genvector. Since the initiator has at least as much
+// knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
+// prefix genvector is chosen as the lower bound for the initiator's
+// knowledge. Similarly, say the responder has knowledge on prefixes {f,
+// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
+// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
+// compute a bound on missing generations (all responder's prefixes that match
+// "foo". Note that since the responder doesn't have a prefix genvector at
+// "foo", its knowledge at "f" is applicable to "foo").
+//
+// Since the first phase outputs an aggressive calculation of missing
+// generations containing more generation entries than strictly needed by the
+// initiator, in the second phase, each missing generation is sent to the
+// initiator only if the initiator is eligible for it and is not aware of
+// it. The generations are sent to the initiator in the same order as the
+// responder learned them so that the initiator can reconstruct the DAG for the
+// objects by learning older nodes first.
+func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
+	// Phase 1 of sendDeltas. diff contains the bound on the generations
+	// missing from the initiator per device.
+	diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
+	if err != nil {
+		return nil, err
+	}
+
+	// Phase 2 of sendDeltas: Process the diff, filtering out records that
+	// are not needed, and send the remainder on the wire in log order.
+	st, err := s.getDbStore(ctx, call, appName, dbName)
+	if err != nil {
+		return nil, err
+	}
+
+	// We now visit, in log order, every log record in the generation ranges
+	// obtained from phase 1. We use a heap to incrementally sort the log
+	// records by their position in the log.
+	//
+	// Init the min heap, one entry per device in the diff.
+	mh := make(minHeap, 0, len(diff))
+	for dev, r := range diff {
+		r.cur = r.min
+		rec, err := getNextLogRec(ctx, st, dev, r)
+		if err != nil {
+			return nil, err
+		}
+		if rec != nil {
+			mh = append(mh, rec)
+		} else {
+			delete(diff, dev)
+		}
+	}
+	heap.Init(&mh)
+
+	// Process the log records in order.
+	initPfxs := extractAndSortPrefixes(initVec)
+
+	for mh.Len() > 0 {
+		rec := heap.Pop(&mh).(*localLogRec)
+
+		if !filterLogRec(rec, initVec, initPfxs) {
+			// Send on the wire.
+			wireRec := interfaces.LogRec{Metadata: rec.Metadata}
+			// TODO(hpucha): Hash out this fake stream stuff when
+			// defining the RPC and the rest of the responder.
+			stream.Send(wireRec)
+		}
+
+		// Add a new record from the same device if not done.
+		dev := rec.Metadata.Id
+		rec, err := getNextLogRec(ctx, st, dev, diff[dev])
+		if err != nil {
+			return nil, err
+		}
+		if rec != nil {
+			heap.Push(&mh, rec)
+		} else {
+			delete(diff, dev)
+		}
+	}
+
+	return outVec, nil
+}
+
+// computeDeltaBound computes the bound on missing generations across all
+// requested prefixes (phase 1 of sendDeltas).
+func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
+	respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
+	if err != nil {
+		return nil, nil, err
+	}
+	respPfxs := extractAndSortPrefixes(respVec)
+	initPfxs := extractAndSortPrefixes(initVec)
+	if len(initPfxs) == 0 {
+		return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+	}
+
+	outVec := make(interfaces.GenVector)
+	diff := make(genRangeVector)
+	pfx := initPfxs[0]
+
+	for _, p := range initPfxs {
+		if strings.HasPrefix(p, pfx) && p != pfx {
+			continue
+		}
+
+		// Process this prefix as this is the start of a new set of
+		// nested prefixes.
+		pfx = p
+
+		// Lower bound on initiator's knowledge for this prefix set.
+		initpgv := initVec[pfx]
+
+		// Find the relevant responder prefixes and add the corresponding knowledge.
+		var respgv interfaces.PrefixGenVector
+		var rpStart string
+		for _, rp := range respPfxs {
+			if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
+				// No relationship with pfx.
+				continue
+			}
+
+			if strings.HasPrefix(pfx, rp) {
+				// If rp is a prefix of pfx, remember it because
+				// it is a potential starting point for the
+				// responder's knowledge. The actual starting
+				// point is the deepest prefix where rp is a
+				// prefix of pfx.
+				//
+				// Say the initiator is looking for "foo", and
+				// the responder has knowledge for "f" and "fo",
+				// the responder's starting point will be the
+				// prefix genvector for "fo". Similarly, if the
+				// responder has knowledge for "foo", the
+				// starting point will be the prefix genvector
+				// for "foo".
+				rpStart = rp
+			} else {
+				// If pfx is a prefix of rp, this knowledge must
+				// definitely be sent to the initiator. Diff the
+				// prefix genvectors to adjust the delta bound and
+				// include in outVec.
+				respgv = respVec[rp]
+				// Add local generation information to
+				// responder's genvec and returned genvec.
+				respgv[s.id] = respGen
+				s.diffPrefixGenVectors(respgv, initpgv, diff)
+				outVec[rp] = respgv
+			}
+		}
+
+		// Deal with the starting point.
+		if rpStart == "" {
+			// No matching prefixes for pfx were found.
+			respgv = make(interfaces.PrefixGenVector)
+		} else {
+			respgv = respVec[rpStart]
+		}
+		respgv[s.id] = respGen
+		s.diffPrefixGenVectors(respgv, initpgv, diff)
+		outVec[pfx] = respgv
+	}
+
+	return diff, outVec, nil
+}
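As a minimal standalone sketch of the prefix relationships that phase 1 relies on (illustrative only, not part of this change; the prefix strings are made up from the "foo" example in the sendDeltasPerDatabase comment):

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Shortest initiator prefix: the lower bound on the initiator's knowledge.
	initPfx := "foo"
	// Responder prefixes, sorted as extractAndSortPrefixes would sort them.
	respPfxs := []string{"f", "foobarX", "foobarY", "bar"}
	sort.Strings(respPfxs)

	var rpStart string // deepest responder prefix that is a prefix of initPfx.
	for _, rp := range respPfxs {
		switch {
		case strings.HasPrefix(initPfx, rp):
			// "f" is a prefix of "foo": a candidate starting point for the
			// responder's knowledge; the deepest such prefix wins.
			rpStart = rp
		case strings.HasPrefix(rp, initPfx):
			// "foo" is a prefix of "foobarX"/"foobarY": this knowledge is
			// definitely diffed and included in the output genvector.
			fmt.Println("diff and send:", rp)
		default:
			// "bar" has no prefix relationship with "foo" and is skipped here.
		}
	}
	fmt.Println("starting point:", rpStart) // "f", since the responder has no entry exactly at "foo".
}

With no responder entry exactly at "foo", the starting point falls back to the deepest shorter prefix, "f", which computeDeltaBound then diffs and records under the key "foo" in the output vector.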
+
+// genRange represents a range of generations (min and max inclusive).
+type genRange struct {
+	min uint64
+	max uint64
+	cur uint64 // next generation to fetch while scanning this range.
+}
+
+// genRangeVector maps a device id to the range of generations missing from
+// the initiator for that device.
+type genRangeVector map[uint64]*genRange
+
+// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
+// and the initiator, and updates the range of generations per device known to
+// the responder but not known to the initiator. "gens" (generation range) is
+// passed in as an input argument so that it can be incrementally updated as the
+// range of missing generations grows when different responder prefix genvectors
+// are used to compute the diff.
+//
+// For example, say the responder's generation vector is RVec = {A:10, B:5, C:1}
+// and the initiator's is IVec = {A:5, B:10, D:2}. Diffing these two vectors
+// returns: {A:[6-10], C:[1-1]}.
+//
+// TODO(hpucha): Add reclaimVec for GCing.
+func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) {
+	// Compute missing generations for devices that are in both initiator's and responder's vectors.
+	for devid, gen := range initPVec {
+		rgen, ok := respPVec[devid]
+		// Skip devices that the responder doesn't know about.
+		if ok {
+			updateDevRange(devid, rgen, gen, gens)
+		}
+	}
+
+	// Compute missing generations for devices not in initiator's vector but in responder's vector.
+	for devid, rgen := range respPVec {
+		if _, ok := initPVec[devid]; !ok {
+			updateDevRange(devid, rgen, 0, gens)
+		}
+	}
+}
+
+func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
+	if gen < rgen {
+		// Include all generations in the interval [gen+1, rgen], both endpoints inclusive.
+		if r, ok := gens[devid]; !ok {
+			gens[devid] = &genRange{min: gen + 1, max: rgen}
+		} else {
+			if gen+1 < r.min {
+				r.min = gen + 1
+			}
+			if rgen > r.max {
+				r.max = rgen
+			}
+		}
+	}
+}
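The worked example from the diffPrefixGenVectors comment can be reproduced with a small self-contained sketch (illustrative only, not part of this change; the real code keys these maps by uint64 device ids rather than names):

package main

import "fmt"

type genRange struct{ min, max uint64 }

func main() {
	respPV := map[string]uint64{"A": 10, "B": 5, "C": 1} // responder prefix genvector.
	initPV := map[string]uint64{"A": 5, "B": 10, "D": 2} // initiator prefix genvector.

	gens := make(map[string]*genRange)
	update := func(dev string, rgen, gen uint64) {
		if gen >= rgen {
			return // initiator already knows everything the responder has.
		}
		if r, ok := gens[dev]; ok {
			if gen+1 < r.min {
				r.min = gen + 1
			}
			if rgen > r.max {
				r.max = rgen
			}
		} else {
			gens[dev] = &genRange{min: gen + 1, max: rgen}
		}
	}

	// Devices present in both vectors.
	for dev, gen := range initPV {
		if rgen, ok := respPV[dev]; ok {
			update(dev, rgen, gen)
		}
	}
	// Devices present only in the responder's vector.
	for dev, rgen := range respPV {
		if _, ok := initPV[dev]; !ok {
			update(dev, rgen, 0)
		}
	}

	for dev, r := range gens {
		fmt.Printf("%s: [%d-%d]\n", dev, r.min, r.max)
	}
	// Prints A: [6-10] and C: [1-1]; B and D contribute nothing.
}

The B and D entries contribute nothing: the initiator is already ahead of the responder for B, and the responder has never heard of D.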
+
+func extractAndSortPrefixes(vec interfaces.GenVector) []string {
+	pfxs := make([]string, len(vec))
+	i := 0
+	for p := range vec {
+		pfxs[i] = p
+		i++
+	}
+	sort.Strings(pfxs)
+	return pfxs
+}
+
+// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
+// loop.
+func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+	for i := r.cur; i <= r.max; i++ {
+		rec, err := getLogRec(ctx, sn, dev, i)
+		if err == nil {
+			r.cur = i + 1
+			return rec, nil
+		}
+		if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			return nil, err
+		}
+	}
+	return nil, nil
+}
+
+// Note: initPfxs is sorted.
+func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
+	filter := true
+
+	var maxGen uint64
+	for _, p := range initPfxs {
+		if strings.HasPrefix(rec.Metadata.ObjId, p) {
+			// Do not filter. Initiator is interested in this
+			// prefix.
+			filter = false
+
+			// Track if the initiator knows of this record.
+			gen := initVec[p][rec.Metadata.Id]
+			if maxGen < gen {
+				maxGen = gen
+			}
+		}
+	}
+
+	// Filter this record if the initiator already has it.
+	if maxGen >= rec.Metadata.Gen {
+		return true
+	}
+
+	return filter
+}
+
+// A minHeap implements heap.Interface and holds local log records.
+type minHeap []*localLogRec
+
+func (mh minHeap) Len() int { return len(mh) }
+
+func (mh minHeap) Less(i, j int) bool {
+	return mh[i].Pos < mh[j].Pos
+}
+
+func (mh minHeap) Swap(i, j int) {
+	mh[i], mh[j] = mh[j], mh[i]
+}
+
+func (mh *minHeap) Push(x interface{}) {
+	item := x.(*localLogRec)
+	*mh = append(*mh, item)
+}
+
+func (mh *minHeap) Pop() interface{} {
+	old := *mh
+	n := len(old)
+	item := old[n-1]
+	*mh = old[0 : n-1]
+	return item
+}
+
+// logRecStream is a placeholder for the stream of log records sent to the
+// initiator (see the TODO above about the fake stream).
+type logRecStream interface {
+	Send(interfaces.LogRec)
+}
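To see why records reach the initiator in the order the responder learned them, here is a small self-contained sketch of the min-heap merge (illustrative only, not part of this change; the record type and values are made up, and the heap methods are redeclared locally so the snippet runs on its own):

package main

import (
	"container/heap"
	"fmt"
)

type rec struct {
	Dev uint64
	Gen uint64
	Pos uint64 // position in the Database log.
}

type minHeap []*rec

func (mh minHeap) Len() int            { return len(mh) }
func (mh minHeap) Less(i, j int) bool  { return mh[i].Pos < mh[j].Pos }
func (mh minHeap) Swap(i, j int)       { mh[i], mh[j] = mh[j], mh[i] }
func (mh *minHeap) Push(x interface{}) { *mh = append(*mh, x.(*rec)) }
func (mh *minHeap) Pop() interface{} {
	old := *mh
	n := len(old)
	item := old[n-1]
	*mh = old[:n-1]
	return item
}

func main() {
	// One pending record per device, as after heap.Init in sendDeltasPerDatabase.
	mh := minHeap{
		{Dev: 12, Gen: 1, Pos: 7},
		{Dev: 10, Gen: 1, Pos: 3},
		{Dev: 13, Gen: 1, Pos: 5},
	}
	heap.Init(&mh)
	for mh.Len() > 0 {
		r := heap.Pop(&mh).(*rec)
		fmt.Printf("send dev %d gen %d (pos %d)\n", r.Dev, r.Gen, r.Pos)
	}
	// Records come out in Pos order (3, 5, 7), i.e. log order, regardless of device id.
}

In sendDeltasPerDatabase, popping a record is followed by pushing the next record from the same device, so the merge stays incremental while preserving log order.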
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 8606472..4a41e17 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -5,6 +5,8 @@
 package vsync
 
 import (
+	"fmt"
+	"math/rand"
 	"reflect"
 	"testing"
 	"time"
@@ -81,7 +83,7 @@
 			ObjId:      "foo",
 			CurVers:    "3",
 			Parents:    []string{"1", "2"},
-			SyncTime:   time.Now().UTC(),
+			UpdTime:    time.Now().UTC(),
 			Delete:     false,
 			BatchId:    10000,
 			BatchCount: 1,
@@ -108,9 +110,412 @@
 	checkLogRec(t, st, id, gen, false, nil)
 }
 
+// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
+func TestDiffPrefixGenVectors(t *testing.T) {
+	svc := createService(t)
+	s := svc.sync
+	s.id = 10 // Responder. Initiator is id 11.
+
+	tests := []struct {
+		respPVec, initPVec interfaces.PrefixGenVector
+		genDiffIn          genRangeVector
+		genDiffWant        genRangeVector
+	}{
+		{ // responder and initiator are at identical vectors.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder and initiator are at identical vectors.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{10: 0},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder has no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 8},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder and initiator have no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{11: 0},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder is staler than initiator.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 8, 14: 5},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder is more up-to-date than initiator for local updates.
+			respPVec:    interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 2},
+			initPVec:    interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			genDiffIn:   make(genRangeVector),
+			genDiffWant: genRangeVector{10: &genRange{min: 2, max: 5}},
+		},
+		{ // responder is fresher than initiator for local updates and one device.
+			respPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 22, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2, 14: 40},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 2, max: 5},
+				12: &genRange{min: 21, max: 22},
+			},
+		},
+		{ // responder is fresher than initiator in all but one device.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec:  interfaces.PrefixGenVector{10: 0, 11: 2, 12: 0},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 1},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+		{ // initiator has no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec:  interfaces.PrefixGenVector{},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 1},
+				11: &genRange{min: 1, max: 2},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+		{ // initiator has no updates, pre-existing diff.
+			respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec: interfaces.PrefixGenVector{13: 1},
+			genDiffIn: genRangeVector{
+				10: &genRange{min: 5, max: 20},
+				13: &genRange{min: 1, max: 3},
+			},
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 20},
+				11: &genRange{min: 1, max: 2},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		want := test.genDiffWant
+		got := test.genDiffIn
+		s.diffPrefixGenVectors(test.respPVec, test.initPVec, got)
+		checkEqualDevRanges(t, got, want)
+	}
+}
+
+// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
+// and checks that the log records sent on the wire are correctly ordered.
+func TestSendDeltas(t *testing.T) {
+	appName := "mockapp"
+	dbName := "mockdb"
+
+	tests := []struct {
+		respVec, initVec, outVec interfaces.GenVector
+		respGen                  uint64
+		genDiff                  genRangeVector
+		keyPfxs                  []string
+	}{
+		{ // Identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{12: 8},
+				"foobar": interfaces.PrefixGenVector{12: 10},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 1, max: 10},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", ""},
+		},
+		{ // Identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"bar":    interfaces.PrefixGenVector{12: 20},
+				"foo":    interfaces.PrefixGenVector{12: 8},
+				"foobar": interfaces.PrefixGenVector{12: 10},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 10},
+				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+				"bar":    interfaces.PrefixGenVector{10: 5, 12: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+		},
+		{ // Non-identical prefixes, local only updates.
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{10: 5},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{12: 5, 13: 5},
+				"foo":    interfaces.PrefixGenVector{12: 10, 13: 10},
+				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 10, 13: 10},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"f": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical interleaving prefixes.
+			respVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foo":    interfaces.PrefixGenVector{12: 30, 13: 20},
+				"foobar": interfaces.PrefixGenVector{12: 40, 13: 30},
+			},
+			initVec: interfaces.GenVector{
+				"fo":        interfaces.PrefixGenVector{11: 5, 12: 1},
+				"foob":      interfaces.PrefixGenVector{11: 5, 12: 10},
+				"foobarxyz": interfaces.PrefixGenVector{11: 5, 12: 20},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"fo":     interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+		},
+		{ // Non-identical interleaving prefixes.
+			respVec: interfaces.GenVector{
+				"fo":        interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foob":      interfaces.PrefixGenVector{12: 30, 13: 20},
+				"foobarxyz": interfaces.PrefixGenVector{12: 40, 13: 30},
+			},
+			initVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{11: 5, 12: 1},
+				"foo":    interfaces.PrefixGenVector{11: 5, 12: 10},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 20},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"f":         interfaces.PrefixGenVector{10: 5},
+				"fo":        interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foob":      interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+		},
+		{ // Non-identical sibling prefixes.
+			respVec: interfaces.GenVector{
+				"foo":       interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foobarabc": interfaces.PrefixGenVector{12: 40, 13: 30},
+				"foobarxyz": interfaces.PrefixGenVector{12: 30, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":       interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foobarabc": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "foobarabc", "foobarxyz", "foobar123", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"barbaz": interfaces.PrefixGenVector{12: 18},
+				"f":      interfaces.PrefixGenVector{12: 30, 13: 5},
+				"foobar": interfaces.PrefixGenVector{12: 30, 13: 8},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5, 12: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 5},
+				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 5},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 8},
+				"bar":    interfaces.PrefixGenVector{10: 5},
+				"barbaz": interfaces.PrefixGenVector{10: 5, 12: 18},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 6, max: 30},
+				13: &genRange{min: 1, max: 8},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+		},
+	}
+
+	for i, test := range tests {
+		svc := createService(t)
+		s := svc.sync
+		s.id = 10 // Responder.
+
+		wantDiff, wantVec := test.genDiff, test.outVec
+		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, genvec: test.respVec}
+
+		gotDiff, gotVec, err := s.computeDeltaBound(nil, appName, dbName, test.initVec)
+		if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
+			t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
+		}
+		checkEqualDevRanges(t, gotDiff, wantDiff)
+
+		// Insert some log records to bootstrap testing below.
+		tRng := rand.New(rand.NewSource(int64(i)))
+		var wantRecs []*localLogRec
+		st := svc.St()
+		tx := st.NewTransaction()
+		objKeyPfxs := test.keyPfxs
+		j := 0
+		for id, r := range wantDiff {
+			pos := uint64(tRng.Intn(50) + 100*j)
+			for k := r.min; k <= r.max; k++ {
+				opfx := objKeyPfxs[tRng.Intn(len(objKeyPfxs))]
+				// Create holes in the log records.
+				if opfx == "" {
+					continue
+				}
+				okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+				vers := fmt.Sprintf("%x", tRng.Int())
+				rec := &localLogRec{
+					Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
+					Pos:      pos + k,
+				}
+				if err := putLogRec(nil, tx, rec); err != nil {
+					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
+				}
+
+				initPfxs := extractAndSortPrefixes(test.initVec)
+				if !filterLogRec(rec, test.initVec, initPfxs) {
+					wantRecs = append(wantRecs, rec)
+				}
+			}
+			j++
+		}
+		if err := tx.Commit(); err != nil {
+			t.Fatalf("cannot commit putting log rec, err %v", err)
+		}
+
+		ts := &logRecStreamTest{}
+		gotVec, err = s.sendDeltasPerDatabase(nil, nil, appName, dbName, test.initVec, ts)
+		if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
+			t.Fatalf("sendDeltasPerDatabase failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
+		}
+		ts.diffLogRecs(t, wantRecs)
+	}
+}
+
 //////////////////////////////
 // Helpers
 
+// TODO(hpucha): Look into using v.io/syncbase/v23/syncbase/testutil.Fatalf()
+// for getting the stack trace. Right now the package cannot be imported due
+// to a dependency cycle.
+
+type logRecStreamTest struct {
+	gotRecs []*localLogRec
+}
+
+func (s *logRecStreamTest) Send(rec interfaces.LogRec) {
+	s.gotRecs = append(s.gotRecs, &localLogRec{Metadata: rec.Metadata})
+}
+
+func (s *logRecStreamTest) diffLogRecs(t *testing.T, wantRecs []*localLogRec) {
+	if len(s.gotRecs) != len(wantRecs) {
+		t.Fatalf("diffLogRecMetadata failed, gotLen %v, wantLen %v\n", len(s.gotRecs), len(wantRecs))
+	}
+	for i, rec := range s.gotRecs {
+		if !reflect.DeepEqual(rec.Metadata, wantRecs[i].Metadata) {
+			t.Fatalf("diffLogRecMetadata failed, i %v, got %v, want %v\n", i, rec.Metadata, wantRecs[i].Metadata)
+		}
+	}
+}
+
 func checkDbSyncState(t *testing.T, st store.StoreReader, exists bool, wantSt *dbSyncState) {
 	gotSt, err := getDbSyncState(nil, st)
 
@@ -137,5 +542,15 @@
 	if hasLogRec(st, id, gen) != exists {
 		t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
 	}
+}
 
+func checkEqualDevRanges(t *testing.T, s1, s2 genRangeVector) {
+	if len(s1) != len(s2) {
+		t.Fatalf("len(s1): %v != len(s2): %v", len(s1), len(s2))
+	}
+	for d1, r1 := range s1 {
+		if r2, ok := s2[d1]; !ok || !reflect.DeepEqual(r1, r2) {
+			t.Fatalf("Dev %v: r1 %v != r2 %v", d1, r1, r2)
+		}
+	}
 }
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index da98994..3a9a6fb 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -616,18 +616,12 @@
 // Methods for SyncGroup create/join between Syncbases.
 
 func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
-
-	// Find the database store for this SyncGroup.
-	app, err := s.sv.App(ctx, call, sg.AppName)
-	if err != nil {
-		return err
-	}
-	db, err := app.NoSQLDatabase(ctx, call, sg.DbName)
+	st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
 	if err != nil {
 		return err
 	}
 
-	err = store.RunInTransaction(db.St(), func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
 		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
 
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 2810cd0..d061f4a 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -15,11 +15,12 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen    uint64               // local generation number
-	GenVec interfaces.GenVector // generation vector
+	Gen    uint64               // local generation number incremented on every local update.
+	GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
-// localLogRec represents the persistent local state of a log record.
+// localLogRec represents the persistent local state of a log record. Metadata
+// is synced across peers, while Pos is local-only.
 type localLogRec struct {
 	Metadata interfaces.LogRecMetadata
 	Pos      uint64 // position in the Database log.
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index 17e8348..54c0466 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -27,8 +27,8 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen    uint64               // local generation number
-	GenVec interfaces.GenVector // generation vector
+	Gen    uint64               // local generation number incremented on every local update.
+	GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
 func (dbSyncState) __VDLReflect(struct {
@@ -36,7 +36,8 @@
 }) {
 }
 
-// localLogRec represents the persistent local state of a log record.
+// localLogRec represents the persistent local state of a log record. Metadata
+// is synced across peers, while Pos is local-only.
 type localLogRec struct {
 	Metadata interfaces.LogRecMetadata
 	Pos      uint64 // position in the Database log.
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 01bc3eb..62edcd2 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -9,6 +9,8 @@
 import (
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 )
 
@@ -53,3 +55,24 @@
 		}
 	}
 }
+
+// getDbStore gets the store handle to the database.
+func (s *syncService) getDbStore(ctx *context.T, call rpc.ServerCall, appName, dbName string) (store.Store, error) {
+	app, err := s.sv.App(ctx, call, appName)
+	if err != nil {
+		return nil, err
+	}
+	db, err := app.NoSQLDatabase(ctx, call, dbName)
+	if err != nil {
+		return nil, err
+	}
+	return db.St(), nil
+}
+
+// translateError translates store errors: an unknown key is returned as
+// verror.ErrNoExist, and any other error as verror.ErrInternal.
+func translateError(ctx *context.T, err error, key string) error {
+	if verror.ErrorID(err) == store.ErrUnknownKey.ID {
+		return verror.New(verror.ErrNoExist, ctx, key)
+	}
+	return verror.New(verror.ErrInternal, ctx, key, err)
+}