Technology Apr 17, 2026 · 6 min read

Como importei 55 milhões de empresas para PostgreSQL em menos de 3 horas

Quando decidi construir o CNPJ Aberto, que é uma plataforma gratuita de consulta de empresas brasileiras, o primeiro desafio foi óbvio: como colocar 55 milhões de registros dentro do PostgreSQL de forma rápida e repetível (a base atualiza todo mês)? Os dados vêm dos Dados Abertos da Receita Federal...

DE
DEV Community
by Pedro Parker
Como importei 55 milhões de empresas para PostgreSQL em menos de 3 horas

Quando decidi construir o CNPJ Aberto, que é uma plataforma gratuita de consulta de empresas brasileiras, o primeiro desafio foi óbvio: como colocar 55 milhões de registros dentro do PostgreSQL de forma rápida e repetível (a base atualiza todo mês)?

Os dados vêm dos Dados Abertos da Receita Federal, distribuídos em dezenas de ZIPs com CSVs em latin-1, separados por ;, com campos inconsistentes (muito inconsistentes!!).

A ideia deste post rápido é mostrar as técnicas que transformaram uma importação de 12+ horas em menos de 3 horas.

O cenário

Tabela Registros aproximados Peso CSV
empresas ~55M ~4 GB
estabelecimentos ~70M ~15 GB
socios ~25M ~3 GB
simples ~35M ~2 GB
Tabelas auxiliares ~15K total <1 MB

Total: ~25 GB de CSVs descompactados, distribuídos em ~40 arquivos ZIP. Nestes ponto é visível que os maiores problemas seriam empresas e estabelecimentos, não só para querys como para joins, que são muitos para criar um sistema legal e atrativo.

Tentativa 1: INSERT com ORM (12+ horas)

A abordagem ingênua com SQLAlchemy:

for row in csv_reader:
    empresa = Empresa(**parse_row(row))
    session.add(empresa)
    if i % 1000 == 0:
        session.commit()

Resultado: ~1.200 inserts/segundo. Para 55M de registros, isso dá ~12 horas só para a tabela empresas. Inaceitável. Eu acabei ingerindo tudo pois imaginava que > 12 horas de espera seirma melhores que > X horas de implementação e pesquisa para melhoria de queries. Talvez fossem, mas pelo desafio, fui além.

O ORM adiciona overhead em cada objeto: validação de tipo, tracking de estado, construção de SQL dinâmico.

Para melhorias, a IA acaba ajudando um pouco, não muito, ela alucinada bastante na ajuda e as vezes acaba piorando querys e criando index sem sentido, usei o Opus 4.6. Muito útil, mas o double check é necessário.

Tentativa 2: executemany com batches (4+ horas)

Removendo o ORM e usando psycopg2 direto:

BATCH_SIZE = 5000
batch = []
for row in csv_reader:
    batch.append(parse_row(row))
    if len(batch) >= BATCH_SIZE:
        cursor.executemany(INSERT_SQL, batch)
        conn.commit()
        batch.clear()

Resultado: ~4.000 inserts/segundo. Melhor, mas ainda 4+ horas.

O problema: executemany ainda gera um INSERT por linha. O PostgreSQL parseia e planeja cada statement individualmente.

Solução final: COPY + Temp Tables (< 3 horas)

O COPY é o mecanismo de bulk load nativo do PostgreSQL. Ele bypassa o parser SQL, o planner e o executor — escrevendo direto no heap da tabela. É 10-50x mais rápido que INSERT.

Passo 1: Otimizar a sessão

cursor.execute("SET synchronous_commit = off")
cursor.execute("SET work_mem = '256MB'")

synchronous_commit = off permite que o PostgreSQL confirme transações sem esperar o flush do WAL para disco. Seguro para data loads (se o servidor crashar, você reimporta). Como nesse caso os dados não são tão importantes, é possível reimportar sem precisar fazer check de nada.

Passo 2: Dropar indexes antes, recriar depois

Indexes tornam cada INSERT mais caro porque o B-tree/GIN precisa ser atualizado. Para bulk load, é mais eficiente dropar tudo, importar, e recriar:

def drop_indexes(cursor):
    cursor.execute("""
        SELECT indexname, tablename FROM pg_indexes 
        WHERE schemaname = 'public' 
        AND indexname NOT LIKE '%_pkey'
    """)
    for idx, table in cursor.fetchall():
        cursor.execute(f"DROP INDEX IF EXISTS {idx}")

def create_indexes(cursor):
    # Recriar com CONCURRENTLY para não bloquear reads
    cursor.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS 
        ix_empresas_razao_trgm ON empresas 
        USING gin (razao_social gin_trgm_ops)
    """)
    # ... mais indexes

CREATE INDEX CONCURRENTLY é crucial — permite que o site continue respondendo enquanto os indexes são construídos.

Passo 3: COPY via temp table + UPSERT

Para tabelas que precisam de upsert (atualização mensal), usamos temp tables:

BATCH_SIZE = 200_000

def import_batch(cursor, table, columns, rows):
    # 1. Criar temp table com mesma estrutura
    cursor.execute(f"CREATE TEMP TABLE tmp_{table} (LIKE {table} INCLUDING DEFAULTS)")

    # 2. COPY os dados para a temp table
    csv_buffer = io.StringIO()
    writer = csv.writer(csv_buffer)
    for row in rows:
        writer.writerow(row)
    csv_buffer.seek(0)

    cursor.copy_expert(
        f"COPY tmp_{table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv, NULL '')",
        csv_buffer
    )

    # 3. UPSERT da temp table para a tabela real
    cols = ', '.join(columns)
    update_cols = ', '.join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)

    cursor.execute(f"""
        INSERT INTO {table} ({cols})
        SELECT {cols} FROM tmp_{table}
        ON CONFLICT ({pk}) DO UPDATE SET {update_cols}
    """)

    # 4. Limpar
    cursor.execute(f"DROP TABLE tmp_{table}")

Por que temp table? Porque o COPY não suporta ON CONFLICT diretamente. A temp table recebe o bulk load ultrarrápido, e depois um único INSERT ... ON CONFLICT faz o merge.

Passo 4: Paralelismo no download e importação

Os ZIPs da Receita Federal são independentes, então podemos baixar e importar em paralelo:

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    for zip_url in zip_urls:
        futures.append(pool.submit(download_and_import, zip_url))

    for future in as_completed(futures):
        future.result()  # propaga exceções

4 workers = 4 ZIPs sendo importados simultaneamente. Com SSDs, o PostgreSQL lida bem com escritas paralelas em tabelas diferentes.

Resultados

Abordagem Velocidade Tempo total
ORM (SQLAlchemy) ~1.200/s 12+ horas
executemany batches ~4.000/s 4+ horas
COPY + temp tables + parallelismo ~80.000/s < 3 horas

67x mais rápido que a abordagem inicial.

Lições aprendidas

  1. use COPY para bulk load no PostgreSQL. Não existe nada mais rápido sem ir para pg_bulkload ou extensões externas.

  2. Dropar indexes antes de um bulk load e recriar depois é quase sempre mais rápido que manter os indexes durante a carga.

  3. Temp tables são o bridge entre COPY (que não suporta upsert) e a necessidade de ON CONFLICT.

  4. synchronous_commit = off é uma otimização segura para data loads — o pior que acontece é perder dados que você pode reimportar.

  5. Batch size importa: 200K linhas por batch é o sweet spot. Muito menos = overhead de transação. Muito mais = uso excessivo de memória.

O resultado

Esse pipeline roda todo mês no CNPJ Aberto para atualizar a base com os dados mais recentes da Receita Federal. Qualquer pessoa pode consultar gratuitamente dados de qualquer empresa brasileira, razão social, sócios, endereço, CNAE, situação cadastral e muito mais.

Se você trabalha com dados públicos brasileiros, dá uma olhada: cnpjaberto.com.br

Obrigado!

DE
Source

This article was originally published by DEV Community and written by Pedro Parker.

Read original article on DEV Community
Back to Discover

Reading List