TypeScript e Workers

1. Introdução aos Workers no Ecossistema TypeScript

Workers são mecanismos que permitem executar código em threads separadas da thread principal, possibilitando processamento paralelo sem bloquear a interface do usuário ou o loop de eventos. No ecossistema TypeScript, existem três principais tipos de Workers:

  • Web Workers: Executam scripts em segundo plano no navegador, com comunicação via postMessage e eventos message.
  • Service Workers: Atuam como proxies entre o navegador e a rede, interceptando requisições e gerenciando cache.
  • Worker Threads: Implementação nativa do Node.js para paralelismo real, com acesso a require e módulos.

O principal desafio ao usar Workers com TypeScript é garantir a tipagem segura da comunicação entre threads. Como postMessage serializa objetos em JSON (ou usa transferência de buffers), perder informações de tipo é comum sem uma abordagem estruturada.

2. Configuração de Projeto TypeScript para Workers

Para configurar um projeto TypeScript com Workers, é necessário compilar o código do worker separadamente do código principal. Uma estrutura recomendada:

projeto/
├── src/
│   ├── main.ts
│   └── workers/
│       └── process.worker.ts
├── tsconfig.json
├── tsconfig.worker.json
└── package.json

tsconfig.worker.json — configuração específica para workers:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ESNext",
    "lib": ["ES2022", "WebWorker"],
    "outDir": "./dist/workers",
    "strict": true,
    "moduleResolution": "bundler"
  },
  "include": ["src/workers/**/*"]
}

tsconfig.json — configuração principal:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ESNext",
    "lib": ["ES2022", "DOM"],
    "outDir": "./dist",
    "strict": true,
    "moduleResolution": "bundler"
  },
  "include": ["src/main.ts"]
}

Para bundlers como Webpack ou Vite, o suporte a Workers é nativo. No Vite, basta importar o worker com ?worker:

import ProcessWorker from './workers/process.worker?worker'
const worker = new ProcessWorker()

3. Tipagem da Comunicação entre Main Thread e Worker

A comunicação entre threads é feita através de mensagens serializadas. Para garantir type safety, definimos interfaces específicas:

// types.ts
export type WorkerRequest = 
  | { type: "process"; data: number[] }
  | { type: "cancel"; taskId: string }
  | { type: "status" }

export type WorkerResponse =
  | { type: "result"; taskId: string; output: number[] }
  | { type: "error"; taskId: string; message: string }
  | { type: "progress"; taskId: string; percentage: number }

// main.ts
const worker = new Worker(new URL('./workers/process.worker.ts', import.meta.url))

worker.postMessage({ type: "process", data: [1, 2, 3] } satisfies WorkerRequest)

worker.onmessage = (event: MessageEvent<WorkerResponse>) => {
  const response = event.data
  switch (response.type) {
    case "result":
      console.log("Processado:", response.output)
      break
    case "error":
      console.error("Erro:", response.message)
      break
  }
}

No worker:

// workers/process.worker.ts
import type { WorkerRequest, WorkerResponse } from '../types'

self.onmessage = (event: MessageEvent<WorkerRequest>) => {
  const request = event.data

  if (request.type === "process") {
    const result: WorkerResponse = {
      type: "result",
      taskId: crypto.randomUUID(),
      output: request.data.map(x => x * 2)
    }
    self.postMessage(result)
  }
}

4. Wrappers Tipados para Workers

Para facilitar o uso, podemos criar um wrapper genérico que transforma a comunicação baseada em eventos em Promises:

// TypedWorker.ts
export class TypedWorker<TIn, TOut> {
  private worker: Worker
  private pending = new Map<string, { 
    resolve: (value: TOut) => void
    reject: (reason: any) => void
  }>()
  private idCounter = 0

  constructor(workerScript: string | URL) {
    this.worker = new Worker(workerScript)
    this.worker.onmessage = (event: MessageEvent<{ id: string; data: TOut | { error: string } }>) => {
      const { id, data } = event.data
      const pending = this.pending.get(id)
      if (!pending) return

      if ('error' in data) {
        pending.reject(new Error(data.error))
      } else {
        pending.resolve(data)
      }
      this.pending.delete(id)
    }
  }

  async send(message: TIn, timeout = 5000): Promise<TOut> {
    return new Promise((resolve, reject) => {
      const id = `msg_${++this.idCounter}`
      this.pending.set(id, { resolve, reject })

      const timeoutId = setTimeout(() => {
        this.pending.delete(id)
        reject(new Error(`Timeout após ${timeout}ms`))
      }, timeout)

      this.worker.postMessage({ id, data: message })

      // Limpa timeout ao resolver
      const originalResolve = resolve
      this.pending.set(id, {
        resolve: (value) => {
          clearTimeout(timeoutId)
          originalResolve(value)
        },
        reject: (reason) => {
          clearTimeout(timeoutId)
          reject(reason)
        }
      })
    })
  }

  terminate() {
    this.worker.terminate()
  }
}

// Uso prático
interface FilterRequest { kernel: number[]; imageData: ImageData }
interface FilterResult { filteredData: ImageData; processingTime: number }

const worker = new TypedWorker<FilterRequest, FilterResult>('./image-filter.worker.ts')

async function applyFilter(imageData: ImageData) {
  try {
    const result = await worker.send({
      kernel: [0, -1, 0, -1, 5, -1, 0, -1, 0],
      imageData
    })
    console.log(`Processado em ${result.processingTime}ms`)
    return result.filteredData
  } catch (error) {
    console.error("Falha no processamento:", error)
    throw error
  }
}

5. Pool de Workers com Tipagem Segura

Para processamento paralelo de grandes volumes de dados, um pool de workers tipado é essencial:

// WorkerPool.ts
export class WorkerPool<TIn, TOut> {
  private workers: TypedWorker<TIn, TOut>[]
  private queue: { input: TIn; resolve: (value: TOut) => void; reject: (reason: any) => void }[] = []
  private activeCount = 0

  constructor(workerScript: string | URL, poolSize = navigator.hardwareConcurrency || 4) {
    this.workers = Array.from({ length: poolSize }, () => new TypedWorker<TIn, TOut>(workerScript))
  }

  async execute(input: TIn): Promise<TOut> {
    return new Promise((resolve, reject) => {
      this.queue.push({ input, resolve, reject })
      this.processNext()
    })
  }

  private processNext() {
    if (this.activeCount >= this.workers.length || this.queue.length === 0) return

    const worker = this.workers[this.activeCount % this.workers.length]
    const task = this.queue.shift()!
    this.activeCount++

    worker.send(task.input)
      .then(task.resolve)
      .catch(task.reject)
      .finally(() => {
        this.activeCount--
        this.processNext()
      })
  }

  async executeAll(inputs: TIn[]): Promise<TOut[]> {
    return Promise.all(inputs.map(input => this.execute(input)))
  }

  terminate() {
    this.workers.forEach(w => w.terminate())
  }
}

// Exemplo de uso
interface ChunkData { start: number; end: number; values: number[] }
interface AnalysisResult { sum: number; average: number; max: number }

const pool = new WorkerPool<ChunkData, AnalysisResult>('./analyzer.worker.ts', 4)

async function analyzeLargeDataset(data: number[]) {
  const chunkSize = Math.ceil(data.length / 4)
  const chunks: ChunkData[] = []

  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push({
      start: i,
      end: Math.min(i + chunkSize, data.length),
      values: data.slice(i, i + chunkSize)
    })
  }

  const results = await pool.executeAll(chunks)

  return {
    totalSum: results.reduce((acc, r) => acc + r.sum, 0),
    overallAverage: results.reduce((acc, r) => acc + r.average, 0) / results.length,
    globalMax: Math.max(...results.map(r => r.max))
  }
}

6. Patterns Avançados: SharedArrayBuffer e Transferables

Para dados grandes, SharedArrayBuffer permite memória compartilhada entre threads sem serialização:

// worker.ts
self.onmessage = (event: MessageEvent<{ buffer: SharedArrayBuffer; operation: string }>) => {
  const { buffer } = event.data
  const view = new Float64Array(buffer)

  // Processamento concorrente
  for (let i = 0; i < view.length; i++) {
    view[i] = Math.sqrt(view[i])
  }

  // Sinaliza conclusão
  Atomics.store(new Int32Array(buffer, buffer.byteLength - 4), 0, 1)
  Atomics.notify(new Int32Array(buffer, buffer.byteLength - 4), 0)
}

// main.ts
const buffer = new SharedArrayBuffer(1024 * 1024 * 10 + 4) // 10MB + flag
const dataView = new Float64Array(buffer, 0, 1024 * 1024 * 10 / 8)
const flagView = new Int32Array(buffer, buffer.byteLength - 4, 1)

// Preenche dados
for (let i = 0; i < dataView.length; i++) {
  dataView[i] = Math.random() * 1000
}

worker.postMessage({ buffer, operation: "sqrt" })

// Aguarda conclusão
Atomics.wait(flagView, 0, 0)
console.log("Processamento concluído")

7. Workers em Node.js com TypeScript

No Node.js, a API é diferente:

// worker.ts (Node.js)
import { parentPort, workerData } from 'worker_threads'
import type { Stream } from 'stream'

interface ProcessRequest {
  chunkSize: number
  encoding: BufferEncoding
}

if (parentPort) {
  parentPort.on('message', (msg: ProcessRequest) => {
    const { chunkSize } = msg
    // Processa dados recebidos via workerData
    const inputData = workerData as Buffer

    const result = inputData.toString('utf-8').toUpperCase()
    parentPort.postMessage(result)
  })
}

// main.ts (Node.js)
import { Worker } from 'worker_threads'
import { fileURLToPath } from 'url'
import path from 'path'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

const worker = new Worker(path.join(__dirname, 'worker.ts'), {
  workerData: Buffer.from('dados para processar'),
  eval: false
})

worker.on('message', (result: string) => {
  console.log('Resultado:', result)
})

worker.postMessage({ chunkSize: 1024, encoding: 'utf-8' })

8. Testes e Debugging de Workers Tipados

Para testar workers isoladamente, podemos mockar a comunicação:

// __tests__/worker.test.ts
import { describe, it, expect, vi } from 'vitest'

function createMockWorker() {
  const listeners = new Map<string, Function[]>()

  return {
    postMessage: vi.fn((msg) => {
      // Simula processamento
      const response = { id: msg.id, data: msg.data.map((x: number) => x * 2) }
      setTimeout(() => {
        const messageListeners = listeners.get('message') || []
        messageListeners.forEach(fn => fn({ data: response }))
      }, 10)
    }),
    onmessage: null as Function | null,
    addEventListener: vi.fn((event: string, handler: Function) => {
      if (!listeners.has(event)) listeners.set(event, [])
      listeners.get(event)!.push(handler)
    }),
    terminate: vi.fn()
  }
}

describe('TypedWorker', () => {
  it('deve processar mensagens corretamente', async () => {
    const mockWorker = createMockWorker()
    const worker = new TypedWorker<number[], number[]>('./dummy')

    // Substitui worker interno
    (worker as any).worker = mockWorker

    const result = await worker.send([1, 2, 3])
    expect(result).toEqual([2, 4, 6])
    expect(mockWorker.postMessage).toHaveBeenCalled()
  })
})

Para debugging, use source maps com a flag --enable-source-maps no Node.js, ou configure o Chrome DevTools para workers no navegador.

Referências