blob: 00066f5f31e3fc0567fa4eade40a49a2e1c5f353 [file] [log] [blame]
Tilak Sharma13329d92014-07-14 12:25:28 -07001package testing
Tilak Sharma48202342014-05-13 23:49:49 -07002
3import (
Tilak Sharma13329d92014-07-14 12:25:28 -07004 "io"
5 "runtime"
Tilak Sharma48202342014-05-13 23:49:49 -07006 "sync"
Tilak Sharmadfba8ac2014-06-18 13:49:32 -07007 "testing"
Tilak Sharma48202342014-05-13 23:49:49 -07008 "time"
9
Tilak Sharma86571322014-06-11 14:15:07 -070010 "veyron/services/store/raw"
11
Tilak Sharma48202342014-05-13 23:49:49 -070012 "veyron2/ipc"
Andres Erbsencdeacfe2014-06-11 14:55:16 -070013 "veyron2/naming"
Tilak Sharma48202342014-05-13 23:49:49 -070014 "veyron2/security"
Tilak Sharmadfba8ac2014-06-18 13:49:32 -070015 "veyron2/services/store"
Tilak Sharma48202342014-05-13 23:49:49 -070016 "veyron2/services/watch"
Tilak Sharmadfba8ac2014-06-18 13:49:32 -070017 "veyron2/storage"
Tilak Sharma48202342014-05-13 23:49:49 -070018)
19
Tilak Sharma48202342014-05-13 23:49:49 -070020// CancellableContext implements ipc.ServerContext.
21type CancellableContext struct {
22 id security.PublicID
23 mu sync.Mutex
24 cancelled chan struct{}
25}
26
27func NewCancellableContext(id security.PublicID) *CancellableContext {
28 return &CancellableContext{
29 id: id,
30 cancelled: make(chan struct{}),
31 }
32}
33
34func (*CancellableContext) Server() ipc.Server {
35 return nil
36}
37
38func (*CancellableContext) Method() string {
39 return ""
40}
41
42func (*CancellableContext) Name() string {
43 return ""
44}
45
46func (*CancellableContext) Suffix() string {
47 return ""
48}
49
50func (*CancellableContext) Label() (l security.Label) {
51 return
52}
53
54func (*CancellableContext) CaveatDischarges() security.CaveatDischargeMap {
55 return nil
56}
57
58func (ctx *CancellableContext) LocalID() security.PublicID {
59 return ctx.id
60}
61
62func (ctx *CancellableContext) RemoteID() security.PublicID {
63 return ctx.id
64}
65
66func (*CancellableContext) Blessing() security.PublicID {
67 return nil
68}
69
Andres Erbsencdeacfe2014-06-11 14:55:16 -070070func (*CancellableContext) LocalEndpoint() naming.Endpoint {
Tilak Sharma48202342014-05-13 23:49:49 -070071 return nil
72}
73
Andres Erbsencdeacfe2014-06-11 14:55:16 -070074func (*CancellableContext) RemoteEndpoint() naming.Endpoint {
Tilak Sharma48202342014-05-13 23:49:49 -070075 return nil
76}
77
78func (*CancellableContext) Deadline() (t time.Time) {
79 return
80}
81
82func (ctx *CancellableContext) IsClosed() bool {
83 select {
84 case <-ctx.cancelled:
85 return true
86 default:
87 return false
88 }
89}
90
91// cancel synchronously closes the context. After cancel returns, calls to
92// IsClosed will return true and the stream returned by Closed will be closed.
93func (ctx *CancellableContext) Cancel() {
94 ctx.mu.Lock()
95 defer ctx.mu.Unlock()
96 if !ctx.IsClosed() {
97 close(ctx.cancelled)
98 }
99}
100
101func (ctx *CancellableContext) Closed() <-chan struct{} {
102 return ctx.cancelled
103}
104
// Utilities for PutMutations.
106
107// storeServicePutMutationsStream implements raw.StoreServicePutMutationsStream
108type storeServicePutMutationsStream struct {
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700109 mus <-chan raw.Mutation
110 value raw.Mutation
Tilak Sharma13329d92014-07-14 12:25:28 -0700111}
112
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700113func (s *storeServicePutMutationsStream) RecvStream() interface {
114 Advance() bool
115 Value() raw.Mutation
116 Err() error
117} {
118 return s
119}
120
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700121func (s *storeServicePutMutationsStream) Advance() bool {
122 var ok bool
123 s.value, ok = <-s.mus
124 return ok
125}
126
127func (s *storeServicePutMutationsStream) Value() raw.Mutation {
128 return s.value
129}
130
131func (s *storeServicePutMutationsStream) Err() error {
132 return nil
Tilak Sharma13329d92014-07-14 12:25:28 -0700133}
134
135// storePutMutationsStream implements raw.StorePutMutationsStream
136type storePutMutationsStream struct {
Tilak Sharma13329d92014-07-14 12:25:28 -0700137 closed bool
138 mus chan<- raw.Mutation
Tilak Sharma13329d92014-07-14 12:25:28 -0700139}
140
141func (s *storePutMutationsStream) Send(mu raw.Mutation) error {
142 s.mus <- mu
143 return nil
144}
145
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700146func (s *storePutMutationsStream) Close() error {
Tilak Sharma13329d92014-07-14 12:25:28 -0700147 if !s.closed {
148 s.closed = true
149 close(s.mus)
150 }
151 return nil
152}
153
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700154type storePutMutationsCall struct {
155 ctx ipc.ServerContext
156 stream storePutMutationsStream
157 err <-chan error
158}
159
160func (s *storePutMutationsCall) SendStream() interface {
161 Send(mu raw.Mutation) error
162 Close() error
163} {
164 return &s.stream
165}
166
167func (s *storePutMutationsCall) Finish() error {
168 s.stream.Close()
Tilak Sharma13329d92014-07-14 12:25:28 -0700169 return <-s.err
170}
171
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700172func (s *storePutMutationsCall) Cancel() {
Tilak Sharma13329d92014-07-14 12:25:28 -0700173 s.ctx.(*CancellableContext).Cancel()
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700174 s.stream.Close()
Tilak Sharma13329d92014-07-14 12:25:28 -0700175}
176
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700177func PutMutations(id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error) raw.StorePutMutationsCall {
Tilak Sharma13329d92014-07-14 12:25:28 -0700178 ctx := NewCancellableContext(id)
179 mus := make(chan raw.Mutation)
180 err := make(chan error)
181 go func() {
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700182 err <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus: mus})
Tilak Sharma13329d92014-07-14 12:25:28 -0700183 close(err)
184 }()
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700185 return &storePutMutationsCall{
Tilak Sharma13329d92014-07-14 12:25:28 -0700186 ctx: ctx,
Tilak Sharma13329d92014-07-14 12:25:28 -0700187 err: err,
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700188 stream: storePutMutationsStream{
189 mus: mus,
190 },
Tilak Sharma13329d92014-07-14 12:25:28 -0700191 }
192}
193
194func PutMutationsBatch(t *testing.T, id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error, mus []raw.Mutation) {
195 storePutMutationsStream := PutMutations(id, putMutationsFn)
196 for _, mu := range mus {
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700197 storePutMutationsStream.SendStream().Send(mu)
Tilak Sharma13329d92014-07-14 12:25:28 -0700198 }
199 if err := storePutMutationsStream.Finish(); err != nil {
200 _, file, line, _ := runtime.Caller(1)
201 t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
202 }
203}
204
// Utilities for Watch.
206
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700207// watcherServiceWatchStreamSender implements watch.WatcherServiceWatchStreamSender
208type watcherServiceWatchStreamSender struct {
Tilak Sharma48202342014-05-13 23:49:49 -0700209 mu *sync.Mutex
210 ctx ipc.ServerContext
211 output chan<- watch.ChangeBatch
212}
213
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700214func (s *watcherServiceWatchStreamSender) Send(cb watch.ChangeBatch) error {
Tilak Sharma48202342014-05-13 23:49:49 -0700215 s.mu.Lock()
216 defer s.mu.Unlock()
217 select {
218 case s.output <- cb:
219 return nil
220 case <-s.ctx.Closed():
Tilak Sharma13329d92014-07-14 12:25:28 -0700221 return io.EOF
Tilak Sharma48202342014-05-13 23:49:49 -0700222 }
223}
224
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700225// watcherServiceWatchStream implements watch.WatcherServiceWatchStream
226type watcherServiceWatchStream struct {
227 watcherServiceWatchStreamSender
228}
229
230func (s *watcherServiceWatchStream) SendStream() interface {
231 Send(cb watch.ChangeBatch) error
232} {
233 return s
234}
235func (*watcherServiceWatchStream) Cancel() {}
236
Tilak Sharma48202342014-05-13 23:49:49 -0700237// watcherWatchStream implements watch.WatcherWatchStream.
238type watcherWatchStream struct {
239 ctx *CancellableContext
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700240 value watch.ChangeBatch
Tilak Sharma48202342014-05-13 23:49:49 -0700241 input <-chan watch.ChangeBatch
242 err <-chan error
243}
244
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700245func (s *watcherWatchStream) Advance() bool {
246 var ok bool
247 s.value, ok = <-s.input
248 return ok
249}
250
251func (s *watcherWatchStream) Value() watch.ChangeBatch {
252 return s.value
253}
254
255func (*watcherWatchStream) Err() error {
256 return nil
Tilak Sharma48202342014-05-13 23:49:49 -0700257}
258
259func (s *watcherWatchStream) Finish() error {
260 <-s.input
261 return <-s.err
262}
263
264func (s *watcherWatchStream) Cancel() {
265 s.ctx.Cancel()
266}
267
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700268func (s *watcherWatchStream) RecvStream() interface {
269 Advance() bool
270 Value() watch.ChangeBatch
271 Err() error
272} {
273 return s
274}
275
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700276func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
Tilak Sharma48202342014-05-13 23:49:49 -0700277 mu := &sync.Mutex{}
278 ctx := NewCancellableContext(id)
279 c := make(chan watch.ChangeBatch, 1)
280 errc := make(chan error, 1)
281 go func() {
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700282 stream := &watcherServiceWatchStream{
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700283 watcherServiceWatchStreamSender{
284 mu: mu,
285 ctx: ctx,
286 output: c,
287 },
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700288 }
289 err := watchFn(ctx, stream)
Tilak Sharma48202342014-05-13 23:49:49 -0700290 mu.Lock()
291 defer mu.Unlock()
292 ctx.Cancel()
293 close(c)
294 errc <- err
295 close(errc)
296 }()
297 return &watcherWatchStream{
298 ctx: ctx,
299 input: c,
300 err: errc,
301 }
302}
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700303
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700304func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchCall {
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700305 return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
306 return watchFn(ctx, req, stream)
307 })
308}
309
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700310func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
311 return watchImpl(id, func(ctx ipc.ServerContext, iterator *watcherServiceWatchStream) error {
312 return watchFn(ctx, req, iterator)
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700313 })
314}
315
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700316func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
Tilak Sharmadfba8ac2014-06-18 13:49:32 -0700317 return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
318 return watchFn(ctx, path, req, stream)
319 })
320}
321
322func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
323 if change.Name != "" {
324 t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
325 }
326 if change.State != watch.InitialStateSkipped {
327 t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
328 }
329 if len(change.ResumeMarker) != 0 {
330 t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
331 }
332}
333
334func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
335 change := findEntry(t, changes, name)
336 if change.State != watch.Exists {
337 t.Fatalf("Expected name to exist: %v", name)
338 }
339 cv, ok := change.Value.(*storage.Entry)
340 if !ok {
341 t.Fatal("Expected an Entry")
342 }
343 if cv.Stat.ID != id {
344 t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
345 }
346 if cv.Value != value {
347 t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
348 }
349}
350
351func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
352 change := findEntry(t, changes, name)
353 if change.State != watch.DoesNotExist {
354 t.Fatalf("Expected name to not exist: %v", name)
355 }
356 if change.Value != nil {
357 t.Fatal("Expected entry to be nil")
358 }
359}
360
361func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
362 change := findEntry(t, changes, name)
363 if change.State != watch.Exists {
364 t.Fatalf("Expected name to exist: %v", name)
365 }
366 cv, ok := change.Value.(*store.Entry)
367 if !ok {
368 t.Fatal("Expected a service Entry")
369 }
370 if cv.Stat.ID != id {
371 t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
372 }
373 if cv.Value != value {
374 t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
375 }
376}
377
378func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
379 change := findEntry(t, changes, name)
380 if change.State != watch.DoesNotExist {
381 t.Fatalf("Expected name to not exist: %v", name)
382 }
383 if change.Value != nil {
384 t.Fatal("Expected entry to be nil")
385 }
386}
387
388func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
389 for _, change := range changes {
390 if change.Name == name {
391 return change
392 }
393 }
394 t.Fatalf("Expected a change for name: %v", name)
395 panic("Should not reach here")
396}
397
398var (
399 EmptyDir = []storage.DEntry{}
400)
401
402func DirOf(name string, id storage.ID) []storage.DEntry {
403 return []storage.DEntry{storage.DEntry{
404 Name: name,
405 ID: id,
406 }}
407}
408
409func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
410 change := findMutation(t, changes, id)
411 if change.State != watch.Exists {
412 t.Fatalf("Expected id to exist: %v", id)
413 }
414 cv := change.Value.(*raw.Mutation)
415 if cv.PriorVersion != pre {
416 t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
417 }
418 if cv.Version != post {
419 t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
420 }
421 if cv.IsRoot != isRoot {
422 t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
423 }
424 if cv.Value != value {
425 t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
426 }
427 expectDirEquals(t, cv.Dir, dir)
428}
429
430func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
431 change := findMutation(t, changes, id)
432 if change.State != watch.DoesNotExist {
433 t.Fatalf("Expected id to not exist: %v", id)
434 }
435 cv := change.Value.(*raw.Mutation)
436 if cv.PriorVersion != pre {
437 t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
438 }
439 if cv.Version != storage.NoVersion {
440 t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
441 }
442 if cv.IsRoot != isRoot {
443 t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
444 }
445 if cv.Value != nil {
446 t.Fatal("Expected Value to be nil")
447 }
448 if cv.Dir != nil {
449 t.Fatal("Expected Dir to be nil")
450 }
451}
452
453func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
454 change := findMutation(t, changes, id)
455 if change.State != watch.Exists {
456 t.Fatalf("Expected id to exist: %v", id)
457 }
458 cv := change.Value.(*raw.Mutation)
459 if cv.Value != value {
460 t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
461 }
462}
463
464func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
465 change := findMutation(t, changes, id)
466 if change.State != watch.DoesNotExist {
467 t.Fatalf("Expected id to not exist: %v", id)
468 }
469}
470
471func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
472 for _, change := range changes {
473 cv, ok := change.Value.(*raw.Mutation)
474 if !ok {
475 t.Fatal("Expected a Mutation")
476 }
477 if cv.ID == id {
478 return change
479 }
480 }
481 t.Fatalf("Expected a change for id: %v", id)
482 panic("Should not reach here")
483}
484
485func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
486 if len(actual) != len(expected) {
487 t.Fatalf("Expected Dir to have %v refs, but had %v", len(expected), len(actual))
488 }
489 for i, e := range expected {
490 a := actual[i]
491 if a != e {
492 t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, e, a)
493 }
494 }
495}