blob: 1a5e503aad6bee252aa62152492114236caa97ca [file] [log] [blame]
package testing
import (
"io"
"runtime"
"sync"
"testing"
"time"
"veyron/services/store/raw"
"veyron2/ipc"
"veyron2/naming"
"veyron2/security"
"veyron2/services/store"
"veyron2/services/watch"
"veyron2/storage"
)
// CancellableContext implements ipc.ServerContext.
// It carries a single identity, which is reported as both the local and the
// remote ID, and a channel that is closed when the context is cancelled.
// Use NewCancellableContext to create one so that the channel is initialized.
type CancellableContext struct {
// id is returned by both LocalID and RemoteID.
id security.PublicID
// mu serializes Cancel so that cancelled is closed at most once.
mu sync.Mutex
// cancelled is closed by Cancel; IsClosed and Closed observe it.
cancelled chan struct{}
}
// NewCancellableContext returns a context for the given identity that has
// not been cancelled yet.
func NewCancellableContext(id security.PublicID) *CancellableContext {
	ctx := new(CancellableContext)
	ctx.id = id
	ctx.cancelled = make(chan struct{})
	return ctx
}
// Server implements ipc.ServerContext. It always returns nil.
func (*CancellableContext) Server() ipc.Server {
return nil
}
// Method implements ipc.ServerContext. It always returns the empty string.
func (*CancellableContext) Method() string {
return ""
}
// Name implements ipc.ServerContext. It always returns the empty string.
func (*CancellableContext) Name() string {
return ""
}
// Suffix implements ipc.ServerContext. It always returns the empty string.
func (*CancellableContext) Suffix() string {
return ""
}
// Label implements ipc.ServerContext. It always returns the zero value of
// security.Label.
func (*CancellableContext) Label() (l security.Label) {
	return l
}
// CaveatDischarges implements ipc.ServerContext. It always returns nil.
func (*CancellableContext) CaveatDischarges() security.CaveatDischargeMap {
return nil
}
// LocalID implements ipc.ServerContext. It returns the identity the context
// was created with, the same value that RemoteID returns.
func (ctx *CancellableContext) LocalID() security.PublicID {
return ctx.id
}
// RemoteID implements ipc.ServerContext. It returns the identity the context
// was created with, the same value that LocalID returns.
func (ctx *CancellableContext) RemoteID() security.PublicID {
return ctx.id
}
// Blessing implements ipc.ServerContext. It always returns nil.
func (*CancellableContext) Blessing() security.PublicID {
return nil
}
// LocalEndpoint implements ipc.ServerContext. It always returns nil.
func (*CancellableContext) LocalEndpoint() naming.Endpoint {
return nil
}
// RemoteEndpoint implements ipc.ServerContext. It always returns nil.
func (*CancellableContext) RemoteEndpoint() naming.Endpoint {
return nil
}
// Deadline implements ipc.ServerContext. It always returns the zero
// time.Time.
func (*CancellableContext) Deadline() (t time.Time) {
	return t
}
// IsClosed reports whether the context has been cancelled. It never blocks:
// it polls the cancelled channel, which is only ever closed, never sent on.
func (ctx *CancellableContext) IsClosed() bool {
	closed := false
	select {
	case <-ctx.cancelled:
		closed = true
	default:
	}
	return closed
}
// Cancel synchronously closes the context. After Cancel returns, calls to
// IsClosed will return true and the channel returned by Closed will be closed.
// Calling Cancel more than once is safe; only the first call closes the
// channel.
func (ctx *CancellableContext) Cancel() {
	ctx.mu.Lock()
	defer ctx.mu.Unlock()
	select {
	case <-ctx.cancelled:
		// Already cancelled; closing the channel again would panic.
	default:
		close(ctx.cancelled)
	}
}
// Closed returns a receive-only view of the channel that Cancel closes.
func (ctx *CancellableContext) Closed() <-chan struct{} {
return ctx.cancelled
}
// Utilities for PutMutations.
// storeServicePutMutationsStream implements raw.StoreServicePutMutationsStream
// It is the server-side (receiving) end of a PutMutations call and reads
// mutations from a channel.
type storeServicePutMutationsStream struct {
// mus is the channel mutations are received from.
mus <-chan raw.Mutation
// value holds the result of the most recent receive performed by Advance.
value raw.Mutation
}
// Advance blocks until the next mutation arrives or the channel is closed.
// It stores the received mutation (the zero value once the channel is closed)
// for Value and reports whether a mutation was received.
func (s *storeServicePutMutationsStream) Advance() bool {
	next, more := <-s.mus
	s.value = next
	return more
}
// Value returns the mutation stored by the most recent call to Advance.
func (s *storeServicePutMutationsStream) Value() raw.Mutation {
return s.value
}
// Err always returns nil.
func (s *storeServicePutMutationsStream) Err() error {
return nil
}
// storePutMutationsStream implements raw.StorePutMutationsStream
// It is the client-side (sending) end of a PutMutations call.
type storePutMutationsStream struct {
// ctx is the context handed to the server function; Cancel requires it to
// be a *CancellableContext.
ctx ipc.ServerContext
// closed records whether mus has already been closed by CloseSend.
// It is not guarded by a lock.
closed bool
// mus is the channel mutations are sent on.
mus chan<- raw.Mutation
// err is the channel Finish reads the final result from.
err <-chan error
}
// Send delivers mu on the mus channel and always returns nil. The channel
// created by PutMutations is unbuffered, so Send blocks until the receiving
// side takes the mutation.
// NOTE(review): calling Send after CloseSend, Finish or Cancel sends on a
// closed channel, which panics — confirm callers never do this.
func (s *storePutMutationsStream) Send(mu raw.Mutation) error {
s.mus <- mu
return nil
}
// CloseSend closes the mutation channel so that the receiving side sees the
// end of the stream. Repeated calls are no-ops. It always returns nil.
func (s *storePutMutationsStream) CloseSend() error {
	if s.closed {
		return nil
	}
	s.closed = true
	close(s.mus)
	return nil
}
// Finish closes the sending side and then blocks until a value can be
// received from the err channel, returning that value.
func (s *storePutMutationsStream) Finish() error {
	s.CloseSend()
	result := <-s.err
	return result
}
// Cancel cancels the call's context and then closes the sending side.
// It panics if ctx is not a *CancellableContext.
func (s *storePutMutationsStream) Cancel() {
s.ctx.(*CancellableContext).Cancel()
s.CloseSend()
}
// PutMutations invokes putMutationsFn in a new goroutine and returns the
// client side of the stream feeding it.
//
// putMutationsFn receives a fresh CancellableContext for id and a server-side
// stream that yields every mutation the caller passes to Send. The value
// returned by putMutationsFn is what the returned stream's Finish reports.
func PutMutations(id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error) raw.StorePutMutationsStream {
	ctx := NewCancellableContext(id)
	mus := make(chan raw.Mutation)
	// The result channel is buffered so that the goroutine can deliver its
	// result and exit even if the caller never calls Finish (for example
	// after Cancel). With an unbuffered channel it would block forever.
	errc := make(chan error, 1)
	go func() {
		errc <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus: mus})
		close(errc)
	}()
	return &storePutMutationsStream{
		ctx: ctx,
		mus: mus,
		err: errc,
	}
}
// PutMutationsBatch sends every mutation in mus through putMutationsFn and
// waits for the call to finish. Any failure is reported on t with t.Errorf,
// prefixed with the file and line of the caller of PutMutationsBatch.
func PutMutationsBatch(t *testing.T, id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error, mus []raw.Mutation) {
	stream := PutMutations(id, putMutationsFn)
	var sendErr error
	for _, mu := range mus {
		// Stop sending on the first failure, but still call Finish below so
		// the stream is closed and the server function's result is collected.
		if sendErr = stream.Send(mu); sendErr != nil {
			break
		}
	}
	err := stream.Finish()
	if err == nil {
		err = sendErr
	}
	if err != nil {
		_, file, line, _ := runtime.Caller(1)
		// %v rather than %s: mus is a slice of structs, not strings.
		t.Errorf("%s(%d): can't put mutations %v: %v", file, line, mus, err)
	}
}
// Utilities for Watch.
// watcherServiceWatchStream implements watch.WatcherServiceWatchStream.
// It is the server-side (sending) end of a watch call.
type watcherServiceWatchStream struct {
// mu is held for the duration of every Send.
mu *sync.Mutex
// ctx is consulted by Send to stop waiting once the call is closed.
ctx ipc.ServerContext
// output is the channel change batches are sent on.
output chan<- watch.ChangeBatch
}
// Send delivers cb to the client side of the stream. It blocks until the
// batch is accepted or the context is closed, and returns io.EOF in the
// latter case. Sends are serialized by s.mu.
func (s *watcherServiceWatchStream) Send(cb watch.ChangeBatch) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Check for cancellation before touching the output channel. watchImpl
	// cancels the context and closes the output channel while holding s.mu,
	// so a Send that runs afterwards must not reach the send case below: a
	// select may pick a send on a closed channel, which panics. Because s.mu
	// is held here, the channel cannot be closed once this check has passed.
	select {
	case <-s.ctx.Closed():
		return io.EOF
	default:
	}
	select {
	case s.output <- cb:
		return nil
	case <-s.ctx.Closed():
		return io.EOF
	}
}
// watcherWatchStream implements watch.WatcherWatchStream.
// It is the client-side (receiving) end of a watch call.
type watcherWatchStream struct {
// ctx is the context of the call; Cancel cancels it.
ctx *CancellableContext
// value holds the result of the most recent receive performed by Advance.
value watch.ChangeBatch
// input is the channel change batches are received from.
input <-chan watch.ChangeBatch
// err is the channel Finish reads the final result from.
err <-chan error
}
// Advance blocks until the next change batch arrives or the input channel is
// closed. It stores the received batch (the zero value once the channel is
// closed) for Value and reports whether a batch was received.
func (s *watcherWatchStream) Advance() bool {
	batch, more := <-s.input
	s.value = batch
	return more
}
// Value returns the change batch stored by the most recent call to Advance.
func (s *watcherWatchStream) Value() watch.ChangeBatch {
return s.value
}
// Err always returns nil.
func (*watcherWatchStream) Err() error {
return nil
}
// Finish waits for the watch call to end and returns the value received from
// the err channel.
//
// It discards every batch still arriving on the input channel until that
// channel is closed. Reading only a single batch is not enough: the server
// side could stay blocked in Send on a full channel, so its result would
// never be produced and Finish would wait forever.
func (s *watcherWatchStream) Finish() error {
	for {
		if _, more := <-s.input; !more {
			break
		}
	}
	return <-s.err
}
// Cancel cancels the context of the watch call.
func (s *watcherWatchStream) Cancel() {
s.ctx.Cancel()
}
// watchImpl runs watchFn in a new goroutine and returns the client side of
// the resulting watch stream. watchFn is given a fresh CancellableContext for
// id and a watcherServiceWatchStream whose output channel is the returned
// stream's input channel. The value returned by watchFn is delivered on the
// returned stream's err channel.
func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
// mu is shared with the server-side stream, whose Send holds it while
// sending.
mu := &sync.Mutex{}
ctx := NewCancellableContext(id)
// Both channels have room for one element.
c := make(chan watch.ChangeBatch, 1)
errc := make(chan error, 1)
go func() {
stream := &watcherServiceWatchStream{
mu: mu,
ctx: ctx,
output: c,
}
err := watchFn(ctx, stream)
// Taking mu ensures no Send is in progress while c is being closed.
mu.Lock()
defer mu.Unlock()
// The context is cancelled before c is closed, so a Send that runs
// afterwards can see the closed context.
ctx.Cancel()
close(c)
// errc is buffered, so this send does not block even if nobody ever
// reads the result.
errc <- err
close(errc)
}()
return &watcherWatchStream{
ctx: ctx,
input: c,
err: errc,
}
}
// WatchRaw starts watchFn with the given raw request via watchImpl and
// returns the client side of the watch stream.
func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
	invoke := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
		return watchFn(ctx, req, stream)
	}
	return watchImpl(id, invoke)
}
// WatchGlob starts watchFn with the given glob request via watchImpl and
// returns the client side of the watch stream.
func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
	invoke := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
		return watchFn(ctx, req, stream)
	}
	return watchImpl(id, invoke)
}
// WatchGlobOnPath starts watchFn with the given path and glob request via
// watchImpl and returns the client side of the watch stream.
func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
	invoke := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
		return watchFn(ctx, path, req, stream)
	}
	return watchImpl(id, invoke)
}
// ExpectInitialStateSkipped fails the test unless change has an empty name,
// the InitialStateSkipped state and no resume marker. The checks run in that
// order and the first failing one aborts the test.
func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
	switch {
	case change.Name != "":
		t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
	case change.State != watch.InitialStateSkipped:
		t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
	case len(change.ResumeMarker) != 0:
		t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
	}
}
// ExpectEntryExists fails the test unless changes contains a change for name
// that is in the Exists state and carries a *storage.Entry with the given ID
// and value.
func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
	found := findEntry(t, changes, name)
	if found.State != watch.Exists {
		t.Fatalf("Expected name to exist: %v", name)
	}
	entry, isEntry := found.Value.(*storage.Entry)
	if !isEntry {
		t.Fatal("Expected an Entry")
	}
	if gotID := entry.Stat.ID; gotID != id {
		t.Fatalf("Expected ID to be %v, but was: %v", id, gotID)
	}
	if gotValue := entry.Value; gotValue != value {
		t.Fatalf("Expected Value to be %v, but was: %v", value, gotValue)
	}
}
// ExpectEntryDoesNotExist fails the test unless changes contains a change for
// name that is in the DoesNotExist state and has a nil value.
func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
	found := findEntry(t, changes, name)
	switch {
	case found.State != watch.DoesNotExist:
		t.Fatalf("Expected name to not exist: %v", name)
	case found.Value != nil:
		t.Fatal("Expected entry to be nil")
	}
}
// ExpectServiceEntryExists fails the test unless changes contains a change
// for name that is in the Exists state and carries a *store.Entry with the
// given ID and value.
func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
	found := findEntry(t, changes, name)
	if found.State != watch.Exists {
		t.Fatalf("Expected name to exist: %v", name)
	}
	entry, isEntry := found.Value.(*store.Entry)
	if !isEntry {
		t.Fatal("Expected a service Entry")
	}
	if gotID := entry.Stat.ID; gotID != id {
		t.Fatalf("Expected ID to be %v, but was: %v", id, gotID)
	}
	if gotValue := entry.Value; gotValue != value {
		t.Fatalf("Expected Value to be %v, but was: %v", value, gotValue)
	}
}
// ExpectServiceEntryDoesNotExist fails the test unless changes contains a
// change for name that is in the DoesNotExist state and has a nil value.
func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
	found := findEntry(t, changes, name)
	switch {
	case found.State != watch.DoesNotExist:
		t.Fatalf("Expected name to not exist: %v", name)
	case found.Value != nil:
		t.Fatal("Expected entry to be nil")
	}
}
// findEntry returns the first change whose Name equals name. If there is
// none it fails the test; the trailing panic only satisfies the compiler's
// requirement that the function end in a terminating statement.
func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
	for i := range changes {
		if changes[i].Name != name {
			continue
		}
		return changes[i]
	}
	t.Fatalf("Expected a change for name: %v", name)
	panic("Should not reach here")
}
var (
// EmptyDir is an empty, non-nil slice of directory entries.
EmptyDir = []storage.DEntry{}
)
// DirOf returns a directory consisting of exactly one entry with the given
// name and ID.
func DirOf(name string, id storage.ID) []storage.DEntry {
	entry := storage.DEntry{Name: name, ID: id}
	return []storage.DEntry{entry}
}
// ExpectMutationExists fails the test unless changes contains a mutation for
// id that is in the Exists state and whose prior version, version, root flag,
// value and directory match the arguments. The checks run in that order and
// the first failing one aborts the test.
func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
	found := findMutation(t, changes, id)
	if found.State != watch.Exists {
		t.Fatalf("Expected id to exist: %v", id)
	}
	m := found.Value.(*raw.Mutation)
	switch {
	case m.PriorVersion != pre:
		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, m.PriorVersion)
	case m.Version != post:
		t.Fatalf("Expected Version to be %v, but was: %v", post, m.Version)
	case m.IsRoot != isRoot:
		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, m.IsRoot)
	case m.Value != value:
		t.Fatalf("Expected Value to be: %v, but was: %v", value, m.Value)
	}
	expectDirEquals(t, m.Dir, dir)
}
// ExpectMutationDoesNotExist fails the test unless changes contains a
// mutation for id that is in the DoesNotExist state, has the given prior
// version and root flag, has version storage.NoVersion, and has a nil value
// and a nil directory. The first failing check aborts the test.
func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
	found := findMutation(t, changes, id)
	if found.State != watch.DoesNotExist {
		t.Fatalf("Expected id to not exist: %v", id)
	}
	m := found.Value.(*raw.Mutation)
	switch {
	case m.PriorVersion != pre:
		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, m.PriorVersion)
	case m.Version != storage.NoVersion:
		t.Fatalf("Expected Version to be NoVersion, but was: %v", m.Version)
	case m.IsRoot != isRoot:
		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, m.IsRoot)
	case m.Value != nil:
		t.Fatal("Expected Value to be nil")
	case m.Dir != nil:
		t.Fatal("Expected Dir to be nil")
	}
}
// ExpectMutationExistsNoVersionCheck fails the test unless changes contains a
// mutation for id that is in the Exists state and has the given value.
// Versions, the root flag and the directory are not examined.
func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
	found := findMutation(t, changes, id)
	if found.State != watch.Exists {
		t.Fatalf("Expected id to exist: %v", id)
	}
	if m := found.Value.(*raw.Mutation); m.Value != value {
		t.Fatalf("Expected Value to be: %v, but was: %v", value, m.Value)
	}
}
// ExpectMutationDoesNotExistNoVersionCheck fails the test unless changes
// contains a mutation for id that is in the DoesNotExist state. No other
// field of the mutation is examined.
func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
	if found := findMutation(t, changes, id); found.State != watch.DoesNotExist {
		t.Fatalf("Expected id to not exist: %v", id)
	}
}
// findMutation returns the first change whose value is a *raw.Mutation with
// the given ID. It fails the test if it meets a change whose value is not a
// *raw.Mutation before finding a match, or if no change matches. The trailing
// panic only satisfies the compiler's requirement that the function end in a
// terminating statement.
func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
	for i := range changes {
		m, isMutation := changes[i].Value.(*raw.Mutation)
		if !isMutation {
			t.Fatal("Expected a Mutation")
		}
		if m.ID == id {
			return changes[i]
		}
	}
	t.Fatalf("Expected a change for id: %v", id)
	panic("Should not reach here")
}
// expectDirEquals fails the test unless actual and expected have the same
// length and equal entries at every index.
func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
	if want, got := len(expected), len(actual); got != want {
		t.Fatalf("Expected Dir to have %v refs, but had %v", want, got)
	}
	for i := range expected {
		if actual[i] != expected[i] {
			t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, expected[i], actual[i])
		}
	}
}