blob: 45b492e938660a830e69106dce9a170325bfb2f2 [file] [log] [blame]
package memstore
import (
"io"
"net"
"runtime"
"testing"
"time"
"veyron/services/store/raw"
"veyron2/ipc"
"veyron2/security"
"veyron2/storage"
)
// cancellableContext implements ipc.Context.
// It is a minimal test double: apart from the cancellation methods
// (IsClosed, Closed, cancel), its methods return fixed values.
type cancellableContext struct {
// cancelled is closed by cancel to signal that the context is closed.
cancelled chan struct{}
}
// newCancellableContext returns a context that has not been cancelled yet;
// its cancellation channel is freshly allocated and open.
func newCancellableContext() *cancellableContext {
	ctx := new(cancellableContext)
	ctx.cancelled = make(chan struct{})
	return ctx
}
// Server always returns nil in this test context.
func (*cancellableContext) Server() ipc.Server {
return nil
}
// Method always returns the empty string in this test context.
func (*cancellableContext) Method() string {
return ""
}
// Name always returns the empty string in this test context.
func (*cancellableContext) Name() string {
return ""
}
// Suffix always returns the empty string in this test context.
func (*cancellableContext) Suffix() string {
return ""
}
// Label always returns the zero value of security.Label.
func (*cancellableContext) Label() security.Label {
	var zero security.Label
	return zero
}
// CaveatDischarges always returns nil in this test context.
func (*cancellableContext) CaveatDischarges() security.CaveatDischargeMap {
return nil
}
// LocalID always returns the package-level rootPublicID.
func (*cancellableContext) LocalID() security.PublicID {
return rootPublicID
}
// RemoteID always returns the package-level rootPublicID, the same identity
// that LocalID reports.
func (*cancellableContext) RemoteID() security.PublicID {
return rootPublicID
}
// LocalAddr always returns nil in this test context.
func (*cancellableContext) LocalAddr() net.Addr {
return nil
}
// RemoteAddr always returns nil in this test context.
func (*cancellableContext) RemoteAddr() net.Addr {
return nil
}
// Deadline always returns the zero time.Time.
func (*cancellableContext) Deadline() time.Time {
	return time.Time{}
}
// IsClosed reports whether the cancellation channel has been closed. It
// never blocks: the receive is attempted with a default case.
func (ctx *cancellableContext) IsClosed() bool {
	closed := false
	select {
	case <-ctx.cancelled:
		closed = true
	default:
		// Channel still open; leave closed as false.
	}
	return closed
}
// Closed returns the receive-only view of the cancellation channel, which
// is closed by cancel.
func (ctx *cancellableContext) Closed() <-chan struct{} {
return ctx.cancelled
}
// cancel synchronously closes the context. After cancel returns, calls to
// IsClosed will return true and the channel returned by Closed will be closed.
//
// Calling cancel again after it has returned is a no-op, so a caller such as
// clientStream.Cancel may be invoked more than once without panicking on a
// double close. The check-then-close is not atomic, so cancel must not be
// called concurrently from multiple goroutines.
func (ctx *cancellableContext) cancel() {
	select {
	case <-ctx.cancelled:
		// Already cancelled; closing a closed channel would panic.
	default:
		close(ctx.cancelled)
	}
}
// serverStream implements raw.StoreServicePutMutationsStream
// by reading mutations from a channel.
type serverStream struct {
// mus is the receive side of the mutation channel; Recv reports io.EOF
// once it has been closed.
mus <-chan raw.Mutation
}
// Recv blocks until a mutation is available on the channel and returns it
// with a nil error. Once the channel has been closed, it returns the zero
// raw.Mutation together with io.EOF.
func (s *serverStream) Recv() (raw.Mutation, error) {
	next, open := <-s.mus
	if open {
		return next, nil
	}
	return next, io.EOF
}
// clientStream implements raw.StorePutMutationsStream
// on top of a pair of channels.
type clientStream struct {
// ctx is the call context; Cancel asserts it to *cancellableContext.
ctx ipc.Context
// closed records that CloseSend has already closed mus, so that mus is
// closed at most once.
closed bool
// mus is the send side of the mutation channel written by Send.
mus chan<- raw.Mutation
// err is the channel from which Finish receives the final result.
err <-chan error
}
// Send writes mu to the mutation channel and always returns nil.
// NOTE(review): the channel send has no timeout or cancellation path, so it
// appears this can block indefinitely if the receiving side stops reading
// (and it panics if called after CloseSend, since mus is then closed) —
// confirm callers never hit either case.
func (s *clientStream) Send(mu raw.Mutation) error {
s.mus <- mu
return nil
}
// CloseSend closes the mutation channel the first time it is called and is
// a no-op on every later call. It always returns nil.
func (s *clientStream) CloseSend() error {
	if s.closed {
		return nil
	}
	s.closed = true
	close(s.mus)
	return nil
}
// Finish closes the sending side (if it is not closed already) and then
// blocks until a value can be received from the error channel, returning
// that value.
func (s *clientStream) Finish() error {
	s.CloseSend()
	result := <-s.err
	return result
}
// Cancel cancels the stream's context and then closes the sending side.
// The context must be a *cancellableContext; any other type makes the type
// assertion panic.
func (s *clientStream) Cancel() {
	cctx := s.ctx.(*cancellableContext)
	cctx.cancel()
	s.CloseSend()
}
// putMutations starts st.PutMutations in a new goroutine and returns the
// client end of the stream that feeds it.
//
// Mutations sent on the returned stream are delivered to PutMutations through
// an unbuffered channel. The result of PutMutations is delivered on the
// stream's error channel, which is closed afterwards.
func putMutations(st *Store) raw.StorePutMutationsStream {
	ctx := newCancellableContext()
	mus := make(chan raw.Mutation)
	// Capacity 1 lets the goroutine below deposit its result and exit even if
	// the client never calls Finish (for example after Cancel); with an
	// unbuffered channel the goroutine would stay blocked on the send forever.
	err := make(chan error, 1)
	go func() {
		err <- st.PutMutations(ctx, &serverStream{mus})
		close(err)
	}()
	return &clientStream{
		ctx: ctx,
		mus: mus,
		err: err,
	}
}
// putMutationsBatch opens a PutMutations stream on st, sends every mutation
// in mus in order, and finishes the stream. If Finish returns an error, the
// failure is reported on t, tagged with the caller's file and line.
func putMutationsBatch(t *testing.T, st *Store, mus []raw.Mutation) {
	stream := putMutations(st)
	for i := range mus {
		stream.Send(mus[i])
	}
	err := stream.Finish()
	if err == nil {
		return
	}
	_, file, line, _ := runtime.Caller(1)
	t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
}
// mkdir stores a new, empty *Dir at path within transaction tr and returns
// the ID reported by the store together with the Dir value that was stored.
//
// A Put error, or a nil stat result, is reported on t with the caller's file
// and line. When no stat is available the zero storage.ID is returned rather
// than dereferencing a nil pointer, so the test fails with the reported
// error instead of panicking.
func mkdir(t *testing.T, st *Store, tr storage.Transaction, path string) (storage.ID, interface{}) {
	_, file, line, _ := runtime.Caller(1)
	dir := &Dir{}
	stat, err := st.Bind(path).Put(rootPublicID, tr, dir)
	if err != nil || stat == nil {
		// %v rather than %s: err may be nil here when only stat is missing.
		t.Errorf("%s(%d): mkdir %s: %v", file, line, path, err)
	}
	if stat == nil {
		return storage.ID{}, dir
	}
	return stat.ID, dir
}
// get fetches the entry at path within transaction tr and returns its
// Value. If the lookup fails, the test is aborted via t.Fatalf with a
// message tagged with the caller's file and line.
func get(t *testing.T, st *Store, tr storage.Transaction, path string) interface{} {
	entry, err := st.Bind(path).Get(rootPublicID, tr)
	if err == nil {
		return entry.Value
	}
	_, file, line, _ := runtime.Caller(1)
	t.Fatalf("%s(%d): can't get %s: %s", file, line, path, err)
	return nil // not reached: Fatalf stops the calling goroutine
}
// put stores v at path within transaction tr and then reads the path back.
// A failure of either step is reported on t, tagged with the caller's file
// and line. It returns the ID from the Put result, or the zero storage.ID
// when Put produced no stat.
func put(t *testing.T, st *Store, tr storage.Transaction, path string, v interface{}) storage.ID {
	_, file, line, _ := runtime.Caller(1)

	stat, putErr := st.Bind(path).Put(rootPublicID, tr, v)
	if putErr != nil {
		t.Errorf("%s(%d): can't put %s: %s", file, line, path, putErr)
	}

	// Read the value back to confirm it is visible in the transaction.
	_, getErr := st.Bind(path).Get(rootPublicID, tr)
	if getErr != nil {
		t.Errorf("%s(%d): can't get %s: %s", file, line, path, getErr)
	}

	var id storage.ID
	if stat != nil {
		id = stat.ID
	}
	return id
}
// remove deletes the value at path within transaction tr. A failure is
// reported on t, tagged with the caller's file and line.
func remove(t *testing.T, st *Store, tr storage.Transaction, path string) {
	err := st.Bind(path).Remove(rootPublicID, tr)
	if err == nil {
		return
	}
	_, file, line, _ := runtime.Caller(1)
	t.Errorf("%s(%d): can't remove %s: %s", file, line, path, err)
}
// commit commits transaction tr. If the commit fails, the test is aborted
// via t.Fatalf with a message tagged with the caller's file and line.
func commit(t *testing.T, tr storage.Transaction) {
	err := tr.Commit()
	if err == nil {
		return
	}
	_, file, line, _ := runtime.Caller(1)
	t.Fatalf("%s(%d): Transaction aborted: %s", file, line, err)
}
// expectExists reports a test error, tagged with the caller's file and
// line, when Exists says that path is not present within transaction tr.
//
// If Exists also returned an error, that error is appended to the failure
// message so the cause is not lost. An error alongside a positive result
// does not fail the test.
func expectExists(t *testing.T, st *Store, tr storage.Transaction, path string) {
	_, file, line, _ := runtime.Caller(1)
	ok, err := st.Bind(path).Exists(rootPublicID, tr)
	switch {
	case ok:
		return
	case err != nil:
		t.Errorf("%s(%d): does not exist: %s: %s", file, line, path, err)
	default:
		t.Errorf("%s(%d): does not exist: %s", file, line, path)
	}
}
// expectNotExists reports a test error, tagged with the caller's file and
// line, when a Get of path within transaction tr succeeds. The message
// includes the value that was unexpectedly found.
func expectNotExists(t *testing.T, st *Store, tr storage.Transaction, path string) {
	found, err := st.Bind(path).Get(rootPublicID, tr)
	if err != nil {
		// The lookup failed, which is the expected outcome.
		return
	}
	_, file, line, _ := runtime.Caller(1)
	t.Errorf("%s(%d): should not exist: %s: got %+v", file, line, path, found.Value)
}
// expectValue checks that the entry at path within transaction tr holds v.
// A failed lookup or a mismatching value is reported on t, tagged with the
// caller's file and line.
//
// The comparison uses != on interface values, so v and the stored value
// must have comparable dynamic types; pointers compare by identity.
func expectValue(t *testing.T, st *Store, tr storage.Transaction, path string, v interface{}) {
	_, file, line, _ := runtime.Caller(1)
	e, err := st.Bind(path).Get(rootPublicID, tr)
	if err != nil {
		t.Errorf("%s(%d): does not exist: %s", file, line, path)
		return
	}
	if e.Value != v {
		// v is what the caller expects; e.Value is what the store returned.
		t.Errorf("%s(%d): expected %+v, got %+v", file, line, v, e.Value)
	}
}