Merge "gosh: convert Shell.Rename to Shell.Move"
diff --git a/simplemr/.api b/simplemr/.api
new file mode 100644
index 0000000..16b6f34
--- /dev/null
+++ b/simplemr/.api
@@ -0,0 +1,17 @@
+pkg simplemr, method (*Identity) Map(*MR, string, interface{}) error
+pkg simplemr, method (*Identity) Reduce(*MR, string, []interface{}) error
+pkg simplemr, method (*MR) Error() error
+pkg simplemr, method (*MR) MapOut(string, ...interface{})
+pkg simplemr, method (*MR) ReduceOut(string, ...interface{})
+pkg simplemr, method (*MR) Run(<-chan *Record, chan<- *Record, Mapper, Reducer) error
+pkg simplemr, type Identity struct
+pkg simplemr, type MR struct
+pkg simplemr, type MR struct, NumMappers int
+pkg simplemr, type MR struct, Timeout time.Duration
+pkg simplemr, type Mapper interface { Map }
+pkg simplemr, type Mapper interface, Map(*MR, string, interface{}) error
+pkg simplemr, type Record struct
+pkg simplemr, type Record struct, Key string
+pkg simplemr, type Record struct, Values []interface{}
+pkg simplemr, type Reducer interface { Reduce }
+pkg simplemr, type Reducer interface, Reduce(*MR, string, []interface{}) error
diff --git a/simplemr/example_test.go b/simplemr/example_test.go
new file mode 100644
index 0000000..c3df40a
--- /dev/null
+++ b/simplemr/example_test.go
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr_test
+
+import (
+	"fmt"
+
+	"v.io/x/lib/simplemr"
+)
+
+// ExampleMR passes two records through the Identity mapper and reducer and
+// prints every record that the map reduction emits.
+func ExampleMR() {
+	in, out := make(chan *simplemr.Record, 2), make(chan *simplemr.Record, 2)
+	mr := &simplemr.MR{}
+	identity := &simplemr.Identity{}
+	go mr.Run(in, out, identity, identity)
+	in <- &simplemr.Record{Key: "1", Values: []interface{}{"hello\n"}}
+	in <- &simplemr.Record{Key: "2", Values: []interface{}{"world\n"}}
+	close(in)
+	// Run closes out when it is finished, so ranging over the channel both
+	// consumes every record and ensures that Run has stored its error
+	// before mr.Error is called below.
+	for rec := range out {
+		fmt.Printf("%s: %s", rec.Key, rec.Values[0].(string))
+	}
+	if err := mr.Error(); err != nil {
+		fmt.Printf("mr failed: %v", err)
+	}
+	// Output:
+	// 1: hello
+	// 2: world
+}
diff --git a/simplemr/mr.go b/simplemr/mr.go
new file mode 100644
index 0000000..09d0347
--- /dev/null
+++ b/simplemr/mr.go
@@ -0,0 +1,192 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package simplemr provides a simple map reduce framework for use by
+// commandline and other tools and consequently can only be used from
+// within a single process. It is specifically not intended to support
+// large datasets, but mappers are run concurrently so that long running
+// tasks (e.g. external shell commands) will be run in parallel. The
+// current implementation supports only a single reducer; however, future
+// implementations are likely to run multiple reducers and hence reducers
+// should be coded accordingly.
+package simplemr
+
+import (
+	"fmt"
+	"runtime"
+	"sort"
+	"sync"
+	"time"
+)
+
+// Mapper is the interface that must be implemented by all mappers.
+type Mapper interface {
+	// Map is called by the framework for every key, value pair read
+	// from the specified input. Values for the reducer are emitted by
+	// calling mr.MapOut. Returning a non-nil error stops the calling
+	// mapper and that error is returned by Run.
+	Map(mr *MR, key string, value interface{}) error
+}
+
+// Reducer is the interface that must be implemented by the reducer.
+type Reducer interface {
+	// Reduce is called by the framework for every key and associated
+	// values that are emitted by the Mappers. The current implementation
+	// presents the keys in sorted order. Results are emitted by calling
+	// mr.ReduceOut.
+	Reduce(mr *MR, key string, values []interface{}) error
+}
+
+// Record represents all input and output data: a key and the values
+// associated with it.
+type Record struct {
+	Key    string        // the key shared by all of Values
+	Values []interface{} // on input, Map is called once for each value
+}
+
+// store holds the intermediate values emitted by the mappers, grouped by
+// key. The embedded mutex is held by every method that touches data.
+type store struct {
+	sync.Mutex
+	data map[string][]interface{} // key -> values in the order they were inserted
+}
+
+// newStore returns an empty store whose map is ready to accept inserts.
+func newStore() *store {
+	s := new(store)
+	s.data = map[string][]interface{}{}
+	return s
+}
+
+// sortedKeys returns all of the keys currently in the store, sorted in
+// increasing order. The store is locked while the keys are collected.
+func (s *store) sortedKeys() []string {
+	s.Lock()
+	defer s.Unlock()
+	sorted := make([]string, len(s.data))
+	next := 0
+	for key := range s.data {
+		sorted[next] = key
+		next++
+	}
+	sort.Strings(sorted)
+	return sorted
+}
+
+// insert appends vals to the values already stored for key, creating the
+// entry if the key has not been seen before.
+func (s *store) insert(key string, vals ...interface{}) {
+	s.Lock()
+	defer s.Unlock()
+	existing := s.data[key]
+	s.data[key] = append(existing, vals...)
+}
+
+// lookup returns the values stored for key, or nil if there are none. The
+// returned slice is the store's own slice, not a copy.
+func (s *store) lookup(key string) []interface{} {
+	s.Lock()
+	vals := s.data[key]
+	s.Unlock()
+	return vals
+}
+
+// MR represents the Map Reduction.
+type MR struct {
+	input  <-chan *Record // records to be mapped; set by Run
+	output chan<- *Record // destination for ReduceOut; set and closed by Run
+	err    error          // result of Run, as returned by Error
+	data   *store         // intermediate values written by MapOut
+	// The number of concurrent mappers to use. A value of 0 instructs
+	// the implementation to use an appropriate number, such as the number
+	// of available CPUs.
+	NumMappers int
+	// The time to wait for the map reduce to complete. A value of 0 implies
+	// no timeout - i.e. an infinite wait.
+	Timeout time.Duration
+}
+
+// Error returns any error that was returned by the Run method. It is
+// safe to read its value once the output channel passed to Run has been
+// closed. Run stores the error without any locking, so calling Error
+// while Run may still be executing is a data race.
+func (mr *MR) Error() error {
+	return mr.err
+}
+
+// MapOut outputs the key and associated values for subsequent
+// processing by a Reducer. It should only be called from a mapper.
+// Values emitted for the same key, by any mapper, are accumulated and
+// passed together to a single call to Reduce.
+func (mr *MR) MapOut(key string, values ...interface{}) {
+	mr.data.insert(key, values...)
+}
+
+// ReduceOut sends a single Record containing key and values on the output
+// channel that was passed to Run, blocking until the send completes. It
+// should only be called from a reducer.
+func (mr *MR) ReduceOut(key string, values ...interface{}) {
+	rec := &Record{Key: key, Values: values}
+	mr.output <- rec
+}
+
+// runMapper reads records from mr.input until that channel is closed,
+// calling mapper.Map once for every value in each record. It reports its
+// outcome by sending exactly one value on ch: nil once the input has been
+// exhausted, or the first error returned by mapper.Map.
+func (mr *MR) runMapper(ch chan error, mapper Mapper) {
+	for rec := range mr.input {
+		if rec == nil {
+			// A nil record carries no data. Skip it rather than
+			// mistaking it for the end of the input, which would stop
+			// this mapper while records may still be pending.
+			continue
+		}
+		for _, v := range rec.Values {
+			if err := mapper.Map(mr, rec.Key, v); err != nil {
+				ch <- err
+				return
+			}
+		}
+	}
+	// The input channel has been closed and drained.
+	ch <- nil
+}
+
+// runMappers starts mr.NumMappers goroutines, each executing runMapper,
+// and waits for all of them to report completion. It returns the first
+// non-nil error reported by a mapper, or a timeout error if timeout fires
+// first; in either of those cases the other mappers are not waited for.
+func (mr *MR) runMappers(mapper Mapper, timeout <-chan time.Time) error {
+	// One slot per mapper, so a mapper never blocks when reporting its
+	// result even if this function has already returned.
+	results := make(chan error, mr.NumMappers)
+	for started := 0; started < mr.NumMappers; started++ {
+		go mr.runMapper(results, mapper)
+	}
+	remaining := mr.NumMappers
+	for {
+		select {
+		case <-timeout:
+			return fmt.Errorf("timed out mappers after %s", mr.Timeout)
+		case err := <-results:
+			if err != nil {
+				return err
+			}
+			remaining--
+			if remaining == 0 {
+				return nil
+			}
+		}
+	}
+}
+
+func (mr *MR) runReducers(reducer Reducer, timeout <-chan time.Time) error {
+	ch := make(chan error, 1)
+	go func() {
+		for _, k := range mr.data.sortedKeys() {
+			v := mr.data.lookup(k)
+			if err := reducer.Reduce(mr, k, v); err != nil {
+				ch <- err
+			}
+		}
+		close(ch)
+	}()
+	var err error
+	select {
+	case err = <-ch:
+	case <-timeout:
+		err = fmt.Errorf("timed out reducers after %s", mr.Timeout)
+	}
+	return err
+}
+
+// Run runs the map reduction using the supplied mapper and reducer reading
+// from input and writing to output. The caller must close the input channel
+// when there is no more input data. The implementation of Run will close
+// the output channel when the Reducer has processed all intermediate data,
+// and also when it returns early because of an error or a timeout.
+// Run may only be called once per MR receiver.
+//
+// The returned error is also recorded so that it can be retrieved later
+// using Error. If NumMappers is zero or negative it is replaced by the
+// number of CPUs.
+func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error {
+	mr.input, mr.output, mr.data = input, output, newStore()
+	if mr.NumMappers <= 0 {
+		// A negative value cannot be used to size the mappers' result
+		// channel, so it is treated the same as the documented zero
+		// value rather than being allowed to panic.
+		//
+		// TODO(cnicolaou,toddw): consider using a new goroutine
+		// for every input record rather than fixing concurrency like
+		// this. Maybe an another option is to use the capacity of the
+		// input channel.
+		mr.NumMappers = runtime.NumCPU()
+	}
+	// A nil timeout channel never fires, which gives the infinite wait
+	// that is documented for a Timeout of zero.
+	var timeout <-chan time.Time
+	if mr.Timeout > 0 {
+		timeout = time.After(mr.Timeout)
+	}
+	// The deferred close runs after mr.err has been assigned, so that a
+	// caller that observes the close can safely call Error.
+	defer close(mr.output)
+	if mr.err = mr.runMappers(mapper, timeout); mr.err != nil {
+		return mr.err
+	}
+	mr.err = mr.runReducers(reducer, timeout)
+	return mr.err
+}
diff --git a/simplemr/mr_test.go b/simplemr/mr_test.go
new file mode 100644
index 0000000..00ed1f8
--- /dev/null
+++ b/simplemr/mr_test.go
@@ -0,0 +1,205 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr_test
+
+import (
+	"fmt"
+	"runtime"
+	"strings"
+	"testing"
+	"time"
+
+	"v.io/x/lib/simplemr"
+)
+
+// newChans returns two record channels, each with a buffer of chanSize,
+// for use as the input and output of a map reduction.
+func newChans(chanSize int) (chan *simplemr.Record, chan *simplemr.Record) {
+	in := make(chan *simplemr.Record, chanSize)
+	out := make(chan *simplemr.Record, chanSize)
+	return in, out
+}
+
+// termCount is a Mapper and Reducer that counts how many times each
+// space separated token occurs in its input.
+type termCount struct{}
+
+// Map emits a count of 1 for every space separated token in val. It
+// returns an error if val is not a string.
+func (tc *termCount) Map(mr *simplemr.MR, key string, val interface{}) error {
+	text, isString := val.(string)
+	if !isString {
+		return fmt.Errorf("%T is the wrong type", val)
+	}
+	tokens := strings.Split(text, " ")
+	for i := range tokens {
+		mr.MapOut(tokens[i], 1)
+	}
+	return nil
+}
+
+// Reduce adds up the counts emitted for key and outputs the total. It
+// returns an error, without emitting anything, if any value is not an int.
+func (tc *termCount) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	total := 0
+	for i := range values {
+		n, isInt := values[i].(int)
+		if !isInt {
+			return fmt.Errorf("%T is the wrong type", values[i])
+		}
+		total += n
+	}
+	mr.ReduceOut(key, total)
+	return nil
+}
+
+// Input documents used by TestMR. Each is a list of single letter terms
+// separated by single spaces.
+var (
+	d1 = "a b c"
+	d2 = "a b c d"
+	d3 = "e f"
+)
+
+// expect receives the next record from out and reports a test error
+// unless its key is key and its values are exactly vals. It fails the test
+// immediately if no record is available because out has been closed (or a
+// nil record was received). It must be called from the goroutine that is
+// running the test.
+func expect(t *testing.T, out chan *simplemr.Record, key string, vals ...int) {
+	rec := <-out
+	if rec == nil {
+		// Without this check the field accesses below would panic
+		// rather than produce a test failure.
+		t.Fatalf("no record received for key %v", key)
+	}
+	if got, want := rec.Key, key; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	if got, want := len(rec.Values), len(vals); got != want {
+		t.Errorf("got %v, want %v", got, want)
+		return
+	}
+	for i, v := range vals {
+		if got, want := rec.Values[i], v; got != want {
+			t.Errorf("%d: got %v, want %v", i, got, want)
+		}
+	}
+}
+
+// TestMR runs termCount over two records that each contain the documents
+// d1, d2 and d3. It verifies the total for every term, in the sorted key
+// order in which the totals are emitted, and that the output channel has
+// been closed afterwards.
+func TestMR(t *testing.T) {
+	in, out := newChans(10)
+	go func() {
+		in <- &simplemr.Record{Key: "d1", Values: []interface{}{d1, d2, d3}}
+		in <- &simplemr.Record{Key: "d2", Values: []interface{}{d1, d2, d3}}
+		close(in)
+	}()
+
+	counter := &termCount{}
+	job := &simplemr.MR{}
+	if err := job.Run(in, out, counter, counter); err != nil {
+		t.Fatal(err)
+	}
+
+	wanted := []struct {
+		term  string
+		count int
+	}{
+		{"a", 4},
+		{"b", 4},
+		{"c", 4},
+		{"d", 2},
+		{"e", 2},
+		{"f", 2},
+	}
+	for _, w := range wanted {
+		expect(t, out, w.term, w.count)
+	}
+	if extra := <-out; extra != nil {
+		t.Fatal("expected the channel to be closed")
+	}
+}
+
+// slowReducer is a Reducer that takes an hour to process each key. It is
+// used by TestTimeout to make the reduce phase exceed the timeout.
+type slowReducer struct{}
+
+// Reduce sleeps for an hour and then returns nil without emitting anything.
+func (sr *slowReducer) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	time.Sleep(time.Hour)
+	return nil
+}
+
+func TestTimeout(t *testing.T) {
+	in, out := newChans(1)
+	mrt := &simplemr.MR{Timeout: 10 * time.Millisecond}
+	identity := &simplemr.Identity{}
+	mrt.Run(in, out, identity, identity)
+	if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out mappers") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+	mrt = &simplemr.MR{Timeout: 10 * time.Millisecond}
+	in, out = newChans(1)
+	in <- &simplemr.Record{"key", []interface{}{"value"}}
+	close(in)
+	mrt.Run(in, out, identity, &slowReducer{})
+	if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out reducers") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+}
+
+// sleeper is a Mapper and Reducer whose Map sleeps before passing its
+// input through. It is used by the tests that time how long the mappers
+// take to run.
+type sleeper struct{}
+
+// sleepTime is how long sleeper.Map sleeps for every value it is given.
+const sleepTime = time.Millisecond * 100
+
+// Map sleeps for sleepTime and then emits val, unchanged, for key.
+func (sl *sleeper) Map(mr *simplemr.MR, key string, val interface{}) error {
+	time.Sleep(sleepTime)
+	mr.MapOut(key, val)
+	return nil
+}
+
+// Reduce outputs key and values unchanged.
+func (sl *sleeper) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}
+
+// runMappers runs a map reduction that uses sleeper, with numMappers
+// mappers, over bufsize single value records and returns how long the call
+// to Run took. Both channels are buffered to hold bufsize records. The
+// test is failed if Run returns an error.
+func runMappers(t *testing.T, bufsize, numMappers int) time.Duration {
+	in, out := newChans(bufsize)
+	go func() {
+		defer close(in)
+		for i := 0; i < bufsize; i++ {
+			in <- &simplemr.Record{Key: fmt.Sprintf("%d", i), Values: []interface{}{i}}
+		}
+	}()
+	mrt := &simplemr.MR{NumMappers: numMappers}
+	worker := &sleeper{}
+	start := time.Now()
+	if err := mrt.Run(in, out, worker, worker); err != nil {
+		t.Fatal(err)
+	}
+	return time.Since(start)
+}
+
+func TestOneMappers(t *testing.T) {
+	bufsize := 5
+	runtime := runMappers(t, bufsize, 1)
+	if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got < want {
+		t.Errorf("took %s which is too fast, should be at least %s", got, want)
+	}
+}
+
+func TestMultipleMappers(t *testing.T) {
+	numCPUs := runtime.NumCPU()
+	if numCPUs == 1 {
+		t.Skip("can't test concurrency with only one CPU")
+	}
+	bufsize := 5
+	runtime := runMappers(t, bufsize, numCPUs)
+	if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got > want {
+		t.Errorf("took %s which is too slow, should take no longer than %s", got, want)
+	}
+}
+
+// adder is a Mapper and Reducer that adds one to every value it maps.
+type adder struct{}
+
+// Map emits val+1 for key. It panics if val is not an int.
+func (a *adder) Map(mr *simplemr.MR, key string, val interface{}) error {
+	mr.MapOut(key, val.(int)+1)
+	return nil
+}
+
+// Reduce outputs key and values unchanged.
+func (a *adder) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}
+
+// TestChainedMR connects two map reductions so that the output of the
+// first is the input of the second. Each one adds one to every value, so
+// the values that come out of the chain are two greater than those sent in.
+func TestChainedMR(t *testing.T) {
+	chanSize := 5
+	in, middle, out := make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize)
+	mrt1 := &simplemr.MR{}
+	mrt2 := &simplemr.MR{}
+	add := &adder{}
+	go mrt1.Run(in, middle, add, add)
+	go mrt2.Run(middle, out, add, add)
+	in <- &simplemr.Record{Key: "1", Values: []interface{}{1}}
+	in <- &simplemr.Record{Key: "2", Values: []interface{}{2}}
+	close(in)
+	expect(t, out, "1", 3)
+	expect(t, out, "2", 4)
+	// Error may only be read once the output channel has been closed, so
+	// wait for the close. This also checks that nothing else was emitted.
+	if rec := <-out; rec != nil {
+		t.Fatalf("unexpected extra record: %v", rec)
+	}
+	if err := mrt1.Error(); err != nil {
+		t.Fatal(err)
+	}
+	if err := mrt2.Error(); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/simplemr/util.go b/simplemr/util.go
new file mode 100644
index 0000000..8d1a19a
--- /dev/null
+++ b/simplemr/util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr
+
+// Identity implements both Mapper and Reducer by passing its input
+// through unchanged.
+type Identity struct{}
+
+// Map emits val, unchanged, for key.
+func (i *Identity) Map(mr *MR, key string, val interface{}) error {
+	mr.MapOut(key, val)
+	return nil
+}
+
+// Reduce outputs key and values unchanged.
+func (i *Identity) Reduce(mr *MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}