syncbase/vsync: Integrate SyncGroup syncing, and add e2e tests.
MultiPart: 2/2
Change-Id: Ie3e669bb0357fe42b8853e29dd4f08f0eac92a15
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
index 39732b1..a37ddc5 100644
--- a/services/syncbase/server/interfaces/sync_types.go
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -19,3 +19,49 @@
}
return out
}
+
+// Compare returns an integer comparing two prefix generation vectors. The
+// result will be 0 if a==b, -1 if a < b, +1 if a > b and +2 if a and b are
+// uncomparable. A device id present in only one vector makes that vector ahead for it.
+func (a PrefixGenVector) Compare(b PrefixGenVector) int {
+ res := -2 // -2 is a sentinel meaning no element has been compared yet.
+
+ if len(a) == 0 && len(b) == 0 {
+ return 0
+ }
+
+ for aid, agen := range a {
+ bgen, ok := b[aid]
+
+ resCur := 0
+ if agen > bgen || !ok { // An id absent from b counts as ahead, even at generation 0.
+ resCur = 1
+ } else if agen < bgen {
+ resCur = -1
+ }
+
+ if res == -2 || res == 0 {
+ // Initialize/overwrite safely with the current result.
+ res = resCur
+ } else if res != resCur && resCur != 0 {
+ // Uncomparable, since some elements are less and others
+ // are greater.
+ return 2
+ }
+ }
+
+ for bid := range b {
+ if _, ok := a[bid]; ok {
+ continue
+ }
+
+ if res == 1 {
+ // b has an id that a lacks, yet a is ahead elsewhere: uncomparable.
+ return 2
+ }
+
+ return -1 // a is equal or behind on shared ids and lacks bid, so a < b.
+ }
+
+ return res
+}
diff --git a/services/syncbase/server/interfaces/sync_types_test.go b/services/syncbase/server/interfaces/sync_types_test.go
new file mode 100644
index 0000000..4386bf5
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types_test.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import "testing"
+
+func TestPrefixGenVectorCompare(t *testing.T) { // Checks Compare in both directions for every case.
+ tests := []struct {
+ a, b PrefixGenVector
+ resAB int // Expected result of a.Compare(b).
+ resBA int // Expected result of b.Compare(a).
+ }{
+ { // a = b.
+ a: PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ b: PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+ resAB: 0,
+ resBA: 0,
+ },
+ { // a = b.
+ a: PrefixGenVector{},
+ b: PrefixGenVector{},
+ resAB: 0,
+ resBA: 0,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+ b: PrefixGenVector{},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{11: 2, 12: 3, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{11: 11, 12: 2, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{11: 11, 12: 12, 13: 13},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 1, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+ b: PrefixGenVector{10: 1, 11: 5, 12: 23, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a > b.
+ a: PrefixGenVector{10: 0, 11: 5, 12: 56, 13: 13},
+ b: PrefixGenVector{11: 5, 12: 23, 13: 4},
+ resAB: 1,
+ resBA: -1,
+ },
+ { // a and b are uncomparable.
+ a: PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+ b: PrefixGenVector{10: 56, 11: 5, 12: 23, 13: 4},
+ resAB: 2,
+ resBA: 2,
+ },
+ { // a and b are uncomparable.
+ a: PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+ b: PrefixGenVector{10: 1, 11: 50, 12: 23, 13: 4},
+ resAB: 2,
+ resBA: 2,
+ },
+ { // a and b are uncomparable.
+ a: PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+ b: PrefixGenVector{11: 11, 12: 2, 13: 4, 15: 40},
+ resAB: 2,
+ resBA: 2,
+ },
+ }
+
+ for pos, test := range tests {
+ got, want := test.a.Compare(test.b), test.resAB
+ if got != want {
+ t.Fatalf("Comparison failed for pos %d (a=%v, b=%v), got %v, want %v", pos, test.a, test.b, got, want)
+ }
+ got, want = test.b.Compare(test.a), test.resBA
+ if got != want {
+ t.Fatalf("Reverse comparison failed for pos %d (b=%v, a=%v), got %v, want %v", pos, test.b, test.a, got, want)
+ }
+ }
+}
diff --git a/services/syncbase/testutil/v23util.go b/services/syncbase/testutil/v23util.go
index 8e370a8..42496a7 100644
--- a/services/syncbase/testutil/v23util.go
+++ b/services/syncbase/testutil/v23util.go
@@ -31,6 +31,7 @@
}
rmRootDir = true
}
+
// Start syncbased.
invocation := syncbased.WithStartOpts(syncbased.StartOpts().WithCustomCredentials(creds)).Start(
"--v23.tcp.address=127.0.0.1:0",
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 94f79e1..0727cca 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -494,6 +494,9 @@
if iSt.sg {
// Add the SyncGroup value to the Database.
+ if err := iSt.insertSgRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
+ return err
+ }
} else {
if err := iSt.insertRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
return err
@@ -564,6 +567,16 @@
return err
}
+// insertSgRecInDb VOM-decodes valbuf into a SyncGroup and stores it under the log record's object id and current version.
+func (iSt *initiationState) insertSgRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
+ m := rec.Metadata
+ var sg interfaces.SyncGroup
+ if err := vom.Decode(valbuf, &sg); err != nil {
+ return err // Decode errors are returned to the caller unchanged.
+ }
+ return setSGDataEntryByOID(ctx, tx, m.ObjId, m.CurVers, &sg)
+}
+
// insertRecInDb inserts the versioned value in the Database.
func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
m := rec.Metadata
@@ -944,7 +957,7 @@
Gen: dsInMem.data.gen,
CheckptGen: dsInMem.data.checkptGen,
},
- Sgs: make(map[interfaces.GroupId]localGenInfo),
+ Sgs: make(map[string]localGenInfo),
GenVec: dsInMem.genvec,
SgGenVec: dsInMem.sggenvec,
}
@@ -970,6 +983,31 @@
if _, ok := genvec[rpfx]; !ok {
genvec[rpfx] = respgv
}
+
+ if iSt.sg {
+ // Flip sync pending if needed in case of SyncGroup
+ // syncing. See explanation for SyncPending flag in
+ // types.vdl.
+ gid, err := sgID(rpfx)
+ if err != nil {
+ return err
+ }
+ state, err := getSGIdEntry(ctx, iSt.tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.SyncPending { // Clear the flag only once the local genvec has caught up with PendingGenVec.
+ curgv := genvec[rpfx]
+ res := curgv.Compare(state.PendingGenVec)
+ vlog.VI(4).Infof("sync: updateSyncSt:: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
+ if res == 0 || res == 1 { // Exclude +2: an uncomparable genvec has not caught up yet.
+ state.SyncPending = false
+ if err := setSGIdEntry(ctx, iSt.tx, gid, state); err != nil {
+ return err
+ }
+ }
+ }
+ }
}
iSt.updLocal = genvec
@@ -979,8 +1017,6 @@
delete(pgv, iSt.config.sync.id)
}
- // TODO(hpucha): Flip join pending if needed.
-
// TODO(hpucha): Add knowledge compaction.
return putDbSyncState(ctx, iSt.tx, ds)
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 7c1a045..c740d95 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -490,7 +490,7 @@
t.Fatalf("Mount tables are not equal config %v, spec %v", iSt.config.mtTables, sg1.Spec.MountTables)
}
- s.initSyncStateInMem(nil, "mockapp", "mockdb", sgId1)
+ s.initSyncStateInMem(nil, "mockapp", "mockdb", sgOID(sgId1))
iSt.stream = createReplayStream(t, rfile)
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0c78f52..8cdc47a 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -8,12 +8,11 @@
"container/heap"
"fmt"
"sort"
- "strconv"
"strings"
"v.io/v23/context"
- wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/verror"
+ "v.io/v23/vom"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/watchable"
@@ -69,10 +68,10 @@
rSt.initVec = v.Value.InitVec
rSt.sgIds = make(sgSet)
// Populate the sgids from the initvec.
- for id := range rSt.initVec {
- gid, err := strconv.ParseUint(id, 10, 64)
+ for oid := range rSt.initVec {
+ gid, err := sgID(oid)
if err != nil {
- vlog.Fatalf("sync: newResponderState: invalid syncgroup id", gid)
+ vlog.Fatalf("sync: newResponderState: invalid syncgroup key %q: %v", oid, err) // Format verbs added so the key and parse error are actually logged.
}
rSt.sgIds[interfaces.GroupId(gid)] = struct{}{}
}
@@ -119,8 +118,8 @@
// embedded, consider using a helper function to auto-fill it instead
// (see http://goo.gl/mEa4L0) but only incur that overhead when the
// logging level specified is enabled.
- vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
- rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
+ vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %s, %s: sgids %v, genvec %v, sg %v",
+ rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec, rSt.sg)
// Phase 1 of sendDeltas: Authorize the initiator and respond to the
// caller only for the SyncGroups that allow access.
@@ -179,12 +178,6 @@
for _, p := range sg.Spec.Prefixes {
allowedPfxs[p] = struct{}{}
}
-
- // Add the initiator to the SyncGroup membership if not already
- // in it. It is a temporary solution until SyncGroup metadata
- // is synchronized peer to peer.
- // TODO(rdaoud): remove this when SyncGroups are synced.
- rSt.addInitiatorToSyncGroup(ctx, sgid)
}
if err != nil {
@@ -214,49 +207,9 @@
return nil
}
-// addInitiatorToSyncGroup adds the request initiator to the membership of the
-// given SyncGroup if the initiator is not already a member. It is a temporary
-// solution until SyncGroup metadata starts being synchronized, at which time
-// peers will learn of new members through mutations of the SyncGroup metadata
-// by the SyncGroup administrators.
-// Note: the joiner metadata is fake because the responder does not have it.
-func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
- if rSt.initiator == "" {
- return
- }
-
- err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
- version, err := getSyncGroupVersion(ctx, tx, gid)
- if err != nil {
- return err
- }
- sg, err := getSGDataEntry(ctx, tx, gid, version)
- if err != nil {
- return err
- }
-
- // If the initiator is already a member of the SyncGroup abort
- // the transaction with a special error code.
- if _, ok := sg.Joiners[rSt.initiator]; ok {
- return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
- }
-
- vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
- sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
- return setSGDataEntry(ctx, tx, gid, version, sg)
- })
-
- if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
- vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
- }
-}
-
// sendSgDeltas computes the bound on missing generations, and sends the missing
// log records across all requested SyncGroups (phases 2 and 3 of sendDeltas).
func (rSt *responderState) sendSgDeltas(ctx *context.T) error {
- vlog.VI(3).Infof("sync: sendSgDeltas: %s, %s: sgids %v, genvec %v",
- rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
-
respVec, _, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, rSt.sgIds)
if err != nil {
return err
@@ -367,7 +320,7 @@
rSt.outVec[pfx] = respgv
}
- vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
+ vlog.VI(3).Infof("sync: computeDataDeltas: %s, %s: diff %v, outvec %v",
rSt.appName, rSt.dbName, rSt.diff, rSt.outVec)
return nil
}
@@ -411,7 +364,7 @@
if rSt.sg || !filterLogRec(rec, rSt.initVec, initPfxs) {
// Send on the wire.
- wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+ wireRec, err := makeWireLogRec(ctx, rSt.sg, rSt.st, rec)
if err != nil {
return err
}
@@ -434,6 +387,8 @@
}
func (rSt *responderState) sendGenVec(ctx *context.T) error {
+ vlog.VI(3).Infof("sync: sendGenVec: sending genvec %v", rSt.outVec)
+
sender := rSt.call.SendStream()
sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
return nil
@@ -553,11 +508,20 @@
// makeWireLogRec creates a sync log record to send on the wire from a given
// local sync record.
-func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+func makeWireLogRec(ctx *context.T, sg bool, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
// Get the object value at the required version.
key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
var value []byte
- if !rec.Metadata.Delete {
+ if sg {
+ sg, err := getSGDataEntryByOID(ctx, st, key, version)
+ if err != nil {
+ return nil, err
+ }
+ value, err = vom.Encode(sg)
+ if err != nil {
+ return nil, err
+ }
+ } else if !rec.Metadata.Delete {
var err error
value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
if err != nil {
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index d820d8f..953f150 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -57,7 +57,6 @@
"v.io/v23/context"
"v.io/v23/verror"
- "v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/services/syncbase/store"
@@ -82,8 +81,11 @@
// dbSyncStateInMem represents the in-memory sync state of a Database and all
// its SyncGroups.
type dbSyncStateInMem struct {
- data *localGenInfoInMem // info for data.
- sgs map[interfaces.GroupId]*localGenInfoInMem // info for SyncGroups.
+ data *localGenInfoInMem // info for data.
+
+ // Info for SyncGroups. The key here is the SyncGroup oid of the form
+ // $sync:sgd:<group id>. More details in syncgroup.go.
+ sgs map[string]*localGenInfoInMem
// Note: Generation vector contains state from remote devices only.
genvec interfaces.GenVector
@@ -94,9 +96,9 @@
out := &dbSyncStateInMem{}
out.data = in.data.deepCopy()
- out.sgs = make(map[interfaces.GroupId]*localGenInfoInMem)
- for id, info := range in.sgs {
- out.sgs[id] = info.deepCopy()
+ out.sgs = make(map[string]*localGenInfoInMem)
+ for oid, info := range in.sgs {
+ out.sgs[oid] = info.deepCopy()
}
out.genvec = in.genvec.DeepCopy()
@@ -200,25 +202,22 @@
// Note: For all the utilities below, if the sgid parameter is non-nil, the
// operation is performed in the SyncGroup space. If nil, it is performed in the
// data space for the Database.
-//
-// TODO(hpucha): Once GroupId is changed to string, clean up these function
-// signatures.
// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
// positions in a Database's log. Used when local updates result in log
// entries.
-func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) (uint64, uint64) {
- return s.reserveGenAndPosInternal(appName, dbName, sgid, count, count)
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) (uint64, uint64) {
+ return s.reserveGenAndPosInternal(appName, dbName, sgoid, count, count)
}
// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
// when remote log records are received.
-func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) uint64 {
- _, pos := s.reserveGenAndPosInternal(appName, dbName, sgid, 0, count)
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) uint64 {
+ _, pos := s.reserveGenAndPosInternal(appName, dbName, sgoid, 0, count)
return pos
}
-func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgid string, genCount, posCount uint64) (uint64, uint64) {
+func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgoid string, genCount, posCount uint64) (uint64, uint64) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -227,23 +226,18 @@
if !ok {
ds = &dbSyncStateInMem{
data: &localGenInfoInMem{gen: 1},
- sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ sgs: make(map[string]*localGenInfoInMem),
}
s.syncState[name] = ds
}
var info *localGenInfoInMem
- if sgid != "" {
- id, err := strconv.ParseUint(sgid, 10, 64)
- if err != nil {
- vlog.Fatalf("sync: reserveGenAndPosInternal: invalid syncgroup id", sgid)
- }
-
+ if sgoid != "" {
var ok bool
- info, ok = ds.sgs[interfaces.GroupId(id)]
+ info, ok = ds.sgs[sgoid]
if !ok {
info = &localGenInfoInMem{gen: 1}
- ds.sgs[interfaces.GroupId(id)] = info
+ ds.sgs[sgoid] = info
}
} else {
info = ds.data
@@ -273,7 +267,7 @@
if len(sgs) > 0 {
// Checkpoint requested SyncGroups.
for id := range sgs {
- info, ok := ds.sgs[id]
+ info, ok := ds.sgs[sgOID(id)]
if !ok {
return verror.New(verror.ErrInternal, ctx, "sg state not found", name, id)
}
@@ -286,7 +280,7 @@
}
// initSyncStateInMem initializes the in memory sync state of the Database/SyncGroup if needed.
-func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgid interfaces.GroupId) {
+func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgoid string) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -294,13 +288,13 @@
if s.syncState[name] == nil {
s.syncState[name] = &dbSyncStateInMem{
data: &localGenInfoInMem{gen: 1},
- sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ sgs: make(map[string]*localGenInfoInMem),
}
}
- if sgid != interfaces.NoGroupId {
+ if sgoid != "" {
ds := s.syncState[name]
- if _, ok := ds.sgs[sgid]; !ok {
- ds.sgs[sgid] = &localGenInfoInMem{gen: 1}
+ if _, ok := ds.sgs[sgoid]; !ok {
+ ds.sgs[sgoid] = &localGenInfoInMem{gen: 1}
}
}
return
@@ -335,10 +329,10 @@
if len(sgs) > 0 {
genvec = make(interfaces.GenVector)
for id := range sgs {
- sid := fmt.Sprintf("%d", id)
- gv := ds.sggenvec[sid]
- genvec[sid] = gv.DeepCopy()
- genvec[sid][s.id] = ds.sgs[id].checkptGen
+ sgoid := sgOID(id)
+ gv := ds.sggenvec[sgoid]
+ genvec[sgoid] = gv.DeepCopy()
+ genvec[sgoid][s.id] = ds.sgs[sgoid].checkptGen
}
} else {
genvec = ds.genvec.DeepCopy()
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 3036611..5a3b2c8 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -6,7 +6,6 @@
import (
"reflect"
- "strconv"
"testing"
"time"
@@ -40,11 +39,7 @@
if sgid == "" {
info = s.syncState[name].data
} else {
- id, err := strconv.ParseUint(sgid, 10, 64)
- if err != nil {
- t.Fatalf("reserveGenAndPosInternal failed, invalid sgid %v", sgid)
- }
- info = s.syncState[name].sgs[interfaces.GroupId(id)]
+ info = s.syncState[name].sgs[sgid]
}
if info.gen != wantGen || info.pos != wantPos {
t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", info.gen, wantGen, info.pos, wantPos)
@@ -74,9 +69,9 @@
100: 200, 300: 400, 500: 600,
},
}
- localsgs := make(map[interfaces.GroupId]localGenInfo)
- localsgs[interfaces.GroupId(8888)] = localGenInfo{Gen: 56, CheckptGen: 2000}
- localsgs[interfaces.GroupId(1008888)] = localGenInfo{Gen: 25890, CheckptGen: 100}
+ localsgs := make(map[string]localGenInfo)
+ localsgs["8888"] = localGenInfo{Gen: 56, CheckptGen: 2000}
+ localsgs["1008888"] = localGenInfo{Gen: 25890, CheckptGen: 100}
tx := st.NewTransaction()
wantSt := &dbSyncState{
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2fb5479..98e0e84 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
import (
"fmt"
+ "strconv"
"strings"
"time"
@@ -212,28 +213,30 @@
version = newSyncGroupVersion()
}
+ oid := sgOID(sg.Id)
+
// Add the SyncGroup versioned data entry.
- if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
return err
}
- // Add a sync log record for the SyncGroup if needed.
- oid := sgIdKey(sg.Id)
- logKey := ""
- if withLog {
- if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
- return err
- }
- logKey = logRecKey(oid, servId, gen)
- }
-
- // Add the SyncGroup to the DAG.
var parents []string
if head, err := getHead(ctx, tx, oid); err == nil {
parents = []string{head}
} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
+
+ // Add a sync log record for the SyncGroup if needed.
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
return err
}
@@ -241,12 +244,12 @@
}
// addSyncGroupLogRec adds a new local log record for a SyncGroup.
-func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
- oid := sgIdKey(gid)
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
ObjId: oid,
CurVers: version,
+ Parents: parents,
Delete: false,
UpdTime: watchable.GetStoreTime(ctx, tx),
Id: servId,
@@ -267,7 +270,7 @@
// getSyncGroupVersion retrieves the current version of the SyncGroup.
func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- return getHead(ctx, st, sgIdKey(gid))
+ return getHead(ctx, st, sgOID(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
@@ -321,7 +324,7 @@
// nodes). This is done separately from pruning the DAG nodes because
// some nodes may have no log record pointing back to the SyncGroup data
// entries (loose coupling to support the pending SyncGroup state).
- oid := sgIdKey(gid)
+ oid := sgOID(gid)
err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
return delSGDataEntry(ctx, tx, gid, v)
})
@@ -491,21 +494,11 @@
// Note: as with other syncable objects, the DAG "heads" table contains
// a reference to the current SyncGroup version, and the DAG "nodes"
// table tracks its history of mutations.
- // TODO(rdaoud): change the data key prefix to use the SG OID instead
- // of its ID, to be similar to the versioned user data keys. The OID
- // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
- // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
- sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
)
-// sgIdStr returns the SyncGroup ID in string format.
-// TODO(rdaoud): delete when the SG ID becomes a string throughout.
-func sgIdStr(gid interfaces.GroupId) string {
- return fmt.Sprintf("%d", uint64(gid))
-}
-
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
return util.JoinKeyParts(sgNameKeyPrefix, name)
@@ -516,11 +509,35 @@
return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
}
+// sgOID converts a group id into an oid string by joining sgDataKeyPrefix with the decimal group id.
+func sgOID(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgID is the inverse of sgOID: it requires exactly three key parts and parses the third as a decimal group id.
+func sgID(oid string) (interfaces.GroupId, error) {
+ parts := util.SplitKeyParts(oid)
+ if len(parts) != 3 { // NOTE(review): parts[0] and parts[1] are not checked against sgDataKeyPrefix.
+ return 0, fmt.Errorf("invalid sgoid %s", oid)
+ }
+
+ id, err := strconv.ParseUint(parts[2], 10, 64)
+ if err != nil {
+ return 0, err // The strconv error is returned as is, without the oid.
+ }
+ return interfaces.GroupId(id), nil
+}
+
// sgDataKey returns the key used to access a version of the SyncGroup data.
func sgDataKey(gid interfaces.GroupId, version string) string {
return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
+// sgDataKeyByOID returns the key used to access a version of the SyncGroup data, built by joining the SyncGroup oid and the version.
+func sgDataKeyByOID(oid, version string) string {
+ return util.JoinKeyParts(oid, version)
+}
+
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
// Note that the actual SyncGroup name may contain ":" as a separator.
@@ -558,9 +575,9 @@
return util.Put(ctx, tx, sgIdKey(gid), state)
}
-// setSGDataEntry stores the SyncGroup versioned data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+// setSGDataEntryByOID stores the SyncGroup versioned data entry under the key derived from the SyncGroup oid and version.
+func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
}
// getSGNameEntry retrieves the SyncGroup ID for a given name.
@@ -590,6 +607,15 @@
return &sg, nil
}
+// getSGDataEntryByOID retrieves the SyncGroup data for a given group OID and version; on a util.Get error it returns nil and that error.
+func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
+ var sg interfaces.SyncGroup
+ if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
+ return nil, err
+ }
+ return &sg, nil
+}
+
// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
@@ -642,8 +668,7 @@
// has Admin privilege.
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
@@ -657,7 +682,7 @@
return err
}
- ss.initSyncStateInMem(ctx, appName, dbName, gid)
+ ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
// Local SG create succeeded. Publish the SG at the chosen server, or if
// that fails, enqueue it for later publish retries.
@@ -780,7 +805,7 @@
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, sg2.Spec)
@@ -878,7 +903,7 @@
}
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -909,8 +934,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
// TODO(hpucha): Check SyncGroup ACL.
@@ -935,7 +959,7 @@
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
st := sd.db.St()
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
gid, err := getSyncGroupId(ctx, st, sgName)
if err != nil {
@@ -959,11 +983,18 @@
// and joiner list of the SyncGroup get updated. This is functionally
// correct, just not symmetrical with what happens at joiner, which
// receives the SyncGroup state post-join.
- // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
status := interfaces.SyncGroupStatusPublishRejected
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
+ if err != nil {
+ return err
+ }
+ // TODO(hpucha): Do we want to pick the head version corresponding to
+ // the local gen of the sg? It appears that it shouldn't matter.
+
c := interfaces.SyncClient(sgName)
- peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
if err == nil {
status = interfaces.SyncGroupStatusRunning
@@ -1007,8 +1038,7 @@
// Reserve a log generation and position counts for the new
// SyncGroup version.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
sg.Status = status
if status == interfaces.SyncGroupStatusRunning {
@@ -1176,12 +1206,15 @@
})
if err == nil {
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
}
return s.name, err
}
func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
+ defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
+
var dbSt store.Store
var gid interfaces.GroupId
var err error
@@ -1212,6 +1245,7 @@
version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+ var gen, pos uint64
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
@@ -1235,8 +1269,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
@@ -1246,6 +1279,17 @@
if err != nil {
return nullSG, "", nullGV, err
}
- // TODO(rdaoud): return the SyncGroup genvec
- return *sg, version, nullGV, nil
+
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
+ if err != nil {
+ return nullSG, "", nullGV, err
+ }
+ // The retrieved genvector does not contain the mutation that adds the
+ // joiner to the list since initiator is the one checkpointing the
+ // generations. Add that generation to this genvector.
+ gv[sgOID(gid)][s.id] = gen
+
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
+ return *sg, version, gv[sgOID(gid)], nil
}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 820dec6..054eb20 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -15,7 +15,8 @@
logDataPrefix = "data" // data log state.
dbssPrefix = "dbss" // database sync state.
dagPrefix = "dag" // dag state.
- sgPrefix = "sg" // syncgroup state.
+ sgPrefix = "sg" // local syncgroup state.
+ sgDataPrefix = "sgd" // synced syncgroup state.
)
// syncData represents the persistent state of the sync module.
@@ -32,7 +33,7 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
Data localGenInfo
- Sgs map[interfaces.GroupId]localGenInfo
+ Sgs map[string]localGenInfo
GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index e352de4..3726fb0 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -39,7 +39,7 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
Data localGenInfo
- Sgs map[interfaces.GroupId]localGenInfo
+ Sgs map[string]localGenInfo
GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
@@ -156,4 +156,6 @@
const dagPrefix = "dag" // dag state.
-const sgPrefix = "sg" // syncgroup state.
+const sgPrefix = "sg" // local syncgroup state.
+
+const sgDataPrefix = "sgd" // synced syncgroup state.
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 9c16294..5b8b469 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -120,7 +120,7 @@
}
// Initialize Database sync state if needed.
- s.initSyncStateInMem(ctx, appName, dbName, interfaces.NoGroupId)
+ s.initSyncStateInMem(ctx, appName, dbName, "")
// Get a batch of watch log entries, if any, after this resume marker.
logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)