veyron/examples/pipe-to-browser: Adding a stream-byte-counter that will
work as a meter to count number of bytes coming in. We use this through stream
to update the total number of bytes read and update the Status page UI accordingly.
Also total number of bytes read in the particular call itself is sent back
to Go client after all steaming ends for book-keeping.
Change-Id: I39b088f8d2ebceba383f3d3ef8064931fc27dbf2
diff --git a/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
new file mode 100644
index 0000000..bf2b442
--- /dev/null
+++ b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
@@ -0,0 +1,20 @@
+var Transform = require('stream').Transform;
+
+/*
+ * A through transform stream that counts number of bytes being piped to it
+ * @param {function} onUpdate Callback function that gets called with number of
+ * bytes read when a chunk is read
+ * @class
+ */
+export class StreamByteCounter extends Transform {
+ constructor(onUpdate) {
+ super();
+ this._onUpdate = onUpdate;
+ }
+
+ _transform(chunk, encoding, cb) {
+ this._onUpdate(chunk.length);
+ this.push(chunk)
+ cb();
+ }
+}
diff --git a/examples/pipetobrowser/browser/services/pipe-to-browser.js b/examples/pipetobrowser/browser/services/pipe-to-browser.js
index b9b241e..b616aca 100644
--- a/examples/pipetobrowser/browser/services/pipe-to-browser.js
+++ b/examples/pipetobrowser/browser/services/pipe-to-browser.js
@@ -8,6 +8,7 @@
import { Logger } from 'libs/logs/logger'
import { config } from 'config'
import { ByteObjectStreamAdapter } from 'libs/utils/byte-object-stream-adapter'
+import { StreamByteCounter } from 'libs/utils/stream-byte-counter'
var log = new Logger('services/p2b');
var v = new Veyron(config.veyron);
@@ -53,13 +54,21 @@
pipe($suffix, $stream) {
return new Promise(function(resolve, reject) {
log.debug('received pipe request for:', $suffix);
+ var numBytesForThisCall = 0;
var bufferStream = new ByteObjectStreamAdapter();
- $stream.pipe(bufferStream);
+ var streamByteCounter = new StreamByteCounter((numBytesRead) => {
+ // increment total number of bytes received and total for this call
+ numBytesForThisCall += numBytesRead;
+ state.numBytes += numBytesRead;
+ });
+
+ var stream = $stream.pipe(bufferStream).pipe(streamByteCounter);
bufferStream.on('end', () => {
log.debug('end of stream');
- resolve('done');
+ // send total number of bytes received for this call as final result
+ resolve(numBytesForThisCall);
});
bufferStream.on('error', (e) => {
@@ -68,9 +77,8 @@
});
state.numPipes++;
- //TODO(aghassemi) pipe to a byte-size-sniffer to update state.numBytes
- pipeRequestHandler($suffix, bufferStream);
+ pipeRequestHandler($suffix, stream);
});
}
};
diff --git a/examples/pipetobrowser/p2b.vdl b/examples/pipetobrowser/p2b.vdl
index 204127f..eff991e 100644
--- a/examples/pipetobrowser/p2b.vdl
+++ b/examples/pipetobrowser/p2b.vdl
@@ -2,6 +2,6 @@
// Viewer allows clients to stream data to it and to request a particular viewer to format and display the data.
type Viewer interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
- Pipe() stream<[]byte, _> (string, error)
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+ Pipe() stream<[]byte, _> (int64, error)
}
\ No newline at end of file
diff --git a/examples/pipetobrowser/p2b.vdl.go b/examples/pipetobrowser/p2b.vdl.go
index 7e2719d..a9d7a3b 100644
--- a/examples/pipetobrowser/p2b.vdl.go
+++ b/examples/pipetobrowser/p2b.vdl.go
@@ -19,7 +19,7 @@
// Viewer_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
type Viewer_ExcludingUniversal interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeStream, err error)
}
type Viewer interface {
@@ -30,8 +30,8 @@
// ViewerService is the interface the server implements.
type ViewerService interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
- Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply string, err error)
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+ Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply int64, err error)
}
// ViewerPipeStream is the interface for streaming responses of the method
@@ -50,7 +50,7 @@
// Finish closes the stream and returns the positional return values for
// call.
- Finish() (reply string, err error)
+ Finish() (reply int64, err error)
// Cancel cancels the RPC, notifying the server to stop processing.
Cancel()
@@ -69,7 +69,7 @@
return c.clientCall.CloseSend()
}
-func (c *implViewerPipeStream) Finish() (reply string, err error) {
+func (c *implViewerPipeStream) Finish() (reply int64, err error) {
if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
err = ierr
}
@@ -208,7 +208,7 @@
result.Methods["Pipe"] = _gen_ipc.MethodSignature{
InArgs: []_gen_ipc.MethodArgument{},
OutArgs: []_gen_ipc.MethodArgument{
- {Name: "", Type: 3},
+ {Name: "", Type: 37},
{Name: "", Type: 65},
},
InStream: 67,
@@ -238,7 +238,7 @@
return
}
-func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply string, err error) {
+func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply int64, err error) {
stream := &implViewerServicePipeStream{serverCall: call}
reply, err = __gen_s.service.Pipe(call, stream)
return
diff --git a/examples/pipetobrowser/p2b/main.go b/examples/pipetobrowser/p2b/main.go
index 4a7a10a..d368b17 100644
--- a/examples/pipetobrowser/p2b/main.go
+++ b/examples/pipetobrowser/p2b/main.go
@@ -87,8 +87,11 @@
return
}
- log.Infof("Stream finished with status: %v", result)
- log.Infof("Total of %d bytes were piped to browser", numBytes)
+ if numBytes != result {
+ log.Infof("*** number of bytes sent and received do NOT match ***")
+ }
+ log.Infof("%d bytes were piped to browser", numBytes)
+ log.Infof("%d bytes were received by browser", result)
fmt.Println("Finished piping to browser! Thanks for using p2b.")
}