blob: d21cb8b63db5301e6716ae7168684878d138579c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Client methods for listing stream data.
package client
import (
"fmt"
"sort"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/syncbase/util"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
// ListCallback is the function type that ListStreamData and FollowStreamData
// invoke once per data point, passing the parsed key and the decoded value.
// Returning a non-nil error stops the iteration; that error is handed back by
// the listing function's loop.
type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
// ListStreamData walks every data point stored for the stream identified by
// streamKey, in chronological order, and hands each one to listCb.
//
// The whole listing runs inside a single read-only batch. If the stream
// definition row does not exist, a verror.ErrNoExist error is produced. A key
// or value that cannot be decoded, or a non-nil error from listCb, ends the
// scan early with that error; otherwise the scan stream's own error (if any)
// is the result.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	pointsCollection := db.Collection(ctx, sbmodel.KDataPoint{}.Collection())
	pointsPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	// listInBatch performs the existence check and the scan against one
	// batch so both observe the same snapshot.
	listInBatch := func(bdb syncbase.BatchDatabase) error {
		streamExists, err := bdb.Collection(ctx, streamKey.Collection()).Row(streamKey.Key()).Exists(ctx)
		if err != nil {
			return err
		}
		if !streamExists {
			return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
		}

		scan := bdb.CollectionForId(pointsCollection.Id()).Scan(ctx, syncbase.Prefix(pointsPrefix))
		defer scan.Cancel()
		for scan.Advance() {
			pointKey := new(sbmodel.KDataPoint)
			if parseErr := pointKey.Parse(scan.Key()); parseErr != nil {
				return fmt.Errorf("invalid DataPoint key for prefix %s: %v", pointsPrefix, parseErr)
			}
			var pointVal sbmodel.VDataPoint
			if valueErr := scan.Value(&pointVal); valueErr != nil {
				return fmt.Errorf("invalid DataPoint value for key %s: %v", scan.Key(), valueErr)
			}
			if cbErr := listCb(pointKey, pointVal); cbErr != nil {
				return cbErr
			}
		}
		return scan.Err()
	}

	return syncbase.RunInBatch(ctx, db, wire.BatchOptions{ReadOnly: true}, listInBatch)
}
// FollowStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each. It keeps listing new data
// points until ctx is cancelled.
//
// Data points are obtained from a watch on the stream's key prefix: existing
// points first, followed by new ones. Points are buffered per watch batch,
// sorted by timestamp, and passed to listCb once the batch ends. Deleted
// points are ignored. A key or value that fails to decode, or a non-nil error
// from listCb, stops the watch and is returned; otherwise the watch stream's
// own error (if any) is returned.
// TODO(ivanpi): Allow specifying time interval.
func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	dataCollection := db.Collection(ctx, sbmodel.KDataPoint{}.Collection())
	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
	// Watch for DataPoints, existing followed by new.
	// TODO(ivanpi): Check if stream exists.
	ws := db.Watch(ctx, nil, []wire.CollectionRowPattern{
		util.RowPrefixPattern(dataCollection.Id(), dataPrefix),
	})
	defer ws.Cancel()

	// trans accumulates the puts of the batch currently being received.
	trans := make([]*dataPoint, 0, 16)
	for ws.Advance() {
		c := ws.Change()
		if c.EntityType == syncbase.EntityRow {
			var elem dataPoint
			if err := elem.Key.Parse(c.Row); err != nil {
				return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
			}
			switch c.ChangeType {
			case syncbase.PutChange:
				if err := c.Value(&elem.Val); err != nil {
					return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
				}
				trans = append(trans, &elem)
			case syncbase.DeleteChange:
				// no-op
			}
		}
		// The batch boundary must be honoured for every change, not only for
		// puts: a batch whose final change is a delete or a non-row entity
		// would otherwise leave its buffered points unflushed, delaying them
		// and mixing them into the next batch's sort.
		if c.Continued {
			continue
		}
		// In the absence of overlapping syncgroups or partial ACLs, the order of
		// writes is preserved by sync. However, several writes may be grouped
		// into a single batch, and the order is not preserved within a batch.
		// Each batch is manually sorted before being emitted to the callback.
		sort.Stable(dataPointSort(trans))
		for _, dp := range trans {
			if err := listCb(&dp.Key, dp.Val); err != nil {
				return err
			}
		}
		trans = trans[:0]
	}
	return ws.Err()
}
// dataPoint pairs a parsed data point key with its decoded value. It is the
// element that FollowStreamData buffers while collecting a watch batch.
type dataPoint struct {
// Key is filled in by parsing the row name of a watch change.
Key sbmodel.KDataPoint
// Val is filled in by decoding the value of a put change.
Val sbmodel.VDataPoint
}
// dataPointSort adapts a slice of dataPoint pointers to sort.Interface so it
// can be ordered by the timestamp stored in each element's key.
type dataPointSort []*dataPoint

// Len returns how many data points the slice holds.
func (pts dataPointSort) Len() int {
	return len(pts)
}

// Swap exchanges the data points at indices i and j.
func (pts dataPointSort) Swap(i, j int) {
	held := pts[i]
	pts[i] = pts[j]
	pts[j] = held
}

// Less reports whether the key timestamp of element i is Before the key
// timestamp of element j.
func (pts dataPointSort) Less(i, j int) bool {
	left, right := pts[i], pts[j]
	return left.Key.Timestamp.Before(right.Key.Timestamp)
}