Pular para o conteúdo
🧠Inteligência em Código
📚 Tutoriais14 min de leitura

Deploy de Modelos de Machine Learning em Produção

Guia completo para fazer deploy de modelos de ML usando FastAPI, Docker e monitoramento — do treinamento à produção com boas práticas de MLOps

#deploy#ml#docker#fastapi#producao#mlops

Deploy de Modelos de Machine Learning em Produção

Treinar um modelo de machine learning é apenas metade do trabalho. A outra metade — frequentemente mais desafiadora — é colocá-lo em produção de forma confiável, escalável e monitorada. Neste tutorial, vamos percorrer todo o caminho desde o treinamento de um modelo até o deploy em produção usando FastAPI, Docker e ferramentas de monitoramento.

Ao final deste guia, você terá um sistema completo e pronto para produção que inclui:

  • Um modelo treinado e serializado com joblib.
  • Uma API REST construída com FastAPI.
  • Containerização com Docker e Docker Compose.
  • Testes automatizados.
  • Monitoramento e logging.
  • Pipeline de CI/CD básico.

Pré-requisitos

Antes de começar, certifique-se de ter instalado:

  • Python 3.11 ou superior
  • Docker e Docker Compose
  • pip ou uv (gerenciador de pacotes)
  • Git

Conhecimento básico de Python, machine learning e APIs REST é recomendado.


Estrutura do Projeto

Vamos organizar nosso projeto seguindo boas práticas de MLOps:

ml-deploy/
├── app/
│   ├── __init__.py
│   ├── main.py              # Aplicação FastAPI
│   ├── model.py             # Carregamento e inferência do modelo
│   ├── schemas.py           # Schemas Pydantic
│   ├── preprocessing.py     # Pipeline de preprocessamento
│   └── monitoring.py        # Monitoramento e métricas
├── model/
│   ├── train.py             # Script de treinamento
│   ├── evaluate.py          # Avaliação do modelo
│   └── artifacts/           # Modelos salvos
├── tests/
│   ├── __init__.py
│   ├── test_api.py          # Testes da API
│   ├── test_model.py        # Testes do modelo
│   └── test_preprocessing.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── pyproject.toml
└── README.md

Parte 1: Treinamento do Modelo

Definindo o Problema

Vamos treinar um modelo para prever o preço de imóveis baseado em características como área, número de quartos, localização, etc. Usaremos o dataset California Housing do scikit-learn como base.

Script de Treinamento

Crie o arquivo model/train.py:

"""
Script de treinamento do modelo de previsão de preços de imóveis.
Treina, avalia e salva o modelo com pipeline completo de preprocessamento.
"""

import json
import logging
from datetime import datetime
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

ARTIFACTS_DIR = Path(__file__).parent / "artifacts"
ARTIFACTS_DIR.mkdir(exist_ok=True)


def load_data() -> tuple[pd.DataFrame, pd.Series]:
    """Carrega e prepara os dados para treinamento."""
    logger.info("Carregando dataset California Housing...")
    housing = fetch_california_housing(as_frame=True)
    X = housing.data
    y = housing.target

    # Adicionar features categóricas derivadas para demonstrar preprocessamento
    X = X.copy()
    X["ocean_proximity"] = pd.cut(
        X["Longitude"],
        bins=[-125, -122, -119, -114],
        labels=["coastal", "inland", "desert"],
    )
    X["house_age_category"] = pd.cut(
        X["HouseAge"],
        bins=[0, 15, 30, 52],
        labels=["new", "medium", "old"],
    )

    logger.info(f"Dataset carregado: {X.shape[0]} amostras, {X.shape[1]} features")
    return X, y


def create_preprocessing_pipeline(
    numeric_features: list[str], categorical_features: list[str]
) -> ColumnTransformer:
    """Cria pipeline de preprocessamento com transformações numéricas e categóricas."""
    numeric_transformer = StandardScaler()
    categorical_transformer = OneHotEncoder(drop="first", sparse_output=False, handle_unknown="ignore")

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )
    return preprocessor


def create_model_pipeline(
    numeric_features: list[str], categorical_features: list[str]
) -> Pipeline:
    """Cria pipeline completo: preprocessamento + modelo."""
    preprocessor = create_preprocessing_pipeline(numeric_features, categorical_features)

    pipeline = Pipeline(
        steps=[
            ("preprocessor", preprocessor),
            (
                "regressor",
                GradientBoostingRegressor(
                    n_estimators=300,
                    max_depth=5,
                    learning_rate=0.1,
                    min_samples_split=10,
                    min_samples_leaf=5,
                    subsample=0.8,
                    random_state=42,
                ),
            ),
        ]
    )
    return pipeline


def evaluate_model(
    pipeline: Pipeline, X_test: pd.DataFrame, y_test: pd.Series
) -> dict:
    """Avalia o modelo e retorna métricas."""
    y_pred = pipeline.predict(X_test)

    metrics = {
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "rmse": float(np.sqrt(mean_squared_error(y_test, y_pred))),
        "r2": float(r2_score(y_test, y_pred)),
        "mape": float(np.mean(np.abs((y_test - y_pred) / y_test)) * 100),
    }

    logger.info("Métricas de avaliação:")
    for name, value in metrics.items():
        logger.info(f"  {name.upper()}: {value:.4f}")

    return metrics


def save_artifacts(
    pipeline: Pipeline,
    metrics: dict,
    feature_names: list[str],
    version: str,
) -> Path:
    """Salva o modelo treinado e metadados."""
    model_path = ARTIFACTS_DIR / f"model_v{version}.joblib"
    metadata_path = ARTIFACTS_DIR / f"metadata_v{version}.json"

    # Salvar modelo
    joblib.dump(pipeline, model_path)
    logger.info(f"Modelo salvo em: {model_path}")

    # Salvar metadados
    metadata = {
        "version": version,
        "trained_at": datetime.now().isoformat(),
        "metrics": metrics,
        "feature_names": feature_names,
        "model_type": "GradientBoostingRegressor",
        "sklearn_version": __import__("sklearn").__version__,
    }

    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)
    logger.info(f"Metadados salvos em: {metadata_path}")

    # Criar link simbólico para versão latest
    latest_model = ARTIFACTS_DIR / "model_latest.joblib"
    latest_metadata = ARTIFACTS_DIR / "metadata_latest.json"

    if latest_model.exists():
        latest_model.unlink()
    if latest_metadata.exists():
        latest_metadata.unlink()

    latest_model.symlink_to(model_path.name)
    latest_metadata.symlink_to(metadata_path.name)

    return model_path


def train():
    """Pipeline principal de treinamento."""
    logger.info("=" * 60)
    logger.info("Iniciando pipeline de treinamento")
    logger.info("=" * 60)

    # Carregar dados
    X, y = load_data()

    # Definir features
    numeric_features = [
        "MedInc", "HouseAge", "AveRooms", "AveBedrms",
        "Population", "AveOccup", "Latitude", "Longitude",
    ]
    categorical_features = ["ocean_proximity", "house_age_category"]

    # Split treino/teste
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    logger.info(f"Treino: {len(X_train)} amostras | Teste: {len(X_test)} amostras")

    # Criar e treinar pipeline
    pipeline = create_model_pipeline(numeric_features, categorical_features)

    logger.info("Treinando modelo...")
    pipeline.fit(X_train, y_train)

    # Validação cruzada
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring="r2")
    logger.info(f"CV R² scores: {cv_scores}")
    logger.info(f"CV R² médio: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

    # Avaliar no conjunto de teste
    metrics = evaluate_model(pipeline, X_test, y_test)

    # Salvar artefatos
    version = datetime.now().strftime("%Y%m%d_%H%M%S")
    all_features = numeric_features + categorical_features
    save_artifacts(pipeline, metrics, all_features, version)

    logger.info("=" * 60)
    logger.info("Treinamento concluído com sucesso!")
    logger.info("=" * 60)

    return pipeline, metrics


if __name__ == "__main__":
    train()

Execute o treinamento:

cd ml-deploy
python model/train.py

Parte 2: Criando a API com FastAPI

Schemas de Entrada e Saída

Crie o arquivo app/schemas.py:

"""Schemas Pydantic para validação de entrada e saída da API."""

from pydantic import BaseModel, Field


class HousingFeatures(BaseModel):
    """Features de entrada para previsão de preço."""

    MedInc: float = Field(..., ge=0, description="Renda mediana do bloco (em dezenas de milhar)")
    HouseAge: float = Field(..., ge=0, le=52, description="Idade mediana das casas no bloco")
    AveRooms: float = Field(..., ge=0, description="Média de cômodos por domicílio")
    AveBedrms: float = Field(..., ge=0, description="Média de quartos por domicílio")
    Population: float = Field(..., ge=0, description="População do bloco")
    AveOccup: float = Field(..., ge=0, description="Média de ocupantes por domicílio")
    Latitude: float = Field(..., ge=32, le=42, description="Latitude")
    Longitude: float = Field(..., ge=-125, le=-114, description="Longitude")

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "MedInc": 8.3252,
                    "HouseAge": 41.0,
                    "AveRooms": 6.984,
                    "AveBedrms": 1.024,
                    "Population": 322.0,
                    "AveOccup": 2.556,
                    "Latitude": 37.88,
                    "Longitude": -122.23,
                }
            ]
        }
    }


class PredictionResponse(BaseModel):
    """Resposta da previsão."""

    predicted_price: float = Field(..., description="Preço previsto (em centenas de milhar)")
    model_version: str = Field(..., description="Versão do modelo utilizado")
    confidence_interval: dict = Field(
        ..., description="Intervalo de confiança da previsão"
    )


class BatchPredictionRequest(BaseModel):
    """Requisição para previsão em lote."""

    instances: list[HousingFeatures] = Field(
        ..., min_length=1, max_length=1000, description="Lista de instâncias"
    )


class BatchPredictionResponse(BaseModel):
    """Resposta de previsão em lote."""

    predictions: list[PredictionResponse]
    total_instances: int
    model_version: str


class HealthResponse(BaseModel):
    """Resposta do endpoint de health check."""

    status: str
    model_loaded: bool
    model_version: str | None
    uptime_seconds: float

Carregamento do Modelo

Crie o arquivo app/model.py:

"""Gerenciamento do modelo: carregamento, inferência e versionamento."""

import json
import logging
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

MODEL_DIR = Path(__file__).parent.parent / "model" / "artifacts"


class ModelManager:
    """Gerencia o ciclo de vida do modelo em produção."""

    def __init__(self):
        self.pipeline = None
        self.metadata = None
        self.is_loaded = False

    def load(self, version: str = "latest") -> None:
        """Carrega o modelo e metadados."""
        model_path = MODEL_DIR / f"model_{version}.joblib"
        metadata_path = MODEL_DIR / f"metadata_{version}.json"

        if not model_path.exists():
            raise FileNotFoundError(f"Modelo não encontrado: {model_path}")

        self.pipeline = joblib.load(model_path)

        if metadata_path.exists():
            with open(metadata_path) as f:
                self.metadata = json.load(f)

        self.is_loaded = True
        logger.info(f"Modelo carregado: versão {self.version}")

    @property
    def version(self) -> str:
        if self.metadata:
            return self.metadata.get("version", "unknown")
        return "unknown"

    def predict(self, features: dict) -> dict:
        """Realiza previsão para uma única instância."""
        if not self.is_loaded:
            raise RuntimeError("Modelo não carregado")

        df = self._prepare_dataframe(features)
        prediction = float(self.pipeline.predict(df)[0])

        # Calcular intervalo de confiança aproximado baseado no RMSE do treino
        rmse = self.metadata.get("metrics", {}).get("rmse", 0.5) if self.metadata else 0.5
        ci_lower = prediction - 1.96 * rmse
        ci_upper = prediction + 1.96 * rmse

        return {
            "predicted_price": round(prediction, 4),
            "model_version": self.version,
            "confidence_interval": {
                "lower": round(max(0, ci_lower), 4),
                "upper": round(ci_upper, 4),
                "confidence_level": 0.95,
            },
        }

    def predict_batch(self, instances: list[dict]) -> list[dict]:
        """Realiza previsão para múltiplas instâncias."""
        return [self.predict(instance) for instance in instances]

    def _prepare_dataframe(self, features: dict) -> pd.DataFrame:
        """Prepara DataFrame com features derivadas."""
        df = pd.DataFrame([features])

        df["ocean_proximity"] = pd.cut(
            df["Longitude"],
            bins=[-125, -122, -119, -114],
            labels=["coastal", "inland", "desert"],
        )
        df["house_age_category"] = pd.cut(
            df["HouseAge"],
            bins=[0, 15, 30, 52],
            labels=["new", "medium", "old"],
        )
        return df


# Singleton
model_manager = ModelManager()

Monitoramento

Crie o arquivo app/monitoring.py:

"""Monitoramento de performance e métricas da API."""

import logging
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime

logger = logging.getLogger(__name__)


@dataclass
class PredictionLog:
    """Log de uma previsão individual."""

    timestamp: str
    input_features: dict
    prediction: float
    latency_ms: float


class MonitoringService:
    """Serviço de monitoramento para métricas da API e do modelo."""

    def __init__(self, max_history: int = 10000):
        self.start_time = time.time()
        self.prediction_count = 0
        self.error_count = 0
        self.total_latency = 0.0
        self.prediction_history: deque[PredictionLog] = deque(maxlen=max_history)
        self.prediction_values: deque[float] = deque(maxlen=max_history)

    @property
    def uptime_seconds(self) -> float:
        return time.time() - self.start_time

    @property
    def avg_latency_ms(self) -> float:
        if self.prediction_count == 0:
            return 0.0
        return self.total_latency / self.prediction_count

    def log_prediction(
        self, features: dict, prediction: float, latency_ms: float
    ) -> None:
        """Registra uma previsão para monitoramento."""
        self.prediction_count += 1
        self.total_latency += latency_ms
        self.prediction_values.append(prediction)

        log_entry = PredictionLog(
            timestamp=datetime.now().isoformat(),
            input_features=features,
            prediction=prediction,
            latency_ms=latency_ms,
        )
        self.prediction_history.append(log_entry)

    def log_error(self) -> None:
        """Registra um erro."""
        self.error_count += 1

    def get_stats(self) -> dict:
        """Retorna estatísticas atuais do serviço."""
        recent_predictions = list(self.prediction_values)

        stats = {
            "uptime_seconds": round(self.uptime_seconds, 2),
            "total_predictions": self.prediction_count,
            "total_errors": self.error_count,
            "error_rate": (
                round(self.error_count / max(1, self.prediction_count + self.error_count), 4)
            ),
            "avg_latency_ms": round(self.avg_latency_ms, 2),
        }

        if recent_predictions:
            import numpy as np

            stats["prediction_stats"] = {
                "mean": round(float(np.mean(recent_predictions)), 4),
                "std": round(float(np.std(recent_predictions)), 4),
                "min": round(float(np.min(recent_predictions)), 4),
                "max": round(float(np.max(recent_predictions)), 4),
            }

        return stats


# Singleton
monitoring = MonitoringService()

Aplicação FastAPI Principal

Crie o arquivo app/main.py:

"""
API de Machine Learning para previsão de preços de imóveis.
Deploy-ready com FastAPI, monitoramento e validação robusta.
"""

import logging
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from app.model import model_manager
from app.monitoring import monitoring
from app.schemas import (
    BatchPredictionRequest,
    BatchPredictionResponse,
    HealthResponse,
    HousingFeatures,
    PredictionResponse,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Gerencia o ciclo de vida da aplicação."""
    logger.info("Iniciando aplicação...")
    try:
        model_manager.load("latest")
        logger.info("Modelo carregado com sucesso!")
    except FileNotFoundError:
        logger.error("Modelo não encontrado! Treine o modelo antes de iniciar a API.")
        raise
    yield
    logger.info("Encerrando aplicação...")


app = FastAPI(
    title="ML Housing Price Prediction API",
    description="API para previsão de preços de imóveis usando Machine Learning",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Verifica a saúde da API e do modelo."""
    return HealthResponse(
        status="healthy",
        model_loaded=model_manager.is_loaded,
        model_version=model_manager.version,
        uptime_seconds=monitoring.uptime_seconds,
    )


@app.post("/predict", response_model=PredictionResponse)
async def predict(features: HousingFeatures):
    """Realiza previsão para uma única instância."""
    start_time = time.time()
    try:
        result = model_manager.predict(features.model_dump())
        latency_ms = (time.time() - start_time) * 1000

        monitoring.log_prediction(
            features=features.model_dump(),
            prediction=result["predicted_price"],
            latency_ms=latency_ms,
        )

        return PredictionResponse(**result)
    except Exception as e:
        monitoring.log_error()
        logger.error(f"Erro na previsão: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
    """Realiza previsão para múltiplas instâncias."""
    start_time = time.time()
    try:
        instances = [inst.model_dump() for inst in request.instances]
        results = model_manager.predict_batch(instances)
        latency_ms = (time.time() - start_time) * 1000

        predictions = [PredictionResponse(**r) for r in results]

        for inst, result in zip(instances, results):
            monitoring.log_prediction(
                features=inst,
                prediction=result["predicted_price"],
                latency_ms=latency_ms / len(instances),
            )

        return BatchPredictionResponse(
            predictions=predictions,
            total_instances=len(predictions),
            model_version=model_manager.version,
        )
    except Exception as e:
        monitoring.log_error()
        logger.error(f"Erro na previsão batch: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/metrics")
async def get_metrics():
    """Retorna métricas de monitoramento do serviço."""
    return monitoring.get_stats()


@app.get("/model/info")
async def model_info():
    """Retorna informações sobre o modelo carregado."""
    if not model_manager.is_loaded:
        raise HTTPException(status_code=503, detail="Modelo não carregado")
    return model_manager.metadata

Parte 3: Testes

Crie o arquivo tests/test_api.py:

"""Testes para a API de previsão."""

import pytest
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)

VALID_INPUT = {
    "MedInc": 8.3252,
    "HouseAge": 41.0,
    "AveRooms": 6.984,
    "AveBedrms": 1.024,
    "Population": 322.0,
    "AveOccup": 2.556,
    "Latitude": 37.88,
    "Longitude": -122.23,
}


def test_health_check():
    """Testa o endpoint de health check."""
    response = client.get("/health")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "healthy"
    assert data["model_loaded"] is True


def test_predict_single():
    """Testa previsão de uma única instância."""
    response = client.post("/predict", json=VALID_INPUT)
    assert response.status_code == 200
    data = response.json()
    assert "predicted_price" in data
    assert "model_version" in data
    assert "confidence_interval" in data
    assert data["predicted_price"] > 0


def test_predict_batch():
    """Testa previsão em lote."""
    request = {"instances": [VALID_INPUT, VALID_INPUT]}
    response = client.post("/predict/batch", json=request)
    assert response.status_code == 200
    data = response.json()
    assert data["total_instances"] == 2
    assert len(data["predictions"]) == 2


def test_predict_invalid_input():
    """Testa validação de entrada inválida."""
    invalid = {**VALID_INPUT, "MedInc": -1}
    response = client.post("/predict", json=invalid)
    assert response.status_code == 422  # Validation error


def test_metrics():
    """Testa o endpoint de métricas."""
    # Fazer uma previsão primeiro
    client.post("/predict", json=VALID_INPUT)
    response = client.get("/metrics")
    assert response.status_code == 200
    data = response.json()
    assert "total_predictions" in data
    assert data["total_predictions"] >= 1


def test_model_info():
    """Testa o endpoint de informações do modelo."""
    response = client.get("/model/info")
    assert response.status_code == 200
    data = response.json()
    assert "version" in data
    assert "metrics" in data

Execute os testes:

pytest tests/ -v

Parte 4: Docker e Docker Compose

Dockerfile

Crie o Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Instalar dependências do sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copiar requirements e instalar dependências Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copiar código da aplicação
COPY app/ app/
COPY model/ model/

# Criar usuário não-root
RUN useradd --create-home appuser
USER appuser

# Expor porta
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

# Comando de inicialização
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

Docker Compose

Crie o docker-compose.yml:

version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=2
    volumes:
      - ./model/artifacts:/app/model/artifacts:ro
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 1G
          cpus: "1.0"

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    restart: unless-stopped

Requirements

Crie o requirements.txt:

fastapi==0.115.0
uvicorn[standard]==0.32.0
scikit-learn==1.5.2
pandas==2.2.3
numpy==2.1.3
joblib==1.4.2
pydantic==2.10.0
httpx==0.28.0
pytest==8.3.4

Parte 5: Deploy e Execução

Build e Execução Local

# Treinar o modelo primeiro
python model/train.py

# Executar localmente (desenvolvimento)
uvicorn app.main:app --reload --port 8000

# Ou com Docker
docker build -t ml-api .
docker run -p 8000:8000 ml-api

# Ou com Docker Compose (inclui monitoramento)
docker-compose up -d

Testando a API

# Health check
curl http://localhost:8000/health

# Previsão individual
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "MedInc": 8.3252,
    "HouseAge": 41.0,
    "AveRooms": 6.984,
    "AveBedrms": 1.024,
    "Population": 322.0,
    "AveOccup": 2.556,
    "Latitude": 37.88,
    "Longitude": -122.23
  }'

# Métricas
curl http://localhost:8000/metrics

# Documentação interativa
# Acesse http://localhost:8000/docs no navegador

Parte 6: Boas Práticas de MLOps

Versionamento de Modelos

Sempre versione seus modelos com metadados completos:

  • Data de treinamento
  • Métricas de avaliação
  • Parâmetros de treinamento
  • Versão das bibliotecas
  • Hash do dataset utilizado

Monitoramento de Data Drift

Em produção, monitore se os dados de entrada estão mudando em relação aos dados de treinamento. Isso pode indicar que o modelo precisa ser retreinado:

def detect_drift(training_stats: dict, incoming_data: pd.DataFrame) -> dict:
    """Detecta drift entre dados de treino e dados em produção."""
    drift_report = {}
    for feature, stats in training_stats.items():
        if feature in incoming_data.columns:
            current_mean = incoming_data[feature].mean()
            training_mean = stats["mean"]
            training_std = stats["std"]
            z_score = abs(current_mean - training_mean) / max(training_std, 1e-8)
            drift_report[feature] = {
                "z_score": round(z_score, 4),
                "drifted": z_score > 3.0,
                "training_mean": training_mean,
                "current_mean": round(current_mean, 4),
            }
    return drift_report

Rollback de Modelos

Sempre mantenha a versão anterior do modelo disponível para rollback rápido em caso de problemas. O sistema de versionamento que implementamos facilita isso — basta alterar o symlink model_latest.joblib para apontar para a versão desejada.


Conclusão

Neste tutorial, construímos um sistema completo de deploy de ML que inclui:

  1. Treinamento estruturado com pipeline de preprocessamento integrado ao modelo.
  2. API robusta com FastAPI, validação Pydantic e documentação automática.
  3. Containerização com Docker e Docker Compose para deploy consistente.
  4. Testes automatizados para garantir qualidade antes do deploy.
  5. Monitoramento com métricas de latência, erros e estatísticas de previsão.

Este é um ponto de partida sólido que pode ser expandido com autenticação, CI/CD completo, A/B testing de modelos, e integração com plataformas de MLOps como MLflow ou Weights & Biases.


Próximos Passos

  • Adicionar autenticação JWT à API.
  • Implementar CI/CD com GitHub Actions.
  • Configurar dashboards no Grafana para monitoramento visual.
  • Explorar deploy em cloud (AWS ECS, Google Cloud Run, Azure Container Instances).
  • Implementar A/B testing entre versões de modelos.
  • Integrar com MLflow para rastreamento de experimentos.

Acompanhe o Inteligência em Código para mais tutoriais sobre MLOps e engenharia de machine learning.