Go 处理 JSONL:bufio、json.Decoder 与并发处理

在 Go 中使用 JSONL(JSON Lines)文件的完整指南。学习使用标准库的 bufio、encoding/json 和 goroutines 来读取、写入、流式处理和并发处理 JSONL 数据。

最后更新:2026年2月

为什么选择 Go 处理 JSONL?

Go 是处理 JSONL 文件的绝佳选择,尤其是在性能和并发性至关重要的场景下。标准库提供了所需的一切:bufio 用于高效的逐行 I/O,encoding/json 用于解析和序列化,goroutines 用于并行处理。无需任何第三方依赖。Go 的编译特性意味着您的 JSONL 管道运行速度将远超同等的 Python 或 Node.js 脚本,在普通硬件上通常可以每秒处理数百万行。

JSONL(JSON Lines)每行存储一个 JSON 对象,天然适合 Go 的流式 I/O 模型。无需将整个文件加载到内存中,您可以使用 scanner 或 decoder 逐条读取和处理记录。结合 Go 的轻量级 goroutines 和 channels,您可以构建充分利用所有可用 CPU 核心的并发 JSONL 管道。在本指南中,您将学习如何使用 bufio.Scanner 读取 JSONL、使用 json.Decoder 进行流式处理、高效写入 JSONL、并发处理记录以及健壮地处理错误。

使用 bufio.Scanner 读取 JSONL 文件

在 Go 中读取 JSONL 最直接的方式是使用 bufio.Scanner。它逐行读取文件,然后用 json.Unmarshal 解析每一行。这种方式让您可以完全控制缓冲区大小和错误处理。

打开文件,创建 scanner,逐行迭代。每行通过 json.Unmarshal 解析为 map 或 struct。这样可以将内存使用量保持在单条记录级别,而非整个文件。

使用 bufio.Scanner 基础读取
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
func main() {
file, err := os.Open("data.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
var records []map[string]any
scanner := bufio.NewScanner(file)
// Increase buffer for lines longer than 64KB
scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)
for scanner.Scan() {
var record map[string]any
if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
log.Printf("skipping invalid JSON: %v", err)
continue
}
records = append(records, record)
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
fmt.Printf("Loaded %d records\n", len(records))
}

在生产代码中,将 JSONL 记录解析为类型化的 Go struct 而非通用 map。这提供了编译时类型安全、更好的性能和更清晰的代码。使用 json 标签定义 struct 以控制字段映射。

解析为类型化 Struct
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age,omitempty"`
}
func readUsers(path string) ([]User, error) {
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
defer file.Close()
var users []User
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
lineNum++
var u User
if err := json.Unmarshal(scanner.Bytes(), &u); err != nil {
return nil, fmt.Errorf("line %d: %w", lineNum, err)
}
users = append(users, u)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("scanner error: %w", err)
}
return users, nil
}
func main() {
users, err := readUsers("users.jsonl")
if err != nil {
log.Fatal(err)
}
for _, u := range users {
fmt.Printf("%s (%s)\n", u.Name, u.Email)
}
}

使用 json.Decoder 流式处理

对于高性能流式处理,json.Decoder 直接从 io.Reader 读取,无需中间行分割。它内部处理缓冲,是处理大型 JSONL 文件或网络流的推荐方式,可最大限度减少内存分配。

json.NewDecoder 包装任意 io.Reader 并按顺序解码 JSON 值。每次调用 Decode 恰好读取一个 JSON 对象。当流结束时返回 io.EOF。这比 bufio.Scanner + json.Unmarshal 更高效,因为它避免了将每行复制到单独的缓冲区。

使用 json.Decoder 流式处理
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
)
type LogEntry struct {
Timestamp string `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Service string `json:"service"`
}
func processJSONLStream(path string) error {
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("open: %w", err)
}
defer file.Close()
decoder := json.NewDecoder(file)
var count int
for {
var entry LogEntry
err := decoder.Decode(&entry)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("decode record %d: %w", count+1, err)
}
count++
// Process each record as it's decoded
if entry.Level == "ERROR" {
fmt.Printf("[%s] %s: %s\n",
entry.Timestamp, entry.Service, entry.Message)
}
}
fmt.Printf("Processed %d log entries\n", count)
return nil
}
func main() {
if err := processJSONLStream("logs.jsonl"); err != nil {
log.Fatal(err)
}
}

在 Go 中写入 JSONL 文件

在 Go 中写入 JSONL 非常简单:使用 json.Marshal 序列化每条记录,追加换行符,然后写入文件。为了获得最佳性能,用 bufio.Writer 包装文件写入器以减少系统调用。

使用 json.Marshal 将每条记录序列化为 JSON 字节,然后在其后写入换行符。这种方式简单直接,适用于中小型文件。

使用 json.Marshal 基础写入
package main
import (
"encoding/json"
"fmt"
"log"
"os"
)
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
func main() {
products := []Product{
{ID: 1, Name: "Laptop", Price: 999.99},
{ID: 2, Name: "Mouse", Price: 29.99},
{ID: 3, Name: "Keyboard", Price: 79.99},
}
file, err := os.Create("products.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
for _, p := range products {
data, err := json.Marshal(p)
if err != nil {
log.Printf("skip marshal error: %v", err)
continue
}
data = append(data, '\n')
if _, err := file.Write(data); err != nil {
log.Fatal(err)
}
}
fmt.Printf("Wrote %d records to products.jsonl\n", len(products))
}

对于高吞吐量写入,使用 json.NewEncoder 配合 bufio.Writer。encoder 在每次 Encode 调用后自动追加换行符,缓冲写入器将小写入批量合并为更大的系统调用。务必在关闭文件前调用 Flush。

使用 json.Encoder 缓冲写入
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type Event struct {
Type string `json:"type"`
Timestamp int64 `json:"timestamp"`
Payload map[string]any `json:"payload"`
}
func writeEvents(path string, events []Event) error {
file, err := os.Create(path)
if err != nil {
return fmt.Errorf("create file: %w", err)
}
defer file.Close()
writer := bufio.NewWriter(file)
encoder := json.NewEncoder(writer)
// Disable HTML escaping for cleaner output
encoder.SetEscapeHTML(false)
for i, event := range events {
if err := encoder.Encode(event); err != nil {
return fmt.Errorf("encode event %d: %w", i, err)
}
}
// Flush buffered data to file
if err := writer.Flush(); err != nil {
return fmt.Errorf("flush: %w", err)
}
fmt.Printf("Wrote %d events to %s\n", len(events), path)
return nil
}
func main() {
events := []Event{
{Type: "click", Timestamp: 1700000001, Payload: map[string]any{"x": 120, "y": 450}},
{Type: "view", Timestamp: 1700000002, Payload: map[string]any{"page": "/home"}},
}
if err := writeEvents("events.jsonl", events); err != nil {
log.Fatal(err)
}
}

使用 Goroutines 并发处理 JSONL

Go 的 goroutines 和 channels 使构建并发 JSONL 管道变得简单。fan-out 模式从文件中读取记录,将它们分发到多个 worker goroutines,并通过 channel 收集结果。这非常适合在大型 JSONL 文件上执行 CPU 密集型转换。

使用 Goroutines 并发处理 JSONL
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"runtime"
"sync"
)
type Record struct {
ID int `json:"id"`
Data map[string]any `json:"data"`
}
type Result struct {
ID int `json:"id"`
Processed bool `json:"processed"`
Score int `json:"score"`
}
func process(r Record) Result {
// Simulate CPU-bound work
score := len(r.Data) * r.ID
return Result{ID: r.ID, Processed: true, Score: score}
}
func main() {
file, err := os.Open("records.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
numWorkers := runtime.NumCPU()
jobs := make(chan Record, numWorkers*2)
results := make(chan Result, numWorkers*2)
// Start workers
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for record := range jobs {
results <- process(record)
}
}()
}
// Close results channel when all workers finish
go func() {
wg.Wait()
close(results)
}()
// Read file and send records to workers
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var r Record
if err := json.Unmarshal(scanner.Bytes(), &r); err != nil {
log.Printf("skip invalid line: %v", err)
continue
}
jobs <- r
}
close(jobs)
}()
// Collect and write results
outFile, err := os.Create("results.jsonl")
if err != nil {
log.Fatal(err)
}
defer outFile.Close()
writer := bufio.NewWriter(outFile)
encoder := json.NewEncoder(writer)
var count int
for result := range results {
if err := encoder.Encode(result); err != nil {
log.Printf("encode error: %v", err)
}
count++
}
writer.Flush()
fmt.Printf("Processed %d records with %d workers\n", count, numWorkers)
}

这种 fan-out 模式将 JSONL 记录分发到与 CPU 核心数量相等的 goroutines 中。带缓冲的 channels 防止 goroutines 在发送或接收时阻塞。sync.WaitGroup 确保所有 workers 完成后才关闭 results channel。对于 I/O 密集型工作负载(如 API 调用),您可以将 numWorkers 增加到超过 CPU 核心数。请注意,并发处理时输出顺序无法保证。

错误处理最佳实践

在 Go 中进行健壮的 JSONL 处理需要仔细的错误处理。实际的 JSONL 文件可能包含格式错误的行、编码问题或意外的大记录。此模式提供行级错误跟踪、可配置的跳过错误行为和汇总报告。

错误处理最佳实践
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type ParseError struct {
Line int
Err error
Raw string
}
func (e *ParseError) Error() string {
return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}
type JSONLReader struct {
scanner *bufio.Scanner
lineNum int
Errors []ParseError
Skipped int
Success int
}
func NewJSONLReader(file *os.File) *JSONLReader {
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
return &JSONLReader{scanner: scanner}
}
func (r *JSONLReader) ReadAll(target any) error {
slice, ok := target.(*[]map[string]any)
if !ok {
return fmt.Errorf("target must be *[]map[string]any")
}
for r.scanner.Scan() {
r.lineNum++
line := r.scanner.Bytes()
// Skip empty lines
if len(line) == 0 {
continue
}
var record map[string]any
if err := json.Unmarshal(line, &record); err != nil {
r.Errors = append(r.Errors, ParseError{
Line: r.lineNum,
Err: err,
Raw: string(line),
})
r.Skipped++
continue
}
*slice = append(*slice, record)
r.Success++
}
return r.scanner.Err()
}
func (r *JSONLReader) Summary() string {
return fmt.Sprintf(
"Lines: %d | Success: %d | Skipped: %d | Errors: %d",
r.lineNum, r.Success, r.Skipped, len(r.Errors),
)
}
func main() {
file, err := os.Open("data.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := NewJSONLReader(file)
var records []map[string]any
if err := reader.ReadAll(&records); err != nil {
log.Fatal(err)
}
fmt.Println(reader.Summary())
for _, e := range reader.Errors {
log.Printf("Parse error at %s", e.Error())
}
}

这个 JSONLReader 结构体跟踪行号,收集包含原始内容的解析错误,并报告成功和失败的汇总信息。10MB 的 scanner 缓冲区可以处理异常大的行而不会 panic。在生产环境中,您可以扩展它以支持 context.Context 用于取消操作、设置最大错误阈值以中止处理,或使用 slog 进行结构化日志记录。

用于 JSONL 处理的 Go 包

Go 的标准库提供了处理 JSONL 所需的所有构建模块。以下是三种核心方式及其适用场景。

bufio.Scanner

灵活

标准的逐行读取器。与 json.Unmarshal 配对进行显式解析。当您需要控制缓冲区大小、想要跳过或检查原始行,或需要行号进行错误报告时最为适用。可处理不超过配置缓冲区大小的行。

json.Decoder

推荐

流式 JSON 解码器,直接从 io.Reader 读取。比 Scanner + Unmarshal 更高效,因为它避免了数据复制。适用于大文件和网络流。自动处理背靠背的 JSON 值,无需显式换行分割。

json.Encoder

写入

流式 JSON 编码器,写入 io.Writer。每次 Encode 调用后自动追加换行符,非常适合 JSONL 输出。与 bufio.Writer 结合使用可实现高吞吐量写入。支持 SetEscapeHTML 和 SetIndent 配置。

试试我们的免费 JSONL 工具

不想写代码?使用我们的免费在线工具直接在浏览器中查看、验证和转换 JSONL 文件。

在线处理 JSONL 文件

直接在浏览器中查看、验证和转换最大 1GB 的 JSONL 文件。无需上传,100% 私密。

常见问题

Go 处理 JSONL — bufio.Scanner、json.Decoder 与并发 | jsonl.co