Merge "services/groups: Authorization fixes."
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index a3d7f7f..6dd6f25 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -14,15 +14,8 @@
type Sync interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> error {access.Read}
// SyncGroup-related methods.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a281543..3006f3c 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -28,15 +28,8 @@
type SyncClientMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -78,9 +71,9 @@
name string
}
-func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 DeltaReq, i1 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
var call rpc.ClientCall
- if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0, i1}, opts...); err != nil {
return
}
ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -143,30 +136,13 @@
// Err returns any error encountered by Advance. Never blocks.
Err() error
}
- // SendStream returns the send side of the Sync.GetDeltas client stream.
- SendStream() interface {
- // Send places the item onto the output stream. Returns errors
- // encountered while sending, or if Send is called after Close or
- // the stream has been canceled. Blocks if there is no buffer
- // space; will unblock when buffer space is available or after
- // the stream has been canceled.
- Send(item DeltaReq) error
- // Close indicates to the server that no more items will be sent;
- // server Recv calls will receive io.EOF after all sent items.
- // This is an optional call - e.g. a client might call Close if it
- // needs to continue receiving items from the server after it's
- // done sending. Returns errors encountered while closing, or if
- // Close is called after the stream has been canceled. Like Send,
- // blocks if there is no buffer space available.
- Close() error
- }
}
// SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
type SyncGetDeltasClientCall interface {
SyncGetDeltasClientStream
- // Finish performs the equivalent of SendStream().Close, then blocks until
- // the server is done, and returns the positional return values for the call.
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
//
// Finish returns immediately if the call has been canceled; depending on the
// timing the output could either be an error signaling cancelation, or the
@@ -209,23 +185,6 @@
}
return c.c.errRecv
}
-func (c *implSyncGetDeltasClientCall) SendStream() interface {
- Send(item DeltaReq) error
- Close() error
-} {
- return implSyncGetDeltasClientCallSend{c}
-}
-
-type implSyncGetDeltasClientCallSend struct {
- c *implSyncGetDeltasClientCall
-}
-
-func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
- return c.c.Send(item)
-}
-func (c implSyncGetDeltasClientCallSend) Close() error {
- return c.c.CloseSend()
-}
func (c *implSyncGetDeltasClientCall) Finish() (err error) {
err = c.ClientCall.Finish()
return
@@ -479,15 +438,8 @@
type SyncServerMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -521,15 +473,8 @@
type SyncServerStubMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -585,8 +530,8 @@
gs *rpc.GlobState
}
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
- return s.impl.GetDeltas(ctx, call, i0)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 DeltaReq, i1 string) error {
+ return s.impl.GetDeltas(ctx, call, i0, i1)
}
func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -632,8 +577,9 @@
Methods: []rpc.MethodDesc{
{
Name: "GetDeltas",
- Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+ Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector for one Database for either SyncGroup metadata or data.",
InArgs: []rpc.ArgDesc{
+ {"req", ``}, // DeltaReq
{"initiator", ``}, // string
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
@@ -691,18 +637,6 @@
// SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
type SyncGetDeltasServerStream interface {
- // RecvStream returns the receiver side of the Sync.GetDeltas server stream.
- RecvStream() interface {
- // Advance stages an item so that it may be retrieved via Value. Returns
- // true iff there is an item to retrieve. Advance must be called before
- // Value is called. May block if an item is not available.
- Advance() bool
- // Value returns the item that was staged by Advance. May panic if Advance
- // returned false or was not called. Never blocks.
- Value() DeltaReq
- // Err returns any error encountered by Advance. Never blocks.
- Err() error
- }
// SendStream returns the send side of the Sync.GetDeltas server stream.
SendStream() interface {
// Send places the item onto the output stream. Returns errors encountered
@@ -722,8 +656,6 @@
// a typesafe stub that implements SyncGetDeltasServerCall.
type SyncGetDeltasServerCallStub struct {
rpc.StreamServerCall
- valRecv DeltaReq
- errRecv error
}
// Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
@@ -731,34 +663,6 @@
s.StreamServerCall = call
}
-// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
- Advance() bool
- Value() DeltaReq
- Err() error
-} {
- return implSyncGetDeltasServerCallRecv{s}
-}
-
-type implSyncGetDeltasServerCallRecv struct {
- s *SyncGetDeltasServerCallStub
-}
-
-func (s implSyncGetDeltasServerCallRecv) Advance() bool {
- s.s.valRecv = DeltaReq{}
- s.s.errRecv = s.s.Recv(&s.s.valRecv)
- return s.s.errRecv == nil
-}
-func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
- return s.s.valRecv
-}
-func (s implSyncGetDeltasServerCallRecv) Err() error {
- if s.s.errRecv == io.EOF {
- return nil
- }
- return s.s.errRecv
-}
-
// SendStream returns the send side of the Sync.GetDeltas server stream.
func (s *SyncGetDeltasServerCallStub) SendStream() interface {
Send(item DeltaResp) error
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
new file mode 100644
index 0000000..39732b1
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
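+// DeepCopy returns a deep copy of the GenVector, copying each contained PrefixGenVector.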
+func (in GenVector) DeepCopy() GenVector {
+ out := make(GenVector)
+ for p, inpgv := range in {
+ out[p] = inpgv.DeepCopy()
+ }
+ return out
+}
+
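+// DeepCopy returns a copy of the PrefixGenVector.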
+func (in PrefixGenVector) DeepCopy() PrefixGenVector {
+ out := make(PrefixGenVector)
+ for id, gen := range in {
+ out[id] = gen
+ }
+ return out
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 692a731..65a7f32 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -29,6 +29,7 @@
// PrefixGenVector is the generation vector for a data prefix, which maps each
// device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
type PrefixGenVector map[uint64]uint64
// GenVector is the generation vector for a Database, and maps prefixes to their
@@ -70,6 +71,7 @@
}
// GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since the syncgroup id is now an object id.
type GroupId uint64
// Possible states for a SyncGroup.
@@ -99,21 +101,35 @@
Joiners map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
}
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+// Database.
+type DeltaReq union {
+ Sgs SgDeltaReq
+ Data DataDeltaReq
+}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
// requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
AppName string
DbName string
SgIds set[GroupId]
InitVec GenVector
}
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+ AppName string
+ DbName string
+ InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
// DeltaResp contains the responder's genvector or the missing log records
// returned in response to an initiator's request for deltas for a Database.
type DeltaResp union {
- Start bool
- Finish bool
Rec LogRec
RespVec GenVector
}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 1fb1396..ca29abc 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -20,6 +20,7 @@
// PrefixGenVector is the generation vector for a data prefix, which maps each
// device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
type PrefixGenVector map[uint64]uint64
func (PrefixGenVector) __VDLReflect(struct {
@@ -78,6 +79,7 @@
}
// GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since the syncgroup id is now an object id.
type GroupId uint64
func (GroupId) __VDLReflect(struct {
@@ -157,18 +159,72 @@
}) {
}
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+type (
+ // DeltaReq represents any single field of the DeltaReq union type.
+ //
+ // DeltaReq contains a request to sync either data or SyncGroup metadata for a
+ // Database.
+ DeltaReq interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the DeltaReq union type.
+ __VDLReflect(__DeltaReqReflect)
+ }
+ // DeltaReqSgs represents field Sgs of the DeltaReq union type.
+ DeltaReqSgs struct{ Value SgDeltaReq }
+ // DeltaReqData represents field Data of the DeltaReq union type.
+ DeltaReqData struct{ Value DataDeltaReq }
+ // __DeltaReqReflect describes the DeltaReq union type.
+ __DeltaReqReflect struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+ Type DeltaReq
+ Union struct {
+ Sgs DeltaReqSgs
+ Data DeltaReqData
+ }
+ }
+)
+
+func (x DeltaReqSgs) Index() int { return 0 }
+func (x DeltaReqSgs) Interface() interface{} { return x.Value }
+func (x DeltaReqSgs) Name() string { return "Sgs" }
+func (x DeltaReqSgs) __VDLReflect(__DeltaReqReflect) {}
+
+func (x DeltaReqData) Index() int { return 1 }
+func (x DeltaReqData) Interface() interface{} { return x.Value }
+func (x DeltaReqData) Name() string { return "Data" }
+func (x DeltaReqData) __VDLReflect(__DeltaReqReflect) {}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
// requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
AppName string
DbName string
SgIds map[GroupId]struct{}
InitVec GenVector
}
-func (DeltaReq) __VDLReflect(struct {
- Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+func (DataDeltaReq) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DataDeltaReq"`
+}) {
+}
+
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+ AppName string
+ DbName string
+ InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
+func (SgDeltaReq) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.SgDeltaReq"`
}) {
}
@@ -187,10 +243,6 @@
// __VDLReflect describes the DeltaResp union type.
__VDLReflect(__DeltaRespReflect)
}
- // DeltaRespStart represents field Start of the DeltaResp union type.
- DeltaRespStart struct{ Value bool }
- // DeltaRespFinish represents field Finish of the DeltaResp union type.
- DeltaRespFinish struct{ Value bool }
// DeltaRespRec represents field Rec of the DeltaResp union type.
DeltaRespRec struct{ Value LogRec }
// DeltaRespRespVec represents field RespVec of the DeltaResp union type.
@@ -200,30 +252,18 @@
Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaResp"`
Type DeltaResp
Union struct {
- Start DeltaRespStart
- Finish DeltaRespFinish
Rec DeltaRespRec
RespVec DeltaRespRespVec
}
}
)
-func (x DeltaRespStart) Index() int { return 0 }
-func (x DeltaRespStart) Interface() interface{} { return x.Value }
-func (x DeltaRespStart) Name() string { return "Start" }
-func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespFinish) Index() int { return 1 }
-func (x DeltaRespFinish) Interface() interface{} { return x.Value }
-func (x DeltaRespFinish) Name() string { return "Finish" }
-func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespRec) Index() int { return 2 }
+func (x DeltaRespRec) Index() int { return 0 }
func (x DeltaRespRec) Interface() interface{} { return x.Value }
func (x DeltaRespRec) Name() string { return "Rec" }
func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
-func (x DeltaRespRespVec) Index() int { return 3 }
+func (x DeltaRespRespVec) Index() int { return 1 }
func (x DeltaRespRespVec) Interface() interface{} { return x.Value }
func (x DeltaRespRespVec) Name() string { return "RespVec" }
func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
@@ -257,6 +297,8 @@
vdl.Register((*SyncGroupStatus)(nil))
vdl.Register((*SyncGroup)(nil))
vdl.Register((*DeltaReq)(nil))
+ vdl.Register((*DataDeltaReq)(nil))
+ vdl.Register((*SgDeltaReq)(nil))
vdl.Register((*DeltaResp)(nil))
vdl.Register((*ChunkHash)(nil))
vdl.Register((*ChunkData)(nil))
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index 51a8f7c..eb78d00 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -380,33 +380,7 @@
func (sd *syncDatabase) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
ss := sd.sync.(*syncService)
mInfo := ss.copyMemberInfo(ctx, peer)
-
- mtTables := make(map[string]struct{})
- for gdbName, sgInfo := range mInfo.db2sg {
- appName, dbName, err := splitAppDbName(ctx, gdbName)
- if err != nil {
- return nil, err
- }
- st, err := ss.getDbStore(ctx, nil, appName, dbName)
- if err != nil {
- return nil, err
- }
-
- for id := range sgInfo {
- sg, err := getSyncGroupById(ctx, st, id)
- if err != nil {
- continue
- }
- if _, ok := sg.Joiners[peer]; !ok {
- // Peer is no longer part of the SyncGroup.
- continue
- }
- for _, mt := range sg.Spec.MountTables {
- mtTables[mt] = struct{}{}
- }
- }
- }
- return mtTables, nil
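+	// Mount tables for this peer are cached in its member info.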
+ return mInfo.mtTables, nil
}
// TODO(hpucha): Persist the blob directory periodically.
diff --git a/services/syncbase/vsync/conflict_resolution.go b/services/syncbase/vsync/conflict_resolution.go
index a8e41f6..689e2d3 100644
--- a/services/syncbase/vsync/conflict_resolution.go
+++ b/services/syncbase/vsync/conflict_resolution.go
@@ -131,11 +131,7 @@
if err != nil {
return nil, err
}
- dev, gen, err := splitLogRecKey(ctx, logKey)
- if err != nil {
- return nil, err
- }
- lrecs[p], err = getLogRec(ctx, iSt.tx, dev, gen)
+ lrecs[p], err = getLogRecByKey(ctx, iSt.tx, logKey)
if err != nil {
return nil, err
}
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 2644655..c72b3a7 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -335,7 +335,7 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -411,10 +411,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -494,13 +494,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:data:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -587,13 +587,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:2" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:data:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -674,10 +674,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 9fa297f..5875e21 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -4,11 +4,10 @@
package vsync
-// Initiator is a goroutine that periodically picks a peer from all the known
-// remote peers, and requests deltas from that peer for all the SyncGroups in
-// common across all apps/databases. It then modifies the sync metadata (DAG and
-// local log records) based on the deltas, detects and resolves conflicts if
-// any, and suitably updates the local Databases.
+// Initiator requests deltas from a chosen peer for all the SyncGroups in common
+// across all apps/databases. It then modifies the sync metadata (DAG and local
+// log records) based on the deltas, detects and resolves conflicts if any, and
+// suitably updates the local Databases.
import (
"sort"
@@ -21,6 +20,7 @@
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vom"
+ "v.io/x/lib/set"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
@@ -28,204 +28,167 @@
"v.io/x/ref/services/syncbase/store"
)
-// Policies to pick a peer to sync with.
-const (
- // Picks a peer at random from the available set.
- selectRandom = iota
-
- // TODO(hpucha): implement other policies.
- // Picks a peer with most differing generations.
- selectMostDiff
-
- // Picks a peer that was synced with the furthest in the past.
- selectOldest
-)
-
-var (
- // peerSyncInterval is the duration between two consecutive peer
- // contacts. During every peer contact, the initiator obtains any
- // pending updates from that peer.
- peerSyncInterval = 50 * time.Millisecond
-
- // peerSelectionPolicy is the policy used to select a peer when
- // the initiator gets a chance to sync.
- peerSelectionPolicy = selectRandom
-)
-
-// syncer wakes up every peerSyncInterval to do work: (1) Act as an initiator
-// for SyncGroup metadata by selecting a SyncGroup Admin, and syncing Syncgroup
-// metadata with it (getting updates from the remote peer, detecting and
-// resolving conflicts); (2) Refresh memberView if needed and act as an
-// initiator for data by selecting a peer, and syncing data corresponding to all
-// common SyncGroups across all Databases; (3) Act as a SyncGroup publisher to
-// publish pending SyncGroups; (4) Garbage collect older generations.
-//
-// TODO(hpucha): Currently only does initiation. Add rest.
-func (s *syncService) syncer(ctx *context.T) {
- defer s.pending.Done()
-
- ticker := time.NewTicker(peerSyncInterval)
- defer ticker.Stop()
-
- for {
- select {
- case <-s.closed:
- vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
- return
-
- case <-ticker.C:
- }
-
- // TODO(hpucha): Cut a gen for the responder even if there is no
- // one to initiate to?
-
- // Do work.
- peer, err := s.pickPeer(ctx)
- if err != nil {
- continue
- }
- s.getDeltasFromPeer(ctx, peer)
- }
-}
-
-// getDeltasFromPeer performs an initiation round to the specified
-// peer. An initiation round consists of:
+// getDeltas performs an initiation round to the specified peer. An
+// initiation round consists of two sync rounds:
+// * Sync SyncGroup metadata.
+// * Sync data.
+// Each sync round involves:
// * Contacting the peer to receive all the deltas based on the local genvector.
// * Processing those deltas to discover objects which have been updated.
// * Processing updated objects to detect and resolve any conflicts if needed.
-// * Communicating relevant object updates to the Database, and updating local
-// genvector to catch up to the received remote genvector.
+// * Communicating relevant object updates to the Database in case of data.
+// * Updating local genvector to catch up to the received remote genvector.
//
-// The processing of the deltas is done one Database at a time. If a local error
-// is encountered during the processing of a Database, that Database is skipped
-// and the initiator continues on to the next one. If the connection to the peer
+// The processing of the deltas is done one Database at a time, encompassing all
+// the SyncGroups common to the initiator and the responder. If a local error is
+// encountered during the processing of a Database, that Database is skipped and
+// the initiator continues on to the next one. If the connection to the peer
// encounters an error, this initiation round is aborted. Note that until the
// local genvector is updated based on the received deltas (the last step in an
// initiation round), the work done by the initiator is idempotent.
//
// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
-func (s *syncService) getDeltasFromPeer(ctxIn *context.T, peer string) {
- vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer)
- defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer)
-
- ctx, cancel := context.WithRootCancel(ctxIn)
+func (s *syncService) getDeltas(ctx *context.T, peer string) {
+ vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %s", peer)
+ defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %s", peer)
info := s.copyMemberInfo(ctx, peer)
if info == nil {
- vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer)
+ vlog.Fatalf("sync: getDeltas: missing information in member view for %q", peer)
}
- connected := false
- var stream interfaces.SyncGetDeltasClientCall
+
+ // Preferred mount tables for this peer.
+ prfMtTbls := set.String.ToSlice(info.mtTables)
// Sync each Database that may have SyncGroups common with this peer,
// one at a time.
- for gdbName, sgInfo := range info.db2sg {
+ for gdbName := range info.db2sg {
+ vlog.VI(4).Infof("sync: getDeltas: started for peer %s db %s", peer, gdbName)
- // Initialize initiation state for syncing this Database.
- iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
- if err != nil {
- vlog.Errorf("sync: getDeltasFromPeer: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
- continue
- }
-
- if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
- vlog.Errorf("sync: getDeltasFromPeer: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
- continue
- }
-
- // Make contact with the peer once.
- if !connected {
- stream, connected = iSt.connectToPeer(ctx)
- if !connected {
- // Try a different Database. Perhaps there are
- // different mount tables.
- continue
- }
- }
-
- // Create local genvec so that it contains knowledge only about common prefixes.
- if err := iSt.createLocalGenVec(ctx); err != nil {
- vlog.Errorf("sync: getDeltasFromPeer: error creating local genvec for gdb %s, err %v", gdbName, err)
- continue
- }
-
- iSt.stream = stream
- req := interfaces.DeltaReq{
- AppName: iSt.appName,
- DbName: iSt.dbName,
- SgIds: iSt.sgIds,
- InitVec: iSt.local,
- }
-
- vlog.VI(3).Infof("sync: getDeltasFromPeer: send request: %v", req)
- sender := iSt.stream.SendStream()
- sender.Send(req)
-
- // Obtain deltas from the peer over the network.
- if err := iSt.recvAndProcessDeltas(ctx); err != nil {
- vlog.Errorf("sync: getDeltasFromPeer: error receiving deltas for gdb %s, err %v", gdbName, err)
- // Returning here since something could be wrong with
- // the connection, and no point in attempting the next
- // Database.
- cancel()
- stream.Finish()
+ if len(prfMtTbls) < 1 {
+ vlog.Errorf("sync: getDeltas: no mount tables found to connect to peer %s", peer)
return
}
- vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote)
- if err := iSt.processUpdatedObjects(ctx); err != nil {
- vlog.Errorf("sync: getDeltasFromPeer: error processing objects for gdb %s, err %v", gdbName, err)
- // Move to the next Database even if processing updates
- // failed.
+ c, err := newInitiationConfig(ctx, s, peer, gdbName, info, prfMtTbls)
+ if err != nil {
+ vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %s, gdb %s, err %v", peer, gdbName, err)
continue
}
+
+ if err := s.getDBDeltas(ctx, peer, c, true); err == nil {
+ if err := s.getDBDeltas(ctx, peer, c, false); err != nil {
+ vlog.Errorf("sync: getDeltas: failed for data sync, err %v", err)
+ }
+ } else {
+ // If SyncGroup sync fails, abort data sync as well.
+ vlog.Errorf("sync: getDeltas: failed for SyncGroup sync, err %v", err)
+ }
+
+ // Cache the pruned mount table list for the next Database.
+ prfMtTbls = c.mtTables
+
+ vlog.VI(4).Infof("sync: getDeltas: done for peer %s db %s", peer, gdbName)
+ }
+}
+
+// getDBDeltas gets the deltas from the chosen peer. If the sg flag is true, it
+// syncs SyncGroup metadata; otherwise it syncs data.
+func (s *syncService) getDBDeltas(ctxIn *context.T, peer string, c *initiationConfig, sg bool) error {
+ vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %s", sg, peer)
+ defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %s", sg, peer)
+
+ ctx, cancel := context.WithRootCancel(ctxIn)
+ // cancel() is idempotent.
+ defer cancel()
+
+ // Initialize initiation state for syncing this Database.
+ iSt := newInitiationState(ctx, c, sg)
+
+ // Initialize SyncGroup prefixes for data syncing.
+ if !sg {
+ iSt.peerSgInfo(ctx)
+ if len(iSt.config.sgPfxs) == 0 {
+ return verror.New(verror.ErrInternal, ctx, "no SyncGroup prefixes found", peer, iSt.config.appName, iSt.config.dbName)
+ }
}
- if connected {
- stream.Finish()
+ if sg {
+		// Create local genvec so that it contains knowledge about
+		// common SyncGroups, and prepare the SyncGroup metadata sync
+		// request.
+ if err := iSt.prepareSGDeltaReq(ctx); err != nil {
+ return err
+ }
+ } else {
+		// Create local genvec so that it contains knowledge only about
+		// common prefixes, and prepare the data sync request.
+ if err := iSt.prepareDataDeltaReq(ctx); err != nil {
+ return err
+ }
}
- cancel()
+
+ // Make contact with the peer.
+ if !iSt.connectToPeer(ctx) {
+ return verror.New(verror.ErrInternal, ctx, "couldn't connect to peer", peer)
+ }
+
+ // Obtain deltas from the peer over the network.
+ if err := iSt.recvAndProcessDeltas(ctx); err != nil {
+ cancel()
+ iSt.stream.Finish()
+ return err
+ }
+
+ vlog.VI(4).Infof("sync: getDBDeltas: got reply: %v", iSt.remote)
+
+ if err := iSt.processUpdatedObjects(ctx); err != nil {
+ return err
+ }
+
+ return iSt.stream.Finish()
}
type sgSet map[interfaces.GroupId]struct{}
-// initiationState is accumulated for each Database during an initiation round.
-type initiationState struct {
- // Relative name of the peer to sync with.
- peer string
+// initiationConfig is the configuration information for a Database in an
+// initiation round.
+type initiationConfig struct {
+ peer string // relative name of the peer to sync with.
- // Collection of mount tables that this peer may have registered with.
- mtTables map[string]struct{}
+	// Mount tables that this peer may have registered with. The first entry
+	// in this slice is the mount table where the peer was most recently
+	// reached successfully.
+ mtTables []string
- // SyncGroups being requested in the initiation round.
- sgIds sgSet
-
- // SyncGroup prefixes being requested in the initiation round, and their
- // corresponding SyncGroup ids.
- sgPfxs map[string]sgSet
-
- // Local generation vector.
- local interfaces.GenVector
-
- // Generation vector from the remote peer.
- remote interfaces.GenVector
-
- // Updated local generation vector at the end of the initiation round.
- updLocal interfaces.GenVector
-
- // State to track updated objects during a log replay.
- updObjects map[string]*objConflictState
-
- // DAG state that tracks conflicts and common ancestors.
- dagGraft graftMap
-
+ sgIds sgSet // SyncGroups being requested in the initiation round.
+ sgPfxs map[string]sgSet // SyncGroup prefixes and their ids being requested in the initiation round.
sync *syncService
appName string
dbName string
- st store.Store // Store handle to the Database.
- stream interfaces.SyncGetDeltasClientCall // Stream handle for the GetDeltas RPC.
+ st store.Store // Store handle to the Database.
+}
- // Transaction handle for the initiation round. Used during the update
+// initiationState is accumulated for a Database during each sync round of an
+// initiation round.
+type initiationState struct {
+ // Config information.
+ config *initiationConfig
+
+ // Accumulated sync state.
+ local interfaces.GenVector // local generation vector.
+ remote interfaces.GenVector // generation vector from the remote peer.
+ updLocal interfaces.GenVector // updated local generation vector at the end of sync round.
+ updObjects map[string]*objConflictState // tracks updated objects during a log replay.
+ dagGraft graftMap // DAG state that tracks conflicts and common ancestors.
+
+ req interfaces.DeltaReq // GetDeltas RPC request.
+ stream interfaces.SyncGetDeltasClientCall // stream handle for the GetDeltas RPC.
+
+ // Flag to indicate if this is SyncGroup metadata sync.
+ sg bool
+
+ // Transaction handle for the sync round. Used during the update
// of objects in the Database.
tx store.Transaction
}
@@ -240,93 +203,90 @@
res *conflictResolution
}
-// newInitiationState creates new initiation state.
-func newInitiationState(ctx *context.T, s *syncService, peer string, name string, sgInfo sgMemberInfo) (*initiationState, error) {
- iSt := &initiationState{}
- iSt.peer = peer
- iSt.updObjects = make(map[string]*objConflictState)
- iSt.dagGraft = newGraft()
- iSt.sync = s
+// newInitiationConfig creates a new initiation config. This will be shared between
+// the two sync rounds in the initiation round of a Database.
+func newInitiationConfig(ctx *context.T, s *syncService, peer string, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
+ c := &initiationConfig{}
+ c.peer = peer
+ c.mtTables = mtTables
+ c.sgIds = make(sgSet)
+ for id := range info.db2sg[name] {
+ c.sgIds[id] = struct{}{}
+ }
+ if len(c.sgIds) == 0 {
+ return nil, verror.New(verror.ErrInternal, ctx, "no SyncGroups found", peer, name)
+ }
+	// Note: sgPfxs will be initialized when needed by the data sync.
+
+ c.sync = s
// TODO(hpucha): Would be nice to standardize on the combined "app:db"
// name across sync (not syncbase) so we only join split/join them at
// the boundary with the store part.
var err error
- iSt.appName, iSt.dbName, err = splitAppDbName(ctx, name)
+ c.appName, c.dbName, err = splitAppDbName(ctx, name)
if err != nil {
return nil, err
}
// TODO(hpucha): nil rpc.ServerCall ok?
- iSt.st, err = s.getDbStore(ctx, nil, iSt.appName, iSt.dbName)
+ c.st, err = s.getDbStore(ctx, nil, c.appName, c.dbName)
if err != nil {
return nil, err
}
- iSt.peerMtTblsAndSgInfo(ctx, peer, sgInfo)
-
- return iSt, nil
+ return c, nil
}
-// peerMtTblsAndSgInfo computes the possible mount tables, the SyncGroup Ids and
-// prefixes common with a remote peer in a particular Database by consulting the
-// SyncGroups in the specified Database.
-func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) {
- iSt.mtTables = make(map[string]struct{})
- iSt.sgIds = make(sgSet)
- iSt.sgPfxs = make(map[string]sgSet)
+// newInitiationState creates a new initiation state.
+func newInitiationState(ctx *context.T, c *initiationConfig, sg bool) *initiationState {
+ iSt := &initiationState{}
+ iSt.config = c
+ iSt.updObjects = make(map[string]*objConflictState)
+ iSt.dagGraft = newGraft()
+ iSt.sg = sg
+ return iSt
+}
- for id := range info {
- sg, err := getSyncGroupById(ctx, iSt.st, id)
+// peerSgInfo computes the SyncGroup Ids and prefixes common with a remote peer
+// in a particular Database by consulting the SyncGroups in the specified
+// Database.
+func (iSt *initiationState) peerSgInfo(ctx *context.T) {
+ sgs := iSt.config.sgIds
+ iSt.config.sgIds = make(sgSet) // regenerate the sgids since we are going through the SyncGroups in any case.
+ iSt.config.sgPfxs = make(map[string]sgSet)
+
+ for id := range sgs {
+ sg, err := getSyncGroupById(ctx, iSt.config.st, id)
if err != nil {
continue
}
- if _, ok := sg.Joiners[peer]; !ok {
+ if _, ok := sg.Joiners[iSt.config.peer]; !ok {
// Peer is no longer part of the SyncGroup.
continue
}
- for _, mt := range sg.Spec.MountTables {
- iSt.mtTables[mt] = struct{}{}
- }
- iSt.sgIds[id] = struct{}{}
+
+ iSt.config.sgIds[id] = struct{}{}
for _, p := range sg.Spec.Prefixes {
- sgs, ok := iSt.sgPfxs[p]
+ sgs, ok := iSt.config.sgPfxs[p]
if !ok {
sgs = make(sgSet)
- iSt.sgPfxs[p] = sgs
+ iSt.config.sgPfxs[p] = sgs
}
sgs[id] = struct{}{}
}
}
}
-// connectToPeer attempts to connect to the remote peer using the mount tables
-// obtained from the SyncGroups being synced in the current Database.
-func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
- if len(iSt.mtTables) < 1 {
- vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
- return nil, false
- }
- for mt := range iSt.mtTables {
- absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
- c := interfaces.SyncClient(absName)
- stream, err := c.GetDeltas(ctx, iSt.sync.name)
- if err == nil {
- vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
- return stream, true
- }
- }
- return nil, false
-}
-
-// createLocalGenVec creates the generation vector with local knowledge for the
-// initiator to send to the responder.
+// prepareDataDeltaReq creates the generation vector with local knowledge for the
+// initiator to send to the responder, and creates the request to start the data
+// sync.
//
// TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
-func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
- iSt.sync.thLock.Lock()
- defer iSt.sync.thLock.Unlock()
+func (iSt *initiationState) prepareDataDeltaReq(ctx *context.T) error {
+ iSt.config.sync.thLock.Lock()
+ defer iSt.config.sync.thLock.Unlock()
// Freeze the most recent batch of local changes before fetching
// remote changes from a peer. This frozen state is used by the
@@ -341,19 +301,20 @@
// devices. These remote devices in turn can send these
// generations back to the initiator in progress which was
// started with older generation information.
- if err := iSt.sync.checkptLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+ if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.appName, iSt.config.dbName, nil); err != nil {
return err
}
- local, lgen, err := iSt.sync.copyDbGenInfo(ctx, iSt.appName, iSt.dbName)
+ local, lgen, err := iSt.config.sync.copyDbGenInfo(ctx, iSt.config.appName, iSt.config.dbName, nil)
if err != nil {
return err
}
+
localPfxs := extractAndSortPrefixes(local)
- sgPfxs := make([]string, len(iSt.sgPfxs))
+ sgPfxs := make([]string, len(iSt.config.sgPfxs))
i := 0
- for p := range iSt.sgPfxs {
+ for p := range iSt.config.sgPfxs {
sgPfxs[i] = p
i++
}
@@ -390,14 +351,84 @@
if lpStart == "" {
// No matching prefixes for pfx were found.
iSt.local[pfx] = make(interfaces.PrefixGenVector)
- iSt.local[pfx][iSt.sync.id] = lgen
+ iSt.local[pfx][iSt.config.sync.id] = lgen
} else {
iSt.local[pfx] = local[lpStart]
}
}
+
+	// Prepare the request to send to the peer.
+ req := interfaces.DataDeltaReq{
+ AppName: iSt.config.appName,
+ DbName: iSt.config.dbName,
+ SgIds: iSt.config.sgIds,
+ InitVec: iSt.local,
+ }
+
+ iSt.req = interfaces.DeltaReqData{req}
+
+ vlog.VI(4).Infof("sync: prepareDataDeltaReq: request: %v", req)
+
return nil
}
+// prepareSGDeltaReq creates the SyncGroup generation vector with local knowledge
+// for the initiator to send to the responder, and prepares the request to start
+// the SyncGroup sync.
+func (iSt *initiationState) prepareSGDeltaReq(ctx *context.T) error {
+ iSt.config.sync.thLock.Lock()
+ defer iSt.config.sync.thLock.Unlock()
+
+ if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.appName, iSt.config.dbName, iSt.config.sgIds); err != nil {
+ return err
+ }
+
+ var err error
+ iSt.local, _, err = iSt.config.sync.copyDbGenInfo(ctx, iSt.config.appName, iSt.config.dbName, iSt.config.sgIds)
+ if err != nil {
+ return err
+ }
+
+	// Prepare the request to send to the peer.
+ req := interfaces.SgDeltaReq{
+ AppName: iSt.config.appName,
+ DbName: iSt.config.dbName,
+ InitVec: iSt.local,
+ }
+
+ iSt.req = interfaces.DeltaReqSgs{req}
+
+ vlog.VI(4).Infof("sync: prepareSGDeltaReq: request: %v", req)
+
+ return nil
+}
+
+// connectToPeer attempts to connect to the remote peer using the mount tables
+// obtained from all the common SyncGroups.
+func (iSt *initiationState) connectToPeer(ctx *context.T) bool {
+ if len(iSt.config.mtTables) < 1 {
+ vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
+ return false
+ }
+
+ for i, mt := range iSt.config.mtTables {
+ absName := naming.Join(mt, iSt.config.peer, util.SyncbaseSuffix)
+ c := interfaces.SyncClient(absName)
+ var err error
+ iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
+ if err == nil {
+ vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
+
+ // Prune out the unsuccessful mount tables.
+ iSt.config.mtTables = iSt.config.mtTables[i:]
+ return true
+ }
+ }
+ iSt.config.mtTables = nil
+ vlog.Errorf("sync: connectToPeer: couldn't connect to peer %s", iSt.config.peer)
+ return false
+}
+
// recvAndProcessDeltas first receives the log records and generation vector
// from the GetDeltas RPC and puts them in the Database. It also replays the
// entire log stream as the log records arrive. These records span multiple
@@ -405,20 +436,28 @@
// resolution during replay. This avoids resolving conflicts that have already
// been resolved by other devices.
func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
- iSt.sync.thLock.Lock()
- defer iSt.sync.thLock.Unlock()
+ // This is to handle issues with graftMap in DAG. Ideally, the
+ // transaction created to store all the deltas received over the network
+ // should not contend with any other store changes since this is all
+ // brand new information. However, as log records are received over the
+ // network, they are also incrementally processed. To enable incremental
+ // processing, the current head of each dirty object is read to populate
+ // the graftMap. This read can potentially contend with the watcher
+ // updating the head of an object. This lock prevents that contention in
+ // order to avoid retrying the whole transaction.
+ iSt.config.sync.thLock.Lock()
+ defer iSt.config.sync.thLock.Unlock()
// TODO(hpucha): This works for now, but figure out a long term solution
// as this may be implementation dependent. It currently works because
// the RecvStream call is stateless, and grabbing a handle to it
// repeatedly doesn't affect what data is seen next.
rcvr := iSt.stream.RecvStream()
- start, finish := false, false
// TODO(hpucha): See if we can avoid committing the entire delta stream
// as one batch. Currently the dependency is between the log records and
// the batch info.
- tx := iSt.st.NewTransaction()
+ tx := iSt.config.st.NewTransaction()
committed := false
defer func() {
@@ -433,18 +472,6 @@
for rcvr.Advance() {
resp := rcvr.Value()
switch v := resp.(type) {
- case interfaces.DeltaRespStart:
- if start {
- return verror.New(verror.ErrInternal, ctx, "received start followed by start in delta response stream")
- }
- start = true
-
- case interfaces.DeltaRespFinish:
- if finish {
- return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
- }
- finish = true
-
case interfaces.DeltaRespRespVec:
iSt.remote = v.Value
@@ -452,12 +479,18 @@
// Insert log record in Database.
// TODO(hpucha): Should we reserve more positions in a batch?
// TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
- pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
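+			// Reserve the position in the SyncGroup's log when syncing SyncGroup
+			// metadata, or in the data log when syncing data.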
+ var pos uint64
+ if iSt.sg {
+ pos = iSt.config.sync.reservePosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, v.Value.Metadata.ObjId, 1)
+ } else {
+ pos = iSt.config.sync.reservePosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, "", 1)
+ }
+
rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
batchId := rec.Metadata.BatchId
if batchId != NoBatchId {
if cnt, ok := batchMap[batchId]; !ok {
- if iSt.sync.startBatch(ctx, tx, batchId) != batchId {
+ if iSt.config.sync.startBatch(ctx, tx, batchId) != batchId {
return verror.New(verror.ErrInternal, ctx, "failed to create batch info")
}
batchMap[batchId] = rec.Metadata.BatchCount
@@ -467,27 +500,25 @@
}
vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
- if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
+ if err := iSt.insertRecInLogAndDag(ctx, rec, batchId, tx); err != nil {
return err
}
- // Check for BlobRefs, and process them.
- if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil {
- return err
+ if iSt.sg {
+ // Add the SyncGroup value to the Database.
+ } else {
+ if err := iSt.insertRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
+ return err
+ }
+ // Check for BlobRefs, and process them.
+ if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil {
+ return err
+ }
}
// Mark object dirty.
iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
}
-
- // Break out of the stream.
- if finish {
- break
- }
- }
-
- if !(start && finish) {
- return verror.New(verror.ErrInternal, ctx, "didn't receive start/finish delimiters in delta response stream")
}
if err := rcvr.Err(); err != nil {
@@ -496,7 +527,7 @@
// End the started batches if any.
for bid, cnt := range batchMap {
- if err := iSt.sync.endBatch(ctx, tx, bid, cnt); err != nil {
+ if err := iSt.config.sync.endBatch(ctx, tx, bid, cnt); err != nil {
return err
}
}
@@ -516,6 +547,48 @@
return err
}
+// insertRecInLogAndDag adds a new log record to log and dag data structures.
+func (iSt *initiationState) insertRecInLogAndDag(ctx *context.T, rec *localLogRec, batchId uint64, tx store.Transaction) error {
+ var pfx string
+ m := rec.Metadata
+
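+	// SyncGroup metadata log records are keyed by the SyncGroup's object id;
+	// data log records use the data log prefix.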
+ if iSt.sg {
+ pfx = m.ObjId
+ } else {
+ pfx = logDataPrefix
+ }
+
+ if err := putLogRec(ctx, tx, pfx, rec); err != nil {
+ return err
+ }
+ logKey := logRecKey(pfx, m.Id, m.Gen)
+
+ var err error
+ switch m.RecType {
+ case interfaces.NodeRec:
+ err = iSt.config.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
+ case interfaces.LinkRec:
+ err = iSt.config.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
+ default:
+ err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
+ }
+
+ return err
+}
+
+// insertRecInDb inserts the versioned value in the Database.
+func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
+ m := rec.Metadata
+ // TODO(hpucha): Hack right now. Need to change Database's handling of
+ // deleted objects. Currently, the initiator needs to treat deletions
+ // specially since deletions do not get a version number or a special
+ // value in the Database.
+ if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+ return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
+ }
+ return nil
+}
+
func (iSt *initiationState) processBlobRefs(ctx *context.T, m *interfaces.LogRecMetadata, valbuf []byte) error {
objid := m.ObjId
srcPeer := syncbaseIdToName(m.Id)
@@ -538,16 +611,16 @@
}
sgIds := make(sgSet)
for br := range brs {
- for p, sgs := range iSt.sgPfxs {
+ for p, sgs := range iSt.config.sgPfxs {
if strings.HasPrefix(extractAppKey(objid), p) {
for sg := range sgs {
sgIds[sg] = struct{}{}
}
}
}
- vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.peer, srcPeer, sgIds)
- info := &blobLocInfo{peer: iSt.peer, source: srcPeer, sgIds: sgIds}
- if err := iSt.sync.addBlobLocInfo(ctx, br, info); err != nil {
+ vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.config.peer, srcPeer, sgIds)
+ info := &blobLocInfo{peer: iSt.config.peer, source: srcPeer, sgIds: sgIds}
+ if err := iSt.config.sync.addBlobLocInfo(ctx, br, info); err != nil {
return err
}
}
@@ -578,39 +651,6 @@
return nil
}
-// insertRecInLogDagAndDb adds a new log record to log and dag data structures,
-// and inserts the versioned value in the Database.
-func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error {
- if err := putLogRec(ctx, tx, rec); err != nil {
- return err
- }
-
- m := rec.Metadata
- logKey := logRecKey(m.Id, m.Gen)
-
- var err error
- switch m.RecType {
- case interfaces.NodeRec:
- err = iSt.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
- case interfaces.LinkRec:
- err = iSt.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
- default:
- err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
- }
-
- if err != nil {
- return err
- }
- // TODO(hpucha): Hack right now. Need to change Database's handling of
- // deleted objects. Currently, the initiator needs to treat deletions
- // specially since deletions do not get a version number or a special
- // value in the Database.
- if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
- return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
- }
- return nil
-}
-
// processUpdatedObjects processes all the updates received by the initiator,
// one object at a time. Conflict detection and resolution is carried out after
// the entire delta of log records is replayed, instead of incrementally after
@@ -660,15 +700,15 @@
}()
for {
- vlog.VI(3).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
+ vlog.VI(4).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
- iSt.tx = iSt.st.NewTransaction()
+ iSt.tx = iSt.config.st.NewTransaction()
watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
if count, err := iSt.detectConflicts(ctx); err != nil {
return err
} else {
- vlog.VI(3).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
+ vlog.VI(4).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
}
if err := iSt.resolveConflicts(ctx); err != nil {
@@ -682,10 +722,10 @@
if err == nil {
committed = true
// Update in-memory genvector since commit is successful.
- if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
- vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+ if err := iSt.config.sync.putDbGenInfoRemote(ctx, iSt.config.appName, iSt.config.dbName, iSt.sg, iSt.updLocal); err != nil {
+ vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.config.appName, iSt.config.dbName, err)
}
- vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
+ vlog.VI(4).Info("sync: processUpdatedObjects: end: changes committed")
return nil
}
if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
@@ -699,7 +739,7 @@
// solution. Next iteration will have coordination with watch
// thread to intelligently retry. Hence this value is not a
// config param.
- vlog.VI(3).Info("sync: processUpdatedObjects: retry due to local mutations")
+ vlog.VI(4).Info("sync: processUpdatedObjects: retry due to local mutations")
iSt.tx.Abort()
time.Sleep(1 * time.Second)
}
@@ -736,7 +776,7 @@
// If the local version is picked, no further updates to the
// Database are needed. If the remote version is picked or if a
// new version is created, we put it in the Database.
- if confSt.res.ty != pickLocal {
+ if confSt.res.ty != pickLocal && !iSt.sg {
// TODO(hpucha): Hack right now. Need to change Database's
// handling of deleted objects.
@@ -815,10 +855,10 @@
}
// updateLogAndDag updates the log and dag data structures.
-func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
- confSt, ok := iSt.updObjects[obj]
+func (iSt *initiationState) updateLogAndDag(ctx *context.T, objid string) error {
+ confSt, ok := iSt.updObjects[objid]
if !ok {
- return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
+ return verror.New(verror.ErrInternal, ctx, "object state not found", objid)
}
var newVersion string
@@ -831,11 +871,11 @@
switch {
case confSt.res.ty == pickLocal:
// Local version was picked as the conflict resolution.
- rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.oldHead, confSt.newHead)
+ rec = iSt.createLocalLinkLogRec(ctx, objid, confSt.oldHead, confSt.newHead)
newVersion = confSt.oldHead
case confSt.res.ty == pickRemote:
// Remote version was picked as the conflict resolution.
- rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.newHead, confSt.oldHead)
+ rec = iSt.createLocalLinkLogRec(ctx, objid, confSt.newHead, confSt.oldHead)
newVersion = confSt.newHead
default:
// New version was created to resolve the conflict.
@@ -843,7 +883,13 @@
newVersion = confSt.res.rec.Metadata.CurVers
}
- if err := putLogRec(ctx, iSt.tx, rec); err != nil {
+ var pfx string
+ if iSt.sg {
+ pfx = objid
+ } else {
+ pfx = logDataPrefix
+ }
+ if err := putLogRec(ctx, iSt.tx, pfx, rec); err != nil {
return err
}
@@ -852,9 +898,9 @@
m := rec.Metadata
switch m.RecType {
case interfaces.NodeRec:
- err = iSt.sync.addNode(ctx, iSt.tx, obj, m.CurVers, logRecKey(m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
+ err = iSt.config.sync.addNode(ctx, iSt.tx, objid, m.CurVers, logRecKey(pfx, m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
case interfaces.LinkRec:
- err = iSt.sync.addParent(ctx, iSt.tx, obj, m.CurVers, m.Parents[0], nil)
+ err = iSt.config.sync.addParent(ctx, iSt.tx, objid, m.CurVers, m.Parents[0], nil)
default:
return verror.New(verror.ErrInternal, ctx, "unknown log record type")
}
@@ -865,21 +911,26 @@
// Move the head. This should be idempotent. We may move head to the
// local head in some cases.
- return moveHead(ctx, iSt.tx, obj, newVersion)
+ return moveHead(ctx, iSt.tx, objid, newVersion)
}
-func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
- gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, objid, vers, par string) *localLogRec {
+ vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", objid, vers, par)
- vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", obj, vers, par)
+ var gen, pos uint64
+ if iSt.sg {
+ gen, pos = iSt.config.sync.reserveGenAndPosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, objid, 1)
+ } else {
+ gen, pos = iSt.config.sync.reserveGenAndPosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, "", 1)
+ }
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
- Id: iSt.sync.id,
+ Id: iSt.config.sync.id,
Gen: gen,
RecType: interfaces.LinkRec,
- ObjId: obj,
+ ObjId: objid,
CurVers: vers,
Parents: []string{par},
UpdTime: time.Now().UTC(),
@@ -895,35 +946,53 @@
// updateSyncSt updates local sync state at the end of an initiator cycle.
func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
// Get the current local sync state.
- dsInMem, err := iSt.sync.copyDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+ dsInMem, err := iSt.config.sync.copyDbSyncStateInMem(ctx, iSt.config.appName, iSt.config.dbName)
if err != nil {
return err
}
+ // Create the state to be persisted.
ds := &dbSyncState{
- Gen: dsInMem.gen,
- CheckptGen: dsInMem.checkptGen,
- GenVec: dsInMem.genvec,
+ Data: localGenInfo{
+ Gen: dsInMem.data.gen,
+ CheckptGen: dsInMem.data.checkptGen,
+ },
+ Sgs: make(map[interfaces.GroupId]localGenInfo),
+ GenVec: dsInMem.genvec,
+ SgGenVec: dsInMem.sggenvec,
}
+ for id, info := range dsInMem.sgs {
+ ds.Sgs[id] = localGenInfo{
+ Gen: info.gen,
+ CheckptGen: info.checkptGen,
+ }
+ }
+
+ genvec := ds.GenVec
+ if iSt.sg {
+ genvec = ds.SgGenVec
+ }
// remote can be a subset of local.
for rpfx, respgv := range iSt.remote {
- for lpfx, lpgv := range ds.GenVec {
+ for lpfx, lpgv := range genvec {
if strings.HasPrefix(lpfx, rpfx) {
mergePrefixGenVectors(lpgv, respgv)
}
}
- if _, ok := ds.GenVec[rpfx]; !ok {
- ds.GenVec[rpfx] = respgv
+ if _, ok := genvec[rpfx]; !ok {
+ genvec[rpfx] = respgv
}
}
- iSt.updLocal = ds.GenVec
+ iSt.updLocal = genvec
// Clean the genvector of any local state. Note that local state is held
// in gen/ckPtGen in sync state struct.
for _, pgv := range iSt.updLocal {
- delete(pgv, iSt.sync.id)
+ delete(pgv, iSt.config.sync.id)
}
+ // TODO(hpucha): Flip join pending if needed.
+
// TODO(hpucha): Add knowledge compaction.
return putDbSyncState(ctx, iSt.tx, ds)
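
The merge step above (mergePrefixGenVectors) is effectively an element-wise maximum over per-device generations, so local knowledge only moves forward when absorbing the responder's vector. A small sketch of that semantics on plain maps (the real code uses interfaces.PrefixGenVector; the max-merge behavior is assumed from the usage here):

    package sketch

    // mergeMax folds remote knowledge into local, keeping the larger generation
    // per device. For example, merging remote {10:7, 12:1} into local
    // {10:3, 11:5} yields {10:7, 11:5, 12:1}.
    func mergeMax(local, remote map[uint64]uint64) {
        for dev, gen := range remote {
            if gen > local[dev] {
                local[dev] = gen
            }
        }
    }
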
@@ -938,31 +1007,3 @@
}
}
}
-
-////////////////////////////////////////
-// Peer selection policies.
-
-// pickPeer picks a Syncbase to sync with.
-func (s *syncService) pickPeer(ctx *context.T) (string, error) {
- switch peerSelectionPolicy {
- case selectRandom:
- members := s.getMembers(ctx)
- // Remove myself from the set.
- delete(members, s.name)
- if len(members) == 0 {
- return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
- }
-
- // Pick a peer at random.
- ind := randIntn(len(members))
- for m := range members {
- if ind == 0 {
- return m, nil
- }
- ind--
- }
- return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
- default:
- return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
- }
-}
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 01a63ad..935539b 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -15,12 +15,14 @@
import (
"fmt"
"reflect"
+ "sort"
"testing"
"time"
"v.io/v23/services/syncbase/nosql"
"v.io/v23/vdl"
"v.io/v23/vom"
+ "v.io/x/lib/set"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
@@ -84,7 +86,7 @@
// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
// in file testdata/remote-init-00.log.sync.
func TestLogStreamRemoteOnly(t *testing.T) {
- svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync")
+ svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync", false)
defer cleanup(t, svc)
// Check all log records.
@@ -92,9 +94,9 @@
var gen uint64
var parents []string
for gen = 1; gen < 4; gen++ {
- gotRec, err := getLogRec(nil, svc.St(), 11, gen)
+ gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, 11, gen)
if err != nil || gotRec == nil {
- t.Fatalf("getLogRec can not find object 11 %d, err %v", gen, err)
+ t.Fatalf("getLogRec can not find object %s 11 %d, err %v", logDataPrefix, gen, err)
}
vers := fmt.Sprintf("%d", gen)
wantRec := &localLogRec{
@@ -174,7 +176,7 @@
// correctly applied (when there are no conflicts). Commands are in files
// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
func TestLogStreamNoConflict(t *testing.T) {
- svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync")
+ svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync", false)
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -185,10 +187,10 @@
for _, devid := range []uint64{10, 11} {
var gen uint64
for gen = 1; gen < 4; gen++ {
- gotRec, err := getLogRec(nil, svc.St(), devid, gen)
+ gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, devid, gen)
if err != nil || gotRec == nil {
- t.Fatalf("getLogRec can not find object %d:%d, err %v",
- devid, gen, err)
+ t.Fatalf("getLogRec can not find object %s:%d:%d, err %v",
+ logDataPrefix, devid, gen, err)
}
vers := fmt.Sprintf("%d", version)
wantRec := &localLogRec{
@@ -271,7 +273,7 @@
// correctly applied when there are conflicts. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
func TestLogStreamConflict(t *testing.T) {
- svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync")
+ svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync", false)
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -318,7 +320,7 @@
// two versions of an object have no common ancestor. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-03.log.sync>.
func TestLogStreamConflictNoAncestor(t *testing.T) {
- svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync")
+ svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync", false)
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -360,10 +362,59 @@
tx.Abort()
}
+// TestSgSync tests that a local and a remote log stream can be correctly
+// applied when there are conflicts on a SyncGroup update. Commands are in files
+// testdata/<local-sginit-00.log.sync,remote-sgconf-00.log.sync>. Note that
+// since the data syncing tests already cover different scenarios in detail, a
+// single sniff test is added for SyncGroup sync.
+// TODO(hpucha): Add this test.
+/* func TestSgSync(t *testing.T) {
+ svc, iSt, cleanup := testInit(t, "local-sginit-00.log.sync", "remote-sgconf-00.log.sync", false)
+ defer cleanup(t, svc)
+
+ objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+ // Verify conflict state.
+ if len(iSt.updObjects) != 1 {
+ t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+ }
+ st := iSt.updObjects[objid]
+ if !st.isConflict {
+ t.Fatalf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "2" {
+ t.Fatalf("Conflict detection didn't succeed %v", st)
+ }
+ if st.res.ty != pickRemote {
+ t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
+ }
+
+ // Verify DAG state.
+ if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+ t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+ }
+
+ // Verify Database state.
+ valbuf, err := svc.St().Get([]byte(objid), nil)
+ var val string
+ if err := vom.Decode(valbuf, &val); err != nil {
+ t.Fatalf("Value decode failed, err %v", err)
+ }
+ if err != nil || val != "abc" {
+ t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+ }
+ tx := svc.St().NewTransaction()
+ versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+ if err != nil || string(versbuf) != "6" {
+ t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+ }
+ tx.Abort()
+} */
+
//////////////////////////////
// Helpers.
-func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
+func testInit(t *testing.T, lfile, rfile string, sg bool) (*mockService, *initiationState, func(*testing.T, *mockService)) {
// Set a large value to prevent the initiator from running.
peerSyncInterval = 1 * time.Hour
conflictResolutionPolicy = useTime
@@ -373,10 +424,20 @@
s.id = 10 // initiator
sgId1 := interfaces.GroupId(1234)
+ gdb := appDbName("mockapp", "mockdb")
nullInfo := nosql.SyncGroupMemberInfo{}
sgInfo := sgMemberInfo{
sgId1: nullInfo,
}
+ info := &memberInfo{
+ db2sg: map[string]sgMemberInfo{
+ gdb: sgInfo,
+ },
+ mtTables: map[string]struct{}{
+ "1/2/3/4": struct{}{},
+ "5/6/7/8": struct{}{},
+ },
+ }
sg1 := &interfaces.SyncGroup{
Name: "sg1",
@@ -411,32 +472,52 @@
return svc, nil, cleanup
}
- gdb := appDbName("mockapp", "mockdb")
- iSt, err := newInitiationState(nil, s, "b", gdb, sgInfo)
+ c, err := newInitiationConfig(nil, s, "b", gdb, info, set.String.ToSlice(info.mtTables))
if err != nil {
- t.Fatalf("newInitiationState failed with err %v", err)
+ t.Fatalf("newInitiationConfig failed with err %v", err)
}
- testIfSgPfxsEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
- testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
+ iSt := newInitiationState(nil, c, sg)
- s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
-
- // Create local genvec so that it contains knowledge only about common prefixes.
- if err := iSt.createLocalGenVec(nil); err != nil {
- t.Fatalf("createLocalGenVec failed with err %v", err)
+ if !sg {
+ iSt.peerSgInfo(nil)
+ testIfSgPfxsEqual(t, iSt.config.sgPfxs, sg1.Spec.Prefixes)
}
- wantVec := interfaces.GenVector{
- "foo": interfaces.PrefixGenVector{10: 0},
- "bar": interfaces.PrefixGenVector{10: 0},
+ sort.Strings(iSt.config.mtTables)
+ sort.Strings(sg1.Spec.MountTables)
+ if !reflect.DeepEqual(iSt.config.mtTables, sg1.Spec.MountTables) {
+ t.Fatalf("Mount tables are not equal config %v, spec %v", iSt.config.mtTables, sg1.Spec.MountTables)
}
+
+ s.initSyncStateInMem(nil, "mockapp", "mockdb", sgId1)
+
+ iSt.stream = createReplayStream(t, rfile)
+
+ var wantVec interfaces.GenVector
+ if sg {
+ if err := iSt.prepareSGDeltaReq(nil); err != nil {
+ t.Fatalf("prepareSGDeltaReq failed with err %v", err)
+ }
+ sg := fmt.Sprintf("%d", sgId1)
+ wantVec = interfaces.GenVector{
+ sg: interfaces.PrefixGenVector{10: 0},
+ }
+ } else {
+ if err := iSt.prepareDataDeltaReq(nil); err != nil {
+ t.Fatalf("prepareDataDeltaReq failed with err %v", err)
+ }
+
+ wantVec = interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 0},
+ "bar": interfaces.PrefixGenVector{10: 0},
+ }
+ }
+
if !reflect.DeepEqual(iSt.local, wantVec) {
t.Fatalf("createLocalGenVec failed got %v, want %v", iSt.local, wantVec)
}
- iSt.stream = createReplayStream(t, rfile)
-
if err := iSt.recvAndProcessDeltas(nil); err != nil {
t.Fatalf("recvAndProcessDeltas failed with err %v", err)
}
@@ -461,13 +542,6 @@
}
}
-func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
- aMap := arrToMap(a)
- if !reflect.DeepEqual(m, aMap) {
- t.Fatalf("testIfMapArrEqual failed map %v, arr %v", m, aMap)
- }
-}
-
func arrToMap(a []string) map[string]struct{} {
m := make(map[string]struct{})
for _, s := range a {
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index 1a4e2f1..e13eab6 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -278,21 +278,6 @@
func (ds *dummyStream) Cancel() {
}
-func (ds *dummyStream) SendStream() interface {
- Send(item interfaces.DeltaReq) error
- Close() error
-} {
- return ds
-}
-
-func (ds *dummyStream) Send(item interfaces.DeltaReq) error {
- return nil
-}
-
-func (ds *dummyStream) Close() error {
- return nil
-}
-
// replayLocalCommands replays local log records parsed from the input file.
func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
cmds, err := parseSyncCommands(syncfile)
@@ -342,9 +327,6 @@
}
stream := newStream()
- start := interfaces.DeltaRespStart{true}
- stream.add(start)
-
for _, cmd := range cmds {
var ty byte
switch cmd.cmd {
@@ -373,13 +355,11 @@
stream.add(rec)
}
- fin := interfaces.DeltaRespFinish{true}
- stream.add(fin)
return stream
}
func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
- id, gen, err := splitLogRecKey(nil, cmd.logrec)
+ _, id, gen, err := splitLogRecKey(nil, cmd.logrec)
if err != nil {
t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
}
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0290b41..46d03a1 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -6,7 +6,9 @@
import (
"container/heap"
+ "fmt"
"sort"
+ "strconv"
"strings"
"v.io/v23/context"
@@ -19,41 +21,62 @@
)
// GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, req interfaces.DeltaReq, initiator string) error {
vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
- recvr := call.RecvStream()
- for recvr.Advance() {
- req := recvr.Value()
- // Ignoring errors since if one Database fails for any reason,
- // it is fine to continue to the next one. In fact, sometimes
- // the failure might be genuine. For example, the responder is
- // no longer part of the requested SyncGroups, or the app/db is
- // locally deleted, or a permission change has denied access.
- rSt := newResponderState(ctx, call, s, req, initiator)
- rSt.sendDeltasPerDatabase(ctx)
- }
-
- // TODO(hpucha): Is there a need to call finish or some such?
- return recvr.Err()
+ rSt := newResponderState(ctx, call, s, req, initiator)
+ return rSt.sendDeltasPerDatabase(ctx)
}
// responderState is state accumulated per Database by the responder during an
// initiation round.
type responderState struct {
- req interfaces.DeltaReq
+ // Parameters from the request.
+ appName string
+ dbName string
+ sgIds sgSet
+ initVec interfaces.GenVector
+ sg bool
+
call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
initiator string
- errState error // Captures the error from the first two phases of the responder.
sync *syncService
st store.Store // Store handle to the Database.
- diff genRangeVector
- outVec interfaces.GenVector
+
+ diff genRangeVector
+ outVec interfaces.GenVector
}
func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
- rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
+ rSt := &responderState{
+ call: call,
+ sync: sync,
+ initiator: initiator,
+ }
+
+ switch v := req.(type) {
+ case interfaces.DeltaReqData:
+ rSt.appName = v.Value.AppName
+ rSt.dbName = v.Value.DbName
+ rSt.sgIds = v.Value.SgIds
+ rSt.initVec = v.Value.InitVec
+
+ case interfaces.DeltaReqSgs:
+ rSt.sg = true
+ rSt.appName = v.Value.AppName
+ rSt.dbName = v.Value.DbName
+ rSt.initVec = v.Value.InitVec
+ rSt.sgIds = make(sgSet)
+ // Populate the sgids from the initvec.
+ for id := range rSt.initVec {
+ gid, err := strconv.ParseUint(id, 10, 64)
+ if err != nil {
+ vlog.Fatalf("sync: newResponderState: invalid syncgroup id %v", id)
+ }
+ rSt.sgIds[interfaces.GroupId(gid)] = struct{}{}
+ }
+ }
return rSt
}
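
With this change the initiator sends a single DeltaReq union up front instead of streaming one request per Database. A rough, non-authoritative sketch of building the two variants handled above; DataDeltaReq and its fields appear elsewhere in this CL, while SgDeltaReq is assumed here to carry the same AppName/DbName/InitVec fields:

    package sketch

    import (
        "fmt"

        "v.io/x/ref/services/syncbase/server/interfaces"
    )

    // dataReq asks for data deltas; the generation vector is keyed by prefix.
    func dataReq(app, db string, sgs map[interfaces.GroupId]struct{}, vec interfaces.GenVector) interfaces.DeltaReq {
        return interfaces.DeltaReqData{Value: interfaces.DataDeltaReq{
            AppName: app, DbName: db, SgIds: sgs, InitVec: vec,
        }}
    }

    // sgReq asks for SyncGroup-metadata deltas; the generation vector is keyed
    // by the SyncGroup id rendered as a decimal string, which is how the
    // responder recovers the requested ids from the vector itself.
    func sgReq(app, db string, sgid interfaces.GroupId, pgv interfaces.PrefixGenVector) interfaces.DeltaReq {
        return interfaces.DeltaReqSgs{Value: interfaces.SgDeltaReq{
            AppName: app, DbName: db,
            InitVec: interfaces.GenVector{fmt.Sprintf("%d", sgid): pgv},
        }}
    }
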
@@ -97,43 +120,60 @@
// (see http://goo.gl/mEa4L0) but only incur that overhead when the
// logging level specified is enabled.
vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
- rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec)
+ rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
// Phase 1 of sendDeltas: Authorize the initiator and respond to the
// caller only for the SyncGroups that allow access.
- rSt.authorizeAndFilterSyncGroups(ctx)
+ err := rSt.authorizeAndFilterSyncGroups(ctx)
- // Phase 2 of sendDeltas: diff contains the bound on the
+ // Check error from phase 1.
+ if err != nil {
+ return err
+ }
+
+ if len(rSt.initVec) == 0 {
+ return verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+ }
+
+ // Phase 2 and 3 of sendDeltas: diff contains the bound on the
// generations missing from the initiator per device.
- rSt.computeDeltaBound(ctx)
+ if rSt.sg {
+ err = rSt.sendSgDeltas(ctx)
+ } else {
+ err = rSt.sendDataDeltas(ctx)
+ }
- // Phase 3 of sendDeltas: Process the diff, filtering out records that
- // are not needed, and send the remainder on the wire ordered.
- return rSt.filterAndSendDeltas(ctx)
+ return err
}
// authorizeAndFilterSyncGroups authorizes the initiator against the requested
// SyncGroups and filters the initiator's prefixes to only include those from
// allowed SyncGroups (phase 1 of sendDeltas).
-func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) {
- rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName)
- if rSt.errState != nil {
- return
+func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) error {
+ var err error
+ rSt.st, err = rSt.sync.getDbStore(ctx, nil, rSt.appName, rSt.dbName)
+ if err != nil {
+ return err
}
allowedPfxs := make(map[string]struct{})
- for sgid := range rSt.req.SgIds {
+ for sgid := range rSt.sgIds {
// Check permissions for the SyncGroup.
var sg *interfaces.SyncGroup
- sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid)
- if rSt.errState != nil {
- return
- }
- rSt.errState = authorize(ctx, rSt.call.Security(), sg)
- if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID {
+ sg, err = getSyncGroupById(ctx, rSt.st, sgid)
+ if err != nil {
+ vlog.Errorf("sync: authorizeAndFilterSyncGroups: accessing SyncGroup information failed %v, err %v", sgid, err)
continue
- } else if rSt.errState != nil {
- return
+ }
+ err = authorize(ctx, rSt.call.Security(), sg)
+ if verror.ErrorID(err) == verror.ErrNoAccess.ID {
+ if rSt.sg {
+ id := fmt.Sprintf("%d", sgid)
+ delete(rSt.initVec, id)
+ }
+ continue
+ } else if err != nil {
+ return err
}
for _, p := range sg.Spec.Prefixes {
@@ -147,8 +187,16 @@
rSt.addInitiatorToSyncGroup(ctx, sgid)
}
+ if err != nil {
+ return err
+ }
+
+ if rSt.sg {
+ return nil
+ }
+
// Filter the initiator's prefixes to what is allowed.
- for pfx := range rSt.req.InitVec {
+ for pfx := range rSt.initVec {
if _, ok := allowedPfxs[pfx]; ok {
continue
}
@@ -160,10 +208,10 @@
}
if !allowed {
- delete(rSt.req.InitVec, pfx)
+ delete(rSt.initVec, pfx)
}
}
- return
+ return nil
}
// addInitiatorToSyncGroup adds the request initiator to the membership of the
@@ -199,27 +247,58 @@
}
}
-// computeDeltaBound computes the bound on missing generations across all
-// requested prefixes (phase 2 of sendDeltas).
-func (rSt *responderState) computeDeltaBound(ctx *context.T) {
- // Check error from phase 1.
- if rSt.errState != nil {
- return
+// sendSgDeltas computes the bound on missing generations, and sends the missing
+// log records across all requested SyncGroups (phases 2 and 3 of sendDeltas).
+func (rSt *responderState) sendSgDeltas(ctx *context.T) error {
+ vlog.VI(3).Infof("sync: sendSgDeltas: %s, %s: sgids %v, genvec %v",
+ rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
+
+ respVec, _, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, rSt.sgIds)
+ if err != nil {
+ return err
}
- if len(rSt.req.InitVec) == 0 {
- rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
- return
+ rSt.outVec = make(interfaces.GenVector)
+
+ for sg, initpgv := range rSt.initVec {
+ respgv, ok := respVec[sg]
+ if !ok {
+ continue
+ }
+ rSt.diff = make(genRangeVector)
+ rSt.diffPrefixGenVectors(respgv, initpgv)
+ rSt.outVec[sg] = respgv
+
+ if err := rSt.filterAndSendDeltas(ctx, sg); err != nil {
+ return err
+ }
+ }
+ return rSt.sendGenVec(ctx)
+}
+
+// sendDataDeltas computes the bound on missing generations across all requested
+// prefixes, and sends the missing log records (phases 2 and 3 of sendDeltas).
+func (rSt *responderState) sendDataDeltas(ctx *context.T) error {
+ // Phase 2 of sendDeltas: Compute the missing generations.
+ if err := rSt.computeDataDeltas(ctx); err != nil {
+ return err
}
- var respVec interfaces.GenVector
- var respGen uint64
- respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
- if rSt.errState != nil {
- return
+ // Phase 3 of sendDeltas: Process the diff, filtering out records that
+ // are not needed, and send the remainder on the wire ordered.
+ if err := rSt.filterAndSendDeltas(ctx, logDataPrefix); err != nil {
+ return err
+ }
+ return rSt.sendGenVec(ctx)
+}
+
+func (rSt *responderState) computeDataDeltas(ctx *context.T) error {
+ respVec, respGen, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, nil)
+ if err != nil {
+ return err
}
respPfxs := extractAndSortPrefixes(respVec)
- initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+ initPfxs := extractAndSortPrefixes(rSt.initVec)
rSt.outVec = make(interfaces.GenVector)
rSt.diff = make(genRangeVector)
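
sendSgDeltas and computeDataDeltas both reduce to the same bound computation: for each requested key (a SyncGroup id or a prefix), diff the responder's prefix genvector against the initiator's and record, per device, the range of generations the initiator is missing. A simplified sketch of that diff, assuming the usual semantics and ignoring the overlapping-prefix merging that the data path also does:

    package sketch

    // genRange bounds the generations a device is missing, inclusive on both ends.
    type genRange struct {
        min, max uint64
    }

    // diffGenVectors computes, per device, the generations the initiator lacks.
    // E.g. responder {10:8, 11:3} vs initiator {10:5, 11:3} yields {10: {6, 8}}.
    func diffGenVectors(resp, init map[uint64]uint64) map[uint64]genRange {
        diff := make(map[uint64]genRange)
        for dev, respGen := range resp {
            initGen := init[dev] // zero if the initiator has never heard of dev
            if respGen > initGen {
                diff[dev] = genRange{min: initGen + 1, max: respGen}
            }
        }
        return diff
    }
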
@@ -235,7 +314,7 @@
pfx = p
// Lower bound on initiator's knowledge for this prefix set.
- initpgv := rSt.req.InitVec[pfx]
+ initpgv := rSt.initVec[pfx]
// Find the relevant responder prefixes and add the corresponding knowledge.
var respgv interfaces.PrefixGenVector
@@ -285,27 +364,17 @@
}
vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
- rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec)
- return
+ rSt.appName, rSt.dbName, rSt.diff, rSt.outVec)
+ return nil
}
// filterAndSendDeltas filters the computed delta to remove records already
// known by the initiator, and sends the resulting records to the initiator
// (phase 3 of sendDeltas).
-func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error {
- // Always send a start and finish response so that the initiator can
- // move on to the next Database.
- //
+func (rSt *responderState) filterAndSendDeltas(ctx *context.T, pfx string) error {
// TODO(hpucha): Although ok for now to call SendStream once per
// Database, would like to make this implementation agnostic.
sender := rSt.call.SendStream()
- sender.Send(interfaces.DeltaRespStart{true})
- defer sender.Send(interfaces.DeltaRespFinish{true})
-
- // Check error from phase 2.
- if rSt.errState != nil {
- return rSt.errState
- }
// First two phases were successful. So now on to phase 3. We now visit
// every log record in the generation range as obtained from phase 1 in
@@ -316,7 +385,7 @@
mh := make(minHeap, 0, len(rSt.diff))
for dev, r := range rSt.diff {
r.cur = r.min
- rec, err := getNextLogRec(ctx, rSt.st, dev, r)
+ rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, r)
if err != nil {
return err
}
@@ -329,11 +398,14 @@
heap.Init(&mh)
// Process the log records in order.
- initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+ var initPfxs []string
+ if !rSt.sg {
+ initPfxs = extractAndSortPrefixes(rSt.initVec)
+ }
for mh.Len() > 0 {
rec := heap.Pop(&mh).(*localLogRec)
- if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
+ if rSt.sg || !filterLogRec(rec, rSt.initVec, initPfxs) {
// Send on the wire.
wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
if err != nil {
@@ -344,7 +416,7 @@
// Add a new record from the same device if not done.
dev := rec.Metadata.Id
- rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev])
+ rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, rSt.diff[dev])
if err != nil {
return err
}
@@ -354,7 +426,11 @@
delete(rSt.diff, dev)
}
}
+ return nil
+}
+func (rSt *responderState) sendGenVec(ctx *context.T) error {
+ sender := rSt.call.SendStream()
sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
return nil
}
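
filterAndSendDeltas interleaves the per-device log records with a min-heap so the records leave in one global order: pop the smallest, send it, then push that device's next record in range. The same k-way-merge pattern on plain integers, as a self-contained illustration rather than syncbase code:

    package sketch

    import "container/heap"

    // cursor tracks the next unread element of one sorted source.
    type cursor struct {
        src []int
        idx int
    }

    // minHeap orders cursors by their next element, mirroring how the responder
    // orders per-device log records by position.
    type minHeap []*cursor

    func (h minHeap) Len() int           { return len(h) }
    func (h minHeap) Less(i, j int) bool { return h[i].src[h[i].idx] < h[j].src[h[j].idx] }
    func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

    func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
    func (h *minHeap) Pop() interface{} {
        old := *h
        n := len(old)
        c := old[n-1]
        *h = old[:n-1]
        return c
    }

    // mergeSorted interleaves several sorted slices into one sorted stream,
    // advancing only the source whose element was just emitted.
    func mergeSorted(srcs ...[]int) []int {
        h := &minHeap{}
        for _, s := range srcs {
            if len(s) > 0 {
                *h = append(*h, &cursor{src: s})
            }
        }
        heap.Init(h)
        var out []int
        for h.Len() > 0 {
            c := heap.Pop(h).(*cursor)
            out = append(out, c.src[c.idx])
            if c.idx++; c.idx < len(c.src) {
                heap.Push(h, c)
            }
        }
        return out
    }
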
@@ -426,9 +502,9 @@
// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
// loop.
-func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
+func getNextLogRec(ctx *context.T, st store.Store, pfx string, dev uint64, r *genRange) (*localLogRec, error) {
for i := r.cur; i <= r.max; i++ {
- rec, err := getLogRec(ctx, st, dev, i)
+ rec, err := getLogRec(ctx, st, pfx, dev, i)
if err == nil {
r.cur = i + 1
return rec, nil
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index dbd0fea..f1c42e0 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
for _, test := range tests {
want := test.genDiffWant
got := test.genDiffIn
- rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
+ rSt := newResponderState(nil, nil, s, interfaces.DeltaReqData{}, "fakeInitiator")
rSt.diff = got
rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
checkEqualDevRanges(t, got, want)
@@ -121,7 +121,7 @@
// TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
// and if the log records on the wire are correctly ordered (phases 2 and 3 of
// SendDeltas).
-func TestSendDeltas(t *testing.T) {
+func TestSendDataDeltas(t *testing.T) {
appName := "mockapp"
dbName := "mockdb"
@@ -350,16 +350,13 @@
s.id = 10 //responder.
wantDiff, wantVec := test.genDiff, test.outVec
- s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
-
- req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
- rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
-
- rSt.computeDeltaBound(nil)
- if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
- t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, rSt.errState)
+ s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{
+ data: &localGenInfoInMem{
+ gen: test.respGen,
+ checkptGen: test.respGen,
+ },
+ genvec: test.respVec,
}
- checkEqualDevRanges(t, rSt.diff, wantDiff)
////////////////////////////////////////
// Test sending deltas.
@@ -385,7 +382,7 @@
Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
Pos: pos + k,
}
- if err := putLogRec(nil, tx, rec); err != nil {
+ if err := putLogRec(nil, tx, logDataPrefix, rec); err != nil {
t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
}
value := fmt.Sprintf("value_%s", okey)
@@ -404,16 +401,31 @@
t.Fatalf("cannot commit putting log rec, err %v", err)
}
+ req := interfaces.DataDeltaReq{
+ AppName: appName,
+ DbName: dbName,
+ InitVec: test.initVec,
+ }
+
+ rSt := newResponderState(nil, nil, s, interfaces.DeltaReqData{req}, "fakeInitiator")
d := &dummyResponder{}
rSt.call = d
- rSt.st, rSt.errState = rSt.sync.getDbStore(nil, nil, rSt.req.AppName, rSt.req.DbName)
- if rSt.errState != nil {
- t.Fatalf("filterAndSendDeltas failed to get store handle for app/db %v %v", rSt.req.AppName, rSt.req.DbName)
- }
- err := rSt.filterAndSendDeltas(nil)
+ var err error
+ rSt.st, err = rSt.sync.getDbStore(nil, nil, rSt.appName, rSt.dbName)
if err != nil {
- t.Fatalf("filterAndSendDeltas failed (I: %v), (R: %v, %v) err %v", test.initVec, test.respGen, test.respVec, err)
+ t.Fatalf("getDbStore failed to get store handle for app/db %v %v", rSt.appName, rSt.dbName)
}
+
+ err = rSt.computeDataDeltas(nil)
+ if err != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
+ t.Fatalf("computeDataDeltas failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, err)
+ }
+ checkEqualDevRanges(t, rSt.diff, wantDiff)
+
+ if err = rSt.sendDataDeltas(nil); err != nil {
+ t.Fatalf("sendDataDeltas failed, err %v", err)
+ }
+
d.diffLogRecs(t, wantRecs, wantVec)
destroyService(t, svc)
@@ -424,29 +436,10 @@
// Helpers
type dummyResponder struct {
- start, finish int
- gotRecs []*localLogRec
- outVec interfaces.GenVector
+ gotRecs []*localLogRec
+ outVec interfaces.GenVector
}
-func (d *dummyResponder) RecvStream() interface {
- Advance() bool
- Value() interfaces.DeltaReq
- Err() error
-} {
- return d
-}
-
-func (d *dummyResponder) Advance() bool {
- return false
-}
-
-func (d *dummyResponder) Value() interfaces.DeltaReq {
- return interfaces.DeltaReq{}
-}
-
-func (d *dummyResponder) Err() error { return nil }
-
func (d *dummyResponder) SendStream() interface {
Send(item interfaces.DeltaResp) error
} {
@@ -455,10 +448,6 @@
func (d *dummyResponder) Send(item interfaces.DeltaResp) error {
switch v := item.(type) {
- case interfaces.DeltaRespStart:
- d.start++
- case interfaces.DeltaRespFinish:
- d.finish++
case interfaces.DeltaRespRespVec:
d.outVec = v.Value
case interfaces.DeltaRespRec:
@@ -492,9 +481,6 @@
}
func (d *dummyResponder) diffLogRecs(t *testing.T, wantRecs []*localLogRec, wantVec interfaces.GenVector) {
- if d.start != 1 || d.finish != 1 {
- t.Fatalf("diffLogRecs incorrect start/finish records (%v, %v)", d.start, d.finish)
- }
if len(d.gotRecs) != len(wantRecs) {
t.Fatalf("diffLogRecs failed, gotLen %v, wantLen %v\n", len(d.gotRecs), len(wantRecs))
}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 94ab6eb..1eebf88 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -29,6 +29,26 @@
// generation vector for a Database. Log records are indexed such that they can
// be selectively retrieved from the store for any missing generation from any
// device.
+//
+// Sync also tracks the current generation number and the current local log
+// position for the mutations of each SyncGroup created on a Database. Similar to
+// the data log records, these log records are used to sync SyncGroup metadata.
+//
+// The generations for data mutations and for the mutations of each SyncGroup
+// are kept in separate spaces: data mutations in a Database start at gen 1 and
+// grow, and the mutations of each SyncGroup likewise start at gen 1 and grow.
+// Thus, the keys of local data log records are of the form
+// $sync:log:data:<devid>:<gen>, while the keys of local SyncGroup log records
+// are of the form $sync:log:<sgid>:<devid>:<gen>.
+
+// TODO(hpucha): Should this space be separate from the data or not? If it is
+// not, it can provide consistency between data and SyncGroup metadata. For
+// example, let's say we mutate the data in a SyncGroup and soon after change
+// the SyncGroup ACL to prevent syncing with a device. That device may not get
+// the last batch of updates since the next time it tries to sync, it will be
+// rejected. However, implementing consistency is not straightforward. Even if
+// we had SyncGroup updates in the same space as the data, we would need to
+// switch to the right SyncGroup ACL at the responder based on the requested
+// generations.
import (
"fmt"
@@ -36,24 +56,58 @@
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/services/syncbase/store"
)
-// dbSyncStateInMem represents the in-memory sync state of a Database.
-type dbSyncStateInMem struct {
- gen uint64
- pos uint64
-
+// localGenInfoInMem represents the state corresponding to local generations.
+type localGenInfoInMem struct {
+ gen uint64
+ pos uint64
checkptGen uint64
- genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+}
+
+func (in *localGenInfoInMem) deepCopy() *localGenInfoInMem {
+ out := &localGenInfoInMem{
+ gen: in.gen,
+ pos: in.pos,
+ checkptGen: in.checkptGen,
+ }
+ return out
+}
+
+// dbSyncStateInMem represents the in-memory sync state of a Database and all
+// its SyncGroups.
+type dbSyncStateInMem struct {
+ data *localGenInfoInMem // info for data.
+ sgs map[interfaces.GroupId]*localGenInfoInMem // info for SyncGroups.
+
+ // Note: Generation vector contains state from remote devices only.
+ genvec interfaces.GenVector
+ sggenvec interfaces.GenVector
+}
+
+func (in *dbSyncStateInMem) deepCopy() *dbSyncStateInMem {
+ out := &dbSyncStateInMem{}
+ out.data = in.data.deepCopy()
+
+ out.sgs = make(map[interfaces.GroupId]*localGenInfoInMem)
+ for id, info := range in.sgs {
+ out.sgs[id] = info.deepCopy()
+ }
+
+ out.genvec = in.genvec.DeepCopy()
+ out.sggenvec = in.sggenvec.DeepCopy()
+
+ return out
}
// initSync initializes the sync module during startup. It scans all the
// databases across all apps to initialize the following:
-// a) in-memory sync state of a Database consisting of the current generation
-// number, log position and generation vector.
+// a) in-memory sync state of a Database and all its SyncGroups consisting of
+// the current generation number, log position and generation vector.
// b) watcher map of prefixes currently being synced.
// c) republish names in mount tables for all syncgroups.
//
@@ -94,14 +148,14 @@
if verror.ErrorID(err) == verror.ErrNoExist.ID {
scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
} else {
- scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+ scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(logDataPrefix, s.id, ds.Data.Gen), "")
}
var maxpos uint64
var dbName string
// Scan local log records to find the most recent one.
st.Scan(scanStart, scanLimit)
// Scan remote log records using the persisted GenVector.
- s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
+ s.syncState[dbName] = &dbSyncStateInMem{data: &localGenInfoInMem{pos: maxpos + 1}}
}
return false
@@ -110,42 +164,68 @@
return errFinal
}
+// Note: For all the utilities below, if the sgid parameter is non-empty, the
+// operation is performed in the SyncGroup space. If empty, it is performed in
+// the data space for the Database.
+//
+// TODO(hpucha): Once GroupId is changed to string, clean up these function
+// signatures.
+
// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
// positions in a Database's log. Used when local updates result in log
// entries.
-func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) {
- return s.reserveGenAndPosInternal(appName, dbName, count, count)
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) (uint64, uint64) {
+ return s.reserveGenAndPosInternal(appName, dbName, sgid, count, count)
}
// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
// when remote log records are received.
-func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 {
- _, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count)
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) uint64 {
+ _, pos := s.reserveGenAndPosInternal(appName, dbName, sgid, 0, count)
return pos
}
-func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) {
+func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgid string, genCount, posCount uint64) (uint64, uint64) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
name := appDbName(appName, dbName)
ds, ok := s.syncState[name]
if !ok {
- ds = &dbSyncStateInMem{gen: 1}
+ ds = &dbSyncStateInMem{
+ data: &localGenInfoInMem{gen: 1},
+ sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ }
s.syncState[name] = ds
}
- gen := ds.gen
- pos := ds.pos
+ var info *localGenInfoInMem
+ if sgid != "" {
+ id, err := strconv.ParseUint(sgid, 10, 64)
+ if err != nil {
+ vlog.Fatalf("sync: reserveGenAndPosInternal: invalid syncgroup id %s", sgid)
+ }
- ds.gen += genCount
- ds.pos += posCount
+ var ok bool
+ info, ok = ds.sgs[interfaces.GroupId(id)]
+ if !ok {
+ info = &localGenInfoInMem{gen: 1}
+ ds.sgs[interfaces.GroupId(id)] = info
+ }
+ } else {
+ info = ds.data
+ }
+ gen := info.gen
+ pos := info.pos
+
+ info.gen += genCount
+ info.pos += posCount
return gen, pos
}
// checkptLocalGen freezes the local generation number for the responder's use.
-func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string) error {
+func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string, sgs sgSet) error {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
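
With the per-space counters introduced in reserveGenAndPosInternal above, reserving generations for data never advances a SyncGroup's counter and vice versa; every space starts at gen 1, pos 0. Roughly, following the expectations in the updated TestReserveGenAndPos (s is the syncService):

    gen, pos := s.reserveGenAndPosInternal("mockapp", "mockdb", "", 5, 10)     // data space: gen=1, pos=0
    gen, pos  = s.reserveGenAndPosInternal("mockapp", "mockdb", "", 5, 10)     // data space: gen=6, pos=10
    gen, pos  = s.reserveGenAndPosInternal("mockapp", "mockdb", "1234", 5, 10) // SyncGroup 1234: gen=1, pos=0
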
@@ -157,19 +237,40 @@
// The frozen generation is the last generation number used, i.e. one
// below the next available one to use.
- ds.checkptGen = ds.gen - 1
+ if len(sgs) > 0 {
+ // Checkpoint requested SyncGroups.
+ for id := range sgs {
+ info, ok := ds.sgs[id]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "sg state not found", name, id)
+ }
+ info.checkptGen = info.gen - 1
+ }
+ } else {
+ ds.data.checkptGen = ds.data.gen - 1
+ }
return nil
}
-// initDbSyncStateInMem initializes the in memory sync state of the Database if needed.
-func (s *syncService) initDbSyncStateInMem(ctx *context.T, appName, dbName string) {
+// initSyncStateInMem initializes the in memory sync state of the Database/SyncGroup if needed.
+func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgid interfaces.GroupId) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
name := appDbName(appName, dbName)
if s.syncState[name] == nil {
- s.syncState[name] = &dbSyncStateInMem{gen: 1}
+ s.syncState[name] = &dbSyncStateInMem{
+ data: &localGenInfoInMem{gen: 1},
+ sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ }
}
+ if sgid != interfaces.NoGroupId {
+ ds := s.syncState[name]
+ if _, ok := ds.sgs[sgid]; !ok {
+ ds.sgs[sgid] = &localGenInfoInMem{gen: 1}
+ }
+ }
+ return
}
// copyDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
@@ -182,20 +283,11 @@
if !ok {
return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
-
- dsCopy := &dbSyncStateInMem{
- gen: ds.gen,
- pos: ds.pos,
- checkptGen: ds.checkptGen,
- }
-
- dsCopy.genvec = copyGenVec(ds.genvec)
-
- return dsCopy, nil
+ return ds.deepCopy(), nil
}
// copyDbGenInfo returns a copy of the current generation information of the Database.
-func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string, sgs sgSet) (interfaces.GenVector, uint64, error) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -205,18 +297,30 @@
return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- genvec := copyGenVec(ds.genvec)
+ var genvec interfaces.GenVector
+ var gen uint64
+ if len(sgs) > 0 {
+ genvec = make(interfaces.GenVector)
+ for id := range sgs {
+ sid := fmt.Sprintf("%d", id)
+ gv := ds.sggenvec[sid]
+ genvec[sid] = gv.DeepCopy()
+ genvec[sid][s.id] = ds.sgs[id].checkptGen
+ }
+ } else {
+ genvec = ds.genvec.DeepCopy()
- // Add local generation information to the genvec.
- for _, gv := range genvec {
- gv[s.id] = ds.checkptGen
+ // Add local generation information to the genvec.
+ for _, gv := range genvec {
+ gv[s.id] = ds.data.checkptGen
+ }
+ gen = ds.data.checkptGen
}
-
- return genvec, ds.checkptGen, nil
+ return genvec, gen, nil
}
// putDbGenInfoRemote puts the current remote generation information of the Database.
-func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, sg bool, genvec interfaces.GenVector) error {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -226,7 +330,11 @@
return verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- ds.genvec = copyGenVec(genvec)
+ if sg {
+ ds.sggenvec = genvec.DeepCopy()
+ } else {
+ ds.genvec = genvec.DeepCopy()
+ }
return nil
}
@@ -248,18 +356,6 @@
return parts[0], parts[1], nil
}
-func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
- genvec := make(interfaces.GenVector)
- for p, inpgv := range in {
- pgv := make(interfaces.PrefixGenVector)
- for id, gen := range inpgv {
- pgv[id] = gen
- }
- genvec[p] = pgv
- }
- return genvec
-}
-
////////////////////////////////////////////////////////////
// Low-level utility functions to access sync state.
@@ -291,36 +387,42 @@
}
// logRecKey returns the key used to access a specific log record.
-func logRecKey(id, gen uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
+func logRecKey(pfx string, id, gen uint64) string {
+ return util.JoinKeyParts(util.SyncPrefix, logPrefix, pfx, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
}
-// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
-func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+// splitLogRecKey is the inverse of logRecKey and returns the prefix, device id
+// and generation number.
+func splitLogRecKey(ctx *context.T, key string) (string, uint64, uint64, error) {
parts := util.SplitKeyParts(key)
verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
- if len(parts) != 4 {
- return 0, 0, verr
+ if len(parts) != 5 {
+ return "", 0, 0, verr
}
if parts[0] != util.SyncPrefix || parts[1] != logPrefix {
- return 0, 0, verr
+ return "", 0, 0, verr
}
- id, err := strconv.ParseUint(parts[2], 10, 64)
+ if parts[2] != logDataPrefix {
+ if _, err := strconv.ParseUint(parts[2], 10, 64); err != nil {
+ return "", 0, 0, verr
+ }
+ }
+ id, err := strconv.ParseUint(parts[3], 10, 64)
if err != nil {
- return 0, 0, verr
+ return "", 0, 0, verr
}
- gen, err := strconv.ParseUint(parts[3], 16, 64)
+ gen, err := strconv.ParseUint(parts[4], 16, 64)
if err != nil {
- return 0, 0, verr
+ return "", 0, 0, verr
}
- return id, gen, nil
+ return parts[2], id, gen, nil
}
// hasLogRec returns true if the log record for (devid, gen) exists.
-func hasLogRec(st store.StoreReader, id, gen uint64) (bool, error) {
+func hasLogRec(st store.StoreReader, pfx string, id, gen uint64) (bool, error) {
// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
var rec localLogRec
- if err := util.Get(nil, st, logRecKey(id, gen), &rec); err != nil {
+ if err := util.Get(nil, st, logRecKey(pfx, id, gen), &rec); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
err = nil
}
@@ -330,20 +432,25 @@
}
// putLogRec stores the log record.
-func putLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
- return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
+func putLogRec(ctx *context.T, tx store.Transaction, pfx string, rec *localLogRec) error {
+ return util.Put(ctx, tx, logRecKey(pfx, rec.Metadata.Id, rec.Metadata.Gen), rec)
}
// getLogRec retrieves the log record for a given (devid, gen).
-func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
+func getLogRec(ctx *context.T, st store.StoreReader, pfx string, id, gen uint64) (*localLogRec, error) {
+ return getLogRecByKey(ctx, st, logRecKey(pfx, id, gen))
+}
+
+// getLogRecByKey retrieves the log record for a given log record key.
+func getLogRecByKey(ctx *context.T, st store.StoreReader, key string) (*localLogRec, error) {
var rec localLogRec
- if err := util.Get(ctx, st, logRecKey(id, gen), &rec); err != nil {
+ if err := util.Get(ctx, st, key, &rec); err != nil {
return nil, err
}
return &rec, nil
}
// delLogRec deletes the log record for a given (devid, gen).
-func delLogRec(ctx *context.T, tx store.Transaction, id, gen uint64) error {
- return util.Delete(ctx, tx, logRecKey(id, gen))
+func delLogRec(ctx *context.T, tx store.Transaction, pfx string, id, gen uint64) error {
+ return util.Delete(ctx, tx, logRecKey(pfx, id, gen))
}
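
The key helpers above now carry a log-space prefix: "data" for data mutations, or the SyncGroup id for SyncGroup mutations. A self-contained sketch of the resulting layout, mirroring logRecKey/splitLogRecKey with plain string operations (the real helpers go through util.JoinKeyParts/SplitKeyParts and also validate the prefix):

    package sketch

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // logRecKey mirrors the layout $sync:log:<pfx>:<devid>:<gen>. The generation
    // is fixed-width hex so that keys sort in generation order, e.g.
    //   logRecKey("data", 10, 3) -> "$sync:log:data:10:0000000000000003"
    //   logRecKey("1234", 10, 3) -> "$sync:log:1234:10:0000000000000003"
    func logRecKey(pfx string, id, gen uint64) string {
        return strings.Join([]string{"$sync", "log", pfx, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen)}, ":")
    }

    // splitLogRecKey recovers (pfx, devid, gen) from such a key.
    func splitLogRecKey(key string) (string, uint64, uint64, error) {
        parts := strings.SplitN(key, ":", 5)
        if len(parts) != 5 || parts[0] != "$sync" || parts[1] != "log" {
            return "", 0, 0, fmt.Errorf("invalid log record key %q", key)
        }
        id, err := strconv.ParseUint(parts[3], 10, 64)
        if err != nil {
            return "", 0, 0, fmt.Errorf("invalid device id in %q", key)
        }
        gen, err := strconv.ParseUint(parts[4], 16, 64)
        if err != nil {
            return "", 0, 0, fmt.Errorf("invalid generation in %q", key)
        }
        return parts[2], id, gen, nil
    }
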
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index b621fdc..3036611 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -6,6 +6,7 @@
import (
"reflect"
+ "strconv"
"testing"
"time"
@@ -23,18 +24,31 @@
defer destroyService(t, svc)
s := svc.sync
- var wantGen, wantPos uint64 = 1, 0
- for i := 0; i < 5; i++ {
- gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", 5, 10)
- if gotGen != wantGen || gotPos != wantPos {
- t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
- }
- wantGen += 5
- wantPos += 10
+ sgids := []string{"", "100", "200"}
+ for _, sgid := range sgids {
+ var wantGen, wantPos uint64 = 1, 0
+ for i := 0; i < 5; i++ {
+ gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", sgid, 5, 10)
+ if gotGen != wantGen || gotPos != wantPos {
+ t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
+ }
+ wantGen += 5
+ wantPos += 10
- name := appDbName("mockapp", "mockdb")
- if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
- t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
+ name := appDbName("mockapp", "mockdb")
+ var info *localGenInfoInMem
+ if sgid == "" {
+ info = s.syncState[name].data
+ } else {
+ id, err := strconv.ParseUint(sgid, 10, 64)
+ if err != nil {
+ t.Fatalf("reserveGenAndPosInternal failed, invalid sgid %v", sgid)
+ }
+ info = s.syncState[name].sgs[interfaces.GroupId(id)]
+ }
+ if info.gen != wantGen || info.pos != wantPos {
+ t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", info.gen, wantGen, info.pos, wantPos)
+ }
}
}
}
@@ -52,9 +66,25 @@
1: 2, 3: 4, 5: 6,
},
}
+ sggv := interfaces.GenVector{
+ "mocksg1": interfaces.PrefixGenVector{
+ 10: 20, 30: 40, 50: 60,
+ },
+ "mocksg2": interfaces.PrefixGenVector{
+ 100: 200, 300: 400, 500: 600,
+ },
+ }
+ localsgs := make(map[interfaces.GroupId]localGenInfo)
+ localsgs[interfaces.GroupId(8888)] = localGenInfo{Gen: 56, CheckptGen: 2000}
+ localsgs[interfaces.GroupId(1008888)] = localGenInfo{Gen: 25890, CheckptGen: 100}
tx := st.NewTransaction()
- wantSt := &dbSyncState{Gen: 40, GenVec: gv}
+ wantSt := &dbSyncState{
+ Data: localGenInfo{Gen: 40},
+ Sgs: localsgs,
+ GenVec: gv,
+ SgGenVec: sggv,
+ }
if err := putDbSyncState(nil, tx, wantSt); err != nil {
t.Fatalf("putDbSyncState failed, err %v", err)
}
@@ -92,8 +122,8 @@
},
Pos: 10,
}
- if err := putLogRec(nil, tx, wantRec); err != nil {
- t.Fatalf("putLogRec(%d:%d) failed err %v", id, gen, err)
+ if err := putLogRec(nil, tx, logDataPrefix, wantRec); err != nil {
+ t.Fatalf("putLogRec(%s:%d:%d) failed err %v", logDataPrefix, id, gen, err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("cannot commit putting log rec, err %v", err)
@@ -102,8 +132,8 @@
checkLogRec(t, st, id, gen, true, wantRec)
tx = st.NewTransaction()
- if err := delLogRec(nil, tx, id, gen); err != nil {
- t.Fatalf("delLogRec(%d:%d) failed err %v", id, gen, err)
+ if err := delLogRec(nil, tx, logDataPrefix, id, gen); err != nil {
+ t.Fatalf("delLogRec(%s:%d:%d) failed err %v", logDataPrefix, id, gen, err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("cannot commit deleting log rec, err %v", err)
@@ -113,27 +143,28 @@
}
func TestLogRecKeyUtils(t *testing.T) {
- invalid := []string{"$sync:aa:bb", "log:aa:bb", "$sync:log:aa:xx", "$sync:log:x:bb"}
+ invalid := []string{"$sync:100:bb", "log:100:bb", "$sync:log:data:100:xx", "$sync:log:data:aa:bb", "$sync:log:xx:100:bb"}
for _, k := range invalid {
- if _, _, err := splitLogRecKey(nil, k); err == nil {
+ if _, _, _, err := splitLogRecKey(nil, k); err == nil {
t.Fatalf("splitting log rec key didn't fail %q", k)
}
}
valid := []struct {
+ pfx string
id uint64
gen uint64
}{
- {10, 20},
- {190, 540},
- {9999, 999999},
+ {logDataPrefix, 10, 20},
+ {"2500", 190, 540},
+ {"4200", 9999, 999999},
}
for _, v := range valid {
- gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.id, v.gen))
- if gotId != v.id || gotGen != v.gen || err != nil {
- t.Fatalf("failed key conversion id got %v want %v, gen got %v want %v, err %v", gotId, v.id, gotGen, v.gen, err)
+ gotPfx, gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.pfx, v.id, v.gen))
+ if gotPfx != v.pfx || gotId != v.id || gotGen != v.gen || err != nil {
+ t.Fatalf("failed key conversion pfx got %v want %v, id got %v want %v, gen got %v want %v, err %v", gotPfx, v.pfx, gotId, v.id, gotGen, v.gen, err)
}
}
}
@@ -158,7 +189,7 @@
}
func checkLogRec(t *testing.T, st store.StoreReader, id, gen uint64, exists bool, wantRec *localLogRec) {
- gotRec, err := getLogRec(nil, st, id, gen)
+ gotRec, err := getLogRec(nil, st, logDataPrefix, id, gen)
if (!exists && err == nil) || (exists && err != nil) {
t.Fatalf("getLogRec(%d:%d) failed, exists %v err %v", id, gen, exists, err)
@@ -168,7 +199,7 @@
t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
}
- if ok, err := hasLogRec(st, id, gen); err != nil || ok != exists {
+ if ok, err := hasLogRec(st, logDataPrefix, id, gen); err != nil || ok != exists {
t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
}
}
diff --git a/services/syncbase/vsync/syncer.go b/services/syncbase/vsync/syncer.go
new file mode 100644
index 0000000..22f485b
--- /dev/null
+++ b/services/syncbase/vsync/syncer.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
+)
+
+// Policies to pick a peer to sync with.
+const (
+ // Picks a peer at random from the available set.
+ selectRandom = iota
+
+ // TODO(hpucha): implement other policies.
+ // Picks a peer with most differing generations.
+ selectMostDiff
+
+ // Picks a peer that was synced with the furthest in the past.
+ selectOldest
+)
+
+var (
+ // peerSyncInterval is the duration between two consecutive peer
+ // contacts. During every peer contact, the initiator obtains any
+ // pending updates from that peer.
+ peerSyncInterval = 50 * time.Millisecond
+
+ // peerSelectionPolicy is the policy used to select a peer when
+ // the initiator gets a chance to sync.
+ peerSelectionPolicy = selectRandom
+)
+
+// syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
+// needed and pick a peer from all the known remote peers to sync with. (2) Act
+// as an initiator and sync SyncGroup metadata for all common SyncGroups with
+// the chosen peer (getting updates from the remote peer, detecting and
+// resolving conflicts). (3) Act as an initiator and sync data corresponding to
+// all common SyncGroups across all Apps/Databases with the chosen peer. (4)
+// Fetch any queued blobs. (5) Transfer ownership of blobs if needed. (6) Act as
+// a SyncGroup publisher to publish pending SyncGroups. (7) Garbage collect
+// older generations.
+//
+// TODO(hpucha): Currently only does initiation. Add rest.
+func (s *syncService) syncer(ctx *context.T) {
+ defer s.pending.Done()
+
+ ticker := time.NewTicker(peerSyncInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-s.closed:
+ vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
+ return
+
+ case <-ticker.C:
+ }
+
+ // TODO(hpucha): Cut a gen for the responder even if there is no
+ // one to initiate to?
+
+ // Do work.
+ peer, err := s.pickPeer(ctx)
+ if err != nil {
+ continue
+ }
+
+ // Sync SyncGroup metadata and data.
+ s.getDeltas(ctx, peer)
+ }
+}
+
+////////////////////////////////////////
+// Peer selection policies.
+
+// pickPeer picks a Syncbase to sync with.
+func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+ switch peerSelectionPolicy {
+ case selectRandom:
+ members := s.getMembers(ctx)
+ // Remove myself from the set.
+ delete(members, s.name)
+ if len(members) == 0 {
+ return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
+ }
+
+ // Pick a peer at random.
+ ind := randIntn(len(members))
+ for m := range members {
+ if ind == 0 {
+ return m, nil
+ }
+ ind--
+ }
+ return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+ default:
+ return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
+ }
+}
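
pickPeer's selectRandom branch uses the standard trick for sampling one key from a map without materializing a slice: draw a uniform index and walk the map decrementing it. Whatever iteration order the runtime chooses, the (ind+1)-th key visited is uniform over the members. The same helper in isolation, as a sketch:

    package sketch

    import "math/rand"

    // pickRandomKey returns a uniformly random key of a non-empty set.
    func pickRandomKey(members map[string]struct{}) string {
        ind := rand.Intn(len(members))
        for m := range members {
            if ind == 0 {
                return m
            }
            ind--
        }
        panic("unreachable for a non-empty map")
    }
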
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 7225404..91ba499 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -54,10 +54,13 @@
}
// memberInfo holds the member metadata for each SyncGroup this member belongs
-// to within each App/Database (i.e. global database name). It's a mapping of
-// global DB names to sets of SyncGroup member information.
+// to within each App/Database (i.e. global database name). It's a mapping of
+// global DB names to sets of SyncGroup member information. It also maintains
+// all the mount table candidates that could be used to reach this peer, learned
+// from the SyncGroup metadata.
type memberInfo struct {
- db2sg map[string]sgMemberInfo
+ db2sg map[string]sgMemberInfo
+ mtTables map[string]struct{}
}
// sgMemberInfo maps SyncGroups to their member metadata.
@@ -220,12 +223,20 @@
// A member's info is different across SyncGroups, so gather all of them.
for member, info := range sg.Joiners {
if _, ok := newMembers[member]; !ok {
- newMembers[member] = &memberInfo{db2sg: make(map[string]sgMemberInfo)}
+ newMembers[member] = &memberInfo{
+ db2sg: make(map[string]sgMemberInfo),
+ mtTables: make(map[string]struct{}),
+ }
}
if _, ok := newMembers[member].db2sg[name]; !ok {
newMembers[member].db2sg[name] = make(sgMemberInfo)
}
newMembers[member].db2sg[name][sg.Id] = info
+
+ // Collect mount tables.
+ for _, mt := range sg.Spec.MountTables {
+ newMembers[member].mtTables[mt] = struct{}{}
+ }
}
return false
})
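
The effect of the loop above is a per-member union of mount tables across all SyncGroups that member has joined, which is what the updated syncgroup_test expects (for example, "tablet" ends up with mt1, mt2 and mt3). A tiny sketch of that aggregation:

    package sketch

    // unionMountTables accumulates every mount table from every SyncGroup a
    // member has joined, e.g. {"mt1"} from sg1 plus {"mt2", "mt3"} from sg2
    // gives {"mt1", "mt2", "mt3"}.
    func unionMountTables(perSG ...[]string) map[string]struct{} {
        mts := make(map[string]struct{})
        for _, tables := range perSG {
            for _, mt := range tables {
                mts[mt] = struct{}{}
            }
        }
        return mts
    }
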
@@ -291,13 +302,19 @@
}
// Make a copy.
- infoCopy := &memberInfo{make(map[string]sgMemberInfo)}
+ infoCopy := &memberInfo{
+ db2sg: make(map[string]sgMemberInfo),
+ mtTables: make(map[string]struct{}),
+ }
for gdbName, sgInfo := range info.db2sg {
infoCopy.db2sg[gdbName] = make(sgMemberInfo)
for gid, mi := range sgInfo {
infoCopy.db2sg[gdbName][gid] = mi
}
}
+ for mt := range info.mtTables {
+ infoCopy.mtTables[mt] = struct{}{}
+ }
return infoCopy
}
@@ -407,6 +424,10 @@
vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
+ // Get this Syncbase's sync module handle.
+ ss := sd.sync.(*syncService)
+ var sg *interfaces.SyncGroup
+
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -419,11 +440,8 @@
// TODO(hpucha): Do some SG ACL checking. Check creator
// has Admin privilege.
- // Get this Syncbase's sync module handle.
- ss := sd.sync.(*syncService)
-
// Instantiate sg. Add self as joiner.
- sg := &interfaces.SyncGroup{
+ sg = &interfaces.SyncGroup{
Id: newSyncGroupId(),
Name: sgName,
SpecVersion: newSyncGroupVersion(),
@@ -449,6 +467,7 @@
return err
}
+ ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
// Local SG create succeeded. Publish the SG at the chosen server.
sd.publishSyncGroup(ctx, call, sgName)
@@ -542,6 +561,8 @@
return nullSpec, err
}
+ ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, sg.Spec)
@@ -841,7 +862,11 @@
return addSyncGroup(ctx, tx, &sg)
})
- return err
+ if err != nil {
+ return err
+ }
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ return nil
}
func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
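The mtTables set collected per member gives an initiator the mount-table candidates through which that peer may be reachable. A hedged sketch of how those candidates could be turned into object names to try when contacting the peer (naming.Join is the standard Vanadium name-joining helper; the candidateNames function itself is illustrative and not part of this change):

package main

import (
	"fmt"

	"v.io/v23/naming"
)

// candidateNames turns the mount-table candidates recorded for a peer into
// the object names an initiator could try when contacting it. Illustrative
// only; the actual dialing logic is not part of this change.
func candidateNames(peer string, mtTables map[string]struct{}) []string {
	names := make([]string, 0, len(mtTables))
	for mt := range mtTables {
		names = append(names, naming.Join(mt, peer))
	}
	return names
}

func main() {
	mts := map[string]struct{}{"mt2": {}, "mt3": {}}
	fmt.Println(candidateNames("tablet", mts))
}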
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index e48e1f6..4dc5845 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -329,7 +329,8 @@
Creator: "mockCreator",
SpecVersion: "etag-1",
Spec: nosql.SyncGroupSpec{
- Prefixes: []string{"foo"},
+ MountTables: []string{"mt1"},
+ Prefixes: []string{"foo"},
},
Joiners: map[string]nosql.SyncGroupMemberInfo{
"phone": nosql.SyncGroupMemberInfo{SyncPriority: 10},
@@ -345,7 +346,8 @@
Creator: "mockCreator",
SpecVersion: "etag-2",
Spec: nosql.SyncGroupSpec{
- Prefixes: []string{"bar"},
+ MountTables: []string{"mt2", "mt3"},
+ Prefixes: []string{"bar"},
},
Joiners: map[string]nosql.SyncGroupMemberInfo{
"tablet": nosql.SyncGroupMemberInfo{SyncPriority: 111},
@@ -383,6 +385,11 @@
t.Errorf("invalid SyncGroup members: got %v instead of %v", members, expMembers)
}
+ mt2and3 := map[string]struct{}{
+ "mt2": struct{}{},
+ "mt3": struct{}{},
+ }
+
expMemberInfo := map[string]*memberInfo{
"phone": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -390,6 +397,7 @@
sgId1: sg1.Joiners["phone"],
},
},
+ mtTables: map[string]struct{}{"mt1": struct{}{}},
},
"tablet": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -398,6 +406,11 @@
sgId2: sg2.Joiners["tablet"],
},
},
+ mtTables: map[string]struct{}{
+ "mt1": struct{}{},
+ "mt2": struct{}{},
+ "mt3": struct{}{},
+ },
},
"cloud": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -405,6 +418,7 @@
sgId1: sg1.Joiners["cloud"],
},
},
+ mtTables: map[string]struct{}{"mt1": struct{}{}},
},
"door": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -412,6 +426,7 @@
sgId2: sg2.Joiners["door"],
},
},
+ mtTables: mt2and3,
},
"lamp": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -419,6 +434,7 @@
sgId2: sg2.Joiners["lamp"],
},
},
+ mtTables: mt2and3,
},
}
@@ -430,7 +446,7 @@
}
expInfo := expMemberInfo[mm]
if !reflect.DeepEqual(mi, expInfo) {
- t.Errorf("invalid Info for SyncGroup member %s: got %v instead of %v", mm, mi, expInfo)
+ t.Errorf("invalid Info for SyncGroup member %s: got %#v instead of %#v", mm, mi, expInfo)
}
}
@@ -462,6 +478,7 @@
sgId2: sg2.Joiners["tablet"],
},
},
+ mtTables: mt2and3,
},
"door": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -469,6 +486,7 @@
sgId2: sg2.Joiners["door"],
},
},
+ mtTables: mt2and3,
},
"lamp": &memberInfo{
db2sg: map[string]sgMemberInfo{
@@ -476,6 +494,7 @@
sgId2: sg2.Joiners["lamp"],
},
},
+ mtTables: mt2and3,
},
}
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
index 7435348..7f655eb 100644
--- a/services/syncbase/vsync/testdata/local-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -1,6 +1,6 @@
# Create an object locally and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addl|foo1|1|||$sync:log:10:1|0|1|false
-addl|foo1|2|1||$sync:log:10:2|0|1|false
-addl|foo1|3|2||$sync:log:10:3|0|1|false
+addl|foo1|1|||$sync:log:data:10:1|0|1|false
+addl|foo1|2|1||$sync:log:data:10:2|0|1|false
+addl|foo1|3|2||$sync:log:data:10:3|0|1|false
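Each .log.sync testdata line follows the pipe-separated format given in the header comment: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>. A small parser sketch for that record format (the testRec type and parseTestRec helper are illustrative; the test harness has its own loader, and genvec lines use a different layout):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// testRec holds one parsed record line of the .log.sync testdata format.
// Type and field names are illustrative, not the harness's actual parser.
type testRec struct {
	cmd, objID, version string
	parents             []string
	logrec              string
	txID, txCount       uint64
	deleted             bool
}

func parseTestRec(line string) (testRec, error) {
	f := strings.Split(line, "|")
	if len(f) != 9 {
		return testRec{}, fmt.Errorf("want 9 fields, got %d: %q", len(f), line)
	}
	var parents []string
	for _, p := range f[3:5] {
		if p != "" {
			parents = append(parents, p)
		}
	}
	txID, err := strconv.ParseUint(f[6], 10, 64)
	if err != nil {
		return testRec{}, err
	}
	txCount, err := strconv.ParseUint(f[7], 10, 64)
	if err != nil {
		return testRec{}, err
	}
	deleted, err := strconv.ParseBool(f[8])
	if err != nil {
		return testRec{}, err
	}
	return testRec{
		cmd: f[0], objID: f[1], version: f[2],
		parents: parents, logrec: f[5],
		txID: txID, txCount: txCount, deleted: deleted,
	}, nil
}

func main() {
	r, err := parseTestRec("addl|foo1|1|||$sync:log:data:10:1|0|1|false")
	fmt.Println(r, err)
}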
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
index 060cf0c..66cdce3 100644
--- a/services/syncbase/vsync/testdata/remote-conf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -3,6 +3,6 @@
# it from the local sync at v2, then updated separately).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|foo1|4|2||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|2||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
index 2053157..5f440b5 100644
--- a/services/syncbase/vsync/testdata/remote-conf-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -5,6 +5,6 @@
# sees 2 graft points: v1-v4 and v2-v5.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|foo1|4|1||$sync:log:12:1|0|1|false
-addr|foo1|5|2|4|$sync:log:11:1|0|1|false
-addr|foo1|6|5||$sync:log:11:2|0|1|false
+addr|foo1|4|1||$sync:log:data:12:1|0|1|false
+addr|foo1|5|2|4|$sync:log:data:11:1|0|1|false
+addr|foo1|6|5||$sync:log:data:11:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
index 673405e..edf91a0 100644
--- a/services/syncbase/vsync/testdata/remote-conf-03.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -1,6 +1,6 @@
# Create the same object remotely from scratch and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|foo1|4|||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
index 35b1511..a5a4864 100644
--- a/services/syncbase/vsync/testdata/remote-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -1,7 +1,7 @@
# Create an object remotely and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|foo1|1|||$sync:log:11:1|0|1|false
-addr|foo1|2|1||$sync:log:11:2|0|1|false
-addr|foo1|3|2||$sync:log:11:3|0|1|false
+addr|foo1|1|||$sync:log:data:11:1|0|1|false
+addr|foo1|2|1||$sync:log:data:11:2|0|1|false
+addr|foo1|3|2||$sync:log:data:11:3|0|1|false
genvec|foo1|10:0,11:3|bar|11:0
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
index ef52b0a..94e144e 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -2,8 +2,7 @@
# after it was created locally up to v3 (i.e. assume the remote sync
# received it from the local sync first, then updated it).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-
-addr|foo1|4|3||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|3||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
genvec|foo1|10:0,11:3|bar|11:0
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index f34d415..9cf53d3 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -11,10 +11,11 @@
// Key prefixes for sync data structures. All these prefixes are prepended with
// util.SyncPrefix.
const (
- logPrefix = "log"
- dbssPrefix = "dbss"
- dagPrefix = "dag"
- sgPrefix = "sg"
+ logPrefix = "log" // log state.
+ logDataPrefix = "data" // data log state.
+ dbssPrefix = "dbss" // database sync state.
+ dagPrefix = "dag" // dag state.
+ sgPrefix = "sg" // syncgroup state.
)
// syncData represents the persistent state of the sync module.
@@ -22,11 +23,18 @@
Id uint64
}
+// localGenInfo represents the persistent state corresponding to local generations.
+type localGenInfo struct {
+ Gen uint64 // local generation number incremented on every local update.
+ CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+}
+
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Data localGenInfo
+ Sgs map[interfaces.GroupId]localGenInfo
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
+ SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
// localLogRec represents the persistent local state of a log record. Metadata
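With this change, a Database's sync state keeps one localGenInfo for the data log and one per SyncGroup for metadata logs. A minimal sketch of how that split state is meant to be advanced (the stand-in type declarations and the bumpLocalGen helper are assumptions for illustration; persistence and locking are handled elsewhere in the sync module):

package main

import "fmt"

// Stand-ins for the VDL types above (GroupId, localGenInfo, dbSyncState),
// re-declared here only so the sketch compiles on its own.
type GroupId uint64

const NoGroupId GroupId = 0

type localGenInfo struct{ Gen, CheckptGen uint64 }

type dbSyncState struct {
	Data localGenInfo
	Sgs  map[GroupId]localGenInfo
}

// bumpLocalGen advances the local generation counter for either the
// Database's data log (sgid == NoGroupId) or one SyncGroup's metadata log.
func bumpLocalGen(ds *dbSyncState, sgid GroupId) uint64 {
	if sgid == NoGroupId {
		ds.Data.Gen++
		return ds.Data.Gen
	}
	if ds.Sgs == nil {
		ds.Sgs = make(map[GroupId]localGenInfo)
	}
	info := ds.Sgs[sgid]
	info.Gen++
	ds.Sgs[sgid] = info
	return info.Gen
}

func main() {
	var ds dbSyncState
	fmt.Println(bumpLocalGen(&ds, NoGroupId))   // 1: data generation
	fmt.Println(bumpLocalGen(&ds, GroupId(7)))  // 1: SyncGroup 7's generation
}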
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index f305083..c02d2d4 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -25,11 +25,23 @@
}) {
}
+// localGenInfo represents the persistent state corresponding to local generations.
+type localGenInfo struct {
+ Gen uint64 // local generation number incremented on every local update.
+ CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+}
+
+func (localGenInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/vsync.localGenInfo"`
+}) {
+}
+
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Data localGenInfo
+ Sgs map[interfaces.GroupId]localGenInfo
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
+ SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
func (dbSyncState) __VDLReflect(struct {
@@ -51,14 +63,17 @@
func init() {
vdl.Register((*syncData)(nil))
+ vdl.Register((*localGenInfo)(nil))
vdl.Register((*dbSyncState)(nil))
vdl.Register((*localLogRec)(nil))
}
-const logPrefix = "log"
+const logPrefix = "log" // log state.
-const dbssPrefix = "dbss"
+const logDataPrefix = "data" // data log state.
-const dagPrefix = "dag"
+const dbssPrefix = "dbss" // database sync state.
-const sgPrefix = "sg"
+const dagPrefix = "dag" // dag state.
+
+const sgPrefix = "sg" // syncgroup state.
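Taken together with the testdata files above, these prefixes imply data-log record keys of the form "$sync:log:data:<device>:<gen>". A hedged reconstruction of that layout (the constants and decimal formatting here are a sketch matching the testdata; the real code builds keys through its own helpers such as logRecKey in this package):

package main

import "fmt"

// Illustrative reconstruction of the log-record key layout implied by the
// prefixes above and the testdata ("$sync:log:data:<dev>:<gen>").
const (
	syncPrefix    = "$sync"
	logPrefix     = "log"
	logDataPrefix = "data"
)

// logRecKey returns a storage key for a log record under the given sub-log
// prefix ("data" for the data log). Sketch only, not the canonical code.
func logRecKey(pfx string, id, gen uint64) string {
	return fmt.Sprintf("%s:%s:%s:%d:%d", syncPrefix, logPrefix, pfx, id, gen)
}

func main() {
	fmt.Println(logRecKey(logDataPrefix, 10, 1)) // $sync:log:data:10:1
}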
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 2e9e6fc..9c16294 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -120,7 +120,7 @@
}
// Initialize Database sync state if needed.
- s.initDbSyncStateInMem(ctx, appName, dbName)
+ s.initSyncStateInMem(ctx, appName, dbName, interfaces.NoGroupId)
// Get a batch of watch log entries, if any, after this resume marker.
logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
@@ -212,7 +212,7 @@
}
}
- gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
+ gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, "", count)
vlog.VI(3).Infof("sync: processBatch: %s, %s: len %d, total %d, btid %x, gen %d, pos %d",
appName, dbName, count, totalCount, batchId, gen, pos)
@@ -251,12 +251,12 @@
// suitably updating the DAG metadata.
func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
// Insert the new log record into the log.
- if err := putLogRec(ctx, tx, rec); err != nil {
+ if err := putLogRec(ctx, tx, logDataPrefix, rec); err != nil {
return err
}
m := rec.Metadata
- logKey := logRecKey(m.Id, m.Gen)
+ logKey := logRecKey(logDataPrefix, m.Id, m.Gen)
// Insert the new log record into dag.
if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {