blob: f57666f7ff32f6639377418e17f429f7426782eb [file] [log] [blame] [edit]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package raft
// The persistent log consists of a directory containing the files:
// <term>.<index>.snap - snapshots of the client database to which all log entries
// up to <term>.<index> have been applied.
// <term>.<index>.log - a log commencing after <term>.<index>
//
// The tail of the log is kept in memory. Once the tail is long enough (> snapshotThreshold) a
// snapshot is taken.
import (
"encoding/gob"
"errors"
"fmt"
"io"
"math/rand"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"v.io/x/lib/vlog"
"v.io/v23/context"
"v.io/v23/verror"
)
const (
// headerLen is not referenced anywhere in the visible part of this file.
headerLen = 1024
// defaultSnapshotThreshold is the threshold openPersist falls back to when its caller
// passes 0. ConsiderSnapshot compares the threshold against a difference of log
// indices (lastAppliedIndex - firstLogTailIndex).
defaultSnapshotThreshold = 10 * 1024 * 1024
)
// fsPersist is persistent state implemented in a file system. It stores the state in a single
// directory holding snapshot files, <directory>/<term>.<index>.snap, and log files,
// <directory>/<term>.<index>.log. A new log file is started by openPersist and by trimLog after
// a snapshot has been written.
//
// Most methods take the embedded Mutex before touching the fields below.
type fsPersist struct {
sync.Mutex
r *raft
currentTerm Term
votedFor string
dir string // the directory
client RaftClient
baseIndex Index // The index before the log starts (either 0 or from a snapshot).
baseTerm Term // The term before the log starts (either 0 or from a snapshot).
lastIndex Index // last index stored to
lastTerm Term // term for entry at lastIndex
// For appending the log file.
// NOTE(review): lf is read by Close and syncLog but is never assigned in the visible
// source; confirm where (or whether) it is set.
lf *os.File
encoder *gob.Encoder
// In-memory version of the log file. We depend on log truncation to keep this
// to a reasonable size.
logTail map[Index]*logEntry
// Lowest index that trimLog has not yet deleted from logTail.
firstLogTailIndex Index
// Number of applied-but-untrimmed indices at which ConsiderSnapshot starts a snapshot.
snapshotThreshold int64
// Name of last snapshot read or written.
lastSnapFile string
lastSnapTerm Term
lastSnapIndex Index
snapping bool // true if we are in the process of creating a snapshot.
}
// ControlEntry records are appended to the log to reflect changes in the current term and/or
// the voted-for leader during elections. syncLog writes them with InUse set to true, and
// readLog uses InUse to recognize a decoded record as a control entry.
type ControlEntry struct {
InUse bool
CurrentTerm Term
VotedFor string
}
// logEntrySize approximates the size of a log entry as a fixed 32 plus the
// length of the entry's command.
func logEntrySize(le *LogEntry) int64 {
	const fixedOverhead = 32
	return int64(fixedOverhead + len(le.Cmd))
}
// SetCurrentTerm implements persistent.SetCurrentTerm. It records ct as the
// current term and appends a control entry to the log via syncLog.
func (p *fsPersist) SetCurrentTerm(ct Term) error {
	p.Lock()
	defer p.Unlock()

	p.currentTerm = ct
	err := p.syncLog()
	return err
}
// IncCurrentTerm bumps the current term by one and appends a control entry
// to the log via syncLog.
func (p *fsPersist) IncCurrentTerm() error {
	p.Lock()
	defer p.Unlock()

	p.currentTerm++
	return p.syncLog()
}
// SetVotedFor implements persistent.SetVotedFor. The log is only written
// when the vote actually changes.
func (p *fsPersist) SetVotedFor(vf string) error {
	p.Lock()
	defer p.Unlock()

	if vf != p.votedFor {
		p.votedFor = vf
		return p.syncLog()
	}
	return nil
}
// SetCurrentTermAndVotedFor implements persistent.SetCurrentTermAndVotedFor.
// Both values are updated together and recorded with a single control entry.
func (p *fsPersist) SetCurrentTermAndVotedFor(ct Term, vf string) error {
	p.Lock()
	defer p.Unlock()

	p.currentTerm, p.votedFor = ct, vf
	return p.syncLog()
}
// CurrentTerm implements persistent.CurrentTerm. It returns the current term
// as read under the lock.
func (p *fsPersist) CurrentTerm() Term {
	p.Lock()
	term := p.currentTerm
	p.Unlock()
	return term
}
// LastIndex implements persistent.LastIndex. It returns the index of the last
// entry stored, as read under the lock.
func (p *fsPersist) LastIndex() Index {
	p.Lock()
	index := p.lastIndex
	p.Unlock()
	return index
}
// LastTerm implements persistent.LastTerm. It returns the term of the last
// entry stored, as read under the lock.
func (p *fsPersist) LastTerm() Term {
	p.Lock()
	term := p.lastTerm
	p.Unlock()
	return term
}
// VotedFor implements persistent.VotedFor. It returns the recorded vote as
// read under the lock.
func (p *fsPersist) VotedFor() string {
	p.Lock()
	vote := p.votedFor
	p.Unlock()
	return vote
}
// Close implements persistent.Close. It syncs and then closes the log file
// handle; the results of both calls are discarded. The lock is not taken.
func (p *fsPersist) Close() {
	f := p.lf
	f.Sync()
	f.Close()
}
// AppendToLog implements persistent.AppendToLog. The entries are appended after
// (prevTerm, prevIndex). Unless that position is the base of the log, the entry at
// prevIndex must be in the logTail with term prevTerm; otherwise errOutOfSequence
// is returned. Entries already present with the same term are skipped.
func (p *fsPersist) AppendToLog(ctx *context.T, prevTerm Term, prevIndex Index, entries []LogEntry) error {
	p.Lock()
	defer p.Unlock()

	atBase := prevIndex == p.baseIndex && prevTerm == p.baseTerm
	if !atBase {
		// We will not log if the previous entry either doesn't exist or has the wrong term.
		if prev := p.lookup(prevIndex); prev == nil || prev.Term != prevTerm {
			return verror.New(errOutOfSequence, ctx, prevTerm, prevIndex)
		}
	}

	for offset := range entries {
		entry := entries[offset]
		// If it's already in the log, do nothing.
		target := prevIndex + Index(offset) + 1
		if existing, present := p.logTail[target]; present && existing.Term == entry.Term {
			continue
		}
		// Add both to the log file and the logTail.
		// TODO(p): Think about syncing the output before returning.
		if err := p.encoder.Encode(entry); err != nil {
			return fmt.Errorf("append %d, %d, %v: %s", prevTerm, prevIndex, entry, err)
		}
		p.addToLogTail(&entry)
	}
	return nil
}
// ConsiderSnapshot implements Persistent.ConsiderSnapshot. If no snapshot is in
// progress and the span from firstLogTailIndex to lastAppliedIndex has reached the
// snapshot threshold, it starts takeSnapshot in a goroutine and blocks until that
// goroutine closes the hand-off channel.
func (p *fsPersist) ConsiderSnapshot(ctx *context.T, lastAppliedTerm Term, lastAppliedIndex Index) {
	p.Lock()
	// Nothing to do while a snapshot is running or while the log is still small.
	if p.snapping || int64(lastAppliedIndex-p.firstLogTailIndex) < p.snapshotThreshold {
		p.Unlock()
		return
	}
	p.snapping = true
	p.Unlock()

	ready := make(chan struct{})
	go p.takeSnapshot(ctx, lastAppliedTerm, lastAppliedIndex, ready)
	<-ready
}
// trimLog starts a new log file named after (prevTerm, prevIndex), copies every logTail
// entry after prevIndex into it, switches p.encoder to it, and finally drops roughly the
// older half of the in-memory entries up to prevIndex.
//
// A prevIndex of 0 is a no-op. An error is returned if prevIndex isn't in the logTail or if
// creating or writing the new log file fails; in the write-failure cases the new file is
// removed and p.encoder keeps pointing at the previous log.
func (p *fsPersist) trimLog(ctx *context.T, prevTerm Term, prevIndex Index) error {
	// Nothing to trim?
	vlog.Infof("trimLog(%d, %d)", prevTerm, prevIndex)
	if prevIndex == 0 {
		return nil
	}
	p.Lock()
	le := p.lookup(prevIndex)
	p.Unlock()
	if le == nil {
		return verror.New(errOutOfSequence, ctx, 0, prevIndex)
	}

	// Create a new log file.
	fn, encoder, err := p.newLog(prevTerm, prevIndex)
	if err != nil {
		return err
	}

	// write appends one entry to the new log file, discarding the file on failure.
	write := func(le *logEntry) error {
		if err := encoder.Encode(*le); err != nil {
			os.Remove(fn)
			return fmt.Errorf("trim %d, %v: %s", prevIndex, *le, err)
		}
		return nil
	}

	// Append all log entries after prevIndex. Unlock while writing so we don't
	// overly affect performance. We might miss some entries that are added while the
	// lock is released so redo this loop later under lock.
	next := prevIndex + 1
	for {
		p.Lock()
		le, ok := p.logTail[next]
		p.Unlock()
		if !ok {
			break
		}
		if err := write(le); err != nil {
			return err
		}
		next++
	}

	// Finish up under lock: pick up anything appended while we were writing.
	p.Lock()
	defer p.Unlock()
	for {
		le, ok := p.logTail[next]
		if !ok {
			break
		}
		if err := write(le); err != nil {
			return err
		}
		next++
	}

	// Start using the new file. If the control entry can't be written, fall back to the
	// previous encoder and discard the new file instead of silently continuing.
	previous := p.encoder
	p.encoder = encoder
	if err := p.syncLog(); err != nil {
		p.encoder = previous
		os.Remove(fn)
		return fmt.Errorf("trim %d: %s", prevIndex, err)
	}

	// Remove some logTail entries. Try to keep at least half of the entries around in case
	// we'll need them to update a lagging member.
	if prevIndex >= p.firstLogTailIndex {
		limit := p.firstLogTailIndex + (prevIndex-p.firstLogTailIndex)/2
		for p.firstLogTailIndex <= limit {
			delete(p.logTail, p.firstLogTailIndex)
			p.firstLogTailIndex++
		}
	}
	return nil
}
// takeSnapshot is a go routine that starts a snapshot and on success trims the log.
// 'safeToProceed' is closed when it is safe to continue applying commands; it is closed
// exactly once on every path. The caller (ConsiderSnapshot) blocks on it, which is what
// keeps log entries from being applied while the snapshot is being started.
//
// On any failure after the snapshot file has been created, the file is closed (if still
// open) and removed, the error is logged and returned. p.snapping is cleared on return.
func (p *fsPersist) takeSnapshot(ctx *context.T, t Term, i Index, safeToProceed chan struct{}) error {
	r := p.r
	defer func() { p.Lock(); p.snapping = false; p.Unlock() }()

	// Create a file for the snapshot.
	fn := p.snapPath(termIndexToFileName(t, i))
	vlog.Infof("snapshot %s", fn)
	fp, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		close(safeToProceed)
		vlog.Errorf("snapshot %s: %s", fn, err)
		return err
	}

	// discard removes the snapshot file and reports err.
	discard := func(err error) error {
		os.Remove(fn)
		vlog.Errorf("snapshot %s: %s", fn, err)
		return err
	}

	// Start the snapshot.
	c := make(chan error)
	if err := r.client.SaveToSnapshot(ctx, fp, c); err != nil {
		close(safeToProceed)
		fp.Close()
		return discard(err)
	}

	// At this point the client has saved whatever it needs for the snapshot (copy on write perhaps)
	// so we can release the raft server to continue applying commands.
	close(safeToProceed)

	// Wait for the snapshot to finish.
	if err := <-c; err != nil {
		fp.Close()
		return discard(err)
	}
	if err := fp.Sync(); err != nil {
		// Close the handle here too; otherwise it leaks on this path.
		fp.Close()
		return discard(err)
	}
	if err := fp.Close(); err != nil {
		return discard(err)
	}
	if err := p.trimLog(ctx, t, i); err != nil {
		return discard(err)
	}

	p.Lock()
	p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = fn, t, i
	p.Unlock()
	vlog.Infof("snapshot %s succeeded", fn)
	return nil
}
// SnapshotFromLeader implements Persistence.SnapshotFromLeader. Called with p.r locked.
//
// The snapshot streamed over call is written to the snapshot file for (term, index),
// synced, and then handed to the client's RestoreFromSnapshot. On success the applied
// position of p.r and the base/last term and index of p are all set to (term, index).
// On any failure the snapshot file is removed and the error is returned.
func (p *fsPersist) SnapshotFromLeader(ctx *context.T, term Term, index Index, call raftProtoInstallSnapshotServerCall) error {
	r := p.r

	// First securely save the snapshot. Truncate so that a file left over from an earlier
	// attempt at the same term/index is replaced rather than appended to.
	fn := p.snapPath(termIndexToFileName(term, index))
	fp, err := os.OpenFile(fn, os.O_CREATE|os.O_TRUNC|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		return err
	}
	rstream := call.RecvStream()
	for rstream.Advance() {
		if _, err := fp.Write(rstream.Value()); err != nil {
			fp.Close()
			os.Remove(fn)
			return err
		}
	}
	if err := fp.Sync(); err != nil {
		fp.Close()
		os.Remove(fn)
		return err
	}
	if err := fp.Close(); err != nil {
		os.Remove(fn)
		return err
	}
	// Advance stops on both end-of-stream and error; only now do we know which.
	if err := rstream.Err(); err != nil {
		os.Remove(fn)
		return err
	}

	// Now try restoring client state with it.
	fp, err = os.Open(fn)
	if err != nil {
		os.Remove(fn)
		return err
	}
	err = r.client.RestoreFromSnapshot(ctx, index, fp)
	fp.Close()
	if err != nil {
		// Don't keep a snapshot the client could not restore from.
		os.Remove(fn)
		return err
	}

	r.applied.term, r.applied.index = term, index
	p.Lock()
	p.baseTerm, p.baseIndex = term, index
	p.lastTerm, p.lastIndex = term, index
	p.Unlock()
	return nil
}
// lookup returns the logTail entry at index i, or nil if there is none. It does
// not lock; the callers in this file hold p's lock around it.
func (p *fsPersist) lookup(i Index) *logEntry {
	// A missing key yields the zero value of the map's element type, i.e. nil.
	return p.logTail[i]
}
// Lookup implements persistent.Lookup. It returns the in-memory log entry at
// index i, or nil if there is none.
func (p *fsPersist) Lookup(i Index) *logEntry {
	p.Lock()
	entry := p.lookup(i)
	p.Unlock()
	return entry
}
// LookupPrevious implements persistent.LookupPrevious. It returns the term and index
// of the entry just before i. The boolean is false when that entry is neither the
// base of the log nor present in the logTail, in which case the term is 0.
func (p *fsPersist) LookupPrevious(i Index) (Term, Index, bool) {
	p.Lock()
	defer p.Unlock()

	prev := i - 1
	if prev == p.baseIndex {
		return p.baseTerm, prev, true
	}
	if le := p.lookup(prev); le != nil {
		return le.Term, prev, true
	}
	return 0, prev, false
}
// syncLog appends a control entry carrying the current term and vote to the log
// and then calls Sync on the log file handle. Only an encoding failure is
// reported; the result of Sync is discarded.
//
// Assumes that p is locked.
func (p *fsPersist) syncLog() error {
	err := p.encoder.Encode(ControlEntry{
		InUse:       true,
		CurrentTerm: p.currentTerm,
		VotedFor:    p.votedFor,
	})
	if err != nil {
		return fmt.Errorf("syncLog: %s", err)
	}
	p.lf.Sync()
	return nil
}
// Entry is the type readLog decodes each log record into. It embeds both LogEntry and
// ControlEntry; readLog treats a decoded record as a control entry when InUse is set and
// as a log entry when Index is non-zero.
type Entry struct {
LogEntry
ControlEntry
}
// readLog reads a log file into memory. Control entries update the current term and
// vote; log entries are added to the logTail as long as they don't leave a gap after
// the last index read so far. It returns nil at a clean end of file and the decoding
// error otherwise.
//
// Assumes p is locked.
func (p *fsPersist) readLog(file string) error {
	vlog.Infof("reading %s", file)
	f, err := os.OpenFile(file, os.O_RDONLY, 0666)
	if err != nil {
		return err
	}
	defer f.Close()

	dec := gob.NewDecoder(f)
	for {
		// A fresh value per record: gob leaves fields absent from the stream untouched.
		var e Entry
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if e.InUse {
			p.currentTerm = e.CurrentTerm
			p.votedFor = e.VotedFor
		}
		if e.Index != 0 && (p.lastIndex == 0 || e.Index <= p.lastIndex+1) {
			// Pass the decoded LogEntry whole so that every field addToLogTail copies,
			// including Type, survives a restart.
			p.addToLogTail(&e.LogEntry)
		}
	}
}
// addToLogTail stores a copy of wle in the logTail and makes it the last entry.
// It returns false, changing nothing, when an entry with the same index and term
// is already present. When the existing entry has a different term, every entry
// above that index up to lastIndex is removed first.
func (p *fsPersist) addToLogTail(wle *LogEntry) bool {
	if old, present := p.logTail[wle.Index]; present {
		if old.Term == wle.Term {
			return false
		}
		// Remove all higher entries.
		for idx := wle.Index + 1; idx <= p.lastIndex; idx++ {
			delete(p.logTail, idx)
		}
	}
	p.logTail[wle.Index] = &logEntry{Term: wle.Term, Index: wle.Index, Cmd: wle.Cmd, Type: wle.Type}
	p.lastIndex, p.lastTerm = wle.Index, wle.Term
	return true
}
// openPersist returns an object that implements the persistent interface, backed by
// the directory r.logDir. A snapshotThreshold of 0 selects defaultSnapshotThreshold.
// If the existing state cannot be read, the directory is wiped and recreated. A new
// log file is always started before returning.
func openPersist(ctx *context.T, r *raft, snapshotThreshold int64) (*fsPersist, error) {
	if snapshotThreshold == 0 {
		snapshotThreshold = defaultSnapshotThreshold
	}
	p := &fsPersist{
		r:                 r,
		dir:               r.logDir,
		client:            r.client,
		logTail:           make(map[Index]*logEntry),
		snapshotThreshold: snapshotThreshold,
	}
	p.Lock()
	defer p.Unlock()

	// Randomize max size so all members aren't checkpointing at the same time.
	p.snapshotThreshold += rand.Int63n(1 + (p.snapshotThreshold >> 3))

	// Read the persistent state, the latest snapshot, and any log entries since then.
	// Failing that, start over from an empty directory.
	if p.readState(ctx, r) != nil {
		if err := p.createState(); err != nil {
			return nil, err
		}
	}

	// Start a new command log.
	if err := p.rotateLog(); err != nil {
		return nil, err
	}
	return p, nil
}
// snapPath returns the path of the snapshot file named s inside p.dir,
// i.e. s with a ".snap" suffix.
func (p *fsPersist) snapPath(s string) string {
	name := s + ".snap"
	return path.Join(p.dir, name)
}
// logPath returns the path of the log file named s inside p.dir,
// i.e. s with a ".log" suffix.
func (p *fsPersist) logPath(s string) string {
	name := s + ".log"
	return path.Join(p.dir, name)
}
// termIndexToFileName returns "<term>.<index>" with both numbers zero-padded to
// 20 digits, so that names sort lexically in numeric order.
func termIndexToFileName(t Term, i Index) string {
	const layout = "%20.20d.%20.20d"
	name := fmt.Sprintf(layout, t, i)
	return name
}
// parseFileName returns the term and index from a file name of the form
// <term>.<index>[.whatever]. Any leading directory in s is ignored. An error is
// returned if the name has fewer than two dot-separated parts or if either part
// is not a decimal integer.
func parseFileName(s string) (Term, Index, error) {
	parts := strings.Split(path.Base(s), ".")
	if len(parts) < 2 {
		return 0, 0, errors.New("not log name")
	}
	t, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	i, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return Term(t), Index(i), nil
}
// readState is called on start up. State is recreated from the last valid snapshot and
// any subsequent log files. Snapshots without a matching log file, snapshots that cannot
// be opened, parsed or restored, and log files older than the chosen snapshot are removed.
// Reading stops at the first log file with a decoding error; whatever was read up to that
// point is kept.
//
// An error is returned only if the directory itself cannot be opened or listed.
//
// TODO(p): This is perhaps too extreme since we should be able to continue from a snapshot
// and a prefix of the log.
//
// Assumes p is locked.
func (p *fsPersist) readState(ctx *context.T, r *raft) error {
	d, err := os.Open(p.dir)
	if err != nil {
		return err
	}
	defer d.Close()

	// Find all snapshot and log files.
	vlog.Infof("reading directory %s", p.dir)
	files, err := d.Readdirnames(0)
	if err != nil {
		return err
	}
	lmap := make(map[string]struct{})
	var logs []string
	smap := make(map[string]struct{})
	for _, f := range files {
		switch {
		case strings.HasSuffix(f, ".log"):
			f = strings.TrimSuffix(f, ".log")
			lmap[f] = struct{}{}
			logs = append(logs, f)
		case strings.HasSuffix(f, ".snap"):
			smap[strings.TrimSuffix(f, ".snap")] = struct{}{}
		}
	}

	// Throw out any snapshots that don't have an equivalent log file; they are not complete.
	for s := range smap {
		if _, ok := lmap[s]; !ok {
			os.Remove(p.snapPath(s))
			delete(smap, s)
		}
	}

	// Find the latest readable snapshot. Names are zero padded, so lexical order is
	// numeric order.
	var snaps []string
	for s := range smap {
		snaps = append(snaps, s)
	}
	sort.Strings(snaps)
	sort.Strings(logs)
	firstLog := termIndexToFileName(0, 0)
	for i := len(snaps) - 1; i >= 0; i-- {
		f := p.snapPath(snaps[i])
		// Parse the name before opening the file so that a bad name cannot leak a handle.
		term, index, err := parseFileName(snaps[i])
		if err != nil {
			os.Remove(f)
			continue
		}
		fp, err := os.Open(f)
		if err != nil {
			os.Remove(f)
			continue
		}
		vlog.Infof("restoring from snapshot %s", f)
		err = p.client.RestoreFromSnapshot(ctx, index, fp)
		fp.Close()
		if err != nil {
			vlog.Infof("restoring from snapshot %s: %s", f, err)
			os.Remove(f)
			continue
		}
		vlog.Infof("restored from snapshot %s", f)

		// The snapshot was readable, we're done.
		firstLog = snaps[i]
		// The name of the snapshot has the last applied entry
		// in that snapshot. Remember it so that we won't reapply
		// any log entries in case they aren't idempotent.
		r.applied.term, r.applied.index = term, index
		p.baseTerm, p.baseIndex = term, index
		p.lastTerm, p.lastIndex = term, index
		p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = f, term, index
		break
	}

	// Find the log that goes with the snapshot and read it and every later one.
	found := false
	for _, l := range logs {
		if l == firstLog {
			found = true
		}
		f := p.logPath(l)
		if !found {
			// Remove stale log files.
			os.Remove(f)
			continue
		}
		// Give up on first bad/incomplete log entry. If we lost any log entries,
		// we should be refreshed from some other member.
		if err := p.readLog(f); err != nil {
			vlog.Infof("reading %s: %s", f, err)
			break
		}
	}
	r.setMatchIndex(r.me, p.lastIndex)
	return nil
}
// OpenLatestSnapshot opens the most recent snapshot read or written and returns a
// reader for it together with the snapshot's term and index. It fails with
// "no snapshot" if none has been recorded, or with the error from opening the file.
// The returned reader is the opened *os.File.
func (p *fsPersist) OpenLatestSnapshot(ctx *context.T) (io.Reader, Term, Index, error) {
	p.Lock()
	fn, t, i := p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex
	p.Unlock()

	if fn == "" {
		return nil, 0, 0, errors.New("no snapshot")
	}
	fp, err := os.Open(fn)
	if err != nil {
		return nil, 0, 0, err
	}
	return fp, t, i, nil
}
// newLog starts a new log file named after (prevTerm, prevIndex) and returns its path
// and a gob encoder writing to it. It doesn't point p.encoder at it yet. The opened
// file handle is reachable only through the returned encoder.
func (p *fsPersist) newLog(prevTerm Term, prevIndex Index) (string, *gob.Encoder, error) {
	// Create an append only encoder for log entries.
	const flags = os.O_CREATE | os.O_APPEND | os.O_WRONLY
	fn := p.logPath(termIndexToFileName(prevTerm, prevIndex))
	f, err := os.OpenFile(fn, flags, 0666)
	if err != nil {
		return "", nil, err
	}
	vlog.Infof("new log file %s", fn)
	return fn, gob.NewEncoder(f), nil
}
// rotateLog starts a new log named after the last term and index, points p.encoder
// at it, and writes the current term and vote to it as the first record. It returns
// an error if the file cannot be created or that first record cannot be written.
//
// Assumes p is already locked.
func (p *fsPersist) rotateLog() error {
	_, encoder, err := p.newLog(p.lastTerm, p.lastIndex)
	if err != nil {
		return err
	}
	p.encoder = encoder
	// Report a failed write instead of silently starting with an unusable log.
	return p.syncLog()
}
// createState throws out all state and starts again from scratch: p.dir is removed
// with everything in it (the result of the removal is ignored) and recreated empty
// with mode 0770.
func (p *fsPersist) createState() error {
	os.RemoveAll(p.dir)
	return os.Mkdir(p.dir, 0770)
}