blob: 8b72fc412107a629fda136745ce2bc985001f4ee [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001// Package server implements a storage service.
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07002
Jiri Simsa5293dcb2014-05-10 09:56:38 -07003package server
4
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07005// This file defines Server, which implements the server-side Store API from
6// veyron2/services/store/service.vdl.
7
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008import (
Adam Sadovsky04c5eef2014-07-30 17:33:27 -07009 "fmt"
10 "math/rand"
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070011 "reflect"
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070012 "strconv"
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -070013 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "sync"
15 "time"
16
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017 "veyron/services/store/memstore"
18 memwatch "veyron/services/store/memstore/watch"
Tilak Sharma2af70022014-05-14 11:29:27 -070019 "veyron/services/store/raw"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020 "veyron/services/store/service"
21
Jiri Simsa5293dcb2014-05-10 09:56:38 -070022 "veyron2/ipc"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070023 "veyron2/security"
24 "veyron2/services/store"
Benjamin Prosnitz552aa072014-06-09 13:13:25 -070025 "veyron2/verror"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070026)
27
// transactionMaxLifetime is how long a transaction may live before the
// server's garbage collector aborts and drops it.
//
// TODO(jyh): This should probably be a configuration parameter.
const transactionMaxLifetime = 30 * time.Second
35
36var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070037 _ store.StoreService = (*Server)(nil)
38
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070039 errNestedTransaction = verror.BadArgf("cannot create a nested Transaction")
40 // Note, this can happen e.g. due to expiration.
41 errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070042 // Transaction exists, but may not be used by the caller.
Tilak Sharmae189e6a2014-07-14 11:45:22 -070043 errPermissionDenied = verror.NotAuthorizedf("permission denied")
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070044
45 rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
46
47 nullTransactionID transactionID
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048)
49
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070050// Server implements Store and uses memstore internally.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070051type Server struct {
52 mutex sync.RWMutex
53
54 // store is the actual store implementation.
55 store service.Store
56
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070057 // transactions is the set of active transactions.
58 transactions map[transactionID]*transaction
Jiri Simsa5293dcb2014-05-10 09:56:38 -070059
60 // Transaction garbage collection.
61 pending sync.WaitGroup
62 ticker *time.Ticker
63 closed chan struct{}
64
65 // watcher is the actual store watcher implementation.
66 watcher service.Watcher
67}
68
// transactionID is the server-chosen, internal identifier of a transaction.
// The zero value is reserved to mean "no transaction".
//
// TODO(jyh): Consider using a larger identifier space to reduce chance of
// collisions. (Note, createTransaction handles collisions when generating
// transactionIDs.)
type transactionID uint64
75
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070076// transactionContext defines the context in which a transaction is used. A
77// transaction may be used only in the context that created it.
78// transactionContext weakly identifies a session by the local and remote
79// principals involved in the RPC.
80// TODO(tilaks): Use the local and remote addresses to identify the session.
81// Does a session with a mobile device break if the remote address changes?
82type transactionContext interface {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070083 // LocalID returns the PublicID of the principal at the local end of the
84 // request.
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070085 LocalID() security.PublicID
Adam Sadovsky04c5eef2014-07-30 17:33:27 -070086 // RemoteID returns the PublicID of the principal at the remote end of the
87 // request.
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070088 RemoteID() security.PublicID
89}
90
Jiri Simsa5293dcb2014-05-10 09:56:38 -070091type transaction struct {
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -070092 trans service.Transaction
93 expires time.Time
94 creatorCtx transactionContext
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095}
96
97// ServerConfig provides the parameters needed to construct a Server.
98type ServerConfig struct {
99 Admin security.PublicID // Administrator.
100 DBName string // DBName is the name if the database directory.
101}
102
103// New creates a new server.
104func New(config ServerConfig) (*Server, error) {
105 mstore, err := memstore.New(config.Admin, config.DBName)
106 if err != nil {
107 return nil, err
108 }
109 mwatcher, err := memwatch.New(config.Admin, config.DBName)
110 if err != nil {
111 return nil, err
112 }
113 s := &Server{
114 store: mstore,
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700115 transactions: make(map[transactionID]*transaction),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700116 ticker: time.NewTicker(time.Second),
117 closed: make(chan struct{}),
118 watcher: mwatcher,
119 }
120 s.pending.Add(1)
121 go s.gcLoop()
122 return s, nil
123}
124
125func (s *Server) Close() {
126 close(s.closed)
127 s.ticker.Stop()
128 s.pending.Wait()
129 s.store.Close()
130 s.watcher.Close()
131}
132
133func (s *Server) String() string {
134 return "StoreServer"
135}
136
137// Attributes returns the server status.
138func (s *Server) Attributes(arg string) map[string]string {
139 return map[string]string{
140 "health": "ok",
141 "servertype": s.String(),
142 }
143}
144
// findTransactionComponent returns the (begin, end) offsets of the "$tid.*"
// component in the given object name, or (-1, -1) if oname does not contain a
// transaction component.
//
// A transaction component is a path component that starts with "$tid.",
// either at the very start of oname or right after a "/". When the component
// is preceded by a "/", begin is the offset of that "/", so oname[begin:end]
// includes it. end is the offset of the "/" that terminates the component, or
// len(oname) if the component is the last one.
//
// Components that merely start with "$tid" (e.g. "$tidbit") are not
// transaction components and are left alone.
func findTransactionComponent(oname string) (int, int) {
	const prefix = "$tid."

	begin := 0
	if !strings.HasPrefix(oname, prefix) {
		begin = strings.Index(oname, "/"+prefix)
		if begin == -1 {
			return -1, -1
		}
	}

	// Skip the first byte ("$" or the leading "/") and look for the slash that
	// ends the component.
	end := len(oname)
	if i := strings.Index(oname[begin+1:], "/"); i != -1 {
		end = begin + 1 + i
	}
	return begin, end
}
164
165// TODO(sadovsky): One of the following:
166// - Reserve prefix string "$tid." for internal use.
167// - Reserve prefix char "$" for internal use.
168// - Require users to escape prefix char "$" when they are referring to their
169// own data, e.g. "\$foo".
170func makeTransactionComponent(id transactionID) string {
171 return fmt.Sprintf("$tid.%d", id)
172}
173
174// stripTransactionComponent returns the given object name with its "$tid.*"
175// component removed, and also returns the stripped transactionID.
176// Examples:
177// "/foo/$tid.123/bar" => {"/foo/bar", transactionID(123)}
178// "/foo/bar" => {"/foo/bar", nullTransactionID}
179func stripTransactionComponent(oname string) (string, transactionID, error) {
180 begin, end := findTransactionComponent(oname)
181 if begin == -1 {
182 return oname, nullTransactionID, nil
183 }
184 tc := oname[begin:end]
185 id, err := strconv.ParseInt(tc[strings.LastIndex(tc, ".")+1:], 10, 64)
186 if err != nil {
187 return "", nullTransactionID, fmt.Errorf("Failed to extract id from %q", tc)
188 }
189 return oname[:begin] + oname[end:], transactionID(id), nil
190}
191
192func (s *Server) createTransaction(ctx transactionContext, oname string) (string, error) {
193 s.mutex.Lock()
194 defer s.mutex.Unlock()
195
196 var id transactionID
197 for {
198 id = transactionID(rng.Int63())
199 _, ok := s.transactions[id]
200 if !ok {
201 break
202 }
203 }
204 info := &transaction{
205 trans: memstore.NewTransaction(),
206 expires: time.Now().Add(transactionMaxLifetime),
207 creatorCtx: ctx,
208 }
209 s.transactions[id] = info
210 return makeTransactionComponent(id), nil
211}
212
213// findTransaction returns the transaction for the given transaction ID.
214func (s *Server) findTransaction(ctx transactionContext, id transactionID) (service.Transaction, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215 s.mutex.RLock()
216 defer s.mutex.RUnlock()
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700217 return s.findTransactionLocked(ctx, id)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700218}
219
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700220func (s *Server) findTransactionLocked(ctx transactionContext, id transactionID) (service.Transaction, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700221 if id == nullTransactionID {
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700222 return nil, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700223 }
224 info, ok := s.transactions[id]
225 if !ok {
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700226 return nil, errTransactionDoesNotExist
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700227 }
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700228 // A transaction may be used only by the session (and therefore client)
229 // that created it.
230 if !info.matchesContext(ctx) {
231 return nil, errPermissionDenied
232 }
233 return info.trans, nil
234}
235
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700236// Commit commits the changes in the transaction to the store. The
237// operation is atomic, so all mutations are performed, or none. Returns an
238// error if the transaction aborted.
239func (s *Server) commitTransaction(ctx transactionContext, id transactionID) error {
240 s.mutex.Lock()
241 defer s.mutex.Unlock()
242 t, err := s.findTransactionLocked(ctx, id)
243 if err != nil {
244 return err
245 }
246 if t == nil {
247 return errTransactionDoesNotExist
248 }
249 err = t.Commit()
250 delete(s.transactions, id)
251 return err
252}
253
254// Abort discards a transaction. This is an optimization; transactions
255// eventually time out and get discarded. However, live transactions
256// consume resources, so if you know that you won't be using a transaction
257// anymore, you should discard it explicitly.
258func (s *Server) abortTransaction(ctx transactionContext, id transactionID) error {
259 s.mutex.Lock()
260 defer s.mutex.Unlock()
261 t, err := s.findTransactionLocked(ctx, id)
262 if err != nil {
263 return err
264 }
265 if t == nil {
266 return errTransactionDoesNotExist
267 }
268 err = t.Abort()
269 delete(s.transactions, id)
270 return err
271}
272
Tilak Sharma7cfc9cb2014-05-24 01:45:49 -0700273func (t *transaction) matchesContext(ctx transactionContext) bool {
274 creatorCtx := t.creatorCtx
275 return membersEqual(creatorCtx.LocalID().Names(), ctx.LocalID().Names()) &&
276 membersEqual(creatorCtx.RemoteID().Names(), ctx.RemoteID().Names())
277}
278
// membersEqual checks whether two slices of strings have the same set of
// members, regardless of order. Duplicates are ignored, and a nil slice is
// treated like an empty one.
func membersEqual(slice1, slice2 []string) bool {
	// toSet collapses a slice into the set of its distinct members.
	toSet := func(members []string) map[string]bool {
		set := make(map[string]bool, len(members))
		for _, m := range members {
			set[m] = true
		}
		return set
	}
	// DeepEqual tests keys for == equality, which is sufficient for strings.
	return reflect.DeepEqual(toSet(slice1), toSet(slice2))
}
293
294// gcLoop drops transactions that have expired.
295func (s *Server) gcLoop() {
296 for {
297 select {
298 case <-s.closed:
299 s.pending.Done()
300 return
301 case <-s.ticker.C:
302 }
303
304 s.mutex.Lock()
305 now := time.Now()
306 for id, t := range s.transactions {
307 if now.After(t.expires) {
308 t.trans.Abort()
309 delete(s.transactions, id)
310 }
311 }
312 s.mutex.Unlock()
313 }
314}
315
Tilak Sharma86571322014-06-11 14:15:07 -0700316// Watch returns a stream of all changes.
317func (s *Server) Watch(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
318 return s.watcher.WatchRaw(ctx, req, stream)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700319}
320
Tilak Sharma2af70022014-05-14 11:29:27 -0700321// PutMutations atomically commits a stream of Mutations when the stream is
322// closed. Mutations are not committed if the request is cancelled before the
323// stream has been closed.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700324func (s *Server) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
Tilak Sharma2af70022014-05-14 11:29:27 -0700325 return s.store.PutMutations(ctx, stream)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700326}
327
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700328// ReadConflicts returns the stream of conflicts to store values. A
329// conflict occurs when there is a concurrent modification to a value.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700330func (s *Server) ReadConflicts(_ ipc.ServerContext, stream store.StoreServiceReadConflictsStream) error {
Benjamin Prosnitz552aa072014-06-09 13:13:25 -0700331 return verror.Internalf("ReadConflicts not yet implemented")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700332}
333
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700334type storeDispatcher struct {
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700335 s *Server
336 auth security.Authorizer
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700337}
338
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700339// NewStoreDispatcher returns an object dispatcher.
340func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
341 return &storeDispatcher{s: s, auth: auth}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700342}
343
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700344func (d *storeDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700345 serv, err := d.lookupServer(suffix)
346 if err != nil {
347 return nil, nil, err
348 }
349 return ipc.ReflectInvoker(serv), d.auth, nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700350}
351
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700352func (d *storeDispatcher) lookupServer(suffix string) (interface{}, error) {
353 // Strip leading "/" if present so that server internals can reliably use
354 // naming.Join(suffix, "foo").
355 suffix = strings.TrimPrefix(suffix, "/")
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700356 if strings.HasSuffix(suffix, store.StoreSuffix) {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700357 return store.NewServerStore(d.s), nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700358 } else if strings.HasSuffix(suffix, raw.RawStoreSuffix) {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700359 return raw.NewServerStore(d.s), nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700360 } else {
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700361 // TODO(sadovsky): Create Object, Transaction, and TransactionRoot stubs,
362 // merge them, and return the result. See TODO in
363 // veyron2/services/store/service.vdl.
364 o, err := d.s.lookupObject(suffix)
365 if err != nil {
366 return nil, err
367 }
368 return store.NewServerObject(o), nil
Benjamin Prosnitz3916d2c2014-06-04 15:30:41 -0700369 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700370}
371
Adam Sadovsky04c5eef2014-07-30 17:33:27 -0700372func (s *Server) lookupObject(name string) (*object, error) {
373 oname, tid, err := stripTransactionComponent(name)
374 if err != nil {
375 return nil, err
376 }
377 return &object{name: oname, obj: s.store.Bind(oname), tid: tid, server: s}, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700378}