syncbase: 1st setup in hookup of blob store
Create a blob store for each app database and fill the RPC handlers for
(create, put, get, commit) of a local blob. No blob tests yet, they
will be added once this in integrated with hpucha@'s blob integration
and the blob support in the client library.
Change-Id: If8e63fd049fb327e0a43470292d692e79757889f
diff --git a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index cb8bca8..d3386a0 100644
--- a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -194,6 +194,11 @@
return fscabs, err
}
+// Close() closes the FsCaBlobStore. {
+func (fscabs *FsCaBlobStore) Close() error {
+ return fscabs.cm.Close()
+}
+
// Root() returns the name of the root directory where *fscabs is stored.
func (fscabs *FsCaBlobStore) Root() string {
return fscabs.rootName
diff --git a/services/syncbase/localblobstore/model.go b/services/syncbase/localblobstore/model.go
index 16af4cf..ba08726 100644
--- a/services/syncbase/localblobstore/model.go
+++ b/services/syncbase/localblobstore/model.go
@@ -135,6 +135,9 @@
// Root() returns the name of the root directory where the BlobStore is stored.
Root() string
+
+ // Close() closes the BlobStore.
+ Close() error
}
// A Location describes chunk's location within a blob. It is returned by
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 8862f57..5dd9cad 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -248,6 +248,9 @@
if err := d.St().Close(); err != nil {
return err
}
+ if err := d.BlobSt().Close(); err != nil {
+ return err
+ }
if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDb(dbName)); err != nil {
return err
}
diff --git a/services/syncbase/server/interfaces/database.go b/services/syncbase/server/interfaces/database.go
index bf518a4..32cca17 100644
--- a/services/syncbase/server/interfaces/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -5,6 +5,7 @@
package interfaces
import (
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -16,6 +17,9 @@
// St returns the storage engine instance for this database.
St() store.Store
+ // BlobSt returns the blob storage engine instance for this database.
+ BlobSt() localblobstore.BlobStore
+
// App returns the app handle for this database.
App() App
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index bfe419a..14dc948 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -15,6 +15,8 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/syncbase/v23/syncbase/nosql/query_exec"
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -42,6 +44,9 @@
// and do not actually open the store in NewDatabase.
st store.Store // stores all data for a single database
+ // Local blob store associated with this database.
+ bst localblobstore.BlobStore
+
// Active snapshots and transactions corresponding to client batches.
// TODO(sadovsky): Add timeouts and GC.
mu sync.Mutex // protects the fields below
@@ -87,11 +92,17 @@
if err != nil {
return nil, err
}
+ // Open a co-located blob store, adjacent to the structured store.
+ bst, err := fs_cablobstore.Create(ctx, path.Join(opts.RootDir, "blobs"))
+ if err != nil {
+ return nil, err
+ }
return &database{
name: name,
a: a,
exists: true,
st: st,
+ bst: bst,
sns: make(map[uint64]store.Snapshot),
txs: make(map[uint64]store.Transaction),
}, nil
@@ -365,6 +376,13 @@
return d.st
}
+func (d *database) BlobSt() localblobstore.BlobStore {
+ if !d.exists {
+ vlog.Fatalf("database %q does not exist", d.name)
+ }
+ return d.bst
+}
+
func (d *database) App() interfaces.App {
return d.a
}
diff --git a/services/syncbase/server/nosql/database_bm.go b/services/syncbase/server/nosql/database_bm.go
index 65a1413..e646c42 100644
--- a/services/syncbase/server/nosql/database_bm.go
+++ b/services/syncbase/server/nosql/database_bm.go
@@ -38,7 +38,7 @@
return sd.CommitBlob(ctx, call, br)
}
-func (d *databaseReq) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
+func (d *databaseReq) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
if d.batchId != nil {
return 0, wire.NewErrBoundToBatch(ctx)
}
@@ -54,7 +54,7 @@
return sd.DeleteBlob(ctx, call, br)
}
-func (d *databaseReq) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+func (d *databaseReq) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
if d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index 7c98be0..56a4eed 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -5,39 +5,118 @@
package vsync
import (
+ "io"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
+)
+
+const (
+ chunkSize = 8 * 1024
)
////////////////////////////////////////////////////////////
// RPCs for managing blobs between Syncbase and its clients.
func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
- return wire.BlobRef(""), verror.NewErrNotImplemented(ctx)
+ bst := sd.db.BlobSt()
+ writer, err := bst.NewBlobWriter(ctx, "")
+ if err != nil {
+ return wire.NullBlobRef, err
+ }
+ defer writer.CloseWithoutFinalize()
+
+ name := writer.Name()
+ vlog.VI(2).Infof("sync: CreateBlob: blob ref %s", name)
+ return wire.BlobRef(name), nil
}
func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
- return verror.NewErrNotImplemented(ctx)
+ bst := sd.db.BlobSt()
+ writer, err := bst.NewBlobWriter(ctx, string(br))
+ if err != nil {
+ return err
+ }
+ defer writer.CloseWithoutFinalize()
+
+ stream := call.RecvStream()
+ for stream.Advance() {
+ item := localblobstore.BlockOrFile{Block: stream.Value()}
+ if err = writer.AppendFragment(item); err != nil {
+ return err
+ }
+ }
+ return stream.Err()
}
func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
- return verror.NewErrNotImplemented(ctx)
+ bst := sd.db.BlobSt()
+ writer, err := bst.NewBlobWriter(ctx, string(br))
+ if err != nil {
+ return err
+ }
+ return writer.Close()
}
-func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
- return 0, verror.NewErrNotImplemented(ctx)
+func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
+ bst := sd.db.BlobSt()
+ reader, err := bst.NewBlobReader(ctx, string(br))
+ if err != nil {
+ return 0, err
+ }
+ defer reader.Close()
+
+ return reader.Size(), nil
}
func (sd *syncDatabase) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}
-func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
+ // First get the blob locally if available.
+ err := sd.getLocalBlob(ctx, call, br, offset)
+ if err == nil {
+ return nil
+ }
+
+ // Locate the blob on a remote blob store and fetch it.
return verror.NewErrNotImplemented(ctx)
}
+// getLocalBlob looks for a blob in the local store and, if found, reads the
+// blob and sends it to the client. If the blob is found, it starts reading it
+// from the given offset and sends its bytes into the client stream.
+func (sd *syncDatabase) getLocalBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
+ bst := sd.db.BlobSt()
+ reader, err := bst.NewBlobReader(ctx, string(br))
+ if err != nil {
+ return err
+ }
+ defer reader.Close()
+
+ buf := make([]byte, chunkSize)
+ stream := call.SendStream()
+
+ for {
+ nbytes, err := reader.ReadAt(buf, offset)
+ if err != nil && err != io.EOF {
+ return err
+ }
+ if err == io.EOF || nbytes <= 0 {
+ break
+ }
+ offset += int64(nbytes)
+ stream.Send(buf[:nbytes])
+ }
+
+ return nil
+}
+
func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
return verror.NewErrNotImplemented(ctx)
}
diff --git a/services/syncbase/vsync/test_util.go b/services/syncbase/vsync/test_util.go
index 9dd4e3e..b145887 100644
--- a/services/syncbase/vsync/test_util.go
+++ b/services/syncbase/vsync/test_util.go
@@ -9,10 +9,13 @@
import (
"fmt"
"os"
+ "path"
"testing"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+ "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -28,8 +31,9 @@
// It is used to access a mock application.
type mockService struct {
engine string
- path string
+ dir string
st store.Store
+ bst localblobstore.BlobStore
sync *syncService
shutdown func()
}
@@ -43,7 +47,7 @@
}
func (s *mockService) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
- return &mockApp{st: s.st}, nil
+ return &mockApp{st: s.st, bst: s.bst}, nil
}
func (s *mockService) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -52,11 +56,12 @@
// mockApp emulates a Syncbase App. It is used to access a mock database.
type mockApp struct {
- st store.Store
+ st store.Store
+ bst localblobstore.BlobStore
}
func (a *mockApp) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
- return &mockDatabase{st: a.st}, nil
+ return &mockDatabase{st: a.st, bst: a.bst}, nil
}
func (a *mockApp) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -89,13 +94,18 @@
// mockDatabase emulates a Syncbase Database. It is used to test sync functionality.
type mockDatabase struct {
- st store.Store
+ st store.Store
+ bst localblobstore.BlobStore
}
func (d *mockDatabase) St() store.Store {
return d.st
}
+func (d *mockDatabase) BlobSt() localblobstore.BlobStore {
+ return d.bst
+}
+
func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
return verror.NewErrNotImplemented(ctx)
}
@@ -120,24 +130,30 @@
func createService(t *testing.T) *mockService {
ctx, shutdown := test.V23Init()
engine := "leveldb"
- path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
+ opts := util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false}
+ dir := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
- st, err := util.OpenStore(engine, path, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
+ st, err := util.OpenStore(engine, path.Join(dir, engine), opts)
if err != nil {
- t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
+ t.Fatalf("cannot create store %s (%s): %v", engine, dir, err)
}
st, err = watchable.Wrap(st, &watchable.Options{
ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
})
+ bst, err := fs_cablobstore.Create(ctx, path.Join(dir, "blobs"))
+ if err != nil {
+ t.Fatalf("cannot create blob store (%s): %v", dir, err)
+ }
s := &mockService{
st: st,
+ bst: bst,
engine: engine,
- path: path,
+ dir: dir,
shutdown: shutdown,
}
if s.sync, err = New(ctx, nil, s, nil); err != nil {
- util.DestroyStore(engine, path)
+ util.DestroyStore(engine, dir)
t.Fatalf("cannot create sync service: %v", err)
}
return s
@@ -147,8 +163,8 @@
func destroyService(t *testing.T, s *mockService) {
defer s.shutdown()
defer s.sync.Close()
- if err := util.DestroyStore(s.engine, s.path); err != nil {
- t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
+ if err := util.DestroyStore(s.engine, s.dir); err != nil {
+ t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.dir, err)
}
}