您当前的位置:首页 > 电脑百科 > 数据库 > 百科

彻底弄懂RocketMQ文件存储(附代码实现)

时间:2022-12-13 14:45:56  来源:今日头条  作者:yanlei123study

前言

RocketMQ作为一款优秀的开源消息中间件,实现了文件的高性能存储和读取,在众多消息中间件中脱颖而出,其文件模块设计思想很值得我们学习和借鉴。因此很多开发者在使用的时候,也开始研究其文件存储的实现原理,但是在学习过程中,由于自身知识储备不足,往往只能了解其基本原理和整体架构,对于具体是怎么实现是,用到了什么技术,往往是一知半解。目前网上有很多介绍RocketMQ原理和源码的文章,但是很多都是讲解整体架构,对源代码的分析也仅仅是停留在代码注释层面,导致对整体和细节的把握不能统一, 给人一种"不识庐山真面目,只缘身在此山中"的感觉。

笔者针对开发者在研究RocketMQ的过程中遇到的困惑,基于对RocketMQ的文件存储原理和源码研究,结合JAVA NIO的文件读写,自己动手实现了一个简化版本的RocketMQ文件系统,分享出来,希望能抽丝剥茧,帮助开发者从本质上理解RocketMQ文件存储的原理,起到抛砖引玉,举一反三的作用。

本文不是一篇介绍RocketMQ存储基本原理的文章,本文假设读者对RocketMQ的CommitLog,ConsumeQueue,IndexFile已经有一定的了解,熟悉java NIO文件读写。本文适合对RocketMQ的文件存储原理有一定的了解,并且希望进一步了解RocketMQ是如何通过java NIO实现的读者。

 

核心原理

在向commitLog文件写入消息的时候,需要记录该条消息在commitLog文件的偏移量offset(消息在commitLog的起始字节数),读取的时候根据offset读取。RocketMQ保存offset的文件为consumeQueue 和indexFile。

 

RockeetMQ文件读写流程

 

RocketMQ文件存储示意图

RocketMQ文件逻辑存储结构


RocketMQ文件offset查找示意图

 

CommitLog读写

commitLog文件写入的是完整的消息,长度不固定,因此读取的时候只能根据文件存储偏移量offset读取。实际上offset保存在consumeQueue,indexFile文件中。

consumeQueue读写

consumeQueue在消费方拉取消息的时候读取,读取原理比较简单。

consumeQueue每条数据固定长度是20(8:offset+4:msgLen+8:tagCode),顺序写入,每写入一条消息,写入位置postition+20。读取的时候按消息序号index(第几条消息)读取。

假设消费方要消费消息序号index=2的消息(第2条消息),过程如下:

1.定位consumeQueue文件,然后读位置postition定位到40(2*20),读取数据。

2.根据1读取 的数据取到offset值(存储在consumeQueue的偏移量)。

3.根据2得到的offset值,定位commitLog文件,然后读取commitLog上的整条消息。

参见RocketMQ文件offset查找示意图

indexFile读写

indexFile由indexHead(长度40),500W个hash槽位(每个槽位长度固定4),2000W个indexData组成。

indexFile是为了方便通过messageId读取消息而设计的,因此需要将messageId和消息序号index做一层映射,将messageId取模后得到槽位下标(第几个槽位),然后将当前messageId对应的消息index(消息序号)放到对应的槽位,并将数据顺序保存到indexFile的indexData部分。

写入过程:

1.hash(messageId)%500W得到槽位(slot)的下标slot_index(第几个槽位,槽位长度固定4),

然后将消息序号index存放到对应的槽位(为简化设计,暂不考虑hash冲突的情况)。

2.存储indexData数据,起始存储位置postition 为

indexDataOffset = 40(文件头长度) + 500W * 4+(index-1)*20

读取过程:

1.hash(messageId) % 500W定位到槽位的下标slot_index(第几个槽位)。

2.然后根据槽位下标计算槽位的偏移量slot_offset(每个槽位的固定长度 是4)。

slot_offset = 40(文件头长度) + slot_index * 4。

3.然后根据slot_offset获取到槽位上存储的消息的序号index。

4.根据消息的index计算该条消息存储在indexFile的indexData部分的偏移量indexDataOffset,

indexDataOffset = 40(文件头长度) + 500W * 4+( index - 1 ) * 20

5.根据indexDataOffset读取indexFile的IndexData部分,然后获取commitLog的offset,即可读取到实际的消息。

参见RocketMQ文件offset查找示意图

 

代码实现

1.手动生成10个消息,并创建commitLog文件,consumeQueue,indexFile文件

public class CommitLogWriteTest {

    private static Long commitLogOffset = 0L;//8byte(commitlog offset)
    private static List<ConsumerQueueData> consumerQueueDatas = new ArrayList<>();
    private static List<IndexFileItemData> indexFileItemDatas = new ArrayList<>();
    private static int MESSAGE_COUNT = 10;

    public static void mAIn(String[] args) throws IOException {
        createCommitLog();
        createConsumerQueue();
        createIndexFile();
    }

    private static void createCommitLog() throws IOException {
         System.out.println("");
        System.out.println("commitLog file create!" );

        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MAppedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close();
        Random random = new Random();

        int count = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String topic = "Topic-test";
            String msgId = UUID.randomUUID().toString();
            String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48) + 1);//
            long queueOffset = i;//索引偏移量
            String transactionId = UUID.randomUUID().toString();


         /* 数据格式,位置固定
         int totalSize;//消息长度
         String msgId;
         String topic;
         long queueOffset;//索引偏移量
         long bodySize;//消息长度
         byte[] body;//消息内容
         String transactionId;
         long commitLogOffset;//从第一个文件开始算的偏移量

         */

            int msgTotalLen = 8 //msgTotalLen field
                    + 64  //msgId field长度
                    + 64 //topic field长度
                    + 8 //索引偏移量field长度
                    + 8 //消息长度field长度
                    + msgBody.getBytes(StandardCharsets.UTF_8).length //field
                    + 64  //transactionId field长度
                    + 64  //commitLogOffset field长度;
                    ;

            // 定位写入文件的起始位置
            //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300
            mappedByteBuffer.position(Integer.valueOf(commitLogOffset + ""));

            mappedByteBuffer.putLong(msgTotalLen);//msgTotalLen
            mappedByteBuffer.put(getBytes(msgId, 64));//msgId
            mappedByteBuffer.put(getBytes(topic, 64));//topic,定长64
            mappedByteBuffer.putLong(queueOffset);//索引偏移量
            mappedByteBuffer.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            mappedByteBuffer.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            mappedByteBuffer.put(getBytes(transactionId, 64));
            mappedByteBuffer.putLong(commitLogOffset);//commitLogOffset
           

            System.out.println("写入消息,第:" + i + "次");

            System.out.println("msgTotalLen:" + msgTotalLen);
            System.out.println("msgId:" + msgId);
            System.out.println("topic:" + topic);
            System.out.println("msgBody:" + msgBody);
            System.out.println("transactionId:" + transactionId);
            System.out.println("commitLogOffset:" + commitLogOffset);

            ConsumerQueueData consumerQueueData = new ConsumerQueueData();
            consumerQueueData.setOffset(commitLogOffset);
            consumerQueueData.setMsgLength(msgTotalLen);
            consumerQueueData.setTagCode(100L);
            
          //准备生成consumeQueue文件
            consumerQueueDatas.add(consumerQueueData);
            IndexFileItemData indexFileItemData = new IndexFileItemData();
            indexFileItemData.setKeyHash(msgId.hashCode());
            indexFileItemData.setMessageId(msgId);
            indexFileItemData.setPhyOffset(commitLogOffset);
            //准备生成indexFile文件
            indexFileItemDatas.add(indexFileItemData);
            mappedByteBuffer.force();
          
            commitLogOffset = msgTotalLen + commitLogOffset;
             count++;
        }

        System.out.println("commitLog数据保存完成,totalSize:" + count);

    }


    public static void createConsumerQueue() throws IOException {
        System.out.println("");
        System.out.println("ConsumerQueue file create!" );

        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        int count = 0;
        for (int i = 0; i < consumerQueueDatas.size(); i++) {
            ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
            //指定写入位置
            mappedByteBuffer.position(i * 20);
            mappedByteBuffer.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)
            mappedByteBuffer.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)
            mappedByteBuffer.putLong(consumerQueueData.getTagCode());//8byte (tagCode)

            count++;
            System.out.println("consumerQueue数据写入完成:" + JSON.toJSONString(consumerQueueData));
            mappedByteBuffer.force();

        }
        System.out.println("ConsumerQueue数据保存完成count:" + count);


    }


    public static void createIndexFile() throws IOException {
        System.out.println("");
        System.out.println("IndexFile file create!" );

        //文件场创建时间,在写第一条消息的时候创建
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
        long firstDataTime = System.currentTimeMillis();

        fileChannel.close();

        //开始写hash槽,从头部后写入
        /*  已经填充有index的slot数量
          (并不是每个slot槽下都挂载有index索引单元,这 里统计的是所有挂载了index索引单元的slot槽的数量,hash冲突)*/
        int hashSlotCount = 0;

        /* 已该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和),
        如果没有hash冲突,hashSlotCount = indexCount*/
        int indexCount = 0;
        //假设建立100个槽位(总长度400)
        int soltNum = 100;

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
            int keyHash = indexFileItemData.getKeyHash();

            //取模,计算第几个槽位
            int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);

            // slot存放的文件偏移量(字节长度)
            int absSlotPos = 40 + slotPos * 4;

            // 存储实际数据的文件偏移量(字节长度)
            int absIndexPos =
                    40 + soltNum * 4
                            + indexCount * 20;


            //将indexCount存到对应的hash槽
            mappedByteBuffer.putInt(absSlotPos, indexCount);

            //写入数据(IndecFile的实际数据部分)
            mappedByteBuffer.putInt(absIndexPos, indexFileItemData.getKeyHash());//8byte msg hashcode
            mappedByteBuffer.putLong(absIndexPos + 4, indexFileItemData.getPhyOffset());//8byte msg hashcode
            mappedByteBuffer.putInt(absIndexPos + 4 + 8, Integer.valueOf((System.currentTimeMillis() - firstDataTime) + ""));//8byte (timeDiff)
            mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, 0);//8byte (preIndex)暂不考虑hash冲突的情况


            //模拟最后一个文件,写入header
            if (i == 0) {
                //该indexFile中第一条消息的存储时间
                headerByteBuffer.putLong(0, firstDataTime);
                //该indexFile种第一条消息在commitlog种的偏移量commitlog offset
                mappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
            }
            //模拟第一个文件,写入header
            if (i == MESSAGE_COUNT - 1) {
                //该indexFile种最后一条消息存储时间
                headerByteBuffer.putLong(8, System.currentTimeMillis());
                //该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
                headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
            }
            //已经填充有index的slot数量
            headerByteBuffer.putInt(32, hashSlotCount + 1);
            //该indexFile中包含的索引单元个数
            headerByteBuffer.putInt(36, indexCount + 1);
            mappedByteBuffer.force();
            System.out.println("msgId:" + indexFileItemData.getMessageId() + ",keyHash:" + keyHash + ",保存槽位为" + slotPos + "的数据,absSlotPos=" + absSlotPos + ",值index=" + indexCount + ",绝对位置:" + absIndexPos + ",commit-phyOffset:" + indexFileItemData.getPhyOffset());

            indexCount++;
            hashSlotCount++;

        }

    }


    //将变长字符串定长byte[],方便读取
    private static byte[] getBytes(String s, int length) {
        int fixLength = length - s.getBytes().length;
        if (s.getBytes().length < length) {
            byte[] S_bytes = new byte[length];
            System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);
            for (int x = length - fixLength; x < length; x++) {
                S_bytes[x] = 0x00;
            }
            return S_bytes;
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }

}

运行结果:

commitLog file create!
写入消息,第:0次
msgTotalLen:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d
commitLogOffset:0
写入消息,第:1次
msgTotalLen:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c
commitLogOffset:338
写入消息,第:2次
msgTotalLen:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd
topic:Topic-test
msgBody:消息内容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5
commitLogOffset:676
写入消息,第:3次
msgTotalLen:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356
topic:Topic-test
msgBody:消息内容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44
commitLogOffset:972
写入消息,第:4次
msgTotalLen:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d
commitLogOffset:1271
写入消息,第:5次
msgTotalLen:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579
commitLogOffset:1577
写入消息,第:6次
msgTotalLen:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd
commitLogOffset:1890
写入消息,第:7次
msgTotalLen:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897
commitLogOffset:2211
写入消息,第:8次
msgTotalLen:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724
commitLogOffset:2529
写入消息,第:9次
msgTotalLen:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb
commitLogOffset:2834
commitLog数据保存完成,totalSize:10

ConsumerQueue file create!
consumerQueue数据写入完成:{"msgLength":338,"offset":0,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":338,"offset":338,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":296,"offset":676,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":299,"offset":972,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":306,"offset":1271,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":313,"offset":1577,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":321,"offset":1890,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":318,"offset":2211,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":305,"offset":2529,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":329,"offset":2834,"tagCode":100}
ConsumerQueue数据保存完成count:10

IndexFile file create!
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05,keyHash:-358470777,保存槽位为77的数据,absSlotPos=348,值index=0,绝对位置:440,commit-phyOffset:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e,keyHash:466366793,保存槽位为93的数据,absSlotPos=412,值index=1,绝对位置:460,commit-phyOffset:676
msgId:a0c7c833-9811-4f17-800b-847766aef7dd,keyHash:1237522456,保存槽位为56的数据,absSlotPos=264,值index=2,绝对位置:480,commit-phyOffset:972
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356,keyHash:-1115509881,保存槽位为81的数据,absSlotPos=364,值index=3,绝对位置:500,commit-phyOffset:1271
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5,keyHash:1219778974,保存槽位为74的数据,absSlotPos=336,值index=4,绝对位置:520,commit-phyOffset:1577
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1,keyHash:460184183,保存槽位为83的数据,absSlotPos=372,值index=5,绝对位置:540,commit-phyOffset:1890
msgId:05be27f8-fb7a-4662-904f-2263e8899086,keyHash:-339624012,保存槽位为12的数据,absSlotPos=88,值index=6,绝对位置:560,commit-phyOffset:2211
msgId:9a508d90-30f6-4a25-812f-25d750736afe,keyHash:403329587,保存槽位为87的数据,absSlotPos=388,值index=7,绝对位置:580,commit-phyOffset:2529
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be,keyHash:-1569335572,保存槽位为72的数据,absSlotPos=328,值index=8,绝对位置:600,commit-phyOffset:2834
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489,keyHash:597856342,保存槽位为42的数据,absSlotPos=208,值index=9,绝对位置:620,commit-phyOffset:3163

 

2.读取consumeQueue文件,并根据offset从commitLog读取一条完整的消息

public class ConsumeQueueMessageReadTest {

    public static MappedByteBuffer mappedByteBuffer = null;
    private static int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close();
        //根据索引下标读取索引,实际情况是用户消费的最新点位(for循环的i值),
      //存在在broker的偏移量文件中
       
        int index = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            mappedByteBuffer.position(i * 20);

            long commitlogOffset = mappedByteBuffer.getLong();
            // System.out.println(commitlogOffset);
            long msgLen = mappedByteBuffer.getInt();
            Long tag = mappedByteBuffer.getLong();
            //System.out.println("======读取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"===");
            //根据偏移量读取CommitLog
            System.out.println("=================commitlog读取第:"+index+"消息,偏移量为" + commitlogOffset + "===================");

            readCommitLogByOffset(Integer.valueOf(commitlogOffset + ""));
            index ++;

        }
    }


    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);

            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }

        return mappedByteBuffer;

    }


    /*
     *
     * 根据偏移量读取commitLog
     * */
    public static void readCommitLogByOffset(int offset) throws IOException {

        /* 存放顺序,读到时候保持顺序一致
           b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定长64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//commitLogOffset
        */

        MappedByteBuffer mappedByteBuffer = initFileChannel();
        mappedByteBuffer.position(offset);


        long totalSize = mappedByteBuffer.getLong();//消息长度

        byte[] msgIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(msgIdByte);

        byte[] topicByte = new byte[64];// 固定是64
        mappedByteBuffer.get(topicByte);
        long queueOffset = mappedByteBuffer.getLong();
        Long bodySize = mappedByteBuffer.getLong();
        int bSize = Integer.valueOf(bodySize + "");
        byte[] bodyByte = new byte[bSize];//bodySize 长度不固定
        mappedByteBuffer.get(bodyByte);
        byte[] transactionIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(transactionIdByte);
        long commitLogOffset = mappedByteBuffer.getLong();//偏移量
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte));
        System.out.println("topic:" + new String(topicByte));
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte));
        System.out.println("transactionId:" + new String(transactionIdByte));
        System.out.println("commitLogOffset:" + commitLogOffset);

    }

}

运行结果:

=================commitlog读取第:0消息,偏移量为0===================
totalSize:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05                            
topic:Topic-test                                                      
queueOffset:0
bodySize:58
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d                            
commitLogOffset:0
=================commitlog读取第:1消息,偏移量为338===================
totalSize:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e                            
topic:Topic-test                                                      
queueOffset:1
bodySize:58
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c                            
commitLogOffset:338
=================commitlog读取第:2消息,偏移量为676===================
totalSize:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd                            
topic:Topic-test                                                      
queueOffset:2
bodySize:16
body:消息内容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5                            
commitLogOffset:676
=================commitlog读取第:3消息,偏移量为972===================
totalSize:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356                            
topic:Topic-test                                                      
queueOffset:3
bodySize:19
body:消息内容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44                            
commitLogOffset:972
=================commitlog读取第:4消息,偏移量为1271===================
totalSize:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5                            
topic:Topic-test                                                      
queueOffset:4
bodySize:26
body:消息内容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d                            
commitLogOffset:1271
=================commitlog读取第:5消息,偏移量为1577===================
totalSize:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1                            
topic:Topic-test                                                      
queueOffset:5
bodySize:33
body:消息内容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579                            
commitLogOffset:1577
=================commitlog读取第:6消息,偏移量为1890===================
totalSize:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086                            
topic:Topic-test                                                      
queueOffset:6
bodySize:41
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd                            
commitLogOffset:1890
=================commitlog读取第:7消息,偏移量为2211===================
totalSize:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe                            
topic:Topic-test                                                      
queueOffset:7
bodySize:38
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897                            
commitLogOffset:2211
=================commitlog读取第:8消息,偏移量为2529===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be                            
topic:Topic-test                                                      
queueOffsmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724                            
commitLogOffset:2529
=================commitlog读取第:9消息,偏移量为2834===================
totalSize:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489                            
topic:Topic-test                                                      
queueOffset:9
bodySize:49
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb                            
commitLogOffset:2834

3.根据messageId读取indexFile,然后根据偏移量从CommitLog读取一条完整的消息

 public class IndexFileMessageReadTest {

    public static MappedByteBuffer mappedByteBuffer = null;

    public static void main(String[] args) throws IOException {
        String msgId = "8b78474f-b28a-4442-99a0-6f7883f0302b";
        readByMessageId(msgId);

    }

    private static void readByMessageId(String messageId) throws IOException {
        FileChannel indexFileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        indexFileChannel.close();

        System.out.println("============get indexFile header===============");
        System.out.println("beginTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("beginPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("hashSlotcountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("indexCountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("");

        int keyHash = messageId.hashCode();

        //取模,计算第几个槽位
        int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);
        System.out.println("messageId:" + messageId + ",取模为:" + slotPos);

        // slot的文件偏移量(字节长度)
        int absSlotPos = 40 + slotPos * 4;
        System.out.println("哈希槽的字节数组位置:(40+" + slotPos + "*4)=" + absSlotPos);


        //获取hash槽上存取的件索引,第几个文件
        int index = indexMappedByteBuffer.getInt(absSlotPos);

        //计算数据需要存储的文件偏移量(字节长度)
        int absIndexPos =
                40 + 100 * 4
                        + index * 20;

        System.out.println("第几个文件index=" + index + ",实际存储数据的字节数组位置:(40 + 100 * 4+index *20)=" + absIndexPos);

        long keyHash1 = indexMappedByteBuffer.getInt(absIndexPos);
        long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos + 4);
        int timeDiff = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);


        System.out.println("从index获取到的commitLog偏移量为:" + pyhOffset);
        System.out.println("");

        readCommitLogByOffset((int) pyhOffset);

    }


    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);

            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }

        return mappedByteBuffer;

    }


    /*
     *
     * 根据偏移量读取CcommitLog
     * */
    public static void readCommitLogByOffset(int offset) throws IOException {


        /*b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定长64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//commitLogOffset
        */
        System.out.println("=================commitlog读取偏移量为" + offset + "的消息===================");

        MappedByteBuffer mappedByteBuffer = initFileChannel();
        mappedByteBuffer.position(offset);


        long totalSize = mappedByteBuffer.getLong();//消息长度

        byte[] msgIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(msgIdByte);

        byte[] topicByte = new byte[64];// 固定是64
        mappedByteBuffer.get(topicByte);
        long queueOffset = mappedByteBuffer.getLong();
        Long bodySize = mappedByteBuffer.getLong();
        int bSize = Integer.valueOf(bodySize + "");
        byte[] bodyByte = new byte[bSize];//bodySize 长度不固定
        mappedByteBuffer.get(bodyByte);
        byte[] transactionIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(transactionIdByte);
        long commitLogOffset = mappedByteBuffer.getLong();//偏移量
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte));
        System.out.println("topic:" + new String(topicByte));
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte));
        System.out.println("transactionId:" + new String(transactionIdByte));
        System.out.println("commitLogOffset:" + commitLogOffset);

    }


    public static byte[] toByteArray(long number) {
        byte length = Long.BYTES;
        byte[] bytes = new byte[length];

        for (byte i = 0; i < length; i++) {
            bytes[length - 1 - i] = (byte) number;
            number >>= 8;
        }

        return bytes;
    }

}

运行结果:

============get indexFile header===============
beginTimestampIndex:1669602898169
endTimestampIndex:1669602898176
beginPhyoffsetIndex:338
endPhyoffsetIndex:3163
hashSlotcountIndex:10
indexCountIndex:10

messageId:9a508d90-30f6-4a25-812f-25d750736afe,取模为:87
哈希槽的字节数组位置:(40+87*4)=388
第几个文件index=7,实际存储数据的字节数组位置:(40 + 100 * 4+index *20)=580
从index获取到的commitLog偏移量为:2529

=================commitlog读取偏移量为2529的消息===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be                            
topic:Topic-test                                                      
queueOffset:8
bodySize:25
body:消息内容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724                            
commitLogOffset:2529

结语

本文基于java NIO实现了RocketMQ的文件系统的最精简的功能,希望能帮助开发人员加深对RocketMQ文件系统底层实现原理的了解,并能熟练运用Java NIO进行文件读写。欢迎一起交流讨论,不足的地方欢迎指正。



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: RocketMQ  点击:(102)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: RocketMQ  点击:(111)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMQ  点击:(71)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题&mdash;&mdash;“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: RocketMQ  点击:(99)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMQ  点击:(134)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMQ  点击:(303)  评论:(0)  加入收藏
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明已经消费了,为什么消息会...【详细内容】
2023-04-13  Search: RocketMQ  点击:(238)  评论:(0)  加入收藏
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决...【详细内容】
2023-04-12  Search: RocketMQ  点击:(429)  评论:(0)  加入收藏
SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。 订单处理在电商等系统中,下单后...【详细内容】
2023-03-09  Search: RocketMQ  点击:(205)  评论:(0)  加入收藏
通过源码分析RocketMQ主从复制原理
作者:京东物流 宫丙来一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。 CommitLog的消息复制是发生在消息写入时,当...【详细内容】
2023-03-02  Search: RocketMQ  点击:(63)  评论:(0)  加入收藏
▌简易百科推荐
向量数据库落地实践
本文基于京东内部向量数据库vearch进行实践。Vearch 是对大规模深度学习向量进行高性能相似搜索的弹性分布式系统。详见: https://github.com/vearch/zh_docs/blob/v3.3.X/do...【详细内容】
2024-04-03  京东云开发者    Tags:向量数据库   点击:(5)  评论:(0)  加入收藏
原来 SQL 函数是可以内联的!
介绍在某些情况下,SQL 函数(即指定LANGUAGE SQL)会将其函数体内联到调用它的查询中,而不是直接调用。这可以带来显著的性能提升,因为函数体可以暴露给调用查询的规划器,从而规划器...【详细内容】
2024-04-03  红石PG  微信公众号  Tags:SQL 函数   点击:(5)  评论:(0)  加入收藏
如何正确选择NoSQL数据库
译者 | 陈峻审校 | 重楼Allied Market Research最近发布的一份报告指出,业界对于NoSQL数据库的需求正在持续上升。2022年,全球NoSQL市场的销售额已达73亿美元,预计到2032年将达...【详细内容】
2024-03-28    51CTO  Tags:NoSQL   点击:(14)  评论:(0)  加入收藏
为什么数据库连接池不采用 IO 多路复用?
这是一个非常好的问题。IO多路复用被视为是非常好的性能助力器。但是一般我们在使用DB时,还是经常性采用c3p0,tomcat connection pool等技术来与DB连接,哪怕整个程序已经变成以...【详细内容】
2024-03-27  dbaplus社群    Tags:数据库连接池   点击:(14)  评论:(0)  加入收藏
八个常见的数据可视化错误以及如何避免它们
在当今以数据驱动为主导的世界里,清晰且具有洞察力的数据可视化至关重要。然而,在创建数据可视化时很容易犯错误,这可能导致对数据的错误解读。本文将探讨一些常见的糟糕数据可...【详细内容】
2024-03-26  DeepHub IMBA  微信公众号  Tags:数据可视化   点击:(7)  评论:(0)  加入收藏
到底有没有必要分库分表,如何考量的
关于是否需要进行分库分表,可以根据以下考量因素来决定: 数据量和负载:如果数据量巨大且负载压力较大,单一库单一表可能无法满足性能需求,考虑分库分表。 数据增长:预估数据增长...【详细内容】
2024-03-20  码上遇见你  微信公众号  Tags:分库分表   点击:(16)  评论:(0)  加入收藏
在 SQL 中写了 in 和 not in,技术总监说要炒了我……
WHY?IN 和 NOT IN 是比较常用的关键字,为什么要尽量避免呢?1、效率低项目中遇到这么个情况:t1表 和 t2表 都是150w条数据,600M的样子,都不算大。但是这样一句查询 &darr;select *...【详细内容】
2024-03-18  dbaplus社群    Tags:SQL   点击:(6)  评论:(0)  加入收藏
应对慢SQL的致胜法宝:7大实例剖析+优化原则
大促备战,最大的隐患项之一就是慢SQL,对于服务平稳运行带来的破坏性最大,也是日常工作中经常带来整个应用抖动的最大隐患,在日常开发中如何避免出现慢SQL,出现了慢SQL应该按照什...【详细内容】
2024-03-14  京东云开发者    Tags:慢SQL   点击:(5)  评论:(0)  加入收藏
过去一年,我看到了数据库领域的十大发展趋势
作者 | 朱洁策划 | 李冬梅过去一年,行业信心跌至冰点2022 年中,红衫的一篇《适应与忍耐》的报告,对公司经营提出了预警,让各个公司保持现金流,重整团队,想办法增加盈利。这篇报告...【详细内容】
2024-03-12    InfoQ  Tags:数据库   点击:(32)  评论:(0)  加入收藏
SQL优化的七个方法,你会哪个?
一、插入数据优化 普通插入:在平时我们执行insert语句的时候,可能都是一条一条数据插入进去的,就像下面这样。INSERT INTO `department` VALUES(1, &#39;研发部(RD)&#39;, &#39...【详细内容】
2024-03-07  程序员恰恰  微信公众号  Tags:SQL优化   点击:(20)  评论:(0)  加入收藏
站内最新
站内热门
站内头条