Merge "syncbase: syqnQL: query_parser/doc.go: doc.go syntax update"
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index b56275b..080a1f4 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -243,13 +243,13 @@
 
 	// GetBlobSize returns the count of bytes written as part of the blob
 	// (committed or uncommitted).
-	GetBlobSize(br BlobRef) (uint64 | error) {access.Read}
+	GetBlobSize(br BlobRef) (int64 | error) {access.Read}
 
 	// DeleteBlob locally deletes the blob (committed or uncommitted).
 	DeleteBlob(br BlobRef) error {access.Write}
 
 	// GetBlob returns the byte stream from a committed blob starting at offset.
-	GetBlob(br BlobRef, offset uint64) stream<_, []byte> error {access.Read}
+	GetBlob(br BlobRef, offset int64) stream<_, []byte> error {access.Read}
 
 	// FetchBlob initiates fetching a blob if not locally found. priority
 	// controls the network priority of the blob. Higher priority blobs are
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 447bd7d..6a63970 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -727,11 +727,11 @@
 	CommitBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
 	// GetBlobSize returns the count of bytes written as part of the blob
 	// (committed or uncommitted).
-	GetBlobSize(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) (uint64, error)
+	GetBlobSize(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) (int64, error)
 	// DeleteBlob locally deletes the blob (committed or uncommitted).
 	DeleteBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
 	// GetBlob returns the byte stream from a committed blob starting at offset.
-	GetBlob(ctx *context.T, br BlobRef, offset uint64, opts ...rpc.CallOpt) (BlobManagerGetBlobClientCall, error)
+	GetBlob(ctx *context.T, br BlobRef, offset int64, opts ...rpc.CallOpt) (BlobManagerGetBlobClientCall, error)
 	// FetchBlob initiates fetching a blob if not locally found. priority
 	// controls the network priority of the blob. Higher priority blobs are
 	// fetched before the lower priority ones. However an ongoing blob
@@ -781,7 +781,7 @@
 	return
 }
 
-func (c implBlobManagerClientStub) GetBlobSize(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (o0 uint64, err error) {
+func (c implBlobManagerClientStub) GetBlobSize(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (o0 int64, err error) {
 	err = v23.GetClient(ctx).Call(ctx, c.name, "GetBlobSize", []interface{}{i0}, []interface{}{&o0}, opts...)
 	return
 }
@@ -791,7 +791,7 @@
 	return
 }
 
-func (c implBlobManagerClientStub) GetBlob(ctx *context.T, i0 BlobRef, i1 uint64, opts ...rpc.CallOpt) (ocall BlobManagerGetBlobClientCall, err error) {
+func (c implBlobManagerClientStub) GetBlob(ctx *context.T, i0 BlobRef, i1 int64, opts ...rpc.CallOpt) (ocall BlobManagerGetBlobClientCall, err error) {
 	var call rpc.ClientCall
 	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetBlob", []interface{}{i0, i1}, opts...); err != nil {
 		return
@@ -1041,11 +1041,11 @@
 	CommitBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
 	// GetBlobSize returns the count of bytes written as part of the blob
 	// (committed or uncommitted).
-	GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (uint64, error)
+	GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (int64, error)
 	// DeleteBlob locally deletes the blob (committed or uncommitted).
 	DeleteBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
 	// GetBlob returns the byte stream from a committed blob starting at offset.
-	GetBlob(ctx *context.T, call BlobManagerGetBlobServerCall, br BlobRef, offset uint64) error
+	GetBlob(ctx *context.T, call BlobManagerGetBlobServerCall, br BlobRef, offset int64) error
 	// FetchBlob initiates fetching a blob if not locally found. priority
 	// controls the network priority of the blob. Higher priority blobs are
 	// fetched before the lower priority ones. However an ongoing blob
@@ -1078,11 +1078,11 @@
 	CommitBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
 	// GetBlobSize returns the count of bytes written as part of the blob
 	// (committed or uncommitted).
-	GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (uint64, error)
+	GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (int64, error)
 	// DeleteBlob locally deletes the blob (committed or uncommitted).
 	DeleteBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
 	// GetBlob returns the byte stream from a committed blob starting at offset.
-	GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, br BlobRef, offset uint64) error
+	GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, br BlobRef, offset int64) error
 	// FetchBlob initiates fetching a blob if not locally found. priority
 	// controls the network priority of the blob. Higher priority blobs are
 	// fetched before the lower priority ones. However an ongoing blob
@@ -1139,7 +1139,7 @@
 	return s.impl.CommitBlob(ctx, call, i0)
 }
 
-func (s implBlobManagerServerStub) GetBlobSize(ctx *context.T, call rpc.ServerCall, i0 BlobRef) (uint64, error) {
+func (s implBlobManagerServerStub) GetBlobSize(ctx *context.T, call rpc.ServerCall, i0 BlobRef) (int64, error) {
 	return s.impl.GetBlobSize(ctx, call, i0)
 }
 
@@ -1147,7 +1147,7 @@
 	return s.impl.DeleteBlob(ctx, call, i0)
 }
 
-func (s implBlobManagerServerStub) GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, i0 BlobRef, i1 uint64) error {
+func (s implBlobManagerServerStub) GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, i0 BlobRef, i1 int64) error {
 	return s.impl.GetBlob(ctx, call, i0, i1)
 }
 
@@ -1215,7 +1215,7 @@
 				{"br", ``}, // BlobRef
 			},
 			OutArgs: []rpc.ArgDesc{
-				{"", ``}, // uint64
+				{"", ``}, // int64
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
@@ -1232,7 +1232,7 @@
 			Doc:  "// GetBlob returns the byte stream from a committed blob starting at offset.",
 			InArgs: []rpc.ArgDesc{
 				{"br", ``},     // BlobRef
-				{"offset", ``}, // uint64
+				{"offset", ``}, // int64
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index adb0058..3d8054c 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -677,7 +677,7 @@
 	numTbs, _ := strconv.Atoi(args[3])
 	prefixes := args[4:]
 
-	time.Sleep(15 * time.Second)
+	time.Sleep(20 * time.Second)
 
 	for i := 0; i < numApps; i++ {
 		appName := fmt.Sprintf("a%d", i)
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index cb8bca8..d3386a0 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -194,6 +194,11 @@
 	return fscabs, err
 }
 
+// Close() closes the FsCaBlobStore. {
+func (fscabs *FsCaBlobStore) Close() error {
+	return fscabs.cm.Close()
+}
+
 // Root() returns the name of the root directory where *fscabs is stored.
 func (fscabs *FsCaBlobStore) Root() string {
 	return fscabs.rootName
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
index 16af4cf..ba08726 100644
--- a/x/ref/services/syncbase/localblobstore/model.go
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -135,6 +135,9 @@
 
 	// Root() returns the name of the root directory where the BlobStore is stored.
 	Root() string
+
+	// Close() closes the BlobStore.
+	Close() error
 }
 
 // A Location describes chunk's location within a blob.  It is returned by
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 8862f57..5dd9cad 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -248,6 +248,9 @@
 	if err := d.St().Close(); err != nil {
 		return err
 	}
+	if err := d.BlobSt().Close(); err != nil {
+		return err
+	}
 	if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDb(dbName)); err != nil {
 		return err
 	}
diff --git a/x/ref/services/syncbase/server/interfaces/database.go b/x/ref/services/syncbase/server/interfaces/database.go
index bf518a4..32cca17 100644
--- a/x/ref/services/syncbase/server/interfaces/database.go
+++ b/x/ref/services/syncbase/server/interfaces/database.go
@@ -5,6 +5,7 @@
 package interfaces
 
 import (
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
@@ -16,6 +17,9 @@
 	// St returns the storage engine instance for this database.
 	St() store.Store
 
+	// BlobSt returns the blob storage engine instance for this database.
+	BlobSt() localblobstore.BlobStore
+
 	// App returns the app handle for this database.
 	App() App
 
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index bfe419a..14dc948 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -15,6 +15,8 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/nosql/query_db"
 	"v.io/syncbase/v23/syncbase/nosql/query_exec"
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -42,6 +44,9 @@
 	// and do not actually open the store in NewDatabase.
 	st store.Store // stores all data for a single database
 
+	// Local blob store associated with this database.
+	bst localblobstore.BlobStore
+
 	// Active snapshots and transactions corresponding to client batches.
 	// TODO(sadovsky): Add timeouts and GC.
 	mu  sync.Mutex // protects the fields below
@@ -87,11 +92,17 @@
 	if err != nil {
 		return nil, err
 	}
+	// Open a co-located blob store, adjacent to the structured store.
+	bst, err := fs_cablobstore.Create(ctx, path.Join(opts.RootDir, "blobs"))
+	if err != nil {
+		return nil, err
+	}
 	return &database{
 		name:   name,
 		a:      a,
 		exists: true,
 		st:     st,
+		bst:    bst,
 		sns:    make(map[uint64]store.Snapshot),
 		txs:    make(map[uint64]store.Transaction),
 	}, nil
@@ -365,6 +376,13 @@
 	return d.st
 }
 
+func (d *database) BlobSt() localblobstore.BlobStore {
+	if !d.exists {
+		vlog.Fatalf("database %q does not exist", d.name)
+	}
+	return d.bst
+}
+
 func (d *database) App() interfaces.App {
 	return d.a
 }
diff --git a/x/ref/services/syncbase/server/nosql/database_bm.go b/x/ref/services/syncbase/server/nosql/database_bm.go
index 65a1413..e646c42 100644
--- a/x/ref/services/syncbase/server/nosql/database_bm.go
+++ b/x/ref/services/syncbase/server/nosql/database_bm.go
@@ -38,7 +38,7 @@
 	return sd.CommitBlob(ctx, call, br)
 }
 
-func (d *databaseReq) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
+func (d *databaseReq) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
 	if d.batchId != nil {
 		return 0, wire.NewErrBoundToBatch(ctx)
 	}
@@ -54,7 +54,7 @@
 	return sd.DeleteBlob(ctx, call, br)
 }
 
-func (d *databaseReq) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+func (d *databaseReq) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
 	if d.batchId != nil {
 		return wire.NewErrBoundToBatch(ctx)
 	}
diff --git a/x/ref/services/syncbase/vsync/blob.go b/x/ref/services/syncbase/vsync/blob.go
index 7c98be0..56a4eed 100644
--- a/x/ref/services/syncbase/vsync/blob.go
+++ b/x/ref/services/syncbase/vsync/blob.go
@@ -5,39 +5,118 @@
 package vsync
 
 import (
+	"io"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
+)
+
+const (
+	chunkSize = 8 * 1024
 )
 
 ////////////////////////////////////////////////////////////
 // RPCs for managing blobs between Syncbase and its clients.
 
 func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
-	return wire.BlobRef(""), verror.NewErrNotImplemented(ctx)
+	bst := sd.db.BlobSt()
+	writer, err := bst.NewBlobWriter(ctx, "")
+	if err != nil {
+		return wire.NullBlobRef, err
+	}
+	defer writer.CloseWithoutFinalize()
+
+	name := writer.Name()
+	vlog.VI(2).Infof("sync: CreateBlob: blob ref %s", name)
+	return wire.BlobRef(name), nil
 }
 
 func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
-	return verror.NewErrNotImplemented(ctx)
+	bst := sd.db.BlobSt()
+	writer, err := bst.NewBlobWriter(ctx, string(br))
+	if err != nil {
+		return err
+	}
+	defer writer.CloseWithoutFinalize()
+
+	stream := call.RecvStream()
+	for stream.Advance() {
+		item := localblobstore.BlockOrFile{Block: stream.Value()}
+		if err = writer.AppendFragment(item); err != nil {
+			return err
+		}
+	}
+	return stream.Err()
 }
 
 func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
-	return verror.NewErrNotImplemented(ctx)
+	bst := sd.db.BlobSt()
+	writer, err := bst.NewBlobWriter(ctx, string(br))
+	if err != nil {
+		return err
+	}
+	return writer.Close()
 }
 
-func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
-	return 0, verror.NewErrNotImplemented(ctx)
+func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
+	bst := sd.db.BlobSt()
+	reader, err := bst.NewBlobReader(ctx, string(br))
+	if err != nil {
+		return 0, err
+	}
+	defer reader.Close()
+
+	return reader.Size(), nil
 }
 
 func (sd *syncDatabase) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
+	// First get the blob locally if available.
+	err := sd.getLocalBlob(ctx, call, br, offset)
+	if err == nil {
+		return nil
+	}
+
+	// Locate the blob on a remote blob store and fetch it.
 	return verror.NewErrNotImplemented(ctx)
 }
 
+// getLocalBlob looks for a blob in the local store and, if found, reads the
+// blob and sends it to the client.  If the blob is found, it starts reading it
+// from the given offset and sends its bytes into the client stream.
+func (sd *syncDatabase) getLocalBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
+	bst := sd.db.BlobSt()
+	reader, err := bst.NewBlobReader(ctx, string(br))
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+
+	buf := make([]byte, chunkSize)
+	stream := call.SendStream()
+
+	for {
+		nbytes, err := reader.ReadAt(buf, offset)
+		if err != nil && err != io.EOF {
+			return err
+		}
+		if err == io.EOF || nbytes <= 0 {
+			break
+		}
+		offset += int64(nbytes)
+		stream.Send(buf[:nbytes])
+	}
+
+	return nil
+}
+
 func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
 	return verror.NewErrNotImplemented(ctx)
 }
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index 9dd4e3e..b145887 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -9,10 +9,13 @@
 import (
 	"fmt"
 	"os"
+	"path"
 	"testing"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
+	"v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -28,8 +31,9 @@
 // It is used to access a mock application.
 type mockService struct {
 	engine   string
-	path     string
+	dir      string
 	st       store.Store
+	bst      localblobstore.BlobStore
 	sync     *syncService
 	shutdown func()
 }
@@ -43,7 +47,7 @@
 }
 
 func (s *mockService) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
-	return &mockApp{st: s.st}, nil
+	return &mockApp{st: s.st, bst: s.bst}, nil
 }
 
 func (s *mockService) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -52,11 +56,12 @@
 
 // mockApp emulates a Syncbase App.  It is used to access a mock database.
 type mockApp struct {
-	st store.Store
+	st  store.Store
+	bst localblobstore.BlobStore
 }
 
 func (a *mockApp) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
-	return &mockDatabase{st: a.st}, nil
+	return &mockDatabase{st: a.st, bst: a.bst}, nil
 }
 
 func (a *mockApp) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -89,13 +94,18 @@
 
 // mockDatabase emulates a Syncbase Database.  It is used to test sync functionality.
 type mockDatabase struct {
-	st store.Store
+	st  store.Store
+	bst localblobstore.BlobStore
 }
 
 func (d *mockDatabase) St() store.Store {
 	return d.st
 }
 
+func (d *mockDatabase) BlobSt() localblobstore.BlobStore {
+	return d.bst
+}
+
 func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
 	return verror.NewErrNotImplemented(ctx)
 }
@@ -120,24 +130,30 @@
 func createService(t *testing.T) *mockService {
 	ctx, shutdown := test.V23Init()
 	engine := "leveldb"
-	path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
+	opts := util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false}
+	dir := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
 
-	st, err := util.OpenStore(engine, path, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
+	st, err := util.OpenStore(engine, path.Join(dir, engine), opts)
 	if err != nil {
-		t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
+		t.Fatalf("cannot create store %s (%s): %v", engine, dir, err)
 	}
 	st, err = watchable.Wrap(st, &watchable.Options{
 		ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
 	})
+	bst, err := fs_cablobstore.Create(ctx, path.Join(dir, "blobs"))
+	if err != nil {
+		t.Fatalf("cannot create blob store (%s): %v", dir, err)
+	}
 
 	s := &mockService{
 		st:       st,
+		bst:      bst,
 		engine:   engine,
-		path:     path,
+		dir:      dir,
 		shutdown: shutdown,
 	}
 	if s.sync, err = New(ctx, nil, s, nil); err != nil {
-		util.DestroyStore(engine, path)
+		util.DestroyStore(engine, dir)
 		t.Fatalf("cannot create sync service: %v", err)
 	}
 	return s
@@ -147,8 +163,8 @@
 func destroyService(t *testing.T, s *mockService) {
 	defer s.shutdown()
 	defer s.sync.Close()
-	if err := util.DestroyStore(s.engine, s.path); err != nil {
-		t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
+	if err := util.DestroyStore(s.engine, s.dir); err != nil {
+		t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.dir, err)
 	}
 }