Go 處理 JSONL:bufio、json.Decoder 與並行處理

在 Go 中使用 JSONL(JSON Lines)檔案的完整指南。學習使用標準函式庫的 bufio、encoding/json 和 goroutines 來讀取、寫入、串流和並行處理 JSONL 資料。

最後更新:2026 年 2 月

為什麼選擇 Go 處理 JSONL?

Go 是處理 JSONL 檔案的絕佳選擇,尤其當效能和並行處理很重要時。標準函式庫提供了你所需的一切:bufio 用於高效的逐行 I/O、encoding/json 用於解析和序列化、goroutines 用於平行處理。不需要任何第三方依賴。Go 的編譯特性意味著你的 JSONL 管線會比同等的 Python 或 Node.js 腳本執行速度快得多,通常在一般硬體上每秒即可處理數百萬行。

JSONL(JSON Lines)每行儲存一個 JSON 物件,非常適合 Go 的串流 I/O 模型。你不需要將整個檔案載入記憶體,而是可以使用 scanner 或 decoder 逐筆讀取和處理記錄。搭配 Go 輕量級的 goroutines 和 channels,你可以建構能充分利用所有 CPU 核心的並行 JSONL 管線。在本指南中,你將學習如何使用 bufio.Scanner 讀取 JSONL、使用 json.Decoder 進行串流處理、高效寫入 JSONL、並行處理記錄,以及健壯地處理錯誤。

使用 bufio.Scanner 讀取 JSONL 檔案

在 Go 中讀取 JSONL 最直接的方式是使用 bufio.Scanner。它逐行讀取檔案,然後你可以用 json.Unmarshal 解析每一行。這種方式讓你完全控制緩衝區大小和錯誤處理。

開啟檔案、建立 scanner,然後逐行迭代。每一行都使用 json.Unmarshal 解析為 map 或 struct。這使得記憶體使用量只與單筆記錄成正比,而非整個檔案。

使用 bufio.Scanner 基本讀取
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
func main() {
file, err := os.Open("data.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
var records []map[string]any
scanner := bufio.NewScanner(file)
// Increase buffer for lines longer than 64KB
scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)
for scanner.Scan() {
var record map[string]any
if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
log.Printf("skipping invalid JSON: %v", err)
continue
}
records = append(records, record)
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
fmt.Printf("Loaded %d records\n", len(records))
}

在正式環境程式碼中,建議將 JSONL 記錄解析為型別化的 Go struct,而不是泛型 map。這提供了編譯期型別安全、更好的效能和更清晰的程式碼。使用 json 標籤定義 struct 來控制欄位對應。

解析為型別化 Struct
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age,omitempty"`
}
func readUsers(path string) ([]User, error) {
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
defer file.Close()
var users []User
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
lineNum++
var u User
if err := json.Unmarshal(scanner.Bytes(), &u); err != nil {
return nil, fmt.Errorf("line %d: %w", lineNum, err)
}
users = append(users, u)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("scanner error: %w", err)
}
return users, nil
}
func main() {
users, err := readUsers("users.jsonl")
if err != nil {
log.Fatal(err)
}
for _, u := range users {
fmt.Printf("%s (%s)\n", u.Name, u.Email)
}
}

使用 json.Decoder 進行串流處理

對於高效能串流處理,json.Decoder 直接從 io.Reader 讀取,無需中間的行分割步驟。它內部自行處理緩衝,是處理大型 JSONL 檔案或網路串流時的推薦方式,可將記憶體分配降至最低。

json.NewDecoder 包裝任何 io.Reader 並依序解碼 JSON 值。每次呼叫 Decode 恰好讀取一個 JSON 物件。當串流結束時返回 io.EOF。這比 bufio.Scanner + json.Unmarshal 更高效,因為它避免了將每一行複製到單獨的緩衝區。

使用 json.Decoder 串流處理
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
)
type LogEntry struct {
Timestamp string `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Service string `json:"service"`
}
func processJSONLStream(path string) error {
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("open: %w", err)
}
defer file.Close()
decoder := json.NewDecoder(file)
var count int
for {
var entry LogEntry
err := decoder.Decode(&entry)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("decode record %d: %w", count+1, err)
}
count++
// Process each record as it's decoded
if entry.Level == "ERROR" {
fmt.Printf("[%s] %s: %s\n",
entry.Timestamp, entry.Service, entry.Message)
}
}
fmt.Printf("Processed %d log entries\n", count)
return nil
}
func main() {
if err := processJSONLStream("logs.jsonl"); err != nil {
log.Fatal(err)
}
}

在 Go 中寫入 JSONL 檔案

在 Go 中寫入 JSONL 非常直接:使用 json.Marshal 序列化每筆記錄、附加換行位元組,然後寫入檔案。為了獲得最佳效能,請將檔案寫入器包裝在 bufio.Writer 中以減少系統呼叫。

使用 json.Marshal 將每筆記錄序列化為 JSON 位元組,然後寫入檔案並附加換行符。這種方式簡單且適用於中小型檔案。

使用 json.Marshal 基本寫入
package main
import (
"encoding/json"
"fmt"
"log"
"os"
)
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
func main() {
products := []Product{
{ID: 1, Name: "Laptop", Price: 999.99},
{ID: 2, Name: "Mouse", Price: 29.99},
{ID: 3, Name: "Keyboard", Price: 79.99},
}
file, err := os.Create("products.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
for _, p := range products {
data, err := json.Marshal(p)
if err != nil {
log.Printf("skip marshal error: %v", err)
continue
}
data = append(data, '\n')
if _, err := file.Write(data); err != nil {
log.Fatal(err)
}
}
fmt.Printf("Wrote %d records to products.jsonl\n", len(products))
}

對於高吞吐量寫入,使用 json.NewEncoder 搭配 bufio.Writer。encoder 在每次 Encode 呼叫後會自動附加換行符,而緩衝寫入器會將多次小寫入批次合併為較大的系統呼叫。關閉檔案前務必呼叫 Flush。

使用 json.Encoder 緩衝寫入
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type Event struct {
Type string `json:"type"`
Timestamp int64 `json:"timestamp"`
Payload map[string]any `json:"payload"`
}
func writeEvents(path string, events []Event) error {
file, err := os.Create(path)
if err != nil {
return fmt.Errorf("create file: %w", err)
}
defer file.Close()
writer := bufio.NewWriter(file)
encoder := json.NewEncoder(writer)
// Disable HTML escaping for cleaner output
encoder.SetEscapeHTML(false)
for i, event := range events {
if err := encoder.Encode(event); err != nil {
return fmt.Errorf("encode event %d: %w", i, err)
}
}
// Flush buffered data to file
if err := writer.Flush(); err != nil {
return fmt.Errorf("flush: %w", err)
}
fmt.Printf("Wrote %d events to %s\n", len(events), path)
return nil
}
func main() {
events := []Event{
{Type: "click", Timestamp: 1700000001, Payload: map[string]any{"x": 120, "y": 450}},
{Type: "view", Timestamp: 1700000002, Payload: map[string]any{"page": "/home"}},
}
if err := writeEvents("events.jsonl", events); err != nil {
log.Fatal(err)
}
}

使用 Goroutines 並行處理 JSONL

Go 的 goroutines 和 channels 讓建構並行 JSONL 管線變得簡單。Fan-out 模式從檔案讀取記錄、分發給多個 worker goroutines,並透過 channel 收集結果。這非常適合在大型 JSONL 檔案上進行 CPU 密集型轉換。

使用 Goroutines 並行處理 JSONL
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"runtime"
"sync"
)
type Record struct {
ID int `json:"id"`
Data map[string]any `json:"data"`
}
type Result struct {
ID int `json:"id"`
Processed bool `json:"processed"`
Score int `json:"score"`
}
func process(r Record) Result {
// Simulate CPU-bound work
score := len(r.Data) * r.ID
return Result{ID: r.ID, Processed: true, Score: score}
}
func main() {
file, err := os.Open("records.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
numWorkers := runtime.NumCPU()
jobs := make(chan Record, numWorkers*2)
results := make(chan Result, numWorkers*2)
// Start workers
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for record := range jobs {
results <- process(record)
}
}()
}
// Close results channel when all workers finish
go func() {
wg.Wait()
close(results)
}()
// Read file and send records to workers
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var r Record
if err := json.Unmarshal(scanner.Bytes(), &r); err != nil {
log.Printf("skip invalid line: %v", err)
continue
}
jobs <- r
}
close(jobs)
}()
// Collect and write results
outFile, err := os.Create("results.jsonl")
if err != nil {
log.Fatal(err)
}
defer outFile.Close()
writer := bufio.NewWriter(outFile)
encoder := json.NewEncoder(writer)
var count int
for result := range results {
if err := encoder.Encode(result); err != nil {
log.Printf("encode error: %v", err)
}
count++
}
writer.Flush()
fmt.Printf("Processed %d records with %d workers\n", count, numWorkers)
}

這個 fan-out 模式將 JSONL 記錄分發到與 CPU 核心數相等的 goroutines。帶緩衝的 channels 防止 goroutines 在發送或接收時阻塞。sync.WaitGroup 確保所有 workers 完成後才關閉結果 channel。對於 I/O 密集型工作負載(如 API 呼叫),你可以將 numWorkers 設定為超過 CPU 核心數。請注意,並行處理時不保證輸出順序。

錯誤處理最佳實踐

Go 中健壯的 JSONL 處理需要謹慎的錯誤處理。真實世界的 JSONL 檔案可能包含格式錯誤的行、編碼問題或意外的大筆記錄。這個模式提供了行級錯誤追蹤、可設定的跳過錯誤行為和摘要報告。

錯誤處理最佳實踐
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
type ParseError struct {
Line int
Err error
Raw string
}
func (e *ParseError) Error() string {
return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}
type JSONLReader struct {
scanner *bufio.Scanner
lineNum int
Errors []ParseError
Skipped int
Success int
}
func NewJSONLReader(file *os.File) *JSONLReader {
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
return &JSONLReader{scanner: scanner}
}
func (r *JSONLReader) ReadAll(target any) error {
slice, ok := target.(*[]map[string]any)
if !ok {
return fmt.Errorf("target must be *[]map[string]any")
}
for r.scanner.Scan() {
r.lineNum++
line := r.scanner.Bytes()
// Skip empty lines
if len(line) == 0 {
continue
}
var record map[string]any
if err := json.Unmarshal(line, &record); err != nil {
r.Errors = append(r.Errors, ParseError{
Line: r.lineNum,
Err: err,
Raw: string(line),
})
r.Skipped++
continue
}
*slice = append(*slice, record)
r.Success++
}
return r.scanner.Err()
}
func (r *JSONLReader) Summary() string {
return fmt.Sprintf(
"Lines: %d | Success: %d | Skipped: %d | Errors: %d",
r.lineNum, r.Success, r.Skipped, len(r.Errors),
)
}
func main() {
file, err := os.Open("data.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := NewJSONLReader(file)
var records []map[string]any
if err := reader.ReadAll(&records); err != nil {
log.Fatal(err)
}
fmt.Println(reader.Summary())
for _, e := range reader.Errors {
log.Printf("Parse error at %s", e.Error())
}
}

這個 JSONLReader struct 追蹤行號、收集包含原始內容的解析錯誤,並報告成功和失敗的摘要。10MB 的 scanner 緩衝區可以處理異常大的行而不會發生 panic。在正式環境中,你可能需要擴展此功能,加入 context.Context 支援以便取消操作、設定最大錯誤閾值在超過時中止處理,或使用 slog 進行結構化日誌記錄。

Go 的 JSONL 處理套件

Go 的標準函式庫提供了處理 JSONL 所需的所有基礎元件。以下是三種核心方式及其適用場景。

bufio.Scanner

靈活

標準的逐行讀取器。搭配 json.Unmarshal 進行明確的解析。當你需要控制緩衝區大小、想要跳過或檢查原始行資料,或需要行號進行錯誤報告時最為適用。可處理至你設定的緩衝區大小的行。

json.Decoder

推薦

串流 JSON 解碼器,直接從 io.Reader 讀取。比 Scanner + Unmarshal 更高效,因為它避免了資料複製。適用於大型檔案和網路串流。自動處理連續的 JSON 值,無需明確的換行分割。

json.Encoder

寫入

串流 JSON 編碼器,寫入 io.Writer。每次 Encode 呼叫後自動附加換行符,非常適合 JSONL 輸出。搭配 bufio.Writer 使用可實現高吞吐量寫入。支援 SetEscapeHTML 和 SetIndent 設定。

試試我們的免費 JSONL 工具

不想寫程式?使用我們的免費線上工具,直接在瀏覽器中檢視、驗證和轉換 JSONL 檔案。

線上處理 JSONL 檔案

在瀏覽器中即時檢視、驗證和轉換高達 1GB 的 JSONL 檔案。無需上傳,100% 隱私保護。

常見問題

Go 處理 JSONL — bufio.Scanner、json.Decoder 與並行處理 | jsonl.co