JSONLストリーミング:リアルタイムデータ処理

JSONLデータのリアルタイムストリーミングに関する包括的ガイド。Node.js readline、Pythonジェネレーター、Server-Sent Events、WebSocket、本番環境でのログストリーミングパターンを使用したストリーミングパイプラインの構築方法を学びます。

最終更新:2026年2月

JSONLがストリーミングに最適な理由

JSONL(JSON Lines)は、改行文字で区切られた1行に1つの完全なJSONオブジェクトを格納します。この行区切り構造により、各行が自己完結型でパース可能な単位であるため、ストリーミングに最適なフォーマットとなっています。単一の大きなJSON配列とは異なり、レコードの処理を開始する前にデータセット全体をメモリに読み込む必要がありません。コンシューマーは1行を読み、パースし、処理し、次の行に進むことができ、データソースがどれだけ大きくなってもメモリ使用量は一定に保たれます。

この特性が、JSONLがログ集約システム、リアルタイム分析パイプライン、機械学習データフィード、APIストリーミングレスポンスで主流となっている理由です。OpenAIのAPI、AWS CloudWatch、Apache Kafkaなどのツールはすべて、JSONの汎用的な可読性と行指向プロトコルのストリーミング効率を兼ね備えた改行区切りJSONを使用しています。このガイドでは、Node.jsとPythonでのJSONLストリーミングソリューションの構築方法、Server-Sent Eventsによるブラウザへのリアルタイムデータプッシュ、WebSocketでのJSONLの交換、本番環境対応のログストリーミングシステムの実装方法を学びます。

Node.jsでのJSONLストリーミング

Node.jsはイベント駆動型のノンブロッキングI/Oモデルで構築されており、ストリーミングワークロードに非常に適しています。組み込みのreadlineとstreamモジュールにより、フルペイロードをメモリに読み込むことなく、JSONLファイルやネットワークストリームを1行ずつ処理するのに必要なすべてが揃っています。

readlineモジュールとfs.createReadStreamの組み合わせは、Node.jsでJSONLファイルを処理する標準的な方法です。ファイルをチャンク単位で読み取り、完全な行を出力するため、数ギガバイトのファイルでもメモリは一定に保たれます。

readlineによるストリーム読み取り
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function streamJsonl(filePath, onRecord) {
const rl = createInterface({
input: createReadStream(filePath, 'utf-8'),
crlfDelay: Infinity,
});
let count = 0;
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const record = JSON.parse(trimmed);
await onRecord(record);
count++;
} catch (err) {
console.error(`Skipping invalid line: ${err.message}`);
}
}
return count;
}
// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);

Node.jsのTransform Streamsを使用すると、JSONLの読み取り、各レコードのフィルタリングや拡充、結果の書き戻しを行う合成可能なデータパイプラインを構築できます。このパターンはバックプレッシャーを自動的に処理し、宛先が追いつけない場合にソースを一時停止します。

パイプライン用Transform Streams
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
// Split incoming chunks into individual lines
class LineSplitter extends Transform {
constructor() {
super({ readableObjectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (line.trim()) this.push(line.trim());
}
cb();
}
_flush(cb) {
if (this.buffer.trim()) this.push(this.buffer.trim());
cb();
}
}
// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(line, encoding, cb) {
try {
const record = JSON.parse(line);
const result = this.transformFn(record);
if (result) cb(null, JSON.stringify(result) + '\n');
else cb(); // Filter out null results
} catch (err) {
cb(); // Skip invalid lines
}
}
}
// Build the pipeline: read -> split -> transform -> write
await pipeline(
createReadStream('input.jsonl', 'utf-8'),
new LineSplitter(),
new JsonlTransform((record) => {
if (record.level === 'error') {
return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
}
return null; // Filter out non-error records
}),
createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');

PythonでのJSONLストリーミング

Pythonのジェネレーターは、JSONLデータをストリーミングする優雅な方法を提供します。ジェネレーターは一度に1つのレコードをyieldし、コンシューマーが次の値を要求した場合にのみ進むため、メモリ使用量は最小限に抑えられます。組み込みのjsonモジュールと組み合わせることで、わずか数行のコードで強力なストリーミングパイプラインを構築できます。

Pythonのジェネレーター関数は、標準のforループを使用してファイルを1行ずつ読み取ります。next()の各呼び出しで正確に1行を読み、パースし、結果をyieldします。ファイルハンドルは開いたままで、コンシューマーがレコードを取得するときにのみ位置が進みます。

ジェネレーターベースのストリーミング
import json
from typing import Generator, Any
def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
"""Stream JSONL records one at a time using a generator."""
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"Skipping line {line_num}: {e}")
# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
if record.get('level') == 'error':
print(f"ERROR: {record['message']}")
# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")

Python itertoolsを使用すると、中間結果をマテリアライズせずにフィルタリング、バッチ処理、スライスなどのストリーミング操作を合成できます。チェーン内の各ステップは一度に1レコードを処理するため、利用可能なRAMよりも大きなファイルを処理できます。

itertoolsによるチェーン処理
import json
import itertools
from typing import Generator, Iterable
def stream_jsonl(path: str) -> Generator[dict, None, None]:
with open(path, 'r') as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
"""Filter records where key equals value."""
for record in records:
if record.get(key) == value:
yield record
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""Yield successive batches of the given size."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
print(f"Batch {i + 1}: {len(error_batch)} errors")
# Send batch to alerting system, database, etc.
for record in error_batch:
print(f" {record['timestamp']} - {record['path']}")

Server-Sent EventsとJSONL

Server-Sent Events(SSE)は、サーバーからクライアントにリアルタイム更新をプッシュするためのシンプルなHTTPベースのプロトコルを提供します。各SSEイベントはdataフィールドを持ち、JSONLは各レコードがクライアントで即座にパースできる完全なJSONオブジェクトであるため、自然なペイロードフォーマットです。WebSocketとは異なり、SSEは標準HTTPで動作し、特別なプロキシ設定を必要とせず、障害時に自動的に再接続します。

サーバーはContent-Typeをtext/event-streamに設定し、各JSONLレコードをSSE dataフィールドとして書き込みます。接続は開いたままで、新しいイベントが利用可能になるとサーバーがプッシュします。

SSEサーバー(Node.js / Express)
import express from 'express';
const app = express();
// Simulate a real-time data source
function* generateMetrics() {
let id = 0;
while (true) {
yield {
id: ++id,
cpu: Math.random() * 100,
memory: Math.random() * 16384,
timestamp: new Date().toISOString(),
};
}
}
app.get('/api/stream/metrics', (req, res) => {
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();
const metrics = generateMetrics();
const interval = setInterval(() => {
const record = metrics.next().value;
// Each SSE event contains one JSONL record
res.write(`data: ${JSON.stringify(record)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => console.log('SSE server on :3000'));

ブラウザはEventSource APIを使用してSSEエンドポイントに接続します。受信した各イベントには、リアルタイムでパースしてレンダリングされるJSONLレコードが含まれています。EventSourceは接続が切断された場合に自動的に再接続を処理します。

SSEクライアント(ブラウザ)
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');
// Parse each JSONL record as it arrives
source.onmessage = (event) => {
const record = JSON.parse(event.data);
console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
// Update your UI in real time
updateDashboard(record);
};
source.onerror = (err) => {
console.error('SSE connection error:', err);
// EventSource automatically reconnects
};
// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
const response = await fetch(url);
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const dataLine = event.split('\n').find(l => l.startsWith('data: '));
if (dataLine) {
const record = JSON.parse(dataLine.slice(6));
onRecord(record);
}
}
}
}
await fetchSSE('/api/stream/metrics', (record) => {
console.log('Metric:', record);
});

WebSocket + JSONLストリーミング

SSEがサーバーからクライアントへのプッシュを処理する一方、WebSocketは双方がいつでもJSONLレコードを送信できる全二重通信を可能にします。これは、共同編集、ユーザーコマンド付きのリアルタイムダッシュボード、双方向データ同期などのインタラクティブなアプリケーションに最適です。各WebSocketメッセージには単一のJSONLレコードが含まれるため、両端でのパースが簡単です。

JSONLメッセージを使用したWebSocketサーバー
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Send JSONL records to the client
const interval = setInterval(() => {
const record = {
type: 'metric',
value: Math.random() * 100,
timestamp: new Date().toISOString(),
};
ws.send(JSON.stringify(record));
}, 500);
// Receive JSONL records from the client
ws.on('message', (data) => {
try {
const record = JSON.parse(data.toString());
console.log(`Received: ${record.type}`, record);
// Echo back an acknowledgment
ws.send(JSON.stringify({
type: 'ack',
originalId: record.id,
processedAt: new Date().toISOString(),
}));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
// const record = JSON.parse(e.data);
// console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));

各WebSocketメッセージは単一のJSONオブジェクトで、メッセージごとに1レコードというJSONLの慣例に従っています。サーバーは500msの間隔でクライアントにメトリクスをストリーミングしながら、クライアントからのコマンドも受け付けます。両方向で同じJSONシリアライゼーションを使用するため、プロトコルは対称的でデバッグが容易です。

実践的なログストリーミング

JSONLストリーミングの最も一般的な実用例の1つは、集中ログ収集です。アプリケーションは構造化されたログエントリをJSONL行としてstdoutまたはファイルに書き込み、コレクターエージェントが出力をリアルタイムでテーリングし、レコードを中央システムに転送します。このパターンは、Docker、Kubernetes、およびほとんどのクラウドロギングサービスで使用されています。

リアルタイムJSONLログコレクター
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';
class JsonlLogCollector {
constructor(filePath, handlers = {}) {
this.filePath = filePath;
this.handlers = handlers;
this.position = 0;
this.running = false;
}
async start() {
this.running = true;
// Read existing content first
await this.readFrom(0);
// Watch for new appends
this.watch();
}
async readFrom(startPos) {
const rl = createInterface({
input: createReadStream(this.filePath, {
encoding: 'utf-8',
start: startPos,
}),
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) continue;
try {
const record = JSON.parse(line);
await this.dispatch(record);
} catch (err) {
this.handlers.onError?.(err, line);
}
}
const info = await stat(this.filePath);
this.position = info.size;
}
watch() {
watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
if (!this.running) return;
if (curr.size > this.position) {
await this.readFrom(this.position);
}
});
}
async dispatch(record) {
const level = record.level || 'info';
this.handlers[level]?.(record);
this.handlers.onRecord?.(record);
}
stop() {
this.running = false;
}
}
// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
error: (r) => console.error(`[ERROR] ${r.message}`),
warn: (r) => console.warn(`[WARN] ${r.message}`),
onRecord: (r) => forwardToElasticsearch(r),
onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();

このコレクターはJSONLログファイルをリアルタイムでテーリングし、ログレベルに基づいてレコードをハンドラーにディスパッチします。初回のキャッチアップ後は新しいコンテンツのみを読み取るようにファイル位置を追跡します。本番環境では、ファイルローテーションの検出、グレースフルシャットダウン、集中ログシステムへのネットワーク呼び出しを削減するためのバッチ転送を追加します。

無料のJSONLツールを試す

ストリーミングパイプラインを構築する前に、JSONLデータの検査や変換を行いたいですか?無料のオンラインツールを使用して、ブラウザでJSONLファイルの表示、変換、フォーマットを行えます。すべての処理はローカルで行われるため、データはプライベートに保たれます。

JSONLファイルをオンラインで操作

ブラウザで最大1GBのJSONLファイルを表示、検証、変換。アップロード不要、100%プライベート。

よくある質問

JSONL ストリーミング — JSON Lines をリアルタイム処理(Node/Python) | jsonl.co