blob: db2620d1fa7c504a712bc458e3633d4a60129ceb [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package lib
2
3import (
4 "bytes"
5 "encoding/json"
Shyam Jayaraman8deb14e2014-05-27 15:45:22 -07006 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07007 "reflect"
8 "testing"
Shyam Jayaraman8deb14e2014-05-27 15:45:22 -07009 "time"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070010 "veyron2"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070011 "veyron2/ipc"
12 "veyron2/naming"
13 "veyron2/rt"
Todd Wange5d67b42014-05-27 10:48:07 -070014 "veyron2/vdl"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015 "veyron2/verror"
16 "veyron2/vlog"
17 "veyron2/wiretype"
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -070018
19 "veyron/runtimes/google/ipc/stream/proxy"
20 mounttable "veyron/services/mounttable/lib"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070021)
22
// r is the veyron2 runtime used by the server helpers and tests in this file.
var r veyron2.Runtime

// init assigns r from rt.Init() when the package is initialized.
func init() {
	r = rt.Init()
}
28
29type simpleAdder struct{}
30
31func (s simpleAdder) Add(_ ipc.ServerCall, a, b int32) (int32, error) {
32 return a + b, nil
33}
34
35func (s simpleAdder) Divide(_ ipc.ServerCall, a, b int32) (int32, error) {
36 if b == 0 {
37 return 0, verror.BadArgf("can't divide by zero")
38 }
39 return a / b, nil
40}
41
42func (s simpleAdder) StreamingAdd(call ipc.ServerCall) (int32, error) {
43 total := int32(0)
44 var value int32
45 for err := call.Recv(&value); err == nil; err = call.Recv(&value) {
46 total += value
47 call.Send(total)
48 }
49 return total, nil
50}
51
52func (s simpleAdder) Signature(call ipc.ServerCall) (ipc.ServiceSignature, error) {
53 result := ipc.ServiceSignature{Methods: make(map[string]ipc.MethodSignature)}
54 result.Methods["Add"] = ipc.MethodSignature{
55 InArgs: []ipc.MethodArgument{
56 {Name: "A", Type: 36},
57 {Name: "B", Type: 36},
58 },
59 OutArgs: []ipc.MethodArgument{
60 {Name: "Value", Type: 36},
61 {Name: "Err", Type: 65},
62 },
63 }
64
65 result.Methods["Divide"] = ipc.MethodSignature{
66 InArgs: []ipc.MethodArgument{
67 {Name: "A", Type: 36},
68 {Name: "B", Type: 36},
69 },
70 OutArgs: []ipc.MethodArgument{
71 {Name: "Value", Type: 36},
72 {Name: "Err", Type: 65},
73 },
74 }
75
76 result.Methods["StreamingAdd"] = ipc.MethodSignature{
77 InArgs: []ipc.MethodArgument{},
78 OutArgs: []ipc.MethodArgument{
79 {Name: "Value", Type: 36},
80 {Name: "Err", Type: 65},
81 },
82 InStream: 36,
83 OutStream: 36,
84 }
Todd Wange5d67b42014-05-27 10:48:07 -070085 result.TypeDefs = []vdl.Any{
Jiri Simsa5293dcb2014-05-10 09:56:38 -070086 wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
87
88 return result, nil
89}
90
// registerFunc registers handlers on the given server and returns any
// registration error.
type registerFunc func(ipc.Server) error
93
94func startServer(registerer registerFunc) (ipc.Server, naming.Endpoint, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095 // Create a new server instance.
96 s, err := r.NewServer()
97 if err != nil {
98 return nil, nil, err
99 }
100
101 // Register the "fortune" prefix with the fortune dispatcher.
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700102 if err := registerer(s); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103 return nil, nil, err
104 }
105
106 endpoint, err := s.Listen("tcp", "127.0.0.1:0")
107 if err != nil {
108 return nil, nil, err
109 }
110 return s, endpoint, nil
111}
112
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700113func startAdderServer() (ipc.Server, naming.Endpoint, error) {
114 return startServer(func(server ipc.Server) error {
115 return server.Register("cache", ipc.SoloDispatcher(simpleAdder{}, nil))
116 })
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700117}
118
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700119func startProxy() (*proxy.Proxy, error) {
120 rid, err := naming.NewRoutingID()
121 if err != nil {
122 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700123 }
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700124
125 return proxy.New(rid, nil, "tcp", "127.0.0.1:0")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700126}
127
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700128func startMountTableServer() (ipc.Server, naming.Endpoint, error) {
129 return startServer(func(server ipc.Server) error {
Ryan Brownd123fa32014-05-19 10:57:32 -0700130 mt, err := mounttable.NewMountTable("")
131 if err != nil {
132 return err
133 }
134 return server.Register("mt", mt)
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700135 })
136}
137
// testWriter is the writer the tests hand to the websocket pipe. It buffers
// the bytes of the message in progress, records every finished message as a
// decoded response, and remembers the last error it was told about.
type testWriter struct {
	// stream holds the responses decoded by FinishMessage, in arrival order.
	stream []response
	// buf accumulates the bytes passed to Write for the current message.
	buf bytes.Buffer
	// err is the error most recently passed to sendError.
	err error
	// logger is the logger handed out by getLogger.
	logger vlog.Logger
	// If this channel is set then a message will be sent
	// to this channel after receiving a call to FinishMessage().
	notifier chan bool
}

// Write appends p to the buffer holding the current message.
func (w *testWriter) Write(p []byte) (int, error) {
	return w.buf.Write(p)

}

// getLogger returns the logger this writer was constructed with.
func (w *testWriter) getLogger() vlog.Logger {
	return w.logger
}

// sendError records err so that tests can inspect it later; nothing is
// written to the message buffer.
func (w *testWriter) sendError(err error) {
	w.err = err
}
160
161func (w *testWriter) FinishMessage() error {
162 var resp response
163 p := w.buf.Bytes()
164 w.buf.Reset()
165 if err := json.Unmarshal(p, &resp); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700166 return err
167 }
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700168 w.stream = append(w.stream, resp)
Shyam Jayaraman8deb14e2014-05-27 15:45:22 -0700169 if w.notifier != nil {
170 w.notifier <- true
171 }
172 return nil
173}
174
175// Waits until there is at least n messages in w.stream. Returns an error if we timeout
176// waiting for the given number of messages.
177func (w *testWriter) waitForMessage(n int) error {
178 if len(w.stream) >= n {
179 return nil
180 }
181 w.notifier = make(chan bool, 1)
182 for len(w.stream) < n {
183 select {
184 case <-w.notifier:
185 continue
186 case <-time.After(time.Second):
187 return fmt.Errorf("timed out")
188 }
189 }
190 w.notifier = nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700191 return nil
192}
193
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700194func checkResponses(w *testWriter, expectedStream []response, err error, t *testing.T) {
195 if !reflect.DeepEqual(expectedStream, w.stream) {
196 t.Errorf("streams don't match: expected %v, got %v", expectedStream, w.stream)
197 }
198
199 if !reflect.DeepEqual(err, w.err) {
200 t.Errorf("unexpected error, got: %v, expected: %v", err, w.err)
201 }
202}
203
204var adderServiceSignature JSONServiceSignature = JSONServiceSignature{
205 "add": JSONMethodSignature{
206 InArgs: []string{"A", "B"},
207 NumOutArgs: 2,
208 IsStreaming: false,
209 },
210 "divide": JSONMethodSignature{
211 InArgs: []string{"A", "B"},
212 NumOutArgs: 2,
213 IsStreaming: false,
214 },
215 "streamingAdd": JSONMethodSignature{
216 InArgs: []string{},
217 NumOutArgs: 2,
218 IsStreaming: true,
219 },
220}
221
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700222func TestGetGoServerSignature(t *testing.T) {
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700223 s, endpoint, err := startAdderServer()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700224 if err != nil {
225 t.Errorf("unable to start server: %v", err)
226 t.Fail()
227 return
228 }
229 defer s.Stop()
230 wspr := NewWSPR(0, "mockVeyronProxyEP")
231 wspr.setup()
232 wsp := websocketPipe{ctx: wspr}
233 wsp.setup()
234 jsSig, err := wsp.getSignature("/"+endpoint.String()+"/cache", "")
235 if err != nil {
236 t.Errorf("Failed to get signature: %v", err)
237 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700238
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700239 if !reflect.DeepEqual(jsSig, adderServiceSignature) {
240 t.Errorf("Unexpected signature, got :%v, expected: %v", jsSig, adderServiceSignature)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700241 }
242}
243
// goServerTestCase describes one call made through the websocket pipe to the
// Go adder server, and the responses the test writer should record.
type goServerTestCase struct {
	// method is the name of the method to call.
	method string
	// inArgs are the positional arguments of the call.
	inArgs []interface{}
	// numOutArgs is passed through as the request's NumOutArgs.
	numOutArgs int32
	// streamingInputs are sent on the call's stream, in order; when
	// non-empty the request is marked as streaming.
	streamingInputs []string
	// expectedStream is the exact sequence of responses expected.
	expectedStream []response
	// expectedError is the error the writer is expected to record.
	expectedError error
}
252
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700253func runGoServerTestCase(t *testing.T, test goServerTestCase) {
254 s, endpoint, err := startAdderServer()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700255 if err != nil {
256 t.Errorf("unable to start server: %v", err)
257 t.Fail()
258 return
259 }
260 defer s.Stop()
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700261
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700262 wspr := NewWSPR(0, "mockVeyronProxyEP")
263 wspr.setup()
264 wsp := websocketPipe{ctx: wspr}
265 wsp.setup()
266 writer := testWriter{
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700267 logger: wspr.logger,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700268 }
269
270 var signal chan ipc.Stream
271 if len(test.streamingInputs) > 0 {
272 signal = make(chan ipc.Stream, 1)
273 wsp.outstandingStreams[0] = startQueueingStream(signal)
274 go func() {
275 for _, value := range test.streamingInputs {
276 wsp.sendOnStream(0, value, &writer)
277 }
278 wsp.closeStream(0)
279 }()
280 }
281
282 request := veyronRPC{
283 Name: "/" + endpoint.String() + "/cache",
284 Method: test.method,
285 InArgs: test.inArgs,
286 NumOutArgs: test.numOutArgs,
287 IsStreaming: signal != nil,
288 }
289 wsp.sendVeyronRequest(0, &request, &writer, signal)
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700290 checkResponses(&writer, test.expectedStream, test.expectedError, t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700291}
292
293func TestCallingGoServer(t *testing.T) {
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700294 runGoServerTestCase(t, goServerTestCase{
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700295 method: "Add",
296 inArgs: []interface{}{2, 3},
297 numOutArgs: 2,
298 expectedStream: []response{
299 response{
300 Message: []interface{}{float64(5)},
301 Type: responseFinal,
302 },
303 },
304 })
305}
306
307func TestCallingGoServerWithError(t *testing.T) {
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700308 runGoServerTestCase(t, goServerTestCase{
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700309 method: "Divide",
310 inArgs: []interface{}{1, 0},
311 numOutArgs: 2,
312 expectedError: verror.BadArgf("can't divide by zero"),
313 })
314}
315
316func TestCallingGoWithStreaming(t *testing.T) {
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700317 runGoServerTestCase(t, goServerTestCase{
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700318 method: "StreamingAdd",
319 inArgs: []interface{}{},
320 streamingInputs: []string{"1", "2", "3", "4"},
321 numOutArgs: 2,
322 expectedStream: []response{
323 response{
324 Message: 1.0,
325 Type: responseStream,
326 },
327 response{
328 Message: 3.0,
329 Type: responseStream,
330 },
331 response{
332 Message: 6.0,
333 Type: responseStream,
334 },
335 response{
336 Message: 10.0,
337 Type: responseStream,
338 },
339 response{
340 Message: nil,
341 Type: responseStreamClose,
342 },
343 response{
344 Message: []interface{}{10.0},
345 Type: responseFinal,
346 },
347 },
348 })
349}
350
Shyam Jayaramanb1b16a82014-05-19 17:06:30 -0700351func TestJavascriptPublish(t *testing.T) {
352 mounttableServer, endpoint, err := startMountTableServer()
353
354 if err != nil {
355 t.Errorf("unable to start mounttable: %v", err)
356 return
357 }
358
359 defer mounttableServer.Stop()
360
361 proxyServer, err := startProxy()
362
363 if err != nil {
364 t.Errorf("unable to start proxy: %v", err)
365 return
366 }
367
368 defer proxyServer.Shutdown()
369
370 proxyEndpoint := proxyServer.Endpoint().String()
371
372 wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.MountTableRoots{"/" + endpoint.String() + "/mt"})
373 wspr.setup()
374 wsp := websocketPipe{ctx: wspr}
375 wsp.setup()
376 defer wsp.cleanup()
377 writer := testWriter{
378 logger: wspr.logger,
379 }
380 wsp.publish(publishRequest{
381 Name: "adder",
382 Services: map[string]JSONServiceSignature{
383 "adder": adderServiceSignature,
384 },
385 }, &writer)
386
387 if len(writer.stream) != 1 {
388 t.Errorf("expected only on response, got %d", len(writer.stream))
389 }
390
391 resp := writer.stream[0]
392
393 if resp.Type != responseFinal {
394 t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
395 return
396 }
397
398 if msg, ok := resp.Message.(string); ok {
399 if _, err := r.NewEndpoint(msg); err == nil {
400 return
401 }
402
403 }
404 t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
405}
406
// jsServerTestCase describes one call made by a Go client to a server
// published through the websocket pipe, along with the reply the simulated
// JavaScript server sends back.
type jsServerTestCase struct {
	// method is the name of the method the client calls.
	method string
	// inArgs are the positional arguments of the call.
	inArgs []interface{}
	// The set of streaming inputs from the client to the server.
	clientStream []interface{}
	// The set of streaming outputs from the server to the client.
	serverStream []interface{}
	// finalResponse is the single result value placed in the server's reply
	// and expected back from the client's Finish call.
	finalResponse interface{}
	// err is the error placed in the server's reply; nil means no error.
	err *verror.Standard
}
417
418func sendServerStream(t *testing.T, wsp *websocketPipe, test *jsServerTestCase, w clientWriter) {
419 for _, msg := range test.serverStream {
420 wsp.sendParsedMessageOnStream(0, msg, w)
421 }
422
423 serverReply := serverRPCReply{
424 Results: []interface{}{test.finalResponse},
425 Err: test.err,
426 }
427
428 bytes, err := json.Marshal(serverReply)
429 if err != nil {
430 t.Fatalf("Failed to serialize the reply: %v", err)
431 }
432 wsp.handleServerResponse(0, string(bytes))
433}
434
435func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
436 mounttableServer, endpoint, err := startMountTableServer()
437
438 if err != nil {
439 t.Errorf("unable to start mounttable: %v", err)
440 return
441 }
442
443 defer mounttableServer.Stop()
444
445 proxyServer, err := startProxy()
446
447 if err != nil {
448 t.Errorf("unable to start proxy: %v", err)
449 return
450 }
451
452 defer proxyServer.Shutdown()
453
454 proxyEndpoint := proxyServer.Endpoint().String()
455
456 wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.MountTableRoots{"/" + endpoint.String() + "/mt"})
457 wspr.setup()
458 wsp := websocketPipe{ctx: wspr}
459 writer := testWriter{
460 logger: wspr.logger,
461 }
462 wsp.writerCreator = func(int64) clientWriter {
463 return &writer
464 }
465 wsp.setup()
466 defer wsp.cleanup()
467 wsp.publish(publishRequest{
468 Name: "adder",
469 Services: map[string]JSONServiceSignature{
470 "adder": adderServiceSignature,
471 },
472 }, &writer)
473
474 if len(writer.stream) != 1 {
475 t.Errorf("expected only on response, got %d", len(writer.stream))
476 }
477
478 resp := writer.stream[0]
479
480 if resp.Type != responseFinal {
481 t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
482 return
483 }
484
485 msg, ok := resp.Message.(string)
486 if !ok {
487 t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
488 }
489
490 if _, err := r.NewEndpoint(msg); err != nil {
491 t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
492 }
493
494 writer.stream = nil
495
496 // Create a client using wspr's runtime so it points to the right mounttable.
497 client, err := wspr.rt.NewClient()
498
499 if err != nil {
500 t.Errorf("unable to create client: %v", err)
501 }
502
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700503 call, err := client.StartCall(wspr.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs)
Shyam Jayaraman8deb14e2014-05-27 15:45:22 -0700504 if err != nil {
505 t.Errorf("failed to start call: %v", err)
506 }
507
508 expectedWebsocketMessage := []response{
509 response{
510 Type: serverRequest,
511 Message: map[string]interface{}{
512 "serverId": 0.0,
513 "serviceName": "adder",
514 "method": lowercaseFirstCharacter(test.method),
515 "args": test.inArgs,
516 "context": map[string]interface{}{
517 "name": "adder",
518 "suffix": "",
519 },
520 },
521 },
522 }
523
524 // Wait until the rpc has started.
525 if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
526 t.Errorf("didn't recieve expected message: %v", err)
527 }
528 for _, msg := range test.clientStream {
529 expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStream, Message: msg})
530 if err := call.Send(msg); err != nil {
531 t.Errorf("unexpected error while sending %v: %v", msg, err)
532 }
533 }
534
535 if len(test.clientStream) > 0 {
536 call.CloseSend()
537 expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStreamClose})
538 }
539
540 // Wait until all the streaming messages have been acknowledged.
541 if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
542 t.Errorf("didn't recieve expected message: %v", err)
543 }
544
545 expectedStream := test.serverStream
546 go sendServerStream(t, &wsp, &test, &writer)
547 for {
548 var data interface{}
549 if err := call.Recv(&data); err != nil {
550 break
551 }
552 if len(expectedStream) == 0 {
553 t.Errorf("unexpected stream value: %v", data)
554 continue
555 }
556 if !reflect.DeepEqual(data, expectedStream[0]) {
557 t.Errorf("unexpected stream value: got %v, expected %v", data, expectedStream[0])
558 }
559 expectedStream = expectedStream[1:]
560 }
561 var result interface{}
562 var err2 error
563
564 if err := call.Finish(&result, &err2); err != nil {
565 t.Errorf("unexpected err :%v", err)
566 }
567
568 if !reflect.DeepEqual(result, test.finalResponse) {
569 t.Errorf("unexected final response: got %v, expected %v", result, test.finalResponse)
570 }
571
572 // If err2 is nil and test.err is nil reflect.DeepEqual will return false because the
573 // types are different. Because of this, we only use reflect.DeepEqual if one of
574 // the values is non-nil. If both values are nil, then we consider them equal.
575 if (err2 != nil || test.err != nil) && !reflect.DeepEqual(err2, test.err) {
576 t.Errorf("unexected error: got %v, expected %v", err2, test.err)
577 }
578 checkResponses(&writer, expectedWebsocketMessage, nil, t)
579}
580
581func TestSimpleJSServer(t *testing.T) {
582 runJsServerTestCase(t, jsServerTestCase{
583 method: "Add",
584 inArgs: []interface{}{1.0, 2.0},
585 finalResponse: 3.0,
586 })
587}
588
589func TestJSServerWithError(t *testing.T) {
590 runJsServerTestCase(t, jsServerTestCase{
591 method: "Add",
592 inArgs: []interface{}{1.0, 2.0},
593 finalResponse: 3.0,
594 err: &verror.Standard{
595 ID: verror.Internal,
596 Msg: "JS Server failed",
597 },
598 })
599}
600
601func TestJSServerWihStreamingInputs(t *testing.T) {
602 runJsServerTestCase(t, jsServerTestCase{
603 method: "StreamingAdd",
604 inArgs: []interface{}{},
605 clientStream: []interface{}{3.0, 4.0},
606 finalResponse: 10.0,
607 })
608}
609
610func TestJSServerWihStreamingOutputs(t *testing.T) {
611 runJsServerTestCase(t, jsServerTestCase{
612 method: "StreamingAdd",
613 inArgs: []interface{}{},
614 serverStream: []interface{}{3.0, 4.0},
615 finalResponse: 10.0,
616 })
617}
618
619func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) {
620 runJsServerTestCase(t, jsServerTestCase{
621 method: "StreamingAdd",
622 inArgs: []interface{}{},
623 clientStream: []interface{}{1.0, 2.0},
624 serverStream: []interface{}{3.0, 4.0},
625 finalResponse: 10.0,
626 })
627}
628
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700629// TODO(bjornick): Make sure that send on stream is nonblocking