blob: df3483a77ef155bb4c9a1de81cbda1e3c697b495 [file] [log] [blame]
Jiri Simsa78b646f2014-10-08 10:23:05 -07001// An implementation of a server for WSPR
2
3package server
4
5import (
6 "encoding/json"
Jiri Simsa78b646f2014-10-08 10:23:05 -07007 "sync"
8
9 vsecurity "veyron.io/veyron/veyron/security"
10 "veyron.io/wspr/veyron/services/wsprd/lib"
11 "veyron.io/wspr/veyron/services/wsprd/signature"
12
13 "veyron.io/veyron/veyron2"
14 "veyron.io/veyron/veyron2/ipc"
15 "veyron.io/veyron/veyron2/security"
Mike Burrowsb6689c22014-10-08 11:14:15 -070016 "veyron.io/veyron/veyron2/verror2"
Jiri Simsa78b646f2014-10-08 10:23:05 -070017 "veyron.io/veyron/veyron2/vlog"
18)
19
20type Flow struct {
21 ID int64
22 Writer lib.ClientWriter
23}
24
25// A request from the proxy to javascript to handle an RPC
26type serverRPCRequest struct {
27 ServerId uint64
28 Handle int64
29 Method string
30 Args []interface{}
31 Context serverRPCRequestContext
32}
33
// publicID is the serializable view of an identity: a handle that refers to
// the stored identity plus the names of that identity.
type publicID struct {
	// Handle is the value returned by the handle store for the identity.
	Handle int64
	// Names are the names reported by the identity.
	Names []string
}
38
39// call context for a serverRPCRequest
40type serverRPCRequestContext struct {
41 Suffix string
42 Name string
43 RemoteID publicID
44}
45
46// The response from the javascript server to the proxy.
47type serverRPCReply struct {
48 Results []interface{}
Mike Burrowsb6689c22014-10-08 11:14:15 -070049 Err *verror2.Standard
Jiri Simsa78b646f2014-10-08 10:23:05 -070050}
51
52type FlowHandler interface {
53 CreateNewFlow(server *Server, sender ipc.Stream) *Flow
54
55 CleanupFlow(id int64)
56}
57
58type HandleStore interface {
59 // Adds an identity to the store and returns handle to the identity
60 AddIdentity(identity security.PublicID) int64
61}
62
63type ServerHelper interface {
64 FlowHandler
65 HandleStore
66
67 GetLogger() vlog.Logger
68
69 RT() veyron2.Runtime
70}
71
72type authReply struct {
Mike Burrowsb6689c22014-10-08 11:14:15 -070073 Err *verror2.Standard
Jiri Simsa78b646f2014-10-08 10:23:05 -070074}
75
76type context struct {
77 Method string `json:"method"`
78 Name string `json:"name"`
79 Suffix string `json:"suffix"`
80 Label security.Label `json:"label"`
81 LocalID publicID `json:"localId"`
82 RemoteID publicID `json:"remoteId"`
83 LocalEndpoint string `json:"localEndpoint"`
84 RemoteEndpoint string `json:"remoteEndpoint"`
85}
86
87type authRequest struct {
88 ServerID uint64 `json:"serverID"`
89 Handle int64 `json:"handle"`
90 Context context `json:"context"`
91}
92
93type Server struct {
94 mu sync.Mutex
95
96 // The ipc.ListenSpec to use with server.Listen
97 listenSpec *ipc.ListenSpec
98
99 // The server that handles the ipc layer. Listen on this server is
100 // lazily started.
101 server ipc.Server
102
103 // The saved dispatcher to reuse when serve is called multiple times.
104 dispatcher *dispatcher
105
106 // The endpoint of the server. This is empty until the server has been
107 // started and listen has been called on it.
108 endpoint string
109
110 // The server id.
111 id uint64
112 helper ServerHelper
113
114 // The set of outstanding server requests.
115 outstandingServerRequests map[int64]chan *serverRPCReply
116
117 outstandingAuthRequests map[int64]chan error
118}
119
120func NewServer(id uint64, listenSpec *ipc.ListenSpec, helper ServerHelper) (*Server, error) {
121 server := &Server{
122 id: id,
123 helper: helper,
124 listenSpec: listenSpec,
125 outstandingServerRequests: make(map[int64]chan *serverRPCReply),
126 outstandingAuthRequests: make(map[int64]chan error),
127 }
128 var err error
129 if server.server, err = helper.RT().NewServer(); err != nil {
130 return nil, err
131 }
132 return server, nil
133}
134
135// remoteInvokeFunc is a type of function that can invoke a remote method and
136// communicate the result back via a channel to the caller
137type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
138
139func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
140 return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
141 flow := s.helper.CreateNewFlow(s, call)
142 replyChan := make(chan *serverRPCReply, 1)
143 s.mu.Lock()
144 s.outstandingServerRequests[flow.ID] = replyChan
145 s.mu.Unlock()
146 remoteID := call.RemoteID()
147 context := serverRPCRequestContext{
148 Suffix: call.Suffix(),
149 Name: call.Name(),
150 RemoteID: publicID{
151 Handle: s.helper.AddIdentity(remoteID),
152 Names: remoteID.Names(),
153 },
154 }
155 // Send a invocation request to JavaScript
156 message := serverRPCRequest{
157 ServerId: s.id,
158 Handle: handle,
159 Method: lib.LowercaseFirstCharacter(methodName),
160 Args: args,
161 Context: context,
162 }
163
164 if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
165 // Error in marshaling, pass the error through the channel immediately
Mike Burrowsb6689c22014-10-08 11:14:15 -0700166 stdErr := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700167 replyChan <- &serverRPCReply{nil,
Mike Burrowsb6689c22014-10-08 11:14:15 -0700168 &stdErr,
Jiri Simsa78b646f2014-10-08 10:23:05 -0700169 }
170 return replyChan
171 }
172
173 s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
174 "JavaScript server with args %v, MessageId %d was assigned.",
175 methodName, args, flow.ID)
176
177 go proxyStream(call, flow.Writer, s.helper.GetLogger())
178 return replyChan
179 }
180}
181
182func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
183 var item interface{}
184 for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
185 if err := w.Send(lib.ResponseStream, item); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700186 w.Error(verror2.Convert(verror2.Internal, nil, err))
Jiri Simsa78b646f2014-10-08 10:23:05 -0700187 return
188 }
189 }
190
191 if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700192 w.Error(verror2.Convert(verror2.Internal, nil, err))
Jiri Simsa78b646f2014-10-08 10:23:05 -0700193 return
194 }
195}
196
197func (s *Server) convertPublicID(id security.PublicID) publicID {
198 return publicID{
199 Handle: s.helper.AddIdentity(id),
200 Names: id.Names(),
201 }
202
203}
204
205type remoteAuthFunc func(security.Context) error
206
207func (s *Server) createRemoteAuthFunc(handle int64) remoteAuthFunc {
208 return func(ctx security.Context) error {
209 flow := s.helper.CreateNewFlow(s, nil)
210 replyChan := make(chan error, 1)
211 s.mu.Lock()
212 s.outstandingAuthRequests[flow.ID] = replyChan
213 s.mu.Unlock()
214 message := authRequest{
215 ServerID: s.id,
216 Handle: handle,
217 Context: context{
218 Method: lib.LowercaseFirstCharacter(ctx.Method()),
219 Name: ctx.Name(),
220 Suffix: ctx.Suffix(),
221 Label: ctx.Label(),
222 LocalID: s.convertPublicID(ctx.LocalID()),
223 RemoteID: s.convertPublicID(ctx.RemoteID()),
224 LocalEndpoint: ctx.LocalEndpoint().String(),
225 RemoteEndpoint: ctx.RemoteEndpoint().String(),
226 },
227 }
228 s.helper.GetLogger().VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
229
230 if err := flow.Writer.Send(lib.ResponseAuthRequest, message); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700231 replyChan <- verror2.Convert(verror2.Internal, nil, err)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700232 }
233
234 err := <-replyChan
235 s.helper.GetLogger().VI(0).Infof("going to respond with %v", err)
236 s.mu.Lock()
237 delete(s.outstandingAuthRequests, flow.ID)
238 s.mu.Unlock()
239 s.helper.CleanupFlow(flow.ID)
240 return err
241 }
242}
243
244func (s *Server) Serve(name string) (string, error) {
245 s.mu.Lock()
246 defer s.mu.Unlock()
247
248 if s.dispatcher == nil {
249 s.dispatcher = newDispatcher(s.id, s, s, s, s.helper.GetLogger())
250 }
251
252 if s.endpoint == "" {
253 endpoint, err := s.server.ListenX(s.listenSpec)
254 if err != nil {
255 return "", err
256 }
257 s.endpoint = endpoint.String()
258 }
259 if err := s.server.Serve(name, s.dispatcher); err != nil {
260 return "", err
261 }
262 s.helper.GetLogger().VI(1).Infof("endpoint is %s", s.endpoint)
263 return s.endpoint, nil
264}
265
266func (s *Server) HandleServerResponse(id int64, data string) {
267 s.mu.Lock()
268 ch := s.outstandingServerRequests[id]
269 delete(s.outstandingServerRequests, id)
270 s.mu.Unlock()
271 if ch == nil {
272 s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
273 "for MessageId: %d exists. Ignoring the results.", id)
274 //Ignore unknown responses that don't belong to any channel
275 return
276 }
277 // Decode the result and send it through the channel
278 var serverReply serverRPCReply
279 if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700280 err := verror2.Convert(verror2.Internal, nil, decoderErr).(verror2.Standard)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700281 serverReply = serverRPCReply{nil, &err}
282 }
283
284 s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
285 "MessageId %d with result %v", id, serverReply)
286 s.helper.CleanupFlow(id)
287 ch <- &serverReply
288}
289
290func (s *Server) HandleLookupResponse(id int64, data string) {
291 s.dispatcher.handleLookupResponse(id, data)
292}
293
294func (s *Server) HandleAuthResponse(id int64, data string) {
295 s.mu.Lock()
296 ch := s.outstandingAuthRequests[id]
297 s.mu.Unlock()
298 if ch == nil {
299 s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
300 "for MessageId: %d exists. Ignoring the results.", id)
301 //Ignore unknown responses that don't belong to any channel
302 return
303 }
304 // Decode the result and send it through the channel
305 var reply authReply
306 if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700307 err := verror2.Convert(verror2.Internal, nil, decoderErr).(verror2.Standard)
308 reply = authReply{Err: &err}
Jiri Simsa78b646f2014-10-08 10:23:05 -0700309 }
310
311 s.helper.GetLogger().VI(0).Infof("response received from JavaScript server for "+
312 "MessageId %d with result %v", id, reply)
313 s.helper.CleanupFlow(id)
314 // A nil verror.Standard does not result in an nil error. Instead, we have create
315 // a variable for the error interface and only set it's value if the struct is non-
316 // nil.
317 var err error
318 if reply.Err != nil {
319 err = reply.Err
320 }
321 ch <- err
322}
323
324func (s *Server) createFlow() *Flow {
325 return s.helper.CreateNewFlow(s, nil)
326}
327
328func (s *Server) cleanupFlow(id int64) {
329 s.helper.CleanupFlow(id)
330}
331
332func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error) {
333 serviceSig, err := sig.ServiceSignature()
334 if err != nil {
335 return nil, err
336 }
337
338 remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
339 return newInvoker(serviceSig, label, remoteInvokeFunc), nil
340}
341
342func (s *Server) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
343 if hasAuthorizer {
344 return &authorizer{authFunc: s.createRemoteAuthFunc(handle)}, nil
345 }
346 return vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
347 security.AllPrincipals: security.AllLabels,
348 }}), nil
349}
350
351func (s *Server) Stop() {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700352 stdErr := verror2.Make(verror2.Timeout, nil).(verror2.Standard)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700353 result := serverRPCReply{
354 Results: []interface{}{nil},
Mike Burrowsb6689c22014-10-08 11:14:15 -0700355 Err: &stdErr,
Jiri Simsa78b646f2014-10-08 10:23:05 -0700356 }
357 s.mu.Lock()
358 defer s.mu.Unlock()
359 for _, ch := range s.outstandingServerRequests {
360 select {
361 case ch <- &result:
362 default:
363 }
364 }
365 s.outstandingServerRequests = make(map[int64]chan *serverRPCReply)
366 s.server.Stop()
367}