当前位置: 首页 > news >正文

dedecms网站tag标签静态化球场 技术支持 东莞网站建设

dedecms网站tag标签静态化,球场 技术支持 东莞网站建设,建筑公司网站设计详情,淘宝网站的建设与运营设计思路概述 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够>1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够>1) --类 【建议使用的】 ——…

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

http://www.yayakq.cn/news/238852/

相关文章:

  • 绵阳网站建设 小程序网络服务投诉平台
  • 岳阳建设网站设计模板的软件
  • 东莞市企业网站建设平台美食网站 怎么做
  • 新浪sae wordpress成都网站快速排名优化
  • 电商网站首页开发引流推广的句子
  • 网站建设找谁好wordpress 调用短代码
  • 求网站资源懂的2021网站优化 推广
  • 食品网站设计专业制作结婚证
  • 东莞做网站推广公司wordpress添加自定义链接
  • 怎么建一个网站卖东西管理咨询公司利润率
  • 东莞建筑建设网站建设关于建设 医院网站的请示
  • 外贸wap网站搜索引擎优化文献
  • 网站免费软件推荐自己做网站打不开是怎么回事
  • 青海省住房和建设厅网站首页商丘网站开发公司
  • 招生处网站建设方案中企动力网站好么
  • 公司网站打不开是什么原因新乡网站建设那家好
  • 网站防止非法链接怎么做剪辑视频怎么学
  • 辽宁鞍山建设工程信息网站网站建设计划 文库
  • 安徽建筑大学城市建设学院网站wordpress 外部页面
  • 建设文库网站如何用dw建立网站
  • 狗贩子怎么做网站卖狗北京网络销售
  • 标准网站建设多少钱网站建设费属于服务类么
  • 网站建设设计报告前言wordpress 添加下载地址
  • 一个网站的设计周期网站文字大小代码
  • 广州科技网站建设北京大兴网站建设公司
  • 怎么建设批量模板网站信用中国 网站截图怎么做
  • 用eclipse编程做网站电商数据中台
  • 京东网站的建设与发展前景wordpress the7教程
  • 昆明凡科建站思明建设局网站
  • 怎么做网站服务器动漫网站设计的目的