// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Client methods for listing stream data.

package client

import (
	"fmt"
	"sort"

	"v.io/v23/context"
	wire "v.io/v23/services/syncbase"
	"v.io/v23/syncbase"
	"v.io/v23/syncbase/util"
	"v.io/v23/verror"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbmodel/keyutil"
)

type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error

// ListStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	dataCollection := db.Collection(ctx, sbmodel.KDataPoint{}.Collection())
	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	return syncbase.RunInBatch(ctx, db, wire.BatchOptions{ReadOnly: true}, func(bdb syncbase.BatchDatabase) error {
		streamRow := bdb.Collection(ctx, streamKey.Collection()).Row(streamKey.Key())
		if exists, err := streamRow.Exists(ctx); err != nil {
			return err
		} else if !exists {
			return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
		}

		sstr := bdb.CollectionForId(dataCollection.Id()).Scan(ctx, syncbase.Prefix(dataPrefix))
		defer sstr.Cancel()

		for sstr.Advance() {
			key := &sbmodel.KDataPoint{}
			if err := key.Parse(sstr.Key()); err != nil {
				return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
			}
			var val sbmodel.VDataPoint
			if err := sstr.Value(&val); err != nil {
				return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
			}
			if err := listCb(key, val); err != nil {
				return err
			}
		}
		return sstr.Err()
	})
}

// FollowStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each. It keeps listing new data
// points until ctx is cancelled.
// TODO(ivanpi): Allow specifying time interval.
func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	dataCollection := db.Collection(ctx, sbmodel.KDataPoint{}.Collection())
	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	// Watch for DataPoints, existing followed by new.
	// TODO(ivanpi): Check if stream exists.
	ws := db.Watch(ctx, nil, []wire.CollectionRowPattern{
		util.RowPrefixPattern(dataCollection.Id(), dataPrefix),
	})
	defer ws.Cancel()

	trans := make([]*dataPoint, 0, 16)
	for ws.Advance() {
		c := ws.Change()
		var elem dataPoint
		if err := elem.Key.Parse(c.Row); err != nil {
			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
		}
		switch c.ChangeType {
		case syncbase.PutChange:
			if err := c.Value(&elem.Val); err != nil {
				return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
			}
			// In the absence of overlapping syncgroups or partial ACLs, the order of
			// writes is preserved by sync. However, several writes may be grouped
			// into a single batch, and the order is not preserved within a batch.
			// Each batch is manually sorted before being emitted to the callback.
			trans = append(trans, &elem)
			if !c.Continued {
				sort.Stable(dataPointSort(trans))
				for _, elem := range trans {
					if err := listCb(&elem.Key, elem.Val); err != nil {
						return err
					}
				}
				trans = trans[:0]
			}
		case syncbase.DeleteChange:
			// no-op
		}
	}
	return ws.Err()
}

type dataPoint struct {
	Key sbmodel.KDataPoint
	Val sbmodel.VDataPoint
}

// dataPointSort implements sorting a dataPoint slice by timestamp.
type dataPointSort []*dataPoint

func (s dataPointSort) Len() int           { return len(s) }
func (s dataPointSort) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
