1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.assertTrue;
26  
27  import java.io.IOException;
28  import java.security.PrivilegedExceptionAction;
29  import java.util.Arrays;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.Coprocessor;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.testclassification.MediumTests;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.regionserver.HRegion;
52  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
53  import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
54  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
56  import org.apache.hadoop.hbase.wal.WAL;
57  import org.apache.hadoop.hbase.wal.WALFactory;
58  import org.apache.hadoop.hbase.wal.WALKey;
59  import org.apache.hadoop.hbase.wal.WALSplitter;
60  import org.apache.hadoop.hbase.security.User;
61  import org.apache.hadoop.hbase.util.Bytes;
62  import org.apache.hadoop.hbase.util.EnvironmentEdge;
63  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64  import org.apache.hadoop.hbase.util.FSUtils;
65  import org.junit.After;
66  import org.junit.AfterClass;
67  import org.junit.Before;
68  import org.junit.BeforeClass;
69  import org.junit.Rule;
70  import org.junit.Test;
71  import org.junit.rules.TestName;
72  import org.junit.experimental.categories.Category;
73  
74  
75  
76  
77  
78  
79  @Category(MediumTests.class)
80  public class TestWALObserver {
81    private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
82    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
83  
84    private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
85    private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
86        Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
87    private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
88        Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
89    private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
90        Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
91    private static byte[] TEST_ROW = Bytes.toBytes("testRow");
92  
93    @Rule
94    public TestName currentTest = new TestName();
95  
96    private Configuration conf;
97    private FileSystem fs;
98    private Path dir;
99    private Path hbaseRootDir;
100   private String logName;
101   private Path oldLogDir;
102   private Path logDir;
103   private WALFactory wals;
104 
105   @BeforeClass
106   public static void setupBeforeClass() throws Exception {
107     Configuration conf = TEST_UTIL.getConfiguration();
108     conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
109         SampleRegionWALObserver.class.getName(), SampleRegionWALObserver.Legacy.class.getName());
110     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
111         SampleRegionWALObserver.class.getName());
112     conf.setBoolean("dfs.support.append", true);
113     conf.setInt("dfs.client.block.recovery.retries", 2);
114 
115     TEST_UTIL.startMiniCluster(1);
116     Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
117         .makeQualified(new Path("/hbase"));
118     LOG.info("hbase.rootdir=" + hbaseRootDir);
119     FSUtils.setRootDir(conf, hbaseRootDir);
120   }
121 
122   @AfterClass
123   public static void teardownAfterClass() throws Exception {
124     TEST_UTIL.shutdownMiniCluster();
125   }
126 
127   @Before
128   public void setUp() throws Exception {
129     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
130     
131     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
132     this.hbaseRootDir = FSUtils.getRootDir(conf);
133     this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
134     this.oldLogDir = new Path(this.hbaseRootDir,
135         HConstants.HREGION_OLDLOGDIR_NAME);
136     this.logDir = new Path(this.hbaseRootDir,
137         DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
138     this.logName = HConstants.HREGION_LOGDIR_NAME;
139 
140     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
141       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
142     }
143     this.wals = new WALFactory(conf, null, currentTest.getMethodName());
144   }
145 
146   @After
147   public void tearDown() throws Exception {
148     try {
149       wals.shutdown();
150     } catch (IOException exception) {
151       
152       LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
153       LOG.debug("details of failure to close wal factory.", exception);
154     }
155     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
156   }
157 
158   
159 
160 
161 
162 
163   @Test
164   public void testWALObserverWriteToWAL() throws Exception {
165     final WAL log = wals.getWAL(UNSPECIFIED_REGION);
166     verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.class), false);
167   }
168 
169   
170 
171 
172 
173 
174   @Test
175   public void testLegacyWALObserverWriteToWAL() throws Exception {
176     final WAL log = wals.getWAL(UNSPECIFIED_REGION);
177     verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.Legacy.class), true);
178   }
179 
180   private void verifyWritesSeen(final WAL log, final SampleRegionWALObserver cp,
181       final boolean seesLegacy) throws Exception {
182     HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
183     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
184         .toString(TEST_TABLE));
185 
186     Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
187     deleteDir(basedir);
188     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
189     final AtomicLong sequenceId = new AtomicLong(0);
190 
191     
192     
193     
194     cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
195         TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
196 
197     assertFalse(cp.isPreWALWriteCalled());
198     assertFalse(cp.isPostWALWriteCalled());
199     assertFalse(cp.isPreWALWriteDeprecatedCalled());
200     assertFalse(cp.isPostWALWriteDeprecatedCalled());
201 
202     
203     
204     
205     Put p = creatPutWith2Families(TEST_ROW);
206 
207     Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
208     WALEdit edit = new WALEdit();
209     addFamilyMapToWALEdit(familyMap, edit);
210 
211     boolean foundFamily0 = false;
212     boolean foundFamily2 = false;
213     boolean modifiedFamily1 = false;
214 
215     List<Cell> cells = edit.getCells();
216 
217     for (Cell cell : cells) {
218       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
219         foundFamily0 = true;
220       }
221       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
222         foundFamily2 = true;
223       }
224       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
225         if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
226           modifiedFamily1 = true;
227         }
228       }
229     }
230     assertTrue(foundFamily0);
231     assertFalse(foundFamily2);
232     assertFalse(modifiedFamily1);
233 
234     
235     long now = EnvironmentEdgeManager.currentTime();
236     
237     long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
238         edit, sequenceId, true, null);
239     log.sync(txid);
240 
241     
242     foundFamily0 = false;
243     foundFamily2 = false;
244     modifiedFamily1 = false;
245     for (Cell cell : cells) {
246       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
247         foundFamily0 = true;
248       }
249       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
250         foundFamily2 = true;
251       }
252       if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
253         if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
254           modifiedFamily1 = true;
255         }
256       }
257     }
258     assertFalse(foundFamily0);
259     assertTrue(foundFamily2);
260     assertTrue(modifiedFamily1);
261 
262     assertTrue(cp.isPreWALWriteCalled());
263     assertTrue(cp.isPostWALWriteCalled());
264     assertEquals(seesLegacy, cp.isPreWALWriteDeprecatedCalled());
265     assertEquals(seesLegacy, cp.isPostWALWriteDeprecatedCalled());
266   }
267 
268   @Test
269   public void testNonLegacyWALKeysDoNotExplode() throws Exception {
270     TableName tableName = TableName.valueOf(TEST_TABLE);
271     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
272         .toString(TEST_TABLE));
273     final HRegionInfo hri = new HRegionInfo(tableName, null, null);
274     final AtomicLong sequenceId = new AtomicLong(0);
275 
276     fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
277 
278     final Configuration newConf = HBaseConfiguration.create(this.conf);
279 
280     final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
281     final SampleRegionWALObserver newApi = getCoprocessor(wal, SampleRegionWALObserver.class);
282     newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
283     final SampleRegionWALObserver oldApi = getCoprocessor(wal,
284         SampleRegionWALObserver.Legacy.class);
285     oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
286 
287     LOG.debug("ensuring wal entries haven't happened before we start");
288     assertFalse(newApi.isPreWALWriteCalled());
289     assertFalse(newApi.isPostWALWriteCalled());
290     assertFalse(newApi.isPreWALWriteDeprecatedCalled());
291     assertFalse(newApi.isPostWALWriteDeprecatedCalled());
292     assertFalse(oldApi.isPreWALWriteCalled());
293     assertFalse(oldApi.isPostWALWriteCalled());
294     assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
295     assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
296 
297     LOG.debug("writing to WAL with non-legacy keys.");
298     final int countPerFamily = 5;
299     for (HColumnDescriptor hcd : htd.getFamilies()) {
300       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
301           EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
302     }
303 
304     LOG.debug("Verify that only the non-legacy CP saw edits.");
305     assertTrue(newApi.isPreWALWriteCalled());
306     assertTrue(newApi.isPostWALWriteCalled());
307     assertFalse(newApi.isPreWALWriteDeprecatedCalled());
308     assertFalse(newApi.isPostWALWriteDeprecatedCalled());
309     
310     assertFalse(oldApi.isPreWALWriteCalled());
311     assertFalse(oldApi.isPostWALWriteCalled());
312     assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
313     assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
314 
315     LOG.debug("reseting cp state.");
316     newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
317     oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
318 
319     LOG.debug("write a log edit that supports legacy cps.");
320     final long now = EnvironmentEdgeManager.currentTime();
321     final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
322     final WALEdit edit = new WALEdit();
323     final byte[] nonce = Bytes.toBytes("1772");
324     edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
325     final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
326     wal.sync(txid);
327 
328     LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
329     assertTrue("non-legacy WALObserver didn't see pre-write.", newApi.isPreWALWriteCalled());
330     assertTrue("non-legacy WALObserver didn't see post-write.", newApi.isPostWALWriteCalled());
331     assertFalse("non-legacy WALObserver shouldn't have seen legacy pre-write.",
332         newApi.isPreWALWriteDeprecatedCalled());
333     assertFalse("non-legacy WALObserver shouldn't have seen legacy post-write.",
334         newApi.isPostWALWriteDeprecatedCalled());
335     assertTrue("legacy WALObserver didn't see pre-write.", oldApi.isPreWALWriteCalled());
336     assertTrue("legacy WALObserver didn't see post-write.", oldApi.isPostWALWriteCalled());
337     assertTrue("legacy WALObserver didn't see legacy pre-write.",
338         oldApi.isPreWALWriteDeprecatedCalled());
339     assertTrue("legacy WALObserver didn't see legacy post-write.",
340         oldApi.isPostWALWriteDeprecatedCalled());
341   }
342 
343   
344 
345 
346   @Test
347   public void testEmptyWALEditAreNotSeen() throws Exception {
348     final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
349     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
350     final AtomicLong sequenceId = new AtomicLong(0);
351 
352     WAL log = wals.getWAL(UNSPECIFIED_REGION);
353     try {
354       SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
355 
356       cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
357 
358       assertFalse(cp.isPreWALWriteCalled());
359       assertFalse(cp.isPostWALWriteCalled());
360 
361       final long now = EnvironmentEdgeManager.currentTime();
362       long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
363           new WALEdit(), sequenceId, true, null);
364       log.sync(txid);
365 
366       assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
367       assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
368     } finally {
369       log.close();
370     }
371   }
372 
373   
374 
375 
376   @Test
377   public void testWALCoprocessorReplay() throws Exception {
378     
379     
380     TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
381     final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
382     final AtomicLong sequenceId = new AtomicLong(0);
383     
384     
385     
386     
387     final HRegionInfo hri = new HRegionInfo(tableName, null, null);
388 
389     final Path basedir =
390         FSUtils.getTableDir(this.hbaseRootDir, tableName);
391     deleteDir(basedir);
392     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
393 
394     final Configuration newConf = HBaseConfiguration.create(this.conf);
395 
396     
397     WAL wal = wals.getWAL(UNSPECIFIED_REGION);
398     
399     WALEdit edit = new WALEdit();
400     long now = EnvironmentEdgeManager.currentTime();
401     
402     final int countPerFamily = 1000;
403     
404     for (HColumnDescriptor hcd : htd.getFamilies()) {
405       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
406           EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
407     }
408     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
409         true, null);
410     
411     wal.sync();
412 
413     User user = HBaseTestingUtility.getDifferentUser(newConf,
414         ".replay.wal.secondtime");
415     user.runAs(new PrivilegedExceptionAction() {
416       public Object run() throws Exception {
417         Path p = runWALSplit(newConf);
418         LOG.info("WALSplit path == " + p);
419         FileSystem newFS = FileSystem.get(newConf);
420         
421         final WALFactory wals2 = new WALFactory(conf, null, currentTest.getMethodName()+"2");
422         WAL wal2 = wals2.getWAL(UNSPECIFIED_REGION);;
423         HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
424             hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
425         long seqid2 = region.getOpenSeqNum();
426 
427         SampleRegionWALObserver cp2 =
428           (SampleRegionWALObserver)region.getCoprocessorHost().findCoprocessor(
429               SampleRegionWALObserver.class.getName());
430         
431         assertNotNull(cp2);
432         assertTrue(cp2.isPreWALRestoreCalled());
433         assertTrue(cp2.isPostWALRestoreCalled());
434         assertFalse(cp2.isPreWALRestoreDeprecatedCalled());
435         assertFalse(cp2.isPostWALRestoreDeprecatedCalled());
436         region.close();
437         wals2.close();
438         return null;
439       }
440     });
441   }
442 
443   
444 
445 
446 
447 
448   @Test
449   public void testWALObserverLoaded() throws Exception {
450     WAL log = wals.getWAL(UNSPECIFIED_REGION);
451     assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
452   }
453 
454   private SampleRegionWALObserver getCoprocessor(WAL wal,
455       Class<? extends SampleRegionWALObserver> clazz) throws Exception {
456     WALCoprocessorHost host = wal.getCoprocessorHost();
457     Coprocessor c = host.findCoprocessor(clazz.getName());
458     return (SampleRegionWALObserver) c;
459   }
460 
461   
462 
463 
464 
465 
466 
467   private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
468     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
469 
470     for (int i = 0; i < TEST_FAMILY.length; i++) {
471       HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
472       htd.addFamily(a);
473     }
474     return new HRegionInfo(htd.getTableName(), null, null, false);
475   }
476 
477   
478 
479 
480   private void deleteDir(final Path p) throws IOException {
481     if (this.fs.exists(p)) {
482       if (!this.fs.delete(p, true)) {
483         throw new IOException("Failed remove of " + p);
484       }
485     }
486   }
487 
488   private Put creatPutWith2Families(byte[] row) throws IOException {
489     Put p = new Put(row);
490     for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
491       p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
492     }
493     return p;
494   }
495 
496   
497 
498 
499 
500 
501 
502 
503 
504   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
505       WALEdit walEdit) {
506     for (List<Cell> edits : familyMap.values()) {
507       for (Cell cell : edits) {
508         
509         walEdit.add(cell);
510       }
511     }
512   }
513 
514   private Path runWALSplit(final Configuration c) throws IOException {
515     List<Path> splits = WALSplitter.split(
516       hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
517     
518     assertEquals(1, splits.size());
519     
520     assertTrue(fs.exists(splits.get(0)));
521     LOG.info("Split file=" + splits.get(0));
522     return splits.get(0);
523   }
524 
525   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
526 
527   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
528       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
529       final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
530     String familyStr = Bytes.toString(family);
531     long txid = -1;
532     for (int j = 0; j < count; j++) {
533       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
534       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
535       WALEdit edit = new WALEdit();
536       edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
537       
538       
539       txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
540           ee.currentTime()), edit, sequenceId, true, null);
541     }
542     if (-1 != txid) {
543       wal.sync(txid);
544     }
545   }
546 
547   private HTableDescriptor getBasic3FamilyHTableDescriptor(
548       final TableName tableName) {
549     HTableDescriptor htd = new HTableDescriptor(tableName);
550 
551     for (int i = 0; i < TEST_FAMILY.length; i++) {
552       HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
553       htd.addFamily(a);
554     }
555     return htd;
556   }
557 
558   private HTableDescriptor createBasic3FamilyHTD(final String tableName) {
559     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
560     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
561     htd.addFamily(a);
562     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
563     htd.addFamily(b);
564     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
565     htd.addFamily(c);
566     return htd;
567   }
568 }