// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Client methods for listing stream data.

package client

import (
	"fmt"
	"sort"

	"v.io/v23/context"
	nosql_wire "v.io/v23/services/syncbase/nosql"
	"v.io/v23/services/watch"
	"v.io/v23/syncbase/nosql"
	"v.io/v23/verror"
	"v.io/x/sensorlog_lite/internal/sbmodel"
	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
)

type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error

// ListStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	_, err := listStreamData(ctx, db, streamKey, listCb)
	return err
}

// FollowStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each. It keeps listing new data
// points until ctx is cancelled.
// TODO(ivanpi): Allow specifying time interval.
func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	tableName := sbmodel.KDataPoint{}.Table()
	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	resMark, err := listStreamData(ctx, db, streamKey, listCb)
	if err != nil {
		return err
	}

	// Watch for new DataPoints.
	ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
	if err != nil {
		return err
	}
	defer ws.Cancel()

	trans := make([]*dataPoint, 0, 16)
	for ws.Advance() {
		c := ws.Change()
		var elem dataPoint
		if err := elem.Key.Parse(c.Row); err != nil {
			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
		}
		switch c.ChangeType {
		case nosql.PutChange:
			if err := c.Value(&elem.Val); err != nil {
				return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
			}
			// In the absence of overlapping syncgroups or partial ACLs, the order of
			// writes is preserved by sync. However, several writes may be grouped
			// into a single batch, and the order is not preserved within a batch.
			// Each batch is manually sorted before being emitted to the callback.
			trans = append(trans, &elem)
			if !c.Continued {
				sort.Stable(dataPointSort(trans))
				for _, elem := range trans {
					if err := listCb(&elem.Key, elem.Val); err != nil {
						return err
					}
				}
				trans = trans[:0]
			}
		case nosql.DeleteChange:
			// no-op
		}
	}
	return ws.Err()
}

// listStreamData implements listing (scanning over) existing stream data. It
// also returns the resume marker to allow watching for future data.
func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
	var resMark watch.ResumeMarker
	tableName := sbmodel.KDataPoint{}.Table()
	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
	if err != nil {
		return resMark, err
	}
	defer bdb.Abort(ctx)

	resMark, err = bdb.GetResumeMarker(ctx)
	if err != nil {
		return resMark, err
	}

	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
	if exists, err := streamRow.Exists(ctx); err != nil {
		return resMark, err
	} else if !exists {
		return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
	}

	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
	defer sstr.Cancel()

	for sstr.Advance() {
		key := &sbmodel.KDataPoint{}
		if err := key.Parse(sstr.Key()); err != nil {
			return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
		}
		var val sbmodel.VDataPoint
		if err := sstr.Value(&val); err != nil {
			return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
		}
		if err := listCb(key, val); err != nil {
			return resMark, err
		}
	}
	return resMark, sstr.Err()
}

type dataPoint struct {
	Key sbmodel.KDataPoint
	Val sbmodel.VDataPoint
}

// dataPointSort implements sorting a dataPoint slice by timestamp.
type dataPointSort []*dataPoint

func (s dataPointSort) Len() int           { return len(s) }
func (s dataPointSort) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
