package vsync

// Garbage collection (GC) in sync reclaims space occupied by sync's
// data structures when possible. For its operation, sync keeps every
// version of every object produced locally and remotely in its dag
// and log data structures. Keeping these versions indefinitely is not
// feasible given space constraints on any device. Thus, to reclaim
// space, a GC thread periodically checks whether any state can be
// deleted. GC looks for generations that every device in the system
// already knows about and deletes the state belonging to those
// generations. Since every device already knows about these
// generations, it is safe to delete them. Newly added devices will
// only receive state starting from the generations not yet
// reclaimed. Policies are needed to handle devices that were part of
// the system but are no longer available, since such devices prevent
// GC from moving forward: they will never request new generations.
//
// GC in sync happens in 3 stages:
// ** reclamation phase
// ** object pruning phase
// ** online consistency check phase
//
// Reclamation phase: GC learns of the state of the other devices when
// it talks to those devices (as the initiator) to obtain missing
// updates. The generation vectors of these devices are stored in the
// device table. In the reclamation phase, we go through the
// generation vectors of all the devices and compute the maximum
// generation of each device known to every other device. This maximum
// generation for each device is stored in the reclaim generation
// vector. We then iterate through each generation between the old
// reclaim vector and the new reclaim vector, and, for each object
// belonging to those generations, record the history of versions that
// will be reclaimed and the most recent version that can be
// reclaimed.
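//
// For example (hypothetical generation numbers), suppose devices A, B,
// and C have stored the following generation vectors in the device
// table:
//   A: {A:4, B:2, C:3}
//   B: {A:3, B:5, C:3}
//   C: {A:3, B:2, C:6}
// The maximum generation of A known to every device is min(4, 3, 3) = 3,
// that of B is min(2, 5, 2) = 2, and that of C is min(3, 3, 6) = 3, so
// the new reclaim vector is {A:3, B:2, C:3} and generations at or below
// it are candidates for reclamation.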
//
// Object pruning phase: In this phase, for an object marked for
// reclamation, we prune its dag starting from the most recent version
// that is being reclaimed and delete all the versions that are
// older. As we prune the dag, we also delete the corresponding log
// records and update the generation metadata. Note that since the
// actual deletes proceed object by object, the generations will start
// to have missing log records, and we use the generation metadata to
// ensure that the generation deletes are tracked accurately. Thus,
// the decision of what to prune happens top down using generation
// information, while the actual pruning happens bottom up from the
// dag. Pruning bottom up ensures that the object dags are consistent.
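//
// For example (hypothetical versions), suppose an object's dag is the
// chain v1 -> v2 -> v3 -> v4, with v4 the current head, and v3 is the
// most recent version being reclaimed. Pruning walks the dag from v3
// downwards, deleting v1 and v2 along with their log records and
// leaving v3 as the new root of the object's dag; v4 is untouched.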
//
// Online consistency check phase: GC stages need write access to the
// sync data structures since they perform delete operations. Hence,
// GC is executed under a write lock and excludes other goroutines in
// syncd. In order to control the impact of GC on foreground
// operations, GC is designed to be incremental in its execution. Once
// objects are marked for deletion, only a small batch of objects is
// pruned and persisted before the lock is released. Thus objects are
// incrementally deleted, a small batch every
// garbageCollectInterval. To persist the changes from a round of GC,
// we immediately persist the new reclaim vector. For the batch of
// objects gc'ed in a round, we also persist their deletions. However,
// if the system restarts or crashes before all the dirty objects from
// a round of GC are processed, state from generations older than the
// reclaim vector may still be persisted in kvdb. Since the reclaim
// vector has already been advanced, this state cannot be detected,
// resulting in leakage of space. To prevent this, we could have
// persisted the GC state to support restartability. However, to keep
// GC lightweight, we chose not to persist the transient GC state and
// instead lazily perform a consistency check on kvdb to detect
// dangling records. The online consistency check phase performs this
// check. It checks every generation older than the reclaim vector
// snapshotted at bootstrap to see if it has any state left over in
// kvdb. If it finds dangling state, it marks the corresponding
// objects as dirty for pruning. This consistency check happens only
// once upon reboot. Once all generations lower than the reclaim
// vector snapshot are verified, this phase is a no-op. Once again, to
// limit the overhead of this phase, it processes only a small batch
// of generations in each round of GC invocation.
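//
// For example (hypothetical values), if the reclaim vector was advanced
// so that device A's entry is 10 but the process crashed after pruning
// only some of the marked objects, log and dag records from A's
// generations 1..10 may remain in kvdb. On reboot, the snapshot of the
// reclaim vector is scanned generation by generation, and any lingering
// records re-mark their objects for pruning.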
//
// Finally, the underlying kvdb store persists state by writing to a
// log file upon flush. Thus, as we continue to flush to kvdb, the log
// file will keep growing. In addition, upon restart, this log file
// must be processed to reconstruct the kvdb state. To keep this log
// file from becoming large, we need to periodically compact kvdb.
import (
	"errors"
	"fmt"
	"time"

	"veyron2/storage"
	"veyron2/vlog"
)

var (
	// Garbage collection (space reclamation) is invoked every
	// garbageCollectInterval.
	garbageCollectInterval = 3600 * time.Second

	// strictCheck, when enabled, performs strict checking of
	// every log record being deleted to confirm that it should in
	// fact be deleted.
	strictCheck = true

	// Every compactCount iterations of garbage collection, kvdb
	// is compacted. This value has performance implications as
	// kvdb compaction is expensive.
	compactCount = 100

	// Batch size for the number of objects that are garbage
	// collected in every GC iteration. This value impacts how
	// long GC runs. GC holds a write lock on the data
	// structures, blocking out all other operations in the
	// system while it is running.
	objBatchSize = 20

	// Batch size for the number of generations that are verified
	// in every GC iteration.
	genBatchSize = 100

	// Errors.
	errBadMetadata = errors.New("bad metadata")
)

// objGCState tracks the per-object GC state.
//
// "version" is the most recent version of the object that can be
// pruned (and hence all older versions can be pruned as well).
//
// "pos" is used to compute the most recent version of the object
// among all the versions that can be pruned (the version with the
// highest pos is the most recent). "version" of the object belongs
// to a generation; "pos" is the position of that generation in the
// local log.
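//
// For example (hypothetical values), if version v5 of an object belongs
// to a generation at position 7 in the local log and version v3 belongs
// to a generation at position 4, then v5 is the more recent prunable
// version and is the one recorded here.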
type objGCState struct {
	version storage.Version
	pos     uint32
}

// objVersHist tracks all the versions of the object that need to be
// gc'ed when strictCheck is enabled.
type objVersHist struct {
	versions map[storage.Version]struct{}
}

// syncGC contains the metadata and state for the Sync GC thread.
type syncGC struct {
	// syncd is a pointer to the Syncd instance owning this GC.
	syncd *syncd

	// checkConsistency controls whether online consistency checks are run.
	checkConsistency bool

	// reclaimSnap is the snapshot of the reclaim vector at startup.
	reclaimSnap GenVector

	// pruneObjects holds the per-object state for garbage collection.
	pruneObjects map[storage.ID]*objGCState

	// verifyPruneMap holds the per-object version history to verify GC
	// operations on an object. It is used when strictCheck is enabled.
	verifyPruneMap map[storage.ID]*objVersHist
}

// newGC creates a new syncGC instance attached to the given Syncd instance.
func newGC(syncd *syncd) *syncGC {
	g := &syncGC{
		syncd:            syncd,
		checkConsistency: true,
		reclaimSnap:      GenVector{},
		pruneObjects:     make(map[storage.ID]*objGCState),
	}

	if strictCheck {
		g.verifyPruneMap = make(map[storage.ID]*objVersHist)
	}

	// Take a snapshot (copy) of the reclaim vector at startup.
	for dev, gnum := range syncd.devtab.head.ReclaimVec {
		g.reclaimSnap[dev] = gnum
	}

	return g
}

// garbageCollect wakes up every garbageCollectInterval to check if it
// can reclaim space.
func (g *syncGC) garbageCollect() {
	gcIters := 0
	ticker := time.NewTicker(garbageCollectInterval)
	for {
		select {
		case <-g.syncd.closed:
			ticker.Stop()
			g.syncd.pending.Done()
			return
		case <-ticker.C:
			gcIters++
			if gcIters == compactCount {
				gcIters = 0
				g.doGC(true)
			} else {
				g.doGC(false)
			}
		}
	}
}

// doGC performs the actual garbage collection steps.
// If "compact" is true, also compact the Sync DBs.
func (g *syncGC) doGC(compact bool) {
	vlog.VI(1).Infof("doGC:: Started at %v", time.Now().UTC())

	g.syncd.lock.Lock()
	defer g.syncd.lock.Unlock()

	if err := g.onlineConsistencyCheck(); err != nil {
		vlog.Fatalf("onlineConsistencyCheck:: failed with err %v", err)
	}

	if err := g.reclaimSpace(); err != nil {
		vlog.Fatalf("reclaimSpace:: failed with err %v", err)
	}
	// TODO(hpucha): flush devtable state.

	if err := g.pruneObjectBatch(); err != nil {
		vlog.Fatalf("pruneObjectBatch:: failed with err %v", err)
	}
	// TODO(hpucha): flush log and dag state.

	if compact {
		if err := g.compactDB(); err != nil {
			vlog.Fatalf("compactDB:: failed with err %v", err)
		}
	}
}

// onlineConsistencyCheck checks whether generations lower than the
// ReclaimVec (snapshotted at startup) have been deleted from the log
// and dag data structures. It is needed to prevent space leaks when
// the system crashes while pruning a batch of objects. GC state is
// not aggressively persisted in order to keep GC efficient. Instead,
// upon reboot, onlineConsistencyCheck executes incrementally,
// checking all the generations lower than the ReclaimVec snapshot to
// ensure that they are deleted. Each iteration of
// onlineConsistencyCheck checks a small batch of generations. Once
// all generations below the ReclaimVec snapshot have been verified,
// onlineConsistencyCheck is a no-op.
func (g *syncGC) onlineConsistencyCheck() error {
	vlog.VI(1).Infof("onlineConsistencyCheck:: called with %v", g.checkConsistency)
	if !g.checkConsistency {
		return nil
	}

	vlog.VI(2).Infof("onlineConsistencyCheck:: reclaimSnap is %v", g.reclaimSnap)
	genCount := 0
	for dev, gen := range g.reclaimSnap {
		if gen == 0 {
			continue
		}
		for i := gen; i > 0; i-- {
			if genCount == genBatchSize {
				g.reclaimSnap[dev] = i
				return nil
			}
			if g.syncd.log.hasGenMetadata(dev, i) {
				if err := g.garbageCollectGeneration(dev, i); err != nil {
					return err
				}
			}

			genCount++
		}
		g.reclaimSnap[dev] = 0
	}

	// Done with all the generations of all devices. The consistency
	// check is no longer needed.
	g.checkConsistency = false
	vlog.VI(1).Infof("onlineConsistencyCheck:: exited with %v", g.checkConsistency)
	return nil
}

// garbageCollectGeneration scans the existing log records of a
// particular generation and marks the objects they belong to for
// pruning.
func (g *syncGC) garbageCollectGeneration(devid DeviceID, gnum GenID) error {
	vlog.VI(2).Infof("garbageCollectGeneration:: processing generation %s:%d", devid, gnum)
	// Bootstrap generation for a device. Nothing to GC.
	if gnum == 0 {
		return nil
	}
	gen, err := g.syncd.log.getGenMetadata(devid, gnum)
	if err != nil {
		return err
	}
	if gen.Count <= 0 {
		return errBadMetadata
	}

	var count uint64
	// Check for log records for this generation.
	for l := LSN(0); l <= gen.MaxLSN; l++ {
		if !g.syncd.log.hasLogRec(devid, gnum, l) {
			continue
		}

		count++
		rec, err := g.syncd.log.getLogRec(devid, gnum, l)
		if err != nil {
			return err
		}

		// Insert the object in this log record into the prune
		// map if needed.
		// If this object does not exist, create it.
		// If the object exists, update the version to prune at
		// when the current generation's position is greater
		// than or equal to the one in the prune map (records
		// within a generation are visited in increasing LSN
		// order, so later records overwrite earlier ones).
		gcState, ok := g.pruneObjects[rec.ObjID]
		if !ok || gcState.pos <= gen.Pos {
			if !ok {
				gcState = &objGCState{}
				g.pruneObjects[rec.ObjID] = gcState
			}
			gcState.pos = gen.Pos
			gcState.version = rec.CurVers
			vlog.VI(2).Infof("Replacing for obj %v pos %d version %d",
				rec.ObjID, gcState.pos, gcState.version)
		}

		// When strictCheck is enabled, track the object's version
		// history so that we can check against the versions
		// being deleted.
		if strictCheck {
			objHist, ok := g.verifyPruneMap[rec.ObjID]
			if !ok {
				objHist = &objVersHist{
					versions: make(map[storage.Version]struct{}),
				}
				g.verifyPruneMap[rec.ObjID] = objHist
			}
			// Add this version to the versions that need to be pruned.
			objHist.versions[rec.CurVers] = struct{}{}
		}
	}

	if count != gen.Count {
		return errors.New("incorrect number of log records")
	}

	return nil
}

// reclaimSpace performs periodic space reclamation by deleting
// generations known to all devices.
//
// Approach: For each device in the system, we compute its maximum
// generation known to all the other devices in the system. We then
// delete all log and dag records at or below this generation. This
// is an O(N^2) algorithm where N is the number of devices in the
// system.
func (g *syncGC) reclaimSpace() error {
	newReclaimVec, err := g.syncd.devtab.computeReclaimVector()
	if err != nil {
		return err
	}

	vlog.VI(1).Infof("reclaimSpace:: reclaimVectors: new %v old %v",
		newReclaimVec, g.syncd.devtab.head.ReclaimVec)
	// Clean up generations from reclaimVec+1 to newReclaimVec.
	for dev, high := range newReclaimVec {
		low := g.syncd.devtab.getOldestGen(dev)

		// Garbage collect from low+1 to high.
		for i := GenID(low + 1); i <= high; i++ {
			if err := g.garbageCollectGeneration(dev, i); err != nil {
				return err
			}
		}
	}

	// Update reclaimVec.
	g.syncd.devtab.head.ReclaimVec = newReclaimVec
	return nil
}
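
// computeReclaimVectorSketch is a hypothetical, minimal sketch of the
// element-wise minimum that reclaimSpace relies on. It is not called by
// syncd; the real logic lives in syncd.devtab.computeReclaimVector() and
// may differ. The sketch assumes that every device's generation vector
// carries an entry for every device in the system: for each device, the
// smallest generation of that device seen across all vectors is the
// newest generation that everyone already knows about.
func computeReclaimVectorSketch(known map[DeviceID]GenVector) GenVector {
	reclaim := GenVector{}
	for _, vec := range known {
		for dev, gen := range vec {
			// Keep the smallest generation of "dev" seen so far.
			if cur, ok := reclaim[dev]; !ok || gen < cur {
				reclaim[dev] = gen
			}
		}
	}
	return reclaim
}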

// pruneObjectBatch processes a batch of objects to be pruned from the
// log and dag.
func (g *syncGC) pruneObjectBatch() error {
	vlog.VI(1).Infof("pruneObjectBatch:: Called at %v", time.Now().UTC())
	count := 0
	for obj, gcState := range g.pruneObjects {
		if count == objBatchSize {
			return nil
		}
		vlog.VI(1).Infof("pruneObjectBatch:: Pruning obj %v at version %v", obj, gcState.version)
		// Call dag prune on this object.
		if err := g.syncd.dag.prune(obj, gcState.version, g.dagPruneCallBack); err != nil {
			return err
		}

		if strictCheck {
			// Ensure that all but one version in the object's version history are gc'ed.
			objHist, ok := g.verifyPruneMap[obj]
			if !ok {
				return fmt.Errorf("missing object in verification map %v", obj)
			}
			if len(objHist.versions) != 1 {
				return fmt.Errorf("leftover/no versions %d", len(objHist.versions))
			}
			for v := range objHist.versions {
				if v != gcState.version {
					return fmt.Errorf("leftover version %d %v", v, obj)
				}
			}
		}

		delete(g.pruneObjects, obj)
		count++
	}
	return nil
}

// dagPruneCallBack deletes the log record associated with the dag
// node being pruned and updates the generation metadata for the
// generation that this log record belongs to.
func (g *syncGC) dagPruneCallBack(logKey string) error {
	dev, gnum, lsn, err := splitLogRecKey(logKey)
	if err != nil {
		return err
	}
	vlog.VI(2).Infof("dagPruneCallBack:: called for key %s (%s %d %d)", logKey, dev, gnum, lsn)

	// Check if the log record being deleted is correct as per GC state.
	oldestGen := g.syncd.devtab.getOldestGen(dev)
	if gnum > oldestGen {
		vlog.VI(2).Infof("gnum is %d oldest is %d", gnum, oldestGen)
		return errors.New("deleting incorrect log record")
	}

	if !g.syncd.log.hasLogRec(dev, gnum, lsn) {
		return errors.New("missing log record")
	}

	if strictCheck {
		rec, err := g.syncd.log.getLogRec(dev, gnum, lsn)
		if err != nil {
			return err
		}
		objHist, ok := g.verifyPruneMap[rec.ObjID]
		if !ok {
			return errors.New("obj not found in verifyMap")
		}
		_, found := objHist.versions[rec.CurVers]
		// If the online consistency check is in progress, we
		// cannot strictly verify all the versions to be
		// deleted, and we ignore the failure to find a
		// version.
		if found {
			delete(objHist.versions, rec.CurVers)
		} else if !g.checkConsistency {
			return errors.New("verification failed")
		}
	}

	if err := g.syncd.log.delLogRec(dev, gnum, lsn); err != nil {
		return err
	}

	// Update generation metadata.
	gen, err := g.syncd.log.getGenMetadata(dev, gnum)
	if err != nil {
		return err
	}
	if gen.Count <= 0 {
		return errBadMetadata
	}
	gen.Count--
	if gen.Count == 0 {
		if err := g.syncd.log.delGenMetadata(dev, gnum); err != nil {
			return err
		}
	} else {
		if err := g.syncd.log.putGenMetadata(dev, gnum, gen); err != nil {
			return err
		}
	}
	return nil
}

// compactDB compacts the underlying kvdb store for all data structures.
func (g *syncGC) compactDB() error {
	vlog.VI(1).Infof("compactDB:: Compacting DBs")
	if err := g.syncd.log.compact(); err != nil {
		return err
	}
	if err := g.syncd.devtab.compact(); err != nil {
		return err
	}
	if err := g.syncd.dag.compact(); err != nil {
		return err
	}
	return nil
}

// deleteDevice takes care of permanently deleting a device from its sync peers.
// TODO(hpucha): to be implemented.
func (g *syncGC) deleteDevice() {
}