syncbase/vsync: vdl for client-facing blob API.
Change-Id: If691e483a1359a20f38dd215579cccb42bdc0c50
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 6d221d1..9bdf954 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -61,6 +61,10 @@
// SyncGroupManager implements the API for managing SyncGroups attached to a
// Database.
SyncGroupManager
+
+ // BlobManager implements the API for managing blobs attached to rows in
+ // a Database.
+ BlobManager
}
// Table represents a collection of Rows.
@@ -203,6 +207,60 @@
// aggressively", "sync once per day".
}
+// BlobManager is the interface for blob operations.
+type BlobManager interface {
+ // API for resumable blob creation (append-only). After commit, a blob
+ // is immutable. Before commit, the BlobRef can be used with PutBlob,
+ // GetBlobSize, DeleteBlob, and CommitBlob.
+ //
+ // CreateBlob returns a BlobRef for a newly created blob.
+ CreateBlob() (br BlobRef | error) {access.Write}
+
+ // PutBlob appends the byte stream to the blob.
+ PutBlob(br BlobRef) stream<[]byte, _> error {access.Write}
+
+ // CommitBlob marks the blob as immutable.
+ CommitBlob(br BlobRef) error {access.Write}
+
+ // GetBlobSize returns the count of bytes written as part of the blob
+ // (committed or uncommitted).
+ GetBlobSize(br BlobRef) (uint64 | error) {access.Read}
+
+ // DeleteBlob locally deletes the blob (committed or uncommitted).
+ DeleteBlob(br BlobRef) error {access.Write}
+
+ // GetBlob returns the byte stream from a committed blob starting at offset.
+ GetBlob(br BlobRef, offset uint64) stream<_, []byte> error {access.Read}
+
+ // FetchBlob initiates fetching a blob if not locally found. priority
+ // controls the network priority of the blob. Higher priority blobs are
+ // fetched before the lower priority ones. However an ongoing blob
+ // transfer is not interrupted. Status updates are streamed back to the
+ // client as fetch is in progress.
+ FetchBlob(br BlobRef, priority uint64) stream<_, FetchStatus> error {access.Read}
+
+ // PinBlob locally pins the blob so that it is not evicted.
+ PinBlob(br BlobRef) error {access.Write}
+
+ // UnpinBlob locally unpins the blob so that it can be evicted if needed.
+ UnpinBlob(br BlobRef) error {access.Write}
+
+ // KeepBlob locally caches the blob with the specified rank. Lower
+ // ranked blobs are more eagerly evicted.
+ KeepBlob(br BlobRef, rank uint64) error {access.Write}
+
+ // TODO(hpucha): Add API for efficient blob cloning. Options include:
+ // (1) CloneBlob RPC with an array of mods that sepcify the offset and
+ // len for the new bytes. This might need two len fields to support
+ // growing a blob in the middle instead of just replacing byte for byte
+ // in the src blob. Or perhaps Offset>=0 to mean "read from old blob at
+ // this offset for Length bytes", and Offset<0 to mean "read the next
+ // Length Bytes from the PutBlob() stream". (2) We could adopt API
+ // similar to the local blob store with BlockOrFile segments, giving a
+ // more flexible way to clone blobs. Also provide support for parallel
+ // blob upload.
+}
+
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
// re-ordering.
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 31c877b..56c50fa 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -710,6 +710,717 @@
},
}
+// BlobManagerClientMethods is the client interface
+// containing BlobManager methods.
+//
+// BlobManager is the interface for blob operations.
+type BlobManagerClientMethods interface {
+ // API for resumable blob creation (append-only). After commit, a blob
+ // is immutable. Before commit, the BlobRef can be used with PutBlob,
+ // GetBlobSize, DeleteBlob, and CommitBlob.
+ //
+ // CreateBlob returns a BlobRef for a newly created blob.
+ CreateBlob(*context.T, ...rpc.CallOpt) (br BlobRef, err error)
+ // PutBlob appends the byte stream to the blob.
+ PutBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) (BlobManagerPutBlobClientCall, error)
+ // CommitBlob marks the blob as immutable.
+ CommitBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
+ // GetBlobSize returns the count of bytes written as part of the blob
+ // (committed or uncommitted).
+ GetBlobSize(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) (uint64, error)
+ // DeleteBlob locally deletes the blob (committed or uncommitted).
+ DeleteBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
+ // GetBlob returns the byte stream from a committed blob starting at offset.
+ GetBlob(ctx *context.T, br BlobRef, offset uint64, opts ...rpc.CallOpt) (BlobManagerGetBlobClientCall, error)
+ // FetchBlob initiates fetching a blob if not locally found. priority
+ // controls the network priority of the blob. Higher priority blobs are
+ // fetched before the lower priority ones. However an ongoing blob
+ // transfer is not interrupted. Status updates are streamed back to the
+ // client as fetch is in progress.
+ FetchBlob(ctx *context.T, br BlobRef, priority uint64, opts ...rpc.CallOpt) (BlobManagerFetchBlobClientCall, error)
+ // PinBlob locally pins the blob so that it is not evicted.
+ PinBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
+ // UnpinBlob locally unpins the blob so that it can be evicted if needed.
+ UnpinBlob(ctx *context.T, br BlobRef, opts ...rpc.CallOpt) error
+ // KeepBlob locally caches the blob with the specified rank. Lower
+ // ranked blobs are more eagerly evicted.
+ KeepBlob(ctx *context.T, br BlobRef, rank uint64, opts ...rpc.CallOpt) error
+}
+
+// BlobManagerClientStub adds universal methods to BlobManagerClientMethods.
+type BlobManagerClientStub interface {
+ BlobManagerClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// BlobManagerClient returns a client stub for BlobManager.
+func BlobManagerClient(name string) BlobManagerClientStub {
+ return implBlobManagerClientStub{name}
+}
+
+type implBlobManagerClientStub struct {
+ name string
+}
+
+func (c implBlobManagerClientStub) CreateBlob(ctx *context.T, opts ...rpc.CallOpt) (o0 BlobRef, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "CreateBlob", nil, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) PutBlob(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (ocall BlobManagerPutBlobClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "PutBlob", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ ocall = &implBlobManagerPutBlobClientCall{ClientCall: call}
+ return
+}
+
+func (c implBlobManagerClientStub) CommitBlob(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "CommitBlob", []interface{}{i0}, nil, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) GetBlobSize(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (o0 uint64, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetBlobSize", []interface{}{i0}, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) DeleteBlob(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "DeleteBlob", []interface{}{i0}, nil, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) GetBlob(ctx *context.T, i0 BlobRef, i1 uint64, opts ...rpc.CallOpt) (ocall BlobManagerGetBlobClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetBlob", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ ocall = &implBlobManagerGetBlobClientCall{ClientCall: call}
+ return
+}
+
+func (c implBlobManagerClientStub) FetchBlob(ctx *context.T, i0 BlobRef, i1 uint64, opts ...rpc.CallOpt) (ocall BlobManagerFetchBlobClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "FetchBlob", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ ocall = &implBlobManagerFetchBlobClientCall{ClientCall: call}
+ return
+}
+
+func (c implBlobManagerClientStub) PinBlob(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "PinBlob", []interface{}{i0}, nil, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) UnpinBlob(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "UnpinBlob", []interface{}{i0}, nil, opts...)
+ return
+}
+
+func (c implBlobManagerClientStub) KeepBlob(ctx *context.T, i0 BlobRef, i1 uint64, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "KeepBlob", []interface{}{i0, i1}, nil, opts...)
+ return
+}
+
+// BlobManagerPutBlobClientStream is the client stream for BlobManager.PutBlob.
+type BlobManagerPutBlobClientStream interface {
+ // SendStream returns the send side of the BlobManager.PutBlob client stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
+ Send(item []byte) error
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
+ Close() error
+ }
+}
+
+// BlobManagerPutBlobClientCall represents the call returned from BlobManager.PutBlob.
+type BlobManagerPutBlobClientCall interface {
+ BlobManagerPutBlobClientStream
+ // Finish performs the equivalent of SendStream().Close, then blocks until
+ // the server is done, and returns the positional return values for the call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implBlobManagerPutBlobClientCall struct {
+ rpc.ClientCall
+}
+
+func (c *implBlobManagerPutBlobClientCall) SendStream() interface {
+ Send(item []byte) error
+ Close() error
+} {
+ return implBlobManagerPutBlobClientCallSend{c}
+}
+
+type implBlobManagerPutBlobClientCallSend struct {
+ c *implBlobManagerPutBlobClientCall
+}
+
+func (c implBlobManagerPutBlobClientCallSend) Send(item []byte) error {
+ return c.c.Send(item)
+}
+func (c implBlobManagerPutBlobClientCallSend) Close() error {
+ return c.c.CloseSend()
+}
+func (c *implBlobManagerPutBlobClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// BlobManagerGetBlobClientStream is the client stream for BlobManager.GetBlob.
+type BlobManagerGetBlobClientStream interface {
+ // RecvStream returns the receiver side of the BlobManager.GetBlob client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() []byte
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// BlobManagerGetBlobClientCall represents the call returned from BlobManager.GetBlob.
+type BlobManagerGetBlobClientCall interface {
+ BlobManagerGetBlobClientStream
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implBlobManagerGetBlobClientCall struct {
+ rpc.ClientCall
+ valRecv []byte
+ errRecv error
+}
+
+func (c *implBlobManagerGetBlobClientCall) RecvStream() interface {
+ Advance() bool
+ Value() []byte
+ Err() error
+} {
+ return implBlobManagerGetBlobClientCallRecv{c}
+}
+
+type implBlobManagerGetBlobClientCallRecv struct {
+ c *implBlobManagerGetBlobClientCall
+}
+
+func (c implBlobManagerGetBlobClientCallRecv) Advance() bool {
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implBlobManagerGetBlobClientCallRecv) Value() []byte {
+ return c.c.valRecv
+}
+func (c implBlobManagerGetBlobClientCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implBlobManagerGetBlobClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// BlobManagerFetchBlobClientStream is the client stream for BlobManager.FetchBlob.
+type BlobManagerFetchBlobClientStream interface {
+ // RecvStream returns the receiver side of the BlobManager.FetchBlob client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() FetchStatus
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// BlobManagerFetchBlobClientCall represents the call returned from BlobManager.FetchBlob.
+type BlobManagerFetchBlobClientCall interface {
+ BlobManagerFetchBlobClientStream
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implBlobManagerFetchBlobClientCall struct {
+ rpc.ClientCall
+ valRecv FetchStatus
+ errRecv error
+}
+
+func (c *implBlobManagerFetchBlobClientCall) RecvStream() interface {
+ Advance() bool
+ Value() FetchStatus
+ Err() error
+} {
+ return implBlobManagerFetchBlobClientCallRecv{c}
+}
+
+type implBlobManagerFetchBlobClientCallRecv struct {
+ c *implBlobManagerFetchBlobClientCall
+}
+
+func (c implBlobManagerFetchBlobClientCallRecv) Advance() bool {
+ c.c.valRecv = FetchStatus{}
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implBlobManagerFetchBlobClientCallRecv) Value() FetchStatus {
+ return c.c.valRecv
+}
+func (c implBlobManagerFetchBlobClientCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implBlobManagerFetchBlobClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// BlobManagerServerMethods is the interface a server writer
+// implements for BlobManager.
+//
+// BlobManager is the interface for blob operations.
+type BlobManagerServerMethods interface {
+ // API for resumable blob creation (append-only). After commit, a blob
+ // is immutable. Before commit, the BlobRef can be used with PutBlob,
+ // GetBlobSize, DeleteBlob, and CommitBlob.
+ //
+ // CreateBlob returns a BlobRef for a newly created blob.
+ CreateBlob(*context.T, rpc.ServerCall) (br BlobRef, err error)
+ // PutBlob appends the byte stream to the blob.
+ PutBlob(ctx *context.T, call BlobManagerPutBlobServerCall, br BlobRef) error
+ // CommitBlob marks the blob as immutable.
+ CommitBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // GetBlobSize returns the count of bytes written as part of the blob
+ // (committed or uncommitted).
+ GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (uint64, error)
+ // DeleteBlob locally deletes the blob (committed or uncommitted).
+ DeleteBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // GetBlob returns the byte stream from a committed blob starting at offset.
+ GetBlob(ctx *context.T, call BlobManagerGetBlobServerCall, br BlobRef, offset uint64) error
+ // FetchBlob initiates fetching a blob if not locally found. priority
+ // controls the network priority of the blob. Higher priority blobs are
+ // fetched before the lower priority ones. However an ongoing blob
+ // transfer is not interrupted. Status updates are streamed back to the
+ // client as fetch is in progress.
+ FetchBlob(ctx *context.T, call BlobManagerFetchBlobServerCall, br BlobRef, priority uint64) error
+ // PinBlob locally pins the blob so that it is not evicted.
+ PinBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // UnpinBlob locally unpins the blob so that it can be evicted if needed.
+ UnpinBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // KeepBlob locally caches the blob with the specified rank. Lower
+ // ranked blobs are more eagerly evicted.
+ KeepBlob(ctx *context.T, call rpc.ServerCall, br BlobRef, rank uint64) error
+}
+
+// BlobManagerServerStubMethods is the server interface containing
+// BlobManager methods, as expected by rpc.Server.
+// The only difference between this interface and BlobManagerServerMethods
+// is the streaming methods.
+type BlobManagerServerStubMethods interface {
+ // API for resumable blob creation (append-only). After commit, a blob
+ // is immutable. Before commit, the BlobRef can be used with PutBlob,
+ // GetBlobSize, DeleteBlob, and CommitBlob.
+ //
+ // CreateBlob returns a BlobRef for a newly created blob.
+ CreateBlob(*context.T, rpc.ServerCall) (br BlobRef, err error)
+ // PutBlob appends the byte stream to the blob.
+ PutBlob(ctx *context.T, call *BlobManagerPutBlobServerCallStub, br BlobRef) error
+ // CommitBlob marks the blob as immutable.
+ CommitBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // GetBlobSize returns the count of bytes written as part of the blob
+ // (committed or uncommitted).
+ GetBlobSize(ctx *context.T, call rpc.ServerCall, br BlobRef) (uint64, error)
+ // DeleteBlob locally deletes the blob (committed or uncommitted).
+ DeleteBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // GetBlob returns the byte stream from a committed blob starting at offset.
+ GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, br BlobRef, offset uint64) error
+ // FetchBlob initiates fetching a blob if not locally found. priority
+ // controls the network priority of the blob. Higher priority blobs are
+ // fetched before the lower priority ones. However an ongoing blob
+ // transfer is not interrupted. Status updates are streamed back to the
+ // client as fetch is in progress.
+ FetchBlob(ctx *context.T, call *BlobManagerFetchBlobServerCallStub, br BlobRef, priority uint64) error
+ // PinBlob locally pins the blob so that it is not evicted.
+ PinBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // UnpinBlob locally unpins the blob so that it can be evicted if needed.
+ UnpinBlob(ctx *context.T, call rpc.ServerCall, br BlobRef) error
+ // KeepBlob locally caches the blob with the specified rank. Lower
+ // ranked blobs are more eagerly evicted.
+ KeepBlob(ctx *context.T, call rpc.ServerCall, br BlobRef, rank uint64) error
+}
+
+// BlobManagerServerStub adds universal methods to BlobManagerServerStubMethods.
+type BlobManagerServerStub interface {
+ BlobManagerServerStubMethods
+ // Describe the BlobManager interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// BlobManagerServer returns a server stub for BlobManager.
+// It converts an implementation of BlobManagerServerMethods into
+// an object that may be used by rpc.Server.
+func BlobManagerServer(impl BlobManagerServerMethods) BlobManagerServerStub {
+ stub := implBlobManagerServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implBlobManagerServerStub struct {
+ impl BlobManagerServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implBlobManagerServerStub) CreateBlob(ctx *context.T, call rpc.ServerCall) (BlobRef, error) {
+ return s.impl.CreateBlob(ctx, call)
+}
+
+func (s implBlobManagerServerStub) PutBlob(ctx *context.T, call *BlobManagerPutBlobServerCallStub, i0 BlobRef) error {
+ return s.impl.PutBlob(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) CommitBlob(ctx *context.T, call rpc.ServerCall, i0 BlobRef) error {
+ return s.impl.CommitBlob(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) GetBlobSize(ctx *context.T, call rpc.ServerCall, i0 BlobRef) (uint64, error) {
+ return s.impl.GetBlobSize(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) DeleteBlob(ctx *context.T, call rpc.ServerCall, i0 BlobRef) error {
+ return s.impl.DeleteBlob(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) GetBlob(ctx *context.T, call *BlobManagerGetBlobServerCallStub, i0 BlobRef, i1 uint64) error {
+ return s.impl.GetBlob(ctx, call, i0, i1)
+}
+
+func (s implBlobManagerServerStub) FetchBlob(ctx *context.T, call *BlobManagerFetchBlobServerCallStub, i0 BlobRef, i1 uint64) error {
+ return s.impl.FetchBlob(ctx, call, i0, i1)
+}
+
+func (s implBlobManagerServerStub) PinBlob(ctx *context.T, call rpc.ServerCall, i0 BlobRef) error {
+ return s.impl.PinBlob(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) UnpinBlob(ctx *context.T, call rpc.ServerCall, i0 BlobRef) error {
+ return s.impl.UnpinBlob(ctx, call, i0)
+}
+
+func (s implBlobManagerServerStub) KeepBlob(ctx *context.T, call rpc.ServerCall, i0 BlobRef, i1 uint64) error {
+ return s.impl.KeepBlob(ctx, call, i0, i1)
+}
+
+func (s implBlobManagerServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implBlobManagerServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{BlobManagerDesc}
+}
+
+// BlobManagerDesc describes the BlobManager interface.
+var BlobManagerDesc rpc.InterfaceDesc = descBlobManager
+
+// descBlobManager hides the desc to keep godoc clean.
+var descBlobManager = rpc.InterfaceDesc{
+ Name: "BlobManager",
+ PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+ Doc: "// BlobManager is the interface for blob operations.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "CreateBlob",
+ Doc: "// API for resumable blob creation (append-only). After commit, a blob\n// is immutable. Before commit, the BlobRef can be used with PutBlob,\n// GetBlobSize, DeleteBlob, and CommitBlob.\n//\n// CreateBlob returns a BlobRef for a newly created blob.",
+ OutArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "PutBlob",
+ Doc: "// PutBlob appends the byte stream to the blob.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "CommitBlob",
+ Doc: "// CommitBlob marks the blob as immutable.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "GetBlobSize",
+ Doc: "// GetBlobSize returns the count of bytes written as part of the blob\n// (committed or uncommitted).",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // uint64
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "DeleteBlob",
+ Doc: "// DeleteBlob locally deletes the blob (committed or uncommitted).",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "GetBlob",
+ Doc: "// GetBlob returns the byte stream from a committed blob starting at offset.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ {"offset", ``}, // uint64
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "FetchBlob",
+ Doc: "// FetchBlob initiates fetching a blob if not locally found. priority\n// controls the network priority of the blob. Higher priority blobs are\n// fetched before the lower priority ones. However an ongoing blob\n// transfer is not interrupted. Status updates are streamed back to the\n// client as fetch is in progress.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ {"priority", ``}, // uint64
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "PinBlob",
+ Doc: "// PinBlob locally pins the blob so that it is not evicted.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "UnpinBlob",
+ Doc: "// UnpinBlob locally unpins the blob so that it can be evicted if needed.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ {
+ Name: "KeepBlob",
+ Doc: "// KeepBlob locally caches the blob with the specified rank. Lower\n// ranked blobs are more eagerly evicted.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ {"rank", ``}, // uint64
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ },
+}
+
+// BlobManagerPutBlobServerStream is the server stream for BlobManager.PutBlob.
+type BlobManagerPutBlobServerStream interface {
+ // RecvStream returns the receiver side of the BlobManager.PutBlob server stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() []byte
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// BlobManagerPutBlobServerCall represents the context passed to BlobManager.PutBlob.
+type BlobManagerPutBlobServerCall interface {
+ rpc.ServerCall
+ BlobManagerPutBlobServerStream
+}
+
+// BlobManagerPutBlobServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements BlobManagerPutBlobServerCall.
+type BlobManagerPutBlobServerCallStub struct {
+ rpc.StreamServerCall
+ valRecv []byte
+ errRecv error
+}
+
+// Init initializes BlobManagerPutBlobServerCallStub from rpc.StreamServerCall.
+func (s *BlobManagerPutBlobServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the BlobManager.PutBlob server stream.
+func (s *BlobManagerPutBlobServerCallStub) RecvStream() interface {
+ Advance() bool
+ Value() []byte
+ Err() error
+} {
+ return implBlobManagerPutBlobServerCallRecv{s}
+}
+
+type implBlobManagerPutBlobServerCallRecv struct {
+ s *BlobManagerPutBlobServerCallStub
+}
+
+func (s implBlobManagerPutBlobServerCallRecv) Advance() bool {
+ s.s.errRecv = s.s.Recv(&s.s.valRecv)
+ return s.s.errRecv == nil
+}
+func (s implBlobManagerPutBlobServerCallRecv) Value() []byte {
+ return s.s.valRecv
+}
+func (s implBlobManagerPutBlobServerCallRecv) Err() error {
+ if s.s.errRecv == io.EOF {
+ return nil
+ }
+ return s.s.errRecv
+}
+
+// BlobManagerGetBlobServerStream is the server stream for BlobManager.GetBlob.
+type BlobManagerGetBlobServerStream interface {
+ // SendStream returns the send side of the BlobManager.GetBlob server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item []byte) error
+ }
+}
+
+// BlobManagerGetBlobServerCall represents the context passed to BlobManager.GetBlob.
+type BlobManagerGetBlobServerCall interface {
+ rpc.ServerCall
+ BlobManagerGetBlobServerStream
+}
+
+// BlobManagerGetBlobServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements BlobManagerGetBlobServerCall.
+type BlobManagerGetBlobServerCallStub struct {
+ rpc.StreamServerCall
+}
+
+// Init initializes BlobManagerGetBlobServerCallStub from rpc.StreamServerCall.
+func (s *BlobManagerGetBlobServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the BlobManager.GetBlob server stream.
+func (s *BlobManagerGetBlobServerCallStub) SendStream() interface {
+ Send(item []byte) error
+} {
+ return implBlobManagerGetBlobServerCallSend{s}
+}
+
+type implBlobManagerGetBlobServerCallSend struct {
+ s *BlobManagerGetBlobServerCallStub
+}
+
+func (s implBlobManagerGetBlobServerCallSend) Send(item []byte) error {
+ return s.s.Send(item)
+}
+
+// BlobManagerFetchBlobServerStream is the server stream for BlobManager.FetchBlob.
+type BlobManagerFetchBlobServerStream interface {
+ // SendStream returns the send side of the BlobManager.FetchBlob server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item FetchStatus) error
+ }
+}
+
+// BlobManagerFetchBlobServerCall represents the context passed to BlobManager.FetchBlob.
+type BlobManagerFetchBlobServerCall interface {
+ rpc.ServerCall
+ BlobManagerFetchBlobServerStream
+}
+
+// BlobManagerFetchBlobServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements BlobManagerFetchBlobServerCall.
+type BlobManagerFetchBlobServerCallStub struct {
+ rpc.StreamServerCall
+}
+
+// Init initializes BlobManagerFetchBlobServerCallStub from rpc.StreamServerCall.
+func (s *BlobManagerFetchBlobServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the BlobManager.FetchBlob server stream.
+func (s *BlobManagerFetchBlobServerCallStub) SendStream() interface {
+ Send(item FetchStatus) error
+} {
+ return implBlobManagerFetchBlobServerCallSend{s}
+}
+
+type implBlobManagerFetchBlobServerCallSend struct {
+ s *BlobManagerFetchBlobServerCallStub
+}
+
+func (s implBlobManagerFetchBlobServerCallSend) Send(item FetchStatus) error {
+ return s.s.Send(item)
+}
+
// DatabaseClientMethods is the client interface
// containing Database methods.
//
@@ -792,6 +1503,8 @@
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerClientMethods
+ // BlobManager is the interface for blob operations.
+ BlobManagerClientMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -831,7 +1544,7 @@
// DatabaseClient returns a client stub for Database.
func DatabaseClient(name string) DatabaseClientStub {
- return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name)}
+ return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name)}
}
type implDatabaseClientStub struct {
@@ -840,6 +1553,7 @@
permissions.ObjectClientStub
DatabaseWatcherClientStub
SyncGroupManagerClientStub
+ BlobManagerClientStub
}
func (c implDatabaseClientStub) Create(ctx *context.T, i0 access.Permissions, opts ...rpc.CallOpt) (err error) {
@@ -1031,6 +1745,8 @@
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerServerMethods
+ // BlobManager is the interface for blob operations.
+ BlobManagerServerMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -1140,6 +1856,8 @@
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerServerStubMethods
+ // BlobManager is the interface for blob operations.
+ BlobManagerServerStubMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -1187,6 +1905,7 @@
ObjectServerStub: permissions.ObjectServer(impl),
DatabaseWatcherServerStub: DatabaseWatcherServer(impl),
SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
+ BlobManagerServerStub: BlobManagerServer(impl),
}
// Initialize GlobState; always check the stub itself first, to handle the
// case where the user has the Glob method defined in their VDL source.
@@ -1203,6 +1922,7 @@
permissions.ObjectServerStub
DatabaseWatcherServerStub
SyncGroupManagerServerStub
+ BlobManagerServerStub
gs *rpc.GlobState
}
@@ -1239,7 +1959,7 @@
}
func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc}
+ return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc}
}
// DatabaseDesc describes the Database interface.
@@ -1254,6 +1974,7 @@
{"Object", "v.io/v23/services/permissions", "// Object provides access control for Vanadium objects.\n//\n// Vanadium services implementing dynamic access control would typically embed\n// this interface and tag additional methods defined by the service with one of\n// Admin, Read, Write, Resolve etc. For example, the VDL definition of the\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n// import \"v.io/v23/services/permissions\"\n//\n// type MyObject interface {\n// permissions.Object\n// MyRead() (string, error) {access.Read}\n// MyWrite(string) error {access.Write}\n// }\n//\n// If the set of pre-defined tags is insufficient, services may define their\n// own tag type and annotate all methods with this new type.\n//\n// Instead of embedding this Object interface, define SetPermissions and\n// GetPermissions in their own interface. Authorization policies will typically\n// respect annotations of a single type. For example, the VDL definition of an\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n//\n// type MyTag string\n//\n// const (\n// Blue = MyTag(\"Blue\")\n// Red = MyTag(\"Red\")\n// )\n//\n// type MyObject interface {\n// MyMethod() (string, error) {Blue}\n//\n// // Allow clients to change access via the access.Object interface:\n// SetPermissions(perms access.Permissions, version string) error {Red}\n// GetPermissions() (perms access.Permissions, version string, err error) {Blue}\n// }"},
{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
+ {"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
},
Methods: []rpc.MethodDesc{
{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index ffa95ab..1d03be6 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -74,6 +74,28 @@
SyncPriority byte
}
+// BlobRef is a reference to a blob.
+type BlobRef string
+
+const (
+ NullBlobRef = BlobRef("")
+)
+
+// FetchState represents the state transitions of a blob fetch.
+type FetchState enum {
+ Pending // Fetch request is queued.
+ Locating // Blob discovery is in progress to find a source for the blob.
+ Fetching // Blob transfer is in progress.
+ Done // Blob is locally cached.
+}
+
+// FetchStatus describes the progress of an asynchronous blob fetch.
+type FetchStatus struct {
+ State FetchState // State of the blob fetch request.
+ Received uint64 // Total number of bytes received.
+ Total uint64 // Blob size.
+}
+
// ResumeMarker is a pointer in the database event log.
type ResumeMarker string
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 784ef37..040e253 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -102,6 +102,86 @@
}) {
}
+// BlobRef is a reference to a blob.
+type BlobRef string
+
+func (BlobRef) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BlobRef"`
+}) {
+}
+
+// FetchState represents the state transitions of a blob fetch.
+type FetchState int
+
+const (
+ FetchStatePending FetchState = iota
+ FetchStateLocating
+ FetchStateFetching
+ FetchStateDone
+)
+
+// FetchStateAll holds all labels for FetchState.
+var FetchStateAll = [...]FetchState{FetchStatePending, FetchStateLocating, FetchStateFetching, FetchStateDone}
+
+// FetchStateFromString creates a FetchState from a string label.
+func FetchStateFromString(label string) (x FetchState, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *FetchState) Set(label string) error {
+ switch label {
+ case "Pending", "pending":
+ *x = FetchStatePending
+ return nil
+ case "Locating", "locating":
+ *x = FetchStateLocating
+ return nil
+ case "Fetching", "fetching":
+ *x = FetchStateFetching
+ return nil
+ case "Done", "done":
+ *x = FetchStateDone
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in nosql.FetchState", label)
+}
+
+// String returns the string label of x.
+func (x FetchState) String() string {
+ switch x {
+ case FetchStatePending:
+ return "Pending"
+ case FetchStateLocating:
+ return "Locating"
+ case FetchStateFetching:
+ return "Fetching"
+ case FetchStateDone:
+ return "Done"
+ }
+ return ""
+}
+
+func (FetchState) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.FetchState"`
+ Enum struct{ Pending, Locating, Fetching, Done string }
+}) {
+}
+
+// FetchStatus describes the progress of an asynchronous blob fetch.
+type FetchStatus struct {
+ State FetchState // State of the blob fetch request.
+ Received uint64 // Total number of bytes received.
+ Total uint64 // Blob size.
+}
+
+func (FetchStatus) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.FetchStatus"`
+}) {
+}
+
// ResumeMarker is a pointer in the database event log.
type ResumeMarker string
@@ -225,9 +305,14 @@
vdl.Register((*KeyValue)(nil))
vdl.Register((*SyncGroupSpec)(nil))
vdl.Register((*SyncGroupMemberInfo)(nil))
+ vdl.Register((*BlobRef)(nil))
+ vdl.Register((*FetchState)(nil))
+ vdl.Register((*FetchStatus)(nil))
vdl.Register((*ResumeMarker)(nil))
vdl.Register((*TablePrefixRange)(nil))
vdl.Register((*WatchRequest)(nil))
vdl.Register((*ChangeType)(nil))
vdl.Register((*Change)(nil))
}
+
+const NullBlobRef = BlobRef("")
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index c38337b..182141a 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -677,7 +677,7 @@
numTbs, _ := strconv.Atoi(args[3])
prefixes := args[4:]
- time.Sleep(10 * time.Second)
+ time.Sleep(15 * time.Second)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index 3bb3667..470d5b9 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -38,5 +38,5 @@
// BlobSync methods.
// FetchBlob returns the requested blob.
- FetchBlob() error {access.Read}
+ FetchBlob(br wire.BlobRef) error {access.Read}
}
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index b87397b..adb964f 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -47,7 +47,7 @@
JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
// BlobSync methods.
// FetchBlob returns the requested blob.
- FetchBlob(*context.T, ...rpc.CallOpt) error
+ FetchBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) error
}
// SyncClientStub adds universal methods to SyncClientMethods.
@@ -84,8 +84,8 @@
return
}
-func (c implSyncClientStub) FetchBlob(ctx *context.T, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "FetchBlob", nil, nil, opts...)
+func (c implSyncClientStub) FetchBlob(ctx *context.T, i0 nosql.BlobRef, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "FetchBlob", []interface{}{i0}, nil, opts...)
return
}
@@ -218,7 +218,7 @@
JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
// BlobSync methods.
// FetchBlob returns the requested blob.
- FetchBlob(*context.T, rpc.ServerCall) error
+ FetchBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) error
}
// SyncServerStubMethods is the server interface containing
@@ -247,7 +247,7 @@
JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
// BlobSync methods.
// FetchBlob returns the requested blob.
- FetchBlob(*context.T, rpc.ServerCall) error
+ FetchBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) error
}
// SyncServerStub adds universal methods to SyncServerStubMethods.
@@ -291,8 +291,8 @@
return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
}
-func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
- return s.impl.FetchBlob(ctx, call)
+func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall, i0 nosql.BlobRef) error {
+ return s.impl.FetchBlob(ctx, call, i0)
}
func (s implSyncServerStub) Globber() *rpc.GlobState {
@@ -341,6 +341,9 @@
{
Name: "FetchBlob",
Doc: "// BlobSync methods.\n// FetchBlob returns the requested blob.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // nosql.BlobRef
+ },
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
},
diff --git a/x/ref/services/syncbase/server/nosql/database_bm.go b/x/ref/services/syncbase/server/nosql/database_bm.go
new file mode 100644
index 0000000..65a1413
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_bm.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/vsync"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+)
+
+////////////////////////////////////////////////////////////////////////////////
+// RPCs for managing blobs between Syncbase and its clients.
+
+func (d *databaseReq) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
+ if d.batchId != nil {
+ return wire.NullBlobRef, wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.CreateBlob(ctx, call)
+}
+
+func (d *databaseReq) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.PutBlob(ctx, call, br)
+}
+
+func (d *databaseReq) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.CommitBlob(ctx, call, br)
+}
+
+func (d *databaseReq) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
+ if d.batchId != nil {
+ return 0, wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetBlobSize(ctx, call, br)
+}
+
+func (d *databaseReq) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.DeleteBlob(ctx, call, br)
+}
+
+func (d *databaseReq) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetBlob(ctx, call, br, offset)
+}
+
+func (d *databaseReq) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.FetchBlob(ctx, call, br, priority)
+}
+
+func (d *databaseReq) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.PinBlob(ctx, call, br)
+}
+
+func (d *databaseReq) UnpinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.UnpinBlob(ctx, call, br)
+}
+
+func (d *databaseReq) KeepBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef, rank uint64) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ sd := vsync.NewSyncDatabase(d)
+ return sd.KeepBlob(ctx, call, br, rank)
+}
diff --git a/x/ref/services/syncbase/server/nosql/database_sgm.go b/x/ref/services/syncbase/server/nosql/database_sgm.go
index 62c905a..10ad323 100644
--- a/x/ref/services/syncbase/server/nosql/database_sgm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sgm.go
@@ -5,9 +5,8 @@
package nosql
import (
- "v.io/syncbase/x/ref/services/syncbase/vsync"
-
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/vsync"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
diff --git a/x/ref/services/syncbase/vsync/blob.go b/x/ref/services/syncbase/vsync/blob.go
index a50b58c..7c98be0 100644
--- a/x/ref/services/syncbase/vsync/blob.go
+++ b/x/ref/services/syncbase/vsync/blob.go
@@ -5,14 +5,58 @@
package vsync
import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
)
-//////////////////////////////////////////////////
-// Methods for blob fetch between Syncbases.
+////////////////////////////////////////////////////////////
+// RPCs for managing blobs between Syncbase and its clients.
-func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
+func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
+ return wire.BlobRef(""), verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (uint64, error) {
+ return 0, verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset uint64) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) UnpinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (sd *syncDatabase) KeepBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef, rank uint64) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+////////////////////////////////////////////////////////////
+// RPC for blob fetch between Syncbases.
+
+func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
return verror.NewErrNotImplemented(ctx)
}