重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这期内容当中小编将会给大家带来有关REST微服务中怎么利用消息中间件实现分布式事务,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
成都创新互联从2013年创立,先为下冶等服务建站,下冶等地企业,进行企业商务咨询服务。为下冶企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
我们还是使用之前的实例,一个订票系统的购票逻辑:
这篇教程的源代码可以从github上获取。
使用消息中间件实现分布式事务,也就是使用事件驱动实现。在这种方式下,Order服务不会直接调用User服务,而是往MQ上发一个消息,说明有新订单需要扣费;User服务会响应这个消息,并处理,处理完成后再发一个消息,说明有新订单需要转移票;然后就会有Ticket服务来处理。而每个服务都是在一个事务里面处理读消息、处理业务、写消息的事情。大致流程如下:
在这种方式下,订单的处理是异步的,用户发起一个订单的时候,只是生成一个正在处理的订单,然后通过消息中间件一步步的进行扣费、交票、完成订单等逻辑。而每一个服务中相应的操作,基本都是:
从一个队列中读取消息
操作相应数据库操作
往下一个队列中发送消息
也就是说,需要在这个方法中需要操作数据库和MQ两个资源,这正好是上一篇文章中介绍Spring内部事务和外部事务时使用的实例中的场景。下面就是大致的代码:
12345678 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);} |
它监听MQ的”order:new”队列,处理订单,往”order:need_to_pay”发送一个消息。然后用户服务就会接收这个消息,触发扣费流程。
在这个地方,我们可以使用JTA事务,来使用两阶段提交来实现两个资源的共同提交,但是这会影响系统的性能。而且,还需要使用的消息中间件实现了XA的规范,提供两阶段提交的功能。
这里也可以使用本地事务,这时,每个事物都会有一个JMS的Session,并使用事务。如此一来,就存在一个数据库的事物和一个JMS的事务,两个事务是相互独立并依次提交的。这样,就有可能在极少数情况下出错,但是也能采取一些错误来尽量解决。我们对上面的事务处理展开(伪代码,只是为了说明处理过程),来看看出错的情况以及该如何处理:
1234567891011 | jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();} |
在上面的方法中,只要发生了错误,MQ消息的消费就算失败,MQ的监听器就会重新触发一次这个方法。
这时,如果错误发生在:
数据提交时或之前。这时,整个数据库的操作都会被重置(也可能就根本还没更新),重试的时候不需要考虑重复提交的问题,因为之前的提交都已经被回滚。
数据库提交成功,但是JMS提交失败。这时就需要防止重复提交来避免数据库的重复操作。
我们可以采用之前说过的token方式,在调用这个方法前,生成一个唯一的token。这里使用Java的UUID生成一个ID作为token。(如果这里的重复调用只是在这个服务内部重新触发,就不需要考虑分布式系统的全局一致性ID的问题。这需要根据实际情况来判断用什么样的UUID生成方式)所以,Controller里面接受购票请求如下:
1234567 | @PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);} |
然后在Service里面监听这个队列,处理购票:
123456789101112131415 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());} |
简单来说,解决办法就是,如果是重复触发的,就略过数据库相关的处理,直接往MQ的目标队列发送需要的数据。使得整个流程能够往下走。
刚才说的是在一个服务内出错的情况,还有一种错误情况是,订单服务和用户服务已经处理完订单创建和扣费的操作,然后到了Ticket服务的时候,却发现没有票了。虽然我们可以通过合理的设计业务逻辑来避免这种问题,例如,在操作之前先检查用户余额,检查并锁票,然后进行操作数据的事情。但是,在有些情况下,很难通过业务流程的设计来完全避免这种问题。如果出现了这种的问题,我们也可以通过撤销的流程来实现,业务流程如下:
在上面的解决方案中,使用JDK的UUID类生成一个ID,实际上这个ID只是在当前的JVM内,才能够保证是唯一的。其次,在JMS的标准中,没有规定一个消息的Listener在读取一个消息失败后,重新读取的问题。在微服务环境中,如果一个应用部署了多个实例,那个这个消息有可能会被另一个实例读到。所以在上面的方案中,使用JVM内的唯一ID放在消息的内容中,它有可能被任意一个实例处理,处理失败后,又有可能被另一个实例处理。这就会出问题。所以我们需要一个分布式环境下的生成唯一ID的解决办法。例如,先获得JVM的唯一ID以后,再加上IP+端口等信息。而且,对已经处理过的ID的缓存,也需要存在分布式环境中。
所以,我们完全可以不使用两阶段提交,就实现微服务架构下的分布式事务。使用这种方式,它的优点是:
实现简单。结合Spring的事务,几乎不用写额外的事务相关的代码,就能够实现。我们只需要更好的服务的拆分和设计业务流程。
系统吞吐量高。因为数据库或MQ不会被长期的锁住,可以并发的处理更多的事务。
容错性好。各个服务之间通过MQ来触发协调,即使在处理一个任务的时候有一个服务停了,消息还会一直保持,直到服务起来开始监听,然后继续触发这个任务。
当然这种方式也有一些缺点,最大的问题就是异步处理的问题。用户发出一个请求后,处理该业务的服务只是简单处理,往MQ发送消息开始处理流程,然后就返回了。这时候这个任务还在处理。虽然有时候我们可以通过等的方式,等待最终处理完成的消息,然后在返回给用户。但是这样又得考虑响应时间、超时、各种错误等情况。
有些人会觉得这种方式使得开发和调试都变得复杂,在我看来,恰恰相反,这使得开发和调试都简单了。首先,根据微服务架构的设计原则,就是每个服务只负责一个功能模块;再者,根据面向对象的设计原则,一个方法只做一件事情。如果我们能够合理的拆分服务,和每一步的处理方法,这正是一个好的设计。在维护的时候,每个方法、每个步骤做什么事情,都很清楚。
说到调试,我的原则是,你应该通过单元测试来发现和解决问题,而不是调试。以上面的购票流程为例,每一个服务,通过MQ触发一个方法的时候,它的参数应该是什么、状态是什么都应该是明确的,这个方法执行完成后,会产生什么新的数据,状态会更新成什么,都应该是明确的。而这些都可以通过单元测试来很好的测试。如果你的复杂流程中的每一个都能通过单元测试进行完善的测试,那么这些方法串联到一起,不但能够很好的工作,也能应付各种异常的情况。
上述就是小编为大家分享的REST微服务中怎么利用消息中间件实现分布式事务了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。