Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame^] | 1 | package ipc |
| 2 | |
| 3 | import ( |
| 4 | "errors" |
| 5 | "fmt" |
| 6 | "io" |
| 7 | "log" |
| 8 | "reflect" |
| 9 | "strings" |
| 10 | "testing" |
| 11 | "time" |
| 12 | |
| 13 | _ "veyron/lib/testutil" |
| 14 | imanager "veyron/runtimes/google/ipc/stream/manager" |
| 15 | "veyron/runtimes/google/ipc/stream/vc" |
| 16 | "veyron/runtimes/google/ipc/version" |
| 17 | inaming "veyron/runtimes/google/naming" |
| 18 | isecurity "veyron/runtimes/google/security" |
| 19 | icaveat "veyron/runtimes/google/security/caveat" |
| 20 | |
| 21 | "veyron2" |
| 22 | "veyron2/ipc" |
| 23 | "veyron2/ipc/stream" |
| 24 | "veyron2/naming" |
| 25 | "veyron2/security" |
| 26 | "veyron2/verror" |
| 27 | "veyron2/vlog" |
| 28 | ) |
| 29 | |
| 30 | var ( |
| 31 | errMethod = verror.Abortedf("server returned an error") |
| 32 | clientID security.PrivateID |
| 33 | serverID security.PrivateID |
| 34 | ) |
| 35 | |
| 36 | var errAuthorizer = errors.New("ipc: application Authorizer denied access") |
| 37 | |
type (
	// userType is a named string type used by the tests as an argument,
	// a result and a streamed item.
	userType string

	// testServer is the receiver of the RPC methods exercised by the tests.
	testServer struct{}
)
| 41 | |
| 42 | func (*testServer) Closure(call ipc.ServerCall) { |
| 43 | } |
| 44 | |
| 45 | func (*testServer) Error(call ipc.ServerCall) error { |
| 46 | return errMethod |
| 47 | } |
| 48 | |
| 49 | func (*testServer) Echo(call ipc.ServerCall, arg string) string { |
| 50 | return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg) |
| 51 | } |
| 52 | |
| 53 | func (*testServer) EchoUser(call ipc.ServerCall, arg string, u userType) (string, userType) { |
| 54 | return fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg), u |
| 55 | } |
| 56 | |
| 57 | func (*testServer) EchoIDs(call ipc.ServerCall) (server, client string) { |
| 58 | return fmt.Sprintf("%v", call.LocalID()), fmt.Sprintf("%v", call.RemoteID()) |
| 59 | } |
| 60 | |
| 61 | func (*testServer) EchoAndError(call ipc.ServerCall, arg string) (string, error) { |
| 62 | result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg) |
| 63 | if arg == "error" { |
| 64 | return result, errMethod |
| 65 | } |
| 66 | return result, nil |
| 67 | } |
| 68 | |
| 69 | func (*testServer) Stream(call ipc.ServerCall, arg string) (string, error) { |
| 70 | result := fmt.Sprintf("method:%q,suffix:%q,arg:%q", call.Method(), call.Suffix(), arg) |
| 71 | var u userType |
| 72 | var err error |
| 73 | for err = call.Recv(&u); err == nil; err = call.Recv(&u) { |
| 74 | result += " " + string(u) |
| 75 | if err := call.Send(u); err != nil { |
| 76 | return "", err |
| 77 | } |
| 78 | } |
| 79 | if err == io.EOF { |
| 80 | err = nil |
| 81 | } |
| 82 | return result, err |
| 83 | } |
| 84 | |
| 85 | func (*testServer) Unauthorized(ipc.ServerCall) (string, error) { |
| 86 | return "UnauthorizedResult", fmt.Errorf("Unauthorized should never be called") |
| 87 | } |
| 88 | |
| 89 | type testServerAuthorizer struct{} |
| 90 | |
| 91 | func (testServerAuthorizer) Authorize(c security.Context) error { |
| 92 | if c.Method() != "Unauthorized" { |
| 93 | return nil |
| 94 | } |
| 95 | return errAuthorizer |
| 96 | } |
| 97 | |
| 98 | type testServerDisp struct{ server interface{} } |
| 99 | |
| 100 | func (t testServerDisp) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) { |
| 101 | // If suffix is "nilAuth" we use default authorization, if it is "aclAuth" we |
| 102 | // use an ACL based authorizer, and otherwise we use the custom testServerAuthorizer. |
| 103 | if suffix == "nilAuth" { |
| 104 | return ipc.ReflectInvoker(t.server), nil, nil |
| 105 | } |
| 106 | if suffix == "aclAuth" { |
| 107 | // Only authorize clients matching patterns "client" or "server/*". |
| 108 | acl := security.ACL{ |
| 109 | "server/*": security.LabelSet(security.AdminLabel), |
| 110 | "client": security.LabelSet(security.AdminLabel), |
| 111 | } |
| 112 | return ipc.ReflectInvoker(t.server), isecurity.NewACLAuthorizer(acl), nil |
| 113 | } |
| 114 | return ipc.ReflectInvoker(t.server), testServerAuthorizer{}, nil |
| 115 | } |
| 116 | |
| 117 | func startServer(t *testing.T, serverID security.PrivateID, sm stream.Manager, mt naming.MountTable, ts interface{}) ipc.Server { |
| 118 | vlog.VI(1).Info("InternalNewServer") |
| 119 | server, err := InternalNewServer(sm, mt, veyron2.LocalID(serverID)) |
| 120 | if err != nil { |
| 121 | t.Errorf("InternalNewServer failed: %v", err) |
| 122 | } |
| 123 | vlog.VI(1).Info("server.Register") |
| 124 | disp := testServerDisp{ts} |
| 125 | if err := server.Register("server", disp); err != nil { |
| 126 | t.Errorf("server.Register failed: %v", err) |
| 127 | } |
| 128 | vlog.VI(1).Info("server.Listen") |
| 129 | if _, err := server.Listen("tcp", "localhost:0"); err != nil { |
| 130 | t.Errorf("server.Listen failed: %v", err) |
| 131 | } |
| 132 | vlog.VI(1).Info("server.Publish") |
| 133 | if err := server.Publish("mountpoint"); err != nil { |
| 134 | t.Errorf("server.Publish failed: %v", err) |
| 135 | } |
| 136 | return server |
| 137 | } |
| 138 | |
| 139 | func verifyMount(t *testing.T, mt naming.MountTable, name string) { |
| 140 | if _, err := mt.Resolve(name); err != nil { |
| 141 | t.Errorf("%s not found in mounttable", name) |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | func verifyMountMissing(t *testing.T, mt naming.MountTable, name string) { |
| 146 | if servers, err := mt.Resolve(name); err == nil { |
| 147 | t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers)) |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | func stopServer(t *testing.T, server ipc.Server, mt naming.MountTable) { |
| 152 | vlog.VI(1).Info("server.Stop") |
| 153 | verifyMount(t, mt, "mountpoint/server") |
| 154 | |
| 155 | // Check that we can still publish. |
| 156 | server.Publish("should_appear_in_mt") |
| 157 | verifyMount(t, mt, "should_appear_in_mt/server") |
| 158 | |
| 159 | if err := server.Stop(); err != nil { |
| 160 | t.Errorf("server.Stop failed: %v", err) |
| 161 | } |
| 162 | // Check that we can no longer publish after Stop. |
| 163 | server.Publish("should_not_appear_in_mt") |
| 164 | verifyMountMissing(t, mt, "should_not_appear_in_mt/server") |
| 165 | |
| 166 | verifyMountMissing(t, mt, "mountpoint/server") |
| 167 | verifyMountMissing(t, mt, "should_appear_in_mt/server") |
| 168 | verifyMountMissing(t, mt, "should_not_appear_in_mt/server") |
| 169 | vlog.VI(1).Info("server.Stop DONE") |
| 170 | } |
| 171 | |
| 172 | func createClientAndServer(t *testing.T, clientID, serverID security.PrivateID, ts interface{}) (ipc.Client, ipc.Server, naming.MountTable, stream.Manager) { |
| 173 | streamMgr := imanager.InternalNew(naming.FixedRoutingID(0x555555555)) |
| 174 | mountTable := inaming.InternalNewMountTable() |
| 175 | server := startServer(t, serverID, streamMgr, mountTable, ts) |
| 176 | client, err := InternalNewClient(streamMgr, mountTable, veyron2.LocalID(clientID)) |
| 177 | if err != nil { |
| 178 | t.Fatalf("InternalNewClient failed: %v", err) |
| 179 | } |
| 180 | return client, server, mountTable, streamMgr |
| 181 | } |
| 182 | |
| 183 | func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID { |
| 184 | id, err := isecurity.NewChainPrivateID("irrelevant") |
| 185 | if err != nil { |
| 186 | panic(err) |
| 187 | } |
| 188 | blessedID, err := blessor.Bless(id.PublicID(), name, 5*time.Minute, caveats) |
| 189 | if err != nil { |
| 190 | panic(err) |
| 191 | } |
| 192 | derivedID, err := id.Derive(blessedID) |
| 193 | if err != nil { |
| 194 | panic(err) |
| 195 | } |
| 196 | return derivedID |
| 197 | } |
| 198 | |
// matchesErrorPattern reports whether err is consistent with pattern:
//
//   - an empty pattern matches only a nil error;
//   - a non-empty pattern matches only a non-nil error whose message
//     contains pattern as a substring.
func matchesErrorPattern(err error, pattern string) bool {
	switch {
	case err == nil:
		return pattern == ""
	case pattern == "":
		return false
	default:
		return strings.Contains(err.Error(), pattern)
	}
}
| 205 | |
| 206 | func TestStartCall(t *testing.T) { |
| 207 | authorizeErr := "has one or more invalid caveats" |
| 208 | nameErr := "does not have a name matching the provided pattern" |
| 209 | |
| 210 | cavOnlyV1 := security.UniversalCaveat(&icaveat.PeerIdentity{Peers: []security.PrincipalPattern{"client/v1"}}) |
| 211 | now := time.Now() |
| 212 | cavExpired := security.ServiceCaveat{ |
| 213 | Service: security.AllPrincipals, |
| 214 | Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now}, |
| 215 | } |
| 216 | |
| 217 | clientV1ID := derive(clientID, "v1", nil) |
| 218 | clientV2ID := derive(clientID, "v2", nil) |
| 219 | serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1}) |
| 220 | serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired}) |
| 221 | |
| 222 | tests := []struct { |
| 223 | clientID, serverID security.PrivateID |
| 224 | pattern security.PrincipalPattern // pattern on the server identity expected by client. |
| 225 | err string |
| 226 | }{ |
| 227 | // Client accepts talking to server only if server's identity matches the |
| 228 | // provided pattern. |
| 229 | {clientID, serverID, security.AllPrincipals, ""}, |
| 230 | {clientID, serverID, "server", ""}, |
| 231 | {clientID, serverID, "server/v1", ""}, |
| 232 | {clientID, serverID, "anotherServer", nameErr}, |
| 233 | |
| 234 | // All clients reject talking to a server with an expired identity. |
| 235 | {clientID, serverExpiredID, security.AllPrincipals, authorizeErr}, |
| 236 | {clientV1ID, serverExpiredID, security.AllPrincipals, authorizeErr}, |
| 237 | {clientV2ID, serverExpiredID, security.AllPrincipals, authorizeErr}, |
| 238 | |
| 239 | // Only clientV1 accepts talking to serverV1. |
| 240 | {clientV1ID, serverV1ID, security.AllPrincipals, ""}, |
| 241 | {clientV2ID, serverV1ID, security.AllPrincipals, authorizeErr}, |
| 242 | } |
| 243 | // Servers and clients will be created per-test, use the same stream manager and mounttable. |
| 244 | mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111)) |
| 245 | mt := inaming.InternalNewMountTable() |
| 246 | for _, test := range tests { |
| 247 | name := fmt.Sprintf("(clientID:%q serverID:%q)", test.clientID, test.serverID) |
| 248 | server := startServer(t, test.serverID, mgr, mt, &testServer{}) |
| 249 | client, err := InternalNewClient(mgr, mt, veyron2.LocalID(test.clientID)) |
| 250 | if err != nil { |
| 251 | t.Errorf("%s: Client creation failed: %v", name, err) |
| 252 | continue |
| 253 | } |
| 254 | if _, err := client.StartCall("mountpoint/server/suffix", "irrelevant", nil, veyron2.RemoteID(test.pattern)); !matchesErrorPattern(err, test.err) { |
| 255 | t.Errorf(`%s: client.StartCall: got error "%v", want to match "%v"`, name, err, test.err) |
| 256 | } |
| 257 | client.Close() |
| 258 | stopServer(t, server, mt) |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | func TestRPC(t *testing.T) { |
| 263 | type v []interface{} |
| 264 | type testcase struct { |
| 265 | name string |
| 266 | method string |
| 267 | args v |
| 268 | streamArgs v |
| 269 | startErr error |
| 270 | results v |
| 271 | finishErr error |
| 272 | } |
| 273 | tests := []testcase{ |
| 274 | {"mountpoint/server/suffix", "Closure", nil, nil, nil, nil, nil}, |
| 275 | {"mountpoint/server/suffix", "Error", nil, nil, nil, v{errMethod}, nil}, |
| 276 | |
| 277 | {"mountpoint/server/suffix", "Echo", v{"foo"}, nil, nil, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, nil}, |
| 278 | {"mountpoint/server/suffix/abc", "Echo", v{"bar"}, nil, nil, v{`method:"Echo",suffix:"suffix/abc",arg:"bar"`}, nil}, |
| 279 | |
| 280 | {"mountpoint/server/suffix", "EchoUser", v{"foo", userType("bar")}, nil, nil, v{`method:"EchoUser",suffix:"suffix",arg:"foo"`, userType("bar")}, nil}, |
| 281 | {"mountpoint/server/suffix/abc", "EchoUser", v{"baz", userType("bla")}, nil, nil, v{`method:"EchoUser",suffix:"suffix/abc",arg:"baz"`, userType("bla")}, nil}, |
| 282 | {"mountpoint/server/suffix", "Stream", v{"foo"}, v{userType("bar"), userType("baz")}, nil, v{`method:"Stream",suffix:"suffix",arg:"foo" bar baz`, nil}, nil}, |
| 283 | {"mountpoint/server/suffix/abc", "Stream", v{"123"}, v{userType("456"), userType("789")}, nil, v{`method:"Stream",suffix:"suffix/abc",arg:"123" 456 789`, nil}, nil}, |
| 284 | {"mountpoint/server/suffix", "EchoIDs", nil, nil, nil, v{"server", "client"}, nil}, |
| 285 | {"mountpoint/server/suffix", "EchoAndError", v{"bugs bunny"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"bugs bunny"`, nil}, nil}, |
| 286 | {"mountpoint/server/suffix", "EchoAndError", v{"error"}, nil, nil, v{`method:"EchoAndError",suffix:"suffix",arg:"error"`, errMethod}, nil}, |
| 287 | } |
| 288 | name := func(t testcase) string { |
| 289 | return fmt.Sprintf("%s.%s(%v)", t.name, t.method, t.args) |
| 290 | } |
| 291 | client, server, mt, _ := createClientAndServer(t, clientID, serverID, &testServer{}) |
| 292 | defer stopServer(t, server, mt) |
| 293 | defer client.Close() |
| 294 | for _, test := range tests { |
| 295 | vlog.VI(1).Infof("%s client.StartCall", name(test)) |
| 296 | call, err := client.StartCall(test.name, test.method, test.args) |
| 297 | if err != test.startErr { |
| 298 | t.Errorf(`%s client.StartCall got error "%v", want "%v"`, name(test), err, test.startErr) |
| 299 | continue |
| 300 | } |
| 301 | for _, sarg := range test.streamArgs { |
| 302 | vlog.VI(1).Infof("%s client.Send(%v)", name(test), sarg) |
| 303 | if err := call.Send(sarg); err != nil { |
| 304 | t.Errorf(`%s call.Send(%v) got unexpected error "%v"`, name(test), sarg, err) |
| 305 | } |
| 306 | var u userType |
| 307 | if err := call.Recv(&u); err != nil { |
| 308 | t.Errorf(`%s call.Recv(%v) got unexpected error "%v"`, name(test), sarg, err) |
| 309 | } |
| 310 | if !reflect.DeepEqual(u, sarg) { |
| 311 | t.Errorf("%s call.Recv got value %v, want %v", name(test), u, sarg) |
| 312 | } |
| 313 | } |
| 314 | vlog.VI(1).Infof("%s call.CloseSend", name(test)) |
| 315 | if err := call.CloseSend(); err != nil { |
| 316 | t.Errorf(`%s call.CloseSend got unexpected error "%v"`, name(test), err) |
| 317 | } |
| 318 | vlog.VI(1).Infof("%s client.Finish", name(test)) |
| 319 | results := makeResultPtrs(test.results) |
| 320 | err = call.Finish(results...) |
| 321 | if err != test.finishErr { |
| 322 | t.Errorf(`%s call.Finish got error "%v", want "%v"`, name(test), err, test.finishErr) |
| 323 | } |
| 324 | checkResultPtrs(t, name(test), results, test.results) |
| 325 | } |
| 326 | } |
| 327 | |
| 328 | func TestRPCAuthorization(t *testing.T) { |
| 329 | cavOnlyEcho := security.ServiceCaveat{ |
| 330 | Service: security.AllPrincipals, |
| 331 | Caveat: &icaveat.MethodRestriction{[]string{"Echo"}}, |
| 332 | } |
| 333 | now := time.Now() |
| 334 | cavExpired := security.ServiceCaveat{ |
| 335 | Service: security.AllPrincipals, |
| 336 | Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now}, |
| 337 | } |
| 338 | |
| 339 | blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho}) |
| 340 | blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired}) |
| 341 | blessedByClient := derive(clientID, "blessed", nil) |
| 342 | |
| 343 | const ( |
| 344 | expiredIDErr = "forbids credential from being used at this time" |
| 345 | nilAuthErr = "no matching principal pattern found" |
| 346 | aclAuthErr = "no matching ACL entry found" |
| 347 | ) |
| 348 | invalidMethodErr := func(method string) string { |
| 349 | return fmt.Sprintf("caveat.MethodRestriction{Methods:[Echo]} forbids invocation of method %s", method) |
| 350 | } |
| 351 | |
| 352 | type v []interface{} |
| 353 | type testcase struct { |
| 354 | clientID security.PrivateID |
| 355 | name string |
| 356 | method string |
| 357 | args v |
| 358 | results v |
| 359 | finishErr string |
| 360 | } |
| 361 | tests := []testcase{ |
| 362 | // Clients whose identities have invalid caveats are not by authorized by any authorizer. |
| 363 | {blessedByServerExpired, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, expiredIDErr}, |
| 364 | {blessedByServerExpired, "mountpoint/server/suffix", "Echo", v{"foo"}, v{""}, expiredIDErr}, |
| 365 | {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Closure", nil, nil, invalidMethodErr("Closure")}, |
| 366 | {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Closure", nil, nil, invalidMethodErr("Closure")}, |
| 367 | // Only clients with a trusted name that matches either the server's identity or an identity blessed |
| 368 | // by the server are authorized by the (default) nilAuth authorizer. |
| 369 | {clientID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, nilAuthErr}, |
| 370 | {blessedByClient, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{""}, nilAuthErr}, |
| 371 | {serverID, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""}, |
| 372 | {serverID, "mountpoint/server/nilAuth", "Closure", nil, nil, ""}, |
| 373 | {blessedByServerOnlyEcho, "mountpoint/server/nilAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"nilAuth",arg:"foo"`}, ""}, |
| 374 | // Only clients matching the server's ACL are authorized. |
| 375 | {clientID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""}, |
| 376 | {blessedByClient, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{""}, aclAuthErr}, |
| 377 | {serverID, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""}, |
| 378 | {blessedByServerOnlyEcho, "mountpoint/server/aclAuth", "Echo", v{"foo"}, v{`method:"Echo",suffix:"aclAuth",arg:"foo"`}, ""}, |
| 379 | {clientID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""}, |
| 380 | {blessedByClient, "mountpoint/server/aclAuth", "Closure", nil, nil, aclAuthErr}, |
| 381 | {serverID, "mountpoint/server/aclAuth", "Closure", nil, nil, ""}, |
| 382 | |
| 383 | // All methods except "Unauthorized" are authorized by the custom authorizer. |
| 384 | {clientID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""}, |
| 385 | {blessedByClient, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""}, |
| 386 | {serverID, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""}, |
| 387 | {blessedByServerOnlyEcho, "mountpoint/server/suffix", "Echo", v{"foo"}, v{`method:"Echo",suffix:"suffix",arg:"foo"`}, ""}, |
| 388 | {clientID, "mountpoint/server/suffix", "Closure", nil, nil, ""}, |
| 389 | {blessedByClient, "mountpoint/server/suffix", "Closure", nil, nil, ""}, |
| 390 | {serverID, "mountpoint/server/suffix", "Closure", nil, nil, ""}, |
| 391 | {clientID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"}, |
| 392 | {blessedByClient, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"}, |
| 393 | {serverID, "mountpoint/server/suffix", "Unauthorized", nil, v{""}, "application Authorizer denied access"}, |
| 394 | } |
| 395 | name := func(t testcase) string { |
| 396 | return fmt.Sprintf("%q RPCing %s.%s(%v)", t.clientID.PublicID(), t.name, t.method, t.args) |
| 397 | } |
| 398 | |
| 399 | dummyClient, server, mt, sm := createClientAndServer(t, nil, serverID, &testServer{}) |
| 400 | defer stopServer(t, server, mt) |
| 401 | defer dummyClient.Close() |
| 402 | for _, test := range tests { |
| 403 | client, err := InternalNewClient(sm, mt, veyron2.LocalID(test.clientID)) |
| 404 | if err != nil { |
| 405 | t.Fatalf("InternalNewClient failed: %v", err) |
| 406 | } |
| 407 | defer client.Close() |
| 408 | call, err := client.StartCall(test.name, test.method, test.args) |
| 409 | if err != nil { |
| 410 | t.Errorf(`%s client.StartCall got unexpected error: "%v"`, name(test), err) |
| 411 | continue |
| 412 | } |
| 413 | if err := call.CloseSend(); err != nil { |
| 414 | t.Errorf(`%s call.CloseSend got unexpected error: "%v"`, name(test), err) |
| 415 | } |
| 416 | results := makeResultPtrs(test.results) |
| 417 | err = call.Finish(results...) |
| 418 | if !matchesErrorPattern(err, test.finishErr) { |
| 419 | t.Errorf(`%s call.Finish got error: "%v", want to match: "%v"`, name(test), err, test.finishErr) |
| 420 | } |
| 421 | } |
| 422 | } |
| 423 | |
// cancelTestServer is the server object used by the cancellation tests. Its
// methods close started when they begin running and close cancelled once
// they observe that the call has been closed.
type cancelTestServer struct {
	started   chan struct{}
	cancelled chan struct{}
}

// newCancelTestServer returns a cancelTestServer whose two signalling
// channels are open and unbuffered.
func newCancelTestServer() *cancelTestServer {
	ts := new(cancelTestServer)
	ts.started = make(chan struct{})
	ts.cancelled = make(chan struct{})
	return ts
}
| 435 | |
| 436 | func (s *cancelTestServer) CancelStreamReader(call ipc.ServerCall) error { |
| 437 | close(s.started) |
| 438 | for { |
| 439 | var b []byte |
| 440 | if err := call.Recv(&b); err != nil && err != io.EOF { |
| 441 | return err |
| 442 | } |
| 443 | if call.IsClosed() { |
| 444 | close(s.cancelled) |
| 445 | return nil |
| 446 | } |
| 447 | } |
| 448 | } |
| 449 | |
| 450 | // CancelStreamIgnorer doesn't read from it's input stream so all it's |
| 451 | // buffers fill. The intention is to show that call.IsClosed is updated |
| 452 | // even when the stream is stalled. |
| 453 | func (s *cancelTestServer) CancelStreamIgnorer(call ipc.ServerCall) error { |
| 454 | close(s.started) |
| 455 | for { |
| 456 | time.Sleep(time.Millisecond) |
| 457 | if call.IsClosed() { |
| 458 | close(s.cancelled) |
| 459 | return nil |
| 460 | } |
| 461 | } |
| 462 | } |
| 463 | |
| 464 | func waitForCancel(t *testing.T, ts *cancelTestServer, call ipc.ClientCall) { |
| 465 | <-ts.started |
| 466 | call.Cancel() |
| 467 | <-ts.cancelled |
| 468 | } |
| 469 | |
| 470 | // TestCancel tests cancellation while the server is reading from a stream. |
| 471 | func TestCancel(t *testing.T) { |
| 472 | ts := newCancelTestServer() |
| 473 | client, server, mt, _ := createClientAndServer(t, clientID, serverID, ts) |
| 474 | defer stopServer(t, server, mt) |
| 475 | defer client.Close() |
| 476 | |
| 477 | call, err := client.StartCall("mountpoint/server/suffix", "CancelStreamReader", []interface{}{}) |
| 478 | if err != nil { |
| 479 | t.Fatalf("Start call failed: %v", err) |
| 480 | } |
| 481 | for i := 0; i <= 10; i++ { |
| 482 | b := []byte{1, 2, 3} |
| 483 | if err := call.Send(b); err != nil { |
| 484 | t.Errorf("clientCall.Send error %q", err) |
| 485 | } |
| 486 | } |
| 487 | waitForCancel(t, ts, call) |
| 488 | } |
| 489 | |
| 490 | // TestCancelWithFullBuffers tests that even if the writer has filled the buffers and |
| 491 | // the server is not reading that the cancel message gets through. |
| 492 | func TestCancelWithFullBuffers(t *testing.T) { |
| 493 | ts := newCancelTestServer() |
| 494 | client, server, mt, _ := createClientAndServer(t, clientID, serverID, ts) |
| 495 | defer stopServer(t, server, mt) |
| 496 | defer client.Close() |
| 497 | |
| 498 | call, err := client.StartCall("mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{}) |
| 499 | if err != nil { |
| 500 | t.Fatalf("Start call failed: %v", err) |
| 501 | } |
| 502 | // Fill up all the write buffers to ensure that cancelling works even when the stream |
| 503 | // is blocked. |
| 504 | call.Send(make([]byte, vc.MaxSharedBytes)) |
| 505 | call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow)) |
| 506 | |
| 507 | waitForCancel(t, ts, call) |
| 508 | } |
| 509 | |
| 510 | type streamRecvInGoroutineServer struct{ c chan error } |
| 511 | |
| 512 | func (s *streamRecvInGoroutineServer) RecvInGoroutine(call ipc.ServerCall) error { |
| 513 | // Spawn a goroutine to read streaming data from the client. |
| 514 | go func() { |
| 515 | var i interface{} |
| 516 | for { |
| 517 | err := call.Recv(&i) |
| 518 | if err != nil { |
| 519 | s.c <- err |
| 520 | return |
| 521 | } |
| 522 | } |
| 523 | }() |
| 524 | // Imagine the server did some processing here and now that it is done, |
| 525 | // it does not care to see what else the client has to say. |
| 526 | return nil |
| 527 | } |
| 528 | |
| 529 | func TestStreamReadTerminatedByServer(t *testing.T) { |
| 530 | s := &streamRecvInGoroutineServer{c: make(chan error, 1)} |
| 531 | client, server, mt, _ := createClientAndServer(t, clientID, serverID, s) |
| 532 | defer stopServer(t, server, mt) |
| 533 | defer client.Close() |
| 534 | |
| 535 | call, err := client.StartCall("mountpoint/server/suffix", "RecvInGoroutine", []interface{}{}) |
| 536 | if err != nil { |
| 537 | t.Fatalf("StartCall failed: %v", err) |
| 538 | } |
| 539 | |
| 540 | c := make(chan error, 1) |
| 541 | go func() { |
| 542 | for i := 0; true; i++ { |
| 543 | if err := call.Send(i); err != nil { |
| 544 | c <- err |
| 545 | return |
| 546 | } |
| 547 | } |
| 548 | }() |
| 549 | |
| 550 | // The goroutine at the server executing "Recv" should have terminated |
| 551 | // with EOF. |
| 552 | if err := <-s.c; err != io.EOF { |
| 553 | t.Errorf("Got %v at server, want io.EOF", err) |
| 554 | } |
| 555 | // The client Send should have failed since the RPC has been |
| 556 | // terminated. |
| 557 | if err := <-c; err == nil { |
| 558 | t.Errorf("Client Send should fail as the server should have closed the flow") |
| 559 | } |
| 560 | } |
| 561 | |
| 562 | // TestConnectWithIncompatibleServers tests that clients ignore incompatible endpoints. |
| 563 | func TestConnectWithIncompatibleServers(t *testing.T) { |
| 564 | client, server, mt, _ := createClientAndServer(t, clientID, serverID, &testServer{}) |
| 565 | |
| 566 | defer stopServer(t, server, mt) |
| 567 | defer client.Close() |
| 568 | |
| 569 | // Publish some incompatible endpoints. |
| 570 | publisher := InternalNewPublisher(mt, publishPeriod) |
| 571 | defer publisher.WaitForStop() |
| 572 | defer publisher.Stop() |
| 573 | publisher.AddName("incompatible") |
| 574 | publisher.AddServer("/@2@tcp@localhost:10000@@1000000@2000000@@") |
| 575 | publisher.AddServer("/@2@tcp@localhost:10001@@2000000@3000000@@") |
| 576 | |
| 577 | _, err := client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"}) |
| 578 | if !strings.Contains(err.Error(), version.NoCompatibleVersionErr.Error()) { |
| 579 | t.Errorf("Expected error %v, found: %v", version.NoCompatibleVersionErr, err) |
| 580 | } |
| 581 | |
| 582 | // Now add a server with a compatible endpoint and try again. |
| 583 | server.Publish("incompatible") |
| 584 | |
| 585 | call, err := client.StartCall("incompatible/server/suffix", "Echo", []interface{}{"foo"}) |
| 586 | expected := `method:"Echo",suffix:"suffix",arg:"foo"` |
| 587 | var result string |
| 588 | err = call.Finish(&result) |
| 589 | if err != nil { |
| 590 | t.Errorf("Unexpected error finishing call %v", err) |
| 591 | } |
| 592 | if result != expected { |
| 593 | t.Errorf("Wrong result returned. Got %s, wanted %s", result, expected) |
| 594 | } |
| 595 | } |
| 596 | |
| 597 | func init() { |
| 598 | var err error |
| 599 | if clientID, err = isecurity.NewChainPrivateID("client"); err != nil { |
| 600 | log.Fatalf("failed isecurity.NewChainPrivateID: %s", err) |
| 601 | } |
| 602 | if serverID, err = isecurity.NewChainPrivateID("server"); err != nil { |
| 603 | log.Fatalf("failed isecurity.NewChainPrivateID: %s", err) |
| 604 | } |
| 605 | isecurity.TrustIdentityProviders(clientID) |
| 606 | isecurity.TrustIdentityProviders(serverID) |
| 607 | } |