Go 处理 CSV 的两个场景

有两种处理 CSV 的场景,我分别都封装了相应的处理代码,这里总结一下。

分割写 CSV 文件

场景:处理结果保存为 CSV 文件有 20GB,需要将其切割为 600 MB 的一份份小 CSV 文件。

实现思路比较简单,就是在读数据的时候记录读取的大小,当超过阈值的时候就创建新的文件继续写,而使用者只需要传入创建 io.WriteCloser 的方法就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// SplitCSVWriter 对写的 csv 进行拆分写
type SplitCSVWriter struct {
limitedSize int
currSize int
writer *csv.Writer
rawF io.WriteCloser
newRawWriter func() (io.WriteCloser, error)
}

func NewSplitCSVWriter(limitedSize int, newRawWriter func() (io.WriteCloser, error)) *SplitCSVWriter {
return &SplitCSVWriter{limitedSize: limitedSize, newRawWriter: newRawWriter}
}

func (s *SplitCSVWriter) Write(record []string) (err error) {
estimateSize := len(record) // , 和 \n
for i := range record {
estimateSize += len(record[i])
}
s.currSize += estimateSize
if s.currSize > s.limitedSize {
// 暂时不管 estimateSize > limitedSize 的情况,不对 p 的数据进行截断操作
s.writer.Flush()
if err = s.writer.Error(); err != nil {
return err
}
if err = s.rawF.Close(); err != nil {
return err
}
s.writer, s.rawF = nil, nil
s.currSize = estimateSize
}
if s.writer == nil {
if s.rawF, err = s.newRawWriter(); err != nil {
return err
}
s.writer = csv.NewWriter(s.rawF)
}
if err = s.writer.Write(record); err != nil {
return err
}
return nil
}

func (s *SplitCSVWriter) Flush() (err error) {
if s.writer == nil {
return nil
}
s.writer.Flush()
if err = s.writer.Error(); err != nil {
return err
}
if err = s.rawF.Close(); err != nil {
return err
}
return nil
}

下面是使用用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func main() {
genName := FileNameIncre("inventory_"+fileDir+"_%s.csv", 4)
writer := NewSplitCSVWriter(fileSize, func() (io.WriteCloser, error) {
return os.OpenFile(path.Join(outputTrueDir, genName()), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0600)
})
csvReader := csv.NewReader(reader)
for {
record, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
panic(err)
}
if err := writer.Write(record); err != nil {
panic(err)
}
}
if err := writer.Flush(); err != nil {
log.Println("flush", err)
}
}

func FileNameIncre(nameTempl string, maxSize int) func() string {
counter := 0
return func() string {
defer func() {
counter++
}()
return fmt.Sprintf(nameTempl, Num2Str(counter, maxSize))
}
}

读取 CSV 数据并容错

场景:从网络下载 CSV 文件,希望可以边下载边读取数据,这很简单,只需要将 http 请求的 body 作为 io.Reader 传入 csv.NewReader 中即可。

但是网络是不稳定的,尤其是传输大文件时,可能出现错误。出现错误不要紧,重试就可以了,但是需要保存处理数据的进度。

好在 CSV 可以以行为单位记录处理的进度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// CSVLineProcessor 处理 csv 每行数据,允许
type CSVLineProcessor struct {
processedLine int
currLine int
processFunc func(record []string) error
}

func NewCSVLineProcessor(processFunc func(record []string) error) *CSVLineProcessor {
return &CSVLineProcessor{processFunc: processFunc, processedLine: -1, currLine: 0}
}

func (p *CSVLineProcessor) Reset() {
p.currLine = 0
}

func (p *CSVLineProcessor) Process(record []string) error {
if p.currLine <= p.processedLine {
p.currLine++
return nil
}
if err := p.processFunc(record); err != nil {
return err
}
p.processedLine = p.currLine
p.currLine++
return nil
}

下面是使用例子,在网络请求出错的时候 Reset 一下就 ok 了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func HandleObject(client *tos.Tos, objKey string, processorHandler *ObjCSVProcessHandler) error {
var csvLineProcessor = NewCSVLineProcessor(func(record []string) error {
return processorHandler.ProcessLine(record)
})
if err := Retry(context.TODO(), func() (stop bool, err error) {
ctx, cancel := context.WithTimeout(context.TODO(), defaultContextTimeout)
defer cancel()
resp, err := client.GetObject(ctx, objKey)
if err != nil {
return false, fmt.Errorf("get object %s: %v", objKey, err)
}
defer resp.R.Close()
reader := csv.NewReader(resp.R)
reader.LazyQuotes = true
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
csvLineProcessor.Reset()
return false, fmt.Errorf("read object data %s: %v", objKey, err)
}
if err := csvLineProcessor.Process(record); err != nil {
csvLineProcessor.Reset()
return true, err
}
}
return true, nil
}, WithRetryTimes(retryTimes)); err != nil {
return err
}
return nil
}