问题描述

之前遇到过一个这样的问题:在服务A里的执行一个保存数据库方法,数据保存成功后会将主键ID通过MQ发送给服务B,服务B再根据主键ID去查询保存的数据,进行其他逻辑处理。后来发现,在服务B中根据MQ发送过来的ID通过数据库偶尔会查不到数据信息。

后来通过调试才发现,是因为在服务A里的保存方法加了事务注解,保存的数据结果只有在当前方法执行完成后才会对外生效,而MQ消息则是在保存方法执行前发送的,如果服务B在服务A中保存方法执行完成前就收到了MQ消息,就会导致上述问题发生。同样,如果在MQ发送成功后,保存方法发生了异常导致事务回滚,服务B也会查不到数据或者查询到错误的数据。

问题分析

导致上述问题发生的根本原因还是因为发送MQ消息是在加了事务回滚的方法内部执行的,通过该方法保存或更新的数据只有在整个方法结束后才会对外生效,而MQ的消费者却有可能于改方法执行完成前收到消息。因此,最直接的解决办法是要将MQ消息放到事务方法结束后再执行。

但是,由于在项目中有很多处都是采用上述的这种逻辑,一个个改起来比较麻烦,最好能有一个通用的方式能够尽量少改动之前的业务逻辑代码就能解决问题。

解决方法

对于这种通用业务的问题第一个想到的解决方法就是利用AOP:拦截所有带有事务回滚注解(@Transactional)的方法,通过某种方式获取到该方法内部所有要执行的发送MQ的调用代码,让它们在事务方法执行成功后在执行。

示例代码

TransactionMessageAspect继承TransactionSynchronizationAdapter,实现对所有带有@Transactional注解方法的拦截:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Aspect
@Component
public class TransactionMessageAspect extends TransactionSynchronizationAdapter {

@Resource
private TransactionInterceptorHandler transactionInterceptorHandler;

/**
* 入口:拦截带有 @Transactional的方法,标记当前方法已进入事务模式
*/
@Before("@annotation(org.springframework.transaction.annotation.Transactional)")
public void registerTransactionSyncrhonization() {
TransactionSynchronizationManager.registerSynchronization(this);
transactionInterceptorHandler.signInTransaction();
}

@Override
public void beforeCommit(boolean readOnly) {
System.out.println("before commit");
}

/**
* 在事务结束并且没被回滚时再依次执行Callable方法
*
* @param status
*/
@Override
public void afterCompletion(int status) {
System.out.println("afterCompletion");
try {
if (status != STATUS_ROLLED_BACK && !CollectionUtils.isEmpty(transactionInterceptorHandler.getActions())) {
for (Callable action : transactionInterceptorHandler.getActions()) {
action.call();
}
}
} catch (Exception e) {

} finally {
transactionInterceptorHandler.clear();
}

}

@Override
public void afterCommit() {
System.out.println("afterCommit");
}


@Override
public void suspend() {
System.out.println("suspend");
}

@Override
public void resume() {
System.out.println("resume");
}

@Override
public void flush() {
System.out.println("flush");
}

@Override
public void beforeCompletion() {
System.out.println("beforeCompletion");
}

}

TransactionInterceptorHandler:使用ThreadLocal对当前线程中要执行的发送MQ方法进行缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Component
public class TransactionInterceptorHandler {
private ThreadLocal<Entity> cache = ThreadLocal.withInitial(() -> null);

public void clear() {
cache.remove();
}

public boolean hasTransaction() {
Entity e = cache.get();
if (e == null) {
return false;
}
return e.getInTransaction() != null && e.getInTransaction();
}

public List<Callable> getActions() {
Entity e = cache.get();
if (e == null) {
return Collections.emptyList();
}
return e.getActions();
}

public void signInTransaction() {
Entity e= cache.get();
if (e == null) {
e = new Entity();
e.setInTransaction(true);
e.setActions(new ArrayList<>());
}
cache.set(e);
}

public void addAction(Callable action) {
Entity e = cache.get();
e.getActions().add(action);
}

@Data
public class Entity {
private List<Callable> actions;
private Boolean inTransaction;
}
}

MqMessage: 发送MQ消息的封装类

1
2
3
4
5
6
7
8
9
@Component
public class MqMessage implements BaseMessage {
@Resource
private TransactionInterceptorHandler transactionInterceptorHandler;
@Override
public void sendMessage(Object message) {
System.out.println("[" + LocalDateTime.now() + "] sendMsg :" + JSON.toJSONString(message));
}
}

将其更改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class MqMessage implements BaseMessage {
@Resource
private TransactionInterceptorHandler transactionInterceptorHandler;
@Override
public void sendMessage(Object message) {

if (transactionInterceptorHandler.hasTransaction()) {
Callable<Object> callable = () -> doSendMessage(message);
transactionInterceptorHandler.addAction(callable);
}else {
doSendMessage(message);
}
}

private Object doSendMessage(Object message) {
System.out.println("[" + LocalDateTime.now() + "] sendMsg :" + JSON.toJSONString(message));
return null;
}
}

流程分析

  1. TransactionMessageAspect会拦截带有@Transactional注解的方法,使用TransactionInterceptorHandler.signInTransaction()标记当前方法已进入事务模式;
  2. 如果在执行事务方法的过程中,有调用MqMessage.sendMessage()方法进行传递,会先将要发送的消息逻辑封装到Callable中,并通过TransactionInterceptorHandler.addAction保存在本地线程中;
  3. 当事务提交成功并没有回滚后再通过TransactionMessageAspect.afterCompletion()方法执行保存在本地线程中要发送MQ的调用方法;

参考

  1. Spring hibernate , how to call some method after transaction commit or transaction rollback
  2. Creating a post commit when using transaction in Spring