Node.js Streams

Node.js streams are a powerful way to handle data incrementally. They let you work with data piece by piece, processing it as it arrives instead of loading everything into memory, which is essential for building efficient, high-performance applications. This chapter takes you from the basics of streams in Node.js to advanced concepts, with detailed examples along the way.

Understanding Streams

Streams are objects that let you read data from a source or write data to a destination in a continuous manner. There are four main types of streams in Node.js (a minimal sketch follows this list):

  • Readable: Used for reading data.
  • Writable: Used for writing data.
  • Duplex: Can be used for both reading and writing.
  • Transform: A type of duplex stream where the output is computed based on the input.
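
To make these types concrete before the detailed sections, here is a minimal sketch that uses only the built-in stream module; Readable.from requires Node.js 12 or later, and the values piped through are arbitrary:

const { Readable, Writable, Transform } = require('stream');

// Readable: emits the strings 'a', 'b' and 'c' as separate chunks.
const readable = Readable.from(['a', 'b', 'c']);

// Writable: logs every chunk it receives.
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('wrote:', chunk.toString());
    callback();
  }
});

// Transform (a kind of Duplex): upper-cases whatever passes through it.
// A general Duplex stream has independent readable and writable sides;
// a TCP socket from the net module is a common example.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

readable.pipe(upper).pipe(writable); // prints 'wrote: A', 'wrote: B', 'wrote: C'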

The Basics of Streams

Readable Streams

Readable streams are used to read data from a source. A common example is reading a file.

Example: Reading from a File

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');

readableStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
});

readableStream.on('end', () => {
  console.log('End of file');
});

Explanation:

  • fs.createReadStream('example.txt', 'utf8'): Creates a readable stream from the file example.txt.
  • readableStream.on('data', callback): Listens for the ‘data’ event to read chunks of data.
  • readableStream.on('end', callback): Listens for the ‘end’ event, indicating the end of the file.

// Output //
Received chunk: (file content)
End of file

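Readable streams are also async iterable (Node.js 10 and later), so the same file can be consumed with a for await...of loop instead of event listeners. Here is a small alternative sketch of the example above, assuming the same example.txt file:

const fs = require('fs');

async function readFile() {
  const readableStream = fs.createReadStream('example.txt', 'utf8');

  // Each iteration waits for the next chunk from the stream.
  for await (const chunk of readableStream) {
    console.log('Received chunk:', chunk);
  }

  console.log('End of file');
}

readFile().catch((err) => console.error('Error reading file:', err));
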
Writable Streams

Writable streams are used to write data to a destination, such as a file.

Example: Writing to a File

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

writableStream.write('Hello, World!\n');
writableStream.write('Writing to a stream.\n');
writableStream.end('Finished writing.');

writableStream.on('finish', () => {
  console.log('All writes are now complete.');
});

Explanation:

  • fs.createWriteStream('output.txt'): Creates a writable stream to the file output.txt.
  • writableStream.write(data): Writes data to the stream.
  • writableStream.end(data): Signals the end of writing, optionally with a final piece of data.
  • writableStream.on('finish', callback): Listens for the ‘finish’ event, indicating that all writes are complete.

// Output //
All writes are now complete.

Duplex Streams

Duplex streams can read and write data. A common example is a network socket.

Example: Duplex Stream with net.Socket

const net = require('net');

const server = net.createServer((socket) => {
  socket.on('data', (data) => {
    console.log('Received:', data.toString());
    socket.write('Echo: ' + data);
  });
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});

Explanation:

  • net.createServer(callback): Creates a server that listens for incoming connections.
  • socket.on('data', callback): Listens for data from the client.
  • socket.write(data): Writes data back to the client.

// Output //
Server listening on port 3000
Received: (client data)
Echo: (client data)

Transform Streams

Transform streams are duplex streams that can modify or transform the data as it is written and read.

Example: Transform Stream

const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(transformStream).pipe(process.stdout);

Explanation:

  • new Transform(options): Creates a transform stream with a transform function.
  • this.push(data): Pushes transformed data to the readable side of the stream.
  • process.stdin.pipe(transformStream).pipe(process.stdout): Pipes the input from stdin through the transform stream and outputs to stdout.

// Output //
(input text converted to uppercase)

Working with Streams

Piping Streams

Piping is a mechanism for connecting multiple streams together, passing the output of one stream directly as input to another.

Example: Piping a Readable Stream to a Writable Stream

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.pipe(writableStream);

writableStream.on('finish', () => {
  console.log('Piping complete');
});

Explanation:

  • readableStream.pipe(writableStream): Pipes data from the readable stream to the writable stream.

// Output //
Piping complete

Handling Stream Events

Streams emit several events that you can listen to in order to manage the flow of data.

  • ‘data’: Emitted when a chunk of data is available.
  • ‘end’: Emitted when there is no more data to be read.
  • ‘error’: Emitted when an error occurs.
  • ‘finish’: Emitted when all data has been flushed to the underlying system.

Example: Handling Stream Events

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
  writableStream.write(chunk);
});

readableStream.on('end', () => {
  writableStream.end();
  console.log('Read stream ended');
});

writableStream.on('finish', () => {
  console.log('Write stream finished');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

Explanation:

  • Listens for ‘data’, ‘end’, ‘finish’, and ‘error’ events on both streams to handle various scenarios.

// Output //
Received chunk: (file content)
Read stream ended
Write stream finished

Advanced Stream Techniques

Stream Backpressure

Backpressure occurs when a writable stream cannot consume data as fast as the readable stream supplies it. pipe() and pipeline() handle backpressure for you automatically, but when you write to a stream manually you must manage it yourself, and doing so properly is crucial for building memory-efficient applications.

Example: Handling Backpressure

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk) => {
  if (!writableStream.write(chunk)) {
    readableStream.pause();
  }
});

writableStream.on('drain', () => {
  readableStream.resume();
});

readableStream.on('end', () => {
  writableStream.end();
  console.log('Read stream ended');
});

writableStream.on('finish', () => {
  console.log('Write stream finished');
});

Explanation:

  • readableStream.pause(): Pauses the readable stream if the writable stream is overwhelmed.
  • writableStream.on('drain', callback): Resumes the readable stream when the writable stream is ready to receive more data.

// Output //
Read stream ended
Write stream finished

Stream Pipelines

The stream.pipeline method is a high-level way to pipe streams together: it forwards an error from any stream in the chain to a single callback and cleans up the streams when the pipeline finishes or fails.

Example: Using Stream Pipelines

const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('example.txt', 'utf8'),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Explanation:

  • pipeline(...streams, callback): Pipes multiple streams together and calls the callback on success or error.

// Output //
Pipeline succeeded

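Since Node.js 15 there is also a promise-based pipeline in the stream/promises module, which fits naturally with async/await. Here is a minimal sketch of the same copy operation using that variant, assuming the same example.txt and output.txt files:

const { pipeline } = require('stream/promises');
const fs = require('fs');

async function copyFile() {
  // Rejects if any stream in the chain emits an error.
  await pipeline(
    fs.createReadStream('example.txt', 'utf8'),
    fs.createWriteStream('output.txt')
  );
  console.log('Pipeline succeeded');
}

copyFile().catch((err) => console.error('Pipeline failed:', err));
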
Practical Examples

Reading and Writing Large Files

When dealing with large files, using streams is more efficient than reading the entire file into memory.

Example: Copying a Large File

const fs = require('fs');

const sourceFile = 'largefile.txt';
const destinationFile = 'copy_largefile.txt';

const readableStream = fs.createReadStream(sourceFile);
const writableStream = fs.createWriteStream(destinationFile);

readableStream.pipe(writableStream);

writableStream.on('finish', () => {
  console.log('File copied successfully');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

Explanation:

  • Uses fs.createReadStream to create a readable stream from the source file.
  • Uses fs.createWriteStream to create a writable stream for the destination file.
  • Pipes the readable stream to the writable stream to copy the file.
  • Handles errors for both streams.

Creating a Simple File Server

A file server serves files over HTTP, using streams to handle file I/O efficiently.

Example: HTTP File Server

const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer((req, res) => {
  // Note: a production server should sanitise req.url to prevent
  // path traversal outside of __dirname.
  const filePath = path.join(__dirname, req.url);

  fs.access(filePath, fs.constants.F_OK, (err) => {
    if (err) {
      res.writeHead(404, { 'Content-Type': 'text/plain' });
      res.end('File not found');
      return;
    }

    const readableStream = fs.createReadStream(filePath);
    res.writeHead(200, { 'Content-Type': 'application/octet-stream' });
    readableStream.pipe(res);

    readableStream.on('error', (err) => {
      console.error('Error reading file:', err);
      // By this point the 200 header has usually been sent already,
      // so only write a new head if headers have not gone out yet.
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'text/plain' });
      }
      res.end('Internal server error');
    });
  });
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});

Explanation:

  • Uses http.createServer to create an HTTP server.
  • Uses fs.access to check if the requested file exists.
  • Uses fs.createReadStream to create a readable stream from the file and pipes it to the HTTP response.
  • Handles errors for both file access and stream reading.

// Output //
Server listening on port 3000

Transforming Data in Real-Time

Transform streams can be used to process data in real-time, such as converting data to uppercase as it’s being read.

Example: Transform Stream for Uppercase Conversion

const { Transform } = require('stream');
const http = require('http');

// Create a fresh transform stream for every request; a stream cannot
// be reused once it has ended.
function createUppercaseTransform() {
  return new Transform({
    transform(chunk, encoding, callback) {
      this.push(chunk.toString().toUpperCase());
      callback();
    }
  });
}

const server = http.createServer((req, res) => {
  if (req.method === 'POST') {
    req.pipe(createUppercaseTransform()).pipe(res);
  } else {
    res.writeHead(405, { 'Content-Type': 'text/plain' });
    res.end('Method Not Allowed');
  }
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});

Explanation:

  • Creates a fresh transform stream for each request that converts incoming data to uppercase (a stream cannot be reused once it has ended).
  • Uses http.createServer to create an HTTP server.
  • Pipes the incoming request through the transform stream and then to the response.
  • Handles only POST requests, responding with an error for other methods.

// Output //
Server listening on port 3000

Streaming Large JSON Data

Streams can also be used to read a large JSON file, transform its contents, and write the result back out. Because JSON has to be parsed as a whole document, the example below buffers the chunks in a transform stream and parses them once the file has been read completely; for files too large to fit in memory you would need a dedicated streaming JSON parser.

Example: Streaming JSON Data

const fs = require('fs');
const { Transform } = require('stream');

const sourceFile = 'largeData.json';
const destinationFile = 'processedData.json';

const readableStream = fs.createReadStream(sourceFile, 'utf8');
const writableStream = fs.createWriteStream(destinationFile);

// Individual chunks are rarely valid JSON on their own, so collect them
// and parse the complete document once the source stream has ended.
let rawJson = '';

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    rawJson += chunk;
    callback();
  },
  flush(callback) {
    try {
      const data = JSON.parse(rawJson);
      // Example transformation: filter out inactive entries
      const filteredData = data.filter(item => item.isActive);
      this.push(JSON.stringify(filteredData));
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

readableStream.pipe(transformStream).pipe(writableStream);

writableStream.on('finish', () => {
  console.log('JSON data processed and saved');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

transformStream.on('error', (err) => {
  console.error('Error transforming data:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

Explanation:

  • Reads a large JSON file using fs.createReadStream.
  • Buffers the chunks in a transform stream and parses the complete JSON in flush(), then filters out inactive entries.
  • Writes the transformed data to a new file using fs.createWriteStream.
  • Handles errors for reading, transforming, and writing.

// Output //
JSON data processed and saved

Real-Time Chat Application

TCP sockets are duplex streams, which makes them a natural fit for a simple chat application that broadcasts data between connected clients in real time.

Example: Real-Time Chat Server

const net = require('net');

const clients = [];

const server = net.createServer((socket) => {
  clients.push(socket);
  socket.write('Welcome to the chat!\n');

  // Remove this socket from the client list exactly once.
  const removeClient = () => {
    const index = clients.indexOf(socket);
    if (index !== -1) {
      clients.splice(index, 1);
    }
  };

  socket.on('data', (data) => {
    clients.forEach((client) => {
      if (client !== socket) {
        client.write(data);
      }
    });
  });

  socket.on('end', removeClient);

  socket.on('error', (err) => {
    console.error('Socket error:', err);
    removeClient(); // stop broadcasting to the broken socket
  });
});

server.listen(3000, () => {
  console.log('Chat server listening on port 3000');
});

Explanation:

  • Uses net.createServer to create a TCP server for chat.
  • Maintains a list of connected clients and broadcasts messages to all clients except the sender.
  • Handles client connection and disconnection, as well as errors.

// Output //
Chat server listening on port 3000

Summary

  • Readable Streams: For reading data from sources.
  • Writable Streams: For writing data to destinations.
  • Duplex Streams: For reading and writing data simultaneously.
  • Transform Streams: For modifying data during read/write operations.
  • Piping and Backpressure: Efficiently manage data flow between streams.
  • Stream Pipelines: Simplify the process of connecting multiple streams and handling errors.

Node.js streams are a powerful and flexible way to handle I/O operations efficiently. By understanding the different types of streams and how to use them, you can build high-performance applications that handle large amounts of data with ease. Streams provide a robust foundation for working with data in Node.js, from simple file operations to complex real-time data processing. Happy coding! ❤️
