ETL パイプラインでの JSONL:Kafka、Airflow とデータウェアハウス
最新の ETL パイプラインにおいて JSONL(JSON Lines)を交換フォーマットとして使用するための実践ガイド。Kafka での JSONL イベントのプロデュース・コンシューム、Airflow DAG での JSONL 変換のオーケストレーション、Snowflake、BigQuery、Redshift への JSONL データのロードを学びます。
最終更新:2026年2月
なぜ JSONL は理想的な ETL 交換フォーマットなのか
Extract-Transform-Load(ETL)パイプラインは、同じスキーマを共有することが稀なシステム間でデータを移動します。JSONL はこのタスクに非常に適しています。各行が自己完結型の JSON ドキュメントであるため、失うヘッダーもエスケープするデリミタも、部分的な障害後に再構築する複数行レコードもありません。各行を独立して検証、変換、ルーティングできます。これはまさに Kafka や Airflow のような分散システムが必要とするものです。
CSV と異なり、JSONL はネストされた構造体、配列、型付き値を曖昧さなく保持します。Parquet や Avro と異なり、JSONL は人間が読めるため、内容確認に特別なツールが不要です。これらの特性により、JSONL はパイプラインステージ間の接着剤として自然なフォーマットとなります:Kafka トピックが JSONL メッセージを運び、Airflow タスクがオブジェクトストレージから JSONL ファイルを読み書きし、データウェアハウスがネイティブの一括ロードコマンドで JSONL を取り込みます。このガイドでは、これらの各ステージの具体的なパターンを本番向けコードとともに学びます。
Kafka + JSONL:イベントストリーミング
Apache Kafka はリアルタイム ETL の基盤です。プロデューサーは JSONL エンコードされたメッセージ(Kafka メッセージ1つにつき1つの JSON オブジェクト)としてイベントを送信し、コンシューマーはそれらを行単位で下流システムに読み込みます。各 Kafka メッセージが単一の JSONL レコードであるため、フォーマットは Kafka のログベースアーキテクチャと完全に整合します。シリアライゼーションは単純で、コンシューマーはバッファリングなしで各メッセージを独立してパースできます。
Python の辞書を JSONL エンコードされたメッセージとしてシリアライズする Kafka プロデューサー。各メッセージは Kafka トピックに送信される単一の JSON 行です。value_serializer がエンコーディングを自動的に処理するため、send() メソッドに dict を渡すだけです。
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
JSONL メッセージを読み取り、バッチ単位でローカル JSONL ファイルに書き込む Kafka コンシューマー。バッチ処理により I/O オーバーヘッドが削減され、下流システムがより大きなチャンクを一度に処理できます。コンシューマーはバッチの書き込みが完全に完了した後にのみオフセットをコミットし、少なくとも一度の配信を保証します。
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow:JSONL タスクのオーケストレーション
Apache Airflow はバッチ ETL の標準オーケストレーションツールです。典型的な JSONL ベースの Airflow パイプラインには3つのステージがあります:生データを JSONL に抽出、JSONL ファイルを変換(フィルタリング、エンリッチメント、リシェイプ)、結果をデータウェアハウスまたはオブジェクトストアにロード。各ステージが JSONL ファイルを読み書きするため、中間状態を確認でき、個々のタスクを独立して再実行できます。
API からデータを抽出し、JSONL として変換し、結果をクラウドストレージにロードする完全な Airflow DAG。各タスクは共有ストレージ上の JSONL ファイルを通じて通信するため、パイプラインのデバッグとリトライが容易です。
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
メモリに収まらない大きな JSONL ファイルの場合、1行ずつ処理するストリーミング変換を使用します。このパターンはファイルサイズに関係なくメモリ使用量を一定に保ち、ワーカーリソースが限られる Airflow タスク内でうまく機能します。
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
JSONL をデータウェアハウスにロード
最新のデータウェアハウスは JSONL ファイルの取り込みをファーストクラスでサポートしています。Snowflake、BigQuery、Redshift はすべてクラウドストレージから JSONL を直接ロードし、JSON をオンザフライでパースし、構造化テーブルに結果を格納できます。これにより、中間的な CSV 変換ステップが不要になり、ネストされたデータ型が保持されます。
Snowflake
COPY INTOSnowflake は S3、GCS、Azure Blob ステージからの COPY INTO コマンドで JSONL を取り込みます。FILE_FORMAT オプションで TYPE = 'JSON' を使用して各行を個別の JSON ドキュメントとしてパースします。VARIANT 列型が半構造化データをネイティブに格納します。
BigQuery
ネイティブBigQuery は自動スキーマ検出で Google Cloud Storage から JSONL ファイルをロードします。bq load コマンドに --source_format=NEWLINE_DELIMITED_JSON を指定すると JSONL をネイティブに処理します。BigQuery は JSON 値から列型を自動検出することもできます。
Redshift
COPYAmazon Redshift は FORMAT AS JSON を使用した COPY コマンドで S3 からの JSONL ロードをサポートします。JSONPaths ファイルを提供して JSON キーをテーブルカラムにマッピングでき、ロードプロセスをきめ細かく制御できます。
S3 ステージから Snowflake テーブルに JSONL ファイルをロードします。各行はパースされた JSON を含む VARIANT 列を持つ行になります。その後、Snowflake SQL を使用して VARIANT 列を型付き列にフラット化できます。
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
BigQuery Python クライアントを使用して Google Cloud Storage から BigQuery テーブルに JSONL ファイルをロードします。クライアントがスキーマ検出、パーティショニング、エラーレポートを処理します。このアプローチは Airflow タスクとうまく統合できます。
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
フォールトトレランスと冪等性
ETL パイプラインは障害が発生します。ネットワークが切断され、API がタイムアウトし、ワーカーがメモリ不足になります。JSONL はすべての行が独立しているため、復旧が簡単です。処理位置(行番号)をチェックポイントとして記録し、最後の成功行から再開し、データの再処理を避けることができます。冪等な書き込みと組み合わせることで、少なくとも一度の配信でも正確に一度のセマンティクスを実現できます。
行番号で進捗を追跡するチェックポイントベースのプロセッサー。パイプラインが失敗した場合、ファイル全体を再処理するのではなく、最後にコミットされたチェックポイントから再開します。チェックポイントファイルは、最後に正常に処理された行番号を含む単純なテキストファイルです。
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
チェックポイントパターンは任意の JSONL パイプラインステージで動作します。Kafka コンシューマーの場合、ファイルチェックポイントをコミットされたオフセットに置き換えます。Airflow の場合、リトライされたタスクが正しい位置から再開できるよう、チェックポイントを XCom または外部データベースに保存します。重要な洞察は、JSONL の行ベースのフォーマットにより位置追跡が簡単になることです:再開に必要なのは単一の整数(行番号またはバイトオフセット)だけです。
ロード前に JSONL を検証
パイプラインに到達する前に不正なレコードをキャッチ。無料のオンラインツールでブラウザ内で JSONL ファイルの検証、表示、変換ができます。