sensorlog: Step #1 of simplified Syncbase API transition

MultiPart: 7/7

Change-Id: I3256f063549610b3c50b12d851ae7855a565bf04
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
index cfb4326..040c0be 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -10,8 +10,8 @@
 	"fmt"
 
 	"v.io/v23/context"
-	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/syncbase/nosql"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/config"
 	"v.io/x/sensorlog/internal/sbmodel"
@@ -21,7 +21,7 @@
 // AddDevice joins the syncgroup of the measuring device identified by devId,
 // expected to be published at sgPublishSb, and makes it available for stream
 // configuration.
-func AddDevice(ctx *context.T, db nosql.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
+func AddDevice(ctx *context.T, db syncbase.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
 	if err := keyutil.ValidateId(devId); err != nil {
 		return nil, fmt.Errorf("invalid devId: %v", err)
 	}
@@ -46,7 +46,7 @@
 
 	// TODO(ivanpi): Lack of atomicity here can result in a duplicate join, not
 	// really relevant until syncgroup Leave is implemented.
-	sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+	sgMemberInfo := wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
 	if _, err := db.Syncgroup(devSgName).Join(ctx, sgMemberInfo); err != nil {
 		return nil, err
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 0a0b4d9..23339e3 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -11,8 +11,8 @@
 	"sort"
 
 	"v.io/v23/context"
-	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/syncbase/nosql"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/sbmodel"
 	"v.io/x/sensorlog/internal/sbmodel/keyutil"
@@ -23,11 +23,11 @@
 // ListStreamData lists all data points for the stream specified by streamKey
 // in chronological order, calling listCb for each.
 // TODO(ivanpi): Allow specifying time interval.
-func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
 	tableName := sbmodel.KDataPoint{}.Table()
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
-	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+	bdb, err := db.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
 	if err != nil {
 		return err
 	}
@@ -40,7 +40,7 @@
 		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
 	}
 
-	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+	sstr := bdb.Table(tableName).Scan(ctx, syncbase.Prefix(dataPrefix))
 	defer sstr.Cancel()
 
 	for sstr.Advance() {
@@ -63,7 +63,7 @@
 // in chronological order, calling listCb for each. It keeps listing new data
 // points until ctx is cancelled.
 // TODO(ivanpi): Allow specifying time interval.
-func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
 	tableName := sbmodel.KDataPoint{}.Table()
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
@@ -82,7 +82,7 @@
 			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
 		}
 		switch c.ChangeType {
-		case nosql.PutChange:
+		case syncbase.PutChange:
 			if err := c.Value(&elem.Val); err != nil {
 				return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
 			}
@@ -100,7 +100,7 @@
 				}
 				trans = trans[:0]
 			}
-		case nosql.DeleteChange:
+		case syncbase.DeleteChange:
 			// no-op
 		}
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index 4462097..e4ebc4a 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -12,8 +12,8 @@
 	"time"
 
 	"v.io/v23/context"
-	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/syncbase/nosql"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/config"
 	"v.io/x/sensorlog/internal/sbmodel"
@@ -24,7 +24,7 @@
 // The target device must have been configured. streamId must be unique for
 // the target device.
 // TODO(ivanpi): Make start time configurable.
-func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
+func CreateStream(ctx *context.T, db syncbase.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
 	if err := keyutil.ValidateId(streamId); err != nil {
 		return nil, fmt.Errorf("invalid streamId: %v", err)
 	}
@@ -51,7 +51,7 @@
 		Enabled: true,
 	}
 
-	if err := nosql.RunInBatch(ctx, db, nosql_wire.BatchOptions{}, func(db nosql.BatchDatabase) error {
+	if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, func(db syncbase.BatchDatabase) error {
 		devRow := db.Table(devKey.Table()).Row(devKey.Key())
 		stmRow := db.Table(stmKey.Table()).Row(stmKey.Key())
 
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 72278b0..930ad50 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -12,8 +12,8 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
-	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/syncbase/nosql"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/config"
 	"v.io/x/sensorlog/internal/sbmodel"
@@ -28,7 +28,7 @@
 // retried with different parameters for the same devId, otherwise behaviour
 // is unspecified.
 // TODO(ivanpi): Remove Resolve permissions when v.io/i/1110 is fixed.
-func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
+func InitSyncgroup(ctx *context.T, db syncbase.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
 	if err := keyutil.ValidateId(devId); err != nil {
 		return fmt.Errorf("invalid devId: %v", err)
 	}
@@ -51,11 +51,11 @@
 	sbutil.AddPermsForPattern(&sgAcl, admin, access.Read, access.Admin)
 
 	// Maps all syncgroup prefixes to ACLs.
-	prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
+	prefixSpec := make(map[wire.TableRow]access.Permissions)
 
 	// StreamDef : <devId>
 	// Admin client has full permissions, measured drops to readonly.
-	prefixStreamDef := nosql_wire.TableRow{
+	prefixStreamDef := wire.TableRow{
 		TableName: sbmodel.KStreamDef{}.Table(),
 		Row:       devId,
 	}
@@ -66,7 +66,7 @@
 
 	// DataPoint : <devId>
 	// Admin client has full permissions, measured drops to read/write.
-	prefixDataPoint := nosql_wire.TableRow{
+	prefixDataPoint := wire.TableRow{
 		TableName: sbmodel.KDataPoint{}.Table(),
 		Row:       devId,
 	}
@@ -75,24 +75,24 @@
 	sbutil.AddPermsForPattern(&aclDataPoint, admin, access.Resolve, access.Read, access.Write, access.Admin)
 	prefixSpec[prefixDataPoint] = aclDataPoint
 
-	var prefixes []nosql_wire.TableRow
+	var prefixes []wire.TableRow
 	// Apply prefix ACLs to all syncgroup prefixes.
 	for prefix, prefixAcl := range prefixSpec {
 		// Ignore ErrNoAccess, assume we already dropped permissions.
-		err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, nosql.Prefix(prefix.Row), prefixAcl)
+		err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, syncbase.Prefix(prefix.Row), prefixAcl)
 		if err != nil && verror.ErrorID(err) != verror.ErrNoAccess.ID {
 			return err
 		}
 		prefixes = append(prefixes, prefix)
 	}
 
-	sgSpec := nosql_wire.SyncgroupSpec{
+	sgSpec := wire.SyncgroupSpec{
 		Description: fmt.Sprintf("measured-main-%s", devId),
 		Perms:       sgAcl,
 		Prefixes:    prefixes,
 		MountTables: sgMountTables,
 	}
-	sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+	sgMemberInfo := wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
 
 	return db.Syncgroup(sgName).Create(ctx, sgSpec, sgMemberInfo)
 }
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 3518d91..08e8415 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -10,7 +10,7 @@
 	"fmt"
 
 	"v.io/v23/context"
-	"v.io/v23/syncbase/nosql"
+	"v.io/v23/syncbase"
 	"v.io/x/sensorlog/internal/sbmodel"
 	"v.io/x/sensorlog/internal/sbmodel/keyutil"
 )
@@ -26,7 +26,7 @@
 // an error, measured exits with an error.
 // WatchForStreams is synchronous and runs until the context is cancelled or
 // an error is encountered.
-func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error {
 	tableName := sbmodel.KStreamDef{}.Table()
 	watchPrefix := keyutil.Join(devId, "")
 
@@ -44,7 +44,7 @@
 			return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
 		}
 		switch c.ChangeType {
-		case nosql.PutChange:
+		case syncbase.PutChange:
 			val := &sbmodel.VStreamDef{}
 			if err := c.Value(val); err != nil {
 				return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
@@ -52,7 +52,7 @@
 			if err := register(key, val); err != nil {
 				return err
 			}
-		case nosql.DeleteChange:
+		case syncbase.DeleteChange:
 			return fmt.Errorf("StreamDef delete is not supported")
 		}
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
index e0e315e..973381f 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -12,7 +12,7 @@
 	"time"
 
 	"v.io/v23/context"
-	"v.io/v23/syncbase/nosql"
+	"v.io/v23/syncbase"
 	_ "v.io/x/ref/runtime/factories/generic"
 	sbtu "v.io/x/ref/services/syncbase/testutil"
 	tu "v.io/x/ref/test/testutil"
@@ -115,7 +115,7 @@
 	}
 }
 
-func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+func runWatchForStreams(ctx *context.T, db syncbase.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
 	ctx, stop = context.WithCancel(ctx)
 	wait = util.AsyncRun(func() error {
 		return measure.WatchForStreams(ctx, db, devId, register)
@@ -123,7 +123,7 @@
 	return stop, wait
 }
 
-func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
 	key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
 	val := sbmodel.VStreamDef{Desc: desc}
 	if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index ff824b1..0f8fd43 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -11,7 +11,6 @@
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/v23/syncbase"
-	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/config"
 	"v.io/x/sensorlog/internal/sbmodel"
@@ -19,7 +18,7 @@
 
 // CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
 // instance, creating it if missing, initializing specified tables.
-func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (nosql.Database, error) {
+func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (syncbase.Database, error) {
 	aclFull := access.Permissions{}
 	// Allow everyone to resolve to allow joining syncgroups.
 	AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
@@ -39,7 +38,7 @@
 	}
 
 	// TODO(ivanpi): Add schema version.
-	db := app.NoSQLDatabase(config.DBName, nil)
+	db := app.Database(config.DBName, nil)
 	if err := createIfMissing(ctx, db, aclFull); err != nil {
 		return nil, err
 	}