blob: 850e742e995a773bea785d223d5f3fa3fb6a0951 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// An implementation of a server for WSPR
package server
import (
"encoding/json"
"fmt"
"runtime"
"sync"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/vdl"
"v.io/v23/vdlroot/signature"
vdltime "v.io/v23/vdlroot/time"
"v.io/v23/verror"
"v.io/v23/vtrace"
"v.io/x/lib/vlog"
"v.io/x/ref/services/wspr/internal/lib"
"v.io/x/ref/services/wspr/internal/principal"
)
type Flow struct {
ID int32
Writer lib.ClientWriter
}
type FlowHandler interface {
CreateNewFlow(server interface{}, sender rpc.Stream) *Flow
CleanupFlow(id int32)
}
type HandleStore interface {
GetBlessings(handle principal.BlessingsHandle) security.Blessings
// Gets or adds blessings to the store and returns handle to the blessings
GetOrAddBlessingsHandle(blessings security.Blessings) principal.BlessingsHandle
}
type ServerHelper interface {
FlowHandler
HandleStore
SendLogMessage(level lib.LogLevel, msg string) error
Context() *context.T
}
type authReply struct {
Err *verror.E
}
// AuthRequest is a request for a javascript authorizer to run
// This is exported to make the app test easier.
type AuthRequest struct {
ServerId uint32 `json:"serverId"`
Handle int32 `json:"handle"`
Call SecurityCall `json:"call"`
}
type Server struct {
// serverStateLock should be aquired when starting or stopping the server.
// This should be locked before outstandingRequestLock.
serverStateLock sync.Mutex
// The rpc.ListenSpec to use with server.Listen
listenSpec *rpc.ListenSpec
// The server that handles the rpc layer. Listen on this server is
// lazily started.
server rpc.Server
// The saved dispatcher to reuse when serve is called multiple times.
dispatcher *dispatcher
// Whether the server is listening.
isListening bool
// The server id.
id uint32
helper ServerHelper
// outstandingRequestLock should be acquired only to update the outstanding request maps below.
outstandingRequestLock sync.Mutex
outstandingServerRequests map[int32]chan *lib.ServerRpcReply // GUARDED_BY outstandingRequestLock
outstandingAuthRequests map[int32]chan error // GUARDED_BY outstandingRequestLock
outstandingValidationRequests map[int32]chan []error // GUARDED_BY outstandingRequestLock
// statusClose will be closed when the server is shutting down, this will
// cause the status poller to exit and cleanup any waiting requests.
statusClose chan struct{}
}
type serverContextKey struct{}
func NewServer(id uint32, listenSpec *rpc.ListenSpec, helper ServerHelper) (*Server, error) {
server := &Server{
id: id,
helper: helper,
listenSpec: listenSpec,
outstandingServerRequests: make(map[int32]chan *lib.ServerRpcReply),
outstandingAuthRequests: make(map[int32]chan error),
outstandingValidationRequests: make(map[int32]chan []error),
statusClose: make(chan struct{}, 1),
}
var err error
ctx := helper.Context()
ctx = context.WithValue(ctx, serverContextKey{}, server)
if server.server, err = v23.NewServer(ctx); err != nil {
return nil, err
}
return server, nil
}
// remoteInvokeFunc is a type of function that can invoke a remote method and
// communicate the result back via a channel to the caller
type remoteInvokeFunc func(ctx *context.T, call rpc.StreamServerCall, methodName string, args []interface{}) <-chan *lib.ServerRpcReply
func (s *Server) createRemoteInvokerFunc(handle int32) remoteInvokeFunc {
return func(ctx *context.T, call rpc.StreamServerCall, methodName string, args []interface{}) <-chan *lib.ServerRpcReply {
replyChan := make(chan *lib.ServerRpcReply, 1)
select {
case <-s.statusClose:
retryError := NewErrServerStopped(nil).(verror.E)
replyChan <- &lib.ServerRpcReply{nil, &retryError, vtrace.Response{}}
return replyChan
}
securityCall := ConvertSecurityCall(s.helper, ctx, call.Security(), true)
flow := s.helper.CreateNewFlow(s, call)
s.outstandingRequestLock.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
var timeout vdltime.Deadline
if deadline, ok := ctx.Deadline(); ok {
timeout.Time = deadline
}
errHandler := func(err error) <-chan *lib.ServerRpcReply {
if ch := s.popServerRequest(flow.ID); ch != nil {
stdErr := verror.Convert(verror.ErrInternal, ctx, err).(verror.E)
ch <- &lib.ServerRpcReply{nil, &stdErr, vtrace.Response{}}
s.helper.CleanupFlow(flow.ID)
}
return replyChan
}
var grantedBlessings *principal.JsBlessings
if !call.GrantedBlessings().IsZero() {
grantedBlessings = convertBlessingsToHandle(s.helper, call.GrantedBlessings())
}
rpcCall := ServerRpcRequestCall{
SecurityCall: securityCall,
Deadline: timeout,
TraceRequest: vtrace.GetRequest(ctx),
GrantedBlessings: grantedBlessings,
}
var vdlValArgs []*vdl.Value = make([]*vdl.Value, len(args))
for i, arg := range args {
if blessings, ok := arg.(security.Blessings); ok {
arg = principal.ConvertBlessingsToHandle(blessings, s.helper.GetOrAddBlessingsHandle(blessings))
}
vdlValArgs[i] = vdl.ValueOf(arg)
}
// Send a invocation request to JavaScript
message := ServerRpcRequest{
ServerId: s.id,
Handle: handle,
Method: lib.LowercaseFirstCharacter(methodName),
Args: vdlValArgs,
Call: rpcCall,
}
vomMessage, err := lib.VomEncode(message)
if err != nil {
return errHandler(err)
}
if err := flow.Writer.Send(lib.ResponseServerRequest, vomMessage); err != nil {
return errHandler(err)
}
vlog.VI(3).Infof("calling method %q with args %v, MessageID %d assigned\n", methodName, args, flow.ID)
// Watch for cancellation.
go func() {
<-ctx.Done()
ch := s.popServerRequest(flow.ID)
if ch == nil {
return
}
// Send a cancel message to the JS server.
flow.Writer.Send(lib.ResponseCancel, nil)
s.helper.CleanupFlow(flow.ID)
err := verror.Convert(verror.ErrAborted, ctx, ctx.Err()).(verror.E)
ch <- &lib.ServerRpcReply{nil, &err, vtrace.Response{}}
}()
go proxyStream(call, flow.Writer, s.helper)
return replyChan
}
}
type globStream struct {
ch chan naming.GlobReply
ctx *context.T
}
func (g *globStream) Send(item interface{}) error {
if v, ok := item.(naming.GlobReply); ok {
g.ch <- v
return nil
}
return verror.New(verror.ErrBadArg, g.ctx, item)
}
func (g *globStream) Recv(itemptr interface{}) error {
return verror.New(verror.ErrNoExist, g.ctx, "Can't call recieve on glob stream")
}
func (g *globStream) CloseSend() error {
close(g.ch)
return nil
}
// remoteGlobFunc is a type of function that can invoke a remote glob and
// communicate the result back via the channel returned
type remoteGlobFunc func(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error)
func (s *Server) createRemoteGlobFunc(handle int32) remoteGlobFunc {
return func(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
// Until the tests get fixed, we need to create a security context before creating the flow
// because creating the security context creates a flow and flow ids will be off.
// See https://github.com/veyron/release-issues/issues/1181
securityCall := ConvertSecurityCall(s.helper, ctx, call.Security(), true)
globChan := make(chan naming.GlobReply, 1)
flow := s.helper.CreateNewFlow(s, &globStream{
ch: globChan,
ctx: ctx,
})
select {
case <-s.statusClose:
close(globChan)
return globChan, nil
}
replyChan := make(chan *lib.ServerRpcReply, 1)
s.outstandingRequestLock.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
var timeout vdltime.Deadline
if deadline, ok := ctx.Deadline(); ok {
timeout.Time = deadline
}
errHandler := func(err error) (<-chan naming.GlobReply, error) {
if ch := s.popServerRequest(flow.ID); ch != nil {
s.helper.CleanupFlow(flow.ID)
}
return nil, verror.Convert(verror.ErrInternal, ctx, err).(verror.E)
}
var grantedBlessings *principal.JsBlessings
if !call.GrantedBlessings().IsZero() {
grantedBlessings = convertBlessingsToHandle(s.helper, call.GrantedBlessings())
}
rpcCall := ServerRpcRequestCall{
SecurityCall: securityCall,
Deadline: timeout,
GrantedBlessings: grantedBlessings,
}
// Send a invocation request to JavaScript
message := ServerRpcRequest{
ServerId: s.id,
Handle: handle,
Method: "Glob__",
Args: []*vdl.Value{vdl.ValueOf(pattern)},
Call: rpcCall,
}
vomMessage, err := lib.VomEncode(message)
if err != nil {
return errHandler(err)
}
if err := flow.Writer.Send(lib.ResponseServerRequest, vomMessage); err != nil {
return errHandler(err)
}
vlog.VI(3).Infof("calling method 'Glob__' with args %v, MessageID %d assigned\n", []interface{}{pattern}, flow.ID)
// Watch for cancellation.
go func() {
<-ctx.Done()
ch := s.popServerRequest(flow.ID)
if ch == nil {
return
}
// Send a cancel message to the JS server.
flow.Writer.Send(lib.ResponseCancel, nil)
s.helper.CleanupFlow(flow.ID)
err := verror.Convert(verror.ErrAborted, ctx, ctx.Err()).(verror.E)
ch <- &lib.ServerRpcReply{nil, &err, vtrace.Response{}}
}()
return globChan, nil
}
}
func proxyStream(stream rpc.Stream, w lib.ClientWriter, blessingsCache HandleStore) {
var item interface{}
for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
if blessings, ok := item.(security.Blessings); ok {
item = principal.ConvertBlessingsToHandle(blessings, blessingsCache.GetOrAddBlessingsHandle(blessings))
}
vomItem, err := lib.VomEncode(item)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, nil, err))
return
}
if err := w.Send(lib.ResponseStream, vomItem); err != nil {
w.Error(verror.Convert(verror.ErrInternal, nil, err))
return
}
}
if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
w.Error(verror.Convert(verror.ErrInternal, nil, err))
return
}
}
func convertBlessingsToHandle(helper ServerHelper, blessings security.Blessings) *principal.JsBlessings {
return principal.ConvertBlessingsToHandle(blessings, helper.GetOrAddBlessingsHandle(blessings))
}
func makeListOfErrors(numErrors int, err error) []error {
errs := make([]error, numErrors)
for i := 0; i < numErrors; i++ {
errs[i] = err
}
return errs
}
// caveatValidationInGo validates caveats in Go, using the default logic.
func caveatValidationInGo(ctx *context.T, call security.Call, sets [][]security.Caveat) []error {
results := make([]error, len(sets))
for i, set := range sets {
for _, cav := range set {
if err := cav.Validate(ctx, call); err != nil {
results[i] = err
break
}
}
}
return results
}
// caveatValidationInJavascript validates caveats in javascript. It resolves
// each []security.Caveat in cavs to an error (or nil) and collects them in a
// slice.
func (s *Server) caveatValidationInJavascript(ctx *context.T, call security.Call, cavs [][]security.Caveat) []error {
select {
case <-s.statusClose:
return make([]error, len(cavs))
}
flow := s.helper.CreateNewFlow(s, nil)
req := CaveatValidationRequest{
Call: ConvertSecurityCall(s.helper, ctx, call, false),
Cavs: cavs,
}
replyChan := make(chan []error, 1)
s.outstandingRequestLock.Lock()
s.outstandingValidationRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
defer func() {
s.outstandingRequestLock.Lock()
delete(s.outstandingValidationRequests, flow.ID)
s.outstandingRequestLock.Unlock()
s.cleanupFlow(flow.ID)
}()
if err := flow.Writer.Send(lib.ResponseValidate, req); err != nil {
vlog.VI(2).Infof("Failed to send validate response: %v", err)
replyChan <- makeListOfErrors(len(cavs), err)
}
// TODO(bprosnitz) Consider using a different timeout than the standard rpc timeout.
var timeoutChan <-chan time.Time
if deadline, ok := ctx.Deadline(); ok {
timeoutChan = time.After(deadline.Sub(time.Now()))
}
select {
case <-timeoutChan:
return makeListOfErrors(len(cavs), NewErrCaveatValidationTimeout(ctx))
case reply := <-replyChan:
if len(reply) != len(cavs) {
vlog.VI(2).Infof("Wspr caveat validator received %d results from javascript but expected %d", len(reply), len(cavs))
return makeListOfErrors(len(cavs), NewErrInvalidValidationResponseFromJavascript(ctx))
}
return reply
}
}
// CaveatValidation implements a function suitable for passing to
// security.OverrideCaveatValidation.
//
// Certain caveats (PublicKeyThirdPartyCaveatX) are intercepted and handled in
// go, while all other caveats are evaluated in javascript.
func CaveatValidation(ctx *context.T, call security.Call, cavs [][]security.Caveat) []error {
// If the server isn't set in the context, we just perform validation in Go.
ctxServer := ctx.Value(serverContextKey{})
if ctxServer == nil {
return caveatValidationInGo(ctx, call, cavs)
}
// Otherwise we run our special logic.
server := ctxServer.(*Server)
type validationStatus struct {
err error
isSet bool
}
valStatus := make([]validationStatus, len(cavs))
var caveatChainsToValidate [][]security.Caveat
nextCav:
for i, chainCavs := range cavs {
var newChainCavs []security.Caveat
for _, cav := range chainCavs {
// If the server is closed handle all caveats in Go, because Javascript is
// no longer there.
select {
case <-server.statusClose:
vlog.VI(0).Infof("Validating caveats")
res := cav.Validate(ctx, call)
if res != nil {
valStatus[i] = validationStatus{
err: res,
isSet: true,
}
continue nextCav
}
default:
}
switch cav.Id {
case security.PublicKeyThirdPartyCaveatX.Id:
res := cav.Validate(ctx, call)
if res != nil {
valStatus[i] = validationStatus{
err: res,
isSet: true,
}
continue nextCav
}
default:
newChainCavs = append(newChainCavs, cav)
}
}
if len(newChainCavs) == 0 {
valStatus[i] = validationStatus{
err: nil,
isSet: true,
}
} else {
caveatChainsToValidate = append(caveatChainsToValidate, newChainCavs)
}
}
jsRes := server.caveatValidationInJavascript(ctx, call, caveatChainsToValidate)
outResults := make([]error, len(cavs))
jsIndex := 0
for i, status := range valStatus {
if status.isSet {
outResults[i] = status.err
} else {
outResults[i] = jsRes[jsIndex]
jsIndex++
}
}
return outResults
}
func ConvertSecurityCall(helper ServerHelper, ctx *context.T, call security.Call, includeBlessingStrings bool) SecurityCall {
var localEndpoint string
if call.LocalEndpoint() != nil {
localEndpoint = call.LocalEndpoint().String()
}
var remoteEndpoint string
if call.RemoteEndpoint() != nil {
remoteEndpoint = call.RemoteEndpoint().String()
}
var localBlessings principal.JsBlessings
if !call.LocalBlessings().IsZero() {
localBlessings = *convertBlessingsToHandle(helper, call.LocalBlessings())
}
anymtags := make([]*vdl.Value, len(call.MethodTags()))
for i, mtag := range call.MethodTags() {
anymtags[i] = mtag
}
secCall := SecurityCall{
Method: lib.LowercaseFirstCharacter(call.Method()),
Suffix: call.Suffix(),
MethodTags: anymtags,
LocalEndpoint: localEndpoint,
RemoteEndpoint: remoteEndpoint,
LocalBlessings: localBlessings,
RemoteBlessings: *convertBlessingsToHandle(helper, call.RemoteBlessings()),
}
if includeBlessingStrings {
secCall.LocalBlessingStrings = security.LocalBlessingNames(ctx, call)
secCall.RemoteBlessingStrings, _ = security.RemoteBlessingNames(ctx, call)
}
return secCall
}
type remoteAuth struct {
Func func(*context.T, security.Call, int32) error
Handle int32
}
func (r remoteAuth) Authorize(ctx *context.T, call security.Call) error {
return r.Func(ctx, call, r.Handle)
}
func (s *Server) createRemoteAuthorizer(handle int32) security.Authorizer {
return remoteAuth{s.authorizeRemote, handle}
}
func (s *Server) authorizeRemote(ctx *context.T, call security.Call, handle int32) error {
select {
case <-s.statusClose:
return NewErrServerStopped(nil).(verror.E)
}
// Until the tests get fixed, we need to create a security context before
// creating the flow because creating the security context creates a flow and
// flow ids will be off.
securityCall := ConvertSecurityCall(s.helper, ctx, call, true)
replyChan := make(chan error, 1)
flow := s.helper.CreateNewFlow(s, nil)
s.outstandingRequestLock.Lock()
s.outstandingAuthRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
message := AuthRequest{
ServerId: s.id,
Handle: handle,
Call: securityCall,
}
vlog.VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
vomMessage, err := lib.VomEncode(message)
if err != nil {
replyChan <- verror.Convert(verror.ErrInternal, nil, err)
} else if err := flow.Writer.Send(lib.ResponseAuthRequest, vomMessage); err != nil {
replyChan <- verror.Convert(verror.ErrInternal, nil, err)
}
select {
case err = <-replyChan:
case <-s.statusClose:
err = NewErrServerStopped(nil).(verror.E)
}
vlog.VI(0).Infof("going to respond with %v", err)
s.outstandingRequestLock.Lock()
delete(s.outstandingAuthRequests, flow.ID)
s.outstandingRequestLock.Unlock()
s.helper.CleanupFlow(flow.ID)
return err
}
func (s *Server) readStatus() {
// A map of names to the last error message sent.
lastErrors := map[string]string{}
for {
status := s.server.Status()
for _, mountStatus := range status.Mounts {
var errMsg string
if mountStatus.LastMountErr != nil {
errMsg = mountStatus.LastMountErr.Error()
}
mountName := mountStatus.Name
if lastMessage, ok := lastErrors[mountName]; !ok || errMsg != lastMessage {
if errMsg == "" {
s.helper.SendLogMessage(
lib.LogLevelInfo, "serve: "+mountName+" successfully mounted ")
} else {
s.helper.SendLogMessage(
lib.LogLevelError, "serve: "+mountName+" failed with: "+errMsg)
}
}
lastErrors[mountName] = errMsg
}
select {
case <-time.After(10 * time.Second):
continue
case <-s.statusClose:
return
}
}
}
func (s *Server) Serve(name string) error {
s.serverStateLock.Lock()
defer s.serverStateLock.Unlock()
if s.dispatcher == nil {
s.dispatcher = newDispatcher(s.id, s, s, s)
}
if !s.isListening {
_, err := s.server.Listen(*s.listenSpec)
if err != nil {
return err
}
s.isListening = true
}
if err := s.server.ServeDispatcher(name, s.dispatcher); err != nil {
return err
}
go s.readStatus()
return nil
}
func (s *Server) popServerRequest(id int32) chan *lib.ServerRpcReply {
s.outstandingRequestLock.Lock()
defer s.outstandingRequestLock.Unlock()
ch := s.outstandingServerRequests[id]
delete(s.outstandingServerRequests, id)
return ch
}
func (s *Server) HandleServerResponse(id int32, data string) {
ch := s.popServerRequest(id)
if ch == nil {
vlog.Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
// Ignore unknown responses that don't belong to any channel
return
}
// Decode the result and send it through the channel
var reply lib.ServerRpcReply
if err := lib.VomDecode(data, &reply); err != nil {
reply.Err = err
}
vlog.VI(0).Infof("response received from JavaScript server for "+
"MessageId %d with result %v", id, reply)
s.helper.CleanupFlow(id)
if reply.Err != nil {
ch <- &reply
return
}
jsBlessingsType := vdl.TypeOf(principal.JsBlessings{})
for i, val := range reply.Results {
if val.Type() == jsBlessingsType {
var jsBlessings principal.JsBlessings
if err := vdl.Convert(&jsBlessings, val); err != nil {
reply.Err = err
break
}
reply.Results[i] = vdl.ValueOf(
s.helper.GetBlessings(jsBlessings.Handle))
}
}
ch <- &reply
}
func (s *Server) HandleLookupResponse(id int32, data string) {
s.dispatcher.handleLookupResponse(id, data)
}
func (s *Server) HandleAuthResponse(id int32, data string) {
s.outstandingRequestLock.Lock()
ch := s.outstandingAuthRequests[id]
s.outstandingRequestLock.Unlock()
if ch == nil {
vlog.Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results(%s)", id, data)
// Ignore unknown responses that don't belong to any channel
return
}
// Decode the result and send it through the channel
var reply authReply
if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
err := verror.Convert(verror.ErrInternal, nil, decoderErr).(verror.E)
reply = authReply{Err: &err}
}
vlog.VI(0).Infof("response received from JavaScript server for "+
"MessageId %d with result %v", id, reply)
s.helper.CleanupFlow(id)
// A nil verror.E does not result in an nil error. Instead, we have create
// a variable for the error interface and only set it's value if the struct is non-
// nil.
var err error
if reply.Err != nil {
err = reply.Err
}
ch <- err
}
func (s *Server) HandleCaveatValidationResponse(id int32, data string) {
s.outstandingRequestLock.Lock()
ch := s.outstandingValidationRequests[id]
s.outstandingRequestLock.Unlock()
if ch == nil {
vlog.Errorf("unexpected result from JavaScript. No channel "+
"for validation response with MessageId: %d exists. Ignoring the results(%s)", id, data)
// Ignore unknown responses that don't belong to any channel
return
}
var reply CaveatValidationResponse
if err := lib.VomDecode(data, &reply); err != nil {
vlog.Errorf("failed to decode validation response %q: error %v", data, err)
ch <- []error{}
return
}
ch <- reply.Results
}
func (s *Server) createFlow() *Flow {
return s.helper.CreateNewFlow(s, nil)
}
func (s *Server) cleanupFlow(id int32) {
s.helper.CleanupFlow(id)
}
func (s *Server) createInvoker(handle int32, sig []signature.Interface, hasGlobber bool) (rpc.Invoker, error) {
remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
var globFunc remoteGlobFunc
if hasGlobber {
globFunc = s.createRemoteGlobFunc(handle)
}
return newInvoker(sig, remoteInvokeFunc, globFunc), nil
}
func (s *Server) createAuthorizer(handle int32, hasAuthorizer bool) (security.Authorizer, error) {
if hasAuthorizer {
return s.createRemoteAuthorizer(handle), nil
}
return nil, nil
}
func (s *Server) Stop() {
stdErr := verror.New(verror.ErrTimeout, nil).(verror.E)
result := lib.ServerRpcReply{
Results: nil,
Err: &stdErr,
}
s.serverStateLock.Lock()
vlog.VI(0).Infof("Cleaning up dispatcher")
if s.dispatcher != nil {
s.dispatcher.Cleanup()
}
vlog.VI(0).Infof("Cleaning up authorizer")
for _, ch := range s.outstandingAuthRequests {
ch <- fmt.Errorf("Cleaning up server")
}
vlog.VI(0).Infof("Cleaning up outstanding requests")
for _, ch := range s.outstandingServerRequests {
select {
case ch <- &result:
default:
}
}
close(s.statusClose)
s.outstandingRequestLock.Lock()
s.outstandingAuthRequests = make(map[int32]chan error)
s.outstandingServerRequests = make(map[int32]chan *lib.ServerRpcReply)
s.outstandingRequestLock.Unlock()
s.serverStateLock.Unlock()
vlog.VI(0).Infof("Calling stop")
go func() {
<-time.After(5 * time.Second)
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
vlog.VI(0).Infof("stack is %s", buf)
}()
s.server.Stop()
vlog.VI(0).Infof("Finishing stop")
// Only clear the validation requests map after stopping. Clearing them before
// can cause the publisher to get stuck waiting for a caveat validation that
// will never be answered, which prevents the server from stopping.
s.serverStateLock.Lock()
s.outstandingRequestLock.Lock()
s.outstandingValidationRequests = make(map[int32]chan []error)
s.outstandingRequestLock.Unlock()
s.serverStateLock.Unlock()
}
func (s *Server) AddName(name string) error {
return s.server.AddName(name)
}
func (s *Server) RemoveName(name string) {
s.server.RemoveName(name)
}