JSONL in ETL-Pipelines: Kafka, Airflow & Data Warehouses

Eine praxisorientierte Anleitung zur Verwendung von JSONL (JSON Lines) als Austauschformat in modernen ETL-Pipelines. Lernen Sie, wie Sie JSONL-Events in Kafka produzieren und konsumieren, JSONL-Transformationen in Airflow-DAGs orchestrieren und JSONL-Daten in Snowflake, BigQuery und Redshift laden.

Letzte Aktualisierung: Februar 2026

Warum JSONL das ideale ETL-Austauschformat ist

Extract-Transform-Load (ETL)-Pipelines bewegen Daten zwischen Systemen, die selten dasselbe Schema teilen. JSONL ist einzigartig für diese Aufgabe geeignet, da jede Zeile ein eigenständiges JSON-Dokument ist. Es gibt keine Header, die verloren gehen können, keine Trennzeichen, die escaped werden müssen, und keine mehrzeiligen Datensätze, die nach einem teilweisen Fehler wieder zusammengesetzt werden müssen. Jede Zeile kann unabhängig validiert, transformiert und weitergeleitet werden, was genau das ist, was verteilte Systeme wie Kafka und Airflow benötigen.

Im Gegensatz zu CSV bewahrt JSONL verschachtelte Strukturen, Arrays und typisierte Werte ohne Mehrdeutigkeit. Im Gegensatz zu Parquet oder Avro ist JSONL menschenlesbar und erfordert keine speziellen Tools zur Inspektion. Diese Eigenschaften machen JSONL zum natürlichen Format für die Verbindung zwischen Pipeline-Stufen: Kafka-Topics transportieren JSONL-Nachrichten, Airflow-Tasks lesen und schreiben JSONL-Dateien in den Objektspeicher, und Data Warehouses nehmen JSONL über ihre nativen Bulk-Load-Befehle auf. In dieser Anleitung lernen Sie konkrete Muster für jede dieser Stufen mit produktionsreifen Codebeispielen.

Kafka + JSONL: Event-Streaming

Apache Kafka ist das Rückgrat von Echtzeit-ETL. Produzenten senden Events als JSONL-kodierte Nachrichten (ein JSON-Objekt pro Kafka-Nachricht), und Konsumenten lesen sie zeilenweise in nachgelagerte Systeme ein. Da jede Kafka-Nachricht einem einzelnen JSONL-Datensatz entspricht, passt das Format perfekt zu Kafkas log-basierter Architektur. Die Serialisierung ist trivial, und Konsumenten können jede Nachricht unabhängig parsen, ohne puffern zu müssen.

Ein Kafka-Producer, der Python-Dictionaries als JSONL-kodierte Nachrichten serialisiert. Jede Nachricht ist eine einzelne JSON-Zeile, die an ein Kafka-Topic gesendet wird. Der value_serializer übernimmt die Kodierung automatisch, sodass Sie einfach ein Dict an die send()-Methode übergeben.

Kafka-JSONL-Producer
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Eine JSONL-Datei lesen und jede Zeile als Kafka-Nachricht veröffentlichen
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

Ein Kafka-Consumer, der JSONL-Nachrichten liest und sie in Batches in eine lokale JSONL-Datei schreibt. Batching reduziert den I/O-Overhead und ermöglicht es nachgelagerten Systemen, größere Blöcke auf einmal zu verarbeiten. Der Consumer committet Offsets erst, nachdem ein Batch vollständig geschrieben wurde, was At-Least-Once-Delivery gewährleistet.

Kafka-JSONL-Consumer mit gebatchten Schreibvorgängen
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Batch in JSONL-Datei schreiben
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Verbleibende Datensätze flushen
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: JSONL-Tasks orchestrieren

Apache Airflow ist das Standard-Orchestrierungstool für Batch-ETL. Eine typische JSONL-basierte Airflow-Pipeline hat drei Stufen: Rohdaten in JSONL extrahieren, die JSONL-Datei transformieren (filtern, anreichern, umstrukturieren) und das Ergebnis in ein Data Warehouse oder einen Objektspeicher laden. Jede Stufe liest und schreibt JSONL-Dateien, was den Zwischenzustand inspizierbar und einzelne Tasks unabhängig wiederholbar macht.

Ein vollständiger Airflow-DAG, der Daten von einer API extrahiert, als JSONL transformiert und das Ergebnis in den Cloud-Speicher lädt. Jeder Task kommuniziert über JSONL-Dateien im gemeinsamen Speicher, was die Pipeline einfach zu debuggen und zu wiederholen macht.

Airflow-DAG: JSONL-ETL-Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Daten von der API extrahieren und als JSONL auf die lokale Festplatte schreiben."""
ds = context['ds'] # Ausführungsdatum: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Roh-JSONL lesen, filtern und anreichern, bereinigtes JSONL schreiben."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Bereinigtes JSONL nach S3 hochladen."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Für große JSONL-Dateien, die nicht in den Speicher passen, verwenden Sie eine Streaming-Transformation, die eine Zeile nach der anderen verarbeitet. Dieses Muster hält den Speicherverbrauch unabhängig von der Dateigröße konstant und funktioniert gut in Airflow-Tasks, bei denen Worker-Ressourcen begrenzt sein können.

Streaming-JSONL-Transform-Task
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""JSONL-Datei mit benutzerdefinierter Funktion stream-transformieren.
Geben Sie None von transform_fn zurück, um einen Datensatz zu überspringen."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Verwendung innerhalb eines Airflow-Tasks
def enrich(record):
if record.get('amount', 0) < 0:
return None # Ungültige überspringen
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

JSONL in Data Warehouses laden

Moderne Data Warehouses bieten erstklassige Unterstützung für die Aufnahme von JSONL-Dateien. Snowflake, BigQuery und Redshift können JSONL direkt aus dem Cloud-Speicher laden, das JSON im Handumdrehen parsen und die Ergebnisse in strukturierten Tabellen speichern. Dies eliminiert die Notwendigkeit eines zwischengeschalteten CSV-Konvertierungsschritts und bewahrt verschachtelte Datentypen.

Snowflake

COPY INTO

Snowflake nimmt JSONL über den COPY INTO-Befehl aus S3, GCS oder Azure Blob Stages auf. Verwenden Sie die FILE_FORMAT-Option mit TYPE = 'JSON', um jede Zeile als separates JSON-Dokument zu parsen. Der VARIANT-Spaltentyp speichert semi-strukturierte Daten nativ.

BigQuery

Nativ

BigQuery lädt JSONL-Dateien aus Google Cloud Storage mit automatischer Schema-Erkennung. Der bq load-Befehl mit --source_format=NEWLINE_DELIMITED_JSON verarbeitet JSONL nativ. BigQuery kann auch Spaltentypen automatisch aus den JSON-Werten erkennen.

Redshift

COPY

Amazon Redshift unterstützt das Laden von JSONL aus S3 mit dem COPY-Befehl mit FORMAT AS JSON. Sie können eine JSONPaths-Datei bereitstellen, um JSON-Schlüssel auf Tabellenspalten abzubilden, was Ihnen feinkörnige Kontrolle über den Ladeprozess gibt.

Laden Sie eine JSONL-Datei aus einer S3-Stage in eine Snowflake-Tabelle. Jede Zeile wird zu einer Zeile mit einer VARIANT-Spalte, die das geparste JSON enthält. Anschließend können Sie die VARIANT-Spalte mit Snowflake SQL in typisierte Spalten auffalten.

Snowflake: COPY JSONL aus S3
-- Eine Stage erstellen, die auf Ihren S3-Bucket zeigt
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Zieltabelle erstellen
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- JSONL-Dateien aus der Stage laden
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Geladene Daten abfragen
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Verwenden Sie den BigQuery-Python-Client, um eine JSONL-Datei aus Google Cloud Storage in eine BigQuery-Tabelle zu laden. Der Client übernimmt Schema-Erkennung, Partitionierung und Fehlerberichterstattung. Dieser Ansatz lässt sich gut mit Airflow-Tasks integrieren.

BigQuery: JSONL mit Python-Client laden
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Aus GCS laden
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Auf Abschluss warten
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

Fehlertoleranz und Idempotenz

ETL-Pipelines scheitern. Netzwerke fallen aus, APIs haben Timeouts und Worker haben nicht genug Speicher. JSONL macht die Wiederherstellung unkompliziert, da jede Zeile unabhängig ist. Sie können Ihre Position checkpointen (die Zeilennummer), ab der letzten erfolgreichen Zeile fortsetzen und eine erneute Verarbeitung von Daten vermeiden. In Kombination mit idempotenten Schreibvorgängen erhalten Sie Exactly-Once-Semantik auch bei At-Least-Once-Delivery.

Ein checkpoint-basierter Prozessor, der den Fortschritt nach Zeilennummer verfolgt. Wenn die Pipeline fehlschlägt, setzt sie beim letzten committeten Checkpoint fort, anstatt die gesamte Datei erneut zu verarbeiten. Die Checkpoint-Datei ist eine einfache Textdatei, die die zuletzt erfolgreich verarbeitete Zeilennummer enthält.

Checkpoint-basierte JSONL-Verarbeitung
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Letzte committete Zeilennummer lesen."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Aktuellen Checkpoint atomar speichern."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomar auf POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""JSONL-Datei mit checkpoint-basierter Fortsetzung verarbeiten."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # Bereits verarbeitete Zeilen überspringen
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- Ihre Transformationslogik ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Finaler Checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Verwendung
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

Das Checkpoint-Muster funktioniert mit jeder JSONL-Pipeline-Stufe. Für Kafka-Consumer ersetzen Sie den Datei-Checkpoint durch committete Offsets. Für Airflow speichern Sie den Checkpoint in XCom oder einer externen Datenbank, damit wiederholte Tasks an der richtigen Position fortsetzen. Die zentrale Erkenntnis ist, dass JSONL's zeilenbasiertes Format die Positionsverfolgung trivial macht: Eine einzelne Ganzzahl (die Zeilennummer oder der Byte-Offset) ist alles, was Sie zum Fortsetzen benötigen.

Validieren Sie Ihr JSONL vor dem Laden

Erkennen Sie fehlerhafte Datensätze, bevor sie Ihre Pipeline erreichen. Verwenden Sie unsere kostenlosen Online-Tools, um JSONL-Dateien in Ihrem Browser zu validieren, anzuzeigen und zu konvertieren.

JSONL-Dateien sofort inspizieren

JSONL-Dateien bis zu 1 GB direkt in Ihrem Browser anzeigen, validieren und konvertieren. Kein Upload erforderlich, 100 % privat.

Häufig gestellte Fragen

JSONL ETL-Pipeline — Kafka, Airflow, BigQuery, Snowflake ...