blob: a4397b0fda02489f3f4e21f172b2ff1e6769740c [file] [log] [blame]
package memstore
import (
"errors"
"io"
"veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore/state"
"veyron/services/store/raw"
"veyron/services/store/service"
"veyron2/ipc"
"veyron2/security"
"veyron2/storage"
)
// Store is the in-memory state of the store, optionally backed by a log.
//
// The embedded mutex is held by Close, GC, Snapshot and ApplyMutations while
// they access State and log.
type Store struct {
sync.DebugMutex
// State holds the current state of the store. It is set to nil when the
// store is closed (see closeLocked).
State *state.State
// log persists the state to disk and any committed transactions.
// An ephemeral state has a nil log, and does not persist to disk.
// It is also reset to nil when the store is closed.
log *wlog
}
// Compile-time assertion that *Store implements the service.Store interface.
var _ service.Store = (*Store)(nil)
var (
// ErrRequestCancelled is returned by PutMutations when the mutation stream
// reaches io.EOF but the server context reports that it is closed, i.e. the
// request was cancelled before the stream was closed normally.
ErrRequestCancelled = errors.New("request cancelled")
)
// New creates a new store. admin is the public ID of the administrator and
// dbName is the path of the database directory, to which logs are written.
//
// If openDB yields no existing log, an empty state is created via newState.
// Otherwise the state is rebuilt from the existing log, that log is backed
// up, and a fresh log is started for the restored state. Any error from these
// steps is returned together with a nil store.
func New(admin security.PublicID, dbName string) (*Store, error) {
	oldLog, err := openDB(dbName, false)
	switch {
	case err != nil:
		return nil, err
	case oldLog == nil:
		// Nothing to recover: start from an empty state.
		return newState(admin, dbName)
	}

	// Rebuild the state from the existing log; the log is closed by the call.
	restored, err := readAndCloseDB(admin, oldLog)
	if err != nil {
		return nil, err
	}

	// Rename the old log file by date before starting a new one.
	if err := backupLog(dbName); err != nil {
		return nil, err
	}

	// Start a fresh log for the restored state.
	if err := restored.setLog(dbName); err != nil {
		return nil, err
	}
	return restored, nil
}
// newState returns a store holding an empty state owned by admin. dbName is
// the path of the database directory, to which logs are written; attaching
// the log is delegated to setLog, and its error (if any) is returned with a
// nil store.
func newState(admin security.PublicID, dbName string) (*Store, error) {
	fresh := &Store{State: state.New(admin)}
	err := fresh.setLog(dbName)
	if err != nil {
		return nil, err
	}
	return fresh, nil
}
// setLog creates a fresh log file under dbName, writes the store to it, and
// installs it as the store's log.
//
// An empty dbName leaves the store without a log and returns nil. If writing
// the state fails, the newly created log is closed and the error is returned
// without modifying st.log.
func (st *Store) setLog(dbName string) error {
	if dbName == "" {
		// No database directory: nothing to create.
		return nil
	}

	wl, err := createLog(dbName)
	if err != nil {
		return err
	}

	if err = wl.writeState(st); err != nil {
		// Do not keep a log that does not contain the state.
		wl.close()
		return err
	}

	st.log = wl
	return nil
}
// Close shuts the store down while holding the store lock: the in-memory
// state is dropped and the log, if any, is closed (see closeLocked).
//
// Close always returns nil. After it returns, State is nil, so methods that
// dereference State should not be called on the store any more.
func (st *Store) Close() error {
	st.Lock()
	// Release the lock via defer, like the other locking methods of Store, so
	// that it is released even if closeLocked panics.
	defer st.Unlock()
	st.closeLocked()
	return nil
}
// closeLocked drops the in-memory state and closes and detaches the log, if
// there is one. Its callers in this file (Close, GC, ApplyMutations) invoke it
// while holding the store lock.
func (st *Store) closeLocked() {
	st.State = nil
	if wl := st.log; wl != nil {
		wl.close()
		st.log = nil
	}
}
// GC performs a manual garbage collection while holding the store lock.
//
// After collecting, the deletions reported by the state are appended to the
// log as a transaction, provided the store has a log and there are deletions.
// If appending fails, the store is closed and the error is returned.
func (st *Store) GC() error {
	st.Lock()
	defer st.Unlock()

	st.State.GC()

	deletions := st.State.Deletions()
	if st.log == nil || deletions == nil {
		// Ephemeral store, or nothing was deleted: nothing to record.
		return nil
	}

	if err := st.log.appendTransaction(deletions); err != nil {
		// The state has already been updated but the log has not, and we
		// cannot continue without the log. The lock is still held, so nobody
		// has observed the state; close it so that it is never used again.
		st.closeLocked()
		return err
	}
	return nil
}
// Snapshot returns a read-only view of the current state. The snapshot is
// taken while holding the store lock.
func (st *Store) Snapshot() state.Snapshot {
	st.Lock()
	defer st.Unlock()

	snap := st.State.Snapshot()
	return snap
}
// ApplyMutations applies the mutations to the state atomically, holding the
// store lock for the whole operation.
//
// If the state rejects the mutations, that error is returned and nothing is
// logged. Otherwise, when the store has a log, the mutations are appended to
// it as a transaction; if appending fails, the store is closed and the error
// is returned.
func (st *Store) ApplyMutations(mu *state.Mutations) error {
	st.Lock()
	defer st.Unlock()

	if err := st.State.ApplyMutations(mu); err != nil {
		return err
	}

	if st.log == nil {
		// Ephemeral store: there is no log to append to.
		return nil
	}

	// Append the transaction to the log.
	if err := st.log.appendTransaction(mu); err != nil {
		// The state has already been updated but the log has not, and we
		// cannot continue without the log. The lock is still held, so nobody
		// has observed the state; close it so that it is never used again.
		st.closeLocked()
		return err
	}
	return nil
}
// PutMutations atomically commits a stream of Mutations when the stream is
// closed. Mutations are not committed if the request is cancelled before the
// stream has been closed.
//
// Each received mutation is put into the snapshot of a single transaction.
// The transaction is aborted, and the corresponding error returned, when
// receiving fails, when a mutation cannot be put, or when the stream ends
// while ctx reports that it is closed (ErrRequestCancelled). Otherwise the
// result of committing the transaction is returned.
func (st *Store) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
	tr := st.newNilTransaction()

	// abort discards the pending transaction and hands back the cause.
	abort := func(cause error) error {
		tr.Abort()
		return cause
	}

	for {
		mu, err := stream.Recv()
		switch {
		case err == io.EOF:
			// End of stream: commit unless the request was cancelled.
			if ctx.IsClosed() {
				return abort(ErrRequestCancelled)
			}
			return tr.Commit()
		case err != nil:
			return abort(err)
		}

		if err := tr.snapshot.PutMutation(mu); err != nil {
			return abort(err)
		}
	}
}
// SetConflictResolver specifies a function to perform conflict resolution.
// The <ty> represents the IDL name for the type.
//
// It is not implemented yet: every call panics with "not implemented",
// regardless of the arguments.
func (st *Store) SetConflictResolver(ty string, r storage.ConflictResolver) {
panic("not implemented")
}