// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package featuretests_test

import (
"crypto/md5"
"crypto/rand"
"fmt"
"reflect"
"testing"
"time"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/x/ref/test/v23test"
)

func TestV23BlobWholeTransfer(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
sbs := setupSyncbases(t, sh, 2, false, false)
sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "", wire.SyncgroupMemberInfo{SyncPriority: 8}))
ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
wire.SyncgroupMemberInfo{SyncPriority: 10}))
ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
// FetchBlob first.
ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, false))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
// GetBlob directly.
ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("abcdefghijklmn")))
// Sleep so that the update to key "foo0" makes it to the other side.
time.Sleep(10 * time.Second)
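// Reading at offset 5 should return the tail of the updated blob; the Get call
// also brings the blob to sbs[0], so the fetch below expects it to be present.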
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("fghijklmn"), 5))
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 14, true))
// Test with a big blob (1 MB).
ok(t, generateBigBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1))
ok(t, getBigBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1))
}

// TestV23ServerBlobFetch tests the mechanism for fetching blobs automatically on
// servers. It sets up two syncbases:
// - 0 is a normal syncgroup member, and creates a syncgroup.
// - 1 is a server, and joins the syncgroup.
// Both create blobs, then the test waits until the automatic fetch mechanism
// has had a chance to run. Then, it checks that each syncbase has the blob it
// created itself, that the server has the blob created by the non-server, and
// that the non-server does not (initially) have the blob created by the server
// (this latter check fetches the blob to the non-creator as a side effect). Then
// the values of the blobs are checked at both syncbases.
func TestV23ServerBlobFetch(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
sbs := setupSyncbases(t, sh, 2, true, false)
sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
// sbs[0] is not a server.
ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
wire.SyncgroupMemberInfo{SyncPriority: 8}))
// sbs[1] is a server, so will fetch blobs automatically; it joins the syncgroup.
ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
wire.SyncgroupMemberInfo{SyncPriority: 10, BlobDevType: byte(wire.BlobDevTypeServer)}))
ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
// Generate a blob on each member.
ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
// Before the server fetch, each node should have just its own blob.
checkShares(t, "before server fetch", sbs, []int{
2, 0,
0, 2,
})
// Sleep until the automatic fetch mechanism has had a chance to run (its
// initial timeout is 10s to keep this test practical).
time.Sleep(15 * time.Second)
// Now the server should have all blob shares.
checkShares(t, "after server fetch", sbs, []int{
0, 0,
2, 2,
})
// The syncbases should have the blobs they themselves created.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
// sbs[1] should already have the blob created by sbs[0], since sbs[1] is a server.
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, true /*already present*/))
// sbs[0] should not already have the blob created by sbs[1], since sbs[0] is not a server.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, false /*not already present*/))
// The share distribution should not have changed.
checkShares(t, "at end", sbs, []int{
0, 0,
2, 2,
})
// Wrap up by checking the blob values.
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz"), 0))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
}

// TestV23LeafBlobFetch tests the mechanism for leaf syncbases to give blobs to
// non-leaf syncbases with which they sync. It sets up two syncbases:
// - 0 is a non-leaf syncgroup member; it creates a syncgroup.
// - 1 is a leaf syncgroup member; it joins the syncgroup.
// Both create blobs. The test checks that the non-leaf syncbase automatically
// fetches the blob created on the leaf, but not vice versa.
func TestV23LeafBlobFetch(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
sbs := setupSyncbases(t, sh, 2, true, false)
sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
// sbs[0] is a non-leaf member that creates the syncgroup, and creates a blob.
ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
wire.SyncgroupMemberInfo{SyncPriority: 8}))
// sbs[1] is a leaf member; it joins the syncgroup, and creates a blob.
// It will tell the non-leaf member to fetch its blob.
ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
wire.SyncgroupMemberInfo{SyncPriority: 8, BlobDevType: byte(wire.BlobDevTypeLeaf)}))
ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
// Generate a blob on each member.
ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
time.Sleep(5 * time.Second)
// One ownership share of the leaf's blob should have been transferred to the non-leaf.
checkShares(t, "leaf initial", sbs, []int{
2, 1,
0, 1,
})
// The syncbases should have the blobs they themselves created.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
// sbs[0] should have the leaf's blob too.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, true /*already present*/))
// sbs[1] should not have the blob created by sbs[0].
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, false /*not already present*/))
// Wrap up by checking the blob values.
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz"), 0))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
}

////////////////////////////////////
// Helpers.

// TODO(sadovsky): Noticed while refactoring: there is a lot of duplicated code
// below, e.g. for generating blobs, getting testStruct values, etc. We should
// aim to avoid such duplication as it makes it more difficult to change APIs,
// fix bugs, etc.

type testStruct struct {
Val string
Blob wire.BlobRef
}
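
// generateBlob creates a blob containing data on the given syncbase, commits
// it, and writes a testStruct holding the blob's BlobRef under the key
// keyPrefix + pos.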
func generateBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, data []byte) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
c := d.CollectionForId(testCx)
b, err := d.CreateBlob(ctx)
if err != nil {
return fmt.Errorf("CreateBlob failed, err %v", err)
}
bw, err := b.Put(ctx)
if err != nil {
return fmt.Errorf("PutBlob RPC failed, err %v", err)
}
if err := bw.Send(data); err != nil {
return fmt.Errorf("Sending blob data failed, err %v", err)
}
if err := bw.Close(); err != nil {
return fmt.Errorf("Closing blob writer failed, err %v", err)
}
// Commit the blob.
if err := b.Commit(ctx); err != nil {
return fmt.Errorf("Committing a blob failed, err %v", err)
}
// Put the BlobRef in a key.
key := fmt.Sprintf("%s%d", keyPrefix, pos)
r := c.Row(key)
s := testStruct{Val: "testkey" + key, Blob: b.Ref()}
if err := r.Put(ctx, s); err != nil {
return fmt.Errorf("r.Put() failed: %v", err)
}
return nil
}

// getTestStructAndBlobFromRow waits until the row with the key keyPrefix + pos
// is accessible on the specified syncbase, then reads and returns the value
// from that row, which is expected to be a testStruct. A blob handle from the
// blobref is also returned.
// This is a common subroutine used in several of the routines below.
func getTestStructAndBlobFromRow(ctx *context.T, syncbaseName, keyPrefix string, pos int) (testStruct, syncbase.Blob, error) {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
c := d.CollectionForId(testCx)
key := fmt.Sprintf("%s%d", keyPrefix, pos)
r := c.Row(key)
// Try for 10 seconds to get the new value.
var s testStruct
var err error
for i := 0; i < 10; i++ {
// Note: the error is a decode error since the old value is a
// string, and the new value is a testStruct.
if err = r.Get(ctx, &s); err == nil {
break
}
time.Sleep(1 * time.Second)
}
if err != nil {
return testStruct{}, nil, fmt.Errorf("r.Get() failed: %v", err)
}
return s, d.Blob(s.Blob), nil
}

// fetchBlob ensures that the blob named by key (keyPrefix, pos) can be fetched
// by syncbaseName, and has size wantSize. If alreadyPresent is set, it checks
// that the blob is already present, and otherwise checks that the blob is fetched
// from some remote syncbase.
func fetchBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantSize int64, alreadyPresent bool) error {
var b syncbase.Blob
var err error
_, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err != nil {
return err
}
bs, err := b.Fetch(ctx, 100)
if err != nil {
return fmt.Errorf("Fetch RPC failed, err %v", err)
}
var status []wire.BlobFetchStatus
if alreadyPresent {
status = []wire.BlobFetchStatus{{State: wire.BlobFetchStateDone}}
} else {
status = []wire.BlobFetchStatus{
{State: wire.BlobFetchStatePending},
{State: wire.BlobFetchStateLocating},
{State: wire.BlobFetchStateFetching},
{State: wire.BlobFetchStateDone}}
}
var gotStatus wire.BlobFetchStatus
i := 0
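// When the blob is already present, a single Done status is expected.
// Otherwise the stream must begin with the Pending and Locating statuses,
// followed by any number of Fetching updates and a final Done.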
for bs.Advance() {
gotStatus = bs.Value()
if i <= 1 {
if !reflect.DeepEqual(gotStatus, status[i]) {
return fmt.Errorf("Fetch blob failed, got status %v want status %v", gotStatus, status[i])
}
i++
} else if !(gotStatus.State == status[2].State || reflect.DeepEqual(gotStatus, status[3])) {
return fmt.Errorf("Fetch blob failed, got status %v", gotStatus)
}
}
if !reflect.DeepEqual(gotStatus, status[len(status)-1]) {
return fmt.Errorf("Fetch blob failed, got status %v want status %v", gotStatus, status[3])
}
if bs.Err() != nil {
return fmt.Errorf("Fetch blob failed, err %v", err)
}
gotSize, err := b.Size(ctx)
if err != nil || gotSize != wantSize {
return fmt.Errorf("Blob size incorrect, got %v want %v, err %v", gotSize, wantSize, err)
}
return nil
}
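
// getBlob reads the blob referenced by the row keyPrefix + pos on the given
// syncbase, starting at offset, and checks that the bytes read equal wantVal.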
func getBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantVal []byte, offset int64) error {
var b syncbase.Blob
var err error
_, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err != nil {
return err
}
br, err := b.Get(ctx, offset)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v", err)
}
var gotVal []byte
for br.Advance() {
gotVal = append(gotVal, br.Value()...)
}
if br.Err() != nil {
return fmt.Errorf("Getting a blob failed, err %v", br.Err())
}
if !reflect.DeepEqual(gotVal, wantVal) {
return fmt.Errorf("Getting a blob failed, got %v want %v", gotVal, wantVal)
}
return nil
}
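
// generateBigBlob creates a 1 MB blob of random data on the given syncbase,
// commits it, and writes a testStruct under the key keyPrefix + pos whose Val
// field holds the blob's md5 hash so that a reader can verify the contents.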
func generateBigBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
c := d.CollectionForId(testCx)
b, err := d.CreateBlob(ctx)
if err != nil {
return fmt.Errorf("CreateBlob failed, err %v", err)
}
bw, err := b.Put(ctx)
if err != nil {
return fmt.Errorf("PutBlob RPC failed, err %v", err)
}
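// Hash the content as it is generated; the hash is stored in the row so that
// getBigBlob can verify what it reads.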
hasher := md5.New()
chunkSize := 8192
content := make([]byte, chunkSize)
// Send 1 MB blob.
for i := 0; i < 128; i++ {
if n, err := rand.Read(content); err != nil || n != chunkSize {
return fmt.Errorf("Creating blob data failed, n %v err %v", n, err)
}
if err := bw.Send(content); err != nil {
return fmt.Errorf("Sending blob data failed, err %v", err)
}
hasher.Write(content)
}
if err := bw.Close(); err != nil {
return fmt.Errorf("Closing blob writer failed, err %v", err)
}
// Commit the blob.
if err := b.Commit(ctx); err != nil {
return fmt.Errorf("Committing a blob failed, err %v", err)
}
// Put the BlobRef in a key.
key := fmt.Sprintf("%s%d", keyPrefix, pos)
r := c.Row(key)
// Blob hash is transferred via structured store.
s := testStruct{Val: hashToString(hasher.Sum(nil)), Blob: b.Ref()}
if err := r.Put(ctx, s); err != nil {
return fmt.Errorf("r.Put() failed: %v", err)
}
return nil
}
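
// getBigBlob reads the blob referenced by the row keyPrefix + pos on the given
// syncbase and verifies that its md5 hash matches the hash recorded in Val.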
func getBigBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int) error {
var s testStruct
var b syncbase.Blob
var err error
s, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err != nil {
return err
}
br, err := b.Get(ctx, 0)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v", err)
}
hasher := md5.New()
for br.Advance() {
content := br.Value()
hasher.Write(content)
}
if br.Err() != nil {
return fmt.Errorf("Getting a blob failed, err %v", br.Err())
}
gotHash := hashToString(hasher.Sum(nil))
if !reflect.DeepEqual(gotHash, s.Val) {
return fmt.Errorf("Getting a blob failed, got %v want %v", gotHash, s.Val)
}
return nil
}

// getBlobOwnershipShares returns the ownership shares (summed across all
// syncgroups) for the blob referenced by the row keyPrefix + pos.
func getBlobOwnershipShares(ctx *context.T, syncbaseName string, keyPrefix string, pos int) (shares int) {
var s testStruct
var err error
s, _, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err == nil {
var shareMap map[string]int32
shareMap, err = sc(syncbaseName).DevModeGetBlobShares(ctx, s.Blob)
if err == nil {
// Sum the shares across all syncgroups.
for _, shareCount := range shareMap {
shares += int(shareCount)
}
}
}
return shares
}

// checkShares checks that the shares of the blobs mentioned in rows fooN (for N in 0..len(sbs)-1)
// are at the expected values for the various syncbases. The elements of expectedShares[]
// are in the order sbs[0] for foo0, foo1, ..., followed by sbs[1] for foo0, foo1, ..., etc.
// An expected value of -1 means "don't care".
func checkShares(t *testing.T, msg string, sbs []*testSyncbase, expectedShares []int) {
for sbIndex := 0; sbIndex != len(sbs); sbIndex++ {
for blobIndex := 0; blobIndex != len(sbs); blobIndex++ {
expected := expectedShares[blobIndex+len(sbs)*sbIndex]
if expected != -1 {
shares := getBlobOwnershipShares(sbs[sbIndex].clientCtx, sbs[sbIndex].sbName, "foo", blobIndex)
if expected != shares {
t.Errorf("%s: sb %d blob %d got %d shares, expected %d\n",
msg, sbIndex, blobIndex, shares, expected)
}
}
}
}
}

// Copied from localblobstore/fs_cablobstore/fs_cablobstore.go.
//
// hashToString() returns a string representation of the hash.
// Requires len(hash)==16. An md5 hash is suitable.
func hashToString(hash []byte) string {
return fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
hash[0], hash[1], hash[2], hash[3],
hash[4], hash[5], hash[6], hash[7],
hash[8], hash[9], hash[10], hash[11],
hash[12], hash[13], hash[14], hash[15])
}