blob: 3f4c73da43b4d6a84f937b374a398b37591ba843 [file] [log] [blame]
// Package server implements the WSPR server, which forwards incoming RPCs to JavaScript.
package server
import (
"encoding/json"
"fmt"
"sync"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
vsecurity "veyron.io/veyron/veyron/security"
"veyron.io/veyron/veyron/services/wsprd/lib"
"veyron.io/veyron/veyron/services/wsprd/signature"
)
// Flow pairs a flow ID with the writer used to send messages for that flow.
type Flow struct {
// ID identifies the flow. The Server uses it as the key under which it
// stores the channel awaiting the response to a request.
ID int64
// Writer sends the messages that belong to this flow.
Writer lib.ClientWriter
}
// serverRPCRequest is a request from the proxy to JavaScript to handle an RPC.
type serverRPCRequest struct {
// ServerId is the id of the Server that received the call.
ServerId uint64
// Handle is the handle the remote invoker was created with.
// NOTE(review): presumably identifies the JavaScript-side service object — confirm.
Handle int64
// Method is the name of the method to call, with its first character
// lowercased.
Method string
// Args are the arguments of the call.
Args []interface{}
// Context describes the call (suffix, name and remote identity).
Context serverRPCRequestContext
}
// publicID is the representation of a security.PublicID that is sent to
// JavaScript.
type publicID struct {
// Handle is the handle returned by HandleStore.AddIdentity for the identity.
Handle int64
// Names holds the result of calling Names() on the identity.
Names []string
}
// serverRPCRequestContext is the call context for a serverRPCRequest.
type serverRPCRequestContext struct {
// Suffix is the suffix reported by the incoming call.
Suffix string
// Name is the name reported by the incoming call.
Name string
// RemoteID is the identity of the remote end of the call.
RemoteID publicID
}
// serverRPCReply is the response from the JavaScript server to the proxy.
//
// The field order matters: this file builds values of this type with
// positional composite literals.
type serverRPCReply struct {
// Results holds the results of the call.
Results []interface{}
// Err is the error of the call, if any.
Err *verror.Standard
}
// FlowHandler creates and cleans up the flows used to exchange messages for a
// single request.
type FlowHandler interface {
// CreateNewFlow returns a new Flow for the given server. The Server passes
// a nil sender for flows that have no associated stream (auth requests).
CreateNewFlow(server *Server, sender ipc.Stream) *Flow
// CleanupFlow releases the flow with the given id.
// NOTE(review): whether calling it twice for the same id is safe is not
// visible here — confirm against the implementation.
CleanupFlow(id int64)
}
// HandleStore maps identities to integer handles.
type HandleStore interface {
// AddIdentity adds an identity to the store and returns a handle to the
// identity.
AddIdentity(identity security.PublicID) int64
}
// ServerHelper bundles everything a Server needs from its environment: flow
// management, the identity handle store, a logger and the runtime.
type ServerHelper interface {
FlowHandler
HandleStore
// GetLogger returns the logger the Server writes its log messages to.
GetLogger() vlog.Logger
// RT returns the runtime used to create the underlying ipc server.
RT() veyron2.Runtime
}
// authReply is the JSON-decoded reply from JavaScript to an authRequest.
type authReply struct {
// Err is the authorization error. When it is nil, HandleAuthResponse
// delivers a nil error to the waiting auth function.
Err *verror.Standard
}
// context is the JSON representation of a security.Context that is sent to
// JavaScript as part of an authRequest. Each field is filled from the
// security.Context accessor of the same name.
type context struct {
// Method is the method name with its first character lowercased.
Method string `json:"method"`
Name string `json:"name"`
Suffix string `json:"suffix"`
Label security.Label `json:"label"`
// LocalID and RemoteID are the identities converted with convertPublicID.
LocalID publicID `json:"localId"`
RemoteID publicID `json:"remoteId"`
// LocalEndpoint and RemoteEndpoint are the String() forms of the endpoints.
LocalEndpoint string `json:"localEndpoint"`
RemoteEndpoint string `json:"remoteEndpoint"`
}
// authRequest is the message sent to JavaScript to ask it to authorize a call.
type authRequest struct {
// ServerID is the id of the Server issuing the request.
ServerID uint64 `json:"serverID"`
// Handle is the handle the remote auth function was created with.
Handle int64 `json:"handle"`
// Context describes the call being authorized.
Context context `json:"context"`
}
// Server forwards RPCs received by an ipc.Server to JavaScript and routes the
// responses back to the waiting callers.
type Server struct {
// mu guards the outstanding request maps. Serve also holds it while it
// sets dispatcher and endpoint.
mu sync.Mutex
// The server that handles the ipc layer. Listen on this server is
// lazily started.
server ipc.Server
// The saved dispatcher to reuse when serve is called multiple times.
dispatcher *dispatcher
// The endpoint of the server. This is empty until the server has been
// started and listen has been called on it.
endpoint string
// The server id.
id uint64
// helper provides flows, identity handles, logging and the runtime.
helper ServerHelper
// The proxy to listen through.
veyronProxy string
// The set of outstanding server requests, keyed by flow ID.
outstandingServerRequests map[int64]chan *serverRPCReply
// The set of outstanding authorization requests, keyed by flow ID.
outstandingAuthRequests map[int64]chan error
}
// NewServer creates a Server with the given id that will listen through
// veyronProxy. The underlying ipc server is obtained from the helper's
// runtime; if that fails, the error is returned and no Server is created.
func NewServer(id uint64, veyronProxy string, helper ServerHelper) (*Server, error) {
	ipcServer, err := helper.RT().NewServer()
	if err != nil {
		return nil, err
	}
	return &Server{
		server:                    ipcServer,
		id:                        id,
		helper:                    helper,
		veyronProxy:               veyronProxy,
		outstandingServerRequests: make(map[int64]chan *serverRPCReply),
		outstandingAuthRequests:   make(map[int64]chan error),
	}, nil
}
// remoteInvokeFunc is the type of a function that invokes a remote method and
// communicates the result back to the caller via the returned channel.
type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
// createRemoteInvokerFunc returns a remoteInvokeFunc that forwards an incoming
// RPC to JavaScript, tagging each request with handle.
//
// Every invocation creates a new flow, registers a reply channel (buffered,
// capacity 1) under the flow's ID and sends a serverRPCRequest to JavaScript.
// The returned channel receives the reply delivered by HandleServerResponse,
// or the abort result sent by Stop. After a successful send, the call's
// stream is proxied to JavaScript in a separate goroutine.
//
// If the request cannot be sent, the pending entry is removed, the flow is
// cleaned up and an Internal error is delivered on the channel immediately.
func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
	return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
		flow := s.helper.CreateNewFlow(s, call)
		replyChan := make(chan *serverRPCReply, 1)
		s.mu.Lock()
		s.outstandingServerRequests[flow.ID] = replyChan
		s.mu.Unlock()

		// Send an invocation request to JavaScript.
		message := serverRPCRequest{
			ServerId: s.id,
			Handle:   handle,
			Method:   lib.LowercaseFirstCharacter(methodName),
			Args:     args,
			Context: serverRPCRequestContext{
				Suffix:   call.Suffix(),
				Name:     call.Name(),
				RemoteID: s.convertPublicID(call.RemoteID()),
			},
		}
		if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
			// No response will ever arrive for this flow, so take the pending
			// entry back out of the map instead of leaking it.
			s.mu.Lock()
			_, pending := s.outstandingServerRequests[flow.ID]
			delete(s.outstandingServerRequests, flow.ID)
			s.mu.Unlock()

			// If the entry is already gone, Stop or HandleServerResponse has
			// taken over the channel and delivers its own reply; sending a
			// second value could block on the one-slot buffer.
			if pending {
				s.helper.CleanupFlow(flow.ID)
				// Error in marshaling, pass the error through the channel immediately
				replyChan <- &serverRPCReply{nil,
					&verror.Standard{
						ID:  verror.Internal,
						Msg: fmt.Sprintf("could not marshal the method call data: %v", err)},
				}
			}
			return replyChan
		}

		s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
			"JavaScript server with args %v, MessageId %d was assigned.",
			methodName, args, flow.ID)
		go proxyStream(call, flow.Writer, s.helper.GetLogger())
		return replyChan
	}
}
// proxyStream copies every item received on stream to w as a ResponseStream
// message. When Recv returns an error the loop ends and a ResponseStreamClose
// message is sent. A failure to send is reported through w.Error and ends the
// function. The logger parameter is currently unused.
func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
	var item interface{}
	for {
		if recvErr := stream.Recv(&item); recvErr != nil {
			break
		}
		if sendErr := w.Send(lib.ResponseStream, item); sendErr != nil {
			w.Error(verror.Internalf("error marshalling stream: %v:", sendErr))
			return
		}
	}

	closeErr := w.Send(lib.ResponseStreamClose, nil)
	if closeErr != nil {
		w.Error(verror.Internalf("error closing stream: %v:", closeErr))
	}
}
// convertPublicID registers id with the helper's handle store and returns the
// publicID (handle plus names) that is sent to JavaScript.
func (s *Server) convertPublicID(id security.PublicID) publicID {
	var converted publicID
	converted.Handle = s.helper.AddIdentity(id)
	converted.Names = id.Names()
	return converted
}
// remoteAuthFunc is the type of a function that authorizes a call described by
// a security.Context, returning nil when the call is allowed.
type remoteAuthFunc func(security.Context) error
// createRemoteAuthFunc returns a remoteAuthFunc that asks JavaScript to
// authorize a call, tagging each request with handle.
//
// Every invocation creates a flow, registers a one-slot channel under the
// flow's ID, sends an authRequest and then blocks until a result arrives on
// the channel (from HandleAuthResponse, or immediately if sending failed).
// The pending entry and the flow are released before the result is returned.
func (s *Server) createRemoteAuthFunc(handle int64) remoteAuthFunc {
	return func(ctx security.Context) error {
		flow := s.helper.CreateNewFlow(s, nil)
		pending := make(chan error, 1)

		s.mu.Lock()
		s.outstandingAuthRequests[flow.ID] = pending
		s.mu.Unlock()

		callInfo := context{
			Method:         lib.LowercaseFirstCharacter(ctx.Method()),
			Name:           ctx.Name(),
			Suffix:         ctx.Suffix(),
			Label:          ctx.Label(),
			LocalID:        s.convertPublicID(ctx.LocalID()),
			RemoteID:       s.convertPublicID(ctx.RemoteID()),
			LocalEndpoint:  ctx.LocalEndpoint().String(),
			RemoteEndpoint: ctx.RemoteEndpoint().String(),
		}
		message := authRequest{ServerID: s.id, Handle: handle, Context: callInfo}

		s.helper.GetLogger().VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
		sendErr := flow.Writer.Send(lib.ResponseAuthRequest, message)
		if sendErr != nil {
			// The channel is buffered, so this does not block; the receive
			// below picks the error up right away.
			pending <- verror.Internalf("failed to find authorizer %v", sendErr)
		}

		result := <-pending
		s.helper.GetLogger().VI(0).Infof("going to respond with %v", result)

		s.mu.Lock()
		delete(s.outstandingAuthRequests, flow.ID)
		s.mu.Unlock()

		s.helper.CleanupFlow(flow.ID)
		return result
	}
}
// Serve serves the dispatcher under name and returns the server's endpoint.
//
// The dispatcher is created on the first call and reused afterwards. Listen is
// likewise only called while no endpoint has been recorded yet. The mutex is
// held for the whole call.
func (s *Server) Serve(name string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.dispatcher == nil {
		s.dispatcher = newDispatcher(s.id, s, s, s, s.helper.GetLogger())
	}

	// Start listening lazily: only the first successful call records an endpoint.
	if len(s.endpoint) == 0 {
		ep, listenErr := s.server.Listen("veyron", s.veyronProxy)
		if listenErr != nil {
			return "", listenErr
		}
		s.endpoint = ep.String()
	}

	serveErr := s.server.Serve(name, s.dispatcher)
	if serveErr != nil {
		return "", serveErr
	}

	s.helper.GetLogger().VI(1).Infof("endpoint is %s", s.endpoint)
	return s.endpoint, nil
}
// HandleServerResponse delivers the JSON-encoded reply data from JavaScript to
// the caller waiting on the request with the given id.
//
// The pending entry is removed from the map before anything else happens. A
// response for an unknown id is logged and ignored. If data cannot be decoded,
// an Internal error reply is delivered instead. The flow is cleaned up before
// the reply is sent on the channel.
func (s *Server) HandleServerResponse(id int64, data string) {
	s.mu.Lock()
	replyChan := s.outstandingServerRequests[id]
	delete(s.outstandingServerRequests, id)
	s.mu.Unlock()

	if replyChan == nil {
		// Ignore unknown responses that don't belong to any channel.
		s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
			"for MessageId: %d exists. Ignoring the results.", id)
		return
	}

	// Decode the result; a decoding failure replaces the reply with an error.
	var reply serverRPCReply
	decodeErr := json.Unmarshal([]byte(data), &reply)
	if decodeErr != nil {
		reply = serverRPCReply{
			Results: nil,
			Err: &verror.Standard{
				ID:  verror.Internal,
				Msg: fmt.Sprintf("could not unmarshal the result from the server: %v", decodeErr),
			},
		}
	}

	s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
		"MessageId %d with result %v", id, reply)
	s.helper.CleanupFlow(id)
	replyChan <- &reply
}
// HandleLookupResponse forwards the lookup response data for the request with
// the given id to the dispatcher.
//
// The dispatcher is created by Serve under the mutex, so it is read under the
// same mutex here. A response that arrives before any dispatcher exists is
// logged and ignored instead of being passed to a nil dispatcher.
func (s *Server) HandleLookupResponse(id int64, data string) {
	s.mu.Lock()
	d := s.dispatcher
	s.mu.Unlock()

	if d == nil {
		s.helper.GetLogger().Errorf("unexpected lookup response from JavaScript. No dispatcher "+
			"exists for MessageId: %d. Ignoring the results.", id)
		return
	}
	d.handleLookupResponse(id, data)
}
// HandleAuthResponse delivers the JSON-encoded authorization reply data from
// JavaScript to the auth function waiting on the request with the given id.
//
// The pending entry is removed from the map while the lock is held, in the
// same way HandleServerResponse does it. This guarantees at most one value is
// ever sent on the one-slot channel from here, so a duplicate response for the
// same id is logged and ignored rather than blocking on the send.
//
// If data cannot be decoded, an Internal error is delivered instead.
func (s *Server) HandleAuthResponse(id int64, data string) {
	s.mu.Lock()
	ch := s.outstandingAuthRequests[id]
	delete(s.outstandingAuthRequests, id)
	s.mu.Unlock()
	if ch == nil {
		s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
			"for MessageId: %d exists. Ignoring the results.", id)
		// Ignore unknown responses that don't belong to any channel.
		return
	}

	// Decode the result and send it through the channel.
	var reply authReply
	if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
		reply = authReply{Err: &verror.Standard{
			ID:  verror.Internal,
			Msg: fmt.Sprintf("could not unmarshal the result from the server: %v", decoderErr),
		}}
	}
	s.helper.GetLogger().VI(0).Infof("response received from JavaScript server for "+
		"MessageId %d with result %v", id, reply)
	s.helper.CleanupFlow(id)

	// A nil *verror.Standard stored in an error interface does not compare
	// equal to nil. So only assign to the interface when the pointer is
	// non-nil, which keeps a successful authorization a true nil error.
	var err error
	if reply.Err != nil {
		err = reply.Err
	}
	ch <- err
}
// createFlow asks the helper for a new flow for this server, passing a nil
// sender.
func (s *Server) createFlow() *Flow {
return s.helper.CreateNewFlow(s, nil)
}
// cleanupFlow asks the helper to clean up the flow with the given id.
func (s *Server) cleanupFlow(id int64) {
s.helper.CleanupFlow(id)
}
// createInvoker builds an ipc.Invoker for the service described by sig. Calls
// made through the invoker are forwarded to JavaScript by a remote invoke
// function created for handle. It returns an error if sig cannot be converted
// to a service signature.
func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error) {
	serviceSig, err := sig.ServiceSignature()
	if err != nil {
		return nil, err
	}
	invoke := s.createRemoteInvokerFunc(handle)
	invoker := newInvoker(serviceSig, label, invoke)
	return invoker, nil
}
// createAuthorizer returns the authorizer to use for handle. When
// hasAuthorizer is true, authorization is delegated to JavaScript through a
// remote auth function. Otherwise an ACL authorizer is returned whose ACL maps
// security.AllPrincipals to security.AllLabels. The error result is always nil.
func (s *Server) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
	if !hasAuthorizer {
		openACL := security.ACL{
			In: map[security.BlessingPattern]security.LabelSet{
				security.AllPrincipals: security.AllLabels,
			},
		}
		return vsecurity.NewACLAuthorizer(openACL), nil
	}
	remote := &authorizer{authFunc: s.createRemoteAuthFunc(handle)}
	return remote, nil
}
// Stop aborts all outstanding requests and then stops the underlying ipc
// server.
//
// Every pending server request receives an Aborted ("timeout") reply, and
// every pending authorization request receives the same error, so goroutines
// blocked waiting for JavaScript are released. The sends are non-blocking: a
// channel whose one-slot buffer is already full has a result on its way and is
// skipped.
//
// The mutex is released before the ipc server is stopped. In-flight handlers
// take the same mutex to register or remove their pending entries, so holding
// it across the stop would keep them blocked for as long as the stop runs.
// NOTE(review): presumably ipc.Server.Stop waits for in-flight calls, in which
// case holding the lock there could deadlock — confirm.
func (s *Server) Stop() {
	result := serverRPCReply{
		Results: []interface{}{nil},
		Err: &verror.Standard{
			ID:  verror.Aborted,
			Msg: "timeout",
		},
	}

	s.mu.Lock()
	for _, ch := range s.outstandingServerRequests {
		select {
		case ch <- &result:
		default:
		}
	}
	s.outstandingServerRequests = make(map[int64]chan *serverRPCReply)

	// Release auth functions that are still waiting for JavaScript. Each one
	// cleans up its own flow once it has received the error.
	for _, ch := range s.outstandingAuthRequests {
		select {
		case ch <- result.Err:
		default:
		}
	}
	s.outstandingAuthRequests = make(map[int64]chan error)
	s.mu.Unlock()

	s.server.Stop()
}