syncbase: Simplify ping pong benchmark

Store int32 instead of time.Time and print out directional average times.
We no longer need DevModeGetTime.

It looks like the parameters peerSyncInterval and watchPollInterval still
dominate these times.

I have chosen 500 iterations of ping pong to establish our baseline.

With 50 and 100, we get about 200ms of latency per round trip.
With 50 and 10, we get about 150ms of latency per round trip.
With 10 and 10, we get about 32ms.
With 5 and 5, we get about 26ms.

(With 1 and 1, we can get as low as 9ms for small values of b.N, but as
b.N increases, the number rises rapidly.)

To make comparisons more fair, we may want to
* reuse the same prefix (when writing to the table)
* decrease the values in parameters.go

Change-Id: I42b16d759a48c9499d91f015cb51a52e175b0b38
diff --git a/syncbase/featuretests/ping_pong_test.go b/syncbase/featuretests/ping_pong_test.go
index 0473d27..278b2a7 100644
--- a/syncbase/featuretests/ping_pong_test.go
+++ b/syncbase/featuretests/ping_pong_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"os"
 	"testing"
 	"time"
 
@@ -17,6 +18,8 @@
 	"v.io/x/ref/test/v23test"
 )
 
+const pingPongPairIterations = 500
+
 // BenchmarkPingPongPair measures the round trip sync latency between a pair of
 // Syncbase instances that Ping Pong data to each other over a syncgroup.
 //
@@ -24,87 +27,113 @@
 // - Create two syncbase instances and have them join the same syncgroup.
 // - Each watches the other syncbase's "section" of the table.
 // - A preliminary write by each syncbase ensures that clocks are synced.
-// - During Ping Pong, each syncbase instance finds its local dev time and
-//   writes it to its section of the table. The other syncbase watches for
-//   this value; once received, it does the same.
+// - During Ping Pong, each syncbase instance writes data to its own section of
+//   the table. The other syncbase watches that section for writes. Once a write
+//   is received, it does the same.
 //
 // After the benchmark completes, the "ns/op" value refers to the average time
-// per Ping Pong roundtrip completed by the two syncbases.
-//
-// Note: This benchmark can write simpler values (like int32) or use time.Now()
-// instead of using a DevModeGetTime RPC. This can affect the benchmark stats.
+// for |pingPongPairIterations| of Ping Pong roundtrips completed.
 func BenchmarkPingPongPair(b *testing.B) {
 	sh := v23test.NewShell(b, v23test.Opts{})
 	defer sh.Cleanup()
 	sh.StartRootMountTable()
 
-	// Setup 2 Syncbases with the dev mode clock on.
-	sbs := setupSyncbases(b, sh, 2, "--dev")
-
-	// Syncbase s0 is the creator.
-	sgName := naming.Join(sbs[0].sbName, constants.SyncbaseSuffix, "SG1")
-
-	// TODO(alexfandrianto): Was unable to use the empty prefix ("tb:").
-	// Observation: w0's watch isn't working with the empty prefix.
-	// Possible Explanation: The empty prefix ACL receives an initial value from
-	// the Table ACL. If this value is synced over from the opposing peer,
-	// conflict resolution can mean that s0 loses the ability to watch.
-	syncString := fmt.Sprintf("%s:p", testTable)
-	ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgName, syncString, "", sbBlessings(sbs), nil))
-
-	// Syncbase s1 will attempt to join the syncgroup.
-	ok(b, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sgName))
-
-	// Obtain the handles to the databases.
-	db0, _ := getDbAndTable(sbs[0].sbName)
-	db1, _ := getDbAndTable(sbs[1].sbName)
-
-	// Set up the watch streams (watching the other syncbase's prefix).
-	prefix0, prefix1 := "prefix0", "prefix1"
-	w0, err := db0.Watch(sbs[0].clientCtx, testTable, prefix1, watch.ResumeMarker("now"))
-	ok(b, err)
-	w1, err := db1.Watch(sbs[1].clientCtx, testTable, prefix0, watch.ResumeMarker("now"))
-	ok(b, err)
-
-	// The join has succeeded, so it's time to ensure clocks are synchronized.
-	// The strategy is: s0 sends to s1, and then s1 responds.
-	sendTimeSync(b, sbs[0], prefix0, w1)
-	sendTimeSync(b, sbs[1], prefix1, w0)
-
-	// The clocks are synchronized, so it is now time to really start our watch
-	// streams. This will allow us to ping and pong.
-	c0, c1 := make(chan time.Time), make(chan time.Time)
-	go watchTimes(b, w0, c0)
-	go watchTimes(b, w1, c1)
 	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		suffix := fmt.Sprintf("%d", i)
-		ok(b, writeTime(sbs[0], prefix0, suffix))
-		<-c1
-		ok(b, writeTime(sbs[1], prefix1, suffix))
-		<-c0
-	}
+	b.StopTimer()
 
-	// TODO(alexfandrianto): Should cancel these watch streams. Unfortunately, we
-	// cannot cancel while a watch stream's Advance() blocks.
-	//w0.Cancel()
-	//w1.Cancel()
+	for iter := 0; iter < b.N; iter++ {
+		// Setup 2 Syncbases.
+		sbs := setupSyncbases(b, sh, 2)
+
+		// Syncbase s0 is the creator.
+		sgName := naming.Join(sbs[0].sbName, constants.SyncbaseSuffix, "SG1")
+
+		// TODO(alexfandrianto): Was unable to use the empty prefix ("tb:").
+		// Observation: w0's watch isn't working with the empty prefix.
+		// Possible Explanation: The empty prefix ACL receives an initial value from
+		// the Table ACL. If this value is synced over from the opposing peer,
+		// conflict resolution can mean that s0 loses the ability to watch.
+		syncString := fmt.Sprintf("%s:p", testTable)
+		ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgName, syncString, "", sbBlessings(sbs), nil))
+
+		// Syncbase s1 will attempt to join the syncgroup.
+		ok(b, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sgName))
+
+		// Obtain the handles to the databases.
+		db0, _ := getDbAndTable(sbs[0].sbName)
+		db1, _ := getDbAndTable(sbs[1].sbName)
+
+		// Set up the watch streams (watching the other syncbase's prefix).
+		prefix0, prefix1 := "prefix0", "prefix1"
+		w0, err := db0.Watch(sbs[0].clientCtx, testTable, prefix1, watch.ResumeMarker("now"))
+		ok(b, err)
+		w1, err := db1.Watch(sbs[1].clientCtx, testTable, prefix0, watch.ResumeMarker("now"))
+		ok(b, err)
+
+		// The join has succeeded, so make sure sync is initialized.
+		// The strategy is: s0 sends to s1, and then s1 responds.
+		sendInt32Sync(b, sbs[0], prefix0, w1)
+		sendInt32Sync(b, sbs[1], prefix1, w0)
+
+		// Sync is now active, so it is time to really start our watch streams.
+		c0, c1 := make(chan int32), make(chan int32)
+		go watchInt32s(b, w0, c0)
+		go watchInt32s(b, w1, c1)
+
+		// Perform and time |pingPongIterations| of ping pong between syncbases.
+		b.StartTimer()
+		lastTime := time.Now()
+		var t0to1, t1to0 int64
+		for i := 0; i < pingPongPairIterations; i++ {
+			var delta int64
+			value := int32(i)
+			ok(b, writeInt32(sbs[0], prefix0, value))
+			<-c1
+			delta, lastTime = getDelta(lastTime)
+			t0to1 += delta
+			ok(b, writeInt32(sbs[1], prefix1, value))
+			<-c0
+			delta, lastTime = getDelta(lastTime)
+			t1to0 += delta
+		}
+		b.StopTimer()
+
+		// Clean up syncbases.
+		for i, _ := range sbs {
+			sbs[i].cleanup(os.Interrupt)
+		}
+
+		// Print out intermediate information.
+		fmt.Printf("Iteration %d of %d\n", iter, b.N)
+		fmt.Printf("Avg Time from 0 to 1: %d ns\n", t0to1/int64(pingPongPairIterations))
+		fmt.Printf("Avg Time from 1 to 0: %d ns\n", t1to0/int64(pingPongPairIterations))
+		fmt.Printf("Avg Time per iteration: %d ns\n", (t1to0+t0to1)/int64(pingPongPairIterations))
+	}
 }
 
-// sendTimeSync sends data from 1 syncbase to another.
+// getDelta computes the delta between the last time and now.
+// It returns both that delta and the new time.
+func getDelta(lastTime time.Time) (delta int64, nextTime time.Time) {
+	nextTime = time.Now()
+	delta = nextTime.Sub(lastTime).Nanoseconds()
+	return
+}
+
+// sendInt32Sync sends data from 1 syncbase to another.
 // Be sure that the receiving watch stream sees the data and is still ok.
-func sendTimeSync(b *testing.B, ts *testSyncbase, senderPrefix string, w nosql.WatchStream) {
-	ok(b, writeTime(ts, senderPrefix, "synctime"))
+func sendInt32Sync(b *testing.B, ts *testSyncbase, senderPrefix string, w nosql.WatchStream) {
+	ok(b, writeInt32(ts, senderPrefix, -1))
 	if w.Advance() {
 		w.Change() // grab the change, but ignore the value.
 	}
 	watchStreamOk(b, w)
 }
 
-// watchTimes sends the value of each put through to the channel.
-func watchTimes(b *testing.B, w nosql.WatchStream, c chan time.Time) {
-	for w.Advance() {
-		var t time.Time
+// watchInt32s sends the value of each put through to the channel.
+func watchInt32s(b *testing.B, w nosql.WatchStream, c chan int32) {
+	var count int
+	for count < pingPongPairIterations && w.Advance() {
+		var t int32
 		change := w.Change()
 		if change.ChangeType == nosql.DeleteChange {
 			b.Error("Received a delete change")
@@ -115,7 +144,10 @@
 		}
 		c <- t
 		watchStreamOk(b, w)
+		count++
 	}
+
+	w.Cancel() // The stream can be canceled since we've seen enough iterations.
 }
 
 // getDbAndTable obtains the database and table handles for a syncbase name.
@@ -126,22 +158,16 @@
 	return
 }
 
-// writeTime writes a timestamp into syncbase at the combination of keyPrefix
-// and keySuffix.
-func writeTime(ts *testSyncbase, keyPrefix, keySuffix string) error {
+// writeInt32 writes keyValue into syncbase at keyPrefix/keyValue.
+func writeInt32(ts *testSyncbase, keyPrefix string, keyValue int32) error {
 	ctx := ts.clientCtx
 	syncbaseName := ts.sbName
 	_, tb := getDbAndTable(syncbaseName)
 
-	key := fmt.Sprintf("%s/%s", keyPrefix, keySuffix)
-	// sc is a helper to obtain the syncbase wire service.
-	time, err := sc(syncbaseName).DevModeGetTime(ctx)
-	if err != nil {
-		return fmt.Errorf("sb.DevModeGetTime() failed: %v", err)
-	}
+	key := fmt.Sprintf("%s/%d", keyPrefix, keyValue)
 
-	if err2 := tb.Put(ctx, key, time); err2 != nil {
-		return fmt.Errorf("tb.Put() failed: %v", err2)
+	if err := tb.Put(ctx, key, keyValue); err != nil {
+		return fmt.Errorf("tb.Put() failed: %v", err)
 	}
 	return nil
 }
diff --git a/syncbase/featuretests/test_util_test.go b/syncbase/featuretests/test_util_test.go
index cd2b251..4bd6b12 100644
--- a/syncbase/featuretests/test_util_test.go
+++ b/syncbase/featuretests/test_util_test.go
@@ -7,6 +7,7 @@
 import (
 	"errors"
 	"fmt"
+	"os"
 	"reflect"
 	"strings"
 	"testing"
@@ -50,6 +51,7 @@
 	sbCreds   *v23test.Credentials
 	clientId  string
 	clientCtx *context.T
+	cleanup   func(sig os.Signal)
 }
 
 // Spawns "num" Syncbase instances and returns handles to them.
@@ -65,7 +67,7 @@
 		}
 		// Give XRWA permissions to this Syncbase's client.
 		acl := fmt.Sprintf(`{"Resolve":{"In":["root:%s"]},"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId, clientId)
-		sh.StartSyncbase(sbs[i].sbCreds, sbs[i].sbName, "", acl, args...)
+		sbs[i].cleanup = sh.StartSyncbase(sbs[i].sbCreds, sbs[i].sbName, "", acl, args...)
 	}
 	// Call setupHierarchy on each Syncbase.
 	for _, sb := range sbs {