1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  import static org.mockito.Matchers.any;
26  import static org.mockito.Matchers.eq;
27  import static org.mockito.Mockito.doAnswer;
28  import static org.mockito.Mockito.spy;
29  import static org.mockito.Mockito.when;
30  
31  import java.io.FilterInputStream;
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.security.PrivilegedExceptionAction;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.HashSet;
38  import java.util.List;
39  import java.util.Set;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FSDataInputStream;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.PathFilter;
52  import org.apache.hadoop.hbase.Cell;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseTestingUtility;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.KeyValue;
60  import org.apache.hadoop.hbase.MasterNotRunningException;
61  import org.apache.hadoop.hbase.testclassification.MediumTests;
62  import org.apache.hadoop.hbase.MiniHBaseCluster;
63  import org.apache.hadoop.hbase.ServerName;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
66  import org.apache.hadoop.hbase.client.Delete;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HTable;
69  import org.apache.hadoop.hbase.client.Put;
70  import org.apache.hadoop.hbase.client.Result;
71  import org.apache.hadoop.hbase.client.ResultScanner;
72  import org.apache.hadoop.hbase.client.Scan;
73  import org.apache.hadoop.hbase.client.Table;
74  import org.apache.hadoop.hbase.master.HMaster;
75  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
76  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
77  import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
78  import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
79  import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
80  import org.apache.hadoop.hbase.regionserver.FlushRequester;
81  import org.apache.hadoop.hbase.regionserver.HRegion;
82  import org.apache.hadoop.hbase.regionserver.HRegionServer;
83  import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
84  import org.apache.hadoop.hbase.regionserver.Region;
85  import org.apache.hadoop.hbase.regionserver.RegionScanner;
86  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
87  import org.apache.hadoop.hbase.regionserver.Store;
88  import org.apache.hadoop.hbase.security.User;
89  import org.apache.hadoop.hbase.util.Bytes;
90  import org.apache.hadoop.hbase.util.EnvironmentEdge;
91  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
92  import org.apache.hadoop.hbase.util.FSUtils;
93  import org.apache.hadoop.hbase.util.HFileTestUtil;
94  import org.apache.hadoop.hbase.util.Pair;
95  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
96  import org.apache.hadoop.hbase.wal.WAL;
97  import org.apache.hadoop.hbase.wal.WALFactory;
98  import org.apache.hadoop.hbase.wal.WALKey;
99  import org.apache.hadoop.hbase.wal.WALSplitter;
100 import org.apache.hadoop.hdfs.DFSInputStream;
101 import org.junit.After;
102 import org.junit.AfterClass;
103 import org.junit.Before;
104 import org.junit.BeforeClass;
105 import org.junit.Rule;
106 import org.junit.Test;
107 import org.junit.experimental.categories.Category;
108 import org.junit.rules.TestName;
109 import org.mockito.Mockito;
110 import org.mockito.invocation.InvocationOnMock;
111 import org.mockito.stubbing.Answer;
112 
113 
114 
115 
116 @Category(MediumTests.class)
117 public class TestWALReplay {
118   public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
119   static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
120   private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
121   private Path hbaseRootDir = null;
122   private String logName;
123   private Path oldLogDir;
124   private Path logDir;
125   private FileSystem fs;
126   private Configuration conf;
127   private RecoveryMode mode;
128   private WALFactory wals;
129 
130   @Rule
131   public final TestName currentTest = new TestName();
132 
133 
134   @BeforeClass
135   public static void setUpBeforeClass() throws Exception {
136     Configuration conf = TEST_UTIL.getConfiguration();
137     conf.setBoolean("dfs.support.append", true);
138     
139     conf.setInt("dfs.client.block.recovery.retries", 2);
140     TEST_UTIL.startMiniCluster(3);
141     Path hbaseRootDir =
142       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
143     LOG.info("hbase.rootdir=" + hbaseRootDir);
144     FSUtils.setRootDir(conf, hbaseRootDir);
145   }
146 
147   @AfterClass
148   public static void tearDownAfterClass() throws Exception {
149     TEST_UTIL.shutdownMiniCluster();
150   }
151 
152   @Before
153   public void setUp() throws Exception {
154     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
155     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
156     this.hbaseRootDir = FSUtils.getRootDir(this.conf);
157     this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
158     this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
159     this.logDir = new Path(this.hbaseRootDir, logName);
160     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
161       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
162     }
163     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
164         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
165     this.wals = new WALFactory(conf, null, currentTest.getMethodName());
166   }
167 
168   @After
169   public void tearDown() throws Exception {
170     this.wals.close();
171     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
172   }
173 
174   
175 
176 
177   private void deleteDir(final Path p) throws IOException {
178     if (this.fs.exists(p)) {
179       if (!this.fs.delete(p, true)) {
180         throw new IOException("Failed remove of " + p);
181       }
182     }
183   }
184 
185   
186 
187 
188 
189   @Test
190   public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
191     final TableName tableName =
192         TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
193     byte[] family1 = Bytes.toBytes("cf1");
194     byte[] family2 = Bytes.toBytes("cf2");
195     byte[] qualifier = Bytes.toBytes("q");
196     byte[] value = Bytes.toBytes("testV");
197     byte[][] familys = { family1, family2 };
198     TEST_UTIL.createTable(tableName, familys);
199     Table htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
200     Put put = new Put(Bytes.toBytes("r1"));
201     put.add(family1, qualifier, value);
202     htable.put(put);
203     ResultScanner resultScanner = htable.getScanner(new Scan());
204     int count = 0;
205     while (resultScanner.next() != null) {
206       count++;
207     }
208     resultScanner.close();
209     assertEquals(1, count);
210 
211     MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
212     List<HRegion> regions = hbaseCluster.getRegions(tableName);
213     assertEquals(1, regions.size());
214 
215     
216     Region destRegion = regions.get(0);
217     int originServerNum = hbaseCluster
218         .getServerWith(destRegion.getRegionInfo().getRegionName());
219     assertTrue("Please start more than 1 regionserver", hbaseCluster
220         .getRegionServerThreads().size() > 1);
221     int destServerNum = 0;
222     while (destServerNum == originServerNum) {
223       destServerNum++;
224     }
225     HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
226     HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
227     
228     moveRegionAndWait(destRegion, destServer);
229 
230     
231     Delete del = new Delete(Bytes.toBytes("r1"));
232     htable.delete(del);
233     resultScanner = htable.getScanner(new Scan());
234     count = 0;
235     while (resultScanner.next() != null) {
236       count++;
237     }
238     resultScanner.close();
239     assertEquals(0, count);
240 
241     
242     Region region =  destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
243     region.flush(true);
244     
245     for (Store store : region.getStores()) {
246       store.triggerMajorCompaction();
247     }
248     region.compact(true);
249 
250     
251     moveRegionAndWait(destRegion, originServer);
252     
253     originServer.abort("testing");
254 
255     
256     Result result = htable.get(new Get(Bytes.toBytes("r1")));
257     if (result != null) {
258       assertTrue("Row is deleted, but we get" + result.toString(),
259           (result == null) || result.isEmpty());
260     }
261     resultScanner.close();
262   }
263 
264   private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
265       throws InterruptedException, MasterNotRunningException,
266       ZooKeeperConnectionException, IOException {
267     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
268     TEST_UTIL.getHBaseAdmin().move(
269         destRegion.getRegionInfo().getEncodedNameAsBytes(),
270         Bytes.toBytes(destServer.getServerName().getServerName()));
271     while (true) {
272       ServerName serverName = master.getAssignmentManager()
273         .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
274       if (serverName != null && serverName.equals(destServer.getServerName())) {
275         TEST_UTIL.assertRegionOnServer(
276           destRegion.getRegionInfo(), serverName, 200);
277         break;
278       }
279       Thread.sleep(10);
280     }
281   }
282 
283   
284 
285 
286 
287 
288   @Test
289   public void test2727() throws Exception {
290     
291     
292     final TableName tableName =
293         TableName.valueOf("test2727");
294     HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
295     Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
296     deleteDir(basedir);
297 
298     HTableDescriptor htd = createBasic3FamilyHTD(tableName);
299     HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
300     HRegion.closeHRegion(region2);
301     final byte [] rowName = tableName.getName();
302 
303     WAL wal1 = createWAL(this.conf);
304     
305     final int countPerFamily = 1000;
306     final AtomicLong sequenceId = new AtomicLong(1);
307     for (HColumnDescriptor hcd: htd.getFamilies()) {
308       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
309           wal1, htd, sequenceId);
310     }
311     wal1.shutdown();
312     runWALSplit(this.conf);
313 
314     WAL wal2 = createWAL(this.conf);
315     
316     for (HColumnDescriptor hcd: htd.getFamilies()) {
317       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
318           ee, wal2, htd, sequenceId);
319     }
320     wal2.shutdown();
321     runWALSplit(this.conf);
322 
323     WAL wal3 = createWAL(this.conf);
324     try {
325       HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
326       long seqid = region.getOpenSeqNum();
327       
328       
329       assertTrue(seqid > sequenceId.get());
330       assertEquals(seqid - 1, sequenceId.get());
331       LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
332           + sequenceId.get());
333 
334       
335       region.close();
336     } finally {
337       wal3.close();
338     }
339   }
340 
341   
342 
343 
344 
345 
346 
347 
348 
349 
350   @Test
351   public void testRegionMadeOfBulkLoadedFilesOnly()
352   throws IOException, SecurityException, IllegalArgumentException,
353       NoSuchFieldException, IllegalAccessException, InterruptedException {
354     final TableName tableName =
355         TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
356     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
357     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
358     deleteDir(basedir);
359     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
360     HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
361     HRegion.closeHRegion(region2);
362     WAL wal = createWAL(this.conf);
363     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
364 
365     byte [] family = htd.getFamilies().iterator().next().getName();
366     Path f =  new Path(basedir, "hfile");
367     HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
368         Bytes.toBytes("z"), 10);
369     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
370     hfs.add(Pair.newPair(family, f.toString()));
371     region.bulkLoadHFiles(hfs, true, null);
372 
373     
374     byte [] row = tableName.getName();
375     region.put((new Put(row)).add(family, family, family));
376     wal.sync();
377     final int rowsInsertedCount = 11;
378 
379     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
380 
381     
382     final Configuration newConf = HBaseConfiguration.create(this.conf);
383     User user = HBaseTestingUtility.getDifferentUser(newConf,
384         tableName.getNameAsString());
385     user.runAs(new PrivilegedExceptionAction() {
386       @Override
387       public Object run() throws Exception {
388         runWALSplit(newConf);
389         WAL wal2 = createWAL(newConf);
390 
391         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
392           hbaseRootDir, hri, htd, wal2);
393         long seqid2 = region2.getOpenSeqNum();
394         assertTrue(seqid2 > -1);
395         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
396 
397         
398         region2.close();
399         wal2.close();
400         return null;
401       }
402     });
403   }
404 
405   
406 
407 
408 
409 
410 
411 
412 
413 
414 
415 
416   @Test
417   public void testCompactedBulkLoadedFiles()
418       throws IOException, SecurityException, IllegalArgumentException,
419       NoSuchFieldException, IllegalAccessException, InterruptedException {
420     final TableName tableName =
421         TableName.valueOf("testCompactedBulkLoadedFiles");
422     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
423     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
424     deleteDir(basedir);
425     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
426     HRegion region2 = HRegion.createHRegion(hri,
427         hbaseRootDir, this.conf, htd);
428     HRegion.closeHRegion(region2);
429     WAL wal = createWAL(this.conf);
430     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
431 
432     
433     byte [] row = tableName.getName();
434     byte [] family = htd.getFamilies().iterator().next().getName();
435     region.put((new Put(row)).add(family, family, family));
436     wal.sync();
437 
438     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
439     for (int i = 0; i < 3; i++) {
440       Path f = new Path(basedir, "hfile"+i);
441       HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
442           Bytes.toBytes(i + "50"), 10);
443       hfs.add(Pair.newPair(family, f.toString()));
444     }
445     region.bulkLoadHFiles(hfs, true, null);
446     final int rowsInsertedCount = 31;
447     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
448 
449     
450     region.compact(true);
451     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
452 
453     
454     final Configuration newConf = HBaseConfiguration.create(this.conf);
455     User user = HBaseTestingUtility.getDifferentUser(newConf,
456         tableName.getNameAsString());
457     user.runAs(new PrivilegedExceptionAction() {
458       @Override
459       public Object run() throws Exception {
460         runWALSplit(newConf);
461         WAL wal2 = createWAL(newConf);
462 
463         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
464             hbaseRootDir, hri, htd, wal2);
465         long seqid2 = region2.getOpenSeqNum();
466         assertTrue(seqid2 > -1);
467         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
468 
469         
470         region2.close();
471         wal2.close();
472         return null;
473       }
474     });
475   }
476 
477 
478   
479 
480 
481 
482 
483 
484 
485 
486 
487   @Test
488   public void testReplayEditsWrittenViaHRegion()
489   throws IOException, SecurityException, IllegalArgumentException,
490       NoSuchFieldException, IllegalAccessException, InterruptedException {
491     final TableName tableName =
492         TableName.valueOf("testReplayEditsWrittenViaHRegion");
493     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
494     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
495     deleteDir(basedir);
496     final byte[] rowName = tableName.getName();
497     final int countPerFamily = 10;
498     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
499     HRegion region3 = HRegion.createHRegion(hri,
500             hbaseRootDir, this.conf, htd);
501     HRegion.closeHRegion(region3);
502     
503     
504     
505     WAL wal = createWAL(this.conf);
506     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
507     long seqid = region.getOpenSeqNum();
508     boolean first = true;
509     for (HColumnDescriptor hcd: htd.getFamilies()) {
510       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
511       if (first) {
512         
513         region.flush(true);
514         first = false;
515       }
516     }
517     
518     final Get g = new Get(rowName);
519     Result result = region.get(g);
520     assertEquals(countPerFamily * htd.getFamilies().size(),
521       result.size());
522     
523     
524     
525     region.close(true);
526     wal.shutdown();
527     runWALSplit(this.conf);
528     WAL wal2 = createWAL(this.conf);
529     HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
530     long seqid2 = region2.getOpenSeqNum();
531     assertTrue(seqid + result.size() < seqid2);
532     final Result result1b = region2.get(g);
533     assertEquals(result.size(), result1b.size());
534 
535     
536     
537     
538     for (HColumnDescriptor hcd: htd.getFamilies()) {
539       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
540     }
541     
542     final Result result2 = region2.get(g);
543     assertEquals(2 * result.size(), result2.size());
544     wal2.sync();
545     final Configuration newConf = HBaseConfiguration.create(this.conf);
546     User user = HBaseTestingUtility.getDifferentUser(newConf,
547       tableName.getNameAsString());
548     user.runAs(new PrivilegedExceptionAction() {
549       @Override
550       public Object run() throws Exception {
551         runWALSplit(newConf);
552         FileSystem newFS = FileSystem.get(newConf);
553         
554         WAL wal3 = createWAL(newConf);
555         final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
556         HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
557           @Override
558           protected boolean restoreEdit(Store s, Cell cell) {
559             boolean b = super.restoreEdit(s, cell);
560             countOfRestoredEdits.incrementAndGet();
561             return b;
562           }
563         };
564         long seqid3 = region3.initialize();
565         Result result3 = region3.get(g);
566         
567         assertEquals(result2.size(), result3.size());
568         assertEquals(htd.getFamilies().size() * countPerFamily,
569           countOfRestoredEdits.get());
570 
571         
572         region3.close();
573         wal3.close();
574         return null;
575       }
576     });
577   }
578 
579   
580 
581 
582 
583 
584 
585 
586 
587 
588 
589 
590 
591 
592 
593 
594 
595 
596 
597   @Test
598   public void testReplayEditsAfterPartialFlush()
599   throws IOException, SecurityException, IllegalArgumentException,
600       NoSuchFieldException, IllegalAccessException, InterruptedException {
601     final TableName tableName =
602         TableName.valueOf("testReplayEditsWrittenViaHRegion");
603     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
604     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
605     deleteDir(basedir);
606     final byte[] rowName = tableName.getName();
607     final int countPerFamily = 10;
608     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
609     HRegion region3 = HRegion.createHRegion(hri,
610             hbaseRootDir, this.conf, htd);
611     HRegion.closeHRegion(region3);
612     
613     
614     
615     WAL wal = createWAL(this.conf);
616     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
617     long seqid = region.getOpenSeqNum();
618     for (HColumnDescriptor hcd: htd.getFamilies()) {
619       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
620     }
621 
622     
623     final Get g = new Get(rowName);
624     Result result = region.get(g);
625     assertEquals(countPerFamily * htd.getFamilies().size(),
626       result.size());
627 
628     
629     region.flush(true);
630     region.close(true);
631     wal.shutdown();
632 
633     
634     
635     
636     
637     int cf_count = 0;
638     for (HColumnDescriptor hcd: htd.getFamilies()) {
639       cf_count++;
640       if (cf_count == 2) {
641         region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
642       }
643     }
644 
645 
646     
647     runWALSplit(this.conf);
648     WAL wal2 = createWAL(this.conf);
649     HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
650     long seqid2 = region2.getOpenSeqNum();
651     assertTrue(seqid + result.size() < seqid2);
652 
653     final Result result1b = region2.get(g);
654     assertEquals(result.size(), result1b.size());
655   }
656 
657 
658   
659   
660   public static class CustomStoreFlusher extends DefaultStoreFlusher {
661     
662     static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
663 
664     public CustomStoreFlusher(Configuration conf, Store store) {
665       super(conf, store);
666     }
667     @Override
668     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
669         MonitoredTask status) throws IOException {
670       if (throwExceptionWhenFlushing.get()) {
671         throw new IOException("Simulated exception by tests");
672       }
673       return super.flushSnapshot(snapshot, cacheFlushId, status);
674     }
675 
676   };
677 
678   
679 
680 
681 
682 
683 
684   @Test
685   public void testReplayEditsAfterAbortingFlush() throws IOException {
686     final TableName tableName =
687         TableName.valueOf("testReplayEditsAfterAbortingFlush");
688     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
689     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
690     deleteDir(basedir);
691     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
692     HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
693     region3.close();
694     region3.getWAL().close();
695     
696     
697     
698     WAL wal = createWAL(this.conf);
699     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
700     Mockito.doReturn(false).when(rsServices).isAborted();
701     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
702     Configuration customConf = new Configuration(this.conf);
703     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
704         CustomStoreFlusher.class.getName());
705     HRegion region =
706       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
707     int writtenRowCount = 10;
708     List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
709         htd.getFamilies());
710     for (int i = 0; i < writtenRowCount; i++) {
711       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
712       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
713           Bytes.toBytes("val"));
714       region.put(put);
715     }
716 
717     
718     RegionScanner scanner = region.getScanner(new Scan());
719     assertEquals(writtenRowCount, getScannedCount(scanner));
720 
721     
722     CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
723     try {
724       region.flush(true);
725       fail("Injected exception hasn't been thrown");
726     } catch (Throwable t) {
727       LOG.info("Expected simulated exception when flushing region,"
728           + t.getMessage());
729       
730       Mockito.doReturn(true).when(rsServices).isAborted();
731       region.setClosing(false); 
732       
733     }
734     
735     int moreRow = 10;
736     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
737       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
738       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
739           Bytes.toBytes("val"));
740       region.put(put);
741     }
742     writtenRowCount += moreRow;
743     
744     CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
745     try {
746       region.flush(true);
747     } catch (IOException t) {
748       LOG.info("Expected exception when flushing region because server is stopped,"
749           + t.getMessage());
750     }
751 
752     region.close(true);
753     wal.shutdown();
754 
755     
756     runWALSplit(this.conf);
757     WAL wal2 = createWAL(this.conf);
758     Mockito.doReturn(false).when(rsServices).isAborted();
759     HRegion region2 =
760       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
761     scanner = region2.getScanner(new Scan());
762     assertEquals(writtenRowCount, getScannedCount(scanner));
763   }
764 
765   private int getScannedCount(RegionScanner scanner) throws IOException {
766     int scannedCount = 0;
767     List<Cell> results = new ArrayList<Cell>();
768     while (true) {
769       boolean existMore = scanner.next(results);
770       if (!results.isEmpty())
771         scannedCount++;
772       if (!existMore)
773         break;
774       results.clear();
775     }
776     return scannedCount;
777   }
778 
779   
780 
781 
782 
783 
784   @Test
785   public void testReplayEditsWrittenIntoWAL() throws Exception {
786     final TableName tableName =
787         TableName.valueOf("testReplayEditsWrittenIntoWAL");
788     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
789     final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
790     deleteDir(basedir);
791 
792     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
793     HRegion region2 = HRegion.createHRegion(hri,
794             hbaseRootDir, this.conf, htd);
795     HRegion.closeHRegion(region2);
796     final WAL wal = createWAL(this.conf);
797     final byte[] rowName = tableName.getName();
798     final byte[] regionName = hri.getEncodedNameAsBytes();
799     final AtomicLong sequenceId = new AtomicLong(1);
800 
801     
802     final int countPerFamily = 1000;
803     Set<byte[]> familyNames = new HashSet<byte[]>();
804     for (HColumnDescriptor hcd: htd.getFamilies()) {
805       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
806           ee, wal, htd, sequenceId);
807       familyNames.add(hcd.getName());
808     }
809 
810     
811     wal.startCacheFlush(regionName, familyNames);
812     wal.completeCacheFlush(regionName);
813 
814     
815     WALEdit edit = new WALEdit();
816     long now = ee.currentTime();
817     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
818       now, rowName));
819     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
820         true, null);
821 
822     
823     edit = new WALEdit();
824     now = ee.currentTime();
825     edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
826       KeyValue.Type.DeleteFamily));
827     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
828         true, null);
829 
830     
831     wal.sync();
832     
833     
834     final Configuration newConf = HBaseConfiguration.create(this.conf);
835     User user = HBaseTestingUtility.getDifferentUser(newConf,
836       ".replay.wal.secondtime");
837     user.runAs(new PrivilegedExceptionAction<Void>() {
838       @Override
839       public Void run() throws Exception {
840         runWALSplit(newConf);
841         FileSystem newFS = FileSystem.get(newConf);
842         
843         newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
844         
845         WAL newWal = createWAL(newConf);
846         final AtomicInteger flushcount = new AtomicInteger(0);
847         try {
848           final HRegion region =
849               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
850             @Override
851             protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
852                 final Collection<Store> storesToFlush, MonitoredTask status,
853                 boolean writeFlushWalMarker)
854                     throws IOException {
855               LOG.info("InternalFlushCache Invoked");
856               FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
857                   Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
858               flushcount.incrementAndGet();
859               return fs;
860             };
861           };
862           long seqid = region.initialize();
863           
864           assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
865           assertTrue(seqid - 1 == sequenceId.get());
866 
867           Get get = new Get(rowName);
868           Result result = region.get(get);
869           
870           assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
871             result.size());
872           region.close();
873         } finally {
874           newWal.close();
875         }
876         return null;
877       }
878     });
879   }
880 
881   @Test
882   
883   public void testSequentialEditLogSeqNum() throws IOException {
884     final TableName tableName = TableName.valueOf(currentTest.getMethodName());
885     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
886     final Path basedir =
887         FSUtils.getTableDir(this.hbaseRootDir, tableName);
888     deleteDir(basedir);
889     final byte[] rowName = tableName.getName();
890     final int countPerFamily = 10;
891     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
892 
893     
894     MockWAL wal = createMockWAL();
895 
896     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
897     for (HColumnDescriptor hcd : htd.getFamilies()) {
898       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
899     }
900 
901     
902     
903     region.flush(true);
904     for (HColumnDescriptor hcd : htd.getFamilies()) {
905       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
906     }
907     long lastestSeqNumber = region.getSequenceId().get();
908     
909     wal.doCompleteCacheFlush = true;
910     
911     
912     wal.completeCacheFlush(hri.getEncodedNameAsBytes());
913     wal.shutdown();
914     FileStatus[] listStatus = wal.getFiles();
915     assertNotNull(listStatus);
916     assertTrue(listStatus.length > 0);
917     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
918         this.fs, this.conf, null, null, null, mode, wals);
919     FileStatus[] listStatus1 = this.fs.listStatus(
920       new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
921           "recovered.edits")), new PathFilter() {
922         @Override
923         public boolean accept(Path p) {
924           if (WALSplitter.isSequenceIdFile(p)) {
925             return false;
926           }
927           return true;
928         }
929       });
930     int editCount = 0;
931     for (FileStatus fileStatus : listStatus1) {
932       editCount = Integer.parseInt(fileStatus.getPath().getName());
933     }
934     
935     assertEquals(
936         "The sequence number of the recoverd.edits and the current edit seq should be same",
937         lastestSeqNumber, editCount);
938   }
939 
940   
941 
942 
943   @Test
944   public void testDatalossWhenInputError() throws IOException, InstantiationException,
945       IllegalAccessException {
946     final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
947     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
948     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
949     deleteDir(basedir);
950     final byte[] rowName = tableName.getName();
951     final int countPerFamily = 10;
952     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
953     HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
954     Path regionDir = region1.getRegionFileSystem().getRegionDir();
955     HBaseTestingUtility.closeRegionAndWAL(region1);
956 
957     WAL wal = createWAL(this.conf);
958     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
959     for (HColumnDescriptor hcd : htd.getFamilies()) {
960       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
961     }
962     
963     final Get g = new Get(rowName);
964     Result result = region.get(g);
965     assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
966     
967     
968     region.close(true);
969     wal.shutdown();
970 
971     runWALSplit(this.conf);
972 
973     
974     Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
975     FSDataInputStream stream = fs.open(editFile);
976     stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
977     Class<? extends DefaultWALProvider.Reader> logReaderClass =
978         conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
979           DefaultWALProvider.Reader.class);
980     DefaultWALProvider.Reader reader = logReaderClass.newInstance();
981     reader.init(this.fs, editFile, conf, stream);
982     final long headerLength = stream.getPos();
983     reader.close();
984     FileSystem spyFs = spy(this.fs);
985     doAnswer(new Answer<FSDataInputStream>() {
986 
987       @Override
988       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
989         FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
990         Field field = FilterInputStream.class.getDeclaredField("in");
991         field.setAccessible(true);
992         final DFSInputStream in = (DFSInputStream) field.get(stream);
993         DFSInputStream spyIn = spy(in);
994         doAnswer(new Answer<Integer>() {
995 
996           private long pos;
997 
998           @Override
999           public Integer answer(InvocationOnMock invocation) throws Throwable {
1000             if (pos >= headerLength) {
1001               throw new IOException("read over limit");
1002             }
1003             int b = (Integer) invocation.callRealMethod();
1004             if (b > 0) {
1005               pos += b;
1006             }
1007             return b;
1008           }
1009         }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
1010         doAnswer(new Answer<Void>() {
1011 
1012           @Override
1013           public Void answer(InvocationOnMock invocation) throws Throwable {
1014             invocation.callRealMethod();
1015             in.close();
1016             return null;
1017           }
1018         }).when(spyIn).close();
1019         field.set(stream, spyIn);
1020         return stream;
1021       }
1022     }).when(spyFs).open(eq(editFile));
1023 
1024     WAL wal2 = createWAL(this.conf);
1025     HRegion region2;
1026     try {
1027       
1028       region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
1029       assertEquals(result.size(), region2.get(g).size());
1030     } catch (IOException e) {
1031       assertEquals("read over limit", e.getMessage());
1032     }
1033     region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
1034     assertEquals(result.size(), region2.get(g).size());
1035   }
1036 
1037   static class MockWAL extends FSHLog {
1038     boolean doCompleteCacheFlush = false;
1039 
1040     public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1041         throws IOException {
1042       super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1043     }
1044 
1045     @Override
1046     public void completeCacheFlush(byte[] encodedRegionName) {
1047       if (!doCompleteCacheFlush) {
1048         return;
1049       }
1050       super.completeCacheFlush(encodedRegionName);
1051     }
1052   }
1053 
1054   private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1055     HTableDescriptor htd = new HTableDescriptor(tableName);
1056     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1057     htd.addFamily(a);
1058     return htd;
1059   }
1060 
1061   private MockWAL createMockWAL() throws IOException {
1062     MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1063     
1064     
1065     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1066     return wal;
1067   }
1068 
1069   
1070   
1071   class TestFlusher implements FlushRequester {
1072     private HRegion r;
1073 
1074     @Override
1075     public void requestFlush(Region region, boolean force) {
1076       try {
1077         r.flush(force);
1078       } catch (IOException e) {
1079         throw new RuntimeException("Exception flushing", e);
1080       }
1081     }
1082 
1083     @Override
1084     public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
1085       
1086 
1087     }
1088 
1089     @Override
1090     public void registerFlushRequestListener(FlushRequestListener listener) {
1091 
1092     }
1093 
1094     @Override
1095     public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1096       return false;
1097     }
1098 
1099     @Override
1100     public void setGlobalMemstoreLimit(long globalMemStoreSize) {
1101 
1102     }
1103   }
1104 
1105   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1106       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1107       final HTableDescriptor htd, final AtomicLong sequenceId)
1108   throws IOException {
1109     String familyStr = Bytes.toString(family);
1110     for (int j = 0; j < count; j++) {
1111       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
1112       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
1113       WALEdit edit = new WALEdit();
1114       edit.add(new KeyValue(rowName, family, qualifierBytes,
1115         ee.currentTime(), columnBytes));
1116       wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()),
1117           edit, sequenceId, true, null);
1118     }
1119     wal.sync();
1120   }
1121 
1122   static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
1123       final int count, EnvironmentEdge ee, final Region r,
1124       final String qualifierPrefix)
1125   throws IOException {
1126     List<Put> puts = new ArrayList<Put>();
1127     for (int j = 0; j < count; j++) {
1128       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1129       Put p = new Put(rowName);
1130       p.add(family, qualifier, ee.currentTime(), rowName);
1131       r.put(p);
1132       puts.add(p);
1133     }
1134     return puts;
1135   }
1136 
1137   
1138 
1139 
1140 
1141 
1142    private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1143     return new HRegionInfo(tableName, null, null, false);
1144    }
1145 
1146   
1147 
1148 
1149 
1150 
1151 
1152   private Path runWALSplit(final Configuration c) throws IOException {
1153     List<Path> splits = WALSplitter.split(
1154       hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1155     
1156     assertEquals("splits=" + splits, 1, splits.size());
1157     
1158     assertTrue(fs.exists(splits.get(0)));
1159     LOG.info("Split file=" + splits.get(0));
1160     return splits.get(0);
1161   }
1162 
1163   
1164 
1165 
1166 
1167 
1168   private WAL createWAL(final Configuration c) throws IOException {
1169     FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
1170     
1171     
1172     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1173     return wal;
1174   }
1175 
1176   private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1177     HTableDescriptor htd = new HTableDescriptor(tableName);
1178     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1179     htd.addFamily(a);
1180     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1181     htd.addFamily(b);
1182     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1183     htd.addFamily(c);
1184     return htd;
1185   }
1186 }