初识Spark系列-RDD转换

一颗蔬菜 2019-09-22 PM 67℃ 0条

RDD整体上分为Value型和Key-Value型

1.Value型

map(func)算子

作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

eg:创建一个1-10数组RDD,将所有元素*2形成新的RDD

val source = sc.makeRDD(1 to 10)  // 创建RDD
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

source.collect() // 打印RDD
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val mapped = source.map(_*2) // 将所有元素*2
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

mapped.collect()  // 打印结果
es8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitions(func)算子

作用:类似与map,但独立地在RDD的每一个分区上运行。

假设有N个元素,M个分区:

如果使用map算子,则需要调用N次;如果使用mapPartitions算子,则需要调用M次。

即map算子每次处理一个元素,而mapPartitions每次处理一个分区。

eg:创建一个RDD,使每个元素*2组成新的RDD

val source = sc.makeRDD(Array(1,2,3,4))   // 创建RDD

source.mapPartitions(x=>x.map(_*2))   // 使每个元素*2组成新的RDD
res4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at mapPartitions at <console>:30

res4.collect() // 打印结果
map和mapPartitions算子的区别

1.map()每次只处理一条数据

2.mapPartitions每次处理一个分区的数据,这个分区处理完成后,原RDD中分区的数据才能释放,导致OOM。

当内存比较小时,建议使用map,虽然慢,但安全。

当内存比较大时,建议使用mapPartitions,提高生产效率。

flatMap(func)算子

作用:类似与Map,但是每一个输入元素被映射为0或者多个输出元素,func函数返回的结果是一个序列,而不是单一元素

eg:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5)

scala> val flatMap = sc.makeRDD(1 to 5)
flatMap: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27

scala> flatMap.flatMap((1 to _))
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at <console>:30

scala> res14.collect()
res15: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
groupBy(func)算子

作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入同一个迭代器。

eg:创建一个RDD,按照元素%2的指进行分组。

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:27

scala> rdd.groupBy(_%2) // 分组
res16: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[10] at groupBy at <console>:30

scala> res16.collect()
res17: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8, 10)), (1,CompactBuffer(1, 3, 5, 7, 9)))
filter(func)算子

作用:过滤。返回一个新的RDD,该RDD由func函数计算后返回值为true的输入元素组成。

eg:创建一个RDD(由字符串组成), 过滤出一个新的RDD(包含“xiao”子串)

scala> val sourceFilter  = sc.makeRDD(Array("xiaohua","dahua","xiaogou","xiaoshu","dashu"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:27


scala> sourceFilter.filter(_.contains("xiao"))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at filter at <console>:30

scala> res0.collect()
res1: Array[String] = Array(xiaohua, xiaogou, xiaoshu)
标签: none

非特殊说明,本博所有文章均为博主原创。