JSONL 串流處理:即時處理資料
JSONL 即時串流資料的完整指南。學習使用 Node.js readline、Python 生成器、Server-Sent Events、WebSockets 和生產環境日誌串流模式來建構串流管線。
最後更新:2026 年 2 月
為什麼 JSONL 非常適合串流處理
JSONL(JSON Lines)每行儲存一個完整的 JSON 物件,以換行符分隔。這種行分隔結構使其成為串流處理的完美格式,因為每一行都是一個獨立的、可解析的單元。與單一大型 JSON 陣列不同,您永遠不需要在開始處理記錄之前將整個資料集載入記憶體。消費者可以讀取一行、解析它、對其進行操作,然後移到下一行,無論資料來源有多大,記憶體使用量都保持恆定。
這個特性是 JSONL 在日誌聚合系統、即時分析管線、機器學習資料饋送和 API 串流回應中佔據主導地位的原因。OpenAI 的 API、AWS CloudWatch 和 Apache Kafka 等工具都使用換行分隔的 JSON,因為它結合了 JSON 的通用可讀性和行導向協議的串流效率。在本指南中,您將學習如何在 Node.js 和 Python 中建構 JSONL 串流解決方案,使用 Server-Sent Events 將即時資料推送到瀏覽器,透過 WebSockets 交換 JSONL,以及實作生產就緒的日誌串流系統。
在 Node.js 中串流 JSONL
Node.js 建立在事件驅動、非阻塞 I/O 模型之上,使其特別適合串流工作負載。內建的 readline 和 stream 模組提供了您逐行處理 JSONL 檔案和網路串流所需的一切,無需將完整負載載入記憶體。
readline 模組搭配 fs.createReadStream 是在 Node.js 中消費 JSONL 檔案的標準方式。它以區塊方式讀取檔案並發出完整的行,因此即使對於數 GB 的檔案,記憶體也保持平穩。
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Node.js Transform streams 讓您建構可組合的資料管線,讀取 JSONL、過濾或豐富每筆記錄,然後將結果寫回。此模式自動處理背壓,在目標無法跟上時暫停來源。
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
在 Python 中串流 JSONL
Python 生成器提供了一種優雅的方式來串流 JSONL 資料。由於生成器一次產出一筆記錄,並且僅在消費者請求下一個值時才前進,因此記憶體使用量保持最小。結合內建的 json 模組,您只需幾行程式碼就能建構強大的串流管線。
Python 生成器函式使用標準 for 迴圈逐行讀取檔案。每次呼叫 next() 恰好讀取一行、解析它並產出結果。檔案控制代碼保持開啟狀態,位置僅在消費者拉取記錄時才前進。
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
Python itertools 讓您組合串流操作,如過濾、批次和切片,而無需實體化中間結果。鏈中的每一步一次處理一筆記錄,因此您可以處理大於可用 RAM 的檔案。
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events 與 JSONL
Server-Sent Events(SSE)提供了一種簡單的、基於 HTTP 的協議,用於從伺服器向客戶端推送即時更新。每個 SSE 事件攜帶一個 data 欄位,而 JSONL 是天然的負載格式,因為每筆記錄都是一個完整的 JSON 物件,客戶端可以立即解析。與 WebSockets 不同,SSE 透過標準 HTTP 工作,不需要特殊的代理設定,且在連線失敗時會自動重連。
伺服器將 Content-Type 設定為 text/event-stream,並將每筆 JSONL 記錄作為 SSE data 欄位寫入。連線保持開啟,伺服器在新事件可用時推送。
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
瀏覽器使用 EventSource API 連接到 SSE 端點。每個傳入事件包含一筆 JSONL 記錄,即時解析和渲染。EventSource 在連線中斷時自動處理重連。
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + JSONL 串流
SSE 處理伺服器到客戶端的推送,而 WebSockets 則啟用全雙工通訊,雙方可以隨時發送 JSONL 記錄。這非常適合互動式應用程式,如協作編輯、帶使用者命令的即時儀表板,以及雙向資料同步。每個 WebSocket 訊息包含一筆 JSONL 記錄,使兩端的解析都很簡單。
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
每個 WebSocket 訊息是一個單一的 JSON 物件,遵循 JSONL 每筆訊息一筆記錄的慣例。伺服器以 500 毫秒的間隔向客戶端串流指標資料,同時也接受來自客戶端的命令。兩個方向使用相同的 JSON 序列化,因此協議是對稱的且易於除錯。
日誌串流實務
JSONL 串流最常見的實際用途之一是集中式日誌收集。應用程式將結構化日誌條目作為 JSONL 行寫入 stdout 或檔案,收集器代理即時追蹤輸出,將記錄轉發到集中系統。Docker、Kubernetes 和大多數雲端日誌服務都使用此模式。
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
此收集器即時追蹤 JSONL 日誌檔案,根據日誌級別將記錄分發到處理器。它追蹤檔案位置,因此在初始追趕之後只讀取新內容。在生產環境中,您需要新增檔案輪替偵測、優雅關閉,以及批次轉發以減少對集中日誌系統的網路呼叫。
試用我們的免費 JSONL 工具
在建構串流管線之前,想要檢查或轉換 JSONL 資料嗎?使用我們的免費線上工具直接在瀏覽器中檢視、轉換和格式化 JSONL 檔案。所有處理都在本機進行,因此您的資料保持私密。