Deploy de Modelos de Machine Learning em Produção
Guia completo para fazer deploy de modelos de ML usando FastAPI, Docker e monitoramento — do treinamento à produção com boas práticas de MLOps
Deploy de Modelos de Machine Learning em Produção
Treinar um modelo de machine learning é apenas metade do trabalho. A outra metade — frequentemente mais desafiadora — é colocá-lo em produção de forma confiável, escalável e monitorada. Neste tutorial, vamos percorrer todo o caminho desde o treinamento de um modelo até o deploy em produção usando FastAPI, Docker e ferramentas de monitoramento.
Ao final deste guia, você terá um sistema completo e pronto para produção que inclui:
- Um modelo treinado e serializado com joblib.
- Uma API REST construída com FastAPI.
- Containerização com Docker e Docker Compose.
- Testes automatizados.
- Monitoramento e logging.
- Pipeline de CI/CD básico.
Pré-requisitos
Antes de começar, certifique-se de ter instalado:
- Python 3.11 ou superior
- Docker e Docker Compose
- pip ou uv (gerenciador de pacotes)
- Git
Conhecimento básico de Python, machine learning e APIs REST é recomendado.
Estrutura do Projeto
Vamos organizar nosso projeto seguindo boas práticas de MLOps:
ml-deploy/
├── app/
│ ├── __init__.py
│ ├── main.py # Aplicação FastAPI
│ ├── model.py # Carregamento e inferência do modelo
│ ├── schemas.py # Schemas Pydantic
│ ├── preprocessing.py # Pipeline de preprocessamento
│ └── monitoring.py # Monitoramento e métricas
├── model/
│ ├── train.py # Script de treinamento
│ ├── evaluate.py # Avaliação do modelo
│ └── artifacts/ # Modelos salvos
├── tests/
│ ├── __init__.py
│ ├── test_api.py # Testes da API
│ ├── test_model.py # Testes do modelo
│ └── test_preprocessing.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── pyproject.toml
└── README.md
Parte 1: Treinamento do Modelo
Definindo o Problema
Vamos treinar um modelo para prever o preço de imóveis baseado em características como área, número de quartos, localização, etc. Usaremos o dataset California Housing do scikit-learn como base.
Script de Treinamento
Crie o arquivo model/train.py:
"""
Script de treinamento do modelo de previsão de preços de imóveis.
Treina, avalia e salva o modelo com pipeline completo de preprocessamento.
"""
import json
import logging
from datetime import datetime
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
ARTIFACTS_DIR = Path(__file__).parent / "artifacts"
ARTIFACTS_DIR.mkdir(exist_ok=True)
def load_data() -> tuple[pd.DataFrame, pd.Series]:
"""Carrega e prepara os dados para treinamento."""
logger.info("Carregando dataset California Housing...")
housing = fetch_california_housing(as_frame=True)
X = housing.data
y = housing.target
# Adicionar features categóricas derivadas para demonstrar preprocessamento
X = X.copy()
X["ocean_proximity"] = pd.cut(
X["Longitude"],
bins=[-125, -122, -119, -114],
labels=["coastal", "inland", "desert"],
)
X["house_age_category"] = pd.cut(
X["HouseAge"],
bins=[0, 15, 30, 52],
labels=["new", "medium", "old"],
)
logger.info(f"Dataset carregado: {X.shape[0]} amostras, {X.shape[1]} features")
return X, y
def create_preprocessing_pipeline(
numeric_features: list[str], categorical_features: list[str]
) -> ColumnTransformer:
"""Cria pipeline de preprocessamento com transformações numéricas e categóricas."""
numeric_transformer = StandardScaler()
categorical_transformer = OneHotEncoder(drop="first", sparse_output=False, handle_unknown="ignore")
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
return preprocessor
def create_model_pipeline(
numeric_features: list[str], categorical_features: list[str]
) -> Pipeline:
"""Cria pipeline completo: preprocessamento + modelo."""
preprocessor = create_preprocessing_pipeline(numeric_features, categorical_features)
pipeline = Pipeline(
steps=[
("preprocessor", preprocessor),
(
"regressor",
GradientBoostingRegressor(
n_estimators=300,
max_depth=5,
learning_rate=0.1,
min_samples_split=10,
min_samples_leaf=5,
subsample=0.8,
random_state=42,
),
),
]
)
return pipeline
def evaluate_model(
pipeline: Pipeline, X_test: pd.DataFrame, y_test: pd.Series
) -> dict:
"""Avalia o modelo e retorna métricas."""
y_pred = pipeline.predict(X_test)
metrics = {
"mae": float(mean_absolute_error(y_test, y_pred)),
"rmse": float(np.sqrt(mean_squared_error(y_test, y_pred))),
"r2": float(r2_score(y_test, y_pred)),
"mape": float(np.mean(np.abs((y_test - y_pred) / y_test)) * 100),
}
logger.info("Métricas de avaliação:")
for name, value in metrics.items():
logger.info(f" {name.upper()}: {value:.4f}")
return metrics
def save_artifacts(
pipeline: Pipeline,
metrics: dict,
feature_names: list[str],
version: str,
) -> Path:
"""Salva o modelo treinado e metadados."""
model_path = ARTIFACTS_DIR / f"model_v{version}.joblib"
metadata_path = ARTIFACTS_DIR / f"metadata_v{version}.json"
# Salvar modelo
joblib.dump(pipeline, model_path)
logger.info(f"Modelo salvo em: {model_path}")
# Salvar metadados
metadata = {
"version": version,
"trained_at": datetime.now().isoformat(),
"metrics": metrics,
"feature_names": feature_names,
"model_type": "GradientBoostingRegressor",
"sklearn_version": __import__("sklearn").__version__,
}
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
logger.info(f"Metadados salvos em: {metadata_path}")
# Criar link simbólico para versão latest
latest_model = ARTIFACTS_DIR / "model_latest.joblib"
latest_metadata = ARTIFACTS_DIR / "metadata_latest.json"
if latest_model.exists():
latest_model.unlink()
if latest_metadata.exists():
latest_metadata.unlink()
latest_model.symlink_to(model_path.name)
latest_metadata.symlink_to(metadata_path.name)
return model_path
def train():
"""Pipeline principal de treinamento."""
logger.info("=" * 60)
logger.info("Iniciando pipeline de treinamento")
logger.info("=" * 60)
# Carregar dados
X, y = load_data()
# Definir features
numeric_features = [
"MedInc", "HouseAge", "AveRooms", "AveBedrms",
"Population", "AveOccup", "Latitude", "Longitude",
]
categorical_features = ["ocean_proximity", "house_age_category"]
# Split treino/teste
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
logger.info(f"Treino: {len(X_train)} amostras | Teste: {len(X_test)} amostras")
# Criar e treinar pipeline
pipeline = create_model_pipeline(numeric_features, categorical_features)
logger.info("Treinando modelo...")
pipeline.fit(X_train, y_train)
# Validação cruzada
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring="r2")
logger.info(f"CV R² scores: {cv_scores}")
logger.info(f"CV R² médio: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
# Avaliar no conjunto de teste
metrics = evaluate_model(pipeline, X_test, y_test)
# Salvar artefatos
version = datetime.now().strftime("%Y%m%d_%H%M%S")
all_features = numeric_features + categorical_features
save_artifacts(pipeline, metrics, all_features, version)
logger.info("=" * 60)
logger.info("Treinamento concluído com sucesso!")
logger.info("=" * 60)
return pipeline, metrics
if __name__ == "__main__":
train()
Execute o treinamento:
cd ml-deploy
python model/train.py
Parte 2: Criando a API com FastAPI
Schemas de Entrada e Saída
Crie o arquivo app/schemas.py:
"""Schemas Pydantic para validação de entrada e saída da API."""
from pydantic import BaseModel, Field
class HousingFeatures(BaseModel):
"""Features de entrada para previsão de preço."""
MedInc: float = Field(..., ge=0, description="Renda mediana do bloco (em dezenas de milhar)")
HouseAge: float = Field(..., ge=0, le=52, description="Idade mediana das casas no bloco")
AveRooms: float = Field(..., ge=0, description="Média de cômodos por domicílio")
AveBedrms: float = Field(..., ge=0, description="Média de quartos por domicílio")
Population: float = Field(..., ge=0, description="População do bloco")
AveOccup: float = Field(..., ge=0, description="Média de ocupantes por domicílio")
Latitude: float = Field(..., ge=32, le=42, description="Latitude")
Longitude: float = Field(..., ge=-125, le=-114, description="Longitude")
model_config = {
"json_schema_extra": {
"examples": [
{
"MedInc": 8.3252,
"HouseAge": 41.0,
"AveRooms": 6.984,
"AveBedrms": 1.024,
"Population": 322.0,
"AveOccup": 2.556,
"Latitude": 37.88,
"Longitude": -122.23,
}
]
}
}
class PredictionResponse(BaseModel):
"""Resposta da previsão."""
predicted_price: float = Field(..., description="Preço previsto (em centenas de milhar)")
model_version: str = Field(..., description="Versão do modelo utilizado")
confidence_interval: dict = Field(
..., description="Intervalo de confiança da previsão"
)
class BatchPredictionRequest(BaseModel):
"""Requisição para previsão em lote."""
instances: list[HousingFeatures] = Field(
..., min_length=1, max_length=1000, description="Lista de instâncias"
)
class BatchPredictionResponse(BaseModel):
"""Resposta de previsão em lote."""
predictions: list[PredictionResponse]
total_instances: int
model_version: str
class HealthResponse(BaseModel):
"""Resposta do endpoint de health check."""
status: str
model_loaded: bool
model_version: str | None
uptime_seconds: float
Carregamento do Modelo
Crie o arquivo app/model.py:
"""Gerenciamento do modelo: carregamento, inferência e versionamento."""
import json
import logging
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
MODEL_DIR = Path(__file__).parent.parent / "model" / "artifacts"
class ModelManager:
"""Gerencia o ciclo de vida do modelo em produção."""
def __init__(self):
self.pipeline = None
self.metadata = None
self.is_loaded = False
def load(self, version: str = "latest") -> None:
"""Carrega o modelo e metadados."""
model_path = MODEL_DIR / f"model_{version}.joblib"
metadata_path = MODEL_DIR / f"metadata_{version}.json"
if not model_path.exists():
raise FileNotFoundError(f"Modelo não encontrado: {model_path}")
self.pipeline = joblib.load(model_path)
if metadata_path.exists():
with open(metadata_path) as f:
self.metadata = json.load(f)
self.is_loaded = True
logger.info(f"Modelo carregado: versão {self.version}")
@property
def version(self) -> str:
if self.metadata:
return self.metadata.get("version", "unknown")
return "unknown"
def predict(self, features: dict) -> dict:
"""Realiza previsão para uma única instância."""
if not self.is_loaded:
raise RuntimeError("Modelo não carregado")
df = self._prepare_dataframe(features)
prediction = float(self.pipeline.predict(df)[0])
# Calcular intervalo de confiança aproximado baseado no RMSE do treino
rmse = self.metadata.get("metrics", {}).get("rmse", 0.5) if self.metadata else 0.5
ci_lower = prediction - 1.96 * rmse
ci_upper = prediction + 1.96 * rmse
return {
"predicted_price": round(prediction, 4),
"model_version": self.version,
"confidence_interval": {
"lower": round(max(0, ci_lower), 4),
"upper": round(ci_upper, 4),
"confidence_level": 0.95,
},
}
def predict_batch(self, instances: list[dict]) -> list[dict]:
"""Realiza previsão para múltiplas instâncias."""
return [self.predict(instance) for instance in instances]
def _prepare_dataframe(self, features: dict) -> pd.DataFrame:
"""Prepara DataFrame com features derivadas."""
df = pd.DataFrame([features])
df["ocean_proximity"] = pd.cut(
df["Longitude"],
bins=[-125, -122, -119, -114],
labels=["coastal", "inland", "desert"],
)
df["house_age_category"] = pd.cut(
df["HouseAge"],
bins=[0, 15, 30, 52],
labels=["new", "medium", "old"],
)
return df
# Singleton
model_manager = ModelManager()
Monitoramento
Crie o arquivo app/monitoring.py:
"""Monitoramento de performance e métricas da API."""
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class PredictionLog:
"""Log de uma previsão individual."""
timestamp: str
input_features: dict
prediction: float
latency_ms: float
class MonitoringService:
"""Serviço de monitoramento para métricas da API e do modelo."""
def __init__(self, max_history: int = 10000):
self.start_time = time.time()
self.prediction_count = 0
self.error_count = 0
self.total_latency = 0.0
self.prediction_history: deque[PredictionLog] = deque(maxlen=max_history)
self.prediction_values: deque[float] = deque(maxlen=max_history)
@property
def uptime_seconds(self) -> float:
return time.time() - self.start_time
@property
def avg_latency_ms(self) -> float:
if self.prediction_count == 0:
return 0.0
return self.total_latency / self.prediction_count
def log_prediction(
self, features: dict, prediction: float, latency_ms: float
) -> None:
"""Registra uma previsão para monitoramento."""
self.prediction_count += 1
self.total_latency += latency_ms
self.prediction_values.append(prediction)
log_entry = PredictionLog(
timestamp=datetime.now().isoformat(),
input_features=features,
prediction=prediction,
latency_ms=latency_ms,
)
self.prediction_history.append(log_entry)
def log_error(self) -> None:
"""Registra um erro."""
self.error_count += 1
def get_stats(self) -> dict:
"""Retorna estatísticas atuais do serviço."""
recent_predictions = list(self.prediction_values)
stats = {
"uptime_seconds": round(self.uptime_seconds, 2),
"total_predictions": self.prediction_count,
"total_errors": self.error_count,
"error_rate": (
round(self.error_count / max(1, self.prediction_count + self.error_count), 4)
),
"avg_latency_ms": round(self.avg_latency_ms, 2),
}
if recent_predictions:
import numpy as np
stats["prediction_stats"] = {
"mean": round(float(np.mean(recent_predictions)), 4),
"std": round(float(np.std(recent_predictions)), 4),
"min": round(float(np.min(recent_predictions)), 4),
"max": round(float(np.max(recent_predictions)), 4),
}
return stats
# Singleton
monitoring = MonitoringService()
Aplicação FastAPI Principal
Crie o arquivo app/main.py:
"""
API de Machine Learning para previsão de preços de imóveis.
Deploy-ready com FastAPI, monitoramento e validação robusta.
"""
import logging
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from app.model import model_manager
from app.monitoring import monitoring
from app.schemas import (
BatchPredictionRequest,
BatchPredictionResponse,
HealthResponse,
HousingFeatures,
PredictionResponse,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gerencia o ciclo de vida da aplicação."""
logger.info("Iniciando aplicação...")
try:
model_manager.load("latest")
logger.info("Modelo carregado com sucesso!")
except FileNotFoundError:
logger.error("Modelo não encontrado! Treine o modelo antes de iniciar a API.")
raise
yield
logger.info("Encerrando aplicação...")
app = FastAPI(
title="ML Housing Price Prediction API",
description="API para previsão de preços de imóveis usando Machine Learning",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Verifica a saúde da API e do modelo."""
return HealthResponse(
status="healthy",
model_loaded=model_manager.is_loaded,
model_version=model_manager.version,
uptime_seconds=monitoring.uptime_seconds,
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(features: HousingFeatures):
"""Realiza previsão para uma única instância."""
start_time = time.time()
try:
result = model_manager.predict(features.model_dump())
latency_ms = (time.time() - start_time) * 1000
monitoring.log_prediction(
features=features.model_dump(),
prediction=result["predicted_price"],
latency_ms=latency_ms,
)
return PredictionResponse(**result)
except Exception as e:
monitoring.log_error()
logger.error(f"Erro na previsão: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Realiza previsão para múltiplas instâncias."""
start_time = time.time()
try:
instances = [inst.model_dump() for inst in request.instances]
results = model_manager.predict_batch(instances)
latency_ms = (time.time() - start_time) * 1000
predictions = [PredictionResponse(**r) for r in results]
for inst, result in zip(instances, results):
monitoring.log_prediction(
features=inst,
prediction=result["predicted_price"],
latency_ms=latency_ms / len(instances),
)
return BatchPredictionResponse(
predictions=predictions,
total_instances=len(predictions),
model_version=model_manager.version,
)
except Exception as e:
monitoring.log_error()
logger.error(f"Erro na previsão batch: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def get_metrics():
"""Retorna métricas de monitoramento do serviço."""
return monitoring.get_stats()
@app.get("/model/info")
async def model_info():
"""Retorna informações sobre o modelo carregado."""
if not model_manager.is_loaded:
raise HTTPException(status_code=503, detail="Modelo não carregado")
return model_manager.metadata
Parte 3: Testes
Crie o arquivo tests/test_api.py:
"""Testes para a API de previsão."""
import pytest
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
VALID_INPUT = {
"MedInc": 8.3252,
"HouseAge": 41.0,
"AveRooms": 6.984,
"AveBedrms": 1.024,
"Population": 322.0,
"AveOccup": 2.556,
"Latitude": 37.88,
"Longitude": -122.23,
}
def test_health_check():
"""Testa o endpoint de health check."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert data["model_loaded"] is True
def test_predict_single():
"""Testa previsão de uma única instância."""
response = client.post("/predict", json=VALID_INPUT)
assert response.status_code == 200
data = response.json()
assert "predicted_price" in data
assert "model_version" in data
assert "confidence_interval" in data
assert data["predicted_price"] > 0
def test_predict_batch():
"""Testa previsão em lote."""
request = {"instances": [VALID_INPUT, VALID_INPUT]}
response = client.post("/predict/batch", json=request)
assert response.status_code == 200
data = response.json()
assert data["total_instances"] == 2
assert len(data["predictions"]) == 2
def test_predict_invalid_input():
"""Testa validação de entrada inválida."""
invalid = {**VALID_INPUT, "MedInc": -1}
response = client.post("/predict", json=invalid)
assert response.status_code == 422 # Validation error
def test_metrics():
"""Testa o endpoint de métricas."""
# Fazer uma previsão primeiro
client.post("/predict", json=VALID_INPUT)
response = client.get("/metrics")
assert response.status_code == 200
data = response.json()
assert "total_predictions" in data
assert data["total_predictions"] >= 1
def test_model_info():
"""Testa o endpoint de informações do modelo."""
response = client.get("/model/info")
assert response.status_code == 200
data = response.json()
assert "version" in data
assert "metrics" in data
Execute os testes:
pytest tests/ -v
Parte 4: Docker e Docker Compose
Dockerfile
Crie o Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Instalar dependências do sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Copiar requirements e instalar dependências Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copiar código da aplicação
COPY app/ app/
COPY model/ model/
# Criar usuário não-root
RUN useradd --create-home appuser
USER appuser
# Expor porta
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Comando de inicialização
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
Docker Compose
Crie o docker-compose.yml:
version: "3.8"
services:
api:
build: .
ports:
- "8000:8000"
environment:
- LOG_LEVEL=info
- WORKERS=2
volumes:
- ./model/artifacts:/app/model/artifacts:ro
restart: unless-stopped
deploy:
resources:
limits:
memory: 1G
cpus: "1.0"
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
restart: unless-stopped
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
restart: unless-stopped
Requirements
Crie o requirements.txt:
fastapi==0.115.0
uvicorn[standard]==0.32.0
scikit-learn==1.5.2
pandas==2.2.3
numpy==2.1.3
joblib==1.4.2
pydantic==2.10.0
httpx==0.28.0
pytest==8.3.4
Parte 5: Deploy e Execução
Build e Execução Local
# Treinar o modelo primeiro
python model/train.py
# Executar localmente (desenvolvimento)
uvicorn app.main:app --reload --port 8000
# Ou com Docker
docker build -t ml-api .
docker run -p 8000:8000 ml-api
# Ou com Docker Compose (inclui monitoramento)
docker-compose up -d
Testando a API
# Health check
curl http://localhost:8000/health
# Previsão individual
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{
"MedInc": 8.3252,
"HouseAge": 41.0,
"AveRooms": 6.984,
"AveBedrms": 1.024,
"Population": 322.0,
"AveOccup": 2.556,
"Latitude": 37.88,
"Longitude": -122.23
}'
# Métricas
curl http://localhost:8000/metrics
# Documentação interativa
# Acesse http://localhost:8000/docs no navegador
Parte 6: Boas Práticas de MLOps
Versionamento de Modelos
Sempre versione seus modelos com metadados completos:
- Data de treinamento
- Métricas de avaliação
- Parâmetros de treinamento
- Versão das bibliotecas
- Hash do dataset utilizado
Monitoramento de Data Drift
Em produção, monitore se os dados de entrada estão mudando em relação aos dados de treinamento. Isso pode indicar que o modelo precisa ser retreinado:
def detect_drift(training_stats: dict, incoming_data: pd.DataFrame) -> dict:
"""Detecta drift entre dados de treino e dados em produção."""
drift_report = {}
for feature, stats in training_stats.items():
if feature in incoming_data.columns:
current_mean = incoming_data[feature].mean()
training_mean = stats["mean"]
training_std = stats["std"]
z_score = abs(current_mean - training_mean) / max(training_std, 1e-8)
drift_report[feature] = {
"z_score": round(z_score, 4),
"drifted": z_score > 3.0,
"training_mean": training_mean,
"current_mean": round(current_mean, 4),
}
return drift_report
Rollback de Modelos
Sempre mantenha a versão anterior do modelo disponível para rollback rápido em caso de problemas. O sistema de versionamento que implementamos facilita isso — basta alterar o symlink model_latest.joblib para apontar para a versão desejada.
Conclusão
Neste tutorial, construímos um sistema completo de deploy de ML que inclui:
- Treinamento estruturado com pipeline de preprocessamento integrado ao modelo.
- API robusta com FastAPI, validação Pydantic e documentação automática.
- Containerização com Docker e Docker Compose para deploy consistente.
- Testes automatizados para garantir qualidade antes do deploy.
- Monitoramento com métricas de latência, erros e estatísticas de previsão.
Este é um ponto de partida sólido que pode ser expandido com autenticação, CI/CD completo, A/B testing de modelos, e integração com plataformas de MLOps como MLflow ou Weights & Biases.
Próximos Passos
- Adicionar autenticação JWT à API.
- Implementar CI/CD com GitHub Actions.
- Configurar dashboards no Grafana para monitoramento visual.
- Explorar deploy em cloud (AWS ECS, Google Cloud Run, Azure Container Instances).
- Implementar A/B testing entre versões de modelos.
- Integrar com MLflow para rastreamento de experimentos.
Acompanhe o Inteligência em Código para mais tutoriais sobre MLOps e engenharia de machine learning.