blob: b3f52f00366da0260f9b06b0121228a2d87d7812 [file] [log] [blame]
package impl
import (
"fmt"
"io"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/services/security/access"
"veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
)
// proxyInvoker is an ipc.Invoker implementation that proxies all requests
// to a remote object, i.e. requests to <suffix> are forwarded to
// <remote> transparently.
//
// remote is the name of the remote object.
// access is the access tag required to access the object; Prepare returns it
// as the only tag for every method.
// sigStub is used to get the signature of the remote object; numResults uses
// it to look up how many out-args a method has.
type proxyInvoker struct {
remote string
access access.Tag
sigStub signatureStub
}
// Compile-time assertion that *proxyInvoker satisfies ipc.Invoker.
var _ ipc.Invoker = (*proxyInvoker)(nil)
// signatureStub is the minimal interface proxyInvoker needs from a stub of
// the remote object: a way to fetch its ipc.ServiceSignature.
type signatureStub interface {
// Signature returns the service signature of the remote object.
Signature(ipc.ServerContext) (ipc.ServiceSignature, error)
}
// Prepare allocates the argument slots for an incoming call.
//
// It returns numArgs pointers, each to its own empty interface{} value, so
// that arguments of any type can be decoded and later passed through to the
// remote object. The method name is not consulted: every method gets the same
// single tag, p.access. The returned error is always nil.
func (p *proxyInvoker) Prepare(method string, numArgs int) (argptrs, tags []interface{}, err error) {
	argptrs = make([]interface{}, numArgs)
	for i := range argptrs {
		// Each slot needs a distinct *interface{} so decoded values don't alias.
		argptrs[i] = new(interface{})
	}
	return argptrs, []interface{}{p.access}, nil
}
// Invoke forwards a call to method on the remote object named by p.remote and
// returns the remote call's results.
//
// The elements of argptrs are passed unchanged as the arguments of the
// outgoing call. Streamed items are relayed in both directions between inCall
// and the outgoing call until the remote side's stream ends. A failure to
// start the outgoing call, a forwarding error, or a failure to determine the
// number of results is returned with nil results; an error from Finish is
// returned together with the dereferenced result slots.
func (p *proxyInvoker) Invoke(method string, inCall ipc.ServerCall, argptrs []interface{}) (results []interface{}, err error) {
// We accept any values as argument and pass them through to the remote
// server.
args := make([]interface{}, len(argptrs))
for i, ap := range argptrs {
args[i] = ap
}
// The outgoing call is made with the runtime and context of the incoming
// call.
runtime := veyron2.RuntimeFromContext(inCall.Context())
outCall, err := runtime.Client().StartCall(inCall.Context(), p.remote, method, args)
if err != nil {
return nil, err
}
// Each RPC has a bi-directional stream, and there is no way to know in
// advance how much data will be sent in either direction, if any.
//
// This method (Invoke) must return when the remote server is done with
// the RPC, which is when outCall.Recv() returns EOF. When that happens,
// we need to call outCall.Finish() to get the return values, and then
// return these values to the client.
//
// While we are forwarding data from the server to the client, we must
// also forward data from the client to the server. This happens in a
// separate goroutine. This goroutine may return after Invoke has
// returned if the client doesn't call CloseSend() explicitly.
//
// Any error, other than EOF, will be returned to the client, if
// possible. The only situation where it is not possible to send an
// error to the client is when the error comes from forwarding data from
// the client to the server and Invoke has already returned or is about
// to return. In this case, the error is lost. So, it is possible that
// the client could successfully Send() data that the server doesn't
// actually receive if the server terminates the RPC while the data is
// in the proxy.
//
// fwd copies items from src to dst until src.Recv or dst.Send fails. It
// reports at most one error on the errors channel per invocation.
fwd := func(src, dst ipc.Stream, errors chan<- error) {
for {
var obj interface{}
switch err := src.Recv(&obj); err {
case io.EOF:
// End of stream: if src is itself an ipc.Call, close its send side.
// NOTE(review): CloseSend is called on src, not dst, so end-of-stream
// on inCall is not propagated to outCall here — confirm this is
// intended.
if call, ok := src.(ipc.Call); ok {
if err := call.CloseSend(); err != nil {
errors <- err
}
}
return
case nil:
// An item was received; fall out of the switch to forward it.
break
default:
errors <- err
return
}
if err := dst.Send(obj); err != nil {
errors <- err
return
}
}
}
// Buffered for one error per fwd invocation, so neither sender can block.
errors := make(chan error, 2)
// Client-to-server forwarding runs in the background; server-to-client
// forwarding runs inline and determines when Invoke proceeds.
go fwd(inCall, outCall, errors)
fwd(outCall, inCall, errors)
// Non-blocking check: return the first forwarding error already reported,
// if any, without waiting for the background goroutine.
select {
case err := <-errors:
return nil, err
default:
}
nResults, err := p.numResults(method)
if err != nil {
return nil, err
}
// We accept any return values, without type checking, and return them
// to the client.
res := make([]interface{}, nResults)
for i := 0; i < len(res); i++ {
var foo interface{}
res[i] = &foo
}
err = outCall.Finish(res...)
// Dereference each slot so callers get the values rather than pointers.
// This happens even when Finish returned an error.
results = make([]interface{}, len(res))
for i, r := range res {
results[i] = *r.(*interface{})
}
return results, err
}
// TODO(toddw): Expose a helper function that performs all error checking based
// on reflection, to simplify the repeated logic processing results.

// Signature returns the interface signatures of the remote object by
// forwarding the reserved signature method through Invoke.
//
// ctx must also implement ipc.ServerCall, since Invoke requires one; otherwise
// an error is returned. The forwarded call is expected to yield exactly two
// results: the signatures and an error value. A non-nil second result is
// returned as the error. A nil first result yields a nil slice.
func (p *proxyInvoker) Signature(ctx ipc.ServerContext) ([]signature.Interface, error) {
	call, ok := ctx.(ipc.ServerCall)
	if !ok {
		return nil, fmt.Errorf("couldn't upgrade ipc.ServerContext to ipc.ServerCall")
	}
	results, err := p.Invoke(ipc.ReservedSignature, call, nil)
	if err != nil {
		return nil, err
	}
	if len(results) != 2 {
		return nil, fmt.Errorf("unexpected number of result values. Got %d, want 2.", len(results))
	}
	if results[1] != nil {
		remoteErr, ok := results[1].(error)
		if !ok {
			// Report the type of the value actually received; the failed
			// assertion result is a nil error and would print as <nil>.
			return nil, fmt.Errorf("unexpected error type. Got %T, want error.", results[1])
		}
		return nil, remoteErr
	}
	if results[0] == nil {
		return nil, nil
	}
	sigs, ok := results[0].([]signature.Interface)
	if !ok {
		// As above, format the received value, not the zero-valued assertion
		// result (whose type is always the wanted type).
		return nil, fmt.Errorf("unexpected result value type. Got %T, want []signature.Interface.", results[0])
	}
	// Return the decoded signatures; previously they were asserted and then
	// dropped, so callers always got nil.
	return sigs, nil
}
// MethodSignature returns the signature of the named method of the remote
// object by forwarding the reserved method-signature call through Invoke.
//
// ctx must also implement ipc.ServerCall, since Invoke requires one; otherwise
// an error is returned. The forwarded call is expected to yield exactly two
// results: the method signature and an error value. A non-nil second result
// is returned as the error. A nil first result yields the zero
// signature.Method. On any error the zero signature.Method is returned.
func (p *proxyInvoker) MethodSignature(ctx ipc.ServerContext, method string) (signature.Method, error) {
	empty := signature.Method{}
	call, ok := ctx.(ipc.ServerCall)
	if !ok {
		return empty, fmt.Errorf("couldn't upgrade ipc.ServerContext to ipc.ServerCall")
	}
	results, err := p.Invoke(ipc.ReservedMethodSignature, call, []interface{}{&method})
	if err != nil {
		return empty, err
	}
	if len(results) != 2 {
		return empty, fmt.Errorf("unexpected number of result values. Got %d, want 2.", len(results))
	}
	if results[1] != nil {
		remoteErr, ok := results[1].(error)
		if !ok {
			// Report the type of the value actually received; the failed
			// assertion result is a nil error and would print as <nil>.
			return empty, fmt.Errorf("unexpected error type. Got %T, want error.", results[1])
		}
		return empty, remoteErr
	}
	if results[0] == nil {
		return empty, nil
	}
	sig, ok := results[0].(signature.Method)
	if !ok {
		// As above, format the received value, not the zero-valued assertion
		// result (whose type is always the wanted type).
		return empty, fmt.Errorf("unexpected result value type. Got %T, want signature.Method.", results[0])
	}
	// Return the decoded signature; previously it was asserted and then
	// dropped, so callers always got the zero value.
	return sig, nil
}
// Globber returns a fresh ipc.GlobState whose AllGlobber is the proxy itself,
// so glob requests are served by p.Glob__.
func (p *proxyInvoker) Globber() *ipc.GlobState {
	state := new(ipc.GlobState)
	state.AllGlobber = p
	return state
}
// call adapts a server context plus a channel into the stream that Invoke
// expects for glob requests: it never yields any received items, and every
// item sent to it is written to ch.
type call struct {
ipc.ServerContext
// ch receives every mount entry passed to Send.
ch chan<- naming.VDLMountEntry
}
// Recv always reports io.EOF and leaves v untouched: this stream has no
// items to deliver to the receiver.
func (c *call) Recv(v interface{}) error {
return io.EOF
}
// Send writes v to the call's channel, blocking until the channel accepts it.
//
// v must be a naming.VDLMountEntry. Any other value is rejected with an error
// instead of panicking on the type assertion, so an unexpected item forwarded
// from the remote side cannot bring down the process.
func (c *call) Send(v interface{}) error {
	entry, ok := v.(naming.VDLMountEntry)
	if !ok {
		return fmt.Errorf("unexpected value type. Got %T, want naming.VDLMountEntry.", v)
	}
	c.ch <- entry
	return nil
}
// Glob__ forwards a glob request for pattern to the remote object and streams
// the resulting mount entries on the returned channel.
//
// The forwarding runs in its own goroutine via Invoke; the channel is closed
// once Invoke returns. The channel is unbuffered, so each entry waits for the
// reader. Invoke's results and error are discarded, and the error returned
// here is always nil.
func (p *proxyInvoker) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
	entries := make(chan naming.VDLMountEntry)
	go func() {
		stream := &call{ServerContext: ctx, ch: entries}
		p.Invoke(ipc.GlobMethod, stream, []interface{}{&pattern})
		close(entries)
	}()
	return entries, nil
}
// numResults reports how many result values the given method produces.
//
// The glob method and the two reserved signature methods have fixed counts.
// For any other method the count is the number of out-args listed for it in
// the signature obtained from p.sigStub; an unknown method is an error.
func (p *proxyInvoker) numResults(method string) (int, error) {
	// TODO(toddw): Replace this mechanism when the new signature mechanism is
	// complete.
	if method == ipc.GlobMethod {
		return 1, nil
	}
	if method == ipc.ReservedSignature || method == ipc.ReservedMethodSignature {
		return 2, nil
	}
	serviceSig, err := p.sigStub.Signature(nil)
	if err != nil {
		return 0, err
	}
	if methodSig, found := serviceSig.Methods[method]; found {
		return len(methodSig.OutArgs), nil
	}
	return 0, fmt.Errorf("unknown method %q", method)
}