// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package internal

import (
	"fmt"
	"reflect"
	"strconv"
	"sync"

	ds "v.io/v23/query/engine/datasource"
	"v.io/v23/query/engine/internal/query_checker"
	"v.io/v23/query/engine/internal/query_functions"
	"v.io/v23/query/engine/internal/query_parser"
	"v.io/v23/query/engine/public"
	"v.io/v23/query/syncql"
	"v.io/v23/vdl"
)

type queryEngineImpl struct {
	db                      ds.Database
	mutexNextID             sync.Mutex
	nextID                  int64
	mutexPreparedStatements sync.Mutex
	preparedStatements      map[int64]*query_parser.Statement
}

type preparedStatementImpl struct {
	qe *queryEngineImpl
	id int64 // key to AST stored in queryEngineImpl.
}

type paramInfo struct {
	paramValues []*vdl.Value
	cursor      int64
}

func Create(db ds.Database) public.QueryEngine {
	return &queryEngineImpl{db: db, nextID: 0, preparedStatements: map[int64]*query_parser.Statement{}}
}

func (qe *queryEngineImpl) Exec(q string) ([]string, syncql.ResultStream, error) {
	return Exec(qe.db, q)
}

func (qe *queryEngineImpl) GetPreparedStatement(handle int64) (public.PreparedStatement, error) {
	qe.mutexPreparedStatements.Lock()
	_, ok := qe.preparedStatements[handle]
	qe.mutexPreparedStatements.Unlock()
	if ok {
		return &preparedStatementImpl{qe, handle}, nil
	} else {
		return nil, syncql.NewErrPreparedStatementNotFound(qe.db.GetContext())
	}
}

func (qe *queryEngineImpl) PrepareStatement(q string) (public.PreparedStatement, error) {
	s, err := query_parser.Parse(qe.db, q)
	if err != nil {
		return nil, err
	}
	qe.mutexNextID.Lock()
	id := qe.nextID
	qe.nextID++
	qe.mutexNextID.Unlock()
	qe.mutexPreparedStatements.Lock()
	qe.preparedStatements[id] = s
	qe.mutexPreparedStatements.Unlock()
	return &preparedStatementImpl{qe, id}, nil
}

func (p *preparedStatementImpl) Exec(paramValues ...*vdl.Value) ([]string, syncql.ResultStream, error) {
	// Find the AST
	p.qe.mutexPreparedStatements.Lock()
	s := p.qe.preparedStatements[p.id]
	p.qe.mutexPreparedStatements.Unlock()

	// Copy the AST and substitute any parameters with actual values.
	// Note: Not all of the AST is copied as most parts are immutable.
	sCopy, err := (*s).CopyAndSubstitute(p.qe.db, paramValues)
	if err != nil {
		return nil, nil, err
	}

	// Sematnically check the copied AST and then execute it.
	return checkAndExec(p.qe.db, &sCopy)
}

func (p *preparedStatementImpl) Handle() int64 {
	return p.id
}

func (p *preparedStatementImpl) Close() {
	p.qe.mutexPreparedStatements.Lock()
	delete(p.qe.preparedStatements, p.id)
	p.qe.mutexPreparedStatements.Unlock()
}

func Exec(db ds.Database, q string) ([]string, syncql.ResultStream, error) {
	s, err := query_parser.Parse(db, q)
	if err != nil {
		return nil, nil, err
	}
	return checkAndExec(db, s)
}

func checkAndExec(db ds.Database, s *query_parser.Statement) ([]string, syncql.ResultStream, error) {
	if err := query_checker.Check(db, s); err != nil {
		return nil, nil, err
	}
	switch (*s).(type) {
	case query_parser.SelectStatement, query_parser.DeleteStatement:
		return execStatement(db, s)
	default:
		return nil, nil, syncql.NewErrExecOfUnknownStatementType(db.GetContext(), (*s).Offset(), reflect.TypeOf(*s).Name())
	}
}

// Given a key, a value and a SelectClause, return the projection.
// This function is only called if Eval returned true on the WhereClause expression.
func ComposeProjection(db ds.Database, k string, v *vdl.Value, s *query_parser.SelectClause) []*vdl.Value {
	var projection []*vdl.Value
	for _, selector := range s.Selectors {
		switch selector.Type {
		case query_parser.TypSelField:
			// If field not found, nil is returned (as per specification).
			f := ResolveField(db, k, v, selector.Field)
			projection = append(projection, f)
		case query_parser.TypSelFunc:
			if selector.Function.Computed {
				projection = append(projection, query_functions.ConvertFunctionRetValueToVdlValue(selector.Function.RetValue))
			} else {
				// need to exec function
				// If error executing function, return nil (as per specification).
				retValue, err := resolveArgsAndExecFunction(db, k, v, selector.Function)
				if err != nil {
					retValue = nil
				}
				projection = append(projection, query_functions.ConvertFunctionRetValueToVdlValue(retValue))
			}
		}
	}
	return projection
}

// For testing purposes, given a SelectStatement, k and v;
// return nil if row not selected, else return the projection (type []*vdl.Value).
// Note: limit and offset clauses are ignored for this function as they make no sense
// for a single row.
func ExecSelectSingleRow(db ds.Database, k string, v *vdl.Value, s *query_parser.SelectStatement) []*vdl.Value {
	if !Eval(db, k, v, s.Where.Expr) {
		rs := []*vdl.Value{}
		return rs
	}
	return ComposeProjection(db, k, v, s.Select)
}

func getColumnHeadings(s *query_parser.SelectStatement) []string {
	columnHeaders := []string{}
	for _, selector := range s.Select.Selectors {
		columnName := ""
		if selector.As != nil {
			columnName = selector.As.AltName.Value
		} else {
			switch selector.Type {
			case query_parser.TypSelField:
				sep := ""
				for _, segment := range selector.Field.Segments {
					columnName = columnName + sep + segment.Value
					for _, key := range segment.Keys {
						columnName += getSegmentKeyAsHeading(key)
					}
					sep = "."
				}
			case query_parser.TypSelFunc:
				columnName = selector.Function.Name
			}
		}
		columnHeaders = append(columnHeaders, columnName)
	}
	return columnHeaders
}

// TODO(jkline): Should we really include key/index of a map/set/array/list in the header?
// The column names can get quite long.  Perhaps just "[]" at the end of the segment
// would be better.  The author of the query can always use the As clause to specify a
// better heading.  Note: for functions, just the function name is included in the header.
// When a decision is made, it's best to be consistent for functions and key/indexes.
func getSegmentKeyAsHeading(segKey *query_parser.Operand) string {
	val := "["
	switch segKey.Type {
	case query_parser.TypBigInt:
		val += segKey.BigInt.String()
	case query_parser.TypBigRat:
		val += segKey.BigRat.String()
	case query_parser.TypComplex:
		val += fmt.Sprintf("%g", segKey.Complex)
	case query_parser.TypField:
		sep := ""
		for _, segment := range segKey.Column.Segments {
			val += sep + segment.Value
			for _, key := range segment.Keys {
				val += getSegmentKeyAsHeading(key)
			}
			sep = "."
		}
	case query_parser.TypBool:
		val += strconv.FormatBool(segKey.Bool)
	case query_parser.TypInt:
		val += strconv.FormatInt(segKey.Int, 10)
	case query_parser.TypFloat:
		val += strconv.FormatFloat(segKey.Float, 'f', -1, 64)
	case query_parser.TypFunction:
		val += segKey.Function.Name
	case query_parser.TypStr:
		val += segKey.Str
	case query_parser.TypTime:
		val += segKey.Time.Format("Mon Jan 2 15:04:05 -0700 MST 2006")
	case query_parser.TypNil:
		val += "<nil>"
	case query_parser.TypObject:
		val += "<object>"
	default:
		val += "<?>"
	}
	val += "]"
	return val
}

func getIndexRanges(db ds.Database, tableName string, tableOff int64, indexFields []ds.Index, w *query_parser.WhereClause) ([]ds.IndexRanges, error) {
	indexes := []ds.IndexRanges{}

	// Get IndexRanges for k
	kField := &query_parser.Field{Segments: []query_parser.Segment{query_parser.Segment{Value: "k"}}}
	idxRanges := *query_checker.CompileIndexRanges(kField, vdl.String, w)
	indexes = append(indexes, idxRanges)

	// Get IndexRanges for secondary indexes.
	for _, idx := range indexFields {
		if idx.Kind != vdl.String {
			return nil, syncql.NewErrIndexKindNotSupported(db.GetContext(), tableOff, idx.Kind.String(), idx.FieldName, tableName)
		}
		var err error
		var idxField *query_parser.Field
		// Construct a Field from the string.  Use the parser as it knows best.
		if idxField, err = query_parser.ParseIndexField(db, idx.FieldName, tableName); err != nil {
			return nil, err
		}
		idxRanges := *query_checker.CompileIndexRanges(idxField, idx.Kind, w)
		indexes = append(indexes, idxRanges)
	}
	return indexes, nil
}

func execStatement(db ds.Database, s *query_parser.Statement) ([]string, syncql.ResultStream, error) {
	switch st := (*s).(type) {

	// Select
	case query_parser.SelectStatement:
		indexes, err := getIndexRanges(db, st.From.Table.Name, st.From.Table.Off, st.From.Table.DBTable.GetIndexFields(), st.Where)
		if err != nil {
			return nil, nil, err
		}

		keyValueStream, err := st.From.Table.DBTable.Scan(indexes...)
		if err != nil {
			return nil, nil, syncql.NewErrScanError(db.GetContext(), st.Off, err)
		}
		var resultStream selectResultStreamImpl
		resultStream.db = db
		resultStream.selectStatement = &st
		resultStream.keyValueStream = keyValueStream
		return getColumnHeadings(&st), &resultStream, nil

	// Delete
	case query_parser.DeleteStatement:
		indexes, err := getIndexRanges(db, st.From.Table.Name, st.From.Table.Off, st.From.Table.DBTable.GetIndexFields(), st.Where)
		if err != nil {
			return nil, nil, err
		}

		keyValueStream, err := st.From.Table.DBTable.Scan(indexes...)
		if err != nil {
			return nil, nil, syncql.NewErrScanError(db.GetContext(), st.Off, err)
		}

		deleteCount := int64(0)
		for keyValueStream.Advance() {
			if st.Limit != nil && deleteCount >= st.Limit.Limit.Value {
				break
			}
			k, v := keyValueStream.KeyValue()
			// EvalWhereUsingOnlyKey
			// INCLUDE: the row should be included in the results
			// EXCLUDE: the row should NOT be included
			// FETCH_VALUE: the value and/or type of the value are required to make determination.
			rv := EvalWhereUsingOnlyKey(db, st.Where, k)
			var match bool
			switch rv {
			case INCLUDE:
				match = true
			case EXCLUDE:
				match = false
			case FETCH_VALUE:
				match = Eval(db, k, v, st.Where.Expr)
			}
			if match {
				b, err := st.From.Table.DBTable.Delete(k)
				// May not have delete permission to delete this k/v pair.
				// Continue, but don't increment delete count.
				if err == nil && b {
					deleteCount++
				}
			}
		}
		if err := keyValueStream.Err(); err != nil {
			return nil, nil, syncql.NewErrKeyValueStreamError(db.GetContext(), st.Off, err)
		}

		var resultStream deleteResultStreamImpl
		resultStream.db = db
		resultStream.deleteStatement = &st
		resultStream.deleteCursor = 0
		resultStream.deleteCount = deleteCount
		return []string{"Count"}, &resultStream, nil
	}
	return nil, nil, syncql.NewErrOperationNotSupported(db.GetContext(), "")
}
