x/ref/services/syncbase: Implementing watch API for Mojo/Dart

MultiPart: 2/3
Change-Id: Iac08c0a7ffc0196e0bd1fa48f9d7958a591d03c5
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 556350c..eae0967 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -25,6 +25,8 @@
 	"v.io/v23/services/permissions"
 	wire "v.io/v23/services/syncbase"
 	nosqlwire "v.io/v23/services/syncbase/nosql"
+	watchwire "v.io/v23/services/watch"
+	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
 	"v.io/v23/vtrace"
 )
@@ -345,6 +347,83 @@
 	return toMojoError(err), nil
 }
 
+type watchGlobStreamImpl struct {
+	ctx   *context.T
+	proxy *mojom.WatchGlobStream_Proxy
+}
+
+func (s *watchGlobStreamImpl) Send(item interface{}) error {
+	c, ok := item.(watchwire.Change)
+	if !ok {
+		return verror.NewErrInternal(s.ctx)
+	}
+
+	vc := nosql.ToWatchChange(c)
+	mc := mojom.WatchChange{
+		TableName:    vc.Table,
+		RowName:      vc.Row,
+		ChangeType:   uint32(vc.ChangeType),
+		ValueBytes:   vc.ValueBytes,
+		ResumeMarker: vc.ResumeMarker,
+		FromSync:     vc.FromSync,
+		Continued:    vc.Continued,
+	}
+
+	return s.proxy.OnChange(mc)
+}
+
+func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
+	// This should never be called.
+	return verror.NewErrInternal(s.ctx)
+}
+
+var _ rpc.Stream = (*watchGlobStreamImpl)(nil)
+
+func (m *mojoImpl) DbWatchGlob(name string, mReq mojom.GlobRequest, ptr mojom.WatchGlobStream_Pointer) (mojom.Error, error) {
+	ctx, call := m.newCtxCall(name, methodDesc(watchwire.GlobWatcherDesc, "WatchGlob"))
+	stub, err := m.getDb(ctx, call, name)
+	if err != nil {
+		return toMojoError(err), nil
+	}
+
+	var vReq = watchwire.GlobRequest{
+		Pattern:      mReq.Pattern,
+		ResumeMarker: watchwire.ResumeMarker(mReq.ResumeMarker),
+	}
+	proxy := mojom.NewWatchGlobStreamProxy(ptr, bindings.GetAsyncWaiter())
+
+	watchGlobServerCallStub := &watchwire.GlobWatcherWatchGlobServerCallStub{struct {
+		rpc.Stream
+		rpc.ServerCall
+	}{
+		&watchGlobStreamImpl{
+			ctx:   ctx,
+			proxy: proxy,
+		},
+		call,
+	}}
+
+	go func() {
+		var err = stub.WatchGlob(ctx, watchGlobServerCallStub, vReq)
+		// NOTE(nlacasse): Since we are already streaming, we send any error back
+		// to the client on the stream.  The WatchGlob function itself should not
+		// return an error at this point.
+		proxy.OnReturn(toMojoError(err))
+	}()
+
+	return mojom.Error{}, nil
+}
+
+func (m *mojoImpl) DbGetResumeMarker(name string) (mojom.Error, []byte, error) {
+	ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.DatabaseWatcherDesc, "GetResumeMarker"))
+	stub, err := m.getDb(ctx, call, name)
+	if err != nil {
+		return toMojoError(err), nil, nil
+	}
+	marker, err := stub.GetResumeMarker(ctx, call)
+	return toMojoError(err), marker, nil
+}
+
 ////////////////////////////////////////
 // nosql.Database:SyncGroupManager
 
@@ -476,12 +555,15 @@
 		call,
 	}}
 
-	err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
+	go func() {
+		var err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
 
-	// NOTE(nlacasse): Since we are already streaming, we send any error back
-	// to the client on the stream.  The TableScan function itself should not
-	// return an error at this point.
-	proxy.OnDone(toMojoError(err))
+		// NOTE(nlacasse): Since we are already streaming, we send any error back
+		// to the client on the stream.  The TableScan function itself should not
+		// return an error at this point.
+		proxy.OnReturn(toMojoError(err))
+	}()
+
 	return mojom.Error{}, nil
 }