blob: 96573c99c8100e8a50a113f3b9c28114da75583e [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package server
import (
type flowFactory interface {
createFlow() *Flow
cleanupFlow(id int32)
type invokerFactory interface {
createInvoker(handle int32, signature []signature.Interface, hasGlobber bool) (rpc.Invoker, error)
type authFactory interface {
createAuthorizer(handle int32, hasAuthorizer bool) (security.Authorizer, error)
type dispatcherRequest struct {
ServerId uint32 `json:"serverId"`
Suffix string `json:"suffix"`
// dispatcher holds the invoker and the authorizer to be used for lookup.
type dispatcher struct {
mu sync.Mutex
serverId uint32
flowFactory flowFactory
invokerFactory invokerFactory
authFactory authFactory
outstandingLookups map[int32]chan LookupReply
vomHelper VomHelper
closed bool
var _ rpc.Dispatcher = (*dispatcher)(nil)
// newDispatcher is a dispatcher factory.
func newDispatcher(serverId uint32, flowFactory flowFactory, invokerFactory invokerFactory, authFactory authFactory, vomHelper VomHelper) *dispatcher {
return &dispatcher{
serverId: serverId,
flowFactory: flowFactory,
invokerFactory: invokerFactory,
authFactory: authFactory,
outstandingLookups: make(map[int32]chan LookupReply),
vomHelper: vomHelper,
func (d *dispatcher) Cleanup() {
d.closed = true
for _, ch := range d.outstandingLookups {
verr := NewErrServerStopped(nil).(verror.E)
ch <- LookupReply{Err: &verr}
// Lookup implements dispatcher interface Lookup.
func (d *dispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
// If the server has been closed, we immediately return a retryable error.
if d.closed {
return nil, nil, NewErrServerStopped(nil)
flow := d.flowFactory.createFlow()
ch := make(chan LookupReply, 1)
d.outstandingLookups[flow.ID] = ch
message := dispatcherRequest{
ServerId: d.serverId,
Suffix: suffix,
if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
verr := verror.Convert(verror.ErrInternal, nil, err).(verror.E)
ch <- LookupReply{Err: &verr}
reply := <-ch
delete(d.outstandingLookups, flow.ID)
if reply.Err != nil {
return nil, nil, reply.Err
if reply.Handle < 0 {
return nil, nil, verror.New(verror.ErrNoExist, nil, "Dispatcher", suffix)
invoker, err := d.invokerFactory.createInvoker(reply.Handle, reply.Signature, reply.HasGlobber)
if err != nil {
return nil, nil, err
auth, err := d.authFactory.createAuthorizer(reply.Handle, reply.HasAuthorizer)
if err != nil {
return nil, nil, err
return invoker, auth, nil
func (d *dispatcher) handleLookupResponse(ctx *context.T, id int32, data string) {
ch := d.outstandingLookups[id]
if ch == nil {
ctx.Errorf("unknown invoke request for flow: %d", id)
var lookupReply LookupReply
if err := lib.HexVomDecode(data, &lookupReply, d.vomHelper.TypeDecoder()); err != nil {
err2 := verror.Convert(verror.ErrInternal, nil, err)
lookupReply = LookupReply{Err: err2}
ctx.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
ch <- lookupReply
// StopServing implements dispatcher StopServing.
func (*dispatcher) StopServing() {