runtime/internal/rpc/stream/vc: Ensure discharge loops are stopped.

Change-Id: I9a06ff658143c03626d5072c879ed90f2ea03da7
diff --git a/runtime/internal/rpc/stream/vc/vc.go b/runtime/internal/rpc/stream/vc/vc.go
index 6249c58..4a383aa 100644
--- a/runtime/internal/rpc/stream/vc/vc.go
+++ b/runtime/internal/rpc/stream/vc/vc.go
@@ -115,6 +115,7 @@
 
 	helper    Helper
 	dataCache *dataCache // dataCache contains information that can shared between Flows from this VC.
+	loopWG    sync.WaitGroup
 }
 
 // ServerAuthorizer encapsulates the policy used to authorize servers during VC
@@ -431,6 +432,8 @@
 		vc.ctx.VI(2).Infof("Closing flow %d on VC %v as VC is being closed(%q)", fid, vc, reason)
 		flow.Close()
 	}
+
+	vc.loopWG.Wait()
 	return nil
 }
 
@@ -800,6 +803,7 @@
 
 func (vc *VC) sendDischargesLoop(conn io.WriteCloser, dc DischargeClient, tpCavs []security.Caveat, dischargeExpiryBuffer time.Duration) {
 	defer conn.Close()
+	defer vc.loopWG.Done()
 	if dc == nil {
 		return
 	}
@@ -853,6 +857,7 @@
 
 func (vc *VC) recvDischargesLoop(conn io.ReadCloser) {
 	defer conn.Close()
+	defer vc.loopWG.Done()
 	dec := vom.NewDecoder(conn)
 	for {
 		var discharges []security.Discharge
@@ -890,6 +895,7 @@
 		if err != nil {
 			return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForDischarge, nil, err))
 		}
+		vc.loopWG.Add(1)
 		go vc.recvDischargesLoop(conn)
 	}
 
@@ -917,6 +923,7 @@
 		if err != nil {
 			return verror.New(errFlowForDischargeNotAccepted, nil, err)
 		}
+		vc.loopWG.Add(1)
 		go vc.sendDischargesLoop(conn, dischargeClient, tpCaveats, dischargeExpiryBuffer)
 	}
 	return nil