blob: ebf55ea773034e9e9fc9e1fb2de3d50aad96e50c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Client methods for listing stream data.
package client
import (
"fmt"
"v.io/v23/context"
nosql_wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/syncbase/nosql"
"v.io/v23/verror"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
)
// ListCallback is the function type invoked by ListStreamData for each data
// point it reads. It receives a pointer to the parsed data point key and the
// decoded data point value. Returning a non-nil error stops the listing, and
// that error is returned unchanged by ListStreamData.
type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
// ListStreamData invokes listCb once for every data point stored for the
// stream identified by streamKey. Data points are visited in the order the
// prefix scan yields them, which is intended to be chronological.
//
// The stream existence check and the scan both run inside a single read-only
// batch, which is aborted when the function returns. An ErrNoExist verror is
// returned if the stream definition row is missing. Listing stops at the first
// key that fails to parse, the first value that fails to decode, or the first
// non-nil error returned by listCb; callback errors are returned unchanged.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
	// Scan prefix built from the device ID, the stream ID and an empty
	// trailing component.
	prefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")

	batch, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
	if err != nil {
		return err
	}
	// Nothing is written, so the batch is always discarded on the way out.
	defer batch.Abort(ctx)

	// Refuse to list data for a stream that has no definition row.
	streamDef := batch.Table(streamKey.Table()).Row(streamKey.Key())
	found, err := streamDef.Exists(ctx)
	if err != nil {
		return err
	}
	if !found {
		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
	}

	points := batch.Table(sbmodel.KDataPoint{}.Table()).Scan(ctx, nosql.Prefix(prefix))
	defer points.Cancel()
	for points.Advance() {
		rawKey := points.Key()

		// A fresh key is allocated per row because the callback receives a pointer.
		pointKey := new(sbmodel.KDataPoint)
		if parseErr := pointKey.Parse(rawKey); parseErr != nil {
			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", prefix, parseErr)
		}

		var pointVal sbmodel.VDataPoint
		if decodeErr := points.Value(&pointVal); decodeErr != nil {
			return fmt.Errorf("invalid DataPoint value for key %s: %v", rawKey, decodeErr)
		}

		if cbErr := listCb(pointKey, pointVal); cbErr != nil {
			return cbErr
		}
	}
	// Report any error that ended the scan early.
	return points.Err()
}