blob: ca83e40eec9f802b45e5f21d5571e9c2df854f87 [file] [log] [blame]
package server
import (
"bytes"
"encoding/json"
"fmt"
"sync"
"veyron.io/veyron/veyron/services/wsprd/lib"
"veyron.io/veyron/veyron/services/wsprd/signature"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
)
type flowFactory interface {
createFlow() *Flow
cleanupFlow(id int64)
}
type invokerFactory interface {
createInvoker(handle int64, signature signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error)
}
type authFactory interface {
createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error)
}
type lookupReply struct {
Handle int64
HasAuthorizer bool
Label security.Label
Signature signature.JSONServiceSignature
Err *verror.Standard
}
type dispatcherRequest struct {
ServerID uint64 `json:"serverId"`
Suffix string `json:"suffix"`
Method string `json:"method"`
}
// dispatcher holds the invoker and the authorizer to be used for lookup.
type dispatcher struct {
mu sync.Mutex
serverID uint64
flowFactory flowFactory
invokerFactory invokerFactory
authFactory authFactory
logger vlog.Logger
outstandingLookups map[int64]chan lookupReply
}
// newDispatcher is a dispatcher factory.
func newDispatcher(serverID uint64, flowFactory flowFactory, invokerFactory invokerFactory, authFactory authFactory, logger vlog.Logger) *dispatcher {
return &dispatcher{
serverID: serverID,
flowFactory: flowFactory,
invokerFactory: invokerFactory,
authFactory: authFactory,
logger: logger,
outstandingLookups: make(map[int64]chan lookupReply),
}
}
// Lookup implements dispatcher interface Lookup.
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
flow := d.flowFactory.createFlow()
d.mu.Lock()
ch := make(chan lookupReply, 1)
d.outstandingLookups[flow.ID] = ch
d.mu.Unlock()
message := dispatcherRequest{
ServerID: d.serverID,
Suffix: suffix,
Method: lib.LowercaseFirstCharacter(method),
}
if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
ch <- lookupReply{
Err: &verror.Standard{
ID: verror.Internal,
Msg: fmt.Sprintf("could not marshal the method call data: %v", err),
},
}
}
request := <-ch
d.mu.Lock()
delete(d.outstandingLookups, flow.ID)
d.mu.Unlock()
d.flowFactory.cleanupFlow(flow.ID)
if request.Err != nil {
return nil, nil, request.Err
}
if request.Handle < 0 {
return nil, nil, verror.NoExistf("ipc: dispatcher for %s not found", suffix)
}
invoker, err := d.invokerFactory.createInvoker(request.Handle, request.Signature, request.Label)
if err != nil {
return nil, nil, err
}
auth, err := d.authFactory.createAuthorizer(request.Handle, request.HasAuthorizer)
return invoker, auth, err
}
func (d *dispatcher) handleLookupResponse(id int64, data string) {
d.mu.Lock()
ch := d.outstandingLookups[id]
d.mu.Unlock()
if ch == nil {
d.flowFactory.cleanupFlow(id)
d.logger.Errorf("unknown invoke request for flow: %d", id)
return
}
var request lookupReply
decoder := json.NewDecoder(bytes.NewBufferString(data))
if err := decoder.Decode(&request); err != nil {
request = lookupReply{
Err: &verror.Standard{
ID: verror.Internal,
Msg: fmt.Sprintf("could not unmarshal invoke request: %v", err),
},
}
d.logger.Errorf("unmarshaling invoke request failed: %v", err)
}
ch <- request
}
// StopServing implements dispatcher StopServing.
func (*dispatcher) StopServing() {
}