Merge "syncbase: syncQL: replace t with Type(), remove restrictions on k"
diff --git a/x/ref/services/syncbase/server/mojo_impl.go b/x/ref/services/syncbase/server/mojo_impl.go
new file mode 100644
index 0000000..e99195e
--- /dev/null
+++ b/x/ref/services/syncbase/server/mojo_impl.go
@@ -0,0 +1,198 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package server
+
+import (
+	wire "mojom/syncbase"
+)
+
+// mojoImpl is the receiver for the Mojo-facing Syncbase methods in this file.
+type mojoImpl struct {
+	s *service
+}
+
+// NewMojoImpl wraps s, which must hold a *service, in a mojoImpl.  The
+// parameter is typed interface{} because the service type is unexported; a
+// value of any other dynamic type makes the type assertion panic.
+func NewMojoImpl(s interface{}) *mojoImpl {
+	svc := s.(*service)
+	return &mojoImpl{s: svc}
+}
+
+// TODO(sadovsky): Implement all stubs below (each currently returns only zero
+// values). The plan is to translate Mojo requests into Vanadium requests by
+// constructing suitable ctx and call objects, then doing the same work as
+// server.Dispatcher and nosql.Dispatcher (perhaps sharing dispatcher code).
+
+// TODO(sadovsky): Implement security hack, where we derive a client blessing
+// from the Syncbase server's blessing.
+
+////////////////////////////////////////
+// Service
+
+func (m *mojoImpl) ServiceGetPermissions() (wire.Error, wire.Perms, string, error) {
+	return wire.Error{}, wire.Perms{}, "", nil
+}
+
+func (m *mojoImpl) ServiceSetPermissions(perms wire.Perms, version string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+////////////////////////////////////////
+// App
+
+func (m *mojoImpl) AppCreate(name string, perms wire.Perms) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) AppDelete(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) AppExists(name string) (wire.Error, bool, error) {
+	return wire.Error{}, false, nil
+}
+
+func (m *mojoImpl) AppGetPermissions(name string) (wire.Error, wire.Perms, string, error) {
+	return wire.Error{}, wire.Perms{}, "", nil
+}
+
+func (m *mojoImpl) AppSetPermissions(name string, perms wire.Perms, version string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+////////////////////////////////////////
+// nosql.Database
+
+func (m *mojoImpl) DbCreate(name string, perms wire.Perms) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbDelete(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbExists(name string) (wire.Error, bool, error) {
+	return wire.Error{}, false, nil
+}
+
+func (m *mojoImpl) DbExec(name string, stream wire.ExecStream_Request) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbBeginBatch(name string, bo *wire.BatchOptions) (wire.Error, string, error) {
+	return wire.Error{}, "", nil
+}
+
+func (m *mojoImpl) DbCommit(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbAbort(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbGetPermissions(name string) (wire.Error, wire.Perms, string, error) {
+	return wire.Error{}, wire.Perms{}, "", nil
+}
+
+func (m *mojoImpl) DbSetPermissions(name string, perms wire.Perms, version string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+////////////////////////////////////////
+// nosql.Database:SyncGroupManager
+
+func (m *mojoImpl) DbGetSyncGroupNames(name string) (wire.Error, []string, error) {
+	return wire.Error{}, nil, nil
+}
+
+func (m *mojoImpl) DbCreateSyncGroup(name, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbJoinSyncGroup(name, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbLeaveSyncGroup(name, sgName string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbDestroySyncGroup(name, sgName string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbEjectFromSyncGroup(name, sgName string, member string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbGetSyncGroupSpec(name, sgName string) (wire.Error, wire.SyncGroupSpec, string, error) {
+	return wire.Error{}, wire.SyncGroupSpec{}, "", nil
+}
+
+func (m *mojoImpl) DbSetSyncGroupSpec(name, sgName string, spec wire.SyncGroupSpec, version string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) DbGetSyncGroupMembers(name, sgName string) (wire.Error, map[string]wire.SyncGroupMemberInfo, error) {
+	return wire.Error{}, nil, nil
+}
+
+////////////////////////////////////////
+// nosql.Table
+
+func (m *mojoImpl) TableCreate(name string, perms wire.Perms) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) TableDelete(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) TableExists(name string) (wire.Error, bool, error) {
+	return wire.Error{}, false, nil
+}
+
+func (m *mojoImpl) TableDeleteRowRange(name string, start, limit []byte) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) TableScan(name string, start, limit []byte, stream wire.ScanStream_Request) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) TableGetPermissions(name, key string) (wire.Error, []wire.PrefixPerms, error) {
+	return wire.Error{}, nil, nil
+}
+
+func (m *mojoImpl) TableSetPermissions(name, prefix string, perms wire.Perms) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) TableDeletePermissions(name, prefix string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+////////////////////////////////////////
+// nosql.Row
+
+func (m *mojoImpl) RowExists(name string) (wire.Error, bool, error) {
+	return wire.Error{}, false, nil
+}
+
+func (m *mojoImpl) RowGet(name string) (wire.Error, []byte, error) {
+	return wire.Error{}, nil, nil
+}
+
+func (m *mojoImpl) RowPut(name string, value []byte) (wire.Error, error) {
+	return wire.Error{}, nil
+}
+
+func (m *mojoImpl) RowDelete(name string) (wire.Error, error) {
+	return wire.Error{}, nil
+}
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index c21264f..7fc1e6d 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -2,22 +2,18 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// syncbased is a syncbase daemon.
 package main
 
-// Example invocation:
-// syncbased --veyron.tcp.address="127.0.0.1:0" --name=syncbased
-
 import (
 	"flag"
 
 	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/v23"
+	"v.io/v23/context"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/lib/security/securityflag"
-	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/roaming"
 )
 
@@ -39,10 +35,10 @@
 	return perms
 }
 
-func main() {
-	ctx, shutdown := v23.Init()
-	defer shutdown()
-
+// Serve sets up the Syncbase server using ctx and returns the service object.
+// TODO(sadovsky): The interface{} return type is a quick hack to support Mojo;
+// the actual value is of type *server.service, which we don't want to export.
+func Serve(ctx *context.T) interface{} {
 	s, err := v23.NewServer(ctx)
 	if err != nil {
 		vlog.Fatal("v23.NewServer() failed: ", err)
@@ -81,6 +77,5 @@
 		vlog.Info("Mounted at: ", *name)
 	}
 
-	// Wait forever.
-	<-signals.ShutdownOnSignals(ctx)
+	return service
 }
diff --git a/x/ref/services/syncbase/syncbased/mojo_main.go b/x/ref/services/syncbase/syncbased/mojo_main.go
new file mode 100644
index 0000000..2be9286
--- /dev/null
+++ b/x/ref/services/syncbase/syncbased/mojo_main.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package main
+
+// To build:
+// cd $V23_ROOT/experimental/projects/ether
+// make gen/mojo/syncbased.mojo
+
+// TODO(sadovsky): Currently fails with error "flag provided but not defined:
+// -child-connection-id". Need to debug. Probably just need to peel off any Mojo
+// flags before having the Syncbase code do its flag parsing.
+
+import (
+	"log"
+
+	"golang.org/x/mobile/app"
+
+	"mojo/public/go/application"
+	"mojo/public/go/bindings"
+	"mojo/public/go/system"
+
+	"mojom/syncbase"
+
+	"v.io/syncbase/x/ref/services/syncbase/server"
+	"v.io/v23"
+)
+
+//#include "mojo/public/c/system/types.h"
+import "C"
+
+// delegate is the Mojo application delegate for syncbased.  It creates one
+// Syncbase stub per incoming request and records them so Quit can close them.
+type delegate struct {
+	service interface{} // actual type is *server.service
+	stubs   []*bindings.Stub
+}
+
+// Initialize has nothing to set up; the service is created in MojoMain.
+func (d *delegate) Initialize(ctx application.Context) {}
+
+// Create binds req to a new Mojo implementation backed by d.service and starts
+// a goroutine that serves requests on the resulting stub until an error occurs.
+func (d *delegate) Create(req syncbase.Syncbase_Request) {
+	stub := syncbase.NewSyncbaseStub(req, server.NewMojoImpl(d.service), bindings.GetAsyncWaiter())
+	d.stubs = append(d.stubs, stub)
+	go func() {
+		for {
+			err := stub.ServeRequest()
+			if err == nil {
+				continue
+			}
+			// Stay quiet only for a ConnectionError that reports Closed();
+			// every other failure is logged before the goroutine exits.
+			if connErr, ok := err.(*bindings.ConnectionError); !ok || !connErr.Closed() {
+				log.Println(err)
+			}
+			return
+		}
+	}()
+}
+
+// AcceptConnection offers a Syncbase service factory backed by d on conn.
+func (d *delegate) AcceptConnection(conn *application.Connection) {
+	conn.ProvideServices(&syncbase.Syncbase_ServiceFactory{d})
+}
+
+// Quit closes every stub recorded by Create.
+func (d *delegate) Quit() {
+	for i := range d.stubs {
+		d.stubs[i].Close()
+	}
+}
+
+// MojoMain is the cgo-exported entry point.  It initializes the v23 runtime,
+// obtains the service from Serve, and runs the Mojo application on handle.
+//
+//export MojoMain
+func MojoMain(handle C.MojoHandle) C.MojoResult {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	d := &delegate{service: Serve(ctx)}
+	application.Run(d, system.MojoHandle(handle))
+	return C.MOJO_RESULT_OK
+}
+
+// main calls app.Run with an empty Callbacks value.
+func main() {
+	app.Run(app.Callbacks{})
+}
diff --git a/x/ref/services/syncbase/syncbased/v23_main.go b/x/ref/services/syncbase/syncbased/v23_main.go
new file mode 100644
index 0000000..1651dc1
--- /dev/null
+++ b/x/ref/services/syncbase/syncbased/v23_main.go
@@ -0,0 +1,28 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !mojo
+
+// syncbased is a syncbase daemon.
+package main
+
+// Example invocation:
+// syncbased --veyron.tcp.address="127.0.0.1:0" --name=syncbased
+
+import (
+	"v.io/v23"
+	"v.io/x/ref/lib/signals"
+)
+
+func main() {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	// The service value returned by Serve is not used in this build.
+	_ = Serve(ctx)
+
+	// Block until the ShutdownOnSignals channel delivers; the deferred
+	// shutdown then runs as main returns.
+	<-signals.ShutdownOnSignals(ctx)
+}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index e70d0d2..8f5f60c 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -71,10 +71,29 @@
 			return
 
 		case <-ticker.C:
-			s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
-				s.processDatabase(ctx, appName, dbName, st)
-				return false
-			})
+			s.processStoreUpdates(ctx)
+		}
+	}
+}
+
+// processStoreUpdates drains pending updates from every database.  Each pass
+// asks every database for at most one batch update, so no single database can
+// monopolize the watcher; passes repeat until one completes in which no
+// database had a batch to process.
+func (s *syncService) processStoreUpdates(ctx *context.T) {
+	for {
+		seen, busy := 0, 0
+		s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+			seen++
+			if s.processDatabase(ctx, appName, dbName, st) {
+				busy++
+			}
+			return false
+		})
+
+		vlog.VI(2).Infof("sync: processStoreUpdates: %d/%d databases had updates", busy, seen)
+		if busy == 0 {
+			return
 		}
 	}
 }
@@ -82,8 +101,9 @@
 // processDatabase fetches from the given database at most one new batch update
 // (transaction) and processes it.  The one-batch limit prevents one database
 // from starving others.  A batch is stored as a contiguous set of log records
-// ending with one record having the "continued" flag set to false.
-func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+// ending with one record having the "continued" flag set to false.  The call
+// returns true if a new batch update was processed.
+func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) bool {
 	s.thLock.Lock()
 	defer s.thLock.Unlock()
 
@@ -94,7 +114,7 @@
 	if err != nil {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
 			vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
-			return
+			return false
 		}
 		resMark = ""
 	}
@@ -105,7 +125,9 @@
 	// Get a batch of watch log entries, if any, after this resume marker.
 	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
 		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
+		return true
 	}
+	return false
 }
 
 // processWatchLogBatch parses the given batch of watch log records, updates the