Go 处理 JSONL:bufio、json.Decoder 与并发处理
在 Go 中使用 JSONL(JSON Lines)文件的完整指南。学习使用标准库的 bufio、encoding/json 和 goroutines 来读取、写入、流式处理和并发处理 JSONL 数据。
最后更新:2026年2月
为什么选择 Go 处理 JSONL?
Go 是处理 JSONL 文件的绝佳选择,尤其是在性能和并发性至关重要的场景下。标准库提供了所需的一切:bufio 用于高效的逐行 I/O,encoding/json 用于解析和序列化,goroutines 用于并行处理。无需任何第三方依赖。Go 的编译特性意味着您的 JSONL 管道运行速度将远超同等的 Python 或 Node.js 脚本,在普通硬件上通常可以每秒处理数百万行。
JSONL(JSON Lines)每行存储一个 JSON 对象,天然适合 Go 的流式 I/O 模型。无需将整个文件加载到内存中,您可以使用 scanner 或 decoder 逐条读取和处理记录。结合 Go 的轻量级 goroutines 和 channels,您可以构建充分利用所有可用 CPU 核心的并发 JSONL 管道。在本指南中,您将学习如何使用 bufio.Scanner 读取 JSONL、使用 json.Decoder 进行流式处理、高效写入 JSONL、并发处理记录以及健壮地处理错误。
使用 bufio.Scanner 读取 JSONL 文件
在 Go 中读取 JSONL 最直接的方式是使用 bufio.Scanner。它逐行读取文件,然后用 json.Unmarshal 解析每一行。这种方式让您可以完全控制缓冲区大小和错误处理。
打开文件,创建 scanner,逐行迭代。每行通过 json.Unmarshal 解析为 map 或 struct。这样可以将内存使用量保持在单条记录级别,而非整个文件。
package mainimport ("bufio""encoding/json""fmt""log""os")func main() {file, err := os.Open("data.jsonl")if err != nil {log.Fatal(err)}defer file.Close()var records []map[string]anyscanner := bufio.NewScanner(file)// Increase buffer for lines longer than 64KBscanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)for scanner.Scan() {var record map[string]anyif err := json.Unmarshal(scanner.Bytes(), &record); err != nil {log.Printf("skipping invalid JSON: %v", err)continue}records = append(records, record)}if err := scanner.Err(); err != nil {log.Fatal(err)}fmt.Printf("Loaded %d records\n", len(records))}
在生产代码中,将 JSONL 记录解析为类型化的 Go struct 而非通用 map。这提供了编译时类型安全、更好的性能和更清晰的代码。使用 json 标签定义 struct 以控制字段映射。
package mainimport ("bufio""encoding/json""fmt""log""os")type User struct {ID int `json:"id"`Name string `json:"name"`Email string `json:"email"`Age int `json:"age,omitempty"`}func readUsers(path string) ([]User, error) {file, err := os.Open(path)if err != nil {return nil, fmt.Errorf("open file: %w", err)}defer file.Close()var users []Userscanner := bufio.NewScanner(file)lineNum := 0for scanner.Scan() {lineNum++var u Userif err := json.Unmarshal(scanner.Bytes(), &u); err != nil {return nil, fmt.Errorf("line %d: %w", lineNum, err)}users = append(users, u)}if err := scanner.Err(); err != nil {return nil, fmt.Errorf("scanner error: %w", err)}return users, nil}func main() {users, err := readUsers("users.jsonl")if err != nil {log.Fatal(err)}for _, u := range users {fmt.Printf("%s (%s)\n", u.Name, u.Email)}}
使用 json.Decoder 流式处理
对于高性能流式处理,json.Decoder 直接从 io.Reader 读取,无需中间行分割。它内部处理缓冲,是处理大型 JSONL 文件或网络流的推荐方式,可最大限度减少内存分配。
json.NewDecoder 包装任意 io.Reader 并按顺序解码 JSON 值。每次调用 Decode 恰好读取一个 JSON 对象。当流结束时返回 io.EOF。这比 bufio.Scanner + json.Unmarshal 更高效,因为它避免了将每行复制到单独的缓冲区。
package mainimport ("encoding/json""errors""fmt""io""log""os")type LogEntry struct {Timestamp string `json:"timestamp"`Level string `json:"level"`Message string `json:"message"`Service string `json:"service"`}func processJSONLStream(path string) error {file, err := os.Open(path)if err != nil {return fmt.Errorf("open: %w", err)}defer file.Close()decoder := json.NewDecoder(file)var count intfor {var entry LogEntryerr := decoder.Decode(&entry)if errors.Is(err, io.EOF) {break}if err != nil {return fmt.Errorf("decode record %d: %w", count+1, err)}count++// Process each record as it's decodedif entry.Level == "ERROR" {fmt.Printf("[%s] %s: %s\n",entry.Timestamp, entry.Service, entry.Message)}}fmt.Printf("Processed %d log entries\n", count)return nil}func main() {if err := processJSONLStream("logs.jsonl"); err != nil {log.Fatal(err)}}
在 Go 中写入 JSONL 文件
在 Go 中写入 JSONL 非常简单:使用 json.Marshal 序列化每条记录,追加换行符,然后写入文件。为了获得最佳性能,用 bufio.Writer 包装文件写入器以减少系统调用。
使用 json.Marshal 将每条记录序列化为 JSON 字节,然后在其后写入换行符。这种方式简单直接,适用于中小型文件。
package mainimport ("encoding/json""fmt""log""os")type Product struct {ID int `json:"id"`Name string `json:"name"`Price float64 `json:"price"`}func main() {products := []Product{{ID: 1, Name: "Laptop", Price: 999.99},{ID: 2, Name: "Mouse", Price: 29.99},{ID: 3, Name: "Keyboard", Price: 79.99},}file, err := os.Create("products.jsonl")if err != nil {log.Fatal(err)}defer file.Close()for _, p := range products {data, err := json.Marshal(p)if err != nil {log.Printf("skip marshal error: %v", err)continue}data = append(data, '\n')if _, err := file.Write(data); err != nil {log.Fatal(err)}}fmt.Printf("Wrote %d records to products.jsonl\n", len(products))}
对于高吞吐量写入,使用 json.NewEncoder 配合 bufio.Writer。encoder 在每次 Encode 调用后自动追加换行符,缓冲写入器将小写入批量合并为更大的系统调用。务必在关闭文件前调用 Flush。
package mainimport ("bufio""encoding/json""fmt""log""os")type Event struct {Type string `json:"type"`Timestamp int64 `json:"timestamp"`Payload map[string]any `json:"payload"`}func writeEvents(path string, events []Event) error {file, err := os.Create(path)if err != nil {return fmt.Errorf("create file: %w", err)}defer file.Close()writer := bufio.NewWriter(file)encoder := json.NewEncoder(writer)// Disable HTML escaping for cleaner outputencoder.SetEscapeHTML(false)for i, event := range events {if err := encoder.Encode(event); err != nil {return fmt.Errorf("encode event %d: %w", i, err)}}// Flush buffered data to fileif err := writer.Flush(); err != nil {return fmt.Errorf("flush: %w", err)}fmt.Printf("Wrote %d events to %s\n", len(events), path)return nil}func main() {events := []Event{{Type: "click", Timestamp: 1700000001, Payload: map[string]any{"x": 120, "y": 450}},{Type: "view", Timestamp: 1700000002, Payload: map[string]any{"page": "/home"}},}if err := writeEvents("events.jsonl", events); err != nil {log.Fatal(err)}}
使用 Goroutines 并发处理 JSONL
Go 的 goroutines 和 channels 使构建并发 JSONL 管道变得简单。fan-out 模式从文件中读取记录,将它们分发到多个 worker goroutines,并通过 channel 收集结果。这非常适合在大型 JSONL 文件上执行 CPU 密集型转换。
package mainimport ("bufio""encoding/json""fmt""log""os""runtime""sync")type Record struct {ID int `json:"id"`Data map[string]any `json:"data"`}type Result struct {ID int `json:"id"`Processed bool `json:"processed"`Score int `json:"score"`}func process(r Record) Result {// Simulate CPU-bound workscore := len(r.Data) * r.IDreturn Result{ID: r.ID, Processed: true, Score: score}}func main() {file, err := os.Open("records.jsonl")if err != nil {log.Fatal(err)}defer file.Close()numWorkers := runtime.NumCPU()jobs := make(chan Record, numWorkers*2)results := make(chan Result, numWorkers*2)// Start workersvar wg sync.WaitGroupfor i := 0; i < numWorkers; i++ {wg.Add(1)go func() {defer wg.Done()for record := range jobs {results <- process(record)}}()}// Close results channel when all workers finishgo func() {wg.Wait()close(results)}()// Read file and send records to workersgo func() {scanner := bufio.NewScanner(file)for scanner.Scan() {var r Recordif err := json.Unmarshal(scanner.Bytes(), &r); err != nil {log.Printf("skip invalid line: %v", err)continue}jobs <- r}close(jobs)}()// Collect and write resultsoutFile, err := os.Create("results.jsonl")if err != nil {log.Fatal(err)}defer outFile.Close()writer := bufio.NewWriter(outFile)encoder := json.NewEncoder(writer)var count intfor result := range results {if err := encoder.Encode(result); err != nil {log.Printf("encode error: %v", err)}count++}writer.Flush()fmt.Printf("Processed %d records with %d workers\n", count, numWorkers)}
这种 fan-out 模式将 JSONL 记录分发到与 CPU 核心数量相等的 goroutines 中。带缓冲的 channels 防止 goroutines 在发送或接收时阻塞。sync.WaitGroup 确保所有 workers 完成后才关闭 results channel。对于 I/O 密集型工作负载(如 API 调用),您可以将 numWorkers 增加到超过 CPU 核心数。请注意,并发处理时输出顺序无法保证。
错误处理最佳实践
在 Go 中进行健壮的 JSONL 处理需要仔细的错误处理。实际的 JSONL 文件可能包含格式错误的行、编码问题或意外的大记录。此模式提供行级错误跟踪、可配置的跳过错误行为和汇总报告。
package mainimport ("bufio""encoding/json""fmt""log""os")type ParseError struct {Line intErr errorRaw string}func (e *ParseError) Error() string {return fmt.Sprintf("line %d: %v", e.Line, e.Err)}type JSONLReader struct {scanner *bufio.ScannerlineNum intErrors []ParseErrorSkipped intSuccess int}func NewJSONLReader(file *os.File) *JSONLReader {scanner := bufio.NewScanner(file)scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)return &JSONLReader{scanner: scanner}}func (r *JSONLReader) ReadAll(target any) error {slice, ok := target.(*[]map[string]any)if !ok {return fmt.Errorf("target must be *[]map[string]any")}for r.scanner.Scan() {r.lineNum++line := r.scanner.Bytes()// Skip empty linesif len(line) == 0 {continue}var record map[string]anyif err := json.Unmarshal(line, &record); err != nil {r.Errors = append(r.Errors, ParseError{Line: r.lineNum,Err: err,Raw: string(line),})r.Skipped++continue}*slice = append(*slice, record)r.Success++}return r.scanner.Err()}func (r *JSONLReader) Summary() string {return fmt.Sprintf("Lines: %d | Success: %d | Skipped: %d | Errors: %d",r.lineNum, r.Success, r.Skipped, len(r.Errors),)}func main() {file, err := os.Open("data.jsonl")if err != nil {log.Fatal(err)}defer file.Close()reader := NewJSONLReader(file)var records []map[string]anyif err := reader.ReadAll(&records); err != nil {log.Fatal(err)}fmt.Println(reader.Summary())for _, e := range reader.Errors {log.Printf("Parse error at %s", e.Error())}}
这个 JSONLReader 结构体跟踪行号,收集包含原始内容的解析错误,并报告成功和失败的汇总信息。10MB 的 scanner 缓冲区可以处理异常大的行而不会 panic。在生产环境中,您可以扩展它以支持 context.Context 用于取消操作、设置最大错误阈值以中止处理,或使用 slog 进行结构化日志记录。
用于 JSONL 处理的 Go 包
Go 的标准库提供了处理 JSONL 所需的所有构建模块。以下是三种核心方式及其适用场景。
bufio.Scanner
灵活标准的逐行读取器。与 json.Unmarshal 配对进行显式解析。当您需要控制缓冲区大小、想要跳过或检查原始行,或需要行号进行错误报告时最为适用。可处理不超过配置缓冲区大小的行。
json.Decoder
推荐流式 JSON 解码器,直接从 io.Reader 读取。比 Scanner + Unmarshal 更高效,因为它避免了数据复制。适用于大文件和网络流。自动处理背靠背的 JSON 值,无需显式换行分割。
json.Encoder
写入流式 JSON 编码器,写入 io.Writer。每次 Encode 调用后自动追加换行符,非常适合 JSONL 输出。与 bufio.Writer 结合使用可实现高吞吐量写入。支持 SetEscapeHTML 和 SetIndent 配置。
试试我们的免费 JSONL 工具
不想写代码?使用我们的免费在线工具直接在浏览器中查看、验证和转换 JSONL 文件。