消息复制
传统的复制方式
复制的基本单位是 Broker,即服务端的进程。复制采用的也是主从方式,通常情况下配置成一主一从,也支持一主多从。
提供了两种复制方式,一种是异步复制,消息先发送到主节点上,就返回“写入成功”,然后消息再异步复制到从节点上。另外一种方式是同步双写,消息同步双写到主从节点上,主从都写成功,才返回“写入成功”。
Broker 的主从关系是通过配置固定的,不支持动态切换。如果主节点宕机,生产者就不能再生产消息了,消费者可以自动切换到从节点继续进行消费。即使有一些消息没有来得及复制到从节点上,这些消息依然在主节点的磁盘上,除非是主节点的磁盘坏了,否则等主节点重新恢复服务的时候,这些消息依然可以继续复制到从节点上,也可以继续消费,不会丢消息,消息的顺序也是没有问题的。
这种主从复制方式,牺牲了可用性,换取了比较好的性能和数据一致性。
支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。在这种复制模式下,严格顺序和高可用只能选择一个。
Dledger
在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且支持通过选举来动态切换主节点的。
但选举过程中不能提供服务。最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可用,如果 2 个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低。
另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快。
NameServer
一个独立的进程,为 Broker、生产者和消费者提供服务。最主要的功能是为客户端提供寻址服务,协助客户端找到主题对应的 Broker 地址。此外,还负责监控每个 Broker 的存活状态。
支持只部署一个节点,也支持部署多个节点组成一个集群,这样可以避免单点故障。在集群模式下,NameServer 各节点之间是不需要任何通信的,也不会通过任何方式互相感知,每个节点都可以独立提供全部服务。
每个 Broker 都需要和所有的 NameServer 节点进行通信。当 Broker 保存的 Topic 信息发生变化的时候,它会主动通知所有的 NameServer 更新路由信息,为了保证数据一致性,Broker 还会定时给所有的 NameServer 节点上报路由信息。这个上报路由信息的RPC 请求,同时起到 Broker 与 NameServer 之间的心跳作用,NameServer 依靠这个心跳来确定 Broker 的健康状态。
因为每个 NameServer 节点都可以独立提供完整的服务,对于生产者和消费者,只需要选择任意一个 NameServer 节点来查询路由信息就可以了。在生产或消费某个主题的消息之前,会先从 NameServer 上查询这个主题的路由信息,然后根据路由信息获取到当前主题和队列对应的 Broker 物理地址,再连接到 Broker 节点上进行生产或消费。
如果 NameServer 检测到与 Broker 的连接中断了,会认为这个 Broker 不再能提供服务,从路由信息中移除掉,避免客户端连接到一个不可用的 Broker 上去。而客户端在与 Broker 通信失败之后,会重新去NameServer 上拉取路由信息,然后连接到其他 Broker 上继续生产或消费消息,实现了自动切换失效 Broker 的功能。
事务
适用于解决本地事务和发消息的数据一致性问题。
确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且,增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。
选举
在 Dledger 中的 Leader 真的是通过投票选举出来的,所以不需要借助于任何外部的系统,仅靠集群的节点间投票来达成一致,选举出Leader。为了保证数据一致性、避免集群分裂,采用的是 Raft 一致性算法。
相比于Kafka的抢占式选举,优点是不需要借助外部系统,完全可以实现自我选举。缺点也非常明显,就是算法实在是太复杂了,非常难实现。并且,往往集群中的节点要通过多轮投票才能达成一致,这个选举过程是比较慢的,一次选举耗时几秒甚至几十秒都有可能。
消息存储
以 Broker 为单位。分为消息文件和索引文件,每个 Broker 只有一组消息文件,这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件称为 ConsumerQueue。索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。
写入消息时,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 * 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。这里两次寻址都是绝对位置寻址,比 Kafka 的查找要快。
优点
- 在写入时,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。
- 采用了稠密索引,查找性能更好。
- 在一个 Broker 上有上千个活动主题的情况下,写入性能就会体现出优势。
缺点
- 以 Broker 为单位,较粗的粒度牺牲了灵活性。
- 稠密索引需要更多的存储空间。
源码分析
https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-5.3.2.tar.gz
Producer
启动过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
// 创建了一个 DefaultMQProducer 的实例
producer = new DefaultMQProducer(producerGroupTemp);
// 为它初始化一些参数
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
zeroMsg = new Message(topic, new byte[] {});
bigMessage = new Message(topic, "This is a very huge message!".getBytes());
// 调用 start 方法启动它
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public void start(final boolean startFactory) throws MQClientException {
// 记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。
// 状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。
// 与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为
switch (this.serviceState) {
case CREATE_JUST: // 中间状态
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 通过一个单例模式(Singleton Pattern)的 MQClientManager 获取MQClientInstance 的实例 mQClientFactory,没有则自动创建新的实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
defaultMQProducer.initProduceAccumulator();
// 在 mQClientFactory 中注册自己
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动 mQClientFactory
if (startFactory) {
mQClientFactory.start();
}
this.initTopicRoute();
this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 将这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。
// 比如,只有处于 CREATE_JUST 状态才能转换为 RUNNING 状态,这样就可以确保这个服务是一次性的,只能启动一次。从而避免了多次启动服务而导致的各种问题。
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING: // 正常状态
case START_FAILED: // 异常状态
case SHUTDOWN_ALREADY: // 正常状态
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 给所有 Broker 发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 启动请求响应通道
// 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法
this.mQClientAPIImpl.start();
// Start various schedule tasks
// 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务
this.startScheduledTask();
// Start pull service
// 启动拉消息服务
this.pullMessageService.start();
// Start rebalance service
// 启动 Rebalance 服务
this.rebalanceService.start();
// Start push service
// 启动 Producer 服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
- DefaultMQProducerImpl:Producer 的内部实现类,包含大部分 Producer 的业务逻辑,即发消息的逻辑。
- MQClientInstance:这个类中封装了客户端一些通用的业务逻辑,无论是 Producer 还是 Consumer,最终需要与服务端交互时,都需要调用这个类中的方法;
- MQClientAPIImpl:这个类中封装了客户端服务端的 RPC,对调用者隐藏了真正网络通信部分的具体实现;
- NettyRemotingClient:RocketMQ 各进程之间网络通信的底层实现类。
消息发送过程
在 Producer 的接口 MQProducer 中,定义了 19 个不同参数的发消息的方法,按照发送方式不同可以分成三类:
- 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
- 同步发送(Sync):发送消息后等待响应;
- 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。 这三类发送实现基本上是相同的,异步发送稍微有一点儿区别。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} catch (Exception e) {
newCallBack.onException(e);
}
} else {
newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
}
};
executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}
public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
// 使用了一个 ExecutorService 来实现异步发送:使用asyncSenderExecutor 的线程池,异步调用方法 sendSelectImpl(),继续发送消息的后续工作
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumAcquired = false;
boolean isSemaphoreAsyncSizeAcquired = false;
int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
sendCallback.msgLen = msgLen;
try {
if (isEnableBackpressureForAsyncMode) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumAcquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
if (!isSemaphoreAsyncNumAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}
defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
if (!isSemaphoreAsyncSizeAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
// 当前线程把发送任务提交给 asyncSenderExecutor
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
runnable.run();
} else {
throw new MQClientException("executor rejected ", e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 选择将消息发送到哪个队列(Queue)中
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
// 选择哪个队列发送由 MessageQueueSelector#select 方法决定。
// 使用了策略模式(Strategy Pattern),来解决不同场景下需要使用不同的队列选择算法问题。
// 策略模式:定义一系列算法,将每一个算法封装起来,并让它们可以相互替换。策略模式让算法独立于使用它的客户而变化。
// 提供了很多 MessageQueueSelector 的实现,例如随机选择策略,哈希选择策略和同机房选择策略等
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
// 发送消息
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
// 上下文
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// 构建发送消息的头 RequestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setBrokerName(brokerName);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using compressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 将消息发送给队列所在的 Broker
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException | InterruptedException | MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}
Producer 整个发消息的流程,无论是同步发送还是异步发送,都统一到了同一个流程中。包括异步发送消息的实现,实际上也是通过一个线程池,在异步线程执行的调用和同步发送相同的底层方法来实现的。
NameServer
总体结构
- NamesrvStartup:程序入口。
- NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
- RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息,这些路由信息都是保存在内存中,并且没有持久化。
- BrokerHousekeepingService:监控 Broker 连接状态的代理类。
- DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
- ClusterTestRequestProcessor:用于测试的请求处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
public class RouteInfoManager {
// ...
// 保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。
// 这个 brokerName 并不真正是某个 Broker 的物理地址,它对应的一组 Broker 节点,包括一个主节点和若干个从节点。
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// 保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示。
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// 保存的是集群名称与 BrokerName 的对应关系。
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 保存了每个 Broker 当前的动态信息,包括心跳更新时间,路由数据版本等等。
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 保存了每个 Broker 对应的消息过滤服务的地址,用于服务端消息过滤。
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// 用于管理 Topic 与队列映射关系,主要服务于跨 Broker 队列的动态分配与一致性路由。
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
// ...
}
// remoting/src/main/java/org/apache/rocketmq/remoting/protocol/route/BrokerData.java
/**
* The class describes that a typical broker cluster's (in replication) details: the cluster (in sharding) name
* that it belongs to, and all the single instance information for this cluster.
*/
public class BrokerData implements Comparable<BrokerData> {
// 集群名称
private String cluster;
private String brokerName;
/**
* The container that store the all single instances for the current broker replication cluster.
* The key is the brokerId, and the value is the address of the single broker instance.
*/
// 保存 Broker 物理地址的Map:brokerAddrs,它的 Key 是 BrokerID,Value 就是这个 BrokerID 对应的 Broker的物理地址。
private HashMap<Long, String> brokerAddrs;
}
Broker 注册的路由信息处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@Override
// 处理 Request 的路由分发器
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
// 根据 request.getCode() 来分发请求到对应的处理器中
switch (request.getCode()) {
// ...
case RequestCode.REGISTER_BROKER:
// Broker 发给 NameServer 注册请求的 Code 为 REGISTER_BROKER,
// 在代码中根据 Broker 的版本号不同,分别有两个不同的处理实现方法:
// “extractRegisterBrokerBodyFromRequest”和"extractRegisterTopicConfigFromRequest"。
return this.registerBroker(ctx, request);
// ...
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
}
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
// 根据 Broker 请求过来的路由信息,依次对比并更新 clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable和 filterServerTable 这 5 个保存集群信息和路由信息的 Map 对象中的数据。
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
// 加写锁,防止并发修改数据
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
// 更新 clusterAddrTable
Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
// // 更新 brokerAddrTable
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true; // 标识需要先注
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
boolean isMinBrokerIdChanged = false;
long prevMinBrokerId = 0;
if (!brokerAddrsMap.isEmpty()) {
prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
}
if (brokerId < prevMinBrokerId) {
isMinBrokerIdChanged = true;
}
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 更新 brokerAddrTable 中的 brokerData
brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
//If Local brokerId stateVersion bigger than the registering one,
String oldBrokerAddr = brokerAddrsMap.get(brokerId);
if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
if (null != oldBrokerInfo) {
long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registering Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
return result;
}
}
}
if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}
// 如果是新注册的 Master Broker,或者 Broker 中的路由信息变了,需要更新topicQueueMappingInfoTable
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
// Delete the topics that don't exist in tcTable from the current broker
// Static topic is not supported currently
if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
final Set<String> newTopicSet = tcTable.keySet();
final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
for (final String toDeleteTopic : toDeleteTopics) {
Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
final QueueData removedQD = queueDataMap.remove(brokerName);
if (removedQD != null) {
log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
}
if (queueDataMap.isEmpty()) {
log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
topicQueueTable.remove(toDeleteTopic);
}
}
}
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
// In Slave Acting Master mode, Namesrv will regard the surviving Slave with the smallest brokerId as the "agent" Master, and modify the brokerPermission to read-only.
if (isPrimeSlave && brokerData.isEnableActingMaster()) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
//the topicQueueMappingInfoMap should never be null, but can be empty
for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
}
//Note asset brokerName equal entry.getValue().getBname()
//here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
}
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
// 更新 brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
// 更新 filterServerTable
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddrInfo);
} else {
this.filterServerTable.put(brokerAddrInfo, filterServerList);
}
}
// 如果是 Slave Broker,需要在返回的信息中带上 master 的相关信息
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
if (masterLiveInfo != null) {
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
} finally {
// 释放写锁
this.lock.writeLock().unlock();
}
return result;
}
客户端寻找Broker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// remoting/src/main/java/org/apache/rocketmq/remoting/protocol/route/TopicRouteData.java
// 客户端在启动后,会启动一个定时器,定期从 NameServer 上拉取相关主题的路由信息,然后缓存在本地内存中,在需要的时候使用。
// 每个主题的路由信息用一个 TopicRouteData 对象来表示
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
/*
客户端选定了队列后,可以在对应的 QueueData 中找到对应的BrokerName,
然后用这个 BrokerName 找到对应的 BrokerData 对象,最终找到对应的Master Broker 的物理地址
*/
// 保存了主题中的所有队列信息
private List<QueueData> queueDatas;
// 保存了主题相关的所有 Broker 信息
private List<BrokerData> brokerDatas;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
// 通过路由分发器将请求分发的对应的处理方法中
public TopicRouteData pickupTopicRouteData(final String topic) {
// 初始化返回数据 topicRouteData
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
List<BrokerData> brokerDataList = new LinkedList<>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
// 加读锁
this.lock.readLock().lockInterruptibly();
// 先获取主题对应的队列信息
Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
if (queueDataMap != null) {
// 把队列信息返回值中
topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
foundQueueData = true;
Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());
// 遍历这些 BrokerName,找到对应的 BrokerData,并写入返回结果中
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
continue;
}
BrokerData brokerDataClone = new BrokerData(brokerData);
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
if (filterServerTable.isEmpty()) {
continue;
}
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
} finally {
// 释放读锁
this.lock.readLock().unlock();
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
// 将 Topic 的队列映射信息(topicQueueMappingInfoTable)注入路由数据,供客户端识别队列分布规则。
topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
// 若 NameServer 配置未启用 Acting Master 功能,直接返回原始路由数据。
if (!namesrvConfig.isSupportActingMaster()) {
return topicRouteData;
}
// 跳过以 SYNC_BROKER_MEMBER_GROUP_PREFIX 开头的内部 Topic(如 Broker 成员组同步 Topic),避免干扰系统内部通信。
if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
return topicRouteData;
}
if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
return topicRouteData;
}
boolean needActingMaster = false;
// 遍历所有 Broker,若某 Broker 组有存活的节点但无 Master,则标记需处理。
for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (brokerData.getBrokerAddrs().size() != 0
&& !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
needActingMaster = true;
break;
}
}
if (!needActingMaster) {
return topicRouteData;
}
// 动态设置 Acting Master
for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
continue;
}
// No master
for (final QueueData queueData : topicRouteData.getQueueDatas()) {
if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
if (!PermName.isWriteable(queueData.getPerm())) {
final Long minBrokerId = Collections.min(brokerAddrs.keySet());
final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
}
break;
}
}
}
return topicRouteData;
}
return null;
}
事务
- executeLocalTransaction:执行本地事务。
- checkLocalTransaction:反查本地事务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 这里给消息添加了属性,标明这是一个事务消息,也就是半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 调用发送普通消息的方法,发送这条半消息
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 执行本地事务
if (null != localTransactionListener) {
localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
} else {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return: {} messageTopic: {} transactionId: {} tag: {} key: {}",
localTransactionState, msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys());
}
} catch (Throwable e) {
log.error("executeLocalTransactionBranch exception, messageTopic: {} transactionId: {} tag: {} key: {}",
msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys(), e);
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 根据事务消息和本地事务的执行结果 localTransactionState,决定提交或回滚事务消息
// 这里给 Broker 发送提交或回滚事务的 RPC 请求。
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
String uniqId = msgInner.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqId != null && !uniqId.isEmpty()) {
MessageAccessor.putProperty(msgInner, TransactionalMessageUtil.TRANSACTION_ID, uniqId);
}
// 记录消息的主题和队列,到新的属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0
// 并没有把半消息保存到消息中客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0。
// 这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。F
// 保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}