veyron/veyron2/ipc: Call should n ot have a Cancel method.
ctx.Cancel should be used instead.
Change-Id: I6809fd8723de288f07b327a35b62fd69e6d39eac
MultiPart: 1/2
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index 6fac31d..f2f23e8 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -107,17 +107,19 @@
}
// SendStream returns the send side of the Benchmark.EchoStream client stream.
SendStream() interface {
- // Send places the item onto the output stream. Returns errors encountered
- // while sending, or if Send is called after Close or Cancel. Blocks if
- // there is no buffer space; will unblock when buffer space is available or
- // after Cancel.
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
Send(item []byte) error
- // Close indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. This is an optional
- // call - e.g. a client might call Close if it needs to continue receiving
- // items from the server after it's done sending. Returns errors
- // encountered while closing, or if Close is called after Cancel. Like
- // Send, blocks if there is no buffer space available.
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
Close() error
}
}
@@ -128,18 +130,14 @@
// Finish performs the equivalent of SendStream().Close, then blocks until
// the server is done, and returns the positional return values for the call.
//
- // Finish returns immediately if Cancel has been called; depending on the
+ // Finish returns immediately if the call has been canceled; depending on the
// timing the output could either be an error signaling cancelation, or the
// valid positional return values from the server.
//
- // Calling Finish is mandatory for releasing stream resources, unless Cancel
- // has been called or any of the other methods return an error. Finish should
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
// be called at most once.
Finish() error
- // Cancel cancels the RPC, notifying the server to stop processing. It is
- // safe to call Cancel concurrently with any of the other stream methods.
- // Calling Cancel after Finish has returned is a no-op.
- Cancel()
}
type implBenchmarkEchoStreamCall struct {
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 4891254..8475dea 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -501,7 +501,8 @@
go func() {
select {
case <-ctx.Done():
- fc.Cancel()
+ ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
+ fc.flow.Cancel()
case <-fc.flow.Closed():
}
}()
@@ -901,12 +902,6 @@
return fc.close(nil)
}
-func (fc *flowClient) Cancel() {
- defer vlog.LogCall()()
- ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
- fc.flow.Cancel()
-}
-
func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
return fc.server, fc.flow.RemoteBlessings()
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 746b578..5f03ee0 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -15,6 +15,7 @@
"testing"
"time"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
@@ -1090,9 +1091,9 @@
return nil
}
-func waitForCancel(t *testing.T, ts *cancelTestServer, call ipc.Call) {
+func waitForCancel(t *testing.T, ts *cancelTestServer, cancel context.CancelFunc) {
<-ts.started
- call.Cancel()
+ cancel()
<-ts.cancelled
}
@@ -1102,11 +1103,12 @@
b := createBundle(t, tsecurity.NewPrincipal("client"), tsecurity.NewPrincipal("server"), ts)
defer b.cleanup(t)
- call, err := b.client.StartCall(testContext(), "mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
+ ctx, cancel := testContext().WithCancel()
+ _, err := b.client.StartCall(ctx, "mountpoint/server/suffix", "CancelStreamReader", []interface{}{})
if err != nil {
t.Fatalf("Start call failed: %v", err)
}
- waitForCancel(t, ts, call)
+ waitForCancel(t, ts, cancel)
}
// TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
@@ -1116,7 +1118,8 @@
b := createBundle(t, tsecurity.NewPrincipal("client"), tsecurity.NewPrincipal("server"), ts)
defer b.cleanup(t)
- call, err := b.client.StartCall(testContext(), "mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
+ ctx, cancel := testContext().WithCancel()
+ call, err := b.client.StartCall(ctx, "mountpoint/server/suffix", "CancelStreamIgnorer", []interface{}{})
if err != nil {
t.Fatalf("Start call failed: %v", err)
}
@@ -1125,7 +1128,7 @@
call.Send(make([]byte, vc.MaxSharedBytes))
call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
- waitForCancel(t, ts, call)
+ waitForCancel(t, ts, cancel)
}
type streamRecvInGoroutineServer struct{ c chan error }
diff --git a/services/mgmt/build/impl/impl_test.go b/services/mgmt/build/impl/impl_test.go
index 85b4773..6c975c5 100644
--- a/services/mgmt/build/impl/impl_test.go
+++ b/services/mgmt/build/impl/impl_test.go
@@ -76,7 +76,9 @@
func invokeBuild(t *testing.T, client build.BuilderClientMethods, files []build.File) ([]byte, []build.File, error) {
arch, opsys := getArch(), getOS()
- stream, err := client.Build(globalRT.NewContext(), arch, opsys)
+ ctx, cancel := globalRT.NewContext().WithCancel()
+ defer cancel()
+ stream, err := client.Build(ctx, arch, opsys)
if err != nil {
t.Errorf("Build(%v, %v) failed: %v", err, arch, opsys)
return nil, nil, err
@@ -85,13 +87,11 @@
for _, file := range files {
if err := sender.Send(file); err != nil {
t.Logf("Send() failed: %v", err)
- stream.Cancel()
return nil, nil, err
}
}
if err := sender.Close(); err != nil {
t.Logf("Close() failed: %v", err)
- stream.Cancel()
return nil, nil, err
}
bins := make([]build.File, 0)
@@ -106,7 +106,6 @@
output, err := stream.Finish()
if err != nil {
t.Logf("Finish() failed: %v", err)
- stream.Cancel()
return nil, nil, err
}
return output, bins, nil
diff --git a/services/mgmt/lib/binary/impl.go b/services/mgmt/lib/binary/impl.go
index e66a2a6..1013992 100644
--- a/services/mgmt/lib/binary/impl.go
+++ b/services/mgmt/lib/binary/impl.go
@@ -45,6 +45,65 @@
return nil
}
+type indexedPart struct {
+ part binary.PartInfo
+ index int
+ offset int64
+}
+
+func downloadPartAttempt(ctx context.T, w io.WriteSeeker, client repository.BinaryClientStub, ip *indexedPart) bool {
+ ctx, cancel := ctx.WithCancel()
+ defer cancel()
+
+ if _, err := w.Seek(ip.offset, 0); err != nil {
+ vlog.Errorf("Seek(%v, 0) failed: %v", ip.offset, err)
+ return false
+ }
+ stream, err := client.Download(ctx, int32(ip.index))
+ if err != nil {
+ vlog.Errorf("Download(%v) failed: %v", ip.index, err)
+ return false
+ }
+ h, nreceived := md5.New(), 0
+ rStream := stream.RecvStream()
+ for rStream.Advance() {
+ bytes := rStream.Value()
+ if _, err := w.Write(bytes); err != nil {
+ vlog.Errorf("Write() failed: %v", err)
+ return false
+ }
+ h.Write(bytes)
+ nreceived += len(bytes)
+ }
+
+ if err := rStream.Err(); err != nil {
+ vlog.Errorf("Advance() failed: %v", err)
+ return false
+ }
+ if err := stream.Finish(); err != nil {
+ vlog.Errorf("Finish() failed: %v", err)
+ return false
+ }
+ if expected, got := ip.part.Checksum, hex.EncodeToString(h.Sum(nil)); expected != got {
+ vlog.Errorf("Unexpected checksum: expected %v, got %v", expected, got)
+ return false
+ }
+ if expected, got := ip.part.Size, int64(nreceived); expected != got {
+ vlog.Errorf("Unexpected size: expected %v, got %v", expected, got)
+ return false
+ }
+ return true
+}
+
+func downloadPart(ctx context.T, w io.WriteSeeker, client repository.BinaryClientStub, ip *indexedPart) bool {
+ for i := 0; i < nAttempts; i++ {
+ if downloadPartAttempt(ctx, w, client, ip) {
+ return true
+ }
+ }
+ return false
+}
+
func download(ctx context.T, w io.WriteSeeker, von string) (repository.MediaInfo, error) {
client := repository.BinaryClient(von)
parts, mediaInfo, err := client.Stat(ctx)
@@ -57,54 +116,10 @@
return repository.MediaInfo{}, verror.Make(verror.NoExist, ctx)
}
}
- offset, whence := int64(0), 0
+ offset := int64(0)
for i, part := range parts {
- success := false
- download:
- for j := 0; !success && j < nAttempts; j++ {
- if _, err := w.Seek(offset, whence); err != nil {
- vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
- continue
- }
- stream, err := client.Download(ctx, int32(i))
- if err != nil {
- vlog.Errorf("Download(%v) failed: %v", i, err)
- continue
- }
- h, nreceived := md5.New(), 0
- rStream := stream.RecvStream()
- for rStream.Advance() {
- bytes := rStream.Value()
- if _, err := w.Write(bytes); err != nil {
- vlog.Errorf("Write() failed: %v", err)
- stream.Cancel()
- continue download
- }
- h.Write(bytes)
- nreceived += len(bytes)
- }
-
- if err := rStream.Err(); err != nil {
- vlog.Errorf("Advance() failed: %v", err)
- stream.Cancel()
- continue download
-
- }
- if err := stream.Finish(); err != nil {
- vlog.Errorf("Finish() failed: %v", err)
- continue
- }
- if expected, got := part.Checksum, hex.EncodeToString(h.Sum(nil)); expected != got {
- vlog.Errorf("Unexpected checksum: expected %v, got %v", expected, got)
- continue
- }
- if expected, got := part.Size, int64(nreceived); expected != got {
- vlog.Errorf("Unexpected size: expected %v, got %v", expected, got)
- continue
- }
- success = true
- }
- if !success {
+ ip := &indexedPart{part, i, offset}
+ if !downloadPart(ctx, w, client, ip) {
return repository.MediaInfo{}, verror.Make(errOperationFailed, ctx)
}
offset += part.Size
@@ -188,6 +203,86 @@
return url, ttl, nil
}
+func uploadPartAttempt(ctx context.T, r io.ReadSeeker, client repository.BinaryClientStub, part int, size int64) (bool, error) {
+ ctx, cancel := ctx.WithCancel()
+ defer cancel()
+
+ offset := int64(part * partSize)
+ if _, err := r.Seek(offset, 0); err != nil {
+ vlog.Errorf("Seek(%v, 0) failed: %v", offset, err)
+ return false, nil
+ }
+ stream, err := client.Upload(ctx, int32(part))
+ if err != nil {
+ vlog.Errorf("Upload(%v) failed: %v", part, err)
+ return false, nil
+ }
+ bufferSize := partSize
+ if remaining := size - offset; remaining < int64(bufferSize) {
+ bufferSize = int(remaining)
+ }
+ buffer := make([]byte, bufferSize)
+
+ nread := 0
+ for nread < len(buffer) {
+ n, err := r.Read(buffer[nread:])
+ nread += n
+ if err != nil && (err != io.EOF || nread < len(buffer)) {
+ vlog.Errorf("Read() failed: %v", err)
+ return false, nil
+ }
+ }
+ sender := stream.SendStream()
+ for from := 0; from < len(buffer); from += subpartSize {
+ to := from + subpartSize
+ if to > len(buffer) {
+ to = len(buffer)
+ }
+ if err := sender.Send(buffer[from:to]); err != nil {
+ vlog.Errorf("Send() failed: %v", err)
+ return false, nil
+ }
+ }
+ if err := sender.Close(); err != nil {
+ vlog.Errorf("Close() failed: %v", err)
+ parts, _, statErr := client.Stat(ctx)
+ if statErr != nil {
+ vlog.Errorf("Stat() failed: %v", statErr)
+ if deleteErr := client.Delete(ctx); err != nil {
+ vlog.Errorf("Delete() failed: %v", deleteErr)
+ }
+ return false, err
+ }
+ if parts[part].Checksum == binary.MissingChecksum {
+ return false, nil
+ }
+ }
+ if err := stream.Finish(); err != nil {
+ vlog.Errorf("Finish() failed: %v", err)
+ parts, _, statErr := client.Stat(ctx)
+ if statErr != nil {
+ vlog.Errorf("Stat() failed: %v", statErr)
+ if deleteErr := client.Delete(ctx); err != nil {
+ vlog.Errorf("Delete() failed: %v", deleteErr)
+ }
+ return false, err
+ }
+ if parts[part].Checksum == binary.MissingChecksum {
+ return false, nil
+ }
+ }
+ return true, nil
+}
+
+func uploadPart(ctx context.T, r io.ReadSeeker, client repository.BinaryClientStub, part int, size int64) error {
+ for i := 0; i < nAttempts; i++ {
+ if success, err := uploadPartAttempt(ctx, r, client, part, size); success || err != nil {
+ return err
+ }
+ }
+ return verror.Make(errOperationFailed, ctx)
+}
+
func upload(ctx context.T, r io.ReadSeeker, mediaInfo repository.MediaInfo, von string) error {
client := repository.BinaryClient(von)
offset, whence := int64(0), 2
@@ -202,78 +297,8 @@
return err
}
for i := 0; int64(i) < nparts; i++ {
- success := false
- upload:
- for j := 0; !success && j < nAttempts; j++ {
- offset, whence := int64(i*partSize), 0
- if _, err := r.Seek(offset, whence); err != nil {
- vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
- continue
- }
- stream, err := client.Upload(ctx, int32(i))
- if err != nil {
- vlog.Errorf("Upload(%v) failed: %v", i, err)
- continue
- }
- buffer := make([]byte, partSize)
- if int64(i+1) == nparts {
- buffer = buffer[:(size % partSize)]
- }
- nread := 0
- for nread < len(buffer) {
- n, err := r.Read(buffer[nread:])
- nread += n
- if err != nil && (err != io.EOF || nread < len(buffer)) {
- vlog.Errorf("Read() failed: %v", err)
- stream.Cancel()
- continue upload
- }
- }
- sender := stream.SendStream()
- for from := 0; from < len(buffer); from += subpartSize {
- to := from + subpartSize
- if to > len(buffer) {
- to = len(buffer)
- }
- if err := sender.Send(buffer[from:to]); err != nil {
- vlog.Errorf("Send() failed: %v", err)
- stream.Cancel()
- continue upload
- }
- }
- if err := sender.Close(); err != nil {
- vlog.Errorf("Close() failed: %v", err)
- parts, _, statErr := client.Stat(ctx)
- if statErr != nil {
- vlog.Errorf("Stat() failed: %v", statErr)
- if deleteErr := client.Delete(ctx); err != nil {
- vlog.Errorf("Delete() failed: %v", deleteErr)
- }
- return err
- }
- if parts[i].Checksum == binary.MissingChecksum {
- stream.Cancel()
- continue
- }
- }
- if err := stream.Finish(); err != nil {
- vlog.Errorf("Finish() failed: %v", err)
- parts, _, statErr := client.Stat(ctx)
- if statErr != nil {
- vlog.Errorf("Stat() failed: %v", statErr)
- if deleteErr := client.Delete(ctx); err != nil {
- vlog.Errorf("Delete() failed: %v", deleteErr)
- }
- return err
- }
- if parts[i].Checksum == binary.MissingChecksum {
- continue
- }
- }
- success = true
- }
- if !success {
- return verror.Make(errOperationFailed, ctx)
+ if err := uploadPart(ctx, r, client, i, size); err != nil {
+ return err
}
}
return nil
diff --git a/services/mgmt/stats/impl/stats_test.go b/services/mgmt/stats/impl/stats_test.go
index 02178a3..57b00cf 100644
--- a/services/mgmt/stats/impl/stats_test.go
+++ b/services/mgmt/stats/impl/stats_test.go
@@ -97,7 +97,8 @@
{
noRM := types.ResumeMarker{}
_ = noRM
- stream, err := c.WatchGlob(runtime.NewContext(), types.GlobRequest{Pattern: "testing/foo/bar"})
+ ctx, cancel := runtime.NewContext().WithCancel()
+ stream, err := c.WatchGlob(ctx, types.GlobRequest{Pattern: "testing/foo/bar"})
if err != nil {
t.Fatalf("c.WatchGlob failed: %v", err)
}
@@ -132,7 +133,7 @@
if !reflect.DeepEqual(got, expected) {
t.Errorf("unexpected result. Got %#v, want %#v", got, expected)
}
- stream.Cancel()
+ cancel()
if iterator.Advance() {
t.Errorf("expected no more stream values, got: %v", iterator.Value())
diff --git a/tools/build/impl.go b/tools/build/impl.go
index e03779f..f684ba0 100644
--- a/tools/build/impl.go
+++ b/tools/build/impl.go
@@ -108,7 +108,7 @@
return nil
}
-func getSources(pkgMap map[string]*build.Package, cancel <-chan struct{}, errchan chan<- error) <-chan vbuild.File {
+func getSources(ctx context.T, pkgMap map[string]*build.Package, errchan chan<- error) <-chan vbuild.File {
sources := make(chan vbuild.File)
go func() {
defer close(sources)
@@ -123,8 +123,8 @@
}
select {
case sources <- vbuild.File{Contents: bytes, Name: filepath.Join(pkg.ImportPath, file)}:
- case <-cancel:
- errchan <- nil
+ case <-ctx.Done():
+ errchan <- fmt.Errorf("Get sources failed: %v", ctx.Err())
return
}
}
@@ -135,10 +135,13 @@
return sources
}
-func invokeBuild(ctx context.T, name string, sources <-chan vbuild.File, cancel <-chan struct{}, errchan chan<- error) <-chan vbuild.File {
+func invokeBuild(ctx context.T, name string, sources <-chan vbuild.File, errchan chan<- error) <-chan vbuild.File {
binaries := make(chan vbuild.File)
go func() {
defer close(binaries)
+ ctx, cancel := ctx.WithCancel()
+ defer cancel()
+
client := vbuild.BuilderClient(name)
stream, err := client.Build(ctx, vbuild.Architecture(flagArch), vbuild.OperatingSystem(flagOS))
if err != nil {
@@ -148,7 +151,6 @@
sender := stream.SendStream()
for source := range sources {
if err := sender.Send(source); err != nil {
- stream.Cancel()
errchan <- fmt.Errorf("Send() failed: %v", err)
return
}
@@ -159,12 +161,10 @@
}
iterator := stream.RecvStream()
for iterator.Advance() {
- // TODO(mattr): This custom cancellation can probably be folded into the
- // cancellation mechanism provided by the context.
select {
case binaries <- iterator.Value():
- case <-cancel:
- errchan <- nil
+ case <-ctx.Done():
+ errchan <- fmt.Errorf("Invoke build failed: %v", ctx.Err())
return
}
}
@@ -181,9 +181,15 @@
return binaries
}
-func saveBinaries(prefix string, binaries <-chan vbuild.File, cancel chan<- struct{}, errchan chan<- error) {
+func saveBinaries(ctx context.T, prefix string, binaries <-chan vbuild.File, errchan chan<- error) {
go func() {
for binary := range binaries {
+ select {
+ case <-ctx.Done():
+ errchan <- fmt.Errorf("Save binaries failed: %v", ctx.Err())
+ return
+ default:
+ }
path, perm := filepath.Join(prefix, filepath.Base(binary.Name)), os.FileMode(0755)
if err := ioutil.WriteFile(path, binary.Contents, perm); err != nil {
errchan <- fmt.Errorf("WriteFile(%v, %v) failed: %v", path, perm, err)
@@ -206,25 +212,22 @@
if err := importPackages(paths, pkgMap); err != nil {
return err
}
- cancel, errchan := make(chan struct{}), make(chan error)
+ errchan := make(chan error)
defer close(errchan)
ctx, ctxCancel := runtime.NewContext().WithTimeout(time.Minute)
defer ctxCancel()
// Start all stages of the pipeline.
- sources := getSources(pkgMap, cancel, errchan)
- binaries := invokeBuild(ctx, name, sources, cancel, errchan)
- saveBinaries(os.TempDir(), binaries, cancel, errchan)
+ sources := getSources(ctx, pkgMap, errchan)
+ binaries := invokeBuild(ctx, name, sources, errchan)
+ saveBinaries(ctx, os.TempDir(), binaries, errchan)
// Wait for all stages of the pipeline to terminate.
- cancelled, errors, numStages := false, []error{}, 3
+ errors, numStages := []error{}, 3
for i := 0; i < numStages; i++ {
if err := <-errchan; err != nil {
errors = append(errors, err)
- if !cancelled {
- close(cancel)
- cancelled = true
- }
+ ctxCancel()
}
}
if len(errors) != 0 {
diff --git a/tools/vrpc/test_base/test_base.vdl.go b/tools/vrpc/test_base/test_base.vdl.go
index 1932cfb..fe9a2df 100644
--- a/tools/vrpc/test_base/test_base.vdl.go
+++ b/tools/vrpc/test_base/test_base.vdl.go
@@ -342,18 +342,14 @@
// Finish blocks until the server is done, and returns the positional return
// values for call.
//
- // Finish returns immediately if Cancel has been called; depending on the
+ // Finish returns immediately if the call has been canceled; depending on the
// timing the output could either be an error signaling cancelation, or the
// valid positional return values from the server.
//
- // Calling Finish is mandatory for releasing stream resources, unless Cancel
- // has been called or any of the other methods return an error. Finish should
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
// be called at most once.
Finish() error
- // Cancel cancels the RPC, notifying the server to stop processing. It is
- // safe to call Cancel concurrently with any of the other stream methods.
- // Calling Cancel after Finish has returned is a no-op.
- Cancel()
}
type implTypeTesterStreamingOutputCall struct {