Node.js streams are a powerful way to handle data. They enable you to work with data piece by piece, processing it as it arrives, which is essential for building efficient and high-performance applications. This chapter will take you from the basics to advanced concepts of streams in Node.js, providing a comprehensive understanding along with detailed examples.
Streams are objects that let you read data from a source or write data to a destination in a continuous manner. There are four main types of streams in Node.js: Readable, Writable, Duplex, and Transform.
Readable streams are used to read data from a source. A common example is reading a file.
const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');

readableStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
});

readableStream.on('end', () => {
  console.log('End of file');
});
fs.createReadStream('example.txt', 'utf8')
: Creates a readable stream from the file example.txt.
readableStream.on('data', callback)
: Listens for the ‘data’ event to read chunks of data.
readableStream.on('end', callback)
: Listens for the ‘end’ event, indicating the end of the file.
// Output //
Received chunk: (file content)
End of file
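Readable streams also support a paused mode: instead of the ‘data’ event, you listen for ‘readable’ and pull chunks explicitly with read(). A minimal sketch, not part of the original example but reading the same example.txt:

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');

readableStream.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = readableStream.read()) !== null) {
    console.log('Read chunk:', chunk);
  }
});

readableStream.on('end', () => {
  console.log('End of file');
});

Paused mode gives you explicit control over when data is consumed, which can be useful when pull-based processing fits the problem better than push-based ‘data’ events.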
Writable streams are used to write data to a destination, such as a file.
const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

writableStream.write('Hello, World!\n');
writableStream.write('Writing to a stream.\n');
writableStream.end('Finished writing.');

writableStream.on('finish', () => {
  console.log('All writes are now complete.');
});
fs.createWriteStream('output.txt')
: Creates a writable stream to the file output.txt.
writableStream.write(data)
: Writes data to the stream.
writableStream.end(data)
: Signals the end of writing, optionally with a final piece of data.
writableStream.on('finish', callback)
: Listens for the ‘finish’ event, indicating that all writes are complete.
// Output //
All writes are now complete.
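One writable-stream feature worth knowing, though not used above, is write batching with cork() and uncork(). A minimal sketch, deferring uncork() to the next tick as the Node.js documentation recommends:

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

// cork() buffers writes in memory; uncork() flushes them as one batch
writableStream.cork();
writableStream.write('line 1\n');
writableStream.write('line 2\n');

process.nextTick(() => {
  writableStream.uncork();
  writableStream.end();
});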
Duplex streams can read and write data. A common example is a network socket.
const net = require('net');

const server = net.createServer((socket) => {
  socket.on('data', (data) => {
    console.log('Received:', data.toString());
    socket.write('Echo: ' + data);
  });
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});
net.createServer(callback)
: Creates a server that listens for incoming connections.
socket.on('data', callback)
: Listens for data from the client.
socket.write(data)
: Writes data back to the client.
// Output //
Server listening on port 3000
Received: (client data)
Echo: (client data)
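Sockets are duplex streams provided by Node.js itself, but you can also build your own. A toy sketch, illustrative only and not from the original, showing that the readable and writable sides are independent (here, the writable side feeds the readable side):

const { Duplex } = require('stream');

// A toy in-memory duplex: data written to it can be read back out
const echo = new Duplex({
  read() {
    // no-op: data is pushed from write() below
  },
  write(chunk, encoding, callback) {
    this.push(chunk); // hand written data to the readable side
    callback();
  },
  final(callback) {
    this.push(null); // end the readable side once writing finishes
    callback();
  }
});

echo.on('data', (chunk) => console.log('Read back:', chunk.toString()));
echo.write('hello duplex');
echo.end();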
Transform streams are duplex streams that can modify or transform the data as it is written and read.
const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(transformStream).pipe(process.stdout);
new Transform(options)
: Creates a transform stream with a transform function.
this.push(data)
: Pushes transformed data to the readable side of the stream.
process.stdin.pipe(transformStream).pipe(process.stdout)
: Pipes the input from stdin through the transform stream and outputs to stdout.
// Output //
(input text converted to uppercase)
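For quick experiments you do not need stdin: Readable.from(), available since roughly Node.js 12, turns any iterable into a readable stream, which is handy for testing a transform. An illustrative sketch:

const { Readable, Transform } = require('stream');

// Readable.from() turns an iterable into a readable stream
const source = Readable.from(['alpha\n', 'beta\n']);

const upper = new Transform({
  transform(chunk, encoding, callback) {
    // callback(null, data) is shorthand for this.push(data); callback()
    callback(null, chunk.toString().toUpperCase());
  }
});

source.pipe(upper).pipe(process.stdout);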
Piping is a mechanism for connecting multiple streams together, passing the output of one stream directly as input to another.
const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.pipe(writableStream);

writableStream.on('finish', () => {
  console.log('Piping complete');
});
readableStream.pipe(writableStream)
: Pipes data from the readable stream to the writable stream.
// Output //
Piping complete
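Pipes can also be chained through transform streams. A sketch using the built-in zlib compression transform (the file names here are placeholders):

const fs = require('fs');
const zlib = require('zlib');

// Read -> gzip -> write: zlib.createGzip() is a core transform stream
fs.createReadStream('example.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('example.txt.gz'))
  .on('finish', () => console.log('Compression complete'));

Note that pipe() handles backpressure automatically but does not forward errors between streams; error handling is covered below.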
Streams emit several events that you can listen to in order to manage the flow of data.
const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
  writableStream.write(chunk);
});

readableStream.on('end', () => {
  writableStream.end();
  console.log('Read stream ended');
});

writableStream.on('finish', () => {
  console.log('Write stream finished');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});
// Output //
Received chunk: (file content)
Read stream ended
Write stream finished
Backpressure occurs when the writable stream cannot handle the rate at which data is being written to it. Properly handling backpressure is crucial for building efficient applications.
const fs = require('fs');

const readableStream = fs.createReadStream('example.txt', 'utf8');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk) => {
  if (!writableStream.write(chunk)) {
    readableStream.pause();
  }
});

writableStream.on('drain', () => {
  readableStream.resume();
});

readableStream.on('end', () => {
  writableStream.end();
  console.log('Read stream ended');
});

writableStream.on('finish', () => {
  console.log('Write stream finished');
});
readableStream.pause()
: Pauses the readable stream if the writable stream is overwhelmed.
writableStream.on('drain', callback)
: Resumes the readable stream when the writable stream is ready to receive more data.
// Output //
Read stream ended
Write stream finished
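On reasonably modern Node.js versions, the same pause/resume logic can be written with async iteration and events.once, which many find easier to follow; a sketch under that assumption:

const { once } = require('events');
const fs = require('fs');

// Await the 'drain' event whenever write() signals backpressure
async function copyFile(src, dest) {
  const readable = fs.createReadStream(src);
  const writable = fs.createWriteStream(dest);
  for await (const chunk of readable) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }
  writable.end();
}

copyFile('example.txt', 'output.txt')
  .then(() => console.log('Copy complete'))
  .catch((err) => console.error('Copy failed:', err));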
The stream.pipeline method is a high-level way to pipe streams together and manage errors. Unlike pipe(), it forwards errors from every stream in the chain and destroys all streams on failure.
const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('example.txt', 'utf8'),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
pipeline(...streams, callback)
: Pipes multiple streams together and calls the callback on success or error.
// Output //
Pipeline succeeded
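Node.js 15 and later also expose a promise-based variant in stream/promises, which pairs naturally with async/await; a sketch assuming such a version:

const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  // Resolves when the pipeline finishes, rejects if any stream errors
  await pipeline(
    fs.createReadStream('example.txt', 'utf8'),
    fs.createWriteStream('output.txt')
  );
  console.log('Pipeline succeeded');
}

run().catch((err) => console.error('Pipeline failed:', err));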
When dealing with large files, using streams is more efficient than reading the entire file into memory.
const fs = require('fs');

const sourceFile = 'largefile.txt';
const destinationFile = 'copy_largefile.txt';

const readableStream = fs.createReadStream(sourceFile);
const writableStream = fs.createWriteStream(destinationFile);

readableStream.pipe(writableStream);

writableStream.on('finish', () => {
  console.log('File copied successfully');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});
fs.createReadStream
: Creates a readable stream from the source file.
fs.createWriteStream
: Creates a writable stream for the destination file.
A file server serves files over HTTP, using streams to handle file I/O efficiently.
Example: HTTP File Server
const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer((req, res) => {
  // Note: in production, sanitize req.url to prevent path traversal attacks
  const filePath = path.join(__dirname, req.url);
  fs.access(filePath, fs.constants.F_OK, (err) => {
    if (err) {
      res.writeHead(404, { 'Content-Type': 'text/plain' });
      res.end('File not found');
      return;
    }
    const readableStream = fs.createReadStream(filePath);
    res.writeHead(200, { 'Content-Type': 'application/octet-stream' });
    readableStream.pipe(res);
    readableStream.on('error', (err) => {
      console.error('Error reading file:', err);
      // Headers may already have been sent if streaming had started
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'text/plain' });
      }
      res.end('Internal server error');
    });
  });
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});
http.createServer
: Creates an HTTP server.
fs.access
: Checks whether the requested file exists.
fs.createReadStream
: Creates a readable stream from the file, which is piped to the HTTP response.
// Output //
Server listening on port 3000
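The server above always responds with application/octet-stream. One possible refinement, sketched here with a small hypothetical lookup table, is to derive the Content-Type from the file extension:

const path = require('path');

// Illustrative extension-to-MIME lookup; real servers use a fuller mapping
const mimeTypes = {
  '.html': 'text/html',
  '.txt': 'text/plain',
  '.png': 'image/png'
};

function contentTypeFor(filePath) {
  return mimeTypes[path.extname(filePath)] || 'application/octet-stream';
}

console.log(contentTypeFor('index.html')); // text/html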
Transform streams can be used to process data in real-time, such as converting data to uppercase as it’s being read.
const { Transform } = require('stream');
const http = require('http');

const server = http.createServer((req, res) => {
  if (req.method === 'POST') {
    // Create a fresh transform per request: a stream cannot be reused once it has ended
    const uppercaseTransform = new Transform({
      transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
      }
    });
    req.pipe(uppercaseTransform).pipe(res);
  } else {
    res.writeHead(405, { 'Content-Type': 'text/plain' });
    res.end('Method Not Allowed');
  }
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});
http.createServer
: Creates an HTTP server.
req.pipe(uppercaseTransform).pipe(res)
: Streams each POST body through the transform and back out as the response.
// Output //
Server listening on port 3000
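To exercise the server, any HTTP client that sends a POST body will do; a minimal illustrative Node.js client:

const http = require('http');

// POST a body to the server and stream the transformed response to stdout
const req = http.request(
  { hostname: 'localhost', port: 3000, method: 'POST' },
  (res) => res.pipe(process.stdout)
);

req.end('hello streams');

Running this against the server prints HELLO STREAMS.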
Streams can also process large JSON files, combining readable, transform, and writable streams. Note that JSON cannot be parsed chunk by chunk, so the transform below buffers the input and parses it once the stream ends; truly incremental parsing requires a streaming JSON parser.
const fs = require('fs');
const { Transform } = require('stream');

const sourceFile = 'largeData.json';
const destinationFile = 'processedData.json';

const readableStream = fs.createReadStream(sourceFile, 'utf8');
const writableStream = fs.createWriteStream(destinationFile);

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    // Buffer chunks: a single chunk may end mid-token, so we cannot parse yet
    this.buffered = (this.buffered || '') + chunk;
    callback();
  },
  flush(callback) {
    try {
      const data = JSON.parse(this.buffered);
      // Example transformation: filter out inactive entries
      const filteredData = data.filter(item => item.isActive);
      this.push(JSON.stringify(filteredData));
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

readableStream.pipe(transformStream).pipe(writableStream);

writableStream.on('finish', () => {
  console.log('JSON data processed and saved');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

transformStream.on('error', (err) => {
  console.error('Error transforming data:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});
fs.createReadStream
: Creates a readable stream from the source JSON file.
fs.createWriteStream
: Creates a writable stream for the processed output.
// Output //
JSON data processed and saved
Using streams to handle real-time data transfer in a simple chat application.
const net = require('net');

const clients = [];

const server = net.createServer((socket) => {
  clients.push(socket);
  socket.write('Welcome to the chat!\n');

  socket.on('data', (data) => {
    // Broadcast the message to every other connected client
    clients.forEach((client) => {
      if (client !== socket) {
        client.write(data);
      }
    });
  });

  // 'close' fires for both clean disconnects and errors, so clients never leak
  socket.on('close', () => {
    clients.splice(clients.indexOf(socket), 1);
  });

  socket.on('error', (err) => {
    console.error('Socket error:', err);
  });
});

server.listen(3000, () => {
  console.log('Chat server listening on port 3000');
});
net.createServer
: Creates a TCP server that keeps a list of connected sockets and broadcasts each message to all other clients.
// Output //
Chat server listening on port 3000
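A matching client is just a few lines, since process.stdin, process.stdout, and the socket are all streams; an illustrative sketch:

const net = require('net');

// Pipe the terminal into the socket and the socket back to the terminal
const socket = net.connect(3000, 'localhost');
process.stdin.pipe(socket);
socket.pipe(process.stdout);

Run the server, then start this client in two terminals and type into either one to chat.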
Node.js streams are a powerful and flexible way to handle I/O operations efficiently. By understanding the different types of streams and how to use them, you can build high-performance applications that handle large amounts of data with ease. Streams provide a robust foundation for working with data in Node.js, from simple file operations to complex real-time data processing. Happy coding! ❤️