| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package impl |
| |
| import ( |
| "fmt" |
| "io" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/rpc" |
| "v.io/v23/security/access" |
| "v.io/v23/vdl" |
| "v.io/v23/vdlroot/signature" |
| "v.io/v23/verror" |
| ) |
| |
| var ( |
| errCantUpgradeServerCall = verror.Register(pkgPath+".errCantUpgradeServerCall", verror.NoRetry, "{1:}{2:} couldn't upgrade rpc.ServerCall to rpc.StreamServerCall{:_}") |
| errBadNumberOfResults = verror.Register(pkgPath+".errBadNumberOfResults", verror.NoRetry, "{1:}{2:} unexpected number of result values. Got {3}, want 2.{:_}") |
| errBadErrorType = verror.Register(pkgPath+".errBadErrorType", verror.NoRetry, "{1:}{2:} unexpected error type. Got {3}, want error.{:_}") |
| errWantSigInterfaceSlice = verror.Register(pkgPath+".errWantSigInterfaceSlice", verror.NoRetry, "{1:}{2:} unexpected result value type. Got {3}, want []signature.Interface.{:_}") |
| errWantSigMethod = verror.Register(pkgPath+".errWantSigMethod", verror.NoRetry, "{1:}{2:} unexpected result value type. Got {3}, want signature.Method.{:_}") |
| errUnknownMethod = verror.Register(pkgPath+".errUnknownMethod", verror.NoRetry, "{1:}{2:} unknown method{:_}") |
| ) |
| |
| // proxyInvoker is an rpc.Invoker implementation that proxies all requests |
| // to a remote object, i.e. requests to <suffix> are forwarded to |
| // <remote> transparently. |
| // |
| // remote is the name of the remote object. |
| // access is the access tag require to access the object. |
| // desc is used to determine the number of results for a given method. |
| func newProxyInvoker(remote string, access access.Tag, desc []rpc.InterfaceDesc) *proxyInvoker { |
| methodNumResults := make(map[string]int) |
| for _, iface := range desc { |
| for _, method := range iface.Methods { |
| methodNumResults[method.Name] = len(method.OutArgs) |
| } |
| } |
| return &proxyInvoker{remote, access, methodNumResults} |
| } |
| |
| type proxyInvoker struct { |
| remote string |
| access access.Tag |
| methodNumResults map[string]int |
| } |
| |
| var _ rpc.Invoker = (*proxyInvoker)(nil) |
| |
| func (p *proxyInvoker) Prepare(method string, numArgs int) (argptrs []interface{}, tags []*vdl.Value, _ error) { |
| // TODO(toddw): Change argptrs to be filled in with *vdl.Value, to avoid |
| // unnecessary type lookups. |
| argptrs = make([]interface{}, numArgs) |
| for i, _ := range argptrs { |
| var x interface{} |
| argptrs[i] = &x |
| } |
| tags = []*vdl.Value{vdl.ValueOf(p.access)} |
| return |
| } |
| |
| func (p *proxyInvoker) Invoke(ctx *context.T, inCall rpc.StreamServerCall, method string, argptrs []interface{}) (results []interface{}, err error) { |
| // We accept any values as argument and pass them through to the remote |
| // server. |
| args := make([]interface{}, len(argptrs)) |
| for i, ap := range argptrs { |
| args[i] = ap |
| } |
| client := v23.GetClient(ctx) |
| |
| outCall, err := client.StartCall(ctx, p.remote, method, args) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Each RPC has a bi-directional stream, and there is no way to know in |
| // advance how much data will be sent in either direction, if any. |
| // |
| // This method (Invoke) must return when the remote server is done with |
| // the RPC, which is when outCall.Recv() returns EOF. When that happens, |
| // we need to call outCall.Finish() to get the return values, and then |
| // return these values to the client. |
| // |
| // While we are forwarding data from the server to the client, we must |
| // also forward data from the client to the server. This happens in a |
| // separate goroutine. This goroutine may return after Invoke has |
| // returned if the client doesn't call CloseSend() explicitly. |
| // |
| // Any error, other than EOF, will be returned to the client, if |
| // possible. The only situation where it is not possible to send an |
| // error to the client is when the error comes from forwarding data from |
| // the client to the server and Invoke has already returned or is about |
| // to return. In this case, the error is lost. So, it is possible that |
| // the client could successfully Send() data that the server doesn't |
| // actually receive if the server terminates the RPC while the data is |
| // in the proxy. |
| fwd := func(src, dst rpc.Stream, errors chan<- error) { |
| for { |
| var obj interface{} |
| switch err := src.Recv(&obj); err { |
| case io.EOF: |
| if call, ok := src.(rpc.ClientCall); ok { |
| if err := call.CloseSend(); err != nil { |
| errors <- err |
| } |
| } |
| return |
| case nil: |
| break |
| default: |
| errors <- err |
| return |
| } |
| if err := dst.Send(obj); err != nil { |
| errors <- err |
| return |
| } |
| } |
| } |
| errors := make(chan error, 2) |
| go fwd(inCall, outCall, errors) |
| fwd(outCall, inCall, errors) |
| select { |
| case err := <-errors: |
| return nil, err |
| default: |
| } |
| |
| nResults, err := p.numResults(ctx, method) |
| if err != nil { |
| return nil, err |
| } |
| |
| // We accept any return values, without type checking, and return them |
| // to the client. |
| res := make([]interface{}, nResults) |
| for i := 0; i < len(res); i++ { |
| var foo interface{} |
| res[i] = &foo |
| } |
| err = outCall.Finish(res...) |
| results = make([]interface{}, len(res)) |
| for i, r := range res { |
| results[i] = *r.(*interface{}) |
| } |
| return results, err |
| } |
| |
| // TODO(toddw): Expose a helper function that performs all error checking based |
| // on reflection, to simplify the repeated logic processing results. |
| func (p *proxyInvoker) Signature(ctx *context.T, call rpc.ServerCall) ([]signature.Interface, error) { |
| streamCall, ok := call.(rpc.StreamServerCall) |
| if !ok { |
| return nil, verror.New(errCantUpgradeServerCall, ctx) |
| } |
| results, err := p.Invoke(ctx, streamCall, rpc.ReservedSignature, nil) |
| if err != nil { |
| return nil, err |
| } |
| if len(results) != 2 { |
| return nil, verror.New(errBadNumberOfResults, ctx, len(results)) |
| } |
| if results[1] != nil { |
| err, ok := results[1].(error) |
| if !ok { |
| return nil, verror.New(errBadErrorType, ctx, fmt.Sprintf("%T", err)) |
| } |
| return nil, err |
| } |
| var res []signature.Interface |
| if results[0] != nil { |
| sig, ok := results[0].([]signature.Interface) |
| if !ok { |
| return nil, verror.New(errWantSigInterfaceSlice, ctx, fmt.Sprintf("%T", sig)) |
| } |
| } |
| return res, nil |
| } |
| |
| func (p *proxyInvoker) MethodSignature(ctx *context.T, call rpc.ServerCall, method string) (signature.Method, error) { |
| empty := signature.Method{} |
| streamCall, ok := call.(rpc.StreamServerCall) |
| if !ok { |
| return empty, verror.New(errCantUpgradeServerCall, ctx) |
| } |
| results, err := p.Invoke(ctx, streamCall, rpc.ReservedMethodSignature, []interface{}{&method}) |
| if err != nil { |
| return empty, err |
| } |
| if len(results) != 2 { |
| return empty, verror.New(errBadNumberOfResults, ctx, len(results)) |
| } |
| if results[1] != nil { |
| err, ok := results[1].(error) |
| if !ok { |
| return empty, verror.New(errBadErrorType, ctx, fmt.Sprintf("%T", err)) |
| } |
| return empty, err |
| } |
| var res signature.Method |
| if results[0] != nil { |
| sig, ok := results[0].(signature.Method) |
| if !ok { |
| return empty, verror.New(errWantSigMethod, ctx, fmt.Sprintf("%T", sig)) |
| } |
| } |
| return res, nil |
| } |
| |
| func (p *proxyInvoker) Globber() *rpc.GlobState { |
| return &rpc.GlobState{AllGlobber: p} |
| } |
| |
| type call struct { |
| rpc.ServerCall |
| ch chan<- naming.GlobReply |
| } |
| |
| func (c *call) Recv(v interface{}) error { |
| return io.EOF |
| } |
| |
| func (c *call) Send(v interface{}) error { |
| c.ch <- v.(naming.GlobReply) |
| return nil |
| } |
| |
| func (p *proxyInvoker) Glob__(ctx *context.T, serverCall rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) { |
| ch := make(chan naming.GlobReply) |
| go func() { |
| p.Invoke(ctx, &call{serverCall, ch}, rpc.GlobMethod, []interface{}{&pattern}) |
| close(ch) |
| }() |
| return ch, nil |
| } |
| |
| // numResults returns the number of result values for the given method. |
| func (p *proxyInvoker) numResults(ctx *context.T, method string) (int, error) { |
| switch method { |
| case rpc.GlobMethod: |
| return 1, nil |
| case rpc.ReservedSignature, rpc.ReservedMethodSignature: |
| return 2, nil |
| } |
| num, ok := p.methodNumResults[method] |
| if !ok { |
| return 0, verror.New(errUnknownMethod, ctx, method) |
| } |
| return num, nil |
| } |