blob: 9e35c24f25490bafe8d2e6f2a2d4fd2a2653b8bd [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Package vsync provides veyron sync ILog utility functions. ILog
// (Indexed Log) provides log functionality with indexing support.
// ILog stores log records that are locally generated or obtained over
// the network. Indexing is needed since sync needs to selectively
// retrieve log records that belong to a particular device, syncroot
// and generation during synchronization.
//
// When a device receives a request to send log records, it first
// computes the missing generations between itself and the incoming
// request on a per-syncroot basis. It then sends all the log records
// belonging to each missing generation. A device that receives log
// records over the network replays all the records received from
// another device in a single batch. Each replayed log record adds a
// new version to the dag of the object contained in the log
// record. At the end of replaying all the log records, conflict
// detection and resolution is carried out for all the objects learned
// during this iteration. Conflict detection and resolution is carried
// out after a batch of log records are replayed, instead of
// incrementally after each record is replayed, to avoid repeating
// conflict resolution already performed by other devices.
//
// New log records are created when objects in the local store are
// created/updated. Local log records are also replayed to keep the
// per-object dags consistent with the local store state.
//
// Note that syncgroups are mainly tracked between syncd/store and the
// client. Sync-related metadata (log records, generations and
// generation vectors) is unaware of syncgroups, and uses syncroots
// instead. Each syncgroup contains a root object and a unique root
// object ID. In case of peer syncgroups, all the peer syncgroups
// contain the same root object ID. Sync translates any given
// syncgroup to a syncroot rooted at this root object ID and syncs all
// the syncroots the device is part of. Thus, although a device may
// be part of more than 1 peer syncgroup, at the sync level, a single
// syncroot is synced on behalf of all the peer syncgroups.
//
// Implementation notes: ILog records are stored in a persistent K/V
// database in the current implementation. ILog db consists of 3
// tables:
// ** records: table consists of all the log records indexed
// by deviceid:srid:genid:lsn referring to
// the device that creates the log record
// the root oid of the syncroot to which the log record belongs
// the generation on the syncroot the log record is part of
// the sequence number in that generation
// Note that lsn in each generation starts from 0 and genid starts from 1.
// ** gens: table consists of the generation metadata for each
// generation, and is indexed by deviceid:srid:genid.
// ** head: table consists of the log header.
import (
"errors"
"fmt"
"strconv"
"strings"
"v.io/x/lib/vlog"
)
var (
errNoUpdates = errors.New("no new local updates")
errInvalidLog = errors.New("invalid log db")
)
// curLocalGen contains metadata re. the current local generation for a SyncRoot.
type curLocalGen struct {
CurGenNum GenId // generation id for a SyncRoot's current generation.
CurSeqNum SeqNum // log sequence number for a SyncRoot's current generation.
}
// iLogHeader contains the log header metadata.
type iLogHeader struct {
CurSRGens map[ObjId]*curLocalGen // local generation info per SyncRoot.
Curorder uint32 // position in log for the next generation.
}
// genMetadata contains the metadata for a generation.
type genMetadata struct {
// All generations stored in the log are ordered wrt each
// other and this order needs to be preserved.
// Position of this generation in the log.
Pos uint32
// Number of log records in this generation still stored in the log.
// This count is used during garbage collection.
Count uint64
// Maximum SeqNum that was part of this generation.
// This is useful during garbage collection to track any unclaimed log records.
MaxSeqNum SeqNum
}
// iLog contains the metadata for the ILog db.
type iLog struct {
fname string // file pathname.
db *kvdb // underlying k/v db.
// Key:deviceid:srid:genid:lsn Value:LogRecord. Pointer to
// the "records" table in the kvdb. Contains log records.
records *kvtable
// Key:deviceid:srid:genid Value:genMetadata. Pointer to the
// "gens" table in the kvdb. Contains generation metadata for
// each generation.
gens *kvtable
// Key:"Head" Value:iLogHeader. Pointer to the "header" table
// in the kvdb. Contains logheader.
header *kvtable
head *iLogHeader // log head cached in memory.
s *syncd // pointer to the sync daemon object.
}
// openILog opens or creates a ILog for the given filename.
func openILog(filename string, sin *syncd) (*iLog, error) {
ilog := &iLog{
fname: filename,
head: &iLogHeader{},
s: sin,
}
// Open the file and create it if it does not exist.
// Also initialize the kvdb and its three collections.
db, tbls, err := kvdbOpen(filename, []string{"records", "gens", "header"})
if err != nil {
return nil, err
}
ilog.db = db
ilog.records = tbls[0]
ilog.gens = tbls[1]
ilog.header = tbls[2]
// If header already exists in db, read it back from db.
if ilog.hasHead() {
if err := ilog.getHead(); err != nil {
ilog.db.close() // this also closes the tables.
return nil, err
}
} else {
ilog.head.CurSRGens = make(map[ObjId]*curLocalGen)
}
return ilog, nil
}
// close closes the ILog and invalidate its struct.
func (l *iLog) close() error {
if l.db == nil {
return errInvalidLog
}
// Flush the dirty data.
if err := l.flush(); err != nil {
return err
}
l.db.close() // this also closes the tables.
*l = iLog{} // zero out the ILog struct.
return nil
}
// flush flushes the ILog db to disk.
func (l *iLog) flush() error {
if l.db == nil {
return errInvalidLog
}
// Set the head from memory before flushing.
if err := l.putHead(); err != nil {
return err
}
l.db.flush()
return nil
}
// initSyncRoot initializes the log header for this SyncRoot.
func (l *iLog) initSyncRoot(srid ObjId) error {
if l.db == nil {
return errInvalidLog
}
if _, ok := l.head.CurSRGens[srid]; ok {
return fmt.Errorf("syncroot already exists %v", srid)
}
// First generation to be created is generation 1. A
// generation of 0 represents no updates on the device.
l.head.CurSRGens[srid] = &curLocalGen{CurGenNum: 1}
return nil
}
// delSyncRoot deletes the log header for this SyncRoot.
func (l *iLog) delSyncRoot(srid ObjId) error {
if l.db == nil {
return errInvalidLog
}
if _, ok := l.head.CurSRGens[srid]; !ok {
return fmt.Errorf("syncroot doesn't exist %v", srid)
}
delete(l.head.CurSRGens, srid)
return nil
}
// putHead puts the log head into the ILog db.
func (l *iLog) putHead() error {
return l.header.set("Head", l.head)
}
// getHead gets the log head from the ILog db.
func (l *iLog) getHead() error {
if l.head == nil {
return errors.New("nil log header")
}
if err := l.header.get("Head", l.head); err != nil {
return err
}
// When we put an empty map in kvdb, we get back a "nil" map
// (due to VOM encoding/decoding). To keep putHead/getHead
// symmetrical, we add this additional initialization.
if l.head.CurSRGens == nil {
l.head.CurSRGens = make(map[ObjId]*curLocalGen)
}
return nil
}
// hasHead returns true if the ILog db has a log head.
func (l *iLog) hasHead() bool {
return l.header.hasKey("Head")
}
// logRecKey creates a key for a log record.
func logRecKey(devid DeviceId, srid ObjId, gnum GenId, lsn SeqNum) string {
return fmt.Sprintf("%s:%s:%d:%d", devid, srid.String(), uint64(gnum), uint64(lsn))
}
// strToGenId converts a string to GenId.
func strToGenId(genIDStr string) (GenId, error) {
id, err := strconv.ParseUint(genIDStr, 10, 64)
if err != nil {
return 0, err
}
return GenId(id), nil
}
// strToObjId converts a string to ObjId.
func strToObjId(objIDStr string) (ObjId, error) {
return ObjId(objIDStr), nil
}
// splitLogRecKey splits a : separated logrec key into its components.
func splitLogRecKey(key string) (DeviceId, ObjId, GenId, SeqNum, error) {
args := strings.Split(key, ":")
if len(args) != 4 {
return "", NoObjId, 0, 0, fmt.Errorf("bad logrec key %s", key)
}
srid, err := strToObjId(args[1])
if err != nil {
return "", NoObjId, 0, 0, err
}
gnum, err := strToGenId(args[2])
if err != nil {
return "", NoObjId, 0, 0, err
}
lsn, err := strconv.ParseUint(args[3], 10, 64)
if err != nil {
return "", NoObjId, 0, 0, err
}
return DeviceId(args[0]), srid, gnum, SeqNum(lsn), nil
}
// putLogRec puts the log record into the ILog db.
func (l *iLog) putLogRec(rec *LogRec) (string, error) {
if l.db == nil {
return "", errInvalidLog
}
key := logRecKey(rec.DevId, rec.SyncRootId, rec.GenNum, rec.SeqNum)
return key, l.records.set(key, rec)
}
// getLogRec gets the log record from the ILog db.
func (l *iLog) getLogRec(devid DeviceId, srid ObjId, gnum GenId, lsn SeqNum) (*LogRec, error) {
if l.db == nil {
return nil, errInvalidLog
}
key := logRecKey(devid, srid, gnum, lsn)
var rec LogRec
if err := l.records.get(key, &rec); err != nil {
return nil, err
}
return &rec, nil
}
// hasLogRec returns true if the ILog db has a log record matching (devid, srid, gnum, lsn).
func (l *iLog) hasLogRec(devid DeviceId, srid ObjId, gnum GenId, lsn SeqNum) bool {
if l.db == nil {
return false
}
key := logRecKey(devid, srid, gnum, lsn)
return l.records.hasKey(key)
}
// delLogRec deletes the log record matching (devid, srid, gnum, lsn) from the ILog db.
func (l *iLog) delLogRec(devid DeviceId, srid ObjId, gnum GenId, lsn SeqNum) error {
if l.db == nil {
return errInvalidLog
}
key := logRecKey(devid, srid, gnum, lsn)
return l.records.del(key)
}
// generationKey creates a key for a generation.
func generationKey(devid DeviceId, srid ObjId, gnum GenId) string {
return fmt.Sprintf("%s:%s:%d", devid, srid.String(), gnum)
}
// splitGenerationKey splits a : separated generation key into its components.
func splitGenerationKey(key string) (DeviceId, ObjId, GenId, error) {
args := strings.Split(key, ":")
if len(args) != 3 {
return "", NoObjId, 0, fmt.Errorf("bad generation key %s", key)
}
srid, err := strToObjId(args[1])
if err != nil {
return "", NoObjId, 0, err
}
gnum, err := strToGenId(args[2])
if err != nil {
return "", NoObjId, 0, err
}
return DeviceId(args[0]), srid, gnum, nil
}
// putGenMetadata puts the metadata of the generation (devid, srid, gnum) into the ILog db.
func (l *iLog) putGenMetadata(devid DeviceId, srid ObjId, gnum GenId, val *genMetadata) error {
key := generationKey(devid, srid, gnum)
return l.gens.set(key, val)
}
// getGenMetadata gets the metadata of the generation (devid, srid, gnum) from the ILog db.
func (l *iLog) getGenMetadata(devid DeviceId, srid ObjId, gnum GenId) (*genMetadata, error) {
if l.db == nil {
return nil, errInvalidLog
}
key := generationKey(devid, srid, gnum)
var val genMetadata
if err := l.gens.get(key, &val); err != nil {
return nil, err
}
return &val, nil
}
// hasGenMetadata returns true if the ILog db has the generation (devid, srid, gnum).
func (l *iLog) hasGenMetadata(devid DeviceId, srid ObjId, gnum GenId) bool {
key := generationKey(devid, srid, gnum)
return l.gens.hasKey(key)
}
// delGenMetadata deletes the generation (devid, srid, gnum) metadata from the ILog db.
func (l *iLog) delGenMetadata(devid DeviceId, srid ObjId, gnum GenId) error {
if l.db == nil {
return errInvalidLog
}
key := generationKey(devid, srid, gnum)
return l.gens.del(key)
}
// getLocalGenInfo gets the local generation info for the given syncroot.
func (l *iLog) getLocalGenInfo(srid ObjId) (*curLocalGen, error) {
gen, ok := l.head.CurSRGens[srid]
if !ok {
return nil, fmt.Errorf("no gen info found for srid %s", srid.String())
}
return gen, nil
}
// createLocalLogRec creates a new local log record of type NodeRec.
func (l *iLog) createLocalLogRec(obj ObjId, vers Version,
par []Version, val *LogValue, srid ObjId) (*LogRec, error) {
gen, err := l.getLocalGenInfo(srid)
if err != nil {
return nil, err
}
rec := &LogRec{
DevId: l.s.id,
SyncRootId: srid,
GenNum: gen.CurGenNum,
SeqNum: gen.CurSeqNum,
RecType: NodeRec,
ObjId: obj,
CurVers: vers,
Parents: par,
Value: *val,
}
// Increment the SeqNum for the local log.
gen.CurSeqNum++
return rec, nil
}
// createLocalLinkLogRec creates a new local log record of type LinkRec.
func (l *iLog) createLocalLinkLogRec(obj ObjId, vers, par Version, srid ObjId) (*LogRec, error) {
gen, err := l.getLocalGenInfo(srid)
if err != nil {
return nil, err
}
rec := &LogRec{
DevId: l.s.id,
SyncRootId: srid,
GenNum: gen.CurGenNum,
SeqNum: gen.CurSeqNum,
RecType: LinkRec,
ObjId: obj,
CurVers: vers,
Parents: []Version{par},
Value: LogValue{},
}
// Increment the SeqNum for the local log.
gen.CurSeqNum++
return rec, nil
}
// createRemoteGeneration adds a new remote generation.
func (l *iLog) createRemoteGeneration(dev DeviceId, srid ObjId, gnum GenId, gen *genMetadata) error {
if l.db == nil {
return errInvalidLog
}
if gen.Count != uint64(gen.MaxSeqNum+1) {
return errors.New("mismatch in count and lsn")
}
vlog.VI(2).Infof("createRemoteGeneration:: dev %s srid %s gen %d %v", dev, srid.String(), gnum, gen)
gen.Pos = l.head.Curorder
l.head.Curorder++
return l.putGenMetadata(dev, srid, gnum, gen)
}
// createLocalGeneration creates a new local generation.
func (l *iLog) createLocalGeneration(srid ObjId) (GenId, error) {
if l.db == nil {
return 0, errInvalidLog
}
g, err := l.getLocalGenInfo(srid)
if err != nil {
return 0, err
}
gnum := g.CurGenNum
// If there are no updates, there will be no new generation.
if g.CurSeqNum == 0 {
return gnum - 1, errNoUpdates
}
// Add the current generation to the db.
val := &genMetadata{
Pos: l.head.Curorder,
Count: uint64(g.CurSeqNum),
MaxSeqNum: g.CurSeqNum - 1,
}
err = l.putGenMetadata(l.s.id, srid, gnum, val)
vlog.VI(2).Infof("createLocalGeneration:: created srid %s gen %d %v", srid.String(), gnum, val)
// Move to the next generation irrespective of err.
l.head.Curorder++
g.CurGenNum++
g.CurSeqNum = 0
return gnum, err
}
// processWatchRecord processes new object versions obtained from the local store.
func (l *iLog) processWatchRecord(objID ObjId, vers, parent Version, val *LogValue, srid ObjId) error {
if l.db == nil {
return errInvalidLog
}
vlog.VI(2).Infof("processWatchRecord:: adding for object %v %v (srid %v)", objID, vers, srid)
if !srid.IsValid() {
return errors.New("invalid syncroot id")
}
// Filter out the echo from the watcher. When syncd puts mutations into store,
// it hears back these mutations once again on the watch stream. We need to
// filter out these echoes and only process brand new watch changes.
if vers != NoVersion {
// Check if the object's vers already exists in the DAG.
if l.s.dag.hasNode(objID, vers) {
return nil
}
// When we successfully join a SyncGroup, Store
// creates an empty directory with object ID as the
// rootObjId of the SyncGroup joined. We do not care
// about this version of the directory. We will start
// accepting local mutations on this object only after
// we hear about it remotely first.
//
// NOTE: Since this new directory is protected from
// any local updates via an Permissions until after the first
// sync, waiting to hear remotely first works without
// dropping any updates. However, we cache in the
// "priv" table its version in the Store so that we
// can correctly specify prior version when we put
// mutations into the store.
if _, err := l.s.dag.getHead(objID); err != nil && objID == srid {
priv := &privNode{ /*Mutation: &val.Mutation, */ SyncTime: val.SyncTime, TxId: val.TxId, TxCount: val.TxCount}
return l.s.dag.setPrivNode(objID, priv)
}
} else {
// Check if the parent version has a deleted
// descendant already in the DAG.
if l.s.dag.hasDeletedDescendant(objID, parent) {
return nil
}
}
var pars []Version
if parent != NoVersion {
pars = []Version{parent}
}
// If the current version is a deletion, generate a new version number.
if val.Delete {
if vers != NoVersion {
return fmt.Errorf("deleted vers is %v", vers)
}
vers = NewVersion()
//val.Mutation.Version = vers
}
// Create a log record from Watch's Change Record.
rec, err := l.createLocalLogRec(objID, vers, pars, val, srid)
if err != nil {
return err
}
// Insert the new log record into the log.
logKey, err := l.putLogRec(rec)
if err != nil {
return err
}
// Insert the new log record into dag.
if err = l.s.dag.addNode(rec.ObjId, rec.CurVers, false, val.Delete, rec.Parents, logKey, val.TxId); err != nil {
return err
}
// Move the head.
return l.s.dag.moveHead(rec.ObjId, rec.CurVers)
}
// dump writes to the log file information on ILog internals.
func (l *iLog) dump() {
if l.db == nil {
return
}
vlog.VI(1).Infof("DUMP: ILog: #SR %d, cur %d", len(l.head.CurSRGens), l.head.Curorder)
for sr, gen := range l.head.CurSRGens {
vlog.VI(1).Infof("DUMP: ILog: SR %v: gen %v, lsn %v", sr, gen.CurGenNum, gen.CurSeqNum)
}
// Find for each (devid, srid) pair its lowest and highest generation numbers.
type genInfo struct{ min, max GenId }
devs := make(map[DeviceId]map[ObjId]*genInfo)
l.gens.keyIter(func(genKey string) {
devid, srid, gnum, err := splitGenerationKey(genKey)
if err != nil {
return
}
srids, ok := devs[devid]
if !ok {
srids = make(map[ObjId]*genInfo)
devs[devid] = srids
}
info, ok := srids[srid]
if ok {
if gnum > info.max {
info.max = gnum
}
if gnum < info.min {
info.min = gnum
}
} else {
srids[srid] = &genInfo{min: gnum, max: gnum}
}
})
// For each (devid, srid) pair dump its generation info in order from
// min to max generation numbers, inclusive.
for devid, srids := range devs {
for srid, info := range srids {
for gnum := info.min; gnum <= info.max; gnum++ {
meta, err := l.getGenMetadata(devid, srid, gnum)
if err == nil {
vlog.VI(1).Infof(
"DUMP: ILog: gen (%v, %v, %v): pos %v, count %v, maxlsn %v",
devid, srid, gnum, meta.Pos, meta.Count, meta.MaxSeqNum)
} else {
vlog.VI(1).Infof("DUMP: ILog: gen (%v, %v, %v): missing generation",
devid, srid, gnum)
}
}
}
}
}