![Hadoop大数据技术开发实战](https://wfqqreader-1252317822.image.myqcloud.com/cover/392/27563392/b_27563392.jpg)
5.6 案例分析:二次排序
MapReduce在传递<key,value>对时默认按照key进行排序,而有时候除了key以外,还需要根据value或value中的某一个字段进行排序,基于这种需求进行的自定义排序称为“二次排序”。
例如有以下数据:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P102_12842.jpg?sign=1739183231-LW7ehSnhBwrBpPOJcyTnV1oUFXTGWKtK-0-c8ab4279172591f1d794c69e9e4db6a6)
现需要对上述数据先按照第一字段进行升序排列,若第一字段相同,则按照第二字段进行降序排列,期望的输出结果如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12963.jpg?sign=1739183231-s6O1EJfHnrT02O6TweTw7PySC4LkX0xN-0-17ed1b5fb03abe955801155aaa2bb1d0)
1. 设计思路
由于MapReduce中主要是对key的比较和排序,因此可以将需要排序的两个字段组合成一个复合key,而value值不变,则组合后的<key,value>对形如<(key,value),value>。
在编程时可以自定义一个类MyKeyPair,该类中包含要排序的两个字段,然后将该类作为<key,value>对中的key(Hadoop中的任何类型都可以作为key),形如<MyKeyPair,value>,相当于自定义key的类型。由于所有的key是可序列化并且可比较的,因此自定义的key需要实现接口WritableComparable。
与按照一个字段排序相比,本次二次排序需要自定义的地方如下:
- 自定义组合key类,需要实现WritableComparable接口。
- 自定义分区类,按照第一个字段进行分区,需要继承Partitioner类。
- 自定义分组类,按照第一个字段进行分组,需要继承WritableComparator类。
2. 编写程序
(1)自定义组合key类。
新建自定义组合key类MyKeyPair.java,该类需要实现Hadoop提供的org.apache.hadoop.io.WritableComparable接口,该接口继承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定义源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12964.jpg?sign=1739183231-VZkk76w2vegoGLvnw7ef7kfGTIIgjg6G-0-05f4e181e56504e11aaafb49011230f2)
然后需要实现WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比较方法compareTo()。write()方法用于将数据写入输出流;readFields()方法用于从输入流读取数据;compareTo()方法用于将两个对象进行比较,以便能够进行排序。
自定义组合key类MyKeyPair.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12901.jpg?sign=1739183231-9hSd84bLC7hNqeRbtUDMcYGUT1mLZCbl-0-4e18617315bfeb24e4aac613cc27fa87)
(2)自定义分区类。
新建自定义分区类MyPartitioner.java,该类需要继承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner类,并实现其中的抽象方法getPartition()。Partitioner类是一个抽象泛型类,用于控制对Map任务输出结果的分区,泛型的两个参数分别表示<key,value>对中key的类型和value的类型。Partitioner类的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P105_13463.jpg?sign=1739183231-I6hQ54Lsevj6ftdLmWW3yyUJ2beUbwqU-0-b221388cc03f0937240c59a20278c627)
关于MapReduce的分区规则可参考本章5.1.3节的MapReduce工作原理,此处不再赘述。
自定义分区类MyPartitioner.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P105_13464.jpg?sign=1739183231-A2LkfIBioaD1yO9GBwTAg4lGwr06Kt1x-0-4a3488a7ef3dd364d3e52c7a33ca6218)
上述代码继承Partitioner类的同时指定了<key,value>对中key的类型为MyKeyPair,value的类型为IntWritable。
(3)自定义分组类。
新建自定义分组类MyGroupComparator.java,该类需要继承Hadoop提供的org.apache.hadoop.io.WritableComparator类,并重写其中的compare()方法,以实现按照指定的字段进行分组。
自定义分组类MyGroupComparator.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P106_13683.jpg?sign=1739183231-7VawLn09E07hvLBrFaLPbhWnZCa6f7Jq-0-bbb022364be871f70cfb1f5a9365b698)
上述代码首先通过构造方法指定了<key,value>对中key的类型为MyKeyPair,由于MapReduce默认以<key,value>对中的key值进行分组,因此接下来重写了compare()方法,实现了按照MyKeyPair对象中的first字段进行对比,若值相等则会将当前<key,value>对分为一组。
(4)定义Mapper类。
新建Mapper类MyMapper.java,实现将输入的数据封装为<MyKeyPair, IntWritable>形式的<key,value>对进行输出,即输出的key的类型为MyKeyPair,输出的value的类型为IntWritable。
Mapper类MyMapper.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P106_13684.jpg?sign=1739183231-PyB64RThHHP0aJZsuTnfnFWZs1zA87ox-0-b85fd711f9f9652291cf71a7f353ba48)
(5)定义Reducer类。
新建Reducer类MyReducer.java,将接收到的分组后的<key,value-list>对循环进行输出。
Reducer类MyReducer.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P107_13944.jpg?sign=1739183231-lWNl66HrNpl97unLTWX1fP6NgVEke486-0-702cd9e635ca7464c81f256522ba582b)
上述代码将MyKeyPair类型的key中的first字段值作为输出的key,输出的value从集合values中进行遍历。
(6)定义应用程序主类。
新建应用程序主类MySecondSortApp.java,在该类中需要指定自定义的分区类和分组类,同时需要显式设置Map任务输出的key和value的类型。
应用程序主类MySecondSortApp.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P108_14198.jpg?sign=1739183231-I4ivyocUjWW2RAVEH7nCaPrYH4KORXgr-0-74e3df99482b4276bc67960b066ef337)
上述代码解析如下:
❶ 设置map()方法输出的key和value的类型。若将此省略,则默认采用❷中设置的输出类型。也就是说,若map()方法和reduce()方法的输出类型一致,可以省略对map()方法输出类型的设置。若map()方法和reduce()方法实际的输出类型与此处的设置不匹配,则程序运行过程中将会报错。
在MapReduce程序运行的过程中会通过JobConf类获取map()方法的输出类型,获取map()方法输出key的类型的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P109_14351.jpg?sign=1739183231-VM64ioyODE6munbRdbRqY94fzp44hoJU-0-adb760891e098908b0b16aaea3e72bac)
从上述源码可以看出,当没有设置map()方法的输出类型时,会调用getOutputKeyClass()方法使用reduce()方法的输出类型。
❸ 在执行MapReduce程序时,会首先从HDFS中读取数据块,然后按行拆分成<key,value>对,这个过程则是由TextInputFormat类完成的。TextInputFormat类继承了抽象类FileInputFormat<K,V>,而FileInputFormat<K, V>又继承了抽象类InputFormat<K, V>,抽象类InputFormat<K, V>中定义了两个方法:getSplits()和createRecordReader()。getSplits()方法负责将HDFS数据解析为InputSplit集合,createRecordReader()方法负责将一个InputSplit解析为一个<key,value>对记录。抽象类InputFormat<K, V>的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P109_14352.jpg?sign=1739183231-mfZkDr7FbczS6yQN2I1yrSwPETAelnqc-0-f2440003f2c720f30a219d7f0f788506)
3. 程序运行
程序的打包和执行参考前面的“单词计数”和“数据去重”案例,此处不再赘述。
执行完成后,查看执行结果,如图5-11所示。
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P110_14356.jpg?sign=1739183231-72UAex0ehrW69X8e7DHA0cH22cNRvec0-0-54cf582ed9dc4e1e4f9323b2f8ebd9d8)
图5-11 查看二次排序程序执行结果