ETL パイプラインでの JSONL:Kafka、Airflow とデータウェアハウス

最新の ETL パイプラインにおいて JSONL(JSON Lines)を交換フォーマットとして使用するための実践ガイド。Kafka での JSONL イベントのプロデュース・コンシューム、Airflow DAG での JSONL 変換のオーケストレーション、Snowflake、BigQuery、Redshift への JSONL データのロードを学びます。

最終更新:2026年2月

なぜ JSONL は理想的な ETL 交換フォーマットなのか

Extract-Transform-Load(ETL)パイプラインは、同じスキーマを共有することが稀なシステム間でデータを移動します。JSONL はこのタスクに非常に適しています。各行が自己完結型の JSON ドキュメントであるため、失うヘッダーもエスケープするデリミタも、部分的な障害後に再構築する複数行レコードもありません。各行を独立して検証、変換、ルーティングできます。これはまさに Kafka や Airflow のような分散システムが必要とするものです。

CSV と異なり、JSONL はネストされた構造体、配列、型付き値を曖昧さなく保持します。Parquet や Avro と異なり、JSONL は人間が読めるため、内容確認に特別なツールが不要です。これらの特性により、JSONL はパイプラインステージ間の接着剤として自然なフォーマットとなります:Kafka トピックが JSONL メッセージを運び、Airflow タスクがオブジェクトストレージから JSONL ファイルを読み書きし、データウェアハウスがネイティブの一括ロードコマンドで JSONL を取り込みます。このガイドでは、これらの各ステージの具体的なパターンを本番向けコードとともに学びます。

Kafka + JSONL:イベントストリーミング

Apache Kafka はリアルタイム ETL の基盤です。プロデューサーは JSONL エンコードされたメッセージ(Kafka メッセージ1つにつき1つの JSON オブジェクト)としてイベントを送信し、コンシューマーはそれらを行単位で下流システムに読み込みます。各 Kafka メッセージが単一の JSONL レコードであるため、フォーマットは Kafka のログベースアーキテクチャと完全に整合します。シリアライゼーションは単純で、コンシューマーはバッファリングなしで各メッセージを独立してパースできます。

Python の辞書を JSONL エンコードされたメッセージとしてシリアライズする Kafka プロデューサー。各メッセージは Kafka トピックに送信される単一の JSON 行です。value_serializer がエンコーディングを自動的に処理するため、send() メソッドに dict を渡すだけです。

Kafka JSONL プロデューサー
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

JSONL メッセージを読み取り、バッチ単位でローカル JSONL ファイルに書き込む Kafka コンシューマー。バッチ処理により I/O オーバーヘッドが削減され、下流システムがより大きなチャンクを一度に処理できます。コンシューマーはバッチの書き込みが完全に完了した後にのみオフセットをコミットし、少なくとも一度の配信を保証します。

バッチ書き込み付き Kafka JSONL コンシューマー
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow:JSONL タスクのオーケストレーション

Apache Airflow はバッチ ETL の標準オーケストレーションツールです。典型的な JSONL ベースの Airflow パイプラインには3つのステージがあります:生データを JSONL に抽出、JSONL ファイルを変換(フィルタリング、エンリッチメント、リシェイプ)、結果をデータウェアハウスまたはオブジェクトストアにロード。各ステージが JSONL ファイルを読み書きするため、中間状態を確認でき、個々のタスクを独立して再実行できます。

API からデータを抽出し、JSONL として変換し、結果をクラウドストレージにロードする完全な Airflow DAG。各タスクは共有ストレージ上の JSONL ファイルを通じて通信するため、パイプラインのデバッグとリトライが容易です。

Airflow DAG:JSONL ETL パイプライン
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

メモリに収まらない大きな JSONL ファイルの場合、1行ずつ処理するストリーミング変換を使用します。このパターンはファイルサイズに関係なくメモリ使用量を一定に保ち、ワーカーリソースが限られる Airflow タスク内でうまく機能します。

ストリーミング JSONL 変換タスク
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

JSONL をデータウェアハウスにロード

最新のデータウェアハウスは JSONL ファイルの取り込みをファーストクラスでサポートしています。Snowflake、BigQuery、Redshift はすべてクラウドストレージから JSONL を直接ロードし、JSON をオンザフライでパースし、構造化テーブルに結果を格納できます。これにより、中間的な CSV 変換ステップが不要になり、ネストされたデータ型が保持されます。

Snowflake

COPY INTO

Snowflake は S3、GCS、Azure Blob ステージからの COPY INTO コマンドで JSONL を取り込みます。FILE_FORMAT オプションで TYPE = 'JSON' を使用して各行を個別の JSON ドキュメントとしてパースします。VARIANT 列型が半構造化データをネイティブに格納します。

BigQuery

ネイティブ

BigQuery は自動スキーマ検出で Google Cloud Storage から JSONL ファイルをロードします。bq load コマンドに --source_format=NEWLINE_DELIMITED_JSON を指定すると JSONL をネイティブに処理します。BigQuery は JSON 値から列型を自動検出することもできます。

Redshift

COPY

Amazon Redshift は FORMAT AS JSON を使用した COPY コマンドで S3 からの JSONL ロードをサポートします。JSONPaths ファイルを提供して JSON キーをテーブルカラムにマッピングでき、ロードプロセスをきめ細かく制御できます。

S3 ステージから Snowflake テーブルに JSONL ファイルをロードします。各行はパースされた JSON を含む VARIANT 列を持つ行になります。その後、Snowflake SQL を使用して VARIANT 列を型付き列にフラット化できます。

Snowflake:S3 から JSONL を COPY
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

BigQuery Python クライアントを使用して Google Cloud Storage から BigQuery テーブルに JSONL ファイルをロードします。クライアントがスキーマ検出、パーティショニング、エラーレポートを処理します。このアプローチは Airflow タスクとうまく統合できます。

BigQuery:Python クライアントで JSONL をロード
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

フォールトトレランスと冪等性

ETL パイプラインは障害が発生します。ネットワークが切断され、API がタイムアウトし、ワーカーがメモリ不足になります。JSONL はすべての行が独立しているため、復旧が簡単です。処理位置(行番号)をチェックポイントとして記録し、最後の成功行から再開し、データの再処理を避けることができます。冪等な書き込みと組み合わせることで、少なくとも一度の配信でも正確に一度のセマンティクスを実現できます。

行番号で進捗を追跡するチェックポイントベースのプロセッサー。パイプラインが失敗した場合、ファイル全体を再処理するのではなく、最後にコミットされたチェックポイントから再開します。チェックポイントファイルは、最後に正常に処理された行番号を含む単純なテキストファイルです。

チェックポイントベースの JSONL 処理
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

チェックポイントパターンは任意の JSONL パイプラインステージで動作します。Kafka コンシューマーの場合、ファイルチェックポイントをコミットされたオフセットに置き換えます。Airflow の場合、リトライされたタスクが正しい位置から再開できるよう、チェックポイントを XCom または外部データベースに保存します。重要な洞察は、JSONL の行ベースのフォーマットにより位置追跡が簡単になることです:再開に必要なのは単一の整数(行番号またはバイトオフセット)だけです。

ロード前に JSONL を検証

パイプラインに到達する前に不正なレコードをキャッチ。無料のオンラインツールでブラウザ内で JSONL ファイルの検証、表示、変換ができます。

JSONL ファイルを即座に確認

ブラウザで最大 1GB の JSONL ファイルを表示、検証、変換。アップロード不要、100% プライベート。

よくある質問

JSONL ETL パイプライン — Kafka、Airflow、BigQuery、Snowflake | jso...