Streams in NodeJS are a way to move data from a source to a destination piece by piece (in chunks) instead of all at once, which helps avoid out-of-memory errors.
To understand what exactly a stream does, let us consider a scenario.
We have two buckets named source and destination. The source bucket is full of water, while the destination bucket is empty. We cannot move these two buckets from their spots, but we have a third, movable bucket called buffer. We can use this buffer to transfer water from source to destination.
In the simplest approach, we pour all the water from source into the buffer bucket, carry the buffer to the destination, and then pour everything from buffer into destination. However, there is an alternative if we have a hose pipe called stream. We can connect source and destination with this stream and transfer the water in small amounts. This is what streams let us do in NodeJS.
Consider a scenario where we have a GET API on our server and an MP4 video file. This server does not use streams, so when the API is called, it reads the entire MP4 file into memory and sends it as the response. If there are 5 parallel API calls, the server reads the file 5 times, and the memory consumption is about 5x the size of the MP4 file.
Now consider server 2, which has the same GET API but is implemented using streams. When the API call is made, the file is read and sent to the client in chunks, so only a small part of it is in memory at any moment. Therefore, memory consumption grows far less with parallel calls than on the server that does not use streams.
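The difference between the two servers can be sketched roughly as follows. This is a minimal sketch, not a production server: the path ./video.mp4, the handler names sendWholeFile and sendAsStream, and the port are all assumptions made for illustration. The streaming handler uses pipe(), which is covered in the Piping Streams section.

```javascript
const fs = require('fs');
const http = require('http');

// Hypothetical path, used only for illustration.
const VIDEO_PATH = './video.mp4';

// Server 1: loads the whole file into memory for every request.
function sendWholeFile(req, res) {
  fs.readFile(VIDEO_PATH, (err, data) => {
    if (err) {
      res.writeHead(500);
      res.end('Could not read the file');
      return;
    }
    res.writeHead(200, { 'Content-Type': 'video/mp4' });
    res.end(data); // the entire file is held in memory at this point
  });
}

// Server 2: sends the file chunk by chunk.
function sendAsStream(req, res) {
  const readStream = fs.createReadStream(VIDEO_PATH);
  readStream.on('open', () => {
    res.writeHead(200, { 'Content-Type': 'video/mp4' });
    readStream.pipe(res); // only one chunk at a time is held in memory
  });
  readStream.on('error', () => {
    if (res.headersSent) {
      res.destroy(); // already streaming: abort the response
    } else {
      res.writeHead(500);
      res.end('Could not read the file');
    }
  });
}

// Swap in sendWholeFile to compare memory usage under parallel requests.
const server = http.createServer(sendAsStream);
// server.listen(3000); // uncomment to start the server (port is arbitrary)
```

Watching the process memory while several clients request the file at once should show the difference between the two handlers.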
Now that we have a fair understanding of what streams do and why they are important, let us look at the different types of streams and the concepts related to them.
Readable Streams
A readable stream reads data from a source and feeds it into a pipeline chunk by chunk. For instance, let’s create a read stream that reads from an array and passes its items along one chunk at a time.
Streams are event emitters, which means they raise events that we can listen for.
On a read stream, we want to listen to ‘data’ events. Whenever a ‘data’ event is raised, a small chunk of data is passed to the callback function.
Another important read stream event is ‘end’, which is raised when the stream has no more data to read.
If an error occurs while the data is being read or handled, an ‘error’ event is emitted with an error object.
Streams can deliver data in the following forms:
Binary: Chunks are delivered as binary data (Buffer objects). This is the default.
String: Chunks are delivered as strings. To enable this, we pass encoding: ‘utf-8’ in the options.
Object: Chunks are delivered as objects. To enable this, we pass objectMode: true in the options.
This code snippet shows streams in two of these modes. To implement StreamFromArray, we created a class that extends stream.Readable. That part can be skipped, since the main idea is to show how a readable stream is used. Most of the time we don’t need custom implementations like this and can use the stream types that already exist.
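A StreamFromArray class along these lines could look like the sketch below. The sample array, the shape of the pushed objects, and the log messages are assumptions made for illustration; only the class name and the two options come from the text above.

```javascript
const { Readable } = require('stream');

// Custom readable stream that emits the items of an array one at a time.
class StreamFromArray extends Readable {
  constructor(array, options) {
    super(options);
    this.array = array;
    this.index = 0;
  }

  // Called by the stream machinery whenever it wants more data.
  _read() {
    if (this.index < this.array.length) {
      const item = this.array[this.index];
      // In object mode push a small object, otherwise push the raw string.
      this.push(this.readableObjectMode ? { data: item, index: this.index } : item);
      this.index += 1;
    } else {
      this.push(null); // pushing null signals the end of the stream
    }
  }
}

const peaks = ['Everest', 'K2', 'Lhotse']; // sample data

// String mode: chunks arrive as strings.
const stringStream = new StreamFromArray(peaks, { encoding: 'utf-8' });
stringStream.on('data', (chunk) => console.log('string chunk:', chunk));
stringStream.on('end', () => console.log('string stream done'));
stringStream.on('error', (err) => console.error(err));

// Object mode: chunks arrive as objects.
const objectStream = new StreamFromArray(peaks, { objectMode: true });
objectStream.on('data', (chunk) => console.log('object chunk:', chunk));
objectStream.on('end', () => console.log('object stream done'));
objectStream.on('error', (err) => console.error(err));
```

Leaving out both options would deliver the same items as Buffer chunks, which is the default binary mode.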
NodeJS comes with many kinds of readable streams built in. For example, an HTTP request on the server and an HTTP response on the client are readable streams. The file system has readable streams, zipping and unzipping use them, and so do TCP sockets and process.stdin.
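As one example of a built-in readable stream, a file can be read chunk by chunk with fs.createReadStream. The sketch below assumes a hypothetical file at ./video.mp4 and a helper named streamFile. If the file does not exist, the ‘error’ listener reports it instead of crashing.

```javascript
const fs = require('fs');

// Reads a file chunk by chunk and reports progress through events.
function streamFile(filePath) {
  const readStream = fs.createReadStream(filePath);
  let totalBytes = 0;

  readStream.on('data', (chunk) => {
    totalBytes += chunk.length; // chunk is a Buffer by default
    console.log(`received a chunk of ${chunk.length} bytes`);
  });
  readStream.on('end', () => console.log(`finished, ${totalBytes} bytes read`));
  readStream.on('error', (err) => console.error('read failed:', err.message));

  return readStream;
}

// './video.mp4' is a hypothetical path; replace it with a real file.
const readStream = streamFile('./video.mp4');
```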
The above code snippet shows how we can use a ReadStream to read and stream a video file.
Readable streams have two modes:
Flowing mode: The stream automatically pushes each chunk of data into the pipeline.
Non-flowing (paused) mode: We have to ask for the data, and only then is a chunk pushed into the pipeline.
In the above code snippet, if we invoke readStream.pause(), we convert our readStream (which was in flowing mode) to a non-flowing stream. We then have to ask for each chunk of data by calling readStream.read(). Conversely, we can convert any non-flowing stream back to a flowing stream by invoking resume() on the readStream, as shown in the snippet below.
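Switching between the two modes from the keyboard could be sketched as follows. The in-memory sample source, the helper name attachKeyboardControl, and the "finish" keyword are assumptions made for illustration; a file stream would behave the same way. The keyboard is only wired up when the script runs in an interactive terminal.

```javascript
const { Readable } = require('stream');

// Sample source (an assumption for this sketch).
const readStream = Readable.from(['chunk 1', 'chunk 2', 'chunk 3']);

readStream.on('data', (chunk) => console.log('read:', chunk));

// Attaching a 'data' listener requests flowing mode; pause() switches it off.
readStream.pause();

// Hypothetical helper: every line typed on `input` asks for one chunk,
// and typing "finish" switches the stream back to flowing mode.
function attachKeyboardControl(stream, input) {
  input.on('data', (line) => {
    if (line.toString().trim() === 'finish') {
      stream.resume();
    } else {
      stream.read(); // pulls one chunk, which also fires the 'data' listener
    }
  });
  stream.on('end', () => {
    console.log('done');
    input.pause(); // stop waiting for keyboard input
  });
}

// Only wire up the keyboard when running in an interactive terminal.
if (process.stdin.isTTY) {
  attachKeyboardControl(readStream, process.stdin);
}
```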
In this code snippet, we listen to the ‘data’ event emitted by process.stdin (process.stdin is itself a readable stream, so it emits ‘data’ events too). Every time we hit Enter, readStream.read() asks the readStream for data, and the readStream hands over one chunk, which is printed on the console. This is how non-flowing mode works.
Writable Streams
Writable streams are the counterparts of readable streams. Just as we read data in chunks, we can also write data in chunks. A writable stream represents a destination for incoming data: it lets us take data from a readable source and do something with it. Just like readable streams, writable streams are implemented in numerous places in NodeJS.
Write streams come with the methods write() and end(). The write() method takes the data to be written, whereas the end() method ends the write stream gracefully. Write streams also emit events, such as ‘close’.
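A minimal sketch of a writable stream is shown below. The in-memory received array stands in for a real destination such as a file, and both it and the sample strings are assumptions made for illustration.

```javascript
const { Writable } = require('stream');

const received = []; // stands in for a real destination such as a file

const writeStream = new Writable({
  // Called once for every chunk passed to write().
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback(); // signal that this chunk has been handled
  },
});

writeStream.on('finish', () => console.log('all data written:', received));
writeStream.on('close', () => console.log('stream closed'));
writeStream.on('error', (err) => console.error(err));

writeStream.write('hello ');
writeStream.write('world');
writeStream.end(); // no more data will be written
```

For a file destination, fs.createWriteStream returns a writable stream with the same write() and end() methods.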
Backpressure
Sometimes data arrives from a readable stream faster than a writable stream can handle it. Consider the water bucket example again. Suppose we attach a hose to the destination bucket and put a funnel on the other end of the hose. We start pouring water from the source bucket into the funnel, and it flows through the hose into the destination bucket. But if we pour too fast, the funnel fills up and the water spills over. To avoid spilling and losing water, we stop pouring for a while until the funnel has drained into the hose, and then resume.
In this example, the full funnel is the backpressure, and the amount of water the funnel can hold is the high water mark (the highWaterMark option).
The writeStream.write() method tells us whether more data can be pushed into the stream. It returns true if the stream can accommodate more data and false if its internal buffer is full. When the write stream is full, we pause the readStream until the buffered data has been written to the destination and the stream is ready to take data again. To know when that happens, we listen for the ‘drain’ event.
In the code snippet attached below, we can see how backpressure is handled, and how the number of backpressure occurrences drops when we increase highWaterMark in the options.
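Handling backpressure by hand could be sketched as follows. The helper name copyWithBackpressure, the in-memory source of 1 KB chunks, and the deliberately slow destination are assumptions chosen so that the example runs without any files.

```javascript
const { Readable, Writable } = require('stream');

// Copies data from a sample source to a slow destination and
// resolves with the number of times backpressure occurred.
function copyWithBackpressure(highWaterMark) {
  return new Promise((resolve, reject) => {
    // Sample source: 20 chunks of 1 KB each.
    const chunks = Array.from({ length: 20 }, () => Buffer.alloc(1024, 'a'));
    const readStream = Readable.from(chunks);

    // Slow destination: takes a moment to handle each chunk.
    const writeStream = new Writable({
      highWaterMark,
      write(chunk, encoding, callback) {
        setImmediate(callback);
      },
    });

    let backpressureCount = 0;

    readStream.on('data', (chunk) => {
      const canTakeMore = writeStream.write(chunk);
      if (!canTakeMore) {
        backpressureCount += 1;
        readStream.pause(); // destination is full: stop reading
      }
    });
    writeStream.on('drain', () => readStream.resume()); // buffer emptied: read again
    readStream.on('end', () => writeStream.end());
    writeStream.on('finish', () => resolve(backpressureCount));
    readStream.on('error', reject);
    writeStream.on('error', reject);
  });
}

copyWithBackpressure(1024).then((count) => console.log('1 KB buffer:', count));
copyWithBackpressure(16 * 1024).then((count) => console.log('16 KB buffer:', count));
```

With a 1 KB highWaterMark, every 1 KB chunk fills the buffer, so the first count should come out much higher than the second.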
Piping Streams
In the section above, we saw a number of events that we need to listen to and handle when working with readable and writable streams. To avoid these complications, we can make use of pipes. The pipe() method automatically handles backpressure for us. The only thing we need to take care of with pipes is handling errors with an ‘error’ event listener.
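The same kind of copy written with pipe() could look like the sketch below. The in-memory source and the collecting destination are assumptions made for illustration.

```javascript
const { Readable, Writable } = require('stream');

const readStream = Readable.from(['one ', 'two ', 'three']); // sample source
const collected = [];
const writeStream = new Writable({
  write(chunk, encoding, callback) {
    collected.push(chunk.toString());
    callback();
  },
});

// pipe() pauses and resumes the source as the destination fills and drains.
readStream.pipe(writeStream);

// Errors are not forwarded along a pipe, so each stream gets its own listener.
readStream.on('error', (err) => console.error('read error:', err.message));
writeStream.on('error', (err) => console.error('write error:', err.message));
writeStream.on('finish', () => console.log('piped:', collected.join('')));
```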
Duplex Streams
A duplex stream is a stream that implements both the readable and the writable interface. A readable stream can pipe data into a duplex stream, and the duplex stream can in turn pass that data on to a writable stream. Duplex streams represent the middle section of a pipeline, meaning a duplex stream can be piped in between a readable and a writable stream.
A typical duplex stream doesn’t change anything about the data. There is a type of duplex stream that does, called a transform stream. Using duplex streams, we can compose streams into pipelines, and they are a necessary component of complex pipelines. For instance, if we need to send data slowly, we can create a throttle stream as a duplex stream and build a pipeline with it. We can achieve this with the following implementation.
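One way to write such a throttle stream is sketched below. The class name Throttle, the 100 ms delay, and the sample source and destination are assumptions made for illustration.

```javascript
const { Duplex, Readable, Writable } = require('stream');

// Duplex stream that forwards each chunk unchanged but waits `delayMs`
// before accepting the next one, which slows the whole pipeline down.
class Throttle extends Duplex {
  constructor(delayMs) {
    super();
    this.delayMs = delayMs;
  }

  _write(chunk, encoding, callback) {
    this.push(chunk); // hand the chunk to the readable side
    setTimeout(callback, this.delayMs); // delay the next write
  }

  _read() {
    // Nothing to do here: data is pushed from _write().
  }

  _final(callback) {
    this.push(null); // writable side is finished, so end the readable side too
    callback();
  }
}

const source = Readable.from(['a', 'b', 'c']); // sample source
const received = [];
const destination = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    console.log('received:', chunk.toString());
    callback();
  },
});

const throttle = new Throttle(100); // 100 ms between chunks (arbitrary)
source.pipe(throttle).pipe(destination);
```

Because the throttle sits in the middle of the pipeline, neither the source nor the destination needs to know about the delay.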
Transform Streams
Transform streams are a special kind of duplex stream. Instead of simply passing the data from the read stream to the write stream, transform streams change the data. In the code snippet below, we create a transform stream that takes whatever input we type in the console and returns an output where the vowels have been replaced with Xs.
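A vowel-replacing transform stream could be sketched as follows. The class name ReplaceVowels and the sample sentence are assumptions. To keep the example runnable without keyboard input, it writes one sample line into the stream, and the console variant is shown in a comment.

```javascript
const { Transform } = require('stream');

// Transform stream that replaces every vowel with a given character.
class ReplaceVowels extends Transform {
  constructor(replacement) {
    super();
    this.replacement = replacement;
  }

  _transform(chunk, encoding, callback) {
    const transformed = chunk.toString().replace(/[aeiou]/gi, this.replacement);
    this.push(transformed); // pass the changed data to the readable side
    callback();
  }
}

const replaceVowels = new ReplaceVowels('X');
replaceVowels.pipe(process.stdout);
replaceVowels.write('Hello streams\n'); // prints "HXllX strXXms"

// For console input, pipe the keyboard in instead:
// process.stdin.pipe(replaceVowels);
```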
So, this was all about the streams we have in NodeJS. We can build more such pipelines using these stream types. Apart from this, NodeJS also has a built-in zlib module whose gzip stream compresses the data passing through it before it reaches the write stream.
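As a small sketch of the compression case, the built-in zlib module provides createGzip(), which returns a transform stream. The sample input and the round-trip check at the end are assumptions made for illustration. In practice the compressed output would usually be piped into a file write stream.

```javascript
const { Readable } = require('stream');
const zlib = require('zlib');

const source = Readable.from(['hello hello hello hello']); // sample input
const gzip = zlib.createGzip(); // built-in transform stream that compresses data

const compressedChunks = [];
gzip.on('data', (chunk) => compressedChunks.push(chunk));
gzip.on('end', () => {
  const compressed = Buffer.concat(compressedChunks);
  console.log('compressed bytes:', compressed.length);
  // Decompress again to check that nothing was lost.
  console.log('round trip:', zlib.gunzipSync(compressed).toString());
});
gzip.on('error', (err) => console.error('gzip error:', err.message));

source.pipe(gzip);
```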