Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package vsync |
| 6 | |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 7 | // SyncGroup management and storage in Syncbase. Handles the lifecycle |
| 8 | // of SyncGroups (create, join, leave, etc.) and their persistence as |
| 9 | // sync metadata in the application databases. Provides helper functions |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 10 | // to the higher levels of sync (Initiator, Watcher) to get membership |
| 11 | // information and map key/value changes to their matching SyncGroups. |
| 12 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 13 | // TODO(hpucha): Add high level commentary about the logic behind create/join |
| 14 | // etc. |
| 15 | |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 16 | import ( |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 17 | "fmt" |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 18 | "strings" |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 19 | "time" |
| 20 | |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 21 | "v.io/v23/context" |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 22 | "v.io/v23/naming" |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 23 | "v.io/v23/rpc" |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 24 | "v.io/v23/security" |
| 25 | "v.io/v23/security/access" |
Adam Sadovsky | f2efeb5 | 2015-08-31 14:17:49 -0700 | [diff] [blame] | 26 | wire "v.io/v23/services/syncbase/nosql" |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 27 | "v.io/v23/verror" |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 28 | "v.io/v23/vom" |
Raja Daoud | 5d5fae7 | 2015-05-29 18:09:48 -0700 | [diff] [blame] | 29 | "v.io/x/lib/vlog" |
Adam Sadovsky | f2efeb5 | 2015-08-31 14:17:49 -0700 | [diff] [blame] | 30 | "v.io/x/ref/services/syncbase/server/interfaces" |
| 31 | "v.io/x/ref/services/syncbase/server/util" |
| 32 | "v.io/x/ref/services/syncbase/server/watchable" |
| 33 | "v.io/x/ref/services/syncbase/store" |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 34 | ) |
| 35 | |
// memberViewTTL is how long an aggregate view of SyncGroup members stays
// valid before refreshMembersIfExpired rebuilds it.
var memberViewTTL = 2 * time.Second
| 40 | |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 41 | //////////////////////////////////////////////////////////// |
| 42 | // SyncGroup management internal to Syncbase. |
| 43 | |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 44 | // memberView holds an aggregated view of all SyncGroup members across |
| 45 | // databases. The view is not coherent, it gets refreshed according to a |
| 46 | // configured TTL and not (coherently) when SyncGroup membership is updated in |
| 47 | // the various databases. It is needed by the sync Initiator, which must select |
| 48 | // a peer to contact from a global view of all SyncGroup members gathered from |
| 49 | // all databases. This is why a slightly stale view is acceptable. |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 50 | // The members are identified by their Vanadium names (map keys). |
| 51 | type memberView struct { |
| 52 | expiration time.Time |
| 53 | members map[string]*memberInfo |
| 54 | } |
| 55 | |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 56 | // memberInfo holds the member metadata for each SyncGroup this member belongs |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 57 | // to within each App/Database (i.e. global database name). It's a mapping of |
| 58 | // global DB names to sets of SyncGroup member information. It also maintains |
| 59 | // all the mount table candidates that could be used to reach this peer, learned |
| 60 | // from the SyncGroup metadata. |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 61 | type memberInfo struct { |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 62 | db2sg map[string]sgMemberInfo |
| 63 | mtTables map[string]struct{} |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 64 | } |
| 65 | |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 66 | // sgMemberInfo maps SyncGroups to their member metadata. |
| 67 | type sgMemberInfo map[interfaces.GroupId]wire.SyncGroupMemberInfo |
| 68 | |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 69 | // newSyncGroupVersion generates a random SyncGroup version ("etag"). |
| 70 | func newSyncGroupVersion() string { |
Raja Daoud | 65e68b8 | 2015-06-11 16:33:33 -0700 | [diff] [blame] | 71 | return fmt.Sprintf("%x", rand64()) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 72 | } |
| 73 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 74 | // newSyncGroupId generates a random SyncGroup ID. |
| 75 | func newSyncGroupId() interfaces.GroupId { |
Raja Daoud | 65e68b8 | 2015-06-11 16:33:33 -0700 | [diff] [blame] | 76 | id := interfaces.NoGroupId |
| 77 | for id == interfaces.NoGroupId { |
| 78 | id = interfaces.GroupId(rand64()) |
| 79 | } |
| 80 | return id |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 81 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 82 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 83 | // verifySyncGroup verifies if a SyncGroup struct is well-formed. |
| 84 | func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 85 | if sg == nil { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 86 | return verror.New(verror.ErrBadArg, ctx, "group information not specified") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 87 | } |
| 88 | if sg.Name == "" { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 89 | return verror.New(verror.ErrBadArg, ctx, "group name not specified") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 90 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 91 | if sg.AppName == "" { |
| 92 | return verror.New(verror.ErrBadArg, ctx, "app name not specified") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 93 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 94 | if sg.DbName == "" { |
| 95 | return verror.New(verror.ErrBadArg, ctx, "db name not specified") |
| 96 | } |
| 97 | if sg.Creator == "" { |
| 98 | return verror.New(verror.ErrBadArg, ctx, "creator id not specified") |
| 99 | } |
| 100 | if sg.Id == interfaces.NoGroupId { |
| 101 | return verror.New(verror.ErrBadArg, ctx, "group id not specified") |
| 102 | } |
| 103 | if sg.SpecVersion == "" { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 104 | return verror.New(verror.ErrBadArg, ctx, "group version not specified") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 105 | } |
| 106 | if len(sg.Joiners) == 0 { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 107 | return verror.New(verror.ErrBadArg, ctx, "group has no joiners") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 108 | } |
| 109 | if len(sg.Spec.Prefixes) == 0 { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 110 | return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 111 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 112 | return nil |
| 113 | } |
| 114 | |
| 115 | // addSyncGroup adds a new SyncGroup given its information. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 116 | func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 117 | // Verify SyncGroup before storing it since it may have been received |
| 118 | // from a remote peer. |
| 119 | if err := verifySyncGroup(ctx, sg); err != nil { |
| 120 | return err |
| 121 | } |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 122 | |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 123 | if ok, err := hasSGDataEntry(tx, sg.Id); err != nil { |
| 124 | return err |
| 125 | } else if ok { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 126 | return verror.New(verror.ErrExist, ctx, "group id already exists") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 127 | } |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 128 | if ok, err := hasSGNameEntry(tx, sg.Name); err != nil { |
| 129 | return err |
| 130 | } else if ok { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 131 | return verror.New(verror.ErrExist, ctx, "group name already exists") |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 132 | } |
| 133 | |
| 134 | // Add the group name and data entries. |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 135 | if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 136 | return err |
| 137 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 138 | if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 139 | return err |
| 140 | } |
| 141 | |
| 142 | return nil |
| 143 | } |
| 144 | |
| 145 | // getSyncGroupId retrieves the SyncGroup ID given its name. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 146 | func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 147 | return getSGNameEntry(ctx, st, name) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 148 | } |
| 149 | |
| 150 | // getSyncGroupName retrieves the SyncGroup name given its ID. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 151 | func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 152 | sg, err := getSyncGroupById(ctx, st, gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 153 | if err != nil { |
| 154 | return "", err |
| 155 | } |
| 156 | return sg.Name, nil |
| 157 | } |
| 158 | |
| 159 | // getSyncGroupById retrieves the SyncGroup given its ID. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 160 | func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 161 | return getSGDataEntry(ctx, st, gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 162 | } |
| 163 | |
| 164 | // getSyncGroupByName retrieves the SyncGroup given its name. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 165 | func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*interfaces.SyncGroup, error) { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 166 | gid, err := getSyncGroupId(ctx, st, name) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 167 | if err != nil { |
| 168 | return nil, err |
| 169 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 170 | return getSyncGroupById(ctx, st, gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 171 | } |
| 172 | |
| 173 | // delSyncGroupById deletes the SyncGroup given its ID. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 174 | func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 175 | sg, err := getSyncGroupById(ctx, tx, gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 176 | if err != nil { |
| 177 | return err |
| 178 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 179 | if err = delSGNameEntry(ctx, tx, sg.Name); err != nil { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 180 | return err |
| 181 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 182 | return delSGDataEntry(ctx, tx, sg.Id) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 183 | } |
| 184 | |
| 185 | // delSyncGroupByName deletes the SyncGroup given its name. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 186 | func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 187 | gid, err := getSyncGroupId(ctx, tx, name) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 188 | if err != nil { |
| 189 | return err |
| 190 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 191 | return delSyncGroupById(ctx, tx, gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 192 | } |
| 193 | |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 194 | // refreshMembersIfExpired updates the aggregate view of SyncGroup members |
| 195 | // across databases if the view has expired. |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 196 | // TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally |
| 197 | // update the membership view for them instead of always scanning all of them. |
| 198 | func (s *syncService) refreshMembersIfExpired(ctx *context.T) { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 199 | view := s.allMembers |
| 200 | if view == nil { |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 201 | // The empty expiration time in Go is before "now" and treated as expired |
| 202 | // below. |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 203 | view = &memberView{expiration: time.Time{}, members: nil} |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 204 | s.allMembers = view |
| 205 | } |
| 206 | |
| 207 | if time.Now().Before(view.expiration) { |
| 208 | return |
| 209 | } |
| 210 | |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 211 | // Create a new aggregate view of SyncGroup members across all app databases. |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 212 | newMembers := make(map[string]*memberInfo) |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 213 | |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 214 | s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool { |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 215 | // For each database, fetch its SyncGroup data entries by scanning their |
| 216 | // prefix range. Use a database snapshot for the scan. |
Raja Daoud | 5d5fae7 | 2015-05-29 18:09:48 -0700 | [diff] [blame] | 217 | sn := st.NewSnapshot() |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 218 | defer sn.Abort() |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 219 | name := appDbName(appName, dbName) |
Raja Daoud | 5d5fae7 | 2015-05-29 18:09:48 -0700 | [diff] [blame] | 220 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 221 | forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool { |
Raja Daoud | 5d5fae7 | 2015-05-29 18:09:48 -0700 | [diff] [blame] | 222 | // Add all members of this SyncGroup to the membership view. |
| 223 | // A member's info is different across SyncGroups, so gather all of them. |
| 224 | for member, info := range sg.Joiners { |
| 225 | if _, ok := newMembers[member]; !ok { |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 226 | newMembers[member] = &memberInfo{ |
| 227 | db2sg: make(map[string]sgMemberInfo), |
| 228 | mtTables: make(map[string]struct{}), |
| 229 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 230 | } |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 231 | if _, ok := newMembers[member].db2sg[name]; !ok { |
| 232 | newMembers[member].db2sg[name] = make(sgMemberInfo) |
| 233 | } |
| 234 | newMembers[member].db2sg[name][sg.Id] = info |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 235 | |
| 236 | // Collect mount tables. |
| 237 | for _, mt := range sg.Spec.MountTables { |
| 238 | newMembers[member].mtTables[mt] = struct{}{} |
| 239 | } |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 240 | } |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 241 | return false |
| 242 | }) |
Raja Daoud | 5d5fae7 | 2015-05-29 18:09:48 -0700 | [diff] [blame] | 243 | return false |
| 244 | }) |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 245 | |
| 246 | view.members = newMembers |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 247 | view.expiration = time.Now().Add(memberViewTTL) |
| 248 | } |
| 249 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 250 | // forEachSyncGroup iterates over all SyncGroups in the Database and invokes |
| 251 | // the callback function on each one. The callback returns a "done" flag to |
| 252 | // make forEachSyncGroup() stop the iteration earlier; otherwise the function |
| 253 | // loops across all SyncGroups in the Database. |
| 254 | func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) { |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 255 | scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "") |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 256 | stream := st.Scan(scanStart, scanLimit) |
| 257 | for stream.Advance() { |
| 258 | var sg interfaces.SyncGroup |
| 259 | if vom.Decode(stream.Value(nil), &sg) != nil { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 260 | vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil))) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 261 | continue |
| 262 | } |
| 263 | |
| 264 | if callback(&sg) { |
| 265 | break // done, early exit |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | if err := stream.Err(); err != nil { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 270 | vlog.Errorf("sync: forEachSyncGroup: scan stream error: %v", err) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 271 | } |
| 272 | } |
| 273 | |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 274 | // getMembers returns all SyncGroup members and the count of SyncGroups each one |
| 275 | // joined. |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 276 | func (s *syncService) getMembers(ctx *context.T) map[string]uint32 { |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 277 | s.allMembersLock.Lock() |
| 278 | defer s.allMembersLock.Unlock() |
| 279 | |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 280 | s.refreshMembersIfExpired(ctx) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 281 | |
| 282 | members := make(map[string]uint32) |
| 283 | for member, info := range s.allMembers.members { |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 284 | count := 0 |
| 285 | for _, sgmi := range info.db2sg { |
| 286 | count += len(sgmi) |
| 287 | } |
| 288 | members[member] = uint32(count) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 289 | } |
| 290 | |
| 291 | return members |
| 292 | } |
| 293 | |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 294 | // copyMemberInfo returns a copy of the info for the requested peer. |
| 295 | func (s *syncService) copyMemberInfo(ctx *context.T, member string) *memberInfo { |
| 296 | s.allMembersLock.RLock() |
| 297 | defer s.allMembersLock.RUnlock() |
| 298 | |
| 299 | info, ok := s.allMembers.members[member] |
| 300 | if !ok { |
| 301 | return nil |
| 302 | } |
| 303 | |
| 304 | // Make a copy. |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 305 | infoCopy := &memberInfo{ |
| 306 | db2sg: make(map[string]sgMemberInfo), |
| 307 | mtTables: make(map[string]struct{}), |
| 308 | } |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 309 | for gdbName, sgInfo := range info.db2sg { |
| 310 | infoCopy.db2sg[gdbName] = make(sgMemberInfo) |
| 311 | for gid, mi := range sgInfo { |
| 312 | infoCopy.db2sg[gdbName][gid] = mi |
| 313 | } |
| 314 | } |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 315 | for mt := range info.mtTables { |
| 316 | infoCopy.mtTables[mt] = struct{}{} |
| 317 | } |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 318 | |
| 319 | return infoCopy |
| 320 | } |
| 321 | |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 322 | // Low-level utility functions to access DB entries without tracking their |
| 323 | // relationships. |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 324 | // Use the functions above to manipulate SyncGroups. |
| 325 | |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 326 | var ( |
| 327 | // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries. |
| 328 | sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d") |
| 329 | |
| 330 | // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries. |
| 331 | sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n") |
| 332 | ) |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 333 | |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 334 | // sgDataKey returns the key used to access the SyncGroup data entry. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 335 | func sgDataKey(gid interfaces.GroupId) string { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 336 | return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid)) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 337 | } |
| 338 | |
| 339 | // sgNameKey returns the key used to access the SyncGroup name entry. |
| 340 | func sgNameKey(name string) string { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 341 | return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 342 | } |
| 343 | |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 344 | // splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name. |
| 345 | func splitSgNameKey(ctx *context.T, key string) (string, error) { |
| 346 | prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "") |
| 347 | |
| 348 | // Note that the actual SyncGroup name may contain ":" as a separator. |
| 349 | if !strings.HasPrefix(key, prefix) { |
| 350 | return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key) |
| 351 | } |
| 352 | return strings.TrimPrefix(key, prefix), nil |
| 353 | } |
| 354 | |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 355 | // hasSGDataEntry returns true if the SyncGroup data entry exists. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 356 | func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 357 | // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 358 | var sg interfaces.SyncGroup |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 359 | if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 360 | if verror.ErrorID(err) == verror.ErrNoExist.ID { |
| 361 | err = nil |
| 362 | } |
| 363 | return false, err |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 364 | } |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 365 | return true, nil |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 366 | } |
| 367 | |
| 368 | // hasSGNameEntry returns true if the SyncGroup name entry exists. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 369 | func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) { |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 370 | // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 371 | var gid interfaces.GroupId |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 372 | if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 373 | if verror.ErrorID(err) == verror.ErrNoExist.ID { |
| 374 | err = nil |
| 375 | } |
| 376 | return false, err |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 377 | } |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 378 | return true, nil |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 379 | } |
| 380 | |
| 381 | // setSGDataEntry stores the SyncGroup data entry. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 382 | func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error { |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 383 | return util.Put(ctx, tx, sgDataKey(gid), sg) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 384 | } |
| 385 | |
| 386 | // setSGNameEntry stores the SyncGroup name entry. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 387 | func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error { |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 388 | return util.Put(ctx, tx, sgNameKey(name), gid) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 389 | } |
| 390 | |
| 391 | // getSGDataEntry retrieves the SyncGroup data for a given group ID. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 392 | func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) { |
| 393 | var sg interfaces.SyncGroup |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 394 | if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil { |
| 395 | return nil, err |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 396 | } |
| 397 | return &sg, nil |
| 398 | } |
| 399 | |
| 400 | // getSGNameEntry retrieves the SyncGroup name to ID mapping. |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 401 | func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) { |
| 402 | var gid interfaces.GroupId |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 403 | if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil { |
| 404 | return gid, err |
Raja Daoud | a9cfe5e | 2015-05-28 14:38:34 -0700 | [diff] [blame] | 405 | } |
| 406 | return gid, nil |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 407 | } |
| 408 | |
| 409 | // delSGDataEntry deletes the SyncGroup data entry. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 410 | func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error { |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 411 | return util.Delete(ctx, tx, sgDataKey(gid)) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 412 | } |
| 413 | |
| 414 | // delSGNameEntry deletes the SyncGroup name to ID mapping. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 415 | func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error { |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 416 | return util.Delete(ctx, tx, sgNameKey(name)) |
Raja Daoud | 20aed3f | 2015-05-26 12:41:10 -0700 | [diff] [blame] | 417 | } |
| 418 | |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 419 | //////////////////////////////////////////////////////////// |
| 420 | // SyncGroup methods between Client and Syncbase. |
| 421 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 422 | // TODO(hpucha): Pass blessings along. |
| 423 | func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 424 | vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName) |
| 425 | defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName) |
| 426 | |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 427 | // Get this Syncbase's sync module handle. |
| 428 | ss := sd.sync.(*syncService) |
| 429 | var sg *interfaces.SyncGroup |
| 430 | |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 431 | err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 432 | // Check permissions on Database. |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 433 | if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 434 | return err |
| 435 | } |
| 436 | |
| 437 | // TODO(hpucha): Check prefix ACLs on all SG prefixes. |
| 438 | // This may need another method on util.Database interface. |
| 439 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 440 | // TODO(hpucha): Do some SG ACL checking. Check creator |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 441 | // has Admin privilege. |
| 442 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 443 | // Instantiate sg. Add self as joiner. |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 444 | sg = &interfaces.SyncGroup{ |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 445 | Id: newSyncGroupId(), |
| 446 | Name: sgName, |
| 447 | SpecVersion: newSyncGroupVersion(), |
| 448 | Spec: spec, |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 449 | Creator: ss.name, |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 450 | AppName: sd.db.App().Name(), |
| 451 | DbName: sd.db.Name(), |
| 452 | Status: interfaces.SyncGroupStatusPublishPending, |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 453 | Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo}, |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 454 | } |
| 455 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 456 | if err := addSyncGroup(ctx, tx, sg); err != nil { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 457 | return err |
| 458 | } |
| 459 | |
| 460 | // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata. |
| 461 | |
| 462 | // Take a snapshot of the data to bootstrap the SyncGroup. |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 463 | return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 464 | }) |
| 465 | |
| 466 | if err != nil { |
| 467 | return err |
| 468 | } |
| 469 | |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 470 | ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 471 | // Local SG create succeeded. Publish the SG at the chosen server. |
| 472 | sd.publishSyncGroup(ctx, call, sgName) |
| 473 | |
| 474 | // Publish at the chosen mount table and in the neighborhood. |
| 475 | sd.publishInMountTables(ctx, call, spec) |
| 476 | |
| 477 | return nil |
| 478 | } |
| 479 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 480 | // TODO(hpucha): Pass blessings along. |
| 481 | func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 482 | vlog.VI(2).Infof("sync: JoinSyncGroup: begin: %s", sgName) |
| 483 | defer vlog.VI(2).Infof("sync: JoinSyncGroup: end: %s", sgName) |
| 484 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 485 | var sgErr error |
| 486 | var sg *interfaces.SyncGroup |
| 487 | nullSpec := wire.SyncGroupSpec{} |
| 488 | |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 489 | err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error { |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 490 | // Check permissions on Database. |
| 491 | if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil { |
| 492 | return err |
| 493 | } |
| 494 | |
| 495 | // Check if SyncGroup already exists. |
| 496 | sg, sgErr = getSyncGroupByName(ctx, tx, sgName) |
| 497 | if sgErr != nil { |
| 498 | return sgErr |
| 499 | } |
| 500 | |
| 501 | // SyncGroup already exists. Possibilities include created |
| 502 | // locally, already joined locally or published at the device as |
| 503 | // a result of SyncGroup creation on a different device. |
| 504 | // |
| 505 | // TODO(hpucha): Handle the above cases. If the SG was published |
| 506 | // locally, but not joined, we need to bootstrap the DAG and |
| 507 | // watcher. If multiple joins are done locally, we may want to |
| 508 | // ref count the SG state and track the leaves accordingly. So |
| 509 | // we may need to add some local state for each SyncGroup. |
| 510 | |
| 511 | // Check SG ACL. |
| 512 | return authorize(ctx, call.Security(), sg) |
| 513 | }) |
| 514 | |
| 515 | // The presented blessing is allowed to make this Syncbase instance join |
| 516 | // the specified SyncGroup, but this Syncbase instance has in fact |
| 517 | // already joined the SyncGroup. Join is idempotent, so we simply return |
| 518 | // the spec to indicate success. |
| 519 | if err == nil { |
| 520 | return sg.Spec, nil |
| 521 | } |
| 522 | |
| 523 | // Join is not allowed (possibilities include Database permissions check |
| 524 | // failed, SG ACL check failed or error during fetching SG information). |
| 525 | if verror.ErrorID(sgErr) != verror.ErrNoExist.ID { |
| 526 | return nullSpec, err |
| 527 | } |
| 528 | |
| 529 | // Brand new join. |
| 530 | |
| 531 | // Get this Syncbase's sync module handle. |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 532 | ss := sd.sync.(*syncService) |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 533 | |
| 534 | // Contact a SyncGroup Admin to join the SyncGroup. |
Sergey Rogulenko | c2e60f6 | 2015-06-23 14:50:58 -0700 | [diff] [blame] | 535 | sg = &interfaces.SyncGroup{} |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 536 | *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo) |
| 537 | if err != nil { |
| 538 | return nullSpec, err |
| 539 | } |
| 540 | |
| 541 | // Verify that the app/db combination is valid for this SyncGroup. |
| 542 | if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() { |
| 543 | return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup") |
| 544 | } |
| 545 | |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 546 | err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error { |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 547 | |
| 548 | // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata. |
| 549 | |
| 550 | // TODO(hpucha): Get SG Deltas from Admin device. |
| 551 | |
| 552 | if err := addSyncGroup(ctx, tx, sg); err != nil { |
| 553 | return err |
| 554 | } |
| 555 | |
| 556 | // Take a snapshot of the data to bootstrap the SyncGroup. |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 557 | return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes) |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 558 | }) |
| 559 | |
| 560 | if err != nil { |
| 561 | return nullSpec, err |
| 562 | } |
| 563 | |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 564 | ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id) |
| 565 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 566 | // Publish at the chosen mount table and in the neighborhood. |
| 567 | sd.publishInMountTables(ctx, call, sg.Spec) |
| 568 | |
| 569 | return sg.Spec, nil |
| 570 | } |
| 571 | |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 572 | func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) { |
| 573 | var sgNames []string |
| 574 | |
| 575 | vlog.VI(2).Infof("sync: GetSyncGroupNames: begin") |
| 576 | defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames) |
| 577 | |
| 578 | sn := sd.db.St().NewSnapshot() |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 579 | defer sn.Abort() |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 580 | |
| 581 | // Check permissions on Database. |
| 582 | if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil { |
| 583 | return nil, err |
| 584 | } |
| 585 | |
| 586 | // Scan all the SyncGroup names found in the Database. |
| 587 | scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "") |
| 588 | stream := sn.Scan(scanStart, scanLimit) |
| 589 | var key []byte |
| 590 | for stream.Advance() { |
| 591 | sgName, err := splitSgNameKey(ctx, string(stream.Key(key))) |
| 592 | if err != nil { |
| 593 | return nil, err |
| 594 | } |
| 595 | sgNames = append(sgNames, sgName) |
| 596 | } |
| 597 | |
| 598 | if err := stream.Err(); err != nil { |
| 599 | return nil, err |
| 600 | } |
| 601 | |
| 602 | return sgNames, nil |
| 603 | } |
| 604 | |
| 605 | func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) { |
| 606 | var spec wire.SyncGroupSpec |
| 607 | |
| 608 | vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName) |
| 609 | defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec) |
| 610 | |
| 611 | sn := sd.db.St().NewSnapshot() |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 612 | defer sn.Abort() |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 613 | |
| 614 | // Check permissions on Database. |
| 615 | if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil { |
| 616 | return spec, "", err |
| 617 | } |
| 618 | |
| 619 | // Get the SyncGroup information. |
| 620 | sg, err := getSyncGroupByName(ctx, sn, sgName) |
| 621 | if err != nil { |
| 622 | return spec, "", err |
| 623 | } |
| 624 | // TODO(hpucha): Check SyncGroup ACL. |
| 625 | |
| 626 | spec = sg.Spec |
| 627 | return spec, sg.SpecVersion, nil |
| 628 | } |
| 629 | |
| 630 | func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) { |
| 631 | var members map[string]wire.SyncGroupMemberInfo |
| 632 | |
| 633 | vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName) |
| 634 | defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members) |
| 635 | |
| 636 | sn := sd.db.St().NewSnapshot() |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 637 | defer sn.Abort() |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 638 | |
| 639 | // Check permissions on Database. |
| 640 | if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil { |
| 641 | return members, err |
| 642 | } |
| 643 | |
| 644 | // Get the SyncGroup information. |
| 645 | sg, err := getSyncGroupByName(ctx, sn, sgName) |
| 646 | if err != nil { |
| 647 | return members, err |
| 648 | } |
| 649 | |
| 650 | // TODO(hpucha): Check SyncGroup ACL. |
| 651 | |
| 652 | members = sg.Joiners |
| 653 | return members, nil |
| 654 | } |
| 655 | |
| 656 | // TODO(hpucha): Enable syncing syncgroup metadata. |
| 657 | func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error { |
| 658 | vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version) |
| 659 | defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName) |
| 660 | |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 661 | err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error { |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 662 | // Check permissions on Database. |
| 663 | if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil { |
| 664 | return err |
| 665 | } |
| 666 | |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 667 | sg, err := getSyncGroupByName(ctx, tx, sgName) |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 668 | if err != nil { |
| 669 | return err |
| 670 | } |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 671 | |
| 672 | // TODO(hpucha): Check SyncGroup ACL. Perform version checking. |
| 673 | |
| 674 | sg.Spec = spec |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 675 | return setSGDataEntry(ctx, tx, sg.Id, sg) |
Himabindu Pucha | 25a1d02 | 2015-07-21 20:46:33 -0700 | [diff] [blame] | 676 | }) |
| 677 | return err |
| 678 | } |
| 679 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 680 | ////////////////////////////// |
| 681 | // Helper functions |
| 682 | |
| 683 | // TODO(hpucha): Call this periodically until we are able to contact the remote peer. |
| 684 | func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error { |
| 685 | sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName) |
| 686 | if err != nil { |
| 687 | return err |
| 688 | } |
| 689 | |
| 690 | if sg.Status != interfaces.SyncGroupStatusPublishPending { |
| 691 | return nil |
| 692 | } |
| 693 | |
| 694 | c := interfaces.SyncClient(sgName) |
| 695 | err = c.PublishSyncGroup(ctx, *sg) |
| 696 | |
| 697 | // Publish failed temporarily. Retry later. |
| 698 | // TODO(hpucha): Is there an RPC error that we can check here? |
| 699 | if err != nil && verror.ErrorID(err) != verror.ErrExist.ID { |
| 700 | return err |
| 701 | } |
| 702 | |
| 703 | // Publish succeeded. |
| 704 | if err == nil { |
| 705 | // TODO(hpucha): Get SG Deltas from publisher. Obtaining the |
| 706 | // new version from the publisher prevents SG conflicts. |
| 707 | return err |
| 708 | } |
| 709 | |
| 710 | // Publish rejected. Persist that to avoid retrying in the |
| 711 | // future and to remember the split universe scenario. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 712 | err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error { |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 713 | // Ensure SG still exists. |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 714 | sg, err := getSyncGroupByName(ctx, tx, sgName) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 715 | if err != nil { |
| 716 | return err |
| 717 | } |
| 718 | |
| 719 | sg.Status = interfaces.SyncGroupStatusPublishRejected |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 720 | return setSGDataEntry(ctx, tx, sg.Id, sg) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 721 | }) |
| 722 | return err |
| 723 | } |
| 724 | |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 725 | // bootstrapSyncGroup inserts into the transaction log a SyncGroup operation and |
| 726 | // a set of Snapshot operations to notify the sync watcher about the SyncGroup |
| 727 | // prefixes to start accepting and the initial state of existing store keys that |
| 728 | // match these prefixes (both data and permission keys). |
| 729 | // TODO(rdaoud): this operation scans the managed keys of the database and can |
| 730 | // be time consuming. Consider doing it asynchronously and letting the server |
| 731 | // reply to the client earlier. However it must happen within the scope of this |
| 732 | // transaction (and its snapshot view). |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 733 | func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error { |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 734 | if len(prefixes) == 0 { |
| 735 | return verror.New(verror.ErrInternal, ctx, "no prefixes specified") |
| 736 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 737 | |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 738 | // Get the store options to retrieve the list of managed key prefixes. |
| 739 | opts, err := watchable.GetOptions(sd.db.St()) |
| 740 | if err != nil { |
| 741 | return err |
| 742 | } |
| 743 | if len(opts.ManagedPrefixes) == 0 { |
| 744 | return verror.New(verror.ErrInternal, ctx, "store has no managed prefixes") |
| 745 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 746 | |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 747 | // Notify the watcher of the SyncGroup prefixes to start accepting. |
| 748 | if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil { |
| 749 | return err |
| 750 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 751 | |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 752 | // Loop over the store managed key prefixes (e.g. data and permissions). |
| 753 | // For each one, scan the ranges of the given SyncGroup prefixes. For |
| 754 | // each matching key, insert a snapshot operation in the log. Scanning |
| 755 | // is done over the version entries to retrieve the matching keys and |
| 756 | // their version numbers (the key values). Remove the version prefix |
| 757 | // from the key used in the snapshot operation. |
| 758 | // TODO(rdaoud): for SyncGroup prefixes, there should be a separation |
| 759 | // between their representation at the client (a list of (db, prefix) |
| 760 | // tuples) and internally as strings that match the store's key format. |
| 761 | for _, mp := range opts.ManagedPrefixes { |
| 762 | for _, p := range prefixes { |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 763 | start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p) |
| 764 | stream := tx.Scan(start, limit) |
| 765 | for stream.Advance() { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 766 | k, v := stream.Key(nil), stream.Value(nil) |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 767 | parts := util.SplitKeyParts(string(k)) |
| 768 | if len(parts) < 2 { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 769 | vlog.Fatalf("sync: bootstrapSyncGroup: invalid version key %s", string(k)) |
Raja Daoud | 0dfdd25 | 2015-07-10 20:02:22 -0700 | [diff] [blame] | 770 | |
| 771 | } |
| 772 | key := []byte(util.JoinKeyParts(parts[1:]...)) |
| 773 | if err := watchable.AddSyncSnapshotOp(ctx, tx, key, v); err != nil { |
| 774 | return err |
| 775 | } |
| 776 | |
| 777 | } |
| 778 | if err := stream.Err(); err != nil { |
| 779 | return err |
| 780 | } |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 781 | } |
| 782 | } |
| 783 | return nil |
| 784 | } |
| 785 | |
| 786 | func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error { |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 787 | // Get this Syncbase's sync module handle. |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 788 | ss := sd.sync.(*syncService) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 789 | |
| 790 | for _, mt := range spec.MountTables { |
| 791 | name := naming.Join(mt, ss.name) |
| 792 | // TODO(hpucha): Is this add idempotent? Appears to be from code. |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 793 | // Confirm that it is ok to use absolute names here. |
Matt Rosencrantz | a475b4e | 2015-09-01 13:53:21 -0700 | [diff] [blame] | 794 | if err := call.Server().AddName(name); err != nil { |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 795 | return err |
| 796 | } |
| 797 | } |
| 798 | |
| 799 | // TODO(hpucha): Do we have to publish in neighborhood explicitly? |
| 800 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 801 | return nil |
| 802 | } |
| 803 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 804 | func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) { |
| 805 | c := interfaces.SyncClient(sgName) |
| 806 | return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo) |
| 807 | |
| 808 | // TODO(hpucha): Try to join using an Admin on neighborhood if the publisher is not reachable. |
| 809 | } |
| 810 | |
| 811 | func authorize(ctx *context.T, call security.Call, sg *interfaces.SyncGroup) error { |
| 812 | auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms) |
| 813 | if err := auth.Authorize(ctx, call); err != nil { |
| 814 | return verror.New(verror.ErrNoAccess, ctx, err) |
| 815 | } |
| 816 | return nil |
| 817 | } |
| 818 | |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 819 | //////////////////////////////////////////////////////////// |
| 820 | // Methods for SyncGroup create/join between Syncbases. |
| 821 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 822 | func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error { |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 823 | st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 824 | if err != nil { |
| 825 | return err |
| 826 | } |
| 827 | |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 828 | err = store.RunInTransaction(st, func(tx store.Transaction) error { |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 829 | localSG, err := getSyncGroupByName(ctx, tx, sg.Name) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 830 | |
| 831 | if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID { |
| 832 | return err |
| 833 | } |
| 834 | |
| 835 | // SG name already claimed. |
| 836 | if err == nil && localSG.Id != sg.Id { |
| 837 | return verror.New(verror.ErrExist, ctx, sg.Name) |
| 838 | } |
| 839 | |
| 840 | // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG |
| 841 | // metadata if needed. |
| 842 | // |
| 843 | // TODO(hpucha): Catch up on SG versions so far. |
| 844 | |
| 845 | // SG already published. Update if needed. |
| 846 | if err == nil && localSG.Id == sg.Id { |
| 847 | if localSG.Status == interfaces.SyncGroupStatusPublishPending { |
| 848 | localSG.Status = interfaces.SyncGroupStatusRunning |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 849 | return setSGDataEntry(ctx, tx, localSG.Id, localSG) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 850 | } |
| 851 | return nil |
| 852 | } |
| 853 | |
| 854 | // Publish the SyncGroup. |
| 855 | |
| 856 | // TODO(hpucha): Use some ACL check to allow/deny publishing. |
| 857 | // TODO(hpucha): Ensure node is on Admin ACL. |
| 858 | |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 859 | // TODO(hpucha): Default priority? |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 860 | sg.Joiners[s.name] = wire.SyncGroupMemberInfo{} |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 861 | sg.Status = interfaces.SyncGroupStatusRunning |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 862 | return addSyncGroup(ctx, tx, &sg) |
Himabindu Pucha | f9ec56f | 2015-06-02 11:34:05 -0700 | [diff] [blame] | 863 | }) |
| 864 | |
Himabindu Pucha | b41fc14 | 2015-09-10 17:10:57 -0700 | [diff] [blame] | 865 | if err != nil { |
| 866 | return err |
| 867 | } |
| 868 | s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id) |
| 869 | return nil |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 870 | } |
| 871 | |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 872 | func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) { |
| 873 | var dbSt store.Store |
| 874 | var gid interfaces.GroupId |
| 875 | var err error |
| 876 | |
| 877 | // Find the database store for this SyncGroup. |
| 878 | // |
| 879 | // TODO(hpucha): At a high level, we have yet to decide if the SG name |
| 880 | // is stand-alone or is derived from the app/db namespace, based on the |
| 881 | // feedback from app developers (see discussion in SyncGroup API |
| 882 | // doc). If we decide to keep the SG name as stand-alone, this scan can |
| 883 | // be optimized by a lazy cache of sgname to <app, db> info. |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 884 | s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool { |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 885 | if gid, err = getSyncGroupId(ctx, st, sgName); err == nil { |
| 886 | // Found the SyncGroup being looked for. |
| 887 | dbSt = st |
| 888 | return true |
| 889 | } |
| 890 | return false |
| 891 | }) |
| 892 | |
| 893 | // SyncGroup not found. |
| 894 | if err != nil { |
| 895 | return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName) |
| 896 | } |
| 897 | |
| 898 | var sg *interfaces.SyncGroup |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 899 | err = store.RunInTransaction(dbSt, func(tx store.Transaction) error { |
Himabindu Pucha | 12e1a12 | 2015-06-08 15:59:09 -0700 | [diff] [blame] | 900 | var err error |
| 901 | sg, err = getSyncGroupById(ctx, tx, gid) |
| 902 | if err != nil { |
| 903 | return err |
| 904 | } |
| 905 | |
| 906 | // Check SG ACL. |
| 907 | if err := authorize(ctx, call.Security(), sg); err != nil { |
| 908 | return err |
| 909 | } |
| 910 | |
| 911 | // Add to joiner list. |
| 912 | sg.Joiners[joinerName] = joinerInfo |
| 913 | return setSGDataEntry(ctx, tx, sg.Id, sg) |
| 914 | }) |
| 915 | |
| 916 | if err != nil { |
| 917 | return interfaces.SyncGroup{}, err |
| 918 | } |
| 919 | return *sg, nil |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 920 | } |