Merge "syncbase: sb51: add tables to demoDB (needed for syncQL tutorial)"
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index ecbbc0c..9c1b654 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -409,7 +409,15 @@
 		Continued:    vc.Continued,
 	}
 
-	return s.proxy.OnChange(mc)
+	// Block until client acks receiving and processing the previous change before sending more.
+	// This effectively creates a flow control mechanism.
+	// TODO(aghassemi): Consider sending more than a single change event before
+	// blocking until receiving ack.
+	ack, err := s.proxy.OnChange(mc)
+	if !ack && err != nil {
+		err = verror.NewErrInternal(s.ctx)
+	}
+	return err
 }
 
 func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
@@ -646,10 +654,20 @@
 		return verror.NewErrInternal(s.ctx)
 	}
 
-	return s.proxy.OnKeyValue(mojom.KeyValue{
+	// Block until client acks receiving and processing the previous KeyValue before sending more.
+	// This effectively creates a flow control mechanism.
+	// TODO(aghassemi): Consider sending more than a single KeyValue before
+	// blocking until receiving ack.
+	ack, err := s.proxy.OnKeyValue(mojom.KeyValue{
 		Key:   kv.Key,
 		Value: kv.Value,
 	})
+
+	if !ack && err != nil {
+		err = verror.NewErrInternal(s.ctx)
+	}
+
+	return err
 }
 
 func (s *scanStreamImpl) Recv(_ interface{}) error {