blob: 9cb48f2d2ebc57e4fda20b28b89bc5d060c55001 [file] [log] [blame]
package server
import (
type flowFactory interface {
createFlow() *Flow
cleanupFlow(id int64)
type invokerFactory interface {
createInvoker(handle int64, signature signature.JSONServiceSignature) (ipc.Invoker, error)
type authFactory interface {
createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error)
type lookupReply struct {
Handle int64
HasAuthorizer bool
Signature string
Err *verror2.Standard
type dispatcherRequest struct {
ServerID uint64 `json:"serverId"`
Suffix string `json:"suffix"`
// dispatcher holds the invoker and the authorizer to be used for lookup.
type dispatcher struct {
mu sync.Mutex
serverID uint64
flowFactory flowFactory
invokerFactory invokerFactory
authFactory authFactory
logger vlog.Logger
outstandingLookups map[int64]chan lookupReply
var _ ipc.Dispatcher = (*dispatcher)(nil)
// newDispatcher is a dispatcher factory.
func newDispatcher(serverID uint64, flowFactory flowFactory, invokerFactory invokerFactory, authFactory authFactory, logger vlog.Logger) *dispatcher {
return &dispatcher{
serverID: serverID,
flowFactory: flowFactory,
invokerFactory: invokerFactory,
authFactory: authFactory,
logger: logger,
outstandingLookups: make(map[int64]chan lookupReply),
// Lookup implements dispatcher interface Lookup.
func (d *dispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
flow := d.flowFactory.createFlow()
ch := make(chan lookupReply, 1)
d.outstandingLookups[flow.ID] = ch
message := dispatcherRequest{
ServerID: d.serverID,
Suffix: suffix,
if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
ch <- lookupReply{Err: verror2.Convert(verror2.Internal, nil, err).(*verror2.Standard)}
request := <-ch
delete(d.outstandingLookups, flow.ID)
if request.Err != nil {
return nil, nil, request.Err
if request.Handle < 0 {
return nil, nil, verror2.Make(verror2.NoExist, nil, "Dispatcher", suffix)
var sig signature.JSONServiceSignature
b, err := hex.DecodeString(request.Signature)
if err != nil {
return nil, nil, verror2.Convert(verror2.Internal, nil, err)
buf := bytes.NewBuffer(b)
decoder, err := vom2.NewDecoder(buf)
if err != nil {
return nil, nil, verror2.Convert(verror2.Internal, nil, err)
if err := decoder.Decode(&sig); err != nil {
return nil, nil, verror2.Convert(verror2.Internal, nil, err)
invoker, err := d.invokerFactory.createInvoker(request.Handle, sig)
if err != nil {
return nil, nil, err
auth, err := d.authFactory.createAuthorizer(request.Handle, request.HasAuthorizer)
return invoker, auth, err
func (d *dispatcher) handleLookupResponse(id int64, data string) {
ch := d.outstandingLookups[id]
if ch == nil {
d.logger.Errorf("unknown invoke request for flow: %d", id)
var request lookupReply
decoder := json.NewDecoder(bytes.NewBufferString(data))
if err := decoder.Decode(&request); err != nil {
err2 := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
request = lookupReply{Err: &err2}
d.logger.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
ch <- request
// StopServing implements dispatcher StopServing.
func (*dispatcher) StopServing() {