blob: 00ed1f87040625db12593fdb204b38da160db89f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package simplemr_test
import (
"fmt"
"runtime"
"strings"
"testing"
"time"
"v.io/x/lib/simplemr"
)
func newChans(chanSize int) (chan *simplemr.Record, chan *simplemr.Record) {
return make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize)
}
type termCount struct{}
func (tc *termCount) Map(mr *simplemr.MR, key string, val interface{}) error {
text, ok := val.(string)
if !ok {
return fmt.Errorf("%T is the wrong type", val)
}
for _, token := range strings.Split(text, " ") {
mr.MapOut(token, 1)
}
return nil
}
func (tc *termCount) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
count := 0
for _, val := range values {
c, ok := val.(int)
if !ok {
return fmt.Errorf("%T is the wrong type", val)
}
count += c
}
mr.ReduceOut(key, count)
return nil
}
var (
d1 = "a b c"
d2 = "a b c d"
d3 = "e f"
)
func expect(t *testing.T, out chan *simplemr.Record, key string, vals ...int) {
rec := <-out
if got, want := rec.Key, key; got != want {
t.Errorf("got %v, want %v", got, want)
}
if got, want := len(rec.Values), len(vals); got != want {
t.Errorf("got %v, want %v", got, want)
return
}
for i, v := range vals {
if got, want := rec.Values[i], v; got != want {
t.Errorf("%d: got %v, want %v", i, got, want)
}
}
}
func TestMR(t *testing.T) {
mrt := &simplemr.MR{}
in, out := newChans(10)
tc := &termCount{}
genInput := func() {
in <- &simplemr.Record{"d1", []interface{}{d1, d2, d3}}
in <- &simplemr.Record{"d2", []interface{}{d1, d2, d3}}
close(in)
}
go genInput()
if err := mrt.Run(in, out, tc, tc); err != nil {
t.Fatal(err)
}
expect(t, out, "a", 4)
expect(t, out, "b", 4)
expect(t, out, "c", 4)
expect(t, out, "d", 2)
expect(t, out, "e", 2)
expect(t, out, "f", 2)
kvs := <-out
if kvs != nil {
t.Fatal("expected the channel to be closed")
}
}
type slowReducer struct{}
func (sr *slowReducer) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
time.Sleep(time.Hour)
return nil
}
func TestTimeout(t *testing.T) {
in, out := newChans(1)
mrt := &simplemr.MR{Timeout: 10 * time.Millisecond}
identity := &simplemr.Identity{}
mrt.Run(in, out, identity, identity)
if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out mappers") {
t.Fatalf("missing or wrong error: %v", err)
}
mrt = &simplemr.MR{Timeout: 10 * time.Millisecond}
in, out = newChans(1)
in <- &simplemr.Record{"key", []interface{}{"value"}}
close(in)
mrt.Run(in, out, identity, &slowReducer{})
if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out reducers") {
t.Fatalf("missing or wrong error: %v", err)
}
}
type sleeper struct{}
const sleepTime = time.Millisecond * 100
func (sl *sleeper) Map(mr *simplemr.MR, key string, val interface{}) error {
time.Sleep(sleepTime)
mr.MapOut(key, val)
return nil
}
func (sl *sleeper) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
mr.ReduceOut(key, values...)
return nil
}
func runMappers(t *testing.T, bufsize, numMappers int) time.Duration {
mrt := &simplemr.MR{NumMappers: numMappers}
in, out := newChans(bufsize)
sl := &sleeper{}
go func() {
for i := 0; i < bufsize; i++ {
in <- &simplemr.Record{Key: fmt.Sprintf("%d", i), Values: []interface{}{i}}
}
close(in)
}()
then := time.Now()
if err := mrt.Run(in, out, sl, sl); err != nil {
t.Fatal(err)
}
return time.Since(then)
}
func TestOneMappers(t *testing.T) {
bufsize := 5
runtime := runMappers(t, bufsize, 1)
if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got < want {
t.Errorf("took %s which is too fast, should be at least %s", got, want)
}
}
func TestMultipleMappers(t *testing.T) {
numCPUs := runtime.NumCPU()
if numCPUs == 1 {
t.Skip("can't test concurrency with only one CPU")
}
bufsize := 5
runtime := runMappers(t, bufsize, numCPUs)
if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got > want {
t.Errorf("took %s which is too slow, should take no longer than %s", got, want)
}
}
type adder struct{}
func (a *adder) Map(mr *simplemr.MR, key string, val interface{}) error {
i := val.(int)
i++
mr.MapOut(key, i)
return nil
}
func (a *adder) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
mr.ReduceOut(key, values...)
return nil
}
func TestChainedMR(t *testing.T) {
chanSize := 5
in, middle, out := make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize)
mrt1 := &simplemr.MR{}
mrt2 := &simplemr.MR{}
adder := &adder{}
go mrt1.Run(in, middle, adder, adder)
go mrt2.Run(middle, out, adder, adder)
in <- &simplemr.Record{"1", []interface{}{1}}
in <- &simplemr.Record{"2", []interface{}{2}}
close(in)
expect(t, out, "1", 3)
expect(t, out, "2", 4)
if err := mrt1.Error(); err != nil {
t.Fatal(err)
}
if err := mrt2.Error(); err != nil {
t.Fatal(err)
}
}