JSONL in ETL-pipelines: Kafka, Airflow & Datawarehouses

Een praktische handleiding voor het gebruik van JSONL (JSON Lines) als uitwisselingsformaat in moderne ETL-pipelines. Leer hoe je JSONL-events produceert en consumeert in Kafka, JSONL-transformaties orkestreert in Airflow DAG's, en JSONL-data laadt in Snowflake, BigQuery en Redshift.

Laatst bijgewerkt: februari 2026

Waarom JSONL het ideale ETL-uitwisselingsformaat is

Extract-Transform-Load (ETL) pipelines verplaatsen data tussen systemen die zelden hetzelfde schema delen. JSONL is hier bijzonder geschikt voor omdat elke regel een zelfstandig JSON-document is. Er zijn geen headers om kwijt te raken, geen delimiters om te escapen, en geen meerregelige records om opnieuw samen te stellen na een gedeeltelijke fout. Elke regel kan onafhankelijk worden gevalideerd, getransformeerd en gerouteerd, wat precies is wat gedistribueerde systemen zoals Kafka en Airflow nodig hebben.

In tegenstelling tot CSV behoudt JSONL geneste structuren, arrays en getypeerde waarden zonder dubbelzinnigheid. In tegenstelling tot Parquet of Avro is JSONL leesbaar voor mensen en vereist het geen speciaal gereedschap om te inspecteren. Deze eigenschappen maken JSONL het natuurlijke formaat voor de lijm tussen pipelinefasen: Kafka-topics dragen JSONL-berichten, Airflow-taken lezen en schrijven JSONL-bestanden naar objectopslag, en datawarehouses nemen JSONL op via hun native bulk-load commando's. In deze handleiding leer je concrete patronen voor elk van deze fasen met productieklare code.

Kafka + JSONL: Event Streaming

Apache Kafka is de ruggengraat van realtime ETL. Producers zenden events uit als JSONL-gecodeerde berichten (één JSON-object per Kafka-bericht), en consumers lezen ze regel voor regel in downstream systemen. Omdat elk Kafka-bericht een enkel JSONL-record is, sluit het formaat perfect aan op Kafka's log-gebaseerde architectuur. Serialisatie is triviaal, en consumers kunnen elk bericht onafhankelijk parseren zonder buffering.

Een Kafka-producer die Python-dictionaries serialiseert als JSONL-gecodeerde berichten. Elk bericht is een enkele JSON-regel die naar een Kafka-topic wordt gestuurd. De value_serializer handelt codering automatisch af, zodat je eenvoudig een dict aan de send()-methode kunt doorgeven.

Kafka JSONL Producer
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Lees een JSONL-bestand en publiceer elke regel als Kafka-bericht
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('Alle events gepubliceerd naar Kafka')

Een Kafka-consumer die JSONL-berichten leest en ze in batches naar een lokaal JSONL-bestand schrijft. Batching vermindert I/O-overhead en stelt downstream systemen in staat grotere blokken tegelijk te verwerken. De consumer committeert offsets pas nadat een batch volledig is geschreven, wat at-least-once delivery garandeert.

Kafka JSONL Consumer met gebatchte schrijfacties
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Schrijf batch naar JSONL-bestand
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Batch van {len(batch)} records gecommitteerd')
batch.clear()
# Flush overgebleven records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: JSONL-taken orkestreren

Apache Airflow is de standaard orkestratietool voor batch-ETL. Een typische JSONL-gebaseerde Airflow-pipeline heeft drie fasen: ruwe data extraheren naar JSONL, het JSONL-bestand transformeren (filteren, verrijken, hervormen), en het resultaat laden in een datawarehouse of objectopslag. Elke fase leest en schrijft JSONL-bestanden, wat tussentijdse status inspecteerbaar maakt en individuele taken onafhankelijk her-uitvoerbaar.

Een complete Airflow DAG die data extraheert van een API, transformeert als JSONL en het resultaat laadt naar cloudopslag. Elke taak communiceert via JSONL-bestanden op gedeelde opslag, wat de pipeline gemakkelijk te debuggen en opnieuw te proberen maakt.

Airflow DAG: JSONL ETL Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extraheer data van API en schrijf JSONL naar lokale schijf."""
ds = context['ds'] # uitvoeringsdatum: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Lees ruwe JSONL, filter en verrijk, schrijf opgeschoonde JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload opgeschoonde JSONL naar S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Voor grote JSONL-bestanden die niet in het geheugen passen, gebruik een streaming transformatie die één regel tegelijk verwerkt. Dit patroon houdt het geheugengebruik constant ongeacht de bestandsgrootte en werkt goed binnen Airflow-taken waar worker-resources beperkt kunnen zijn.

Streaming JSONL-transformatietaak
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transformeer een JSONL-bestand met een door de gebruiker gedefinieerde functie.
Retourneer None vanuit transform_fn om een record over te slaan."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Gebruik binnen een Airflow-taak
def enrich(record):
if record.get('amount', 0) < 0:
return None # sla ongeldig over
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Verwerkt: {stats}')

JSONL laden in datawarehouses

Moderne datawarehouses hebben eersteklas ondersteuning voor het opnemen van JSONL-bestanden. Snowflake, BigQuery en Redshift kunnen allemaal JSONL rechtstreeks laden vanuit cloudopslag, de JSON on-the-fly parseren en de resultaten in gestructureerde tabellen opslaan. Dit elimineert de noodzaak voor een tussentijdse CSV-conversiestap en behoudt geneste datatypes.

Snowflake

COPY INTO

Snowflake neemt JSONL op via het COPY INTO-commando vanuit S3-, GCS- of Azure Blob-stages. Gebruik de FILE_FORMAT-optie met TYPE = 'JSON' om elke regel als een apart JSON-document te parseren. Het VARIANT-kolomtype slaat semi-gestructureerde data native op.

BigQuery

Native

BigQuery laadt JSONL-bestanden vanuit Google Cloud Storage met automatische schemadetectie. Het bq load-commando met --source_format=NEWLINE_DELIMITED_JSON verwerkt JSONL native. BigQuery kan ook automatisch kolomtypes detecteren uit de JSON-waarden.

Redshift

COPY

Amazon Redshift ondersteunt JSONL-laden vanuit S3 met het COPY-commando met FORMAT AS JSON. Je kunt een JSONPaths-bestand aanleveren om JSON-sleutels aan tabelkolommen te koppelen, wat je fijnmazige controle geeft over het laadproces.

Laad een JSONL-bestand vanuit een S3-stage in een Snowflake-tabel. Elke regel wordt een rij met een VARIANT-kolom die de geparseerde JSON bevat. Je kunt vervolgens de VARIANT-kolom platmaken naar getypeerde kolommen met Snowflake SQL.

Snowflake: COPY JSONL vanuit S3
-- Maak een stage die naar je S3-bucket wijst
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Maak de doeltabel
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Laad JSONL-bestanden vanuit de stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query de geladen data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Gebruik de BigQuery Python-client om een JSONL-bestand vanuit Google Cloud Storage in een BigQuery-tabel te laden. De client verzorgt schemadetectie, partitionering en foutrapportage. Deze aanpak integreert goed met Airflow-taken.

BigQuery: JSONL laden met Python Client
guide-jsonl-etl-pipeline.jsonlEtlPipeline.warehouses.bigqueryCode.code

Fouttolerantie en idempotentie

ETL-pipelines falen. Netwerken vallen uit, API's lopen vast en workers raken door hun geheugen. JSONL maakt herstel eenvoudig omdat elke regel onafhankelijk is. Je kunt je positie bijhouden (het regelnummer), hervatten vanaf de laatste succesvolle regel en herverwerking van data vermijden. Gecombineerd met idempotente schrijfacties geeft dit je exactly-once semantiek zelfs met at-least-once delivery.

Een checkpoint-gebaseerde processor die voortgang bijhoudt op regelnummer. Als de pipeline faalt, hervat deze vanaf het laatste gecommitteerde checkpoint in plaats van het hele bestand opnieuw te verwerken. Het checkpointbestand is een eenvoudig tekstbestand dat het laatst succesvol verwerkte regelnummer bevat.

Checkpoint-gebaseerde JSONL-verwerking
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Lees het laatst gecommitteerde regelnummer."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Sla het huidige checkpoint atomair op."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomair op POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Verwerk een JSONL-bestand met checkpoint-gebaseerde hervatting."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # sla reeds verwerkte regels over
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- je transformatielogica ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint bij regel {line_num}')
# Laatste checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Klaar. {processed} records verwerkt vanaf regel {start_line + 1}')
# Gebruik
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

Het checkpointpatroon werkt met elke JSONL-pipelinefase. Voor Kafka-consumers vervang je het bestandscheckpoint door gecommitteerde offsets. Voor Airflow sla je het checkpoint op in XCom of een externe database zodat herhaalde taken hervatten vanaf de juiste positie. Het belangrijkste inzicht is dat JSONL's regelgebaseerd formaat positiebijhouding triviaal maakt: een enkel geheel getal (het regelnummer of byte-offset) is alles wat je nodig hebt om te hervatten.

Valideer je JSONL voor het laden

Vang misvormde records op voordat ze je pipeline bereiken. Gebruik onze gratis online tools om JSONL-bestanden te valideren, bekijken en converteren in je browser.

Inspecteer JSONL-bestanden direct

Bekijk, valideer en converteer JSONL-bestanden tot 1GB direct in je browser. Geen uploads nodig, 100% privé.

Veelgestelde vragen

JSONL ETL-pipeline — Kafka, Airflow, BigQuery, Snowflake ...