JSONL w potokach ETL: Kafka, Airflow i hurtownie danych
Praktyczny przewodnik uzywania JSONL (JSON Lines) jako formatu wymiany w nowoczesnych potokach ETL. Dowiedz sie, jak produkowac i konsumowac zdarzenia JSONL w Kafka, orkiestrowac transformacje JSONL w DAG-ach Airflow i ladowac dane JSONL do Snowflake, BigQuery i Redshift.
Ostatnia aktualizacja: luty 2026
Dlaczego JSONL to idealny format wymiany ETL
Potoki Extract-Transform-Load (ETL) przenosa dane miedzy systemami, ktore rzadko wspoldziela ten sam schemat. JSONL jest wyjatkowo odpowiedni do tego zadania, poniewaz kazda linia jest samodzielnym dokumentem JSON. Nie ma naglowkow do utracenia, ogranicznikow do ucieczki ani wieloliniowych rekordow do ponownego zlozenia po czesciowej awarii. Kazda linia moze byc walidowana, transformowana i kierowana niezaleznie, co jest dokladnie tym, czego potrzebuja systemy rozproszone, takie jak Kafka i Airflow.
W przeciwienstwie do CSV, JSONL zachowuje zagniezdzene struktury, tablice i typowane wartosci bez dwuznacznosci. W przeciwienstwie do Parquet lub Avro, JSONL jest czytelny dla czlowieka i nie wymaga specjalnych narzedzi do inspekcji. Te wlasciwosci czynia JSONL naturalnym formatem do laczenia etapow potoku: tematy Kafka przenoza wiadomosci JSONL, zadania Airflow czytaja i zapisuja pliki JSONL do magazynu obiektow, a hurtownie danych pozyskuja JSONL poprzez natywne komendy ladowania masowego. W tym przewodniku poznasz konkretne wzorce dla kazdego z tych etapow z kodem gotowym do produkcji.
Kafka + JSONL: strumieniowanie zdarzen
Apache Kafka to podstawa ETL w czasie rzeczywistym. Producenci emituja zdarzenia jako wiadomosci zakodowane w JSONL (jeden obiekt JSON na wiadomosc Kafka), a konsumenci czytaja je linia po linii do dalszych systemow. Poniewaz kazda wiadomosc Kafka to pojedynczy rekord JSONL, format idealnie pasuje do architektur opartej na logu Kafka. Serializacja jest trywialna, a konsumenci moga parsowac kazda wiadomosc niezaleznie bez buforowania.
Producent Kafka, ktory serializuje slowniki Python jako wiadomosci zakodowane w JSONL. Kazda wiadomosc to pojedyncza linia JSON wyslana do tematu Kafka. value_serializer obsluguje kodowanie automatycznie, wiec po prostu przekazujesz dict do metody send().
import jsonfrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'),acks='all',retries=3,)# Odczytaj plik JSONL i opublikuj kazda linie jako wiadomosc Kafkawith open('events.jsonl', 'r') as f:for line in f:line = line.strip()if not line:continueevent = json.loads(line)producer.send('etl-events', value=event)producer.flush()print('Wszystkie zdarzenia opublikowane do Kafka')
Konsument Kafka, ktory odczytuje wiadomosci JSONL i zapisuje je do lokalnego pliku JSONL w partiach. Przetwarzanie wsadowe zmniejsza narzut I/O i pozwala dalszym systemom przetwarzac wieksze porcje naraz. Konsument zatwierdza offsety dopiero po pelnym zapisie partii, zapewniajac dostarczenie co najmniej jednokrotne.
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('etl-events',bootstrap_servers=['localhost:9092'],value_deserializer=lambda v: json.loads(v.decode('utf-8')),group_id='etl-consumer-group',enable_auto_commit=False,auto_offset_reset='earliest',)BATCH_SIZE = 500batch = []for message in consumer:batch.append(message.value)if len(batch) >= BATCH_SIZE:# Zapisz partie do pliku JSONLwith open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()print(f'Zatwierdzona partia {len(batch)} rekordow')batch.clear()# Oproznij pozostale rekordyif batch:with open('output/batch.jsonl', 'a') as f:for record in batch:f.write(json.dumps(record) + '\n')consumer.commit()
Apache Airflow: orkiestracja zadan JSONL
Apache Airflow to standardowe narzedzie orkiestracji dla wsadowego ETL. Typowy potok Airflow oparty na JSONL ma trzy etapy: ekstrakcja surowych danych do JSONL, transformacja pliku JSONL (filtrowanie, wzbogacanie, przeksztalcanie) i ladowanie wyniku do hurtowni danych lub magazynu obiektow. Kazdy etap czyta i zapisuje pliki JSONL, co sprawia, ze stan posredni jest mozliwy do inspekcji, a poszczegolne zadania mozna ponownie uruchamiac niezaleznie.
Kompletny DAG Airflow, ktory ekstrahuje dane z API, transformuje je jako JSONL i laduje wynik do przechowywania w chmurze. Kazde zadanie komunikuje sie poprzez pliki JSONL przechowywane we wspolnym magazynie, co ulatwia debugowanie i ponawianie potoku.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom datetime import datetime, timedeltaimport jsonimport requestsdefault_args = {'owner': 'data-team','retries': 2,'retry_delay': timedelta(minutes=5),}def extract(**context):"""Ekstrahuj dane z API i zapisz JSONL na dysk lokalny."""ds = context['ds'] # data wykonania: YYYY-MM-DDresponse = requests.get('https://api.example.com/events',params={'date': ds},)events = response.json()path = f'/tmp/raw_{ds}.jsonl'with open(path, 'w') as f:for event in events:f.write(json.dumps(event) + '\n')return pathdef transform(**context):"""Odczytaj surowy JSONL, filtruj i wzbogac, zapisz oczyszczony JSONL."""ds = context['ds']input_path = f'/tmp/raw_{ds}.jsonl'output_path = f'/tmp/clean_{ds}.jsonl'with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:record = json.loads(line)if record.get('status') != 'valid':continuerecord['processed_at'] = datetime.utcnow().isoformat()fout.write(json.dumps(record) + '\n')return output_pathdef load(**context):"""Przeslij oczyszczony JSONL do S3."""ds = context['ds']local_path = f'/tmp/clean_{ds}.jsonl's3 = S3Hook(aws_conn_id='aws_default')s3.load_file(filename=local_path,key=f'etl/events/{ds}/data.jsonl',bucket_name='data-lake',replace=True,)with DAG('jsonl_etl_pipeline',default_args=default_args,schedule_interval='@daily',start_date=datetime(2026, 1, 1),catchup=False,) as dag:t_extract = PythonOperator(task_id='extract', python_callable=extract)t_transform = PythonOperator(task_id='transform', python_callable=transform)t_load = PythonOperator(task_id='load', python_callable=load)t_extract >> t_transform >> t_load
Dla duzych plikow JSONL, ktore nie mieszcza sie w pamieci, uzyj strumieniowej transformacji, ktora przetwarza jedna linie naraz. Ten wzorzec utrzymuje stale zuzycie pamieci niezaleznie od rozmiaru pliku i dobrze sprawdza sie w zadaniach Airflow, gdzie zasoby workerow moga byc ograniczone.
import jsonfrom typing import Callabledef stream_transform(input_path: str,output_path: str,transform_fn: Callable[[dict], dict | None],batch_size: int = 5000,) -> dict:"""Strumieniowo transformuj plik JSONL za pomoca funkcji zdefiniowanej przez uzytkownika.Zwroc None z transform_fn, aby pominac rekord."""stats = {'input': 0, 'output': 0, 'skipped': 0}batch = []with open(input_path) as fin, open(output_path, 'w') as fout:for line in fin:line = line.strip()if not line:continuestats['input'] += 1record = json.loads(line)result = transform_fn(record)if result is None:stats['skipped'] += 1continuebatch.append(json.dumps(result))if len(batch) >= batch_size:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)batch.clear()if batch:fout.write('\n'.join(batch) + '\n')stats['output'] += len(batch)return stats# Uzycie wewnatrz zadania Airflowdef enrich(record):if record.get('amount', 0) < 0:return None # pomin nieprawidlowerecord['currency'] = record.get('currency', 'USD').upper()return recordstats = stream_transform('/tmp/raw.jsonl','/tmp/enriched.jsonl',transform_fn=enrich,)print(f'Przetworzono: {stats}')
Ladowanie JSONL do hurtowni danych
Nowoczesne hurtownie danych maja pierwszorzedna obsluge pozyskiwania plikow JSONL. Snowflake, BigQuery i Redshift moga ladowac JSONL bezposrednio z przechowywania w chmurze, parsowac JSON w locie i przechowywac wyniki w ustrukturyzowanych tabelach. Eliminuje to potrzebe posredniego kroku konwersji CSV i zachowuje zagniezdzene typy danych.
Snowflake
COPY INTOSnowflake pozyskuje JSONL za pomoca komendy COPY INTO ze stageow S3, GCS lub Azure Blob. Uzyj opcji FILE_FORMAT z TYPE = 'JSON', aby parsowac kazda linie jako oddzielny dokument JSON. Typ kolumny VARIANT przechowuje polusrtukturalne dane natywnie.
BigQuery
NatywnyBigQuery laduje pliki JSONL z Google Cloud Storage z automatycznym wykrywaniem schematu. Komenda bq load z --source_format=NEWLINE_DELIMITED_JSON obsluguje JSONL natywnie. BigQuery moze rowniez automatycznie wykrywac typy kolumn z wartosci JSON.
Redshift
COPYAmazon Redshift obsluguje ladowanie JSONL z S3 za pomoca komendy COPY z FORMAT AS JSON. Mozesz udostepnic plik JSONPaths do mapowania kluczy JSON na kolumny tabeli, dajac Ci szczegolowa kontrole nad procesem ladowania.
Zaladuj plik JSONL ze stagea S3 do tabeli Snowflake. Kazda linia staje sie wierszem z kolumna VARIANT zawierajaca sparsowany JSON. Nastepnie mozesz splaszczyc kolumne VARIANT do typowanych kolumn za pomoca Snowflake SQL.
-- Utworz stage wskazujacy na Twoj bucket S3CREATE OR REPLACE STAGE etl_stageURL = 's3://data-lake/etl/events/'CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE);-- Utworz tabele docelowaCREATE TABLE IF NOT EXISTS raw_events (data VARIANT,loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Zaladuj pliki JSONL ze stageaCOPY INTO raw_events (data)FROM @etl_stageFILE_FORMAT = (TYPE = 'JSON')PATTERN = '.*\.jsonl'ON_ERROR = 'CONTINUE';-- Odpytaj zaladowane daneSELECTdata:id::INT AS event_id,data:user_id::STRING AS user_id,data:event_type::STRING AS event_type,data:timestamp::TIMESTAMP AS event_tsFROM raw_eventsLIMIT 10;
Uzyj klienta Python BigQuery do zaladowania pliku JSONL z Google Cloud Storage do tabeli BigQuery. Klient obsluguje wykrywanie schematu, partycjonowanie i raportowanie bledow. To podejscie dobrze integruje sie z zadaniami Airflow.
from google.cloud import bigqueryclient = bigquery.Client(project='my-project')job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,autodetect=True,write_disposition=bigquery.WriteDisposition.WRITE_APPEND,max_bad_records=10,)# Zaladuj z GCSuri = 'gs://data-lake/etl/events/2026-01-15/data.jsonl'table_ref = 'my-project.analytics.raw_events'load_job = client.load_table_from_uri(uri,table_ref,job_config=job_config,)load_job.result() # Czekaj na zakonczenietable = client.get_table(table_ref)print(f'Zaladowano {load_job.output_rows} wierszy')print(f'Tabela ma teraz {table.num_rows} wierszy lacznie')
Tolerancja na bledy i idempotentnosc
Potoki ETL zawodza. Sieci zrywaja polaczenia, API przekraczaja limity czasu, a workery wyczerpuja pamiec. JSONL upraszcza odzyskiwanie, poniewaz kazda linia jest niezalezna. Mozesz zapisac punkt kontrolny swojej pozycji (numer linii), wznowic od ostatniej udanej linii i uniknac ponownego przetwarzania danych. W polaczeniu z idempotentnym zapisem zapewnia to semantyke dokladnie jednokrotna nawet przy dostarczeniu co najmniej jednokrotnym.
Procesor oparty na punktach kontrolnych, ktory sledzi postep wedlug numeru linii. Jesli potok zawiedzie, wznawia od ostatniego zatwierdzonego punktu kontrolnego zamiast ponownie przetwarzac caly plik. Plik punktu kontrolnego to prosty plik tekstowy zawierajacy ostatni pomyslnie przetworzony numer linii.
import jsonimport osdef get_checkpoint(checkpoint_path: str) -> int:"""Odczytaj ostatni zatwierdzony numer linii."""if os.path.exists(checkpoint_path):with open(checkpoint_path) as f:return int(f.read().strip())return 0def save_checkpoint(checkpoint_path: str, line_num: int):"""Atomowo zapisz biezacy punkt kontrolny."""tmp = checkpoint_path + '.tmp'with open(tmp, 'w') as f:f.write(str(line_num))os.replace(tmp, checkpoint_path) # atomowe na POSIXdef process_with_checkpoint(input_path: str,output_path: str,checkpoint_path: str,batch_size: int = 1000,):"""Przetwarzaj plik JSONL z wznawianiem opartym na punktach kontrolnych."""start_line = get_checkpoint(checkpoint_path)processed = 0with open(input_path) as fin, \open(output_path, 'a') as fout:for line_num, line in enumerate(fin, 1):if line_num <= start_line:continue # pomin juz przetworzone linieline = line.strip()if not line:continuerecord = json.loads(line)# --- Twoja logika transformacji ---record['etl_version'] = 'v2'fout.write(json.dumps(record) + '\n')processed += 1if processed % batch_size == 0:fout.flush()save_checkpoint(checkpoint_path, line_num)print(f'Punkt kontrolny w linii {line_num}')# Koncowy punkt kontrolnysave_checkpoint(checkpoint_path, line_num)print(f'Gotowe. Przetworzono {processed} rekordow od linii {start_line + 1}')# Uzycieprocess_with_checkpoint('input.jsonl','output.jsonl','checkpoint.txt',batch_size=5000,)
Wzorzec punktow kontrolnych dziala z kazdym etapem potoku JSONL. Dla konsumentow Kafka zastap punkt kontrolny pliku zatwierdzonymi offsetami. Dla Airflow przechowuj punkt kontrolny w XCom lub zewnetrznej bazie danych, aby ponownie uruchamiane zadania wznawialy sie od wlasciwej pozycji. Kluczowa obserwacja jest taka, ze liniowy format JSONL sprawia, ze sledzenie pozycji jest trywialne: pojedyncza liczba calkowita (numer linii lub przesuniecie bajtowe) to wszystko, czego potrzebujesz do wznowienia.
Zwaliduj swoj JSONL przed ladowaniem
Wylapuj znieksztalcone rekordy, zanim trafa do Twojego potoku. Uzyj naszych darmowych narzedzi online, aby walidowac, przegladac i konwertowac pliki JSONL w przegladarce.