blob: c387e77dec32c51742cabda17b5b2a5ff2439b1d [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package raft
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"
"v.io/x/lib/vlog"
"v.io/x/ref/test"
)
// tempDir hands back a unique path under the default temporary directory for
// a test to use as its raft log directory.
//
// ioutil.TempDir is used only to reserve a name that is unlikely to clash with
// anyone else's: the directory it creates is removed again (best effort, the
// result of os.Remove is not checked) before the path is returned. The test
// is aborted if the name cannot be reserved.
func tempDir(t *testing.T) string {
	name, err := ioutil.TempDir("", "raftstate")
	if err != nil {
		t.Fatalf("TempDir: %s", err)
	}
	// Only the name is wanted; drop the directory itself.
	os.Remove(name)
	vlog.Infof("log is %s", name)
	return name
}
// compareLogs checks the entries stored in p against expected.
//
// Every index in [0, 1000) is looked up in p. Each entry that is present must
// have an element of expected with the same Index, and that element must also
// agree on Term and Cmd. After the scan, the number of matched entries must
// equal len(expected). Any discrepancy fails the test immediately.
//
// tag is prefixed to every failure message so that a failure can be traced to
// the call site that produced it.
func compareLogs(t *testing.T, p persistent, expected []LogEntry, tag string) {
	found := 0
outer:
	for i := Index(0); i < 1000; i++ {
		le := p.Lookup(i)
		if le == nil {
			// Nothing stored at this index.
			continue
		}
		for _, v := range expected {
			if v.Index != le.Index {
				continue
			}
			if v.Term != le.Term || string(v.Cmd) != string(le.Cmd) {
				t.Fatalf("%s: expected %v, got %v", tag, v, *le)
			}
			found++
			continue outer
		}
		// The log holds an entry that no element of expected accounts for.
		t.Fatalf("%s: unexpected: %v", tag, *le)
	}
	if found != len(expected) {
		t.Fatalf("%s: not all entries found, got %d out of %v", tag, found, expected)
	}
}
// TestCreation verifies that starting a fresh raft instance creates a
// non-empty log file and that the file is still there, and still non-empty,
// after the instance has been stopped.
func TestCreation(t *testing.T) {
	vlog.Infof("TestCreation")
	ctx, shutdown := test.V23Init()
	defer shutdown()
	td := tempDir(t)
	defer os.RemoveAll(td)

	// Path of the log file the raft is expected to write inside td.
	logFile := path.Join(td, "00000000000000000000.00000000000000000000.log")

	// checkLogFile fails the test unless logFile exists and is non-empty.
	checkLogFile := func() {
		fi, err := os.Stat(logFile)
		if err != nil {
			t.Fatalf("File didn't last: %s", err)
		}
		if fi.Size() == 0 {
			t.Fatalf("File too short: %d", fi.Size())
		}
	}

	// Launch a raft. This should create the log file and append at least one
	// control entry to it.
	cfg := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
	r, err := newRaft(ctx, &cfg, new(client))
	if err != nil {
		t.Fatalf("newRaft: %s", err)
	}
	vlog.Infof("id is %s", r.Id())

	// The file must be there while the raft is running...
	checkLogFile()

	// ...and must still be there after shutdown.
	r.Stop()
	checkLogFile()

	vlog.Infof("TestCreation passed")
}
// TestPersistence verifies that we store enough state to return to the previous log state
// after an orderly stop and restart.
//
// We also test that a truncated log remains truncated after an orderly stop and restart.
func TestPersistence(t *testing.T) {
	vlog.Infof("TestPersistence")
	ctx, shutdown := test.V23Init()
	defer shutdown()
	td := tempDir(t)
	defer os.RemoveAll(td)

	// Launch a raft. This should create the file and populate the header with the
	// nil values. The file should persist after the raft is closed.
	config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td}
	r, err := newRaft(ctx, &config, new(client))
	if err != nil {
		t.Fatalf("newRaft: %s", err)
	}
	vlog.Infof("id is %s", r.Id())

	// Set the persistent state.
	if err := r.p.SetCurrentTerm(2); err != nil {
		t.Fatalf("SetCurrentTerm: %s", err)
	}
	if err := r.p.SetVotedFor("whocares"); err != nil {
		t.Fatalf("SetVotedFor: %s", err)
	}

	// Append two batches. NOTE(review): the two numeric arguments are
	// presumably the term and index of the entry preceding the batch --
	// confirm against AppendToLog.
	cmds1 := []LogEntry{
		{Term: 2, Index: 1, Cmd: []byte("cmd1")},
		{Term: 2, Index: 2, Cmd: []byte("cmd2")},
		{Term: 2, Index: 3, Cmd: []byte("cmd3")},
	}
	if err := r.p.AppendToLog(ctx, 0, 0, cmds1); err != nil {
		t.Fatalf("AppendToLog: %s", err)
	}
	cmds2 := []LogEntry{
		{Term: 3, Index: 4, Cmd: []byte("cmd4")},
		{Term: 3, Index: 5, Cmd: []byte("cmd5")},
		{Term: 3, Index: 6, Cmd: []byte("cmd6")},
	}
	if err := r.p.AppendToLog(ctx, 2, 3, cmds2); err != nil {
		t.Fatalf("AppendToLog: %s", err)
	}
	info, err := os.Stat(path.Join(td, "00000000000000000000.00000000000000000000.log"))
	if err != nil {
		t.Fatalf("File didn't last: %s", err)
	}
	vlog.Infof("log size %d", info.Size())
	vlog.Infof("stopping %s", r.Id())
	r.Stop()

	// Reopen the persistent store and make sure it matches.
	r, err = newRaft(ctx, &config, new(client))
	if err != nil {
		t.Fatalf("newRaft: %s", err)
	}
	vlog.Infof("id is %s", r.Id())
	if r.p.CurrentTerm() != 2 {
		t.Fatalf("CurrentTerm: expected %d got %d", 2, r.p.CurrentTerm())
	}
	if r.p.VotedFor() != "whocares" {
		t.Fatalf("VotedFor: expected %s got %s", "whocares", r.p.VotedFor())
	}
	// Spelled out independently of cmds1/cmds2 so the expectation does not
	// share storage with the slices that were handed to AppendToLog. Named
	// wantAll rather than test to avoid shadowing the imported test package.
	wantAll := []LogEntry{
		{Term: 2, Index: 1, Cmd: []byte("cmd1")},
		{Term: 2, Index: 2, Cmd: []byte("cmd2")},
		{Term: 2, Index: 3, Cmd: []byte("cmd3")},
		{Term: 3, Index: 4, Cmd: []byte("cmd4")},
		{Term: 3, Index: 5, Cmd: []byte("cmd5")},
		{Term: 3, Index: 6, Cmd: []byte("cmd6")},
	}
	compareLogs(t, r.p, wantAll, "after reopen")

	// Truncate the log by rewriting an index.
	if err := r.p.AppendToLog(ctx, 2, 3, []LogEntry{{Term: 4, Index: 4, Cmd: []byte("cmd7")}}); err != nil {
		t.Fatalf("AppendToLog: %s", err)
	}
	wantTruncated := []LogEntry{
		{Term: 2, Index: 1, Cmd: []byte("cmd1")},
		{Term: 2, Index: 2, Cmd: []byte("cmd2")},
		{Term: 2, Index: 3, Cmd: []byte("cmd3")},
		{Term: 4, Index: 4, Cmd: []byte("cmd7")},
	}
	compareLogs(t, r.p, wantTruncated, "after truncate")
	vlog.Infof("stopping %s", r.Id())
	r.Stop()

	// Make sure the log is still truncated when we reread it.
	r, err = newRaft(ctx, &config, new(client))
	if err != nil {
		t.Fatalf("newRaft: %s", err)
	}
	vlog.Infof("id is %s", r.Id())
	compareLogs(t, r.p, wantTruncated, "after truncate and close")
	vlog.Infof("stopping %s", r.Id())
	r.Stop()
	vlog.Infof("TestPersistence passed")
}
// getFileNames returns the names of all entries in directory dir, in the
// order the directory yields them. Callers filter the result for the
// snapshot and log files they care about.
//
// The test is aborted if dir cannot be opened or read.
func getFileNames(t *testing.T, dir string) []string {
	handle, err := os.Open(dir)
	if err != nil {
		t.Fatalf("opening %s: %s", dir, err)
	}
	defer handle.Close()

	// A count of 0 asks for every name in a single call.
	names, err := handle.Readdirnames(0)
	if err != nil {
		t.Fatalf("reading %s: %s", dir, err)
	}
	return names
}
// waitForLeadership polls r.Status() until r reports RoleLeader, sleeping
// 10ms between polls. It returns true as soon as r is leader and false once
// more than timeout has elapsed without that happening. The role is checked
// before the elapsed time, so r is always polled at least once.
func waitForLeadership(r *raft, timeout time.Duration) bool {
	began := time.Now()
	for {
		if _, role, _ := r.Status(); role == RoleLeader {
			return true
		}
		if time.Since(began) > timeout {
			return false
		}
		time.Sleep(10 * time.Millisecond)
	}
}
// TestSnapshot sets the limit for taking a snapshot down to 1 record and makes sure that snapshots are taken.
//
// It appends three commands to a single-member raft, waits for a snapshot
// file and its companion log file to appear in the log directory, restarts
// the raft from that directory, and compares the client state of the two
// incarnations.
func TestSnapshot(t *testing.T) {
	vlog.Infof("TestSnapshot")
	ctx, shutdown := test.V23Init()
	defer shutdown()
	td := tempDir(t)
	defer os.RemoveAll(td)
	config := RaftConfig{HostPort: "127.0.0.1:0", LogDir: td, SnapshotThreshold: 1}
	rs, _ := buildRafts(t, ctx, 1, &config)
	defer cleanUp(rs)
	r := rs[0]
	c1 := r.client
	vlog.Infof("id is %s", r.Id())

	// This should cause a snapshot.
	for _, cmd := range []string{"string1", "string2", "string3"} {
		// %v rather than %s: at most one of the two errors need be non-nil,
		// and %s renders a nil error as "%!s(<nil>)".
		if apperr, err := r.Append(ctx, []byte(cmd)); err != nil || apperr != nil {
			t.Fatalf("Append(%q): %v/%v", cmd, apperr, err)
		}
	}

	// Wait for the snapshot to appear. It is a background task so just loop.
	// We are done once some X.snap exists together with its X.log.
	deadline := time.Now().Add(time.Minute)
outer:
	for {
		time.Sleep(100 * time.Millisecond)
		files := getFileNames(t, td)
		for _, s := range files {
			if !strings.HasSuffix(s, ".snap") {
				continue
			}
			logName := strings.TrimSuffix(s, ".snap") + ".log"
			for _, l := range files {
				if l == logName {
					break outer
				}
			}
		}
		if time.Now().After(deadline) {
			t.Fatalf("timeout waiting for snap")
		}
	}
	r.Stop()

	// Restart. We should read and restart from the snapshot.
	rs, _ = buildRafts(t, ctx, 1, &config)
	defer cleanUp(rs)
	r = rs[0]
	c2 := r.client
	vlog.Infof("new id is %s", r.Id())

	// Wait for leadership.
	if !waitForLeadership(r, time.Minute) {
		t.Fatalf("didn't become leader")
	}

	// Wait to commit.
	time.Sleep(time.Second)
	c1.(*client).Compare(t, c2.(*client))
	vlog.Infof("TestSnapshot passed")
}
// TestRemoteSnapshot makes sure a member can be recovered from a snapshot taken elsewhere.
//
// A three-member cluster is fed 100 commands with a snapshot threshold of 30.
// Once all members agree, member 0 is stopped, its log directory is deleted,
// and it is restarted; the cluster must reach log and applied agreement again.
func TestRemoteSnapshot(t *testing.T) {
	vlog.Infof("TestRemoteSnapshot")
	ctx, shutdown := test.V23Init()
	defer shutdown()
	config := RaftConfig{HostPort: "127.0.0.1:0", SnapshotThreshold: 30}
	rs, cs := buildRafts(t, ctx, 3, &config)
	defer cleanUp(rs)

	// This should cause a few snapshots.
	for i := 0; i < 100; i++ {
		// %v rather than %s: at most one of the two errors need be non-nil,
		// and %s renders a nil error as "%!s(<nil>)".
		if apperr, err := rs[1].Append(ctx, []byte(fmt.Sprintf("string%d", i))); err != nil || apperr != nil {
			t.Fatalf("Append: %v/%v", apperr, err)
		}
	}
	vlog.Infof("Appends done")

	// Wait for them all to get to the same point.
	if !waitForLogAgreement(rs, time.Minute) {
		t.Fatalf("no log agreement")
	}
	t.Log("Logs agree")
	if !waitForAppliedAgreement(rs, cs, time.Minute) {
		t.Fatalf("no applied agreement")
	}
	vlog.Infof("Applies agree")

	// Stop a server and remove all its log and snapshot info, i.e., act like it's starting
	// from scratch.
	rs[0].Stop()
	// If the wipe fails the member would restart from its old state and the
	// test would pass without exercising remote recovery, so treat it as fatal.
	if err := os.RemoveAll(rs[0].logDir); err != nil {
		t.Fatalf("RemoveAll(%v): %v", rs[0].logDir, err)
	}
	restart(t, ctx, rs, cs, rs[0])

	// Wait for them all to get to the same point.
	if !waitForLogAgreement(rs, time.Minute) {
		t.Fatalf("no log agreement")
	}
	if !waitForAppliedAgreement(rs, cs, time.Minute) {
		t.Fatalf("no applied agreement")
	}
	vlog.Infof("TestRemoteSnapshot passed")
}