// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package simplemr provides a simple map reduce framework for use by
// commandline and other tools and consequently can only be used from
// within a single process. It is specifically not intended to support
// large datasets, but mappers are run concurrently so that long running
// tasks (e.g. external shell commands will be run in parallel). The
// current implementation supoorts only a single reducer however future
// implementations are likely to run multiple reducers and hence reducers
// should be coded accordingly.
package simplemr

import (
	"fmt"
	"runtime"
	"sort"
	"sync"
	"time"
)

// Mapper is in the interface that must be implemented by all mappers.
type Mapper interface {
	// Map is called by the framework for every key, value pair read
	// from the specified input.
	Map(mr *MR, key string, value interface{}) error
}

// Reducer is the interface that must be implemented by the reducer.
type Reducer interface {
	// Reduce is called by the framework for every key and associated
	// values that are emitted by the Mappers.
	Reduce(mr *MR, key string, values []interface{}) error
}

// Record represents all input and output data.
type Record struct {
	Key    string
	Values []interface{}
}

type store struct {
	sync.Mutex
	data map[string][]interface{}
}

func newStore() *store {
	return &store{data: make(map[string][]interface{})}
}

func (s *store) sortedKeys() []string {
	s.Lock()
	defer s.Unlock()
	keys := make([]string, 0, len(s.data))
	for k, _ := range s.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (s *store) insert(k string, v ...interface{}) {
	s.Lock()
	defer s.Unlock()
	s.data[k] = append(s.data[k], v...)
}

func (s *store) lookup(k string) []interface{} {
	s.Lock()
	defer s.Unlock()
	return s.data[k]
}

// MR represents the Map Reduction.
type MR struct {
	input  <-chan *Record
	output chan<- *Record
	err    error
	data   *store
	// The number of conccurent mappers to use. A value of 0 instructs
	// the implementation to use an appropriate number, such as the number
	// of available CPUs.
	NumMappers int
	// The time to wait for the map reduce to complete. A value of 0 implies
	// no timeout - i.e. an infinite wait.
	Timeout time.Duration
}

// Error returns any error that was returned by the Run method. It is
// safe to read its value once the output channel passed to Run has been
// closed.
func (mr *MR) Error() error {
	return mr.err
}

// MapOut outputs the key and associated values for subsequent
// processing by a Reducer. It should only be called from a mapper.
func (mr *MR) MapOut(key string, values ...interface{}) {
	mr.data.insert(key, values...)
}

// ReduceOut outputs the key and associated values to the specified output
// stream. It should only be called from a reducer.
func (mr *MR) ReduceOut(key string, values ...interface{}) {
	mr.output <- &Record{key, values}
}

func (mr *MR) runMapper(ch chan error, mapper Mapper) {
	for {
		rec := <-mr.input
		if rec == nil {
			ch <- nil
			return
		}
		for _, v := range rec.Values {
			if err := mapper.Map(mr, rec.Key, v); err != nil {
				ch <- err
				return
			}
		}
	}
}

func (mr *MR) runMappers(mapper Mapper, timeout <-chan time.Time) error {
	ch := make(chan error, mr.NumMappers)
	for i := 0; i < mr.NumMappers; i++ {
		go mr.runMapper(ch, mapper)
	}
	done := 0
	for {
		select {
		case err := <-ch:
			if err != nil {
				// We should probably drain the channel.
				return err
			}
			done++
			if done == mr.NumMappers {
				return nil
			}
		case <-timeout:
			return fmt.Errorf("timed out mappers after %s", mr.Timeout)
		}
	}
}

func (mr *MR) runReducers(reducer Reducer, timeout <-chan time.Time) error {
	ch := make(chan error, 1)
	go func() {
		for _, k := range mr.data.sortedKeys() {
			v := mr.data.lookup(k)
			if err := reducer.Reduce(mr, k, v); err != nil {
				ch <- err
			}
		}
		close(ch)
	}()
	var err error
	select {
	case err = <-ch:
	case <-timeout:
		err = fmt.Errorf("timed out reducers after %s", mr.Timeout)
	}
	return err
}

// Run runs the map reduction using the supplied mapper and reducer reading
// from input and writing to output. The caller must close the input channel
// when there is no more input data. The implementation of Run will close
// the output channel when the Reducer has processed all intermediate data.
// Run may only be called once per MR receiver.
func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error {
	mr.input, mr.output, mr.data = input, output, newStore()
	if mr.NumMappers == 0 {
		// TODO(cnicolaou,toddw): consider using a new goroutine
		// for every input record rather than fixing concurrency like
		// this. Maybe an another option is to use the capacity of the
		// input channel.
		mr.NumMappers = runtime.NumCPU()
	}
	var timeout <-chan time.Time
	if mr.Timeout > 0 {
		timeout = time.After(mr.Timeout)
	}
	defer close(mr.output)
	if mr.err = mr.runMappers(mapper, timeout); mr.err != nil {
		return mr.err
	}
	mr.err = mr.runReducers(reducer, timeout)
	return mr.err
}
