Contents

- 22: FineBI data set configuration
- 23: FineBI report building
- 24: FineBI real-time configuration test
- Appendix 2: Complete offline consumer code

## 22: FineBI data set configuration

**Goal**: configure FineBI to access the MySQL result data set.

**Implementation**:

Install FineBI following the installation guide《FineBI Windows版本安装手册.docx》.

Configure the data connection:

- Data connection name: Momo
- Username: root
- Password: your own MySQL password
- Data connection URL: `jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8`

Prepare the data set with the following query:

```sql
SELECT
    id,
    momo_totalcount,
    momo_province,
    momo_username,
    momo_msgcount,
    CASE momo_grouptype
        WHEN 1 THEN '总消息量'
        WHEN 2 THEN '各省份发送量'
        WHEN 3 THEN '各省份接收量'
        WHEN 4 THEN '各用户发送量'
        WHEN 5 THEN '各用户接收量'
    END AS momo_grouptype
FROM momo_count
```

**Summary**: FineBI can now access the MySQL result data set.
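Before wiring the connection into FineBI, you can optionally sanity-check the URL, credentials, and result table from a small Java program. This is a minimal sketch only: it assumes the `mysql-connector-java` driver from Appendix 1 is on the classpath and that the Flink job has already written rows into `momo_count`; the password string is a hypothetical placeholder.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MomoConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Same URL and user that FineBI will be configured with
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        try (Connection conn = DriverManager.getConnection(url, "root", "yourMySQLPassword");
             Statement stmt = conn.createStatement();
             // Preview a few rows of the result table the data set is built from
             ResultSet rs = stmt.executeQuery(
                     "SELECT momo_grouptype, momo_totalcount, momo_msgcount FROM momo_count LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3));
            }
        }
    }
}
```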
## 23: FineBI report building

**Goal**: build the FineBI real-time report.

**Path**:

- step1: build the real-time report
- step2: configure the real-time report
- step3: test the real-time refresh

**Implementation**:

Build the real-time report. Create a new dashboard, add a title, and add the following components:

- Real-time total message count
- Top 10 users by messages sent
- Top 10 users by messages received
- Top 10 provinces by messages sent
- Top 10 provinces by messages received
- Total messages per province

**Summary**: the FineBI real-time report is built.
## 24: FineBI real-time configuration test

**Goal**: test the real-time report.

**Implementation**:

Configure the real-time report (official documentation: https://help.fanruan.com/finebi/doc-view-363.html).

Add the jar package: put the jar into the `webapps\webroot\WEB-INF\lib` directory under the FineBI installation directory.

Note: if prompted that the file already exists, choose to overwrite it.

Add the JS file: create a file named refresh.js:

```javascript
setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // replace this with the id of your own dashboard
    if (a == "d574631848bd4e33acae54f986d34e69") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            // Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 3000); // refresh interval in ms
    }
}, 2000)
```

Place the finished refresh.js file into `%FineBI%/webapps/webroot` under the FineBI installation directory.

Disable the FineBI cache, then shut down FineBI.

Modify the jar package to add the JS reference:

```html
<!-- add the refresh feature -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
```

Restart FineBI.

Real-time refresh test:

- Clear the MySQL result table.
- Start the Flink program: run MoMoFlinkCount.
- Start the Flume agent:

```shell
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
```

- Start the simulated data generator:

```shell
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
```

- Observe the report refreshing.

**Summary**: the FineBI real-time test is complete.
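While the dashboard refreshes, it can help to confirm that the pipeline really is updating the result table independently of FineBI. The following is a minimal sketch, not part of the original workflow: it polls the total message count (grouptype 1 in the section 22 query) every 3 seconds, assuming the same JDBC URL as above and a hypothetical password placeholder.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MomoCountWatcher {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        try (Connection conn = DriverManager.getConnection(url, "root", "yourMySQLPassword");
             PreparedStatement ps = conn.prepareStatement(
                     // grouptype 1 = total message count, per the CASE mapping in section 22
                     "SELECT momo_totalcount FROM momo_count WHERE momo_grouptype = 1")) {
            while (true) {
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        System.out.println("total messages: " + rs.getString(1));
                    }
                }
                // Poll every 3 seconds, matching the refresh.js interval
                Thread.sleep(3000);
            }
        }
    }
}
```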
## Appendix 1: Maven dependencies

```xml
<!-- Remote repository -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <!-- HBase client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- Kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- JSON parsing toolkit -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- Required for Flink to access HDFS, Kafka, MySQL, and Redis -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- HTTP request dependency -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
    <!-- MySQL connection driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>
```

## Appendix 2: Complete offline consumer code

```java
package bigdata.itcast.cn.momo.offline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

/**
 * ClassName: MomoKafkaToHbase
 * Description: TODO offline scenario - consume data from Kafka and write it into HBase
 * Create By: Maynor
 */
public class MomoKafkaToHbase {

    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
    private static byte[] family = Bytes.toBytes("C1"); // column family

    // todo:2 - build the HBase connection
    // Static block: runs once when the class is loaded, so multiple connections are not created
    static {
        try {
            // Build the configuration object
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            // Build the connection
            conn = ConnectionFactory.createConnection(conf);
            // Get the table object
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // todo:1 - build the consumer and fetch the data
        consumerKafkaToHbase();
//        String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
//        System.out.println(momoRowkey);
    }

    /**
     * Consume data from Kafka and write valid records into HBase
     */
    private static void consumerKafkaToHbase() throws Exception {
        // Build the configuration object
        Properties props = new Properties();
        // Broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // Consumer group id
        props.setProperty("group.id", "momo1");
        // Disable auto commit
        props.setProperty("enable.auto.commit", "false");
        // Key and value deserializer types
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Build the consumer connection
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        // Keep pulling data
        while (true) {
            // Poll Kafka and wait up to 100 ms for a response; if data arrives within 100 ms it is returned,
            // otherwise the next request is sent (100 ms is the wait time for Kafka to respond).
            // All records pulled in one poll (multiple K/V pairs) come back in a ConsumerRecords object, similar to a collection.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // todo:3 - process the pulled data: print it
            // Process the data partition by partition
            Set<TopicPartition> partitions = records.partitions(); // all partitions in this batch
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition); // all records of this partition
                // Process the records of this partition
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    // Topic
                    String topic = record.topic();
                    // Partition
                    int part = record.partition();
                    // Offset
                    offset = record.offset();
                    // Key
                    String key = record.key();
                    // Value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    // Write the value into HBase
                    if (value != null && !"".equals(value) && value.split("\001").length >= 20) {
                        writeToHbase(value);
                    }
                }
                // Manually commit this partition's offset
                Map<TopicPartition, OffsetAndMetadata> offsets =
                        Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                consumer.commitSync(offsets);
            }
        }
    }

    /**
     * Write one record into HBase
     * @param value
     */
    private static void writeToHbase(String value) throws Exception {
        // todo:3 - write into HBase
        // Split the record
        String[] items = value.split("\001");
        String stime = items[0];
        String sender_accounter = items[2];
        String receiver_accounter = items[11];
        // Build the rowkey
        String rowkey = getMomoRowkey(stime, sender_accounter, receiver_accounter);
        // Build the Put
        Put put = new Put(Bytes.toBytes(rowkey));
        // Add the columns
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("msg_time"), Bytes.toBytes(items[0]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_nickyname"), Bytes.toBytes(items[1]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_account"), Bytes.toBytes(items[2]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_sex"), Bytes.toBytes(items[3]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_ip"), Bytes.toBytes(items[4]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_os"), Bytes.toBytes(items[5]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_phone_type"), Bytes.toBytes(items[6]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_network"), Bytes.toBytes(items[7]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_gps"), Bytes.toBytes(items[8]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_nickyname"), Bytes.toBytes(items[9]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_ip"), Bytes.toBytes(items[10]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_account"), Bytes.toBytes(items[11]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_os"), Bytes.toBytes(items[12]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_phone_type"), Bytes.toBytes(items[13]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_network"), Bytes.toBytes(items[14]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_gps"), Bytes.toBytes(items[15]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_sex"), Bytes.toBytes(items[16]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("msg_type"), Bytes.toBytes(items[17]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("distance"), Bytes.toBytes(items[18]));
        put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("message"), Bytes.toBytes(items[19]));
        // Execute the write
        table.put(put);
    }

    /**
     * Build the rowkey from the message time, sender id, and receiver id
     * @param stime
     * @param sender_accounter
     * @param receiver_accounter
     * @return
     * @throws Exception
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        // Convert the time into a timestamp
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
        // Build the MD5 prefix
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
        // Concatenate and return
        return prefix + "_" + suffix;
    }
}
```
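The consumer above assumes that the `MOMO_CHAT` namespace and the `MOMO_CHAT:MOMO_MSG` table with column family `C1` already exist. If they have not been created yet, the following is a minimal sketch using the HBase 2.x Admin API with the same ZooKeeper quorum as above; it is an illustration only, not part of the original course code, and it does not attempt any pre-splitting.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class MomoCreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Create the namespace if it does not exist yet
            try {
                admin.createNamespace(NamespaceDescriptor.create("MOMO_CHAT").build());
            } catch (NamespaceExistException e) {
                // namespace already exists: nothing to do
            }
            // Create the table with column family C1 if it does not exist yet
            TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");
            if (!admin.tableExists(tableName)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("C1"))
                        .build());
            }
        }
    }
}
```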