sensorlog: Step #1 of simplified Syncbase API transition
MultiPart: 7/7
Change-Id: I3256f063549610b3c50b12d851ae7855a565bf04
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
index cfb4326..040c0be 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -10,8 +10,8 @@
"fmt"
"v.io/v23/context"
- nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/syncbase/nosql"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel"
@@ -21,7 +21,7 @@
// AddDevice joins the syncgroup of the measuring device identified by devId,
// expected to be published at sgPublishSb, and makes it available for stream
// configuration.
-func AddDevice(ctx *context.T, db nosql.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
+func AddDevice(ctx *context.T, db syncbase.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
if err := keyutil.ValidateId(devId); err != nil {
return nil, fmt.Errorf("invalid devId: %v", err)
}
@@ -46,7 +46,7 @@
// TODO(ivanpi): Lack of atomicity here can result in a duplicate join, not
// really relevant until syncgroup Leave is implemented.
- sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+ sgMemberInfo := wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
if _, err := db.Syncgroup(devSgName).Join(ctx, sgMemberInfo); err != nil {
return nil, err
}
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 0a0b4d9..23339e3 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -11,8 +11,8 @@
"sort"
"v.io/v23/context"
- nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/syncbase/nosql"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
@@ -23,11 +23,11 @@
// ListStreamData lists all data points for the stream specified by streamKey
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
-func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
tableName := sbmodel.KDataPoint{}.Table()
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
- bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ bdb, err := db.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
if err != nil {
return err
}
@@ -40,7 +40,7 @@
return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
}
- sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+ sstr := bdb.Table(tableName).Scan(ctx, syncbase.Prefix(dataPrefix))
defer sstr.Cancel()
for sstr.Advance() {
@@ -63,7 +63,7 @@
// in chronological order, calling listCb for each. It keeps listing new data
// points until ctx is cancelled.
// TODO(ivanpi): Allow specifying time interval.
-func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
tableName := sbmodel.KDataPoint{}.Table()
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
@@ -82,7 +82,7 @@
return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
}
switch c.ChangeType {
- case nosql.PutChange:
+ case syncbase.PutChange:
if err := c.Value(&elem.Val); err != nil {
return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
}
@@ -100,7 +100,7 @@
}
trans = trans[:0]
}
- case nosql.DeleteChange:
+ case syncbase.DeleteChange:
// no-op
}
}
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index 4462097..e4ebc4a 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -12,8 +12,8 @@
"time"
"v.io/v23/context"
- nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/syncbase/nosql"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel"
@@ -24,7 +24,7 @@
// The target device must have been configured. streamId must be unique for
// the target device.
// TODO(ivanpi): Make start time configurable.
-func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
+func CreateStream(ctx *context.T, db syncbase.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
if err := keyutil.ValidateId(streamId); err != nil {
return nil, fmt.Errorf("invalid streamId: %v", err)
}
@@ -51,7 +51,7 @@
Enabled: true,
}
- if err := nosql.RunInBatch(ctx, db, nosql_wire.BatchOptions{}, func(db nosql.BatchDatabase) error {
+ if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, func(db syncbase.BatchDatabase) error {
devRow := db.Table(devKey.Table()).Row(devKey.Key())
stmRow := db.Table(stmKey.Table()).Row(stmKey.Key())
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 72278b0..930ad50 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -12,8 +12,8 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/security/access"
- nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/syncbase/nosql"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel"
@@ -28,7 +28,7 @@
// retried with different parameters for the same devId, otherwise behaviour
// is unspecified.
// TODO(ivanpi): Remove Resolve permissions when v.io/i/1110 is fixed.
-func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
+func InitSyncgroup(ctx *context.T, db syncbase.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
if err := keyutil.ValidateId(devId); err != nil {
return fmt.Errorf("invalid devId: %v", err)
}
@@ -51,11 +51,11 @@
sbutil.AddPermsForPattern(&sgAcl, admin, access.Read, access.Admin)
// Maps all syncgroup prefixes to ACLs.
- prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
+ prefixSpec := make(map[wire.TableRow]access.Permissions)
// StreamDef : <devId>
// Admin client has full permissions, measured drops to readonly.
- prefixStreamDef := nosql_wire.TableRow{
+ prefixStreamDef := wire.TableRow{
TableName: sbmodel.KStreamDef{}.Table(),
Row: devId,
}
@@ -66,7 +66,7 @@
// DataPoint : <devId>
// Admin client has full permissions, measured drops to read/write.
- prefixDataPoint := nosql_wire.TableRow{
+ prefixDataPoint := wire.TableRow{
TableName: sbmodel.KDataPoint{}.Table(),
Row: devId,
}
@@ -75,24 +75,24 @@
sbutil.AddPermsForPattern(&aclDataPoint, admin, access.Resolve, access.Read, access.Write, access.Admin)
prefixSpec[prefixDataPoint] = aclDataPoint
- var prefixes []nosql_wire.TableRow
+ var prefixes []wire.TableRow
// Apply prefix ACLs to all syncgroup prefixes.
for prefix, prefixAcl := range prefixSpec {
// Ignore ErrNoAccess, assume we already dropped permissions.
- err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, nosql.Prefix(prefix.Row), prefixAcl)
+ err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, syncbase.Prefix(prefix.Row), prefixAcl)
if err != nil && verror.ErrorID(err) != verror.ErrNoAccess.ID {
return err
}
prefixes = append(prefixes, prefix)
}
- sgSpec := nosql_wire.SyncgroupSpec{
+ sgSpec := wire.SyncgroupSpec{
Description: fmt.Sprintf("measured-main-%s", devId),
Perms: sgAcl,
Prefixes: prefixes,
MountTables: sgMountTables,
}
- sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+ sgMemberInfo := wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
return db.Syncgroup(sgName).Create(ctx, sgSpec, sgMemberInfo)
}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 3518d91..08e8415 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -10,7 +10,7 @@
"fmt"
"v.io/v23/context"
- "v.io/v23/syncbase/nosql"
+ "v.io/v23/syncbase"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
@@ -26,7 +26,7 @@
// an error, measured exits with an error.
// WatchForStreams is synchronous and runs until the context is cancelled or
// an error is encountered.
-func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error {
tableName := sbmodel.KStreamDef{}.Table()
watchPrefix := keyutil.Join(devId, "")
@@ -44,7 +44,7 @@
return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
}
switch c.ChangeType {
- case nosql.PutChange:
+ case syncbase.PutChange:
val := &sbmodel.VStreamDef{}
if err := c.Value(val); err != nil {
return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
@@ -52,7 +52,7 @@
if err := register(key, val); err != nil {
return err
}
- case nosql.DeleteChange:
+ case syncbase.DeleteChange:
return fmt.Errorf("StreamDef delete is not supported")
}
}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
index e0e315e..973381f 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -12,7 +12,7 @@
"time"
"v.io/v23/context"
- "v.io/v23/syncbase/nosql"
+ "v.io/v23/syncbase"
_ "v.io/x/ref/runtime/factories/generic"
sbtu "v.io/x/ref/services/syncbase/testutil"
tu "v.io/x/ref/test/testutil"
@@ -115,7 +115,7 @@
}
}
-func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+func runWatchForStreams(ctx *context.T, db syncbase.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
ctx, stop = context.WithCancel(ctx)
wait = util.AsyncRun(func() error {
return measure.WatchForStreams(ctx, db, devId, register)
@@ -123,7 +123,7 @@
return stop, wait
}
-func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
val := sbmodel.VStreamDef{Desc: desc}
if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index ff824b1..0f8fd43 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -11,7 +11,6 @@
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/syncbase"
- "v.io/v23/syncbase/nosql"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel"
@@ -19,7 +18,7 @@
// CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
// instance, creating it if missing, initializing specified tables.
-func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (nosql.Database, error) {
+func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (syncbase.Database, error) {
aclFull := access.Permissions{}
// Allow everyone to resolve to allow joining syncgroups.
AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
@@ -39,7 +38,7 @@
}
// TODO(ivanpi): Add schema version.
- db := app.NoSQLDatabase(config.DBName, nil)
+ db := app.Database(config.DBName, nil)
if err := createIfMissing(ctx, db, aclFull); err != nil {
return nil, err
}