v.io/v23/syncbase: call context.CancelFunc functions

In a previous cleanup, we made sure to call all context.CancelFunc
functions where we would otherwise leak memory.

The check in change 22953 is now more strict.  It enforces the spec of
Go's context package, which requires that the context.CancelFunc
be called, even if the context will be cancelled by other means.

This change fixes up the places in syncbase that the new check
reveals.

MultiPart: 2/2
Change-Id: Iba2004dbe179780159f0295f9d811ed3b7af8648
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
index 9e2abc8..d6208da 100644
--- a/services/syncbase/discovery/discovery.go
+++ b/services/syncbase/discovery/discovery.go
@@ -84,6 +84,7 @@
 	// IBE is set up.  See v.io/i/1345.
 	updates := make(chan discovery.Update)
 	go func() {
+		defer nhCancel()
 		defer close(updates)
 		seen := make(map[discovery.AdId]*updateRef)
 		for {
@@ -190,6 +191,7 @@
 		if d.globalDiscovery != nil {
 			<-globalStopped
 		}
+		nhCancel()
 		close(stopped)
 	}()
 	ad.Id = adCopy.Id
diff --git a/services/syncbase/vsync/clock.go b/services/syncbase/vsync/clock.go
index 7598895..408c1a2 100644
--- a/services/syncbase/vsync/clock.go
+++ b/services/syncbase/vsync/clock.go
@@ -80,12 +80,15 @@
 	// leverage what we learned here about available mount tables and endpoints
 	// when we continue on to the next initiation phase.
 	var timeRespInt interface{}
-	if _, timeRespInt, err = runAtPeer(ctx, peer, func(ctx *context.T, peer string) (interface{}, error) {
+	var runAtPeerCancel context.CancelFunc
+	if _, timeRespInt, runAtPeerCancel, err = runAtPeer(ctx, peer, func(ctx *context.T, peer string) (interface{}, error) {
 		return interfaces.SyncClient(peer).GetTime(ctx, interfaces.TimeReq{SendTs: origNow}, s.name, options.ConnectionTimeout(syncConnectionTimeout))
 	}); err != nil {
 		vlog.Errorf("sync: syncVClock: GetTime failed: %v", err)
+		runAtPeerCancel()
 		return err
 	}
+	defer runAtPeerCancel()
 
 	// Check for sysclock changes.
 	recvNow, _, err := s.vclock.SysClockValsIfNotChanged(origNow, origElapsedTime)
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 85662cf..ea7af2e 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -287,7 +287,9 @@
 
 	// Make contact with the peer to start getting the deltas.
 	var err error
-	c.peer, _, err = runAtPeer(ctx, c.peer, op)
+	var runAtPeerCancel context.CancelFunc
+	c.peer, _, runAtPeerCancel, err = runAtPeer(ctx, c.peer, op)
+	defer runAtPeerCancel()
 	if err != nil {
 		return err
 	}
@@ -1226,41 +1228,56 @@
 // be nil (e.g. for GetDeltas stream connections).
 type remoteOp func(ctx *context.T, peer string) (interface{}, error)
 
+// runAllCancelFuncs runs the context.CancelFunc routines in cancelFuncs.
+func runAllCancelFuncs(cancelFuncs []context.CancelFunc) {
+	for _, f := range cancelFuncs {
+		f()
+	}
+}
+
 // runAtPeer attempts to connect to the remote peer using the neighborhood
 // address when specified or the mount tables obtained from all the common
 // syncgroups, and runs the specified op.
-func runAtPeer(ctx *context.T, peer connInfo, op remoteOp) (connInfo, interface{}, error) {
+func runAtPeer(ctx *context.T, peer connInfo, op remoteOp) (connInfo, interface{}, context.CancelFunc, error) {
 	if len(peer.mtTbls) < 1 && len(peer.addrs) < 1 {
-		return peer, nil, verror.New(verror.ErrInternal, ctx, "no mount tables or endpoints found", peer)
+		return peer, nil, func() {}, verror.New(verror.ErrInternal, ctx, "no mount tables or endpoints found", peer)
 	}
 
 	updPeer := peer
+	var cancelFuncs []context.CancelFunc
 	if peer.addrs != nil {
 		for i, addr := range peer.addrs {
 			absName := naming.Join(addr, common.SyncbaseSuffix)
-			if resp, err := runRemoteOp(ctx, absName, op); verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+			resp, cancel, err := runRemoteOp(ctx, absName, op)
+			cancelFuncs = append(cancelFuncs, cancel)
+			if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
 				updPeer.addrs = updPeer.addrs[i:]
-				return updPeer, resp, err
+				return updPeer, resp, func() { runAllCancelFuncs(cancelFuncs) }, err
 			}
 		}
 		updPeer.addrs = nil
 	} else {
 		for i, mt := range peer.mtTbls {
 			absName := naming.Join(mt, peer.relName, common.SyncbaseSuffix)
-			if resp, err := runRemoteOp(ctx, absName, op); verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+			resp, cancel, err := runRemoteOp(ctx, absName, op)
+			if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+				cancelFuncs = append(cancelFuncs, cancel)
 				updPeer.mtTbls = updPeer.mtTbls[i:]
-				return updPeer, resp, err
+				return updPeer, resp, func() { runAllCancelFuncs(cancelFuncs) }, err
 			}
+			cancelFuncs = append(cancelFuncs, cancel)
 		}
 		updPeer.mtTbls = nil
 	}
 
-	return updPeer, nil, verror.New(interfaces.ErrConnFail, ctx, "couldn't connect to peer", updPeer.relName, updPeer.addrs)
+	return updPeer, nil, func() { runAllCancelFuncs(cancelFuncs) },
+		verror.New(interfaces.ErrConnFail, ctx, "couldn't connect to peer", updPeer.relName, updPeer.addrs)
 }
 
 // runRemoteOp runs the remoteOp on the server specified by absName.
-// It is the caller's responsibility to cancel the ctx argument.
-func runRemoteOp(ctxIn *context.T, absName string, op remoteOp) (interface{}, error) {
+// It is the caller's responsibility to call the returned context.CancelFunc
+// if this call returns a nil error value.
+func runRemoteOp(ctxIn *context.T, absName string, op remoteOp) (interface{}, context.CancelFunc, error) {
 	vlog.VI(4).Infof("sync: runRemoteOp: begin for %v", absName)
 
 	ctx, cancel := context.WithCancel(ctxIn)
@@ -1270,7 +1287,7 @@
 	if err == nil {
 		vlog.VI(4).Infof("sync: runRemoteOp: end op established for %s", absName)
 		// Responsibility for calling cancel() is passed to caller.
-		return resp, nil
+		return resp, cancel, nil
 	}
 
 	vlog.VI(4).Infof("sync: runRemoteOp: end for %s, err %v", absName, err)
@@ -1284,5 +1301,5 @@
 	}
 
 	cancel()
-	return nil, err
+	return nil, func() {}, err
 }