+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 408 of 541

๐Ÿš€ Context Managers: Advanced Usage

Master advanced context manager techniques in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
35 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand the concept fundamentals ๐ŸŽฏ
  • Apply the concept in real projects ๐Ÿ—๏ธ
  • Debug common issues ๐Ÿ›
  • Write clean, Pythonic code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on advanced context managers! ๐ŸŽ‰ In this guide, weโ€™ll explore powerful techniques that go beyond the basic with statement to create sophisticated resource management solutions.

Youโ€™ll discover how context managers can transform your Python development experience. Whether youโ€™re building web applications ๐ŸŒ, managing database connections ๐Ÿ—„๏ธ, or creating thread-safe operations ๐Ÿ”’, understanding advanced context managers is essential for writing robust, maintainable code.

By the end of this tutorial, youโ€™ll feel confident creating custom context managers that handle complex scenarios like nested resources, async operations, and error recovery! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding Advanced Context Managers

๐Ÿค” What Makes a Context Manager โ€œAdvancedโ€?

Advanced context managers are like professional-grade Swiss Army knives ๐Ÿ”ง. Think of them as sophisticated resource guardians that not only open and close resources but also handle complex scenarios like multiple resources, error recovery, and state management.

In Python terms, advanced context managers go beyond simple __enter__ and __exit__ methods. This means you can:

  • โœจ Manage multiple resources atomically
  • ๐Ÿš€ Create reusable context manager decorators
  • ๐Ÿ›ก๏ธ Implement sophisticated error handling
  • ๐Ÿ”„ Build async context managers
  • ๐ŸŽญ Create context managers that modify behavior

๐Ÿ’ก Why Master Advanced Context Managers?

Hereโ€™s why developers love advanced context managers:

  1. Resource Safety ๐Ÿ”’: Guarantee cleanup even in complex scenarios
  2. Code Reusability ๐Ÿ’ป: Create modular resource management components
  3. Error Resilience ๐Ÿ“–: Handle failures gracefully with automatic recovery
  4. Performance Control ๐Ÿ”ง: Manage resource lifecycles efficiently

Real-world example: Imagine managing a distributed transaction ๐Ÿ’ณ. With advanced context managers, you can ensure all operations complete atomically or roll back together!

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Context Manager Protocol Deep Dive

Letโ€™s start with the complete protocol:

# ๐Ÿ‘‹ Hello, Advanced Context Managers!
class AdvancedResource:
    def __init__(self, name):
        self.name = name  # ๐Ÿท๏ธ Resource identifier
        self.state = "initialized"  # ๐Ÿ“Š Track state
    
    def __enter__(self):
        # ๐Ÿš€ Acquisition phase
        print(f"โœจ Acquiring {self.name}")
        self.state = "active"
        return self  # ๐ŸŽ Return resource to 'as' variable
    
    def __exit__(self, exc_type, exc_value, traceback):
        # ๐Ÿงน Cleanup phase
        print(f"๐Ÿ”’ Releasing {self.name}")
        self.state = "closed"
        
        # ๐Ÿ›ก๏ธ Exception handling
        if exc_type:
            print(f"โš ๏ธ Handling {exc_type.__name__}: {exc_value}")
            # Return True to suppress exception
            return False  # Let exception propagate

๐Ÿ’ก Explanation: The __exit__ method receives exception information, allowing sophisticated error handling!

๐ŸŽฏ Advanced Patterns

Here are patterns for advanced usage:

# ๐Ÿ—๏ธ Pattern 1: Contextlib utilities
from contextlib import contextmanager, ExitStack

@contextmanager
def managed_resource(name):
    # ๐ŸŽจ Setup phase
    resource = {"name": name, "active": True}
    print(f"โœจ Setting up {name}")
    
    try:
        yield resource  # ๐ŸŽ Provide resource
    finally:
        # ๐Ÿงน Cleanup phase
        resource["active"] = False
        print(f"๐Ÿ”’ Cleaning up {name}")

# ๐ŸŽฏ Pattern 2: Multiple resource management
def process_multiple_files(filenames):
    with ExitStack() as stack:
        # ๐Ÿ“‚ Open all files safely
        files = [
            stack.enter_context(open(fname))
            for fname in filenames
        ]
        # ๐ŸŽจ Process all files
        return [f.read() for f in files]

# ๐Ÿ”„ Pattern 3: Reentrant context manager
from threading import RLock

class ReentrantResource:
    def __init__(self):
        self._lock = RLock()  # ๐Ÿ”’ Reentrant lock
        self._count = 0
    
    def __enter__(self):
        self._lock.acquire()
        self._count += 1
        print(f"๐Ÿ”„ Entered (depth: {self._count})")
        return self
    
    def __exit__(self, *exc_info):
        self._count -= 1
        print(f"๐Ÿ”„ Exited (depth: {self._count})")
        self._lock.release()

๐Ÿ’ก Practical Examples

๐Ÿ—„๏ธ Example 1: Database Transaction Manager

Letโ€™s build a sophisticated transaction manager:

# ๐Ÿ—„๏ธ Advanced database transaction manager
import sqlite3
from contextlib import contextmanager
from typing import Optional, Any

class DatabaseTransaction:
    def __init__(self, db_path: str):
        self.db_path = db_path  # ๐Ÿ“ Database location
        self.connection: Optional[sqlite3.Connection] = None
        self.cursor: Optional[sqlite3.Cursor] = None
        self.savepoint_count = 0  # ๐Ÿท๏ธ Track nested transactions
    
    def __enter__(self):
        # ๐Ÿš€ Start transaction
        self.connection = sqlite3.connect(self.db_path)
        self.cursor = self.connection.cursor()
        self.connection.execute("BEGIN")
        print("โœจ Transaction started")
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        # ๐Ÿ›ก๏ธ Handle transaction completion
        if exc_type is None:
            # โœ… Success - commit
            self.connection.commit()
            print("โœ… Transaction committed")
        else:
            # โŒ Error - rollback
            self.connection.rollback()
            print(f"โŒ Transaction rolled back: {exc_value}")
        
        # ๐Ÿงน Cleanup
        self.cursor.close()
        self.connection.close()
        return False  # Don't suppress exceptions
    
    @contextmanager
    def savepoint(self, name: str):
        # ๐ŸŽฏ Nested transaction support
        savepoint_name = f"sp_{name}_{self.savepoint_count}"
        self.savepoint_count += 1
        
        self.connection.execute(f"SAVEPOINT {savepoint_name}")
        print(f"๐Ÿ“ Savepoint '{name}' created")
        
        try:
            yield
            print(f"โœ… Savepoint '{name}' released")
        except Exception as e:
            self.connection.execute(f"ROLLBACK TO {savepoint_name}")
            print(f"โช Rolled back to savepoint '{name}'")
            raise
        finally:
            self.savepoint_count -= 1

# ๐ŸŽฎ Let's use it!
with DatabaseTransaction("store.db") as tx:
    tx.cursor.execute("INSERT INTO orders VALUES (?, ?)", (1, "Coffee โ˜•"))
    
    # ๐ŸŽฏ Nested savepoint
    with tx.savepoint("product_check"):
        tx.cursor.execute("INSERT INTO items VALUES (?, ?)", (1, "Espresso โ˜•"))
        # This could fail but won't affect the order!

๐ŸŽฏ Try it yourself: Add support for read-only transactions and connection pooling!

๐Ÿ”„ Example 2: Async Context Manager

Letโ€™s create an async resource manager:

# ๐Ÿš€ Async context manager for API rate limiting
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List

class AsyncRateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests  # ๐ŸŽฏ Request limit
        self.time_window = time_window  # โฑ๏ธ Time window in seconds
        self.requests: List[datetime] = []  # ๐Ÿ“Š Request timestamps
        self.lock = asyncio.Lock()  # ๐Ÿ”’ Thread safety
    
    async def __aenter__(self):
        # ๐ŸŽจ Async acquisition
        async with self.lock:
            now = datetime.now()
            # ๐Ÿงน Clean old requests
            self.requests = [
                req for req in self.requests
                if now - req < timedelta(seconds=self.time_window)
            ]
            
            # ๐Ÿ›ก๏ธ Check rate limit
            if len(self.requests) >= self.max_requests:
                # โณ Calculate wait time
                oldest = self.requests[0]
                wait_time = (oldest + timedelta(seconds=self.time_window) - now).total_seconds()
                if wait_time > 0:
                    print(f"โณ Rate limited! Waiting {wait_time:.1f}s...")
                    await asyncio.sleep(wait_time)
                    # ๐Ÿ”„ Recursive retry
                    return await self.__aenter__()
            
            # โœ… Record request
            self.requests.append(now)
            print(f"โœจ Request allowed (#{len(self.requests)}/{self.max_requests})")
            return self
    
    async def __aexit__(self, exc_type, exc_value, traceback):
        # ๐Ÿ“Š Log completion
        if exc_type:
            print(f"โš ๏ธ Request failed: {exc_value}")
        else:
            print("โœ… Request completed")
        return False

# ๐ŸŽฎ Using async context manager
async def make_api_calls():
    # ๐Ÿšฆ Allow 3 requests per 10 seconds
    rate_limiter = AsyncRateLimiter(max_requests=3, time_window=10)
    
    async def api_call(call_id: int):
        async with rate_limiter:
            print(f"๐ŸŒ Making API call #{call_id}")
            await asyncio.sleep(1)  # Simulate API call
            return f"Result {call_id} ๐ŸŽ‰"
    
    # ๐Ÿš€ Make concurrent calls
    tasks = [api_call(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    return results

๐ŸŽญ Example 3: Behavioral Context Manager

Create a context manager that modifies program behavior:

# ๐ŸŽญ Context manager for temporary configuration
import sys
from contextlib import contextmanager
from typing import Dict, Any, Optional

class ConfigurationContext:
    def __init__(self):
        self._config_stack: List[Dict[str, Any]] = []  # ๐Ÿ“š Config stack
        self._current_config: Dict[str, Any] = {}  # ๐ŸŽฏ Active config
    
    @contextmanager
    def temporary_config(self, **kwargs):
        # ๐Ÿ“ธ Snapshot current state
        old_config = self._current_config.copy()
        self._config_stack.append(old_config)
        
        # ๐ŸŽจ Apply new configuration
        self._current_config.update(kwargs)
        print(f"โœจ Config updated: {kwargs}")
        
        try:
            yield self._current_config
        finally:
            # ๐Ÿ”„ Restore previous state
            self._current_config = self._config_stack.pop()
            print("๐Ÿ”„ Config restored")
    
    def get(self, key: str, default: Any = None) -> Any:
        return self._current_config.get(key, default)

# ๐ŸŽฎ Global config instance
config = ConfigurationContext()

# ๐Ÿ—๏ธ Temporary environment modifier
@contextmanager
def temporary_env(**env_vars):
    # ๐Ÿ“ธ Save original environment
    original_env = {}
    for key, value in env_vars.items():
        original_env[key] = os.environ.get(key)
        os.environ[key] = str(value)
        print(f"๐ŸŒ Set {key}={value}")
    
    try:
        yield
    finally:
        # ๐Ÿ”„ Restore environment
        for key, original_value in original_env.items():
            if original_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = original_value
            print(f"๐Ÿ”„ Restored {key}")

# ๐ŸŽจ Usage example
with config.temporary_config(debug=True, cache_size=1000):
    print(f"๐Ÿ” Debug mode: {config.get('debug')}")
    # Debug mode is True here
    
    with config.temporary_config(debug=False):
        print(f"๐Ÿ” Nested debug: {config.get('debug')}")
        # Debug is False in this block
    
    # Debug is True again here

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Context Manager Decorators

When youโ€™re ready to level up, try these advanced patterns:

# ๐ŸŽฏ Advanced decorator for automatic retry
from functools import wraps
import time
from typing import Callable, TypeVar, Optional

T = TypeVar('T')

def retry_context(max_attempts: int = 3, delay: float = 1.0):
    # ๐Ÿช„ Decorator factory
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception: Optional[Exception] = None
            
            for attempt in range(max_attempts):
                try:
                    if attempt > 0:
                        print(f"๐Ÿ”„ Retry attempt {attempt + 1}/{max_attempts}")
                        time.sleep(delay * attempt)  # Exponential backoff
                    
                    return func(*args, **kwargs)
                    
                except Exception as e:
                    last_exception = e
                    print(f"โš ๏ธ Attempt {attempt + 1} failed: {e}")
            
            print(f"โŒ All {max_attempts} attempts failed!")
            raise last_exception
        
        return wrapper
    return decorator

# ๐ŸŽจ Context manager composition
class CompositeContextManager:
    def __init__(self, *managers):
        self.managers = managers  # ๐Ÿ“ฆ Store managers
        self.stack = []  # ๐Ÿ“š Exit stack
    
    def __enter__(self):
        # ๐ŸŽฏ Enter all managers in order
        results = []
        for manager in self.managers:
            try:
                result = manager.__enter__()
                self.stack.append((manager, True))
                results.append(result)
            except Exception:
                # ๐Ÿ›ก๏ธ Cleanup on failure
                self.__exit__(None, None, None)
                raise
        
        return results if len(results) > 1 else results[0]
    
    def __exit__(self, exc_type, exc_value, traceback):
        # ๐Ÿงน Exit in reverse order
        exception_handled = False
        
        while self.stack:
            manager, entered = self.stack.pop()
            if entered:
                try:
                    if manager.__exit__(exc_type, exc_value, traceback):
                        exception_handled = True
                        # ๐ŸŽฏ Clear exception info for remaining managers
                        exc_type = exc_value = traceback = None
                except Exception:
                    # ๐Ÿšจ Ignore cleanup errors during exception handling
                    if exc_type is None:
                        raise
        
        return exception_handled

๐Ÿ—๏ธ Context Variables and Thread Safety

For the brave developers:

# ๐Ÿš€ Thread-local context management
import threading
from contextvars import ContextVar
from typing import Generic, TypeVar

T = TypeVar('T')

# ๐ŸŽฏ Context variable for request tracking
request_id: ContextVar[Optional[str]] = ContextVar('request_id', default=None)

class ThreadSafeContextManager(Generic[T]):
    def __init__(self, factory: Callable[[], T]):
        self.factory = factory  # ๐Ÿญ Resource factory
        self.local = threading.local()  # ๐Ÿ”’ Thread-local storage
        self.lock = threading.RLock()  # ๐Ÿ”„ Reentrant lock
    
    def __enter__(self) -> T:
        with self.lock:
            # ๐ŸŽจ Create thread-local resource
            if not hasattr(self.local, 'resource'):
                self.local.resource = self.factory()
                self.local.count = 0
            
            self.local.count += 1
            print(f"๐Ÿ”’ Thread {threading.current_thread().name} entered (count: {self.local.count})")
            return self.local.resource
    
    def __exit__(self, exc_type, exc_value, traceback):
        with self.lock:
            self.local.count -= 1
            print(f"๐Ÿ”“ Thread {threading.current_thread().name} exited (count: {self.local.count})")
            
            # ๐Ÿงน Cleanup when last exit
            if self.local.count == 0:
                if hasattr(self.local.resource, 'close'):
                    self.local.resource.close()
                delattr(self.local, 'resource')

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Resource Leaks in Exceptions

# โŒ Wrong way - resource leak on exception!
class BadResource:
    def __enter__(self):
        self.file = open('data.txt')  # ๐Ÿ“‚ Opens file
        self.socket = socket.socket()  # ๐ŸŒ Opens socket
        # ๐Ÿ’ฅ If socket creation fails, file never closes!
        return self
    
    def __exit__(self, *args):
        self.file.close()
        self.socket.close()

# โœ… Correct way - use ExitStack!
from contextlib import ExitStack

class GoodResource:
    def __enter__(self):
        self.stack = ExitStack()
        try:
            # ๐Ÿ›ก๏ธ Each resource is protected
            self.file = self.stack.enter_context(open('data.txt'))
            self.socket = self.stack.enter_context(socket.socket())
            return self
        except:
            # ๐Ÿงน Cleanup partial acquisitions
            self.stack.close()
            raise
    
    def __exit__(self, *args):
        self.stack.close()  # โœ… Closes all resources safely!

๐Ÿคฏ Pitfall 2: Generator Context Manager Errors

# โŒ Dangerous - generator might not cleanup!
@contextmanager
def risky_resource():
    resource = acquire_resource()
    yield resource  # ๐Ÿ’ฅ If exception here, no cleanup!
    release_resource(resource)

# โœ… Safe - always cleanup!
@contextmanager
def safe_resource():
    resource = acquire_resource()
    try:
        yield resource  # โœ… Protected by try-finally
    finally:
        release_resource(resource)  # ๐Ÿงน Always runs!

๐Ÿ› Pitfall 3: Async Context Manager Deadlocks

# โŒ Deadlock risk with nested locks!
class BadAsyncResource:
    def __init__(self):
        self.lock = asyncio.Lock()
    
    async def __aenter__(self):
        await self.lock.acquire()
        # ๐Ÿ’ฅ If used recursively, deadlock!
        return self

# โœ… Safe recursive context manager!
class GoodAsyncResource:
    def __init__(self):
        self.lock = asyncio.Lock()
        self.count = 0
        self.owner = None
    
    async def __aenter__(self):
        current = asyncio.current_task()
        
        if self.owner == current:
            # ๐Ÿ”„ Same task - allow reentry
            self.count += 1
        else:
            # ๐Ÿ”’ Different task - acquire lock
            await self.lock.acquire()
            self.owner = current
            self.count = 1
        
        return self
    
    async def __aexit__(self, *args):
        self.count -= 1
        if self.count == 0:
            # ๐Ÿ”“ Last exit - release lock
            self.owner = None
            self.lock.release()

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Always Handle Exceptions: Never let __exit__ raise new exceptions
  2. ๐Ÿ“ Document Resource Behavior: Clear docs on what resources are managed
  3. ๐Ÿ›ก๏ธ Use ExitStack for Multiple Resources: Safer than manual management
  4. ๐ŸŽจ Keep Context Managers Focused: One responsibility per manager
  5. โœจ Test Error Paths: Ensure cleanup works even when things go wrong

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Distributed Lock Manager

Create a context manager for distributed locking:

๐Ÿ“‹ Requirements:

  • โœ… Support for multiple lock servers
  • ๐Ÿท๏ธ Lock expiration with automatic renewal
  • ๐Ÿ‘ค Lock ownership tracking
  • ๐Ÿ“… Queue fairness for waiting clients
  • ๐ŸŽจ Metrics collection for monitoring

๐Ÿš€ Bonus Points:

  • Add support for read/write locks
  • Implement lock priority levels
  • Create a distributed transaction coordinator

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Distributed lock manager implementation!
import time
import uuid
import threading
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from contextlib import contextmanager

class DistributedLock:
    def __init__(self, lock_name: str, ttl: int = 30, servers: List[str] = None):
        self.lock_name = lock_name  # ๐Ÿท๏ธ Lock identifier
        self.ttl = ttl  # โฑ๏ธ Time to live in seconds
        self.servers = servers or ["localhost:6379"]  # ๐Ÿ–ฅ๏ธ Lock servers
        self.owner_id = str(uuid.uuid4())  # ๐Ÿ”‘ Unique owner ID
        self.acquired = False  # ๐Ÿšฆ Lock state
        self.renew_thread: Optional[threading.Thread] = None
        self.stop_renewal = threading.Event()
    
    def __enter__(self):
        # ๐ŸŽฏ Try to acquire lock
        start_time = time.time()
        retry_delay = 0.1
        
        while not self._try_acquire():
            # โณ Exponential backoff
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 1.0)
            
            # ๐Ÿšจ Timeout check
            if time.time() - start_time > 60:
                raise TimeoutError(f"โฐ Failed to acquire lock '{self.lock_name}' after 60s")
        
        # โœ… Lock acquired!
        self.acquired = True
        print(f"๐Ÿ”’ Acquired lock '{self.lock_name}' (owner: {self.owner_id[:8]}...)")
        
        # ๐Ÿ”„ Start renewal thread
        self._start_renewal()
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        # ๐Ÿ›‘ Stop renewal
        if self.renew_thread:
            self.stop_renewal.set()
            self.renew_thread.join()
        
        # ๐Ÿ”“ Release lock
        if self.acquired:
            self._release()
            print(f"๐Ÿ”“ Released lock '{self.lock_name}'")
        
        return False
    
    def _try_acquire(self) -> bool:
        # ๐ŸŽฏ Attempt to acquire on majority of servers
        acquired_count = 0
        majority = len(self.servers) // 2 + 1
        
        for server in self.servers:
            try:
                # ๐Ÿ”’ Simulate distributed lock acquisition
                # In real implementation, this would be Redis SET NX or similar
                if self._acquire_on_server(server):
                    acquired_count += 1
                    if acquired_count >= majority:
                        return True
            except Exception as e:
                print(f"โš ๏ธ Failed to acquire on {server}: {e}")
        
        # ๐Ÿ”„ Rollback partial acquisitions
        if acquired_count > 0:
            self._release_partial(acquired_count)
        
        return False
    
    def _acquire_on_server(self, server: str) -> bool:
        # ๐ŸŽจ Simulate lock acquisition (implement with actual backend)
        # This would typically use Redis SET NX EX or similar
        return True  # Simplified for example
    
    def _release(self):
        # ๐Ÿ”“ Release on all servers
        for server in self.servers:
            try:
                self._release_on_server(server)
            except Exception as e:
                print(f"โš ๏ธ Failed to release on {server}: {e}")
    
    def _release_on_server(self, server: str):
        # ๐ŸŽจ Simulate lock release
        pass  # Simplified for example
    
    def _release_partial(self, count: int):
        # ๐Ÿ”„ Cleanup partial acquisitions
        pass  # Simplified for example
    
    def _start_renewal(self):
        # ๐Ÿ”„ Auto-renewal thread
        def renew():
            while not self.stop_renewal.is_set():
                # โฐ Renew at half TTL
                time.sleep(self.ttl / 2)
                if not self.stop_renewal.is_set():
                    self._renew_lock()
                    print(f"๐Ÿ”„ Renewed lock '{self.lock_name}'")
        
        self.renew_thread = threading.Thread(target=renew, daemon=True)
        self.renew_thread.start()
    
    def _renew_lock(self):
        # ๐Ÿ”„ Extend lock TTL
        pass  # Simplified for example

# ๐Ÿ—๏ธ Lock manager with metrics
class LockManager:
    def __init__(self):
        self.metrics: Dict[str, Dict] = {}  # ๐Ÿ“Š Lock metrics
        self.lock = threading.Lock()  # ๐Ÿ”’ Thread safety
    
    @contextmanager
    def distributed_lock(self, name: str, ttl: int = 30):
        # ๐Ÿ“Š Track metrics
        with self.lock:
            if name not in self.metrics:
                self.metrics[name] = {
                    "acquisitions": 0,
                    "releases": 0,
                    "failures": 0,
                    "total_hold_time": 0
                }
        
        start_time = time.time()
        lock = DistributedLock(name, ttl)
        
        try:
            with lock:
                self.metrics[name]["acquisitions"] += 1
                yield lock
        except Exception as e:
            self.metrics[name]["failures"] += 1
            raise
        finally:
            # ๐Ÿ“Š Update metrics
            hold_time = time.time() - start_time
            self.metrics[name]["total_hold_time"] += hold_time
            self.metrics[name]["releases"] += 1
    
    def get_metrics(self, lock_name: str) -> Dict:
        # ๐Ÿ“Š Return lock metrics
        return self.metrics.get(lock_name, {})

# ๐ŸŽฎ Test it out!
manager = LockManager()

with manager.distributed_lock("order_processing", ttl=60) as lock:
    print("๐ŸŽฏ Processing orders with distributed lock!")
    # Critical section protected across multiple servers
    time.sleep(2)

print(f"๐Ÿ“Š Metrics: {manager.get_metrics('order_processing')}")

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Create sophisticated context managers with confidence ๐Ÿ’ช
  • โœ… Handle complex resource scenarios like a pro ๐Ÿ›ก๏ธ
  • โœ… Build async and thread-safe managers for modern applications ๐ŸŽฏ
  • โœ… Debug context manager issues effectively ๐Ÿ›
  • โœ… Implement distributed patterns with context managers! ๐Ÿš€

Remember: Context managers are your safety net! They ensure resources are managed correctly even when things go wrong. ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered advanced context managers!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the distributed lock exercise above
  2. ๐Ÿ—๏ธ Build a context manager library for your projects
  3. ๐Ÿ“š Explore async context managers with aiocontextlib
  4. ๐ŸŒŸ Share your creative context manager solutions!

Remember: Every Python expert uses context managers to write safer, cleaner code. Keep practicing, keep learning, and most importantly, have fun! ๐Ÿš€


Happy coding! ๐ŸŽ‰๐Ÿš€โœจ