1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import com.google.common.collect.Iterables;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.fs.FileSystem;
25  import org.apache.hadoop.fs.Path;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HBaseTestingUtility;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.testclassification.SmallTests;
31  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
32  import org.apache.hadoop.hbase.io.compress.Compression;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.junit.After;
35  import org.junit.Before;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  import org.junit.runner.RunWith;
39  import org.junit.runners.Parameterized;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.Arrays;
44  import java.util.List;
45  import java.util.Map;
46  import java.util.Random;
47  
48  import static org.junit.Assert.*;
49  
50  
51  
52  
53  
54  @Category(SmallTests.class)
55  @RunWith(Parameterized.class)
56  public class TestLazyDataBlockDecompression {
57    private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class);
58    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
59  
60    private FileSystem fs;
61  
62    @Parameterized.Parameter(0)
63    public boolean cacheOnWrite;
64  
65    @Parameterized.Parameters
66    public static Iterable<Object[]> data() {
67      return Arrays.asList(new Object[][] {
68        { false },
69        { true }
70      });
71    }
72  
73    @Before
74    public void setUp() throws IOException {
75      CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
76      fs = FileSystem.get(TEST_UTIL.getConfiguration());
77    }
78  
79    @After
80    public void tearDown() {
81      CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
82      fs = null;
83    }
84  
85    
86  
87  
88  
89    private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
90        HFileContext cxt, int entryCount) throws IOException {
91      HFileWriterV2 writer = (HFileWriterV2)
92        new HFileWriterV2.WriterFactoryV2(conf, cc)
93          .withPath(fs, path)
94          .withFileContext(cxt)
95          .create();
96  
97      
98      Random rand = new Random(9713312); 
99      final byte[] family = Bytes.toBytes("f");
100     final byte[] qualifier = Bytes.toBytes("q");
101 
102     for (int i = 0; i < entryCount; i++) {
103       byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i);
104       byte[] valueBytes = TestHFileWriterV2.randomValue(rand);
105       
106       writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
107     }
108     writer.close();
109   }
110 
111   
112 
113 
114   private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
115       Path path, HFileContext cxt) throws IOException {
116     FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
117     long fileSize = fs.getFileStatus(path).getLen();
118     FixedFileTrailer trailer =
119       FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
120     HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
121       fsdis.getHfs(), conf);
122     reader.loadFileInfo();
123     long offset = trailer.getFirstDataBlockOffset(),
124       max = trailer.getLastDataBlockOffset();
125     List<HFileBlock> blocks = new ArrayList<HFileBlock>(4);
126     HFileBlock block;
127     while (offset <= max) {
128       block = reader.readBlock(offset, -1, 
129       
130       offset += block.getOnDiskSizeWithHeader();
131       blocks.add(block);
132     }
133     LOG.info("read " + Iterables.toString(blocks));
134   }
135 
136   @Test
137   public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
138     
139     int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
140     Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
141       "testCompressionIncreasesEffectiveBlockcacheSize");
142     HFileContext context = new HFileContextBuilder()
143       .withCompression(Compression.Algorithm.GZ)
144       .build();
145     LOG.info("context=" + context);
146 
147     
148     Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
149     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
150     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
151     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
152     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
153     CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
154       new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled);
155     CacheConfig cc = new CacheConfig(lazyCompressDisabled);
156     assertFalse(cc.shouldCacheDataCompressed());
157     assertTrue(cc.getBlockCache() instanceof LruBlockCache);
158     LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache();
159     LOG.info("disabledBlockCache=" + disabledBlockCache);
160     assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
161     assertTrue("eviction thread spawned unintentionally.",
162       disabledBlockCache.getEvictionThread() == null);
163     assertEquals("freshly created blockcache contains blocks.",
164       0, disabledBlockCache.getBlockCount());
165 
166     
167     
168     writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);
169 
170     
171     cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
172     long disabledBlockCount = disabledBlockCache.getBlockCount();
173     assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
174       disabledBlockCount > 0);
175     long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
176     for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
177       disabledBlockCache.getMapForTests().entrySet()) {
178       HFileBlock block = (HFileBlock) e.getValue().getBuffer();
179       assertTrue("found a packed block, block=" + block, block.isUnpacked());
180     }
181 
182     
183     Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
184     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
185     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
186     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
187     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
188     CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
189       new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled);
190     cc = new CacheConfig(lazyCompressEnabled);
191     assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
192     assertTrue(cc.getBlockCache() instanceof LruBlockCache);
193     LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache();
194     LOG.info("enabledBlockCache=" + enabledBlockCache);
195     assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize());
196     assertTrue("eviction thread spawned unintentionally.",
197       enabledBlockCache.getEvictionThread() == null);
198     assertEquals("freshly created blockcache contains blocks.",
199       0, enabledBlockCache.getBlockCount());
200 
201     cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
202     long enabledBlockCount = enabledBlockCache.getBlockCount();
203     assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
204       enabledBlockCount > 0);
205     long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
206     int candidatesFound = 0;
207     for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
208         enabledBlockCache.getMapForTests().entrySet()) {
209       candidatesFound++;
210       HFileBlock block = (HFileBlock) e.getValue().getBuffer();
211       if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
212         assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" +
213           block.getBufferWithoutHeader().capacity(), block.isUnpacked());
214       }
215     }
216     assertTrue("did not find any candidates for compressed caching. Invalid test.",
217       candidatesFound > 0);
218 
219     LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
220       enabledBlockCount);
221     assertTrue("enabling compressed data blocks should increase the effective cache size. " +
222       "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
223       enabledBlockCount, disabledBlockCount < enabledBlockCount);
224 
225     LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
226       enabledEvictedCount);
227     assertTrue("enabling compressed data blocks should reduce the number of evictions. " +
228       "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
229       enabledEvictedCount, enabledEvictedCount < disabledEvictedCount);
230   }
231 }