TBR: syncbase-mojo: Implement table.Scan

Re-do of go/vcl/14746 that got accidentally stomped by broken v23 cl
tool.

Change-Id: I1d145ad4952536ddf5f9e70aab3f0266e68fb72e
diff --git a/x/ref/services/syncbase/server/mojo_impl.go b/x/ref/services/syncbase/server/mojo_impl.go
index 52e2d90..973bf7d 100644
--- a/x/ref/services/syncbase/server/mojo_impl.go
+++ b/x/ref/services/syncbase/server/mojo_impl.go
@@ -16,6 +16,8 @@
 	"fmt"
 	"strings"
 
+	"mojo/public/go/bindings"
+
 	mojom "mojom/syncbase"
 	wire "v.io/syncbase/v23/services/syncbase"
 	nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -296,7 +298,7 @@
 	return toMojoError(err), exists, nil
 }
 
-func (m *mojoImpl) DbExec(name string, query string, stream mojom.ExecStream_Request) (mojom.Error, error) {
+func (m *mojoImpl) DbExec(name string, query string, stream mojom.ExecStream_Pointer) (mojom.Error, error) {
 	return mojom.Error{}, nil
 }
 
@@ -423,7 +425,57 @@
 	return mojom.Error{}, nil
 }
 
-func (m *mojoImpl) TableScan(name string, start, limit []byte, stream mojom.ScanStream_Request) (mojom.Error, error) {
+type scanStreamImpl struct {
+	ctx   *context.T
+	proxy *mojom.ScanStream_Proxy
+}
+
+func (s *scanStreamImpl) Send(item interface{}) error {
+	kv, ok := item.(nosqlwire.KeyValue)
+	if !ok {
+		return verror.NewErrInternal(s.ctx)
+	}
+
+	return s.proxy.OnKeyValue(mojom.KeyValue{
+		Key:   kv.Key,
+		Value: kv.Value,
+	})
+}
+
+func (s *scanStreamImpl) Recv(_ interface{}) error {
+	// This should never be called.
+	return verror.NewErrInternal(s.ctx)
+}
+
+var _ rpc.Stream = (*scanStreamImpl)(nil)
+
+// TODO(nlacasse): Provide some way for the client to cancel the stream.
+func (m *mojoImpl) TableScan(name string, start, limit []byte, ptr mojom.ScanStream_Pointer) (mojom.Error, error) {
+	ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.TableDesc, "Scan"))
+	stub, err := m.getTable(ctx, call, name)
+	if err != nil {
+		return toMojoError(err), nil
+	}
+
+	proxy := mojom.NewScanStreamProxy(ptr, bindings.GetAsyncWaiter())
+
+	tableScanServerCallStub := &nosqlwire.TableScanServerCallStub{struct {
+		rpc.Stream
+		rpc.ServerCall
+	}{
+		&scanStreamImpl{
+			ctx:   ctx,
+			proxy: proxy,
+		},
+		call,
+	}}
+
+	err = stub.Scan(ctx, tableScanServerCallStub, NoSchema, start, limit)
+
+	// NOTE(nlacasse): Since we are already streaming, we send any error back
+	// to the client on the stream.  The TableScan function itself should not
+	// return an error at this point.
+	proxy.OnDone(toMojoError(err))
 	return mojom.Error{}, nil
 }
 
diff --git a/x/ref/services/syncbase/syncbased/mojo_main.go b/x/ref/services/syncbase/syncbased/mojo_main.go
index b478d4e..74a247f 100644
--- a/x/ref/services/syncbase/syncbased/mojo_main.go
+++ b/x/ref/services/syncbase/syncbased/mojo_main.go
@@ -8,7 +8,7 @@
 
 // To build:
 // cd $V23_ROOT/experimental/projects/ether
-// make gen/mojo/syncbased.mojo
+// make build
 
 import (
 	"log"