| package server |
| |
| import ( |
| "veyron/services/store/service" |
| |
| "veyron2/ipc" |
| "veyron2/query" |
| "veyron2/services/mounttable" |
| "veyron2/services/store" |
| "veyron2/services/watch" |
| "veyron2/storage" |
| "veyron2/vdl/vdlutil" |
| "veyron2/verror" |
| ) |
| |
| type object struct { |
| name string |
| obj service.Object |
| server *Server |
| } |
| |
| var ( |
| errNotAValue = verror.BadArgf("not a storage.Value") |
| errNotAnAttribute = verror.BadArgf("not a storage.Attr") |
| |
| _ store.ObjectService = (*object)(nil) |
| |
| nullEntry store.Entry |
| nullStat store.Stat |
| ) |
| |
| func fillServiceStat(result *store.Stat, stat *storage.Stat) { |
| result.ID = stat.ID |
| result.MTimeNS = stat.MTime.UnixNano() |
| result.Attrs = attrsToAnyData(stat.Attrs) |
| } |
| |
| func makeServiceStat(stat *storage.Stat) store.Stat { |
| if stat == nil { |
| return nullStat |
| } |
| var result store.Stat |
| fillServiceStat(&result, stat) |
| return result |
| } |
| |
| func makeServiceEntry(e *storage.Entry) store.Entry { |
| if e == nil { |
| return nullEntry |
| } |
| result := store.Entry{Value: e.Value} |
| fillServiceStat(&result.Stat, &e.Stat) |
| return result |
| } |
| |
| func (o *object) String() string { |
| return o.name |
| } |
| |
| func (o *object) Attributes(arg string) map[string]string { |
| return map[string]string{ |
| "health": "ok", |
| "servertype": o.String(), |
| } |
| } |
| |
| func attrsFromAnyData(attrs []vdlutil.Any) ([]storage.Attr, error) { |
| typedAttrs := make([]storage.Attr, len(attrs)) |
| for i, x := range attrs { |
| a, ok := x.(storage.Attr) |
| if !ok { |
| return nil, errNotAnAttribute |
| } |
| typedAttrs[i] = a |
| } |
| return typedAttrs, nil |
| } |
| |
| func attrsToAnyData(attrs []storage.Attr) []vdlutil.Any { |
| uattrs := make([]vdlutil.Any, len(attrs)) |
| for i, x := range attrs { |
| uattrs[i] = x |
| } |
| return uattrs |
| } |
| |
| // Exists returns true iff the Entry has a value. |
| func (o *object) Exists(ctx ipc.ServerContext, tid store.TransactionID) (bool, error) { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return false, err |
| } |
| return o.obj.Exists(ctx.RemoteID(), t) |
| } |
| |
| // Get returns the value for the Object. The value returned is from the |
| // most recent mutation of the entry in the Transaction, or from the |
| // Transaction's snapshot if there is no mutation. |
| func (o *object) Get(ctx ipc.ServerContext, tid store.TransactionID) (store.Entry, error) { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return nullEntry, err |
| } |
| entry, err := o.obj.Get(ctx.RemoteID(), t) |
| if err != nil { |
| return nullEntry, err |
| } |
| return makeServiceEntry(entry), err |
| } |
| |
| // Put modifies the value of the Object. |
| func (o *object) Put(ctx ipc.ServerContext, tid store.TransactionID, val vdlutil.Any) (store.Stat, error) { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return nullStat, err |
| } |
| stat, err := o.obj.Put(ctx.RemoteID(), t, interface{}(val)) |
| if err != nil { |
| return nullStat, err |
| } |
| return makeServiceStat(stat), nil |
| } |
| |
| // Remove removes the Object. |
| func (o *object) Remove(ctx ipc.ServerContext, tid store.TransactionID) error { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return err |
| } |
| return o.obj.Remove(ctx.RemoteID(), t) |
| } |
| |
| // SetAttr changes the attributes of the entry, such as permissions and |
| // replication groups. Attributes are associated with the value, not the |
| // path. |
| func (o *object) SetAttr(ctx ipc.ServerContext, tid store.TransactionID, attrs []vdlutil.Any) error { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return err |
| } |
| typedAttrs, err := attrsFromAnyData(attrs) |
| if err != nil { |
| return err |
| } |
| return o.obj.SetAttr(ctx.RemoteID(), t, typedAttrs...) |
| } |
| |
| // Stat returns entry info. |
| func (o *object) Stat(ctx ipc.ServerContext, tid store.TransactionID) (store.Stat, error) { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return nullStat, err |
| } |
| stat, err := o.obj.Stat(ctx.RemoteID(), t) |
| if err != nil { |
| return nullStat, err |
| } |
| return makeServiceStat(stat), nil |
| } |
| |
| // Query returns a sequence of objects that match the given query. |
| func (o *object) Query(ctx ipc.ServerContext, tid store.TransactionID, q query.Query, stream store.ObjectServiceQueryStream) error { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return err |
| } |
| it, err := o.obj.Query(ctx.RemoteID(), t, q) |
| if err != nil { |
| return err |
| } |
| for it.Next() { |
| if err := stream.Send(*it.Get()); err != nil { |
| it.Abort() |
| return err |
| } |
| } |
| return it.Err() |
| } |
| |
| type globStreamAdapter struct { |
| stream mounttable.GlobableServiceGlobStream |
| } |
| |
| func (a *globStreamAdapter) Send(item string) error { |
| return a.stream.Send(mounttable.MountEntry{ |
| Name: item, |
| }) |
| } |
| |
| // Glob streams a series of names that match the given pattern. |
| func (o *object) Glob(ctx ipc.ServerContext, pattern string, stream mounttable.GlobableServiceGlobStream) error { |
| return o.GlobT(ctx, nullTransactionID, pattern, &globStreamAdapter{stream}) |
| } |
| |
| // Glob streams a series of names that match the given pattern. |
| func (o *object) GlobT(ctx ipc.ServerContext, tid store.TransactionID, pattern string, stream store.ObjectServiceGlobTStream) error { |
| t, err := o.server.findTransaction(ctx, tid) |
| if err != nil { |
| return err |
| } |
| it, err := o.obj.Glob(ctx.RemoteID(), t, pattern) |
| if err != nil { |
| return err |
| } |
| for ; it.IsValid(); it.Next() { |
| if ctx.IsClosed() { |
| break |
| } |
| if err := stream.Send(it.Name()); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // ChangeBatchStream is an interface for streaming responses of type ChangeBatch. |
| type ChangeBatchStream interface { |
| // Send places the item onto the output stream, blocking if there is no buffer |
| // space available. |
| Send(watch.ChangeBatch) error |
| } |
| |
| // entryTransformStream implements GlobWatcherServiceWatchGlobStream and |
| // QueryWatcherServiceWatchQueryStream. It wraps a ChangeBatchStream, |
| // transforming the value in each change from *storage.Entry to *store.Entry. |
| type entryTransformStream struct { |
| delegate ChangeBatchStream |
| } |
| |
| func (s *entryTransformStream) Send(cb watch.ChangeBatch) error { |
| // Copy and transform the ChangeBatch. |
| changes := cb.Changes |
| changesCp := make([]watch.Change, len(changes)) |
| cbCp := watch.ChangeBatch{changesCp} |
| for i, changeCp := range changes { |
| if changes[i].Value != nil { |
| entry := changes[i].Value.(*storage.Entry) |
| serviceEntry := makeServiceEntry(entry) |
| changeCp.Value = &serviceEntry |
| } |
| changesCp[i] = changeCp |
| } |
| return s.delegate.Send(cbCp) |
| } |
| |
| // WatchGlob returns a stream of changes that match a pattern. |
| func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error { |
| path := storage.ParsePath(o.name) |
| stream = &entryTransformStream{stream} |
| return o.server.watcher.WatchGlob(ctx, path, req, stream) |
| } |
| |
| // WatchQuery returns a stream of changes that satisfy a query. |
| func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error { |
| path := storage.ParsePath(o.name) |
| stream = &entryTransformStream{stream} |
| return o.server.watcher.WatchQuery(ctx, path, req, stream) |
| } |