yjl49 发表于 2013-2-3 10:14:15

fqueue 消息队列(一)

fqueue 是国内开发人员用JAVA开发的一款开源消息队列系统。消息队列可用来处理高并发量的数据库读写操作,降低数据库负载。fqueue的通信层使用了netty框架,数据存储则采用文件队列的方式。
存储层主要的类有以下几个:
FQueue:主要的队列实现主类,所有的消息存储与读取都通过此类进行
FSQueue:FQueue的底层实现,主要是在文件队列系统层面的读写与管理控制
LogEntity:针对单个数据存储文件的操作类
LogIndex:索引文件控制类
整个存储队列关键的代码及功能如下:
1. 有专门的读和写操作句柄负责对当前存储文件的操作。
 
//当前写到的位置指针private LogEntity writerHandle = null;//当前读到的位置指针private LogEntity readerHandle = null;
 
 
 
writerHandle = createLogEntity(path + fileSeparator + filePrefix + "data_" + writerIndex + ".idb", db,writerIndex);if (readerIndex == writerIndex) {readerHandle = writerHandle;} else {readerHandle = createLogEntity(path + fileSeparator + filePrefix + "data_" + readerIndex + ".idb", db,readerIndex);}
 
2.专门的索引管理
 
public class LogIndex { /*** 记录写位置*   * @param pos*/ public void putWriterPosition(int pos); /*** 记录读取的位置*   * @param pos*/ public void putReaderPosition(int pos); /*** 记录写文件索引*   * @param index*/ public void putWriterIndex(int index); /*** 记录读取文件索引*   * @param index*/ public void putReaderIndex(int index); 3. 使用内存映射文件来处理存储文件,默认最大为150M,
 
 
private File file;private RandomAccessFile raFile;private FileChannel fc;public MappedByteBuffer mappedByteBuffer;raFile = new RandomAccessFile(file, "rwd");fc = raFile.getChannel();mappedByteBuffer = fc.map(MapMode.READ_WRITE, 0, this.fileLimitLength);
public FSQueue(String path) throws Exception {this(path, 1024 * 1024 * 150);} 4.实际的数据存储开始位置从20byte开始,前8byte为“FQueuefs”标识,接着4byte为版本,接着4byte为下一个文件编号,再接着4byte 为当前存储位置。
public class LogEntity {         .....      public static final String MAGIC = "FQueuefs";public static int messageStartPosition = 20;mappedByteBuffer.put(MAGIC.getBytes());mappedByteBuffer.putInt(version);// 8 versionmappedByteBuffer.putInt(nextFile);// 12next fileindexmappedByteBuffer.putInt(endPosition);// 16
基本的增删流程如下:
新增FQueue.offer() ---> FSQueue.add() ----->LogEntity.write() ------->生成下一个文件ID,新建存储文件和索引rotateNextLogWriter() ---->LogEntity.write()
 
      读取FQueue.poll() ---->FSQueue.readNextAndRemove() ----->LogEntity.readNextAndRemove()------[当前文件已读到了尾部]---> 获取下一个文件,将当前文件加入删除队列FileRunner.addDeleteFile()---->再次读取LogEntity.readNextAndRemove() 并更新索引位置。
 
页: [1]
查看完整版本: fqueue 消息队列(一)