blob: 1617dac96df3b4c09e78aca0170555a0581c608d [file] [log] [blame]
Robin Thellendb9dd9bb2014-10-29 13:54:08 -07001package impl
2
3import (
4 "fmt"
5 "io"
6
7 "veyron.io/veyron/veyron2/ipc"
8 "veyron.io/veyron/veyron2/rt"
9 "veyron.io/veyron/veyron2/security"
10)
11
// proxyInvoker is an ipc.Invoker implementation that proxies all requests
// to a remote object, i.e. requests to <suffix> are forwarded to
// <remote> transparently.
type proxyInvoker struct {
	// remote is the name of the remote object.
	remote string
	// label is the security label required to access this object; Prepare
	// returns it as the only tag of every method.
	label security.Label
	// sigStub is used to get the signature of the remote object.
	sigStub signatureStub
}
24
// Compile-time assertion that *proxyInvoker satisfies ipc.Invoker.
var _ ipc.Invoker = (*proxyInvoker)(nil)
26
// signatureStub is the only capability proxyInvoker needs from a stub of the
// remote object: retrieving the object's service signature. numResults uses
// it to find out how many values a proxied method returns.
type signatureStub interface {
	Signature(ipc.ServerContext) (ipc.ServiceSignature, error)
}
30
Asim Shankar214f89c2014-11-03 16:35:47 -080031func (p *proxyInvoker) Prepare(method string, numArgs int) (argptrs, tags []interface{}, err error) {
Robin Thellendb9dd9bb2014-10-29 13:54:08 -070032 argptrs = make([]interface{}, numArgs)
33 for i, _ := range argptrs {
34 var x interface{}
35 argptrs[i] = &x
36 }
Asim Shankar214f89c2014-11-03 16:35:47 -080037 tags = []interface{}{p.label}
Robin Thellendb9dd9bb2014-10-29 13:54:08 -070038 return
39}
40
41func (p *proxyInvoker) Invoke(method string, inCall ipc.ServerCall, argptrs []interface{}) (results []interface{}, err error) {
42 // We accept any values as argument and pass them through to the remote
43 // server.
44 args := make([]interface{}, len(argptrs))
45 for i, ap := range argptrs {
46 args[i] = ap
47 }
48 outCall, err := rt.R().Client().StartCall(inCall, p.remote, method, args)
49 if err != nil {
50 return nil, err
51 }
52
53 // Each RPC has a bi-directional stream, and there is no way to know in
54 // advance how much data will be sent in either direction, if any.
55 //
56 // This method (Invoke) must return when the remote server is done with
57 // the RPC, which is when outCall.Recv() returns EOF. When that happens,
58 // we need to call outCall.Finish() to get the return values, and then
59 // return these values to the client.
60 //
61 // While we are forwarding data from the server to the client, we must
62 // also forward data from the client to the server. This happens in a
63 // separate goroutine. This goroutine may return after Invoke has
64 // returned if the client doesn't call CloseSend() explicitly.
65 //
66 // Any error, other than EOF, will be returned to the client, if
67 // possible. The only situation where it is not possible to send an
68 // error to the client is when the error comes from forwarding data from
69 // the client to the server and Invoke has already returned or is about
70 // to return. In this case, the error is lost. So, it is possible that
71 // the client could successfully Send() data that the server doesn't
72 // actually receive if the server terminates the RPC while the data is
73 // in the proxy.
74 fwd := func(src, dst ipc.Stream, errors chan<- error) {
75 for {
76 var obj interface{}
77 switch err := src.Recv(&obj); err {
78 case io.EOF:
79 if call, ok := src.(ipc.Call); ok {
80 if err := call.CloseSend(); err != nil {
81 errors <- err
82 }
83 }
84 return
85 case nil:
86 break
87 default:
88 errors <- err
89 return
90 }
91 if err := dst.Send(obj); err != nil {
92 errors <- err
93 return
94 }
95 }
96 }
97 errors := make(chan error, 2)
98 go fwd(inCall, outCall, errors)
99 fwd(outCall, inCall, errors)
100 select {
101 case err := <-errors:
102 return nil, err
103 default:
104 }
105
106 nResults, err := p.numResults(method)
107 if err != nil {
108 return nil, err
109 }
110
111 // We accept any return values, without type checking, and return them
112 // to the client.
113 res := make([]interface{}, nResults)
114 for i := 0; i < len(res); i++ {
115 var foo interface{}
116 res[i] = &foo
117 }
118 err = outCall.Finish(res...)
119 results = make([]interface{}, len(res))
120 for i, r := range res {
121 results[i] = *r.(*interface{})
122 }
123 return results, err
124}
125
Todd Wang5739dda2014-11-16 22:44:02 -0800126// TODO(toddw): Expose a helper function that performs all error checking based
127// on reflection, to simplify the repeated logic processing results.
128func (p *proxyInvoker) Signature(ctx ipc.ServerContext) ([]ipc.InterfaceSig, error) {
129 call, ok := ctx.(ipc.ServerCall)
130 if !ok {
131 return nil, fmt.Errorf("couldn't upgrade ipc.ServerContext to ipc.ServerCall")
132 }
133 results, err := p.Invoke(ipc.ReservedSignature, call, nil)
134 if err != nil {
135 return nil, err
136 }
137 if len(results) != 2 {
138 return nil, fmt.Errorf("unexpected number of result values. Got %d, want 2.", len(results))
139 }
140 if results[1] != nil {
141 err, ok := results[1].(error)
142 if !ok {
143 return nil, fmt.Errorf("unexpected error type. Got %T, want error.", err)
144 }
145 return nil, err
146 }
147 var res []ipc.InterfaceSig
148 if results[0] != nil {
149 sig, ok := results[0].([]ipc.InterfaceSig)
150 if !ok {
151 return nil, fmt.Errorf("unexpected result value type. Got %T, want []ipc.InterfaceSig.", sig)
152 }
153 }
154 return res, nil
155}
156
157func (p *proxyInvoker) MethodSignature(ctx ipc.ServerContext, method string) (ipc.MethodSig, error) {
158 empty := ipc.MethodSig{}
159 call, ok := ctx.(ipc.ServerCall)
160 if !ok {
161 return empty, fmt.Errorf("couldn't upgrade ipc.ServerContext to ipc.ServerCall")
162 }
163 results, err := p.Invoke(ipc.ReservedMethodSignature, call, []interface{}{&method})
164 if err != nil {
165 return empty, err
166 }
167 if len(results) != 2 {
168 return empty, fmt.Errorf("unexpected number of result values. Got %d, want 2.", len(results))
169 }
170 if results[1] != nil {
171 err, ok := results[1].(error)
172 if !ok {
173 return empty, fmt.Errorf("unexpected error type. Got %T, want error.", err)
174 }
175 return empty, err
176 }
177 var res ipc.MethodSig
178 if results[0] != nil {
179 sig, ok := results[0].(ipc.MethodSig)
180 if !ok {
181 return empty, fmt.Errorf("unexpected result value type. Got %T, want ipc.MethodSig.", sig)
182 }
183 }
184 return res, nil
185}
186
Robin Thellend94bc4642014-11-05 18:05:08 -0800187func (p *proxyInvoker) VGlob() *ipc.GlobState {
Robin Thellendb16d7162014-11-07 13:47:26 -0800188 return &ipc.GlobState{VAllGlobber: p}
189}
190
Todd Wang1fe7cdd2014-11-12 12:51:49 -0800191func (p *proxyInvoker) Glob(ctx *ipc.GlobContextStub, pattern string) error {
Robin Thellendb16d7162014-11-07 13:47:26 -0800192 argptrs := []interface{}{&pattern}
Todd Wang1fe7cdd2014-11-12 12:51:49 -0800193 results, err := p.Invoke(ipc.GlobMethod, ctx, argptrs)
Robin Thellendb16d7162014-11-07 13:47:26 -0800194 if err != nil {
195 return err
196 }
197 if len(results) != 1 {
198 return fmt.Errorf("unexpected number of result values. Got %d, want 1.", len(results))
199 }
200 if results[0] == nil {
201 return nil
202 }
203 err, ok := results[0].(error)
204 if !ok {
205 return fmt.Errorf("unexpected result value type. Got %T, want error.", err)
206 }
207 return err
Robin Thellend94bc4642014-11-05 18:05:08 -0800208}
209
Robin Thellendb9dd9bb2014-10-29 13:54:08 -0700210// numResults returns the number of result values for the given method.
211func (p *proxyInvoker) numResults(method string) (int, error) {
Todd Wang5739dda2014-11-16 22:44:02 -0800212 // TODO(toddw): Replace this mechanism when the new signature mechanism is
213 // complete.
214 switch method {
215 case ipc.GlobMethod:
216 return 1, nil
217 case ipc.ReservedSignature, ipc.ReservedMethodSignature:
218 return 2, nil
219 }
Robin Thellendb9dd9bb2014-10-29 13:54:08 -0700220 sig, err := p.sigStub.Signature(nil)
221 if err != nil {
222 return 0, err
223 }
Robin Thellendb9dd9bb2014-10-29 13:54:08 -0700224 m, ok := sig.Methods[method]
225 if !ok {
226 return 0, fmt.Errorf("unknown method %q", method)
227 }
228 return len(m.OutArgs), nil
229}