JSONL 스트리밍: 실시간 데이터 처리

JSONL 데이터를 실시간으로 스트리밍하는 종합 가이드입니다. Node.js readline, Python 제너레이터, Server-Sent Events, WebSocket, 프로덕션 로그 스트리밍 패턴으로 스트리밍 파이프라인을 구축하는 방법을 배우세요.

최종 업데이트: 2026년 2월

JSONL이 스트리밍에 이상적인 이유

JSONL(JSON Lines)은 줄바꿈 문자로 구분된 줄당 하나의 완전한 JSON 객체를 저장합니다. 이 줄 구분 구조 덕분에 각 줄이 독립적으로 파싱 가능한 자체 완결 단위이므로 스트리밍에 완벽한 형식입니다. 단일 대형 JSON 배열과 달리 레코드 처리를 시작하기 전에 전체 데이터셋을 메모리에 로드할 필요가 없습니다. 소비자는 한 줄을 읽고, 파싱하고, 처리한 후 다음 줄로 이동할 수 있으며, 데이터 소스 크기에 관계없이 메모리 사용량을 일정하게 유지합니다.

이러한 속성 때문에 JSONL은 로그 수집 시스템, 실시간 분석 파이프라인, 머신러닝 데이터 피드, API 스트리밍 응답에서 주류를 이루고 있습니다. OpenAI의 API, AWS CloudWatch, Apache Kafka 등의 도구들이 모두 줄바꿈 구분 JSON을 사용하는데, JSON의 범용 가독성과 줄 지향 프로토콜의 스트리밍 효율성을 결합하기 때문입니다. 이 가이드에서는 Node.js와 Python에서 JSONL 스트리밍 솔루션을 구축하고, Server-Sent Events로 브라우저에 실시간 데이터를 푸시하고, WebSocket을 통해 JSONL을 교환하고, 프로덕션 준비 로그 스트리밍 시스템을 구현하는 방법을 배웁니다.

Node.js에서 JSONL 스트리밍

Node.js는 이벤트 기반 논블로킹 I/O 모델 위에 구축되어 스트리밍 워크로드에 매우 적합합니다. 내장 readline과 stream 모듈은 전체 페이로드를 메모리에 로드하지 않고 JSONL 파일과 네트워크 스트림을 줄 단위로 처리하는 데 필요한 모든 것을 제공합니다.

readline 모듈과 fs.createReadStream의 조합은 Node.js에서 JSONL 파일을 소비하는 표준 방법입니다. 파일을 청크 단위로 읽고 완전한 줄을 방출하므로, 수 기가바이트 파일에서도 메모리가 일정하게 유지됩니다.

readline을 사용한 스트림 읽기
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function streamJsonl(filePath, onRecord) {
const rl = createInterface({
input: createReadStream(filePath, 'utf-8'),
crlfDelay: Infinity,
});
let count = 0;
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const record = JSON.parse(trimmed);
await onRecord(record);
count++;
} catch (err) {
console.error(`Skipping invalid line: ${err.message}`);
}
}
return count;
}
// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);

Node.js Transform 스트림을 사용하면 JSONL을 읽고, 각 레코드를 필터링하거나 보강하고, 결과를 다시 쓰는 조합 가능한 데이터 파이프라인을 구축할 수 있습니다. 이 패턴은 배압(backpressure)을 자동으로 처리하여, 대상이 따라잡지 못할 때 소스를 일시 정지합니다.

파이프라인을 위한 Transform 스트림
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
// Split incoming chunks into individual lines
class LineSplitter extends Transform {
constructor() {
super({ readableObjectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (line.trim()) this.push(line.trim());
}
cb();
}
_flush(cb) {
if (this.buffer.trim()) this.push(this.buffer.trim());
cb();
}
}
// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(line, encoding, cb) {
try {
const record = JSON.parse(line);
const result = this.transformFn(record);
if (result) cb(null, JSON.stringify(result) + '\n');
else cb(); // Filter out null results
} catch (err) {
cb(); // Skip invalid lines
}
}
}
// Build the pipeline: read -> split -> transform -> write
await pipeline(
createReadStream('input.jsonl', 'utf-8'),
new LineSplitter(),
new JsonlTransform((record) => {
if (record.level === 'error') {
return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
}
return null; // Filter out non-error records
}),
createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');

Python에서 JSONL 스트리밍

Python 제너레이터는 JSONL 데이터를 스트리밍하는 우아한 방법을 제공합니다. 제너레이터는 한 번에 하나의 레코드를 yield하고 소비자가 다음 값을 요청할 때만 진행하므로 메모리 사용량이 최소한으로 유지됩니다. 내장 json 모듈과 결합하면 몇 줄의 코드만으로 강력한 스트리밍 파이프라인을 구축할 수 있습니다.

Python 제너레이터 함수는 표준 for 루프를 사용하여 파일을 줄 단위로 읽습니다. next() 호출마다 정확히 한 줄을 읽고 파싱하여 결과를 yield합니다. 파일 핸들은 열린 상태로 유지되며 소비자가 레코드를 가져올 때만 위치가 진행됩니다.

제너레이터 기반 스트리밍
import json
from typing import Generator, Any
def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
"""Stream JSONL records one at a time using a generator."""
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"Skipping line {line_num}: {e}")
# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
if record.get('level') == 'error':
print(f"ERROR: {record['message']}")
# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")

Python itertools를 사용하면 중간 결과를 구체화하지 않고 필터링, 배칭, 슬라이싱과 같은 스트리밍 연산을 조합할 수 있습니다. 체인의 각 단계는 한 번에 하나의 레코드를 처리하므로 사용 가능한 RAM보다 큰 파일도 처리할 수 있습니다.

itertools를 사용한 체인 처리
import json
import itertools
from typing import Generator, Iterable
def stream_jsonl(path: str) -> Generator[dict, None, None]:
with open(path, 'r') as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
"""Filter records where key equals value."""
for record in records:
if record.get(key) == value:
yield record
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""Yield successive batches of the given size."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
print(f"Batch {i + 1}: {len(error_batch)} errors")
# Send batch to alerting system, database, etc.
for record in error_batch:
print(f" {record['timestamp']} - {record['path']}")

JSONL을 활용한 Server-Sent Events

Server-Sent Events(SSE)는 서버에서 클라이언트로 실시간 업데이트를 푸시하기 위한 간단한 HTTP 기반 프로토콜입니다. 각 SSE 이벤트는 data 필드를 포함하며, JSONL은 각 레코드가 클라이언트가 즉시 파싱할 수 있는 완전한 JSON 객체이므로 자연스러운 페이로드 형식입니다. WebSocket과 달리 SSE는 표준 HTTP를 통해 작동하며 특별한 프록시 구성이 필요 없고 실패 시 자동으로 재연결합니다.

서버는 Content-Type을 text/event-stream으로 설정하고 각 JSONL 레코드를 SSE data 필드로 전송합니다. 연결은 열린 상태로 유지되며 새 이벤트가 사용 가능해지면 서버가 푸시합니다.

SSE 서버 (Node.js / Express)
import express from 'express';
const app = express();
// Simulate a real-time data source
function* generateMetrics() {
let id = 0;
while (true) {
yield {
id: ++id,
cpu: Math.random() * 100,
memory: Math.random() * 16384,
timestamp: new Date().toISOString(),
};
}
}
app.get('/api/stream/metrics', (req, res) => {
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();
const metrics = generateMetrics();
const interval = setInterval(() => {
const record = metrics.next().value;
// Each SSE event contains one JSONL record
res.write(`data: ${JSON.stringify(record)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => console.log('SSE server on :3000'));

브라우저는 EventSource API를 사용하여 SSE 엔드포인트에 연결합니다. 수신되는 각 이벤트에는 실시간으로 파싱되고 렌더링되는 JSONL 레코드가 포함됩니다. EventSource는 연결이 끊어지면 자동으로 재연결을 처리합니다.

SSE 클라이언트 (브라우저)
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');
// Parse each JSONL record as it arrives
source.onmessage = (event) => {
const record = JSON.parse(event.data);
console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
// Update your UI in real time
updateDashboard(record);
};
source.onerror = (err) => {
console.error('SSE connection error:', err);
// EventSource automatically reconnects
};
// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
const response = await fetch(url);
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const dataLine = event.split('\n').find(l => l.startsWith('data: '));
if (dataLine) {
const record = JSON.parse(dataLine.slice(6));
onRecord(record);
}
}
}
}
await fetchSSE('/api/stream/metrics', (record) => {
console.log('Metric:', record);
});

WebSocket + JSONL 스트리밍

SSE가 서버에서 클라이언트로의 푸시를 처리하는 반면, WebSocket은 양방향 통신을 가능하게 하여 양쪽 모두 언제든지 JSONL 레코드를 보낼 수 있습니다. 이는 협업 편집, 사용자 명령이 있는 실시간 대시보드, 양방향 데이터 동기화와 같은 인터랙티브 애플리케이션에 이상적입니다. 각 WebSocket 메시지는 단일 JSONL 레코드를 포함하므로 양쪽 모두에서 파싱이 간단합니다.

JSONL 메시지를 사용하는 WebSocket 서버
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Send JSONL records to the client
const interval = setInterval(() => {
const record = {
type: 'metric',
value: Math.random() * 100,
timestamp: new Date().toISOString(),
};
ws.send(JSON.stringify(record));
}, 500);
// Receive JSONL records from the client
ws.on('message', (data) => {
try {
const record = JSON.parse(data.toString());
console.log(`Received: ${record.type}`, record);
// Echo back an acknowledgment
ws.send(JSON.stringify({
type: 'ack',
originalId: record.id,
processedAt: new Date().toISOString(),
}));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
// const record = JSON.parse(e.data);
// console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));

각 WebSocket 메시지는 메시지당 하나의 레코드라는 JSONL 규칙을 따르는 단일 JSON 객체입니다. 서버는 500ms 간격으로 클라이언트에 메트릭을 스트리밍하면서 클라이언트로부터의 명령도 수신합니다. 양방향 모두 동일한 JSON 직렬화를 사용하므로 프로토콜이 대칭적이고 디버깅이 용이합니다.

실전 로그 스트리밍

JSONL 스트리밍의 가장 일반적인 실제 사용 사례 중 하나는 중앙 집중식 로그 수집입니다. 애플리케이션은 구조화된 로그 항목을 JSONL 줄로 stdout이나 파일에 쓰고, 수집기 에이전트가 출력을 실시간으로 테일링하여 중앙 시스템으로 레코드를 전달합니다. 이 패턴은 Docker, Kubernetes 및 대부분의 클라우드 로깅 서비스에서 사용됩니다.

실시간 JSONL 로그 수집기
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';
class JsonlLogCollector {
constructor(filePath, handlers = {}) {
this.filePath = filePath;
this.handlers = handlers;
this.position = 0;
this.running = false;
}
async start() {
this.running = true;
// Read existing content first
await this.readFrom(0);
// Watch for new appends
this.watch();
}
async readFrom(startPos) {
const rl = createInterface({
input: createReadStream(this.filePath, {
encoding: 'utf-8',
start: startPos,
}),
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) continue;
try {
const record = JSON.parse(line);
await this.dispatch(record);
} catch (err) {
this.handlers.onError?.(err, line);
}
}
const info = await stat(this.filePath);
this.position = info.size;
}
watch() {
watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
if (!this.running) return;
if (curr.size > this.position) {
await this.readFrom(this.position);
}
});
}
async dispatch(record) {
const level = record.level || 'info';
this.handlers[level]?.(record);
this.handlers.onRecord?.(record);
}
stop() {
this.running = false;
}
}
// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
error: (r) => console.error(`[ERROR] ${r.message}`),
warn: (r) => console.warn(`[WARN] ${r.message}`),
onRecord: (r) => forwardToElasticsearch(r),
onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();

이 수집기는 JSONL 로그 파일을 실시간으로 테일링하며 로그 레벨에 따라 레코드를 핸들러에 디스패치합니다. 초기 캐치업 패스 이후에는 새로운 콘텐츠만 읽도록 파일 위치를 추적합니다. 프로덕션에서는 파일 로테이션 감지, 그레이스풀 셧다운, 중앙 로깅 시스템에 대한 네트워크 호출을 줄이기 위한 배치 전달을 추가해야 합니다.

무료 JSONL 도구 사용해보기

스트리밍 파이프라인을 구축하기 전에 JSONL 데이터를 검사하거나 변환하고 싶으신가요? 무료 온라인 도구를 사용하여 브라우저에서 직접 JSONL 파일을 보고, 변환하고, 포맷하세요. 모든 처리는 로컬에서 이루어지므로 데이터 프라이버시가 보장됩니다.

온라인으로 JSONL 파일 작업하기

브라우저에서 최대 1GB의 JSONL 파일을 보고, 검증하고, 변환하세요. 업로드 불필요, 100% 프라이빗.

자주 묻는 질문

JSONL 스트리밍 — 실시간 JSON Lines 처리 (Node/Python) | jsonl.co