blob: 7390425e54404037b15547b6263b7def97456c3c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This file implements the PerSyncgroup table of a blobmap.
package blobmap
import "v.io/v23/context"
import "v.io/v23/verror"
import "v.io/v23/vom"
import "v.io/x/ref/services/syncbase/localblobstore"
import "v.io/x/ref/services/syncbase/server/interfaces"
import "v.io/x/ref/services/syncbase/store"
// perSyncgroupKey() builds, inside keyBuf[], the store key under which the
// PerSyncgroup for sgId is kept: perSyncgroupPrefix followed by the bytes of
// sgId.  It returns the leading portion of keyBuf that holds exactly that key,
// so the result aliases keyBuf rather than being a fresh allocation.
//
// It panics with "keyBuf too short" if keyBuf cannot hold the whole key.
func perSyncgroupKey(sgId interfaces.GroupId, keyBuf []byte) []byte {
	required := len(perSyncgroupPrefix) + len(sgId)
	if required > len(keyBuf) {
		panic("keyBuf too short")
	}
	// Lay down the prefix first, then the group ID immediately after it.
	end := copy(keyBuf, perSyncgroupPrefix)
	end += copy(keyBuf[end:], []byte(sgId))
	return keyBuf[:end]
}
// SetPerSyncgroup() stores *psg as the PerSyncgroup associated with the
// syncgroup sgId.  The value is VOM-encoded and written to the blobmap's store
// under the key produced by perSyncgroupKey().  An encoding failure is
// returned without touching the store; otherwise the result of the store's
// Put() is returned.  ctx is not used by this implementation.
func (bm *BlobMap) SetPerSyncgroup(ctx *context.T, sgId interfaces.GroupId, psg *localblobstore.PerSyncgroup) (err error) {
	encoded, err := vom.Encode(psg)
	if err != nil {
		return err
	}
	var keyBuf [64]byte // scratch space for the key; perSyncgroupKey() panics if it is too small.
	return bm.st.Put(perSyncgroupKey(sgId, keyBuf[:]), encoded)
}
// GetPerSyncgroup() yields in *psg the PerSyncgroup associated with the
// syncgroup sgId.  The encoded value is fetched from the blobmap's store under
// the key produced by perSyncgroupKey() and VOM-decoded into *psg.
//
// If either the fetch or the decode fails, *psg is overwritten with the zero
// localblobstore.PerSyncgroup{} and the error is returned.  ctx is not used
// by this implementation.
//
// NOTE(review): earlier documentation promised that any maps in *psg are
// non-nil on return.  Nothing in this function enforces that (the zero value
// used on error would carry nil maps) -- confirm whether PerSyncgroup has map
// fields before relying on it.
func (bm *BlobMap) GetPerSyncgroup(ctx *context.T, sgId interfaces.GroupId, psg *localblobstore.PerSyncgroup) (err error) {
	var (
		keyBuf [64]byte                 // scratch space for the key
		valBuf [maxPerSyncgroupLen]byte // scratch space for the encoded value
	)
	encoded, getErr := bm.st.Get(perSyncgroupKey(sgId, keyBuf[:]), valBuf[:])
	if getErr != nil {
		*psg = localblobstore.PerSyncgroup{}
		return getErr
	}
	if decodeErr := vom.Decode(encoded, psg); decodeErr != nil {
		// A failed decode may have partially filled *psg; reset it.
		*psg = localblobstore.PerSyncgroup{}
		return decodeErr
	}
	return nil
}
// DeletePerSyncgroup() removes the PerSyncgroup entry for the syncgroup sgId
// by deleting the key produced by perSyncgroupKey() from the blobmap's store.
// It returns whatever the store's Delete() returns.  ctx is not used by this
// implementation.
func (bm *BlobMap) DeletePerSyncgroup(ctx *context.T, sgId interfaces.GroupId) error {
	var scratch [64]byte
	key := perSyncgroupKey(sgId, scratch[:])
	return bm.st.Delete(key)
}
// A PerSyncgroupStream allows the client to iterate over the PerSyncgroups
// associated with syncgroups.
//
//	psgs := bm.NewPerSyncgroupStream(ctx)
//	for psgs.Advance() {
//		sgId := psgs.SyncgroupId()
//		perSyncgroup := psgs.PerSyncgroup()
//		...process sgId, perSyncgroup...
//	}
//	if psgs.Err() != nil {
//		...there was an error...
//	}
type PerSyncgroupStream struct {
	bm *BlobMap // the BlobMap this stream was created from; its dir is reported in malformed-entry errors
	ctx *context.T // context supplied at creation; passed to verror.New() when reporting errors
	stream store.Stream // underlying store scan over the perSyncgroupPrefix key range
	keyBuf [maxKeyLen]byte // scratch buffer handed to stream.Key()
	valBuf [maxPerSyncgroupLen]byte // scratch buffer handed to stream.Value()
	sgId interfaces.GroupId // GroupId taken from the key of the current element
	perSyncgroup localblobstore.PerSyncgroup // PerSyncgroup decoded from the value of the current element
	err error // first error encountered; once non-nil, Advance() returns false
	more bool // whether stream may be consulted again; cleared when stream.Advance() returns false
}
// NewPerSyncgroupStream() returns, as a localblobstore.PerSyncgroupStream
// interface, a pointer to a new blobmap.PerSyncgroupStream that allows the
// client to enumerate the PerSyncgroups associated with syncgroups.  The
// stream is backed by a scan of the blobmap's store from perSyncgroupPrefix
// up to perSyncgroupStreamKeyLimit; elements are produced in whatever order
// that scan yields them.
func (bm *BlobMap) NewPerSyncgroupStream(ctx *context.T) localblobstore.PerSyncgroupStream {
	return &PerSyncgroupStream{
		bm:     bm,
		ctx:    ctx,
		stream: bm.st.Scan([]byte(perSyncgroupPrefix), perSyncgroupStreamKeyLimit),
		more:   true, // nothing has been read yet, so the scan may still have elements
	}
}
// Advance() stages the next element so the client can retrieve its syncgroup
// ID with SyncgroupId() and its PerSyncgroup with PerSyncgroup().  Advance()
// returns true iff an element was staged.  The client must call Advance()
// before calling SyncgroupId() or PerSyncgroup().  The client must call
// Cancel() if it does not iterate through all elements (i.e. until Advance()
// returns false).
//
// Advance() returns false when the underlying store stream is exhausted or
// reports an error, when the stored value cannot be VOM-decoded, or when the
// key does not have the expected length (errMalformedPerSyncgroupEntry).  In
// the latter two cases the underlying stream is cancelled.  Err() reports the
// error, if any; once an error has been recorded every later call returns
// false without consulting the stream.
//
// Advance() calls the underlying store stream's Advance() and so may block if
// that call blocks.
func (psgs *PerSyncgroupStream) Advance() (ok bool) {
	// The syncgroup ID is always a 43-byte string.
	const sgIDLen = 43

	if !psgs.more || psgs.err != nil {
		return false
	}
	if !psgs.stream.Advance() {
		psgs.err = psgs.stream.Err()
		psgs.more = false // no more stream, even if no error
		return false
	}

	key := psgs.stream.Key(psgs.keyBuf[:])
	value := psgs.stream.Value(psgs.valBuf[:])

	// Decode first so that a decode failure takes precedence over a
	// malformed-key report, then validate the key's length.
	psgs.err = vom.Decode(value, &psgs.perSyncgroup)
	if psgs.err == nil && len(key) != len(perSyncgroupPrefix)+sgIDLen {
		psgs.err = verror.New(errMalformedPerSyncgroupEntry, psgs.ctx, psgs.bm.dir, key, value)
	}
	if psgs.err != nil {
		psgs.stream.Cancel()
		return false
	}

	// Only now is the key known to be prefix + ID, so strip exactly the
	// prefix (rather than a hard-coded byte count) to obtain the ID.
	psgs.sgId = interfaces.GroupId(key[len(perSyncgroupPrefix):])
	return true
}
// SyncgroupId() returns the syncgroup ID staged by the most recent call to
// Advance().  The result is meaningful only if that call returned true; the
// method itself just reads a field, and does not block.
func (psgs *PerSyncgroupStream) SyncgroupId() interfaces.GroupId {
	staged := psgs.sgId
	return staged
}
// PerSyncgroup() returns, by value, the PerSyncgroup associated with the
// syncgroup staged by the most recent call to Advance().  The result is
// meaningful only if that call returned true; the method itself just copies a
// field, and does not block.
func (psgs *PerSyncgroupStream) PerSyncgroup() localblobstore.PerSyncgroup {
	staged := psgs.perSyncgroup
	return staged
}
// Err() returns the error recorded by Advance(), or nil if the stream has not
// encountered one.  Err() does not block.
func (psgs *PerSyncgroupStream) Err() error {
	recorded := psgs.err
	return recorded
}
// Cancel() notifies the stream provider that it can stop producing elements.
// The client must call Cancel() if it does not iterate through all elements
// (i.e. until Advance() returns false).
//
// Cancel() only forwards to the underlying store stream's Cancel(); it does
// not modify any other state of the PerSyncgroupStream.
// NOTE(review): the original contract states that Cancel() is idempotent,
// does not block, may be called concurrently with a goroutine iterating via
// Advance() and SyncgroupId(), and causes Advance() to subsequently return
// false.  Those properties depend on the underlying store.Stream -- confirm
// against its documentation.
func (psgs *PerSyncgroupStream) Cancel() {
	underlying := psgs.stream
	underlying.Cancel()
}