JSONL 在 ETL 管線中的應用:Kafka、Airflow 與資料倉儲
使用 JSONL(JSON Lines)作為現代 ETL 管線交換格式的實作指南。學習如何在 Kafka 中產生和消費 JSONL 事件、在 Airflow DAG 中編排 JSONL 轉換,以及將 JSONL 資料載入 Snowflake、BigQuery 和 Redshift。
最後更新:2026 年 2 月
為什麼 JSONL 是理想的 ETL 交換格式
擷取-轉換-載入(ETL)管線在很少共用相同 schema 的系統之間移動資料。JSONL 特別適合這項任務,因為每一行都是一個獨立的 JSON 文件。不會遺失標頭、不需要跳脫分隔符號,也不會在部分失敗後需要重新組合多行記錄。每行都可以獨立驗證、轉換和路由,這正是 Kafka 和 Airflow 等分散式系統所需要的。
與 CSV 不同,JSONL 可以毫無歧義地保留巢狀結構、陣列和型別化的值。與 Parquet 或 Avro 不同,JSONL 是人類可讀的,不需要特殊工具來檢查。這些特性使 JSONL 成為管線各階段之間的天然黏合格式:Kafka 主題攜帶 JSONL 訊息、Airflow 任務在物件儲存中讀寫 JSONL 檔案,資料倉儲透過原生批次載入命令匯入 JSONL。在本指南中,你將學習這些階段中每一個的具體模式和正式環境可用的程式碼。
Kafka + JSONL:事件串流
Apache Kafka 是即時 ETL 的骨幹。生產者將事件以 JSONL 編碼的訊息發出(每個 Kafka 訊息一個 JSON 物件),消費者逐行讀取到下游系統。由於每個 Kafka 訊息就是一筆 JSONL 記錄,格式與 Kafka 基於日誌的架構完美對齊。序列化很簡單,消費者可以獨立解析每則訊息而無需緩衝。
一個將 Python 字典序列化為 JSONL 編碼訊息的 Kafka 生產者。每則訊息是發送到 Kafka 主題的單一 JSON 行。value_serializer 自動處理編碼,因此你只需將字典傳遞給 send() 方法。
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
一個讀取 JSONL 訊息並批次寫入本機 JSONL 檔案的 Kafka 消費者。批次處理減少 I/O 開銷,並允許下游系統一次處理更大的資料塊。消費者僅在批次完全寫入後才提交偏移量,確保至少一次傳遞。
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow:編排 JSONL 任務
Apache Airflow 是批次 ETL 的標準編排工具。典型的基於 JSONL 的 Airflow 管線有三個階段:將原始資料擷取為 JSONL、轉換 JSONL 檔案(過濾、豐富、重塑),以及將結果載入資料倉儲或物件儲存。每個階段讀寫 JSONL 檔案,使中間狀態可檢查且各個任務可獨立重新執行。
一個完整的 Airflow DAG,從 API 擷取資料、以 JSONL 格式轉換,然後將結果載入雲端儲存。每個任務透過儲存在共享儲存空間的 JSONL 檔案進行通訊,使管線易於除錯和重試。
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
對於無法載入記憶體的大型 JSONL 檔案,使用串流轉換逐行處理。這個模式無論檔案大小如何都保持恆定的記憶體使用量,在 Airflow 任務中 worker 資源可能有限的情況下運作良好。
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
將 JSONL 載入資料倉儲
現代資料倉儲對匯入 JSONL 檔案有一流的支援。Snowflake、BigQuery 和 Redshift 都可以直接從雲端儲存載入 JSONL、即時解析 JSON,並將結果儲存在結構化表中。這消除了中間 CSV 轉換步驟的需要,並保留了巢狀資料型別。
Snowflake
COPY INTOSnowflake 透過 COPY INTO 命令從 S3、GCS 或 Azure Blob stage 匯入 JSONL。使用 FILE_FORMAT 選項搭配 TYPE = 'JSON' 將每行解析為單獨的 JSON 文件。VARIANT 欄位型別原生儲存半結構化資料。
BigQuery
原生BigQuery 從 Google Cloud Storage 載入 JSONL 檔案,支援自動 schema 偵測。bq load 命令搭配 --source_format=NEWLINE_DELIMITED_JSON 原生處理 JSONL。BigQuery 也可以從 JSON 值自動偵測欄位型別。
Redshift
COPYAmazon Redshift 使用 COPY 命令搭配 FORMAT AS JSON 支援從 S3 載入 JSONL。你可以提供 JSONPaths 檔案將 JSON 鍵對應到表欄位,讓你精細控制載入過程。
從 S3 stage 將 JSONL 檔案載入 Snowflake 表。每行成為包含已解析 JSON 的 VARIANT 欄位的一行。然後你可以使用 Snowflake SQL 將 VARIANT 欄位展平為型別化欄位。
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
使用 BigQuery Python 客戶端從 Google Cloud Storage 將 JSONL 檔案載入 BigQuery 表。客戶端處理 schema 偵測、分區和錯誤報告。此方式與 Airflow 任務整合良好。
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
容錯與冪等性
ETL 管線會失敗。網路斷線、API 逾時、worker 記憶體不足。JSONL 使復原變得簡單,因為每一行都是獨立的。你可以設定檢查點(行號)、從最後成功的行恢復,並避免重新處理資料。搭配冪等寫入,即使在至少一次傳遞的情況下也能實現恰好一次語意。
一個基於檢查點的處理器,按行號追蹤進度。如果管線失敗,它從最後提交的檢查點恢復,而不是重新處理整個檔案。檢查點檔案是一個簡單的文字檔案,包含最後成功處理的行號。
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
檢查點模式適用於任何 JSONL 管線階段。對於 Kafka 消費者,用提交的偏移量替代檔案檢查點。對於 Airflow,將檢查點儲存在 XCom 或外部資料庫中,以便重試的任務從正確的位置恢復。關鍵洞察是 JSONL 基於行的格式使位置追蹤變得極其簡單:一個整數(行號或位元組偏移量)就足以恢復處理。
載入前驗證你的 JSONL
在格式錯誤的記錄進入管線之前捕獲它們。使用我們的免費線上工具在瀏覽器中驗證、檢視和轉換 JSONL 檔案。