syncbase/vsync: Addressing Adam's review comments from go/vcl/12873 and a few other cleanups.
Change-Id: I876d7685cc5e34c9575c0dfe4c41271826a97dc1
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 410371e..3bb3667 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -21,7 +21,7 @@
// records, the responder's genvector, and a "Finish" DeltaResp
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
- // next Database, in common with this responder.
+ // next Database in common with this responder.
GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
// SyncGroup-related methods.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 0ca399f..b87397b 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -35,7 +35,7 @@
// records, the responder's genvector, and a "Finish" DeltaResp
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
- // next Database, in common with this responder.
+ // next Database in common with this responder.
GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
@@ -206,7 +206,7 @@
// records, the responder's genvector, and a "Finish" DeltaResp
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
- // next Database, in common with this responder.
+ // next Database in common with this responder.
GetDeltas(*context.T, SyncGetDeltasServerCall) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
@@ -235,7 +235,7 @@
// records, the responder's genvector, and a "Finish" DeltaResp
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
- // next Database, in common with this responder.
+ // next Database in common with this responder.
GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
@@ -314,7 +314,7 @@
Methods: []rpc.MethodDesc{
{
Name: "GetDeltas",
- Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database, in common with this responder.",
+ Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 5ab18b0..6d608a3 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -49,7 +49,11 @@
RecType byte // type of log record.
// Object related information.
- ObjId string // id of the object that was updated. This id is relative to Application and Database names.
+
+ // Id of the object that was updated. This id is relative to Application
+ // and Database names and is the store key for a particular row in a
+ // table.
+ ObjId string
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index aa0872a..aeb4e31 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -48,8 +48,10 @@
Id uint64 // device id that created the log record.
Gen uint64 // generation number for the log record.
RecType byte // type of log record.
- // Object related information.
- ObjId string // id of the object that was updated. This id is relative to Application and Database names.
+ // Id of the object that was updated. This id is relative to Application
+ // and Database names and is the store key for a particular row in a
+ // table.
+ ObjId string
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3c4a054..963121e 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -208,7 +208,85 @@
wtx.fromSync = true
}
-// Exported as a helper function for testing purposes
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key is used for optimistic concurrency
+// control. At minimum, an object implementing the StoreReader interface is
+// required since this is a Get operation.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+ switch w := st.(type) {
+ case *transaction:
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.err != nil {
+ return nil, convertError(w.err)
+ }
+ return getVersion(w.itx, key)
+ case *wstore:
+ return getVersion(w.ist, key)
+ }
+ return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire. At minimum, an object implementing the
+// StoreReader interface is required since this is a Get operation.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+ switch w := st.(type) {
+ case *transaction:
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.err != nil {
+ return valbuf, convertError(w.err)
+ }
+ return getAtVersion(w.itx, key, valbuf, version)
+ case *wstore:
+ return getAtVersion(w.ist, key, valbuf, version)
+ }
+ return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// PutAtVersion puts a value for the managed key at the requested version. This
+// method is used by the Sync module exclusively when the initiator adds objects
+// with versions created on other Syncbases. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ // Note that we do not enqueue a PutOp in the log since this Put is not
+ // updating the current version of a key.
+ return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion updates the version of a managed key to the requested
+// version. This method is used by the Sync module exclusively when the
+// initiator selects which of the already stored versions (via PutAtVersion
+// calls) becomes the current version. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+ return err
+ }
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ return nil
+}
+
+// Exported as a helper function for testing purposes.
func getLogEntryKey(seq uint64) string {
// Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 036d326..a21057f 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -20,7 +20,6 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/context"
"v.io/v23/verror"
)
@@ -50,34 +49,6 @@
return []byte(join(string(key), string(version)))
}
-// GetVersion returns the current version of a managed key. This method is used
-// by the Sync module when the initiator is attempting to add new versions of
-// objects. Reading the version key is used for optimistic concurrency control.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
- wtx := st.(*transaction)
-
- wtx.mu.Lock()
- defer wtx.mu.Unlock()
- if wtx.err != nil {
- return nil, convertError(wtx.err)
- }
- return getVersion(wtx.itx, key)
-}
-
-// GetAtVersion returns the value of a managed key at the requested
-// version. This method is used by the Sync module when the responder needs to
-// send objects over the wire.
-func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
- wtx := st.(*transaction)
-
- wtx.mu.Lock()
- defer wtx.mu.Unlock()
- if wtx.err != nil {
- return valbuf, convertError(wtx.err)
- }
- return getAtVersion(wtx.itx, key, valbuf, version)
-}
-
func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
return st.Get(makeVersionKey(key), nil)
}
@@ -95,41 +66,6 @@
return getAtVersion(st, key, valbuf, version)
}
-// PutAtVersion puts a value for the managed key at the requested version. This
-// method is used by the Sync module exclusively when the initiator adds objects
-// with versions created on other Syncbases.
-func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
- wtx := tx.(*transaction)
-
- wtx.mu.Lock()
- defer wtx.mu.Unlock()
- if wtx.err != nil {
- return convertError(wtx.err)
- }
-
- return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
-}
-
-// PutVersion updates the version of a managed key to the requested
-// version. This method is used by the Sync module exclusively when the
-// initiator selects which of the already stored versions (via PutAtVersion
-// calls) becomes the current version.
-func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
- wtx := tx.(*transaction)
-
- wtx.mu.Lock()
- defer wtx.mu.Unlock()
- if wtx.err != nil {
- return convertError(wtx.err)
- }
-
- if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
- return err
- }
- wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
- return nil
-}
-
func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
version := NewVersion()
if err := tx.Put(makeVersionKey(key), version); err != nil {
diff --git a/services/syncbase/vsync/cr.go b/services/syncbase/vsync/conflict_resolution.go
similarity index 83%
rename from services/syncbase/vsync/cr.go
rename to services/syncbase/vsync/conflict_resolution.go
index f7b1827..724e49f 100644
--- a/services/syncbase/vsync/cr.go
+++ b/services/syncbase/vsync/conflict_resolution.go
@@ -7,6 +7,7 @@
import (
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// Policies for conflict resolution.
@@ -64,9 +65,9 @@
// resolveObjConflict resolves a conflict for an object given its ID and the 3
// versions that express the conflict: the object's local version, its remote
-// version (from the device contacted), and the common ancestor from which both
-// versions branched away. The function returns the new object value according
-// to the conflict resolution policy.
+// version (from the device contacted), and the closest common ancestor (see
+// dag.go for how the ancestor is chosen). The function returns the new object
+// value according to the conflict resolution policy.
func (iSt *initiationState) resolveObjConflict(ctx *context.T, oid, local, remote, ancestor string) (*conflictResolution, error) {
// Fetch the log records of the 3 object versions.
versions := []string{local, remote, ancestor}
@@ -101,35 +102,29 @@
res.ty = pickLocal
case local.Metadata.CurVers < remote.Metadata.CurVers:
res.ty = pickRemote
+ default:
+ vlog.Fatalf("resolveObjConflictByTime:: local and remote update times and versions are the same, local %v remote %v", local, remote)
}
return &res, nil
}
-// getLogRecsBatch gets the log records for an array of versions.
+// getLogRecsBatch gets the log records of a given object for an array of its versions.
func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
lrecs := make([]*localLogRec, len(versions))
- var err error
for p, v := range versions {
- lrecs[p], err = iSt.getLogRec(ctx, obj, v)
+ logKey, err := getLogRecKey(ctx, iSt.tx, obj, v)
+ if err != nil {
+ return nil, err
+ }
+ dev, gen, err := splitLogRecKey(ctx, logKey)
+ if err != nil {
+ return nil, err
+ }
+ lrecs[p], err = getLogRec(ctx, iSt.tx, dev, gen)
if err != nil {
return nil, err
}
}
return lrecs, nil
}
-
-// getLogRec returns the log record corresponding to a given object and its version.
-func (iSt *initiationState) getLogRec(ctx *context.T, obj, vers string) (*localLogRec, error) {
- // TODO(hpucha): May be the change the name in dag for getLogrec. We now
- // have a few functions of this name.
- logKey, err := getLogrec(ctx, iSt.tx, obj, vers)
- if err != nil {
- return nil, err
- }
- dev, gen, err := splitLogRecKey(ctx, logKey)
- if err != nil {
- return nil, err
- }
- return getLogRec(ctx, iSt.tx, dev, gen)
-}
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index c0d8efa..0f5adfd 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -688,8 +688,8 @@
return nil
}
-// getLogrec returns the log record information for a given object version.
-func getLogrec(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
+// getLogRecKey returns the key of the log record for a given object version.
+func getLogRecKey(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
node, err := getNode(ctx, st, oid, version)
if err != nil {
return "", err
@@ -702,7 +702,7 @@
// nodeKey returns the key used to access a DAG node (oid, version).
func nodeKey(oid, version string) string {
- return util.JoinKeyParts(util.SyncPrefix, "dag", "n", oid, version)
+ return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "n", oid, version)
}
// setNode stores the DAG node entry.
@@ -752,7 +752,7 @@
// headKey returns the key used to access the DAG object head.
func headKey(oid string) string {
- return util.JoinKeyParts(util.SyncPrefix, "dag", "h", oid)
+ return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "h", oid)
}
// setHead stores version as the DAG object head.
@@ -784,7 +784,7 @@
// batchKey returns the key used to access the DAG batch info.
func batchKey(btid uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, "dag", "b", fmt.Sprintf("%d", btid))
+ return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "b", fmt.Sprintf("%d", btid))
}
// setBatch stores the DAG batch entry.
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 7dd7175..1aa75d5 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -35,7 +35,7 @@
t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
}
- if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+ if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
t.Errorf("non-existent object %s:%s has a logrec: %v", oid, version, logrec)
}
@@ -60,7 +60,7 @@
t.Errorf("object %s:%s has wrong data: %v instead of %v", oid, version, node2, node)
}
- if logrec, err := getLogrec(nil, st, oid, version); err != nil || logrec != "logrec-23" {
+ if logrec, err := getLogRecKey(nil, st, oid, version); err != nil || logrec != "logrec-23" {
t.Errorf("object %s:%s has wrong logrec: %s", oid, version, logrec)
}
}
@@ -96,7 +96,7 @@
t.Errorf("hasNode() found deleted object %s:%s", oid, version)
}
- if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+ if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
t.Errorf("deleted object %s:%s has logrec: %s", oid, version, logrec)
}
}
@@ -335,7 +335,7 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -411,10 +411,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -494,13 +494,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -587,13 +587,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -674,10 +674,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 1d447c5..600af4b 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -4,6 +4,12 @@
package vsync
+// Initiator is a goroutine that periodically picks a peer from all the known
+// remote peers, and requests deltas from that peer for all the SyncGroups in
+// common across all apps/databases. It then modifies the sync metadata (DAG and
+// local log records) based on the deltas, detects and resolves conflicts if
+// any, and suitably updates the local Databases.
+
import (
"sort"
"strings"
@@ -68,6 +74,9 @@
case <-ticker.C:
}
+ // TODO(hpucha): Cut a gen for the responder even if there is no
+ // one to initiate to?
+
// Do work.
peer, err := s.pickPeer(ctx)
if err != nil {
@@ -82,12 +91,21 @@
// * Contacting the peer to receive all the deltas based on the local genvector.
// * Processing those deltas to discover objects which have been updated.
// * Processing updated objects to detect and resolve any conflicts if needed.
-// * Communicating relevant object updates to the Database.
-// The processing of the deltas is done one Database at a time.
+// * Communicating relevant object updates to the Database, and updating local
+// genvector to catch up to the received remote genvector.
+//
+// The processing of the deltas is done one Database at a time. If a local error
+// is encountered during the processing of a Database, that Database is skipped
+// and the initiator continues on to the next one. If the connection to the peer
+// encounters an error, this initiation round is aborted. Note that until the
+// local genvector is updated based on the received deltas (the last step in an
+// initiation round), the work done by the initiator is idempotent.
+//
+// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
info := s.allMembers.members[peer]
if info == nil {
- return
+ vlog.Fatalf("getDeltasFromPeer:: missing information in member view for %q", peer)
}
connected := false
var stream interfaces.SyncGetDeltasClientCall
@@ -299,11 +317,11 @@
// devices. These remote devices in turn can send these
// generations back to the initiator in progress which was
// started with older generation information.
- if err := iSt.sync.checkPtLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+ if err := iSt.sync.checkptLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
return err
}
- local, lgen, err := iSt.sync.getDbGenInfo(ctx, iSt.appName, iSt.dbName)
+ local, lgen, err := iSt.sync.copyDbGenInfo(ctx, iSt.appName, iSt.dbName)
if err != nil {
return err
}
@@ -364,6 +382,10 @@
// repeatedly doesn't affect what data is seen next.
rcvr := iSt.stream.RecvStream()
start, finish := false, false
+
+ // TODO(hpucha): See if we can avoid committing the entire delta stream
+ // as one batch. Currently the dependency is between the log records and
+ // the batch info.
tx := iSt.st.NewTransaction()
committed := false
@@ -373,7 +395,7 @@
}
}()
- // Track received batches.
+ // Track received batches (BatchId --> BatchCount mapping).
batchMap := make(map[uint64]uint64)
for rcvr.Advance() {
@@ -396,7 +418,7 @@
case interfaces.DeltaRespRec:
// Insert log record in Database.
- // TODO(hpucha): Should we reserve more postions in a batch?
+ // TODO(hpucha): Should we reserve more positions in a batch?
// TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
@@ -444,6 +466,11 @@
// should not conflict with any other keys. So if it fails, it is a
// non-retriable error.
err := tx.Commit()
+ if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+ // Note: This might be triggered with memstore until it handles
+ // transactions in a more fine-grained fashion.
+ vlog.Fatalf("recvAndProcessDeltas:: encountered concurrent transaction")
+ }
if err == nil {
committed = true
}
@@ -474,7 +501,9 @@
return err
}
// TODO(hpucha): Hack right now. Need to change Database's handling of
- // deleted objects.
+ // deleted objects. Currently, the initiator needs to treat deletions
+ // specially since deletions do not get a version number or a special
+ // value in the Database.
if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
}
@@ -482,8 +511,13 @@
}
// processUpdatedObjects processes all the updates received by the initiator,
-// one object at a time. For each updated object, we first check if the object
-// has any conflicts, resulting in three possibilities:
+// one object at a time. Conflict detection and resolution is carried out after
+// the entire delta of log records is replayed, instead of incrementally after
+// each record/batch is replayed, to avoid repeating conflict resolution already
+// performed by other peers.
+//
+// For each updated object, we first check if the object has any conflicts,
+// resulting in three possibilities:
//
// * There is no conflict, and no updates are needed to the Database
// (isConflict=false, newHead == oldHead). All changes received convey
@@ -565,16 +599,16 @@
// detectConflicts iterates through all the updated objects to detect conflicts.
func (iSt *initiationState) detectConflicts(ctx *context.T) error {
- for obj, st := range iSt.updObjects {
+ for objid, confSt := range iSt.updObjects {
// Check if object has a conflict.
var err error
- st.isConflict, st.newHead, st.oldHead, st.ancestor, err = hasConflict(ctx, iSt.tx, obj, iSt.dagGraft)
+ confSt.isConflict, confSt.newHead, confSt.oldHead, confSt.ancestor, err = hasConflict(ctx, iSt.tx, objid, iSt.dagGraft)
if err != nil {
return err
}
- if !st.isConflict {
- st.res = &conflictResolution{ty: pickRemote}
+ if !confSt.isConflict {
+ confSt.res = &conflictResolution{ty: pickRemote}
}
}
return nil
@@ -583,10 +617,10 @@
// updateDbAndSync updates the Database, and if that is successful, updates log,
// dag and genvector data structures as needed.
func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
- for obj, st := range iSt.updObjects {
+ for objid, confSt := range iSt.updObjects {
// If the local version is picked, no further updates to the
// Database are needed.
- if st.res.ty == pickLocal {
+ if confSt.res.ty == pickLocal {
continue
}
@@ -596,8 +630,8 @@
// TODO(hpucha): Hack right now. Need to change Database's
// handling of deleted objects.
oldVersDeleted := true
- if st.oldHead != NoVersion {
- oldDagNode, err := getNode(ctx, iSt.tx, obj, st.oldHead)
+ if confSt.oldHead != NoVersion {
+ oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
if err != nil {
return err
}
@@ -606,17 +640,17 @@
var newVersion string
var newVersDeleted bool
- switch st.res.ty {
+ switch confSt.res.ty {
case pickRemote:
- newVersion = st.newHead
- newDagNode, err := getNode(ctx, iSt.tx, obj, newVersion)
+ newVersion = confSt.newHead
+ newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
if err != nil {
return err
}
newVersDeleted = newDagNode.Deleted
case createNew:
- newVersion = st.res.rec.Metadata.CurVers
- newVersDeleted = st.res.rec.Metadata.Delete
+ newVersion = confSt.res.rec.Metadata.CurVers
+ newVersDeleted = confSt.res.rec.Metadata.Delete
}
// Skip delete followed by a delete.
@@ -626,36 +660,36 @@
if !oldVersDeleted {
// Read current version to enter it in the readset of the transaction.
- version, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj))
+ version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
if err != nil {
return err
}
- if string(version) != st.oldHead {
+ if string(version) != confSt.oldHead {
return store.NewErrConcurrentTransaction(ctx)
}
} else {
// Ensure key doesn't exist.
- if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
return store.NewErrConcurrentTransaction(ctx)
}
}
if !newVersDeleted {
- if st.res.ty == createNew {
- if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(obj), st.res.val, []byte(newVersion)); err != nil {
+ if confSt.res.ty == createNew {
+ if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
return err
}
}
- if err := watchable.PutVersion(ctx, iSt.tx, []byte(obj), []byte(newVersion)); err != nil {
+ if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
return err
}
} else {
- if err := iSt.tx.Delete([]byte(obj)); err != nil {
+ if err := iSt.tx.Delete([]byte(objid)); err != nil {
return err
}
}
- if err := iSt.updateLogAndDag(ctx, obj); err != nil {
+ if err := iSt.updateLogAndDag(ctx, objid); err != nil {
return err
}
}
@@ -665,31 +699,31 @@
// updateLogAndDag updates the log and dag data structures.
func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
- st, ok := iSt.updObjects[obj]
+ confSt, ok := iSt.updObjects[obj]
if !ok {
return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
}
var newVersion string
- if !st.isConflict {
- newVersion = st.newHead
+ if !confSt.isConflict {
+ newVersion = confSt.newHead
} else {
// Object had a conflict. Create a log record to reflect resolution.
var rec *localLogRec
switch {
- case st.res.ty == pickLocal:
+ case confSt.res.ty == pickLocal:
// Local version was picked as the conflict resolution.
- rec = iSt.createLocalLinkLogRec(ctx, obj, st.oldHead, st.newHead)
- newVersion = st.oldHead
- case st.res.ty == pickRemote:
+ rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.oldHead, confSt.newHead)
+ newVersion = confSt.oldHead
+ case confSt.res.ty == pickRemote:
// Remote version was picked as the conflict resolution.
- rec = iSt.createLocalLinkLogRec(ctx, obj, st.newHead, st.oldHead)
- newVersion = st.newHead
+ rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.newHead, confSt.oldHead)
+ newVersion = confSt.newHead
default:
// New version was created to resolve the conflict.
- rec = st.res.rec
- newVersion = st.res.rec.Metadata.CurVers
+ rec = confSt.res.rec
+ newVersion = confSt.res.rec.Metadata.CurVers
}
if err := putLogRec(ctx, iSt.tx, rec); err != nil {
@@ -742,14 +776,14 @@
// updateSyncSt updates local sync state at the end of an initiator cycle.
func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
// Get the current local sync state.
- dsInMem, err := iSt.sync.getDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+ dsInMem, err := iSt.sync.copyDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
if err != nil {
return err
}
ds := &dbSyncState{
- Gen: dsInMem.gen,
- CkPtGen: dsInMem.ckPtGen,
- GenVec: dsInMem.genvec,
+ Gen: dsInMem.gen,
+ CheckptGen: dsInMem.checkptGen,
+ GenVec: dsInMem.genvec,
}
// remote can be a subset of local.
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index d3c55d4..bb62d6e 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -2,6 +2,14 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+// The initiator tests below are driven by replaying the state from the log
+// files (in testdata directory). These log files may mimic watching the
+// Database locally (addl commands in the log file) or obtaining log records and
+// generation vector from a remote peer (addr, genvec commands). The log files
+// contain the metadata of log records. The log files are only used to set up
+// the state. The tests verify that given a particular local state and a stream
+// of remote deltas, the initiator behaves as expected.
+
package vsync
import (
@@ -199,8 +207,7 @@
// Helpers.
func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
- // Set a large value to prevent the threads from firing.
- // Test is not thread safe.
+ // Set a large value to prevent the initiator from running.
peerSyncInterval = 1 * time.Hour
conflictResolutionPolicy = useTime
svc := createService(t)
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index d096aa8..2049755 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -164,7 +164,7 @@
var respVec interfaces.GenVector
var respGen uint64
- respVec, respGen, rSt.errState = rSt.sync.getDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
+ respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
if rSt.errState != nil {
return
}
@@ -426,14 +426,9 @@
// makeWireLogRec creates a sync log record to send on the wire from a given
// local sync record.
func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
- // Get the object value at the required version. Note: GetAtVersion()
- // requires a transaction to read the data, so create and abort one.
- // TODO(hpucha): remove the fake Tx after the change in GetAtVersion().
- tx := st.NewTransaction()
- defer tx.Abort()
-
+ // Get the object value at the required version.
key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
- value, err := watchable.GetAtVersion(ctx, tx, []byte(key), nil, []byte(version))
+ value, err := watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
if err != nil {
return nil, err
}
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index b452c07..cf67ca8 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -350,7 +350,7 @@
s.id = 10 //responder.
wantDiff, wantVec := test.genDiff, test.outVec
- s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
+ s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
rSt := newResponderState(nil, nil, s, req)
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index d268843..d0322d4 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -81,7 +81,7 @@
return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
}
-// randIntn generates as an int, a non-negative pseudo-random number in [0,n).
+// randIntn mimics rand.Intn (generates a non-negative pseudo-random number in [0,n)).
func randIntn(n int) int {
rngLock.Lock()
defer rngLock.Unlock()
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 9f8bf17..c0e1429 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -46,8 +46,8 @@
gen uint64
pos uint64
- ckPtGen uint64
- genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+ checkptGen uint64
+ genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
}
// initSync initializes the sync module during startup. It scans all the
@@ -144,8 +144,8 @@
return gen, pos
}
-// checkPtLocalGen freezes the local generation number for the responder's use.
-func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+// checkptLocalGen freezes the local generation number for the responder's use.
+func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string) error {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -157,7 +157,7 @@
// The frozen generation is the last generation number used, i.e. one
// below the next available one to use.
- ds.ckPtGen = ds.gen - 1
+ ds.checkptGen = ds.gen - 1
return nil
}
@@ -172,8 +172,8 @@
}
}
-// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
-func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+// copyDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
+func (s *syncService) copyDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -184,19 +184,18 @@
}
dsCopy := &dbSyncStateInMem{
- gen: ds.gen,
- pos: ds.pos,
- ckPtGen: ds.ckPtGen,
+ gen: ds.gen,
+ pos: ds.pos,
+ checkptGen: ds.checkptGen,
}
- // Make a copy of the genvec.
dsCopy.genvec = copyGenVec(ds.genvec)
return dsCopy, nil
}
-// getDbGenInfo returns a copy of the current generation information of the Database.
-func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+// copyDbGenInfo returns a copy of the current generation information of the Database.
+func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -206,15 +205,14 @@
return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- // Make a copy of the genvec.
genvec := copyGenVec(ds.genvec)
// Add local generation information to the genvec.
for _, gv := range genvec {
- gv[s.id] = ds.ckPtGen
+ gv[s.id] = ds.checkptGen
}
- return genvec, ds.ckPtGen, nil
+ return genvec, ds.checkptGen, nil
}
// putDbGenInfoRemote puts the current remote generation information of the Database.
@@ -228,7 +226,6 @@
return verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- // Make a copy of the genvec.
ds.genvec = copyGenVec(genvec)
return nil
@@ -268,7 +265,7 @@
// dbSyncStateKey returns the key used to access the sync state of a Database.
func dbSyncStateKey() string {
- return util.JoinKeyParts(util.SyncPrefix, "dbss")
+ return util.JoinKeyParts(util.SyncPrefix, dbssPrefix)
}
// putDbSyncState persists the sync state object for a given Database.
@@ -291,12 +288,12 @@
// logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
func logRecsPerDeviceScanPrefix(id uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id))
+ return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%d", id))
}
// logRecKey returns the key used to access a specific log record.
func logRecKey(id, gen uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
+ return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
}
// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
@@ -306,11 +303,14 @@
if len(parts) != 4 {
return 0, 0, verr
}
+ if parts[0] != util.SyncPrefix || parts[1] != logPrefix {
+ return 0, 0, verr
+ }
id, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
return 0, 0, verr
}
- gen, err := strconv.ParseUint(parts[3], 10, 64)
+ gen, err := strconv.ParseUint(parts[3], 16, 64)
if err != nil {
return 0, 0, verr
}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 437a373..3929a05 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -112,6 +112,32 @@
checkLogRec(t, st, id, gen, false, nil)
}
+func TestLogRecKeyUtils(t *testing.T) {
+ invalid := []string{"$sync:aa:bb", "log:aa:bb", "$sync:log:aa:xx", "$sync:log:x:bb"}
+
+ for _, k := range invalid {
+ if _, _, err := splitLogRecKey(nil, k); err == nil {
+ t.Fatalf("splitting log rec key didn't fail %q", k)
+ }
+ }
+
+ valid := []struct {
+ id uint64
+ gen uint64
+ }{
+ {10, 20},
+ {190, 540},
+ {9999, 999999},
+ }
+
+ for _, v := range valid {
+ gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.id, v.gen))
+ if gotId != v.id || gotGen != v.gen || err != nil {
+ t.Fatalf("failed key conversion id got %v want %v, gen got %v want %v, err %v", gotId, v.id, gotGen, v.gen, err)
+ }
+ }
+}
+
//////////////////////////////
// Helpers
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 31b1a3d..5be4af6 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,7 +15,6 @@
import (
"fmt"
- "strings"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -285,17 +284,17 @@
// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
func sgDataKeyScanPrefix() string {
- return util.JoinKeyParts(util.SyncPrefix, "sg", "d")
+ return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
}
// sgDataKey returns the key used to access the SyncGroup data entry.
func sgDataKey(gid interfaces.GroupId) string {
- return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
+ return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
}
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
- return util.JoinKeyParts(util.SyncPrefix, "sg", "n", name)
+ return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
}
// hasSGDataEntry returns true if the SyncGroup data entry exists.
@@ -552,15 +551,6 @@
return err
}
-// TODO(hpucha): Should this be generalized?
-func splitPrefix(name string) (string, string) {
- parts := strings.SplitN(name, "/", 2)
- if len(parts) == 2 {
- return parts[0], parts[1]
- }
- return parts[0], ""
-}
-
// bootstrapSyncGroup inserts into the transaction log a SyncGroup operation and
// a set of Snapshot operations to notify the sync watcher about the SyncGroup
// prefixes to start accepting and the initial state of existing store keys that
@@ -630,7 +620,7 @@
for _, mt := range spec.MountTables {
name := naming.Join(mt, ss.name)
// TODO(hpucha): Is this add idempotent? Appears to be from code.
- // Will it handle absolute names. Appears to be.
+ // Confirm that it is ok to use absolute names here.
if err := ss.server.AddName(name); err != nil {
return err
}
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 396a6bd..6c87ee7 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -43,7 +43,9 @@
// TestAddSyncGroup tests adding SyncGroups.
func TestAddSyncGroup(t *testing.T) {
- // Set a large value to prevent the threads from firing.
+ // Set a large value to prevent the initiator from running. Since this
+ // test adds a fake SyncGroup, if the initiator runs, it will attempt
+ // to initiate using this fake and partial SyncGroup data.
peerSyncInterval = 1 * time.Hour
svc := createService(t)
defer destroyService(t, svc)
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 226f739..f44468e 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -8,6 +8,15 @@
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
)
+// Key prefixes for sync data structures. All these prefixes are prepended with
+// util.SyncPrefix.
+const (
+ logPrefix = "log"
+ dbssPrefix = "dbss"
+ dagPrefix = "dag"
+ sgPrefix = "sg"
+)
+
// syncData represents the persistent state of the sync module.
type syncData struct {
Id uint64
@@ -15,9 +24,9 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- CkPtGen uint64 // local generation number advertised to remote peers (used by the responder).
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Gen uint64 // local generation number incremented on every local update.
+ CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
}
// localLogRec represents the persistent local state of a log record. Metadata
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index ae51caa..e9b1d7d 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -27,9 +27,9 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- CkPtGen uint64 // local generation number advertised to remote peers (used by the responder).
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Gen uint64 // local generation number incremented on every local update.
+ CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
}
func (dbSyncState) __VDLReflect(struct {
@@ -54,3 +54,11 @@
vdl.Register((*dbSyncState)(nil))
vdl.Register((*localLogRec)(nil))
}
+
+const logPrefix = "log"
+
+const dbssPrefix = "dbss"
+
+const dagPrefix = "dag"
+
+const sgPrefix = "sg"