add timestamp to joiners and implement syngroup resolve
This CL adds syncgroup resolve and tests and also
changes the Joiner list to also include a timestamp and
a tombstone flag for each joiner and to use that when
performing conflict resolution.
Change-Id: I9ed80c008550480e962baca164aca54311b4ed54
diff --git a/services/syncbase/server/interfaces/interfaces.vdl.go b/services/syncbase/server/interfaces/interfaces.vdl.go
index 22bc11f..f9557cc 100644
--- a/services/syncbase/server/interfaces/interfaces.vdl.go
+++ b/services/syncbase/server/interfaces/interfaces.vdl.go
@@ -603,15 +603,106 @@
return nil
}
+// SyncgroupMemberState contains information about a joiner and the internal bookkeeping
+// state required for resolving conflicts on this joiner's join/leave activity.
+type SyncgroupMemberState struct {
+ // Timestamp of when the member last joined/left the syncgroup. This timestamp is updated
+ // even when an existing member rejoins a syncgroup. Represented as Unix time.
+ WhenUpdated int64
+ // If set then this record indicates that this member has left the group. The SyncgroupMember
+ // entry is retained after a delete so that it can be used during conflict resolution, when
+ // one node indicates that the member has left the group and another says that the member
+ // is still in the group.
+ HasLeft bool
+ // Information supplied when requesting a join.
+ MemberInfo syncbase.SyncgroupMemberInfo
+}
+
+func (SyncgroupMemberState) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.SyncgroupMemberState"`
+}) {
+}
+
+func (x SyncgroupMemberState) VDLIsZero() bool {
+ return x == SyncgroupMemberState{}
+}
+
+func (x SyncgroupMemberState) VDLWrite(enc vdl.Encoder) error {
+ if err := enc.StartValue(__VDLType_struct_9); err != nil {
+ return err
+ }
+ if x.WhenUpdated != 0 {
+ if err := enc.NextFieldValueInt("WhenUpdated", vdl.Int64Type, x.WhenUpdated); err != nil {
+ return err
+ }
+ }
+ if x.HasLeft {
+ if err := enc.NextFieldValueBool("HasLeft", vdl.BoolType, x.HasLeft); err != nil {
+ return err
+ }
+ }
+ if x.MemberInfo != (syncbase.SyncgroupMemberInfo{}) {
+ if err := enc.NextField("MemberInfo"); err != nil {
+ return err
+ }
+ if err := x.MemberInfo.VDLWrite(enc); err != nil {
+ return err
+ }
+ }
+ if err := enc.NextField(""); err != nil {
+ return err
+ }
+ return enc.FinishValue()
+}
+
+func (x *SyncgroupMemberState) VDLRead(dec vdl.Decoder) error {
+ *x = SyncgroupMemberState{}
+ if err := dec.StartValue(__VDLType_struct_9); err != nil {
+ return err
+ }
+ for {
+ f, err := dec.NextField()
+ if err != nil {
+ return err
+ }
+ switch f {
+ case "":
+ return dec.FinishValue()
+ case "WhenUpdated":
+ switch value, err := dec.ReadValueInt(64); {
+ case err != nil:
+ return err
+ default:
+ x.WhenUpdated = value
+ }
+ case "HasLeft":
+ switch value, err := dec.ReadValueBool(); {
+ case err != nil:
+ return err
+ default:
+ x.HasLeft = value
+ }
+ case "MemberInfo":
+ if err := x.MemberInfo.VDLRead(dec); err != nil {
+ return err
+ }
+ default:
+ if err := dec.SkipValue(); err != nil {
+ return err
+ }
+ }
+ }
+}
+
// Syncgroup contains the state of a syncgroup.
type Syncgroup struct {
- Id syncbase.Id // the relative syncgroup Id chosen by app
- SpecVersion string // version on syncgroup spec for concurrency control
- Spec syncbase.SyncgroupSpec // app-given specification
- Creator string // Creator's Vanadium name
- DbId syncbase.Id // Globally unique database id
- Status SyncgroupStatus // Status of the syncgroup
- Joiners map[string]syncbase.SyncgroupMemberInfo // map of joiners to their metadata
+ Id syncbase.Id // the relative syncgroup Id chosen by app
+ SpecVersion string // version on syncgroup spec for concurrency control
+ Spec syncbase.SyncgroupSpec // app-given specification
+ Creator string // Creator's Vanadium name
+ DbId syncbase.Id // Globally unique database id
+ Status SyncgroupStatus // Status of the syncgroup
+ Joiners map[string]SyncgroupMemberState // map of joiners to their metadata
}
func (Syncgroup) __VDLReflect(struct {
@@ -645,7 +736,7 @@
}
func (x Syncgroup) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_9); err != nil {
+ if err := enc.StartValue(__VDLType_struct_11); err != nil {
return err
}
if x.Id != (syncbase.Id{}) {
@@ -701,8 +792,8 @@
return enc.FinishValue()
}
-func __VDLWriteAnon_map_2(enc vdl.Encoder, x map[string]syncbase.SyncgroupMemberInfo) error {
- if err := enc.StartValue(__VDLType_map_12); err != nil {
+func __VDLWriteAnon_map_2(enc vdl.Encoder, x map[string]SyncgroupMemberState) error {
+ if err := enc.StartValue(__VDLType_map_14); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -724,7 +815,7 @@
func (x *Syncgroup) VDLRead(dec vdl.Decoder) error {
*x = Syncgroup{}
- if err := dec.StartValue(__VDLType_struct_9); err != nil {
+ if err := dec.StartValue(__VDLType_struct_11); err != nil {
return err
}
for {
@@ -782,13 +873,13 @@
}
}
-func __VDLReadAnon_map_2(dec vdl.Decoder, x *map[string]syncbase.SyncgroupMemberInfo) error {
- if err := dec.StartValue(__VDLType_map_12); err != nil {
+func __VDLReadAnon_map_2(dec vdl.Decoder, x *map[string]SyncgroupMemberState) error {
+ if err := dec.StartValue(__VDLType_map_14); err != nil {
return err
}
- var tmpMap map[string]syncbase.SyncgroupMemberInfo
+ var tmpMap map[string]SyncgroupMemberState
if len := dec.LenHint(); len > 0 {
- tmpMap = make(map[string]syncbase.SyncgroupMemberInfo, len)
+ tmpMap = make(map[string]SyncgroupMemberState, len)
}
for {
switch done, key, err := dec.NextEntryValueString(); {
@@ -798,12 +889,12 @@
*x = tmpMap
return dec.FinishValue()
default:
- var elem syncbase.SyncgroupMemberInfo
+ var elem SyncgroupMemberState
if err := elem.VDLRead(dec); err != nil {
return err
}
if tmpMap == nil {
- tmpMap = make(map[string]syncbase.SyncgroupMemberInfo)
+ tmpMap = make(map[string]SyncgroupMemberState)
}
tmpMap[key] = elem
}
@@ -831,7 +922,7 @@
}
func (x CollectionPerms) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_map_14); err != nil {
+ if err := enc.StartValue(__VDLType_map_15); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -852,7 +943,7 @@
}
func (x *CollectionPerms) VDLRead(dec vdl.Decoder) error {
- if err := dec.StartValue(__VDLType_map_14); err != nil {
+ if err := dec.StartValue(__VDLType_map_15); err != nil {
return err
}
var tmpMap CollectionPerms
@@ -902,7 +993,7 @@
}
func (x SgDeltaReq) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_16); err != nil {
+ if err := enc.StartValue(__VDLType_struct_17); err != nil {
return err
}
if x.DbId != (syncbase.Id{}) {
@@ -929,7 +1020,7 @@
func (x *SgDeltaReq) VDLRead(dec vdl.Decoder) error {
*x = SgDeltaReq{}
- if err := dec.StartValue(__VDLType_struct_16); err != nil {
+ if err := dec.StartValue(__VDLType_struct_17); err != nil {
return err
}
for {
@@ -983,7 +1074,7 @@
}
func (x DataDeltaReq) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_17); err != nil {
+ if err := enc.StartValue(__VDLType_struct_18); err != nil {
return err
}
if x.DbId != (syncbase.Id{}) {
@@ -1017,7 +1108,7 @@
}
func __VDLWriteAnon_set_3(enc vdl.Encoder, x map[GroupId]struct{}) error {
- if err := enc.StartValue(__VDLType_set_18); err != nil {
+ if err := enc.StartValue(__VDLType_set_19); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -1036,7 +1127,7 @@
func (x *DataDeltaReq) VDLRead(dec vdl.Decoder) error {
*x = DataDeltaReq{}
- if err := dec.StartValue(__VDLType_struct_17); err != nil {
+ if err := dec.StartValue(__VDLType_struct_18); err != nil {
return err
}
for {
@@ -1068,7 +1159,7 @@
}
func __VDLReadAnon_set_3(dec vdl.Decoder, x *map[GroupId]struct{}) error {
- if err := dec.StartValue(__VDLType_set_18); err != nil {
+ if err := dec.StartValue(__VDLType_set_19); err != nil {
return err
}
var tmpMap map[GroupId]struct{}
@@ -1142,7 +1233,7 @@
}
func (x DeltaReqSgs) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_union_19); err != nil {
+ if err := enc.StartValue(__VDLType_union_20); err != nil {
return err
}
if err := enc.NextField("Sgs"); err != nil {
@@ -1158,7 +1249,7 @@
}
func (x DeltaReqData) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_union_19); err != nil {
+ if err := enc.StartValue(__VDLType_union_20); err != nil {
return err
}
if err := enc.NextField("Data"); err != nil {
@@ -1174,7 +1265,7 @@
}
func VDLReadDeltaReq(dec vdl.Decoder, x *DeltaReq) error {
- if err := dec.StartValue(__VDLType_union_19); err != nil {
+ if err := dec.StartValue(__VDLType_union_20); err != nil {
return err
}
f, err := dec.NextField()
@@ -1259,7 +1350,7 @@
}
func (x DeltaRespRec) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_union_20); err != nil {
+ if err := enc.StartValue(__VDLType_union_21); err != nil {
return err
}
if err := enc.NextField("Rec"); err != nil {
@@ -1275,7 +1366,7 @@
}
func (x DeltaRespGvs) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_union_20); err != nil {
+ if err := enc.StartValue(__VDLType_union_21); err != nil {
return err
}
if err := enc.NextField("Gvs"); err != nil {
@@ -1291,7 +1382,7 @@
}
func VDLReadDeltaResp(dec vdl.Decoder, x *DeltaResp) error {
- if err := dec.StartValue(__VDLType_union_20); err != nil {
+ if err := dec.StartValue(__VDLType_union_21); err != nil {
return err
}
f, err := dec.NextField()
@@ -1352,7 +1443,7 @@
}
func (x SgPriority) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_21); err != nil {
+ if err := enc.StartValue(__VDLType_struct_22); err != nil {
return err
}
if x.DevType != 0 {
@@ -1385,7 +1476,7 @@
func (x *SgPriority) VDLRead(dec vdl.Decoder) error {
*x = SgPriority{}
- if err := dec.StartValue(__VDLType_struct_21); err != nil {
+ if err := dec.StartValue(__VDLType_struct_22); err != nil {
return err
}
for {
@@ -1441,7 +1532,7 @@
}
func (x SgPriorities) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_map_22); err != nil {
+ if err := enc.StartValue(__VDLType_map_23); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -1462,7 +1553,7 @@
}
func (x *SgPriorities) VDLRead(dec vdl.Decoder) error {
- if err := dec.StartValue(__VDLType_map_22); err != nil {
+ if err := dec.StartValue(__VDLType_map_23); err != nil {
return err
}
var tmpMap SgPriorities
@@ -1507,7 +1598,7 @@
}
func (x DeltaFinalResp) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_23); err != nil {
+ if err := enc.StartValue(__VDLType_struct_24); err != nil {
return err
}
if len(x.SgPriorities) != 0 {
@@ -1526,7 +1617,7 @@
func (x *DeltaFinalResp) VDLRead(dec vdl.Decoder) error {
*x = DeltaFinalResp{}
- if err := dec.StartValue(__VDLType_struct_23); err != nil {
+ if err := dec.StartValue(__VDLType_struct_24); err != nil {
return err
}
for {
@@ -1567,11 +1658,11 @@
}
func (x ChunkHash) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_24); err != nil {
+ if err := enc.StartValue(__VDLType_struct_25); err != nil {
return err
}
if len(x.Hash) != 0 {
- if err := enc.NextFieldValueBytes("Hash", __VDLType_list_25, x.Hash); err != nil {
+ if err := enc.NextFieldValueBytes("Hash", __VDLType_list_26, x.Hash); err != nil {
return err
}
}
@@ -1583,7 +1674,7 @@
func (x *ChunkHash) VDLRead(dec vdl.Decoder) error {
*x = ChunkHash{}
- if err := dec.StartValue(__VDLType_struct_24); err != nil {
+ if err := dec.StartValue(__VDLType_struct_25); err != nil {
return err
}
for {
@@ -1624,11 +1715,11 @@
}
func (x ChunkData) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_26); err != nil {
+ if err := enc.StartValue(__VDLType_struct_27); err != nil {
return err
}
if len(x.Data) != 0 {
- if err := enc.NextFieldValueBytes("Data", __VDLType_list_25, x.Data); err != nil {
+ if err := enc.NextFieldValueBytes("Data", __VDLType_list_26, x.Data); err != nil {
return err
}
}
@@ -1640,7 +1731,7 @@
func (x *ChunkData) VDLRead(dec vdl.Decoder) error {
*x = ChunkData{}
- if err := dec.StartValue(__VDLType_struct_26); err != nil {
+ if err := dec.StartValue(__VDLType_struct_27); err != nil {
return err
}
for {
@@ -1681,7 +1772,7 @@
}
func (x TimeReq) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_27); err != nil {
+ if err := enc.StartValue(__VDLType_struct_28); err != nil {
return err
}
if !x.SendTs.IsZero() {
@@ -1704,7 +1795,7 @@
func (x *TimeReq) VDLRead(dec vdl.Decoder) error {
*x = TimeReq{}
- if err := dec.StartValue(__VDLType_struct_27); err != nil {
+ if err := dec.StartValue(__VDLType_struct_28); err != nil {
return err
}
for {
@@ -1776,7 +1867,7 @@
}
func (x TimeResp) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_28); err != nil {
+ if err := enc.StartValue(__VDLType_struct_29); err != nil {
return err
}
if !x.OrigTs.IsZero() {
@@ -1845,7 +1936,7 @@
func (x *TimeResp) VDLRead(dec vdl.Decoder) error {
*x = TimeResp{}
- if err := dec.StartValue(__VDLType_struct_28); err != nil {
+ if err := dec.StartValue(__VDLType_struct_29); err != nil {
return err
}
for {
@@ -1924,7 +2015,7 @@
}
func (x BlobSharesBySyncgroup) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_map_29); err != nil {
+ if err := enc.StartValue(__VDLType_map_30); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -1945,7 +2036,7 @@
}
func (x *BlobSharesBySyncgroup) VDLRead(dec vdl.Decoder) error {
- if err := dec.StartValue(__VDLType_map_29); err != nil {
+ if err := dec.StartValue(__VDLType_map_30); err != nil {
return err
}
var tmpMap BlobSharesBySyncgroup
@@ -2002,7 +2093,7 @@
}
func (x LocationData) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_30); err != nil {
+ if err := enc.StartValue(__VDLType_struct_31); err != nil {
return err
}
if !x.WhenSeen.IsZero() {
@@ -2035,7 +2126,7 @@
func (x *LocationData) VDLRead(dec vdl.Decoder) error {
*x = LocationData{}
- if err := dec.StartValue(__VDLType_struct_30); err != nil {
+ if err := dec.StartValue(__VDLType_struct_31); err != nil {
return err
}
for {
@@ -2089,7 +2180,7 @@
}
func (x PeerToLocationDataMap) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_map_31); err != nil {
+ if err := enc.StartValue(__VDLType_map_32); err != nil {
return err
}
if err := enc.SetLenHint(len(x)); err != nil {
@@ -2110,7 +2201,7 @@
}
func (x *PeerToLocationDataMap) VDLRead(dec vdl.Decoder) error {
- if err := dec.StartValue(__VDLType_map_31); err != nil {
+ if err := dec.StartValue(__VDLType_map_32); err != nil {
return err
}
var tmpMap PeerToLocationDataMap
@@ -2161,7 +2252,7 @@
}
func (x Signpost) VDLWrite(enc vdl.Encoder) error {
- if err := enc.StartValue(__VDLType_struct_32); err != nil {
+ if err := enc.StartValue(__VDLType_struct_33); err != nil {
return err
}
if len(x.Locations) != 0 {
@@ -2188,7 +2279,7 @@
func (x *Signpost) VDLRead(dec vdl.Decoder) error {
*x = Signpost{}
- if err := dec.StartValue(__VDLType_struct_32); err != nil {
+ if err := dec.StartValue(__VDLType_struct_33); err != nil {
return err
}
for {
@@ -3393,27 +3484,28 @@
__VDLType_struct_9 *vdl.Type
__VDLType_struct_10 *vdl.Type
__VDLType_struct_11 *vdl.Type
- __VDLType_map_12 *vdl.Type
+ __VDLType_struct_12 *vdl.Type
__VDLType_struct_13 *vdl.Type
__VDLType_map_14 *vdl.Type
- __VDLType_struct_15 *vdl.Type
+ __VDLType_map_15 *vdl.Type
__VDLType_struct_16 *vdl.Type
__VDLType_struct_17 *vdl.Type
- __VDLType_set_18 *vdl.Type
- __VDLType_union_19 *vdl.Type
+ __VDLType_struct_18 *vdl.Type
+ __VDLType_set_19 *vdl.Type
__VDLType_union_20 *vdl.Type
- __VDLType_struct_21 *vdl.Type
- __VDLType_map_22 *vdl.Type
- __VDLType_struct_23 *vdl.Type
+ __VDLType_union_21 *vdl.Type
+ __VDLType_struct_22 *vdl.Type
+ __VDLType_map_23 *vdl.Type
__VDLType_struct_24 *vdl.Type
- __VDLType_list_25 *vdl.Type
- __VDLType_struct_26 *vdl.Type
+ __VDLType_struct_25 *vdl.Type
+ __VDLType_list_26 *vdl.Type
__VDLType_struct_27 *vdl.Type
__VDLType_struct_28 *vdl.Type
- __VDLType_map_29 *vdl.Type
- __VDLType_struct_30 *vdl.Type
- __VDLType_map_31 *vdl.Type
- __VDLType_struct_32 *vdl.Type
+ __VDLType_struct_29 *vdl.Type
+ __VDLType_map_30 *vdl.Type
+ __VDLType_struct_31 *vdl.Type
+ __VDLType_map_32 *vdl.Type
+ __VDLType_struct_33 *vdl.Type
)
var __VDLInitCalled bool
@@ -3444,6 +3536,7 @@
vdl.Register((*LogRec)(nil))
vdl.Register((*GroupId)(nil))
vdl.Register((*SyncgroupStatus)(nil))
+ vdl.Register((*SyncgroupMemberState)(nil))
vdl.Register((*Syncgroup)(nil))
vdl.Register((*CollectionPerms)(nil))
vdl.Register((*SgDeltaReq)(nil))
@@ -3471,30 +3564,31 @@
__VDLType_struct_6 = vdl.TypeOf((*LogRec)(nil)).Elem()
__VDLType_string_7 = vdl.TypeOf((*GroupId)(nil))
__VDLType_enum_8 = vdl.TypeOf((*SyncgroupStatus)(nil))
- __VDLType_struct_9 = vdl.TypeOf((*Syncgroup)(nil)).Elem()
- __VDLType_struct_10 = vdl.TypeOf((*syncbase.Id)(nil)).Elem()
- __VDLType_struct_11 = vdl.TypeOf((*syncbase.SyncgroupSpec)(nil)).Elem()
- __VDLType_map_12 = vdl.TypeOf((*map[string]syncbase.SyncgroupMemberInfo)(nil))
- __VDLType_struct_13 = vdl.TypeOf((*syncbase.SyncgroupMemberInfo)(nil)).Elem()
- __VDLType_map_14 = vdl.TypeOf((*CollectionPerms)(nil))
- __VDLType_struct_15 = vdl.TypeOf((*access.AccessList)(nil)).Elem()
- __VDLType_struct_16 = vdl.TypeOf((*SgDeltaReq)(nil)).Elem()
- __VDLType_struct_17 = vdl.TypeOf((*DataDeltaReq)(nil)).Elem()
- __VDLType_set_18 = vdl.TypeOf((*map[GroupId]struct{})(nil))
- __VDLType_union_19 = vdl.TypeOf((*DeltaReq)(nil))
- __VDLType_union_20 = vdl.TypeOf((*DeltaResp)(nil))
- __VDLType_struct_21 = vdl.TypeOf((*SgPriority)(nil)).Elem()
- __VDLType_map_22 = vdl.TypeOf((*SgPriorities)(nil))
- __VDLType_struct_23 = vdl.TypeOf((*DeltaFinalResp)(nil)).Elem()
- __VDLType_struct_24 = vdl.TypeOf((*ChunkHash)(nil)).Elem()
- __VDLType_list_25 = vdl.TypeOf((*[]byte)(nil))
- __VDLType_struct_26 = vdl.TypeOf((*ChunkData)(nil)).Elem()
- __VDLType_struct_27 = vdl.TypeOf((*TimeReq)(nil)).Elem()
- __VDLType_struct_28 = vdl.TypeOf((*TimeResp)(nil)).Elem()
- __VDLType_map_29 = vdl.TypeOf((*BlobSharesBySyncgroup)(nil))
- __VDLType_struct_30 = vdl.TypeOf((*LocationData)(nil)).Elem()
- __VDLType_map_31 = vdl.TypeOf((*PeerToLocationDataMap)(nil))
- __VDLType_struct_32 = vdl.TypeOf((*Signpost)(nil)).Elem()
+ __VDLType_struct_9 = vdl.TypeOf((*SyncgroupMemberState)(nil)).Elem()
+ __VDLType_struct_10 = vdl.TypeOf((*syncbase.SyncgroupMemberInfo)(nil)).Elem()
+ __VDLType_struct_11 = vdl.TypeOf((*Syncgroup)(nil)).Elem()
+ __VDLType_struct_12 = vdl.TypeOf((*syncbase.Id)(nil)).Elem()
+ __VDLType_struct_13 = vdl.TypeOf((*syncbase.SyncgroupSpec)(nil)).Elem()
+ __VDLType_map_14 = vdl.TypeOf((*map[string]SyncgroupMemberState)(nil))
+ __VDLType_map_15 = vdl.TypeOf((*CollectionPerms)(nil))
+ __VDLType_struct_16 = vdl.TypeOf((*access.AccessList)(nil)).Elem()
+ __VDLType_struct_17 = vdl.TypeOf((*SgDeltaReq)(nil)).Elem()
+ __VDLType_struct_18 = vdl.TypeOf((*DataDeltaReq)(nil)).Elem()
+ __VDLType_set_19 = vdl.TypeOf((*map[GroupId]struct{})(nil))
+ __VDLType_union_20 = vdl.TypeOf((*DeltaReq)(nil))
+ __VDLType_union_21 = vdl.TypeOf((*DeltaResp)(nil))
+ __VDLType_struct_22 = vdl.TypeOf((*SgPriority)(nil)).Elem()
+ __VDLType_map_23 = vdl.TypeOf((*SgPriorities)(nil))
+ __VDLType_struct_24 = vdl.TypeOf((*DeltaFinalResp)(nil)).Elem()
+ __VDLType_struct_25 = vdl.TypeOf((*ChunkHash)(nil)).Elem()
+ __VDLType_list_26 = vdl.TypeOf((*[]byte)(nil))
+ __VDLType_struct_27 = vdl.TypeOf((*ChunkData)(nil)).Elem()
+ __VDLType_struct_28 = vdl.TypeOf((*TimeReq)(nil)).Elem()
+ __VDLType_struct_29 = vdl.TypeOf((*TimeResp)(nil)).Elem()
+ __VDLType_map_30 = vdl.TypeOf((*BlobSharesBySyncgroup)(nil))
+ __VDLType_struct_31 = vdl.TypeOf((*LocationData)(nil)).Elem()
+ __VDLType_map_32 = vdl.TypeOf((*PeerToLocationDataMap)(nil))
+ __VDLType_struct_33 = vdl.TypeOf((*Signpost)(nil)).Elem()
// Set error format strings.
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncgroupPublish.ID), "{1:}{2:} duplicate publish on syncgroup: {3}")
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 355d937..20dab50 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -90,13 +90,30 @@
// Syncgroup contains the state of a syncgroup.
type Syncgroup struct {
- Id wire.Id // the relative syncgroup Id chosen by app
- SpecVersion string // version on syncgroup spec for concurrency control
- Spec wire.SyncgroupSpec // app-given specification
- Creator string // Creator's Vanadium name
- DbId wire.Id // Globally unique database id
- Status SyncgroupStatus // Status of the syncgroup
- Joiners map[string]wire.SyncgroupMemberInfo // map of joiners to their metadata
+ Id wire.Id // the relative syncgroup Id chosen by app
+ SpecVersion string // version on syncgroup spec for concurrency control
+ Spec wire.SyncgroupSpec // app-given specification
+ Creator string // Creator's Vanadium name
+ DbId wire.Id // Globally unique database id
+ Status SyncgroupStatus // Status of the syncgroup
+ Joiners map[string]SyncgroupMemberState // map of joiners to their metadata
+}
+
+// SyncgroupMemberState contains information about a joiner and the internal bookkeeping
+// state required for resolving conflicts on this joiner's join/leave activity.
+type SyncgroupMemberState struct {
+ // Timestamp of when the member last joined/left the syncgroup. This timestamp is updated
+ // even when an existing member rejoins a syncgroup. Represented as Unix time.
+ WhenUpdated int64
+
+ // If set then this record indicates that this member has left the group. The SyncgroupMember
+ // entry is retained after a delete so that it can be used during conflict resolution, when
+ // one node indicates that the member has left the group and another says that the member
+ // is still in the group.
+ HasLeft bool
+
+ // Information supplied when requesting a join.
+ MemberInfo wire.SyncgroupMemberInfo
}
// CollectionPerms represent the persistent, synced permissions of a Collection.
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index c21b088..cf33a9b 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -745,8 +745,8 @@
if signpost.Locations[peer].IsServer {
keepPeer[peer] = true
} else {
- sgMemberInfo, joinerInSg := sg.Joiners[peer]
- if joinerInSg && (isVisible || sgMemberInfo.BlobDevType == byte(wire.BlobDevTypeServer)) {
+ sgMember, joinerInSg := sg.Joiners[peer]
+ if joinerInSg && (isVisible || sgMember.MemberInfo.BlobDevType == byte(wire.BlobDevTypeServer)) {
keepPeer[peer] = true
}
}
@@ -927,14 +927,14 @@
s.allMembersLock.Lock()
member := s.allMembers.members[peer]
if member != nil {
- var sgMemberInfoMaps map[wire.Id]sgMemberInfo = member.db2sg // All dbId entries.
+ var sgMemberInfoMaps map[wire.Id]sgMember = member.db2sg // All dbId entries.
if dbId != (wire.Id{}) {
// If dbId was specified, pick that one entry.
- sgMemberInfoMaps = map[wire.Id]sgMemberInfo{dbId: member.db2sg[dbId]}
+ sgMemberInfoMaps = map[wire.Id]sgMember{dbId: member.db2sg[dbId]}
}
for _, sgMemberInfoMap := range sgMemberInfoMaps {
for gid := range sgs {
- if int32(sgMemberInfoMap[gid].BlobDevType) == wire.BlobDevTypeServer {
+ if int32(sgMemberInfoMap[gid].MemberInfo.BlobDevType) == wire.BlobDevTypeServer {
serverSgsForPeer[gid] = struct{}{}
}
}
diff --git a/services/syncbase/vsync/cr_app_resolves.go b/services/syncbase/vsync/cr_app_resolves.go
index 789081d..3d1fd9d 100644
--- a/services/syncbase/vsync/cr_app_resolves.go
+++ b/services/syncbase/vsync/cr_app_resolves.go
@@ -195,9 +195,8 @@
// conflict, the local version is supposed to be its ancestor.
// To avoid a dag cycle, we treat it as a createNew.
res.ty = createNew
- // TODO(jlodhia):[correctness] Use vclock to create the write
- // timestamp.
- timestamp := time.Now()
+ now, _ := iSt.tx.St.Clock.Now()
+ timestamp := now
dagNode := getNodeOrFail(ctx, iSt.tx, oid, conflictState.oldHead)
if !dagNode.Deleted {
res.val = getObjectAtVer(ctx, iSt, oid, conflictState.oldHead)
diff --git a/services/syncbase/vsync/cr_permissions.go b/services/syncbase/vsync/cr_permissions.go
index 105f8f3..269e168 100644
--- a/services/syncbase/vsync/cr_permissions.go
+++ b/services/syncbase/vsync/cr_permissions.go
@@ -33,7 +33,6 @@
local[tag].NotIn,
remote[tag].NotIn,
ancestor[tag].NotIn)
-
// TODO(fredq): remove this when it is added to Normalize
if len(in) != 0 || len(notIn) != 0 {
result[tag] = access.AccessList{In: in, NotIn: notIn}
@@ -79,7 +78,6 @@
}
}
- //
// Convert the map back to a slice, sort it, and return it.
return set.String.ToSlice(ls)
}
diff --git a/services/syncbase/vsync/cr_permissions_test.go b/services/syncbase/vsync/cr_permissions_test.go
index b17b230..7b37e92 100644
--- a/services/syncbase/vsync/cr_permissions_test.go
+++ b/services/syncbase/vsync/cr_permissions_test.go
@@ -154,6 +154,5 @@
}
func assertPermsEqual(t *testing.T, expected, actual access.Permissions) {
- assert.Equal(t, expected, actual)
assert.True(t, reflect.DeepEqual(expected, actual))
}
diff --git a/services/syncbase/vsync/cr_syncgroup.go b/services/syncbase/vsync/cr_syncgroup.go
new file mode 100644
index 0000000..508ad91
--- /dev/null
+++ b/services/syncbase/vsync/cr_syncgroup.go
@@ -0,0 +1,177 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/services/syncbase"
+ "v.io/v23/verror"
+ "v.io/x/ref/services/syncbase/server/interfaces"
+)
+
+// resolveSyncgroup performs fine-grained conflict resolution of Syncgroups.
+
+// Immutable Fields: Id, Creator, DbId
+// Fields where latest wins: none
+// Custom logic: SpecVersion, Spec, Joiners, Status
+//
+// Joiners: a map of joinerSyncbaseName->SyncgroupMemberState. Each joiner is merged using latest
+// wins according to the WhenUpdated field of the SyncgroupMemberState.
+//
+// Status: Running > Rejected > Pending
+//
+// Spec: if no conflict take remote or local depending on which one changed, otherwise
+// perform a three-way merge of the Spec.
+//
+// SpecVersion: the version of the chosen Spec is used, and if the Spec was the result of a
+// merge then the version is left as NoVersion and the caller is responsible for generating a
+// new version when needed. Generally this happens when the syncgroup object is persisted.
+//
+// preferLocal This is derived from the versions of the local, remote, and ancestor and
+// is true if the local is to be considered the latest, and false if the remote
+// is the latest.
+func resolveSyncgroup(ctx *context.T, preferLocal bool, local, remote, ancestor *interfaces.Syncgroup) (*interfaces.Syncgroup, error) {
+ // Validate the immutable fields.
+ if ancestor == nil {
+ ancestor = &interfaces.Syncgroup{}
+ } else {
+ if err := validateImmutableFields(ctx, ancestor, local); err != nil {
+ return nil, err
+ }
+ }
+ if err := validateImmutableFields(ctx, remote, local); err != nil {
+ return nil, err
+ }
+
+ // Create a Syncgroup to hold the result of the merge.
+ result := &interfaces.Syncgroup{Id: local.Id, Creator: local.Creator, DbId: local.DbId}
+ result.Status = resolveStatus(local.Status, remote.Status, ancestor.Status)
+ result.Joiners = resolveJoiners(preferLocal, local.Joiners, remote.Joiners, ancestor.Joiners)
+ result.SpecVersion, result.Spec = resolveSpec(preferLocal,
+ local.SpecVersion, remote.SpecVersion, ancestor.SpecVersion,
+ local.Spec, remote.Spec, ancestor.Spec)
+
+ return result, nil
+}
+
+func validateImmutableFields(ctx *context.T, a, b *interfaces.Syncgroup) error {
+ if a.Id != b.Id {
+ return verror.New(verror.ErrBadState, ctx, "illegal change to the Id")
+ }
+ if a.Creator != b.Creator {
+ return verror.New(verror.ErrBadState, ctx, "illegal change to the Creator")
+ }
+ if a.DbId != b.DbId {
+ return verror.New(verror.ErrBadState, ctx, "illegal change to the DbId")
+ }
+ if !sameCollections(a.Spec.Collections, b.Spec.Collections) {
+ return verror.New(verror.ErrBadState, ctx, "illegal change to the Collections set")
+ }
+ return nil
+}
+
+// Spec:
+// Immutable: Prefixes
+// Latest Wins: Description, PublishSyncbaseName, IsPrivate
+// Custom: Perms, MountTables
+//
+// Perms: standard Permissions 3-way merging
+// MountTables: standard slice 3-way merging
+//
+// If the SyncgroupSpec version is NoVersion then the caller is responsible for generating a
+// new one if needed.
+func resolveSpec(preferLocal bool, vlocal, vremote, vancestor string, local, remote, ancestor syncbase.SyncgroupSpec) (string, syncbase.SyncgroupSpec) {
+ // If the versions are the same then pick any, local is fine.
+ // If only local or remote changed then use it.
+ // Otherwise merge the Spec.
+ if vlocal == vremote {
+ return vlocal, local
+ }
+
+ if vlocal == vancestor {
+ return vremote, remote
+ }
+
+ if vremote == vancestor {
+ return vlocal, local
+ }
+
+ // Both local and remote changed from the ancestor, perform a three-way merge and indicate that a new version number is needed.
+ vresult := NoVersion
+ result := syncbase.SyncgroupSpec{}
+ result.Collections = local.Collections
+ if preferLocal {
+ result.Description = local.Description
+ result.PublishSyncbaseName = local.PublishSyncbaseName
+ result.IsPrivate = local.IsPrivate
+ } else {
+ result.Description = remote.Description
+ result.PublishSyncbaseName = remote.PublishSyncbaseName
+ result.IsPrivate = remote.IsPrivate
+ }
+ result.Perms = resolvePermissions(local.Perms, remote.Perms, ancestor.Perms)
+ result.MountTables = resolveSlice(local.MountTables, remote.MountTables, ancestor.MountTables)
+ return vresult, result
+}
+
+// resolveStatus merges Status using the rule: Running > Rejected > Pending. If any
+// of the inputs is Running then pick Running, if any are Rejected then pick Rejected,
+// otherwise pick Pending.
+func resolveStatus(local, remote, ancestor interfaces.SyncgroupStatus) interfaces.SyncgroupStatus {
+ if local == interfaces.SyncgroupStatusRunning ||
+ remote == interfaces.SyncgroupStatusRunning ||
+ ancestor == interfaces.SyncgroupStatusRunning {
+ return interfaces.SyncgroupStatusRunning
+ } else if local == interfaces.SyncgroupStatusPublishRejected ||
+ remote == interfaces.SyncgroupStatusPublishRejected ||
+ ancestor == interfaces.SyncgroupStatusPublishRejected {
+ return interfaces.SyncgroupStatusPublishRejected
+ } else {
+ return interfaces.SyncgroupStatusPublishPending
+ }
+}
+
+// resolveJoiners performs a granular merge of the members of a syncgroup, choosing the
+// local or remote version of the syncgroup information based on latest wins.
+// The new set of members is the union of those in the local and remote sets.
+// Each member is then individually merged preferring the one with the later WhenUpdated timestamp.
+// TODO(fredq): we need to garbage collect the members after they have left the group for a while.
+func resolveJoiners(preferLocal bool, local, remote, ancestor map[string]interfaces.SyncgroupMemberState) (result map[string]interfaces.SyncgroupMemberState) {
+ result = make(map[string]interfaces.SyncgroupMemberState)
+ newMembers := map[string]bool{}
+ for member, _ := range local {
+ newMembers[member] = true
+ }
+ for member, _ := range remote {
+ newMembers[member] = true
+ }
+ for key, _ := range newMembers {
+ _, inL := local[key]
+ _, inR := remote[key]
+ if !inL {
+ result[key] = remote[key]
+ } else if !inR {
+ result[key] = local[key]
+ } else if local[key].WhenUpdated == remote[key].WhenUpdated {
+ // The member is in both remote and local and the update times are the same,
+ // choose one based on the value of preferLocal. The MemberInfo should
+ // be the same in this case.
+ if preferLocal {
+ result[key] = local[key]
+ } else {
+ result[key] = remote[key]
+ }
+ } else {
+ // The member is in both remote and local but the update times are
+ // different, choose the one with the later update time.
+ if local[key].WhenUpdated > remote[key].WhenUpdated {
+ result[key] = local[key]
+ } else {
+ result[key] = remote[key]
+ }
+ }
+ }
+ return
+}
diff --git a/services/syncbase/vsync/cr_syncgroup_test.go b/services/syncbase/vsync/cr_syncgroup_test.go
new file mode 100644
index 0000000..88b8239
--- /dev/null
+++ b/services/syncbase/vsync/cr_syncgroup_test.go
@@ -0,0 +1,363 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "github.com/stretchr/testify/assert"
+ "sort"
+ "testing"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ wire "v.io/v23/services/syncbase"
+ "v.io/x/ref/services/syncbase/server/interfaces"
+)
+
+func TestResolveSyncgroup(t *testing.T) {
+ ancestor := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "1",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"a", "b", "c"},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishPending,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "adminc": mkSm(100, false, 123, wire.BlobDevTypeNormal),
+ },
+ }
+
+ local := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "2",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"alice", "bob", "carol"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"a", "b", "d"},
+ IsPrivate: true,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusRunning,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "admind": mkSm(100, false, 124, wire.BlobDevTypeNormal),
+ },
+ }
+
+ remote := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "3",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg2",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"bob"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"b", "c", "e"},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishRejected,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "admine": mkSm(100, false, 125, wire.BlobDevTypeNormal),
+ },
+ }
+
+ expected := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg2",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"bob", "carol"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"b", "d", "e"},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusRunning,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "admind": mkSm(100, false, 124, wire.BlobDevTypeNormal),
+ "admine": mkSm(100, false, 125, wire.BlobDevTypeNormal),
+ },
+ }
+
+ result, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor)
+ if err != nil {
+ t.Fatalf("merge failed, %v", err)
+ }
+ assertSyncgroupsEqual(t, expected, *result)
+
+ expected = interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"bob", "carol"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"b", "d", "e"},
+ IsPrivate: true,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusRunning,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "admind": mkSm(100, false, 124, wire.BlobDevTypeNormal),
+ "admine": mkSm(100, false, 125, wire.BlobDevTypeNormal),
+ },
+ }
+ result, err = resolveSyncgroup(nil, true, &local, &remote, &ancestor)
+ if err != nil {
+ t.Fatalf("merge failed, %v", err)
+ }
+ assertSyncgroupsEqual(t, expected, *result)
+
+ // try again with no ancestor
+ expected = interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{
+ "R": access.AccessList{In: []security.BlessingPattern{"alice", "bob"}, NotIn: []string{"bob:bad"}},
+ "W": access.AccessList{In: []security.BlessingPattern{"alice", "bob", "carol"}, NotIn: []string{"bob:bad"}},
+ },
+ Collections: []wire.Id{wire.Id{"a", "b"}},
+ MountTables: []string{"a", "b", "c", "d", "e"},
+ IsPrivate: true,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusRunning,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "admind": mkSm(100, false, 124, wire.BlobDevTypeNormal),
+ "admine": mkSm(100, false, 125, wire.BlobDevTypeNormal),
+ },
+ }
+
+ result, err = resolveSyncgroup(nil, true, &local, &remote, nil)
+ if err != nil {
+ t.Fatalf("merge failed, %v", err)
+ }
+ assertSyncgroupsEqual(t, expected, *result)
+}
+
+func TestResolveSyncgroupImmutablePrefix(t *testing.T) {
+ ancestor := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "1",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{},
+ Collections: []wire.Id{wire.Id{"a", "b"}, wire.Id{"b", "c"}},
+ MountTables: []string{},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishPending,
+ Joiners: map[string]interfaces.SyncgroupMemberState{},
+ }
+
+ local := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "1",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{},
+ Collections: []wire.Id{wire.Id{"r", "b"}, wire.Id{"b", "c"}},
+ MountTables: []string{},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishPending,
+ Joiners: map[string]interfaces.SyncgroupMemberState{},
+ }
+
+ remote := interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "1",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{},
+ Collections: []wire.Id{wire.Id{"a", "b"}, wire.Id{"b", "c"}},
+ MountTables: []string{},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishPending,
+ Joiners: map[string]interfaces.SyncgroupMemberState{},
+ }
+
+ if result, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor); err == nil {
+ if sameCollections(ancestor.Spec.Collections, result.Spec.Collections) {
+ t.Fatalf("merge should have produced an incorrect collection merge, is now %+v", result.Spec.Collections)
+ }
+ }
+
+ // fix the Prefixes, merge call should succeed
+ local.Spec.Collections = ancestor.Spec.Collections
+ if _, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor); err != nil {
+ t.Fatalf("merge failed, %+v", err)
+ }
+
+ // test the local id
+ local.Id = wire.Id{"something", "else"} // change the local id to something incorrect
+ if result, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor); err == nil {
+ if sameCollections(ancestor.Spec.Collections, result.Spec.Collections) {
+ t.Fatalf("merge should have produced an incorrect collection merge, is now %+v", result.Spec.Collections)
+ }
+ }
+ local.Id = ancestor.Id // change it back
+ if _, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor); err != nil {
+ t.Fatalf("merge failed, %+v", err)
+ }
+}
+
+func TestResolveSyncgroupJoinersFavorDelete(t *testing.T) {
+ ancestor := mkTestSg()
+ ancestor.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ }
+
+ local := mkTestSg()
+ local.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(110, false, 122, wire.BlobDevTypeLeaf),
+ }
+
+ remote := mkTestSg()
+ remote.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(105, true, 122, wire.BlobDevTypeLeaf),
+ }
+
+ expected := mkTestSg()
+ expected.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(110, false, 122, wire.BlobDevTypeLeaf),
+ }
+
+ result, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor)
+ if err != nil {
+ t.Fatalf("merge failed, %v", err)
+ }
+ assertSyncgroupsEqual(t, expected, *result)
+}
+
+func TestResolveSyncgroupJoinersFavorJoin(t *testing.T) {
+ ancestor := mkTestSg()
+ ancestor.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ }
+
+ local := mkTestSg()
+ local.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(110, false, 122, wire.BlobDevTypeLeaf),
+ }
+
+ remote := mkTestSg()
+ remote.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(115, true, 122, wire.BlobDevTypeLeaf),
+ }
+
+ expected := mkTestSg()
+ expected.Joiners = map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(115, true, 122, wire.BlobDevTypeLeaf),
+ }
+
+ result, err := resolveSyncgroup(nil, false, &local, &remote, &ancestor)
+ if err != nil {
+ t.Fatalf("merge failed, %v", err)
+ }
+ assertSyncgroupsEqual(t, expected, *result)
+}
+
+func assertSyncgroupsEqual(t *testing.T, expected, actual interfaces.Syncgroup) {
+ assert.Equal(t, expected.Id, actual.Id)
+ assert.Equal(t, expected.Creator, actual.Creator)
+ assert.Equal(t, expected.DbId, actual.DbId)
+ assert.Equal(t, expected.Status, actual.Status)
+ assert.Equal(t, expected.Joiners, actual.Joiners)
+ assert.Equal(t, expected.Spec.Description, actual.Spec.Description)
+ assert.Equal(t, expected.Spec.PublishSyncbaseName, actual.Spec.PublishSyncbaseName)
+ assertPermsEqual(t, expected.Spec.Perms, actual.Spec.Perms)
+ assert.Equal(t, expected.Spec.Collections, actual.Spec.Collections)
+ sort.Strings(expected.Spec.MountTables)
+ sort.Strings(actual.Spec.MountTables)
+ assert.Equal(t, expected.Spec.MountTables, actual.Spec.MountTables)
+ assert.Equal(t, expected.Spec.IsPrivate, actual.Spec.IsPrivate)
+}
+
+func mkSm(when int64, hasLeft bool, priority, blobType int32) interfaces.SyncgroupMemberState {
+ return interfaces.SyncgroupMemberState{WhenUpdated: when, HasLeft: hasLeft, MemberInfo: wire.SyncgroupMemberInfo{byte(priority), byte(blobType)}}
+}
+
+func mkTestSg() interfaces.Syncgroup {
+ return interfaces.Syncgroup{
+ Id: wire.Id{"name1", "blessing2"},
+ SpecVersion: "1",
+ Spec: wire.SyncgroupSpec{
+ Description: "mysg",
+ PublishSyncbaseName: "mysb",
+ Perms: access.Permissions{},
+ Collections: []wire.Id{wire.Id{}},
+ MountTables: []string{},
+ IsPrivate: false,
+ },
+ Creator: "mysb",
+ DbId: wire.Id{"name3", "blessing4"},
+ Status: interfaces.SyncgroupStatusPublishPending,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "admina": mkSm(100, false, 121, wire.BlobDevTypeServer),
+ "adminb": mkSm(100, false, 122, wire.BlobDevTypeLeaf),
+ "adminc": mkSm(100, false, 123, wire.BlobDevTypeNormal),
+ },
+ }
+}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 36d0347..85662cf 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -101,7 +101,7 @@
}
// getDBDeltas performs an initiation round for the specified database.
-func (s *syncService) getDBDeltas(ctx *context.T, dbId wire.Id, info sgMemberInfo, peer connInfo) (connInfo, error) {
+func (s *syncService) getDBDeltas(ctx *context.T, dbId wire.Id, info sgMember, peer connInfo) (connInfo, error) {
vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer %v for db %v", peer, dbId)
defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer %v for db %v", peer, dbId)
@@ -348,7 +348,7 @@
// newInitiatonConfig creates new initiation config. This will be shared between
// the two sync rounds in the initiation round of a Database.
-func newInitiationConfig(ctx *context.T, s *syncService, dbId wire.Id, info sgMemberInfo, peer connInfo) (*initiationConfig, error) {
+func newInitiationConfig(ctx *context.T, s *syncService, dbId wire.Id, info sgMember, peer connInfo) (*initiationConfig, error) {
c := &initiationConfig{
peer: peer,
// Note: allSgPfxs and sharedSgPfxs will be inited during syncgroup
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 6939aee..b8eb1ea 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -331,16 +331,20 @@
}
}()
+ now, err := svc.vclock.Now()
+ if err != nil {
+ t.Fatalf("unable to get time: %v\n", err)
+ }
s := svc.sync
s.id = 10 // initiator
sgId1 := interfaces.GroupId("1234")
nullInfo := wire.SyncgroupMemberInfo{}
- sgInfo := sgMemberInfo{
- sgId1: nullInfo,
+ sgInfo := sgMember{
+ sgId1: interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), MemberInfo: nullInfo},
}
info := &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
mockDbId: sgInfo,
},
mtTables: map[string]struct{}{
@@ -359,9 +363,9 @@
Perms: mockSgPerms,
MountTables: []string{"1/2/3/4", "5/6/7/8"},
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "a": nullInfo,
- "b": nullInfo,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "a": interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), MemberInfo: nullInfo},
+ "b": interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), MemberInfo: nullInfo},
},
}
diff --git a/services/syncbase/vsync/peer_manager_test.go b/services/syncbase/vsync/peer_manager_test.go
index 3fd6641..b1e36a1 100644
--- a/services/syncbase/vsync/peer_manager_test.go
+++ b/services/syncbase/vsync/peer_manager_test.go
@@ -34,6 +34,10 @@
}
// Add one joiner.
+ now, err := s.vclock.Now()
+ if err != nil {
+ t.Fatalf("unable to get time: %v\n", err)
+ }
nullInfo := wire.SyncgroupMemberInfo{}
sg := &interfaces.Syncgroup{
Id: wire.Id{Name: "sg", Blessing: "blessing"},
@@ -45,13 +49,13 @@
Perms: mockSgPerms,
MountTables: []string{"1/2/3/4", "5/6/7/8"},
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "a": nullInfo,
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "a": interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), MemberInfo: nullInfo},
},
}
tx := createDatabase(t, svc).St().NewWatchableTransaction()
- if err := s.addSyncgroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err != nil {
+ if err = s.addSyncgroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err != nil {
t.Fatalf("cannot add syncgroup %v, err %v", sg.Id, err)
}
if err := tx.Commit(); err != nil {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 05a1000..21ac9f3 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -60,12 +60,12 @@
// It also maintains all the mount table candidates that could be used to reach
// this peer, learned from the syncgroup metadata.
type memberInfo struct {
- db2sg map[wire.Id]sgMemberInfo
+ db2sg map[wire.Id]sgMember
mtTables map[string]struct{}
}
-// sgMemberInfo maps syncgroups to their member metadata.
-type sgMemberInfo map[interfaces.GroupId]wire.SyncgroupMemberInfo
+// sgMember maps syncgroups to their member metadata.
+type sgMember map[interfaces.GroupId]interfaces.SyncgroupMemberState
// newSyncgroupVersion generates a random syncgroup version ("etag").
func (s *syncService) newSyncgroupVersion() string {
@@ -168,8 +168,8 @@
// low, so the "Distance" metric is non-zero.
psg := blob.PerSyncgroup{Priority: interfaces.SgPriority{DevType: wire.BlobDevTypeNormal, Distance: 1}}
if myInfo, ok := sg.Joiners[s.name]; ok {
- psg.Priority.DevType = int32(myInfo.BlobDevType)
- psg.Priority.Distance = float32(myInfo.BlobDevType)
+ psg.Priority.DevType = int32(myInfo.MemberInfo.BlobDevType)
+ psg.Priority.Distance = float32(myInfo.MemberInfo.BlobDevType)
}
if err := s.bst.SetPerSyncgroup(ctx, gid, &psg); err != nil {
return err
@@ -382,12 +382,12 @@
for member, info := range sg.Joiners {
if _, ok := newMembers[member]; !ok {
newMembers[member] = &memberInfo{
- db2sg: make(map[wire.Id]sgMemberInfo),
+ db2sg: make(map[wire.Id]sgMember),
mtTables: make(map[string]struct{}),
}
}
if _, ok := newMembers[member].db2sg[dbId]; !ok {
- newMembers[member].db2sg[dbId] = make(sgMemberInfo)
+ newMembers[member].db2sg[dbId] = make(sgMember)
}
newMembers[member].db2sg[dbId][gid] = info
@@ -462,11 +462,11 @@
// Make a copy.
infoCopy := &memberInfo{
- db2sg: make(map[wire.Id]sgMemberInfo),
+ db2sg: make(map[wire.Id]sgMember),
mtTables: make(map[string]struct{}),
}
for dbId, sgInfo := range info.db2sg {
- infoCopy.db2sg[dbId] = make(sgMemberInfo)
+ infoCopy.db2sg[dbId] = make(sgMember)
for gid, mi := range sgInfo {
infoCopy.db2sg[dbId][gid] = mi
}
@@ -670,6 +670,11 @@
ss := sd.sync.(*syncService)
dbId := sd.db.Id()
+ now, err := ss.vclock.Now()
+ if err != nil {
+ return err
+ }
+ sm := interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), HasLeft: false, MemberInfo: myInfo}
// Instantiate sg. Add self as joiner.
// TODO(ivanpi): Spec is sanity checked later in addSyncgroup. Do it here
// instead to fail early?
@@ -681,10 +686,10 @@
Creator: ss.name,
DbId: dbId,
Status: interfaces.SyncgroupStatusPublishPending,
- Joiners: map[string]wire.SyncgroupMemberInfo{ss.name: myInfo},
+ Joiners: map[string]interfaces.SyncgroupMemberState{ss.name: sm},
}
- err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
+ err = watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
@@ -968,7 +973,11 @@
// TODO(hpucha): Check syncgroup ACL.
vlog.VI(2).Infof("sync: GetSyncgroupMembers: %v members %v, len %v", sgId, sg.Joiners, len(sg.Joiners))
- return sg.Joiners, nil
+ joiners := make(map[string]wire.SyncgroupMemberInfo)
+ for key, value := range sg.Joiners {
+ joiners[key] = value.MemberInfo
+ }
+ return joiners, nil
}
func (sd *syncDatabase) SetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id, spec wire.SyncgroupSpec, version string) error {
@@ -1159,7 +1168,13 @@
sg.Status = status
if status == interfaces.SyncgroupStatusRunning {
// TODO(hpucha): Default priority?
- sg.Joiners[peer] = wire.SyncgroupMemberInfo{}
+ // TODO(fredq): shouldn't the PublishSyncgroup() call return the MemberInfo?
+ memberInfo := wire.SyncgroupMemberInfo{}
+ now, err := ss.vclock.Now()
+ if err != nil {
+ return err
+ }
+ sg.Joiners[peer] = interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), HasLeft: false, MemberInfo: memberInfo}
}
return ss.updateSyncgroupVersioning(ctx, tx, gid, NoVersion, true, ss.id, gen, pos, sg)
@@ -1470,7 +1485,11 @@
gen, pos = s.reserveGenAndPosInDbLog(ctx, dbId, sgoid, 1)
// Add to joiner list.
- sg.Joiners[joinerName] = joinerInfo
+ now, err := s.vclock.Now()
+ if err != nil {
+ return err
+ }
+ sg.Joiners[joinerName] = interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), HasLeft: false, MemberInfo: joinerInfo}
return s.updateSyncgroupVersioning(ctx, tx, gid, version, true, s.id, gen, pos, sg)
})
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 12c6f8d..2387409 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -68,10 +68,10 @@
Collections: []wire.Id{makeCxId("foo"), makeCxId("bar")},
Perms: mockSgPerms,
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
- "tablet": wire.SyncgroupMemberInfo{SyncPriority: 25},
- "cloud": wire.SyncgroupMemberInfo{SyncPriority: 1},
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "phone": wrap(wire.SyncgroupMemberInfo{SyncPriority: 10}),
+ "tablet": wrap(wire.SyncgroupMemberInfo{SyncPriority: 25}),
+ "cloud": wrap(wire.SyncgroupMemberInfo{SyncPriority: 1}),
},
}
@@ -122,7 +122,7 @@
if len(mi.db2sg) != 1 {
t.Errorf("invalid info for syncgroup member %s: %v", mm, mi)
}
- var sgmi sgMemberInfo
+ var sgmi sgMember
for _, v := range mi.db2sg {
sgmi = v
break
@@ -191,10 +191,10 @@
Collections: []wire.Id{makeCxId("foo"), makeCxId("bar")},
Perms: mockSgPerms,
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
- "tablet": wire.SyncgroupMemberInfo{SyncPriority: 25},
- "cloud": wire.SyncgroupMemberInfo{SyncPriority: 1},
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "phone": wrap(wire.SyncgroupMemberInfo{SyncPriority: 10}),
+ "tablet": wrap(wire.SyncgroupMemberInfo{SyncPriority: 25}),
+ "cloud": wrap(wire.SyncgroupMemberInfo{SyncPriority: 1}),
},
}
}
@@ -284,10 +284,10 @@
Collections: []wire.Id{makeCxId("foo"), makeCxId("bar")},
Perms: mockSgPerms,
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
- "tablet": wire.SyncgroupMemberInfo{SyncPriority: 25},
- "cloud": wire.SyncgroupMemberInfo{SyncPriority: 1},
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "phone": wrap(wire.SyncgroupMemberInfo{SyncPriority: 10}),
+ "tablet": wrap(wire.SyncgroupMemberInfo{SyncPriority: 25}),
+ "cloud": wrap(wire.SyncgroupMemberInfo{SyncPriority: 1}),
},
}
@@ -368,10 +368,10 @@
Collections: []wire.Id{makeCxId("foo")},
Perms: mockSgPerms,
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
- "tablet": wire.SyncgroupMemberInfo{SyncPriority: 25},
- "cloud": wire.SyncgroupMemberInfo{SyncPriority: 1},
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "phone": wrap(wire.SyncgroupMemberInfo{SyncPriority: 10}),
+ "tablet": wrap(wire.SyncgroupMemberInfo{SyncPriority: 25}),
+ "cloud": wrap(wire.SyncgroupMemberInfo{SyncPriority: 1}),
},
}
sg2 := &interfaces.Syncgroup{
@@ -384,10 +384,10 @@
Collections: []wire.Id{makeCxId("bar")},
Perms: mockSgPerms,
},
- Joiners: map[string]wire.SyncgroupMemberInfo{
- "tablet": wire.SyncgroupMemberInfo{SyncPriority: 111},
- "door": wire.SyncgroupMemberInfo{SyncPriority: 33},
- "lamp": wire.SyncgroupMemberInfo{SyncPriority: 9},
+ Joiners: map[string]interfaces.SyncgroupMemberState{
+ "tablet": wrap(wire.SyncgroupMemberInfo{SyncPriority: 111}),
+ "door": wrap(wire.SyncgroupMemberInfo{SyncPriority: 33}),
+ "lamp": wrap(wire.SyncgroupMemberInfo{SyncPriority: 9}),
},
}
@@ -427,16 +427,16 @@
expMemberInfo := map[string]*memberInfo{
"phone": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal1: sg1.Joiners["phone"],
},
},
mtTables: map[string]struct{}{"mt1": struct{}{}},
},
"tablet": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal1: sg1.Joiners["tablet"],
sgIdInternal2: sg2.Joiners["tablet"],
},
@@ -448,24 +448,24 @@
},
},
"cloud": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal1: sg1.Joiners["cloud"],
},
},
mtTables: map[string]struct{}{"mt1": struct{}{}},
},
"door": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal2: sg2.Joiners["door"],
},
},
mtTables: mt2and3,
},
"lamp": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal2: sg2.Joiners["lamp"],
},
},
@@ -508,24 +508,24 @@
expMemberInfo = map[string]*memberInfo{
"tablet": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal2: sg2.Joiners["tablet"],
},
},
mtTables: mt2and3,
},
"door": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal2: sg2.Joiners["door"],
},
},
mtTables: mt2and3,
},
"lamp": &memberInfo{
- db2sg: map[wire.Id]sgMemberInfo{
- mockDbId: sgMemberInfo{
+ db2sg: map[wire.Id]sgMember{
+ mockDbId: sgMember{
sgIdInternal2: sg2.Joiners["lamp"],
},
},
@@ -571,3 +571,7 @@
check(t, []string{"a", "b", "c"}, []string{"x", "y"}, false, "no overlap")
check(t, []string{"a", "b"}, []string{"B", "a"}, false, "upper/lowercases")
}
+
+func wrap(mi wire.SyncgroupMemberInfo) interfaces.SyncgroupMemberState {
+ return interfaces.SyncgroupMemberState{WhenUpdated: time.Now().Unix(), MemberInfo: mi}
+}