syncbase/vsync: SyncGroup and DAG changes for SG syncing
* Change SyncGroup storage to support versioning and to track SyncGroups
in DAG and log records.
* Update the SyncGroup join & publish calls to support the asynchronous
transition from a known-but-pending SyncGroup to caught-up after sync
(a publish-call sketch follows this message).
* Add per-SyncGroup local state info to track the number of local joiners,
and the watchability, remote-publishing, and sync-pending states of the
SyncGroup.
* Add a retry loop for publishing the SyncGroup to the remote peer (a
drain-loop sketch appears after the sync_state.go diff below).
* Improve error checking of SyncGroup prefixes passed in the Spec.
* Add a first-cut Exists() store util API to bypass the VOM decode step.
This will later be further optimized for leveldb to avoid a copy of the
value from the C buffer to the Go buffer.
* Add DAG support to fully prune all nodes for an object, including its
current head node.
* Remove the temporary solution of having the responder add the initiator
to the SyncGroup joiner list.
* Improve unit test coverage.
Change-Id: I4585e248bd9e15b8b9585ec7b1830f8225686b2a
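
A quick orientation before the diffs: this is roughly how the creator side
drives the new publish flow. The SyncClient stub constructor and call shape
come from the generated code below; the surrounding control flow (and the
variables ctx, publisherName, sg, version, genvec assumed in scope) is only
a sketch, not code from this change.

    // Hedged sketch: a creator publishing its new SyncGroup. sgName is the
    // global SyncGroup name; version and genvec checkpoint the creator's
    // state at publish time.
    c := interfaces.SyncClient(sgName)
    publishee, err := c.PublishSyncGroup(ctx, publisherName, sg, version, genvec)
    if err != nil {
        // Leave it queued for background retries; see
        // enqueuePublishSyncGroup in sync_state.go below.
    } else {
        _ = publishee // the publisher now knows where the SyncGroup landed
    }
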
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 6dd6f25..80b46a0 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -19,15 +19,36 @@
// SyncGroup-related methods.
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(sg SyncGroup) error {access.Write}
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // the Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string | error) {access.Write}
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error) {access.Read}
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector | error) {access.Read}
// BlobSync methods.
@@ -50,3 +71,7 @@
FetchBlobRecipe(br wire.BlobRef) stream<_, ChunkHash> error
FetchChunks() stream<ChunkHash, ChunkData> error
}
+
+error (
+ DupSyncGroupPublish(name string) {"en": "duplicate publish on SyncGroup: {name}"}
+)
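
The catch-up rule described in the PublishSyncGroup comment fits in a few
lines. This sketch assumes PrefixGenVector maps a device ID to a generation
number, matching its use elsewhere in syncbase; the helper name is made up
for illustration.

    // caughtUp reports whether local knowledge covers the generation vector
    // checkpointed at publish/join time; once it returns true, the pending
    // SyncGroup can leave its pending state (hypothetical helper).
    func caughtUp(local, pending interfaces.PrefixGenVector) bool {
        for dev, gen := range pending {
            if local[dev] < gen {
                return false // still missing history from this device
            }
        }
        return true
    }
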
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 3006f3c..102699b 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -12,14 +12,29 @@
"io"
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/i18n"
"v.io/v23/rpc"
"v.io/v23/vdl"
+ "v.io/v23/verror"
// VDL user imports
"v.io/v23/security/access"
"v.io/v23/services/syncbase/nosql"
)
+var (
+ ErrDupSyncGroupPublish = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.DupSyncGroupPublish", verror.NoRetry, "{1:}{2:} duplicate publish on SyncGroup: {3}")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncGroupPublish.ID), "{1:}{2:} duplicate publish on SyncGroup: {3}")
+}
+
+// NewErrDupSyncGroupPublish returns an error with the ErrDupSyncGroupPublish ID.
+func NewErrDupSyncGroupPublish(ctx *context.T, name string) error {
+ return verror.New(ErrDupSyncGroupPublish, ctx, name)
+}
+
// SyncClientMethods is the client interface
// containing Sync methods.
//
@@ -30,14 +45,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // the Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, publisher string, sg SyncGroup, version string, genvec PrefixGenVector, opts ...rpc.CallOpt) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (int64, error)
@@ -80,13 +116,13 @@
return
}
-func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 SyncGroup, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0}, nil, opts...)
+func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector, opts ...rpc.CallOpt) (o0 string, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0, i1, i2, i3}, []interface{}{&o0}, opts...)
return
}
-func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0}, opts...)
+func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, o1 string, o2 PrefixGenVector, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0, &o1, &o2}, opts...)
return
}
@@ -440,14 +476,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // the Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -475,14 +532,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // the Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -534,11 +612,11 @@
return s.impl.GetDeltas(ctx, call, i0, i1)
}
-func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
- return s.impl.PublishSyncGroup(ctx, call, i0)
+func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector) (string, error) {
+ return s.impl.PublishSyncGroup(ctx, call, i0, i1, i2, i3)
}
-func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, error) {
+func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, string, PrefixGenVector, error) {
return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
}
@@ -586,22 +664,30 @@
},
{
Name: "PublishSyncGroup",
- Doc: "// PublishSyncGroup is typically invoked on a \"central\" peer to publish\n// the SyncGroup.",
+ Doc: "// PublishSyncGroup is invoked on the SyncGroup name (typically served\n// by a \"central\" peer) to publish the SyncGroup. It takes the name of\n// Syncbase doing the publishing (the publisher) and returns the name\n// of the Syncbase where the SyncGroup is published (the publishee).\n// This allows the publisher and the publishee to learn of each other.\n// When a SyncGroup is published, the publishee is given the SyncGroup\n// metadata, its current version at the publisher, and the current\n// SyncGroup generation vector. The generation vector serves as a\n// checkpoint at the time of publishing. The publishing proceeds\n// asynchronously, and the publishee learns the SyncGroup history\n// through the routine p2p sync process and determines when it has\n// caught up to the level of knowledge at the time of publishing using\n// the checkpointed generation vector. Until that point, the publishee\n// locally deems the SyncGroup to be in a pending state and does not\n// mutate it. Thus it locally rejects SyncGroup joins or updates to\n// its spec until it is caught up on the SyncGroup history.",
InArgs: []rpc.ArgDesc{
- {"sg", ``}, // SyncGroup
+ {"publisher", ``}, // string
+ {"sg", ``}, // SyncGroup
+ {"version", ``}, // string
+ {"genvec", ``}, // PrefixGenVector
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // string
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
},
{
Name: "JoinSyncGroupAtAdmin",
- Doc: "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.",
+ Doc: "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup. It returns a copy of the updated SyncGroup metadata,\n// its version, and the SyncGroup generation vector at the time of the\n// join. Similar to the PublishSyncGroup scenario, the joiner at that\n// point does not have the SyncGroup history and locally deems it to be\n// in a pending state and does not mutate it. This means it rejects\n// local updates to the SyncGroup spec or, if it were also an admin on\n// the SyncGroup, it would reject SyncGroup joins until it is caught up\n// on the SyncGroup history through p2p sync.",
InArgs: []rpc.ArgDesc{
{"sgName", ``}, // string
{"joinerName", ``}, // string
{"myInfo", ``}, // nosql.SyncGroupMemberInfo
},
OutArgs: []rpc.ArgDesc{
- {"", ``}, // SyncGroup
+ {"sg", ``}, // SyncGroup
+ {"version", ``}, // string
+ {"genvec", ``}, // PrefixGenVector
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
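
A plausible publishee-side use of the new DupSyncGroupPublish error, sketched
under the assumption that the service checks the SyncGroup name entry before
accepting a publish; hasSGNameEntry is the helper added in vsync/syncgroup.go
further below, and checkDupPublish itself is hypothetical.

    // checkDupPublish is a hypothetical guard, for illustration only.
    func checkDupPublish(ctx *context.T, tx store.Transaction, sgName, myName string) (string, error) {
        ok, err := hasSGNameEntry(tx, sgName)
        if err != nil {
            return "", err
        }
        if ok {
            // Already published here: report it with the dedicated error.
            return myName, interfaces.NewErrDupSyncGroupPublish(ctx, sgName)
        }
        return myName, nil
    }
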
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index b3d96d4..15fa69a 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -52,6 +52,21 @@
return nil
}
+// Exists returns true if the key exists in the store.
+// TODO(rdaoud): for now it only bypasses Get's VOM decode step. It should
+// be optimized further by adding a st.Exists(k) API, letting each
+// implementation do its best to reduce data fetching in its key lookup.
+func Exists(ctx *context.T, st store.StoreReader, k string) (bool, error) {
+ _, err := st.Get([]byte(k), nil)
+ if err != nil {
+ if verror.ErrorID(err) == store.ErrUnknownKey.ID {
+ return false, nil
+ }
+ return false, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return true, nil
+}
+
// GetWithAuth does Get followed by an auth check.
func GetWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, k string, v Permser) error {
if err := Get(ctx, st, k, v); err != nil {
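
Callers that only need a presence check can now skip the VOM decode entirely.
A small sketch (the key and the error policy are illustrative, not part of
this change):

    // ensureAbsent is a hypothetical caller of the new helper: it fails with
    // ErrExist if the key is already present, without decoding the value.
    func ensureAbsent(ctx *context.T, st store.StoreReader, key string) error {
        ok, err := Exists(ctx, st, key)
        if err != nil {
            return err
        }
        if ok {
            return verror.New(verror.ErrExist, ctx, key)
        }
        return nil
    }
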
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 22b53f4..3178cb7 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -8,6 +8,7 @@
"fmt"
"math"
"sync"
+ "time"
"v.io/v23/context"
"v.io/v23/verror"
@@ -173,6 +174,12 @@
return tx.itx.Abort()
}
+// GetStoreTime returns the current time from the given transaction store.
+func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time {
+ wtx := tx.(*transaction)
+ return wtx.st.clock.Now(ctx)
+}
+
// AddSyncGroupOp injects a SyncGroup operation notification in the log entries
// that the transaction writes when it is committed. It allows the SyncGroup
// operations (create, join, leave, destroy) to notify the sync watcher of the
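
Sync code can now timestamp records with the store's clock instead of the
system clock; addSyncGroupLogRec in vsync/syncgroup.go (further below) does
exactly this. A minimal sketch, with the wrapper name invented for
illustration:

    // stamp fills in a log record's update time from the store clock.
    func stamp(ctx *context.T, tx store.Transaction, rec *localLogRec) {
        rec.Metadata.UpdTime = watchable.GetStoreTime(ctx, tx)
    }
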
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 40ef3b3..086296c 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -279,13 +279,6 @@
}
}
- // The new node must not exist.
- if ok, err := hasNode(ctx, tx, oid, version); err != nil {
- return err
- } else if ok {
- return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
- }
-
// Verify the parents, determine the node level. Also save the levels
// of the parent nodes for later in this function in graft updates.
parentLevels := make(map[string]uint64)
@@ -620,10 +613,14 @@
// prune trims the DAG of an object at a given version (node) by deleting all
// its ancestor nodes, making it the new root node. For each deleted node it
-// calls the given callback function to delete its log record.
+// calls the given callback function to delete its log record. If NoVersion
+// is given instead, then all object nodes are deleted, including the head node.
//
-// Note: this function should only be used when sync determines that all devices
-// that know about this object have gotten past this version.
+// Note: this function is typically used when sync determines that all devices
+// that know about this object have gotten past this version, as part of its
+// GC operations. It can also be used when an object history is obliterated,
+// for example when destroying a SyncGroup, which is also versioned and tracked
+// in the DAG.
//
// The batch set passed is used to track batches affected by the deletion of DAG
// objects across multiple calls to prune(). It is later given to pruneDone()
@@ -633,21 +630,33 @@
return verror.New(verror.ErrInternal, ctx, "missing batch set")
}
- // Get the node at the pruning point and set its parents to nil.
- // It will become the oldest DAG node (root) for the object.
- node, err := getNode(ctx, tx, oid, version)
- if err != nil {
- return err
- }
- if node.Parents == nil {
- // Nothing to do, this node is already the root.
- return nil
- }
-
- parents := node.Parents
- node.Parents = nil
- if err = setNode(ctx, tx, oid, version, node); err != nil {
- return err
+ var parents []string
+ if version == NoVersion {
+ // Delete all object versions including its head version.
+ head, err := getHead(ctx, tx, oid)
+ if err != nil {
+ return err
+ }
+ if err := delHead(ctx, tx, oid); err != nil {
+ return err
+ }
+ parents = []string{head}
+ } else {
+ // Get the node at the pruning point and set its parents to nil.
+ // It will become the oldest DAG node (root) for the object.
+ node, err := getNode(ctx, tx, oid, version)
+ if err != nil {
+ return err
+ }
+ if node.Parents == nil {
+ // Nothing to do, this node is already the root.
+ return nil
+ }
+ parents = node.Parents
+ node.Parents = nil
+ if err = setNode(ctx, tx, oid, version, node); err != nil {
+ return err
+ }
}
// Delete all ancestor nodes and their log records. Delete as many as
@@ -724,7 +733,7 @@
// setNode stores the DAG node entry.
func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: setNode: invalid version: %s", version)
}
return util.Put(ctx, tx, nodeKey(oid, version), node)
@@ -733,7 +742,7 @@
// getNode retrieves the DAG node entry for the given (oid, version).
func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) {
if version == NoVersion {
- return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: getNode: invalid version: %s", version)
}
var node dagNode
@@ -747,7 +756,7 @@
// delNode deletes the DAG node entry.
func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: delNode: invalid version: %s", version)
}
return util.Delete(ctx, tx, nodeKey(oid, version))
@@ -755,14 +764,11 @@
// hasNode returns true if the node (oid, version) exists in the DAG.
func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- if _, err := getNode(ctx, st, oid, version); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
+ if version == NoVersion {
+ vlog.Fatalf("sync: hasNode: invalid version: %s", version)
}
- return true, nil
+
+ return util.Exists(ctx, st, nodeKey(oid, version))
}
// headKey returns the key used to access the DAG object head.
@@ -773,7 +779,7 @@
// setHead stores version as the DAG object head.
func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
+ vlog.Fatalf("sync: setHead: invalid version: %s", version)
}
return util.Put(ctx, tx, headKey(oid), version)
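
The NoVersion mode gives prune() a whole-object deletion path. The calling
pattern, as used by delSyncGroupByName later in this change:

    // Drop every DAG node and log record for oid, head included.
    bset := newBatchPruning()
    err := prune(ctx, tx, oid, NoVersion, bset,
        func(ctx *context.T, tx store.Transaction, logrec string) error {
            if logrec != "" {
                return util.Delete(ctx, tx, logrec)
            }
            return nil
        })
    if err == nil {
        err = pruneDone(ctx, tx, bset)
    }
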
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index c72b3a7..57291a4 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -255,11 +255,7 @@
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
- // Make sure an existing node cannot be added again.
tx := st.NewTransaction()
- if err := s.addNode(nil, tx, oid, "2", "foo", false, []string{"1", "3"}, NoBatchId, nil); err == nil {
- t.Errorf("addNode() did not fail when given an existing node")
- }
// Make sure a new node cannot have more than 2 parents.
if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
@@ -780,10 +776,10 @@
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
// Loop pruning at an invalid version (333) then at different valid versions.
- testVersions := []string{"333", "1", "2", "6", "8", "9", "9"}
- delCounts := []int{0, 0, 1, 4, 2, 1, 0}
- which := "prune-snip-"
- remain := 9
+ // The last version used (NoVersion) deletes all remaining nodes for the
+ // object, including the head node.
+ testVersions := []string{"333", "1", "2", "6", "8", "9", "9", NoVersion}
+ delCounts := []int{0, 0, 1, 4, 2, 1, 0, 1}
for i, version := range testVersions {
batches := newBatchPruning()
@@ -807,11 +803,13 @@
oid, version, del, delCounts[i])
}
- which += "*"
- remain -= del
-
- if head, err := getHead(nil, st, oid); err != nil || head != "9" {
- t.Errorf("object %s has wrong head: %s", oid, head)
+ head, err := getHead(nil, st, oid)
+ if version != NoVersion {
+ if err != nil || head != "9" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+ } else if err == nil {
+ t.Errorf("found head %s for object %s after pruning all versions", head, oid)
}
tx = st.NewTransaction()
@@ -823,16 +821,20 @@
// Remove pruned nodes from the expected parent map used to validate
// and set the parents of the pruned node to nil.
- intVersion, err := strconv.ParseInt(version, 10, 32)
- if err != nil {
- t.Errorf("invalid version: %s", version)
- }
-
- if intVersion < 10 {
- for j := int64(0); j < intVersion; j++ {
- delete(exp, fmt.Sprintf("%d", j))
+ if version == NoVersion {
+ exp = make(map[string][]string)
+ } else {
+ intVersion, err := strconv.ParseInt(version, 10, 32)
+ if err != nil {
+ t.Errorf("invalid version: %s", version)
}
- exp[version] = nil
+
+ if intVersion < 10 {
+ for j := int64(0); j < intVersion; j++ {
+ delete(exp, fmt.Sprintf("%d", j))
+ }
+ exp[version] = nil
+ }
}
pmap := getParentMap(nil, st, oid, nil)
@@ -903,6 +905,16 @@
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
+
+ // Invalid pruning without a batch set.
+ tx = st.NewTransaction()
+ err = prune(nil, tx, oid, version, nil, func(ctx *context.T, tx store.Transaction, lr string) error {
+ return nil
+ })
+ if err == nil {
+ t.Errorf("pruning object %s:%s without a batch set did not fail", oid, version)
+ }
+ tx.Abort()
}
// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 935539b..7c1a045 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -457,7 +457,7 @@
}
tx := svc.St().NewTransaction()
- if err := addSyncGroup(nil, tx, sg1); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
}
if err := tx.Commit(); err != nil {
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 46d03a1..0c78f52 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -226,7 +226,11 @@
}
err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
- sg, err := getSyncGroupById(ctx, tx, gid)
+ version, err := getSyncGroupVersion(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ sg, err := getSGDataEntry(ctx, tx, gid, version)
if err != nil {
return err
}
@@ -239,7 +243,7 @@
vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
- return setSGDataEntry(ctx, tx, gid, sg)
+ return setSGDataEntry(ctx, tx, gid, version, sg)
})
if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index f2e72ae..0b73aad 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -12,6 +12,7 @@
// records in response to a GetDeltas request, it replays those log
// records to get in sync with the sender.
import (
+ "container/list"
"fmt"
"math/rand"
"path"
@@ -73,6 +74,11 @@
syncState map[string]*dbSyncStateInMem
syncStateLock sync.Mutex // lock to protect access to the sync state.
+ // In-memory queue of SyncGroups to be published. It is reconstructed
+ // at startup from SyncGroup info so it does not need to be persisted.
+ sgPublishQueue *list.List
+ sgPublishQueueLock sync.Mutex
+
// In-memory tracking of batches during their construction.
// The sync Initiator and Watcher build batches incrementally here
// and then persist them in DAG batch entries. The mutex guards
@@ -124,8 +130,9 @@
// sync module responds to incoming RPCs from remote sync modules.
func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, rootDir string) (*syncService, error) {
s := &syncService{
- sv: sv,
- batches: make(batchSet),
+ sv: sv,
+ batches: make(batchSet),
+ sgPublishQueue: list.New(),
}
data := &syncData{}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 1eebf88..d820d8f 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -53,6 +53,7 @@
import (
"fmt"
"strconv"
+ "time"
"v.io/v23/context"
"v.io/v23/verror"
@@ -104,12 +105,23 @@
return out
}
+// sgPublishInfo holds information on a SyncGroup waiting to be published to a
+// remote peer. It is an in-memory entry in a queue of pending SyncGroups.
+type sgPublishInfo struct {
+ sgName string
+ appName string
+ dbName string
+ queued time.Time
+ lastTry time.Time
+}
+
// initSync initializes the sync module during startup. It scans all the
// databases across all apps to initialize the following:
// a) in-memory sync state of a Database and all its SyncGroups consisting of
// the current generation number, log position and generation vector.
// b) watcher map of prefixes currently being synced.
// c) republish names in mount tables for all syncgroups.
+// d) in-memory queue of SyncGroups to be published.
//
// TODO(hpucha): This is incomplete. Flesh this out further.
func (s *syncService) initSync(ctx *context.T) error {
@@ -133,6 +145,10 @@
for _, prefix := range sg.Spec.Prefixes {
incrWatchPrefix(appName, dbName, prefix)
}
+
+ if sg.Status == interfaces.SyncGroupStatusPublishPending {
+ s.enqueuePublishSyncGroup(sg.Name, appName, dbName, false)
+ }
return false
})
@@ -164,6 +180,23 @@
return errFinal
}
+// enqueuePublishSyncGroup appends the given SyncGroup to the publish queue.
+func (s *syncService) enqueuePublishSyncGroup(sgName, appName, dbName string, attempted bool) {
+ s.sgPublishQueueLock.Lock()
+ defer s.sgPublishQueueLock.Unlock()
+
+ entry := &sgPublishInfo{
+ sgName: sgName,
+ appName: appName,
+ dbName: dbName,
+ queued: time.Now(),
+ }
+ if attempted {
+ entry.lastTry = entry.queued
+ }
+ s.sgPublishQueue.PushBack(entry)
+}
+
// Note: For all the utilities below, if the sgid parameter is non-nil, the
// operation is performed in the SyncGroup space. If nil, it is performed in the
// data space for the Database.
@@ -420,15 +453,7 @@
// hasLogRec returns true if the log record for (devid, gen) exists.
func hasLogRec(st store.StoreReader, pfx string, id, gen uint64) (bool, error) {
- // TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
- var rec localLogRec
- if err := util.Get(nil, st, logRecKey(pfx, id, gen), &rec); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return util.Exists(nil, st, logRecKey(pfx, id, gen))
}
// putLogRec stores the log record.
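
The consumer side of sgPublishQueue is not part of this diff. A hypothetical
drain loop could look like the following; tryPublish stands in for whatever
retry entry point the publisher actually uses, and a real implementation
would not hold the queue lock across RPC attempts.

    // drainPublishQueue retries pending SyncGroup publishes (sketch only).
    func (s *syncService) drainPublishQueue(ctx *context.T) {
        s.sgPublishQueueLock.Lock()
        defer s.sgPublishQueueLock.Unlock()
        for e := s.sgPublishQueue.Front(); e != nil; {
            next := e.Next()
            info := e.Value.(*sgPublishInfo)
            if err := s.tryPublish(ctx, info); err == nil {
                s.sgPublishQueue.Remove(e) // published; drop the entry
            } else {
                info.lastTry = time.Now() // keep it queued for the next pass
            }
            e = next
        }
    }
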
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 91ba499..2fb5479 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -81,6 +81,7 @@
}
// verifySyncGroup verifies if a SyncGroup struct is well-formed.
+// TODO(rdaoud): define verrors for all ErrBadArg cases.
func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
if sg == nil {
return verror.New(verror.ErrBadArg, ctx, "group information not specified")
@@ -106,40 +107,157 @@
if len(sg.Joiners) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
}
- if len(sg.Spec.Prefixes) == 0 {
+ return verifySyncGroupSpec(ctx, &sg.Spec)
+}
+
+// verifySyncGroupSpec verifies if a SyncGroupSpec is well-formed.
+func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
+ if spec == nil {
+ return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
+ }
+ if len(spec.Prefixes) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
}
+
+ // Duplicate prefixes are not allowed.
+ prefixes := make(map[string]bool, len(spec.Prefixes))
+ for _, pfx := range spec.Prefixes {
+ prefixes[pfx] = true
+ }
+ if len(prefixes) != len(spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
+ }
return nil
}
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
- // Verify SyncGroup before storing it since it may have been received
- // from a remote peer.
+// samePrefixes returns true if the two sets of prefixes are the same.
+func samePrefixes(pfx1, pfx2 []string) bool {
+ pfxMap := make(map[string]uint8)
+ for _, p := range pfx1 {
+ pfxMap[p] |= 0x01
+ }
+ for _, p := range pfx2 {
+ pfxMap[p] |= 0x02
+ }
+ for _, mask := range pfxMap {
+ if mask != 0x03 {
+ return false
+ }
+ }
+ return true
+}
+
+// addSyncGroup adds a new SyncGroup given its version and information. This
+// also includes creating a DAG node entry and updating the DAG head. If the
+// caller is the creator of the SyncGroup, a local log record is also created
+// using the given server ID and gen and pos counters to index the log record.
+// Otherwise, it's a joiner case and the SyncGroup is put in a pending state
+// (waiting for its full metadata to be synchronized) and the log record is
+// skipped, delaying its creation until the Initiator does p2p sync.
+func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ // Verify the SyncGroup information before storing it since it may have
+ // been received from a remote peer.
if err := verifySyncGroup(ctx, sg); err != nil {
return err
}
- if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
- return err
- } else if ok {
- return verror.New(verror.ErrExist, ctx, "group id already exists")
- }
+ // Add the group name and ID entries.
if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
return err
} else if ok {
return verror.New(verror.ErrExist, ctx, "group name already exists")
}
+ if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id already exists")
+ }
- // Add the group name and data entries.
+ state := sgLocalState{
+ RemotePublisher: remotePublisher,
+ SyncPending: !creator,
+ PendingGenVec: genvec,
+ }
+ if remotePublisher == "" {
+ state.NumLocalJoiners = 1
+ }
+
if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
return err
}
- if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
+ if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
return err
}
- return nil
+ // Add the SyncGroup versioned data entry.
+ if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id version already exists")
+ }
+
+ return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
+}
+
+// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
+// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
+// and updates the DAG head. Optionally, it also writes a new local log record
+// using the given server ID and gen and pos counters to index it. The caller
+// can provide the version number to use; otherwise, if NoVersion is given, a new
+// version is generated by the function.
+// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
+// apps can monitor SG changes as well.
+func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ if version == NoVersion {
+ version = newSyncGroupVersion()
+ }
+
+ // Add the SyncGroup versioned data entry.
+ if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ return err
+ }
+
+ // Add a sync log record for the SyncGroup if needed.
+ oid := sgIdKey(sg.Id)
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
+ var parents []string
+ if head, err := getHead(ctx, tx, oid); err == nil {
+ parents = []string{head}
+ } else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return err
+ }
+ if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
+ return err
+ }
+ return setHead(ctx, tx, oid, version)
+}
+
+// addSyncGroupLogRec adds a new local log record for a SyncGroup.
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
+ oid := sgIdKey(gid)
+ rec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ ObjId: oid,
+ CurVers: version,
+ Delete: false,
+ UpdTime: watchable.GetStoreTime(ctx, tx),
+ Id: servId,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ BatchId: NoBatchId,
+ },
+ Pos: pos,
+ }
+
+ return putLogRec(ctx, tx, oid, rec)
}
// getSyncGroupId retrieves the SyncGroup ID given its name.
@@ -147,18 +265,18 @@
return getSGNameEntry(ctx, st, name)
}
-// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- sg, err := getSyncGroupById(ctx, st, gid)
- if err != nil {
- return "", err
- }
- return sg.Name, nil
+// getSyncGroupVersion retrieves the current version of the SyncGroup.
+func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
+ return getHead(ctx, st, sgIdKey(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
- return getSGDataEntry(ctx, st, gid)
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return nil, err
+ }
+ return getSGDataEntry(ctx, st, gid, version)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
@@ -176,19 +294,53 @@
if err != nil {
return err
}
- if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
- return err
- }
- return delSGDataEntry(ctx, tx, sg.Id)
+ return delSyncGroupByName(ctx, tx, sg.Name)
}
// delSyncGroupByName deletes the SyncGroup given its name.
func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
+ // Get the SyncGroup ID and current version.
gid, err := getSyncGroupId(ctx, tx, name)
if err != nil {
return err
}
- return delSyncGroupById(ctx, tx, gid)
+ version, err := getSyncGroupVersion(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ // Delete the name and ID entries.
+ if err := delSGNameEntry(ctx, tx, name); err != nil {
+ return err
+ }
+ if err := delSGIdEntry(ctx, tx, gid); err != nil {
+ return err
+ }
+
+ // Delete all versioned SyncGroup data entries (same versions as DAG
+ // nodes). This is done separately from pruning the DAG nodes because
+ // some nodes may have no log record pointing back to the SyncGroup data
+ // entries (loose coupling to support the pending SyncGroup state).
+ oid := sgIdKey(gid)
+ err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
+ return delSGDataEntry(ctx, tx, gid, v)
+ })
+ if err != nil {
+ return err
+ }
+
+ // Delete all DAG nodes and log records.
+ bset := newBatchPruning()
+ err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
+ if lr != "" {
+ return util.Delete(ctx, tx, lr)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ return pruneDone(ctx, tx, bset)
}
// refreshMembersIfExpired updates the aggregate view of SyncGroup members
@@ -252,16 +404,23 @@
// make forEachSyncGroup() stop the iteration earlier; otherwise the function
// loops across all SyncGroups in the Database.
func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
- stream := st.Scan(scanStart, scanLimit)
+ stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ defer stream.Cancel()
+
for stream.Advance() {
- var sg interfaces.SyncGroup
- if vom.Decode(stream.Value(nil), &sg) != nil {
- vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
+ var gid interfaces.GroupId
+ if vom.Decode(stream.Value(nil), &gid) != nil {
+ vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
continue
}
- if callback(&sg) {
+ sg, err := getSyncGroupById(nil, st, gid)
+ if err != nil {
+ vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
+ continue
+ }
+
+ if callback(sg) {
break // done, early exit
}
}
@@ -324,63 +483,69 @@
// Use the functions above to manipulate SyncGroups.
var (
- // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
- sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-
- // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
- sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ // Prefixes used to store the different mappings of a SyncGroup:
+ // sgNameKeyPrefix: name --> ID
+ // sgIdKeyPrefix: ID --> SyncGroup local state
+ // sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
+ //
+ // Note: as with other syncable objects, the DAG "heads" table contains
+ // a reference to the current SyncGroup version, and the DAG "nodes"
+ // table tracks its history of mutations.
+ // TODO(rdaoud): change the data key prefix to use the SG OID instead
+ // of its ID, to be similar to the versioned user data keys. The OID
+ // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
+ // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
+ sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
)
-// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid interfaces.GroupId) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
+// sgIdStr returns the SyncGroup ID in string format.
+// TODO(rdaoud): delete when the SG ID becomes a string throughout.
+func sgIdStr(gid interfaces.GroupId) string {
+ return fmt.Sprintf("%d", uint64(gid))
}
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
+ return util.JoinKeyParts(sgNameKeyPrefix, name)
+}
+
+// sgIdKey returns the key used to access the SyncGroup ID entry.
+func sgIdKey(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgDataKey returns the key used to access a version of the SyncGroup data.
+func sgDataKey(gid interfaces.GroupId, version string) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
- prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
-
// Note that the actual SyncGroup name may contain ":" as a separator.
- if !strings.HasPrefix(key, prefix) {
+ // So don't split the key on the separator; instead, trim its prefix.
+ prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+ name := strings.TrimPrefix(key, prefix)
+ if name == key {
return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
}
- return strings.TrimPrefix(key, prefix), nil
-}
-
-// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var sg interfaces.SyncGroup
- if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return name, nil
}
// hasSGNameEntry returns true if the SyncGroup name entry exists.
func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var gid interfaces.GroupId
- if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return util.Exists(nil, sntx, sgNameKey(name))
}
-// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid), sg)
+// hasSGIdEntry returns true if the SyncGroup ID entry exists.
+func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
+ return util.Exists(nil, sntx, sgIdKey(gid))
+}
+
+// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
+ return util.Exists(nil, sntx, sgDataKey(gid, version))
}
// setSGNameEntry stores the SyncGroup name entry.
@@ -388,34 +553,58 @@
return util.Put(ctx, tx, sgNameKey(name), gid)
}
-// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+// setSGIdEntry stores the SyncGroup ID entry.
+func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
+ return util.Put(ctx, tx, sgIdKey(gid), state)
+}
+
+// setSGDataEntry stores the SyncGroup versioned data entry.
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+}
+
+// getSGNameEntry retrieves the SyncGroup ID for a given name.
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+ var gid interfaces.GroupId
+ if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+ return interfaces.NoGroupId, err
+ }
+ return gid, nil
+}
+
+// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
+func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
+ var state sgLocalState
+ if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
+ return nil, err
+ }
+ return &state, nil
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
var sg interfaces.SyncGroup
- if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+ if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
return nil, err
}
return &sg, nil
}
-// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
- var gid interfaces.GroupId
- if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
- return gid, err
- }
- return gid, nil
-}
-
-// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
- return util.Delete(ctx, tx, sgDataKey(gid))
-}
-
-// delSGNameEntry deletes the SyncGroup name to ID mapping.
+// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
}
+// delSGIdEntry deletes the SyncGroup ID entry.
+func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
+ return util.Delete(ctx, tx, sgIdKey(gid))
+}
+
+// delSGDataEntry deletes the SyncGroup versioned data entry.
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
+ return util.Delete(ctx, tx, sgDataKey(gid, version))
+}
+
////////////////////////////////////////////////////////////
// SyncGroup methods between Client and Syncbase.
@@ -424,9 +613,22 @@
vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
- // Get this Syncbase's sync module handle.
ss := sd.sync.(*syncService)
- var sg *interfaces.SyncGroup
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ // Instantiate sg. Add self as joiner.
+ gid, version := newSyncGroupId(), newSyncGroupVersion()
+ sg := &interfaces.SyncGroup{
+ Id: gid,
+ Name: sgName,
+ SpecVersion: version,
+ Spec: spec,
+ Creator: ss.name,
+ AppName: appName,
+ DbName: dbName,
+ Status: interfaces.SyncGroupStatusPublishPending,
+ Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
+ }
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -436,29 +638,17 @@
// TODO(hpucha): Check prefix ACLs on all SG prefixes.
// This may need another method on util.Database interface.
-
// TODO(hpucha): Do some SG ACL checking. Check creator
// has Admin privilege.
- // Instantiate sg. Add self as joiner.
- sg = &interfaces.SyncGroup{
- Id: newSyncGroupId(),
- Name: sgName,
- SpecVersion: newSyncGroupVersion(),
- Spec: spec,
- Creator: ss.name,
- AppName: sd.db.App().Name(),
- DbName: sd.db.Name(),
- Status: interfaces.SyncGroupStatusPublishPending,
- Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
- }
+ // Reserve a log generation and position counts for the new SyncGroup.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
}
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
// Take a snapshot of the data to bootstrap the SyncGroup.
return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
})
@@ -467,9 +657,13 @@
return err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- // Local SG create succeeded. Publish the SG at the chosen server.
- sd.publishSyncGroup(ctx, call, sgName)
+ ss.initSyncStateInMem(ctx, appName, dbName, gid)
+
+ // Local SG create succeeded. Publish the SG at the chosen server, or if
+ // that fails, enqueue it for later publish retries.
+ if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+ ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
+ }
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, spec)
@@ -492,24 +686,54 @@
return err
}
- // Check if SyncGroup already exists.
- sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+ // Check if SyncGroup already exists and get its info.
+ var gid interfaces.GroupId
+ gid, sgErr = getSyncGroupId(ctx, tx, sgName)
if sgErr != nil {
return sgErr
}
- // SyncGroup already exists. Possibilities include created
- // locally, already joined locally or published at the device as
- // a result of SyncGroup creation on a different device.
- //
- // TODO(hpucha): Handle the above cases. If the SG was published
- // locally, but not joined, we need to bootstrap the DAG and
- // watcher. If multiple joins are done locally, we may want to
- // ref count the SG state and track the leaves accordingly. So
- // we may need to add some local state for each SyncGroup.
+ sg, sgErr = getSyncGroupById(ctx, tx, gid)
+ if sgErr != nil {
+ return sgErr
+ }
// Check SG ACL.
- return authorize(ctx, call.Security(), sg)
+ if err := authorize(ctx, call.Security(), sg); err != nil {
+ return err
+ }
+
+ // SyncGroup already exists, increment the number of local
+ // joiners in its local state information. This presents
+ // different scenarios:
+ // 1- An additional local joiner: the current number of local
+ // joiners is > 0 and the SyncGroup was already bootstrapped
+ // to the Watcher, so there is nothing else to do.
+ // 2- A new local joiner after all previous local joiners had
+ // left: the number of local joiners is 0, the Watcher must
+ // be re-notified via a SyncGroup bootstrap because the last
+ // previous joiner to leave had canceled the Watcher notification. In
+ // this scenario the SyncGroup was not destroyed after the
+ // last joiner left because the SyncGroup was also published
+ // here by a remote peer and thus cannot be destroyed only
+ // based on the local joiners.
+ // 3- A first local joiner for a SyncGroup that was published
+ // here from a remote Syncbase: the number of local joiners
+ // is also 0 (and the remote publish flag is set), and the
+ // Watcher must be notified via a SyncGroup bootstrap.
+ // Conclusion: bootstrap if the number of local joiners is 0.
+ sgState, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ if sgState.NumLocalJoiners == 0 {
+ if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
+ return err
+ }
+ }
+ sgState.NumLocalJoiners++
+ return setSGIdEntry(ctx, tx, gid, sgState)
})
// The presented blessing is allowed to make this Syncbase instance join
@@ -532,48 +756,41 @@
ss := sd.sync.(*syncService)
// Contact a SyncGroup Admin to join the SyncGroup.
- sg = &interfaces.SyncGroup{}
- *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+ sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
if err != nil {
return nullSpec, err
}
// Verify that the app/db combination is valid for this SyncGroup.
- if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+ if sg2.AppName != appName || sg2.DbName != dbName {
return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
}
err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
- // TODO(hpucha): Get SG Deltas from Admin device.
-
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
return err
}
// Take a snapshot of the data to bootstrap the SyncGroup.
- return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
+ return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
})
if err != nil {
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
// Publish at the chosen mount table and in the neighborhood.
- sd.publishInMountTables(ctx, call, sg.Spec)
+ sd.publishInMountTables(ctx, call, sg2.Spec)
- return sg.Spec, nil
+ return sg2.Spec, nil
}
func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- var sgNames []string
-
vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
- defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -584,8 +801,8 @@
}
// Scan all the SyncGroup names found in the Database.
- scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
- stream := sn.Scan(scanStart, scanLimit)
+ stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ var sgNames []string
var key []byte
for stream.Advance() {
sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
@@ -599,18 +816,19 @@
return nil, err
}
+ vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
return sgNames, nil
}
func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
- var spec wire.SyncGroupSpec
-
vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
+ var spec wire.SyncGroupSpec
+
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
return spec, "", err
@@ -623,41 +841,45 @@
}
// TODO(hpucha): Check SyncGroup ACL.
- spec = sg.Spec
- return spec, sg.SpecVersion, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
+ return sg.Spec, sg.SpecVersion, nil
}
func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
- var members map[string]wire.SyncGroupMemberInfo
-
vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
- return members, err
+ return nil, err
}
// Get the SyncGroup information.
sg, err := getSyncGroupByName(ctx, sn, sgName)
if err != nil {
- return members, err
+ return nil, err
}
// TODO(hpucha): Check SyncGroup ACL.
- members = sg.Joiners
- return members, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
+ return sg.Joiners, nil
}
-// TODO(hpucha): Enable syncing syncgroup metadata.
func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+ if err := verifySyncGroupSpec(ctx, &spec); err != nil {
+ return err
+ }
+
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -669,10 +891,33 @@
return err
}
- // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+ if version != NoVersion && sg.SpecVersion != version {
+ return verror.NewErrBadVersion(ctx)
+ }
+ // Must not change the SyncGroup prefixes.
+ if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
+ }
+
+ sgState, err := getSGIdEntry(ctx, tx, sg.Id)
+ if err != nil {
+ return err
+ }
+ if sgState.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve a log generation and a position count for the new SyncGroup.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ newVersion := newSyncGroupVersion()
sg.Spec = spec
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ sg.SpecVersion = newVersion
+ return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
})
return err
}
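
For reference, the verifySyncGroupSpec check added above is exercised by the unittests later in this change, which expect empty and duplicate prefix lists to be rejected. Below is a minimal sketch of that prefix validation covering only the rules visible in the tests; it is an illustration, not the implementation, and the real helper may check more.

package vsync

import "fmt"

// checkSpecPrefixes approximates the prefix checks implied by the unittests
// in this change (sketch only; the real verifySyncGroupSpec may do more).
func checkSpecPrefixes(prefixes []string) error {
	if len(prefixes) == 0 {
		return fmt.Errorf("syncgroup spec has no prefixes")
	}
	seen := make(map[string]bool, len(prefixes))
	for _, p := range prefixes {
		if seen[p] {
			return fmt.Errorf("duplicate prefix: %q", p)
		}
		seen[p] = true
	}
	return nil
}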
@@ -680,9 +925,27 @@
//////////////////////////////
// Helper functions
-// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+// publishSyncGroup publishes the SyncGroup at the remote peer and updates its
+// status. If the publish operation either succeeds or is rejected by the
+// peer, the status is updated to "running" or "rejected" respectively, and
+// the function returns "nil" to tell the caller that no further attempts are
+// needed. Otherwise an error (typically an RPC error, but possibly a store
+// error) is returned to the caller.
+// TODO(rdaoud): make all SG admins try to publish after they join.
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
- sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+ st := sd.db.St()
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ gid, err := getSyncGroupId(ctx, st, sgName)
+ if err != nil {
+ return err
+ }
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return err
+ }
+ sg, err := getSGDataEntry(ctx, st, gid, version)
if err != nil {
return err
}
@@ -691,34 +954,74 @@
return nil
}
+ // Note: the remote peer is given the SyncGroup version and genvec as
+ // they were before the post-publish update, which modifies the status
+ // and joiner list of the SyncGroup. This is functionally correct,
+ // just not symmetrical with what happens at the joiner, which
+ // receives the SyncGroup state post-join.
+ // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
+ status := interfaces.SyncGroupStatusPublishRejected
+
c := interfaces.SyncClient(sgName)
- err = c.PublishSyncGroup(ctx, *sg)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
- // Publish failed temporarily. Retry later.
- // TODO(hpucha): Is there an RPC error that we can check here?
- if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
- return err
- }
-
- // Publish succeeded.
if err == nil {
- // TODO(hpucha): Get SG Deltas from publisher. Obtaining the
- // new version from the publisher prevents SG conflicts.
- return err
+ status = interfaces.SyncGroupStatusRunning
+ } else {
+ errId := verror.ErrorID(err)
+ if errId == interfaces.ErrDupSyncGroupPublish.ID {
+ // Duplicate publish: another admin already published
+ // the SyncGroup, nothing else needs to happen because
+ // that other admin would have updated the SyncGroup
+ // status and p2p SG sync will propagate the change.
+ // TODO(rdaoud): what if that other admin crashes and
+ // never updates the SyncGroup status (dies permanently
+ // or is ejected before the status update)? Eventually
+ // some admin must decide to update the SG status anyway
+ // even if that causes extra SG mutations and conflicts.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
+ return nil
+ }
+
+ if errId != verror.ErrExist.ID {
+ // The publish operation failed with an error other
+ // than ErrExist, so it must be retried later.
+ // TODO(hpucha): Is there an RPC error that we can check here?
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
+ return err
+ }
}
- // Publish rejected. Persist that to avoid retrying in the
- // future and to remember the split universe scenario.
- err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
+ // The publish operation is complete: it either succeeded or failed
+ // with ErrExist. Update the SyncGroup status and, if the publish was
+ // successful, add the remote peer to the SyncGroup.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
+ sgName, peer, status.String(), err)
+
+ err = store.RunInTransaction(st, func(tx store.Transaction) error {
// Ensure SG still exists.
- sg, err := getSyncGroupByName(ctx, tx, sgName)
+ sg, err := getSyncGroupById(ctx, tx, gid)
if err != nil {
return err
}
- sg.Status = interfaces.SyncGroupStatusPublishRejected
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ // Reserve a log generation and a position count for the
+ // new SyncGroup version.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ sg.Status = status
+ if status == interfaces.SyncGroupStatusRunning {
+ // TODO(hpucha): Default priority?
+ sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
+ }
+
+ return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
})
+ if err != nil {
+ vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
+ sgName, status.String(), err)
+ }
return err
}
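
The enqueuePublishSyncGroup retry path referenced earlier is outside this hunk. Given the contract documented on publishSyncGroup (nil is terminal, non-nil means try again later), a retry driver can be very simple; the sketch below assumes an illustrative capped exponential backoff and is not the actual implementation.

package vsync

import (
	"time"

	"v.io/v23/context"
)

// retryPublish drives publishSyncGroup-style attempts: it stops on a nil
// return (success, rejection, and duplicate publish are all terminal per the
// contract above) and backs off otherwise. Sketch only; the backoff policy
// is an assumption.
func retryPublish(ctx *context.T, publish func() error) {
	delay := time.Second
	for ctx.Err() == nil {
		if err := publish(); err == nil {
			return // terminal outcome; the SyncGroup status is already persisted
		}
		time.Sleep(delay)
		if delay < time.Minute {
			delay *= 2 // capped exponential backoff
		}
	}
}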
@@ -801,7 +1104,7 @@
return nil
}
-func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
c := interfaces.SyncClient(sgName)
return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
@@ -819,36 +1122,49 @@
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.
-func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
if err != nil {
- return err
+ return s.name, err
}
err = store.RunInTransaction(st, func(tx store.Transaction) error {
- localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
-
+ gid, err := getSyncGroupId(ctx, tx, sg.Name)
if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
- // SG name already claimed.
- if err == nil && localSG.Id != sg.Id {
- return verror.New(verror.ErrExist, ctx, sg.Name)
- }
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
- // metadata if needed.
- //
- // TODO(hpucha): Catch up on SG versions so far.
-
- // SG already published. Update if needed.
- if err == nil && localSG.Id == sg.Id {
- if localSG.Status == interfaces.SyncGroupStatusPublishPending {
- localSG.Status = interfaces.SyncGroupStatusRunning
- return setSGDataEntry(ctx, tx, localSG.Id, localSG)
+ if err == nil {
+ // SG name already claimed. Note that in this case of
+ // split-brain (same SG name, different IDs), the admins
+ // of the SG ID being rejected here do not benefit from
+ // the de-duping optimization below and will end up
+ // making duplicate SG mutations to set the status,
+ // yielding more SG conflicts. It is functionally
+ // correct but bypasses the de-dup optimization for the
+ // rejected SG.
+ if gid != sg.Id {
+ return verror.New(verror.ErrExist, ctx, sg.Name)
}
- return nil
+
+ // The SG exists locally, either created/joined locally
+ // or previously published here. Make the publish
+ // idempotent for the same publisher; otherwise it is a
+ // duplicate.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.RemotePublisher == "" {
+ // Locally created/joined SyncGroup: update its
+ // state to include the publisher.
+ state.RemotePublisher = publisher
+ return setSGIdEntry(ctx, tx, gid, state)
+ }
+ if publisher == state.RemotePublisher {
+ // Same previous publisher: nothing to change,
+ // the old genvec and version info are still valid.
+ return nil
+ }
+ return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
}
// Publish the SyncGroup.
@@ -856,23 +1172,21 @@
// TODO(hpucha): Use some ACL check to allow/deny publishing.
// TODO(hpucha): Ensure node is on Admin ACL.
- // TODO(hpucha): Default priority?
- sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
- sg.Status = interfaces.SyncGroupStatusRunning
- return addSyncGroup(ctx, tx, &sg)
+ return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
})
- if err != nil {
- return err
+ if err == nil {
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
}
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- return nil
+ return s.name, err
}
-func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
var dbSt store.Store
var gid interfaces.GroupId
var err error
+ var stAppName, stDbName string
+ nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{}
// Find the database store for this SyncGroup.
//
@@ -885,6 +1199,7 @@
if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
// Found the SyncGroup being looked for.
dbSt = st
+ stAppName, stDbName = appName, dbName
return true
}
return false
@@ -892,10 +1207,12 @@
// SyncGroup not found.
if err != nil {
- return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+ return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
}
+ version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
sg, err = getSyncGroupById(ctx, tx, gid)
@@ -908,13 +1225,27 @@
return err
}
+ // Check that the SG is not in pending state.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve a log generation and a position count for the new SyncGroup.
+ //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
})
if err != nil {
- return interfaces.SyncGroup{}, err
+ return nullSG, "", nullGV, err
}
- return *sg, nil
+ // TODO(rdaoud): return the SyncGroup genvec
+ return *sg, version, nullGV, nil
}
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 4dc5845..af44cfb 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -50,6 +50,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
checkSGStats(t, svc, "add-1", 0, 0)
@@ -57,6 +58,7 @@
sgName := "foobar"
sgId := interfaces.GroupId(1234)
+ version := "v111"
sg := &interfaces.SyncGroup{
Name: sgName,
@@ -76,7 +78,7 @@
}
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, version, true, "", nil, s.id, 1, 1, sg); err != nil {
t.Errorf("cannot add SyncGroup ID %d: %v", sg.Id, err)
}
if err := tx.Commit(); err != nil {
@@ -88,10 +90,6 @@
if id, err := getSyncGroupId(nil, st, sgName); err != nil || id != sgId {
t.Errorf("cannot get ID of SyncGroup %s: got %d instead of %d; err: %v", sgName, id, sgId, err)
}
- if name, err := getSyncGroupName(nil, st, sgId); err != nil || name != sgName {
- t.Errorf("cannot get name of SyncGroup %d: got %s instead of %s; err: %v",
- sgId, name, sgName, err)
- }
sgOut, err := getSyncGroupById(nil, st, sgId)
if err != nil {
@@ -150,7 +148,7 @@
sg.Name = "another-name"
tx = st.NewTransaction()
- if err = addSyncGroup(nil, tx, sg); err == nil {
+ if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err == nil {
t.Errorf("re-adding SyncGroup %d did not fail", sgId)
}
tx.Abort()
@@ -159,7 +157,7 @@
sg.Id = interfaces.GroupId(5555)
tx = st.NewTransaction()
- if err = addSyncGroup(nil, tx, sg); err == nil {
+ if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 3, 3, sg); err == nil {
t.Errorf("adding SyncGroup %s with a different ID did not fail", sgName)
}
tx.Abort()
@@ -173,9 +171,6 @@
if id, err := getSyncGroupId(nil, st, badName); err == nil {
t.Errorf("found non-existing SyncGroup %s: got ID %d", badName, id)
}
- if name, err := getSyncGroupName(nil, st, badId); err == nil {
- t.Errorf("found non-existing SyncGroup %d: got name %s", badId, name)
- }
if sg, err := getSyncGroupByName(nil, st, badName); err == nil {
t.Errorf("found non-existing SyncGroup %s: got %v", badName, sg)
}
@@ -191,10 +186,11 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *interfaces.SyncGroup, msg string) {
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err == nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err == nil {
t.Errorf("checkBadAddSyncGroup: adding bad SyncGroup (%s) did not fail", msg)
}
tx.Abort()
@@ -202,13 +198,22 @@
checkBadAddSyncGroup(t, st, nil, "nil SG")
- sg := &interfaces.SyncGroup{Id: 1234}
+ sg := &interfaces.SyncGroup{}
checkBadAddSyncGroup(t, st, sg, "SG w/o name")
- sg = &interfaces.SyncGroup{Name: "foobar"}
- checkBadAddSyncGroup(t, st, sg, "SG w/o Id")
+ sg.Name = "foobar"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o AppName")
- sg.Id = 1234
+ sg.AppName = "mockApp"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o DbName")
+
+ sg.DbName = "mockDb"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o creator")
+
+ sg.Creator = "haha"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o ID")
+
+ sg.Id = newSyncGroupId()
checkBadAddSyncGroup(t, st, sg, "SG w/o Version")
sg.SpecVersion = "v1"
@@ -218,6 +223,9 @@
"phone": nosql.SyncGroupMemberInfo{SyncPriority: 10},
}
checkBadAddSyncGroup(t, st, sg, "SG w/o Prefixes")
+
+ sg.Spec.Prefixes = []string{"foo", "bar", "foo"}
+ checkBadAddSyncGroup(t, st, sg, "SG with duplicate Prefixes")
}
// TestDeleteSyncGroup tests deleting a SyncGroup.
@@ -227,6 +235,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
sgName := "foobar"
sgId := interfaces.GroupId(1234)
@@ -264,7 +273,7 @@
}
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId, err)
}
if err := tx.Commit(); err != nil {
@@ -285,16 +294,24 @@
checkSGStats(t, svc, "del-3", 0, 0)
- // Create it again then delete it by name.
+ // Create it again, update it, then delete it by name.
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err != nil {
t.Errorf("creating SyncGroup ID %d after delete failed: %v", sgId, err)
}
if err := tx.Commit(); err != nil {
t.Errorf("cannot commit adding SyncGroup ID %d after delete: %v", sgId, err)
}
+ tx = st.NewTransaction()
+ if err := s.updateSyncGroupVersioning(nil, tx, NoVersion, true, s.id, 3, 3, sg); err != nil {
+ t.Errorf("updating SyncGroup ID %d version: %v", sgId, err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Errorf("cannot commit updating SyncGroup ID %d version: %v", sgId, err)
+ }
+
checkSGStats(t, svc, "del-4", 1, 3)
tx = st.NewTransaction()
@@ -315,6 +332,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
sgName1, sgName2 := "foo", "bar"
sgId1, sgId2 := interfaces.GroupId(1234), interfaces.GroupId(8888)
@@ -357,7 +375,7 @@
}
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg1); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId1, err)
}
if err := tx.Commit(); err != nil {
@@ -367,7 +385,7 @@
checkSGStats(t, svc, "multi-1", 1, 3)
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg2); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg2); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId2, err)
}
if err := tx.Commit(); err != nil {
@@ -510,3 +528,21 @@
}
}
}
+
+// TestPrefixCompare tests the prefix comparison utility.
+func TestPrefixCompare(t *testing.T) {
+ check := func(t *testing.T, pfx1, pfx2 []string, want bool, msg string) {
+ if got := samePrefixes(pfx1, pfx2); got != want {
+ t.Errorf("samePrefixes: %s: got %t instead of %t", msg, got, want)
+ }
+ }
+
+ check(t, nil, nil, true, "both nil")
+ check(t, []string{}, nil, true, "empty vs nil")
+ check(t, []string{"a", "b"}, []string{"b", "a"}, true, "different ordering")
+ check(t, []string{"a", "b", "c"}, []string{"b", "a"}, false, "p1 superset of p2")
+ check(t, []string{"a", "b"}, []string{"b", "a", "c"}, false, "p2 superset of p1")
+ check(t, []string{"a", "b", "c"}, []string{"b", "d", "a"}, false, "overlap")
+ check(t, []string{"a", "b", "c"}, []string{"x", "y"}, false, "no overlap")
+ check(t, []string{"a", "b"}, []string{"B", "a"}, false, "upper/lowercases")
+}
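
These cases pin down the intended semantics of samePrefixes: set equality over the prefixes, order-insensitive and case-sensitive. A minimal sketch satisfying them follows; it assumes duplicate-free inputs (which verifySyncGroupSpec enforces), and the real implementation may differ, e.g. by sorting.

package vsync

// samePrefixesSketch compares two prefix lists as sets, ignoring order but
// not case. Sketch written to match TestPrefixCompare; assumes the inputs
// are duplicate-free.
func samePrefixesSketch(p1, p2 []string) bool {
	if len(p1) != len(p2) {
		return false
	}
	set := make(map[string]bool, len(p1))
	for _, p := range p1 {
		set[p] = true
	}
	for _, p := range p2 {
		if !set[p] {
			return false
		}
	}
	return true
}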
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 9cf53d3..a0c8731 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -43,3 +43,44 @@
Metadata interfaces.LogRecMetadata
Pos uint64 // position in the Database log.
}
+
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase). This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+ // The count of local joiners to the same SyncGroup.
+ NumLocalJoiners uint32
+
+ // The SyncGroup is watched when the sync Watcher starts processing the
+ // SyncGroup data. When a SyncGroup is created or joined, an entry is
+ // added to the Watcher queue (log) to inform it from which point to
+ // start accepting store mutations, an asynchronous notification similar
+ // to regular store mutations. When the Watcher processes that queue
+ // entry, it sets this bit to true. When Syncbase restarts, the value
+ // of this bit allows the new sync Watcher to recreate its in-memory
+ // state by resuming to watch only the prefixes of SyncGroups that were
+ // previously being watched.
+ Watched bool
+
+ // If non-empty, the name of the remote peer that published the
+ // SyncGroup here, typically the SyncGroup creator. In this case the
+ // SyncGroup cannot be GCed locally even if it has no local joiners.
+ RemotePublisher string
+
+ // The SyncGroup is in pending state on a device that learns the current
+ // state of the SyncGroup from another device but has not yet received
+ // through peer-to-peer sync the history of the changes (DAG and logs).
+ // This happens in two cases:
+ // 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+ // only given the current SyncGroup info synchronously and will
+ // receive the full history later via p2p sync.
+ // 2- A remote server where the SyncGroup is published was told by the
+ // SyncGroup publisher the current SyncGroup info synchronously and
+ // will receive the full history later via p2p sync.
+ // The pending state is over when the device reaches or exceeds the
+ // knowledge level indicated in the pending genvec. While SyncPending
+ // is true, no local SyncGroup mutations are allowed (i.e. no join or
+ // set-spec requests).
+ SyncPending bool
+ PendingGenVec interfaces.PrefixGenVector
+}
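
To illustrate the last point: the pending state ends once local knowledge covers the checkpointed PendingGenVec. The sketch below shows that comparison; it assumes interfaces.PrefixGenVector maps device ids to generation numbers, and it is not part of this change.

package vsync

import "v.io/x/ref/services/syncbase/server/interfaces"

// caughtUp reports whether the local generation vector reaches or exceeds
// the pending checkpoint (illustrative sketch, not part of this change).
func caughtUp(local, pending interfaces.PrefixGenVector) bool {
	for dev, gen := range pending {
		if local[dev] < gen {
			return false // still missing history from this device
		}
	}
	return true
}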
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index c02d2d4..34f425f 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -61,11 +61,55 @@
}) {
}
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase). This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+ // The count of local joiners to the same SyncGroup.
+ NumLocalJoiners uint32
+ // The SyncGroup is watched when the sync Watcher starts processing the
+ // SyncGroup data. When a SyncGroup is created or joined, an entry is
+ // added to the Watcher queue (log) to inform it from which point to
+ // start accepting store mutations, an asynchronous notification similar
+ // to regular store mutations. When the Watcher processes that queue
+ // entry, it sets this bit to true. When Syncbase restarts, the value
+ // of this bit allows the new sync Watcher to recreate its in-memory
+ // state and resume watching only the prefixes of SyncGroups that
+ // were previously being watched.
+ Watched bool
+ // If non-empty, the name of the remote peer that published the
+ // SyncGroup here, typically the SyncGroup creator. In this case the
+ // SyncGroup cannot be GCed locally even if it has no local joiners.
+ RemotePublisher string
+ // The SyncGroup is in pending state on a device that learns the current
+ // state of the SyncGroup from another device but has not yet received
+ // through peer-to-peer sync the history of the changes (DAG and logs).
+ // This happens in two cases:
+ // 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+ // only given the current SyncGroup info synchronously and will
+ // receive the full history later via p2p sync.
+ // 2- A remote server where the SyncGroup is published was told by the
+ // SyncGroup publisher the current SyncGroup info synchronously and
+ // will receive the full history later via p2p sync.
+ // The pending state is over when the device reaches or exceeds the
+ // knowledge level indicated in the pending genvec. While SyncPending
+ // is true, no local SyncGroup mutations are allowed (i.e. no join or
+ // set-spec requests).
+ SyncPending bool
+ PendingGenVec interfaces.PrefixGenVector
+}
+
+func (sgLocalState) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/vsync.sgLocalState"`
+}) {
+}
+
func init() {
vdl.Register((*syncData)(nil))
vdl.Register((*localGenInfo)(nil))
vdl.Register((*dbSyncState)(nil))
vdl.Register((*localLogRec)(nil))
+ vdl.Register((*sgLocalState)(nil))
}
const logPrefix = "log" // log state.