1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.security.Key;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NavigableSet;
31  import java.util.concurrent.ConcurrentSkipListSet;
32  
33  import javax.crypto.spec.SecretKeySpec;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.HarFileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HBaseTestingUtility;
45  import org.apache.hadoop.hbase.HColumnDescriptor;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.HRegionInfo;
48  import org.apache.hadoop.hbase.HTableDescriptor;
49  import org.apache.hadoop.hbase.KeyValue;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.Tag;
52  import org.apache.hadoop.hbase.TagType;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
56  import org.apache.hadoop.hbase.io.crypto.aes.AES;
57  import org.apache.hadoop.hbase.io.hfile.HFile;
58  import org.apache.hadoop.hbase.mob.MobConstants;
59  import org.apache.hadoop.hbase.mob.MobUtils;
60  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
61  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
62  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
63  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
64  import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
65  import org.apache.hadoop.hbase.security.EncryptionUtil;
66  import org.apache.hadoop.hbase.security.User;
67  import org.apache.hadoop.hbase.testclassification.MediumTests;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.FSUtils;
70  import org.apache.hadoop.hbase.wal.WALFactory;
71  import org.junit.Assert;
72  import org.junit.Before;
73  import org.junit.Rule;
74  import org.junit.Test;
75  import org.junit.experimental.categories.Category;
76  import org.junit.rules.TestName;
77  import org.mockito.Mockito;
78  
79  @Category(MediumTests.class)
80  public class TestHMobStore {
  /** Logger for this test class. */
  public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
  /** Exposes the running test's method name, which the tests pass to init(...) as methodName. */
  @Rule public TestName name = new TestName();

  // Store and region under test; both are (re)created by the five-argument init(...).
  private HMobStore store;
  private HRegion region;
  // Default column family descriptor; assigned by init(String, Configuration, boolean).
  private HColumnDescriptor hcd;
  // Only assigned by init(Configuration, HColumnDescriptor), i.e. when testStore is true.
  private FileSystem fs;
  // Table, family, row, qualifier and value bytes shared by all tests.
  private byte [] table = Bytes.toBytes("table");
  private byte [] family = Bytes.toBytes("family");
  private byte [] row = Bytes.toBytes("row");
  private byte [] row2 = Bytes.toBytes("row2");
  private byte [] qf1 = Bytes.toBytes("qf1");
  private byte [] qf2 = Bytes.toBytes("qf2");
  private byte [] qf3 = Bytes.toBytes("qf3");
  private byte [] qf4 = Bytes.toBytes("qf4");
  private byte [] qf5 = Bytes.toBytes("qf5");
  private byte [] qf6 = Bytes.toBytes("qf6");
  private byte[] value = Bytes.toBytes("value");
  private byte[] value2 = Bytes.toBytes("value2");
  // Path of the MOB file written by init(Configuration, HColumnDescriptor).
  private Path mobFilePath;
  // Date handed to the MOB writer and to MobUtils.formatDate(...) when building MOB paths.
  private Date currentDate = new Date();
  // MOB reference KeyValues built by init(Configuration, HColumnDescriptor); used by testResolve.
  private KeyValue seekKey1;
  private KeyValue seekKey2;
  private KeyValue seekKey3;
  // Qualifiers requested by the shared Get; setUp() fills the set with qf1, qf3 and qf5.
  private NavigableSet<byte[]> qualifiers =
    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
  // Cells the scan-based tests compare their results against; populated in setUp().
  private List<Cell> expected = new ArrayList<Cell>();
  // Id passed to flushStore(...) by flush(...); incremented after every use.
  private long id = System.currentTimeMillis();
  // Get on `row`; setUp() adds one column per entry of `qualifiers`.
  private Get get = new Get(row);
  // Shared testing utility: supplies the data test directory and, for some tests, the configuration.
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Prefix of the per-test base directory; init(...) appends the method name to it.
  private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
112 
113   
114 
115 
116 
117   @Before
118   public void setUp() throws Exception {
119     qualifiers.add(qf1);
120     qualifiers.add(qf3);
121     qualifiers.add(qf5);
122 
123     Iterator<byte[]> iter = qualifiers.iterator();
124     while(iter.hasNext()){
125       byte [] next = iter.next();
126       expected.add(new KeyValue(row, family, next, 1, value));
127       get.addColumn(family, next);
128       get.setMaxVersions(); 
129     }
130   }
131 
132   private void init(String methodName, Configuration conf, boolean testStore)
133   throws IOException {
134     hcd = new HColumnDescriptor(family);
135     hcd.setMobEnabled(true);
136     hcd.setMobThreshold(3L);
137     hcd.setMaxVersions(4);
138     init(methodName, conf, hcd, testStore);
139   }
140 
141   private void init(String methodName, Configuration conf,
142       HColumnDescriptor hcd, boolean testStore) throws IOException {
143     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
144     init(methodName, conf, htd, hcd, testStore);
145   }
146 
147   private void init(String methodName, Configuration conf, HTableDescriptor htd,
148       HColumnDescriptor hcd, boolean testStore) throws IOException {
149     
150     Path basedir = new Path(DIR+methodName);
151     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
152     String logName = "logs";
153     Path logdir = new Path(basedir, logName);
154     FileSystem fs = FileSystem.get(conf);
155     fs.delete(logdir, true);
156 
157     htd.addFamily(hcd);
158     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
159 
160     final Configuration walConf = new Configuration(conf);
161     FSUtils.setRootDir(walConf, basedir);
162     final WALFactory wals = new WALFactory(walConf, null, methodName);
163     region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
164             info, htd, null);
165     store = new HMobStore(region, hcd, conf);
166     if(testStore) {
167       init(conf, hcd);
168     }
169   }
170 
171   private void init(Configuration conf, HColumnDescriptor hcd)
172       throws IOException {
173     Path basedir = FSUtils.getRootDir(conf);
174     fs = FileSystem.get(conf);
175     Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
176         + Bytes.toString(family));
177     fs.mkdirs(homePath);
178 
179     KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
180     KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
181     KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
182     KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
183     int maxKeyCount = keys.length;
184     StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
185         hcd.getCompactionCompression(), region.getRegionInfo().getStartKey());
186     mobFilePath = mobWriter.getPath();
187 
188     mobWriter.append(key1);
189     mobWriter.append(key2);
190     mobWriter.append(key3);
191     mobWriter.close();
192 
193     String targetPathName = MobUtils.formatDate(currentDate);
194     byte[] referenceValue = Bytes.toBytes(targetPathName + Path.SEPARATOR + mobFilePath.getName());
195     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName());
196     KeyValue kv1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, referenceValue);
197     KeyValue kv2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, referenceValue);
198     KeyValue kv3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, referenceValue);
199     seekKey1 = MobUtils.createMobRefKeyValue(kv1, referenceValue, tableNameTag);
200     seekKey2 = MobUtils.createMobRefKeyValue(kv2, referenceValue, tableNameTag);
201     seekKey3 = MobUtils.createMobRefKeyValue(kv3, referenceValue, tableNameTag);
202   }
203 
204   
205 
206 
207 
208   @Test
209   public void testGetFromMemStore() throws IOException {
210     final Configuration conf = HBaseConfiguration.create();
211     init(name.getMethodName(), conf, false);
212 
213     
214     this.store.add(new KeyValue(row, family, qf1, 1, value));
215     this.store.add(new KeyValue(row, family, qf2, 1, value));
216     this.store.add(new KeyValue(row, family, qf3, 1, value));
217     this.store.add(new KeyValue(row, family, qf4, 1, value));
218     this.store.add(new KeyValue(row, family, qf5, 1, value));
219     this.store.add(new KeyValue(row, family, qf6, 1, value));
220 
221     Scan scan = new Scan(get);
222     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
223         scan.getFamilyMap().get(store.getFamily().getName()),
224         0);
225 
226     List<Cell> results = new ArrayList<Cell>();
227     scanner.next(results);
228     Collections.sort(results, KeyValue.COMPARATOR);
229     scanner.close();
230 
231     
232     Assert.assertEquals(expected.size(), results.size());
233     for(int i=0; i<results.size(); i++) {
234       
235       Assert.assertEquals(expected.get(i), results.get(i));
236     }
237   }
238 
239   
240 
241 
242 
243   @Test
244   public void testGetFromFiles() throws IOException {
245     final Configuration conf = TEST_UTIL.getConfiguration();
246     init(name.getMethodName(), conf, false);
247 
248     
249     this.store.add(new KeyValue(row, family, qf1, 1, value));
250     this.store.add(new KeyValue(row, family, qf2, 1, value));
251     
252     flush(1);
253 
254     
255     this.store.add(new KeyValue(row, family, qf3, 1, value));
256     this.store.add(new KeyValue(row, family, qf4, 1, value));
257     
258     flush(2);
259 
260     
261     this.store.add(new KeyValue(row, family, qf5, 1, value));
262     this.store.add(new KeyValue(row, family, qf6, 1, value));
263     
264     flush(3);
265 
266     Scan scan = new Scan(get);
267     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
268         scan.getFamilyMap().get(store.getFamily().getName()),
269         0);
270 
271     List<Cell> results = new ArrayList<Cell>();
272     scanner.next(results);
273     Collections.sort(results, KeyValue.COMPARATOR);
274     scanner.close();
275 
276     
277     Assert.assertEquals(expected.size(), results.size());
278     for(int i=0; i<results.size(); i++) {
279       Assert.assertEquals(expected.get(i), results.get(i));
280     }
281   }
282 
283   
284 
285 
286 
287   @Test
288   public void testGetReferencesFromFiles() throws IOException {
289     final Configuration conf = HBaseConfiguration.create();
290     init(name.getMethodName(), conf, false);
291 
292     
293     this.store.add(new KeyValue(row, family, qf1, 1, value));
294     this.store.add(new KeyValue(row, family, qf2, 1, value));
295     
296     flush(1);
297 
298     
299     this.store.add(new KeyValue(row, family, qf3, 1, value));
300     this.store.add(new KeyValue(row, family, qf4, 1, value));
301     
302     flush(2);
303 
304     
305     this.store.add(new KeyValue(row, family, qf5, 1, value));
306     this.store.add(new KeyValue(row, family, qf6, 1, value));
307     
308     flush(3);
309 
310     Scan scan = new Scan(get);
311     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
312     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
313       scan.getFamilyMap().get(store.getFamily().getName()),
314       0);
315 
316     List<Cell> results = new ArrayList<Cell>();
317     scanner.next(results);
318     Collections.sort(results, KeyValue.COMPARATOR);
319     scanner.close();
320 
321     
322     Assert.assertEquals(expected.size(), results.size());
323     for(int i=0; i<results.size(); i++) {
324       Cell cell = results.get(i);
325       Assert.assertTrue(MobUtils.isMobReferenceCell(cell));
326     }
327   }
328 
329   
330 
331 
332 
333   @Test
334   public void testGetFromMemStoreAndFiles() throws IOException {
335 
336     final Configuration conf = HBaseConfiguration.create();
337 
338     init(name.getMethodName(), conf, false);
339 
340     
341     this.store.add(new KeyValue(row, family, qf1, 1, value));
342     this.store.add(new KeyValue(row, family, qf2, 1, value));
343     
344     flush(1);
345 
346     
347     this.store.add(new KeyValue(row, family, qf3, 1, value));
348     this.store.add(new KeyValue(row, family, qf4, 1, value));
349     
350     flush(2);
351 
352     
353     this.store.add(new KeyValue(row, family, qf5, 1, value));
354     this.store.add(new KeyValue(row, family, qf6, 1, value));
355 
356     Scan scan = new Scan(get);
357     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
358         scan.getFamilyMap().get(store.getFamily().getName()),
359         0);
360 
361     List<Cell> results = new ArrayList<Cell>();
362     scanner.next(results);
363     Collections.sort(results, KeyValue.COMPARATOR);
364     scanner.close();
365 
366     
367     Assert.assertEquals(expected.size(), results.size());
368     for(int i=0; i<results.size(); i++) {
369       Assert.assertEquals(expected.get(i), results.get(i));
370     }
371   }
372 
373   
374 
375 
376 
377   @Test
378   public void testMobCellSizeThreshold() throws IOException {
379 
380     final Configuration conf = HBaseConfiguration.create();
381 
382     HColumnDescriptor hcd;
383     hcd = new HColumnDescriptor(family);
384     hcd.setMobEnabled(true);
385     hcd.setMobThreshold(100);
386     hcd.setMaxVersions(4);
387     init(name.getMethodName(), conf, hcd, false);
388 
389     
390     this.store.add(new KeyValue(row, family, qf1, 1, value));
391     this.store.add(new KeyValue(row, family, qf2, 1, value));
392     
393     flush(1);
394 
395     
396     this.store.add(new KeyValue(row, family, qf3, 1, value));
397     this.store.add(new KeyValue(row, family, qf4, 1, value));
398     
399     flush(2);
400 
401     
402     this.store.add(new KeyValue(row, family, qf5, 1, value));
403     this.store.add(new KeyValue(row, family, qf6, 1, value));
404     
405     flush(3);
406 
407     Scan scan = new Scan(get);
408     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
409     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
410       scan.getFamilyMap().get(store.getFamily().getName()),
411       0);
412 
413     List<Cell> results = new ArrayList<Cell>();
414     scanner.next(results);
415     Collections.sort(results, KeyValue.COMPARATOR);
416     scanner.close();
417 
418     
419     Assert.assertEquals(expected.size(), results.size());
420     for(int i=0; i<results.size(); i++) {
421       Cell cell = results.get(i);
422       
423       Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
424       Assert.assertEquals(expected.get(i), results.get(i));
425       Assert.assertEquals(100, store.getFamily().getMobThreshold());
426     }
427   }
428 
429   @Test
430   public void testCommitFile() throws Exception {
431     final Configuration conf = HBaseConfiguration.create();
432     init(name.getMethodName(), conf, true);
433     String targetPathName = MobUtils.formatDate(new Date());
434     Path targetPath = new Path(store.getPath(), (targetPathName
435         + Path.SEPARATOR + mobFilePath.getName()));
436     fs.delete(targetPath, true);
437     Assert.assertFalse(fs.exists(targetPath));
438     
439     store.commitFile(mobFilePath, targetPath);
440     Assert.assertTrue(fs.exists(targetPath));
441   }
442 
443   @Test
444   public void testResolve() throws Exception {
445     final Configuration conf = HBaseConfiguration.create();
446     init(name.getMethodName(), conf, true);
447     String targetPathName = MobUtils.formatDate(currentDate);
448     Path targetPath = new Path(store.getPath(), targetPathName);
449     store.commitFile(mobFilePath, targetPath);
450     
451     Cell resultCell1 = store.resolve(seekKey1, false);
452     Cell resultCell2 = store.resolve(seekKey2, false);
453     Cell resultCell3 = store.resolve(seekKey3, false);
454     
455     Assert.assertEquals(Bytes.toString(value),
456         Bytes.toString(CellUtil.cloneValue(resultCell1)));
457     Assert.assertEquals(Bytes.toString(value),
458         Bytes.toString(CellUtil.cloneValue(resultCell2)));
459     Assert.assertEquals(Bytes.toString(value2),
460         Bytes.toString(CellUtil.cloneValue(resultCell3)));
461   }
462 
463   
464 
465 
466 
467 
468   private void flush(int storeFilesSize) throws IOException{
469     this.store.snapshot();
470     flushStore(store, id++);
471     Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
472     Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
473   }
474 
475   
476 
477 
478 
479 
480 
481   private static void flushStore(HMobStore store, long id) throws IOException {
482     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
483     storeFlushCtx.prepare();
484     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
485     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
486   }
487 
488   @Test
489   public void testMOBStoreEncryption() throws Exception {
490     final Configuration conf = TEST_UTIL.getConfiguration();
491 
492     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
493     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
494     SecureRandom rng = new SecureRandom();
495     byte[] keyBytes = new byte[AES.KEY_LENGTH];
496     rng.nextBytes(keyBytes);
497     String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
498     Key cfKey = new SecretKeySpec(keyBytes, algorithm);
499 
500     HColumnDescriptor hcd = new HColumnDescriptor(family);
501     hcd.setMobEnabled(true);
502     hcd.setMobThreshold(100);
503     hcd.setMaxVersions(4);
504     hcd.setEncryptionType(algorithm);
505     hcd.setEncryptionKey(EncryptionUtil.wrapKey(conf,
506       conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey));
507 
508     init(name.getMethodName(), conf, hcd, false);
509 
510     this.store.add(new KeyValue(row, family, qf1, 1, value));
511     this.store.add(new KeyValue(row, family, qf2, 1, value));
512     this.store.add(new KeyValue(row, family, qf3, 1, value));
513     flush(1);
514 
515     this.store.add(new KeyValue(row, family, qf4, 1, value));
516     this.store.add(new KeyValue(row, family, qf5, 1, value));
517     this.store.add(new KeyValue(row, family, qf6, 1, value));
518     flush(2);
519 
520     Collection<StoreFile> storefiles = this.store.getStorefiles();
521     checkMobHFileEncrytption(storefiles);
522 
523     
524     Scan scan = new Scan(get);
525     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
526         scan.getFamilyMap().get(store.getFamily().getName()),
527         0);
528 
529     List<Cell> results = new ArrayList<Cell>();
530     scanner.next(results);
531     Collections.sort(results, KeyValue.COMPARATOR);
532     scanner.close();
533     Assert.assertEquals(expected.size(), results.size());
534     for(int i=0; i<results.size(); i++) {
535       Assert.assertEquals(expected.get(i), results.get(i));
536     }
537 
538     
539     this.store.triggerMajorCompaction();
540     CompactionContext requestCompaction = this.store.requestCompaction(1, null);
541     this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE);
542     Assert.assertEquals(1, this.store.getStorefiles().size());
543 
544     
545     checkMobHFileEncrytption(this.store.getStorefiles());
546   }
547 
548   private void checkMobHFileEncrytption(Collection<StoreFile> storefiles) {
549     StoreFile storeFile = storefiles.iterator().next();
550     HFile.Reader reader = storeFile.getReader().getHFileReader();
551     byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
552     Assert.assertTrue(null != encryptionKey);
553     Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
554         .equals(HConstants.CIPHER_AES));
555   }
556 
557 }