JSONL 在 ETL 管線中的應用:Kafka、Airflow 與資料倉儲

使用 JSONL(JSON Lines)作為現代 ETL 管線交換格式的實作指南。學習如何在 Kafka 中產生和消費 JSONL 事件、在 Airflow DAG 中編排 JSONL 轉換,以及將 JSONL 資料載入 Snowflake、BigQuery 和 Redshift。

最後更新:2026 年 2 月

為什麼 JSONL 是理想的 ETL 交換格式

擷取-轉換-載入(ETL)管線在很少共用相同 schema 的系統之間移動資料。JSONL 特別適合這項任務,因為每一行都是一個獨立的 JSON 文件。不會遺失標頭、不需要跳脫分隔符號,也不會在部分失敗後需要重新組合多行記錄。每行都可以獨立驗證、轉換和路由,這正是 Kafka 和 Airflow 等分散式系統所需要的。

與 CSV 不同,JSONL 可以毫無歧義地保留巢狀結構、陣列和型別化的值。與 Parquet 或 Avro 不同,JSONL 是人類可讀的,不需要特殊工具來檢查。這些特性使 JSONL 成為管線各階段之間的天然黏合格式:Kafka 主題攜帶 JSONL 訊息、Airflow 任務在物件儲存中讀寫 JSONL 檔案,資料倉儲透過原生批次載入命令匯入 JSONL。在本指南中,你將學習這些階段中每一個的具體模式和正式環境可用的程式碼。

Kafka + JSONL:事件串流

Apache Kafka 是即時 ETL 的骨幹。生產者將事件以 JSONL 編碼的訊息發出(每個 Kafka 訊息一個 JSON 物件),消費者逐行讀取到下游系統。由於每個 Kafka 訊息就是一筆 JSONL 記錄,格式與 Kafka 基於日誌的架構完美對齊。序列化很簡單,消費者可以獨立解析每則訊息而無需緩衝。

一個將 Python 字典序列化為 JSONL 編碼訊息的 Kafka 生產者。每則訊息是發送到 Kafka 主題的單一 JSON 行。value_serializer 自動處理編碼,因此你只需將字典傳遞給 send() 方法。

Kafka JSONL 生產者
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

一個讀取 JSONL 訊息並批次寫入本機 JSONL 檔案的 Kafka 消費者。批次處理減少 I/O 開銷,並允許下游系統一次處理更大的資料塊。消費者僅在批次完全寫入後才提交偏移量,確保至少一次傳遞。

Kafka JSONL 消費者(批次寫入)
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow:編排 JSONL 任務

Apache Airflow 是批次 ETL 的標準編排工具。典型的基於 JSONL 的 Airflow 管線有三個階段:將原始資料擷取為 JSONL、轉換 JSONL 檔案(過濾、豐富、重塑),以及將結果載入資料倉儲或物件儲存。每個階段讀寫 JSONL 檔案,使中間狀態可檢查且各個任務可獨立重新執行。

一個完整的 Airflow DAG,從 API 擷取資料、以 JSONL 格式轉換,然後將結果載入雲端儲存。每個任務透過儲存在共享儲存空間的 JSONL 檔案進行通訊,使管線易於除錯和重試。

Airflow DAG:JSONL ETL 管線
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

對於無法載入記憶體的大型 JSONL 檔案,使用串流轉換逐行處理。這個模式無論檔案大小如何都保持恆定的記憶體使用量,在 Airflow 任務中 worker 資源可能有限的情況下運作良好。

串流 JSONL 轉換任務
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

將 JSONL 載入資料倉儲

現代資料倉儲對匯入 JSONL 檔案有一流的支援。Snowflake、BigQuery 和 Redshift 都可以直接從雲端儲存載入 JSONL、即時解析 JSON,並將結果儲存在結構化表中。這消除了中間 CSV 轉換步驟的需要,並保留了巢狀資料型別。

Snowflake

COPY INTO

Snowflake 透過 COPY INTO 命令從 S3、GCS 或 Azure Blob stage 匯入 JSONL。使用 FILE_FORMAT 選項搭配 TYPE = 'JSON' 將每行解析為單獨的 JSON 文件。VARIANT 欄位型別原生儲存半結構化資料。

BigQuery

原生

BigQuery 從 Google Cloud Storage 載入 JSONL 檔案,支援自動 schema 偵測。bq load 命令搭配 --source_format=NEWLINE_DELIMITED_JSON 原生處理 JSONL。BigQuery 也可以從 JSON 值自動偵測欄位型別。

Redshift

COPY

Amazon Redshift 使用 COPY 命令搭配 FORMAT AS JSON 支援從 S3 載入 JSONL。你可以提供 JSONPaths 檔案將 JSON 鍵對應到表欄位,讓你精細控制載入過程。

從 S3 stage 將 JSONL 檔案載入 Snowflake 表。每行成為包含已解析 JSON 的 VARIANT 欄位的一行。然後你可以使用 Snowflake SQL 將 VARIANT 欄位展平為型別化欄位。

Snowflake:從 S3 COPY JSONL
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

使用 BigQuery Python 客戶端從 Google Cloud Storage 將 JSONL 檔案載入 BigQuery 表。客戶端處理 schema 偵測、分區和錯誤報告。此方式與 Airflow 任務整合良好。

BigQuery:使用 Python Client 載入 JSONL
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

容錯與冪等性

ETL 管線會失敗。網路斷線、API 逾時、worker 記憶體不足。JSONL 使復原變得簡單,因為每一行都是獨立的。你可以設定檢查點(行號)、從最後成功的行恢復,並避免重新處理資料。搭配冪等寫入,即使在至少一次傳遞的情況下也能實現恰好一次語意。

一個基於檢查點的處理器,按行號追蹤進度。如果管線失敗,它從最後提交的檢查點恢復,而不是重新處理整個檔案。檢查點檔案是一個簡單的文字檔案,包含最後成功處理的行號。

基於檢查點的 JSONL 處理
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

檢查點模式適用於任何 JSONL 管線階段。對於 Kafka 消費者,用提交的偏移量替代檔案檢查點。對於 Airflow,將檢查點儲存在 XCom 或外部資料庫中,以便重試的任務從正確的位置恢復。關鍵洞察是 JSONL 基於行的格式使位置追蹤變得極其簡單:一個整數(行號或位元組偏移量)就足以恢復處理。

載入前驗證你的 JSONL

在格式錯誤的記錄進入管線之前捕獲它們。使用我們的免費線上工具在瀏覽器中驗證、檢視和轉換 JSONL 檔案。

即時檢查 JSONL 檔案

在瀏覽器中即時檢視、驗證和轉換高達 1GB 的 JSONL 檔案。無需上傳,100% 隱私保護。

常見問題

JSONL ETL 管線 — Kafka、Airflow、BigQuery、Snowflake | jsonl.co