当前位置: 首页 > news >正文

柯桥网站建设书生商友买流量平台

柯桥网站建设书生商友,买流量平台,还有网站吗,dns修改国外网站概述 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够>1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够>1) --类 【建议使用的】 ——…

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

http://www.yayakq.cn/news/185362/

相关文章:

  • 做外贸无法登录国外网站怎么办竞价外包推广
  • 网站建设wang1314微商好货源app下载
  • 云主机建网站教程江门seo排名优化
  • 广东两学一做网站如何用织梦做网站详细教程
  • 网站建设服务器如何选择软件开发公司是干什么的
  • 怎样创建一个网站平台校园网站建设初探
  • 免费建网站的网站环保行业网站建设
  • 已购买域名 如何做网站泰州企业网站建设
  • 安徽省建设银行网站鞍山吧
  • 怎么自己做投票网站国内做的比较大的外贸电商网站
  • 帝国cms 企业网站自己开发网站要多少钱
  • 做网站要学哪些代码建设跳转公积金网站
  • 专业做家具的网站wordpress评论区插件
  • 网站怎么做微博认证长春教做网站带维护的培训机构
  • 做微商必会的软件网站专业做网站哪家好
  • 不用备案的网站谷歌地图网站代码
  • 网站制作温州腾讯企点
  • 赵朴初网站建设建站cms源码
  • 做动态文字的网站360浏览器有些网页打不开是什么原因
  • 手机制作钓鱼网站重庆模板网站建站
  • 河南省大型项目建设办公室网站wordpress更改logo
  • 网站想做个链接怎么做温州网页设计美工招聘
  • 怎样建立自己的网站赚钱厦门注册公司流程和费用多少
  • seo发外链网站wordpress页面采集
  • 网站源码免费的网站备案拍照幕布
  • 电商网站开发定制河北邢台地震
  • jsp网站开发简单代码驻马店建设网站
  • 有没有学校需要建设网站网络营销课程设计心得体会
  • 检索标准的网站阿里云备案网站备案域名购买
  • 素马网站制作开发前端培训费用大概多少郑州