blob: 5f1087513c2e83a89548aeab07273e79d7fd2211 [file] [log] [blame]
package watch
import (
"io/ioutil"
"net"
"os"
"runtime"
"testing"
"time"
"veyron/services/store/memstore"
"veyron/services/store/raw"
"veyron/services/store/service"
"veyron2/ipc"
"veyron2/security"
"veyron2/services/watch"
"veyron2/storage"
)
var (
rootPublicID security.PublicID = security.FakePublicID("root")
rootCtx ipc.ServerContext = &rootContext{}
)
type rootContext struct{}
func (*rootContext) Server() ipc.Server {
return nil
}
func (*rootContext) Method() string {
return ""
}
func (*rootContext) Name() string {
return ""
}
func (*rootContext) Suffix() string {
return ""
}
func (*rootContext) Label() (l security.Label) {
return
}
func (*rootContext) CaveatDischarges() security.CaveatDischargeMap {
return nil
}
func (*rootContext) LocalID() security.PublicID {
return rootPublicID
}
func (*rootContext) RemoteID() security.PublicID {
return rootPublicID
}
func (*rootContext) LocalAddr() net.Addr {
return nil
}
func (*rootContext) RemoteAddr() net.Addr {
return nil
}
func (*rootContext) Deadline() (t time.Time) {
return
}
func (rootContext) IsClosed() bool {
return false
}
func (rootContext) Closed() <-chan struct{} {
return nil
}
type cancellableContext struct {
rootContext
cancelled chan struct{}
}
func newCancellableContext() *cancellableContext {
return &cancellableContext{cancelled: make(chan struct{})}
}
func (ctx *cancellableContext) IsClosed() bool {
select {
case <-ctx.cancelled:
return true
default:
return false
}
}
func (ctx *cancellableContext) Cancel() {
close(ctx.cancelled)
}
func (ctx *cancellableContext) Closed() <-chan struct{} {
return ctx.cancelled
}
func get(t *testing.T, st *memstore.Store, tr service.Transaction, path string) interface{} {
_, file, line, _ := runtime.Caller(1)
e, err := st.Bind(path).Get(rootPublicID, tr)
if err != nil {
t.Fatalf("%s(%d): can't get %s: %s", file, line, path, err)
}
return e.Value
}
func put(t *testing.T, st *memstore.Store, tr service.Transaction, path string, v interface{}) storage.ID {
_, file, line, _ := runtime.Caller(1)
stat, err := st.Bind(path).Put(rootPublicID, tr, v)
if err != nil {
t.Fatalf("%s(%d): can't put %s: %s", file, line, path, err)
}
if _, err := st.Bind(path).Get(rootPublicID, tr); err != nil {
t.Fatalf("%s(%d): can't get %s: %s", file, line, path, err)
}
if stat != nil {
return stat.ID
}
return storage.ID{}
}
func remove(t *testing.T, st *memstore.Store, tr service.Transaction, path string) {
if err := st.Bind(path).Remove(rootPublicID, tr); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s(%d): can't remove %s: %s", file, line, path, err)
}
}
func commit(t *testing.T, tr service.Transaction) {
if err := tr.Commit(); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s(%d): Transaction aborted: %s", file, line, err)
}
}
func gc(t *testing.T, st *memstore.Store) {
if err := st.GC(); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s(%d): can't gc: %s", file, line, err)
}
}
func createStore(t *testing.T) (string, *memstore.Store, func()) {
dbName, err := ioutil.TempDir(os.TempDir(), "vstore")
if err != nil {
t.Fatalf("ioutil.TempDir() failed: %v", err)
}
cleanup := func() {
os.RemoveAll(dbName)
}
st, err := memstore.New(rootPublicID, dbName)
if err != nil {
cleanup()
t.Fatalf("memstore.New() failed: %v", err)
}
return dbName, st, cleanup
}
func openStore(t *testing.T, dbName string) (*memstore.Store, func()) {
st, err := memstore.New(rootPublicID, dbName)
if err != nil {
t.Fatalf("memstore.New() failed: %v", err)
}
return st, func() {
os.RemoveAll(dbName)
}
}
func openLog(t *testing.T, dbName string) (*memstore.RLog, func(), reqProcessor) {
log, err := memstore.OpenLog(dbName, true)
if err != nil {
t.Fatalf("openLog() failed: %v", err)
}
cleanup := func() {
log.Close()
}
processor, err := newSyncProcessor(rootPublicID)
if err != nil {
cleanup()
t.Fatalf("newSyncProcessor() failed: %v", err)
}
return log, cleanup, processor
}
func createWatcher(t *testing.T, dbName string) (service.Watcher, func()) {
w, err := New(rootPublicID, dbName)
if err != nil {
t.Fatalf("New() failed: %v", err)
}
return w, func() {
w.Close()
}
}
type watchResult struct {
changes chan watch.Change
err error
}
func (wr *watchResult) Send(cb watch.ChangeBatch) error {
for _, change := range cb.Changes {
wr.changes <- change
}
return nil
}
// doWatch executes a watch request and returns a new watchResult.
// Change events may be received on the channel "changes".
// Once "changes" is closed, any error that occurred is stored to "err".
func doWatch(w service.Watcher, ctx ipc.ServerContext, req watch.Request) *watchResult {
wr := &watchResult{changes: make(chan watch.Change)}
go func() {
defer close(wr.changes)
if err := w.Watch(ctx, req, wr); err != nil {
wr.err = err
}
}()
return wr
}
var (
empty = []storage.DEntry{}
)
func dir(name string, id storage.ID) []storage.DEntry {
return []storage.DEntry{storage.DEntry{
Name: name,
ID: id,
}}
}
func expectInitialStateSkipped(t *testing.T, change watch.Change) {
if change.Name != "" {
t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
}
if change.State != watch.InitialStateSkipped {
t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
}
if len(change.ResumeMarker) != 0 {
t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
}
}
func expectExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
change := findChange(t, changes, id)
if change.State != watch.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
if cv.Version != post {
t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
}
if cv.IsRoot != isRoot {
t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
}
if cv.Value != value {
t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
}
expectDirEquals(t, cv.Dir, dir)
}
func expectDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
change := findChange(t, changes, id)
if change.State != watch.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
if cv.Version != storage.NoVersion {
t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
}
if cv.IsRoot != isRoot {
t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
}
if cv.Value != nil {
t.Fatal("Expected Value to be nil")
}
if cv.Dir != nil {
t.Fatal("Expected Dir to be nil")
}
}
func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
for _, change := range changes {
cv, ok := change.Value.(*raw.Mutation)
if !ok {
t.Fatal("Expected a Mutation")
}
if cv.ID == id {
return change
}
}
t.Fatalf("Expected a change for id: %v", id)
panic("should not reach here")
}
func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
if len(actual) != len(expected) {
t.Fatalf("Expected Dir to have %v refs, but had %v", len(expected), len(actual))
}
for i, e := range expected {
a := actual[i]
if a != e {
t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, e, a)
}
}
}