TBR: syncbase-mojo: Implement table.Scan
Re-do of go/vcl/14746 that got accidentally stomped by broken v23 cl
tool.
Change-Id: I1d145ad4952536ddf5f9e70aab3f0266e68fb72e
diff --git a/x/ref/services/syncbase/server/mojo_impl.go b/x/ref/services/syncbase/server/mojo_impl.go
index 52e2d90..973bf7d 100644
--- a/x/ref/services/syncbase/server/mojo_impl.go
+++ b/x/ref/services/syncbase/server/mojo_impl.go
@@ -16,6 +16,8 @@
"fmt"
"strings"
+ "mojo/public/go/bindings"
+
mojom "mojom/syncbase"
wire "v.io/syncbase/v23/services/syncbase"
nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -296,7 +298,7 @@
return toMojoError(err), exists, nil
}
-func (m *mojoImpl) DbExec(name string, query string, stream mojom.ExecStream_Request) (mojom.Error, error) {
+func (m *mojoImpl) DbExec(name string, query string, stream mojom.ExecStream_Pointer) (mojom.Error, error) {
return mojom.Error{}, nil
}
@@ -423,7 +425,57 @@
return mojom.Error{}, nil
}
-func (m *mojoImpl) TableScan(name string, start, limit []byte, stream mojom.ScanStream_Request) (mojom.Error, error) {
+type scanStreamImpl struct {
+ ctx *context.T
+ proxy *mojom.ScanStream_Proxy
+}
+
+func (s *scanStreamImpl) Send(item interface{}) error {
+ kv, ok := item.(nosqlwire.KeyValue)
+ if !ok {
+ return verror.NewErrInternal(s.ctx)
+ }
+
+ return s.proxy.OnKeyValue(mojom.KeyValue{
+ Key: kv.Key,
+ Value: kv.Value,
+ })
+}
+
+func (s *scanStreamImpl) Recv(_ interface{}) error {
+ // This should never be called.
+ return verror.NewErrInternal(s.ctx)
+}
+
+var _ rpc.Stream = (*scanStreamImpl)(nil)
+
+// TODO(nlacasse): Provide some way for the client to cancel the stream.
+func (m *mojoImpl) TableScan(name string, start, limit []byte, ptr mojom.ScanStream_Pointer) (mojom.Error, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.TableDesc, "Scan"))
+ stub, err := m.getTable(ctx, call, name)
+ if err != nil {
+ return toMojoError(err), nil
+ }
+
+ proxy := mojom.NewScanStreamProxy(ptr, bindings.GetAsyncWaiter())
+
+ tableScanServerCallStub := &nosqlwire.TableScanServerCallStub{struct {
+ rpc.Stream
+ rpc.ServerCall
+ }{
+ &scanStreamImpl{
+ ctx: ctx,
+ proxy: proxy,
+ },
+ call,
+ }}
+
+ err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
+
+ // NOTE(nlacasse): Since we are already streaming, we send any error back
+ // to the client on the stream. The TableScan function itself should not
+ // return an error at this point.
+ proxy.OnDone(toMojoError(err))
return mojom.Error{}, nil
}
diff --git a/x/ref/services/syncbase/syncbased/mojo_main.go b/x/ref/services/syncbase/syncbased/mojo_main.go
index b478d4e..74a247f 100644
--- a/x/ref/services/syncbase/syncbased/mojo_main.go
+++ b/x/ref/services/syncbase/syncbased/mojo_main.go
@@ -8,7 +8,7 @@
// To build:
// cd $V23_ROOT/experimental/projects/ether
-// make gen/mojo/syncbased.mojo
+// make build
import (
"log"