blob: 741954b5e04413f0a74d3331cb5f0d8814203bc5 [file] [log] [blame]
package server
import (
"errors"
"veyron/services/store/service"
"veyron2/ipc"
"veyron2/query"
"veyron2/services/mounttable"
"veyron2/services/store"
"veyron2/services/watch"
"veyron2/storage"
"veyron2/vdl"
)
// object implements store.ObjectService by delegating each operation to an
// underlying service.Object.
type object struct {
// name is the string returned by String.
name string
// obj is the underlying object that the service methods delegate to.
obj service.Object
// server is used to look up transactions by ID (via findTransaction).
server *Server
}
var (
// errNotAValue reports that a value is not a storage.Value. It is not
// referenced in the part of the file shown here.
errNotAValue = errors.New("not a storage.Value")
// errNotAnAttribute is returned by attrsFromAnyData when an element is not
// a storage.Attr.
errNotAnAttribute = errors.New("not a storage.Attr")
// Compile-time check that *object implements store.ObjectService.
_ store.ObjectService = (*object)(nil)
// nullEntry and nullStat are declared without initializers; they are the
// values returned alongside an error or for nil inputs.
nullEntry store.Entry
nullStat store.Stat
)
// fillServiceStat populates result from stat: the ID is copied, the
// modification time is converted to Unix nanoseconds, and the attributes are
// converted with attrsToAnyData.
func fillServiceStat(result *store.Stat, stat *storage.Stat) {
	id := stat.ID
	result.ID = id

	mtimeNS := stat.MTime.UnixNano()
	result.MTimeNS = mtimeNS

	anyAttrs := attrsToAnyData(stat.Attrs)
	result.Attrs = anyAttrs
}
// makeServiceStat converts a *storage.Stat into a store.Stat value. A nil
// stat yields nullStat.
func makeServiceStat(stat *storage.Stat) store.Stat {
	if stat != nil {
		converted := store.Stat{}
		fillServiceStat(&converted, stat)
		return converted
	}
	return nullStat
}
// makeServiceEntry converts a *storage.Entry into a store.Entry value,
// carrying over the value and the converted stat. A nil entry yields
// nullEntry.
func makeServiceEntry(e *storage.Entry) store.Entry {
	if e == nil {
		return nullEntry
	}
	var converted store.Entry
	converted.Value = e.Value
	fillServiceStat(&converted.Stat, &e.Stat)
	return converted
}
// String returns the object's name.
func (o *object) String() string {
return o.name
}
// Attributes returns a fixed set of descriptive attributes: "health" is always
// "ok" and "servertype" is the object's String() value. The arg parameter is
// not used.
func (o *object) Attributes(arg string) map[string]string {
	attrs := make(map[string]string, 2)
	attrs["health"] = "ok"
	attrs["servertype"] = o.String()
	return attrs
}
// attrsFromAnyData converts a slice of vdl.Any into a slice of storage.Attr.
// It returns errNotAnAttribute (and a nil slice) as soon as it meets an
// element that is not a storage.Attr.
func attrsFromAnyData(attrs []vdl.Any) ([]storage.Attr, error) {
	converted := make([]storage.Attr, 0, len(attrs))
	for _, raw := range attrs {
		attr, isAttr := raw.(storage.Attr)
		if !isAttr {
			return nil, errNotAnAttribute
		}
		converted = append(converted, attr)
	}
	return converted, nil
}
// attrsToAnyData converts a slice of storage.Attr into a slice of vdl.Any of
// the same length, preserving order.
func attrsToAnyData(attrs []storage.Attr) []vdl.Any {
	converted := make([]vdl.Any, 0, len(attrs))
	for _, attr := range attrs {
		converted = append(converted, attr)
	}
	return converted
}
// Exists reports whether the Entry has a value, as seen by the transaction
// identified by tid. It returns errTransactionDoesNotExist if the server has
// no such transaction.
func (o *object) Exists(ctx ipc.Context, tid store.TransactionID) (bool, error) {
	txn, found := o.server.findTransaction(tid)
	if found {
		return o.obj.Exists(ctx.RemoteID(), txn)
	}
	return false, errTransactionDoesNotExist
}
// Get returns the value for the Object. The value returned is from the most
// recent mutation of the entry in the Transaction, or from the Transaction's
// snapshot if there is no mutation.
//
// It returns nullEntry together with errTransactionDoesNotExist if tid is not
// known to the server, or with the underlying error if the lookup fails.
func (o *object) Get(ctx ipc.Context, tid store.TransactionID) (store.Entry, error) {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return nullEntry, errTransactionDoesNotExist
	}
	entry, err := o.obj.Get(ctx.RemoteID(), txn)
	if err == nil {
		return makeServiceEntry(entry), err
	}
	return nullEntry, err
}
// Put modifies the value of the Object within the transaction identified by
// tid and returns the resulting stat.
//
// It returns nullStat together with errTransactionDoesNotExist if tid is not
// known to the server, or with the underlying error if the write fails.
func (o *object) Put(ctx ipc.Context, tid store.TransactionID, val vdl.Any) (store.Stat, error) {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return nullStat, errTransactionDoesNotExist
	}
	stat, err := o.obj.Put(ctx.RemoteID(), txn, interface{}(val))
	if err == nil {
		return makeServiceStat(stat), nil
	}
	return nullStat, err
}
// Remove removes the Object within the transaction identified by tid. It
// returns errTransactionDoesNotExist if the server has no such transaction.
func (o *object) Remove(ctx ipc.Context, tid store.TransactionID) error {
	txn, found := o.server.findTransaction(tid)
	if found {
		return o.obj.Remove(ctx.RemoteID(), txn)
	}
	return errTransactionDoesNotExist
}
// SetAttr changes the attributes of the entry, such as permissions and
// replication groups. Attributes are associated with the value, not the path.
//
// The transaction is looked up first (errTransactionDoesNotExist if missing);
// then attrs is converted to storage.Attr values, failing with
// errNotAnAttribute if any element has the wrong type.
func (o *object) SetAttr(ctx ipc.Context, tid store.TransactionID, attrs []vdl.Any) error {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return errTransactionDoesNotExist
	}
	converted, convErr := attrsFromAnyData(attrs)
	if convErr != nil {
		return convErr
	}
	return o.obj.SetAttr(ctx.RemoteID(), txn, converted...)
}
// Stat returns entry info for the Object as seen by the transaction
// identified by tid.
//
// It returns nullStat together with errTransactionDoesNotExist if tid is not
// known to the server, or with the underlying error if the lookup fails.
func (o *object) Stat(ctx ipc.Context, tid store.TransactionID) (store.Stat, error) {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return nullStat, errTransactionDoesNotExist
	}
	info, err := o.obj.Stat(ctx.RemoteID(), txn)
	if err == nil {
		return makeServiceStat(info), nil
	}
	return nullStat, err
}
// Query streams the objects that match the given query to stream.
//
// It returns errTransactionDoesNotExist if tid is not known to the server. If
// sending a result fails, the iterator is aborted and the send error is
// returned; otherwise the iterator's own error (if any) is returned once
// iteration ends.
func (o *object) Query(ctx ipc.Context, tid store.TransactionID, q query.Query, stream store.ObjectServiceQueryStream) error {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return errTransactionDoesNotExist
	}
	results, err := o.obj.Query(ctx.RemoteID(), txn, q)
	if err != nil {
		return err
	}
	for results.Next() {
		sendErr := stream.Send(*results.Get())
		if sendErr == nil {
			continue
		}
		results.Abort()
		return sendErr
	}
	return results.Err()
}
// globStreamAdapter wraps a mounttable Glob stream so that plain string names
// can be sent to it; see Send.
type globStreamAdapter struct {
// stream is the wrapped mounttable stream that Send forwards to.
stream mounttable.GlobableServiceGlobStream
}
// Send wraps item in a mounttable.MountEntry (setting only its Name) and
// forwards it to the underlying stream.
func (a *globStreamAdapter) Send(item string) error {
	entry := mounttable.MountEntry{Name: item}
	return a.stream.Send(entry)
}
// Glob streams a series of names that match the given pattern. It calls GlobT
// with nullTransactionID, adapting the mounttable stream so that each name is
// sent as a MountEntry.
func (o *object) Glob(ctx ipc.Context, pattern string, stream mounttable.GlobableServiceGlobStream) error {
	adapter := &globStreamAdapter{stream}
	return o.GlobT(ctx, nullTransactionID, pattern, adapter)
}
// GlobT streams a series of names that match the given pattern, evaluated
// within the transaction identified by tid.
//
// It returns errTransactionDoesNotExist if tid is not known to the server. If
// ctx is closed, iteration stops early and nil is returned. A failure to send
// a name ends the iteration and is returned to the caller.
func (o *object) GlobT(ctx ipc.Context, tid store.TransactionID, pattern string, stream store.ObjectServiceGlobTStream) error {
	txn, found := o.server.findTransaction(tid)
	if !found {
		return errTransactionDoesNotExist
	}
	matches, err := o.obj.Glob(ctx.RemoteID(), txn, pattern)
	if err != nil {
		return err
	}
	for matches.IsValid() {
		// Stop quietly once the caller's context has been closed.
		if ctx.IsClosed() {
			return nil
		}
		if sendErr := stream.Send(matches.Name()); sendErr != nil {
			return sendErr
		}
		matches.Next()
	}
	return nil
}
// Watch is intended to return a stream of changes, but it is not implemented
// yet: every call fails with a "not implemented" error.
//
// The error is returned instead of panicking so that a caller invoking this
// method gets an ordinary failure. NOTE(review): whether the ipc layer would
// have recovered from the previous panic is not visible from this file.
func (o *object) Watch(ctx ipc.Context, req watch.Request, stream watch.WatcherServiceWatchStream) error {
	return errors.New("not implemented")
}