JSONL en pipelines ETL: Kafka, Airflow y Data Warehouses

Una guia practica para usar JSONL (JSON Lines) como formato de intercambio en pipelines ETL modernos. Aprende a producir y consumir eventos JSONL en Kafka, orquestar transformaciones JSONL en DAGs de Airflow, y cargar datos JSONL en Snowflake, BigQuery y Redshift.

Ultima actualizacion: febrero 2026

Por que JSONL es el formato de intercambio ETL ideal

Los pipelines Extract-Transform-Load (ETL) mueven datos entre sistemas que rara vez comparten el mismo esquema. JSONL es especialmente adecuado para esta tarea porque cada linea es un documento JSON autocontenido. No hay encabezados que perder, ni delimitadores que escapar, ni registros multilinea que reensamblar despues de un fallo parcial. Cada linea puede validarse, transformarse y enrutarse independientemente, que es exactamente lo que los sistemas distribuidos como Kafka y Airflow necesitan.

A diferencia de CSV, JSONL preserva estructuras anidadas, arreglos y valores tipados sin ambiguedad. A diferencia de Parquet o Avro, JSONL es legible por humanos y no requiere herramientas especiales para su inspeccion. Estas propiedades hacen de JSONL el formato natural para el pegamento entre las etapas del pipeline: los topics de Kafka transportan mensajes JSONL, las tareas de Airflow leen y escriben archivos JSONL en almacenamiento de objetos, y los data warehouses ingestan JSONL a traves de sus comandos nativos de carga masiva. En esta guia, aprenderas patrones concretos para cada una de estas etapas con codigo listo para produccion.

Kafka + JSONL: Streaming de eventos

Apache Kafka es la columna vertebral del ETL en tiempo real. Los productores emiten eventos como mensajes codificados en JSONL (un objeto JSON por mensaje de Kafka), y los consumidores los leen linea por linea hacia sistemas downstream. Debido a que cada mensaje de Kafka es un unico registro JSONL, el formato se alinea perfectamente con la arquitectura basada en log de Kafka. La serializacion es trivial, y los consumidores pueden analizar cada mensaje independientemente sin buffering.

Un productor de Kafka que serializa diccionarios de Python como mensajes codificados en JSONL. Cada mensaje es una unica linea JSON enviada a un topic de Kafka. El value_serializer maneja la codificacion automaticamente, asi que simplemente pasas un dict al metodo send().

Productor JSONL de Kafka
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

Un consumidor de Kafka que lee mensajes JSONL y los escribe en un archivo JSONL local por lotes. El procesamiento por lotes reduce la sobrecarga de E/S y permite que los sistemas downstream procesen fragmentos mas grandes a la vez. El consumidor confirma los offsets solo despues de que un lote se haya escrito completamente, asegurando entrega al menos una vez.

Consumidor JSONL de Kafka con escrituras por lotes
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: Orquestacion de tareas JSONL

Apache Airflow es la herramienta de orquestacion estandar para ETL por lotes. Un pipeline tipico de Airflow basado en JSONL tiene tres etapas: extraer datos sin procesar a JSONL, transformar el archivo JSONL (filtrar, enriquecer, reformar) y cargar el resultado en un data warehouse o almacen de objetos. Cada etapa lee y escribe archivos JSONL, lo que hace el estado intermedio inspeccionable y las tareas individuales re-ejecutables independientemente.

Un DAG completo de Airflow que extrae datos de una API, los transforma como JSONL y carga el resultado en almacenamiento en la nube. Cada tarea se comunica a traves de archivos JSONL almacenados en almacenamiento compartido, lo que facilita la depuracion y el reintento del pipeline.

DAG de Airflow: Pipeline ETL JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Para archivos JSONL grandes que no caben en memoria, usa una transformacion en streaming que procesa una linea a la vez. Este patron mantiene el uso de memoria constante independientemente del tamano del archivo y funciona bien dentro de las tareas de Airflow donde los recursos de los workers pueden ser limitados.

Tarea de transformacion JSONL en streaming
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

Carga de JSONL en Data Warehouses

Los data warehouses modernos tienen soporte de primera clase para ingestar archivos JSONL. Snowflake, BigQuery y Redshift pueden cargar JSONL directamente desde almacenamiento en la nube, analizar el JSON sobre la marcha y almacenar los resultados en tablas estructuradas. Esto elimina la necesidad de un paso intermedio de conversion a CSV y preserva los tipos de datos anidados.

Snowflake

COPY INTO

Snowflake ingesta JSONL a traves del comando COPY INTO desde stages de S3, GCS o Azure Blob. Usa la opcion FILE_FORMAT con TYPE = 'JSON' para analizar cada linea como un documento JSON separado. El tipo de columna VARIANT almacena datos semiestructurados de forma nativa.

BigQuery

Nativo

BigQuery carga archivos JSONL desde Google Cloud Storage con deteccion automatica de esquema. El comando bq load con --source_format=NEWLINE_DELIMITED_JSON maneja JSONL de forma nativa. BigQuery tambien puede detectar automaticamente los tipos de columna a partir de los valores JSON.

Redshift

COPY

Amazon Redshift soporta la carga de JSONL desde S3 usando el comando COPY con FORMAT AS JSON. Puedes proporcionar un archivo JSONPaths para mapear claves JSON a columnas de tabla, dandote control detallado sobre el proceso de carga.

Carga un archivo JSONL desde un stage de S3 en una tabla de Snowflake. Cada linea se convierte en una fila con una columna VARIANT que contiene el JSON analizado. Luego puedes aplanar la columna VARIANT en columnas tipadas usando SQL de Snowflake.

Snowflake: COPY JSONL desde S3
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Usa el cliente Python de BigQuery para cargar un archivo JSONL desde Google Cloud Storage en una tabla de BigQuery. El cliente maneja la deteccion de esquema, particionamiento e informes de errores. Este enfoque se integra bien con tareas de Airflow.

BigQuery: Cargar JSONL con cliente Python
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

Tolerancia a fallos e idempotencia

Los pipelines ETL fallan. Las redes se caen, las APIs exceden el tiempo de espera y los workers se quedan sin memoria. JSONL hace que la recuperacion sea sencilla porque cada linea es independiente. Puedes marcar tu posicion (el numero de linea), reanudar desde la ultima linea exitosa y evitar reprocesar datos. Combinado con escrituras idempotentes, esto te da semanticas de exactamente una vez incluso con entrega de al menos una vez.

Un procesador basado en checkpoints que rastrea el progreso por numero de linea. Si el pipeline falla, reanuda desde el ultimo checkpoint confirmado en lugar de reprocesar el archivo completo. El archivo de checkpoint es un simple archivo de texto que contiene el ultimo numero de linea procesado exitosamente.

Procesamiento JSONL basado en checkpoints
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

El patron de checkpoint funciona con cualquier etapa de pipeline JSONL. Para consumidores de Kafka, reemplaza el checkpoint de archivo con offsets confirmados. Para Airflow, almacena el checkpoint en XCom o una base de datos externa para que las tareas reintentadas reanuden desde la posicion correcta. La idea clave es que el formato basado en lineas de JSONL hace que el seguimiento de posicion sea trivial: un unico entero (el numero de linea o el desplazamiento en bytes) es todo lo que necesitas para reanudar.

Valida tu JSONL antes de cargarlo

Detecta registros malformados antes de que lleguen a tu pipeline. Usa nuestras herramientas online gratuitas para validar, ver y convertir archivos JSONL en tu navegador.

Inspecciona archivos JSONL al instante

Visualiza, valida y convierte archivos JSONL de hasta 1GB directamente en tu navegador. Sin subidas necesarias, 100% privado.

Preguntas frecuentes

Pipeline ETL con JSONL — Kafka, Airflow, BigQuery, Snowfl...