当前位置: 首页 > news >正文

昆明快速做网站网站建设服务合同

昆明快速做网站,网站建设服务合同,网站建设发布设需求,wordpress文章放视频pre 开启 mysql Binlog 网上有众多方法,自行百度。 查询是否成功,在 mysql 客户端输入 show BINARY LOGS; 出现如下提示,即表示 big log 正常开启。 1,下载 canal 服务端 传送门 注意:下载 canal.deployer-xxx …

pre 开启 mysql Binlog

网上有众多方法,自行百度。

查询是否成功,在 mysql 客户端输入

show BINARY LOGS;

出现如下提示,即表示 big log 正常开启。 

1,下载 canal 服务端

传送门

注意:下载 canal.deployer-xxx 版本即可。admin 是 deployer 的管理端。

2,上传到服务器的指定位置并解压

tar xzvf canal.deployer-1.1.6.tar.gz

注意,这个 deployer 解压之后直接是零散文件夹,建议先创建一个文件夹后,在这个文件夹里面进行解压

 3,配置实例

进入 conf 文件夹后,创建实例文件夹

cd conf/
mkdir test

从 example 文件夹中,拷贝instance.properties到当前文件夹

cp ../example/instance.properties .

 4,编辑实例文件

4.1 源数据库位置

//源数据位置
canal.instance.master.address=127.0.0.1:3306
//源数据 binlog 名字
canal.instance.master.journal.name=
//源数据 biglog 偏移量
canal.instance.master.position=

4.2 连接源数据库的用户名和密码

//连接源数据库用户名
canal.instance.dbUsername=canal
//连接源数据库密码
canal.instance.dbPassword=canal

4.3 编辑完,保存退出

5,编辑 canal 的配置文件

cd ..
vim canal.properties

5.1 加入新加的实例,已逗号分割

canal.destinations = example

6,部署客户端

这里客户端可以根据 canal 的 api 文档自行开发。

这里贴一些关键代码

{protected final static Logger logger = LoggerFactory.getLogger(CanalClientApplication.class);private static String ADDRESS = ConfigUtils.getConfigValue("application.properties", "canal.address");private static int PORT = Integer.parseInt(ConfigUtils.getConfigValue("application.properties", "canal.port"));private static String DESTINATION = ConfigUtils.getConfigValue("application.properties", "canal.destination");private static String USERNAME = ConfigUtils.getConfigValue("application.properties", "canal.username");private static String PASSWORD = ConfigUtils.getConfigValue("application.properties", "canal.password");private static String SUBSCRIBER = ConfigUtils.getConfigValue("application.properties", "canal.subscriber");public static void main(String args[]) {SpringApplication.run(CanalClientApplication.class,args);System.out.println("数据同步服务启动成功");// 创建链接logger.info("Trying to connect to " + ADDRESS + ":" + PORT);CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ADDRESS,PORT), DESTINATION, USERNAME, PASSWORD);int batchSize = 1000;try {logger.info("...");connector.connect();logger.info("connected");connector.subscribe(SUBSCRIBER);connector.rollback();logger.info("CanalClient Application started successfully!");while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();logger.info("当前 message 信息为:{}",message);if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {DataProcessor.process(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}} catch (Exception e) {e.printStackTrace();logger.error("Canal Client exit with error.", e);System.exit(-2);} finally {connector.disconnect();}}}
{protected final static Logger logger = LoggerFactory.getLogger(DataProcessor.class);private static String DATABASE = ConfigUtils.getConfigValue("application.properties", "canal.database");private static String TABLE = ConfigUtils.getConfigValue("application.properties", "canal.table");private static String OPERATOR = ConfigUtils.getConfigValue("application.properties", "canal.operator");private static String CANAL_OUTPUT = ConfigUtils.getConfigValue("application.properties", "canal.output");private static DorisUtil dorisUtil;private static MySQLUtil mySQLUtil;public static void process(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();if (eventType == CanalEntry.EventType.TRUNCATE && OPERATOR.contains("TRUNCATE")) {if (StringUtils.isEmpty(DATABASE) ||(entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {if (StringUtils.isEmpty(TABLE) ||(entry.getHeader().getTableName() != null && isContain(TABLE.split(","), entry.getHeader().getTableName()))) {logger.info("TRUNCATE TABLE " + entry.getHeader().getTableName());if (CANAL_OUTPUT.contains("mysql")) {mySQLUtil = MySQLUtil.getInstance();try {mySQLUtil.mySQLTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} catch (SQLException e) {e.printStackTrace();logger.error("MySQL执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());}}if (CANAL_OUTPUT.contains("doris")) {dorisUtil = DorisUtil.getInstance();try {dorisUtil.dorisTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} catch (SQLException e) {e.printStackTrace();logger.error("Doris执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());}}}}}for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {// 过滤database, table, operatorif (StringUtils.isEmpty(DATABASE) ||(entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {if (StringUtils.isEmpty(TABLE) ||(entry.getHeader().getTableName()!=null && isContain(TABLE.split(","),entry.getHeader().getTableName()))) {if (CANAL_OUTPUT.contains("mysql")) {mySQLUtil = MySQLUtil.getInstance();try {if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {mySQLUtil.mySQLDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {mySQLUtil.mySQLInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());} else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {mySQLUtil.mySQLUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());} else {// nothing to do}} catch (SQLException e) {logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);}}if (CANAL_OUTPUT.contains("doris")) {dorisUtil = DorisUtil.getInstance();try {if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {dorisUtil.dorisDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {dorisUtil.dorisInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());} else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {dorisUtil.dorisUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());} else {// nothing to do}} catch (SQLException e) {logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);}}}}}}}public static boolean isContain(String[] list, String value) {if (list == null || value == null) return false;for (String lv : list) {if (value.trim().equals(lv.trim())) {return true;}}return false;}private static void printColumn(String database, String table, List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {logger.info(database + "-" + table + "-" + column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}}

7,启动 canal 服务端

在 canal 根目录下,执行如下命令

./bin/startup.sh 

8,启动 canal 客户端

因为我用的 jar,所以,启动 jar 包就行了。

9,待完成事项

1,doris 官方文档上有通过 binLog 同步数据到 doris 中的方法,这部分待实现。

2,当前客户端写法单一。一旦canal 服务端重启,应用自动停机。待优化。

http://www.yayakq.cn/news/374214/

相关文章:

  • 2017网站开发发展前景计算机软件公司排名
  • 儿童摄影网站源码gif放网站有锯齿
  • 网站建设完成汇报网站建设有什么注意
  • 宜昌市高新区建设局网站南阳教育论坛网站建设
  • 网站开发设计模板厦门微信商城网站建设
  • 做网站怎么挣钱最快怎么样优化网站seo
  • 公司创建网站要多少钱网站建设的基本规范有什么
  • 唯一做魅惑的网站怎么上网站做简易注销的步骤
  • 网站的导航栏怎么做自适应h5网站模板
  • 织梦网站logo站酷app
  • 说明网站建设与网站运营的区别南昌建网站单位
  • 科网站建设杭州门户网站开发
  • wordpress栏目首页企业seo如何优化
  • 做冻品海鲜比较大的网站有哪些化妆品营销型网站模板
  • 都匀住房和城乡建设局网站视频软件app
  • 网站建设用户需求曲阳有没有做网站里
  • 免费的个人主页网站做网站的销售
  • 团购网站建设流程杭州网站建设官网蓝韵网络
  • 做个网站要花多少钱福州seo视频
  • 做网站会后期维护吗建网站的网站有哪些
  • 国外的一个大学生做的匿名社交网站深圳seo优化推广业务员
  • 营销型网站建设的价格珠海网站建设网络推广
  • 如何设置标签wordpress江苏网站seo设计
  • 省住房城乡建设厅网站广州现在可以正常出入吗
  • 网站安全漏洞扫描工具自己做网站如何盈利
  • 花溪村镇建设银行网站网站怎样做的有吸引力
  • 杭州网站设计我选柚v米科技搭建之星
  • 美橙网站建设学习教程哪些方法可以建设网站
  • 网站建设 李奥贝纳电子商务行业发展现状
  • 佛山住房和城乡建设部网站官网中国十大热门网站排名