JSONL em Pipelines ETL: Kafka, Airflow e Data Warehouses

Um guia prático para usar JSONL (JSON Lines) como formato de intercâmbio em pipelines ETL modernos. Aprenda a produzir e consumir eventos JSONL no Kafka, orquestrar transformações JSONL em DAGs do Airflow e carregar dados JSONL no Snowflake, BigQuery e Redshift.

Última atualização: fevereiro de 2026

Por que JSONL é o Formato de Intercâmbio ETL Ideal

Pipelines Extract-Transform-Load (ETL) movem dados entre sistemas que raramente compartilham o mesmo schema. JSONL é especialmente adequado para essa tarefa porque cada linha é um documento JSON autocontido. Não há cabeçalhos para perder, delimitadores para escapar, nem registros multi-linha para remontar após uma falha parcial. Cada linha pode ser validada, transformada e roteada independentemente, que é exatamente o que sistemas distribuídos como Kafka e Airflow precisam.

Diferente do CSV, JSONL preserva estruturas aninhadas, arrays e valores tipados sem ambiguidade. Diferente do Parquet ou Avro, JSONL é legível por humanos e não requer ferramentas especiais para inspecionar. Essas propriedades tornam JSONL o formato natural para a cola entre estágios do pipeline: tópicos Kafka carregam mensagens JSONL, tarefas Airflow leem e escrevem arquivos JSONL no armazenamento de objetos, e data warehouses ingerem JSONL através de seus comandos nativos de carga em massa. Neste guia, você aprenderá padrões concretos para cada um desses estágios com código pronto para produção.

Kafka + JSONL: Streaming de Eventos

Apache Kafka é a espinha dorsal do ETL em tempo real. Produtores emitem eventos como mensagens codificadas em JSONL (um objeto JSON por mensagem Kafka), e consumidores os leem linha por linha para sistemas downstream. Como cada mensagem Kafka é um único registro JSONL, o formato se alinha perfeitamente com a arquitetura baseada em log do Kafka. A serialização é trivial, e consumidores podem parsear cada mensagem independentemente sem buffering.

Um produtor Kafka que serializa dicionários Python como mensagens codificadas em JSONL. Cada mensagem é uma única linha JSON enviada para um tópico Kafka. O value_serializer lida com a codificação automaticamente, então você simplesmente passa um dict para o método send().

Produtor JSONL para Kafka
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Lê um arquivo JSONL e publica cada linha como uma mensagem Kafka
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('Todos os eventos publicados no Kafka')

Um consumidor Kafka que lê mensagens JSONL e as escreve em um arquivo JSONL local em lotes. O agrupamento em lotes reduz o overhead de I/O e permite que sistemas downstream processem chunks maiores de uma vez. O consumidor commita offsets apenas após um lote ser completamente escrito, garantindo entrega at-least-once.

Consumidor JSONL do Kafka com Escritas em Lote
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Escreve o lote no arquivo JSONL
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Lote de {len(batch)} registros commitado')
batch.clear()
# Flush dos registros restantes
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: Orquestrando Tarefas JSONL

Apache Airflow é a ferramenta de orquestração padrão para ETL em lote. Um pipeline típico baseado em JSONL no Airflow tem três estágios: extrair dados brutos para JSONL, transformar o arquivo JSONL (filtrar, enriquecer, remodelar) e carregar o resultado em um data warehouse ou armazenamento de objetos. Cada estágio lê e escreve arquivos JSONL, o que torna o estado intermediário inspecionável e tarefas individuais re-executáveis independentemente.

Um DAG Airflow completo que extrai dados de uma API, transforma-os como JSONL e carrega o resultado no armazenamento em nuvem. Cada tarefa se comunica através de arquivos JSONL armazenados em storage compartilhado, facilitando a depuração e nova tentativa do pipeline.

DAG Airflow: Pipeline ETL JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extrai dados da API e escreve JSONL no disco local."""
ds = context['ds'] # data de execução: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Lê JSONL bruto, filtra e enriquece, escreve JSONL limpo."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Faz upload do JSONL limpo para S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Para arquivos JSONL grandes que não cabem na memória, use uma transformação por streaming que processa uma linha por vez. Este padrão mantém o uso de memória constante independentemente do tamanho do arquivo e funciona bem dentro de tarefas Airflow onde os recursos dos workers podem ser limitados.

Tarefa de Transformação JSONL por Streaming
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Transforma um arquivo JSONL por streaming com uma função definida pelo usuário.
Retorne None de transform_fn para pular um registro."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Uso dentro de uma tarefa Airflow
def enrich(record):
if record.get('amount', 0) < 0:
return None # pula inválido
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processado: {stats}')

Carregando JSONL em Data Warehouses

Data warehouses modernos têm suporte de primeira classe para ingestão de arquivos JSONL. Snowflake, BigQuery e Redshift podem todos carregar JSONL diretamente do armazenamento em nuvem, parsear o JSON em tempo de execução e armazenar os resultados em tabelas estruturadas. Isso elimina a necessidade de um passo intermediário de conversão para CSV e preserva tipos de dados aninhados.

Snowflake

COPY INTO

Snowflake ingere JSONL via comando COPY INTO de stages S3, GCS ou Azure Blob. Use a opção FILE_FORMAT com TYPE = 'JSON' para parsear cada linha como um documento JSON separado. O tipo de coluna VARIANT armazena dados semi-estruturados nativamente.

BigQuery

Nativo

BigQuery carrega arquivos JSONL do Google Cloud Storage com detecção automática de schema. O comando bq load com --source_format=NEWLINE_DELIMITED_JSON lida com JSONL nativamente. BigQuery também pode detectar automaticamente tipos de colunas a partir dos valores JSON.

Redshift

COPY

Amazon Redshift suporta carregamento de JSONL do S3 usando o comando COPY com FORMAT AS JSON. Você pode fornecer um arquivo JSONPaths para mapear chaves JSON para colunas da tabela, dando controle detalhado sobre o processo de carregamento.

Carrega um arquivo JSONL de um stage S3 em uma tabela Snowflake. Cada linha se torna uma linha com uma coluna VARIANT contendo o JSON parseado. Você pode então achatar a coluna VARIANT em colunas tipadas usando SQL do Snowflake.

Snowflake: COPY JSONL do S3
-- Cria um stage apontando para seu bucket S3
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Cria a tabela de destino
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Carrega arquivos JSONL do stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Consulta os dados carregados
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Use o cliente Python do BigQuery para carregar um arquivo JSONL do Google Cloud Storage em uma tabela BigQuery. O cliente lida com detecção de schema, particionamento e relatório de erros. Essa abordagem se integra bem com tarefas Airflow.

BigQuery: Carregar JSONL com Cliente Python
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Carrega do GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Aguarda conclusão
table = client.get_table(table_ref)
print(f'Carregadas {load_job.output_rows} linhas')
print(f'A tabela agora tem {table.num_rows} linhas no total')

Tolerância a Falhas e Idempotência

Pipelines ETL falham. Redes caem, APIs expiram e workers ficam sem memória. JSONL torna a recuperação simples porque cada linha é independente. Você pode fazer checkpoint da sua posição (o número da linha), retomar a partir da última linha bem-sucedida e evitar reprocessar dados. Combinado com escritas idempotentes, isso oferece semântica exactly-once mesmo com entrega at-least-once.

Um processador baseado em checkpoint que rastreia o progresso por número de linha. Se o pipeline falhar, ele retoma do último checkpoint commitado em vez de reprocessar o arquivo inteiro. O arquivo de checkpoint é um simples arquivo de texto contendo o último número de linha processado com sucesso.

Processamento JSONL Baseado em Checkpoint
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Lê o último número de linha commitado."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Salva atomicamente o checkpoint atual."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atômico em POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Processa um arquivo JSONL com retomada baseada em checkpoint."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # pula linhas já processadas
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- sua lógica de transformação ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint na linha {line_num}')
# Checkpoint final
save_checkpoint(checkpoint_path, line_num)
print(f'Concluído. Processados {processed} registros a partir da linha {start_line + 1}')
# Uso
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

O padrão de checkpoint funciona com qualquer estágio do pipeline JSONL. Para consumidores Kafka, substitua o checkpoint de arquivo por offsets commitados. Para Airflow, armazene o checkpoint no XCom ou em um banco de dados externo para que tarefas reexecutadas retomem da posição correta. O insight chave é que o formato baseado em linhas do JSONL torna o rastreamento de posição trivial: um único inteiro (o número da linha ou offset de bytes) é tudo que você precisa para retomar.

Valide Seu JSONL Antes de Carregar

Detecte registros malformados antes que cheguem ao seu pipeline. Use nossas ferramentas online gratuitas para validar, visualizar e converter arquivos JSONL no seu navegador.

Inspecione Arquivos JSONL Instantaneamente

Visualize, valide e converta arquivos JSONL de até 1GB diretamente no seu navegador. Sem uploads necessários, 100% privado.

Perguntas Frequentes

Pipeline ETL JSONL — Kafka, Airflow, BigQuery, Snowflake ...