MongoDB’s Change Streams feature allows developers to create real-time applications by capturing and processing data changes as they happen in the database.
Real-time applications are designed to respond to user actions or data changes instantly. Examples include real-time notifications, live dashboards, and collaborative tools.
Building such applications requires a mechanism to listen for data changes and act on them immediately. MongoDB’s Change Streams provide one by letting applications subscribe to changes in the database in real time.
Change Streams allow applications to subscribe to changes in MongoDB collections, databases, or entire clusters. They capture insert, update, replace, delete, and drop operations and deliver them as change events. Key benefits of using Change Streams include:
Change Streams leverage MongoDB’s internal replication mechanism, tailing the oplog (operations log) to detect changes in collections. These changes are then delivered to any application or service subscribed to the stream.
Using Change Streams offers several benefits for developers building real-time applications:
To use Change Streams, ensure that your MongoDB deployment is in replica set or sharded cluster mode, as Change Streams rely on the oplog. Let’s look at how to set up Change Streams in a MongoDB collection.
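Before opening a stream, it can help to confirm that the deployment is a replica set or sharded cluster. The sketch below is one possible check, assuming the server supports the `hello` command: replica set members report a `setName` in the reply, and a mongos router reports `msg: "isdbgrid"`. The helper name `supportsChangeStreams` is hypothetical and not part of the driver.

```javascript
// Hypothetical helper: decide from a `hello` reply whether Change Streams can be opened.
function supportsChangeStreams(helloReply) {
  // Replica set members report the name of their set in `setName`.
  const isReplicaSetMember = typeof helloReply.setName === "string";
  // A mongos router (sharded cluster) identifies itself with msg: "isdbgrid".
  const isMongos = helloReply.msg === "isdbgrid";
  return isReplicaSetMember || isMongos;
}

// Usage with a connected MongoClient (assumption: the server supports `hello`):
//   const reply = await client.db("admin").command({ hello: 1 });
//   if (!supportsChangeStreams(reply)) {
//     throw new Error("Change Streams need a replica set or sharded cluster");
//   }
```

A standalone server returns neither field, so the helper returns false for it.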
Here’s a simple example of starting a Change Stream on a collection named orders.
const { MongoClient } = require('mongodb');

async function monitorChanges() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);
  await client.connect();

  const database = client.db("shop");
  const collection = database.collection("orders");

  // Open a Change Stream on the orders collection
  const changeStream = collection.watch();
  changeStream.on("change", (change) => {
    console.log("Change detected:", change);
  });
  changeStream.on("error", console.error);

  // Keep the client open while listening. Closing it right after
  // registering the listener would end the stream before any event arrives.
  process.on("SIGINT", async () => {
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
}

monitorChanges().catch(console.error);
MongoClient: Connects to the MongoDB instance.
collection.watch(): Creates a Change Stream on the orders collection.
changeStream.on("change"): Listens for any changes in the collection and logs them.

When a new document is inserted into the orders collection, the Change Stream would output a change event similar to this:
Change detected: {
  "_id": { "_data": "8263e57f3e5d0001" },
  "operationType": "insert",
  "ns": { "db": "shop", "coll": "orders" },
  "documentKey": { "_id": ObjectId("63e57f3e5d123") },
  "fullDocument": { "_id": ObjectId("63e57f3e5d123"), "item": "book", "quantity": 2 }
}
MongoDB Change Streams capture various operations (insert, update, replace, delete, and drop), each useful for different scenarios.
Suppose we add, update, and delete documents in the orders collection. Each of these operations will generate corresponding change events.
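To make the mapping from operations to events concrete, here is a minimal sketch that routes each change event to a handler based on its operationType. The `handlers` table and `describeChange` function are hypothetical names chosen for illustration. Note that delete events carry only the documentKey, not the deleted document.

```javascript
// Hypothetical handler table keyed by the change event's operationType.
const handlers = {
  insert: (change) => `inserted ${JSON.stringify(change.fullDocument)}`,
  update: (change) =>
    `updated fields ${JSON.stringify(change.updateDescription.updatedFields)}`,
  // Delete events include the key of the removed document, not its contents.
  delete: (change) => `deleted ${JSON.stringify(change.documentKey)}`,
};

function describeChange(change) {
  const handler = handlers[change.operationType];
  // Operation types without a handler (for example drop) get a generic message.
  return handler ? handler(change) : `unhandled operation: ${change.operationType}`;
}

// Usage (assumes `changeStream` from the earlier example):
//   changeStream.on("change", (change) => console.log(describeChange(change)));
```

Keeping the handlers in a table means a new operation type can be supported by adding one entry instead of extending an if/else chain.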
Imagine a scenario where a user needs to be notified every time an order is placed.
changeStream.on("change", (change) => {
  if (change.operationType === "insert") {
    console.log(`New order placed: ${JSON.stringify(change.fullDocument)}`);
  }
});
New order placed: { "_id": "63e57f3e5d123", "item": "book", "quantity": 2 }
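The check on operationType can also be moved to the server. watch() accepts an aggregation pipeline as its first argument, so a $match stage can restrict the stream to insert events. The following is a sketch of that variant; `watchNewOrders` and the `notify` callback are hypothetical names, and `collection` is assumed to be the orders collection from the earlier example.

```javascript
// Pipeline passed to watch(): only insert events pass the filter,
// so the listener no longer needs to inspect operationType.
const insertOnlyPipeline = [{ $match: { operationType: "insert" } }];

// `notify` is a hypothetical application callback, e.g. one that sends an email.
function watchNewOrders(collection, notify) {
  const changeStream = collection.watch(insertOnlyPipeline);
  changeStream.on("change", (change) => notify(change.fullDocument));
  return changeStream; // the caller should close() it on shutdown
}

// Usage:
//   watchNewOrders(collection, (order) => console.log("New order placed:", order));
```

Filtering in the pipeline means events that the application would discard are never sent over the network.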
Another useful example is using Change Streams to update a dashboard showing real-time metrics.
changeStream.on("change", (change) => {
  if (change.operationType === "update") {
    console.log(`Order updated: ${JSON.stringify(change.updateDescription.updatedFields)}`);
  }
});
Order updated: { "quantity": 3 }
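A dashboard usually needs more than the changed fields. As a sketch of one approach, the stream can be opened with the `fullDocument: "updateLookup"` option so that update events also include the current document, and a small in-memory aggregate can be kept up to date from each event. The `quantities` map and `applyChange` function are hypothetical, and the metric (total quantity across orders) is only an illustration.

```javascript
// Hypothetical dashboard state: order id -> latest quantity.
const quantities = new Map();

// Apply one change event and return the total quantity across tracked orders.
function applyChange(change) {
  const id = String(change.documentKey._id);
  if (change.operationType === "delete") {
    quantities.delete(id);
  } else if (change.fullDocument) {
    // Present on inserts, and on updates when updateLookup is enabled.
    // It can be missing if the document was removed before the lookup ran.
    quantities.set(id, change.fullDocument.quantity);
  }
  let total = 0;
  for (const quantity of quantities.values()) total += quantity;
  return total;
}

// Usage (assumes `collection` from the earlier example):
//   const changeStream = collection.watch([], { fullDocument: "updateLookup" });
//   changeStream.on("change", (change) => {
//     console.log("Total quantity:", applyChange(change));
//   });
```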
MongoDB Change Streams are supported in various programming languages. Here are examples for Python and Java.
# Python (PyMongo)
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client.shop
collection = db.orders

# watch() returns a cursor that blocks until the next change arrives
with collection.watch() as change_stream:
    for change in change_stream:
        print("Change detected:", change)
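// Java (MongoDB sync driver); assumes `database` is an existing MongoDatabase
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

MongoCollection<Document> collection = database.getCollection("orders");
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
while (cursor.hasNext()) {
    ChangeStreamDocument<Document> change = cursor.next();
    System.out.println("Change detected: " + change);
}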
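In a distributed environment, consider deploying multiple instances of your application to handle Change Streams from multiple shards or replica sets. Keep in mind that every open stream receives all events matching its pipeline, so a load balancer alone does not split the work; give each instance its own slice of the events, for example with a different $match filter per instance.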
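One way to divide events between application instances is to have each instance skip the events it does not own. The sketch below assumes a fixed number of workers, each knowing its own index, and assigns every document to one worker by hashing its key. `ownerOf` and `shouldProcess` are hypothetical helpers, and the hash is a simple illustrative one, not something provided by MongoDB. Each instance still receives every event and filters on the client side.

```javascript
// Hypothetical helper: map a document key to one of `workerCount` workers.
function ownerOf(change, workerCount) {
  const key = String(change.documentKey._id);
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    // Simple 32-bit rolling hash; any stable hash function would do.
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash % workerCount;
}

// An instance processes an event only if it owns the document's key.
function shouldProcess(change, workerIndex, workerCount) {
  return ownerOf(change, workerCount) === workerIndex;
}

// Usage (assumes WORKER_INDEX and WORKER_COUNT come from configuration):
//   changeStream.on("change", (change) => {
//     if (shouldProcess(change, WORKER_INDEX, WORKER_COUNT)) handle(change);
//   });
```

Because the assignment depends only on the document key, all events for one order go to the same instance and keep their relative order there.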
Change Streams can occasionally encounter errors, such as network failures or replica set failovers. One way to handle them is to save the resume token from each event and reopen the stream from that token after an error:
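let resumeToken = null;

function startStream() {
  // Resume after the last seen event if a token has been saved
  const options = resumeToken ? { resumeAfter: resumeToken } : {};
  const changeStream = collection.watch([], options);

  changeStream.on("change", (change) => {
    console.log("Change detected:", change);
    resumeToken = change._id; // Save the resume token
  });

  changeStream.on("error", (error) => {
    console.error("Error detected:", error);
    changeStream.close().catch(() => {});
    // Reopen the stream from the saved resume token after a short pause
    setTimeout(startStream, 1000);
  });
}

startStream();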
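The same resume idea can be written as a loop that pulls events with next() instead of using event listeners, which makes it easier to add a retry limit. This is a minimal sketch under stated assumptions: `watchWithResume`, `onChange`, `maxRetries`, and `delayMs` are hypothetical names, and the token is kept in memory only, so a real application would likely persist it. The token advances only after the handler succeeds, which means an event may be delivered again after a failure.

```javascript
// Sketch: consume a change stream and reopen it from the last resume token on error.
async function watchWithResume(collection, onChange, { maxRetries = 3, delayMs = 1000 } = {}) {
  let resumeToken = null;
  let retries = 0;
  while (true) {
    const options = resumeToken ? { resumeAfter: resumeToken } : {};
    const changeStream = collection.watch([], options);
    try {
      while (true) {
        const change = await changeStream.next(); // waits for the next event
        await onChange(change);
        resumeToken = change._id; // advance only after successful handling
        retries = 0; // a processed event resets the retry budget
      }
    } catch (error) {
      retries += 1;
      if (retries > maxRetries) throw error; // give up after repeated failures
      console.error(`Stream error (retry ${retries}):`, error.message);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    } finally {
      // Always release the old cursor before opening a new one.
      await changeStream.close().catch(() => {});
    }
  }
}

// Usage (assumes `collection` from the earlier example):
//   watchWithResume(collection, (change) => console.log("Change detected:", change))
//     .catch(console.error);
```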
MongoDB’s Change Streams provide a powerful way to build real-time applications that react to data changes immediately. They simplify the development of features such as real-time notifications, live dashboards, and collaborative tools by providing an event-driven approach directly integrated into MongoDB. Happy Coding!❤️