JSONL dans les pipelines ETL : Kafka, Airflow et entrepots de donnees

Un guide pratique pour utiliser JSONL (JSON Lines) comme format d'echange dans les pipelines ETL modernes. Apprenez a produire et consommer des evenements JSONL dans Kafka, orchestrer des transformations JSONL dans les DAG Airflow, et charger des donnees JSONL dans Snowflake, BigQuery et Redshift.

Derniere mise a jour : fevrier 2026

Pourquoi JSONL est le format d'echange ETL ideal

Les pipelines Extract-Transform-Load (ETL) deplacent des donnees entre des systemes qui partagent rarement le meme schema. JSONL est particulierement adapte a cette tache car chaque ligne est un document JSON autonome. Il n'y a pas d'en-tetes a perdre, pas de delimiteurs a echapper, et pas d'enregistrements multi-lignes a reassembler apres une defaillance partielle. Chaque ligne peut etre validee, transformee et routee independamment, ce qui est exactement ce dont les systemes distribues comme Kafka et Airflow ont besoin.

Contrairement au CSV, JSONL preserve les structures imbriquees, les tableaux et les valeurs typees sans ambiguite. Contrairement a Parquet ou Avro, JSONL est lisible par l'homme et ne necessite aucun outil special pour l'inspection. Ces proprietes font du JSONL le format naturel pour la colle entre les etapes du pipeline : les topics Kafka transportent des messages JSONL, les taches Airflow lisent et ecrivent des fichiers JSONL dans le stockage objet, et les entrepots de donnees ingerent du JSONL via leurs commandes de chargement en masse natives. Dans ce guide, vous apprendrez des modeles concrets pour chacune de ces etapes avec du code pret pour la production.

Kafka + JSONL : streaming d'evenements

Apache Kafka est l'epine dorsale de l'ETL en temps reel. Les producteurs emettent des evenements sous forme de messages encodes en JSONL (un objet JSON par message Kafka), et les consommateurs les lisent ligne par ligne dans les systemes en aval. Comme chaque message Kafka est un seul enregistrement JSONL, le format s'aligne parfaitement avec l'architecture basee sur les journaux de Kafka. La serialisation est triviale, et les consommateurs peuvent analyser chaque message independamment sans mise en tampon.

Un producteur Kafka qui serialise des dictionnaires Python en messages encodes en JSONL. Chaque message est une seule ligne JSON envoyee a un topic Kafka. Le value_serializer gere l'encodage automatiquement, donc vous passez simplement un dict a la methode send().

Producteur Kafka JSONL
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Lire un fichier JSONL et publier chaque ligne comme message Kafka
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

Un consommateur Kafka qui lit les messages JSONL et les ecrit dans un fichier JSONL local par lots. Le regroupement par lots reduit la surcharge d'E/S et permet aux systemes en aval de traiter des morceaux plus grands a la fois. Le consommateur ne valide les offsets qu'apres qu'un lot ait ete entierement ecrit, assurant une livraison au-moins-une-fois.

Consommateur Kafka JSONL avec ecritures par lots
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Ecrire le lot dans un fichier JSONL
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Vider les enregistrements restants
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow : orchestration des taches JSONL

Apache Airflow est l'outil d'orchestration standard pour l'ETL par lots. Un pipeline Airflow typique base sur JSONL comporte trois etapes : extraire les donnees brutes en JSONL, transformer le fichier JSONL (filtrer, enrichir, reformer), et charger le resultat dans un entrepot de donnees ou un stockage objet. Chaque etape lit et ecrit des fichiers JSONL, ce qui rend l'etat intermediaire inspecable et les taches individuelles re-executables independamment.

Un DAG Airflow complet qui extrait des donnees d'une API, les transforme en JSONL et charge le resultat dans le stockage cloud. Chaque tache communique via des fichiers JSONL stockes sur un stockage partage, rendant le pipeline facile a deboguer et a relancer.

DAG Airflow : pipeline ETL JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extraire les donnees de l'API et ecrire du JSONL sur le disque local."""
ds = context['ds'] # date d'execution : AAAA-MM-JJ
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Lire le JSONL brut, filtrer et enrichir, ecrire le JSONL nettoye."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Telecharger le JSONL nettoye vers S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Pour les gros fichiers JSONL qui ne tiennent pas en memoire, utilisez une transformation en streaming qui traite une ligne a la fois. Ce modele maintient une utilisation memoire constante quelle que soit la taille du fichier et fonctionne bien dans les taches Airflow ou les ressources des workers peuvent etre limitees.

Tache de transformation JSONL en streaming
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Transformer en streaming un fichier JSONL avec une fonction definie par l'utilisateur.
Retournez None depuis transform_fn pour ignorer un enregistrement."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Utilisation dans une tache Airflow
def enrich(record):
if record.get('amount', 0) < 0:
return None # ignorer les invalides
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

Charger du JSONL dans les entrepots de donnees

Les entrepots de donnees modernes disposent d'un support natif pour l'ingestion de fichiers JSONL. Snowflake, BigQuery et Redshift peuvent tous charger du JSONL directement depuis le stockage cloud, analyser le JSON a la volee et stocker les resultats dans des tables structurees. Cela elimine le besoin d'une etape de conversion CSV intermediaire et preserve les types de donnees imbriques.

Snowflake

COPY INTO

Snowflake ingere du JSONL via la commande COPY INTO depuis des stages S3, GCS ou Azure Blob. Utilisez l'option FILE_FORMAT avec TYPE = 'JSON' pour analyser chaque ligne comme un document JSON separe. Le type de colonne VARIANT stocke les donnees semi-structurees nativement.

BigQuery

Natif

BigQuery charge les fichiers JSONL depuis Google Cloud Storage avec detection automatique du schema. La commande bq load avec --source_format=NEWLINE_DELIMITED_JSON gere le JSONL nativement. BigQuery peut aussi detecter automatiquement les types de colonnes a partir des valeurs JSON.

Redshift

COPY

Amazon Redshift supporte le chargement JSONL depuis S3 en utilisant la commande COPY avec FORMAT AS JSON. Vous pouvez fournir un fichier JSONPaths pour mapper les cles JSON aux colonnes de table, vous donnant un controle fin sur le processus de chargement.

Chargez un fichier JSONL depuis un stage S3 dans une table Snowflake. Chaque ligne devient une ligne avec une colonne VARIANT contenant le JSON analyse. Vous pouvez ensuite aplatir la colonne VARIANT en colonnes typees en utilisant le SQL Snowflake.

Snowflake : COPY JSONL depuis S3
-- Creer un stage pointant vers votre bucket S3
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Creer la table cible
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Charger les fichiers JSONL depuis le stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Interroger les donnees chargees
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Utilisez le client Python BigQuery pour charger un fichier JSONL depuis Google Cloud Storage dans une table BigQuery. Le client gere la detection de schema, le partitionnement et le rapport d'erreurs. Cette approche s'integre bien avec les taches Airflow.

BigQuery : charger du JSONL avec le client Python
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Charger depuis GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Attendre la fin
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

Tolerance aux pannes et idempotence

Les pipelines ETL echouent. Les reseaux coupent, les API expirent et les workers manquent de memoire. JSONL rend la reprise simple car chaque ligne est independante. Vous pouvez enregistrer votre position (le numero de ligne), reprendre depuis la derniere ligne reussie et eviter de retraiter les donnees. Combine avec des ecritures idempotentes, cela vous donne une semantique exactement-une-fois meme avec une livraison au-moins-une-fois.

Un processeur base sur les points de controle qui suit la progression par numero de ligne. Si le pipeline echoue, il reprend depuis le dernier point de controle valide plutot que de retraiter le fichier entier. Le fichier de point de controle est un simple fichier texte contenant le dernier numero de ligne traite avec succes.

Traitement JSONL base sur les points de controle
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Lire le dernier numero de ligne valide."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Sauvegarder atomiquement le point de controle actuel."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomique sur POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Traiter un fichier JSONL avec reprise basee sur les points de controle."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # ignorer les lignes deja traitees
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- votre logique de transformation ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Point de controle final
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Utilisation
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

Le modele de point de controle fonctionne avec n'importe quelle etape de pipeline JSONL. Pour les consommateurs Kafka, remplacez le point de controle fichier par des offsets valides. Pour Airflow, stockez le point de controle dans XCom ou une base de donnees externe afin que les taches relancees reprennent depuis la bonne position. L'idee cle est que le format ligne par ligne de JSONL rend le suivi de position trivial : un seul entier (le numero de ligne ou le decalage en octets) est tout ce dont vous avez besoin pour reprendre.

Validez votre JSONL avant le chargement

Detectez les enregistrements malformes avant qu'ils n'atteignent votre pipeline. Utilisez nos outils en ligne gratuits pour valider, visualiser et convertir des fichiers JSONL dans votre navigateur.

Inspectez des fichiers JSONL instantanement

Visualisez, validez et convertissez des fichiers JSONL jusqu'a 1 Go directement dans votre navigateur. Sans telechargement, 100% prive.

Questions frequemment posees

Pipeline ETL JSONL — Kafka, Airflow, BigQuery, Snowflake ...