有两种处理 CSV 的场景,我分别都封装了相应的处理代码,这里总结一下。
分割写 CSV 文件
场景:处理结果保存为 CSV 文件有 20GB,需要将其切割为 600 MB 的一份份小 CSV 文件。
实现思路比较简单,就是在读数据的时候记录读取的大小,当超过阈值的时候就创建新的文件继续写,而使用者只需要传入创建 io.WriteCloser
的方法就行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| type SplitCSVWriter struct { limitedSize int currSize int writer *csv.Writer rawF io.WriteCloser newRawWriter func() (io.WriteCloser, error) }
func NewSplitCSVWriter(limitedSize int, newRawWriter func() (io.WriteCloser, error)) *SplitCSVWriter { return &SplitCSVWriter{limitedSize: limitedSize, newRawWriter: newRawWriter} }
func (s *SplitCSVWriter) Write(record []string) (err error) { estimateSize := len(record) for i := range record { estimateSize += len(record[i]) } s.currSize += estimateSize if s.currSize > s.limitedSize { s.writer.Flush() if err = s.writer.Error(); err != nil { return err } if err = s.rawF.Close(); err != nil { return err } s.writer, s.rawF = nil, nil s.currSize = estimateSize } if s.writer == nil { if s.rawF, err = s.newRawWriter(); err != nil { return err } s.writer = csv.NewWriter(s.rawF) } if err = s.writer.Write(record); err != nil { return err } return nil }
func (s *SplitCSVWriter) Flush() (err error) { if s.writer == nil { return nil } s.writer.Flush() if err = s.writer.Error(); err != nil { return err } if err = s.rawF.Close(); err != nil { return err } return nil }
|
下面是使用用例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func main() { genName := FileNameIncre("inventory_"+fileDir+"_%s.csv", 4) writer := NewSplitCSVWriter(fileSize, func() (io.WriteCloser, error) { return os.OpenFile(path.Join(outputTrueDir, genName()), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0600) }) csvReader := csv.NewReader(reader) for { record, err := csvReader.Read() if err == io.EOF { break } if err != nil { panic(err) } if err := writer.Write(record); err != nil { panic(err) } } if err := writer.Flush(); err != nil { log.Println("flush", err) } }
func FileNameIncre(nameTempl string, maxSize int) func() string { counter := 0 return func() string { defer func() { counter++ }() return fmt.Sprintf(nameTempl, Num2Str(counter, maxSize)) } }
|
读取 CSV 数据并容错
场景:从网络下载 CSV 文件,希望可以边下载边读取数据,这很简单,只需要将 http 请求的 body 作为 io.Reader 传入 csv.NewReader
中即可。
但是网络是不稳定的,尤其是传输大文件时,可能出现错误。出现错误不要紧,重试就可以了,但是需要保存处理数据的进度。
好在 CSV 可以以行为单位记录处理的进度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| type CSVLineProcessor struct { processedLine int currLine int processFunc func(record []string) error }
func NewCSVLineProcessor(processFunc func(record []string) error) *CSVLineProcessor { return &CSVLineProcessor{processFunc: processFunc, processedLine: -1, currLine: 0} }
func (p *CSVLineProcessor) Reset() { p.currLine = 0 }
func (p *CSVLineProcessor) Process(record []string) error { if p.currLine <= p.processedLine { p.currLine++ return nil } if err := p.processFunc(record); err != nil { return err } p.processedLine = p.currLine p.currLine++ return nil }
|
下面是使用例子,在网络请求出错的时候 Reset
一下就 ok 了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func HandleObject(client *tos.Tos, objKey string, processorHandler *ObjCSVProcessHandler) error { var csvLineProcessor = NewCSVLineProcessor(func(record []string) error { return processorHandler.ProcessLine(record) }) if err := Retry(context.TODO(), func() (stop bool, err error) { ctx, cancel := context.WithTimeout(context.TODO(), defaultContextTimeout) defer cancel() resp, err := client.GetObject(ctx, objKey) if err != nil { return false, fmt.Errorf("get object %s: %v", objKey, err) } defer resp.R.Close() reader := csv.NewReader(resp.R) reader.LazyQuotes = true for { record, err := reader.Read() if err == io.EOF { break } if err != nil { csvLineProcessor.Reset() return false, fmt.Errorf("read object data %s: %v", objKey, err) } if err := csvLineProcessor.Process(record); err != nil { csvLineProcessor.Reset() return true, err } } return true, nil }, WithRetryTimes(retryTimes)); err != nil { return err } return nil }
|