Rust 处理 JSONL:serde_json、BufReader 与零成本抽象

在 Rust 中使用 JSONL(JSON Lines)文件的完整指南。学习使用 serde_json、BufReader、rayon 并行处理和 tokio 异步 I/O 来读取、写入、解析和流式处理 JSONL 数据。

最后更新:2026年2月

为什么选择 Rust 处理 JSONL?

当性能、内存安全和正确性至关重要时,Rust 是处理 JSONL 的绝佳选择。其所有权模型在编译时消除数据竞争,其零成本抽象让您编写高层迭代器链并编译为紧凑的循环,而 serde_json 是所有编程语言中速度最快的 JSON 解析器之一。如果您正在构建需要每秒处理数百万条 JSONL 记录的数据管道,Rust 能让您在不牺牲安全性的前提下实现这一目标。

JSONL(JSON Lines)每行存储一个 JSON 对象,非常适合流式处理、仅追加日志记录和大型数据集处理。Rust 的 BufReader 逐行读取文件,无需将整个文件加载到内存中,其迭代器模型与 JSONL 的行导向结构完美契合。在本指南中,您将学习如何使用 BufReader 读写 JSONL、使用 serde 将记录解析为强类型 struct、使用 rayon 并行处理记录、使用 tokio 处理异步 I/O,以及使用 Result 和 ? 运算符构建健壮的错误处理。

使用 BufReader 读取 JSONL 文件

Rust 的标准库提供了 BufReader 用于高效的缓冲文件读取。结合 lines() 迭代器,它以最小的内存开销逐行读取 JSONL 文件。每行独立解析,使这种方式适用于任意大小的文件。

最简单的方式是将 File 包装在 BufReader 中,遍历 lines(),并用 serde_json 解析每行。无论文件多大,都只在内存中保留一行数据。

使用 BufReader 基础读取 JSONL
use std::fs::File;
use std::io::{BufRead, BufReader};
use serde_json::Value;
fn read_jsonl(path: &str) -> std::io::Result<Vec<Value>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut records = Vec::new();
for (line_num, line) in reader.lines().enumerate() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue; // Skip empty lines
}
match serde_json::from_str::<Value>(trimmed) {
Ok(value) => records.push(value),
Err(e) => {
eprintln!("Skipping invalid JSON at line {}: {}", line_num + 1, e);
}
}
}
Ok(records)
}
fn main() -> std::io::Result<()> {
let records = read_jsonl("data.jsonl")?;
println!("Loaded {} records", records.len());
if let Some(first) = records.first() {
println!("First record: {}", first);
}
Ok(())
}

将 serde 和 serde_json 添加到 Cargo.toml 中。derive 特性启用 Serialize 和 Deserialize 派生宏,用于定义类型化的 struct。

Cargo.toml 依赖
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Optional: for parallel processing
rayon = "1.10"
# Optional: for async I/O
tokio = { version = "1", features = ["full"] }

使用 serde_json 进行类型安全解析

Rust 处理 JSONL 最大的优势之一是 serde 的派生宏,它可以将每行 JSON 直接反序列化为强类型 struct。编译器会验证您的代码是否正确处理了数据,在编译时而非运行时捕获类型不匹配、缺失字段和结构错误。

定义带有 Deserialize 的 struct,使用 serde_json::from_str 将每行 JSONL 解析到其中。缺失或格式错误的字段会在解析时产生清晰的错误消息,而不是在程序后续执行中导致 panic。

类型化 Struct 反序列化
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};
#[derive(Debug, Deserialize)]
struct LogEntry {
timestamp: String,
level: String,
message: String,
#[serde(default)]
metadata: Option<serde_json::Value>,
}
fn read_typed_jsonl(path: &str) -> anyhow::Result<Vec<LogEntry>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for line in reader.lines() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let entry: LogEntry = serde_json::from_str(trimmed)?;
entries.push(entry);
}
Ok(entries)
}
fn main() -> anyhow::Result<()> {
let entries = read_typed_jsonl("logs.jsonl")?;
for entry in &entries {
println!("[{}] {}: {}", entry.level, entry.timestamp, entry.message);
}
Ok(())
}

当 JSONL 记录具有不同的 schema 时,使用 serde_json::Value 进行灵活访问。您可以使用方括号表示法索引值,或使用 as_str()、as_i64() 方法进行安全的类型转换。

使用 serde_json::Value 动态解析
use serde_json::Value;
use std::fs::File;
use std::io::{BufRead, BufReader};
fn process_dynamic_jsonl(path: &str) -> anyhow::Result<()> {
let file = File::open(path)?;
let reader = BufReader::new(file);
for line in reader.lines() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let value: Value = serde_json::from_str(trimmed)?;
// Access fields dynamically
if let Some(name) = value.get("name").and_then(Value::as_str) {
println!("Name: {}", name);
}
if let Some(age) = value.get("age").and_then(Value::as_i64) {
println!("Age: {}", age);
}
// Check if a field exists
if value.get("email").is_some() {
println!("Has email field");
}
}
Ok(())
}

在 Rust 中写入 JSONL 文件

写入 JSONL 是读取的逆过程:将每条记录序列化为 JSON 字符串,追加换行符,然后写入文件。使用 BufWriter 和 serde_json::to_string 可确保正确性和性能。

使用 serde_json::to_string() 序列化每个 struct 并在其后写入换行符。Serialize 派生宏自动处理从 Rust 类型到 JSON 的转换。

基础 JSONL 写入
use serde::Serialize;
use std::fs::File;
use std::io::Write;
#[derive(Serialize)]
struct Record {
id: u64,
name: String,
score: f64,
}
fn write_jsonl(path: &str, records: &[Record]) -> std::io::Result<()> {
let mut file = File::create(path)?;
for record in records {
let json = serde_json::to_string(record)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
writeln!(file, "{}", json)?;
}
Ok(())
}
fn main() -> std::io::Result<()> {
let records = vec![
Record { id: 1, name: "Alice".into(), score: 95.5 },
Record { id: 2, name: "Bob".into(), score: 87.3 },
Record { id: 3, name: "Charlie".into(), score: 92.1 },
];
write_jsonl("output.jsonl", &records)?;
println!("Wrote {} records", records.len());
Ok(())
}

写入大型 JSONL 文件时,使用 BufWriter 包装 File 以减少系统调用次数。它将小写入批量合并为更大的缓冲区刷新,显著提高吞吐量。

大文件缓冲写入
use serde::Serialize;
use std::fs::File;
use std::io::{BufWriter, Write};
#[derive(Serialize)]
struct Event {
timestamp: u64,
event_type: String,
payload: serde_json::Value,
}
fn write_buffered_jsonl(path: &str, events: &[Event]) -> std::io::Result<()> {
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
for event in events {
serde_json::to_writer(&mut writer, event)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
writer.write_all(b"\n")?;
}
writer.flush()?;
Ok(())
}

使用 rayon 并行处理

Rust 的 rayon crate 以极少的代码改动实现数据并行。只需将 iter() 替换为 par_iter(),rayon 就会使用工作窃取线程池自动将工作分配到所有 CPU 核心。这对于解析和处理时间超过 I/O 时间的 CPU 密集型 JSONL 转换尤其有效。

使用 rayon 并行处理
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
#[derive(Debug, Deserialize, Serialize)]
struct Record {
id: u64,
text: String,
#[serde(default)]
word_count: Option<usize>,
}
fn parallel_process_jsonl(
input_path: &str,
output_path: &str,
) -> anyhow::Result<usize> {
// Step 1: Read all lines into memory
let file = File::open(input_path)?;
let reader = BufReader::new(file);
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
// Step 2: Parse and transform in parallel
let processed: Vec<Record> = lines
.par_iter()
.filter(|line| !line.trim().is_empty())
.filter_map(|line| {
serde_json::from_str::<Record>(line.trim()).ok()
})
.map(|mut record| {
record.word_count = Some(record.text.split_whitespace().count());
record
})
.collect();
// Step 3: Write results
let file = File::create(output_path)?;
let mut writer = BufWriter::new(file);
let count = processed.len();
for record in &processed {
serde_json::to_writer(&mut writer, record)?;
writer.write_all(b"\n")?;
}
writer.flush()?;
Ok(count)
}
fn main() -> anyhow::Result<()> {
let count = parallel_process_jsonl("input.jsonl", "output.jsonl")?;
println!("Processed {} records in parallel", count);
Ok(())
}

此模式读取所有行,使用 rayon 的 par_iter() 并行处理,然后按顺序写入结果。并行步骤跨所有可用 CPU 核心处理解析和转换。对于能放入内存的文件,这可以实现接近线性的核心数加速。对于非常大的文件,考虑将输入分块以平衡内存使用和并行度。

使用 tokio 进行异步 I/O

当您的 JSONL 处理涉及网络 I/O(如从 HTTP 端点读取或写入远程存储)时,tokio 的异步运行时可以让 I/O 等待与计算重叠。tokio::io::AsyncBufReadExt trait 提供了一个异步 lines() 方法,与同步 BufReader API 对应。

使用 tokio 进行异步 I/O
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct ApiRecord {
id: u64,
url: String,
status: String,
}
async fn read_jsonl_async(path: &str) -> anyhow::Result<Vec<ApiRecord>> {
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut records = Vec::new();
while let Some(line) = lines.next_line().await? {
let trimmed = line.trim().to_string();
if trimmed.is_empty() {
continue;
}
let record: ApiRecord = serde_json::from_str(&trimmed)?;
records.push(record);
}
Ok(records)
}
async fn write_jsonl_async(
path: &str,
records: &[ApiRecord],
) -> anyhow::Result<()> {
use tokio::io::AsyncWriteExt;
let mut file = File::create(path).await?;
for record in records {
let json = serde_json::to_string(record)?;
file.write_all(json.as_bytes()).await?;
file.write_all(b"\n").await?;
}
file.flush().await?;
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let records = read_jsonl_async("api_logs.jsonl").await?;
println!("Read {} async records", records.len());
write_jsonl_async("output.jsonl", &records).await?;
Ok(())
}

异步方式在与网络操作结合时最为有效。对于本地磁盘的纯文件 I/O,同步 BufReader 通常更快,因为 async 会增加任务调度的开销。当您需要从 HTTP 流、S3 存储桶或其他 I/O 延迟占主导的网络源读取 JSONL 时,使用 tokio。

错误处理:Result 与 ? 运算符

Rust 的 Result 类型和 ? 运算符为 JSONL 处理提供了简洁、可组合的错误处理模式。不同于 try-catch 块,每个可能失败的操作返回一个 Result,您可以使用 ? 传播或使用 match 本地处理。这使错误路径显式化,不可能被意外忽略。

错误处理:Result 与 ? 运算符
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};
use thiserror::Error;
#[derive(Error, Debug)]
enum JsonlError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON parse error at line {line}: {source}")]
Parse {
line: usize,
source: serde_json::Error,
},
#[error("Validation error at line {line}: {message}")]
Validation {
line: usize,
message: String,
},
}
#[derive(Debug, Deserialize)]
struct Record {
id: u64,
name: String,
value: f64,
}
fn validate_record(record: &Record, line: usize) -> Result<(), JsonlError> {
if record.name.is_empty() {
return Err(JsonlError::Validation {
line,
message: "name cannot be empty".into(),
});
}
if record.value < 0.0 {
return Err(JsonlError::Validation {
line,
message: format!("value must be non-negative, got {}", record.value),
});
}
Ok(())
}
fn process_jsonl_with_errors(
path: &str,
) -> Result<Vec<Record>, JsonlError> {
let file = File::open(path)?; // Io error auto-converted via #[from]
let reader = BufReader::new(file);
let mut records = Vec::new();
for (idx, line) in reader.lines().enumerate() {
let line_num = idx + 1;
let line = line?; // Propagate I/O errors
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let record: Record = serde_json::from_str(trimmed)
.map_err(|e| JsonlError::Parse { line: line_num, source: e })?;
validate_record(&record, line_num)?;
records.push(record);
}
Ok(records)
}
fn main() {
match process_jsonl_with_errors("data.jsonl") {
Ok(records) => println!("Successfully processed {} records", records.len()),
Err(JsonlError::Io(e)) => eprintln!("File error: {}", e),
Err(JsonlError::Parse { line, source }) => {
eprintln!("JSON error at line {}: {}", line, source);
}
Err(JsonlError::Validation { line, message }) => {
eprintln!("Validation error at line {}: {}", line, message);
}
}
}

此模式使用 thiserror 定义了一个自定义错误枚举,涵盖所有失败模式:I/O 错误、带行号的 JSON 解析错误和领域特定的验证错误。#[from] 属性启用了从 std::io::Error 的自动转换,? 运算符将错误沿调用栈向上传播。每个错误变体携带足够的上下文来生成清晰的诊断消息,便于精确定位导致问题的行。

用于 JSON 处理的 Rust Crate

Rust 生态系统提供了多个具有不同性能特征的 JSON 解析 crate。serde_json 是标准选择,而 simd-json 和 sonic-rs 使用 SIMD 指令来突破性能极限。

serde_json

标准

Rust 中 JSON 处理的事实标准。与 serde 的派生宏无缝集成,实现零样板序列化。对于大多数 JSONL 工作负载表现优异,具有强类型安全、全面的错误消息和广泛的生态系统支持。

simd-json

最快

simdjson 的 Rust 移植版,使用 SIMD 指令以每秒数 GB 的速度解析 JSON。它需要可变输入缓冲区并提供与 serde_json 不同的 API,但对于解析密集型工作负载可以快 2-4 倍。最适合高吞吐量管道。

sonic-rs

SIMD + serde

具有 serde 兼容 API 的 SIMD 加速 JSON 库。旨在结合 simd-json 的速度和 serde_json 的易用性。支持惰性和急切解析模式,是在不重写代码的情况下获得性能提升的好选择。

试试我们的免费 JSONL 工具

不想写代码?使用我们的免费在线工具直接在浏览器中查看、验证和转换 JSONL 文件。

在线处理 JSONL 文件

直接在浏览器中查看、验证和转换最大 1GB 的 JSONL 文件。无需上传,100% 私密。

常见问题

Rust 处理 JSONL — serde_json、BufReader 与流式模式 | jsonl.co