使用 map-reduce 按行比较两组数据的 diff

对于大批量的数据,使用本地的 diff 命令比较速度慢,推荐采用 map-reduce 的方式来进行。本文的主要内容为 mr 相关脚本的实现及分析。

脚本如下:

  • run.sh : 用于提交 mr 任务

  • mapper.sh : mr 任务的 mapper 脚本

  • reducer.sh : mr 任务的 reducer 脚本

代码是抄同事的,分析是抄博客的(滑稽

run.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!/bin/bash

set -x
HADOOP_BIN=xxxxx # hadoop bin 路径
TIME_STAMP=`date +%s`
JOB_UGI=xxxxx # 密码
FS_NAME=xxxx # 集群名称
JOB_QUEUE=xxxx # 队列名称
JOB_GROUP=xxxx # 队列组
JOB_TRACKER=xxxx # 队列 tracker
JOB_NAME=xxxxx # 任务名称
REDUCE_NUM=100
context1=xxxxx # 第一组数据 hdfs 存储路径中特有的字符串,作为表示其的 id
context2=xxxxx # 第二组数据 hdfs 存储路径中特有的字符串,作为表示其的 id
INPUT1=xxxxx # 第一组数据存储在 hdfs 上的绝对路径
INPUT2=xxxxx # 第二组数据存储在 hdfs 上的绝对路径
OUTPUT=xxxxx # 结果输出的 hdfs 上的绝对路径

${HADOOP_BIN} fs -D hadoop.job.ugi="${JOB_UGI}" -rmr ${OUTPUT}

${HADOOP_BIN} streaming \
-D hadoop.job.ugi="${JOB_UGI}" \
-D fs.default.name="${FS_NAME}" \
-D mapred.job.queue.name="${JOB_QUEUE}" \
-D mapred.job.groups="${JOB_GROUP}" \
-D mapred.job.tracker="${JOB_TRACKER}" \
-D mapred.job.name="${JOB_NAME}" \
-D mapred.job.priority="VERY_HIGH" \
-D mapred.job.map.capacity=10000 \
-D mapred.job.reduce.capacity=2000 \
-D mapred.reduce.tasks=${REDUCE_NUM} \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,2" \
-D stream.memory.limit=10240 \
-D abaci.job.map.child.memory.mb=10240 \
-D abaci.job.reduce.child.memory.mb=10240 \
-D mapred.map.max.attempts=10 \
-D mapred.reduce.max.attempts=10 \
-D dce.shuffle.enable=true \
-input ${INPUT1[@]} \
-input ${INPUT2} \
-output ${OUTPUT} \
-mapper "sh -x mapper.sh ${context1} ${context2}" \
-reducer "sh -x reducer.sh" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat \
-file mapper.sh \
-file reducer.sh

if [[ $? != 0 ]]; then
exit -1
fi
1
2
3
4
5
6
7
8
9
10
11
12
13
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

===>

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1
  • org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

  • org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat

    • hadoop 支持 reduce 多路输出的功能,一个reduce可以输出到多个 part-xxxxx-X 文件中,其中 XA-Z 的字母之一,程序在输出 <key,value> 对的时候,在 value 的后面追加 #X 后缀,比如 #A,输出的文件就是 part-00000-A ,不同的后缀可以把 <key,value> 输出到不同的文件中,方便做输出类型分类, #X 仅仅用做指定输出文件后缀,不会体现到输出的内容中

    • 参考:https://www.cnblogs.com/shapherd/archive/2012/12/21/2827860.html


mapper.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
set -x
context1=$1
context2=$2
type=$(echo ${map_input_file} |
awk -F"\t" '{
if (index($0, "'${context1}'") > 0) {
print "1"
} else if (index($0, "'${context2}'") > 0) {
print "2"
} else {
print "-1"
}
}')

if [[ ${type} == "1" ]]; then
awk -F '\t' '{print $1"\t1"}'
elif [[ ${type} == "2" ]]; then
awk -F '\t' '{print $1"\t2"}'
else
exit -1
fi

在 run.sh 里指定了执行命令:-mapper "sh -x mapper.sh ${context1} ${context2}"

对于 hadoop 路径中包含 context1 的文件,会在其每行url后添加 \t1,例如 url1 \t 1,然后输出

对于 hadoop 路径中包含 context2 的文件,会在其每行url后添加 \t2,例如 url2 \t 2,然后输出

之后 hadoop 会根据这两个列进行排序( -D mapred.text.key.comparator.options=”-k1,2”)

reducer.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/bin/bash

set -x

awk -F '\t' 'BEGIN {
only1_cnt = 0
only2_cnt = 0
comm_cnt = 0
} {
url = $1
type = $2
if (url != last_url) {
if (last_type == 1) {
++only1_cnt
print "file1 only: "last_line > "/dev/stderr"
print last_url"\t#A"
}
if (type == 2) {
++only2_cnt
print "file2 only: "$0 > "/dev/stderr"
print url"\t#B"
}
} else {
++comm_cnt
print url"\t#C"
print "comm: "$0 > "/dev/stderr"
}

last_url = url
last_type = type
last_line = $0
} END {
if (type == 1){
++only1_cnt
print "file1 only: "last_line > "/dev/stderr"
print url"\t#A"
}
printf "reporter:counter:counts,file1_only,%ld\n", only1_cnt > "/dev/stderr"
printf "reporter:counter:counts,file2_only,%ld\n", only2_cnt > "/dev/stderr"
printf "reporter:counter:counts,common,%ld\n", comm_cnt > "/dev/stderr"
}'

这个脚本接受的输入大致如下:

1
2
3
4
url1 1
url1 2
url2 1
url3 2

url1 在 1 和 2 中都存在,则输出 url1#C

url2 只在 1 中存在,则输出 url2#A

url3 只在 2 中存在,则输出 url3#B

org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat 会根据 # 后的值,将 url 输出到不同的 xxx-X 的文件中,X 对应 A、B 或 C