Rust 處理 JSONL:serde_json、BufReader 與零成本抽象
在 Rust 中使用 JSONL(JSON Lines)檔案的完整指南。學習使用 serde_json、BufReader、rayon 進行平行處理和 tokio 進行非同步 I/O 來讀取、寫入、解析和串流 JSONL 資料。
最後更新:2026 年 2 月
為什麼選擇 Rust 處理 JSONL?
當效能、記憶體安全和正確性很重要時,Rust 是處理 JSONL 的絕佳選擇。它的所有權模型在編譯期就消除了資料競爭,零成本抽象讓你可以撰寫高階的迭代器鏈式操作並編譯成緊湊的迴圈,而 serde_json 是所有語言中最快的 JSON 解析器之一。如果你正在建構需要每秒處理數百萬筆 JSONL 記錄的資料管線,Rust 讓你在不犧牲安全性的前提下實現這個目標。
JSONL(JSON Lines)每行儲存一個 JSON 物件,非常適合串流處理、僅附加日誌記錄和大型資料集處理。Rust 的 BufReader 可以逐行讀取檔案而無需將整個檔案載入記憶體,其迭代器模型完美契合 JSONL 的行導向結構。在本指南中,你將學習如何使用 BufReader 讀取和寫入 JSONL、使用 serde 將記錄解析為強型別 struct、使用 rayon 進行平行處理記錄、使用 tokio 處理非同步 I/O,以及使用 Result 和 ? 運算子建構健壯的錯誤處理。
使用 BufReader 讀取 JSONL 檔案
Rust 的標準函式庫提供 BufReader 用於高效的緩衝檔案讀取。搭配 lines() 迭代器,它可以以最小的記憶體開銷逐行讀取 JSONL 檔案。每行獨立解析,使此方式適用於任何大小的檔案。
最簡單的方式是將 File 包裝在 BufReader 中,迭代 lines(),然後使用 serde_json 解析每一行。無論檔案大小如何,一次只在記憶體中保留一行。
use std::fs::File;use std::io::{BufRead, BufReader};use serde_json::Value;fn read_jsonl(path: &str) -> std::io::Result<Vec<Value>> {let file = File::open(path)?;let reader = BufReader::new(file);let mut records = Vec::new();for (line_num, line) in reader.lines().enumerate() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue; // Skip empty lines}match serde_json::from_str::<Value>(trimmed) {Ok(value) => records.push(value),Err(e) => {eprintln!("Skipping invalid JSON at line {}: {}", line_num + 1, e);}}}Ok(records)}fn main() -> std::io::Result<()> {let records = read_jsonl("data.jsonl")?;println!("Loaded {} records", records.len());if let Some(first) = records.first() {println!("First record: {}", first);}Ok(())}
在 Cargo.toml 中加入 serde 和 serde_json。derive 功能啟用 Serialize 和 Deserialize derive 巨集,用於定義型別化 struct。
[dependencies]serde = { version = "1", features = ["derive"] }serde_json = "1"# Optional: for parallel processingrayon = "1.10"# Optional: for async I/Otokio = { version = "1", features = ["full"] }
使用 serde_json 進行型別安全解析
Rust 處理 JSONL 最大的優勢之一是 serde 的 derive 巨集,讓你可以將每個 JSON 行直接反序列化為強型別 struct。編譯器會驗證你的程式碼正確處理資料,在編譯期而非執行期捕獲型別不匹配、欄位缺失和結構錯誤。
定義帶有 Deserialize 的 struct,使用 serde_json::from_str 將每個 JSONL 行解析到其中。欄位缺失或格式錯誤會在解析時產生清晰的錯誤訊息,而不是稍後在程式中造成 panic。
use serde::Deserialize;use std::fs::File;use std::io::{BufRead, BufReader};#[derive(Debug, Deserialize)]struct LogEntry {timestamp: String,level: String,message: String,#[serde(default)]metadata: Option<serde_json::Value>,}fn read_typed_jsonl(path: &str) -> anyhow::Result<Vec<LogEntry>> {let file = File::open(path)?;let reader = BufReader::new(file);let mut entries = Vec::new();for line in reader.lines() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue;}let entry: LogEntry = serde_json::from_str(trimmed)?;entries.push(entry);}Ok(entries)}fn main() -> anyhow::Result<()> {let entries = read_typed_jsonl("logs.jsonl")?;for entry in &entries {println!("[{}] {}: {}", entry.level, entry.timestamp, entry.message);}Ok(())}
當 JSONL 記錄具有不同的 schema 時,使用 serde_json::Value 進行靈活存取。你可以使用括號表示法索引值,或使用 as_str()、as_i64() 方法進行安全的型別轉換。
use serde_json::Value;use std::fs::File;use std::io::{BufRead, BufReader};fn process_dynamic_jsonl(path: &str) -> anyhow::Result<()> {let file = File::open(path)?;let reader = BufReader::new(file);for line in reader.lines() {let line = line?;let trimmed = line.trim();if trimmed.is_empty() {continue;}let value: Value = serde_json::from_str(trimmed)?;// Access fields dynamicallyif let Some(name) = value.get("name").and_then(Value::as_str) {println!("Name: {}", name);}if let Some(age) = value.get("age").and_then(Value::as_i64) {println!("Age: {}", age);}// Check if a field existsif value.get("email").is_some() {println!("Has email field");}}Ok(())}
在 Rust 中寫入 JSONL 檔案
寫入 JSONL 是讀取的反向操作:將每筆記錄序列化為 JSON 字串、附加換行符,然後寫入檔案。使用 BufWriter 和 serde_json::to_string 確保正確性和效能。
使用 serde_json::to_string() 序列化每個 struct,然後寫入並附加換行符。Serialize derive 巨集自動處理從 Rust 型別到 JSON 的轉換。
use serde::Serialize;use std::fs::File;use std::io::Write;#[derive(Serialize)]struct Record {id: u64,name: String,score: f64,}fn write_jsonl(path: &str, records: &[Record]) -> std::io::Result<()> {let mut file = File::create(path)?;for record in records {let json = serde_json::to_string(record).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;writeln!(file, "{}", json)?;}Ok(())}fn main() -> std::io::Result<()> {let records = vec![Record { id: 1, name: "Alice".into(), score: 95.5 },Record { id: 2, name: "Bob".into(), score: 87.3 },Record { id: 3, name: "Charlie".into(), score: 92.1 },];write_jsonl("output.jsonl", &records)?;println!("Wrote {} records", records.len());Ok(())}
寫入大型 JSONL 檔案時,將 File 包裝在 BufWriter 中以減少系統呼叫次數。這會將多次小寫入批次合併為較大的緩衝區刷新,顯著提高吞吐量。
use serde::Serialize;use std::fs::File;use std::io::{BufWriter, Write};#[derive(Serialize)]struct Event {timestamp: u64,event_type: String,payload: serde_json::Value,}fn write_buffered_jsonl(path: &str, events: &[Event]) -> std::io::Result<()> {let file = File::create(path)?;let mut writer = BufWriter::new(file);for event in events {serde_json::to_writer(&mut writer, event).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;writer.write_all(b"\n")?;}writer.flush()?;Ok(())}
使用 rayon 進行平行處理
Rust 的 rayon crate 以最少的程式碼修改提供資料平行處理。只需將 iter() 改為 par_iter(),rayon 就會使用工作竊取執行緒池自動將工作分配到所有 CPU 核心。這對於解析和處理時間超過 I/O 時間的 CPU 密集型 JSONL 轉換特別有效。
use rayon::prelude::*;use serde::{Deserialize, Serialize};use std::fs::File;use std::io::{BufRead, BufReader, BufWriter, Write};#[derive(Debug, Deserialize, Serialize)]struct Record {id: u64,text: String,#[serde(default)]word_count: Option<usize>,}fn parallel_process_jsonl(input_path: &str,output_path: &str,) -> anyhow::Result<usize> {// Step 1: Read all lines into memorylet file = File::open(input_path)?;let reader = BufReader::new(file);let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;// Step 2: Parse and transform in parallellet processed: Vec<Record> = lines.par_iter().filter(|line| !line.trim().is_empty()).filter_map(|line| {serde_json::from_str::<Record>(line.trim()).ok()}).map(|mut record| {record.word_count = Some(record.text.split_whitespace().count());record}).collect();// Step 3: Write resultslet file = File::create(output_path)?;let mut writer = BufWriter::new(file);let count = processed.len();for record in &processed {serde_json::to_writer(&mut writer, record)?;writer.write_all(b"\n")?;}writer.flush()?;Ok(count)}fn main() -> anyhow::Result<()> {let count = parallel_process_jsonl("input.jsonl", "output.jsonl")?;println!("Processed {} records in parallel", count);Ok(())}
這個模式讀取所有行、使用 rayon 的 par_iter() 進行平行處理,然後依序寫入結果。平行步驟在所有可用的 CPU 核心上處理解析和轉換。對於可以載入記憶體的檔案,這可以實現接近核心數線性加速。對於非常大的檔案,考慮將輸入分塊以平衡記憶體使用和平行度。
使用 tokio 進行非同步 I/O
當你的 JSONL 處理涉及網路 I/O(例如從 HTTP 端點讀取或寫入遠端儲存)時,tokio 的非同步執行期讓你可以將 I/O 等待與運算重疊。tokio::io::AsyncBufReadExt trait 提供了非同步的 lines() 方法,其 API 與同步的 BufReader 相似。
use tokio::fs::File;use tokio::io::{AsyncBufReadExt, BufReader};use serde::Deserialize;#[derive(Debug, Deserialize)]struct ApiRecord {id: u64,url: String,status: String,}async fn read_jsonl_async(path: &str) -> anyhow::Result<Vec<ApiRecord>> {let file = File::open(path).await?;let reader = BufReader::new(file);let mut lines = reader.lines();let mut records = Vec::new();while let Some(line) = lines.next_line().await? {let trimmed = line.trim().to_string();if trimmed.is_empty() {continue;}let record: ApiRecord = serde_json::from_str(&trimmed)?;records.push(record);}Ok(records)}async fn write_jsonl_async(path: &str,records: &[ApiRecord],) -> anyhow::Result<()> {use tokio::io::AsyncWriteExt;let mut file = File::create(path).await?;for record in records {let json = serde_json::to_string(record)?;file.write_all(json.as_bytes()).await?;file.write_all(b"\n").await?;}file.flush().await?;Ok(())}#[tokio::main]async fn main() -> anyhow::Result<()> {let records = read_jsonl_async("api_logs.jsonl").await?;println!("Read {} async records", records.len());write_jsonl_async("output.jsonl", &records).await?;Ok(())}
非同步方式在搭配網路操作時最有價值。對於純本機磁碟的檔案 I/O,同步 BufReader 通常更快,因為非同步會增加任務排程的開銷。當你需要從 HTTP 串流、S3 bucket 或其他 I/O 延遲較高的網路來源讀取 JSONL 時,使用 tokio。
錯誤處理:Result 與 ? 運算子
Rust 的 Result 型別和 ? 運算子為 JSONL 處理提供了乾淨、可組合的錯誤處理模式。每個可能失敗的操作都返回 Result,你可以用 ? 傳播錯誤或用 match 在本地處理。這使得錯誤路徑變得明確且不可能被意外忽略。
use serde::Deserialize;use std::fs::File;use std::io::{BufRead, BufReader};use thiserror::Error;#[derive(Error, Debug)]enum JsonlError {#[error("I/O error: {0}")]Io(#[from] std::io::Error),#[error("JSON parse error at line {line}: {source}")]Parse {line: usize,source: serde_json::Error,},#[error("Validation error at line {line}: {message}")]Validation {line: usize,message: String,},}#[derive(Debug, Deserialize)]struct Record {id: u64,name: String,value: f64,}fn validate_record(record: &Record, line: usize) -> Result<(), JsonlError> {if record.name.is_empty() {return Err(JsonlError::Validation {line,message: "name cannot be empty".into(),});}if record.value < 0.0 {return Err(JsonlError::Validation {line,message: format!("value must be non-negative, got {}", record.value),});}Ok(())}fn process_jsonl_with_errors(path: &str,) -> Result<Vec<Record>, JsonlError> {let file = File::open(path)?; // Io error auto-converted via #[from]let reader = BufReader::new(file);let mut records = Vec::new();for (idx, line) in reader.lines().enumerate() {let line_num = idx + 1;let line = line?; // Propagate I/O errorslet trimmed = line.trim();if trimmed.is_empty() {continue;}let record: Record = serde_json::from_str(trimmed).map_err(|e| JsonlError::Parse { line: line_num, source: e })?;validate_record(&record, line_num)?;records.push(record);}Ok(records)}fn main() {match process_jsonl_with_errors("data.jsonl") {Ok(records) => println!("Successfully processed {} records", records.len()),Err(JsonlError::Io(e)) => eprintln!("File error: {}", e),Err(JsonlError::Parse { line, source }) => {eprintln!("JSON error at line {}: {}", line, source);}Err(JsonlError::Validation { line, message }) => {eprintln!("Validation error at line {}: {}", line, message);}}}
這個模式使用 thiserror 定義了自訂錯誤列舉,涵蓋所有失敗模式:I/O 錯誤、帶行號的 JSON 解析錯誤,以及領域特定的驗證錯誤。#[from] 屬性啟用從 std::io::Error 的自動轉換,? 運算子將錯誤沿呼叫堆疊向上傳播。每個錯誤變體都攜帶足夠的上下文來產生清晰的診斷訊息,使得精確定位哪一行導致問題變得容易。
Rust 的 JSON 處理 Crate
Rust 生態系統提供了多個具有不同效能特性的 JSON 解析 crate。serde_json 是標準選擇,而 simd-json 和 sonic-rs 使用 SIMD 指令突破效能極限。
serde_json
標準Rust 中事實上的 JSON 標準。它與 serde 的 derive 巨集無縫整合,實現零樣板序列化。適用於大多數 JSONL 工作負載,具有強型別安全、全面的錯誤訊息和廣泛的生態系統支援。
simd-json
最快simdjson 的 Rust 移植版本,使用 SIMD 指令以每秒數 GB 的速度解析 JSON。它需要可變的輸入緩衝區,並提供與 serde_json 不同的 API,但對於解析密集型工作負載可以快 2-4 倍。最適合高吞吐量管線。
sonic-rs
SIMD + serde具有 serde 相容 API 的 SIMD 加速 JSON 函式庫。它旨在結合 simd-json 的速度和 serde_json 的便利性。支援惰性和積極兩種解析模式,是在不重寫程式碼的情況下提升效能的好選擇。
試試我們的免費 JSONL 工具
不想寫程式?使用我們的免費線上工具,直接在瀏覽器中檢視、驗證和轉換 JSONL 檔案。