x/ref/services/syncbase: Implementing watch API for Mojo/Dart
MultiPart: 2/3
Change-Id: Iac08c0a7ffc0196e0bd1fa48f9d7958a591d03c5
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 556350c..eae0967 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -25,6 +25,8 @@
"v.io/v23/services/permissions"
wire "v.io/v23/services/syncbase"
nosqlwire "v.io/v23/services/syncbase/nosql"
+ watchwire "v.io/v23/services/watch"
+ "v.io/v23/syncbase/nosql"
"v.io/v23/verror"
"v.io/v23/vtrace"
)
@@ -345,6 +347,83 @@
return toMojoError(err), nil
}
+type watchGlobStreamImpl struct {
+ ctx *context.T
+ proxy *mojom.WatchGlobStream_Proxy
+}
+
+func (s *watchGlobStreamImpl) Send(item interface{}) error {
+ c, ok := item.(watchwire.Change)
+ if !ok {
+ return verror.NewErrInternal(s.ctx)
+ }
+
+ vc := nosql.ToWatchChange(c)
+ mc := mojom.WatchChange{
+ TableName: vc.Table,
+ RowName: vc.Row,
+ ChangeType: uint32(vc.ChangeType),
+ ValueBytes: vc.ValueBytes,
+ ResumeMarker: vc.ResumeMarker,
+ FromSync: vc.FromSync,
+ Continued: vc.Continued,
+ }
+
+ return s.proxy.OnChange(mc)
+}
+
+func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
+ // This should never be called.
+ return verror.NewErrInternal(s.ctx)
+}
+
+var _ rpc.Stream = (*watchGlobStreamImpl)(nil)
+
+func (m *mojoImpl) DbWatchGlob(name string, mReq mojom.GlobRequest, ptr mojom.WatchGlobStream_Pointer) (mojom.Error, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(watchwire.GlobWatcherDesc, "WatchGlob"))
+ stub, err := m.getDb(ctx, call, name)
+ if err != nil {
+ return toMojoError(err), nil
+ }
+
+ var vReq = watchwire.GlobRequest{
+ Pattern: mReq.Pattern,
+ ResumeMarker: watchwire.ResumeMarker(mReq.ResumeMarker),
+ }
+ proxy := mojom.NewWatchGlobStreamProxy(ptr, bindings.GetAsyncWaiter())
+
+ watchGlobServerCallStub := &watchwire.GlobWatcherWatchGlobServerCallStub{struct {
+ rpc.Stream
+ rpc.ServerCall
+ }{
+ &watchGlobStreamImpl{
+ ctx: ctx,
+ proxy: proxy,
+ },
+ call,
+ }}
+
+ go func() {
+ var err = stub.WatchGlob(ctx, watchGlobServerCallStub, vReq)
+ // NOTE(nlacasse): Since we are already streaming, we send any error back
+ // to the client on the stream. The WatchGlob function itself should not
+ // return an error at this point.
+ proxy.OnReturn(toMojoError(err))
+ }()
+
+ return mojom.Error{}, nil
+}
+
+func (m *mojoImpl) DbGetResumeMarker(name string) (mojom.Error, []byte, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.DatabaseWatcherDesc, "GetResumeMarker"))
+ stub, err := m.getDb(ctx, call, name)
+ if err != nil {
+ return toMojoError(err), nil, nil
+ }
+ marker, err := stub.GetResumeMarker(ctx, call)
+ return toMojoError(err), marker, nil
+}
+
////////////////////////////////////////
// nosql.Database:SyncGroupManager
@@ -476,12 +555,15 @@
call,
}}
- err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
+ go func() {
+ var err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
- // NOTE(nlacasse): Since we are already streaming, we send any error back
- // to the client on the stream. The TableScan function itself should not
- // return an error at this point.
- proxy.OnDone(toMojoError(err))
+ // NOTE(nlacasse): Since we are already streaming, we send any error back
+ // to the client on the stream. The TableScan function itself should not
+ // return an error at this point.
+ proxy.OnReturn(toMojoError(err))
+ }()
+
return mojom.Error{}, nil
}