veyron/veyron2/ipc: Call should n ot have a Cancel method.

ctx.Cancel should be used instead.

Change-Id: I6809fd8723de288f07b327a35b62fd69e6d39eac
MultiPart: 1/2
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index 6fac31d..f2f23e8 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -107,17 +107,19 @@
 	}
 	// SendStream returns the send side of the Benchmark.EchoStream client stream.
 	SendStream() interface {
-		// Send places the item onto the output stream.  Returns errors encountered
-		// while sending, or if Send is called after Close or Cancel.  Blocks if
-		// there is no buffer space; will unblock when buffer space is available or
-		// after Cancel.
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
 		Send(item []byte) error
-		// Close indicates to the server that no more items will be sent; server
-		// Recv calls will receive io.EOF after all sent items.  This is an optional
-		// call - e.g. a client might call Close if it needs to continue receiving
-		// items from the server after it's done sending.  Returns errors
-		// encountered while closing, or if Close is called after Cancel.  Like
-		// Send, blocks if there is no buffer space available.
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
 		Close() error
 	}
 }
@@ -128,18 +130,14 @@
 	// Finish performs the equivalent of SendStream().Close, then blocks until
 	// the server is done, and returns the positional return values for the call.
 	//
-	// Finish returns immediately if Cancel has been called; depending on the
+	// Finish returns immediately if the call has been canceled; depending on the
 	// timing the output could either be an error signaling cancelation, or the
 	// valid positional return values from the server.
 	//
-	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return an error.  Finish should
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
 	// be called at most once.
 	Finish() error
-	// Cancel cancels the RPC, notifying the server to stop processing.  It is
-	// safe to call Cancel concurrently with any of the other stream methods.
-	// Calling Cancel after Finish has returned is a no-op.
-	Cancel()
 }
 
 type implBenchmarkEchoStreamCall struct {
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 4891254..8475dea 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -501,7 +501,8 @@
 				go func() {
 					select {
 					case <-ctx.Done():
-						fc.Cancel()
+						ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
+						fc.flow.Cancel()
 					case <-fc.flow.Closed():
 					}
 				}()
@@ -901,12 +902,6 @@
 	return fc.close(nil)
 }
 
-func (fc *flowClient) Cancel() {
-	defer vlog.LogCall()()
-	ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
-	fc.flow.Cancel()
-}
-
 func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
 	return fc.server, fc.flow.RemoteBlessings()
 }
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 746b578..5f03ee0 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -15,6 +15,7 @@
 	"testing"
 	"time"
 
+	"veyron.io/veyron/veyron2/context"
 	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/ipc/stream"
 	"veyron.io/veyron/veyron2/naming"
@@ -1090,9 +1091,9 @@
 	return nil
 }
 
-func waitForCancel(t *testing.T, ts *cancelTestServer, call ipc.Call) {
+func waitForCancel(t *testing.T, ts *cancelTestServer, cancel context.CancelFunc) {
 	<-ts.started
-	call.Cancel()
+	cancel()
 	<-ts.cancelled
 }
 
@@ -1102,11 +1103,12 @@
 	b := createBundle(t, tsecurity.NewPrincipal("client"), tsecurity.NewPrincipal("server"), ts)
 	defer b.cleanup(t)
 
-	call, err := b.client.StartCall(testContext(), "mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
+	ctx, cancel := testContext().WithCancel()
+	_, err := b.client.StartCall(ctx, "mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
 	if err != nil {
 		t.Fatalf("Start call failed: %v", err)
 	}
-	waitForCancel(t, ts, call)
+	waitForCancel(t, ts, cancel)
 }
 
 // TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
@@ -1116,7 +1118,8 @@
 	b := createBundle(t, tsecurity.NewPrincipal("client"), tsecurity.NewPrincipal("server"), ts)
 	defer b.cleanup(t)
 
-	call, err := b.client.StartCall(testContext(), "mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
+	ctx, cancel := testContext().WithCancel()
+	call, err := b.client.StartCall(ctx, "mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
 	if err != nil {
 		t.Fatalf("Start call failed: %v", err)
 	}
@@ -1125,7 +1128,7 @@
 	call.Send(make([]byte, vc.MaxSharedBytes))
 	call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
 
-	waitForCancel(t, ts, call)
+	waitForCancel(t, ts, cancel)
 }
 
 type streamRecvInGoroutineServer struct{ c chan error }
diff --git a/services/mgmt/build/impl/impl_test.go b/services/mgmt/build/impl/impl_test.go
index 85b4773..6c975c5 100644
--- a/services/mgmt/build/impl/impl_test.go
+++ b/services/mgmt/build/impl/impl_test.go
@@ -76,7 +76,9 @@
 
 func invokeBuild(t *testing.T, client build.BuilderClientMethods, files []build.File) ([]byte, []build.File, error) {
 	arch, opsys := getArch(), getOS()
-	stream, err := client.Build(globalRT.NewContext(), arch, opsys)
+	ctx, cancel := globalRT.NewContext().WithCancel()
+	defer cancel()
+	stream, err := client.Build(ctx, arch, opsys)
 	if err != nil {
 		t.Errorf("Build(%v, %v) failed: %v", err, arch, opsys)
 		return nil, nil, err
@@ -85,13 +87,11 @@
 	for _, file := range files {
 		if err := sender.Send(file); err != nil {
 			t.Logf("Send() failed: %v", err)
-			stream.Cancel()
 			return nil, nil, err
 		}
 	}
 	if err := sender.Close(); err != nil {
 		t.Logf("Close() failed: %v", err)
-		stream.Cancel()
 		return nil, nil, err
 	}
 	bins := make([]build.File, 0)
@@ -106,7 +106,6 @@
 	output, err := stream.Finish()
 	if err != nil {
 		t.Logf("Finish() failed: %v", err)
-		stream.Cancel()
 		return nil, nil, err
 	}
 	return output, bins, nil
diff --git a/services/mgmt/lib/binary/impl.go b/services/mgmt/lib/binary/impl.go
index e66a2a6..1013992 100644
--- a/services/mgmt/lib/binary/impl.go
+++ b/services/mgmt/lib/binary/impl.go
@@ -45,6 +45,65 @@
 	return nil
 }
 
+type indexedPart struct {
+	part   binary.PartInfo
+	index  int
+	offset int64
+}
+
+func downloadPartAttempt(ctx context.T, w io.WriteSeeker, client repository.BinaryClientStub, ip *indexedPart) bool {
+	ctx, cancel := ctx.WithCancel()
+	defer cancel()
+
+	if _, err := w.Seek(ip.offset, 0); err != nil {
+		vlog.Errorf("Seek(%v, 0) failed: %v", ip.offset, err)
+		return false
+	}
+	stream, err := client.Download(ctx, int32(ip.index))
+	if err != nil {
+		vlog.Errorf("Download(%v) failed: %v", ip.index, err)
+		return false
+	}
+	h, nreceived := md5.New(), 0
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		bytes := rStream.Value()
+		if _, err := w.Write(bytes); err != nil {
+			vlog.Errorf("Write() failed: %v", err)
+			return false
+		}
+		h.Write(bytes)
+		nreceived += len(bytes)
+	}
+
+	if err := rStream.Err(); err != nil {
+		vlog.Errorf("Advance() failed: %v", err)
+		return false
+	}
+	if err := stream.Finish(); err != nil {
+		vlog.Errorf("Finish() failed: %v", err)
+		return false
+	}
+	if expected, got := ip.part.Checksum, hex.EncodeToString(h.Sum(nil)); expected != got {
+		vlog.Errorf("Unexpected checksum: expected %v, got %v", expected, got)
+		return false
+	}
+	if expected, got := ip.part.Size, int64(nreceived); expected != got {
+		vlog.Errorf("Unexpected size: expected %v, got %v", expected, got)
+		return false
+	}
+	return true
+}
+
+func downloadPart(ctx context.T, w io.WriteSeeker, client repository.BinaryClientStub, ip *indexedPart) bool {
+	for i := 0; i < nAttempts; i++ {
+		if downloadPartAttempt(ctx, w, client, ip) {
+			return true
+		}
+	}
+	return false
+}
+
 func download(ctx context.T, w io.WriteSeeker, von string) (repository.MediaInfo, error) {
 	client := repository.BinaryClient(von)
 	parts, mediaInfo, err := client.Stat(ctx)
@@ -57,54 +116,10 @@
 			return repository.MediaInfo{}, verror.Make(verror.NoExist, ctx)
 		}
 	}
-	offset, whence := int64(0), 0
+	offset := int64(0)
 	for i, part := range parts {
-		success := false
-	download:
-		for j := 0; !success && j < nAttempts; j++ {
-			if _, err := w.Seek(offset, whence); err != nil {
-				vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
-				continue
-			}
-			stream, err := client.Download(ctx, int32(i))
-			if err != nil {
-				vlog.Errorf("Download(%v) failed: %v", i, err)
-				continue
-			}
-			h, nreceived := md5.New(), 0
-			rStream := stream.RecvStream()
-			for rStream.Advance() {
-				bytes := rStream.Value()
-				if _, err := w.Write(bytes); err != nil {
-					vlog.Errorf("Write() failed: %v", err)
-					stream.Cancel()
-					continue download
-				}
-				h.Write(bytes)
-				nreceived += len(bytes)
-			}
-
-			if err := rStream.Err(); err != nil {
-				vlog.Errorf("Advance() failed: %v", err)
-				stream.Cancel()
-				continue download
-
-			}
-			if err := stream.Finish(); err != nil {
-				vlog.Errorf("Finish() failed: %v", err)
-				continue
-			}
-			if expected, got := part.Checksum, hex.EncodeToString(h.Sum(nil)); expected != got {
-				vlog.Errorf("Unexpected checksum: expected %v, got %v", expected, got)
-				continue
-			}
-			if expected, got := part.Size, int64(nreceived); expected != got {
-				vlog.Errorf("Unexpected size: expected %v, got %v", expected, got)
-				continue
-			}
-			success = true
-		}
-		if !success {
+		ip := &indexedPart{part, i, offset}
+		if !downloadPart(ctx, w, client, ip) {
 			return repository.MediaInfo{}, verror.Make(errOperationFailed, ctx)
 		}
 		offset += part.Size
@@ -188,6 +203,86 @@
 	return url, ttl, nil
 }
 
+func uploadPartAttempt(ctx context.T, r io.ReadSeeker, client repository.BinaryClientStub, part int, size int64) (bool, error) {
+	ctx, cancel := ctx.WithCancel()
+	defer cancel()
+
+	offset := int64(part * partSize)
+	if _, err := r.Seek(offset, 0); err != nil {
+		vlog.Errorf("Seek(%v, 0) failed: %v", offset, err)
+		return false, nil
+	}
+	stream, err := client.Upload(ctx, int32(part))
+	if err != nil {
+		vlog.Errorf("Upload(%v) failed: %v", part, err)
+		return false, nil
+	}
+	bufferSize := partSize
+	if remaining := size - offset; remaining < int64(bufferSize) {
+		bufferSize = int(remaining)
+	}
+	buffer := make([]byte, bufferSize)
+
+	nread := 0
+	for nread < len(buffer) {
+		n, err := r.Read(buffer[nread:])
+		nread += n
+		if err != nil && (err != io.EOF || nread < len(buffer)) {
+			vlog.Errorf("Read() failed: %v", err)
+			return false, nil
+		}
+	}
+	sender := stream.SendStream()
+	for from := 0; from < len(buffer); from += subpartSize {
+		to := from + subpartSize
+		if to > len(buffer) {
+			to = len(buffer)
+		}
+		if err := sender.Send(buffer[from:to]); err != nil {
+			vlog.Errorf("Send() failed: %v", err)
+			return false, nil
+		}
+	}
+	if err := sender.Close(); err != nil {
+		vlog.Errorf("Close() failed: %v", err)
+		parts, _, statErr := client.Stat(ctx)
+		if statErr != nil {
+			vlog.Errorf("Stat() failed: %v", statErr)
+			if deleteErr := client.Delete(ctx); err != nil {
+				vlog.Errorf("Delete() failed: %v", deleteErr)
+			}
+			return false, err
+		}
+		if parts[part].Checksum == binary.MissingChecksum {
+			return false, nil
+		}
+	}
+	if err := stream.Finish(); err != nil {
+		vlog.Errorf("Finish() failed: %v", err)
+		parts, _, statErr := client.Stat(ctx)
+		if statErr != nil {
+			vlog.Errorf("Stat() failed: %v", statErr)
+			if deleteErr := client.Delete(ctx); err != nil {
+				vlog.Errorf("Delete() failed: %v", deleteErr)
+			}
+			return false, err
+		}
+		if parts[part].Checksum == binary.MissingChecksum {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+func uploadPart(ctx context.T, r io.ReadSeeker, client repository.BinaryClientStub, part int, size int64) error {
+	for i := 0; i < nAttempts; i++ {
+		if success, err := uploadPartAttempt(ctx, r, client, part, size); success || err != nil {
+			return err
+		}
+	}
+	return verror.Make(errOperationFailed, ctx)
+}
+
 func upload(ctx context.T, r io.ReadSeeker, mediaInfo repository.MediaInfo, von string) error {
 	client := repository.BinaryClient(von)
 	offset, whence := int64(0), 2
@@ -202,78 +297,8 @@
 		return err
 	}
 	for i := 0; int64(i) < nparts; i++ {
-		success := false
-	upload:
-		for j := 0; !success && j < nAttempts; j++ {
-			offset, whence := int64(i*partSize), 0
-			if _, err := r.Seek(offset, whence); err != nil {
-				vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
-				continue
-			}
-			stream, err := client.Upload(ctx, int32(i))
-			if err != nil {
-				vlog.Errorf("Upload(%v) failed: %v", i, err)
-				continue
-			}
-			buffer := make([]byte, partSize)
-			if int64(i+1) == nparts {
-				buffer = buffer[:(size % partSize)]
-			}
-			nread := 0
-			for nread < len(buffer) {
-				n, err := r.Read(buffer[nread:])
-				nread += n
-				if err != nil && (err != io.EOF || nread < len(buffer)) {
-					vlog.Errorf("Read() failed: %v", err)
-					stream.Cancel()
-					continue upload
-				}
-			}
-			sender := stream.SendStream()
-			for from := 0; from < len(buffer); from += subpartSize {
-				to := from + subpartSize
-				if to > len(buffer) {
-					to = len(buffer)
-				}
-				if err := sender.Send(buffer[from:to]); err != nil {
-					vlog.Errorf("Send() failed: %v", err)
-					stream.Cancel()
-					continue upload
-				}
-			}
-			if err := sender.Close(); err != nil {
-				vlog.Errorf("Close() failed: %v", err)
-				parts, _, statErr := client.Stat(ctx)
-				if statErr != nil {
-					vlog.Errorf("Stat() failed: %v", statErr)
-					if deleteErr := client.Delete(ctx); err != nil {
-						vlog.Errorf("Delete() failed: %v", deleteErr)
-					}
-					return err
-				}
-				if parts[i].Checksum == binary.MissingChecksum {
-					stream.Cancel()
-					continue
-				}
-			}
-			if err := stream.Finish(); err != nil {
-				vlog.Errorf("Finish() failed: %v", err)
-				parts, _, statErr := client.Stat(ctx)
-				if statErr != nil {
-					vlog.Errorf("Stat() failed: %v", statErr)
-					if deleteErr := client.Delete(ctx); err != nil {
-						vlog.Errorf("Delete() failed: %v", deleteErr)
-					}
-					return err
-				}
-				if parts[i].Checksum == binary.MissingChecksum {
-					continue
-				}
-			}
-			success = true
-		}
-		if !success {
-			return verror.Make(errOperationFailed, ctx)
+		if err := uploadPart(ctx, r, client, i, size); err != nil {
+			return err
 		}
 	}
 	return nil
diff --git a/services/mgmt/stats/impl/stats_test.go b/services/mgmt/stats/impl/stats_test.go
index 02178a3..57b00cf 100644
--- a/services/mgmt/stats/impl/stats_test.go
+++ b/services/mgmt/stats/impl/stats_test.go
@@ -97,7 +97,8 @@
 	{
 		noRM := types.ResumeMarker{}
 		_ = noRM
-		stream, err := c.WatchGlob(runtime.NewContext(), types.GlobRequest{Pattern: "testing/foo/bar"})
+		ctx, cancel := runtime.NewContext().WithCancel()
+		stream, err := c.WatchGlob(ctx, types.GlobRequest{Pattern: "testing/foo/bar"})
 		if err != nil {
 			t.Fatalf("c.WatchGlob failed: %v", err)
 		}
@@ -132,7 +133,7 @@
 		if !reflect.DeepEqual(got, expected) {
 			t.Errorf("unexpected result. Got %#v, want %#v", got, expected)
 		}
-		stream.Cancel()
+		cancel()
 
 		if iterator.Advance() {
 			t.Errorf("expected no more stream values, got: %v", iterator.Value())
diff --git a/tools/build/impl.go b/tools/build/impl.go
index e03779f..f684ba0 100644
--- a/tools/build/impl.go
+++ b/tools/build/impl.go
@@ -108,7 +108,7 @@
 	return nil
 }
 
-func getSources(pkgMap map[string]*build.Package, cancel <-chan struct{}, errchan chan<- error) <-chan vbuild.File {
+func getSources(ctx context.T, pkgMap map[string]*build.Package, errchan chan<- error) <-chan vbuild.File {
 	sources := make(chan vbuild.File)
 	go func() {
 		defer close(sources)
@@ -123,8 +123,8 @@
 					}
 					select {
 					case sources <- vbuild.File{Contents: bytes, Name: filepath.Join(pkg.ImportPath, file)}:
-					case <-cancel:
-						errchan <- nil
+					case <-ctx.Done():
+						errchan <- fmt.Errorf("Get sources failed: %v", ctx.Err())
 						return
 					}
 				}
@@ -135,10 +135,13 @@
 	return sources
 }
 
-func invokeBuild(ctx context.T, name string, sources <-chan vbuild.File, cancel <-chan struct{}, errchan chan<- error) <-chan vbuild.File {
+func invokeBuild(ctx context.T, name string, sources <-chan vbuild.File, errchan chan<- error) <-chan vbuild.File {
 	binaries := make(chan vbuild.File)
 	go func() {
 		defer close(binaries)
+		ctx, cancel := ctx.WithCancel()
+		defer cancel()
+
 		client := vbuild.BuilderClient(name)
 		stream, err := client.Build(ctx, vbuild.Architecture(flagArch), vbuild.OperatingSystem(flagOS))
 		if err != nil {
@@ -148,7 +151,6 @@
 		sender := stream.SendStream()
 		for source := range sources {
 			if err := sender.Send(source); err != nil {
-				stream.Cancel()
 				errchan <- fmt.Errorf("Send() failed: %v", err)
 				return
 			}
@@ -159,12 +161,10 @@
 		}
 		iterator := stream.RecvStream()
 		for iterator.Advance() {
-			// TODO(mattr): This custom cancellation can probably be folded into the
-			// cancellation mechanism provided by the context.
 			select {
 			case binaries <- iterator.Value():
-			case <-cancel:
-				errchan <- nil
+			case <-ctx.Done():
+				errchan <- fmt.Errorf("Invoke build failed: %v", ctx.Err())
 				return
 			}
 		}
@@ -181,9 +181,15 @@
 	return binaries
 }
 
-func saveBinaries(prefix string, binaries <-chan vbuild.File, cancel chan<- struct{}, errchan chan<- error) {
+func saveBinaries(ctx context.T, prefix string, binaries <-chan vbuild.File, errchan chan<- error) {
 	go func() {
 		for binary := range binaries {
+			select {
+			case <-ctx.Done():
+				errchan <- fmt.Errorf("Save binaries failed: %v", ctx.Err())
+				return
+			default:
+			}
 			path, perm := filepath.Join(prefix, filepath.Base(binary.Name)), os.FileMode(0755)
 			if err := ioutil.WriteFile(path, binary.Contents, perm); err != nil {
 				errchan <- fmt.Errorf("WriteFile(%v, %v) failed: %v", path, perm, err)
@@ -206,25 +212,22 @@
 	if err := importPackages(paths, pkgMap); err != nil {
 		return err
 	}
-	cancel, errchan := make(chan struct{}), make(chan error)
+	errchan := make(chan error)
 	defer close(errchan)
 
 	ctx, ctxCancel := runtime.NewContext().WithTimeout(time.Minute)
 	defer ctxCancel()
 
 	// Start all stages of the pipeline.
-	sources := getSources(pkgMap, cancel, errchan)
-	binaries := invokeBuild(ctx, name, sources, cancel, errchan)
-	saveBinaries(os.TempDir(), binaries, cancel, errchan)
+	sources := getSources(ctx, pkgMap, errchan)
+	binaries := invokeBuild(ctx, name, sources, errchan)
+	saveBinaries(ctx, os.TempDir(), binaries, errchan)
 	// Wait for all stages of the pipeline to terminate.
-	cancelled, errors, numStages := false, []error{}, 3
+	errors, numStages := []error{}, 3
 	for i := 0; i < numStages; i++ {
 		if err := <-errchan; err != nil {
 			errors = append(errors, err)
-			if !cancelled {
-				close(cancel)
-				cancelled = true
-			}
+			ctxCancel()
 		}
 	}
 	if len(errors) != 0 {
diff --git a/tools/vrpc/test_base/test_base.vdl.go b/tools/vrpc/test_base/test_base.vdl.go
index 1932cfb..fe9a2df 100644
--- a/tools/vrpc/test_base/test_base.vdl.go
+++ b/tools/vrpc/test_base/test_base.vdl.go
@@ -342,18 +342,14 @@
 	// Finish blocks until the server is done, and returns the positional return
 	// values for call.
 	//
-	// Finish returns immediately if Cancel has been called; depending on the
+	// Finish returns immediately if the call has been canceled; depending on the
 	// timing the output could either be an error signaling cancelation, or the
 	// valid positional return values from the server.
 	//
-	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return an error.  Finish should
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
 	// be called at most once.
 	Finish() error
-	// Cancel cancels the RPC, notifying the server to stop processing.  It is
-	// safe to call Cancel concurrently with any of the other stream methods.
-	// Calling Cancel after Finish has returned is a no-op.
-	Cancel()
 }
 
 type implTypeTesterStreamingOutputCall struct {