【Zookeeper源码分析】—持久化(一)之FileTxnLog

作者:leesf

出处:https://www.cnblogs.com/leesf456/p/6518040.html


一、前言

前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

二、持久化总体框架

持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

  • TxnLog,接口类型,读取事务性日志的接口。
  • FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
  • Snapshot,接口类型,持久层快照接口。
  • FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
  • FileTxnSnapLog,封装了TxnLog和SnapShot。
  • Util,工具类,提供持久化所需的API。

下面先来分析TxnLog和FileTxnLog的源码。

三、TxnLog源码分析

TxnLog是接口,规定了对日志的响应操作。

public interface TxnLog {

    /
     * roll the current
     * log being appended to
     * @throws IOException
     */
    // 回滚日志
    void rollLog() throws IOException;
    /
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false
     * @throws IOException
     */
    // 添加一个请求至事务性日志
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the
     * next transaction in the logs.
     * @throws IOException
     */
    // 读取事务性日志
    TxnIterator read(long zxid) throws IOException;

    /
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事务性操作的最新zxid
    long getLastLoggedZxid() throws IOException;

    /
     * truncate the log to get in sync with the
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException
     */
    // 清空日志,与Leader保持同步
    boolean truncate(long zxid) throws IOException;

    /
     * the dbid for this transaction log.
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 获取数据库的id
    long getDbId() throws IOException;

    /
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事务并进行确认
    void commit() throws IOException;

    /
     * close the transactions logs
     */
    // 关闭事务性日志
    void close() throws IOException;
    /
     * an iterating interface for reading
     * transaction logs.
     */
    // 读取事务日志的迭代器接口
    public interface TxnIterator {
        /
         * return the transaction header.
         * @return return the transaction header.
         */
        // 获取事务头部
        TxnHeader getHeader();

        /
         * return the transaction record.
         * @return return the transaction record.
         */
        // 获取事务
        Record getTxn();

        /
         * go to the next transaction record.
         * @throws IOException
         */
        // 下个事务
        boolean next() throws IOException;

        /
         * close files and release the
         * resources
         * @throws IOException
         */
        // 关闭文件释放资源
        void close() throws IOException;
    }
}

其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

四、FileTxnLog源码分析

对于LogFile而言,其格式可分为如下三部分

LogFile:

FileHeader TxnList ZeroPad

FileHeader格式如下

FileHeader: {
  magic 4bytes (ZKLG)
  version 4bytes
  dbid 8bytes
}

TxnList格式如下

TxnList:
Txn || Txn TxnList

Txn格式如下

  Txn:

checksum Txnlen TxnHeader Record 0x42

Txnlen格式如下

Txnlen:
len 4bytes

TxnHeader格式如下

TxnHeader: {

sessionid 8bytes
cxid 4bytes
    zxid 8bytes
time 8bytes
type 4bytes
}

ZeroPad格式如下

ZeroPad:
0 padded to EOF (filled during preallocation stage)

了解LogFile的格式对于理解源码会有很大的帮助。

4.1 属性

public class FileTxnLog implements TxnLog {
    private static final Logger LOG;

    // 预分配大小 64M
    static long preAllocSize =  65536 * 1024;

    // 魔术数字,默认为1514884167
    public final static int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    // 版本号
    public final static int VERSION = 2;

    / Maximum time we allow for elapsed fsync before WARNing */
    // 进行同步时,发出warn之前所能等待的最长时间
    private final static long fsyncWarningThresholdMS;

    // 静态属性,确定Logger、预分配空间大小和最长时间
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
    }

    // 最大(新)的zxid
    long lastZxidSeen;
    // 存储数据相关的流
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;

    // log目录文件
    File logDir;

    // 是否强制同步
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;

    // 数据库id
    long dbId;

    // 流列表
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();

    // 当前大小
    long currentSize;
    // 写日志文件
    File logFileWrite = null;
}

4.2. 核心函数 

1. append函数

    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事务头部不为空
            if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { // 日志流为空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }

               //
               logFileWrite = new File(logDir, ("log." +
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               //
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               // 序列化
               fhdr.serialize(oa, "fileheader");
               // Make sure that the magic number is written before padding.
               // 刷新到磁盘
               logStream.flush();

               // 当前通道的大小
               currentSize = fos.getChannel().position();
               // 添加fos
               streamsToFlush.add(fos);
            }

            // 填充文件
            padFile(fos);

            // Serializes transaction header and transaction data into a byte buffer.
            // 将事务头和事务数据序列化成Byte Buffer
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) { // 为空,抛出异常
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            // 生成一个验证算法
            Checksum crc = makeChecksumAlgorithm();
            // Updates the current checksum with the specified array of bytes
            // 使用Byte数组来更新当前的Checksum
            crc.update(buf, 0, buf.length);
            // 写long类型数据
            oa.writeLong(crc.getValue(), "txnEntryCRC");
            // Write the serialized transaction record to the output archive.
            // 将序列化的事务记录写入OutputArchive
            Util.writeTxnBytes(oa, buf);

            return true;
        }
        return false;
    }

说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下

  • ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false
  • ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤
  • ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④
  • ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤
  • ⑤ 填充数据,填充0,进入⑥
  • ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦
  • ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧
  • ⑧ 将更新的ByteBuffer写入磁盘文件,返回true

append间接调用了padLog函数,其源码如下 

    public static long padLogFile(FileOutputStream f,long currentSize,
            long preAllocSize) throws IOException{
        // 获取位置
        long position = f.getChannel().position();
        if (position + 4096 >= currentSize) { // 计算后是否大于当前大小
            // 重新设置当前大小,剩余部分填充0
            currentSize = currentSize + preAllocSize;
            fill.position(0);
            f.getChannel().write(fill, currentSize-fill.remaining());
        }
        return currentSize;
    }

说明:其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

2. getLogFiles函数

    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        // 按照zxid对文件进行排序
        List<File> files = Util.sortDataDir(logDirList, "log", true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) { // 遍历文件
            // 从文件中获取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
                continue;
            }
            // the files
            // are sorted with zxid's
            if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
                logZxid = fzxid;
            }
        }
        // 文件列表
        List<File> v=new ArrayList<File>(5);
        for (File f : files) { // 再次遍历文件
            // 从文件中获取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid < logZxid) { // 跳过小于logZxid的文件
                continue;
            }
            // 添加
            v.add(f);
        }
        // 转化成File[] 类型后返回
        return v.toArray(new File[0]);

    }

说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。

  • ① 对所有log文件按照zxid进行升序排序,进入②
  • ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③
  • ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④
  • ④ 转化后返回

getLogFiles函数调用了sortDataDir,其源码如下: 

public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
    {
        if(files==null)
            return new ArrayList<File>(0);
        // 转化为列表
        List<File> filelist = Arrays.asList(files);
        // 进行排序,Comparator是关键,根据zxid进行排序
        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
        return filelist;
    }

说明:其用于排序log文件,可以选择根据zxid进行升序或降序。

getLogFiles函数间接调用了getZxidFromName,其源码如下: 

    // 从文件名中解析出zxid
    public static long getZxidFromName(String name, String prefix) {
        long zxid = -1;
        // 对文件名进行分割
        String nameParts[] = name.split("\\.");
        if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
            try {
                // 转化成长整形
                zxid = Long.parseLong(nameParts[1], 16);
            } catch (NumberFormatException e) {
            }
        }
        return zxid;
    }

说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

3. getLastLoggedZxid函数 

    public long getLastLoggedZxid() {
        // 获取已排好序的所有的log文件
        File[] files = getLogFiles(logDir.listFiles(), 0);
        // 获取最大的zxid(最后一个log文件对应的zxid)
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        //
        long zxid = maxLog;
        // 迭代器
        TxnIterator itr = null;
        try {
            // 新生FileTxnLog
            FileTxnLog txn = new FileTxnLog(logDir);
            // 开始读取从给定zxid之后的所有事务
            itr = txn.read(maxLog);
            while (true) { // 遍历
                if(!itr.next()) // 是否存在下一项
                    break;
                // 获取事务头
                TxnHeader hdr = itr.getHeader();
                // 获取zxid
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            // 关闭迭代器
            close(itr);
        }
        return zxid;
    }

说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下

  • ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②
  • ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③
  • ③ 遍历所有事务并提取出相应的zxid,最后返回。

其中getLastLoggedZxid调用了read函数,其源码如下 

 public TxnIterator read(long zxid) throws IOException {
        // 返回事务文件访问迭代器
        return new FileTxnIterator(logDir, zxid);
    }

说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下 

    void init() throws IOException {
        // 新生成文件列表
        storedFiles = new ArrayList<File>();
        // 进行排序
        List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
        for (File f: files) { // 遍历文件
            if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件
                storedFiles.add(f);
            }
            // add the last logfile that is less than the zxid
            else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出
                storedFiles.add(f);
                break;
            }
        }
        // go to the next logfile
        // 进入下一个log文件
        goToNextLog();
        if (!next()) // 不存在下一项,返回
            return;
        while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出
            if (!next())
                return;
        }
    }

说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下

    private boolean goToNextLog() throws IOException {
        if (storedFiles.size() > 0) { // 存储的文件列表大于0
            // 取最后一个log文件
            this.logFile = storedFiles.remove(storedFiles.size()-1);
            // 针对该文件,创建InputArchive
            ia = createInputArchive(this.logFile);
            // 返回true
            return true;
        }
        return false;
    }

说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下

    public boolean next() throws IOException {
        if (ia == null) { // 为空,返回false
            return false;
        }
        try {
            // 读取长整形crcValue
            long crcValue = ia.readLong("crcvalue");
            // 通过input archive读取一个事务条目
            byte[] bytes = Util.readTxnBytes(ia);
            // Since we preallocate, we define EOF to be an
            if (bytes == null || bytes.length==0) { // 对bytes进行判断
                throw new EOFException("Failed to read " + logFile);
            }
            // EOF or corrupted record
            // validate CRC
            // 验证CRC
            Checksum crc = makeChecksumAlgorithm();
            // 更新
            crc.update(bytes, 0, bytes.length);
            if (crcValue != crc.getValue()) // 验证不相等,抛出异常
                throw new IOException(CRC_ERROR);
            if (bytes == null || bytes.length == 0) // bytes为空,返回false
                return false;
            // 新生成TxnHeader
            hdr = new TxnHeader();
            // 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record
            record = SerializeUtils.deserializeTxn(bytes, hdr);
        } catch (EOFException e) { // 抛出异常
            LOG.debug("EOF excepton " + e);
            // 关闭输入流
            inputStream.close();
            // 赋值为null
            inputStream = null;
            ia = null;
            hdr = null;
            // this means that the file has ended
            // we should go to the next file
            if (!goToNextLog()) { // 没有log文件,则返回false
                return false;
            }
            // if we went to the next log file, we should call next() again
            // 继续调用next
            return next();
        } catch (IOException e) {
            inputStream.close();
            throw e;
        }
        // 返回true
        return true;
    }

说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。

  • ① 读取事务的crcValue值,用于后续的验证,进入②
  • ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③
  • ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。
  • ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

4. commit函数

    public synchronized void commit() throws IOException {
        if (logStream != null) {
            // 强制刷到磁盘
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) { // 遍历流
            // 强制刷到磁盘
            log.flush();
            if (forceSync) { // 是否强制同步
                long startSyncNS = System.nanoTime();

                log.getChannel().force(false);
                // 计算流式的时间
                long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) { // 移除流并关闭
            streamsToFlush.removeFirst().close();
        }
    }

说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下

  • ① 若日志流logStream不为空,则强制刷新至磁盘,进入②
  • ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③
  • ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④
  • ④ 移除所有流并关闭。

5. truncate函数 

    public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            // 获取迭代器
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            long pos = input.getPosition();
            // now, truncate at the current position
            // 从当前位置开始清空
            RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
            raf.setLength(pos);
            raf.close();
            while (itr.goToNextLog()) { // 存在下一个log文件
                if (!itr.logFile.delete()) { // 删除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            // 关闭迭代器
            close(itr);
        }
        return true;
    }

说明:该函数用于清空大于给定zxid的所有事务日志。

五、总结

对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,其源码还是相对简单,也谢谢各位园友的观看~

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » 【Zookeeper源码分析】—持久化(一)之FileTxnLog
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 技术驿站 | 致力打造 Java 精品博客

联系作者优质文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏