做美团网站代码小程序商城使用教程
一.RDD
28.RDD_为什么需要RDD
29.RDD_定义


30.RDD_五大特性总述


31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

 演示代码:  
 
 // 获取当前 RDD 的分区数  
 
 @Since ( "1.6.0" )  
 
 final def  getNumPartitions :  Int  =  
 
 partitions . length  
 
 // 显示出 RDD 被分配到不同分区的信息  
 
 /**Return an RDD created by coalescing all  
 
 elements within each partition into an  
 
 array.*/  
 
 def  glom ():  RDD [ Array [ T ]]  
 
 1  
 
 2  
 
 3  
 
 4  
 
 5  
 
 6  
 
 package  com . itbaizhan . rdd  
 
 //1. 导入 SparkConf 类、 SparkContext  
 
 import  org . apache . spark . rdd . RDD  
 
 import  org . apache . spark .{  SparkConf ,  
 
 SparkContext }  
 
 object  CreateByParallelize  {  
 
 
 def  main ( args :  Array [ String ]):  Unit  =  {  
 
 
 //2. 构建 SparkConf 对象。并设置本地运行和程序的  
 
 名称  
 
 
 val  conf  =  new  
 
 SparkConf (). setMaster ( "local[2]" ). setAppName  
 
 ( "CreateRdd1" )  
 
 
 //3. 构建 SparkContext 对象  
 
 
 val  sc  =  new  SparkContext ( conf )  
 
 
 //4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的  
 
 RDD 对象  
 
 1  
 
 2  
 
 3  
 
 4  
 
 5  
 
 6  
 
 7  
 
 8  
 
 9  
 
 10  
 
 11  
 
 12  
 
 79     
 
 //val rdd: RDD[Int] =  
 
 sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,  
 
 7, 8))  
 
 
 val  rdd :  RDD [ Int ]  =  
 
 sc . parallelize ( List ( 1 ,  2 ,  3 ,  4 ,  5 ,  6 ,  7 ,  
 
 8 ), 3 )  
 
 
 //5. 输出默认的分区数  
 
 
 //5.1  
 
 setMaster("local[*]")&¶llelize(List(1,  
 
 2, 3, 4, 5, 6, 7, 8))  
 
 
 //println(" 默认分区  
 
 数: "+rdd.getNumPartitions)//8, 默认当前系统的  
 
 CPU 数  
 
 
 //5.2  
 
 setMaster("local[2]")&¶llelize(List(1,  
 
 2, 3, 4, 5, 6, 7, 8))  
 
 
 //println(" 默认分区  
 
 数: "+rdd.getNumPartitions)//2  
 
 
 //5.3  
 
 setMaster("local[2]")&¶llelize(List(1,  
 
 2, 3, 4, 5, 6, 7, 8),3)  
 
 
 println ( " 默认分区  
 
 数: " + rdd . getNumPartitions ) //3  
 
 
 //6.collect 方法:将 rdd 对象中每个分区的数据,都  
 
 发送到 Driver ,形成一个 Array 对象  
 
 
 val  array1 :  Array [ Int ]  =  rdd . collect ()  
 
 
 println ( "rdd.collect()=" + array1 . mkString ( ",  
 
 " ))  
 
 
 //7. 显示出 rdd 对象中元素被分布到不同分区的数据信  
 
 息  
 
 13  
 
 14  
 
 15  
 
 16  
 
 17  
 
 18  
 
 19  
 
 20  
 
 21  
 
 22  
 
 23  
 
 24  
 
 25  
 
 80 运行结果:  
 
 实时效果反馈  
 
 1.  以下关于并行化创建 RDD 的描述错误的是:  
 
 A  
 
 通过并行化集合创建,将本地集合对象转分布式 RDD 。  
 
 B  
 
 parallelize() 方法必须传递两个参数。  
 
 C  
 
 parallelize 没有给定分区数 ,  默认分区数等于执行程序的当前  
 
 服务器 CPU 核数。  
 
 答案:  
 
 
 val  array2 :  Array [ Array [ Int ]]  =  
 
 rdd . glom (). collect ()  
 
 
 println ( "rdd.glom().collect() 的内容是 :" )  
 
 
 /*for(eleArr<- array2){  
 
 
 println(eleArr.mkString(","))  
 
 
 }*/  
 
 
 array2 . foreach ( eleArr => println ( eleArr . mkStr  
 
 ing ( "," )))  
 
 }  
 
 }  
 
 26  
 
 27  
 
 28  
 
 29  
 
 30  
 
 31  
 
 32  
 
 33  
 
 默认分区数: 3  
 
 rdd.collect()=1,2,3,4,5,6,7,8  
 
 rdd.glom().collect() 的内容是 :  
 
 1,2  
 
 3,4,5  
 
 6,7,8 
 
 
39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD
 扩展 wholeTextFiles 适合读取一堆小文件:  
 
 //path 指定小文件的路径目录  
 
 //minPartitions  最小分区数 可选参数  
 
 def  wholeTextFiles ( path :  
 
 String , minPartitions :  Int  =  
 
 defaultMinPartitions ):  RDD [( String ,  String )]  
 
 1  
 
 2  
 
 3  
 
 85 代码演示:  
 
 package  com . itbaizhan . rdd  
 
 //1. 导入类  
 
 import  org . apache . spark . rdd . RDD  
 
 import  org . apache . spark .{  SparkConf ,  
 
 SparkContext }  
 
 object  CreateByWholeTextFiles  {  
 
 
 def  main ( args :  Array [ String ]):  Unit  =  {  
 
 
 //2. 构建 SparkConf 对象,并设置本地运行和程序名  
 
 称  
 
 
 val  conf :  SparkConf  =  new  
 
 SparkConf (). setMaster ( "local[*]" ). setAppName  
 
 ( "WholeTextFiles" )  
 
 
 //3. 使用 conf 对象构建 SparkContet 对象  
 
 
 val  sc  =  new  SparkContext ( conf )  
 
 
 //5. 读取指定目录下的小文件  
 
 
 val  rdd :  RDD [( String ,  String )]  =  
 
 sc . wholeTextFiles ( "data/tiny_files" )  
 
 
 //(filePath1, " 内容 1"),(filePath2, " 内容  
 
 2"),...,(filePathN, " 内容 N")  
 
 
 val  tuples :  Array [( String ,  String )]  =  
 
 rdd . collect ()  
 
 
 tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))  
 
 
 //6. 获取小文件中的内容  
 
 
 val  array :  Array [ String ]  =  
 
 rdd . map ( _ . _2 ). collect ()  
 
 
 println ( "---------------------------" )  
 
 
 println ( array . mkString ( "|" ))  
 
 
 //4. 关闭 sc 对象  
 
 1  
 
 2  
 
 3  
 
 4  
 
 5  
 
 6  
 
 7  
 
 8  
 
 9  
 
 10  
 
 11  
 
 12  
 
 13  
 
 14  
 
 15  
 
 16  
 
 17  
 
 18  
 
 19  
 
 20  
 
 21  
 
 86 运行输出结果 :  
 
 RDD_ 算子概述  
 
 定义: 分布式集合 RDD 对象的方法被称为算子  
 
 算子分类:  
 
 Transformation 转换算子  
 
 1  
 
 Action 行动算子  
 
 2  
 
 
 sc . stop ()  
 
 }  
 
 }  
 
 22  
 
 23  
 
 24  
 
 (file:/D:/codes/itbaizhan/sparkdemo/data/tin  
 
 y_files/file1.txt,hello Linux  
 
 hello Zookeper  
 
 hello Maven  
 
 hello hive  
 
 hello spark)  
 
 (file:/D:/codes/itbaizhan/sparkdemo/data/tin  
 
 y_files/file2.txt,Spark Core  
 
 Spark RDD  
 
 Spark Sql)  
 
 ----------------  
 
 hello Linux  
 
 hello Zookeper  
 
 hello Maven  
 
 hello hive  
 
 hello spark|Spark Core  
 
 Spark RDD  
 
 Spark Sql 
 
 
41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

 object  RddGroupBy  {  
 
 
 def  main ( args :  Array [ String ]):  Unit  =  {  
 
 
 //2. 构建 SparkConf 对象,并设置本地运行和程序名  
 
 称  
 
 
 val  conf :  SparkConf  =  new  
 
 SparkConf (). setMaster ( "local[*]" ). setAppName  
 
 ( "groupBy" )  
 
 
 //3. 使用 conf 对象构建 SparkContet 对象  
 
 
 val  sc  =  new  SparkContext ( conf )  
 
 
 //5. 创建 Rdd  
 
 
 val  rdd :  RDD [( Char ,  Int )]  =  
 
 sc . parallelize ( Array (( 'a' ,  1 ), ( 'a' ,  2 ),  
 
 ( 'b' ,  1 ), ( 'b' ,  2 ), ( 'a' ,  3 ), ( 'a' ,  4 )))  
 
 
 //6. 通过 groupBy 算子对 rdd 对象中的数据进行分组  
 
 
 //groupBy 插入的函数的用意是指定按照谁进行分组  
 
 
 // 分组后的结果是有二元组组成的 RDD  
 
 
 val  gbRdd :  RDD [( Char ,  Iterable [( Char ,  
 
 Int )])]  =  rdd . groupBy ( tupEle  =>  tupEle . _1 )  
 
 
 // 收集到 Driver 端  
 
 
 val  result1 :  Array [( Char ,  
 
 Iterable [( Char ,  Int )])]  =  gbRdd . collect ()  
 
 
 //(a,CompactBuffer((a,1), (a,2), (a,3),  
 
 (a,4))),(b,CompactBuffer((b,1), (b,2)))  
 
 
 println ( result1 . mkString ( "," ))  
 
 
 //7. 使用 map 转换算子  
 
 
 //(a,List((a,1), (a,2), (a,3), (a,4))),  
 
 (b,List((b,1), (b,2)))  
 
 
 val  result2 :  Array [( Char ,  List [( Char ,  
 
 Int )])]  =  gbRdd . map ( tup  =>  ( tup . _1 ,  
 
 tup . _2 . toList )). collect ()  
 
 
 println ( result2 . mkString ( "," ))  
 
 6  
 
 7  
 
 8  
 
 9  
 
 10  
 
 11  
 
 12  
 
 13  
 
 14  
 
 15  
 
 16  
 
 17  
 
 18  
 
 19  
 
 20  
 
 21  
 
 22  
 
 23  
 
 24  
 
 25  
 
 26  
 
 104 实时效果反馈  
 
 1.  以下关于  
 
 rdd.groupBy(tupEle => tupEle._1)  
 
 的描述错误的是:  
 
 A 
 
 groupBy 传入的函数的意思是 :  通过这个函数 , 确定按照谁来  
 
 分组。  
 
 B 
 
 groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个  
 
 数只能为 2 。  
 
 C 
 
 groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个  
 
 数 >=2 。  
 
 答案:  
 
 1=>B 
 
 
49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey



52.RDD_转换算子union并集


53.RDD_转换算子交集和差集




54.RDD_转换算子关联算子


55.RDD_转换算子partitionBy



56.RDD_转换算子mapPatitions


57.RDD_转换算子sample



58.RDD_行动算子foreachPartition


59.RDD_行动算子foreach



60.RDD_行动算子saveAsTestFile




61.RDD_行动算子countByKey


62.RDD_行动算子reduce


63.RDD_行动算子fold



64.RDD_行动算子first_take_count



65.RDD_行动算子top_takeOrderd



66.RDD_行动算子takeSample



二.内核进阶
67.内核进阶_DAG概述


68.内核进阶_血缘关系



69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分


71.内核进阶_任务调度概述


72.内核进阶_管道计算模式上


73.内核进阶_管道计算模式下




74.内核进阶_cache缓存



75.内核进阶_checkpoint检查点



76.内核进阶_cache和checkpoint区别


77.内核进阶_并行度


78.内核进阶_广播变量


79.内核进阶_累加器一


80.内核进阶_累加器二


81.内核进阶_累加器之重复计算



82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析



84.内核进阶_项目实战UV分析


85.内核进阶_二次排序实战



86.内核进阶_分组取topN实战



87.内核进阶_卡口统计项目需求分析



88.内核进阶_卡口统计项目统计正常的卡口



89.内核进阶_卡口统计项目TOP5



90.内核进阶_卡口统计项目统计不同区域同时出现的车辆


91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二


93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三


94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

















