blob: 4a2fe7120832eae2cabd64249cdd18ef20e834ca [file] [log] [blame]
package vsync
// Package vsync provides veyron sync DevTable utility functions.
// DevTable is indexed by the device id and stores device level
// information needed by sync. Main component of a device's info is
// its generation vector. Generation vector is the version vector for
// a device's store, representing all the different generations (from
// different devices) seen by a given device. A generation represents
// a collection of updates that originated on a device during an
// interval of time. It serves as a checkpoint when communicating with
// other devices. Generations do not overlap and all updates belong to
// a generation.
//
// Synchronization between two devices A and B uses generation vectors
// as follows:
//                     A                                         B
//                                                  <== B's generation vector
// diff(A's generation vector, B's generation vector)
// log records of missing generations ==>
// cache B's generation vector (for space reclamation)
//
// Implementation notes: DevTable is stored in a persistent K/V
// database in the current implementation. Generation vector is
// implemented as a map of (Device ID -> Generation ID), one entry for
// every known device. If the generation vector contains an entry
// (Device ID -> Generation ID), it implies that the device has
// learned of all the generations until and including Generation
// ID. Generation IDs start from 1. A generation ID of 0 is a
// reserved bootstrap value, and indicates the device has no updates.
import (
"errors"
"sort"
"time"
"veyron2/vlog"
)
// errInvalidDTab is returned by devTable methods that find the table's
// underlying K/V db handle unset (dt.db == nil).
var errInvalidDTab = errors.New("invalid devtable db")
// devInfo is the information stored per device. Records live in the
// "devices" table, keyed by the device id converted to a string.
type devInfo struct {
Vector GenVector // device generation vector; getDevInfo rejects a record whose vector is nil.
Ts time.Time // last communication time stamp; putGenVec sets it (UTC) when it first creates the record.
}
// devTableHeader contains the header metadata. It is stored in the
// "header" table under the key "Head" and cached in devTable.head.
type devTableHeader struct {
Resmark []byte // resume marker for watch.
// Generation vector for space reclamation. All generations
// less than this generation vector are deleted from storage.
// getOldestGen reads its per-device entries.
ReclaimVec GenVector
}
// devTable contains the metadata for the device table db. close()
// resets the whole struct to its zero value, so the accessors treat a
// nil db as an invalid (closed) table.
type devTable struct {
fname string // file pathname.
db *kvdb // underlying K/V DB.
devices *kvtable // pointer to the "devices" table in the kvdb. Contains device info.
// Key:"Head" Value:devTableHeader
header *kvtable // pointer to the "header" table in the kvdb. Contains device table header.
head *devTableHeader // devTable head cached in memory.
s *syncd // pointer to the sync daemon object.
}
// genOrder represents a generation along with its position in the log.
// diffGenVectors returns a slice of these sorted by ascending order.
type genOrder struct {
devID DeviceID // device the generation belongs to.
genID GenID // generation id on that device.
order uint32 // log position, copied from the Pos field of the generation's metadata.
}
// byOrder adapts a slice of genOrder pointers to sort.Interface so the
// entries can be sorted by ascending log position.
type byOrder []*genOrder

// Len reports how many entries the slice holds.
func (b byOrder) Len() int { return len(b) }

// Swap exchanges the entries at indices i and j.
func (b byOrder) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

// Less reports whether entry i sits earlier in the log than entry j.
func (b byOrder) Less(i, j int) bool {
	first, second := b[i], b[j]
	return first.order < second.order
}
// openDevTable opens the devTable stored in the given file, creating the
// file if it does not exist, and binds it to the sync daemon "sin".
//
// If the local device has no entry yet, its generation vector is seeded
// with generation 0. The in-memory header starts with a reclaim vector
// of {local id: 0} and is replaced by the stored header when the db
// already has one. If seeding or reading the header fails, the db is
// closed again and the error is returned.
func openDevTable(filename string, sin *syncd) (*devTable, error) {
	// Open (or create) the file together with the kvdb and its tables.
	// The tables are picked out in the order their names are passed.
	db, tbls, err := kvdbOpen(filename, []string{"devices", "header"})
	if err != nil {
		return nil, err
	}
	dtab := &devTable{
		fname:   filename,
		db:      db,
		devices: tbls[0],
		header:  tbls[1],
		s:       sin,
	}

	// Bootstrap the local generation vector if it is not stored yet.
	localID := dtab.s.id
	if !dtab.hasDevInfo(localID) {
		if err := dtab.putGenVec(localID, GenVector{localID: 0}); err != nil {
			dtab.db.close() // this also closes the tables.
			return nil, err
		}
	}

	// Start from a default header, then prefer the persisted one.
	dtab.head = &devTableHeader{ReclaimVec: GenVector{localID: 0}}
	if dtab.hasHead() {
		if err := dtab.getHead(); err != nil {
			dtab.db.close() // this also closes the tables.
			return nil, err
		}
	}
	return dtab, nil
}
// close flushes the devTable, closes its db and resets the struct to
// its zero value. It returns errInvalidDTab if the table has no db, or
// the flush error, in which case the table is left open and untouched.
func (dt *devTable) close() error {
	if dt.db == nil {
		return errInvalidDTab
	}

	// Dirty data must reach storage before the db goes away.
	err := dt.flush()
	if err == nil {
		dt.db.close()    // this also closes the tables.
		*dt = devTable{} // zero out the devTable struct.
	}
	return err
}
// flush writes the cached head into the db and then flushes the db to
// storage. It returns errInvalidDTab if the table has no db, or the
// error from storing the head, in which case the db is not flushed.
func (dt *devTable) flush() error {
	if dt.db == nil {
		return errInvalidDTab
	}

	// The in-memory head has to be stored before the db is flushed.
	err := dt.putHead()
	if err == nil {
		dt.db.flush()
	}
	return err
}
// compact compacts the file associated with the kvdb and rebinds the
// devTable to the db and table handles returned by the compaction.
// It returns errInvalidDTab if the table has no db; when compaction
// fails the existing handles are kept and the error is returned.
func (dt *devTable) compact() error {
	if dt.db == nil {
		return errInvalidDTab
	}

	tableNames := []string{"devices", "header"}
	newDB, newTbls, err := dt.db.compact(dt.fname, tableNames)
	if err != nil {
		return err
	}

	// Handles come back in the order the table names were passed.
	dt.db = newDB
	dt.devices = newTbls[0]
	dt.header = newTbls[1]
	return nil
}
// putHead stores the in-memory devTable head in the "header" table
// under the key "Head".
//
// It returns errInvalidDTab when the devTable has no db (for instance
// after close has zeroed the struct), which is the same guard the other
// accessors apply; without it the call would be made on a nil table.
// Otherwise it returns whatever the table's set call reports.
func (dt *devTable) putHead() error {
	if dt.db == nil {
		return errInvalidDTab
	}
	return dt.header.set("Head", dt.head)
}
// getHead reads the entry stored under "Head" in the "header" table
// into the cached devTable head. The cached head must already be
// allocated; a nil head yields an error and the table is not consulted.
func (dt *devTable) getHead() error {
	if dt.head != nil {
		return dt.header.get("Head", dt.head)
	}
	return errors.New("nil devTable header")
}
// hasHead returns true if the devTable db has a devTable head, i.e. the
// "header" table holds the key "Head".
//
// A devTable without a db (for instance after close has zeroed the
// struct) reports false, mirroring hasDevInfo, instead of querying a
// nil table.
func (dt *devTable) hasHead() bool {
	if dt.db == nil {
		return false
	}
	return dt.header.hasKey("Head")
}
// putDevInfo stores the devInfo record for device "devid" in the
// "devices" table, keyed by the device id converted to a string.
// It returns errInvalidDTab if the table has no db.
func (dt *devTable) putDevInfo(devid DeviceID, info *devInfo) error {
	if dt.db != nil {
		return dt.devices.set(string(devid), info)
	}
	return errInvalidDTab
}
// getDevInfo reads the devInfo record of device "devid" from the
// "devices" table. It returns errInvalidDTab if the table has no db,
// the lookup error if the read fails, and an error if the stored record
// carries a nil generation vector.
func (dt *devTable) getDevInfo(devid DeviceID) (*devInfo, error) {
	if dt.db == nil {
		return nil, errInvalidDTab
	}

	info := new(devInfo)
	err := dt.devices.get(string(devid), info)
	switch {
	case err != nil:
		return nil, err
	case info.Vector == nil:
		// A record without a vector is treated as unusable.
		return nil, errors.New("nil genvector")
	}
	return info, nil
}
// hasDevInfo reports whether the "devices" table holds a record for
// the device "devid". A devTable without a db always reports false.
func (dt *devTable) hasDevInfo(devid DeviceID) bool {
	return dt.db != nil && dt.devices.hasKey(string(devid))
}
// putGenVec stores "v" as the generation vector of device "devid".
// For a device that already has a record only the vector is replaced
// and the stored time stamp is kept; for a new device a record is
// created and stamped with the current UTC time. It returns
// errInvalidDTab if the table has no db, or the error from reading or
// writing the record.
func (dt *devTable) putGenVec(devid DeviceID, v GenVector) error {
	if dt.db == nil {
		return errInvalidDTab
	}

	if !dt.hasDevInfo(devid) {
		// First record for this device.
		return dt.putDevInfo(devid, &devInfo{
			Vector: v,
			Ts:     time.Now().UTC(),
		})
	}

	// Existing record: swap in the new vector only.
	info, err := dt.getDevInfo(devid)
	if err != nil {
		return err
	}
	info.Vector = v
	return dt.putDevInfo(devid, info)
}
// getGenVec returns the generation vector stored for device "devid".
// It returns errInvalidDTab if the table has no db, or the error from
// reading the device's record.
func (dt *devTable) getGenVec(devid DeviceID) (GenVector, error) {
	if dt.db == nil {
		return nil, errInvalidDTab
	}

	info, err := dt.getDevInfo(devid)
	if err == nil {
		return info.Vector, nil
	}
	return nil, err
}
// populateGenOrderEntry fills "e" for generation "gnum" of device "id".
// The device and generation ids are recorded first; the log position is
// then taken from the Pos field of the generation metadata held by the
// sync daemon's log. If that lookup fails its error is returned and the
// order field of "e" is left as it was.
func (dt *devTable) populateGenOrderEntry(e *genOrder, id DeviceID, gnum GenID) error {
	e.devID, e.genID = id, gnum

	meta, err := dt.s.log.getGenMetadata(id, gnum)
	if err == nil {
		e.order = meta.Pos
	}
	return err
}
// updateGeneration sets the generation of device "upID" to "upGen" in
// the generation vector stored for device "key" and writes the record
// back. It returns errInvalidDTab if the table has no db, or the error
// from reading or writing the record.
func (dt *devTable) updateGeneration(key, upID DeviceID, upGen GenID) error {
	if dt.db == nil {
		return errInvalidDTab
	}

	info, err := dt.getDevInfo(key)
	if err == nil {
		info.Vector[upID] = upGen
		err = dt.putDevInfo(key, info)
	}
	return err
}
// updateLocalGenVector merges "remote" into "local" in place: every
// device that local does not know, or knows at a lower generation than
// remote, takes remote's generation. Nothing is written to the db.
// It returns errInvalidDTab if the table has no db and an error if
// either vector is nil.
func (dt *devTable) updateLocalGenVector(local, remote GenVector) error {
	switch {
	case dt.db == nil:
		return errInvalidDTab
	case local == nil || remote == nil:
		return errors.New("invalid input args to function")
	}

	for dev, theirs := range remote {
		if ours, known := local[dev]; known && ours >= theirs {
			// Local is already at least as far along for this device.
			continue
		}
		local[dev] = theirs
	}
	return nil
}
// diffGenVectors diffs the generation vectors belonging to src and dest
// and returns the generations known to src and not known to dest.
// Because sync has to maintain the order in which device generations
// were created/received, the result holds one genOrder per missing
// generation, sorted by its position in the src's log.
//
// Generations at or below the reclaim vector entry of a device have
// been garbage collected and are never returned.
//
// Example: Generation vector for device A (src) AVec = {A:10, B:5, C:1}
// Generation vector for device B (dest) BVec = {A:5, B:10, D:2}
// Missing generations in unsorted order: {A:6, A:7, A:8, A:9, A:10, C:1}
//
// It returns errInvalidDTab if the table has no db, or the first error
// hit while looking up a generation's log position.
//
// TODO(hpucha): Revisit for the case of a lot of generations to
// send back (say during bootstrap).
func (dt *devTable) diffGenVectors(srcVec, destVec GenVector) ([]*genOrder, error) {
	if dt.db == nil {
		return nil, errInvalidDTab
	}

	var missing []*genOrder

	// Devices present in both vectors: dest lacks (destGen, srcGen].
	for dev, destGen := range destVec {
		srcGen, known := srcVec[dev]
		if !known {
			// src has never heard of this device; nothing to send.
			continue
		}

		// Begin right after dest's generation, but never at or below
		// what has already been garbage collected.
		first := destGen + 1
		if floor := dt.getOldestGen(dev) + 1; first < floor {
			vlog.VI(1).Infof("diffGenVectors:: Adjusting starting generations from %d to %d",
				first, floor)
			first = floor
		}

		for g := first; g <= srcGen; g++ {
			entry := new(genOrder)
			if err := dt.populateGenOrderEntry(entry, dev, g); err != nil {
				return nil, err
			}
			missing = append(missing, entry)
		}
	}

	// Devices only src knows: send everything that is still available,
	// starting from the oldest generation not yet garbage collected.
	for dev, srcGen := range srcVec {
		if _, known := destVec[dev]; known {
			continue
		}
		for g := dt.getOldestGen(dev) + 1; g <= srcGen; g++ {
			entry := new(genOrder)
			if err := dt.populateGenOrderEntry(entry, dev, g); err != nil {
				return nil, err
			}
			missing = append(missing, entry)
		}
	}

	// Hand the generations back in log order.
	sort.Sort(byOrder(missing))
	return missing, nil
}
// getOldestGen returns the most recent gc'ed generation for the device
// "dev", read from the reclaim vector of the cached header. A device
// without an entry in that vector yields the zero generation.
func (dt *devTable) getOldestGen(dev DeviceID) GenID {
	reclaimed := dt.head.ReclaimVec
	return reclaimed[dev]
}
// computeReclaimVector computes a generation vector such that the
// generations less than or equal to those in the vector can be
// garbage collected. Caller holds a lock on s.lock.
//
// Approach: For each device in the system, we compute its maximum
// generation known to all the other devices in the system, i.e. the
// minimum over every device's view of it. A device that does not know
// about another device contributes that device's already reclaimed
// generation, so nothing further can be collected for it. This is a
// O(N^2) algorithm where N is the number of devices in the system. N
// is assumed to be small, of the order of hundreds of devices.
//
// If any device in the local generation vector has no record in the
// devTable yet, the current reclaim state is returned unchanged for
// every device. An error from reading a generation vector is returned
// as is.
func (dt *devTable) computeReclaimVector() (GenVector, error) {
	// Get local generation vector to create the set of devices in
	// the system. Local generation vector is a good bootstrap
	// device set since it contains all the devices whose log
	// records were ever stored locally.
	devSet, err := dt.getGenVec(dt.s.id)
	if err != nil {
		return nil, err
	}

	newReclaimVec := GenVector{}
	for devid := range devSet {
		if !dt.hasDevInfo(devid) {
			// This node knows of devid, but hasn't yet
			// contacted the device. Do not garbage
			// collect any further. For instance, when
			// node A learns of node C's generations from
			// node B, node A may not have an entry for
			// node C yet, but node C will be part of its
			// devSet.
			for dev := range devSet {
				newReclaimVec[dev] = dt.getOldestGen(dev)
			}
			return newReclaimVec, nil
		}

		vec, err := dt.getGenVec(devid)
		if err != nil {
			return nil, err
		}
		for dev := range devSet {
			gen, ok := vec[dev]
			if !ok {
				// Device "devid" does not know about device
				// "dev": it caps the result at what is already
				// reclaimed. The cap goes through the same
				// minimum as every other value so that it can
				// never replace a lower generation recorded
				// earlier, whatever the map iteration order.
				gen = dt.getOldestGen(dev)
			}
			if cur, seen := newReclaimVec[dev]; !seen || gen < cur {
				newReclaimVec[dev] = gen
			}
		}
	}
	return newReclaimVec, nil
}
// addDevice adds a newly learned device to the devTable state: it
// stores the generation vector {newDev: 0} for the device and then
// merges that vector into the local device's generation vector. The
// first failing step's error is returned.
func (dt *devTable) addDevice(newDev DeviceID) error {
	// Record the new device with a bootstrap generation vector.
	seed := GenVector{newDev: 0}
	if err := dt.putGenVec(newDev, seed); err != nil {
		return err
	}

	// Make the local generation vector aware of the new device.
	local, err := dt.getDevInfo(dt.s.id)
	if err != nil {
		return err
	}
	if err = dt.updateLocalGenVector(local.Vector, seed); err != nil {
		return err
	}
	return dt.putDevInfo(dt.s.id, local)
}
// updateReclaimVec updates the reclaim vector to track gc'ed generations.
//
// For every device in minGens that the reclaim vector does not track
// yet, the generation before minGens[dev] is recorded as reclaimed (a
// generation below 1 is logged as bad and recorded as 0). Entries of
// devices that are already tracked are left as they are.
//
// If minGens names a generation that is already reclaimed for some
// tracked device, an error is returned and the reclaim vector is not
// modified at all.
func (dt *devTable) updateReclaimVec(minGens GenVector) error {
	// Validate the whole request before touching the vector. Map
	// iteration order is unspecified, so mutating while validating
	// would leave an arbitrary subset of devices updated on failure.
	for dev, minGen := range minGens {
		// We obtained a generation that is already reclaimed.
		if gen, ok := dt.head.ReclaimVec[dev]; ok && minGen <= gen {
			return errors.New("requested gen smaller than GC'ed gen")
		}
	}

	for dev, minGen := range minGens {
		if _, ok := dt.head.ReclaimVec[dev]; ok {
			continue
		}
		// Writing to a nil map panics; allocate the vector on first use.
		if dt.head.ReclaimVec == nil {
			dt.head.ReclaimVec = GenVector{}
		}
		if minGen < 1 {
			vlog.Errorf("updateReclaimVec:: Received bad generation %s %d",
				dev, minGen)
			dt.head.ReclaimVec[dev] = 0
			continue
		}
		dt.head.ReclaimVec[dev] = minGen - 1
	}
	return nil
}