Merge "runtime/internal/flow/conn: Fix flow control accounting when flows close.."
diff --git a/services/device/dmrun/backend/backend_vcloud.go b/services/device/dmrun/backend/backend_vcloud.go
index 8d1f50a..36ccec9 100644
--- a/services/device/dmrun/backend/backend_vcloud.go
+++ b/services/device/dmrun/backend/backend_vcloud.go
@@ -66,8 +66,7 @@
 	if g.isDeleted {
 		return fmt.Errorf("trying to delete a deleted VcloudVM")
 	}
-
-	cmd := exec.Command(g.vcloud, "node", "delete", g.projectArg, g.zoneArg, g.name)
+	cmd := g.generateDeleteCmd(false)
 	output, err := cmd.CombinedOutput()
 	if err != nil {
 		err = fmt.Errorf("failed deleting GCE instance (%s): %v\nOutput:%v\n", strings.Join(cmd.Args, " "), err, string(output))
@@ -79,6 +78,10 @@
 	return err
 }
 
+func (g *VcloudVM) generateDeleteCmd(forUser bool) *exec.Cmd {
+	return exec.Command(g.vcloudCmd(forUser), "node", "delete", g.projectArg, g.zoneArg, g.name)
+}
+
 func (g *VcloudVM) Name() string {
 	return g.name
 }
@@ -92,7 +95,7 @@
 		return nil, fmt.Errorf("RunCommand called on deleted VcloudVM")
 	}
 
-	cmd := g.generateExecCmdForRun(args...)
+	cmd := g.generateExecCmdForRun(false, args...)
 	output, err := cmd.CombinedOutput()
 	if err != nil {
 		err = fmt.Errorf("failed running [%s] on VM %s", strings.Join(args, " "), g.name)
@@ -104,17 +107,29 @@
 	if g.isDeleted {
 		return ""
 	}
-	cmd := g.generateExecCmdForRun(args...)
+	return cmdLine(g.generateExecCmdForRun(true, args...))
+}
 
+func cmdLine(cmd *exec.Cmd) string {
 	result := cmd.Path
-	for i := 1; i < len(cmd.Args); i++ {
-		result = fmt.Sprintf("%s %q", result, cmd.Args[i])
+	for _, arg := range cmd.Args[1:] {
+		result = fmt.Sprintf("%s %q", result, arg)
 	}
 	return result
 }
 
-func (g *VcloudVM) generateExecCmdForRun(args ...string) *exec.Cmd {
-	return exec.Command(g.vcloud, append([]string{"sh", g.projectArg, g.name, "cd", g.workingDir, "&&"}, args...)...)
+func (g *VcloudVM) vcloudCmd(forUser bool) string {
+	if forUser {
+		// We can't return the vcloud binary that we ran for the steps
+		// above, as that one is deleted after use. For now, we assume
+		// the user will have a vcloud binary on his path to use.
+		return "vcloud"
+	}
+	return g.vcloud
+}
+
+func (g *VcloudVM) generateExecCmdForRun(forUser bool, args ...string) *exec.Cmd {
+	return exec.Command(g.vcloudCmd(forUser), append([]string{"sh", g.projectArg, g.name, "cd", g.workingDir, "&&"}, args...)...)
 }
 
 func (g *VcloudVM) CopyFile(infile, destination string) error {
@@ -134,8 +149,5 @@
 	if g.isDeleted {
 		return ""
 	}
-
-	// We can't return the vcloud binary that we ran for the steps above, as that one is deleted
-	// after use. For now, we assume the user will have a vcloud binary on his path to use.
-	return strings.Join([]string{"vcloud", "node", "delete", g.projectArg, g.zoneArg, g.name}, " ")
+	return cmdLine(g.generateDeleteCmd(true))
 }
diff --git a/services/device/dmrun/dmrun.go b/services/device/dmrun/dmrun.go
index eaf77ac..5d011f2 100644
--- a/services/device/dmrun/dmrun.go
+++ b/services/device/dmrun/dmrun.go
@@ -122,24 +122,21 @@
 	fmt.Printf("Working dir: %s\n", workDir)
 }
 
-// buildV23Binary builds the specified binary and returns the path to the
-// executable.
-func buildV23Binary(pkg string) string {
-	fmt.Println("Building", pkg)
-	dest := filepath.Join(workDir, path.Base(pkg))
-	cmd := exec.Command("jiri", "go", "build", "-x", "-o", dest, pkg)
+// buildV23Binaries builds the specified binaries and returns the paths to the
+// executables.
+func buildV23Binaries(pkg ...string) []string {
+	fmt.Print("Building ", pkg, " ...")
+	defer fmt.Println("Done.")
+	args := append([]string{"go", "install", "-x"}, pkg...)
+	cmd := exec.Command("jiri", args...)
+	cmd.Env = append(os.Environ(), "GOBIN="+workDir)
 	output, err := cmd.CombinedOutput()
 	dieIfErr(err, "Running build command %v failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	return dest
-}
-
-// buildDMBinaries builds the binaries required for a device manager
-// installation and returns the paths to the executables.
-func buildDMBinaries() (ret []string) {
-	for _, b := range dmBins {
-		ret = append(ret, buildV23Binary(b))
+	dest := make([]string, len(pkg))
+	for i, p := range pkg {
+		dest[i] = filepath.Join(workDir, path.Base(p))
 	}
-	return
+	return dest
 }
 
 // createArchive creates a zip archive from the given files.
@@ -169,6 +166,7 @@
 
 // setupInstance creates a new VM instance and returns its name and IP address.
 func setupInstance(vmOptions interface{}) (backend.CloudVM, string, string) {
+	fmt.Println("Setting up instance ...")
 	currUser, err := user.Current()
 	dieIfErr(err, "Couldn't obtain current user")
 	instanceName := fmt.Sprintf("%s-%s", currUser.Username, time.Now().UTC().Format("20060102-150405"))
@@ -343,7 +341,7 @@
 	switch {
 	default:
 		// Vcloud backend
-		vcloud = buildV23Binary(vcloudBin)
+		vcloud = buildV23Binaries(vcloudBin)[0]
 		vmOpts = backend.VcloudVMOptions{VcloudBinary: vcloud}
 
 	case sshTarget != "":
@@ -386,9 +384,9 @@
 	}
 	defer os.RemoveAll(workDir)
 	vmOpts := handleFlags()
-	device = buildV23Binary(deviceBin)
-	dmBins := buildDMBinaries()
-	archive := createArchive(append(dmBins, getPath(devicexRepo, devicex)))
+	dmBinaries := buildV23Binaries(append([]string{deviceBin}, dmBins[:]...)...)
+	device, dmBinaries = dmBinaries[0], dmBinaries[1:]
+	archive := createArchive(append(dmBinaries, getPath(devicexRepo, devicex)))
 
 	vm, vmInstanceName, vmInstanceIP := setupInstance(vmOpts)
 	cleanupOnDeath = func() {
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index b1d4990..6412297 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -15,8 +15,8 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/syncbase/nosql/query"
-	"v.io/v23/syncbase/nosql/query/exec"
+	"v.io/v23/query/engine"
+	ds "v.io/v23/query/engine/datasource"
 	pubutil "v.io/v23/syncbase/util"
 	"v.io/v23/vdl"
 	"v.io/v23/verror"
@@ -265,7 +265,7 @@
 			req:  d,
 			sntx: sntx,
 		}
-		headers, rs, err := exec.Exec(db, q)
+		headers, rs, err := engine.Exec(db, q)
 		if err != nil {
 			return err
 		}
@@ -439,7 +439,7 @@
 ////////////////////////////////////////
 // query interface implementations
 
-// queryDb implements query.Database.
+// queryDb implements ds.Database.
 type queryDb struct {
 	ctx  *context.T
 	call wire.DatabaseExecServerCall
@@ -451,7 +451,7 @@
 	return db.ctx
 }
 
-func (db *queryDb) GetTable(name string) (query.Table, error) {
+func (db *queryDb) GetTable(name string) (ds.Table, error) {
 	tDb := &tableDb{
 		qdb: db,
 		req: &tableReq{
@@ -466,13 +466,13 @@
 	return tDb, nil
 }
 
-// tableDb implements query.Table.
+// tableDb implements ds.Table.
 type tableDb struct {
 	qdb *queryDb
 	req *tableReq
 }
 
-func (t *tableDb) Scan(keyRanges query.KeyRanges) (query.KeyValueStream, error) {
+func (t *tableDb) Scan(keyRanges ds.KeyRanges) (ds.KeyValueStream, error) {
 	streams := []store.Stream{}
 	for _, keyRange := range keyRanges {
 		// TODO(jkline): For now, acquire all of the streams at once to minimize the
@@ -489,7 +489,7 @@
 	}, nil
 }
 
-// kvs implements query.KeyValueStream.
+// kvs implements ds.KeyValueStream.
 type kvs struct {
 	t         *tableDb
 	curr      int