Table of Contents

Getting Started with Structured Streaming
1. Scala Code
2. Java Code
3. Notes on the Code

Getting Started with Structured Streaming
We will use Structured Streaming to monitor socket data and compute a word count. Spark 3.4.3 is used here. First, add the following to the Maven pom file:

```xml
<!-- The settings below fix the "-source 1.5 does not support lambda expressions"
     error that appears when packaging under JDK 1.8 -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
</properties>

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL on Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Kafka 0.10 source for Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Needed to produce data to Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- Scala libraries -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.12</version>
    </dependency>
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>
```
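Before writing any streaming code, it can help to confirm that these dependencies resolve. The following smoke test is our own addition, not part of the original post (the object name DependencyCheck is hypothetical); it simply builds a local SparkSession and prints its version:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical smoke test: if the pom above resolves correctly,
// this prints the Spark version, which should be 3.4.3.
object DependencyCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("DependencyCheck")
      .getOrCreate()
    println(spark.version)
    spark.stop()
  }
}
```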
1. Scala Code
```scala
package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming: read Socket data in real time
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //The default is 200 shuffle partitions; the source data volume
      //is small, so fewer partitions are enough
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")

    //2. Read each socket line into a DataFrame; the default column name is "value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3. Split each line into words; first convert to Dataset[String] via as[String]
    val words: Dataset[String] = lines.as[String].flatMap(line => {
      line.split(" ")
    })

    //4. Group by word and count; a "count" column is added automatically
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5. Start the streaming query and print results to the console
    val query: StreamingQuery = wordCounts.writeStream
      //set the output mode to complete
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```
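By default the query launches a new micro-batch as soon as the previous one finishes. While experimenting, a fixed batch cadence can make the console output easier to follow. The sketch below is an optional tweak of step 5 above using Spark's Trigger API, reusing wordCounts from the code; it is not part of the original example:

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Same console sink as step 5, but fire one micro-batch every
// 5 seconds instead of as fast as possible.
val query: StreamingQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
query.awaitTermination()
```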
2. Java Code

```java
package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        //Read each socket line into a Dataset<Row>; the default column name is "value"
        Dataset<Row> lines = spark.readStream()
                .format("socket")
                .option("host", "node3")
                .option("port", 9999)
                .load();

        //Split each line into words
        Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                }, Encoders.STRING());

        //Group by word and count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        //Start the streaming query and print results to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
```

After the code above is written, run "nc -lk 9999" on the node3 node to start a socket server (if nc is unavailable, a minimal Scala stand-in is sketched after the sample output below), then start the program and type the following data into the socket:
First input: a b c
Second input: d a c
Third input: a b c
The console prints the following results:
```
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+
```
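If nc is not installed on the test machine, a plain JVM socket server can stand in for "nc -lk 9999". The helper below is our own sketch, not part of the original post (the object name FakeNc is hypothetical): it listens on port 9999, waits for the streaming job to connect, and forwards each line typed on stdin:

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.StdIn

// Minimal stand-in for "nc -lk 9999": accepts a single client (the
// Spark job) and forwards every line read from stdin until EOF.
object FakeNc {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    println("Listening on 9999, waiting for the streaming job to connect...")
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    Iterator.continually(StdIn.readLine())
      .takeWhile(_ != null)
      .foreach(line => out.println(line))
    client.close()
    server.close()
  }
}
```

Unlike nc -lk, this sketch serves one connection and then exits, which is enough for a single run of the example.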
3. Notes on the Code
1. Spark SQL's default shuffle parallelism is 200. Since the data volume here is small, it can be lowered through the "spark.sql.shuffle.partitions" parameter.
2. Data read by Structured Streaming arrives as a DataFrame with a single column named "value" by default; convert it to a Dataset via as[String] before operating on it.
3. When writing results, the OutputMode can be one of three modes: Complete Mode, Append Mode, or Update Mode (an Update Mode variant of the example is sketched below).
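To make the last note concrete, here is how the sink from the Scala example would look in Update Mode, which prints only the rows whose counts changed in each batch. This is a sketch reusing wordCounts from the code above; note that Append Mode would be rejected for this query, because an aggregation without a watermark never finalizes its rows:

```scala
// Update mode: each batch shows only the rows that changed since the
// last batch, e.g. after the second input "d a c" the batch would
// contain d=1, a=2, c=2 but not the unchanged b=1.
val updateQuery: StreamingQuery = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
updateQuery.awaitTermination()
```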