// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package raft
// This package implements the Raft protocol, https://ramcloud.stanford.edu/raft.pdf. The
// logged commands are opaque byte slices. A caller that needs a more complex command
// structure should encode it (e.g. as JSON) into those bytes.
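//
// For example, a caller might marshal a structured command to bytes before
// appending it. A minimal sketch, assuming a hypothetical key/value command
// encoded with encoding/json (r is a *raft):
//
//	cmd, err := json.Marshal(struct{ Op, Key, Val string }{"put", "color", "red"})
//	if err == nil {
//		applyErr, raftErr := r.Append(ctx, cmd)
//		_ = applyErr // error returned by the client's Apply callback
//		_ = raftErr  // error from raft itself (e.g. lost leadership)
//	}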
import (
"io"
"math/rand"
"sort"
"sync"
"time"
"v.io/x/lib/vlog"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/verror"
)
const pkgPath = "v.io/x/ref/lib/raft"
var (
errBadAppend = verror.Register(pkgPath+".errBadAppend", verror.NoRetry, "{1:}{2:} inconsistent append{:_}")
errAddAfterStart = verror.Register(pkgPath+".errAddAfterStart", verror.NoRetry, "{1:}{2:} adding member after start{:_}")
errNotLeader = verror.Register(pkgPath+".errNotLeader", verror.NoRetry, "{1:}{2:} not the leader{:_}")
errWTF = verror.Register(pkgPath+".errWTF", verror.NoRetry, "{1:}{2:} internal error{:_}")
errTimedOut = verror.Register(pkgPath+".errTimedOut", verror.NoRetry, "{1:}{2:} request timed out{:_}")
errBadTerm = verror.Register(pkgPath+".errBadTerm", verror.NoRetry, "{1:}{2:} new term {3} < {4} {:_}")
)
// member keeps track of another member's state.
type member struct {
id string
nextIndex Index // Next log index to send to this follower.
matchIndex Index // Last entry logged by this follower.
stopped chan struct{} // Follower go routine closes this to indicate it has terminated.
update chan struct{} // Kicks this member's perFollower routine into sending an update.
timer *time.Timer
}
// memberSlice is used for sorting members by highest logged (matched) entry.
type memberSlice []*member
func (m memberSlice) Len() int { return len(m) }
func (m memberSlice) Less(i, j int) bool { return m[i].matchIndex > m[j].matchIndex }
func (m memberSlice) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// raft is the implementation of the raft library.
type raft struct {
sync.Mutex
ctx *context.T
cancel context.CancelFunc
rng *rand.Rand
timer *time.Timer
heartbeat time.Duration
// RPC interface between raft members.
s service
// Client interface.
client RaftClient
// applied is the highest log entry applied to the client.
applied struct {
index Index
term Term
}
// Raft algorithm volatile state.
role int
leader string
quorum int // Number of members that form a quorum.
commitIndex Index // Highest index committed.
memberMap map[string]*member // Map of raft members (including current).
memberSet memberSlice // Slice of raft members (including current).
me *member
// Raft algorithm persistent state
p persistent
logDir string
// stop and stopped are for clean shutdown. All long lived go routines (perFollower,
// serverEvents and syncLoop) exit when stop is closed. Each perFollower then closes its
// member.stopped, serverEvents closes stopped, and syncLoop closes sync.stopped to
// signal that they are finished.
stop chan struct{} // perFollower and serverEvents go routines exit when this is closed.
stopped chan struct{} // serverEvents go routine closes this to indicate it has terminated.
// Each time a perFollower successfully logs entries on a follower it writes to newMatch to get the
// serverEvents routine to possibly update the commitIndex.
newMatch chan struct{} // Each time a follower reports a logged entry a message is sent on this.
// Each time a follower receives a new commit index, it sends it to newCommit to get the serverEvents
// routine to apply any newly committed entries.
newCommit chan Index // Each received leader commit index is written to this.
// Wait here for commitIndex to change.
ccv *sync.Cond
// Wait here for leadership to change.
lcv *sync.Cond
// Variables for the sync loop.
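// requested and done implement a ticket scheme: Sync() takes a ticket by
// incrementing requested, and syncLoop, after completing a sync with the
// leader, sets done to the ticket it observed so every earlier waiter can
// proceed.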
sync struct {
sync.Mutex
requested uint64 // Incremented each sync request.
requestedcv *sync.Cond
done uint64 // Updated to last request prior to the current sync.
donecv *sync.Cond
stopped chan struct{}
}
}
// logEntry is the in-memory structure for each logged item.
type logEntry struct {
Term Term
Index Index
Cmd []byte
Type byte
ApplyError error
}
// newRaft creates a new raft server.
// config.LogDir - the directory in which to persist the log.
// config.ServerName - a name for the server to announce itself as in a mount table. All members
// should use the same name and hence be alternatives for callers.
// config.HostPort - the network address of the server.
// config.Heartbeat - the interval between heartbeats. 0 means use the default.
// config.SnapshotThreshold - the size the log can reach before we create a snapshot. 0 means use the default.
// client - callbacks to the client.
func newRaft(ctx *context.T, config *RaftConfig, client RaftClient) (*raft, error) {
nctx, cancel := context.WithCancel(ctx)
r := &raft{}
r.ctx = nctx
r.cancel = cancel
r.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
r.heartbeat = config.Heartbeat
if r.heartbeat == 0 {
r.heartbeat = 3 * time.Second
}
// Client interface.
r.client = client
r.applied.term = 0
r.applied.index = 0
// Raft volatile state.
r.role = RoleStopped
r.commitIndex = 0
r.leader = ""
r.memberMap = make(map[string]*member)
r.memberSet = make([]*member, 0)
r.AddMember(ctx, config.HostPort)
r.me = r.memberMap[config.HostPort]
// Raft persistent state.
var err error
r.logDir = config.LogDir
if r.p, err = openPersist(ctx, r, config.SnapshotThreshold); err != nil {
return nil, err
}
// Internal communication/synchronization.
r.newMatch = make(chan struct{}, 100)
r.newCommit = make(chan Index, 100)
r.ccv = sync.NewCond(r)
r.lcv = sync.NewCond(r)
r.sync.donecv = sync.NewCond(&r.sync)
r.sync.requestedcv = sync.NewCond(&r.sync)
// The RPC interface to other members.
eps, err := r.s.newService(nctx, r, config.ServerName, config.HostPort, config.Acl)
if err != nil {
return nil, err
}
// If we were given a name in the Vanadium namespace, just use it as our Id. If not,
// create one from the network address.
r.me.id = config.ServerName
if r.me.id == "" {
r.me.id = string(getShortName(eps[0]))
}
return r, nil
}
// getShortName will return a /host:port name if possible. Otherwise it will just return the name
// version of the endpoint.
func getShortName(ep naming.Endpoint) string {
if ep.Addr().Network() != "tcp" {
return ep.Name()
}
return naming.JoinAddressName(ep.Addr().String(), "")
}
// AddMember adds the id as a raft member. The id must be a vanadium name.
func (r *raft) AddMember(ctx *context.T, id string) error {
if r.role != RoleStopped {
// Already started.
// TODO(p): Raft has a protocol for changing membership after
// start. I'll add that after I get at least one client
// working.
return verror.New(errAddAfterStart, ctx)
}
m := &member{id: id, stopped: make(chan struct{}), update: make(chan struct{}, 10)}
r.memberMap[id] = m
r.memberSet = append(r.memberSet, m)
// Quorum has to be more than half the servers.
r.quorum = len(r.memberSet)/2 + 1
return nil
}
// Id returns the vanadium name of this server.
func (r *raft) Id() string {
return r.me.id
}
// Start gets the protocol going.
func (r *raft) Start() {
vlog.Infof("@%s starting", r.me.id)
r.Lock()
defer r.Unlock()
if r.role != RoleStopped {
// already started
return
}
r.timer = time.NewTimer(2 * r.heartbeat)
// serverEvents serializes events for this server.
r.stop = make(chan struct{})
r.stopped = make(chan struct{})
go r.serverEvents()
// syncLoop syncs with the leader when needed.
r.sync.stopped = make(chan struct{})
go r.syncLoop()
// perFollowers updates the followers when we're the leader.
for _, m := range r.memberSet {
if m.id != r.me.id {
go r.perFollower(m)
}
}
return
}
// Stop ceases all function as a raft server.
func (r *raft) Stop() {
vlog.Infof("@%s stopping", r.me.id)
r.Lock()
if r.role == RoleStopped {
r.Unlock()
return
}
r.role = RoleStopped
r.Unlock()
r.cancel()
// Stop the associated go routines.
close(r.stop)
// Wait for serverEvents to stop.
<-r.stopped
// Wait for syncLoop to stop.
r.sync.donecv.Broadcast()
r.sync.requestedcv.Broadcast()
<-r.sync.stopped
// Wait for all the perFollower routines to stop.
for _, m := range r.memberSet {
if m.id != r.me.id {
<-m.stopped
}
}
// Shut down the log file.
r.p.Close()
vlog.Infof("@%s stopping service", r.me.id)
<-r.s.server.Closed()
vlog.Infof("@%s stopped", r.me.id)
}
// setRoleAndWatchdogTimer is called with r locked.
func (r *raft) setRoleAndWatchdogTimer(role int) {
vlog.VI(2).Infof("@%s %s->%s", r.me.id, roleToString(r.role), roleToString(role))
r.role = role
switch role {
case RoleFollower:
// Wake up any RaftProto.Append()s waiting for a commitment. They
// will now have to give up since we are no longer leader.
r.ccv.Broadcast()
// Set a timer to start an election if we no longer hear from the leader.
r.resetTimerFuzzy(2 * r.heartbeat)
case RoleLeader:
r.leader = r.me.id
// Set known follower status to default values.
for _, m := range r.memberSet {
if m.id != r.me.id {
m.nextIndex = r.p.LastIndex() + 1
m.matchIndex = 0
}
}
// Set my match index to the last one logged.
r.setMatchIndex(r.me, r.p.LastIndex())
// Let waiters know a new leader exists.
r.lcv.Broadcast()
case RoleCandidate:
// If this goes off, we lost an election and need to start a new one.
// We make it longer than the follower timeout because we may have
// lost due to the safety check, so give someone else a chance.
r.resetTimerFuzzy(4 * r.heartbeat)
}
}
// setRole is called with r locked.
func (r *raft) setRole(role int) {
vlog.VI(2).Infof("@%s %s->%s", r.me.id, roleToString(r.role), roleToString(role))
r.role = role
}
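// appendNull appends a no-op entry to the log. A freshly elected leader does
// this so that an entry from its own term (and with it all earlier entries)
// can be committed; the Raft paper suggests exactly this no-op at the start
// of each term.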
func (r *raft) appendNull() {
// Assign an index and term to the log entry.
le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: nil, Type: RaftEntry}
// Append to our own log.
if err := r.p.AppendToLog(r.ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
// This shouldn't happen.
return
}
// Update the fact that we've logged it.
r.setMatchIndex(r.me, le.Index)
r.kickFollowers()
}
// Status returns the current member's id, its raft role, and who it thinks is leader.
func (r *raft) Status() (string, int, string) {
r.Lock()
defer r.Unlock()
return r.me.id, r.role, r.leader
}
// StartElection starts a new round of voting. We do this by incrementing the
// Term and, in parallel, calling each other member to vote. If we receive a
// majority we win and send a heartbeat to each member.
//
// Someone else may get elected in the middle of the vote so we have to
// make sure we're still a candidate at the end of the voting.
func (r *raft) StartElection() {
r.Lock()
defer r.Unlock()
r.startElection()
}
func (r *raft) startElection() {
// If we can't get a response in 2 seconds, something is really wrong.
ctx, cancel := context.WithTimeout(r.ctx, 2*time.Second)
defer cancel()
if err := r.p.IncCurrentTerm(); err != nil {
// If this fails, there's no way to recover.
vlog.Fatalf("incrementing current term: %s", err)
return
}
vlog.Infof("@%s startElection new term %d", r.me.id, r.p.CurrentTerm())
msg := []interface{}{
r.p.CurrentTerm(),
r.me.id,
r.p.LastTerm(),
r.p.LastIndex(),
}
var members []string
for k, m := range r.memberMap {
if m.id == r.me.id {
continue
}
members = append(members, k)
}
r.setRoleAndWatchdogTimer(RoleCandidate)
r.p.SetVotedFor(r.me.id)
r.leader = ""
r.Unlock()
// We have to do this outside the lock or the system will deadlock when two members start overlapping votes.
type reply struct {
term Term
ok bool
}
c := make(chan reply)
for _, id := range members {
go func(id string) {
var rep reply
client := v23.GetClient(ctx)
if err := client.Call(ctx, id, "RequestVote", msg, []interface{}{&rep.term, &rep.ok}, options.Preresolved{}); err != nil {
vlog.Infof("@%s sending RequestVote to %s: %s", r.me.id, id, err)
}
c <- rep
}(id)
}
// Wait till all the voters have voted or timed out.
oks := 1 // We vote for ourselves.
highest := Term(0)
for range members {
rep := <-c
if rep.ok {
oks++
}
if rep.term > highest {
highest = rep.term
}
}
r.Lock()
// We have to check the role since someone else may have become the leader during the round and
// made us a follower.
if oks < r.quorum || r.role != RoleCandidate {
if highest > r.p.CurrentTerm() {
// If someone answered with a higher term, stop being a candidate.
r.setRoleAndWatchdogTimer(RoleFollower)
r.p.SetCurrentTerm(highest)
}
vlog.VI(2).Infof("@%s lost election with %d votes", r.me.id, oks)
return
}
vlog.Infof("@%s won election with %d votes", r.me.id, oks)
r.setRoleAndWatchdogTimer(RoleLeader)
// Tell followers we are now the leader.
r.appendNull()
}
// applyCommits applies any committed entries.
func (r *raft) applyCommits(commitIndex Index) {
for r.applied.index < commitIndex {
// This is the only go routine that changes r.applied
// so we don't have to protect our reads.
next := r.applied.index + 1
le := r.p.Lookup(next)
if le == nil {
// Commit index is ahead of our highest entry.
return
}
switch le.Type {
case ClientEntry:
le.ApplyError = r.client.Apply(le.Cmd, le.Index)
case RaftEntry:
}
// But we do have to lock our writes.
r.Lock()
r.applied.index = next
r.applied.term = le.Term
r.Unlock()
}
r.p.ConsiderSnapshot(r.ctx, r.applied.term, r.applied.index)
}
func (r *raft) lastApplied() Index {
r.Lock()
defer r.Unlock()
return r.applied.index
}
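// resetTimerFuzzy resets the watchdog timer to d plus up to one heartbeat of
// random fuzz. Randomizing the timeout keeps members from timing out in lock
// step and so makes split votes less likely.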
func (r *raft) resetTimerFuzzy(d time.Duration) {
fuzz := time.Duration(r.rng.Int63n(int64(r.heartbeat)))
r.timer.Reset(d + fuzz)
}
func (r *raft) resetTimer(d time.Duration) {
r.timer.Reset(d)
}
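// highestFromChan drains c without blocking and returns the highest of i and
// any index read from the channel.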
func highestFromChan(i Index, c chan Index) Index {
for {
select {
case j := <-c:
if j > i {
i = j
}
default:
return i
}
}
}
// serverEvents is a go routine that serializes server events. This loop performs:
// (1) all changes to commitIndex both as a leader and a follower.
// (2) all application of committed log commands.
// (3) all elections.
func (r *raft) serverEvents() {
r.Lock()
r.setRoleAndWatchdogTimer(RoleFollower)
r.Unlock()
for {
select {
case <-r.stop:
// Terminate.
close(r.stopped)
return
case <-r.timer.C:
// Start an election whenever either:
// (1) a follower hasn't heard from the leader in a random interval > 2 * heartbeat.
// (2) a candidate hasn't won an election, or learned that someone else has, within its longer timeout.
r.Lock()
switch r.role {
case RoleCandidate:
r.startElection()
case RoleFollower:
r.startElection()
}
r.Unlock()
case <-r.newMatch:
// Soak up any queued requests.
emptyChan(r.newMatch)
// This happens whenever we have gotten a reply from a follower. We do it
// here rather than in perFollower solely as a matter of taste.
// Update the commitIndex if needed and apply any newly committed entries.
r.Lock()
if r.role != RoleLeader {
r.Unlock()
continue
}
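// memberSet sorts highest matchIndex first, so the entry at position
// quorum-1 has been logged by at least quorum members. For example, with
// five members whose matchIndexes sort to 9,7,7,3,2 and quorum = 3, entry 7
// is the highest one a majority has logged, so it can be committed.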
sort.Sort(r.memberSet)
ci := r.memberSet[r.quorum-1].matchIndex
if ci <= r.commitIndex {
r.Unlock()
continue
}
r.commitIndex = ci
r.Unlock()
r.applyCommits(ci)
r.ccv.Broadcast()
r.kickFollowers()
case i := <-r.newCommit:
// Get highest queued up commit.
i = highestFromChan(i, r.newCommit)
// Update the commitIndex if needed and apply any newly committed entries.
r.Lock()
if r.role != RoleFollower {
r.Unlock()
continue
}
if i > r.commitIndex {
r.commitIndex = i
}
ci := r.commitIndex
r.Unlock()
r.applyCommits(ci)
r.ccv.Broadcast()
}
}
}
// makeAppendMsg creates an append message at most 10 entries long.
func (r *raft) makeAppendMsg(m *member) ([]interface{}, int) {
// Figure out if we know the previous entry.
prevTerm, prevIndex, ok := r.p.LookupPrevious(m.nextIndex)
if !ok {
return nil, 0
}
// Collect some log entries to send along. 0 is ok.
var entries []LogEntry
for i := 0; i < 10; i++ {
le := r.p.Lookup(m.nextIndex + Index(i))
if le == nil {
break
}
entries = append(entries, LogEntry{Cmd: le.Cmd, Term: le.Term, Index: le.Index, Type: le.Type})
}
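// Positional arguments for the AppendToLog RPC: our term and id, the index
// and term of the entry preceding entries, our commit index, and the
// entries themselves.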
return []interface{}{
r.p.CurrentTerm(),
r.me.id,
prevIndex,
prevTerm,
r.commitIndex,
entries,
}, len(entries)
}
// updateFollower loops trying to update a follower until the follower is updated or we can't proceed.
// It will always send at least one update so it will also act as a heartbeat.
func (r *raft) updateFollower(m *member) {
// Bring this server up to date.
r.Lock()
defer r.Unlock()
for {
// If we're not the leader we have no followers.
if r.role != RoleLeader {
return
}
// Collect some log entries starting at m.nextIndex.
msg, n := r.makeAppendMsg(m)
if msg == nil {
// Try sending a snapshot.
r.Unlock()
vlog.Infof("@%s sending snapshot to %s", r.me.id, m.id)
snapIndex, err := r.sendLatestSnapshot(m)
r.Lock()
if err != nil {
// Try again later.
vlog.Errorf("@%s sending snapshot to %s: %s", r.me.id, m.id, err)
return
}
m.nextIndex = snapIndex + 1
vlog.Infof("@%s sent snapshot to %s", r.me.id, m.id)
// Try sending anything following the snapshot.
continue
}
// Send to the follower. We drop the lock while we do this. That means we may stop being the
// leader in the middle of the call but that's OK as long as we check when we get it back.
r.Unlock()
ctx, cancel := context.WithTimeout(r.ctx, 2*time.Second)
client := v23.GetClient(ctx)
err := client.Call(ctx, m.id, "AppendToLog", msg, []interface{}{}, options.Preresolved{})
cancel()
r.Lock()
if r.role != RoleLeader {
// Not leader any more; it doesn't matter how the follower replied.
return
}
if err != nil {
if verror.ErrorID(err) != errOutOfSequence.ID {
// A problem other than missing entries. Retry later.
vlog.Errorf("@%s updating %s: %s", r.me.id, m.id, err)
return
}
// At this point we know that the follower is missing entries previous to what
// we just sent. If we can back up, do it. Otherwise try sending a snapshot.
if m.nextIndex <= 1 {
return
}
prev := r.p.Lookup(m.nextIndex - 1)
if prev == nil {
return
}
// We can back up.
m.nextIndex = m.nextIndex - 1
continue
}
// The follower appended correctly, update indices and tell the server thread that
// the commit index may need to change.
m.nextIndex += Index(n)
logged := m.nextIndex - 1
if n > 0 {
r.setMatchIndex(m, logged)
}
// If the follower is caught up, stop.
if m.nextIndex > r.p.LastIndex() {
return
}
}
}
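// sendLatestSnapshot streams the most recent snapshot to follower m via the
// InstallSnapshot RPC and returns the index the snapshot covers.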
func (r *raft) sendLatestSnapshot(m *member) (Index, error) {
rd, term, index, err := r.p.OpenLatestSnapshot(r.ctx)
if err != nil {
return 0, err
}
ctx, cancel := context.WithTimeout(r.ctx, 5*time.Minute)
defer cancel()
client := raftProtoClient(m.id)
call, err := client.InstallSnapshot(ctx, r.p.CurrentTerm(), r.me.id, term, index, options.Preresolved{})
if err != nil {
return 0, err
}
sstream := call.SendStream()
b := make([]byte, 10240)
for {
n, err := rd.Read(b)
if n == 0 && err == io.EOF {
break
}
if n == 0 && err != nil {
return 0, err
}
// Send only the bytes actually read, not the whole buffer.
if err = sstream.Send(b[:n]); err != nil {
return 0, err
}
}
if err := call.Finish(); err != nil {
return 0, err
}
return index, nil
}
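// emptyChan drains any pending signals from c without blocking.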
func emptyChan(c chan struct{}) {
for {
select {
case <-c:
default:
return
}
}
}
// perFollower is a go routine that sequences all messages to a single follower.
//
// This is the only go routine that updates the follower's variables so all changes to
// the member struct are serialized by it.
func (r *raft) perFollower(m *member) {
m.timer = time.NewTimer(r.heartbeat)
for {
select {
case <-m.timer.C:
r.updateFollower(m)
m.timer.Reset(r.heartbeat)
case <-m.update:
// Soak up any waiting update requests
emptyChan(m.update)
r.updateFollower(m)
m.timer.Reset(r.heartbeat)
case <-r.stop:
close(m.stopped)
return
}
}
}
// kickFollowers causes each perFollower routine to try to update its follower.
func (r *raft) kickFollowers() {
for _, m := range r.memberMap {
select {
case m.update <- struct{}{}:
default:
}
}
}
// setMatchIndex updates the matchIndex for a member.
//
// called with r locked.
func (r *raft) setMatchIndex(m *member, i Index) {
m.matchIndex = i
if i <= r.commitIndex {
return
}
// Check if we need to change the commit index.
select {
case r.newMatch <- struct{}{}:
default:
}
}
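// minIndex returns the smallest of the given indices, or 0 if none are given.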
func minIndex(indices ...Index) Index {
if len(indices) == 0 {
return 0
}
min := indices[0]
for _, x := range indices[1:] {
if x < min {
min = x
}
}
return min
}
func roleToString(r int) string {
switch r {
case RoleCandidate:
return "candidate"
case RoleLeader:
return "leader"
case RoleFollower:
return "follower"
}
return "?"
}
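// waitForApply blocks until this member has applied the log entry at index.
// If term is nonzero it also checks that the entry at index still carries
// that term; if not, an election intervened and the entry was lost. The
// first return value is the error (if any) from the client's Apply, the
// second any error from raft.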
func (r *raft) waitForApply(ctx *context.T, term Term, index Index) (error, error) {
r.Lock()
defer r.Unlock()
for {
if r.applied.index >= index {
if term == 0 {
// Special case: we don't care about Apply() error or committed term, only that we've reached index.
return nil, nil
}
le := r.p.Lookup(index)
if le == nil || le.Term != term {
// There was an election and the log entry was lost.
return nil, verror.New(errNotLeader, ctx)
}
return le.ApplyError, nil
}
// Give up if the caller doesn't want to wait.
select {
case <-ctx.Done():
return nil, verror.New(errTimedOut, ctx)
default:
}
// Wait for an apply to happen. r will be unlocked during the wait.
r.ccv.Wait()
}
}
// waitForLeadership waits until there is an elected leader.
func (r *raft) waitForLeadership(ctx *context.T) (string, int, Index, bool) {
r.Lock()
defer r.Unlock()
for len(r.leader) == 0 {
// Give up if the caller doesn't want to wait.
select {
case <-ctx.Done():
return "", 0, 0, true
default:
}
r.lcv.Wait()
}
return r.leader, r.role, r.commitIndex, false
}
// Append tells the leader to append to the log. The first error is the result of the client.Apply. The second
// is any error from raft.
func (r *raft) Append(ctx *context.T, cmd []byte) (error, error) {
for {
leader, role, _, timedOut := r.waitForLeadership(ctx)
if timedOut {
return nil, verror.New(errTimedOut, ctx)
}
switch role {
case RoleLeader:
term, index, err := r.s.Append(ctx, nil, cmd)
if err == nil {
// We were the leader and the entry has now been applied.
return r.waitForApply(ctx, term, index)
}
// If the leader can't do it, give up.
if verror.ErrorID(err) != errNotLeader.ID {
return nil, err
}
case RoleFollower:
client := v23.GetClient(ctx)
var index Index
var term Term
if len(leader) == 0 {
break
}
err := client.Call(ctx, leader, "Append", []interface{}{cmd}, []interface{}{&term, &index}, options.Preresolved{})
if err == nil {
return r.waitForApply(ctx, term, index)
}
// If the leader can't do it, give up.
if verror.ErrorID(err) != errNotLeader.ID {
return nil, err
}
}
// Give up if the caller doesn't want to wait.
select {
case <-ctx.Done():
err := verror.New(errTimedOut, ctx)
return nil, err
default:
}
}
}
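// Leader returns true iff this member believes itself to be the leader, plus
// the id of the member it currently believes is leader (empty if unknown).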
func (r *raft) Leader() (bool, string) {
r.Lock()
defer r.Unlock()
if r.role == RoleLeader {
return true, r.leader
}
return false, r.leader
}
// syncWithLeader synchronizes with the leader. On return we have applied the commit index
// that existed before the call.
func (r *raft) syncWithLeader(ctx *context.T) error {
for {
leader, role, commitIndex, timedOut := r.waitForLeadership(ctx)
if timedOut {
return verror.New(errTimedOut, ctx)
}
switch role {
case RoleLeader:
r.waitForApply(ctx, 0, commitIndex)
return nil
case RoleFollower:
client := v23.GetClient(ctx)
var index Index
err := client.Call(ctx, leader, "Committed", []interface{}{}, []interface{}{&index}, options.Preresolved{})
if err == nil {
r.waitForApply(ctx, 0, index)
return nil
}
// If the leader can't do it, give up.
if verror.ErrorID(err) != errNotLeader.ID {
return err
}
}
// Give up if the caller doesn't want to wait.
select {
case <-ctx.Done():
return verror.New(errTimedOut, ctx)
default:
}
}
}
// syncLoop is a go routine that syncs whenever necessary with the leader.
func (r *raft) syncLoop() {
for {
// Wait for someone to request syncing.
r.sync.Lock()
for r.sync.requested <= r.sync.done {
select {
case <-r.stop:
close(r.sync.stopped)
r.sync.Unlock()
return
default:
}
r.sync.requestedcv.Wait()
}
requested := r.sync.requested
r.sync.Unlock()
// Perform the sync outside the lock.
if err := r.syncWithLeader(r.ctx); err != nil {
continue
}
// Wake up waiters.
r.sync.Lock()
r.sync.done = requested
r.sync.Unlock()
r.sync.donecv.Broadcast()
}
}
// Sync waits for this member to have applied the current commit index.
func (r *raft) Sync(ctx *context.T) error {
r.sync.Lock()
defer r.sync.Unlock()
r.sync.requested++
requested := r.sync.requested
r.sync.requestedcv.Broadcast()
// Wait for our sync to complete.
for requested > r.sync.done {
select {
case <-r.stop:
return verror.New(errTimedOut, ctx)
case <-ctx.Done():
return verror.New(errTimedOut, ctx)
default:
}
r.sync.donecv.Wait()
}
return nil
}