Spark 编程模型(下)

Posted fengyouheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 编程模型(下)相关的知识,希望对你有一定的参考价值。

创建Pair RDD

什么是Pair RDD

● 包含键值对类型的RDD被称作Pair RDD

● Pair RDD通常用来进行聚合计算

● Pair RDD通常由普通RDD做ETL转换而来

创建Pair RDD

● Python:pairs = lines.map(lambda x: (x.split(" ")[0], x))
● Scala:val pairs = lines.map(x => (x.split(" ")(0), x))
● Java:
PairFunction<string, string,="" string=""> keyData =
	new PairFunction<string, string,="" string="">() {
	public Tuple2<string, string=""> call(String x) {
		return new Tuple2(x.split(" ")[0], x);
	}
};
JavaPairRDD<string, string=""> pairs = lines.mapToPair(keyData);

Pair RDD的transformation操作

Pair RDD转换操作1

● Pair RDD 可以使用所有标准RDD 上转化操作,还提供了特有的转换操作。

技术分享图片

Pair RDD转换操作2

技术分享图片

Pair RDD的action操作

Pair RDD转换操作1

● 所有基础RDD 支持的行动操作也都在pair RDD 上可用

技术分享图片

 

Pair RDD的分区控制

Pair RDD的分区控制

● Spark 中所有的键值对RDD 都可以进行分区控制---自定义分区

● 自定义分区的好处:

1)避免数据倾斜

2)控制task并行度

自定义分区方式

class DomainNamePartitioner(numParts: Int) extends Partitioner {
	override def numPartitions: Int = numParts
	override def getPartition(key: Any): Int = {
		val domain = new Java.net.URL(key.toString).getHost()
		val code = (domain.hashCode % numPartitions)
		if(code < 0) {
			code + numPartitions // 使其非负
		}else{
			code
		}
	}
	// 用来让Spark区分分区函数对象的Java equals方法
	override def equals(other: Any): Boolean = other match {
		case dnp: DomainNamePartitioner =>
			dnp.numPartitions == numPartitions
		case _ =>
			false
	}
}
   
	

以上是关于Spark 编程模型(下)的主要内容,如果未能解决你的问题,请参考以下文章

Spark 编程模型(下)

Spark 编程模型(下)

Spark 编程模型(中)

在这个 spark 代码片段中 ordering.by 是啥意思?

Spark 编程模型(上)

python+spark程序代码片段