JSONL 在 ETL 管道中的应用:Kafka、Airflow 与数据仓库
使用 JSONL(JSON Lines)作为现代 ETL 管道交换格式的实践指南。学习在 Kafka 中生产和消费 JSONL 事件、在 Airflow DAG 中编排 JSONL 转换,以及将 JSONL 数据加载到 Snowflake、BigQuery 和 Redshift 中。
最后更新:2026年2月
为什么 JSONL 是理想的 ETL 交换格式
ETL(抽取-转换-加载)管道在很少共享相同 Schema 的系统之间移动数据。JSONL 特别适合这项任务,因为每行都是一个独立的 JSON 文档。没有会丢失的表头,没有需要转义的分隔符,也没有在部分失败后需要重新组装的多行记录。每行可以独立验证、转换和路由,这正是 Kafka 和 Airflow 等分布式系统所需要的。
与 CSV 不同,JSONL 能无歧义地保留嵌套结构、数组和类型化值。与 Parquet 或 Avro 不同,JSONL 是人类可读的,不需要特殊工具来检查。这些特性使 JSONL 成为管道各阶段之间的天然粘合格式:Kafka 主题承载 JSONL 消息,Airflow 任务在对象存储中读写 JSONL 文件,数据仓库通过原生批量加载命令导入 JSONL。在本指南中,您将学习这些阶段的具体模式和生产级代码。
Kafka + JSONL:事件流
Apache Kafka 是实时 ETL 的骨干。生产者将事件作为 JSONL 编码的消息发送(每条 Kafka 消息一个 JSON 对象),消费者逐行读取到下游系统中。由于每条 Kafka 消息就是一条 JSONL 记录,这种格式与 Kafka 基于日志的架构完美对齐。序列化非常简单,消费者可以独立解析每条消息而无需缓冲。
一个将 Python 字典序列化为 JSONL 编码消息的 Kafka 生产者。每条消息是发送到 Kafka 主题的单行 JSON。value_serializer 自动处理编码,因此您只需将字典传递给 send() 方法。
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
一个读取 JSONL 消息并分批写入本地 JSONL 文件的 Kafka 消费者。批处理减少 I/O 开销,并允许下游系统一次处理更大的数据块。消费者仅在批次完全写入后才提交偏移量,确保至少一次投递。
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow:编排 JSONL 任务
Apache Airflow 是批量 ETL 的标准编排工具。典型的基于 JSONL 的 Airflow 管道有三个阶段:将原始数据抽取为 JSONL、转换 JSONL 文件(过滤、增强、重塑),以及将结果加载到数据仓库或对象存储中。每个阶段读写 JSONL 文件,使中间状态可检查,各个任务可独立重新运行。
一个完整的 Airflow DAG,从 API 抽取数据,将其转换为 JSONL,并将结果加载到云存储中。每个任务通过存储在共享存储上的 JSONL 文件进行通信,使管道易于调试和重试。
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
对于无法放入内存的大型 JSONL 文件,使用逐行处理的流式转换。这种模式无论文件大小如何都保持恒定的内存使用,非常适合 Airflow 任务中 worker 资源可能有限的场景。
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
将 JSONL 加载到数据仓库
现代数据仓库对 JSONL 文件的导入提供了一流支持。Snowflake、BigQuery 和 Redshift 都可以直接从云存储加载 JSONL,即时解析 JSON,并将结果存储在结构化表中。这消除了中间 CSV 转换步骤的需要,并保留了嵌套数据类型。
Snowflake
COPY INTOSnowflake 通过 COPY INTO 命令从 S3、GCS 或 Azure Blob 暂存区导入 JSONL。使用 FILE_FORMAT 选项设置 TYPE = 'JSON' 将每行解析为独立的 JSON 文档。VARIANT 列类型原生存储半结构化数据。
BigQuery
原生支持BigQuery 从 Google Cloud Storage 加载 JSONL 文件,支持自动 Schema 检测。bq load 命令配合 --source_format=NEWLINE_DELIMITED_JSON 原生处理 JSONL。BigQuery 还可以从 JSON 值自动检测列类型。
Redshift
COPYAmazon Redshift 支持使用 COPY 命令配合 FORMAT AS JSON 从 S3 加载 JSONL。您可以提供 JSONPaths 文件来映射 JSON 键到表列,实现对加载过程的精细控制。
从 S3 暂存区将 JSONL 文件加载到 Snowflake 表中。每行成为包含已解析 JSON 的 VARIANT 列的一行。然后可以使用 Snowflake SQL 将 VARIANT 列展平为类型化的列。
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
使用 BigQuery Python 客户端从 Google Cloud Storage 将 JSONL 文件加载到 BigQuery 表中。客户端处理 Schema 检测、分区和错误报告。这种方式与 Airflow 任务集成良好。
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
容错与幂等性
ETL 管道会失败。网络断开、API 超时、worker 内存不足。JSONL 使恢复变得简单,因为每行都是独立的。您可以记录处理位置(行号),从最后成功的行恢复,避免重复处理数据。结合幂等写入,即使在至少一次投递的情况下也能实现精确一次语义。
一个按行号跟踪进度的检查点处理器。如果管道失败,它从最后提交的检查点恢复,而不是重新处理整个文件。检查点文件是一个简单的文本文件,包含最后成功处理的行号。
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
检查点模式适用于任何 JSONL 管道阶段。对于 Kafka 消费者,将文件检查点替换为已提交的偏移量。对于 Airflow,将检查点存储在 XCom 或外部数据库中,以便重试的任务从正确位置恢复。关键在于 JSONL 基于行的格式使位置跟踪变得非常简单:一个整数(行号或字节偏移量)就是恢复所需的全部信息。
加载前验证您的 JSONL
在数据进入管道之前捕获格式错误的记录。使用我们的免费在线工具在浏览器中验证、查看和转换 JSONL 文件。