Merge "syncbase: sb51: add tables to demoDB (needed for syncQL tutorial)"
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index ecbbc0c..9c1b654 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -409,7 +409,15 @@
Continued: vc.Continued,
}
- return s.proxy.OnChange(mc)
+ // Block until client acks receiving and processing the previous change before sending more.
+ // This effectively creates a flow control mechanism.
+ // TODO(aghassemi): Consider sending more than a single change event before
+ // blocking until receiving ack.
+ ack, err := s.proxy.OnChange(mc)
+ if !ack && err != nil {
+ err = verror.NewErrInternal(s.ctx)
+ }
+ return err
}
func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
@@ -646,10 +654,20 @@
return verror.NewErrInternal(s.ctx)
}
- return s.proxy.OnKeyValue(mojom.KeyValue{
+ // Block until client acks receiving and processing the previous KeyValue before sending more.
+ // This effectively creates a flow control mechanism.
+ // TODO(aghassemi): Consider sending more than a single KeyValue before
+ // blocking until receiving ack.
+ ack, err := s.proxy.OnKeyValue(mojom.KeyValue{
Key: kv.Key,
Value: kv.Value,
})
+
+ if !ack && err != nil {
+ err = verror.NewErrInternal(s.ctx)
+ }
+
+ return err
}
func (s *scanStreamImpl) Recv(_ interface{}) error {