blob: de5824bb26b70920ec2659e27899b2175c236c3d [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001// Package server implements a storage service.
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07002
Jiri Simsa5293dcb2014-05-10 09:56:38 -07003package server
4
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07005// This file defines Server, which implements the server-side Store API from
6// veyron2/services/store/service.vdl.
7
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008import (
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07009 "fmt"
10 "math/rand"
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070011 "reflect"
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070012 "strconv"
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -070013 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "sync"
15 "time"
16
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017 "veyron/services/store/memstore"
18 memwatch "veyron/services/store/memstore/watch"
Tilak Sharma2af70022014-05-14 11:29:27 -070019 "veyron/services/store/raw"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020
Jiri Simsa5293dcb2014-05-10 09:56:38 -070021 "veyron2/ipc"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070022 "veyron2/security"
23 "veyron2/services/store"
Benjamin Prosnitz552aa072014-06-09 13:13:25 -070024 "veyron2/verror"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070025)
26
// transactionMaxLifetime is how long a transaction may live after it has been
// created. createTransaction stamps each transaction with an expiry this far
// in the future, and gcLoop aborts and drops transactions past their expiry.
//
// TODO(jyh): This should probably be a configuration parameter.
const transactionMaxLifetime = 30 * time.Second
34
35var (
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070036 errNestedTransaction = verror.BadArgf("cannot create a nested Transaction")
37 // Note, this can happen e.g. due to expiration.
38 errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070039 // Transaction exists, but may not be used by the caller.
Tilak Sharmae189e6a2014-07-14 11:45:22 -070040 errPermissionDenied = verror.NotAuthorizedf("permission denied")
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070041
42 rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
43
44 nullTransactionID transactionID
Jiri Simsa5293dcb2014-05-10 09:56:38 -070045)
46
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070047// Server implements Store and uses memstore internally.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048type Server struct {
49 mutex sync.RWMutex
50
51 // store is the actual store implementation.
Ken Ashcraft18279f52014-07-31 10:19:08 -070052 store *memstore.Store
Jiri Simsa5293dcb2014-05-10 09:56:38 -070053
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070054 // transactions is the set of active transactions.
55 transactions map[transactionID]*transaction
Jiri Simsa5293dcb2014-05-10 09:56:38 -070056
57 // Transaction garbage collection.
58 pending sync.WaitGroup
59 ticker *time.Ticker
60 closed chan struct{}
61
62 // watcher is the actual store watcher implementation.
Ken Ashcraftb7e15ad2014-08-04 14:38:37 -070063 watcher *memwatch.Watcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -070064}
65
// transactionID is the server-chosen identifier for a transaction. The zero
// value (nullTransactionID) means "no transaction".
//
// TODO(jyh): Consider using a larger identifier space to reduce chance of
// collisions. (Note, createTransaction handles collisions when generating
// transactionIDs.)
type transactionID uint64
72
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070073// transactionContext defines the context in which a transaction is used. A
74// transaction may be used only in the context that created it.
75// transactionContext weakly identifies a session by the local and remote
76// principals involved in the RPC.
77// TODO(tilaks): Use the local and remote addresses to identify the session.
78// Does a session with a mobile device break if the remote address changes?
79type transactionContext interface {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070080 // LocalID returns the PublicID of the principal at the local end of the
81 // request.
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070082 LocalID() security.PublicID
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070083 // RemoteID returns the PublicID of the principal at the remote end of the
84 // request.
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070085 RemoteID() security.PublicID
86}
87
Jiri Simsa5293dcb2014-05-10 09:56:38 -070088type transaction struct {
Ken Ashcraftb7e15ad2014-08-04 14:38:37 -070089 trans *memstore.Transaction
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070090 expires time.Time
91 creatorCtx transactionContext
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092}
93
94// ServerConfig provides the parameters needed to construct a Server.
95type ServerConfig struct {
96 Admin security.PublicID // Administrator.
97 DBName string // DBName is the name if the database directory.
98}
99
100// New creates a new server.
101func New(config ServerConfig) (*Server, error) {
102 mstore, err := memstore.New(config.Admin, config.DBName)
103 if err != nil {
104 return nil, err
105 }
106 mwatcher, err := memwatch.New(config.Admin, config.DBName)
107 if err != nil {
108 return nil, err
109 }
110 s := &Server{
111 store: mstore,
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700112 transactions: make(map[transactionID]*transaction),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700113 ticker: time.NewTicker(time.Second),
114 closed: make(chan struct{}),
115 watcher: mwatcher,
116 }
117 s.pending.Add(1)
118 go s.gcLoop()
119 return s, nil
120}
121
122func (s *Server) Close() {
123 close(s.closed)
124 s.ticker.Stop()
125 s.pending.Wait()
126 s.store.Close()
127 s.watcher.Close()
128}
129
130func (s *Server) String() string {
131 return "StoreServer"
132}
133
134// Attributes returns the server status.
135func (s *Server) Attributes(arg string) map[string]string {
136 return map[string]string{
137 "health": "ok",
138 "servertype": s.String(),
139 }
140}
141
// findTransactionComponent locates the "$tid.*" component of an object name.
//
// It returns the half-open byte range [begin, end) of that component within
// oname, or (-1, -1) when oname has no transaction component. When the
// component is not at the very start of oname, the range includes the "/"
// that precedes it; the "/" that follows it (if any) is never included.
func findTransactionComponent(oname string) (int, int) {
	start := 0
	if !strings.HasPrefix(oname, "$tid") {
		start = strings.Index(oname, "/$tid")
		if start < 0 {
			return -1, -1
		}
	}
	// Search for the terminating "/" after the component's first byte, so
	// that a leading "/" belonging to the component itself is not matched.
	rest := start + 1
	if off := strings.Index(oname[rest:], "/"); off >= 0 {
		return start, rest + off
	}
	// No trailing "/": the component runs to the end of the name.
	return start, len(oname)
}
161
162// TODO(sadovsky): One of the following:
163// - Reserve prefix string "$tid." for internal use.
164// - Reserve prefix char "$" for internal use.
165// - Require users to escape prefix char "$" when they are referring to their
166// own data, e.g. "\$foo".
167func makeTransactionComponent(id transactionID) string {
168 return fmt.Sprintf("$tid.%d", id)
169}
170
171// stripTransactionComponent returns the given object name with its "$tid.*"
172// component removed, and also returns the stripped transactionID.
173// Examples:
174// "/foo/$tid.123/bar" => {"/foo/bar", transactionID(123)}
175// "/foo/bar" => {"/foo/bar", nullTransactionID}
176func stripTransactionComponent(oname string) (string, transactionID, error) {
177 begin, end := findTransactionComponent(oname)
178 if begin == -1 {
179 return oname, nullTransactionID, nil
180 }
181 tc := oname[begin:end]
182 id, err := strconv.ParseInt(tc[strings.LastIndex(tc, ".")+1:], 10, 64)
183 if err != nil {
184 return "", nullTransactionID, fmt.Errorf("Failed to extract id from %q", tc)
185 }
186 return oname[:begin] + oname[end:], transactionID(id), nil
187}
188
189func (s *Server) createTransaction(ctx transactionContext, oname string) (string, error) {
190 s.mutex.Lock()
191 defer s.mutex.Unlock()
192
193 var id transactionID
194 for {
195 id = transactionID(rng.Int63())
196 _, ok := s.transactions[id]
197 if !ok {
198 break
199 }
200 }
201 info := &transaction{
202 trans: memstore.NewTransaction(),
203 expires: time.Now().Add(transactionMaxLifetime),
204 creatorCtx: ctx,
205 }
206 s.transactions[id] = info
207 return makeTransactionComponent(id), nil
208}
209
210// findTransaction returns the transaction for the given transaction ID.
Ken Ashcraftb7e15ad2014-08-04 14:38:37 -0700211func (s *Server) findTransaction(ctx transactionContext, id transactionID) (*memstore.Transaction, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700212 s.mutex.RLock()
213 defer s.mutex.RUnlock()
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700214 return s.findTransactionLocked(ctx, id)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215}
216
Ken Ashcraftb7e15ad2014-08-04 14:38:37 -0700217func (s *Server) findTransactionLocked(ctx transactionContext, id transactionID) (*memstore.Transaction, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700218 if id == nullTransactionID {
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700219 return nil, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700220 }
221 info, ok := s.transactions[id]
222 if !ok {
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700223 return nil, errTransactionDoesNotExist
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700224 }
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700225 // A transaction may be used only by the session (and therefore client)
226 // that created it.
227 if !info.matchesContext(ctx) {
228 return nil, errPermissionDenied
229 }
230 return info.trans, nil
231}
232
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700233// Commit commits the changes in the transaction to the store. The
234// operation is atomic, so all mutations are performed, or none. Returns an
235// error if the transaction aborted.
236func (s *Server) commitTransaction(ctx transactionContext, id transactionID) error {
237 s.mutex.Lock()
238 defer s.mutex.Unlock()
239 t, err := s.findTransactionLocked(ctx, id)
240 if err != nil {
241 return err
242 }
243 if t == nil {
244 return errTransactionDoesNotExist
245 }
246 err = t.Commit()
247 delete(s.transactions, id)
248 return err
249}
250
251// Abort discards a transaction. This is an optimization; transactions
252// eventually time out and get discarded. However, live transactions
253// consume resources, so if you know that you won't be using a transaction
254// anymore, you should discard it explicitly.
255func (s *Server) abortTransaction(ctx transactionContext, id transactionID) error {
256 s.mutex.Lock()
257 defer s.mutex.Unlock()
258 t, err := s.findTransactionLocked(ctx, id)
259 if err != nil {
260 return err
261 }
262 if t == nil {
263 return errTransactionDoesNotExist
264 }
265 err = t.Abort()
266 delete(s.transactions, id)
267 return err
268}
269
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700270func (t *transaction) matchesContext(ctx transactionContext) bool {
271 creatorCtx := t.creatorCtx
272 return membersEqual(creatorCtx.LocalID().Names(), ctx.LocalID().Names()) &&
273 membersEqual(creatorCtx.RemoteID().Names(), ctx.RemoteID().Names())
274}
275
// membersEqual checks whether two slices of strings have the same set of
// members, regardless of order. Duplicates are ignored, and a nil slice is
// treated the same as an empty one.
func membersEqual(slice1, slice2 []string) bool {
	// toSet collapses a slice into the set of its distinct members.
	toSet := func(members []string) map[string]bool {
		set := make(map[string]bool, len(members))
		for _, m := range members {
			set[m] = true
		}
		return set
	}
	// DeepEqual tests keys for == equality, which is sufficient for strings.
	return reflect.DeepEqual(toSet(slice1), toSet(slice2))
}
290
291// gcLoop drops transactions that have expired.
292func (s *Server) gcLoop() {
293 for {
294 select {
295 case <-s.closed:
296 s.pending.Done()
297 return
298 case <-s.ticker.C:
299 }
300
301 s.mutex.Lock()
302 now := time.Now()
303 for id, t := range s.transactions {
304 if now.After(t.expires) {
305 t.trans.Abort()
306 delete(s.transactions, id)
307 }
308 }
309 s.mutex.Unlock()
310 }
311}
312
Tilak Sharma86571322014-06-11 14:15:07 -0700313// Watch returns a stream of all changes.
314func (s *Server) Watch(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
315 return s.watcher.WatchRaw(ctx, req, stream)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700316}
317
Tilak Sharma2af70022014-05-14 11:29:27 -0700318// PutMutations atomically commits a stream of Mutations when the stream is
319// closed. Mutations are not committed if the request is cancelled before the
320// stream has been closed.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700321func (s *Server) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
Tilak Sharma2af70022014-05-14 11:29:27 -0700322 return s.store.PutMutations(ctx, stream)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700323}
324
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700325type storeDispatcher struct {
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700326 s *Server
327 auth security.Authorizer
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700328}
329
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700330// NewStoreDispatcher returns an object dispatcher.
331func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
332 return &storeDispatcher{s: s, auth: auth}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700333}
334
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700335func (d *storeDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700336 serv, err := d.lookupServer(suffix)
337 if err != nil {
338 return nil, nil, err
339 }
340 return ipc.ReflectInvoker(serv), d.auth, nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700341}
342
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700343func (d *storeDispatcher) lookupServer(suffix string) (interface{}, error) {
344 // Strip leading "/" if present so that server internals can reliably use
345 // naming.Join(suffix, "foo").
346 suffix = strings.TrimPrefix(suffix, "/")
Ken Ashcraftad299042014-07-31 10:14:18 -0700347 if strings.HasSuffix(suffix, raw.RawStoreSuffix) {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700348 return raw.NewServerStore(d.s), nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700349 } else {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700350 // TODO(sadovsky): Create Object, Transaction, and TransactionRoot stubs,
351 // merge them, and return the result. See TODO in
352 // veyron2/services/store/service.vdl.
353 o, err := d.s.lookupObject(suffix)
354 if err != nil {
355 return nil, err
356 }
357 return store.NewServerObject(o), nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700358 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700359}
360
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700361func (s *Server) lookupObject(name string) (*object, error) {
362 oname, tid, err := stripTransactionComponent(name)
363 if err != nil {
364 return nil, err
365 }
366 return &object{name: oname, obj: s.store.Bind(oname), tid: tid, server: s}, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700367}