blob: 18aee4f2b9844ed36a37ea135e2a7d8dd2567ac3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package featuretests_test
import (
"flag"
"fmt"
"os"
"testing"
"time"
"v.io/v23/naming"
"v.io/v23/services/watch"
"v.io/v23/syncbase"
"v.io/v23/syncbase/nosql"
constants "v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/test/v23test"
)
const pingPongPairIterations = 500
var numSync *int = flag.Int("numSync", 2, "run test with 2 or more syncbases")
var numGroup *int = flag.Int("numGroup", 1, "run test with 1 or more syncgroups")
// BenchmarkPingPongPair measures the round trip sync latency between a pair of
// Syncbase instances that Ping Pong data to each other over a syncgroup.
//
// This benchmark performs the following operations:
// - Create two syncbase instances and have them join the same syncgroup.
// - Each watches the other syncbase's "section" of the table.
// - A preliminary write by each syncbase ensures that clocks are synced.
// - During Ping Pong, each syncbase instance writes data to its own section of
// the table. The other syncbase watches that section for writes. Once a write
// is received, it does the same.
//
// After the benchmark completes, the "ns/op" value refers to the average time
// for |pingPongPairIterations| of Ping Pong roundtrips completed.
func BenchmarkPingPongPair(b *testing.B) {
flag.Parse()
if *numSync < 2 {
b.Fatalf("numSync should be at least 2, received %d\n", *numSync)
}
if *numGroup < 1 {
b.Fatalf("numGroup should be at least 1, received %d\n", *numGroup)
}
sh := v23test.NewShell(b, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
b.ResetTimer()
b.StopTimer()
for iter := 0; iter < b.N; iter++ {
// Setup *numSync Syncbases.
sbs := setupSyncbases(b, sh, *numSync)
// Setup *numGroup Syncgroups
for g := 0; g < *numGroup; g++ {
// Syncbase s0 is the creator.
sgSuffix := fmt.Sprintf("SG%d", g+1)
sgName := naming.Join(sbs[0].sbName, constants.SyncbaseSuffix, sgSuffix)
// TODO(alexfandrianto): Was unable to use the empty prefix ("tb:").
// Observation: w0's watch isn't working with the empty prefix.
// Possible Explanation: The empty prefix ACL receives an initial value from
// the Table ACL. If this value is synced over from the opposing peer,
// conflict resolution can mean that s0 loses the ability to watch.
syncString := fmt.Sprintf("%s:p", testTable)
ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgName, syncString, "", sbBlessings(sbs), nil))
// The other syncbases will attempt to join the syncgroup.
for i := 1; i < *numSync; i++ {
ok(b, joinSyncgroup(sbs[i].clientCtx, sbs[i].sbName, sgName))
}
}
// Obtain the handles to the databases.
db0, _ := getDbAndTable(sbs[0].sbName)
db1, _ := getDbAndTable(sbs[1].sbName)
// Setup the remaining syncbases
for i := 2; i < *numSync; i++ {
getDbAndTable(sbs[i].sbName)
}
// Set up the watch streams (watching the other syncbase's prefix).
prefix0, prefix1 := "prefix0", "prefix1"
w0, err := db0.Watch(sbs[0].clientCtx, testTable, prefix1, watch.ResumeMarker("now"))
ok(b, err)
w1, err := db1.Watch(sbs[1].clientCtx, testTable, prefix0, watch.ResumeMarker("now"))
ok(b, err)
// The join has succeeded, so make sure sync is initialized.
// The strategy is: s0 sends to s1, and then s1 responds.
sendInt32Sync(b, sbs[0], prefix0, w1)
sendInt32Sync(b, sbs[1], prefix1, w0)
// Sync is now active, so it is time to really start our watch streams.
c0, c1 := make(chan int32), make(chan int32)
go watchInt32s(b, w0, c0)
go watchInt32s(b, w1, c1)
// Perform and time |pingPongIterations| of ping pong between syncbases.
b.StartTimer()
lastTime := time.Now()
var t0to1, t1to0 int64
for i := 0; i < pingPongPairIterations; i++ {
var delta int64
value := int32(i)
ok(b, writeInt32(sbs[0], prefix0, value))
<-c1
delta, lastTime = getDelta(lastTime)
t0to1 += delta
ok(b, writeInt32(sbs[1], prefix1, value))
<-c0
delta, lastTime = getDelta(lastTime)
t1to0 += delta
}
b.StopTimer()
// Clean up syncbases.
for i, _ := range sbs {
sbs[i].cleanup(os.Interrupt)
}
// Log intermediate information
b.Logf("Iteration %d of %d\n", iter, b.N)
b.Logf("Avg Time from 0 to 1: %d ns\n", t0to1/int64(pingPongPairIterations))
b.Logf("Avg Time from 1 to 0: %d ns\n", t1to0/int64(pingPongPairIterations))
b.Logf("Avg Time per iteration: %d ns\n", (t1to0+t0to1)/int64(pingPongPairIterations))
}
}
// getDelta computes the delta between the last time and now.
// It returns both that delta and the new time.
func getDelta(lastTime time.Time) (delta int64, nextTime time.Time) {
nextTime = time.Now()
delta = nextTime.Sub(lastTime).Nanoseconds()
return
}
// sendInt32Sync sends data from 1 syncbase to another.
// Be sure that the receiving watch stream sees the data and is still ok.
func sendInt32Sync(b *testing.B, ts *testSyncbase, senderPrefix string, w nosql.WatchStream) {
ok(b, writeInt32(ts, senderPrefix, -1))
if w.Advance() {
w.Change() // grab the change, but ignore the value.
}
watchStreamOk(b, w)
}
// watchInt32s sends the value of each put through to the channel.
func watchInt32s(b *testing.B, w nosql.WatchStream, c chan int32) {
var count int
for count < pingPongPairIterations && w.Advance() {
var t int32
change := w.Change()
if change.ChangeType == nosql.DeleteChange {
b.Error("Received a delete change")
}
err := change.Value(&t)
if err != nil {
b.Error(err)
}
c <- t
watchStreamOk(b, w)
count++
}
w.Cancel() // The stream can be canceled since we've seen enough iterations.
}
// getDbAndTable obtains the database and table handles for a syncbase name.
func getDbAndTable(syncbaseName string) (d nosql.Database, tb nosql.Table) {
a := syncbase.NewService(syncbaseName).App(testApp)
d = a.NoSQLDatabase(testDb, nil)
tb = d.Table(testTable)
return
}
// writeInt32 writes keyValue into syncbase at keyPrefix/keyValue.
func writeInt32(ts *testSyncbase, keyPrefix string, keyValue int32) error {
ctx := ts.clientCtx
syncbaseName := ts.sbName
_, tb := getDbAndTable(syncbaseName)
key := fmt.Sprintf("%s/%d", keyPrefix, keyValue)
if err := tb.Put(ctx, key, keyValue); err != nil {
return fmt.Errorf("tb.Put() failed: %v", err)
}
return nil
}
// watchStreamOk emits an error if the watch stream has an error.
func watchStreamOk(b *testing.B, w nosql.WatchStream) {
if w.Err() != nil {
b.Errorf("stream error: %v", w.Err())
}
}