Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | // Package server implements a storage service. |
| 2 | package server |
| 3 | |
| 4 | import ( |
| 5 | "errors" |
| 6 | "sync" |
| 7 | "time" |
| 8 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 9 | "veyron/services/store/memstore" |
| 10 | memwatch "veyron/services/store/memstore/watch" |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 11 | "veyron/services/store/raw" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 12 | "veyron/services/store/service" |
| 13 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 14 | "veyron2/ipc" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 15 | "veyron2/security" |
| 16 | "veyron2/services/store" |
| 17 | "veyron2/services/watch" |
Todd Wang | e5d67b4 | 2014-05-27 10:48:07 -0700 | [diff] [blame] | 18 | "veyron2/vdl" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 19 | ) |
| 20 | |
// transactionMaxLifetime is the maximum duration before a transaction will
// be garbage collected.
//
// TODO(jyh): This should probably be a configuration parameter.
const transactionMaxLifetime = 30 * time.Second
| 28 | |
var (
	// Server implements the StoreService interface.
	_ store.StoreService = (*Server)(nil)

	// nullTransactionID is the zero value of store.TransactionID.
	// findTransactionLocked treats it as "no transaction" and reports it as
	// found with a nil transaction.
	nullTransactionID store.TransactionID

	// errTransactionAlreadyExists is returned by CreateTransaction when the
	// requested ID is already registered.
	errTransactionAlreadyExists = errors.New("transaction already exists")
	// errTransactionDoesNotExist is returned by Commit and Abort when the
	// requested ID is not registered.
	errTransactionDoesNotExist = errors.New("transaction does not exist")
)
| 38 | |
// Server implements the store service on top of an underlying store and
// watcher. It tracks the set of active transactions and runs a background
// loop (gcLoop) that aborts and drops transactions once they have expired.
type Server struct {
	// mutex guards transactions.
	mutex sync.RWMutex

	// store is the actual store implementation.
	store service.Store

	// transactions are the set of active transactions.
	transactions map[store.TransactionID]*transaction

	// Transaction garbage collection.
	//
	// pending is used by Close to wait for gcLoop to exit, ticker triggers
	// each garbage collection pass, and closed is closed by Close to tell
	// gcLoop to stop.
	pending sync.WaitGroup
	ticker  *time.Ticker
	closed  chan struct{}

	// watcher is the actual store watcher implementation.
	watcher service.Watcher
}
| 59 | |
// transaction is the server's bookkeeping record for one active transaction.
type transaction struct {
	// trans is the underlying store transaction.
	trans service.Transaction
	// expires is the time after which gcLoop aborts and removes the
	// transaction.
	expires time.Time
}
| 64 | |
// ServerConfig provides the parameters needed to construct a Server.
type ServerConfig struct {
	Admin  security.PublicID // Administrator.
	DBName string            // DBName is the name of the database directory.
}
| 70 | |
| 71 | // New creates a new server. |
| 72 | func New(config ServerConfig) (*Server, error) { |
| 73 | mstore, err := memstore.New(config.Admin, config.DBName) |
| 74 | if err != nil { |
| 75 | return nil, err |
| 76 | } |
| 77 | mwatcher, err := memwatch.New(config.Admin, config.DBName) |
| 78 | if err != nil { |
| 79 | return nil, err |
| 80 | } |
| 81 | s := &Server{ |
| 82 | store: mstore, |
| 83 | transactions: make(map[store.TransactionID]*transaction), |
| 84 | ticker: time.NewTicker(time.Second), |
| 85 | closed: make(chan struct{}), |
| 86 | watcher: mwatcher, |
| 87 | } |
| 88 | s.pending.Add(1) |
| 89 | go s.gcLoop() |
| 90 | return s, nil |
| 91 | } |
| 92 | |
// Close shuts the server down. It closes s.closed to tell gcLoop to exit,
// stops the garbage collection ticker, waits for gcLoop to finish, and then
// closes the store and the watcher.
//
// Close must be called at most once: a second call would close the already
// closed s.closed channel, which panics. Transactions that are still
// registered are not aborted here.
func (s *Server) Close() {
	close(s.closed)
	s.ticker.Stop()
	s.pending.Wait()
	s.store.Close()
	s.watcher.Close()
}
| 100 | |
// String returns the fixed name "StoreServer"; Attributes reports it as the
// server type.
func (s *Server) String() string {
	return "StoreServer"
}
| 104 | |
| 105 | // Attributes returns the server status. |
| 106 | func (s *Server) Attributes(arg string) map[string]string { |
| 107 | return map[string]string{ |
| 108 | "health": "ok", |
| 109 | "servertype": s.String(), |
| 110 | } |
| 111 | } |
| 112 | |
// findTransaction returns the transaction for the TransactionID. It holds
// the read lock for the duration of the lookup. For nullTransactionID it
// returns (nil, true); for an ID that is not registered it returns
// (nil, false).
func (s *Server) findTransaction(id store.TransactionID) (service.Transaction, bool) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.findTransactionLocked(id)
}
| 119 | |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 120 | func (s *Server) findTransactionLocked(id store.TransactionID) (service.Transaction, bool) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 121 | if id == nullTransactionID { |
| 122 | return nil, true |
| 123 | } |
| 124 | info, ok := s.transactions[id] |
| 125 | if !ok { |
| 126 | return nil, false |
| 127 | } |
| 128 | return info.trans, true |
| 129 | } |
| 130 | |
| 131 | // gcLoop drops transactions that have expired. |
| 132 | func (s *Server) gcLoop() { |
| 133 | for { |
| 134 | select { |
| 135 | case <-s.closed: |
| 136 | s.pending.Done() |
| 137 | return |
| 138 | case <-s.ticker.C: |
| 139 | } |
| 140 | |
| 141 | s.mutex.Lock() |
| 142 | now := time.Now() |
| 143 | for id, t := range s.transactions { |
| 144 | if now.After(t.expires) { |
| 145 | t.trans.Abort() |
| 146 | delete(s.transactions, id) |
| 147 | } |
| 148 | } |
| 149 | s.mutex.Unlock() |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | // CreateTransaction creates a transaction. |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 154 | func (s *Server) CreateTransaction(_ ipc.ServerContext, id store.TransactionID, opts []vdl.Any) error { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 155 | s.mutex.Lock() |
| 156 | defer s.mutex.Unlock() |
| 157 | |
| 158 | info, ok := s.transactions[id] |
| 159 | if ok { |
| 160 | return errTransactionAlreadyExists |
| 161 | } |
| 162 | info = &transaction{ |
| 163 | trans: memstore.NewTransaction(), |
| 164 | expires: time.Now().Add(transactionMaxLifetime), |
| 165 | } |
| 166 | s.transactions[id] = info |
| 167 | return nil |
| 168 | } |
| 169 | |
| 170 | // Commit commits the changes in the transaction to the store. The |
| 171 | // operation is atomic, so all mutations are performed, or none. Returns an |
| 172 | // error if the transaction aborted. |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 173 | func (s *Server) Commit(_ ipc.ServerContext, id store.TransactionID) error { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 174 | s.mutex.Lock() |
| 175 | defer s.mutex.Unlock() |
| 176 | |
| 177 | t, ok := s.findTransactionLocked(id) |
| 178 | if !ok { |
| 179 | return errTransactionDoesNotExist |
| 180 | } |
| 181 | err := t.Commit() |
| 182 | delete(s.transactions, id) |
| 183 | return err |
| 184 | } |
| 185 | |
| 186 | // Abort discards a transaction. |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 187 | func (s *Server) Abort(_ ipc.ServerContext, id store.TransactionID) error { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 188 | s.mutex.Lock() |
| 189 | defer s.mutex.Unlock() |
| 190 | |
| 191 | t, ok := s.transactions[id] |
| 192 | if !ok { |
| 193 | return errTransactionDoesNotExist |
| 194 | } |
| 195 | t.trans.Abort() |
| 196 | delete(s.transactions, id) |
| 197 | return nil |
| 198 | } |
| 199 | |
// Watch returns a stream of changes. The call is passed through unchanged to
// the server's watcher, and its error is returned as is.
func (s *Server) Watch(ctx ipc.ServerContext, req watch.Request, stream watch.WatcherServiceWatchStream) error {
	return s.watcher.Watch(ctx, req, stream)
}
| 204 | |
// PutMutations atomically commits a stream of Mutations when the stream is
// closed. Mutations are not committed if the request is cancelled before the
// stream has been closed.
//
// The call is passed through unchanged to the underlying store.
func (s *Server) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
	return s.store.PutMutations(ctx, stream)
}
| 211 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 212 | // ReadConflicts returns the stream of conflicts to store values. A |
| 213 | // conflict occurs when there is a concurrent modification to a value. |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 214 | func (s *Server) ReadConflicts(_ ipc.ServerContext, stream store.StoreServiceReadConflictsStream) error { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 215 | panic("not implemented") |
| 216 | } |
| 217 | |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 218 | // Store, raw store and object dispatchers. |
| 219 | // Typically, the store dispatcher handles paths with ".store" prefix, the |
| 220 | // raw store dispatcher handles paths with ".store.raw" prefix, and the |
Tilak Sharma | dafe6bf | 2014-05-16 13:54:51 -0700 | [diff] [blame] | 221 | // object dispatcher handles paths with "" prefix. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 222 | // TODO(sadovsky): Revisit this scheme. Seems simpler to have one dispatcher? |
| 223 | |
// NewStoreDispatcher returns a store dispatcher. It wraps s with
// store.NewServerStore and passes the result, together with auth, to
// ipc.SoloDispatcher.
func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
	return ipc.SoloDispatcher(store.NewServerStore(s), auth)
}
| 228 | |
// NewRawStoreDispatcher returns a raw store dispatcher. It wraps s with
// raw.NewServerStore and passes the result, together with auth, to
// ipc.SoloDispatcher.
func NewRawStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
	return ipc.SoloDispatcher(raw.NewServerStore(s), auth)
}
| 233 | |
// objectDispatcher is the dispatcher returned by NewObjectDispatcher. Its
// Lookup method resolves a suffix to an object of the server.
type objectDispatcher struct {
	// s is the server whose objects are looked up.
	s *Server
	// auth is the authorizer returned from every Lookup.
	auth security.Authorizer
}
| 238 | |
// NewObjectDispatcher returns an object dispatcher that looks objects up in
// s and hands out auth as the authorizer for each of them.
func NewObjectDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
	return &objectDispatcher{s: s, auth: auth}
}
| 243 | |
| 244 | func (d *objectDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) { |
| 245 | o := d.s.lookupObject(suffix) |
| 246 | serverObject := store.NewServerObject(o) |
Tilak Sharma | dafe6bf | 2014-05-16 13:54:51 -0700 | [diff] [blame] | 247 | return ipc.ReflectInvoker(serverObject), d.auth, nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 248 | } |
| 249 | |
// lookupObject returns a new object for name. The object carries the name,
// the result of binding name in the underlying store, and a reference back
// to the server.
func (s *Server) lookupObject(name string) *object {
	return &object{name: name, obj: s.store.Bind(name), server: s}
}