Strumieniowanie JSONL: Przetwarzanie danych w czasie rzeczywistym
Kompleksowy przewodnik po strumieniowaniu danych JSONL w czasie rzeczywistym. Dowiedz się, jak budować potoki strumieniowania z Node.js readline, generatorami Python, Server-Sent Events, WebSockets i wzorcami strumieniowania logów produkcyjnych.
Ostatnia aktualizacja: luty 2026
Dlaczego JSONL jest idealny do strumieniowania
JSONL (JSON Lines) przechowuje jeden kompletny obiekt JSON na linię, oddzielony znakami nowej linii. Ta struktura rozdzielana liniami sprawia, że jest idealnym formatem do strumieniowania, ponieważ każda linia jest samodzielną, parsowalną jednostką. W przeciwieństwie do pojedynczej dużej tablicy JSON, nigdy nie musisz ładować całego zbioru danych do pamięci, zanim zaczniesz przetwarzać rekordy. Konsument może odczytać jedną linię, sparsować ją, wykonać na niej działanie i przejść do następnej, utrzymując stałe zużycie pamięci niezależnie od wielkości źródła danych.
Ta właściwość sprawia, że JSONL dominuje w systemach agregacji logów, potokach analityki czasu rzeczywistego, strumieniach danych uczenia maszynowego i odpowiedziach strumieniowych API. Narzędzia takie jak API OpenAI, AWS CloudWatch i Apache Kafka używają JSON-a rozdzielanego znakami nowej linii, ponieważ łączy on uniwersalną czytelność JSON z wydajnością strumieniowania protokołów liniowych. W tym przewodniku dowiesz się, jak budować rozwiązania strumieniowania JSONL w Node.js i Python, przesyłać dane w czasie rzeczywistym do przeglądarek za pomocą Server-Sent Events, wymieniać JSONL przez WebSockets i implementować produkcyjny system strumieniowania logów.
Strumieniowanie JSONL w Node.js
Node.js jest zbudowany na zdarzeniowym, nieblokującym modelu I/O, który czyni go wyjątkowo dobrze przystosowanym do obciążeń strumieniowych. Wbudowane moduły readline i stream dają Ci wszystko, czego potrzebujesz do przetwarzania plików JSONL i strumieni sieciowych linia po linii bez ładowania pełnego ładunku do pamięci.
Moduł readline w połączeniu z fs.createReadStream to standardowy sposób konsumowania pliku JSONL w Node.js. Odczytuje plik w porcjach i emituje kompletne linie, więc pamięć pozostaje stała nawet dla plików wielogigabajtowych.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Transform Streams w Node.js pozwalają budować komponowalne potoki danych, które odczytują JSONL, filtrują lub wzbogacają każdy rekord i zapisują wyniki z powrotem. Ten wzorzec automatycznie obsługuje backpressure, wstrzymując źródło, gdy miejsce docelowe nie nadąża.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
Strumieniowanie JSONL w Python
Generatory Python zapewniają elegancki sposób strumieniowania danych JSONL. Ponieważ generator zwraca jeden rekord na raz i przechodzi dalej dopiero, gdy konsument zażąda następnej wartości, zużycie pamięci pozostaje minimalne. W połączeniu z wbudowanym modułem json możesz budować potężne potoki strumieniowania za pomocą zaledwie kilku linii kodu.
Funkcja generatora Python odczytuje plik linia po linii za pomocą standardowej pętli for. Każde wywołanie next() odczytuje dokładnie jedną linię, parsuje ją i zwraca wynik. Uchwyt pliku pozostaje otwarty, a pozycja przesuwa się tylko w miarę pobierania rekordów przez konsumenta.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
Python itertools pozwala komponować operacje strumieniowe, takie jak filtrowanie, grupowanie i wycinanie, bez materializowania wyników pośrednich. Każdy krok w łańcuchu przetwarza jeden rekord na raz, więc możesz obsługiwać pliki większe niż dostępna pamięć RAM.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events z JSONL
Server-Sent Events (SSE) zapewniają prosty, oparty na HTTP protokół do przesyłania aktualizacji w czasie rzeczywistym z serwera do klienta. Każde zdarzenie SSE niesie pole data, a JSONL jest naturalnym formatem ładunku, ponieważ każda linia jest kompletnym obiektem JSON, który klient może natychmiast sparsować. W przeciwieństwie do WebSockets, SSE działa przez standardowy HTTP, nie wymaga specjalnej konfiguracji proxy i automatycznie łączy się ponownie po awarii.
Serwer ustawia Content-Type na text/event-stream i zapisuje każdy rekord JSONL jako pole data SSE. Połączenie pozostaje otwarte, a serwer przesyła nowe zdarzenia, gdy stają się dostępne.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
Przeglądarka używa API EventSource do łączenia się z punktem końcowym SSE. Każde przychodzące zdarzenie zawiera rekord JSONL, który jest parsowany i renderowany w czasie rzeczywistym. EventSource obsługuje automatyczne ponowne połączenie w przypadku utraty połączenia.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + strumieniowanie JSONL
Podczas gdy SSE obsługuje przesyłanie z serwera do klienta, WebSockets umożliwiają pełnodupleksową komunikację, gdzie obie strony mogą wysyłać rekordy JSONL w dowolnym momencie. Jest to idealne rozwiązanie dla interaktywnych aplikacji, takich jak współedytowanie, pulpity nawigacyjne w czasie rzeczywistym z poleceniami użytkownika i dwukierunkowa synchronizacja danych. Każda wiadomość WebSocket zawiera pojedynczy rekord JSONL, co sprawia, że parsowanie jest proste po obu stronach.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Każda wiadomość WebSocket to pojedynczy obiekt JSON, zgodny z konwencją JSONL jednego rekordu na wiadomość. Serwer strumieniuje metryki do klienta w odstępach 500ms, jednocześnie przyjmując polecenia od klienta. Oba kierunki używają tej samej serializacji JSON, więc protokół jest symetryczny i łatwy do debugowania.
Strumieniowanie logów w praktyce
Jednym z najczęstszych zastosowań strumieniowania JSONL w świecie rzeczywistym jest scentralizowane zbieranie logów. Aplikacje zapisują ustrukturyzowane wpisy logów jako linie JSONL do stdout lub pliku, a agent kolektora śledzi wyjście w czasie rzeczywistym, przekazując rekordy do systemu centralnego. Ten wzorzec jest używany przez Docker, Kubernetes i większość usług logowania w chmurze.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Ten kolektor śledzi plik logów JSONL w czasie rzeczywistym, wysyłając rekordy do handlerów na podstawie poziomu logowania. Śledzi pozycję w pliku, aby odczytywać tylko nową zawartość po początkowym przebiegu. W produkcji dodałbyś wykrywanie rotacji plików, płynne zamykanie i wsadowe przekazywanie, aby zmniejszyć liczbę wywołań sieciowych do scentralizowanego systemu logowania.
Wypróbuj nasze bezpłatne narzędzia JSONL
Chcesz sprawdzić lub przekonwertować dane JSONL przed zbudowaniem potoku strumieniowania? Użyj naszych bezpłatnych narzędzi online do przeglądania, konwertowania i formatowania plików JSONL bezpośrednio w przeglądarce. Całe przetwarzanie odbywa się lokalnie, więc Twoje dane pozostają prywatne.