SparkCore的Topn的代码书写案例

1.需求:

1)求每个域名的流量之和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
object LogApp {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf()
sparkConf.setAppName("LogApp").setMaster("local[1]")
val sc=new SparkContext(sparkConf)

var lines=sc.textFile("file:///d:/data.txt")

// TODO... 求每个域名的流量之和

/*
* 需求分析:取出域名(第11个字段),流量(20) 两个字段
* 通过域名分组,求流量之和,通过reducebykey
*
* */

lines.map(x=>{
val splits=x.split("\t") //切分完之后是一个字符串数组
var length=splits.length
var traffic=0L
if (length==72){ //判断日志是否有问题,如果有问题就剔除掉,脏数据
val domain=splits(10) //index from zero

/*如果这里的字段不是123这样的类型,
而是xxx这样的,转换不了Long类型,所以需要try一下
* */

try{
traffic =splits(19).toLong
}catch {
}

(domain,traffic)
}else{
("-",0l) //0L
}
}).reduceByKey(_+_).collect().take(10).foreach(println)

sc.stop()

}

}

2)求每个省份访问次数的TOP10

需求分析:

(省份,1) 拿到省份,然后赋值为1 进行 reducebykey操作 然后降序排序

省份通过 ip拿到 需要用到 IPUtil,这个 需要去买,才能用,解析出来的 第一个字段是国家,第二个是省份,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
object LogApp {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf()
sparkConf.setAppName("LogApp").setMaster("local[1]")
val sc=new SparkContext(sparkConf)

var lines=sc.textFile("file:///d:/data.txt")

lines.map(x=>{//ip 是第7个字段
val splits=x.split("\t")
val length=splits.length
if (length==72){
val args6=splits(6)//ip字段有可能是 IP或者是 ip+port 所以需要判断
val servicePort=args6.split(":")
var ip=""
if(servicePort.length==2){
ip=servicePort(0)
}else{
ip=args6
}
val province=IPUtil.getInstance().getIpInfos(ip)(1)//将省份从IP中解析出来
(province,1)
}else{
("-",1)
}
}).reduceByKey(_+_).sortBy(_._2,false).collect().foreach(println)

sc.stop()

//降序排序 可以用sortBy

}

}

拿到需求,首先要进行分析,分析好以后就是填空

本文标题:SparkCore的Topn的代码书写案例

文章作者:skygzx

发布时间:2019年05月07日 - 10:16

最后更新:2019年05月07日 - 16:14

原始链接:http://yoursite.com/2019/05/07/Sparkcore的Topn代码书写/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

-------------本文结束感谢您的阅读-------------
0%