raft: new implementation

See model.go for the client interface to the library.

ACLs have now been added to control access.

Change-Id: I9cc7824cd9ba2c2f5adb8546ab6ea967e22e296d
diff --git a/lib/raft/README b/lib/raft/README
new file mode 100644
index 0000000..8c7c3f4
--- /dev/null
+++ b/lib/raft/README
@@ -0,0 +1,27 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+This is an implemetation of the Raft agreement protocol.  Each raft
+member maintains a log of commands in the same order.  All commands 
+go through a single master.  The master keeps track of the commit point,
+i.e., the point in the log where a quorum of servers have stored the log.
+Each server can apply the commands up to the commit point.  When a client
+starts a member it provides a callback for applying the commands as they are
+committed.  It is up to the application to make sure commands are idempotent.
+For example, it can remember the last command applied and not let any old
+ones be reapplied.
+
+Raft members use the file system to persist their log records across crashes.
+We're currently not syncing after writing each record to the disk to
+speed things up.  Because of the idempotent callback, this will work as long
+as a member of the quorum survives each master reelection but I may have to
+eventualy rethink this.
+
+The leader sends heartbeat messages at a fixed interval (hb).  Each follower will
+trigger a new election if it hasn't heard from the leader in an interval 2.x * hb,
+where x is a random number between 0 and 1.  The random interval reduces but does
+not eliminate the likelihood of two elections starting simultaneously.
+
+The VDL protocol is internal, i.e., it is just for raft members to talk to each
+other.
diff --git a/lib/raft/client_test.go b/lib/raft/client_test.go
new file mode 100644
index 0000000..d8d5422
--- /dev/null
+++ b/lib/raft/client_test.go
@@ -0,0 +1,180 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"encoding/json"
+	"io"
+	"io/ioutil"
+	"os"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/x/lib/vlog"
+	_ "v.io/x/ref/runtime/factories/generic"
+)
+
+type client struct {
+	sync.RWMutex
+	cmds    [][]byte // applied commands
+	id      string
+	applied Index
+}
+
+func (c *client) Apply(cmd []byte, index Index) error {
+	//vlog.Infof("Applying %d %s", index, cmd)
+	c.Lock()
+	c.cmds = append(c.cmds, cmd)
+	c.applied = index
+	c.Unlock()
+	return nil
+}
+
+func (c *client) Applied() Index {
+	c.RLock()
+	defer c.RUnlock()
+	return c.applied
+}
+
+func (c *client) SaveToSnapshot(ctx *context.T, wr io.Writer, response chan<- error) error {
+	close(response)
+	c.RLock()
+	defer c.RUnlock()
+	return json.NewEncoder(wr).Encode(c.cmds)
+}
+
+func (c *client) RestoreFromSnapshot(ctx *context.T, index Index, rd io.Reader) error {
+	c.Lock()
+	defer c.Unlock()
+	c.applied = index
+	return json.NewDecoder(rd).Decode(&c.cmds)
+}
+
+func (c *client) LeaderChange(me, leader string) {
+	if me == leader {
+		vlog.Infof("%s now leader", leader)
+	} else {
+		vlog.Infof("%s recognizes %s as leader", me, leader)
+	}
+}
+
+func (c *client) Compare(t *testing.T, nc *client) {
+	c.RLock()
+	defer c.RUnlock()
+	nc.RLock()
+	defer nc.RUnlock()
+	if !reflect.DeepEqual(c.cmds, nc.cmds) {
+		t.Fatalf("%v != %v", c.cmds, nc.cmds)
+	}
+}
+
+// buildRafts creates a set of raft members and starts up the services.
+func buildRafts(t *testing.T, ctx *context.T, n int, config *RaftConfig) ([]*raft, []*client) {
+	if config == nil {
+		config = new(RaftConfig)
+	}
+	config.Heartbeat = time.Second
+	if len(config.HostPort) == 0 {
+		config.HostPort = "127.0.0.1:0"
+	}
+	// Start each server with its own log directory.
+	var rs []*raft
+	var cs []*client
+	var td []string
+	for i := 0; i < n; i++ {
+		if n > 1 || len(config.LogDir) == 0 {
+			config.LogDir = tempDir(t)
+		}
+		c := new(client)
+		r, err := newRaft(ctx, config, c)
+		if err != nil {
+			t.Fatalf("NewRaft: %s", err)
+		}
+		td = append(td, config.LogDir)
+		c.id = r.Id()
+		rs = append(rs, r)
+		cs = append(cs, c)
+		vlog.Infof("id is %s", r.Id())
+	}
+	// Tell each server about the complete set.
+	for i := range rs {
+		for j := range rs {
+			rs[i].AddMember(ctx, rs[j].Id())
+		}
+	}
+	// Start the servers up.
+	for i := range rs {
+		rs[i].Start()
+	}
+	return rs, cs
+}
+
+// restart a member from scratch, keeping its address and log name.
+func restart(t *testing.T, ctx *context.T, rs []*raft, cs []*client, r *raft) {
+	config := RaftConfig{HostPort: r.me.id[1:], LogDir: r.logDir}
+	c := new(client)
+	rn, err := newRaft(ctx, &config, c)
+	if err != nil {
+		t.Fatalf("NewRaft: %s", err)
+	}
+	for i := range rs {
+		if rs[i] == r {
+			rs[i] = rn
+			cs[i] = c
+			c.id = rn.Id()
+			break
+		}
+	}
+	for j := range rs {
+		rn.AddMember(ctx, rs[j].Id())
+	}
+	rn.Start()
+}
+
+// cleanUp all the rafts.
+func cleanUp(rs []*raft) {
+	for i := range rs {
+		rs[i].Stop()
+		os.RemoveAll(rs[i].logDir)
+	}
+}
+
+func TestClientSnapshot(t *testing.T) {
+	vlog.Infof("TestCreation")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	// Make sure the test client works as expected.
+	c := new(client)
+	for i, cmd := range []string{"the", "rain", "in", "spain", "falls", "mainly", "on", "the", "plain"} {
+		c.Apply([]byte(cmd), Index(i))
+	}
+	fp, err := ioutil.TempFile(".", "TestClientSnapshot")
+	if err != nil {
+		t.Fatalf("can't create snapshot: %s", err)
+	}
+	done := make(chan error)
+	if err := c.SaveToSnapshot(ctx, fp, done); err != nil {
+		t.Fatalf("can't write snapshot: %s", err)
+	}
+	<-done
+	fp.Sync()
+	fp.Seek(0, 0)
+	nc := new(client)
+	if err != nil {
+		t.Fatalf("can't open snapshot: %s", err)
+	}
+	if err := nc.RestoreFromSnapshot(ctx, 0, fp); err != nil {
+		t.Fatalf("can't read snapshot: %s", err)
+	}
+	fp.Close()
+	os.Remove(fp.Name())
+	c.Compare(t, nc)
+	vlog.Infof("TestCreation passed")
+}
diff --git a/lib/raft/election_test.go b/lib/raft/election_test.go
new file mode 100644
index 0000000..b690408
--- /dev/null
+++ b/lib/raft/election_test.go
@@ -0,0 +1,166 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"testing"
+	"time"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23"
+	_ "v.io/x/ref/runtime/factories/generic"
+)
+
+// waitForElection waits for a new leader to be elected.  We also make sure there
+// is only one leader.
+func waitForElection(t *testing.T, rs []*raft, timeout time.Duration) *raft {
+	start := time.Now()
+	for {
+		var leader *raft
+		leaders := 0
+		for _, r := range rs {
+			id, role, _ := r.Status()
+			if role == RoleLeader {
+				vlog.Infof("%s is leader", id)
+				leaders++
+				leader = r
+			}
+		}
+		if leaders > 1 {
+			t.Fatalf("found %d leaders", leaders)
+		}
+		if leader != nil {
+			return leader
+		}
+		if time.Now().Sub(start) > timeout {
+			return nil
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+// waitForLeaderAgreement makes sure all working servers agree on the leader.
+func waitForLeaderAgreement(rs []*raft, timeout time.Duration) bool {
+	start := time.Now()
+	for {
+		leader := make(map[string]string)
+		for _, r := range rs {
+			id, role, l := r.Status()
+			switch role {
+			case RoleLeader:
+				leader[id] = id
+			case RoleFollower:
+				leader[l] = id
+			}
+		}
+		if len(leader) == 1 {
+			return true
+		}
+		if time.Now().Sub(start) > timeout {
+			vlog.Errorf("oops %v", leader)
+			return false
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func TestElection(t *testing.T) {
+	vlog.Infof("TestElection")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	rs, cs := buildRafts(t, ctx, 5, nil)
+	defer cleanUp(rs)
+	thb := rs[0].heartbeat
+
+	// One of the raft members should time out not hearing a leader and start an election.
+	r1 := waitForElection(t, rs, 5*thb)
+	if r1 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	time.Sleep(time.Millisecond)
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// Stop the leader and wait for the next election.
+	r1.Stop()
+	r2 := waitForElection(t, rs, 5*thb)
+	if r2 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// One more time.
+	r2.Stop()
+	r3 := waitForElection(t, rs, 5*thb)
+	if r3 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// One more time.  Shouldn't succeed since we no longer have a quorum.
+	r3.Stop()
+	r4 := waitForElection(t, rs, 5*thb)
+	if r4 != nil {
+		t.Fatalf("shouldn't have a leader with no quorum")
+	}
+
+	// Restart r1.  We should be back to a quorum so an election should succeed.
+	restart(t, ctx, rs, cs, r1)
+	r4 = waitForElection(t, rs, 5*thb)
+	if r4 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// Restart r2.  Within thb time the new guy should agree with everyone else on who the leader is.
+	restart(t, ctx, rs, cs, r2)
+	if !waitForLeaderAgreement(rs, 3*thb) {
+		t.Fatalf("no leader agreement")
+	}
+	vlog.Infof("TestElection passed")
+}
+
+func TestPerformanceElection(t *testing.T) {
+	vlog.Infof("TestPerformanceElection")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	rs, _ := buildRafts(t, ctx, 5, nil)
+	defer cleanUp(rs)
+	thb := rs[0].heartbeat
+
+	// One of the raft members should time out not hearing a leader and start an election.
+	r1 := waitForElection(t, rs, 5*thb)
+	if r1 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	time.Sleep(time.Millisecond)
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+	vlog.Infof("starting 1000 elections")
+
+	// Now force 1000 elections.
+	start := time.Now()
+	for i := 0; i < 200; i++ {
+		x := i % 5
+		rs[x].StartElection()
+		if !waitForLeaderAgreement(rs, 5*thb) {
+			t.Fatalf("no leader agreement")
+		}
+	}
+	duration := time.Now().Sub(start)
+	vlog.Infof("200 elections took %s", duration)
+	vlog.Infof("TestPerformanceElection passed")
+}
diff --git a/lib/raft/fspersistence.go b/lib/raft/fspersistence.go
new file mode 100644
index 0000000..f57666f
--- /dev/null
+++ b/lib/raft/fspersistence.go
@@ -0,0 +1,696 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// The persistent log consists of a directory containing the files:
+//    snap.<term>.<index> - snapshots of the client database to which all log entries
+//                          up to <term>.<index> have been applied.
+//    log.<term>.<index> - a log commencing after <term>.<index>
+//
+// The tail of the log is kept in memory.  Once the tail is long enough (> snapshotThreshold) a
+// snapshot is taken.
+
+import (
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"io"
+	"math/rand"
+	"os"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+const (
+	headerLen                = 1024
+	defaultSnapshotThreshold = 10 * 1024 * 1024
+)
+
+// fsPersist is persistent state implemented in a file system.  It stores the state in a single directory.
+// One file, <directory>/persistent, represents the persistent non-log state and a series of files,
+// <directory>/log<n>, the logs.  A new log file is created on startup and after every snapshot.
+type fsPersist struct {
+	sync.Mutex
+
+	r           *raft
+	currentTerm Term
+	votedFor    string
+
+	dir       string // the directory
+	client    RaftClient
+	baseIndex Index // The index before the log starts (either 0 or from a snapshot).
+	baseTerm  Term  // The term before the log starts (either 0 or from a snapshot).
+	lastIndex Index // last index stored to
+	lastTerm  Term  // term for entry at lastIindex
+
+	// For appending the log file.
+	lf      *os.File
+	encoder *gob.Encoder
+
+	// In-memory version of the log file.  We depend on log truncation to keep this
+	// to a reasonable size.
+	logTail           map[Index]*logEntry
+	firstLogTailIndex Index
+	snapshotThreshold int64
+
+	// Name of last snapshot read or written.
+	lastSnapFile  string
+	lastSnapTerm  Term
+	lastSnapIndex Index
+	snapping      bool // true if we are in the process of creating a snapshot.
+}
+
+// ControlEntry's are appended to the log to reflect changes in the current term and/or the voted for
+// leader during ellections
+type ControlEntry struct {
+	InUse       bool
+	CurrentTerm Term
+	VotedFor    string
+}
+
+// logEntrySize is an approximation of the size of a log entry.
+func logEntrySize(le *LogEntry) int64 {
+	return int64(32 + len(le.Cmd))
+}
+
+// SetCurrentTerm implements persistent.SetCurrentTerm.
+func (p *fsPersist) SetCurrentTerm(ct Term) error {
+	p.Lock()
+	defer p.Unlock()
+	p.currentTerm = ct
+	return p.syncLog()
+}
+
+// IncCurrentTerm increments the current term.
+func (p *fsPersist) IncCurrentTerm() error {
+	p.Lock()
+	defer p.Unlock()
+	p.currentTerm = p.currentTerm + 1
+	return p.syncLog()
+}
+
+// SetVotedFor implements persistent.SetVotedFor.
+func (p *fsPersist) SetVotedFor(vf string) error {
+	p.Lock()
+	defer p.Unlock()
+	if vf == p.votedFor {
+		return nil
+	}
+	p.votedFor = vf
+	return p.syncLog()
+}
+
+// SetVotedFor implements persistent.SetCurrentTermAndVotedFor.
+func (p *fsPersist) SetCurrentTermAndVotedFor(ct Term, vf string) error {
+	p.Lock()
+	defer p.Unlock()
+	p.currentTerm = ct
+	p.votedFor = vf
+	return p.syncLog()
+}
+
+// CurrentTerm implements persistent.CurrentTerm.
+func (p *fsPersist) CurrentTerm() Term {
+	p.Lock()
+	defer p.Unlock()
+	return p.currentTerm
+}
+
+// LastIndex implements persistent.LastIndex.
+func (p *fsPersist) LastIndex() Index {
+	p.Lock()
+	defer p.Unlock()
+	return p.lastIndex
+}
+
+// LastTerm implements persistent.LastTerm.
+func (p *fsPersist) LastTerm() Term {
+	p.Lock()
+	defer p.Unlock()
+	return p.lastTerm
+}
+
+// VotedFor implements persistent.VotedFor.
+func (p *fsPersist) VotedFor() string {
+	p.Lock()
+	defer p.Unlock()
+	return p.votedFor
+}
+
+// Close implements persistent.Close.
+func (p *fsPersist) Close() {
+	p.lf.Sync()
+	p.lf.Close()
+}
+
+// AppendToLog implements persistent.AppendToLog.
+func (p *fsPersist) AppendToLog(ctx *context.T, prevTerm Term, prevIndex Index, entries []LogEntry) error {
+	p.Lock()
+	defer p.Unlock()
+
+	if prevIndex != p.baseIndex || prevTerm != p.baseTerm {
+		// We will not log if the previous entry either doesn't exist or has the wrong term.
+		le := p.lookup(prevIndex)
+		if le == nil {
+			return verror.New(errOutOfSequence, ctx, prevTerm, prevIndex)
+		} else if le.Term != prevTerm {
+			return verror.New(errOutOfSequence, ctx, prevTerm, prevIndex)
+		}
+	}
+	for i, e := range entries {
+		// If its already in the log, do nothing.
+		le, ok := p.logTail[prevIndex+Index(i)+1]
+		if ok && le.Term == e.Term {
+			continue
+		}
+
+		// Add both to the log logTail and the log file.
+		// TODO(p): Think about syncing the output before returning.
+		ne := e
+		if err := p.encoder.Encode(ne); err != nil {
+			return fmt.Errorf("append %d, %d, %v: %s", prevTerm, prevIndex, ne, err)
+		}
+		p.addToLogTail(&ne)
+	}
+
+	return nil
+}
+
+// ConsiderSnapshot implements Persistent.ConsiderSnapshot.
+func (p *fsPersist) ConsiderSnapshot(ctx *context.T, lastAppliedTerm Term, lastAppliedIndex Index) {
+	p.Lock()
+	if p.snapping {
+		p.Unlock()
+		return
+	}
+
+	// Take a snapshot if the log is too big.
+	if int64(lastAppliedIndex-p.firstLogTailIndex) < p.snapshotThreshold {
+		p.Unlock()
+		return
+	}
+	p.snapping = true
+	p.Unlock()
+
+	safeToProceed := make(chan struct{})
+	go p.takeSnapshot(ctx, lastAppliedTerm, lastAppliedIndex, safeToProceed)
+	<-safeToProceed
+}
+
+// TrimLog trims the log to contain only entries from prevIndex on.  This will fail if prevIndex isn't
+// in the logTail or if creating or writing the new log file fails.
+func (p *fsPersist) trimLog(ctx *context.T, prevTerm Term, prevIndex Index) error {
+	// Nothing to trim?
+	vlog.Infof("trimLog(%d, %d)", prevTerm, prevIndex)
+	if prevIndex == 0 {
+		return nil
+	}
+
+	p.Lock()
+	le := p.lookup(prevIndex)
+	p.Unlock()
+	if le == nil {
+		return verror.New(errOutOfSequence, ctx, 0, prevIndex)
+	}
+
+	// Create a new log file.
+	fn, encoder, err := p.newLog(prevTerm, prevIndex)
+	if err != nil {
+		return err
+	}
+
+	// Append all log entries after prevIndex.  Unlock while writing so we don't
+	// overly affect performance.  We might miss some entries that are added while the
+	// lock is released so redo this loop later under lock.
+	i := 1
+	for {
+		p.Lock()
+		le, ok := p.logTail[prevIndex+Index(i)]
+		p.Unlock()
+		if !ok {
+			break
+		}
+		if err := encoder.Encode(*le); err != nil {
+			os.Remove(fn)
+			return fmt.Errorf("trim %d, %v: %s", prevIndex, *le, err)
+		}
+		i++
+	}
+
+	// Finish up under lock.
+	p.Lock()
+	defer p.Unlock()
+	for {
+		le, ok := p.logTail[prevIndex+Index(i)]
+		if !ok {
+			break
+		}
+		if err := encoder.Encode(*le); err != nil {
+			os.Remove(fn)
+			return fmt.Errorf("trim %d, %v: %s", prevIndex, *le, err)
+		}
+		i++
+	}
+
+	// Start using new file.
+	p.encoder = encoder
+	p.syncLog()
+
+	// Remove some logTail entries.  Try to keep at least half of the entries around in case
+	// we'll need them to update a lagging member.
+	if prevIndex >= p.firstLogTailIndex {
+		i := p.firstLogTailIndex + (prevIndex-p.firstLogTailIndex)/2
+		for p.firstLogTailIndex <= i {
+			delete(p.logTail, p.firstLogTailIndex)
+			p.firstLogTailIndex++
+		}
+	}
+
+	return nil
+}
+
+// takeSnapshot is a go routine that starts a snapshot and on success trims the log.
+// 'safeToProceed' is closed when it is safe to continue applying commands.
+func (p *fsPersist) takeSnapshot(ctx *context.T, t Term, i Index, safeToProceed chan struct{}) error {
+	r := p.r
+	defer func() { p.Lock(); p.snapping = false; p.Unlock() }()
+
+	// Create a file for the snapshot.  Lock to prevent any logs from being applied while we do this.
+	fn := p.snapPath(termIndexToFileName(t, i))
+	vlog.Infof("snapshot %s", fn)
+	fp, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+	if err != nil {
+		close(safeToProceed)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+
+	// Start the snapshot.
+	c := make(chan error)
+	if err := r.client.SaveToSnapshot(ctx, fp, c); err != nil {
+		close(safeToProceed)
+		fp.Close()
+		os.Remove(fn)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+
+	// At this point the client has saved whatever it needs for the snapshot (copy on write perhaps)
+	// so we can release the raft server to continue applying commands.
+	close(safeToProceed)
+
+	// Wait for the snapshot to finish.
+	if err := <-c; err != nil {
+		fp.Close()
+		os.Remove(fn)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+	if err := fp.Sync(); err != nil {
+		os.Remove(fn)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+	if err := fp.Close(); err != nil {
+		os.Remove(fn)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+
+	if err := p.trimLog(ctx, t, i); err != nil {
+		os.Remove(fn)
+		vlog.Errorf("snapshot %s: %s", fn, err)
+		return err
+	}
+
+	p.Lock()
+	p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = fn, t, i
+	p.Unlock()
+
+	vlog.Infof("snapshot %s succeeded", fn)
+	return nil
+}
+
+// SnapshotFromLeader implements Persistence.SnapshotFromLeader.  Called with p.r locked.
+func (p *fsPersist) SnapshotFromLeader(ctx *context.T, term Term, index Index, call raftProtoInstallSnapshotServerCall) error {
+	r := p.r
+
+	// First securely save the snapshot.
+	fn := p.snapPath(termIndexToFileName(term, index))
+	fp, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+	if err != nil {
+		return err
+	}
+	rstream := call.RecvStream()
+	for rstream.Advance() {
+		value := rstream.Value()
+		if _, err := fp.Write(value); err != nil {
+			fp.Close()
+			os.Remove(fn)
+			return err
+		}
+	}
+	if err := fp.Sync(); err != nil {
+		fp.Close()
+		os.Remove(fn)
+		return err
+	}
+	if err := fp.Close(); err != nil {
+		os.Remove(fn)
+		return err
+	}
+	if err := rstream.Err(); err != nil {
+		os.Remove(fn)
+		return err
+	}
+
+	// Now try restoring client state with it.
+	fp, err = os.Open(fn)
+	if err != nil {
+		return err
+	}
+	defer fp.Close()
+	if err := r.client.RestoreFromSnapshot(ctx, index, fp); err != nil {
+		return err
+	}
+	r.applied.term, r.applied.index = term, index
+	p.Lock()
+	p.baseTerm, p.baseIndex = term, index
+	p.lastTerm, p.lastIndex = term, index
+	p.Unlock()
+	return nil
+}
+
+func (p *fsPersist) lookup(i Index) *logEntry {
+	if le, ok := p.logTail[i]; ok {
+		return le
+	}
+	return nil
+}
+
+// Lookup implements persistent.Lookup.
+func (p *fsPersist) Lookup(i Index) *logEntry {
+	p.Lock()
+	defer p.Unlock()
+	return p.lookup(i)
+}
+
+// LookupPrevious implements persistent.LookupPrevious.
+func (p *fsPersist) LookupPrevious(i Index) (Term, Index, bool) {
+	p.Lock()
+	defer p.Unlock()
+	i--
+	if i == p.baseIndex {
+		return p.baseTerm, i, true
+	}
+	le := p.lookup(i)
+	if le == nil {
+		return 0, i, false
+	}
+	return le.Term, i, true
+}
+
+// syncLog assumes that p is locked.
+func (p *fsPersist) syncLog() error {
+	ce := ControlEntry{InUse: true, CurrentTerm: p.currentTerm, VotedFor: p.votedFor}
+	if err := p.encoder.Encode(ce); err != nil {
+		return fmt.Errorf("syncLog: %s", err)
+	}
+	p.lf.Sync()
+	return nil
+}
+
+type Entry struct {
+	LogEntry
+	ControlEntry
+}
+
+// readLog reads a log file into memory.
+//
+// Assumes p is locked.
+func (p *fsPersist) readLog(file string) error {
+	vlog.Infof("reading %s", file)
+	f, err := os.OpenFile(file, os.O_RDONLY, 0666)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	d := gob.NewDecoder(f)
+	for {
+		var e Entry
+		if err := d.Decode(&e); err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		if e.InUse {
+			p.currentTerm = e.CurrentTerm
+			p.votedFor = e.VotedFor
+		}
+		if e.Index != 0 && (p.lastIndex == 0 || e.Index <= p.lastIndex+1) {
+			p.addToLogTail(&LogEntry{Term: e.Term, Index: e.Index, Cmd: e.Cmd})
+		}
+	}
+}
+
+// addToLogTail adds the entry to the logTail if not already there and returns true
+// if the logTail changed.
+func (p *fsPersist) addToLogTail(wle *LogEntry) bool {
+	le := &logEntry{Term: wle.Term, Index: wle.Index, Cmd: wle.Cmd, Type: wle.Type}
+	ole, ok := p.logTail[le.Index]
+	if ok {
+		if ole.Term == le.Term {
+			return false
+		}
+		// Remove all higher entries.
+		for i := le.Index + 1; i <= p.lastIndex; i++ {
+			delete(p.logTail, i)
+		}
+	}
+	p.logTail[le.Index] = le
+	p.lastIndex = le.Index
+	p.lastTerm = le.Term
+	return true
+}
+
+// openPersist returns an object that implements the persistent interface.  If the directory doesn't
+// already exist, it is created.  If there is a problem with the contained files, an error is returned.
+func openPersist(ctx *context.T, r *raft, snapshotThreshold int64) (*fsPersist, error) {
+	if snapshotThreshold == 0 {
+		snapshotThreshold = defaultSnapshotThreshold
+	}
+	p := &fsPersist{r: r, dir: r.logDir, logTail: make(map[Index]*logEntry), client: r.client, snapshotThreshold: snapshotThreshold}
+	p.Lock()
+	defer p.Unlock()
+
+	// Randomize max size so all members aren't checkpointing at the same time.
+	p.snapshotThreshold = p.snapshotThreshold + rand.Int63n(1+(p.snapshotThreshold>>3))
+
+	// Read the persistent state, the latest snapshot, and any log entries since then.
+	if err := p.readState(ctx, r); err != nil {
+		if err := p.createState(); err != nil {
+			return nil, err
+		}
+	}
+
+	// Start a new command log.
+	if err := p.rotateLog(); err != nil {
+		return nil, err
+	}
+	return p, nil
+}
+
+func (p *fsPersist) snapPath(s string) string {
+	return path.Join(p.dir, s+".snap")
+}
+
+func (p *fsPersist) logPath(s string) string {
+	return path.Join(p.dir, s+".log")
+}
+
+func termIndexToFileName(t Term, i Index) string {
+	return fmt.Sprintf("%20.20d.%20.20d", t, i)
+}
+
+// paerseFileName returns the term and index from a file name of the form <term>.<index>[.whatever]
+func parseFileName(s string) (Term, Index, error) {
+	p := strings.Split(path.Base(s), ".")
+	if len(p) < 2 {
+		return 0, 0, errors.New("not log name")
+	}
+	t, err := strconv.ParseInt(p[0], 10, 64)
+	if err != nil {
+		return 0, 0, err
+	}
+	if err != nil {
+		return 0, 0, err
+	}
+	i, err := strconv.ParseInt(p[1], 10, 64)
+	return Term(t), Index(i), nil
+}
+
+// readState is called on start up.  State is recreated from the last valid snapshot and
+// any subsequent log files.  If any log file has a decoding error, we give up and return
+// an error.
+//
+// TODO(p): This is perhaps too extreme since we should be able to continue from a snapshot
+// and a prefix of the log.
+//
+// Assumes p is locked.
+func (p *fsPersist) readState(ctx *context.T, r *raft) error {
+	d, err := os.Open(p.dir)
+	if err != nil {
+		return err
+	}
+	defer d.Close()
+
+	// Find all snapshot and log files.
+	vlog.Infof("reading directory %s", p.dir)
+	files, err := d.Readdirnames(0)
+	if err != nil {
+		return err
+	}
+	lmap := make(map[string]struct{})
+	var logs []string
+	smap := make(map[string]struct{})
+	for _, f := range files {
+		if strings.HasSuffix(f, ".log") {
+			f = strings.TrimSuffix(f, ".log")
+			lmap[f] = struct{}{}
+			logs = append(logs, f)
+		} else if strings.HasSuffix(f, ".snap") {
+			smap[strings.TrimSuffix(f, ".snap")] = struct{}{}
+		}
+	}
+
+	// Throw out any snapshots that don't have an equivalent log file; they are not complete.
+	for s := range smap {
+		if _, ok := lmap[s]; !ok {
+			os.Remove(p.snapPath(s))
+			delete(smap, s)
+		}
+	}
+
+	// Find the latest readable snapshot.
+	var snaps []string
+	for s := range smap {
+		snaps = append(snaps, s)
+	}
+	sort.StringSlice(snaps).Sort()
+	sort.StringSlice(logs).Sort()
+	firstLog := termIndexToFileName(0, 0)
+	for i := len(snaps) - 1; i >= 0; i-- {
+		f := p.snapPath(snaps[i])
+		fp, err := os.Open(f)
+		if err != nil {
+			os.Remove(f)
+			continue
+		}
+		term, index, err := parseFileName(snaps[i])
+		if err != nil {
+			os.Remove(f)
+			continue
+		}
+		vlog.Infof("restoring from snapshot %s", f)
+		err = p.client.RestoreFromSnapshot(ctx, index, fp)
+		vlog.Infof("restored from snapshot %s", err)
+		fp.Close()
+		if err == nil {
+			// The snapshot was readable, we're done.
+			firstLog = snaps[i]
+			// The name of the snapshot has the last applied entry
+			// in that snapshot.  Remember it so that we won't reapply
+			// any log entries in case they aren't equipotent.
+			r.applied.term, r.applied.index = term, index
+			p.baseTerm, p.baseIndex = term, index
+			p.lastTerm, p.lastIndex = term, index
+			p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex = f, term, index
+			break
+		}
+		os.Remove(f)
+	}
+
+	// find the log that goes with the snapshot.
+	found := false
+	for _, l := range logs {
+		if l == firstLog {
+			found = true
+		}
+		f := p.logPath(l)
+		if !found {
+			// Remove stale log files.
+			os.Remove(f)
+			continue
+		}
+		// Give up on first bad/incomplete log entry.  If we lost any log entries,
+		// we should be refreshed from some other member.
+		if err := p.readLog(f); err != nil {
+			vlog.Infof("reading %s: %s", f, err)
+			break
+		}
+	}
+	r.setMatchIndex(r.me, p.lastIndex)
+
+	return nil
+}
+
+func (p *fsPersist) OpenLatestSnapshot(ctx *context.T) (io.Reader, Term, Index, error) {
+	p.Lock()
+	if len(p.lastSnapFile) == 0 {
+		p.Unlock()
+		return nil, 0, 0, errors.New("no snapshot")
+	}
+	fn, t, i := p.lastSnapFile, p.lastSnapTerm, p.lastSnapIndex
+	p.Unlock()
+	fp, err := os.Open(fn)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	return fp, t, i, nil
+}
+
+// newLog starts a new log file. It doesn't point the p.encoder at it yet.
+func (p *fsPersist) newLog(prevTerm Term, prevIndex Index) (string, *gob.Encoder, error) {
+	// Create an append only encoder for log entries.
+	fn := p.logPath(termIndexToFileName(prevTerm, prevIndex))
+	append, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
+	if err != nil {
+		return "", nil, err
+	}
+	vlog.Infof("new log file %s", fn)
+	return fn, gob.NewEncoder(append), nil
+}
+
+// rotateLog starts a new log.
+//
+// Assumes p is already locked.
+func (p *fsPersist) rotateLog() error {
+	_, encoder, err := p.newLog(p.lastTerm, p.lastIndex)
+	if err != nil {
+		return err
+	}
+	p.encoder = encoder
+	p.syncLog()
+	return nil
+}
+
+// createState throws out all state and starts again from scratch.
+func (p *fsPersist) createState() error {
+	os.RemoveAll(p.dir)
+	if err := os.Mkdir(p.dir, 0770); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/lib/raft/fspersistence_test.go b/lib/raft/fspersistence_test.go
new file mode 100644
index 0000000..5a0e1a8
--- /dev/null
+++ b/lib/raft/fspersistence_test.go
@@ -0,0 +1,332 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"testing"
+	"time"
+
+	"v.io/v23"
+	"v.io/x/lib/vlog"
+)
+
+func tempDir(t *testing.T) string {
+	// Get a temporary file name that is unlikely to clash with anyone else's.
+	dir, err := ioutil.TempDir("", "raftstate")
+	if err != nil {
+		t.Fatalf("TempDir: %s", err)
+	}
+	os.Remove(dir)
+	vlog.Infof("log is %s", dir)
+	return dir
+}
+
+func compareLogs(t *testing.T, p persistent, expected []LogEntry, tag string) {
+	found := 0
+outer:
+	for i := Index(0); i < 1000; i++ {
+		le := p.Lookup(i)
+		if le == nil {
+			continue
+		}
+		for _, v := range expected {
+			if v.Index != le.Index {
+				continue
+			}
+			if v.Term != le.Term || string(v.Cmd) != string(le.Cmd) {
+				t.Fatalf("%s: expected %v, got %v", tag, v, *le)
+			}
+			found++
+			continue outer
+		}
+		t.Fatalf("unexpected: %v", *le)
+	}
+	if found != len(expected) {
+		t.Fatalf("%s: not all entries found, got %d out of %v", tag, found, expected)
+	}
+}
+
+// TestCreation verifies that a log is created when we start a fresh instance and is still there after we stop it.
+func TestCreation(t *testing.T) {
+	vlog.Infof("TestCreation")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	td := tempDir(t)
+	defer os.RemoveAll(td)
+
+	// Launch a raft.  This should create the log file and append at least one control entry to it.
+	config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
+	r, err := newRaft(ctx, &config, new(client))
+	if err != nil {
+		t.Fatalf("newRaft: %s", err)
+	}
+	vlog.Infof("id is %s", r.Id())
+
+	// Make sure the file is there.
+	info, err := os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+	if err != nil {
+		t.Fatalf("File didn't last: %s", err)
+	}
+	if info.Size() == 0 {
+		t.Fatalf("File too short: %d", info.Size())
+	}
+	r.Stop()
+
+	// Make sure the file is still there after shutdown.
+	info, err = os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+	if err != nil {
+		t.Fatalf("File didn't last: %s", err)
+	}
+	if info.Size() == 0 {
+		t.Fatalf("File too short: %d", info.Size())
+	}
+	vlog.Infof("TestCreation passed")
+}
+
+// TestPersistence verifies that we store enough state to return to the previous log state
+// after a orderly stop and restart.
+//
+// We also test that a truncated log remains truncated after an orderly stop and restart.
+func TestPersistence(t *testing.T) {
+	vlog.Infof("TestPersistence")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	td := tempDir(t)
+	defer os.RemoveAll(td)
+
+	// Launch a raft.  This should create the file and populate the header with the
+	// nil values.  The file should persist after the raft is closed.
+	config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
+	r, err := newRaft(ctx, &config, new(client))
+	if err != nil {
+		t.Fatalf("newRaft: %s", err)
+	}
+	vlog.Infof("id is %s", r.Id())
+
+	// Set the persistent state.
+	if err := r.p.SetCurrentTerm(2); err != nil {
+		t.Fatalf("SetCurrentTerm: %s", err)
+	}
+	if err := r.p.SetVotedFor("whocares"); err != nil {
+		t.Fatalf("SetVotedFor: %s", err)
+	}
+	cmds1 := []LogEntry{
+		LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+		LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+		LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+	}
+	if err := r.p.AppendToLog(ctx, 0, 0, cmds1); err != nil {
+		t.Fatalf("AppendToLog: %s", err)
+	}
+	cmds2 := []LogEntry{
+		LogEntry{Term: 3, Index: 4, Cmd: []byte("cmd4")},
+		LogEntry{Term: 3, Index: 5, Cmd: []byte("cmd5")},
+		LogEntry{Term: 3, Index: 6, Cmd: []byte("cmd6")},
+	}
+	if err := r.p.AppendToLog(ctx, 2, 3, cmds2); err != nil {
+		t.Fatalf("AppendToLog: %s", err)
+	}
+	info, err := os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
+	if err != nil {
+		t.Fatalf("File didn't last: %s", err)
+	}
+	vlog.Infof("log size %d", info.Size())
+	vlog.Infof("stopping %s", r.Id())
+	r.Stop()
+
+	// Reopen the persistent store and make sure it matches.
+	r, err = newRaft(ctx, &config, new(client))
+	if err != nil {
+		t.Fatalf("newRaft: %s", err)
+	}
+	vlog.Infof("id is %s", r.Id())
+	if r.p.CurrentTerm() != 2 {
+		t.Fatalf("CurrentTerm: expected %d got %d", 2, r.p.CurrentTerm())
+	}
+	if r.p.VotedFor() != "whocares" {
+		t.Fatalf("CurrentTerm: expected %s got %s", "whocares", r.p.VotedFor())
+	}
+	test := []LogEntry{
+		LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+		LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+		LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+		LogEntry{Term: 3, Index: 4, Cmd: []byte("cmd4")},
+		LogEntry{Term: 3, Index: 5, Cmd: []byte("cmd5")},
+		LogEntry{Term: 3, Index: 6, Cmd: []byte("cmd6")},
+	}
+	compareLogs(t, r.p, test, "after reopen")
+
+	// Truncate the log by rewriting an index.
+	if err := r.p.AppendToLog(ctx, 2, 3, []LogEntry{LogEntry{Term: 4, Index: 4, Cmd: []byte("cmd7")}}); err != nil {
+		t.Fatalf("AppendToLog: %s", err)
+	}
+	test2 := []LogEntry{
+		LogEntry{Term: 2, Index: 1, Cmd: []byte("cmd1")},
+		LogEntry{Term: 2, Index: 2, Cmd: []byte("cmd2")},
+		LogEntry{Term: 2, Index: 3, Cmd: []byte("cmd3")},
+		LogEntry{Term: 4, Index: 4, Cmd: []byte("cmd7")},
+	}
+	compareLogs(t, r.p, test2, "after truncate")
+	vlog.Infof("stopping %s", r.Id())
+	r.Stop()
+
+	// Make sure the log is still truncated when we reread it.
+	r, err = newRaft(ctx, &config, new(client))
+	if err != nil {
+		t.Fatalf("newRaft: %s", err)
+	}
+	vlog.Infof("id is %s", r.Id())
+	compareLogs(t, r.p, test2, "after truncate and close")
+	vlog.Infof("stopping %s", r.Id())
+	r.Stop()
+	vlog.Infof("TestPersistence passed")
+}
+
+func getFileNames(t *testing.T, dir string) []string {
+	d, err := os.Open(dir)
+	if err != nil {
+		t.Fatalf("opening %s: %s", dir, err)
+	}
+	defer d.Close()
+
+	// Find all snapshot and log files.
+	files, err := d.Readdirnames(0)
+	if err != nil {
+		t.Fatalf("reading %s: %s", dir, err)
+	}
+
+	return files
+}
+
+// waitForLeadership waits for r to be elected leader.
+func waitForLeadership(r *raft, timeout time.Duration) bool {
+	start := time.Now()
+	for {
+		_, role, _ := r.Status()
+		if role == RoleLeader {
+			return true
+		}
+		if time.Now().Sub(start) > timeout {
+			return false
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+// TestSnapshot sets the limit for taking a snapshot down to 1 record and makes sure that snapshots are taken.
+func TestSnapshot(t *testing.T) {
+	vlog.Infof("TestSnapshot")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	td := tempDir(t)
+	defer os.RemoveAll(td)
+
+	config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td, SnapshotThreshold: 1}
+	rs, _ := buildRafts(t, ctx, 1, &config)
+	defer cleanUp(rs)
+	r := rs[0]
+	c1 := r.client
+	vlog.Infof("id is %s", r.Id())
+
+	// This should cause a snapshot.
+	if apperr, err := r.Append(ctx, []byte("string1")); err != nil || apperr != nil {
+		t.Fatalf("AppendToLog: %s/%s", apperr, err)
+	}
+	if apperr, err := r.Append(ctx, []byte("string2")); err != nil || apperr != nil {
+		t.Fatalf("AppendToLog: %s/%s", apperr, err)
+	}
+	if apperr, err := r.Append(ctx, []byte("string3")); err != nil || apperr != nil {
+		t.Fatalf("AppendToLog: %s/%s", apperr, err)
+	}
+
+	// Wait for the snapshot to appear.  It is a background task so just loop.
+	deadline := time.Now().Add(time.Minute)
+outer:
+	for {
+		time.Sleep(100 * time.Millisecond)
+		files := getFileNames(t, td)
+		for _, s := range files {
+			if strings.HasSuffix(s, ".snap") {
+				log := strings.TrimSuffix(s, ".snap") + ".log"
+				for _, l := range files {
+					if l == log {
+						break outer
+					}
+				}
+			}
+		}
+		if time.Now().After(deadline) {
+			t.Fatalf("timeout waiting for snap")
+		}
+	}
+	r.Stop()
+
+	// Restart.  We should read and restart from the snapshot.
+	rs, _ = buildRafts(t, ctx, 1, &config)
+	defer cleanUp(rs)
+	r = rs[0]
+	c2 := r.client
+	vlog.Infof("new id is %s", r.Id())
+
+	// Wait for leadership.
+	if !waitForLeadership(r, time.Minute) {
+		t.Fatalf("didn't become leader")
+	}
+
+	// Wait to commit.
+	time.Sleep(time.Second)
+	c1.(*client).Compare(t, c2.(*client))
+
+	vlog.Infof("TestSnapshot passed")
+}
+
+// TestRemoteSnapshot makes sure a member can be recovered from a snapshot taken elsewhere.
+func TestRemoteSnapshot(t *testing.T) {
+	vlog.Infof("TestRemoteSnapshot")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	config := RaftConfig{HostPort: "127.0.0.1:0", SnapshotThreshold: 30}
+	rs, cs := buildRafts(t, ctx, 3, &config)
+	defer cleanUp(rs)
+
+	// This should cause a few snapshots.
+	for i := 0; i < 100; i++ {
+		if apperr, err := rs[1].Append(ctx, []byte(fmt.Sprintf("string%d", i))); err != nil || apperr != nil {
+			t.Fatalf("Append: %s/%s", apperr, err)
+		}
+	}
+	vlog.Infof("Appends done")
+
+	// Wait for them all to get to the same point.
+	if !waitForLogAgreement(rs, time.Minute) {
+		t.Fatalf("no log agreement")
+	}
+	t.Log("Logs agree")
+	if !waitForAppliedAgreement(rs, cs, time.Minute) {
+		t.Fatalf("no applied agreement")
+	}
+	vlog.Infof("Applies agree")
+
+	// Stop a server and remove all its log and snapshot info, i.e., act like its starting
+	// from scratch.
+	rs[0].Stop()
+	os.RemoveAll(rs[0].logDir)
+	restart(t, ctx, rs, cs, rs[0])
+
+	// Wait for them all to get to the same point.
+	if !waitForLogAgreement(rs, time.Minute) {
+		t.Fatalf("no log agreement")
+	}
+	if !waitForAppliedAgreement(rs, cs, time.Minute) {
+		t.Fatalf("no applied agreement")
+	}
+
+	vlog.Infof("TestRemoteSnapshot passed")
+}
diff --git a/lib/raft/model.go b/lib/raft/model.go
new file mode 100644
index 0000000..0c5dab2
--- /dev/null
+++ b/lib/raft/model.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"io"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/security/access"
+)
+
+// Raft provides a consistent log across multiple instances of a client.
+
+// RaftClient defines the call backs from the Raft library to the application.
+type RaftClient interface {
+	// Apply appies a logged command, 'cmd', to the client. The commands will
+	// be delivered in the same order and with the same 'index' to all clients.
+	// 'index' is a monotonically increasing number and is just an index into the
+	// common log.
+	//
+	// Whenever a client restarts (after a crash perhaps) or falls too far behind
+	// (as in a partitioned network) it will be reinitialized with a RestoreFomSnapshot
+	// and then replayed all subsequent logged commands.
+	//
+	// A client that wishes to may return empty snapshots, i.e., just close the error
+	// channel without writing anything and worry about reliably storing its database
+	// itself.  It that case it must remember the highest index it has seen if it wishes
+	// to avoid replays.  Hence the index is supplied with the Apply().
+	Apply(cmd []byte, index Index) error
+
+	// SaveToSnapshot requests the application to write a snapshot to 'wr'.
+	// Until SaveToSnapshot returns, no commands will be Apply()ed.  Closing
+	// the response channel signals that the snapshot is finished.  Any
+	// error written to the response channel will be logged by the library
+	// and the library will discard the snapshot if any error is returned.
+	SaveToSnapshot(ctx *context.T, wr io.Writer, response chan<- error) error
+
+	// RestoreFromSnapshot requests the application to rebuild its database from the snapshot
+	// it must read from 'rd'.  'index' is the last index applied to the snapshot.  No Apply()s
+	// will be performed until RestoreFromSnapshot() returns. 'index' can be ignored
+	// or used for debugging.
+	RestoreFromSnapshot(ctx *context.T, index Index, rd io.Reader) error
+}
+
+const (
+	RoleCandidate = iota // Requesting to be voted leader.
+	RoleFollower
+	RoleLeader
+	RoleStopped
+)
+
+type Raft interface {
+	// AddMember adds a new member to the server set.  "id" is actually a network address for the member,
+	// currently host:port.  This has to be done before starting the server.
+	AddMember(ctx *context.T, id string) error
+
+	// Id returns the id of this member.
+	Id() string
+
+	// Start starts the local server communicating with other members.
+	Start()
+
+	// Stop terminates the server.   It cannot be Start'ed again.
+	Stop()
+
+	// Append appends a new command to the replicated log.  The command will be Apply()ed at each member
+	// once a quorum has logged it. The Append() will terminate once a quorum has logged it and at least
+	// the leader has Apply()ed the command.  'applyError' is the error returned by the Apply() while
+	// 'raftError' is returned by the raft library itself reporting that the Append could not be
+	// performed.
+	Append(ctx *context.T, cmd []byte) (applyError, raftError error)
+
+	// Status returns the state of the raft.
+	Status() (myId string, role int, leader string)
+
+	// StartElection forces an election.  Normally just used for debugging.
+	StartElection()
+}
+
+// RaftConfig is passed to NewRaft to avoid lots of parameters.
+type RaftConfig struct {
+	LogDir            string        // Directory in which to put log and snapshot files.
+	HostPort          string        // For RPCs from other members.
+	ServerName        string        // Where to mount if not empty.
+	Heartbeat         time.Duration // Time between heartbeats.
+	SnapshotThreshold int64         // Approximate number of log entries between snapshots.
+	Acl               access.AccessList  // For sending RPC to the members.
+}
+
+// NewRaft creates a new raft server.
+func NewRaft(ctx *context.T, config *RaftConfig, client RaftClient) (Raft, error) {
+	return newRaft(ctx, config, client)
+}
diff --git a/lib/raft/persistence.go b/lib/raft/persistence.go
new file mode 100644
index 0000000..7ec5128
--- /dev/null
+++ b/lib/raft/persistence.go
@@ -0,0 +1,76 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"io"
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+var (
+	errOutOfSequence = verror.Register(pkgPath+".errOutOfSequence", verror.NoRetry, "{1:}{2:} append {3}, {4} out of sequence{:_}")
+)
+
+// persistent represents all the persistent state for the raft algorithm.  The division exists should we
+// want to replace the file system based store with something else.
+type persistent interface {
+	// AppendLog appends cmds to the log starting at Index prevIndex+1.  It will return
+	// an error if there does not exist a previous command at index preIndex with term
+	// prevTerm.  It will also return an error if we cannot write to the persistent store.
+	AppendToLog(ctx *context.T, prevTerm Term, prevIndex Index, entries []LogEntry) error
+
+	// SetCurrentTerm, SetVotedFor, and SetCurrentTermAndVotedFor changes the non-log persistent state.
+	SetCurrentTerm(Term) error
+	IncCurrentTerm() error
+	SetVotedFor(id string) error
+	SetCurrentTermAndVotedFor(term Term, id string) error
+
+	// CurrentTerm returns the current Term.
+	CurrentTerm() Term
+
+	// LastIndex returns the highest index in the log.
+	LastIndex() Index
+
+	// LastTerm returns the highest term in the log.
+	LastTerm() Term
+
+	// GetVotedFor returns the current voted for string.
+	VotedFor() string
+
+	// Close all state.  No other calls can be made on this object following
+	// the Close().
+	Close()
+
+	// Lookup returns the log entry at that index or nil if none exists.
+	Lookup(Index) *logEntry
+
+	// LookupPrevious returns the index and term preceding Index.  It returns false if there is none.
+	// This is used when appending entries to the log.  The leader needs to send the follower the
+	// term and index of the last entry appended to the log, and the follower has to check if it
+	// matches what they have at that point.  However, either one may no longer have that entry, the
+	// leader because he is continuing after restoring from a snapshot or the follower because he has
+	// trimmed the log after a snapshot or for either when this is the first log record.  I could
+	// have entered fake log entries but this seemed easier since it is closer to the logic from the
+	// raft paper.
+	//
+	// Granted I could avoid returning the previous index and have each client do the index-1.  I just
+	// felt like doing it once rather than on every call.
+	LookupPrevious(Index) (Term, Index, bool)
+
+	// ConsiderSnapshot checks to see if it is time for a snapshot and, if so, calls back the client to
+	// generate a snapshot and then trims the log.  On return it is safe to continue
+	// RaftClient.Apply()ing commands although the snapshot may still be in progress.
+	ConsiderSnapshot(ctx *context.T, lastAppliedTerm Term, lastAppliedIndex Index)
+
+	// SnapshotFromLeader receives and stores a snapshot from the leader and then restores the
+	// client state from the snapshot.  'lastTermApplied' and 'lastIndexApplied' represent the last
+	// log entry RaftClient.Apply()ed before the snapshot was taken.
+	SnapshotFromLeader(ctx *context.T, lastTermApplied Term, lastIndexApplied Index, call raftProtoInstallSnapshotServerCall) error
+
+	// OpenLatestSnapshot opens the latest snapshot and returns a reader for it.  The returned Term
+	// and Index represent the last log entry RaftClient.Appy()ed before the snapshot was taken.
+	OpenLatestSnapshot(ctx *context.T) (io.Reader, Term, Index, error)
+}
diff --git a/lib/raft/raft.go b/lib/raft/raft.go
new file mode 100644
index 0000000..3dac805
--- /dev/null
+++ b/lib/raft/raft.go
@@ -0,0 +1,827 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// This package implements the Raft protocol, https://ramcloud.stanford.edu/raft.pdf. The
+// logged commands are strings.   If someone wishes a more complex command structure, they
+// should use an encoding (e.g. json) into the strings.
+
+import (
+	"io"
+	"math/rand"
+	"sort"
+	"sync"
+	"time"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/options"
+	"v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/lib.raft"
+
+var (
+	errBadAppend     = verror.Register(pkgPath+".errBadAppend", verror.NoRetry, "{1:}{2:} inconsistent append{:_}")
+	errAddAfterStart = verror.Register(pkgPath+".errAddAfterStart", verror.NoRetry, "{1:}{2:} adding member after start{:_}")
+	errNotLeader     = verror.Register(pkgPath+".errNotLeader", verror.NoRetry, "{1:}{2:} not the leader{:_}")
+	errWTF           = verror.Register(pkgPath+".errWTF", verror.NoRetry, "{1:}{2:} internal error{:_}")
+	errTimedOut      = verror.Register(pkgPath+".errTimedOut", verror.NoRetry, "{1:}{2:} request timed out{:_}")
+	errBadTerm       = verror.Register(pkgPath+".errBadTerm", verror.NoRetry, "{1:}{2:} new term {3} < {4} {:_}")
+)
+
+// member keeps track of another member's state.
+type member struct {
+	id         string
+	nextIndex  Index         // Next log index to send to this follower.
+	matchIndex Index         // Last entry logged by this follower.
+	stopped    chan struct{} // Follower go routine closes this to indicate it has terminated.
+	update     chan struct{}
+	timer      *time.Timer
+}
+
+// memberSlice is used for sorting members by highest logged (matched) entry.
+type memberSlice []*member
+
+func (m memberSlice) Len() int           { return len(m) }
+func (m memberSlice) Less(i, j int) bool { return m[i].matchIndex > m[j].matchIndex }
+func (m memberSlice) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }
+
+// raft is the implementation of the raft library.
+type raft struct {
+	sync.Mutex
+
+	ctx       *context.T
+	cancel    context.CancelFunc
+	rng       *rand.Rand
+	timer     *time.Timer
+	heartbeat time.Duration
+
+	// rpc interface between instances.
+	s service
+
+	// Client interface.
+	client RaftClient
+
+	// applied is the highest log entry applied to the client.  All access/update is in a single
+	// go routine.
+	applied struct {
+		sync.RWMutex
+		index Index
+		term  Term
+	}
+
+	// Raft algorithm volatile state.
+	role        int
+	leader      string
+	quorum      int                // Number of members that form a quorum.
+	commitIndex Index              // Highest index commited.
+	memberMap   map[string]*member // Map of raft members (including current).
+	memberSet   memberSlice        // Slice of raft members (including current).
+	me          *member
+
+	// Raft algorithm persistent state
+	p      persistent
+	logDir string
+
+	// stop and stopped are for clean shutdown.  All long lived go routines (perFollower and serverEvents)
+	// exit when stop is closed.  Each perFollower then closes member.stopped and serverEvents closes
+	// stopped to signal that they are finished.
+	stop    chan struct{} // perFollower and serverEvents go routines exit when this is closed.
+	stopped chan struct{} // serverEvents go routine closes this to indicate it has terminated.
+
+	// Each time a perFollower successfully logs entries on a follower it writes to newMatch to get the
+	// serverEvents routine to possibly update the commitIndex.
+	newMatch chan struct{} // Each time a follower reports a logged entry a message is sent on this.
+
+	// Each time a follower receives a new commit index, it sends it to newCommit to get the serverEvents
+	// routine to apply any newly committed entries.
+	newCommit chan Index // Each received leader commit index is written to this.
+
+	// Wait here for commitIndex to change.
+	ccv *sync.Cond
+
+	// Wait here for leadership to change.
+	lcv *sync.Cond
+}
+
+// logentry is the in memory structure for each logged item.  It is
+type logEntry struct {
+	Term       Term
+	Index      Index
+	Cmd        []byte
+	Type       byte
+	ApplyError error
+}
+
+// newRaft creates a new raft server.
+//  logDir        - the name of the directory in which to persist the log.
+//  serverName    - a name for the server to announce itself as in a mount table.  All members should use the
+//                 same name and hence be alternatives for callers.
+//  hostPort      - the network address of the server
+//  hb            - the interval between heartbeats.  0 means use default.
+//  snapshotThreshold - the size the log can reach before we create a snapshot.  0 means use default.
+//  client        - callbacks to the client.
+func newRaft(ctx *context.T, config *RaftConfig, client RaftClient) (*raft, error) {
+	nctx, cancel := context.WithCancel(ctx)
+	r := &raft{}
+	r.ctx = nctx
+	r.cancel = cancel
+	r.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
+	r.heartbeat = config.Heartbeat
+	if r.heartbeat == 0 {
+		r.heartbeat = 3 * time.Second
+	}
+
+	// Client interface.
+	r.client = client
+	r.applied.term = 0
+	r.applied.index = 0
+
+	// Raft volatile state.
+	r.role = RoleStopped
+	r.commitIndex = 0
+	r.leader = ""
+	r.memberMap = make(map[string]*member)
+	r.memberSet = make([]*member, 0)
+	r.AddMember(ctx, config.HostPort)
+	r.me = r.memberMap[config.HostPort]
+
+	// Raft persistent state.
+	var err error
+	r.logDir = config.LogDir
+	if r.p, err = openPersist(ctx, r, config.SnapshotThreshold); err != nil {
+		return nil, err
+	}
+
+	// Internal communication/synchronization.
+	r.newMatch = make(chan struct{}, 100)
+	r.newCommit = make(chan Index, 100)
+	r.ccv = sync.NewCond(r)
+	r.lcv = sync.NewCond(r)
+
+	// The RPC interface to other members.
+	eps, err := r.s.newService(nctx, r, config.ServerName, config.HostPort, config.Acl)
+	if err != nil {
+		return nil, err
+	}
+
+	// If we're in the V namespace, just use the name as our Id.  If not create one
+	// from the network address.
+	r.me.id = config.ServerName
+	if r.me.id == "" {
+		r.me.id = string(getShortName(eps[0]))
+	}
+
+	return r, nil
+}
+
+// getShortName will return a /host:port name if possible.  Otherwise it will just return the name
+// version of the endpoint.
+func getShortName(ep naming.Endpoint) string {
+	if ep.Addr().Network() != "tcp" {
+		return ep.Name()
+	}
+	return naming.JoinAddressName(ep.Addr().String(), "")
+}
+
+// AddMember adds the id as a raft member.  The id must be a vanadium name.
+func (r *raft) AddMember(ctx *context.T, id string) error {
+	if r.role != RoleStopped {
+		// Already started.
+		// TODO(p): Raft has a protocol for changing membership after
+		// start.  I'll add that after I get at least one client
+		// working.
+		return verror.New(errAddAfterStart, ctx)
+	}
+	m := &member{id: id, stopped: make(chan struct{}), update: make(chan struct{}, 10)}
+	r.memberMap[id] = m
+	r.memberSet = append(r.memberSet, m)
+	// Quorum has to be more than half the servers.
+	r.quorum = (len(r.memberSet) + 1) / 2
+	return nil
+}
+
+// Id returns the vanadium name of this server.
+func (r *raft) Id() string {
+	return r.me.id
+}
+
+// Start gets the protocol going.
+func (r *raft) Start() {
+	vlog.Infof("@%s starting", r.me.id)
+	r.Lock()
+	defer r.Unlock()
+	if r.role != RoleStopped {
+		// already started
+		return
+	}
+	r.timer = time.NewTimer(2 * r.heartbeat)
+
+	// Signaling for stopping the perFollower and serverEvents go routines.
+	r.stop = make(chan struct{})
+	r.stopped = make(chan struct{})
+
+	// serverEvents serializes events for this server.
+	go r.serverEvents()
+
+	// perFollowers updates the followers when we're the leader.
+	for _, m := range r.memberSet {
+		if m.id != r.me.id {
+			go r.perFollower(m)
+		}
+	}
+	return
+}
+
+// Stop ceases all function as a raft server.
+func (r *raft) Stop() {
+	vlog.Infof("@%s stopping", r.me.id)
+	r.Lock()
+	if r.role == RoleStopped {
+		r.Unlock()
+		return
+	}
+	r.role = RoleStopped
+	r.Unlock()
+	r.cancel()
+
+	// Stop the associated go routines.
+	close(r.stop)
+
+	// Wait for serverEvents to stop.
+	<-r.stopped
+
+	// Wait for all the perFollower routines to stop.
+	for _, m := range r.memberSet {
+		if m.id != r.me.id {
+			<-m.stopped
+		}
+	}
+
+	// Shut down the log file.
+	r.p.Close()
+
+	vlog.Infof("@%s stopping service", r.me.id)
+	r.s.stopService()
+	vlog.Infof("@%s stopped", r.me.id)
+}
+
+// setRoleAndWatchdogTimer called with r.l locked.
+func (r *raft) setRoleAndWatchdogTimer(role int) {
+	vlog.VI(2).Infof("@%s %s->%s", r.me.id, roleToString(r.role), roleToString(role))
+	r.role = role
+	switch role {
+	case RoleFollower:
+		// Wake up any RaftProto.Append()s waiting for a commitment.  They
+		// will now have to give up since we are no longer leader.
+		r.ccv.Broadcast()
+		// Set a timer to start an election if we no longer hear from the leader.
+		r.resetTimerFuzzy(2 * r.heartbeat)
+	case RoleLeader:
+		// Set known follower status to default values.
+		for _, m := range r.memberSet {
+			if m.id != r.me.id {
+				m.nextIndex = r.p.LastIndex() + 1
+				m.matchIndex = 0
+			}
+		}
+		r.leader = r.me.id
+		// Tell followers we are now the leader.
+		r.appendNull()
+	case RoleCandidate:
+		// Set a timer to restart the election if noone becomes leader.
+		r.resetTimerFuzzy(r.heartbeat)
+	}
+}
+
+func (r *raft) appendNull() {
+	// Assign an index and term to the log entry.
+	le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: nil, Type: RaftEntry}
+
+	// Append to our own log.
+	if err := r.p.AppendToLog(r.ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
+		// This shouldn't happen.
+		return
+	}
+
+	// Update the fact that we've logged it.
+	r.setMatchIndex(r.me, le.Index)
+	r.kickFollowers()
+}
+
+// Status returns the current member's id, its raft role, and who it thinks is leader.
+func (r *raft) Status() (string, int, string) {
+	r.Lock()
+	defer r.Unlock()
+	return r.me.id, r.role, r.leader
+}
+
+// StartElection starts a new round of voting.  We do this by incrementing the
+// Term and, in parallel, calling each other member to vote.  If we receive a
+// majority we win and send a heartbeat to each member.
+//
+// Someone else many get elected in the middle of the vote so we have to
+// make sure we're still a candidate at the end of the voting.
+func (r *raft) StartElection() {
+	r.Lock()
+	defer r.Unlock()
+	r.startElection()
+}
+
+func (r *raft) startElection() {
+	// If we can't get a response in 2 seconds, something is really wrong.
+	ctx, _ := context.WithTimeout(r.ctx, time.Duration(2)*time.Second)
+	if err := r.p.IncCurrentTerm(); err != nil {
+		// If this fails, there's no way to recover.
+		vlog.Fatalf("incrementing current term: %s", err)
+		return
+	}
+	vlog.VI(2).Infof("@%s startElection new term %d", r.me.id, r.p.CurrentTerm())
+
+	msg := []interface{}{
+		r.p.CurrentTerm(),
+		r.me.id,
+		r.p.LastTerm(),
+		r.p.LastIndex(),
+	}
+	var members []string
+	for k, m := range r.memberMap {
+		if m.id == r.me.id {
+			continue
+		}
+		members = append(members, k)
+	}
+	r.setRoleAndWatchdogTimer(RoleCandidate)
+	r.p.SetVotedFor(r.me.id)
+	r.leader = ""
+	r.Unlock()
+
+	// We have to do this outside the lock or the system will deadlock when two members start overlapping votes.
+	c := make(chan bool)
+	for _, id := range members {
+		go func(id string) {
+			var term Term
+			var ok bool
+			client := v23.GetClient(ctx)
+			if err := client.Call(ctx, id, "RequestVote", msg, []interface{}{&term, &ok}, options.Preresolved{}); err != nil {
+				vlog.VI(2).Infof("@%s sending RequestVote to %s: %s", r.me.id, id, err)
+			}
+			c <- ok
+		}(id)
+	}
+
+	// Wait till all the voters have voted or timed out.
+	oks := 1 // We vote for ourselves.
+	for range members {
+		if <-c {
+			oks++
+		}
+	}
+
+	r.Lock()
+	// We have to check the role since someone else may have become the leader during the round and
+	// made us a follower.
+	if oks <= len(members)/2 || r.role != RoleCandidate {
+		vlog.VI(2).Infof("@%s lost election with %d votes", r.me.id, oks)
+		return
+	}
+	vlog.Infof("@%s won election with %d votes", r.me.id, oks)
+	r.setRoleAndWatchdogTimer(RoleLeader)
+	r.leader = r.me.id
+	r.setMatchIndex(r.me, r.p.LastIndex())
+	r.lcv.Broadcast()
+}
+
+// applyCommits applies any committed entries.
+func (r *raft) applyCommits(commitIndex Index) {
+	for r.applied.index < commitIndex {
+		// This is the only go routine that changes r.applied
+		// so we don't have to protect our reads.
+		next := r.applied.index + 1
+		le := r.p.Lookup(next)
+		if le == nil {
+			// Commit index is ahead of our highest entry.
+			return
+		}
+		switch le.Type {
+		case ClientEntry:
+			le.ApplyError = r.client.Apply(le.Cmd, le.Index)
+		case RaftEntry:
+		}
+
+		// But we do have to lock our writes.
+		r.applied.Lock()
+		r.applied.index = next
+		r.applied.term = le.Term
+		r.applied.Unlock()
+	}
+
+	r.p.ConsiderSnapshot(r.ctx, r.applied.term, r.applied.index)
+}
+
+func (r *raft) lastApplied() Index {
+	r.applied.RLock()
+	defer r.applied.RUnlock()
+	return r.applied.index
+}
+
+func (r *raft) resetTimerFuzzy(d time.Duration) {
+	fuzz := time.Duration(rand.Int63n(int64(r.heartbeat)))
+	r.timer.Reset(d + fuzz)
+}
+
+func (r *raft) resetTimer(d time.Duration) {
+	r.timer.Reset(d)
+}
+
+func highestFromChan(i Index, c chan Index) Index {
+	for {
+		select {
+		case j := <-c:
+			if j > i {
+				i = j
+			}
+		default:
+			return i
+		}
+	}
+}
+
+// serverEvents is a go routine that serializes server events.  This loop performs:
+// (1) all changes to commitIndex both as a leader and a follower.
+// (2) all application of committed log commands.
+// (3) all elections.
+func (r *raft) serverEvents() {
+	r.Lock()
+	r.setRoleAndWatchdogTimer(RoleFollower)
+	r.Unlock()
+	for {
+		select {
+		case <-r.stop:
+			// Terminate.
+			close(r.stopped)
+			return
+		case <-r.timer.C:
+			// Start an election whenever either:
+			// (1) a follower hasn't heard from the leader in a random interval > 2 * heartbeat.
+			// (2) a candidate hasn't won an election or been told anyone else has after hearbeat.
+			r.Lock()
+			switch r.role {
+			case RoleCandidate:
+				r.startElection()
+			case RoleFollower:
+				r.startElection()
+			}
+			r.Unlock()
+		case <-r.newMatch:
+			// Soak up any queued requests.
+			emptyChan(r.newMatch)
+
+			// This happens whenever we have gotten a reply from a follower.  We do it
+			// here rather than in perFollower solely as a matter of taste.
+			// Update the commitIndex if needed and apply any newly committed entries.
+			r.Lock()
+			if r.role != RoleLeader {
+				r.Unlock()
+				continue
+			}
+			sort.Sort(r.memberSet)
+			ci := r.memberSet[r.quorum-1].matchIndex
+			if ci <= r.commitIndex {
+				r.Unlock()
+				continue
+			}
+			r.commitIndex = ci
+			r.Unlock()
+			r.applyCommits(ci)
+			r.ccv.Broadcast()
+			r.kickFollowers()
+		case i := <-r.newCommit:
+			// Get highest queued up commit.
+			i = highestFromChan(i, r.newCommit)
+
+			// Update the commitIndex if needed and apply any newly committed entries.
+			r.Lock()
+			if r.role != RoleFollower {
+				r.Unlock()
+				continue
+			}
+			if i > r.commitIndex {
+				r.commitIndex = i
+			}
+			ci := r.commitIndex
+			r.Unlock()
+			r.applyCommits(ci)
+			r.ccv.Broadcast()
+		}
+	}
+}
+
+// makeAppendMsg creates an append message at most 10 entries long.
+func (r *raft) makeAppendMsg(m *member) ([]interface{}, int) {
+	// Figure out if we know the previous entry.
+	prevTerm, prevIndex, ok := r.p.LookupPrevious(m.nextIndex)
+	if !ok {
+		return nil, 0
+	}
+	// Collect some log entries to send along.  0 is ok.
+	var entries []LogEntry
+	for i := 0; i < 10; i++ {
+		le := r.p.Lookup(m.nextIndex + Index(i))
+		if le == nil {
+			break
+		}
+		entries = append(entries, LogEntry{Cmd: le.Cmd, Term: le.Term, Index: le.Index, Type: le.Type})
+	}
+	return []interface{}{
+		r.p.CurrentTerm(),
+		r.me.id,
+		prevIndex,
+		prevTerm,
+		r.commitIndex,
+		entries,
+	}, len(entries)
+}
+
+// updateFollower loops trying to update a follower until the follower is updated or we can't proceed.
+// It will always send at least one update so will also act as a heartbeat.
+func (r *raft) updateFollower(m *member) {
+	// Bring this server up to date.
+	r.Lock()
+	defer r.Unlock()
+	for {
+		// If we're not the leader we have no followers.
+		if r.role != RoleLeader {
+			return
+		}
+
+		// Collect some log entries starting at m.nextIndex.
+		msg, n := r.makeAppendMsg(m)
+		if msg == nil {
+			// Try sending a snapshot.
+			r.Unlock()
+			vlog.Infof("@%s sending snapshot to %s", r.me.id, m.id)
+			snapIndex, err := r.sendLatestSnapshot(m)
+			r.Lock()
+			if err != nil {
+				// Try again later.
+				vlog.Errorf("@%s sending snapshot to %s: %s", r.me.id, m.id, err)
+				return
+			}
+			m.nextIndex = snapIndex + 1
+			vlog.Infof("@%s sent snapshot to %s", r.me.id, m.id)
+			// Try sending anything following the snapshot.
+			continue
+		}
+
+		// Send to the follower. We drop the lock while we do this. That means we may stop being the
+		// leader in the middle of the call but that's OK as long as we check when we get it back.
+		r.Unlock()
+		ctx, _ := context.WithTimeout(r.ctx, time.Duration(2)*time.Second)
+		client := v23.GetClient(ctx)
+		err := client.Call(ctx, m.id, "AppendToLog", msg, []interface{}{}, options.Preresolved{})
+		r.Lock()
+		if r.role != RoleLeader {
+			// Not leader any more, doesn't matter how he replied.
+			return
+		}
+
+		if err != nil {
+			if verror.ErrorID(err) != errOutOfSequence.ID {
+				// A problem other than missing entries.  Retry later.
+				//vlog.Errorf("@%s updating %s: %s", r.me.id, m.id, err)
+				vlog.Errorf("@%s updating %s: %s", r.me.id, m.id, err)
+				return
+			}
+			// At this point we know that the follower is missing entries pervious to what
+			// we just sent.  If we can backup, do it.  Otherwise try sending a snapshot.
+			if m.nextIndex <= 1 {
+				return
+			}
+			prev := r.p.Lookup(m.nextIndex - 1)
+			if prev == nil {
+				return
+			}
+			// We can back up.
+			m.nextIndex = m.nextIndex - 1
+			continue
+		}
+
+		// The follower appended correctly, update indices and tell the server thread that
+		// the commit index may need to change.
+		m.nextIndex += Index(n)
+		logged := m.nextIndex - 1
+		if n > 0 {
+			r.setMatchIndex(m, logged)
+		}
+
+		// The follower is caught up?
+		if m.nextIndex > r.p.LastIndex() {
+			return
+		}
+	}
+}
+
+func (r *raft) sendLatestSnapshot(m *member) (Index, error) {
+	rd, term, index, err := r.p.OpenLatestSnapshot(r.ctx)
+	if err != nil {
+		return 0, err
+	}
+	ctx, _ := context.WithTimeout(r.ctx, time.Duration(5*60)*time.Second)
+	client := raftProtoClient(m.id)
+	call, err := client.InstallSnapshot(ctx, r.p.CurrentTerm(), r.me.id, term, index, options.Preresolved{})
+	if err != nil {
+		return 0, err
+	}
+	sstream := call.SendStream()
+	b := make([]byte, 10240)
+	for {
+		n, err := rd.Read(b)
+		if n == 0 && err == io.EOF {
+			break
+		}
+		if err = sstream.Send(b); err != nil {
+			return 0, err
+		}
+	}
+	if err := call.Finish(); err != nil {
+		return 0, err
+	}
+	return index, nil
+}
+
+func emptyChan(c chan struct{}) {
+	for {
+		select {
+		case <-c:
+		default:
+			return
+		}
+	}
+}
+
+// perFollower is a go routine that sequences all messages to a single follower.
+//
+// This is the only go routine that updates the follower's variables so all changes to
+// the member struct are serialized by it.
+func (r *raft) perFollower(m *member) {
+	m.timer = time.NewTimer(r.heartbeat)
+	for {
+		select {
+		case <-m.timer.C:
+			r.updateFollower(m)
+			m.timer.Reset(r.heartbeat)
+		case <-m.update:
+			// Soak up any waiting update requests
+			emptyChan(m.update)
+			r.updateFollower(m)
+			m.timer.Reset(r.heartbeat)
+		case <-r.stop:
+			close(m.stopped)
+			return
+		}
+	}
+}
+
+// kickFollowers causes each perFollower routine to try to update its followers.
+func (r *raft) kickFollowers() {
+	for _, m := range r.memberMap {
+		select {
+		case m.update <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// setMatchIndex updates the matchIndex for a member.
+//
+// called with r locked.
+func (r *raft) setMatchIndex(m *member, i Index) {
+	m.matchIndex = i
+	if i <= r.commitIndex {
+		return
+	}
+	// Check if we need to change the commit index.
+	select {
+	case r.newMatch <- struct{}{}:
+	default:
+	}
+}
+
+func minIndex(indices ...Index) Index {
+	if len(indices) == 0 {
+		return 0
+	}
+	min := indices[0]
+	for _, x := range indices[1:] {
+		if x < min {
+			min = x
+		}
+	}
+	return min
+}
+
+func roleToString(r int) string {
+	switch r {
+	case RoleCandidate:
+		return "candidate"
+	case RoleLeader:
+		return "leader"
+	case RoleFollower:
+		return "follower"
+	}
+	return "?"
+}
+
+func (r *raft) waitForApply(ctx *context.T, term Term, index Index) (error, error) {
+	for {
+		r.applied.RLock()
+		applied := r.applied.index
+		r.applied.RUnlock()
+
+		if applied >= index {
+			le := r.p.Lookup(index)
+			if le == nil || le.Term != term {
+				// There was an election and the log entry was lost.
+				return nil, verror.New(errNotLeader, ctx)
+			}
+			return le.ApplyError, nil
+		}
+
+		// Give up if the caller doesn't want to wait.
+		select {
+		case <-ctx.Done():
+			return nil, verror.New(errTimedOut, ctx)
+		default:
+		}
+
+		// Wait for an apply to happen.
+		r.Lock()
+		r.ccv.Wait()
+		r.Unlock()
+	}
+}
+
+// Append tells the leader to append to the log.  The first error is the result of the client.Apply.  The second
+// is any error from raft.
+func (r *raft) Append(ctx *context.T, cmd []byte) (applyErr error, err error) {
+	for {
+		r.Lock()
+		if len(r.leader) == 0 {
+			r.lcv.Wait()
+		}
+		leader := r.leader
+		role := r.role
+		r.Unlock()
+
+		switch role {
+		case RoleLeader:
+			term, index, err := r.s.Append(ctx, nil, cmd)
+			if err == nil {
+				// We were the leader and the entry has now been applied.
+				return r.waitForApply(ctx, term, index)
+			}
+			// If the leader can't do it, give up.
+			if verror.ErrorID(err) != errNotLeader.ID {
+				return nil, err
+			}
+		case RoleFollower:
+			client := v23.GetClient(ctx)
+			var index Index
+			var term Term
+			if len(leader) == 0 {
+				break
+			}
+			if err = client.Call(ctx, leader, "Append", []interface{}{cmd}, []interface{}{&term, &index}, options.Preresolved{}); err == nil {
+				return r.waitForApply(ctx, term, index)
+			}
+			// If the leader can't do it, give up.
+			if verror.ErrorID(err) != errNotLeader.ID {
+				return nil, err
+			}
+		}
+
+		// Give up if the caller doesn't want to wait.
+		select {
+		case <-ctx.Done():
+			err = verror.New(errTimedOut, ctx)
+			return
+		default:
+		}
+	}
+}
+
+func (r *raft) Leader() (bool, string) {
+	r.Lock()
+	defer r.Unlock()
+	if r.role == RoleLeader {
+		return true, r.leader
+	}
+	return false, r.leader
+}
diff --git a/lib/raft/raft.vdl b/lib/raft/raft.vdl
new file mode 100644
index 0000000..bcc8912
--- /dev/null
+++ b/lib/raft/raft.vdl
@@ -0,0 +1,75 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+// This vdl defines the interface between raft members and not the library interface to raft.
+
+// Every log entry is represented by a Term and an Index.
+
+// Term is a counter incremented each time a member starts an election.  The log will
+// show gaps in Term numbers because all elections need not be successful.
+type Term uint64
+
+// Index is an index into the log.  The log entries are numbered sequentially.  At the moment
+// the entries RaftClient.Apply()ed should be sequential but that will change if we introduce
+// system entries. For example, we could have an entry type that is used to add members to the
+// set of replicas.
+type Index uint64
+
+// LogEntry types.
+const (
+	ClientEntry = byte(0)
+	RaftEntry = byte(1)
+)
+
+// The LogEntry is what the log consists of.  'error' starts nil and is never written to stable
+// storage.  It represents the result of RaftClient.Apply(Cmd, Index).  This is a hack but I
+// haven't figured out a better way.
+type LogEntry struct {
+	Term  Term
+	Index Index
+	Cmd   []byte
+	Type  byte
+}
+
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProto interface {
+	// Members returns the current set of ids of raft members.
+	Members() ([]string | error)
+
+	// Leader returns the id of the current leader.
+	Leader() (string | error)
+
+	// RequestVote starts a new round of voting.  It returns the server's current Term and true if
+	// the server voted for the client.
+	RequestVote(term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool | error)
+
+	// AppendToLog is sent by the leader to tell followers to append an entry.  If cmds
+	// is empty, this is a keep alive message (at a random interval after a keep alive, followers
+	// will initiate a new round of voting).
+	//   term -- the current term of the sender
+	//   leaderId -- the id of the sender
+	//   prevIndex -- the index of the log entry immediately preceding cmds
+	//   prevTerm -- the term of the log entry immediately preceding cmds.  The receiver must have
+	//               received the previous index'd entry and it must have had the same term.  Otherwise
+	//               an error is returned.
+	//   leaderCommit -- the index of the last committed entry, i.e., the one a quorum has gauranteed
+	//                   to have logged.
+	//   cmds -- sequential log entries starting at prevIndex+1
+	AppendToLog(term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+
+	// Append is sent to the leader by followers.  Only the leader is allowed to send AppendToLog.
+	// If a follower receives an Append() call it performs an Append() to the leader to run the actual
+	// Raft algorithm.  The leader will respond after it has RaftClient.Apply()ed the command.
+	//
+	// Returns the term and index of the append entry or an error.
+	Append(cmd []byte) (term Term, index Index | error)
+
+	// InstallSnapshot is sent from the leader to follower to install the given snapshot.  It is
+	// sent when it becomes apparent that the leader does not have log entries needed by the follower
+	// to progress.  'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+	// snapshot.
+	InstallSnapshot(term Term, leaderId string, appliedTerm Term, appliedIndex Index) stream<[]byte> error
+}
diff --git a/lib/raft/raft.vdl.go b/lib/raft/raft.vdl.go
new file mode 100644
index 0000000..c579d09
--- /dev/null
+++ b/lib/raft/raft.vdl.go
@@ -0,0 +1,484 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: raft.vdl
+
+package raft
+
+import (
+	// VDL system imports
+	"io"
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/vdl"
+)
+
+// Term is a counter incremented each time a member starts an election.  The log will
+// show gaps in Term numbers because all elections need not be successful.
+type Term uint64
+
+func (Term) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/lib/raft.Term"`
+}) {
+}
+
+// Index is an index into the log.  The log entries are numbered sequentially.  At the moment
+// the entries RaftClient.Apply()ed should be sequential but that will change if we introduce
+// system entries. For example, we could have an entry type that is used to add members to the
+// set of replicas.
+type Index uint64
+
+func (Index) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/lib/raft.Index"`
+}) {
+}
+
+// The LogEntry is what the log consists of.  'error' starts nil and is never written to stable
+// storage.  It represents the result of RaftClient.Apply(Cmd, Index).  This is a hack but I
+// haven't figured out a better way.
+type LogEntry struct {
+	Term  Term
+	Index Index
+	Cmd   []byte
+	Type  byte
+}
+
+func (LogEntry) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/lib/raft.LogEntry"`
+}) {
+}
+
+func init() {
+	vdl.Register((*Term)(nil))
+	vdl.Register((*Index)(nil))
+	vdl.Register((*LogEntry)(nil))
+}
+
+const ClientEntry = byte(0)
+
+const RaftEntry = byte(1)
+
+// raftProtoClientMethods is the client interface
+// containing raftProto methods.
+//
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProtoClientMethods interface {
+	// Members returns the current set of ids of raft members.
+	Members(*context.T, ...rpc.CallOpt) ([]string, error)
+	// Leader returns the id of the current leader.
+	Leader(*context.T, ...rpc.CallOpt) (string, error)
+	// RequestVote starts a new round of voting.  It returns the server's current Term and true if
+	// the server voted for the client.
+	RequestVote(_ *context.T, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index, _ ...rpc.CallOpt) (Term Term, Granted bool, _ error)
+	// AppendToLog is sent by the leader to tell followers to append an entry.  If cmds
+	// is empty, this is a keep alive message (at a random interval after a keep alive, followers
+	// will initiate a new round of voting).
+	//   term -- the current term of the sender
+	//   leaderId -- the id of the sender
+	//   prevIndex -- the index of the log entry immediately preceding cmds
+	//   prevTerm -- the term of the log entry immediately preceding cmds.  The receiver must have
+	//               received the previous index'd entry and it must have had the same term.  Otherwise
+	//               an error is returned.
+	//   leaderCommit -- the index of the last committed entry, i.e., the one a quorum has gauranteed
+	//                   to have logged.
+	//   cmds -- sequential log entries starting at prevIndex+1
+	AppendToLog(_ *context.T, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry, _ ...rpc.CallOpt) error
+	// Append is sent to the leader by followers.  Only the leader is allowed to send AppendToLog.
+	// If a follower receives an Append() call it performs an Append() to the leader to run the actual
+	// Raft algorithm.  The leader will respond after it has RaftClient.Apply()ed the command.
+	//
+	// Returns the term and index of the append entry or an error.
+	Append(_ *context.T, cmd []byte, _ ...rpc.CallOpt) (term Term, index Index, _ error)
+	// InstallSnapshot is sent from the leader to follower to install the given snapshot.  It is
+	// sent when it becomes apparent that the leader does not have log entries needed by the follower
+	// to progress.  'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+	// snapshot.
+	InstallSnapshot(_ *context.T, term Term, leaderId string, appliedTerm Term, appliedIndex Index, _ ...rpc.CallOpt) (raftProtoInstallSnapshotClientCall, error)
+}
+
+// raftProtoClientStub adds universal methods to raftProtoClientMethods.
+type raftProtoClientStub interface {
+	raftProtoClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// raftProtoClient returns a client stub for raftProto.
+func raftProtoClient(name string) raftProtoClientStub {
+	return implraftProtoClientStub{name}
+}
+
+type implraftProtoClientStub struct {
+	name string
+}
+
+func (c implraftProtoClientStub) Members(ctx *context.T, opts ...rpc.CallOpt) (o0 []string, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "Members", nil, []interface{}{&o0}, opts...)
+	return
+}
+
+func (c implraftProtoClientStub) Leader(ctx *context.T, opts ...rpc.CallOpt) (o0 string, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "Leader", nil, []interface{}{&o0}, opts...)
+	return
+}
+
+func (c implraftProtoClientStub) RequestVote(ctx *context.T, i0 Term, i1 string, i2 Term, i3 Index, opts ...rpc.CallOpt) (o0 Term, o1 bool, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "RequestVote", []interface{}{i0, i1, i2, i3}, []interface{}{&o0, &o1}, opts...)
+	return
+}
+
+func (c implraftProtoClientStub) AppendToLog(ctx *context.T, i0 Term, i1 string, i2 Index, i3 Term, i4 Index, i5 []LogEntry, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "AppendToLog", []interface{}{i0, i1, i2, i3, i4, i5}, nil, opts...)
+	return
+}
+
+func (c implraftProtoClientStub) Append(ctx *context.T, i0 []byte, opts ...rpc.CallOpt) (o0 Term, o1 Index, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "Append", []interface{}{i0}, []interface{}{&o0, &o1}, opts...)
+	return
+}
+
+func (c implraftProtoClientStub) InstallSnapshot(ctx *context.T, i0 Term, i1 string, i2 Term, i3 Index, opts ...rpc.CallOpt) (ocall raftProtoInstallSnapshotClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "InstallSnapshot", []interface{}{i0, i1, i2, i3}, opts...); err != nil {
+		return
+	}
+	ocall = &implraftProtoInstallSnapshotClientCall{ClientCall: call}
+	return
+}
+
+// raftProtoInstallSnapshotClientStream is the client stream for raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotClientStream interface {
+	// SendStream returns the send side of the raftProto.InstallSnapshot client stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
+		Send(item []byte) error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
+		Close() error
+	}
+}
+
+// raftProtoInstallSnapshotClientCall represents the call returned from raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotClientCall interface {
+	raftProtoInstallSnapshotClientStream
+	// Finish performs the equivalent of SendStream().Close, then blocks until
+	// the server is done, and returns the positional return values for the call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implraftProtoInstallSnapshotClientCall struct {
+	rpc.ClientCall
+}
+
+func (c *implraftProtoInstallSnapshotClientCall) SendStream() interface {
+	Send(item []byte) error
+	Close() error
+} {
+	return implraftProtoInstallSnapshotClientCallSend{c}
+}
+
+type implraftProtoInstallSnapshotClientCallSend struct {
+	c *implraftProtoInstallSnapshotClientCall
+}
+
+func (c implraftProtoInstallSnapshotClientCallSend) Send(item []byte) error {
+	return c.c.Send(item)
+}
+func (c implraftProtoInstallSnapshotClientCallSend) Close() error {
+	return c.c.CloseSend()
+}
+func (c *implraftProtoInstallSnapshotClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// raftProtoServerMethods is the interface a server writer
+// implements for raftProto.
+//
+// raftProto is used by the members of a raft set to communicate with each other.
+type raftProtoServerMethods interface {
+	// Members returns the current set of ids of raft members.
+	Members(*context.T, rpc.ServerCall) ([]string, error)
+	// Leader returns the id of the current leader.
+	Leader(*context.T, rpc.ServerCall) (string, error)
+	// RequestVote starts a new round of voting.  It returns the server's current Term and true if
+	// the server voted for the client.
+	RequestVote(_ *context.T, _ rpc.ServerCall, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool, _ error)
+	// AppendToLog is sent by the leader to tell followers to append an entry.  If cmds
+	// is empty, this is a keep alive message (at a random interval after a keep alive, followers
+	// will initiate a new round of voting).
+	//   term -- the current term of the sender
+	//   leaderId -- the id of the sender
+	//   prevIndex -- the index of the log entry immediately preceding cmds
+	//   prevTerm -- the term of the log entry immediately preceding cmds.  The receiver must have
+	//               received the previous index'd entry and it must have had the same term.  Otherwise
+	//               an error is returned.
+	//   leaderCommit -- the index of the last committed entry, i.e., the one a quorum has gauranteed
+	//                   to have logged.
+	//   cmds -- sequential log entries starting at prevIndex+1
+	AppendToLog(_ *context.T, _ rpc.ServerCall, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+	// Append is sent to the leader by followers.  Only the leader is allowed to send AppendToLog.
+	// If a follower receives an Append() call it performs an Append() to the leader to run the actual
+	// Raft algorithm.  The leader will respond after it has RaftClient.Apply()ed the command.
+	//
+	// Returns the term and index of the append entry or an error.
+	Append(_ *context.T, _ rpc.ServerCall, cmd []byte) (term Term, index Index, _ error)
+	// InstallSnapshot is sent from the leader to follower to install the given snapshot.  It is
+	// sent when it becomes apparent that the leader does not have log entries needed by the follower
+	// to progress.  'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+	// snapshot.
+	InstallSnapshot(_ *context.T, _ raftProtoInstallSnapshotServerCall, term Term, leaderId string, appliedTerm Term, appliedIndex Index) error
+}
+
+// raftProtoServerStubMethods is the server interface containing
+// raftProto methods, as expected by rpc.Server.
+// The only difference between this interface and raftProtoServerMethods
+// is the streaming methods.
+type raftProtoServerStubMethods interface {
+	// Members returns the current set of ids of raft members.
+	Members(*context.T, rpc.ServerCall) ([]string, error)
+	// Leader returns the id of the current leader.
+	Leader(*context.T, rpc.ServerCall) (string, error)
+	// RequestVote starts a new round of voting.  It returns the server's current Term and true if
+	// the server voted for the client.
+	RequestVote(_ *context.T, _ rpc.ServerCall, term Term, candidateId string, lastLogTerm Term, lastLogIndex Index) (Term Term, Granted bool, _ error)
+	// AppendToLog is sent by the leader to tell followers to append an entry.  If cmds
+	// is empty, this is a keep alive message (at a random interval after a keep alive, followers
+	// will initiate a new round of voting).
+	//   term -- the current term of the sender
+	//   leaderId -- the id of the sender
+	//   prevIndex -- the index of the log entry immediately preceding cmds
+	//   prevTerm -- the term of the log entry immediately preceding cmds.  The receiver must have
+	//               received the previous index'd entry and it must have had the same term.  Otherwise
+	//               an error is returned.
+	//   leaderCommit -- the index of the last committed entry, i.e., the one a quorum has gauranteed
+	//                   to have logged.
+	//   cmds -- sequential log entries starting at prevIndex+1
+	AppendToLog(_ *context.T, _ rpc.ServerCall, term Term, leaderId string, prevIndex Index, prevTerm Term, leaderCommit Index, cmds []LogEntry) error
+	// Append is sent to the leader by followers.  Only the leader is allowed to send AppendToLog.
+	// If a follower receives an Append() call it performs an Append() to the leader to run the actual
+	// Raft algorithm.  The leader will respond after it has RaftClient.Apply()ed the command.
+	//
+	// Returns the term and index of the append entry or an error.
+	Append(_ *context.T, _ rpc.ServerCall, cmd []byte) (term Term, index Index, _ error)
+	// InstallSnapshot is sent from the leader to follower to install the given snapshot.  It is
+	// sent when it becomes apparent that the leader does not have log entries needed by the follower
+	// to progress.  'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the
+	// snapshot.
+	InstallSnapshot(_ *context.T, _ *raftProtoInstallSnapshotServerCallStub, term Term, leaderId string, appliedTerm Term, appliedIndex Index) error
+}
+
+// raftProtoServerStub adds universal methods to raftProtoServerStubMethods.
+type raftProtoServerStub interface {
+	raftProtoServerStubMethods
+	// Describe the raftProto interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// raftProtoServer returns a server stub for raftProto.
+// It converts an implementation of raftProtoServerMethods into
+// an object that may be used by rpc.Server.
+func raftProtoServer(impl raftProtoServerMethods) raftProtoServerStub {
+	stub := implraftProtoServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implraftProtoServerStub struct {
+	impl raftProtoServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implraftProtoServerStub) Members(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	return s.impl.Members(ctx, call)
+}
+
+func (s implraftProtoServerStub) Leader(ctx *context.T, call rpc.ServerCall) (string, error) {
+	return s.impl.Leader(ctx, call)
+}
+
+func (s implraftProtoServerStub) RequestVote(ctx *context.T, call rpc.ServerCall, i0 Term, i1 string, i2 Term, i3 Index) (Term, bool, error) {
+	return s.impl.RequestVote(ctx, call, i0, i1, i2, i3)
+}
+
+func (s implraftProtoServerStub) AppendToLog(ctx *context.T, call rpc.ServerCall, i0 Term, i1 string, i2 Index, i3 Term, i4 Index, i5 []LogEntry) error {
+	return s.impl.AppendToLog(ctx, call, i0, i1, i2, i3, i4, i5)
+}
+
+func (s implraftProtoServerStub) Append(ctx *context.T, call rpc.ServerCall, i0 []byte) (Term, Index, error) {
+	return s.impl.Append(ctx, call, i0)
+}
+
+func (s implraftProtoServerStub) InstallSnapshot(ctx *context.T, call *raftProtoInstallSnapshotServerCallStub, i0 Term, i1 string, i2 Term, i3 Index) error {
+	return s.impl.InstallSnapshot(ctx, call, i0, i1, i2, i3)
+}
+
+func (s implraftProtoServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implraftProtoServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{raftProtoDesc}
+}
+
+// raftProtoDesc describes the raftProto interface.
+var raftProtoDesc rpc.InterfaceDesc = descraftProto
+
+// descraftProto hides the desc to keep godoc clean.
+var descraftProto = rpc.InterfaceDesc{
+	Name:    "raftProto",
+	PkgPath: "v.io/x/ref/lib/raft",
+	Doc:     "// raftProto is used by the members of a raft set to communicate with each other.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "Members",
+			Doc:  "// Members returns the current set of ids of raft members.",
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // []string
+			},
+		},
+		{
+			Name: "Leader",
+			Doc:  "// Leader returns the id of the current leader.",
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // string
+			},
+		},
+		{
+			Name: "RequestVote",
+			Doc:  "// RequestVote starts a new round of voting.  It returns the server's current Term and true if\n// the server voted for the client.",
+			InArgs: []rpc.ArgDesc{
+				{"term", ``},         // Term
+				{"candidateId", ``},  // string
+				{"lastLogTerm", ``},  // Term
+				{"lastLogIndex", ``}, // Index
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"Term", ``},    // Term
+				{"Granted", ``}, // bool
+			},
+		},
+		{
+			Name: "AppendToLog",
+			Doc:  "// AppendToLog is sent by the leader to tell followers to append an entry.  If cmds\n// is empty, this is a keep alive message (at a random interval after a keep alive, followers\n// will initiate a new round of voting).\n//   term -- the current term of the sender\n//   leaderId -- the id of the sender\n//   prevIndex -- the index of the log entry immediately preceding cmds\n//   prevTerm -- the term of the log entry immediately preceding cmds.  The receiver must have\n//               received the previous index'd entry and it must have had the same term.  Otherwise\n//               an error is returned.\n//   leaderCommit -- the index of the last committed entry, i.e., the one a quorum has gauranteed\n//                   to have logged.\n//   cmds -- sequential log entries starting at prevIndex+1",
+			InArgs: []rpc.ArgDesc{
+				{"term", ``},         // Term
+				{"leaderId", ``},     // string
+				{"prevIndex", ``},    // Index
+				{"prevTerm", ``},     // Term
+				{"leaderCommit", ``}, // Index
+				{"cmds", ``},         // []LogEntry
+			},
+		},
+		{
+			Name: "Append",
+			Doc:  "// Append is sent to the leader by followers.  Only the leader is allowed to send AppendToLog.\n// If a follower receives an Append() call it performs an Append() to the leader to run the actual\n// Raft algorithm.  The leader will respond after it has RaftClient.Apply()ed the command.\n//\n// Returns the term and index of the append entry or an error.",
+			InArgs: []rpc.ArgDesc{
+				{"cmd", ``}, // []byte
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"term", ``},  // Term
+				{"index", ``}, // Index
+			},
+		},
+		{
+			Name: "InstallSnapshot",
+			Doc:  "// InstallSnapshot is sent from the leader to follower to install the given snapshot.  It is\n// sent when it becomes apparent that the leader does not have log entries needed by the follower\n// to progress.  'term' and 'index' represent the last LogEntry RaftClient.Apply()ed to the\n// snapshot.",
+			InArgs: []rpc.ArgDesc{
+				{"term", ``},         // Term
+				{"leaderId", ``},     // string
+				{"appliedTerm", ``},  // Term
+				{"appliedIndex", ``}, // Index
+			},
+		},
+	},
+}
+
+// raftProtoInstallSnapshotServerStream is the server stream for raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotServerStream interface {
+	// RecvStream returns the receiver side of the raftProto.InstallSnapshot server stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() []byte
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+}
+
+// raftProtoInstallSnapshotServerCall represents the context passed to raftProto.InstallSnapshot.
+type raftProtoInstallSnapshotServerCall interface {
+	rpc.ServerCall
+	raftProtoInstallSnapshotServerStream
+}
+
+// raftProtoInstallSnapshotServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements raftProtoInstallSnapshotServerCall.
+type raftProtoInstallSnapshotServerCallStub struct {
+	rpc.StreamServerCall
+	valRecv []byte
+	errRecv error
+}
+
+// Init initializes raftProtoInstallSnapshotServerCallStub from rpc.StreamServerCall.
+func (s *raftProtoInstallSnapshotServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the raftProto.InstallSnapshot server stream.
+func (s *raftProtoInstallSnapshotServerCallStub) RecvStream() interface {
+	Advance() bool
+	Value() []byte
+	Err() error
+} {
+	return implraftProtoInstallSnapshotServerCallRecv{s}
+}
+
+type implraftProtoInstallSnapshotServerCallRecv struct {
+	s *raftProtoInstallSnapshotServerCallStub
+}
+
+func (s implraftProtoInstallSnapshotServerCallRecv) Advance() bool {
+	s.s.errRecv = s.s.Recv(&s.s.valRecv)
+	return s.s.errRecv == nil
+}
+func (s implraftProtoInstallSnapshotServerCallRecv) Value() []byte {
+	return s.s.valRecv
+}
+func (s implraftProtoInstallSnapshotServerCallRecv) Err() error {
+	if s.s.errRecv == io.EOF {
+		return nil
+	}
+	return s.s.errRecv
+}
diff --git a/lib/raft/raft_test.go b/lib/raft/raft_test.go
new file mode 100644
index 0000000..c6033f1
--- /dev/null
+++ b/lib/raft/raft_test.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"v.io/v23"
+	"v.io/x/lib/vlog"
+	_ "v.io/x/ref/runtime/factories/generic"
+)
+
+// waitForLogAgreement makes sure all working servers agree on the log.
+func waitForLogAgreement(rs []*raft, timeout time.Duration) bool {
+	start := time.Now()
+	for {
+		indexMap := make(map[Index]string)
+		termMap := make(map[Term]string)
+		for _, r := range rs {
+			id := r.Id()
+			indexMap[r.p.LastIndex()] = id
+			termMap[r.p.LastTerm()] = id
+		}
+		if len(indexMap) == 1 && len(termMap) == 1 {
+			vlog.Infof("tada, all logs agree at %v@%v", indexMap, termMap)
+			return true
+		}
+		if time.Now().Sub(start) > timeout {
+			vlog.Errorf("oops, logs disagree %v@%v", indexMap, termMap)
+			return false
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+// waitForAppliedAgreement makes sure all working servers have applied all logged commands.
+func waitForAppliedAgreement(rs []*raft, cs []*client, timeout time.Duration) bool {
+	start := time.Now()
+	for {
+		notyet := false
+		indexMap := make(map[Index]string)
+		termMap := make(map[Term]string)
+		for _, r := range rs {
+			id, role, _ := r.Status()
+			if role == RoleStopped {
+				vlog.Infof("ignoring %s", id)
+				continue
+			}
+			if r.p.LastIndex() != r.lastApplied() {
+				notyet = true
+				break
+			}
+			indexMap[r.p.LastIndex()] = id
+			termMap[r.p.LastTerm()] = id
+		}
+		if len(indexMap) == 1 && len(termMap) == 1 && !notyet {
+			vlog.Infof("tada, all applys agree at %v@%v", indexMap, termMap)
+			return true
+		}
+		if time.Now().Sub(start) > timeout {
+			vlog.Errorf("oops, applys disagree %v@%v", indexMap, termMap)
+			return false
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+// waitForAllRunning waits until all servers are in leader or follower state.
+func waitForAllRunning(rs []*raft, timeout time.Duration) bool {
+	n := len(rs)
+	start := time.Now()
+	for {
+		i := 0
+		for _, r := range rs {
+			_, role, _ := r.Status()
+			if role == RoleLeader || role == RoleFollower {
+				i++
+			}
+		}
+		if i == n {
+			return true
+		}
+		if time.Now().Sub(start) > timeout {
+			return false
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
+func TestAppend(t *testing.T) {
+	vlog.Infof("TestAppend")
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	rs, cs := buildRafts(t, ctx, 3, nil)
+	defer cleanUp(rs)
+	thb := rs[0].heartbeat
+
+	// One of the raft members should time out not hearing a leader and start an election.
+	r1 := waitForElection(t, rs, 5*thb)
+	if r1 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	time.Sleep(time.Millisecond)
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// Append some entries.
+	start := time.Now()
+	for i := 0; i < 100; i++ {
+		cmd := fmt.Sprintf("the rain in spain %d", i)
+		if apperr, err := r1.Append(ctx, []byte(cmd)); apperr != nil || err != nil {
+			t.Fatalf("append %s failed with %s", cmd, err)
+		}
+	}
+	if !waitForAppliedAgreement(rs, cs, 2*thb) {
+		t.Fatalf("no log agreement")
+	}
+	vlog.Infof("appends took %s", time.Now().Sub(start))
+
+	// Kill off one instance and see if we keep committing.
+	r1.Stop()
+	r2 := waitForElection(t, rs, 5*thb)
+	if r2 == nil {
+		t.Fatalf("too long to find a leader")
+	}
+	if !waitForLeaderAgreement(rs, thb) {
+		t.Fatalf("no leader agreement")
+	}
+
+	// Append some entries.
+	for i := 0; i < 10; i++ {
+		cmd := fmt.Sprintf("the rain in spain %d", i+10)
+		if apperr, err := r2.Append(ctx, []byte(cmd)); apperr != nil || err != nil {
+			t.Fatalf("append %s failed with %s", cmd, err)
+		}
+	}
+	if !waitForAppliedAgreement(rs, cs, thb) {
+		t.Fatalf("no log agreement")
+	}
+
+	// Restart the stopped server and wait for it to become a follower.
+	restart(t, ctx, rs, cs, r1)
+	if !waitForAllRunning(rs, 3*thb) {
+		t.Fatalf("server didn't joing after restart")
+	}
+
+	if !waitForAppliedAgreement(rs, cs, 3*thb) {
+		t.Fatalf("no log agreement")
+	}
+
+	vlog.Infof("TestAppend passed")
+}
diff --git a/lib/raft/service.go b/lib/raft/service.go
new file mode 100644
index 0000000..4daab9d
--- /dev/null
+++ b/lib/raft/service.go
@@ -0,0 +1,214 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package raft
+
+import (
+	"reflect"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/verror"
+)
+
+// service is the implementation of the raftProto.  It is only used between members
+// to communicate with each other.
+type service struct {
+	r          *raft
+	server     rpc.Server
+	serverName string
+	acl        access.AccessList
+}
+
+// newService creates a new service for the raft protocol and returns its network endpoints.
+func (s *service) newService(ctx *context.T, r *raft, serverName, hostPort string, acl access.AccessList) ([]naming.Endpoint, error) {
+	var err error
+	s.r = r
+	s.serverName = serverName
+	s.acl = acl
+	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", hostPort}}})
+	_, s.server, err = v23.WithNewDispatchingServer(ctx, serverName, s)
+	if err != nil {
+		return nil, err
+	}
+	return s.server.Status().Endpoints, nil
+}
+
+func (s *service) stopService() {
+	s.server.Stop()
+}
+
+// Lookup implements rpc.Dispatcher.Lookup.
+func (s *service) Lookup(ctx *context.T, name string) (interface{}, security.Authorizer, error) {
+	return raftProtoServer(s), s, nil
+}
+
+// Authorize allows anyone matching the ACL from the RaftConfig or with the same public
+// key as the server.
+func (s *service) Authorize(ctx *context.T, call security.Call) error {
+	if names, _ := security.RemoteBlessingNames(ctx, call); len(names) != 0 {
+		if s.acl.Includes(names...) {
+			return nil
+		}
+	}
+	if l, r := call.LocalBlessings().PublicKey(), call.RemoteBlessings().PublicKey(); l != nil && reflect.DeepEqual(l, r) {
+			return nil
+	}
+	return verror.New(verror.ErrNoAccess, ctx)
+}
+
+// Members implements raftProto.Members.
+func (s *service) Members(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	r := s.r
+	r.Lock()
+	defer r.Unlock()
+
+	var members []string
+	for m := range r.memberMap {
+		members = append(members, m)
+	}
+	return members, nil
+}
+
+// Members implements raftProto.Leader.
+func (s *service) Leader(ctx *context.T, call rpc.ServerCall) (string, error) {
+	r := s.r
+	r.Lock()
+	defer r.Unlock()
+	return r.leader, nil
+}
+
+// RequestVote implements raftProto.RequestVote.
+func (s *service) RequestVote(ctx *context.T, call rpc.ServerCall, term Term, candidate string, lastLogTerm Term, lastLogIndex Index) (Term, bool, error) {
+	r := s.r
+
+	// The voting needs to be atomic.
+	r.Lock()
+	defer r.Unlock()
+
+	// An old election?
+	if term < r.p.CurrentTerm() {
+		return r.p.CurrentTerm(), false, nil
+	}
+
+	// If the term is higher than the current election term, then we are into a new election.
+	if term > r.p.CurrentTerm() {
+		r.setRoleAndWatchdogTimer(RoleFollower)
+		r.p.SetVotedFor("")
+	}
+	vf := r.p.VotedFor()
+
+	// Have we already voted for someone else (for example myself)?
+	if vf != "" && vf != candidate {
+		return r.p.CurrentTerm(), false, nil
+	}
+
+	// If we have a more up to date log, ignore the candidate. (RAFT's safety restriction)
+	if r.p.LastTerm() > lastLogTerm {
+		return r.p.CurrentTerm(), false, nil
+	}
+	if r.p.LastTerm() == lastLogTerm && r.p.LastIndex() > lastLogIndex {
+		return r.p.CurrentTerm(), false, nil
+	}
+
+	// Vote for candidate.
+	r.p.SetCurrentTermAndVotedFor(term, candidate)
+	return r.p.CurrentTerm(), true, nil
+}
+
+// AppendToLog implements RaftProto.AppendToLog.
+func (s *service) AppendToLog(ctx *context.T, call rpc.ServerCall, term Term, leader string, prevIndex Index, prevTerm Term, leaderCommit Index, entries []LogEntry) error {
+	r := s.r
+
+	// The append needs to be atomic.
+	r.Lock()
+
+	// The leader has to be at least as up to date as we are.
+	if term < r.p.CurrentTerm() {
+		r.Unlock()
+		return verror.New(errBadTerm, ctx, term, r.p.CurrentTerm())
+	}
+
+	// At this point we  accept the sender as leader and become a follower.
+	if r.leader != leader {
+		vlog.Infof("@%s new leader %s during AppendToLog", r.me.id, leader)
+	}
+	r.leader = leader
+	r.lcv.Broadcast()
+	r.setRoleAndWatchdogTimer(RoleFollower)
+	r.p.SetVotedFor("")
+
+	// Append if we can.
+	if err := r.p.AppendToLog(ctx, prevTerm, prevIndex, entries); err != nil {
+		vlog.Errorf("@%s AppendToLog returns %s", r.me.id, err)
+		r.Unlock()
+		return err
+	}
+	r.Unlock()
+	r.newCommit <- leaderCommit
+	return nil
+}
+
+// Append implements RaftProto.Append.
+func (s *service) Append(ctx *context.T, call rpc.ServerCall, cmd []byte) (Term, Index, error) {
+	r := s.r
+	r.Lock()
+
+	if r.role != RoleLeader {
+		r.Unlock()
+		return 0, 0, verror.New(errNotLeader, ctx)
+	}
+
+	// Assign an index and term to the log entry.
+	le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex()+1, Cmd: cmd}
+
+	// Append to our own log.
+	if err := r.p.AppendToLog(ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
+		// This shouldn't happen.
+		r.Unlock()
+		return 0, 0, err
+	}
+
+	// Update the fact that we've logged it.
+	r.setMatchIndex(r.me, le.Index)
+
+	// Tell each follower to update.
+	r.kickFollowers()
+	r.Unlock()
+
+	// The entry is not yet commited or applied.  The caller must verify that itself.
+	return le.Term, le.Index, nil
+}
+
+// InstallSnapshot implements RaftProto.InstallSnapshot.
+func (s *service) InstallSnapshot(ctx *context.T, call raftProtoInstallSnapshotServerCall, term Term, leader string, appliedTerm Term, appliedIndex Index) error {
+	r := s.r
+
+	// The snapshot needs to be atomic.
+	r.Lock()
+	defer r.Unlock()
+
+	// The leader has to be at least as up to date as we are.
+	if term < r.p.CurrentTerm() {
+		return verror.New(errBadTerm, ctx, term, r.p.CurrentTerm())
+	}
+
+	// At this point we  accept the sender as leader and become a follower.
+	if r.leader != leader {
+		vlog.Infof("@%s new leader %s during InstallSnapshot", r.me.id, leader)
+	}
+	r.leader = leader
+	r.lcv.Broadcast()
+	r.setRoleAndWatchdogTimer(RoleFollower)
+	r.p.SetVotedFor("")
+
+	// Store the snapshot and restore client from it.
+	return r.p.SnapshotFromLeader(ctx, appliedTerm, appliedIndex, call)
+}