博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rocketmq消息持久化
阅读量:7072 次
发布时间:2019-06-28

本文共 2124 字,大约阅读时间需要 7 分钟。

本文编写,参考:https://my.oschina.net/bieber/blog/725646

producer Send()的Message最终将由broker处理,处理类为:SendMessageProcessor ,处理方法:processRequet.

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List
consumeMessageHookList; public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController); } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {} 上述方法,并不是直接处理消息,而是交由MessageStore处理,相关代码如下:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); //...... PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); 然而MessageStore也不直接持久化消息,转交给 CommitLog
long beginTime = this.getSystemClock().now(); PutMessageResult result = this.commitLog.putMessages(messageExtBatch); 从MappedFileQueue中取出最新的一条:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();  //写消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);   //持久化到磁盘,最终通过FileChannel持久化到文件 handleDiskFlush(result, putMessageResult, messageExtBatch); handleHA(result, putMessageResult, messageExtBatch); 2.cousumer 从broker读消息。 消费者从broker读取消息经由PullMessageProcessor类处理的,processRequest()方法处理请求:
RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) 经过一系列的判断处理,之后交由 MessageStore:
final GetMessageResult getMessageResult =     this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),         requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); 读取消息。 之后交由commitLog,读出消息,
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
可以看到是先从ConsumerQueue中获取消息索引,然后再从commitlog中读取消息内容。这些内容也是在存储消息的时候写入的。
相关也可参考:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
 

转载于:https://www.cnblogs.com/itdev/p/7086322.html

你可能感兴趣的文章
python爬虫笔记-day5
查看>>
Jenkins+newman 控制台输出样式
查看>>
公司业务转型,IT基础架构也要转型,京东云Docker容器集群微服务实践
查看>>
解释try_files $uri $uri/ /index.php$is_args$args;
查看>>
营销圈也可以提供类似“不涂口红的你”的创意文案?
查看>>
【源码分享】短信验证码功能对接CmsEasy
查看>>
学习linux入门之top命令的用法介绍
查看>>
MySQL的基础分部
查看>>
aix knowlgdgecenter
查看>>
好程序员分享JavaScript事件委托代理和函数封装详解
查看>>
VMWARE 占用硬盘空间只增大不减少的清理办法
查看>>
oracle技术之系统触发器的应用顺序(三)
查看>>
oracle RMAN备份FORMAT格式中%a的含义
查看>>
Oracle11gr2数据泵新特性(四)
查看>>
Oracle 11g r2数据泵新特性(一)
查看>>
我的友情链接
查看>>
iftop的安装及使用
查看>>
redis学习笔记之发布订阅
查看>>
电商工作之外的学习途径
查看>>
python 之简单扯一扯time模块
查看>>