文章目录 零、本节学习目标 一、Spark的概述 (一)Spark的组件 1、Spark Core 2、Spark SQL 3、Spark Streaming 4、MLlib 5、Graph X 6、独立调度器、Yarn、Mesos (二)Spark的发展史 二、Spark的特点 (一)速度快 (二)易用性 (三)通用性 (四)兼容性 (五)代码简洁 1、采用MR实现词频统计 2、采用Spark实现词频统计 3、两种代码对比结论 三、Spark的应用场景 四、Spark与Hadoop的对比 (一)编程方式 (二)数据存储 (三)数据处理 (四)数据容错  
 
 
 零、本节学习目标  
了解什么是Spark计算框架 了解Spark计算框架的特点 了解Spark计算框架的应用场景 理解Spark框架与Hadoop框架的对比   
 一、Spark的概述  
 (一)Spark的组件  
Spark在2013年加入Apache孵化器项目,之后获得迅猛的发展,并于2014年正式成为Apache软件基金会的顶级项目。Spark生态系统已经发展成为一个可应用于大规模数据处理 的统一分析引擎,它是基于内存计算 的大数据并行计算框架,适用于各种各样的分布式平台的系统。在Spark生态圈中包含了Spark SQL、Spark Streaming、GraphX、MLlib等组件。       
 1、Spark Core  
Spark核心组件,实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含对弹性分布式数据集的API定义 。   
 2、Spark SQL  
用来操作结构化数据的核心组件,通过Spark SQL可直接查询Hive、HBase等多种外部数据源中的数据。Spark SQL的重要特点是能够统一处理关系表和RDD 。     
 3、Spark Streaming  
Spark提供的流式计算框架,支持高吞吐量、可容错处理的实时流式数据处理 ,其核心原理是将流数据分解成一系列短小的批处理作业 。     
 4、MLlib  
Spark提供的关于机器学习功能的算法程序库,包括分类、回归、聚类、协同过滤算法等,还提供了模型评估、数据导入等额外的功能。     
 5、Graph X  
Spark提供的分布式图处理框架,拥有对图计算和图挖掘算法的API接口及丰富的功能和运算符,便于对分布式图处理的需求,能在海量数据上运行复杂的图算法。     
 6、独立调度器、Yarn、Mesos  
集群管理器,负责Spark框架高效地在一个到数千个节点之间进行伸缩计算的资源管理。   
 (二)Spark的发展史  
 1、发展简史  
对于一个具有相当技术门槛与复杂度的平台,Spark从诞生到正式版本的成熟,经历的时间如此之短,让人感到惊诧。2009年,Spark诞生于伯克利大学AMPLab,最开初属于伯克利大学的研究性项目。它于2010年正式开源,并于2013年成为了Aparch基金项目,并于2014年成为Aparch基金的顶级项目,整个过程不到五年时间。     
 2、目前最新版本  
Spark目前最新版本是2023年2月17日发布的Spark3.3.2     
 二、Spark的特点  
Spark计算框架在处理数据时,所有的中间数据都保存在内存中,从而减少磁盘读写操作,提高框架计算效率。同时Spark还兼容HDFS、Hive,可以很好地与Hadoop系统融合,从而弥补MapReduce高延迟的性能缺点。所以说,Spark是一个更加快速、高效的大数据计算平台。 Spark官网上给出Spark的特点     
 (一)速度快  
与MapReduce相比,Spark可以支持包括Map和Reduce在内的更多操作,这些操作相互连接形成一个有向无环图(Directed Acyclic Graph,简称DAG),各个操作的中间数据则会被保存在内存中。因此处理速度比MapReduce更加快。Spark通过使用先进的DAG调度器、查询优化器和物理执行引擎,从而能够高性能的实现批处理和流数据处理。     
 (二)易用性  
Spark支持使用Scala、Python、Java及R语言快速编写应用。同时Spark提供超过80个高级运算符,使得编写并行应用程序变得容易并且可以在Scala、Python或R的交互模式下使用Spark。     
 (三)通用性  
Spark可以与SQL、Streaming及复杂的分析良好结合。Spark还有一系列的高级工具,包括Spark SQL、MLlib(机器学习库)、GraphX(图计算)和Spark Streaming,并且支持在一个应用中同时使用这些组件。     
 (四)兼容性  
用户可以使用Spark的独立集群模式运行Spark,也可以在EC2(亚马逊弹性计算云)、Hadoop YARN或者Apache Mesos上运行Spark。并且可以从HDFS、Cassandra、HBase、Hive、Tachyon和任何分布式文件系统读取数据。     
 (五)代码简洁  
参看【经典案例【词频统计】十一种实现方式】     
 1、采用MR实现词频统计  
编写词频统计映射器 - WordCountMapper   
package  net. hw. wc ; import  org. apache. hadoop. io.  IntWritable ; 
import  org. apache. hadoop. io.  LongWritable ; 
import  org. apache. hadoop. io.  Text ; 
import  org. apache. hadoop. mapreduce.  Mapper ; import  java. io.  IOException ; public  class  WordCountMapper  extends  Mapper < LongWritable ,  Text ,  Text ,  IntWritable >   { @Override protected  void  map ( LongWritable  key,  Text  value,  Context  context) throws  IOException ,  InterruptedException  { String  line =  value. toString ( ) ; String [ ]  words =  line. split ( " " ) ; for  ( int  i =  0 ;  i <  words. length;  i++ )  { context. write ( new  Text ( words[ i] ) ,  new  IntWritable ( 1 ) ) ; } } 
} 
  
编写词频统计归约器 - WordCountReducer   
package  net. hw. wc ; import  org. apache. hadoop. io.  IntWritable ; 
import  org. apache. hadoop. io.  Text ; 
import  org. apache. hadoop. mapreduce.  Reducer ; import  java. io.  IOException ; public  class  WordCountReducer  extends  Reducer < Text ,  IntWritable ,  Text ,  IntWritable >   { @Override protected  void  reduce ( Text  key,  Iterable < IntWritable >   values,  Context  context) throws  IOException ,  InterruptedException  { int  count =  0 ; for  ( IntWritable  value :  values)  { count +=  value. get ( ) ;  } context. write ( key,  new  IntWritable ( count) ) ; } 
} 
  
编写词频统计驱动器 - WordCountDriver   
package  net. hw. wc ; import  org. apache. hadoop. conf.  Configuration ; 
import  org. apache. hadoop. fs.  FSDataInputStream ; 
import  org. apache. hadoop. fs.  FileStatus ; 
import  org. apache. hadoop. fs.  FileSystem ; 
import  org. apache. hadoop. fs.  Path ; 
import  org. apache. hadoop. io.  IOUtils ; 
import  org. apache. hadoop. io.  IntWritable ; 
import  org. apache. hadoop. io.  Text ; 
import  org. apache. hadoop. mapreduce.  Job ; 
import  org. apache. hadoop. mapreduce. lib. input.  FileInputFormat ; 
import  org. apache. hadoop. mapreduce. lib. output.  FileOutputFormat ; import  java. net.  URI; public  class  WordCountDriver  { public  static  void  main ( String [ ]  args)  throws  Exception  { Configuration  conf =  new  Configuration ( ) ; conf. set ( "dfs.client.use.datanode.hostname" ,  "true" ) ; Job  job =  Job . getInstance ( conf) ; job. setJarByClass ( WordCountDriver . class ) ; job. setMapperClass ( WordCountMapper . class ) ; job. setMapOutputKeyClass ( Text . class ) ; job. setMapOutputValueClass ( IntWritable . class ) ; job. setReducerClass ( WordCountReducer . class ) ; job. setOutputKeyClass ( Text . class ) ; job. setOutputValueClass ( IntWritable . class ) ; String  uri =  "hdfs://master:9000" ; Path  inputPath =  new  Path ( uri +  "/word/input" ) ; Path  outputPath =  new  Path ( uri +  "/word/result" ) ; FileSystem  fs =  FileSystem . get ( new  URI ( uri) ,  conf) ; fs. delete ( outputPath,  true ) ; FileInputFormat . addInputPath ( job,  inputPath) ; FileOutputFormat . setOutputPath ( job,  outputPath) ; job. waitForCompletion ( true ) ; System . out. println ( "======统计结果======" ) ; FileStatus [ ]  fileStatuses =  fs. listStatus ( outputPath) ; for  ( int  i =  1 ;  i <  fileStatuses. length;  i++ )  { System . out. println ( fileStatuses[ i] . getPath ( ) ) ; FSDataInputStream  in =  fs. open ( fileStatuses[ i] . getPath ( ) ) ; IOUtils . copyBytes ( in,  System . out,  4096 ,  false ) ; } } 
} 
  
运行程序WordCountDriver,查看结果     
 2、采用Spark实现词频统计  
 
package  net. hw. spark. wc import  org. apache. spark.  { SparkConf,  SparkContext} object  WordCount { def  main( args:  Array[ String ] ) :  Unit  =  { val  conf =  new  SparkConf( ) . setMaster( "local" ) . setAppName( "wordcount" ) val  sc =  new  SparkContext( conf) val  rdd =  sc. textFile( "test.txt" ) . flatMap( _. split( " " ) ) . map( ( _,  1 ) ) . reduceByKey( _ +  _) rdd. foreach( println) rdd. saveAsTextFile( "result" ) } 
} 
  
启动程序,查看结果     
 3、两种代码对比结论  
大家可以看出,完成同样的词频统计任务,Spark代码比MapReduce代码简洁很多。   
 三、Spark的应用场景  
 (一)应用场景分类  
 1、数据科学  
数据工程师可以利用Spark进行数据分析与建模,由于Spark具有良好的易用性,数据工程师只需要具备一定的SQL语言基础、统计学、机器学习等方面的经验,以及使用Python、Matlab或者R语言的基础编程能力,就可以使用Spark进行上述工作。   
 2、数据处理  
大数据工程师将Spark技术应用于广告、报表、推荐系统等业务中,在广告业务中,利用Spark系统进行应用分析、效果分析、定向优化等业务,在推荐系统业务中,利用Spark内置机器学习算法训练模型数据,进行个性化推荐及热点点击分析等业务。   
 (二)使用Spark的公司  
 1、腾讯  
广点通是最早使用Spark的应用之一。腾讯大数据精准推荐借助Spark快速迭代的优势,围绕“数据+算法+系统”这套技术方案,实现了在“数据实时采集、算法实时训练、系统实时预测”的全流程实时并行高维算法,最终成功应用于广点通pCTR (Predict Click-Through Rate) 投放系统上,支持每天上百亿的请求量。   
 2、Yahoo  
Yahoo将Spark用在Audience Expansion中。Audience Expansion是广告中寻找目标用户的一种方法,首先广告者提供一些观看了广告并且购买产品的样本客户,据此进行学习,寻找更多可能转化的用户,对他们定向广告。Yahoo采用的算法是Logistic Regression。同时由于某些SQL负载需要更高的服务质量,又加入了专门跑Shark的大内存集群,用于取代商业BI/OLAP工具,承担报表/仪表盘和交互式/即席查询,同时与桌面BI工具对接。   
 3、淘宝  
淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等,将Spark运用于淘宝的推荐相关算法上,同时还利用GraphX解决了许多生产问题,包括以下计算场景:基于度分布的中枢节点发现、基于最大连通图的社区发现、基于三角形计数的关系衡量、基于随机游走的用户属性传播等。   
 4、优酷土豆  
目前Spark已经广泛使用在优酷土豆的视频推荐,广告业务等方面,相比Hadoop,Spark交互查询响应快,性能比Hadoop提高若干倍。一方面,使用Spark模拟广告投放的计算效率高、延迟小(同Hadoop比延迟至少降低一个数量级)。另一方面,优酷土豆的视频推荐往往涉及机器学习及图计算,而使用Spark解决机器学习、图计算等迭代计算能够大大减少网络传输、数据落地等的次数,极大地提高了计算性能。   
 四、Spark与Hadoop的对比  
 (一)编程方式  
Hadoop的MapReduce计算数据时,要转化为Map和Reduce两个过程,从而难以描述复杂的数据处理过程;而Spark的计算模型不局限于Map和Reduce操作,还提供了多种数据集的操作类型,编程模型比MapReduce更加灵活。   
 (二)数据存储  
Hadoop的MapReduce进行计算时,每次产生的中间结果都存储在本地磁盘中;而Spark在计算时产生的中间结果存储在内存中。   
 (三)数据处理  
Hadoop在每次执行数据处理时,都要从磁盘中加载数据,导致磁盘IO开销较大;而Spark在执行数据处理时,要将数据加载到内存中,直接在内存中加载中间结果数据集,减少了磁盘的IO开销。   
 (四)数据容错  
MapReduce计算的中间结果数据,保存在磁盘中,Hadoop底层实现了备份机制,从而保证了数据容错;Spark RDD实现了基于Lineage的容错机制和设置检查点方式的容错机制,弥补数据在内存处理时,因断电导致数据丢失的问题。