当前位置:首页 » 服务存储 » rdd默认存储在磁盘吗
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

rdd默认存储在磁盘吗

发布时间: 2022-04-21 13:52:50

⑴ 到底spark 缓存机制怎么用

Spark 中一个很重要的能力是将数据persisting持久化(或称为caching缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
RDD 可以使用persist()方法或cache()方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。
另外,每个持久化的 RDD 可以使用不同的storage level存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个StorageLevel对象 (Scala,Java,Python) 给persist()方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)

⑵ hadoop和spark的区别

1、解决问题的层面不一样

首先,Hadoop和Apache Spark两者都是大数据框架,但是各自存在的目的不尽相同。Hadoop实质上更多是一个分布式数据基础设施:它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,意味着您不需要购买和维护昂贵的服务器硬件。
同时,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。Spark,则是那么一个专门用来对那些分布式存储的大数据进行处理的工具,它并不会进行分布式数据的存储。

2、两者可合可分

Hadoop除了提供为大家所共识的HDFS分布式数据存储功能之外,还提供了叫做MapRece的数据处理功能。所以这里我们完全可以抛开Spark,使用Hadoop自身的MapRece来完成数据的处理。

相反,Spark也不是非要依附在Hadoop身上才能生存。但如上所述,毕竟它没有提供文件管理系统,所以,它必须和其他的分布式文件系统进行集成才能运作。这里我们可以选择Hadoop的HDFS,也可以选择其他的基于云的数据系统平台。但Spark默认来说还是被用在Hadoop上面的,毕竟,大家都认为它们的结合是最好的。

以下是从网上摘录的对MapRece的最简洁明了的解析:

  • 我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

  • 现在我们到一起,把所有人的统计数加在一起。这就是“Rece”。

3、Spark数据处理速度秒杀MapRece

Spark因为其处理数据的方式不一样,会比MapRece快上很多。MapRece是分步对数据进行处理的: ”从集群中读取数据,进行一次处理,将结果写到集群,从集群中读取更新后的数据,进行下一次的处理,将结果写到集群,等等…“ Booz Allen Hamilton的数据科学家Kirk Borne如此解析。
反观Spark,它会在内存中以接近“实时”的时间完成所有的数据分析:“从集群中读取数据,完成所有必须的分析处理,将结果写回集群,完成,” Born说道。Spark的批处理速度比MapRece快近10倍,内存中的数据分析速度则快近100倍。
如果需要处理的数据和结果需求大部分情况下是静态的,且你也有耐心等待批处理的完成的话,MapRece的处理方式也是完全可以接受的。
但如果你需要对流数据进行分析,比如那些来自于工厂的传感器收集回来的数据,又或者说你的应用是需要多重数据处理的,那么你也许更应该使用Spark进行处理。
大部分机器学习算法都是需要多重数据处理的。此外,通常会用到Spark的应用场景有以下方面:实时的市场活动,在线产品推荐,网络安全分析,机器日记监控等。

4、灾难恢复

两者的灾难恢复方式迥异,但是都很不错。因为Hadoop将每次处理后的数据都写入到磁盘上,所以其天生就能很有弹性的对系统错误进行处理。
Spark的数据对象存储在分布于数据集群中的叫做弹性分布式数据集(RDD: Resilient Distributed Dataset)中。这些数据对象既可以放在内存,也可以放在磁盘,所以RDD同样也可以提供完成的灾难恢复功能。

⑶ 如何理解spark中RDD和DataFrame的结构

RDD就是一个分布式的无序的列表。
RDD中可以存储任何的单机类型的数据,但是,直接使用RDD在字段需求明显时,存在算子难以复用的缺点。
例如,现在RDD存的数据是一个Person类型的数据,现在要求所有每个年龄段(10年一个年龄段)的人中最高的身高与最大的体重。
使用RDD接口,因为RDD不了解其中存储的数据的具体结构,数据的结构对它而言是黑盒,于是这就需要用户自己去写一个很特化的聚合的函数来完成这样的功能。
而有了DataFrame,则框架会去了解RDD中的数据是什么样的结构的,用户可以说清楚自己对每一列进行什么样的操作,这样就有可能可以实现一个算子,用在多个列上,比较容易进行算子的复用。甚至,未来又要同时求出每个年龄段内不同的姓氏有多少个,则使用RDD接口,之前的函数需要改动很大才能满足需求,而使用DataFrame接口,则只需要添加对这一个列的处理,原来的max/min的相关列处理都可保持不变。
总而言之,DataFrame相关接口就是RDD的一个扩展,让RDD了解了RDD中存储的数据包含哪些列,并可以在列上进行操作。

⑷ rdd的定义

RDD(Resilient Distributed Datasets)的定义是: 弹性分布式数据集, 是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join(转换不是程开发人员在RDD上执行的操作) 。
RDD不需要物化。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区 。
RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区,每个分区就是一个dataset片段。RDD可以相互依赖。如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency;若多个Child RDD分区都可以依赖,则称之为wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency,而join操作则产生wide dependency。

⑸ 如何学习Spark大数据

大数据技术,只有相互分享才能彼此共同进步,为什么我们的程序员经常活跃在各大博客和技术论坛?其主要原因是:程序员们并不拒绝分享,甚至是乐于去贡献代码。身为一个程序员,特别值得他们骄傲的事情就是自己写的代码被别人用到而带来的成就感。
今天,为我们分享了当今火爆的大数据技术,讲解了spark技术的核心,我们可以不从事数据分析行业,但国家的一些技术还是要了解的。
Spark核心概念Resilient Distributed Dataset (RDD)弹性分布数据集
RDD是Spark的基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark特别核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapRece大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
RDD的特点:
1、它是在集群节点上的不可变的、已分区的集合对象。
2、用并行转换的方式来创建如(map, filter, join, etc)。
3、失败自动重建。
4、可以控制存储级别(内存、磁盘等)来进行重用。
5、必须是可序列化的。
5、是静态类型的。
RDD的好处:
1、RDD只能从持久存储或经过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
2、RDD的不变性,可以实现类Hadoop MapRece的推测式执行。
3、RDD的数据分区特性,可以用数据的本地性来提高性能,这与Hadoop MapRece是一样的。
4、RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapRece。
RDD的存储与分区
1、用户可以选择不同的存储级别存储RDD以便重用。
2、当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
3、RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
1、分区列表(数据块列表)
2、计算每个分片的函数(根据父RDD计算出此RDD)
3、对父RDD的依赖列表
4、对key-value RDD的Partitioner【可选】
5、每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
大数据是互联网发展的方向,大数据人才是未来的高薪贵族。随着大数据人才的供不应求,大数据人才的薪资待遇也在不断提升。

⑹ 为什么说rdd是不变的数据结构存储

首选你要知道什么是RDD;
什么是RDD
RDD的全称是“弹性分布式数据集”(Resilient Distributed Dataset)。首先,它是一个数据集,就像Scala语言中的Array、List、Tuple、Set、Map也是数据集合一样,但从操作上看RDD最像Array和List,里面的数据都是平铺的,可以顺序遍历。而且Array、List对象拥有的许多操作RDD对象也有,比如flatMap、map、filter、rece、groupBy等。
其次,RDD是分布存储的。里面的成员被水平切割成小的数据块,分散在集群的多个节点上,便于对RDD里面的数据进行并行计算。
最后,RDD的分布是弹性的,不是固定不变的。RDD的一些操作可以被拆分成对各数据块直接计算,不涉及其他节点,比如map。这样的操作一般在数据块所在的节点上直接进行,不影响RDD的分布,除非某个节点故障需要转换到其他节点上。但是在有些操作中,只访问部分数据块是无法完成的,必须访问RDD的所有数据块。比如groupBy,在做groupBy之前完全不知道每个key的分布,必须遍历RDD的所有数据块,将具有相同key的元素汇聚在一起,这样RDD的分布就完全重组,而且数量也可能发生变化。此外,RDD的弹性还表现在高可靠性上。

⑺ scala 中rdd类型用什么头文件

1.RDD介绍:

RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化。

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,比如list或者set。

RDD的转化操作都是惰性求值的,这意味着我们对RDD调用转化操作,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操作也是惰性的,数据只会在必要时读取。转化操作和读取操作都有可能多次执行。

2.创建RDD数据集

(1)读取一个外部数据集

val input=sc.textFile(inputFileDir)

(2)分发对象集合,这里以list为例

val lines =sc.parallelize(List("hello world","this is a test"));

3.RDD操作

(1)转化操作

实现过滤器转化操作:

val lines =sc.parallelize(List("error:a","error:b","error:c","test"));
val errors=lines.filter(line => line.contains("error"));
errors.collect().foreach(println);

输出:

error:a

error:b

error:c

可见,列表list中包含词语error的表项都被正确的过滤出来了。

(2)合并操作

将两个RDD数据集合并为一个RDD数据集

接上述程序示例:

val lines =sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));
val errors=lines.filter(line => line.contains("error"));
val warnings =lines.filter(line => line.contains("warnings"));
val unionLines =errors.union(warnings);
unionLines.collect().foreach(println);

输出:

error:a

error:b

error:c

warning:a

可见,将原始列表项中的所有error项和warning项都过滤出来了。

(3)获取RDD数据集中的部分或者全部元素

①获取RDD数据集中的部分元素 .take(int num) 返回值List<T>

获取RDD数据集中的前num项。

/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]

程序示例:接上

unionLines.take(2).foreach(println);

输出:

error:a

error:b

可见,输出了RDD数据集unionLines的前2项

②获取RDD数据集中的全部元素 .collect() 返回值 List<T>

程序示例:

val all =unionLines.collect();
all.foreach(println);

遍历输出RDD数据集unionLines的每一项

4.向spark传递函数

在scala中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala的其他函数式API一样。我们还要考虑其他一些细节,必须所传递的函数及其引用的数据需要是可序列化的(实现了Java的Serializable接口)。除此之外,与Python类似,传递一个对象的方法或者字段时,会包含对整个对象的引用。我们可以把需要的字段放在一个局部变量中,来避免包含该字段的整个对象。

class searchFunctions (val query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchFunctionReference(rdd: RDD[String]) :RDD[String]={
//问题: isMach表示 this.isMatch ,因此我们需要传递整个this
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd: RDD[String]) :RDD[String] ={
//问题: query表示 this.query ,因此我们需要传递整个this
rdd.flatMap(line => line.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={
//安全,只把我们需要的字段拿出来放入局部变量之中
val query1=this.query;
rdd.flatMap(x =>x.split(query1)
)
}


}

5.针对每个元素的转化操作:

转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应的元素。关键词:转化

转化操作filter()接受一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。关键词:过滤

示例图如下所示:

RDD1.cartesian(RDD2)

返回两个RDD数据集的笛卡尔集

程序示例:生成RDD集合{1,2} 和{1,2}的笛卡尔集

val rdd1=sc.parallelize(List(1,2));
val rdd2=sc.parallelize(List(1,2));
val rdd=rdd1.cartesian(rdd2);
println(rdd.collect().mkString(" "));

输出:

(1,1)

(1,2)

(2,1)

(2,2)

7.行动操作

(1)rece操作

rece()接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。使用rece(),可以很方便地计算出RDD中所有元素的总和,元素的个数,以及其他类型的聚合操作。

以下是求RDD数据集所有元素和的程序示例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.rece((x,y) =>x+y);
println(results);

输出:55

(2)fold()操作

接收一个与rece()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素,也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或者拼接操作对应的空列表)。

程序实例:

①计算RDD数据集中所有元素的和:

zeroValue=0;//求和时,初始值为0。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(0)((x,y) =>x+y);
println(results);

②计算RDD数据集中所有元素的积:

zeroValue=1;//求积时,初始值为1。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(1)((x,y) =>x*y);
println(results);

(3)aggregate()操作

aggregate()函数返回值类型不必与所操作的RDD类型相同。

与fold()类似,使用aggregate()时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

以下是程序实例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val result=rdd.aggregate((0,0))(
(acc,value) =>(acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)
)
val average=result._1/result._2;
println(average)

输出:5

最终返回的是一个Tuple2<int,int>对象, 他被初始化为(0,0),当遇到一个int值时,将该int数的值加到Tuple2对象的_1中,并将_2值加1,如果遇到一个Tuple2对象时,将这个Tuple2的_1和_2的值归并到最终返回的Tuple2值中去。

表格:对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作

函数名 目的 示例 结果

collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}

count() RDD的元素个数 rdd.count() 4

countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),
(2,1),
(3,2)
}

take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}

top(num) 从RDD中返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}

takeOrdered(num)
(ordering) 从RDD中按照提供的顺序返回最前面的num个元素
rdd.takeSample(false,1) 非确定的

takeSample(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的

rece(func) 并行整合RDD中所有数据 rdd.rece((x,y) => x+y)
9

fold(zero)(func) 和rece()一样,但是需要提供初始值 rdd.fold(0)((x,y) => x+y)
9

aggregate(zeroValue)(seqOp,combOp) 和rece()相似,但是通常返回不同类型的函数 rdd.aggregate((0,0))
((x,y) =>
(x._1+y,x._2+1),
(x,y)=>
(x._1+y._1,x._2+y._2)
) (9,4)

foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func) 无

8.持久化缓存

因为Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。

为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。

出于不同的目的,我们可以为RDD选择不同的持久化级别。默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中

不同关键字对应的存储级别表

级别

使用的空间

cpu时间

是否在内存

是否在磁盘

备注

MEMORY_ONLY

直接储存在内存

MEMORY_ONLY_SER

序列化后储存在内存里

MEMORY_AND_DISK

中等

部分

部分

如果数据在内存中放不下,溢写在磁盘上

MEMORY_AND_DISK_SER

部分

部分

数据在内存中放不下,溢写在磁盘中。内存中存放序列化的数据。

DISK_ONLY

直接储存在硬盘里面

程序示例:将RDD数据集持久化在内存中。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);
println(rdd.count())
println(rdd.collect().mkString(","));

RDD还有unpersist()方法,调用该方法可以手动把持久化的RDD从缓存中移除。

9.不同的RDD类型

在scala中,将RDD转为由特定函数的RDD(比如在RDD[Double]上进行数值操作),是由隐式转换来自动处理的。这些隐式转换可以隐式地将一个RDD转为各种封装类,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunctions(键值对RDD),这样我们就有了诸如mean()和variance()之类的额外的函数。

示例程序:

val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));
println(rdd.mean());

其实RDD[T]中并没有mean()函数,只是隐式转换自动将其转换为DoubleRDDFunctions。

⑻ 如果中间输出RDD在内存放不下会怎么样

一般来讲,对于陌生的名词,大家的第一个反应都是“What is it?”.
RDD是Spark的核心内容,在Spark的官方文档中解释如下:RDD is a fault-tolerant collection of elements that can be operated on in parallel.由此可见,其中有两个关键词:fault-tolerant & in parallel.首先,容错性是RDD的一个重要特性;其次,它是并行计算的数据.
RDD的中文解释为:弹性分布式数据集,全称Resilient Distributed Datasets.宾语是dataset,即内存中的数据库.RDD 只读、可分区,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用.所谓弹性,是指内存不够时可以与磁盘进行交换.这涉及到了RDD的另一特性:内存计算,就是将数据保存到内存中.同时,为解决内存容量限制问题,Spark为我们提供了最大的自由度,所有数据均可由我们来进行cache的设置,包括是否cache和如何cache.
如果看到这里,你的思维里对RDD还是没有任何概念的话,或许可以参照我的形象化理RDD,就是一个被武装起来的数据集.
主体:a、由源数据分割而来,源码中对应splits变量;
武器有下:b、数据集体内包含了它本身的“血统”信息,即dependencies变量,存储着它的父RDD及两者关系;
c、计算函数,即其与父RDD的转化方式,对应源码中的iterator(split) & compute函数;
d、一些关于如何分块以及如何存放位置的元信息,eg:partitioner & preferredLocations.
有了这些武器,RDD的容错机制也就显而易见了.容错,顾名思义就是在存在故障的情况下,计算机系统仍能正常工作.容错通常有两种方式 checkpoint 和logging update ,RDD 采用的是 logging update .Checkpoint( 数据检查点)意味着要在各个机器间复制大数据,花费会很高,这种拷贝操作相当缓慢,而且会消耗大量的存储资源,因此deserted.Logging update( 记录更新),仅支持粗颗粒度变换,也就是说,仅记录在单个块上执行的单个操作,然后创建某个RDD的变换序列存储下来,数据丢失时,就可通过“血统”重新计算,恢复数据.Nevertheless,血缘链(变换序列)变得很长时,建议用户此时建立一些数据检查点加快容错速度.(saveAstextFile方法手动设置)

⑼ 6何为伯克利数据分析栈BDASMP3

所谓Spark是起源于美国加州大学伯克利分校AMPLab的大数据计算平台,在2011年开源,目前是Apache软件基金会的顶级项目。随着Spark在大数据计算领域的暂露头角,越来越多的企业开始关注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark竞赛中打破了由Hadoop MapRece保持的排序记录。Spark利用1/10的节点数,把100TB数据的排序时间从72分钟提高到了23分钟。
Spark在架构上包括内核部分和4个官方子模块
Spark SQL
Spark Streaming
机器学习库MLlib
图计算库GraphX

由Spark在伯克利的数据分析软件栈BDAS(Berkeley Data Analytics Stack)中的位置可见,Spark专注于数据的计算,而数据的存储在生产环境中往往还是由Hadoop分布式文件系统HDFS承担。

Spark在BDAS中的位置
Spark被设计成支持多场景的通用大数据计算平台,它可以解决大数据计算中的批处理,交互查询及流式计算等核心问题。Spark可以从多数据源的读取数据,并且拥有不断发展的机器学习库和图计算库供开发者使用。数据和计算在Spark内核及Spark的子模块中是打通的,这就意味着Spark内核和子模块之间成为一个整体。Spark的各个子模块以Spark内核为基础,进一步支持更多的计算场景,例如使用Spark SQL读入的数据可以作为机器学习库MLlib的输入。以下列举了一些在Spark平台上的计算场景。

Spark的应用场景举例

之前在大数据概述的课程中我们提到了Hadoop,大数据工程师都非常了解Hadoop MapRece一个最大的问题是在很多应用场景中速度非常慢,只适合离线的计算任务。这是由于MapRece需要将任务划分成map和rece两个阶段,map阶段产生的中间结果要写回磁盘,而在这两个阶段之间需要进行shuffle操作。Shuffle操作需要从网络中的各个节点进行数据拷贝,使其往往成为最为耗时的步骤,这也是Hadoop MapRece慢的根本原因之一,大量的时间耗费在网络磁盘IO中而不是用于计算。在一些特定的计算场景中,例如像逻辑回归这样的迭代式的计算,MapRece的弊端会显得更加明显。

那Spark是如果设计分布式计算的呢?首先我们需要理解Spark中最重要的概念--弹性分布数据集(Resilient Distributed Dataset),也就是RDD。

关键词:弹性分布数据集RDD

RDD是Spark中对数据和计算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可变的并能够被并行操作的数据集合。对RDD的操作分为两种transformation和action。Transformation操作是通过转换从一个或多个RDD生成新的RDD。Action操作是从RDD生成最后的计算结果。在Spark最新的版本中,提供丰富的transformation和action操作,比起MapRece计算模型中仅有的两种操作,会大大简化程序开发的难度。

RDD的生成方式只有两种,一是从数据源读入,另一种就是从其它RDD通过transformation操作转换。一个典型的Spark程序就是通过Spark上下文环境(SparkContext)生成一个或多个RDD,在这些RDD上通过一系列的transformation操作生成最终的RDD,最后通过调用最终RDD的action方法输出结果。
每个RDD都可以用下面5个特性来表示,其中后两个为可选的:
分片列表(数据块列表)
计算每个分片的函数
对父RDD的依赖列表
对key-value类型的RDD的分片器(Partitioner)(可选)
每个数据分片的预定义地址列表(如HDFS上的数据块的地址)(可选)
虽然Spark是基于内存的计算,但RDD不光可以存储在内存中,根据useDisk、useMemory、useOffHeap, deserialized、replication五个参数的组合Spark提供了12种存储级别,在后面介绍RDD的容错机制时,我们会进一步理解。值得注意的是当StorageLevel设置成OFF_HEAP时,RDD实际被保存到Tachyon中。Tachyon是一个基于内存的分布式文件系统,目前正在快速发展,在这里我们就不做详细介绍啦,可以通过其官方网站进一步了解。

DAG、Stage与任务的生成
Spark的计算发生在RDD的action操作,而对action之前的所有transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。

Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。举个例子,在下图中,从输入中逻辑上生成A和C两个RDD,经过一系列transformation操作,逻辑上生成了F,注意,我们说的是逻辑上,因为这时候计算没有发生,Spark内核做的事情只是记录了RDD的生成和依赖关系。当F要进行输出时,也就是F进行了action操作,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。

逻辑上的计算过程:DAG

有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage,这样可以将任务提交到计算节点进行真正的计算。Spark计算的中间结果默认是保存在内存中的,Spark在划分Stage的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提高计算的效率,而在这个过程中,主要的根据就是RDD的依赖类型。

根据不同的transformation操作,RDD的依赖可以分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency,在代码中为ShuffleDependency)两种类型。窄依赖指的是生成的RDD中每个partition只依赖于父RDD(s) 固定的partition。宽依赖指的是生成的RDD的每一个partition都依赖于父 RDD(s) 所有partition。窄依赖典型的操作有map, filter, union等,宽依赖典型的操作有groupByKey, sortByKey等。可以看到,宽依赖往往意味着shuffle操作,这也是Spark划分stage的主要边界。对于窄依赖,Spark会将其尽量划分在同一个stage中,因为它们可以进行流水线计算。

RDD的宽依赖和窄依赖

最后我们再通过下图来详细解释一下Spark中的Stage划分。我们从HDFS中读入数据生成3个不同的RDD,通过一系列transformation操作后再将计算结果保存回HDFS。可以看到这幅DAG中只有join操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage. 同时我们可以注意到,在图中Stage2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,通过map操作生成的partition可以不用等待整个RDD计算结束,而是继续进行union操作,这样大大提高了计算的效率。

Spark中的Stage划分