网站建设的收获体会网站制作案例怎么样
Spark 架构
Spark 是一个基于内存计算的大数据处理框架,相比 Hadoop 的 MapReduce,它能够提供 更高效的迭代计算 和 流式计算能力。Spark 采用 主从架构(Master-Slave),主要包括 Driver、Cluster Manager、Worker、Executor 和 Task 等核心组件。
1. Spark 组件架构
1.1 核心组件
| 组件 | 作用 | 
|---|---|
| Driver(驱动程序) | 负责任务调度,向 Cluster Manager 申请资源,管理 SparkContext。 | 
| Cluster Manager(集群管理器) | 负责资源调度,如 Standalone、YARN、Mesos、Kubernetes。 | 
| Worker(工作节点) | 运行在集群节点上,管理 Executor 进程,执行具体计算任务。 | 
| Executor(执行器) | 由 Worker 启动,执行 Spark 任务,并存储中间计算数据。 | 
| Task(任务) | 运行在 Executor 之上,每个 Stage 被划分为多个 Task 并行执行。 | 
1.2 Spark 架构示意图
+------------------------------------------------------+
|                        Driver                        |
|  - 任务调度                                           |
|  - 运行 SparkContext                                |
|  - 将 Job 划分为多个 Stage                           |
+------------------------------------------------------+| 向集群管理器申请资源V
+------------------------------------------------------+
|                  Cluster Manager                     |
|  - 资源调度                                          |
|  - 可选:Standalone / YARN / Mesos / Kubernetes      |
+------------------------------------------------------+| 分配 Worker 节点V
+----------------+      +----------------+      +----------------+
|    Worker 1    |      |    Worker 2    |      |    Worker 3    |
|  - 启动 Executor  |      |  - 启动 Executor  |      |  - 启动 Executor  |
|  - 执行 Task    |      |  - 执行 Task    |      |  - 执行 Task    |
+----------------+      +----------------+      +----------------+
 
2. Spark 运行模式
Spark 可以运行在不同的集群管理器上:
- Standalone:Spark 自带的资源管理器,简单易用,适合小规模集群。
 - YARN(Hadoop Yarn 集群):适合 Hadoop 生态环境。
 - Mesos(Apache Mesos 集群):适合多租户资源调度。
 - Kubernetes(K8s 集群):适用于云计算和容器化部署。
 
3. Spark 任务执行流程
Spark 任务的执行大致分为以下几个步骤:
3.1 任务提交
- Driver 进程启动 SparkContext,并向 Cluster Manager 申请资源。
 - Cluster Manager 分配 Worker 节点,并在 Worker 上 启动 Executor。
 
3.2 Job 分解
- Driver 将 Job 拆分为多个 Stage(基于 DAG 计算)。
 - 每个 Stage 由多个 Task 组成,并被分配到不同的 Executor 运行。
 
3.3 Task 执行
- Executor 执行 Task,计算数据并存储中间结果(RDD)。
 - Executor 向 Driver 汇报任务执行状态,若失败则重新调度 Task。
 
3.4 结果返回
- 任务执行完成后,Driver 收集最终计算结果,存储到 HDFS、Kafka、MySQL 等。
 
4. Spark 计算模型
Spark 计算任务是基于 RDD(Resilient Distributed Dataset) 和 DAG(有向无环图) 进行调度的。
4.1 RDD(弹性分布式数据集)
RDD 是 Spark 最核心的数据抽象,提供:
- 分区(Partition):数据被分成多个分区,并行计算。
 - 容错性:基于 Lineage(血缘) 记录转换关系,支持自动恢复。
 - 惰性计算:只有在 Action 触发时,RDD 才会真正执行计算。
 
RDD 转换类型:
- Transformation(转换):如 
map()、filter()、flatMap()(不会立即执行)。 - Action(行动):如 
count()、collect()、saveAsTextFile()(触发计算)。 
4.2 DAG(有向无环图)
- Spark 任务会构建 DAG(DAGScheduler),将 RDD 之间的依赖关系转换为多个 Stage。
 - 每个 Stage 由 多个 Task 组成,并行执行计算任务。
 
示例:
val data = sc.textFile("hdfs://input.txt")  // RDD1
val words = data.flatMap(_.split(" "))      // RDD2(Transformation)
val wordCount = words.map((_, 1))           // RDD3(Transformation)
val result = wordCount.reduceByKey(_ + _)   // RDD4(Transformation)
result.saveAsTextFile("hdfs://output.txt")  // Action 触发计算
 
Spark 内部执行过程:
- DAG 构建阶段: 
- RDD1 -> RDD2 -> RDD3 -> RDD4
 
 - Stage 划分阶段: 
flatMap()和map()形成 Stage 1reduceByKey()形成 Stage 2
 - Task 并行执行: 
- 每个 Stage 划分多个 Task,并分发到 Executor 执行。
 
 
5. Spark 生态组件
Spark 具备丰富的生态系统,适用于不同场景:
| 组件 | 作用 | 
|---|---|
| Spark Core | RDD API,DAG 调度,任务执行。 | 
| Spark SQL | 运行 SQL 查询,支持 DataFrame、Dataset API。 | 
| Spark Streaming | 实时流处理,支持 Kafka、Flume 等数据源。 | 
| MLlib | 机器学习库,支持 K-Means、决策树等算法。 | 
| GraphX | 图计算引擎,支持 PageRank、社区检测等。 | 
6. Spark 与 Hadoop 对比
| 对比项 | Spark | Hadoop(MapReduce) | 
|---|---|---|
| 计算模型 | RDD 内存计算 | 磁盘读写 | 
| 速度 | 高速,适用于流计算 | 慢,适用于批处理 | 
| 容错机制 | RDD 通过 Lineage 恢复 | 任务失败后重跑 | 
| 适用场景 | 实时计算、流处理 | 批处理、大规模数据存储 | 
7. 适用场景
- 数据分析(数据挖掘、数据清洗)
 - 实时流计算(结合 Kafka 实现流式数据处理)
 - 机器学习(推荐系统、分类预测)
 - 图计算(社交网络分析、PageRank)
 
总结
Spark 采用 Driver + Executor 的分布式架构,基于 RDD 进行数据计算,通过 DAG 调度任务,并支持 SQL、流式计算、机器学习 等多种应用场景。相较于 Hadoop,Spark 计算更快,适合 大数据分析、实时计算和 AI 训练。
Checkpoint
Spark 中的 Checkpoint 作用
Checkpoint(检查点) 主要用于 RDD 持久化和容错,可以将 RDD 的数据存储到**持久化存储(如 HDFS、S3)**中,以便在失败时快速恢复计算,避免从头计算整个 DAG。
1. 为什么需要 Checkpoint?
在 Spark 中,RDD 具有血缘关系(Lineage),Spark 通过血缘追踪来进行故障恢复。如果某个计算任务失败,Spark 会重新从原始数据集按照血缘关系重新计算。
 但是,在以下情况下,依赖血缘恢复可能导致 高额计算开销:
- RDD 计算链路太长:如果 RDD 经过多次 Transformation,失败后重新计算的开销会很大。
 - Driver 内存溢出:RDD 的血缘信息存储在 Driver 中,过长的 Lineage 可能会导致 Driver 负担过重,甚至 OOM。
 - 需要数据持久化:某些情况下(如流式计算),需要持久化部分数据以便后续任务读取。
 
Checkpoint 可以 截断 RDD 血缘依赖,将计算结果持久化,避免重复计算,提高容错能力。
2. Checkpoint 的作用
(1) 提高容错能力
- 在 RDD 发生丢失时,不再依赖 Lineage 重新计算,而是直接从持久化存储中加载数据,提高恢复速度。
 
(2) 减少 DAG 依赖
- 通过 Checkpoint 截断 RDD 的血缘依赖,避免 DAG 过长,减少 Driver 负担。
 
(3) 持久化计算结果
- 适用于需要在不同任务中复用的 RDD,如流式计算(Spark Streaming)中的状态数据。
 
3. Checkpoint vs Cache
| Checkpoint | Cache / Persist | |
|---|---|---|
| 存储位置 | 持久化到HDFS / S3 / 本地磁盘 | 存储在Executor 的内存 / 磁盘 | 
| 数据存储方式 | 持久化后会丢弃 RDD 血缘信息 | 保留 RDD 血缘信息 | 
| 恢复方式 | 任务失败后直接从 Checkpoint 读取 | 任务失败后需要从头重新计算 | 
| 适用场景 | 长计算链路 / 流式计算 / 容错 | 短期数据复用 / 内存充足 | 
- Cache/Persist 适用于频繁访问数据,但不能容错,如果 Executor 挂掉,数据会丢失,需要重新计算。
 - Checkpoint 适用于长计算 DAG 或需要持久化数据的场景,但由于存储到 HDFS,速度较慢。
 
4. Checkpoint 使用方式
(1) 开启 Checkpoint
在使用 Checkpoint 之前,需要设置存储目录:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConfval conf = new SparkConf().setAppName("CheckpointExample").setMaster("local[*]")
val sc = new SparkContext(conf)// 1. 设置 Checkpoint 存储路径
sc.setCheckpointDir("hdfs://namenode:9000/spark-checkpoint")// 2. 创建 RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))// 3. 设置 Checkpoint
rdd.checkpoint()// 4. 触发计算
rdd.count()
 
sc.setCheckpointDir(path)设置 Checkpoint 目录(必须是 HDFS、S3 或本地持久化存储)。rdd.checkpoint()标记 RDD 需要 Checkpoint。- 由于 Checkpoint 是惰性执行的,必须在 
Action(如count()、collect())时触发计算并存储。 
(2) 与 Cache 结合使用
由于 Checkpoint 计算会重新执行整个 DAG,可以先 cache(),然后 checkpoint(),避免重复计算:
val rdd = sc.textFile("hdfs://namenode:9000/data.txt").map(_.split(" "))rdd.cache()  // 缓存 RDD 避免重复计算
rdd.checkpoint()  // 持久化数据rdd.count()  // 触发计算
 
cache()先把数据缓存到内存,避免在 checkpoint 时重复计算。
5. Checkpoint 在 Spark Streaming 中的应用
在 Spark Streaming 中,Checkpoint 用于存储 Streaming 计算状态,保证数据处理的容错性,防止任务重启后状态丢失。
(1) 设置 Checkpoint 目录
import org.apache.spark.streaming.{Seconds, StreamingContext}// 创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))// 设置 Checkpoint 目录
ssc.checkpoint("hdfs://namenode:9000/streaming-checkpoint")// 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)// 启动流式计算
ssc.start()
ssc.awaitTermination()
 
ssc.checkpoint(path)设置 Checkpoint 目录,用于存储流式计算的状态数据(如窗口聚合数据)。- 适用于 窗口操作(window, updateStateByKey) 场景。
 
6. 总结
-  
Checkpoint 作用:
- 持久化 RDD,避免 DAG 过长导致计算性能下降。
 - 提高容错性,避免 Executor 挂掉时重算整个 DAG。
 - 适用于 Streaming 计算,存储流式数据状态。
 
 -  
使用方法:
- 先 
sc.setCheckpointDir()设置目录。 - 对 RDD 调用 
checkpoint()。 - 触发 Action(如 
count())来执行 checkpoint 计算。 
 - 先 
 -  
Checkpoint vs Cache
- Cache/Persist 适用于临时缓存,提高性能,但不具备容错能力。
 - Checkpoint 适用于长计算链路、流式计算,保证容错,但性能略慢。
 
 
🚀 最佳实践:
- 长时间运行的任务(如 Spark Streaming)必须开启 Checkpoint。
 - Checkpoint 和 Cache 结合使用,避免重复计算导致性能下降。
 
并行度
Apache Spark 是一个分布式并行计算框架,基于 RDD(弹性分布式数据集) 进行并行计算,并利用集群资源提高计算效率。
Spark 的计算模型遵循 MapReduce 的思想,但相比 Hadoop,Spark 采用 内存计算,并且支持更加细粒度的任务调度和优化,大大提升了计算性能。
Spark 的并行度(parallelism) 取决于以下几个因素:
- RDD 的分区数(Partitions)
 - Executor 的数量
 - CPU 核心数
 - 并行任务数(Task 并发数)
 
1. RDD 的分区数
在 Spark 中,RDD 是由多个 分区(Partitions) 组成的,每个分区可以在一个 Task 中独立计算,因此分区数决定了并行度。
- 默认情况下: 
sc.textFile(path)读取 HDFS 文件时,分区数 = HDFS block 数量(通常是 128MB 一个 block)。sc.parallelize(data, numSlices)允许手动指定分区数numSlices。
 
示例:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3) // 设置 3 个分区
println(rdd.partitions.length)  // 输出: 3
 
分区数越多,并行度越高,但过多的分区会导致 任务调度开销增加,降低整体效率。
2. Executor 并行度
Executor 是 Spark 任务的执行单元,每个 Executor 拥有多个 CPU 核心,可同时运行多个 Task。
- Executor 并行度计算方式:
[
并行度 = Executors 数量 \times 每个 Executor 的 CPU 核心数
] 
例如:
--num-executors 5  --executor-cores 4
 
表示:
- 5 个 Executors
 - 每个 Executor 4 核心
 - 最大并行 Task 数 = 5 × 4 = 20
 
3. 并行任务数(Task 并发数)
Spark 会按照RDD 分区数来决定 Task 数量,并由集群的可用资源(Executor 和 核心数)来决定同时能运行的 Task 数量。
并行任务数计算公式:
 [
 并行任务数 = min( RDD 分区数, 总 CPU 核心数 )
 ]
 例如:
- RDD 分区数 = 100
 - Spark 资源 = 10 Executors,每个 4 核心
 - 总可用核心数 = 10 × 4 = 40
 
并行度 = min(100, 40) = 40(同时执行 40 个 Task)
如何调整并行度?
- 增加 RDD 分区数: 
rdd.repartition(n)(增加或减少分区)rdd.coalesce(n)(减少分区,避免数据洗牌)
 - 增加 Executor 核心数 
--executor-cores N--num-executors M
 - 增加 Task 并发 
spark.default.parallelism(全局默认并行度)spark.sql.shuffle.partitions(SQL Shuffle 时的分区数)
示例:
 
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 10) // 增加分区数提高并行度
 
总结
| 影响因素 | 说明 | 影响并行度 | 
|---|---|---|
| RDD 分区数 | 任务并行度取决于分区数 | 分区数越多并行度越高 | 
| Executor 数量 | 任务运行的执行节点数量 | Executors 越多并行度越高 | 
| Executor 核心数 | 每个 Executor 可并行运行的 Task 数 | 核心数越多并行度越高 | 
| Task 并发数 | Task 调度和 CPU 资源影响并发 | Task 数量受 CPU 资源限制 | 
🔥 最佳实践:
- 大数据计算时,确保 RDD 分区数 ≥ 任务 CPU 核心数,以充分利用计算资源。
 - 避免单个 Task 计算过长,导致 CPU 资源利用率低下。
 - Spark SQL 计算时,适当调整 
spark.sql.shuffle.partitions(默认 200),减少 Shuffle 代价。 
🚀 结论:
 Spark 是 并行计算框架,并行度主要由 RDD 分区数、Executor 数量、CPU 核心数、任务调度 共同决定,合理调整参数可以优化计算性能。
