ETL 파이프라인에서의 JSONL: Kafka, Airflow & 데이터 웨어하우스

현대 ETL 파이프라인에서 JSONL(JSON Lines)을 교환 형식으로 사용하는 실습 가이드입니다. Kafka에서 JSONL 이벤트를 생산하고 소비하는 방법, Airflow DAG에서 JSONL 변환을 오케스트레이션하는 방법, Snowflake, BigQuery, Redshift에 JSONL 데이터를 로드하는 방법을 배우세요.

최종 업데이트: 2026년 2월

왜 JSONL이 이상적인 ETL 교환 형식인가

ETL(Extract-Transform-Load) 파이프라인은 동일한 스키마를 공유하지 않는 시스템 간에 데이터를 이동시킵니다. JSONL은 모든 줄이 독립된 JSON 문서이기 때문에 이 작업에 특히 적합합니다. 잃어버릴 헤더가 없고, 이스케이프할 구분자가 없으며, 부분 실패 후 재조립해야 할 다중 줄 레코드가 없습니다. 각 줄을 독립적으로 검증, 변환, 라우팅할 수 있으며, 이것이 바로 Kafka와 Airflow 같은 분산 시스템에 필요한 것입니다.

CSV와 달리 JSONL은 중첩 구조, 배열, 타입이 지정된 값을 모호함 없이 보존합니다. Parquet나 Avro와 달리 JSONL은 사람이 읽을 수 있으며 검사를 위한 특별한 도구가 필요 없습니다. 이러한 특성으로 JSONL은 파이프라인 단계 간의 접착제 역할을 하는 자연스러운 형식입니다: Kafka 토픽은 JSONL 메시지를 전달하고, Airflow 작업은 객체 스토리지에서 JSONL 파일을 읽고 쓰며, 데이터 웨어하우스는 네이티브 벌크 로드 명령을 통해 JSONL을 수집합니다. 이 가이드에서는 각 단계에 대한 구체적인 패턴과 프로덕션 수준의 코드를 배우게 됩니다.

Kafka + JSONL: 이벤트 스트리밍

Apache Kafka는 실시간 ETL의 백본입니다. 프로듀서는 JSONL로 인코딩된 메시지(Kafka 메시지당 하나의 JSON 객체)로 이벤트를 발행하고, 컨슈머는 이를 줄 단위로 다운스트림 시스템에 읽어들입니다. 각 Kafka 메시지가 단일 JSONL 레코드에 매핑되므로 형식이 Kafka의 로그 기반 아키텍처와 완벽하게 맞습니다. 직렬화가 간단하며, 컨슈머는 버퍼링 없이 각 메시지를 독립적으로 파싱할 수 있습니다.

Python 딕셔너리를 JSONL로 인코딩된 메시지로 직렬화하는 Kafka 프로듀서입니다. 각 메시지는 Kafka 토픽에 전송되는 단일 JSON 줄입니다. value_serializer가 인코딩을 자동으로 처리하므로 send() 메서드에 dict를 전달하기만 하면 됩니다.

Kafka JSONL 프로듀서
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Read a JSONL file and publish each line as a Kafka message
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('All events published to Kafka')

JSONL 메시지를 읽고 배치로 로컬 JSONL 파일에 쓰는 Kafka 컨슈머입니다. 배치 처리는 I/O 오버헤드를 줄이고 다운스트림 시스템이 더 큰 청크를 한 번에 처리할 수 있게 합니다. 컨슈머는 배치가 완전히 기록된 후에만 오프셋을 커밋하여 최소 한 번의 전달을 보장합니다.

배치 쓰기를 포함한 Kafka JSONL 컨슈머
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Write batch to JSONL file
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Committed batch of {len(batch)} records')
batch.clear()
# Flush remaining records
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: JSONL 작업 오케스트레이션

Apache Airflow는 배치 ETL의 표준 오케스트레이션 도구입니다. 일반적인 JSONL 기반 Airflow 파이프라인은 세 단계로 구성됩니다: 원시 데이터를 JSONL로 추출, JSONL 파일 변환(필터, 보강, 재구성), 결과를 데이터 웨어하우스 또는 객체 스토어에 로드. 각 단계는 JSONL 파일을 읽고 쓰므로 중간 상태를 검사할 수 있고 개별 작업을 독립적으로 다시 실행할 수 있습니다.

API에서 데이터를 추출하고, JSONL로 변환하고, 결과를 클라우드 스토리지에 로드하는 완전한 Airflow DAG입니다. 각 작업은 공유 스토리지에 저장된 JSONL 파일을 통해 통신하므로 파이프라인을 쉽게 디버깅하고 재시도할 수 있습니다.

Airflow DAG: JSONL ETL 파이프라인
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Extract data from API and write JSONL to local disk."""
ds = context['ds'] # execution date: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Read raw JSONL, filter and enrich, write cleaned JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Upload cleaned JSONL to S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

메모리에 들어가지 않는 대용량 JSONL 파일의 경우, 한 줄씩 읽고 쓰는 스트리밍 변환을 사용하세요. 이 패턴은 파일 크기에 관계없이 메모리 사용량을 일정하게 유지하며, 워커 리소스가 제한될 수 있는 Airflow 작업 내에서 잘 작동합니다.

스트리밍 JSONL 변환 작업
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Stream-transform a JSONL file with a user-defined function.
Return None from transform_fn to skip a record."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Usage inside an Airflow task
def enrich(record):
if record.get('amount', 0) < 0:
return None # skip invalid
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Processed: {stats}')

데이터 웨어하우스에 JSONL 로드하기

현대 데이터 웨어하우스는 JSONL 파일 수집에 대한 최상급 지원을 제공합니다. Snowflake, BigQuery, Redshift 모두 클라우드 스토리지에서 직접 JSONL을 로드하고, JSON을 즉석에서 파싱하고, 결과를 구조화된 테이블에 저장할 수 있습니다. 이를 통해 중간 CSV 변환 단계가 필요 없어지고 중첩 데이터 타입이 보존됩니다.

Snowflake

COPY INTO

Snowflake는 S3, GCS 또는 Azure Blob 스테이지에서 COPY INTO 명령을 통해 JSONL을 수집합니다. FILE_FORMAT 옵션에 TYPE = 'JSON'을 사용하여 각 줄을 별도의 JSON 문서로 파싱합니다. VARIANT 컬럼 타입은 반정형 데이터를 네이티브로 저장합니다.

BigQuery

네이티브

BigQuery는 자동 스키마 감지로 Google Cloud Storage에서 JSONL 파일을 로드합니다. bq load 명령에 --source_format=NEWLINE_DELIMITED_JSON을 사용하면 JSONL을 네이티브로 처리합니다. BigQuery는 JSON 값에서 컬럼 타입을 자동 감지할 수도 있습니다.

Redshift

COPY

Amazon Redshift는 FORMAT AS JSON을 사용한 COPY 명령으로 S3에서 JSONL 로딩을 지원합니다. JSONPaths 파일을 제공하여 JSON 키를 테이블 컬럼에 매핑할 수 있어 로딩 프로세스를 세밀하게 제어할 수 있습니다.

S3 스테이지에서 Snowflake 테이블로 JSONL 파일을 로드합니다. 각 줄은 파싱된 JSON을 포함하는 VARIANT 컬럼을 가진 행이 됩니다. 그런 다음 Snowflake SQL을 사용하여 VARIANT 컬럼을 타입이 지정된 컬럼으로 평탄화할 수 있습니다.

Snowflake: S3에서 JSONL COPY
-- Create a stage pointing to your S3 bucket
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Create the target table
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Load JSONL files from the stage
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Query the loaded data
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

BigQuery Python 클라이언트를 사용하여 Google Cloud Storage에서 BigQuery 테이블로 JSONL 파일을 로드합니다. 클라이언트가 스키마 감지, 파티셔닝, 오류 보고를 처리합니다. 이 접근 방식은 Airflow 작업과 잘 통합됩니다.

BigQuery: Python 클라이언트로 JSONL 로드
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Load from GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Wait for completion
table = client.get_table(table_ref)
print(f'Loaded {load_job.output_rows} rows')
print(f'Table now has {table.num_rows} total rows')

장애 허용성과 멱등성

ETL 파이프라인은 실패합니다. 네트워크가 끊기고, API가 타임아웃되고, 워커가 메모리 부족이 됩니다. JSONL은 모든 줄이 독립적이기 때문에 복구를 간단하게 만듭니다. 위치(줄 번호)를 체크포인트하고, 마지막 성공한 줄부터 재개하고, 데이터 재처리를 피할 수 있습니다. 멱등한 쓰기와 결합하면, 최소 한 번의 전달에서도 정확히 한 번의 시맨틱을 제공합니다.

줄 번호로 진행 상황을 추적하는 체크포인트 기반 프로세서입니다. 파이프라인이 실패하면 전체 파일을 재처리하는 대신 마지막으로 커밋된 체크포인트에서 재개합니다. 체크포인트 파일은 마지막으로 성공적으로 처리된 줄 번호를 포함하는 간단한 텍스트 파일입니다.

체크포인트 기반 JSONL 처리
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Read the last committed line number."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomically save the current checkpoint."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomic on POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Process a JSONL file with checkpoint-based resumption."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # skip already-processed lines
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- your transform logic ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Checkpoint at line {line_num}')
# Final checkpoint
save_checkpoint(checkpoint_path, line_num)
print(f'Done. Processed {processed} records from line {start_line + 1}')
# Usage
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

체크포인트 패턴은 모든 JSONL 파이프라인 단계에서 작동합니다. Kafka 컨슈머의 경우 파일 체크포인트를 커밋된 오프셋으로 대체합니다. Airflow의 경우 체크포인트를 XCom이나 외부 데이터베이스에 저장하여 재시도된 작업이 올바른 위치에서 재개되도록 합니다. 핵심 인사이트는 JSONL의 줄 기반 형식이 위치 추적을 간단하게 만든다는 것입니다: 단일 정수(줄 번호 또는 바이트 오프셋)만으로 재개할 수 있습니다.

로딩 전 JSONL 검증하기

잘못된 형식의 레코드가 파이프라인에 도달하기 전에 잡으세요. 무료 온라인 도구로 브라우저에서 JSONL 파일을 검증, 보기, 변환하세요.

JSONL 파일을 즉시 검사하기

브라우저에서 최대 1GB의 JSONL 파일을 보고, 검증하고, 변환하세요. 업로드 불필요, 100% 프라이빗.

자주 묻는 질문

JSONL ETL 파이프라인 — Kafka, Airflow, BigQuery, Snowflake | j...