JSONL 資料庫匯入匯出:PostgreSQL、MongoDB 及更多
將 JSONL 檔案匯入資料庫和將資料庫記錄匯出為 JSONL 的完整指南。涵蓋 PostgreSQL COPY 命令、MongoDB CLI 工具、MySQL JSON_TABLE、SQLite 腳本和生產就緒的批次匯入模式。
最後更新:2026 年 2 月
為什麼使用 JSONL 進行資料庫匯入匯出?
JSONL(JSON Lines)已成為在資料庫和外部系統之間搬移資料的首選格式。每一行都是一個獨立的、自包含的 JSON 物件,這意味著您可以一次串流一筆記錄,而無需將整個資料集載入記憶體。當您在 PostgreSQL 實例之間遷移數百萬行、將 MongoDB 集合同步到資料倉庫,或將資料庫匯出饋送到機器學習管線時,這一點至關重要。
與 CSV 相比,JSONL 保留資料類型、支援巢狀物件和陣列,並消除了分隔符轉義的歧義。JSONL 檔案可以按原樣攜帶 PostgreSQL jsonb 欄位,而 CSV 則需要您展平或轉義巢狀結構。與完整的 JSON 陣列相比,JSONL 是可串流的:您可以在讀取每筆記錄的同時進行處理、驗證或轉換,而不需要等待整個檔案解析完成。
每個主要的資料庫系統現在都有 JSONL 工具。PostgreSQL 可以透過 jsonb 轉型使用 COPY 命令、MongoDB 自帶預設 JSONL 格式的 mongoimport 和 mongoexport、MySQL 8.0 新增了 JSON_TABLE 用於結構化提取、SQLite 有 JSON1 擴充功能。在以下章節中,您將學習每個資料庫的精確命令、腳本和最佳實踐。
PostgreSQL:匯入與匯出 JSONL
PostgreSQL 提供了多種處理 JSONL 的方式。COPY 命令提供最快的批次匯入路徑,而 jsonb 資料類型讓您原生儲存和查詢半結構化資料。對於結構化表格,您可以在匯入過程中將 JSONL 欄位解析為類型化的欄位。
將 JSONL 匯入 PostgreSQL 的最快方式是使用 COPY 命令將每一行載入為 jsonb 值。建立一個具有單一 jsonb 欄位的表格,然後使用 COPY FROM 直接將檔案串流到資料庫中。
-- Create a staging table with a single jsonb columnCREATE TABLE staging_import (data jsonb);-- Import the JSONL file (each line becomes one row)\COPY staging_import (data) FROM 'users.jsonl';-- Verify the importSELECT count(*) FROM staging_import;SELECT data->>'name' AS name, data->>'email' AS emailFROM staging_importLIMIT 5;-- Move data into a structured tableINSERT INTO users (name, email, age)SELECTdata->>'name',data->>'email',(data->>'age')::intFROM staging_import;
要將資料匯出為 JSONL,使用 row_to_json() 或 to_jsonb() 將每一列轉換為 JSON 物件,然後將結果 COPY 到檔案。每一列成為輸出檔案中的一行。
-- Export entire table as JSONL\COPY (SELECT row_to_json(t)FROM (SELECT id, name, email, created_at FROM users) t) TO 'users_export.jsonl';-- Export with filtering and transformation\COPY (SELECT row_to_json(t)FROM (SELECT id, name, email,created_at::text AS created_atFROM usersWHERE active = trueORDER BY id) t) TO 'active_users.jsonl';-- Verify: each line is valid JSON-- {"id":1,"name":"Alice","email":"alice@example.com",...}
對於生產工作負載,使用 psycopg2 搭配 Python 腳本讀取 JSONL 檔案並將記錄插入類型化的欄位。這讓您完全控制驗證、錯誤處理和批次大小。
import jsonimport psycopg2from psycopg2.extras import execute_valuesdef import_jsonl_to_postgres(file_path, conn_string):conn = psycopg2.connect(conn_string)cur = conn.cursor()batch = []batch_size = 1000total = 0with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)batch.append((record['name'],record['email'],record.get('age'),))except (json.JSONDecodeError, KeyError) as e:print(f"Skipping line {line_num}: {e}")continueif len(batch) >= batch_size:execute_values(cur,"INSERT INTO users (name, email, age) ""VALUES %s ON CONFLICT (email) DO NOTHING",batch)total += len(batch)batch = []print(f"Imported {total} records...")if batch:execute_values(cur,"INSERT INTO users (name, email, age) ""VALUES %s ON CONFLICT (email) DO NOTHING",batch)total += len(batch)conn.commit()cur.close()conn.close()print(f"Done. Imported {total} records.")# Usageimport_jsonl_to_postgres('users.jsonl', 'postgresql://localhost/mydb')
MongoDB:原生 JSONL 支援
MongoDB 透過其命令列 Database Tools 提供一流的 JSONL 支援。mongoimport 和 mongoexport 工具預設使用 JSONL 格式,使 MongoDB 成為最容易進行 JSONL 資料交換的資料庫之一。JSONL 檔案中的每一行直接對應一個 MongoDB 文件。
mongoimport 讀取 JSONL 檔案並將每一行作為文件插入到指定的集合中。它支援 upsert 模式、欄位類型轉換和平行插入以獲得高吞吐量。
# Basic import: each line becomes a documentmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl# Upsert mode: update existing documents by _idmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl \--mode=upsert \--upsertFields=email# Drop collection before import (clean slate)mongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl \--drop# Import with parallel workers for large filesmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=large_users.jsonl \--numInsertionWorkers=4
mongoexport 將集合中的每個文件寫為一行 JSON。您可以過濾、投影和排序輸出。結果是一個有效的 JSONL 檔案,可供任何下游系統處理。
# Export entire collection as JSONL (default format)mongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--out=users_export.jsonl# Export with query filtermongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--query={"active": true} \--out=active_users.jsonl# Export specific fields onlymongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--fields=name,email,created_at \--out=users_partial.jsonl# Export with sortingmongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--sort={"created_at": -1} \--out=users_sorted.jsonl
MySQL:使用 JSON_TABLE 和腳本處理 JSONL
MySQL 沒有像 MongoDB 那樣的內建 JSONL 匯入命令,但 MySQL 8.0 引入了 JSON_TABLE,讓您可以從 JSON 字串中提取結構化資料。對於批次 JSONL 匯入,最可靠的方法是結合 LOAD DATA INFILE 進行原始載入和 JSON_TABLE 進行提取,或者使用像 Python 這樣的腳本語言。
將 JSONL 檔案作為原始文字載入暫存表,然後使用 JSON_TABLE 將欄位提取到結構化表格中。這種兩步驟方法完全在 MySQL 中完成。
-- Step 1: Create a staging table for raw linesCREATE TABLE jsonl_staging (id INT AUTO_INCREMENT PRIMARY KEY,raw_line TEXT NOT NULL);-- Step 2: Load the JSONL file as raw textLOAD DATA INFILE '/var/lib/mysql-files/users.jsonl'INTO TABLE jsonl_stagingLINES TERMINATED BY '\n'(raw_line);-- Step 3: Extract structured data with JSON_TABLEINSERT INTO users (name, email, age)SELECT jt.name, jt.email, jt.ageFROM jsonl_staging,JSON_TABLE(raw_line, '$'COLUMNS (name VARCHAR(255) PATH '$.name',email VARCHAR(255) PATH '$.email',age INT PATH '$.age')) AS jt;-- Step 4: Clean up staging tableDROP TABLE jsonl_staging;-- Verify the importSELECT * FROM users LIMIT 5;
若需要更多彈性,使用 mysql-connector-python 搭配 Python 腳本讀取 JSONL 檔案並批次插入記錄。此方法高效處理大型檔案並提供詳細的錯誤報告。
import jsonimport mysql.connectordef import_jsonl_to_mysql(file_path, config):conn = mysql.connector.connect(**config)cursor = conn.cursor()batch = []batch_size = 1000total = 0insert_sql = ("INSERT INTO users (name, email, age) ""VALUES (%s, %s, %s) ""ON DUPLICATE KEY UPDATE name=VALUES(name)")with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)batch.append((record['name'],record['email'],record.get('age'),))except (json.JSONDecodeError, KeyError) as e:print(f"Skipping line {line_num}: {e}")continueif len(batch) >= batch_size:cursor.executemany(insert_sql, batch)conn.commit()total += len(batch)batch = []print(f"Imported {total} records...")if batch:cursor.executemany(insert_sql, batch)conn.commit()total += len(batch)cursor.close()conn.close()print(f"Done. Imported {total} records.")# Usageimport_jsonl_to_mysql('users.jsonl', {'host': 'localhost','user': 'root','password': 'secret','database': 'mydb'})
SQLite:輕量級 JSONL 匯入
SQLite 是本機開發和較小資料集的絕佳選擇。雖然 SQLite 沒有原生 JSONL 匯入命令,但其 JSON1 擴充功能提供了 json_extract() 用於處理 JSON 資料,並且簡單的 Python 腳本可以利用 SQLite 的內建交易支援高效匯入 JSONL 檔案。
使用 Python 的內建 sqlite3 模組讀取 JSONL 檔案並將記錄插入 SQLite 資料庫。將所有插入包裝在單一交易中可以顯著提高大型檔案的效能。
import jsonimport sqlite3def import_jsonl_to_sqlite(jsonl_path, db_path):conn = sqlite3.connect(db_path)cursor = conn.cursor()# Create table if it doesn't existcursor.execute("""CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,email TEXT UNIQUE,age INTEGER,raw_json TEXT)""")total = 0errors = 0with open(jsonl_path, 'r') as f:# Use a transaction for bulk insert performanceconn.execute('BEGIN TRANSACTION')for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)cursor.execute("INSERT OR IGNORE INTO users ""(name, email, age, raw_json) ""VALUES (?, ?, ?, ?)",(record['name'],record.get('email'),record.get('age'),line, # Store original JSON))total += 1except (json.JSONDecodeError, KeyError) as e:errors += 1print(f"Line {line_num}: {e}")conn.commit()conn.close()print(f"Imported {total} records, {errors} errors")# Usageimport_jsonl_to_sqlite('users.jsonl', 'app.db')# Query with json_extract (SQLite JSON1 extension)# SELECT json_extract(raw_json, '$.name') FROM users;
批次匯入最佳實踐
將大型 JSONL 檔案匯入資料庫需要仔細注意交易、錯誤處理和資源管理。這些最佳實踐適用於所有資料庫系統,將幫助您建構可靠的、生產就緒的匯入管線。
批次交易
效能永遠不要每筆插入一個交易。將插入分組為每個交易 500-5000 列。這可以將磁碟 I/O 和鎖開銷降低 10-100 倍。大多數資料庫在批次大小為 1000 時獲得最大增益。更大的批次提供遞減的回報,並在發生故障時增加回滾成本。
行級別錯誤處理
可靠性逐一解析和驗證每行 JSONL。為任何驗證或插入失敗的行記錄行號和錯誤訊息。使用 ON CONFLICT / ON DUPLICATE KEY 優雅地處理唯一約束違規,而不是中止整個匯入。將失敗的行儲存到單獨的錯誤檔案中以供後續審查。
進度追蹤
可觀測性對於有數百萬行的檔案,每 N 筆記錄記錄一次進度(例如每 10,000 筆)。追蹤已讀取的總行數、成功插入數、跳過的重複項和錯誤數。計算並顯示匯入速率(記錄/秒)以識別效能瓶頸。這種回饋對於監控長時間運行的匯入至關重要。
此通用模式結合了批次處理、錯誤處理和進度追蹤。透過替換 insert 函式即可適配任何資料庫。
import jsonimport timedef batch_import_jsonl(file_path, insert_fn, batch_size=1000):"""Generic JSONL batch importer with error handling."""batch = []stats = {'total_lines': 0,'imported': 0,'skipped': 0,'errors': 0}start_time = time.time()error_file = open(file_path + '.errors', 'w')with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):stats['total_lines'] = line_numline = line.strip()if not line:stats['skipped'] += 1continuetry:record = json.loads(line)batch.append(record)except json.JSONDecodeError as e:stats['errors'] += 1error_file.write(f"{line_num}\t{e}\t{line}\n")continueif len(batch) >= batch_size:inserted = insert_fn(batch)stats['imported'] += insertedbatch = []if line_num % 10000 == 0:elapsed = time.time() - start_timerate = stats['imported'] / elapsedprint(f"Progress: {line_num:,} lines | "f"{stats['imported']:,} imported | "f"{rate:,.0f} records/sec")# Flush remaining batchif batch:inserted = insert_fn(batch)stats['imported'] += insertederror_file.close()elapsed = time.time() - start_timeprint(f"\nComplete in {elapsed:.1f}s")print(f" Lines: {stats['total_lines']:,}")print(f" Imported: {stats['imported']:,}")print(f" Errors: {stats['errors']:,}")return stats
使用免費 JSONL 工具準備您的資料
在將 JSONL 匯入資料庫之前,先驗證檔案格式並在格式之間進行轉換。我們的免費線上工具在瀏覽器中本機處理所有內容,因此您的資料永遠不會離開您的電腦。