Spark中的常用操作

Spark的重要性就不赘述了,这里重点介绍spark中的常用api(scala)

##1. 环境准备 从spark官网下载一个bin版本,比如我这里下载的是 spark-1.1.0-bin-hadoop2.3,因为练习,所以版本什么的不是那么重要。

##2. 前期知识

a. 需要了解基本的scala语法,可在scala的官网(http://www.scala-lang.org)查阅。 不过,如果只是为了了解这些api干嘛用,倒是不用在意这些语法,看懂就行。

b. spark的核心rdd

RDD => Resilient Distributed Data Sets 持久化分布式数据集。听起来很酷,到底是什么意思呢?

一个比较靠谱点的解释是这样的:

RDD本质上是一个只读的数据结构。一个RDD可以包含多个分区,每个分区就是一个dataset片段。RDD可以相互依赖。如果RDD的每个分区最多只能被一个child RDD 分区使用,称为narrow dependency。不同的操作依赖其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency, 而reduceByKey操作则产生shuffle(wide) dependency。

narrow dependency可以支持在同一个cluster node上以pipeline形式执行多条命令。

narrow dependency的一个例子是map操作,union也是。

shuffle dependency的例子是groupBy, join等

也许我们从一个实际会碰到的问题来思考它会更容易一点。 考虑这样一种场景,我需要分析文件的内容(比如一个日志文件),最简单的思路就是读取文件。开始的时候,一切正常,但是随着文件内容的增大,需要的内存越来越多。 于是,我可以有以下选择: 选择1,换个更牛B的机器 选择2,把日志切开,切到我能把文件都读取到内存当中。

选择1在某些时候是解决问题的最快途径,但是跟技术无关。

选择2,如果使用单个机器来处理,就是操作系统里的常用策略,比如向磁盘写数据,通常是一块块来写的。如果为了让速度快一些,使用多个机器来处理,就是分布式。

好,我们继续回归主题。刚才上面提到了切分,那么,怎么切,使用怎样的粒度来切呢? 这就是Spark的RDD帮我们做的事情了。通常情况下,切到最细的粒度,分配给多个节点。

有了这一层抽象后,上层(使用者)就可以不关心底层是什么样的。我可以一次读入非常大的文件,而不用关心内存是否够用。

##3. spark在日志处理中的用法

// 让文件进入spark(实际上并未开始执行)
val lines = spark.􏰔􏰌􏰱􏰔􏰢􏰓􏰊􏰌􏰑􏰽􏰕􏰋􏰧􏰗􏰶􏰾􏰾􏰳􏰳􏰳􏰿􏰙textFile("hdfs://...")

// 􏰊􏰓􏰍􏰌􏰗􏰳􏰧􏰓􏰊􏰔􏰌􏰏􏰑􏱀􏰳􏰗􏰔􏰎􏰏􏰔􏰗􏰴􏰓􏰔􏰕􏰑􏰽􏰄􏰇􏰇􏰫􏰇􏰿􏰙􏰙过滤所有以Error开头的行(也没有真正执行)
val errors = lines.filter(_.startsWith("ERROR"))

// 取得ERROR 后面的正文(依然没有开始执行)
val messages = errors.map(_.split('\t')(2))

// 总算执行了
messages.filter(_.contains("mysql")).count()

4. spark的常用操作

union 联合操作,和RDD的方法++一致,操作基本和List的加法是一样的。 a union b 基本等同 a.collect.toList + b.collect.toList

val rdd1 = sc.parallize(List((‘a’,1), (‘b’,2))) val rdd2 = sc.parallize(List((‘a’,1), (‘a’,3))) val result = rdd1 union rdd2 result collect Array[(Char, Int)] = Array((a,1), (b,2), (a,1), (a,3))

Published: June 09 2015

blog comments powered by Disqus