+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 484 of 541

๐Ÿ“˜ NoSQL with Python: MongoDB

Master NoSQL with Python: MongoDB with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿš€Intermediate
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand NoSQL database fundamentals ๐ŸŽฏ
  • Apply MongoDB in real projects ๐Ÿ—๏ธ
  • Debug common MongoDB issues ๐Ÿ›
  • Write clean, Pythonic database code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on NoSQL with Python and MongoDB! ๐ŸŽ‰ In this guide, weโ€™ll explore how to work with MongoDB, one of the most popular NoSQL databases in the world.

Youโ€™ll discover how MongoDB can transform your Python development experience when dealing with flexible, document-based data. Whether youโ€™re building web applications ๐ŸŒ, APIs ๐Ÿ–ฅ๏ธ, or data analytics pipelines ๐Ÿ“Š, understanding MongoDB is essential for handling modern data requirements.

By the end of this tutorial, youโ€™ll feel confident using MongoDB in your own Python projects! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding NoSQL and MongoDB

๐Ÿค” What is NoSQL?

NoSQL is like a flexible filing cabinet ๐Ÿ—„๏ธ where you can store documents of different shapes and sizes, unlike traditional SQL databases which are more like strict spreadsheets. Think of it as the difference between organizing your notes in a binder (where every page can be unique) versus filling out identical forms.

In Python terms, NoSQL databases store data in formats similar to Python dictionaries and lists. This means you can:

  • โœจ Store complex, nested data structures easily
  • ๐Ÿš€ Scale horizontally across multiple servers
  • ๐Ÿ›ก๏ธ Handle varied data without rigid schemas

๐Ÿ’ก Why Use MongoDB?

Hereโ€™s why developers love MongoDB:

  1. Flexible Schema ๐Ÿ”“: Store documents with different structures
  2. JSON-like Documents ๐Ÿ“‹: Work with familiar data formats
  3. Powerful Queries ๐Ÿ”: Rich query language for complex operations
  4. Scalability ๐Ÿ“ˆ: Built for distributed systems
  5. Python Integration ๐Ÿ: Excellent PyMongo driver

Real-world example: Imagine building an e-commerce platform ๐Ÿ›’. With MongoDB, you can store products with varying attributes (books have authors, electronics have specifications) without creating complex table relationships.

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Setting Up MongoDB with Python

Letโ€™s start by installing and connecting to MongoDB:

# ๐Ÿ‘‹ First, install PyMongo
# pip install pymongo

from pymongo import MongoClient
from datetime import datetime

# ๐ŸŽจ Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
print("โœ… Connected to MongoDB!")

# ๐Ÿ“ฆ Create or access a database
db = client['my_store']

# ๐Ÿ“‹ Create or access a collection (like a table)
products = db['products']

๐Ÿ’ก Explanation: MongoDB uses databases to group collections, and collections to store documents. Itโ€™s like having folders (databases) containing files (collections) with pages (documents).

๐ŸŽฏ Basic CRUD Operations

Here are the essential MongoDB operations:

# ๐Ÿ—๏ธ CREATE - Insert documents
def add_product(name, price, category, tags=[]):
    # ๐Ÿ“ Create a product document
    product = {
        "name": name,
        "price": price,
        "category": category,
        "tags": tags,
        "created_at": datetime.now(),
        "in_stock": True,
        "emoji": "๐Ÿ“ฆ"  # Every product needs an emoji!
    }
    
    # โž• Insert into collection
    result = products.insert_one(product)
    print(f"โœ… Added product with ID: {result.inserted_id}")
    return result.inserted_id

# ๐Ÿ” READ - Find documents
def find_products_by_category(category):
    # ๐ŸŽฏ Query documents
    cursor = products.find({"category": category})
    
    print(f"๐Ÿ›๏ธ Products in {category}:")
    for product in cursor:
        print(f"  {product['emoji']} {product['name']} - ${product['price']}")

# ๐Ÿ”„ UPDATE - Modify documents
def update_price(product_id, new_price):
    # ๐Ÿ“ Update a document
    result = products.update_one(
        {"_id": product_id},
        {"$set": {"price": new_price, "updated_at": datetime.now()}}
    )
    print(f"โœ๏ธ Updated {result.modified_count} product(s)")

# ๐Ÿ—‘๏ธ DELETE - Remove documents
def remove_out_of_stock():
    # ๐ŸŽฏ Delete multiple documents
    result = products.delete_many({"in_stock": False})
    print(f"๐Ÿ—‘๏ธ Removed {result.deleted_count} out-of-stock products")

๐Ÿ’ก Practical Examples

๐Ÿ›’ Example 1: E-Commerce Product Catalog

Letโ€™s build a real product catalog system:

# ๐Ÿ›๏ธ Advanced product management system
class ProductCatalog:
    def __init__(self, db_name='ecommerce'):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client[db_name]
        self.products = self.db['products']
        self.categories = self.db['categories']
        print("๐Ÿช Product Catalog initialized!")
    
    # โž• Add product with validation
    def add_product(self, product_data):
        # ๐Ÿ›ก๏ธ Validate required fields
        required = ['name', 'price', 'category']
        for field in required:
            if field not in product_data:
                print(f"โŒ Missing required field: {field}")
                return None
        
        # ๐ŸŽจ Enhance product data
        product_data.update({
            'created_at': datetime.now(),
            'views': 0,
            'ratings': [],
            'emoji': self._get_category_emoji(product_data['category'])
        })
        
        result = self.products.insert_one(product_data)
        print(f"โœ… Added: {product_data['emoji']} {product_data['name']}")
        return result.inserted_id
    
    # ๐Ÿ” Advanced search with filters
    def search_products(self, filters=None, sort_by='price', limit=10):
        # ๐ŸŽฏ Build query
        query = filters or {}
        
        # ๐Ÿ“Š Execute search with sorting
        cursor = self.products.find(query).sort(sort_by).limit(limit)
        
        results = []
        for product in cursor:
            results.append({
                'id': str(product['_id']),
                'name': product['name'],
                'price': product['price'],
                'emoji': product.get('emoji', '๐Ÿ“ฆ')
            })
        
        return results
    
    # ๐Ÿ’ฐ Price range queries
    def find_by_price_range(self, min_price, max_price):
        # ๐Ÿ” Range query
        query = {
            "price": {
                "$gte": min_price,  # Greater than or equal
                "$lte": max_price   # Less than or equal
            }
        }
        
        print(f"๐Ÿ’ฐ Products between ${min_price} and ${max_price}:")
        for product in self.products.find(query):
            print(f"  {product['emoji']} {product['name']} - ${product['price']}")
    
    # ๐Ÿท๏ธ Tag-based search
    def find_by_tags(self, tags):
        # ๐ŸŽฏ Find products with ANY of the tags
        query = {"tags": {"$in": tags}}
        
        results = list(self.products.find(query))
        print(f"๐Ÿท๏ธ Found {len(results)} products with tags {tags}")
        return results
    
    # ๐Ÿ“Š Analytics
    def get_category_stats(self):
        # ๐ŸŽจ Aggregation pipeline
        pipeline = [
            {
                "$group": {
                    "_id": "$category",
                    "count": {"$sum": 1},
                    "avg_price": {"$avg": "$price"},
                    "total_value": {"$sum": "$price"}
                }
            },
            {
                "$sort": {"count": -1}
            }
        ]
        
        print("๐Ÿ“Š Category Statistics:")
        for stat in self.products.aggregate(pipeline):
            print(f"  ๐Ÿ“ {stat['_id']}: {stat['count']} products, "
                  f"avg ${stat['avg_price']:.2f}")
    
    # ๐ŸŽจ Helper method
    def _get_category_emoji(self, category):
        emojis = {
            'electronics': '๐Ÿ’ป',
            'books': '๐Ÿ“š',
            'clothing': '๐Ÿ‘•',
            'food': '๐Ÿ•',
            'toys': '๐ŸŽฎ',
            'default': '๐Ÿ“ฆ'
        }
        return emojis.get(category.lower(), emojis['default'])

# ๐ŸŽฎ Let's use it!
catalog = ProductCatalog()

# Add some products
catalog.add_product({
    'name': 'Python Cookbook',
    'price': 45.99,
    'category': 'books',
    'tags': ['programming', 'python', 'reference'],
    'author': 'David Beazley'
})

catalog.add_product({
    'name': 'Mechanical Keyboard',
    'price': 129.99,
    'category': 'electronics',
    'tags': ['gaming', 'rgb', 'mechanical'],
    'features': ['RGB backlight', 'Cherry MX switches']
})

๐ŸŽฏ Try it yourself: Add a method to handle product reviews and ratings!

๐ŸŽฎ Example 2: Real-time Analytics Dashboard

Letโ€™s create an analytics system:

# ๐Ÿ“Š Analytics and monitoring system
class AnalyticsDashboard:
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client['analytics']
        self.events = self.db['events']
        self.sessions = self.db['sessions']
        print("๐Ÿ“Š Analytics Dashboard ready!")
    
    # ๐Ÿ“ Track user events
    def track_event(self, user_id, event_type, metadata=None):
        event = {
            'user_id': user_id,
            'type': event_type,
            'timestamp': datetime.now(),
            'metadata': metadata or {},
            'emoji': self._get_event_emoji(event_type)
        }
        
        self.events.insert_one(event)
        print(f"{event['emoji']} Event tracked: {event_type}")
    
    # ๐Ÿ“ˆ Real-time metrics
    def get_realtime_stats(self, minutes=5):
        # ๐Ÿ• Calculate time window
        from datetime import timedelta
        cutoff_time = datetime.now() - timedelta(minutes=minutes)
        
        # ๐ŸŽฏ Aggregation for real-time stats
        pipeline = [
            {
                "$match": {
                    "timestamp": {"$gte": cutoff_time}
                }
            },
            {
                "$group": {
                    "_id": "$type",
                    "count": {"$sum": 1},
                    "users": {"$addToSet": "$user_id"}
                }
            },
            {
                "$project": {
                    "event_type": "$_id",
                    "count": 1,
                    "unique_users": {"$size": "$users"}
                }
            }
        ]
        
        print(f"๐Ÿ“Š Real-time stats (last {minutes} minutes):")
        for stat in self.events.aggregate(pipeline):
            print(f"  {stat['event_type']}: {stat['count']} events, "
                  f"{stat['unique_users']} unique users")
    
    # ๐ŸŽฏ User journey tracking
    def track_user_journey(self, user_id):
        # ๐Ÿ“‹ Get user's event history
        user_events = list(
            self.events.find(
                {"user_id": user_id}
            ).sort("timestamp", 1)
        )
        
        print(f"๐Ÿšถ User {user_id} journey:")
        for event in user_events:
            print(f"  {event['emoji']} {event['timestamp'].strftime('%H:%M:%S')} - "
                  f"{event['type']}")
    
    # ๐Ÿ’น Trending analysis
    def get_trending_events(self, hours=24):
        # ๐Ÿ• Time-based trending
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(hours=hours)
        
        pipeline = [
            {"$match": {"timestamp": {"$gte": cutoff}}},
            {
                "$group": {
                    "_id": {
                        "type": "$type",
                        "hour": {"$hour": "$timestamp"}
                    },
                    "count": {"$sum": 1}
                }
            },
            {"$sort": {"count": -1}},
            {"$limit": 10}
        ]
        
        print(f"๐Ÿ”ฅ Trending events (last {hours} hours):")
        for trend in self.events.aggregate(pipeline):
            print(f"  ๐Ÿ“ˆ {trend['_id']['type']} at hour {trend['_id']['hour']}: "
                  f"{trend['count']} events")
    
    # ๐ŸŽจ Helper
    def _get_event_emoji(self, event_type):
        emojis = {
            'page_view': '๐Ÿ‘€',
            'click': '๐Ÿ‘†',
            'purchase': '๐Ÿ’ฐ',
            'signup': 'โœจ',
            'login': '๐Ÿ”‘',
            'share': '๐Ÿ”—',
            'default': '๐Ÿ“Œ'
        }
        return emojis.get(event_type, emojis['default'])

# ๐ŸŽฎ Demo usage
dashboard = AnalyticsDashboard()

# Track some events
dashboard.track_event('user123', 'page_view', {'page': '/home'})
dashboard.track_event('user123', 'click', {'button': 'buy_now'})
dashboard.track_event('user456', 'purchase', {'amount': 99.99})

# View analytics
dashboard.get_realtime_stats()
dashboard.track_user_journey('user123')

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Advanced Indexing and Performance

When youโ€™re ready to level up, optimize your queries:

# ๐ŸŽฏ Index management for performance
class IndexOptimizer:
    def __init__(self, collection):
        self.collection = collection
        
    # ๐Ÿš€ Create indexes for faster queries
    def optimize_for_search(self):
        # ๐Ÿ“ Single field index
        self.collection.create_index("name")
        print("โœ… Created index on 'name' field")
        
        # ๐Ÿ” Compound index for complex queries
        self.collection.create_index([
            ("category", 1),    # 1 for ascending
            ("price", -1)       # -1 for descending
        ])
        print("โœ… Created compound index on category+price")
        
        # ๐Ÿ“ Text search index
        self.collection.create_index([("name", "text"), ("tags", "text")])
        print("โœ… Created text search index")
    
    # ๐Ÿ” Use text search
    def search_text(self, query):
        # ๐ŸŽฏ MongoDB text search
        results = self.collection.find(
            {"$text": {"$search": query}},
            {"score": {"$meta": "textScore"}}
        ).sort([("score", {"$meta": "textScore"})])
        
        return list(results)

# ๐Ÿช„ Aggregation framework mastery
def advanced_aggregations(collection):
    # ๐ŸŽจ Complex pipeline with multiple stages
    pipeline = [
        # Stage 1: Match active products
        {"$match": {"in_stock": True}},
        
        # Stage 2: Lookup related data (like JOIN)
        {
            "$lookup": {
                "from": "reviews",
                "localField": "_id",
                "foreignField": "product_id",
                "as": "reviews"
            }
        },
        
        # Stage 3: Add computed fields
        {
            "$addFields": {
                "avg_rating": {"$avg": "$reviews.rating"},
                "review_count": {"$size": "$reviews"}
            }
        },
        
        # Stage 4: Group by category
        {
            "$group": {
                "_id": "$category",
                "products": {"$push": "$$ROOT"},
                "avg_category_rating": {"$avg": "$avg_rating"}
            }
        },
        
        # Stage 5: Sort by average rating
        {"$sort": {"avg_category_rating": -1}}
    ]
    
    return list(collection.aggregate(pipeline))

๐Ÿ—๏ธ Transactions and Data Consistency

For critical operations requiring ACID properties:

# ๐Ÿ’ฐ Transaction example for e-commerce
def process_order(client, user_id, items):
    # ๐Ÿ”’ Start a session for transaction
    with client.start_session() as session:
        try:
            # ๐ŸŽฏ Start transaction
            with session.start_transaction():
                db = client['ecommerce']
                
                # 1๏ธโƒฃ Deduct from inventory
                for item in items:
                    result = db.products.update_one(
                        {
                            "_id": item['product_id'],
                            "stock": {"$gte": item['quantity']}
                        },
                        {
                            "$inc": {"stock": -item['quantity']}
                        },
                        session=session
                    )
                    
                    if result.modified_count == 0:
                        raise Exception(f"โŒ Insufficient stock for {item['product_id']}")
                
                # 2๏ธโƒฃ Create order
                order = {
                    "user_id": user_id,
                    "items": items,
                    "total": sum(item['price'] * item['quantity'] for item in items),
                    "status": "confirmed",
                    "created_at": datetime.now()
                }
                db.orders.insert_one(order, session=session)
                
                # 3๏ธโƒฃ Update user's order history
                db.users.update_one(
                    {"_id": user_id},
                    {"$push": {"orders": order['_id']}},
                    session=session
                )
                
                print("โœ… Order processed successfully!")
                return order
                
        except Exception as e:
            print(f"โŒ Transaction failed: {e}")
            raise

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Not Using Indexes

# โŒ Wrong way - slow queries without indexes
def find_products_slow(name_pattern):
    # This will scan ALL documents!
    return products.find({"name": {"$regex": name_pattern}})

# โœ… Correct way - create index first
products.create_index("name")
def find_products_fast(name_pattern):
    # Now MongoDB uses the index!
    return products.find({"name": {"$regex": name_pattern}})

๐Ÿคฏ Pitfall 2: Unbounded Array Growth

# โŒ Dangerous - arrays can grow infinitely
def add_comment_bad(post_id, comment):
    posts.update_one(
        {"_id": post_id},
        {"$push": {"comments": comment}}  # No limit!
    )

# โœ… Safe - use separate collection for comments
def add_comment_good(post_id, comment):
    comment_doc = {
        "post_id": post_id,
        "text": comment,
        "created_at": datetime.now()
    }
    comments.insert_one(comment_doc)  # Scalable approach

๐Ÿ’ฅ Pitfall 3: Not Handling Connection Errors

# โŒ No error handling
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # What if MongoDB is down?

# โœ… Proper error handling
from pymongo.errors import ConnectionFailure

def get_mongodb_connection():
    try:
        client = MongoClient('mongodb://localhost:27017/', 
                           serverSelectionTimeoutMS=3000)
        # Force connection to verify it works
        client.admin.command('ping')
        print("โœ… Connected to MongoDB!")
        return client
    except ConnectionFailure:
        print("โŒ Failed to connect to MongoDB")
        return None

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Design for Your Queries: Structure documents based on how youโ€™ll query them
  2. ๐Ÿ“ Use Indexes Wisely: Index fields you query frequently
  3. ๐Ÿ”’ Validate Data: Use schema validation for critical collections
  4. ๐Ÿ“Š Monitor Performance: Use explain() to understand query performance
  5. ๐Ÿ›ก๏ธ Handle Errors Gracefully: Always catch and handle exceptions
  6. ๐Ÿ’พ Backup Regularly: Implement backup strategies
  7. ๐Ÿš€ Use Connection Pooling: Reuse connections for better performance

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Blog Platform Backend

Create a complete blog platform with MongoDB:

๐Ÿ“‹ Requirements:

  • โœ… User profiles with authentication
  • ๐Ÿ“ Blog posts with categories and tags
  • ๐Ÿ’ฌ Comments system with nested replies
  • โค๏ธ Like/bookmark functionality
  • ๐Ÿ” Full-text search across posts
  • ๐Ÿ“Š Analytics for post views and engagement

๐Ÿš€ Bonus Points:

  • Implement real-time notifications
  • Add recommendation system
  • Create trending posts algorithm

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Complete blog platform backend
from pymongo import MongoClient, ASCENDING, TEXT
from datetime import datetime, timedelta
import hashlib

class BlogPlatform:
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client['blog_platform']
        
        # ๐Ÿ“‹ Collections
        self.users = self.db['users']
        self.posts = self.db['posts']
        self.comments = self.db['comments']
        self.interactions = self.db['interactions']
        
        # ๐Ÿš€ Create indexes
        self._setup_indexes()
        print("๐Ÿ“ Blog Platform initialized!")
    
    def _setup_indexes(self):
        # ๐Ÿ‘ค User indexes
        self.users.create_index("email", unique=True)
        self.users.create_index("username", unique=True)
        
        # ๐Ÿ“ Post indexes
        self.posts.create_index([("title", TEXT), ("content", TEXT), ("tags", TEXT)])
        self.posts.create_index([("author_id", ASCENDING), ("created_at", -1)])
        self.posts.create_index("tags")
        
        # ๐Ÿ’ฌ Comment indexes
        self.comments.create_index([("post_id", ASCENDING), ("created_at", ASCENDING)])
        
        # โค๏ธ Interaction indexes
        self.interactions.create_index([("user_id", ASCENDING), ("post_id", ASCENDING)], unique=True)
    
    # ๐Ÿ‘ค User management
    def create_user(self, username, email, password):
        # ๐Ÿ”’ Hash password
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        
        user = {
            "username": username,
            "email": email,
            "password_hash": password_hash,
            "created_at": datetime.now(),
            "bio": "",
            "avatar": "๐Ÿ‘ค",
            "followers": [],
            "following": []
        }
        
        try:
            result = self.users.insert_one(user)
            print(f"โœ… User {username} created!")
            return result.inserted_id
        except Exception as e:
            print(f"โŒ Error creating user: {e}")
            return None
    
    # ๐Ÿ“ Post management
    def create_post(self, author_id, title, content, tags=[], category="general"):
        post = {
            "author_id": author_id,
            "title": title,
            "content": content,
            "tags": tags,
            "category": category,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
            "views": 0,
            "likes": 0,
            "status": "published",
            "emoji": self._get_category_emoji(category)
        }
        
        result = self.posts.insert_one(post)
        print(f"โœ… Post created: {post['emoji']} {title}")
        return result.inserted_id
    
    # ๐Ÿ’ฌ Comment system
    def add_comment(self, post_id, user_id, content, parent_id=None):
        comment = {
            "post_id": post_id,
            "user_id": user_id,
            "content": content,
            "parent_id": parent_id,  # For nested replies
            "created_at": datetime.now(),
            "likes": 0,
            "emoji": "๐Ÿ’ฌ"
        }
        
        result = self.comments.insert_one(comment)
        
        # ๐Ÿ“Š Update post comment count
        self.posts.update_one(
            {"_id": post_id},
            {"$inc": {"comment_count": 1}}
        )
        
        return result.inserted_id
    
    # โค๏ธ Like/bookmark functionality
    def toggle_like(self, user_id, post_id):
        # ๐Ÿ” Check if already liked
        existing = self.interactions.find_one({
            "user_id": user_id,
            "post_id": post_id,
            "type": "like"
        })
        
        if existing:
            # ๐Ÿ‘Ž Unlike
            self.interactions.delete_one({"_id": existing["_id"]})
            self.posts.update_one({"_id": post_id}, {"$inc": {"likes": -1}})
            print("๐Ÿ’” Post unliked")
            return False
        else:
            # โค๏ธ Like
            self.interactions.insert_one({
                "user_id": user_id,
                "post_id": post_id,
                "type": "like",
                "created_at": datetime.now()
            })
            self.posts.update_one({"_id": post_id}, {"$inc": {"likes": 1}})
            print("โค๏ธ Post liked!")
            return True
    
    # ๐Ÿ” Search functionality
    def search_posts(self, query, limit=10):
        # ๐Ÿ“ Full-text search
        results = self.posts.find(
            {"$text": {"$search": query}},
            {"score": {"$meta": "textScore"}}
        ).sort([("score", {"$meta": "textScore"})]).limit(limit)
        
        posts = []
        for post in results:
            posts.append({
                "id": str(post["_id"]),
                "title": post["title"],
                "excerpt": post["content"][:100] + "...",
                "score": post["score"],
                "emoji": post.get("emoji", "๐Ÿ“„")
            })
        
        return posts
    
    # ๐Ÿ“Š Analytics
    def get_trending_posts(self, days=7):
        # ๐Ÿ• Calculate trending based on recent engagement
        cutoff = datetime.now() - timedelta(days=days)
        
        pipeline = [
            {"$match": {"created_at": {"$gte": cutoff}}},
            {
                "$addFields": {
                    "engagement_score": {
                        "$add": [
                            "$views",
                            {"$multiply": ["$likes", 2]},
                            {"$multiply": ["$comment_count", 3]}
                        ]
                    }
                }
            },
            {"$sort": {"engagement_score": -1}},
            {"$limit": 10}
        ]
        
        trending = list(self.posts.aggregate(pipeline))
        print(f"๐Ÿ”ฅ Top {len(trending)} trending posts:")
        for post in trending:
            print(f"  {post['emoji']} {post['title']} - Score: {post['engagement_score']}")
        
        return trending
    
    # ๐ŸŽจ Helper
    def _get_category_emoji(self, category):
        emojis = {
            'tech': '๐Ÿ’ป',
            'travel': 'โœˆ๏ธ',
            'food': '๐Ÿ•',
            'fitness': '๐Ÿ’ช',
            'music': '๐ŸŽต',
            'general': '๐Ÿ“„'
        }
        return emojis.get(category.lower(), '๐Ÿ“„')

# ๐ŸŽฎ Test the platform
blog = BlogPlatform()

# Create users
alice_id = blog.create_user("alice", "[email protected]", "password123")
bob_id = blog.create_user("bob", "[email protected]", "password456")

# Create posts
post_id = blog.create_post(
    alice_id,
    "Getting Started with MongoDB",
    "MongoDB is an amazing NoSQL database that...",
    ["mongodb", "python", "tutorial"],
    "tech"
)

# Add interaction
blog.toggle_like(bob_id, post_id)
blog.add_comment(post_id, bob_id, "Great tutorial! Very helpful ๐ŸŽ‰")

# Get trending
blog.get_trending_posts()

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Connect to MongoDB from Python with confidence ๐Ÿ’ช
  • โœ… Perform CRUD operations on document collections ๐Ÿ›ก๏ธ
  • โœ… Design flexible schemas for real applications ๐ŸŽฏ
  • โœ… Use advanced features like aggregation and indexing ๐Ÿ›
  • โœ… Build scalable applications with MongoDB! ๐Ÿš€

Remember: MongoDB gives you the flexibility to evolve your data model as your application grows. Itโ€™s here to help you build amazing things! ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered MongoDB with Python!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the blog platform exercise above
  2. ๐Ÿ—๏ธ Build a real project using MongoDB (maybe a chat app or inventory system)
  3. ๐Ÿ“š Move on to our next tutorial: Advanced MongoDB Features
  4. ๐ŸŒŸ Explore MongoDB Atlas for cloud deployment

Remember: Every database expert was once a beginner. Keep coding, keep learning, and most importantly, have fun with your data! ๐Ÿš€


Happy coding! ๐ŸŽ‰๐Ÿš€โœจ