JSONL nelle Pipeline ETL: Kafka, Airflow e Data Warehouse
Una guida pratica all'uso di JSONL (JSON Lines) come formato di interscambio nelle moderne pipeline ETL. Impara a produrre e consumare eventi JSONL in Kafka, orchestrare trasformazioni JSONL nei DAG di Airflow e caricare dati JSONL in Snowflake, BigQuery e Redshift.
Ultimo aggiornamento: Febbraio 2026
Perché JSONL è il Formato di Interscambio Ideale per ETL
Le pipeline Extract-Transform-Load (ETL) spostano dati tra sistemi che raramente condividono lo stesso schema. JSONL è particolarmente adatto a questo compito perché ogni riga è un documento JSON autonomo. Non ci sono intestazioni da perdere, né delimitatori da escapare, né record multilinea da riassemblare dopo un fallimento parziale. Ogni riga può essere validata, trasformata e instradata indipendentemente, che è esattamente ciò di cui i sistemi distribuiti come Kafka e Airflow hanno bisogno.
A differenza di CSV, JSONL preserva strutture nidificate, array e valori tipizzati senza ambiguità. A differenza di Parquet o Avro, JSONL è leggibile dall'uomo e non richiede strumenti speciali per l'ispezione. Queste proprietà rendono JSONL il formato naturale per il collegamento tra le fasi della pipeline: i topic Kafka trasportano messaggi JSONL, i task Airflow leggono e scrivono file JSONL su object storage, e i data warehouse ingeriscono JSONL attraverso i loro comandi nativi di caricamento in blocco. In questa guida imparerai pattern concreti per ciascuna di queste fasi con codice pronto per la produzione.
Kafka + JSONL: Streaming di Eventi
Apache Kafka è la spina dorsale dell'ETL in tempo reale. I producer emettono eventi come messaggi codificati in JSONL (un oggetto JSON per messaggio Kafka), e i consumer li leggono riga per riga nei sistemi a valle. Poiché ogni messaggio Kafka è un singolo record JSONL, il formato si allinea perfettamente con l'architettura basata su log di Kafka. La serializzazione è banale, e i consumer possono analizzare ogni messaggio indipendentemente senza buffering.
Un producer Kafka che serializza dizionari Python come messaggi codificati in JSONL. Ogni messaggio è una singola riga JSON inviata a un topic Kafka. Il value_serializer gestisce la codifica automaticamente, quindi basta passare un dict al metodo send().
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
Un consumer Kafka che legge messaggi JSONL e li scrive in un file JSONL locale in batch. Il batching riduce l'overhead di I/O e permette ai sistemi a valle di elaborare blocchi più grandi alla volta. Il consumer committa gli offset solo dopo che un batch è stato completamente scritto, garantendo una consegna at-least-once.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: Orchestrazione dei Task JSONL
Apache Airflow è lo strumento di orchestrazione standard per l'ETL batch. Una tipica pipeline Airflow basata su JSONL ha tre fasi: estrarre dati grezzi in JSONL, trasformare il file JSONL (filtrare, arricchire, ristrutturare) e caricare il risultato in un data warehouse o object store. Ogni fase legge e scrive file JSONL, il che rende lo stato intermedio ispezionabile e i singoli task ri-eseguibili indipendentemente.
Un DAG Airflow completo che estrae dati da un'API, li trasforma come JSONL e carica il risultato nel cloud storage. Ogni task comunica attraverso file JSONL archiviati su storage condiviso, rendendo la pipeline facile da debuggare e riprovare.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Per file JSONL di grandi dimensioni che non stanno in memoria, usa una trasformazione in streaming che elabora una riga alla volta. Questo pattern mantiene l'uso della memoria costante indipendentemente dalla dimensione del file e funziona bene all'interno dei task Airflow dove le risorse dei worker possono essere limitate.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
Caricamento JSONL nei Data Warehouse
I moderni data warehouse hanno un supporto di prima classe per l'ingestione di file JSONL. Snowflake, BigQuery e Redshift possono tutti caricare JSONL direttamente dal cloud storage, analizzare il JSON al volo e archiviare i risultati in tabelle strutturate. Questo elimina la necessità di una fase di conversione CSV intermedia e preserva i tipi di dati nidificati.
Snowflake
COPY INTOSnowflake ingerisce JSONL tramite il comando COPY INTO da stage S3, GCS o Azure Blob. Usa l'opzione FILE_FORMAT con TYPE = 'JSON' per analizzare ogni riga come un documento JSON separato. Il tipo di colonna VARIANT memorizza nativamente i dati semi-strutturati.
BigQuery
NativoBigQuery carica file JSONL da Google Cloud Storage con rilevamento automatico dello schema. Il comando bq load con --source_format=NEWLINE_DELIMITED_JSON gestisce JSONL nativamente. BigQuery può anche rilevare automaticamente i tipi delle colonne dai valori JSON.
Redshift
COPYAmazon Redshift supporta il caricamento JSONL da S3 usando il comando COPY con FORMAT AS JSON. Puoi fornire un file JSONPaths per mappare le chiavi JSON alle colonne della tabella, dandoti un controllo fine sul processo di caricamento.
Carica un file JSONL da uno stage S3 in una tabella Snowflake. Ogni riga diventa una riga con una colonna VARIANT contenente il JSON analizzato. Puoi poi appiattire la colonna VARIANT in colonne tipizzate usando SQL di Snowflake.
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Usa il client Python di BigQuery per caricare un file JSONL da Google Cloud Storage in una tabella BigQuery. Il client gestisce il rilevamento dello schema, il partizionamento e la segnalazione degli errori. Questo approccio si integra bene con i task Airflow.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
Tolleranza ai Guasti e Idempotenza
Le pipeline ETL falliscono. Le reti cadono, le API vanno in timeout e i worker esauriscono la memoria. JSONL rende il recovery semplice perché ogni riga è indipendente. Puoi salvare la tua posizione (il numero di riga), riprendere dall'ultima riga elaborata con successo e evitare di rielaborare i dati. Combinato con scritture idempotenti, questo ti dà semantiche exactly-once anche con consegna at-least-once.
Un processore basato su checkpoint che traccia il progresso tramite numero di riga. Se la pipeline fallisce, riprende dall'ultimo checkpoint committato piuttosto che rielaborare l'intero file. Il file di checkpoint è un semplice file di testo contenente l'ultimo numero di riga elaborato con successo.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
Il pattern dei checkpoint funziona con qualsiasi fase della pipeline JSONL. Per i consumer Kafka, sostituisci il checkpoint su file con gli offset committati. Per Airflow, archivia il checkpoint in XCom o in un database esterno così che i task ritentati riprendano dalla posizione corretta. L'intuizione chiave è che il formato basato su righe di JSONL rende banale il tracciamento della posizione: un singolo intero (il numero di riga o l'offset in byte) è tutto ciò che serve per riprendere.
Valida il Tuo JSONL Prima del Caricamento
Intercetta i record malformati prima che entrino nella tua pipeline. Usa i nostri strumenti online gratuiti per validare, visualizzare e convertire file JSONL nel tuo browser.