JSONL dans les pipelines ETL : Kafka, Airflow et entrepots de donnees
Un guide pratique pour utiliser JSONL (JSON Lines) comme format d'echange dans les pipelines ETL modernes. Apprenez a produire et consommer des evenements JSONL dans Kafka, orchestrer des transformations JSONL dans les DAG Airflow, et charger des donnees JSONL dans Snowflake, BigQuery et Redshift.
Derniere mise a jour : fevrier 2026
Pourquoi JSONL est le format d'echange ETL ideal
Les pipelines Extract-Transform-Load (ETL) deplacent des donnees entre des systemes qui partagent rarement le meme schema. JSONL est particulierement adapte a cette tache car chaque ligne est un document JSON autonome. Il n'y a pas d'en-tetes a perdre, pas de delimiteurs a echapper, et pas d'enregistrements multi-lignes a reassembler apres une defaillance partielle. Chaque ligne peut etre validee, transformee et routee independamment, ce qui est exactement ce dont les systemes distribues comme Kafka et Airflow ont besoin.
Contrairement au CSV, JSONL preserve les structures imbriquees, les tableaux et les valeurs typees sans ambiguite. Contrairement a Parquet ou Avro, JSONL est lisible par l'homme et ne necessite aucun outil special pour l'inspection. Ces proprietes font du JSONL le format naturel pour la colle entre les etapes du pipeline : les topics Kafka transportent des messages JSONL, les taches Airflow lisent et ecrivent des fichiers JSONL dans le stockage objet, et les entrepots de donnees ingerent du JSONL via leurs commandes de chargement en masse natives. Dans ce guide, vous apprendrez des modeles concrets pour chacune de ces etapes avec du code pret pour la production.
Kafka + JSONL : streaming d'evenements
Apache Kafka est l'epine dorsale de l'ETL en temps reel. Les producteurs emettent des evenements sous forme de messages encodes en JSONL (un objet JSON par message Kafka), et les consommateurs les lisent ligne par ligne dans les systemes en aval. Comme chaque message Kafka est un seul enregistrement JSONL, le format s'aligne parfaitement avec l'architecture basee sur les journaux de Kafka. La serialisation est triviale, et les consommateurs peuvent analyser chaque message independamment sans mise en tampon.
Un producteur Kafka qui serialise des dictionnaires Python en messages encodes en JSONL. Chaque message est une seule ligne JSON envoyee a un topic Kafka. Le value_serializer gere l'encodage automatiquement, donc vous passez simplement un dict a la methode send().
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Lire un fichier JSONL et publier chaque ligne comme message Kafkawith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
Un consommateur Kafka qui lit les messages JSONL et les ecrit dans un fichier JSONL local par lots. Le regroupement par lots reduit la surcharge d'E/S et permet aux systemes en aval de traiter des morceaux plus grands a la fois. Le consommateur ne valide les offsets qu'apres qu'un lot ait ete entierement ecrit, assurant une livraison au-moins-une-fois.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Ecrire le lot dans un fichier JSONLwith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Vider les enregistrements restantsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow : orchestration des taches JSONL
Apache Airflow est l'outil d'orchestration standard pour l'ETL par lots. Un pipeline Airflow typique base sur JSONL comporte trois etapes : extraire les donnees brutes en JSONL, transformer le fichier JSONL (filtrer, enrichir, reformer), et charger le resultat dans un entrepot de donnees ou un stockage objet. Chaque etape lit et ecrit des fichiers JSONL, ce qui rend l'etat intermediaire inspecable et les taches individuelles re-executables independamment.
Un DAG Airflow complet qui extrait des donnees d'une API, les transforme en JSONL et charge le resultat dans le stockage cloud. Chaque tache communique via des fichiers JSONL stockes sur un stockage partage, rendant le pipeline facile a deboguer et a relancer.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extraire les donnees de l'API et ecrire du JSONL sur le disque local."""ds = context['ds'] # date d'execution : AAAA-MM-JJresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Lire le JSONL brut, filtrer et enrichir, ecrire le JSONL nettoye."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Telecharger le JSONL nettoye vers S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Pour les gros fichiers JSONL qui ne tiennent pas en memoire, utilisez une transformation en streaming qui traite une ligne a la fois. Ce modele maintient une utilisation memoire constante quelle que soit la taille du fichier et fonctionne bien dans les taches Airflow ou les ressources des workers peuvent etre limitees.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Transformer en streaming un fichier JSONL avec une fonction definie par l'utilisateur.Retournez None depuis transform_fn pour ignorer un enregistrement."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Utilisation dans une tache Airflowdef enrich(record):if record.get('amount', 0) < 0:return None # ignorer les invalidesrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
Charger du JSONL dans les entrepots de donnees
Les entrepots de donnees modernes disposent d'un support natif pour l'ingestion de fichiers JSONL. Snowflake, BigQuery et Redshift peuvent tous charger du JSONL directement depuis le stockage cloud, analyser le JSON a la volee et stocker les resultats dans des tables structurees. Cela elimine le besoin d'une etape de conversion CSV intermediaire et preserve les types de donnees imbriques.
Snowflake
COPY INTOSnowflake ingere du JSONL via la commande COPY INTO depuis des stages S3, GCS ou Azure Blob. Utilisez l'option FILE_FORMAT avec TYPE = 'JSON' pour analyser chaque ligne comme un document JSON separe. Le type de colonne VARIANT stocke les donnees semi-structurees nativement.
BigQuery
NatifBigQuery charge les fichiers JSONL depuis Google Cloud Storage avec detection automatique du schema. La commande bq load avec --source_format=NEWLINE_DELIMITED_JSON gere le JSONL nativement. BigQuery peut aussi detecter automatiquement les types de colonnes a partir des valeurs JSON.
Redshift
COPYAmazon Redshift supporte le chargement JSONL depuis S3 en utilisant la commande COPY avec FORMAT AS JSON. Vous pouvez fournir un fichier JSONPaths pour mapper les cles JSON aux colonnes de table, vous donnant un controle fin sur le processus de chargement.
Chargez un fichier JSONL depuis un stage S3 dans une table Snowflake. Chaque ligne devient une ligne avec une colonne VARIANT contenant le JSON analyse. Vous pouvez ensuite aplatir la colonne VARIANT en colonnes typees en utilisant le SQL Snowflake.
-- Creer un stage pointant vers votre bucket S3CREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Creer la table cibleCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Charger les fichiers JSONL depuis le stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Interroger les donnees chargeesSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Utilisez le client Python BigQuery pour charger un fichier JSONL depuis Google Cloud Storage dans une table BigQuery. Le client gere la detection de schema, le partitionnement et le rapport d'erreurs. Cette approche s'integre bien avec les taches Airflow.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Charger depuis GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Attendre la fintable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
Tolerance aux pannes et idempotence
Les pipelines ETL echouent. Les reseaux coupent, les API expirent et les workers manquent de memoire. JSONL rend la reprise simple car chaque ligne est independante. Vous pouvez enregistrer votre position (le numero de ligne), reprendre depuis la derniere ligne reussie et eviter de retraiter les donnees. Combine avec des ecritures idempotentes, cela vous donne une semantique exactement-une-fois meme avec une livraison au-moins-une-fois.
Un processeur base sur les points de controle qui suit la progression par numero de ligne. Si le pipeline echoue, il reprend depuis le dernier point de controle valide plutot que de retraiter le fichier entier. Le fichier de point de controle est un simple fichier texte contenant le dernier numero de ligne traite avec succes.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Lire le dernier numero de ligne valide."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Sauvegarder atomiquement le point de controle actuel."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomique sur POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Traiter un fichier JSONL avec reprise basee sur les points de controle."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # ignorer les lignes deja traiteesline = line.strip()if not line:continuerecord = json.loads(line)# --- votre logique de transformation ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Point de controle finalsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Utilisationprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
Le modele de point de controle fonctionne avec n'importe quelle etape de pipeline JSONL. Pour les consommateurs Kafka, remplacez le point de controle fichier par des offsets valides. Pour Airflow, stockez le point de controle dans XCom ou une base de donnees externe afin que les taches relancees reprennent depuis la bonne position. L'idee cle est que le format ligne par ligne de JSONL rend le suivi de position trivial : un seul entier (le numero de ligne ou le decalage en octets) est tout ce dont vous avez besoin pour reprendre.
Validez votre JSONL avant le chargement
Detectez les enregistrements malformes avant qu'ils n'atteignent votre pipeline. Utilisez nos outils en ligne gratuits pour valider, visualiser et convertir des fichiers JSONL dans votre navigateur.