Spark是一个分布式计算系统/组件/平台,这是都知道的,其用Scala实现Spark任务也是最原生的,但万万不能认为只要是在Spark环境下执行的Scala代码都是分布式执行的,这是大错特错的,一开始一直有错误的认识,但现在想想,如果拿JavaHadoop的关系来作对比,其就很容易理解了。

思维纠正

  • Java&Hadoop的关系
    • Java是独立的语言,Hadoop本身由Java实现,可以由Java调用;
    • Java编写的一般代码不能够分布式执行,缺少计算模型的支持;
    • Java调用Hadoop实现的具体类方法(如MapperReducer)实现的代码可以在Hadoop之上分布式执行;

同理,

  • Scala&Spark的关系
    • Scala是独立的语言,Spark本身由Scala实现,可以由Scala调用;
    • Scala编写的一般代码不能够分布式执行,缺少计算模型的支持;
    • Scala调用Spark实现的具体类方法(如Pregel)实现的代码可以在Spark之上分布式执行;

另外值得注意的是,SparkRDDTransformAction操作也都可以分布式执行,这里可以理解为RDD内部的各种算子操作都是基于分布式设计的。除此之外的诸如使用scala基本数据类型实现的代码,都是不能分布式执行的(sacla本身的不可变特性和能不能分布式执行没有关系)。

纠错场景

文件的读写

如果调用java.util.File来进行文件写入,Local模式自然是没有问题,但是集群分布式运行时,必须先执行collect操作来取回数据到本地,这就造成一个问题,假如在100个节点的集群中执行任务,现在要将文件写入到Linux文件系统,这本身就很搞笑,这样做的后果是,写操作在某个节点上被触发,全部数据都被收集到这个节点,然后此Worker将数据写入到本地,注意,这里的本地就是该Worker所在的节点,如果使用者要查看结果,那么他必须去到该节点的文件系统中查看。

上述就是为什么Spark运行时要将输出写入hdfs的原因,对于hdfs来说,其对于使用者来说就变成了一个存储环境,使用者无需关心数据具体哪部分存在哪个节点上。

所以,对于有写文件操作的代码在提交分布式执行时,切记检查是否调用的java.util.File.

对象的遍历

这是最具迷惑性的部分,一开始写Spark代码时可能会在其中充斥着ListMap等等操作对象,更有甚者甚至引用java.util.List,并且希望在循环中对其进行更新,这在本地模式时显然也是正确的,但是其显然也不是分布式执行的代码。

那么,如果我就想维护一个大的K,V结构,并且想分布式执行地更新,那应该怎么做,答案是使用RDD,当然可以使用之前说过的IndexedRDD,这都是RDD做的封装,可能用起来非常别扭,比如由于不可变性,必须每次迭代都重新创建新的RDD等。

正确的分布式执行代码

到底什么才是正确的正规的分布式执行代码呢,其实一句话就可以概括,那就是全部逻辑都用RDD操作实现,即如果有个单机串行算法要分布式并行化,如果目标是在Spark上运行,那么最好的方式就是将原算法中的全部逻辑用RDD的操作来实现。

比如,原算法要求度分布,串行版本的程序一般需要遍历一遍数据集并且维护多个集合来完成,那么在这里,只需要句行代码:

1
2
val maxd = rdd.map( x => { (x._2,1) } )
.reduceByKey( (a,b) => a+b ).sortBy(x => x._1)

mapreduceByKeysortBy都是RDD的操作,具体特性以后在细说,这里的实现由于是建立在RDD之上,所以其可以被分布式执行,即原数据量巨大时,其内部实现会令其分发到多个节点的worker进行计算,计算完毕后的结果仍然存储在一个分布式内存数据集RDD中。

后续会继续对RDD的各种操作进行分析。