blob: 98e0e8425b20c0893bd6dfc8c3bbaf745d8f1e8b [file] [log] [blame]
Himabindu Puchafb26a832015-05-20 15:37:50 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package vsync
6
Raja Daouda9cfe5e2015-05-28 14:38:34 -07007// SyncGroup management and storage in Syncbase. Handles the lifecycle
8// of SyncGroups (create, join, leave, etc.) and their persistence as
9// sync metadata in the application databases. Provides helper functions
Raja Daoud20aed3f2015-05-26 12:41:10 -070010// to the higher levels of sync (Initiator, Watcher) to get membership
11// information and map key/value changes to their matching SyncGroups.
12
Himabindu Pucha12e1a122015-06-08 15:59:09 -070013// TODO(hpucha): Add high level commentary about the logic behind create/join
14// etc.
15
Himabindu Puchafb26a832015-05-20 15:37:50 -070016import (
Raja Daoud20aed3f2015-05-26 12:41:10 -070017 "fmt"
Himabindu Puchaf2796e12015-10-01 19:05:10 -070018 "strconv"
Himabindu Pucha25a1d022015-07-21 20:46:33 -070019 "strings"
Raja Daoud20aed3f2015-05-26 12:41:10 -070020 "time"
21
Himabindu Puchafb26a832015-05-20 15:37:50 -070022 "v.io/v23/context"
Himabindu Puchad964ef02015-06-30 01:10:47 -070023 "v.io/v23/naming"
Himabindu Puchafb26a832015-05-20 15:37:50 -070024 "v.io/v23/rpc"
Himabindu Pucha12e1a122015-06-08 15:59:09 -070025 "v.io/v23/security"
26 "v.io/v23/security/access"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070027 wire "v.io/v23/services/syncbase/nosql"
Himabindu Puchafb26a832015-05-20 15:37:50 -070028 "v.io/v23/verror"
Raja Daouda9cfe5e2015-05-28 14:38:34 -070029 "v.io/v23/vom"
Raja Daoud5d5fae72015-05-29 18:09:48 -070030 "v.io/x/lib/vlog"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070031 "v.io/x/ref/services/syncbase/server/interfaces"
32 "v.io/x/ref/services/syncbase/server/util"
33 "v.io/x/ref/services/syncbase/server/watchable"
34 "v.io/x/ref/services/syncbase/store"
Himabindu Puchafb26a832015-05-20 15:37:50 -070035)
36
Raja Daoud20aed3f2015-05-26 12:41:10 -070037var (
38 // memberViewTTL is the shelf-life of the aggregate view of SyncGroup members.
39 memberViewTTL = 2 * time.Second
Raja Daoud20aed3f2015-05-26 12:41:10 -070040)
41
Raja Daoud20aed3f2015-05-26 12:41:10 -070042////////////////////////////////////////////////////////////
43// SyncGroup management internal to Syncbase.
44
Adam Sadovskyb2a65692015-05-29 21:57:40 -070045// memberView holds an aggregated view of all SyncGroup members across
46// databases. The view is not coherent, it gets refreshed according to a
47// configured TTL and not (coherently) when SyncGroup membership is updated in
48// the various databases. It is needed by the sync Initiator, which must select
49// a peer to contact from a global view of all SyncGroup members gathered from
50// all databases. This is why a slightly stale view is acceptable.
Raja Daoud20aed3f2015-05-26 12:41:10 -070051// The members are identified by their Vanadium names (map keys).
52type memberView struct {
53 expiration time.Time
54 members map[string]*memberInfo
55}
56
Adam Sadovskyb2a65692015-05-29 21:57:40 -070057// memberInfo holds the member metadata for each SyncGroup this member belongs
Himabindu Puchab41fc142015-09-10 17:10:57 -070058// to within each App/Database (i.e. global database name). It's a mapping of
59// global DB names to sets of SyncGroup member information. It also maintains
60// all the mount table candidates that could be used to reach this peer, learned
61// from the SyncGroup metadata.
Raja Daoud20aed3f2015-05-26 12:41:10 -070062type memberInfo struct {
Himabindu Puchab41fc142015-09-10 17:10:57 -070063 db2sg map[string]sgMemberInfo
64 mtTables map[string]struct{}
Raja Daoud20aed3f2015-05-26 12:41:10 -070065}
66
Raja Daoud28d3b602015-06-17 20:02:20 -070067// sgMemberInfo maps SyncGroups to their member metadata.
68type sgMemberInfo map[interfaces.GroupId]wire.SyncGroupMemberInfo
69
Raja Daoud20aed3f2015-05-26 12:41:10 -070070// newSyncGroupVersion generates a random SyncGroup version ("etag").
71func newSyncGroupVersion() string {
Raja Daoud65e68b82015-06-11 16:33:33 -070072 return fmt.Sprintf("%x", rand64())
Raja Daoud20aed3f2015-05-26 12:41:10 -070073}
74
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070075// newSyncGroupId generates a random SyncGroup ID.
76func newSyncGroupId() interfaces.GroupId {
Raja Daoud65e68b82015-06-11 16:33:33 -070077 id := interfaces.NoGroupId
78 for id == interfaces.NoGroupId {
79 id = interfaces.GroupId(rand64())
80 }
81 return id
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070082}
Raja Daouda9cfe5e2015-05-28 14:38:34 -070083
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070084// verifySyncGroup verifies if a SyncGroup struct is well-formed.
Raja Daoud52851362015-09-14 15:50:40 -070085// TODO(rdaoud): define verrors for all ErrBadArg cases.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070086func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
Raja Daoud20aed3f2015-05-26 12:41:10 -070087 if sg == nil {
Raja Daouda9cfe5e2015-05-28 14:38:34 -070088 return verror.New(verror.ErrBadArg, ctx, "group information not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070089 }
90 if sg.Name == "" {
Raja Daouda9cfe5e2015-05-28 14:38:34 -070091 return verror.New(verror.ErrBadArg, ctx, "group name not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070092 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070093 if sg.AppName == "" {
94 return verror.New(verror.ErrBadArg, ctx, "app name not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070095 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070096 if sg.DbName == "" {
97 return verror.New(verror.ErrBadArg, ctx, "db name not specified")
98 }
99 if sg.Creator == "" {
100 return verror.New(verror.ErrBadArg, ctx, "creator id not specified")
101 }
102 if sg.Id == interfaces.NoGroupId {
103 return verror.New(verror.ErrBadArg, ctx, "group id not specified")
104 }
105 if sg.SpecVersion == "" {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700106 return verror.New(verror.ErrBadArg, ctx, "group version not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700107 }
108 if len(sg.Joiners) == 0 {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700109 return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700110 }
Raja Daoud52851362015-09-14 15:50:40 -0700111 return verifySyncGroupSpec(ctx, &sg.Spec)
112}
113
114// verifySyncGroupSpec verifies if a SyncGroupSpec is well-formed.
115func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
116 if spec == nil {
117 return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
118 }
119 if len(spec.Prefixes) == 0 {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700120 return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700121 }
Raja Daoud52851362015-09-14 15:50:40 -0700122
123 // Duplicate prefixes are not allowed.
124 prefixes := make(map[string]bool, len(spec.Prefixes))
125 for _, pfx := range spec.Prefixes {
126 prefixes[pfx] = true
127 }
128 if len(prefixes) != len(spec.Prefixes) {
129 return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
130 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700131 return nil
132}
133
Raja Daoud52851362015-09-14 15:50:40 -0700134// samePrefixes returns true if the two sets of prefixes are the same.
135func samePrefixes(pfx1, pfx2 []string) bool {
136 pfxMap := make(map[string]uint8)
137 for _, p := range pfx1 {
138 pfxMap[p] |= 0x01
139 }
140 for _, p := range pfx2 {
141 pfxMap[p] |= 0x02
142 }
143 for _, mask := range pfxMap {
144 if mask != 0x03 {
145 return false
146 }
147 }
148 return true
149}
150
151// addSyncGroup adds a new SyncGroup given its version and information. This
152// also includes creating a DAG node entry and updating the DAG head. If the
153// caller is the creator of the SyncGroup, a local log record is also created
154// using the given server ID and gen and pos counters to index the log record.
155// Otherwise, it's a joiner case and the SyncGroup is put in a pending state
156// (waiting for its full metadata to be synchronized) and the log record is
157// skipped, delaying its creation till the Initiator does p2p sync.
158func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
159 // Verify the SyncGroup information before storing it since it may have
160 // been received from a remote peer.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700161 if err := verifySyncGroup(ctx, sg); err != nil {
162 return err
163 }
Raja Daoud20aed3f2015-05-26 12:41:10 -0700164
Raja Daoud52851362015-09-14 15:50:40 -0700165 // Add the group name and ID entries.
Raja Daoud4171c9c2015-07-14 20:07:44 -0700166 if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
167 return err
168 } else if ok {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700169 return verror.New(verror.ErrExist, ctx, "group name already exists")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700170 }
Raja Daoud52851362015-09-14 15:50:40 -0700171 if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
172 return err
173 } else if ok {
174 return verror.New(verror.ErrExist, ctx, "group id already exists")
175 }
Raja Daoud20aed3f2015-05-26 12:41:10 -0700176
Raja Daoud52851362015-09-14 15:50:40 -0700177 state := sgLocalState{
178 RemotePublisher: remotePublisher,
179 SyncPending: !creator,
180 PendingGenVec: genvec,
181 }
182 if remotePublisher == "" {
183 state.NumLocalJoiners = 1
184 }
185
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700186 if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700187 return err
188 }
Raja Daoud52851362015-09-14 15:50:40 -0700189 if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700190 return err
191 }
192
Raja Daoud52851362015-09-14 15:50:40 -0700193 // Add the SyncGroup versioned data entry.
194 if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
195 return err
196 } else if ok {
197 return verror.New(verror.ErrExist, ctx, "group id version already exists")
198 }
199
200 return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
201}
202
203// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
204// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
205// and updates the DAG head. Optionally, it also writes a new local log record
206// using the given server ID and gen and pos counters to index it. The caller
207// can provide the version number to use otherwise, if NoVersion is given, a new
208// version is generated by the function.
209// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
210// apps can monitor SG changes as well.
211func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
212 if version == NoVersion {
213 version = newSyncGroupVersion()
214 }
215
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700216 oid := sgOID(sg.Id)
217
Raja Daoud52851362015-09-14 15:50:40 -0700218 // Add the SyncGroup versioned data entry.
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700219 if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
Raja Daoud52851362015-09-14 15:50:40 -0700220 return err
221 }
222
Raja Daoud52851362015-09-14 15:50:40 -0700223 var parents []string
224 if head, err := getHead(ctx, tx, oid); err == nil {
225 parents = []string{head}
226 } else if verror.ErrorID(err) != verror.ErrNoExist.ID {
227 return err
228 }
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700229
230 // Add a sync log record for the SyncGroup if needed.
231 logKey := ""
232 if withLog {
233 if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
234 return err
235 }
236 logKey = logRecKey(oid, servId, gen)
237 }
238
239 // Add the SyncGroup to the DAG.
Raja Daoud52851362015-09-14 15:50:40 -0700240 if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
241 return err
242 }
243 return setHead(ctx, tx, oid, version)
244}
245
246// addSyncGroupLogRec adds a new local log record for a SyncGroup.
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700247func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
Raja Daoud52851362015-09-14 15:50:40 -0700248 rec := &localLogRec{
249 Metadata: interfaces.LogRecMetadata{
250 ObjId: oid,
251 CurVers: version,
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700252 Parents: parents,
Raja Daoud52851362015-09-14 15:50:40 -0700253 Delete: false,
254 UpdTime: watchable.GetStoreTime(ctx, tx),
255 Id: servId,
256 Gen: gen,
257 RecType: interfaces.NodeRec,
258 BatchId: NoBatchId,
259 },
260 Pos: pos,
261 }
262
263 return putLogRec(ctx, tx, oid, rec)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700264}
265
266// getSyncGroupId retrieves the SyncGroup ID given its name.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700267func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700268 return getSGNameEntry(ctx, st, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700269}
270
Raja Daoud52851362015-09-14 15:50:40 -0700271// getSyncGroupVersion retrieves the current version of the SyncGroup.
272func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700273 return getHead(ctx, st, sgOID(gid))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700274}
275
276// getSyncGroupById retrieves the SyncGroup given its ID.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700277func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
Raja Daoud52851362015-09-14 15:50:40 -0700278 version, err := getSyncGroupVersion(ctx, st, gid)
279 if err != nil {
280 return nil, err
281 }
282 return getSGDataEntry(ctx, st, gid, version)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700283}
284
285// getSyncGroupByName retrieves the SyncGroup given its name.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700286func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*interfaces.SyncGroup, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700287 gid, err := getSyncGroupId(ctx, st, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700288 if err != nil {
289 return nil, err
290 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700291 return getSyncGroupById(ctx, st, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700292}
293
294// delSyncGroupById deletes the SyncGroup given its ID.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700295func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700296 sg, err := getSyncGroupById(ctx, tx, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700297 if err != nil {
298 return err
299 }
Raja Daoud52851362015-09-14 15:50:40 -0700300 return delSyncGroupByName(ctx, tx, sg.Name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700301}
302
303// delSyncGroupByName deletes the SyncGroup given its name.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700304func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
Raja Daoud52851362015-09-14 15:50:40 -0700305 // Get the SyncGroup ID and current version.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700306 gid, err := getSyncGroupId(ctx, tx, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700307 if err != nil {
308 return err
309 }
Raja Daoud52851362015-09-14 15:50:40 -0700310 version, err := getSyncGroupVersion(ctx, tx, gid)
311 if err != nil {
312 return err
313 }
314
315 // Delete the name and ID entries.
316 if err := delSGNameEntry(ctx, tx, name); err != nil {
317 return err
318 }
319 if err := delSGIdEntry(ctx, tx, gid); err != nil {
320 return err
321 }
322
323 // Delete all versioned SyncGroup data entries (same versions as DAG
324 // nodes). This is done separately from pruning the DAG nodes because
325 // some nodes may have no log record pointing back to the SyncGroup data
326 // entries (loose coupling to support the pending SyncGroup state).
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700327 oid := sgOID(gid)
Raja Daoud52851362015-09-14 15:50:40 -0700328 err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
329 return delSGDataEntry(ctx, tx, gid, v)
330 })
331 if err != nil {
332 return err
333 }
334
335 // Delete all DAG nodes and log records.
336 bset := newBatchPruning()
337 err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
338 if lr != "" {
339 return util.Delete(ctx, tx, lr)
340 }
341 return nil
342 })
343 if err != nil {
344 return err
345 }
346 return pruneDone(ctx, tx, bset)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700347}
348
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700349// refreshMembersIfExpired updates the aggregate view of SyncGroup members
350// across databases if the view has expired.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700351// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
352// update the membership view for them instead of always scanning all of them.
353func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700354 view := s.allMembers
355 if view == nil {
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700356 // The empty expiration time in Go is before "now" and treated as expired
357 // below.
Raja Daoud28d3b602015-06-17 20:02:20 -0700358 view = &memberView{expiration: time.Time{}, members: nil}
Raja Daoud20aed3f2015-05-26 12:41:10 -0700359 s.allMembers = view
360 }
361
362 if time.Now().Before(view.expiration) {
363 return
364 }
365
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700366 // Create a new aggregate view of SyncGroup members across all app databases.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700367 newMembers := make(map[string]*memberInfo)
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700368
Raja Daoud28d3b602015-06-17 20:02:20 -0700369 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700370 // For each database, fetch its SyncGroup data entries by scanning their
371 // prefix range. Use a database snapshot for the scan.
Raja Daoud5d5fae72015-05-29 18:09:48 -0700372 sn := st.NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700373 defer sn.Abort()
Raja Daoud28d3b602015-06-17 20:02:20 -0700374 name := appDbName(appName, dbName)
Raja Daoud5d5fae72015-05-29 18:09:48 -0700375
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700376 forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
Raja Daoud5d5fae72015-05-29 18:09:48 -0700377 // Add all members of this SyncGroup to the membership view.
378 // A member's info is different across SyncGroups, so gather all of them.
379 for member, info := range sg.Joiners {
380 if _, ok := newMembers[member]; !ok {
Himabindu Puchab41fc142015-09-10 17:10:57 -0700381 newMembers[member] = &memberInfo{
382 db2sg: make(map[string]sgMemberInfo),
383 mtTables: make(map[string]struct{}),
384 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700385 }
Raja Daoud28d3b602015-06-17 20:02:20 -0700386 if _, ok := newMembers[member].db2sg[name]; !ok {
387 newMembers[member].db2sg[name] = make(sgMemberInfo)
388 }
389 newMembers[member].db2sg[name][sg.Id] = info
Himabindu Puchab41fc142015-09-10 17:10:57 -0700390
391 // Collect mount tables.
392 for _, mt := range sg.Spec.MountTables {
393 newMembers[member].mtTables[mt] = struct{}{}
394 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700395 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700396 return false
397 })
Raja Daoud5d5fae72015-05-29 18:09:48 -0700398 return false
399 })
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700400
401 view.members = newMembers
Raja Daoud20aed3f2015-05-26 12:41:10 -0700402 view.expiration = time.Now().Add(memberViewTTL)
403}
404
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700405// forEachSyncGroup iterates over all SyncGroups in the Database and invokes
406// the callback function on each one. The callback returns a "done" flag to
407// make forEachSyncGroup() stop the iteration earlier; otherwise the function
408// loops across all SyncGroups in the Database.
409func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
Raja Daoud52851362015-09-14 15:50:40 -0700410 stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
411 defer stream.Cancel()
412
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700413 for stream.Advance() {
Raja Daoud52851362015-09-14 15:50:40 -0700414 var gid interfaces.GroupId
415 if vom.Decode(stream.Value(nil), &gid) != nil {
416 vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700417 continue
418 }
419
Raja Daoud52851362015-09-14 15:50:40 -0700420 sg, err := getSyncGroupById(nil, st, gid)
421 if err != nil {
422 vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
423 continue
424 }
425
426 if callback(sg) {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700427 break // done, early exit
428 }
429 }
430
431 if err := stream.Err(); err != nil {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700432 vlog.Errorf("sync: forEachSyncGroup: scan stream error: %v", err)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700433 }
434}
435
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700436// getMembers returns all SyncGroup members and the count of SyncGroups each one
437// joined.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700438func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700439 s.allMembersLock.Lock()
440 defer s.allMembersLock.Unlock()
441
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700442 s.refreshMembersIfExpired(ctx)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700443
444 members := make(map[string]uint32)
445 for member, info := range s.allMembers.members {
Raja Daoud28d3b602015-06-17 20:02:20 -0700446 count := 0
447 for _, sgmi := range info.db2sg {
448 count += len(sgmi)
449 }
450 members[member] = uint32(count)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700451 }
452
453 return members
454}
455
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700456// copyMemberInfo returns a copy of the info for the requested peer.
457func (s *syncService) copyMemberInfo(ctx *context.T, member string) *memberInfo {
458 s.allMembersLock.RLock()
459 defer s.allMembersLock.RUnlock()
460
461 info, ok := s.allMembers.members[member]
462 if !ok {
463 return nil
464 }
465
466 // Make a copy.
Himabindu Puchab41fc142015-09-10 17:10:57 -0700467 infoCopy := &memberInfo{
468 db2sg: make(map[string]sgMemberInfo),
469 mtTables: make(map[string]struct{}),
470 }
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700471 for gdbName, sgInfo := range info.db2sg {
472 infoCopy.db2sg[gdbName] = make(sgMemberInfo)
473 for gid, mi := range sgInfo {
474 infoCopy.db2sg[gdbName][gid] = mi
475 }
476 }
Himabindu Puchab41fc142015-09-10 17:10:57 -0700477 for mt := range info.mtTables {
478 infoCopy.mtTables[mt] = struct{}{}
479 }
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700480
481 return infoCopy
482}
483
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700484// Low-level utility functions to access DB entries without tracking their
485// relationships.
Raja Daoud20aed3f2015-05-26 12:41:10 -0700486// Use the functions above to manipulate SyncGroups.
487
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700488var (
Raja Daoud52851362015-09-14 15:50:40 -0700489 // Prefixes used to store the different mappings of a SyncGroup:
490 // sgNameKeyPrefix: name --> ID
491 // sgIdKeyPrefix: ID --> SyncGroup local state
492 // sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
493 //
494 // Note: as with other syncable objects, the DAG "heads" table contains
495 // a reference to the current SyncGroup version, and the DAG "nodes"
496 // table tracks its history of mutations.
Raja Daoud52851362015-09-14 15:50:40 -0700497 sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
498 sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700499 sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700500)
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700501
Raja Daoud20aed3f2015-05-26 12:41:10 -0700502// sgNameKey returns the key used to access the SyncGroup name entry.
503func sgNameKey(name string) string {
Raja Daoud52851362015-09-14 15:50:40 -0700504 return util.JoinKeyParts(sgNameKeyPrefix, name)
505}
506
507// sgIdKey returns the key used to access the SyncGroup ID entry.
508func sgIdKey(gid interfaces.GroupId) string {
509 return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
510}
511
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700512// sgOID converts a group id into an oid string.
513func sgOID(gid interfaces.GroupId) string {
514 return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
515}
516
517// sgID is the inverse of sgOID and converts an oid string into a group id.
518func sgID(oid string) (interfaces.GroupId, error) {
519 parts := util.SplitKeyParts(oid)
520 if len(parts) != 3 {
521 return 0, fmt.Errorf("invalid sgoid %s", oid)
522 }
523
524 id, err := strconv.ParseUint(parts[2], 10, 64)
525 if err != nil {
526 return 0, err
527 }
528 return interfaces.GroupId(id), nil
529}
530
Raja Daoud52851362015-09-14 15:50:40 -0700531// sgDataKey returns the key used to access a version of the SyncGroup data.
532func sgDataKey(gid interfaces.GroupId, version string) string {
533 return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700534}
535
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700536// sgDataKeyByOID returns the key used to access a version of the SyncGroup data.
537func sgDataKeyByOID(oid, version string) string {
538 return util.JoinKeyParts(oid, version)
539}
540
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700541// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
542func splitSgNameKey(ctx *context.T, key string) (string, error) {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700543 // Note that the actual SyncGroup name may contain ":" as a separator.
Raja Daoud52851362015-09-14 15:50:40 -0700544 // So don't split the key on the separator, instead trim its prefix.
545 prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
546 name := strings.TrimPrefix(key, prefix)
547 if name == key {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700548 return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
549 }
Raja Daoud52851362015-09-14 15:50:40 -0700550 return name, nil
Raja Daoud20aed3f2015-05-26 12:41:10 -0700551}
552
553// hasSGNameEntry returns true if the SyncGroup name entry exists.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700554func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
Raja Daoud52851362015-09-14 15:50:40 -0700555 return util.Exists(nil, sntx, sgNameKey(name))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700556}
557
Raja Daoud52851362015-09-14 15:50:40 -0700558// hasSGIdEntry returns true if the SyncGroup ID entry exists.
559func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
560 return util.Exists(nil, sntx, sgIdKey(gid))
561}
562
563// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
564func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
565 return util.Exists(nil, sntx, sgDataKey(gid, version))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700566}
567
568// setSGNameEntry stores the SyncGroup name entry.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700569func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700570 return util.Put(ctx, tx, sgNameKey(name), gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700571}
572
Raja Daoud52851362015-09-14 15:50:40 -0700573// setSGIdEntry stores the SyncGroup ID entry.
574func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
575 return util.Put(ctx, tx, sgIdKey(gid), state)
576}
577
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700578// setSGDataEntryByOID stores the SyncGroup versioned data entry.
579func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
580 return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
Raja Daoud52851362015-09-14 15:50:40 -0700581}
582
583// getSGNameEntry retrieves the SyncGroup ID for a given name.
584func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
585 var gid interfaces.GroupId
586 if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
587 return interfaces.NoGroupId, err
588 }
589 return gid, nil
590}
591
592// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
593func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
594 var state sgLocalState
595 if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
596 return nil, err
597 }
598 return &state, nil
599}
600
601// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
602func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700603 var sg interfaces.SyncGroup
Raja Daoud52851362015-09-14 15:50:40 -0700604 if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700605 return nil, err
Raja Daoud20aed3f2015-05-26 12:41:10 -0700606 }
607 return &sg, nil
608}
609
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700610// getSGDataEntryByOID retrieves the SyncGroup data for a given group OID and version.
611func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
612 var sg interfaces.SyncGroup
613 if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
614 return nil, err
615 }
616 return &sg, nil
617}
618
Raja Daoud52851362015-09-14 15:50:40 -0700619// delSGNameEntry deletes the SyncGroup name entry.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700620func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700621 return util.Delete(ctx, tx, sgNameKey(name))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700622}
623
Raja Daoud52851362015-09-14 15:50:40 -0700624// delSGIdEntry deletes the SyncGroup ID entry.
625func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
626 return util.Delete(ctx, tx, sgIdKey(gid))
627}
628
629// delSGDataEntry deletes the SyncGroup versioned data entry.
630func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
631 return util.Delete(ctx, tx, sgDataKey(gid, version))
632}
633
Himabindu Puchafb26a832015-05-20 15:37:50 -0700634////////////////////////////////////////////////////////////
635// SyncGroup methods between Client and Syncbase.
636
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700637// TODO(hpucha): Pass blessings along.
638func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700639 vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
640 defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
641
Himabindu Puchab41fc142015-09-10 17:10:57 -0700642 ss := sd.sync.(*syncService)
Raja Daoud52851362015-09-14 15:50:40 -0700643 appName, dbName := sd.db.App().Name(), sd.db.Name()
644
645 // Instantiate sg. Add self as joiner.
646 gid, version := newSyncGroupId(), newSyncGroupVersion()
647 sg := &interfaces.SyncGroup{
648 Id: gid,
649 Name: sgName,
650 SpecVersion: version,
651 Spec: spec,
652 Creator: ss.name,
653 AppName: appName,
654 DbName: dbName,
655 Status: interfaces.SyncGroupStatusPublishPending,
656 Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
657 }
Himabindu Puchab41fc142015-09-10 17:10:57 -0700658
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700659 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700660 // Check permissions on Database.
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700661 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700662 return err
663 }
664
665 // TODO(hpucha): Check prefix ACLs on all SG prefixes.
666 // This may need another method on util.Database interface.
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700667 // TODO(hpucha): Do some SG ACL checking. Check creator
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700668 // has Admin privilege.
669
Raja Daoud52851362015-09-14 15:50:40 -0700670 // Reserve a log generation and position counts for the new SyncGroup.
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700671 gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700672
Raja Daoud52851362015-09-14 15:50:40 -0700673 if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700674 return err
675 }
676
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700677 // Take a snapshot of the data to bootstrap the SyncGroup.
Raja Daoud0dfdd252015-07-10 20:02:22 -0700678 return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700679 })
680
681 if err != nil {
682 return err
683 }
684
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700685 ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
Raja Daoud52851362015-09-14 15:50:40 -0700686
687 // Local SG create succeeded. Publish the SG at the chosen server, or if
688 // that fails, enqueue it for later publish retries.
689 if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
690 ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
691 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700692
693 // Publish at the chosen mount table and in the neighborhood.
694 sd.publishInMountTables(ctx, call, spec)
695
696 return nil
697}
698
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700699// TODO(hpucha): Pass blessings along.
700func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700701 vlog.VI(2).Infof("sync: JoinSyncGroup: begin: %s", sgName)
702 defer vlog.VI(2).Infof("sync: JoinSyncGroup: end: %s", sgName)
703
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700704 var sgErr error
705 var sg *interfaces.SyncGroup
706 nullSpec := wire.SyncGroupSpec{}
707
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700708 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700709 // Check permissions on Database.
710 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
711 return err
712 }
713
Raja Daoud52851362015-09-14 15:50:40 -0700714 // Check if SyncGroup already exists and get its info.
715 var gid interfaces.GroupId
716 gid, sgErr = getSyncGroupId(ctx, tx, sgName)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700717 if sgErr != nil {
718 return sgErr
719 }
720
Raja Daoud52851362015-09-14 15:50:40 -0700721 sg, sgErr = getSyncGroupById(ctx, tx, gid)
722 if sgErr != nil {
723 return sgErr
724 }
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700725
726 // Check SG ACL.
Raja Daoud52851362015-09-14 15:50:40 -0700727 if err := authorize(ctx, call.Security(), sg); err != nil {
728 return err
729 }
730
731 // SyncGroup already exists, increment the number of local
732 // joiners in its local state information. This presents
733 // different scenarios:
734 // 1- An additional local joiner: the current number of local
735 // joiners is > 0 and the SyncGroup was already bootstrapped
736 // to the Watcher, so there is nothing else to do.
737 // 2- A new local joiner after all previous local joiners had
738 // left: the number of local joiners is 0, the Watcher must
739 // be re-notified via a SyncGroup bootstrap because the last
740 // previous joiner to leave had un-notified the Watcher. In
741 // this scenario the SyncGroup was not destroyed after the
742 // last joiner left because the SyncGroup was also published
743 // here by a remote peer and thus cannot be destroyed only
744 // based on the local joiners.
745 // 3- A first local joiner for a SyncGroup that was published
746 // here from a remote Syncbase: the number of local joiners
747 // is also 0 (and the remote publish flag is set), and the
748 // Watcher must be notified via a SyncGroup bootstrap.
749 // Conclusion: bootstrap if the number of local joiners is 0.
750 sgState, err := getSGIdEntry(ctx, tx, gid)
751 if err != nil {
752 return err
753 }
754
755 if sgState.NumLocalJoiners == 0 {
756 if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
757 return err
758 }
759 }
760 sgState.NumLocalJoiners++
761 return setSGIdEntry(ctx, tx, gid, sgState)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700762 })
763
764 // The presented blessing is allowed to make this Syncbase instance join
765 // the specified SyncGroup, but this Syncbase instance has in fact
766 // already joined the SyncGroup. Join is idempotent, so we simply return
767 // the spec to indicate success.
768 if err == nil {
769 return sg.Spec, nil
770 }
771
772 // Join is not allowed (possibilities include Database permissions check
773 // failed, SG ACL check failed or error during fetching SG information).
774 if verror.ErrorID(sgErr) != verror.ErrNoExist.ID {
775 return nullSpec, err
776 }
777
778 // Brand new join.
779
780 // Get this Syncbase's sync module handle.
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700781 ss := sd.sync.(*syncService)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700782
783 // Contact a SyncGroup Admin to join the SyncGroup.
Raja Daoud52851362015-09-14 15:50:40 -0700784 sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700785 if err != nil {
786 return nullSpec, err
787 }
788
789 // Verify that the app/db combination is valid for this SyncGroup.
Raja Daoud52851362015-09-14 15:50:40 -0700790 appName, dbName := sd.db.App().Name(), sd.db.Name()
791 if sg2.AppName != appName || sg2.DbName != dbName {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700792 return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
793 }
794
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700795 err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Raja Daoud52851362015-09-14 15:50:40 -0700796 if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700797 return err
798 }
799
800 // Take a snapshot of the data to bootstrap the SyncGroup.
Raja Daoud52851362015-09-14 15:50:40 -0700801 return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700802 })
803
804 if err != nil {
805 return nullSpec, err
806 }
807
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700808 ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
Himabindu Puchab41fc142015-09-10 17:10:57 -0700809
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700810 // Publish at the chosen mount table and in the neighborhood.
Raja Daoud52851362015-09-14 15:50:40 -0700811 sd.publishInMountTables(ctx, call, sg2.Spec)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700812
Raja Daoud52851362015-09-14 15:50:40 -0700813 return sg2.Spec, nil
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700814}
815
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700816func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700817 vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
Raja Daoud52851362015-09-14 15:50:40 -0700818 defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700819
820 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700821 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700822
823 // Check permissions on Database.
824 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
825 return nil, err
826 }
827
828 // Scan all the SyncGroup names found in the Database.
Raja Daoud52851362015-09-14 15:50:40 -0700829 stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
830 var sgNames []string
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700831 var key []byte
832 for stream.Advance() {
833 sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
834 if err != nil {
835 return nil, err
836 }
837 sgNames = append(sgNames, sgName)
838 }
839
840 if err := stream.Err(); err != nil {
841 return nil, err
842 }
843
Raja Daoud52851362015-09-14 15:50:40 -0700844 vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700845 return sgNames, nil
846}
847
848func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700849 vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
Raja Daoud52851362015-09-14 15:50:40 -0700850 defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700851
852 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700853 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700854
Raja Daoud52851362015-09-14 15:50:40 -0700855 var spec wire.SyncGroupSpec
856
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700857 // Check permissions on Database.
858 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
859 return spec, "", err
860 }
861
862 // Get the SyncGroup information.
863 sg, err := getSyncGroupByName(ctx, sn, sgName)
864 if err != nil {
865 return spec, "", err
866 }
867 // TODO(hpucha): Check SyncGroup ACL.
868
Raja Daoud52851362015-09-14 15:50:40 -0700869 vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
870 return sg.Spec, sg.SpecVersion, nil
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700871}
872
873func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700874 vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
Raja Daoud52851362015-09-14 15:50:40 -0700875 defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700876
877 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700878 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700879
880 // Check permissions on Database.
881 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
Raja Daoud52851362015-09-14 15:50:40 -0700882 return nil, err
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700883 }
884
885 // Get the SyncGroup information.
886 sg, err := getSyncGroupByName(ctx, sn, sgName)
887 if err != nil {
Raja Daoud52851362015-09-14 15:50:40 -0700888 return nil, err
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700889 }
890
891 // TODO(hpucha): Check SyncGroup ACL.
892
Raja Daoud52851362015-09-14 15:50:40 -0700893 vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
894 return sg.Joiners, nil
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700895}
896
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700897func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
898 vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
899 defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
900
Raja Daoud52851362015-09-14 15:50:40 -0700901 if err := verifySyncGroupSpec(ctx, &spec); err != nil {
902 return err
903 }
904
905 ss := sd.sync.(*syncService)
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700906 appName, dbName := sd.db.App().Name(), sd.db.Name()
Raja Daoud52851362015-09-14 15:50:40 -0700907
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700908 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700909 // Check permissions on Database.
910 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
911 return err
912 }
913
Raja Daoudccfd6c12015-08-03 18:46:28 -0700914 sg, err := getSyncGroupByName(ctx, tx, sgName)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700915 if err != nil {
916 return err
917 }
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700918
Raja Daoud52851362015-09-14 15:50:40 -0700919 if version != NoVersion && sg.SpecVersion != version {
920 return verror.NewErrBadVersion(ctx)
921 }
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700922
Raja Daoud52851362015-09-14 15:50:40 -0700923 // Must not change the SyncGroup prefixes.
924 if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
925 return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
926 }
927
928 sgState, err := getSGIdEntry(ctx, tx, sg.Id)
929 if err != nil {
930 return err
931 }
932 if sgState.SyncPending {
933 return verror.NewErrBadState(ctx)
934 }
935
936 // Reserve a log generation and position counts for the new SyncGroup.
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700937 gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
Raja Daoud52851362015-09-14 15:50:40 -0700938
939 // TODO(hpucha): Check SyncGroup ACL.
940
941 newVersion := newSyncGroupVersion()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700942 sg.Spec = spec
Raja Daoud52851362015-09-14 15:50:40 -0700943 sg.SpecVersion = newVersion
944 return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700945 })
946 return err
947}
948
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700949//////////////////////////////
950// Helper functions
951
Raja Daoud52851362015-09-14 15:50:40 -0700952// publishSyncGroup publishes the SyncGroup at the remote peer and update its
953// status. If the publish operation is either successful or rejected by the
954// peer, the status is updated to "running" or "rejected" respectively and the
955// function returns "nil" to indicate to the caller there is no need to make
956// further attempts. Otherwise an error (typically RPC error, but could also
957// be a store error) is returned to the caller.
958// TODO(rdaoud): make all SG admins try to publish after they join.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700959func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
Raja Daoud52851362015-09-14 15:50:40 -0700960 st := sd.db.St()
961 ss := sd.sync.(*syncService)
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700962 appName, dbName := sd.db.App().Name(), sd.db.Name()
Raja Daoud52851362015-09-14 15:50:40 -0700963
964 gid, err := getSyncGroupId(ctx, st, sgName)
965 if err != nil {
966 return err
967 }
968 version, err := getSyncGroupVersion(ctx, st, gid)
969 if err != nil {
970 return err
971 }
972 sg, err := getSGDataEntry(ctx, st, gid, version)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700973 if err != nil {
974 return err
975 }
976
977 if sg.Status != interfaces.SyncGroupStatusPublishPending {
978 return nil
979 }
980
Raja Daoud52851362015-09-14 15:50:40 -0700981 // Note: the remote peer is given the SyncGroup version and genvec at
982 // the point before the post-publish update, at which time the status
983 // and joiner list of the SyncGroup get updated. This is functionally
984 // correct, just not symmetrical with what happens at joiner, which
985 // receives the SyncGroup state post-join.
Raja Daoud52851362015-09-14 15:50:40 -0700986 status := interfaces.SyncGroupStatusPublishRejected
987
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700988 sgs := sgSet{gid: struct{}{}}
989 gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
990 if err != nil {
991 return err
992 }
993 // TODO(hpucha): Do we want to pick the head version corresponding to
994 // the local gen of the sg? It appears that it shouldn't matter.
995
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700996 c := interfaces.SyncClient(sgName)
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700997 peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700998
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700999 if err == nil {
Raja Daoud52851362015-09-14 15:50:40 -07001000 status = interfaces.SyncGroupStatusRunning
1001 } else {
1002 errId := verror.ErrorID(err)
1003 if errId == interfaces.ErrDupSyncGroupPublish.ID {
1004 // Duplicate publish: another admin already published
1005 // the SyncGroup, nothing else needs to happen because
1006 // that other admin would have updated the SyncGroup
1007 // status and p2p SG sync will propagate the change.
1008 // TODO(rdaoud): what if that other admin crashes and
1009 // never updates the SyncGroup status (dies permanently
1010 // or is ejected before the status update)? Eventually
1011 // some admin must decide to update the SG status anyway
1012 // even if that causes extra SG mutations and conflicts.
1013 vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
1014 return nil
1015 }
1016
1017 if errId != verror.ErrExist.ID {
1018 // The publish operation failed with an error other
1019 // than ErrExist then it must be retried later on.
1020 // TODO(hpucha): Is there an RPC error that we can check here?
1021 vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
1022 return err
1023 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001024 }
1025
Raja Daoud52851362015-09-14 15:50:40 -07001026 // The publish operation is done because either it succeeded or it
1027 // failed with the ErrExist error. Update the SyncGroup status and, if
1028 // the publish was successful, add the remote peer to the SyncGroup.
1029 vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
1030 sgName, peer, status.String(), err)
1031
1032 err = store.RunInTransaction(st, func(tx store.Transaction) error {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001033 // Ensure SG still exists.
Raja Daoud52851362015-09-14 15:50:40 -07001034 sg, err := getSyncGroupById(ctx, tx, gid)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001035 if err != nil {
1036 return err
1037 }
1038
Raja Daoud52851362015-09-14 15:50:40 -07001039 // Reserve a log generation and position counts for the new
1040 // SyncGroup version.
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001041 gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
Raja Daoud52851362015-09-14 15:50:40 -07001042
1043 sg.Status = status
1044 if status == interfaces.SyncGroupStatusRunning {
1045 // TODO(hpucha): Default priority?
1046 sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
1047 }
1048
1049 return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001050 })
Raja Daoud52851362015-09-14 15:50:40 -07001051 if err != nil {
1052 vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
1053 sgName, status.String(), err)
1054 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001055 return err
1056}
1057
Raja Daoud0dfdd252015-07-10 20:02:22 -07001058// bootstrapSyncGroup inserts into the transaction log a SyncGroup operation and
1059// a set of Snapshot operations to notify the sync watcher about the SyncGroup
1060// prefixes to start accepting and the initial state of existing store keys that
1061// match these prefixes (both data and permission keys).
1062// TODO(rdaoud): this operation scans the managed keys of the database and can
1063// be time consuming. Consider doing it asynchronously and letting the server
1064// reply to the client earlier. However it must happen within the scope of this
1065// transaction (and its snapshot view).
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -07001066func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error {
Raja Daoud0dfdd252015-07-10 20:02:22 -07001067 if len(prefixes) == 0 {
1068 return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
1069 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001070
Raja Daoud0dfdd252015-07-10 20:02:22 -07001071 // Get the store options to retrieve the list of managed key prefixes.
1072 opts, err := watchable.GetOptions(sd.db.St())
1073 if err != nil {
1074 return err
1075 }
1076 if len(opts.ManagedPrefixes) == 0 {
1077 return verror.New(verror.ErrInternal, ctx, "store has no managed prefixes")
1078 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001079
Raja Daoud0dfdd252015-07-10 20:02:22 -07001080 // Notify the watcher of the SyncGroup prefixes to start accepting.
1081 if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil {
1082 return err
1083 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001084
Raja Daoud0dfdd252015-07-10 20:02:22 -07001085 // Loop over the store managed key prefixes (e.g. data and permissions).
1086 // For each one, scan the ranges of the given SyncGroup prefixes. For
1087 // each matching key, insert a snapshot operation in the log. Scanning
1088 // is done over the version entries to retrieve the matching keys and
1089 // their version numbers (the key values). Remove the version prefix
1090 // from the key used in the snapshot operation.
1091 // TODO(rdaoud): for SyncGroup prefixes, there should be a separation
1092 // between their representation at the client (a list of (db, prefix)
1093 // tuples) and internally as strings that match the store's key format.
1094 for _, mp := range opts.ManagedPrefixes {
1095 for _, p := range prefixes {
Raja Daoud0dfdd252015-07-10 20:02:22 -07001096 start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
1097 stream := tx.Scan(start, limit)
1098 for stream.Advance() {
Raja Daoud4171c9c2015-07-14 20:07:44 -07001099 k, v := stream.Key(nil), stream.Value(nil)
Raja Daoud0dfdd252015-07-10 20:02:22 -07001100 parts := util.SplitKeyParts(string(k))
1101 if len(parts) < 2 {
Raja Daoud4171c9c2015-07-14 20:07:44 -07001102 vlog.Fatalf("sync: bootstrapSyncGroup: invalid version key %s", string(k))
Raja Daoud0dfdd252015-07-10 20:02:22 -07001103
1104 }
1105 key := []byte(util.JoinKeyParts(parts[1:]...))
1106 if err := watchable.AddSyncSnapshotOp(ctx, tx, key, v); err != nil {
1107 return err
1108 }
1109
1110 }
1111 if err := stream.Err(); err != nil {
1112 return err
1113 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001114 }
1115 }
1116 return nil
1117}
1118
1119func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
Himabindu Puchad964ef02015-06-30 01:10:47 -07001120 // Get this Syncbase's sync module handle.
Himabindu Pucha665f14c2015-08-13 13:42:01 -07001121 ss := sd.sync.(*syncService)
Himabindu Puchad964ef02015-06-30 01:10:47 -07001122
1123 for _, mt := range spec.MountTables {
1124 name := naming.Join(mt, ss.name)
1125 // TODO(hpucha): Is this add idempotent? Appears to be from code.
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -07001126 // Confirm that it is ok to use absolute names here.
Matt Rosencrantza475b4e2015-09-01 13:53:21 -07001127 if err := call.Server().AddName(name); err != nil {
Himabindu Puchad964ef02015-06-30 01:10:47 -07001128 return err
1129 }
1130 }
1131
1132 // TODO(hpucha): Do we have to publish in neighborhood explicitly?
1133
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001134 return nil
1135}
1136
Raja Daoud52851362015-09-14 15:50:40 -07001137func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001138 c := interfaces.SyncClient(sgName)
1139 return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
1140
1141 // TODO(hpucha): Try to join using an Admin on neighborhood if the publisher is not reachable.
1142}
1143
1144func authorize(ctx *context.T, call security.Call, sg *interfaces.SyncGroup) error {
1145 auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms)
1146 if err := auth.Authorize(ctx, call); err != nil {
1147 return verror.New(verror.ErrNoAccess, ctx, err)
1148 }
1149 return nil
1150}
1151
Himabindu Puchafb26a832015-05-20 15:37:50 -07001152////////////////////////////////////////////////////////////
1153// Methods for SyncGroup create/join between Syncbases.
1154
Raja Daoud52851362015-09-14 15:50:40 -07001155func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
Himabindu Pucha53497022015-06-19 11:57:13 -07001156 st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001157 if err != nil {
Raja Daoud52851362015-09-14 15:50:40 -07001158 return s.name, err
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001159 }
1160
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -07001161 err = store.RunInTransaction(st, func(tx store.Transaction) error {
Raja Daoud52851362015-09-14 15:50:40 -07001162 gid, err := getSyncGroupId(ctx, tx, sg.Name)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001163 if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
1164 return err
1165 }
1166
Raja Daoud52851362015-09-14 15:50:40 -07001167 if err == nil {
1168 // SG name already claimed. Note that in this case of
1169 // split-brain (same SG name, different IDs), those in
1170 // SG ID being rejected here do not benefit from the
1171 // de-duping optimization below and will end up making
1172 // duplicate SG mutations to set the status, yielding
1173 // more SG conflicts. It is functionally correct but
1174 // bypasses the de-dup optimization for the rejected SG.
1175 if gid != sg.Id {
1176 return verror.New(verror.ErrExist, ctx, sg.Name)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001177 }
Raja Daoud52851362015-09-14 15:50:40 -07001178
1179 // SG exists locally, either locally created/joined or
1180 // previously published. Make it idempotent for the
1181 // same publisher, otherwise it's a duplicate.
1182 state, err := getSGIdEntry(ctx, tx, gid)
1183 if err != nil {
1184 return err
1185 }
1186 if state.RemotePublisher == "" {
1187 // Locally created/joined SyncGroup: update its
1188 // state to include the publisher.
1189 state.RemotePublisher = publisher
1190 return setSGIdEntry(ctx, tx, gid, state)
1191 }
1192 if publisher == state.RemotePublisher {
1193 // Same previous publisher: nothing to change,
1194 // the old genvec and version info is valid.
1195 return nil
1196 }
1197 return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001198 }
1199
1200 // Publish the SyncGroup.
1201
1202 // TODO(hpucha): Use some ACL check to allow/deny publishing.
1203 // TODO(hpucha): Ensure node is on Admin ACL.
1204
Raja Daoud52851362015-09-14 15:50:40 -07001205 return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -07001206 })
1207
Raja Daoud52851362015-09-14 15:50:40 -07001208 if err == nil {
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001209 s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
Himabindu Puchab41fc142015-09-10 17:10:57 -07001210 }
Raja Daoud52851362015-09-14 15:50:40 -07001211 return s.name, err
Himabindu Puchafb26a832015-05-20 15:37:50 -07001212}
1213
Raja Daoud52851362015-09-14 15:50:40 -07001214func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001215 vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
1216 defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
1217
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001218 var dbSt store.Store
1219 var gid interfaces.GroupId
1220 var err error
Raja Daoud52851362015-09-14 15:50:40 -07001221 var stAppName, stDbName string
1222 nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{}
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001223
1224 // Find the database store for this SyncGroup.
1225 //
1226 // TODO(hpucha): At a high level, we have yet to decide if the SG name
1227 // is stand-alone or is derived from the app/db namespace, based on the
1228 // feedback from app developers (see discussion in SyncGroup API
1229 // doc). If we decide to keep the SG name as stand-alone, this scan can
1230 // be optimized by a lazy cache of sgname to <app, db> info.
Raja Daoud28d3b602015-06-17 20:02:20 -07001231 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001232 if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
1233 // Found the SyncGroup being looked for.
1234 dbSt = st
Raja Daoud52851362015-09-14 15:50:40 -07001235 stAppName, stDbName = appName, dbName
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001236 return true
1237 }
1238 return false
1239 })
1240
1241 // SyncGroup not found.
1242 if err != nil {
Raja Daoud52851362015-09-14 15:50:40 -07001243 return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001244 }
1245
Raja Daoud52851362015-09-14 15:50:40 -07001246 version := newSyncGroupVersion()
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001247 var sg *interfaces.SyncGroup
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001248 var gen, pos uint64
Raja Daoud52851362015-09-14 15:50:40 -07001249
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -07001250 err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001251 var err error
1252 sg, err = getSyncGroupById(ctx, tx, gid)
1253 if err != nil {
1254 return err
1255 }
1256
1257 // Check SG ACL.
1258 if err := authorize(ctx, call.Security(), sg); err != nil {
1259 return err
1260 }
1261
Raja Daoud52851362015-09-14 15:50:40 -07001262 // Check that the SG is not in pending state.
1263 state, err := getSGIdEntry(ctx, tx, gid)
1264 if err != nil {
1265 return err
1266 }
1267 if state.SyncPending {
1268 return verror.NewErrBadState(ctx)
1269 }
1270
1271 // Reserve a log generation and position counts for the new SyncGroup.
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001272 gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
Raja Daoud52851362015-09-14 15:50:40 -07001273
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001274 // Add to joiner list.
1275 sg.Joiners[joinerName] = joinerInfo
Raja Daoud52851362015-09-14 15:50:40 -07001276 return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001277 })
1278
1279 if err != nil {
Raja Daoud52851362015-09-14 15:50:40 -07001280 return nullSG, "", nullGV, err
Himabindu Pucha12e1a122015-06-08 15:59:09 -07001281 }
Himabindu Puchaf2796e12015-10-01 19:05:10 -07001282
1283 sgs := sgSet{gid: struct{}{}}
1284 gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
1285 if err != nil {
1286 return nullSG, "", nullGV, err
1287 }
1288 // The retrieved genvector does not contain the mutation that adds the
1289 // joiner to the list since initiator is the one checkpointing the
1290 // generations. Add that generation to this genvector.
1291 gv[sgOID(gid)][s.id] = gen
1292
1293 vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
1294 return *sg, version, gv[sgOID(gid)], nil
Himabindu Puchafb26a832015-05-20 15:37:50 -07001295}