JSONL w potokach ETL: Kafka, Airflow i hurtownie danych

Praktyczny przewodnik uzywania JSONL (JSON Lines) jako formatu wymiany w nowoczesnych potokach ETL. Dowiedz sie, jak produkowac i konsumowac zdarzenia JSONL w Kafka, orkiestrowac transformacje JSONL w DAG-ach Airflow i ladowac dane JSONL do Snowflake, BigQuery i Redshift.

Ostatnia aktualizacja: luty 2026

Dlaczego JSONL to idealny format wymiany ETL

Potoki Extract-Transform-Load (ETL) przenosa dane miedzy systemami, ktore rzadko wspoldziela ten sam schemat. JSONL jest wyjatkowo odpowiedni do tego zadania, poniewaz kazda linia jest samodzielnym dokumentem JSON. Nie ma naglowkow do utracenia, ogranicznikow do ucieczki ani wieloliniowych rekordow do ponownego zlozenia po czesciowej awarii. Kazda linia moze byc walidowana, transformowana i kierowana niezaleznie, co jest dokladnie tym, czego potrzebuja systemy rozproszone, takie jak Kafka i Airflow.

W przeciwienstwie do CSV, JSONL zachowuje zagniezdzene struktury, tablice i typowane wartosci bez dwuznacznosci. W przeciwienstwie do Parquet lub Avro, JSONL jest czytelny dla czlowieka i nie wymaga specjalnych narzedzi do inspekcji. Te wlasciwosci czynia JSONL naturalnym formatem do laczenia etapow potoku: tematy Kafka przenoza wiadomosci JSONL, zadania Airflow czytaja i zapisuja pliki JSONL do magazynu obiektow, a hurtownie danych pozyskuja JSONL poprzez natywne komendy ladowania masowego. W tym przewodniku poznasz konkretne wzorce dla kazdego z tych etapow z kodem gotowym do produkcji.

Kafka + JSONL: strumieniowanie zdarzen

Apache Kafka to podstawa ETL w czasie rzeczywistym. Producenci emituja zdarzenia jako wiadomosci zakodowane w JSONL (jeden obiekt JSON na wiadomosc Kafka), a konsumenci czytaja je linia po linii do dalszych systemow. Poniewaz kazda wiadomosc Kafka to pojedynczy rekord JSONL, format idealnie pasuje do architektur opartej na logu Kafka. Serializacja jest trywialna, a konsumenci moga parsowac kazda wiadomosc niezaleznie bez buforowania.

Producent Kafka, ktory serializuje slowniki Python jako wiadomosci zakodowane w JSONL. Kazda wiadomosc to pojedyncza linia JSON wyslana do tematu Kafka. value_serializer obsluguje kodowanie automatycznie, wiec po prostu przekazujesz dict do metody send().

Producent JSONL Kafka
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
)
# Odczytaj plik JSONL i opublikuj kazda linie jako wiadomosc Kafka
with open('events.jsonl', 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
producer.send('etl-events', value=event)
producer.flush()
print('Wszystkie zdarzenia opublikowane do Kafka')

Konsument Kafka, ktory odczytuje wiadomosci JSONL i zapisuje je do lokalnego pliku JSONL w partiach. Przetwarzanie wsadowe zmniejsza narzut I/O i pozwala dalszym systemom przetwarzac wieksze porcje naraz. Konsument zatwierdza offsety dopiero po pelnym zapisie partii, zapewniajac dostarczenie co najmniej jednokrotne.

Konsument JSONL Kafka z zapisem wsadowym
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'etl-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
group_id='etl-consumer-group',
enable_auto_commit=False,
auto_offset_reset='earliest',
)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message.value)
if len(batch) >= BATCH_SIZE:
# Zapisz partie do pliku JSONL
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()
print(f'Zatwierdzona partia {len(batch)} rekordow')
batch.clear()
# Oproznij pozostale rekordy
if batch:
with open('output/batch.jsonl', 'a') as f:
for record in batch:
f.write(json.dumps(record) + '\n')
consumer.commit()

Apache Airflow: orkiestracja zadan JSONL

Apache Airflow to standardowe narzedzie orkiestracji dla wsadowego ETL. Typowy potok Airflow oparty na JSONL ma trzy etapy: ekstrakcja surowych danych do JSONL, transformacja pliku JSONL (filtrowanie, wzbogacanie, przeksztalcanie) i ladowanie wyniku do hurtowni danych lub magazynu obiektow. Kazdy etap czyta i zapisuje pliki JSONL, co sprawia, ze stan posredni jest mozliwy do inspekcji, a poszczegolne zadania mozna ponownie uruchamiac niezaleznie.

Kompletny DAG Airflow, ktory ekstrahuje dane z API, transformuje je jako JSONL i laduje wynik do przechowywania w chmurze. Kazde zadanie komunikuje sie poprzez pliki JSONL przechowywane we wspolnym magazynie, co ulatwia debugowanie i ponawianie potoku.

DAG Airflow: potok ETL JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import json
import requests
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract(**context):
"""Ekstrahuj dane z API i zapisz JSONL na dysk lokalny."""
ds = context['ds'] # data wykonania: YYYY-MM-DD
response = requests.get(
'https://api.example.com/events',
params={'date': ds},
)
events = response.json()
path = f'/tmp/raw_{ds}.jsonl'
with open(path, 'w') as f:
for event in events:
f.write(json.dumps(event) + '\n')
return path
def transform(**context):
"""Odczytaj surowy JSONL, filtruj i wzbogac, zapisz oczyszczony JSONL."""
ds = context['ds']
input_path = f'/tmp/raw_{ds}.jsonl'
output_path = f'/tmp/clean_{ds}.jsonl'
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
record = json.loads(line)
if record.get('status') != 'valid':
continue
record['processed_at'] = datetime.utcnow().isoformat()
fout.write(json.dumps(record) + '\n')
return output_path
def load(**context):
"""Przeslij oczyszczony JSONL do S3."""
ds = context['ds']
local_path = f'/tmp/clean_{ds}.jsonl'
s3 = S3Hook(aws_conn_id='aws_default')
s3.load_file(
filename=local_path,
key=f'etl/events/{ds}/data.jsonl',
bucket_name='data-lake',
replace=True,
)
with DAG(
'jsonl_etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t_extract = PythonOperator(task_id='extract', python_callable=extract)
t_transform = PythonOperator(task_id='transform', python_callable=transform)
t_load = PythonOperator(task_id='load', python_callable=load)
t_extract >> t_transform >> t_load

Dla duzych plikow JSONL, ktore nie mieszcza sie w pamieci, uzyj strumieniowej transformacji, ktora przetwarza jedna linie naraz. Ten wzorzec utrzymuje stale zuzycie pamieci niezaleznie od rozmiaru pliku i dobrze sprawdza sie w zadaniach Airflow, gdzie zasoby workerow moga byc ograniczone.

Strumieniowe zadanie transformacji JSONL
import json
from typing import Callable
def stream_transform(
input_path: str,
output_path: str,
transform_fn: Callable[[dict], dict | None],
batch_size: int = 5000,
) -> dict:
"""Strumieniowo transformuj plik JSONL za pomoca funkcji zdefiniowanej przez uzytkownika.
Zwroc None z transform_fn, aby pominac rekord."""
stats = {'input': 0, 'output': 0, 'skipped': 0}
batch = []
with open(input_path) as fin, open(output_path, 'w') as fout:
for line in fin:
line = line.strip()
if not line:
continue
stats['input'] += 1
record = json.loads(line)
result = transform_fn(record)
if result is None:
stats['skipped'] += 1
continue
batch.append(json.dumps(result))
if len(batch) >= batch_size:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
batch.clear()
if batch:
fout.write('\n'.join(batch) + '\n')
stats['output'] += len(batch)
return stats
# Uzycie wewnatrz zadania Airflow
def enrich(record):
if record.get('amount', 0) < 0:
return None # pomin nieprawidlowe
record['currency'] = record.get('currency', 'USD').upper()
return record
stats = stream_transform(
'/tmp/raw.jsonl',
'/tmp/enriched.jsonl',
transform_fn=enrich,
)
print(f'Przetworzono: {stats}')

Ladowanie JSONL do hurtowni danych

Nowoczesne hurtownie danych maja pierwszorzedna obsluge pozyskiwania plikow JSONL. Snowflake, BigQuery i Redshift moga ladowac JSONL bezposrednio z przechowywania w chmurze, parsowac JSON w locie i przechowywac wyniki w ustrukturyzowanych tabelach. Eliminuje to potrzebe posredniego kroku konwersji CSV i zachowuje zagniezdzene typy danych.

Snowflake

COPY INTO

Snowflake pozyskuje JSONL za pomoca komendy COPY INTO ze stageow S3, GCS lub Azure Blob. Uzyj opcji FILE_FORMAT z TYPE = 'JSON', aby parsowac kazda linie jako oddzielny dokument JSON. Typ kolumny VARIANT przechowuje polusrtukturalne dane natywnie.

BigQuery

Natywny

BigQuery laduje pliki JSONL z Google Cloud Storage z automatycznym wykrywaniem schematu. Komenda bq load z --source_format=NEWLINE_DELIMITED_JSON obsluguje JSONL natywnie. BigQuery moze rowniez automatycznie wykrywac typy kolumn z wartosci JSON.

Redshift

COPY

Amazon Redshift obsluguje ladowanie JSONL z S3 za pomoca komendy COPY z FORMAT AS JSON. Mozesz udostepnic plik JSONPaths do mapowania kluczy JSON na kolumny tabeli, dajac Ci szczegolowa kontrole nad procesem ladowania.

Zaladuj plik JSONL ze stagea S3 do tabeli Snowflake. Kazda linia staje sie wierszem z kolumna VARIANT zawierajaca sparsowany JSON. Nastepnie mozesz splaszczyc kolumne VARIANT do typowanych kolumn za pomoca Snowflake SQL.

Snowflake: COPY JSONL z S3
-- Utworz stage wskazujacy na Twoj bucket S3
CREATE OR REPLACE STAGE etl_stage
URL = 's3://data-lake/etl/events/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);
-- Utworz tabele docelowa
CREATE TABLE IF NOT EXISTS raw_events (
data VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Zaladuj pliki JSONL ze stagea
COPY INTO raw_events (data)
FROM @etl_stage
FILE_FORMAT = (TYPE = 'JSON')
PATTERN = '.*\.jsonl'
ON_ERROR = 'CONTINUE';
-- Odpytaj zaladowane dane
SELECT
data:id::INT AS event_id,
data:user_id::STRING AS user_id,
data:event_type::STRING AS event_type,
data:timestamp::TIMESTAMP AS event_ts
FROM raw_events
LIMIT 10;

Uzyj klienta Python BigQuery do zaladowania pliku JSONL z Google Cloud Storage do tabeli BigQuery. Klient obsluguje wykrywanie schematu, partycjonowanie i raportowanie bledow. To podejscie dobrze integruje sie z zadaniami Airflow.

BigQuery: ladowanie JSONL za pomoca klienta Python
from google.cloud import bigquery
client = bigquery.Client(project='my-project')
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
max_bad_records=10,
)
# Zaladuj z GCS
uri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'
table_ref = 'my-project.analytics.raw_events'
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config,
)
load_job.result() # Czekaj na zakonczenie
table = client.get_table(table_ref)
print(f'Zaladowano {load_job.output_rows} wierszy')
print(f'Tabela ma teraz {table.num_rows} wierszy lacznie')

Tolerancja na bledy i idempotentnosc

Potoki ETL zawodza. Sieci zrywaja polaczenia, API przekraczaja limity czasu, a workery wyczerpuja pamiec. JSONL upraszcza odzyskiwanie, poniewaz kazda linia jest niezalezna. Mozesz zapisac punkt kontrolny swojej pozycji (numer linii), wznowic od ostatniej udanej linii i uniknac ponownego przetwarzania danych. W polaczeniu z idempotentnym zapisem zapewnia to semantyke dokladnie jednokrotna nawet przy dostarczeniu co najmniej jednokrotnym.

Procesor oparty na punktach kontrolnych, ktory sledzi postep wedlug numeru linii. Jesli potok zawiedzie, wznawia od ostatniego zatwierdzonego punktu kontrolnego zamiast ponownie przetwarzac caly plik. Plik punktu kontrolnego to prosty plik tekstowy zawierajacy ostatni pomyslnie przetworzony numer linii.

Przetwarzanie JSONL oparte na punktach kontrolnych
import json
import os
def get_checkpoint(checkpoint_path: str) -> int:
"""Odczytaj ostatni zatwierdzony numer linii."""
if os.path.exists(checkpoint_path):
with open(checkpoint_path) as f:
return int(f.read().strip())
return 0
def save_checkpoint(checkpoint_path: str, line_num: int):
"""Atomowo zapisz biezacy punkt kontrolny."""
tmp = checkpoint_path + '.tmp'
with open(tmp, 'w') as f:
f.write(str(line_num))
os.replace(tmp, checkpoint_path) # atomowe na POSIX
def process_with_checkpoint(
input_path: str,
output_path: str,
checkpoint_path: str,
batch_size: int = 1000,
):
"""Przetwarzaj plik JSONL z wznawianiem opartym na punktach kontrolnych."""
start_line = get_checkpoint(checkpoint_path)
processed = 0
with open(input_path) as fin, \
open(output_path, 'a') as fout:
for line_num, line in enumerate(fin, 1):
if line_num <= start_line:
continue # pomin juz przetworzone linie
line = line.strip()
if not line:
continue
record = json.loads(line)
# --- Twoja logika transformacji ---
record['etl_version'] = 'v2'
fout.write(json.dumps(record) + '\n')
processed += 1
if processed % batch_size == 0:
fout.flush()
save_checkpoint(checkpoint_path, line_num)
print(f'Punkt kontrolny w linii {line_num}')
# Koncowy punkt kontrolny
save_checkpoint(checkpoint_path, line_num)
print(f'Gotowe. Przetworzono {processed} rekordow od linii {start_line + 1}')
# Uzycie
process_with_checkpoint(
'input.jsonl',
'output.jsonl',
'checkpoint.txt',
batch_size=5000,
)

Wzorzec punktow kontrolnych dziala z kazdym etapem potoku JSONL. Dla konsumentow Kafka zastap punkt kontrolny pliku zatwierdzonymi offsetami. Dla Airflow przechowuj punkt kontrolny w XCom lub zewnetrznej bazie danych, aby ponownie uruchamiane zadania wznawialy sie od wlasciwej pozycji. Kluczowa obserwacja jest taka, ze liniowy format JSONL sprawia, ze sledzenie pozycji jest trywialne: pojedyncza liczba calkowita (numer linii lub przesuniecie bajtowe) to wszystko, czego potrzebujesz do wznowienia.

Zwaliduj swoj JSONL przed ladowaniem

Wylapuj znieksztalcone rekordy, zanim trafa do Twojego potoku. Uzyj naszych darmowych narzedzi online, aby walidowac, przegladac i konwertowac pliki JSONL w przegladarce.

Przegladaj pliki JSONL natychmiast

Przegladaj, waliduj i konwertuj pliki JSONL do 1GB bezposrednio w przegladarce. Bez przesylania, 100% prywatnosci.

Czesto zadawane pytania

Pipeline ETL JSONL — Kafka, Airflow, BigQuery, Snowflake ...