Go 處理 JSONL:bufio、json.Decoder 與並行處理
在 Go 中使用 JSONL(JSON Lines)檔案的完整指南。學習使用標準函式庫的 bufio、encoding/json 和 goroutines 來讀取、寫入、串流和並行處理 JSONL 資料。
最後更新:2026 年 2 月
為什麼選擇 Go 處理 JSONL?
Go 是處理 JSONL 檔案的絕佳選擇,尤其當效能和並行處理很重要時。標準函式庫提供了你所需的一切:bufio 用於高效的逐行 I/O、encoding/json 用於解析和序列化、goroutines 用於平行處理。不需要任何第三方依賴。Go 的編譯特性意味著你的 JSONL 管線會比同等的 Python 或 Node.js 腳本執行速度快得多,通常在一般硬體上每秒即可處理數百萬行。
JSONL(JSON Lines)每行儲存一個 JSON 物件,非常適合 Go 的串流 I/O 模型。你不需要將整個檔案載入記憶體,而是可以使用 scanner 或 decoder 逐筆讀取和處理記錄。搭配 Go 輕量級的 goroutines 和 channels,你可以建構能充分利用所有 CPU 核心的並行 JSONL 管線。在本指南中,你將學習如何使用 bufio.Scanner 讀取 JSONL、使用 json.Decoder 進行串流處理、高效寫入 JSONL、並行處理記錄,以及健壯地處理錯誤。
使用 bufio.Scanner 讀取 JSONL 檔案
在 Go 中讀取 JSONL 最直接的方式是使用 bufio.Scanner。它逐行讀取檔案,然後你可以用 json.Unmarshal 解析每一行。這種方式讓你完全控制緩衝區大小和錯誤處理。
開啟檔案、建立 scanner,然後逐行迭代。每一行都使用 json.Unmarshal 解析為 map 或 struct。這使得記憶體使用量只與單筆記錄成正比,而非整個檔案。
package mainimport ("bufio""encoding/json""fmt""log""os")func main() {file, err := os.Open("data.jsonl")if err != nil {log.Fatal(err)}defer file.Close()var records []map[string]anyscanner := bufio.NewScanner(file)// Increase buffer for lines longer than 64KBscanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)for scanner.Scan() {var record map[string]anyif err := json.Unmarshal(scanner.Bytes(), &record); err != nil {log.Printf("skipping invalid JSON: %v", err)continue}records = append(records, record)}if err := scanner.Err(); err != nil {log.Fatal(err)}fmt.Printf("Loaded %d records\n", len(records))}
在正式環境程式碼中,建議將 JSONL 記錄解析為型別化的 Go struct,而不是泛型 map。這提供了編譯期型別安全、更好的效能和更清晰的程式碼。使用 json 標籤定義 struct 來控制欄位對應。
package mainimport ("bufio""encoding/json""fmt""log""os")type User struct {ID int `json:"id"`Name string `json:"name"`Email string `json:"email"`Age int `json:"age,omitempty"`}func readUsers(path string) ([]User, error) {file, err := os.Open(path)if err != nil {return nil, fmt.Errorf("open file: %w", err)}defer file.Close()var users []Userscanner := bufio.NewScanner(file)lineNum := 0for scanner.Scan() {lineNum++var u Userif err := json.Unmarshal(scanner.Bytes(), &u); err != nil {return nil, fmt.Errorf("line %d: %w", lineNum, err)}users = append(users, u)}if err := scanner.Err(); err != nil {return nil, fmt.Errorf("scanner error: %w", err)}return users, nil}func main() {users, err := readUsers("users.jsonl")if err != nil {log.Fatal(err)}for _, u := range users {fmt.Printf("%s (%s)\n", u.Name, u.Email)}}
使用 json.Decoder 進行串流處理
對於高效能串流處理,json.Decoder 直接從 io.Reader 讀取,無需中間的行分割步驟。它內部自行處理緩衝,是處理大型 JSONL 檔案或網路串流時的推薦方式,可將記憶體分配降至最低。
json.NewDecoder 包裝任何 io.Reader 並依序解碼 JSON 值。每次呼叫 Decode 恰好讀取一個 JSON 物件。當串流結束時返回 io.EOF。這比 bufio.Scanner + json.Unmarshal 更高效,因為它避免了將每一行複製到單獨的緩衝區。
package mainimport ("encoding/json""errors""fmt""io""log""os")type LogEntry struct {Timestamp string `json:"timestamp"`Level string `json:"level"`Message string `json:"message"`Service string `json:"service"`}func processJSONLStream(path string) error {file, err := os.Open(path)if err != nil {return fmt.Errorf("open: %w", err)}defer file.Close()decoder := json.NewDecoder(file)var count intfor {var entry LogEntryerr := decoder.Decode(&entry)if errors.Is(err, io.EOF) {break}if err != nil {return fmt.Errorf("decode record %d: %w", count+1, err)}count++// Process each record as it's decodedif entry.Level == "ERROR" {fmt.Printf("[%s] %s: %s\n",entry.Timestamp, entry.Service, entry.Message)}}fmt.Printf("Processed %d log entries\n", count)return nil}func main() {if err := processJSONLStream("logs.jsonl"); err != nil {log.Fatal(err)}}
在 Go 中寫入 JSONL 檔案
在 Go 中寫入 JSONL 非常直接:使用 json.Marshal 序列化每筆記錄、附加換行位元組,然後寫入檔案。為了獲得最佳效能,請將檔案寫入器包裝在 bufio.Writer 中以減少系統呼叫。
使用 json.Marshal 將每筆記錄序列化為 JSON 位元組,然後寫入檔案並附加換行符。這種方式簡單且適用於中小型檔案。
package mainimport ("encoding/json""fmt""log""os")type Product struct {ID int `json:"id"`Name string `json:"name"`Price float64 `json:"price"`}func main() {products := []Product{{ID: 1, Name: "Laptop", Price: 999.99},{ID: 2, Name: "Mouse", Price: 29.99},{ID: 3, Name: "Keyboard", Price: 79.99},}file, err := os.Create("products.jsonl")if err != nil {log.Fatal(err)}defer file.Close()for _, p := range products {data, err := json.Marshal(p)if err != nil {log.Printf("skip marshal error: %v", err)continue}data = append(data, '\n')if _, err := file.Write(data); err != nil {log.Fatal(err)}}fmt.Printf("Wrote %d records to products.jsonl\n", len(products))}
對於高吞吐量寫入,使用 json.NewEncoder 搭配 bufio.Writer。encoder 在每次 Encode 呼叫後會自動附加換行符,而緩衝寫入器會將多次小寫入批次合併為較大的系統呼叫。關閉檔案前務必呼叫 Flush。
package mainimport ("bufio""encoding/json""fmt""log""os")type Event struct {Type string `json:"type"`Timestamp int64 `json:"timestamp"`Payload map[string]any `json:"payload"`}func writeEvents(path string, events []Event) error {file, err := os.Create(path)if err != nil {return fmt.Errorf("create file: %w", err)}defer file.Close()writer := bufio.NewWriter(file)encoder := json.NewEncoder(writer)// Disable HTML escaping for cleaner outputencoder.SetEscapeHTML(false)for i, event := range events {if err := encoder.Encode(event); err != nil {return fmt.Errorf("encode event %d: %w", i, err)}}// Flush buffered data to fileif err := writer.Flush(); err != nil {return fmt.Errorf("flush: %w", err)}fmt.Printf("Wrote %d events to %s\n", len(events), path)return nil}func main() {events := []Event{{Type: "click", Timestamp: 1700000001, Payload: map[string]any{"x": 120, "y": 450}},{Type: "view", Timestamp: 1700000002, Payload: map[string]any{"page": "/home"}},}if err := writeEvents("events.jsonl", events); err != nil {log.Fatal(err)}}
使用 Goroutines 並行處理 JSONL
Go 的 goroutines 和 channels 讓建構並行 JSONL 管線變得簡單。Fan-out 模式從檔案讀取記錄、分發給多個 worker goroutines,並透過 channel 收集結果。這非常適合在大型 JSONL 檔案上進行 CPU 密集型轉換。
package mainimport ("bufio""encoding/json""fmt""log""os""runtime""sync")type Record struct {ID int `json:"id"`Data map[string]any `json:"data"`}type Result struct {ID int `json:"id"`Processed bool `json:"processed"`Score int `json:"score"`}func process(r Record) Result {// Simulate CPU-bound workscore := len(r.Data) * r.IDreturn Result{ID: r.ID, Processed: true, Score: score}}func main() {file, err := os.Open("records.jsonl")if err != nil {log.Fatal(err)}defer file.Close()numWorkers := runtime.NumCPU()jobs := make(chan Record, numWorkers*2)results := make(chan Result, numWorkers*2)// Start workersvar wg sync.WaitGroupfor i := 0; i < numWorkers; i++ {wg.Add(1)go func() {defer wg.Done()for record := range jobs {results <- process(record)}}()}// Close results channel when all workers finishgo func() {wg.Wait()close(results)}()// Read file and send records to workersgo func() {scanner := bufio.NewScanner(file)for scanner.Scan() {var r Recordif err := json.Unmarshal(scanner.Bytes(), &r); err != nil {log.Printf("skip invalid line: %v", err)continue}jobs <- r}close(jobs)}()// Collect and write resultsoutFile, err := os.Create("results.jsonl")if err != nil {log.Fatal(err)}defer outFile.Close()writer := bufio.NewWriter(outFile)encoder := json.NewEncoder(writer)var count intfor result := range results {if err := encoder.Encode(result); err != nil {log.Printf("encode error: %v", err)}count++}writer.Flush()fmt.Printf("Processed %d records with %d workers\n", count, numWorkers)}
這個 fan-out 模式將 JSONL 記錄分發到與 CPU 核心數相等的 goroutines。帶緩衝的 channels 防止 goroutines 在發送或接收時阻塞。sync.WaitGroup 確保所有 workers 完成後才關閉結果 channel。對於 I/O 密集型工作負載(如 API 呼叫),你可以將 numWorkers 設定為超過 CPU 核心數。請注意,並行處理時不保證輸出順序。
錯誤處理最佳實踐
Go 中健壯的 JSONL 處理需要謹慎的錯誤處理。真實世界的 JSONL 檔案可能包含格式錯誤的行、編碼問題或意外的大筆記錄。這個模式提供了行級錯誤追蹤、可設定的跳過錯誤行為和摘要報告。
package mainimport ("bufio""encoding/json""fmt""log""os")type ParseError struct {Line intErr errorRaw string}func (e *ParseError) Error() string {return fmt.Sprintf("line %d: %v", e.Line, e.Err)}type JSONLReader struct {scanner *bufio.ScannerlineNum intErrors []ParseErrorSkipped intSuccess int}func NewJSONLReader(file *os.File) *JSONLReader {scanner := bufio.NewScanner(file)scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)return &JSONLReader{scanner: scanner}}func (r *JSONLReader) ReadAll(target any) error {slice, ok := target.(*[]map[string]any)if !ok {return fmt.Errorf("target must be *[]map[string]any")}for r.scanner.Scan() {r.lineNum++line := r.scanner.Bytes()// Skip empty linesif len(line) == 0 {continue}var record map[string]anyif err := json.Unmarshal(line, &record); err != nil {r.Errors = append(r.Errors, ParseError{Line: r.lineNum,Err: err,Raw: string(line),})r.Skipped++continue}*slice = append(*slice, record)r.Success++}return r.scanner.Err()}func (r *JSONLReader) Summary() string {return fmt.Sprintf("Lines: %d | Success: %d | Skipped: %d | Errors: %d",r.lineNum, r.Success, r.Skipped, len(r.Errors),)}func main() {file, err := os.Open("data.jsonl")if err != nil {log.Fatal(err)}defer file.Close()reader := NewJSONLReader(file)var records []map[string]anyif err := reader.ReadAll(&records); err != nil {log.Fatal(err)}fmt.Println(reader.Summary())for _, e := range reader.Errors {log.Printf("Parse error at %s", e.Error())}}
這個 JSONLReader struct 追蹤行號、收集包含原始內容的解析錯誤,並報告成功和失敗的摘要。10MB 的 scanner 緩衝區可以處理異常大的行而不會發生 panic。在正式環境中,你可能需要擴展此功能,加入 context.Context 支援以便取消操作、設定最大錯誤閾值在超過時中止處理,或使用 slog 進行結構化日誌記錄。
Go 的 JSONL 處理套件
Go 的標準函式庫提供了處理 JSONL 所需的所有基礎元件。以下是三種核心方式及其適用場景。
bufio.Scanner
靈活標準的逐行讀取器。搭配 json.Unmarshal 進行明確的解析。當你需要控制緩衝區大小、想要跳過或檢查原始行資料,或需要行號進行錯誤報告時最為適用。可處理至你設定的緩衝區大小的行。
json.Decoder
推薦串流 JSON 解碼器,直接從 io.Reader 讀取。比 Scanner + Unmarshal 更高效,因為它避免了資料複製。適用於大型檔案和網路串流。自動處理連續的 JSON 值,無需明確的換行分割。
json.Encoder
寫入串流 JSON 編碼器,寫入 io.Writer。每次 Encode 呼叫後自動附加換行符,非常適合 JSONL 輸出。搭配 bufio.Writer 使用可實現高吞吐量寫入。支援 SetEscapeHTML 和 SetIndent 設定。
試試我們的免費 JSONL 工具
不想寫程式?使用我們的免費線上工具,直接在瀏覽器中檢視、驗證和轉換 JSONL 檔案。