Node.js Stream 学习笔记

此为慕课网 两小时学会 Node.js stream 学习笔记。stream 的思想在各个语言中都是相通的,不限于Cpp、Java、Golang等,可以大大减少内存的消耗,提高代码的运行效率。


主要组成部分

  • data 事件,用来监听 stream 数据的输入
  • end 事件,用来监听 stream 数据输入完成
  • fs.createReadStream 方法,返回一个文件读取的 stream 对象
  • fs.createWriteStream 方法,返回一个文件写入的 stream 对象
  • pipe 方法,用来做数据流转
1
2
3
4
5
6
7
8
9
10
const http = require('http');
const fs = require('fs');
const path = require('path');
const PORT = 8000;
const server = http.createServer((req, res) => {
const fileName = path.resolve(__dirname, 'data.txt');
const stream = fs.createReadStream(fileName);
stream.pipe(res);
});
server.listen(PORT);

从哪里来

  • 控制台输入
  • http请求中的request
  • 读取文件

控制台输入

1
2
3
process.stdin.on('data', (chunk) => {
console.log(chunk.toString())
});

http的request

1
2
3
4
5
6
7
8
9
10
11
12
const http = require('http');
const PORT = 8000;
const server = http.createServer((req, res) => {
let data = '';
req.on('data', chunk => { // 收到数据
data += chunk.toString();
});
req.on('end', () => { // 数据传输结束
res.end(data);
});
});
server.listen(PORT);

从文件中读取

1
2
3
4
5
6
7
8
9
10
const fs = require('fs');
const path = require('path');
const readStream = fs.createReadStream(path.join(__dirname, 'data.txt'));
let length = 0;
readStream.on('data', (chunk) => {
length += chunk.toString().length
});
readStream.on('end', () => {
console.log(length)
});

到哪儿去

  • 控制台
  • http请求的response
  • 写入文件

几个示例代码

1
2
// console
process.stdin.pipe(process.stdout)
1
2
3
4
// http
const fs = require('fs');
var stream = fs.createReadStream('....');
stream.path(res);
1
2
3
4
5
// fs
const fs = require('fs');
const rs = fs.createReadStream('....');
const ws = fs.createWriteStream('....');
rs.pipe(ws)

建立WebServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const http = require('http'),
fs = require('fs'),
path = require('path');
const PORT = 8000;
const fileName = path.join(__dirname, 'data.txt');
const server = http.createServer((req, res) => {
switch (req.method) {
case 'GET': // 处理 GET 方法
const rs = fs.createReadStream(fileName);
rs.pipe(res);
break;
case 'POST': // 处理 POST 方法
let len = 0;
req.on('data', (chunk) => {
len += chunk.toString().length;
console.log('chunk ', len);
});
req.on('end', () => {
console.log('chunk end');
res.end(len+'')
});
break;
default:
res.end('method not support')
}
});
server.listen(PORT, () => {
console.log(`Server is listening at port ${PORT}`)
});

按行读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const fs = require('fs');
const path = require('path');
const readline = require('readline');
const fileName = path.join(__dirname, 'data.txt');
const rs = fs.createReadStream(fileName);
const rl = readline.createInterface({
input: rs
});

rl.on('line', (lineData) => { // 按行读取事件
console.log(lineData);
console.log('----this line read----');
});
rl.on('close', () => { // 按行读取结束
console.log('readline end')
});

stream流动的单位

8个二进制单位为一个存储单元,成为字节(byte),能存储的最整数位2^8=258

常常用两位16进制数代表一个字节,如 #CCCCCC 为三字节

1
2
3
4
const str = '学习 nodejs stream';
const buf = Buffer.from(str, 'utf-8');
console.log(buf); // // <Buffer e5 ad a6 e4 b9 a0 20 6e 6f 64 65 6a 73 20 73 74 72 65 61 6d>
console.log(buf.toString('utf-8'));

流的种类

read stream

1
2
3
4
5
6
7
const Readable = require('stream').Readable;
const rs = new Readable;
rs.push('学习 ');
rs.push('nodejs ');
rs.push('stream\n');
rs.push(null);
rs.pipe(process.stdout);

write stream

1
2
3
4
5
6
7
const Writable = require('stream').Writable;
const ws = new Writable;
ws._write = function (chunk, encoding, next) {
console.log(chunk.toString());
next();
};
process.stdin.pipe(ws);

duplex stream

1
2
3
4
5
6
const fs = require('fs');
const zlib = require('zlib');
const readStream = fs.createReadStream('./file1.txt');
const writeStream = fs.createWriteStream('./file1.txt.gz');
readStream.pipe(zlib.createGzip())
.pipe(writeStream);

更多教程

Nodejs 中的 Stream