Golang 中的 Pipe 使用

主要记录一下 io.Pipeos.Pipe 的使用

Golang 版本为 1.13 ,实验环境为 MacOS

io.Pipe

基础

1
func Pipe() (*PipeReader, *PipeWriter)

如果阅读其源代码,它内部有一个 pipe 对象,将其 Read 相关功能和 Write 相关功能拆分成了 PipeReaderPipeWriter 暴露出去。

PipeReader.Read() 方法写入数据,PipeWriter.Write() 读取数据。由于其实现了接口 io.ReadCloserio.WriteCloser 。所以可以使用更高层的工具函数来操作它们。

对于 PipeReader ,可以使用 bytes.Buffer.ReadFrom 或者 bufio.NewScanner

对于 PipeWriter 可以使用 fmt.Fprintf() 或者 bufio.NewWriter


简单例子

这里写一个简单的例子,在 goroutine 中,每隔 1 秒向 PipeWriter 中写入当前的时间,从 PipeReader 读取写入的数据。不使用任何工具函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
r, w := io.Pipe()

go func() {
for {
time.Sleep(time.Second)
w.Write([]byte(time.Now().String()))
}
}()
for {
dataRead := make([]byte, 256)
n, _ := r.Read(dataRead)
fmt.Println(string(dataRead[:n]))
}
}

输出如下

1
2
3
4
2020-07-05 17:44:21.674781 +0800 CST m=+1.005118043
2020-07-05 17:44:22.677565 +0800 CST m=+2.007942604
2020-07-05 17:44:23.680083 +0800 CST m=+3.010501799
...

使用高级函数操作

使用 bufio.NewWriter 封装一层 PipeWriter ,使用 bufio.NewScanner 封装一层 PipeReader 实现跟上述代码相同的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
r, w := io.Pipe()

go func() {
writer := bufio.NewWriter(w)
for {
time.Sleep(time.Second)
writer.WriteString(time.Now().String() + "\n")
// writer.Flush()
}
}()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}

writer.Flush() 如果被注释掉的话,会有很长一段时间无数据输出。查看源码发现,其默认的 buffer 大小为 4096 只有当存入的数据大小大于 4096 比特时,它才会调用 PipeWriter.Write() 写入数据。解决方案就是把注释的那行解除注释,每次写入时清空一下缓存就可以了。

其实上面的代码还是有问题的

假设只希望输出限定个数的时间呢?比如说 3 次。把上面的无限次循环改成优先次不就行了嘛。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
go func() {
writer := bufio.NewWriter(w)
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
writer.WriteString(time.Now().String() + "\n")
writer.Flush()
}
// writer.Close()
}()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}

执行代码,发现最后出现了死锁的情况,定位到出现死锁的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
// io/pipe.go#L50
func (p *pipe) Read(b []byte) (n int, err error) {
// ...
// here
select {
case bw := <-p.wrCh: // []byte
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}

应该是这个的问题<-p.wrCh 。这个 channel 用于接收从 pipe.Write() 中发送来的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// io/pipe.go#L88
func (p *pipe) Write(b []byte) (n int, err error) {
// ...
for once := true; once || len(b) > 0; once = false {
select {
// here
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}

但是由于数据已经全部发送完毕,所以这里的 Write 函数已经退出了。所以 case bw := <-p.wrCh 不可能得到数据了。那么就只能希望从 case <-p.done 中获得数据,那个就是调用 pipe.CloseWrite() 或者 pipe.CloseRead() ,也就是调用 PipeWriter.Close() 或者调用 PipeReader.Close()

把那段代码中的 writer.Close() 解除注释就可以了。

由于 pipe.CloseWrite()pipe.CloseRead() 是一样的,所以 PipeWriterPipeReader 只需要调用一次即可。


os.Pipe

1
func Pipe() (r *File, w *File, err error)

这个是对操作系统文件句柄的封装。现在使用这个函数实现和上面类似的功能

inRinW 这一对 Pipe 用于向启动的子进程中传入数据

outRoutW 这一对 Pipe 用于子进程向外部输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func main() {
inR, inW, _ := os.Pipe()
outR, outW, _ := os.Pipe()
done := make(chan struct{})

process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
Files: []*os.File{inR, outW, outW}})

go func() {
writer := bufio.NewWriter(inW)
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
writer.WriteString("date\n")
writer.Flush()
}
inW.Close()
outW.Close()
}()
go func() {
scanner := bufio.NewScanner(outR)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
process.Signal(os.Kill)
done <- struct{}{}
fmt.Println("finish")
}()

process.Wait()
<-done

}

输出类似于如下数据

1
2
3
4
Sun Jul  5 18:36:52 CST 2020
Sun Jul 5 18:36:53 CST 2020
Sun Jul 5 18:36:54 CST 2020
finish

另一个比较简单的例子是调用 ls 查看当前目录,这里使用 bytes.Buffer.ReadFrom 读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
inR, inW, _ := os.Pipe()
outR, outW, _ := os.Pipe()
dir, _ := filepath.Abs(filepath.Dir(os.Args[0]))

process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
Files: []*os.File{inR, outW, outW},
Dir: dir,
})

go func() {
writer := bufio.NewWriter(inW)
writer.WriteString("ls -a")
writer.Flush()
inW.Close()
outW.Close()
}()

process.Wait()
buffer := new(bytes.Buffer)
buffer.ReadFrom(outR)
fmt.Println(buffer.String())
}