syncbase/vsync: 1st end-to-end integration test.
First end-to-end sync integration test. Work mostly done by hpucha@
being wrapped up in a CL. The test runs 2 Syncbases, 2 clients, and
one MountTable. The 1st client creates a SyncGroup and stores some
syncable objects. The 2nd client joins the SyncGroup and verifies
the receipt of the objects via sync.
The integration test exposed a few bugs across components (watcher,
initiator, responder) being fixed in this CL, and unit tests updated.
Change-Id: I6cd7d7feefd957e968b784795967e02ff66cb0d0
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index ff4b5cc..849f352 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,18 +6,24 @@
import (
"fmt"
+ "time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase"
+ "v.io/syncbase/v23/syncbase/nosql"
tu "v.io/syncbase/v23/syncbase/testutil"
constants "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23"
"v.io/v23/naming"
+ "v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
)
+// TODO(rdaoud): change the credentials of servers s0 and s1 to independent
+// principals derived from the same root.
+
//go:generate v23 test generate
func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
@@ -47,10 +53,13 @@
d := a.NoSQLDatabase("d")
d.Create(ctx, nil)
+ mtName := env.Vars[ref.EnvNamespacePrefix]
+
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
Perms: perms("root/s0", "root/s0/s1"),
- Prefixes: []string{"t1/foo"},
+ Prefixes: []string{"t1:foo"},
+ MountTables: []string{mtName},
}
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
sg := d.SyncGroup(sgName)
@@ -74,7 +83,133 @@
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
- return fmt.Errorf("Join SG %v failed: %v", sgName, err)
+ return fmt.Errorf("Join SG %q failed: %v", sgName, err)
}
return nil
}, "runJoinSyncGroup")
+
+// V23TestSyncbasedGetDeltas tests the exchange of deltas between two Syncbase
+// instances and their clients. The 1st client creates a SyncGroup and puts
+// some database entries in it. The 2nd client joins that SyncGroup and reads
+// the database entries. This verifies the end-to-end synchronization of data
+// along the path: client1--Syncbase1--Syncbase2--client2.
+func V23TestSyncbasedGetDeltas(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
+ defer cleanSync1()
+
+ tu.RunClient(t, client0Creds, runCreateAndPopulateSyncGroup)
+ tu.RunClient(t, client1Creds, runJoinSyncGroupAndFetchData)
+}
+
+var runCreateAndPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService("sync0").App("a")
+ a.Create(ctx, nil)
+ d := a.NoSQLDatabase("d")
+ d.Create(ctx, nil)
+ d.CreateTable(ctx, "tb", nil)
+
+ mtName := env.Vars[ref.EnvNamespacePrefix]
+
+ spec := wire.SyncGroupSpec{
+ Description: "test syncgroup sg",
+ Perms: perms("root/s0", "root/s0/s1"),
+ Prefixes: []string{"tb:foo"},
+ MountTables: []string{mtName},
+ }
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+ sg := d.SyncGroup(sgName)
+ info := wire.SyncGroupMemberInfo{8}
+ if err := sg.Create(ctx, spec, info); err != nil {
+ return fmt.Errorf("Create SG %q failed: %v", sgName, err)
+ }
+
+ // Do Puts.
+ tb := d.Table("tb")
+ for i := 0; i < 10; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ if err := r.Put(ctx, "testkey"+key); err != nil {
+ return fmt.Errorf("r.Put() failed: %v", err)
+ }
+ }
+
+ return nil
+}, "runCreateAndPopulateSyncGroup")
+
+var runJoinSyncGroupAndFetchData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService("sync1").App("a")
+ a.Create(ctx, nil)
+ d := a.NoSQLDatabase("d")
+ d.Create(ctx, nil)
+ d.CreateTable(ctx, "tb", nil)
+
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+ sg := d.SyncGroup(sgName)
+ info := wire.SyncGroupMemberInfo{10}
+ if _, err := sg.Join(ctx, info); err != nil {
+ return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
+ }
+
+ // Wait for a bit (up to 4 sec) until the last key appears.
+ tb := d.Table("tb")
+ r := tb.Row("foo9")
+ for i := 0; i < 8; i++ {
+ time.Sleep(500 * time.Millisecond)
+ var value string
+ if err := r.Get(ctx, &value); err == nil {
+ break
+ }
+ }
+
+ // Verify that all keys and values made it correctly.
+ for i := 0; i < 10; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ want := "testkey" + key
+ if got != want {
+ return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
+ }
+ }
+
+ // Re-verify using a scan operation.
+ stream := tb.Scan(ctx, nosql.Prefix("foo"))
+ for i := 0; stream.Advance(); i++ {
+ want := fmt.Sprintf("foo%d", i)
+ got := stream.Key()
+ if got != want {
+ return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
+ }
+ want = "testkey" + want
+ if err := stream.Value(&got); err != nil {
+ return fmt.Errorf("cannot fetch value in scan: %v\n", err)
+ }
+ if got != want {
+ return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
+ }
+ }
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("scan stream error: %v\n", err)
+ }
+
+ return nil
+}, "runJoinSyncGroupAndFetchData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index efbdd39..a14dbe0 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -28,3 +28,7 @@
func TestV23SyncbasedJoinSyncGroup(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedJoinSyncGroup)
}
+
+func TestV23SyncbasedGetDeltas(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
+}
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 1624301..1d447c5 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -10,6 +10,7 @@
"time"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
@@ -124,7 +125,13 @@
}
iSt.stream = stream
- req := interfaces.DeltaReq{SgIds: iSt.sgIds, InitVec: iSt.local}
+ req := interfaces.DeltaReq{
+ AppName: iSt.appName,
+ DbName: iSt.dbName,
+ SgIds: iSt.sgIds,
+ InitVec: iSt.local,
+ }
+
sender := iSt.stream.SendStream()
sender.Send(req)
@@ -264,7 +271,8 @@
return nil, false
}
for mt := range iSt.mtTables {
- c := interfaces.SyncClient(naming.Join(mt, iSt.peer))
+ absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
+ c := interfaces.SyncClient(absName)
stream, err := c.GetDeltas(ctx)
if err == nil {
return stream, true
@@ -382,7 +390,6 @@
return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
}
finish = true
- break
case interfaces.DeltaRespRespVec:
iSt.remote = v.Value
@@ -411,6 +418,11 @@
// Mark object dirty.
iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
}
+
+ // Break out of the stream.
+ if finish {
+ break
+ }
}
if !(start && finish) {
@@ -514,6 +526,7 @@
for {
iSt.tx = iSt.st.NewTransaction()
+ watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
if err := iSt.detectConflicts(ctx); err != nil {
return err
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index bfcb250..d3c55d4 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -256,7 +256,7 @@
testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
- s.syncState[gdb] = &dbSyncStateInMem{}
+ s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
// Create local genvec so that it contains knowledge only about common prefixes.
if err := iSt.createLocalGenVec(nil); err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 19b1366..d096aa8 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -10,15 +10,17 @@
"strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// GetDeltas implements the responder side of the GetDeltas RPC.
func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
recvr := call.RecvStream()
-
for recvr.Advance() {
req := recvr.Value()
// Ignoring errors since if one Database fails for any reason,
@@ -281,10 +283,11 @@
if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
// Send on the wire.
- wireRec := interfaces.LogRec{Metadata: rec.Metadata}
- // TODO(hpucha): Hash out this fake stream stuff when
- // defining the RPC and the rest of the responder.
- sender.Send(interfaces.DeltaRespRec{wireRec})
+ wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+ if err != nil {
+ return err
+ }
+ sender.Send(interfaces.DeltaRespRec{*wireRec})
}
// Add a new record from the same device if not done.
@@ -387,11 +390,19 @@
// Note: initPfxs is sorted.
func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
- filter := true
+ // The key starts with one of the store's reserved prefixes for managed
+ // namespaces (e.g. $row, $perms). Remove that prefix before comparing
+ // it with the SyncGroup prefixes which are defined by the application.
+ parts := util.SplitKeyParts(rec.Metadata.ObjId)
+ if len(parts) < 2 {
+ vlog.Fatalf("filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
+ }
+ key := util.JoinKeyParts(parts[1:]...)
+ filter := true
var maxGen uint64
for _, p := range initPfxs {
- if strings.HasPrefix(rec.Metadata.ObjId, p) {
+ if strings.HasPrefix(key, p) {
// Do not filter. Initiator is interested in this
// prefix.
filter = false
@@ -406,12 +417,31 @@
// Filter this record if the initiator already has it.
if maxGen >= rec.Metadata.Gen {
- return true
+ filter = true
}
return filter
}
+// makeWireLogRec creates a sync log record to send on the wire from a given
+// local sync record.
+func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+ // Get the object value at the required version. Note: GetAtVersion()
+ // requires a transaction to read the data, so create and abort one.
+ // TODO(hpucha): remove the fake Tx after the change in GetAtVersion().
+ tx := st.NewTransaction()
+ defer tx.Abort()
+
+ key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
+ value, err := watchable.GetAtVersion(ctx, tx, []byte(key), nil, []byte(version))
+ if err != nil {
+ return nil, err
+ }
+
+ wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}
+ return wireRec, nil
+}
+
// A minHeap implements heap.Interface and holds local log records.
type minHeap []*localLogRec
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index 82c0d94..b452c07 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -12,6 +12,7 @@
"time"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -378,7 +379,7 @@
if opfx == "" {
continue
}
- okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+ okey := makeRowKey(fmt.Sprintf("%s~%x", opfx, tRng.Int()))
vers := fmt.Sprintf("%x", tRng.Int())
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
@@ -387,6 +388,10 @@
if err := putLogRec(nil, tx, rec); err != nil {
t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
}
+ value := fmt.Sprintf("value_%s", okey)
+ if err := watchable.PutAtVersion(nil, tx, []byte(okey), []byte(value), []byte(vers)); err != nil {
+ t.Fatalf("PutAtVersion(%d:%d) failed rec %v value %s: err %v", id, k, rec, value, err)
+ }
initPfxs := extractAndSortPrefixes(test.initVec)
if !filterLogRec(rec, test.initVec, initPfxs) {
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 49d9ff5..8db0bc6 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -155,10 +155,23 @@
return verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- ds.ckPtGen = ds.gen
+ // The frozen generation is the last generation number used, i.e. one
+ // below the next available one to use.
+ ds.ckPtGen = ds.gen - 1
return nil
}
+// initDbSyncStateInMem initializes the in memory sync state of the Database if needed.
+func (s *syncService) initDbSyncStateInMem(ctx *context.T, appName, dbName string) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ if s.syncState[name] == nil {
+ s.syncState[name] = &dbSyncStateInMem{gen: 1}
+ }
+}
+
// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
s.syncStateLock.Lock()
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 31eaff0..4045416 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -21,6 +21,7 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/naming"
@@ -420,9 +421,8 @@
return err
}
- // TODO(hpucha): Add watch notification to signal SG creation.
-
- return nil
+ tx1 := tx.(store.Transaction)
+ return watchable.AddSyncGroupOp(ctx, tx1, spec.Prefixes, false)
})
if err != nil {
@@ -516,9 +516,8 @@
return err
}
- // TODO(hpucha): Add a watch notification to signal new SG.
-
- return nil
+ tx1 := tx.(store.Transaction)
+ return watchable.AddSyncGroupOp(ctx, tx1, sg.Spec.Prefixes, false)
})
if err != nil {
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 804907c..4865d82 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -93,6 +93,9 @@
resMark = ""
}
+ // Initialize Database sync state if needed.
+ s.initDbSyncStateInMem(ctx, appName, dbName)
+
// Get a batch of watch log entries, if any, after this resume marker.
if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
@@ -327,22 +330,22 @@
timestamp := logEnt.CommitTimestamp
switch op := logEnt.Op.(type) {
- case *watchable.OpGet:
+ case watchable.OpGet:
// TODO(rdaoud): save read-set in sync.
- case *watchable.OpScan:
+ case watchable.OpScan:
// TODO(rdaoud): save scan-set in sync.
- case *watchable.OpPut:
+ case watchable.OpPut:
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
- case *watchable.OpSyncSnapshot:
+ case watchable.OpSyncSnapshot:
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
- case *watchable.OpDelete:
+ case watchable.OpDelete:
rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
- case *watchable.OpSyncGroup:
+ case watchable.OpSyncGroup:
vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
default:
@@ -377,7 +380,7 @@
// Otherwise it returns false with no other changes.
func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
switch op := logEnt.Op.(type) {
- case *watchable.OpSyncGroup:
+ case watchable.OpSyncGroup:
remove := op.Value.Remove
for _, prefix := range op.Value.Prefixes {
if remove {
@@ -399,11 +402,11 @@
func syncable(appdb string, logEnt *watchable.LogEntry) bool {
var key string
switch op := logEnt.Op.(type) {
- case *watchable.OpPut:
+ case watchable.OpPut:
key = string(op.Value.Key)
- case *watchable.OpDelete:
+ case watchable.OpDelete:
key = string(op.Value.Key)
- case *watchable.OpSyncSnapshot:
+ case watchable.OpSyncSnapshot:
key = string(op.Value.Key)
default:
return false
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index aa7a72f..bf4fb95 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -116,7 +116,7 @@
for _, test := range checkSyncableTests {
log := &watchable.LogEntry{
- Op: &watchable.OpPut{
+ Op: watchable.OpPut{
watchable.PutOp{Key: []byte(makeRowKey(test.key))},
},
}
@@ -133,9 +133,9 @@
k, v := []byte(key), []byte(version)
log := &watchable.LogEntry{}
if delete {
- log.Op = &watchable.OpDelete{watchable.DeleteOp{Key: k}}
+ log.Op = watchable.OpDelete{watchable.DeleteOp{Key: k}}
} else {
- log.Op = &watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
+ log.Op = watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
}
return log
}
@@ -143,7 +143,7 @@
// newSGLog creates a SyncGroup watch log entry.
func newSGLog(prefixes []string, remove bool) *watchable.LogEntry {
return &watchable.LogEntry{
- Op: &watchable.OpSyncGroup{
+ Op: watchable.OpSyncGroup{
watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove},
},
}