JSONL en pipelines ETL: Kafka, Airflow y Data Warehouses
Una guia practica para usar JSONL (JSON Lines) como formato de intercambio en pipelines ETL modernos. Aprende a producir y consumir eventos JSONL en Kafka, orquestar transformaciones JSONL en DAGs de Airflow, y cargar datos JSONL en Snowflake, BigQuery y Redshift.
Ultima actualizacion: febrero 2026
Por que JSONL es el formato de intercambio ETL ideal
Los pipelines Extract-Transform-Load (ETL) mueven datos entre sistemas que rara vez comparten el mismo esquema. JSONL es especialmente adecuado para esta tarea porque cada linea es un documento JSON autocontenido. No hay encabezados que perder, ni delimitadores que escapar, ni registros multilinea que reensamblar despues de un fallo parcial. Cada linea puede validarse, transformarse y enrutarse independientemente, que es exactamente lo que los sistemas distribuidos como Kafka y Airflow necesitan.
A diferencia de CSV, JSONL preserva estructuras anidadas, arreglos y valores tipados sin ambiguedad. A diferencia de Parquet o Avro, JSONL es legible por humanos y no requiere herramientas especiales para su inspeccion. Estas propiedades hacen de JSONL el formato natural para el pegamento entre las etapas del pipeline: los topics de Kafka transportan mensajes JSONL, las tareas de Airflow leen y escriben archivos JSONL en almacenamiento de objetos, y los data warehouses ingestan JSONL a traves de sus comandos nativos de carga masiva. En esta guia, aprenderas patrones concretos para cada una de estas etapas con codigo listo para produccion.
Kafka + JSONL: Streaming de eventos
Apache Kafka es la columna vertebral del ETL en tiempo real. Los productores emiten eventos como mensajes codificados en JSONL (un objeto JSON por mensaje de Kafka), y los consumidores los leen linea por linea hacia sistemas downstream. Debido a que cada mensaje de Kafka es un unico registro JSONL, el formato se alinea perfectamente con la arquitectura basada en log de Kafka. La serializacion es trivial, y los consumidores pueden analizar cada mensaje independientemente sin buffering.
Un productor de Kafka que serializa diccionarios de Python como mensajes codificados en JSONL. Cada mensaje es una unica linea JSON enviada a un topic de Kafka. El value_serializer maneja la codificacion automaticamente, asi que simplemente pasas un dict al metodo send().
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
Un consumidor de Kafka que lee mensajes JSONL y los escribe en un archivo JSONL local por lotes. El procesamiento por lotes reduce la sobrecarga de E/S y permite que los sistemas downstream procesen fragmentos mas grandes a la vez. El consumidor confirma los offsets solo despues de que un lote se haya escrito completamente, asegurando entrega al menos una vez.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: Orquestacion de tareas JSONL
Apache Airflow es la herramienta de orquestacion estandar para ETL por lotes. Un pipeline tipico de Airflow basado en JSONL tiene tres etapas: extraer datos sin procesar a JSONL, transformar el archivo JSONL (filtrar, enriquecer, reformar) y cargar el resultado en un data warehouse o almacen de objetos. Cada etapa lee y escribe archivos JSONL, lo que hace el estado intermedio inspeccionable y las tareas individuales re-ejecutables independientemente.
Un DAG completo de Airflow que extrae datos de una API, los transforma como JSONL y carga el resultado en almacenamiento en la nube. Cada tarea se comunica a traves de archivos JSONL almacenados en almacenamiento compartido, lo que facilita la depuracion y el reintento del pipeline.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Para archivos JSONL grandes que no caben en memoria, usa una transformacion en streaming que procesa una linea a la vez. Este patron mantiene el uso de memoria constante independientemente del tamano del archivo y funciona bien dentro de las tareas de Airflow donde los recursos de los workers pueden ser limitados.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
Carga de JSONL en Data Warehouses
Los data warehouses modernos tienen soporte de primera clase para ingestar archivos JSONL. Snowflake, BigQuery y Redshift pueden cargar JSONL directamente desde almacenamiento en la nube, analizar el JSON sobre la marcha y almacenar los resultados en tablas estructuradas. Esto elimina la necesidad de un paso intermedio de conversion a CSV y preserva los tipos de datos anidados.
Snowflake
COPY INTOSnowflake ingesta JSONL a traves del comando COPY INTO desde stages de S3, GCS o Azure Blob. Usa la opcion FILE_FORMAT con TYPE = 'JSON' para analizar cada linea como un documento JSON separado. El tipo de columna VARIANT almacena datos semiestructurados de forma nativa.
BigQuery
NativoBigQuery carga archivos JSONL desde Google Cloud Storage con deteccion automatica de esquema. El comando bq load con --source_format=NEWLINE_DELIMITED_JSON maneja JSONL de forma nativa. BigQuery tambien puede detectar automaticamente los tipos de columna a partir de los valores JSON.
Redshift
COPYAmazon Redshift soporta la carga de JSONL desde S3 usando el comando COPY con FORMAT AS JSON. Puedes proporcionar un archivo JSONPaths para mapear claves JSON a columnas de tabla, dandote control detallado sobre el proceso de carga.
Carga un archivo JSONL desde un stage de S3 en una tabla de Snowflake. Cada linea se convierte en una fila con una columna VARIANT que contiene el JSON analizado. Luego puedes aplanar la columna VARIANT en columnas tipadas usando SQL de Snowflake.
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Usa el cliente Python de BigQuery para cargar un archivo JSONL desde Google Cloud Storage en una tabla de BigQuery. El cliente maneja la deteccion de esquema, particionamiento e informes de errores. Este enfoque se integra bien con tareas de Airflow.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
Tolerancia a fallos e idempotencia
Los pipelines ETL fallan. Las redes se caen, las APIs exceden el tiempo de espera y los workers se quedan sin memoria. JSONL hace que la recuperacion sea sencilla porque cada linea es independiente. Puedes marcar tu posicion (el numero de linea), reanudar desde la ultima linea exitosa y evitar reprocesar datos. Combinado con escrituras idempotentes, esto te da semanticas de exactamente una vez incluso con entrega de al menos una vez.
Un procesador basado en checkpoints que rastrea el progreso por numero de linea. Si el pipeline falla, reanuda desde el ultimo checkpoint confirmado en lugar de reprocesar el archivo completo. El archivo de checkpoint es un simple archivo de texto que contiene el ultimo numero de linea procesado exitosamente.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
El patron de checkpoint funciona con cualquier etapa de pipeline JSONL. Para consumidores de Kafka, reemplaza el checkpoint de archivo con offsets confirmados. Para Airflow, almacena el checkpoint en XCom o una base de datos externa para que las tareas reintentadas reanuden desde la posicion correcta. La idea clave es que el formato basado en lineas de JSONL hace que el seguimiento de posicion sea trivial: un unico entero (el numero de linea o el desplazamiento en bytes) es todo lo que necesitas para reanudar.
Valida tu JSONL antes de cargarlo
Detecta registros malformados antes de que lleguen a tu pipeline. Usa nuestras herramientas online gratuitas para validar, ver y convertir archivos JSONL en tu navegador.