syncbase/vsync: Initiator module.
Change-Id: I3dfc95ee8f9a6cd6a40558bf1dfab3a2282c3e3e
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 082d645..410371e 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -12,15 +12,22 @@
// Sync defines methods for data exchange between Syncbases.
// TODO(hpucha): Flesh this out further.
type Sync interface {
- // GetDeltas returns the responder's current generation vector
- // and all the missing log records when compared to the
- // initiator's generation vector.
- GetDeltas() error {access.Read}
+ // GetDeltas returns the responder's current generation vector and all
+ // the missing log records when compared to the initiator's generation
+ // vector. This process happens one Database at a time encompassing all
+ // the SyncGroups common to the initiator and the responder. For each
+ // Database, the initiator sends a DeltaReq. In response, the
+ // responder sends a "Start" DeltaResp record, all the missing log
+ // records, the responder's genvector, and a "Finish" DeltaResp
+ // record. The initiator parses the stream between a Start and a Finish
+ // record as the response to its DeltaReq, and then moves on to the
+ // next Database in common with this responder.
+ GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
// SyncGroup-related methods.
- // PublishSyncGroup is typically invoked on a "central" peer
- // to publish the SyncGroup.
+ // PublishSyncGroup is typically invoked on a "central" peer to publish
+ // the SyncGroup.
PublishSyncGroup(sg SyncGroup) error {access.Write}
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
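
For orientation, the sketch below shows how an initiator might drive this streaming interface end to end using the Go stubs generated from this change: send one DeltaReq, consume responses until the Finish record, then Finish the call. The peer name and request contents are placeholders; the real initiation logic lives in vsync/initiator.go further down in this change.

    package main

    import (
    	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
    	"v.io/v23/context"
    	"v.io/x/lib/vlog"
    )

    // fetchDeltas is a minimal sketch, not part of this change: one
    // request/response exchange for a single Database.
    func fetchDeltas(ctx *context.T, peerName string, req interfaces.DeltaReq) error {
    	c := interfaces.SyncClient(peerName)
    	call, err := c.GetDeltas(ctx)
    	if err != nil {
    		return err
    	}
    	if err := call.SendStream().Send(req); err != nil {
    		return err
    	}
    	rcvr := call.RecvStream()
    recv:
    	for rcvr.Advance() {
    		switch v := rcvr.Value().(type) {
    		case interfaces.DeltaRespStart:
    			// Beginning of the response to our DeltaReq.
    		case interfaces.DeltaRespRec:
    			vlog.Infof("missing log record for object %s", v.Value.Metadata.ObjId)
    		case interfaces.DeltaRespRespVec:
    			vlog.Infof("responder genvector: %v", v.Value)
    		case interfaces.DeltaRespFinish:
    			break recv // end of the response to our DeltaReq
    		}
    	}
    	if err := rcvr.Err(); err != nil {
    		return err
    	}
    	return call.Finish()
    }
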
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 594ab0f..0ca399f 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -9,6 +9,7 @@
import (
// VDL system imports
+ "io"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -25,12 +26,19 @@
// Sync defines methods for data exchange between Syncbases.
// TODO(hpucha): Flesh this out further.
type SyncClientMethods interface {
- // GetDeltas returns the responder's current generation vector
- // and all the missing log records when compared to the
- // initiator's generation vector.
- GetDeltas(*context.T, ...rpc.CallOpt) error
- // PublishSyncGroup is typically invoked on a "central" peer
- // to publish the SyncGroup.
+ // GetDeltas returns the responder's current generation vector and all
+ // the missing log records when compared to the initiator's generation
+ // vector. This process happens one Database at a time encompassing all
+ // the SyncGroups common to the initiator and the responder. For each
+ // Database, the initiator sends a DeltaReq. In response, the
+ // responder sends a "Start" DeltaResp record, all the missing log
+ // records, the responder's genvector, and a "Finish" DeltaResp
+ // record. The initiator parses the stream between a Start and a Finish
+ // record as the response to its DeltaReq, and then moves on to the
+ // next Database in common with this responder.
+ GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+ // PublishSyncGroup is typically invoked on a "central" peer to publish
+ // the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
@@ -57,8 +65,12 @@
name string
}
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "GetDeltas", nil, nil, opts...)
+func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+ return
+ }
+ ocall = &implSyncGetDeltasClientCall{ClientCall: call}
return
}
@@ -77,18 +89,127 @@
return
}
+// SyncGetDeltasClientStream is the client stream for Sync.GetDeltas.
+type SyncGetDeltasClientStream interface {
+ // RecvStream returns the receiver side of the Sync.GetDeltas client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() DeltaResp
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the Sync.GetDeltas client stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
+ Send(item DeltaReq) error
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
+ Close() error
+ }
+}
+
+// SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
+type SyncGetDeltasClientCall interface {
+ SyncGetDeltasClientStream
+ // Finish performs the equivalent of SendStream().Close, then blocks until
+ // the server is done, and returns the positional return values for the call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implSyncGetDeltasClientCall struct {
+ rpc.ClientCall
+ valRecv DeltaResp
+ errRecv error
+}
+
+func (c *implSyncGetDeltasClientCall) RecvStream() interface {
+ Advance() bool
+ Value() DeltaResp
+ Err() error
+} {
+ return implSyncGetDeltasClientCallRecv{c}
+}
+
+type implSyncGetDeltasClientCallRecv struct {
+ c *implSyncGetDeltasClientCall
+}
+
+func (c implSyncGetDeltasClientCallRecv) Advance() bool {
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implSyncGetDeltasClientCallRecv) Value() DeltaResp {
+ return c.c.valRecv
+}
+func (c implSyncGetDeltasClientCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implSyncGetDeltasClientCall) SendStream() interface {
+ Send(item DeltaReq) error
+ Close() error
+} {
+ return implSyncGetDeltasClientCallSend{c}
+}
+
+type implSyncGetDeltasClientCallSend struct {
+ c *implSyncGetDeltasClientCall
+}
+
+func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
+ return c.c.Send(item)
+}
+func (c implSyncGetDeltasClientCallSend) Close() error {
+ return c.c.CloseSend()
+}
+func (c *implSyncGetDeltasClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
// SyncServerMethods is the interface a server writer
// implements for Sync.
//
// Sync defines methods for data exchange between Syncbases.
// TODO(hpucha): Flesh this out further.
type SyncServerMethods interface {
- // GetDeltas returns the responder's current generation vector
- // and all the missing log records when compared to the
- // initiator's generation vector.
- GetDeltas(*context.T, rpc.ServerCall) error
- // PublishSyncGroup is typically invoked on a "central" peer
- // to publish the SyncGroup.
+ // GetDeltas returns the responder's current generation vector and all
+ // the missing log records when compared to the initiator's generation
+ // vector. This process happens one Database at a time encompassing all
+ // the SyncGroups common to the initiator and the responder. For each
+ // Database, the initiator sends a DeltaReq. In response, the
+ // responder sends a "Start" DeltaResp record, all the missing log
+ // records, the responder's genvector, and a "Finish" DeltaResp
+ // record. The initiator parses the stream between a Start and a Finish
+ // record as the response to its DeltaReq, and then moves on to the
+ // next Database, in common with this responder.
+ // next Database in common with this responder.
+ GetDeltas(*context.T, SyncGetDeltasServerCall) error
+ // PublishSyncGroup is typically invoked on a "central" peer to publish
+ // the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
@@ -102,9 +223,32 @@
// SyncServerStubMethods is the server interface containing
// Sync methods, as expected by rpc.Server.
-// There is no difference between this interface and SyncServerMethods
-// since there are no streaming methods.
-type SyncServerStubMethods SyncServerMethods
+// The only difference between this interface and SyncServerMethods
+// is the streaming methods.
+type SyncServerStubMethods interface {
+ // GetDeltas returns the responder's current generation vector and all
+ // the missing log records when compared to the initiator's generation
+ // vector. This process happens one Database at a time encompassing all
+ // the SyncGroups common to the initiator and the responder. For each
+ // Database, the initiator sends a DeltaReq. In response, the
+ // responder sends a "Start" DeltaResp record, all the missing log
+ // records, the responder's genvector, and a "Finish" DeltaResp
+ // record. The initiator parses the stream between a Start and a Finish
+ // record as the response to its DeltaReq, and then moves on to the
+ // next Database in common with this responder.
+ GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+ // PublishSyncGroup is typically invoked on a "central" peer to publish
+ // the SyncGroup.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+ // JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+ // Syncbase on a SyncGroup admin. It checks whether the requestor is
+ // allowed to join the named SyncGroup, and if so, adds the requestor to
+ // the SyncGroup.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+ // BlobSync methods.
+ // FetchBlob returns the requested blob.
+ FetchBlob(*context.T, rpc.ServerCall) error
+}
// SyncServerStub adds universal methods to SyncServerStubMethods.
type SyncServerStub interface {
@@ -135,7 +279,7 @@
gs *rpc.GlobState
}
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
return s.impl.GetDeltas(ctx, call)
}
@@ -170,12 +314,12 @@
Methods: []rpc.MethodDesc{
{
Name: "GetDeltas",
- Doc: "// GetDeltas returns the responder's current generation vector\n// and all the missing log records when compared to the\n// initiator's generation vector.",
+ Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
Name: "PublishSyncGroup",
- Doc: "// PublishSyncGroup is typically invoked on a \"central\" peer\n// to publish the SyncGroup.",
+ Doc: "// PublishSyncGroup is typically invoked on a \"central\" peer to publish\n// the SyncGroup.",
InArgs: []rpc.ArgDesc{
{"sg", ``}, // SyncGroup
},
@@ -201,3 +345,88 @@
},
},
}
+
+// SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
+type SyncGetDeltasServerStream interface {
+ // RecvStream returns the receiver side of the Sync.GetDeltas server stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() DeltaReq
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the Sync.GetDeltas server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item DeltaResp) error
+ }
+}
+
+// SyncGetDeltasServerCall represents the context passed to Sync.GetDeltas.
+type SyncGetDeltasServerCall interface {
+ rpc.ServerCall
+ SyncGetDeltasServerStream
+}
+
+// SyncGetDeltasServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements SyncGetDeltasServerCall.
+type SyncGetDeltasServerCallStub struct {
+ rpc.StreamServerCall
+ valRecv DeltaReq
+ errRecv error
+}
+
+// Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
+func (s *SyncGetDeltasServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
+func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
+ Advance() bool
+ Value() DeltaReq
+ Err() error
+} {
+ return implSyncGetDeltasServerCallRecv{s}
+}
+
+type implSyncGetDeltasServerCallRecv struct {
+ s *SyncGetDeltasServerCallStub
+}
+
+func (s implSyncGetDeltasServerCallRecv) Advance() bool {
+ s.s.valRecv = DeltaReq{}
+ s.s.errRecv = s.s.Recv(&s.s.valRecv)
+ return s.s.errRecv == nil
+}
+func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
+ return s.s.valRecv
+}
+func (s implSyncGetDeltasServerCallRecv) Err() error {
+ if s.s.errRecv == io.EOF {
+ return nil
+ }
+ return s.s.errRecv
+}
+
+// SendStream returns the send side of the Sync.GetDeltas server stream.
+func (s *SyncGetDeltasServerCallStub) SendStream() interface {
+ Send(item DeltaResp) error
+} {
+ return implSyncGetDeltasServerCallSend{s}
+}
+
+type implSyncGetDeltasServerCallSend struct {
+ s *SyncGetDeltasServerCallStub
+}
+
+func (s implSyncGetDeltasServerCallSend) Send(item DeltaResp) error {
+ return s.s.Send(item)
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index bb6f4d4..caf0339 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -32,8 +32,8 @@
type PrefixGenVector map[uint64]uint64
// GenVector is the generation vector for a Database, and maps prefixes to their
-// generation vectors. Note that the prefixes in a GenVector are global prefixes
-// that include the appropriate Application and Database name.
+// generation vectors. Note that the prefixes in a GenVector are relative to
+// the Application and Database name.
type GenVector map[string]PrefixGenVector
// LogRecMetadata represents the metadata of a single log record that is
@@ -49,7 +49,7 @@
RecType byte // type of log record.
// Object related information.
- ObjId string // id of the object that was updated.
+ ObjId string // id of the object that was updated. This id is relative to Application and Database names.
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
@@ -94,3 +94,19 @@
Status SyncGroupStatus // Status of the SyncGroup
Joiners map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
}
+
+// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
+// interested in within a Database when requesting deltas for that Database.
+type DeltaReq struct {
+ SgIds set[GroupId]
+ InitVec GenVector
+}
+
+// DeltaResp contains the responder's genvector or the missing log records
+// returned in response to an initiator's request for deltas for a Database.
+type DeltaResp union {
+ Start bool
+ Finish bool
+ Rec LogRec
+ RespVec GenVector
+}
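
To make the new wire types concrete, here is a hypothetical DeltaReq as the initiator would populate it; the group id, prefixes, device ids, and generation counts are invented for illustration, and the Go forms of these types are generated below.

    package main

    import "v.io/syncbase/x/ref/services/syncbase/server/interfaces"

    // buildDeltaReq sketches the shape of a deltas request for one Database.
    func buildDeltaReq() interfaces.DeltaReq {
    	return interfaces.DeltaReq{
    		// SyncGroups of interest in this Database (id made up).
    		SgIds: map[interfaces.GroupId]struct{}{
    			interfaces.GroupId(1234): {},
    		},
    		// Initiator's genvector: prefixes are relative to the
    		// Application and Database name, and each PrefixGenVector
    		// maps a device id to the latest generation known from it.
    		InitVec: interfaces.GenVector{
    			"foo": interfaces.PrefixGenVector{10: 5, 11: 3},
    			"bar": interfaces.PrefixGenVector{10: 7},
    		},
    	}
    }
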
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index a22b37d..a22731d 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -28,8 +28,8 @@
}
// GenVector is the generation vector for a Database, and maps prefixes to their
-// generation vectors. Note that the prefixes in a GenVector are global prefixes
-// that include the appropriate Application and Database name.
+// generation vectors. Note that the prefixes in a GenVector are relative to
+// the Application and Database name.
type GenVector map[string]PrefixGenVector
func (GenVector) __VDLReflect(struct {
@@ -49,7 +49,7 @@
Gen uint64 // generation number for the log record.
RecType byte // type of log record.
// Object related information.
- ObjId string // id of the object that was updated.
+ ObjId string // id of the object that was updated. This id is relative to Application and Database names.
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
@@ -155,6 +155,74 @@
}) {
}
+// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
+// interested in within a Database when requesting deltas for that Database.
+type DeltaReq struct {
+ SgIds map[GroupId]struct{}
+ InitVec GenVector
+}
+
+func (DeltaReq) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+}) {
+}
+
+type (
+ // DeltaResp represents any single field of the DeltaResp union type.
+ //
+ // DeltaResp contains the responder's genvector or the missing log records
+ // returned in response to an initiator's request for deltas for a Database.
+ DeltaResp interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the DeltaResp union type.
+ __VDLReflect(__DeltaRespReflect)
+ }
+ // DeltaRespStart represents field Start of the DeltaResp union type.
+ DeltaRespStart struct{ Value bool }
+ // DeltaRespFinish represents field Finish of the DeltaResp union type.
+ DeltaRespFinish struct{ Value bool }
+ // DeltaRespRec represents field Rec of the DeltaResp union type.
+ DeltaRespRec struct{ Value LogRec }
+ // DeltaRespRespVec represents field RespVec of the DeltaResp union type.
+ DeltaRespRespVec struct{ Value GenVector }
+ // __DeltaRespReflect describes the DeltaResp union type.
+ __DeltaRespReflect struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.DeltaResp"`
+ Type DeltaResp
+ Union struct {
+ Start DeltaRespStart
+ Finish DeltaRespFinish
+ Rec DeltaRespRec
+ RespVec DeltaRespRespVec
+ }
+ }
+)
+
+func (x DeltaRespStart) Index() int { return 0 }
+func (x DeltaRespStart) Interface() interface{} { return x.Value }
+func (x DeltaRespStart) Name() string { return "Start" }
+func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespFinish) Index() int { return 1 }
+func (x DeltaRespFinish) Interface() interface{} { return x.Value }
+func (x DeltaRespFinish) Name() string { return "Finish" }
+func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespRec) Index() int { return 2 }
+func (x DeltaRespRec) Interface() interface{} { return x.Value }
+func (x DeltaRespRec) Name() string { return "Rec" }
+func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespRespVec) Index() int { return 3 }
+func (x DeltaRespRespVec) Interface() interface{} { return x.Value }
+func (x DeltaRespRespVec) Name() string { return "RespVec" }
+func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
+
func init() {
vdl.Register((*PrefixGenVector)(nil))
vdl.Register((*GenVector)(nil))
@@ -163,6 +231,8 @@
vdl.Register((*GroupId)(nil))
vdl.Register((*SyncGroupStatus)(nil))
vdl.Register((*SyncGroup)(nil))
+ vdl.Register((*DeltaReq)(nil))
+ vdl.Register((*DeltaResp)(nil))
}
const NoGroupId = GroupId(0)
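
The responder side of GetDeltas is not part of this change, but the framing the initiator expects can be sketched with these union values: a Start marker, the missing records, the responder's genvector, and a Finish marker, in that order. A hypothetical responder-side helper, assuming the missing records and genvector have already been computed:

    package main

    import "v.io/syncbase/x/ref/services/syncbase/server/interfaces"

    // sendDeltas frames one response on the server stream; illustrative only.
    func sendDeltas(call interfaces.SyncGetDeltasServerCall, recs []interfaces.LogRec, respVec interfaces.GenVector) error {
    	sender := call.SendStream()
    	if err := sender.Send(interfaces.DeltaRespStart{Value: true}); err != nil {
    		return err
    	}
    	for _, r := range recs {
    		if err := sender.Send(interfaces.DeltaRespRec{Value: r}); err != nil {
    			return err
    		}
    	}
    	if err := sender.Send(interfaces.DeltaRespRespVec{Value: respVec}); err != nil {
    		return err
    	}
    	// Finish delimits this DeltaReq's response; the initiator then moves
    	// on to its next Database in common with this responder.
    	return sender.Send(interfaces.DeltaRespFinish{Value: true})
    }
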
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 5eb1bb4..3dc3e78 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -16,6 +16,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
"v.io/v23/verror"
)
@@ -32,6 +33,34 @@
return []byte(join(string(key), string(version)))
}
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key enters it in the transaction's read set,
+// which is the basis for optimistic concurrency control.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+ wtx := st.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return nil, convertError(wtx.err)
+ }
+ return getVersion(wtx.itx, key)
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+ wtx := st.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return valbuf, convertError(wtx.err)
+ }
+ return getAtVersion(wtx.itx, key, valbuf, version)
+}
+
func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
return st.Get(makeVersionKey(key), nil)
}
@@ -49,6 +78,41 @@
return getAtVersion(st, key, valbuf, version)
}
+// PutAtVersion puts a value for the managed key at the requested version. This
+// method is used by the Sync module exclusively when the initiator adds objects
+// with versions created on other Syncbases.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion updates the version of a managed key to the requested
+// version. This method is used by the Sync module exclusively when the
+// initiator selects which of the already stored versions (via PutAtVersion
+// calls) becomes the current version.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+ return err
+ }
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+ return nil
+}
+
func putVersioned(tx store.Transaction, key, value []byte) error {
rngLock.Lock()
num := rng.Int63()
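
Taken together, the new watchable helpers support the initiator's optimistic concurrency pattern (used in updateDbAndSyncSt below): read the current version to validate the head and enter it in the transaction's read set, stage the incoming value at its remote version, then flip the version pointer. A minimal sketch, assuming tx came from a watchable store:

    package main

    import (
    	"bytes"

    	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
    	"v.io/syncbase/x/ref/services/syncbase/store"
    	"v.io/v23/context"
    )

    // applyRemoteVersion is an illustrative helper, not part of this change.
    func applyRemoteVersion(ctx *context.T, tx store.Transaction, key, val []byte, oldVers, newVers string) error {
    	// Validate the head and enter the version key in the read set.
    	cur, err := watchable.GetVersion(ctx, tx, key)
    	if err != nil {
    		return err
    	}
    	if !bytes.Equal(cur, []byte(oldVers)) {
    		// The head moved underneath us; the caller aborts and retries.
    		return store.NewErrConcurrentTransaction(ctx)
    	}
    	// Stage the synced value under its remote version, then make that
    	// version the current one.
    	if err := watchable.PutAtVersion(ctx, tx, key, val, []byte(newVers)); err != nil {
    		return err
    	}
    	return watchable.PutVersion(ctx, tx, key, []byte(newVers))
    }
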
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index d158752..7e1b52b 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -56,7 +56,7 @@
perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
}
vlog.Infof("Perms: %v", perms)
- service, err := server.NewService(nil, nil, server.ServiceOptions{
+ service, err := server.NewService(ctx, nil, server.ServiceOptions{
Perms: perms,
RootDir: *rootDir,
Engine: *engine,
diff --git a/services/syncbase/vsync/cr.go b/services/syncbase/vsync/cr.go
new file mode 100644
index 0000000..f7b1827
--- /dev/null
+++ b/services/syncbase/vsync/cr.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+// Policies for conflict resolution.
+// TODO(hpucha): Move relevant parts to client-facing vdl.
+const (
+ // Resolves conflicts by picking the update with the most recent timestamp.
+ useTime = iota
+
+ // TODO(hpucha): implement other policies.
+ // Resolves conflicts by using the app conflict resolver callbacks via store.
+ useCallback
+)
+
+var (
+ // conflictResolutionPolicy is the policy used to resolve conflicts.
+ conflictResolutionPolicy = useTime
+)
+
+// resolutionType represents how a conflict is resolved.
+type resolutionType byte
+
+const (
+ pickLocal resolutionType = iota // local update was chosen as the resolution.
+ pickRemote // remote update was chosen as the resolution.
+ createNew // new update was created as the resolution.
+)
+
+// conflictResolution represents the state of a conflict resolution.
+type conflictResolution struct {
+ ty resolutionType
+ rec *localLogRec // Valid only if ty == createNew.
+ val []byte // Valid only if ty == createNew.
+}
+
+// resolveConflicts resolves conflicts for updated objects. Conflicts may be
+// resolved by adding new versions or picking either the local or the remote
+// version.
+func (iSt *initiationState) resolveConflicts(ctx *context.T) error {
+ for obj, st := range iSt.updObjects {
+ if !st.isConflict {
+ continue
+ }
+
+ // TODO(hpucha): Look up policy from the schema. Currently,
+ // hardcoded to time.
+ var err error
+ st.res, err = iSt.resolveObjConflict(ctx, obj, st.oldHead, st.newHead, st.ancestor)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// resolveObjConflict resolves a conflict for an object given its ID and the 3
+// versions that express the conflict: the object's local version, its remote
+// version (from the device contacted), and the common ancestor from which both
+// versions branched away. The function returns the new object value according
+// to the conflict resolution policy.
+func (iSt *initiationState) resolveObjConflict(ctx *context.T, oid, local, remote, ancestor string) (*conflictResolution, error) {
+ // Fetch the log records of the 3 object versions.
+ versions := []string{local, remote, ancestor}
+ lrecs, err := iSt.getLogRecsBatch(ctx, oid, versions)
+ if err != nil {
+ return nil, err
+ }
+
+ // Resolve the conflict according to the resolution policy.
+ switch conflictResolutionPolicy {
+ case useTime:
+ return iSt.resolveObjConflictByTime(ctx, oid, lrecs[0], lrecs[1], lrecs[2])
+ default:
+ return nil, verror.New(verror.ErrInternal, ctx, "unknown conflict resolution policy", conflictResolutionPolicy)
+ }
+}
+
+// resolveObjConflictByTime resolves conflicts using the timestamps of the
+// conflicting mutations. It picks a mutation with the larger timestamp,
+// i.e. the most recent update. If the timestamps are equal, it uses the
+// mutation version numbers as a tie-breaker, picking the mutation with the
+// larger version. Instead of creating a new version that resolves the
+// conflict, we pick an existing version as the conflict resolution.
+func (iSt *initiationState) resolveObjConflictByTime(ctx *context.T, oid string, local, remote, ancestor *localLogRec) (*conflictResolution, error) {
+ var res conflictResolution
+ switch {
+ case local.Metadata.UpdTime.After(remote.Metadata.UpdTime):
+ res.ty = pickLocal
+ case local.Metadata.UpdTime.Before(remote.Metadata.UpdTime):
+ res.ty = pickRemote
+ case local.Metadata.CurVers > remote.Metadata.CurVers:
+ res.ty = pickLocal
+ case local.Metadata.CurVers < remote.Metadata.CurVers:
+ res.ty = pickRemote
+ }
+
+ return &res, nil
+}
+
+// getLogRecsBatch gets the log records for an array of versions.
+func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
+ lrecs := make([]*localLogRec, len(versions))
+ var err error
+ for p, v := range versions {
+ lrecs[p], err = iSt.getLogRec(ctx, obj, v)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return lrecs, nil
+}
+
+// getLogRec returns the log record corresponding to a given object and its version.
+func (iSt *initiationState) getLogRec(ctx *context.T, obj, vers string) (*localLogRec, error) {
+ // TODO(hpucha): Maybe change the name of getLogrec in dag. We now
+ // have a few functions with this name.
+ logKey, err := getLogrec(ctx, iSt.tx, obj, vers)
+ if err != nil {
+ return nil, err
+ }
+ dev, gen, err := splitLogRecKey(ctx, logKey)
+ if err != nil {
+ return nil, err
+ }
+ return getLogRec(ctx, iSt.tx, dev, gen)
+}
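
The useTime ordering above can be summarized with a small worked example: the later UpdTime wins, and when timestamps are exactly equal the lexically larger CurVers breaks the tie. The values here are invented.

    package main

    import (
    	"fmt"
    	"time"

    	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
    )

    func main() {
    	t := time.Date(2015, 6, 1, 12, 0, 0, 0, time.UTC)
    	local := interfaces.LogRecMetadata{CurVers: "5", UpdTime: t}
    	remote := interfaces.LogRecMetadata{CurVers: "7", UpdTime: t}

    	// Mirrors resolveObjConflictByTime: equal timestamps, so the
    	// version string is the tie-breaker and the remote side wins.
    	switch {
    	case local.UpdTime.After(remote.UpdTime):
    		fmt.Println("pickLocal")
    	case local.UpdTime.Before(remote.UpdTime):
    		fmt.Println("pickRemote")
    	case local.CurVers > remote.CurVers:
    		fmt.Println("pickLocal")
    	case local.CurVers < remote.CurVers:
    		fmt.Println("pickRemote") // printed here, since "5" < "7"
    	}
    }
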
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index c3f135a..6177079 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -108,7 +108,7 @@
st := svc.St()
s := svc.sync
- oid, version := "1234", "7"
+ oid, version := "foo1", "7"
tx := st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, "haha", nil); err == nil {
@@ -236,7 +236,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -293,7 +293,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
graft, err := s.dagReplayCommands(nil, "remote-init-00.log.sync")
if err != nil {
@@ -335,7 +335,7 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -367,7 +367,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -411,10 +411,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -450,7 +450,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -494,13 +494,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+ if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -543,7 +543,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -587,13 +587,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:1" {
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+ if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -630,7 +630,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -674,10 +674,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
@@ -919,7 +919,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -989,7 +989,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -1049,7 +1049,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
@@ -1111,7 +1111,7 @@
st := svc.St()
s := svc.sync
- oid := "1234"
+ oid := "foo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
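
The updated expectations in these tests reflect the new log record key layout, which appears to encode the device id and generation under a "$sync:log" prefix (e.g. "$sync:log:10:3" for device 10, generation 3, matching the logRecKey/splitLogRecKey helpers used in the vsync package). A hypothetical encoder, for illustration only:

    package main

    import "fmt"

    // logRecKeySketch mimics the assumed "$sync:log:<deviceId>:<gen>" layout
    // seen in the expected values above; the real encoding is logRecKey in
    // the vsync package and may differ in details.
    func logRecKeySketch(id, gen uint64) string {
    	return fmt.Sprintf("$sync:log:%d:%d", id, gen)
    }
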
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index f836cb2..1624301 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -4,18 +4,59 @@
package vsync
-import "time"
+import (
+ "sort"
+ "strings"
+ "time"
-var (
- // peerSyncInterval is the duration between two consecutive
- // peer contacts. During every peer contact, the initiator
- // obtains any pending updates from that peer.
- peerSyncInterval = 50 * time.Millisecond
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/verror"
+ "v.io/x/lib/set"
+ "v.io/x/lib/vlog"
)
-// contactPeers wakes up every peerSyncInterval to select a peer, and
-// get updates from it.
-func (s *syncService) contactPeers() {
+// Policies to pick a peer to sync with.
+const (
+ // Picks a peer at random from the available set.
+ selectRandom = iota
+
+ // TODO(hpucha): implement other policies.
+ // Picks a peer with most differing generations.
+ selectMostDiff
+
+ // Picks a peer that was synced with the furthest in the past.
+ selectOldest
+)
+
+var (
+ // peerSyncInterval is the duration between two consecutive peer
+ // contacts. During every peer contact, the initiator obtains any
+ // pending updates from that peer.
+ peerSyncInterval = 50 * time.Millisecond
+
+ // peerSelectionPolicy is the policy used to select a peer when
+ // the initiator gets a chance to sync.
+ peerSelectionPolicy = selectRandom
+)
+
+// syncer wakes up every peerSyncInterval to do work: (1) Act as an initiator
+// for SyncGroup metadata by selecting a SyncGroup Admin, and syncing SyncGroup
+// metadata with it (getting updates from the remote peer, detecting and
+// resolving conflicts); (2) Refresh memberView if needed and act as an
+// initiator for data by selecting a peer, and syncing data corresponding to all
+// common SyncGroups across all Databases; (3) Act as a SyncGroup publisher to
+// publish pending SyncGroups; (4) Garbage collect older generations.
+//
+// TODO(hpucha): Currently only does initiation. Add rest.
+func (s *syncService) syncer(ctx *context.T) {
+ // TODO(hpucha): Do we need context per initiator round?
+ ctx, cancel := context.WithRootCancel(ctx)
+ defer cancel()
+
ticker := time.NewTicker(peerSyncInterval)
for {
select {
@@ -27,5 +68,735 @@
}
// Do work.
+ peer, err := s.pickPeer(ctx)
+ if err != nil {
+ continue
+ }
+ s.getDeltasFromPeer(ctx, peer)
+ }
+}
+
+// getDeltasFromPeer performs an initiation round with the specified
+// peer. An initiation round consists of:
+// * Contacting the peer to receive all the deltas based on the local genvector.
+// * Processing those deltas to discover objects which have been updated.
+// * Processing updated objects to detect and resolve any conflicts if needed.
+// * Communicating relevant object updates to the Database.
+// The processing of the deltas is done one Database at a time.
+func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
+ info := s.allMembers.members[peer]
+ if info == nil {
+ return
+ }
+ connected := false
+ var stream interfaces.SyncGetDeltasClientCall
+
+ // Sync each Database that may have SyncGroups common with this peer,
+ // one at a time.
+ for gdbName, sgInfo := range info.db2sg {
+
+ // Initialize initiation state for syncing this Database.
+ iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
+ if err != nil {
+ vlog.Errorf("getDeltasFromPeer:: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
+ continue
+ }
+
+ if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
+ vlog.Errorf("getDeltasFromPeer:: no SyncGroups found in common with peer %s, gdb %s", peer, gdbName)
+ continue
+ }
+
+ // Make contact with the peer once.
+ if !connected {
+ stream, connected = iSt.connectToPeer(ctx)
+ if !connected {
+ // Try a different Database. Perhaps there are
+ // different mount tables.
+ continue
+ }
+ }
+
+ // Create local genvec so that it contains knowledge only about common prefixes.
+ if err := iSt.createLocalGenVec(ctx); err != nil {
+ vlog.Errorf("getDeltasFromPeer:: error creating local genvec for gdb %s, err %v", gdbName, err)
+ continue
+ }
+
+ iSt.stream = stream
+ req := interfaces.DeltaReq{SgIds: iSt.sgIds, InitVec: iSt.local}
+ sender := iSt.stream.SendStream()
+ if err := sender.Send(req); err != nil {
+ vlog.Errorf("getDeltasFromPeer:: error sending delta request for gdb %s, err %v", gdbName, err)
+ // The stream is likely broken; don't attempt the next Database.
+ return
+ }
+
+ // Obtain deltas from the peer over the network.
+ if err := iSt.recvAndProcessDeltas(ctx); err != nil {
+ vlog.Errorf("getDeltasFromPeer:: error receiving deltas for gdb %s, err %v", gdbName, err)
+ // Returning here since something could be wrong with
+ // the connection, and no point in attempting the next
+ // Database.
+ return
+ }
+
+ if err := iSt.processUpdatedObjects(ctx); err != nil {
+ vlog.Errorf("getDeltasFromPeer:: error processing objects for gdb %s, err %v", gdbName, err)
+ // Move to the next Database even if processing updates
+ // failed.
+ continue
+ }
+ }
+
+ if connected {
+ stream.Finish()
+ }
+}
+
+// initiationState is accumulated for each Database during an initiation round.
+type initiationState struct {
+ // Relative name of the peer to sync with.
+ peer string
+
+ // Collection of mount tables that this peer may have registered with.
+ mtTables map[string]struct{}
+
+ // SyncGroups being requested in the initiation round.
+ sgIds map[interfaces.GroupId]struct{}
+
+ // SyncGroup prefixes being requested in the initiation round.
+ sgPfxs map[string]struct{}
+
+ // Local generation vector.
+ local interfaces.GenVector
+
+ // Generation vector from the remote peer.
+ remote interfaces.GenVector
+
+ // Updated local generation vector at the end of the initiation round.
+ updLocal interfaces.GenVector
+
+ // State to track updated objects during a log replay.
+ updObjects map[string]*objConflictState
+
+ // DAG state that tracks conflicts and common ancestors.
+ dagGraft graftMap
+
+ sync *syncService
+ appName string
+ dbName string
+ st store.Store // Store handle to the Database.
+ stream interfaces.SyncGetDeltasClientCall // Stream handle for the GetDeltas RPC.
+
+ // Transaction handle for the initiation round. Used during the update
+ // of objects in the Database.
+ tx store.Transaction
+}
+
+// objConflictState contains the conflict state for an object that is updated
+// during an initiator round.
+type objConflictState struct {
+ isConflict bool
+ newHead string
+ oldHead string
+ ancestor string
+ res *conflictResolution
+}
+
+// newInitiationState creates new initiation state.
+func newInitiationState(ctx *context.T, s *syncService, peer string, name string, sgInfo sgMemberInfo) (*initiationState, error) {
+ iSt := &initiationState{}
+ iSt.peer = peer
+ iSt.updObjects = make(map[string]*objConflictState)
+ iSt.dagGraft = newGraft()
+ iSt.sync = s
+
+ // TODO(hpucha): Would be nice to standardize on the combined "app:db"
+ // name across sync (not syncbase) so we only split/join them at the
+ // boundary with the store part.
+ var err error
+ iSt.appName, iSt.dbName, err = splitAppDbName(ctx, name)
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO(hpucha): nil rpc.ServerCall ok?
+ iSt.st, err = s.getDbStore(ctx, nil, iSt.appName, iSt.dbName)
+ if err != nil {
+ return nil, err
+ }
+
+ iSt.peerMtTblsAndSgInfo(ctx, peer, sgInfo)
+
+ return iSt, nil
+}
+
+// peerMtTblsAndSgInfo computes the possible mount tables, the SyncGroup Ids and
+// prefixes common with a remote peer in a particular Database by consulting the
+// SyncGroups in the specified Database.
+func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) {
+ iSt.mtTables = make(map[string]struct{})
+ iSt.sgIds = make(map[interfaces.GroupId]struct{})
+ iSt.sgPfxs = make(map[string]struct{})
+
+ for id := range info {
+ sg, err := getSyncGroupById(ctx, iSt.st, id)
+ if err != nil {
+ continue
+ }
+ if _, ok := sg.Joiners[peer]; !ok {
+ // Peer is no longer part of the SyncGroup.
+ continue
+ }
+ for _, mt := range sg.Spec.MountTables {
+ iSt.mtTables[mt] = struct{}{}
+ }
+ iSt.sgIds[id] = struct{}{}
+
+ for _, p := range sg.Spec.Prefixes {
+ iSt.sgPfxs[p] = struct{}{}
+ }
+ }
+}
+
+// connectToPeer attempts to connect to the remote peer using the mount tables
+// obtained from the SyncGroups being synced in the current Database.
+func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
+ if len(iSt.mtTables) < 1 {
+ vlog.Errorf("getDeltasFromPeer:: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
+ return nil, false
+ }
+ for mt := range iSt.mtTables {
+ c := interfaces.SyncClient(naming.Join(mt, iSt.peer))
+ stream, err := c.GetDeltas(ctx)
+ if err == nil {
+ return stream, true
+ }
+ }
+ return nil, false
+}
+
+// createLocalGenVec creates the generation vector with local knowledge for the
+// initiator to send to the responder.
+//
+// TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
+func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
+ // Freeze the most recent batch of local changes before fetching
+ // remote changes from a peer. This frozen state is used by the
+ // responder when responding to GetDeltas RPC.
+ //
+ // Only the initiator is allowed to freeze local generations (not
+ // responders or the watcher) in order to maintain a static baseline
+ // for the duration of a sync. This addresses the following race
+ // condition: If we allow responders to use newer local
+ // generations while the initiator is in progress, they may beat
+ // the initiator and send these new generations to remote
+ // devices. These remote devices in turn can send these
+ // generations back to the initiator in progress which was
+ // started with older generation information.
+ if err := iSt.sync.checkPtLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+ return err
+ }
+
+ local, lgen, err := iSt.sync.getDbGenInfo(ctx, iSt.appName, iSt.dbName)
+ if err != nil {
+ return err
+ }
+ localPfxs := extractAndSortPrefixes(local)
+
+ sgPfxs := set.String.ToSlice(iSt.sgPfxs)
+ sort.Strings(sgPfxs)
+
+ iSt.local = make(interfaces.GenVector)
+
+ if len(sgPfxs) == 0 {
+ return verror.New(verror.ErrInternal, ctx, "no syncgroups for syncing")
+ }
+
+ pfx := sgPfxs[0]
+ for _, p := range sgPfxs {
+ if strings.HasPrefix(p, pfx) && p != pfx {
+ continue
+ }
+
+ // Process this prefix as this is the start of a new set of
+ // nested prefixes.
+ pfx = p
+ var lpStart string
+ for _, lp := range localPfxs {
+ if !strings.HasPrefix(lp, pfx) && !strings.HasPrefix(pfx, lp) {
+ // No relationship with pfx.
+ continue
+ }
+ if strings.HasPrefix(pfx, lp) {
+ lpStart = lp
+ } else {
+ iSt.local[lp] = local[lp]
+ }
+ }
+ // Deal with the starting point.
+ if lpStart == "" {
+ // No matching prefixes for pfx were found.
+ iSt.local[pfx] = make(interfaces.PrefixGenVector)
+ iSt.local[pfx][iSt.sync.id] = lgen
+ } else {
+ iSt.local[pfx] = local[lpStart]
+ }
+ }
+ return nil
+}
+
+// recvAndProcessDeltas first receives the log records and generation vector
+// from the GetDeltas RPC and puts them in the Database. It also replays the
+// entire log stream as the log records arrive. These records span multiple
+// generations from different devices. It does not perform any conflict
+// resolution during replay. This avoids resolving conflicts that have already
+// been resolved by other devices.
+func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
+ // TODO(hpucha): This works for now, but figure out a long term solution
+ // as this may be implementation dependent. It currently works because
+ // the RecvStream call is stateless, and grabbing a handle to it
+ // repeatedly doesn't affect what data is seen next.
+ rcvr := iSt.stream.RecvStream()
+ start, finish := false, false
+ tx := iSt.st.NewTransaction()
+ committed := false
+
+ defer func() {
+ if !committed {
+ tx.Abort()
+ }
+ }()
+
+ // Track received batches.
+ batchMap := make(map[uint64]uint64)
+
+ for rcvr.Advance() {
+ resp := rcvr.Value()
+ switch v := resp.(type) {
+ case interfaces.DeltaRespStart:
+ if start {
+ return verror.New(verror.ErrInternal, ctx, "received start followed by start in delta response stream")
+ }
+ start = true
+
+ case interfaces.DeltaRespFinish:
+ if finish {
+ return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
+ }
+ finish = true
+
+ case interfaces.DeltaRespRespVec:
+ iSt.remote = v.Value
+
+ case interfaces.DeltaRespRec:
+ // Insert log record in Database.
+ // TODO(hpucha): Should we reserve more positions in a batch?
+ // TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
+ pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+ rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
+ batchId := rec.Metadata.BatchId
+ if batchId != NoBatchId {
+ if cnt, ok := batchMap[batchId]; !ok {
+ if iSt.sync.startBatch(ctx, tx, batchId) != batchId {
+ return verror.New(verror.ErrInternal, ctx, "failed to create batch info")
+ }
+ batchMap[batchId] = rec.Metadata.BatchCount
+ } else if cnt != rec.Metadata.BatchCount {
+ return verror.New(verror.ErrInternal, ctx, "inconsistent counts for tid", batchId, cnt, rec.Metadata.BatchCount)
+ }
+ }
+
+ if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
+ return err
+ }
+ // Mark object dirty.
+ iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
+ }
+
+ // Stop pulling from the stream once the Finish response for this
+ // Database has been seen; anything further on the stream belongs
+ // to the next Database's DeltaReq. (A bare break inside the type
+ // switch would only exit the switch, not this loop.)
+ if finish {
+ break
+ }
+ }
+
+ if !(start && finish) {
+ return verror.New(verror.ErrInternal, ctx, "didn't receive start/finish delimiters in delta response stream")
+ }
+
+ if err := rcvr.Err(); err != nil {
+ return err
+ }
+
+ // End the started batches if any.
+ for bid, cnt := range batchMap {
+ if err := iSt.sync.endBatch(ctx, tx, bid, cnt); err != nil {
+ return err
+ }
+ }
+
+ // Commit this transaction. We do not retry this transaction since it
+ // should not conflict with any other keys. So if it fails, it is a
+ // non-retriable error.
+ err := tx.Commit()
+ if err == nil {
+ committed = true
+ }
+ return err
+}
+
+// insertRecInLogDagAndDb adds a new log record to log and dag data structures,
+// and inserts the versioned value in the Database.
+func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error {
+ if err := putLogRec(ctx, tx, rec); err != nil {
+ return err
+ }
+
+ m := rec.Metadata
+ logKey := logRecKey(m.Id, m.Gen)
+
+ var err error
+ switch m.RecType {
+ case interfaces.NodeRec:
+ err = iSt.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
+ case interfaces.LinkRec:
+ err = iSt.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
+ default:
+ err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
+ }
+
+ if err != nil {
+ return err
+ }
+ // TODO(hpucha): Hack right now. Need to change Database's handling of
+ // deleted objects.
+ if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+ return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
+ }
+ return nil
+}
+
+// processUpdatedObjects processes all the updates received by the initiator,
+// one object at a time. For each updated object, we first check if the object
+// has any conflicts, resulting in three possibilities:
+//
+// * There is no conflict, and no updates are needed to the Database
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent version. This
+// occurs when conflicts are resolved by picking the existing local version.
+//
+// * There is no conflict, but a remote version is discovered that builds on the
+// local head (isConflict=false, newHead != oldHead). In this case, we generate
+// a Database update to simply update the Database to the latest value.
+//
+// * There is a conflict and we call into the app or use a well-known policy to
+// resolve the conflict, resulting in three possibilities: (a) conflict was
+// resolved by picking the local version. In this case, Database need not be
+// updated, but a link is added to record the choice. (b) conflict was resolved
+// by picking the remote version. In this case, Database is updated with the
+// remote version and a link is added as well. (c) conflict was resolved by
+// generating a new Database update. In this case, Database is updated with the
+// new version.
+//
+// We collect all the updates to the Database in a transaction. In addition, as
+// part of the same transaction, we update the log and dag state suitably (move
+// the head ptr of the object in the dag to the latest version, and create a new
+// log record reflecting conflict resolution if any). Finally, we update the
+// sync state first on storage. This transaction's commit can fail since
+// preconditions on the objects may have been violated. In this case, we wait to
+// get the latest versions of objects from the Database, and recheck if the object
+// has any conflicts and repeat the above steps, until the transaction commits
+// successfully. Upon commit, we also update the in-memory sync state of the
+// Database.
+func (iSt *initiationState) processUpdatedObjects(ctx *context.T) error {
+ // Note that the tx handle in initiation state is cached for the scope of
+ // this function only as different stages in the pipeline add to the
+ // transaction.
+ committed := false
+ defer func() {
+ if !committed {
+ iSt.tx.Abort()
+ }
+ }()
+
+ for {
+ iSt.tx = iSt.st.NewTransaction()
+
+ if err := iSt.detectConflicts(ctx); err != nil {
+ return err
+ }
+
+ if err := iSt.resolveConflicts(ctx); err != nil {
+ return err
+ }
+
+ if err := iSt.updateDbAndSyncSt(ctx); err != nil {
+ return err
+ }
+
+ err := iSt.tx.Commit()
+ if err == nil {
+ committed = true
+ // Update in-memory genvector since commit is successful.
+ if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
+ vlog.Fatalf("processUpdatedObjects:: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+ }
+ return nil
+ }
+
+ if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
+ return err
+ }
+
+ // TODO(hpucha): Sleeping and retrying is a temporary
+ // solution. Next iteration will have coordination with watch
+ // thread to intelligently retry. Hence this value is not a
+ // config param.
+ iSt.tx.Abort()
+ time.Sleep(1 * time.Second)
+ }
+}
+
+// detectConflicts iterates through all the updated objects to detect conflicts.
+func (iSt *initiationState) detectConflicts(ctx *context.T) error {
+ for obj, st := range iSt.updObjects {
+ // Check if object has a conflict.
+ var err error
+ st.isConflict, st.newHead, st.oldHead, st.ancestor, err = hasConflict(ctx, iSt.tx, obj, iSt.dagGraft)
+ if err != nil {
+ return err
+ }
+
+ if !st.isConflict {
+ st.res = &conflictResolution{ty: pickRemote}
+ }
+ }
+ return nil
+}
+
+// updateDbAndSyncSt updates the Database, and if that is successful, updates log,
+// dag and genvector data structures as needed.
+func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
+ for obj, st := range iSt.updObjects {
+ // If the local version is picked, no further updates to the
+ // Database are needed.
+ if st.res.ty == pickLocal {
+ continue
+ }
+
+ // If the remote version is picked or if a new version is
+ // created, we put it in the Database.
+
+ // TODO(hpucha): Hack right now. Need to change Database's
+ // handling of deleted objects.
+ oldVersDeleted := true
+ if st.oldHead != NoVersion {
+ oldDagNode, err := getNode(ctx, iSt.tx, obj, st.oldHead)
+ if err != nil {
+ return err
+ }
+ oldVersDeleted = oldDagNode.Deleted
+ }
+
+ var newVersion string
+ var newVersDeleted bool
+ switch st.res.ty {
+ case pickRemote:
+ newVersion = st.newHead
+ newDagNode, err := getNode(ctx, iSt.tx, obj, newVersion)
+ if err != nil {
+ return err
+ }
+ newVersDeleted = newDagNode.Deleted
+ case createNew:
+ newVersion = st.res.rec.Metadata.CurVers
+ newVersDeleted = st.res.rec.Metadata.Delete
+ }
+
+ // Skip delete followed by a delete.
+ if oldVersDeleted && newVersDeleted {
+ continue
+ }
+
+ if !oldVersDeleted {
+ // Read current version to enter it in the readset of the transaction.
+ version, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj))
+ if err != nil {
+ return err
+ }
+ if string(version) != st.oldHead {
+ return store.NewErrConcurrentTransaction(ctx)
+ }
+ } else {
+ // Ensure key doesn't exist.
+ if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return store.NewErrConcurrentTransaction(ctx)
+ }
+ }
+
+ if !newVersDeleted {
+ if st.res.ty == createNew {
+ if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(obj), st.res.val, []byte(newVersion)); err != nil {
+ return err
+ }
+ }
+ if err := watchable.PutVersion(ctx, iSt.tx, []byte(obj), []byte(newVersion)); err != nil {
+ return err
+ }
+ } else {
+ if err := iSt.tx.Delete([]byte(obj)); err != nil {
+ return err
+ }
+ }
+
+ if err := iSt.updateLogAndDag(ctx, obj); err != nil {
+ return err
+ }
+ }
+
+ return iSt.updateSyncSt(ctx)
+}
+
+// updateLogAndDag updates the log and dag data structures.
+func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
+ st, ok := iSt.updObjects[obj]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
+ }
+ var newVersion string
+
+ if !st.isConflict {
+ newVersion = st.newHead
+ } else {
+ // Object had a conflict. Create a log record to reflect resolution.
+ var rec *localLogRec
+
+ switch {
+ case st.res.ty == pickLocal:
+ // Local version was picked as the conflict resolution.
+ rec = iSt.createLocalLinkLogRec(ctx, obj, st.oldHead, st.newHead)
+ newVersion = st.oldHead
+ case st.res.ty == pickRemote:
+ // Remote version was picked as the conflict resolution.
+ rec = iSt.createLocalLinkLogRec(ctx, obj, st.newHead, st.oldHead)
+ newVersion = st.newHead
+ default:
+ // New version was created to resolve the conflict.
+ rec = st.res.rec
+ newVersion = st.res.rec.Metadata.CurVers
+ }
+
+ if err := putLogRec(ctx, iSt.tx, rec); err != nil {
+ return err
+ }
+
+ // Add a new DAG node.
+ var err error
+ m := rec.Metadata
+ switch m.RecType {
+ case interfaces.NodeRec:
+ err = iSt.sync.addNode(ctx, iSt.tx, obj, m.CurVers, logRecKey(m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
+ case interfaces.LinkRec:
+ err = iSt.sync.addParent(ctx, iSt.tx, obj, m.CurVers, m.Parents[0], nil)
+ default:
+ return verror.New(verror.ErrInternal, ctx, "unknown log record type")
+ }
+ if err != nil {
+ return err
+ }
+ }
+
+ // Move the head. This should be idempotent. When the local version wins
+ // the conflict, the head is "moved" to the current local head.
+ return moveHead(ctx, iSt.tx, obj, newVersion)
+}
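+
+// For example, with a conflict between local head v3 and remote head v6
+// that is resolved as pickRemote, the link record has CurVers=6 and
+// Parents=[3]: no new version is created, v6 is simply blessed by gaining
+// v3 as a parent, and the head moves to v6.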
+
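+// createLocalLinkLogRec creates a local link log record that blesses the
+// winning version "vers" of object "obj" by recording "par" as its parent.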
+func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
+ gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+
+ rec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ Id: iSt.sync.id,
+ Gen: gen,
+ RecType: interfaces.LinkRec,
+
+ ObjId: obj,
+ CurVers: vers,
+ Parents: []string{par},
+ UpdTime: time.Now().UTC(),
+ BatchId: NoBatchId,
+ BatchCount: 1,
+ // TODO(hpucha): What is its batchid and count?
+ },
+ Pos: pos,
+ }
+ return rec
+}
+
+// updateSyncSt updates local sync state at the end of an initiator cycle.
+func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
+ // Get the current local sync state.
+ dsInMem, err := iSt.sync.getDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+ if err != nil {
+ return err
+ }
+ ds := &dbSyncState{
+ Gen: dsInMem.gen,
+ CkPtGen: dsInMem.ckPtGen,
+ GenVec: dsInMem.genvec,
+ }
+
+ // The remote genvector's prefixes may be a subset of the local genvector's prefixes.
+ for rpfx, respgv := range iSt.remote {
+ for lpfx, lpgv := range ds.GenVec {
+ if strings.HasPrefix(lpfx, rpfx) {
+ mergePrefixGenVectors(lpgv, respgv)
+ }
+ }
+ if _, ok := ds.GenVec[rpfx]; !ok {
+ ds.GenVec[rpfx] = respgv
+ }
+ }
+
+ iSt.updLocal = ds.GenVec
+ // Clean the genvector of any local state. Note that the local state is
+ // held in gen/ckPtGen in the sync state struct.
+ for _, pgv := range iSt.updLocal {
+ delete(pgv, iSt.sync.id)
+ }
+
+ // TODO(hpucha): Add knowledge compaction.
+
+ return putDbSyncState(ctx, iSt.tx, ds)
+}
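+
+// For example, if the responder's genvec is {"foo": {11:3}} and the local
+// genvec has prefixes "foo1" and "bar", the "foo" entry is merged into
+// "foo1" (since "foo" is its prefix) and then added as a new "foo" entry,
+// while "bar" is left untouched.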
+
+// mergePrefixGenVectors merges responder prefix genvector into local genvector.
+func mergePrefixGenVectors(lpgv, respgv interfaces.PrefixGenVector) {
+ for devid, rgen := range respgv {
+ gen, ok := lpgv[devid]
+ if !ok || gen < rgen {
+ lpgv[devid] = rgen
+ }
+ }
+}
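+
+// For example, merging respgv {11:7, 12:1} into lpgv {10:5, 11:2} yields
+// {10:5, 11:7, 12:1}: an element-wise max, with devices missing from lpgv
+// treated as generation 0.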
+
+////////////////////////////////////////
+// Peer selection policies.
+
+// pickPeer picks a Syncbase to sync with.
+func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+ switch peerSelectionPolicy {
+ case selectRandom:
+ members := s.getMembers(ctx)
+ // Remove myself from the set.
+ delete(members, s.name)
+ if len(members) == 0 {
+ return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
+ }
+
+ // Pick a peer at random.
+ ind := randIntn(len(members))
+ for m := range members {
+ if ind == 0 {
+ return m, nil
+ }
+ ind--
+ }
+ return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+ default:
+ return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
}
}
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
new file mode 100644
index 0000000..83fc8f4
--- /dev/null
+++ b/services/syncbase/vsync/initiator_test.go
@@ -0,0 +1,295 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
+)
+
+// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
+// in file testdata/remote-init-00.log.sync.
+func TestLogStreamRemoteOnly(t *testing.T) {
+ svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync")
+ defer cleanup(t, svc)
+
+ // Check all log records.
+ objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+ var gen uint64
+ var parents []string
+ for gen = 1; gen < 4; gen++ {
+ gotRec, err := getLogRec(nil, svc.St(), 11, gen)
+ if err != nil || gotRec == nil {
+ t.Fatalf("getLogRec can not find object 11 %d, err %v", gen, err)
+ }
+ vers := fmt.Sprintf("%d", gen)
+ wantRec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ Id: 11,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ ObjId: objid,
+ CurVers: vers,
+ Parents: parents,
+ UpdTime: constTime,
+ BatchCount: 1,
+ },
+ Pos: gen - 1,
+ }
+
+ if !reflect.DeepEqual(gotRec, wantRec) {
+ t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
+ }
+ // Verify DAG state.
+ if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
+ t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
+ }
+ // Verify Database state.
+ tx := svc.St().NewTransaction()
+ if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
+ t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
+ }
+ tx.Abort()
+ parents = []string{vers}
+ }
+
+ // Verify conflict state.
+ if len(iSt.updObjects) != 1 {
+ t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+ }
+ st := iSt.updObjects[objid]
+ if st.isConflict {
+ t.Fatalf("Detected a conflict %v", st)
+ }
+ if st.newHead != "3" || st.oldHead != NoVersion {
+ t.Fatalf("Conflict detection didn't succeed %v", st)
+ }
+
+ // Verify genvec state.
+ wantVec := interfaces.GenVector{
+ "foo1": interfaces.PrefixGenVector{11: 3},
+ "bar": interfaces.PrefixGenVector{11: 0},
+ }
+ if !reflect.DeepEqual(iSt.updLocal, wantVec) {
+ t.Fatalf("Final local gen vec mismatch got %v, want %v", iSt.updLocal, wantVec)
+ }
+
+ // Verify DAG state.
+ if head, err := getHead(nil, svc.St(), objid); err != nil || head != "3" {
+ t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+ }
+
+ // Verify Database state.
+ valbuf, err := svc.St().Get([]byte(objid), nil)
+ if err != nil || string(valbuf) != "abc" {
+ t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+ }
+ tx := svc.St().NewTransaction()
+ version, err := watchable.GetVersion(nil, tx, []byte(objid))
+ if err != nil || string(version) != "3" {
+ t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(version), err)
+ }
+ tx.Abort()
+}
+
+// TestLogStreamNoConflict tests that a local and a remote log stream can be
+// correctly applied (when there are no conflicts). Commands are in files
+// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
+func TestLogStreamNoConflict(t *testing.T) {
+ svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync")
+ defer cleanup(t, svc)
+
+ objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+ // Check all log records.
+ var version uint64 = 1
+ var parents []string
+ for _, devid := range []uint64{10, 11} {
+ var gen uint64
+ for gen = 1; gen < 4; gen++ {
+ gotRec, err := getLogRec(nil, svc.St(), devid, gen)
+ if err != nil || gotRec == nil {
+ t.Fatalf("getLogRec can not find object %d:%d, err %v",
+ devid, gen, err)
+ }
+ vers := fmt.Sprintf("%d", version)
+ wantRec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ Id: devid,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ ObjId: objid,
+ CurVers: vers,
+ Parents: parents,
+ UpdTime: constTime,
+ BatchCount: 1,
+ },
+ Pos: gen - 1,
+ }
+
+ if !reflect.DeepEqual(gotRec, wantRec) {
+ t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
+ }
+
+ // Verify DAG state.
+ if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
+ t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
+ }
+ // Verify Database state.
+ tx := svc.St().NewTransaction()
+ if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
+ t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
+ }
+ tx.Abort()
+ parents = []string{vers}
+ version++
+ }
+ }
+
+ // Verify conflict state.
+ if len(iSt.updObjects) != 1 {
+ t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+ }
+ st := iSt.updObjects[objid]
+ if st.isConflict {
+ t.Fatalf("Detected a conflict %v", st)
+ }
+ if st.newHead != "6" || st.oldHead != "3" {
+ t.Fatalf("Conflict detection didn't succeed %v", st)
+ }
+
+ // Verify genvec state.
+ wantVec := interfaces.GenVector{
+ "foo1": interfaces.PrefixGenVector{11: 3},
+ "bar": interfaces.PrefixGenVector{11: 0},
+ }
+ if !reflect.DeepEqual(iSt.updLocal, wantVec) {
+ t.Fatalf("Final local gen vec failed got %v, want %v", iSt.updLocal, wantVec)
+ }
+
+ // Verify DAG state.
+ if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+ t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+ }
+
+ // Verify Database state.
+ valbuf, err := svc.St().Get([]byte(objid), nil)
+ if err != nil || string(valbuf) != "abc" {
+ t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+ }
+ tx := svc.St().NewTransaction()
+ versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+ if err != nil || string(versbuf) != "6" {
+ t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+ }
+ tx.Abort()
+}
+
+//////////////////////////////
+// Helpers.
+
+func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
+ // Set a large value to prevent the internal threads from firing.
+ // The test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ conflictResolutionPolicy = useTime
+ svc := createService(t)
+ cleanup := destroyService
+ s := svc.sync
+ s.id = 10 // initiator
+
+ sgId1 := interfaces.GroupId(1234)
+ nullInfo := nosql.SyncGroupMemberInfo{}
+ sgInfo := sgMemberInfo{
+ sgId1: nullInfo,
+ }
+
+ sg1 := &interfaces.SyncGroup{
+ Name: "sg1",
+ Id: sgId1,
+ AppName: "mockapp",
+ DbName: "mockdb",
+ Creator: "mockCreator",
+ SpecVersion: "etag-0",
+ Spec: nosql.SyncGroupSpec{
+ Prefixes: []string{"foo", "bar"},
+ MountTables: []string{"1/2/3/4", "5/6/7/8"},
+ },
+ Joiners: map[string]nosql.SyncGroupMemberInfo{
+ "a": nullInfo,
+ "b": nullInfo,
+ },
+ }
+
+ tx := svc.St().NewTransaction()
+ if err := addSyncGroup(nil, tx, sg1); err != nil {
+ t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit adding SyncGroup ID %d, err %v", sg1.Id, err)
+ }
+
+ if lfile != "" {
+ replayLocalCommands(t, svc, lfile)
+ }
+
+ if rfile == "" {
+ return svc, nil, cleanup
+ }
+
+ gdb := appDbName("mockapp", "mockdb")
+ iSt, err := newInitiationState(nil, s, "b", gdb, sgInfo)
+ if err != nil {
+ t.Fatalf("newInitiationState failed with err %v", err)
+ }
+
+ testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
+ testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
+
+ s.syncState[gdb] = &dbSyncStateInMem{}
+
+ // Create local genvec so that it contains knowledge only about common prefixes.
+ if err := iSt.createLocalGenVec(nil); err != nil {
+ t.Fatalf("createLocalGenVec failed with err %v", err)
+ }
+
+ wantVec := interfaces.GenVector{
+ "foo": interfaces.PrefixGenVector{10: 0},
+ "bar": interfaces.PrefixGenVector{10: 0},
+ }
+ if !reflect.DeepEqual(iSt.local, wantVec) {
+ t.Fatalf("createLocalGenVec failed got %v, want %v", iSt.local, wantVec)
+ }
+
+ iSt.stream = createReplayStream(t, rfile)
+
+ if err := iSt.recvAndProcessDeltas(nil); err != nil {
+ t.Fatalf("recvAndProcessDeltas failed with err %v", err)
+ }
+
+ if err := iSt.processUpdatedObjects(nil); err != nil {
+ t.Fatalf("processUpdatedObjects failed with err %v", err)
+ }
+ return svc, iSt, cleanup
+}
+
+func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
+ if len(a) != len(m) {
+ t.Fatalf("testIfMapArrEqual diff lengths, got %v want %v", a, m)
+ }
+
+ for _, p := range a {
+ if _, ok := m[p]; !ok {
+ t.Fatalf("testIfMapArrEqual want %v", p)
+ }
+ }
+}
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index 470d086..98b7d20 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -10,11 +10,17 @@
import (
"bufio"
+ "container/list"
"fmt"
"os"
"strconv"
"strings"
+ "testing"
+ "time"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/v23/context"
)
@@ -23,15 +29,23 @@
addRemote
linkLocal
linkRemote
+ genvec
+)
+
+var (
+ constTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
)
type syncCommand struct {
- cmd int
- oid string
- version string
- parents []string
- logrec string
- deleted bool
+ cmd int
+ oid string
+ version string
+ parents []string
+ logrec string
+ deleted bool
+ batchId uint64
+ batchCount uint64
+ genVec interfaces.GenVector
}
// parseSyncCommands parses a sync test file and returns its commands.
@@ -70,16 +84,26 @@
}
}
+ batchId, err := strconv.ParseUint(args[6], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid batchId: %s", file, lineno, args[6])
+ }
+ batchCount, err := strconv.ParseUint(args[7], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid batch count: %s", file, lineno, args[7])
+ }
del, err := strconv.ParseBool(args[8])
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
}
cmd := syncCommand{
- oid: args[1],
- version: args[2],
- parents: parents,
- logrec: args[5],
- deleted: del,
+ oid: args[1],
+ version: args[2],
+ parents: parents,
+ logrec: args[5],
+ batchId: batchId,
+ batchCount: batchCount,
+ deleted: del,
}
if args[0] == "addl" {
cmd.cmd = addLocal
@@ -116,6 +140,33 @@
}
cmds = append(cmds, cmd)
+ case "genvec":
+ cmd := syncCommand{
+ cmd: genvec,
+ genVec: make(interfaces.GenVector),
+ }
+ for i := 1; i < len(args); i = i + 2 {
+ pfx := args[i]
+ genVec := make(interfaces.PrefixGenVector)
+ for _, elem := range strings.Split(args[i+1], ",") {
+ kv := strings.Split(elem, ":")
+ if len(kv) != 2 {
+ return nil, fmt.Errorf("%s:%d: invalid gen vector key/val: %s", file, lineno, elem)
+ }
+ dev, err := strconv.ParseUint(kv[0], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid devid: %s", file, lineno, args[i+1])
+ }
+ gen, err := strconv.ParseUint(kv[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid gen: %s", file, lineno, args[i+1])
+ }
+ genVec[dev] = gen
+ }
+ cmd.genVec[pfx] = genVec
+ }
+ cmds = append(cmds, cmd)
+
default:
return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
}
@@ -179,3 +230,163 @@
return graft, nil
}
+
+// dummyStream emulates the stream of log records received over RPC.
+type dummyStream struct {
+ l *list.List
+ entry interfaces.DeltaResp
+}
+
+func newStream() *dummyStream {
+ ds := &dummyStream{
+ l: list.New(),
+ }
+ return ds
+}
+
+func (ds *dummyStream) add(entry interfaces.DeltaResp) {
+ ds.l.PushBack(entry)
+}
+
+func (ds *dummyStream) Advance() bool {
+ if ds.l.Len() > 0 {
+ ds.entry = ds.l.Remove(ds.l.Front()).(interfaces.DeltaResp)
+ return true
+ }
+ return false
+}
+
+func (ds *dummyStream) Value() interfaces.DeltaResp {
+ return ds.entry
+}
+
+func (ds *dummyStream) RecvStream() interface {
+ Advance() bool
+ Value() interfaces.DeltaResp
+ Err() error
+} {
+ return ds
+}
+
+func (*dummyStream) Err() error { return nil }
+
+func (ds *dummyStream) Finish() error {
+ return nil
+}
+
+func (ds *dummyStream) Cancel() {
+}
+
+func (ds *dummyStream) SendStream() interface {
+ Send(item interfaces.DeltaReq) error
+ Close() error
+} {
+ return ds
+}
+
+func (ds *dummyStream) Send(item interfaces.DeltaReq) error {
+ return nil
+}
+
+func (ds *dummyStream) Close() error {
+ return nil
+}
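+
+// dummyStream stands in for the SyncGetDeltasClientCall that the initiator
+// consumes. A minimal sketch of how a consumer drains such a stream (the
+// real recvAndProcessDeltas also reacts to the Start/Finish markers):
+//
+//    rs := stream.RecvStream()
+//    for rs.Advance() {
+//        switch rs.Value().(type) {
+//        case interfaces.DeltaRespStart:
+//            // Beginning of one Database's response.
+//        case interfaces.DeltaRespRec:
+//            // One missing log record to apply to the log and DAG.
+//        case interfaces.DeltaRespRespVec:
+//            // The responder's genvector.
+//        case interfaces.DeltaRespFinish:
+//            // End of this Database's response.
+//        }
+//    }
+//    if err := rs.Err(); err != nil {
+//        // Handle the stream error.
+//    }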
+
+// replayLocalCommands replays local log records parsed from the input file.
+func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
+ cmds, err := parseSyncCommands(syncfile)
+ if err != nil {
+ t.Fatalf("parseSyncCommands failed with err %v", err)
+ }
+
+ tx := s.St().NewTransaction()
+ var pos uint64
+ for _, cmd := range cmds {
+ switch cmd.cmd {
+ case addLocal:
+ rec := &localLogRec{
+ Metadata: createMetadata(t, interfaces.NodeRec, cmd),
+ Pos: pos,
+ }
+ err = s.sync.processLocalLogRec(nil, tx, rec)
+ if err != nil {
+ t.Fatalf("processLocalLogRec failed with err %v", err)
+ }
+
+ // Add to Store.
+ err = watchable.PutVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte(rec.Metadata.CurVers))
+ if err != nil {
+ t.Fatalf("PutVersion failed with err %v", err)
+ }
+ err = watchable.PutAtVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte("abc"), []byte(rec.Metadata.CurVers))
+ if err != nil {
+ t.Fatalf("PutAtVersion failed with err %v", err)
+ }
+
+ default:
+ t.Fatalf("replayLocalCommands failed with unknown command %v", cmd)
+ }
+ pos++
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit local log records %s, err %v", syncfile, err)
+ }
+}
+
+// createReplayStream creates a dummy stream of log records parsed from the input file.
+func createReplayStream(t *testing.T, syncfile string) *dummyStream {
+ cmds, err := parseSyncCommands(syncfile)
+ if err != nil {
+ t.Fatalf("parseSyncCommands failed with err %v", err)
+ }
+
+ stream := newStream()
+ start := interfaces.DeltaRespStart{true}
+ stream.add(start)
+
+ for _, cmd := range cmds {
+ var ty byte
+ switch cmd.cmd {
+ case genvec:
+ gv := interfaces.DeltaRespRespVec{cmd.genVec}
+ stream.add(gv)
+ continue
+ case addRemote:
+ ty = interfaces.NodeRec
+ case linkRemote:
+ ty = interfaces.LinkRec
+ default:
+ t.Fatalf("createReplayStream unknown command %v", cmd)
+ }
+
+ rec := interfaces.DeltaRespRec{interfaces.LogRec{
+ Metadata: createMetadata(t, ty, cmd),
+ Value: []byte("abc"),
+ }}
+
+ stream.add(rec)
+ }
+ fin := interfaces.DeltaRespFinish{true}
+ stream.add(fin)
+ return stream
+}
+
+func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
+ id, gen, err := splitLogRecKey(nil, cmd.logrec)
+ if err != nil {
+ t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
+ }
+ m := interfaces.LogRecMetadata{
+ Id: id,
+ Gen: gen,
+ RecType: ty,
+ ObjId: util.JoinKeyParts(util.RowPrefix, cmd.oid),
+ CurVers: cmd.version,
+ Parents: cmd.parents,
+ UpdTime: constTime,
+ Delete: cmd.deleted,
+ BatchId: cmd.batchId,
+ BatchCount: cmd.batchCount,
+ }
+ return m
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 775d29f..ca2e3fc 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -82,6 +82,13 @@
return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
}
+// randIntn returns, as an int, a non-negative pseudo-random number in [0,n).
+func randIntn(n int) int {
+ rngLock.Lock()
+ defer rngLock.Unlock()
+ return rng.Intn(n)
+}
+
// New creates a new sync module.
//
// Concurrency: sync initializes two goroutines at startup: a "watcher" and an
@@ -125,11 +132,18 @@
go s.watchStore()
// Start initiator thread to periodically get deltas from peers.
- go s.contactPeers()
+ go s.syncer(ctx)
return s, nil
}
+// Close cleans up sync state.
+// TODO(hpucha): Hook it up to server shutdown of syncbased.
+func (s *syncService) Close() {
+ close(s.closed)
+ s.pending.Wait()
+}
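+
+// A shutdown-aware loop in the syncer would then select on the closed
+// channel; a sketch only, assuming the syncer registered itself with
+// s.pending:
+//
+//    func (s *syncService) syncer(ctx *context.T) {
+//        defer s.pending.Done()
+//        ticker := time.NewTicker(peerSyncInterval)
+//        defer ticker.Stop()
+//        for {
+//            select {
+//            case <-s.closed:
+//                return
+//            case <-ticker.C:
+//                // Pick a peer and get deltas from it.
+//            }
+//        }
+//    }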
+
func NewSyncDatabase(db interfaces.Database) *syncDatabase {
return &syncDatabase{db: db}
}
@@ -137,7 +151,7 @@
////////////////////////////////////////
// Core sync method.
-func (s *syncService) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
return verror.NewErrNotImplemented(ctx)
}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index fed4c9a..f025327 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -34,12 +34,12 @@
"container/heap"
"fmt"
"sort"
+ "strconv"
"strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
-
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
@@ -47,9 +47,11 @@
// dbSyncStateInMem represents the in-memory sync state of a Database.
type dbSyncStateInMem struct {
- gen uint64
- pos uint64
- genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+ gen uint64
+ pos uint64
+
+ ckPtGen uint64
+ genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
}
// initSync initializes the sync module during startup. It scans all the
@@ -136,6 +138,44 @@
return gen, pos
}
+// checkPtLocalGen freezes the local generation number for the responder's use.
+func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ ds.ckPtGen = ds.gen
+ return nil
+}
+
+// getDbSyncStateInMem returns a copy of the current in-memory sync state of the Database.
+func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ dsCopy := &dbSyncStateInMem{
+ gen: ds.gen,
+ pos: ds.pos,
+ ckPtGen: ds.ckPtGen,
+ }
+
+ // Make a copy of the genvec.
+ dsCopy.genvec = copyGenVec(ds.genvec)
+
+ return dsCopy, nil
+}
+
// getDbGenInfo returns a copy of the current generation information of the Database.
func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
s.syncStateLock.Lock()
@@ -146,16 +186,33 @@
if !ok {
return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
+
// Make a copy of the genvec.
- genvec := make(interfaces.GenVector)
- for p, dspgv := range ds.genvec {
- pgv := make(interfaces.PrefixGenVector)
- for id, gen := range dspgv {
- pgv[id] = gen
- }
- genvec[p] = pgv
+ genvec := copyGenVec(ds.genvec)
+
+ // Add local generation information to the genvec.
+ for _, gv := range genvec {
+ gv[s.id] = ds.ckPtGen
}
- return genvec, ds.gen, nil
+
+ return genvec, ds.ckPtGen, nil
+}
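+
+// For example, with s.id = 10, ckPtGen = 5, and ds.genvec =
+// {"foo": {11:3}}, getDbGenInfo returns ({"foo": {11:3, 10:5}}, 5): the
+// frozen local generation is folded into every prefix entry handed to the
+// responder.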
+
+// putDbGenInfoRemote puts the current remote generation information of the Database.
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ // Make a copy of the genvec.
+ ds.genvec = copyGenVec(genvec)
+
+ return nil
}
// appDbName combines the app and db names to return a globally unique name for
@@ -165,6 +222,28 @@
return util.JoinKeyParts(appName, dbName)
}
+// splitAppDbName is the inverse of appDbName and returns the app and db names
+// from a globally unique Database name.
+func splitAppDbName(ctx *context.T, name string) (string, string, error) {
+ parts := util.SplitKeyParts(name)
+ if len(parts) != 2 {
+ return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name)
+ }
+ return parts[0], parts[1], nil
+}
+
+func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
+ genvec := make(interfaces.GenVector)
+ for p, inpgv := range in {
+ pgv := make(interfaces.PrefixGenVector)
+ for id, gen := range inpgv {
+ pgv[id] = gen
+ }
+ genvec[p] = pgv
+ }
+ return genvec
+}
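+
+// The nested copy matters: Go maps are reference types, so copying only the
+// outer map would leave both GenVectors sharing the same PrefixGenVector
+// maps, and a later mergePrefixGenVectors on one copy would silently mutate
+// the other.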
+
////////////////////////////////////////////////////////////
// Low-level utility functions to access sync state.
@@ -204,6 +283,24 @@
return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
}
+// splitLogRecKey is the inverse of logRecKey and returns the device id and generation number.
+func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+ parts := util.SplitKeyParts(key)
+ verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+ if len(parts) != 4 {
+ return 0, 0, verr
+ }
+ id, err := strconv.ParseUint(parts[2], 10, 64)
+ if err != nil {
+ return 0, 0, verr
+ }
+ gen, err := strconv.ParseUint(parts[3], 10, 64)
+ if err != nil {
+ return 0, 0, verr
+ }
+ return id, gen, nil
+}
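+
+// For example, the testdata key "$sync:log:11:2" splits into device id 11
+// and generation 2. Note that logRecKey formats the id and gen in hex, so
+// the decimal parse here matches the testdata convention rather than being
+// a strict inverse for ids or gens above 9.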
+
// hasLogRec returns true if the log record for (devid, gen) exists.
func hasLogRec(st store.StoreReader, id, gen uint64) bool {
// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
@@ -399,9 +496,6 @@
// prefix genvectors to adjust the delta bound and
// include in outVec.
respgv = respVec[rp]
- // Add local generation information to
- // responder's genvec and returned genvec.
- respgv[s.id] = respGen
s.diffPrefixGenVectors(respgv, initpgv, diff)
outVec[rp] = respgv
}
@@ -411,10 +505,10 @@
if rpStart == "" {
// No matching prefixes for pfx were found.
respgv = make(interfaces.PrefixGenVector)
+ respgv[s.id] = respGen
} else {
respgv = respVec[rpStart]
}
- respgv[s.id] = respGen
s.diffPrefixGenVectors(respgv, initpgv, diff)
outVec[pfx] = respgv
}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 4a41e17..57c1f66 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -21,6 +21,7 @@
// Database log.
func TestReserveGenAndPos(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
s := svc.sync
var wantGen, wantPos uint64 = 1, 0
@@ -42,6 +43,7 @@
// TestPutGetDbSyncState tests setting and getting sync metadata.
func TestPutGetDbSyncState(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
checkDbSyncState(t, st, false, nil)
@@ -67,6 +69,7 @@
// TestPutGetDelLogRec tests setting, getting, and deleting a log record.
func TestPutGetDelLogRec(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
var id uint64 = 10
@@ -113,6 +116,7 @@
// TestDiffPrefixGenVectors tests diffing prefix gen vectors.
func TestDiffPrefixGenVectors(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
s := svc.sync
s.id = 10 //responder. Initiator is id 11.
@@ -437,7 +441,7 @@
s.id = 10 //responder.
wantDiff, wantVec := test.genDiff, test.outVec
- s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, genvec: test.respVec}
+ s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
gotDiff, gotVec, err := s.computeDeltaBound(nil, appName, dbName, test.initVec)
if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
@@ -487,6 +491,8 @@
t.Fatalf("sendDeltasPerDatabase failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
}
ts.diffLogRecs(t, wantRecs)
+
+ destroyService(t, svc)
}
}
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 31ad4c2..c37dba7 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -9,6 +9,7 @@
import (
"reflect"
"testing"
+ "time"
"v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -41,6 +42,8 @@
// TestAddSyncGroup tests adding SyncGroups.
func TestAddSyncGroup(t *testing.T) {
+ // Set a large value to prevent the threads from firing.
+ peerSyncInterval = 1 * time.Hour
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
@@ -180,6 +183,8 @@
// TestInvalidAddSyncGroup tests adding SyncGroups.
func TestInvalidAddSyncGroup(t *testing.T) {
+ // Set a large value to prevent the threads from firing.
+ peerSyncInterval = 1 * time.Hour
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
@@ -214,6 +219,8 @@
// TestDeleteSyncGroup tests deleting a SyncGroup.
func TestDeleteSyncGroup(t *testing.T) {
+ // Set a large value to prevent the threads from firing.
+ peerSyncInterval = 1 * time.Hour
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
@@ -300,6 +307,8 @@
// TestMultiSyncGroups tests creating multiple SyncGroups.
func TestMultiSyncGroups(t *testing.T) {
+ // Set a large value to prevent the threads from firing.
+ peerSyncInterval = 1 * time.Hour
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
index 46b3502..7435348 100644
--- a/services/syncbase/vsync/testdata/local-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -1,6 +1,6 @@
# Create an object locally and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addl|1234|1|||logrec-00|0|1|false
-addl|1234|2|1||logrec-01|0|1|false
-addl|1234|3|2||logrec-02|0|1|false
+addl|foo1|1|||$sync:log:10:1|0|1|false
+addl|foo1|2|1||$sync:log:10:2|0|1|false
+addl|foo1|3|2||$sync:log:10:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-resolve-00.sync b/services/syncbase/vsync/testdata/local-resolve-00.sync
index 2c87de5..1666cf0 100644
--- a/services/syncbase/vsync/testdata/local-resolve-00.sync
+++ b/services/syncbase/vsync/testdata/local-resolve-00.sync
@@ -1,4 +1,4 @@
# Create an object locally and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addl|1234|7|3|6|logrec-06|0|1|false
+addl|foo1|7|3|6|logrec-06|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
index 1f9bb5b..060cf0c 100644
--- a/services/syncbase/vsync/testdata/remote-conf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -3,6 +3,6 @@
# it from the local sync at v2, then updated separately).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|2||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|2||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
index 9581f69..2053157 100644
--- a/services/syncbase/vsync/testdata/remote-conf-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -5,6 +5,6 @@
# sees 2 graft points: v1-v4 and v2-v5.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|1||VeyronLaptop:10:1:0|0|1|false
-addr|1234|5|2|4|VeyronPhone:10:1:0|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:1|0|1|false
+addr|foo1|4|1||$sync:log:12:1|0|1|false
+addr|foo1|5|2|4|$sync:log:11:1|0|1|false
+addr|foo1|6|5||$sync:log:11:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
index c293656..673405e 100644
--- a/services/syncbase/vsync/testdata/remote-conf-03.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -1,6 +1,6 @@
# Create the same object remotely from scratch and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-link.log.sync b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
index a324e4f..bdf0331 100644
--- a/services/syncbase/vsync/testdata/remote-conf-link.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|4|2||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|4|2||$sync:log:11:2
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
index 9795d53..2546c47 100644
--- a/services/syncbase/vsync/testdata/remote-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -1,6 +1,7 @@
# Create an object remotely and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|1|||VeyronPhone:10:1:0|0|1|false
-addr|1234|2|1||VeyronPhone:10:1:1|0|1|false
-addr|1234|3|2||VeyronPhone:10:1:2|0|1|false
+addr|foo1|1|||$sync:log:11:1|0|1|false
+addr|foo1|2|1||$sync:log:11:2|0|1|false
+addr|foo1|3|2||$sync:log:11:3|0|1|false
+genvec|foo1|10:0,11:3|bar|11:0
\ No newline at end of file
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
index e2e2afa..6adf5dd 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -3,6 +3,7 @@
# received it from the local sync first, then updated it).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|3||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|3||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
+genvec|foo1|10:0,11:3|bar|11:0
\ No newline at end of file
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
index 6945bb2..a06bec5 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the remote version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|2|4||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|2|4||$sync:log:11:2
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
index 0c6969e..1271e23 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|4|3||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|4|3||$sync:log:11:2
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
index df9e128..890d2bc 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
@@ -1,6 +1,6 @@
# Update an object remotely, detect conflict, and bless the remote version, and continue updating.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|3|4||VeyronPhone:10:1:1
-addr|1234|5|3||VeyronPhone:10:2:0|0|1|false
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|3|4||$sync:log:11:2
+addr|foo1|5|3||$sync:log:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
index 82e11c6..31e85a9 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
@@ -1,4 +1,4 @@
# Resolve the same conflict on two different devices.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-linkr|1234|3|4||VeyronLaptop:10:1:0
+linkr|foo1|3|4||$sync:log:12:1
\ No newline at end of file
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index d061f4a..226f739 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -15,8 +15,9 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Gen uint64 // local generation number incremented on every local update.
+ CkPtGen uint64 // local generation number advertised to remote peers (used by the responder).
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
}
// localLogRec represents the persistent local state of a log record. Metadata
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index 54c0466..ae51caa 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -27,8 +27,9 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
- Gen uint64 // local generation number incremented on every local update.
- GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+ Gen uint64 // local generation number incremented on every local update.
+ CkPtGen uint64 // local generation number advertised to remote peers (used by the responder).
+ GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
}
func (dbSyncState) __VDLReflect(struct {
diff --git a/services/syncbase/vsync/util_test.go b/services/syncbase/vsync/util_test.go
index cbaad6c..0848d35 100644
--- a/services/syncbase/vsync/util_test.go
+++ b/services/syncbase/vsync/util_test.go
@@ -14,20 +14,24 @@
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/test"
)
// mockService emulates a Syncbase service that includes store and sync.
// It is used to access a mock application.
type mockService struct {
- engine string
- path string
- st store.Store
- sync *syncService
+ engine string
+ path string
+ st store.Store
+ sync *syncService
+ shutdown func()
}
func (s *mockService) St() store.Store {
@@ -114,6 +118,7 @@
// createService creates a mock Syncbase service used for testing sync functionality.
func createService(t *testing.T) *mockService {
+ ctx, shutdown := test.V23Init()
engine := "leveldb"
path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
@@ -121,13 +126,17 @@
if err != nil {
t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
}
+ st, err = watchable.Wrap(st, &watchable.Options{
+ ManagedPrefixes: []string{util.RowPrefix},
+ })
s := &mockService{
- st: st,
- engine: engine,
- path: path,
+ st: st,
+ engine: engine,
+ path: path,
+ shutdown: shutdown,
}
- if s.sync, err = New(nil, nil, s); err != nil {
+ if s.sync, err = New(ctx, nil, s); err != nil {
util.DestroyStore(engine, path)
t.Fatalf("cannot create sync service: %v", err)
}
@@ -136,6 +145,8 @@
// destroyService cleans up the mock Syncbase service.
func destroyService(t *testing.T, s *mockService) {
+ defer s.shutdown()
+ defer s.sync.Close()
if err := util.DestroyStore(s.engine, s.path); err != nil {
t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
}