blob: 8c6019023bbb0b815a9fde1fdae343914c14d214 [file] [log] [blame]
package testing
import (
"io"
"runtime"
"sync"
"testing"
"veyron/services/store/raw"
"veyron2/context"
"veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
"veyron2/security"
"veyron2/services/watch"
"veyron2/services/watch/types"
"veyron2/storage"
)
// FakeServerContext implements ipc.ServerContext.
type FakeServerContext struct {
context.T
cancel context.CancelFunc
id security.PublicID
}
func NewFakeServerContext(id security.PublicID) *FakeServerContext {
ctx, cancel := rt.R().NewContext().WithCancel()
return &FakeServerContext{
T: ctx,
cancel: cancel,
id: id,
}
}
func (*FakeServerContext) Server() ipc.Server { return nil }
func (*FakeServerContext) Method() string { return "" }
func (*FakeServerContext) Name() string { return "" }
func (*FakeServerContext) Suffix() string { return "" }
func (*FakeServerContext) Label() (l security.Label) { return }
func (*FakeServerContext) CaveatDischarges() security.CaveatDischargeMap { return nil }
func (ctx *FakeServerContext) LocalID() security.PublicID { return ctx.id }
func (ctx *FakeServerContext) RemoteID() security.PublicID { return ctx.id }
func (*FakeServerContext) Blessing() security.PublicID { return nil }
func (*FakeServerContext) LocalEndpoint() naming.Endpoint { return nil }
func (*FakeServerContext) RemoteEndpoint() naming.Endpoint { return nil }
func (ctx *FakeServerContext) Cancel() { ctx.cancel() }
// Utilities for PutMutations.
// storeServicePutMutationsStream implements raw.StoreServicePutMutationsStream
type storeServicePutMutationsStream struct {
mus <-chan raw.Mutation
value raw.Mutation
}
func (s *storeServicePutMutationsStream) RecvStream() interface {
Advance() bool
Value() raw.Mutation
Err() error
} {
return s
}
func (s *storeServicePutMutationsStream) Advance() bool {
var ok bool
s.value, ok = <-s.mus
return ok
}
func (s *storeServicePutMutationsStream) Value() raw.Mutation {
return s.value
}
func (s *storeServicePutMutationsStream) Err() error {
return nil
}
// storePutMutationsStream implements raw.StorePutMutationsStream
type storePutMutationsStream struct {
closed bool
mus chan<- raw.Mutation
}
func (s *storePutMutationsStream) Send(mu raw.Mutation) error {
s.mus <- mu
return nil
}
func (s *storePutMutationsStream) Close() error {
if !s.closed {
s.closed = true
close(s.mus)
}
return nil
}
type storePutMutationsCall struct {
ctx ipc.ServerContext
stream storePutMutationsStream
err <-chan error
}
func (s *storePutMutationsCall) SendStream() interface {
Send(mu raw.Mutation) error
Close() error
} {
return &s.stream
}
func (s *storePutMutationsCall) Finish() error {
s.stream.Close()
return <-s.err
}
func (s *storePutMutationsCall) Cancel() {
s.ctx.(*FakeServerContext).Cancel()
s.stream.Close()
}
func PutMutations(id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error) raw.StorePutMutationsCall {
ctx := NewFakeServerContext(id)
mus := make(chan raw.Mutation)
err := make(chan error)
go func() {
err <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus: mus})
close(err)
}()
return &storePutMutationsCall{
ctx: ctx,
err: err,
stream: storePutMutationsStream{
mus: mus,
},
}
}
func PutMutationsBatch(t *testing.T, id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error, mus []raw.Mutation) {
storePutMutationsStream := PutMutations(id, putMutationsFn)
for _, mu := range mus {
storePutMutationsStream.SendStream().Send(mu)
}
if err := storePutMutationsStream.Finish(); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
}
}
// Utilities for Watch.
// watcherServiceWatchStreamSender implements watch.WatcherServiceWatchStreamSender
type watcherServiceWatchStreamSender struct {
mu *sync.Mutex
ctx ipc.ServerContext
output chan<- types.ChangeBatch
}
func (s *watcherServiceWatchStreamSender) Send(cb types.ChangeBatch) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case s.output <- cb:
return nil
case <-s.ctx.Done():
return io.EOF
}
}
// watcherServiceWatchStream implements watch.WatcherServiceWatchStream
type watcherServiceWatchStream struct {
watcherServiceWatchStreamSender
}
func (s *watcherServiceWatchStream) SendStream() interface {
Send(cb types.ChangeBatch) error
} {
return s
}
func (*watcherServiceWatchStream) Cancel() {}
// watcherWatchStream implements watch.WatcherWatchStream.
type watcherWatchStream struct {
ctx *FakeServerContext
value types.ChangeBatch
input <-chan types.ChangeBatch
err <-chan error
}
func (s *watcherWatchStream) Advance() bool {
var ok bool
s.value, ok = <-s.input
return ok
}
func (s *watcherWatchStream) Value() types.ChangeBatch {
return s.value
}
func (*watcherWatchStream) Err() error {
return nil
}
func (s *watcherWatchStream) Finish() error {
<-s.input
return <-s.err
}
func (s *watcherWatchStream) Cancel() {
s.ctx.Cancel()
}
func (s *watcherWatchStream) RecvStream() interface {
Advance() bool
Value() types.ChangeBatch
Err() error
} {
return s
}
func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
mu := &sync.Mutex{}
ctx := NewFakeServerContext(id)
c := make(chan types.ChangeBatch, 1)
errc := make(chan error, 1)
go func() {
stream := &watcherServiceWatchStream{
watcherServiceWatchStreamSender{
mu: mu,
ctx: ctx,
output: c,
},
}
err := watchFn(ctx, stream)
mu.Lock()
defer mu.Unlock()
ctx.Cancel()
close(c)
errc <- err
close(errc)
}()
return &watcherWatchStream{
ctx: ctx,
input: c,
err: errc,
}
}
func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchCall {
return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
return watchFn(ctx, req, stream)
})
}
func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
return watchImpl(id, func(ctx ipc.ServerContext, iterator *watcherServiceWatchStream) error {
return watchFn(ctx, req, iterator)
})
}
func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
return watchFn(ctx, path, req, stream)
})
}
func ExpectInitialStateSkipped(t *testing.T, change types.Change) {
if change.Name != "" {
t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
}
if change.State != types.InitialStateSkipped {
t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
}
if len(change.ResumeMarker) != 0 {
t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
}
}
func ExpectEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
change := findEntry(t, changes, name)
if change.State != types.Exists {
t.Fatalf("Expected name to exist: %v", name)
}
cv, ok := change.Value.(*storage.Entry)
if !ok {
t.Fatal("Expected an Entry")
}
if cv.Stat.ID != id {
t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
}
if cv.Value != value {
t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
}
}
func ExpectEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
change := findEntry(t, changes, name)
if change.State != types.DoesNotExist {
t.Fatalf("Expected name to not exist: %v", name)
}
if change.Value != nil {
t.Fatal("Expected entry to be nil")
}
}
func ExpectServiceEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
change := findEntry(t, changes, name)
if change.State != types.Exists {
t.Fatalf("Expected name to exist: %v", name)
}
cv, ok := change.Value.(*storage.Entry)
if !ok {
t.Fatal("Expected a service Entry")
}
if cv.Stat.ID != id {
t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
}
if cv.Value != value {
t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
}
}
func ExpectServiceEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
change := findEntry(t, changes, name)
if change.State != types.DoesNotExist {
t.Fatalf("Expected name to not exist: %v", name)
}
if change.Value != nil {
t.Fatal("Expected entry to be nil")
}
}
func findEntry(t *testing.T, changes []types.Change, name string) types.Change {
for _, change := range changes {
if change.Name == name {
return change
}
}
t.Fatalf("Expected a change for name: %v", name)
panic("Should not reach here")
}
var (
EmptyDir = []raw.DEntry{}
)
func DirOf(name string, id storage.ID) []raw.DEntry {
return []raw.DEntry{raw.DEntry{
Name: name,
ID: id,
}}
}
func ExpectMutationExists(t *testing.T, changes []types.Change, id storage.ID, pre, post raw.Version, isRoot bool, value string, dir []raw.DEntry) {
change := findMutation(t, changes, id)
if change.State != types.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
if cv.Version != post {
t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
}
if cv.IsRoot != isRoot {
t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
}
if cv.Value != value {
t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
}
expectDirEquals(t, cv.Dir, dir)
}
func ExpectMutationDoesNotExist(t *testing.T, changes []types.Change, id storage.ID, pre raw.Version, isRoot bool) {
change := findMutation(t, changes, id)
if change.State != types.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
if cv.Version != raw.NoVersion {
t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
}
if cv.IsRoot != isRoot {
t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
}
if cv.Value != nil {
t.Fatal("Expected Value to be nil")
}
if cv.Dir != nil {
t.Fatal("Expected Dir to be nil")
}
}
func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID, value string) {
change := findMutation(t, changes, id)
if change.State != types.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
if cv.Value != value {
t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
}
}
func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID) {
change := findMutation(t, changes, id)
if change.State != types.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
}
func findMutation(t *testing.T, changes []types.Change, id storage.ID) types.Change {
for _, change := range changes {
cv, ok := change.Value.(*raw.Mutation)
if !ok {
t.Fatal("Expected a Mutation")
}
if cv.ID == id {
return change
}
}
t.Fatalf("Expected a change for id: %v", id)
panic("Should not reach here")
}
func expectDirEquals(t *testing.T, actual, expected []raw.DEntry) {
if len(actual) != len(expected) {
t.Fatalf("Expected Dir to have %v refs, but had %v", len(expected), len(actual))
}
for i, e := range expected {
a := actual[i]
if a != e {
t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, e, a)
}
}
}