syncbase/vsync: Responder module.
Change-Id: I2bc987c263283dc78c00642887208e54af6d3540
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index caf0339..5ab18b0 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -96,8 +96,11 @@
}
// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested with in a Database when requesting deltas for that Database.
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for that Database.
type DeltaReq struct {
+ AppName string
+ DbName string
SgIds set[GroupId]
InitVec GenVector
}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index a22731d..aa0872a 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -156,8 +156,11 @@
}
// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested with in a Database when requesting deltas for that Database.
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for that Database.
type DeltaReq struct {
+ AppName string
+ DbName string
SgIds map[GroupId]struct{}
InitVec GenVector
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 135ee35..3447fcb 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -48,6 +48,9 @@
RootDir string
// Storage engine to use (for service and per-database engines).
Engine string
+ // RPC server for this service. Needed to advertise this service in
+ // mount tables attached to SyncGroups.
+ Server rpc.Server
}
// NewService creates a new service instance and returns it.
@@ -71,7 +74,7 @@
if err := util.Put(ctx, s.st, s, data); err != nil {
return nil, err
}
- if s.sync, err = vsync.New(ctx, call, s); err != nil {
+ if s.sync, err = vsync.New(ctx, call, s, opts.Server); err != nil {
return nil, err
}
return s, nil
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index 729e386..e1ee16f 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -19,7 +19,6 @@
"v.io/syncbase/x/ref/services/syncbase/server"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -45,6 +44,14 @@
ctx, shutdown := v23.Init()
defer shutdown()
+ s, err := v23.NewServer(ctx)
+ if err != nil {
+ vlog.Fatal("v23.NewServer() failed: ", err)
+ }
+ if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
+ vlog.Fatal("s.Listen() failed: ", err)
+ }
+
perms, err := securityflag.PermissionsFromFlag()
if err != nil {
vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
@@ -60,14 +67,16 @@
Perms: perms,
RootDir: *rootDir,
Engine: *engine,
+ Server: s,
})
if err != nil {
vlog.Fatal("server.NewService() failed: ", err)
}
d := server.NewDispatcher(service)
- if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
- vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
+ // Publish the service in the mount table.
+ if err := s.ServeDispatcher(*name, d); err != nil {
+ vlog.Fatal("s.ServeDispatcher() failed: ", err)
}
if *name != "" {
vlog.Info("Mounted at: ", *name)
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
new file mode 100644
index 0000000..19b1366
--- /dev/null
+++ b/services/syncbase/vsync/responder.go
@@ -0,0 +1,439 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "container/heap"
+ "sort"
+ "strings"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+// GetDeltas implements the responder side of the GetDeltas RPC.
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
+ recvr := call.RecvStream()
+
+ for recvr.Advance() {
+ req := recvr.Value()
+ // Ignoring errors since if one Database fails for any reason,
+ // it is fine to continue to the next one. In fact, sometimes
+ // the failure might be genuine. For example, the responder is
+ // no longer part of the requested SyncGroups, or the app/db is
+ // locally deleted, or a permission change has denied access.
+ rSt := newResponderState(ctx, call, s, req)
+ rSt.sendDeltasPerDatabase(ctx)
+ }
+
+ // TODO(hpucha): Is there a need to call finish or some such?
+ return recvr.Err()
+}
+
+// responderState is state accumulated per Database by the responder during an
+// initiation round.
+type responderState struct {
+ req interfaces.DeltaReq
+ call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+ errState error // Captures the error from the first two phases of the responder.
+ sync *syncService
+ st store.Store // Store handle to the Database.
+ diff genRangeVector
+ outVec interfaces.GenVector
+}
+
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
+ rSt := &responderState{call: call, sync: sync, req: req}
+ return rSt
+}
+
+// sendDeltasPerDatabase sends to an initiator all the missing generations
+// corresponding to the prefixes requested for this Database, and a genvector
+// summarizing the knowledge transferred from the responder to the
+// initiator. This happens in three phases:
+//
+// In the first phase, the initiator is checked against the SyncGroup ACLs of
+// all the SyncGroups it is requesting, and only those prefixes that belong to
+// allowed SyncGroups are carried forward.
+//
+// In the second phase, for a given set of nested prefixes from the initiator,
+// the shortest prefix in that set is extracted. The initiator's prefix
+// genvector for this shortest prefix represents the lower bound on its
+// knowledge for the entire set of nested prefixes. This prefix genvector
+// (representing the lower bound) is diffed with all the responder prefix
+// genvectors corresponding to same or deeper prefixes compared to the initiator
+// prefix. This diff produces a bound on the missing knowledge. For example, say
+// the initiator is interested in prefixes {foo, foobar}, where each prefix is
+// associated with a prefix genvector. Since the initiator strictly has as much
+// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
+// prefix genvector is chosen as the lower bound for the initiator's
+// knowledge. Similarly, say the responder has knowledge on prefixes {f,
+// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
+// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
+// compute a bound on missing generations (all responder's prefixes that match
+// "foo". Note that since the responder doesn't have a prefix genvector at
+// "foo", its knowledge at "f" is applicable to "foo").
+//
+// Since the second phase outputs an aggressive calculation of missing
+// generations containing more generation entries than strictly needed by the
+// initiator, in the third phase, each missing generation is sent to the
+// initiator only if the initiator is eligible for it and is not aware of
+// it. The generations are sent to the initiator in the same order as the
+// responder learned them so that the initiator can reconstruct the DAG for the
+// objects by learning older nodes first.
+func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error {
+ // Phase 1 of sendDeltas: Authorize the initiator and respond to the
+ // caller only for the SyncGroups that allow access.
+ rSt.authorizeAndFilterSyncGroups(ctx)
+
+ // Phase 2 of sendDeltas: diff contains the bound on the
+ // generations missing from the initiator per device.
+ rSt.computeDeltaBound(ctx)
+
+ // Phase 3 of sendDeltas: Process the diff, filtering out records that
+ // are not needed, and send the remainder on the wire ordered.
+ return rSt.filterAndSendDeltas(ctx)
+}
+
+// authorizeAndFilterSyncGroups authorizes the initiator against the requested
+// SyncGroups and filters the initiator's prefixes to only include those from
+// allowed SyncGroups (phase 1 of sendDeltas).
+func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) {
+ rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName)
+ if rSt.errState != nil {
+ return
+ }
+
+ allowedPfxs := make(map[string]struct{})
+ for sgid := range rSt.req.SgIds {
+ // Check permissions for the SyncGroup.
+ var sg *interfaces.SyncGroup
+ sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid)
+ if rSt.errState != nil {
+ return
+ }
+ rSt.errState = authorize(ctx, rSt.call.Security(), sg)
+ if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID {
+ continue
+ } else if rSt.errState != nil {
+ return
+ }
+
+ for _, p := range sg.Spec.Prefixes {
+ allowedPfxs[p] = struct{}{}
+ }
+ }
+
+ // Filter the initiator's prefixes to what is allowed.
+ for pfx := range rSt.req.InitVec {
+ if _, ok := allowedPfxs[pfx]; ok {
+ continue
+ }
+ allowed := false
+ for p := range allowedPfxs {
+ if strings.HasPrefix(pfx, p) {
+ allowed = true
+ }
+ }
+
+ if !allowed {
+ delete(rSt.req.InitVec, pfx)
+ }
+ }
+ return
+}
+
+// computeDeltaBound computes the bound on missing generations across all
+// requested prefixes (phase 2 of sendDeltas).
+func (rSt *responderState) computeDeltaBound(ctx *context.T) {
+ // Check error from phase 1.
+ if rSt.errState != nil {
+ return
+ }
+
+ if len(rSt.req.InitVec) == 0 {
+ rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+ return
+ }
+
+ var respVec interfaces.GenVector
+ var respGen uint64
+ respVec, respGen, rSt.errState = rSt.sync.getDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
+ if rSt.errState != nil {
+ return
+ }
+ respPfxs := extractAndSortPrefixes(respVec)
+ initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+
+ rSt.outVec = make(interfaces.GenVector)
+ rSt.diff = make(genRangeVector)
+ pfx := initPfxs[0]
+
+ for _, p := range initPfxs {
+ if strings.HasPrefix(p, pfx) && p != pfx {
+ continue
+ }
+
+ // Process this prefix as this is the start of a new set of
+ // nested prefixes.
+ pfx = p
+
+ // Lower bound on initiator's knowledge for this prefix set.
+ initpgv := rSt.req.InitVec[pfx]
+
+ // Find the relevant responder prefixes and add the corresponding knowledge.
+ var respgv interfaces.PrefixGenVector
+ var rpStart string
+ for _, rp := range respPfxs {
+ if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
+ // No relationship with pfx.
+ continue
+ }
+
+ if strings.HasPrefix(pfx, rp) {
+ // If rp is a prefix of pfx, remember it because
+ // it may be a potential starting point for the
+ // responder's knowledge. The actual starting
+ // point is the deepest prefix where rp is a
+ // prefix of pfx.
+ //
+ // Say the initiator is looking for "foo", and
+ // the responder has knowledge for "f" and "fo",
+ // the responder's starting point will be the
+ // prefix genvector for "fo". Similarly, if the
+ // responder has knowledge for "foo", the
+ // starting point will be the prefix genvector
+ // for "foo".
+ rpStart = rp
+ } else {
+ // If pfx is a prefix of rp, this knowledge must
+ // be definitely sent to the initiator. Diff the
+ // prefix genvectors to adjust the delta bound and
+ // include in outVec.
+ respgv = respVec[rp]
+ rSt.diffPrefixGenVectors(respgv, initpgv)
+ rSt.outVec[rp] = respgv
+ }
+ }
+
+ // Deal with the starting point.
+ if rpStart == "" {
+ // No matching prefixes for pfx were found.
+ respgv = make(interfaces.PrefixGenVector)
+ respgv[rSt.sync.id] = respGen
+ } else {
+ respgv = respVec[rpStart]
+ }
+ rSt.diffPrefixGenVectors(respgv, initpgv)
+ rSt.outVec[pfx] = respgv
+ }
+
+ return
+}
+
+// filterAndSendDeltas filters the computed delta to remove records already
+// known by the initiator, and sends the resulting records to the initiator
+// (phase 3 of sendDeltas).
+func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error {
+ // Always send a start and finish response so that the initiator can
+ // move on to the next Database.
+ //
+ // TODO(hpucha): Although ok for now to call SendStream once per
+ // Database, would like to make this implementation agnostic.
+ sender := rSt.call.SendStream()
+ sender.Send(interfaces.DeltaRespStart{true})
+ defer sender.Send(interfaces.DeltaRespFinish{true})
+
+ // Check error from phase 2.
+ if rSt.errState != nil {
+ return rSt.errState
+ }
+
+ // First two phases were successful. So now on to phase 3. We now visit
+ // every log record in the generation range as obtained from phase 1 in
+ // their log order. We use a heap to incrementally sort the log records
+ // as per their position in the log.
+ //
+ // Init the min heap, one entry per device in the diff.
+ mh := make(minHeap, 0, len(rSt.diff))
+ for dev, r := range rSt.diff {
+ r.cur = r.min
+ rec, err := getNextLogRec(ctx, rSt.st, dev, r)
+ if err != nil {
+ return err
+ }
+ if rec != nil {
+ mh = append(mh, rec)
+ } else {
+ delete(rSt.diff, dev)
+ }
+ }
+ heap.Init(&mh)
+
+ // Process the log records in order.
+ initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+ for mh.Len() > 0 {
+ rec := heap.Pop(&mh).(*localLogRec)
+
+ if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
+ // Send on the wire.
+ wireRec := interfaces.LogRec{Metadata: rec.Metadata}
+ // TODO(hpucha): Hash out this fake stream stuff when
+ // defining the RPC and the rest of the responder.
+ sender.Send(interfaces.DeltaRespRec{wireRec})
+ }
+
+ // Add a new record from the same device if not done.
+ dev := rec.Metadata.Id
+ rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev])
+ if err != nil {
+ return err
+ }
+ if rec != nil {
+ heap.Push(&mh, rec)
+ } else {
+ delete(rSt.diff, dev)
+ }
+ }
+
+ sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
+ return nil
+}
+
+// genRange represents a range of generations (min and max inclusive).
+type genRange struct {
+ min uint64
+ max uint64
+ cur uint64
+}
+
+type genRangeVector map[uint64]*genRange
+
+// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
+// and the initiator, and updates the range of generations per device known to
+// the responder but not known to the initiator. "gens" (generation range) is
+// passed in as an input argument so that it can be incrementally updated as the
+// range of missing generations grows when different responder prefix genvectors
+// are used to compute the diff.
+//
+// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
+// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
+// two vectors returns: {A:[6-10], C:[1-1]}.
+//
+// TODO(hpucha): Add reclaimVec for GCing.
+func (rSt *responderState) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector) {
+ // Compute missing generations for devices that are in both initiator's and responder's vectors.
+ for devid, gen := range initPVec {
+ rgen, ok := respPVec[devid]
+ if ok {
+ updateDevRange(devid, rgen, gen, rSt.diff)
+ }
+ }
+
+ // Compute missing generations for devices not in initiator's vector but in responder's vector.
+ for devid, rgen := range respPVec {
+ if _, ok := initPVec[devid]; !ok {
+ updateDevRange(devid, rgen, 0, rSt.diff)
+ }
+ }
+}
+
+func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
+ if gen < rgen {
+ // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
+ if r, ok := gens[devid]; !ok {
+ gens[devid] = &genRange{min: gen + 1, max: rgen}
+ } else {
+ if gen+1 < r.min {
+ r.min = gen + 1
+ }
+ if rgen > r.max {
+ r.max = rgen
+ }
+ }
+ }
+}
+
+func extractAndSortPrefixes(vec interfaces.GenVector) []string {
+ pfxs := make([]string, len(vec))
+ i := 0
+ for p := range vec {
+ pfxs[i] = p
+ i++
+ }
+ sort.Strings(pfxs)
+ return pfxs
+}
+
+// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
+// loop.
+func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+ for i := r.cur; i <= r.max; i++ {
+ rec, err := getLogRec(ctx, sn, dev, i)
+ if err == nil {
+ r.cur = i + 1
+ return rec, nil
+ }
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return nil, err
+ }
+ }
+ return nil, nil
+}
+
+// Note: initPfxs is sorted.
+func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
+ filter := true
+
+ var maxGen uint64
+ for _, p := range initPfxs {
+ if strings.HasPrefix(rec.Metadata.ObjId, p) {
+ // Do not filter. Initiator is interested in this
+ // prefix.
+ filter = false
+
+ // Track if the initiator knows of this record.
+ gen := initVec[p][rec.Metadata.Id]
+ if maxGen < gen {
+ maxGen = gen
+ }
+ }
+ }
+
+ // Filter this record if the initiator already has it.
+ if maxGen >= rec.Metadata.Gen {
+ return true
+ }
+
+ return filter
+}
+
+// A minHeap implements heap.Interface and holds local log records.
+type minHeap []*localLogRec
+
+func (mh minHeap) Len() int { return len(mh) }
+
+func (mh minHeap) Less(i, j int) bool {
+ return mh[i].Pos < mh[j].Pos
+}
+
+func (mh minHeap) Swap(i, j int) {
+ mh[i], mh[j] = mh[j], mh[i]
+}
+
+func (mh *minHeap) Push(x interface{}) {
+ item := x.(*localLogRec)
+ *mh = append(*mh, item)
+}
+
+func (mh *minHeap) Pop() interface{} {
+ old := *mh
+ n := len(old)
+ item := old[n-1]
+ *mh = old[0 : n-1]
+ return item
+}
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
new file mode 100644
index 0000000..e2158b2
--- /dev/null
+++ b/services/syncbase/vsync/responder_test.go
@@ -0,0 +1,514 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "fmt"
+ "math/rand"
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+)
+
+// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
+func TestDiffPrefixGenVectors(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ s := svc.sync
+ s.id = 10 //responder. Initiator is id 11.
+
+ tests := []struct {
+ respPVec, initPVec interfaces.PrefixGenVector
+ genDiffIn genRangeVector
+ genDiffWant genRangeVector
+ }{
+ { // responder and initiator are at identical vectors.
+ respPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ genDiffIn: make(genRangeVector),
+ },
+ { // responder and initiator are at identical vectors.
+ respPVec: interfaces.PrefixGenVector{10: 0},
+ initPVec: interfaces.PrefixGenVector{10: 0},
+ genDiffIn: make(genRangeVector),
+ },
+ { // responder has no updates.
+ respPVec: interfaces.PrefixGenVector{10: 0},
+ initPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 8},
+ genDiffIn: make(genRangeVector),
+ },
+ { // responder and initiator have no updates.
+ respPVec: interfaces.PrefixGenVector{10: 0},
+ initPVec: interfaces.PrefixGenVector{11: 0},
+ genDiffIn: make(genRangeVector),
+ },
+ { // responder is staler than initiator.
+ respPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 8, 14: 5},
+ genDiffIn: make(genRangeVector),
+ },
+ { // responder is more up-to-date than initiator for local updates.
+ respPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 2},
+ initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ genDiffIn: make(genRangeVector),
+ genDiffWant: genRangeVector{10: &genRange{min: 2, max: 5}},
+ },
+ { // responder is fresher than initiator for local updates and one device.
+ respPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 22, 13: 2},
+ initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2, 14: 40},
+ genDiffIn: make(genRangeVector),
+ genDiffWant: genRangeVector{
+ 10: &genRange{min: 2, max: 5},
+ 12: &genRange{min: 21, max: 22},
+ },
+ },
+ { // responder is fresher than initiator in all but one device.
+ respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+ initPVec: interfaces.PrefixGenVector{10: 0, 11: 2, 12: 0},
+ genDiffIn: make(genRangeVector),
+ genDiffWant: genRangeVector{
+ 10: &genRange{min: 1, max: 1},
+ 12: &genRange{min: 1, max: 3},
+ 13: &genRange{min: 1, max: 4},
+ },
+ },
+ { // initiator has no updates.
+ respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+ initPVec: interfaces.PrefixGenVector{},
+ genDiffIn: make(genRangeVector),
+ genDiffWant: genRangeVector{
+ 10: &genRange{min: 1, max: 1},
+ 11: &genRange{min: 1, max: 2},
+ 12: &genRange{min: 1, max: 3},
+ 13: &genRange{min: 1, max: 4},
+ },
+ },
+ { // initiator has no updates, pre-existing diff.
+ respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+ initPVec: interfaces.PrefixGenVector{13: 1},
+ genDiffIn: genRangeVector{
+ 10: &genRange{min: 5, max: 20},
+ 13: &genRange{min: 1, max: 3},
+ },
+ genDiffWant: genRangeVector{
+ 10: &genRange{min: 1, max: 20},
+ 11: &genRange{min: 1, max: 2},
+ 12: &genRange{min: 1, max: 3},
+ 13: &genRange{min: 1, max: 4},
+ },
+ },
+ }
+
+ for _, test := range tests {
+ want := test.genDiffWant
+ got := test.genDiffIn
+ rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+ rSt.diff = got
+ rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
+ checkEqualDevRanges(t, got, want)
+ }
+}
+
+// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
+// and if the log records on the wire are correctly ordered (phases 2 and 3 of
+// SendDeltas).
+func TestSendDeltas(t *testing.T) {
+ appName := "mockapp"
+ dbName := "mockdb"
+
+ tests := []struct {
+ respVec, initVec, outVec interfaces.GenVector
+ respGen uint64
+ genDiff genRangeVector
+ keyPfxs []string
+ }{
+ { // Identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{12: 8},
+ "foobar": interfaces.PrefixGenVector{12: 10},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5},
+ "foobar": interfaces.PrefixGenVector{11: 5},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 8},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 1, max: 10},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", ""},
+ },
+ { // Identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "bar": interfaces.PrefixGenVector{12: 20},
+ "foo": interfaces.PrefixGenVector{12: 8},
+ "foobar": interfaces.PrefixGenVector{12: 10},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5},
+ "foobar": interfaces.PrefixGenVector{11: 5, 12: 10},
+ "bar": interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 8},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
+ "bar": interfaces.PrefixGenVector{10: 5, 12: 20},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 1, max: 20},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+ },
+ { // Non-identical prefixes, local only updates.
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5},
+ "foobar": interfaces.PrefixGenVector{11: 5},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+ },
+ { // Non-identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "f": interfaces.PrefixGenVector{12: 5, 13: 5},
+ "foo": interfaces.PrefixGenVector{12: 10, 13: 10},
+ "foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 10, 13: 10},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 20},
+ 13: &genRange{min: 1, max: 20},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+ },
+ { // Non-identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 20},
+ 13: &genRange{min: 1, max: 20},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+ },
+ { // Non-identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "f": interfaces.PrefixGenVector{12: 20, 13: 20},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 20},
+ 13: &genRange{min: 1, max: 20},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
+ },
+ { // Non-identical interleaving prefixes.
+ respVec: interfaces.GenVector{
+ "f": interfaces.PrefixGenVector{12: 20, 13: 10},
+ "foo": interfaces.PrefixGenVector{12: 30, 13: 20},
+ "foobar": interfaces.PrefixGenVector{12: 40, 13: 30},
+ },
+ initVec: interfaces.GenVector{
+ "fo": interfaces.PrefixGenVector{11: 5, 12: 1},
+ "foob": interfaces.PrefixGenVector{11: 5, 12: 10},
+ "foobarxyz": interfaces.PrefixGenVector{11: 5, 12: 20},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "fo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 40},
+ 13: &genRange{min: 1, max: 30},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+ },
+ { // Non-identical interleaving prefixes.
+ respVec: interfaces.GenVector{
+ "fo": interfaces.PrefixGenVector{12: 20, 13: 10},
+ "foob": interfaces.PrefixGenVector{12: 30, 13: 20},
+ "foobarxyz": interfaces.PrefixGenVector{12: 40, 13: 30},
+ },
+ initVec: interfaces.GenVector{
+ "f": interfaces.PrefixGenVector{11: 5, 12: 1},
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 10},
+ "foobar": interfaces.PrefixGenVector{11: 5, 12: 20},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "f": interfaces.PrefixGenVector{10: 5},
+ "fo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+ "foob": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+ "foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 40},
+ 13: &genRange{min: 1, max: 30},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
+ },
+ { // Non-identical sibling prefixes.
+ respVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{12: 20, 13: 10},
+ "foobarabc": interfaces.PrefixGenVector{12: 40, 13: 30},
+ "foobarxyz": interfaces.PrefixGenVector{12: 30, 13: 20},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
+ "foobarabc": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
+ "foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 2, max: 40},
+ 13: &genRange{min: 1, max: 30},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "foobarabc", "foobarxyz", "foobar123", "fooxyz"},
+ },
+ { // Non-identical prefixes, local and remote updates.
+ respVec: interfaces.GenVector{
+ "barbaz": interfaces.PrefixGenVector{12: 18},
+ "f": interfaces.PrefixGenVector{12: 30, 13: 5},
+ "foobar": interfaces.PrefixGenVector{12: 30, 13: 8},
+ },
+ initVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{11: 5, 12: 5},
+ "foobar": interfaces.PrefixGenVector{11: 5, 12: 5},
+ "bar": interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
+ },
+ respGen: 5,
+ outVec: interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 5},
+ "foobar": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 8},
+ "bar": interfaces.PrefixGenVector{10: 5},
+ "barbaz": interfaces.PrefixGenVector{10: 5, 12: 18},
+ },
+ genDiff: genRangeVector{
+ 10: &genRange{min: 1, max: 5},
+ 12: &genRange{min: 6, max: 30},
+ 13: &genRange{min: 1, max: 8},
+ },
+ keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
+ },
+ }
+
+ for i, test := range tests {
+ svc := createService(t)
+ s := svc.sync
+ s.id = 10 //responder.
+
+ wantDiff, wantVec := test.genDiff, test.outVec
+ s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
+
+ req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
+ rSt := newResponderState(nil, nil, s, req)
+
+ rSt.computeDeltaBound(nil)
+ if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
+ t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, rSt.errState)
+ }
+ checkEqualDevRanges(t, rSt.diff, wantDiff)
+
+ ////////////////////////////////////////
+ // Test sending deltas.
+
+ // Insert some log records to bootstrap testing below.
+ tRng := rand.New(rand.NewSource(int64(i)))
+ var wantRecs []*localLogRec
+ st := svc.St()
+ tx := st.NewTransaction()
+ objKeyPfxs := test.keyPfxs
+ j := 0
+ for id, r := range wantDiff {
+ pos := uint64(tRng.Intn(50) + 100*j)
+ for k := r.min; k <= r.max; k++ {
+ opfx := objKeyPfxs[tRng.Intn(len(objKeyPfxs))]
+ // Create holes in the log records.
+ if opfx == "" {
+ continue
+ }
+ okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+ vers := fmt.Sprintf("%x", tRng.Int())
+ rec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
+ Pos: pos + k,
+ }
+ if err := putLogRec(nil, tx, rec); err != nil {
+ t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
+ }
+
+ initPfxs := extractAndSortPrefixes(test.initVec)
+ if !filterLogRec(rec, test.initVec, initPfxs) {
+ wantRecs = append(wantRecs, rec)
+ }
+ }
+ j++
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit putting log rec, err %v", err)
+ }
+
+ d := &dummyResponder{}
+ rSt.call = d
+ rSt.st, rSt.errState = rSt.sync.getDbStore(nil, nil, rSt.req.AppName, rSt.req.DbName)
+ if rSt.errState != nil {
+ t.Fatalf("filterAndSendDeltas failed to get store handle for app/db %v %v", rSt.req.AppName, rSt.req.DbName)
+ }
+ err := rSt.filterAndSendDeltas(nil)
+ if err != nil {
+ t.Fatalf("filterAndSendDeltas failed (I: %v), (R: %v, %v) err %v", test.initVec, test.respGen, test.respVec, err)
+ }
+ d.diffLogRecs(t, wantRecs, wantVec)
+
+ destroyService(t, svc)
+ }
+}
+
+//////////////////////////////
+// Helpers
+
+type dummyResponder struct {
+ start, finish int
+ gotRecs []*localLogRec
+ outVec interfaces.GenVector
+}
+
+func (d *dummyResponder) RecvStream() interface {
+ Advance() bool
+ Value() interfaces.DeltaReq
+ Err() error
+} {
+ return d
+}
+
+func (d *dummyResponder) Advance() bool {
+ return false
+}
+
+func (d *dummyResponder) Value() interfaces.DeltaReq {
+ return interfaces.DeltaReq{}
+}
+
+func (d *dummyResponder) Err() error { return nil }
+
+func (d *dummyResponder) SendStream() interface {
+ Send(item interfaces.DeltaResp) error
+} {
+ return d
+}
+
+func (d *dummyResponder) Send(item interfaces.DeltaResp) error {
+ switch v := item.(type) {
+ case interfaces.DeltaRespStart:
+ d.start++
+ case interfaces.DeltaRespFinish:
+ d.finish++
+ case interfaces.DeltaRespRespVec:
+ d.outVec = v.Value
+ case interfaces.DeltaRespRec:
+ d.gotRecs = append(d.gotRecs, &localLogRec{Metadata: v.Value.Metadata})
+ }
+ return nil
+}
+
+func (d *dummyResponder) Security() security.Call {
+ return nil
+}
+
+func (d *dummyResponder) Suffix() string {
+ return ""
+}
+
+func (d *dummyResponder) LocalEndpoint() naming.Endpoint {
+ return nil
+}
+
+func (d *dummyResponder) RemoteEndpoint() naming.Endpoint {
+ return nil
+}
+
+func (d *dummyResponder) GrantedBlessings() security.Blessings {
+ return security.Blessings{}
+}
+
+func (d *dummyResponder) Server() rpc.Server {
+ return nil
+}
+
+func (d *dummyResponder) diffLogRecs(t *testing.T, wantRecs []*localLogRec, wantVec interfaces.GenVector) {
+ if d.start != 1 || d.finish != 1 {
+ t.Fatalf("diffLogRecs incorrect start/finish records (%v, %v)", d.start, d.finish)
+ }
+ if len(d.gotRecs) != len(wantRecs) {
+ t.Fatalf("diffLogRecs failed, gotLen %v, wantLen %v\n", len(d.gotRecs), len(wantRecs))
+ }
+ for i, rec := range d.gotRecs {
+ if !reflect.DeepEqual(rec.Metadata, wantRecs[i].Metadata) {
+ t.Fatalf("diffLogRecs failed, i %v, got %v, want %v\n", i, rec.Metadata, wantRecs[i].Metadata)
+ }
+ }
+ if !reflect.DeepEqual(d.outVec, wantVec) {
+ t.Fatalf("diffLogRecs failed genvector, got %v, want %v\n", d.outVec, wantVec)
+ }
+}
+
+func checkEqualDevRanges(t *testing.T, s1, s2 genRangeVector) {
+ if len(s1) != len(s2) {
+ t.Fatalf("len(s1): %v != len(s2): %v", len(s1), len(s2))
+ }
+ for d1, r1 := range s1 {
+ if r2, ok := s2[d1]; !ok || !reflect.DeepEqual(r1, r2) {
+ t.Fatalf("Dev %v: r1 %v != r2 %v", d1, r1, r2)
+ }
+ }
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index eac07f7..6d787e1 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,9 +29,10 @@
// syncService contains the metadata for the sync module.
type syncService struct {
// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
- id uint64 // globally unique id for this instance of Syncbase.
- name string // name derived from the global id.
- sv interfaces.Service
+ id uint64 // globally unique id for this instance of Syncbase.
+ name string // name derived from the global id.
+ sv interfaces.Service
+ server rpc.Server
// State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
@@ -96,9 +97,10 @@
// changes to its objects. The "initiator" thread is responsible for
// periodically contacting peers to fetch changes from them. In addition, the
// sync module responds to incoming RPCs from remote sync modules.
-func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
+func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, server rpc.Server) (*syncService, error) {
s := &syncService{
sv: sv,
+ server: server,
batches: make(batchSet),
}
@@ -149,13 +151,6 @@
}
////////////////////////////////////////
-// Core sync method.
-
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
- return verror.NewErrNotImplemented(ctx)
-}
-
-////////////////////////////////////////
// util.Layer methods.
func (s *syncService) Name() string {
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 101d08c..49d9ff5 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -31,17 +31,13 @@
// device.
import (
- "container/heap"
"fmt"
- "sort"
"strconv"
- "strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
- "v.io/v23/rpc"
"v.io/v23/verror"
)
@@ -59,6 +55,7 @@
// a) in-memory sync state of a Database consisting of the current generation
// number, log position and generation vector.
// b) watcher map of prefixes currently being synced.
+// c) republish names in mount tables for all syncgroups.
//
// TODO(hpucha): This is incomplete. Flesh this out further.
func (s *syncService) initSync(ctx *context.T) error {
@@ -347,319 +344,3 @@
}
return nil
}
-
-////////////////////////////////////////////////////////////
-// Genvector-related utilities.
-
-// sendDeltasPerDatabase sends to an initiator all the missing generations
-// corresponding to the prefixes requested for this Database, and a genvector
-// summarizing the knowledge transferred from the responder to the
-// initiator. This happens in two phases:
-//
-// In the first phase, for a given set of nested prefixes from the initiator,
-// the shortest prefix in that set is extracted. The initiator's prefix
-// genvector for this shortest prefix represents the lower bound on its
-// knowledge for the entire set of nested prefixes. This prefix genvector
-// (representing the lower bound) is diffed with all the responder prefix
-// genvectors corresponding to same or deeper prefixes compared to the initiator
-// prefix. This diff produces a bound on the missing knowledge. For example, say
-// the initiator is interested in prefixes {foo, foobar}, where each prefix is
-// associated with a prefix genvector. Since the initiator strictly has as much
-// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
-// prefix genvector is chosen as the lower bound for the initiator's
-// knowledge. Similarly, say the responder has knowledge on prefixes {f,
-// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
-// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
-// compute a bound on missing generations (all responder's prefixes that match
-// "foo". Note that since the responder doesn't have a prefix genvector at
-// "foo", its knowledge at "f" is applicable to "foo").
-//
-// Since the first phase outputs an aggressive calculation of missing
-// generations containing more generation entries than strictly needed by the
-// initiator, in the second phase, each missing generation is sent to the
-// initiator only if the initiator is eligible for it and is not aware of
-// it. The generations are sent to the initiator in the same order as the
-// responder learned them so that the initiator can reconstruct the DAG for the
-// objects by learning older nodes first.
-func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
- // Phase 1 of sendDeltas. diff contains the bound on the generations
- // missing from the initiator per device.
- diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
- if err != nil {
- return nil, err
- }
-
- // Phase 2 of sendDeltas: Process the diff, filtering out records that
- // are not needed, and send the remainder on the wire ordered.
- st, err := s.getDbStore(ctx, call, appName, dbName)
- if err != nil {
- return nil, err
- }
-
- // We now visit every log record in the generation range as obtained
- // from phase 1 in their log order. We use a heap to incrementally sort
- // the log records as per their position in the log.
- //
- // Init the min heap, one entry per device in the diff.
- mh := make(minHeap, 0, len(diff))
- for dev, r := range diff {
- r.cur = r.min
- rec, err := getNextLogRec(ctx, st, dev, r)
- if err != nil {
- return nil, err
- }
- if rec != nil {
- mh = append(mh, rec)
- } else {
- delete(diff, dev)
- }
- }
- heap.Init(&mh)
-
- // Process the log records in order.
- initPfxs := extractAndSortPrefixes(initVec)
-
- for mh.Len() > 0 {
- rec := heap.Pop(&mh).(*localLogRec)
-
- if !filterLogRec(rec, initVec, initPfxs) {
- // Send on the wire.
- wireRec := interfaces.LogRec{Metadata: rec.Metadata}
- // TODO(hpucha): Hash out this fake stream stuff when
- // defining the RPC and the rest of the responder.
- stream.Send(wireRec)
- }
-
- // Add a new record from the same device if not done.
- dev := rec.Metadata.Id
- rec, err := getNextLogRec(ctx, st, dev, diff[dev])
- if err != nil {
- return nil, err
- }
- if rec != nil {
- heap.Push(&mh, rec)
- } else {
- delete(diff, dev)
- }
- }
-
- return outVec, nil
-}
-
-// computeDeltaBound computes the bound on missing generations across all
-// requested prefixes (phase 1 of sendDeltas).
-func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
- respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
- if err != nil {
- return nil, nil, err
- }
- respPfxs := extractAndSortPrefixes(respVec)
- initPfxs := extractAndSortPrefixes(initVec)
- if len(initPfxs) == 0 {
- return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
- }
-
- outVec := make(interfaces.GenVector)
- diff := make(genRangeVector)
- pfx := initPfxs[0]
-
- for _, p := range initPfxs {
- if strings.HasPrefix(p, pfx) && p != pfx {
- continue
- }
-
- // Process this prefix as this is the start of a new set of
- // nested prefixes.
- pfx = p
-
- // Lower bound on initiator's knowledge for this prefix set.
- initpgv := initVec[pfx]
-
- // Find the relevant responder prefixes and add the corresponding knowledge.
- var respgv interfaces.PrefixGenVector
- var rpStart string
- for _, rp := range respPfxs {
- if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
- // No relationship with pfx.
- continue
- }
-
- if strings.HasPrefix(pfx, rp) {
- // If rp is a prefix of pfx, remember it because
- // it may be a potential starting point for the
- // responder's knowledge. The actual starting
- // point is the deepest prefix where rp is a
- // prefix of pfx.
- //
- // Say the initiator is looking for "foo", and
- // the responder has knowledge for "f" and "fo",
- // the responder's starting point will be the
- // prefix genvector for "fo". Similarly, if the
- // responder has knowledge for "foo", the
- // starting point will be the prefix genvector
- // for "foo".
- rpStart = rp
- } else {
- // If pfx is a prefix of rp, this knowledge must
- // be definitely sent to the initiator. Diff the
- // prefix genvectors to adjust the delta bound and
- // include in outVec.
- respgv = respVec[rp]
- s.diffPrefixGenVectors(respgv, initpgv, diff)
- outVec[rp] = respgv
- }
- }
-
- // Deal with the starting point.
- if rpStart == "" {
- // No matching prefixes for pfx were found.
- respgv = make(interfaces.PrefixGenVector)
- respgv[s.id] = respGen
- } else {
- respgv = respVec[rpStart]
- }
- s.diffPrefixGenVectors(respgv, initpgv, diff)
- outVec[pfx] = respgv
- }
-
- return diff, outVec, nil
-}
-
-// genRange represents a range of generations (min and max inclusive).
-type genRange struct {
- min uint64
- max uint64
- cur uint64
-}
-
-type genRangeVector map[uint64]*genRange
-
-// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
-// and the initiator, and updates the range of generations per device known to
-// the responder but not known to the initiator. "gens" (generation range) is
-// passed in as an input argument so that it can be incrementally updated as the
-// range of missing generations grows when different responder prefix genvectors
-// are used to compute the diff.
-//
-// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
-// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
-// two vectors returns: {A:[6-10], C:[1-1]}.
-//
-// TODO(hpucha): Add reclaimVec for GCing.
-func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) {
- // Compute missing generations for devices that are in both initiator's and responder's vectors.
- for devid, gen := range initPVec {
- rgen, ok := respPVec[devid]
- // Skip since responder doesn't know of this device.
- if ok {
- updateDevRange(devid, rgen, gen, gens)
- }
- }
-
- // Compute missing generations for devices not in initiator's vector but in responder's vector.
- for devid, rgen := range respPVec {
- if _, ok := initPVec[devid]; !ok {
- updateDevRange(devid, rgen, 0, gens)
- }
- }
-}
-
-func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
- if gen < rgen {
- // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
- if r, ok := gens[devid]; !ok {
- gens[devid] = &genRange{min: gen + 1, max: rgen}
- } else {
- if gen+1 < r.min {
- r.min = gen + 1
- }
- if rgen > r.max {
- r.max = rgen
- }
- }
- }
-}
-
-func extractAndSortPrefixes(vec interfaces.GenVector) []string {
- pfxs := make([]string, len(vec))
- i := 0
- for p := range vec {
- pfxs[i] = p
- i++
- }
- sort.Strings(pfxs)
- return pfxs
-}
-
-// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
-// loop.
-func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
- for i := r.cur; i <= r.max; i++ {
- rec, err := getLogRec(ctx, sn, dev, i)
- if err == nil {
- r.cur = i + 1
- return rec, nil
- }
- if verror.ErrorID(err) != verror.ErrNoExist.ID {
- return nil, err
- }
- }
- return nil, nil
-}
-
-// Note: initPfxs is sorted.
-func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
- filter := true
-
- var maxGen uint64
- for _, p := range initPfxs {
- if strings.HasPrefix(rec.Metadata.ObjId, p) {
- // Do not filter. Initiator is interested in this
- // prefix.
- filter = false
-
- // Track if the initiator knows of this record.
- gen := initVec[p][rec.Metadata.Id]
- if maxGen < gen {
- maxGen = gen
- }
- }
- }
-
- // Filter this record if the initiator already has it.
- if maxGen >= rec.Metadata.Gen {
- return true
- }
-
- return filter
-}
-
-// A minHeap implements heap.Interface and holds local log records.
-type minHeap []*localLogRec
-
-func (mh minHeap) Len() int { return len(mh) }
-
-func (mh minHeap) Less(i, j int) bool {
- return mh[i].Pos < mh[j].Pos
-}
-
-func (mh minHeap) Swap(i, j int) {
- mh[i], mh[j] = mh[j], mh[i]
-}
-
-func (mh *minHeap) Push(x interface{}) {
- item := x.(*localLogRec)
- *mh = append(*mh, item)
-}
-
-func (mh *minHeap) Pop() interface{} {
- old := *mh
- n := len(old)
- item := old[n-1]
- *mh = old[0 : n-1]
- return item
-}
-
-type logRecStream interface {
- Send(interfaces.LogRec)
-}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 57c1f66..aa232e4 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -5,8 +5,6 @@
package vsync
import (
- "fmt"
- "math/rand"
"reflect"
"testing"
"time"
@@ -113,389 +111,6 @@
checkLogRec(t, st, id, gen, false, nil)
}
-// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
-func TestDiffPrefixGenVectors(t *testing.T) {
- svc := createService(t)
- defer destroyService(t, svc)
- s := svc.sync
- s.id = 10 //responder. Initiator is id 11.
-
- tests := []struct {
- respPVec, initPVec interfaces.PrefixGenVector
- genDiffIn genRangeVector
- genDiffWant genRangeVector
- }{
- { // responder and initiator are at identical vectors.
- respPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
- initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
- genDiffIn: make(genRangeVector),
- },
- { // responder and initiator are at identical vectors.
- respPVec: interfaces.PrefixGenVector{10: 0},
- initPVec: interfaces.PrefixGenVector{10: 0},
- genDiffIn: make(genRangeVector),
- },
- { // responder has no updates.
- respPVec: interfaces.PrefixGenVector{10: 0},
- initPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 8},
- genDiffIn: make(genRangeVector),
- },
- { // responder and initiator have no updates.
- respPVec: interfaces.PrefixGenVector{10: 0},
- initPVec: interfaces.PrefixGenVector{11: 0},
- genDiffIn: make(genRangeVector),
- },
- { // responder is staler than initiator.
- respPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
- initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 8, 14: 5},
- genDiffIn: make(genRangeVector),
- },
- { // responder is more up-to-date than initiator for local updates.
- respPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 20, 13: 2},
- initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
- genDiffIn: make(genRangeVector),
- genDiffWant: genRangeVector{10: &genRange{min: 2, max: 5}},
- },
- { // responder is fresher than initiator for local updates and one device.
- respPVec: interfaces.PrefixGenVector{10: 5, 11: 10, 12: 22, 13: 2},
- initPVec: interfaces.PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2, 14: 40},
- genDiffIn: make(genRangeVector),
- genDiffWant: genRangeVector{
- 10: &genRange{min: 2, max: 5},
- 12: &genRange{min: 21, max: 22},
- },
- },
- { // responder is fresher than initiator in all but one device.
- respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
- initPVec: interfaces.PrefixGenVector{10: 0, 11: 2, 12: 0},
- genDiffIn: make(genRangeVector),
- genDiffWant: genRangeVector{
- 10: &genRange{min: 1, max: 1},
- 12: &genRange{min: 1, max: 3},
- 13: &genRange{min: 1, max: 4},
- },
- },
- { // initiator has no updates.
- respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
- initPVec: interfaces.PrefixGenVector{},
- genDiffIn: make(genRangeVector),
- genDiffWant: genRangeVector{
- 10: &genRange{min: 1, max: 1},
- 11: &genRange{min: 1, max: 2},
- 12: &genRange{min: 1, max: 3},
- 13: &genRange{min: 1, max: 4},
- },
- },
- { // initiator has no updates, pre-existing diff.
- respPVec: interfaces.PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
- initPVec: interfaces.PrefixGenVector{13: 1},
- genDiffIn: genRangeVector{
- 10: &genRange{min: 5, max: 20},
- 13: &genRange{min: 1, max: 3},
- },
- genDiffWant: genRangeVector{
- 10: &genRange{min: 1, max: 20},
- 11: &genRange{min: 1, max: 2},
- 12: &genRange{min: 1, max: 3},
- 13: &genRange{min: 1, max: 4},
- },
- },
- }
-
- for _, test := range tests {
- want := test.genDiffWant
- got := test.genDiffIn
- s.diffPrefixGenVectors(test.respPVec, test.initPVec, got)
- checkEqualDevRanges(t, got, want)
- }
-}
-
-// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
-// and if the log records on the wire are correctly ordered.
-func TestSendDeltas(t *testing.T) {
- appName := "mockapp"
- dbName := "mockdb"
-
- tests := []struct {
- respVec, initVec, outVec interfaces.GenVector
- respGen uint64
- genDiff genRangeVector
- keyPfxs []string
- }{
- { // Identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{12: 8},
- "foobar": interfaces.PrefixGenVector{12: 10},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5},
- "foobar": interfaces.PrefixGenVector{11: 5},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 8},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 1, max: 10},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", ""},
- },
- { // Identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "bar": interfaces.PrefixGenVector{12: 20},
- "foo": interfaces.PrefixGenVector{12: 8},
- "foobar": interfaces.PrefixGenVector{12: 10},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5},
- "foobar": interfaces.PrefixGenVector{11: 5, 12: 10},
- "bar": interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 8},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 10},
- "bar": interfaces.PrefixGenVector{10: 5, 12: 20},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 1, max: 20},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
- },
- { // Non-identical prefixes, local only updates.
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5},
- "foobar": interfaces.PrefixGenVector{11: 5},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
- },
- { // Non-identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "f": interfaces.PrefixGenVector{12: 5, 13: 5},
- "foo": interfaces.PrefixGenVector{12: 10, 13: 10},
- "foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 10, 13: 10},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 20},
- 13: &genRange{min: 1, max: 20},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
- },
- { // Non-identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "foobar": interfaces.PrefixGenVector{12: 20, 13: 20},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 20},
- 13: &genRange{min: 1, max: 20},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
- },
- { // Non-identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "f": interfaces.PrefixGenVector{12: 20, 13: 20},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 20},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 20},
- 13: &genRange{min: 1, max: 20},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "fooxyz"},
- },
- { // Non-identical interleaving prefixes.
- respVec: interfaces.GenVector{
- "f": interfaces.PrefixGenVector{12: 20, 13: 10},
- "foo": interfaces.PrefixGenVector{12: 30, 13: 20},
- "foobar": interfaces.PrefixGenVector{12: 40, 13: 30},
- },
- initVec: interfaces.GenVector{
- "fo": interfaces.PrefixGenVector{11: 5, 12: 1},
- "foob": interfaces.PrefixGenVector{11: 5, 12: 10},
- "foobarxyz": interfaces.PrefixGenVector{11: 5, 12: 20},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "fo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
- "foo": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 40},
- 13: &genRange{min: 1, max: 30},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
- },
- { // Non-identical interleaving prefixes.
- respVec: interfaces.GenVector{
- "fo": interfaces.PrefixGenVector{12: 20, 13: 10},
- "foob": interfaces.PrefixGenVector{12: 30, 13: 20},
- "foobarxyz": interfaces.PrefixGenVector{12: 40, 13: 30},
- },
- initVec: interfaces.GenVector{
- "f": interfaces.PrefixGenVector{11: 5, 12: 1},
- "foo": interfaces.PrefixGenVector{11: 5, 12: 10},
- "foobar": interfaces.PrefixGenVector{11: 5, 12: 20},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "f": interfaces.PrefixGenVector{10: 5},
- "fo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
- "foob": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
- "foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 40},
- 13: &genRange{min: 1, max: 30},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "fo", "foob", "foobarxyz", "fooxyz"},
- },
- { // Non-identical sibling prefixes.
- respVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{12: 20, 13: 10},
- "foobarabc": interfaces.PrefixGenVector{12: 40, 13: 30},
- "foobarxyz": interfaces.PrefixGenVector{12: 30, 13: 20},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5, 12: 1},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 20, 13: 10},
- "foobarabc": interfaces.PrefixGenVector{10: 5, 12: 40, 13: 30},
- "foobarxyz": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 20},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 2, max: 40},
- 13: &genRange{min: 1, max: 30},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "", "foobarabc", "foobarxyz", "foobar123", "fooxyz"},
- },
- { // Non-identical prefixes, local and remote updates.
- respVec: interfaces.GenVector{
- "barbaz": interfaces.PrefixGenVector{12: 18},
- "f": interfaces.PrefixGenVector{12: 30, 13: 5},
- "foobar": interfaces.PrefixGenVector{12: 30, 13: 8},
- },
- initVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{11: 5, 12: 5},
- "foobar": interfaces.PrefixGenVector{11: 5, 12: 5},
- "bar": interfaces.PrefixGenVector{10: 5, 11: 5, 12: 5},
- },
- respGen: 5,
- outVec: interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 5},
- "foobar": interfaces.PrefixGenVector{10: 5, 12: 30, 13: 8},
- "bar": interfaces.PrefixGenVector{10: 5},
- "barbaz": interfaces.PrefixGenVector{10: 5, 12: 18},
- },
- genDiff: genRangeVector{
- 10: &genRange{min: 1, max: 5},
- 12: &genRange{min: 6, max: 30},
- 13: &genRange{min: 1, max: 8},
- },
- keyPfxs: []string{"baz", "wombat", "f", "foo", "foobar", "bar", "barbaz", ""},
- },
- }
-
- for i, test := range tests {
- svc := createService(t)
- s := svc.sync
- s.id = 10 //responder.
-
- wantDiff, wantVec := test.genDiff, test.outVec
- s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
-
- gotDiff, gotVec, err := s.computeDeltaBound(nil, appName, dbName, test.initVec)
- if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
- t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
- }
- checkEqualDevRanges(t, gotDiff, wantDiff)
-
- // Insert some log records to bootstrap testing below.
- tRng := rand.New(rand.NewSource(int64(i)))
- var wantRecs []*localLogRec
- st := svc.St()
- tx := st.NewTransaction()
- objKeyPfxs := test.keyPfxs
- j := 0
- for id, r := range wantDiff {
- pos := uint64(tRng.Intn(50) + 100*j)
- for k := r.min; k <= r.max; k++ {
- opfx := objKeyPfxs[tRng.Intn(len(objKeyPfxs))]
- // Create holes in the log records.
- if opfx == "" {
- continue
- }
- okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
- vers := fmt.Sprintf("%x", tRng.Int())
- rec := &localLogRec{
- Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
- Pos: pos + k,
- }
- if err := putLogRec(nil, tx, rec); err != nil {
- t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
- }
-
- initPfxs := extractAndSortPrefixes(test.initVec)
- if !filterLogRec(rec, test.initVec, initPfxs) {
- wantRecs = append(wantRecs, rec)
- }
- }
- j++
- }
- if err := tx.Commit(); err != nil {
- t.Fatalf("cannot commit putting log rec, err %v", err)
- }
-
- ts := &logRecStreamTest{}
- gotVec, err = s.sendDeltasPerDatabase(nil, nil, appName, dbName, test.initVec, ts)
- if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
- t.Fatalf("sendDeltasPerDatabase failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
- }
- ts.diffLogRecs(t, wantRecs)
-
- destroyService(t, svc)
- }
-}
-
//////////////////////////////
// Helpers
@@ -503,25 +118,6 @@
// for getting the stack trace. Right now cannot import the package due to a
// cycle.
-type logRecStreamTest struct {
- gotRecs []*localLogRec
-}
-
-func (s *logRecStreamTest) Send(rec interfaces.LogRec) {
- s.gotRecs = append(s.gotRecs, &localLogRec{Metadata: rec.Metadata})
-}
-
-func (s *logRecStreamTest) diffLogRecs(t *testing.T, wantRecs []*localLogRec) {
- if len(s.gotRecs) != len(wantRecs) {
- t.Fatalf("diffLogRecMetadata failed, gotLen %v, wantLen %v\n", len(s.gotRecs), len(wantRecs))
- }
- for i, rec := range s.gotRecs {
- if !reflect.DeepEqual(rec.Metadata, wantRecs[i].Metadata) {
- t.Fatalf("diffLogRecMetadata failed, i %v, got %v, want %v\n", i, rec.Metadata, wantRecs[i].Metadata)
- }
- }
-}
-
func checkDbSyncState(t *testing.T, st store.StoreReader, exists bool, wantSt *dbSyncState) {
gotSt, err := getDbSyncState(nil, st)
@@ -549,14 +145,3 @@
t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
}
}
-
-func checkEqualDevRanges(t *testing.T, s1, s2 genRangeVector) {
- if len(s1) != len(s2) {
- t.Fatalf("len(s1): %v != len(s2): %v", len(s1), len(s2))
- }
- for d1, r1 := range s1 {
- if r2, ok := s2[d1]; !ok || !reflect.DeepEqual(r1, r2) {
- t.Fatalf("Dev %v: r1 %v != r2 %v", d1, r1, r2)
- }
- }
-}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 79cc9bf..31eaff0 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -18,13 +18,12 @@
"strings"
"time"
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
-
- wire "v.io/syncbase/v23/services/syncbase/nosql"
-
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
@@ -609,9 +608,20 @@
}
func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
- // TODO(hpucha): To be implemented.
- // Pass server to Service in store.
- // server.ServeDispatcher(*name, service, authorizer)
+ // Get this Syncbase's sync module handle.
+ ss := sd.db.App().Service().Sync().(*syncService)
+
+ for _, mt := range spec.MountTables {
+ name := naming.Join(mt, ss.name)
+ // TODO(hpucha): Is this add idempotent? Appears to be from code.
+ // Will it handle absolute names. Appears to be.
+ if err := ss.server.AddName(name); err != nil {
+ return err
+ }
+ }
+
+ // TODO(hpucha): Do we have to publish in neighborhood explicitly?
+
return nil
}
diff --git a/services/syncbase/vsync/util_test.go b/services/syncbase/vsync/util_test.go
index 0848d35..3e6b629 100644
--- a/services/syncbase/vsync/util_test.go
+++ b/services/syncbase/vsync/util_test.go
@@ -136,7 +136,7 @@
path: path,
shutdown: shutdown,
}
- if s.sync, err = New(ctx, nil, s); err != nil {
+ if s.sync, err = New(ctx, nil, s, nil); err != nil {
util.DestroyStore(engine, path)
t.Fatalf("cannot create sync service: %v", err)
}