1.注意:
在spark中,计算时,有多少个partitions就有多少个task
2.SparkContext(Spark程序的入口)
运行spark程序的第一步:SparkConf()——>SparkContext()
SparkContext表示:如何连接一个集群,运行在哪种模式上
创建SparkConf对象,支持set链式编程,将SparkConf中包含的一些信息(Application name,master)传递到 SparkContext中。
在集群上运行时,不要硬编码master,要通过 spark-submit方式来提交
1 | object SparkContextApp { def main(args: Array[String]): Unit = { |
3.spark-shell
查看命令帮助:-help
1 | 重要参数 |
4.RDD的创建方式
1).把一个集合转换为RDD 测试的时候用
2).使用外部存储系统的数据集(HDFS,HBase…) 生产的时候用
方式一:
1 | scala> val data = Array(1, 2, 3, 4, 5) |
方法2:
外部数据集的方式创建RDD, 支持Hadoop,Hbase…支持 文本格式、sequenceFile
textFile:
Read a text file from HDFS, a local file system (available on all nodes), or any
Hadoop-supported file system URI, and return it as an RDD of Strings.(返回的是字符串)
Standalone模式下:
在standalone模式下少用本地的文件模式
inputfile 必须要确保所有节点上都有
1 | //本地 |
注意点:
1.使用本地文件时,文件必须在各个工作节点上都能以相同的路径访问,所有节点上都必须要有这个文件,文件路径还得一样
2.指定hdfs时,可以指定到文件,也可以指定到文件夹,也可以用通配符匹配多个文件
3.textFile的第二个参数可以控制partition的数量
5.RDD的操作:
分为三大类:
1) transformations
从一个RDD,转换成一个新的RDD,如 rdd.map(+1),只有再涉及到actions的操作,才计算,返回结果
2)actions
是一个操作,返回结果的操作,如rdd.reduce(a+b)
3) cache
如果有数据丢失,损坏,它会通过依赖关系,找到这个丢失的上一个依赖,重新计算。
①map 对RDD中的每个元素进行操作,返回一个新的RDD
1 | scala> var a = sc.parallelize(1 to 9) |
② filter 对元素进行过滤
1 | scala> a.filter(_%2 == 0).collect() |
③flatMap和Map的区别
flatMap是压平后操作的,词频统计里面就用flatMap
1 | scala> var a = sc.parallelize(1 to 9) |
④mapValues 只对value进行操作,key不动
1 | scala> val a = sc.parallelize(List("dog","tiger","cat")) |
⑤count 返回数据集里元素的个数
⑥sum 求和操作
1 | scala> var a = sc.parallelize(1 to 100) |
⑦first 返回数据集第一个元素 和take(1)类似
1 | scala> val a = sc.parallelize(List("dog","tiger","cat")) |
⑧top 排序
1 | scala> sc.parallelize(Array(6,9,4,7,5,8)).top(2) |
⑨subtract
不是相减,是去重复元素
1 | scala> val a = sc.parallelize(1 to 5) |
⑩intersection
返回重复部分
1 | scala> val a = sc.parallelize(1 to 5) |
⑾cartesian
返回笛卡尔积
1 | scala> val a = sc.parallelize(1 to 5) |
注:下面的是action操作, 对结果进行处理
reduce 聚合所有的结果
⑿Join在Spark CORE中的使用
1 | scala> val a=sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1")("D","d1"),("D","d2"))) |
⒀词频统计
1 | scala> val log = sc.textFile("/home/hadoop/date/a.txt") |