Streaming bidirecional com gRPC

1. Introdução ao Streaming Bidirecional no gRPC

O gRPC oferece quatro tipos de comunicação entre cliente e servidor: unário (requisição-resposta simples), server-side streaming (cliente envia uma requisição e recebe múltiplas respostas), client-side streaming (cliente envia múltiplas requisições e recebe uma resposta) e streaming bidirecional.

No streaming bidirecional, ambas as partes podem enviar e receber mensagens de forma independente e simultânea. Diferente dos outros tipos, não há sincronização obrigatória entre as mensagens — o servidor pode processar e responder enquanto o cliente continua enviando novos dados. Isso é possível porque cada stream utiliza duas goroutines independentes (uma para leitura, outra para escrita), operando sobre o mesmo canal de comunicação.

Casos de uso típicos incluem:
- Chat em tempo real: múltiplos usuários trocam mensagens simultaneamente
- Jogos multiplayer: atualizações contínuas de posição e ações dos jogadores
- Monitoramento contínuo: servidor envia alertas enquanto cliente envia comandos de configuração
- Processamento de dados em pipeline: cliente envia lotes de dados e servidor retorna resultados parciais

2. Definindo o Serviço com Protocol Buffers

Criamos um arquivo chat.proto que define um serviço de chat com streaming bidirecional:

syntax = "proto3";

package chat;

option go_package = "github.com/exemplo/chatpb";

message ChatMessage {
  string user = 1;
  string text = 2;
  int64 timestamp = 3;
}

message JoinRequest {
  string user = 1;
}

service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

A assinatura rpc Chat(stream ChatMessage) returns (stream ChatMessage) indica que tanto a requisição quanto a resposta são streams contínuos.

Para gerar o código Golang, execute:

protoc --go_out=. --go-grpc_out=. chat.proto

Isso gera os arquivos chat.pb.go (estruturas das mensagens) e chat_grpc.pb.go (interfaces do serviço e stubs do cliente).

3. Implementação do Servidor em Golang

O servidor deve gerenciar múltiplos clientes conectados simultaneamente. Cada conexão de stream bidirecional recebe sua própria goroutine.

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"
    pb "github.com/exemplo/chatpb"
)

type chatServer struct {
    pb.UnimplementedChatServiceServer
    mu      sync.RWMutex
    streams map[string]pb.ChatService_ChatServer
}

func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
    // Identifica o usuário pela primeira mensagem
    msg, err := stream.Recv()
    if err != nil {
        return err
    }
    user := msg.User

    s.mu.Lock()
    s.streams[user] = stream
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        delete(s.streams, user)
        s.mu.Unlock()
    }()

    // Goroutine para broadcast de mensagens recebidas
    go func() {
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Printf("Erro ao receber de %s: %v", user, err)
                return
            }

            // Broadcast para todos os clientes
            s.mu.RLock()
            for _, clientStream := range s.streams {
                if err := clientStream.Send(in); err != nil {
                    log.Printf("Erro ao enviar para cliente: %v", err)
                }
            }
            s.mu.RUnlock()
        }
    }()

    // Mantém o stream ativo
    <-stream.Context().Done()
    return stream.Context().Err()
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Falha ao iniciar listener: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterChatServiceServer(s, &chatServer{
        streams: make(map[string]pb.ChatService_ChatServer),
    })

    log.Println("Servidor gRPC iniciado na porta :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Falha ao servir: %v", err)
    }
}

4. Implementação do Cliente em Golang

O cliente conecta-se ao servidor e gerencia dois fluxos simultâneos: envio de mensagens do usuário e recebimento de mensagens de outros clientes.

package main

import (
    "bufio"
    "context"
    "log"
    "os"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "github.com/exemplo/chatpb"
)

func main() {
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Falha ao conectar: %v", err)
    }
    defer conn.Close()

    client := pb.NewChatServiceClient(conn)
    stream, err := client.Chat(context.Background())
    if err != nil {
        log.Fatalf("Falha ao criar stream: %v", err)
    }

    var wg sync.WaitGroup
    wg.Add(2)

    // Goroutine para envio de mensagens
    go func() {
        defer wg.Done()
        scanner := bufio.NewScanner(os.Stdin)
        log.Print("Digite seu nome: ")
        scanner.Scan()
        user := scanner.Text()

        // Envia primeira mensagem com o nome do usuário
        stream.Send(&pb.ChatMessage{
            User:      user,
            Text:      "entrou no chat",
            Timestamp: time.Now().Unix(),
        })

        for scanner.Scan() {
            text := scanner.Text()
            if text == "/sair" {
                stream.Send(&pb.ChatMessage{
                    User:      user,
                    Text:      "saiu do chat",
                    Timestamp: time.Now().Unix(),
                })
                stream.CloseSend()
                return
            }

            msg := &pb.ChatMessage{
                User:      user,
                Text:      text,
                Timestamp: time.Now().Unix(),
            }

            if err := stream.Send(msg); err != nil {
                log.Printf("Erro ao enviar: %v", err)
                return
            }
        }
    }()

    // Goroutine para recebimento de mensagens
    go func() {
        defer wg.Done()
        for {
            msg, err := stream.Recv()
            if err != nil {
                log.Printf("Erro ao receber: %v", err)
                return
            }
            log.Printf("[%s] %s: %s",
                time.Unix(msg.Timestamp, 0).Format("15:04:05"),
                msg.User,
                msg.Text)
        }
    }()

    wg.Wait()
}

5. Tratamento de Erros e Cancelamento

O gerenciamento correto de erros é crucial em streams bidirecionais:

  • io.EOF: indica que o cliente fechou o stream de envio (CloseSend)
  • context.Canceled: ocorre quando o contexto é cancelado (ex.: timeout ou desconexão)
  • Erros de rede: RecvMsg retorna erro quando a conexão é perdida

Para cancelamento controlado, utilize context.WithCancel:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Em caso de erro crítico, chame cancel()
if err != nil {
    cancel()
    return
}

6. Exemplo Prático: Chat Simples em Tempo Real

O código completo do servidor e cliente apresentados nas seções 3 e 4 formam um chat funcional. Para testar:

  1. Inicie o servidor: go run server.go
  2. Inicie múltiplos clientes em terminais diferentes: go run client.go
  3. Cada cliente digita seu nome e começa a conversar

O servidor faz broadcast de todas as mensagens para todos os clientes conectados, garantindo que todos vejam as mensagens em tempo real.

7. Considerações de Performance e Boas Práticas

  • Concorrência limitada: Use semaphore.Weighted para limitar o número de streams simultâneos e evitar sobrecarga:
import "golang.org/x/sync/semaphore"

var sem = semaphore.NewWeighted(100) // máximo 100 streams concorrentes
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
  • Backpressure: Implemente buffers com capacidade limitada para evitar que streams lentos consumam memória excessiva:
msgChan := make(chan *pb.ChatMessage, 100) // buffer de 100 mensagens
  • Métricas: Integre Prometheus ou OpenTelemetry para monitorar:
  • Número de streams ativos
  • Latência de entrega de mensagens
  • Taxa de erros por stream

  • Versionamento: Para compatibilidade retroativa, nunca remova campos de mensagens existentes. Adicione novos campos com números de campo não utilizados.

Referências