syncbase/vsync: SyncGroup and DAG changes for SG syncing
* Change SyncGroup storage to support versioning, with SyncGroups tracked
in DAG and log records.
* Update the SyncGroup join & publish calls to support the asynchronous
transition from known-but-pending SyncGroup to caught-up after sync.
* Add SyncGroup local state info to track the number of local peer
joiners, and the watchability, remote-publishing, and sync-pending
states of the SyncGroup.
* Add the retry loop for the SyncGroup publishing to the remote peer.
* Improve error checking of SyncGroup prefixes passed in the Spec.
* Add a first-cut HasKey() store util API that bypasses the VOM decode
(see the sketch after this change list). It will later be further
optimized for leveldb to avoid copying the value from the C buffer to
the Go buffer.
* Add DAG support to fully prune all nodes for an object including its
current head node.
* Remove the temporary workaround in which the responder added the
initiator to the SyncGroup joiner list.
* Improve unittest coverage.
Change-Id: I4585e248bd9e15b8b9585ec7b1830f8225686b2a
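
For reference, a minimal sketch of the first-cut existence check described
above (the diff below calls it util.Exists). The store.StoreReader.Get
signature and the store.ErrUnknownKey error ID are assumptions, not copied
from the actual util package:

    // Exists reports whether the key is present, without VOM-decoding
    // the stored value (assumed Get signature and error ID).
    func Exists(ctx *context.T, st store.StoreReader, key string) (bool, error) {
            _, err := st.Get([]byte(key), nil)
            if err != nil {
                    if verror.ErrorID(err) == store.ErrUnknownKey.ID {
                            return false, nil // missing key: not an error
                    }
                    return false, err // genuine store error
            }
            return true, nil
    }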
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 91ba499..2fb5479 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -81,6 +81,7 @@
}
// verifySyncGroup verifies if a SyncGroup struct is well-formed.
+// TODO(rdaoud): define verrors for all ErrBadArg cases.
func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
if sg == nil {
return verror.New(verror.ErrBadArg, ctx, "group information not specified")
@@ -106,40 +107,157 @@
if len(sg.Joiners) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
}
- if len(sg.Spec.Prefixes) == 0 {
+ return verifySyncGroupSpec(ctx, &sg.Spec)
+}
+
+// verifySyncGroupSpec verifies if a SyncGroupSpec is well-formed.
+func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
+ if spec == nil {
+ return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
+ }
+ if len(spec.Prefixes) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
}
+
+ // Duplicate prefixes are not allowed.
+ prefixes := make(map[string]bool, len(spec.Prefixes))
+ for _, pfx := range spec.Prefixes {
+ prefixes[pfx] = true
+ }
+ if len(prefixes) != len(spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
+ }
return nil
}
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
- // Verify SyncGroup before storing it since it may have been received
- // from a remote peer.
+// samePrefixes returns true if the two sets of prefixes are the same.
+func samePrefixes(pfx1, pfx2 []string) bool {
+ pfxMap := make(map[string]uint8)
+ for _, p := range pfx1 {
+ pfxMap[p] |= 0x01
+ }
+ for _, p := range pfx2 {
+ pfxMap[p] |= 0x02
+ }
+ for _, mask := range pfxMap {
+ if mask != 0x03 {
+ return false
+ }
+ }
+ return true
+}
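
As an illustration (not part of the change), samePrefixes compares the two
slices as sets, so ordering and repetition do not matter:

    samePrefixes([]string{"a", "b"}, []string{"b", "a"}) // true
    samePrefixes([]string{"a", "a"}, []string{"a"})      // true: equal as sets
    samePrefixes([]string{"a"}, []string{"a", "c"})      // false: "c" in one only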
+
+// addSyncGroup adds a new SyncGroup given its version and information. This
+// also includes creating a DAG node entry and updating the DAG head. If the
+// caller is the creator of the SyncGroup, a local log record is also created
+// using the given server ID and gen and pos counters to index the log record.
+// Otherwise, it's a joiner case: the SyncGroup is put in a pending state
+// (waiting for its full metadata to be synchronized) and the log record is
+// skipped, delaying its creation until the Initiator does p2p sync.
+func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ // Verify the SyncGroup information before storing it since it may have
+ // been received from a remote peer.
if err := verifySyncGroup(ctx, sg); err != nil {
return err
}
- if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
- return err
- } else if ok {
- return verror.New(verror.ErrExist, ctx, "group id already exists")
- }
+ // Add the group name and ID entries.
if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
return err
} else if ok {
return verror.New(verror.ErrExist, ctx, "group name already exists")
}
+ if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id already exists")
+ }
- // Add the group name and data entries.
+ state := sgLocalState{
+ RemotePublisher: remotePublisher,
+ SyncPending: !creator,
+ PendingGenVec: genvec,
+ }
+ if remotePublisher == "" {
+ state.NumLocalJoiners = 1
+ }
+
if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
return err
}
- if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
+ if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
return err
}
- return nil
+ // Add the SyncGroup versioned data entry.
+ if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id version already exists")
+ }
+
+ return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
+}
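
The sgLocalState type is defined outside these hunks; the following sketch
is inferred from the fields used above and from the commit message, not
copied from the real definition (the Watched field in particular is an
assumption based on the "watchability" bullet):

    // sgLocalState (sketch): per-SyncGroup local, non-synced state.
    type sgLocalState struct {
            NumLocalJoiners uint64 // count of local app joiners
            Watched         bool   // prefixes registered with the Watcher (assumed)
            RemotePublisher string // publishing peer; "" if locally created
            SyncPending     bool   // true while awaiting full metadata sync
            PendingGenVec   interfaces.PrefixGenVector // genvec to catch up to
    }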
+
+// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
+// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
+// and updates the DAG head. Optionally, it also writes a new local log record
+// using the given server ID and gen and pos counters to index it. The caller
+// can provide the version number to use; otherwise, if NoVersion is given,
+// the function generates a new version.
+// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
+// apps can monitor SG changes as well.
+func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ if version == NoVersion {
+ version = newSyncGroupVersion()
+ }
+
+ // Add the SyncGroup versioned data entry.
+ if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ return err
+ }
+
+ // Add a sync log record for the SyncGroup if needed.
+ oid := sgIdKey(sg.Id)
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
+ var parents []string
+ if head, err := getHead(ctx, tx, oid); err == nil {
+ parents = []string{head}
+ } else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return err
+ }
+ if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
+ return err
+ }
+ return setHead(ctx, tx, oid, version)
+}
+
+// addSyncGroupLogRec adds a new local log record for a SyncGroup.
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
+ oid := sgIdKey(gid)
+ rec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ ObjId: oid,
+ CurVers: version,
+ Delete: false,
+ UpdTime: watchable.GetStoreTime(ctx, tx),
+ Id: servId,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ BatchId: NoBatchId,
+ },
+ Pos: pos,
+ }
+
+ return putLogRec(ctx, tx, oid, rec)
}
// getSyncGroupId retrieves the SyncGroup ID given its name.
@@ -147,18 +265,18 @@
return getSGNameEntry(ctx, st, name)
}
-// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- sg, err := getSyncGroupById(ctx, st, gid)
- if err != nil {
- return "", err
- }
- return sg.Name, nil
+// getSyncGroupVersion retrieves the current version of the SyncGroup.
+func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
+ return getHead(ctx, st, sgIdKey(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
- return getSGDataEntry(ctx, st, gid)
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return nil, err
+ }
+ return getSGDataEntry(ctx, st, gid, version)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
@@ -176,19 +294,53 @@
if err != nil {
return err
}
- if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
- return err
- }
- return delSGDataEntry(ctx, tx, sg.Id)
+ return delSyncGroupByName(ctx, tx, sg.Name)
}
// delSyncGroupByName deletes the SyncGroup given its name.
func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
+ // Get the SyncGroup ID and current version.
gid, err := getSyncGroupId(ctx, tx, name)
if err != nil {
return err
}
- return delSyncGroupById(ctx, tx, gid)
+ version, err := getSyncGroupVersion(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ // Delete the name and ID entries.
+ if err := delSGNameEntry(ctx, tx, name); err != nil {
+ return err
+ }
+ if err := delSGIdEntry(ctx, tx, gid); err != nil {
+ return err
+ }
+
+ // Delete all versioned SyncGroup data entries (same versions as DAG
+ // nodes). This is done separately from pruning the DAG nodes because
+ // some nodes may have no log record pointing back to the SyncGroup data
+ // entries (loose coupling to support the pending SyncGroup state).
+ oid := sgIdKey(gid)
+ err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
+ return delSGDataEntry(ctx, tx, gid, v)
+ })
+ if err != nil {
+ return err
+ }
+
+ // Delete all DAG nodes and log records.
+ bset := newBatchPruning()
+ err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
+ if lr != "" {
+ return util.Delete(ctx, tx, lr)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ return pruneDone(ctx, tx, bset)
}
// refreshMembersIfExpired updates the aggregate view of SyncGroup members
@@ -252,16 +404,23 @@
// make forEachSyncGroup() stop the iteration earlier; otherwise the function
// loops across all SyncGroups in the Database.
func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
- stream := st.Scan(scanStart, scanLimit)
+ stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ defer stream.Cancel()
+
for stream.Advance() {
- var sg interfaces.SyncGroup
- if vom.Decode(stream.Value(nil), &sg) != nil {
- vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
+ var gid interfaces.GroupId
+ if vom.Decode(stream.Value(nil), &gid) != nil {
+ vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
continue
}
- if callback(&sg) {
+ sg, err := getSyncGroupById(nil, st, gid)
+ if err != nil {
+ vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
+ continue
+ }
+
+ if callback(sg) {
break // done, early exit
}
}
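
A hypothetical caller, to illustrate the early-exit contract (returning
true from the callback stops the iteration):

    // Collect all SyncGroup names in the Database.
    var names []string
    forEachSyncGroup(st, func(sg *interfaces.SyncGroup) bool {
            names = append(names, sg.Name)
            return false // keep iterating over all SyncGroups
    })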
@@ -324,63 +483,69 @@
// Use the functions above to manipulate SyncGroups.
var (
- // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
- sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-
- // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
- sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ // Prefixes used to store the different mappings of a SyncGroup:
+ // sgNameKeyPrefix: name --> ID
+ // sgIdKeyPrefix: ID --> SyncGroup local state
+ // sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
+ //
+ // Note: as with other syncable objects, the DAG "heads" table contains
+ // a reference to the current SyncGroup version, and the DAG "nodes"
+ // table tracks its history of mutations.
+ // TODO(rdaoud): change the data key prefix to use the SG OID instead
+ // of its ID, to be similar to the versioned user data keys. The OID
+ // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
+ // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
+ sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
)
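
For concreteness, assuming util.SyncPrefix is "$sync", sgPrefix is "sg",
and ":" is the key separator (as suggested by the TODO's "$sync:sgd:..."
examples), the three mappings produce keys of the form:

    $sync:sg:n:<name>           --> group ID
    $sync:sg:i:<gid>            --> SyncGroup local state
    $sync:sg:d:<gid>:<version>  --> versioned SyncGroup data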
-// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid interfaces.GroupId) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
+// sgIdStr returns the SyncGroup ID in string format.
+// TODO(rdaoud): delete when the SG ID becomes a string throughout.
+func sgIdStr(gid interfaces.GroupId) string {
+ return fmt.Sprintf("%d", uint64(gid))
}
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
+ return util.JoinKeyParts(sgNameKeyPrefix, name)
+}
+
+// sgIdKey returns the key used to access the SyncGroup ID entry.
+func sgIdKey(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgDataKey returns the key used to access a version of the SyncGroup data.
+func sgDataKey(gid interfaces.GroupId, version string) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
- prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
-
// Note that the actual SyncGroup name may contain ":" as a separator.
- if !strings.HasPrefix(key, prefix) {
+ // So don't split the key on the separator; instead, trim its prefix.
+ prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+ name := strings.TrimPrefix(key, prefix)
+ if name == key {
return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
}
- return strings.TrimPrefix(key, prefix), nil
-}
-
-// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var sg interfaces.SyncGroup
- if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return name, nil
}
// hasSGNameEntry returns true if the SyncGroup name entry exists.
func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var gid interfaces.GroupId
- if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return util.Exists(nil, sntx, sgNameKey(name))
}
-// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid), sg)
+// hasSGIdEntry returns true if the SyncGroup ID entry exists.
+func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
+ return util.Exists(nil, sntx, sgIdKey(gid))
+}
+
+// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
+ return util.Exists(nil, sntx, sgDataKey(gid, version))
}
// setSGNameEntry stores the SyncGroup name entry.
@@ -388,34 +553,58 @@
return util.Put(ctx, tx, sgNameKey(name), gid)
}
-// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+// setSGIdEntry stores the SyncGroup ID entry.
+func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
+ return util.Put(ctx, tx, sgIdKey(gid), state)
+}
+
+// setSGDataEntry stores the SyncGroup versioned data entry.
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+}
+
+// getSGNameEntry retrieves the SyncGroup ID for a given name.
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+ var gid interfaces.GroupId
+ if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+ return interfaces.NoGroupId, err
+ }
+ return gid, nil
+}
+
+// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
+func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
+ var state sgLocalState
+ if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
+ return nil, err
+ }
+ return &state, nil
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
var sg interfaces.SyncGroup
- if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+ if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
return nil, err
}
return &sg, nil
}
-// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
- var gid interfaces.GroupId
- if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
- return gid, err
- }
- return gid, nil
-}
-
-// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
- return util.Delete(ctx, tx, sgDataKey(gid))
-}
-
-// delSGNameEntry deletes the SyncGroup name to ID mapping.
+// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
}
+// delSGIdEntry deletes the SyncGroup ID entry.
+func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
+ return util.Delete(ctx, tx, sgIdKey(gid))
+}
+
+// delSGDataEntry deletes the SyncGroup versioned data entry.
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
+ return util.Delete(ctx, tx, sgDataKey(gid, version))
+}
+
////////////////////////////////////////////////////////////
// SyncGroup methods between Client and Syncbase.
@@ -424,9 +613,22 @@
vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
- // Get this Syncbase's sync module handle.
ss := sd.sync.(*syncService)
- var sg *interfaces.SyncGroup
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ // Instantiate sg. Add self as joiner.
+ gid, version := newSyncGroupId(), newSyncGroupVersion()
+ sg := &interfaces.SyncGroup{
+ Id: gid,
+ Name: sgName,
+ SpecVersion: version,
+ Spec: spec,
+ Creator: ss.name,
+ AppName: appName,
+ DbName: dbName,
+ Status: interfaces.SyncGroupStatusPublishPending,
+ Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
+ }
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -436,29 +638,17 @@
// TODO(hpucha): Check prefix ACLs on all SG prefixes.
// This may need another method on util.Database interface.
-
// TODO(hpucha): Do some SG ACL checking. Check creator
// has Admin privilege.
- // Instantiate sg. Add self as joiner.
- sg = &interfaces.SyncGroup{
- Id: newSyncGroupId(),
- Name: sgName,
- SpecVersion: newSyncGroupVersion(),
- Spec: spec,
- Creator: ss.name,
- AppName: sd.db.App().Name(),
- DbName: sd.db.Name(),
- Status: interfaces.SyncGroupStatusPublishPending,
- Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
- }
+ // Reserve log generation and position counts for the new SyncGroup.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
}
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
// Take a snapshot of the data to bootstrap the SyncGroup.
return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
})
@@ -467,9 +657,13 @@
return err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- // Local SG create succeeded. Publish the SG at the chosen server.
- sd.publishSyncGroup(ctx, call, sgName)
+ ss.initSyncStateInMem(ctx, appName, dbName, gid)
+
+ // Local SG create succeeded. Publish the SG at the chosen server, or if
+ // that fails, enqueue it for later publish retries.
+ if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+ ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
+ }
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, spec)
@@ -492,24 +686,54 @@
return err
}
- // Check if SyncGroup already exists.
- sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+ // Check if SyncGroup already exists and get its info.
+ var gid interfaces.GroupId
+ gid, sgErr = getSyncGroupId(ctx, tx, sgName)
if sgErr != nil {
return sgErr
}
- // SyncGroup already exists. Possibilities include created
- // locally, already joined locally or published at the device as
- // a result of SyncGroup creation on a different device.
- //
- // TODO(hpucha): Handle the above cases. If the SG was published
- // locally, but not joined, we need to bootstrap the DAG and
- // watcher. If multiple joins are done locally, we may want to
- // ref count the SG state and track the leaves accordingly. So
- // we may need to add some local state for each SyncGroup.
+ sg, sgErr = getSyncGroupById(ctx, tx, gid)
+ if sgErr != nil {
+ return sgErr
+ }
// Check SG ACL.
- return authorize(ctx, call.Security(), sg)
+ if err := authorize(ctx, call.Security(), sg); err != nil {
+ return err
+ }
+
+ // The SyncGroup already exists; increment the number of
+ // local joiners in its local state information. There
+ // are several scenarios:
+ // 1- An additional local joiner: the current number of local
+ // joiners is > 0 and the SyncGroup was already bootstrapped
+ // to the Watcher, so there is nothing else to do.
+ // 2- A new local joiner after all previous local joiners
+ // have left: the number of local joiners is 0 and the
+ // Watcher must be re-notified via a SyncGroup bootstrap,
+ // because the last joiner to leave told the Watcher to
+ // stop watching these prefixes. In
+ // this scenario the SyncGroup was not destroyed after the
+ // last joiner left because the SyncGroup was also published
+ // here by a remote peer and thus cannot be destroyed only
+ // based on the local joiners.
+ // 3- A first local joiner for a SyncGroup that was published
+ // here from a remote Syncbase: the number of local joiners
+ // is also 0 (and the remote publish flag is set), and the
+ // Watcher must be notified via a SyncGroup bootstrap.
+ // Conclusion: bootstrap if the number of local joiners is 0.
+ sgState, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ if sgState.NumLocalJoiners == 0 {
+ if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
+ return err
+ }
+ }
+ sgState.NumLocalJoiners++
+ return setSGIdEntry(ctx, tx, gid, sgState)
})
// The presented blessing is allowed to make this Syncbase instance join
@@ -532,48 +756,41 @@
ss := sd.sync.(*syncService)
// Contact a SyncGroup Admin to join the SyncGroup.
- sg = &interfaces.SyncGroup{}
- *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+ sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
if err != nil {
return nullSpec, err
}
// Verify that the app/db combination is valid for this SyncGroup.
- if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+ if sg2.AppName != appName || sg2.DbName != dbName {
return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
}
err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
- // TODO(hpucha): Get SG Deltas from Admin device.
-
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
return err
}
// Take a snapshot of the data to bootstrap the SyncGroup.
- return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
+ return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
})
if err != nil {
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
// Publish at the chosen mount table and in the neighborhood.
- sd.publishInMountTables(ctx, call, sg.Spec)
+ sd.publishInMountTables(ctx, call, sg2.Spec)
- return sg.Spec, nil
+ return sg2.Spec, nil
}
func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- var sgNames []string
-
vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
- defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -584,8 +801,8 @@
}
// Scan all the SyncGroup names found in the Database.
- scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
- stream := sn.Scan(scanStart, scanLimit)
+ stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ var sgNames []string
var key []byte
for stream.Advance() {
sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
@@ -599,18 +816,19 @@
return nil, err
}
+ vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
return sgNames, nil
}
func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
- var spec wire.SyncGroupSpec
-
vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
+ var spec wire.SyncGroupSpec
+
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
return spec, "", err
@@ -623,41 +841,45 @@
}
// TODO(hpucha): Check SyncGroup ACL.
- spec = sg.Spec
- return spec, sg.SpecVersion, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
+ return sg.Spec, sg.SpecVersion, nil
}
func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
- var members map[string]wire.SyncGroupMemberInfo
-
vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
- return members, err
+ return nil, err
}
// Get the SyncGroup information.
sg, err := getSyncGroupByName(ctx, sn, sgName)
if err != nil {
- return members, err
+ return nil, err
}
// TODO(hpucha): Check SyncGroup ACL.
- members = sg.Joiners
- return members, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
+ return sg.Joiners, nil
}
-// TODO(hpucha): Enable syncing syncgroup metadata.
func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+ if err := verifySyncGroupSpec(ctx, &spec); err != nil {
+ return err
+ }
+
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -669,10 +891,33 @@
return err
}
- // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+ if version != NoVersion && sg.SpecVersion != version {
+ return verror.NewErrBadVersion(ctx)
+ }
+ // Must not change the SyncGroup prefixes.
+ if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
+ }
+
+ sgState, err := getSGIdEntry(ctx, tx, sg.Id)
+ if err != nil {
+ return err
+ }
+ if sgState.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve log generation and position counts for the new SyncGroup version.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ newVersion := newSyncGroupVersion()
sg.Spec = spec
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ sg.SpecVersion = newVersion
+ return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
})
return err
}
@@ -680,9 +925,27 @@
//////////////////////////////
// Helper functions
-// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+// publishSyncGroup publishes the SyncGroup at the remote peer and updates its
+// status. If the publish operation is either successful or rejected by the
+// peer, the status is updated to "running" or "rejected" respectively and the
+// function returns "nil" to indicate to the caller there is no need to make
+// further attempts. Otherwise an error (typically an RPC error, though it
+// could also be a store error) is returned to the caller.
+// TODO(rdaoud): make all SG admins try to publish after they join.
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
- sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+ st := sd.db.St()
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ gid, err := getSyncGroupId(ctx, st, sgName)
+ if err != nil {
+ return err
+ }
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return err
+ }
+ sg, err := getSGDataEntry(ctx, st, gid, version)
if err != nil {
return err
}
@@ -691,34 +954,74 @@
return nil
}
+ // Note: the remote peer is given the SyncGroup version and genvec as
+ // of just before the post-publish update, at which point the status
+ // and joiner list of the SyncGroup get updated. This is functionally
+ // correct, just not symmetrical with what happens at the joiner, which
+ // receives the SyncGroup state post-join.
+ // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
+ status := interfaces.SyncGroupStatusPublishRejected
+
c := interfaces.SyncClient(sgName)
- err = c.PublishSyncGroup(ctx, *sg)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
- // Publish failed temporarily. Retry later.
- // TODO(hpucha): Is there an RPC error that we can check here?
- if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
- return err
- }
-
- // Publish succeeded.
if err == nil {
- // TODO(hpucha): Get SG Deltas from publisher. Obtaining the
- // new version from the publisher prevents SG conflicts.
- return err
+ status = interfaces.SyncGroupStatusRunning
+ } else {
+ errId := verror.ErrorID(err)
+ if errId == interfaces.ErrDupSyncGroupPublish.ID {
+ // Duplicate publish: another admin already published
+ // the SyncGroup; nothing else needs to happen because
+ // that other admin would have updated the SyncGroup
+ // status and p2p SG sync will propagate the change.
+ // TODO(rdaoud): what if that other admin crashes and
+ // never updates the SyncGroup status (dies permanently
+ // or is ejected before the status update)? Eventually
+ // some admin must decide to update the SG status anyway
+ // even if that causes extra SG mutations and conflicts.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
+ return nil
+ }
+
+ if errId != verror.ErrExist.ID {
+ // The publish operation failed with an error other
+ // than ErrExist, so it must be retried later on.
+ // TODO(hpucha): Is there an RPC error that we can check here?
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
+ return err
+ }
}
- // Publish rejected. Persist that to avoid retrying in the
- // future and to remember the split universe scenario.
- err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
+ // The publish operation is done: it either succeeded or failed
+ // with the ErrExist error. Update the SyncGroup status and, if
+ // the publish was successful, add the remote peer to the SyncGroup.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
+ sgName, peer, status.String(), err)
+
+ err = store.RunInTransaction(st, func(tx store.Transaction) error {
// Ensure SG still exists.
- sg, err := getSyncGroupByName(ctx, tx, sgName)
+ sg, err := getSyncGroupById(ctx, tx, gid)
if err != nil {
return err
}
- sg.Status = interfaces.SyncGroupStatusPublishRejected
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ // Reserve log generation and position counts for the new
+ // SyncGroup version.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ sg.Status = status
+ if status == interfaces.SyncGroupStatusRunning {
+ // TODO(hpucha): Default priority?
+ sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
+ }
+
+ return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
})
+ if err != nil {
+ vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
+ sgName, status.String(), err)
+ }
return err
}
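
The retry queue fed by enqueuePublishSyncGroup lies outside these hunks.
Purely as a sketch of a driver honoring the contract documented above (nil
means done, any other error means try again), with the loop shape, the nil
ServerCall, and the publishRetryDelay constant all assumed:

    // Hypothetical retry driver: re-attempts a pending publish until
    // publishSyncGroup reports it is done (status persisted as either
    // running or rejected).
    func (s *syncService) publishRetryLoop(ctx *context.T, sd *syncDatabase, sgName string) {
            for {
                    if err := sd.publishSyncGroup(ctx, nil, sgName); err == nil {
                            return // done; no further attempts needed
                    }
                    time.Sleep(publishRetryDelay) // assumed backoff constant
            }
    }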
@@ -801,7 +1104,7 @@
return nil
}
-func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
c := interfaces.SyncClient(sgName)
return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
@@ -819,36 +1122,49 @@
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.
-func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
if err != nil {
- return err
+ return s.name, err
}
err = store.RunInTransaction(st, func(tx store.Transaction) error {
- localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
-
+ gid, err := getSyncGroupId(ctx, tx, sg.Name)
if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
- // SG name already claimed.
- if err == nil && localSG.Id != sg.Id {
- return verror.New(verror.ErrExist, ctx, sg.Name)
- }
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
- // metadata if needed.
- //
- // TODO(hpucha): Catch up on SG versions so far.
-
- // SG already published. Update if needed.
- if err == nil && localSG.Id == sg.Id {
- if localSG.Status == interfaces.SyncGroupStatusPublishPending {
- localSG.Status = interfaces.SyncGroupStatusRunning
- return setSGDataEntry(ctx, tx, localSG.Id, localSG)
+ if err == nil {
+ // SG name already claimed. Note that in this
+ // split-brain case (same SG name, different IDs),
+ // the SyncGroup being rejected here does not benefit
+ // from the de-duping optimization below and ends up
+ // making duplicate SG mutations to set the status,
+ // yielding more SG conflicts. This is functionally
+ // correct but bypasses the de-dup optimization for
+ // the rejected SG.
+ if gid != sg.Id {
+ return verror.New(verror.ErrExist, ctx, sg.Name)
}
- return nil
+
+ // SG exists locally, either locally created/joined or
+ // previously published. Make it idempotent for the
+ // same publisher; otherwise it's a duplicate.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.RemotePublisher == "" {
+ // Locally created/joined SyncGroup: update its
+ // state to include the publisher.
+ state.RemotePublisher = publisher
+ return setSGIdEntry(ctx, tx, gid, state)
+ }
+ if publisher == state.RemotePublisher {
+ // Same previous publisher: nothing to change,
+ // the old genvec and version info is valid.
+ return nil
+ }
+ return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
}
// Publish the SyncGroup.
@@ -856,23 +1172,21 @@
// TODO(hpucha): Use some ACL check to allow/deny publishing.
// TODO(hpucha): Ensure node is on Admin ACL.
- // TODO(hpucha): Default priority?
- sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
- sg.Status = interfaces.SyncGroupStatusRunning
- return addSyncGroup(ctx, tx, &sg)
+ return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
})
- if err != nil {
- return err
+ if err == nil {
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
}
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- return nil
+ return s.name, err
}
-func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
var dbSt store.Store
var gid interfaces.GroupId
var err error
+ var stAppName, stDbName string
+ nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{}
// Find the database store for this SyncGroup.
//
@@ -885,6 +1199,7 @@
if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
// Found the SyncGroup being looked for.
dbSt = st
+ stAppName, stDbName = appName, dbName
return true
}
return false
@@ -892,10 +1207,12 @@
// SyncGroup not found.
if err != nil {
- return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+ return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
}
+ version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
sg, err = getSyncGroupById(ctx, tx, gid)
@@ -908,13 +1225,27 @@
return err
}
+ // Check that the SG is not in pending state.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve log generation and position counts for the new SyncGroup version.
+ //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
})
if err != nil {
- return interfaces.SyncGroup{}, err
+ return nullSG, "", nullGV, err
}
- return *sg, nil
+ // TODO(rdaoud): return the SyncGroup genvec
+ return *sg, version, nullGV, nil
}