blob: 91ba4994106d7a6507ffdeab63b467d4561f33a6 [file] [log] [blame]
Himabindu Puchafb26a832015-05-20 15:37:50 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package vsync
6
Raja Daouda9cfe5e2015-05-28 14:38:34 -07007// SyncGroup management and storage in Syncbase. Handles the lifecycle
8// of SyncGroups (create, join, leave, etc.) and their persistence as
9// sync metadata in the application databases. Provides helper functions
Raja Daoud20aed3f2015-05-26 12:41:10 -070010// to the higher levels of sync (Initiator, Watcher) to get membership
11// information and map key/value changes to their matching SyncGroups.
12
Himabindu Pucha12e1a122015-06-08 15:59:09 -070013// TODO(hpucha): Add high level commentary about the logic behind create/join
14// etc.
15
Himabindu Puchafb26a832015-05-20 15:37:50 -070016import (
Raja Daoud20aed3f2015-05-26 12:41:10 -070017 "fmt"
Himabindu Pucha25a1d022015-07-21 20:46:33 -070018 "strings"
Raja Daoud20aed3f2015-05-26 12:41:10 -070019 "time"
20
Himabindu Puchafb26a832015-05-20 15:37:50 -070021 "v.io/v23/context"
Himabindu Puchad964ef02015-06-30 01:10:47 -070022 "v.io/v23/naming"
Himabindu Puchafb26a832015-05-20 15:37:50 -070023 "v.io/v23/rpc"
Himabindu Pucha12e1a122015-06-08 15:59:09 -070024 "v.io/v23/security"
25 "v.io/v23/security/access"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070026 wire "v.io/v23/services/syncbase/nosql"
Himabindu Puchafb26a832015-05-20 15:37:50 -070027 "v.io/v23/verror"
Raja Daouda9cfe5e2015-05-28 14:38:34 -070028 "v.io/v23/vom"
Raja Daoud5d5fae72015-05-29 18:09:48 -070029 "v.io/x/lib/vlog"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070030 "v.io/x/ref/services/syncbase/server/interfaces"
31 "v.io/x/ref/services/syncbase/server/util"
32 "v.io/x/ref/services/syncbase/server/watchable"
33 "v.io/x/ref/services/syncbase/store"
Himabindu Puchafb26a832015-05-20 15:37:50 -070034)
35
Raja Daoud20aed3f2015-05-26 12:41:10 -070036var (
37 // memberViewTTL is the shelf-life of the aggregate view of SyncGroup members.
38 memberViewTTL = 2 * time.Second
Raja Daoud20aed3f2015-05-26 12:41:10 -070039)
40
Raja Daoud20aed3f2015-05-26 12:41:10 -070041////////////////////////////////////////////////////////////
42// SyncGroup management internal to Syncbase.
43
Adam Sadovskyb2a65692015-05-29 21:57:40 -070044// memberView holds an aggregated view of all SyncGroup members across
45// databases. The view is not coherent, it gets refreshed according to a
46// configured TTL and not (coherently) when SyncGroup membership is updated in
47// the various databases. It is needed by the sync Initiator, which must select
48// a peer to contact from a global view of all SyncGroup members gathered from
49// all databases. This is why a slightly stale view is acceptable.
Raja Daoud20aed3f2015-05-26 12:41:10 -070050// The members are identified by their Vanadium names (map keys).
51type memberView struct {
52 expiration time.Time
53 members map[string]*memberInfo
54}
55
Adam Sadovskyb2a65692015-05-29 21:57:40 -070056// memberInfo holds the member metadata for each SyncGroup this member belongs
Himabindu Puchab41fc142015-09-10 17:10:57 -070057// to within each App/Database (i.e. global database name). It's a mapping of
58// global DB names to sets of SyncGroup member information. It also maintains
59// all the mount table candidates that could be used to reach this peer, learned
60// from the SyncGroup metadata.
Raja Daoud20aed3f2015-05-26 12:41:10 -070061type memberInfo struct {
Himabindu Puchab41fc142015-09-10 17:10:57 -070062 db2sg map[string]sgMemberInfo
63 mtTables map[string]struct{}
Raja Daoud20aed3f2015-05-26 12:41:10 -070064}
65
Raja Daoud28d3b602015-06-17 20:02:20 -070066// sgMemberInfo maps SyncGroups to their member metadata.
67type sgMemberInfo map[interfaces.GroupId]wire.SyncGroupMemberInfo
68
Raja Daoud20aed3f2015-05-26 12:41:10 -070069// newSyncGroupVersion generates a random SyncGroup version ("etag").
70func newSyncGroupVersion() string {
Raja Daoud65e68b82015-06-11 16:33:33 -070071 return fmt.Sprintf("%x", rand64())
Raja Daoud20aed3f2015-05-26 12:41:10 -070072}
73
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070074// newSyncGroupId generates a random SyncGroup ID.
75func newSyncGroupId() interfaces.GroupId {
Raja Daoud65e68b82015-06-11 16:33:33 -070076 id := interfaces.NoGroupId
77 for id == interfaces.NoGroupId {
78 id = interfaces.GroupId(rand64())
79 }
80 return id
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070081}
Raja Daouda9cfe5e2015-05-28 14:38:34 -070082
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070083// verifySyncGroup verifies if a SyncGroup struct is well-formed.
84func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
Raja Daoud20aed3f2015-05-26 12:41:10 -070085 if sg == nil {
Raja Daouda9cfe5e2015-05-28 14:38:34 -070086 return verror.New(verror.ErrBadArg, ctx, "group information not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070087 }
88 if sg.Name == "" {
Raja Daouda9cfe5e2015-05-28 14:38:34 -070089 return verror.New(verror.ErrBadArg, ctx, "group name not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070090 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070091 if sg.AppName == "" {
92 return verror.New(verror.ErrBadArg, ctx, "app name not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -070093 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -070094 if sg.DbName == "" {
95 return verror.New(verror.ErrBadArg, ctx, "db name not specified")
96 }
97 if sg.Creator == "" {
98 return verror.New(verror.ErrBadArg, ctx, "creator id not specified")
99 }
100 if sg.Id == interfaces.NoGroupId {
101 return verror.New(verror.ErrBadArg, ctx, "group id not specified")
102 }
103 if sg.SpecVersion == "" {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700104 return verror.New(verror.ErrBadArg, ctx, "group version not specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700105 }
106 if len(sg.Joiners) == 0 {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700107 return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700108 }
109 if len(sg.Spec.Prefixes) == 0 {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700110 return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700111 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700112 return nil
113}
114
115// addSyncGroup adds a new SyncGroup given its information.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700116func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700117 // Verify SyncGroup before storing it since it may have been received
118 // from a remote peer.
119 if err := verifySyncGroup(ctx, sg); err != nil {
120 return err
121 }
Raja Daoud20aed3f2015-05-26 12:41:10 -0700122
Raja Daoud4171c9c2015-07-14 20:07:44 -0700123 if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
124 return err
125 } else if ok {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700126 return verror.New(verror.ErrExist, ctx, "group id already exists")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700127 }
Raja Daoud4171c9c2015-07-14 20:07:44 -0700128 if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
129 return err
130 } else if ok {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700131 return verror.New(verror.ErrExist, ctx, "group name already exists")
Raja Daoud20aed3f2015-05-26 12:41:10 -0700132 }
133
134 // Add the group name and data entries.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700135 if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700136 return err
137 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700138 if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700139 return err
140 }
141
142 return nil
143}
144
145// getSyncGroupId retrieves the SyncGroup ID given its name.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700146func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700147 return getSGNameEntry(ctx, st, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700148}
149
150// getSyncGroupName retrieves the SyncGroup name given its ID.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700151func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700152 sg, err := getSyncGroupById(ctx, st, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700153 if err != nil {
154 return "", err
155 }
156 return sg.Name, nil
157}
158
159// getSyncGroupById retrieves the SyncGroup given its ID.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700160func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700161 return getSGDataEntry(ctx, st, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700162}
163
164// getSyncGroupByName retrieves the SyncGroup given its name.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700165func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*interfaces.SyncGroup, error) {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700166 gid, err := getSyncGroupId(ctx, st, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700167 if err != nil {
168 return nil, err
169 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700170 return getSyncGroupById(ctx, st, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700171}
172
173// delSyncGroupById deletes the SyncGroup given its ID.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700174func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700175 sg, err := getSyncGroupById(ctx, tx, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700176 if err != nil {
177 return err
178 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700179 if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700180 return err
181 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700182 return delSGDataEntry(ctx, tx, sg.Id)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700183}
184
185// delSyncGroupByName deletes the SyncGroup given its name.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700186func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700187 gid, err := getSyncGroupId(ctx, tx, name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700188 if err != nil {
189 return err
190 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700191 return delSyncGroupById(ctx, tx, gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700192}
193
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700194// refreshMembersIfExpired updates the aggregate view of SyncGroup members
195// across databases if the view has expired.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700196// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
197// update the membership view for them instead of always scanning all of them.
198func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700199 view := s.allMembers
200 if view == nil {
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700201 // The empty expiration time in Go is before "now" and treated as expired
202 // below.
Raja Daoud28d3b602015-06-17 20:02:20 -0700203 view = &memberView{expiration: time.Time{}, members: nil}
Raja Daoud20aed3f2015-05-26 12:41:10 -0700204 s.allMembers = view
205 }
206
207 if time.Now().Before(view.expiration) {
208 return
209 }
210
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700211 // Create a new aggregate view of SyncGroup members across all app databases.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700212 newMembers := make(map[string]*memberInfo)
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700213
Raja Daoud28d3b602015-06-17 20:02:20 -0700214 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700215 // For each database, fetch its SyncGroup data entries by scanning their
216 // prefix range. Use a database snapshot for the scan.
Raja Daoud5d5fae72015-05-29 18:09:48 -0700217 sn := st.NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700218 defer sn.Abort()
Raja Daoud28d3b602015-06-17 20:02:20 -0700219 name := appDbName(appName, dbName)
Raja Daoud5d5fae72015-05-29 18:09:48 -0700220
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700221 forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
Raja Daoud5d5fae72015-05-29 18:09:48 -0700222 // Add all members of this SyncGroup to the membership view.
223 // A member's info is different across SyncGroups, so gather all of them.
224 for member, info := range sg.Joiners {
225 if _, ok := newMembers[member]; !ok {
Himabindu Puchab41fc142015-09-10 17:10:57 -0700226 newMembers[member] = &memberInfo{
227 db2sg: make(map[string]sgMemberInfo),
228 mtTables: make(map[string]struct{}),
229 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700230 }
Raja Daoud28d3b602015-06-17 20:02:20 -0700231 if _, ok := newMembers[member].db2sg[name]; !ok {
232 newMembers[member].db2sg[name] = make(sgMemberInfo)
233 }
234 newMembers[member].db2sg[name][sg.Id] = info
Himabindu Puchab41fc142015-09-10 17:10:57 -0700235
236 // Collect mount tables.
237 for _, mt := range sg.Spec.MountTables {
238 newMembers[member].mtTables[mt] = struct{}{}
239 }
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700240 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700241 return false
242 })
Raja Daoud5d5fae72015-05-29 18:09:48 -0700243 return false
244 })
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700245
246 view.members = newMembers
Raja Daoud20aed3f2015-05-26 12:41:10 -0700247 view.expiration = time.Now().Add(memberViewTTL)
248}
249
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700250// forEachSyncGroup iterates over all SyncGroups in the Database and invokes
251// the callback function on each one. The callback returns a "done" flag to
252// make forEachSyncGroup() stop the iteration earlier; otherwise the function
253// loops across all SyncGroups in the Database.
254func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700255 scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700256 stream := st.Scan(scanStart, scanLimit)
257 for stream.Advance() {
258 var sg interfaces.SyncGroup
259 if vom.Decode(stream.Value(nil), &sg) != nil {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700260 vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700261 continue
262 }
263
264 if callback(&sg) {
265 break // done, early exit
266 }
267 }
268
269 if err := stream.Err(); err != nil {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700270 vlog.Errorf("sync: forEachSyncGroup: scan stream error: %v", err)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700271 }
272}
273
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700274// getMembers returns all SyncGroup members and the count of SyncGroups each one
275// joined.
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700276func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700277 s.allMembersLock.Lock()
278 defer s.allMembersLock.Unlock()
279
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700280 s.refreshMembersIfExpired(ctx)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700281
282 members := make(map[string]uint32)
283 for member, info := range s.allMembers.members {
Raja Daoud28d3b602015-06-17 20:02:20 -0700284 count := 0
285 for _, sgmi := range info.db2sg {
286 count += len(sgmi)
287 }
288 members[member] = uint32(count)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700289 }
290
291 return members
292}
293
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700294// copyMemberInfo returns a copy of the info for the requested peer.
295func (s *syncService) copyMemberInfo(ctx *context.T, member string) *memberInfo {
296 s.allMembersLock.RLock()
297 defer s.allMembersLock.RUnlock()
298
299 info, ok := s.allMembers.members[member]
300 if !ok {
301 return nil
302 }
303
304 // Make a copy.
Himabindu Puchab41fc142015-09-10 17:10:57 -0700305 infoCopy := &memberInfo{
306 db2sg: make(map[string]sgMemberInfo),
307 mtTables: make(map[string]struct{}),
308 }
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700309 for gdbName, sgInfo := range info.db2sg {
310 infoCopy.db2sg[gdbName] = make(sgMemberInfo)
311 for gid, mi := range sgInfo {
312 infoCopy.db2sg[gdbName][gid] = mi
313 }
314 }
Himabindu Puchab41fc142015-09-10 17:10:57 -0700315 for mt := range info.mtTables {
316 infoCopy.mtTables[mt] = struct{}{}
317 }
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700318
319 return infoCopy
320}
321
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700322// Low-level utility functions to access DB entries without tracking their
323// relationships.
Raja Daoud20aed3f2015-05-26 12:41:10 -0700324// Use the functions above to manipulate SyncGroups.
325
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700326var (
327 // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
328 sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
329
330 // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
331 sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
332)
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700333
Raja Daoud20aed3f2015-05-26 12:41:10 -0700334// sgDataKey returns the key used to access the SyncGroup data entry.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700335func sgDataKey(gid interfaces.GroupId) string {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700336 return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700337}
338
339// sgNameKey returns the key used to access the SyncGroup name entry.
340func sgNameKey(name string) string {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700341 return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700342}
343
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700344// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
345func splitSgNameKey(ctx *context.T, key string) (string, error) {
346 prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
347
348 // Note that the actual SyncGroup name may contain ":" as a separator.
349 if !strings.HasPrefix(key, prefix) {
350 return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
351 }
352 return strings.TrimPrefix(key, prefix), nil
353}
354
Raja Daoud20aed3f2015-05-26 12:41:10 -0700355// hasSGDataEntry returns true if the SyncGroup data entry exists.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700356func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700357 // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700358 var sg interfaces.SyncGroup
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700359 if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700360 if verror.ErrorID(err) == verror.ErrNoExist.ID {
361 err = nil
362 }
363 return false, err
Raja Daoud20aed3f2015-05-26 12:41:10 -0700364 }
Raja Daoud4171c9c2015-07-14 20:07:44 -0700365 return true, nil
Raja Daoud20aed3f2015-05-26 12:41:10 -0700366}
367
368// hasSGNameEntry returns true if the SyncGroup name entry exists.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700369func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
Raja Daoud20aed3f2015-05-26 12:41:10 -0700370 // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700371 var gid interfaces.GroupId
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700372 if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700373 if verror.ErrorID(err) == verror.ErrNoExist.ID {
374 err = nil
375 }
376 return false, err
Raja Daoud20aed3f2015-05-26 12:41:10 -0700377 }
Raja Daoud4171c9c2015-07-14 20:07:44 -0700378 return true, nil
Raja Daoud20aed3f2015-05-26 12:41:10 -0700379}
380
381// setSGDataEntry stores the SyncGroup data entry.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700382func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700383 return util.Put(ctx, tx, sgDataKey(gid), sg)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700384}
385
386// setSGNameEntry stores the SyncGroup name entry.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700387func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700388 return util.Put(ctx, tx, sgNameKey(name), gid)
Raja Daoud20aed3f2015-05-26 12:41:10 -0700389}
390
391// getSGDataEntry retrieves the SyncGroup data for a given group ID.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700392func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
393 var sg interfaces.SyncGroup
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700394 if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
395 return nil, err
Raja Daoud20aed3f2015-05-26 12:41:10 -0700396 }
397 return &sg, nil
398}
399
400// getSGNameEntry retrieves the SyncGroup name to ID mapping.
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700401func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
402 var gid interfaces.GroupId
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700403 if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
404 return gid, err
Raja Daouda9cfe5e2015-05-28 14:38:34 -0700405 }
406 return gid, nil
Raja Daoud20aed3f2015-05-26 12:41:10 -0700407}
408
409// delSGDataEntry deletes the SyncGroup data entry.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700410func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700411 return util.Delete(ctx, tx, sgDataKey(gid))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700412}
413
414// delSGNameEntry deletes the SyncGroup name to ID mapping.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700415func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700416 return util.Delete(ctx, tx, sgNameKey(name))
Raja Daoud20aed3f2015-05-26 12:41:10 -0700417}
418
Himabindu Puchafb26a832015-05-20 15:37:50 -0700419////////////////////////////////////////////////////////////
420// SyncGroup methods between Client and Syncbase.
421
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700422// TODO(hpucha): Pass blessings along.
423func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700424 vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
425 defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
426
Himabindu Puchab41fc142015-09-10 17:10:57 -0700427 // Get this Syncbase's sync module handle.
428 ss := sd.sync.(*syncService)
429 var sg *interfaces.SyncGroup
430
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700431 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700432 // Check permissions on Database.
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700433 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700434 return err
435 }
436
437 // TODO(hpucha): Check prefix ACLs on all SG prefixes.
438 // This may need another method on util.Database interface.
439
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700440 // TODO(hpucha): Do some SG ACL checking. Check creator
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700441 // has Admin privilege.
442
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700443 // Instantiate sg. Add self as joiner.
Himabindu Puchab41fc142015-09-10 17:10:57 -0700444 sg = &interfaces.SyncGroup{
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700445 Id: newSyncGroupId(),
446 Name: sgName,
447 SpecVersion: newSyncGroupVersion(),
448 Spec: spec,
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700449 Creator: ss.name,
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700450 AppName: sd.db.App().Name(),
451 DbName: sd.db.Name(),
452 Status: interfaces.SyncGroupStatusPublishPending,
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700453 Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700454 }
455
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700456 if err := addSyncGroup(ctx, tx, sg); err != nil {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700457 return err
458 }
459
460 // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
461
462 // Take a snapshot of the data to bootstrap the SyncGroup.
Raja Daoud0dfdd252015-07-10 20:02:22 -0700463 return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700464 })
465
466 if err != nil {
467 return err
468 }
469
Himabindu Puchab41fc142015-09-10 17:10:57 -0700470 ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700471 // Local SG create succeeded. Publish the SG at the chosen server.
472 sd.publishSyncGroup(ctx, call, sgName)
473
474 // Publish at the chosen mount table and in the neighborhood.
475 sd.publishInMountTables(ctx, call, spec)
476
477 return nil
478}
479
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700480// TODO(hpucha): Pass blessings along.
481func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700482 vlog.VI(2).Infof("sync: JoinSyncGroup: begin: %s", sgName)
483 defer vlog.VI(2).Infof("sync: JoinSyncGroup: end: %s", sgName)
484
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700485 var sgErr error
486 var sg *interfaces.SyncGroup
487 nullSpec := wire.SyncGroupSpec{}
488
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700489 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700490 // Check permissions on Database.
491 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
492 return err
493 }
494
495 // Check if SyncGroup already exists.
496 sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
497 if sgErr != nil {
498 return sgErr
499 }
500
501 // SyncGroup already exists. Possibilities include created
502 // locally, already joined locally or published at the device as
503 // a result of SyncGroup creation on a different device.
504 //
505 // TODO(hpucha): Handle the above cases. If the SG was published
506 // locally, but not joined, we need to bootstrap the DAG and
507 // watcher. If multiple joins are done locally, we may want to
508 // ref count the SG state and track the leaves accordingly. So
509 // we may need to add some local state for each SyncGroup.
510
511 // Check SG ACL.
512 return authorize(ctx, call.Security(), sg)
513 })
514
515 // The presented blessing is allowed to make this Syncbase instance join
516 // the specified SyncGroup, but this Syncbase instance has in fact
517 // already joined the SyncGroup. Join is idempotent, so we simply return
518 // the spec to indicate success.
519 if err == nil {
520 return sg.Spec, nil
521 }
522
523 // Join is not allowed (possibilities include Database permissions check
524 // failed, SG ACL check failed or error during fetching SG information).
525 if verror.ErrorID(sgErr) != verror.ErrNoExist.ID {
526 return nullSpec, err
527 }
528
529 // Brand new join.
530
531 // Get this Syncbase's sync module handle.
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700532 ss := sd.sync.(*syncService)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700533
534 // Contact a SyncGroup Admin to join the SyncGroup.
Sergey Rogulenkoc2e60f62015-06-23 14:50:58 -0700535 sg = &interfaces.SyncGroup{}
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700536 *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
537 if err != nil {
538 return nullSpec, err
539 }
540
541 // Verify that the app/db combination is valid for this SyncGroup.
542 if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
543 return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
544 }
545
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700546 err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700547
548 // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
549
550 // TODO(hpucha): Get SG Deltas from Admin device.
551
552 if err := addSyncGroup(ctx, tx, sg); err != nil {
553 return err
554 }
555
556 // Take a snapshot of the data to bootstrap the SyncGroup.
Raja Daoud0dfdd252015-07-10 20:02:22 -0700557 return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700558 })
559
560 if err != nil {
561 return nullSpec, err
562 }
563
Himabindu Puchab41fc142015-09-10 17:10:57 -0700564 ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
565
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700566 // Publish at the chosen mount table and in the neighborhood.
567 sd.publishInMountTables(ctx, call, sg.Spec)
568
569 return sg.Spec, nil
570}
571
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700572func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
573 var sgNames []string
574
575 vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
576 defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
577
578 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700579 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700580
581 // Check permissions on Database.
582 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
583 return nil, err
584 }
585
586 // Scan all the SyncGroup names found in the Database.
587 scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
588 stream := sn.Scan(scanStart, scanLimit)
589 var key []byte
590 for stream.Advance() {
591 sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
592 if err != nil {
593 return nil, err
594 }
595 sgNames = append(sgNames, sgName)
596 }
597
598 if err := stream.Err(); err != nil {
599 return nil, err
600 }
601
602 return sgNames, nil
603}
604
605func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
606 var spec wire.SyncGroupSpec
607
608 vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
609 defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
610
611 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700612 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700613
614 // Check permissions on Database.
615 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
616 return spec, "", err
617 }
618
619 // Get the SyncGroup information.
620 sg, err := getSyncGroupByName(ctx, sn, sgName)
621 if err != nil {
622 return spec, "", err
623 }
624 // TODO(hpucha): Check SyncGroup ACL.
625
626 spec = sg.Spec
627 return spec, sg.SpecVersion, nil
628}
629
630func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
631 var members map[string]wire.SyncGroupMemberInfo
632
633 vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
634 defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
635
636 sn := sd.db.St().NewSnapshot()
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700637 defer sn.Abort()
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700638
639 // Check permissions on Database.
640 if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
641 return members, err
642 }
643
644 // Get the SyncGroup information.
645 sg, err := getSyncGroupByName(ctx, sn, sgName)
646 if err != nil {
647 return members, err
648 }
649
650 // TODO(hpucha): Check SyncGroup ACL.
651
652 members = sg.Joiners
653 return members, nil
654}
655
656// TODO(hpucha): Enable syncing syncgroup metadata.
657func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
658 vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
659 defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
660
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700661 err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700662 // Check permissions on Database.
663 if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
664 return err
665 }
666
Raja Daoudccfd6c12015-08-03 18:46:28 -0700667 sg, err := getSyncGroupByName(ctx, tx, sgName)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700668 if err != nil {
669 return err
670 }
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700671
672 // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
673
674 sg.Spec = spec
Raja Daoudccfd6c12015-08-03 18:46:28 -0700675 return setSGDataEntry(ctx, tx, sg.Id, sg)
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700676 })
677 return err
678}
679
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700680//////////////////////////////
681// Helper functions
682
683// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
684func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
685 sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
686 if err != nil {
687 return err
688 }
689
690 if sg.Status != interfaces.SyncGroupStatusPublishPending {
691 return nil
692 }
693
694 c := interfaces.SyncClient(sgName)
695 err = c.PublishSyncGroup(ctx, *sg)
696
697 // Publish failed temporarily. Retry later.
698 // TODO(hpucha): Is there an RPC error that we can check here?
699 if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
700 return err
701 }
702
703 // Publish succeeded.
704 if err == nil {
705 // TODO(hpucha): Get SG Deltas from publisher. Obtaining the
706 // new version from the publisher prevents SG conflicts.
707 return err
708 }
709
710 // Publish rejected. Persist that to avoid retrying in the
711 // future and to remember the split universe scenario.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700712 err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700713 // Ensure SG still exists.
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700714 sg, err := getSyncGroupByName(ctx, tx, sgName)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700715 if err != nil {
716 return err
717 }
718
719 sg.Status = interfaces.SyncGroupStatusPublishRejected
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700720 return setSGDataEntry(ctx, tx, sg.Id, sg)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700721 })
722 return err
723}
724
Raja Daoud0dfdd252015-07-10 20:02:22 -0700725// bootstrapSyncGroup inserts into the transaction log a SyncGroup operation and
726// a set of Snapshot operations to notify the sync watcher about the SyncGroup
727// prefixes to start accepting and the initial state of existing store keys that
728// match these prefixes (both data and permission keys).
729// TODO(rdaoud): this operation scans the managed keys of the database and can
730// be time consuming. Consider doing it asynchronously and letting the server
731// reply to the client earlier. However it must happen within the scope of this
732// transaction (and its snapshot view).
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700733func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700734 if len(prefixes) == 0 {
735 return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
736 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700737
Raja Daoud0dfdd252015-07-10 20:02:22 -0700738 // Get the store options to retrieve the list of managed key prefixes.
739 opts, err := watchable.GetOptions(sd.db.St())
740 if err != nil {
741 return err
742 }
743 if len(opts.ManagedPrefixes) == 0 {
744 return verror.New(verror.ErrInternal, ctx, "store has no managed prefixes")
745 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700746
Raja Daoud0dfdd252015-07-10 20:02:22 -0700747 // Notify the watcher of the SyncGroup prefixes to start accepting.
748 if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil {
749 return err
750 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700751
Raja Daoud0dfdd252015-07-10 20:02:22 -0700752 // Loop over the store managed key prefixes (e.g. data and permissions).
753 // For each one, scan the ranges of the given SyncGroup prefixes. For
754 // each matching key, insert a snapshot operation in the log. Scanning
755 // is done over the version entries to retrieve the matching keys and
756 // their version numbers (the key values). Remove the version prefix
757 // from the key used in the snapshot operation.
758 // TODO(rdaoud): for SyncGroup prefixes, there should be a separation
759 // between their representation at the client (a list of (db, prefix)
760 // tuples) and internally as strings that match the store's key format.
761 for _, mp := range opts.ManagedPrefixes {
762 for _, p := range prefixes {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700763 start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
764 stream := tx.Scan(start, limit)
765 for stream.Advance() {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700766 k, v := stream.Key(nil), stream.Value(nil)
Raja Daoud0dfdd252015-07-10 20:02:22 -0700767 parts := util.SplitKeyParts(string(k))
768 if len(parts) < 2 {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700769 vlog.Fatalf("sync: bootstrapSyncGroup: invalid version key %s", string(k))
Raja Daoud0dfdd252015-07-10 20:02:22 -0700770
771 }
772 key := []byte(util.JoinKeyParts(parts[1:]...))
773 if err := watchable.AddSyncSnapshotOp(ctx, tx, key, v); err != nil {
774 return err
775 }
776
777 }
778 if err := stream.Err(); err != nil {
779 return err
780 }
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700781 }
782 }
783 return nil
784}
785
786func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
Himabindu Puchad964ef02015-06-30 01:10:47 -0700787 // Get this Syncbase's sync module handle.
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700788 ss := sd.sync.(*syncService)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700789
790 for _, mt := range spec.MountTables {
791 name := naming.Join(mt, ss.name)
792 // TODO(hpucha): Is this add idempotent? Appears to be from code.
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700793 // Confirm that it is ok to use absolute names here.
Matt Rosencrantza475b4e2015-09-01 13:53:21 -0700794 if err := call.Server().AddName(name); err != nil {
Himabindu Puchad964ef02015-06-30 01:10:47 -0700795 return err
796 }
797 }
798
799 // TODO(hpucha): Do we have to publish in neighborhood explicitly?
800
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700801 return nil
802}
803
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700804func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
805 c := interfaces.SyncClient(sgName)
806 return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
807
808 // TODO(hpucha): Try to join using an Admin on neighborhood if the publisher is not reachable.
809}
810
811func authorize(ctx *context.T, call security.Call, sg *interfaces.SyncGroup) error {
812 auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms)
813 if err := auth.Authorize(ctx, call); err != nil {
814 return verror.New(verror.ErrNoAccess, ctx, err)
815 }
816 return nil
817}
818
Himabindu Puchafb26a832015-05-20 15:37:50 -0700819////////////////////////////////////////////////////////////
820// Methods for SyncGroup create/join between Syncbases.
821
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700822func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
Himabindu Pucha53497022015-06-19 11:57:13 -0700823 st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700824 if err != nil {
825 return err
826 }
827
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700828 err = store.RunInTransaction(st, func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700829 localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700830
831 if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
832 return err
833 }
834
835 // SG name already claimed.
836 if err == nil && localSG.Id != sg.Id {
837 return verror.New(verror.ErrExist, ctx, sg.Name)
838 }
839
840 // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
841 // metadata if needed.
842 //
843 // TODO(hpucha): Catch up on SG versions so far.
844
845 // SG already published. Update if needed.
846 if err == nil && localSG.Id == sg.Id {
847 if localSG.Status == interfaces.SyncGroupStatusPublishPending {
848 localSG.Status = interfaces.SyncGroupStatusRunning
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700849 return setSGDataEntry(ctx, tx, localSG.Id, localSG)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700850 }
851 return nil
852 }
853
854 // Publish the SyncGroup.
855
856 // TODO(hpucha): Use some ACL check to allow/deny publishing.
857 // TODO(hpucha): Ensure node is on Admin ACL.
858
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700859 // TODO(hpucha): Default priority?
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700860 sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700861 sg.Status = interfaces.SyncGroupStatusRunning
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700862 return addSyncGroup(ctx, tx, &sg)
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700863 })
864
Himabindu Puchab41fc142015-09-10 17:10:57 -0700865 if err != nil {
866 return err
867 }
868 s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
869 return nil
Himabindu Puchafb26a832015-05-20 15:37:50 -0700870}
871
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700872func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
873 var dbSt store.Store
874 var gid interfaces.GroupId
875 var err error
876
877 // Find the database store for this SyncGroup.
878 //
879 // TODO(hpucha): At a high level, we have yet to decide if the SG name
880 // is stand-alone or is derived from the app/db namespace, based on the
881 // feedback from app developers (see discussion in SyncGroup API
882 // doc). If we decide to keep the SG name as stand-alone, this scan can
883 // be optimized by a lazy cache of sgname to <app, db> info.
Raja Daoud28d3b602015-06-17 20:02:20 -0700884 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700885 if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
886 // Found the SyncGroup being looked for.
887 dbSt = st
888 return true
889 }
890 return false
891 })
892
893 // SyncGroup not found.
894 if err != nil {
895 return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
896 }
897
898 var sg *interfaces.SyncGroup
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700899 err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
Himabindu Pucha12e1a122015-06-08 15:59:09 -0700900 var err error
901 sg, err = getSyncGroupById(ctx, tx, gid)
902 if err != nil {
903 return err
904 }
905
906 // Check SG ACL.
907 if err := authorize(ctx, call.Security(), sg); err != nil {
908 return err
909 }
910
911 // Add to joiner list.
912 sg.Joiners[joinerName] = joinerInfo
913 return setSGDataEntry(ctx, tx, sg.Id, sg)
914 })
915
916 if err != nil {
917 return interfaces.SyncGroup{}, err
918 }
919 return *sg, nil
Himabindu Puchafb26a832015-05-20 15:37:50 -0700920}