Spark 是一个依托于 Hadoop 生态的分布式内存计算框架,在吸收了 Hadoop MapReduce 优点的基础上提出以 RDD 数据表示模型,将中间数据放到内存,用于迭代运算,适用于实时计算,交互式计算场景。

什么是 Spark

简单的讲是一个通用计算引擎。

  • A fast and general engine for large-scale data processing
  • An open source implementation of Resilient Distributed Datasets (RDD)
  • Support cyclic data flow and in-memory computing

快速入门

性能对比

内存中比 Hadoop MapReduce 快 100 倍,磁盘中快 10 倍。

几个名词

  • MapReduce 分布式数据处理模式和执行环境,运行于大型商用机集群
  • HDFS 分布式文件系统
  • HBase 分布式,列存储数据库,HBase 使用 HDFS 作为底层存储,同时支持 MapReduce 批量计算和点查询
  • Zookeeper 分布式、高可用协调服务,提供分布式锁之类的基本服务用于构建分布式应用
  • Hive 分布式数据仓库,Hive 管理 HDFS 中存储的数据,并提供基于 SQL 的查询语言(运行时翻译为 MapReduce 作业)用以查询数据

基本组件

Spark 是一个用来实现快速而通用的集群计算的平台。

  • Spark core 主要功能 RDD 相关 API
  • Spark SQL Spark 用来操作结构化数据的程序包
  • Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件
  • MLlib Spark 中用来进行机器学习和数学建模的软件包
  • GraphX Spark 中进行图计算的函数库
  • Cluster Managers 管理集群节点的平台,包括 YARN,Mesos 和 Standalone Scheduler

RDD 弹性分布式数据集

RDD 是 Spark 的核心概念,具有容错机制可以被并行操作的元素集合。RDD 可以通过并行化(parallelizing) 一个存在于 driver program 中的集合,或者引用外部存储系统上的数据集,比如共享文件系统,HDFS,HBase 或者任何提供了 Hadoop InputFormat 的数据源。

RDDs can be roughly viewed as partitioned, locality aware distributed vectors

基本使用

通过[官方快速入门文档][1],这个文档主要通过本地 shell 运行一些例子,通过例子可以初步的学习怎么使用 Spark,用 Spark 的思维方式思考。

看过基本的 Spark 之后可以进一步了解 [RDD 官方指南][2],

本机运行

Spark 本身是用 Scala 写的,运行在 Java 虚拟机 (JVM) 上。要在你的电脑或集群上运行 Spark, 你要做的准备工作只是安装 Java 6 或者更新的版本。如果你希望使用 Python 接口,你还需要一个 Python 解释器 (2.6 以上版本)。

每一个 Spark 应用都由一个驱动器(driver program)来发起集群上的各种并行操作。driver program 包含应用 main 函数,定义集群上的分布式数据集。driver program 通过 SparkContext 对象访问 Spark,这个对象代表对计算集群的一个连接。

shell 中已经默认创建了一个 SparkContext 对象 sc,SparkContext 可以用来创建 RDD。driver program 通常要管理多个执行器 (executor)。

实例

Scala 版本

val sparkConf = new SparkConf().setAppName("wordcount")
val sc = new SparkContext(sparkConf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val wordsCount = words.map(x=>(x,1))
val counts = wordsCount.reduceByKey(_ + _)
val result = counts.collect()

Fluent Style:

val result = sc.textFile("hdfs://...")
               .flatMap(_.split(" "))
               .map(x=>(x,1))
               .reduceByKey(_ + _)
               .collect()

RDD

Spark 中的 RDD 是一个不可变的分布式对象集合,每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

RDD 支持两种操作:转化(transforming)和动作(action)。转化操作会由一个 RDD 生成一个新 RDD,比如 map(), filter() 等:

linesWithJava = lines.filter(lambda line: "Java" in line)

动作操作(action)会对 RDD 计算结果,并将结果返回到 driver program 中,或者把结果存储到外部系统(HDFS 等)中,比如 first(), count() 等:

linesWithJava.first()

创建 RDD

Spark 提供如下方式创建 RDD:

  • 读取外部数据集
  • driver program 中对集合并行化

最简单的方式就是将集合传给 SparkContext 的 parallilize() 方法,这种方法需要将整个数据集存放到一台机器中,所以只适合学习使用。或者使用 textFile() 来从外部存储中读取。

RDD 操作

filter() 操作不会改变已有 inputRDD 数据,会返回一个全新的 RDD。

RDD transforming 操作都是惰性求值,被调用 action 操作之前 Spark 不会开始计算。不应当将 RDD 看做存放特定数据的数据集,最好把 RDD 当作通过转化操作构建出来的,记录如何计算数据的指令列表。把数据读取到 RDD 同样也是惰性的。当调用 sc.textFile() 时,数据并没有真正的被读取。

针对集合中每个元素的操作

最常用的就是 filter 和 map。filter 接受一个函数,将 RDD 中满足该函数的值放入新的 RDD,map 接受一个函数,函数作用于每一个元素,函数返回值作为结果。

如果希望对每一个输入的元素输出多个元素,该功能操作叫做 flatMap(),提供给 flatMap() 的函数分别应用到输入的每一个元素,返回的是一个返回值序列的迭代器。输出的 RDD 不是由迭代器组成,而是一个包含各个迭代器可访问的所有元素的 RDD。 flatMap 最简单的用途,将输入的字符串切分为单词。

Scala

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // 返回"hello"

flatMap 可以看作将返回的迭代器“压扁”,得到一个由各个列表中元素组成的 RDD,而不是一个由列表组成的 RDD。

伪集合操作

RDD 本身不是严格意义上的集合,但也支持许多数学意义的集合操作,比如合并和相交。需要注意的是 RDD 具有相同的数据类型。

  • rdd.distinct() 转化为唯一元素的 RDD
  • rdd.union(rdd2) 并集
  • rdd.intersection(rdd2) 交集
  • rdd.subtract(rdd2) rdd-rdd2
  • rdd.cartesian(rdd2) 笛卡尔积

action 操作

常见的 reduce,接收一个函数,函数操作两个元素并返回一个同样类型的元素。

fold() 和 reduce() 类似,接收一个与 reduce() 相同签名的函数,加上一个初始值来作为每一个分区第一次调用的结果。

aggregate() 函数不要求返回值类型必须和输入的 RDD 类型相同。比如用 aggregate 来计算 RDD 的平均值

Scala:

val result = input.aggregate((0, 0))(
    (acc, value) => (acc._1 + value, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val avg = result._1 / result._2.toDouble

collect() 可以将数据返回到 driver program 中,collect() 需要将数据复制到 driver program 所以要求所有数据都必须能一同放到单台机器内存中。

take(n) 返回 RDD 中 n 个元素,并且尝试只访问尽量少的分区,会得到一个不均衡的集合。

如果数据定义了顺序,可以使用 top() 从 RDD 中获取前几个元素,top 会使用默认的顺序,也可以提供比较函数。

takeSample(withReplacement, num, seed) 函数可以从数据中获取采样,并指定是否替换。

foreach() 操作对 RDD 中每个元素进行操作,而不需要把 RDD 发回本地。

函数名 目的 示例 结果
collect() 返回 rdd 中的所有元素 rdd.collect() {1, 2, 3, 3}
count() rdd 中的元素个数 rdd.count() 4
countByValue() 各元素在 rdd 中出现的次数 rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) 从 rdd 中返回 num 个元素 rdd.take(2) {1, 2}
top(num) 从 rdd 中返回最前面的 num 个元素 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 从 rdd 中按照提供的顺序返回最前面的 num 个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSample(withReplacement, num, [seed]) 从 rdd 中返回任意一些元素 rdd.takeSample(false, 1) 非确定
reduce(func) 并行整合 rdd 中所有数据 rdd.reduce((x, y) => x + y) 9
fold(zero)(func) 和 reduce() 一 样,但是需要提供初始值 rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) 和 reduce() 相似,但是通常返回不同类型的函数 rdd.aggregate((0, 0))((x, y) =>(x._1 + y, x._2 + 1),(x, y) => (x._1 + y._1, x._2 + y._2)) (9,4)
foreach(func) 对 rdd 中的每个元素使用给定的函数 rdd.foreach(func)

Spark SQL

Spark SQL 用来操作结构化和半结构化的数据,Spark SQL 提供:

  • 从结构化数据源(JSON,Hive,Parquet) 中读取数据
  • 不仅支持 Spark 内部使用 SQL 查询,也支持从外部标准 JDBC/ODBC 连接中进行查询
  • Spark SQL 支持 SQL 与常规 Python/Java/Scala 整合,包括连接 RDD 和 SQL 表、公开的自定义 SQL 函数接口等等

Spark SQL 提供特殊的 RDD,SchemaRDD,存放 Row 对象 RDD,每个 Row 对象代表一行记录。

Spark Stream

很多应用需要实时计算,比如可能有些应用需要实时追踪 page view, 然后将数据给 machine learning model 训练,来自动检测异常。Spark Streaming 就是提供这样的功能。

Spark 建立在 RDD 基础上, Spark Streaming 提供了 DStreams 抽象,叫做 discretized streams。DStreams 是一个不断输入的序列数据。在内部,每一个 DStreams 都是时间序列的 RDD,离散的数据。

Spark 运行方式

  • 单机运行
  • 伪分布式运行
  • 分布式运行

在集群中运行 Spark

Spark 可以在各种各样的集群管理器(Hadoop YRAN,Apache Mesos )中运行。分布式环境下,Spark 集群采用主从结构,一个节点负责中央协调,中央协调节点被称为驱动器 (driver)节点,工作节点被称为执行器(executor)节点,驱动器节点可以和大量执行器节点进行通信,作为独立 Java 进程运行。驱动器节点和执行节点一起被称为一个 Spark 应用(application)。

驱动器节点

驱动器是执行程序 main() 方法的进程。它执行用户编写的 SparkContext,创建 RDD,RDD 转化和行动操作的代码。

驱动器程序在 Spark 下职责:

  • 把用户程序转为任务,Spark 把逻辑转化为步骤(stage),每个步骤由多个任务组成,任务被打包送到集群,任务是 Spark 中最小的工作单位
  • 为执行器节点调度任务,Spark 驱动器程序吧任务基于数据所在位置分配给合适的执行器进程,驱动器进程会跟踪缓存数据位置,利用这些位置信息来调度任务,减少数据的网络传输

执行器节点

执行器负责 Spark 作业运行任务,任务相互独立。执行器进程的作用:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
  • 通过自身的块管理器(block Manager)为用户程序中要求缓存的 RDD 提供内存存储

spark-submit 提供工具

Spark 为各种集群管理器提供了统一的工具来提交作业,这个工具是 spark-submit

bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py
  • --master 标记指定要连接的集群 URL,spark:// 表示集群使用独立模式
  • --executor-memory 执行器进程使用的内存量,以字节为单位

Java 和 Scala 可以通过 spark-submit –jars 选项来提交独立的 JAR 包,因为通常都有非常复杂的依赖树,手动维护和提交全部的依赖太过麻烦,常规的做法是通过构建工具,生成单个大 JAR 包,包含应用所有的传递依赖。这个通常被称为超级 JAR(uber)或者组合 JAR(assembly)。Java 和 Scala 最广泛的构建工具是 Maven 和 sbt。

reference