blob: 56a4eed333e6e9ffa2465021b3e193998aa6e65e [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"io"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/localblobstore"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
"v.io/x/lib/vlog"
)
// chunkSize is the length, in bytes, of the buffer that getLocalBlob fills
// from the local blob store and forwards to the client on each iteration.
const chunkSize = 8 * 1024
////////////////////////////////////////////////////////////
// RPCs for managing blobs between Syncbase and its clients.
// CreateBlob opens a blob writer with an empty name on the database's blob
// store, logs the name reported by that writer, and returns the name as a
// wire.BlobRef. The writer is closed without being finalized when the call
// returns. If the writer cannot be created, wire.NullBlobRef and the error
// are returned.
func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
	w, err := sd.db.BlobSt().NewBlobWriter(ctx, "")
	if err != nil {
		return wire.NullBlobRef, err
	}
	defer w.CloseWithoutFinalize()

	blobName := w.Name()
	vlog.VI(2).Infof("sync: CreateBlob: blob ref %s", blobName)
	return wire.BlobRef(blobName), nil
}
// PutBlob opens a writer for the blob named by br and appends every item
// received on the client stream to it as a block fragment. The writer is
// closed without being finalized when the call returns.
//
// It returns the error from opening the writer, the first append error, or
// otherwise whatever error the receive stream reports once it stops advancing.
func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
	w, err := sd.db.BlobSt().NewBlobWriter(ctx, string(br))
	if err != nil {
		return err
	}
	defer w.CloseWithoutFinalize()

	in := call.RecvStream()
	for in.Advance() {
		fragment := localblobstore.BlockOrFile{Block: in.Value()}
		if appendErr := w.AppendFragment(fragment); appendErr != nil {
			return appendErr
		}
	}
	return in.Err()
}
// CommitBlob opens a writer for the blob named by br and closes it with
// Close (as opposed to CloseWithoutFinalize), returning the error from
// opening the writer or, failing that, the result of Close.
func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
	w, err := sd.db.BlobSt().NewBlobWriter(ctx, string(br))
	if err != nil {
		return err
	}
	return w.Close()
}
// GetBlobSize opens a reader for the blob named by br and returns the size
// that reader reports. If the reader cannot be opened, it returns 0 and the
// error. The reader is closed before the call returns.
func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
	r, err := sd.db.BlobSt().NewBlobReader(ctx, string(br))
	if err != nil {
		return 0, err
	}
	defer r.Close()

	size := r.Size()
	return size, nil
}
// DeleteBlob is not implemented yet: it ignores br and always returns the
// error produced by verror.NewErrNotImplemented.
func (sd *syncDatabase) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}
// GetBlob streams the blob named by br to the client, starting at offset.
// It first tries to serve the blob from the local blob store. If that fails
// for any reason, the local error is discarded and a not-implemented error
// is returned, because fetching from a remote blob store is not implemented.
func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
	// Serve the blob from the local store when possible.
	if localErr := sd.getLocalBlob(ctx, call, br, offset); localErr != nil {
		// The blob would have to be located on a remote blob store and
		// fetched from there, which is not supported yet.
		return verror.NewErrNotImplemented(ctx)
	}
	return nil
}
// getLocalBlob looks for a blob in the local store and, if found, streams its
// bytes to the client starting at the given offset, in pieces of at most
// chunkSize bytes.
//
// It returns the error from opening the blob reader, any read error other
// than io.EOF, or the first error reported while sending on the client
// stream. It returns nil once the end of the blob has been reached.
func (sd *syncDatabase) getLocalBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
	bst := sd.db.BlobSt()
	reader, err := bst.NewBlobReader(ctx, string(br))
	if err != nil {
		return err
	}
	defer reader.Close()

	buf := make([]byte, chunkSize)
	stream := call.SendStream()

	for {
		nbytes, err := reader.ReadAt(buf, offset)
		if err != nil && err != io.EOF {
			return err
		}
		// The io.ReaderAt contract allows a short final read to return
		// nbytes > 0 together with io.EOF, so the bytes must be forwarded
		// before EOF is honored; otherwise the tail of the blob is lost.
		if nbytes > 0 {
			offset += int64(nbytes)
			// Stop as soon as the client stream fails instead of silently
			// reading the rest of the blob.
			if sendErr := stream.Send(buf[:nbytes]); sendErr != nil {
				return sendErr
			}
		}
		if err == io.EOF || nbytes <= 0 {
			break
		}
	}
	return nil
}
// FetchBlob is not implemented yet: it ignores br and priority and always
// returns the error produced by verror.NewErrNotImplemented.
func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
return verror.NewErrNotImplemented(ctx)
}
// PinBlob is not implemented yet: it ignores br and always returns the error
// produced by verror.NewErrNotImplemented.
func (sd *syncDatabase) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}
// UnpinBlob is not implemented yet: it ignores br and always returns the
// error produced by verror.NewErrNotImplemented.
func (sd *syncDatabase) UnpinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}
// KeepBlob is not implemented yet: it ignores br and rank and always returns
// the error produced by verror.NewErrNotImplemented.
func (sd *syncDatabase) KeepBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef, rank uint64) error {
return verror.NewErrNotImplemented(ctx)
}
////////////////////////////////////////////////////////////
// RPC for blob fetch between Syncbases.
// FetchBlob is the sync-service side of the blob fetch RPC between
// Syncbases. It is not implemented yet: it ignores br and always returns the
// error produced by verror.NewErrNotImplemented.
func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}