Como construir um pipeline RAG completo do zero com LangChain

1. Fundamentos do RAG e Visão Geral do Pipeline

RAG (Retrieval-Augmented Generation) é uma arquitetura que combina recuperação de informações com geração de texto, permitindo que modelos de linguagem acessem conhecimento externo atualizado sem necessidade de fine-tuning. A abordagem resolve problemas críticos como alucinações, dados desatualizados e falta de fontes verificáveis.

A arquitetura típica de um pipeline RAG segue quatro etapas principais:

  1. Ingestão: carregamento e divisão de documentos brutos
  2. Indexação: geração de embeddings e armazenamento vetorial
  3. Recuperação: busca semântica no banco de vetores
  4. Geração: combinação do contexto recuperado com o LLM para produzir respostas

O LangChain oferece componentes modulares para cada etapa: Document Loaders, Text Splitters, Vector Stores, Retrievers e Chains.

2. Preparação do Ambiente e Configuração Inicial

Instale as dependências necessárias:

pip install langchain langchain-community langchain-chroma chromadb
pip install pypdf python-dotenv
pip install sentence-transformers

Para usar modelos locais com Ollama (recomendado para desenvolvimento sem custos):

pip install langchain-ollama

Configure as variáveis de ambiente em um arquivo .env:

# Para OpenAI (opcional)
OPENAI_API_KEY=sua_chave_aqui

# Para Hugging Face (opcional)
HUGGINGFACEHUB_API_TOKEN=seu_token_aqui

Crie o arquivo de configuração inicial:

import os
from dotenv import load_dotenv

load_dotenv()

# Configuração do modelo de embeddings
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# Ou para OpenAI: "text-embedding-ada-002"

# Configuração do LLM
LLM_MODEL = "llama3.2"  # ou "gpt-4o-mini", "mistral", etc.

3. Etapa de Ingestão: Carregamento e Divisão de Documentos

Carregue documentos de diferentes formatos:

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredMarkdownLoader,
    WebBaseLoader
)

# Carregar PDF
pdf_loader = PyPDFLoader("documento.pdf")
pdf_docs = pdf_loader.load()

# Carregar arquivo texto
txt_loader = TextLoader("artigo.txt", encoding="utf-8")
txt_docs = txt_loader.load()

# Carregar página web
web_loader = WebBaseLoader("https://exemplo.com/artigo")
web_docs = web_loader.load()

Divida os documentos em chunks otimizados:

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " ", ""],
    length_function=len
)

all_docs = pdf_docs + txt_docs + web_docs
chunks = text_splitter.split_documents(all_docs)

# Adicionar metadados personalizados
for i, chunk in enumerate(chunks):
    chunk.metadata["chunk_id"] = i
    chunk.metadata["source_type"] = "pdf" if "pdf" in chunk.metadata.get("source", "") else "text"

4. Etapa de Indexação: Criação do Vector Store

Gere embeddings e armazene no ChromaDB:

from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
# Para Ollama: from langchain_ollama import OllamaEmbeddings

# Inicializar modelo de embeddings local
embeddings = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL,
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True}
)

# Criar vector store persistente
vector_store = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# Para indexação incremental
def add_documents_to_store(new_docs):
    new_chunks = text_splitter.split_documents(new_docs)
    vector_store.add_documents(new_chunks)

# Indexação persistente
vector_store.persist()
print(f"Total de chunks indexados: {vector_store._collection.count()}")

5. Etapa de Recuperação: Busca Semântica e Filtragem

Configure diferentes estratégias de recuperação:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# Retriever básico por similaridade
retriever_similarity = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}
)

# Retriever com MMR (Maximum Marginal Relevance) para diversidade
retriever_mmr = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 5,
        "fetch_k": 20,
        "lambda_mult": 0.5
    }
)

# Retriever com filtro por metadados
retriever_filtered = vector_store.as_retriever(
    search_kwargs={
        "k": 3,
        "filter": {"source_type": "pdf"}
    }
)

# Retriever com compressão contextual (remove chunks irrelevantes)
llm = ChatOllama(model=LLM_MODEL)  # ou ChatOpenAI
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever_similarity
)

6. Etapa de Geração: Prompt Engineering e Chain Final

Construa o prompt RAG e a chain completa:

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama
# Para OpenAI: from langchain_openai import ChatOpenAI

# Template do prompt RAG
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """Você é um assistente especializado em responder perguntas com base no contexto fornecido.

Regras:
1. Use APENAS as informações do contexto para responder
2. Se o contexto não contiver a resposta, diga "Não encontrei essa informação no material disponível"
3. Cite as fontes quando possível
4. Seja conciso e direto

Contexto:
{context}"""),
    ("human", "{question}")
])

# Inicializar LLM
llm = ChatOllama(
    model=LLM_MODEL,
    temperature=0.3,
    num_predict=2048
)
# Para OpenAI: llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)

# Função para formatar documentos recuperados
def format_docs(docs):
    return "\n\n".join(f"Fonte: {doc.metadata.get('source', 'desconhecida')}\n{doc.page_content}" for doc in docs)

# Chain completa com LCEL
rag_chain = (
    {"context": retriever_similarity | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

7. Otimização e Boas Práticas em Produção

Implemente cache e monitoramento:

from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
import hashlib
import json

# Cache de embeddings
set_llm_cache(InMemoryCache())

# Cache de respostas para perguntas repetidas
response_cache = {}

def cached_rag_chain(question):
    question_hash = hashlib.md5(question.encode()).hexdigest()

    if question_hash in response_cache:
        return response_cache[question_hash]

    response = rag_chain.invoke(question)
    response_cache[question_hash] = response
    return response

# Tratamento de erros
def safe_rag_query(question):
    try:
        # Verificar se há documentos relevantes
        docs = retriever_similarity.get_relevant_documents(question)
        if not docs or all(doc.page_content.strip() == "" for doc in docs):
            return "Não encontrei documentos relevantes para responder sua pergunta."

        # Verificar tamanho do contexto
        context_size = sum(len(doc.page_content) for doc in docs)
        if context_size > 8000:  # Limite de tokens
            docs = docs[:3]  # Reduzir para evitar estouro de contexto

        return rag_chain.invoke(question)

    except Exception as e:
        return f"Erro ao processar a pergunta: {str(e)}"

8. Exemplo Completo: Pipeline RAG do Zero

Pipeline completo integrando todas as etapas:

import os
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()

# 1. Configuração
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL = "llama3.2"
DATA_DIR = "./data"
DB_DIR = "./chroma_db"

# 2. Ingestão
def load_and_split_documents(file_path):
    loader = TextLoader(file_path, encoding="utf-8")
    documents = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", ".", " ", ""]
    )
    return text_splitter.split_documents(documents)

# 3. Indexação
def create_vector_store(chunks):
    embeddings = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL,
        model_kwargs={"device": "cpu"}
    )

    vector_store = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=DB_DIR
    )
    vector_store.persist()
    return vector_store

# 4. Recuperação
def setup_retriever(vector_store):
    return vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 4}
    )

# 5. Geração
def setup_rag_chain(retriever):
    llm = ChatOllama(model=LLM_MODEL, temperature=0.3)

    prompt = ChatPromptTemplate.from_messages([
        ("system", "Use o contexto abaixo para responder à pergunta. Se não souber, diga que não sabe.\nContexto: {context}"),
        ("human", "{question}")
    ])

    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    return (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

# 6. Execução
if __name__ == "__main__":
    # Carregar documentos
    print("Carregando documentos...")
    chunks = load_and_split_documents(os.path.join(DATA_DIR, "artigo.txt"))
    print(f"{len(chunks)} chunks criados")

    # Indexar
    print("Indexando vetores...")
    vector_store = create_vector_store(chunks)

    # Configurar retriever e chain
    retriever = setup_retriever(vector_store)
    rag_chain = setup_rag_chain(retriever)

    # Testar
    perguntas = [
        "Qual é o conceito principal do artigo?",
        "Quais são as principais contribuições?",
        "Como esse conhecimento pode ser aplicado?"
    ]

    for pergunta in perguntas:
        print(f"\nPergunta: {pergunta}")
        resposta = rag_chain.invoke(pergunta)
        print(f"Resposta: {resposta}")

Para produção, considere:

  • Monitoramento: implemente logging com LangSmith ou ferramentas similares
  • Escalabilidade: use bancos vetoriais como Pinecone, Weaviate ou Qdrant
  • Qualidade: avalie métricas como faithfulness, answer relevancy e context precision
  • Memória: adicione histórico de conversas com ConversationBufferMemory
  • Agentes: combine RAG com ferramentas para busca na web, APIs e bancos de dados

Referências

  • LangChain Documentation: RAG — Guia oficial da LangChain para construção de sistemas RAG, incluindo exemplos práticos com diferentes componentes
  • ChromaDB Documentation — Documentação completa do banco vetorial ChromaDB, com exemplos de persistência e consulta
  • Ollama Documentation — Repositório oficial do Ollama para execução local de modelos LLM, incluindo instruções de instalação e uso
  • HuggingFace Embeddings Guide — Guia da Hugging Face sobre modelos de embeddings, com lista de modelos recomendados para RAG
  • LangChain Expression Language (LCEL) — Documentação oficial sobre LCEL, a linguagem de expressão para construir chains modulares e reutilizáveis
  • RAG from Scratch (LangChain Blog) — Série de artigos técnicos do time LangChain explicando cada componente do RAG em profundidade
  • Pinecone RAG Guide — Guia completo da Pinecone sobre arquiteturas RAG, incluindo melhores práticas para indexação e recuperação