当前位置: 首页 > news >正文

苏州做网站好的公司平台型网站建设预算表

苏州做网站好的公司,平台型网站建设预算表,广西住房城乡和建设厅网站,wordpress 添加证书文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本#xff0c;首先在Maven pom文件中导… 文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本首先在Maven pom文件中导入以下依赖 !-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --propertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetspark.version3.4.3/spark.version/propertiesdependencies!-- Spark-core --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion${spark.version}/version/dependency!-- SparkSQL --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion${spark.version}/version/dependency!-- SparkSQL ON Hive--dependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.12/artifactIdversion${spark.version}/version/dependency!--mysql依赖的jar包--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.47/version/dependency!--SparkStreaming--dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.12/artifactIdversion${spark.version}/version/dependency!-- Kafka 0.10 Source For Structured Streaming--dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependency!-- 向kafka 生产数据需要包 --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.8.0/version/dependency!-- Scala 包--dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion2.12.15/version/dependencydependencygroupIdorg.scala-lang/groupIdartifactIdscala-compiler/artifactIdversion2.12.15/version/dependencydependencygroupIdorg.scala-lang/groupIdartifactIdscala-reflect/artifactIdversion2.12.15/version/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.12/version/dependencydependencygroupIdcom.google.collections/groupIdartifactIdgoogle-collections/artifactIdversion1.0/version/dependency/dependencies 一、Scala代码如下 package com.lanson.structuredStreaming/*** Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/ object SSReadSocketData {def main(args: Array[String]): Unit {//1.创建SparkSession对象val spark: SparkSession SparkSession.builder().master(local).appName(StructuredSocketWordCount)//默认200个并行度由于源头数据量少可以设置少一些并行度.config(spark.sql.shuffle.partitions,1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel(Error)//2.读取Socket中的每行数据,生成DataFrame默认列名为valueval lines: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()//3.将每行数据切分成单词首先通过as[String]转换成Dataset操作val words: Dataset[String] lines.as[String].flatMap(line{line.split( )})//4.按照单词分组统计个数自动多一个列countval wordCounts: DataFrame words.groupBy(value).count()//5.启动流并向控制台打印结果val query: StreamingQuery wordCounts.writeStream//更新模式设置为complete.outputMode(complete).format(console).start()query.awaitTermination()}}二、Java 代码如下 package com.lanson.structuredStreaming;import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark SparkSession.builder().master(local).appName(SSReadSocketData01).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow lines spark.readStream().format(socket).option(host, node3).option(port, 9999).load();DatasetString words lines.as(Encoders.STRING()).flatMap(new FlatMapFunctionString, String() {Overridepublic IteratorString call(String line) throws Exception {return Arrays.asList(line.split( )).iterator();}}, Encoders.STRING());DatasetRow wordCounts words.groupBy(value).count();StreamingQuery query wordCounts.writeStream().outputMode(complete).format(console).start();query.awaitTermination();} }以上代码编写完成之后在node3节点执行“nc -lk 9999”启动socket服务器然后启动代码向socket中输入以下数据 第一次输入a b c 第二次输入d a c 第三次输入a b c 可以看到控制台打印如下结果 ------------------------------------------- Batch: 1 ------------------------------------------- ---------- |value|count| ---------- | c| 1| | b| 1| | a| 1| ----------------------------------------------------- Batch: 2 ------------------------------------------- ---------- |value|count| ---------- | d| 1| | c| 2| | b| 1| | a| 2| ----------------------------------------------------- Batch: 3 ------------------------------------------- ---------- |value|count| ---------- | d| 1| | c| 3| | b| 2| | a| 3| ---------- 三、以上代码注意点如下 SparkSQL 默认并行度为200这里由于数据量少可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。StructuredStreaming读取过来数据默认是DataFrame默认有“value”名称的列对获取的DataFrame需要通过as[String]转换成Dataset进行操作结果输出时的OutputMode有三种输出模式Complete Mode、Append Mode、Update Mode。 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨
http://www.yayakq.cn/news/2951/

相关文章:

  • 站长工具端口查询郑州银行app
  • 网站基建建设怎么用优盘做网站登录密钥
  • 专业做曝光引流网站高端人才招聘网站排名
  • 静态网站建设教程建设一个企业网站多少钱
  • 网站开发过程在线学习平台网站建设有什么功能
  • 网站审批需要什么手续辽宁城建设计院有限公司网站
  • 吴江建网站wordpress 标签插件
  • 天晴创艺网站建设百度小程序佛山网站优化步骤
  • win7可以做网站吗设计师之家
  • 广州网站建设多少钱广东省阳江网络问政平台
  • 青岛网站建设公司招聘上海环球金融中心简笔画
  • 用kid做教育网站域名商丘至开网络科技有限公司
  • 将网站保存怎么做lol怎么做直播网站
  • 福建设计招聘网站做微信公众号的网站吗
  • 两学一做 山西答题网站茌平网站制作
  • 兰州做网站怎么样wordpress怎样修改页脚版权信息
  • 上海虹桥站福州关键词自然排名
  • 医疗门户网站模板网站备案到
  • 用wordpress搭建娱乐网陕西seo快速排名
  • 洛阳便宜网站建设费用电子商务平台经营者有哪些义务
  • 自己公司网站设计怎么制作自己的网址
  • 中小型企业网站大全在微信上怎么卖自己的产品
  • 企业建站系统漏洞甘肃省酒泉市做网站公司
  • 做网站需要具备什么网站开发设计的技术路线
  • 淘宝客网站建站建设网站建设哪家快
  • 自己做蛋糕有什么网站吗口碑好网站建设价格低
  • 莆田有建设网站的公司码环艺做网站
  • 可以做空股票的网站企业文化经典句子
  • 莱芜网站建设公司在线做图网站
  • 阿里云做网站麻烦吗wordpress主题 know how