| <!doctype html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <title>Stream Node.js v0.10.24 Manual & Documentation</title> |
| <link rel="stylesheet" href="assets/style.css"> |
| <link rel="stylesheet" href="assets/sh.css"> |
| <link rel="canonical" href="http://nodejs.org/api/stream.html"> |
| </head> |
| <body class="alt apidoc" id="api-section-stream"> |
| <div id="intro" class="interior"> |
| <a href="/" title="Go back to the home page"> |
| <img id="logo" src="http://nodejs.org/images/logo-light.png" alt="node.js"> |
| </a> |
| </div> |
| <div id="content" class="clearfix"> |
| <div id="column2" class="interior"> |
| <ul> |
| <li><a href="/" class="home">Home</a></li> |
| <li><a href="/download/" class="download">Download</a></li> |
| <li><a href="/about/" class="about">About</a></li> |
| <li><a href="http://npmjs.org/" class="npm">npm Registry</a></li> |
| <li><a href="http://nodejs.org/api/" class="docs current">Docs</a></li> |
| <li><a href="http://blog.nodejs.org" class="blog">Blog</a></li> |
| <li><a href="/community/" class="community">Community</a></li> |
| <li><a href="/logos/" class="logos">Logos</a></li> |
| <li><a href="http://jobs.nodejs.org/" class="jobs">Jobs</a></li> |
| </ul> |
| <p class="twitter"><a href="http://twitter.com/nodejs">@nodejs</a></p> |
| </div> |
| |
| <div id="column1" class="interior"> |
| <header> |
| <h1>Node.js v0.10.24 Manual & Documentation</h1> |
| <div id="gtoc"> |
| <p> |
| <a href="index.html" name="toc">Index</a> | |
| <a href="all.html">View on single page</a> | |
| <a href="stream.json">View as JSON</a> |
| </p> |
| </div> |
| <hr> |
| </header> |
| |
| <div id="toc"> |
| <h2>Table of Contents</h2> |
| <ul> |
| <li><a href="#stream_stream">Stream</a><ul> |
| <li><a href="#stream_api_for_stream_consumers">API for Stream Consumers</a><ul> |
| <li><a href="#stream_class_stream_readable">Class: stream.Readable</a><ul> |
| <li><a href="#stream_event_readable">Event: 'readable'</a></li> |
| <li><a href="#stream_event_data">Event: 'data'</a></li> |
| <li><a href="#stream_event_end">Event: 'end'</a></li> |
| <li><a href="#stream_event_close">Event: 'close'</a></li> |
| <li><a href="#stream_event_error">Event: 'error'</a></li> |
| <li><a href="#stream_readable_read_size">readable.read([size])</a></li> |
| <li><a href="#stream_readable_setencoding_encoding">readable.setEncoding(encoding)</a></li> |
| <li><a href="#stream_readable_resume">readable.resume()</a></li> |
| <li><a href="#stream_readable_pause">readable.pause()</a></li> |
| <li><a href="#stream_readable_pipe_destination_options">readable.pipe(destination, [options])</a></li> |
| <li><a href="#stream_readable_unpipe_destination">readable.unpipe([destination])</a></li> |
| <li><a href="#stream_readable_unshift_chunk">readable.unshift(chunk)</a></li> |
| <li><a href="#stream_readable_wrap_stream">readable.wrap(stream)</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_writable">Class: stream.Writable</a><ul> |
| <li><a href="#stream_writable_write_chunk_encoding_callback">writable.write(chunk, [encoding], [callback])</a></li> |
| <li><a href="#stream_event_drain">Event: 'drain'</a></li> |
| <li><a href="#stream_writable_end_chunk_encoding_callback">writable.end([chunk], [encoding], [callback])</a></li> |
| <li><a href="#stream_event_finish">Event: 'finish'</a></li> |
| <li><a href="#stream_event_pipe">Event: 'pipe'</a></li> |
| <li><a href="#stream_event_unpipe">Event: 'unpipe'</a></li> |
| <li><a href="#stream_event_error_1">Event: 'error'</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_duplex">Class: stream.Duplex</a></li> |
| <li><a href="#stream_class_stream_transform">Class: stream.Transform</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_api_for_stream_implementors">API for Stream Implementors</a><ul> |
| <li><a href="#stream_class_stream_readable_1">Class: stream.Readable</a><ul> |
| <li><a href="#stream_example_a_counting_stream">Example: A Counting Stream</a></li> |
| <li><a href="#stream_example_simpleprotocol_v1_sub_optimal">Example: SimpleProtocol v1 (Sub-optimal)</a></li> |
| <li><a href="#stream_new_stream_readable_options">new stream.Readable([options])</a></li> |
| <li><a href="#stream_readable_read_size_1">readable._read(size)</a></li> |
| <li><a href="#stream_readable_push_chunk_encoding">readable.push(chunk, [encoding])</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_writable_1">Class: stream.Writable</a><ul> |
| <li><a href="#stream_new_stream_writable_options">new stream.Writable([options])</a></li> |
| <li><a href="#stream_writable_write_chunk_encoding_callback_1">writable._write(chunk, encoding, callback)</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_duplex_1">Class: stream.Duplex</a><ul> |
| <li><a href="#stream_new_stream_duplex_options">new stream.Duplex(options)</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_transform_1">Class: stream.Transform</a><ul> |
| <li><a href="#stream_new_stream_transform_options">new stream.Transform([options])</a></li> |
| <li><a href="#stream_transform_transform_chunk_encoding_callback">transform._transform(chunk, encoding, callback)</a></li> |
| <li><a href="#stream_transform_flush_callback">transform._flush(callback)</a></li> |
| <li><a href="#stream_example_simpleprotocol_parser_v2">Example: <code>SimpleProtocol</code> parser v2</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_class_stream_passthrough">Class: stream.PassThrough</a></li> |
| </ul> |
| </li> |
| <li><a href="#stream_streams_under_the_hood">Streams: Under the Hood</a><ul> |
| <li><a href="#stream_buffering">Buffering</a></li> |
| <li><a href="#stream_stream_read_0"><code>stream.read(0)</code></a></li> |
| <li><a href="#stream_stream_push"><code>stream.push('')</code></a></li> |
| <li><a href="#stream_compatibility_with_older_node_versions">Compatibility with Older Node Versions</a></li> |
| <li><a href="#stream_object_mode">Object Mode</a></li> |
| <li><a href="#stream_state_objects">State Objects</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| </ul> |
| |
| </div> |
| |
| <div id="apicontent"> |
| <h1>Stream<span><a class="mark" href="#stream_stream" id="stream_stream">#</a></span></h1> |
| <pre class="api_stability_2">Stability: 2 - Unstable</pre><p>A stream is an abstract interface implemented by various objects in |
| Node. For example a <a href="http.html#http_http_incomingmessage">request to an HTTP |
| server</a> is a stream, as is |
| <a href="process.html#process_process_stdout">stdout</a>. Streams are readable, writable, or both. All streams are |
| instances of <a href="events.html#events_class_events_eventemitter">EventEmitter</a> |
| |
| </p> |
| <p>You can load the Stream base classes by doing <code>require('stream')</code>. |
| There are base classes provided for <a href="#stream_class_stream_readable">Readable</a> streams, <a href="#stream_class_stream_writable">Writable</a> |
| streams, <a href="#stream_class_stream_duplex">Duplex</a> streams, and <a href="#stream_class_stream_transform">Transform</a> streams. |
| |
| </p> |
| <p>This document is split up into 3 sections. The first explains the |
| parts of the API that you need to be aware of to use streams in your |
| programs. If you never implement a streaming API yourself, you can |
| stop there. |
| |
| </p> |
| <p>The second section explains the parts of the API that you need to use |
| if you implement your own custom streams yourself. The API is |
| designed to make this easy for you to do. |
| |
| </p> |
| <p>The third section goes into more depth about how streams work, |
| including some of the internal mechanisms and functions that you |
| should probably not modify unless you definitely know what you are |
| doing. |
| |
| |
| </p> |
| <h2>API for Stream Consumers<span><a class="mark" href="#stream_api_for_stream_consumers" id="stream_api_for_stream_consumers">#</a></span></h2> |
| <!--type=misc--> |
| |
| <p>Streams can be either <a href="#stream_class_stream_readable">Readable</a>, <a href="#stream_class_stream_writable">Writable</a>, or both (<a href="#stream_class_stream_duplex">Duplex</a>). |
| |
| </p> |
| <p>All streams are EventEmitters, but they also have other custom methods |
| and properties depending on whether they are Readable, Writable, or |
| Duplex. |
| |
| </p> |
| <p>If a stream is both Readable and Writable, then it implements all of |
| the methods and events below. So, a <a href="#stream_class_stream_duplex">Duplex</a> or <a href="#stream_class_stream_transform">Transform</a> stream is |
| fully described by this API, though their implementation may be |
| somewhat different. |
| |
| </p> |
| <p>It is not necessary to implement Stream interfaces in order to consume |
| streams in your programs. If you <strong>are</strong> implementing streaming |
| interfaces in your own program, please also refer to |
| <a href="#stream_api_for_stream_implementors">API for Stream Implementors</a> below. |
| |
| </p> |
| <p>Almost all Node programs, no matter how simple, use Streams in some |
| way. Here is an example of using Streams in a Node program: |
| |
| </p> |
| <pre><code class="javascript">var http = require('http'); |
| |
| var server = http.createServer(function (req, res) { |
| // req is an http.IncomingMessage, which is a Readable Stream |
| // res is an http.ServerResponse, which is a Writable Stream |
| |
| var body = ''; |
| // we want to get the data as utf8 strings |
| // If you don't set an encoding, then you'll get Buffer objects |
| req.setEncoding('utf8'); |
| |
| // Readable streams emit 'data' events once a listener is added |
| req.on('data', function (chunk) { |
| body += chunk; |
| }) |
| |
| // the end event tells you that you have entire body |
| req.on('end', function () { |
| try { |
| var data = JSON.parse(body); |
| } catch (er) { |
| // uh oh! bad json! |
| res.statusCode = 400; |
| return res.end('error: ' + er.message); |
| } |
| |
| // write back something interesting to the user: |
| res.write(typeof data); |
| res.end(); |
| }) |
| }) |
| |
| server.listen(1337); |
| |
| // $ curl localhost:1337 -d '{}' |
| // object |
| // $ curl localhost:1337 -d '"foo"' |
| // string |
| // $ curl localhost:1337 -d 'not json' |
| // error: Unexpected token o</code></pre> |
| <h3>Class: stream.Readable<span><a class="mark" href="#stream_class_stream_readable" id="stream_class_stream_readable">#</a></span></h3> |
| <!--type=class--> |
| |
| <p>The Readable stream interface is the abstraction for a <em>source</em> of |
| data that you are reading from. In other words, data comes <em>out</em> of a |
| Readable stream. |
| |
| </p> |
| <p>A Readable stream will not start emitting data until you indicate that |
| you are ready to receive it. |
| |
| </p> |
| <p>Readable streams have two "modes": a <strong>flowing mode</strong> and a <strong>non-flowing |
| mode</strong>. When in flowing mode, data is read from the underlying system |
| and provided to your program as fast as possible. In non-flowing |
| mode, you must explicitly call <code>stream.read()</code> to get chunks of data |
| out. |
| |
| </p> |
| <p>Examples of readable streams include: |
| |
| </p> |
| <ul> |
| <li><a href="http.html#http_http_incomingmessage">http responses, on the client</a></li> |
| <li><a href="http.html#http_http_incomingmessage">http requests, on the server</a></li> |
| <li><a href="fs.html#fs_class_fs_readstream">fs read streams</a></li> |
| <li><a href="zlib.html">zlib streams</a></li> |
| <li><a href="crypto.html">crypto streams</a></li> |
| <li><a href="net.html#net_class_net_socket">tcp sockets</a></li> |
| <li><a href="child_process.html#child_process_child_stdout">child process stdout and stderr</a></li> |
| <li><a href="process.html#process_process_stdin">process.stdin</a></li> |
| </ul> |
| <h4>Event: 'readable'<span><a class="mark" href="#stream_event_readable" id="stream_event_readable">#</a></span></h4> |
| <p>When a chunk of data can be read from the stream, it will emit a |
| <code>'readable'</code> event. |
| |
| </p> |
| <p>In some cases, listening for a <code>'readable'</code> event will cause some data |
| to be read into the internal buffer from the underlying system, if it |
| hadn't already. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.on('readable', function() { |
| // there is some data to read now |
| })</code></pre> |
| <p>Once the internal buffer is drained, a <code>readable</code> event will fire |
| again when more data is available. |
| |
| </p> |
| <h4>Event: 'data'<span><a class="mark" href="#stream_event_data" id="stream_event_data">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">Buffer | String</span> The chunk of data.</li> |
| </div></ul> |
| <p>If you attach a <code>data</code> event listener, then it will switch the stream |
| into flowing mode, and data will be passed to your handler as soon as |
| it is available. |
| |
| </p> |
| <p>If you just want to get all the data out of the stream as fast as |
| possible, this is the best way to do so. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.on('data', function(chunk) { |
| console.log('got %d bytes of data', chunk.length); |
| })</code></pre> |
| <h4>Event: 'end'<span><a class="mark" href="#stream_event_end" id="stream_event_end">#</a></span></h4> |
| <p>This event fires when no more data will be provided. |
| |
| </p> |
| <p>Note that the <code>end</code> event <strong>will not fire</strong> unless the data is |
| completely consumed. This can be done by switching into flowing mode, |
| or by calling <code>read()</code> repeatedly until you get to the end. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.on('data', function(chunk) { |
| console.log('got %d bytes of data', chunk.length); |
| }) |
| readable.on('end', function() { |
| console.log('there will be no more data.'); |
| });</code></pre> |
| <h4>Event: 'close'<span><a class="mark" href="#stream_event_close" id="stream_event_close">#</a></span></h4> |
| <p>Emitted when the underlying resource (for example, the backing file |
| descriptor) has been closed. Not all streams will emit this. |
| |
| </p> |
| <h4>Event: 'error'<span><a class="mark" href="#stream_event_error" id="stream_event_error">#</a></span></h4> |
| <p>Emitted if there was an error receiving data. |
| |
| </p> |
| <h4>readable.read([size])<span><a class="mark" href="#stream_readable_read_size" id="stream_readable_read_size">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>size</code> <span class="type">Number</span> Optional argument to specify how much data to read.</li> |
| <li>Return <span class="type">String | Buffer | null</span></li> |
| </div></ul> |
| <p>The <code>read()</code> method pulls some data out of the internal buffer and |
| returns it. If there is no data available, then it will return |
| <code>null</code>. |
| |
| </p> |
| <p>If you pass in a <code>size</code> argument, then it will return that many |
| bytes. If <code>size</code> bytes are not available, then it will return <code>null</code>. |
| |
| </p> |
| <p>If you do not specify a <code>size</code> argument, then it will return all the |
| data in the internal buffer. |
| |
| </p> |
| <p>This method should only be called in non-flowing mode. In |
| flowing-mode, this method is called automatically until the internal |
| buffer is drained. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.on('readable', function() { |
| var chunk; |
| while (null !== (chunk = readable.read())) { |
| console.log('got %d bytes of data', chunk.length); |
| } |
| });</code></pre> |
| <h4>readable.setEncoding(encoding)<span><a class="mark" href="#stream_readable_setencoding_encoding" id="stream_readable_setencoding_encoding">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>encoding</code> <span class="type">String</span> The encoding to use.</li> |
| </div></ul> |
| <p>Call this function to cause the stream to return strings of the |
| specified encoding instead of Buffer objects. For example, if you do |
| <code>readable.setEncoding('utf8')</code>, then the output data will be |
| interpreted as UTF-8 data, and returned as strings. If you do |
| <code>readable.setEncoding('hex')</code>, then the data will be encoded in |
| hexadecimal string format. |
| |
| </p> |
| <p>This properly handles multi-byte characters that would otherwise be |
| potentially mangled if you simply pulled the Buffers directly and |
| called <code>buf.toString(encoding)</code> on them. If you want to read the data |
| as strings, always use this method. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.setEncoding('utf8'); |
| readable.on('data', function(chunk) { |
| assert.equal(typeof chunk, 'string'); |
| console.log('got %d characters of string data', chunk.length); |
| })</code></pre> |
| <h4>readable.resume()<span><a class="mark" href="#stream_readable_resume" id="stream_readable_resume">#</a></span></h4> |
| <p>This method will cause the readable stream to resume emitting <code>data</code> |
| events. |
| |
| </p> |
| <p>This method will switch the stream into flowing-mode. If you do <em>not</em> |
| want to consume the data from a stream, but you <em>do</em> want to get to |
| its <code>end</code> event, you can call <code>readable.resume()</code> to open the flow of |
| data. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.resume(); |
| readable.on('end', function(chunk) { |
| console.log('got to the end, but did not read anything'); |
| })</code></pre> |
| <h4>readable.pause()<span><a class="mark" href="#stream_readable_pause" id="stream_readable_pause">#</a></span></h4> |
| <p>This method will cause a stream in flowing-mode to stop emitting |
| <code>data</code> events. Any data that becomes available will remain in the |
| internal buffer. |
| |
| </p> |
| <p>This method is only relevant in flowing mode. When called on a |
| non-flowing stream, it will switch into flowing mode, but remain |
| paused. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| readable.on('data', function(chunk) { |
| console.log('got %d bytes of data', chunk.length); |
| readable.pause(); |
| console.log('there will be no more data for 1 second'); |
| setTimeout(function() { |
| console.log('now data will start flowing again'); |
| readable.resume(); |
| }, 1000); |
| })</code></pre> |
| <h4>readable.pipe(destination, [options])<span><a class="mark" href="#stream_readable_pipe_destination_options" id="stream_readable_pipe_destination_options">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>destination</code> <span class="type"><a href="#stream_class_stream_writable">Writable</a> Stream</span> The destination for writing data</li> |
| <li><code>options</code> <span class="type">Object</span> Pipe options<ul> |
| <li><code>end</code> <span class="type">Boolean</span> End the writer when the reader ends. Default = <code>true</code></li> |
| </ul> |
| </li> |
| </div></ul> |
| <p>This method pulls all the data out of a readable stream, and writes it |
| to the supplied destination, automatically managing the flow so that |
| the destination is not overwhelmed by a fast readable stream. |
| |
| </p> |
| <p>Multiple destinations can be piped to safely. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| var writable = fs.createWriteStream('file.txt'); |
| // All the data from readable goes into 'file.txt' |
| readable.pipe(writable);</code></pre> |
| <p>This function returns the destination stream, so you can set up pipe |
| chains like so: |
| |
| </p> |
| <pre><code class="javascript">var r = fs.createReadStream('file.txt'); |
| var z = zlib.createGzip(); |
| var w = fs.createWriteStream('file.txt.gz'); |
| r.pipe(z).pipe(w);</code></pre> |
| <p>For example, emulating the Unix <code>cat</code> command: |
| |
| </p> |
| <pre><code class="javascript">process.stdin.pipe(process.stdout);</code></pre> |
| <p>By default <a href="#stream_writable_end_chunk_encoding_callback"><code>end()</code></a> is called on the destination when the source stream |
| emits <code>end</code>, so that <code>destination</code> is no longer writable. Pass <code>{ end: |
| false }</code> as <code>options</code> to keep the destination stream open. |
| |
| </p> |
| <p>This keeps <code>writer</code> open so that "Goodbye" can be written at the |
| end. |
| |
| </p> |
| <pre><code class="javascript">reader.pipe(writer, { end: false }); |
| reader.on('end', function() { |
| writer.end('Goodbye\n'); |
| });</code></pre> |
| <p>Note that <code>process.stderr</code> and <code>process.stdout</code> are never closed until |
| the process exits, regardless of the specified options. |
| |
| </p> |
| <h4>readable.unpipe([destination])<span><a class="mark" href="#stream_readable_unpipe_destination" id="stream_readable_unpipe_destination">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>destination</code> <span class="type"><a href="#stream_class_stream_writable">Writable</a> Stream</span> Optional specific stream to unpipe</li> |
| </div></ul> |
| <p>This method will remove the hooks set up for a previous <code>pipe()</code> call. |
| |
| </p> |
| <p>If the destination is not specified, then all pipes are removed. |
| |
| </p> |
| <p>If the destination is specified, but no pipe is set up for it, then |
| this is a no-op. |
| |
| </p> |
| <pre><code class="javascript">var readable = getReadableStreamSomehow(); |
| var writable = fs.createWriteStream('file.txt'); |
| // All the data from readable goes into 'file.txt', |
| // but only for the first second |
| readable.pipe(writable); |
| setTimeout(function() { |
| console.log('stop writing to file.txt'); |
| readable.unpipe(writable); |
| console.log('manually close the file stream'); |
| writable.end(); |
| }, 1000);</code></pre> |
| <h4>readable.unshift(chunk)<span><a class="mark" href="#stream_readable_unshift_chunk" id="stream_readable_unshift_chunk">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">Buffer | String</span> Chunk of data to unshift onto the read queue</li> |
| </div></ul> |
| <p>This is useful in certain cases where a stream is being consumed by a |
| parser, which needs to "un-consume" some data that it has |
| optimistically pulled out of the source, so that the stream can be |
| passed on to some other party. |
| |
| </p> |
| <p>If you find that you must often call <code>stream.unshift(chunk)</code> in your |
| programs, consider implementing a <a href="#stream_class_stream_transform">Transform</a> stream instead. (See API |
| for Stream Implementors, below.) |
| |
| </p> |
| <pre><code class="javascript">// Pull off a header delimited by \n\n |
| // use unshift() if we get too much |
| // Call the callback with (error, header, stream) |
| var StringDecoder = require('string_decoder').StringDecoder; |
| function parseHeader(stream, callback) { |
| stream.on('error', callback); |
| stream.on('readable', onReadable); |
| var decoder = new StringDecoder('utf8'); |
| var header = ''; |
| function onReadable() { |
| var chunk; |
| while (null !== (chunk = stream.read())) { |
| var str = decoder.write(chunk); |
| if (str.match(/\n\n/)) { |
| // found the header boundary |
| var split = str.split(/\n\n/); |
| header += split.shift(); |
| var remaining = split.join('\n\n'); |
| var buf = new Buffer(remaining, 'utf8'); |
| if (buf.length) |
| stream.unshift(buf); |
| stream.removeListener('error', callback); |
| stream.removeListener('readable', onReadable); |
| // now the body of the message can be read from the stream. |
| callback(null, header, stream); |
| } else { |
| // still reading the header. |
| header += str; |
| } |
| } |
| } |
| }</code></pre> |
| <h4>readable.wrap(stream)<span><a class="mark" href="#stream_readable_wrap_stream" id="stream_readable_wrap_stream">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>stream</code> <span class="type">Stream</span> An "old style" readable stream</li> |
| </div></ul> |
| <p>Versions of Node prior to v0.10 had streams that did not implement the |
| entire Streams API as it is today. (See "Compatibility" below for |
| more information.) |
| |
| </p> |
| <p>If you are using an older Node library that emits <code>'data'</code> events and |
| has a <code>pause()</code> method that is advisory only, then you can use the |
| <code>wrap()</code> method to create a <a href="#stream_class_stream_readable">Readable</a> stream that uses the old stream |
| as its data source. |
| |
| </p> |
| <p>You will very rarely ever need to call this function, but it exists |
| as a convenience for interacting with old Node programs and libraries. |
| |
| </p> |
| <p>For example: |
| |
| </p> |
| <pre><code class="javascript">var OldReader = require('./old-api-module.js').OldReader; |
| var oreader = new OldReader; |
| var Readable = require('stream').Readable; |
| var myReader = new Readable().wrap(oreader); |
| |
| myReader.on('readable', function() { |
| myReader.read(); // etc. |
| });</code></pre> |
| <h3>Class: stream.Writable<span><a class="mark" href="#stream_class_stream_writable" id="stream_class_stream_writable">#</a></span></h3> |
| <!--type=class--> |
| |
| <p>The Writable stream interface is an abstraction for a <em>destination</em> |
| that you are writing data <em>to</em>. |
| |
| </p> |
| <p>Examples of writable streams include: |
| |
| </p> |
| <ul> |
| <li><a href="http.html#http_class_http_clientrequest">http requests, on the client</a></li> |
| <li><a href="http.html#http_class_http_serverresponse">http responses, on the server</a></li> |
| <li><a href="fs.html#fs_class_fs_writestream">fs write streams</a></li> |
| <li><a href="zlib.html">zlib streams</a></li> |
| <li><a href="crypto.html">crypto streams</a></li> |
| <li><a href="net.html#net_class_net_socket">tcp sockets</a></li> |
| <li><a href="child_process.html#child_process_child_stdin">child process stdin</a></li> |
| <li><a href="process.html#process_process_stdout">process.stdout</a>, <a href="process.html#process_process_stderr">process.stderr</a></li> |
| </ul> |
| <h4>writable.write(chunk, [encoding], [callback])<span><a class="mark" href="#stream_writable_write_chunk_encoding_callback" id="stream_writable_write_chunk_encoding_callback">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">String | Buffer</span> The data to write</li> |
| <li><code>encoding</code> <span class="type">String</span> The encoding, if <code>chunk</code> is a String</li> |
| <li><code>callback</code> <span class="type">Function</span> Callback for when this chunk of data is flushed</li> |
| <li>Returns: <span class="type">Boolean</span> True if the data was handled completely.</li> |
| </div></ul> |
| <p>This method writes some data to the underlying system, and calls the |
| supplied callback once the data has been fully handled. |
| |
| </p> |
| <p>The return value indicates if you should continue writing right now. |
| If the data had to be buffered internally, then it will return |
| <code>false</code>. Otherwise, it will return <code>true</code>. |
| |
| </p> |
| <p>This return value is strictly advisory. You MAY continue to write, |
| even if it returns <code>false</code>. However, writes will be buffered in |
| memory, so it is best not to do this excessively. Instead, wait for |
| the <code>drain</code> event before writing more data. |
| |
| </p> |
| <h4>Event: 'drain'<span><a class="mark" href="#stream_event_drain" id="stream_event_drain">#</a></span></h4> |
| <p>If a <a href="#stream_writable_write_chunk_encoding_callback"><code>writable.write(chunk)</code></a> call returns false, then the <code>drain</code> |
| event will indicate when it is appropriate to begin writing more data |
| to the stream. |
| |
| </p> |
| <pre><code class="javascript">// Write the data to the supplied writable stream 1MM times. |
| // Be attentive to back-pressure. |
| function writeOneMillionTimes(writer, data, encoding, callback) { |
| var i = 1000000; |
| write(); |
| function write() { |
| var ok = true; |
| do { |
| i -= 1; |
| if (i === 0) { |
| // last time! |
| writer.write(data, encoding, callback); |
| } else { |
| // see if we should continue, or wait |
| // don't pass the callback, because we're not done yet. |
| ok = writer.write(data, encoding); |
| } |
| } while (i > 0 && ok); |
| if (i > 0) { |
| // had to stop early! |
| // write some more once it drains |
| writer.once('drain', write); |
| } |
| } |
| }</code></pre> |
| <h4>writable.end([chunk], [encoding], [callback])<span><a class="mark" href="#stream_writable_end_chunk_encoding_callback" id="stream_writable_end_chunk_encoding_callback">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">String | Buffer</span> Optional data to write</li> |
| <li><code>encoding</code> <span class="type">String</span> The encoding, if <code>chunk</code> is a String</li> |
| <li><code>callback</code> <span class="type">Function</span> Optional callback for when the stream is finished</li> |
| </div></ul> |
| <p>Call this method when no more data will be written to the stream. If |
| supplied, the callback is attached as a listener on the <code>finish</code> event. |
| |
| </p> |
| <p>Calling <a href="#stream_writable_write_chunk_encoding_callback"><code>write()</code></a> after calling <a href="#stream_writable_end_chunk_encoding_callback"><code>end()</code></a> will raise an error. |
| |
| </p> |
| <pre><code class="javascript">// write 'hello, ' and then end with 'world!' |
| http.createServer(function (req, res) { |
| res.write('hello, '); |
| res.end('world!'); |
| // writing more now is not allowed! |
| });</code></pre> |
| <h4>Event: 'finish'<span><a class="mark" href="#stream_event_finish" id="stream_event_finish">#</a></span></h4> |
| <p>When the <a href="#stream_writable_end_chunk_encoding_callback"><code>end()</code></a> method has been called, and all data has been flushed |
| to the underlying system, this event is emitted. |
| |
| </p> |
| <pre><code class="javascript">var writer = getWritableStreamSomehow(); |
| for (var i = 0; i < 100; i ++) { |
| writer.write('hello, #' + i + '!\n'); |
| } |
| writer.end('this is the end\n'); |
| writer.on('finish', function() { |
| console.error('all writes are now complete.'); |
| });</code></pre> |
| <h4>Event: 'pipe'<span><a class="mark" href="#stream_event_pipe" id="stream_event_pipe">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>src</code> <span class="type"><a href="#stream_class_stream_readable">Readable</a> Stream</span> source stream that is piping to this writable</li> |
| </div></ul> |
| <p>This is emitted whenever the <code>pipe()</code> method is called on a readable |
| stream, adding this writable to its set of destinations. |
| |
| </p> |
| <pre><code class="javascript">var writer = getWritableStreamSomehow(); |
| var reader = getReadableStreamSomehow(); |
| writer.on('pipe', function(src) { |
| console.error('something is piping into the writer'); |
| assert.equal(src, reader); |
| }); |
| reader.pipe(writer);</code></pre> |
| <h4>Event: 'unpipe'<span><a class="mark" href="#stream_event_unpipe" id="stream_event_unpipe">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>src</code> <span class="type"><a href="#stream_class_stream_readable">Readable</a> Stream</span> The source stream that <a href="#stream_readable_unpipe_destination">unpiped</a> this writable</li> |
| </div></ul> |
| <p>This is emitted whenever the <a href="#stream_readable_unpipe_destination"><code>unpipe()</code></a> method is called on a |
| readable stream, removing this writable from its set of destinations. |
| |
| </p> |
| <pre><code class="javascript">var writer = getWritableStreamSomehow(); |
| var reader = getReadableStreamSomehow(); |
| writer.on('unpipe', function(src) { |
| console.error('something has stopped piping into the writer'); |
| assert.equal(src, reader); |
| }); |
| reader.pipe(writer); |
| reader.unpipe(writer);</code></pre> |
| <h4>Event: 'error'<span><a class="mark" href="#stream_event_error_1" id="stream_event_error_1">#</a></span></h4> |
| <p>Emitted if there was an error when writing or piping data. |
| |
| </p> |
| <h3>Class: stream.Duplex<span><a class="mark" href="#stream_class_stream_duplex" id="stream_class_stream_duplex">#</a></span></h3> |
| <p>Duplex streams are streams that implement both the <a href="#stream_class_stream_readable">Readable</a> and |
| <a href="#stream_class_stream_writable">Writable</a> interfaces. See above for usage. |
| |
| </p> |
| <p>Examples of Duplex streams include: |
| |
| </p> |
| <ul> |
| <li><a href="net.html#net_class_net_socket">tcp sockets</a></li> |
| <li><a href="zlib.html">zlib streams</a></li> |
| <li><a href="crypto.html">crypto streams</a></li> |
| </ul> |
| <h3>Class: stream.Transform<span><a class="mark" href="#stream_class_stream_transform" id="stream_class_stream_transform">#</a></span></h3> |
| <p>Transform streams are <a href="#stream_class_stream_duplex">Duplex</a> streams where the output is in some way |
| computed from the input. They implement both the <a href="#stream_class_stream_readable">Readable</a> and |
| <a href="#stream_class_stream_writable">Writable</a> interfaces. See above for usage. |
| |
| </p> |
| <p>Examples of Transform streams include: |
| |
| </p> |
| <ul> |
| <li><a href="zlib.html">zlib streams</a></li> |
| <li><a href="crypto.html">crypto streams</a></li> |
| </ul> |
| <h2>API for Stream Implementors<span><a class="mark" href="#stream_api_for_stream_implementors" id="stream_api_for_stream_implementors">#</a></span></h2> |
| <!--type=misc--> |
| |
| <p>To implement any sort of stream, the pattern is the same: |
| |
| </p> |
| <ol> |
| <li>Extend the appropriate parent class in your own subclass. (The |
| <a href="util.html#util_util_inherits_constructor_superconstructor"><code>util.inherits</code></a> method is particularly helpful for this.)</li> |
| <li>Call the appropriate parent class constructor in your constructor, |
| to be sure that the internal mechanisms are set up properly.</li> |
| <li>Implement one or more specific methods, as detailed below.</li> |
| </ol> |
| <p>The class to extend and the method(s) to implement depend on the sort |
| of stream class you are writing: |
| |
| </p> |
| <table> |
| <thead> |
| <tr> |
| <th> |
| <p>Use-case</p> |
| </th> |
| <th> |
| <p>Class</p> |
| </th> |
| <th> |
| <p>Method(s) to implement</p> |
| </th> |
| </tr> |
| </thead> |
| <tr> |
| <td> |
| <p>Reading only</p> |
| </td> |
| <td> |
| <p><a href="#stream_class_stream_readable_1">Readable</a></p> |
| </td> |
| <td> |
| <p><code><a href="#stream_readable_read_size_1">_read</a></code></p> |
| </td> |
| </tr> |
| <tr> |
| <td> |
| <p>Writing only</p> |
| </td> |
| <td> |
| <p><a href="#stream_class_stream_writable_1">Writable</a></p> |
| </td> |
| <td> |
| <p><code><a href="#stream_writable_write_chunk_encoding_callback_1">_write</a></code></p> |
| </td> |
| </tr> |
| <tr> |
| <td> |
| <p>Reading and writing</p> |
| </td> |
| <td> |
| <p><a href="#stream_class_stream_duplex_1">Duplex</a></p> |
| </td> |
| <td> |
| <p><code><a href="#stream_readable_read_size_1">_read</a></code>, <code><a href="#stream_writable_write_chunk_encoding_callback_1">_write</a></code></p> |
| </td> |
| </tr> |
| <tr> |
| <td> |
| <p>Operate on written data, then read the result</p> |
| </td> |
| <td> |
| <p><a href="#stream_class_stream_transform_1">Transform</a></p> |
| </td> |
| <td> |
| <p><code>_transform</code>, <code>_flush</code></p> |
| </td> |
| </tr> |
| </table> |
| |
| <p>In your implementation code, it is very important to never call the |
| methods described in <a href="#stream_api_for_stream_consumers">API for Stream Consumers</a> above. Otherwise, you |
| can potentially cause adverse side effects in programs that consume |
| your streaming interfaces. |
| |
| </p> |
| <h3>Class: stream.Readable<span><a class="mark" href="#stream_class_stream_readable_1" id="stream_class_stream_readable_1">#</a></span></h3> |
| <!--type=class--> |
| |
| <p><code>stream.Readable</code> is an abstract class designed to be extended with an |
| underlying implementation of the <a href="#stream_readable_read_size_1"><code>_read(size)</code></a> method. |
| |
| </p> |
| <p>Please see above under <a href="#stream_api_for_stream_consumers">API for Stream Consumers</a> for how to consume |
| streams in your programs. What follows is an explanation of how to |
| implement Readable streams in your programs. |
| |
| </p> |
| <h4>Example: A Counting Stream<span><a class="mark" href="#stream_example_a_counting_stream" id="stream_example_a_counting_stream">#</a></span></h4> |
| <!--type=example--> |
| |
| <p>This is a basic example of a Readable stream. It emits the numerals |
| from 1 to 1,000,000 in ascending order, and then ends. |
| |
| </p> |
| <pre><code class="javascript">var Readable = require('stream').Readable; |
| var util = require('util'); |
| util.inherits(Counter, Readable); |
| |
| function Counter(opt) { |
| Readable.call(this, opt); |
| this._max = 1000000; |
| this._index = 1; |
| } |
| |
| Counter.prototype._read = function() { |
| var i = this._index++; |
| if (i > this._max) |
| this.push(null); |
| else { |
| var str = '' + i; |
| var buf = new Buffer(str, 'ascii'); |
| this.push(buf); |
| } |
| };</code></pre> |
| <h4>Example: SimpleProtocol v1 (Sub-optimal)<span><a class="mark" href="#stream_example_simpleprotocol_v1_sub_optimal" id="stream_example_simpleprotocol_v1_sub_optimal">#</a></span></h4> |
| <p>This is similar to the <code>parseHeader</code> function described above, but |
| implemented as a custom stream. Also, note that this implementation |
| does not convert the incoming data to a string. |
| |
| </p> |
| <p>However, this would be better implemented as a <a href="#stream_class_stream_transform">Transform</a> stream. See |
| below for a better implementation. |
| |
| </p> |
| <pre><code class="javascript">// A parser for a simple data protocol. |
| // The "header" is a JSON object, followed by 2 \n characters, and |
| // then a message body. |
| // |
| // NOTE: This can be done more simply as a Transform stream! |
| // Using Readable directly for this is sub-optimal. See the |
| // alternative example below under the Transform section. |
| |
| var Readable = require('stream').Readable; |
| var util = require('util'); |
| |
| util.inherits(SimpleProtocol, Readable); |
| |
| function SimpleProtocol(source, options) { |
| if (!(this instanceof SimpleProtocol)) |
| return new SimpleProtocol(options); |
| |
| Readable.call(this, options); |
| this._inBody = false; |
| this._sawFirstCr = false; |
| |
| // source is a readable stream, such as a socket or file |
| this._source = source; |
| |
| var self = this; |
| source.on('end', function() { |
| self.push(null); |
| }); |
| |
| // give it a kick whenever the source is readable |
| // read(0) will not consume any bytes |
| source.on('readable', function() { |
| self.read(0); |
| }); |
| |
| this._rawHeader = []; |
| this.header = null; |
| } |
| |
| SimpleProtocol.prototype._read = function(n) { |
| if (!this._inBody) { |
| var chunk = this._source.read(); |
| |
| // if the source doesn't have data, we don't have data yet. |
| if (chunk === null) |
| return this.push(''); |
| |
| // check if the chunk has a \n\n |
| var split = -1; |
| for (var i = 0; i < chunk.length; i++) { |
| if (chunk[i] === 10) { // '\n' |
| if (this._sawFirstCr) { |
| split = i; |
| break; |
| } else { |
| this._sawFirstCr = true; |
| } |
| } else { |
| this._sawFirstCr = false; |
| } |
| } |
| |
| if (split === -1) { |
| // still waiting for the \n\n |
| // stash the chunk, and try again. |
| this._rawHeader.push(chunk); |
| this.push(''); |
| } else { |
| this._inBody = true; |
| var h = chunk.slice(0, split); |
| this._rawHeader.push(h); |
| var header = Buffer.concat(this._rawHeader).toString(); |
| try { |
| this.header = JSON.parse(header); |
| } catch (er) { |
| this.emit('error', new Error('invalid simple protocol data')); |
| return; |
| } |
| // now, because we got some extra data, unshift the rest |
| // back into the read queue so that our consumer will see it. |
| var b = chunk.slice(split); |
| this.unshift(b); |
| |
| // and let them know that we are done parsing the header. |
| this.emit('header', this.header); |
| } |
| } else { |
| // from there on, just provide the data to our consumer. |
| // careful not to push(null), since that would indicate EOF. |
| var chunk = this._source.read(); |
| if (chunk) this.push(chunk); |
| } |
| }; |
| |
| // Usage: |
| // var parser = new SimpleProtocol(source); |
| // Now parser is a readable stream that will emit 'header' |
| // with the parsed header data.</code></pre> |
| <h4>new stream.Readable([options])<span><a class="mark" href="#stream_new_stream_readable_options" id="stream_new_stream_readable_options">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>options</code> <span class="type">Object</span><ul> |
| <li><code>highWaterMark</code> <span class="type">Number</span> The maximum number of bytes to store in |
| the internal buffer before ceasing to read from the underlying |
| resource. Default=16kb</li> |
| <li><code>encoding</code> <span class="type">String</span> If specified, then buffers will be decoded to |
| strings using the specified encoding. Default=null</li> |
| <li><code>objectMode</code> <span class="type">Boolean</span> Whether this stream should behave |
| as a stream of objects. Meaning that stream.read(n) returns |
| a single value instead of a Buffer of size n. Default=false</li> |
| </ul> |
| </li> |
| </div></ul> |
| <p>In classes that extend the Readable class, make sure to call the |
| Readable constructor so that the buffering settings can be properly |
| initialized. |
| |
| </p> |
| <h4>readable._read(size)<span><a class="mark" href="#stream_readable_read_size_1" id="stream_readable_read_size_1">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>size</code> <span class="type">Number</span> Number of bytes to read asynchronously</li> |
| </div></ul> |
| <p>Note: <strong>Implement this function, but do NOT call it directly.</strong> |
| |
| </p> |
| <p>This function should NOT be called directly. It should be implemented |
| by child classes, and only called by the internal Readable class |
| methods. |
| |
| </p> |
| <p>All Readable stream implementations must provide a <code>_read</code> method to |
| fetch data from the underlying resource. |
| |
| </p> |
| <p>This method is prefixed with an underscore because it is internal to |
| the class that defines it, and should not be called directly by user |
| programs. However, you <strong>are</strong> expected to override this method in |
| your own extension classes. |
| |
| </p> |
| <p>When data is available, put it into the read queue by calling |
| <code>readable.push(chunk)</code>. If <code>push</code> returns false, then you should stop |
| reading. When <code>_read</code> is called again, you should start pushing more |
| data. |
| |
| </p> |
| <p>The <code>size</code> argument is advisory. Implementations where a "read" is a |
| single call that returns data can use this to know how much data to |
| fetch. Implementations where that is not relevant, such as TCP or |
| TLS, may ignore this argument, and simply provide data whenever it |
| becomes available. There is no need, for example to "wait" until |
| <code>size</code> bytes are available before calling <a href="#stream_readable_push_chunk_encoding"><code>stream.push(chunk)</code></a>. |
| |
| </p> |
| <h4>readable.push(chunk, [encoding])<span><a class="mark" href="#stream_readable_push_chunk_encoding" id="stream_readable_push_chunk_encoding">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">Buffer | null | String</span> Chunk of data to push into the read queue</li> |
| <li><code>encoding</code> <span class="type">String</span> Encoding of String chunks. Must be a valid |
| Buffer encoding, such as <code>'utf8'</code> or <code>'ascii'</code></li> |
| <li>return <span class="type">Boolean</span> Whether or not more pushes should be performed</li> |
| </div></ul> |
| <p>Note: <strong>This function should be called by Readable implementors, NOT |
| by consumers of Readable streams.</strong> |
| |
| </p> |
| <p>The <code>_read()</code> function will not be called again until at least one |
| <code>push(chunk)</code> call is made. |
| |
| </p> |
| <p>The <code>Readable</code> class works by putting data into a read queue to be |
| pulled out later by calling the <code>read()</code> method when the <code>'readable'</code> |
| event fires. |
| |
| </p> |
| <p>The <code>push()</code> method will explicitly insert some data into the read |
| queue. If it is called with <code>null</code> then it will signal the end of the |
| data (EOF). |
| |
| </p> |
| <p>This API is designed to be as flexible as possible. For example, |
| you may be wrapping a lower-level source which has some sort of |
| pause/resume mechanism, and a data callback. In those cases, you |
| could wrap the low-level source object by doing something like this: |
| |
| </p> |
| <pre><code class="javascript">// source is an object with readStop() and readStart() methods, |
| // and an `ondata` member that gets called when it has data, and |
| // an `onend` member that gets called when the data is over. |
| |
| util.inherits(SourceWrapper, Readable); |
| |
| function SourceWrapper(options) { |
| Readable.call(this, options); |
| |
| this._source = getLowlevelSourceObject(); |
| var self = this; |
| |
| // Every time there's data, we push it into the internal buffer. |
| this._source.ondata = function(chunk) { |
| // if push() returns false, then we need to stop reading from source |
| if (!self.push(chunk)) |
| self._source.readStop(); |
| }; |
| |
| // When the source ends, we push the EOF-signalling `null` chunk |
| this._source.onend = function() { |
| self.push(null); |
| }; |
| } |
| |
| // _read will be called when the stream wants to pull more data in |
| // the advisory size argument is ignored in this case. |
| SourceWrapper.prototype._read = function(size) { |
| this._source.readStart(); |
| };</code></pre> |
| <h3>Class: stream.Writable<span><a class="mark" href="#stream_class_stream_writable_1" id="stream_class_stream_writable_1">#</a></span></h3> |
| <!--type=class--> |
| |
| <p><code>stream.Writable</code> is an abstract class designed to be extended with an |
| underlying implementation of the <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write(chunk, encoding, callback)</code></a> method. |
| |
| </p> |
| <p>Please see above under <a href="#stream_api_for_stream_consumers">API for Stream Consumers</a> for how to consume |
| writable streams in your programs. What follows is an explanation of |
| how to implement Writable streams in your programs. |
| |
| </p> |
| <h4>new stream.Writable([options])<span><a class="mark" href="#stream_new_stream_writable_options" id="stream_new_stream_writable_options">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>options</code> <span class="type">Object</span><ul> |
| <li><code>highWaterMark</code> <span class="type">Number</span> Buffer level when <a href="#stream_writable_write_chunk_encoding_callback"><code>write()</code></a> starts |
| returning false. Default=16kb</li> |
| <li><code>decodeStrings</code> <span class="type">Boolean</span> Whether or not to decode strings into |
| Buffers before passing them to <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write()</code></a>. Default=true</li> |
| </ul> |
| </li> |
| </div></ul> |
| <p>In classes that extend the Writable class, make sure to call the |
| constructor so that the buffering settings can be properly |
| initialized. |
| |
| </p> |
| <h4>writable._write(chunk, encoding, callback)<span><a class="mark" href="#stream_writable_write_chunk_encoding_callback_1" id="stream_writable_write_chunk_encoding_callback_1">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">Buffer | String</span> The chunk to be written. Will always |
| be a buffer unless the <code>decodeStrings</code> option was set to <code>false</code>.</li> |
| <li><code>encoding</code> <span class="type">String</span> If the chunk is a string, then this is the |
| encoding type. Ignore chunk is a buffer. Note that chunk will |
| <strong>always</strong> be a buffer unless the <code>decodeStrings</code> option is |
| explicitly set to <code>false</code>.</li> |
| <li><code>callback</code> <span class="type">Function</span> Call this function (optionally with an error |
| argument) when you are done processing the supplied chunk.</li> |
| </div></ul> |
| <p>All Writable stream implementations must provide a <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write()</code></a> |
| method to send data to the underlying resource. |
| |
| </p> |
| <p>Note: <strong>This function MUST NOT be called directly.</strong> It should be |
| implemented by child classes, and called by the internal Writable |
| class methods only. |
| |
| </p> |
| <p>Call the callback using the standard <code>callback(error)</code> pattern to |
| signal that the write completed successfully or with an error. |
| |
| </p> |
| <p>If the <code>decodeStrings</code> flag is set in the constructor options, then |
| <code>chunk</code> may be a string rather than a Buffer, and <code>encoding</code> will |
| indicate the sort of string that it is. This is to support |
| implementations that have an optimized handling for certain string |
| data encodings. If you do not explicitly set the <code>decodeStrings</code> |
| option to <code>false</code>, then you can safely ignore the <code>encoding</code> argument, |
| and assume that <code>chunk</code> will always be a Buffer. |
| |
| </p> |
| <p>This method is prefixed with an underscore because it is internal to |
| the class that defines it, and should not be called directly by user |
| programs. However, you <strong>are</strong> expected to override this method in |
| your own extension classes. |
| |
| |
| </p> |
| <h3>Class: stream.Duplex<span><a class="mark" href="#stream_class_stream_duplex_1" id="stream_class_stream_duplex_1">#</a></span></h3> |
| <!--type=class--> |
| |
| <p>A "duplex" stream is one that is both Readable and Writable, such as a |
| TCP socket connection. |
| |
| </p> |
| <p>Note that <code>stream.Duplex</code> is an abstract class designed to be extended |
| with an underlying implementation of the <code>_read(size)</code> and |
| <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write(chunk, encoding, callback)</code></a> methods as you would with a |
| Readable or Writable stream class. |
| |
| </p> |
| <p>Since JavaScript doesn't have multiple prototypal inheritance, this |
| class prototypally inherits from Readable, and then parasitically from |
| Writable. It is thus up to the user to implement both the lowlevel |
| <code>_read(n)</code> method as well as the lowlevel |
| <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write(chunk, encoding, callback)</code></a> method on extension duplex classes. |
| |
| </p> |
| <h4>new stream.Duplex(options)<span><a class="mark" href="#stream_new_stream_duplex_options" id="stream_new_stream_duplex_options">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>options</code> <span class="type">Object</span> Passed to both Writable and Readable |
| constructors. Also has the following fields:<ul> |
| <li><code>allowHalfOpen</code> <span class="type">Boolean</span> Default=true. If set to <code>false</code>, then |
| the stream will automatically end the readable side when the |
| writable side ends and vice versa.</li> |
| </ul> |
| </li> |
| </div></ul> |
| <p>In classes that extend the Duplex class, make sure to call the |
| constructor so that the buffering settings can be properly |
| initialized. |
| |
| |
| </p> |
| <h3>Class: stream.Transform<span><a class="mark" href="#stream_class_stream_transform_1" id="stream_class_stream_transform_1">#</a></span></h3> |
| <p>A "transform" stream is a duplex stream where the output is causally |
| connected in some way to the input, such as a <a href="zlib.html">zlib</a> stream or a |
| <a href="crypto.html">crypto</a> stream. |
| |
| </p> |
| <p>There is no requirement that the output be the same size as the input, |
| the same number of chunks, or arrive at the same time. For example, a |
| Hash stream will only ever have a single chunk of output which is |
| provided when the input is ended. A zlib stream will produce output |
| that is either much smaller or much larger than its input. |
| |
| </p> |
| <p>Rather than implement the <a href="#stream_readable_read_size_1"><code>_read()</code></a> and <a href="#stream_writable_write_chunk_encoding_callback_1"><code>_write()</code></a> methods, Transform |
| classes must implement the <code>_transform()</code> method, and may optionally |
| also implement the <code>_flush()</code> method. (See below.) |
| |
| </p> |
| <h4>new stream.Transform([options])<span><a class="mark" href="#stream_new_stream_transform_options" id="stream_new_stream_transform_options">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>options</code> <span class="type">Object</span> Passed to both Writable and Readable |
| constructors.</li> |
| </div></ul> |
| <p>In classes that extend the Transform class, make sure to call the |
| constructor so that the buffering settings can be properly |
| initialized. |
| |
| </p> |
| <h4>transform._transform(chunk, encoding, callback)<span><a class="mark" href="#stream_transform_transform_chunk_encoding_callback" id="stream_transform_transform_chunk_encoding_callback">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>chunk</code> <span class="type">Buffer | String</span> The chunk to be transformed. Will always |
| be a buffer unless the <code>decodeStrings</code> option was set to <code>false</code>.</li> |
| <li><code>encoding</code> <span class="type">String</span> If the chunk is a string, then this is the |
| encoding type. (Ignore if <code>decodeStrings</code> chunk is a buffer.)</li> |
| <li><code>callback</code> <span class="type">Function</span> Call this function (optionally with an error |
| argument) when you are done processing the supplied chunk.</li> |
| </div></ul> |
| <p>Note: <strong>This function MUST NOT be called directly.</strong> It should be |
| implemented by child classes, and called by the internal Transform |
| class methods only. |
| |
| </p> |
| <p>All Transform stream implementations must provide a <code>_transform</code> |
| method to accept input and produce output. |
| |
| </p> |
| <p><code>_transform</code> should do whatever has to be done in this specific |
| Transform class, to handle the bytes being written, and pass them off |
| to the readable portion of the interface. Do asynchronous I/O, |
| process things, and so on. |
| |
| </p> |
| <p>Call <code>transform.push(outputChunk)</code> 0 or more times to generate output |
| from this input chunk, depending on how much data you want to output |
| as a result of this chunk. |
| |
| </p> |
| <p>Call the callback function only when the current chunk is completely |
| consumed. Note that there may or may not be output as a result of any |
| particular input chunk. |
| |
| </p> |
| <p>This method is prefixed with an underscore because it is internal to |
| the class that defines it, and should not be called directly by user |
| programs. However, you <strong>are</strong> expected to override this method in |
| your own extension classes. |
| |
| </p> |
| <h4>transform._flush(callback)<span><a class="mark" href="#stream_transform_flush_callback" id="stream_transform_flush_callback">#</a></span></h4> |
| <div class="signature"><ul> |
| <li><code>callback</code> <span class="type">Function</span> Call this function (optionally with an error |
| argument) when you are done flushing any remaining data.</li> |
| </div></ul> |
| <p>Note: <strong>This function MUST NOT be called directly.</strong> It MAY be implemented |
| by child classes, and if so, will be called by the internal Transform |
| class methods only. |
| |
| </p> |
| <p>In some cases, your transform operation may need to emit a bit more |
| data at the end of the stream. For example, a <code>Zlib</code> compression |
| stream will store up some internal state so that it can optimally |
| compress the output. At the end, however, it needs to do the best it |
| can with what is left, so that the data will be complete. |
| |
| </p> |
| <p>In those cases, you can implement a <code>_flush</code> method, which will be |
| called at the very end, after all the written data is consumed, but |
| before emitting <code>end</code> to signal the end of the readable side. Just |
| like with <code>_transform</code>, call <code>transform.push(chunk)</code> zero or more |
| times, as appropriate, and call <code>callback</code> when the flush operation is |
| complete. |
| |
| </p> |
| <p>This method is prefixed with an underscore because it is internal to |
| the class that defines it, and should not be called directly by user |
| programs. However, you <strong>are</strong> expected to override this method in |
| your own extension classes. |
| |
| </p> |
| <h4>Example: <code>SimpleProtocol</code> parser v2<span><a class="mark" href="#stream_example_simpleprotocol_parser_v2" id="stream_example_simpleprotocol_parser_v2">#</a></span></h4> |
| <p>The example above of a simple protocol parser can be implemented |
| simply by using the higher level <a href="#stream_class_stream_transform">Transform</a> stream class, similar to |
| the <code>parseHeader</code> and <code>SimpleProtocol v1</code> examples above. |
| |
| </p> |
| <p>In this example, rather than providing the input as an argument, it |
| would be piped into the parser, which is a more idiomatic Node stream |
| approach. |
| |
| </p> |
| <pre><code class="javascript">var util = require('util'); |
| var Transform = require('stream').Transform; |
| util.inherits(SimpleProtocol, Transform); |
| |
| function SimpleProtocol(options) { |
| if (!(this instanceof SimpleProtocol)) |
| return new SimpleProtocol(options); |
| |
| Transform.call(this, options); |
| this._inBody = false; |
| this._sawFirstCr = false; |
| this._rawHeader = []; |
| this.header = null; |
| } |
| |
| SimpleProtocol.prototype._transform = function(chunk, encoding, done) { |
| if (!this._inBody) { |
| // check if the chunk has a \n\n |
| var split = -1; |
| for (var i = 0; i < chunk.length; i++) { |
| if (chunk[i] === 10) { // '\n' |
| if (this._sawFirstCr) { |
| split = i; |
| break; |
| } else { |
| this._sawFirstCr = true; |
| } |
| } else { |
| this._sawFirstCr = false; |
| } |
| } |
| |
| if (split === -1) { |
| // still waiting for the \n\n |
| // stash the chunk, and try again. |
| this._rawHeader.push(chunk); |
| } else { |
| this._inBody = true; |
| var h = chunk.slice(0, split); |
| this._rawHeader.push(h); |
| var header = Buffer.concat(this._rawHeader).toString(); |
| try { |
| this.header = JSON.parse(header); |
| } catch (er) { |
| this.emit('error', new Error('invalid simple protocol data')); |
| return; |
| } |
| // and let them know that we are done parsing the header. |
| this.emit('header', this.header); |
| |
| // now, because we got some extra data, emit this first. |
| this.push(chunk.slice(split)); |
| } |
| } else { |
| // from there on, just provide the data to our consumer as-is. |
| this.push(chunk); |
| } |
| done(); |
| }; |
| |
| // Usage: |
| // var parser = new SimpleProtocol(); |
| // source.pipe(parser) |
| // Now parser is a readable stream that will emit 'header' |
| // with the parsed header data.</code></pre> |
| <h3>Class: stream.PassThrough<span><a class="mark" href="#stream_class_stream_passthrough" id="stream_class_stream_passthrough">#</a></span></h3> |
| <p>This is a trivial implementation of a <a href="#stream_class_stream_transform">Transform</a> stream that simply |
| passes the input bytes across to the output. Its purpose is mainly |
| for examples and testing, but there are occasionally use cases where |
| it can come in handy as a building block for novel sorts of streams. |
| |
| |
| </p> |
| <h2>Streams: Under the Hood<span><a class="mark" href="#stream_streams_under_the_hood" id="stream_streams_under_the_hood">#</a></span></h2> |
| <!--type=misc--> |
| |
| <h3>Buffering<span><a class="mark" href="#stream_buffering" id="stream_buffering">#</a></span></h3> |
| <!--type=misc--> |
| |
| <p>Both Writable and Readable streams will buffer data on an internal |
| object called <code>_writableState.buffer</code> or <code>_readableState.buffer</code>, |
| respectively. |
| |
| </p> |
| <p>The amount of data that will potentially be buffered depends on the |
| <code>highWaterMark</code> option which is passed into the constructor. |
| |
| </p> |
| <p>Buffering in Readable streams happens when the implementation calls |
| <a href="#stream_readable_push_chunk_encoding"><code>stream.push(chunk)</code></a>. If the consumer of the Stream does not call |
| <code>stream.read()</code>, then the data will sit in the internal queue until it |
| is consumed. |
| |
| </p> |
| <p>Buffering in Writable streams happens when the user calls |
| <a href="#stream_writable_write_chunk_encoding_callback"><code>stream.write(chunk)</code></a> repeatedly, even when <code>write()</code> returns <code>false</code>. |
| |
| </p> |
| <p>The purpose of streams, especially with the <code>pipe()</code> method, is to |
| limit the buffering of data to acceptable levels, so that sources and |
| destinations of varying speed will not overwhelm the available memory. |
| |
| </p> |
| <h3><code>stream.read(0)</code><span><a class="mark" href="#stream_stream_read_0" id="stream_stream_read_0">#</a></span></h3> |
| <p>There are some cases where you want to trigger a refresh of the |
| underlying readable stream mechanisms, without actually consuming any |
| data. In that case, you can call <code>stream.read(0)</code>, which will always |
| return null. |
| |
| </p> |
| <p>If the internal read buffer is below the <code>highWaterMark</code>, and the |
| stream is not currently reading, then calling <code>read(0)</code> will trigger |
| a low-level <code>_read</code> call. |
| |
| </p> |
| <p>There is almost never a need to do this. However, you will see some |
| cases in Node's internals where this is done, particularly in the |
| Readable stream class internals. |
| |
| </p> |
| <h3><code>stream.push('')</code><span><a class="mark" href="#stream_stream_push" id="stream_stream_push">#</a></span></h3> |
| <p>Pushing a zero-byte string or Buffer (when not in <a href="#stream_object_mode">Object mode</a>) has an |
| interesting side effect. Because it <em>is</em> a call to |
| <a href="#stream_readable_push_chunk_encoding"><code>stream.push()</code></a>, it will end the <code>reading</code> process. However, it |
| does <em>not</em> add any data to the readable buffer, so there's nothing for |
| a user to consume. |
| |
| </p> |
| <p>Very rarely, there are cases where you have no data to provide now, |
| but the consumer of your stream (or, perhaps, another bit of your own |
| code) will know when to check again, by calling <code>stream.read(0)</code>. In |
| those cases, you <em>may</em> call <code>stream.push('')</code>. |
| |
| </p> |
| <p>So far, the only use case for this functionality is in the |
| <a href="tls.html#tls_class_cryptostream">tls.CryptoStream</a> class, which is deprecated in Node v0.12. If you |
| find that you have to use <code>stream.push('')</code>, please consider another |
| approach, because it almost certainly indicates that something is |
| horribly wrong. |
| |
| </p> |
| <h3>Compatibility with Older Node Versions<span><a class="mark" href="#stream_compatibility_with_older_node_versions" id="stream_compatibility_with_older_node_versions">#</a></span></h3> |
| <!--type=misc--> |
| |
| <p>In versions of Node prior to v0.10, the Readable stream interface was |
| simpler, but also less powerful and less useful. |
| |
| </p> |
| <ul> |
| <li>Rather than waiting for you to call the <code>read()</code> method, <code>'data'</code> |
| events would start emitting immediately. If you needed to do some |
| I/O to decide how to handle data, then you had to store the chunks |
| in some kind of buffer so that they would not be lost.</li> |
| <li>The <code>pause()</code> method was advisory, rather than guaranteed. This |
| meant that you still had to be prepared to receive <code>'data'</code> events |
| even when the stream was in a paused state.</li> |
| </ul> |
| <p>In Node v0.10, the Readable class described below was added. For |
| backwards compatibility with older Node programs, Readable streams |
| switch into "flowing mode" when a <code>'data'</code> event handler is added, or |
| when the <code>pause()</code> or <code>resume()</code> methods are called. The effect is |
| that, even if you are not using the new <code>read()</code> method and |
| <code>'readable'</code> event, you no longer have to worry about losing <code>'data'</code> |
| chunks. |
| |
| </p> |
| <p>Most programs will continue to function normally. However, this |
| introduces an edge case in the following conditions: |
| |
| </p> |
| <ul> |
| <li>No <code>'data'</code> event handler is added.</li> |
| <li>The <code>pause()</code> and <code>resume()</code> methods are never called.</li> |
| </ul> |
| <p>For example, consider the following code: |
| |
| </p> |
| <pre><code class="javascript">// WARNING! BROKEN! |
| net.createServer(function(socket) { |
| |
| // we add an 'end' method, but never consume the data |
| socket.on('end', function() { |
| // It will never get here. |
| socket.end('I got your message (but didnt read it)\n'); |
| }); |
| |
| }).listen(1337);</code></pre> |
| <p>In versions of node prior to v0.10, the incoming message data would be |
| simply discarded. However, in Node v0.10 and beyond, the socket will |
| remain paused forever. |
| |
| </p> |
| <p>The workaround in this situation is to call the <code>resume()</code> method to |
| trigger "old mode" behavior: |
| |
| </p> |
| <pre><code class="javascript">// Workaround |
| net.createServer(function(socket) { |
| |
| socket.on('end', function() { |
| socket.end('I got your message (but didnt read it)\n'); |
| }); |
| |
| // start the flow of data, discarding it. |
| socket.resume(); |
| |
| }).listen(1337);</code></pre> |
| <p>In addition to new Readable streams switching into flowing-mode, pre-v0.10 |
| style streams can be wrapped in a Readable class using the <code>wrap()</code> |
| method. |
| |
| |
| </p> |
| <h3>Object Mode<span><a class="mark" href="#stream_object_mode" id="stream_object_mode">#</a></span></h3> |
| <!--type=misc--> |
| |
| <p>Normally, Streams operate on Strings and Buffers exclusively. |
| |
| </p> |
| <p>Streams that are in <strong>object mode</strong> can emit generic JavaScript values |
| other than Buffers and Strings. |
| |
| </p> |
| <p>A Readable stream in object mode will always return a single item from |
| a call to <code>stream.read(size)</code>, regardless of what the size argument |
| is. |
| |
| </p> |
| <p>A Writable stream in object mode will always ignore the <code>encoding</code> |
| argument to <code>stream.write(data, encoding)</code>. |
| |
| </p> |
| <p>The special value <code>null</code> still retains its special value for object |
| mode streams. That is, for object mode readable streams, <code>null</code> as a |
| return value from <code>stream.read()</code> indicates that there is no more |
| data, and <a href="#stream_readable_push_chunk_encoding"><code>stream.push(null)</code></a> will signal the end of stream data |
| (<code>EOF</code>). |
| |
| </p> |
| <p>No streams in Node core are object mode streams. This pattern is only |
| used by userland streaming libraries. |
| |
| </p> |
| <p>You should set <code>objectMode</code> in your stream child class constructor on |
| the options object. Setting <code>objectMode</code> mid-stream is not safe. |
| |
| </p> |
| <h3>State Objects<span><a class="mark" href="#stream_state_objects" id="stream_state_objects">#</a></span></h3> |
| <p><a href="#stream_class_stream_readable">Readable</a> streams have a member object called <code>_readableState</code>. |
| <a href="#stream_class_stream_writable">Writable</a> streams have a member object called <code>_writableState</code>. |
| <a href="#stream_class_stream_duplex">Duplex</a> streams have both. |
| |
| </p> |
| <p><strong>These objects should generally not be modified in child classes.</strong> |
| However, if you have a Duplex or Transform stream that should be in |
| <code>objectMode</code> on the readable side, and not in <code>objectMode</code> on the |
| writable side, then you may do this in the constructor by setting the |
| flag explicitly on the appropriate state object. |
| |
| </p> |
| <pre><code class="javascript">var util = require('util'); |
| var StringDecoder = require('string_decoder').StringDecoder; |
| var Transform = require('stream').Transform; |
| util.inherits(JSONParseStream, Transform); |
| |
| // Gets \n-delimited JSON string data, and emits the parsed objects |
| function JSONParseStream(options) { |
| if (!(this instanceof JSONParseStream)) |
| return new JSONParseStream(options); |
| |
| Transform.call(this, options); |
| this._writableState.objectMode = false; |
| this._readableState.objectMode = true; |
| this._buffer = ''; |
| this._decoder = new StringDecoder('utf8'); |
| } |
| |
| JSONParseStream.prototype._transform = function(chunk, encoding, cb) { |
| this._buffer += this._decoder.write(chunk); |
| // split on newlines |
| var lines = this._buffer.split(/\r?\n/); |
| // keep the last partial line buffered |
| this._buffer = lines.pop(); |
| for (var l = 0; l < lines.length; l++) { |
| var line = lines[l]; |
| try { |
| var obj = JSON.parse(line); |
| } catch (er) { |
| this.emit('error', er); |
| return; |
| } |
| // push the parsed object out to the readable consumer |
| this.push(obj); |
| } |
| cb(); |
| }; |
| |
| JSONParseStream.prototype._flush = function(cb) { |
| // Just handle any leftover |
| var rem = this._buffer.trim(); |
| if (rem) { |
| try { |
| var obj = JSON.parse(rem); |
| } catch (er) { |
| this.emit('error', er); |
| return; |
| } |
| // push the parsed object out to the readable consumer |
| this.push(obj); |
| } |
| cb(); |
| };</code></pre> |
| <p>The state objects contain other useful information for debugging the |
| state of streams in your programs. It is safe to look at them, but |
| beyond setting option flags in the constructor, it is <strong>not</strong> safe to |
| modify them. |
| |
| |
| </p> |
| |
| </div> |
| </div> |
| </div> |
| <div id="footer"> |
| <a href="http://joyent.com" class="joyent-logo">Joyent</a> |
| <ul class="clearfix"> |
| <li><a href="/">Node.js</a></li> |
| <li><a href="/download/">Download</a></li> |
| <li><a href="/about/">About</a></li> |
| <li><a href="http://npmjs.org/">npm Registry</a></li> |
| <li><a href="http://nodejs.org/api/">Docs</a></li> |
| <li><a href="http://blog.nodejs.org">Blog</a></li> |
| <li><a href="/community/">Community</a></li> |
| <li><a href="/logos/">Logos</a></li> |
| <li><a href="http://jobs.nodejs.org/">Jobs</a></li> |
| <li><a href="http://twitter.com/nodejs" class="twitter">@nodejs</a></li> |
| </ul> |
| |
| <p>Copyright <a href="http://joyent.com/">Joyent, Inc</a>, Node.js is a <a href="/trademark-policy.pdf">trademark</a> of Joyent, Inc. View <a href="https://raw.github.com/joyent/node/v0.10.24/LICENSE">license</a>.</p> |
| </div> |
| |
| <script src="../sh_main.js"></script> |
| <script src="../sh_javascript.min.js"></script> |
| <script>highlight(undefined, undefined, 'pre');</script> |
| <script> |
| window._gaq = [['_setAccount', 'UA-10874194-2'], ['_trackPageview']]; |
| (function(d, t) { |
| var g = d.createElement(t), |
| s = d.getElementsByTagName(t)[0]; |
| g.src = '//www.google-analytics.com/ga.js'; |
| s.parentNode.insertBefore(g, s); |
| }(document, 'script')); |
| </script> |
| </body> |
| </html> |
| |