Merge "services/device: make device manager work on Brillo"
diff --git a/cmd/sb51/internal/demodb/db.go b/cmd/sb51/internal/demodb/db.go
index 88b84f5..0429b4f 100644
--- a/cmd/sb51/internal/demodb/db.go
+++ b/cmd/sb51/internal/demodb/db.go
@@ -107,6 +107,37 @@
 			},
 		},
 	},
+	table{
+		name: "Students",
+		rows: []kv{
+			kv{
+				"1",
+				vdl.ValueOf(Student{Name: "John Smith", TestTime: t("Jul 22 12:34:56 PDT 2015"), Score: ActOrSatScoreActScore{Value: 36}}),
+			},
+			kv{
+				"2",
+				vdl.ValueOf(Student{Name: "Mary Jones", TestTime: t("Sep 4 01:23:45 PDT 2015"), Score: ActOrSatScoreSatScore{Value: 1200}}),
+			},
+		},
+	},
+	table{
+		name: "AnythingGoes",
+		rows: []kv{
+			kv{
+				"bar",
+				vdl.ValueOf(AnythingGoes{NameOfType: "Student", Anything: vdl.ValueOf(Student{Name: "John Smith", Score: ActOrSatScoreActScore{Value: 36}})}),
+			},
+			kv{
+				"baz",
+				vdl.ValueOf(AnythingGoes{NameOfType: "Customer", Anything: vdl.ValueOf(Customer{"Bat Masterson", 2, true, AddressInfo{"777 Any St.", "Collins", "IA", "50055"}, CreditReport{Agency: CreditAgencyTransUnion, Report: AgencyReportTransUnionReport{TransUnionCreditReport{80}}}})}),
+			},
+		},
+	},
+}
+
+func t(timeStr string) time.Time {
+	t, _ := time.Parse("Jan 2 15:04:05 MST 2006", timeStr)
+	return t
 }
 
 // Creates demo tables in the provided database. Tables are destroyed and
diff --git a/cmd/sb51/internal/demodb/db_objects.vdl b/cmd/sb51/internal/demodb/db_objects.vdl
index cbf119a..eb32dcc 100644
--- a/cmd/sb51/internal/demodb/db_objects.vdl
+++ b/cmd/sb51/internal/demodb/db_objects.vdl
@@ -113,3 +113,19 @@
 	Maybe ?Times
 	Rec   map[Array2String]Recursive
 }
+
+type ActOrSatScore union {
+	ActScore uint16
+	SatScore uint16
+}
+
+type Student struct {
+	Name     string
+	TestTime time.Time
+	Score    ActOrSatScore
+}
+
+type AnythingGoes struct {
+	NameOfType string
+	Anything   any
+}
diff --git a/cmd/sb51/internal/demodb/db_objects.vdl.go b/cmd/sb51/internal/demodb/db_objects.vdl.go
index ebdda02..6298c30 100644
--- a/cmd/sb51/internal/demodb/db_objects.vdl.go
+++ b/cmd/sb51/internal/demodb/db_objects.vdl.go
@@ -360,6 +360,64 @@
 }) {
 }
 
+type (
+	// ActOrSatScore represents any single field of the ActOrSatScore union type.
+	ActOrSatScore interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the ActOrSatScore union type.
+		__VDLReflect(__ActOrSatScoreReflect)
+	}
+	// ActOrSatScoreActScore represents field ActScore of the ActOrSatScore union type.
+	ActOrSatScoreActScore struct{ Value uint16 }
+	// ActOrSatScoreSatScore represents field SatScore of the ActOrSatScore union type.
+	ActOrSatScoreSatScore struct{ Value uint16 }
+	// __ActOrSatScoreReflect describes the ActOrSatScore union type.
+	__ActOrSatScoreReflect struct {
+		Name  string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.ActOrSatScore"`
+		Type  ActOrSatScore
+		Union struct {
+			ActScore ActOrSatScoreActScore
+			SatScore ActOrSatScoreSatScore
+		}
+	}
+)
+
+func (x ActOrSatScoreActScore) Index() int                          { return 0 }
+func (x ActOrSatScoreActScore) Interface() interface{}              { return x.Value }
+func (x ActOrSatScoreActScore) Name() string                        { return "ActScore" }
+func (x ActOrSatScoreActScore) __VDLReflect(__ActOrSatScoreReflect) {}
+
+func (x ActOrSatScoreSatScore) Index() int                          { return 1 }
+func (x ActOrSatScoreSatScore) Interface() interface{}              { return x.Value }
+func (x ActOrSatScoreSatScore) Name() string                        { return "SatScore" }
+func (x ActOrSatScoreSatScore) __VDLReflect(__ActOrSatScoreReflect) {}
+
+type Student struct {
+	Name     string
+	TestTime time.Time
+	Score    ActOrSatScore
+}
+
+func (Student) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.Student"`
+}) {
+}
+
+type AnythingGoes struct {
+	NameOfType string
+	Anything   *vdl.Value
+}
+
+func (AnythingGoes) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.AnythingGoes"`
+}) {
+}
+
 func init() {
 	vdl.Register((*AddressInfo)(nil))
 	vdl.Register((*CreditAgency)(nil))
@@ -380,4 +438,7 @@
 	vdl.Register((*Composite)(nil))
 	vdl.Register((*Times)(nil))
 	vdl.Register((*Recursive)(nil))
+	vdl.Register((*ActOrSatScore)(nil))
+	vdl.Register((*Student)(nil))
+	vdl.Register((*AnythingGoes)(nil))
 }
diff --git a/cmd/sb51/shell.go b/cmd/sb51/shell.go
index fb28d3c..e97419a 100644
--- a/cmd/sb51/shell.go
+++ b/cmd/sb51/shell.go
@@ -82,7 +82,7 @@
 	}
 
 	if flagMakeDemoTables {
-		if err := makeDemoDB(ctx, d); err != nil {
+		if err := makeDemoDB(ctx, env.Stdout, d); err != nil {
 			return err
 		}
 	}
@@ -122,7 +122,7 @@
 					err = dumpDB(ctx, env.Stdout, d)
 				case "make-demo":
 					// TODO(jkline): add an "Are you sure prompt" to give the user a 2nd chance.
-					err = makeDemoDB(ctx, d)
+					err = makeDemoDB(ctx, env.Stdout, d)
 				case "select":
 					err = queryExec(ctx, env.Stdout, d, q)
 				default:
@@ -191,8 +191,10 @@
 	return nil
 }
 
-func makeDemoDB(ctx *context.T, d nosql.Database) error {
-	if err := demodb.PopulateDemoDB(ctx, d); err != nil {
+func makeDemoDB(ctx *context.T, w io.Writer, d nosql.Database) error {
+	if err := demodb.PopulateDemoDB(ctx, d); err == nil {
+		fmt.Fprintln(w, "Demo tables created and populated.")
+	} else {
 		return fmt.Errorf("failed making demo tables: %v", err)
 	}
 	return nil
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 185ff12..80099f4 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -73,7 +73,8 @@
 				// Periodically kill closed connections.
 				m.cache.KillConnections(ctx, 0)
 			case e := <-events:
-				if e.Status.Closed && !e.Status.LocalLameDuck || e.Status.LocalLameDuck {
+				if e.Status.Closed && !e.Status.LocalLameDuck ||
+					!e.Status.Closed && e.Status.LocalLameDuck {
 					m.ls.activeConns.Done()
 				}
 			}
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index ecbbc0c..9c1b654 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -409,7 +409,15 @@
 		Continued:    vc.Continued,
 	}
 
-	return s.proxy.OnChange(mc)
+	// Block until client acks receiving and processing the previous change before sending more.
+	// This effectively creates a flow control mechanism.
+	// TODO(aghassemi): Consider sending more than a single change event before
+	// blocking until receiving ack.
+	ack, err := s.proxy.OnChange(mc)
+	if !ack && err != nil {
+		err = verror.NewErrInternal(s.ctx)
+	}
+	return err
 }
 
 func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
@@ -646,10 +654,20 @@
 		return verror.NewErrInternal(s.ctx)
 	}
 
-	return s.proxy.OnKeyValue(mojom.KeyValue{
+	// Block until client acks receiving and processing the previous change before sending more.
+	// This effectively creates a flow control mechanism.
+	// TODO(aghassemi): Consider sending more than a single KeyValue before
+	// blocking until receiving ack.
+	ack, err := s.proxy.OnKeyValue(mojom.KeyValue{
 		Key:   kv.Key,
 		Value: kv.Value,
 	})
+
+	if !ack && err != nil {
+		err = verror.NewErrInternal(s.ctx)
+	}
+
+	return err
 }
 
 func (s *scanStreamImpl) Recv(_ interface{}) error {