JSONLデータベースインポート&エクスポート:PostgreSQL、MongoDB等

JSONLファイルのデータベースへのインポートとデータベースレコードのJSONLとしてのエクスポートに関する包括的ガイド。PostgreSQL COPYコマンド、MongoDB CLIツール、MySQL JSON_TABLE、SQLiteスクリプト、本番環境対応のバルクインポートパターンをカバー。

最終更新:2026年2月

データベースインポート&エクスポートにJSONLを使う理由

JSONL(JSON Lines)は、データベースと外部システム間でデータを移動するための推奨フォーマットになっています。各行が独立した自己完結型のJSONオブジェクトであるため、データセット全体をメモリに読み込むことなく、一度に1レコードずつストリーム処理できます。これは、PostgreSQLインスタンス間で数百万行を移行する場合、MongoDBコレクションをデータウェアハウスに同期する場合、データベースエクスポートを機械学習パイプラインに投入する場合に不可欠です。

CSVと比較して、JSONLはデータ型を保持し、ネストされたオブジェクトや配列をサポートし、デリミタのエスケープの曖昧さを排除します。JSONLファイルはPostgreSQLのjsonbカラムをそのまま運ぶことができますが、CSVではネストされた構造をフラット化またはエスケープする必要があります。完全なJSON配列と比較して、JSONLはストリーミング可能です:ファイル全体のパースを待つことなく、各レコードを読み取った瞬間に処理、検証、変換できます。

すべての主要なデータベースシステムにJSONL用のツールがあります。PostgreSQLはjsonbキャストを通じてCOPYでき、MongoDBはデフォルトでJSONLを使用するmongoimportとmongoexportを搭載し、MySQL 8.0は構造化抽出用のJSON_TABLEを追加し、SQLiteにはJSON1拡張があります。以下のセクションで、各データベースの正確なコマンド、スクリプト、ベストプラクティスを学びます。

PostgreSQL:JSONLのインポート&エクスポート

PostgreSQLはJSONLを扱うためのいくつかのアプローチを提供しています。COPYコマンドは最速のバルクインポートパスを提供し、jsonbデータ型により半構造化データをネイティブに保存・クエリできます。構造化テーブルの場合、インポート時にJSONLフィールドを型付きカラムにパースできます。

JSONLをPostgreSQLにインポートする最速の方法は、COPYコマンドを使用して各行をjsonb値として読み込むことです。単一のjsonbカラムを持つテーブルを作成し、COPY FROMを使用してファイルを直接データベースにストリームします。

jsonbカラムにJSONLをインポート
-- Create a staging table with a single jsonb column
CREATE TABLE staging_import (data jsonb);
-- Import the JSONL file (each line becomes one row)
\COPY staging_import (data) FROM 'users.jsonl';
-- Verify the import
SELECT count(*) FROM staging_import;
SELECT data->>'name' AS name, data->>'email' AS email
FROM staging_import
LIMIT 5;
-- Move data into a structured table
INSERT INTO users (name, email, age)
SELECT
data->>'name',
data->>'email',
(data->>'age')::int
FROM staging_import;

データをJSONLとしてエクスポートするには、row_to_json()またはto_jsonb()を使用して各行をJSONオブジェクトに変換し、結果をファイルにCOPYします。各行が出力ファイルの1行になります。

PostgreSQLの行をJSONLとしてエクスポート
-- Export entire table as JSONL
\COPY (
SELECT row_to_json(t)
FROM (SELECT id, name, email, created_at FROM users) t
) TO 'users_export.jsonl';
-- Export with filtering and transformation
\COPY (
SELECT row_to_json(t)
FROM (
SELECT id, name, email,
created_at::text AS created_at
FROM users
WHERE active = true
ORDER BY id
) t
) TO 'active_users.jsonl';
-- Verify: each line is valid JSON
-- {"id":1,"name":"Alice","email":"alice@example.com",...}

本番ワークロードでは、psycopg2を使用したPythonスクリプトでJSONLファイルを読み取り、型付きカラムにレコードを挿入します。これにより、検証、エラー処理、バッチサイズを完全に制御できます。

構造化PostgreSQLインポート用Pythonスクリプト
import json
import psycopg2
from psycopg2.extras import execute_values
def import_jsonl_to_postgres(file_path, conn_string):
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
batch = []
batch_size = 1000
total = 0
with open(file_path, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
batch.append((
record['name'],
record['email'],
record.get('age'),
))
except (json.JSONDecodeError, KeyError) as e:
print(f"Skipping line {line_num}: {e}")
continue
if len(batch) >= batch_size:
execute_values(
cur,
"INSERT INTO users (name, email, age) "
"VALUES %s ON CONFLICT (email) DO NOTHING",
batch
)
total += len(batch)
batch = []
print(f"Imported {total} records...")
if batch:
execute_values(
cur,
"INSERT INTO users (name, email, age) "
"VALUES %s ON CONFLICT (email) DO NOTHING",
batch
)
total += len(batch)
conn.commit()
cur.close()
conn.close()
print(f"Done. Imported {total} records.")
# Usage
import_jsonl_to_postgres('users.jsonl', 'postgresql://localhost/mydb')

MongoDB:ネイティブJSONLサポート

MongoDBはコマンドラインDatabase Toolsを通じてファーストクラスのJSONLサポートを提供しています。mongoimportとmongoexportユーティリティはデフォルトでJSONL形式を使用するため、JSONLデータ交換において最も簡単に扱えるデータベースの1つです。JSONLファイルの各行はMongoDBドキュメントに直接マッピングされます。

mongoimportはJSONLファイルを読み取り、各行を指定されたコレクションにドキュメントとして挿入します。アップサートモード、フィールド型変換、高スループットのための並列挿入をサポートしています。

mongoimportでJSONLをインポート
# Basic import: each line becomes a document
mongoimport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--file=users.jsonl
# Upsert mode: update existing documents by _id
mongoimport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--file=users.jsonl \
--mode=upsert \
--upsertFields=email
# Drop collection before import (clean slate)
mongoimport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--file=users.jsonl \
--drop
# Import with parallel workers for large files
mongoimport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--file=large_users.jsonl \
--numInsertionWorkers=4

mongoexportはコレクション内の各ドキュメントを単一のJSON行として書き込みます。出力のフィルタリング、射影、ソートが可能です。結果は、任意の下流システムで処理できる有効なJSONLファイルになります。

mongoexportでMongoDBをJSONLにエクスポート
# Export entire collection as JSONL (default format)
mongoexport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--out=users_export.jsonl
# Export with query filter
mongoexport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--query={"active": true} \
--out=active_users.jsonl
# Export specific fields only
mongoexport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--fields=name,email,created_at \
--out=users_partial.jsonl
# Export with sorting
mongoexport \
--uri="mongodb://localhost:27017/mydb" \
--collection=users \
--sort={"created_at": -1} \
--out=users_sorted.jsonl

MySQL:JSON_TABLE&スクリプトによるJSONL

MySQLにはMongoDBのような組み込みのJSONLインポートコマンドはありませんが、MySQL 8.0ではJSON文字列から構造化データを抽出できるJSON_TABLEが導入されました。バルクJSONLインポートの最も信頼性の高いアプローチは、生データの読み込みにLOAD DATA INFILEを、抽出にJSON_TABLEを組み合わせるか、Pythonなどのスクリプト言語を使用することです。

JSONLファイルを生テキストとしてステージングテーブルに読み込み、JSON_TABLEを使用して構造化テーブルにフィールドを抽出します。この2ステップのアプローチはMySQL内で完全に動作します。

ステージングテーブル&JSON_TABLEを使用したJSONLインポート
-- Step 1: Create a staging table for raw lines
CREATE TABLE jsonl_staging (
id INT AUTO_INCREMENT PRIMARY KEY,
raw_line TEXT NOT NULL
);
-- Step 2: Load the JSONL file as raw text
LOAD DATA INFILE '/var/lib/mysql-files/users.jsonl'
INTO TABLE jsonl_staging
LINES TERMINATED BY '\n'
(raw_line);
-- Step 3: Extract structured data with JSON_TABLE
INSERT INTO users (name, email, age)
SELECT jt.name, jt.email, jt.age
FROM jsonl_staging,
JSON_TABLE(
raw_line, '$'
COLUMNS (
name VARCHAR(255) PATH '$.name',
email VARCHAR(255) PATH '$.email',
age INT PATH '$.age'
)
) AS jt;
-- Step 4: Clean up staging table
DROP TABLE jsonl_staging;
-- Verify the import
SELECT * FROM users LIMIT 5;

より柔軟性が必要な場合は、mysql-connector-pythonを使用したPythonスクリプトでJSONLファイルを読み取り、レコードをバッチ挿入します。このアプローチは大きなファイルを効率的に処理し、詳細なエラーレポートを提供します。

MySQL JSONLインポート用Pythonスクリプト
import json
import mysql.connector
def import_jsonl_to_mysql(file_path, config):
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
batch = []
batch_size = 1000
total = 0
insert_sql = (
"INSERT INTO users (name, email, age) "
"VALUES (%s, %s, %s) "
"ON DUPLICATE KEY UPDATE name=VALUES(name)"
)
with open(file_path, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
batch.append((
record['name'],
record['email'],
record.get('age'),
))
except (json.JSONDecodeError, KeyError) as e:
print(f"Skipping line {line_num}: {e}")
continue
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch = []
print(f"Imported {total} records...")
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
cursor.close()
conn.close()
print(f"Done. Imported {total} records.")
# Usage
import_jsonl_to_mysql('users.jsonl', {
'host': 'localhost',
'user': 'root',
'password': 'secret',
'database': 'mydb'
})

SQLite:軽量JSONLインポート

SQLiteはローカル開発や小規模なデータセットに優れた選択肢です。SQLiteにはネイティブのJSONLインポートコマンドはありませんが、JSON1拡張がJSONデータを扱うためのjson_extract()を提供し、シンプルなPythonスクリプトでSQLiteの組み込みトランザクションサポートを使用してJSONLファイルを効率的にインポートできます。

Pythonの組み込みsqlite3モジュールを使用して、JSONLファイルを読み取り、SQLiteデータベースにレコードを挿入します。すべての挿入を単一のトランザクションでラップすることで、大きなファイルのパフォーマンスが劇的に向上します。

SQLite JSONLインポート用Pythonスクリプト
import json
import sqlite3
def import_jsonl_to_sqlite(jsonl_path, db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
age INTEGER,
raw_json TEXT
)
""")
total = 0
errors = 0
with open(jsonl_path, 'r') as f:
# Use a transaction for bulk insert performance
conn.execute('BEGIN TRANSACTION')
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
cursor.execute(
"INSERT OR IGNORE INTO users "
"(name, email, age, raw_json) "
"VALUES (?, ?, ?, ?)",
(
record['name'],
record.get('email'),
record.get('age'),
line, # Store original JSON
)
)
total += 1
except (json.JSONDecodeError, KeyError) as e:
errors += 1
print(f"Line {line_num}: {e}")
conn.commit()
conn.close()
print(f"Imported {total} records, {errors} errors")
# Usage
import_jsonl_to_sqlite('users.jsonl', 'app.db')
# Query with json_extract (SQLite JSON1 extension)
# SELECT json_extract(raw_json, '$.name') FROM users;

バルクインポートのベストプラクティス

大きなJSONLファイルをデータベースにインポートするには、トランザクション、エラー処理、リソース管理に注意を払う必要があります。これらのベストプラクティスはすべてのデータベースシステムに適用され、信頼性の高い本番環境対応のインポートパイプラインの構築に役立ちます。

バッチトランザクション

パフォーマンス

1行ごとに1トランザクションで挿入してはいけません。挿入を1トランザクションあたり500〜5000行のバッチにグループ化します。これにより、ディスクI/Oとロックのオーバーヘッドが10〜100倍削減されます。ほとんどのデータベースではバッチサイズ1000で最大の効果が得られます。それ以上大きくしても効果は逓減し、失敗時のロールバックコストが増加します。

行レベルのエラー処理

信頼性

各JSONL行を個別にパースして検証します。検証や挿入に失敗した行について、行番号とエラーメッセージをログに記録します。インポート全体を中止するのではなく、ON CONFLICT / ON DUPLICATE KEYを使用してユニーク制約違反をスムーズに処理します。失敗した行を後で確認するために別のエラーファイルに保存します。

進捗追跡

可観測性

数百万行のファイルの場合、N件ごと(例:10,000件ごと)に進捗をログに記録します。読み取った行数、成功した挿入数、スキップした重複数、エラー数を追跡します。インポート速度(レコード/秒)を計算して表示し、パフォーマンスのボトルネックを特定します。このフィードバックは長時間実行されるインポートの監視に不可欠です。

この汎用パターンは、バッチ処理、エラー処理、進捗追跡を組み合わせています。insert関数を入れ替えることで、任意のデータベースに適応できます。

本番環境対応のバッチインポートパターン
import json
import time
def batch_import_jsonl(file_path, insert_fn, batch_size=1000):
"""Generic JSONL batch importer with error handling."""
batch = []
stats = {
'total_lines': 0,
'imported': 0,
'skipped': 0,
'errors': 0
}
start_time = time.time()
error_file = open(file_path + '.errors', 'w')
with open(file_path, 'r') as f:
for line_num, line in enumerate(f, 1):
stats['total_lines'] = line_num
line = line.strip()
if not line:
stats['skipped'] += 1
continue
try:
record = json.loads(line)
batch.append(record)
except json.JSONDecodeError as e:
stats['errors'] += 1
error_file.write(f"{line_num}\t{e}\t{line}\n")
continue
if len(batch) >= batch_size:
inserted = insert_fn(batch)
stats['imported'] += inserted
batch = []
if line_num % 10000 == 0:
elapsed = time.time() - start_time
rate = stats['imported'] / elapsed
print(
f"Progress: {line_num:,} lines | "
f"{stats['imported']:,} imported | "
f"{rate:,.0f} records/sec"
)
# Flush remaining batch
if batch:
inserted = insert_fn(batch)
stats['imported'] += inserted
error_file.close()
elapsed = time.time() - start_time
print(f"\nComplete in {elapsed:.1f}s")
print(f" Lines: {stats['total_lines']:,}")
print(f" Imported: {stats['imported']:,}")
print(f" Errors: {stats['errors']:,}")
return stats

無料のJSONLツールでデータを準備

JSONLをデータベースにインポートする前に、ファイル形式の検証やフォーマット間の変換を行いましょう。無料のオンラインツールはブラウザ内ですべてをローカルに処理するため、データがマシンから離れることはありません。

JSONLファイルをオンラインで操作

ブラウザで最大1GBのJSONLファイルを表示、検証、変換。アップロード不要、100%プライベート。

よくある質問

JSONL インポート/エクスポート — PostgreSQL、MongoDB、BigQuery 他 | json...