Spark secondary sort

Data:
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60

package com.scala.test.core.secondsort

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySort {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySort").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val line = sc.textFile("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/sort.txt")

    // split each "key value" line into a (key, value) pair of strings
    val data = line.map(x => {
      val fields: Array[String] = x.split(" ")
      (fields(0), fields(1))
    })

    // groupByKey gathers all values of a key, sortByKey orders the keys
    val rdd = data.groupByKey().sortByKey()
    // (40,CompactBuffer(20, 10, 30, 5))
    // (50,CompactBuffer(20, 50, 10, 60))
    // (30,CompactBuffer(30, 20, 10, 40))

    // this variant collapses the input down to 3 records (one per key),
    // changing the original record count:
    // val result = rdd.map(item => (item._1, item._2.toList.sortWith(_.toInt < _.toInt))) // convert the values to a list and sort

    // this variant keeps the original record count
    val result = rdd.flatMap(item => {
      val list = item._2.toList.sortWith(_.toInt < _.toInt) // sort one key's values numerically, in memory
      list.map(x => (item._1, x))
    })

    // save
    // result.saveAsTextFile("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/out")
    // print
    result.collect().foreach(println)
    sc.stop()
  }
}
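
For the sample data at the top, result.collect().foreach(println) should print roughly the following (the keys are still strings here; after sortByKey the collected output comes back in key order):

(30,10)
(30,20)
(30,30)
(30,40)
(40,5)
(40,10)
(40,20)
(40,30)
(50,10)
(50,20)
(50,50)
(50,60)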

In the approach above, groupByKey pulls all values of the same key together and those values are then sorted in memory. That sort can exhaust the reducer's memory, so it is only acceptable when the amount of data per key is small. It also differs from MapReduce's setGroupingComparatorClass approach: in MapReduce there is no sort inside reduce, the ordering is done by the framework's shuffle sort.
Using repartitionAndSortWithinPartitions:

package com.scala.test.core.secondsort

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object SecondarySortRepartition {

  def main(args: Array[String]): Unit = {
    // note: this implicit descending Ordering[Int] is never picked up below;
    // the key ordering used by repartitionAndSortWithinPartitions comes from
    // SecondarySort extending Ordered[SecondarySort]
    implicit val descendingIntOrdering: Ordering[Int] = new Ordering[Int] {
      override def compare(a: Int, b: Int): Int = b.compareTo(a)
    }

    // composite key: ordered by the first field, then by the second field
    class SecondarySort(val one: Int, val two: Int) extends Ordered[SecondarySort] with Serializable {
      override def compare(that: SecondarySort): Int = {
        if (this.one != that.one) {
          this.one.compareTo(that.one)
        } else {
          this.two.compareTo(that.two)
        }
      }
    }

    // partition only by the first field, so all records sharing it land in the same partition
    class KeyBasePartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[SecondarySort]
        Math.abs(k.one.hashCode() % numPartitions)
      }
    }

    val conf = new SparkConf().setAppName("SecondarySortRepartition").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val line = sc.textFile("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/sort.txt")

    line.map(x => {
      val xy = x.split(" ")
      // the composite key drives partitioning and sorting; the value keeps the
      // original text line, which is what stays readable in the output files
      (new SecondarySort(xy(0).toInt, xy(1).toInt), x)
    })
      .repartitionAndSortWithinPartitions(new KeyBasePartitioner(3))
      .saveAsTextFile("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/out")
      // .foreach(println)
    sc.stop()
  }
}

When the number of Spark partitions is larger than the number of threads, Spark still processes the partitions one at a time, each sorted on its own; there is no cross-partition grouping in the style of MapReduce's setGroupingComparatorClass.
You can verify this by combining .setMaster("local[2]") with new KeyBasePartitioner(3).
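
A quick way to check this, assuming the snippet below sits inside the main method of the listing above (so that line, SecondarySort and KeyBasePartitioner are in scope), is to tag every record with the index of the partition it ends up in instead of saving to a file:

// verification sketch only: print each record together with its partition index
val pairs = line.map { x =>
  val xy = x.split(" ")
  (new SecondarySort(xy(0).toInt, xy(1).toInt), x)
}
pairs
  .repartitionAndSortWithinPartitions(new KeyBasePartitioner(3))
  .mapPartitionsWithIndex { (idx, iter) =>
    iter.map { case (_, original) => s"partition $idx -> $original" }
  }
  .collect()
  .foreach(println)
// records come out sorted by (one, two) within each partition, but not across partitions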

Spark 1.2 introduced a high-quality operator, repartitionAndSortWithinPartitions, which adds a sort to Spark's shuffle. If it is followed by a mapPartitions operator, that operator works on partitions whose records are already sorted by key, which is quite close to the MapReduce model. Unlike groupByKey, the data is not loaded into memory all at once; an iterator pulls one record at a time from disk, which minimizes memory pressure.
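
As a sketch of that pattern, reusing the pairs RDD and KeyBasePartitioner from the verification sketch above, a mapPartitions step can stream through each partition's already-sorted records one at a time and emit readable "one two" lines, without ever materializing a whole group in memory:

pairs
  .repartitionAndSortWithinPartitions(new KeyBasePartitioner(3))
  .mapPartitions { iter =>
    // iter is already ordered by (one, two) within this partition;
    // consume it lazily, one record at a time
    iter.map { case (k, _) => s"${k.one} ${k.two}" }
  }
  .collect()
  .foreach(println)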

For how to define a custom partitioner, see:
https://www.bbsmax.com/A/LPdoVrl253/
The repartitionAndSortWithinPartitions operator is more efficient than repartitioning first and then sorting (see the sketch after these links):
https://blog.csdn.net/luofazha2012/article/details/80587128

sortByKey vs. repartitionAndSortWithinPartitions:
https://blog.csdn.net/wyqwilliam/article/details/81627603
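
As a minimal illustration of that comparison (a sketch only, assuming a hypothetical RDD of (Int, Int) pairs named data):

import org.apache.spark.HashPartitioner

// two passes: shuffle first, then sort every partition in memory as a separate step
val partitionedThenSorted = data
  .partitionBy(new HashPartitioner(3))
  .mapPartitions(_.toArray.sortBy(_._1).iterator)

// one pass: the per-partition sort is folded into the shuffle itself
val sortedDuringShuffle = data.repartitionAndSortWithinPartitions(new HashPartitioner(3))

The first version has to pull each whole partition into memory to sort it, while in the second the shuffle machinery does the sorting and can spill to disk, which is the same memory argument made for repartitionAndSortWithinPartitions above.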