syncbase/vsync: Responder module.

Change-Id: I2bc987c263283dc78c00642887208e54af6d3540
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
index caf0339..5ab18b0 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
@@ -96,8 +96,11 @@
 }
 
 // DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested with in a Database when requesting deltas for that Database.
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for that Database.
 type DeltaReq struct {
+	AppName string
+	DbName string
 	SgIds   set[GroupId]
 	InitVec GenVector
 }
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
index a22731d..aa0872a 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -156,8 +156,11 @@
 }
 
 // DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested with in a Database when requesting deltas for that Database.
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for that Database.
 type DeltaReq struct {
+	AppName string
+	DbName  string
 	SgIds   map[GroupId]struct{}
 	InitVec GenVector
 }
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 135ee35..3447fcb 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -48,6 +48,9 @@
 	RootDir string
 	// Storage engine to use (for service and per-database engines).
 	Engine string
+	// RPC server for this service. Needed to advertise this service in
+	// mount tables attached to SyncGroups.
+	Server rpc.Server
 }
 
 // NewService creates a new service instance and returns it.
@@ -71,7 +74,7 @@
 	if err := util.Put(ctx, s.st, s, data); err != nil {
 		return nil, err
 	}
-	if s.sync, err = vsync.New(ctx, call, s); err != nil {
+	if s.sync, err = vsync.New(ctx, call, s, opts.Server); err != nil {
 		return nil, err
 	}
 	return s, nil
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index 729e386..e1ee16f 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -19,7 +19,6 @@
 	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
-	"v.io/x/ref/lib/xrpc"
 	_ "v.io/x/ref/runtime/factories/roaming"
 )
 
@@ -45,6 +44,14 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
+	s, err := v23.NewServer(ctx)
+	if err != nil {
+		vlog.Fatal("v23.NewServer() failed: ", err)
+	}
+	if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
+		vlog.Fatal("s.Listen() failed: ", err)
+	}
+
 	perms, err := securityflag.PermissionsFromFlag()
 	if err != nil {
 		vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
@@ -60,14 +67,16 @@
 		Perms:   perms,
 		RootDir: *rootDir,
 		Engine:  *engine,
+		Server:  s,
 	})
 	if err != nil {
 		vlog.Fatal("server.NewService() failed: ", err)
 	}
 	d := server.NewDispatcher(service)
 
-	if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
-		vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
+	// Publish the service in the mount table.
+	if err := s.ServeDispatcher(*name, d); err != nil {
+		vlog.Fatal("s.ServeDispatcher() failed: ", err)
 	}
 	if *name != "" {
 		vlog.Info("Mounted at: ", *name)
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
new file mode 100644
index 0000000..19b1366
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -0,0 +1,439 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"container/heap"
+	"sort"
+	"strings"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+// GetDeltas implements the responder side of the GetDeltas RPC.
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
+	recvr := call.RecvStream()
+
+	for recvr.Advance() {
+		req := recvr.Value()
+		// Ignoring errors since if one Database fails for any reason,
+		// it is fine to continue to the next one. In fact, sometimes
+		// the failure might be genuine. For example, the responder is
+		// no longer part of the requested SyncGroups, or the app/db is
+		// locally deleted, or a permission change has denied access.
+		rSt := newResponderState(ctx, call, s, req)
+		rSt.sendDeltasPerDatabase(ctx)
+	}
+
+	// TODO(hpucha): Is there a need to call finish or some such?
+	return recvr.Err()
+}
+
+// responderState is state accumulated per Database by the responder during an
+// initiation round.
+type responderState struct {
+	req      interfaces.DeltaReq
+	call     interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+	errState error                              // Captures the error from the first two phases of the responder.
+	sync     *syncService
+	st       store.Store // Store handle to the Database.
+	diff     genRangeVector
+	outVec   interfaces.GenVector
+}
+
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
+	rSt := &responderState{call: call, sync: sync, req: req}
+	return rSt
+}
+
+// sendDeltasPerDatabase sends to an initiator all the missing generations
+// corresponding to the prefixes requested for this Database, and a genvector
+// summarizing the knowledge transferred from the responder to the
+// initiator. This happens in three phases:
+//
+// In the first phase, the initiator is checked against the SyncGroup ACLs of
+// all the SyncGroups it is requesting, and only those prefixes that belong to
+// allowed SyncGroups are carried forward.
+//
+// In the second phase, for a given set of nested prefixes from the initiator,
+// the shortest prefix in that set is extracted. The initiator's prefix
+// genvector for this shortest prefix represents the lower bound on its
+// knowledge for the entire set of nested prefixes. This prefix genvector
+// (representing the lower bound) is diffed with all the responder prefix
+// genvectors corresponding to same or deeper prefixes compared to the initiator
+// prefix. This diff produces a bound on the missing knowledge. For example, say
+// the initiator is interested in prefixes {foo, foobar}, where each prefix is
+// associated with a prefix genvector. Since the initiator strictly has as much
+// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
+// prefix genvector is chosen as the lower bound for the initiator's
+// knowledge. Similarly, say the responder has knowledge on prefixes {f,
+// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
+// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
+// compute a bound on missing generations (all responder's prefixes that match
+// "foo". Note that since the responder doesn't have a prefix genvector at
+// "foo", its knowledge at "f" is applicable to "foo").
+//
+// Since the second phase outputs an aggressive calculation of missing
+// generations containing more generation entries than strictly needed by the
+// initiator, in the third phase, each missing generation is sent to the
+// initiator only if the initiator is eligible for it and is not aware of
+// it. The generations are sent to the initiator in the same order as the
+// responder learned them so that the initiator can reconstruct the DAG for the
+// objects by learning older nodes first.
+func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error {
+	// Phase 1 of sendDeltas: Authorize the initiator and respond to the
+	// caller only for the SyncGroups that allow access.
+	rSt.authorizeAndFilterSyncGroups(ctx)
+
+	// Phase 2 of sendDeltas: diff contains the bound on the
+	// generations missing from the initiator per device.
+	rSt.computeDeltaBound(ctx)
+
+	// Phase 3 of sendDeltas: Process the diff, filtering out records that
+	// are not needed, and send the remainder on the wire ordered.
+	return rSt.filterAndSendDeltas(ctx)
+}
+
+// authorizeAndFilterSyncGroups authorizes the initiator against the requested
+// SyncGroups and filters the initiator's prefixes to only include those from
+// allowed SyncGroups (phase 1 of sendDeltas).
+func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) {
+	rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName)
+	if rSt.errState != nil {
+		return
+	}
+
+	allowedPfxs := make(map[string]struct{})
+	for sgid := range rSt.req.SgIds {
+		// Check permissions for the SyncGroup.
+		var sg *interfaces.SyncGroup
+		sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid)
+		if rSt.errState != nil {
+			return
+		}
+		rSt.errState = authorize(ctx, rSt.call.Security(), sg)
+		if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID {
+			continue
+		} else if rSt.errState != nil {
+			return
+		}
+
+		for _, p := range sg.Spec.Prefixes {
+			allowedPfxs[p] = struct{}{}
+		}
+	}
+
+	// Filter the initiator's prefixes to what is allowed.
+	for pfx := range rSt.req.InitVec {
+		if _, ok := allowedPfxs[pfx]; ok {
+			continue
+		}
+		allowed := false
+		for p := range allowedPfxs {
+			if strings.HasPrefix(pfx, p) {
+				allowed = true
+			}
+		}
+
+		if !allowed {
+			delete(rSt.req.InitVec, pfx)
+		}
+	}
+	return
+}
+
+// computeDeltaBound computes the bound on missing generations across all
+// requested prefixes (phase 2 of sendDeltas).
+func (rSt *responderState) computeDeltaBound(ctx *context.T) {
+	// Check error from phase 1.
+	if rSt.errState != nil {
+		return
+	}
+
+	if len(rSt.req.InitVec) == 0 {
+		rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+		return
+	}
+
+	var respVec interfaces.GenVector
+	var respGen uint64
+	respVec, respGen, rSt.errState = rSt.sync.getDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
+	if rSt.errState != nil {
+		return
+	}
+	respPfxs := extractAndSortPrefixes(respVec)
+	initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+
+	rSt.outVec = make(interfaces.GenVector)
+	rSt.diff = make(genRangeVector)
+	pfx := initPfxs[0]
+
+	for _, p := range initPfxs {
+		if strings.HasPrefix(p, pfx) && p != pfx {
+			continue
+		}
+
+		// Process this prefix as this is the start of a new set of
+		// nested prefixes.
+		pfx = p
+
+		// Lower bound on initiator's knowledge for this prefix set.
+		initpgv := rSt.req.InitVec[pfx]
+
+		// Find the relevant responder prefixes and add the corresponding knowledge.
+		var respgv interfaces.PrefixGenVector
+		var rpStart string
+		for _, rp := range respPfxs {
+			if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
+				// No relationship with pfx.
+				continue
+			}
+
+			if strings.HasPrefix(pfx, rp) {
+				// If rp is a prefix of pfx, remember it because
+				// it may be a potential starting point for the
+				// responder's knowledge. The actual starting
+				// point is the deepest prefix where rp is a
+				// prefix of pfx.
+				//
+				// Say the initiator is looking for "foo", and
+				// the responder has knowledge for "f" and "fo",
+				// the responder's starting point will be the
+				// prefix genvector for "fo". Similarly, if the
+				// responder has knowledge for "foo", the
+				// starting point will be the prefix genvector
+				// for "foo".
+				rpStart = rp
+			} else {
+				// If pfx is a prefix of rp, this knowledge must
+				// be definitely sent to the initiator. Diff the
+				// prefix genvectors to adjust the delta bound and
+				// include in outVec.
+				respgv = respVec[rp]
+				rSt.diffPrefixGenVectors(respgv, initpgv)
+				rSt.outVec[rp] = respgv
+			}
+		}
+
+		// Deal with the starting point.
+		if rpStart == "" {
+			// No matching prefixes for pfx were found.
+			respgv = make(interfaces.PrefixGenVector)
+			respgv[rSt.sync.id] = respGen
+		} else {
+			respgv = respVec[rpStart]
+		}
+		rSt.diffPrefixGenVectors(respgv, initpgv)
+		rSt.outVec[pfx] = respgv
+	}
+
+	return
+}
+
+// filterAndSendDeltas filters the computed delta to remove records already
+// known by the initiator, and sends the resulting records to the initiator
+// (phase 3 of sendDeltas).
+func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error {
+	// Always send a start and finish response so that the initiator can
+	// move on to the next Database.
+	//
+	// TODO(hpucha): Although ok for now to call SendStream once per
+	// Database, would like to make this implementation agnostic.
+	sender := rSt.call.SendStream()
+	sender.Send(interfaces.DeltaRespStart{true})
+	defer sender.Send(interfaces.DeltaRespFinish{true})
+
+	// Check error from phase 2.
+	if rSt.errState != nil {
+		return rSt.errState
+	}
+
+	// First two phases were successful. So now on to phase 3. We now visit
+	// every log record in the generation range as obtained from phase 1 in
+	// their log order. We use a heap to incrementally sort the log records
+	// as per their position in the log.
+	//
+	// Init the min heap, one entry per device in the diff.
+	mh := make(minHeap, 0, len(rSt.diff))
+	for dev, r := range rSt.diff {
+		r.cur = r.min
+		rec, err := getNextLogRec(ctx, rSt.st, dev, r)
+		if err != nil {
+			return err
+		}
+		if rec != nil {
+			mh = append(mh, rec)
+		} else {
+			delete(rSt.diff, dev)
+		}
+	}
+	heap.Init(&mh)
+
+	// Process the log records in order.
+	initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+	for mh.Len() > 0 {
+		rec := heap.Pop(&mh).(*localLogRec)
+
+		if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
+			// Send on the wire.
+			wireRec := interfaces.LogRec{Metadata: rec.Metadata}
+			// TODO(hpucha): Hash out this fake stream stuff when
+			// defining the RPC and the rest of the responder.
+			sender.Send(interfaces.DeltaRespRec{wireRec})
+		}
+
+		// Add a new record from the same device if not done.
+		dev := rec.Metadata.Id
+		rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev])
+		if err != nil {
+			return err
+		}
+		if rec != nil {
+			heap.Push(&mh, rec)
+		} else {
+			delete(rSt.diff, dev)
+		}
+	}
+
+	sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
+	return nil
+}
+
+// genRange represents a range of generations (min and max inclusive).
+type genRange struct {
+	min uint64
+	max uint64
+	cur uint64
+}
+
+type genRangeVector map[uint64]*genRange
+
+// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
+// and the initiator, and updates the range of generations per device known to
+// the responder but not known to the initiator. "gens" (generation range) is
+// passed in as an input argument so that it can be incrementally updated as the
+// range of missing generations grows when different responder prefix genvectors
+// are used to compute the diff.
+//
+// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
+// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
+// two vectors returns: {A:[6-10], C:[1-1]}.
+//
+// TODO(hpucha): Add reclaimVec for GCing.
+func (rSt *responderState) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector) {
+	// Compute missing generations for devices that are in both initiator's and responder's vectors.
+	for devid, gen := range initPVec {
+		rgen, ok := respPVec[devid]
+		if ok {
+			updateDevRange(devid, rgen, gen, rSt.diff)
+		}
+	}
+
+	// Compute missing generations for devices not in initiator's vector but in responder's vector.
+	for devid, rgen := range respPVec {
+		if _, ok := initPVec[devid]; !ok {
+			updateDevRange(devid, rgen, 0, rSt.diff)
+		}
+	}
+}
+
+func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
+	if gen < rgen {
+		// Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
+		if r, ok := gens[devid]; !ok {
+			gens[devid] = &genRange{min: gen + 1, max: rgen}
+		} else {
+			if gen+1 < r.min {
+				r.min = gen + 1
+			}
+			if rgen > r.max {
+				r.max = rgen
+			}
+		}
+	}
+}
+
+func extractAndSortPrefixes(vec interfaces.GenVector) []string {
+	pfxs := make([]string, len(vec))
+	i := 0
+	for p := range vec {
+		pfxs[i] = p
+		i++
+	}
+	sort.Strings(pfxs)
+	return pfxs
+}
+
+// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
+// loop.
+func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+	for i := r.cur; i <= r.max; i++ {
+		rec, err := getLogRec(ctx, sn, dev, i)
+		if err == nil {
+			r.cur = i + 1
+			return rec, nil
+		}
+		if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			return nil, err
+		}
+	}
+	return nil, nil
+}
+
+// Note: initPfxs is sorted.
+func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
+	filter := true
+
+	var maxGen uint64
+	for _, p := range initPfxs {
+		if strings.HasPrefix(rec.Metadata.ObjId, p) {
+			// Do not filter. Initiator is interested in this
+			// prefix.
+			filter = false
+
+			// Track if the initiator knows of this record.
+			gen := initVec[p][rec.Metadata.Id]
+			if maxGen < gen {
+				maxGen = gen
+			}
+		}
+	}
+
+	// Filter this record if the initiator already has it.
+	if maxGen >= rec.Metadata.Gen {
+		return true
+	}
+
+	return filter
+}
+
+// A minHeap implements heap.Interface and holds local log records.
+type minHeap []*localLogRec
+
+func (mh minHeap) Len() int { return len(mh) }
+
+func (mh minHeap) Less(i, j int) bool {
+	return mh[i].Pos < mh[j].Pos
+}
+
+func (mh minHeap) Swap(i, j int) {
+	mh[i], mh[j] = mh[j], mh[i]
+}
+
+func (mh *minHeap) Push(x interface{}) {
+	item := x.(*localLogRec)
+	*mh = append(*mh, item)
+}
+
+func (mh *minHeap) Pop() interface{} {
+	old := *mh
+	n := len(old)
+	item := old[n-1]
+	*mh = old[0 : n-1]
+	return item
+}
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
new file mode 100644
index 0000000..e2158b2
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -0,0 +1,514 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"fmt"
+	"math/rand"
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+)
+
+// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
+func TestDiffPrefixGenVectors(t *testing.T) {
+	svc := createService(t)
+	defer destroyService(t, svc)
+	s := svc.sync
+	s.id = 10 //responder. Initiator is id 11.
+
+	tests := []struct {
+		respPVec, initPVec interfaces.PrefixGenVector
+		genDiffIn          genRangeVector
+		genDiffWant        genRangeVector
+	}{
+		{ // responder and initiator are at identical vectors.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder and initiator are at identical vectors.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{10: 0},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder has no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 8},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder and initiator have no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 0},
+			initPVec:  interfaces.PrefixGenVector{11: 0},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder is staler than initiator.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 8, 14: 5},
+			genDiffIn: make(genRangeVector),
+		},
+		{ // responder is more up-to-date than initiator for local updates.
+			respPVec:    interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 2},
+			initPVec:    interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			genDiffIn:   make(genRangeVector),
+			genDiffWant: genRangeVector{10: &genRange{min: 2, max: 5}},
+		},
+		{ // responder is fresher than initiator for local updates and one device.
+			respPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 22, 13: 2},
+			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2, 14: 40},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 2, max: 5},
+				12: &genRange{min: 21, max: 22},
+			},
+		},
+		{ // responder is fresher than initiator in all but one device.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec:  interfaces.PrefixGenVector{10: 0, 11: 2, 12: 0},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 1},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+		{ // initiator has no updates.
+			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec:  interfaces.PrefixGenVector{},
+			genDiffIn: make(genRangeVector),
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 1},
+				11: &genRange{min: 1, max: 2},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+		{ // initiator has no updates, pre-existing diff.
+			respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			initPVec: interfaces.PrefixGenVector{13: 1},
+			genDiffIn: genRangeVector{
+				10: &genRange{min: 5, max: 20},
+				13: &genRange{min: 1, max: 3},
+			},
+			genDiffWant: genRangeVector{
+				10: &genRange{min: 1, max: 20},
+				11: &genRange{min: 1, max: 2},
+				12: &genRange{min: 1, max: 3},
+				13: &genRange{min: 1, max: 4},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		want := test.genDiffWant
+		got := test.genDiffIn
+		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+		rSt.diff = got
+		rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
+		checkEqualDevRanges(t, got, want)
+	}
+}
+
+// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
+// and if the log records on the wire are correctly ordered (phases 2 and 3 of
+// SendDeltas).
+func TestSendDeltas(t *testing.T) {
+	appName := "mockapp"
+	dbName := "mockdb"
+
+	tests := []struct {
+		respVec, initVec, outVec interfaces.GenVector
+		respGen                  uint64
+		genDiff                  genRangeVector
+		keyPfxs                  []string
+	}{
+		{ // Identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{12: 8},
+				"foobar": interfaces.PrefixGenVector{12: 10},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 1, max: 10},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", ""},
+		},
+		{ // Identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"bar":    interfaces.PrefixGenVector{12: 20},
+				"foo":    interfaces.PrefixGenVector{12: 8},
+				"foobar": interfaces.PrefixGenVector{12: 10},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 10},
+				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+				"bar":    interfaces.PrefixGenVector{10: 5, 12: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+		},
+		{ // Non-identical prefixes, local only updates.
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{10: 5},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{12: 5, 13: 5},
+				"foo":    interfaces.PrefixGenVector{12: 10, 13: 10},
+				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 10, 13: 10},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"f": interfaces.PrefixGenVector{12: 20, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 20},
+				13: &genRange{min: 1, max: 20},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+		},
+		{ // Non-identical interleaving prefixes.
+			respVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foo":    interfaces.PrefixGenVector{12: 30, 13: 20},
+				"foobar": interfaces.PrefixGenVector{12: 40, 13: 30},
+			},
+			initVec: interfaces.GenVector{
+				"fo":        interfaces.PrefixGenVector{11: 5, 12: 1},
+				"foob":      interfaces.PrefixGenVector{11: 5, 12: 10},
+				"foobarxyz": interfaces.PrefixGenVector{11: 5, 12: 20},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"fo":     interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+		},
+		{ // Non-identical interleaving prefixes.
+			respVec: interfaces.GenVector{
+				"fo":        interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foob":      interfaces.PrefixGenVector{12: 30, 13: 20},
+				"foobarxyz": interfaces.PrefixGenVector{12: 40, 13: 30},
+			},
+			initVec: interfaces.GenVector{
+				"f":      interfaces.PrefixGenVector{11: 5, 12: 1},
+				"foo":    interfaces.PrefixGenVector{11: 5, 12: 10},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 20},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"f":         interfaces.PrefixGenVector{10: 5},
+				"fo":        interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foob":      interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+		},
+		{ // Non-identical sibling prefixes.
+			respVec: interfaces.GenVector{
+				"foo":       interfaces.PrefixGenVector{12: 20, 13: 10},
+				"foobarabc": interfaces.PrefixGenVector{12: 40, 13: 30},
+				"foobarxyz": interfaces.PrefixGenVector{12: 30, 13: 20},
+			},
+			initVec: interfaces.GenVector{
+				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":       interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+				"foobarabc": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 2, max: 40},
+				13: &genRange{min: 1, max: 30},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "foobarabc", "foobarxyz", "foobar123", "fooxyz"},
+		},
+		{ // Non-identical prefixes, local and remote updates.
+			respVec: interfaces.GenVector{
+				"barbaz": interfaces.PrefixGenVector{12: 18},
+				"f":      interfaces.PrefixGenVector{12: 30, 13: 5},
+				"foobar": interfaces.PrefixGenVector{12: 30, 13: 8},
+			},
+			initVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{11: 5, 12: 5},
+				"foobar": interfaces.PrefixGenVector{11: 5, 12: 5},
+				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+			},
+			respGen: 5,
+			outVec: interfaces.GenVector{
+				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 5},
+				"foobar": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 8},
+				"bar":    interfaces.PrefixGenVector{10: 5},
+				"barbaz": interfaces.PrefixGenVector{10: 5, 12: 18},
+			},
+			genDiff: genRangeVector{
+				10: &genRange{min: 1, max: 5},
+				12: &genRange{min: 6, max: 30},
+				13: &genRange{min: 1, max: 8},
+			},
+			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+		},
+	}
+
+	for i, test := range tests {
+		svc := createService(t)
+		s := svc.sync
+		s.id = 10 //responder.
+
+		wantDiff, wantVec := test.genDiff, test.outVec
+		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
+
+		req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
+		rSt := newResponderState(nil, nil, s, req)
+
+		rSt.computeDeltaBound(nil)
+		if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
+			t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, rSt.errState)
+		}
+		checkEqualDevRanges(t, rSt.diff, wantDiff)
+
+		////////////////////////////////////////
+		// Test sending deltas.
+
+		// Insert some log records to bootstrap testing below.
+		tRng := rand.New(rand.NewSource(int64(i)))
+		var wantRecs []*localLogRec
+		st := svc.St()
+		tx := st.NewTransaction()
+		objKeyPfxs := test.keyPfxs
+		j := 0
+		for id, r := range wantDiff {
+			pos := uint64(tRng.Intn(50) + 100*j)
+			for k := r.min; k <= r.max; k++ {
+				opfx := objKeyPfxs[tRng.Intn(len(objKeyPfxs))]
+				// Create holes in the log records.
+				if opfx == "" {
+					continue
+				}
+				okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+				vers := fmt.Sprintf("%x", tRng.Int())
+				rec := &localLogRec{
+					Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
+					Pos:      pos + k,
+				}
+				if err := putLogRec(nil, tx, rec); err != nil {
+					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
+				}
+
+				initPfxs := extractAndSortPrefixes(test.initVec)
+				if !filterLogRec(rec, test.initVec, initPfxs) {
+					wantRecs = append(wantRecs, rec)
+				}
+			}
+			j++
+		}
+		if err := tx.Commit(); err != nil {
+			t.Fatalf("cannot commit putting log rec, err %v", err)
+		}
+
+		d := &dummyResponder{}
+		rSt.call = d
+		rSt.st, rSt.errState = rSt.sync.getDbStore(nil, nil, rSt.req.AppName, rSt.req.DbName)
+		if rSt.errState != nil {
+			t.Fatalf("filterAndSendDeltas failed to get store handle for app/db %v %v", rSt.req.AppName, rSt.req.DbName)
+		}
+		err := rSt.filterAndSendDeltas(nil)
+		if err != nil {
+			t.Fatalf("filterAndSendDeltas failed (I: %v), (R: %v, %v) err %v", test.initVec, test.respGen, test.respVec, err)
+		}
+		d.diffLogRecs(t, wantRecs, wantVec)
+
+		destroyService(t, svc)
+	}
+}
+
+//////////////////////////////
+// Helpers
+
+type dummyResponder struct {
+	start, finish int
+	gotRecs       []*localLogRec
+	outVec        interfaces.GenVector
+}
+
+func (d *dummyResponder) RecvStream() interface {
+	Advance() bool
+	Value() interfaces.DeltaReq
+	Err() error
+} {
+	return d
+}
+
+func (d *dummyResponder) Advance() bool {
+	return false
+}
+
+func (d *dummyResponder) Value() interfaces.DeltaReq {
+	return interfaces.DeltaReq{}
+}
+
+func (d *dummyResponder) Err() error { return nil }
+
+func (d *dummyResponder) SendStream() interface {
+	Send(item interfaces.DeltaResp) error
+} {
+	return d
+}
+
+func (d *dummyResponder) Send(item interfaces.DeltaResp) error {
+	switch v := item.(type) {
+	case interfaces.DeltaRespStart:
+		d.start++
+	case interfaces.DeltaRespFinish:
+		d.finish++
+	case interfaces.DeltaRespRespVec:
+		d.outVec = v.Value
+	case interfaces.DeltaRespRec:
+		d.gotRecs = append(d.gotRecs, &localLogRec{Metadata: v.Value.Metadata})
+	}
+	return nil
+}
+
+func (d *dummyResponder) Security() security.Call {
+	return nil
+}
+
+func (d *dummyResponder) Suffix() string {
+	return ""
+}
+
+func (d *dummyResponder) LocalEndpoint() naming.Endpoint {
+	return nil
+}
+
+func (d *dummyResponder) RemoteEndpoint() naming.Endpoint {
+	return nil
+}
+
+func (d *dummyResponder) GrantedBlessings() security.Blessings {
+	return security.Blessings{}
+}
+
+func (d *dummyResponder) Server() rpc.Server {
+	return nil
+}
+
+func (d *dummyResponder) diffLogRecs(t *testing.T, wantRecs []*localLogRec, wantVec interfaces.GenVector) {
+	if d.start != 1 || d.finish != 1 {
+		t.Fatalf("diffLogRecs incorrect start/finish records (%v, %v)", d.start, d.finish)
+	}
+	if len(d.gotRecs) != len(wantRecs) {
+		t.Fatalf("diffLogRecs failed, gotLen %v, wantLen %v\n", len(d.gotRecs), len(wantRecs))
+	}
+	for i, rec := range d.gotRecs {
+		if !reflect.DeepEqual(rec.Metadata, wantRecs[i].Metadata) {
+			t.Fatalf("diffLogRecs failed, i %v, got %v, want %v\n", i, rec.Metadata, wantRecs[i].Metadata)
+		}
+	}
+	if !reflect.DeepEqual(d.outVec, wantVec) {
+		t.Fatalf("diffLogRecs failed genvector, got %v, want %v\n", d.outVec, wantVec)
+	}
+}
+
+func checkEqualDevRanges(t *testing.T, s1, s2 genRangeVector) {
+	if len(s1) != len(s2) {
+		t.Fatalf("len(s1): %v != len(s2): %v", len(s1), len(s2))
+	}
+	for d1, r1 := range s1 {
+		if r2, ok := s2[d1]; !ok || !reflect.DeepEqual(r1, r2) {
+			t.Fatalf("Dev %v: r1 %v != r2 %v", d1, r1, r2)
+		}
+	}
+}
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index eac07f7..6d787e1 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -29,9 +29,10 @@
 // syncService contains the metadata for the sync module.
 type syncService struct {
 	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
-	id   uint64 // globally unique id for this instance of Syncbase.
-	name string // name derived from the global id.
-	sv   interfaces.Service
+	id     uint64 // globally unique id for this instance of Syncbase.
+	name   string // name derived from the global id.
+	sv     interfaces.Service
+	server rpc.Server
 
 	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
@@ -96,9 +97,10 @@
 // changes to its objects. The "initiator" thread is responsible for
 // periodically contacting peers to fetch changes from them. In addition, the
 // sync module responds to incoming RPCs from remote sync modules.
-func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
+func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, server rpc.Server) (*syncService, error) {
 	s := &syncService{
 		sv:      sv,
+		server:  server,
 		batches: make(batchSet),
 	}
 
@@ -149,13 +151,6 @@
 }
 
 ////////////////////////////////////////
-// Core sync method.
-
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
-}
-
-////////////////////////////////////////
 // util.Layer methods.
 
 func (s *syncService) Name() string {
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 101d08c..49d9ff5 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -31,17 +31,13 @@
 // device.
 
 import (
-	"container/heap"
 	"fmt"
-	"sort"
 	"strconv"
-	"strings"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
-	"v.io/v23/rpc"
 	"v.io/v23/verror"
 )
 
@@ -59,6 +55,7 @@
 // a) in-memory sync state of a Database consisting of the current generation
 //    number, log position and generation vector.
 // b) watcher map of prefixes currently being synced.
+// c) republish names in mount tables for all syncgroups.
 //
 // TODO(hpucha): This is incomplete. Flesh this out further.
 func (s *syncService) initSync(ctx *context.T) error {
@@ -347,319 +344,3 @@
 	}
 	return nil
 }
-
-////////////////////////////////////////////////////////////
-// Genvector-related utilities.
-
-// sendDeltasPerDatabase sends to an initiator all the missing generations
-// corresponding to the prefixes requested for this Database, and a genvector
-// summarizing the knowledge transferred from the responder to the
-// initiator. This happens in two phases:
-//
-// In the first phase, for a given set of nested prefixes from the initiator,
-// the shortest prefix in that set is extracted. The initiator's prefix
-// genvector for this shortest prefix represents the lower bound on its
-// knowledge for the entire set of nested prefixes. This prefix genvector
-// (representing the lower bound) is diffed with all the responder prefix
-// genvectors corresponding to same or deeper prefixes compared to the initiator
-// prefix. This diff produces a bound on the missing knowledge. For example, say
-// the initiator is interested in prefixes {foo, foobar}, where each prefix is
-// associated with a prefix genvector. Since the initiator strictly has as much
-// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
-// prefix genvector is chosen as the lower bound for the initiator's
-// knowledge. Similarly, say the responder has knowledge on prefixes {f,
-// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
-// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
-// compute a bound on missing generations (all responder's prefixes that match
-// "foo". Note that since the responder doesn't have a prefix genvector at
-// "foo", its knowledge at "f" is applicable to "foo").
-//
-// Since the first phase outputs an aggressive calculation of missing
-// generations containing more generation entries than strictly needed by the
-// initiator, in the second phase, each missing generation is sent to the
-// initiator only if the initiator is eligible for it and is not aware of
-// it. The generations are sent to the initiator in the same order as the
-// responder learned them so that the initiator can reconstruct the DAG for the
-// objects by learning older nodes first.
-func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
-	// Phase 1 of sendDeltas. diff contains the bound on the generations
-	// missing from the initiator per device.
-	diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
-	if err != nil {
-		return nil, err
-	}
-
-	// Phase 2 of sendDeltas: Process the diff, filtering out records that
-	// are not needed, and send the remainder on the wire ordered.
-	st, err := s.getDbStore(ctx, call, appName, dbName)
-	if err != nil {
-		return nil, err
-	}
-
-	// We now visit every log record in the generation range as obtained
-	// from phase 1 in their log order. We use a heap to incrementally sort
-	// the log records as per their position in the log.
-	//
-	// Init the min heap, one entry per device in the diff.
-	mh := make(minHeap, 0, len(diff))
-	for dev, r := range diff {
-		r.cur = r.min
-		rec, err := getNextLogRec(ctx, st, dev, r)
-		if err != nil {
-			return nil, err
-		}
-		if rec != nil {
-			mh = append(mh, rec)
-		} else {
-			delete(diff, dev)
-		}
-	}
-	heap.Init(&mh)
-
-	// Process the log records in order.
-	initPfxs := extractAndSortPrefixes(initVec)
-
-	for mh.Len() > 0 {
-		rec := heap.Pop(&mh).(*localLogRec)
-
-		if !filterLogRec(rec, initVec, initPfxs) {
-			// Send on the wire.
-			wireRec := interfaces.LogRec{Metadata: rec.Metadata}
-			// TODO(hpucha): Hash out this fake stream stuff when
-			// defining the RPC and the rest of the responder.
-			stream.Send(wireRec)
-		}
-
-		// Add a new record from the same device if not done.
-		dev := rec.Metadata.Id
-		rec, err := getNextLogRec(ctx, st, dev, diff[dev])
-		if err != nil {
-			return nil, err
-		}
-		if rec != nil {
-			heap.Push(&mh, rec)
-		} else {
-			delete(diff, dev)
-		}
-	}
-
-	return outVec, nil
-}
-
-// computeDeltaBound computes the bound on missing generations across all
-// requested prefixes (phase 1 of sendDeltas).
-func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
-	respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
-	if err != nil {
-		return nil, nil, err
-	}
-	respPfxs := extractAndSortPrefixes(respVec)
-	initPfxs := extractAndSortPrefixes(initVec)
-	if len(initPfxs) == 0 {
-		return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
-	}
-
-	outVec := make(interfaces.GenVector)
-	diff := make(genRangeVector)
-	pfx := initPfxs[0]
-
-	for _, p := range initPfxs {
-		if strings.HasPrefix(p, pfx) && p != pfx {
-			continue
-		}
-
-		// Process this prefix as this is the start of a new set of
-		// nested prefixes.
-		pfx = p
-
-		// Lower bound on initiator's knowledge for this prefix set.
-		initpgv := initVec[pfx]
-
-		// Find the relevant responder prefixes and add the corresponding knowledge.
-		var respgv interfaces.PrefixGenVector
-		var rpStart string
-		for _, rp := range respPfxs {
-			if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
-				// No relationship with pfx.
-				continue
-			}
-
-			if strings.HasPrefix(pfx, rp) {
-				// If rp is a prefix of pfx, remember it because
-				// it may be a potential starting point for the
-				// responder's knowledge. The actual starting
-				// point is the deepest prefix where rp is a
-				// prefix of pfx.
-				//
-				// Say the initiator is looking for "foo", and
-				// the responder has knowledge for "f" and "fo",
-				// the responder's starting point will be the
-				// prefix genvector for "fo". Similarly, if the
-				// responder has knowledge for "foo", the
-				// starting point will be the prefix genvector
-				// for "foo".
-				rpStart = rp
-			} else {
-				// If pfx is a prefix of rp, this knowledge must
-				// be definitely sent to the initiator. Diff the
-				// prefix genvectors to adjust the delta bound and
-				// include in outVec.
-				respgv = respVec[rp]
-				s.diffPrefixGenVectors(respgv, initpgv, diff)
-				outVec[rp] = respgv
-			}
-		}
-
-		// Deal with the starting point.
-		if rpStart == "" {
-			// No matching prefixes for pfx were found.
-			respgv = make(interfaces.PrefixGenVector)
-			respgv[s.id] = respGen
-		} else {
-			respgv = respVec[rpStart]
-		}
-		s.diffPrefixGenVectors(respgv, initpgv, diff)
-		outVec[pfx] = respgv
-	}
-
-	return diff, outVec, nil
-}
-
-// genRange represents a range of generations (min and max inclusive).
-type genRange struct {
-	min uint64
-	max uint64
-	cur uint64
-}
-
-type genRangeVector map[uint64]*genRange
-
-// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
-// and the initiator, and updates the range of generations per device known to
-// the responder but not known to the initiator. "gens" (generation range) is
-// passed in as an input argument so that it can be incrementally updated as the
-// range of missing generations grows when different responder prefix genvectors
-// are used to compute the diff.
-//
-// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
-// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
-// two vectors returns: {A:[6-10], C:[1-1]}.
-//
-// TODO(hpucha): Add reclaimVec for GCing.
-func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) {
-	// Compute missing generations for devices that are in both initiator's and responder's vectors.
-	for devid, gen := range initPVec {
-		rgen, ok := respPVec[devid]
-		// Skip since responder doesn't know of this device.
-		if ok {
-			updateDevRange(devid, rgen, gen, gens)
-		}
-	}
-
-	// Compute missing generations for devices not in initiator's vector but in responder's vector.
-	for devid, rgen := range respPVec {
-		if _, ok := initPVec[devid]; !ok {
-			updateDevRange(devid, rgen, 0, gens)
-		}
-	}
-}
-
-func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
-	if gen < rgen {
-		// Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
-		if r, ok := gens[devid]; !ok {
-			gens[devid] = &genRange{min: gen + 1, max: rgen}
-		} else {
-			if gen+1 < r.min {
-				r.min = gen + 1
-			}
-			if rgen > r.max {
-				r.max = rgen
-			}
-		}
-	}
-}
-
-func extractAndSortPrefixes(vec interfaces.GenVector) []string {
-	pfxs := make([]string, len(vec))
-	i := 0
-	for p := range vec {
-		pfxs[i] = p
-		i++
-	}
-	sort.Strings(pfxs)
-	return pfxs
-}
-
-// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
-// loop.
-func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
-	for i := r.cur; i <= r.max; i++ {
-		rec, err := getLogRec(ctx, sn, dev, i)
-		if err == nil {
-			r.cur = i + 1
-			return rec, nil
-		}
-		if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			return nil, err
-		}
-	}
-	return nil, nil
-}
-
-// Note: initPfxs is sorted.
-func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
-	filter := true
-
-	var maxGen uint64
-	for _, p := range initPfxs {
-		if strings.HasPrefix(rec.Metadata.ObjId, p) {
-			// Do not filter. Initiator is interested in this
-			// prefix.
-			filter = false
-
-			// Track if the initiator knows of this record.
-			gen := initVec[p][rec.Metadata.Id]
-			if maxGen < gen {
-				maxGen = gen
-			}
-		}
-	}
-
-	// Filter this record if the initiator already has it.
-	if maxGen >= rec.Metadata.Gen {
-		return true
-	}
-
-	return filter
-}
-
-// A minHeap implements heap.Interface and holds local log records.
-type minHeap []*localLogRec
-
-func (mh minHeap) Len() int { return len(mh) }
-
-func (mh minHeap) Less(i, j int) bool {
-	return mh[i].Pos < mh[j].Pos
-}
-
-func (mh minHeap) Swap(i, j int) {
-	mh[i], mh[j] = mh[j], mh[i]
-}
-
-func (mh *minHeap) Push(x interface{}) {
-	item := x.(*localLogRec)
-	*mh = append(*mh, item)
-}
-
-func (mh *minHeap) Pop() interface{} {
-	old := *mh
-	n := len(old)
-	item := old[n-1]
-	*mh = old[0 : n-1]
-	return item
-}
-
-type logRecStream interface {
-	Send(interfaces.LogRec)
-}
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index 57c1f66..aa232e4 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -5,8 +5,6 @@
 package vsync
 
 import (
-	"fmt"
-	"math/rand"
 	"reflect"
 	"testing"
 	"time"
@@ -113,389 +111,6 @@
 	checkLogRec(t, st, id, gen, false, nil)
 }
 
-// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
-func TestDiffPrefixGenVectors(t *testing.T) {
-	svc := createService(t)
-	defer destroyService(t, svc)
-	s := svc.sync
-	s.id = 10 //responder. Initiator is id 11.
-
-	tests := []struct {
-		respPVec, initPVec interfaces.PrefixGenVector
-		genDiffIn          genRangeVector
-		genDiffWant        genRangeVector
-	}{
-		{ // responder and initiator are at identical vectors.
-			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
-			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
-			genDiffIn: make(genRangeVector),
-		},
-		{ // responder and initiator are at identical vectors.
-			respPVec:  interfaces.PrefixGenVector{10: 0},
-			initPVec:  interfaces.PrefixGenVector{10: 0},
-			genDiffIn: make(genRangeVector),
-		},
-		{ // responder has no updates.
-			respPVec:  interfaces.PrefixGenVector{10: 0},
-			initPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 8},
-			genDiffIn: make(genRangeVector),
-		},
-		{ // responder and initiator have no updates.
-			respPVec:  interfaces.PrefixGenVector{10: 0},
-			initPVec:  interfaces.PrefixGenVector{11: 0},
-			genDiffIn: make(genRangeVector),
-		},
-		{ // responder is staler than initiator.
-			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
-			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 8, 14: 5},
-			genDiffIn: make(genRangeVector),
-		},
-		{ // responder is more up-to-date than initiator for local updates.
-			respPVec:    interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 2},
-			initPVec:    interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
-			genDiffIn:   make(genRangeVector),
-			genDiffWant: genRangeVector{10: &genRange{min: 2, max: 5}},
-		},
-		{ // responder is fresher than initiator for local updates and one device.
-			respPVec:  interfaces.PrefixGenVector{10: 5, 11: 10, 12: 22, 13: 2},
-			initPVec:  interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2, 14: 40},
-			genDiffIn: make(genRangeVector),
-			genDiffWant: genRangeVector{
-				10: &genRange{min: 2, max: 5},
-				12: &genRange{min: 21, max: 22},
-			},
-		},
-		{ // responder is fresher than initiator in all but one device.
-			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
-			initPVec:  interfaces.PrefixGenVector{10: 0, 11: 2, 12: 0},
-			genDiffIn: make(genRangeVector),
-			genDiffWant: genRangeVector{
-				10: &genRange{min: 1, max: 1},
-				12: &genRange{min: 1, max: 3},
-				13: &genRange{min: 1, max: 4},
-			},
-		},
-		{ // initiator has no updates.
-			respPVec:  interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
-			initPVec:  interfaces.PrefixGenVector{},
-			genDiffIn: make(genRangeVector),
-			genDiffWant: genRangeVector{
-				10: &genRange{min: 1, max: 1},
-				11: &genRange{min: 1, max: 2},
-				12: &genRange{min: 1, max: 3},
-				13: &genRange{min: 1, max: 4},
-			},
-		},
-		{ // initiator has no updates, pre-existing diff.
-			respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
-			initPVec: interfaces.PrefixGenVector{13: 1},
-			genDiffIn: genRangeVector{
-				10: &genRange{min: 5, max: 20},
-				13: &genRange{min: 1, max: 3},
-			},
-			genDiffWant: genRangeVector{
-				10: &genRange{min: 1, max: 20},
-				11: &genRange{min: 1, max: 2},
-				12: &genRange{min: 1, max: 3},
-				13: &genRange{min: 1, max: 4},
-			},
-		},
-	}
-
-	for _, test := range tests {
-		want := test.genDiffWant
-		got := test.genDiffIn
-		s.diffPrefixGenVectors(test.respPVec, test.initPVec, got)
-		checkEqualDevRanges(t, got, want)
-	}
-}
-
-// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
-// and if the log records on the wire are correctly ordered.
-func TestSendDeltas(t *testing.T) {
-	appName := "mockapp"
-	dbName := "mockdb"
-
-	tests := []struct {
-		respVec, initVec, outVec interfaces.GenVector
-		respGen                  uint64
-		genDiff                  genRangeVector
-		keyPfxs                  []string
-	}{
-		{ // Identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{12: 8},
-				"foobar": interfaces.PrefixGenVector{12: 10},
-			},
-			initVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{11: 5},
-				"foobar": interfaces.PrefixGenVector{11: 5},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 1, max: 10},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", ""},
-		},
-		{ // Identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"bar":    interfaces.PrefixGenVector{12: 20},
-				"foo":    interfaces.PrefixGenVector{12: 8},
-				"foobar": interfaces.PrefixGenVector{12: 10},
-			},
-			initVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{11: 5},
-				"foobar": interfaces.PrefixGenVector{11: 5, 12: 10},
-				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{10: 5, 12: 8},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
-				"bar":    interfaces.PrefixGenVector{10: 5, 12: 20},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 1, max: 20},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
-		},
-		{ // Non-identical prefixes, local only updates.
-			initVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{11: 5},
-				"foobar": interfaces.PrefixGenVector{11: 5},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{10: 5},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
-		},
-		{ // Non-identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"f":      interfaces.PrefixGenVector{12: 5, 13: 5},
-				"foo":    interfaces.PrefixGenVector{12: 10, 13: 10},
-				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
-			},
-			initVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{10: 5, 12: 10, 13: 10},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 20},
-				13: &genRange{min: 1, max: 20},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
-		},
-		{ // Non-identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
-			},
-			initVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{10: 5},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 20},
-				13: &genRange{min: 1, max: 20},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
-		},
-		{ // Non-identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"f": interfaces.PrefixGenVector{12: 20, 13: 20},
-			},
-			initVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 20},
-				13: &genRange{min: 1, max: 20},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
-		},
-		{ // Non-identical interleaving prefixes.
-			respVec: interfaces.GenVector{
-				"f":      interfaces.PrefixGenVector{12: 20, 13: 10},
-				"foo":    interfaces.PrefixGenVector{12: 30, 13: 20},
-				"foobar": interfaces.PrefixGenVector{12: 40, 13: 30},
-			},
-			initVec: interfaces.GenVector{
-				"fo":        interfaces.PrefixGenVector{11: 5, 12: 1},
-				"foob":      interfaces.PrefixGenVector{11: 5, 12: 10},
-				"foobarxyz": interfaces.PrefixGenVector{11: 5, 12: 20},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"fo":     interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
-				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 40},
-				13: &genRange{min: 1, max: 30},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
-		},
-		{ // Non-identical interleaving prefixes.
-			respVec: interfaces.GenVector{
-				"fo":        interfaces.PrefixGenVector{12: 20, 13: 10},
-				"foob":      interfaces.PrefixGenVector{12: 30, 13: 20},
-				"foobarxyz": interfaces.PrefixGenVector{12: 40, 13: 30},
-			},
-			initVec: interfaces.GenVector{
-				"f":      interfaces.PrefixGenVector{11: 5, 12: 1},
-				"foo":    interfaces.PrefixGenVector{11: 5, 12: 10},
-				"foobar": interfaces.PrefixGenVector{11: 5, 12: 20},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"f":         interfaces.PrefixGenVector{10: 5},
-				"fo":        interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
-				"foob":      interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
-				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 40},
-				13: &genRange{min: 1, max: 30},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
-		},
-		{ // Non-identical sibling prefixes.
-			respVec: interfaces.GenVector{
-				"foo":       interfaces.PrefixGenVector{12: 20, 13: 10},
-				"foobarabc": interfaces.PrefixGenVector{12: 40, 13: 30},
-				"foobarxyz": interfaces.PrefixGenVector{12: 30, 13: 20},
-			},
-			initVec: interfaces.GenVector{
-				"foo": interfaces.PrefixGenVector{11: 5, 12: 1},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":       interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
-				"foobarabc": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
-				"foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 2, max: 40},
-				13: &genRange{min: 1, max: 30},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "foobarabc", "foobarxyz", "foobar123", "fooxyz"},
-		},
-		{ // Non-identical prefixes, local and remote updates.
-			respVec: interfaces.GenVector{
-				"barbaz": interfaces.PrefixGenVector{12: 18},
-				"f":      interfaces.PrefixGenVector{12: 30, 13: 5},
-				"foobar": interfaces.PrefixGenVector{12: 30, 13: 8},
-			},
-			initVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{11: 5, 12: 5},
-				"foobar": interfaces.PrefixGenVector{11: 5, 12: 5},
-				"bar":    interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
-			},
-			respGen: 5,
-			outVec: interfaces.GenVector{
-				"foo":    interfaces.PrefixGenVector{10: 5, 12: 30, 13: 5},
-				"foobar": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 8},
-				"bar":    interfaces.PrefixGenVector{10: 5},
-				"barbaz": interfaces.PrefixGenVector{10: 5, 12: 18},
-			},
-			genDiff: genRangeVector{
-				10: &genRange{min: 1, max: 5},
-				12: &genRange{min: 6, max: 30},
-				13: &genRange{min: 1, max: 8},
-			},
-			keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
-		},
-	}
-
-	for i, test := range tests {
-		svc := createService(t)
-		s := svc.sync
-		s.id = 10 //responder.
-
-		wantDiff, wantVec := test.genDiff, test.outVec
-		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
-
-		gotDiff, gotVec, err := s.computeDeltaBound(nil, appName, dbName, test.initVec)
-		if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
-			t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
-		}
-		checkEqualDevRanges(t, gotDiff, wantDiff)
-
-		// Insert some log records to bootstrap testing below.
-		tRng := rand.New(rand.NewSource(int64(i)))
-		var wantRecs []*localLogRec
-		st := svc.St()
-		tx := st.NewTransaction()
-		objKeyPfxs := test.keyPfxs
-		j := 0
-		for id, r := range wantDiff {
-			pos := uint64(tRng.Intn(50) + 100*j)
-			for k := r.min; k <= r.max; k++ {
-				opfx := objKeyPfxs[tRng.Intn(len(objKeyPfxs))]
-				// Create holes in the log records.
-				if opfx == "" {
-					continue
-				}
-				okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
-				vers := fmt.Sprintf("%x", tRng.Int())
-				rec := &localLogRec{
-					Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
-					Pos:      pos + k,
-				}
-				if err := putLogRec(nil, tx, rec); err != nil {
-					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
-				}
-
-				initPfxs := extractAndSortPrefixes(test.initVec)
-				if !filterLogRec(rec, test.initVec, initPfxs) {
-					wantRecs = append(wantRecs, rec)
-				}
-			}
-			j++
-		}
-		if err := tx.Commit(); err != nil {
-			t.Fatalf("cannot commit putting log rec, err %v", err)
-		}
-
-		ts := &logRecStreamTest{}
-		gotVec, err = s.sendDeltasPerDatabase(nil, nil, appName, dbName, test.initVec, ts)
-		if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
-			t.Fatalf("sendDeltasPerDatabase failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
-		}
-		ts.diffLogRecs(t, wantRecs)
-
-		destroyService(t, svc)
-	}
-}
-
 //////////////////////////////
 // Helpers
 
@@ -503,25 +118,6 @@
 // for getting the stack trace. Right now cannot import the package due to a
 // cycle.
 
-type logRecStreamTest struct {
-	gotRecs []*localLogRec
-}
-
-func (s *logRecStreamTest) Send(rec interfaces.LogRec) {
-	s.gotRecs = append(s.gotRecs, &localLogRec{Metadata: rec.Metadata})
-}
-
-func (s *logRecStreamTest) diffLogRecs(t *testing.T, wantRecs []*localLogRec) {
-	if len(s.gotRecs) != len(wantRecs) {
-		t.Fatalf("diffLogRecMetadata failed, gotLen %v, wantLen %v\n", len(s.gotRecs), len(wantRecs))
-	}
-	for i, rec := range s.gotRecs {
-		if !reflect.DeepEqual(rec.Metadata, wantRecs[i].Metadata) {
-			t.Fatalf("diffLogRecMetadata failed, i %v, got %v, want %v\n", i, rec.Metadata, wantRecs[i].Metadata)
-		}
-	}
-}
-
 func checkDbSyncState(t *testing.T, st store.StoreReader, exists bool, wantSt *dbSyncState) {
 	gotSt, err := getDbSyncState(nil, st)
 
@@ -549,14 +145,3 @@
 		t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
 	}
 }
-
-func checkEqualDevRanges(t *testing.T, s1, s2 genRangeVector) {
-	if len(s1) != len(s2) {
-		t.Fatalf("len(s1): %v != len(s2): %v", len(s1), len(s2))
-	}
-	for d1, r1 := range s1 {
-		if r2, ok := s2[d1]; !ok || !reflect.DeepEqual(r1, r2) {
-			t.Fatalf("Dev %v: r1 %v != r2 %v", d1, r1, r2)
-		}
-	}
-}
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 79cc9bf..31eaff0 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -18,13 +18,12 @@
 	"strings"
 	"time"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
-
-	wire "v.io/syncbase/v23/services/syncbase/nosql"
-
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
@@ -609,9 +608,20 @@
 }
 
 func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
-	// TODO(hpucha): To be implemented.
-	// Pass server to Service in store.
-	// server.ServeDispatcher(*name, service, authorizer)
+	// Get this Syncbase's sync module handle.
+	ss := sd.db.App().Service().Sync().(*syncService)
+
+	for _, mt := range spec.MountTables {
+		name := naming.Join(mt, ss.name)
+		// TODO(hpucha): Is this add idempotent? Appears to be from code.
+		// Will it handle absolute names. Appears to be.
+		if err := ss.server.AddName(name); err != nil {
+			return err
+		}
+	}
+
+	// TODO(hpucha): Do we have to publish in neighborhood explicitly?
+
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/vsync/util_test.go b/x/ref/services/syncbase/vsync/util_test.go
index 0848d35..3e6b629 100644
--- a/x/ref/services/syncbase/vsync/util_test.go
+++ b/x/ref/services/syncbase/vsync/util_test.go
@@ -136,7 +136,7 @@
 		path:     path,
 		shutdown: shutdown,
 	}
-	if s.sync, err = New(ctx, nil, s); err != nil {
+	if s.sync, err = New(ctx, nil, s, nil); err != nil {
 		util.DestroyStore(engine, path)
 		t.Fatalf("cannot create sync service: %v", err)
 	}