Change Data Capture service for MongoDB

Alejandro Cora González
3 min read · Dec 20, 2023

There are many situations where it is necessary to react in real or near real time to changes that occur in a data source. In such cases, a Change Data Capture (CDC) service can be crucial. In this post, let’s build that mechanism for MongoDB.

Change Data Capture

Change Data Capture, also known as CDC, refers to the ability to track and capture changes in a data source such as a database, so that a target service can replicate or process them to achieve data replication, integrity, and consistency across the ecosystem.

There are several use cases where the implementation of a Change Data Capture service is required:

  • Event tracking.
  • Business Intelligence dashboards.
  • Real-time applications.
  • Data replication.

Let’s see how we could implement a Change Data Capture service for the MongoDB engine…

MongoDB Change Streams

A change stream in MongoDB is a stream of events that describe, in real time, the changes that occur on a whole deployment, a single database, or a specific collection.
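To make the shape of such an event concrete, here is a minimal sketch of a handler that summarizes one. The sample event is hand-written for illustration (the `shop.orders` namespace, the token, and the values are hypothetical), and `describe_event` is a helper name made up for this post. The field names `operationType`, `ns`, `documentKey`, `fullDocument`, and `updateDescription` are the ones MongoDB uses in change events.

```python
def describe_event(event):
    """Return a one-line, human-readable summary of a change event."""
    operation = event.get("operationType")
    namespace = event.get("ns", {})
    target = f"{namespace.get('db')}.{namespace.get('coll')}"
    document_id = event.get("documentKey", {}).get("_id")

    if operation in ("insert", "replace"):
        # Inserts and replaces carry the complete new document.
        fields = sorted(event.get("fullDocument", {}))
        return f"{operation} on {target}, _id={document_id}, fields={fields}"
    if operation == "update":
        # Updates carry only the delta by default.
        changes = event.get("updateDescription", {})
        updated = sorted(changes.get("updatedFields", {}))
        return f"update on {target}, _id={document_id}, updated={updated}"
    if operation == "delete":
        return f"delete on {target}, _id={document_id}"
    # Other event types (drop, invalidate, ...) are summarized generically.
    return f"{operation} on {target}"


# Hand-written sample, not captured from a real server.
sample_event = {
    "_id": {"_data": "hypothetical-resume-token"},
    "operationType": "update",
    "ns": {"db": "shop", "coll": "orders"},
    "documentKey": {"_id": 42},
    "updateDescription": {"updatedFields": {"status": "paid"}, "removedFields": []},
}

print(describe_event(sample_event))
```

The `_id` of every event is its resume token, which is what makes it possible to continue a stream from a known position later on.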

Change streams are not available in standalone mode; your deployment must be a replica set or a sharded cluster.
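If you are not sure how a server was started, its reply to the `hello` command gives a good hint. The sketch below assumes that replica set members report a `setName` field and that a `mongos` router reports `msg: "isdbgrid"`. The helper names and the connection URI are illustrative, and `check_server` needs pymongo plus a reachable server.

```python
def supports_change_streams(hello_response):
    """Guess from a `hello` reply whether the server can serve change streams."""
    is_replica_set_member = "setName" in hello_response
    is_mongos = hello_response.get("msg") == "isdbgrid"
    return is_replica_set_member or is_mongos


def check_server(uri="mongodb://localhost:27017/?directConnection=true"):
    """Ask a live server; requires pymongo and a running MongoDB instance."""
    # Imported lazily so the pure helper above has no dependencies.
    from pymongo import MongoClient

    with MongoClient(uri) as client:
        return supports_change_streams(client.admin.command("hello"))
```

A standalone `mongod` returns neither field, so `supports_change_streams` returns `False` for it.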

For more information: https://www.mongodb.com/basics/change-streams

MongoDB Replica Set

In MongoDB, a replica set is a group of processes that maintain the same data set, providing redundancy and high availability. For more information about replication, see the MongoDB documentation.

For testing purposes, we can create a local Replica Set using Docker containers. Below, you will find the commands to use:

Create the network and execute the containers

docker network create mongoCluster

docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3

Initialize the replica set

docker exec -it mongo1 mongosh --eval "rs.initiate({
  _id: \"myReplicaSet\",
  members: [
    {_id: 0, host: \"mongo1\"},
    {_id: 1, host: \"mongo2\"},
    {_id: 2, host: \"mongo3\"}
  ]
})"

Check containers and replica set status

docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"
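The same status check can be done from Python, which is handy when a script has to wait for the election to finish before it starts. This is a minimal sketch: `find_primary` and `wait_for_primary` are hypothetical helper names, and it assumes the replica set has already been initiated, because `replSetGetStatus` fails on a node that has not been.

```python
import time


def find_primary(status):
    """Return the name of the PRIMARY member in a replSetGetStatus reply, or None."""
    for member in status.get("members", []):
        if member.get("stateStr") == "PRIMARY":
            return member.get("name")
    return None


def wait_for_primary(client, timeout=30.0, interval=1.0):
    """Poll replSetGetStatus through a pymongo client until a primary is elected."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        primary = find_primary(client.admin.command("replSetGetStatus"))
        if primary is not None:
            return primary
        time.sleep(interval)
    raise TimeoutError(f"No primary elected within {timeout} seconds.")
```

With the containers above it could be called as `wait_for_primary(MongoClient("localhost", 27017, directConnection=True))`.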

Stop the containers

docker stop mongo1 mongo2 mongo3

Creating the service

Now that we have the MongoDB replica set created and running, let’s write a simple Python script to process the events that will be coming through the change stream.

# -*- coding: utf-8 -*-

from pprint import pprint
from time import sleep

from pymongo import MongoClient
from pymongo.errors import PyMongoError


def main():
    # directConnection=True makes the driver talk only to this node instead of
    # discovering the members by their Docker hostnames (mongo1, mongo2, mongo3).
    client = MongoClient(
        host="localhost",
        port=27017,
        directConnection=True,
        username=None,
        password=None)

    # Token of the last processed event, used to resume after an error.
    resume_token = None

    while True:
        try:
            print("Connecting to MongoDB server and waiting for change stream events...")

            # You could define the operations you want to process...
            # pipeline = [{'$match': {'operationType': {'$in': ['insert', 'replace']}}}]

            with client.watch(pipeline=None, resume_after=resume_token) as stream:
                for event in stream:
                    print(
                        f"Received event: {event.get('operationType')} "
                        f"for document: {event.get('documentKey', {}).get('_id')}."
                    )

                    pprint(event)

                    # Remember where we are so a reconnect does not skip events.
                    resume_token = stream.resume_token

        except PyMongoError as error:
            print(
                "An error has been raised. "
                "Trying to resume after 30 seconds. "
                f"Error: {error}."
            )

            sleep(30)


if __name__ == '__main__':
    main()
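The script mentions two optional ideas: filtering by operation type and resuming the stream. Here is a minimal sketch of both as small helpers. The function names and the token file are hypothetical, and the token is assumed to be JSON-serializable (a small document with a string `_data` field); if that does not hold in your setup, `bson.json_util` would be the safer serializer.

```python
import json
import os
import tempfile


def build_pipeline(operations):
    """Build an aggregation pipeline that keeps only the given operation types."""
    # A single $match with $in; two chained $match stages on different
    # values would require both at once and therefore match nothing.
    return [{"$match": {"operationType": {"$in": list(operations)}}}]


def save_resume_token(path, token):
    """Write the token to a temp file, then rename it over the target path."""
    directory = os.path.dirname(os.path.abspath(path))
    with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as handle:
        json.dump(token, handle)
        temp_name = handle.name
    # os.replace swaps the file in one step, so readers never see a partial write.
    os.replace(temp_name, path)


def load_resume_token(path):
    """Return the stored token, or None when nothing has been saved yet."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return None
```

In the service, these would plug into `client.watch(pipeline=build_pipeline(["insert", "replace"]), resume_after=load_resume_token("resume_token.json"))`, with `save_resume_token("resume_token.json", stream.resume_token)` called after each processed event. Saving after processing means an event may be handled twice after a crash, but never skipped.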

Testing the service
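One way to try the service is to keep the script running in one terminal and produce a few writes from another. The sketch below is only an illustration of that idea: the `cdc_demo` database and `events` collection are made-up names, and the direct connection assumes that `mongo1` is currently the primary, since writes are rejected by secondaries.

```python
def generate_sample_events(collection):
    """Run one insert, one update and one delete on the given collection."""
    inserted = collection.insert_one({"name": "test", "value": 1})
    collection.update_one({"_id": inserted.inserted_id}, {"$set": {"value": 2}})
    collection.delete_one({"_id": inserted.inserted_id})
    return inserted.inserted_id


def run(uri="mongodb://localhost:27017/?directConnection=true"):
    """Connect to the local replica set member and generate the three writes."""
    # Imported lazily; requires pymongo and the containers started earlier.
    from pymongo import MongoClient

    with MongoClient(uri) as client:
        return generate_sample_events(client["cdc_demo"]["events"])
```

Calling `run()` should make the service print three events in order: an `insert`, an `update`, and a `delete`, all for the same document `_id`.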

That’s all for today. It was a simple script/service, but it could be used as the starting point when the requirement is to react in real-time to changes on a MongoDB database or collection…

Bye…

Alejandro Cora González

Solutions Architect | Python Developer | Serverless Advocate