Merge "veyron/examples/pipetobrowser: Adding couple of new plugins similar to *nix's /dev/null concept. "
diff --git a/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
new file mode 100644
index 0000000..bf2b442
--- /dev/null
+++ b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
@@ -0,0 +1,20 @@
+var Transform = require('stream').Transform;
+
+/*
+ * A through transform stream that counts number of bytes being piped to it
+ * @param {function} onUpdate Callback function that gets called with number of
+ * bytes read when a chunk is read
+ * @class
+ */
+export class StreamByteCounter extends Transform {
+ constructor(onUpdate) {
+ super();
+ this._onUpdate = onUpdate;
+ }
+
+ _transform(chunk, encoding, cb) {
+ this._onUpdate(chunk.length);
+ this.push(chunk)
+ cb();
+ }
+}
diff --git a/examples/pipetobrowser/browser/services/pipe-to-browser.js b/examples/pipetobrowser/browser/services/pipe-to-browser.js
index 77e955d..3bf580a 100644
--- a/examples/pipetobrowser/browser/services/pipe-to-browser.js
+++ b/examples/pipetobrowser/browser/services/pipe-to-browser.js
@@ -8,6 +8,7 @@
import { Logger } from 'libs/logs/logger'
import { config } from 'config'
import { ByteObjectStreamAdapter } from 'libs/utils/byte-object-stream-adapter'
+import { StreamByteCounter } from 'libs/utils/stream-byte-counter'
var log = new Logger('services/p2b');
var v = new Veyron(config.veyron);
@@ -55,13 +56,21 @@
pipe($suffix, $stream) {
return new Promise(function(resolve, reject) {
log.debug('received pipe request for:', $suffix);
+ var numBytesForThisCall = 0;
var bufferStream = new ByteObjectStreamAdapter();
- $stream.pipe(bufferStream);
+ var streamByteCounter = new StreamByteCounter((numBytesRead) => {
+ // increment total number of bytes received and total for this call
+ numBytesForThisCall += numBytesRead;
+ state.numBytes += numBytesRead;
+ });
+
+ var stream = $stream.pipe(bufferStream).pipe(streamByteCounter);
bufferStream.on('end', () => {
log.debug('end of stream');
- resolve('done');
+ // send total number of bytes received for this call as final result
+ resolve(numBytesForThisCall);
});
bufferStream.on('error', (e) => {
@@ -70,9 +79,8 @@
});
state.numPipes++;
- //TODO(aghassemi) pipe to a byte-size-sniffer to update state.numBytes
- pipeRequestHandler($suffix, bufferStream);
+ pipeRequestHandler($suffix, stream);
});
}
};
@@ -105,4 +113,4 @@
state.reset();
return;
});
-}
\ No newline at end of file
+}
diff --git a/examples/pipetobrowser/p2b.vdl b/examples/pipetobrowser/p2b.vdl
index 204127f..eff991e 100644
--- a/examples/pipetobrowser/p2b.vdl
+++ b/examples/pipetobrowser/p2b.vdl
@@ -2,6 +2,6 @@
// Viewer allows clients to stream data to it and to request a particular viewer to format and display the data.
type Viewer interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
- Pipe() stream<[]byte, _> (string, error)
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+ Pipe() stream<[]byte, _> (int64, error)
}
\ No newline at end of file
diff --git a/examples/pipetobrowser/p2b.vdl.go b/examples/pipetobrowser/p2b.vdl.go
index 7e2719d..a9d7a3b 100644
--- a/examples/pipetobrowser/p2b.vdl.go
+++ b/examples/pipetobrowser/p2b.vdl.go
@@ -19,7 +19,7 @@
// Viewer_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
type Viewer_ExcludingUniversal interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeStream, err error)
}
type Viewer interface {
@@ -30,8 +30,8 @@
// ViewerService is the interface the server implements.
type ViewerService interface {
- // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
- Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply string, err error)
+ // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+ Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply int64, err error)
}
// ViewerPipeStream is the interface for streaming responses of the method
@@ -50,7 +50,7 @@
// Finish closes the stream and returns the positional return values for
// call.
- Finish() (reply string, err error)
+ Finish() (reply int64, err error)
// Cancel cancels the RPC, notifying the server to stop processing.
Cancel()
@@ -69,7 +69,7 @@
return c.clientCall.CloseSend()
}
-func (c *implViewerPipeStream) Finish() (reply string, err error) {
+func (c *implViewerPipeStream) Finish() (reply int64, err error) {
if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
err = ierr
}
@@ -208,7 +208,7 @@
result.Methods["Pipe"] = _gen_ipc.MethodSignature{
InArgs: []_gen_ipc.MethodArgument{},
OutArgs: []_gen_ipc.MethodArgument{
- {Name: "", Type: 3},
+ {Name: "", Type: 37},
{Name: "", Type: 65},
},
InStream: 67,
@@ -238,7 +238,7 @@
return
}
-func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply string, err error) {
+func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply int64, err error) {
stream := &implViewerServicePipeStream{serverCall: call}
reply, err = __gen_s.service.Pipe(call, stream)
return
diff --git a/examples/pipetobrowser/p2b/main.go b/examples/pipetobrowser/p2b/main.go
index 4a7a10a..d368b17 100644
--- a/examples/pipetobrowser/p2b/main.go
+++ b/examples/pipetobrowser/p2b/main.go
@@ -87,8 +87,11 @@
return
}
- log.Infof("Stream finished with status: %v", result)
- log.Infof("Total of %d bytes were piped to browser", numBytes)
+ if numBytes != result {
+ log.Infof("*** number of bytes sent and received do NOT match ***")
+ }
+ log.Infof("%d bytes were piped to browser", numBytes)
+ log.Infof("%d bytes were received by browser", result)
fmt.Println("Finished piping to browser! Thanks for using p2b.")
}
diff --git a/services/security/discharger.vdl.go b/services/security/discharger.vdl.go
index b24b968..34ab8fc 100644
--- a/services/security/discharger.vdl.go
+++ b/services/security/discharger.vdl.go
@@ -1,7 +1,7 @@
// This file was auto-generated by the veyron vdl tool.
// Source: discharger.vdl
-package discharge
+package security
import (
"veyron2/security"
@@ -16,7 +16,7 @@
_gen_wiretype "veyron2/wiretype"
)
-// DischargeIssuer service issues caveat discharges when requested
+// DischargeIssuer service issues caveat discharges when requested.
// Discharger is the interface the client binds and uses.
// Discharger_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.