blob: fd6d721cf48201d6d53149eccc927897976b96c8 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package nosql_test
import (
"crypto/md5"
"crypto/rand"
"fmt"
"reflect"
"strconv"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase"
tu "v.io/syncbase/v23/syncbase/testutil"
constants "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23"
"v.io/v23/naming"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
)
//go:generate v23 test generate
func V23TestSyncbasedWholeBlobTransfer(t *v23tests.T) {
t.Skip("https://github.com/vanadium/issues/issues/667")
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
// FetchBlob first.
tu.RunClient(t, client0Creds, runGenerateBlob, "sync0", "foo", "0", "foobarbaz")
tu.RunClient(t, client1Creds, runFetchBlob, "sync1", "foo", "0", "9", "false")
tu.RunClient(t, client1Creds, runGetBlob, "sync1", "foo", "0", "foobarbaz", "0")
// GetBlob directly.
tu.RunClient(t, client1Creds, runGenerateBlob, "sync1", "foo", "0", "abcdefghijklmn")
tu.RunClient(t, client0Creds, runGetBlob, "sync0", "foo", "0", "fghijklmn", "5")
tu.RunClient(t, client0Creds, runFetchBlob, "sync0", "foo", "0", "14", "true")
// Test with a big blob (1 MB).
tu.RunClient(t, client0Creds, runGenerateBigBlob, "sync0", "foo", "1")
tu.RunClient(t, client1Creds, runGetBigBlob, "sync1", "foo", "1")
}
////////////////////////////////////
// Helpers.
type testStruct struct {
Val string
Blob wire.BlobRef
}
// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
// actual key), 3: blob data.
var runGenerateBlob = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
b, err := d.CreateBlob(ctx)
if err != nil {
return fmt.Errorf("CreateBlob failed, err %v\n", err)
}
bw, err := b.Put(ctx)
if err != nil {
return fmt.Errorf("PutBlob RPC failed, err %v\n", err)
}
data := args[3]
if err := bw.Send([]byte(data)); err != nil {
return fmt.Errorf("Sending blob data failed, err %v\n", err)
}
if err := bw.Close(); err != nil {
return fmt.Errorf("Closing blob writer failed, err %v\n", err)
}
// Commit the blob.
if err := b.Commit(ctx); err != nil {
return fmt.Errorf("Committing a blob failed, err %v\n", err)
}
// Put the BlobRef in a key.
tb := d.Table("tb")
pos, _ := strconv.ParseUint(args[2], 10, 64)
key := fmt.Sprintf("%s%d", args[1], pos)
r := tb.Row(key)
s := testStruct{Val: "testkey" + key, Blob: b.Ref()}
if err := r.Put(ctx, s); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
return nil
}, "runGenerateBlob")
// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
// actual key), 3: blob size, 4: skip incremental status checking.
var runFetchBlob = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
pos, _ := strconv.ParseUint(args[2], 10, 64)
key := fmt.Sprintf("%s%d", args[1], pos)
r := tb.Row(key)
var s testStruct
// Try for 10 seconds to get the new value.
var err error
for i := 0; i < 10; i++ {
// Note: the error is a decode error since the old value is a
// string, and the new value is testStruct.
if err = r.Get(ctx, &s); err == nil {
break
}
time.Sleep(1 * time.Second)
}
if err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
b := d.Blob(s.Blob)
bs, err := b.Fetch(ctx, 100)
if err != nil {
return fmt.Errorf("Fetch RPC failed, err %v\n", err)
}
status := []wire.BlobFetchStatus{
wire.BlobFetchStatus{State: wire.BlobFetchStatePending},
wire.BlobFetchStatus{State: wire.BlobFetchStateLocating},
wire.BlobFetchStatus{State: wire.BlobFetchStateFetching},
wire.BlobFetchStatus{State: wire.BlobFetchStateDone}}
skipIncStatus, _ := strconv.ParseBool(args[4])
var gotStatus wire.BlobFetchStatus
i := 0
for bs.Advance() {
gotStatus = bs.Value()
if !skipIncStatus {
if i <= 1 {
if !reflect.DeepEqual(gotStatus, status[i]) {
return fmt.Errorf("Fetch blob failed, got status %v want status %v\n", gotStatus, status[i])
}
i++
} else if !(gotStatus.State == status[2].State || reflect.DeepEqual(gotStatus, status[3])) {
return fmt.Errorf("Fetch blob failed, got status %v\n", gotStatus)
}
}
}
if !reflect.DeepEqual(gotStatus, status[3]) {
return fmt.Errorf("Fetch blob failed, got status %v want status %v\n", gotStatus, status[3])
}
if bs.Err() != nil {
return fmt.Errorf("Fetch blob failed, err %v\n", err)
}
wantSize, _ := strconv.ParseInt(args[3], 10, 64)
gotSize, err := b.Size(ctx)
if err != nil || gotSize != wantSize {
return fmt.Errorf("Blob size incorrect, got %v want %v, err %v\n", gotSize, wantSize, err)
}
return nil
}, "runFetchBlob")
// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
// actual key), 3: expected blob data, 4: offset for get.
var runGetBlob = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
pos, _ := strconv.ParseUint(args[2], 10, 64)
key := fmt.Sprintf("%s%d", args[1], pos)
r := tb.Row(key)
var s testStruct
// Try for 10 seconds to get the new value.
var err error
for i := 0; i < 10; i++ {
// Note: the error is a decode error since the old value is a
// string, and the new value is testStruct.
if err = r.Get(ctx, &s); err == nil {
break
}
time.Sleep(1 * time.Second)
}
if err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
b := d.Blob(s.Blob)
offset, _ := strconv.ParseInt(args[4], 10, 64)
br, err := b.Get(ctx, offset)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v\n", err)
}
var gotVal []byte
for br.Advance() {
gotVal = append(gotVal, br.Value()...)
}
if br.Err() != nil {
return fmt.Errorf("Getting a blob failed, err %v\n", br.Err())
}
if !reflect.DeepEqual(gotVal, []byte(args[3])) {
return fmt.Errorf("Getting a blob failed, got %v want %v\n", gotVal, []byte(args[3]))
}
return nil
}, "runGetBlob")
// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
// actual key).
var runGenerateBigBlob = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
b, err := d.CreateBlob(ctx)
if err != nil {
return fmt.Errorf("CreateBlob failed, err %v\n", err)
}
bw, err := b.Put(ctx)
if err != nil {
return fmt.Errorf("PutBlob RPC failed, err %v\n", err)
}
hasher := md5.New()
chunkSize := 8192
content := make([]byte, chunkSize)
// Send 1 MB blob.
for i := 0; i < 128; i++ {
if n, err := rand.Read(content); err != nil || n != chunkSize {
return fmt.Errorf("Creating blob data failed, n %v err %v\n", n, err)
}
if err := bw.Send(content); err != nil {
return fmt.Errorf("Sending blob data failed, err %v\n", err)
}
hasher.Write(content)
}
if err := bw.Close(); err != nil {
return fmt.Errorf("Closing blob writer failed, err %v\n", err)
}
// Commit the blob.
if err := b.Commit(ctx); err != nil {
return fmt.Errorf("Committing a blob failed, err %v\n", err)
}
// Put the BlobRef in a key.
tb := d.Table("tb")
pos, _ := strconv.ParseUint(args[2], 10, 64)
key := fmt.Sprintf("%s%d", args[1], pos)
r := tb.Row(key)
// Blob hash is transferred via structured store.
s := testStruct{Val: hashToString(hasher.Sum(nil)), Blob: b.Ref()}
if err := r.Put(ctx, s); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
return nil
}, "runGenerateBigBlob")
// Arguments: 0: syncbase name, 1: key prefix, 2: key position (1+2 is the
// actual key).
var runGetBigBlob = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
pos, _ := strconv.ParseUint(args[2], 10, 64)
key := fmt.Sprintf("%s%d", args[1], pos)
r := tb.Row(key)
var s testStruct
// Try for 10 seconds to get the new value.
var err error
for i := 0; i < 10; i++ {
// Note: the error is a decode error since the old value is a
// string, and the new value is testStruct.
if err = r.Get(ctx, &s); err == nil {
break
}
time.Sleep(1 * time.Second)
}
if err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
b := d.Blob(s.Blob)
br, err := b.Get(ctx, 0)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v\n", err)
}
hasher := md5.New()
for br.Advance() {
content := br.Value()
hasher.Write(content)
}
if br.Err() != nil {
return fmt.Errorf("Getting a blob failed, err %v\n", br.Err())
}
gotHash := hashToString(hasher.Sum(nil))
if !reflect.DeepEqual(gotHash, s.Val) {
return fmt.Errorf("Getting a blob failed, got %v want %v\n", gotHash, s.Val)
}
return nil
}, "runGetBigBlob")
// Copied from localblobstore/fs_cablobstore/fs_cablobstore.go.
//
// hashToString() returns a string representation of the hash.
// Requires len(hash)==16. An md5 hash is suitable.
func hashToString(hash []byte) string {
return fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
hash[0], hash[1], hash[2], hash[3],
hash[4], hash[5], hash[6], hash[7],
hash[8], hash[9], hash[10], hash[11],
hash[12], hash[13], hash[14], hash[15])
}