What is MLOps?
MLOps (Machine Learning Operations) combines ML, DevOps, and data engineering to deploy and maintain ML systems reliably in production.
MLOps Principles:
- Automation: CI/CD for ML pipelines
- Versioning: Code, data, and models
- Monitoring: Track model performance
- Reproducibility: Consistent results
- Collaboration: Data scientists + engineers
🔄 ML Lifecycle
# Typical ML workflow stages:
1. Data Collection & Preparation
   - Gather data from sources
   - Clean and validate
   - Version control data
2. Feature Engineering
   - Create features
   - Feature store for reuse
3. Model Training
   - Experiment tracking
   - Hyperparameter tuning
   - Model versioning
4. Model Evaluation
   - Test on validation set
   - Compare with baseline
   - A/B testing
5. Deployment
   - Package model
   - Deploy to production
   - Gradual rollout (see the canary sketch after this list)
6. Monitoring
   - Track predictions
   - Detect drift
   - Alert on anomalies
7. Retraining
   - Trigger based on performance
   - Automated retraining pipeline
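The "Gradual rollout" and "A/B testing" steps above are easiest to picture as a traffic splitter in front of two model versions. Below is a minimal, framework-agnostic sketch of hash-based canary routing; the CanaryRouter class, the 10% split, and the model objects are illustrative assumptions, not part of any specific serving tool.

import hashlib

class CanaryRouter:
    """Route a stable fraction of requests to a candidate model (sketch)."""
    def __init__(self, current_model, candidate_model, canary_fraction=0.1):
        self.current_model = current_model
        self.candidate_model = candidate_model
        self.canary_fraction = canary_fraction  # e.g. 10% of traffic

    def _in_canary(self, request_id):
        # Deterministic hash so the same request_id always sees the same model
        bucket = int(hashlib.md5(request_id.encode()).hexdigest(), 16) % 100
        return bucket < self.canary_fraction * 100

    def predict(self, request_id, features):
        variant = "canary" if self._in_canary(request_id) else "stable"
        model = self.candidate_model if variant == "canary" else self.current_model
        return model.predict(features), variant

# Usage (hypothetical models)
# router = CanaryRouter(current_model=old_model, candidate_model=new_model)
# prediction, variant = router.predict("user-123", X_new)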
📊 Experiment Tracking - MLflow
# pip install mlflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
# Start MLflow run
with mlflow.start_run():
    # Generate data
    X, y = make_classification(n_samples=1000, n_features=20)
    X_train, X_test, y_train, y_test = train_test_split(X, y)

    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)

    # Train model
    model = RandomForestClassifier(n_estimators=n_estimators,
                                   max_depth=max_depth)
    model.fit(X_train, y_train)

    # Log metrics
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metric("train_accuracy", train_score)
    mlflow.log_metric("test_accuracy", test_score)

    # Log model
    mlflow.sklearn.log_model(model, "model")

    print(f"Run ID: {mlflow.active_run().info.run_id}")

# View results: mlflow ui
# Access at http://localhost:5000
📦 Model Registry
# Register model with MLflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register the model from a run (substitute the actual run ID printed above)
model_uri = "runs:/RUN_ID/model"
mlflow.register_model(model_uri, "RandomForestClassifier")
# Transition to production
client.transition_model_version_stage(
    name="RandomForestClassifier",
    version=1,
    stage="Production"
)

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/RandomForestClassifier/Production"
)
# Use model
predictions = model.predict(X_test)
🔗 Data Versioning - DVC
# pip install dvc
# Initialize DVC
dvc init
# Track data file
dvc add data/train.csv
# Commit to git
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"
# Push data to remote storage (S3, GCS, etc.)
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
# Pull data on another machine
git pull
dvc pull
# Create pipeline
dvc run -n prepare \
-d data/raw.csv \
-o data/processed.csv \
python prepare.py
dvc run -n train \
-d data/processed.csv \
-o model.pkl \
python train.py
# Reproduce pipeline
dvc repro
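Beyond the CLI, DVC ships a small Python API that lets training code read a specific version of the data directly. A minimal sketch, assuming data/train.csv is DVC-tracked and a Git tag v1.0 exists (both are assumptions):

import dvc.api

# Stream a specific revision of the data without checking it out
with dvc.api.open('data/train.csv', rev='v1.0') as f:
    raw_csv = f.read()

# Or resolve the file's location in remote storage
url = dvc.api.get_url('data/train.csv', rev='v1.0')
print(url)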
🤖 CI/CD Pipeline
GitHub Actions
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest tests/
      - name: Train model
        run: |
          python train.py
      - name: Evaluate model
        run: |
          python evaluate.py
      - name: Build Docker image
        run: |
          docker build -t ml-model:${{ github.sha }} .
      - name: Deploy to production
        if: success()
        run: |
          # Deploy commands here
          echo "Deploying model..."
📈 Model Monitoring
import numpy as np
from scipy import stats

class ModelMonitor:
    def __init__(self, reference_data):
        self.reference_data = reference_data
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_drift(self, new_data, threshold=0.05):
        """Detect distribution drift using KS test"""
        drifted_features = []
        for i in range(new_data.shape[1]):
            # Kolmogorov-Smirnov test
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            if p_value < threshold:
                drifted_features.append(i)
        return drifted_features

    def check_input_quality(self, data):
        """Check for data quality issues"""
        issues = []
        # Missing values
        if np.any(np.isnan(data)):
            issues.append("Missing values detected")
        # Out of range
        z_scores = np.abs((data - self.reference_mean) / self.reference_std)
        if np.any(z_scores > 3):
            issues.append("Outliers detected (z-score > 3)")
        return issues

# Usage
monitor = ModelMonitor(X_train)
drifted = monitor.detect_drift(X_test)
if drifted:
    print(f"Drift detected in features: {drifted}")
    # Trigger retraining

issues = monitor.check_input_quality(X_test)
if issues:
    print(f"Quality issues: {issues}")
🔔 Alerting
# Monitor model performance and alert
import smtplib
from email.mime.text import MIMEText

class PerformanceMonitor:
    def __init__(self, baseline_accuracy=0.85, threshold=0.05):
        self.baseline_accuracy = baseline_accuracy
        self.threshold = threshold
        self.recent_predictions = []
        self.recent_actuals = []

    def log_prediction(self, prediction, actual):
        self.recent_predictions.append(prediction)
        self.recent_actuals.append(actual)
        # Keep last 100 predictions
        if len(self.recent_predictions) > 100:
            self.recent_predictions.pop(0)
            self.recent_actuals.pop(0)

    def check_performance(self):
        if len(self.recent_predictions) < 10:
            return True
        # Calculate recent accuracy
        correct = sum(p == a for p, a in
                      zip(self.recent_predictions, self.recent_actuals))
        accuracy = correct / len(self.recent_predictions)
        # Check if degraded
        if accuracy < self.baseline_accuracy - self.threshold:
            self.send_alert(accuracy)
            return False
        return True

    def send_alert(self, current_accuracy):
        message = f"""
        Model performance degradation detected!
        Baseline accuracy: {self.baseline_accuracy:.2%}
        Current accuracy: {current_accuracy:.2%}
        Threshold: {self.threshold:.2%}
        Action required: Investigate and retrain model.
        """
        print(f"ALERT: {message}")
        # Send email/Slack notification here

# Usage
monitor = PerformanceMonitor(baseline_accuracy=0.90)
for pred, actual in zip(predictions, actuals):
    monitor.log_prediction(pred, actual)
    if not monitor.check_performance():
        # Trigger retraining pipeline
        pass
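The smtplib and MIMEText imports above exist precisely so the print in send_alert can be replaced with a real notification. A minimal email sketch; the SMTP host, port, addresses, and credentials are all placeholders:

import smtplib
from email.mime.text import MIMEText

def send_email_alert(subject, body):
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = "mlops-alerts@example.com"   # placeholder sender
    msg["To"] = "oncall@example.com"           # placeholder recipient

    with smtplib.SMTP("smtp.example.com", 587) as server:  # placeholder SMTP server
        server.starttls()                       # upgrade the connection to TLS
        server.login("alert-user", "app-password")  # placeholder credentials
        server.send_message(msg)

# Inside PerformanceMonitor.send_alert:
# send_email_alert("Model performance degradation", message)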
🔄 Automated Retraining
# Scheduled retraining pipeline
from datetime import datetime
import joblib
import schedule

class RetrainingPipeline:
    def __init__(self, model, data_loader):
        self.model = model
        self.data_loader = data_loader
        self.last_trained = None

    def should_retrain(self):
        # Check conditions
        if self.last_trained is None:
            return True
        # Retrain if model is old
        days_since_training = (datetime.now() - self.last_trained).days
        if days_since_training > 7:
            return True
        # Retrain if performance dropped
        # (implement your logic)
        return False

    def retrain(self):
        print(f"Starting retraining at {datetime.now()}")
        # Load new data
        X_train, y_train = self.data_loader.load_latest()
        # Train model
        self.model.fit(X_train, y_train)
        # Evaluate
        X_val, y_val = self.data_loader.load_validation()
        score = self.model.score(X_val, y_val)
        if score > 0.85:  # Threshold
            # Save and deploy
            joblib.dump(self.model, 'model_new.joblib')
            self.last_trained = datetime.now()
            print(f"Retraining successful. Score: {score:.3f}")
            return True
        else:
            print(f"Retraining failed. Score: {score:.3f}")
            return False

    def run(self):
        if self.should_retrain():
            self.retrain()

# Schedule retraining
pipeline = RetrainingPipeline(model, data_loader)
schedule.every().day.at("02:00").do(pipeline.run)

# Or trigger on demand
if drift_detected or performance_dropped:
    pipeline.retrain()
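Note that the schedule library only registers the job; a long-running process has to drive it. A minimal run loop:

import time
import schedule  # already imported in the pipeline above

while True:
    schedule.run_pending()  # execute any jobs whose time has come
    time.sleep(60)          # poll once a minute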
🛠️ MLOps Tools
| Tool | Purpose | Key Features |
|---|---|---|
| MLflow | Experiment tracking | Track runs, model registry, deployment |
| DVC | Data versioning | Git for data, pipeline management |
| Kubeflow | ML on Kubernetes | Pipelines, serving, distributed training |
| Airflow | Workflow orchestration | DAGs, scheduling, monitoring |
| Weights & Biases | Experiment tracking | Visualizations, collaboration |
| SageMaker | End-to-end ML (AWS) | Training, deployment, monitoring |
💡 MLOps Best Practices
- Version everything: Code, data, models, configs
- Automate pipelines: From data to deployment
- Monitor continuously: Performance, drift, quality
- Test thoroughly: Unit tests, integration tests
- Document decisions: Why this model? What features?
- Use feature stores: Consistent features across training/serving (see the sketch after this list)
- Implement rollback: Quick recovery from bad deployments
- A/B test changes: Validate improvements in production
- Collaborate: Data scientists + ML engineers + DevOps
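To make the feature-store practice concrete without tying it to a specific product (Feast, Tecton, SageMaker Feature Store, ...), here is a toy in-memory sketch of the core idea: register a feature once, then read the same definition offline for training and online for serving. The class and feature names are purely illustrative.

import pandas as pd

class ToyFeatureStore:
    """Toy feature store: one feature definition, reused for training and serving."""
    def __init__(self):
        self._features = {}  # feature name -> DataFrame indexed by entity id

    def register(self, name, df):
        # df: one feature column, indexed by entity id (e.g. user_id)
        self._features[name] = df

    def get_training_frame(self, entity_ids, feature_names):
        # Offline path: join historical features for a batch of entities
        frames = [self._features[n].loc[entity_ids] for n in feature_names]
        return pd.concat(frames, axis=1)

    def get_online_features(self, entity_id, feature_names):
        # Online path: low-latency lookup for a single entity at serving time
        return {n: self._features[n].loc[entity_id].iloc[0] for n in feature_names}

# Usage with a hypothetical feature
store = ToyFeatureStore()
purchases = pd.DataFrame({"purchase_count_30d": [3, 7]}, index=["user_1", "user_2"])
store.register("purchase_count_30d", purchases)
train_X = store.get_training_frame(["user_1", "user_2"], ["purchase_count_30d"])
online = store.get_online_features("user_1", ["purchase_count_30d"])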
🎯 Key Takeaways
- MLOps brings DevOps practices to ML
- MLflow for experiment tracking and model registry
- DVC for data and pipeline versioning
- CI/CD automates training and deployment
- Monitor for drift and performance degradation
- Automate retraining based on triggers
- Version everything for reproducibility