syncbase/vsync: add the sync DAG layer.
Add the sync DAG layer to store the history of object mutations,
use it to detect conflicts, and record conflict resolutions.
Change-Id: I52967a0b2626bf2fe5324081c64ff14059d566fa
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
new file mode 100644
index 0000000..a84d916
--- /dev/null
+++ b/services/syncbase/vsync/dag.go
@@ -0,0 +1,862 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Syncbase DAG (directed acyclic graph) utility functions.
+//
+// The DAG is used to track the version history of synced objects in order
+// to detect and resolve conflicts (concurrent changes on different devices).
+//
+// Note: the sync code uses the words "object" and "object ID" (oid) as a
+// generic way to refer to syncable entities, whether they are actual user data
+// (table row and its row key), prefix-ACLs (permission entry and its prefix),
+// or other metadata such as SyncGroups (SyncGroup value and its internal key
+// based on the SyncGroup ID).
+//
+// * Object IDs are globally unique across all devices.
+// * Syncable objects have version numbers associated with their mutations.
+// * For a given object ID, the version number is globally unique across all
+// devices, i.e. the (oid, version) tuple is globally unique.
+// * Each (oid, version) tuple is represented by a node in the DAG.
+// * The previous version of an object is its parent in the DAG, i.e. the
+// new version is derived from that parent.
+// * DAG nodes have child-to-parent pointers.
+// * When there are no conflicts, the parent node has a single child node
+// that points to it.
+// * When a parent node has more than one child, this indicates concurrent
+// mutations which are treated as a conflict to be resolved.
+// * When a conflict is resolved, the new version has pointers back to each of
+// the two parents to indicate that it is derived from both nodes.
+// * During a sync operation from a source device to a target device, the
+// target receives a DAG fragment from the source. That fragment has to
+// be incorporated (grafted) into the target device's DAG. It may be a
+// continuation of the DAG of an object, with the attachment (graft) point
+//    being the current head of the DAG, in which case there are no conflicts.
+// Or the graft point(s) may be older nodes, which means the new fragment
+// is a divergence in the graph causing a conflict that must be resolved
+// in order to re-converge the two DAG fragments.
+//
+// In the diagrams below:
+// (h) represents the head node in the local device.
+// (nh) represents the new head node received from the remote device.
+// (g) represents a graft node, where new nodes attach to the existing DAG.
+// <- represents a derived-from mutation, i.e. a child-to-parent pointer
+//
+// a- No-conflict example: the new nodes (v4, v5) attach to the head node (v3).
+//    In this case the new head (v5) becomes the object's head node, the new
+//    DAG fragment
+// being a continuation of the existing DAG.
+//
+// Before:
+// v1 <- v2 <- v3(h)
+//
+// Sync updates applied, no conflict detected:
+// v1 <- v2 <- v3(h,g) <- v4 <- v5 (nh)
+//
+// After:
+// v1 <- v2 <- v3 <- v4 <- v5 (h)
+//
+// b- Conflict example: the new nodes (v4, v5) attach to an old node (v2).
+// The current head node (v3) and the new head node (v5) are divergent
+// (concurrent) mutations that need to be resolved. The conflict
+// resolution function is passed the old head (v3), new head (v5), and
+// the common ancestor (v2). It resolves the conflict with (v6) which
+// is represented in the DAG as derived from both v3 and v5 (2 parents).
+//
+// Before:
+// v1 <- v2 <- v3(h)
+//
+// Sync updates applied, conflict detected (v3 not a graft node):
+// v1 <- v2(g) <- v3(h)
+//             <- v4 <- v5 (nh)
+//
+// After: conflict resolver creates v6 having 2 parents (v3, v5):
+// v1 <- v2(g) <- v3 <------- v6(h)
+//             <- v4 <- v5 <-
+//
+// The DAG does not grow indefinitely. During a sync operation each device
+// learns what the other device already knows -- where it's at in the version
+// history for the objects. When a device determines that all devices that
+// sync an object (members of matching SyncGroups) have moved past some version
+// for that object, the DAG for that object can be pruned up to that common
+// version, deleting all prior (ancestor) nodes.
+//
+// The DAG contains three tables persisted to disk (nodes, heads, batches):
+//
+// * nodes: one entry per (oid, version) with references to parent node(s)
+// it is derived from, a reference to the log record identifying
+// that mutation, a reference to its write batch (if any), and a
+// boolean to indicate whether this was an object deletion.
+//
+// * heads: one entry per object pointing to its most recent version.
+//
+// * batches: one entry per batch ID containing the set of objects in the
+// write batch and their versions.
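+//
+// As an illustration only (a sketch inferred from the nodeKey, headKey and
+// batchKey helpers below, assuming util.JoinKeyParts uses a ":" separator),
+// the keyspace layout of these tables looks roughly like:
+//
+//   <SyncPrefix>:dag:n:<oid>:<version>  --> dagNode       (nodes table)
+//   <SyncPrefix>:dag:h:<oid>            --> head version  (heads table)
+//   <SyncPrefix>:dag:b:<batchId>        --> batchInfo     (batches table)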
+
+import (
+ "container/list"
+ "fmt"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+const (
+ NoVersion = ""
+ NoBatchId = uint64(0)
+)
+
+// dagNode holds the information on an object mutation in the DAG.
+// Note: the batch ID and deleted flag are copies of information in the log
+// record. They are also stored in the DAG node to improve DAG traversal for
+// conflict resolution and pruning without having to fetch the full log record
+// every time.
+type dagNode struct {
+ Level uint64 // node distance from root
+ Parents []string // references to parent versions
+ Logrec string // reference to log record
+ BatchId uint64 // ID of a write batch
+ Deleted bool // true if the change was a delete
+}
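+
+// For example (illustrative values only, matching conflict diagram (b) in the
+// header comment), the conflict-resolution node v6 could be stored as:
+//
+//	dagNode{
+//		Level:   4,                        // one deeper than its deepest parent (v5 at level 3)
+//		Parents: []string{"v3", "v5"},
+//		Logrec:  "<reference to the v6 log record>",
+//		BatchId: NoBatchId,
+//		Deleted: false,
+//	}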
+
+// batchSet holds information on a set of write batches.
+type batchSet map[uint64]*batchInfo
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+ Objects map[string]string
+ Count uint64
+}
+
+// graftMap holds the state of DAG node grafting (attaching) per object.
+type graftMap map[string]*graftInfo
+
+// graftInfo holds the state of an object's node grafting in the DAG.
+// It is ephemeral (in-memory), used during a single sync operation to track
+// where the new DAG fragments are attached to the existing DAG for the object:
+// - newNodes: the set of newly added nodes; used to detect the type of edges
+// between nodes (new-node to old-node or vice versa).
+// - newHeads: the set of new candidate head nodes; used to detect conflicts.
+// - graftNodes: the set of old nodes on which new nodes were added, and their
+// level in the DAG; used to find common ancestors for conflicts.
+//
+// After the received mutations are applied, if there are two heads in the
+// newHeads set, there is a conflict to be resolved for the object. Otherwise,
+// if there is one head, no conflict was triggered and the new head becomes the
+// current object version. In case of conflict, the graftNodes set is used to
+// select a common ancestor.
+// TODO(rdaoud): support open DAGs to handle delayed conflict resolution by
+// tracking multiple dangling remote heads in addition to the local head node.
+type graftInfo struct {
+ newNodes map[string]bool
+ newHeads map[string]bool
+ graftNodes map[string]uint64
+}
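+
+// For example (illustrative only), after the remote fragment of conflict
+// diagram (b) in the header comment (v2 <- v4 <- v5) is grafted, the object's
+// graftInfo would contain roughly:
+//
+//	newNodes:   {"v4": true, "v5": true}
+//	newHeads:   {"v3": true, "v5": true}
+//	graftNodes: {"v2": 1}
+//
+// The two entries in newHeads signal a conflict, and the deepest graft node
+// (v2, at level 1) is the common ancestor handed to the conflict resolver.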
+
+// newBatchInfo allocates and initializes a batch info entry.
+func newBatchInfo() *batchInfo {
+ return &batchInfo{Objects: make(map[string]string), Count: 0}
+}
+
+// startBatch marks the start of a batch. If an ID is not given, it generates
+// a new batch ID and returns it to the caller. The batch ID is used to track
+// DAG nodes that are part of the same batch and is stored in the log records.
+// If a batch ID is given by the caller, its in-memory entry is refreshed from
+// the store if needed.
+func (s *syncService) startBatch(ctx *context.T, st store.StoreReader, btid uint64) uint64 {
+ s.batchesLock.Lock()
+ defer s.batchesLock.Unlock()
+
+ // If no batch ID is given, generate a new unused one.
+ if btid == NoBatchId {
+ for (btid == NoBatchId) || (s.batches[btid] != nil) {
+ btid = rand64()
+ }
+
+ s.batches[btid] = newBatchInfo()
+ return btid
+ }
+
+ // Use the given batch ID and, if needed, refetch its in-memory entry
+ // from the store. It is OK not to find it in the store; it means sync
+ // is learning about this batch ID the first time from another sync.
+ if s.batches[btid] == nil {
+ info, err := getBatch(ctx, st, btid)
+ if err != nil {
+ info = newBatchInfo()
+ }
+ s.batches[btid] = info
+ }
+
+ return btid
+}
+
+// addNodeToBatch adds a node (oid, version) to a batch under construction.
+func (s *syncService) addNodeToBatch(ctx *context.T, btid uint64, oid, version string) error {
+ s.batchesLock.Lock()
+ defer s.batchesLock.Unlock()
+
+ if btid == NoBatchId {
+ return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+ }
+
+ info := s.batches[btid]
+ if info == nil {
+ return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid)
+ }
+
+ info.Objects[oid] = version
+ return nil
+}
+
+// endBatch marks the end of a given batch. The batch information is persisted
+// to the store and removed from the temporary in-memory entry.
+func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error {
+ _ = tx.(store.Transaction)
+
+ s.batchesLock.Lock()
+ defer s.batchesLock.Unlock()
+
+ if btid == NoBatchId || count == 0 {
+ return verror.New(verror.ErrInternal, ctx, "invalid batch info", btid, count)
+ }
+
+ info := s.batches[btid]
+ if info == nil {
+ return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid)
+ }
+
+ // The first time a batch is ended, info.Count is zero. Subsequently,
+ // if this batch ID is started and ended again, info.Count should be
+ // the same as the "count" value given.
+ if info.Count != 0 && info.Count != count {
+ return verror.New(verror.ErrInternal, ctx, "wrong counts for batch", btid, info.Count, count)
+ }
+
+ // Only save non-empty batches.
+ if len(info.Objects) > 0 {
+ info.Count = count
+ if err := setBatch(ctx, tx, btid, info); err != nil {
+ return err
+ }
+ }
+
+ delete(s.batches, btid)
+ return nil
+}
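+
+// An illustrative sketch (not called by the sync code) of how the batch calls
+// above fit together when applying a two-object write batch; the oid, version
+// and logrec values are hypothetical and error handling is omitted:
+//
+//	btid := s.startBatch(ctx, tx, NoBatchId)
+//	err1 := s.addNode(ctx, tx, "fooKey", "v1", "logrec-1", false, nil, btid, nil)
+//	err2 := s.addNode(ctx, tx, "barKey", "v1", "logrec-2", false, nil, btid, nil)
+//	err3 := s.endBatch(ctx, tx, btid, 2)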
+
+// addNode adds a new node for a DAG object, linking it to its parent nodes.
+// It verifies that the node does not exist and its parent nodes are valid.
+// If a batch ID is given, track the node membership in the batch.
+//
+// Note: an in-memory grafting structure is passed to track DAG attachments
+// during a sync operation. This is needed when nodes are being added due to
+// remote changes fetched by the sync protocol. The Initiator allocates a
+// grafting structure at the start of a sync operation and passes it across
+// calls to addNode() to update the DAG grafting state:
+// - If a parent node is not new, mark it as a DAG graft point.
+// - Mark this version as a new node.
+// - Update the new head node pointer of the grafted DAG.
+//
+// The grafting structure is not needed when nodes are being added locally by
+// the Watcher, passing a nil grafting structure.
+func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
+ _ = tx.(store.Transaction)
+
+ if parents != nil {
+ if len(parents) > 2 {
+ return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
+ }
+ if len(parents) == 0 {
+ parents = nil // replace an empty array with a nil
+ }
+ }
+
+ // The new node must not exist.
+ if hasNode(ctx, tx, oid, version) {
+ return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
+ }
+
+ // A new root node (no parents) is allowed only for new objects.
+ // TODO(rdaoud): remove this limitation; we can now have key collisions.
+ if parents == nil {
+ if _, err := getHead(ctx, tx, oid); err == nil {
+ return verror.New(verror.ErrInternal, ctx,
+ "cannot add another root node for this object", oid, version)
+ }
+ }
+
+ // Verify the parents and determine the node level. Also save the
+ // levels of the parent nodes for later use in the graft updates below.
+ parentLevels := make(map[string]uint64)
+ var level uint64
+ for _, parent := range parents {
+ pnode, err := getNode(ctx, tx, oid, parent)
+ if err != nil {
+ return err
+ }
+ parentLevels[parent] = pnode.Level
+ if level <= pnode.Level {
+ level = pnode.Level + 1
+ }
+ }
+
+ // If a batch ID is given, add the node to that batch.
+ if btid != NoBatchId {
+ if err := s.addNodeToBatch(ctx, btid, oid, version); err != nil {
+ return err
+ }
+ }
+
+ // Add the node entry to the DAG.
+ node := &dagNode{
+ Level: level,
+ Parents: parents,
+ Logrec: logrec,
+ BatchId: btid,
+ Deleted: deleted,
+ }
+ if err := setNode(ctx, tx, oid, version, node); err != nil {
+ return err
+ }
+
+ // We are done if grafting is not being tracked (a local node add).
+ if graft == nil {
+ return nil
+ }
+
+ // Get the object's graft info entry in order to update it. This
+ // happens when addNode() is called by the sync Initiator while the DAG
+ // is being updated with new nodes fetched from other devices.
+ //
+ // During a sync operation, each mutated object gets new nodes added in
+ // its DAG. These new nodes are either derived from nodes that were
+ // previously known on this device (i.e. their parent nodes are pre-
+ // existing), or they are derived from other new DAG nodes being
+ // discovered during this sync (i.e. their parent nodes were also just
+ // added to the DAG).
+ //
+ // To detect a conflict and find the most recent common ancestor to
+ // pass to the conflict resolver, the DAG graft info keeps track of the
+ // new nodes that have old parent nodes. These old-to-new edges are
+ // points where new DAG fragments are attached (grafted) onto the
+ // existing DAG. The old nodes are the graft nodes forming the set of
+ // common ancestors to use in conflict resolution:
+ //
+ // 1- A conflict happens when the current "head node" for an object is
+ // not in the set of graft nodes. It means the object mutations
+ // were not derived from what the device knows, but are divergent
+ // changes at a prior point.
+ //
+ // 2- The most recent common ancestor to use in resolving the conflict
+ // is the object graft node with the deepest level (furthest from
+ // the root node), representing the most up-to-date common knowledge
+ // between the devices.
+ info := getObjectGraftInfo(ctx, tx, graft, oid)
+
+ for _, parent := range parents {
+ // If this parent is an old node, it's a graft point.
+ if !info.newNodes[parent] {
+ info.graftNodes[parent] = parentLevels[parent]
+ }
+
+ // A parent cannot be a candidate for a new head.
+ delete(info.newHeads, parent)
+ }
+
+ // This new node is a candidate for new head version.
+ info.newNodes[version] = true
+ info.newHeads[version] = true
+ return nil
+}
+
+// addParent adds to the DAG node (oid, version) a linkage to the given parent node.
+//
+// Note: as with the addNode() call, an in-memory grafting structure is passed
+// to track DAG attachments during a sync operation. It is not needed if the
+// parent linkage is due to a local change (from conflict resolution selecting
+// an existing version).
+func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error {
+ _ = tx.(store.Transaction)
+
+ if version == parent {
+ return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
+ }
+
+ node, err := getNode(ctx, tx, oid, version)
+ if err != nil {
+ return err
+ }
+ pnode, err := getNode(ctx, tx, oid, parent)
+ if err != nil {
+ return err
+ }
+
+ // Check if the parent is already linked to this node.
+ found := false
+ for _, p := range node.Parents {
+ if p == parent {
+ found = true
+ break
+ }
+ }
+
+ // Add the parent if it is not yet linked.
+ if !found {
+ // Make sure that adding the link does not create a DAG cycle.
+ // Verify that the node is not an ancestor of the parent that
+ // it is being linked to.
+ err = forEachAncestor(ctx, tx, oid, pnode.Parents, func(v string, nd *dagNode) error {
+ if v == version {
+ return verror.New(verror.ErrInternal, ctx, "cycle on object",
+ oid, ": node", version, "is ancestor of parent", parent)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ node.Parents = append(node.Parents, parent)
+ if err = setNode(ctx, tx, oid, version, node); err != nil {
+ return err
+ }
+ }
+
+ // If no grafting structure is given (i.e. local changes), we are done.
+ if graft == nil {
+ return nil
+ }
+
+ // Update graft: if the node and its parent are new/old or old/new then
+ // add the parent as a graft point (a potential common ancestor).
+ info := getObjectGraftInfo(ctx, tx, graft, oid)
+
+ _, nodeNew := info.newNodes[version]
+ _, parentNew := info.newNodes[parent]
+ if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+ info.graftNodes[parent] = pnode.Level
+ }
+
+ // The parent node can no longer be a candidate for a new head version.
+ delete(info.newHeads, parent)
+ return nil
+}
+
+// moveHead moves the object head node in the DAG.
+func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error {
+ _ = tx.(store.Transaction)
+
+ // Verify that the node exists.
+ if !hasNode(ctx, tx, oid, head) {
+ return verror.New(verror.ErrInternal, ctx, "node", oid, head, "does not exist")
+ }
+
+ return setHead(ctx, tx, oid, head)
+}
+
+// hasConflict determines if an object has a conflict between its new and old
+// head nodes.
+// - Yes: return (true, newHead, oldHead, ancestor)
+// - No: return (false, newHead, oldHead, NoVersion)
+// A conflict exists when there are two new-head nodes in the graft structure.
+// It means the newly added object versions were not derived from this
+// device's current head and diverged at an earlier point. If there is a
+// single new-head, the object
+// changes were applied without triggering a conflict.
+func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
+ isConflict = false
+ oldHead = NoVersion
+ newHead = NoVersion
+ ancestor = NoVersion
+ err = nil
+
+ info := graft[oid]
+ if info == nil {
+ err = verror.New(verror.ErrInternal, ctx, "node", oid, "has no DAG graft info")
+ return
+ }
+
+ numHeads := len(info.newHeads)
+ if numHeads < 1 || numHeads > 2 {
+ err = verror.New(verror.ErrInternal, ctx, "node", oid, "invalid count of new heads", numHeads)
+ return
+ }
+
+ // Fetch the current head for this object if it exists. The error from
+ // getHead() is ignored because a newly received object is not yet known
+ // on this device and will not trigger a conflict.
+ oldHead, _ = getHead(ctx, st, oid)
+
+ // If there is only one new head node there is no conflict. The new
+ // head is that single one, even if it might also be the same old node.
+ if numHeads == 1 {
+ for head := range info.newHeads {
+ newHead = head
+ }
+ return
+ }
+
+ // With two candidate head nodes, the new head is the non-old one.
+ for head := range info.newHeads {
+ if head != oldHead {
+ newHead = head
+ break
+ }
+ }
+
+ // There is a conflict: the best choice of ancestor is the graft node with
+ // the largest level (farthest from the root). It is possible in some
+ // corner cases to have multiple graft nodes at the same level. This
+ // would still be a single conflict, with the multiple same-level graft
+ // nodes representing equivalent conflict resolutions made on different
+ // devices that are now merging their resolutions. In such a case it
+ // does not matter which node is chosen as the ancestor because the
+ // conflict resolver function is assumed to be convergent. However it
+ // is nicer to make that selection deterministic so all devices see the
+ // same choice: the version number is used as a tie-breaker.
+ isConflict = true
+ var maxLevel uint64
+ for node, level := range info.graftNodes {
+ if maxLevel < level || (maxLevel == level && ancestor < node) {
+ maxLevel = level
+ ancestor = node
+ }
+ }
+ return
+}
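+
+// An illustrative sketch (not the actual Initiator code) of how the grafting
+// and conflict-detection calls are combined when remote updates are applied;
+// resolve() stands in for whatever conflict resolution policy is used:
+//
+//	graft := newGraft()
+//	// For each received mutation: add it to the DAG, tracking grafts.
+//	s.addNode(ctx, tx, oid, version, logrec, deleted, parents, btid, graft)
+//	// Once all mutations are applied, check each object for conflicts.
+//	isConflict, newHead, oldHead, ancestor, err := hasConflict(ctx, tx, oid, graft)
+//	if isConflict {
+//		newHead = resolve(oid, oldHead, newHead, ancestor) // hypothetical resolver
+//	}
+//	moveHead(ctx, tx, oid, newHead)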
+
+// newGraft allocates a graftMap to track DAG node grafting during sync.
+func newGraft() graftMap {
+ return make(graftMap)
+}
+
+// getObjectGraftInfo returns the graftInfo for an object ID. If the graftMap is
+// nil, a nil graftInfo is returned because grafting is not being tracked.
+func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo {
+ if graft == nil {
+ return nil
+ }
+ if info := graft[oid]; info != nil {
+ return info
+ }
+
+ info := &graftInfo{
+ newNodes: make(map[string]bool),
+ newHeads: make(map[string]bool),
+ graftNodes: make(map[string]uint64),
+ }
+
+ // If the object has a head node, include it in the set of new heads.
+ if head, err := getHead(ctx, st, oid); err == nil {
+ info.newHeads[head] = true
+ }
+
+ graft[oid] = info
+ return info
+}
+
+// forEachAncestor loops over the DAG ancestor nodes of an object in a breadth-
+// first traversal starting from given version nodes. It calls the given
+// callback function once for each ancestor node.
+func forEachAncestor(ctx *context.T, st store.StoreReader, oid string, startVersions []string, callback func(version string, node *dagNode) error) error {
+ visited := make(map[string]bool)
+ queue := list.New()
+ for _, version := range startVersions {
+ queue.PushBack(version)
+ visited[version] = true
+ }
+
+ for queue.Len() > 0 {
+ version := queue.Remove(queue.Front()).(string)
+ node, err := getNode(ctx, st, oid, version)
+ if err != nil {
+ // Ignore the error: this ancestor node was previously pruned.
+ continue
+ }
+ for _, parent := range node.Parents {
+ if !visited[parent] {
+ queue.PushBack(parent)
+ visited[parent] = true
+ }
+ }
+ if err = callback(version, node); err != nil {
+ return err
+ }
+ }
+ return nil
+}
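+
+// For example (illustrative only), gathering the current head of an object
+// together with all of its ancestors could be written as:
+//
+//	var ancestors []string
+//	if head, err := getHead(ctx, st, oid); err == nil {
+//		forEachAncestor(ctx, st, oid, []string{head}, func(v string, nd *dagNode) error {
+//			ancestors = append(ancestors, v)
+//			return nil
+//		})
+//	}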
+
+// newBatchPruning allocates an in-memory structure to track batches affected
+// by a DAG pruning operation across objects.
+func newBatchPruning() batchSet {
+ return make(batchSet)
+}
+
+// prune trims the DAG of an object at a given version (node) by deleting all
+// its ancestor nodes, making it the new root node. For each deleted node it
+// calls the given callback function to delete its log record.
+//
+// Note: this function should only be used when sync determines that all devices
+// that know about this object have gotten past this version.
+//
+// The batch set passed is used to track batches affected by the deletion of DAG
+// objects across multiple calls to prune(). It is later given to pruneDone()
+// to do GC on these batches.
+func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error {
+ _ = tx.(store.Transaction)
+
+ if batches == nil {
+ return verror.New(verror.ErrInternal, ctx, "missing batch set")
+ }
+
+ // Get the node at the pruning point and set its parents to nil.
+ // It will become the oldest DAG node (root) for the object.
+ node, err := getNode(ctx, tx, oid, version)
+ if err != nil {
+ return err
+ }
+ if node.Parents == nil {
+ // Nothing to do, this node is already the root.
+ return nil
+ }
+
+ parents := node.Parents
+ node.Parents = nil
+ if err = setNode(ctx, tx, oid, version, node); err != nil {
+ return err
+ }
+
+ // Delete all ancestor nodes and their log records. Delete as many as
+ // possible and track the error counts. Update the batch set to track
+ // their pruning.
+ nodeErrs, logErrs := 0, 0
+ forEachAncestor(ctx, tx, oid, parents, func(v string, nd *dagNode) error {
+ if btid := nd.BatchId; btid != NoBatchId {
+ if batches[btid] == nil {
+ batches[btid] = newBatchInfo()
+ }
+ batches[btid].Objects[oid] = v
+ }
+
+ if err := delLogRec(ctx, tx, nd.Logrec); err != nil {
+ logErrs++
+ }
+ if err := delNode(ctx, tx, oid, v); err != nil {
+ nodeErrs++
+ }
+ return nil
+ })
+ if nodeErrs != 0 || logErrs != 0 {
+ return verror.New(verror.ErrInternal, ctx,
+ "prune failed to delete nodes and logs:", nodeErrs, logErrs)
+ }
+ return nil
+}
+
+// pruneDone is called when object pruning is finished within a single pass of
+// the sync garbage collector. It updates the batch sets affected by objects
+// deleted by prune().
+func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error {
+ _ = tx.(store.Transaction)
+
+ // Update batch sets by removing the pruned objects from them.
+ for btid, pruneInfo := range batches {
+ info, err := getBatch(ctx, tx, btid)
+ if err != nil {
+ return err
+ }
+
+ for oid := range pruneInfo.Objects {
+ delete(info.Objects, oid)
+ }
+
+ if len(info.Objects) > 0 {
+ err = setBatch(ctx, tx, btid, info)
+ } else {
+ err = delBatch(ctx, tx, btid)
+ }
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
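+
+// An illustrative sketch (not the actual garbage collector) of one GC pass
+// over objects that every device has moved past; the pruning points and the
+// delLogRec callback are hypothetical:
+//
+//	batches := newBatchPruning()
+//	for _, p := range pruningPoints { // p has hypothetical oid/version fields
+//		prune(ctx, tx, p.oid, p.version, batches, delLogRec)
+//	}
+//	pruneDone(ctx, tx, batches)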
+
+// getLogrec returns the log record information for a given object version.
+func getLogrec(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
+ node, err := getNode(ctx, st, oid, version)
+ if err != nil {
+ return "", err
+ }
+ return node.Logrec, nil
+}
+
+// Low-level utility functions to access DB entries without tracking their
+// relationships. Use the functions above to manipulate the DAG.
+
+// nodeKey returns the key used to access a DAG node (oid, version).
+func nodeKey(oid, version string) string {
+ return util.JoinKeyParts(util.SyncPrefix, "dag", "n", oid, version)
+}
+
+// setNode stores the DAG node entry.
+func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error {
+ _ = tx.(store.Transaction)
+
+ if version == NoVersion {
+ return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ }
+
+ if err := util.PutObject(tx, nodeKey(oid, version), node); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getNode retrieves the DAG node entry for the given (oid, version).
+func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) {
+ if version == NoVersion {
+ return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ }
+
+ var node dagNode
+ if err := util.GetObject(st, nodeKey(oid, version), &node); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return &node, nil
+}
+
+// delNode deletes the DAG node entry.
+func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
+ _ = tx.(store.Transaction)
+
+ if version == NoVersion {
+ return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ }
+
+ if err := tx.Delete([]byte(nodeKey(oid, version))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// hasNode returns true if the node (oid, version) exists in the DAG.
+func hasNode(ctx *context.T, st store.StoreReader, oid, version string) bool {
+ // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
+ if _, err := getNode(ctx, st, oid, version); err != nil {
+ return false
+ }
+ return true
+}
+
+// headKey returns the key used to access the DAG object head.
+func headKey(oid string) string {
+ return util.JoinKeyParts(util.SyncPrefix, "dag", "h", oid)
+}
+
+// setHead stores version as the DAG object head.
+func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
+ _ = tx.(store.Transaction)
+
+ if version == NoVersion {
+ return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
+ }
+
+ if err := util.PutObject(tx, headKey(oid), version); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getHead retrieves the DAG object head.
+func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) {
+ var version string
+ if err := util.GetObject(st, headKey(oid), &version); err != nil {
+ return NoVersion, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return version, nil
+}
+
+// delHead deletes the DAG object head.
+func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
+ _ = tx.(store.Transaction)
+
+ if err := tx.Delete([]byte(headKey(oid))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// batchKey returns the key used to access the DAG batch info.
+func batchKey(btid uint64) string {
+ return util.JoinKeyParts(util.SyncPrefix, "dag", "b", fmt.Sprintf("%d", btid))
+}
+
+// setBatch stores the DAG batch entry.
+func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error {
+ _ = tx.(store.Transaction)
+
+ if btid == NoBatchId {
+ return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+ }
+
+ if err := util.PutObject(tx, batchKey(btid), info); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getBatch retrieves the DAG batch entry.
+func getBatch(ctx *context.T, st store.StoreReader, btid uint64) (*batchInfo, error) {
+ if btid == NoBatchId {
+ return nil, verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+ }
+
+ var info batchInfo
+ if err := util.GetObject(st, batchKey(btid), &info); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return &info, nil
+}
+
+// delBatch deletes the DAG batch entry.
+func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error {
+ _ = tx.(store.Transaction)
+
+ if btid == NoBatchId {
+ return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+ }
+
+ if err := tx.Delete([]byte(batchKey(btid))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getParentMap is a testing and debug helper function that returns for an
+// object a map of its DAG (node-to-parents relations). If a graft structure
+// is given, include its fragments in the map.
+func getParentMap(ctx *context.T, st store.StoreReader, oid string, graft graftMap) map[string][]string {
+ parentMap := make(map[string][]string)
+ var start []string
+
+ if head, err := getHead(ctx, st, oid); err == nil {
+ start = append(start, head)
+ }
+ if graft != nil && graft[oid] != nil {
+ for v := range graft[oid].newHeads {
+ start = append(start, v)
+ }
+ }
+
+ forEachAncestor(ctx, st, oid, start, func(v string, nd *dagNode) error {
+ parentMap[v] = nd.Parents
+ return nil
+ })
+ return parentMap
+}
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
new file mode 100644
index 0000000..1b899df
--- /dev/null
+++ b/services/syncbase/vsync/dag_test.go
@@ -0,0 +1,809 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Tests for the Syncbase DAG.
+
+import (
+ "errors"
+ "fmt"
+ "reflect"
+ "strconv"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+
+ "v.io/v23/context"
+)
+
+// TestSetNode tests setting and getting a DAG node.
+func TestSetNode(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+
+ oid, version := "1111", "1"
+
+ node, err := getNode(nil, st, oid, version)
+ if err == nil || node != nil {
+ t.Errorf("found non-existent object %s:%s: %v", oid, version, node)
+ }
+
+ if hasNode(nil, st, oid, version) {
+ t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+ t.Errorf("non-existent object %s:%s has a logrec: %v", oid, version, logrec)
+ }
+
+ node = &dagNode{Level: 15, Parents: []string{"444", "555"}, Logrec: "logrec-23"}
+
+ tx := st.NewTransaction()
+ if err = setNode(nil, tx, oid, version, node); err != nil {
+ t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+ }
+ tx.Commit()
+
+ node2, err := getNode(nil, st, oid, version)
+ if err != nil || node2 == nil {
+ t.Errorf("cannot find stored object %s:%s: %v", oid, version, err)
+ }
+
+ if !hasNode(nil, st, oid, version) {
+ t.Errorf("hasNode() did not find object %s:%s", oid, version)
+ }
+
+ if !reflect.DeepEqual(node, node2) {
+ t.Errorf("object %s:%s has wrong data: %v instead of %v", oid, version, node2, node)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, version); err != nil || logrec != "logrec-23" {
+ t.Errorf("object %s:%s has wrong logrec: %s", oid, version, logrec)
+ }
+}
+
+// TestDelNode tests deleting a DAG node.
+func TestDelNode(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+
+ oid, version := "2222", "2"
+
+ node := &dagNode{Level: 123, Parents: []string{"333"}, Logrec: "logrec-789"}
+
+ tx := st.NewTransaction()
+ if err := setNode(nil, tx, oid, version, node); err != nil {
+ t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+ }
+ tx.Commit()
+
+ tx = st.NewTransaction()
+ if err := delNode(nil, tx, oid, version); err != nil {
+ t.Fatalf("cannot delete object %s:%s: %v", oid, version, err)
+ }
+ tx.Commit()
+
+ node2, err := getNode(nil, st, oid, version)
+ if err == nil || node2 != nil {
+ t.Errorf("found deleted object %s:%s (%v)", oid, version, node2)
+ }
+
+ if hasNode(nil, st, oid, version) {
+ t.Errorf("hasNode() found deleted object %s:%s", oid, version)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+ t.Errorf("deleted object %s:%s has logrec: %s", oid, version, logrec)
+ }
+}
+
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid, version := "1234", "7"
+
+ tx := st.NewTransaction()
+ if err := s.addParent(nil, tx, oid, version, "haha", nil); err == nil {
+ t.Errorf("addParent() did not fail for an unknown object %s:%s", oid, version)
+ }
+ tx.Abort()
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ node := &dagNode{Level: 15, Logrec: "logrec-22"}
+
+ tx = st.NewTransaction()
+ if err := setNode(nil, tx, oid, version, node); err != nil {
+ t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+ }
+ tx.Commit()
+
+ graft := newGraft()
+ tx = st.NewTransaction()
+ if err := s.addParent(nil, tx, oid, version, version, graft); err == nil {
+ t.Errorf("addParent() did not fail on a self-parent for object %s:%s", oid, version)
+ }
+ tx.Abort()
+
+ remote := true
+ expParents := []string{"4", "5", "6"}
+
+ for _, parent := range expParents {
+ tx = st.NewTransaction()
+ if err := s.addParent(nil, tx, oid, version, parent, graft); err == nil {
+ t.Errorf("addParent() did not reject invalid parent %s for object %s:%s",
+ parent, oid, version)
+ }
+ tx.Abort()
+
+ pnode := &dagNode{Level: 11, Logrec: fmt.Sprintf("logrec-%s", parent), Parents: []string{"3"}}
+
+ tx = st.NewTransaction()
+ if err := setNode(nil, tx, oid, parent, pnode); err != nil {
+ t.Fatalf("cannot set parent object %s:%s (%v): %v", oid, parent, pnode, err)
+ }
+ tx.Commit()
+
+ var g graftMap
+ if remote {
+ g = graft
+ }
+
+ // Call addParent() twice to verify it is idempotent.
+ for i := 0; i < 2; i++ {
+ tx = st.NewTransaction()
+ if err := s.addParent(nil, tx, oid, version, parent, g); err != nil {
+ t.Errorf("addParent() failed on parent %s, remote %t (i=%d) for %s:%s: %v",
+ parent, remote, i, oid, version, err)
+ }
+ tx.Commit()
+ }
+
+ remote = !remote
+ }
+
+ node2, err := getNode(nil, st, oid, version)
+ if err != nil || node2 == nil {
+ t.Errorf("cannot find object %s:%s: %v", oid, version, err)
+ }
+
+ if !reflect.DeepEqual(node2.Parents, expParents) {
+ t.Errorf("invalid parents for object %s:%s: %v instead of %v",
+ oid, version, node2.Parents, expParents)
+ }
+
+ // Creating cycles should fail.
+ for v := 1; v < 7; v++ {
+ ver := fmt.Sprintf("%d", v)
+ tx = st.NewTransaction()
+ if err = s.addParent(nil, tx, oid, ver, version, nil); err == nil {
+ t.Errorf("addParent() failed to reject a cycle for %s: from ancestor %s to node %s",
+ oid, ver, version)
+ }
+ tx.Abort()
+ }
+}
+
+// TestSetHead tests setting and getting a DAG head node.
+func TestSetHead(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+
+ oid := "3333"
+
+ version, err := getHead(nil, st, oid)
+ if err == nil {
+ t.Errorf("found non-existent object head %s:%s", oid, version)
+ }
+
+ for i := 0; i < 2; i++ {
+ version = fmt.Sprintf("v%d", 555+i)
+
+ tx := st.NewTransaction()
+ if err = setHead(nil, tx, oid, version); err != nil {
+ t.Fatalf("cannot set object head %s:%s (i=%d)", oid, version, i)
+ }
+ tx.Commit()
+
+ version2, err := getHead(nil, st, oid)
+ if err != nil {
+ t.Errorf("cannot find stored object head %s (i=%d)", oid, i)
+ }
+ if version != version2 {
+ t.Errorf("object %s has wrong head data (i=%d): %s instead of %s",
+ oid, i, version2, version)
+ }
+ }
+}
+
+// TestLocalUpdates tests the sync handling of initial local updates: an object
+// is created (v1) and updated twice (v2, v3) on this device. The DAG should
+// show: v1 -> v2 -> v3 and the head should point to v3.
+func TestLocalUpdates(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must have moved to v3 and the parent map shows the updated DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("invalid object %s head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, nil)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Make sure an existing node cannot be added again.
+ tx := st.NewTransaction()
+ if err := s.addNode(nil, tx, oid, "2", "foo", false, []string{"1", "3"}, NoBatchId, nil); err == nil {
+ t.Errorf("addNode() did not fail when given an existing node")
+ }
+
+ // Make sure a new node cannot have more than 2 parents.
+ if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
+ t.Errorf("addNode() did not fail when given 3 parents")
+ }
+
+ // Make sure a new node cannot have an invalid parent.
+ if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "555"}, NoBatchId, nil); err == nil {
+ t.Errorf("addNode() did not fail when using an invalid parent")
+ }
+
+ // Make sure a new root node (no parents) cannot be added once a root exists.
+ // For the parents array, check both the "nil" and the empty array as input.
+ if err := s.addNode(nil, tx, oid, "6789", "foo", false, nil, NoBatchId, nil); err == nil {
+ t.Errorf("adding a 2nd root node (nil parents) for object %s did not fail", oid)
+ }
+ if err := s.addNode(nil, tx, oid, "6789", "foo", false, []string{}, NoBatchId, nil); err == nil {
+ t.Errorf("adding a 2nd root node (empty parents) for object %s did not fail", oid)
+ }
+ tx.Abort()
+}
+
+// TestRemoteUpdates tests the sync handling of initial remote updates:
+// an object is created (v1) and updated twice (v2, v3) on another device and
+// we learn about it during sync. The updated DAG should show: v1 -> v2 -> v3
+// and report no conflicts with the new head pointing at v3.
+func TestRemoteUpdates(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ graft, err := s.dagReplayCommands(nil, "remote-init-00.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still undefined) and the parent
+ // map shows the newly grafted DAG fragment.
+ if head, err := getHead(nil, st, oid); err == nil {
+ t.Errorf("object %s head found: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true}
+
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "3" && oldHead == NoVersion && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
+ }
+
+ // Make sure an unknown node cannot become the new head.
+ tx := st.NewTransaction()
+ if err := moveHead(nil, tx, oid, "55"); err == nil {
+ t.Errorf("moveHead() did not fail on an invalid node")
+ }
+ tx.Abort()
+
+ // Then move the head.
+ tx = st.NewTransaction()
+ if err := moveHead(nil, tx, oid, newHead); err != nil {
+ t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
+ }
+ tx.Commit()
+}
+
+// TestRemoteNoConflict tests sync of remote updates on top of a local initial
+// state without conflict. An object is created locally and updated twice
+// (v1 -> v2 -> v3). Another device, having gotten this info, makes 3 updates
+// on top of that (v3 -> v4 -> v5 -> v6) and sends this info in a later sync.
+// The updated DAG should show (v1 -> v2 -> v3 -> v4 -> v5 -> v6) and report
+// no conflicts with the new head pointing at v6. It should also report v3 as
+// the graft point on which the new fragment (v4 -> v5 -> v6) gets attached.
+func TestRemoteNoConflict(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-noconf-00.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"3"}, "5": {"4"}, "6": {"5"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"6": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"3": 2}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
+ }
+
+ // Then move the head.
+ tx := st.NewTransaction()
+ if err := moveHead(nil, tx, oid, newHead); err != nil {
+ t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
+ }
+ tx.Commit()
+
+ // Verify that hasConflict() fails without graft data.
+ isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, nil)
+ if errConflict == nil {
+ t.Errorf("hasConflict() on %s did not fail w/o graft data: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+}
+
+// TestRemoteConflict tests sync handling remote updates that build on the
+// local initial state and trigger a conflict. An object is created locally
+// and updated twice (v1 -> v2 -> v3). Another device, having only gotten
+// the v1 -> v2 history, makes 3 updates on top of v2 (v2 -> v4 -> v5 -> v6)
+// and sends this info during a later sync. Separately, the local device
+// makes a conflicting (concurrent) update v2 -> v3. The updated DAG should
+// show the branches: (v1 -> v2 -> v3) and (v1 -> v2 -> v4 -> v5 -> v6) and
+// report the conflict between v3 and v6 (current and new heads). It should
+// also report v2 as the graft point and the common ancestor in the conflict.
+// The conflict is resolved locally by creating v7 that is derived from both
+// v3 and v6 and it becomes the new head.
+func TestRemoteConflict(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-conf-00.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"4"}, "6": {"5"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true, "6": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"2": 1}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be a conflict between v3 and v6 with v2 as ancestor.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
+ t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+ t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
+ }
+
+ // Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
+ if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify that the head moved to v7 and the parent map shows the resolution.
+ if head, err := getHead(nil, st, oid); err != nil || head != "7" {
+ t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
+ }
+
+ exp["7"] = []string{"3", "6"}
+ pmap = getParentMap(nil, st, oid, nil)
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
+ oid, pmap, exp)
+ }
+}
+
+// TestRemoteConflictTwoGrafts tests sync handling remote updates that build
+// on the local initial state and trigger a conflict with 2 graft points.
+// An object is created locally and updated twice (v1 -> v2 -> v3). Another
+// device first learns about v1 and makes its own conflicting update v1 -> v4.
+// That remote device later learns about v2 and resolves the v2/v4 conflict by
+// creating v5. Then it makes a last v5 -> v6 update -- which will conflict
+// with v3 but it doesn't know that.
+// Now the sync order is reversed and the local device learns all of what
+// happened on the remote device. The local DAG should be augmented by
+// a subtree with 2 graft points: v1 and v2. It receives this new branch:
+// v1 -> v4 -> v5 -> v6. Note that v5 is also derived from v2 as a remote
+// conflict resolution. This should report a conflict between v3 and v6
+// (current and new heads), with v1 and v2 as graft points, and v2 as the
+// most-recent common ancestor for that conflict. The conflict is resolved
+// locally by creating v7, derived from both v3 and v6, becoming the new head.
+func TestRemoteConflictTwoGrafts(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-conf-01.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1"}, "5": {"2", "4"}, "6": {"5"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true, "6": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"1": 0, "2": 1}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be a conflict between v3 and v6 with v2 as ancestor.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
+ t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:1" {
+ t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+ t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
+ }
+
+ // Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
+ if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify that the head moved to v7 and the parent map shows the resolution.
+ if head, err := getHead(nil, st, oid); err != nil || head != "7" {
+ t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
+ }
+
+ exp["7"] = []string{"3", "6"}
+ pmap = getParentMap(nil, st, oid, nil)
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
+ oid, pmap, exp)
+ }
+}
+
+// TestAncestorIterator checks that the iterator goes over the correct set
+// of ancestor nodes for an object given a starting node. It should traverse
+// reconvergent DAG branches only visiting each ancestor once:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// - Starting at v1 it should only cover v1.
+// - Starting at v3 it should only cover v1-v3.
+// - Starting at v6 it should only cover v1-v6.
+// - Starting at v9 it should cover all nodes (v1-v9).
+func TestAncestorIterator(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // Loop checking the iteration behavior for different starting nodes.
+ for _, start := range []int{1, 3, 6, 9} {
+ visitCount := make(map[string]int)
+ vstart := fmt.Sprintf("%d", start)
+ forEachAncestor(nil, st, oid, []string{vstart}, func(v string, nd *dagNode) error {
+ visitCount[v]++
+ return nil
+ })
+
+ // Check that all prior nodes are visited only once.
+ for i := 1; i < (start + 1); i++ {
+ vv := fmt.Sprintf("%d", i)
+ if visitCount[vv] != 1 {
+ t.Errorf("wrong visit count on object %s:%s starting from %s: %d instead of 1",
+ oid, vv, vstart, visitCount[vv])
+ }
+ }
+ }
+
+ // Make sure an error in the callback is returned.
+ cbErr := errors.New("callback error")
+ err := forEachAncestor(nil, st, oid, []string{"9"}, func(v string, nd *dagNode) error {
+ if v == "1" {
+ return cbErr
+ }
+ return nil
+ })
+ if err != cbErr {
+ t.Errorf("wrong error returned from callback: %v instead of %v", err, cbErr)
+ }
+}
+
+// TestPruning tests sync pruning of the DAG for an object with 3 concurrent
+// updates (i.e. 2 conflict resolution convergent points). The pruning must
+// get rid of the DAG branches across the reconvergence points:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// By pruning at v1, nothing is deleted.
+// Then by pruning at v2, only v1 is deleted.
+// Then by pruning at v6, v2-v5 are deleted leaving v6 and "v7 -> v8 -> v9".
+// Then by pruning at v8, v6-v7 are deleted leaving "v8 -> v9".
+// Then by pruning at v9, v8 is deleted leaving v9 as the head.
+// Then by pruning again at v9 nothing changes.
+func TestPruning(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
+
+ // Loop pruning at an invalid version (333) then at different valid versions.
+ testVersions := []string{"333", "1", "2", "6", "8", "9", "9"}
+ delCounts := []int{0, 0, 1, 4, 2, 1, 0}
+ which := "prune-snip-"
+ remain := 9
+
+ for i, version := range testVersions {
+ batches := newBatchPruning()
+ tx := st.NewTransaction()
+ del := 0
+ err := prune(nil, tx, oid, version, batches,
+ func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+ del++
+ return nil
+ })
+ tx.Commit()
+
+ if i == 0 && err == nil {
+ t.Errorf("pruning non-existent object %s:%s did not fail", oid, version)
+ } else if i > 0 && err != nil {
+ t.Errorf("pruning object %s:%s failed: %v", oid, version, err)
+ }
+
+ if del != delCounts[i] {
+ t.Errorf("pruning object %s:%s deleted %d log records instead of %d",
+ oid, version, del, delCounts[i])
+ }
+
+ which += "*"
+ remain -= del
+
+ if head, err := getHead(nil, st, oid); err != nil || head != "9" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ tx = st.NewTransaction()
+ err = pruneDone(nil, tx, batches)
+ if err != nil {
+ t.Errorf("pruneDone() failed: %v", err)
+ }
+ tx.Commit()
+
+ // Remove the pruned nodes from the expected parent map used for
+ // validation and set the parents of the pruned-at node to nil.
+ intVersion, err := strconv.ParseInt(version, 10, 32)
+ if err != nil {
+ t.Errorf("invalid version: %s", version)
+ }
+
+ if intVersion < 10 {
+ for j := int64(0); j < intVersion; j++ {
+ delete(exp, fmt.Sprintf("%d", j))
+ }
+ exp[version] = nil
+ }
+
+ pmap := getParentMap(nil, st, oid, nil)
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+ }
+}
+
+// TestPruningCallbackError tests sync pruning of the DAG when the callback
+// function returns an error. The pruning must try to delete as many nodes
+// and log records as possible and properly adjust the parent pointers of the
+// node at the prune point. The object DAG is:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// By pruning at v9 and having the callback function fail for v4, all other
+// nodes must be deleted and only v9 remains as the head.
+func TestPruningCallbackError(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+ t.Fatal(err)
+ }
+
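+ // Expected parent map after pruning at v9: only the head node remains.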
+ exp := map[string][]string{"9": nil}
+
+ // Prune at v9 with a callback function that fails for v4.
+ del, expDel := 0, 8
+ version := "9"
+
+ batches := newBatchPruning()
+ tx := st.NewTransaction()
+ err := prune(nil, tx, oid, version, batches,
+ func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+ del++
+ if lr == "logrec-03" {
+ return fmt.Errorf("refuse to delete %s", lr)
+ }
+ return nil
+ })
+ tx.Commit()
+
+ if err == nil {
+ t.Errorf("pruning object %s:%s did not fail", oid, version)
+ }
+ if del != expDel {
+ t.Errorf("pruning object %s:%s deleted %d log records instead of %d", oid, version, del, expDel)
+ }
+
+ tx = st.NewTransaction()
+ err = pruneDone(nil, tx, batches)
+ if err != nil {
+ t.Errorf("pruneDone() failed: %v", err)
+ }
+ tx.Commit()
+
+ if head, err := getHead(nil, st, oid); err != nil || head != version {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, nil)
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+}
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
new file mode 100644
index 0000000..50fbf14
--- /dev/null
+++ b/services/syncbase/vsync/replay_test.go
@@ -0,0 +1,183 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// This file eases the setup of sync test scenarios.
+// parseSyncCommands() parses a sync command file and returns a slice of
+// commands to execute; dagReplayCommands() executes the parsed commands at
+// the DAG API level.
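+//
+// Typical test usage (illustrative sketch; createService is the test helper
+// used by the DAG tests in this change):
+//
+//   svc := createService(t)
+//   graft, err := svc.sync.dagReplayCommands(nil, "local-init-01.sync")
+//   if err != nil {
+//       t.Fatal(err)
+//   }
+//   // For remote commands ("addr", "linkr"), the returned graft map records
+//   // where the new nodes attached to the existing DAG.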
+
+import (
+ "bufio"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+
+ "v.io/v23/context"
+)
+
+const (
+ addLocal = iota
+ addRemote
+ linkLocal
+ linkRemote
+)
+
+type syncCommand struct {
+ cmd int
+ oid string
+ version string
+ parents []string
+ logrec string
+ deleted bool
+}
+
+// parseSyncCommands parses a sync test file and returns its commands.
+func parseSyncCommands(file string) ([]syncCommand, error) {
+ cmds := []syncCommand{}
+
+ sf, err := os.Open("testdata/" + file)
+ if err != nil {
+ return nil, err
+ }
+ defer sf.Close()
+
+ scanner := bufio.NewScanner(sf)
+ lineno := 0
+ for scanner.Scan() {
+ lineno++
+ line := strings.TrimSpace(scanner.Text())
+ if line == "" || line[0] == '#' {
+ continue
+ }
+
+ args := strings.Split(line, "|")
+ nargs := len(args)
+
+ switch args[0] {
+ case "addl", "addr":
+ expNargs := 9
+ if nargs != expNargs {
+ return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
+ file, lineno, expNargs, nargs)
+ }
+ var parents []string
+ for i := 3; i <= 4; i++ {
+ if args[i] != "" {
+ parents = append(parents, args[i])
+ }
+ }
+
+ del, err := strconv.ParseBool(args[8])
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
+ }
+ cmd := syncCommand{
+ oid: args[1],
+ version: args[2],
+ parents: parents,
+ logrec: args[5],
+ deleted: del,
+ }
+ if args[0] == "addl" {
+ cmd.cmd = addLocal
+ } else {
+ cmd.cmd = addRemote
+ }
+ cmds = append(cmds, cmd)
+
+ case "linkl", "linkr":
+ expNargs := 6
+ if nargs != expNargs {
+ return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
+ file, lineno, expNargs, nargs)
+ }
+
+ if args[3] == "" {
+ return nil, fmt.Errorf("%s:%d: parent version not specified", file, lineno)
+ }
+ if args[4] != "" {
+ return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent: %s",
+ file, lineno, args[4])
+ }
+
+ cmd := syncCommand{
+ oid: args[1],
+ version: args[2],
+ parents: []string{args[3]},
+ logrec: args[5],
+ }
+ if args[0] == "linkl" {
+ cmd.cmd = linkLocal
+ } else {
+ cmd.cmd = linkRemote
+ }
+ cmds = append(cmds, cmd)
+
+ default:
+ return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
+ }
+ }
+
+ err = scanner.Err()
+ return cmds, err
+}
+
+// dagReplayCommands parses a sync test file and replays its commands, updating
+// the DAG structures associated with the sync service.
+func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (graftMap, error) {
+ cmds, err := parseSyncCommands(syncfile)
+ if err != nil {
+ return nil, err
+ }
+
+ st := s.sv.St()
+ graft := newGraft()
+
+ for _, cmd := range cmds {
+ tx := st.NewTransaction()
+
+ switch cmd.cmd {
+ case addLocal:
+ err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
+ cmd.deleted, cmd.parents, NoBatchId, nil)
+ if err != nil {
+ return nil, fmt.Errorf("cannot add local node %s:%s: %v",
+ cmd.oid, cmd.version, err)
+ }
+ tx.Commit()
+
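+ // The head is moved in a separate transaction, after the new node is committed.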
+ tx = st.NewTransaction()
+ if err = moveHead(ctx, tx, cmd.oid, cmd.version); err != nil {
+ return nil, fmt.Errorf("cannot move head to %s:%s: %v",
+ cmd.oid, cmd.version, err)
+ }
+
+ case addRemote:
+ err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
+ cmd.deleted, cmd.parents, NoBatchId, graft)
+ if err != nil {
+ return nil, fmt.Errorf("cannot add remote node %s:%s: %v",
+ cmd.oid, cmd.version, err)
+ }
+
+ case linkLocal:
+ if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], nil); err != nil {
+ return nil, fmt.Errorf("cannot add local parent %s to node %s:%s: %v",
+ cmd.parents[0], cmd.oid, cmd.version, err)
+ }
+
+ case linkRemote:
+ if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], graft); err != nil {
+ return nil, fmt.Errorf("cannot add remote parent %s to node %s:%s: %v",
+ cmd.parents[0], cmd.oid, cmd.version, err)
+ }
+ }
+
+ tx.Commit()
+ }
+
+ return graft, nil
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 567d588..bded17b 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -47,6 +47,13 @@
// In-memory sync membership info aggregated across databases.
allMembers *memberView
+
+ // In-memory tracking of batches during their construction.
+ // The sync Initiator and Watcher build batches incrementally here
+ // and then persist them in DAG batch entries. The mutex guards
+ // access to the batch set.
+ batchesLock sync.Mutex
+ batches batchSet
}
// syncDatabase contains the metadata for syncing a database. This struct is
@@ -62,6 +69,11 @@
_ util.Layer = (*syncService)(nil)
)
+// rand64 generates an unsigned 64-bit pseudo-random number.
+func rand64() uint64 {
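+ // Int63 yields only 63 random bits, so shift it left and fill the low bit
+ // with a second random bit to cover the full 64-bit range.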
+ return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
+}
+
// New creates a new sync module.
//
// Concurrency: sync initializes two goroutines at startup: a "watcher" and an
@@ -70,7 +82,10 @@
// periodically contacting peers to fetch changes from them. In addition, the
// sync module responds to incoming RPCs from remote sync modules.
func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
- s := &syncService{sv: sv}
+ s := &syncService{
+ sv: sv,
+ batches: make(batchSet),
+ }
data := &syncData{}
if err := util.GetObject(sv.St(), s.StKey(), data); err != nil {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 72e8533..9e6efd4 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -61,12 +61,16 @@
// newSyncGroupVersion generates a random SyncGroup version ("etag").
func newSyncGroupVersion() string {
- return fmt.Sprintf("%x", rng.Int63())
+ return fmt.Sprintf("%x", rand64())
}
// newSyncGroupId generates a random SyncGroup ID.
func newSyncGroupId() interfaces.GroupId {
- return interfaces.GroupId(rng.Int63())
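+ // Retry until the generated ID differs from the reserved NoGroupId value.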
+ id := interfaces.NoGroupId
+ for id == interfaces.NoGroupId {
+ id = interfaces.GroupId(rand64())
+ }
+ return id
}
// verifySyncGroup verifies if a SyncGroup struct is well-formed.
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
new file mode 100644
index 0000000..46b3502
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -0,0 +1,6 @@
+# Create an object locally and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-00|0|1|false
+addl|1234|2|1||logrec-01|0|1|false
+addl|1234|3|2||logrec-02|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-init-01.sync b/services/syncbase/vsync/testdata/local-init-01.sync
new file mode 100644
index 0000000..86e24de
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-01.sync
@@ -0,0 +1,12 @@
+# Create an object DAG locally with branches and resolved conflicts.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-00|0|1|false
+addl|1234|2|1||logrec-01|0|1|false
+addl|1234|3|2||logrec-02|0|1|false
+addl|1234|4|2||logrec-03|0|1|false
+addl|1234|5|3|4|logrec-04|0|1|false
+addl|1234|6|5||logrec-05|0|1|false
+addl|1234|7|2||logrec-06|0|1|false
+addl|1234|8|6|7|logrec-07|0|1|false
+addl|1234|9|8||logrec-08|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-init-03.sync b/services/syncbase/vsync/testdata/local-init-03.sync
new file mode 100644
index 0000000..202a752
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-03.sync
@@ -0,0 +1,10 @@
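+# Create an object DAG locally with branches, a deleted node (v5), and
+# multiple conflict resolutions (v6, v7, v8).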
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-01|0|1|false
+addl|1234|2|1||logrec-02|0|1|false
+addl|1234|3|1||logrec-03|0|1|false
+addl|1234|4|2||logrec-04|0|1|false
+addl|1234|5|2||logrec-05|0|1|true
+addl|1234|6|4|5|logrec-06|0|1|false
+addl|1234|7|3|5|logrec-07|0|1|false
+addl|1234|8|6|7|logrec-08|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-resolve-00.sync b/services/syncbase/vsync/testdata/local-resolve-00.sync
new file mode 100644
index 0000000..2c87de5
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-resolve-00.sync
@@ -0,0 +1,4 @@
+# Resolve a conflict by adding a local merge node (v7) with parents v3 and v6.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|7|3|6|logrec-06|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
new file mode 100644
index 0000000..1f9bb5b
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -0,0 +1,8 @@
+# Update an object remotely three times triggering one conflict after
+# it was created locally up to v3 (i.e. assume the remote sync received
+# it from the local sync at v2, then updated separately).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|2||VeyronPhone:10:1:0|0|1|false
+addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
new file mode 100644
index 0000000..9581f69
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -0,0 +1,10 @@
+# Update an object remotely three times triggering a conflict with
+# 2 graft points: v1 and v4. This assumes that the remote sync got
+# v1, made its own conflicting v4 that it resolved into v5 (against v2)
+# then made a v6 change. When the local sync gets back this info it
+# sees 2 graft points: v1-v4 and v2-v5.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronLaptop:10:1:0|0|1|false
+addr|1234|5|2|4|VeyronPhone:10:1:0|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:1|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
new file mode 100644
index 0000000..9795d53
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -0,0 +1,6 @@
+# Create an object remotely and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|1|||VeyronPhone:10:1:0|0|1|false
+addr|1234|2|1||VeyronPhone:10:1:1|0|1|false
+addr|1234|3|2||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
new file mode 100644
index 0000000..e2e2afa
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -0,0 +1,8 @@
+# Update an object remotely three times without triggering a conflict
+# after it was created locally up to v3 (i.e. assume the remote sync
+# received it from the local sync first, then updated it).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|3||VeyronPhone:10:1:0|0|1|false
+addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 4c6f852..bb34c27 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -9,19 +9,20 @@
import (
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
+ "v.io/x/lib/vlog"
)
// forEachDatabaseStore iterates over all Databases in all Apps within the
-// service and invokes the provided callback function on each database. The
-// callback returns a "done" flag to make forEachDatabaseStore() stop the
-// iteration earlier; otherwise the function loops across all databases of all
-// apps.
+// service and invokes the callback function on each database. The callback
+// returns a "done" flag to make forEachDatabaseStore() stop the iteration
+// earlier; otherwise the function loops across all databases of all apps.
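+//
+// Usage sketch (illustrative only): count the databases across all apps:
+//
+//   count := 0
+//   s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+//       count++
+//       return false // not done: keep iterating
+//   })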
func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
// Get the apps and iterate over them.
// TODO(rdaoud): use a "privileged call" parameter instead of nil (here and
// elsewhere).
appNames, err := s.sv.AppNames(ctx, nil)
if err != nil {
+ vlog.Errorf("forEachDatabaseStore: cannot get all app names: %v", err)
return
}
@@ -29,10 +30,12 @@
// For each app, get its databases and iterate over them.
app, err := s.sv.App(ctx, nil, a)
if err != nil {
+ vlog.Errorf("forEachDatabaseStore: cannot get app %s: %v", a, err)
continue
}
dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
if err != nil {
+ vlog.Errorf("forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
continue
}
@@ -40,6 +43,7 @@
// For each database, get its Store and invoke the callback.
db, err := app.NoSQLDatabase(ctx, nil, d)
if err != nil {
+ vlog.Errorf("forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
continue
}