RDD Basic Operations

1. Note:

In Spark, each partition of an RDD is computed by one task, so a stage launches as many tasks as the RDD has partitions.
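
A minimal sketch to check this in spark-shell (the partition count of 4 is just an illustrative choice):

// create an RDD with an explicit number of partitions
val rdd = sc.parallelize(1 to 100, 4)
rdd.getNumPartitions   // Int = 4, so the stage that computes this RDD runs 4 tasks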

2. SparkContext (the entry point of a Spark program)

The first step of any Spark program: create a SparkConf, then build a SparkContext from it (SparkConf -> SparkContext).

SparkContext describes how to connect to a cluster and which mode the application runs in.

Create a SparkConf object (its set methods can be chained) and pass the information it holds (application name, master, etc.) to the SparkContext.

When running on a cluster, do not hardcode the master in code; pass it via spark-submit instead.

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextApp {
  def main(args: Array[String]): Unit = {
    // in production, pass the master via spark-submit instead of hardcoding it
    val sparkConf = new SparkConf().setAppName("sparkcontextApp").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    // TODO: business logic goes here

    sc.stop()
  }
}

3. spark-shell

View command-line help: spark-shell --help

Important parameters:
--master          do not hardcode the master in code; specify it here (or via spark-submit)
--name            the application name
--jars            comma-separated list of local jars to ship with the application
--conf            arbitrary Spark configuration properties
--queue           the queue to submit to (e.g. a YARN queue)
--num-executors   number of executors to launch

4. Ways to create an RDD

1) Parallelize an existing collection into an RDD (typically used for testing)

2) Reference a dataset in an external storage system (HDFS, HBase, ...) (typically used in production)

Method 1:

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

// convert the array to an RDD
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distData.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)

// sum the elements with reduce
scala> distData.reduce((a, b) => a + b)
res1: Int = 15

// one task per partition, so the command below launches 5 tasks
scala> val distData = sc.parallelize(data,5)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

Method 2:

Create an RDD from an external dataset: Spark supports Hadoop-compatible storage systems (HDFS, HBase, ...) and formats such as plain text files and SequenceFiles.

textFile:

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

In standalone mode, avoid relying on local files; if you do use a local path, the input file must exist at the same path on every node in the cluster.

// local file system
scala> val distFile = sc.textFile("file:///home/hadoop/data/a.txt")
distFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/a.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> distFile.collect
res0: Array[String] = Array(hello word hello, word hello made)

// hdfs
scala> val distFile = sc.textFile("hdfs://hadoop001:9000/data")
distFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:9000/data MapPartitionsRDD[1] at textFile at <console>:24

scala> distFile.collect
res0: Array[String] = Array(hello word hello, word hello made)

Notes:

1. When using a local file, it must be accessible at the same path on every worker node, i.e. every node needs its own copy of the file at that exact path.

2. When using HDFS, the path may point to a single file or to a directory, and wildcards can be used to match multiple files.

3. The second argument of textFile controls the (minimum) number of partitions; see the sketch below.
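
A short sketch of points 2 and 3, reusing the hdfs://hadoop001:9000/data path from above (the *.txt wildcard is just an assumed example):

// a directory, a wildcard, and an explicit minimum partition count
val dirRDD  = sc.textFile("hdfs://hadoop001:9000/data")
val globRDD = sc.textFile("hdfs://hadoop001:9000/data/*.txt", 4)
globRDD.getNumPartitions   // typically at least 4; HDFS block splits may add more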

5. RDD operations

They fall into three broad categories:

1) transformations

A transformation produces a new RDD from an existing one, e.g. rdd.map(_ + 1). Transformations are lazy: nothing is computed until an action is invoked.

2) actions

An action triggers the actual computation and returns a result (or writes output), e.g. rdd.reduce(_ + _).

3) cache

cache marks an RDD to be kept in memory after it is first computed. If cached data is lost or corrupted, Spark uses the RDD's lineage (its chain of dependencies) to recompute the missing partitions, as sketched below.
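
A small sketch tying the three categories together (the values in the comments are what this pipeline computes):

val nums    = sc.parallelize(1 to 10)   // create an RDD
val doubled = nums.map(_ * 2)           // transformation: lazy, nothing is executed yet
doubled.cache()                         // cache: keep the result in memory once it is computed
doubled.reduce(_ + _)                   // action: triggers the job, returns 110
doubled.count()                         // another action: served from the cache, returns 10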

① map: applies a function to every element of the RDD and returns a new RDD

scala> var a = sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> a.map(_*2)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27

scala> a.map(_*2).collect()
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

scala> var a = sc.parallelize(List("dog","tiger","cat")) a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val b =a.map(x=>(x,1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

scala> b.collect()
res2: Array[(String, Int)] = Array((dog,1), (tiger,1), (cat,1))

② filter: keeps only the elements that satisfy a predicate

scala> a.filter(_%2 == 0).collect()
res3: Array[Int] = Array(2, 4, 6, 8, 10)

scala> a.filter(_<4).collect()
res4: Array[Int] = Array(1, 2, 3)

scala> var a = sc.parallelize(1 to 6)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val mapRDD = a.map(_*2)
mapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

scala> mapRDD.collect()
res5: Array[Int] = Array(2, 4, 6, 8, 10, 12)

scala> mapRDD.filter(_>5).collect
res7: Array[Int] = Array(6, 8, 10, 12)

③ The difference between flatMap and map
flatMap maps each element to a sequence and then flattens the results (each input can produce zero or more outputs); word count uses flatMap.

scala> var a = sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> var nums = a.map(x=>(x*x))
nums: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> nums.collect
res0: Array[Int] = Array(1, 4, 9, 16, 25, 36, 49, 64, 81)

// expand each x into the range 1 to x
scala> nums.flatMap(x => 1 to x).collect
res1: Array[Int] = Array(1, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 1, 2, 3, 4, 5, ...

④ mapValues: applies a function only to the value of each key-value pair; the key is left untouched

scala> val a = sc.parallelize(List("dog","tiger","cat")) 
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val b = a.map(x=>(x,x.length))
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:26

scala> b.collect()
res9: Array[(String, Int)] = Array((dog,3), (tiger,5), (cat,3))

scala> b.mapValues("x"+_+"x").collect()
res10: Array[(String, String)] = Array((dog,x3x), (tiger,x5x), (cat,x3x))

⑤ count: returns the number of elements in the dataset (a one-liner is sketched below)
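
A one-line sketch for count:

sc.parallelize(1 to 100).count()   // Long = 100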

⑥ sum: sums the elements of a numeric RDD

scala> var a = sc.parallelize(1 to 100)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> a.sum()
res11: Double = 5050.0

scala> a.reduce(_+_)
res12: Int = 5050

⑦ first: returns the first element of the dataset, similar to take(1)

scala> val a = sc.parallelize(List("dog","tiger","cat")) 
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> a.first()
res13: String = dog

scala> a.take(1)
res14: Array[String] = Array(dog)

⑧ top: returns the n largest elements according to the implicit ordering

scala> sc.parallelize(Array(6,9,4,7,5,8)).top(2)
res15: Array[Int] = Array(9, 8)

// with a reversed implicit ordering in scope, top(2) now returns the two smallest elements
scala> implicit val myorder = implicitly[Ordering[Int]].reverse
myorder: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@44473eed

scala> sc.parallelize(Array(6,9,4,7,5,8)).top(2)
res16: Array[Int] = Array(4, 5)

⑨ subtract
Not arithmetic subtraction: a.subtract(b) returns the elements of a that do not appear in b.

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val b = sc.parallelize(2 to 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.collect
res4: Array[Int] = Array(2, 3)

scala> a.subtract(b).collect
res5: Array[Int] = Array(4, 1, 5)

⑩ intersection

Returns the elements that appear in both RDDs.

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val b = sc.parallelize(2 to 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.collect
res4: Array[Int] = Array(2, 3)

scala> a.intersection(b).collect
res6: Array[Int] = Array(2, 3)

⑾ cartesian
Returns the Cartesian product of two RDDs, i.e. all possible pairs.

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val b = sc.parallelize(2 to 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.collect
res4: Array[Int] = Array(2, 3)

scala> a.cartesian(b).collect
res7: Array[(Int, Int)] = Array((1,2), (2,2), (1,3), (2,3), (3,2), (4,2), (5,2), (3,3), (4,3), (5,3))

Note: actions are the operations that actually process the data and return a result; reduce, for example, aggregates all the elements into a single value.

⑿ Using join in Spark Core

scala> val a=sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1")("D","d1"),("D","d2"))) 
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val b=sc.parallelize(Array(("A","a2"),("B","b2"),("C","c1")("D","d2"),("D","d3"),("E","E1")))
b: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[7] at parallelize at <console>:24

// join returns only the keys present on both sides (an inner join)
scala> a.join(b).collect
res10: Array[(String, (String, String))] = Array((B,(b1,b2)), (D,(d1,d2)), (D,(d1,d3)), (D,(d2,d2)), (D,(d2,d3)), (A,(a1,a2)), (C,(c1,c1)))

// leftOuterJoin, rightOuterJoin and fullOuterJoin behave like their SQL counterparts
scala> a.leftOuterJoin(b).collect
res2: Array[(String, (String, Option[String]))] = Array((B,(b1,Some(b2))), (D,(d1,Some(d2))), (D,(d1,Some(d3))), (D,(d2,Some(d2))), (D,(d2,Some(d3))), (A,(a1,Some(a2))), (C,(c1,Some(c1))))
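
rightOuterJoin and fullOuterJoin are not shown above; a quick sketch of what they return for the same a and b (element order after the shuffle may vary):

// rightOuterJoin keeps every key of b; missing left-side values become None
val right = a.rightOuterJoin(b)   // RDD[(String, (Option[String], String))], includes (E,(None,E1))
// fullOuterJoin keeps keys from both sides, wrapping both values in Option
val full  = a.fullOuterJoin(b)    // RDD[(String, (Option[String], Option[String]))]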

⒀ Word count

scala> val log = sc.textFile("/home/hadoop/date/a.txt") 
log: org.apache.spark.rdd.RDD[String] = /home/hadoop/date/a.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> val splits = log.flatMap(x => x.split("\t"))
splits: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> val word = splits.map(x=>(x,1))
word: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28

scala> word.collect
res0: Array[(String, Int)] = Array((hello,1), (eurecom,1), (hello,1), (eurecom,1), (hello,1), (yuan,1))

scala> val res1 = word.reduceByKey(_+_)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:30

scala> res1.collect
res2: Array[(String, Int)] = Array((hello,3), (eurecom,2), (yuan,1))
