// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package raft

// This package implements the Raft protocol, https://ramcloud.stanford.edu/raft.pdf. The
// logged commands are opaque byte slices.  If someone wishes a more complex command structure,
// they should use an encoding (e.g. json) into the bytes.

import (
	"io"
	"math/rand"
	"sort"
	"sync"
	"time"

	"v.io/x/lib/vlog"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/options"
	"v.io/v23/verror"
)

// pkgPath is the prefix of every verror ID registered below.
const pkgPath = "v.io/x/ref/lib.raft"

// Errors registered by this package.  All of them are registered with verror.NoRetry.
var (
	errBadAppend     = verror.Register(pkgPath+".errBadAppend", verror.NoRetry, "{1:}{2:} inconsistent append{:_}")
	errAddAfterStart = verror.Register(pkgPath+".errAddAfterStart", verror.NoRetry, "{1:}{2:} adding member after start{:_}")
	errNotLeader     = verror.Register(pkgPath+".errNotLeader", verror.NoRetry, "{1:}{2:} not the leader{:_}")
	errWTF           = verror.Register(pkgPath+".errWTF", verror.NoRetry, "{1:}{2:} internal error{:_}")
	errTimedOut      = verror.Register(pkgPath+".errTimedOut", verror.NoRetry, "{1:}{2:} request timed out{:_}")
	errBadTerm       = verror.Register(pkgPath+".errBadTerm", verror.NoRetry, "{1:}{2:} new term {3} < {4} {:_}")
)

// member keeps track of another member's state.  One is also created for the local
// server (see newRaft), in which case only id and matchIndex are meaningful.
type member struct {
	id         string
	nextIndex  Index         // Next log index to send to this follower.
	matchIndex Index         // Last entry logged by this follower.
	stopped    chan struct{} // Follower go routine closes this to indicate it has terminated.
	// kickFollowers does a non-blocking send on update to ask the member's perFollower
	// go routine to bring the follower up to date.
	update chan struct{}
	// timer is the heartbeat timer owned by the member's perFollower go routine.
	timer *time.Timer
}

// memberSlice implements sort.Interface over members.  Sorting puts the member with the
// highest logged (matched) entry first.
type memberSlice []*member

// Len returns the number of members.
func (m memberSlice) Len() int {
	return len(m)
}

// Less orders members by descending matchIndex.
func (m memberSlice) Less(i, j int) bool {
	return m[j].matchIndex < m[i].matchIndex
}

// Swap exchanges the members at positions i and j.
func (m memberSlice) Swap(i, j int) {
	m[j], m[i] = m[i], m[j]
}

// raft is the implementation of the raft library.  The embedded Mutex protects the
// volatile raft state below; ccv and lcv are condition variables on that same mutex.
type raft struct {
	sync.Mutex

	// ctx is the cancelable context derived in newRaft; Stop calls cancel.
	// timer is the watchdog timer that serverEvents selects on to start elections.
	// heartbeat is the interval between heartbeats.
	ctx       *context.T
	cancel    context.CancelFunc
	rng       *rand.Rand
	timer     *time.Timer
	heartbeat time.Duration

	// rpc interface between instances.
	s service

	// Client interface.
	client RaftClient

	// applied is the highest log entry applied to the client.
	applied struct {
		index Index
		term  Term
	}

	// Raft algorithm volatile state.
	role        int
	leader      string
	quorum      int                // Number of members that form a quorum.
	commitIndex Index              // Highest index committed.
	memberMap   map[string]*member // Map of raft members (including current).
	memberSet   memberSlice        // Slice of raft members (including current).
	me          *member

	// Raft algorithm persistent state
	p      persistent
	logDir string

	// stop and stopped are for clean shutdown.  All long lived go routines (perFollower and serverEvents)
	// exit when stop is closed.  Each perFollower then closes member.stopped and serverEvents closes
	// stopped to signal that they are finished.
	stop    chan struct{} // perFollower and serverEvents go routines exit when this is closed.
	stopped chan struct{} // serverEvents go routine closes this to indicate it has terminated.

	// Each time a perFollower successfully logs entries on a follower it writes to newMatch to get the
	// serverEvents routine to possibly update the commitIndex.
	newMatch chan struct{} // Each time a follower reports a logged entry a message is sent on this.

	// Each time a follower receives a new commit index, it sends it to newCommit to get the serverEvents
	// routine to apply any newly committed entries.
	newCommit chan Index // Each received leader commit index is written to this.

	// Wait here for commitIndex to change.
	ccv *sync.Cond

	// Wait here for leadership to change.
	lcv *sync.Cond

	// Variables for the sync loop.  They are protected by the Mutex embedded in this
	// struct, not by the raft Mutex; both condition variables are built on it.
	sync struct {
		sync.Mutex
		requested   uint64 // Incremented each sync request.
		requestedcv *sync.Cond
		done        uint64 // Updated to last request prior to the current sync.
		donecv      *sync.Cond
		stopped     chan struct{} // syncLoop closes this to indicate it has terminated.
	}
}

// logEntry is the in-memory structure for each logged item.  ApplyError records the
// result of handing a ClientEntry to the client's Apply callback (see applyCommits).
type logEntry struct {
	Term       Term
	Index      Index
	Cmd        []byte
	Type       byte
	ApplyError error
}

// newRaft creates a new raft server.  The server is created stopped; call Start to get
// the protocol going.
//
//  ctx    - the context from which the server's own cancelable context is derived.
//  config - the server configuration.  The fields used are:
//             LogDir            - the name of the directory in which to persist the log.
//             ServerName        - a name for the server to announce itself as in a mount table.
//                                 All members should use the same name and hence be
//                                 alternatives for callers.
//             HostPort          - the network address of the server.
//             Heartbeat         - the interval between heartbeats.  0 means use default (3 seconds).
//             SnapshotThreshold - the size the log can reach before we create a snapshot.
//                                 0 means use default.
//             Acl               - passed through to the RPC service.
//  client - callbacks to the client.
//
// On any error the derived context is canceled and an already opened log is closed, so
// a failed call leaves nothing behind.
func newRaft(ctx *context.T, config *RaftConfig, client RaftClient) (*raft, error) {
	nctx, cancel := context.WithCancel(ctx)
	r := &raft{}
	r.ctx = nctx
	r.cancel = cancel
	r.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
	r.heartbeat = config.Heartbeat
	if r.heartbeat == 0 {
		r.heartbeat = 3 * time.Second
	}

	// Client interface.
	r.client = client
	r.applied.term = 0
	r.applied.index = 0

	// Raft volatile state.
	r.role = RoleStopped
	r.commitIndex = 0
	r.leader = ""
	r.memberMap = make(map[string]*member)
	r.memberSet = make([]*member, 0)
	if err := r.AddMember(ctx, config.HostPort); err != nil {
		cancel()
		return nil, err
	}
	r.me = r.memberMap[config.HostPort]

	// Raft persistent state.
	var err error
	r.logDir = config.LogDir
	if r.p, err = openPersist(ctx, r, config.SnapshotThreshold); err != nil {
		cancel()
		return nil, err
	}

	// Internal communication/synchronization.
	r.newMatch = make(chan struct{}, 100)
	r.newCommit = make(chan Index, 100)
	r.ccv = sync.NewCond(r)
	r.lcv = sync.NewCond(r)
	r.sync.donecv = sync.NewCond(&r.sync)
	r.sync.requestedcv = sync.NewCond(&r.sync)

	// The RPC interface to other members.
	eps, err := r.s.newService(nctx, r, config.ServerName, config.HostPort, config.Acl)
	if err != nil {
		r.p.Close()
		cancel()
		return nil, err
	}

	// If we're in the V namespace, just use the name as our Id.  If not create one
	// from the network address.
	r.me.id = config.ServerName
	if r.me.id == "" {
		if len(eps) == 0 {
			// Without an endpoint there is nothing to build an id from.
			r.p.Close()
			cancel()
			return nil, verror.New(errWTF, ctx, "service returned no endpoints")
		}
		r.me.id = getShortName(eps[0])
	}

	return r, nil
}

// getShortName turns an endpoint into a name.  For a "tcp" endpoint the name is built
// from the endpoint's address alone (a /host:port name); for any other network the
// endpoint's own Name() is returned.
func getShortName(ep naming.Endpoint) string {
	addr := ep.Addr()
	if addr.Network() == "tcp" {
		return naming.JoinAddressName(addr.String(), "")
	}
	return ep.Name()
}

// AddMember registers id as a raft member.  The id must be a vanadium name.  Members can
// only be added while the server is stopped; afterwards errAddAfterStart is returned.
//
// NOTE(review): (n+1)/2 is only "more than half" of n entries when n is odd.  Whether
// that is a problem depends on how callers populate the member set (e.g. whether the
// local member ends up registered twice) - confirm against the callers.
func (r *raft) AddMember(ctx *context.T, id string) error {
	if r.role == RoleStopped {
		added := &member{
			id:      id,
			stopped: make(chan struct{}),
			update:  make(chan struct{}, 10),
		}
		r.memberMap[id] = added
		r.memberSet = append(r.memberSet, added)
		// Quorum has to be more than half the servers.
		r.quorum = (len(r.memberSet) + 1) / 2
		return nil
	}
	// Already started.
	// TODO(p): Raft has a protocol for changing membership after
	// start.  I'll add that after I get at least one client
	// working.
	return verror.New(errAddAfterStart, ctx)
}

// Id reports the vanadium name under which this server is known.
func (r *raft) Id() string {
	self := r.me
	return self.id
}

// Start gets the protocol going.  It creates the watchdog timer and launches the
// serverEvents and syncLoop go routines plus one perFollower go routine for every other
// member.  Calling Start on a server whose role is no longer RoleStopped is a no-op.
func (r *raft) Start() {
	vlog.Infof("@%s starting", r.me.id)
	r.Lock()
	defer r.Unlock()
	if r.role != RoleStopped {
		// already started
		return
	}
	r.timer = time.NewTimer(2 * r.heartbeat)

	// serverEvents serializes events for this server.
	r.stop, r.stopped = make(chan struct{}), make(chan struct{})
	go r.serverEvents()

	// syncLoop syncs with the leader when needed.
	r.sync.stopped = make(chan struct{})
	go r.syncLoop()

	// perFollowers updates the followers when we're the leader.
	for _, follower := range r.memberSet {
		if follower.id == r.me.id {
			continue
		}
		go r.perFollower(follower)
	}
}

// Stop ceases all function as a raft server.  It marks the server stopped, cancels its
// context, closes r.stop and then waits, in order, for serverEvents, syncLoop and every
// perFollower go routine to terminate before closing the log and waiting for the RPC
// server to close.  Stopping an already stopped server is a no-op.
func (r *raft) Stop() {
	vlog.Infof("@%s stopping", r.me.id)
	r.Lock()
	if r.role == RoleStopped {
		r.Unlock()
		return
	}
	r.role = RoleStopped
	r.Unlock()
	r.cancel()

	// Stop the associated go routines.
	close(r.stop)

	// Wait for serverEvents to stop.
	<-r.stopped

	// Wait for syncLoop to stop.  The broadcasts are done while holding the sync lock:
	// syncLoop (and Sync) check r.stop and then Wait() under that lock, so taking it here
	// guarantees they are either already waiting (and get woken) or have not yet checked
	// r.stop (and will see it closed).  Broadcasting without the lock could slip in
	// between the check and the Wait, leaving syncLoop asleep forever.
	r.sync.Lock()
	r.sync.donecv.Broadcast()
	r.sync.requestedcv.Broadcast()
	r.sync.Unlock()
	<-r.sync.stopped

	// Wait for all the perFollower routines to stop.
	for _, m := range r.memberSet {
		if m.id != r.me.id {
			<-m.stopped
		}
	}

	// Shut down the log file.
	r.p.Close()

	vlog.Infof("@%s stopping service", r.me.id)
	<-r.s.server.Closed()
	vlog.Infof("@%s stopped", r.me.id)
}

// setRoleAndWatchdogTimer switches to the given role and performs the bookkeeping that
// goes with it: followers and candidates get a fresh (fuzzy) election timer, a new leader
// resets what it knows about its followers and announces itself.  Any other role is
// simply recorded.
//
// called with r locked.
func (r *raft) setRoleAndWatchdogTimer(role int) {
	vlog.VI(2).Infof("@%s %s->%s", r.me.id, roleToString(r.role), roleToString(role))
	r.role = role

	if role == RoleFollower {
		// Wake up any RaftProto.Append()s waiting for a commitment.  They
		// will now have to give up since we are no longer leader.
		r.ccv.Broadcast()
		// Set a timer to start an election if we no longer hear from the leader.
		r.resetTimerFuzzy(2 * r.heartbeat)
		return
	}

	if role == RoleCandidate {
		// If this goes off, we lost an election and need to start a new one.
		// We make it longer than the follower timeout because we may have
		// lost due to safety so give someone else a chance.
		r.resetTimerFuzzy(4 * r.heartbeat)
		return
	}

	if role != RoleLeader {
		return
	}
	r.leader = r.me.id

	// Set known follower status to default values.
	for _, follower := range r.memberSet {
		if follower.id == r.me.id {
			continue
		}
		follower.nextIndex = r.p.LastIndex() + 1
		follower.matchIndex = 0
	}

	// Set my match index to the last one logged.
	r.setMatchIndex(r.me, r.p.LastIndex())

	// Let waiters know a new leader exists.
	r.lcv.Broadcast()
}

// setRole records the new role and logs the transition.  Unlike setRoleAndWatchdogTimer
// it touches neither timers nor follower state.
//
// called with r locked.
func (r *raft) setRole(role int) {
	from, to := roleToString(r.role), roleToString(role)
	vlog.VI(2).Infof("@%s %s->%s", r.me.id, from, to)
	r.role = role
}

// appendNull appends an empty RaftEntry for the current term to our own log and kicks
// the followers so that they pick it up.  startElection calls it right after winning to
// tell the followers that we are now the leader.
//
// called with r locked.
func (r *raft) appendNull() {
	// Assign an index and term to the log entry.
	le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: nil, Type: RaftEntry}

	// Append to our own log.
	if err := r.p.AppendToLog(r.ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
		// This shouldn't happen, so make sure it is noticed if it ever does.
		vlog.Errorf("@%s appending null entry %v: %s", r.me.id, le.Index, err)
		return
	}

	// Update the fact that we've logged it.
	r.setMatchIndex(r.me, le.Index)
	r.kickFollowers()
}

// Status reports, in this order, the id of this member, its current raft role, and the
// id of the member it believes to be the leader.
func (r *raft) Status() (string, int, string) {
	r.Lock()
	defer r.Unlock()
	id, role, leader := r.me.id, r.role, r.leader
	return id, role, leader
}

// StartElection starts a new round of voting.  We do this by incrementing the
// Term and, in parallel, calling each other member to vote.  If we receive a
// majority we win and send a heartbeat to each member.
//
// Someone else may get elected in the middle of the vote so we have to
// make sure we're still a candidate at the end of the voting.
//
// This is the locking wrapper around startElection, which expects r to be locked.
func (r *raft) StartElection() {
	r.Lock()
	defer r.Unlock()
	r.startElection()
}

// startElection runs one round of voting: it increments the current term, becomes a
// candidate, votes for itself and asks every other member for its vote in parallel.  If a
// strict majority of the cluster (ourselves included) grants the vote and we are still a
// candidate afterwards, we become leader and append a null entry to announce it.
//
// called with r locked.  The lock is released while the votes are collected and is held
// again on return.
func (r *raft) startElection() {
	// If we can't get a response in 2 seconds, something is really wrong.  All vote
	// requests have completed by the time we return, so releasing the timeout's
	// resources on return is safe.
	ctx, cancel := context.WithTimeout(r.ctx, 2*time.Second)
	defer cancel()
	if err := r.p.IncCurrentTerm(); err != nil {
		// If this fails, there's no way to recover.
		vlog.Fatalf("incrementing current term: %s", err)
		return
	}
	vlog.Infof("@%s startElection new term %d", r.me.id, r.p.CurrentTerm())

	msg := []interface{}{
		r.p.CurrentTerm(),
		r.me.id,
		r.p.LastTerm(),
		r.p.LastIndex(),
	}
	var members []string
	for k, m := range r.memberMap {
		if m.id == r.me.id {
			continue
		}
		members = append(members, k)
	}
	r.setRoleAndWatchdogTimer(RoleCandidate)
	r.p.SetVotedFor(r.me.id)
	r.leader = ""
	r.Unlock()

	// We have to do this outside the lock or the system will deadlock when two members start overlapping votes.
	type reply struct {
		term Term
		ok   bool
	}
	c := make(chan reply)
	for _, id := range members {
		go func(id string) {
			var rep reply
			client := v23.GetClient(ctx)
			if err := client.Call(ctx, id, "RequestVote", msg, []interface{}{&rep.term, &rep.ok}, options.Preresolved{}); err != nil {
				vlog.Infof("@%s sending RequestVote to %s: %s", r.me.id, id, err)
			}
			c <- rep
		}(id)
	}

	// Wait till all the voters have voted or timed out.
	oks := 1 // We vote for ourselves.
	highest := Term(0)
	for range members {
		rep := <-c
		if rep.ok {
			oks++
		}
		if rep.term > highest {
			highest = rep.term
		}
	}

	r.Lock()
	// Winning requires strictly more than half of all voters, i.e. the other members
	// plus ourselves.  Comparing against half of the *other* members only would let a
	// candidate win with exactly half the votes in an even sized cluster (and with
	// nothing but its own vote in a two member cluster).
	voters := len(members) + 1
	// We have to check the role since someone else may have become the leader during the round and
	// made us a follower.
	if oks <= voters/2 || r.role != RoleCandidate {
		if highest > r.p.CurrentTerm() {
			// If someone answered with a higher term, stop being a candidate.
			r.setRoleAndWatchdogTimer(RoleFollower)
			r.p.SetCurrentTerm(highest)
		}
		vlog.VI(2).Infof("@%s lost election with %d votes", r.me.id, oks)
		return
	}
	vlog.Infof("@%s won election with %d votes", r.me.id, oks)
	r.setRoleAndWatchdogTimer(RoleLeader)

	// Tell followers we are now the leader.
	r.appendNull()
}

// applyCommits applies any committed entries: every logged entry after r.applied.index
// up to and including commitIndex, in order.  ClientEntry commands are handed to the
// client's Apply callback and the result is stored in the entry's ApplyError; RaftEntry
// entries are only counted as applied.  If the log does not (yet) contain the next entry
// the function returns early without considering a snapshot.
//
// serverEvents calls this without holding r's lock.
func (r *raft) applyCommits(commitIndex Index) {
	for r.applied.index < commitIndex {
		// This is the only go routine that changes r.applied
		// so we don't have to protect our reads.
		next := r.applied.index + 1
		le := r.p.Lookup(next)
		if le == nil {
			// Commit index is ahead of our highest entry.
			return
		}
		switch le.Type {
		case ClientEntry:
			le.ApplyError = r.client.Apply(le.Cmd, le.Index)
		case RaftEntry:
			// Nothing to hand to the client.
		}

		// But we do have to lock our writes.
		r.Lock()
		r.applied.index = next
		r.applied.term = le.Term
		r.Unlock()
	}

	// Give the persistent store the chance to snapshot up to what has been applied.
	r.p.ConsiderSnapshot(r.ctx, r.applied.term, r.applied.index)
}

// lastApplied returns the index of the highest log entry applied to the client.
func (r *raft) lastApplied() Index {
	r.Lock()
	defer r.Unlock()
	applied := r.applied.index
	return applied
}

// resetTimerFuzzy restarts the watchdog timer so that it fires after d plus a random
// extra delay in [0, heartbeat).  The randomization keeps members from timing out in
// lock step.
//
// The fuzz is drawn from r.rng, which newRaft seeds from the clock, rather than from the
// shared math/rand source, so that different members do not draw the same sequence.
// r.rng is not safe for concurrent use: call this with r locked.
func (r *raft) resetTimerFuzzy(d time.Duration) {
	fuzz := time.Duration(r.rng.Int63n(int64(r.heartbeat)))

	// Reset should only be called on a stopped or expired timer with a drained channel.
	// Otherwise an expiry that was already queued would be delivered after the reset and
	// look like a fresh timeout.  The drain must not block because serverEvents may
	// already have consumed the value.
	if !r.timer.Stop() {
		select {
		case <-r.timer.C:
		default:
		}
	}
	r.timer.Reset(d + fuzz)
}

// resetTimer restarts the watchdog timer so that it fires after exactly d.
func (r *raft) resetTimer(d time.Duration) {
	// Reset should only be called on a stopped or expired timer with a drained channel.
	// Otherwise an expiry that was already queued would be delivered after the reset and
	// look like a fresh timeout.  The drain must not block because serverEvents may
	// already have consumed the value.
	if !r.timer.Stop() {
		select {
		case <-r.timer.C:
		default:
		}
	}
	r.timer.Reset(d)
}

// highestFromChan drains everything currently queued on c without blocking and returns
// the largest value seen, or i if nothing queued exceeds it.
func highestFromChan(i Index, c chan Index) Index {
	best := i
	for {
		select {
		case queued := <-c:
			if queued > best {
				best = queued
			}
			continue
		default:
		}
		return best
	}
}

// serverEvents is a go routine that serializes server events.  This loop performs:
// (1) all changes to commitIndex both as a leader and a follower.
// (2) all application of committed log commands.
// (3) all elections.
//
// It begins by making this server a follower and runs until r.stop is closed, at which
// point it closes r.stopped and returns.
func (r *raft) serverEvents() {
	r.Lock()
	r.setRoleAndWatchdogTimer(RoleFollower)
	r.Unlock()
	for {
		select {
		case <-r.stop:
			// Terminate.
			close(r.stopped)
			return
		case <-r.timer.C:
			// Start an election whenever either:
			// (1) a follower hasn't heard from the leader in a random interval > 2 * heartbeat.
			// (2) a candidate hasn't won an election or been told anyone else has after heartbeat.
			// In any other role the timeout is ignored.
			r.Lock()
			switch r.role {
			case RoleCandidate:
				r.startElection()
			case RoleFollower:
				r.startElection()
			}
			r.Unlock()
		case <-r.newMatch:
			// Soak up any queued requests.
			emptyChan(r.newMatch)

			// This happens whenever we have gotten a reply from a follower.  We do it
			// here rather than in perFollower solely as a matter of taste.
			// Update the commitIndex if needed and apply any newly committed entries.
			r.Lock()
			if r.role != RoleLeader {
				r.Unlock()
				continue
			}
			// memberSet sorts by descending matchIndex, so after sorting the entry at
			// position quorum-1 holds the highest index logged by at least quorum members.
			sort.Sort(r.memberSet)
			ci := r.memberSet[r.quorum-1].matchIndex
			if ci <= r.commitIndex {
				r.Unlock()
				continue
			}
			r.commitIndex = ci
			r.Unlock()
			// Apply outside the lock, then wake waiters and pass the new commit index on.
			r.applyCommits(ci)
			r.ccv.Broadcast()
			r.kickFollowers()
		case i := <-r.newCommit:
			// Get highest queued up commit.
			i = highestFromChan(i, r.newCommit)

			// Update the commitIndex if needed and apply any newly committed entries.
			r.Lock()
			if r.role != RoleFollower {
				r.Unlock()
				continue
			}
			if i > r.commitIndex {
				r.commitIndex = i
			}
			ci := r.commitIndex
			r.Unlock()
			r.applyCommits(ci)
			r.ccv.Broadcast()
		}
	}
}

// makeAppendMsg builds the argument list of an AppendToLog call for member m, carrying
// at most 10 log entries starting at m.nextIndex.  It returns the arguments together with
// the number of entries included.  If the entry preceding m.nextIndex is unknown, it
// returns (nil, 0).
func (r *raft) makeAppendMsg(m *member) ([]interface{}, int) {
	// Figure out if we know the previous entry.
	prevTerm, prevIndex, known := r.p.LookupPrevious(m.nextIndex)
	if !known {
		return nil, 0
	}

	// Collect some log entries to send along.  0 is ok.
	var batch []LogEntry
	for len(batch) < 10 {
		le := r.p.Lookup(m.nextIndex + Index(len(batch)))
		if le == nil {
			break
		}
		batch = append(batch, LogEntry{Cmd: le.Cmd, Term: le.Term, Index: le.Index, Type: le.Type})
	}

	msg := []interface{}{
		r.p.CurrentTerm(),
		r.me.id,
		prevIndex,
		prevTerm,
		r.commitIndex,
		batch,
	}
	return msg, len(batch)
}

// updateFollower loops trying to update a follower until the follower is updated or we can't proceed.
// It will always send at least one update so will also act as a heartbeat.
//
// The lock on r is taken here and released around every remote call, so leadership is
// rechecked each time the lock is reacquired.
func (r *raft) updateFollower(m *member) {
	// Bring this server up to date.
	r.Lock()
	defer r.Unlock()
	for {
		// If we're not the leader we have no followers.
		if r.role != RoleLeader {
			return
		}

		// Collect some log entries starting at m.nextIndex.
		msg, n := r.makeAppendMsg(m)
		if msg == nil {
			// Try sending a snapshot.
			r.Unlock()
			vlog.Infof("@%s sending snapshot to %s", r.me.id, m.id)
			snapIndex, err := r.sendLatestSnapshot(m)
			r.Lock()
			if err != nil {
				// Try again later.
				vlog.Errorf("@%s sending snapshot to %s: %s", r.me.id, m.id, err)
				return
			}
			m.nextIndex = snapIndex + 1
			vlog.Infof("@%s sent snapshot to %s", r.me.id, m.id)
			// Try sending anything following the snapshot.
			continue
		}

		// Send to the follower. We drop the lock while we do this. That means we may stop being the
		// leader in the middle of the call but that's OK as long as we check when we get it back.
		r.Unlock()
		ctx, cancel := context.WithTimeout(r.ctx, time.Duration(2)*time.Second)
		client := v23.GetClient(ctx)
		err := client.Call(ctx, m.id, "AppendToLog", msg, []interface{}{}, options.Preresolved{})
		// Release the timeout's resources now; this loop may go around many times.
		cancel()
		r.Lock()
		if r.role != RoleLeader {
			// Not leader any more, doesn't matter how he replied.
			return
		}

		if err != nil {
			if verror.ErrorID(err) != errOutOfSequence.ID {
				// A problem other than missing entries.  Retry later.
				vlog.Errorf("@%s updating %s: %s", r.me.id, m.id, err)
				return
			}
			// At this point we know that the follower is missing entries previous to what
			// we just sent.  If we can back up, do it.  Otherwise give up for now.
			if m.nextIndex <= 1 {
				return
			}
			prev := r.p.Lookup(m.nextIndex - 1)
			if prev == nil {
				return
			}
			// We can back up.
			m.nextIndex = m.nextIndex - 1
			continue
		}

		// The follower appended correctly, update indices and tell the server thread that
		// the commit index may need to change.
		m.nextIndex += Index(n)
		logged := m.nextIndex - 1
		if n > 0 {
			r.setMatchIndex(m, logged)
		}

		// The follower is caught up?
		if m.nextIndex > r.p.LastIndex() {
			return
		}
	}
}

// sendLatestSnapshot streams the latest snapshot to member m using the InstallSnapshot
// RPC, allowing five minutes for the whole transfer.  On success it returns the index of
// the last log entry covered by the snapshot.
//
// NOTE(review): the reader returned by OpenLatestSnapshot is never closed here; if it
// holds a resource such as an open file, confirm who is responsible for releasing it.
func (r *raft) sendLatestSnapshot(m *member) (Index, error) {
	rd, term, index, err := r.p.OpenLatestSnapshot(r.ctx)
	if err != nil {
		return 0, err
	}
	ctx, cancel := context.WithTimeout(r.ctx, time.Duration(5*60)*time.Second)
	// Runs after Finish on success; on an error return it also aborts the call.
	defer cancel()
	client := raftProtoClient(m.id)
	call, err := client.InstallSnapshot(ctx, r.p.CurrentTerm(), r.me.id, term, index, options.Preresolved{})
	if err != nil {
		return 0, err
	}
	sstream := call.SendStream()
	buf := make([]byte, 10240)
	for {
		n, rerr := rd.Read(buf)
		if n > 0 {
			// Only the first n bytes are snapshot data; the rest of the buffer is
			// left over from earlier reads (or zeros).
			if serr := sstream.Send(buf[:n]); serr != nil {
				return 0, serr
			}
		}
		if rerr == io.EOF {
			break
		}
		if rerr != nil {
			// A failing reader must end the transfer rather than spin forever.
			return 0, rerr
		}
	}
	if err := call.Finish(); err != nil {
		return 0, err
	}
	return index, nil
}

// emptyChan discards everything currently queued on c.  It never blocks: it returns as
// soon as a receive would have to wait.
func emptyChan(c chan struct{}) {
	for drained := false; !drained; {
		select {
		case <-c:
		default:
			drained = true
		}
	}
}

// perFollower is a go routine that sequences all messages to a single follower.
//
// This is the only go routine that updates the follower's variables so all changes to
// the member struct are serialized by it.
//
// The follower is updated whenever the heartbeat timer fires or an update is requested
// on m.update; either way the heartbeat timer is restarted afterwards.  The routine
// closes m.stopped and exits once r.stop is closed.
func (r *raft) perFollower(m *member) {
	m.timer = time.NewTimer(r.heartbeat)
	for {
		select {
		case <-r.stop:
			close(m.stopped)
			return
		case <-m.update:
			// Soak up any waiting update requests
			emptyChan(m.update)
		case <-m.timer.C:
		}
		r.updateFollower(m)
		m.timer.Reset(r.heartbeat)
	}
}

// kickFollowers asks every perFollower routine to try to update its follower by posting
// to each member's update channel.  The send never blocks: if a member's channel is
// full the request is dropped, since one is already pending.
func (r *raft) kickFollowers() {
	for id := range r.memberMap {
		requests := r.memberMap[id].update
		select {
		case requests <- struct{}{}:
		default:
		}
	}
}

// setMatchIndex records i as the highest entry logged by member m.  When i lies beyond
// the current commit index it also nudges serverEvents (without blocking) to check
// whether the commit index can advance.
//
// called with r locked.
func (r *raft) setMatchIndex(m *member, i Index) {
	m.matchIndex = i
	if i > r.commitIndex {
		// Check if we need to change the commit index.
		select {
		case r.newMatch <- struct{}{}:
		default:
		}
	}
}

// minIndex returns the smallest of the given indices, or 0 when called without any.
func minIndex(indices ...Index) Index {
	var lowest Index
	for pos, x := range indices {
		if pos == 0 || x < lowest {
			lowest = x
		}
	}
	return lowest
}

func roleToString(r int) string {
	switch r {
	case RoleCandidate:
		return "candidate"
	case RoleLeader:
		return "leader"
	case RoleFollower:
		return "follower"
	}
	return "?"
}

// waitForApply blocks until the entry at index has been applied or ctx is done.  The
// first result is the error the client's Apply returned for the entry, the second is
// any error from raft itself:
//   - term == 0: only wait for index to be applied; both results are nil.
//   - the entry at index is missing or belongs to another term: errNotLeader.
//   - ctx is done before index is applied: errTimedOut.
//
// NOTE(review): a waiter blocked in r.ccv.Wait() only notices that ctx is done the next
// time r.ccv is broadcast; cancellation alone does not wake it.
func (r *raft) waitForApply(ctx *context.T, term Term, index Index) (error, error) {
	r.Lock()
	defer r.Unlock()
	for {
		if r.applied.index >= index {
			if term == 0 {
				// Special case: we don't care about Apply() error or committed term, only that we've reached index.
				return nil, nil
			}
			le := r.p.Lookup(index)
			if le == nil || le.Term != term {
				// There was an election and the log entry was lost.
				return nil, verror.New(errNotLeader, ctx)
			}
			return le.ApplyError, nil
		}

		// Give up if the caller doesn't want to wait.
		select {
		case <-ctx.Done():
			return nil, verror.New(errTimedOut, ctx)
		default:
		}

		// Wait for an apply to happen.  r will be unlocked during the wait.
		r.ccv.Wait()
	}
}

// waitForLeadership blocks until a leader is known.  It returns the leader's id, our own
// role and the current commit index; the final result is true (and the others are zero
// values) if ctx was done before a leader was known.
func (r *raft) waitForLeadership(ctx *context.T) (string, int, Index, bool) {
	r.Lock()
	defer r.Unlock()
	for r.leader == "" {
		// Give up if the caller doesn't want to wait.
		select {
		case <-ctx.Done():
			return "", 0, 0, true
		default:
		}
		// r is unlocked while waiting for a leadership change.
		r.lcv.Wait()
	}
	leader, role, committed := r.leader, r.role, r.commitIndex
	return leader, role, committed, false
}

// Append tells the leader to append to the log.  The first error is the result of the client.Apply.  The second
// is any error from raft.
//
// If this member is the leader the command is appended locally, otherwise it is
// forwarded to the leader with an "Append" call.  An errNotLeader reply causes a retry
// (leadership may have moved); any other error is returned.  errTimedOut is returned
// once ctx is done.
func (r *raft) Append(ctx *context.T, cmd []byte) (error, error) {
	for {
		leader, role, _, timedOut := r.waitForLeadership(ctx)
		if timedOut {
			return nil, verror.New(errTimedOut, ctx)
		}
		switch role {
		case RoleLeader:
			term, index, err := r.s.Append(ctx, nil, cmd)
			if err == nil {
				// We were the leader and the entry has been logged; wait for it to be applied.
				return r.waitForApply(ctx, term, index)
			}
			// If the leader can't do it, give up.
			if verror.ErrorID(err) != errNotLeader.ID {
				return nil, err
			}
		case RoleFollower:
			client := v23.GetClient(ctx)
			var index Index
			var term Term
			if len(leader) == 0 {
				// No leader to forward to; leave the switch and retry.
				break
			}
			err := client.Call(ctx, leader, "Append", []interface{}{cmd}, []interface{}{&term, &index}, options.Preresolved{})
			if err == nil {
				return r.waitForApply(ctx, term, index)
			}
			// If the leader can't do it, give up.
			if verror.ErrorID(err) != errNotLeader.ID {
				return nil, err
			}
		}

		// Give up if the caller doesn't want to wait.
		select {
		case <-ctx.Done():
			err := verror.New(errTimedOut, ctx)
			return nil, err
		default:
		}
	}
}

// Leader reports whether this member is currently the leader, together with the id of
// the member it believes to be the leader.
func (r *raft) Leader() (bool, string) {
	r.Lock()
	defer r.Unlock()
	return r.role == RoleLeader, r.leader
}

// syncWithLeader synchronizes with the leader.  On return we have applied the commit index
// that existed before the call.
//
// As leader our own commit index is used; as follower the leader is asked for its
// commit index with a "Committed" call.  An errNotLeader reply causes a retry, any other
// call error is returned, and errTimedOut is returned once ctx is done.
//
// NOTE(review): the results of waitForApply are discarded, so nil is returned even when
// the wait ended because ctx was done rather than because the index was applied.
func (r *raft) syncWithLeader(ctx *context.T) error {
	for {
		leader, role, commitIndex, timedOut := r.waitForLeadership(ctx)
		if timedOut {
			return verror.New(errTimedOut, ctx)
		}

		switch role {
		case RoleLeader:
			r.waitForApply(ctx, 0, commitIndex)
			return nil
		case RoleFollower:
			client := v23.GetClient(ctx)
			var index Index
			err := client.Call(ctx, leader, "Committed", []interface{}{}, []interface{}{&index}, options.Preresolved{})
			if err == nil {
				r.waitForApply(ctx, 0, index)
				return nil
			}
			// If the leader can't do it, give up.
			if verror.ErrorID(err) != errNotLeader.ID {
				return err
			}
		}

		// Give up if the caller doesn't want to wait.
		select {
		case <-ctx.Done():
			return verror.New(errTimedOut, ctx)
		default:
		}
	}
}

// syncLoop is a go routine that syncs whenever necessary with the leader.  It sleeps
// until Sync has recorded a request newer than the last completed one, performs the sync
// outside the lock and then publishes the completed request number to wake the waiters.
// It closes r.sync.stopped and exits once r.stop is closed.
func (r *raft) syncLoop() {
	for {
		// Wait for someone to request syncing.
		r.sync.Lock()
		for {
			// Check for shutdown on every pass, even when a request is outstanding.
			// Otherwise a sync that keeps failing (as it does once the context has
			// been canceled by Stop) would retry forever and never terminate.
			select {
			case <-r.stop:
				close(r.sync.stopped)
				r.sync.Unlock()
				return
			default:
			}
			if r.sync.requested > r.sync.done {
				break
			}
			r.sync.requestedcv.Wait()
		}
		requested := r.sync.requested
		r.sync.Unlock()

		// Perform the sync outside the lock.
		if err := r.syncWithLeader(r.ctx); err != nil {
			// Leave the request outstanding; it is retried on the next pass.
			continue
		}

		// Wake up waiters.
		r.sync.Lock()
		r.sync.done = requested
		r.sync.Unlock()
		r.sync.donecv.Broadcast()
	}
}

// Sync waits for this member to have applied the current commit index.  It registers a
// new sync request, wakes syncLoop and then waits until syncLoop reports a completed
// sync at least as recent as the request.  errTimedOut is returned if the server is
// stopped or ctx is done first.
//
// NOTE(review): a caller blocked in r.sync.donecv.Wait() only notices that ctx is done
// the next time donecv is broadcast; cancellation alone does not wake it.
func (r *raft) Sync(ctx *context.T) error {
	r.sync.Lock()
	defer r.sync.Unlock()
	r.sync.requested++
	requested := r.sync.requested
	r.sync.requestedcv.Broadcast()
	// Wait for our sync to complete.
	for requested > r.sync.done {
		select {
		case <-r.stop:
			return verror.New(errTimedOut, ctx)
		case <-ctx.Done():
			return verror.New(errTimedOut, ctx)
		default:
		}
		// r.sync is unlocked during the wait.
		r.sync.donecv.Wait()
	}
	return nil
}
