JSONL nelle Pipeline ETL: Kafka, Airflow e Data Warehouse

Una guida pratica all'uso di JSONL (JSON Lines) come formato di interscambio nelle moderne pipeline ETL. Impara a produrre e consumare eventi JSONL in Kafka, orchestrare trasformazioni JSONL nei DAG di Airflow e caricare dati JSONL in Snowflake, BigQuery e Redshift.

Ultimo aggiornamento: Febbraio 2026

Perché JSONL è il Formato di Interscambio Ideale per ETL

Le pipeline Extract-Transform-Load (ETL) spostano dati tra sistemi che raramente condividono lo stesso schema. JSONL è particolarmente adatto a questo compito perché ogni riga è un documento JSON autonomo. Non ci sono intestazioni da perdere, né delimitatori da escapare, né record multilinea da riassemblare dopo un fallimento parziale. Ogni riga può essere validata, trasformata e instradata indipendentemente, che è esattamente ciò di cui i sistemi distribuiti come Kafka e Airflow hanno bisogno.

A differenza di CSV, JSONL preserva strutture nidificate, array e valori tipizzati senza ambiguità. A differenza di Parquet o Avro, JSONL è leggibile dall'uomo e non richiede strumenti speciali per l'ispezione. Queste proprietà rendono JSONL il formato naturale per il collegamento tra le fasi della pipeline: i topic Kafka trasportano messaggi JSONL, i task Airflow leggono e scrivono file JSONL su object storage, e i data warehouse ingeriscono JSONL attraverso i loro comandi nativi di caricamento in blocco. In questa guida imparerai pattern concreti per ciascuna di queste fasi con codice pronto per la produzione.

Kafka + JSONL: Streaming di Eventi

Apache Kafka è la spina dorsale dell'ETL in tempo reale. I producer emettono eventi come messaggi codificati in JSONL (un oggetto JSON per messaggio Kafka), e i consumer li leggono riga per riga nei sistemi a valle. Poiché ogni messaggio Kafka è un singolo record JSONL, il formato si allinea perfettamente con l'architettura basata su log di Kafka. La serializzazione è banale, e i consumer possono analizzare ogni messaggio indipendentemente senza buffering.

Un producer Kafka che serializza dizionari Python come messaggi codificati in JSONL. Ogni messaggio è una singola riga JSON inviata a un topic Kafka. Il value_serializer gestisce la codifica automaticamente, quindi basta passare un dict al metodo send().

Producer Kafka JSONL
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

Un consumer Kafka che legge messaggi JSONL e li scrive in un file JSONL locale in batch. Il batching riduce l'overhead di I/O e permette ai sistemi a valle di elaborare blocchi più grandi alla volta. Il consumer committa gli offset solo dopo che un batch è stato completamente scritto, garantendo una consegna at-least-once.

Consumer Kafka JSONL con Scritture Batch
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: Orchestrazione dei Task JSONL

Apache Airflow è lo strumento di orchestrazione standard per l'ETL batch. Una tipica pipeline Airflow basata su JSONL ha tre fasi: estrarre dati grezzi in JSONL, trasformare il file JSONL (filtrare, arricchire, ristrutturare) e caricare il risultato in un data warehouse o object store. Ogni fase legge e scrive file JSONL, il che rende lo stato intermedio ispezionabile e i singoli task ri-eseguibili indipendentemente.

Un DAG Airflow completo che estrae dati da un'API, li trasforma come JSONL e carica il risultato nel cloud storage. Ogni task comunica attraverso file JSONL archiviati su storage condiviso, rendendo la pipeline facile da debuggare e riprovare.

DAG Airflow: Pipeline ETL JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Per file JSONL di grandi dimensioni che non stanno in memoria, usa una trasformazione in streaming che elabora una riga alla volta. Questo pattern mantiene l'uso della memoria costante indipendentemente dalla dimensione del file e funziona bene all'interno dei task Airflow dove le risorse dei worker possono essere limitate.

Task di Trasformazione JSONL in Streaming
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

Caricamento JSONL nei Data Warehouse

I moderni data warehouse hanno un supporto di prima classe per l'ingestione di file JSONL. Snowflake, BigQuery e Redshift possono tutti caricare JSONL direttamente dal cloud storage, analizzare il JSON al volo e archiviare i risultati in tabelle strutturate. Questo elimina la necessità di una fase di conversione CSV intermedia e preserva i tipi di dati nidificati.

Snowflake

COPY INTO

Snowflake ingerisce JSONL tramite il comando COPY INTO da stage S3, GCS o Azure Blob. Usa l'opzione FILE_FORMAT con TYPE = 'JSON' per analizzare ogni riga come un documento JSON separato. Il tipo di colonna VARIANT memorizza nativamente i dati semi-strutturati.

BigQuery

Nativo

BigQuery carica file JSONL da Google Cloud Storage con rilevamento automatico dello schema. Il comando bq load con --source_format=NEWLINE_DELIMITED_JSON gestisce JSONL nativamente. BigQuery può anche rilevare automaticamente i tipi delle colonne dai valori JSON.

Redshift

COPY

Amazon Redshift supporta il caricamento JSONL da S3 usando il comando COPY con FORMAT AS JSON. Puoi fornire un file JSONPaths per mappare le chiavi JSON alle colonne della tabella, dandoti un controllo fine sul processo di caricamento.

Carica un file JSONL da uno stage S3 in una tabella Snowflake. Ogni riga diventa una riga con una colonna VARIANT contenente il JSON analizzato. Puoi poi appiattire la colonna VARIANT in colonne tipizzate usando SQL di Snowflake.

Snowflake: COPY JSONL da S3
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Usa il client Python di BigQuery per caricare un file JSONL da Google Cloud Storage in una tabella BigQuery. Il client gestisce il rilevamento dello schema, il partizionamento e la segnalazione degli errori. Questo approccio si integra bene con i task Airflow.

BigQuery: Carica JSONL con il Client Python
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

Tolleranza ai Guasti e Idempotenza

Le pipeline ETL falliscono. Le reti cadono, le API vanno in timeout e i worker esauriscono la memoria. JSONL rende il recovery semplice perché ogni riga è indipendente. Puoi salvare la tua posizione (il numero di riga), riprendere dall'ultima riga elaborata con successo e evitare di rielaborare i dati. Combinato con scritture idempotenti, questo ti dà semantiche exactly-once anche con consegna at-least-once.

Un processore basato su checkpoint che traccia il progresso tramite numero di riga. Se la pipeline fallisce, riprende dall'ultimo checkpoint committato piuttosto che rielaborare l'intero file. Il file di checkpoint è un semplice file di testo contenente l'ultimo numero di riga elaborato con successo.

Elaborazione JSONL Basata su Checkpoint
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

Il pattern dei checkpoint funziona con qualsiasi fase della pipeline JSONL. Per i consumer Kafka, sostituisci il checkpoint su file con gli offset committati. Per Airflow, archivia il checkpoint in XCom o in un database esterno così che i task ritentati riprendano dalla posizione corretta. L'intuizione chiave è che il formato basato su righe di JSONL rende banale il tracciamento della posizione: un singolo intero (il numero di riga o l'offset in byte) è tutto ciò che serve per riprendere.

Valida il Tuo JSONL Prima del Caricamento

Intercetta i record malformati prima che entrino nella tua pipeline. Usa i nostri strumenti online gratuiti per validare, visualizzare e convertire file JSONL nel tuo browser.

Ispeziona File JSONL Istantaneamente

Visualizza, valida e converti file JSONL fino a 1GB direttamente nel tuo browser. Nessun upload necessario, 100% privato.

Domande Frequenti

Pipeline ETL JSONL — Kafka, Airflow, BigQuery, Snowflake ...