syncbase/vsync: Integrate SyncGroup syncing, and add e2e tests.
MultiPart: 2/2
Change-Id: Ie3e669bb0357fe42b8853e29dd4f08f0eac92a15
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2fb5479..98e0e84 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
import (
"fmt"
+ "strconv"
"strings"
"time"
@@ -212,28 +213,30 @@
version = newSyncGroupVersion()
}
+ oid := sgOID(sg.Id)
+
// Add the SyncGroup versioned data entry.
- if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
return err
}
- // Add a sync log record for the SyncGroup if needed.
- oid := sgIdKey(sg.Id)
- logKey := ""
- if withLog {
- if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
- return err
- }
- logKey = logRecKey(oid, servId, gen)
- }
-
- // Add the SyncGroup to the DAG.
var parents []string
if head, err := getHead(ctx, tx, oid); err == nil {
parents = []string{head}
} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
+
+ // Add a sync log record for the SyncGroup if needed.
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
return err
}
@@ -241,12 +244,12 @@
}
// addSyncGroupLogRec adds a new local log record for a SyncGroup.
-func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
- oid := sgIdKey(gid)
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
ObjId: oid,
CurVers: version,
+ Parents: parents,
Delete: false,
UpdTime: watchable.GetStoreTime(ctx, tx),
Id: servId,
@@ -267,7 +270,7 @@
// getSyncGroupVersion retrieves the current version of the SyncGroup.
func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- return getHead(ctx, st, sgIdKey(gid))
+ return getHead(ctx, st, sgOID(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
@@ -321,7 +324,7 @@
// nodes). This is done separately from pruning the DAG nodes because
// some nodes may have no log record pointing back to the SyncGroup data
// entries (loose coupling to support the pending SyncGroup state).
- oid := sgIdKey(gid)
+ oid := sgOID(gid)
err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
return delSGDataEntry(ctx, tx, gid, v)
})
@@ -491,21 +494,11 @@
// Note: as with other syncable objects, the DAG "heads" table contains
// a reference to the current SyncGroup version, and the DAG "nodes"
// table tracks its history of mutations.
- // TODO(rdaoud): change the data key prefix to use the SG OID instead
- // of its ID, to be similar to the versioned user data keys. The OID
- // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
- // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
- sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
)
-// sgIdStr returns the SyncGroup ID in string format.
-// TODO(rdaoud): delete when the SG ID becomes a string throughout.
-func sgIdStr(gid interfaces.GroupId) string {
- return fmt.Sprintf("%d", uint64(gid))
-}
-
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
return util.JoinKeyParts(sgNameKeyPrefix, name)
@@ -516,11 +509,35 @@
return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
}
+// sgOID returns the SyncGroup object ID (OID): sgDataKeyPrefix joined with the decimal group id.
+func sgOID(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgID is the inverse of sgOID: it requires a three-part OID and parses the third part as the decimal group id.
+func sgID(oid string) (interfaces.GroupId, error) {
+ parts := util.SplitKeyParts(oid)
+ if len(parts) != 3 {
+ return 0, fmt.Errorf("invalid sgoid %s", oid)
+ }
+
+ id, err := strconv.ParseUint(parts[2], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ return interfaces.GroupId(id), nil
+}
+
// sgDataKey returns the key used to access a version of the SyncGroup data.
func sgDataKey(gid interfaces.GroupId, version string) string {
return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
+// sgDataKeyByOID returns the key of a SyncGroup data version, formed by joining the SyncGroup OID and the version.
+func sgDataKeyByOID(oid, version string) string {
+ return util.JoinKeyParts(oid, version)
+}
+
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
// Note that the actual SyncGroup name may contain ":" as a separator.
@@ -558,9 +575,9 @@
return util.Put(ctx, tx, sgIdKey(gid), state)
}
-// setSGDataEntry stores the SyncGroup versioned data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+// setSGDataEntryByOID stores the SyncGroup versioned data entry under the key for the given SyncGroup OID and version.
+func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
}
// getSGNameEntry retrieves the SyncGroup ID for a given name.
@@ -590,6 +607,15 @@
return &sg, nil
}
+// getSGDataEntryByOID retrieves the SyncGroup data stored under the key for the given SyncGroup OID and version.
+func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
+ var sg interfaces.SyncGroup
+ if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
+ return nil, err
+ }
+ return &sg, nil
+}
+
// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
@@ -642,8 +668,7 @@
// has Admin privilege.
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
@@ -657,7 +682,7 @@
return err
}
- ss.initSyncStateInMem(ctx, appName, dbName, gid)
+ ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
// Local SG create succeeded. Publish the SG at the chosen server, or if
// that fails, enqueue it for later publish retries.
@@ -780,7 +805,7 @@
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, sg2.Spec)
@@ -878,7 +903,7 @@
}
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -909,8 +934,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
// TODO(hpucha): Check SyncGroup ACL.
@@ -935,7 +959,7 @@
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
st := sd.db.St()
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
gid, err := getSyncGroupId(ctx, st, sgName)
if err != nil {
@@ -959,11 +983,18 @@
// and joiner list of the SyncGroup get updated. This is functionally
// correct, just not symmetrical with what happens at joiner, which
// receives the SyncGroup state post-join.
- // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
status := interfaces.SyncGroupStatusPublishRejected
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
+ if err != nil {
+ return err
+ }
+ // TODO(hpucha): Do we want to pick the head version corresponding to
+ // the local gen of the sg? It appears that it shouldn't matter.
+
c := interfaces.SyncClient(sgName)
- peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
if err == nil {
status = interfaces.SyncGroupStatusRunning
@@ -1007,8 +1038,7 @@
// Reserve a log generation and position counts for the new
// SyncGroup version.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
sg.Status = status
if status == interfaces.SyncGroupStatusRunning {
@@ -1176,12 +1206,15 @@
})
if err == nil {
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
}
return s.name, err
}
func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
+ defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
+
var dbSt store.Store
var gid interfaces.GroupId
var err error
@@ -1212,6 +1245,7 @@
version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+ var gen, pos uint64
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
@@ -1235,8 +1269,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
@@ -1246,6 +1279,17 @@
if err != nil {
return nullSG, "", nullGV, err
}
- // TODO(rdaoud): return the SyncGroup genvec
- return *sg, version, nullGV, nil
+
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
+ if err != nil {
+ return nullSG, "", nullGV, err
+ }
+ // The retrieved genvector does not contain the mutation that adds the
+ // joiner to the list, since the initiator is the one checkpointing the
+ // generations. Add that generation to this genvector.
+ gv[sgOID(gid)][s.id] = gen
+
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
+ return *sg, version, gv[sgOID(gid)], nil
}