blob: 47846730a8e76124837c4bb12e27e18628dff16a [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "errors"
5 "fmt"
6 "io"
Bogdan Caprita187269b2014-05-13 19:59:46 -07007 "net"
Bogdan Caprita783f7792014-05-15 09:29:17 -07008 "os"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "reflect"
10 "strings"
Bogdan Caprita27953142014-05-12 11:41:42 -070011 "sync"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070012 "testing"
13 "time"
14
Bogdan Caprita783f7792014-05-15 09:29:17 -070015 "veyron/lib/testutil"
16 "veyron/lib/testutil/blackbox"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017 imanager "veyron/runtimes/google/ipc/stream/manager"
18 "veyron/runtimes/google/ipc/stream/vc"
19 "veyron/runtimes/google/ipc/version"
Bogdan Caprita187269b2014-05-13 19:59:46 -070020 inaming "veyron/runtimes/google/naming"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070021 isecurity "veyron/runtimes/google/security"
22 icaveat "veyron/runtimes/google/security/caveat"
23
24 "veyron2"
25 "veyron2/ipc"
26 "veyron2/ipc/stream"
27 "veyron2/naming"
28 "veyron2/security"
29 "veyron2/verror"
30 "veyron2/vlog"
31)
32
33var (
34 errMethod = verror.Abortedf("server returned an error")
35 clientID security.PrivateID
36 serverID security.PrivateID
37)
38
39var errAuthorizer = errors.New("ipc: application Authorizer denied access")
40
// userType is a named string type used by the tests to exercise user-defined
// types as RPC arguments, results and stream items.
type userType string
42
43type testServer struct{}
44
45func (*testServer) Closure(call ipc.ServerCall) {
46}
47
48func (*testServer) Error(call ipc.ServerCall) error {
49 return errMethod
50}
51
52func (*testServer) Echo(call ipc.ServerCall, arg string) string {
53 return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
54}
55
56func (*testServer) EchoUser(call ipc.ServerCall, arg string, u userType) (string, userType) {
57 return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg), u
58}
59
60func (*testServer) EchoIDs(call ipc.ServerCall) (server, client string) {
61 return fmt.Sprintf("%v", call.LocalID()), fmt.Sprintf("%v", call.RemoteID())
62}
63
64func (*testServer) EchoAndError(call ipc.ServerCall, arg string) (string, error) {
65 result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
66 if arg == "error" {
67 return result, errMethod
68 }
69 return result, nil
70}
71
72func (*testServer) Stream(call ipc.ServerCall, arg string) (string, error) {
73 result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg)
74 var u userType
75 var err error
76 for err = call.Recv(&u); err == nil; err = call.Recv(&u) {
77 result += " " + string(u)
78 if err := call.Send(u); err != nil {
79 return "", err
80 }
81 }
82 if err == io.EOF {
83 err = nil
84 }
85 return result, err
86}
87
88func (*testServer) Unauthorized(ipc.ServerCall) (string, error) {
89 return "UnauthorizedResult", fmt.Errorf("Unauthorized should never be called")
90}
91
92type testServerAuthorizer struct{}
93
94func (testServerAuthorizer) Authorize(c security.Context) error {
95 if c.Method() != "Unauthorized" {
96 return nil
97 }
98 return errAuthorizer
99}
100
101type testServerDisp struct{ server interface{} }
102
103func (t testServerDisp) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
104 // If suffix is "nilAuth" we use default authorization, if it is "aclAuth" we
105 // use an ACL based authorizer, and otherwise we use the custom testServerAuthorizer.
106 if suffix == "nilAuth" {
107 return ipc.ReflectInvoker(t.server), nil, nil
108 }
109 if suffix == "aclAuth" {
110 // Only authorize clients matching patterns "client" or "server/*".
111 acl := security.ACL{
112 "server/*": security.LabelSet(security.AdminLabel),
113 "client": security.LabelSet(security.AdminLabel),
114 }
Ankur992269a2014-05-13 13:03:24 -0700115 return ipc.ReflectInvoker(t.server), security.NewACLAuthorizer(acl), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700116 }
117 return ipc.ReflectInvoker(t.server), testServerAuthorizer{}, nil
118}
119
// mountTable is a simple, partial implementation of naming.MountTable. In
// particular, it ignores TTLs and does not allow mount names that are a
// prefix of each other.
type mountTable struct {
	sync.Mutex                     // held by the methods while they access mounts
	mounts     map[string][]string // mount name -> servers mounted under it
}
126
127func newMountTable() naming.MountTable {
128 return &mountTable{mounts: make(map[string][]string)}
129}
130
131func (mt *mountTable) Mount(name, server string, _ time.Duration) error {
132 mt.Lock()
133 defer mt.Unlock()
134 for n, _ := range mt.mounts {
135 if n != name && (strings.HasPrefix(name, n) || strings.HasPrefix(n, name)) {
136 return fmt.Errorf("simple mount table does not allow names that are a prefix of each other")
137 }
138 }
139 mt.mounts[name] = append(mt.mounts[name], server)
140 return nil
141}
142
143func (mt *mountTable) Unmount(name, server string) error {
144 var servers []string
145 mt.Lock()
146 defer mt.Unlock()
147 for _, s := range mt.mounts[name] {
148 // When server is "", we remove all servers under name.
149 if len(server) > 0 && s != server {
150 servers = append(servers, s)
151 }
152 }
153 if len(servers) > 0 {
154 mt.mounts[name] = servers
155 } else {
156 delete(mt.mounts, name)
157 }
158 return nil
159}
160
161func (mt *mountTable) Resolve(name string) ([]string, error) {
162 if address, _ := naming.SplitAddressName(name); len(address) > 0 {
163 return []string{name}, nil
164 }
165 mt.Lock()
166 defer mt.Unlock()
167 for prefix, servers := range mt.mounts {
168 if strings.HasPrefix(name, prefix) {
169 suffix := strings.TrimLeft(strings.TrimPrefix(name, prefix), "/")
170 var ret []string
171 for _, s := range servers {
172 ret = append(ret, naming.Join(s, suffix))
173 }
174 return ret, nil
175 }
176 }
177 return nil, verror.NotFoundf("Resolve name %q not found in %v", name, mt.mounts)
178}
179
180func (mt *mountTable) ResolveToMountTable(name string) ([]string, error) {
181 panic("ResolveToMountTable not implemented")
182 return nil, nil
183}
184
185func (mt *mountTable) Unresolve(name string) ([]string, error) {
186 panic("Unresolve not implemented")
187 return nil, nil
188}
189
190func (mt *mountTable) Glob(pattern string) (chan naming.MountEntry, error) {
191 panic("Glob not implemented")
192 return nil, nil
193}
194
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700195func startServer(t *testing.T, serverID security.PrivateID, sm stream.Manager, mt naming.MountTable, ts interface{}) ipc.Server {
196 vlog.VI(1).Info("InternalNewServer")
197 server, err := InternalNewServer(sm, mt, veyron2.LocalID(serverID))
198 if err != nil {
199 t.Errorf("InternalNewServer failed: %v", err)
200 }
201 vlog.VI(1).Info("server.Register")
202 disp := testServerDisp{ts}
203 if err := server.Register("server", disp); err != nil {
204 t.Errorf("server.Register failed: %v", err)
205 }
206 vlog.VI(1).Info("server.Listen")
207 if _, err := server.Listen("tcp", "localhost:0"); err != nil {
208 t.Errorf("server.Listen failed: %v", err)
209 }
210 vlog.VI(1).Info("server.Publish")
211 if err := server.Publish("mountpoint"); err != nil {
212 t.Errorf("server.Publish failed: %v", err)
213 }
214 return server
215}
216
217func verifyMount(t *testing.T, mt naming.MountTable, name string) {
218 if _, err := mt.Resolve(name); err != nil {
219 t.Errorf("%s not found in mounttable", name)
220 }
221}
222
223func verifyMountMissing(t *testing.T, mt naming.MountTable, name string) {
224 if servers, err := mt.Resolve(name); err == nil {
225 t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers))
226 }
227}
228
229func stopServer(t *testing.T, server ipc.Server, mt naming.MountTable) {
230 vlog.VI(1).Info("server.Stop")
231 verifyMount(t, mt, "mountpoint/server")
232
233 // Check that we can still publish.
234 server.Publish("should_appear_in_mt")
235 verifyMount(t, mt, "should_appear_in_mt/server")
236
237 if err := server.Stop(); err != nil {
238 t.Errorf("server.Stop failed: %v", err)
239 }
240 // Check that we can no longer publish after Stop.
241 server.Publish("should_not_appear_in_mt")
242 verifyMountMissing(t, mt, "should_not_appear_in_mt/server")
243
244 verifyMountMissing(t, mt, "mountpoint/server")
245 verifyMountMissing(t, mt, "should_appear_in_mt/server")
246 verifyMountMissing(t, mt, "should_not_appear_in_mt/server")
247 vlog.VI(1).Info("server.Stop DONE")
248}
249
Bogdan Caprita27953142014-05-12 11:41:42 -0700250type bundle struct {
251 client ipc.Client
252 server ipc.Server
253 mt naming.MountTable
254 sm stream.Manager
255}
256
257func (b bundle) cleanup(t *testing.T) {
258 stopServer(t, b.server, b.mt)
259 b.client.Close()
260}
261
262func createBundle(t *testing.T, clientID, serverID security.PrivateID, ts interface{}) (b bundle) {
263 b.sm = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
264 b.mt = newMountTable()
265 b.server = startServer(t, serverID, b.sm, b.mt, ts)
266 var err error
267 b.client, err = InternalNewClient(b.sm, b.mt, veyron2.LocalID(clientID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700268 if err != nil {
269 t.Fatalf("InternalNewClient failed: %v", err)
270 }
Bogdan Caprita27953142014-05-12 11:41:42 -0700271 return
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700272}
273
Bogdan Caprita783f7792014-05-15 09:29:17 -0700274func derive(blessor security.PrivateID, name string, caveats ...security.ServiceCaveat) security.PrivateID {
Asim Shankar70a28872014-05-13 14:49:20 -0700275 id, err := isecurity.NewPrivateID("irrelevant")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700276 if err != nil {
277 panic(err)
278 }
279 blessedID, err := blessor.Bless(id.PublicID(), name, 5*time.Minute, caveats)
280 if err != nil {
281 panic(err)
282 }
283 derivedID, err := id.Derive(blessedID)
284 if err != nil {
285 panic(err)
286 }
287 return derivedID
288}
289
// matchesErrorPattern reports whether err agrees with pattern. An empty
// pattern means "no error expected" and matches only a nil err. A non-empty
// pattern matches only a non-nil err whose message contains pattern.
func matchesErrorPattern(err error, pattern string) bool {
	if err == nil {
		return pattern == ""
	}
	return pattern != "" && strings.Contains(err.Error(), pattern)
}
296
297func TestStartCall(t *testing.T) {
298 authorizeErr := "has one or more invalid caveats"
299 nameErr := "does not have a name matching the provided pattern"
300
301 cavOnlyV1 := security.UniversalCaveat(&icaveat.PeerIdentity{Peers: []security.PrincipalPattern{"client/v1"}})
302 now := time.Now()
303 cavExpired := security.ServiceCaveat{
304 Service: security.AllPrincipals,
305 Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
306 }
307
Bogdan Caprita783f7792014-05-15 09:29:17 -0700308 clientV1ID := derive(clientID, "v1")
309 clientV2ID := derive(clientID, "v2")
310 serverV1ID := derive(serverID, "v1", cavOnlyV1)
311 serverExpiredID := derive(serverID, "expired", cavExpired)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700312
313 tests := []struct {
314 clientID, serverID security.PrivateID
315 pattern security.PrincipalPattern // pattern on the server identity expected by client.
316 err string
317 }{
318 // Client accepts talking to server only if server's identity matches the
319 // provided pattern.
320 {clientID, serverID, security.AllPrincipals, ""},
321 {clientID, serverID, "server", ""},
322 {clientID, serverID, "server/v1", ""},
323 {clientID, serverID, "anotherServer", nameErr},
324
325 // All clients reject talking to a server with an expired identity.
326 {clientID, serverExpiredID, security.AllPrincipals, authorizeErr},
327 {clientV1ID, serverExpiredID, security.AllPrincipals, authorizeErr},
328 {clientV2ID, serverExpiredID, security.AllPrincipals, authorizeErr},
329
330 // Only clientV1 accepts talking to serverV1.
331 {clientV1ID, serverV1ID, security.AllPrincipals, ""},
332 {clientV2ID, serverV1ID, security.AllPrincipals, authorizeErr},
333 }
334 // Servers and clients will be created per-test, use the same stream manager and mounttable.
335 mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
Bogdan Caprita27953142014-05-12 11:41:42 -0700336 mt := newMountTable()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700337 for _, test := range tests {
338 name := fmt.Sprintf("(clientID:%q serverID:%q)", test.clientID, test.serverID)
339 server := startServer(t, test.serverID, mgr, mt, &testServer{})
340 client, err := InternalNewClient(mgr, mt, veyron2.LocalID(test.clientID))
341 if err != nil {
342 t.Errorf("%s: Client creation failed: %v", name, err)
Bogdan Caprita187269b2014-05-13 19:59:46 -0700343 stopServer(t, server, mt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700344 continue
345 }
346 if _, err := client.StartCall("mountpoint/server/suffix", "irrelevant", nil, veyron2.RemoteID(test.pattern)); !matchesErrorPattern(err, test.err) {
347 t.Errorf(`%s: client.StartCall: got error "%v", want to match "%v"`, name, err, test.err)
348 }
349 client.Close()
350 stopServer(t, server, mt)
351 }
352}
353
354func TestRPC(t *testing.T) {
355 type v []interface{}
356 type testcase struct {
357 name string
358 method string
359 args v
360 streamArgs v
361 startErr error
362 results v
363 finishErr error
364 }
365 tests := []testcase{
366 {"mountpoint/server/suffix", "Closure", nil, nil, nil, nil, nil},
367 {"mountpoint/server/suffix", "Error", nil, nil, nil, v{errMethod}, nil},
368
369 {"mountpoint/server/suffix", "Echo", v{"foo"}, nil, nil, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, nil},
370 {"mountpoint/server/suffix/abc", "Echo", v{"bar"}, nil, nil, v{`method:"Echo",suffix:"suffix/abc",arg:"bar"`}, nil},
371
372 {"mountpoint/server/suffix", "EchoUser", v{"foo", userType("bar")}, nil, nil, v{`method:"EchoUser",suffix:"suffix",arg:"foo"`, userType("bar")}, nil},
373 {"mountpoint/server/suffix/abc", "EchoUser", v{"baz", userType("bla")}, nil, nil, v{`method:"EchoUser",suffix:"suffix/abc",arg:"baz"`, userType("bla")}, nil},
374 {"mountpoint/server/suffix", "Stream", v{"foo"}, v{userType("bar"), userType("baz")}, nil, v{`method:"Stream",suffix:"suffix",arg:"foo" bar baz`, nil}, nil},
375 {"mountpoint/server/suffix/abc", "Stream", v{"123"}, v{userType("456"), userType("789")}, nil, v{`method:"Stream",suffix:"suffix/abc",arg:"123" 456 789`, nil}, nil},
376 {"mountpoint/server/suffix", "EchoIDs", nil, nil, nil, v{"server", "client"}, nil},
377 {"mountpoint/server/suffix", "EchoAndError", v{"bugs bunny"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"bugs bunny"`, nil}, nil},
378 {"mountpoint/server/suffix", "EchoAndError", v{"error"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"error"`, errMethod}, nil},
379 }
380 name := func(t testcase) string {
381 return fmt.Sprintf("%s.%s(%v)", t.name, t.method, t.args)
382 }
Bogdan Caprita27953142014-05-12 11:41:42 -0700383 b := createBundle(t, clientID, serverID, &testServer{})
384 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700385 for _, test := range tests {
386 vlog.VI(1).Infof("%s client.StartCall", name(test))
Bogdan Caprita27953142014-05-12 11:41:42 -0700387 call, err := b.client.StartCall(test.name, test.method, test.args)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700388 if err != test.startErr {
389 t.Errorf(`%s client.StartCall got error "%v", want "%v"`, name(test), err, test.startErr)
390 continue
391 }
392 for _, sarg := range test.streamArgs {
393 vlog.VI(1).Infof("%s client.Send(%v)", name(test), sarg)
394 if err := call.Send(sarg); err != nil {
395 t.Errorf(`%s call.Send(%v) got unexpected error "%v"`, name(test), sarg, err)
396 }
397 var u userType
398 if err := call.Recv(&u); err != nil {
399 t.Errorf(`%s call.Recv(%v) got unexpected error "%v"`, name(test), sarg, err)
400 }
401 if !reflect.DeepEqual(u, sarg) {
402 t.Errorf("%s call.Recv got value %v, want %v", name(test), u, sarg)
403 }
404 }
405 vlog.VI(1).Infof("%s call.CloseSend", name(test))
406 if err := call.CloseSend(); err != nil {
407 t.Errorf(`%s call.CloseSend got unexpected error "%v"`, name(test), err)
408 }
409 vlog.VI(1).Infof("%s client.Finish", name(test))
410 results := makeResultPtrs(test.results)
411 err = call.Finish(results...)
412 if err != test.finishErr {
413 t.Errorf(`%s call.Finish got error "%v", want "%v"`, name(test), err, test.finishErr)
414 }
415 checkResultPtrs(t, name(test), results, test.results)
416 }
417}
418
419func TestRPCAuthorization(t *testing.T) {
420 cavOnlyEcho := security.ServiceCaveat{
421 Service: security.AllPrincipals,
422 Caveat: &icaveat.MethodRestriction{[]string{"Echo"}},
423 }
424 now := time.Now()
425 cavExpired := security.ServiceCaveat{
426 Service: security.AllPrincipals,
427 Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
428 }
429
Bogdan Caprita783f7792014-05-15 09:29:17 -0700430 blessedByServerOnlyEcho := derive(serverID, "onlyEcho", cavOnlyEcho)
431 blessedByServerExpired := derive(serverID, "expired", cavExpired)
432 blessedByClient := derive(clientID, "blessed")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700433
434 const (
435 expiredIDErr = "forbids credential from being used at this time"
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700436 aclAuthErr = "no matching ACL entry found"
437 )
438 invalidMethodErr := func(method string) string {
439 return fmt.Sprintf("caveat.MethodRestriction{Methods:[Echo]} forbids invocation of method %s", method)
440 }
441
442 type v []interface{}
443 type testcase struct {
444 clientID security.PrivateID
445 name string
446 method string
447 args v
448 results v
449 finishErr string
450 }
451 tests := []testcase{
452 // Clients whose identities have invalid caveats are not by authorized by any authorizer.
453 {blessedByServerExpired, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, expiredIDErr},
454 {blessedByServerExpired, "mountpoint/server/suffix", "Echo", v{"foo"}, v{""}, expiredIDErr},
455 {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Closure", nil, nil, invalidMethodErr("Closure")},
456 {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Closure", nil, nil, invalidMethodErr("Closure")},
457 // Only clients with a trusted name that matches either the server's identity or an identity blessed
458 // by the server are authorized by the (default) nilAuth authorizer.
Ankur992269a2014-05-13 13:03:24 -0700459 {clientID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, aclAuthErr},
460 {blessedByClient, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, aclAuthErr},
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700461 {serverID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""},
462 {serverID, "mountpoint/server/nilAuth", "Closure", nil, nil, ""},
463 {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""},
464 // Only clients matching the server's ACL are authorized.
465 {clientID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
466 {blessedByClient, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{""}, aclAuthErr},
467 {serverID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
468 {blessedByServerOnlyEcho, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""},
469 {clientID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""},
470 {blessedByClient, "mountpoint/server/aclAuth", "Closure", nil, nil, aclAuthErr},
471 {serverID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""},
472
473 // All methods except "Unauthorized" are authorized by the custom authorizer.
474 {clientID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
475 {blessedByClient, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
476 {serverID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
477 {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""},
478 {clientID, "mountpoint/server/suffix", "Closure", nil, nil, ""},
479 {blessedByClient, "mountpoint/server/suffix", "Closure", nil, nil, ""},
480 {serverID, "mountpoint/server/suffix", "Closure", nil, nil, ""},
481 {clientID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
482 {blessedByClient, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
483 {serverID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"},
484 }
485 name := func(t testcase) string {
486 return fmt.Sprintf("%q RPCing %s.%s(%v)", t.clientID.PublicID(), t.name, t.method, t.args)
487 }
488
Bogdan Caprita27953142014-05-12 11:41:42 -0700489 b := createBundle(t, nil, serverID, &testServer{})
490 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700491 for _, test := range tests {
Bogdan Caprita27953142014-05-12 11:41:42 -0700492 client, err := InternalNewClient(b.sm, b.mt, veyron2.LocalID(test.clientID))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700493 if err != nil {
494 t.Fatalf("InternalNewClient failed: %v", err)
495 }
496 defer client.Close()
497 call, err := client.StartCall(test.name, test.method, test.args)
498 if err != nil {
499 t.Errorf(`%s client.StartCall got unexpected error: "%v"`, name(test), err)
500 continue
501 }
502 if err := call.CloseSend(); err != nil {
503 t.Errorf(`%s call.CloseSend got unexpected error: "%v"`, name(test), err)
504 }
505 results := makeResultPtrs(test.results)
506 err = call.Finish(results...)
507 if !matchesErrorPattern(err, test.finishErr) {
508 t.Errorf(`%s call.Finish got error: "%v", want to match: "%v"`, name(test), err, test.finishErr)
509 }
510 }
511}
512
// cancelTestServer is the server object used by the cancellation tests. Its
// methods close started when they begin running and close cancelled once they
// observe that the call has been closed.
type cancelTestServer struct {
	started   chan struct{}
	cancelled chan struct{}
}

// newCancelTestServer returns a cancelTestServer whose two signalling
// channels are open and unbuffered.
func newCancelTestServer() *cancelTestServer {
	s := new(cancelTestServer)
	s.started = make(chan struct{})
	s.cancelled = make(chan struct{})
	return s
}
524
525func (s *cancelTestServer) CancelStreamReader(call ipc.ServerCall) error {
526 close(s.started)
527 for {
528 var b []byte
529 if err := call.Recv(&b); err != nil && err != io.EOF {
530 return err
531 }
532 if call.IsClosed() {
533 close(s.cancelled)
534 return nil
535 }
536 }
537}
538
539// CancelStreamIgnorer doesn't read from it's input stream so all it's
540// buffers fill. The intention is to show that call.IsClosed is updated
541// even when the stream is stalled.
542func (s *cancelTestServer) CancelStreamIgnorer(call ipc.ServerCall) error {
543 close(s.started)
544 for {
545 time.Sleep(time.Millisecond)
546 if call.IsClosed() {
547 close(s.cancelled)
548 return nil
549 }
550 }
551}
552
553func waitForCancel(t *testing.T, ts *cancelTestServer, call ipc.ClientCall) {
554 <-ts.started
555 call.Cancel()
556 <-ts.cancelled
557}
558
559// TestCancel tests cancellation while the server is reading from a stream.
560func TestCancel(t *testing.T) {
561 ts := newCancelTestServer()
Bogdan Caprita27953142014-05-12 11:41:42 -0700562 b := createBundle(t, clientID, serverID, ts)
563 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700564
Bogdan Caprita27953142014-05-12 11:41:42 -0700565 call, err := b.client.StartCall("mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700566 if err != nil {
567 t.Fatalf("Start call failed: %v", err)
568 }
569 for i := 0; i <= 10; i++ {
570 b := []byte{1, 2, 3}
571 if err := call.Send(b); err != nil {
572 t.Errorf("clientCall.Send error %q", err)
573 }
574 }
575 waitForCancel(t, ts, call)
576}
577
578// TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
579// the server is not reading that the cancel message gets through.
580func TestCancelWithFullBuffers(t *testing.T) {
581 ts := newCancelTestServer()
Bogdan Caprita27953142014-05-12 11:41:42 -0700582 b := createBundle(t, clientID, serverID, ts)
583 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700584
Bogdan Caprita27953142014-05-12 11:41:42 -0700585 call, err := b.client.StartCall("mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700586 if err != nil {
587 t.Fatalf("Start call failed: %v", err)
588 }
589 // Fill up all the write buffers to ensure that cancelling works even when the stream
590 // is blocked.
591 call.Send(make([]byte, vc.MaxSharedBytes))
592 call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
593
594 waitForCancel(t, ts, call)
595}
596
597type streamRecvInGoroutineServer struct{ c chan error }
598
599func (s *streamRecvInGoroutineServer) RecvInGoroutine(call ipc.ServerCall) error {
600 // Spawn a goroutine to read streaming data from the client.
601 go func() {
602 var i interface{}
603 for {
604 err := call.Recv(&i)
605 if err != nil {
606 s.c <- err
607 return
608 }
609 }
610 }()
611 // Imagine the server did some processing here and now that it is done,
612 // it does not care to see what else the client has to say.
613 return nil
614}
615
616func TestStreamReadTerminatedByServer(t *testing.T) {
617 s := &streamRecvInGoroutineServer{c: make(chan error, 1)}
Bogdan Caprita27953142014-05-12 11:41:42 -0700618 b := createBundle(t, clientID, serverID, s)
619 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700620
Bogdan Caprita27953142014-05-12 11:41:42 -0700621 call, err := b.client.StartCall("mountpoint/server/suffix", "RecvInGoroutine", []interface{}{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700622 if err != nil {
623 t.Fatalf("StartCall failed: %v", err)
624 }
625
626 c := make(chan error, 1)
627 go func() {
628 for i := 0; true; i++ {
629 if err := call.Send(i); err != nil {
630 c <- err
631 return
632 }
633 }
634 }()
635
636 // The goroutine at the server executing "Recv" should have terminated
637 // with EOF.
638 if err := <-s.c; err != io.EOF {
639 t.Errorf("Got %v at server, want io.EOF", err)
640 }
641 // The client Send should have failed since the RPC has been
642 // terminated.
643 if err := <-c; err == nil {
644 t.Errorf("Client Send should fail as the server should have closed the flow")
645 }
646}
647
648// TestConnectWithIncompatibleServers tests that clients ignore incompatible endpoints.
649func TestConnectWithIncompatibleServers(t *testing.T) {
Bogdan Caprita27953142014-05-12 11:41:42 -0700650 b := createBundle(t, clientID, serverID, &testServer{})
651 defer b.cleanup(t)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700652
653 // Publish some incompatible endpoints.
Bogdan Caprita27953142014-05-12 11:41:42 -0700654 publisher := InternalNewPublisher(b.mt, publishPeriod)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700655 defer publisher.WaitForStop()
656 defer publisher.Stop()
657 publisher.AddName("incompatible")
658 publisher.AddServer("/@2@tcp@localhost:10000@@1000000@2000000@@")
659 publisher.AddServer("/@2@tcp@localhost:10001@@2000000@3000000@@")
660
Bogdan Caprita27953142014-05-12 11:41:42 -0700661 _, err := b.client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700662 if !strings.Contains(err.Error(), version.NoCompatibleVersionErr.Error()) {
663 t.Errorf("Expected error %v, found: %v", version.NoCompatibleVersionErr, err)
664 }
665
666 // Now add a server with a compatible endpoint and try again.
Bogdan Caprita27953142014-05-12 11:41:42 -0700667 b.server.Publish("incompatible")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700668
Bogdan Caprita27953142014-05-12 11:41:42 -0700669 call, err := b.client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700670 if err != nil {
Asim Shankar3a8a7e22014-05-12 18:01:44 -0700671 t.Fatal(err)
672 }
673 var result string
674 if err = call.Finish(&result); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700675 t.Errorf("Unexpected error finishing call %v", err)
676 }
Asim Shankar3a8a7e22014-05-12 18:01:44 -0700677 expected := `method:"Echo",suffix:"suffix",arg:"foo"`
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700678 if result != expected {
679 t.Errorf("Wrong result returned. Got %s, wanted %s", result, expected)
680 }
681}
682
Bogdan Caprita187269b2014-05-13 19:59:46 -0700683// TestPublishOptions verifies that the options that are relevant to how
684// a server publishes its endpoints have the right effect.
685func TestPublishOptions(t *testing.T) {
686 sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
687 mt := newMountTable()
688 cases := []struct {
689 opts []ipc.ServerOpt
690 expect []string
691 }{
692 {[]ipc.ServerOpt{}, []string{"127.0.0.1", "127.0.0.1"}},
693 {[]ipc.ServerOpt{veyron2.PublishAll}, []string{"127.0.0.1", "127.0.0.1"}},
694 {[]ipc.ServerOpt{veyron2.PublishFirst}, []string{"127.0.0.1"}},
Bogdan Caprita502de9d2014-05-14 18:48:06 -0700695 {[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example1.com"), veyron2.EndpointRewriteOpt("example2.com")}, []string{"example2.com", "example2.com"}},
Bogdan Caprita187269b2014-05-13 19:59:46 -0700696 {[]ipc.ServerOpt{veyron2.PublishFirst, veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com"}},
697 }
698 for i, c := range cases {
699 server, err := InternalNewServer(sm, mt, append([]ipc.ServerOpt{veyron2.LocalID(serverID)}, c.opts...)...)
700 if err != nil {
701 t.Errorf("InternalNewServer failed: %v", err)
702 continue
703 }
704 if _, err := server.Listen("tcp", "localhost:0"); err != nil {
705 t.Errorf("server.Listen failed: %v", err)
706 server.Stop()
707 continue
708 }
709 if _, err := server.Listen("tcp", "localhost:0"); err != nil {
710 t.Errorf("server.Listen failed: %v", err)
711 server.Stop()
712 continue
713 }
714 if err := server.Publish("mountpoint"); err != nil {
715 t.Errorf("server.Publish failed: %v", err)
716 server.Stop()
717 continue
718 }
719 servers, err := mt.Resolve("mountpoint")
720 if err != nil {
721 t.Errorf("mountpoint not found in mounttable")
722 server.Stop()
723 continue
724 }
725 var got []string
726 for _, s := range servers {
727 address, _ := naming.SplitAddressName(s)
728 ep, err := inaming.NewEndpoint(address)
729 if err != nil {
730 t.Errorf("case #%d: server with invalid endpoint %q: %v", i, address, err)
731 continue
732 }
733 host, _, err := net.SplitHostPort(ep.Addr().String())
734 if err != nil {
735 t.Errorf("case #%d: server endpoint with invalid address %q: %v", i, ep.Addr(), err)
736 continue
737 }
738 got = append(got, host)
739 }
740 if want := c.expect; !reflect.DeepEqual(want, got) {
741 t.Errorf("case #%d: expected mounted servers with addresses %q, got %q instead", i, want, got)
742 }
743 server.Stop()
744 }
745}
746
Bogdan Caprita783f7792014-05-15 09:29:17 -0700747// TestReconnect verifies that the client transparently re-establishes the
748// connection to the server if the server dies and comes back (on the same
749// endpoint).
750func TestReconnect(t *testing.T) {
751 b := createBundle(t, clientID, nil, nil) // We only need the client from the bundle.
752 defer b.cleanup(t)
753 idFile := testutil.SaveIdentityToFile(derive(clientID, "server"))
754 server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0", idFile)
755 server.Cmd.Start()
756 addr, err := server.ReadLineFromChild()
757 if err != nil {
758 t.Fatalf("Failed to read server address from process: %v", err)
759 }
760 ep, err := inaming.NewEndpoint(addr)
761 if err != nil {
762 t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
763 }
764 serverName := naming.JoinAddressName(ep.String(), "server/suffix")
765 makeCall := func() (string, error) {
766 call, err := b.client.StartCall(serverName, "Echo", []interface{}{"bratman"})
767 if err != nil {
768 return "", err
769 }
770 var result string
771 if err = call.Finish(&result); err != nil {
772 return "", err
773 }
774 return result, nil
775 }
776 expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
777 if result, err := makeCall(); err != nil || result != expected {
778 t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
779 }
780 // Kill the server, verify client can't talk to it anymore.
781 server.Cleanup()
782 if _, err := makeCall(); err == nil {
783 t.Fatal("Expected call to fail since server is dead")
784 }
785 // Resurrect the server with the same address, verify client
786 // re-establishes the connection.
787 server = blackbox.HelperCommand(t, "runServer", addr, idFile)
788 defer server.Cleanup()
789 server.Cmd.Start()
790 if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
791 t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
792 }
793 if result, err := makeCall(); err != nil || result != expected {
794 t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
795 }
796}
797
798func loadIdentityFromFile(file string) security.PrivateID {
799 f, err := os.Open(file)
800 if err != nil {
801 vlog.Fatalf("failed to open %v: %v", file, err)
802 }
803 id, err := security.LoadIdentity(f)
804 f.Close()
805 if err != nil {
806 vlog.Fatalf("Failed to load identity from %v: %v", file, err)
807 }
808 return id
809}
810
811func runServer(argv []string) {
812 mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
813 mt := newMountTable()
814 id := loadIdentityFromFile(argv[1])
815 isecurity.TrustIdentityProviders(id)
816 server, err := InternalNewServer(mgr, mt, veyron2.LocalID(id))
817 if err != nil {
818 vlog.Fatalf("InternalNewServer failed: %v", err)
819 }
820 disp := testServerDisp{new(testServer)}
821 if err := server.Register("server", disp); err != nil {
822 vlog.Fatalf("server.Register failed: %v", err)
823 }
824 ep, err := server.Listen("tcp", argv[0])
825 if err != nil {
826 vlog.Fatalf("server.Listen failed: %v", err)
827 }
828 fmt.Println(ep.Addr())
829 // Live forever (parent process should explicitly kill us).
830 <-make(chan struct{})
831}
832
833// Required by blackbox framework.
834func TestHelperProcess(t *testing.T) {
835 blackbox.HelperProcess(t)
836}
837
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838func init() {
839 var err error
Asim Shankar70a28872014-05-13 14:49:20 -0700840 if clientID, err = isecurity.NewPrivateID("client"); err != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700841 vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700842 }
Asim Shankar70a28872014-05-13 14:49:20 -0700843 if serverID, err = isecurity.NewPrivateID("server"); err != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700844 vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700845 }
846 isecurity.TrustIdentityProviders(clientID)
847 isecurity.TrustIdentityProviders(serverID)
Bogdan Caprita783f7792014-05-15 09:29:17 -0700848
849 blackbox.CommandTable["runServer"] = runServer
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700850}