加强网站互动交流平台建设自查,网站站做地图软件,做网站的图片,网站后台登陆验证码文章目录 零、本讲学习目标一、使用Spark SQL实现词频统计#xff08;一#xff09;数据源 - words.txt#xff08;二#xff09;创建Maven项目#xff08;三#xff09;添加依赖和构建插件#xff08;四#xff09;修改源目录名称#xff08;五#xff09;创建日志属… 文章目录 零、本讲学习目标一、使用Spark SQL实现词频统计一数据源 - words.txt二创建Maven项目三添加依赖和构建插件四修改源目录名称五创建日志属性文件六创建词频统计单例对象七启动程序查看结果八词频统计数据转化流程图 零、本讲学习目标
使用Spark SQL实现词频统计使用Spark SQL计算总分与平均分使用Spark SQL统计每日新增用户使用Spark SQL实现分组排行榜使用Spark SQL进行智慧交通数据分析
一、使用Spark SQL实现词频统计
一数据源 - words.txt 二创建Maven项目
创建Maven项目 - SparkSQLWordCount
三添加依赖和构建插件
在pom.xml文件里添加依赖和构建插件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdnet.hw.wc/groupIdartifactIdSparkSQLWordCount/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion2.11.8/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.1.1/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.11/artifactIdversion2.1.1/version/dependency/dependenciesbuild pluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.3.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/pluginplugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.3.2/versionexecutionsexecutionidscala-compile-first/idphaseprocess-resources/phasegoalsgoaladd-source/goalgoalcompile/goal/goals/executionexecutionidscala-test-compile/idphaseprocess-test-resources/phasegoalsgoaltestCompile/goal/goals/execution/executions/plugin/plugins/build
/project四修改源目录名称
将源目录名由java改成scala 在pom.xml文件里设置源目录
五创建日志属性文件
在resources目录里创建log4j.properties文件
log4j.rootLoggerERROR, stdout, logfile
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n
log4j.appender.logfileorg.apache.log4j.FileAppender
log4j.appender.logfile.Filetarget/spark.log
log4j.appender.logfile.layoutorg.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern%d %p [%c] - %m%n六创建词频统计单例对象
创建net.hw.wc包在包里创建SparkSQLWordCount单例对象
package net.hw.wcimport org.apache.spark.sql.{Dataset, SparkSession}/*** 功能利用Spark SQL实现词频统计* 作者华卫* 日期2022年05月15日*/
object SparkSQLWordCount {def main(args: Array[String]): Unit {// 设置HADOOP用户名属性否则本地运行访问会被拒绝System.setProperty(HADOOP_USER_NAME, root)// 创建或得到SparkSessionval spark SparkSession.builder().appName(SparkSQLWordCount).master(local[*]).getOrCreate()// 读取HDFS上的单词文件val lines: Dataset[String] spark.read.textFile(hdfs://master:9000/input/words.txt)// 显示数据集lines内容lines.show()// 导入Spark会话对象的隐式转换import spark.implicits._// 将数据集中的数据按空格切分并合并val words: Dataset[String] lines.flatMap(_.split( ))// 显示数据集words内容words.show()// 将数据集默认列名由value改为word并转换成数据帧val df words.withColumnRenamed(value, word).toDF()// 显示数据帧内容df.show()// 基于数据帧创建临时视图df.createTempView(v_words)// 执行SQL分组查询实现词频统计val wc spark.sql(| select word, count(*) as count| from v_words group by word| order by count desc|.stripMargin)// 显示词频统计结果wc.show()// 关闭会话spark.close()}
}七启动程序查看结果
运行SparkSQLWordCount单例对象
八词频统计数据转化流程图
文本文件转化成数据集再转化成数据帧最后基于表查询得到结果数据帧