JSONL in ETL Pipelines: Kafka, Airflow & Data Warehouses
A hands-on guide to using JSONL (JSON Lines) as the interchange format across modern ETL pipelines. Learn how to produce and consume JSONL events in Kafka, orchestrate JSONL transformations in Airflow DAGs, and load JSONL data into Snowflake, BigQuery, and Redshift.
Last updated: February 2026
Why JSONL Is the Ideal ETL Interchange Format
Extract-Transform-Load (ETL) pipelines move data between systems that rarely share the same schema. JSONL is uniquely suited to this task because every line is a self-contained JSON document. There are no headers to lose, no delimiters to escape, and no multi-line records to reassemble after a partial failure. Each line can be validated, transformed, and routed independently, which is exactly what distributed systems like Kafka and Airflow need.
Unlike CSV, JSONL preserves nested structures, arrays, and typed values without ambiguity. Unlike Parquet or Avro, JSONL is human-readable and requires no special tooling to inspect. These properties make JSONL the natural format for the glue between pipeline stages: Kafka topics carry JSONL messages, Airflow tasks read and write JSONL files to object storage, and data warehouses ingest JSONL through their native bulk-load commands. In this guide, you will learn concrete patterns for each of these stages with production-ready code.
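To make the line-independence concrete, here is a minimal sketch: a corrupt line fails in isolation while its neighbors parse cleanly, which is what lets distributed consumers validate and route records one at a time.

```python
import json

# Three independent records; line 2 is deliberately corrupt.
raw = (
    '{"id": 1, "event": "signup"}\n'
    '{"id": 2, "event": "purchase"\n'  # missing closing brace
    '{"id": 3, "event": "login"}\n'
)

good, bad = [], []
for line_no, line in enumerate(raw.splitlines(), 1):
    try:
        good.append(json.loads(line))
    except json.JSONDecodeError:
        bad.append(line_no)  # the failure is isolated to this line

print(good)  # the two valid records survive
print(bad)   # [2]
```

A multi-line format like pretty-printed JSON would lose all three records to that one missing brace; JSONL loses exactly one line.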
Kafka + JSONL: Event Streaming
Apache Kafka is the backbone of real-time ETL. Producers emit events as JSONL-encoded messages (one JSON object per Kafka message), and consumers read them line-by-line into downstream systems. Because each Kafka message is a single JSONL record, the format aligns perfectly with Kafka's log-based architecture. Serialization is trivial, and consumers can parse each message independently without buffering.
A Kafka producer that serializes Python dictionaries as JSONL-encoded messages. Each message is a single JSON line sent to a Kafka topic. The value_serializer handles encoding automatically, so you simply pass a dict to the send() method.
```python
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=3,
)

# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        producer.send('etl-events', value=event)

producer.flush()
print('All events published to Kafka')
```
A Kafka consumer that reads JSONL messages and writes them to a local JSONL file in batches. Batching reduces I/O overhead and allows downstream systems to process larger chunks at once. The consumer commits offsets only after a batch is fully written, ensuring at-least-once delivery.
```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'etl-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    group_id='etl-consumer-group',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
)

BATCH_SIZE = 500
batch = []

for message in consumer:
    batch.append(message.value)
    if len(batch) >= BATCH_SIZE:
        # Write batch to JSONL file
        with open('output/batch.jsonl', 'a') as f:
            for record in batch:
                f.write(json.dumps(record) + '\n')
        consumer.commit()
        print(f'Committed batch of {len(batch)} records')
        batch.clear()

# Flush remaining records
if batch:
    with open('output/batch.jsonl', 'a') as f:
        for record in batch:
            f.write(json.dumps(record) + '\n')
    consumer.commit()
```
Apache Airflow: Orchestrating JSONL Tasks
Apache Airflow is the standard orchestration tool for batch ETL. A typical JSONL-based Airflow pipeline has three stages: extract raw data to JSONL, transform the JSONL file (filter, enrich, reshape), and load the result into a data warehouse or object store. Each stage reads and writes JSONL files, which makes intermediate state inspectable and individual tasks independently re-runnable.
A complete Airflow DAG that extracts data from an API, transforms it as JSONL, and loads the result into cloud storage. Each task communicates through JSONL files stored on shared storage, making the pipeline easy to debug and retry.
```python
from datetime import datetime, timedelta
import json

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def extract(**context):
    """Extract data from API and write JSONL to local disk."""
    ds = context['ds']  # execution date: YYYY-MM-DD
    response = requests.get(
        'https://api.example.com/events',
        params={'date': ds},
    )
    response.raise_for_status()  # fail the task on HTTP errors
    events = response.json()
    path = f'/tmp/raw_{ds}.jsonl'
    with open(path, 'w') as f:
        for event in events:
            f.write(json.dumps(event) + '\n')
    return path


def transform(**context):
    """Read raw JSONL, filter and enrich, write cleaned JSONL."""
    ds = context['ds']
    input_path = f'/tmp/raw_{ds}.jsonl'
    output_path = f'/tmp/clean_{ds}.jsonl'
    with open(input_path) as fin, open(output_path, 'w') as fout:
        for line in fin:
            record = json.loads(line)
            if record.get('status') != 'valid':
                continue
            record['processed_at'] = datetime.utcnow().isoformat()
            fout.write(json.dumps(record) + '\n')
    return output_path


def load(**context):
    """Upload cleaned JSONL to S3."""
    ds = context['ds']
    local_path = f'/tmp/clean_{ds}.jsonl'
    s3 = S3Hook(aws_conn_id='aws_default')
    s3.load_file(
        filename=local_path,
        key=f'etl/events/{ds}/data.jsonl',
        bucket_name='data-lake',
        replace=True,
    )


with DAG(
    'jsonl_etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id='extract', python_callable=extract)
    t_transform = PythonOperator(task_id='transform', python_callable=transform)
    t_load = PythonOperator(task_id='load', python_callable=load)

    t_extract >> t_transform >> t_load
```
For large JSONL files that do not fit in memory, use a streaming transform that processes one line at a time. This pattern keeps memory usage constant regardless of file size and works well inside Airflow tasks where worker resources may be limited.
```python
import json
from typing import Callable


def stream_transform(
    input_path: str,
    output_path: str,
    transform_fn: Callable[[dict], dict | None],
    batch_size: int = 5000,
) -> dict:
    """Stream-transform a JSONL file with a user-defined function.

    Return None from transform_fn to skip a record.
    """
    stats = {'input': 0, 'output': 0, 'skipped': 0}
    batch = []
    with open(input_path) as fin, open(output_path, 'w') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue
            stats['input'] += 1
            record = json.loads(line)
            result = transform_fn(record)
            if result is None:
                stats['skipped'] += 1
                continue
            batch.append(json.dumps(result))
            if len(batch) >= batch_size:
                fout.write('\n'.join(batch) + '\n')
                stats['output'] += len(batch)
                batch.clear()
        if batch:
            fout.write('\n'.join(batch) + '\n')
            stats['output'] += len(batch)
    return stats


# Usage inside an Airflow task
def enrich(record):
    if record.get('amount', 0) < 0:
        return None  # skip invalid
    record['currency'] = record.get('currency', 'USD').upper()
    return record


stats = stream_transform(
    '/tmp/raw.jsonl',
    '/tmp/enriched.jsonl',
    transform_fn=enrich,
)
print(f'Processed: {stats}')
```
Loading JSONL into Data Warehouses
Modern data warehouses have first-class support for ingesting JSONL files. Snowflake, BigQuery, and Redshift can all load JSONL directly from cloud storage, parse the JSON on the fly, and store the results in structured tables. This eliminates the need for an intermediate CSV conversion step and preserves nested data types.
Snowflake
Snowflake ingests JSONL via the COPY INTO command from S3, GCS, or Azure Blob stages. Use the FILE_FORMAT option with TYPE = 'JSON' to parse each line as a separate JSON document. The VARIANT column type stores semi-structured data natively.
BigQuery
BigQuery loads JSONL files from Google Cloud Storage natively. The bq load command with --source_format=NEWLINE_DELIMITED_JSON handles JSONL directly, and BigQuery can auto-detect column types from the JSON values.
Redshift
Amazon Redshift supports JSONL loading from S3 using the COPY command with FORMAT AS JSON. You can provide a JSONPaths file to map JSON keys to table columns, giving you fine-grained control over the loading process.
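As a sketch of the Redshift side (the bucket, table, column, and IAM role names here are illustrative), a COPY with a JSONPaths file might look like this:

```sql
-- JSONPaths file at s3://data-lake/etl/jsonpaths.json, mapping keys to columns:
-- {"jsonpaths": ["$.id", "$.user_id", "$.event_type", "$.timestamp"]}

CREATE TABLE IF NOT EXISTS raw_events (
    event_id   BIGINT,
    user_id    VARCHAR(64),
    event_type VARCHAR(32),
    event_ts   TIMESTAMP
);

-- Load JSONL from S3; each line maps to one row via the JSONPaths file
COPY raw_events
FROM 's3://data-lake/etl/events/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftLoadRole'
FORMAT AS JSON 's3://data-lake/etl/jsonpaths.json'
TIMEFORMAT 'auto';
```

The order of expressions in the JSONPaths array determines which table column each JSON key lands in, so the file doubles as explicit documentation of the load mapping.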
Load a JSONL file from an S3 stage into a Snowflake table. Each line becomes a row with a VARIANT column containing the parsed JSON. You can then flatten the VARIANT column into typed columns using Snowflake SQL.
```sql
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
  URL = 's3://data-lake/etl/events/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);

-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
  data VARIANT,
  loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';

-- Query the loaded data
SELECT
  data:id::INT AS event_id,
  data:user_id::STRING AS user_id,
  data:event_type::STRING AS event_type,
  data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;
```
Use the BigQuery Python client to load a JSONL file from Google Cloud Storage into a BigQuery table. The client handles schema detection, partitioning, and error reporting. This approach integrates well with Airflow tasks.
```python
from google.cloud import bigquery

client = bigquery.Client(project='my-project')

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    max_bad_records=10,
)

# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'

load_job = client.load_table_from_uri(
    uri,
    table_ref,
    job_config=job_config,
)
load_job.result()  # Wait for completion

table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')
```
Fault Tolerance and Idempotency
ETL pipelines fail. Networks drop, APIs time out, and workers run out of memory. JSONL makes recovery straightforward because every line is independent. You can checkpoint your position (the line number), resume from the last successful line, and avoid reprocessing data. Combined with idempotent writes, this gives you effectively exactly-once processing on top of at-least-once delivery.
A checkpoint-based processor that tracks progress by line number. If the pipeline fails, it resumes from the last committed checkpoint rather than reprocessing the entire file. The checkpoint file is a simple text file containing the last successfully processed line number.
```python
import json
import os


def get_checkpoint(checkpoint_path: str) -> int:
    """Read the last committed line number."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            return int(f.read().strip())
    return 0


def save_checkpoint(checkpoint_path: str, line_num: int):
    """Atomically save the current checkpoint."""
    tmp = checkpoint_path + '.tmp'
    with open(tmp, 'w') as f:
        f.write(str(line_num))
    os.replace(tmp, checkpoint_path)  # atomic on POSIX


def process_with_checkpoint(
    input_path: str,
    output_path: str,
    checkpoint_path: str,
    batch_size: int = 1000,
):
    """Process a JSONL file with checkpoint-based resumption."""
    start_line = get_checkpoint(checkpoint_path)
    processed = 0
    line_num = start_line  # handles an empty or fully processed file
    with open(input_path) as fin, \
         open(output_path, 'a') as fout:
        for line_num, line in enumerate(fin, 1):
            if line_num <= start_line:
                continue  # skip already-processed lines
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            # --- your transform logic ---
            record['etl_version'] = 'v2'
            fout.write(json.dumps(record) + '\n')
            processed += 1
            if processed % batch_size == 0:
                fout.flush()
                save_checkpoint(checkpoint_path, line_num)
                print(f'Checkpoint at line {line_num}')
    # Final checkpoint
    save_checkpoint(checkpoint_path, line_num)
    print(f'Done. Processed {processed} records from line {start_line + 1}')


# Usage
process_with_checkpoint(
    'input.jsonl',
    'output.jsonl',
    'checkpoint.txt',
    batch_size=5000,
)
```
The checkpoint pattern works with any JSONL pipeline stage. For Kafka consumers, replace the file checkpoint with committed offsets. For Airflow, store the checkpoint in XCom or an external database so that retried tasks resume from the correct position. The key insight is that JSONL's line-based format makes position tracking trivial: a single integer (the line number or byte offset) is all you need to resume.
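A byte offset is often even more convenient than a line number, because resuming is a single seek() with no skipping loop. A minimal sketch (file names are illustrative):

```python
import json
import os


def resume_from_offset(input_path: str, checkpoint_path: str):
    """Yield (record, end_offset) pairs, resuming from a saved byte offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = int(f.read().strip())
    with open(input_path, 'rb') as f:
        f.seek(offset)  # jump straight to the first unprocessed line
        for line in f:
            offset += len(line)
            stripped = line.strip()
            if stripped:
                yield json.loads(stripped), offset


def save_offset(checkpoint_path: str, offset: int):
    """Record how far we have safely processed."""
    with open(checkpoint_path, 'w') as f:
        f.write(str(offset))


# Create a small sample file for demonstration
with open('input.jsonl', 'w') as f:
    f.write('{"id": 1}\n{"id": 2}\n{"id": 3}\n')

# Process, committing the offset after each record
for record, offset in resume_from_offset('input.jsonl', 'offset.txt'):
    # ... transform / load the record ...
    save_offset('offset.txt', offset)
```

Note that the offset must be measured on the raw bytes (hence the 'rb' mode): mixing text-mode positions with byte counts breaks on multi-byte characters and platform newline translation.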
Validate Your JSONL Before Loading
Catch malformed records before they hit your pipeline: validate that each line parses as a standalone JSON object, and that required fields are present, before handing files to Kafka producers, Airflow tasks, or warehouse load jobs.
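A lightweight pre-load validator can do this in one pass. This is a minimal sketch; the file path and the required-key set are illustrative and would come from your own schema:

```python
import json

REQUIRED_KEYS = {'id', 'event_type'}  # illustrative schema requirement


def validate_jsonl(path: str) -> dict:
    """Check each line parses as a JSON object containing the required keys."""
    report = {'valid': 0, 'errors': []}
    with open(path) as f:
        for line_no, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                report['errors'].append((line_no, f'invalid JSON: {exc.msg}'))
                continue
            if not isinstance(record, dict):
                report['errors'].append((line_no, 'not a JSON object'))
            elif missing := REQUIRED_KEYS - record.keys():
                report['errors'].append((line_no, f'missing keys: {sorted(missing)}'))
            else:
                report['valid'] += 1
    return report


# Usage with a small sample file containing one good and two bad lines
with open('events.jsonl', 'w') as f:
    f.write('{"id": 1, "event_type": "signup"}\n')
    f.write('{"id": 2}\n')         # missing event_type
    f.write('not json at all\n')   # malformed
print(validate_jsonl('events.jsonl'))
```

Because the report carries line numbers, a failed load can be traced straight back to the offending records, and the valid lines can still be loaded on their own.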