syncbase/vsync: Preliminary blob discovery and support for fetching whole
blobs remotely (incremental blob transfer is not yet supported).

Change-Id: I935eb693414f4c6c7820180c9ec1c7173a8c4ac1
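
A rough end-to-end sketch of the client flow this change enables, using the
new v23/syncbase/nosql API added below (error handling elided; d is a
Database handle and row a Row from existing setup; process() is a
hypothetical consumer):

    // Writer side: create, fill, and commit a blob, then share its ref.
    b, _ := d.CreateBlob(ctx)
    bw, _ := b.Put(ctx)
    bw.Send([]byte("hello"))
    bw.Close()
    b.Commit(ctx)
    row.Put(ctx, b.Ref()) // the BlobRef travels via normal sync

    // Reader side: resolve the ref, fetch the whole blob, then read it.
    var ref wire.BlobRef
    row.Get(ctx, &ref)
    b = d.Blob(ref)
    bs, _ := b.Fetch(ctx, 100) // priority 100
    for bs.Advance() {
        // bs.Value() reports progress; see BlobFetchStatus below.
    }
    br, _ := b.Get(ctx, 0)
    for br.Advance() {
        process(br.Value()) // hypothetical consumer
    }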
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 0f56ba3..d23c9c4 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -264,7 +264,7 @@
 	// fetched before the lower priority ones. However an ongoing blob
 	// transfer is not interrupted. Status updates are streamed back to the
 	// client as fetch is in progress.
-	FetchBlob(br BlobRef, priority uint64) stream<_, FetchStatus> error {access.Read}
+	FetchBlob(br BlobRef, priority uint64) stream<_, BlobFetchStatus> error {access.Read}
 
 	// PinBlob locally pins the blob so that it is not evicted.
 	PinBlob(br BlobRef) error {access.Write}
@@ -322,4 +322,5 @@
 	ReadOnlyBatch() {"en": "batch is read-only"}
 	ConcurrentBatch() {"en": "concurrent batch"}
 	SchemaVersionMismatch() {"en": "actual schema version does not match the provided one"}
+	BlobNotCommitted() {"en": "blob is not yet committed"}
 )
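
For reference, a server-side sketch of surfacing the new error through the
generated constructor (blobIsCommitted is a hypothetical check, not part of
this change):

    if !blobIsCommitted(br) { // hypothetical commit check
        return wire.NewErrBlobNotCommitted(ctx)
    }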
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 624036c..7318ff2 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -30,6 +30,7 @@
 	ErrReadOnlyBatch         = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ReadOnlyBatch", verror.NoRetry, "{1:}{2:} batch is read-only")
 	ErrConcurrentBatch       = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ConcurrentBatch", verror.NoRetry, "{1:}{2:} concurrent batch")
 	ErrSchemaVersionMismatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.SchemaVersionMismatch", verror.NoRetry, "{1:}{2:} actual schema version does not match the provided one")
+	ErrBlobNotCommitted      = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.BlobNotCommitted", verror.NoRetry, "{1:}{2:} blob is not yet committed")
 )
 
 func init() {
@@ -38,6 +39,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrReadOnlyBatch.ID), "{1:}{2:} batch is read-only")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConcurrentBatch.ID), "{1:}{2:} concurrent batch")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrSchemaVersionMismatch.ID), "{1:}{2:} actual schema version does not match the provided one")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlobNotCommitted.ID), "{1:}{2:} blob is not yet committed")
 }
 
 // NewErrBoundToBatch returns an error with the ErrBoundToBatch ID.
@@ -65,6 +67,11 @@
 	return verror.New(ErrSchemaVersionMismatch, ctx)
 }
 
+// NewErrBlobNotCommitted returns an error with the ErrBlobNotCommitted ID.
+func NewErrBlobNotCommitted(ctx *context.T) error {
+	return verror.New(ErrBlobNotCommitted, ctx)
+}
+
 // DatabaseWatcherClientMethods is the client interface
 // containing DatabaseWatcher methods.
 //
@@ -840,7 +847,7 @@
 		Advance() bool
 		// Value returns the item that was staged by Advance.  May panic if Advance
 		// returned false or was not called.  Never blocks.
-		Value() FetchStatus
+		Value() BlobFetchStatus
 		// Err returns any error encountered by Advance.  Never blocks.
 		Err() error
 	}
@@ -864,13 +871,13 @@
 
 type implBlobManagerFetchBlobClientCall struct {
 	rpc.ClientCall
-	valRecv FetchStatus
+	valRecv BlobFetchStatus
 	errRecv error
 }
 
 func (c *implBlobManagerFetchBlobClientCall) RecvStream() interface {
 	Advance() bool
-	Value() FetchStatus
+	Value() BlobFetchStatus
 	Err() error
 } {
 	return implBlobManagerFetchBlobClientCallRecv{c}
@@ -881,11 +888,11 @@
 }
 
 func (c implBlobManagerFetchBlobClientCallRecv) Advance() bool {
-	c.c.valRecv = FetchStatus{}
+	c.c.valRecv = BlobFetchStatus{}
 	c.c.errRecv = c.c.Recv(&c.c.valRecv)
 	return c.c.errRecv == nil
 }
-func (c implBlobManagerFetchBlobClientCallRecv) Value() FetchStatus {
+func (c implBlobManagerFetchBlobClientCallRecv) Value() BlobFetchStatus {
 	return c.c.valRecv
 }
 func (c implBlobManagerFetchBlobClientCallRecv) Err() error {
@@ -1260,7 +1267,7 @@
 		// Send places the item onto the output stream.  Returns errors encountered
 		// while sending.  Blocks if there is no buffer space; will unblock when
 		// buffer space is available.
-		Send(item FetchStatus) error
+		Send(item BlobFetchStatus) error
 	}
 }
 
@@ -1283,7 +1290,7 @@
 
 // SendStream returns the send side of the BlobManager.FetchBlob server stream.
 func (s *BlobManagerFetchBlobServerCallStub) SendStream() interface {
-	Send(item FetchStatus) error
+	Send(item BlobFetchStatus) error
 } {
 	return implBlobManagerFetchBlobServerCallSend{s}
 }
@@ -1292,7 +1299,7 @@
 	s *BlobManagerFetchBlobServerCallStub
 }
 
-func (s implBlobManagerFetchBlobServerCallSend) Send(item FetchStatus) error {
+func (s implBlobManagerFetchBlobServerCallSend) Send(item BlobFetchStatus) error {
 	return s.s.Send(item)
 }
 
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index 37fd383..eae8e9d 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -156,19 +156,19 @@
 	NullBlobRef = BlobRef("")
 )
 
-// FetchState represents the state transitions of a blob fetch.
-type FetchState enum {
+// BlobFetchState represents the state transitions of a blob fetch.
+type BlobFetchState enum {
 	Pending // Fetch request is queued.
 	Locating // Blob discovery is in progress to find a source for the blob.
 	Fetching // Blob transfer is in progress.
 	Done // Blob is locally cached.
 }
 
-// FetchStatus describes the progress of an asynchronous blob fetch.
-type FetchStatus struct {
-	State FetchState // State of the blob fetch request.
-	Received uint64 // Total number of bytes received.
-	Total uint64 // Blob size.
+// BlobFetchStatus describes the progress of an asynchronous blob fetch.
+type BlobFetchStatus struct {
+	State BlobFetchState // State of the blob fetch request.
+	Received int64 // Total number of bytes received.
+	Total int64 // Blob size.
 }
 
 // StoreChange is the new value for a watched entity.
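
A sketch of how a client might interpret a stream of BlobFetchStatus values,
written against the generated Go types (imported as wire, per the convention
in the new tests; statuses is a hypothetical slice of received updates; Total
may be zero until a source has been located):

    for _, s := range statuses {
        switch s.State {
        case wire.BlobFetchStatePending:
            fmt.Println("fetch queued")
        case wire.BlobFetchStateLocating:
            fmt.Println("locating a source")
        case wire.BlobFetchStateFetching:
            if s.Total > 0 {
                fmt.Printf("fetched %d of %d bytes\n", s.Received, s.Total)
            }
        case wire.BlobFetchStateDone:
            fmt.Println("blob cached locally")
        }
    }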
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index ac27aad..adb4e66 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -224,75 +224,75 @@
 }) {
 }
 
-// FetchState represents the state transitions of a blob fetch.
-type FetchState int
+// BlobFetchState represents the state transitions of a blob fetch.
+type BlobFetchState int
 
 const (
-	FetchStatePending FetchState = iota
-	FetchStateLocating
-	FetchStateFetching
-	FetchStateDone
+	BlobFetchStatePending BlobFetchState = iota
+	BlobFetchStateLocating
+	BlobFetchStateFetching
+	BlobFetchStateDone
 )
 
-// FetchStateAll holds all labels for FetchState.
-var FetchStateAll = [...]FetchState{FetchStatePending, FetchStateLocating, FetchStateFetching, FetchStateDone}
+// BlobFetchStateAll holds all labels for BlobFetchState.
+var BlobFetchStateAll = [...]BlobFetchState{BlobFetchStatePending, BlobFetchStateLocating, BlobFetchStateFetching, BlobFetchStateDone}
 
-// FetchStateFromString creates a FetchState from a string label.
-func FetchStateFromString(label string) (x FetchState, err error) {
+// BlobFetchStateFromString creates a BlobFetchState from a string label.
+func BlobFetchStateFromString(label string) (x BlobFetchState, err error) {
 	err = x.Set(label)
 	return
 }
 
 // Set assigns label to x.
-func (x *FetchState) Set(label string) error {
+func (x *BlobFetchState) Set(label string) error {
 	switch label {
 	case "Pending", "pending":
-		*x = FetchStatePending
+		*x = BlobFetchStatePending
 		return nil
 	case "Locating", "locating":
-		*x = FetchStateLocating
+		*x = BlobFetchStateLocating
 		return nil
 	case "Fetching", "fetching":
-		*x = FetchStateFetching
+		*x = BlobFetchStateFetching
 		return nil
 	case "Done", "done":
-		*x = FetchStateDone
+		*x = BlobFetchStateDone
 		return nil
 	}
 	*x = -1
-	return fmt.Errorf("unknown label %q in nosql.FetchState", label)
+	return fmt.Errorf("unknown label %q in nosql.BlobFetchState", label)
 }
 
 // String returns the string label of x.
-func (x FetchState) String() string {
+func (x BlobFetchState) String() string {
 	switch x {
-	case FetchStatePending:
+	case BlobFetchStatePending:
 		return "Pending"
-	case FetchStateLocating:
+	case BlobFetchStateLocating:
 		return "Locating"
-	case FetchStateFetching:
+	case BlobFetchStateFetching:
 		return "Fetching"
-	case FetchStateDone:
+	case BlobFetchStateDone:
 		return "Done"
 	}
 	return ""
 }
 
-func (FetchState) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.FetchState"`
+func (BlobFetchState) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BlobFetchState"`
 	Enum struct{ Pending, Locating, Fetching, Done string }
 }) {
 }
 
-// FetchStatus describes the progress of an asynchronous blob fetch.
-type FetchStatus struct {
-	State    FetchState // State of the blob fetch request.
-	Received uint64     // Total number of bytes received.
-	Total    uint64     // Blob size.
+// BlobFetchStatus describes the progress of an asynchronous blob fetch.
+type BlobFetchStatus struct {
+	State    BlobFetchState // State of the blob fetch request.
+	Received int64          // Total number of bytes received.
+	Total    int64          // Blob size.
 }
 
-func (FetchStatus) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.FetchStatus"`
+func (BlobFetchStatus) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BlobFetchStatus"`
 }) {
 }
 
@@ -323,8 +323,8 @@
 	vdl.Register((*CrPolicy)(nil))
 	vdl.Register((*CrRule)(nil))
 	vdl.Register((*BlobRef)(nil))
-	vdl.Register((*FetchState)(nil))
-	vdl.Register((*FetchStatus)(nil))
+	vdl.Register((*BlobFetchState)(nil))
+	vdl.Register((*BlobFetchStatus)(nil))
 	vdl.Register((*StoreChange)(nil))
 }
 
diff --git a/v23/syncbase/nosql/blob.go b/v23/syncbase/nosql/blob.go
new file mode 100644
index 0000000..37cddc4
--- /dev/null
+++ b/v23/syncbase/nosql/blob.go
@@ -0,0 +1,294 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"sync"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+var _ Blob = (*blob)(nil)
+
+type blob struct {
+	c  wire.DatabaseClientMethods
+	br wire.BlobRef
+}
+
+func newBlob(dbName string, br wire.BlobRef) Blob {
+	return &blob{
+		c:  wire.DatabaseClient(dbName),
+		br: br,
+	}
+}
+
+func createBlob(ctx *context.T, dbName string) (Blob, error) {
+	b := &blob{
+		c: wire.DatabaseClient(dbName),
+	}
+	var err error
+	b.br, err = b.c.CreateBlob(ctx)
+	return b, err
+}
+
+// Ref implements Blob.Ref.
+func (b *blob) Ref() wire.BlobRef {
+	return b.br
+}
+
+// Put implements Blob.Put.
+func (b *blob) Put(ctx *context.T) (BlobWriter, error) {
+	call, err := b.c.PutBlob(ctx, b.br)
+	if err != nil {
+		return nil, err
+	}
+	return newBlobWriter(call), nil
+}
+
+// Commit implements Blob.Commit.
+func (b *blob) Commit(ctx *context.T) error {
+	return b.c.CommitBlob(ctx, b.br)
+}
+
+// Size implements Blob.Size.
+func (b *blob) Size(ctx *context.T) (int64, error) {
+	return b.c.GetBlobSize(ctx, b.br)
+}
+
+// Delete implements Blob.Delete.
+func (b *blob) Delete(ctx *context.T) error {
+	return b.c.DeleteBlob(ctx, b.br)
+}
+
+// Get implements Blob.Get.
+func (b *blob) Get(ctx *context.T, offset int64) (BlobReader, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	call, err := b.c.GetBlob(ctx, b.br, offset)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	return newBlobReader(cancel, call), nil
+}
+
+// Fetch implements Blob.Fetch.
+func (b *blob) Fetch(ctx *context.T, priority uint64) (BlobStatus, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	call, err := b.c.FetchBlob(ctx, b.br, priority)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	return newBlobStatus(cancel, call), nil
+}
+
+// Pin implements Blob.Pin.
+func (b *blob) Pin(ctx *context.T) error {
+	return b.c.PinBlob(ctx, b.br)
+}
+
+// Unpin implements Blob.Unpin.
+func (b *blob) Unpin(ctx *context.T) error {
+	return b.c.UnpinBlob(ctx, b.br)
+}
+
+// Keep implements Blob.Keep.
+func (b *blob) Keep(ctx *context.T, rank uint64) error {
+	return b.c.KeepBlob(ctx, b.br, rank)
+}
+
+////////////////////////////////////////
+// BlobWriter methods.
+
+type blobWriter struct {
+	mu sync.Mutex
+	// call is the RPC stream object.
+	call wire.BlobManagerPutBlobClientCall
+	// finished records whether we have called call.Finish().
+	finished bool
+}
+
+var _ BlobWriter = (*blobWriter)(nil)
+
+func newBlobWriter(call wire.BlobManagerPutBlobClientCall) *blobWriter {
+	return &blobWriter{
+		call: call,
+	}
+}
+
+// Send implements BlobWriter.Send.
+func (bw *blobWriter) Send(buf []byte) error {
+	return bw.call.SendStream().Send(buf)
+}
+
+// Close implements BlobWriter.Close.
+func (bw *blobWriter) Close() error {
+	bw.mu.Lock()
+	defer bw.mu.Unlock()
+	if !bw.finished {
+		// No need to close the send stream explicitly; Finish
+		// will take care of that.
+		bw.finished = true
+		return bw.call.Finish()
+	}
+	return nil
+}
+
+////////////////////////////////////////
+// BlobReader methods.
+// (similar to methods in stream.go).
+
+type blobReader struct {
+	mu sync.Mutex
+	// cancel cancels the RPC stream.
+	cancel context.CancelFunc
+	// call is the RPC stream object.
+	call wire.BlobManagerGetBlobClientCall
+	// curr is the currently staged bytes, or nil if nothing is staged.
+	curr []byte
+	// err is the first error encountered during streaming. It may also be
+	// populated by a call to Cancel.
+	err error
+	// finished records whether we have called call.Finish().
+	finished bool
+}
+
+var _ BlobReader = (*blobReader)(nil)
+
+func newBlobReader(cancel context.CancelFunc, call wire.BlobManagerGetBlobClientCall) *blobReader {
+	return &blobReader{
+		cancel: cancel,
+		call:   call,
+	}
+}
+
+// Advance implements BlobReader.Advance.
+func (br *blobReader) Advance() bool {
+	br.mu.Lock()
+	defer br.mu.Unlock()
+	if br.err != nil || br.finished {
+		return false
+	}
+	if br.call.RecvStream().Advance() {
+		br.curr = br.call.RecvStream().Value()
+		return true
+	}
+
+	br.err = br.call.RecvStream().Err()
+	err := br.call.Finish()
+	if br.err == nil {
+		br.err = err
+	}
+	br.cancel()
+	br.finished = true
+	return false
+}
+
+// Value implements BlobReader.Value.
+func (br *blobReader) Value() []byte {
+	br.mu.Lock()
+	defer br.mu.Unlock()
+	if br.curr == nil {
+		panic("nothing staged")
+	}
+	return br.curr
+}
+
+// Err implements BlobReader.Err.
+func (br *blobReader) Err() error {
+	br.mu.Lock()
+	defer br.mu.Unlock()
+	return br.err
+}
+
+// Cancel implements BlobReader.Cancel.
+// TODO(hpucha): Make Cancel non-blocking. Copied from stream.go
+func (br *blobReader) Cancel() {
+	br.mu.Lock()
+	defer br.mu.Unlock()
+	br.cancel()
+	br.call.Finish()
+	br.err = verror.New(verror.ErrCanceled, nil)
+}
+
+////////////////////////////////////////
+// BlobStatus methods.
+// (similar to methods in stream.go).
+
+type blobStatus struct {
+	mu sync.Mutex
+	// cancel cancels the RPC stream.
+	cancel context.CancelFunc
+	// call is the RPC stream object.
+	call wire.BlobManagerFetchBlobClientCall
+	// curr is the currently staged item, or nil if nothing is staged.
+	curr *wire.BlobFetchStatus
+	// err is the first error encountered during streaming. It may also be
+	// populated by a call to Cancel.
+	err error
+	// finished records whether we have called call.Finish().
+	finished bool
+}
+
+var _ BlobStatus = (*blobStatus)(nil)
+
+func newBlobStatus(cancel context.CancelFunc, call wire.BlobManagerFetchBlobClientCall) *blobStatus {
+	return &blobStatus{
+		cancel: cancel,
+		call:   call,
+	}
+}
+
+// Advance implements BlobStatus.Advance.
+func (bs *blobStatus) Advance() bool {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+	if bs.err != nil || bs.finished {
+		return false
+	}
+	if bs.call.RecvStream().Advance() {
+		val := bs.call.RecvStream().Value()
+		bs.curr = &val
+		return true
+	}
+
+	bs.err = bs.call.RecvStream().Err()
+	err := bs.call.Finish()
+	if bs.err == nil {
+		bs.err = err
+	}
+	bs.cancel()
+	bs.finished = true
+	return false
+}
+
+// Value implements BlobStatus.Value.
+func (bs *blobStatus) Value() wire.BlobFetchStatus {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+	if bs.curr == nil {
+		panic("nothing staged")
+	}
+	return *bs.curr
+}
+
+// Err implements BlobStatus.Err.
+func (bs *blobStatus) Err() error {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+	return bs.err
+}
+
+// Cancel implements BlobStatus.Cancel.
+// TODO(hpucha): Make Cancel non-blocking. Copied from stream.go
+func (bs *blobStatus) Cancel() {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+	bs.cancel()
+	bs.call.Finish()
+	bs.err = verror.New(verror.ErrCanceled, nil)
+}
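
The reader types above expose an Advance/Value/Err/Cancel contract rather
than io.Reader. A small hypothetical adapter (not part of this change) shows
how a client could bridge the two using only that contract:

    // Assumed imports: "io" and nosql "v.io/syncbase/v23/syncbase/nosql".
    // blobReadAdapter presents a nosql.BlobReader as an io.Reader.
    type blobReadAdapter struct {
        br  nosql.BlobReader
        buf []byte // unread remainder of the last staged chunk
    }

    func (a *blobReadAdapter) Read(p []byte) (int, error) {
        for len(a.buf) == 0 {
            if !a.br.Advance() {
                if err := a.br.Err(); err != nil {
                    return 0, err
                }
                return 0, io.EOF // clean end of stream
            }
            a.buf = a.br.Value()
        }
        n := copy(p, a.buf)
        a.buf = a.buf[n:]
        return n, nil
    }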
diff --git a/v23/syncbase/nosql/blob_test.go b/v23/syncbase/nosql/blob_test.go
new file mode 100644
index 0000000..17cd2a8
--- /dev/null
+++ b/v23/syncbase/nosql/blob_test.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql_test
+
+import (
+	"reflect"
+	"testing"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase"
+	"v.io/syncbase/v23/syncbase/nosql"
+	tu "v.io/syncbase/v23/syncbase/testutil"
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+// Tests local blob get following a put.
+func TestLocalBlobPutGet(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(perms("root/client"))
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+	b, err := d.CreateBlob(ctx)
+	if err != nil {
+		t.Fatalf("Creating local blob failed, err %v", err)
+	}
+
+	// Verify that getting/fetching an uncommitted blob should fail.
+	br, err := b.Get(ctx, 0)
+	if err != nil {
+		t.Fatalf("GetBlob RPC failed, err %v", err)
+	}
+	if br.Advance() {
+		t.Fatalf("Getting uncommitted blob didn't fail")
+	}
+	if verror.ErrorID(br.Err()) != wire.ErrBlobNotCommitted.ID {
+		t.Fatalf("Getting uncommitted blob failed with wrong error, err %v", br.Err())
+	}
+
+	bs, err := b.Fetch(ctx, 100)
+	if err != nil {
+		t.Fatalf("FetchBlob RPC failed, err %v", err)
+	}
+	if bs.Advance() {
+		t.Fatalf("Fetching uncommitted blob didn't fail")
+	}
+	if verror.ErrorID(bs.Err()) != wire.ErrBlobNotCommitted.ID {
+		t.Fatalf("Fetching uncommitted blob failed with wrong error, err %v", bs.Err())
+	}
+
+	verifyBlobSize(t, ctx, b, 0)
+
+	// Put some data in the blob.
+	bw, err := b.Put(ctx)
+	if err != nil {
+		t.Fatalf("PutBlob RPC failed, err %v", err)
+	}
+	wantVal := "foobarbaz"
+	wantSize := int64(len(wantVal))
+	if err := bw.Send([]byte(wantVal)); err != nil {
+		t.Fatalf("Sending blob data failed, err %v", err)
+	}
+
+	// Before the put is completed, the blob size can be anything between 0
+	// and wantSize, since the size is read from persistent storage before
+	// the writer is closed. Note: using Fatalf on error here hangs the
+	// test, apparently because a goroutine spawned for the Put RPC is
+	// still pending.
+	gotSize, err := b.Size(ctx)
+	if err != nil || gotSize > wantSize {
+		t.Errorf("Getting blob size failed, got %v, want %v, err %v", gotSize, wantSize, err)
+	}
+
+	if err := bw.Close(); err != nil {
+		t.Fatalf("Closing blob writer failed, err %v", err)
+	}
+
+	verifyBlobSize(t, ctx, b, wantSize)
+
+	// Commit the blob.
+	if err := b.Commit(ctx); err != nil {
+		t.Fatalf("Committing a blob failed, err %v", err)
+	}
+
+	// Verify getting/fetching the committed blob.
+	verifyBlobSize(t, ctx, b, wantSize)
+	verifyGetBlob(t, ctx, b, 0, wantVal)
+	verifyGetBlob(t, ctx, b, 3, "barbaz")
+
+	bs, err = b.Fetch(ctx, 100)
+	if err != nil {
+		t.Fatalf("FetchBlob RPC failed, err %v", err)
+	}
+	wantStatus := wire.BlobFetchStatus{State: wire.BlobFetchStateDone}
+	var gotStatus wire.BlobFetchStatus
+	for bs.Advance() {
+		gotStatus = bs.Value()
+	}
+	if bs.Err() != nil {
+		t.Fatalf("Fetching a blob failed, err %v", bs.Err())
+	}
+	if !reflect.DeepEqual(gotStatus, wantStatus) {
+		t.Fatalf("Fetching a blob failed, got %v want %v", gotStatus, wantStatus)
+	}
+}
+
+///////////////////
+// Helpers.
+
+func verifyBlobSize(t *testing.T, ctx *context.T, b nosql.Blob, wantSize int64) {
+	gotSize, err := b.Size(ctx)
+	if err != nil || gotSize != wantSize {
+		t.Fatalf("Getting blob size failed, got %v, want %v, err %v", gotSize, wantSize, err)
+	}
+}
+
+func verifyGetBlob(t *testing.T, ctx *context.T, b nosql.Blob, offset int64, wantVal string) {
+	br, err := b.Get(ctx, offset)
+	if err != nil {
+		t.Fatalf("GetBlob RPC failed, err %v", err)
+	}
+	var gotVal []byte
+	for br.Advance() {
+		gotVal = append(gotVal, br.Value()...)
+	}
+	if br.Err() != nil {
+		t.Fatalf("Getting a blob failed, err %v", br.Err())
+	}
+	if !reflect.DeepEqual(gotVal, []byte(wantVal)) {
+		t.Fatalf("Getting a blob failed, got %v want %v", gotVal, []byte(wantVal))
+	}
+}
diff --git a/v23/syncbase/nosql/blob_v23_test.go b/v23/syncbase/nosql/blob_v23_test.go
new file mode 100644
index 0000000..3abed9c
--- /dev/null
+++ b/v23/syncbase/nosql/blob_v23_test.go
@@ -0,0 +1,371 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql_test
+
+import (
+	"crypto/md5"
+	"crypto/rand"
+	"fmt"
+	"reflect"
+	"strconv"
+	"time"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase"
+	tu "v.io/syncbase/v23/syncbase/testutil"
+	constants "v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/v23"
+	"v.io/v23/naming"
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/test/modules"
+	"v.io/x/ref/test/v23tests"
+)
+
+//go:generate v23 test generate
+
+func V23TestSyncbasedWholeBlobTransfer(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+
+	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+
+	// FetchBlob first.
+	tu.RunClient(t, client0Creds, runGenerateBlob, "sync0", "foo", "0", "foobarbaz")
+	tu.RunClient(t, client1Creds, runFetchBlob, "sync1", "foo", "0", "9", "false")
+	tu.RunClient(t, client1Creds, runGetBlob, "sync1", "foo", "0", "foobarbaz", "0")
+
+	// GetBlob directly.
+	tu.RunClient(t, client1Creds, runGenerateBlob, "sync1", "foo", "0", "abcdefghijklmn")
+	tu.RunClient(t, client0Creds, runGetBlob, "sync0", "foo", "0", "fghijklmn", "5")
+	tu.RunClient(t, client0Creds, runFetchBlob, "sync0", "foo", "0", "14", "true")
+
+	// Test with a big blob (1 MB).
+	tu.RunClient(t, client0Creds, runGenerateBigBlob, "sync0", "foo", "1")
+	tu.RunClient(t, client1Creds, runGetBigBlob, "sync1", "foo", "1")
+}
+
+////////////////////////////////////
+// Helpers.
+
+type testStruct struct {
+	Val  string
+	Blob wire.BlobRef
+}
+
+// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
+// actual key), 3: blob data.
+var runGenerateBlob = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	b, err := d.CreateBlob(ctx)
+	if err != nil {
+		return fmt.Errorf("CreateBlob failed, err %v\n", err)
+	}
+	bw, err := b.Put(ctx)
+	if err != nil {
+		return fmt.Errorf("PutBlob RPC failed, err %v\n", err)
+	}
+
+	data := args[3]
+	if err := bw.Send([]byte(data)); err != nil {
+		return fmt.Errorf("Sending blob data failed, err %v\n", err)
+	}
+	if err := bw.Close(); err != nil {
+		return fmt.Errorf("Closing blob writer failed, err %v\n", err)
+	}
+
+	// Commit the blob.
+	if err := b.Commit(ctx); err != nil {
+		return fmt.Errorf("Committing a blob failed, err %v\n", err)
+	}
+
+	// Put the BlobRef in a key.
+	tb := d.Table("tb")
+	pos, _ := strconv.ParseUint(args[2], 10, 64)
+
+	key := fmt.Sprintf("%s%d", args[1], pos)
+	r := tb.Row(key)
+	s := testStruct{Val: "testkey" + key, Blob: b.Ref()}
+	if err := r.Put(ctx, s); err != nil {
+		return fmt.Errorf("r.Put() failed: %v\n", err)
+	}
+
+	return nil
+}, "runGenerateBlob")
+
+// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
+// actual key), 3: blob size, 4: skip incremental status checking.
+var runFetchBlob = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	tb := d.Table("tb")
+	pos, _ := strconv.ParseUint(args[2], 10, 64)
+
+	key := fmt.Sprintf("%s%d", args[1], pos)
+	r := tb.Row(key)
+	var s testStruct
+
+	// Try for 10 seconds to get the new value.
+	var err error
+	for i := 0; i < 10; i++ {
+		// Note: the error is a decode error since the old value is a
+		// string, and the new value is testStruct.
+		if err = r.Get(ctx, &s); err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+
+	if err != nil {
+		return fmt.Errorf("r.Get() failed: %v\n", err)
+	}
+
+	b := d.Blob(s.Blob)
+	bs, err := b.Fetch(ctx, 100)
+	if err != nil {
+		return fmt.Errorf("Fetch RPC failed, err %v\n", err)
+	}
+
+	status := []wire.BlobFetchStatus{
+		{State: wire.BlobFetchStatePending},
+		{State: wire.BlobFetchStateLocating},
+		{State: wire.BlobFetchStateFetching},
+		{State: wire.BlobFetchStateDone}}
+
+	skipIncStatus, _ := strconv.ParseBool(args[4])
+
+	var gotStatus wire.BlobFetchStatus
+	i := 0
+	for bs.Advance() {
+		gotStatus = bs.Value()
+
+		if !skipIncStatus {
+			if i <= 1 {
+				if !reflect.DeepEqual(gotStatus, status[i]) {
+					return fmt.Errorf("Fetch blob failed, got status %v want status %v\n", gotStatus, status[i])
+				}
+				i++
+			} else if !(gotStatus.State == status[2].State || reflect.DeepEqual(gotStatus, status[3])) {
+				return fmt.Errorf("Fetch blob failed, got status %v\n", gotStatus)
+			}
+		}
+	}
+
+	if !reflect.DeepEqual(gotStatus, status[3]) {
+		return fmt.Errorf("Fetch blob failed, got status %v want status %v\n", gotStatus, status[3])
+	}
+
+	if bs.Err() != nil {
+		return fmt.Errorf("Fetch blob failed, err %v\n", bs.Err())
+	}
+
+	wantSize, _ := strconv.ParseInt(args[3], 10, 64)
+	gotSize, err := b.Size(ctx)
+	if err != nil || gotSize != wantSize {
+		return fmt.Errorf("Blob size incorrect, got %v want %v, err %v\n", gotSize, wantSize, err)
+	}
+
+	return nil
+}, "runFetchBlob")
+
+// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
+// actual key), 3: expected blob data, 4: offset for get.
+var runGetBlob = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	tb := d.Table("tb")
+	pos, _ := strconv.ParseUint(args[2], 10, 64)
+
+	key := fmt.Sprintf("%s%d", args[1], pos)
+	r := tb.Row(key)
+	var s testStruct
+
+	// Try for 10 seconds to get the new value.
+	var err error
+	for i := 0; i < 10; i++ {
+		// Note: the error is a decode error since the old value is a
+		// string, and the new value is testStruct.
+		if err = r.Get(ctx, &s); err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+
+	if err != nil {
+		return fmt.Errorf("r.Get() failed: %v\n", err)
+	}
+
+	b := d.Blob(s.Blob)
+	offset, _ := strconv.ParseInt(args[4], 10, 64)
+	br, err := b.Get(ctx, offset)
+	if err != nil {
+		return fmt.Errorf("GetBlob RPC failed, err %v\n", err)
+	}
+	var gotVal []byte
+	for br.Advance() {
+		gotVal = append(gotVal, br.Value()...)
+	}
+	if br.Err() != nil {
+		return fmt.Errorf("Getting a blob failed, err %v\n", br.Err())
+	}
+	if !reflect.DeepEqual(gotVal, []byte(args[3])) {
+		return fmt.Errorf("Getting a blob failed, got %v want %v\n", gotVal, []byte(args[3]))
+	}
+
+	return nil
+}, "runGetBlob")
+
+// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
+// actual key).
+var runGenerateBigBlob = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	b, err := d.CreateBlob(ctx)
+	if err != nil {
+		return fmt.Errorf("CreateBlob failed, err %v\n", err)
+	}
+	bw, err := b.Put(ctx)
+	if err != nil {
+		return fmt.Errorf("PutBlob RPC failed, err %v\n", err)
+	}
+
+	hasher := md5.New()
+
+	chunkSize := 8192
+	content := make([]byte, chunkSize)
+	// Send 1 MB blob.
+	for i := 0; i < 128; i++ {
+		if n, err := rand.Read(content); err != nil || n != chunkSize {
+			return fmt.Errorf("Creating blob data failed, n %v err %v\n", n, err)
+		}
+		if err := bw.Send(content); err != nil {
+			return fmt.Errorf("Sending blob data failed, err %v\n", err)
+		}
+		hasher.Write(content)
+	}
+	if err := bw.Close(); err != nil {
+		return fmt.Errorf("Closing blob writer failed, err %v\n", err)
+	}
+
+	// Commit the blob.
+	if err := b.Commit(ctx); err != nil {
+		return fmt.Errorf("Committing a blob failed, err %v\n", err)
+	}
+
+	// Put the BlobRef in a key.
+	tb := d.Table("tb")
+	pos, _ := strconv.ParseUint(args[2], 10, 64)
+
+	key := fmt.Sprintf("%s%d", args[1], pos)
+	r := tb.Row(key)
+
+	// Blob hash is transferred via structured store.
+	s := testStruct{Val: hashToString(hasher.Sum(nil)), Blob: b.Ref()}
+
+	if err := r.Put(ctx, s); err != nil {
+		return fmt.Errorf("r.Put() failed: %v\n", err)
+	}
+
+	return nil
+}, "runGenerateBigBlob")
+
+// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
+// actual key).
+var runGetBigBlob = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+
+	tb := d.Table("tb")
+	pos, _ := strconv.ParseUint(args[2], 10, 64)
+
+	key := fmt.Sprintf("%s%d", args[1], pos)
+	r := tb.Row(key)
+	var s testStruct
+
+	// Try for 10 seconds to get the new value.
+	var err error
+	for i := 0; i < 10; i++ {
+		// Note: the error is a decode error since the old value is a
+		// string, and the new value is testStruct.
+		if err = r.Get(ctx, &s); err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+
+	if err != nil {
+		return fmt.Errorf("r.Get() failed: %v\n", err)
+	}
+
+	b := d.Blob(s.Blob)
+	br, err := b.Get(ctx, 0)
+	if err != nil {
+		return fmt.Errorf("GetBlob RPC failed, err %v\n", err)
+	}
+	hasher := md5.New()
+	for br.Advance() {
+		content := br.Value()
+		hasher.Write(content)
+	}
+	if br.Err() != nil {
+		return fmt.Errorf("Getting a blob failed, err %v\n", br.Err())
+	}
+
+	gotHash := hashToString(hasher.Sum(nil))
+	if !reflect.DeepEqual(gotHash, s.Val) {
+		return fmt.Errorf("Getting a blob failed, got %v want %v\n", gotHash, s.Val)
+	}
+
+	return nil
+}, "runGetBigBlob")
+
+// Copied from localblobstore/fs_cablobstore/fs_cablobstore.go.
+//
+// hashToString() returns a string representation of the hash.
+// Requires len(hash)==16.  An md5 hash is suitable.
+func hashToString(hash []byte) string {
+	return fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+		hash[0], hash[1], hash[2], hash[3],
+		hash[4], hash[5], hash[6], hash[7],
+		hash[8], hash[9], hash[10], hash[11],
+		hash[12], hash[13], hash[14], hash[15])
+}
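
As an aside, the fixed-width formatting above is byte-for-byte equivalent to
Go's %x verb on a 16-byte slice; the explicit form is kept only to mirror
fs_cablobstore:

    // Equivalent shorthand (illustrative only):
    func hashToStringShort(hash []byte) string {
        return fmt.Sprintf("%x", hash)
    }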
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 5a23295..5e8159d 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -139,6 +139,16 @@
 	return d.c.GetSyncGroupNames(ctx)
 }
 
+// Blob implements Database.Blob.
+func (d *database) Blob(br wire.BlobRef) Blob {
+	return newBlob(d.fullName, br)
+}
+
+// CreateBlob implements Database.CreateBlob.
+func (d *database) CreateBlob(ctx *context.T) (Blob, error) {
+	return createBlob(ctx, d.fullName)
+}
+
 // UpgradeIfOutdated implements Database.UpgradeIfOutdated.
 func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
 	var schema *Schema = d.schema
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 99f11eb..e26b76d 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -105,6 +105,12 @@
 	// this database.
 	GetSyncGroupNames(ctx *context.T) ([]string, error)
 
+	// CreateBlob creates a new blob in Syncbase and returns a handle to it.
+	CreateBlob(ctx *context.T) (Blob, error)
+
+	// Blob returns a handle to the blob with the given BlobRef.
+	Blob(br wire.BlobRef) Blob
+
 	// This method compares the current schema version of the database with the
 	// schema version provided while creating this database handle. If the
 	// current database schema version is lower, then the SchemaUpdater is
@@ -342,6 +348,104 @@
 	GetMembers(ctx *context.T) (map[string]wire.SyncGroupMemberInfo, error)
 }
 
+// Blob is the interface for a Blob in the store.
+type Blob interface {
+	// Ref returns Syncbase's BlobRef for this blob.
+	Ref() wire.BlobRef
+
+	// Put returns a writer to append a byte stream to the blob.
+	Put(ctx *context.T) (BlobWriter, error)
+
+	// Commit marks the blob as immutable.
+	Commit(ctx *context.T) error
+
+	// Size returns the count of bytes written as part of the blob
+	// (committed or uncommitted).
+	Size(ctx *context.T) (int64, error)
+
+	// Delete locally deletes the blob (committed or uncommitted).
+	Delete(ctx *context.T) error
+
+	// Get returns the byte stream from a committed blob starting at offset.
+	Get(ctx *context.T, offset int64) (BlobReader, error)
+
+	// Fetch initiates fetching a blob if it is not found locally. priority
+	// controls the network priority of the blob. Higher priority blobs are
+	// fetched before lower priority ones; however, an ongoing blob
+	// transfer is not interrupted. Status updates are streamed back to the
+	// client while the fetch is in progress.
+	Fetch(ctx *context.T, priority uint64) (BlobStatus, error)
+
+	// Pin locally pins the blob so that it is not evicted.
+	Pin(ctx *context.T) error
+
+	// Unpin locally unpins the blob so that it can be evicted if needed.
+	Unpin(ctx *context.T) error
+
+	// Keep locally caches the blob with the specified rank. Lower
+	// ranked blobs are more eagerly evicted.
+	Keep(ctx *context.T, rank uint64) error
+}
+
+// BlobWriter is an interface for putting a blob.
+type BlobWriter interface {
+	// Send places the bytes given by the client onto the output
+	// stream. Returns errors encountered while sending. Blocks if there is
+	// no buffer space.
+	Send([]byte) error
+
+	// Close indicates that no more bytes will be sent.
+	Close() error
+}
+
+// BlobReader is an interface for getting a blob.
+type BlobReader interface {
+	// Advance() stages bytes so that they may be retrieved via
+	// Value(). Returns true iff there are bytes to retrieve. Advance() must
+	// be called before Value() is called. The caller is expected to read
+	// until Advance() returns false, or to call Cancel().
+	Advance() bool
+
+	// Value() returns the bytes that were staged by Advance(). May panic if
+	// Advance() returned false or was not called. Never blocks.
+	Value() []byte
+
+	// Err() returns any error encountered by Advance. Never blocks.
+	Err() error
+
+	// Cancel notifies the stream provider that it can stop producing
+	// elements.  The client must call Cancel if it does not iterate through
+	// all elements (i.e. until Advance returns false). Cancel is idempotent
+	// and can be called concurrently with a goroutine that is iterating via
+	// Advance.  Cancel causes Advance to subsequently return false. Cancel
+	// does not block.
+	Cancel()
+}
+
+// BlobStatus is an interface for getting the status of a blob transfer.
+type BlobStatus interface {
+	// Advance() stages an item so that it may be retrieved via
+	// Value(). Returns true iff there are items to retrieve. Advance() must
+	// be called before Value() is called. The caller is expected to read
+	// until Advance() returns false, or to call Cancel().
+	Advance() bool
+
+	// Value() returns the item that was staged by Advance(). May panic if
+	// Advance() returned false or was not called. Never blocks.
+	Value() wire.BlobFetchStatus
+
+	// Err() returns any error encountered by Advance. Never blocks.
+	Err() error
+
+	// Cancel notifies the stream provider that it can stop producing
+	// elements.  The client must call Cancel if it does not iterate through
+	// all elements (i.e. until Advance returns false). Cancel is idempotent
+	// and can be called concurrently with a goroutine that is iterating via
+	// Advance.  Cancel causes Advance to subsequently return false. Cancel
+	// does not block.
+	Cancel()
+}
+
 // SchemaUpgrader interface must be implemented by the App in order to upgrade
 // the database schema from a lower version to a higher version.
 type SchemaUpgrader interface {
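
To illustrate the Cancel contract above, a minimal sketch of a client that
stops reading a blob early (b is a Blob handle; enough is a hypothetical
stopping threshold):

    br, err := b.Get(ctx, 0)
    if err != nil {
        return err
    }
    var got []byte
    for br.Advance() {
        got = append(got, br.Value()...)
        if len(got) >= enough { // hypothetical threshold
            br.Cancel() // idempotent; subsequent Advance returns false
            break
        }
    }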
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index d235b1b..39da63c 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -25,6 +25,10 @@
 	os.Exit(r)
 }
 
+func TestV23SyncbasedWholeBlobTransfer(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedWholeBlobTransfer)
+}
+
 func TestV23SyncbasedJoinSyncGroup(t *testing.T) {
 	v23tests.RunTest(t, V23TestSyncbasedJoinSyncGroup)
 }
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
index c3f11d4..b5378ec 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
@@ -258,7 +258,7 @@
 		rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
 
 		// 6. The following thread sends the chunk hashes that the
-		//    receiver does not have are to the sender.  It also makes
+		//    receiver does not have, to the sender.  It also makes
 		//    a duplicate of the stream on the channel rsCopy.  The
 		//    buffering in rsCopy allows the receiver to put several
 		//    chunks into a fragment.
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
index ba08726..f51f455 100644
--- a/x/ref/services/syncbase/localblobstore/model.go
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -97,7 +97,7 @@
 	// that this will be called on a receiving device, and be given a
 	// ChunkStream from a sending device, to yield a recipe for efficient
 	// chunk transfer.  RecipeStep values with non-nil Chunk fields need
-	// the chunk from the sender; once the data is returned is can be
+	// the chunk from the sender; once the data is returned it can be
 	// written with BlobWriter.AppendFragment().  Those with blob
 	// references can be written locally with BlobWriter.AppendBlob().
 	RecipeStreamFromChunkStream(ctx *context.T, chunkStream ChunkStream) RecipeStream
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index d7103e9..c404f77 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -246,9 +246,6 @@
 	if err := d.St().Close(); err != nil {
 		return err
 	}
-	if err := d.BlobSt().Close(); err != nil {
-		return err
-	}
 	if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDb(dbName)); err != nil {
 		return err
 	}
diff --git a/x/ref/services/syncbase/server/interfaces/database.go b/x/ref/services/syncbase/server/interfaces/database.go
index 02e232e..8be30d8 100644
--- a/x/ref/services/syncbase/server/interfaces/database.go
+++ b/x/ref/services/syncbase/server/interfaces/database.go
@@ -5,7 +5,6 @@
 package interfaces
 
 import (
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
@@ -17,9 +16,6 @@
 	// St returns the storage engine instance for this database.
 	St() store.Store
 
-	// BlobSt returns the blob storage engine instance for this database.
-	BlobSt() localblobstore.BlobStore
-
 	// App returns the app handle for this database.
 	App() App
 
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index ee15a16..b97e845 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -37,6 +37,23 @@
 	JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error) {access.Read}
 
 	// BlobSync methods.
-	// FetchBlob returns the requested blob.
-	FetchBlob(br wire.BlobRef) error {access.Read}
+
+	// HaveBlob verifies that the peer has the requested blob, and if
+	// present, returns its size.
+	HaveBlob(br wire.BlobRef) (int64 | error)
+
+	// FetchBlob fetches the requested blob.
+	FetchBlob(br wire.BlobRef) stream<_, []byte> error
+
+	// Methods for incremental blob transfer. The transfer starts with the
+	// receiver making a FetchBlobRecipe call to the sender for a given
+	// BlobRef. The sender, in turn, sends the chunk hashes of all the
+	// chunks that make up the requested blob (blob recipe). The receiver
+	// looks up the chunk hashes in its local blob store, and identifies the
+	// missing ones. The receiver then fetches the missing chunks using a
+	// FetchChunks call from the sender. Finally, the receiver finishes the
+	// blob fetch by combining the chunks obtained over the network with the
+	// already available local chunks as per the blob recipe.
+	FetchBlobRecipe(br wire.BlobRef) stream<_, ChunkHash> error
+	FetchChunks() stream<ChunkHash, ChunkData> error
 }
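
A receiver-side sketch of the two transfer paths described above, written
against the generated client stubs (c is a Sync client stub bound to the
peer; haveChunk, saveChunk, appendToBlob, and assembleBlob are hypothetical
local blob store helpers; error handling elided). Note that only the
whole-blob path is implemented by this change; the incremental methods are
declared here for later use:

    // Whole-blob path: confirm the peer has the blob, then stream its bytes.
    if size, err := c.HaveBlob(ctx, br); err == nil && size >= 0 {
        call, _ := c.FetchBlob(ctx, br)
        for call.RecvStream().Advance() {
            appendToBlob(br, call.RecvStream().Value())
        }
        call.Finish()
    }

    // Incremental path: fetch the recipe, then only the missing chunks.
    recipe, _ := c.FetchBlobRecipe(ctx, br)
    var missing []ChunkHash
    for recipe.RecvStream().Advance() {
        if h := recipe.RecvStream().Value(); !haveChunk(h) {
            missing = append(missing, h)
        }
    }
    recipe.Finish()
    chunks, _ := c.FetchChunks(ctx)
    for _, h := range missing {
        chunks.SendStream().Send(h)
    }
    chunks.SendStream().Close()
    for chunks.RecvStream().Advance() {
        saveChunk(chunks.RecvStream().Value())
    }
    chunks.Finish()
    assembleBlob(br) // combine local and fetched chunks per the recipe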
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index d378170..9cd1383 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -45,9 +45,22 @@
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
 	// the SyncGroup.
 	JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
-	// BlobSync methods.
-	// FetchBlob returns the requested blob.
-	FetchBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) error
+	// HaveBlob verifies that the peer has the requested blob, and if
+	// present, returns its size.
+	HaveBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (int64, error)
+	// FetchBlob fetches the requested blob.
+	FetchBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (SyncFetchBlobClientCall, error)
+	// Methods for incremental blob transfer. The transfer starts with the
+	// receiver making a FetchBlobRecipe call to the sender for a given
+	// BlobRef. The sender, in turn, sends the chunk hashes of all the
+	// chunks that make up the requested blob (blob recipe). The receiver
+	// looks up the chunk hashes in its local blob store, and identifies the
+	// missing ones. The receiver then fetches the missing chunks using a
+	// FetchChunks call from the sender. Finally, the receiver finishes the
+	// blob fetch by combining the chunks obtained over the network with the
+	// already available local chunks as per the blob recipe.
+	FetchBlobRecipe(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (SyncFetchBlobRecipeClientCall, error)
+	FetchChunks(*context.T, ...rpc.CallOpt) (SyncFetchChunksClientCall, error)
 }
 
 // SyncClientStub adds universal methods to SyncClientMethods.
@@ -84,8 +97,35 @@
 	return
 }
 
-func (c implSyncClientStub) FetchBlob(ctx *context.T, i0 nosql.BlobRef, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "FetchBlob", []interface{}{i0}, nil, opts...)
+func (c implSyncClientStub) HaveBlob(ctx *context.T, i0 nosql.BlobRef, opts ...rpc.CallOpt) (o0 int64, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "HaveBlob", []interface{}{i0}, []interface{}{&o0}, opts...)
+	return
+}
+
+func (c implSyncClientStub) FetchBlob(ctx *context.T, i0 nosql.BlobRef, opts ...rpc.CallOpt) (ocall SyncFetchBlobClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "FetchBlob", []interface{}{i0}, opts...); err != nil {
+		return
+	}
+	ocall = &implSyncFetchBlobClientCall{ClientCall: call}
+	return
+}
+
+func (c implSyncClientStub) FetchBlobRecipe(ctx *context.T, i0 nosql.BlobRef, opts ...rpc.CallOpt) (ocall SyncFetchBlobRecipeClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "FetchBlobRecipe", []interface{}{i0}, opts...); err != nil {
+		return
+	}
+	ocall = &implSyncFetchBlobRecipeClientCall{ClientCall: call}
+	return
+}
+
+func (c implSyncClientStub) FetchChunks(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncFetchChunksClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "FetchChunks", nil, opts...); err != nil {
+		return
+	}
+	ocall = &implSyncFetchChunksClientCall{ClientCall: call}
 	return
 }
 
@@ -191,6 +231,246 @@
 	return
 }
 
+// SyncFetchBlobClientStream is the client stream for Sync.FetchBlob.
+type SyncFetchBlobClientStream interface {
+	// RecvStream returns the receiver side of the Sync.FetchBlob client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() []byte
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+}
+
+// SyncFetchBlobClientCall represents the call returned from Sync.FetchBlob.
+type SyncFetchBlobClientCall interface {
+	SyncFetchBlobClientStream
+	// Finish blocks until the server is done, and returns the positional return
+	// values for call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implSyncFetchBlobClientCall struct {
+	rpc.ClientCall
+	valRecv []byte
+	errRecv error
+}
+
+func (c *implSyncFetchBlobClientCall) RecvStream() interface {
+	Advance() bool
+	Value() []byte
+	Err() error
+} {
+	return implSyncFetchBlobClientCallRecv{c}
+}
+
+type implSyncFetchBlobClientCallRecv struct {
+	c *implSyncFetchBlobClientCall
+}
+
+func (c implSyncFetchBlobClientCallRecv) Advance() bool {
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implSyncFetchBlobClientCallRecv) Value() []byte {
+	return c.c.valRecv
+}
+func (c implSyncFetchBlobClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implSyncFetchBlobClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// SyncFetchBlobRecipeClientStream is the client stream for Sync.FetchBlobRecipe.
+type SyncFetchBlobRecipeClientStream interface {
+	// RecvStream returns the receiver side of the Sync.FetchBlobRecipe client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ChunkHash
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+}
+
+// SyncFetchBlobRecipeClientCall represents the call returned from Sync.FetchBlobRecipe.
+type SyncFetchBlobRecipeClientCall interface {
+	SyncFetchBlobRecipeClientStream
+	// Finish blocks until the server is done, and returns the positional return
+	// values for call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implSyncFetchBlobRecipeClientCall struct {
+	rpc.ClientCall
+	valRecv ChunkHash
+	errRecv error
+}
+
+func (c *implSyncFetchBlobRecipeClientCall) RecvStream() interface {
+	Advance() bool
+	Value() ChunkHash
+	Err() error
+} {
+	return implSyncFetchBlobRecipeClientCallRecv{c}
+}
+
+type implSyncFetchBlobRecipeClientCallRecv struct {
+	c *implSyncFetchBlobRecipeClientCall
+}
+
+func (c implSyncFetchBlobRecipeClientCallRecv) Advance() bool {
+	c.c.valRecv = ChunkHash{}
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implSyncFetchBlobRecipeClientCallRecv) Value() ChunkHash {
+	return c.c.valRecv
+}
+func (c implSyncFetchBlobRecipeClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implSyncFetchBlobRecipeClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// SyncFetchChunksClientStream is the client stream for Sync.FetchChunks.
+type SyncFetchChunksClientStream interface {
+	// RecvStream returns the receiver side of the Sync.FetchChunks client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ChunkData
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the Sync.FetchChunks client stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
+		Send(item ChunkHash) error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
+		Close() error
+	}
+}
+
+// SyncFetchChunksClientCall represents the call returned from Sync.FetchChunks.
+type SyncFetchChunksClientCall interface {
+	SyncFetchChunksClientStream
+	// Finish performs the equivalent of SendStream().Close, then blocks until
+	// the server is done, and returns the positional return values for the call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implSyncFetchChunksClientCall struct {
+	rpc.ClientCall
+	valRecv ChunkData
+	errRecv error
+}
+
+func (c *implSyncFetchChunksClientCall) RecvStream() interface {
+	Advance() bool
+	Value() ChunkData
+	Err() error
+} {
+	return implSyncFetchChunksClientCallRecv{c}
+}
+
+type implSyncFetchChunksClientCallRecv struct {
+	c *implSyncFetchChunksClientCall
+}
+
+func (c implSyncFetchChunksClientCallRecv) Advance() bool {
+	c.c.valRecv = ChunkData{}
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implSyncFetchChunksClientCallRecv) Value() ChunkData {
+	return c.c.valRecv
+}
+func (c implSyncFetchChunksClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implSyncFetchChunksClientCall) SendStream() interface {
+	Send(item ChunkHash) error
+	Close() error
+} {
+	return implSyncFetchChunksClientCallSend{c}
+}
+
+type implSyncFetchChunksClientCallSend struct {
+	c *implSyncFetchChunksClientCall
+}
+
+func (c implSyncFetchChunksClientCallSend) Send(item ChunkHash) error {
+	return c.c.Send(item)
+}
+func (c implSyncFetchChunksClientCallSend) Close() error {
+	return c.c.CloseSend()
+}
+func (c *implSyncFetchChunksClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
 // SyncServerMethods is the interface a server writer
 // implements for Sync.
 //
@@ -216,9 +496,22 @@
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
 	// the SyncGroup.
 	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
-	// BlobSync methods.
-	// FetchBlob returns the requested blob.
-	FetchBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) error
+	// HaveBlob verifies that the peer has the requested blob, and if
+	// present, returns its size.
+	HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
+	// FetchBlob fetches the requested blob.
+	FetchBlob(ctx *context.T, call SyncFetchBlobServerCall, br nosql.BlobRef) error
+	// Methods for incremental blob transfer. The transfer starts with the
+	// receiver making a FetchBlobRecipe call to the sender for a given
+	// BlobRef. The sender, in turn, sends the chunk hashes of all the
+	// chunks that make up the requested blob (blob recipe). The receiver
+	// looks up the chunk hashes in its local blob store, and identifies the
+	// missing ones. The receiver then fetches the missing chunks using a
+	// FetchChunks call from the sender. Finally, the receiver finishes the
+	// blob fetch by combining the chunks obtained over the network with the
+	// already available local chunks as per the blob recipe.
+	FetchBlobRecipe(ctx *context.T, call SyncFetchBlobRecipeServerCall, br nosql.BlobRef) error
+	FetchChunks(*context.T, SyncFetchChunksServerCall) error
 }
 
 // SyncServerStubMethods is the server interface containing
@@ -245,9 +538,22 @@
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
 	// the SyncGroup.
 	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
-	// BlobSync methods.
-	// FetchBlob returns the requested blob.
-	FetchBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) error
+	// HaveBlob verifies that the peer has the requested blob, and if
+	// present, returns its size.
+	HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
+	// FetchBlob fetches the requested blob.
+	FetchBlob(ctx *context.T, call *SyncFetchBlobServerCallStub, br nosql.BlobRef) error
+	// Methods for incremental blob transfer. The transfer starts with the
+	// receiver making a FetchBlobRecipe call to the sender for a given
+	// BlobRef. The sender, in turn, sends the chunk hashes of all the
+	// chunks that make up the requested blob (blob recipe). The receiver
+	// looks up the chunk hashes in its local blob store, and identifies the
+	// missing ones. The receiver then fetches the missing chunks using a
+	// FetchChunks call from the sender. Finally, the receiver finishes the
+	// blob fetch by combining the chunks obtained over the network with the
+	// already available local chunks as per the blob recipe.
+	FetchBlobRecipe(ctx *context.T, call *SyncFetchBlobRecipeServerCallStub, br nosql.BlobRef) error
+	FetchChunks(*context.T, *SyncFetchChunksServerCallStub) error
 }
 
 // SyncServerStub adds universal methods to SyncServerStubMethods.
@@ -291,10 +597,22 @@
 	return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
 }
 
-func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall, i0 nosql.BlobRef) error {
+func (s implSyncServerStub) HaveBlob(ctx *context.T, call rpc.ServerCall, i0 nosql.BlobRef) (int64, error) {
+	return s.impl.HaveBlob(ctx, call, i0)
+}
+
+func (s implSyncServerStub) FetchBlob(ctx *context.T, call *SyncFetchBlobServerCallStub, i0 nosql.BlobRef) error {
 	return s.impl.FetchBlob(ctx, call, i0)
 }
 
+func (s implSyncServerStub) FetchBlobRecipe(ctx *context.T, call *SyncFetchBlobRecipeServerCallStub, i0 nosql.BlobRef) error {
+	return s.impl.FetchBlobRecipe(ctx, call, i0)
+}
+
+func (s implSyncServerStub) FetchChunks(ctx *context.T, call *SyncFetchChunksServerCallStub) error {
+	return s.impl.FetchChunks(ctx, call)
+}
+
 func (s implSyncServerStub) Globber() *rpc.GlobState {
 	return s.gs
 }
@@ -342,12 +660,31 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
-			Name: "FetchBlob",
-			Doc:  "// BlobSync methods.\n// FetchBlob returns the requested blob.",
+			Name: "HaveBlob",
+			Doc:  "// HaveBlob verifies that the peer has the requested blob, and if\n// present, returns its size.",
 			InArgs: []rpc.ArgDesc{
 				{"br", ``}, // nosql.BlobRef
 			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // int64
+			},
+		},
+		{
+			Name: "FetchBlob",
+			Doc:  "// FetchBlob fetches the requested blob.",
+			InArgs: []rpc.ArgDesc{
+				{"br", ``}, // nosql.BlobRef
+			},
+		},
+		{
+			Name: "FetchBlobRecipe",
+			Doc:  "// Methods for incremental blob transfer. The transfer starts with the\n// receiver making a FetchBlobRecipe call to the sender for a given\n// BlobRef. The sender, in turn, sends the chunk hashes of all the\n// chunks that make up the requested blob (blob recipe). The receiver\n// looks up the chunk hashes in its local blob store, and identifies the\n// missing ones. The receiver then fetches the missing chunks using a\n// FetchChunks call from the sender. Finally, the receiver finishes the\n// blob fetch by combining the chunks obtained over the network with the\n// already available local chunks as per the blob recipe.",
+			InArgs: []rpc.ArgDesc{
+				{"br", ``}, // nosql.BlobRef
+			},
+		},
+		{
+			Name: "FetchChunks",
 		},
 	},
 }
@@ -436,3 +773,174 @@
 func (s implSyncGetDeltasServerCallSend) Send(item DeltaResp) error {
 	return s.s.Send(item)
 }
+
+// SyncFetchBlobServerStream is the server stream for Sync.FetchBlob.
+type SyncFetchBlobServerStream interface {
+	// SendStream returns the send side of the Sync.FetchBlob server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item []byte) error
+	}
+}
+
+// SyncFetchBlobServerCall represents the context passed to Sync.FetchBlob.
+type SyncFetchBlobServerCall interface {
+	rpc.ServerCall
+	SyncFetchBlobServerStream
+}
+
+// SyncFetchBlobServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements SyncFetchBlobServerCall.
+type SyncFetchBlobServerCallStub struct {
+	rpc.StreamServerCall
+}
+
+// Init initializes SyncFetchBlobServerCallStub from rpc.StreamServerCall.
+func (s *SyncFetchBlobServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the Sync.FetchBlob server stream.
+func (s *SyncFetchBlobServerCallStub) SendStream() interface {
+	Send(item []byte) error
+} {
+	return implSyncFetchBlobServerCallSend{s}
+}
+
+type implSyncFetchBlobServerCallSend struct {
+	s *SyncFetchBlobServerCallStub
+}
+
+func (s implSyncFetchBlobServerCallSend) Send(item []byte) error {
+	return s.s.Send(item)
+}
+
+// SyncFetchBlobRecipeServerStream is the server stream for Sync.FetchBlobRecipe.
+type SyncFetchBlobRecipeServerStream interface {
+	// SendStream returns the send side of the Sync.FetchBlobRecipe server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item ChunkHash) error
+	}
+}
+
+// SyncFetchBlobRecipeServerCall represents the context passed to Sync.FetchBlobRecipe.
+type SyncFetchBlobRecipeServerCall interface {
+	rpc.ServerCall
+	SyncFetchBlobRecipeServerStream
+}
+
+// SyncFetchBlobRecipeServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements SyncFetchBlobRecipeServerCall.
+type SyncFetchBlobRecipeServerCallStub struct {
+	rpc.StreamServerCall
+}
+
+// Init initializes SyncFetchBlobRecipeServerCallStub from rpc.StreamServerCall.
+func (s *SyncFetchBlobRecipeServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the Sync.FetchBlobRecipe server stream.
+func (s *SyncFetchBlobRecipeServerCallStub) SendStream() interface {
+	Send(item ChunkHash) error
+} {
+	return implSyncFetchBlobRecipeServerCallSend{s}
+}
+
+type implSyncFetchBlobRecipeServerCallSend struct {
+	s *SyncFetchBlobRecipeServerCallStub
+}
+
+func (s implSyncFetchBlobRecipeServerCallSend) Send(item ChunkHash) error {
+	return s.s.Send(item)
+}
+
+// SyncFetchChunksServerStream is the server stream for Sync.FetchChunks.
+type SyncFetchChunksServerStream interface {
+	// RecvStream returns the receiver side of the Sync.FetchChunks server stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ChunkHash
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the Sync.FetchChunks server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item ChunkData) error
+	}
+}
+
+// SyncFetchChunksServerCall represents the context passed to Sync.FetchChunks.
+type SyncFetchChunksServerCall interface {
+	rpc.ServerCall
+	SyncFetchChunksServerStream
+}
+
+// SyncFetchChunksServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements SyncFetchChunksServerCall.
+type SyncFetchChunksServerCallStub struct {
+	rpc.StreamServerCall
+	valRecv ChunkHash
+	errRecv error
+}
+
+// Init initializes SyncFetchChunksServerCallStub from rpc.StreamServerCall.
+func (s *SyncFetchChunksServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the Sync.FetchChunks server stream.
+func (s *SyncFetchChunksServerCallStub) RecvStream() interface {
+	Advance() bool
+	Value() ChunkHash
+	Err() error
+} {
+	return implSyncFetchChunksServerCallRecv{s}
+}
+
+type implSyncFetchChunksServerCallRecv struct {
+	s *SyncFetchChunksServerCallStub
+}
+
+func (s implSyncFetchChunksServerCallRecv) Advance() bool {
+	s.s.valRecv = ChunkHash{}
+	s.s.errRecv = s.s.Recv(&s.s.valRecv)
+	return s.s.errRecv == nil
+}
+func (s implSyncFetchChunksServerCallRecv) Value() ChunkHash {
+	return s.s.valRecv
+}
+func (s implSyncFetchChunksServerCallRecv) Err() error {
+	if s.s.errRecv == io.EOF {
+		return nil
+	}
+	return s.s.errRecv
+}
+
+// SendStream returns the send side of the Sync.FetchChunks server stream.
+func (s *SyncFetchChunksServerCallStub) SendStream() interface {
+	Send(item ChunkData) error
+} {
+	return implSyncFetchChunksServerCallSend{s}
+}
+
+type implSyncFetchChunksServerCallSend struct {
+	s *SyncFetchChunksServerCallStub
+}
+
+func (s implSyncFetchChunksServerCallSend) Send(item ChunkData) error {
+	return s.s.Send(item)
+}
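
On the server side, the FetchChunks stub above exposes the mirror image of the client stream. The vsync implementation in this change still returns ErrNotImplemented for it; purely as an illustration of the stub API, a handler might look like the sketch below, where lookupChunk is a hypothetical local-store lookup and not part of this change:

```go
package vsync

import (
	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
	"v.io/v23/context"
)

// fetchChunksSketch illustrates the server-side stream API: for every
// ChunkHash received, resolve it locally and stream the ChunkData back.
func fetchChunksSketch(ctx *context.T, call interfaces.SyncFetchChunksServerCall) error {
	recv := call.RecvStream()
	send := call.SendStream()
	for recv.Advance() {
		data, err := lookupChunk(ctx, recv.Value().Hash) // hypothetical lookup
		if err != nil {
			return err
		}
		if err := send.Send(interfaces.ChunkData{Data: data}); err != nil {
			return err
		}
	}
	return recv.Err()
}

// lookupChunk is a placeholder for resolving a chunk hash in the local blob
// store; the real lookup API is not part of this change.
func lookupChunk(ctx *context.T, hash []byte) ([]byte, error) {
	panic("illustrative placeholder")
}
```
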
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
index 6d608a3..324928b 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
@@ -117,3 +117,13 @@
 	Rec     LogRec
 	RespVec GenVector
 }
+
+// ChunkHash contains the hash of a chunk that is part of a blob's recipe.
+type ChunkHash struct {
+	Hash []byte
+}
+
+// ChunkData contains the data of a chunk.
+type ChunkData struct {
+	Data []byte
+}
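
The VDL above intentionally leaves the hash function unspecified. For a concrete feel for the wire types, a short sketch of populating them for one chunk, assuming SHA-256 purely for the sake of the example (the VDL does not mandate it):

```go
package vsync

import (
	"crypto/sha256"

	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
)

// chunkToWire shows how a sender might populate the wire types for one chunk.
// SHA-256 is an assumption of this example, not something the VDL pins down.
func chunkToWire(data []byte) (interfaces.ChunkHash, interfaces.ChunkData) {
	sum := sha256.Sum256(data)
	return interfaces.ChunkHash{Hash: sum[:]}, interfaces.ChunkData{Data: data}
}
```
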
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
index aeb4e31..8ef80a8 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -228,6 +228,26 @@
 func (x DeltaRespRespVec) Name() string                    { return "RespVec" }
 func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
 
+// ChunkHash contains the hash of a chunk that is part of a blob's recipe.
+type ChunkHash struct {
+	Hash []byte
+}
+
+func (ChunkHash) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.ChunkHash"`
+}) {
+}
+
+// ChunkData contains the data of a chunk.
+type ChunkData struct {
+	Data []byte
+}
+
+func (ChunkData) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.ChunkData"`
+}) {
+}
+
 func init() {
 	vdl.Register((*PrefixGenVector)(nil))
 	vdl.Register((*GenVector)(nil))
@@ -238,6 +258,8 @@
 	vdl.Register((*SyncGroup)(nil))
 	vdl.Register((*DeltaReq)(nil))
 	vdl.Register((*DeltaResp)(nil))
+	vdl.Register((*ChunkHash)(nil))
+	vdl.Register((*ChunkData)(nil))
 }
 
 const NoGroupId = GroupId(0)
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 1fe3066..94b37d8 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -15,8 +15,6 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/nosql/query_db"
 	"v.io/syncbase/v23/syncbase/nosql/query_exec"
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -45,9 +43,6 @@
 	// and do not actually open the store in NewDatabase.
 	st store.Store // stores all data for a single database
 
-	// Local blob store associated with this database.
-	bst localblobstore.BlobStore
-
 	// Active snapshots and transactions corresponding to client batches.
 	// TODO(sadovsky): Add timeouts and GC.
 	mu  sync.Mutex // protects the fields below
@@ -93,17 +88,11 @@
 	if err != nil {
 		return nil, err
 	}
-	// Open a co-located blob store, adjacent to the structured store.
-	bst, err := fs_cablobstore.Create(ctx, path.Join(opts.RootDir, "blobs"))
-	if err != nil {
-		return nil, err
-	}
 	return &database{
 		name:   name,
 		a:      a,
 		exists: true,
 		st:     st,
-		bst:    bst,
 		sns:    make(map[uint64]store.Snapshot),
 		txs:    make(map[uint64]store.Transaction),
 	}, nil
@@ -379,13 +368,6 @@
 	return d.st
 }
 
-func (d *database) BlobSt() localblobstore.BlobStore {
-	if !d.exists {
-		vlog.Fatalf("database %q does not exist", d.name)
-	}
-	return d.bst
-}
-
 func (d *database) App() interfaces.App {
 	return d.a
 }
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index bafa388..180692f 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -130,7 +130,7 @@
 	}
 	// Note, vsync.New internally handles both first-time and subsequent
 	// invocations.
-	if s.sync, err = vsync.New(ctx, call, s, opts.Server); err != nil {
+	if s.sync, err = vsync.New(ctx, call, s, opts.Server, opts.RootDir); err != nil {
 		return nil, err
 	}
 	return s, nil
diff --git a/x/ref/services/syncbase/vsync/blob.go b/x/ref/services/syncbase/vsync/blob.go
index 56a4eed..ff04066 100644
--- a/x/ref/services/syncbase/vsync/blob.go
+++ b/x/ref/services/syncbase/vsync/blob.go
@@ -8,8 +8,11 @@
 	"io"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
+	blob "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
@@ -19,11 +22,25 @@
 	chunkSize = 8 * 1024
 )
 
+// blobLocInfo contains the location information about a BlobRef. This location
+// information is merely a hint used to search for the blob.
+type blobLocInfo struct {
+	peer   string                          // Syncbase from which the presence of this BlobRef was first learned.
+	source string                          // Syncbase that originated this blob.
+	sgIds  map[interfaces.GroupId]struct{} // SyncGroups through which the BlobRef was learned.
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs for managing blobs between Syncbase and its clients.
 
 func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
-	bst := sd.db.BlobSt()
+	vlog.VI(2).Infof("sync: CreateBlob: begin")
+	defer vlog.VI(2).Infof("sync: CreateBlob: end")
+
+	// Get this Syncbase's blob store handle.
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
 	writer, err := bst.NewBlobWriter(ctx, "")
 	if err != nil {
 		return wire.NullBlobRef, err
@@ -31,13 +48,19 @@
 	defer writer.CloseWithoutFinalize()
 
 	name := writer.Name()
-	vlog.VI(2).Infof("sync: CreateBlob: blob ref %s", name)
+	vlog.VI(4).Infof("sync: CreateBlob: blob ref %s", name)
 	return wire.BlobRef(name), nil
 }
 
 func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
-	bst := sd.db.BlobSt()
-	writer, err := bst.NewBlobWriter(ctx, string(br))
+	vlog.VI(2).Infof("sync: PutBlob: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: PutBlob: end br %v", br)
+
+	// Get this Syncbase's blob store handle.
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
+	writer, err := bst.ResumeBlobWriter(ctx, string(br))
 	if err != nil {
 		return err
 	}
@@ -45,7 +68,7 @@
 
 	stream := call.RecvStream()
 	for stream.Advance() {
-		item := localblobstore.BlockOrFile{Block: stream.Value()}
+		item := blob.BlockOrFile{Block: stream.Value()}
 		if err = writer.AppendFragment(item); err != nil {
 			return err
 		}
@@ -54,8 +77,14 @@
 }
 
 func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
-	bst := sd.db.BlobSt()
-	writer, err := bst.NewBlobWriter(ctx, string(br))
+	vlog.VI(2).Infof("sync: CommitBlob: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: CommitBlob: end br %v", br)
+
+	// Get this Syncbase's blob store handle.
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
+	writer, err := bst.ResumeBlobWriter(ctx, string(br))
 	if err != nil {
 		return err
 	}
@@ -63,7 +92,13 @@
 }
 
 func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
-	bst := sd.db.BlobSt()
+	vlog.VI(2).Infof("sync: GetBlobSize: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: GetBlobSize: end br %v", br)
+
+	// Get this Syncbase's blob store handle.
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
 	reader, err := bst.NewBlobReader(ctx, string(br))
 	if err != nil {
 		return 0, err
@@ -78,47 +113,46 @@
 }
 
 func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
+	vlog.VI(2).Infof("sync: GetBlob: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: GetBlob: end br %v", br)
+
 	// First get the blob locally if available.
-	err := sd.getLocalBlob(ctx, call, br, offset)
-	if err == nil {
-		return nil
-	}
-
-	// Locate the blob on a remote blob store and fetch it.
-	return verror.NewErrNotImplemented(ctx)
-}
-
-// getLocalBlob looks for a blob in the local store and, if found, reads the
-// blob and sends it to the client.  If the blob is found, it starts reading it
-// from the given offset and sends its bytes into the client stream.
-func (sd *syncDatabase) getLocalBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
-	bst := sd.db.BlobSt()
-	reader, err := bst.NewBlobReader(ctx, string(br))
-	if err != nil {
+	ss := sd.sync.(*syncService)
+	err := getLocalBlob(ctx, call.SendStream(), ss.bst, br, offset)
+	if err == nil || verror.ErrorID(err) == wire.ErrBlobNotCommitted.ID {
 		return err
 	}
-	defer reader.Close()
 
-	buf := make([]byte, chunkSize)
-	stream := call.SendStream()
-
-	for {
-		nbytes, err := reader.ReadAt(buf, offset)
-		if err != nil && err != io.EOF {
-			return err
-		}
-		if err == io.EOF || nbytes <= 0 {
-			break
-		}
-		offset += int64(nbytes)
-		stream.Send(buf[:nbytes])
-	}
-
-	return nil
+	return sd.fetchBlobRemote(ctx, br, nil, call, offset)
 }
 
 func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
-	return verror.NewErrNotImplemented(ctx)
+	vlog.VI(2).Infof("sync: FetchBlob: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: FetchBlob: end br %v", br)
+
+	clientStream := call.SendStream()
+
+	// Check if BlobRef already exists locally.
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
+	bReader, err := bst.NewBlobReader(ctx, string(br))
+	if err == nil {
+		finalized := bReader.IsFinalized()
+		bReader.Close()
+
+		if !finalized {
+			return wire.NewErrBlobNotCommitted(ctx)
+		}
+		clientStream.Send(wire.BlobFetchStatus{State: wire.BlobFetchStateDone})
+		return nil
+	}
+
+	// Wait for this blob's turn.
+	// TODO(hpucha): Implement a blob queue.
+	clientStream.Send(wire.BlobFetchStatus{State: wire.BlobFetchStatePending})
+
+	return sd.fetchBlobRemote(ctx, br, call, nil, 0)
 }
 
 func (sd *syncDatabase) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
@@ -136,6 +170,260 @@
 ////////////////////////////////////////////////////////////
 // RPC for blob fetch between Syncbases.
 
-func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+func (s *syncService) FetchBlob(ctx *context.T, call interfaces.SyncFetchBlobServerCall, br wire.BlobRef) error {
+	vlog.VI(2).Infof("sync: FetchBlob: sb-sb begin br %v", br)
+	defer vlog.VI(2).Infof("sync: FetchBlob: sb-sb end br %v", br)
+	return getLocalBlob(ctx, call.SendStream(), s.bst, br, 0)
+}
+
+func (s *syncService) HaveBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
+	vlog.VI(2).Infof("sync: HaveBlob: begin br %v", br)
+	defer vlog.VI(2).Infof("sync: HaveBlob: end br %v", br)
+
+	bReader, err := s.bst.NewBlobReader(ctx, string(br))
+	if err != nil {
+		return 0, err
+	}
+	defer bReader.Close()
+	if !bReader.IsFinalized() {
+		return 0, wire.NewErrBlobNotCommitted(ctx)
+	}
+	return bReader.Size(), nil
+}
+
+func (s *syncService) FetchBlobRecipe(ctx *context.T, call interfaces.SyncFetchBlobRecipeServerCall, br wire.BlobRef) error {
 	return verror.NewErrNotImplemented(ctx)
 }
+
+func (s *syncService) FetchChunks(ctx *context.T, call interfaces.SyncFetchChunksServerCall) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+////////////////////////////////////////////////////////////
+// Helpers.
+
+type byteStream interface {
+	Send(item []byte) error
+}
+
+// getLocalBlob looks for a blob in the local store and, if found and
+// committed, reads it starting at the given offset and sends its bytes into
+// the client stream. It fails with ErrBlobNotCommitted if the blob exists
+// but has not yet been finalized.
+func getLocalBlob(ctx *context.T, stream byteStream, bst blob.BlobStore, br wire.BlobRef, offset int64) error {
+	vlog.VI(4).Infof("sync: getLocalBlob: begin br %v, offset %v", br, offset)
+	defer vlog.VI(4).Infof("sync: getLocalBlob: end br %v, offset %v", br, offset)
+
+	reader, err := bst.NewBlobReader(ctx, string(br))
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+
+	if !reader.IsFinalized() {
+		return wire.NewErrBlobNotCommitted(ctx)
+	}
+
+	buf := make([]byte, chunkSize)
+	for {
+		nbytes, err := reader.ReadAt(buf, offset)
+		if err != nil && err != io.EOF {
+			return err
+		}
+		if nbytes <= 0 {
+			break
+		}
+		offset += int64(nbytes)
+		if err := stream.Send(buf[:nbytes]); err != nil {
+			return err
+		}
+		if err == io.EOF {
+			break
+		}
+	}
+
+	return nil
+}
+
+func (sd *syncDatabase) fetchBlobRemote(ctx *context.T, br wire.BlobRef, statusCall wire.BlobManagerFetchBlobServerCall, dataCall wire.BlobManagerGetBlobServerCall, offset int64) error {
+	vlog.VI(4).Infof("sync: fetchBlobRemote: begin br %v, offset %v", br, offset)
+	defer vlog.VI(4).Infof("sync: fetchBlobRemote: end br %v, offset %v", br, offset)
+
+	var sendStatus, sendData bool
+	var statusStream interface {
+		Send(item wire.BlobFetchStatus) error
+	}
+	var dataStream interface {
+		Send(item []byte) error
+	}
+
+	if statusCall != nil {
+		sendStatus = true
+		statusStream = statusCall.SendStream()
+	}
+	if dataCall != nil {
+		sendData = true
+		dataStream = dataCall.SendStream()
+	}
+
+	if sendStatus {
+		// Start blob source discovery.
+		statusStream.Send(wire.BlobFetchStatus{State: wire.BlobFetchStateLocating})
+	}
+
+	// Locate blob.
+	peer, size, err := sd.locateBlob(ctx, br)
+	if err != nil {
+		return err
+	}
+
+	// Start blob fetching.
+	status := wire.BlobFetchStatus{State: wire.BlobFetchStateFetching, Total: size}
+	if sendStatus {
+		statusStream.Send(status)
+	}
+
+	ss := sd.sync.(*syncService)
+	bst := ss.bst
+
+	bWriter, err := bst.NewBlobWriter(ctx, string(br))
+	if err != nil {
+		return err
+	}
+
+	c := interfaces.SyncClient(peer)
+	ctxPeer, cancel := context.WithRootCancel(ctx)
+	stream, err := c.FetchBlob(ctxPeer, br)
+	if err == nil {
+		peerStream := stream.RecvStream()
+		for peerStream.Advance() {
+			item := blob.BlockOrFile{Block: peerStream.Value()}
+			if err = bWriter.AppendFragment(item); err != nil {
+				break
+			}
+			curSize := int64(len(item.Block))
+			status.Received += curSize
+			if sendStatus {
+				statusStream.Send(status)
+			}
+			if sendData {
+				if curSize <= offset {
+					offset -= curSize
+				} else if offset != 0 {
+					dataStream.Send(item.Block[offset:])
+					offset = 0
+				} else {
+					dataStream.Send(item.Block)
+				}
+			}
+		}
+
+		if err != nil {
+			cancel()
+			stream.Finish()
+		} else {
+			err = peerStream.Err()
+			if terr := stream.Finish(); err == nil {
+				err = terr
+			}
+			cancel()
+		}
+	}
+
+	bWriter.Close()
+	if err != nil {
+		// Clean up the blob with failed download, so that it can be
+		// downloaded again. Ignore any error from deletion.
+		bst.DeleteBlob(ctx, string(br))
+	} else {
+		status := wire.BlobFetchStatus{State: wire.BlobFetchStateDone}
+		if sendStatus {
+			statusStream.Send(status)
+		}
+	}
+	return err
+}
+
+// TODO(hpucha): Add syncgroup driven blob discovery.
+func (sd *syncDatabase) locateBlob(ctx *context.T, br wire.BlobRef) (string, int64, error) {
+	vlog.VI(4).Infof("sync: locateBlob: begin br %v", br)
+	defer vlog.VI(4).Infof("sync: locateBlob: end br %v", br)
+
+	ss := sd.sync.(*syncService)
+	loc, err := ss.getBlobLocInfo(ctx, br)
+	if err != nil {
+		return "", 0, err
+	}
+
+	// Search for the blob among the peer that originated it and the peer
+	// from which it was learned.
+	peers := []string{loc.source, loc.peer}
+	for _, p := range peers {
+		vlog.VI(4).Infof("sync: locateBlob: attempting %s", p)
+		// Get the mounttables for this peer.
+		mtTables, err := sd.getMountTables(ctx, p)
+		if err != nil {
+			continue
+		}
+
+		for mt := range mtTables {
+			absName := naming.Join(mt, p, util.SyncbaseSuffix)
+			c := interfaces.SyncClient(absName)
+			size, err := c.HaveBlob(ctx, br)
+			if err == nil {
+				vlog.VI(4).Infof("sync: locateBlob: found blob on %s", absName)
+				return absName, size, nil
+			}
+		}
+	}
+
+	return "", 0, verror.New(verror.ErrInternal, ctx, "blob not found")
+
+}
+
+func (sd *syncDatabase) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
+	ss := sd.sync.(*syncService)
+	mInfo := ss.copyMemberInfo(ctx, peer)
+
+	mtTables := make(map[string]struct{})
+	for gdbName, sgInfo := range mInfo.db2sg {
+		appName, dbName, err := splitAppDbName(ctx, gdbName)
+		if err != nil {
+			return nil, err
+		}
+		st, err := ss.getDbStore(ctx, nil, appName, dbName)
+		if err != nil {
+			return nil, err
+		}
+
+		for id := range sgInfo {
+			sg, err := getSyncGroupById(ctx, st, id)
+			if err != nil {
+				continue
+			}
+			if _, ok := sg.Joiners[peer]; !ok {
+				// Peer is no longer part of the SyncGroup.
+				continue
+			}
+			for _, mt := range sg.Spec.MountTables {
+				mtTables[mt] = struct{}{}
+			}
+		}
+	}
+	return mtTables, nil
+}
+
+// TODO(hpucha): Persist the blob directory periodically.
+func (s *syncService) addBlobLocInfo(ctx *context.T, br wire.BlobRef, info *blobLocInfo) error {
+	s.blobDirLock.Lock()
+	defer s.blobDirLock.Unlock()
+
+	s.blobDirectory[br] = info
+	return nil
+}
+
+func (s *syncService) getBlobLocInfo(ctx *context.T, br wire.BlobRef) (*blobLocInfo, error) {
+	s.blobDirLock.RLock()
+	defer s.blobDirLock.RUnlock()
+
+	if info, ok := s.blobDirectory[br]; ok {
+		return info, nil
+	}
+	return nil, verror.New(verror.ErrInternal, ctx, "blob state not found", br)
+}
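
From an application's perspective, the new flow surfaces as a stream of BlobFetchStatus values: Pending, then Locating, then Fetching with running Received/Total counts, then Done. A minimal sketch of a client consuming it, assuming the generated wire.BlobManagerClient stub; dbName and the priority value 0 are assumptions of this sketch:

```go
package client

import (
	"fmt"

	wire "v.io/syncbase/v23/services/syncbase/nosql"
	"v.io/v23/context"
)

// watchBlobFetch drives the streaming FetchBlob RPC and prints progress.
func watchBlobFetch(ctx *context.T, dbName string, br wire.BlobRef) error {
	call, err := wire.BlobManagerClient(dbName).FetchBlob(ctx, br, 0)
	if err != nil {
		return err
	}
	recv := call.RecvStream()
	for recv.Advance() {
		st := recv.Value()
		fmt.Printf("fetch state %v: %d of %d bytes\n", st.State, st.Received, st.Total)
	}
	// Server-side errors, e.g. ErrBlobNotCommitted, arrive via Finish.
	return call.Finish()
}
```
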
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 5d3d9b8..cccf217 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -15,14 +15,16 @@
 	"strings"
 	"time"
 
+	"v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/naming"
+	"v.io/v23/vdl"
 	"v.io/v23/verror"
-	"v.io/x/lib/set"
+	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
 )
 
@@ -62,10 +64,6 @@
 func (s *syncService) syncer(ctx *context.T) {
 	defer s.pending.Done()
 
-	// TODO(hpucha): Do we need context per initiator round?
-	ctx, cancel := context.WithRootCancel(ctx)
-	defer cancel()
-
 	ticker := time.NewTicker(peerSyncInterval)
 	defer ticker.Stop()
 
@@ -106,11 +104,13 @@
 // initiation round), the work done by the initiator is idempotent.
 //
 // TODO(hpucha): Check the idempotence, esp in addNode in DAG.
-func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
+func (s *syncService) getDeltasFromPeer(ctxIn *context.T, peer string) {
 	vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer)
 	defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer)
 
-	info := s.allMembers.members[peer]
+	ctx, cancel := context.WithRootCancel(ctxIn)
+
+	info := s.copyMemberInfo(ctx, peer)
 	if info == nil {
 		vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer)
 	}
@@ -167,6 +167,8 @@
 			// Returning here since something could be wrong with
 			// the connection, and no point in attempting the next
 			// Database.
+			cancel()
+			stream.Finish()
 			return
 		}
 		vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote)
@@ -182,8 +184,11 @@
 	if connected {
 		stream.Finish()
 	}
+	cancel()
 }
 
+type sgSet map[interfaces.GroupId]struct{}
+
 // initiationState is accumulated for each Database during an initiation round.
 type initiationState struct {
 	// Relative name of the peer to sync with.
@@ -193,10 +198,11 @@
 	mtTables map[string]struct{}
 
 	// SyncGroups being requested in the initiation round.
-	sgIds map[interfaces.GroupId]struct{}
+	sgIds sgSet
 
-	// SyncGroup prefixes being requested in the initiation round.
-	sgPfxs map[string]struct{}
+	// SyncGroup prefixes being requested in the initiation round, and their
+	// corresponding SyncGroup ids.
+	sgPfxs map[string]sgSet
 
 	// Local generation vector.
 	local interfaces.GenVector
@@ -267,8 +273,8 @@
 // SyncGroups in the specified Database.
 func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) {
 	iSt.mtTables = make(map[string]struct{})
-	iSt.sgIds = make(map[interfaces.GroupId]struct{})
-	iSt.sgPfxs = make(map[string]struct{})
+	iSt.sgIds = make(sgSet)
+	iSt.sgPfxs = make(map[string]sgSet)
 
 	for id := range info {
 		sg, err := getSyncGroupById(ctx, iSt.st, id)
@@ -285,7 +291,12 @@
 		iSt.sgIds[id] = struct{}{}
 
 		for _, p := range sg.Spec.Prefixes {
-			iSt.sgPfxs[p] = struct{}{}
+			sgs, ok := iSt.sgPfxs[p]
+			if !ok {
+				sgs = make(sgSet)
+				iSt.sgPfxs[p] = sgs
+			}
+			sgs[id] = struct{}{}
 		}
 	}
 }
@@ -340,7 +351,12 @@
 	}
 	localPfxs := extractAndSortPrefixes(local)
 
-	sgPfxs := set.String.ToSlice(iSt.sgPfxs)
+	sgPfxs := make([]string, len(iSt.sgPfxs))
+	i := 0
+	for p := range iSt.sgPfxs {
+		sgPfxs[i] = p
+		i++
+	}
 	sort.Strings(sgPfxs)
 
 	iSt.local = make(interfaces.GenVector)
@@ -454,6 +470,12 @@
 			if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
 				return err
 			}
+
+			// Check for BlobRefs, and process them.
+			if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil {
+				return err
+			}
+
 			// Mark object dirty.
 			iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
 		}
@@ -494,6 +516,68 @@
 	return err
 }
 
+func (iSt *initiationState) processBlobRefs(ctx *context.T, m *interfaces.LogRecMetadata, valbuf []byte) error {
+	objid := m.ObjId
+	srcPeer := syncbaseIdToName(m.Id)
+
+	vlog.VI(4).Infof("sync: processBlobRefs: begin processing blob refs for objid %s", objid)
+	defer vlog.VI(4).Infof("sync: processBlobRefs: end processing blob refs for objid %s", objid)
+
+	if valbuf == nil {
+		return nil
+	}
+
+	var val *vdl.Value
+	if err := vom.Decode(valbuf, &val); err != nil {
+		return err
+	}
+
+	brs := make(map[nosql.BlobRef]struct{})
+	if err := extractBlobRefs(val, brs); err != nil {
+		return err
+	}
+	// The matching SyncGroups depend only on the object id, not on the
+	// individual BlobRef, so compute them once up front.
+	sgIds := make(sgSet)
+	appKey := extractAppKey(objid)
+	for p, sgs := range iSt.sgPfxs {
+		if strings.HasPrefix(appKey, p) {
+			for sg := range sgs {
+				sgIds[sg] = struct{}{}
+			}
+		}
+	}
+	for br := range brs {
+		vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.peer, srcPeer, sgIds)
+		info := &blobLocInfo{peer: iSt.peer, source: srcPeer, sgIds: sgIds}
+		if err := iSt.sync.addBlobLocInfo(ctx, br, info); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// TODO(hpucha): Handle BlobRefs that appear inside lists, maps, and any values.
+func extractBlobRefs(val *vdl.Value, brs map[nosql.BlobRef]struct{}) error {
+	if val == nil {
+		return nil
+	}
+	switch val.Kind() {
+	case vdl.String:
+		// Could be a BlobRef.
+		var br nosql.BlobRef
+		if val.Type() == vdl.TypeOf(br) {
+			brs[nosql.BlobRef(val.RawString())] = struct{}{}
+		}
+	case vdl.Struct:
+		for i := 0; i < val.Type().NumField(); i++ {
+			v := val.StructField(i)
+			if err := extractBlobRefs(v, brs); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // insertRecInLogDagAndDb adds a new log record to log and dag data structures,
 // and inserts the versioned value in the Database.
 func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error {
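
The TODO above notes that BlobRefs nested inside lists, maps, and any values are not yet traversed. One possible shape for that extension, sketched here with the vdl reflection API; illustrative only, not part of this change:

```go
package vsync

import (
	"v.io/syncbase/v23/services/syncbase/nosql"
	"v.io/v23/vdl"
)

// extractBlobRefsDeep mirrors extractBlobRefs but also descends into lists,
// arrays, and maps. A sketch of one way to address the TODO above.
func extractBlobRefsDeep(val *vdl.Value, brs map[nosql.BlobRef]struct{}) error {
	if val == nil {
		return nil
	}
	switch val.Kind() {
	case vdl.String:
		// Could be a BlobRef.
		if val.Type() == vdl.TypeOf(nosql.BlobRef("")) {
			brs[nosql.BlobRef(val.RawString())] = struct{}{}
		}
	case vdl.Struct:
		for i := 0; i < val.Type().NumField(); i++ {
			if err := extractBlobRefsDeep(val.StructField(i), brs); err != nil {
				return err
			}
		}
	case vdl.List, vdl.Array:
		for i := 0; i < val.Len(); i++ {
			if err := extractBlobRefsDeep(val.Index(i), brs); err != nil {
				return err
			}
		}
	case vdl.Map:
		for _, k := range val.MapKeys() {
			if err := extractBlobRefsDeep(val.MapIndex(k), brs); err != nil {
				return err
			}
		}
	}
	return nil
}
```
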
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index c3f2538..af09ce1 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -22,9 +22,65 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	"v.io/v23/vdl"
+	"v.io/v23/vom"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
+func TestExtractBlobRefs(t *testing.T) {
+	var tests [][]byte
+	br := nosql.BlobRef("123")
+
+	// BlobRef is the value.
+	buf0, err := vom.Encode(br)
+	if err != nil {
+		t.Fatalf("Encode(BlobRef) failed, err %v", err)
+	}
+	tests = append(tests, buf0)
+
+	// Struct contains BlobRef.
+	type test1Struct struct {
+		A int64
+		B string
+		C nosql.BlobRef
+	}
+	v1 := test1Struct{A: 10, B: "foo", C: br}
+	buf1, err := vom.Encode(v1)
+	if err != nil {
+		t.Fatalf("Encode(test1Struct) failed, err %v", err)
+	}
+	tests = append(tests, buf1)
+
+	// Nested struct contains BlobRef.
+	type test2Struct struct {
+		A int64
+		B string
+		C test1Struct
+	}
+	v2 := test2Struct{A: 10, B: "foo", C: v1}
+	buf2, err := vom.Encode(v2)
+	if err != nil {
+		t.Fatalf("Encode(test2Struct) failed, err %v", err)
+	}
+	tests = append(tests, buf2)
+
+	for i, buf := range tests {
+		var val *vdl.Value
+		if err := vom.Decode(buf, &val); err != nil {
+			t.Fatalf("Decode failed (test %d), err %v", i, err)
+		}
+
+		gotbrs := make(map[nosql.BlobRef]struct{})
+		if err := extractBlobRefs(val, gotbrs); err != nil {
+			t.Fatalf("extractBlobRefs failed (test %d), err %v", i, err)
+		}
+		wantbrs := map[nosql.BlobRef]struct{}{br: struct{}{}}
+		if !reflect.DeepEqual(gotbrs, wantbrs) {
+			t.Fatalf("Data mismatch in blobrefs (test %d), got %v, want %v", i, gotbrs, wantbrs)
+		}
+	}
+}
+
 // TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
 // in file testdata/remote-init-00.log.sync.
 func TestLogStreamRemoteOnly(t *testing.T) {
@@ -99,8 +155,12 @@
 
 	// Verify Database state.
 	valbuf, err := svc.St().Get([]byte(objid), nil)
-	if err != nil || string(valbuf) != "abc" {
-		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	var val string
+	if err := vom.Decode(valbuf, &val); err != nil {
+		t.Fatalf("Value decode failed, err %v", err)
+	}
+	if err != nil || val != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, val, err)
 	}
 	tx := svc.St().NewTransaction()
 	version, err := watchable.GetVersion(nil, tx, []byte(objid))
@@ -192,8 +252,12 @@
 
 	// Verify Database state.
 	valbuf, err := svc.St().Get([]byte(objid), nil)
-	if err != nil || string(valbuf) != "abc" {
-		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	var val string
+	if err := vom.Decode(valbuf, &val); err != nil {
+		t.Fatalf("Value decode failed, err %v", err)
+	}
+	if err != nil || val != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, val, err)
 	}
 	tx := svc.St().NewTransaction()
 	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
@@ -234,7 +298,11 @@
 
 	// Verify Database state.
 	valbuf, err := svc.St().Get([]byte(objid), nil)
-	if err != nil || string(valbuf) != "abc" {
+	var val string
+	if err := vom.Decode(valbuf, &val); err != nil {
+		t.Fatalf("Value decode failed, err %v", err)
+	}
+	if err != nil || val != "abc" {
 		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
 	}
 	tx := svc.St().NewTransaction()
@@ -277,7 +345,11 @@
 
 	// Verify Database state.
 	valbuf, err := svc.St().Get([]byte(objid), nil)
-	if err != nil || string(valbuf) != "abc" {
+	var val string
+	if err := vom.Decode(valbuf, &val); err != nil {
+		t.Fatalf("Value decode failed, err %v", err)
+	}
+	if err != nil || val != "abc" {
 		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
 	}
 	tx := svc.St().NewTransaction()
@@ -345,7 +417,7 @@
 		t.Fatalf("newInitiationState failed with err %v", err)
 	}
 
-	testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
+	testIfSgPfxsEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
 	testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
 
 	s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
@@ -375,14 +447,31 @@
 	return svc, iSt, cleanup
 }
 
-func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
-	if len(a) != len(m) {
-		t.Fatalf("testIfMapArrEqual diff lengths, got %v want %v", a, m)
+func testIfSgPfxsEqual(t *testing.T, m map[string]sgSet, a []string) {
+	aMap := arrToMap(a)
+
+	if len(aMap) != len(m) {
+		t.Fatalf("testIfSgPfxsEqual diff lengths, got %v want %v", aMap, m)
 	}
 
-	for _, p := range a {
+	for p := range aMap {
 		if _, ok := m[p]; !ok {
-			t.Fatalf("testIfMapArrEqual want %v", p)
+			t.Fatalf("testIfSgPfxsEqual want %v", p)
 		}
 	}
 }
+
+func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
+	aMap := arrToMap(a)
+	if !reflect.DeepEqual(m, aMap) {
+		t.Fatalf("testIfMapArrEqual failed map %v, arr %v", m, aMap)
+	}
+}
+
+func arrToMap(a []string) map[string]struct{} {
+	m := make(map[string]struct{})
+	for _, s := range a {
+		m[s] = struct{}{}
+	}
+	return m
+}
diff --git a/x/ref/services/syncbase/vsync/replay_test.go b/x/ref/services/syncbase/vsync/replay_test.go
index 98b7d20..03d6dc6 100644
--- a/x/ref/services/syncbase/vsync/replay_test.go
+++ b/x/ref/services/syncbase/vsync/replay_test.go
@@ -22,6 +22,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/v23/context"
+	"v.io/v23/vom"
 )
 
 const (
@@ -359,9 +360,15 @@
 			t.Fatalf("createReplayStream unknown command %v", cmd)
 		}
 
+		val := "abc"
+		valbuf, err := vom.Encode(val)
+		if err != nil {
+			t.Fatalf("createReplayStream encode failed, err %v", err)
+		}
+
 		rec := interfaces.DeltaRespRec{interfaces.LogRec{
 			Metadata: createMetadata(t, ty, cmd),
-			Value:    []byte("abc"),
+			Value:    valbuf,
 		}}
 
 		stream.add(rec)
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 80c2c52..c417eca 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -11,7 +11,6 @@
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
-	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
@@ -446,11 +445,7 @@
 	// The key starts with one of the store's reserved prefixes for managed
 	// namespaces (e.g. $row, $perms).  Remove that prefix before comparing
 	// it with the SyncGroup prefixes which are defined by the application.
-	parts := util.SplitKeyParts(rec.Metadata.ObjId)
-	if len(parts) < 2 {
-		vlog.Fatalf("sync: filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
-	}
-	key := util.JoinKeyParts(parts[1:]...)
+	key := extractAppKey(rec.Metadata.ObjId)
 
 	filter := true
 	var maxGen uint64
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index ebb55f6..fd65c66 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -14,9 +14,13 @@
 import (
 	"fmt"
 	"math/rand"
+	"path"
 	"sync"
 	"time"
 
+	"v.io/syncbase/v23/services/syncbase/nosql"
+	blob "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+	fsblob "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -62,7 +66,8 @@
 	// names must be advertised in the appropriate mount tables.
 
 	// In-memory sync membership info aggregated across databases.
-	allMembers *memberView
+	allMembers     *memberView
+	allMembersLock sync.RWMutex
 
 	// In-memory sync state per Database. This state is populated at
 	// startup, and periodically persisted by the initiator.
@@ -75,13 +80,20 @@
 	// access to the batch set.
 	batchesLock sync.Mutex
 	batches     batchSet
+
+	// Metadata related to blob handling.
+	bst           blob.BlobStore                 // local blob store associated with this Syncbase.
+	blobDirectory map[nosql.BlobRef]*blobLocInfo // directory structure containing blob location information.
+	blobDirLock   sync.RWMutex                   // lock to synchronize access to the blob directory information.
 }
 
 // syncDatabase contains the metadata for syncing a database. This struct is
 // used as a receiver to hand off the app-initiated SyncGroup calls that arrive
 // against a nosql.Database to the sync module.
 type syncDatabase struct {
-	db interfaces.Database
+	db   interfaces.Database
+	sync interfaces.SyncServerMethods
 }
 
 var (
@@ -111,7 +123,7 @@
 // changes to its objects. The "initiator" thread is responsible for
 // periodically contacting peers to fetch changes from them. In addition, the
 // sync module responds to incoming RPCs from remote sync modules.
-func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, server rpc.Server) (*syncService, error) {
+func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, server rpc.Server, rootDir string) (*syncService, error) {
 	s := &syncService{
 		sv:      sv,
 		server:  server,
@@ -136,13 +148,21 @@
 
 	// data.Id is now guaranteed to be initialized.
 	s.id = data.Id
-	s.name = fmt.Sprintf("%x", s.id)
+	s.name = syncbaseIdToName(s.id)
 
 	// Initialize in-memory state for the sync module before starting any threads.
 	if err := s.initSync(ctx); err != nil {
 		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
 
+	// Open a blob store.
+	var err error
+	s.bst, err = fsblob.Create(ctx, path.Join(rootDir, "blobs"))
+	if err != nil {
+		return nil, err
+	}
+	s.blobDirectory = make(map[nosql.BlobRef]*blobLocInfo)
+
 	// Channel to propagate close event to all threads.
 	s.closed = make(chan struct{})
 	s.pending.Add(2)
@@ -159,12 +179,17 @@
 // Close cleans up sync state.
 // TODO(hpucha): Hook it up to server shutdown of syncbased.
 func (s *syncService) Close() {
+	s.bst.Close()
 	close(s.closed)
 	s.pending.Wait()
 }
 
+func syncbaseIdToName(id uint64) string {
+	return fmt.Sprintf("%x", id)
+}
+
 func NewSyncDatabase(db interfaces.Database) *syncDatabase {
-	return &syncDatabase{db: db}
+	return &syncDatabase{db: db, sync: db.App().Service().Sync()}
 }
 
 func (s *syncService) stKey() string {
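
With the blob store now hanging off syncService, each blob RPC in blob.go re-derives it via the type assertion sd.sync.(*syncService).bst. A hypothetical one-line helper inside package vsync (using the blob import alias from sync.go) could centralize that pattern; the change itself inlines the assertion at each call site:

```go
// blobStore is a hypothetical convenience accessor, not part of this change.
func (sd *syncDatabase) blobStore() blob.BlobStore {
	return sd.sync.(*syncService).bst
}
```
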
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index bf4e3aa..905a319 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -263,6 +263,9 @@
 // getMembers returns all SyncGroup members and the count of SyncGroups each one
 // joined.
 func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
+	s.allMembersLock.Lock()
+	defer s.allMembersLock.Unlock()
+
 	s.refreshMembersIfExpired(ctx)
 
 	members := make(map[string]uint32)
@@ -277,6 +280,28 @@
 	return members
 }
 
+// copyMemberInfo returns a copy of the info for the requested peer.
+func (s *syncService) copyMemberInfo(ctx *context.T, member string) *memberInfo {
+	s.allMembersLock.RLock()
+	defer s.allMembersLock.RUnlock()
+
+	info, ok := s.allMembers.members[member]
+	if !ok {
+		return nil
+	}
+
+	// Make a copy.
+	infoCopy := &memberInfo{make(map[string]sgMemberInfo)}
+	for gdbName, sgInfo := range info.db2sg {
+		infoCopy.db2sg[gdbName] = make(sgMemberInfo)
+		for gid, mi := range sgInfo {
+			infoCopy.db2sg[gdbName][gid] = mi
+		}
+	}
+
+	return infoCopy
+}
+
 // Low-level utility functions to access DB entries without tracking their
 // relationships.
 // Use the functions above to manipulate SyncGroups.
@@ -395,7 +420,7 @@
 		// has Admin privilege.
 
 		// Get this Syncbase's sync module handle.
-		ss := sd.db.App().Service().Sync().(*syncService)
+		ss := sd.sync.(*syncService)
 
 		// Instantiate sg. Add self as joiner.
 		sg := &interfaces.SyncGroup{
@@ -485,7 +510,7 @@
 	// Brand new join.
 
 	// Get this Syncbase's sync module handle.
-	ss := sd.db.App().Service().Sync().(*syncService)
+	ss := sd.sync.(*syncService)
 
 	// Contact a SyncGroup Admin to join the SyncGroup.
 	sg = &interfaces.SyncGroup{}
@@ -739,7 +764,7 @@
 
 func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
 	// Get this Syncbase's sync module handle.
-	ss := sd.db.App().Service().Sync().(*syncService)
+	ss := sd.sync.(*syncService)
 
 	for _, mt := range spec.MountTables {
 		name := naming.Join(mt, ss.name)
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index b06b785..a323f19 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -14,8 +14,6 @@
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore"
-	"v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -33,7 +31,6 @@
 	engine   string
 	dir      string
 	st       store.Store
-	bst      localblobstore.BlobStore
 	sync     *syncService
 	shutdown func()
 }
@@ -47,7 +44,7 @@
 }
 
 func (s *mockService) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
-	return &mockApp{st: s.st, bst: s.bst}, nil
+	return &mockApp{st: s.st}, nil
 }
 
 func (s *mockService) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -56,12 +53,11 @@
 
 // mockApp emulates a Syncbase App.  It is used to access a mock database.
 type mockApp struct {
-	st  store.Store
-	bst localblobstore.BlobStore
+	st store.Store
 }
 
 func (a *mockApp) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
-	return &mockDatabase{st: a.st, bst: a.bst}, nil
+	return &mockDatabase{st: a.st}, nil
 }
 
 func (a *mockApp) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
@@ -90,18 +86,13 @@
 
 // mockDatabase emulates a Syncbase Database.  It is used to test sync functionality.
 type mockDatabase struct {
-	st  store.Store
-	bst localblobstore.BlobStore
+	st store.Store
 }
 
 func (d *mockDatabase) St() store.Store {
 	return d.st
 }
 
-func (d *mockDatabase) BlobSt() localblobstore.BlobStore {
-	return d.bst
-}
-
 func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	return verror.NewErrNotImplemented(ctx)
 }
@@ -132,20 +123,14 @@
 	st, err = watchable.Wrap(st, &watchable.Options{
 		ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
 	})
-	bst, err := fs_cablobstore.Create(ctx, path.Join(dir, "blobs"))
-	if err != nil {
-		t.Fatalf("cannot create blob store (%s): %v", dir, err)
-	}
 
 	s := &mockService{
 		st:       st,
-		bst:      bst,
 		engine:   engine,
 		dir:      dir,
 		shutdown: shutdown,
 	}
-	if s.sync, err = New(ctx, nil, s, nil); err != nil {
-		bst.Close()
+	if s.sync, err = New(ctx, nil, s, nil, dir); err != nil {
 		util.DestroyStore(engine, dir)
 		t.Fatalf("cannot create sync service: %v", err)
 	}
@@ -156,7 +141,6 @@
 func destroyService(t *testing.T, s *mockService) {
 	defer s.shutdown()
 	defer s.sync.Close()
-	s.bst.Close()
 	if err := util.DestroyStore(s.engine, s.dir); err != nil {
 		t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.dir, err)
 	}
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index af0b158..2f15cf8 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -9,6 +9,7 @@
 import (
 	"time"
 
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
@@ -81,3 +82,17 @@
 	}
 	return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
 }
+
+// extractAppKey extracts the app key from the key sent over the wire between
+// two Syncbases. The on-wire key starts with one of the store's reserved
+// prefixes for managed namespaces (e.g. $row, $perms). This function removes
+// that prefix and returns the application component of the key. This is
+// typically done before comparing keys with the SyncGroup prefixes, which
+// are defined by the application.
+func extractAppKey(key string) string {
+	parts := util.SplitKeyParts(key)
+	if len(parts) < 2 {
+		vlog.Fatalf("sync: extractAppKey: invalid entry key %s", key)
+	}
+	return util.JoinKeyParts(parts[1:]...)
+}
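
A sketch of a unit test for extractAppKey that avoids hard-coding the key-part separator by building the wire key with util.JoinKeyParts itself; the test function is illustrative and not part of this change:

```go
package vsync

import (
	"testing"

	"v.io/syncbase/x/ref/services/syncbase/server/util"
)

// TestExtractAppKey checks that the reserved prefix (e.g. util.RowPrefix) is
// stripped and the application components of the key are preserved.
func TestExtractAppKey(t *testing.T) {
	wireKey := util.JoinKeyParts(util.RowPrefix, "foo", "bar")
	want := util.JoinKeyParts("foo", "bar")
	if got := extractAppKey(wireKey); got != want {
		t.Errorf("extractAppKey(%q) = %q, want %q", wireKey, got, want)
	}
}
```
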