JSONL 在 ETL 管道中的应用:Kafka、Airflow 与数据仓库

使用 JSONL(JSON Lines)作为现代 ETL 管道交换格式的实践指南。学习在 Kafka 中生产和消费 JSONL 事件、在 Airflow DAG 中编排 JSONL 转换,以及将 JSONL 数据加载到 Snowflake、BigQuery 和 Redshift 中。

最后更新:2026年2月

为什么 JSONL 是理想的 ETL 交换格式

ETL(抽取-转换-加载)管道在很少共享相同 Schema 的系统之间移动数据。JSONL 特别适合这项任务,因为每行都是一个独立的 JSON 文档。没有会丢失的表头,没有需要转义的分隔符,也没有在部分失败后需要重新组装的多行记录。每行可以独立验证、转换和路由,这正是 Kafka 和 Airflow 等分布式系统所需要的。

与 CSV 不同,JSONL 能无歧义地保留嵌套结构、数组和类型化值。与 Parquet 或 Avro 不同,JSONL 是人类可读的,不需要特殊工具来检查。这些特性使 JSONL 成为管道各阶段之间的天然粘合格式:Kafka 主题承载 JSONL 消息,Airflow 任务在对象存储中读写 JSONL 文件,数据仓库通过原生批量加载命令导入 JSONL。在本指南中,您将学习这些阶段的具体模式和生产级代码。

Kafka + JSONL:事件流

Apache Kafka 是实时 ETL 的骨干。生产者将事件作为 JSONL 编码的消息发送(每条 Kafka 消息一个 JSON 对象),消费者逐行读取到下游系统中。由于每条 Kafka 消息就是一条 JSONL 记录,这种格式与 Kafka 基于日志的架构完美对齐。序列化非常简单,消费者可以独立解析每条消息而无需缓冲。

一个将 Python 字典序列化为 JSONL 编码消息的 Kafka 生产者。每条消息是发送到 Kafka 主题的单行 JSON。value_serializer 自动处理编码,因此您只需将字典传递给 send() 方法。

Kafka JSONL 生产者
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

一个读取 JSONL 消息并分批写入本地 JSONL 文件的 Kafka 消费者。批处理减少 I/O 开销,并允许下游系统一次处理更大的数据块。消费者仅在批次完全写入后才提交偏移量,确保至少一次投递。

带批量写入的 Kafka JSONL 消费者
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow:编排 JSONL 任务

Apache Airflow 是批量 ETL 的标准编排工具。典型的基于 JSONL 的 Airflow 管道有三个阶段:将原始数据抽取为 JSONL、转换 JSONL 文件(过滤、增强、重塑),以及将结果加载到数据仓库或对象存储中。每个阶段读写 JSONL 文件,使中间状态可检查,各个任务可独立重新运行。

一个完整的 Airflow DAG,从 API 抽取数据,将其转换为 JSONL,并将结果加载到云存储中。每个任务通过存储在共享存储上的 JSONL 文件进行通信,使管道易于调试和重试。

Airflow DAG:JSONL ETL 管道
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

对于无法放入内存的大型 JSONL 文件,使用逐行处理的流式转换。这种模式无论文件大小如何都保持恒定的内存使用,非常适合 Airflow 任务中 worker 资源可能有限的场景。

流式 JSONL 转换任务
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

将 JSONL 加载到数据仓库

现代数据仓库对 JSONL 文件的导入提供了一流支持。Snowflake、BigQuery 和 Redshift 都可以直接从云存储加载 JSONL,即时解析 JSON,并将结果存储在结构化表中。这消除了中间 CSV 转换步骤的需要,并保留了嵌套数据类型。

Snowflake

COPY INTO

Snowflake 通过 COPY INTO 命令从 S3、GCS 或 Azure Blob 暂存区导入 JSONL。使用 FILE_FORMAT 选项设置 TYPE = 'JSON' 将每行解析为独立的 JSON 文档。VARIANT 列类型原生存储半结构化数据。

BigQuery

原生支持

BigQuery 从 Google Cloud Storage 加载 JSONL 文件,支持自动 Schema 检测。bq load 命令配合 --source_format=NEWLINE_DELIMITED_JSON 原生处理 JSONL。BigQuery 还可以从 JSON 值自动检测列类型。

Redshift

COPY

Amazon Redshift 支持使用 COPY 命令配合 FORMAT AS JSON 从 S3 加载 JSONL。您可以提供 JSONPaths 文件来映射 JSON 键到表列,实现对加载过程的精细控制。

从 S3 暂存区将 JSONL 文件加载到 Snowflake 表中。每行成为包含已解析 JSON 的 VARIANT 列的一行。然后可以使用 Snowflake SQL 将 VARIANT 列展平为类型化的列。

Snowflake:从 S3 加载 JSONL
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

使用 BigQuery Python 客户端从 Google Cloud Storage 将 JSONL 文件加载到 BigQuery 表中。客户端处理 Schema 检测、分区和错误报告。这种方式与 Airflow 任务集成良好。

BigQuery:使用 Python 客户端加载 JSONL
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

容错与幂等性

ETL 管道会失败。网络断开、API 超时、worker 内存不足。JSONL 使恢复变得简单,因为每行都是独立的。您可以记录处理位置(行号),从最后成功的行恢复,避免重复处理数据。结合幂等写入,即使在至少一次投递的情况下也能实现精确一次语义。

一个按行号跟踪进度的检查点处理器。如果管道失败,它从最后提交的检查点恢复,而不是重新处理整个文件。检查点文件是一个简单的文本文件,包含最后成功处理的行号。

基于检查点的 JSONL 处理
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

检查点模式适用于任何 JSONL 管道阶段。对于 Kafka 消费者,将文件检查点替换为已提交的偏移量。对于 Airflow,将检查点存储在 XCom 或外部数据库中,以便重试的任务从正确位置恢复。关键在于 JSONL 基于行的格式使位置跟踪变得非常简单:一个整数(行号或字节偏移量)就是恢复所需的全部信息。

加载前验证您的 JSONL

在数据进入管道之前捕获格式错误的记录。使用我们的免费在线工具在浏览器中验证、查看和转换 JSONL 文件。

即时检查 JSONL 文件

直接在浏览器中查看、验证和转换最大 1GB 的 JSONL 文件。无需上传,100% 私密。

常见问题

JSONL ETL 管道 — Kafka、Airflow、BigQuery、Snowflake | jsonl.co