JSONL em Pipelines ETL: Kafka, Airflow e Data Warehouses
Um guia prático para usar JSONL (JSON Lines) como formato de intercâmbio em pipelines ETL modernos. Aprenda a produzir e consumir eventos JSONL no Kafka, orquestrar transformações JSONL em DAGs do Airflow e carregar dados JSONL no Snowflake, BigQuery e Redshift.
Última atualização: fevereiro de 2026
Por que JSONL é o Formato de Intercâmbio ETL Ideal
Pipelines Extract-Transform-Load (ETL) movem dados entre sistemas que raramente compartilham o mesmo schema. JSONL é especialmente adequado para essa tarefa porque cada linha é um documento JSON autocontido. Não há cabeçalhos para perder, delimitadores para escapar, nem registros multi-linha para remontar após uma falha parcial. Cada linha pode ser validada, transformada e roteada independentemente, que é exatamente o que sistemas distribuídos como Kafka e Airflow precisam.
Diferente do CSV, JSONL preserva estruturas aninhadas, arrays e valores tipados sem ambiguidade. Diferente do Parquet ou Avro, JSONL é legível por humanos e não requer ferramentas especiais para inspecionar. Essas propriedades tornam JSONL o formato natural para a cola entre estágios do pipeline: tópicos Kafka carregam mensagens JSONL, tarefas Airflow leem e escrevem arquivos JSONL no armazenamento de objetos, e data warehouses ingerem JSONL através de seus comandos nativos de carga em massa. Neste guia, você aprenderá padrões concretos para cada um desses estágios com código pronto para produção.
Kafka + JSONL: Streaming de Eventos
Apache Kafka é a espinha dorsal do ETL em tempo real. Produtores emitem eventos como mensagens codificadas em JSONL (um objeto JSON por mensagem Kafka), e consumidores os leem linha por linha para sistemas downstream. Como cada mensagem Kafka é um único registro JSONL, o formato se alinha perfeitamente com a arquitetura baseada em log do Kafka. A serialização é trivial, e consumidores podem parsear cada mensagem independentemente sem buffering.
Um produtor Kafka que serializa dicionários Python como mensagens codificadas em JSONL. Cada mensagem é uma única linha JSON enviada para um tópico Kafka. O value_serializer lida com a codificação automaticamente, então você simplesmente passa um dict para o método send().
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Lê um arquivo JSONL e publica cada linha como uma mensagem Kafkawith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('Todos os eventos publicados no Kafka')
Um consumidor Kafka que lê mensagens JSONL e as escreve em um arquivo JSONL local em lotes. O agrupamento em lotes reduz o overhead de I/O e permite que sistemas downstream processem chunks maiores de uma vez. O consumidor commita offsets apenas após um lote ser completamente escrito, garantindo entrega at-least-once.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Escreve o lote no arquivo JSONLwith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Lote de {len(batch)} registros commitado')batch.clear()# Flush dos registros restantesif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: Orquestrando Tarefas JSONL
Apache Airflow é a ferramenta de orquestração padrão para ETL em lote. Um pipeline típico baseado em JSONL no Airflow tem três estágios: extrair dados brutos para JSONL, transformar o arquivo JSONL (filtrar, enriquecer, remodelar) e carregar o resultado em um data warehouse ou armazenamento de objetos. Cada estágio lê e escreve arquivos JSONL, o que torna o estado intermediário inspecionável e tarefas individuais re-executáveis independentemente.
Um DAG Airflow completo que extrai dados de uma API, transforma-os como JSONL e carrega o resultado no armazenamento em nuvem. Cada tarefa se comunica através de arquivos JSONL armazenados em storage compartilhado, facilitando a depuração e nova tentativa do pipeline.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extrai dados da API e escreve JSONL no disco local."""ds = context['ds'] # data de execução: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Lê JSONL bruto, filtra e enriquece, escreve JSONL limpo."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Faz upload do JSONL limpo para S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Para arquivos JSONL grandes que não cabem na memória, use uma transformação por streaming que processa uma linha por vez. Este padrão mantém o uso de memória constante independentemente do tamanho do arquivo e funciona bem dentro de tarefas Airflow onde os recursos dos workers podem ser limitados.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Transforma um arquivo JSONL por streaming com uma função definida pelo usuário.Retorne None de transform_fn para pular um registro."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Uso dentro de uma tarefa Airflowdef enrich(record):if record.get('amount', 0) < 0:return None # pula inválidorecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processado: {stats}')
Carregando JSONL em Data Warehouses
Data warehouses modernos têm suporte de primeira classe para ingestão de arquivos JSONL. Snowflake, BigQuery e Redshift podem todos carregar JSONL diretamente do armazenamento em nuvem, parsear o JSON em tempo de execução e armazenar os resultados em tabelas estruturadas. Isso elimina a necessidade de um passo intermediário de conversão para CSV e preserva tipos de dados aninhados.
Snowflake
COPY INTOSnowflake ingere JSONL via comando COPY INTO de stages S3, GCS ou Azure Blob. Use a opção FILE_FORMAT com TYPE = 'JSON' para parsear cada linha como um documento JSON separado. O tipo de coluna VARIANT armazena dados semi-estruturados nativamente.
BigQuery
NativoBigQuery carrega arquivos JSONL do Google Cloud Storage com detecção automática de schema. O comando bq load com --source_format=NEWLINE_DELIMITED_JSON lida com JSONL nativamente. BigQuery também pode detectar automaticamente tipos de colunas a partir dos valores JSON.
Redshift
COPYAmazon Redshift suporta carregamento de JSONL do S3 usando o comando COPY com FORMAT AS JSON. Você pode fornecer um arquivo JSONPaths para mapear chaves JSON para colunas da tabela, dando controle detalhado sobre o processo de carregamento.
Carrega um arquivo JSONL de um stage S3 em uma tabela Snowflake. Cada linha se torna uma linha com uma coluna VARIANT contendo o JSON parseado. Você pode então achatar a coluna VARIANT em colunas tipadas usando SQL do Snowflake.
-- Cria um stage apontando para seu bucket S3CREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Cria a tabela de destinoCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Carrega arquivos JSONL do stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Consulta os dados carregadosSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Use o cliente Python do BigQuery para carregar um arquivo JSONL do Google Cloud Storage em uma tabela BigQuery. O cliente lida com detecção de schema, particionamento e relatório de erros. Essa abordagem se integra bem com tarefas Airflow.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Carrega do GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Aguarda conclusãotable = client.get_table(table_ref)print(f'Carregadas {load_job.output_rows} linhas')print(f'A tabela agora tem {table.num_rows} linhas no total')
Tolerância a Falhas e Idempotência
Pipelines ETL falham. Redes caem, APIs expiram e workers ficam sem memória. JSONL torna a recuperação simples porque cada linha é independente. Você pode fazer checkpoint da sua posição (o número da linha), retomar a partir da última linha bem-sucedida e evitar reprocessar dados. Combinado com escritas idempotentes, isso oferece semântica exactly-once mesmo com entrega at-least-once.
Um processador baseado em checkpoint que rastreia o progresso por número de linha. Se o pipeline falhar, ele retoma do último checkpoint commitado em vez de reprocessar o arquivo inteiro. O arquivo de checkpoint é um simples arquivo de texto contendo o último número de linha processado com sucesso.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Lê o último número de linha commitado."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Salva atomicamente o checkpoint atual."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atômico em POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Processa um arquivo JSONL com retomada baseada em checkpoint."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # pula linhas já processadasline = line.strip()if not line:continuerecord = json.loads(line)# --- sua lógica de transformação ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint na linha {line_num}')# Checkpoint finalsave_checkpoint(checkpoint_path, line_num)print(f'Concluído. Processados {processed} registros a partir da linha {start_line + 1}')# Usoprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
O padrão de checkpoint funciona com qualquer estágio do pipeline JSONL. Para consumidores Kafka, substitua o checkpoint de arquivo por offsets commitados. Para Airflow, armazene o checkpoint no XCom ou em um banco de dados externo para que tarefas reexecutadas retomem da posição correta. O insight chave é que o formato baseado em linhas do JSONL torna o rastreamento de posição trivial: um único inteiro (o número da linha ou offset de bytes) é tudo que você precisa para retomar.
Valide Seu JSONL Antes de Carregar
Detecte registros malformados antes que cheguem ao seu pipeline. Use nossas ferramentas online gratuitas para validar, visualizar e converter arquivos JSONL no seu navegador.