Merge "veyron/examples/pipetobrowser: Adding couple of new plugins similar to *nix's /dev/null concept. "
diff --git a/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
new file mode 100644
index 0000000..bf2b442
--- /dev/null
+++ b/examples/pipetobrowser/browser/libs/utils/stream-byte-counter.js
@@ -0,0 +1,20 @@
+var Transform = require('stream').Transform;
+
+/*
+ * A through transform stream that counts number of bytes being piped to it
+ * @param {function} onUpdate Callback function that gets called with number of
+ * bytes read when a chunk is read
+ * @class
+ */
+export class StreamByteCounter extends Transform {
+  constructor(onUpdate) {
+    super();
+    this._onUpdate = onUpdate;
+  }
+
+  _transform(chunk, encoding, cb) {
+    this._onUpdate(chunk.length);
+    this.push(chunk)
+    cb();
+  }
+}
diff --git a/examples/pipetobrowser/browser/services/pipe-to-browser.js b/examples/pipetobrowser/browser/services/pipe-to-browser.js
index 77e955d..3bf580a 100644
--- a/examples/pipetobrowser/browser/services/pipe-to-browser.js
+++ b/examples/pipetobrowser/browser/services/pipe-to-browser.js
@@ -8,6 +8,7 @@
 import { Logger } from 'libs/logs/logger'
 import { config } from 'config'
 import { ByteObjectStreamAdapter } from 'libs/utils/byte-object-stream-adapter'
+import { StreamByteCounter } from 'libs/utils/stream-byte-counter'
 
 var log = new Logger('services/p2b');
 var v = new Veyron(config.veyron);
@@ -55,13 +56,21 @@
     pipe($suffix, $stream) {
       return new Promise(function(resolve, reject) {
         log.debug('received pipe request for:', $suffix);
+        var numBytesForThisCall = 0;
 
         var bufferStream = new ByteObjectStreamAdapter();
-        $stream.pipe(bufferStream);
+        var streamByteCounter = new StreamByteCounter((numBytesRead) => {
+          // increment total number of bytes received and total for this call
+          numBytesForThisCall += numBytesRead;
+          state.numBytes += numBytesRead;
+        });
+
+        var stream = $stream.pipe(bufferStream).pipe(streamByteCounter);
 
         bufferStream.on('end', () => {
           log.debug('end of stream');
-          resolve('done');
+          // send total number of bytes received for this call as final result
+          resolve(numBytesForThisCall);
         });
 
         bufferStream.on('error', (e) => {
@@ -70,9 +79,8 @@
         });
 
         state.numPipes++;
-        //TODO(aghassemi) pipe to a byte-size-sniffer to update state.numBytes
 
-        pipeRequestHandler($suffix, bufferStream);
+        pipeRequestHandler($suffix, stream);
       });
     }
   };
@@ -105,4 +113,4 @@
     state.reset();
     return;
   });
-}
\ No newline at end of file
+}
diff --git a/examples/pipetobrowser/p2b.vdl b/examples/pipetobrowser/p2b.vdl
index 204127f..eff991e 100644
--- a/examples/pipetobrowser/p2b.vdl
+++ b/examples/pipetobrowser/p2b.vdl
@@ -2,6 +2,6 @@
 
 // Viewer allows clients to stream data to it and to request a particular viewer to format and display the data.
 type Viewer interface {
-  // Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
-  Pipe() stream<[]byte, _> (string, error)
+  // Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+  Pipe() stream<[]byte, _> (int64, error)
 }
\ No newline at end of file
diff --git a/examples/pipetobrowser/p2b.vdl.go b/examples/pipetobrowser/p2b.vdl.go
index 7e2719d..a9d7a3b 100644
--- a/examples/pipetobrowser/p2b.vdl.go
+++ b/examples/pipetobrowser/p2b.vdl.go
@@ -19,7 +19,7 @@
 // Viewer_ExcludingUniversal is the interface without internal framework-added methods
 // to enable embedding without method collisions.  Not to be used directly by clients.
 type Viewer_ExcludingUniversal interface {
-	// Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
+	// Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
 	Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeStream, err error)
 }
 type Viewer interface {
@@ -30,8 +30,8 @@
 // ViewerService is the interface the server implements.
 type ViewerService interface {
 
-	// Pipe creates a bidirectional pipe between client and viewer service, returns a success message provided by the client
-	Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply string, err error)
+	// Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
+	Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply int64, err error)
 }
 
 // ViewerPipeStream is the interface for streaming responses of the method
@@ -50,7 +50,7 @@
 
 	// Finish closes the stream and returns the positional return values for
 	// call.
-	Finish() (reply string, err error)
+	Finish() (reply int64, err error)
 
 	// Cancel cancels the RPC, notifying the server to stop processing.
 	Cancel()
@@ -69,7 +69,7 @@
 	return c.clientCall.CloseSend()
 }
 
-func (c *implViewerPipeStream) Finish() (reply string, err error) {
+func (c *implViewerPipeStream) Finish() (reply int64, err error) {
 	if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
 		err = ierr
 	}
@@ -208,7 +208,7 @@
 	result.Methods["Pipe"] = _gen_ipc.MethodSignature{
 		InArgs: []_gen_ipc.MethodArgument{},
 		OutArgs: []_gen_ipc.MethodArgument{
-			{Name: "", Type: 3},
+			{Name: "", Type: 37},
 			{Name: "", Type: 65},
 		},
 		InStream: 67,
@@ -238,7 +238,7 @@
 	return
 }
 
-func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply string, err error) {
+func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply int64, err error) {
 	stream := &implViewerServicePipeStream{serverCall: call}
 	reply, err = __gen_s.service.Pipe(call, stream)
 	return
diff --git a/examples/pipetobrowser/p2b/main.go b/examples/pipetobrowser/p2b/main.go
index 4a7a10a..d368b17 100644
--- a/examples/pipetobrowser/p2b/main.go
+++ b/examples/pipetobrowser/p2b/main.go
@@ -87,8 +87,11 @@
 		return
 	}
 
-	log.Infof("Stream finished with status: %v", result)
-	log.Infof("Total of %d bytes were piped to browser", numBytes)
+	if numBytes != result {
+		log.Infof("*** number of bytes sent and received do NOT match ***")
+	}
+	log.Infof("%d bytes were piped to browser", numBytes)
+	log.Infof("%d bytes were received by browser", result)
 
 	fmt.Println("Finished piping to browser! Thanks for using p2b.")
 }
diff --git a/services/security/discharger.vdl.go b/services/security/discharger.vdl.go
index b24b968..34ab8fc 100644
--- a/services/security/discharger.vdl.go
+++ b/services/security/discharger.vdl.go
@@ -1,7 +1,7 @@
 // This file was auto-generated by the veyron vdl tool.
 // Source: discharger.vdl
 
-package discharge
+package security
 
 import (
 	"veyron2/security"
@@ -16,7 +16,7 @@
 	_gen_wiretype "veyron2/wiretype"
 )
 
-// DischargeIssuer service issues caveat discharges when requested
+// DischargeIssuer service issues caveat discharges when requested.
 // Discharger is the interface the client binds and uses.
 // Discharger_ExcludingUniversal is the interface without internal framework-added methods
 // to enable embedding without method collisions.  Not to be used directly by clients.