runtime/internal/rpc/stream/vc: Ensure discharge loops are stopped.
Change-Id: I9a06ff658143c03626d5072c879ed90f2ea03da7
diff --git a/runtime/internal/rpc/stream/vc/vc.go b/runtime/internal/rpc/stream/vc/vc.go
index 6249c58..4a383aa 100644
--- a/runtime/internal/rpc/stream/vc/vc.go
+++ b/runtime/internal/rpc/stream/vc/vc.go
@@ -115,6 +115,7 @@
helper Helper
dataCache *dataCache // dataCache contains information that can shared between Flows from this VC.
+ loopWG sync.WaitGroup
}
// ServerAuthorizer encapsulates the policy used to authorize servers during VC
@@ -431,6 +432,8 @@
vc.ctx.VI(2).Infof("Closing flow %d on VC %v as VC is being closed(%q)", fid, vc, reason)
flow.Close()
}
+
+ vc.loopWG.Wait()
return nil
}
@@ -800,6 +803,7 @@
func (vc *VC) sendDischargesLoop(conn io.WriteCloser, dc DischargeClient, tpCavs []security.Caveat, dischargeExpiryBuffer time.Duration) {
defer conn.Close()
+ defer vc.loopWG.Done()
if dc == nil {
return
}
@@ -853,6 +857,7 @@
func (vc *VC) recvDischargesLoop(conn io.ReadCloser) {
defer conn.Close()
+ defer vc.loopWG.Done()
dec := vom.NewDecoder(conn)
for {
var discharges []security.Discharge
@@ -890,6 +895,7 @@
if err != nil {
return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForDischarge, nil, err))
}
+ vc.loopWG.Add(1)
go vc.recvDischargesLoop(conn)
}
@@ -917,6 +923,7 @@
if err != nil {
return verror.New(errFlowForDischargeNotAccepted, nil, err)
}
+ vc.loopWG.Add(1)
go vc.sendDischargesLoop(conn, dischargeClient, tpCaveats, dischargeExpiryBuffer)
}
return nil