问题描述 之前遇到过一个这样的问题:在服务A里的执行一个保存数据库方法,数据保存成功后会将主键ID通过MQ发送给服务B,服务B再根据主键ID去查询保存的数据,进行其他逻辑处理。后来发现,在服务B中根据MQ发送过来的ID通过数据库偶尔会查不到数据信息。
后来通过调试才发现,是因为在服务A里的保存方法加了事务注解,保存的数据结果只有在当前方法执行完成后才会对外生效,而MQ消息则是在保存方法执行前发送的,如果服务B在服务A中保存方法执行完成前就收到了MQ消息,就会导致上述问题发生。同样,如果在MQ发送成功后,保存方法发生了异常导致事务回滚,服务B也会查不到数据或者查询到错误的数据。
问题分析 导致上述问题发生的根本原因还是因为发送MQ消息是在加了事务回滚的方法内部执行的,通过该方法保存或更新的数据只有在整个方法结束后才会对外生效,而MQ的消费者却有可能于改方法执行完成前收到消息。因此,最直接的解决办法是要将MQ消息放到事务方法结束后再执行。
但是,由于在项目中有很多处都是采用上述的这种逻辑,一个个改起来比较麻烦,最好能有一个通用的方式能够尽量少改动之前的业务逻辑代码就能解决问题。
解决方法 对于这种通用业务的问题第一个想到的解决方法就是利用AOP:拦截所有带有事务回滚注解(@Transactional
)的方法,通过某种方式获取到该方法内部所有要执行的发送MQ的调用代码,让它们在事务方法执行成功后在执行。
示例代码 TransactionMessageAspect
继承TransactionSynchronizationAdapter
,实现对所有带有@Transactional
注解方法的拦截:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Aspect @Component public class TransactionMessageAspect extends TransactionSynchronizationAdapter { @Resource private TransactionInterceptorHandler transactionInterceptorHandler; @Before("@annotation(org.springframework.transaction.annotation.Transactional)") public void registerTransactionSyncrhonization () { TransactionSynchronizationManager.registerSynchronization(this ); transactionInterceptorHandler.signInTransaction(); } @Override public void beforeCommit (boolean readOnly) { System.out.println("before commit" ); } @Override public void afterCompletion (int status) { System.out.println("afterCompletion" ); try { if (status != STATUS_ROLLED_BACK && !CollectionUtils.isEmpty(transactionInterceptorHandler.getActions())) { for (Callable action : transactionInterceptorHandler.getActions()) { action.call(); } } } catch (Exception e) { } finally { transactionInterceptorHandler.clear(); } } @Override public void afterCommit () { System.out.println("afterCommit" ); } @Override public void suspend () { System.out.println("suspend" ); } @Override public void resume () { System.out.println("resume" ); } @Override public void flush () { System.out.println("flush" ); } @Override public void beforeCompletion () { System.out.println("beforeCompletion" ); } }
TransactionInterceptorHandler
:使用ThreadLocal
对当前线程中要执行的发送MQ方法进行缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Component public class TransactionInterceptorHandler { private ThreadLocal<Entity> cache = ThreadLocal.withInitial(() -> null ); public void clear () { cache.remove(); } public boolean hasTransaction () { Entity e = cache.get(); if (e == null ) { return false ; } return e.getInTransaction() != null && e.getInTransaction(); } public List<Callable> getActions () { Entity e = cache.get(); if (e == null ) { return Collections.emptyList(); } return e.getActions(); } public void signInTransaction () { Entity e= cache.get(); if (e == null ) { e = new Entity(); e.setInTransaction(true ); e.setActions(new ArrayList<>()); } cache.set(e); } public void addAction (Callable action) { Entity e = cache.get(); e.getActions().add(action); } @Data public class Entity { private List<Callable> actions; private Boolean inTransaction; } }
MqMessage: 发送MQ消息的封装类
1 2 3 4 5 6 7 8 9 @Component public class MqMessage implements BaseMessage { @Resource private TransactionInterceptorHandler transactionInterceptorHandler; @Override public void sendMessage (Object message) { System.out.println("[" + LocalDateTime.now() + "] sendMsg :" + JSON.toJSONString(message)); } }
将其更改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Component public class MqMessage implements BaseMessage { @Resource private TransactionInterceptorHandler transactionInterceptorHandler; @Override public void sendMessage (Object message) { if (transactionInterceptorHandler.hasTransaction()) { Callable<Object> callable = () -> doSendMessage(message); transactionInterceptorHandler.addAction(callable); }else { doSendMessage(message); } } private Object doSendMessage (Object message) { System.out.println("[" + LocalDateTime.now() + "] sendMsg :" + JSON.toJSONString(message)); return null ; } }
流程分析
TransactionMessageAspect
会拦截带有@Transactional
注解的方法,使用TransactionInterceptorHandler.signInTransaction()
标记当前方法已进入事务模式;
如果在执行事务方法的过程中,有调用MqMessage.sendMessage()
方法进行传递,会先将要发送的消息逻辑封装到Callable
中,并通过TransactionInterceptorHandler.addAction
保存在本地线程中;
当事务提交成功并没有回滚后再通过TransactionMessageAspect.afterCompletion()
方法执行保存在本地线程中要发送MQ的调用方法;
参考
Spring hibernate , how to call some method after transaction commit or transaction rollback
Creating a post commit when using transaction in Spring