raft: new implementation
See model.go for the client interface to the library.
ACLs have now been added to control access.
Change-Id: I9cc7824cd9ba2c2f5adb8546ab6ea967e22e296d
diff --git a/lib/raft/README b/lib/raft/README
new file mode 100644
index 0000000..8c7c3f4
--- /dev/null
+++ b/lib/raft/README
@@ -0,0 +1,27 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+This is an implementation of the Raft agreement protocol. Each raft
+member maintains a log of commands in the same order. All commands
+go through a single master. The master keeps track of the commit point,
+i.e., the point in the log up to which a quorum of servers have stored the entries.
+Each server can apply the commands up to the commit point. When a client
+starts a member it provides a callback for applying the commands as they are
+committed. It is up to the application to make sure commands are idempotent.
+For example, it can remember the last command applied and not let any old
+ones be reapplied.
+
+Raft members use the file system to persist their log records across crashes.
+We're currently not syncing after writing each record to the disk to
+speed things up. Because of the idempotent callback, this will work as long
+as a member of the quorum survives each master reelection, but I may have to
+eventually rethink this.
+
+The leader sends heartbeat messages at a fixed interval (hb). Each follower will
+trigger a new election if it hasn't heard from the leader in an interval of
+(2 + x) * hb, where x is a random number between 0 and 1. The random interval
+reduces but does not eliminate the likelihood of two elections starting
+simultaneously.
+
+The VDL protocol is internal, i.e., it is just for raft members to talk to each
+other.
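
To make the callback contract above concrete, here is a minimal sketch of an
application client, modeled on the RaftClient interface in model.go (below) and
the test client in client_test.go. The kvClient type, its "key=value" command
encoding, the package name, and the import path v.io/x/ref/lib/raft are
illustrative assumptions, not part of this change; the guard on 'applied' is
the idempotency technique recommended above.

    package main // hypothetical application embedding the library

    import (
    	"encoding/json"
    	"io"
    	"strings"
    	"sync"

    	"v.io/v23/context"
    	"v.io/x/ref/lib/raft"
    )

    // kvClient is a toy key/value store driven by the replicated log.
    type kvClient struct {
    	sync.Mutex
    	store   map[string]string
    	applied raft.Index // highest index applied; lets us skip replayed commands
    }

    func (c *kvClient) Apply(cmd []byte, index raft.Index) error {
    	c.Lock()
    	defer c.Unlock()
    	if index <= c.applied {
    		return nil // already applied: this is what makes replays harmless
    	}
    	if kv := strings.SplitN(string(cmd), "=", 2); len(kv) == 2 {
    		c.store[kv[0]] = kv[1]
    	}
    	c.applied = index
    	return nil
    }

    func (c *kvClient) SaveToSnapshot(ctx *context.T, wr io.Writer, response chan<- error) error {
    	c.Lock()
    	defer c.Unlock()
    	// The write below completes before we return, so signal completion
    	// up front, as the test client does.
    	close(response)
    	return json.NewEncoder(wr).Encode(c.store)
    }

    func (c *kvClient) RestoreFromSnapshot(ctx *context.T, index raft.Index, rd io.Reader) error {
    	c.Lock()
    	defer c.Unlock()
    	c.applied = index
    	return json.NewDecoder(rd).Decode(&c.store)
    }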
diff --git a/lib/raft/client_test.go b/lib/raft/client_test.go
new file mode 100644
index 0000000..d8d5422
--- /dev/null
+++ b/lib/raft/client_test.go
@@ -0,0 +1,180 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "encoding/json"
+ "io"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/x/lib/vlog"
+ _ "v.io/x/ref/runtime/factories/generic"
+)
+
+type client struct {
+ sync.RWMutex
+ cmds [][]byte // applied commands
+ id string
+ applied Index
+}
+
+func (c *client) Apply(cmd []byte, index Index) error {
+ //vlog.Infof("Applying %d %s", index, cmd)
+ c.Lock()
+ c.cmds = append(c.cmds, cmd)
+ c.applied = index
+ c.Unlock()
+ return nil
+}
+
+func (c *client) Applied() Index {
+ c.RLock()
+ defer c.RUnlock()
+ return c.applied
+}
+
+func (c *client) SaveToSnapshot(ctx *context.T, wr io.Writer, response chan<- error) error {
+ close(response)
+ c.RLock()
+ defer c.RUnlock()
+ return json.NewEncoder(wr).Encode(c.cmds)
+}
+
+func (c *client) RestoreFromSnapshot(ctx *context.T, index Index, rd io.Reader) error {
+ c.Lock()
+ defer c.Unlock()
+ c.applied = index
+ return json.NewDecoder(rd).Decode(&c.cmds)
+}
+
+func (c *client) LeaderChange(me, leader string) {
+ if me == leader {
+ vlog.Infof("%s now leader", leader)
+ } else {
+ vlog.Infof("%s recognizes %s as leader", me, leader)
+ }
+}
+
+func (c *client) Compare(t *testing.T, nc *client) {
+ c.RLock()
+ defer c.RUnlock()
+ nc.RLock()
+ defer nc.RUnlock()
+ if !reflect.DeepEqual(c.cmds, nc.cmds) {
+ t.Fatalf("%v != %v", c.cmds, nc.cmds)
+ }
+}
+
+// buildRafts creates a set of raft members and starts up the services.
+func buildRafts(t *testing.T, ctx *context.T, n int, config *RaftConfig) ([]*raft, []*client) {
+ if config == nil {
+ config = new(RaftConfig)
+ }
+ config.Heartbeat = time.Second
+ if len(config.HostPort) == 0 {
+ config.HostPort = "127.0.0.1:0"
+ }
+ // Start each server with its own log directory.
+ var rs []*raft
+ var cs []*client
+ var td []string
+ for i := 0; i < n; i++ {
+ if n > 1 || len(config.LogDir) == 0 {
+ config.LogDir = tempDir(t)
+ }
+ c := new(client)
+ r, err := newRaft(ctx, config, c)
+ if err != nil {
+ t.Fatalf("NewRaft: %s", err)
+ }
+ td = append(td, config.LogDir)
+ c.id = r.Id()
+ rs = append(rs, r)
+ cs = append(cs, c)
+ vlog.Infof("id is %s", r.Id())
+ }
+ // Tell each server about the complete set.
+ for i := range rs {
+ for j := range rs {
+ rs[i].AddMember(ctx, rs[j].Id())
+ }
+ }
+ // Start the servers up.
+ for i := range rs {
+ rs[i].Start()
+ }
+ return rs, cs
+}
+
+// restart a member from scratch, keeping its address and log name.
+func restart(t *testing.T, ctx *context.T, rs []*raft, cs []*client, r *raft) {
+ config := RaftConfig{HostPort: r.me.id[1:], LogDir: r.logDir}
+ c := new(client)
+ rn, err := newRaft(ctx, &config, c)
+ if err != nil {
+ t.Fatalf("NewRaft: %s", err)
+ }
+ for i := range rs {
+ if rs[i] == r {
+ rs[i] = rn
+ cs[i] = c
+ c.id = rn.Id()
+ break
+ }
+ }
+ for j := range rs {
+ rn.AddMember(ctx, rs[j].Id())
+ }
+ rn.Start()
+}
+
+// cleanUp all the rafts.
+func cleanUp(rs []*raft) {
+ for i := range rs {
+ rs[i].Stop()
+ os.RemoveAll(rs[i].logDir)
+ }
+}
+
+func TestClientSnapshot(t *testing.T) {
+ vlog.Infof("TestCreation")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ // Make sure the test client works as expected.
+ c := new(client)
+ for i, cmd := range []string{"the", "rain", "in", "spain", "falls", "mainly", "on", "the", "plain"} {
+ c.Apply([]byte(cmd), Index(i))
+ }
+ fp, err := ioutil.TempFile(".", "TestClientSnapshot")
+ if err != nil {
+ t.Fatalf("can't create snapshot: %s", err)
+ }
+ done := make(chan error)
+ if err := c.SaveToSnapshot(ctx, fp, done); err != nil {
+ t.Fatalf("can't write snapshot: %s", err)
+ }
+ <-done
+ fp.Sync()
+ fp.Seek(0, 0)
+ nc := new(client)
+ if err := nc.RestoreFromSnapshot(ctx, 0, fp); err != nil {
+ t.Fatalf("can't read snapshot: %s", err)
+ }
+ fp.Close()
+ os.Remove(fp.Name())
+ c.Compare(t, nc)
+ vlog.Infof("TestCreation passed")
+}
diff --git a/lib/raft/election_test.go b/lib/raft/election_test.go
new file mode 100644
index 0000000..b690408
--- /dev/null
+++ b/lib/raft/election_test.go
@@ -0,0 +1,166 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "testing"
+ "time"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
+ _ "v.io/x/ref/runtime/factories/generic"
+)
+
+// waitForElection waits for a new leader to be elected. We also make sure there
+// is only one leader.
+func waitForElection(t *testing.T, rs []*raft, timeout time.Duration) *raft {
+ start := time.Now()
+ for {
+ var leader *raft
+ leaders := 0
+ for _, r := range rs {
+ id, role, _ := r.Status()
+ if role == RoleLeader {
+ vlog.Infof("%s is leader", id)
+ leaders++
+ leader = r
+ }
+ }
+ if leaders > 1 {
+ t.Fatalf("found %d leaders", leaders)
+ }
+ if leader != nil {
+ return leader
+ }
+ if time.Now().Sub(start) > timeout {
+ return nil
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+// waitForLeaderAgreement makes sure all working servers agree on the leader.
+func waitForLeaderAgreement(rs []*raft, timeout time.Duration) bool {
+ start := time.Now()
+ for {
+ leader := make(map[string]string)
+ for _, r := range rs {
+ id, role, l := r.Status()
+ switch role {
+ case RoleLeader:
+ leader[id] = id
+ case RoleFollower:
+ leader[l] = id
+ }
+ }
+ if len(leader) == 1 {
+ return true
+ }
+ if time.Now().Sub(start) > timeout {
+ vlog.Errorf("oops %v", leader)
+ return false
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func TestElection(t *testing.T) {
+ vlog.Infof("TestElection")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ rs, cs := buildRafts(t, ctx, 5, nil)
+ defer cleanUp(rs)
+ thb := rs[0].heartbeat
+
+ // One of the raft members should time out not hearing a leader and start an election.
+ r1 := waitForElection(t, rs, 5*thb)
+ if r1 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ time.Sleep(time.Millisecond)
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // Stop the leader and wait for the next election.
+ r1.Stop()
+ r2 := waitForElection(t, rs, 5*thb)
+ if r2 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // One more time.
+ r2.Stop()
+ r3 := waitForElection(t, rs, 5*thb)
+ if r3 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // One more time. Shouldn't succeed since we no longer have a quorum.
+ r3.Stop()
+ r4 := waitForElection(t, rs, 5*thb)
+ if r4 != nil {
+ t.Fatalf("shouldn't have a leader with no quorum")
+ }
+
+ // Restart r1. We should be back to a quorum so an election should succeed.
+ restart(t, ctx, rs, cs, r1)
+ r4 = waitForElection(t, rs, 5*thb)
+ if r4 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // Restart r2. Within thb time the new guy should agree with everyone else on who the leader is.
+ restart(t, ctx, rs, cs, r2)
+ if !waitForLeaderAgreement(rs, 3*thb) {
+ t.Fatalf("no leader agreement")
+ }
+ vlog.Infof("TestElection passed")
+}
+
+func TestPerformanceElection(t *testing.T) {
+ vlog.Infof("TestPerformanceElection")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ rs, _ := buildRafts(t, ctx, 5, nil)
+ defer cleanUp(rs)
+ thb := rs[0].heartbeat
+
+ // One of the raft members should time out not hearing a leader and start an election.
+ r1 := waitForElection(t, rs, 5*thb)
+ if r1 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ time.Sleep(time.Millisecond)
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+ vlog.Infof("starting 1000 elections")
+
+ // Now force 1000 elections.
+ start := time.Now()
+ for i := 0; i < 200; i++ {
+ x := i % 5
+ rs[x].StartElection()
+ if !waitForLeaderAgreement(rs, 5*thb) {
+ t.Fatalf("no leader agreement")
+ }
+ }
+ duration := time.Now().Sub(start)
+ vlog.Infof("200 elections took %s", duration)
+ vlog.Infof("TestPerformanceElection passed")
+}
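
The elections exercised above are driven by the randomized watchdog timer the
README describes. A sketch of the interval computation, assuming hb is the
configured heartbeat (the real computation is resetTimerFuzzy in raft.go
below):

    package main

    import (
    	"math/rand"
    	"time"
    )

    // electionTimeout returns a value uniform in [2*hb, 3*hb). The random
    // component makes it unlikely that two followers time out together and
    // start overlapping elections.
    func electionTimeout(hb time.Duration) time.Duration {
    	fuzz := time.Duration(rand.Int63n(int64(hb))) // uniform in [0, hb)
    	return 2*hb + fuzz
    }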
diff --git a/lib/raft/fspersistence.go b/lib/raft/fspersistence.go
new file mode 100644
index 0000000..f57666f
--- /dev/null
+++ b/lib/raft/fspersistence.go
@@ -0,0 +1,696 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// The persistent log consists of a directory containing the files:
+// <term>.<index>.snap - snapshots of the client database to which all log entries
+// up to <term>.<index> have been applied.
+// <term>.<index>.log - a log commencing after <term>.<index>
+//
+// The tail of the log is kept in memory. Once the tail is long enough (> snapshotThreshold) a
+// snapshot is taken.
+
+import (
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "os"
+ "path"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+const (
+ headerLen = 1024
+ defaultSnapshotThreshold = 10 * 1024 * 1024
+)
+
+// fsPersist is persistent state implemented in a file system. It stores the state in a single directory.
+// The directory holds a series of log files; the non-log persistent state (current term and voted for)
+// is stored as control entries in those same log files. A new log file is created on startup and after every snapshot.
+type fsPersist struct {
+ sync.Mutex
+
+ r *raft
+ currentTerm Term
+ votedFor string
+
+ dir string // the directory
+ client RaftClient
+ baseIndex Index // The index before the log starts (either 0 or from a snapshot).
+ baseTerm Term // The term before the log starts (either 0 or from a snapshot).
+ lastIndex Index // last index stored to the log
+ lastTerm Term // term for entry at lastIndex
+
+ // For appending the log file.
+ lf *os.File
+ encoder *gob.Encoder
+
+ // In-memory version of the log file. We depend on log truncation to keep this
+ // to a reasonable size.
+ logTail map[Index]*logEntry
+ firstLogTailIndex Index
+ snapshotThreshold int64
+
+ // Name of last snapshot read or written.
+ lastSnapFile string
+ lastSnapTerm Term
+ lastSnapIndex Index
+ snapping bool // true if we are in the process of creating a snapshot.
+}
+
+// ControlEntries are appended to the log to reflect changes in the current term and/or the voted-for
+// leader during elections.
+type ControlEntry struct {
+ InUse bool
+ CurrentTerm Term
+ VotedFor string
+}
+
+// logEntrySize is an approximation of the size of a log entry.
+func logEntrySize(le *LogEntry) int64 {
+ return int64(32 + len(le.Cmd))
+}
+
+// SetCurrentTerm implements persistent.SetCurrentTerm.
+func (p *fsPersist) SetCurrentTerm(ct Term) error {
+ p.Lock()
+ defer p.Unlock()
+ p.currentTerm = ct
+ return p.syncLog()
+}
+
+// IncCurrentTerm increments the current term.
+func (p *fsPersist) IncCurrentTerm() error {
+ p.Lock()
+ defer p.Unlock()
+ p.currentTerm = p.currentTerm + 1
+ return p.syncLog()
+}
+
+// SetVotedFor implements persistent.SetVotedFor.
+func (p *fsPersist) SetVotedFor(vf string) error {
+ p.Lock()
+ defer p.Unlock()
+ if vf == p.votedFor {
+ return nil
+ }
+ p.votedFor = vf
+ return p.syncLog()
+}
+
+// SetCurrentTermAndVotedFor implements persistent.SetCurrentTermAndVotedFor.
+func (p *fsPersist) SetCurrentTermAndVotedFor(ct Term, vf string) error {
+ p.Lock()
+ defer p.Unlock()
+ p.currentTerm = ct
+ p.votedFor = vf
+ return p.syncLog()
+}
+
+// CurrentTerm implements persistent.CurrentTerm.
+func (p *fsPersist) CurrentTerm() Term {
+ p.Lock()
+ defer p.Unlock()
+ return p.currentTerm
+}
+
+// LastIndex implements persistent.LastIndex.
+func (p *fsPersist) LastIndex() Index {
+ p.Lock()
+ defer p.Unlock()
+ return p.lastIndex
+}
+
+// LastTerm implements persistent.LastTerm.
+func (p *fsPersist) LastTerm() Term {
+ p.Lock()
+ defer p.Unlock()
+ return p.lastTerm
+}
+
+// VotedFor implements persistent.VotedFor.
+func (p *fsPersist) VotedFor() string {
+ p.Lock()
+ defer p.Unlock()
+ return p.votedFor
+}
+
+// Close implements persistent.Close.
+func (p *fsPersist) Close() {
+ p.lf.Sync()
+ p.lf.Close()
+}
+
+// AppendToLog implements persistent.AppendToLog.
+func (p *fsPersist) AppendToLog(ctx *context.T, prevTerm Term, prevIndex Index, entries []LogEntry) error {
+ p.Lock()
+ defer p.Unlock()
+
+ if prevIndex != p.baseIndex || prevTerm != p.baseTerm {
+ // We will not log if the previous entry either doesn't exist or has the wrong term.
+ le := p.lookup(prevIndex)
+ if le == nil {
+ return verror.New(errOutOfSequence, ctx, prevTerm, prevIndex)
+ } else if le.Term != prevTerm {
+ return verror.New(errOutOfSequence, ctx, prevTerm, prevIndex)
+ }
+ }
+ for i, e := range entries {
+ // If it's already in the log, do nothing.
+ le, ok := p.logTail[prevIndex+Index(i)+1]
+ if ok && le.Term == e.Term {
+ continue
+ }
+
+ // Add both to the log logTail and the log file.
+ // TODO(p): Think about syncing the output before returning.
+ ne := e
+ if err := p.encoder.Encode(ne); err != nil {
+ return fmt.Errorf("append %d, %d, %v: %s", prevTerm, prevIndex, ne, err)
+ }
+ p.addToLogTail(&ne)
+ }
+
+ return nil
+}
+
+// ConsiderSnapshot implements Persistent.ConsiderSnapshot.
+func (p *fsPersist) ConsiderSnapshot(ctx *context.T, lastAppliedTerm Term, lastAppliedIndex Index) {
+ p.Lock()
+ if p.snapping {
+ p.Unlock()
+ return
+ }
+
+ // Take a snapshot if the log is too big.
+ if int64(lastAppliedIndex-p.firstLogTailIndex) < p.snapshotThreshold {
+ p.Unlock()
+ return
+ }
+ p.snapping = true
+ p.Unlock()
+
+ safeToProceed := make(chan struct{})
+ go p.takeSnapshot(ctx, lastAppliedTerm, lastAppliedIndex, safeToProceed)
+ <-safeToProceed
+}
+
+// trimLog trims the log to contain only entries from prevIndex on. This will fail if prevIndex isn't
+// in the logTail or if creating or writing the new log file fails.
+func (p *fsPersist) trimLog(ctx *context.T, prevTerm Term, prevIndex Index) error {
+ // Nothing to trim?
+ vlog.Infof("trimLog(%d, %d)", prevTerm, prevIndex)
+ if prevIndex == 0 {
+ return nil
+ }
+
+ p.Lock()
+ le := p.lookup(prevIndex)
+ p.Unlock()
+ if le == nil {
+ return verror.New(errOutOfSequence, ctx, 0, prevIndex)
+ }
+
+ // Create a new log file.
+ fn, encoder, err := p.newLog(prevTerm, prevIndex)
+ if err != nil {
+ return err
+ }
+
+ // Append all log entries after prevIndex. Unlock while writing so we don't
+ // overly affect performance. We might miss some entries that are added while the
+ // lock is released so redo this loop later under lock.
+ i := 1
+ for {
+ p.Lock()
+ le, ok := p.logTail[prevIndex+Index(i)]
+ p.Unlock()
+ if !ok {
+ break
+ }
+ if err := encoder.Encode(*le); err != nil {
+ os.Remove(fn)
+ return fmt.Errorf("trim %d, %v: %s", prevIndex, *le, err)
+ }
+ i++
+ }
+
+ // Finish up under lock.
+ p.Lock()
+ defer p.Unlock()
+ for {
+ le, ok := p.logTail[prevIndex+Index(i)]
+ if !ok {
+ break
+ }
+ if err := encoder.Encode(*le); err != nil {
+ os.Remove(fn)
+ return fmt.Errorf("trim %d, %v: %s", prevIndex, *le, err)
+ }
+ i++
+ }
+
+ // Start using new file.
+ p.encoder = encoder
+ p.syncLog()
+
+ // Remove some logTail entries. Try to keep at least half of the entries around in case
+ // we'll need them to update a lagging member.
+ if prevIndex >= p.firstLogTailIndex {
+ i := p.firstLogTailIndex + (prevIndex-p.firstLogTailIndex)/2
+ for p.firstLogTailIndex <= i {
+ delete(p.logTail, p.firstLogTailIndex)
+ p.firstLogTailIndex++
+ }
+ }
+
+ return nil
+}
+
+// takeSnapshot is a go routine that starts a snapshot and on success trims the log.
+// 'safeToProceed' is closed when it is safe to continue applying commands.
+func (p *fsPersist) takeSnapshot(ctx *context.T, t Term, i Index, safeToProceed chan struct{}) error {
+ r := p.r
+ defer func() { p.Lock(); p.snapping = false; p.Unlock() }()
+
+ // Create a file for the snapshot. Lock to prevent any logs from being applied while we do this.
+ fn := p.snapPath(termIndexToFileName(t, i))
+ vlog.Infof("snapshot %s", fn)
+ fp, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+ if err != nil {
+ close(safeToProceed)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+
+ // Start the snapshot.
+ c := make(chan error)
+ if err := r.client.SaveToSnapshot(ctx, fp, c); err != nil {
+ close(safeToProceed)
+ fp.Close()
+ os.Remove(fn)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+
+ // At this point the client has saved whatever it needs for the snapshot (copy on write perhaps)
+ // so we can release the raft server to continue applying commands.
+ close(safeToProceed)
+
+ // Wait for the snapshot to finish.
+ if err := <-c; err != nil {
+ fp.Close()
+ os.Remove(fn)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+ if err := fp.Sync(); err != nil {
+ os.Remove(fn)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+ if err := fp.Close(); err != nil {
+ os.Remove(fn)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+
+ if err := p.trimLog(ctx, t, i); err != nil {
+ os.Remove(fn)
+ vlog.Errorf("snapshot %s: %s", fn, err)
+ return err
+ }
+
+ p.Lock()
+ p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = fn, t, i
+ p.Unlock()
+
+ vlog.Infof("snapshot %s succeeded", fn)
+ return nil
+}
+
+// SnapshotFromLeader implements Persistence.SnapshotFromLeader. Called with p.r locked.
+func (p *fsPersist) SnapshotFromLeader(ctx *context.T, term Term, index Index, call raftProtoInstallSnapshotServerCall) error {
+ r := p.r
+
+ // First securely save the snapshot.
+ fn := p.snapPath(termIndexToFileName(term, index))
+ fp, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ rstream := call.RecvStream()
+ for rstream.Advance() {
+ value := rstream.Value()
+ if _, err := fp.Write(value); err != nil {
+ fp.Close()
+ os.Remove(fn)
+ return err
+ }
+ }
+ if err := fp.Sync(); err != nil {
+ fp.Close()
+ os.Remove(fn)
+ return err
+ }
+ if err := fp.Close(); err != nil {
+ os.Remove(fn)
+ return err
+ }
+ if err := rstream.Err(); err != nil {
+ os.Remove(fn)
+ return err
+ }
+
+ // Now try restoring client state with it.
+ fp, err = os.Open(fn)
+ if err != nil {
+ return err
+ }
+ defer fp.Close()
+ if err := r.client.RestoreFromSnapshot(ctx, index, fp); err != nil {
+ return err
+ }
+ r.applied.term, r.applied.index = term, index
+ p.Lock()
+ p.baseTerm, p.baseIndex = term, index
+ p.lastTerm, p.lastIndex = term, index
+ p.Unlock()
+ return nil
+}
+
+func (p *fsPersist) lookup(i Index) *logEntry {
+ if le, ok := p.logTail[i]; ok {
+ return le
+ }
+ return nil
+}
+
+// Lookup implements persistent.Lookup.
+func (p *fsPersist) Lookup(i Index) *logEntry {
+ p.Lock()
+ defer p.Unlock()
+ return p.lookup(i)
+}
+
+// LookupPrevious implements persistent.LookupPrevious.
+func (p *fsPersist) LookupPrevious(i Index) (Term, Index, bool) {
+ p.Lock()
+ defer p.Unlock()
+ i--
+ if i == p.baseIndex {
+ return p.baseTerm, i, true
+ }
+ le := p.lookup(i)
+ if le == nil {
+ return 0, i, false
+ }
+ return le.Term, i, true
+}
+
+// syncLog assumes that p is locked.
+func (p *fsPersist) syncLog() error {
+ ce := ControlEntry{InUse: true, CurrentTerm: p.currentTerm, VotedFor: p.votedFor}
+ if err := p.encoder.Encode(ce); err != nil {
+ return fmt.Errorf("syncLog: %s", err)
+ }
+ p.lf.Sync()
+ return nil
+}
+
+type Entry struct {
+ LogEntry
+ ControlEntry
+}
+
+// readLog reads a log file into memory.
+//
+// Assumes p is locked.
+func (p *fsPersist) readLog(file string) error {
+ vlog.Infof("reading %s", file)
+ f, err := os.OpenFile(file, os.O_RDONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ d := gob.NewDecoder(f)
+ for {
+ var e Entry
+ if err := d.Decode(&e); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ if e.InUse {
+ p.currentTerm = e.CurrentTerm
+ p.votedFor = e.VotedFor
+ }
+ if e.Index != 0 && (p.lastIndex == 0 || e.Index <= p.lastIndex+1) {
+ p.addToLogTail(&LogEntry{Term: e.Term, Index: e.Index, Cmd: e.Cmd})
+ }
+ }
+}
+
+// addToLogTail adds the entry to the logTail if not already there and returns true
+// if the logTail changed.
+func (p *fsPersist) addToLogTail(wle *LogEntry) bool {
+ le := &logEntry{Term: wle.Term, Index: wle.Index, Cmd: wle.Cmd, Type: wle.Type}
+ ole, ok := p.logTail[le.Index]
+ if ok {
+ if ole.Term == le.Term {
+ return false
+ }
+ // Remove all higher entries.
+ for i := le.Index + 1; i <= p.lastIndex; i++ {
+ delete(p.logTail, i)
+ }
+ }
+ p.logTail[le.Index] = le
+ p.lastIndex = le.Index
+ p.lastTerm = le.Term
+ return true
+}
+
+// openPersist returns an object that implements the persistent interface. If the directory doesn't
+// already exist, it is created. If there is a problem with the contained files, an error is returned.
+func openPersist(ctx *context.T, r *raft, snapshotThreshold int64) (*fsPersist, error) {
+ if snapshotThreshold == 0 {
+ snapshotThreshold = defaultSnapshotThreshold
+ }
+ p := &fsPersist{r: r, dir: r.logDir, logTail: make(map[Index]*logEntry), client: r.client, snapshotThreshold: snapshotThreshold}
+ p.Lock()
+ defer p.Unlock()
+
+ // Randomize max size so all members aren't checkpointing at the same time.
+ p.snapshotThreshold = p.snapshotThreshold + rand.Int63n(1+(p.snapshotThreshold>>3))
+
+ // Read the persistent state, the latest snapshot, and any log entries since then.
+ if err := p.readState(ctx, r); err != nil {
+ if err := p.createState(); err != nil {
+ return nil, err
+ }
+ }
+
+ // Start a new command log.
+ if err := p.rotateLog(); err != nil {
+ return nil, err
+ }
+ return p, nil
+}
+
+func (p *fsPersist) snapPath(s string) string {
+ return path.Join(p.dir, s+".snap")
+}
+
+func (p *fsPersist) logPath(s string) string {
+ return path.Join(p.dir, s+".log")
+}
+
+func termIndexToFileName(t Term, i Index) string {
+ return fmt.Sprintf("%20.20d.%20.20d", t, i)
+}
+
+// parseFileName returns the term and index from a file name of the form <term>.<index>[.whatever]
+func parseFileName(s string) (Term, Index, error) {
+ p := strings.Split(path.Base(s), ".")
+ if len(p) < 2 {
+ return 0, 0, errors.New("not log name")
+ }
+ t, err := strconv.ParseInt(p[0], 10, 64)
+ if err != nil {
+ return 0, 0, err
+ }
+ i, err := strconv.ParseInt(p[1], 10, 64)
+ if err != nil {
+ return 0, 0, err
+ }
+ return Term(t), Index(i), nil
+}
+
+// readState is called on start up. State is recreated from the last valid snapshot and
+// any subsequent log files. If any log file has a decoding error, we give up and return
+// an error.
+//
+// TODO(p): This is perhaps too extreme since we should be able to continue from a snapshot
+// and a prefix of the log.
+//
+// Assumes p is locked.
+func (p *fsPersist) readState(ctx *context.T, r *raft) error {
+ d, err := os.Open(p.dir)
+ if err != nil {
+ return err
+ }
+ defer d.Close()
+
+ // Find all snapshot and log files.
+ vlog.Infof("reading directory %s", p.dir)
+ files, err := d.Readdirnames(0)
+ if err != nil {
+ return err
+ }
+ lmap := make(map[string]struct{})
+ var logs []string
+ smap := make(map[string]struct{})
+ for _, f := range files {
+ if strings.HasSuffix(f, ".log") {
+ f = strings.TrimSuffix(f, ".log")
+ lmap[f] = struct{}{}
+ logs = append(logs, f)
+ } else if strings.HasSuffix(f, ".snap") {
+ smap[strings.TrimSuffix(f, ".snap")] = struct{}{}
+ }
+ }
+
+ // Throw out any snapshots that don't have an equivalent log file; they are not complete.
+ for s := range smap {
+ if _, ok := lmap[s]; !ok {
+ os.Remove(p.snapPath(s))
+ delete(smap, s)
+ }
+ }
+
+ // Find the latest readable snapshot.
+ var snaps []string
+ for s := range smap {
+ snaps = append(snaps, s)
+ }
+ sort.StringSlice(snaps).Sort()
+ sort.StringSlice(logs).Sort()
+ firstLog := termIndexToFileName(0, 0)
+ for i := len(snaps) - 1; i >= 0; i-- {
+ f := p.snapPath(snaps[i])
+ fp, err := os.Open(f)
+ if err != nil {
+ os.Remove(f)
+ continue
+ }
+ term, index, err := parseFileName(snaps[i])
+ if err != nil {
+ os.Remove(f)
+ continue
+ }
+ vlog.Infof("restoring from snapshot %s", f)
+ err = p.client.RestoreFromSnapshot(ctx, index, fp)
+ vlog.Infof("restored from snapshot %s", err)
+ fp.Close()
+ if err == nil {
+ // The snapshot was readable, we're done.
+ firstLog = snaps[i]
+ // The name of the snapshot has the last applied entry
+ // in that snapshot. Remember it so that we won't reapply
+ // any log entries in case they aren't idempotent.
+ r.applied.term, r.applied.index = term, index
+ p.baseTerm, p.baseIndex = term, index
+ p.lastTerm, p.lastIndex = term, index
+ p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = f, term, index
+ break
+ }
+ os.Remove(f)
+ }
+
+ // Find the log that goes with the snapshot.
+ found := false
+ for _, l := range logs {
+ if l == firstLog {
+ found = true
+ }
+ f := p.logPath(l)
+ if !found {
+ // Remove stale log files.
+ os.Remove(f)
+ continue
+ }
+ // Give up on first bad/incomplete log entry. If we lost any log entries,
+ // we should be refreshed from some other member.
+ if err := p.readLog(f); err != nil {
+ vlog.Infof("reading %s: %s", f, err)
+ break
+ }
+ }
+ r.setMatchIndex(r.me, p.lastIndex)
+
+ return nil
+}
+
+func (p *fsPersist) OpenLatestSnapshot(ctx *context.T) (io.Reader, Term, Index, error) {
+ p.Lock()
+ if len(p.lastSnapFile) == 0 {
+ p.Unlock()
+ return nil, 0, 0, errors.New("no snapshot")
+ }
+ fn, t, i := p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex
+ p.Unlock()
+ fp, err := os.Open(fn)
+ if err != nil {
+ return nil, 0, 0, err
+ }
+ return fp, t, i, nil
+}
+
+// newLog starts a new log file. It doesn't point the p.encoder at it yet.
+func (p *fsPersist) newLog(prevTerm Term, prevIndex Index) (string, *gob.Encoder, error) {
+ // Create an append only encoder for log entries.
+ fn := p.logPath(termIndexToFileName(prevTerm, prevIndex))
+ append, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+ if err != nil {
+ return "", nil, err
+ }
+ vlog.Infof("new log file %s", fn)
+ return fn, gob.NewEncoder(append), nil
+}
+
+// rotateLog starts a new log.
+//
+// Assumes p is already locked.
+func (p *fsPersist) rotateLog() error {
+ _, encoder, err := p.newLog(p.lastTerm, p.lastIndex)
+ if err != nil {
+ return err
+ }
+ p.encoder = encoder
+ p.syncLog()
+ return nil
+}
+
+// createState throws out all state and starts again from scratch.
+func (p *fsPersist) createState() error {
+ os.RemoveAll(p.dir)
+ if err := os.Mkdir(p.dir, 0770); err != nil {
+ return err
+ }
+ return nil
+}
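
A note on the file-name format used above: termIndexToFileName zero-pads both
term and index to 20 digits, so lexicographic order on the names equals numeric
order on (term, index). That is what lets readState pick the newest snapshot
with a plain string sort. A small self-contained illustration with hypothetical
values:

    package main

    import "fmt"

    func main() {
    	// Term 2, index 113, as fsPersist would name a snapshot of it.
    	name := fmt.Sprintf("%20.20d.%20.20d", 2, 113) + ".snap"
    	fmt.Println(name)
    	// Prints: 00000000000000000002.00000000000000000113.snap
    	// Zero padding means "...002..." sorts after "...001...", so
    	// sort.StringSlice in readState finds the latest snapshot last.
    }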
diff --git a/lib/raft/fspersistence_test.go b/lib/raft/fspersistence_test.go
new file mode 100644
index 0000000..5a0e1a8
--- /dev/null
+++ b/lib/raft/fspersistence_test.go
@@ -0,0 +1,332 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path"
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/x/lib/vlog"
+)
+
+func tempDir(t *testing.T) string {
+ // Get a temporary file name that is unlikely to clash with anyone else's.
+ dir, err := ioutil.TempDir("", "raftstate")
+ if err != nil {
+ t.Fatalf("TempDir: %s", err)
+ }
+ os.Remove(dir)
+ vlog.Infof("log is %s", dir)
+ return dir
+}
+
+func compareLogs(t *testing.T, p persistent, expected []LogEntry, tag string) {
+ found := 0
+outer:
+ for i := Index(0); i < 1000; i++ {
+ le := p.Lookup(i)
+ if le == nil {
+ continue
+ }
+ for _, v := range expected {
+ if v.Index != le.Index {
+ continue
+ }
+ if v.Term != le.Term || string(v.Cmd) != string(le.Cmd) {
+ t.Fatalf("%s: expected %v, got %v", tag, v, *le)
+ }
+ found++
+ continue outer
+ }
+ t.Fatalf("unexpected: %v", *le)
+ }
+ if found != len(expected) {
+ t.Fatalf("%s: not all entries found, got %d out of %v", tag, found, expected)
+ }
+}
+
+// TestCreation verifies that a log is created when we start a fresh instance and is still there after we stop it.
+func TestCreation(t *testing.T) {
+ vlog.Infof("TestCreation")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ td := tempDir(t)
+ defer os.RemoveAll(td)
+
+ // Launch a raft. This should create the log file and append at least one control entry to it.
+ config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
+ r, err := newRaft(ctx, &config, new(client))
+ if err != nil {
+ t.Fatalf("newRaft: %s", err)
+ }
+ vlog.Infof("id is %s", r.Id())
+
+ // Make sure the file is there.
+ info, err := os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+ if err != nil {
+ t.Fatalf("File didn't last: %s", err)
+ }
+ if info.Size() == 0 {
+ t.Fatalf("File too short: %d", info.Size())
+ }
+ r.Stop()
+
+ // Make sure the file is still there after shutdown.
+ info, err = os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+ if err != nil {
+ t.Fatalf("File didn't last: %s", err)
+ }
+ if info.Size() == 0 {
+ t.Fatalf("File too short: %d", info.Size())
+ }
+ vlog.Infof("TestCreation passed")
+}
+
+// TestPersistence verifies that we store enough state to return to the previous log state
+// after an orderly stop and restart.
+//
+// We also test that a truncated log remains truncated after an orderly stop and restart.
+func TestPersistence(t *testing.T) {
+ vlog.Infof("TestPersistence")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ td := tempDir(t)
+ defer os.RemoveAll(td)
+
+ // Launch a raft. This should create the log file and write the initial control
+ // state to it. The file should persist after the raft is closed.
+ config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
+ r, err := newRaft(ctx, &config, new(client))
+ if err != nil {
+ t.Fatalf("newRaft: %s", err)
+ }
+ vlog.Infof("id is %s", r.Id())
+
+ // Set the persistent state.
+ if err := r.p.SetCurrentTerm(2); err != nil {
+ t.Fatalf("SetCurrentTerm: %s", err)
+ }
+ if err := r.p.SetVotedFor("whocares"); err != nil {
+ t.Fatalf("SetVotedFor: %s", err)
+ }
+ cmds1 := []LogEntry{
+ LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+ LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+ LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+ }
+ if err := r.p.AppendToLog(ctx, 0, 0, cmds1); err != nil {
+ t.Fatalf("AppendToLog: %s", err)
+ }
+ cmds2 := []LogEntry{
+ LogEntry{Term: 3, Index: 4, Cmd: []byte("cmd4")},
+ LogEntry{Term: 3, Index: 5, Cmd: []byte("cmd5")},
+ LogEntry{Term: 3, Index: 6, Cmd: []byte("cmd6")},
+ }
+ if err := r.p.AppendToLog(ctx, 2, 3, cmds2); err != nil {
+ t.Fatalf("AppendToLog: %s", err)
+ }
+ info, err := os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+ if err != nil {
+ t.Fatalf("File didn't last: %s", err)
+ }
+ vlog.Infof("log size %d", info.Size())
+ vlog.Infof("stopping %s", r.Id())
+ r.Stop()
+
+ // Reopen the persistent store and make sure it matches.
+ r, err = newRaft(ctx, &config, new(client))
+ if err != nil {
+ t.Fatalf("newRaft: %s", err)
+ }
+ vlog.Infof("id is %s", r.Id())
+ if r.p.CurrentTerm() != 2 {
+ t.Fatalf("CurrentTerm: expected %d got %d", 2, r.p.CurrentTerm())
+ }
+ if r.p.VotedFor() != "whocares" {
+ t.Fatalf("CurrentTerm: expected %s got %s", "whocares", r.p.VotedFor())
+ }
+ test := []LogEntry{
+ LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+ LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+ LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+ LogEntry{Term: 3, Index: 4, Cmd: []byte("cmd4")},
+ LogEntry{Term: 3, Index: 5, Cmd: []byte("cmd5")},
+ LogEntry{Term: 3, Index: 6, Cmd: []byte("cmd6")},
+ }
+ compareLogs(t, r.p, test, "after reopen")
+
+ // Truncate the log by rewriting an index.
+ if err := r.p.AppendToLog(ctx, 2, 3, []LogEntry{LogEntry{Term: 4, Index: 4, Cmd: []byte("cmd7")}}); err != nil {
+ t.Fatalf("AppendToLog: %s", err)
+ }
+ test2 := []LogEntry{
+ LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+ LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+ LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+ LogEntry{Term: 4, Index: 4, Cmd: []byte("cmd7")},
+ }
+ compareLogs(t, r.p, test2, "after truncate")
+ vlog.Infof("stopping %s", r.Id())
+ r.Stop()
+
+ // Make sure the log is still truncated when we reread it.
+ r, err = newRaft(ctx, &config, new(client))
+ if err != nil {
+ t.Fatalf("newRaft: %s", err)
+ }
+ vlog.Infof("id is %s", r.Id())
+ compareLogs(t, r.p, test2, "after truncate and close")
+ vlog.Infof("stopping %s", r.Id())
+ r.Stop()
+ vlog.Infof("TestPersistence passed")
+}
+
+func getFileNames(t *testing.T, dir string) []string {
+ d, err := os.Open(dir)
+ if err != nil {
+ t.Fatalf("opening %s: %s", dir, err)
+ }
+ defer d.Close()
+
+ // Find all snapshot and log files.
+ files, err := d.Readdirnames(0)
+ if err != nil {
+ t.Fatalf("reading %s: %s", dir, err)
+ }
+
+ return files
+}
+
+// waitForLeadership waits for r to be elected leader.
+func waitForLeadership(r *raft, timeout time.Duration) bool {
+ start := time.Now()
+ for {
+ _, role, _ := r.Status()
+ if role == RoleLeader {
+ return true
+ }
+ if time.Now().Sub(start) > timeout {
+ return false
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+// TestSnapshot sets the limit for taking a snapshot down to 1 record and makes sure that snapshots are taken.
+func TestSnapshot(t *testing.T) {
+ vlog.Infof("TestSnapshot")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ td := tempDir(t)
+ defer os.RemoveAll(td)
+
+ config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td, SnapshotThreshold: 1}
+ rs, _ := buildRafts(t, ctx, 1, &config)
+ defer cleanUp(rs)
+ r := rs[0]
+ c1 := r.client
+ vlog.Infof("id is %s", r.Id())
+
+ // This should cause a snapshot.
+ if apperr, err := r.Append(ctx, []byte("string1")); err != nil || apperr != nil {
+ t.Fatalf("AppendToLog: %s/%s", apperr, err)
+ }
+ if apperr, err := r.Append(ctx, []byte("string2")); err != nil || apperr != nil {
+ t.Fatalf("AppendToLog: %s/%s", apperr, err)
+ }
+ if apperr, err := r.Append(ctx, []byte("string3")); err != nil || apperr != nil {
+ t.Fatalf("AppendToLog: %s/%s", apperr, err)
+ }
+
+ // Wait for the snapshot to appear. It is a background task so just loop.
+ deadline := time.Now().Add(time.Minute)
+outer:
+ for {
+ time.Sleep(100 * time.Millisecond)
+ files := getFileNames(t, td)
+ for _, s := range files {
+ if strings.HasSuffix(s, ".snap") {
+ log := strings.TrimSuffix(s, ".snap") + ".log"
+ for _, l := range files {
+ if l == log {
+ break outer
+ }
+ }
+ }
+ }
+ if time.Now().After(deadline) {
+ t.Fatalf("timeout waiting for snap")
+ }
+ }
+ r.Stop()
+
+ // Restart. We should read and restart from the snapshot.
+ rs, _ = buildRafts(t, ctx, 1, &config)
+ defer cleanUp(rs)
+ r = rs[0]
+ c2 := r.client
+ vlog.Infof("new id is %s", r.Id())
+
+ // Wait for leadership.
+ if !waitForLeadership(r, time.Minute) {
+ t.Fatalf("didn't become leader")
+ }
+
+ // Wait to commit.
+ time.Sleep(time.Second)
+ c1.(*client).Compare(t, c2.(*client))
+
+ vlog.Infof("TestSnapshot passed")
+}
+
+// TestRemoteSnapshot makes sure a member can be recovered from a snapshot taken elsewhere.
+func TestRemoteSnapshot(t *testing.T) {
+ vlog.Infof("TestRemoteSnapshot")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ config := RaftConfig{HostPort: "127.0.0.1:0", SnapshotThreshold: 30}
+ rs, cs := buildRafts(t, ctx, 3, &config)
+ defer cleanUp(rs)
+
+ // This should cause a few snapshots.
+ for i := 0; i < 100; i++ {
+ if apperr, err := rs[1].Append(ctx, []byte(fmt.Sprintf("string%d", i))); err != nil || apperr != nil {
+ t.Fatalf("Append: %s/%s", apperr, err)
+ }
+ }
+ vlog.Infof("Appends done")
+
+ // Wait for them all to get to the same point.
+ if !waitForLogAgreement(rs, time.Minute) {
+ t.Fatalf("no log agreement")
+ }
+ t.Log("Logs agree")
+ if !waitForAppliedAgreement(rs, cs, time.Minute) {
+ t.Fatalf("no applied agreement")
+ }
+ vlog.Infof("Applies agree")
+
+ // Stop a server and remove all its log and snapshot info, i.e., act like it's starting
+ // from scratch.
+ rs[0].Stop()
+ os.RemoveAll(rs[0].logDir)
+ restart(t, ctx, rs, cs, rs[0])
+
+ // Wait for them all to get to the same point.
+ if !waitForLogAgreement(rs, time.Minute) {
+ t.Fatalf("no log agreement")
+ }
+ if !waitForAppliedAgreement(rs, cs, time.Minute) {
+ t.Fatalf("no applied agreement")
+ }
+
+ vlog.Infof("TestRemoteSnapshot passed")
+}
diff --git a/lib/raft/model.go b/lib/raft/model.go
new file mode 100644
index 0000000..0c5dab2
--- /dev/null
+++ b/lib/raft/model.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "io"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/security/access"
+)
+
+// Raft provides a consistent log across multiple instances of a client.
+
+// RaftClient defines the call backs from the Raft library to the application.
+type RaftClient interface {
+ // Apply applies a logged command, 'cmd', to the client. The commands will
+ // be delivered in the same order and with the same 'index' to all clients.
+ // 'index' is a monotonically increasing number and is just an index into the
+ // common log.
+ //
+ // Whenever a client restarts (after a crash perhaps) or falls too far behind
+ // (as in a partitioned network) it will be reinitialized with a RestoreFromSnapshot
+ // and then have all subsequent logged commands replayed to it.
+ //
+ // A client that wishes to may return empty snapshots, i.e., just close the error
+ // channel without writing anything, and worry about reliably storing its database
+ // itself. In that case it must remember the highest index it has seen if it wishes
+ // to avoid replays. Hence the index is supplied with the Apply().
+ Apply(cmd []byte, index Index) error
+
+ // SaveToSnapshot requests the application to write a snapshot to 'wr'.
+ // Until SaveToSnapshot returns, no commands will be Apply()ed. Closing
+ // the response channel signals that the snapshot is finished. Any
+ // error written to the response channel will be logged by the library
+ // and the library will discard the snapshot if any error is returned.
+ SaveToSnapshot(ctx *context.T, wr io.Writer, response chan<- error) error
+
+ // RestoreFromSnapshot requests the application to rebuild its database from the snapshot
+ // it must read from 'rd'. 'index' is the last index applied to the snapshot. No Apply()s
+ // will be performed until RestoreFromSnapshot() returns. 'index' can be ignored
+ // or used for debugging.
+ RestoreFromSnapshot(ctx *context.T, index Index, rd io.Reader) error
+}
+
+const (
+ RoleCandidate = iota // Requesting to be voted leader.
+ RoleFollower
+ RoleLeader
+ RoleStopped
+)
+
+type Raft interface {
+ // AddMember adds a new member to the server set. "id" is actually a network address for the member,
+ // currently host:port. This has to be done before starting the server.
+ AddMember(ctx *context.T, id string) error
+
+ // Id returns the id of this member.
+ Id() string
+
+ // Start starts the local server communicating with other members.
+ Start()
+
+ // Stop terminates the server. It cannot be Start'ed again.
+ Stop()
+
+ // Append appends a new command to the replicated log. The command will be Apply()ed at each member
+ // once a quorum has logged it. The Append() will terminate once a quorum has logged it and at least
+ // the leader has Apply()ed the command. 'applyError' is the error returned by the Apply() while
+ // 'raftError' is returned by the raft library itself reporting that the Append could not be
+ // performed.
+ Append(ctx *context.T, cmd []byte) (applyError, raftError error)
+
+ // Status returns the state of the raft.
+ Status() (myId string, role int, leader string)
+
+ // StartElection forces an election. Normally just used for debugging.
+ StartElection()
+}
+
+// RaftConfig is passed to NewRaft to avoid lots of parameters.
+type RaftConfig struct {
+ LogDir string // Directory in which to put log and snapshot files.
+ HostPort string // For RPCs from other members.
+ ServerName string // Where to mount if not empty.
+ Heartbeat time.Duration // Time between heartbeats.
+ SnapshotThreshold int64 // Approximate number of log entries between snapshots.
+ Acl access.AccessList // For sending RPC to the members.
+}
+
+// NewRaft creates a new raft server.
+func NewRaft(ctx *context.T, config *RaftConfig, client RaftClient) (Raft, error) {
+ return newRaft(ctx, config, client)
+}
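
Putting the pieces together, a sketch of how a member might be configured and
started, following the patterns in the tests. The addresses, the log directory,
and the kvClient type (sketched after the README) are hypothetical; as the
tests do, every member, including this one, is added with AddMember before
Start.

    package main

    import (
    	"log"

    	"v.io/v23"
    	"v.io/x/ref/lib/raft"
    	_ "v.io/x/ref/runtime/factories/generic"
    )

    func main() {
    	ctx, shutdown := v23.Init()
    	defer shutdown()

    	config := raft.RaftConfig{
    		HostPort: "127.0.0.1:8000",     // this member's address (hypothetical)
    		LogDir:   "/tmp/raft-member-0", // log and snapshot directory (hypothetical)
    	}
    	r, err := raft.NewRaft(ctx, &config, &kvClient{store: make(map[string]string)})
    	if err != nil {
    		log.Fatalf("NewRaft: %s", err)
    	}
    	// Membership must be established before Start.
    	for _, id := range []string{"/127.0.0.1:8000", "/127.0.0.1:8001", "/127.0.0.1:8002"} {
    		if err := r.AddMember(ctx, id); err != nil {
    			log.Fatalf("AddMember: %s", err)
    		}
    	}
    	r.Start()
    	defer r.Stop()

    	// Append returns once a quorum has logged the command and the leader
    	// has applied it; raftErr is set if, e.g., we are not the leader.
    	if applyErr, raftErr := r.Append(ctx, []byte("color=blue")); applyErr != nil || raftErr != nil {
    		log.Printf("Append: %v / %v", applyErr, raftErr)
    	}
    }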
diff --git a/lib/raft/persistence.go b/lib/raft/persistence.go
new file mode 100644
index 0000000..7ec5128
--- /dev/null
+++ b/lib/raft/persistence.go
@@ -0,0 +1,76 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "io"
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+var (
+ errOutOfSequence = verror.Register(pkgPath+".errOutOfSequence", verror.NoRetry, "{1:}{2:} append {3}, {4} out of sequence{:_}")
+)
+
+// persistent represents all the persistent state for the raft algorithm. The division exists should we
+// want to replace the file system based store with something else.
+type persistent interface {
+ // AppendToLog appends entries to the log starting at Index prevIndex+1. It will return
+ // an error if there does not exist a previous command at index prevIndex with term
+ // prevTerm. It will also return an error if we cannot write to the persistent store.
+ AppendToLog(ctx *context.T, prevTerm Term, prevIndex Index, entries []LogEntry) error
+
+ // SetCurrentTerm, IncCurrentTerm, SetVotedFor, and SetCurrentTermAndVotedFor change the non-log persistent state.
+ SetCurrentTerm(Term) error
+ IncCurrentTerm() error
+ SetVotedFor(id string) error
+ SetCurrentTermAndVotedFor(term Term, id string) error
+
+ // CurrentTerm returns the current Term.
+ CurrentTerm() Term
+
+ // LastIndex returns the highest index in the log.
+ LastIndex() Index
+
+ // LastTerm returns the highest term in the log.
+ LastTerm() Term
+
+ // VotedFor returns the current voted-for string.
+ VotedFor() string
+
+ // Close all state. No other calls can be made on this object following
+ // the Close().
+ Close()
+
+ // Lookup returns the log entry at that index or nil if none exists.
+ Lookup(Index) *logEntry
+
+ // LookupPrevious returns the index and term preceding Index. It returns false if there is none.
+ // This is used when appending entries to the log. The leader needs to send the follower the
+ // term and index of the last entry appended to the log, and the follower has to check if it
+ // matches what it has at that point. However, either one may no longer have that entry: the
+ // leader because it is continuing after restoring from a snapshot, the follower because it has
+ // trimmed the log after a snapshot, or either of them when this is the first log record. I could
+ // have entered fake log entries but this seemed easier since it is closer to the logic from the
+ // raft paper.
+ //
+ // Granted I could avoid returning the previous index and have each client do the index-1. I just
+ // felt like doing it once rather than on every call.
+ LookupPrevious(Index) (Term, Index, bool)
+
+ // ConsiderSnapshot checks to see if it is time for a snapshot and, if so, calls back the client to
+ // generate a snapshot and then trims the log. On return it is safe to continue
+ // RaftClient.Apply()ing commands although the snapshot may still be in progress.
+ ConsiderSnapshot(ctx *context.T, lastAppliedTerm Term, lastAppliedIndex Index)
+
+ // SnapshotFromLeader receives and stores a snapshot from the leader and then restores the
+ // client state from the snapshot. 'lastTermApplied' and 'lastIndexApplied' represent the last
+ // log entry RaftClient.Apply()ed before the snapshot was taken.
+ SnapshotFromLeader(ctx *context.T, lastTermApplied Term, lastIndexApplied Index, call raftProtoInstallSnapshotServerCall) error
+
+ // OpenLatestSnapshot opens the latest snapshot and returns a reader for it. The returned Term
+ // and Index represent the last log entry RaftClient.Apply()ed before the snapshot was taken.
+ OpenLatestSnapshot(ctx *context.T) (io.Reader, Term, Index, error)
+}
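
To illustrate the AppendToLog contract, a sketch written as if inside the
package, following the calls made in fspersistence_test.go: each append names
the term and index of the entry immediately preceding the new entries, and a
mismatch fails with errOutOfSequence. The appendExample name and its error
handling are hypothetical; it assumes the package's existing imports plus
"errors".

    // appendExample appends a first entry (nothing precedes it, so the
    // previous term and index are both 0) and then a second entry whose
    // predecessor is discovered with LookupPrevious.
    func appendExample(ctx *context.T, p persistent) error {
    	first := []LogEntry{{Term: 2, Index: 1, Cmd: []byte("cmd1")}}
    	if err := p.AppendToLog(ctx, 0, 0, first); err != nil {
    		return err
    	}
    	prevTerm, prevIndex, ok := p.LookupPrevious(p.LastIndex() + 1)
    	if !ok {
    		return errors.New("no previous entry") // hypothetical handling
    	}
    	next := []LogEntry{{Term: 2, Index: prevIndex + 1, Cmd: []byte("cmd2")}}
    	return p.AppendToLog(ctx, prevTerm, prevIndex, next)
    }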
diff --git a/lib/raft/raft.go b/lib/raft/raft.go
new file mode 100644
index 0000000..3dac805
--- /dev/null
+++ b/lib/raft/raft.go
@@ -0,0 +1,827 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// This package implements the Raft protocol, https://ramcloud.stanford.edu/raft.pdf. The
+// logged commands are byte slices. If someone wishes a more complex command structure, they
+// should use an encoding (e.g. json) into the byte slices.
+
+import (
+ "io"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/lib/raft"
+
+var (
+ errBadAppend = verror.Register(pkgPath+".errBadAppend", verror.NoRetry, "{1:}{2:} inconsistent append{:_}")
+ errAddAfterStart = verror.Register(pkgPath+".errAddAfterStart", verror.NoRetry, "{1:}{2:} adding member after start{:_}")
+ errNotLeader = verror.Register(pkgPath+".errNotLeader", verror.NoRetry, "{1:}{2:} not the leader{:_}")
+ errWTF = verror.Register(pkgPath+".errWTF", verror.NoRetry, "{1:}{2:} internal error{:_}")
+ errTimedOut = verror.Register(pkgPath+".errTimedOut", verror.NoRetry, "{1:}{2:} request timed out{:_}")
+ errBadTerm = verror.Register(pkgPath+".errBadTerm", verror.NoRetry, "{1:}{2:} new term {3} < {4} {:_}")
+)
+
+// member keeps track of another member's state.
+type member struct {
+ id string
+ nextIndex Index // Next log index to send to this follower.
+ matchIndex Index // Last entry logged by this follower.
+ stopped chan struct{} // Follower go routine closes this to indicate it has terminated.
+ update chan struct{}
+ timer *time.Timer
+}
+
+// memberSlice is used for sorting members by highest logged (matched) entry.
+type memberSlice []*member
+
+func (m memberSlice) Len() int { return len(m) }
+func (m memberSlice) Less(i, j int) bool { return m[i].matchIndex > m[j].matchIndex }
+func (m memberSlice) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
+
+// raft is the implementation of the raft library.
+type raft struct {
+ sync.Mutex
+
+ ctx *context.T
+ cancel context.CancelFunc
+ rng *rand.Rand
+ timer *time.Timer
+ heartbeat time.Duration
+
+ // rpc interface between instances.
+ s service
+
+ // Client interface.
+ client RaftClient
+
+ // applied is the highest log entry applied to the client. All access/update is in a single
+ // go routine.
+ applied struct {
+ sync.RWMutex
+ index Index
+ term Term
+ }
+
+ // Raft algorithm volatile state.
+ role int
+ leader string
+ quorum int // Number of members that form a quorum.
+ commitIndex Index // Highest index commited.
+ memberMap map[string]*member // Map of raft members (including current).
+ memberSet memberSlice // Slice of raft members (including current).
+ me *member
+
+ // Raft algorithm persistent state
+ p persistent
+ logDir string
+
+ // stop and stopped are for clean shutdown. All long lived go routines (perFollower and serverEvents)
+ // exit when stop is closed. Each perFollower then closes member.stopped and serverEvents closes
+ // stopped to signal that they are finished.
+ stop chan struct{} // perFollower and serverEvents go routines exit when this is closed.
+ stopped chan struct{} // serverEvents go routine closes this to indicate it has terminated.
+
+ // Each time a perFollower successfully logs entries on a follower it writes to newMatch to get the
+ // serverEvents routine to possibly update the commitIndex.
+ newMatch chan struct{} // Each time a follower reports a logged entry a message is sent on this.
+
+ // Each time a follower receives a new commit index, it sends it to newCommit to get the serverEvents
+ // routine to apply any newly committed entries.
+ newCommit chan Index // Each received leader commit index is written to this.
+
+ // Wait here for commitIndex to change.
+ ccv *sync.Cond
+
+ // Wait here for leadership to change.
+ lcv *sync.Cond
+}
+
+// logEntry is the in-memory structure for each logged item. It mirrors the wire-format
+// LogEntry and additionally records any error returned by applying the command.
+type logEntry struct {
+ Term Term
+ Index Index
+ Cmd []byte
+ Type byte
+ ApplyError error
+}
+
+// newRaft creates a new raft server.
+// config.LogDir - the name of the directory in which to persist the log.
+// config.ServerName - a name for the server to announce itself as in a mount table. All members should use the
+// same name and hence be alternatives for callers.
+// config.HostPort - the network address of the server.
+// config.Heartbeat - the interval between heartbeats. 0 means use default.
+// config.SnapshotThreshold - the approximate number of log entries between snapshots. 0 means use default.
+// client - callbacks to the client.
+func newRaft(ctx *context.T, config *RaftConfig, client RaftClient) (*raft, error) {
+ nctx, cancel := context.WithCancel(ctx)
+ r := &raft{}
+ r.ctx = nctx
+ r.cancel = cancel
+ r.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
+ r.heartbeat = config.Heartbeat
+ if r.heartbeat == 0 {
+ r.heartbeat = 3 * time.Second
+ }
+
+ // Client interface.
+ r.client = client
+ r.applied.term = 0
+ r.applied.index = 0
+
+ // Raft volatile state.
+ r.role = RoleStopped
+ r.commitIndex = 0
+ r.leader = ""
+ r.memberMap = make(map[string]*member)
+ r.memberSet = make([]*member, 0)
+ r.AddMember(ctx, config.HostPort)
+ r.me = r.memberMap[config.HostPort]
+
+ // Raft persistent state.
+ var err error
+ r.logDir = config.LogDir
+ if r.p, err = openPersist(ctx, r, config.SnapshotThreshold); err != nil {
+ return nil, err
+ }
+
+ // Internal communication/synchronization.
+ r.newMatch = make(chan struct{}, 100)
+ r.newCommit = make(chan Index, 100)
+ r.ccv = sync.NewCond(r)
+ r.lcv = sync.NewCond(r)
+
+ // The RPC interface to other members.
+ eps, err := r.s.newService(nctx, r, config.ServerName, config.HostPort, config.Acl)
+ if err != nil {
+ return nil, err
+ }
+
+ // If we're in the V namespace, just use the name as our Id. If not create one
+ // from the network address.
+ r.me.id = config.ServerName
+ if r.me.id == "" {
+ r.me.id = string(getShortName(eps[0]))
+ }
+
+ return r, nil
+}
+
+// getShortName will return a /host:port name if possible. Otherwise it will just return the name
+// version of the endpoint.
+func getShortName(ep naming.Endpoint) string {
+ if ep.Addr().Network() != "tcp" {
+ return ep.Name()
+ }
+ return naming.JoinAddressName(ep.Addr().String(), "")
+}
+
+// AddMember adds the id as a raft member. The id must be a vanadium name.
+func (r *raft) AddMember(ctx *context.T, id string) error {
+ if r.role != RoleStopped {
+ // Already started.
+ // TODO(p): Raft has a protocol for changing membership after
+ // start. I'll add that after I get at least one client
+ // working.
+ return verror.New(errAddAfterStart, ctx)
+ }
+ m := &member{id: id, stopped: make(chan struct{}), update: make(chan struct{}, 10)}
+ r.memberMap[id] = m
+ r.memberSet = append(r.memberSet, m)
+ // Quorum has to be more than half the servers.
+ r.quorum = (len(r.memberSet) + 1) / 2
+ return nil
+}
+
+// Id returns the vanadium name of this server.
+func (r *raft) Id() string {
+ return r.me.id
+}
+
+// Start gets the protocol going.
+func (r *raft) Start() {
+ vlog.Infof("@%s starting", r.me.id)
+ r.Lock()
+ defer r.Unlock()
+ if r.role != RoleStopped {
+ // already started
+ return
+ }
+ r.timer = time.NewTimer(2 * r.heartbeat)
+
+ // Signaling for stopping the perFollower and serverEvents go routines.
+ r.stop = make(chan struct{})
+ r.stopped = make(chan struct{})
+
+ // serverEvents serializes events for this server.
+ go r.serverEvents()
+
+ // perFollowers updates the followers when we're the leader.
+ for _, m := range r.memberSet {
+ if m.id != r.me.id {
+ go r.perFollower(m)
+ }
+ }
+ return
+}
+
+// Stop ceases all function as a raft server.
+func (r *raft) Stop() {
+ vlog.Infof("@%s stopping", r.me.id)
+ r.Lock()
+ if r.role == RoleStopped {
+ r.Unlock()
+ return
+ }
+ r.role = RoleStopped
+ r.Unlock()
+ r.cancel()
+
+ // Stop the associated go routines.
+ close(r.stop)
+
+ // Wait for serverEvents to stop.
+ <-r.stopped
+
+ // Wait for all the perFollower routines to stop.
+ for _, m := range r.memberSet {
+ if m.id != r.me.id {
+ <-m.stopped
+ }
+ }
+
+ // Shut down the log file.
+ r.p.Close()
+
+ vlog.Infof("@%s stopping service", r.me.id)
+ r.s.stopService()
+ vlog.Infof("@%s stopped", r.me.id)
+}
+
+// setRoleAndWatchdogTimer called with r.l locked.
+func (r *raft) setRoleAndWatchdogTimer(role int) {
+ vlog.VI(2).Infof("@%s %s->%s", r.me.id, roleToString(r.role), roleToString(role))
+ r.role = role
+ switch role {
+ case RoleFollower:
+ // Wake up any RaftProto.Append()s waiting for a commitment. They
+ // will now have to give up since we are no longer leader.
+ r.ccv.Broadcast()
+ // Set a timer to start an election if we no longer hear from the leader.
+ r.resetTimerFuzzy(2 * r.heartbeat)
+ case RoleLeader:
+ // Set known follower status to default values.
+ for _, m := range r.memberSet {
+ if m.id != r.me.id {
+ m.nextIndex = r.p.LastIndex() + 1
+ m.matchIndex = 0
+ }
+ }
+ r.leader = r.me.id
+ // Tell followers we are now the leader.
+ r.appendNull()
+ case RoleCandidate:
+ // Set a timer to restart the election if no one becomes leader.
+ r.resetTimerFuzzy(r.heartbeat)
+ }
+}
+
+func (r *raft) appendNull() {
+ // Assign an index and term to the log entry.
+ le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: nil, Type: RaftEntry}
+
+ // Append to our own log.
+ if err := r.p.AppendToLog(r.ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
+ // This shouldn't happen.
+ return
+ }
+
+ // Update the fact that we've logged it.
+ r.setMatchIndex(r.me, le.Index)
+ r.kickFollowers()
+}
+
+// Status returns the current member's id, its raft role, and who it thinks is leader.
+func (r *raft) Status() (string, int, string) {
+ r.Lock()
+ defer r.Unlock()
+ return r.me.id, r.role, r.leader
+}
+
+// StartElection starts a new round of voting. We do this by incrementing the
+// Term and, in parallel, calling each other member to vote. If we receive a
+// majority we win and send a heartbeat to each member.
+//
+// Someone else may get elected in the middle of the vote so we have to
+// make sure we're still a candidate at the end of the voting.
+func (r *raft) StartElection() {
+ r.Lock()
+ defer r.Unlock()
+ r.startElection()
+}
+
+func (r *raft) startElection() {
+ // If we can't get a response in 2 seconds, something is really wrong.
+	ctx, _ := context.WithTimeout(r.ctx, 2*time.Second)
+	if err := r.p.IncCurrentTerm(); err != nil {
+		// If this fails, there's no way to recover; Fatalf does not return.
+		vlog.Fatalf("incrementing current term: %s", err)
+ }
+ vlog.VI(2).Infof("@%s startElection new term %d", r.me.id, r.p.CurrentTerm())
+
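+	// The fields of msg are positional and must match the arguments of
+	// raftProto.RequestVote: (term, candidateId, lastLogTerm, lastLogIndex).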
+ msg := []interface{}{
+ r.p.CurrentTerm(),
+ r.me.id,
+ r.p.LastTerm(),
+ r.p.LastIndex(),
+ }
+ var members []string
+ for k, m := range r.memberMap {
+ if m.id == r.me.id {
+ continue
+ }
+ members = append(members, k)
+ }
+ r.setRoleAndWatchdogTimer(RoleCandidate)
+ r.p.SetVotedFor(r.me.id)
+ r.leader = ""
+ r.Unlock()
+
+ // We have to do this outside the lock or the system will deadlock when two members start overlapping votes.
+ c := make(chan bool)
+ for _, id := range members {
+ go func(id string) {
+ var term Term
+ var ok bool
+ client := v23.GetClient(ctx)
+ if err := client.Call(ctx, id, "RequestVote", msg, []interface{}{&term, &ok}, options.Preresolved{}); err != nil {
+ vlog.VI(2).Infof("@%s sending RequestVote to %s: %s", r.me.id, id, err)
+ }
+ c <- ok
+ }(id)
+ }
+
+ // Wait till all the voters have voted or timed out.
+ oks := 1 // We vote for ourselves.
+ for range members {
+ if <-c {
+ oks++
+ }
+ }
+
+ r.Lock()
+ // We have to check the role since someone else may have become the leader during the round and
+ // made us a follower.
+	// Winning requires a majority of the whole cluster, i.e., of len(members)+1 servers counting ourselves.
+	if oks <= (len(members)+1)/2 || r.role != RoleCandidate {
+ vlog.VI(2).Infof("@%s lost election with %d votes", r.me.id, oks)
+ return
+ }
+ vlog.Infof("@%s won election with %d votes", r.me.id, oks)
+ r.setRoleAndWatchdogTimer(RoleLeader)
+ r.leader = r.me.id
+ r.setMatchIndex(r.me, r.p.LastIndex())
+ r.lcv.Broadcast()
+}
+
+// applyCommits applies any committed entries.
+func (r *raft) applyCommits(commitIndex Index) {
+ for r.applied.index < commitIndex {
+		// This is the only goroutine that changes r.applied
+ // so we don't have to protect our reads.
+ next := r.applied.index + 1
+ le := r.p.Lookup(next)
+ if le == nil {
+ // Commit index is ahead of our highest entry.
+ return
+ }
+ switch le.Type {
+ case ClientEntry:
+ le.ApplyError = r.client.Apply(le.Cmd, le.Index)
+ case RaftEntry:
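+			// Nothing to apply; RaftEntry entries (e.g., the null entry a new
+			// leader appends) are internal bookkeeping.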
+ }
+
+ // But we do have to lock our writes.
+ r.applied.Lock()
+ r.applied.index = next
+ r.applied.term = le.Term
+ r.applied.Unlock()
+ }
+
+ r.p.ConsiderSnapshot(r.ctx, r.applied.term, r.applied.index)
+}
+
+func (r *raft) lastApplied() Index {
+ r.applied.RLock()
+ defer r.applied.RUnlock()
+ return r.applied.index
+}
+
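+// resetTimerFuzzy resets the watchdog timer to d plus a random fuzz in [0, heartbeat)
+// so that members don't start elections in lockstep.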
+func (r *raft) resetTimerFuzzy(d time.Duration) {
+ fuzz := time.Duration(rand.Int63n(int64(r.heartbeat)))
+ r.timer.Reset(d + fuzz)
+}
+
+func (r *raft) resetTimer(d time.Duration) {
+ r.timer.Reset(d)
+}
+
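+// highestFromChan drains c without blocking and returns the highest Index seen,
+// starting from i.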
+func highestFromChan(i Index, c chan Index) Index {
+ for {
+ select {
+ case j := <-c:
+ if j > i {
+ i = j
+ }
+ default:
+ return i
+ }
+ }
+}
+
+// serverEvents is a goroutine that serializes server events. This loop performs:
+// (1) all changes to commitIndex both as a leader and a follower.
+// (2) all application of committed log commands.
+// (3) all elections.
+func (r *raft) serverEvents() {
+ r.Lock()
+ r.setRoleAndWatchdogTimer(RoleFollower)
+ r.Unlock()
+ for {
+ select {
+ case <-r.stop:
+ // Terminate.
+ close(r.stopped)
+ return
+ case <-r.timer.C:
+ // Start an election whenever either:
+ // (1) a follower hasn't heard from the leader in a random interval > 2 * heartbeat.
+			// (2) a candidate hasn't won an election or been told anyone else has within a heartbeat.
+ r.Lock()
+ switch r.role {
+ case RoleCandidate:
+ r.startElection()
+ case RoleFollower:
+ r.startElection()
+ }
+ r.Unlock()
+ case <-r.newMatch:
+ // Soak up any queued requests.
+ emptyChan(r.newMatch)
+
+ // This happens whenever we have gotten a reply from a follower. We do it
+ // here rather than in perFollower solely as a matter of taste.
+ // Update the commitIndex if needed and apply any newly committed entries.
+ r.Lock()
+ if r.role != RoleLeader {
+ r.Unlock()
+ continue
+ }
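+			// memberSet is ordered so that entry quorum-1 holds a log index that at
+			// least a quorum of members (ourselves included) have stored; everything
+			// at or below it is safe to commit.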
+ sort.Sort(r.memberSet)
+ ci := r.memberSet[r.quorum-1].matchIndex
+ if ci <= r.commitIndex {
+ r.Unlock()
+ continue
+ }
+ r.commitIndex = ci
+ r.Unlock()
+ r.applyCommits(ci)
+ r.ccv.Broadcast()
+ r.kickFollowers()
+ case i := <-r.newCommit:
+ // Get highest queued up commit.
+ i = highestFromChan(i, r.newCommit)
+
+ // Update the commitIndex if needed and apply any newly committed entries.
+ r.Lock()
+ if r.role != RoleFollower {
+ r.Unlock()
+ continue
+ }
+ if i > r.commitIndex {
+ r.commitIndex = i
+ }
+ ci := r.commitIndex
+ r.Unlock()
+ r.applyCommits(ci)
+ r.ccv.Broadcast()
+ }
+ }
+}
+
+// makeAppendMsg creates an append message with at most 10 entries.
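+// The returned slice is positional and must match the arguments of
+// raftProto.AppendToLog: (term, leaderId, prevIndex, prevTerm, leaderCommit, cmds).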
+func (r *raft) makeAppendMsg(m *member) ([]interface{}, int) {
+ // Figure out if we know the previous entry.
+ prevTerm, prevIndex, ok := r.p.LookupPrevious(m.nextIndex)
+ if !ok {
+ return nil, 0
+ }
+ // Collect some log entries to send along. 0 is ok.
+ var entries []LogEntry
+ for i := 0; i < 10; i++ {
+ le := r.p.Lookup(m.nextIndex + Index(i))
+ if le == nil {
+ break
+ }
+ entries = append(entries, LogEntry{Cmd: le.Cmd, Term: le.Term, Index: le.Index, Type: le.Type})
+ }
+ return []interface{}{
+ r.p.CurrentTerm(),
+ r.me.id,
+ prevIndex,
+ prevTerm,
+ r.commitIndex,
+ entries,
+ }, len(entries)
+}
+
+// updateFollower loops trying to update a follower until the follower is updated or we can't proceed.
+// It always sends at least one update, so it also acts as a heartbeat.
+func (r *raft) updateFollower(m *member) {
+ // Bring this server up to date.
+ r.Lock()
+ defer r.Unlock()
+ for {
+ // If we're not the leader we have no followers.
+ if r.role != RoleLeader {
+ return
+ }
+
+ // Collect some log entries starting at m.nextIndex.
+ msg, n := r.makeAppendMsg(m)
+ if msg == nil {
+ // Try sending a snapshot.
+ r.Unlock()
+ vlog.Infof("@%s sending snapshot to %s", r.me.id, m.id)
+ snapIndex, err := r.sendLatestSnapshot(m)
+ r.Lock()
+ if err != nil {
+ // Try again later.
+ vlog.Errorf("@%s sending snapshot to %s: %s", r.me.id, m.id, err)
+ return
+ }
+ m.nextIndex = snapIndex + 1
+ vlog.Infof("@%s sent snapshot to %s", r.me.id, m.id)
+ // Try sending anything following the snapshot.
+ continue
+ }
+
+		// Send to the follower. We drop the lock while we do this. That means we may stop being the
+		// leader in the middle of the call, but that's OK as long as we check after we reacquire the lock.
+ r.Unlock()
+		ctx, _ := context.WithTimeout(r.ctx, 2*time.Second)
+ client := v23.GetClient(ctx)
+ err := client.Call(ctx, m.id, "AppendToLog", msg, []interface{}{}, options.Preresolved{})
+ r.Lock()
+ if r.role != RoleLeader {
+			// Not leader any more; it doesn't matter how the follower replied.
+ return
+ }
+
+ if err != nil {
+ if verror.ErrorID(err) != errOutOfSequence.ID {
+ // A problem other than missing entries. Retry later.
+ vlog.Errorf("@%s updating %s: %s", r.me.id, m.id, err)
+ return
+ }
+			// At this point we know that the follower is missing entries previous to what
+			// we just sent. If we can back up, do it. Otherwise try sending a snapshot.
+ if m.nextIndex <= 1 {
+ return
+ }
+ prev := r.p.Lookup(m.nextIndex - 1)
+ if prev == nil {
+ return
+ }
+ // We can back up.
+ m.nextIndex = m.nextIndex - 1
+ continue
+ }
+
+ // The follower appended correctly, update indices and tell the server thread that
+ // the commit index may need to change.
+ m.nextIndex += Index(n)
+ logged := m.nextIndex - 1
+ if n > 0 {
+ r.setMatchIndex(m, logged)
+ }
+
+		// Return once the follower is caught up.
+ if m.nextIndex > r.p.LastIndex() {
+ return
+ }
+ }
+}
+
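+// sendLatestSnapshot streams the most recent snapshot to follower m and returns
+// the log index the snapshot covers.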
+func (r *raft) sendLatestSnapshot(m *member) (Index, error) {
+ rd, term, index, err := r.p.OpenLatestSnapshot(r.ctx)
+ if err != nil {
+ return 0, err
+ }
+	ctx, _ := context.WithTimeout(r.ctx, 5*time.Minute)
+ client := raftProtoClient(m.id)
+ call, err := client.InstallSnapshot(ctx, r.p.CurrentTerm(), r.me.id, term, index, options.Preresolved{})
+ if err != nil {
+ return 0, err
+ }
+ sstream := call.SendStream()
+ b := make([]byte, 10240)
+	for {
+		n, err := rd.Read(b)
+		if n > 0 {
+			// Send only the bytes actually read; sending the whole buffer
+			// would corrupt the snapshot with trailing garbage.
+			if err := sstream.Send(b[:n]); err != nil {
+				return 0, err
+			}
+		}
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return 0, err
+		}
+	}
+ if err := call.Finish(); err != nil {
+ return 0, err
+ }
+ return index, nil
+}
+
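+// emptyChan drains any pending signals from c without blocking.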
+func emptyChan(c chan struct{}) {
+ for {
+ select {
+ case <-c:
+ default:
+ return
+ }
+ }
+}
+
+// perFollower is a goroutine that sequences all messages to a single follower.
+//
+// This is the only go routine that updates the follower's variables so all changes to
+// the member struct are serialized by it.
+func (r *raft) perFollower(m *member) {
+ m.timer = time.NewTimer(r.heartbeat)
+ for {
+ select {
+ case <-m.timer.C:
+ r.updateFollower(m)
+ m.timer.Reset(r.heartbeat)
+ case <-m.update:
+ // Soak up any waiting update requests
+ emptyChan(m.update)
+ r.updateFollower(m)
+ m.timer.Reset(r.heartbeat)
+ case <-r.stop:
+ close(m.stopped)
+ return
+ }
+ }
+}
+
+// kickFollowers nudges each perFollower routine to try to update its follower.
+func (r *raft) kickFollowers() {
+ for _, m := range r.memberMap {
+ select {
+ case m.update <- struct{}{}:
+ default:
+ }
+ }
+}
+
+// setMatchIndex updates the matchIndex for a member.
+//
+// called with r locked.
+func (r *raft) setMatchIndex(m *member, i Index) {
+ m.matchIndex = i
+ if i <= r.commitIndex {
+ return
+ }
+ // Check if we need to change the commit index.
+ select {
+ case r.newMatch <- struct{}{}:
+ default:
+ }
+}
+
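+// minIndex returns the smallest of the given indices, or 0 if none are given.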
+func minIndex(indices ...Index) Index {
+ if len(indices) == 0 {
+ return 0
+ }
+ min := indices[0]
+ for _, x := range indices[1:] {
+ if x < min {
+ min = x
+ }
+ }
+ return min
+}
+
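+// roleToString returns a printable name for a role, for logging.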
+func roleToString(r int) string {
+ switch r {
+ case RoleCandidate:
+ return "candidate"
+ case RoleLeader:
+ return "leader"
+ case RoleFollower:
+ return "follower"
+ }
+ return "?"
+}
+
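+// waitForApply blocks until the entry at (term, index) has been applied and returns
+// its Apply error. If an election intervened and the entry was lost it returns
+// errNotLeader; if ctx is done first it returns errTimedOut.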
+func (r *raft) waitForApply(ctx *context.T, term Term, index Index) (error, error) {
+ for {
+ r.applied.RLock()
+ applied := r.applied.index
+ r.applied.RUnlock()
+
+ if applied >= index {
+ le := r.p.Lookup(index)
+ if le == nil || le.Term != term {
+ // There was an election and the log entry was lost.
+ return nil, verror.New(errNotLeader, ctx)
+ }
+ return le.ApplyError, nil
+ }
+
+ // Give up if the caller doesn't want to wait.
+ select {
+ case <-ctx.Done():
+ return nil, verror.New(errTimedOut, ctx)
+ default:
+ }
+
+ // Wait for an apply to happen.
+ r.Lock()
+ r.ccv.Wait()
+ r.Unlock()
+ }
+}
+
+// Append tells the leader to append to the log. The first error is the result of the client.Apply. The second
+// is any error from raft.
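+//
+// A typical call from a member looks like (sketch):
+//
+//	applyErr, err := r.Append(ctx, cmd)
+//	if err != nil {
+//		// raft could not commit the entry; retry or give up.
+//	}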
+func (r *raft) Append(ctx *context.T, cmd []byte) (applyErr error, err error) {
+ for {
+ r.Lock()
+ if len(r.leader) == 0 {
+ r.lcv.Wait()
+ }
+ leader := r.leader
+ role := r.role
+ r.Unlock()
+
+ switch role {
+ case RoleLeader:
+ term, index, err := r.s.Append(ctx, nil, cmd)
+ if err == nil {
+ // We were the leader and the entry has now been applied.
+ return r.waitForApply(ctx, term, index)
+ }
+ // If the leader can't do it, give up.
+ if verror.ErrorID(err) != errNotLeader.ID {
+ return nil, err
+ }
+ case RoleFollower:
+ client := v23.GetClient(ctx)
+ var index Index
+ var term Term
+ if len(leader) == 0 {
+ break
+ }
+ if err = client.Call(ctx, leader, "Append", []interface{}{cmd}, []interface{}{&term, &index}, options.Preresolved{}); err == nil {
+ return r.waitForApply(ctx, term, index)
+ }
+ // If the leader can't do it, give up.
+ if verror.ErrorID(err) != errNotLeader.ID {
+ return nil, err
+ }
+ }
+
+ // Give up if the caller doesn't want to wait.
+ select {
+ case <-ctx.Done():
+ err = verror.New(errTimedOut, ctx)
+ return
+ default:
+ }
+ }
+}
+
+func (r *raft) Leader() (bool, string) {
+ r.Lock()
+ defer r.Unlock()
+ if r.role == RoleLeader {
+ return true, r.leader
+ }
+ return false, r.leader
+}
diff --git a/lib/raft/raft.vdl b/lib/raft/raft.vdl
new file mode 100644
index 0000000..bcc8912
--- /dev/null
+++ b/lib/raft/raft.vdl
@@ -0,0 +1,75 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// This vdl defines the interface between raft members and not the library interface to raft.
+
+// Every log entry is represented by a Term and an Index.
+
+// Term is a counter incremented each time a member starts an election. The log will
+// show gaps in Term numbers because all elections need not be successful.
+type Term uint64
+
+// Index is an index into the log. The log entries are numbered sequentially. At the moment
+// the entries RaftClient.Apply()ed should be sequential but that will change if we introduce
+// system entries. For example, we could have an entry type that is used to add members to the
+// set of replicas.
+type Index uint64
+
+// LogEntry types.
+const (
+ ClientEntry = byte(0)
+ RaftEntry = byte(1)
+)
+
+// The LogEntry is what the log consists of. ApplyError starts nil and is never written to stable
+// storage. It represents the result of RaftClient.Apply(Cmd, Index). This is a hack but I
+// haven't figured out a better way.
+type LogEntry struct {
+ Term Term
+ Index Index
+ Cmd []byte
+ Type byte
+}
+
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProto interface {
+ // Members returns the current set of ids of raft members.
+ Members() ([]string | error)
+
+ // Leader returns the id of the current leader.
+ Leader() (string | error)
+
+ // RequestVote starts a new round of voting. It returns the server's current Term and true if
+ // the server voted for the client.
+ RequestVote(term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool | error)
+
+ // AppendToLog is sent by the leader to tell followers to append an entry. If cmds
+ // is empty, this is a keep alive message (at a random interval after a keep alive, followers
+ // will initiate a new round of voting).
+ // term -- the current term of the sender
+ // leaderId -- the id of the sender
+ // prevIndex -- the index of the log entry immediately preceding cmds
+ // prevTerm -- the term of the log entry immediately preceding cmds. The receiver must have
+	// received the previously indexed entry and it must have had the same term. Otherwise
+	// an error is returned.
+	// leaderCommit -- the index of the last committed entry, i.e., the one a quorum has guaranteed
+ // to have logged.
+ // cmds -- sequential log entries starting at prevIndex+1
+ AppendToLog(term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+
+ // Append is sent to the leader by followers. Only the leader is allowed to send AppendToLog.
+ // If a follower receives an Append() call it performs an Append() to the leader to run the actual
+ // Raft algorithm. The leader will respond after it has RaftClient.Apply()ed the command.
+ //
+ // Returns the term and index of the append entry or an error.
+ Append(cmd []byte) (term Term, index Index | error)
+
+ // InstallSnapshot is sent from the leader to follower to install the given snapshot. It is
+ // sent when it becomes apparent that the leader does not have log entries needed by the follower
+ // to progress. 'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+ // snapshot.
+ InstallSnapshot(term Term, leaderId string, appliedTerm Term, appliedIndex Index) stream<[]byte> error
+}
diff --git a/lib/raft/raft.vdl.go b/lib/raft/raft.vdl.go
new file mode 100644
index 0000000..c579d09
--- /dev/null
+++ b/lib/raft/raft.vdl.go
@@ -0,0 +1,484 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: raft.vdl
+
+package raft
+
+import (
+ // VDL system imports
+ "io"
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+)
+
+// Term is a counter incremented each time a member starts an election. The log will
+// show gaps in Term numbers because all elections need not be successful.
+type Term uint64
+
+func (Term) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/raft.Term"`
+}) {
+}
+
+// Index is an index into the log. The log entries are numbered sequentially. At the moment
+// the entries RaftClient.Apply()ed should be sequential but that will change if we introduce
+// system entries. For example, we could have an entry type that is used to add members to the
+// set of replicas.
+type Index uint64
+
+func (Index) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/raft.Index"`
+}) {
+}
+
+// The LogEntry is what the log consists of. ApplyError starts nil and is never written to stable
+// storage. It represents the result of RaftClient.Apply(Cmd, Index). This is a hack but I
+// haven't figured out a better way.
+type LogEntry struct {
+ Term Term
+ Index Index
+ Cmd []byte
+ Type byte
+}
+
+func (LogEntry) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/raft.LogEntry"`
+}) {
+}
+
+func init() {
+ vdl.Register((*Term)(nil))
+ vdl.Register((*Index)(nil))
+ vdl.Register((*LogEntry)(nil))
+}
+
+const ClientEntry = byte(0)
+
+const RaftEntry = byte(1)
+
+// raftProtoClientMethods is the client interface
+// containing raftProto methods.
+//
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProtoClientMethods interface {
+ // Members returns the current set of ids of raft members.
+ Members(*context.T, ...rpc.CallOpt) ([]string, error)
+ // Leader returns the id of the current leader.
+ Leader(*context.T, ...rpc.CallOpt) (string, error)
+ // RequestVote starts a new round of voting. It returns the server's current Term and true if
+ // the server voted for the client.
+ RequestVote(_ *context.T, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index, _ ...rpc.CallOpt) (Term Term, Granted bool, _ error)
+ // AppendToLog is sent by the leader to tell followers to append an entry. If cmds
+ // is empty, this is a keep alive message (at a random interval after a keep alive, followers
+ // will initiate a new round of voting).
+ // term -- the current term of the sender
+ // leaderId -- the id of the sender
+ // prevIndex -- the index of the log entry immediately preceding cmds
+ // prevTerm -- the term of the log entry immediately preceding cmds. The receiver must have
+	// received the previously indexed entry and it must have had the same term. Otherwise
+	// an error is returned.
+	// leaderCommit -- the index of the last committed entry, i.e., the one a quorum has guaranteed
+ // to have logged.
+ // cmds -- sequential log entries starting at prevIndex+1
+ AppendToLog(_ *context.T, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry, _ ...rpc.CallOpt) error
+ // Append is sent to the leader by followers. Only the leader is allowed to send AppendToLog.
+ // If a follower receives an Append() call it performs an Append() to the leader to run the actual
+ // Raft algorithm. The leader will respond after it has RaftClient.Apply()ed the command.
+ //
+ // Returns the term and index of the append entry or an error.
+ Append(_ *context.T, cmd []byte, _ ...rpc.CallOpt) (term Term, index Index, _ error)
+ // InstallSnapshot is sent from the leader to follower to install the given snapshot. It is
+ // sent when it becomes apparent that the leader does not have log entries needed by the follower
+ // to progress. 'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+ // snapshot.
+ InstallSnapshot(_ *context.T, term Term, leaderId string, appliedTerm Term, appliedIndex Index, _ ...rpc.CallOpt) (raftProtoInstallSnapshotClientCall, error)
+}
+
+// raftProtoClientStub adds universal methods to raftProtoClientMethods.
+type raftProtoClientStub interface {
+ raftProtoClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// raftProtoClient returns a client stub for raftProto.
+func raftProtoClient(name string) raftProtoClientStub {
+ return implraftProtoClientStub{name}
+}
+
+type implraftProtoClientStub struct {
+ name string
+}
+
+func (c implraftProtoClientStub) Members(ctx *context.T, opts ...rpc.CallOpt) (o0 []string, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Members", nil, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implraftProtoClientStub) Leader(ctx *context.T, opts ...rpc.CallOpt) (o0 string, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Leader", nil, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implraftProtoClientStub) RequestVote(ctx *context.T, i0 Term, i1 string, i2 Term, i3 Index, opts ...rpc.CallOpt) (o0 Term, o1 bool, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "RequestVote", []interface{}{i0, i1, i2, i3}, []interface{}{&o0, &o1}, opts...)
+ return
+}
+
+func (c implraftProtoClientStub) AppendToLog(ctx *context.T, i0 Term, i1 string, i2 Index, i3 Term, i4 Index, i5 []LogEntry, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "AppendToLog", []interface{}{i0, i1, i2, i3, i4, i5}, nil, opts...)
+ return
+}
+
+func (c implraftProtoClientStub) Append(ctx *context.T, i0 []byte, opts ...rpc.CallOpt) (o0 Term, o1 Index, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Append", []interface{}{i0}, []interface{}{&o0, &o1}, opts...)
+ return
+}
+
+func (c implraftProtoClientStub) InstallSnapshot(ctx *context.T, i0 Term, i1 string, i2 Term, i3 Index, opts ...rpc.CallOpt) (ocall raftProtoInstallSnapshotClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "InstallSnapshot", []interface{}{i0, i1, i2, i3}, opts...); err != nil {
+ return
+ }
+ ocall = &implraftProtoInstallSnapshotClientCall{ClientCall: call}
+ return
+}
+
+// raftProtoInstallSnapshotClientStream is the client stream for raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotClientStream interface {
+ // SendStream returns the send side of the raftProto.InstallSnapshot client stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
+ Send(item []byte) error
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
+ Close() error
+ }
+}
+
+// raftProtoInstallSnapshotClientCall represents the call returned from raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotClientCall interface {
+ raftProtoInstallSnapshotClientStream
+ // Finish performs the equivalent of SendStream().Close, then blocks until
+ // the server is done, and returns the positional return values for the call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implraftProtoInstallSnapshotClientCall struct {
+ rpc.ClientCall
+}
+
+func (c *implraftProtoInstallSnapshotClientCall) SendStream() interface {
+ Send(item []byte) error
+ Close() error
+} {
+ return implraftProtoInstallSnapshotClientCallSend{c}
+}
+
+type implraftProtoInstallSnapshotClientCallSend struct {
+ c *implraftProtoInstallSnapshotClientCall
+}
+
+func (c implraftProtoInstallSnapshotClientCallSend) Send(item []byte) error {
+ return c.c.Send(item)
+}
+func (c implraftProtoInstallSnapshotClientCallSend) Close() error {
+ return c.c.CloseSend()
+}
+func (c *implraftProtoInstallSnapshotClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// raftProtoServerMethods is the interface a server writer
+// implements for raftProto.
+//
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProtoServerMethods interface {
+ // Members returns the current set of ids of raft members.
+ Members(*context.T, rpc.ServerCall) ([]string, error)
+ // Leader returns the id of the current leader.
+ Leader(*context.T, rpc.ServerCall) (string, error)
+ // RequestVote starts a new round of voting. It returns the server's current Term and true if
+ // the server voted for the client.
+ RequestVote(_ *context.T, _ rpc.ServerCall, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool, _ error)
+ // AppendToLog is sent by the leader to tell followers to append an entry. If cmds
+ // is empty, this is a keep alive message (at a random interval after a keep alive, followers
+ // will initiate a new round of voting).
+ // term -- the current term of the sender
+ // leaderId -- the id of the sender
+ // prevIndex -- the index of the log entry immediately preceding cmds
+ // prevTerm -- the term of the log entry immediately preceding cmds. The receiver must have
+	// received the previously indexed entry and it must have had the same term. Otherwise
+	// an error is returned.
+	// leaderCommit -- the index of the last committed entry, i.e., the one a quorum has guaranteed
+ // to have logged.
+ // cmds -- sequential log entries starting at prevIndex+1
+ AppendToLog(_ *context.T, _ rpc.ServerCall, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+ // Append is sent to the leader by followers. Only the leader is allowed to send AppendToLog.
+ // If a follower receives an Append() call it performs an Append() to the leader to run the actual
+ // Raft algorithm. The leader will respond after it has RaftClient.Apply()ed the command.
+ //
+ // Returns the term and index of the append entry or an error.
+ Append(_ *context.T, _ rpc.ServerCall, cmd []byte) (term Term, index Index, _ error)
+ // InstallSnapshot is sent from the leader to follower to install the given snapshot. It is
+ // sent when it becomes apparent that the leader does not have log entries needed by the follower
+ // to progress. 'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+ // snapshot.
+ InstallSnapshot(_ *context.T, _ raftProtoInstallSnapshotServerCall, term Term, leaderId string, appliedTerm Term, appliedIndex Index) error
+}
+
+// raftProtoServerStubMethods is the server interface containing
+// raftProto methods, as expected by rpc.Server.
+// The only difference between this interface and raftProtoServerMethods
+// is the streaming methods.
+type raftProtoServerStubMethods interface {
+ // Members returns the current set of ids of raft members.
+ Members(*context.T, rpc.ServerCall) ([]string, error)
+ // Leader returns the id of the current leader.
+ Leader(*context.T, rpc.ServerCall) (string, error)
+ // RequestVote starts a new round of voting. It returns the server's current Term and true if
+ // the server voted for the client.
+ RequestVote(_ *context.T, _ rpc.ServerCall, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool, _ error)
+ // AppendToLog is sent by the leader to tell followers to append an entry. If cmds
+ // is empty, this is a keep alive message (at a random interval after a keep alive, followers
+ // will initiate a new round of voting).
+ // term -- the current term of the sender
+ // leaderId -- the id of the sender
+ // prevIndex -- the index of the log entry immediately preceding cmds
+ // prevTerm -- the term of the log entry immediately preceding cmds. The receiver must have
+	// received the previously indexed entry and it must have had the same term. Otherwise
+	// an error is returned.
+	// leaderCommit -- the index of the last committed entry, i.e., the one a quorum has guaranteed
+ // to have logged.
+ // cmds -- sequential log entries starting at prevIndex+1
+ AppendToLog(_ *context.T, _ rpc.ServerCall, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+ // Append is sent to the leader by followers. Only the leader is allowed to send AppendToLog.
+ // If a follower receives an Append() call it performs an Append() to the leader to run the actual
+ // Raft algorithm. The leader will respond after it has RaftClient.Apply()ed the command.
+ //
+ // Returns the term and index of the append entry or an error.
+ Append(_ *context.T, _ rpc.ServerCall, cmd []byte) (term Term, index Index, _ error)
+ // InstallSnapshot is sent from the leader to follower to install the given snapshot. It is
+ // sent when it becomes apparent that the leader does not have log entries needed by the follower
+ // to progress. 'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+ // snapshot.
+ InstallSnapshot(_ *context.T, _ *raftProtoInstallSnapshotServerCallStub, term Term, leaderId string, appliedTerm Term, appliedIndex Index) error
+}
+
+// raftProtoServerStub adds universal methods to raftProtoServerStubMethods.
+type raftProtoServerStub interface {
+ raftProtoServerStubMethods
+ // Describe the raftProto interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// raftProtoServer returns a server stub for raftProto.
+// It converts an implementation of raftProtoServerMethods into
+// an object that may be used by rpc.Server.
+func raftProtoServer(impl raftProtoServerMethods) raftProtoServerStub {
+ stub := implraftProtoServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implraftProtoServerStub struct {
+ impl raftProtoServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implraftProtoServerStub) Members(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ return s.impl.Members(ctx, call)
+}
+
+func (s implraftProtoServerStub) Leader(ctx *context.T, call rpc.ServerCall) (string, error) {
+ return s.impl.Leader(ctx, call)
+}
+
+func (s implraftProtoServerStub) RequestVote(ctx *context.T, call rpc.ServerCall, i0 Term, i1 string, i2 Term, i3 Index) (Term, bool, error) {
+ return s.impl.RequestVote(ctx, call, i0, i1, i2, i3)
+}
+
+func (s implraftProtoServerStub) AppendToLog(ctx *context.T, call rpc.ServerCall, i0 Term, i1 string, i2 Index, i3 Term, i4 Index, i5 []LogEntry) error {
+ return s.impl.AppendToLog(ctx, call, i0, i1, i2, i3, i4, i5)
+}
+
+func (s implraftProtoServerStub) Append(ctx *context.T, call rpc.ServerCall, i0 []byte) (Term, Index, error) {
+ return s.impl.Append(ctx, call, i0)
+}
+
+func (s implraftProtoServerStub) InstallSnapshot(ctx *context.T, call *raftProtoInstallSnapshotServerCallStub, i0 Term, i1 string, i2 Term, i3 Index) error {
+ return s.impl.InstallSnapshot(ctx, call, i0, i1, i2, i3)
+}
+
+func (s implraftProtoServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implraftProtoServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{raftProtoDesc}
+}
+
+// raftProtoDesc describes the raftProto interface.
+var raftProtoDesc rpc.InterfaceDesc = descraftProto
+
+// descraftProto hides the desc to keep godoc clean.
+var descraftProto = rpc.InterfaceDesc{
+ Name: "raftProto",
+ PkgPath: "v.io/x/ref/lib/raft",
+ Doc: "// raftProto is used by the members of a raft set to communicate with each other.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "Members",
+ Doc: "// Members returns the current set of ids of raft members.",
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "Leader",
+ Doc: "// Leader returns the id of the current leader.",
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // string
+ },
+ },
+ {
+ Name: "RequestVote",
+ Doc: "// RequestVote starts a new round of voting. It returns the server's current Term and true if\n// the server voted for the client.",
+ InArgs: []rpc.ArgDesc{
+ {"term", ``}, // Term
+ {"candidateId", ``}, // string
+ {"lastLogTerm", ``}, // Term
+ {"lastLogIndex", ``}, // Index
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"Term", ``}, // Term
+ {"Granted", ``}, // bool
+ },
+ },
+ {
+ Name: "AppendToLog",
+			Doc: "// AppendToLog is sent by the leader to tell followers to append an entry. If cmds\n// is empty, this is a keep alive message (at a random interval after a keep alive, followers\n// will initiate a new round of voting).\n// term -- the current term of the sender\n// leaderId -- the id of the sender\n// prevIndex -- the index of the log entry immediately preceding cmds\n// prevTerm -- the term of the log entry immediately preceding cmds. The receiver must have\n// received the previously indexed entry and it must have had the same term. Otherwise\n// an error is returned.\n// leaderCommit -- the index of the last committed entry, i.e., the one a quorum has guaranteed\n// to have logged.\n// cmds -- sequential log entries starting at prevIndex+1",
+ InArgs: []rpc.ArgDesc{
+ {"term", ``}, // Term
+ {"leaderId", ``}, // string
+ {"prevIndex", ``}, // Index
+ {"prevTerm", ``}, // Term
+ {"leaderCommit", ``}, // Index
+ {"cmds", ``}, // []LogEntry
+ },
+ },
+ {
+ Name: "Append",
+ Doc: "// Append is sent to the leader by followers. Only the leader is allowed to send AppendToLog.\n// If a follower receives an Append() call it performs an Append() to the leader to run the actual\n// Raft algorithm. The leader will respond after it has RaftClient.Apply()ed the command.\n//\n// Returns the term and index of the append entry or an error.",
+ InArgs: []rpc.ArgDesc{
+ {"cmd", ``}, // []byte
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"term", ``}, // Term
+ {"index", ``}, // Index
+ },
+ },
+ {
+ Name: "InstallSnapshot",
+ Doc: "// InstallSnapshot is sent from the leader to follower to install the given snapshot. It is\n// sent when it becomes apparent that the leader does not have log entries needed by the follower\n// to progress. 'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the\n// snapshot.",
+ InArgs: []rpc.ArgDesc{
+ {"term", ``}, // Term
+ {"leaderId", ``}, // string
+ {"appliedTerm", ``}, // Term
+ {"appliedIndex", ``}, // Index
+ },
+ },
+ },
+}
+
+// raftProtoInstallSnapshotServerStream is the server stream for raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotServerStream interface {
+ // RecvStream returns the receiver side of the raftProto.InstallSnapshot server stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() []byte
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// raftProtoInstallSnapshotServerCall represents the context passed to raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotServerCall interface {
+ rpc.ServerCall
+ raftProtoInstallSnapshotServerStream
+}
+
+// raftProtoInstallSnapshotServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements raftProtoInstallSnapshotServerCall.
+type raftProtoInstallSnapshotServerCallStub struct {
+ rpc.StreamServerCall
+ valRecv []byte
+ errRecv error
+}
+
+// Init initializes raftProtoInstallSnapshotServerCallStub from rpc.StreamServerCall.
+func (s *raftProtoInstallSnapshotServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the raftProto.InstallSnapshot server stream.
+func (s *raftProtoInstallSnapshotServerCallStub) RecvStream() interface {
+ Advance() bool
+ Value() []byte
+ Err() error
+} {
+ return implraftProtoInstallSnapshotServerCallRecv{s}
+}
+
+type implraftProtoInstallSnapshotServerCallRecv struct {
+ s *raftProtoInstallSnapshotServerCallStub
+}
+
+func (s implraftProtoInstallSnapshotServerCallRecv) Advance() bool {
+ s.s.errRecv = s.s.Recv(&s.s.valRecv)
+ return s.s.errRecv == nil
+}
+func (s implraftProtoInstallSnapshotServerCallRecv) Value() []byte {
+ return s.s.valRecv
+}
+func (s implraftProtoInstallSnapshotServerCallRecv) Err() error {
+ if s.s.errRecv == io.EOF {
+ return nil
+ }
+ return s.s.errRecv
+}
diff --git a/lib/raft/raft_test.go b/lib/raft/raft_test.go
new file mode 100644
index 0000000..c6033f1
--- /dev/null
+++ b/lib/raft/raft_test.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/x/lib/vlog"
+ _ "v.io/x/ref/runtime/factories/generic"
+)
+
+// waitForLogAgreement makes sure all working servers agree on the log.
+func waitForLogAgreement(rs []*raft, timeout time.Duration) bool {
+ start := time.Now()
+ for {
+ indexMap := make(map[Index]string)
+ termMap := make(map[Term]string)
+ for _, r := range rs {
+ id := r.Id()
+ indexMap[r.p.LastIndex()] = id
+ termMap[r.p.LastTerm()] = id
+ }
+ if len(indexMap) == 1 && len(termMap) == 1 {
+ vlog.Infof("tada, all logs agree at %v@%v", indexMap, termMap)
+ return true
+ }
+		if time.Since(start) > timeout {
+ vlog.Errorf("oops, logs disagree %v@%v", indexMap, termMap)
+ return false
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+// waitForAppliedAgreement makes sure all working servers have applied all logged commands.
+func waitForAppliedAgreement(rs []*raft, cs []*client, timeout time.Duration) bool {
+ start := time.Now()
+ for {
+ notyet := false
+ indexMap := make(map[Index]string)
+ termMap := make(map[Term]string)
+ for _, r := range rs {
+ id, role, _ := r.Status()
+ if role == RoleStopped {
+ vlog.Infof("ignoring %s", id)
+ continue
+ }
+ if r.p.LastIndex() != r.lastApplied() {
+ notyet = true
+ break
+ }
+ indexMap[r.p.LastIndex()] = id
+ termMap[r.p.LastTerm()] = id
+ }
+ if len(indexMap) == 1 && len(termMap) == 1 && !notyet {
+ vlog.Infof("tada, all applys agree at %v@%v", indexMap, termMap)
+ return true
+ }
+		if time.Since(start) > timeout {
+			vlog.Errorf("oops, applies disagree %v@%v", indexMap, termMap)
+ return false
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+// waitForAllRunning waits until all servers are in leader or follower state.
+func waitForAllRunning(rs []*raft, timeout time.Duration) bool {
+ n := len(rs)
+ start := time.Now()
+ for {
+ i := 0
+ for _, r := range rs {
+ _, role, _ := r.Status()
+ if role == RoleLeader || role == RoleFollower {
+ i++
+ }
+ }
+ if i == n {
+ return true
+ }
+		if time.Since(start) > timeout {
+ return false
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+}
+
+func TestAppend(t *testing.T) {
+ vlog.Infof("TestAppend")
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ rs, cs := buildRafts(t, ctx, 3, nil)
+ defer cleanUp(rs)
+ thb := rs[0].heartbeat
+
+ // One of the raft members should time out not hearing a leader and start an election.
+ r1 := waitForElection(t, rs, 5*thb)
+ if r1 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ time.Sleep(time.Millisecond)
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // Append some entries.
+ start := time.Now()
+ for i := 0; i < 100; i++ {
+ cmd := fmt.Sprintf("the rain in spain %d", i)
+ if apperr, err := r1.Append(ctx, []byte(cmd)); apperr != nil || err != nil {
+ t.Fatalf("append %s failed with %s", cmd, err)
+ }
+ }
+ if !waitForAppliedAgreement(rs, cs, 2*thb) {
+ t.Fatalf("no log agreement")
+ }
+ vlog.Infof("appends took %s", time.Now().Sub(start))
+
+ // Kill off one instance and see if we keep committing.
+ r1.Stop()
+ r2 := waitForElection(t, rs, 5*thb)
+ if r2 == nil {
+ t.Fatalf("too long to find a leader")
+ }
+ if !waitForLeaderAgreement(rs, thb) {
+ t.Fatalf("no leader agreement")
+ }
+
+ // Append some entries.
+ for i := 0; i < 10; i++ {
+ cmd := fmt.Sprintf("the rain in spain %d", i+10)
+ if apperr, err := r2.Append(ctx, []byte(cmd)); apperr != nil || err != nil {
+ t.Fatalf("append %s failed with %s", cmd, err)
+ }
+ }
+ if !waitForAppliedAgreement(rs, cs, thb) {
+ t.Fatalf("no log agreement")
+ }
+
+ // Restart the stopped server and wait for it to become a follower.
+ restart(t, ctx, rs, cs, r1)
+ if !waitForAllRunning(rs, 3*thb) {
+ t.Fatalf("server didn't joing after restart")
+ }
+
+ if !waitForAppliedAgreement(rs, cs, 3*thb) {
+ t.Fatalf("no log agreement")
+ }
+
+ vlog.Infof("TestAppend passed")
+}
diff --git a/lib/raft/service.go b/lib/raft/service.go
new file mode 100644
index 0000000..4daab9d
--- /dev/null
+++ b/lib/raft/service.go
@@ -0,0 +1,214 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+ "reflect"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ "v.io/v23/verror"
+)
+
+// service is the implementation of the raftProto. It is only used between members
+// to communicate with each other.
+type service struct {
+ r *raft
+ server rpc.Server
+ serverName string
+ acl access.AccessList
+}
+
+// newService creates a new service for the raft protocol and returns its network endpoints.
+func (s *service) newService(ctx *context.T, r *raft, serverName, hostPort string, acl access.AccessList) ([]naming.Endpoint, error) {
+ var err error
+ s.r = r
+ s.serverName = serverName
+ s.acl = acl
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", hostPort}}})
+ _, s.server, err = v23.WithNewDispatchingServer(ctx, serverName, s)
+ if err != nil {
+ return nil, err
+ }
+ return s.server.Status().Endpoints, nil
+}
+
+func (s *service) stopService() {
+ s.server.Stop()
+}
+
+// Lookup implements rpc.Dispatcher.Lookup.
+func (s *service) Lookup(ctx *context.T, name string) (interface{}, security.Authorizer, error) {
+ return raftProtoServer(s), s, nil
+}
+
+// Authorize allows anyone matching the ACL from the RaftConfig or with the same public
+// key as the server.
+func (s *service) Authorize(ctx *context.T, call security.Call) error {
+ if names, _ := security.RemoteBlessingNames(ctx, call); len(names) != 0 {
+ if s.acl.Includes(names...) {
+ return nil
+ }
+ }
+ if l, r := call.LocalBlessings().PublicKey(), call.RemoteBlessings().PublicKey(); l != nil && reflect.DeepEqual(l, r) {
+ return nil
+ }
+ return verror.New(verror.ErrNoAccess, ctx)
+}
+
+// Members implements raftProto.Members.
+func (s *service) Members(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ r := s.r
+ r.Lock()
+ defer r.Unlock()
+
+ var members []string
+ for m := range r.memberMap {
+ members = append(members, m)
+ }
+ return members, nil
+}
+
+// Leader implements raftProto.Leader.
+func (s *service) Leader(ctx *context.T, call rpc.ServerCall) (string, error) {
+ r := s.r
+ r.Lock()
+ defer r.Unlock()
+ return r.leader, nil
+}
+
+// RequestVote implements raftProto.RequestVote.
+func (s *service) RequestVote(ctx *context.T, call rpc.ServerCall, term Term, candidate string, lastLogTerm Term, lastLogIndex Index) (Term, bool, error) {
+ r := s.r
+
+ // The voting needs to be atomic.
+ r.Lock()
+ defer r.Unlock()
+
+ // An old election?
+ if term < r.p.CurrentTerm() {
+ return r.p.CurrentTerm(), false, nil
+ }
+
+ // If the term is higher than the current election term, then we are into a new election.
+ if term > r.p.CurrentTerm() {
+ r.setRoleAndWatchdogTimer(RoleFollower)
+ r.p.SetVotedFor("")
+ }
+ vf := r.p.VotedFor()
+
+ // Have we already voted for someone else (for example myself)?
+ if vf != "" && vf != candidate {
+ return r.p.CurrentTerm(), false, nil
+ }
+
+	// If we have a more up-to-date log, ignore the candidate. (Raft's safety restriction.)
+ if r.p.LastTerm() > lastLogTerm {
+ return r.p.CurrentTerm(), false, nil
+ }
+ if r.p.LastTerm() == lastLogTerm && r.p.LastIndex() > lastLogIndex {
+ return r.p.CurrentTerm(), false, nil
+ }
+
+ // Vote for candidate.
+ r.p.SetCurrentTermAndVotedFor(term, candidate)
+ return r.p.CurrentTerm(), true, nil
+}
+
+// AppendToLog implements raftProto.AppendToLog.
+func (s *service) AppendToLog(ctx *context.T, call rpc.ServerCall, term Term, leader string, prevIndex Index, prevTerm Term, leaderCommit Index, entries []LogEntry) error {
+ r := s.r
+
+ // The append needs to be atomic.
+ r.Lock()
+
+ // The leader has to be at least as up to date as we are.
+ if term < r.p.CurrentTerm() {
+ r.Unlock()
+ return verror.New(errBadTerm, ctx, term, r.p.CurrentTerm())
+ }
+
+ // At this point we accept the sender as leader and become a follower.
+ if r.leader != leader {
+ vlog.Infof("@%s new leader %s during AppendToLog", r.me.id, leader)
+ }
+ r.leader = leader
+ r.lcv.Broadcast()
+ r.setRoleAndWatchdogTimer(RoleFollower)
+ r.p.SetVotedFor("")
+
+ // Append if we can.
+ if err := r.p.AppendToLog(ctx, prevTerm, prevIndex, entries); err != nil {
+ vlog.Errorf("@%s AppendToLog returns %s", r.me.id, err)
+ r.Unlock()
+ return err
+ }
+ r.Unlock()
+ r.newCommit <- leaderCommit
+ return nil
+}
+
+// Append implements raftProto.Append.
+func (s *service) Append(ctx *context.T, call rpc.ServerCall, cmd []byte) (Term, Index, error) {
+ r := s.r
+ r.Lock()
+
+ if r.role != RoleLeader {
+ r.Unlock()
+ return 0, 0, verror.New(errNotLeader, ctx)
+ }
+
+ // Assign an index and term to the log entry.
+	le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: cmd, Type: ClientEntry}
+
+ // Append to our own log.
+ if err := r.p.AppendToLog(ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
+ // This shouldn't happen.
+ r.Unlock()
+ return 0, 0, err
+ }
+
+ // Update the fact that we've logged it.
+ r.setMatchIndex(r.me, le.Index)
+
+ // Tell each follower to update.
+ r.kickFollowers()
+ r.Unlock()
+
+	// The entry is not yet committed or applied. The caller must verify that itself.
+ return le.Term, le.Index, nil
+}
+
+// InstallSnapshot implements raftProto.InstallSnapshot.
+func (s *service) InstallSnapshot(ctx *context.T, call raftProtoInstallSnapshotServerCall, term Term, leader string, appliedTerm Term, appliedIndex Index) error {
+ r := s.r
+
+ // The snapshot needs to be atomic.
+ r.Lock()
+ defer r.Unlock()
+
+ // The leader has to be at least as up to date as we are.
+ if term < r.p.CurrentTerm() {
+ return verror.New(errBadTerm, ctx, term, r.p.CurrentTerm())
+ }
+
+ // At this point we accept the sender as leader and become a follower.
+ if r.leader != leader {
+ vlog.Infof("@%s new leader %s during InstallSnapshot", r.me.id, leader)
+ }
+ r.leader = leader
+ r.lcv.Broadcast()
+ r.setRoleAndWatchdogTimer(RoleFollower)
+ r.p.SetVotedFor("")
+
+ // Store the snapshot and restore client from it.
+ return r.p.SnapshotFromLeader(ctx, appliedTerm, appliedIndex, call)
+}