hbase源码系列(十三)缓存机制MemStore与Block Cache

作者:岑玉海

出处:https://www.cnblogs.com/cenyuhai/tag/hbase%E6%BA%90%E7%A0%81%E7%B3%BB%E5%88%97/


Hbase 版本:0.96

下面看看getOrMakeChunk看看是啥情况,挺疑惑的东西。


private Chunk getOrMakeChunk() { while (true) { // 当前的Chunk不为空,就取当前的 Chunk c = curChunk.get(); if (c != null) { return c; } // 这里还有个Chunk的Pool,默认是没有的,走的是new Chunk这条路径 c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize); if (curChunk.compareAndSet(null, c)) { // curChunk是为空的话,就设置为c,然后加到chunkQueue里面 c.init(); this.chunkQueue.add(c); return c; } else if (chunkPool != null) { // 先放回去,待会儿再拿出来 chunkPool.putbackChunk(c); } } }

Chunk是一个持有一个byte[]数组的数据结构,属性如下。

static class Chunk {
    /* 实际数据保存的地方,被不停地分配 */
    private byte[] data;
    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /* 下一个chunk的起始位置,也是上一个chunk的结束位置 */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** 分配给了多少个kv */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Chunk的大小 */
    private final int size;

好吧,我们现在清楚了,它是给每个kv的数据又重新找了个地方混,从注释上面讲这个Chunk未初始化,没有被分配内存,所以开销小。不太理解这个东西,人家之前也是在byte数组里面混,只不顾挪了个窝了,莫非是为了减少内存碎片?尼玛,还真被我说中了,在我以前的资料里面有《调优》

不管怎么样吧,把多个小的kv写到一个连续的数组里面可能是好点好处吧,下面讲一下它的相关参数吧。

 /** 可分配的最大值,超过这个值就不给它分配了,默认值是256K */
hbase.hregion.memstore.mslab.max.allocation 默认值是256  * 1024
/** 每个Chunk的大小,默认是2M */
hbase.hregion.memstore.mslab.chunksize 默认值是2048 * 1024

那我们继续讲讲这个MemStoreChunkPool吧,它默认是不被开启的,因为它的参数hbase.hregion.memstore.chunkpool.maxsize默认是0 (只允许输入0->1的数值),它是通过堆内存的最大值*比例来计算得出来的结果。

它可以承受的最大的Chunk的数量是这么计算的 MaxCount = MemStore内存限制 * Chunkpool.Maxsize / Chunksize。

MemStore的内存最大最小值分别是0.35 –> 0.4,这个在我之前的博客里面也有。

 hbase.regionserver.global.memstore.upperLimit
hbase.regionserver.global.memstore.lowerLimit

还有这个参数hbase.hregion.memstore.chunkpool.initialsize需要设置,默认又是0,输入0->1的数值,MaxCount乘以它就设置初始的Chunk大小。

没试过开启这个Pool效果是否会好,它是依附在MemStore里面的,它设置过大了,最直接的影响就是,另外两个集合的空间就小了。

1.2 有序集合

分配完Chunk之后,干的是这个函数,就是添加到一个有序集合当中kvset。


private long internalAdd(final KeyValue toAdd) { long s = heapSizeChange(toAdd, addToKVSet(toAdd)); //把时间戳范围加到内部去 timeRangeTracker.includeTimestamp(toAdd); this.size.addAndGet(s); return s; }

MemStore里面有两个有序的集合,kvset和snapshot,KeyValueSkipListSet的内部实现是ConcurrentNavigableMap。

 volatile KeyValueSkipListSet kvset;
volatile KeyValueSkipListSet snapshot;

它们的排序规则上一章已经说过了,排过序的在搜索的时候方便查找,这里为什么还有一个snapshot呢?snapshot是一个和它一样的东西,我们都知道MemStore是要flush到文件生成StoreFile的,那我不能写文件的时候让别人都没法读了吧,那怎么办,先把它拷贝到snapshot当中,这个时间很短,复制完了就可以访问kvset,实际flush的之后,我们flush掉snapshot当中的kv就可以啦。

2. CacheConfig

在看这个之前,先推荐看一下我的另外一篇文章《缓存机制以及可以利用SSD作为存储的BucketCache》,否则后面有很多概念,你看不懂的。

这里我们主要关注的是LruBlockCache和BucketCache,至于他们的使用,请参照上面的博客设置,这里不再介绍哦。

CacheConfig是一个HStore一个,属性是根据列族定制的,比如是否常驻内存,但是它内存用来缓存块的BlockCache是Region Server全局共享的的globalBlockCache,在new一个CacheConfig的时候,它会调用instantiateBlockCache方法返回一个BlockCache缓存Block的,如果已经存在globalBlockCache,就直接返回,没有才会重新实例化一个globalBlockCache。

这里还分堆上内存和直接分配的内存,堆上的内存的参数hfile.block.cache.size默认是0.25。

2.1 DoubleCache

直接分配的内存,要通过设置JVM参数-XX:MaxDirectMemorySize来设置,设置了这个之后我们还需要设置hbase.offheapcache.percentage(默认是0)来设置占直接分配内存的比例。

offHeapCacheSize =offheapcache.percentage * DirectMemorySize

这里我们还真不能设置它,因为如果设置了它的话,它会把new一个DoubleCache出来,它是LruBlockCache和SlabCache的合体,之前我提到的那篇文章里面说到SlabCache是一个只能存固定大小的Block大小的Cache,比较垃圾。

2.2 LruBlockCache

如果offHeapCacheSize <= 0,就走下面的逻辑,这里我就简单陈述一下了,代码没啥可贴的。

LruBlockCache和BucketCache的合作方式有两种,一种是BucketCache作为二级缓存使用,比如SSD,一种是在内存当中,它俩各占比列0.1和0.9,还是建议上SSD做二级缓存,其实也不贵。

不管如何,BlockCache这块的总大小是固定的,是由这个参数决定hfile.block.cache.size,默认它是0.25,所以LruBlockCache最大也就是0.25的最大堆内存。

在LruBlockCache当中还分了三种优先级的缓存块,分别是SINGLE、MULTI、MEMORY,比列分别是0.25、0.5、0.25,当快要满的时候,要把块剔除出内存的时候,就要遍历所有的块了,然后计算他们的分别占的比例,剔除的代码还挺有意思。


   PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); bucketQueue.add(bucketMemory); int remainingBuckets = 3; long bytesFreed = 0; BlockBucket bucket; while((bucket = bucketQueue.poll()) != null) { long overflow = bucket.overflow(); if(overflow > 0) { //把要释放的空间bytesToFree分给3个bucket,3个分完 long bucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); bytesFreed += bucket.free(bucketBytesToFree); } remainingBuckets--; }

搞了一个优先级队列,先从SINGLE的开刀、SINGLE不行了,再拿MULTI开刀,最后是MEMORY。bytesToFree是之前计算好的,要释放的大小=当前值-最小值。

在我们设置列族参数的时候,有一个InMemory的参数,如果设置了它就是MEMORY,如果没设置,就是SINGLE,SINGLE类型的一旦被访问过之后,立马变成高富帅的MULTI,但是没有希望变成MEMORY。

这里之前百度的一个哥么问我,Meta表的块会不会一直被保存在MEMORY当中呢,这块的代码写得让人有点儿郁闷的,它是按照列族的参数设置的,但是我怎么去找Meta表的列族设置啊,啊被我找到了,在代码里面写着的。


public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor( TableName.META_TABLE_NAME, new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY) // 保持10个版本是为了帮助调试 .setMaxVersions(10) .setInMemory(true) .setBlocksize(8 * 1024) .setScope(HConstants.REPLICATION_SCOPE_LOCAL) // 不使用BloomFilter .setBloomFilterType(BloomType.NONE) });

可以看出来Meta表的块只有8K,常驻内存,不使用BloomFilter,允许集群间复制。

再吐槽一下hbase这个Lru算法吧,做得挺粗糙的,它记录了每个Block块的访问次数,但是它并没有按照这个来排序,就是简单的依赖哈希值来排序。

Tips:江湖传言一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动,想想也是,hbase是内存大户,内存稍有不够就挂掉,大家要小心设置这个缓存的参数。

 2.3 BucketCache

原来这块的图在上面的那篇文章已经提到了,我就不再重复了,之前没看的请一定要看,那边有很详细的图解,我这里只是讲点我了解的实现。

我们可以从两个方法里面看LruBlockCache和BucketCache的关系,一个是getBlock,一个是evictBlock,先看evictBlock。


protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {   //从map里面删除 map.remove(block.getCacheKey());if (evictedByEvictionProcess && victimHandler != null) { boolean wait = getCurrentSize() < acceptableSize(); boolean inMemory = block.getPriority() == BlockPriority.MEMORY;    //保存到victimHandler里面 victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), inMemory, wait); } return block.heapSize() }

在把block剔除出内存之后,就把块加到victimHandler里面,这个victimHandler就是BucketCache,在CacheConfig实例化LruBlockCache之后就用setVictimCache方法传进去的。

看完这个我们再看getBlock。


public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { CachedBlock cb = map.get(cacheKey); if(cb == null) {if (victimHandler != null) return victimHandler.getBlock(cacheKey, caching, repeat); return null; } return cb.getBuffer(); }

 先从map中取,如果找不到就从victimHandler中取得。

从上面两个方法,我们可以看出来BucketCache是LruBlockCache的二级缓存,它不要了才会存到BucketCache当中,取得时候也是,找不到了才想起人家来。

好,我们现在进入到BucketCache里面看看,它里面有几个重要的属性。

 // Store/read block data
IOEngine ioEngine;
// 内存map
private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
// 后备队列,质保存块的索引信息,比如offset, length
private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

这里怎么又来了两个,一个内存的,一个后备队里的,这个是有区别的RAMQueueEntry当中直接保存了块的buffer数据,BucketEntry只是保存了起始位置和长度。

下面我们看看这个流程吧,还是老规矩,先看入口,再看出口,入口在哪里,前面的代码中提到了,入口在cacheBlockWithWait方法。


//已经有就不加啦 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) return; //写入一级缓存 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); ramCache.put(cacheKey, re); //用哈希值给计算出一个随机的队列来 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); //把实体也插入到写入队列 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);

可以看得出来在这个方法当中,先把块写入到ramCache当中,然后再插入到一个随机的写入队列,写入线程有3个,每个写入线程持有一个写入队列,线程的数量由参数hbase.bucketcache.writer.threads控制。

我们看看这个WriterThread的run方法吧。


 List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>(); try { while (cacheEnabled && writerEnabled) { try { //从inputQueue拿出来放到entries,然后再对entries操作 entries.add(inputQueue.take()); inputQueue.drainTo(entries); } catch (InterruptedException ie) { if (!cacheEnabled) break; } doDrain(entries); }

那我们要关注的就是doDrain的方法了,在这个方法里面,它主要干了4件事情。

1、把ramCache当中的实体给剔除出来转换成BucketEntry,并切入到ioEngine。

2、ioEngine同步,ioEngine包括3种(file,offheap,heap),第一种就是写入SSD,用的是FileChannel,后两种是写入到一个ByteBufferArray

3、把BucketEntry添加到backingMap

4、如果空间不足的话,调用freeSpace清理空间,清理空间的方法和LruBlockCache的方法类似。

这里面的Bucket它也不是一个具体的东西,它里面记住的也是起始位置,使用了多少次的这些参数,所以说它是一个逻辑上的,而不是物理上的分配的一块随机的地址。


final private static class Bucket { //基准起始位置 private long baseOffset; //每个item分配的大小 private int itemAllocationSize; //对应的在bucketSizeInfos中的位置 private int sizeIndex; //总容量 private int itemCount; private int freeList[]; //空闲的数量 private int freeCount; //已经使用的数量 private int usedCount; }

我们是不是可以这么理解:就是当我们不需要某个块的时候我们不用去物理的删除它,只需要不断的重用它里面的空间就可以了,而不需要管怎么删除、释放等相关内容。

BucketSizeInfo是负责管理这些Bucket的,它管理着3个队列,同时它可以动态根据需求,new一些新的不同大小的Bucket出来,也可以把现有的Bucket变更它的大小,Bucket的大小最小是5K,最大是513K。

 final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
    private int sizeIndex;
}

sizeIndex是啥意思?是在BucketSizeInfo的数组里面的位置,它的大小都是有固定的值的,不能多也不能少,这里就不详细介绍了。我们直接看WriteToCache这个方法吧,好验证一下之前的想法。


    //序列化长度 = 数据长度 + 额外的序列化的长度16个字节 int len = data.getSerializedLength(); // This cacheable thing can't be serialized... if (len == 0) return null; //bucketAllocator给分配点空间 long offset = bucketAllocator.allocateBlock(len); //生成一个实体 BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory); //设置Deserializer,具体的实现在HFileBlock当中 bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { if (data instanceof HFileBlock) { ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader(); sliceBuf.rewind(); assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer); //先写入数据信息,再写入头信息 ioEngine.write(sliceBuf, offset); ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); } else { //如果不是HFileBlock的话,把数据序列化到bb当中,然后写入到IOEngine ByteBuffer bb = ByteBuffer.allocate(len); data.serialize(bb); ioEngine.write(bb, offset); } } catch (IOException ioe) { // 出错了就释放掉这个这个块 bucketAllocator.freeBlock(offset); throw ioe;    }

这里我们看这一句就可以了ioEngine.write(sliceBuf, offset);  在写入ioEngine的时候是要传这个offset的,也正好验证了我之前的想法,所以BucketAllocator.allocateBlock的分配管理这块就很关键了。

关于怎么分配这块,还是留个能人讲吧,我是讲不好了。

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » hbase源码系列(十三)缓存机制MemStore与Block Cache
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏