主要记录一下 io.Pipe
和 os.Pipe
的使用
Golang 版本为 1.13 ,实验环境为 MacOS
io.Pipe 基础 1 func Pipe () (*PipeReader, *PipeWriter)
如果阅读其源代码,它内部有一个 pipe
对象,将其 Read
相关功能和 Write
相关功能拆分成了 PipeReader
和 PipeWriter
暴露出去。
PipeReader.Read()
方法写入数据,PipeWriter.Write()
读取数据。由于其实现了接口 io.ReadCloser
和 io.WriteCloser
。所以可以使用更高层的工具函数来操作它们。
对于 PipeReader
,可以使用 bytes.Buffer.ReadFrom
或者 bufio.NewScanner
。
对于 PipeWriter
可以使用 fmt.Fprintf()
或者 bufio.NewWriter
简单例子 这里写一个简单的例子,在 goroutine 中,每隔 1 秒向 PipeWriter
中写入当前的时间,从 PipeReader
读取写入的数据。不使用任何工具函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func main () { r, w := io.Pipe() go func () { for { time.Sleep(time.Second) w.Write([]byte (time.Now().String())) } }() for { dataRead := make ([]byte , 256 ) n, _ := r.Read(dataRead) fmt.Println(string (dataRead[:n])) } }
输出如下
1 2 3 4 2020-07-05 17:44:21.674781 +0800 CST m=+1.005118043 2020-07-05 17:44:22.677565 +0800 CST m=+2.007942604 2020-07-05 17:44:23.680083 +0800 CST m=+3.010501799 ...
使用高级函数操作 使用 bufio.NewWriter
封装一层 PipeWriter
,使用 bufio.NewScanner
封装一层 PipeReader
实现跟上述代码相同的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { r, w := io.Pipe() go func () { writer := bufio.NewWriter(w) for { time.Sleep(time.Second) writer.WriteString(time.Now().String() + "\n" ) } }() scanner := bufio.NewScanner(r) for scanner.Scan() { fmt.Println(scanner.Text()) } }
writer.Flush()
如果被注释掉的话,会有很长一段时间无数据输出。查看源码 发现,其默认的 buffer 大小为 4096
只有当存入的数据大小大于 4096
比特时,它才会调用 PipeWriter.Write()
写入数据。解决方案就是把注释的那行解除注释,每次写入时清空一下缓存就可以了。
其实上面的代码还是有问题的
假设只希望输出限定个数的时间呢?比如说 3 次。把上面的无限次循环改成优先次不就行了嘛。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func main () { go func () { writer := bufio.NewWriter(w) for i := 0 ; i < 3 ; i++ { time.Sleep(time.Second) writer.WriteString(time.Now().String() + "\n" ) writer.Flush() } }() scanner := bufio.NewScanner(r) for scanner.Scan() { fmt.Println(scanner.Text()) } }
执行代码,发现最后出现了死锁的情况,定位到出现死锁的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 func (p *pipe) Read (b []byte ) (n int , err error) { select { case bw := <-p.wrCh: nr := copy (b, bw) p.rdCh <- nr return nr, nil case <-p.done: return 0 , p.readCloseError() } }
应该是这个的问题<-p.wrCh
。这个 channel 用于接收从 pipe.Write()
中发送来的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (p *pipe) Write (b []byte ) (n int , err error) { for once := true ; once || len (b) > 0 ; once = false { select { case p.wrCh <- b: nw := <-p.rdCh b = b[nw:] n += nw case <-p.done: return n, p.writeCloseError() } } return n, nil }
但是由于数据已经全部发送完毕,所以这里的 Write
函数已经退出了。所以 case bw := <-p.wrCh
不可能得到数据了。那么就只能希望从 case <-p.done
中获得数据,那个就是调用 pipe.CloseWrite()
或者 pipe.CloseRead()
,也就是调用 PipeWriter.Close()
或者调用 PipeReader.Close()
。
把那段代码中的 writer.Close()
解除注释就可以了。
由于 pipe.CloseWrite()
和 pipe.CloseRead()
是一样的,所以 PipeWriter
和 PipeReader
只需要调用一次即可。
os.Pipe 1 func Pipe () (r *File, w *File, err error)
这个是对操作系统文件句柄的封装。现在使用这个函数实现和上面类似的功能
inR
和 inW
这一对 Pipe 用于向启动的子进程中传入数据
outR
和 outW
这一对 Pipe 用于子进程向外部输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func main () { inR, inW, _ := os.Pipe() outR, outW, _ := os.Pipe() done := make (chan struct {}) process, _ := os.StartProcess("/bin/sh" , nil , &os.ProcAttr{ Files: []*os.File{inR, outW, outW}}) go func () { writer := bufio.NewWriter(inW) for i := 0 ; i < 3 ; i++ { time.Sleep(time.Second) writer.WriteString("date\n" ) writer.Flush() } inW.Close() outW.Close() }() go func () { scanner := bufio.NewScanner(outR) for scanner.Scan() { fmt.Println(scanner.Text()) } process.Signal(os.Kill) done <- struct {}{} fmt.Println("finish" ) }() process.Wait() <-done }
输出类似于如下数据
1 2 3 4 Sun Jul 5 18:36:52 CST 2020 Sun Jul 5 18:36:53 CST 2020 Sun Jul 5 18:36:54 CST 2020 finish
另一个比较简单的例子是调用 ls
查看当前目录,这里使用 bytes.Buffer.ReadFrom
读取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func main () { inR, inW, _ := os.Pipe() outR, outW, _ := os.Pipe() dir, _ := filepath.Abs(filepath.Dir(os.Args[0 ])) process, _ := os.StartProcess("/bin/sh" , nil , &os.ProcAttr{ Files: []*os.File{inR, outW, outW}, Dir: dir, }) go func () { writer := bufio.NewWriter(inW) writer.WriteString("ls -a" ) writer.Flush() inW.Close() outW.Close() }() process.Wait() buffer := new (bytes.Buffer) buffer.ReadFrom(outR) fmt.Println(buffer.String()) }