// An implementation of a server for WSPR.

package server
4
5import (
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -08006 "bytes"
7 "encoding/hex"
Jiri Simsa78b646f2014-10-08 10:23:05 -07008 "encoding/json"
Jiri Simsa78b646f2014-10-08 10:23:05 -07009 "sync"
Matt Rosencrantz4aabe572014-10-22 09:25:50 -070010 "time"
Jiri Simsa78b646f2014-10-08 10:23:05 -070011
Jiri Simsa78b646f2014-10-08 10:23:05 -070012 "veyron.io/wspr/veyron/services/wsprd/lib"
Ankure7889242014-10-20 18:37:29 -070013 "veyron.io/wspr/veyron/services/wsprd/principal"
Jiri Simsa78b646f2014-10-08 10:23:05 -070014 "veyron.io/wspr/veyron/services/wsprd/signature"
15
16 "veyron.io/veyron/veyron2"
17 "veyron.io/veyron/veyron2/ipc"
18 "veyron.io/veyron/veyron2/security"
Mike Burrowsb6689c22014-10-08 11:14:15 -070019 "veyron.io/veyron/veyron2/verror2"
Jiri Simsa78b646f2014-10-08 10:23:05 -070020 "veyron.io/veyron/veyron2/vlog"
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -080021 "veyron.io/veyron/veyron2/vom2"
Jiri Simsa78b646f2014-10-08 10:23:05 -070022)
23
24type Flow struct {
25 ID int64
26 Writer lib.ClientWriter
27}
28
29// A request from the proxy to javascript to handle an RPC
30type serverRPCRequest struct {
31 ServerId uint64
32 Handle int64
33 Method string
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -080034 Args string
Jiri Simsa78b646f2014-10-08 10:23:05 -070035 Context serverRPCRequestContext
36}
37
Jiri Simsa78b646f2014-10-08 10:23:05 -070038// call context for a serverRPCRequest
39type serverRPCRequestContext struct {
Ankurc814b322014-10-22 18:16:06 -070040 Suffix string
41 Name string
42 RemoteBlessings principal.BlessingsHandle
43 RemoteBlessingStrings []string
Ankur5b802242014-10-29 11:32:21 -070044 Timeout int64 // The time period (in ns) between now and the deadline.
Jiri Simsa78b646f2014-10-08 10:23:05 -070045}
46
47// The response from the javascript server to the proxy.
48type serverRPCReply struct {
49 Results []interface{}
Mike Burrowsb6689c22014-10-08 11:14:15 -070050 Err *verror2.Standard
Jiri Simsa78b646f2014-10-08 10:23:05 -070051}
52
53type FlowHandler interface {
54 CreateNewFlow(server *Server, sender ipc.Stream) *Flow
55
56 CleanupFlow(id int64)
57}
58
59type HandleStore interface {
Ankure7889242014-10-20 18:37:29 -070060 // Adds blessings to the store and returns handle to the blessings
61 AddBlessings(blessings security.Blessings) int64
Jiri Simsa78b646f2014-10-08 10:23:05 -070062}
63
64type ServerHelper interface {
65 FlowHandler
66 HandleStore
67
68 GetLogger() vlog.Logger
69
70 RT() veyron2.Runtime
71}
72
73type authReply struct {
Mike Burrowsb6689c22014-10-08 11:14:15 -070074 Err *verror2.Standard
Jiri Simsa78b646f2014-10-08 10:23:05 -070075}
76
77type context struct {
Ankure7889242014-10-20 18:37:29 -070078 Method string `json:"method"`
79 Name string `json:"name"`
80 Suffix string `json:"suffix"`
81 Label security.Label `json:"label"`
82 LocalBlessings principal.BlessingsHandle `json:"localBlessings"`
83 LocalBlessingStrings []string `json:"localBlessingStrings"`
84 RemoteBlessings principal.BlessingsHandle `json:"remoteBlessings"`
85 RemoteBlessingStrings []string `json:"remoteBlessingStrings"`
86 LocalEndpoint string `json:"localEndpoint"`
87 RemoteEndpoint string `json:"remoteEndpoint"`
Jiri Simsa78b646f2014-10-08 10:23:05 -070088}
89
90type authRequest struct {
91 ServerID uint64 `json:"serverID"`
92 Handle int64 `json:"handle"`
93 Context context `json:"context"`
94}
95
96type Server struct {
97 mu sync.Mutex
98
99 // The ipc.ListenSpec to use with server.Listen
100 listenSpec *ipc.ListenSpec
101
102 // The server that handles the ipc layer. Listen on this server is
103 // lazily started.
104 server ipc.Server
105
106 // The saved dispatcher to reuse when serve is called multiple times.
107 dispatcher *dispatcher
108
109 // The endpoint of the server. This is empty until the server has been
110 // started and listen has been called on it.
111 endpoint string
112
113 // The server id.
114 id uint64
115 helper ServerHelper
116
117 // The set of outstanding server requests.
118 outstandingServerRequests map[int64]chan *serverRPCReply
119
120 outstandingAuthRequests map[int64]chan error
121}
122
123func NewServer(id uint64, listenSpec *ipc.ListenSpec, helper ServerHelper) (*Server, error) {
124 server := &Server{
125 id: id,
126 helper: helper,
127 listenSpec: listenSpec,
128 outstandingServerRequests: make(map[int64]chan *serverRPCReply),
129 outstandingAuthRequests: make(map[int64]chan error),
130 }
131 var err error
132 if server.server, err = helper.RT().NewServer(); err != nil {
133 return nil, err
134 }
135 return server, nil
136}
137
138// remoteInvokeFunc is a type of function that can invoke a remote method and
139// communicate the result back via a channel to the caller
140type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
141
142func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
143 return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
144 flow := s.helper.CreateNewFlow(s, call)
145 replyChan := make(chan *serverRPCReply, 1)
146 s.mu.Lock()
147 s.outstandingServerRequests[flow.ID] = replyChan
148 s.mu.Unlock()
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700149
150 timeout := lib.JSIPCNoTimeout
151 if deadline, ok := call.Deadline(); ok {
152 timeout = lib.GoToJSDuration(deadline.Sub(time.Now()))
153 }
154
Jiri Simsa78b646f2014-10-08 10:23:05 -0700155 context := serverRPCRequestContext{
Ankur5b802242014-10-29 11:32:21 -0700156 Suffix: call.Suffix(),
157 Name: call.Name(),
158 Timeout: timeout,
159 RemoteBlessings: s.convertBlessingsToHandle(call.RemoteBlessings()),
160 RemoteBlessingStrings: call.RemoteBlessings().ForContext(call),
Jiri Simsa78b646f2014-10-08 10:23:05 -0700161 }
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700162
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -0800163 errHandler := func(err error) <-chan *serverRPCReply {
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700164 if ch := s.popServerRequest(flow.ID); ch != nil {
165 stdErr := verror2.Convert(verror2.Internal, call, err).(verror2.Standard)
166 ch <- &serverRPCReply{nil, &stdErr}
167 s.helper.CleanupFlow(flow.ID)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700168 }
169 return replyChan
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -0800170
171 }
172 var buf bytes.Buffer
173 encoder, err := vom2.NewBinaryEncoder(&buf)
174 if err != nil {
175 return errHandler(err)
176 }
177
178 if err := encoder.Encode(args); err != nil {
179 return errHandler(err)
180 }
181
182 // Send a invocation request to JavaScript
183 message := serverRPCRequest{
184 ServerId: s.id,
185 Handle: handle,
186 Method: lib.LowercaseFirstCharacter(methodName),
187 Args: hex.EncodeToString(buf.Bytes()),
188 Context: context,
189 }
190
191 if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
192 return errHandler(err)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700193 }
194
195 s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
196 "JavaScript server with args %v, MessageId %d was assigned.",
197 methodName, args, flow.ID)
198
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700199 // Watch for cancellation.
200 go func() {
201 <-call.Done()
202 ch := s.popServerRequest(flow.ID)
203 if ch == nil {
204 return
205 }
206
207 // Send a cancel message to the JS server.
208 flow.Writer.Send(lib.ResponseCancel, nil)
209 s.helper.CleanupFlow(flow.ID)
210
211 err := verror2.Convert(verror2.Aborted, call, call.Err()).(verror2.Standard)
212 ch <- &serverRPCReply{nil, &err}
213 }()
214
Jiri Simsa78b646f2014-10-08 10:23:05 -0700215 go proxyStream(call, flow.Writer, s.helper.GetLogger())
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700216
Jiri Simsa78b646f2014-10-08 10:23:05 -0700217 return replyChan
218 }
219}
220
221func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
222 var item interface{}
223 for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
Shyam Jayaramanc96e1aa2014-11-12 16:42:39 -0800224 var buf bytes.Buffer
225 encoder, err := vom2.NewBinaryEncoder(&buf)
226 if err != nil {
227 w.Error(verror2.Convert(verror2.Internal, nil, err))
228 return
229 }
230
231 if err := encoder.Encode(item); err != nil {
232 w.Error(verror2.Convert(verror2.Internal, nil, err))
233 return
234 }
235
236 if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700237 w.Error(verror2.Convert(verror2.Internal, nil, err))
Jiri Simsa78b646f2014-10-08 10:23:05 -0700238 return
239 }
240 }
241
242 if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700243 w.Error(verror2.Convert(verror2.Internal, nil, err))
Jiri Simsa78b646f2014-10-08 10:23:05 -0700244 return
245 }
246}
247
Ankure7889242014-10-20 18:37:29 -0700248func (s *Server) convertBlessingsToHandle(blessings security.Blessings) principal.BlessingsHandle {
249 return *principal.ConvertBlessingsToHandle(blessings, s.helper.AddBlessings(blessings))
Jiri Simsa78b646f2014-10-08 10:23:05 -0700250}
251
252type remoteAuthFunc func(security.Context) error
253
254func (s *Server) createRemoteAuthFunc(handle int64) remoteAuthFunc {
255 return func(ctx security.Context) error {
256 flow := s.helper.CreateNewFlow(s, nil)
257 replyChan := make(chan error, 1)
258 s.mu.Lock()
259 s.outstandingAuthRequests[flow.ID] = replyChan
260 s.mu.Unlock()
261 message := authRequest{
262 ServerID: s.id,
263 Handle: handle,
264 Context: context{
Ankur5b802242014-10-29 11:32:21 -0700265 Method: lib.LowercaseFirstCharacter(ctx.Method()),
266 Name: ctx.Name(),
267 Suffix: ctx.Suffix(),
268 Label: ctx.Label(),
269 LocalEndpoint: ctx.LocalEndpoint().String(),
270 RemoteEndpoint: ctx.RemoteEndpoint().String(),
271 LocalBlessings: s.convertBlessingsToHandle(ctx.LocalBlessings()),
272 LocalBlessingStrings: ctx.LocalBlessings().ForContext(ctx),
273 RemoteBlessings: s.convertBlessingsToHandle(ctx.RemoteBlessings()),
274 RemoteBlessingStrings: ctx.RemoteBlessings().ForContext(ctx),
Jiri Simsa78b646f2014-10-08 10:23:05 -0700275 },
276 }
277 s.helper.GetLogger().VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
278
279 if err := flow.Writer.Send(lib.ResponseAuthRequest, message); err != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700280 replyChan <- verror2.Convert(verror2.Internal, nil, err)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700281 }
282
283 err := <-replyChan
284 s.helper.GetLogger().VI(0).Infof("going to respond with %v", err)
285 s.mu.Lock()
286 delete(s.outstandingAuthRequests, flow.ID)
287 s.mu.Unlock()
288 s.helper.CleanupFlow(flow.ID)
289 return err
290 }
291}
292
293func (s *Server) Serve(name string) (string, error) {
294 s.mu.Lock()
295 defer s.mu.Unlock()
296
297 if s.dispatcher == nil {
298 s.dispatcher = newDispatcher(s.id, s, s, s, s.helper.GetLogger())
299 }
300
301 if s.endpoint == "" {
Cosmos Nicolaou408de0f2014-10-24 13:32:29 -0700302 endpoint, err := s.server.Listen(*s.listenSpec)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700303 if err != nil {
304 return "", err
305 }
306 s.endpoint = endpoint.String()
307 }
Cosmos Nicolaou89303d62014-11-02 12:58:11 -0800308 if err := s.server.ServeDispatcher(name, s.dispatcher); err != nil {
Jiri Simsa78b646f2014-10-08 10:23:05 -0700309 return "", err
310 }
311 s.helper.GetLogger().VI(1).Infof("endpoint is %s", s.endpoint)
312 return s.endpoint, nil
313}
314
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700315func (s *Server) popServerRequest(id int64) chan *serverRPCReply {
Jiri Simsa78b646f2014-10-08 10:23:05 -0700316 s.mu.Lock()
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700317 defer s.mu.Unlock()
Jiri Simsa78b646f2014-10-08 10:23:05 -0700318 ch := s.outstandingServerRequests[id]
319 delete(s.outstandingServerRequests, id)
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700320
321 return ch
322}
323
324func (s *Server) HandleServerResponse(id int64, data string) {
325 ch := s.popServerRequest(id)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700326 if ch == nil {
327 s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
328 "for MessageId: %d exists. Ignoring the results.", id)
329 //Ignore unknown responses that don't belong to any channel
330 return
331 }
Matt Rosencrantz4aabe572014-10-22 09:25:50 -0700332
Jiri Simsa78b646f2014-10-08 10:23:05 -0700333 // Decode the result and send it through the channel
334 var serverReply serverRPCReply
335 if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700336 err := verror2.Convert(verror2.Internal, nil, decoderErr).(verror2.Standard)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700337 serverReply = serverRPCReply{nil, &err}
338 }
339
340 s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
341 "MessageId %d with result %v", id, serverReply)
342 s.helper.CleanupFlow(id)
343 ch <- &serverReply
344}
345
346func (s *Server) HandleLookupResponse(id int64, data string) {
347 s.dispatcher.handleLookupResponse(id, data)
348}
349
350func (s *Server) HandleAuthResponse(id int64, data string) {
351 s.mu.Lock()
352 ch := s.outstandingAuthRequests[id]
353 s.mu.Unlock()
354 if ch == nil {
355 s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
356 "for MessageId: %d exists. Ignoring the results.", id)
357 //Ignore unknown responses that don't belong to any channel
358 return
359 }
360 // Decode the result and send it through the channel
361 var reply authReply
362 if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700363 err := verror2.Convert(verror2.Internal, nil, decoderErr).(verror2.Standard)
364 reply = authReply{Err: &err}
Jiri Simsa78b646f2014-10-08 10:23:05 -0700365 }
366
367 s.helper.GetLogger().VI(0).Infof("response received from JavaScript server for "+
368 "MessageId %d with result %v", id, reply)
369 s.helper.CleanupFlow(id)
370 // A nil verror.Standard does not result in an nil error. Instead, we have create
371 // a variable for the error interface and only set it's value if the struct is non-
372 // nil.
373 var err error
374 if reply.Err != nil {
375 err = reply.Err
376 }
377 ch <- err
378}
379
380func (s *Server) createFlow() *Flow {
381 return s.helper.CreateNewFlow(s, nil)
382}
383
384func (s *Server) cleanupFlow(id int64) {
385 s.helper.CleanupFlow(id)
386}
387
388func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error) {
389 serviceSig, err := sig.ServiceSignature()
390 if err != nil {
391 return nil, err
392 }
393
394 remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
395 return newInvoker(serviceSig, label, remoteInvokeFunc), nil
396}
397
398func (s *Server) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
399 if hasAuthorizer {
400 return &authorizer{authFunc: s.createRemoteAuthFunc(handle)}, nil
401 }
Asim Shankar8572f6c2014-10-28 15:24:17 -0700402 return nil, nil
Jiri Simsa78b646f2014-10-08 10:23:05 -0700403}
404
405func (s *Server) Stop() {
Mike Burrowsb6689c22014-10-08 11:14:15 -0700406 stdErr := verror2.Make(verror2.Timeout, nil).(verror2.Standard)
Jiri Simsa78b646f2014-10-08 10:23:05 -0700407 result := serverRPCReply{
408 Results: []interface{}{nil},
Mike Burrowsb6689c22014-10-08 11:14:15 -0700409 Err: &stdErr,
Jiri Simsa78b646f2014-10-08 10:23:05 -0700410 }
411 s.mu.Lock()
412 defer s.mu.Unlock()
413 for _, ch := range s.outstandingServerRequests {
414 select {
415 case ch <- &result:
416 default:
417 }
418 }
419 s.outstandingServerRequests = make(map[int64]chan *serverRPCReply)
420 s.server.Stop()
421}
Ali Ghassemi1008bbe2014-11-07 16:36:08 -0800422
423func (s *Server) AddName(name string) error {
424 return s.server.AddName(name)
425}
426
427func (s *Server) RemoveName(name string) error {
428 return s.server.RemoveName(name)
429}