blob: 504d56008e1657b4f6a36b5687b779ecf4135564 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001// Package server implements a storage service.
2package server
3
4import (
5 "errors"
6 "sync"
7 "time"
8
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "veyron/services/store/memstore"
10 memwatch "veyron/services/store/memstore/watch"
Tilak Sharma2af70022014-05-14 11:29:27 -070011 "veyron/services/store/raw"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070012 "veyron/services/store/service"
13
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "veyron2/ipc"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015 "veyron2/security"
16 "veyron2/services/store"
17 "veyron2/services/watch"
Todd Wange5d67b42014-05-27 10:48:07 -070018 "veyron2/vdl"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070019)
20
// transactionMaxLifetime is the maximum duration before a transaction will
// be garbage collected.
//
// TODO(jyh): This should probably be a configuration parameter.
const transactionMaxLifetime = 30 * time.Second
28
29var (
30 // Server implements the StoreService interface.
31 _ store.StoreService = (*Server)(nil)
32
33 nullTransactionID store.TransactionID
34
35 errTransactionAlreadyExists = errors.New("transaction already exists")
36 errTransactionDoesNotExist = errors.New("transaction does not exist")
37)
38
39// Server stores the dictionary of all media items. It has a scanner.Scanner
40// for collecting files from the filesystem. For each file, a FileService is
41// registered to serve the file.
42type Server struct {
43 mutex sync.RWMutex
44
45 // store is the actual store implementation.
46 store service.Store
47
48 // transactions are the set of active transactions.
49 transactions map[store.TransactionID]*transaction
50
51 // Transaction garbage collection.
52 pending sync.WaitGroup
53 ticker *time.Ticker
54 closed chan struct{}
55
56 // watcher is the actual store watcher implementation.
57 watcher service.Watcher
58}
59
60type transaction struct {
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -070061 trans service.Transaction
Jiri Simsa5293dcb2014-05-10 09:56:38 -070062 expires time.Time
63}
64
65// ServerConfig provides the parameters needed to construct a Server.
66type ServerConfig struct {
67 Admin security.PublicID // Administrator.
68 DBName string // DBName is the name if the database directory.
69}
70
71// New creates a new server.
72func New(config ServerConfig) (*Server, error) {
73 mstore, err := memstore.New(config.Admin, config.DBName)
74 if err != nil {
75 return nil, err
76 }
77 mwatcher, err := memwatch.New(config.Admin, config.DBName)
78 if err != nil {
79 return nil, err
80 }
81 s := &Server{
82 store: mstore,
83 transactions: make(map[store.TransactionID]*transaction),
84 ticker: time.NewTicker(time.Second),
85 closed: make(chan struct{}),
86 watcher: mwatcher,
87 }
88 s.pending.Add(1)
89 go s.gcLoop()
90 return s, nil
91}
92
93func (s *Server) Close() {
94 close(s.closed)
95 s.ticker.Stop()
96 s.pending.Wait()
97 s.store.Close()
98 s.watcher.Close()
99}
100
101func (s *Server) String() string {
102 return "StoreServer"
103}
104
105// Attributes returns the server status.
106func (s *Server) Attributes(arg string) map[string]string {
107 return map[string]string{
108 "health": "ok",
109 "servertype": s.String(),
110 }
111}
112
113// findTransaction returns the transaction for the TransactionID.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700114func (s *Server) findTransaction(id store.TransactionID) (service.Transaction, bool) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 s.mutex.RLock()
116 defer s.mutex.RUnlock()
117 return s.findTransactionLocked(id)
118}
119
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700120func (s *Server) findTransactionLocked(id store.TransactionID) (service.Transaction, bool) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700121 if id == nullTransactionID {
122 return nil, true
123 }
124 info, ok := s.transactions[id]
125 if !ok {
126 return nil, false
127 }
128 return info.trans, true
129}
130
131// gcLoop drops transactions that have expired.
132func (s *Server) gcLoop() {
133 for {
134 select {
135 case <-s.closed:
136 s.pending.Done()
137 return
138 case <-s.ticker.C:
139 }
140
141 s.mutex.Lock()
142 now := time.Now()
143 for id, t := range s.transactions {
144 if now.After(t.expires) {
145 t.trans.Abort()
146 delete(s.transactions, id)
147 }
148 }
149 s.mutex.Unlock()
150 }
151}
152
153// CreateTransaction creates a transaction.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700154func (s *Server) CreateTransaction(_ ipc.ServerContext, id store.TransactionID, opts []vdl.Any) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700155 s.mutex.Lock()
156 defer s.mutex.Unlock()
157
158 info, ok := s.transactions[id]
159 if ok {
160 return errTransactionAlreadyExists
161 }
162 info = &transaction{
163 trans: memstore.NewTransaction(),
164 expires: time.Now().Add(transactionMaxLifetime),
165 }
166 s.transactions[id] = info
167 return nil
168}
169
170// Commit commits the changes in the transaction to the store. The
171// operation is atomic, so all mutations are performed, or none. Returns an
172// error if the transaction aborted.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700173func (s *Server) Commit(_ ipc.ServerContext, id store.TransactionID) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700174 s.mutex.Lock()
175 defer s.mutex.Unlock()
176
177 t, ok := s.findTransactionLocked(id)
178 if !ok {
179 return errTransactionDoesNotExist
180 }
181 err := t.Commit()
182 delete(s.transactions, id)
183 return err
184}
185
186// Abort discards a transaction.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700187func (s *Server) Abort(_ ipc.ServerContext, id store.TransactionID) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700188 s.mutex.Lock()
189 defer s.mutex.Unlock()
190
191 t, ok := s.transactions[id]
192 if !ok {
193 return errTransactionDoesNotExist
194 }
195 t.trans.Abort()
196 delete(s.transactions, id)
197 return nil
198}
199
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700200// Watch returns a stream of changes.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700201func (s *Server) Watch(ctx ipc.ServerContext, req watch.Request, stream watch.WatcherServiceWatchStream) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700202 return s.watcher.Watch(ctx, req, stream)
203}
204
Tilak Sharma2af70022014-05-14 11:29:27 -0700205// PutMutations atomically commits a stream of Mutations when the stream is
206// closed. Mutations are not committed if the request is cancelled before the
207// stream has been closed.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700208func (s *Server) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
Tilak Sharma2af70022014-05-14 11:29:27 -0700209 return s.store.PutMutations(ctx, stream)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700210}
211
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700212// ReadConflicts returns the stream of conflicts to store values. A
213// conflict occurs when there is a concurrent modification to a value.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700214func (s *Server) ReadConflicts(_ ipc.ServerContext, stream store.StoreServiceReadConflictsStream) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215 panic("not implemented")
216}
217
Tilak Sharma2af70022014-05-14 11:29:27 -0700218// Store, raw store and object dispatchers.
219// Typically, the store dispatcher handles paths with ".store" prefix, the
220// raw store dispatcher handles paths with ".store.raw" prefix, and the
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700221// object dispatcher handles paths with "" prefix.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700222// TODO(sadovsky): Revisit this scheme. Seems simpler to have one dispatcher?
223
Tilak Sharma2af70022014-05-14 11:29:27 -0700224// NewStoreDispatcher returns a store dispatcher.
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700225func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
Tilak Sharma2af70022014-05-14 11:29:27 -0700226 return ipc.SoloDispatcher(store.NewServerStore(s), auth)
227}
228
229// NewRawStoreDispatcher returns a raw store dispatcher.
230func NewRawStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
231 return ipc.SoloDispatcher(raw.NewServerStore(s), auth)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700232}
233
234type objectDispatcher struct {
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700235 s *Server
236 auth security.Authorizer
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700237}
238
Tilak Sharma2af70022014-05-14 11:29:27 -0700239// NewObjectDispatcher returns an object dispatcher.
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700240func NewObjectDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
241 return &objectDispatcher{s: s, auth: auth}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700242}
243
244func (d *objectDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
245 o := d.s.lookupObject(suffix)
246 serverObject := store.NewServerObject(o)
Tilak Sharmadafe6bf2014-05-16 13:54:51 -0700247 return ipc.ReflectInvoker(serverObject), d.auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700248}
249
250func (s *Server) lookupObject(name string) *object {
251 return &object{name: name, obj: s.store.Bind(name), server: s}
252}