| // An implementation of a server for WSPR |
| |
| package lib |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "strings" |
| "sync" |
| |
| "veyron2" |
| "veyron2/ipc" |
| "veyron2/security" |
| "veyron2/verror" |
| "veyron2/vlog" |
| "veyron2/vom" |
| ) |
| |
| type flow struct { |
| id int64 |
| writer clientWriter |
| } |
| |
| type serverHelper interface { |
| createNewFlow(server *server, sender sender) *flow |
| |
| cleanupFlow(id int64) |
| |
| getLogger() vlog.Logger |
| |
| rt() veyron2.Runtime |
| } |
| |
| type server struct { |
| sync.Mutex |
| |
| // The server that handles the ipc layer. Listen on this server is |
| // lazily started. |
| server ipc.Server |
| dispatcher exactMatchDispatcher |
| |
| // The endpoint of the server. This is empty until the server has been |
| // started and listen has been called on it. |
| endpoint string |
| |
| // The server id. |
| id uint64 |
| helper serverHelper |
| |
| // The proxy to listen through. |
| veyronProxy string |
| |
| // The set of outstanding server requests. |
| outstandingServerRequests map[int64]chan *serverRPCReply |
| } |
| |
| func newServer(id uint64, veyronProxy string, helper serverHelper) (*server, error) { |
| server := &server{ |
| id: id, |
| helper: helper, |
| veyronProxy: veyronProxy, |
| outstandingServerRequests: make(map[int64]chan *serverRPCReply), |
| dispatcher: exactMatchDispatcher{dispatchers: make(map[string]ipc.Dispatcher)}, |
| } |
| var err error |
| if server.server, err = helper.rt().NewServer(); err != nil { |
| return nil, err |
| } |
| return server, nil |
| } |
| |
| // remoteInvokeFunc is a type of function that can invoke a remote method and |
| // communicate the result back via a channel to the caller |
| type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply |
| |
| func (s *server) createRemoteInvokerFunc(serviceName string) remoteInvokeFunc { |
| return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply { |
| flow := s.helper.createNewFlow(s, senderWrapper{stream: call}) |
| replyChan := make(chan *serverRPCReply, 1) |
| s.Lock() |
| s.outstandingServerRequests[flow.id] = replyChan |
| s.Unlock() |
| context := serverRPCRequestContext{ |
| Suffix: call.Suffix(), |
| Name: call.Name(), |
| } |
| // Send a invocation request to JavaScript |
| message := serverRPCRequest{ |
| ServerId: s.id, |
| ServiceName: serviceName, |
| Method: lowercaseFirstCharacter(methodName), |
| Args: args, |
| Context: context, |
| } |
| |
| data := response{Type: responseServerRequest, Message: message} |
| if err := vom.ObjToJSON(flow.writer, vom.ValueOf(data)); err != nil { |
| // Error in marshaling, pass the error through the channel immediately |
| replyChan <- &serverRPCReply{nil, |
| &verror.Standard{ |
| ID: verror.Internal, |
| Msg: fmt.Sprintf("could not marshal the method call data: %v", err)}, |
| } |
| return replyChan |
| } |
| if err := flow.writer.FinishMessage(); err != nil { |
| replyChan <- &serverRPCReply{nil, |
| &verror.Standard{ |
| ID: verror.Internal, |
| Msg: fmt.Sprintf("WSPR: error finishing message: %v", err)}, |
| } |
| return replyChan |
| } |
| |
| s.helper.getLogger().VI(3).Infof("request received to call method %q on "+ |
| "JavaScript server %q with args %v, MessageId %d was assigned.", |
| methodName, serviceName, args, flow.id) |
| |
| go proxyStream(call, flow.writer, s.helper.getLogger()) |
| return replyChan |
| } |
| } |
| |
| func proxyStream(stream ipc.Stream, w clientWriter, logger vlog.Logger) { |
| var item interface{} |
| for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) { |
| data := response{Type: responseStream, Message: item} |
| if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil { |
| w.sendError(verror.Internalf("error marshalling stream: %v:", err)) |
| return |
| } |
| if err := w.FinishMessage(); err != nil { |
| logger.Error("WSPR: error finishing message", err) |
| return |
| } |
| } |
| |
| if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil { |
| w.sendError(verror.Internalf("error closing stream: %v:", err)) |
| return |
| } |
| if err := w.FinishMessage(); err != nil { |
| logger.Error("WSPR: error finishing message", err) |
| return |
| } |
| } |
| |
| type exactMatchDispatcher struct { |
| sync.Mutex |
| dispatchers map[string]ipc.Dispatcher |
| } |
| |
| func (em *exactMatchDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) { |
| parts := strings.Split(suffix, "/") |
| if len(parts) == 0 || len(parts[0]) == 0 { |
| return nil, nil, fmt.Errorf("can't extract first path component from %q", suffix) |
| } |
| name := parts[0] |
| em.Lock() |
| defer em.Unlock() |
| if disp := em.dispatchers[name]; disp == nil { |
| return nil, nil, fmt.Errorf("no dispatcher registered for %q, from %q", name, suffix) |
| } else { |
| suffix = strings.TrimLeft(suffix, "/") |
| suffix = strings.TrimPrefix(suffix, name) |
| suffix = strings.TrimLeft(suffix, "/") |
| return disp.Lookup(suffix) |
| } |
| } |
| |
| // register associates a dispatcher with name, where name cannot contain |
| // any /s. Incoming invocations of the form <name>/... will be passed |
| // on to the dispatcher with <name>/... as the parameter to its lookup |
| // method. |
| func (s *server) register(name string, sig JSONServiceSignature) error { |
| serviceSig, err := sig.ServiceSignature() |
| if err != nil { |
| return err |
| } |
| |
| if strings.Contains(name, "/") { |
| return fmt.Errorf("%q must not contain /", name) |
| } |
| |
| remoteInvokeFunc := s.createRemoteInvokerFunc(name) |
| invoker, err := newInvoker(serviceSig, remoteInvokeFunc) |
| |
| if err != nil { |
| return err |
| } |
| dispatcher := newDispatcher(invoker, security.NewACLAuthorizer( |
| security.ACL{security.AllPrincipals: security.AllLabels}, |
| )) |
| |
| s.dispatcher.Lock() |
| s.dispatcher.dispatchers[name] = dispatcher |
| s.dispatcher.Unlock() |
| return nil |
| } |
| |
| func (s *server) publish(name string) (string, error) { |
| s.Lock() |
| defer s.Unlock() |
| if s.endpoint == "" { |
| endpoint, err := s.server.Listen("veyron", s.veyronProxy) |
| |
| if err != nil { |
| return "", err |
| } |
| s.endpoint = endpoint.String() |
| } |
| if err := s.server.Serve(name, &s.dispatcher); err != nil { |
| return "", err |
| } |
| s.helper.getLogger().VI(1).Infof("endpoint is %s", s.endpoint) |
| return s.endpoint, nil |
| } |
| |
| func (s *server) handleServerResponse(id int64, data string) { |
| s.Lock() |
| ch := s.outstandingServerRequests[id] |
| delete(s.outstandingServerRequests, id) |
| s.Unlock() |
| if ch == nil { |
| s.helper.getLogger().Errorf("unexpected result from JavaScript. No channel "+ |
| "for MessageId: %d exists. Ignoring the results.", id) |
| //Ignore unknown responses that don't belong to any channel |
| return |
| } |
| // Decode the result and send it through the channel |
| var serverReply serverRPCReply |
| decoder := json.NewDecoder(bytes.NewBufferString(data)) |
| if decoderErr := decoder.Decode(&serverReply); decoderErr != nil { |
| err := verror.Standard{ |
| ID: verror.Internal, |
| Msg: fmt.Sprintf("could not unmarshal the result from the server: %v", decoderErr), |
| } |
| serverReply = serverRPCReply{nil, &err} |
| } |
| |
| s.helper.getLogger().VI(3).Infof("response received from JavaScript server for "+ |
| "MessageId %d with result %v", id, serverReply) |
| s.helper.cleanupFlow(id) |
| ch <- &serverReply |
| } |
| |
| func (s *server) Stop() { |
| result := serverRPCReply{ |
| Results: []interface{}{nil}, |
| Err: &verror.Standard{ |
| ID: verror.Aborted, |
| Msg: "timeout", |
| }, |
| } |
| s.Lock() |
| defer s.Unlock() |
| for _, ch := range s.outstandingServerRequests { |
| select { |
| case ch <- &result: |
| default: |
| } |
| } |
| s.outstandingServerRequests = make(map[int64]chan *serverRPCReply) |
| s.server.Stop() |
| } |