| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // Syncbase DAG (directed acyclic graph) utility functions. |
| // |
| // The DAG is used to track the version history of synced objects in order |
| // to detect and resolve conflicts (concurrent changes on different devices). |
| // |
| // Note: the sync code uses the words "object" and "object ID" (oid) as a |
| // generic way to refer to syncable entities, whether they are actual user data |
| // (table row and its row key), prefix-ACLs (permission entry and its prefix), |
| // or other metadata such as SyncGroups (SyncGroup value and its internal key |
| // based on the SyncGroup ID). |
| // |
| // * Object IDs are globally unique across all devices. |
| // * Syncable objects have version numbers associated with their mutations. |
| // * For a given object ID, the version number is globally unique across all |
| // devices, i.e. the (oid, version) tuple is globally unique. |
| // * Each (oid, version) tuple is represented by a node in the DAG. |
| // * The previous version of an object is its parent in the DAG, i.e. the |
| // new version is derived from that parent. |
| // * DAG nodes have child-to-parent pointers. |
| // * When there are no conflicts, the parent node has a single child node |
| // that points to it. |
| // * When a parent node has more than one child, this indicates concurrent |
| // mutations which are treated as a conflict to be resolved. |
| // * When a conflict is resolved, the new version has pointers back to each of |
| // the two parents to indicate that it is derived from both nodes. |
| // * During a sync operation from a source device to a target device, the |
| // target receives a DAG fragment from the source. That fragment has to |
| // be incorporated (grafted) into the target device's DAG. It may be a |
| // continuation of the DAG of an object, with the attachment (graft) point |
| // being the current head of DAG, in which case there are no conflicts. |
| // Or the graft point(s) may be older nodes, which means the new fragment |
| // is a divergence in the graph causing a conflict that must be resolved |
| // in order to re-converge the two DAG fragments. |
| // |
| // In the diagrams below: |
| // (h) represents the head node in the local device. |
| // (nh) represents the new head node received from the remote device. |
| // (g) represents a graft node, where new nodes attach to the existing DAG. |
| // <- represents a derived-from mutation, i.e. a child-to-parent pointer |
| // |
| // a- No-conflict example: the new nodes (v4, v5) attach to the head node (v3). |
| // In this case the new head becomes the head node, the new DAG fragment |
| // being a continuation of the existing DAG. |
| // |
| // Before: |
| // v1 <- v2 <- v3(h) |
| // |
| // Sync updates applied, no conflict detected: |
| // v1 <- v2 <- v3(h,g) <- v4 <- v5 (nh) |
| // |
| // After: |
| // v1 <- v2 <- v3 <- v4 <- v5 (h) |
| // |
| // b- Conflict example: the new nodes (v4, v5) attach to an old node (v2). |
| // The current head node (v3) and the new head node (v5) are divergent |
| // (concurrent) mutations that need to be resolved. The conflict |
| // resolution function is passed the old head (v3), new head (v5), and |
| // the common ancestor (v2). It resolves the conflict with (v6) which |
| // is represented in the DAG as derived from both v3 and v5 (2 parents). |
| // |
| // Before: |
| // v1 <- v2 <- v3(h) |
| // |
| // Sync updates applied, conflict detected (v3 not a graft node): |
| // v1 <- v2(g) <- v3(h) |
| // <- v4 <- v5 (nh) |
| // |
| // After: conflict resolver creates v6 having 2 parents (v3, v5): |
| // v1 <- v2(g) <- v3 <------- v6(h) |
| // <- v4 <- v5 <- |
| // |
| // The DAG does not grow indefinitely. During a sync operation each device |
| // learns what the other device already knows -- where it's at in the version |
| // history for the objects. When a device determines that all devices that |
| // sync an object (members of matching SyncGroups) have moved past some version |
| // for that object, the DAG for that object can be pruned up to that common |
| // version, deleting all prior (ancestor) nodes. |
| // |
| // The DAG contains three tables persisted to disk (nodes, heads, batches): |
| // |
| // * nodes: one entry per (oid, version) with references to parent node(s) |
| // it is derived from, a reference to the log record identifying |
| // that mutation, a reference to its write batch (if any), and a |
| // boolean to indicate whether this was an object deletion. |
| // |
| // * heads: one entry per object pointing to its most recent version. |
| // |
| // * batches: one entry per batch ID containing the set of objects in the |
| // write batch and their versions. |
| |
| import ( |
| "container/list" |
| "fmt" |
| |
| "v.io/syncbase/x/ref/services/syncbase/server/util" |
| "v.io/syncbase/x/ref/services/syncbase/store" |
| "v.io/v23/context" |
| "v.io/v23/verror" |
| "v.io/x/lib/vlog" |
| ) |
| |
| const ( |
| NoVersion = "" |
| NoBatchId = uint64(0) |
| ) |
| |
| // dagNode holds the information on a object mutation in the DAG. |
| // Note: the batch ID and deleted flag are copies of information in the log |
| // record. They are also stored in the DAG node to improve DAG traversal for |
| // conflict resolution and pruning without having to fetch the full log record |
| // every time. |
| type dagNode struct { |
| Level uint64 // node distance from root |
| Parents []string // references to parent versions |
| Logrec string // reference to log record |
| BatchId uint64 // ID of a write batch |
| Deleted bool // true if the change was a delete |
| } |
| |
| // batchSet holds information on a set of write batches. |
| type batchSet map[uint64]*batchInfo |
| |
| // batchInfo holds the information on a write batch: |
| // - The map of syncable (versioned) objects: {oid: version} |
| // - The total count of batch objects, including non-syncable ones. |
| // TODO(rdaoud): add support to track the read and scan sets. |
| type batchInfo struct { |
| Objects map[string]string |
| Count uint64 |
| } |
| |
| // graftMap holds the state of DAG node grafting (attaching) per object. |
| type graftMap map[string]*graftInfo |
| |
| // graftInfo holds the state of an object's node grafting in the DAG. |
| // It is ephemeral (in-memory), used during a single sync operation to track |
| // where the new DAG fragments are attached to the existing DAG for the object: |
| // - newNodes: the set of newly added nodes; used to detect the type of edges |
| // between nodes (new-node to old-node or vice versa). |
| // - newHeads: the set of new candidate head nodes; used to detect conflicts. |
| // - graftNodes: the set of old nodes on which new nodes were added, and their |
| // level in the DAG; used to find common ancestors for conflicts. |
| // - oldHeadSnap: snapshot of the current local head known by sync, used in |
| // conflict detection, particularly when conflict detection needs |
| // to be retried due to sync dag state being stale compared |
| // to local store. |
| // |
| // After the received mutations are applied, if there are two heads in the |
| // newHeads set, there is a conflict to be resolved for the object. Otherwise, |
| // if there is one head, no conflict was triggered and the new head becomes the |
| // current object version. In case of conflict, the graftNodes set is used to |
| // select a common ancestor. |
| // TODO(rdaoud): support open DAGs to handle delayed conflict resolution by |
| // tracking multiple dangling remote heads in addition to the local head node. |
| type graftInfo struct { |
| newNodes map[string]bool |
| newHeads map[string]bool |
| graftNodes map[string]uint64 |
| oldHeadSnap string |
| } |
| |
| // newBatchInfo allocates and initializes a batch info entry. |
| func newBatchInfo() *batchInfo { |
| return &batchInfo{Objects: make(map[string]string), Count: 0} |
| } |
| |
| // startBatch marks the start of a batch. It generates a batch ID and returns |
| // it to the caller, if an ID is not given. The batch ID is used to track DAG |
| // nodes that are part of the same batch and it is stored in the log records. |
| // If a batch ID is given by the caller, its information is accessed. |
| func (s *syncService) startBatch(ctx *context.T, st store.StoreReader, btid uint64) uint64 { |
| s.batchesLock.Lock() |
| defer s.batchesLock.Unlock() |
| |
| // If no batch ID is given, generate a new unused one. |
| if btid == NoBatchId { |
| for (btid == NoBatchId) || (s.batches[btid] != nil) { |
| btid = rand64() |
| } |
| |
| s.batches[btid] = newBatchInfo() |
| return btid |
| } |
| |
| // Use the given batch ID and, if needed, refetch its in-memory entry |
| // from the store. It is OK not to find it in the store; it means sync |
| // is learning about this batch ID the first time from another sync. |
| if s.batches[btid] == nil { |
| info, err := getBatch(ctx, st, btid) |
| if err != nil { |
| info = newBatchInfo() |
| } |
| s.batches[btid] = info |
| } |
| |
| return btid |
| } |
| |
| // addNodeToBatch adds a node (oid, version) to a batch under construction. |
| func (s *syncService) addNodeToBatch(ctx *context.T, btid uint64, oid, version string) error { |
| s.batchesLock.Lock() |
| defer s.batchesLock.Unlock() |
| |
| if btid == NoBatchId { |
| return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid) |
| } |
| |
| info := s.batches[btid] |
| if info == nil { |
| return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid) |
| } |
| |
| info.Objects[oid] = version |
| return nil |
| } |
| |
| // endBatch marks the end of a given batch. The batch information is persisted |
| // to the store and removed from the temporary in-memory entry. |
| func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error { |
| _ = tx.(store.Transaction) |
| |
| s.batchesLock.Lock() |
| defer s.batchesLock.Unlock() |
| |
| if btid == NoBatchId || count == 0 { |
| return verror.New(verror.ErrInternal, ctx, "invalid batch info", btid, count) |
| } |
| |
| info := s.batches[btid] |
| if info == nil { |
| return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid) |
| } |
| |
| // The first time a batch is ended, info.Count is zero. Subsequently, |
| // if this batch ID is started and ended again, info.Count should be |
| // the same as the "count" value given. |
| if info.Count != 0 && info.Count != count { |
| return verror.New(verror.ErrInternal, ctx, "wrong counts for batch", btid, info.Count, count) |
| } |
| |
| // Only save non-empty batches. |
| if len(info.Objects) > 0 { |
| info.Count = count |
| if err := setBatch(ctx, tx, btid, info); err != nil { |
| return err |
| } |
| } |
| |
| delete(s.batches, btid) |
| return nil |
| } |
| |
| // addNode adds a new node for a DAG object, linking it to its parent nodes. |
| // It verifies that the node does not exist and its parent nodes are valid. |
| // If a batch ID is given, track the node membership in the batch. |
| // |
| // Note: an in-memory grafting structure is passed to track DAG attachments |
| // during a sync operation. This is needed when nodes are being added due to |
| // remote changes fetched by the sync protocol. The Initiator allocates a |
| // grafting structure at the start of a sync operation and passes it across |
| // calls to addNode() to update the DAG grafting state: |
| // - If a parent node is not new, mark it as a DAG graft point. |
| // - Mark this version as a new node. |
| // - Update the new head node pointer of the grafted DAG. |
| // |
| // The grafting structure is not needed when nodes are being added locally by |
| // the Watcher, passing a nil grafting structure. |
| func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error { |
| _ = tx.(store.Transaction) |
| |
| if parents != nil { |
| if len(parents) > 2 { |
| return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents") |
| } |
| if len(parents) == 0 { |
| parents = nil // replace an empty array with a nil |
| } |
| } |
| |
| // The new node must not exist. |
| if ok, err := hasNode(ctx, tx, oid, version); err != nil { |
| return err |
| } else if ok { |
| return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version) |
| } |
| |
| // Verify the parents, determine the node level. Also save the levels |
| // of the parent nodes for later in this function in graft updates. |
| parentLevels := make(map[string]uint64) |
| var level uint64 |
| for _, parent := range parents { |
| pnode, err := getNode(ctx, tx, oid, parent) |
| if err != nil { |
| return err |
| } |
| parentLevels[parent] = pnode.Level |
| if level <= pnode.Level { |
| level = pnode.Level + 1 |
| } |
| } |
| |
| // If a batch ID is given, add the node to that batch. |
| if btid != NoBatchId { |
| if err := s.addNodeToBatch(ctx, btid, oid, version); err != nil { |
| return err |
| } |
| } |
| |
| // Add the node entry to the DAG. |
| node := &dagNode{ |
| Level: level, |
| Parents: parents, |
| Logrec: logrec, |
| BatchId: btid, |
| Deleted: deleted, |
| } |
| if err := setNode(ctx, tx, oid, version, node); err != nil { |
| return err |
| } |
| |
| // We are done if grafting is not being tracked (a local node add). |
| if graft == nil { |
| return nil |
| } |
| |
| // Get the object's graft info entry in order to update it. It happens |
| // when addNode() is called by the sync Initiator and the DAG is updated |
| // with new nodes fetched from other devices. |
| // |
| // During a sync operation, each mutated object gets new nodes added in |
| // its DAG. These new nodes are either derived from nodes that were |
| // previously known on this device (i.e. their parent nodes are pre- |
| // existing, or they have no parents (new root nodes)), or they are |
| // derived from other new DAG nodes being discovered during this sync |
| // (i.e. their parent nodes were also just added to the DAG). |
| // |
| // To detect a conflict and find the most recent common ancestor to |
| // pass to the conflict resolver, the DAG graft info keeps track of the |
| // new nodes that have old parent nodes. These old-to-new edges are |
| // points where new DAG fragments are attached (grafted) onto the |
| // existing DAG. The old nodes are the graft nodes forming the set of |
| // common ancestors to use in conflict resolution: |
| // |
| // 1- A conflict happens when the current "head node" for an object is |
| // not in the set of graft nodes. It means the object mutations |
| // were not derived from what the device knows, but are divergent |
| // changes at a prior point. |
| // |
| // 2- The most recent common ancestor to use in resolving the conflict |
| // is the object graft node with the deepest level (furthest from |
| // the root node), representing the most up-to-date common knowledge |
| // between the devices. |
| info := getObjectGraftInfo(ctx, tx, graft, oid) |
| |
| for _, parent := range parents { |
| // If this parent is an old node, it's a graft point. |
| if !info.newNodes[parent] { |
| info.graftNodes[parent] = parentLevels[parent] |
| } |
| |
| // A parent cannot be a candidate for a new head. |
| delete(info.newHeads, parent) |
| } |
| |
| // This new node is a candidate for new head version. |
| info.newNodes[version] = true |
| info.newHeads[version] = true |
| return nil |
| } |
| |
| // addParent adds to the DAG node (oid, version) linkage to this parent node. |
| // |
| // Note: as with the addNode() call, an in-memory grafting structure is passed |
| // to track DAG attachements during a sync operation. It is not needed if the |
| // parent linkage is due to a local change (from conflict resolution selecting |
| // an existing version). |
| func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error { |
| _ = tx.(store.Transaction) |
| |
| if version == parent { |
| return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent") |
| } |
| |
| node, err := getNode(ctx, tx, oid, version) |
| if err != nil { |
| return err |
| } |
| pnode, err := getNode(ctx, tx, oid, parent) |
| if err != nil { |
| return err |
| } |
| |
| // Check if the parent is already linked to this node. |
| found := false |
| for _, p := range node.Parents { |
| if p == parent { |
| found = true |
| break |
| } |
| } |
| |
| // Add the parent if it is not yet linked. |
| if !found { |
| // Make sure that adding the link does not create a DAG cycle. |
| // Verify that the node is not an ancestor of the parent that |
| // it is being linked to. |
| err = forEachAncestor(ctx, tx, oid, pnode.Parents, func(v string, nd *dagNode) error { |
| if v == version { |
| return verror.New(verror.ErrInternal, ctx, "cycle on object", |
| oid, ": node", version, "is ancestor of parent", parent) |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| node.Parents = append(node.Parents, parent) |
| if err = setNode(ctx, tx, oid, version, node); err != nil { |
| return err |
| } |
| } |
| |
| // If no grafting structure is given (i.e. local changes), we are done. |
| if graft == nil { |
| return nil |
| } |
| |
| // Update graft: if the node and its parent are new/old or old/new then |
| // add the parent as a graft point (a potential common ancestor). |
| info := getObjectGraftInfo(ctx, tx, graft, oid) |
| |
| _, nodeNew := info.newNodes[version] |
| _, parentNew := info.newNodes[parent] |
| if (nodeNew && !parentNew) || (!nodeNew && parentNew) { |
| info.graftNodes[parent] = pnode.Level |
| } |
| |
| // The parent node can no longer be a candidate for a new head version. |
| delete(info.newHeads, parent) |
| return nil |
| } |
| |
| // moveHead moves the object head node in the DAG. |
| func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error { |
| _ = tx.(store.Transaction) |
| |
| // Verify that the node exists. |
| if ok, err := hasNode(ctx, tx, oid, head); err != nil { |
| return err |
| } else if !ok { |
| return verror.New(verror.ErrInternal, ctx, "node", oid, head, "does not exist") |
| } |
| |
| return setHead(ctx, tx, oid, head) |
| } |
| |
| // hasConflict determines if an object has a conflict between its new and old |
| // head nodes. |
| // - Yes: return (true, newHead, oldHead, ancestor) -- from a common past |
| // - Yes: return (true, newHead, oldHead, NoVersion) -- from disjoint pasts |
| // - No: return (false, newHead, oldHead, NoVersion) -- no conflict |
| // |
| // A conflict exists when there are two new-head nodes in the graft structure. |
| // It means the newly added object versions are not derived in part from this |
| // device's current knowledge. A conflict also exists if the snapshotted local |
| // head is different from the current local head. If there is a single new-head |
| // and the snapshot head is the same as the current local head, the object |
| // changes were applied without triggering a conflict. |
| func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) { |
| isConflict = false |
| oldHead = NoVersion |
| newHead = NoVersion |
| ancestor = NoVersion |
| err = nil |
| |
| if graft == nil { |
| err = verror.New(verror.ErrInternal, ctx, "no DAG graft map given") |
| return |
| } |
| |
| info := graft[oid] |
| if info == nil { |
| err = verror.New(verror.ErrInternal, ctx, "node", oid, "has no DAG graft info") |
| return |
| } |
| |
| numHeads := len(info.newHeads) |
| if numHeads < 1 || numHeads > 2 { |
| err = verror.New(verror.ErrInternal, ctx, "node", oid, "invalid count of new heads", numHeads) |
| return |
| } |
| |
| // Fetch the current head for this object if it exists. The error from |
| // getHead() is ignored because a newly received object is not yet known |
| // on this device and will not trigger a conflict. |
| oldHead, _ = getHead(ctx, st, oid) |
| |
| // If there is only one new head node and the snapshotted old head is |
| // still unchanged, there is no conflict. The new head is that single |
| // one, even if it might also be the same old node. |
| if numHeads == 1 { |
| for head := range info.newHeads { |
| newHead = head |
| } |
| if newHead == info.oldHeadSnap { |
| // Only link log records could've been received. |
| newHead = oldHead |
| return |
| } else if oldHead == info.oldHeadSnap { |
| return |
| } |
| } |
| |
| // The new head is the non-old one. |
| for head := range info.newHeads { |
| if head != info.oldHeadSnap { |
| newHead = head |
| break |
| } |
| } |
| |
| // There wasn't a conflict at the old snapshot, but now there is. The |
| // snapshotted head is the common ancestor. |
| isConflict = true |
| if numHeads == 1 { |
| vlog.VI(4).Infof("sync: hasConflict: old graft snapshot %v, head %s", graft, oldHead) |
| ancestor = info.oldHeadSnap |
| return |
| } |
| |
| // There is a conflict: the best choice ancestor is the graft node with |
| // the largest level (farthest from the root). It is possible in some |
| // corner cases to have multiple graft nodes at the same level. This |
| // would still be a single conflict, but the multiple same-level graft |
| // nodes representing equivalent conflict resolutions on different |
| // devices that are now merging their resolutions. In such a case it |
| // does not matter which node is chosen as the ancestor because the |
| // conflict resolver function is assumed to be convergent. However it |
| // is nicer to make that selection deterministic so all devices see the |
| // same choice: the version number is used as a tie-breaker. |
| // Note: for the case of a conflict from disjoint pasts, there are no |
| // graft nodes (empty set) and thus no common ancestor because the two |
| // DAG fragments were created from distinct root nodes. The "NoVersion" |
| // value is returned as the ancestor. |
| var maxLevel uint64 |
| for node, level := range info.graftNodes { |
| if maxLevel < level || (maxLevel == level && ancestor < node) { |
| maxLevel = level |
| ancestor = node |
| } |
| } |
| return |
| } |
| |
| // newGraft allocates a graftMap to track DAG node grafting during sync. |
| func newGraft() graftMap { |
| return make(graftMap) |
| } |
| |
| // getObjectGraft returns the graftInfo for an object ID. If the graftMap is |
| // nil, a nil graftInfo is returned because grafting is not being tracked. |
| func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo { |
| if graft == nil { |
| return nil |
| } |
| if info := graft[oid]; info != nil { |
| return info |
| } |
| |
| info := &graftInfo{ |
| newNodes: make(map[string]bool), |
| newHeads: make(map[string]bool), |
| graftNodes: make(map[string]uint64), |
| } |
| |
| // If the object has a head node, include it in the set of new heads. |
| if head, err := getHead(ctx, st, oid); err == nil { |
| info.newHeads[head] = true |
| info.oldHeadSnap = head |
| } |
| |
| graft[oid] = info |
| return info |
| } |
| |
| // forEachAncestor loops over the DAG ancestor nodes of an object in a breadth- |
| // first traversal starting from given version nodes. It calls the given |
| // callback function once for each ancestor node. |
| func forEachAncestor(ctx *context.T, st store.StoreReader, oid string, startVersions []string, callback func(version string, node *dagNode) error) error { |
| visited := make(map[string]bool) |
| queue := list.New() |
| for _, version := range startVersions { |
| queue.PushBack(version) |
| visited[version] = true |
| } |
| |
| for queue.Len() > 0 { |
| version := queue.Remove(queue.Front()).(string) |
| node, err := getNode(ctx, st, oid, version) |
| if err != nil { |
| // Ignore it, the parent was previously pruned. |
| continue |
| } |
| for _, parent := range node.Parents { |
| if !visited[parent] { |
| queue.PushBack(parent) |
| visited[parent] = true |
| } |
| } |
| if err = callback(version, node); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // newBatchPruning allocates an in-memory structure to track batches affected |
| // by a DAG pruning operation across objects. |
| func newBatchPruning() batchSet { |
| return make(batchSet) |
| } |
| |
| // prune trims the DAG of an object at a given version (node) by deleting all |
| // its ancestor nodes, making it the new root node. For each deleted node it |
| // calls the given callback function to delete its log record. |
| // |
| // Note: this function should only be used when sync determines that all devices |
| // that know about this object have gotten past this version. |
| // |
| // The batch set passed is used to track batches affected by the deletion of DAG |
| // objects across multiple calls to prune(). It is later given to pruneDone() |
| // to do GC on these batches. |
| func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error { |
| _ = tx.(store.Transaction) |
| |
| if batches == nil { |
| return verror.New(verror.ErrInternal, ctx, "missing batch set") |
| } |
| |
| // Get the node at the pruning point and set its parents to nil. |
| // It will become the oldest DAG node (root) for the object. |
| node, err := getNode(ctx, tx, oid, version) |
| if err != nil { |
| return err |
| } |
| if node.Parents == nil { |
| // Nothing to do, this node is already the root. |
| return nil |
| } |
| |
| parents := node.Parents |
| node.Parents = nil |
| if err = setNode(ctx, tx, oid, version, node); err != nil { |
| return err |
| } |
| |
| // Delete all ancestor nodes and their log records. Delete as many as |
| // possible and track the error counts. Update the batch set to track |
| // their pruning. |
| nodeErrs, logErrs := 0, 0 |
| forEachAncestor(ctx, tx, oid, parents, func(v string, nd *dagNode) error { |
| if btid := nd.BatchId; btid != NoBatchId { |
| if batches[btid] == nil { |
| batches[btid] = newBatchInfo() |
| } |
| batches[btid].Objects[oid] = v |
| } |
| |
| if err := delLogRec(ctx, tx, nd.Logrec); err != nil { |
| logErrs++ |
| } |
| if err := delNode(ctx, tx, oid, v); err != nil { |
| nodeErrs++ |
| } |
| return nil |
| }) |
| if nodeErrs != 0 || logErrs != 0 { |
| return verror.New(verror.ErrInternal, ctx, |
| "prune failed to delete nodes and logs:", nodeErrs, logErrs) |
| } |
| return nil |
| } |
| |
| // pruneDone is called when object pruning is finished within a single pass of |
| // the sync garbage collector. It updates the batch sets affected by objects |
| // deleted by prune(). |
| func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error { |
| _ = tx.(store.Transaction) |
| |
| // Update batch sets by removing the pruned objects from them. |
| for btid, pruneInfo := range batches { |
| info, err := getBatch(ctx, tx, btid) |
| if err != nil { |
| return err |
| } |
| |
| for oid := range pruneInfo.Objects { |
| delete(info.Objects, oid) |
| } |
| |
| if len(info.Objects) > 0 { |
| err = setBatch(ctx, tx, btid, info) |
| } else { |
| err = delBatch(ctx, tx, btid) |
| } |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // getLogRecKey returns the key of the log record for a given object version. |
| func getLogRecKey(ctx *context.T, st store.StoreReader, oid, version string) (string, error) { |
| node, err := getNode(ctx, st, oid, version) |
| if err != nil { |
| return "", err |
| } |
| return node.Logrec, nil |
| } |
| |
| // Low-level utility functions to access DB entries without tracking their |
| // relationships. Use the functions above to manipulate the DAG. |
| |
| // nodeKey returns the key used to access a DAG node (oid, version). |
| func nodeKey(oid, version string) string { |
| return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "n", oid, version) |
| } |
| |
| // setNode stores the DAG node entry. |
| func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error { |
| _ = tx.(store.Transaction) |
| |
| if version == NoVersion { |
| return verror.New(verror.ErrInternal, ctx, "invalid version", version) |
| } |
| |
| return util.Put(ctx, tx, nodeKey(oid, version), node) |
| } |
| |
| // getNode retrieves the DAG node entry for the given (oid, version). |
| func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) { |
| if version == NoVersion { |
| return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version) |
| } |
| |
| var node dagNode |
| key := nodeKey(oid, version) |
| if err := util.Get(ctx, st, key, &node); err != nil { |
| return nil, err |
| } |
| return &node, nil |
| } |
| |
| // delNode deletes the DAG node entry. |
| func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error { |
| _ = tx.(store.Transaction) |
| |
| if version == NoVersion { |
| return verror.New(verror.ErrInternal, ctx, "invalid version", version) |
| } |
| |
| return util.Delete(ctx, tx, nodeKey(oid, version)) |
| } |
| |
| // hasNode returns true if the node (oid, version) exists in the DAG. |
| func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) { |
| // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data. |
| if _, err := getNode(ctx, st, oid, version); err != nil { |
| if verror.ErrorID(err) == verror.ErrNoExist.ID { |
| err = nil |
| } |
| return false, err |
| } |
| return true, nil |
| } |
| |
| // headKey returns the key used to access the DAG object head. |
| func headKey(oid string) string { |
| return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "h", oid) |
| } |
| |
| // setHead stores version as the DAG object head. |
| func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error { |
| _ = tx.(store.Transaction) |
| |
| if version == NoVersion { |
| return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version)) |
| } |
| |
| return util.Put(ctx, tx, headKey(oid), version) |
| } |
| |
| // getHead retrieves the DAG object head. |
| func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) { |
| var version string |
| key := headKey(oid) |
| if err := util.Get(ctx, st, key, &version); err != nil { |
| return NoVersion, err |
| } |
| return version, nil |
| } |
| |
| // delHead deletes the DAG object head. |
| func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error { |
| _ = tx.(store.Transaction) |
| return util.Delete(ctx, tx, headKey(oid)) |
| } |
| |
| // batchKey returns the key used to access the DAG batch info. |
| func batchKey(btid uint64) string { |
| return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "b", fmt.Sprintf("%d", btid)) |
| } |
| |
| // setBatch stores the DAG batch entry. |
| func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error { |
| _ = tx.(store.Transaction) |
| |
| if btid == NoBatchId { |
| return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid) |
| } |
| |
| return util.Put(ctx, tx, batchKey(btid), info) |
| } |
| |
| // getBatch retrieves the DAG batch entry. |
| func getBatch(ctx *context.T, st store.StoreReader, btid uint64) (*batchInfo, error) { |
| if btid == NoBatchId { |
| return nil, verror.New(verror.ErrInternal, ctx, "invalid batch id", btid) |
| } |
| |
| var info batchInfo |
| key := batchKey(btid) |
| if err := util.Get(ctx, st, key, &info); err != nil { |
| return nil, err |
| } |
| return &info, nil |
| } |
| |
| // delBatch deletes the DAG batch entry. |
| func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error { |
| _ = tx.(store.Transaction) |
| |
| if btid == NoBatchId { |
| return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid) |
| } |
| |
| return util.Delete(ctx, tx, batchKey(btid)) |
| } |
| |
| // getParentMap is a testing and debug helper function that returns for an |
| // object a map of its DAG (node-to-parents relations). If a graft structure |
| // is given, include its fragments in the map. |
| func getParentMap(ctx *context.T, st store.StoreReader, oid string, graft graftMap) map[string][]string { |
| parentMap := make(map[string][]string) |
| var start []string |
| |
| if head, err := getHead(ctx, st, oid); err == nil { |
| start = append(start, head) |
| } |
| if graft != nil && graft[oid] != nil { |
| for v := range graft[oid].newHeads { |
| start = append(start, v) |
| } |
| } |
| |
| forEachAncestor(ctx, st, oid, start, func(v string, nd *dagNode) error { |
| parentMap[v] = nd.Parents |
| return nil |
| }) |
| return parentMap |
| } |