Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the concept fundamentals
- Apply the concept in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to the exciting world of data pipelines and ETL (Extract, Transform, Load)! In today's data-driven world, knowing how to build robust data pipelines is like having a superpower.
You'll discover how ETL processes can transform messy data into golden insights. Whether you're processing customer data, analyzing sales trends, or building machine learning pipelines, understanding ETL is essential for every data professional.
By the end of this tutorial, you'll be building data pipelines like a pro! Let's dive in!
Understanding ETL and Data Pipelines
What is ETL?
ETL is like being a data chef. Think of it as a three-step recipe:
- Extract: Gathering ingredients (data) from various sources
- Transform: Preparing and cooking (cleaning, formatting)
- Load: Serving the final dish (storing processed data)
In Python terms, ETL means (a minimal sketch follows this list):
- Extract: Reading data from files, APIs, databases
- Transform: Cleaning, validating, aggregating data
- Load: Saving to databases, data warehouses, or files
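Before the full example later on, here's a minimal, hedged sketch of what each step can look like with pandas; the file name, column names, and database path are illustrative placeholders, not real resources:
# Minimal ETL sketch (orders.csv, its columns, and warehouse.db are hypothetical)
import sqlite3
import pandas as pd

# Extract: read raw data from a CSV file (an API or database works the same way)
raw = pd.read_csv('orders.csv')

# Transform: fix types and derive a new column
raw['order_date'] = pd.to_datetime(raw['order_date'], errors='coerce')
raw['revenue'] = raw['price'] * raw['quantity']

# Load: write the cleaned data into a SQLite table
with sqlite3.connect('warehouse.db') as conn:
    raw.to_sql('orders_clean', conn, if_exists='replace', index=False)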
Why Use Data Pipelines?
Here's why developers love data pipelines:
- Automation: Process data without manual intervention
- Scalability: Handle growing data volumes effortlessly
- Reliability: Consistent, repeatable processes
- Time Savings: Focus on insights, not data wrangling
Real-world example: Imagine an e-commerce store. Every day, you need to process orders, update inventory, and generate reports. A data pipeline automates all of this!
Basic ETL Syntax and Usage
Simple ETL Example
Let's start with a friendly example using pandas:
# Hello, ETL!
import pandas as pd
import numpy as np
from datetime import datetime

# EXTRACT: Load sales data
def extract_data():
    # Simulating data extraction from CSV
    data = {
        'order_id': [1001, 1002, 1003, 1004, 1005],
        'product': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Laptop'],
        'price': [999.99, 29.99, 79.99, 299.99, 1199.99],
        'quantity': [1, 2, 1, 1, 1],
        'date': ['2024-01-15', '2024-01-15', '2024-01-16', '2024-01-16', '2024-01-17']
    }
    df = pd.DataFrame(data)
    print("Extracted data successfully!")
    return df
# TRANSFORM: Clean and enrich data
def transform_data(df):
    # Clean data
    df['date'] = pd.to_datetime(df['date'])
    # Calculate total revenue
    df['total_revenue'] = df['price'] * df['quantity']
    # Add day of week
    df['day_of_week'] = df['date'].dt.day_name()
    # Add price category
    df['price_category'] = pd.cut(df['price'],
                                  bins=[0, 50, 200, 1000, 5000],
                                  labels=['Budget', 'Mid-range', 'Premium', 'Luxury'])
    print("Transformed data successfully!")
    return df
# LOAD: Save processed data
def load_data(df, filename='processed_sales.csv'):
    # Save to CSV
    df.to_csv(filename, index=False)
    print(f"Loaded data to {filename} successfully!")

# Run the ETL pipeline
def run_etl_pipeline():
    print("Starting ETL Pipeline...")
    # Execute ETL steps
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)
    print("ETL Pipeline completed successfully!")
    return transformed_data

# Run it!
result = run_etl_pipeline()
print("\nSample of processed data:")
print(result.head())
Explanation: Notice how we break down the process into clear steps! Each function has a single responsibility, making our pipeline modular and maintainable.
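If you want to push that modularity a bit further, one optional pattern is to treat the pipeline as an ordered list of steps; run_pipeline below is an illustrative helper, not part of the example above:
# Illustrative composition helper: thread the data through a list of steps
def run_pipeline(steps, data=None):
    for step in steps:
        # The first step (extract) takes no input; later steps receive the previous output
        data = step(data) if data is not None else step()
    return data

# Usage with the functions defined above
result = run_pipeline([extract_data, transform_data])
load_data(result)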
Common ETL Patterns
Here are patterns you'll use daily:
# Pattern 1: Data Validation
def validate_data(df):
    # Check for missing values
    if df.isnull().sum().sum() > 0:
        print("Found missing values!")
        df = df.ffill()  # Forward fill (fillna(method='ffill') is deprecated in newer pandas)
    # Remove duplicates
    initial_rows = len(df)
    df = df.drop_duplicates()
    if len(df) < initial_rows:
        print(f"Removed {initial_rows - len(df)} duplicate rows")
    return df
# Pattern 2: Data Type Conversion
def convert_data_types(df):
    # Convert date strings to datetime
    date_columns = ['order_date', 'ship_date', 'delivery_date']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    # Convert numeric strings to numbers
    numeric_columns = ['price', 'quantity', 'discount']
    for col in numeric_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df
# Pattern 3: Data Aggregation
def aggregate_sales_data(df):
    # Group by product category
    summary = df.groupby('product_category').agg({
        'total_revenue': 'sum',
        'quantity': 'sum',
        'order_id': 'count'
    }).rename(columns={'order_id': 'order_count'})
    # Add average order value
    summary['avg_order_value'] = summary['total_revenue'] / summary['order_count']
    return summary
Practical Examples
Example 1: E-commerce Data Pipeline
Let's build a real e-commerce ETL pipeline:
# E-commerce ETL Pipeline
import pandas as pd
import requests
from sqlalchemy import create_engine

class EcommercePipeline:
    def __init__(self):
        self.data = None
        self.transformed_data = None

    # Extract from multiple sources
    def extract(self):
        print("Extracting data from multiple sources...")
        # Extract from API
        # api_data = requests.get('https://api.store.com/orders').json()
        # Extract from CSV (simulated inline here)
        orders_data = {
            'order_id': ['ORD001', 'ORD002', 'ORD003', 'ORD004'],
            'customer_id': ['CUST101', 'CUST102', 'CUST101', 'CUST103'],
            'product_id': ['PROD201', 'PROD202', 'PROD203', 'PROD201'],
            'quantity': [2, 1, 3, 1],
            'order_date': ['2024-01-15', '2024-01-15', '2024-01-16', '2024-01-17'],
            'status': ['completed', 'pending', 'completed', 'completed']
        }
        products_data = {
            'product_id': ['PROD201', 'PROD202', 'PROD203'],
            'product_name': ['Wireless Mouse', 'Mechanical Keyboard', 'USB Hub'],
            'price': [29.99, 89.99, 19.99],
            'category': ['Accessories', 'Accessories', 'Accessories']
        }
        self.orders_df = pd.DataFrame(orders_data)
        self.products_df = pd.DataFrame(products_data)
        print("Data extraction complete!")
    # Transform the data
    def transform(self):
        print("Transforming data...")
        # Join orders with products
        self.transformed_data = self.orders_df.merge(
            self.products_df,
            on='product_id',
            how='left'
        )
        # Calculate revenue
        self.transformed_data['revenue'] = (
            self.transformed_data['quantity'] *
            self.transformed_data['price']
        )
        # Convert dates
        self.transformed_data['order_date'] = pd.to_datetime(
            self.transformed_data['order_date']
        )
        # Add time-based features
        self.transformed_data['order_month'] = (
            self.transformed_data['order_date'].dt.month_name()
        )
        self.transformed_data['order_day'] = (
            self.transformed_data['order_date'].dt.day_name()
        )
        # Categorize order value
        self.transformed_data['order_size'] = pd.cut(
            self.transformed_data['revenue'],
            bins=[0, 50, 100, 500, 1000],
            labels=['Small', 'Medium', 'Large', 'Huge']
        )
        # Clean up: keep only completed orders
        self.transformed_data = self.transformed_data[
            self.transformed_data['status'] == 'completed'
        ]
        print("Transformation complete!")
    # Load to destination
    def load(self):
        print("Loading data to destinations...")
        # Save to CSV
        self.transformed_data.to_csv('sales_report.csv', index=False)
        # Create summary report
        summary = self.transformed_data.groupby('product_name').agg({
            'revenue': 'sum',
            'quantity': 'sum',
            'order_id': 'count'
        }).rename(columns={'order_id': 'total_orders'})
        summary.to_csv('product_summary.csv')
        # Load to database (example)
        # engine = create_engine('sqlite:///sales.db')
        # self.transformed_data.to_sql('sales_fact', engine, if_exists='append')
        print("Data loaded successfully!")
    # Run the complete pipeline
    def run(self):
        print("Starting E-commerce ETL Pipeline...\n")
        try:
            self.extract()
            self.transform()
            self.load()
            print("\nPipeline completed successfully!")
            print(f"\nProcessed {len(self.transformed_data)} orders")
            print(f"Total revenue: ${self.transformed_data['revenue'].sum():.2f}")
        except Exception as e:
            print(f"Pipeline failed: {str(e)}")
            raise

# Run the pipeline
pipeline = EcommercePipeline()
pipeline.run()

# View results
print("\nSample transformed data:")
print(pipeline.transformed_data.head())
Example 2: Real-time Data Stream ETL
Let's create a streaming ETL pipeline:
# Streaming ETL Pipeline
import time
import random
from datetime import datetime
from collections import deque

class StreamingETL:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.data_buffer = deque(maxlen=window_size)
        self.processed_count = 0

    # Extract streaming data
    def extract_stream(self):
        # Simulate streaming sensor data
        sensor_types = ['Temperature', 'Humidity', 'Pressure']
        data_point = {
            'timestamp': datetime.now(),
            'sensor_type': random.choice(sensor_types),
            'value': random.uniform(20, 100),
            'location': random.choice(['Room A', 'Room B', 'Room C']),
            'status': 'active' if random.random() > 0.1 else 'error'
        }
        return data_point
    # Transform streaming data
    def transform_stream(self, data):
        # Add temperature conversions
        if data['sensor_type'] == 'Temperature':
            data['value_fahrenheit'] = (data['value'] * 9/5) + 32
            data['value_kelvin'] = data['value'] + 273.15
        # Add alert level
        if data['value'] > 80:
            data['alert_level'] = 'High'
        elif data['value'] > 60:
            data['alert_level'] = 'Medium'
        else:
            data['alert_level'] = 'Low'
        # Add rolling statistics
        recent_values = [d['value'] for d in self.data_buffer if d['sensor_type'] == data['sensor_type']]
        if recent_values:
            data['rolling_avg'] = sum(recent_values) / len(recent_values)
            data['rolling_max'] = max(recent_values)
            data['rolling_min'] = min(recent_values)
        return data
    # Load streaming data
    def load_stream(self, data):
        # Add to buffer
        self.data_buffer.append(data)
        self.processed_count += 1
        # Trigger alerts
        if data['alert_level'] == 'High':
            print(f"ALERT: High {data['sensor_type']} reading in {data['location']}!")
        # Periodic summary
        if self.processed_count % 10 == 0:
            self.print_summary()

    # Print summary statistics
    def print_summary(self):
        if not self.data_buffer:
            return
        print(f"\nStream Summary (last {len(self.data_buffer)} readings):")
        # Group by sensor type
        sensor_stats = {}
        for data in self.data_buffer:
            sensor = data['sensor_type']
            if sensor not in sensor_stats:
                sensor_stats[sensor] = []
            sensor_stats[sensor].append(data['value'])
        for sensor, values in sensor_stats.items():
            avg_value = sum(values) / len(values)
            print(f"  {sensor}: Avg={avg_value:.2f}, Count={len(values)}")
    # Run streaming pipeline
    def run_stream(self, duration=30):
        print(f"Starting streaming ETL pipeline for {duration} seconds...\n")
        start_time = time.time()
        while time.time() - start_time < duration:
            # ETL cycle
            raw_data = self.extract_stream()
            transformed_data = self.transform_stream(raw_data)
            self.load_stream(transformed_data)
            # Display current reading
            print(f"{transformed_data['timestamp'].strftime('%H:%M:%S')} | "
                  f"{transformed_data['sensor_type']} | "
                  f"Value: {transformed_data['value']:.2f} | "
                  f"Alert: {transformed_data['alert_level']}")
            # Simulate streaming delay
            time.sleep(1)
        print(f"\nStreaming pipeline completed! Processed {self.processed_count} readings")

# Run the streaming pipeline
streaming_etl = StreamingETL(window_size=50)
streaming_etl.run_stream(duration=10)  # Run for 10 seconds
Advanced ETL Concepts
Advanced Topic 1: Parallel Processing
When you're ready to level up, try parallel ETL processing:
# Parallel ETL Processing
import random
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime
import pandas as pd

class ParallelETL:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    # Parallel extraction
    def parallel_extract(self, file_list):
        print(f"Extracting {len(file_list)} files in parallel...")
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            # Map extract function to each file
            results = list(executor.map(self._extract_file, file_list))
        # Combine all dataframes
        combined_df = pd.concat(results, ignore_index=True)
        print(f"Extracted {len(combined_df)} total records!")
        return combined_df

    def _extract_file(self, filename):
        # Simulate file extraction
        print(f"  Extracting {filename}...")
        # Simulate data
        data = {
            'file': [filename] * 100,
            'value': [random.random() * 100 for _ in range(100)],
            'timestamp': [datetime.now() for _ in range(100)]
        }
        return pd.DataFrame(data)
    # Parallel transformation
    def parallel_transform(self, df):
        print("Transforming data in parallel...")
        # Split dataframe into chunks (guard against a chunk size of 0 for small frames)
        chunk_size = max(1, len(df) // self.num_workers)
        chunks = [df[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            # Transform each chunk
            transformed_chunks = list(executor.map(self._transform_chunk, chunks))
        # Combine results
        result = pd.concat(transformed_chunks, ignore_index=True)
        print(f"Transformed {len(result)} records!")
        return result

    @staticmethod
    def _transform_chunk(chunk):
        # Apply transformations (copy first to avoid modifying a view of the original frame)
        chunk = chunk.copy()
        chunk['value_squared'] = chunk['value'] ** 2
        chunk['value_category'] = pd.cut(
            chunk['value'],
            bins=[0, 25, 50, 75, 100],
            labels=['Low', 'Medium', 'High', 'Very High']
        )
        return chunk

# Use parallel ETL
files = [f'data_file_{i}.csv' for i in range(1, 5)]
parallel_etl = ParallelETL(num_workers=4)

# Run parallel extraction
# data = parallel_etl.parallel_extract(files)
# transformed = parallel_etl.parallel_transform(data)
Advanced Topic 2: Error Handling and Recovery
For production-ready pipelines:
# Robust ETL with Error Handling
import random
import time
from datetime import datetime
import pandas as pd

class RobustETL:
    def __init__(self):
        self.error_log = []
        self.checkpoint_data = None

    # Decorator for error handling
    def handle_errors(func):
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception as e:
                error_msg = f"Error in {func.__name__}: {str(e)}"
                print(error_msg)
                self.error_log.append({
                    'function': func.__name__,
                    'error': str(e),
                    'timestamp': datetime.now()
                })
                # Try recovery
                if hasattr(self, f'recover_{func.__name__}'):
                    print(f"Attempting recovery for {func.__name__}...")
                    recovery_func = getattr(self, f'recover_{func.__name__}')
                    return recovery_func(*args, **kwargs)
                raise
        return wrapper

    # Checkpoint management
    def save_checkpoint(self, data, stage):
        self.checkpoint_data = {
            'data': data.copy(),
            'stage': stage,
            'timestamp': datetime.now()
        }
        print(f"Checkpoint saved at stage: {stage}")

    def load_checkpoint(self):
        if self.checkpoint_data:
            print(f"Loading checkpoint from stage: {self.checkpoint_data['stage']}")
            return self.checkpoint_data['data']
        return None
    @handle_errors
    def extract_with_retry(self, source, max_retries=3):
        for attempt in range(max_retries):
            try:
                print(f"Extraction attempt {attempt + 1}/{max_retries}...")
                # Simulate extraction that might fail
                if random.random() < 0.3:  # 30% chance of failure
                    raise Exception("Network timeout")
                # Success!
                data = pd.DataFrame({
                    'id': range(1000),
                    'value': [random.random() * 100 for _ in range(1000)]
                })
                self.save_checkpoint(data, 'extraction')
                return data
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                print(f"Attempt {attempt + 1} failed: {str(e)}. Retrying...")
                time.sleep(2 ** attempt)  # Exponential backoff

    # Data quality checks
    def validate_data_quality(self, df):
        quality_report = {
            'total_rows': len(df),
            'null_values': df.isnull().sum().sum(),
            'duplicate_rows': df.duplicated().sum(),
            'data_types': df.dtypes.to_dict()
        }
        # Quality alerts
        if quality_report['null_values'] > len(df) * 0.1:
            print("Warning: More than 10% null values detected!")
        if quality_report['duplicate_rows'] > 0:
            print(f"Removing {quality_report['duplicate_rows']} duplicates...")
            df = df.drop_duplicates()
        return df, quality_report
# Example usage
robust_etl = RobustETL()
# data = robust_etl.extract_with_retry('data_source')
Common Pitfalls and Solutions
Pitfall 1: Memory Overflow with Large Datasets
# Wrong way - loading the entire dataset into memory
def bad_etl():
    huge_df = pd.read_csv('10GB_file.csv')  # Memory error!
    processed = huge_df.apply(complex_transform)

# Correct way - process in chunks
def good_etl():
    chunk_size = 10000
    # Process file in chunks
    for chunk in pd.read_csv('10GB_file.csv', chunksize=chunk_size):
        processed_chunk = chunk.apply(simple_transform)
        # Append to output file
        processed_chunk.to_csv('output.csv', mode='a', header=False, index=False)
    print("Large file processed successfully!")
Pitfall 2: Not Handling Schema Changes
# Dangerous - assumes a fixed schema
def rigid_etl(df):
    return df[['col1', 'col2', 'col3']]  # KeyError if columns change!

# Safe - handle schema flexibility
def flexible_etl(df):
    # Define expected columns
    expected_cols = ['col1', 'col2', 'col3']
    # Check what columns exist
    existing_cols = [col for col in expected_cols if col in df.columns]
    missing_cols = [col for col in expected_cols if col not in df.columns]
    if missing_cols:
        print(f"Missing columns: {missing_cols}")
        # Add missing columns with default values
        for col in missing_cols:
            df[col] = None
    # Now safe to select columns
    return df[expected_cols]
Best Practices
- Idempotency: Make your ETL processes repeatable without side effects (see the sketch after this list)
- Monitoring: Track metrics like processing time, record counts, and errors
- Error Handling: Implement comprehensive error handling and recovery
- Checkpointing: Save progress for long-running pipelines
- Documentation: Document data sources, transformations, and business rules
- Testing: Test with edge cases and bad data
- Performance: Optimize for your specific use case (batch vs. streaming)
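To make the first two practices concrete, here is a small, hedged sketch of one possible approach; the function names and file paths are examples, not a standard API:
# Illustrative sketch: simple monitoring plus an idempotent load
import functools
import os
import time

def timed(func):
    # Monitoring: log how long each stage takes and how many rows it returns
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        rows = len(result) if hasattr(result, '__len__') else 'n/a'
        print(f"{func.__name__}: {time.time() - start:.2f}s, rows={rows}")
        return result
    return wrapper

def load_idempotent(df, target='processed_sales.csv'):
    # Idempotency: write to a temp file, then atomically replace the target,
    # so rerunning the pipeline overwrites output instead of appending duplicates
    tmp_path = target + '.tmp'
    df.to_csv(tmp_path, index=False)
    os.replace(tmp_path, target)
Decorating extract_data or transform_data with @timed gives you per-stage timings without touching their logic, and rerunning load_idempotent with the same input always produces the same file.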
Hands-On Exercise
Challenge: Build a Weather Data ETL Pipeline
Create a comprehensive weather data ETL pipeline:
Requirements:
- Extract data from multiple weather stations
- Convert between temperature units (Celsius/Fahrenheit/Kelvin)
- Calculate daily aggregates (min, max, average)
- Categorize weather conditions
- Generate weather trend analysis
- Alert system for extreme weather
Bonus Points:
- Add data validation and quality checks
- Implement parallel processing for multiple stations
- Create visualizations of weather patterns
- Add predictive analytics
Solution
# Weather Data ETL Pipeline Solution
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

class WeatherETL:
    def __init__(self):
        self.stations = ['Station_A', 'Station_B', 'Station_C']
        self.weather_data = None
        self.processed_data = None
        self.alerts = []
    # Extract weather data
    def extract(self):
        print("Extracting weather data from stations...")
        # Generate sample weather data
        data_list = []
        for station in self.stations:
            for day in range(30):  # Last 30 days
                date = datetime.now() - timedelta(days=day)
                # Generate temperature data
                base_temp = 20 + (10 * np.sin(day/7))  # Seasonal variation
                for hour in range(24):
                    temp_c = base_temp + np.random.normal(0, 5) + (5 * np.sin(hour/6))
                    data_list.append({
                        'station': station,
                        'timestamp': date.replace(hour=hour),
                        'temperature_c': round(temp_c, 1),
                        'humidity': round(50 + np.random.normal(0, 15), 1),
                        'pressure': round(1013 + np.random.normal(0, 10), 1),
                        'wind_speed': round(abs(np.random.normal(10, 5)), 1)
                    })
        self.weather_data = pd.DataFrame(data_list)
        print(f"Extracted {len(self.weather_data)} weather records!")
    # Transform weather data
    def transform(self):
        print("Transforming weather data...")
        df = self.weather_data.copy()
        # Temperature conversions
        df['temperature_f'] = (df['temperature_c'] * 9/5) + 32
        df['temperature_k'] = df['temperature_c'] + 273.15
        # Add time features
        df['date'] = df['timestamp'].dt.date
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.day_name()

        # Categorize weather conditions
        def categorize_weather(row):
            temp = row['temperature_c']
            humidity = row['humidity']
            wind = row['wind_speed']
            if temp > 30:
                return 'Hot'
            elif temp < 0:
                return 'Freezing'
            elif humidity > 80 and temp > 20:
                return 'Humid'
            elif wind > 25:
                return 'Windy'
            elif 15 <= temp <= 25 and humidity < 70:
                return 'Pleasant'
            else:
                return 'Mild'

        df['weather_condition'] = df.apply(categorize_weather, axis=1)
        # Calculate daily aggregates
        daily_stats = df.groupby(['station', 'date']).agg({
            'temperature_c': ['min', 'max', 'mean'],
            'humidity': 'mean',
            'pressure': 'mean',
            'wind_speed': 'max'
        }).round(1)
        daily_stats.columns = ['temp_min', 'temp_max', 'temp_avg',
                               'humidity_avg', 'pressure_avg', 'wind_max']
        daily_stats = daily_stats.reset_index()
        # Generate alerts
        extreme_temps = df[
            (df['temperature_c'] > 35) | (df['temperature_c'] < -5)
        ]
        for _, row in extreme_temps.iterrows():
            alert = {
                'station': row['station'],
                'timestamp': row['timestamp'],
                'alert_type': 'Extreme Temperature',
                'value': f"{row['temperature_c']}°C",
                'severity': 'High'
            }
            self.alerts.append(alert)
        self.processed_data = df
        self.daily_stats = daily_stats
        print(f"Transformation complete! Generated {len(self.alerts)} weather alerts.")
    # Load processed data
    def load(self):
        print("Loading processed data...")
        # Save detailed data
        self.processed_data.to_csv('weather_detailed.csv', index=False)
        # Save daily summaries
        self.daily_stats.to_csv('weather_daily_summary.csv', index=False)
        # Save alerts
        if self.alerts:
            alerts_df = pd.DataFrame(self.alerts)
            alerts_df.to_csv('weather_alerts.csv', index=False)
            print("\nWeather Alerts:")
            for alert in self.alerts[:5]:  # Show first 5 alerts
                print(f"  {alert['station']} - {alert['alert_type']}: {alert['value']}")
        # Generate visualizations
        self.create_visualizations()
        print("Data loaded successfully!")
    # Create weather visualizations
    def create_visualizations(self):
        print("Creating weather visualizations...")
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        fig.suptitle('Weather Analysis Dashboard', fontsize=16)
        # Temperature trends
        for station in self.stations:
            station_data = self.daily_stats[self.daily_stats['station'] == station]
            axes[0, 0].plot(station_data['date'], station_data['temp_avg'],
                            label=station, marker='o')
        axes[0, 0].set_title('Average Temperature Trends')
        axes[0, 0].set_ylabel('Temperature (°C)')
        axes[0, 0].legend()
        axes[0, 0].tick_params(axis='x', rotation=45)
        # Weather condition distribution
        condition_counts = self.processed_data['weather_condition'].value_counts()
        axes[0, 1].pie(condition_counts.values, labels=condition_counts.index,
                       autopct='%1.1f%%')
        axes[0, 1].set_title('Weather Condition Distribution')
        # Station comparison
        station_avg = self.daily_stats.groupby('station')['temp_avg'].mean()
        axes[1, 0].bar(station_avg.index, station_avg.values,
                       color=['#ff9999', '#66b3ff', '#99ff99'])
        axes[1, 0].set_title('Average Temperature by Station')
        axes[1, 0].set_ylabel('Temperature (°C)')
        # Humidity vs Temperature
        sample_data = self.processed_data.sample(min(1000, len(self.processed_data)))
        scatter = axes[1, 1].scatter(sample_data['temperature_c'],
                                     sample_data['humidity'],
                                     c=sample_data['temperature_c'],
                                     cmap='coolwarm', alpha=0.6)
        axes[1, 1].set_title('Temperature vs Humidity')
        axes[1, 1].set_xlabel('Temperature (°C)')
        axes[1, 1].set_ylabel('Humidity (%)')
        plt.colorbar(scatter, ax=axes[1, 1])
        plt.tight_layout()
        plt.savefig('weather_dashboard.png', dpi=300, bbox_inches='tight')
        print("Dashboard saved as weather_dashboard.png")
    # Run complete pipeline
    def run(self):
        print("Starting Weather ETL Pipeline...\n")
        try:
            self.extract()
            self.transform()
            self.load()
            print("\nPipeline completed successfully!")
            print(f"Processed {len(self.weather_data)} records")
            print(f"Temperature range: {self.processed_data['temperature_c'].min():.1f}°C to {self.processed_data['temperature_c'].max():.1f}°C")
            print(f"Generated {len(self.alerts)} weather alerts")
        except Exception as e:
            print(f"Pipeline failed: {str(e)}")
            raise

# Run the weather ETL pipeline
weather_etl = WeatherETL()
weather_etl.run()

# Display sample results
print("\nSample Daily Statistics:")
print(weather_etl.daily_stats.head())
Key Takeaways
You've learned so much about ETL and data pipelines! Here's what you can now do:
- Build ETL pipelines from scratch with confidence
- Extract data from multiple sources (files, APIs, databases)
- Transform data with cleaning, validation, and enrichment
- Load data to various destinations reliably
- Handle errors gracefully with retry logic and checkpointing
- Process large datasets efficiently with chunking and parallel processing
- Monitor and alert on data quality issues
Remember: ETL is the backbone of data engineering! Every data scientist and engineer needs these skills.
Next Steps
Congratulations! You've mastered ETL and data pipelines with Python!
Here's what to do next:
- Practice with the weather ETL exercise above
- Build an ETL pipeline for your own data project
- Learn about Apache Airflow for production ETL orchestration (see the sketch after this list)
- Explore streaming ETL with Apache Kafka or Apache Spark
- Share your ETL projects with the data community!
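To give you a feel for that orchestration step, here is a hedged sketch of what the first example's extract/transform/load functions might look like as an Airflow DAG. It assumes Apache Airflow 2.4+ is installed and that those functions are importable from wherever you saved them; the DAG id, schedule, and /tmp handoff paths are purely illustrative:
# Hypothetical Airflow DAG wrapping the extract/transform/load functions above
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

# from my_etl_module import extract_data, transform_data, load_data  # wherever you saved them

def extract_task():
    # Hand data to the next task through a file (simplest cross-task handoff)
    extract_data().to_csv('/tmp/raw_sales.csv', index=False)

def transform_task():
    df = transform_data(pd.read_csv('/tmp/raw_sales.csv'))
    df.to_csv('/tmp/transformed_sales.csv', index=False)

def load_task():
    load_data(pd.read_csv('/tmp/transformed_sales.csv'))

with DAG(
    dag_id='sales_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # Airflow 2.4+ keyword; older releases use schedule_interval
    catchup=False,
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_task)
    transform = PythonOperator(task_id='transform', python_callable=transform_task)
    load = PythonOperator(task_id='load', python_callable=load_task)

    extract >> transform >> load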
Remember: Every data engineering expert started with their first ETL pipeline. Keep building, keep learning, and most importantly, have fun transforming data!
Happy data pipelining!