blob: 5c3494b67428a357cc14b2f1f4b974a1afe7d6b6 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "reflect"
9 "strings"
10 "testing"
11 "time"
12
13 _ "veyron/lib/testutil"
14 imanager "veyron/runtimes/google/ipc/stream/manager"
15 "veyron/runtimes/google/ipc/stream/vc"
16 "veyron/runtimes/google/ipc/version"
17 inaming "veyron/runtimes/google/naming"
18 isecurity "veyron/runtimes/google/security"
19 icaveat "veyron/runtimes/google/security/caveat"
20
21 "veyron2"
22 "veyron2/ipc"
23 "veyron2/ipc/stream"
24 "veyron2/naming"
25 "veyron2/security"
26 "veyron2/verror"
27 "veyron2/vlog"
28)
29
30var (
31 errMethod = verror.Abortedf("server returned an error")
32 clientID security.PrivateID
33 serverID security.PrivateID
34)
35
36var errAuthorizer = errors.New("ipc: application Authorizer denied access")
37
38type userType string
39
40type testServer struct{}
41
42func (*testServer) Closure(call ipc.ServerCall) {
43}
44
45func (*testServer) Error(call ipc.ServerCall) error {
46 return errMethod
47}
48
49func (*testServer) Echo(call ipc.ServerCall, arg string) string {
50 return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
51}
52
53func (*testServer) EchoUser(call ipc.ServerCall, arg string, u userType) (string, userType) {
54 return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg), u
55}
56
57func (*testServer) EchoIDs(call ipc.ServerCall) (server, client string) {
58 return fmt.Sprintf("%v", call.LocalID()), fmt.Sprintf("%v", call.RemoteID())
59}
60
61func (*testServer) EchoAndError(call ipc.ServerCall, arg string) (string, error) {
62 result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
63 if arg == "error" {
64 return result, errMethod
65 }
66 return result, nil
67}
68
69func (*testServer) Stream(call ipc.ServerCall, arg string) (string, error) {
70 result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
71 var u userType
72 var err error
73 for err = call.Recv(&u); err == nil; err = call.Recv(&u) {
74 result += " " + string(u)
75 if err := call.Send(u); err != nil {
76 return "", err
77 }
78 }
79 if err == io.EOF {
80 err = nil
81 }
82 return result, err
83}
84
85func (*testServer) Unauthorized(ipc.ServerCall) (string, error) {
86 return "UnauthorizedResult", fmt.Errorf("Unauthorized should never be called")
87}
88
89type testServerAuthorizer struct{}
90
91func (testServerAuthorizer) Authorize(c security.Context) error {
92 if c.Method() != "Unauthorized" {
93 return nil
94 }
95 return errAuthorizer
96}
97
98type testServerDisp struct{ server interface{} }
99
100func (t testServerDisp) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
101 // If suffix is "nilAuth" we use default authorization, if it is "aclAuth" we
102 // use an ACL based authorizer, and otherwise we use the custom testServerAuthorizer.
103 if suffix == "nilAuth" {
104 return ipc.ReflectInvoker(t.server), nil, nil
105 }
106 if suffix == "aclAuth" {
107 // Only authorize clients matching patterns "client" or "server/*".
108 acl := security.ACL{
109 "server/*": security.LabelSet(security.AdminLabel),
110 "client": security.LabelSet(security.AdminLabel),
111 }
112 return ipc.ReflectInvoker(t.server), isecurity.NewACLAuthorizer(acl), nil
113 }
114 return ipc.ReflectInvoker(t.server), testServerAuthorizer{}, nil
115}
116
117func startServer(t *testing.T, serverID security.PrivateID, sm stream.Manager, mt naming.MountTable, ts interface{}) ipc.Server {
118 vlog.VI(1).Info("InternalNewServer")
119 server, err := InternalNewServer(sm, mt, veyron2.LocalID(serverID))
120 if err != nil {
121 t.Errorf("InternalNewServer failed: %v", err)
122 }
123 vlog.VI(1).Info("server.Register")
124 disp := testServerDisp{ts}
125 if err := server.Register("server", disp); err != nil {
126 t.Errorf("server.Register failed: %v", err)
127 }
128 vlog.VI(1).Info("server.Listen")
129 if _, err := server.Listen("tcp", "localhost:0"); err != nil {
130 t.Errorf("server.Listen failed: %v", err)
131 }
132 vlog.VI(1).Info("server.Publish")
133 if err := server.Publish("mountpoint"); err != nil {
134 t.Errorf("server.Publish failed: %v", err)
135 }
136 return server
137}
138
139func verifyMount(t *testing.T, mt naming.MountTable, name string) {
140 if _, err := mt.Resolve(name); err != nil {
141 t.Errorf("%s not found in mounttable", name)
142 }
143}
144
145func verifyMountMissing(t *testing.T, mt naming.MountTable, name string) {
146 if servers, err := mt.Resolve(name); err == nil {
147 t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers))
148 }
149}
150
151func stopServer(t *testing.T, server ipc.Server, mt naming.MountTable) {
152 vlog.VI(1).Info("server.Stop")
153 verifyMount(t, mt, "mountpoint/server")
154
155 // Check that we can still publish.
156 server.Publish("should_appear_in_mt")
157 verifyMount(t, mt, "should_appear_in_mt/server")
158
159 if err := server.Stop(); err != nil {
160 t.Errorf("server.Stop failed: %v", err)
161 }
162 // Check that we can no longer publish after Stop.
163 server.Publish("should_not_appear_in_mt")
164 verifyMountMissing(t, mt, "should_not_appear_in_mt/server")
165
166 verifyMountMissing(t, mt, "mountpoint/server")
167 verifyMountMissing(t, mt, "should_appear_in_mt/server")
168 verifyMountMissing(t, mt, "should_not_appear_in_mt/server")
169 vlog.VI(1).Info("server.Stop DONE")
170}
171
172func createClientAndServer(t *testing.T, clientID, serverID security.PrivateID, ts interface{}) (ipc.Client, ipc.Server, naming.MountTable, stream.Manager) {
173 streamMgr := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
174 mountTable := inaming.InternalNewMountTable()
175 server := startServer(t, serverID, streamMgr, mountTable, ts)
176 client, err := InternalNewClient(streamMgr, mountTable, veyron2.LocalID(clientID))
177 if err != nil {
178 t.Fatalf("InternalNewClient failed: %v", err)
179 }
180 return client, server, mountTable, streamMgr
181}
182
183func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID {
184 id, err := isecurity.NewChainPrivateID("irrelevant")
185 if err != nil {
186 panic(err)
187 }
188 blessedID, err := blessor.Bless(id.PublicID(), name, 5*time.Minute, caveats)
189 if err != nil {
190 panic(err)
191 }
192 derivedID, err := id.Derive(blessedID)
193 if err != nil {
194 panic(err)
195 }
196 return derivedID
197}
198
199func matchesErrorPattern(err error, pattern string) bool {
200 if (len(pattern) == 0) != (err == nil) {
201 return false
202 }
203 return err == nil || strings.Index(err.Error(), pattern) >= 0
204}
205
206func TestStartCall(t *testing.T) {
207 authorizeErr := "has one or more invalid caveats"
208 nameErr := "does not have a name matching the provided pattern"
209
210 cavOnlyV1 := security.UniversalCaveat(&icaveat.PeerIdentity{Peers: []security.PrincipalPattern{"client/v1"}})
211 now := time.Now()
212 cavExpired := security.ServiceCaveat{
213 Service: security.AllPrincipals,
214 Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
215 }
216
217 clientV1ID := derive(clientID, "v1", nil)
218 clientV2ID := derive(clientID, "v2", nil)
219 serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1})
220 serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
221
222 tests := []struct {
223 clientID, serverID security.PrivateID
224 pattern security.PrincipalPattern // pattern on the server identity expected by client.
225 err string
226 }{
227 // Client accepts talking to server only if server's identity matches the
228 // provided pattern.
229 {clientID, serverID, security.AllPrincipals, ""},
230 {clientID, serverID, "server", ""},
231 {clientID, serverID, "server/v1", ""},
232 {clientID, serverID, "anotherServer", nameErr},
233
234 // All clients reject talking to a server with an expired identity.
235 {clientID, serverExpiredID, security.AllPrincipals, authorizeErr},
236 {clientV1ID, serverExpiredID, security.AllPrincipals, authorizeErr},
237 {clientV2ID, serverExpiredID, security.AllPrincipals, authorizeErr},
238
239 // Only clientV1 accepts talking to serverV1.
240 {clientV1ID, serverV1ID, security.AllPrincipals, ""},
241 {clientV2ID, serverV1ID, security.AllPrincipals, authorizeErr},
242 }
243 // Servers and clients will be created per-test, use the same stream manager and mounttable.
244 mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
245 mt := inaming.InternalNewMountTable()
246 for _, test := range tests {
247 name := fmt.Sprintf("(clientID:%q serverID:%q)", test.clientID, test.serverID)
248 server := startServer(t, test.serverID, mgr, mt, &testServer{})
249 client, err := InternalNewClient(mgr, mt, veyron2.LocalID(test.clientID))
250 if err != nil {
251 t.Errorf("%s: Client creation failed: %v", name, err)
252 continue
253 }
254 if _, err := client.StartCall("mountpoint/server/suffix", "irrelevant", nil, veyron2.RemoteID(test.pattern)); !matchesErrorPattern(err, test.err) {
255 t.Errorf(`%s: client.StartCall: got error "%v", want to match "%v"`, name, err, test.err)
256 }
257 client.Close()
258 stopServer(t, server, mt)
259 }
260}
261
262func TestRPC(t *testing.T) {
263 type v []interface{}
264 type testcase struct {
265 name string
266 method string
267 args v
268 streamArgs v
269 startErr error
270 results v
271 finishErr error
272 }
273 tests := []testcase{
274 {"mountpoint/server/suffix", "Closure", nil, nil, nil, nil, nil},
275 {"mountpoint/server/suffix", "Error", nil, nil, nil, v{errMethod}, nil},
276
277 {"mountpoint/server/suffix", "Echo", v{"foo"}, nil, nil, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, nil},
278 {"mountpoint/server/suffix/abc", "Echo", v{"bar"}, nil, nil, v{`method:"Echo",suffix:"suffix/abc",arg:"bar"`}, nil},
279
280 {"mountpoint/server/suffix", "EchoUser", v{"foo", userType("bar")}, nil, nil, v{`method:"EchoUser",suffix:"suffix",arg:"foo"`, userType("bar")}, nil},
281 {"mountpoint/server/suffix/abc", "EchoUser", v{"baz", userType("bla")}, nil, nil, v{`method:"EchoUser",suffix:"suffix/abc",arg:"baz"`, userType("bla")}, nil},
282 {"mountpoint/server/suffix", "Stream", v{"foo"}, v{userType("bar"), userType("baz")}, nil, v{`method:"Stream",suffix:"suffix",arg:"foo" bar baz`, nil}, nil},
283 {"mountpoint/server/suffix/abc", "Stream", v{"123"}, v{userType("456"), userType("789")}, nil, v{`method:"Stream",suffix:"suffix/abc",arg:"123" 456 789`, nil}, nil},
284 {"mountpoint/server/suffix", "EchoIDs", nil, nil, nil, v{"server", "client"}, nil},
285 {"mountpoint/server/suffix", "EchoAndError", v{"bugs bunny"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"bugs bunny"`, nil}, nil},
286 {"mountpoint/server/suffix", "EchoAndError", v{"error"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"error"`, errMethod}, nil},
287 }
288 name := func(t testcase) string {
289 return fmt.Sprintf("%s.%s(%v)", t.name, t.method, t.args)
290 }
291 client, server, mt, _ := createClientAndServer(t, clientID, serverID, &testServer{})
292 defer stopServer(t, server, mt)
293 defer client.Close()
294 for _, test := range tests {
295 vlog.VI(1).Infof("%s client.StartCall", name(test))
296 call, err := client.StartCall(test.name, test.method, test.args)
297 if err != test.startErr {
298 t.Errorf(`%s client.StartCall got error "%v", want "%v"`, name(test), err, test.startErr)
299 continue
300 }
301 for _, sarg := range test.streamArgs {
302 vlog.VI(1).Infof("%s client.Send(%v)", name(test), sarg)
303 if err := call.Send(sarg); err != nil {
304 t.Errorf(`%s call.Send(%v) got unexpected error "%v"`, name(test), sarg, err)
305 }
306 var u userType
307 if err := call.Recv(&u); err != nil {
308 t.Errorf(`%s call.Recv(%v) got unexpected error "%v"`, name(test), sarg, err)
309 }
310 if !reflect.DeepEqual(u, sarg) {
311 t.Errorf("%s call.Recv got value %v, want %v", name(test), u, sarg)
312 }
313 }
314 vlog.VI(1).Infof("%s call.CloseSend", name(test))
315 if err := call.CloseSend(); err != nil {
316 t.Errorf(`%s call.CloseSend got unexpected error "%v"`, name(test), err)
317 }
318 vlog.VI(1).Infof("%s client.Finish", name(test))
319 results := makeResultPtrs(test.results)
320 err = call.Finish(results...)
321 if err != test.finishErr {
322 t.Errorf(`%s call.Finish got error "%v", want "%v"`, name(test), err, test.finishErr)
323 }
324 checkResultPtrs(t, name(test), results, test.results)
325 }
326}
327
328func TestRPCAuthorization(t *testing.T) {
329 cavOnlyEcho := security.ServiceCaveat{
330 Service: security.AllPrincipals,
331 Caveat: &icaveat.MethodRestriction{[]string{"Echo"}},
332 }
333 now := time.Now()
334 cavExpired := security.ServiceCaveat{
335 Service: security.AllPrincipals,
336 Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
337 }
338
339 blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho})
340 blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
341 blessedByClient := derive(clientID, "blessed", nil)
342
343 const (
344 expiredIDErr = "forbids credential from being used at this time"
345 nilAuthErr = "no matching principal pattern found"
346 aclAuthErr = "no matching ACL entry found"
347 )
348 invalidMethodErr := func(method string) string {
349 return fmt.Sprintf("caveat.MethodRestriction{Methods:[Echo]} forbids invocation of method %s", method)
350 }
351
352 type v []interface{}
353 type testcase struct {
354 clientID security.PrivateID
355 name string
356 method string
357 args v
358 results v
359 finishErr string
360 }
361 tests := []testcase{
362 // Clients whose identities have invalid caveats are not by authorized by any authorizer.
363 {blessedByServerExpired, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, expiredIDErr},
364 {blessedByServerExpired, "mountpoint/server/suffix", "Echo", v{"foo"}, v{""}, expiredIDErr},
365 {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Closure", nil, nil, invalidMethodErr("Closure")},
366 {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Closure", nil, nil, invalidMethodErr("Closure")},
367 // Only clients with a trusted name that matches either the server's identity or an identity blessed
368 // by the server are authorized by the (default) nilAuth authorizer.
369 {clientID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, nilAuthErr},
370 {blessedByClient, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, nilAuthErr},
371 {serverID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""},
372 {serverID, "mountpoint/server/nilAuth", "Closure", nil, nil, ""},
373 {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""},
374 // Only clients matching the server's ACL are authorized.
375 {clientID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
376 {blessedByClient, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{""}, aclAuthErr},
377 {serverID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
378 {blessedByServerOnlyEcho, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
379 {clientID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""},
380 {blessedByClient, "mountpoint/server/aclAuth", "Closure", nil, nil, aclAuthErr},
381 {serverID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""},
382
383 // All methods except "Unauthorized" are authorized by the custom authorizer.
384 {clientID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
385 {blessedByClient, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
386 {serverID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
387 {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
388 {clientID, "mountpoint/server/suffix", "Closure", nil, nil, ""},
389 {blessedByClient, "mountpoint/server/suffix", "Closure", nil, nil, ""},
390 {serverID, "mountpoint/server/suffix", "Closure", nil, nil, ""},
391 {clientID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
392 {blessedByClient, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
393 {serverID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
394 }
395 name := func(t testcase) string {
396 return fmt.Sprintf("%q RPCing %s.%s(%v)", t.clientID.PublicID(), t.name, t.method, t.args)
397 }
398
399 dummyClient, server, mt, sm := createClientAndServer(t, nil, serverID, &testServer{})
400 defer stopServer(t, server, mt)
401 defer dummyClient.Close()
402 for _, test := range tests {
403 client, err := InternalNewClient(sm, mt, veyron2.LocalID(test.clientID))
404 if err != nil {
405 t.Fatalf("InternalNewClient failed: %v", err)
406 }
407 defer client.Close()
408 call, err := client.StartCall(test.name, test.method, test.args)
409 if err != nil {
410 t.Errorf(`%s client.StartCall got unexpected error: "%v"`, name(test), err)
411 continue
412 }
413 if err := call.CloseSend(); err != nil {
414 t.Errorf(`%s call.CloseSend got unexpected error: "%v"`, name(test), err)
415 }
416 results := makeResultPtrs(test.results)
417 err = call.Finish(results...)
418 if !matchesErrorPattern(err, test.finishErr) {
419 t.Errorf(`%s call.Finish got error: "%v", want to match: "%v"`, name(test), err, test.finishErr)
420 }
421 }
422}
423
424type cancelTestServer struct {
425 started chan struct{}
426 cancelled chan struct{}
427}
428
429func newCancelTestServer() *cancelTestServer {
430 return &cancelTestServer{
431 started: make(chan struct{}),
432 cancelled: make(chan struct{}),
433 }
434}
435
436func (s *cancelTestServer) CancelStreamReader(call ipc.ServerCall) error {
437 close(s.started)
438 for {
439 var b []byte
440 if err := call.Recv(&b); err != nil && err != io.EOF {
441 return err
442 }
443 if call.IsClosed() {
444 close(s.cancelled)
445 return nil
446 }
447 }
448}
449
450// CancelStreamIgnorer doesn't read from it's input stream so all it's
451// buffers fill. The intention is to show that call.IsClosed is updated
452// even when the stream is stalled.
453func (s *cancelTestServer) CancelStreamIgnorer(call ipc.ServerCall) error {
454 close(s.started)
455 for {
456 time.Sleep(time.Millisecond)
457 if call.IsClosed() {
458 close(s.cancelled)
459 return nil
460 }
461 }
462}
463
464func waitForCancel(t *testing.T, ts *cancelTestServer, call ipc.ClientCall) {
465 <-ts.started
466 call.Cancel()
467 <-ts.cancelled
468}
469
470// TestCancel tests cancellation while the server is reading from a stream.
471func TestCancel(t *testing.T) {
472 ts := newCancelTestServer()
473 client, server, mt, _ := createClientAndServer(t, clientID, serverID, ts)
474 defer stopServer(t, server, mt)
475 defer client.Close()
476
477 call, err := client.StartCall("mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
478 if err != nil {
479 t.Fatalf("Start call failed: %v", err)
480 }
481 for i := 0; i <= 10; i++ {
482 b := []byte{1, 2, 3}
483 if err := call.Send(b); err != nil {
484 t.Errorf("clientCall.Send error %q", err)
485 }
486 }
487 waitForCancel(t, ts, call)
488}
489
490// TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
491// the server is not reading that the cancel message gets through.
492func TestCancelWithFullBuffers(t *testing.T) {
493 ts := newCancelTestServer()
494 client, server, mt, _ := createClientAndServer(t, clientID, serverID, ts)
495 defer stopServer(t, server, mt)
496 defer client.Close()
497
498 call, err := client.StartCall("mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
499 if err != nil {
500 t.Fatalf("Start call failed: %v", err)
501 }
502 // Fill up all the write buffers to ensure that cancelling works even when the stream
503 // is blocked.
504 call.Send(make([]byte, vc.MaxSharedBytes))
505 call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
506
507 waitForCancel(t, ts, call)
508}
509
510type streamRecvInGoroutineServer struct{ c chan error }
511
512func (s *streamRecvInGoroutineServer) RecvInGoroutine(call ipc.ServerCall) error {
513 // Spawn a goroutine to read streaming data from the client.
514 go func() {
515 var i interface{}
516 for {
517 err := call.Recv(&i)
518 if err != nil {
519 s.c <- err
520 return
521 }
522 }
523 }()
524 // Imagine the server did some processing here and now that it is done,
525 // it does not care to see what else the client has to say.
526 return nil
527}
528
529func TestStreamReadTerminatedByServer(t *testing.T) {
530 s := &streamRecvInGoroutineServer{c: make(chan error, 1)}
531 client, server, mt, _ := createClientAndServer(t, clientID, serverID, s)
532 defer stopServer(t, server, mt)
533 defer client.Close()
534
535 call, err := client.StartCall("mountpoint/server/suffix", "RecvInGoroutine", []interface{}{})
536 if err != nil {
537 t.Fatalf("StartCall failed: %v", err)
538 }
539
540 c := make(chan error, 1)
541 go func() {
542 for i := 0; true; i++ {
543 if err := call.Send(i); err != nil {
544 c <- err
545 return
546 }
547 }
548 }()
549
550 // The goroutine at the server executing "Recv" should have terminated
551 // with EOF.
552 if err := <-s.c; err != io.EOF {
553 t.Errorf("Got %v at server, want io.EOF", err)
554 }
555 // The client Send should have failed since the RPC has been
556 // terminated.
557 if err := <-c; err == nil {
558 t.Errorf("Client Send should fail as the server should have closed the flow")
559 }
560}
561
562// TestConnectWithIncompatibleServers tests that clients ignore incompatible endpoints.
563func TestConnectWithIncompatibleServers(t *testing.T) {
564 client, server, mt, _ := createClientAndServer(t, clientID, serverID, &testServer{})
565
566 defer stopServer(t, server, mt)
567 defer client.Close()
568
569 // Publish some incompatible endpoints.
570 publisher := InternalNewPublisher(mt, publishPeriod)
571 defer publisher.WaitForStop()
572 defer publisher.Stop()
573 publisher.AddName("incompatible")
574 publisher.AddServer("/@2@tcp@localhost:10000@@1000000@2000000@@")
575 publisher.AddServer("/@2@tcp@localhost:10001@@2000000@3000000@@")
576
577 _, err := client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"})
578 if !strings.Contains(err.Error(), version.NoCompatibleVersionErr.Error()) {
579 t.Errorf("Expected error %v, found: %v", version.NoCompatibleVersionErr, err)
580 }
581
582 // Now add a server with a compatible endpoint and try again.
583 server.Publish("incompatible")
584
585 call, err := client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"})
586 expected := `method:"Echo",suffix:"suffix",arg:"foo"`
587 var result string
588 err = call.Finish(&result)
589 if err != nil {
590 t.Errorf("Unexpected error finishing call %v", err)
591 }
592 if result != expected {
593 t.Errorf("Wrong result returned. Got %s, wanted %s", result, expected)
594 }
595}
596
597func init() {
598 var err error
599 if clientID, err = isecurity.NewChainPrivateID("client"); err != nil {
600 log.Fatalf("failed isecurity.NewChainPrivateID: %s", err)
601 }
602 if serverID, err = isecurity.NewChainPrivateID("server"); err != nil {
603 log.Fatalf("failed isecurity.NewChainPrivateID: %s", err)
604 }
605 isecurity.TrustIdentityProviders(clientID)
606 isecurity.TrustIdentityProviders(serverID)
607}