Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package lib |
| 2 | |
| 3 | import ( |
| 4 | "bytes" |
| 5 | "encoding/json" |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 6 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 7 | "reflect" |
| 8 | "testing" |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 9 | "time" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 10 | "veyron2" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 11 | "veyron2/ipc" |
| 12 | "veyron2/naming" |
| 13 | "veyron2/rt" |
Todd Wang | e5d67b4 | 2014-05-27 10:48:07 -0700 | [diff] [blame] | 14 | "veyron2/vdl" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 15 | "veyron2/verror" |
| 16 | "veyron2/vlog" |
| 17 | "veyron2/wiretype" |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 18 | |
| 19 | "veyron/runtimes/google/ipc/stream/proxy" |
| 20 | mounttable "veyron/services/mounttable/lib" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 21 | ) |
| 22 | |
| 23 | var r veyron2.Runtime |
| 24 | |
| 25 | func init() { |
| 26 | r = rt.Init() |
| 27 | } |
| 28 | |
| 29 | type simpleAdder struct{} |
| 30 | |
| 31 | func (s simpleAdder) Add(_ ipc.ServerCall, a, b int32) (int32, error) { |
| 32 | return a + b, nil |
| 33 | } |
| 34 | |
| 35 | func (s simpleAdder) Divide(_ ipc.ServerCall, a, b int32) (int32, error) { |
| 36 | if b == 0 { |
| 37 | return 0, verror.BadArgf("can't divide by zero") |
| 38 | } |
| 39 | return a / b, nil |
| 40 | } |
| 41 | |
| 42 | func (s simpleAdder) StreamingAdd(call ipc.ServerCall) (int32, error) { |
| 43 | total := int32(0) |
| 44 | var value int32 |
| 45 | for err := call.Recv(&value); err == nil; err = call.Recv(&value) { |
| 46 | total += value |
| 47 | call.Send(total) |
| 48 | } |
| 49 | return total, nil |
| 50 | } |
| 51 | |
| 52 | func (s simpleAdder) Signature(call ipc.ServerCall) (ipc.ServiceSignature, error) { |
| 53 | result := ipc.ServiceSignature{Methods: make(map[string]ipc.MethodSignature)} |
| 54 | result.Methods["Add"] = ipc.MethodSignature{ |
| 55 | InArgs: []ipc.MethodArgument{ |
| 56 | {Name: "A", Type: 36}, |
| 57 | {Name: "B", Type: 36}, |
| 58 | }, |
| 59 | OutArgs: []ipc.MethodArgument{ |
| 60 | {Name: "Value", Type: 36}, |
| 61 | {Name: "Err", Type: 65}, |
| 62 | }, |
| 63 | } |
| 64 | |
| 65 | result.Methods["Divide"] = ipc.MethodSignature{ |
| 66 | InArgs: []ipc.MethodArgument{ |
| 67 | {Name: "A", Type: 36}, |
| 68 | {Name: "B", Type: 36}, |
| 69 | }, |
| 70 | OutArgs: []ipc.MethodArgument{ |
| 71 | {Name: "Value", Type: 36}, |
| 72 | {Name: "Err", Type: 65}, |
| 73 | }, |
| 74 | } |
| 75 | |
| 76 | result.Methods["StreamingAdd"] = ipc.MethodSignature{ |
| 77 | InArgs: []ipc.MethodArgument{}, |
| 78 | OutArgs: []ipc.MethodArgument{ |
| 79 | {Name: "Value", Type: 36}, |
| 80 | {Name: "Err", Type: 65}, |
| 81 | }, |
| 82 | InStream: 36, |
| 83 | OutStream: 36, |
| 84 | } |
Todd Wang | e5d67b4 | 2014-05-27 10:48:07 -0700 | [diff] [blame] | 85 | result.TypeDefs = []vdl.Any{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 86 | wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}} |
| 87 | |
| 88 | return result, nil |
| 89 | } |
| 90 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 91 | // A function that will register an handlers on the given server |
| 92 | type registerFunc func(ipc.Server) error |
| 93 | |
| 94 | func startServer(registerer registerFunc) (ipc.Server, naming.Endpoint, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 95 | // Create a new server instance. |
| 96 | s, err := r.NewServer() |
| 97 | if err != nil { |
| 98 | return nil, nil, err |
| 99 | } |
| 100 | |
| 101 | // Register the "fortune" prefix with the fortune dispatcher. |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 102 | if err := registerer(s); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 103 | return nil, nil, err |
| 104 | } |
| 105 | |
| 106 | endpoint, err := s.Listen("tcp", "127.0.0.1:0") |
| 107 | if err != nil { |
| 108 | return nil, nil, err |
| 109 | } |
| 110 | return s, endpoint, nil |
| 111 | } |
| 112 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 113 | func startAdderServer() (ipc.Server, naming.Endpoint, error) { |
| 114 | return startServer(func(server ipc.Server) error { |
| 115 | return server.Register("cache", ipc.SoloDispatcher(simpleAdder{}, nil)) |
| 116 | }) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 117 | } |
| 118 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 119 | func startProxy() (*proxy.Proxy, error) { |
| 120 | rid, err := naming.NewRoutingID() |
| 121 | if err != nil { |
| 122 | return nil, err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 123 | } |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 124 | |
| 125 | return proxy.New(rid, nil, "tcp", "127.0.0.1:0") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 126 | } |
| 127 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 128 | func startMountTableServer() (ipc.Server, naming.Endpoint, error) { |
| 129 | return startServer(func(server ipc.Server) error { |
Ryan Brown | d123fa3 | 2014-05-19 10:57:32 -0700 | [diff] [blame] | 130 | mt, err := mounttable.NewMountTable("") |
| 131 | if err != nil { |
| 132 | return err |
| 133 | } |
| 134 | return server.Register("mt", mt) |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 135 | }) |
| 136 | } |
| 137 | |
| 138 | type testWriter struct { |
| 139 | stream []response |
| 140 | buf bytes.Buffer |
| 141 | err error |
| 142 | logger vlog.Logger |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 143 | // If this channel is set then a message will be sent |
| 144 | // to this channel after recieving a call to FinishMessage() |
| 145 | notifier chan bool |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 146 | } |
| 147 | |
| 148 | func (w *testWriter) Write(p []byte) (int, error) { |
| 149 | return w.buf.Write(p) |
| 150 | |
| 151 | } |
| 152 | |
| 153 | func (w *testWriter) getLogger() vlog.Logger { |
| 154 | return w.logger |
| 155 | } |
| 156 | |
| 157 | func (w *testWriter) sendError(err error) { |
| 158 | w.err = err |
| 159 | } |
| 160 | |
| 161 | func (w *testWriter) FinishMessage() error { |
| 162 | var resp response |
| 163 | p := w.buf.Bytes() |
| 164 | w.buf.Reset() |
| 165 | if err := json.Unmarshal(p, &resp); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 166 | return err |
| 167 | } |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 168 | w.stream = append(w.stream, resp) |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 169 | if w.notifier != nil { |
| 170 | w.notifier <- true |
| 171 | } |
| 172 | return nil |
| 173 | } |
| 174 | |
| 175 | // Waits until there is at least n messages in w.stream. Returns an error if we timeout |
| 176 | // waiting for the given number of messages. |
| 177 | func (w *testWriter) waitForMessage(n int) error { |
| 178 | if len(w.stream) >= n { |
| 179 | return nil |
| 180 | } |
| 181 | w.notifier = make(chan bool, 1) |
| 182 | for len(w.stream) < n { |
| 183 | select { |
| 184 | case <-w.notifier: |
| 185 | continue |
| 186 | case <-time.After(time.Second): |
| 187 | return fmt.Errorf("timed out") |
| 188 | } |
| 189 | } |
| 190 | w.notifier = nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 191 | return nil |
| 192 | } |
| 193 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 194 | func checkResponses(w *testWriter, expectedStream []response, err error, t *testing.T) { |
| 195 | if !reflect.DeepEqual(expectedStream, w.stream) { |
| 196 | t.Errorf("streams don't match: expected %v, got %v", expectedStream, w.stream) |
| 197 | } |
| 198 | |
| 199 | if !reflect.DeepEqual(err, w.err) { |
| 200 | t.Errorf("unexpected error, got: %v, expected: %v", err, w.err) |
| 201 | } |
| 202 | } |
| 203 | |
| 204 | var adderServiceSignature JSONServiceSignature = JSONServiceSignature{ |
| 205 | "add": JSONMethodSignature{ |
| 206 | InArgs: []string{"A", "B"}, |
| 207 | NumOutArgs: 2, |
| 208 | IsStreaming: false, |
| 209 | }, |
| 210 | "divide": JSONMethodSignature{ |
| 211 | InArgs: []string{"A", "B"}, |
| 212 | NumOutArgs: 2, |
| 213 | IsStreaming: false, |
| 214 | }, |
| 215 | "streamingAdd": JSONMethodSignature{ |
| 216 | InArgs: []string{}, |
| 217 | NumOutArgs: 2, |
| 218 | IsStreaming: true, |
| 219 | }, |
| 220 | } |
| 221 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 222 | func TestGetGoServerSignature(t *testing.T) { |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 223 | s, endpoint, err := startAdderServer() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 224 | if err != nil { |
| 225 | t.Errorf("unable to start server: %v", err) |
| 226 | t.Fail() |
| 227 | return |
| 228 | } |
| 229 | defer s.Stop() |
| 230 | wspr := NewWSPR(0, "mockVeyronProxyEP") |
| 231 | wspr.setup() |
| 232 | wsp := websocketPipe{ctx: wspr} |
| 233 | wsp.setup() |
| 234 | jsSig, err := wsp.getSignature("/"+endpoint.String()+"/cache", "") |
| 235 | if err != nil { |
| 236 | t.Errorf("Failed to get signature: %v", err) |
| 237 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 238 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 239 | if !reflect.DeepEqual(jsSig, adderServiceSignature) { |
| 240 | t.Errorf("Unexpected signature, got :%v, expected: %v", jsSig, adderServiceSignature) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 241 | } |
| 242 | } |
| 243 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 244 | type goServerTestCase struct { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 245 | method string |
| 246 | inArgs []interface{} |
| 247 | numOutArgs int32 |
| 248 | streamingInputs []string |
| 249 | expectedStream []response |
| 250 | expectedError error |
| 251 | } |
| 252 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 253 | func runGoServerTestCase(t *testing.T, test goServerTestCase) { |
| 254 | s, endpoint, err := startAdderServer() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 255 | if err != nil { |
| 256 | t.Errorf("unable to start server: %v", err) |
| 257 | t.Fail() |
| 258 | return |
| 259 | } |
| 260 | defer s.Stop() |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 261 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 262 | wspr := NewWSPR(0, "mockVeyronProxyEP") |
| 263 | wspr.setup() |
| 264 | wsp := websocketPipe{ctx: wspr} |
| 265 | wsp.setup() |
| 266 | writer := testWriter{ |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 267 | logger: wspr.logger, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 268 | } |
| 269 | |
| 270 | var signal chan ipc.Stream |
| 271 | if len(test.streamingInputs) > 0 { |
| 272 | signal = make(chan ipc.Stream, 1) |
| 273 | wsp.outstandingStreams[0] = startQueueingStream(signal) |
| 274 | go func() { |
| 275 | for _, value := range test.streamingInputs { |
| 276 | wsp.sendOnStream(0, value, &writer) |
| 277 | } |
| 278 | wsp.closeStream(0) |
| 279 | }() |
| 280 | } |
| 281 | |
| 282 | request := veyronRPC{ |
| 283 | Name: "/" + endpoint.String() + "/cache", |
| 284 | Method: test.method, |
| 285 | InArgs: test.inArgs, |
| 286 | NumOutArgs: test.numOutArgs, |
| 287 | IsStreaming: signal != nil, |
| 288 | } |
| 289 | wsp.sendVeyronRequest(0, &request, &writer, signal) |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 290 | checkResponses(&writer, test.expectedStream, test.expectedError, t) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 291 | } |
| 292 | |
| 293 | func TestCallingGoServer(t *testing.T) { |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 294 | runGoServerTestCase(t, goServerTestCase{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 295 | method: "Add", |
| 296 | inArgs: []interface{}{2, 3}, |
| 297 | numOutArgs: 2, |
| 298 | expectedStream: []response{ |
| 299 | response{ |
| 300 | Message: []interface{}{float64(5)}, |
| 301 | Type: responseFinal, |
| 302 | }, |
| 303 | }, |
| 304 | }) |
| 305 | } |
| 306 | |
| 307 | func TestCallingGoServerWithError(t *testing.T) { |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 308 | runGoServerTestCase(t, goServerTestCase{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 309 | method: "Divide", |
| 310 | inArgs: []interface{}{1, 0}, |
| 311 | numOutArgs: 2, |
| 312 | expectedError: verror.BadArgf("can't divide by zero"), |
| 313 | }) |
| 314 | } |
| 315 | |
| 316 | func TestCallingGoWithStreaming(t *testing.T) { |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 317 | runGoServerTestCase(t, goServerTestCase{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 318 | method: "StreamingAdd", |
| 319 | inArgs: []interface{}{}, |
| 320 | streamingInputs: []string{"1", "2", "3", "4"}, |
| 321 | numOutArgs: 2, |
| 322 | expectedStream: []response{ |
| 323 | response{ |
| 324 | Message: 1.0, |
| 325 | Type: responseStream, |
| 326 | }, |
| 327 | response{ |
| 328 | Message: 3.0, |
| 329 | Type: responseStream, |
| 330 | }, |
| 331 | response{ |
| 332 | Message: 6.0, |
| 333 | Type: responseStream, |
| 334 | }, |
| 335 | response{ |
| 336 | Message: 10.0, |
| 337 | Type: responseStream, |
| 338 | }, |
| 339 | response{ |
| 340 | Message: nil, |
| 341 | Type: responseStreamClose, |
| 342 | }, |
| 343 | response{ |
| 344 | Message: []interface{}{10.0}, |
| 345 | Type: responseFinal, |
| 346 | }, |
| 347 | }, |
| 348 | }) |
| 349 | } |
| 350 | |
Shyam Jayaraman | b1b16a8 | 2014-05-19 17:06:30 -0700 | [diff] [blame] | 351 | func TestJavascriptPublish(t *testing.T) { |
| 352 | mounttableServer, endpoint, err := startMountTableServer() |
| 353 | |
| 354 | if err != nil { |
| 355 | t.Errorf("unable to start mounttable: %v", err) |
| 356 | return |
| 357 | } |
| 358 | |
| 359 | defer mounttableServer.Stop() |
| 360 | |
| 361 | proxyServer, err := startProxy() |
| 362 | |
| 363 | if err != nil { |
| 364 | t.Errorf("unable to start proxy: %v", err) |
| 365 | return |
| 366 | } |
| 367 | |
| 368 | defer proxyServer.Shutdown() |
| 369 | |
| 370 | proxyEndpoint := proxyServer.Endpoint().String() |
| 371 | |
| 372 | wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.MountTableRoots{"/" + endpoint.String() + "/mt"}) |
| 373 | wspr.setup() |
| 374 | wsp := websocketPipe{ctx: wspr} |
| 375 | wsp.setup() |
| 376 | defer wsp.cleanup() |
| 377 | writer := testWriter{ |
| 378 | logger: wspr.logger, |
| 379 | } |
| 380 | wsp.publish(publishRequest{ |
| 381 | Name: "adder", |
| 382 | Services: map[string]JSONServiceSignature{ |
| 383 | "adder": adderServiceSignature, |
| 384 | }, |
| 385 | }, &writer) |
| 386 | |
| 387 | if len(writer.stream) != 1 { |
| 388 | t.Errorf("expected only on response, got %d", len(writer.stream)) |
| 389 | } |
| 390 | |
| 391 | resp := writer.stream[0] |
| 392 | |
| 393 | if resp.Type != responseFinal { |
| 394 | t.Errorf("unknown stream message Got: %v, expected: publish response", resp) |
| 395 | return |
| 396 | } |
| 397 | |
| 398 | if msg, ok := resp.Message.(string); ok { |
| 399 | if _, err := r.NewEndpoint(msg); err == nil { |
| 400 | return |
| 401 | } |
| 402 | |
| 403 | } |
| 404 | t.Errorf("invalid endpdoint returned from publish: %v", resp.Message) |
| 405 | } |
| 406 | |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 407 | type jsServerTestCase struct { |
| 408 | method string |
| 409 | inArgs []interface{} |
| 410 | // The set of streaming inputs from the client to the server. |
| 411 | clientStream []interface{} |
| 412 | // The set of streaming outputs from the server to the client. |
| 413 | serverStream []interface{} |
| 414 | finalResponse interface{} |
| 415 | err *verror.Standard |
| 416 | } |
| 417 | |
| 418 | func sendServerStream(t *testing.T, wsp *websocketPipe, test *jsServerTestCase, w clientWriter) { |
| 419 | for _, msg := range test.serverStream { |
| 420 | wsp.sendParsedMessageOnStream(0, msg, w) |
| 421 | } |
| 422 | |
| 423 | serverReply := serverRPCReply{ |
| 424 | Results: []interface{}{test.finalResponse}, |
| 425 | Err: test.err, |
| 426 | } |
| 427 | |
| 428 | bytes, err := json.Marshal(serverReply) |
| 429 | if err != nil { |
| 430 | t.Fatalf("Failed to serialize the reply: %v", err) |
| 431 | } |
| 432 | wsp.handleServerResponse(0, string(bytes)) |
| 433 | } |
| 434 | |
| 435 | func runJsServerTestCase(t *testing.T, test jsServerTestCase) { |
| 436 | mounttableServer, endpoint, err := startMountTableServer() |
| 437 | |
| 438 | if err != nil { |
| 439 | t.Errorf("unable to start mounttable: %v", err) |
| 440 | return |
| 441 | } |
| 442 | |
| 443 | defer mounttableServer.Stop() |
| 444 | |
| 445 | proxyServer, err := startProxy() |
| 446 | |
| 447 | if err != nil { |
| 448 | t.Errorf("unable to start proxy: %v", err) |
| 449 | return |
| 450 | } |
| 451 | |
| 452 | defer proxyServer.Shutdown() |
| 453 | |
| 454 | proxyEndpoint := proxyServer.Endpoint().String() |
| 455 | |
| 456 | wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.MountTableRoots{"/" + endpoint.String() + "/mt"}) |
| 457 | wspr.setup() |
| 458 | wsp := websocketPipe{ctx: wspr} |
| 459 | writer := testWriter{ |
| 460 | logger: wspr.logger, |
| 461 | } |
| 462 | wsp.writerCreator = func(int64) clientWriter { |
| 463 | return &writer |
| 464 | } |
| 465 | wsp.setup() |
| 466 | defer wsp.cleanup() |
| 467 | wsp.publish(publishRequest{ |
| 468 | Name: "adder", |
| 469 | Services: map[string]JSONServiceSignature{ |
| 470 | "adder": adderServiceSignature, |
| 471 | }, |
| 472 | }, &writer) |
| 473 | |
| 474 | if len(writer.stream) != 1 { |
| 475 | t.Errorf("expected only on response, got %d", len(writer.stream)) |
| 476 | } |
| 477 | |
| 478 | resp := writer.stream[0] |
| 479 | |
| 480 | if resp.Type != responseFinal { |
| 481 | t.Errorf("unknown stream message Got: %v, expected: publish response", resp) |
| 482 | return |
| 483 | } |
| 484 | |
| 485 | msg, ok := resp.Message.(string) |
| 486 | if !ok { |
| 487 | t.Errorf("invalid endpdoint returned from publish: %v", resp.Message) |
| 488 | } |
| 489 | |
| 490 | if _, err := r.NewEndpoint(msg); err != nil { |
| 491 | t.Errorf("invalid endpdoint returned from publish: %v", resp.Message) |
| 492 | } |
| 493 | |
| 494 | writer.stream = nil |
| 495 | |
| 496 | // Create a client using wspr's runtime so it points to the right mounttable. |
| 497 | client, err := wspr.rt.NewClient() |
| 498 | |
| 499 | if err != nil { |
| 500 | t.Errorf("unable to create client: %v", err) |
| 501 | } |
| 502 | |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame^] | 503 | call, err := client.StartCall(wspr.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs) |
Shyam Jayaraman | 8deb14e | 2014-05-27 15:45:22 -0700 | [diff] [blame] | 504 | if err != nil { |
| 505 | t.Errorf("failed to start call: %v", err) |
| 506 | } |
| 507 | |
| 508 | expectedWebsocketMessage := []response{ |
| 509 | response{ |
| 510 | Type: serverRequest, |
| 511 | Message: map[string]interface{}{ |
| 512 | "serverId": 0.0, |
| 513 | "serviceName": "adder", |
| 514 | "method": lowercaseFirstCharacter(test.method), |
| 515 | "args": test.inArgs, |
| 516 | "context": map[string]interface{}{ |
| 517 | "name": "adder", |
| 518 | "suffix": "", |
| 519 | }, |
| 520 | }, |
| 521 | }, |
| 522 | } |
| 523 | |
| 524 | // Wait until the rpc has started. |
| 525 | if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil { |
| 526 | t.Errorf("didn't recieve expected message: %v", err) |
| 527 | } |
| 528 | for _, msg := range test.clientStream { |
| 529 | expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStream, Message: msg}) |
| 530 | if err := call.Send(msg); err != nil { |
| 531 | t.Errorf("unexpected error while sending %v: %v", msg, err) |
| 532 | } |
| 533 | } |
| 534 | |
| 535 | if len(test.clientStream) > 0 { |
| 536 | call.CloseSend() |
| 537 | expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStreamClose}) |
| 538 | } |
| 539 | |
| 540 | // Wait until all the streaming messages have been acknowledged. |
| 541 | if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil { |
| 542 | t.Errorf("didn't recieve expected message: %v", err) |
| 543 | } |
| 544 | |
| 545 | expectedStream := test.serverStream |
| 546 | go sendServerStream(t, &wsp, &test, &writer) |
| 547 | for { |
| 548 | var data interface{} |
| 549 | if err := call.Recv(&data); err != nil { |
| 550 | break |
| 551 | } |
| 552 | if len(expectedStream) == 0 { |
| 553 | t.Errorf("unexpected stream value: %v", data) |
| 554 | continue |
| 555 | } |
| 556 | if !reflect.DeepEqual(data, expectedStream[0]) { |
| 557 | t.Errorf("unexpected stream value: got %v, expected %v", data, expectedStream[0]) |
| 558 | } |
| 559 | expectedStream = expectedStream[1:] |
| 560 | } |
| 561 | var result interface{} |
| 562 | var err2 error |
| 563 | |
| 564 | if err := call.Finish(&result, &err2); err != nil { |
| 565 | t.Errorf("unexpected err :%v", err) |
| 566 | } |
| 567 | |
| 568 | if !reflect.DeepEqual(result, test.finalResponse) { |
| 569 | t.Errorf("unexected final response: got %v, expected %v", result, test.finalResponse) |
| 570 | } |
| 571 | |
| 572 | // If err2 is nil and test.err is nil reflect.DeepEqual will return false because the |
| 573 | // types are different. Because of this, we only use reflect.DeepEqual if one of |
| 574 | // the values is non-nil. If both values are nil, then we consider them equal. |
| 575 | if (err2 != nil || test.err != nil) && !reflect.DeepEqual(err2, test.err) { |
| 576 | t.Errorf("unexected error: got %v, expected %v", err2, test.err) |
| 577 | } |
| 578 | checkResponses(&writer, expectedWebsocketMessage, nil, t) |
| 579 | } |
| 580 | |
| 581 | func TestSimpleJSServer(t *testing.T) { |
| 582 | runJsServerTestCase(t, jsServerTestCase{ |
| 583 | method: "Add", |
| 584 | inArgs: []interface{}{1.0, 2.0}, |
| 585 | finalResponse: 3.0, |
| 586 | }) |
| 587 | } |
| 588 | |
| 589 | func TestJSServerWithError(t *testing.T) { |
| 590 | runJsServerTestCase(t, jsServerTestCase{ |
| 591 | method: "Add", |
| 592 | inArgs: []interface{}{1.0, 2.0}, |
| 593 | finalResponse: 3.0, |
| 594 | err: &verror.Standard{ |
| 595 | ID: verror.Internal, |
| 596 | Msg: "JS Server failed", |
| 597 | }, |
| 598 | }) |
| 599 | } |
| 600 | |
| 601 | func TestJSServerWihStreamingInputs(t *testing.T) { |
| 602 | runJsServerTestCase(t, jsServerTestCase{ |
| 603 | method: "StreamingAdd", |
| 604 | inArgs: []interface{}{}, |
| 605 | clientStream: []interface{}{3.0, 4.0}, |
| 606 | finalResponse: 10.0, |
| 607 | }) |
| 608 | } |
| 609 | |
| 610 | func TestJSServerWihStreamingOutputs(t *testing.T) { |
| 611 | runJsServerTestCase(t, jsServerTestCase{ |
| 612 | method: "StreamingAdd", |
| 613 | inArgs: []interface{}{}, |
| 614 | serverStream: []interface{}{3.0, 4.0}, |
| 615 | finalResponse: 10.0, |
| 616 | }) |
| 617 | } |
| 618 | |
| 619 | func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) { |
| 620 | runJsServerTestCase(t, jsServerTestCase{ |
| 621 | method: "StreamingAdd", |
| 622 | inArgs: []interface{}{}, |
| 623 | clientStream: []interface{}{1.0, 2.0}, |
| 624 | serverStream: []interface{}{3.0, 4.0}, |
| 625 | finalResponse: 10.0, |
| 626 | }) |
| 627 | } |
| 628 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 629 | // TODO(bjornick): Make sure that send on stream is nonblocking |