-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRDD
More file actions
177 lines (114 loc) · 6.35 KB
/
RDD
File metadata and controls
177 lines (114 loc) · 6.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
-- RDD
1. read-only.
2. you can create a new RDD to make change.
-- RDD method: (map, filter, groupBy, join)
-- RDD implement process: (Transform & Action), Transform only remember the 轨迹.
-- RDD 容错性
1. traditional => 数据复制 或者 记录日志
2. DAG 重新计算丢失分区,无需回滚系统,重算过程在不同节点间并行
-- RDD 特性:
1. 高效容错性
2. 中间结果持久化到内存,数据在内存中的多个RDD操作间进行传递,减免了不必要的读写操作
3. 存放数据都是java对象,避免了不必要的对象序列化和反序列化(转换成字节流读/写入磁盘)
-- RDD narrow dependency:
one father RDD -> one child RDD. // map, filter
multiple father RDD -> one child RDD // when join
-- RDD wide dependency: <shuffle operation> 如果子分区出现问题,想修复困难,因为要从很多父分区恢复
one father -> multiple child RDD. // groupByKey
-- RDD Stage 的划分
在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD放入到Stage中,将窄依赖尽量划分到同一个Stage中,实现流水线计算(Mapper/Reducer之间不用互相等)
-- RDD 运行过程
1. 创建RDD对象
2. SparkContext 负责计算RDD之间的依赖关系(分别宽依赖和窄依赖),构建DAG
3. DAG scheduler负责把DAG图分解成多个stage,每个stage中包含多个Task,每个Task会被Task scheduler分发给各个WorkerNode上的executor去执行
-- RDD 创建
1. val lines = sc.textFile("file:///usr/local/spark/word.txt") 本地文件系统
val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val lines = sc.textFile("/user/hadoop/word.txt")
2. 通过并行集合(数组)创建RDD
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
-- RDD 编程
1. filter()
val lines = sc.textFile("file///...")
lines.filter(line=>line.contains("Spark")).count() // count 计算最终RDD里的元素数目
2. map() & reduce()
val lines = sc.textFile("file:///...")
lines.map(line => line.split(" ").size).reduce((a,b)=> if (a>b) a else b)
-- Rdd 持久化
via using persist() 对一个RDD标记为持久化,当遇到第一个行动操作时才会真正的帮你做缓存
persisit(MEMORY_ONLY) === cache()
persisit(MEMORY_AND_DISK)
unpersist() 不需要持久化的时候可以删掉
-- Rdd 分区
原则是尽量等于集群中CPU(core)的数目
spark.default.parallelism
// 手动指定分区数量
sc.textFile(path,partitionNum)
sc.parallize(array,2) 2个分区
.repartition(N) N is the parition number, when data became less
-- RDD 打印
1. 如果在单机情况下,可以直接使用rdd.foreach(println)
2. 如果是集群模式,需要看所有结果,需要用rdd.collect().foreach(println)
因为集群模式数据会在不同worker node上,所以先要收集worker node上所有RDD元素到Driver
3. collect()有时候可能会导致内存溢出,这时候可以用rdd.take(100).foreach(println)
-- Pair RDD 创建
1. 从文件中加载 例子:word count
2. 通过并行集合(数组)创建RDD 例子:list -> parallelize -> map
-- 常用的RDD转换操作
1. reduceByKey(func)
example: pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
a,b 都是数字,假设他们对应相同的Key,则会加起来
2. groupByKey()
example: pairRDD.groupByKey() => RDD[String,Iterable[Int]]
("spark",1),("spark",2),("java",3),("java",5) => ("spark",(1,2)) and ("java",(3,5))
example:
val words = Array("one","two","two","three","three","three")
val wordPairsRDD = sc.parallelize(words).map(word=>(word,1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t=>(t._1,t._2.sum))
上面二者区别:
当用reduceByKey(), Spark可以在每个分区移动数据之前将待输出数据与一个共用的Key结合,减少通信开销。
当用groupByKey(), 因为其不接收函数,Spark只能先将所有键值对都移动,结果是集群间开销很大,导致传输延迟
-- RDD keys, values, sortByKey()
# 只取键值形成一个新的RDD
pairRDD.keys
# 只取value形成一个新的RDD
pairRDD.values
# 排序,默认为生序排列
pairRDD.sortByKey().foreach(println)
pairRDD.reduceByKey(_+_).sortByKey(false).collect
pairRDD.reduceByKey(_+_).sortBy(_._2,false).collect 根据值降序
pairRDD.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1)).foreach(println)
-- RDD mapValues(func) & join: (K,V1) and (K,V2) => (K,(V1,V2))
pairRDD.mapValues(x=>x+1).foreach(println)
val pairRDD1 = sc.parallelize(Array(("spark,1"),("spark",2),("hadoop",3),("hadoop",5)))
val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD1.join(pairRDD2) => (spark,(1,fast))
(spark,(2,fast))
-- RDD 求平均实例
val rdd = sc.parallelize(Array(("spark,2"),("spark",6),("hadoop",6),("hadoop",4)))
## rdd
("spark,2") ("spark",6) ("hadoop",6) ("hadoop",4)
rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()
# rdd after mapValues
("spark",(2,1)) ("spark",(6,1)) ("hadoop",(6,1)) ("hadoop",(4,1))
# rdd after reduceByKey
("spark",(8,2)) ("hadoop",(10,2))
# rdd after second mapValues
("spark",4) ("hadoop",5)
-- RDD shared variable
减少数据传输量的问题,在executor内部,所有core(线程)共享这个变量。
Broadcast variable (缓存一个只读变量): 在所有节点的内存间进行共享.Spark行动操作会跨越多个stage,对于每个stage内的所有任务
所需要的公共数据,spark都会自动进行广播,广播到executor上。
example: val broadcastVar = sc.broadcast(Array(1,2,3))
get value: broadcastVar.value
accumulator: 不同节点间进行累加运算,计数求和。
# 数值型累加器,如下两种,用add方法来把数值累加到累加器上。当数据量很大的时候,会在不同节点上进行累加操作,最后由driver读取结果。
SparkContext.longAccumulator()
SparkContext.doubleAccumulator()
example: val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=> accum.add(x))
accum.value