当前位置: 首页 > news >正文

网站开发人员的行业分析信息流投放平台有哪些

网站开发人员的行业分析,信息流投放平台有哪些,茶叶网站建设费用明细,宿州做网站的有吗消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。 应用场景 比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。 由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤&a…

消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。

应用场景

比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。

由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤,所以使用消息队列来异步清理购物车是更合理的设计。

在这里插入图片描述

对于订单系统来说,它做了两件事情

  1. 在订单库中插入了一条订单数据,创建了订单;
  2. 给 MQ 发送了一条订单消息。

对于购物车系统来说,它做了一件事情

  1. 接收订单消息,删除购物车库中的商品,清理购物车。

在分布式系统中,上面的这几个步骤,都有可能失败,如果失败了不做处理的话,就会造成订单数据和购物车数据不一致的情况。

比如:

  1. 创建了订单,没有清理购物车;
  2. 购物车中的商品清掉了,订单没有创建成功。

所以,我们需要做的就是,要保证在任何步骤失败的情况下,订单数据和购物车数据的一致性。

对于购物车系统,失败的处理比较简单,只有成功删除商品后再提交消费确认,如果发生失败,因为没有提交消费确认,消息队列会重试。

所以,问题的重点在于,怎么保证订单系统创建订单和发送消息的步骤,要么都成功,要么都失败,不能一个成功一个失败。

分布式事务

消息队列是如何实现分布式事务的?就要用到事务消息了。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

在这里插入图片描述半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

在上面的步骤中,如果第 4 步提交事务消息失败了(比如网络异常),怎么办?

对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。

  • Kafka :简单粗暴,直接抛出异常,让用户自行处理。可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿;
  • RocketMQ:事务反查机制。

RocketMQ方案

在 RocketMQ 的分布式事务实现中,增加了事务反查机制来解决事务消息提交失败的问题。

如果订单系统在第 4 步提交或回滚事务消息失败(如网络异常),Broker 迟迟没有收到提交或回滚的消息,Broker 会定期去订单系统上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

所以,订单系统需要提供一个反查本地事务状态的接口,即根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

在这里插入图片描述

使用限制

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

使用建议

避免大量未决事务导致超时

RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
  • 程序能正确识别正在进行中的事务。

使用示例

创建事务主题

sh bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION

生产者代码

模拟正常流程,本地事务成功提交

public class ProducerTransactionExample {public static void main(String[] args) throws Exception {String endpoint = "182.92.198.60:8080";String topic = "TransactionTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);builder.setRequestTimeout(Duration.ofSeconds(20));ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).setTransactionChecker(messageView -> {System.out.println("5.broker回查事务状态");String orderId = messageView.getProperties().get("orderId");if (Strings.isNullOrEmpty(orderId)) {return TransactionResolution.ROLLBACK;}if (checkOrderById(orderId)) {System.out.println("7.本地事务状态成功,提交消息");return TransactionResolution.COMMIT;} else {System.out.println("7.本地事务状态失败,回滚消息");return TransactionResolution.ROLLBACK;}}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();System.out.println("1.开启事务");} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。System.out.println("1.事务开启失败");return;}// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("transaction").addProperty("orderId", "o10086")// 消息体。.setBody(("测试事务消息,订单号o10086").getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);System.out.println("2.半消息发送成功,messageId:" + sendReceipt.getMessageId());} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。System.out.println("2.半消息发送失败");return;}boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();System.out.println("4.commit事务消息");} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println("4.commit事务消息失败");}} else {try {transaction.rollback();System.out.println("4.rollback事务消息");} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println("4.rollback事务消息失败");}}}/*** 模拟本地事务的执行结果** @return*/private static boolean doLocalTransaction() {System.out.println("3.执行本地事务,处理中");try {TimeUnit.SECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3.执行本地事务成功,提交事务");return true;}/*** 模拟本地事务反查** @param orderId* @return*/private static boolean checkOrderById(String orderId) {System.out.println("6.反查本地事务状态,订单号:" + orderId + "能查到");return true;}
}

在这里插入图片描述
消费端在第4步后可以消费到消息。

在这里插入图片描述

模拟异常流程,将第4步提交/回滚的代码注释掉
在这里插入图片描述
消费端在第7步后可以消费到消息。

在这里插入图片描述

设置第一次事务回查时间
CHECK_IMMUNITY_TIME_IN_SECONDS 属性定义了从事务消息发送到 Broker 后,Broker 在多长时间内不会对这条消息发起回查。这个时间窗口为生产者提供了一个缓冲期,以确保即使在网络延迟或短暂的服务中断情况下,事务消息也不会被过早地回查。

Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("transaction").addProperty("orderId", "o10086").addProperty("CHECK_IMMUNITY_TIME_IN_SECONDS", "300")// 消息体。.setBody(("测试事务消息,订单号o10086").getBytes()).build();

在这里插入图片描述
消费端消费

在这里插入图片描述

http://www.yayakq.cn/news/531761/

相关文章:

  • 企业品牌网站源码wordpress页面都在
  • 用虚拟机做网站服务器吗公司名字大全免费查询
  • 正规网站建设公司哪个比较好装饰公司网站模板
  • 网站集群系统 如何做域名解析太原做网站设计
  • 无锡网站建设 微信网站建设公司宣传文案
  • 金州网站建设京东商城网站首页
  • 廊坊网站建设方案最新报价深圳网站制作公司兴田德润在哪里
  • 北京网站设计研究与开发公司东莞住房和城乡建设局网站
  • 做属于自己公司的网站wordpress 插件查看
  • 企业建设网站优势建设心理网站的背景
  • 网站的横幅怎么做的网络维护和故障维修
  • 网站建设需求分析要做的事wordpress集成微博登录
  • 模板支撑体系搭设规范网站首页关键词优化
  • 宁波网站建设设计公司信息wordpress 扫码支付
  • 可以制作网站的软件什么是专业网站
  • 最贵网站建设报价网页设计与制作教程的教学目标
  • 网站推广策略包括哪些内容网站建设补充协议模板
  • 网站建设用哪的图片不侵权wordpress支付文件在哪里设置密码
  • 全影网的网站哪儿做d网站建设市场价
  • 江门骏域网站建设巴中建设局网站
  • wordpress站点设置使用时间文化传媒建设网站
  • 常州建设工程交易网站沭阳奥体小区做网站
  • 邯郸市做网站的公司国外网站推广服务
  • 网站搭建团队放射科网站建设
  • 西安市网站建设网络营销是销售吗
  • 可以盗链图片的网站上google必须翻墙吗
  • 北京做网站个人专业软文发稿平台
  • 网站制作关键词普定县建设局网站
  • 宠物网站建设策划方案多语言外贸网站
  • 做影视会员网站学会网站建设三方协议