JSONL in ETL-Pipelines: Kafka, Airflow & Data Warehouses
Eine praxisorientierte Anleitung zur Verwendung von JSONL (JSON Lines) als Austauschformat in modernen ETL-Pipelines. Lernen Sie, wie Sie JSONL-Events in Kafka produzieren und konsumieren, JSONL-Transformationen in Airflow-DAGs orchestrieren und JSONL-Daten in Snowflake, BigQuery und Redshift laden.
Letzte Aktualisierung: Februar 2026
Warum JSONL das ideale ETL-Austauschformat ist
Extract-Transform-Load (ETL)-Pipelines bewegen Daten zwischen Systemen, die selten dasselbe Schema teilen. JSONL ist einzigartig für diese Aufgabe geeignet, da jede Zeile ein eigenständiges JSON-Dokument ist. Es gibt keine Header, die verloren gehen können, keine Trennzeichen, die escaped werden müssen, und keine mehrzeiligen Datensätze, die nach einem teilweisen Fehler wieder zusammengesetzt werden müssen. Jede Zeile kann unabhängig validiert, transformiert und weitergeleitet werden, was genau das ist, was verteilte Systeme wie Kafka und Airflow benötigen.
Im Gegensatz zu CSV bewahrt JSONL verschachtelte Strukturen, Arrays und typisierte Werte ohne Mehrdeutigkeit. Im Gegensatz zu Parquet oder Avro ist JSONL menschenlesbar und erfordert keine speziellen Tools zur Inspektion. Diese Eigenschaften machen JSONL zum natürlichen Format für die Verbindung zwischen Pipeline-Stufen: Kafka-Topics transportieren JSONL-Nachrichten, Airflow-Tasks lesen und schreiben JSONL-Dateien in den Objektspeicher, und Data Warehouses nehmen JSONL über ihre nativen Bulk-Load-Befehle auf. In dieser Anleitung lernen Sie konkrete Muster für jede dieser Stufen mit produktionsreifen Codebeispielen.
Kafka + JSONL: Event-Streaming
Apache Kafka ist das Rückgrat von Echtzeit-ETL. Produzenten senden Events als JSONL-kodierte Nachrichten (ein JSON-Objekt pro Kafka-Nachricht), und Konsumenten lesen sie zeilenweise in nachgelagerte Systeme ein. Da jede Kafka-Nachricht einem einzelnen JSONL-Datensatz entspricht, passt das Format perfekt zu Kafkas log-basierter Architektur. Die Serialisierung ist trivial, und Konsumenten können jede Nachricht unabhängig parsen, ohne puffern zu müssen.
Ein Kafka-Producer, der Python-Dictionaries als JSONL-kodierte Nachrichten serialisiert. Jede Nachricht ist eine einzelne JSON-Zeile, die an ein Kafka-Topic gesendet wird. Der value_serializer übernimmt die Kodierung automatisch, sodass Sie einfach ein Dict an die send()-Methode übergeben.
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Eine JSONL-Datei lesen und jede Zeile als Kafka-Nachricht veröffentlichenwith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
Ein Kafka-Consumer, der JSONL-Nachrichten liest und sie in Batches in eine lokale JSONL-Datei schreibt. Batching reduziert den I/O-Overhead und ermöglicht es nachgelagerten Systemen, größere Blöcke auf einmal zu verarbeiten. Der Consumer committet Offsets erst, nachdem ein Batch vollständig geschrieben wurde, was At-Least-Once-Delivery gewährleistet.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Batch in JSONL-Datei schreibenwith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Verbleibende Datensätze flushenif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: JSONL-Tasks orchestrieren
Apache Airflow ist das Standard-Orchestrierungstool für Batch-ETL. Eine typische JSONL-basierte Airflow-Pipeline hat drei Stufen: Rohdaten in JSONL extrahieren, die JSONL-Datei transformieren (filtern, anreichern, umstrukturieren) und das Ergebnis in ein Data Warehouse oder einen Objektspeicher laden. Jede Stufe liest und schreibt JSONL-Dateien, was den Zwischenzustand inspizierbar und einzelne Tasks unabhängig wiederholbar macht.
Ein vollständiger Airflow-DAG, der Daten von einer API extrahiert, als JSONL transformiert und das Ergebnis in den Cloud-Speicher lädt. Jeder Task kommuniziert über JSONL-Dateien im gemeinsamen Speicher, was die Pipeline einfach zu debuggen und zu wiederholen macht.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Daten von der API extrahieren und als JSONL auf die lokale Festplatte schreiben."""ds = context['ds'] # Ausführungsdatum: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Roh-JSONL lesen, filtern und anreichern, bereinigtes JSONL schreiben."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Bereinigtes JSONL nach S3 hochladen."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Für große JSONL-Dateien, die nicht in den Speicher passen, verwenden Sie eine Streaming-Transformation, die eine Zeile nach der anderen verarbeitet. Dieses Muster hält den Speicherverbrauch unabhängig von der Dateigröße konstant und funktioniert gut in Airflow-Tasks, bei denen Worker-Ressourcen begrenzt sein können.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""JSONL-Datei mit benutzerdefinierter Funktion stream-transformieren.Geben Sie None von transform_fn zurück, um einen Datensatz zu überspringen."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Verwendung innerhalb eines Airflow-Tasksdef enrich(record):if record.get('amount', 0) < 0:return None # Ungültige überspringenrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
JSONL in Data Warehouses laden
Moderne Data Warehouses bieten erstklassige Unterstützung für die Aufnahme von JSONL-Dateien. Snowflake, BigQuery und Redshift können JSONL direkt aus dem Cloud-Speicher laden, das JSON im Handumdrehen parsen und die Ergebnisse in strukturierten Tabellen speichern. Dies eliminiert die Notwendigkeit eines zwischengeschalteten CSV-Konvertierungsschritts und bewahrt verschachtelte Datentypen.
Snowflake
COPY INTOSnowflake nimmt JSONL über den COPY INTO-Befehl aus S3, GCS oder Azure Blob Stages auf. Verwenden Sie die FILE_FORMAT-Option mit TYPE = 'JSON', um jede Zeile als separates JSON-Dokument zu parsen. Der VARIANT-Spaltentyp speichert semi-strukturierte Daten nativ.
BigQuery
NativBigQuery lädt JSONL-Dateien aus Google Cloud Storage mit automatischer Schema-Erkennung. Der bq load-Befehl mit --source_format=NEWLINE_DELIMITED_JSON verarbeitet JSONL nativ. BigQuery kann auch Spaltentypen automatisch aus den JSON-Werten erkennen.
Redshift
COPYAmazon Redshift unterstützt das Laden von JSONL aus S3 mit dem COPY-Befehl mit FORMAT AS JSON. Sie können eine JSONPaths-Datei bereitstellen, um JSON-Schlüssel auf Tabellenspalten abzubilden, was Ihnen feinkörnige Kontrolle über den Ladeprozess gibt.
Laden Sie eine JSONL-Datei aus einer S3-Stage in eine Snowflake-Tabelle. Jede Zeile wird zu einer Zeile mit einer VARIANT-Spalte, die das geparste JSON enthält. Anschließend können Sie die VARIANT-Spalte mit Snowflake SQL in typisierte Spalten auffalten.
-- Eine Stage erstellen, die auf Ihren S3-Bucket zeigtCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Zieltabelle erstellenCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- JSONL-Dateien aus der Stage ladenCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Geladene Daten abfragenSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Verwenden Sie den BigQuery-Python-Client, um eine JSONL-Datei aus Google Cloud Storage in eine BigQuery-Tabelle zu laden. Der Client übernimmt Schema-Erkennung, Partitionierung und Fehlerberichterstattung. Dieser Ansatz lässt sich gut mit Airflow-Tasks integrieren.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Aus GCS ladenuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Auf Abschluss wartentable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
Fehlertoleranz und Idempotenz
ETL-Pipelines scheitern. Netzwerke fallen aus, APIs haben Timeouts und Worker haben nicht genug Speicher. JSONL macht die Wiederherstellung unkompliziert, da jede Zeile unabhängig ist. Sie können Ihre Position checkpointen (die Zeilennummer), ab der letzten erfolgreichen Zeile fortsetzen und eine erneute Verarbeitung von Daten vermeiden. In Kombination mit idempotenten Schreibvorgängen erhalten Sie Exactly-Once-Semantik auch bei At-Least-Once-Delivery.
Ein checkpoint-basierter Prozessor, der den Fortschritt nach Zeilennummer verfolgt. Wenn die Pipeline fehlschlägt, setzt sie beim letzten committeten Checkpoint fort, anstatt die gesamte Datei erneut zu verarbeiten. Die Checkpoint-Datei ist eine einfache Textdatei, die die zuletzt erfolgreich verarbeitete Zeilennummer enthält.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Letzte committete Zeilennummer lesen."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Aktuellen Checkpoint atomar speichern."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomar auf POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""JSONL-Datei mit checkpoint-basierter Fortsetzung verarbeiten."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # Bereits verarbeitete Zeilen überspringenline = line.strip()if not line:continuerecord = json.loads(line)# --- Ihre Transformationslogik ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Finaler Checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Verwendungprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
Das Checkpoint-Muster funktioniert mit jeder JSONL-Pipeline-Stufe. Für Kafka-Consumer ersetzen Sie den Datei-Checkpoint durch committete Offsets. Für Airflow speichern Sie den Checkpoint in XCom oder einer externen Datenbank, damit wiederholte Tasks an der richtigen Position fortsetzen. Die zentrale Erkenntnis ist, dass JSONL's zeilenbasiertes Format die Positionsverfolgung trivial macht: Eine einzelne Ganzzahl (die Zeilennummer oder der Byte-Offset) ist alles, was Sie zum Fortsetzen benötigen.
Validieren Sie Ihr JSONL vor dem Laden
Erkennen Sie fehlerhafte Datensätze, bevor sie Ihre Pipeline erreichen. Verwenden Sie unsere kostenlosen Online-Tools, um JSONL-Dateien in Ihrem Browser zu validieren, anzuzeigen und zu konvertieren.