ETL 파이프라인에서의 JSONL: Kafka, Airflow & 데이터 웨어하우스
현대 ETL 파이프라인에서 JSONL(JSON Lines)을 교환 형식으로 사용하는 실습 가이드입니다. Kafka에서 JSONL 이벤트를 생산하고 소비하는 방법, Airflow DAG에서 JSONL 변환을 오케스트레이션하는 방법, Snowflake, BigQuery, Redshift에 JSONL 데이터를 로드하는 방법을 배우세요.
최종 업데이트: 2026년 2월
왜 JSONL이 이상적인 ETL 교환 형식인가
ETL(Extract-Transform-Load) 파이프라인은 동일한 스키마를 공유하지 않는 시스템 간에 데이터를 이동시킵니다. JSONL은 모든 줄이 독립된 JSON 문서이기 때문에 이 작업에 특히 적합합니다. 잃어버릴 헤더가 없고, 이스케이프할 구분자가 없으며, 부분 실패 후 재조립해야 할 다중 줄 레코드가 없습니다. 각 줄을 독립적으로 검증, 변환, 라우팅할 수 있으며, 이것이 바로 Kafka와 Airflow 같은 분산 시스템에 필요한 것입니다.
CSV와 달리 JSONL은 중첩 구조, 배열, 타입이 지정된 값을 모호함 없이 보존합니다. Parquet나 Avro와 달리 JSONL은 사람이 읽을 수 있으며 검사를 위한 특별한 도구가 필요 없습니다. 이러한 특성으로 JSONL은 파이프라인 단계 간의 접착제 역할을 하는 자연스러운 형식입니다: Kafka 토픽은 JSONL 메시지를 전달하고, Airflow 작업은 객체 스토리지에서 JSONL 파일을 읽고 쓰며, 데이터 웨어하우스는 네이티브 벌크 로드 명령을 통해 JSONL을 수집합니다. 이 가이드에서는 각 단계에 대한 구체적인 패턴과 프로덕션 수준의 코드를 배우게 됩니다.
Kafka + JSONL: 이벤트 스트리밍
Apache Kafka는 실시간 ETL의 백본입니다. 프로듀서는 JSONL로 인코딩된 메시지(Kafka 메시지당 하나의 JSON 객체)로 이벤트를 발행하고, 컨슈머는 이를 줄 단위로 다운스트림 시스템에 읽어들입니다. 각 Kafka 메시지가 단일 JSONL 레코드에 매핑되므로 형식이 Kafka의 로그 기반 아키텍처와 완벽하게 맞습니다. 직렬화가 간단하며, 컨슈머는 버퍼링 없이 각 메시지를 독립적으로 파싱할 수 있습니다.
Python 딕셔너리를 JSONL로 인코딩된 메시지로 직렬화하는 Kafka 프로듀서입니다. 각 메시지는 Kafka 토픽에 전송되는 단일 JSON 줄입니다. value_serializer가 인코딩을 자동으로 처리하므로 send() 메서드에 dict를 전달하기만 하면 됩니다.
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Read a JSONL file and publish each line as a Kafka messagewith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('All events published to Kafka')
JSONL 메시지를 읽고 배치로 로컬 JSONL 파일에 쓰는 Kafka 컨슈머입니다. 배치 처리는 I/O 오버헤드를 줄이고 다운스트림 시스템이 더 큰 청크를 한 번에 처리할 수 있게 합니다. 컨슈머는 배치가 완전히 기록된 후에만 오프셋을 커밋하여 최소 한 번의 전달을 보장합니다.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Write batch to JSONL filewith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Committed batch of {len(batch)} records')batch.clear()# Flush remaining recordsif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: JSONL 작업 오케스트레이션
Apache Airflow는 배치 ETL의 표준 오케스트레이션 도구입니다. 일반적인 JSONL 기반 Airflow 파이프라인은 세 단계로 구성됩니다: 원시 데이터를 JSONL로 추출, JSONL 파일 변환(필터, 보강, 재구성), 결과를 데이터 웨어하우스 또는 객체 스토어에 로드. 각 단계는 JSONL 파일을 읽고 쓰므로 중간 상태를 검사할 수 있고 개별 작업을 독립적으로 다시 실행할 수 있습니다.
API에서 데이터를 추출하고, JSONL로 변환하고, 결과를 클라우드 스토리지에 로드하는 완전한 Airflow DAG입니다. 각 작업은 공유 스토리지에 저장된 JSONL 파일을 통해 통신하므로 파이프라인을 쉽게 디버깅하고 재시도할 수 있습니다.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Extract data from API and write JSONL to local disk."""ds = context['ds'] # execution date: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Read raw JSONL, filter and enrich, write cleaned JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Upload cleaned JSONL to S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
메모리에 들어가지 않는 대용량 JSONL 파일의 경우, 한 줄씩 읽고 쓰는 스트리밍 변환을 사용하세요. 이 패턴은 파일 크기에 관계없이 메모리 사용량을 일정하게 유지하며, 워커 리소스가 제한될 수 있는 Airflow 작업 내에서 잘 작동합니다.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Stream-transform a JSONL file with a user-defined function.Return None from transform_fn to skip a record."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Usage inside an Airflow taskdef enrich(record):if record.get('amount', 0) < 0:return None # skip invalidrecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Processed: {stats}')
데이터 웨어하우스에 JSONL 로드하기
현대 데이터 웨어하우스는 JSONL 파일 수집에 대한 최상급 지원을 제공합니다. Snowflake, BigQuery, Redshift 모두 클라우드 스토리지에서 직접 JSONL을 로드하고, JSON을 즉석에서 파싱하고, 결과를 구조화된 테이블에 저장할 수 있습니다. 이를 통해 중간 CSV 변환 단계가 필요 없어지고 중첩 데이터 타입이 보존됩니다.
Snowflake
COPY INTOSnowflake는 S3, GCS 또는 Azure Blob 스테이지에서 COPY INTO 명령을 통해 JSONL을 수집합니다. FILE_FORMAT 옵션에 TYPE = 'JSON'을 사용하여 각 줄을 별도의 JSON 문서로 파싱합니다. VARIANT 컬럼 타입은 반정형 데이터를 네이티브로 저장합니다.
BigQuery
네이티브BigQuery는 자동 스키마 감지로 Google Cloud Storage에서 JSONL 파일을 로드합니다. bq load 명령에 --source_format=NEWLINE_DELIMITED_JSON을 사용하면 JSONL을 네이티브로 처리합니다. BigQuery는 JSON 값에서 컬럼 타입을 자동 감지할 수도 있습니다.
Redshift
COPYAmazon Redshift는 FORMAT AS JSON을 사용한 COPY 명령으로 S3에서 JSONL 로딩을 지원합니다. JSONPaths 파일을 제공하여 JSON 키를 테이블 컬럼에 매핑할 수 있어 로딩 프로세스를 세밀하게 제어할 수 있습니다.
S3 스테이지에서 Snowflake 테이블로 JSONL 파일을 로드합니다. 각 줄은 파싱된 JSON을 포함하는 VARIANT 컬럼을 가진 행이 됩니다. 그런 다음 Snowflake SQL을 사용하여 VARIANT 컬럼을 타입이 지정된 컬럼으로 평탄화할 수 있습니다.
-- Create a stage pointing to your S3 bucketCREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Create the target tableCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Load JSONL files from the stageCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Query the loaded dataSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
BigQuery Python 클라이언트를 사용하여 Google Cloud Storage에서 BigQuery 테이블로 JSONL 파일을 로드합니다. 클라이언트가 스키마 감지, 파티셔닝, 오류 보고를 처리합니다. 이 접근 방식은 Airflow 작업과 잘 통합됩니다.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Load from GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Wait for completiontable = client.get_table(table_ref)print(f'Loaded {load_job.output_rows} rows')print(f'Table now has {table.num_rows} total rows')
장애 허용성과 멱등성
ETL 파이프라인은 실패합니다. 네트워크가 끊기고, API가 타임아웃되고, 워커가 메모리 부족이 됩니다. JSONL은 모든 줄이 독립적이기 때문에 복구를 간단하게 만듭니다. 위치(줄 번호)를 체크포인트하고, 마지막 성공한 줄부터 재개하고, 데이터 재처리를 피할 수 있습니다. 멱등한 쓰기와 결합하면, 최소 한 번의 전달에서도 정확히 한 번의 시맨틱을 제공합니다.
줄 번호로 진행 상황을 추적하는 체크포인트 기반 프로세서입니다. 파이프라인이 실패하면 전체 파일을 재처리하는 대신 마지막으로 커밋된 체크포인트에서 재개합니다. 체크포인트 파일은 마지막으로 성공적으로 처리된 줄 번호를 포함하는 간단한 텍스트 파일입니다.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Read the last committed line number."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomically save the current checkpoint."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomic on POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Process a JSONL file with checkpoint-based resumption."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # skip already-processed linesline = line.strip()if not line:continuerecord = json.loads(line)# --- your transform logic ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Checkpoint at line {line_num}')# Final checkpointsave_checkpoint(checkpoint_path, line_num)print(f'Done. Processed {processed} records from line {start_line + 1}')# Usageprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
체크포인트 패턴은 모든 JSONL 파이프라인 단계에서 작동합니다. Kafka 컨슈머의 경우 파일 체크포인트를 커밋된 오프셋으로 대체합니다. Airflow의 경우 체크포인트를 XCom이나 외부 데이터베이스에 저장하여 재시도된 작업이 올바른 위치에서 재개되도록 합니다. 핵심 인사이트는 JSONL의 줄 기반 형식이 위치 추적을 간단하게 만든다는 것입니다: 단일 정수(줄 번호 또는 바이트 오프셋)만으로 재개할 수 있습니다.
로딩 전 JSONL 검증하기
잘못된 형식의 레코드가 파이프라인에 도달하기 전에 잡으세요. 무료 온라인 도구로 브라우저에서 JSONL 파일을 검증, 보기, 변환하세요.