Fixed stuff.
Change-Id: Ib1da36a77400fec664e7edd39efe68ac26626924
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 8293a67..2bf81db 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -299,9 +299,10 @@
// Cleanup cleans up any outstanding rpcs.
func (c *Controller) Cleanup() {
- vlog.VI(0).Info("Cleaning up controller")
+ vlog.VI(0).Info("Cleaning up controller yayayayaya")
c.Lock()
+ vlog.VI(0).Infof("Cleaning up %d requests", len(c.outstandingRequests))
for _, request := range c.outstandingRequests {
if request.cancel != nil {
request.cancel()
@@ -318,6 +319,7 @@
c.Unlock()
+ vlog.VI(0).Info("Done cleaning up outstanding requests")
// We must unlock before calling server.Stop otherwise it can deadlock.
for _, server := range servers {
server.Stop()
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index 02e05e7..850e742 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -9,6 +9,7 @@
import (
"encoding/json"
"fmt"
+ "runtime"
"sync"
"time"
@@ -94,7 +95,7 @@
outstandingValidationRequests map[int32]chan []error // GUARDED_BY outstandingRequestLock
// statusClose will be closed when the server is shutting down, this will
- // cause the status poller to exit.
+ // cause the status poller to exit and cleanup any waiting requests.
statusClose chan struct{}
}
@@ -108,6 +109,7 @@
outstandingServerRequests: make(map[int32]chan *lib.ServerRpcReply),
outstandingAuthRequests: make(map[int32]chan error),
outstandingValidationRequests: make(map[int32]chan []error),
+ statusClose: make(chan struct{}, 1),
}
var err error
ctx := helper.Context()
@@ -124,10 +126,17 @@
func (s *Server) createRemoteInvokerFunc(handle int32) remoteInvokeFunc {
return func(ctx *context.T, call rpc.StreamServerCall, methodName string, args []interface{}) <-chan *lib.ServerRpcReply {
- securityCall := ConvertSecurityCall(s.helper, ctx, call.Security(), true)
- flow := s.helper.CreateNewFlow(s, call)
replyChan := make(chan *lib.ServerRpcReply, 1)
+ select {
+ case <-s.statusClose:
+ retryError := NewErrServerStopped(nil).(verror.E)
+ replyChan <- &lib.ServerRpcReply{nil, &retryError, vtrace.Response{}}
+ return replyChan
+ }
+
+ securityCall := ConvertSecurityCall(s.helper, ctx, call.Security(), true)
+ flow := s.helper.CreateNewFlow(s, call)
s.outstandingRequestLock.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
@@ -244,6 +253,12 @@
ch: globChan,
ctx: ctx,
})
+ select {
+ case <-s.statusClose:
+ close(globChan)
+ return globChan, nil
+ }
+
replyChan := make(chan *lib.ServerRpcReply, 1)
s.outstandingRequestLock.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
@@ -363,6 +378,10 @@
// each []security.Caveat in cavs to an error (or nil) and collects them in a
// slice.
func (s *Server) caveatValidationInJavascript(ctx *context.T, call security.Call, cavs [][]security.Caveat) []error {
+ select {
+ case <-s.statusClose:
+ return make([]error, len(cavs))
+ }
flow := s.helper.CreateNewFlow(s, nil)
req := CaveatValidationRequest{
Call: ConvertSecurityCall(s.helper, ctx, call, false),
@@ -433,6 +452,7 @@
// no longer there.
select {
case <-server.statusClose:
+ vlog.VI(0).Infof("Validating caveats")
res := cav.Validate(ctx, call)
if res != nil {
valStatus[i] = validationStatus{
@@ -530,13 +550,18 @@
}
func (s *Server) authorizeRemote(ctx *context.T, call security.Call, handle int32) error {
+ select {
+ case <-s.statusClose:
+ return NewErrServerStopped(nil).(verror.E)
+ }
+
// Until the tests get fixed, we need to create a security context before
// creating the flow because creating the security context creates a flow and
// flow ids will be off.
securityCall := ConvertSecurityCall(s.helper, ctx, call, true)
+ replyChan := make(chan error, 1)
flow := s.helper.CreateNewFlow(s, nil)
- replyChan := make(chan error, 1)
s.outstandingRequestLock.Lock()
s.outstandingAuthRequests[flow.ID] = replyChan
s.outstandingRequestLock.Unlock()
@@ -554,7 +579,12 @@
replyChan <- verror.Convert(verror.ErrInternal, nil, err)
}
- err = <-replyChan
+ select {
+ case err = <-replyChan:
+ case <-s.statusClose:
+ err = NewErrServerStopped(nil).(verror.E)
+ }
+
vlog.VI(0).Infof("going to respond with %v", err)
s.outstandingRequestLock.Lock()
delete(s.outstandingAuthRequests, flow.ID)
@@ -612,7 +642,6 @@
if err := s.server.ServeDispatcher(name, s.dispatcher); err != nil {
return err
}
- s.statusClose = make(chan struct{}, 1)
go s.readStatus()
return nil
}
@@ -750,30 +779,40 @@
}
s.serverStateLock.Lock()
- if s.statusClose != nil {
- close(s.statusClose)
- }
+ vlog.VI(0).Infof("Cleaning up dispatcher")
if s.dispatcher != nil {
s.dispatcher.Cleanup()
}
+ vlog.VI(0).Infof("Cleaning up authorizer")
for _, ch := range s.outstandingAuthRequests {
ch <- fmt.Errorf("Cleaning up server")
}
+ vlog.VI(0).Infof("Cleaning up outstanding requests")
for _, ch := range s.outstandingServerRequests {
select {
case ch <- &result:
default:
}
}
+ close(s.statusClose)
+
s.outstandingRequestLock.Lock()
s.outstandingAuthRequests = make(map[int32]chan error)
s.outstandingServerRequests = make(map[int32]chan *lib.ServerRpcReply)
s.outstandingRequestLock.Unlock()
s.serverStateLock.Unlock()
+ vlog.VI(0).Infof("Calling stop")
+ go func() {
+ <-time.After(5 * time.Second)
+ buf := make([]byte, 1<<16)
+ runtime.Stack(buf, true)
+ vlog.VI(0).Infof("stack is %s", buf)
+ }()
s.server.Stop()
+ vlog.VI(0).Infof("Finishing stop")
// Only clear the validation requests map after stopping. Clearing them before
// can cause the publisher to get stuck waiting for a caveat validation that
// will never be answered, which prevents the server from stopping.