Merge "gosh: convert Shell.Rename to Shell.Move"
diff --git a/simplemr/.api b/simplemr/.api
new file mode 100644
index 0000000..16b6f34
--- /dev/null
+++ b/simplemr/.api
@@ -0,0 +1,17 @@
+pkg simplemr, method (*Identity) Map(*MR, string, interface{}) error
+pkg simplemr, method (*Identity) Reduce(*MR, string, []interface{}) error
+pkg simplemr, method (*MR) Error() error
+pkg simplemr, method (*MR) MapOut(string, ...interface{})
+pkg simplemr, method (*MR) ReduceOut(string, ...interface{})
+pkg simplemr, method (*MR) Run(<-chan *Record, chan<- *Record, Mapper, Reducer) error
+pkg simplemr, type Identity struct
+pkg simplemr, type MR struct
+pkg simplemr, type MR struct, NumMappers int
+pkg simplemr, type MR struct, Timeout time.Duration
+pkg simplemr, type Mapper interface { Map }
+pkg simplemr, type Mapper interface, Map(*MR, string, interface{}) error
+pkg simplemr, type Record struct
+pkg simplemr, type Record struct, Key string
+pkg simplemr, type Record struct, Values []interface{}
+pkg simplemr, type Reducer interface { Reduce }
+pkg simplemr, type Reducer interface, Reduce(*MR, string, []interface{}) error
diff --git a/simplemr/example_test.go b/simplemr/example_test.go
new file mode 100644
index 0000000..c3df40a
--- /dev/null
+++ b/simplemr/example_test.go
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr_test
+
+import (
+ "fmt"
+
+ "v.io/x/lib/simplemr"
+)
+
+// ExampleMR pushes two records through an identity map reduction and
+// prints the results.
+func ExampleMR() {
+	in, out := make(chan *simplemr.Record, 2), make(chan *simplemr.Record, 2)
+	mr := &simplemr.MR{}
+	identity := &simplemr.Identity{}
+	go mr.Run(in, out, identity, identity)
+	in <- &simplemr.Record{Key: "1", Values: []interface{}{"hello\n"}}
+	in <- &simplemr.Record{Key: "2", Values: []interface{}{"world\n"}}
+	close(in)
+	// Run closes out once it has finished. Ranging until the close both
+	// consumes every result and waits for Run, which is what makes the
+	// subsequent call to mr.Error race-free.
+	for rec := range out {
+		fmt.Printf("%s: %s", rec.Key, rec.Values[0].(string))
+	}
+	if err := mr.Error(); err != nil {
+		fmt.Printf("mr failed: %v", err)
+	}
+	// Output:
+	// 1: hello
+	// 2: world
+}
diff --git a/simplemr/mr.go b/simplemr/mr.go
new file mode 100644
index 0000000..09d0347
--- /dev/null
+++ b/simplemr/mr.go
@@ -0,0 +1,192 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package simplemr provides a simple map reduce framework for use by
+// command-line and other tools; consequently it can only be used from
+// within a single process. It is specifically not intended to support
+// large datasets, but mappers are run concurrently so that long-running
+// tasks (e.g. external shell commands) will be run in parallel. The
+// current implementation supports only a single reducer; however, future
+// implementations are likely to run multiple reducers, and hence reducers
+// should be coded accordingly.
+package simplemr
+
+import (
+ "fmt"
+ "runtime"
+ "sort"
+ "sync"
+ "time"
+)
+
+// Mapper is the interface that must be implemented by all mappers.
+type Mapper interface {
+	// Map is called by the framework once for every value of every
+	// Record read from the input, together with that Record's key.
+	// Intermediate results are emitted via mr.MapOut. The first non-nil
+	// error returned by any mapper is returned by MR.Run.
+	Map(mr *MR, key string, value interface{}) error
+}
+
+// Reducer is the interface that must be implemented by the reducer.
+type Reducer interface {
+	// Reduce is called by the framework once for every distinct key
+	// emitted by the Mappers, with all of the values emitted for that key.
+	// The current implementation visits keys in sorted order. Results are
+	// written via mr.ReduceOut; a non-nil error is returned by MR.Run.
+	Reduce(mr *MR, key string, values []interface{}) error
+}
+
+// Record is the unit of data read from Run's input channel and written to
+// its output channel: a key together with its associated values.
+type Record struct {
+	Key    string
+	Values []interface{}
+}
+
+// store holds the intermediate key/values emitted by the mappers. Every
+// method takes the embedded mutex, so a store may be used concurrently.
+type store struct {
+	sync.Mutex
+	data map[string][]interface{}
+}
+
+// newStore returns an empty, ready-to-use store.
+func newStore() *store {
+	return &store{data: make(map[string][]interface{})}
+}
+
+// sortedKeys returns every key currently in the store, in ascending order.
+func (s *store) sortedKeys() []string {
+	s.Lock()
+	defer s.Unlock()
+	keys := make([]string, 0, len(s.data))
+	for k := range s.data {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
+
+// insert appends v to the values recorded for k.
+func (s *store) insert(k string, v ...interface{}) {
+	s.Lock()
+	defer s.Unlock()
+	s.data[k] = append(s.data[k], v...)
+}
+
+// lookup returns the values recorded for k, or nil if there are none. The
+// returned slice shares storage with the store and must not be modified.
+func (s *store) lookup(k string) []interface{} {
+	s.Lock()
+	defer s.Unlock()
+	return s.data[k]
+}
+
+// MR represents a single map reduction. Set the exported fields before
+// calling Run; a zero MR is ready to use.
+type MR struct {
+	input  <-chan *Record // set by Run; consumed by the mappers
+	output chan<- *Record // set by Run; written by ReduceOut, closed by Run
+	err    error          // set by Run; returned by Error
+	data   *store         // set by Run; intermediate output from MapOut
+	// The number of concurrent mappers to use. A value of 0 instructs
+	// the implementation to use an appropriate number, such as the number
+	// of available CPUs.
+	NumMappers int
+	// The time to wait for the map reduce to complete. A value of 0 (or
+	// less) implies no timeout - i.e. an infinite wait.
+	Timeout time.Duration
+}
+
+// Error reports the error, if any, that Run returned. Run records this
+// value before it closes its output channel, so it may be read safely by
+// any goroutine that has observed that channel being closed.
+func (mr *MR) Error() error {
+	err := mr.err
+	return err
+}
+
+// MapOut records values under key as intermediate output that will later
+// be handed to the Reducer. It is intended to be called only by mappers.
+func (mr *MR) MapOut(key string, values ...interface{}) {
+	s := mr.data
+	s.insert(key, values...)
+}
+
+// ReduceOut sends key and its values, wrapped in a Record, to the output
+// channel given to Run. It is intended to be called only by the reducer.
+func (mr *MR) ReduceOut(key string, values ...interface{}) {
+	rec := &Record{Key: key, Values: values}
+	mr.output <- rec
+}
+
+// runMapper applies mapper to every value of every record it reads from
+// the input. It stops at the first nil record (which is also what a closed
+// input yields) and reports nil on ch, or stops at the first mapper error
+// and reports that error on ch instead. Exactly one value is sent on ch.
+func (mr *MR) runMapper(ch chan error, mapper Mapper) {
+	for rec := <-mr.input; rec != nil; rec = <-mr.input {
+		for _, v := range rec.Values {
+			if err := mapper.Map(mr, rec.Key, v); err != nil {
+				ch <- err
+				return
+			}
+		}
+	}
+	ch <- nil
+}
+
+// runMappers starts NumMappers mapper goroutines and waits until all of
+// them have finished, one of them fails, or timeout fires. It returns the
+// first mapper error, a timeout error, or nil.
+func (mr *MR) runMappers(mapper Mapper, timeout <-chan time.Time) error {
+	// One slot per mapper: every mapper can report its result without
+	// blocking, even after we stop listening, so no draining is needed.
+	results := make(chan error, mr.NumMappers)
+	for n := 0; n < mr.NumMappers; n++ {
+		go mr.runMapper(results, mapper)
+	}
+	for pending := mr.NumMappers; pending > 0; pending-- {
+		select {
+		case <-timeout:
+			return fmt.Errorf("timed out mappers after %s", mr.Timeout)
+		case err := <-results:
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (mr *MR) runReducers(reducer Reducer, timeout <-chan time.Time) error {
+ ch := make(chan error, 1)
+ go func() {
+ for _, k := range mr.data.sortedKeys() {
+ v := mr.data.lookup(k)
+ if err := reducer.Reduce(mr, k, v); err != nil {
+ ch <- err
+ }
+ }
+ close(ch)
+ }()
+ var err error
+ select {
+ case err = <-ch:
+ case <-timeout:
+ err = fmt.Errorf("timed out reducers after %s", mr.Timeout)
+ }
+ return err
+}
+
+// Run runs the map reduction using the supplied mapper and reducer reading
+// from input and writing to output. The caller must close the input channel
+// when there is no more input data. The implementation of Run will close
+// the output channel when the Reducer has processed all intermediate data,
+// and also when Run returns early with an error.
+// Run may only be called once per MR receiver.
+//
+// If NumMappers is zero or negative it is set to runtime.NumCPU(). If
+// Timeout is positive, it bounds the combined map and reduce phases.
+func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error {
+	mr.input, mr.output, mr.data = input, output, newStore()
+	if mr.NumMappers <= 0 {
+		// Negative values are treated like 0; left as-is they would make
+		// runMappers panic when sizing its result channel.
+		// TODO(cnicolaou,toddw): consider using a new goroutine
+		// for every input record rather than fixing concurrency like
+		// this. Maybe an another option is to use the capacity of the
+		// input channel.
+		mr.NumMappers = runtime.NumCPU()
+	}
+	var timeout <-chan time.Time
+	if mr.Timeout > 0 {
+		// A single deadline is shared by both phases. An explicit timer,
+		// unlike time.After, can be released as soon as Run returns.
+		timer := time.NewTimer(mr.Timeout)
+		defer timer.Stop()
+		timeout = timer.C
+	}
+	// mr.err is always assigned before this deferred close runs.
+	defer close(mr.output)
+	if mr.err = mr.runMappers(mapper, timeout); mr.err != nil {
+		return mr.err
+	}
+	mr.err = mr.runReducers(reducer, timeout)
+	return mr.err
+}
diff --git a/simplemr/mr_test.go b/simplemr/mr_test.go
new file mode 100644
index 0000000..00ed1f8
--- /dev/null
+++ b/simplemr/mr_test.go
@@ -0,0 +1,205 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr_test
+
+import (
+ "fmt"
+ "runtime"
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/x/lib/simplemr"
+)
+
+// newChans returns an input and an output channel, each buffered to hold
+// chanSize records.
+func newChans(chanSize int) (chan *simplemr.Record, chan *simplemr.Record) {
+	in := make(chan *simplemr.Record, chanSize)
+	out := make(chan *simplemr.Record, chanSize)
+	return in, out
+}
+
+// termCount counts space-separated terms. Map emits a 1 for every term of a
+// string value, and Reduce sums those ones for each term.
+type termCount struct{}
+
+func (tc *termCount) Map(mr *simplemr.MR, key string, val interface{}) error {
+	text, isString := val.(string)
+	if !isString {
+		return fmt.Errorf("%T is the wrong type", val)
+	}
+	tokens := strings.Split(text, " ")
+	for i := range tokens {
+		mr.MapOut(tokens[i], 1)
+	}
+	return nil
+}
+
+func (tc *termCount) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	var total int
+	for i := range values {
+		n, isInt := values[i].(int)
+		if !isInt {
+			return fmt.Errorf("%T is the wrong type", values[i])
+		}
+		total += n
+	}
+	mr.ReduceOut(key, total)
+	return nil
+}
+
+var (
+ d1 = "a b c"
+ d2 = "a b c d"
+ d3 = "e f"
+)
+
+// expect reads the next record from out and checks that it has the given
+// key and exactly the given int values. If out is already closed, the
+// test fails immediately. Otherwise the nil record that a receive from a
+// closed channel yields would be dereferenced and panic.
+func expect(t *testing.T, out chan *simplemr.Record, key string, vals ...int) {
+	rec, ok := <-out
+	if !ok {
+		t.Fatalf("output channel closed, wanted a record with key %q", key)
+	}
+	if got, want := rec.Key, key; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	if got, want := len(rec.Values), len(vals); got != want {
+		t.Errorf("got %v, want %v", got, want)
+		return
+	}
+	for i, v := range vals {
+		if got, want := rec.Values[i], v; got != want {
+			t.Errorf("%d: got %v, want %v", i, got, want)
+		}
+	}
+}
+
+// TestMR counts terms across two records that each carry the same three
+// documents, so every count is double that of a single record.
+func TestMR(t *testing.T) {
+	mrt := &simplemr.MR{}
+	in, out := newChans(10)
+	tc := &termCount{}
+	go func() {
+		in <- &simplemr.Record{Key: "d1", Values: []interface{}{d1, d2, d3}}
+		in <- &simplemr.Record{Key: "d2", Values: []interface{}{d1, d2, d3}}
+		close(in)
+	}()
+	if err := mrt.Run(in, out, tc, tc); err != nil {
+		t.Fatal(err)
+	}
+
+	// The single reducer emits keys in sorted order.
+	expect(t, out, "a", 4)
+	expect(t, out, "b", 4)
+	expect(t, out, "c", 4)
+	expect(t, out, "d", 2)
+	expect(t, out, "e", 2)
+	expect(t, out, "f", 2)
+	if _, ok := <-out; ok {
+		t.Fatal("expected the channel to be closed")
+	}
+}
+
+// slowReducer blocks for an hour on every key, far longer than any test
+// deadline, so that it triggers the reducer-phase timeout.
+type slowReducer struct{}
+
+func (sr *slowReducer) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	<-time.After(time.Hour)
+	return nil
+}
+
+// TestTimeout checks that Timeout is enforced in both the map and the
+// reduce phase.
+func TestTimeout(t *testing.T) {
+	identity := &simplemr.Identity{}
+
+	// Map phase: nothing is ever sent on in, so the mappers are still
+	// waiting for input when the deadline fires.
+	in, out := newChans(1)
+	mrt := &simplemr.MR{Timeout: 10 * time.Millisecond}
+	mrt.Run(in, out, identity, identity)
+	// Let the mapper goroutines abandoned by the timeout exit.
+	close(in)
+	if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out mappers") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+
+	// Reduce phase: mapping completes, but slowReducer outlives the deadline.
+	mrt = &simplemr.MR{Timeout: 10 * time.Millisecond}
+	in, out = newChans(1)
+	in <- &simplemr.Record{Key: "key", Values: []interface{}{"value"}}
+	close(in)
+	mrt.Run(in, out, identity, &slowReducer{})
+	if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out reducers") {
+		t.Fatalf("missing or wrong error: %v", err)
+	}
+}
+
+// sleeper passes its input through unchanged, but its Map first sleeps for
+// sleepTime so that mapper concurrency shows up in elapsed wall-clock time.
+type sleeper struct{}
+
+const sleepTime = 100 * time.Millisecond
+
+func (sl *sleeper) Map(mr *simplemr.MR, key string, val interface{}) error {
+	time.Sleep(sleepTime)
+	mr.MapOut(key, val)
+	return nil
+}
+
+func (sl *sleeper) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}
+
+// runMappers feeds bufsize single-value records through a sleeper-based
+// map reduction using numMappers mappers, and returns how long Run took.
+func runMappers(t *testing.T, bufsize, numMappers int) time.Duration {
+	sl := &sleeper{}
+	in, out := newChans(bufsize)
+	go func() {
+		defer close(in)
+		for i := 0; i < bufsize; i++ {
+			in <- &simplemr.Record{Key: fmt.Sprintf("%d", i), Values: []interface{}{i}}
+		}
+	}()
+	mrt := &simplemr.MR{NumMappers: numMappers}
+	start := time.Now()
+	err := mrt.Run(in, out, sl, sl)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return time.Since(start)
+}
+
+func TestOneMappers(t *testing.T) {
+ bufsize := 5
+ runtime := runMappers(t, bufsize, 1)
+ if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got < want {
+ t.Errorf("took %s which is too fast, should be at least %s", got, want)
+ }
+}
+
+func TestMultipleMappers(t *testing.T) {
+ numCPUs := runtime.NumCPU()
+ if numCPUs == 1 {
+ t.Skip("can't test concurrency with only one CPU")
+ }
+ bufsize := 5
+ runtime := runMappers(t, bufsize, numCPUs)
+ if got, want := runtime, time.Duration(int64(sleepTime)*int64(bufsize)); got > want {
+ t.Errorf("took %s which is too slow, should take no longer than %s", got, want)
+ }
+}
+
+// adder works on int values. Map emits each value plus one under the same
+// key, and Reduce passes its values through unchanged.
+type adder struct{}
+
+func (a *adder) Map(mr *simplemr.MR, key string, val interface{}) error {
+	i, ok := val.(int)
+	if !ok {
+		// Report bad input as an error, as termCount does, instead of
+		// panicking inside a mapper goroutine and crashing the test binary.
+		return fmt.Errorf("%T is the wrong type", val)
+	}
+	mr.MapOut(key, i+1)
+	return nil
+}
+
+func (a *adder) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}
+
+func TestChainedMR(t *testing.T) {
+ chanSize := 5
+ in, middle, out := make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize), make(chan *simplemr.Record, chanSize)
+ mrt1 := &simplemr.MR{}
+ mrt2 := &simplemr.MR{}
+ adder := &adder{}
+ go mrt1.Run(in, middle, adder, adder)
+ go mrt2.Run(middle, out, adder, adder)
+ in <- &simplemr.Record{"1", []interface{}{1}}
+ in <- &simplemr.Record{"2", []interface{}{2}}
+ close(in)
+ expect(t, out, "1", 3)
+ expect(t, out, "2", 4)
+ if err := mrt1.Error(); err != nil {
+ t.Fatal(err)
+ }
+ if err := mrt2.Error(); err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/simplemr/util.go b/simplemr/util.go
new file mode 100644
index 0000000..8d1a19a
--- /dev/null
+++ b/simplemr/util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package simplemr
+
+// Identity is a Mapper and Reducer that passes data through unchanged. Map
+// re-emits each value under its key, and Reduce writes all of the values
+// collected for a key to the output as a single Record.
+type Identity struct{}
+
+// Compile-time checks that Identity implements both interfaces.
+var (
+	_ Mapper  = (*Identity)(nil)
+	_ Reducer = (*Identity)(nil)
+)
+
+// Map emits val under key via mr.MapOut.
+func (i *Identity) Map(mr *MR, key string, val interface{}) error {
+	mr.MapOut(key, val)
+	return nil
+}
+
+// Reduce emits key together with all of its values via mr.ReduceOut.
+func (i *Identity) Reduce(mr *MR, key string, values []interface{}) error {
+	mr.ReduceOut(key, values...)
+	return nil
+}