Rust で JSONL を扱う:serde_json、BufReader とゼロコスト抽象化
Rust で JSONL(JSON Lines)ファイルを扱うための完全ガイド。serde_json、BufReader、rayon による並列処理、tokio による非同期 I/O を使用して、JSONL データの読み取り、書き込み、パース、ストリーミングを学びます。
最終更新:2026年2月
なぜ Rust で JSONL を処理するのか?
Rust は、パフォーマンス、メモリ安全性、正確性が重要な JSONL 処理に優れた選択肢です。所有権モデルによりコンパイル時にデータ競合が排除され、ゼロコスト抽象化により高レベルなイテレータチェーンがタイトなループにコンパイルされ、serde_json はあらゆる言語の中で最速の JSON パーサーの一つです。毎秒数百万の JSONL レコードを処理するデータパイプラインを構築する場合、Rust は安全性を犠牲にすることなく、それを達成するための制御を提供します。
JSONL(JSON Lines)は1行に1つの JSON オブジェクトを格納するため、ストリーミング、追記専用ログ、大規模データセット処理に最適です。Rust の BufReader はファイル全体をメモリにロードせずに行単位で読み取り、イテレータモデルは JSONL の行指向構造に完全にフィットします。このガイドでは、BufReader で JSONL を読み書きする方法、serde でレコードを厳密に型付けされた構造体にパースする方法、rayon で並列処理する方法、tokio で非同期 I/O を処理する方法、Result と ? 演算子で堅牢なエラーハンドリングを構築する方法を学びます。
BufReader で JSONL ファイルを読み取る
Rust の標準ライブラリは、効率的なバッファード ファイル読み取りのための BufReader を提供しています。lines() イテレータと組み合わせることで、最小限のメモリオーバーヘッドで JSONL ファイルを1行ずつ読み取ります。各行は独立してパースされるため、このアプローチはあらゆるサイズのファイルに適しています。
最もシンプルなアプローチは、File を BufReader でラップし、lines() でイテレートし、各行を serde_json でパースします。ファイルサイズに関係なく、一度にメモリに保持するのは1行分のみです。
use std::fs::File;use std::io::{BufRead, BufReader};use serde_json::Value;fn read_jsonl(path: &str) -> std::io::Result<Vec<Value>> {let file = File::open(path)?;let reader = BufReader::new(file);let mut records = Vec::new();for (line_num, line) in reader.lines().enumerate() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue; // Skip empty lines}match serde_json::from_str::<Value>(trimmed) {Ok(value) => records.push(value),Err(e) => {eprintln!("Skipping invalid JSON at line {}: {}", line_num + 1, e);}}}Ok(records)}fn main() -> std::io::Result<()> {let records = read_jsonl("data.jsonl")?;println!("Loaded {} records", records.len());if let Some(first) = records.first() {println!("First record: {}", first);}Ok(())}
Cargo.toml に serde と serde_json を追加します。derive フィーチャーにより、型付き構造体を定義するための Serialize と Deserialize デリブマクロが有効になります。
[dependencies]serde = { version = "1", features = ["derive"] }serde_json = "1"# Optional: for parallel processingrayon = "1.10"# Optional: for async I/Otokio = { version = "1", features = ["full"] }
serde_json による型安全なパース
JSONL 処理における Rust 最大の強みの一つは、serde のデリブマクロです。各 JSON 行を厳密に型付けされた構造体に直接デシリアライズできます。コンパイラがコードがデータを正しく扱っているかを検証し、型の不一致、欠落フィールド、構造エラーを実行時ではなくコンパイル時にキャッチします。
Deserialize 付きの構造体を定義し、serde_json::from_str で各 JSONL 行をパースします。欠落フィールドや不正なフィールドは、プログラムの後の部分でパニックを引き起こすのではなく、パース時に明確なエラーメッセージを生成します。
use serde::Deserialize;use std::fs::File;use std::io::{BufRead, BufReader};#[derive(Debug, Deserialize)]struct LogEntry {timestamp: String,level: String,message: String,#[serde(default)]metadata: Option<serde_json::Value>,}fn read_typed_jsonl(path: &str) -> anyhow::Result<Vec<LogEntry>> {let file = File::open(path)?;let reader = BufReader::new(file);let mut entries = Vec::new();for line in reader.lines() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue;}let entry: LogEntry = serde_json::from_str(trimmed)?;entries.push(entry);}Ok(entries)}fn main() -> anyhow::Result<()> {let entries = read_typed_jsonl("logs.jsonl")?;for entry in &entries {println!("[{}] {}: {}", entry.level, entry.timestamp, entry.message);}Ok(())}
JSONL レコードのスキーマが可変の場合、serde_json::Value を使用して柔軟にアクセスします。ブラケット記法でインデックスアクセスしたり、as_str()、as_i64() メソッドで安全な型変換を行えます。
use serde_json::Value;use std::fs::File;use std::io::{BufRead, BufReader};fn process_dynamic_jsonl(path: &str) -> anyhow::Result<()> {let file = File::open(path)?;let reader = BufReader::new(file);for line in reader.lines() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue;}let value: Value = serde_json::from_str(trimmed)?;// Access fields dynamicallyif let Some(name) = value.get("name").and_then(Value::as_str) {println!("Name: {}", name);}if let Some(age) = value.get("age").and_then(Value::as_i64) {println!("Age: {}", age);}// Check if a field existsif value.get("email").is_some() {println!("Has email field");}}Ok(())}
Rust で JSONL ファイルを書き込む
JSONL の書き込みは読み取りの逆です:各レコードを JSON 文字列にシリアライズし、改行を追加してファイルに書き込みます。BufWriter と serde_json::to_string を使用することで、正確性とパフォーマンスの両方を確保できます。
serde_json::to_string() で各構造体をシリアライズし、改行に続けて書き込みます。Serialize デリブマクロが Rust 型から JSON への変換を自動的に処理します。
use serde::Serialize;use std::fs::File;use std::io::Write;#[derive(Serialize)]struct Record {id: u64,name: String,score: f64,}fn write_jsonl(path: &str, records: &[Record]) -> std::io::Result<()> {let mut file = File::create(path)?;for record in records {let json = serde_json::to_string(record).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;writeln!(file, "{}", json)?;}Ok(())}fn main() -> std::io::Result<()> {let records = vec![Record { id: 1, name: "Alice".into(), score: 95.5 },Record { id: 2, name: "Bob".into(), score: 87.3 },Record { id: 3, name: "Charlie".into(), score: 92.1 },];write_jsonl("output.jsonl", &records)?;println!("Wrote {} records", records.len());Ok(())}
大きな JSONL ファイルの書き込みには、File を BufWriter でラップしてシステムコールの回数を削減します。小さな書き込みを大きなバッファフラッシュにバッチ処理し、スループットを大幅に向上させます。
use serde::Serialize;use std::fs::File;use std::io::{BufWriter, Write};#[derive(Serialize)]struct Event {timestamp: u64,event_type: String,payload: serde_json::Value,}fn write_buffered_jsonl(path: &str, events: &[Event]) -> std::io::Result<()> {let file = File::create(path)?;let mut writer = BufWriter::new(file);for event in events {serde_json::to_writer(&mut writer, event).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;writer.write_all(b"\n")?;}writer.flush()?;Ok(())}
rayon による並列処理
Rust の rayon クレートは、最小限のコード変更でデータ並列処理を提供します。iter() の代わりに par_iter() を呼び出すだけで、rayon はワークスティーリングスレッドプールを使用して、すべての CPU コアに自動的に作業を分散します。これは、パースと処理が I/O 時間を支配する CPU バウンドな JSONL 変換に特に効果的です。
use rayon::prelude::*;use serde::{Deserialize, Serialize};use std::fs::File;use std::io::{BufRead, BufReader, BufWriter, Write};#[derive(Debug, Deserialize, Serialize)]struct Record {id: u64,text: String,#[serde(default)]word_count: Option<usize>,}fn parallel_process_jsonl(input_path: &str,output_path: &str,) -> anyhow::Result<usize> {// Step 1: Read all lines into memorylet file = File::open(input_path)?;let reader = BufReader::new(file);let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;// Step 2: Parse and transform in parallellet processed: Vec<Record> = lines.par_iter().filter(|line| !line.trim().is_empty()).filter_map(|line| {serde_json::from_str::<Record>(line.trim()).ok()}).map(|mut record| {record.word_count = Some(record.text.split_whitespace().count());record}).collect();// Step 3: Write resultslet file = File::create(output_path)?;let mut writer = BufWriter::new(file);let count = processed.len();for record in &processed {serde_json::to_writer(&mut writer, record)?;writer.write_all(b"\n")?;}writer.flush()?;Ok(count)}fn main() -> anyhow::Result<()> {let count = parallel_process_jsonl("input.jsonl", "output.jsonl")?;println!("Processed {} records in parallel", count);Ok(())}
このパターンはすべての行を読み取り、rayon の par_iter() で並列処理し、結果を順次書き込みます。並列ステップは利用可能なすべての CPU コアでパースと変換を処理します。メモリに収まるファイルの場合、コア数に近い線形のスピードアップを達成できます。非常に大きなファイルの場合は、メモリ使用量と並列処理のバランスを取るために入力をチャンク化することを検討してください。
tokio による非同期 I/O
JSONL 処理に HTTP エンドポイントからの読み取りやリモートストレージへの書き込みなどのネットワーク I/O が含まれる場合、tokio の非同期ランタイムにより I/O の待ち時間と計算処理をオーバーラップできます。tokio::io::AsyncBufReadExt トレイトは、同期的な BufReader API を反映した非同期 lines() メソッドを提供します。
use tokio::fs::File;use tokio::io::{AsyncBufReadExt, BufReader};use serde::Deserialize;#[derive(Debug, Deserialize)]struct ApiRecord {id: u64,url: String,status: String,}async fn read_jsonl_async(path: &str) -> anyhow::Result<Vec<ApiRecord>> {let file = File::open(path).await?;let reader = BufReader::new(file);let mut lines = reader.lines();let mut records = Vec::new();while let Some(line) = lines.next_line().await? {let trimmed = line.trim().to_string();if trimmed.is_empty() {continue;}let record: ApiRecord = serde_json::from_str(&trimmed)?;records.push(record);}Ok(records)}async fn write_jsonl_async(path: &str,records: &[ApiRecord],) -> anyhow::Result<()> {use tokio::io::AsyncWriteExt;let mut file = File::create(path).await?;for record in records {let json = serde_json::to_string(record)?;file.write_all(json.as_bytes()).await?;file.write_all(b"\n").await?;}file.flush().await?;Ok(())}#[tokio::main]async fn main() -> anyhow::Result<()> {let records = read_jsonl_async("api_logs.jsonl").await?;println!("Read {} async records", records.len());write_jsonl_async("output.jsonl", &records).await?;Ok(())}
非同期アプローチは、ネットワーク操作と組み合わせた場合に最も効果を発揮します。ローカルディスクの純粋なファイル I/O の場合、同期的な BufReader の方が高速なことが多いです。これは非同期がタスクスケジューリングのオーバーヘッドを追加するためです。HTTP ストリーム、S3 バケット、その他の I/O レイテンシが支配的なネットワークソースから JSONL を読み取る場合に tokio を使用してください。
エラーハンドリング:Result と ? 演算子
Rust の Result 型と ? 演算子は、JSONL 処理のためのクリーンで合成可能なエラーハンドリングパターンを提供します。try-catch ブロックの代わりに、失敗する可能性のある各操作は Result を返し、? で伝播するかローカルで match を使って処理できます。これにより、エラーパスが明示的になり、うっかり無視することが不可能になります。
use serde::Deserialize;use std::fs::File;use std::io::{BufRead, BufReader};use thiserror::Error;#[derive(Error, Debug)]enum JsonlError {#[error("I/O error: {0}")]Io(#[from] std::io::Error),#[error("JSON parse error at line {line}: {source}")]Parse {line: usize,source: serde_json::Error,},#[error("Validation error at line {line}: {message}")]Validation {line: usize,message: String,},}#[derive(Debug, Deserialize)]struct Record {id: u64,name: String,value: f64,}fn validate_record(record: &Record, line: usize) -> Result<(), JsonlError> {if record.name.is_empty() {return Err(JsonlError::Validation {line,message: "name cannot be empty".into(),});}if record.value < 0.0 {return Err(JsonlError::Validation {line,message: format!("value must be non-negative, got {}", record.value),});}Ok(())}fn process_jsonl_with_errors(path: &str,) -> Result<Vec<Record>, JsonlError> {let file = File::open(path)?; // Io error auto-converted via #[from]let reader = BufReader::new(file);let mut records = Vec::new();for (idx, line) in reader.lines().enumerate() {let line_num = idx + 1;let line = line?; // Propagate I/O errorslet trimmed = line.trim();if trimmed.is_empty() {continue;}let record: Record = serde_json::from_str(trimmed).map_err(|e| JsonlError::Parse { line: line_num, source: e })?;validate_record(&record, line_num)?;records.push(record);}Ok(records)}fn main() {match process_jsonl_with_errors("data.jsonl") {Ok(records) => println!("Successfully processed {} records", records.len()),Err(JsonlError::Io(e)) => eprintln!("File error: {}", e),Err(JsonlError::Parse { line, source }) => {eprintln!("JSON error at line {}: {}", line, source);}Err(JsonlError::Validation { line, message }) => {eprintln!("Validation error at line {}: {}", line, message);}}}
このパターンは、thiserror を使用してすべての失敗モードをカバーするカスタムエラー列挙型を定義します:I/O エラー、行番号付き JSON パースエラー、ドメイン固有の検証エラー。#[from] 属性により std::io::Error からの自動変換が可能になり、? 演算子がコールスタック上位にエラーを伝播します。各エラーバリアントは明確な診断メッセージを生成するのに十分なコンテキストを持ち、問題を引き起こした行を正確に特定するのが容易です。
JSON 処理用 Rust クレート
Rust エコシステムには、異なるパフォーマンス特性を持ついくつかの JSON パースクレートがあります。serde_json が標準的な選択肢であり、simd-json と sonic-rs は SIMD 命令を使用してパフォーマンスの限界を押し上げます。
serde_json
標準Rust における JSON のデファクトスタンダード。serde のデリブマクロとシームレスに統合され、ボイラープレートなしのシリアライゼーションを実現します。強力な型安全性、包括的なエラーメッセージ、幅広いエコシステムサポートにより、ほとんどの JSONL ワークロードに最適です。
simd-json
最速SIMD 命令を使用して毎秒数ギガバイトの速度で JSON をパースする simdjson の Rust ポート。ミュータブルな入力バッファが必要で、serde_json とは異なる API を提供しますが、パースが多いワークロードでは 2〜4 倍高速になる可能性があります。高スループットパイプラインに最適です。
sonic-rs
SIMD + serdeserde 互換 API を持つ SIMD アクセラレーション JSON ライブラリ。simd-json の速度と serde_json のエルゴノミクスを両立することを目指しています。遅延パースと即時パースの両方のモードをサポートし、コードの書き換えなしでパフォーマンスが必要な場合に良い選択です。
無料 JSONL ツールを試す
コードを書きたくない場合は、無料のオンラインツールでブラウザから直接 JSONL ファイルの表示、検証、変換ができます。