Merge "syncbase/vsync: Integrate log and gen vector functionality (Part 2/2)."
diff --git a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 8e173d1..43cc542 100644
--- a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -41,6 +41,7 @@
import "sync"
import "time"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
import "v.io/v23/context"
import "v.io/v23/verror"
@@ -247,22 +248,13 @@
// -----------------------------------------------------------
-// A BlockOrFile represents a vector of bytes, and contains either a data block
-// (as a []byte), or a (file name, size, offset) triple.
-type BlockOrFile struct {
- Block []byte // If FileName is empty, the bytes represented.
- FileName string // If non-empty, the name of the file containing the bytes.
- Size int64 // If FileName is non-empty, the number of bytes (or -1 for "all")
- Offset int64 // If FileName is non-empty, the offset of the relevant bytes within the file.
-}
-
// addFragment() ensures that the store *fscabs contains a fragment comprising
// the catenation of the byte vectors named by item[..].block and the contents
// of the files named by item[..].filename. The block field is ignored if
// fileName!="". The fragment is not physically added if already present.
// The fragment is added to the fragment list of the descriptor *desc.
func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
- desc *blobDesc, item ...BlockOrFile) (fileName string, size int64, err error) {
+ desc *blobDesc, item ...localblobstore.BlockOrFile) (fileName string, size int64, err error) {
hasher := md5.New()
var buf []byte
@@ -543,12 +535,12 @@
// should not be used concurrently by multiple threads. The returned handle
// should be closed with either the Close() or CloseWithoutFinalize() method to
// avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (bw *BlobWriter, err error) {
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+ var bw *BlobWriter
newName := newBlobName()
- var f *file
fileName := filepath.Join(fscabs.rootName, newName)
os.MkdirAll(filepath.Dir(fileName), dirPermissions)
- f, err = newFile(os.Create(fileName))
+ f, err := newFile(os.Create(fileName))
if err == nil {
bw = new(BlobWriter)
bw.fscabs = fscabs
@@ -568,8 +560,9 @@
// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
// old, but unfinalized blob name.
-func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (bw *BlobWriter, err error) {
- bw = new(BlobWriter)
+func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
+ var err error
+ bw := new(BlobWriter)
bw.fscabs = fscabs
bw.ctx = ctx
bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
@@ -639,7 +632,7 @@
// AppendFragment() appends a fragment to the blob being written by *bw, where
// the fragment is composed of the byte vectors described by the elements of
// item[]. The fragment is copied into the blob store.
-func (bw *BlobWriter) AppendFragment(item ...BlockOrFile) (err error) {
+func (bw *BlobWriter) AppendFragment(item ...localblobstore.BlockOrFile) (err error) {
if bw.f == nil {
panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()")
}
@@ -782,7 +775,7 @@
// NewBlobReader() returns a pointer to a newly allocated BlobReader on the
// specified blobName. BlobReaders should not be used concurrently by multiple
// threads. Returned handles should be closed with Close().
-func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br *BlobReader, err error) {
+func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br localblobstore.BlobReader, err error) {
var desc *blobDesc
desc, err = fscabs.getBlob(ctx, blobName)
if err == nil {
@@ -954,7 +947,7 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
return &FsCasIter{fscabs: fscabs, stack: stack}
@@ -970,7 +963,7 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
return &FsCasIter{fscabs: fscabs, stack: stack}
diff --git a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index ded5e3c..886f5c4 100644
--- a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -5,345 +5,16 @@
// A test for fs_cablobstore
package fs_cablobstore_test
-import "bytes"
-import "crypto/md5"
-import "fmt"
-import "io"
import "io/ioutil"
import "os"
-import "path/filepath"
import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
-import "v.io/v23/context"
-import "v.io/v23/verror"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
import "v.io/x/ref/test"
import _ "v.io/x/ref/runtime/factories/generic"
-func TestCreate(t *testing.T) {
- ctx, shutdown := test.V23Init()
- defer shutdown()
-
- // Make a temporary directory.
- var err error
- var testDirName string
- testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
- if err != nil {
- t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
- }
- defer os.RemoveAll(testDirName)
-
- // Check that we can create an fs_cablobstore.
- var fscabs *fs_cablobstore.FsCaBlobStore
- fscabs, err = fs_cablobstore.Create(ctx, testDirName)
- if err != nil {
- t.Errorf("fs_cablobstore.Create failed: %v", err)
- }
-
- // Check that there are no files in the newly-created tree.
- iterator := fscabs.ListBlobIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- t.Errorf("unexpected file %q\n", fileName)
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
- }
-}
-
-// A blobOrBlockOrFile represents some bytes that may be contained in a named
-// blob, a named file, or in an explicit slice of bytes.
-type blobOrBlockOrFile struct {
- blob string // If non-emtpy, the name of the blob containing the bytes.
- file string // If non-empty and blob is empty, the name of the file containing the bytes.
- size int64 // Size of part of file or blob, or -1 for "everything until EOF".
- offset int64 // Offset within file or blob.
- block []byte // If both blob and file are empty, a slice containing the bytes.
-}
-
-// A testBlob records that some specified content has been stored with a given
-// blob name in the blob store.
-type testBlob struct {
- content []byte // content that has been stored.
- blobName string // the name of the blob.
-}
-
-// removeBlobFromBlobVector() removes the entry named blobName from
-// blobVector[], returning the new vector.
-func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
- n := len(blobVector)
- i := 0
- for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
- }
- if i != n {
- blobVector[i] = blobVector[n-1]
- blobVector = blobVector[0 : n-1]
- }
- return blobVector
-}
-
-// writeBlob() writes a new blob to *fscabs, and returns its name. The new
-// blob's content is described by the elements of data[]. Any error messages
-// generated include the index of the blob in blobVector and its content; the
-// latter is assumed to be printable. The expected content of the the blob is
-// "content", so that this routine can check it. If useResume is true, and data[]
-// has length more than 1, the function artificially uses ResumeBlobWriter(),
-// to test it.
-func writeBlob(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob,
- content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
- var bw *fs_cablobstore.BlobWriter
- var err error
- bw, err = fscabs.NewBlobWriter(ctx)
- if err != nil {
- t.Fatalf("fs_cablobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
- }
- blobName := bw.Name()
-
- // Construct the blob from the pieces.
- // There is a loop within the loop to exercise the possibility of
- // passing multiple fragments to AppendFragment().
- for i := 0; i != len(data) && err == nil; {
- if len(data[i].blob) != 0 {
- err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
- if err != nil {
- t.Errorf("fs_cablobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
- }
- i++
- } else {
- var pieces []fs_cablobstore.BlockOrFile
- for ; i != len(data) && len(data[i].blob) == 0; i++ {
- if len(data[i].file) != 0 {
- pieces = append(pieces, fs_cablobstore.BlockOrFile{
- FileName: data[i].file,
- Size: data[i].size,
- Offset: data[i].offset})
- } else {
- pieces = append(pieces, fs_cablobstore.BlockOrFile{Block: data[i].block})
- }
- }
- err = bw.AppendFragment(pieces...)
- if err != nil {
- t.Errorf("fs_cablobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
- }
- }
- if useResume && i < len(data)-1 && err == nil {
- err = bw.CloseWithoutFinalize()
- if err == nil {
- bw, err = fscabs.ResumeBlobWriter(ctx, blobName)
- }
- }
- }
-
- if bw != nil {
- if bw.Size() != int64(len(content)) {
- t.Errorf("fs_cablobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
- }
- if bw.IsFinalized() {
- t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
- }
- err = bw.Close()
- if err != nil {
- t.Errorf("fs_cablobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
- }
- if !bw.IsFinalized() {
- t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
- }
- if bw.Size() != int64(len(content)) {
- t.Errorf("fs_cablobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
- }
- if bw.Name() != blobName {
- t.Errorf("fs_cablobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
- }
- hasher := md5.New()
- hasher.Write(content)
- if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
- t.Errorf("fs_cablobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
- }
- }
-
- return append(blobVector,
- testBlob{
- content: content,
- blobName: blobName,
- })
-}
-
-// readBlob() returns a substring of the content of the blob named blobName in *fscabs.
-// The return values are:
-// - the "size" bytes from the content, starting at the given "offset",
-// measured from "whence" (as defined by io.Seeker.Seek).
-// - the position to which BlobBeader seeks to,
-// - the md5 hash of the bytes read, and
-// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
-// - and error.
-func readBlob(ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobName string,
- size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
-
- var br *fs_cablobstore.BlobReader
- hasher := md5.New()
- br, err = fscabs.NewBlobReader(ctx, blobName)
- if err == nil {
- buf := make([]byte, 8192, 8192)
- fullHash = br.Hash()
- pos, err = br.Seek(offset, whence)
- if err == nil {
- var n int
- first := true // Read at least once, to test reading zero bytes.
- for err == nil && (size == -1 || int64(len(content)) < size || first) {
- // Read just what was asked for.
- var toRead []byte = buf
- if size >= 0 && int(size)-len(content) < len(buf) {
- toRead = buf[0 : int(size)-len(content)]
- }
- n, err = br.Read(toRead)
- hasher.Write(toRead[0:n])
- if size >= 0 && int64(len(content)+n) > size {
- n = int(size) - len(content)
- }
- content = append(content, toRead[0:n]...)
- first = false
- }
- }
- br.Close()
- }
- return content, pos, hasher.Sum(nil), fullHash, err
-}
-
-// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
-// read, and that they contain the appropriate data.
-func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob) {
- for i := range blobVector {
- var size int64
- data := blobVector[i].content
- dataLen := int64(len(data))
- blobName := blobVector[i].blobName
- for size = -1; size != dataLen+1; size++ {
- var offset int64
- for offset = -dataLen - 1; offset != dataLen+1; offset++ {
- for whence := -1; whence != 4; whence++ {
- content, pos, hash, fullHash, err := readBlob(ctx, fscabs, blobName, size, offset, whence)
-
- // Compute expected seek position.
- expectedPos := offset
- if whence == 2 {
- expectedPos += dataLen
- }
-
- // Computed expected size.
- expectedSize := size
- if expectedSize == -1 || expectedPos+expectedSize > dataLen {
- expectedSize = dataLen - expectedPos
- }
-
- // Check that reads behave as expected.
- if (whence == -1 || whence == 3) &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
- // Expected error from bad "whence" value.
- } else if expectedPos < 0 &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
- // Expected error from negative Seek position.
- } else if expectedPos > dataLen &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
- // Expected error from too high a Seek position.
- } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
- bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
- pos == expectedPos && expectedPos+expectedSize == dataLen {
- // Expected success with EOF.
- } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
- bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
- pos == expectedPos && expectedPos+expectedSize != dataLen {
- if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
- t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v (blob is %q)",
- string(data), size, offset, whence,
- hash, fullHash, blobName)
- } // Else expected success without EOF.
- } else {
- t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d yields %q pos %d %v (blob is %q)",
- string(data), size, offset, whence,
- content, pos, err, blobName)
- }
- }
- }
- }
- }
-}
-
-// checkAllBlobs() checks all the blobs in *fscabs to ensure they correspond to
-// those in blobVector[].
-func checkAllBlobs(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob, testDirName string) {
- blobCount := 0
- iterator := fscabs.ListBlobIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- i := 0
- for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
- }
- if i == len(blobVector) {
- t.Errorf("fs_cablobstore.ListBlobIds found unexpected file %s", fileName)
- } else {
- content, pos, hash, fullHash, err := readBlob(ctx, fscabs, fileName, -1, 0, 0)
- if err != nil && err != io.EOF {
- t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
- } else if bytes.Compare(blobVector[i].content, content) != 0 {
- t.Errorf("fs_cablobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
- filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
- } else if pos != 0 {
- t.Errorf("fs_cablobstore.ListCAIds Seek on %q returned %d instead of 0",
- filepath.Join(testDirName, fileName), pos)
- }
- if bytes.Compare(hash, fullHash) != 0 {
- t.Errorf("fs_cablobstore.ListCAIds read on %q; got hash %v, expected %v",
- fileName, hash, fullHash)
- }
- }
- blobCount++
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
- }
- if blobCount != len(blobVector) {
- t.Errorf("fs_cablobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
- }
-}
-
-// checkFragments() checks all the fragments in *fscabs to ensure they
-// correspond to those fragmentMap[].
-func checkFragments(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, fragmentMap map[string]bool, testDirName string) {
- caCount := 0
- iterator := fscabs.ListCAIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
- if err != nil && err != io.EOF {
- t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
- } else if !fragmentMap[string(content)] {
- t.Errorf("fs_cablobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
- } else {
- hasher := md5.New()
- hasher.Write(content)
- hash := hasher.Sum(nil)
- nameFromContent := filepath.Join("cas",
- fmt.Sprintf("%02x", hash[0]),
- fmt.Sprintf("%02x", hash[1]),
- fmt.Sprintf("%02x", hash[2]),
- fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
- hash[3],
- hash[4], hash[5], hash[6], hash[7],
- hash[8], hash[9], hash[10], hash[11],
- hash[12], hash[13], hash[14], hash[15]))
- if nameFromContent != fileName {
- t.Errorf("fs_cablobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
- }
- }
- caCount++
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListCAIds iteration failed: %v", iterator.Err())
- }
- if caCount != len(fragmentMap) {
- t.Errorf("fs_cablobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
- }
-}
-
// This test case tests adding files, retrieving them and deleting them. One
// can't retrieve or delete something that hasn't been created, so it's all one
// test case.
@@ -354,231 +25,19 @@
// Make a temporary directory.
var err error
var testDirName string
- testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+ testDirName, err = ioutil.TempDir("", "localblobstore_test")
if err != nil {
- t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
}
defer os.RemoveAll(testDirName)
// Create an fs_cablobstore.
- var fscabs *fs_cablobstore.FsCaBlobStore
- fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+ var bs localblobstore.BlobStore
+ bs, err = fs_cablobstore.Create(ctx, testDirName)
if err != nil {
t.Fatalf("fs_cablobstore.Create failed: %v", err)
}
- // Create the strings: "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
- womData := []byte("wom")
- batData := []byte("bat")
- wombatData := []byte("wombat")
- batwomData := []byte("batwom")
- atwoData := []byte("atwo")
- atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
-
- // fragmentMap will have an entry per content-addressed fragment.
- fragmentMap := make(map[string]bool)
-
- // Create the blobs, by various means.
-
- var blobVector []testBlob // Accumulate the blobs we create here.
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- womData, false,
- blobOrBlockOrFile{block: womData})
- womName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(womData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- batData, false,
- blobOrBlockOrFile{block: batData})
- batName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(batData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{block: wombatData})
- firstWombatName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(wombatData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, true,
- blobOrBlockOrFile{block: womData},
- blobOrBlockOrFile{block: batData})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: -1,
- offset: 0})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 6,
- offset: 0})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- batwomData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 3,
- offset: 3},
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 3,
- offset: 0})
- batwomName := blobVector[len(blobVector)-1].blobName
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- atwoData, false,
- blobOrBlockOrFile{
- blob: batwomName,
- size: 4,
- offset: 1})
- atwoName := blobVector[len(blobVector)-1].blobName
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- atwoatwoombatatwoData, true,
- blobOrBlockOrFile{
- blob: atwoName,
- size: -1,
- offset: 0},
- blobOrBlockOrFile{
- blob: atwoName,
- size: 4,
- offset: 0},
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: -1,
- offset: 1},
- blobOrBlockOrFile{
- blob: batName,
- size: -1,
- offset: 1},
- blobOrBlockOrFile{
- blob: womName,
- size: 2,
- offset: 0})
- atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Nothing should change if we garbage collect.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Ensure that deleting non-existent blobs fails.
- err = fscabs.DeleteBlob(ctx, "../../../../etc/passwd")
- if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
- t.Errorf("DeleteBlob attempted to delete a bogus blob name")
- }
- err = fscabs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
- if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
- t.Errorf("DeleteBlob attempted to delete a bogus blob name")
- }
-
- // -------------------------------------------------
- // Delete a blob.
- err = fscabs.DeleteBlob(ctx, batName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, batName)
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Nothing should change if we garbage collect.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Open a BlobReader on a blob we're about to delete,
- // so its fragments won't be garbage collected.
-
- var br *fs_cablobstore.BlobReader
- br, err = fscabs.NewBlobReader(ctx, atwoatwoombatatwoName)
- if err != nil {
- t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
- }
-
- // -------------------------------------------------
- // Delete a blob. This should be the last on-disc reference to the
- // content-addressed fragment "bat", but the fragment won't be deleted
- // until close the reader and garbage collect.
- err = fscabs.DeleteBlob(ctx, atwoatwoombatatwoName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Garbage collection should change nothing; the fragment involved
- // is still referenced from the open reader *br.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
-
- // Close the open BlobReader and garbage collect.
- err = br.Close()
- if err != nil {
- t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
- }
- delete(fragmentMap, string(batData))
-
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Delete all blobs.
- for len(blobVector) != 0 {
- err = fscabs.DeleteBlob(ctx, blobVector[0].blobName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
- }
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // The remaining fragments should be removed when we garbage collect.
- for frag := range fragmentMap {
- delete(fragmentMap, frag)
- }
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+ // Test it.
+ localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
}
diff --git a/services/syncbase/localblobstore/localblobstore_test.go b/services/syncbase/localblobstore/localblobstore_test.go
new file mode 100644
index 0000000..973a9c8
--- /dev/null
+++ b/services/syncbase/localblobstore/localblobstore_test.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for localblobstore
+package localblobstore_test
+
+import "io/ioutil"
+import "os"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// This test case tests adding files, retrieving them and deleting them. One
+// can't retrieve or delete something that hasn't been created, so it's all one
+// test case.
+func TestAddRetrieveAndDelete(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ // Make a temporary directory.
+ var err error
+ var testDirName string
+ testDirName, err = ioutil.TempDir("", "localblobstore_test")
+ if err != nil {
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+ }
+ defer os.RemoveAll(testDirName)
+
+ // Create an fs_cablobstore.
+ var bs localblobstore.BlobStore
+ bs, err = fs_cablobstore.Create(ctx, testDirName)
+ if err != nil {
+ t.Fatalf("fs_cablobstore.Create failed: %v", err)
+ }
+
+ // Test it.
+ localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
+}
diff --git a/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
new file mode 100644
index 0000000..662e964
--- /dev/null
+++ b/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -0,0 +1,548 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test library for localblobstores.
+package localblobstore_testlib
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "path/filepath"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+// A blobOrBlockOrFile represents some bytes that may be contained in a named
+// blob, a named file, or in an explicit slice of bytes.
+type blobOrBlockOrFile struct {
+ blob string // If non-empty, the name of the blob containing the bytes.
+ file string // If non-empty and blob is empty, the name of the file containing the bytes.
+ size int64 // Size of part of file or blob, or -1 for "everything until EOF".
+ offset int64 // Offset within file or blob.
+ block []byte // If both blob and file are empty, a slice containing the bytes.
+}
+
+// A testBlob records that some specified content has been stored with a given
+// blob name in the blob store.
+type testBlob struct {
+ content []byte // content that has been stored.
+ blobName string // the name of the blob.
+}
+
+// removeBlobFromBlobVector() removes the entry named blobName from
+// blobVector[], returning the new vector.
+func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
+ n := len(blobVector)
+ i := 0
+ for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
+ }
+ if i != n {
+ blobVector[i] = blobVector[n-1]
+ blobVector = blobVector[0 : n-1]
+ }
+ return blobVector
+}
+
+// writeBlob() writes a new blob to bs, and appends a testBlob entry for it
+// to blobVector, returning the extended vector. The new blob's content is
+// described by the elements of data[]. Any error messages generated include
+// the index of the blob in blobVector and its content; the latter is assumed
+// to be printable. The expected content of the blob is "content", so that
+// this routine can check it. If useResume is true and data[] has more than
+// one element, the function artificially uses ResumeBlobWriter(), to test it.
+func writeBlob(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob,
+ content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
+ var bw localblobstore.BlobWriter
+ var err error
+ bw, err = bs.NewBlobWriter(ctx)
+ if err != nil {
+ t.Fatalf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
+ }
+ blobName := bw.Name()
+
+ // Construct the blob from the pieces.
+ // There is a loop within the loop to exercise the possibility of
+ // passing multiple fragments to AppendFragment().
+ for i := 0; i != len(data) && err == nil; {
+ if len(data[i].blob) != 0 {
+ err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
+ if err != nil {
+ t.Errorf("localblobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
+ }
+ i++
+ } else {
+ var pieces []localblobstore.BlockOrFile
+ for ; i != len(data) && len(data[i].blob) == 0; i++ {
+ if len(data[i].file) != 0 {
+ pieces = append(pieces, localblobstore.BlockOrFile{
+ FileName: data[i].file,
+ Size: data[i].size,
+ Offset: data[i].offset})
+ } else {
+ pieces = append(pieces, localblobstore.BlockOrFile{Block: data[i].block})
+ }
+ }
+ err = bw.AppendFragment(pieces...)
+ if err != nil {
+ t.Errorf("localblobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
+ }
+ }
+ if useResume && i < len(data)-1 && err == nil {
+ err = bw.CloseWithoutFinalize()
+ if err == nil {
+ bw, err = bs.ResumeBlobWriter(ctx, blobName)
+ }
+ }
+ }
+
+ if bw != nil {
+ if bw.Size() != int64(len(content)) {
+ t.Errorf("localblobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+ }
+ if bw.IsFinalized() {
+ t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+ }
+ err = bw.Close()
+ if err != nil {
+ t.Errorf("localblobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
+ }
+ if !bw.IsFinalized() {
+ t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+ }
+ if bw.Size() != int64(len(content)) {
+ t.Errorf("localblobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+ }
+ if bw.Name() != blobName {
+ t.Errorf("localblobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
+ }
+ hasher := md5.New()
+ hasher.Write(content)
+ if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
+ t.Errorf("localblobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
+ }
+ }
+
+ return append(blobVector,
+ testBlob{
+ content: content,
+ blobName: blobName,
+ })
+}
+
+// readBlob() returns a substring of the content of the blob named blobName in bs.
+// The return values are:
+// - the "size" bytes from the content, starting at the given "offset",
+// measured from "whence" (as defined by io.Seeker.Seek).
+// - the position to which the BlobReader seeks,
+// - the md5 hash of the bytes read,
+// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(), and
+// - the error, if any.
+func readBlob(ctx *context.T, bs localblobstore.BlobStore, blobName string,
+ size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
+
+ var br localblobstore.BlobReader
+ hasher := md5.New()
+ br, err = bs.NewBlobReader(ctx, blobName)
+ if err == nil {
+ buf := make([]byte, 8192, 8192)
+ fullHash = br.Hash()
+ pos, err = br.Seek(offset, whence)
+ if err == nil {
+ var n int
+ first := true // Read at least once, to test reading zero bytes.
+ for err == nil && (size == -1 || int64(len(content)) < size || first) {
+ // Read just what was asked for.
+ var toRead []byte = buf
+ if size >= 0 && int(size)-len(content) < len(buf) {
+ toRead = buf[0 : int(size)-len(content)]
+ }
+ n, err = br.Read(toRead)
+ hasher.Write(toRead[0:n])
+ if size >= 0 && int64(len(content)+n) > size {
+ n = int(size) - len(content)
+ }
+ content = append(content, toRead[0:n]...)
+ first = false
+ }
+ }
+ br.Close()
+ }
+ return content, pos, hasher.Sum(nil), fullHash, err
+}
+
+// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
+// read, and that they contain the appropriate data.
+func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob) {
+ for i := range blobVector {
+ var size int64
+ data := blobVector[i].content
+ dataLen := int64(len(data))
+ blobName := blobVector[i].blobName
+ for size = -1; size != dataLen+1; size++ {
+ var offset int64
+ for offset = -dataLen - 1; offset != dataLen+1; offset++ {
+ for whence := -1; whence != 4; whence++ {
+ content, pos, hash, fullHash, err := readBlob(ctx, bs, blobName, size, offset, whence)
+
+ // Compute expected seek position.
+ expectedPos := offset
+ if whence == 2 {
+ expectedPos += dataLen
+ }
+
+ // Compute expected size.
+ expectedSize := size
+ if expectedSize == -1 || expectedPos+expectedSize > dataLen {
+ expectedSize = dataLen - expectedPos
+ }
+
+ // Check that reads behave as expected.
+ if (whence == -1 || whence == 3) &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
+ // Expected error from bad "whence" value.
+ } else if expectedPos < 0 &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
+ // Expected error from negative Seek position.
+ } else if expectedPos > dataLen &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
+ // Expected error from too high a Seek position.
+ } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+ bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
+ pos == expectedPos && expectedPos+expectedSize == dataLen {
+ // Expected success with EOF.
+ } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+ bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
+ pos == expectedPos && expectedPos+expectedSize != dataLen {
+ if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
+ t.Errorf("localblobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v (blob is %q)",
+ string(data), size, offset, whence,
+ hash, fullHash, blobName)
+ } // Else expected success without EOF.
+ } else {
+ t.Errorf("localblobstore read test on %q size %d offset %d whence %d yields %q pos %d %v (blob is %q)",
+ string(data), size, offset, whence,
+ content, pos, err, blobName)
+ }
+ }
+ }
+ }
+ }
+}
+
+// checkAllBlobs() checks all the blobs in bs to ensure they correspond to
+// those in blobVector[].
+func checkAllBlobs(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob, testDirName string) {
+ blobCount := 0
+ iterator := bs.ListBlobIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ i := 0
+ for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
+ }
+ if i == len(blobVector) {
+ t.Errorf("localblobstore.ListBlobIds found unexpected file %s", fileName)
+ } else {
+ content, pos, hash, fullHash, err := readBlob(ctx, bs, fileName, -1, 0, 0)
+ if err != nil && err != io.EOF {
+ t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+ } else if bytes.Compare(blobVector[i].content, content) != 0 {
+ t.Errorf("localblobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
+ filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
+ } else if pos != 0 {
+ t.Errorf("localblobstore.ListCAIds Seek on %q returned %d instead of 0",
+ filepath.Join(testDirName, fileName), pos)
+ }
+ if bytes.Compare(hash, fullHash) != 0 {
+ t.Errorf("localblobstore.ListCAIds read on %q; got hash %v, expected %v",
+ fileName, hash, fullHash)
+ }
+ }
+ blobCount++
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+ }
+ if blobCount != len(blobVector) {
+ t.Errorf("localblobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
+ }
+}
+
+// checkFragments() checks all the fragments in bs to ensure they
+// correspond to those in fragmentMap[], iff testDirName is non-empty.
+func checkFragments(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, fragmentMap map[string]bool, testDirName string) {
+ if testDirName != "" {
+ caCount := 0
+ iterator := bs.ListCAIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
+ if err != nil && err != io.EOF {
+ t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+ } else if !fragmentMap[string(content)] {
+ t.Errorf("localblobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
+ } else {
+ hasher := md5.New()
+ hasher.Write(content)
+ hash := hasher.Sum(nil)
+ nameFromContent := filepath.Join("cas",
+ fmt.Sprintf("%02x", hash[0]),
+ fmt.Sprintf("%02x", hash[1]),
+ fmt.Sprintf("%02x", hash[2]),
+ fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+ hash[3],
+ hash[4], hash[5], hash[6], hash[7],
+ hash[8], hash[9], hash[10], hash[11],
+ hash[12], hash[13], hash[14], hash[15]))
+ if nameFromContent != fileName {
+ t.Errorf("localblobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
+ }
+ }
+ caCount++
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListCAIds iteration failed: %v", iterator.Err())
+ }
+ if caCount != len(fragmentMap) {
+ t.Errorf("localblobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
+ }
+ }
+}
+
+// AddRetrieveAndDelete() tests adding, retrieving, and deleting blobs from a
+// blobstore bs. One can't retrieve or delete something that hasn't been
+// created, so it's all done in one routine. If testDirName is non-empty,
+// the blobstore is assumed to be accessible in the file system, and its
+// files are checked.
+func AddRetrieveAndDelete(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, testDirName string) {
+ var err error
+
+ // Check that there are no files in the blobstore we were given.
+ iterator := bs.ListBlobIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ t.Errorf("unexpected file %q\n", fileName)
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+ }
+
+ // Create the strings: "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
+ womData := []byte("wom")
+ batData := []byte("bat")
+ wombatData := []byte("wombat")
+ batwomData := []byte("batwom")
+ atwoData := []byte("atwo")
+ atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
+
+ // fragmentMap will have an entry per content-addressed fragment.
+ fragmentMap := make(map[string]bool)
+
+ // Create the blobs, by various means.
+
+ var blobVector []testBlob // Accumulate the blobs we create here.
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ womData, false,
+ blobOrBlockOrFile{block: womData})
+ womName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(womData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ batData, false,
+ blobOrBlockOrFile{block: batData})
+ batName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(batData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{block: wombatData})
+ firstWombatName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(wombatData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, true,
+ blobOrBlockOrFile{block: womData},
+ blobOrBlockOrFile{block: batData})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: -1,
+ offset: 0})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 6,
+ offset: 0})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ batwomData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 3,
+ offset: 3},
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 3,
+ offset: 0})
+ batwomName := blobVector[len(blobVector)-1].blobName
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ atwoData, false,
+ blobOrBlockOrFile{
+ blob: batwomName,
+ size: 4,
+ offset: 1})
+ atwoName := blobVector[len(blobVector)-1].blobName
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ atwoatwoombatatwoData, true,
+ blobOrBlockOrFile{
+ blob: atwoName,
+ size: -1,
+ offset: 0},
+ blobOrBlockOrFile{
+ blob: atwoName,
+ size: 4,
+ offset: 0},
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: -1,
+ offset: 1},
+ blobOrBlockOrFile{
+ blob: batName,
+ size: -1,
+ offset: 1},
+ blobOrBlockOrFile{
+ blob: womName,
+ size: 2,
+ offset: 0})
+ atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Nothing should change if we garbage collect.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Ensure that deleting non-existent blobs fails.
+ err = bs.DeleteBlob(ctx, "../../../../etc/passwd")
+ if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+ t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+ }
+ err = bs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
+ if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+ t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+ }
+
+ // -------------------------------------------------
+ // Delete a blob.
+ err = bs.DeleteBlob(ctx, batName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, batName)
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Nothing should change if we garbage collect.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Open a BlobReader on a blob we're about to delete,
+ // so its fragments won't be garbage collected.
+
+ var br localblobstore.BlobReader
+ br, err = bs.NewBlobReader(ctx, atwoatwoombatatwoName)
+ if err != nil {
+ t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
+ }
+
+ // -------------------------------------------------
+ // Delete a blob. This should be the last on-disc reference to the
+ // content-addressed fragment "bat", but the fragment won't be deleted
+ // until we close the reader and garbage collect.
+ err = bs.DeleteBlob(ctx, atwoatwoombatatwoName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Garbage collection should change nothing; the fragment involved
+ // is still referenced from the open reader br.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+
+ // Close the open BlobReader and garbage collect.
+ err = br.Close()
+ if err != nil {
+ t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
+ }
+ delete(fragmentMap, string(batData))
+
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Delete all blobs.
+ for len(blobVector) != 0 {
+ err = bs.DeleteBlob(ctx, blobVector[0].blobName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
+ }
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // The remaining fragments should be removed when we garbage collect.
+ for frag := range fragmentMap {
+ delete(fragmentMap, frag)
+ }
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+}
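Because the checks above operate on a localblobstore.BlobStore rather than a concrete type, a future blobstore implementation can reuse this library as a conformance test, much as fs_cablobstore_test.go now does. A hedged sketch, with otherblobstore as a hypothetical implementation (note that a few checks above still compare against fs_cablobstore-specific verror IDs, which would need generalizing first):

package otherblobstore_test

import "testing"

import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
// otherblobstore is hypothetical; its Create() is assumed to return a
// localblobstore.BlobStore.
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/otherblobstore"
import "v.io/x/ref/test"
import _ "v.io/x/ref/runtime/factories/generic"

func TestAddRetrieveAndDelete(t *testing.T) {
	ctx, shutdown := test.V23Init()
	defer shutdown()

	bs, err := otherblobstore.Create(ctx)
	if err != nil {
		t.Fatalf("otherblobstore.Create failed: %v", err)
	}

	// Pass "" for testDirName: checkFragments() is skipped for stores
	// whose fragments are not visible as files in the file system.
	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, "")
}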
diff --git a/services/syncbase/localblobstore/model.go b/services/syncbase/localblobstore/model.go
new file mode 100644
index 0000000..b1a4565
--- /dev/null
+++ b/services/syncbase/localblobstore/model.go
@@ -0,0 +1,170 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package localblobstore is the interface to a local blob store.
+// Implementations include fs_cablobstore.
+package localblobstore
+
+import "v.io/v23/context"
+
+// A BlobStore represents a simple, content-addressable store.
+type BlobStore interface {
+ // NewBlobReader() returns a BlobReader on the specified blobName.
+ // BlobReaders should not be used concurrently by multiple threads.
+ // Returned handles should be closed with Close().
+ NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
+
+ // NewBlobWriter() returns a BlobWriter on a newly created blob name,
+ // which can be found using the Name() method. BlobWriters should not
+ // be used concurrently by multiple threads. The returned handle
+ // should be closed with either the Close() or CloseWithoutFinalize()
+ // method to avoid leaking file handles.
+ NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+
+ // ResumeBlobWriter() returns a BlobWriter on an old, but unfinalized,
+ // blob name.
+ ResumeBlobWriter(ctx *context.T, blobName string) (bw BlobWriter, err error)
+
+ // DeleteBlob() deletes the named blob from the BlobStore.
+ DeleteBlob(ctx *context.T, blobName string) (err error)
+
+ // GC() removes old temp files and content-addressed blocks that are no
+ // longer referenced by any blob. It may be called concurrently with
+ // other calls to GC(), and with uses of BlobReaders and BlobWriters.
+ GC(ctx *context.T) error
+
+ // ListBlobIds() returns an iterator that can be used to enumerate the
+ // blobs in a BlobStore. Expected use is:
+ //
+ // iter := bs.ListBlobIds(ctx)
+ // for iter.Advance() {
+ // // Process iter.Value() here.
+ // }
+ // if iter.Err() != nil {
+ // // The loop terminated early due to an error.
+ // }
+ ListBlobIds(ctx *context.T) (iter Iter)
+
+ // ListCAIds() returns an iterator that can be used to enumerate the
+ // content-addressable fragments in a BlobStore. Expected use is:
+ //
+ // iter := bs.ListCAIds(ctx)
+ // for iter.Advance() {
+ // // Process iter.Value() here.
+ // }
+ // if iter.Err() != nil {
+ // // The loop terminated early due to an error.
+ // }
+ ListCAIds(ctx *context.T) (iter Iter)
+
+ // Root() returns the name of the root directory where the BlobStore is stored.
+ Root() string
+}
+
+// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
+// and Seek() calls. A BlobReader can be created with NewBlobReader(), and
+// should be closed with the Close() method to avoid leaking file handles.
+type BlobReader interface {
+ // ReadAt() fills b[] with up to len(b) bytes of data starting at
+ // position "at" within the blob that the BlobReader indicates, and
+ // returns the number of bytes read.
+ ReadAt(b []byte, at int64) (n int, err error)
+
+ // Read() fills b[] with up to len(b) bytes of data starting at the
+ // current seek position of the BlobReader within the blob that the
+ // BlobReader indicates, and then both returns the number of bytes read
+ // and advances the BlobReader's seek position by that amount.
+ Read(b []byte) (n int, err error)
+
+ // Seek() sets the seek position of the BlobReader to offset if
+ // whence==0, offset+current_seek_position if whence==1, and
+ // offset+end_of_blob if whence==2, and then returns the current seek
+ // position.
+ Seek(offset int64, whence int) (result int64, err error)
+
+ // Close() indicates that the client will perform no further operations
+ // on the BlobReader. It releases any resources held by the
+ // BlobReader.
+ Close() error
+
+ // Name() returns the BlobReader's name.
+ Name() string
+
+ // Size() returns the BlobReader's size.
+ Size() int64
+
+ // IsFinalized() returns whether the BlobReader has been finalized.
+ IsFinalized() bool
+
+ // Hash() returns the BlobReader's hash. It may be nil if the blob is
+ // not finalized.
+ Hash() []byte
+}
+
+// A BlockOrFile represents a vector of bytes, and contains either a data
+// block (as a []byte), or a (file name, size, offset) triple.
+type BlockOrFile struct {
+ Block []byte // If FileName is empty, the bytes represented.
+ FileName string // If non-empty, the name of the file containing the bytes.
+ Size int64 // If FileName is non-empty, the number of bytes (or -1 for "all")
+ Offset int64 // If FileName is non-empty, the offset of the relevant bytes within the file.
+}
+
+// A BlobWriter allows a blob to be written. If a blob has not yet been
+// finalized, it also allows that blob to be extended. A BlobWriter may be
+// created with NewBlobWriter(), and should be closed with Close() or
+// CloseWithoutFinalize().
+type BlobWriter interface {
+ // AppendBlob() adds a (substring of a) pre-existing blob to the blob
+ // being written by the BlobWriter. The fragments of the pre-existing
+ // blob are not physically copied; they are referenced by both blobs.
+ AppendBlob(blobName string, size int64, offset int64) (err error)
+
+ // AppendFragment() appends a fragment to the blob being written by the
+ // BlobWriter, where the fragment is composed of the byte vectors
+ // described by the elements of item[]. The fragment is copied into
+ // the blob store.
+ AppendFragment(item ...BlockOrFile) (err error)
+
+ // Close() finalizes the BlobWriter, and indicates that the client will
+ // perform no further append operations on the BlobWriter. Any
+ // internal open file handles are closed.
+ Close() (err error)
+
+ // CloseWithoutFinalize() indicates that the client will perform no
+ // further append operations on the BlobWriter, but does not finalize
+ // the blob. Any internal open file handles are closed. Clients are
+ // expected to need this operation infrequently.
+ CloseWithoutFinalize() (err error)
+
+ // Name() returns the BlobWriter's name.
+ Name() string
+
+ // Size() returns the BlobWriter's size.
+ Size() int64
+
+ // IsFinalized() returns whether the BlobWriter has been finalized.
+ IsFinalized() bool
+
+ // Hash() returns the BlobWriter's hash, reflecting the bytes written so far.
+ Hash() []byte
+}
+
+// An Iter represents an iterator that allows the client to enumerate all
+// the blobs or fragments in a BlobStore.
+type Iter interface {
+ // Advance() stages an item so that it may be retrieved via Value.
+ // Returns true iff there is an item to retrieve. Advance must be
+ // called before Value is called.
+ Advance() (advanced bool)
+
+ // Value() returns the item that was staged by Advance. May panic if
+ // Advance returned false or was not called. Never blocks.
+ Value() (name string)
+
+ // Err() returns any error encountered by Advance. Never blocks.
+ Err() error
+}
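A minimal end-to-end sketch of the interfaces above, using fs_cablobstore as the backing store (as the tests in this change do); the roundTrip name is illustrative only, and imports of context, io, localblobstore, and fs_cablobstore are assumed:

// roundTrip writes a small blob through the BlobStore interface, reads it
// back with a BlobReader, and returns the bytes read.
func roundTrip(ctx *context.T, dir string) ([]byte, error) {
	bs, err := fs_cablobstore.Create(ctx, dir) // satisfies localblobstore.BlobStore
	if err != nil {
		return nil, err
	}
	bw, err := bs.NewBlobWriter(ctx)
	if err != nil {
		return nil, err
	}
	if err := bw.AppendFragment(localblobstore.BlockOrFile{Block: []byte("wombat")}); err != nil {
		return nil, err
	}
	name := bw.Name()
	if err := bw.Close(); err != nil { // Close() finalizes the blob
		return nil, err
	}
	br, err := bs.NewBlobReader(ctx, name)
	if err != nil {
		return nil, err
	}
	defer br.Close()
	buf := make([]byte, br.Size())
	// Reading exactly Size() bytes from offset 0 may legitimately return io.EOF.
	if _, err := br.ReadAt(buf, 0); err != nil && err != io.EOF {
		return nil, err
	}
	return buf, nil
}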
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
return err
}
// Check for "database already exists".
- if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -172,7 +172,7 @@
info := &dbInfo{
Name: dbName,
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}); err != nil {
return err
}
@@ -192,7 +192,7 @@
// 3. Flip dbInfo.Initialized to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Initialized = true
return nil
})
@@ -234,7 +234,7 @@
// 2. Flip dbInfo.Deleted to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Deleted = true
return nil
})
@@ -251,7 +251,7 @@
}
// 4. Delete dbInfo record.
- if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+ if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
return err
}
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
- "v.io/v23/rpc"
)
type dbInfoLayer struct {
@@ -49,9 +48,9 @@
// getDbInfo reads data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+ if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
return nil, err
}
return info, nil
@@ -59,27 +58,27 @@
// putDbInfo writes data to the storage engine.
// Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+ return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
}
// delDbInfo deletes data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+ return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
}
// updateDbInfo performs a read-modify-write.
// fn should "modify" v, and should return a VDL-compatible error.
// Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
_ = st.(store.Transaction) // panics on failure, as desired
- info, err := a.getDbInfo(ctx, call, st, dbName)
+ info, err := a.getDbInfo(ctx, st, dbName)
if err != nil {
return err
}
if err := fn(info); err != nil {
return err
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}
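
With the rpc.ServerCall parameter gone, these helpers compose directly with
store.RunInTransaction. A minimal sketch of the read-modify-write pattern used
by app.go above (the method name markInitialized is illustrative):

    // markInitialized flips dbInfo.Initialized atomically. updateDbInfo
    // asserts that st is a store.Transaction, so the get/put pair cannot
    // interleave with other writers.
    func (a *app) markInitialized(ctx *context.T, dbName string) error {
        return store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
            return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
                info.Initialized = true
                return nil
            })
        })
    }
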
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 0971923..d18ab04 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -15,7 +15,6 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/syncbase/v23/syncbase/nosql/query_exec"
- prefixutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -103,7 +102,7 @@
Name: d.name,
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, d.st, d, data); err != nil {
+ if err := util.Put(ctx, d.st, d, data); err != nil {
return nil, err
}
return d, nil
@@ -404,20 +403,20 @@
req *tableReq
}
-func (t *tableDb) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
return &kvs{
- t: t,
- prefixes: prefixes,
- curr: -1,
- validRow: false,
- it: nil,
- err: nil,
+ t: t,
+ keyRanges: keyRanges,
+ curr: -1,
+ validRow: false,
+ it: nil,
+ err: nil,
}, nil
}
type kvs struct {
t *tableDb
- prefixes []string
+ keyRanges query_db.KeyRanges
curr int // current index into prefixes, -1 at start
validRow bool
currKey string
@@ -433,16 +432,22 @@
if s.curr == -1 {
s.curr++
}
- for s.curr < len(s.prefixes) {
+ for s.curr < len(s.keyRanges) {
if s.it == nil {
- start := prefixutil.PrefixRangeStart(s.prefixes[s.curr])
- limit := prefixutil.PrefixRangeLimit(s.prefixes[s.curr])
- s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), string(start), string(limit)))
+ start := s.keyRanges[s.curr].Start
+ limit := s.keyRanges[s.curr].Limit
+ // A start/limit pair of 0x00/0xff is the query layer's "all rows" sentinel.
+ if start == string([]byte{0}) && limit == string([]byte{255}) {
+ start = ""
+ limit = ""
+ }
+ s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), start, limit))
}
if s.it.Advance() {
// key
keyBytes := s.it.Key(nil)
parts := util.SplitKeyParts(string(keyBytes))
+ // TODO(rogulenko): Check access for the key.
s.currKey = parts[len(parts)-1]
// value
valueBytes := s.it.Value(nil)
@@ -489,8 +494,8 @@
s.it.Cancel()
s.it = nil
}
- // set curr to end of prefixes so Advance will return false
- s.curr = len(s.prefixes)
+ // set curr to end of keyRanges so Advance will return false
+ s.curr = len(s.keyRanges)
}
////////////////////////////////////////
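
The special case in kvs.Advance above amounts to a small normalization step;
as a standalone sketch (the function name is illustrative):

    // normalizeRange maps the query layer's "all rows" sentinel, the range
    // [0x00, 0xff), to the empty start/limit pair that the store's Scan
    // treats as unbounded.
    func normalizeRange(start, limit string) (string, string) {
        if start == string([]byte{0}) && limit == string([]byte{255}) {
            return "", ""
        }
        return start, limit
    }
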
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
}
// checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
// Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return util.Get(ctx, call, st, r.t, &tableData{})
+ return r.t.checkAccess(ctx, call, st, r.key)
}
// get reads data from the storage engine.
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
package nosql
import (
+ "strings"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/v23/vom"
)
// tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
return err
}
// Check for "table already exists".
- if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -54,7 +57,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, call, st, t, data)
+ return util.Put(ctx, st, t, data)
})
}
@@ -71,20 +74,30 @@
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, call, st, t)
+ return util.Delete(ctx, st, t)
})
}
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
impl := func(st store.StoreReadWriter) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
key := []byte{}
for it.Advance() {
key = it.Key(key)
+ // Check perms.
+ parts := util.SplitKeyParts(string(key))
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ // TODO(rogulenko): Revisit this behavior. Probably we should
+ // delete all rows that we have access to.
+ it.Cancel()
+ return err
+ }
+ // Delete the key-value pair.
if err := st.Delete(key); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -107,8 +120,8 @@
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
impl := func(st store.StoreReader) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
key, value := []byte{}, []byte{}
for it.Advance() {
key, value = it.Key(key), it.Value(value)
+ // Check perms.
parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ it.Cancel()
+ return err
+ }
+ sender.Send(wire.KeyValue{Key: externalKey, Value: value})
}
if err := it.Err(); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
return impl(st)
}
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
- if prefix != "" {
- return verror.NewErrNotImplemented(ctx)
- }
- impl := func(st store.StoreReadWriter) error {
- data := &tableData{}
- return util.Update(ctx, call, st, t, data, func() error {
- data.Perms = perms
- return nil
- })
- }
- if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
- return err
- } else {
- return impl(st)
- }
- } else {
- return store.RunInTransaction(t.d.st, impl)
- }
-}
-
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
- if key != "" {
- return nil, verror.NewErrNotImplemented(ctx)
- }
impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
- data := &tableData{}
- if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+ // Check permissions only at table level.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return nil, err
}
- return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+ // Get the most specific permissions object.
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return nil, err
+ }
+ result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+ // Collect all parent permissions objects all the way up to the table level.
+ for prefix != "" {
+ prefix = prefixPerms.Parent
+ if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+ return nil, err
+ }
+ result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+ }
+ return result, nil
}
var st store.StoreReader
if t.d.batchId != nil {
@@ -179,17 +187,114 @@
return impl(st)
}
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ if prefix == "" {
+ data := &tableData{}
+ return util.Update(ctx, call, st, t, data, func() error {
+ data.Perms = perms
+ return nil
+ })
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ // If there is no permissions object for the given prefix, we need to
+ // add a new node to the prefix permissions tree. We do so by
+ // re-parenting all children of the prefix to the node corresponding
+ // to the prefix.
+ if parent != prefix {
+ if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+ return err
+ }
+ } else {
+ parent = prefixPerms.Parent
+ }
+ stPrefix := t.prefixPermsKey(prefix)
+ stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+ prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+ // Put the (prefix, perms) pair to the database.
+ if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+ return err
+ }
+ return util.PutObject(st, stPrefixLimit, prefixPerms)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
+}
+
func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
- return verror.NewErrNotImplemented(ctx)
+ if prefix == "" {
+ return verror.New(verror.ErrBadArg, ctx, prefix)
+ }
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ if parent != prefix {
+ // This can happen only if there is no permissions object for the
+ // given prefix. Since DeletePermissions is idempotent, return nil.
+ return nil
+ }
+ // We need to delete the node corresponding to the prefix from the
+ // prefix permissions tree. We do so by re-parenting all children of
+ // the prefix to the parent of the deleted node.
+ if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+ return err
+ }
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ if err := st.Delete(stPrefix); err != nil {
+ return err
+ }
+ return st.Delete(stPrefixLimit)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
// Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
closeStoreReader()
return nil, err
}
+ // TODO(rogulenko): Check prefix permissions for children.
return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
}
var st store.StoreReader
@@ -226,3 +331,145 @@
func (t *tableReq) stKeyPart() string {
return t.name
}
+
+// updateParentRefs sets newParent as the parent of all children of the given
+// prefix.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixStart := append(stPrefix, 0)
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ it := st.Scan(stPrefixStart, stPrefixLimit)
+ var key, value []byte
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ it.Cancel()
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ prefixPerms.Parent = newParent
+ if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+ it.Cancel()
+ return err
+ }
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// lock invalidates all concurrent transactions that have accessed this
+// table, causing them to fail with ErrConcurrentTransaction when they commit.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return err
+ }
+ return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return err
+ }
+ if prefix != "" {
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ return err
+ }
+ }
+ auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
+ if err := auth.Authorize(ctx, call.Security()); err != nil {
+ return verror.New(verror.ErrNoAccess, ctx, prefix)
+ }
+ return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has an
+// associated permissions object, along with that permissions object.
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Conceptually, we represent all prefixes as a forest T, where each vertex
+// maps to a prefix. The parent of a string is the maximum proper prefix of it
+// that belongs to T. Each prefix P from T is represented as a pair of entries
+// with keys P and P~, with values of type stPrefixPerms (parent + perms).
+// At a high level, this function works as follows:
+// 1 iter = db.Scan(K, "")
+//   The last character of iter.Key() is stripped automatically if it is '~'.
+// 2 if hasPrefix(K, iter.Key()) return iter.Value()
+// 3 return parent(iter.Key())
+// Sketch of a proof:
+// The iter returned on line 1 points to one of the following:
+// - a string t equal to K; in this case line 2 returns the correct result;
+// - a string t~: if t were not a prefix of K, then K < t < t~, which
+//   contradicts the ordering of the iterator returned on line 1, so t is a
+//   prefix of K; moreover t is the longest prefix of K, since all longer
+//   prefixes of K are less than t~; so line 2 returns the correct result;
+// - a string t that doesn't end with '~': t can't be a prefix of K, since all
+//   proper prefixes of K are less than K; parent(t) is a prefix of K, since
+//   otherwise K < parent(t) < t; and parent(t) is the longest prefix of K,
+//   since otherwise t itself would be a prefix of K; so line 3 returns the
+//   correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+ it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+ if !it.Advance() {
+ prefixPerms, err := t.permsForPrefix(ctx, st, "")
+ return "", prefixPerms, err
+ }
+ defer it.Cancel()
+ parts := util.SplitKeyParts(string(it.Key(nil)))
+ prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+ value := it.Value(nil)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ if strings.HasPrefix(key, prefix) {
+ return prefix, prefixPerms, nil
+ }
+ prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+ return prefixPerms.Parent, prefixPerms, err
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+ if prefix == "" {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return stPrefixPerms{}, err
+ }
+ return stPrefixPerms{Perms: data.Perms}, nil
+ }
+ var prefixPerms stPrefixPerms
+ if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+ return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
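
To make the forest structure concrete, here is a small, store-free model of
the longest-prefix lookup that permsForKey performs with a single Scan (all
names are illustrative; the real code reaches the same answer in one iterator
step by exploiting the P/P~ double entries):

    package main

    import (
        "fmt"
        "strings"
    )

    // node models stPrefixPerms: Parent is the longest proper prefix that
    // itself has an entry; "" denotes the table level.
    type node struct {
        Parent string
        Perms  string // stands in for access.Permissions
    }

    // longestPrefix returns the longest prefix of key present in t.
    func longestPrefix(t map[string]node, key string) (string, node) {
        best := ""
        for p := range t {
            if strings.HasPrefix(key, p) && len(p) > len(best) {
                best = p
            }
        }
        return best, t[best]
    }

    func main() {
        t := map[string]node{
            "":   {Parent: "", Perms: "table-level"},
            "a":  {Parent: "", Perms: "A"},
            "ab": {Parent: "a", Perms: "AB"},
        }
        fmt.Println(longestPrefix(t, "abc")) // "ab" governs "abc"
        fmt.Println(longestPrefix(t, "b"))   // falls back to the table level
    }
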
diff --git a/services/syncbase/server/nosql/types.vdl b/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/services/syncbase/server/nosql/types.vdl
+++ b/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
Name string
Perms access.Permissions
}
+
+// stPrefixPerms describes the internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" is a reserved char that sorts lexicographically after all chars
+// allowed by clients, %table is the name of the table, and parent is the
+// longest proper prefix of the key that has an associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
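
Combining this layout with the constants added below ("$perms", ":" and "~"),
the two store keys for a prefix can be computed as in this illustrative
helper:

    // prefixPermsKeys mirrors prefixPermsKey in table.go plus its "~" twin;
    // both entries carry the same stPrefixPerms value, and the twin is what
    // lets a single Scan starting at any key land on its governing prefix.
    func prefixPermsKeys(table, prefix string) (string, string) {
        k := strings.Join([]string{"$perms", table, prefix}, ":") // util.JoinKeyParts
        return k, k + "~"                                         // util.PrefixRangeLimitSuffix
    }

For table "tbl" and prefix "foo" this yields "$perms:tbl:foo" and
"$perms:tbl:foo~", matching the comment above.
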
diff --git a/services/syncbase/server/nosql/types.vdl.go b/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/services/syncbase/server/nosql/types.vdl.go
+++ b/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
}) {
}
+// stPrefixPerms describes the internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" is a reserved char that sorts lexicographically after all chars
+// allowed by clients, %table is the name of the table, and parent is the
+// longest proper prefix of the key that has an associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
func init() {
vdl.Register((*databaseData)(nil))
vdl.Register((*tableData)(nil))
+ vdl.Register((*stPrefixPerms)(nil))
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
data := &serviceData{
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, s.st, s, data); err != nil {
+ if err := util.Put(ctx, s.st, s, data); err != nil {
return nil, err
}
if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
return err
}
// Check for "app already exists".
- if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -192,7 +192,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, call, st, a, data)
+ return util.Put(ctx, st, a, data)
}); err != nil {
return err
}
@@ -218,7 +218,7 @@
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, call, st, a)
+ return util.Delete(ctx, st, a)
}); err != nil {
return err
}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
DatabasePrefix = "$database"
DbInfoPrefix = "$dbInfo"
LogPrefix = "$log"
+ PermsPrefix = "$perms"
RowPrefix = "$row"
ServicePrefix = "$service"
SyncPrefix = "$sync"
@@ -27,4 +28,8 @@
BatchSep = ":"
// Separator for parts of storage engine keys.
KeyPartSep = ":"
+ // PrefixRangeLimitSuffix is the suffix of a key that indicates the end of
+ // a prefix range. It must sort lexicographically after any regular key in
+ // the store.
+ // TODO(rogulenko): Change this constant to something out of the UTF8 space.
+ PrefixRangeLimitSuffix = "~"
)
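
The choice of "~" relies only on byte ordering: any client key that extends a
prefix sorts strictly between the prefix and the prefix with "~" appended,
which is what makes [prefix+0x00, prefix+"~") a complete child range (as
scanned by updateParentRefs in table.go). A quick self-contained check:

    package main

    import "fmt"

    func main() {
        // Client chars are assumed to be below '~' (0x7e), so any extension
        // of "foo" sorts between "foo" and "foo~":
        fmt.Println("foo" < "foobar", "foobar" < "foo~") // true true
    }
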
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
// GetWithoutAuth does st.Get(l.StKey(), v), populating v.
// Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
if err := GetObject(st, l.StKey(), v); err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
// Get does GetWithoutAuth followed by an auth check.
// Returns a VDL-compatible error.
func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
- if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+ if err := GetWithoutAuth(ctx, st, l, v); err != nil {
return err
}
auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
// Put does st.Put(l.StKey(), v).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
if err := PutObject(st, l.StKey(), v); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -83,7 +83,7 @@
// Delete does st.Delete(l.StKey()).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
if err := st.Delete([]byte(l.StKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -103,7 +103,7 @@
if err := fn(); err != nil {
return err
}
- return Put(ctx, call, st, l, v)
+ return Put(ctx, st, l, v)
}
////////////////////////////////////////////////////////////
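
With rpc.ServerCall dropped from the unauthenticated helpers, a Layer can be
exercised from plain storage code. A hypothetical single-record layer
(counterLayer and bump are invented for illustration, assuming Layer requires
only the StKey/Name methods used above):

    // counterLayer stores one int64 under a fixed key.
    type counterLayer struct{}

    func (counterLayer) StKey() string { return "$counter" }
    func (counterLayer) Name() string  { return "counter" }

    // bump does an unauthenticated read-modify-write of the counter; a
    // missing record (ErrNoExist) is treated as zero. Callers needing an
    // auth check would go through Get/Update instead.
    func bump(ctx *context.T, st store.StoreReadWriter) error {
        var n int64
        err := GetWithoutAuth(ctx, st, counterLayer{}, &n)
        if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
            return err
        }
        return Put(ctx, st, counterLayer{}, n+1)
    }
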
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index df96244..a75b471 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -19,6 +19,7 @@
"v.io/syncbase/x/ref/services/syncbase/server"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -44,26 +45,16 @@
ctx, shutdown := v23.Init()
defer shutdown()
- s, err := v23.NewServer(ctx)
- if err != nil {
- vlog.Fatal("v23.NewServer() failed: ", err)
- }
- if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
- vlog.Fatal("s.Listen() failed: ", err)
- }
-
perms, err := securityflag.PermissionsFromFlag()
if err != nil {
vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
}
-
if perms != nil {
vlog.Info("Using permissions from command line flag.")
} else {
vlog.Info("No permissions flag provided. Giving local principal all permissions.")
perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
}
-
service, err := server.NewService(nil, nil, server.ServiceOptions{
Perms: perms,
RootDir: *rootDir,
@@ -74,9 +65,8 @@
}
d := server.NewDispatcher(service)
- // Publish the service in the mount table.
- if err := s.ServeDispatcher(*name, d); err != nil {
- vlog.Fatal("s.ServeDispatcher() failed: ", err)
+ if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
+ vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
}
vlog.Info("Mounted at: ", *name)