// blob: f14948e5de4ebcc61e40da61d4f280b218afd5b4 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package raft
import (
"reflect"
"v.io/x/lib/vlog"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/verror"
)
// service is the implementation of the raftProto. It is only used between members
// to communicate with each other.
//
// A service is initialized by newService, which fills in every field below.
type service struct {
// r is the raft instance whose state the RPC handlers read and modify.
r *raft
// server is the RPC server created by newService.
server rpc.Server
// serverName is the name the server was created with.
serverName string
// acl lists the blessing names that Authorize accepts.
acl access.AccessList
}
// newService initializes s as the raft protocol service for r and starts a
// dispatching RPC server for it under serverName, listening on TCP at hostPort.
// s itself is handed to the server as its dispatcher. On success the endpoints
// reported by the new server's status are returned; otherwise the error from
// creating the server is returned.
func (s *service) newService(ctx *context.T, r *raft, serverName, hostPort string, acl access.AccessList) ([]naming.Endpoint, error) {
	s.r, s.serverName, s.acl = r, serverName, acl

	spec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", hostPort}}}
	listenCtx := v23.WithListenSpec(ctx, spec)

	_, srv, err := v23.WithNewDispatchingServer(listenCtx, serverName, s)
	// Record whatever server value came back, even on failure.
	s.server = srv
	if err != nil {
		return nil, err
	}
	return srv.Status().Endpoints, nil
}
// Lookup implements rpc.Dispatcher.Lookup. Every name resolves to the same
// raftProto server stub wrapping s, and s also acts as the authorizer.
func (s *service) Lookup(ctx *context.T, name string) (interface{}, security.Authorizer, error) {
	stub := raftProtoServer(s)
	return stub, s, nil
}
// Authorize implements security.Authorizer. A call is allowed when the remote
// blessing names are matched by the ACL given to newService, or when the remote
// end presents the same public key as the local end. Anything else is rejected
// with verror.ErrNoAccess.
func (s *service) Authorize(ctx *context.T, call security.Call) error {
	// The second result of RemoteBlessingNames is deliberately ignored; only
	// the returned names are checked against the ACL.
	names, _ := security.RemoteBlessingNames(ctx, call)
	if len(names) != 0 && s.acl.Includes(names...) {
		return nil
	}

	// Fall back to comparing public keys: a peer with our own key is trusted.
	localKey := call.LocalBlessings().PublicKey()
	remoteKey := call.RemoteBlessings().PublicKey()
	if localKey != nil && reflect.DeepEqual(localKey, remoteKey) {
		return nil
	}

	return verror.New(verror.ErrNoAccess, ctx)
}
// Members implements raftProto.Members. It returns the keys of the raft
// member map, collected while holding the raft lock. The order of the result
// follows map iteration and is therefore unspecified.
func (s *service) Members(ctx *context.T, call rpc.ServerCall) ([]string, error) {
	s.r.Lock()
	defer s.r.Unlock()

	var ids []string
	for id := range s.r.memberMap {
		ids = append(ids, id)
	}
	return ids, nil
}
// Leader implements raftProto.Leader. It returns the leader currently recorded
// by this member, read while holding the raft lock.
func (s *service) Leader(ctx *context.T, call rpc.ServerCall) (string, error) {
	s.r.Lock()
	defer s.r.Unlock()

	current := s.r.leader
	return current, nil
}
// RequestVote implements raftProto.RequestVote.
//
// The whole decision is made under the raft lock so that voting is atomic. The
// returned Term is always our current term at the time of the reply; the bool
// reports whether the vote was granted to candidate. The error is always nil.
func (s *service) RequestVote(ctx *context.T, call rpc.ServerCall, term Term, candidate string, lastLogTerm Term, lastLogIndex Index) (Term, bool, error) {
	rf := s.r
	rf.Lock()
	defer rf.Unlock()

	// deny builds the "vote not granted" reply carrying our current term.
	deny := func() (Term, bool, error) {
		return rf.p.CurrentTerm(), false, nil
	}

	switch {
	case term < rf.p.CurrentTerm():
		// The request belongs to an old election.
		return deny()
	case term > rf.p.CurrentTerm():
		// A newer term starts a new election: step down and forget any vote.
		rf.setRoleAndWatchdogTimer(RoleFollower)
		rf.p.SetCurrentTermAndVotedFor(term, "")
	}

	// Refuse if we already voted for someone else (for example ourselves).
	if voted := rf.p.VotedFor(); voted != "" && voted != candidate {
		return deny()
	}

	// RAFT's safety restriction: never vote for a candidate whose log is
	// less up to date than ours.
	switch {
	case rf.p.LastTerm() > lastLogTerm:
		return deny()
	case rf.p.LastTerm() == lastLogTerm && rf.p.LastIndex() > lastLogIndex:
		return deny()
	}

	// Grant the vote and make sure we are a follower.
	rf.setRole(RoleFollower)
	rf.p.SetCurrentTermAndVotedFor(term, candidate)
	return rf.p.CurrentTerm(), true, nil
}
// AppendToLog implements raftProto.AppendToLog.
//
// If term is older than our current term the request is rejected with
// errBadTerm. Otherwise the sender is accepted as leader, our term is advanced
// if it is behind, we become a follower (restarting the watchdog timer), and
// entries are appended to the log after (prevTerm, prevIndex). Any error from
// the append is logged and returned. On success leaderCommit is sent on
// r.newCommit and nil is returned.
func (s *service) AppendToLog(ctx *context.T, call rpc.ServerCall, term Term, leader string, prevIndex Index, prevTerm Term, leaderCommit Index, entries []LogEntry) error {
	r := s.r

	// The append needs to be atomic. Everything that touches raft state runs
	// inside this closure so that the deferred Unlock covers every return
	// path, and so that no state is read after the lock has been released.
	err := func() error {
		r.Lock()
		defer r.Unlock()

		// The leader has to be at least as up to date as we are. The current
		// term is read once, under the lock, and reused for the error.
		if cur := r.p.CurrentTerm(); term < cur {
			return verror.New(errBadTerm, ctx, term, cur)
		}

		// At this point we accept the sender as leader and become a follower.
		if r.leader != leader {
			vlog.VI(2).Infof("@%s new leader %s during AppendToLog", r.me.id, leader)
		}
		r.leader = leader
		r.lcv.Broadcast()

		// Update our term if we are behind.
		if term > r.p.CurrentTerm() {
			r.p.SetCurrentTermAndVotedFor(term, "")
		}

		// Restart our timer since we just heard from the leader.
		r.setRoleAndWatchdogTimer(RoleFollower)

		// Append if we can.
		if err := r.p.AppendToLog(ctx, prevTerm, prevIndex, entries); err != nil {
			vlog.Errorf("@%s AppendToLog returns %s", r.me.id, err)
			return err
		}
		return nil
	}()
	if err != nil {
		return err
	}

	// The commit index is handed over only after the lock is released, so a
	// send that blocks does not do so while holding the raft lock.
	r.newCommit <- leaderCommit
	return nil
}
// Append implements raftProto.Append.
//
// It fails with errNotLeader unless this member is currently the leader.
// Otherwise cmd is wrapped in a new log entry with the current term and the
// next index, appended to the local log, recorded as matched for this member,
// and the followers are kicked to replicate it. The term and index assigned to
// the entry are returned.
//
// The entry is not yet committed or applied when Append returns; the caller
// must verify that itself.
func (s *service) Append(ctx *context.T, call rpc.ServerCall, cmd []byte) (Term, Index, error) {
	r := s.r
	// Hold the lock for the whole operation; defer guarantees it is released
	// on every return path, including a panic in one of the calls below.
	r.Lock()
	defer r.Unlock()

	if r.role != RoleLeader {
		return 0, 0, verror.New(errNotLeader, ctx)
	}

	// Assign an index and term to the log entry.
	le := LogEntry{Term: r.p.CurrentTerm(), Index: r.p.LastIndex() + 1, Cmd: cmd}

	// Append to our own log.
	if err := r.p.AppendToLog(ctx, r.p.LastTerm(), r.p.LastIndex(), []LogEntry{le}); err != nil {
		// This shouldn't happen.
		return 0, 0, err
	}

	// Update the fact that we've logged it.
	r.setMatchIndex(r.me, le.Index)

	// Tell each follower to update.
	r.kickFollowers()

	return le.Term, le.Index, nil
}
// InstallSnapshot implements raftProto.InstallSnapshot.
//
// The raft lock is held for the entire call so that the snapshot is installed
// atomically. A request whose term is older than ours is rejected with
// errBadTerm. Otherwise the sender is accepted as leader, our term is advanced
// if it is behind, we become a follower (restarting the watchdog timer), and
// the result of SnapshotFromLeader for (appliedTerm, appliedIndex) and the
// call stream is returned.
func (s *service) InstallSnapshot(ctx *context.T, call raftProtoInstallSnapshotServerCall, term Term, leader string, appliedTerm Term, appliedIndex Index) error {
	rf := s.r
	rf.Lock()
	defer rf.Unlock()

	// Reject a sender that is behind us.
	if rf.p.CurrentTerm() > term {
		return verror.New(errBadTerm, ctx, term, rf.p.CurrentTerm())
	}

	// From here on the sender is our leader.
	if leader != rf.leader {
		vlog.VI(2).Infof("@%s new leader %s during InstallSnapshot", rf.me.id, leader)
	}
	rf.leader = leader
	rf.lcv.Broadcast()

	// Catch up to the leader's term if ours is older.
	if rf.p.CurrentTerm() < term {
		rf.p.SetCurrentTermAndVotedFor(term, "")
	}

	// We just heard from the leader, so restart the timer as a follower.
	rf.setRoleAndWatchdogTimer(RoleFollower)

	// Store the snapshot and restore the client from it.
	err := rf.p.SnapshotFromLeader(ctx, appliedTerm, appliedIndex, call)
	return err
}
// Committed returns the commit index of this member, read under the raft lock.
// It fails with errNotLeader if this member is not currently the leader.
func (s *service) Committed(ctx *context.T, call rpc.ServerCall) (Index, error) {
	rf := s.r
	rf.Lock()
	defer rf.Unlock()

	if rf.role == RoleLeader {
		return rf.commitIndex, nil
	}
	return 0, verror.New(errNotLeader, ctx)
}