Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE

What you'll learn
- Understand the concept fundamentals
- Apply the concept in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction

Welcome to this tutorial on connection retry logic! In this guide, we'll explore how to build resilient applications that gracefully handle network failures and connection issues.

You'll discover how retry logic can take your Python applications from fragile to robust. Whether you're building web APIs, database applications, or microservices, understanding retry logic is essential for writing production-ready code.

By the end of this tutorial, you'll feel confident implementing retry strategies in your own projects. Let's dive in!
Understanding Connection Retry Logic

What is Connection Retry Logic?

Connection retry logic is like a persistent friend who keeps trying to reach you when your phone is off. Think of it as a safety net that catches temporary failures and gives your application multiple chances to succeed.

In Python terms, retry logic is a pattern that automatically re-attempts failed operations, typically with delays between attempts. This means you can:
- Handle temporary network hiccups gracefully
- Improve application reliability
- Protect against transient failures
Why Use Retry Logic?

Here's why developers rely on retry logic:
- Network reality: networks aren't perfect - packets drop, servers restart
- Better user experience: users see fewer errors
- Resilient systems: your app keeps working when things go wrong
- Cost efficiency: fewer manual interventions needed

Real-world example: imagine an e-commerce site. With retry logic, a temporary database glitch won't lose customer orders!
Basic Syntax and Usage

Simple Example

Let's start with a simple example:
```python
# Hello, Retry Logic!
import time
import random

def unreliable_connection():
    # Simulating a flaky connection
    if random.random() < 0.7:  # 70% failure rate
        raise ConnectionError("Connection failed!")
    return "Success! Connected!"

# Basic retry logic
def retry_connection(max_attempts=3):
    for attempt in range(max_attempts):
        try:
            result = unreliable_connection()
            print(result)
            return result
        except ConnectionError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_attempts - 1:
                print("Retrying...")
                time.sleep(1)  # Wait 1 second before retry
            else:
                print("All attempts failed!")
                raise

# Let's try it!
retry_connection()
```
Explanation: notice how we use a loop to retry the operation. The time.sleep() call prevents overwhelming the server between attempts.

Common Patterns

Here are patterns you'll use daily:
```python
import time

# Pattern 1: Exponential backoff
def exponential_backoff_retry(func, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            wait_time = 2 ** attempt  # 1, 2, 4, 8, 16...
            print(f"Waiting {wait_time} seconds...")
            time.sleep(wait_time)

# Pattern 2: Retry only specific exceptions
def selective_retry(func, retry_exceptions=(ConnectionError, TimeoutError)):
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_exceptions as e:
            if attempt == max_attempts - 1:
                raise
            print(f"Retrying after {type(e).__name__}")
            time.sleep(1)
        except Exception:
            # Don't retry other exceptions
            raise

# Pattern 3: Retry decorator
def retry(max_attempts=3, delay=1):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    print(f"Retry {attempt + 1}/{max_attempts}")
                    time.sleep(delay)
        return wrapper
    return decorator
```
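The decorator from Pattern 3 can be wrapped around any flaky callable. Here's a small, self-contained sketch (the decorator is repeated so the snippet runs on its own; `flaky_fetch` is a hypothetical stand-in for a real network call, the 70% failure rate mirrors `unreliable_connection()` above, and `delay=0` keeps the demo instant):

```python
import random
import time

# Pattern 3 retry decorator, repeated here so the example is self-contained
def retry(max_attempts=3, delay=1):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

random.seed(42)  # deterministic demo

@retry(max_attempts=10, delay=0)
def flaky_fetch():
    # Fails ~70% of the time, like unreliable_connection() above
    if random.random() < 0.7:
        raise ConnectionError("simulated failure")
    return "payload"

result = flaky_fetch()
print(result)
```

With the seeded random stream the call eventually succeeds within the attempt budget; without a seed it would occasionally exhaust all ten attempts and re-raise.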
Practical Examples

Example 1: Database Connection Manager

Let's build something real:
```python
# Database connection with retry logic
import sqlite3
import time
import random

class DatabaseConnection:
    def __init__(self, db_path, max_retries=3):
        self.db_path = db_path
        self.max_retries = max_retries
        self.connection = None

    # Connect with retry logic
    def connect(self):
        for attempt in range(self.max_retries):
            try:
                # Simulate occasional connection failures
                if random.random() < 0.3:  # 30% failure rate
                    raise sqlite3.OperationalError("Database locked!")
                self.connection = sqlite3.connect(self.db_path)
                print("Connected to database!")
                return self.connection
            except sqlite3.OperationalError as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    wait_time = (attempt + 1) * 2  # Progressive delay
                    print(f"Waiting {wait_time} seconds before retry...")
                    time.sleep(wait_time)
                else:
                    print("Failed to connect after all retries!")
                    raise

    # Execute a query, reusing the retry decorator from the patterns above
    @retry(max_attempts=3, delay=1)
    def execute_query(self, query):
        if not self.connection:
            self.connect()
        cursor = self.connection.cursor()
        result = cursor.execute(query)
        self.connection.commit()  # Commit so the change persists
        print("Query executed successfully!")
        return result

# Let's use it!
db = DatabaseConnection("shopping.db")
db.connect()
db.execute_query("CREATE TABLE IF NOT EXISTS products (id INTEGER, name TEXT)")
```
Try it yourself: add a method that manages a connection pool with retry logic!

Example 2: API Client with Smart Retry

Let's make it more interesting:
```python
# API client with intelligent retry
import random
import time
from datetime import datetime

import requests

class SmartAPIClient:
    def __init__(self, base_url, max_retries=3):
        self.base_url = base_url
        self.max_retries = max_retries
        self.retry_codes = [429, 500, 502, 503, 504]  # Retryable HTTP codes

    # Smart retry with exponential backoff
    def make_request(self, endpoint, method="GET", **kwargs):
        url = f"{self.base_url}{endpoint}"
        for attempt in range(self.max_retries):
            try:
                print(f"Attempting {method} request to {endpoint}...")
                # Make the request
                response = requests.request(method, url, **kwargs)

                # Success!
                if response.status_code == 200:
                    print("Request successful!")
                    return response.json()

                # Should we retry?
                if response.status_code in self.retry_codes:
                    if attempt < self.max_retries - 1:
                        # Exponential backoff with jitter
                        base_delay = 2 ** attempt
                        jitter = random.uniform(0, 1)
                        wait_time = base_delay + jitter
                        print(f"Got {response.status_code}, retrying in {wait_time:.1f}s...")
                        time.sleep(wait_time)
                        continue

                # Non-retryable error
                response.raise_for_status()
            except requests.exceptions.ConnectionError:
                print(f"Connection error on attempt {attempt + 1}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
                else:
                    raise
            except requests.exceptions.Timeout:
                print(f"Timeout on attempt {attempt + 1}")
                if attempt < self.max_retries - 1:
                    time.sleep(1)
                else:
                    raise
        print("All retry attempts exhausted!")
        raise Exception("Max retries exceeded")

# Circuit breaker pattern
class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        # Check the circuit state
        if self.state == "OPEN":
            if (datetime.now() - self.last_failure_time).seconds > self.recovery_timeout:
                print("Circuit breaker entering HALF_OPEN state")
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN!")
        try:
            # Make the call
            result = func(*args, **kwargs)
            # Success - reset the breaker
            if self.state == "HALF_OPEN":
                print("Circuit breaker closing - service recovered!")
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception:
            # Failure - increment the counter
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                print(f"Circuit breaker opening after {self.failure_count} failures!")
                self.state = "OPEN"
            raise
```
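To see the breaker trip, here is a quick, self-contained demonstration (the class is repeated in condensed form; `always_down` and the threshold of 3 are illustrative choices for the demo):

```python
from datetime import datetime

class CircuitBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            # Only probe again once the recovery window has elapsed
            if (datetime.now() - self.last_failure_time).seconds > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise RuntimeError("Circuit breaker is OPEN!")
        try:
            result = func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise

def always_down():
    # Stand-in for a service that is hard down
    raise ConnectionError("service unavailable")

breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

# Three consecutive failures trip the breaker...
for _ in range(3):
    try:
        breaker.call(always_down)
    except ConnectionError:
        pass

print(breaker.state)  # OPEN

# ...and further calls are rejected immediately, without touching the service.
blocked = False
try:
    breaker.call(always_down)
except RuntimeError:
    blocked = True
print("blocked:", blocked)
```

The point of the pattern: once OPEN, the breaker fails fast instead of burning retries against a dead service, then probes again after the recovery timeout.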
Advanced Concepts

Advanced Topic 1: Retry with Context

When you're ready to level up, try this advanced pattern:
```python
# Advanced retry with context and metrics
import functools
import random
import time

class RetryContext:
    def __init__(self):
        self.attempts = 0
        self.errors = []
        self.metrics = {
            "total_attempts": 0,
            "total_wait_time": 0,
            "success": False
        }

class AdvancedRetry:
    def __init__(self, max_attempts=3, backoff_factor=2):
        self.max_attempts = max_attempts
        self.backoff_factor = backoff_factor

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            context = RetryContext()
            for attempt in range(self.max_attempts):
                context.attempts = attempt + 1
                context.metrics["total_attempts"] = attempt + 1
                try:
                    # Pass the context to the function if it accepts it
                    if "retry_context" in func.__code__.co_varnames:
                        result = func(*args, retry_context=context, **kwargs)
                    else:
                        result = func(*args, **kwargs)
                    # Success!
                    context.metrics["success"] = True
                    print(f"Success after {context.attempts} attempts!")
                    return result
                except Exception as e:
                    # Record the error
                    context.errors.append({
                        "attempt": attempt + 1,
                        "error": str(e),
                        "type": type(e).__name__
                    })
                    if attempt < self.max_attempts - 1:
                        wait_time = self.backoff_factor ** attempt
                        context.metrics["total_wait_time"] += wait_time
                        print(f"Retry {attempt + 1}: waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        # Log the final metrics
                        print(f"Retry metrics: {context.metrics}")
                        print(f"All errors: {context.errors}")
                        raise
        return wrapper

# Using the advanced retry
@AdvancedRetry(max_attempts=4, backoff_factor=1.5)
def fetch_data_with_context(url, retry_context=None):
    if retry_context and retry_context.attempts > 2:
        print("Using fallback strategy...")
        # Use a different approach after 2 failures
    # Simulate the operation
    if random.random() < 0.6:
        raise ConnectionError("Network unreachable!")
    return {"data": "Success!", "attempts": retry_context.attempts}
```
Advanced Topic 2: Async Retry Logic

For modern async codebases:
```python
# Async retry for modern Python
import asyncio
import aiohttp

class AsyncRetry:
    def __init__(self, max_attempts=3, timeout=30):
        self.max_attempts = max_attempts
        self.timeout = timeout

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(self.max_attempts):
                try:
                    # Add a timeout so a hung call can't block forever
                    return await asyncio.wait_for(
                        func(*args, **kwargs),
                        timeout=self.timeout
                    )
                except asyncio.TimeoutError:
                    print(f"Timeout on attempt {attempt + 1}")
                    last_exception = TimeoutError("Operation timed out")
                except Exception as e:
                    print(f"Attempt {attempt + 1} failed: {e}")
                    last_exception = e
                if attempt < self.max_attempts - 1:
                    wait_time = 2 ** attempt
                    print(f"Waiting {wait_time}s before the next attempt...")
                    await asyncio.sleep(wait_time)
            raise last_exception
        return wrapper

# Async HTTP client with retry
@AsyncRetry(max_attempts=3, timeout=10)
async def fetch_async_data(session, url):
    async with session.get(url) as response:
        if response.status >= 500:
            raise aiohttp.ClientError(f"Server error: {response.status}")
        return await response.json()

# Using the async retry
async def main():
    async with aiohttp.ClientSession() as session:
        try:
            data = await fetch_async_data(session, "https://api.example.com/data")
            print(f"Got data: {data}")
        except Exception as e:
            print(f"Failed after retries: {e}")

# asyncio.run(main())  # needs a reachable endpoint
```
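Since the aiohttp example needs a live endpoint, here's a self-contained sketch of the same AsyncRetry idea driven against a simulated flaky coroutine (the class is repeated in condensed form, with zero backoff so the demo finishes instantly; `flaky_operation` is a made-up stand-in for a real I/O call):

```python
import asyncio

class AsyncRetry:
    def __init__(self, max_attempts=3, timeout=30):
        self.max_attempts = max_attempts
        self.timeout = timeout

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(self.max_attempts):
                try:
                    return await asyncio.wait_for(func(*args, **kwargs),
                                                  timeout=self.timeout)
                except Exception as e:  # includes asyncio.TimeoutError
                    last_exception = e
                if attempt < self.max_attempts - 1:
                    await asyncio.sleep(0)  # zero backoff, demo only
            raise last_exception
        return wrapper

calls = {"count": 0}

@AsyncRetry(max_attempts=3, timeout=5)
async def flaky_operation():
    # Fails twice, then succeeds on the third attempt
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network drop")
    return "recovered"

result = asyncio.run(flaky_operation())
print(result, "after", calls["count"], "attempts")  # recovered after 3 attempts
```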
Common Pitfalls and Solutions

Pitfall 1: Retry Thundering Herd
```python
import random
import time

# Wrong way - all clients retry at the same time!
def bad_retry():
    for attempt in range(3):
        try:
            return make_request()
        except ConnectionError:
            time.sleep(5)  # Everyone waits exactly 5 seconds!

# Correct way - add jitter to spread out retries!
def good_retry():
    for attempt in range(3):
        try:
            return make_request()
        except ConnectionError:
            # Add random jitter to prevent the thundering herd
            base_delay = 2 ** attempt
            jitter = random.uniform(0, base_delay * 0.1)
            time.sleep(base_delay + jitter)  # Spread out retries!
```
Pitfall 2: Retrying Non-Transient Errors
```python
import time
import requests

# Dangerous - retrying errors that won't fix themselves!
def retry_everything(func):
    for _ in range(5):
        try:
            return func()
        except Exception:  # Don't retry EVERYTHING!
            time.sleep(1)

# Safe - only retry transient errors!
def retry_transient(func):
    transient_errors = (ConnectionError, TimeoutError, requests.exceptions.Timeout)
    for attempt in range(3):
        try:
            return func()
        except transient_errors:  # Only retry network errors
            if attempt < 2:
                time.sleep(2 ** attempt)
            else:
                raise
        except Exception:
            # Don't retry programming errors!
            raise
```
Best Practices
- Be selective: only retry transient failures (network errors, timeouts)
- Use exponential backoff: don't hammer the server with retries
- Add jitter: prevent thundering herd problems
- Log and monitor: track retry patterns and success rates
- Set limits: don't retry forever - know when to give up
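The practices above can be combined into one small helper. This is a sketch, not a canonical implementation: `retry_with_backoff`, its parameters, and the injectable `sleep` hook (handy for testing without real delays) are all choices made for this example.

```python
import random
import time

def retry_with_backoff(func, *, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Retry func() on transient errors: capped exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # limit reached: give up instead of retrying forever
            delay = min(base_delay * (2 ** attempt), max_delay)  # backoff, capped
            delay += random.uniform(0, delay * 0.1)              # jitter
            sleep(delay)

# Demo: fail twice, then succeed; inject a fake sleep that just records the waits.
waits = []
state = {"count": 0}

def sometimes():
    state["count"] += 1
    if state["count"] < 3:
        raise ConnectionError("transient")
    return "ok"

result = retry_with_backoff(sometimes, base_delay=1.0, sleep=waits.append)
print(result, waits)  # 'ok' plus the two recorded backoff delays
```

Note how each best practice shows up: `retry_on` keeps the retries selective, the doubling delay is the exponential backoff, `max_delay` caps it, the `random.uniform` term is the jitter, and `max_attempts` is the hard limit.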
Hands-On Exercise

Challenge: Build a Resilient Download Manager

Create a download manager with smart retry logic:

Requirements:
- Download files with automatic retry on failure
- Support partial downloads (resume capability)
- Track download progress and retry attempts
- Implement timeout handling
- Add exponential backoff with jitter

Bonus points:
- Add a circuit breaker for failing servers
- Implement parallel downloads with retry
- Create a download queue with priority
Solution
```python
# Resilient download manager with retry logic
import os
import random
import time
from dataclasses import dataclass
from typing import Dict, Optional

import requests

@dataclass
class DownloadStats:
    url: str
    file_size: int
    downloaded: int = 0
    attempts: int = 0
    status: str = "pending"  # pending, downloading, completed, failed
    errors: list = None

    def __post_init__(self):
        if self.errors is None:
            self.errors = []

class ResilientDownloadManager:
    def __init__(self, max_retries=3, chunk_size=1024 * 1024):  # 1MB chunks
        self.max_retries = max_retries
        self.chunk_size = chunk_size
        self.downloads: Dict[str, DownloadStats] = {}

    # Download with retry and resume
    def download_file(self, url: str, destination: str) -> bool:
        # Initialize stats
        stats = DownloadStats(url=url, file_size=0)
        self.downloads[url] = stats

        # Get the file size
        try:
            response = requests.head(url)
            stats.file_size = int(response.headers.get('content-length', 0))
        except requests.RequestException:
            stats.file_size = 0

        for attempt in range(self.max_retries):
            stats.attempts = attempt + 1
            try:
                # Check whether a partial download exists
                resume_pos = 0
                if os.path.exists(destination):
                    resume_pos = os.path.getsize(destination)
                    stats.downloaded = resume_pos
                    print(f"Resuming download from {resume_pos} bytes")

                # Set headers for resume
                headers = {}
                if resume_pos > 0:
                    headers['Range'] = f'bytes={resume_pos}-'

                # Download with progress
                stats.status = "downloading"
                response = requests.get(url, headers=headers, stream=True, timeout=30)
                response.raise_for_status()

                # Write to the file
                mode = 'ab' if resume_pos > 0 else 'wb'
                with open(destination, mode) as f:
                    for chunk in response.iter_content(chunk_size=self.chunk_size):
                        if chunk:
                            f.write(chunk)
                            stats.downloaded += len(chunk)
                            # Progress update
                            if stats.file_size > 0:
                                progress = (stats.downloaded / stats.file_size) * 100
                                print(f"Progress: {progress:.1f}% ({stats.downloaded}/{stats.file_size})")

                # Success!
                stats.status = "completed"
                print(f"Download completed: {destination}")
                return True
            except requests.exceptions.Timeout:
                error = f"Timeout on attempt {attempt + 1}"
                stats.errors.append(error)
                print(error)
            except requests.exceptions.ConnectionError as e:
                error = f"Connection error on attempt {attempt + 1}: {e}"
                stats.errors.append(error)
                print(error)
            except Exception as e:
                error = f"Unexpected error: {e}"
                stats.errors.append(error)
                print(error)

            # Retry logic
            if attempt < self.max_retries - 1:
                # Exponential backoff with jitter
                base_delay = 2 ** attempt
                jitter = random.uniform(0, base_delay * 0.3)
                wait_time = base_delay + jitter
                print(f"Retrying in {wait_time:.1f} seconds...")
                time.sleep(wait_time)
            else:
                stats.status = "failed"
                print(f"Download failed after {self.max_retries} attempts")
        return False

    # Get download statistics
    def get_stats(self, url: str) -> Optional[DownloadStats]:
        return self.downloads.get(url)

    # Batch download with retry
    def download_batch(self, downloads: Dict[str, str]):
        results = {}
        for url, destination in downloads.items():
            print(f"\nDownloading: {url}")
            success = self.download_file(url, destination)
            results[url] = success
            # Small delay between downloads
            if success:
                time.sleep(0.5)
        # Summary
        successful = sum(1 for success in results.values() if success)
        print(f"\nDownload summary: {successful}/{len(downloads)} successful")
        return results

# Test it out!
manager = ResilientDownloadManager(max_retries=3)

# Single download
manager.download_file(
    "https://example.com/large-file.zip",
    "downloads/large-file.zip"
)

# Batch downloads
downloads = {
    "https://example.com/file1.pdf": "downloads/file1.pdf",
    "https://example.com/file2.zip": "downloads/file2.zip",
}
manager.download_batch(downloads)

# Check stats
stats = manager.get_stats("https://example.com/large-file.zip")
if stats:
    print(f"Stats: {stats.attempts} attempts, {len(stats.errors)} errors")
```
Key Takeaways

You've learned a lot! Here's what you can now do:
- Implement retry logic with confidence
- Avoid common retry pitfalls that trip up beginners
- Apply exponential backoff in real projects
- Debug connection issues like a pro
- Build resilient applications with Python

Remember: retry logic is your safety net, not a magic fix for all problems. Use it wisely.

Next Steps

Congratulations! You've mastered connection retry logic!

Here's what to do next:
- Practice with the download manager exercise
- Add retry logic to your existing projects
- Explore advanced patterns like circuit breakers
- Share your resilient code with others

Remember: every robust application started with simple retry logic. Keep building, keep improving, and most importantly, keep your apps running!

Happy coding!