当前位置: 首页 > news >正文

河源建设网站网站开发 简单留言板

河源建设网站,网站开发 简单留言板,天津铁路建设投资控股(集团)网站,前端开发培训班多少钱使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” , value 值为 “ 窗口结束时间,设备id” &am…

使用 Flink 消费 Kafka ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” value 值为 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli HGETALL key方式获取 warning_last3min_everymin_out值。
注:时间语义使用 Processing Time
  1. Kafka Source

    • 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
    • 数据通过 SimpleStringSchema 反序列化为字符串格式,再由 parseMessage 进行解析和提取。
  2. 流处理与窗口

    • Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。
    • 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
  3. 窗口函数

    • 在 MaxNumWarnMachineID 中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。
    • apply 方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
  4. Redis Sink

    • 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为 HSET
    • Redis 中的键为 warning_last3min_everymin_out,值为设备 ID。

 

package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 创建流执行环境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource = buildKafkaSource()// 从Kafka读取数据并处理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 == "预警") // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute("WarningLast3MinEveryMinOut Job")}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 统计每个设备ID的预警次数val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

 

http://www.yayakq.cn/news/746183/

相关文章:

  • 电商网站建设模板做物流网站多少钱
  • 法库综合网站建设方案软件技术专升本
  • 可以自己买服务器做网站吗小型教育网站开发
  • 国内优秀设计网站站长自适应网站做百度推广
  • 璧山网站建设房产投资还有前景吗
  • 江苏工信部网站备案查询网站建设那个公司好
  • 广州做网站公司电话网站开发招标书
  • 手机html5 网站导航代码微信模板消息
  • 长春定制建站企业网站新乡微网站建设
  • 江西建设职业技能教育咨询网站免费asp网站程序下载
  • 音乐网站设计源码上海建设银行官网网站
  • 做网站分期付款比例招人制作网站
  • 韩国网站购物网站开发怎么自学
  • 学做卤味视频网站什么网站可以发布有偿做项目
  • 光明随心订网站怎么做网站建站的技术解决方案
  • 做视频网站需要什么条件农商1号的网站建设费
  • 模板手机网站建设价格明细表php网站开发模式有哪些
  • 用花生壳怎么做网站的服务器木门行业网站该怎么做
  • 学做网站论坛vip号码开发网站app公司
  • 东莞做网站优化天助网络武安市精品网站开发
  • 大兴做网站学校官网
  • 网站接入商是什么意思会员卡管理系统哪里买
  • 仅有网站做app网络设计有限公司
  • 无忧网站源码关于网站建设的报告
  • 建网站自学wordpress 更新feed
  • 黑龙江省华龙建设有限公司网站seo具体seo怎么优化
  • 米课中有个内贸网站建设方象科技专注于什么领域
  • 一个专门做ppt的网站vps 做网站
  • 番禺响应式网站开发怎么利用QQ空间给网站做排名
  • 网站开发前端与后端源代码同行做的好的网站