Como usar o Anthropic Claude API em aplicações de produção

1. Introdução à Anthropic Claude API e Casos de Uso em Produção

A Anthropic Claude API oferece acesso aos modelos Claude 3 — Opus, Sonnet e Haiku — cada um otimizado para diferentes equilíbrios entre capacidade, velocidade e custo. Em produção, a diferença entre um protótipo e um sistema robusto é substancial: requisitos de escalabilidade, latência previsível e controle de custos tornam-se críticos.

Casos de uso típicos incluem chatbots de atendimento ao cliente, sumarização automática de documentos jurídicos, análise de contratos, geração de código assistida e moderação de conteúdo. Cada cenário exige configurações específicas de parâmetros e estratégias de tratamento de erros.

2. Autenticação, Configuração e Gerenciamento de Chaves

A chave de API deve ser armazenada em variáveis de ambiente ou em um secrets manager como AWS Secrets Manager ou HashiCorp Vault. Nunca hardcode chaves no código-fonte.

import os
from anthropic import Anthropic

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

Para rate limiting, implemente retry com backoff exponencial usando a biblioteca tenacity:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def call_claude_safe(messages):
    return client.messages.create(
        model="claude-3-haiku-20240307",
        max_tokens=1024,
        messages=messages
    )

3. Construindo Requisições Robusto e Eficientes

A estrutura de mensagens inclui system prompt, mensagens de usuário e assistente. O system prompt define o comportamento geral do modelo.

system_prompt = "Você é um assistente especializado em documentação técnica. Responda de forma concisa e precisa."

messages = [
    {"role": "user", "content": "Explique o padrão Circuit Breaker em microsserviços."}
]

response = client.messages.create(
    model="claude-3-sonnet-20240229",
    system=system_prompt,
    max_tokens=500,
    temperature=0.3,
    messages=messages
)

Para streaming, reduza a latência percebida:

stream = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=1024,
    messages=messages,
    stream=True
)

for chunk in stream:
    if chunk.type == "content_block_delta":
        print(chunk.delta.text, end="", flush=True)

4. Tratamento de Erros, Timeouts e Resiliência

Erros comuns incluem AuthenticationError (401), RateLimitError (429) e APIStatusError (5xx). Implemente um handler centralizado:

from anthropic import (
    Anthropic,
    APIError,
    APITimeoutError,
    RateLimitError,
    AuthenticationError
)

def call_claude_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model="claude-3-haiku-20240307",
                max_tokens=1024,
                messages=messages,
                timeout=30.0
            )
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            import time
            time.sleep(2 ** attempt + 0.5)  # backoff com jitter
        except APITimeoutError:
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
        except AuthenticationError:
            raise  # não tente novamente
        except APIError as e:
            if e.status_code >= 500:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
            else:
                raise

5. Otimização de Custo e Performance

Use cache semântico para respostas repetitivas. Para cache exato, um dicionário simples funciona; para cache semântico, use embeddings e similaridade de cosseno.

import hashlib
import json

cache = {}

def get_cached_or_call(messages):
    key = hashlib.md5(json.dumps(messages, sort_keys=True).encode()).hexdigest()
    if key in cache:
        return cache[key]
    response = client.messages.create(
        model="claude-3-haiku-20240307",
        max_tokens=1024,
        messages=messages
    )
    cache[key] = response
    return response

Para controle de tokens, trunque o histórico de conversa mantendo apenas as últimas N mensagens:

def truncate_history(messages, max_messages=10):
    if len(messages) > max_messages:
        return messages[-max_messages:]
    return messages

6. Segurança, Moderação e Compliance

Antes de enviar dados para a API, filtre informações sensíveis (PII) usando regex ou bibliotecas especializadas:

import re

def sanitize_input(text):
    # Remove números de cartão de crédito
    text = re.sub(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '[REDACTED]', text)
    # Remove e-mails
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[REDACTED]', text)
    return text

Para auditoria, registre todas as interações em um banco de dados:

import logging
import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("claude_audit")

def log_interaction(user_id, prompt, response, tokens_used):
    logger.info({
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "user_id": user_id,
        "prompt_length": len(prompt),
        "response_length": len(response),
        "tokens_used": tokens_used
    })

7. Monitoramento e Observabilidade em Produção

Métricas essenciais incluem latência por requisição, taxa de erro, tokens consumidos por usuário e custo estimado. Integre com Prometheus:

from prometheus_client import Histogram, Counter, Gauge

request_latency = Histogram(
    'claude_request_latency_seconds',
    'Latência das requisições para Claude API',
    ['model', 'endpoint']
)

error_counter = Counter(
    'claude_error_total',
    'Total de erros na API Claude',
    ['error_type']
)

tokens_gauge = Gauge(
    'claude_tokens_consumed',
    'Tokens consumidos na última hora'
)

8. Estratégias de Deployment e Escalabilidade

Uma arquitetura típica de produção usa API Gateway + fila de mensagens (RabbitMQ, SQS) + workers assíncronos. Para gerenciar concorrência, use connection pooling:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

async def process_batch(prompts):
    tasks = []
    for prompt in prompts:
        task = asyncio.create_task(
            async_client.messages.create(
                model="claude-3-haiku-20240307",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
        )
        tasks.append(task)
    return await asyncio.gather(*tasks, return_exceptions=True)

Para testes de carga, simule requisições concorrentes com locust ou k6 antes de promover uma nova configuração de parâmetros para produção. Realize canary releases expondo primeiro 10% do tráfego para o novo modelo ou parâmetros.

Referências