JSONL 数据库导入导出:PostgreSQL、MongoDB 等
将 JSONL 文件导入数据库和将数据库记录导出为 JSONL 的全面指南。涵盖 PostgreSQL COPY 命令、MongoDB CLI 工具、MySQL JSON_TABLE、SQLite 脚本和生产级批量导入模式。
最后更新:2026年2月
为什么使用 JSONL 进行数据库导入导出?
JSONL (JSON Lines) 已成为在数据库和外部系统之间传输数据的首选格式。每一行都是一个独立的、自包含的 JSON 对象,这意味着您可以逐条流式传输记录,无需将整个数据集加载到内存中。这在您需要在 PostgreSQL 实例之间迁移数百万行、将 MongoDB 集合同步到数据仓库或将数据库导出馈入机器学习管道时至关重要。
与 CSV 相比,JSONL 保留数据类型、支持嵌套对象和数组,并消除了分隔符转义的歧义。JSONL 文件可以按原样携带 PostgreSQL jsonb 列,而 CSV 则需要您扁平化或转义嵌套结构。与完整 JSON 数组相比,JSONL 是可流式的:您可以在读取每条记录的瞬间进行处理、验证或转换,而不必等待整个文件解析完成。
每个主流数据库系统现在都有处理 JSONL 的工具。PostgreSQL 可以通过 jsonb 转换进行 COPY 行操作,MongoDB 自带默认使用 JSONL 的 mongoimport 和 mongoexport,MySQL 8.0 添加了 JSON_TABLE 用于结构化提取,SQLite 有 JSON1 扩展。在以下章节中,您将学习每个数据库的确切命令、脚本和最佳实践。
PostgreSQL:导入导出 JSONL
PostgreSQL 提供了多种处理 JSONL 的方法。COPY 命令提供最快的批量导入路径,而 jsonb 数据类型允许您原生存储和查询半结构化数据。对于结构化表,您可以在导入过程中将 JSONL 字段解析为类型化列。
将 JSONL 导入 PostgreSQL 最快的方式是使用 COPY 命令将每一行加载为 jsonb 值。创建一个只有单个 jsonb 列的表,然后使用 COPY FROM 直接将文件流式导入数据库。
-- Create a staging table with a single jsonb columnCREATE TABLE staging_import (data jsonb);-- Import the JSONL file (each line becomes one row)\COPY staging_import (data) FROM 'users.jsonl';-- Verify the importSELECT count(*) FROM staging_import;SELECT data->>'name' AS name, data->>'email' AS emailFROM staging_importLIMIT 5;-- Move data into a structured tableINSERT INTO users (name, email, age)SELECTdata->>'name',data->>'email',(data->>'age')::intFROM staging_import;
要将数据导出为 JSONL,使用 row_to_json() 或 to_jsonb() 将每一行转换为 JSON 对象,然后使用 COPY 将结果写入文件。每一行在输出文件中成为一行。
-- Export entire table as JSONL\COPY (SELECT row_to_json(t)FROM (SELECT id, name, email, created_at FROM users) t) TO 'users_export.jsonl';-- Export with filtering and transformation\COPY (SELECT row_to_json(t)FROM (SELECT id, name, email,created_at::text AS created_atFROM usersWHERE active = trueORDER BY id) t) TO 'active_users.jsonl';-- Verify: each line is valid JSON-- {"id":1,"name":"Alice","email":"alice@example.com",...}
对于生产工作负载,使用 Python 脚本配合 psycopg2 读取 JSONL 文件并将记录插入类型化列。这让您完全控制验证、错误处理和批次大小。
import jsonimport psycopg2from psycopg2.extras import execute_valuesdef import_jsonl_to_postgres(file_path, conn_string):conn = psycopg2.connect(conn_string)cur = conn.cursor()batch = []batch_size = 1000total = 0with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)batch.append((record['name'],record['email'],record.get('age'),))except (json.JSONDecodeError, KeyError) as e:print(f"Skipping line {line_num}: {e}")continueif len(batch) >= batch_size:execute_values(cur,"INSERT INTO users (name, email, age) ""VALUES %s ON CONFLICT (email) DO NOTHING",batch)total += len(batch)batch = []print(f"Imported {total} records...")if batch:execute_values(cur,"INSERT INTO users (name, email, age) ""VALUES %s ON CONFLICT (email) DO NOTHING",batch)total += len(batch)conn.commit()cur.close()conn.close()print(f"Done. Imported {total} records.")# Usageimport_jsonl_to_postgres('users.jsonl', 'postgresql://localhost/mydb')
MongoDB:原生 JSONL 支持
MongoDB 通过其命令行数据库工具提供一流的 JSONL 支持。mongoimport 和 mongoexport 工具默认使用 JSONL 格式,使 MongoDB 成为最容易进行 JSONL 数据交换的数据库之一。JSONL 文件中的每一行直接映射到一个 MongoDB 文档。
mongoimport 读取 JSONL 文件并将每一行作为文档插入指定集合。它支持 upsert 模式、字段类型转换和并行插入以实现高吞吐量。
# Basic import: each line becomes a documentmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl# Upsert mode: update existing documents by _idmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl \--mode=upsert \--upsertFields=email# Drop collection before import (clean slate)mongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=users.jsonl \--drop# Import with parallel workers for large filesmongoimport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--file=large_users.jsonl \--numInsertionWorkers=4
mongoexport 将集合中的每个文档写为单行 JSON。您可以过滤、投影和排序输出。结果是一个有效的 JSONL 文件,可以被任何下游系统处理。
# Export entire collection as JSONL (default format)mongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--out=users_export.jsonl# Export with query filtermongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--query={"active": true} \--out=active_users.jsonl# Export specific fields onlymongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--fields=name,email,created_at \--out=users_partial.jsonl# Export with sortingmongoexport \--uri="mongodb://localhost:27017/mydb" \--collection=users \--sort={"created_at": -1} \--out=users_sorted.jsonl
MySQL:使用 JSON_TABLE 和脚本处理 JSONL
MySQL 没有像 MongoDB 那样的内置 JSONL 导入命令,但 MySQL 8.0 引入了 JSON_TABLE,允许您从 JSON 字符串中提取结构化数据。对于批量 JSONL 导入,最可靠的方法是将 LOAD DATA INFILE 用于原始加载,配合 JSON_TABLE 进行提取,或使用 Python 等脚本语言。
将 JSONL 文件作为原始文本加载到暂存表中,然后使用 JSON_TABLE 将字段提取到结构化表中。这种两步方法完全在 MySQL 内部工作。
-- Step 1: Create a staging table for raw linesCREATE TABLE jsonl_staging (id INT AUTO_INCREMENT PRIMARY KEY,raw_line TEXT NOT NULL);-- Step 2: Load the JSONL file as raw textLOAD DATA INFILE '/var/lib/mysql-files/users.jsonl'INTO TABLE jsonl_stagingLINES TERMINATED BY '\n'(raw_line);-- Step 3: Extract structured data with JSON_TABLEINSERT INTO users (name, email, age)SELECT jt.name, jt.email, jt.ageFROM jsonl_staging,JSON_TABLE(raw_line, '$'COLUMNS (name VARCHAR(255) PATH '$.name',email VARCHAR(255) PATH '$.email',age INT PATH '$.age')) AS jt;-- Step 4: Clean up staging tableDROP TABLE jsonl_staging;-- Verify the importSELECT * FROM users LIMIT 5;
为了更大的灵活性,使用 Python 脚本配合 mysql-connector-python 读取 JSONL 文件并批量插入记录。这种方法可以高效处理大文件并提供详细的错误报告。
import jsonimport mysql.connectordef import_jsonl_to_mysql(file_path, config):conn = mysql.connector.connect(**config)cursor = conn.cursor()batch = []batch_size = 1000total = 0insert_sql = ("INSERT INTO users (name, email, age) ""VALUES (%s, %s, %s) ""ON DUPLICATE KEY UPDATE name=VALUES(name)")with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)batch.append((record['name'],record['email'],record.get('age'),))except (json.JSONDecodeError, KeyError) as e:print(f"Skipping line {line_num}: {e}")continueif len(batch) >= batch_size:cursor.executemany(insert_sql, batch)conn.commit()total += len(batch)batch = []print(f"Imported {total} records...")if batch:cursor.executemany(insert_sql, batch)conn.commit()total += len(batch)cursor.close()conn.close()print(f"Done. Imported {total} records.")# Usageimport_jsonl_to_mysql('users.jsonl', {'host': 'localhost','user': 'root','password': 'secret','database': 'mydb'})
SQLite:轻量级 JSONL 导入
SQLite 是本地开发和小型数据集的绝佳选择。虽然 SQLite 没有原生的 JSONL 导入命令,但其 JSON1 扩展提供了 json_extract() 用于处理 JSON 数据,而简单的 Python 脚本可以利用 SQLite 内置的事务支持高效地导入 JSONL 文件。
使用 Python 内置的 sqlite3 模块读取 JSONL 文件并将记录插入 SQLite 数据库。将所有插入操作包装在单个事务中可以显著提高大文件的性能。
import jsonimport sqlite3def import_jsonl_to_sqlite(jsonl_path, db_path):conn = sqlite3.connect(db_path)cursor = conn.cursor()# Create table if it doesn't existcursor.execute("""CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,email TEXT UNIQUE,age INTEGER,raw_json TEXT)""")total = 0errors = 0with open(jsonl_path, 'r') as f:# Use a transaction for bulk insert performanceconn.execute('BEGIN TRANSACTION')for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:record = json.loads(line)cursor.execute("INSERT OR IGNORE INTO users ""(name, email, age, raw_json) ""VALUES (?, ?, ?, ?)",(record['name'],record.get('email'),record.get('age'),line, # Store original JSON))total += 1except (json.JSONDecodeError, KeyError) as e:errors += 1print(f"Line {line_num}: {e}")conn.commit()conn.close()print(f"Imported {total} records, {errors} errors")# Usageimport_jsonl_to_sqlite('users.jsonl', 'app.db')# Query with json_extract (SQLite JSON1 extension)# SELECT json_extract(raw_json, '$.name') FROM users;
批量导入最佳实践
将大型 JSONL 文件导入数据库需要仔细关注事务、错误处理和资源管理。这些最佳实践适用于所有数据库系统,将帮助您构建可靠的、生产级的导入管道。
批次事务
性能切勿每条记录一个事务。将插入操作分组为每个事务 500-5000 行的批次。这可以将磁盘 I/O 和锁开销减少 10-100 倍。大多数数据库在批次大小为 1000 时获得最大收益。更大的批次收益递减,且在出错时回滚成本更高。
行级错误处理
可靠性单独解析和验证每一行 JSONL。记录任何验证或插入失败的行号和错误消息。使用 ON CONFLICT / ON DUPLICATE KEY 优雅地处理唯一约束冲突,而不是中止整个导入。将失败的行保存到单独的错误文件中以供后续审查。
进度跟踪
可观测性对于数百万行的文件,每 N 条记录记录一次进度(例如,每 10,000 条)。跟踪总读取行数、成功插入数、跳过的重复项和错误数。计算并显示导入速率(记录/秒)以识别性能瓶颈。这种反馈对于监控长时间运行的导入至关重要。
这个通用模式结合了批处理、错误处理和进度跟踪。通过替换插入函数即可适配任何数据库。
import jsonimport timedef batch_import_jsonl(file_path, insert_fn, batch_size=1000):"""Generic JSONL batch importer with error handling."""batch = []stats = {'total_lines': 0,'imported': 0,'skipped': 0,'errors': 0}start_time = time.time()error_file = open(file_path + '.errors', 'w')with open(file_path, 'r') as f:for line_num, line in enumerate(f, 1):stats['total_lines'] = line_numline = line.strip()if not line:stats['skipped'] += 1continuetry:record = json.loads(line)batch.append(record)except json.JSONDecodeError as e:stats['errors'] += 1error_file.write(f"{line_num}\t{e}\t{line}\n")continueif len(batch) >= batch_size:inserted = insert_fn(batch)stats['imported'] += insertedbatch = []if line_num % 10000 == 0:elapsed = time.time() - start_timerate = stats['imported'] / elapsedprint(f"Progress: {line_num:,} lines | "f"{stats['imported']:,} imported | "f"{rate:,.0f} records/sec")# Flush remaining batchif batch:inserted = insert_fn(batch)stats['imported'] += insertederror_file.close()elapsed = time.time() - start_timeprint(f"\nComplete in {elapsed:.1f}s")print(f" Lines: {stats['total_lines']:,}")print(f" Imported: {stats['imported']:,}")print(f" Errors: {stats['errors']:,}")return stats
使用免费 JSONL 工具准备数据
在将 JSONL 导入数据库之前,验证文件格式并在不同格式之间转换。我们的免费在线工具在浏览器本地处理一切,因此您的数据永远不会离开您的设备。